Posted to commits@impala.apache.org by bh...@apache.org on 2018/04/13 22:59:48 UTC

[01/11] impala git commit: IMPALA-5717: Support for reading ORC data files

Repository: impala
Updated Branches:
  refs/heads/2.x 83422a32f -> e3bdc3e24


http://git-wip-us.apache.org/repos/asf/impala/blob/23743baa/common/thrift/CatalogObjects.thrift
----------------------------------------------------------------------
diff --git a/common/thrift/CatalogObjects.thrift b/common/thrift/CatalogObjects.thrift
index 0f1ea5d..8733af3 100644
--- a/common/thrift/CatalogObjects.thrift
+++ b/common/thrift/CatalogObjects.thrift
@@ -58,7 +58,8 @@ enum THdfsFileFormat {
   SEQUENCE_FILE,
   AVRO,
   PARQUET,
-  KUDU
+  KUDU,
+  ORC
 }
 
 // TODO: Since compression is also enabled for Kudu columns, we should
@@ -73,7 +74,8 @@ enum THdfsCompression {
   SNAPPY_BLOCKED,
   LZO,
   LZ4,
-  ZLIB
+  ZLIB,
+  ZSTD
 }
 
 enum TColumnEncoding {

http://git-wip-us.apache.org/repos/asf/impala/blob/23743baa/fe/src/main/cup/sql-parser.cup
----------------------------------------------------------------------
diff --git a/fe/src/main/cup/sql-parser.cup b/fe/src/main/cup/sql-parser.cup
index 9802778..46a1d32 100644
--- a/fe/src/main/cup/sql-parser.cup
+++ b/fe/src/main/cup/sql-parser.cup
@@ -263,8 +263,8 @@ terminal
   KW_IN, KW_INCREMENTAL, KW_INIT_FN, KW_INNER, KW_INPATH, KW_INSERT, KW_INT,
   KW_INTERMEDIATE, KW_INTERVAL, KW_INTO, KW_INVALIDATE, KW_IREGEXP, KW_IS, KW_JOIN,
   KW_KUDU, KW_LAST, KW_LEFT, KW_LIKE, KW_LIMIT, KW_LINES, KW_LOAD, KW_LOCATION, KW_MAP,
-  KW_MERGE_FN, KW_METADATA, KW_NOT, KW_NULL, KW_NULLS, KW_OFFSET, KW_ON, KW_OR, KW_ORDER,
-  KW_OUTER, KW_OVER, KW_OVERWRITE, KW_PARQUET, KW_PARQUETFILE, KW_PARTITION,
+  KW_MERGE_FN, KW_METADATA, KW_NOT, KW_NULL, KW_NULLS, KW_OFFSET, KW_ON, KW_OR, KW_ORC,
+  KW_ORDER, KW_OUTER, KW_OVER, KW_OVERWRITE, KW_PARQUET, KW_PARQUETFILE, KW_PARTITION,
   KW_PARTITIONED, KW_PARTITIONS, KW_PRECEDING, KW_PREPARE_FN, KW_PRIMARY, KW_PRODUCED,
   KW_PURGE, KW_RANGE, KW_RCFILE, KW_RECOVER, KW_REFRESH, KW_REGEXP, KW_RENAME,
   KW_REPEATABLE, KW_REPLACE, KW_REPLICATION, KW_RESTRICT, KW_RETURNS, KW_REVOKE,
@@ -1554,6 +1554,8 @@ file_format_val ::=
   {: RESULT = THdfsFileFormat.PARQUET; :}
   | KW_PARQUETFILE
   {: RESULT = THdfsFileFormat.PARQUET; :}
+  | KW_ORC
+  {: RESULT = THdfsFileFormat.ORC; :}
   | KW_TEXTFILE
   {: RESULT = THdfsFileFormat.TEXT; :}
   | KW_SEQUENCEFILE
@@ -3479,6 +3481,8 @@ word ::=
   {: RESULT = r.toString(); :}
   | KW_OR:r
   {: RESULT = r.toString(); :}
+  | KW_ORC:r
+  {: RESULT = r.toString(); :}
   | KW_ORDER:r
   {: RESULT = r.toString(); :}
   | KW_OUTER:r
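
For illustration only (not part of the patch): with KW_ORC wired into the terminal list,
the file_format_val production and the word production above, ORC becomes a legal file
format keyword in DDL. A minimal usage sketch, with hypothetical database and table
names:

create table my_db.orc_tbl (id int, name string) stored as orc;
alter table my_db.multi_format_tbl partition (p=5) set fileformat orc;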

http://git-wip-us.apache.org/repos/asf/impala/blob/23743baa/fe/src/main/java/org/apache/impala/analysis/ComputeStatsStmt.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/analysis/ComputeStatsStmt.java b/fe/src/main/java/org/apache/impala/analysis/ComputeStatsStmt.java
index 5df3dfa..e442d66 100644
--- a/fe/src/main/java/org/apache/impala/analysis/ComputeStatsStmt.java
+++ b/fe/src/main/java/org/apache/impala/analysis/ComputeStatsStmt.java
@@ -743,10 +743,10 @@ public class ComputeStatsStmt extends StatementBase {
   public Set<Column> getValidatedColumnWhitelist() { return validatedColumnWhitelist_; }
 
   /**
-   * Returns true if this statement computes stats on Parquet partitions only,
+   * Returns true if this statement computes stats on Parquet/ORC partitions only,
    * false otherwise.
    */
-  public boolean isParquetOnly() {
+  public boolean isColumnar() {
     if (!(table_ instanceof HdfsTable)) return false;
     Collection<HdfsPartition> affectedPartitions = null;
     if (partitionSet_ != null) {
@@ -755,7 +755,9 @@ public class ComputeStatsStmt extends StatementBase {
       affectedPartitions = ((HdfsTable) table_).getPartitions();
     }
     for (HdfsPartition partition: affectedPartitions) {
-      if (partition.getFileFormat() != HdfsFileFormat.PARQUET) return false;
+      if (partition.getFileFormat() != HdfsFileFormat.PARQUET
+          && partition.getFileFormat() != HdfsFileFormat.ORC)
+        return false;
     }
     return true;
   }
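
For illustration only (not part of the patch): the renamed isColumnar() check feeds the
MT_DOP default applied in Frontend.java further down, so COMPUTE STATS on a table whose
affected partitions are all Parquet or ORC now defaults to MT_DOP=4 unless the user has
already set it. A sketch with a hypothetical table name:

-- MT_DOP defaults to 4 for this statement if every affected partition is Parquet or ORC
compute stats my_db.orc_tbl;

-- an explicit user setting still takes precedence
set mt_dop=2;
compute stats my_db.orc_tbl;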

http://git-wip-us.apache.org/repos/asf/impala/blob/23743baa/fe/src/main/java/org/apache/impala/catalog/HdfsFileFormat.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/catalog/HdfsFileFormat.java b/fe/src/main/java/org/apache/impala/catalog/HdfsFileFormat.java
index e4fce60..32cae72 100644
--- a/fe/src/main/java/org/apache/impala/catalog/HdfsFileFormat.java
+++ b/fe/src/main/java/org/apache/impala/catalog/HdfsFileFormat.java
@@ -62,6 +62,10 @@ public enum HdfsFileFormat {
       "org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat",
       "org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe",
       true, true),
+  ORC("org.apache.hadoop.hive.ql.io.orc.OrcInputFormat",
+      "org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat",
+      "org.apache.hadoop.hive.ql.io.orc.OrcSerde",
+      false, true),
   KUDU("org.apache.kudu.mapreduce.KuduTableInputFormat",
       "org.apache.kudu.mapreduce.KuduTableOutputFormat",
       "", false, false);
@@ -99,19 +103,23 @@ public enum HdfsFileFormat {
       "parquet.hive.MapredParquetInputFormat"
   };
 
-  private static final Map<String, HdfsFileFormat> VALID_INPUT_FORMATS =
-      ImmutableMap.<String, HdfsFileFormat>builder()
-          .put(RC_FILE.inputFormat(), RC_FILE)
-          .put(TEXT.inputFormat(), TEXT)
-          .put(LZO_TEXT.inputFormat(), TEXT)
-          .put(SEQUENCE_FILE.inputFormat(), SEQUENCE_FILE)
-          .put(AVRO.inputFormat(), AVRO)
-          .put(PARQUET.inputFormat(), PARQUET)
-          .put(PARQUET_LEGACY_INPUT_FORMATS[0], PARQUET)
-          .put(PARQUET_LEGACY_INPUT_FORMATS[1], PARQUET)
-          .put(PARQUET_LEGACY_INPUT_FORMATS[2], PARQUET)
-          .put(KUDU.inputFormat(), KUDU)
-          .build();
+  private static Map<String, HdfsFileFormat> VALID_INPUT_FORMATS;
+  public static void init(boolean enableOrcScanner) {
+    ImmutableMap.Builder<String, HdfsFileFormat> builder =
+        ImmutableMap.<String, HdfsFileFormat>builder()
+            .put(RC_FILE.inputFormat(), RC_FILE)
+            .put(TEXT.inputFormat(), TEXT)
+            .put(LZO_TEXT.inputFormat(), TEXT)
+            .put(SEQUENCE_FILE.inputFormat(), SEQUENCE_FILE)
+            .put(AVRO.inputFormat(), AVRO)
+            .put(PARQUET.inputFormat(), PARQUET)
+            .put(PARQUET_LEGACY_INPUT_FORMATS[0], PARQUET)
+            .put(PARQUET_LEGACY_INPUT_FORMATS[1], PARQUET)
+            .put(PARQUET_LEGACY_INPUT_FORMATS[2], PARQUET)
+            .put(KUDU.inputFormat(), KUDU);
+    if (enableOrcScanner) builder.put(ORC.inputFormat(), ORC);
+    VALID_INPUT_FORMATS = builder.build();
+  }
 
   /**
    * Returns true if the string describes an input format class that we support.
@@ -145,6 +153,7 @@ public enum HdfsFileFormat {
       case TEXT: return HdfsFileFormat.TEXT;
       case SEQUENCE_FILE: return HdfsFileFormat.SEQUENCE_FILE;
       case AVRO: return HdfsFileFormat.AVRO;
+      case ORC: return HdfsFileFormat.ORC;
       case PARQUET: return HdfsFileFormat.PARQUET;
       case KUDU: return HdfsFileFormat.KUDU;
       default:
@@ -159,6 +168,7 @@ public enum HdfsFileFormat {
       case TEXT: return THdfsFileFormat.TEXT;
       case SEQUENCE_FILE: return THdfsFileFormat.SEQUENCE_FILE;
       case AVRO: return THdfsFileFormat.AVRO;
+      case ORC: return THdfsFileFormat.ORC;
       case PARQUET: return THdfsFileFormat.PARQUET;
       case KUDU: return THdfsFileFormat.KUDU;
       default:
@@ -170,6 +180,7 @@ public enum HdfsFileFormat {
   public String toSql(HdfsCompression compressionType) {
     switch (this) {
       case RC_FILE: return "RCFILE";
+      case ORC: return "ORC";
       case TEXT:
         if (compressionType == HdfsCompression.LZO ||
             compressionType == HdfsCompression.LZO_INDEX) {
@@ -240,6 +251,7 @@ public enum HdfsFileFormat {
       case SEQUENCE_FILE:
       case AVRO:
       case PARQUET:
+      case ORC:
         return true;
       case KUDU:
         return false;

http://git-wip-us.apache.org/repos/asf/impala/blob/23743baa/fe/src/main/java/org/apache/impala/catalog/HdfsStorageDescriptor.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/catalog/HdfsStorageDescriptor.java b/fe/src/main/java/org/apache/impala/catalog/HdfsStorageDescriptor.java
index b4e2564..f51b10e 100644
--- a/fe/src/main/java/org/apache/impala/catalog/HdfsStorageDescriptor.java
+++ b/fe/src/main/java/org/apache/impala/catalog/HdfsStorageDescriptor.java
@@ -57,6 +57,7 @@ public class HdfsStorageDescriptor {
       "org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe", // (seq / text / parquet)
       "org.apache.hadoop.hive.serde2.avro.AvroSerDe", // (avro)
       "org.apache.hadoop.hive.serde2.columnar.ColumnarSerDe", // (rc)
+      "org.apache.hadoop.hive.ql.io.orc.OrcSerde", // (orc)
       "parquet.hive.serde.ParquetHiveSerDe", // (parquet - legacy)
       // TODO: Verify the following Parquet SerDe works with Impala and add
       // support for the new input/output format classes. See IMPALA-4214.

http://git-wip-us.apache.org/repos/asf/impala/blob/23743baa/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java b/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java
index 7735f98..ac67d7d 100644
--- a/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java
+++ b/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java
@@ -332,11 +332,12 @@ public class HdfsScanNode extends ScanNode {
     Set<HdfsFileFormat> fileFormats = computeScanRangeLocations(analyzer);
 
     // Determine backend scan node implementation to use. The optimized MT implementation
-    // is currently only supported for Parquet.
+    // is currently supported for Parquet, ORC and Text.
     if (analyzer.getQueryOptions().isSetMt_dop() &&
         analyzer.getQueryOptions().mt_dop > 0 &&
         fileFormats.size() == 1 &&
         (fileFormats.contains(HdfsFileFormat.PARQUET)
+          || fileFormats.contains(HdfsFileFormat.ORC)
           || fileFormats.contains(HdfsFileFormat.TEXT))) {
       useMtScanNode_ = true;
     } else {
@@ -1191,9 +1192,10 @@ public class HdfsScanNode extends ScanNode {
     Preconditions.checkNotNull(desc_.getTable() instanceof HdfsTable);
     HdfsTable table = (HdfsTable) desc_.getTable();
     int perHostScanRanges;
-    if (table.getMajorityFormat() == HdfsFileFormat.PARQUET) {
+    if (table.getMajorityFormat() == HdfsFileFormat.PARQUET
+        || table.getMajorityFormat() == HdfsFileFormat.ORC) {
       // For the purpose of this estimation, the number of per-host scan ranges for
-      // Parquet files are equal to the number of columns read from the file. I.e.
+      // Parquet/ORC files is equal to the number of columns read from the file. I.e.
       // excluding partition columns and columns that are populated from file metadata.
       perHostScanRanges = 0;
       for (SlotDescriptor slot: desc_.getSlots()) {

http://git-wip-us.apache.org/repos/asf/impala/blob/23743baa/fe/src/main/java/org/apache/impala/service/BackendConfig.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/service/BackendConfig.java b/fe/src/main/java/org/apache/impala/service/BackendConfig.java
index 3833094..a94f46e 100644
--- a/fe/src/main/java/org/apache/impala/service/BackendConfig.java
+++ b/fe/src/main/java/org/apache/impala/service/BackendConfig.java
@@ -23,6 +23,7 @@ import static org.apache.hadoop.fs.CommonConfigurationKeysPublic
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.security.authentication.util.KerberosName;
 import org.apache.impala.analysis.SqlScanner;
+import org.apache.impala.catalog.HdfsFileFormat;
 import org.apache.impala.thrift.TBackendGflags;
 
 import com.google.common.base.Preconditions;
@@ -45,6 +46,7 @@ public class BackendConfig {
     Preconditions.checkNotNull(cfg);
     INSTANCE = new BackendConfig(cfg);
     SqlScanner.init(cfg.getReserved_words_version());
+    HdfsFileFormat.init(cfg.isEnable_orc_scanner());
     initAuthToLocal();
   }
 

http://git-wip-us.apache.org/repos/asf/impala/blob/23743baa/fe/src/main/java/org/apache/impala/service/Frontend.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/service/Frontend.java b/fe/src/main/java/org/apache/impala/service/Frontend.java
index 556d6ac..7ed44a3 100644
--- a/fe/src/main/java/org/apache/impala/service/Frontend.java
+++ b/fe/src/main/java/org/apache/impala/service/Frontend.java
@@ -962,11 +962,11 @@ public class Frontend {
       if (thriftLineageGraph != null && thriftLineageGraph.isSetQuery_text()) {
         result.catalog_op_request.setLineage_graph(thriftLineageGraph);
       }
-      // Set MT_DOP=4 for COMPUTE STATS on Parquet tables, unless the user has already
+      // Set MT_DOP=4 for COMPUTE STATS on Parquet/ORC tables, unless the user has already
       // provided another value for MT_DOP.
       if (!queryOptions.isSetMt_dop() &&
           analysisResult.isComputeStatsStmt() &&
-          analysisResult.getComputeStatsStmt().isParquetOnly()) {
+          analysisResult.getComputeStatsStmt().isColumnar()) {
         queryOptions.setMt_dop(4);
       }
       // If unset, set MT_DOP to 0 to simplify the rest of the code.

http://git-wip-us.apache.org/repos/asf/impala/blob/23743baa/fe/src/main/jflex/sql-scanner.flex
----------------------------------------------------------------------
diff --git a/fe/src/main/jflex/sql-scanner.flex b/fe/src/main/jflex/sql-scanner.flex
index 2accebd..e037aca 100644
--- a/fe/src/main/jflex/sql-scanner.flex
+++ b/fe/src/main/jflex/sql-scanner.flex
@@ -176,6 +176,7 @@ import org.apache.impala.thrift.TReservedWordsVersion;
     keywordMap.put("on", SqlParserSymbols.KW_ON);
     keywordMap.put("||", SqlParserSymbols.KW_OR);
     keywordMap.put("or", SqlParserSymbols.KW_OR);
+    keywordMap.put("orc", SqlParserSymbols.KW_ORC);
     keywordMap.put("order", SqlParserSymbols.KW_ORDER);
     keywordMap.put("outer", SqlParserSymbols.KW_OUTER);
     keywordMap.put("over", SqlParserSymbols.KW_OVER);

http://git-wip-us.apache.org/repos/asf/impala/blob/23743baa/testdata/LineItemMultiBlock/README.dox
----------------------------------------------------------------------
diff --git a/testdata/LineItemMultiBlock/README.dox b/testdata/LineItemMultiBlock/README.dox
index 7608067..1d6db46 100755
--- a/testdata/LineItemMultiBlock/README.dox
+++ b/testdata/LineItemMultiBlock/README.dox
@@ -1,6 +1,7 @@
 This file was created for:
 IMPALA-1881: Maximize data locality when scanning Parquet files with multiple row groups.
 IMPALA-2466: Add more tests to the HDFS parquet scanner.
+IMPALA-5717: Add tests for HDFS orc scanner.
 
 The table lineitem_multiblock is a single parquet file with:
  - A row group size of approximately 12 KB each.
@@ -31,3 +32,21 @@ blocks.
 
 'lineitem_multiblock_one_row_group' was created similarly but with a much higher
 'parquet.block.size' so that everything fit in one row group.
+
+----
+
+The ORC files are created by the following Hive queries:
+
+use functional_orc_def;
+
+set orc.stripe.size=1024;
+set orc.compress=ZLIB;
+create table lineitem_threeblocks like tpch.lineitem stored as orc;
+create table lineitem_sixblocks like tpch.lineitem stored as orc;
+insert overwrite table lineitem_threeblocks select * from tpch.lineitem limit 16000;
+insert overwrite table lineitem_sixblocks select * from tpch.lineitem limit 30000;
+
+set orc.stripe.size=67108864;
+create table lineitem_orc_multiblock_one_stripe like tpch.lineitem stored as orc;
+insert overwrite table lineitem_orc_multiblock_one_stripe select * from
+tpch.lineitem limit 16000;
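
As a rough sanity check on the data load (not part of the patch), the row counts of the
generated tables should match the LIMIT clauses above:

select count(*) from lineitem_threeblocks;  -- expect 16000
select count(*) from lineitem_sixblocks;    -- expect 30000
select count(*) from lineitem_orc_multiblock_one_stripe;  -- expect 16000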

http://git-wip-us.apache.org/repos/asf/impala/blob/23743baa/testdata/LineItemMultiBlock/lineitem_orc_multiblock_one_stripe.orc
----------------------------------------------------------------------
diff --git a/testdata/LineItemMultiBlock/lineitem_orc_multiblock_one_stripe.orc b/testdata/LineItemMultiBlock/lineitem_orc_multiblock_one_stripe.orc
new file mode 100644
index 0000000..7dbbffb
Binary files /dev/null and b/testdata/LineItemMultiBlock/lineitem_orc_multiblock_one_stripe.orc differ

http://git-wip-us.apache.org/repos/asf/impala/blob/23743baa/testdata/LineItemMultiBlock/lineitem_sixblocks.orc
----------------------------------------------------------------------
diff --git a/testdata/LineItemMultiBlock/lineitem_sixblocks.orc b/testdata/LineItemMultiBlock/lineitem_sixblocks.orc
new file mode 100644
index 0000000..5fa6cfa
Binary files /dev/null and b/testdata/LineItemMultiBlock/lineitem_sixblocks.orc differ

http://git-wip-us.apache.org/repos/asf/impala/blob/23743baa/testdata/LineItemMultiBlock/lineitem_threeblocks.orc
----------------------------------------------------------------------
diff --git a/testdata/LineItemMultiBlock/lineitem_threeblocks.orc b/testdata/LineItemMultiBlock/lineitem_threeblocks.orc
new file mode 100644
index 0000000..9b12540
Binary files /dev/null and b/testdata/LineItemMultiBlock/lineitem_threeblocks.orc differ

http://git-wip-us.apache.org/repos/asf/impala/blob/23743baa/testdata/bin/create-load-data.sh
----------------------------------------------------------------------
diff --git a/testdata/bin/create-load-data.sh b/testdata/bin/create-load-data.sh
index 311029d..e50515b 100755
--- a/testdata/bin/create-load-data.sh
+++ b/testdata/bin/create-load-data.sh
@@ -154,6 +154,9 @@ function load-custom-schemas {
   hadoop fs -mkdir -p /test-warehouse/chars_formats_parquet/
   hadoop fs -put -f ${IMPALA_HOME}/testdata/data/chars-formats.parquet \
     /test-warehouse/chars_formats_parquet
+  hadoop fs -mkdir -p /test-warehouse/chars_formats_orc_def/
+  hadoop fs -put -f ${IMPALA_HOME}/testdata/data/chars-formats.orc \
+    /test-warehouse/chars_formats_orc_def
   hadoop fs -mkdir -p /test-warehouse/chars_formats_text/
   hadoop fs -put -f ${IMPALA_HOME}/testdata/data/chars-formats.txt \
     /test-warehouse/chars_formats_text

http://git-wip-us.apache.org/repos/asf/impala/blob/23743baa/testdata/bin/generate-schema-statements.py
----------------------------------------------------------------------
diff --git a/testdata/bin/generate-schema-statements.py b/testdata/bin/generate-schema-statements.py
index 34c2084..3f730e6 100755
--- a/testdata/bin/generate-schema-statements.py
+++ b/testdata/bin/generate-schema-statements.py
@@ -128,6 +128,7 @@ FILE_FORMAT_MAP = {
   'text': 'TEXTFILE',
   'seq': 'SEQUENCEFILE',
   'rc': 'RCFILE',
+  'orc': 'ORC',
   'parquet': 'PARQUET',
   'text_lzo':
     "\nINPUTFORMAT 'com.hadoop.mapred.DeprecatedLzoTextInputFormat'" +
@@ -219,7 +220,7 @@ def build_table_template(file_format, columns, partition_columns, row_format,
     else:
       tblproperties["avro.schema.url"] = "hdfs://%s/%s/%s/{table_name}.json" \
         % (options.hdfs_namenode, options.hive_warehouse_dir, avro_schema_dir)
-  elif file_format in 'parquet':
+  elif file_format in ['parquet', 'orc']:  # columnar formats don't need row format
     row_format_stmt = str()
   elif file_format == 'kudu':
     # Use partitioned_by to set a trivial hash distribution
@@ -243,7 +244,7 @@ def build_table_template(file_format, columns, partition_columns, row_format,
     for table_property in table_properties.split("\n"):
       format_prop = table_property.split(":")
       if format_prop[0] == file_format:
-        key_val = format_prop[1].split("=");
+        key_val = format_prop[1].split("=")
         tblproperties[key_val[0]] = key_val[1]
 
   all_tblproperties = []
@@ -658,7 +659,7 @@ def generate_statements(output_name, test_vectors, sections,
             # that weren't already added to the table. So, for force reload, manually
             # delete the partition directories.
             output.create.append(("DFS -rm -R {data_path};").format(
-              data_path=data_path));
+              data_path=data_path))
           else:
             # If this is not a force reload use msck repair to add the partitions
             # into the table.

http://git-wip-us.apache.org/repos/asf/impala/blob/23743baa/testdata/bin/run-hive-server.sh
----------------------------------------------------------------------
diff --git a/testdata/bin/run-hive-server.sh b/testdata/bin/run-hive-server.sh
index 42d95b5..0d69c7a 100755
--- a/testdata/bin/run-hive-server.sh
+++ b/testdata/bin/run-hive-server.sh
@@ -71,8 +71,9 @@ ${CLUSTER_BIN}/wait-for-metastore.py --transport=${METASTORE_TRANSPORT}
 
 if [ ${ONLY_METASTORE} -eq 0 ]; then
   # Starts a HiveServer2 instance on the port specified by the HIVE_SERVER2_THRIFT_PORT
-  # environment variable.
-  HADOOP_HEAPSIZE="512" hive --service hiveserver2 > ${LOGDIR}/hive-server2.out 2>&1 &
+  # environment variable. HADOOP_HEAPSIZE should be set to at least 2048 to avoid OOM
+  # when loading ORC tables like widerow.
+  HADOOP_HEAPSIZE="2048" hive --service hiveserver2 > ${LOGDIR}/hive-server2.out 2>&1 &
 
   # Wait for the HiveServer2 service to come up because callers of this script
   # may rely on it being available.

http://git-wip-us.apache.org/repos/asf/impala/blob/23743baa/testdata/cluster/node_templates/common/etc/hadoop/conf/hdfs-site.xml.tmpl
----------------------------------------------------------------------
diff --git a/testdata/cluster/node_templates/common/etc/hadoop/conf/hdfs-site.xml.tmpl b/testdata/cluster/node_templates/common/etc/hadoop/conf/hdfs-site.xml.tmpl
index f72dd97..c9ee70b 100644
--- a/testdata/cluster/node_templates/common/etc/hadoop/conf/hdfs-site.xml.tmpl
+++ b/testdata/cluster/node_templates/common/etc/hadoop/conf/hdfs-site.xml.tmpl
@@ -82,6 +82,12 @@
     <value>134217728</value>
   </property>
 
+  <!-- Decrease this so we can create mini test files across several blocks -->
+  <property>
+    <name>dfs.namenode.fs-limits.min-block-size</name>
+    <value>1024</value>
+  </property>
+
   <!-- Set the max cached memory to ~64kb. This must be less than ulimit -l -->
   <property>
     <name>dfs.datanode.max.locked.memory</name>

http://git-wip-us.apache.org/repos/asf/impala/blob/23743baa/testdata/data/chars-formats.orc
----------------------------------------------------------------------
diff --git a/testdata/data/chars-formats.orc b/testdata/data/chars-formats.orc
new file mode 100644
index 0000000..625c2c8
Binary files /dev/null and b/testdata/data/chars-formats.orc differ

http://git-wip-us.apache.org/repos/asf/impala/blob/23743baa/testdata/datasets/functional/functional_schema_template.sql
----------------------------------------------------------------------
diff --git a/testdata/datasets/functional/functional_schema_template.sql b/testdata/datasets/functional/functional_schema_template.sql
index cede525..a7a5eac 100644
--- a/testdata/datasets/functional/functional_schema_template.sql
+++ b/testdata/datasets/functional/functional_schema_template.sql
@@ -739,6 +739,7 @@ INSERT OVERWRITE TABLE {db_name}{db_suffix}.{table_name} PARTITION(p=1) SELECT i
 INSERT OVERWRITE TABLE {db_name}{db_suffix}.{table_name} PARTITION(p=2) SELECT id, named_struct("f1",string_col,"f2",int_col), array(1, 2, 3), map("k", cast(0 as bigint)) FROM functional.alltypestiny;
 INSERT OVERWRITE TABLE {db_name}{db_suffix}.{table_name} PARTITION(p=3) SELECT id, named_struct("f1",string_col,"f2",int_col), array(1, 2, 3), map("k", cast(0 as bigint)) FROM functional.alltypestiny;
 INSERT OVERWRITE TABLE {db_name}{db_suffix}.{table_name} PARTITION(p=4) SELECT id, named_struct("f1",string_col,"f2",int_col), array(1, 2, 3), map("k", cast(0 as bigint)) FROM functional.alltypestiny;
+INSERT OVERWRITE TABLE {db_name}{db_suffix}.{table_name} PARTITION(p=5) SELECT id, named_struct("f1",string_col,"f2",int_col), array(1, 2, 3), map("k", cast(0 as bigint)) FROM functional.alltypestiny;
 -- The order of insertions and alterations is deliberately chose to work around a Hive
 -- bug where the format of an altered partition is reverted back to the original format after
 -- an insert. So we first do the insert, and then alter the format.
@@ -746,6 +747,7 @@ USE {db_name}{db_suffix};
 ALTER TABLE {table_name} PARTITION (p=2) SET FILEFORMAT PARQUET;
 ALTER TABLE {table_name} PARTITION (p=3) SET FILEFORMAT AVRO;
 ALTER TABLE {table_name} PARTITION (p=4) SET FILEFORMAT RCFILE;
+ALTER TABLE {table_name} PARTITION (p=5) SET FILEFORMAT ORC;
 USE default;
 ====
 ---- DATASET

http://git-wip-us.apache.org/repos/asf/impala/blob/23743baa/testdata/datasets/functional/schema_constraints.csv
----------------------------------------------------------------------
diff --git a/testdata/datasets/functional/schema_constraints.csv b/testdata/datasets/functional/schema_constraints.csv
index ef65b9a..baf0306 100644
--- a/testdata/datasets/functional/schema_constraints.csv
+++ b/testdata/datasets/functional/schema_constraints.csv
@@ -66,6 +66,7 @@ table_name:complextypes_fileformat, constraint:restrict_to, table_format:parquet
 table_name:complextypes_fileformat, constraint:restrict_to, table_format:avro/snap/block
 table_name:complextypes_fileformat, constraint:restrict_to, table_format:rc/snap/block
 table_name:complextypes_fileformat, constraint:restrict_to, table_format:seq/snap/block
+table_name:complextypes_fileformat, constraint:restrict_to, table_format:orc/def/block
 table_name:complextypes_multifileformat, constraint:restrict_to, table_format:text/none/none
 
 # TODO: Avro
@@ -134,6 +135,8 @@ table_name:decimal_tbl, constraint:restrict_to, table_format:parquet/none/none
 table_name:decimal_tiny, constraint:restrict_to, table_format:parquet/none/none
 table_name:decimal_tbl, constraint:restrict_to, table_format:kudu/none/none
 table_name:decimal_tiny, constraint:restrict_to, table_format:kudu/none/none
+table_name:decimal_tbl, constraint:restrict_to, table_format:orc/def/block
+table_name:decimal_tiny, constraint:restrict_to, table_format:orc/def/block
 
 table_name:avro_decimal_tbl, constraint:restrict_to, table_format:avro/snap/block
 

http://git-wip-us.apache.org/repos/asf/impala/blob/23743baa/testdata/workloads/functional-planner/queries/PlannerTest/complex-types-file-formats.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/complex-types-file-formats.test b/testdata/workloads/functional-planner/queries/PlannerTest/complex-types-file-formats.test
index 9c68c65..1e61b7d 100644
--- a/testdata/workloads/functional-planner/queries/PlannerTest/complex-types-file-formats.test
+++ b/testdata/workloads/functional-planner/queries/PlannerTest/complex-types-file-formats.test
@@ -15,6 +15,38 @@ PLAN-ROOT SINK
    partitions=1/1 files=1 size=227B
    predicates: !empty(t.a)
 ====
+# Complex types are not supported on ORC.
+select 1 from functional_orc_def.complextypes_fileformat t, t.a
+---- PLAN
+not implemented: Scan of table 't' in format 'ORC' is not supported because the table has a column 's' with a complex type 'STRUCT<f1:STRING,f2:INT>'.
+Complex types are supported for these file formats: PARQUET.
+====
+select s.f1 from functional_orc_def.complextypes_fileformat t, t.m
+---- PLAN
+not implemented: Scan of table 't' in format 'ORC' is not supported because the table has a column 's' with a complex type 'STRUCT<f1:STRING,f2:INT>'.
+Complex types are supported for these file formats: PARQUET.
+====
+# Complex types are not supported on ORC; however, queries materializing
+# only scalar type columns are allowed.
+select id from functional_orc_def.complextypes_fileformat
+---- PLAN
+PLAN-ROOT SINK
+|
+00:SCAN HDFS [functional_orc_def.complextypes_fileformat]
+   partitions=1/1 files=1 size=624B
+====
+# Complex types are not supported on ORC but count(*) and similar
+# queries should work.
+select count(*) from functional_orc_def.complextypes_fileformat
+---- PLAN
+PLAN-ROOT SINK
+|
+01:AGGREGATE [FINALIZE]
+|  output: count(*)
+|
+00:SCAN HDFS [functional_orc_def.complextypes_fileformat]
+   partitions=1/1 files=1 size=624B
+====
 # Complex types are not supported on Avro.
 select s.f1 from functional_avro_snap.complextypes_fileformat t, t.a
 ---- PLAN
@@ -111,11 +143,12 @@ select complex_struct_col.f1 from functional_hbase.allcomplextypes
 not implemented: Scan of table 'functional_hbase.allcomplextypes.complex_struct_col.f1' is not supported because 'functional_hbase.allcomplextypes' references a nested field/collection.
 Complex types are supported for these file formats: PARQUET.
 ====
-# The complextypes_multifileformat has three partitions with different file formats:
+# The complextypes_multifileformat has five partitions with different file formats:
 # p=1 text
 # p=2 parquet
 # p=3 avro
 # p=4 rc
+# p=5 orc
 # Scanning a text partition of a multi-format table with complex types fails.
 select 1 from functional.complextypes_multifileformat where p = 1
 ---- PLAN
@@ -136,7 +169,7 @@ PLAN-ROOT SINK
 |  03:UNNEST [t.a]
 |
 00:SCAN HDFS [functional.complextypes_multifileformat t]
-   partitions=1/4 files=1 size=128B
+   partitions=1/5 files=1 size=128B
    predicates: !empty(t.a)
 ====
 # Scanning an Avro partition of a multi-format table with complex types fails.
@@ -161,5 +194,23 @@ PLAN-ROOT SINK
 |  output: count(*)
 |
 00:SCAN HDFS [functional.complextypes_multifileformat]
-   partitions=1/4 files=1 size=128B
+   partitions=1/5 files=1 size=128B
+====
+# Scanning an ORC file partition of a multi-format table with complex types fails.
+select id from functional.complextypes_multifileformat t, t.a where p = 5
+---- PLAN
+not implemented: Scan of partition 'p=5' in format 'ORC' of table 't' is not supported because the table has a column 's' with a complex type 'STRUCT<f1:STRING,f2:INT>'.
+Complex types are supported for these file formats: PARQUET.
+====
+# Complex types are not supported on ORC files but count(*) and similar
+# queries should work.
+select count(*) from functional.complextypes_multifileformat where p = 5
+---- PLAN
+PLAN-ROOT SINK
+|
+01:AGGREGATE [FINALIZE]
+|  output: count(*)
+|
+00:SCAN HDFS [functional.complextypes_multifileformat]
+   partitions=1/5 files=1 size=128B
 ====

http://git-wip-us.apache.org/repos/asf/impala/blob/23743baa/testdata/workloads/functional-query/functional-query_core.csv
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-query/functional-query_core.csv b/testdata/workloads/functional-query/functional-query_core.csv
index dffca78..7118e3f 100644
--- a/testdata/workloads/functional-query/functional-query_core.csv
+++ b/testdata/workloads/functional-query/functional-query_core.csv
@@ -2,6 +2,7 @@
 file_format:text, dataset:functional, compression_codec:none, compression_type:none
 file_format:seq, dataset:functional, compression_codec:snap, compression_type:block
 file_format:rc, dataset: functional, compression_codec: snap, compression_type: block
+file_format:orc, dataset: functional, compression_codec: def, compression_type: block
 file_format:parquet, dataset: functional, compression_codec: none, compression_type: none
 file_format:avro, dataset: functional, compression_codec: snap, compression_type: block
 file_format:hbase, dataset:functional, compression_codec:none, compression_type:none

http://git-wip-us.apache.org/repos/asf/impala/blob/23743baa/testdata/workloads/functional-query/functional-query_dimensions.csv
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-query/functional-query_dimensions.csv b/testdata/workloads/functional-query/functional-query_dimensions.csv
index 539122b..bcb4406 100644
--- a/testdata/workloads/functional-query/functional-query_dimensions.csv
+++ b/testdata/workloads/functional-query/functional-query_dimensions.csv
@@ -1,4 +1,4 @@
-file_format: text,seq,rc,avro,parquet,hbase,kudu
+file_format: text,seq,rc,avro,parquet,orc,hbase,kudu
 dataset: functional
 compression_codec: none,def,gzip,bzip,snap,lzo
 compression_type: none,block,record

http://git-wip-us.apache.org/repos/asf/impala/blob/23743baa/testdata/workloads/functional-query/functional-query_exhaustive.csv
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-query/functional-query_exhaustive.csv b/testdata/workloads/functional-query/functional-query_exhaustive.csv
index 18331c6..a06ab52 100644
--- a/testdata/workloads/functional-query/functional-query_exhaustive.csv
+++ b/testdata/workloads/functional-query/functional-query_exhaustive.csv
@@ -22,5 +22,6 @@ file_format: avro, dataset: functional, compression_codec: none, compression_typ
 file_format: avro, dataset: functional, compression_codec: def, compression_type: block
 file_format: avro, dataset: functional, compression_codec: snap, compression_type: block
 file_format: parquet, dataset: functional, compression_codec: none, compression_type: none
+file_format: orc, dataset: functional, compression_codec: def, compression_type: block
 file_format: hbase, dataset: functional, compression_codec: none, compression_type: none
 file_format: kudu, dataset: functional, compression_codec: none, compression_type: none

http://git-wip-us.apache.org/repos/asf/impala/blob/23743baa/testdata/workloads/functional-query/functional-query_pairwise.csv
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-query/functional-query_pairwise.csv b/testdata/workloads/functional-query/functional-query_pairwise.csv
index 0a4ee09..e046a09 100644
--- a/testdata/workloads/functional-query/functional-query_pairwise.csv
+++ b/testdata/workloads/functional-query/functional-query_pairwise.csv
@@ -4,5 +4,6 @@ file_format: seq, dataset: functional, compression_codec: def, compression_type:
 file_format: rc, dataset: functional, compression_codec: gzip, compression_type: block
 file_format: avro, dataset: functional, compression_codec: snap, compression_type: block
 file_format: parquet, dataset: functional, compression_codec: none, compression_type: none
+file_format: orc, dataset: functional, compression_codec: def, compression_type: block
 file_format: hbase, dataset: functional, compression_codec: none, compression_type: none
 file_format: kudu, dataset: functional, compression_codec: none, compression_type: none

http://git-wip-us.apache.org/repos/asf/impala/blob/23743baa/testdata/workloads/functional-query/queries/DataErrorsTest/orc-type-checks.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-query/queries/DataErrorsTest/orc-type-checks.test b/testdata/workloads/functional-query/queries/DataErrorsTest/orc-type-checks.test
new file mode 100644
index 0000000..ee06258
--- /dev/null
+++ b/testdata/workloads/functional-query/queries/DataErrorsTest/orc-type-checks.test
@@ -0,0 +1,127 @@
+====
+---- QUERY
+select c1 from illtypes
+---- CATCH
+Type mismatch: table column BOOLEAN is map to column int in ORC file
+====
+---- QUERY
+select c2 from illtypes
+---- CATCH
+Type mismatch: table column FLOAT is map to column boolean in ORC file
+====
+---- QUERY
+select c3 from illtypes
+---- CATCH
+Type mismatch: table column BOOLEAN is map to column tinyint in ORC file
+====
+---- QUERY
+select c4 from illtypes
+---- CATCH
+Type mismatch: table column TINYINT is map to column smallint in ORC file
+====
+---- QUERY
+select c5 from illtypes
+---- CATCH
+Type mismatch: table column SMALLINT is map to column int in ORC file
+====
+---- QUERY
+select c6 from illtypes
+---- CATCH
+Type mismatch: table column INT is map to column bigint in ORC file
+====
+---- QUERY
+select c7 from illtypes
+---- CATCH
+Type mismatch: table column BOOLEAN is map to column float in ORC file
+====
+---- QUERY
+select c8 from illtypes
+---- CATCH
+Type mismatch: table column STRING is map to column double in ORC file
+====
+---- QUERY
+select c9 from illtypes
+---- CATCH
+Type mismatch: table column INT is map to column string in ORC file
+====
+---- QUERY
+select c10 from illtypes
+---- CATCH
+Type mismatch: table column FLOAT is map to column string in ORC file
+====
+---- QUERY
+select c11 from illtypes
+---- CATCH
+Type mismatch: table column BIGINT is map to column timestamp in ORC file
+====
+---- QUERY
+select * from safetypes order by c1
+---- TYPES
+bigint,boolean,smallint,int,bigint,bigint,double,double,char,string,timestamp,int,int
+---- RESULTS
+0,true,0,0,0,0,0,0,'01/','0',2009-01-01 00:00:00,2009,1
+1,false,1,1,1,10,1.100000023841858,10.1,'01/','1',2009-01-01 00:01:00,2009,1
+2,true,0,0,0,0,0,0,'02/','0',2009-02-01 00:00:00,2009,2
+3,false,1,1,1,10,1.100000023841858,10.1,'02/','1',2009-02-01 00:01:00,2009,2
+4,true,0,0,0,0,0,0,'03/','0',2009-03-01 00:00:00,2009,3
+5,false,1,1,1,10,1.100000023841858,10.1,'03/','1',2009-03-01 00:01:00,2009,3
+6,true,0,0,0,0,0,0,'04/','0',2009-04-01 00:00:00,2009,4
+7,false,1,1,1,10,1.100000023841858,10.1,'04/','1',2009-04-01 00:01:00,2009,4
+====
+---- QUERY
+select d1 from mismatch_decimals
+---- TYPES
+decimal
+---- RESULTS
+1234
+2345
+12345
+12345
+132842
+====
+---- QUERY
+select d2 from mismatch_decimals
+---- TYPES
+decimal
+---- RESULTS
+---- CATCH
+It can't be truncated to table column DECIMAL(8,0) for column decimal(10,0) in ORC file
+====
+---- QUERY
+select d3 from mismatch_decimals
+---- TYPES
+decimal
+---- RESULTS
+1.2345678900
+12.3456789000
+123.4567890000
+1234.5678900000
+12345.6789000000
+====
+---- QUERY
+select d4 from mismatch_decimals
+---- TYPES
+decimal
+---- RESULTS
+---- CATCH
+Type mismatch: table column DECIMAL(20,20) is map to column decimal(38,38) in ORC file
+====
+---- QUERY
+select d5 from mismatch_decimals
+---- TYPES
+decimal
+---- RESULTS
+---- CATCH
+Type mismatch: table column DECIMAL(2,0) is map to column decimal(10,5) in ORC file
+====
+---- QUERY
+select d6 from mismatch_decimals
+---- TYPES
+decimal
+---- RESULTS
+1
+1
+1
+1
+1
+====

http://git-wip-us.apache.org/repos/asf/impala/blob/23743baa/testdata/workloads/tpcds/tpcds_core.csv
----------------------------------------------------------------------
diff --git a/testdata/workloads/tpcds/tpcds_core.csv b/testdata/workloads/tpcds/tpcds_core.csv
index 94b4b22..48cc97d 100644
--- a/testdata/workloads/tpcds/tpcds_core.csv
+++ b/testdata/workloads/tpcds/tpcds_core.csv
@@ -2,3 +2,4 @@
 file_format: text, dataset: tpcds, compression_codec: none, compression_type: none
 file_format: seq, dataset: tpcds, compression_codec: snap, compression_type: block
 file_format: parquet, dataset: tpcds, compression_codec: none, compression_type: none
+file_format: orc, dataset: tpcds, compression_codec: def, compression_type: block

http://git-wip-us.apache.org/repos/asf/impala/blob/23743baa/testdata/workloads/tpcds/tpcds_dimensions.csv
----------------------------------------------------------------------
diff --git a/testdata/workloads/tpcds/tpcds_dimensions.csv b/testdata/workloads/tpcds/tpcds_dimensions.csv
index 8137b7a..bae5d90 100644
--- a/testdata/workloads/tpcds/tpcds_dimensions.csv
+++ b/testdata/workloads/tpcds/tpcds_dimensions.csv
@@ -1,4 +1,4 @@
-file_format: text,seq,rc,avro,parquet
+file_format: text,seq,rc,avro,parquet,orc
 dataset: tpcds
 compression_codec: none,def,gzip,bzip,snap,lzo
 compression_type: none,block,record

http://git-wip-us.apache.org/repos/asf/impala/blob/23743baa/testdata/workloads/tpcds/tpcds_exhaustive.csv
----------------------------------------------------------------------
diff --git a/testdata/workloads/tpcds/tpcds_exhaustive.csv b/testdata/workloads/tpcds/tpcds_exhaustive.csv
index c4b4f99..57fcddd 100644
--- a/testdata/workloads/tpcds/tpcds_exhaustive.csv
+++ b/testdata/workloads/tpcds/tpcds_exhaustive.csv
@@ -21,3 +21,6 @@ file_format: avro, dataset: tpcds, compression_codec: snap, compression_type: bl
 file_format: parquet, dataset: tpcds, compression_codec: none, compression_type: none
 file_format: parquet, dataset: tpcds, compression_codec: def, compression_type: block
 file_format: parquet, dataset: tpcds, compression_codec: snap, compression_type: block
+file_format: orc, dataset: tpcds, compression_codec: none, compression_type: none
+file_format: orc, dataset: tpcds, compression_codec: def, compression_type: block
+file_format: orc, dataset: tpcds, compression_codec: snap, compression_type: block

http://git-wip-us.apache.org/repos/asf/impala/blob/23743baa/testdata/workloads/tpcds/tpcds_pairwise.csv
----------------------------------------------------------------------
diff --git a/testdata/workloads/tpcds/tpcds_pairwise.csv b/testdata/workloads/tpcds/tpcds_pairwise.csv
index e643495..61ee66c 100644
--- a/testdata/workloads/tpcds/tpcds_pairwise.csv
+++ b/testdata/workloads/tpcds/tpcds_pairwise.csv
@@ -13,3 +13,6 @@ file_format: rc, dataset: tpcds, compression_codec: def, compression_type: block
 file_format: avro, dataset: tpcds, compression_codec: none, compression_type: none
 file_format: parquet, dataset: tpcds, compression_codec: none, compression_type: none
 file_format: rc, dataset: tpcds, compression_codec: none, compression_type: none
+file_format: orc, dataset: tpcds, compression_codec: none, compression_type: none
+file_format: orc, dataset: tpcds, compression_codec: def, compression_type: block
+file_format: orc, dataset: tpcds, compression_codec: snap, compression_type: block

http://git-wip-us.apache.org/repos/asf/impala/blob/23743baa/testdata/workloads/tpch/tpch_core.csv
----------------------------------------------------------------------
diff --git a/testdata/workloads/tpch/tpch_core.csv b/testdata/workloads/tpch/tpch_core.csv
index 86804ac..024063c 100644
--- a/testdata/workloads/tpch/tpch_core.csv
+++ b/testdata/workloads/tpch/tpch_core.csv
@@ -7,4 +7,5 @@ file_format:rc, dataset:tpch, compression_codec:none, compression_type:none
 file_format:avro, dataset:tpch, compression_codec: none, compression_type: none
 file_format:avro, dataset:tpch, compression_codec: snap, compression_type: block
 file_format:parquet, dataset:tpch, compression_codec: none, compression_type: none
+file_format:orc, dataset:tpch, compression_codec: def, compression_type: block
 file_format:kudu, dataset:tpch, compression_codec: none, compression_type: none

http://git-wip-us.apache.org/repos/asf/impala/blob/23743baa/testdata/workloads/tpch/tpch_dimensions.csv
----------------------------------------------------------------------
diff --git a/testdata/workloads/tpch/tpch_dimensions.csv b/testdata/workloads/tpch/tpch_dimensions.csv
index 1de34aa..f1ce5f0 100644
--- a/testdata/workloads/tpch/tpch_dimensions.csv
+++ b/testdata/workloads/tpch/tpch_dimensions.csv
@@ -1,4 +1,4 @@
-file_format: text,seq,rc,avro,parquet,kudu
+file_format: text,seq,rc,avro,parquet,orc,kudu
 dataset: tpch
 compression_codec: none,def,gzip,bzip,snap,lzo
 compression_type: none,block,record

http://git-wip-us.apache.org/repos/asf/impala/blob/23743baa/testdata/workloads/tpch/tpch_exhaustive.csv
----------------------------------------------------------------------
diff --git a/testdata/workloads/tpch/tpch_exhaustive.csv b/testdata/workloads/tpch/tpch_exhaustive.csv
index 32085bf..3513dc5 100644
--- a/testdata/workloads/tpch/tpch_exhaustive.csv
+++ b/testdata/workloads/tpch/tpch_exhaustive.csv
@@ -22,4 +22,7 @@ file_format: avro, dataset: tpch, compression_codec: snap, compression_type: blo
 file_format: parquet, dataset: tpch, compression_codec: none, compression_type: none
 file_format: parquet, dataset: tpch, compression_codec: def, compression_type: block
 file_format: parquet, dataset: tpch, compression_codec: snap, compression_type: block
+file_format: orc, dataset: tpch, compression_codec: none, compression_type: none
+file_format: orc, dataset: tpch, compression_codec: def, compression_type: block
+file_format: orc, dataset: tpch, compression_codec: snap, compression_type: block
 file_format: kudu, dataset:tpch, compression_codec: none, compression_type: none

http://git-wip-us.apache.org/repos/asf/impala/blob/23743baa/testdata/workloads/tpch/tpch_pairwise.csv
----------------------------------------------------------------------
diff --git a/testdata/workloads/tpch/tpch_pairwise.csv b/testdata/workloads/tpch/tpch_pairwise.csv
index 0744cf5..2eb4176 100644
--- a/testdata/workloads/tpch/tpch_pairwise.csv
+++ b/testdata/workloads/tpch/tpch_pairwise.csv
@@ -13,4 +13,7 @@ file_format: rc, dataset: tpch, compression_codec: def, compression_type: block
 file_format: avro, dataset: tpch, compression_codec: none, compression_type: none
 file_format: parquet, dataset: tpch, compression_codec: none, compression_type: none
 file_format: rc, dataset: tpch, compression_codec: none, compression_type: none
+file_format: orc, dataset: tpch, compression_codec: none, compression_type: none
+file_format: orc, dataset: tpch, compression_codec: def, compression_type: block
+file_format: orc, dataset: tpch, compression_codec: snap, compression_type: block
 file_format: kudu, dataset:tpch, compression_codec: none, compression_type: none

http://git-wip-us.apache.org/repos/asf/impala/blob/23743baa/tests/common/impala_test_suite.py
----------------------------------------------------------------------
diff --git a/tests/common/impala_test_suite.py b/tests/common/impala_test_suite.py
index 28d9c2e..9dcb0ca 100644
--- a/tests/common/impala_test_suite.py
+++ b/tests/common/impala_test_suite.py
@@ -503,7 +503,7 @@ class ImpalaTestSuite(BaseTestSuite):
 
     Database names are dependent on the input format for table, which the table names
     remaining the same. A use database is issued before query execution. As such,
-    dabase names need to be build pre execution, this method wraps around the different
+    database names need to be built pre-execution; this method wraps around the different
     execute methods and provides a common interface to issue the proper use command.
     """
     @wraps(function)

http://git-wip-us.apache.org/repos/asf/impala/blob/23743baa/tests/common/test_dimensions.py
----------------------------------------------------------------------
diff --git a/tests/common/test_dimensions.py b/tests/common/test_dimensions.py
index 4171e1f..df3f8c2 100644
--- a/tests/common/test_dimensions.py
+++ b/tests/common/test_dimensions.py
@@ -28,7 +28,7 @@ WORKLOAD_DIR = os.environ['IMPALA_WORKLOAD_DIR']
 # of what specific table format to target along with the exec options (num_nodes, etc)
 # to use when running the query.
 class TableFormatInfo(object):
-  KNOWN_FILE_FORMATS = ['text', 'seq', 'rc', 'parquet', 'avro', 'hbase']
+  KNOWN_FILE_FORMATS = ['text', 'seq', 'rc', 'parquet', 'orc', 'avro', 'hbase']
   if os.environ['KUDU_IS_SUPPORTED'] == 'true':
     KNOWN_FILE_FORMATS.append('kudu')
   KNOWN_COMPRESSION_CODECS = ['none', 'snap', 'gzip', 'bzip', 'def', 'lzo']

http://git-wip-us.apache.org/repos/asf/impala/blob/23743baa/tests/common/test_vector.py
----------------------------------------------------------------------
diff --git a/tests/common/test_vector.py b/tests/common/test_vector.py
index 4d22269..0c9cca4 100644
--- a/tests/common/test_vector.py
+++ b/tests/common/test_vector.py
@@ -52,7 +52,7 @@
 # otherwise. For example, if we want to make sure 'bool' columns are not used with 'sum':
 #
 # ImpalaTestMatrix.add_constraint(lambda v:\
-#    not (v.get_value('col_type') == 'bool and v.get_value('agg_func') == 'sum'))
+#    not (v.get_value('col_type') == 'bool' and v.get_value('agg_func') == 'sum'))
 #
 # Additional examples of usage can be found within the test suites.
 

http://git-wip-us.apache.org/repos/asf/impala/blob/23743baa/tests/comparison/cli_options.py
----------------------------------------------------------------------
diff --git a/tests/comparison/cli_options.py b/tests/comparison/cli_options.py
index 885ef84..1d737cf 100644
--- a/tests/comparison/cli_options.py
+++ b/tests/comparison/cli_options.py
@@ -221,7 +221,7 @@ def create_cluster(args):
 
 
 def add_storage_format_options(parser):
-  storage_formats = ['avro', 'parquet', 'rcfile', 'sequencefile', 'textfile']
+  storage_formats = ['avro', 'parquet', 'orc', 'rcfile', 'sequencefile', 'textfile']
   parser.add_argument(
       '--storage-file-formats', default=','.join(storage_formats),
       help='A comma separated list of storage formats to use.')

http://git-wip-us.apache.org/repos/asf/impala/blob/23743baa/tests/query_test/test_chars.py
----------------------------------------------------------------------
diff --git a/tests/query_test/test_chars.py b/tests/query_test/test_chars.py
index b182b91..4444410 100644
--- a/tests/query_test/test_chars.py
+++ b/tests/query_test/test_chars.py
@@ -57,6 +57,11 @@ class TestCharFormats(ImpalaTestSuite):
         STORED AS PARQUET
         LOCATION "{0}"'''.format(get_fs_path("/test-warehouse/chars_formats_parquet")))
     self.client.execute('''create external table if not exists
+        functional_orc_def.chars_formats
+        (cs CHAR(5), cl CHAR(140), vc VARCHAR(32))
+        STORED AS ORC
+        LOCATION "{0}"'''.format(get_fs_path("/test-warehouse/chars_formats_orc_def")))
+    self.client.execute('''create external table if not exists
         functional.chars_formats
         (cs CHAR(5), cl CHAR(140), vc VARCHAR(32))
         ROW FORMAT delimited fields terminated by ','  escaped by '\\\\'
@@ -84,6 +89,7 @@ class TestCharFormats(ImpalaTestSuite):
         (v.get_value('table_format').file_format in ['avro'] and
         v.get_value('table_format').compression_codec in ['snap']) or
         v.get_value('table_format').file_format in ['parquet'] or
+        v.get_value('table_format').file_format in ['orc'] or
         (v.get_value('table_format').file_format in ['text'] and
         v.get_value('table_format').compression_codec in ['none']))
 

http://git-wip-us.apache.org/repos/asf/impala/blob/23743baa/tests/query_test/test_decimal_queries.py
----------------------------------------------------------------------
diff --git a/tests/query_test/test_decimal_queries.py b/tests/query_test/test_decimal_queries.py
index 3a14ed3..45a702d 100644
--- a/tests/query_test/test_decimal_queries.py
+++ b/tests/query_test/test_decimal_queries.py
@@ -43,7 +43,7 @@ class TestDecimalQueries(ImpalaTestSuite):
     cls.ImpalaTestMatrix.add_constraint(lambda v:\
         (v.get_value('table_format').file_format == 'text' and
          v.get_value('table_format').compression_codec == 'none') or
-         v.get_value('table_format').file_format in ['parquet', 'kudu'])
+         v.get_value('table_format').file_format in ['parquet', 'orc', 'kudu'])
 
   def test_queries(self, vector):
     self.run_test_case('QueryTest/decimal', vector)

http://git-wip-us.apache.org/repos/asf/impala/blob/23743baa/tests/query_test/test_scanners.py
----------------------------------------------------------------------
diff --git a/tests/query_test/test_scanners.py b/tests/query_test/test_scanners.py
index 9c893ab..f17561d 100644
--- a/tests/query_test/test_scanners.py
+++ b/tests/query_test/test_scanners.py
@@ -105,7 +105,7 @@ class TestScannersAllTableFormatsWithLimit(ImpalaTestSuite):
     query_template = "select * from alltypes limit %s"
     for i in range(1, iterations):
       # Vary the limit to vary the timing of cancellation
-      limit = (iterations * 100) % 1000 + 1
+      limit = (i * 100) % 1001 + 1
       query = query_template % limit
       result = self.execute_query(query, vector.get_value('exec_option'),
           table_format=vector.get_value('table_format'))
@@ -837,7 +837,7 @@ class TestTextScanRangeLengths(ImpalaTestSuite):
 @SkipIfLocal.hive
 class TestScanTruncatedFiles(ImpalaTestSuite):
   @classmethod
-  def get_workload(self):
+  def get_workload(cls):
     return 'functional-query'
 
   @classmethod
@@ -900,3 +900,101 @@ class TestUncompressedText(ImpalaTestSuite):
     check_call(['hdfs', 'dfs', '-copyFromLocal', os.environ['IMPALA_HOME'] +
           "/testdata/data/lazy_timestamp.csv", tbl_loc])
     self.run_test_case('QueryTest/select-lazy-timestamp', vector, unique_database)
+
+class TestOrc(ImpalaTestSuite):
+  @classmethod
+  def get_workload(cls):
+    return 'functional-query'
+
+  @classmethod
+  def add_test_dimensions(cls):
+    super(TestOrc, cls).add_test_dimensions()
+    cls.ImpalaTestMatrix.add_constraint(
+      lambda v: v.get_value('table_format').file_format == 'orc')
+
+  def test_misaligned_orc_stripes(self, vector, unique_database):
+    self._build_lineitem_table_helper(unique_database, 'lineitem_threeblocks',
+        'lineitem_threeblocks.orc')
+    self._build_lineitem_table_helper(unique_database, 'lineitem_sixblocks',
+        'lineitem_sixblocks.orc')
+    self._build_lineitem_table_helper(unique_database,
+        'lineitem_orc_multiblock_one_stripe',
+        'lineitem_orc_multiblock_one_stripe.orc')
+
+    # functional_orc_def.alltypes is well-formatted. 'NumScannersWithNoReads' counters are
+    # set to 0.
+    table_name = 'functional_orc_def.alltypes'
+    self._misaligned_orc_stripes_helper(table_name, 7300)
+    # lineitem_threeblocks.orc is ill-formatted but every scanner reads some stripes.
+    # 'NumScannersWithNoReads' counters are set to 0.
+    table_name = unique_database + '.lineitem_threeblocks'
+    self._misaligned_orc_stripes_helper(table_name, 16000)
+    # lineitem_sixblocks.orc is ill-formatted but every scanner reads some stripes.
+    # 'NumScannersWithNoReads' counters are set to 0.
+    table_name = unique_database + '.lineitem_sixblocks'
+    self._misaligned_orc_stripes_helper(table_name, 30000)
+    # Scanning lineitem_orc_multiblock_one_stripe.orc finds two scan ranges that end up
+    # doing no reads because the file is poorly formatted.
+    table_name = unique_database + '.lineitem_orc_multiblock_one_stripe'
+    self._misaligned_orc_stripes_helper(
+      table_name, 16000, num_scanners_with_no_reads=2)
+
+  def _build_lineitem_table_helper(self, db, tbl, file):
+    self.client.execute("create table %s.%s like tpch.lineitem stored as orc" % (db, tbl))
+    tbl_loc = get_fs_path("/test-warehouse/%s.db/%s" % (db, tbl))
+    # set block size to 156672 so lineitem_threeblocks.orc occupies 3 blocks,
+    # lineitem_sixblocks.orc occupies 6 blocks.
+    check_call(['hdfs', 'dfs', '-Ddfs.block.size=156672', '-copyFromLocal',
+        os.environ['IMPALA_HOME'] + "/testdata/LineItemMultiBlock/" + file, tbl_loc])
+
+  def _misaligned_orc_stripes_helper(
+          self, table_name, rows_in_table, num_scanners_with_no_reads=0):
+    """Checks if 'num_scanners_with_no_reads' indicates the expected number of scanners
+    that don't read anything because the underlying file is poorly formatted
+    """
+    query = 'select * from %s' % table_name
+    result = self.client.execute(query)
+    assert len(result.data) == rows_in_table
+
+    runtime_profile = str(result.runtime_profile)
+    num_scanners_with_no_reads_list = re.findall(
+      'NumScannersWithNoReads: ([0-9]*)', runtime_profile)
+
+    # This will fail if the number of impalads != 3
+    # The fourth fragment is the "Averaged Fragment"
+    assert len(num_scanners_with_no_reads_list) == 4
+
+    # Calculate the total number of scan ranges that ended up not reading anything because
+    # an underlying file was poorly formatted.
+    # Skip the Averaged Fragment; it comes first in the runtime profile.
+    total = 0
+    for n in num_scanners_with_no_reads_list[1:]:
+      total += int(n)
+    assert total == num_scanners_with_no_reads
+
+  def test_type_conversions(self, vector, unique_database):
+    # Create an "illtypes" table whose columns can't match the underlying ORC file's.
+    # Create a "safetypes" table like the above, but one that the ORC columns fit into.
+    # Reuse the data files of functional_orc_def.alltypestiny.
+    tbl_loc = get_fs_path("/test-warehouse/alltypestiny_orc_def")
+    self.client.execute("""create external table %s.illtypes (c1 boolean, c2 float,
+        c3 boolean, c4 tinyint, c5 smallint, c6 int, c7 boolean, c8 string, c9 int,
+        c10 float, c11 bigint) partitioned by (year int, month int) stored as ORC
+        location '%s';""" % (unique_database, tbl_loc))
+    self.client.execute("""create external table %s.safetypes (c1 bigint, c2 boolean,
+        c3 smallint, c4 int, c5 bigint, c6 bigint, c7 double, c8 double, c9 char(3),
+        c10 varchar(3), c11 timestamp) partitioned by (year int, month int) stored as ORC
+        location '%s';""" % (unique_database, tbl_loc))
+    self.client.execute("alter table %s.illtypes recover partitions" % unique_database)
+    self.client.execute("alter table %s.safetypes recover partitions" % unique_database)
+
+    # Create a decimal table whose precisions don't match the underlying ORC files.
+    # Reuse the data files of functional_orc_def.decimal_tbl.
+    decimal_loc = get_fs_path("/test-warehouse/decimal_tbl_orc_def")
+    self.client.execute("""create external table %s.mismatch_decimals (d1 decimal(8,0),
+        d2 decimal(8,0), d3 decimal(19,10), d4 decimal(20,20), d5 decimal(2,0))
+        partitioned by (d6 decimal(9,0)) stored as orc location '%s'"""
+        % (unique_database, decimal_loc))
+    self.client.execute("alter table %s.mismatch_decimals recover partitions" % unique_database)
+
+    self.run_test_case('DataErrorsTest/orc-type-checks', vector, unique_database)

http://git-wip-us.apache.org/repos/asf/impala/blob/23743baa/tests/query_test/test_scanners_fuzz.py
----------------------------------------------------------------------
diff --git a/tests/query_test/test_scanners_fuzz.py b/tests/query_test/test_scanners_fuzz.py
index c336a17..791c343 100644
--- a/tests/query_test/test_scanners_fuzz.py
+++ b/tests/query_test/test_scanners_fuzz.py
@@ -61,6 +61,8 @@ class TestScannersFuzzing(ImpalaTestSuite):
           'num_nodes' : cls.NUM_NODES_VALUES,
           'mem_limit' : cls.MEM_LIMITS}))
     # TODO: enable for more table formats once they consistently pass the fuzz test.
+    # TODO(IMPALA-6772): enable for the ORC format once a new version of the ORC
+    # library (later than release-1.4.3) is released.
     cls.ImpalaTestMatrix.add_constraint(lambda v:
         v.get_value('table_format').file_format in ('avro', 'parquet') or
         (v.get_value('table_format').file_format == 'text' and

http://git-wip-us.apache.org/repos/asf/impala/blob/23743baa/tests/query_test/test_tpch_queries.py
----------------------------------------------------------------------
diff --git a/tests/query_test/test_tpch_queries.py b/tests/query_test/test_tpch_queries.py
index ece8347..68a2984 100644
--- a/tests/query_test/test_tpch_queries.py
+++ b/tests/query_test/test_tpch_queries.py
@@ -36,7 +36,7 @@ class TestTpchQuery(ImpalaTestSuite):
     # TODO: the planner tests are based on text and need this.
     if cls.exploration_strategy() == 'core':
       cls.ImpalaTestMatrix.add_constraint(lambda v:\
-          v.get_value('table_format').file_format in ['text', 'parquet', 'kudu'])
+          v.get_value('table_format').file_format in ['text', 'parquet', 'kudu', 'orc'])
 
   def idfn(val):
     return "TPC-H: Q{0}".format(val)


[11/11] impala git commit: IMPALA-6837: allow multiple networks for distcc server

Posted by bh...@apache.org.
IMPALA-6837: allow multiple networks for distcc server

Testing:
Tested the script on Ubuntu 14.04 and CentOS 6 servers to confirm
that it resulted in a working configuration.

Change-Id: I6d49786b4f6e9c3bfab244e63620a62b2ae4dd64
Reviewed-on: http://gerrit.cloudera.org:8080/10029
Reviewed-by: Philip Zeyliger <ph...@cloudera.com>
Tested-by: Tim Armstrong <ta...@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/e94dfe47
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/e94dfe47
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/e94dfe47

Branch: refs/heads/2.x
Commit: e94dfe477d1e23ed754d27130a46a46023f46f60
Parents: 22750d4
Author: Tim Armstrong <ta...@cloudera.com>
Authored: Wed Apr 11 17:30:39 2018 -0700
Committer: Impala Public Jenkins <im...@gerrit.cloudera.org>
Committed: Fri Apr 13 03:26:26 2018 +0000

----------------------------------------------------------------------
 bin/distcc/distcc_server_setup.sh | 10 +++++++++-
 1 file changed, 9 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/impala/blob/e94dfe47/bin/distcc/distcc_server_setup.sh
----------------------------------------------------------------------
diff --git a/bin/distcc/distcc_server_setup.sh b/bin/distcc/distcc_server_setup.sh
index 893a82d..8b5e6a9 100755
--- a/bin/distcc/distcc_server_setup.sh
+++ b/bin/distcc/distcc_server_setup.sh
@@ -30,6 +30,10 @@
 #
 #   sudo ./bin/distcc/distcc_server_setup.sh 172.16.0.0/12
 #
+# Multiple networks can be allowed by providing a space-separated list:
+#
+#   sudo ./bin/distcc/distcc_server_setup.sh "172.16.0.0/12 10.16.0.0/8"
+#
 # Environment overrides:
 # ---------------------
 # CCACHE_DIR: directory to use for distccd's ccache.
@@ -97,9 +101,13 @@ export CCACHE_DIR=${CCACHE_DIR}
 # END: Settings automatically generated by distcc_server_setup.sh
 EOF
 else
+  ALLOWED_NETS_ARGS=
+  for allowed_net in $ALLOWED_NETS; do
+    ALLOWED_NETS_ARGS+=" --allow ${allowed_net}"
+  done
   cat << EOF >> /etc/sysconfig/distccd
 # BEGIN: Settings automatically generated by distcc_server_setup.sh
-OPTIONS="--jobs $(($(nproc) * 2)) --allow ${ALLOWED_NETS} --log-level=warn --nice=-15"
+OPTIONS="--jobs $(($(nproc) * 2)) ${ALLOWED_NETS_ARGS} --log-level=warn --nice=-15"
 # CCACHE_DIR is picked up by ccache from environment. CentOS 6 requires CCACHE_DIR to
 # be exported while CentOS 7 seems to ignore the "export VAR=val" syntax.
 CCACHE_DIR=${CCACHE_DIR}


[10/11] impala git commit: Bump Kudu version to a954418

Posted by bh...@apache.org.
Bump Kudu version to a954418

Change-Id: Ib06c0fb3c24a8cee1dd4f34a221cf41a711a5359
Reviewed-on: http://gerrit.cloudera.org:8080/9982
Reviewed-by: Alex Behm <al...@cloudera.com>
Tested-by: Impala Public Jenkins <im...@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/e3bdc3e2
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/e3bdc3e2
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/e3bdc3e2

Branch: refs/heads/2.x
Commit: e3bdc3e247ab3a25a01c276eafefe5e895f97fb6
Parents: 371107a
Author: Thomas Tauber-Marshall <tm...@cloudera.com>
Authored: Tue Apr 10 23:11:27 2018 +0000
Committer: Impala Public Jenkins <im...@gerrit.cloudera.org>
Committed: Fri Apr 13 03:26:26 2018 +0000

----------------------------------------------------------------------
 bin/impala-config.sh | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/impala/blob/e3bdc3e2/bin/impala-config.sh
----------------------------------------------------------------------
diff --git a/bin/impala-config.sh b/bin/impala-config.sh
index f113e53..6dff3d7 100755
--- a/bin/impala-config.sh
+++ b/bin/impala-config.sh
@@ -79,7 +79,7 @@ fi
 # moving to a different build of the toolchain, e.g. when a version is bumped or a
 # compile option is changed. The build id can be found in the output of the toolchain
 # build jobs, it is constructed from the build number and toolchain git hash prefix.
-export IMPALA_TOOLCHAIN_BUILD_ID=88-8e37339d45
+export IMPALA_TOOLCHAIN_BUILD_ID=102-02a8e245df
 # Versions of toolchain dependencies.
 # -----------------------------------
 export IMPALA_AVRO_VERSION=1.7.4-p4
@@ -167,7 +167,7 @@ if [[ $OSTYPE == "darwin"* ]]; then
 fi
 
 # Kudu version in the toolchain; provides libkudu_client.so and minicluster binaries.
-export IMPALA_KUDU_VERSION=0eef8e0
+export IMPALA_KUDU_VERSION=a954418
 unset IMPALA_KUDU_URL
 
 # Kudu version used to identify Java client jar from maven


[02/11] impala git commit: IMPALA-5717: Support for reading ORC data files

Posted by bh...@apache.org.
IMPALA-5717: Support for reading ORC data files

This patch integrates the ORC library into Impala and implements
HdfsOrcScanner as a middle layer between them. The HdfsOrcScanner
supplies the input needed by the ORC reader, tracks the memory
consumption of the reader, and transfers the reader's output
(orc::ColumnVectorBatch) into impala::RowBatch. The ORC library version
used is release-1.4.3.
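
For orientation, below is a minimal sketch of the ORC C++ read loop that
the scanner wraps. It is an illustrative assumption, not Impala code: it
reads a hypothetical local file via orc::readLocalFile() with the stock
release-1.4.3 API, whereas HdfsOrcScanner plugs in its own
orc::InputStream and orc::MemoryPool implementations (see the diff of
hdfs-orc-scanner.cc below).

  // Sketch only: stock ORC 1.4.3 API against a hypothetical local file.
  #include <iostream>
  #include <memory>
  #include <orc/OrcFile.hh>

  int main() {
    orc::ReaderOptions reader_opts;
    std::unique_ptr<orc::Reader> reader =
        orc::createReader(orc::readLocalFile("/tmp/example.orc"), reader_opts);
    orc::RowReaderOptions row_opts;
    std::unique_ptr<orc::RowReader> row_reader = reader->createRowReader(row_opts);
    // The row reader fills an orc::ColumnVectorBatch; HdfsOrcScanner copies such
    // column vectors into impala::RowBatch tuples instead of just counting rows.
    std::unique_ptr<orc::ColumnVectorBatch> batch = row_reader->createRowBatch(1024);
    uint64_t total_rows = 0;
    while (row_reader->next(*batch)) total_rows += batch->numElements;
    std::cout << "rows read: " << total_rows << std::endl;
    return 0;
  }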

A startup option --enable_orc_scanner is added for this feature. It is
set to true by default; setting it to false causes queries on ORC
tables to fail.
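
As a hedged illustration only (this is not the actual Impala wiring,
which, per the diffstat below, also threads the flag through
backend-gflag-util.cc to the frontend), a gflags switch defined with
DEFINE_bool is typically consulted like this from another translation
unit:

  #include <gflags/gflags.h>

  DECLARE_bool(enable_orc_scanner);  // defined in hdfs-orc-scanner.cc via DEFINE_bool

  // Hypothetical helper: callers would fail fast instead of building an ORC scanner.
  bool OrcScansAllowed() { return FLAGS_enable_orc_scanner; }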

Currently, only reading primitive types is supported. Writing to ORC
tables is not supported yet.

Tests
 - Most of the end-to-end tests can run on ORC format.
 - Add tpcds, tpch tests for ORC.
 - Add some ORC-specific tests.
 - test_scanners_fuzz hasn't been enabled for ORC yet, since the ORC
   library is not robust against corrupt files (ORC-315).

Change-Id: Ia7b6ae4ce3b9ee8125b21993702faa87537790a4
Reviewed-on: http://gerrit.cloudera.org:8080/9134
Reviewed-by: Quanlong Huang <hu...@gmail.com>
Reviewed-by: Tim Armstrong <ta...@cloudera.com>
Tested-by: Impala Public Jenkins <im...@cloudera.com>
Reviewed-on: http://gerrit.cloudera.org:8080/9988


Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/23743baa
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/23743baa
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/23743baa

Branch: refs/heads/2.x
Commit: 23743baa87863037fd21fffbaf031990aef131a7
Parents: 83422a3
Author: stiga-huang <hu...@gmail.com>
Authored: Thu Jan 25 06:39:25 2018 -0800
Committer: Joe McDonnell <jo...@cloudera.com>
Committed: Fri Apr 13 03:25:23 2018 +0000

----------------------------------------------------------------------
 CMakeLists.txt                                  |   5 +
 be/CMakeLists.txt                               |   2 +
 be/src/codegen/gen_ir_descriptions.py           |   4 +-
 be/src/exec/CMakeLists.txt                      |   1 +
 be/src/exec/hdfs-orc-scanner.cc                 | 763 +++++++++++++++++++
 be/src/exec/hdfs-orc-scanner.h                  | 224 ++++++
 be/src/exec/hdfs-parquet-scanner-ir.cc          |  14 -
 be/src/exec/hdfs-parquet-scanner.cc             | 185 +----
 be/src/exec/hdfs-parquet-scanner.h              |  65 --
 be/src/exec/hdfs-scan-node-base.cc              |   6 +
 be/src/exec/hdfs-scan-node-mt.cc                |   1 +
 be/src/exec/hdfs-scanner-ir.cc                  |  14 +
 be/src/exec/hdfs-scanner.cc                     | 179 +++++
 be/src/exec/hdfs-scanner.h                      |  76 ++
 be/src/util/backend-gflag-util.cc               |   2 +
 bin/bootstrap_toolchain.py                      |   2 +-
 bin/impala-config.sh                            |   2 +
 cmake_modules/FindOrc.cmake                     |  55 ++
 common/thrift/BackendGflags.thrift              |   2 +
 common/thrift/CatalogObjects.thrift             |   6 +-
 fe/src/main/cup/sql-parser.cup                  |   8 +-
 .../impala/analysis/ComputeStatsStmt.java       |   8 +-
 .../apache/impala/catalog/HdfsFileFormat.java   |  38 +-
 .../impala/catalog/HdfsStorageDescriptor.java   |   1 +
 .../org/apache/impala/planner/HdfsScanNode.java |   8 +-
 .../apache/impala/service/BackendConfig.java    |   2 +
 .../org/apache/impala/service/Frontend.java     |   4 +-
 fe/src/main/jflex/sql-scanner.flex              |   1 +
 testdata/LineItemMultiBlock/README.dox          |  19 +
 .../lineitem_orc_multiblock_one_stripe.orc      | Bin 0 -> 424277 bytes
 .../LineItemMultiBlock/lineitem_sixblocks.orc   | Bin 0 -> 863581 bytes
 .../LineItemMultiBlock/lineitem_threeblocks.orc | Bin 0 -> 465929 bytes
 testdata/bin/create-load-data.sh                |   3 +
 testdata/bin/generate-schema-statements.py      |   7 +-
 testdata/bin/run-hive-server.sh                 |   5 +-
 .../common/etc/hadoop/conf/hdfs-site.xml.tmpl   |   6 +
 testdata/data/chars-formats.orc                 | Bin 0 -> 1411 bytes
 .../functional/functional_schema_template.sql   |   2 +
 .../datasets/functional/schema_constraints.csv  |   3 +
 .../PlannerTest/complex-types-file-formats.test |  57 +-
 .../functional-query/functional-query_core.csv  |   1 +
 .../functional-query_dimensions.csv             |   2 +-
 .../functional-query_exhaustive.csv             |   1 +
 .../functional-query_pairwise.csv               |   1 +
 .../queries/DataErrorsTest/orc-type-checks.test | 127 +++
 testdata/workloads/tpcds/tpcds_core.csv         |   1 +
 testdata/workloads/tpcds/tpcds_dimensions.csv   |   2 +-
 testdata/workloads/tpcds/tpcds_exhaustive.csv   |   3 +
 testdata/workloads/tpcds/tpcds_pairwise.csv     |   3 +
 testdata/workloads/tpch/tpch_core.csv           |   1 +
 testdata/workloads/tpch/tpch_dimensions.csv     |   2 +-
 testdata/workloads/tpch/tpch_exhaustive.csv     |   3 +
 testdata/workloads/tpch/tpch_pairwise.csv       |   3 +
 tests/common/impala_test_suite.py               |   2 +-
 tests/common/test_dimensions.py                 |   2 +-
 tests/common/test_vector.py                     |   2 +-
 tests/comparison/cli_options.py                 |   2 +-
 tests/query_test/test_chars.py                  |   6 +
 tests/query_test/test_decimal_queries.py        |   2 +-
 tests/query_test/test_scanners.py               | 102 ++-
 tests/query_test/test_scanners_fuzz.py          |   2 +
 tests/query_test/test_tpch_queries.py           |   2 +-
 62 files changed, 1745 insertions(+), 307 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/impala/blob/23743baa/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/CMakeLists.txt b/CMakeLists.txt
index 612e00c..43cd258 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -76,6 +76,7 @@ endfunction()
 # set_dep_root(PACKAGE) ->
 #   PACKAGE_ROOT set to $ENV{IMPALA_TOOLCHAIN}/PACKAGE-$ENV{IMPALA_PACKAGE_VERSION}
 set_dep_root(AVRO)
+set_dep_root(ORC)
 set_dep_root(BOOST)
 set_dep_root(BREAKPAD)
 set_dep_root(BZIP2)
@@ -272,6 +273,10 @@ message(STATUS "RapidJson include dir: " ${RAPIDJSON_INCLUDE_DIR})
 find_package(Avro REQUIRED)
 IMPALA_ADD_THIRDPARTY_LIB(avro ${AVRO_INCLUDE_DIR} ${AVRO_STATIC_LIB} "")
 
+# find ORC headers and libs
+find_package(Orc REQUIRED)
+IMPALA_ADD_THIRDPARTY_LIB(orc ${ORC_INCLUDE_DIR} ${ORC_STATIC_LIB} "")
+
 # find protobuf headers, libs and compiler
 find_package(Protobuf REQUIRED)
 IMPALA_ADD_THIRDPARTY_LIB(protobuf ${PROTOBUF_INCLUDE_DIR} ${PROTOBUF_STATIC_LIBRARY}

http://git-wip-us.apache.org/repos/asf/impala/blob/23743baa/be/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/be/CMakeLists.txt b/be/CMakeLists.txt
index b6e10b0..8e4f8bd 100644
--- a/be/CMakeLists.txt
+++ b/be/CMakeLists.txt
@@ -295,6 +295,7 @@ set(CLANG_INCLUDE_FLAGS
   "-I${GFLAGS_INCLUDE_DIR}"
   "-I${RAPIDJSON_INCLUDE_DIR}"
   "-I${AVRO_INCLUDE_DIR}"
+  "-I${ORC_INCLUDE_DIR}"
   # Include Boost as a system directory to suppress warnings from headers.
   "-isystem${BOOST_INCLUDEDIR}"
   # Required so that jni.h can be found during Clang compilation
@@ -447,6 +448,7 @@ set (IMPALA_DEPENDENCIES
   zlib
   bzip2
   avro
+  orc
   java_jvm
   kudu_client)
 

http://git-wip-us.apache.org/repos/asf/impala/blob/23743baa/be/src/codegen/gen_ir_descriptions.py
----------------------------------------------------------------------
diff --git a/be/src/codegen/gen_ir_descriptions.py b/be/src/codegen/gen_ir_descriptions.py
index 1d0f38e..26a8ad7 100755
--- a/be/src/codegen/gen_ir_descriptions.py
+++ b/be/src/codegen/gen_ir_descriptions.py
@@ -177,8 +177,8 @@ ir_functions = [
    "_ZN6impala11HdfsScanner18WriteAlignedTuplesEPNS_7MemPoolEPNS_8TupleRowEPNS_13FieldLocationEiiiib"],
   ["PROCESS_SCRATCH_BATCH",
    "_ZN6impala18HdfsParquetScanner19ProcessScratchBatchEPNS_8RowBatchE"],
-  ["PARQUET_SCANNER_EVAL_RUNTIME_FILTER",
-   "_ZN6impala18HdfsParquetScanner17EvalRuntimeFilterEiPNS_8TupleRowE"],
+  ["HDFS_SCANNER_EVAL_RUNTIME_FILTER",
+   "_ZN6impala11HdfsScanner17EvalRuntimeFilterEiPNS_8TupleRowE"],
   ["STRING_TO_BOOL", "IrStringToBool"],
   ["STRING_TO_INT8", "IrStringToInt8"],
   ["STRING_TO_INT16", "IrStringToInt16"],

http://git-wip-us.apache.org/repos/asf/impala/blob/23743baa/be/src/exec/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/be/src/exec/CMakeLists.txt b/be/src/exec/CMakeLists.txt
index ddd84ee..7224df8 100644
--- a/be/src/exec/CMakeLists.txt
+++ b/be/src/exec/CMakeLists.txt
@@ -59,6 +59,7 @@ add_library(Exec
   hdfs-parquet-scanner.cc
   hdfs-parquet-scanner-ir.cc
   hdfs-parquet-table-writer.cc
+  hdfs-orc-scanner.cc
   hbase-scan-node.cc
   hbase-table-scanner.cc
   incr-stats-util.cc

http://git-wip-us.apache.org/repos/asf/impala/blob/23743baa/be/src/exec/hdfs-orc-scanner.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-orc-scanner.cc b/be/src/exec/hdfs-orc-scanner.cc
new file mode 100644
index 0000000..3660600
--- /dev/null
+++ b/be/src/exec/hdfs-orc-scanner.cc
@@ -0,0 +1,763 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "exec/hdfs-orc-scanner.h"
+
+#include <queue>
+
+#include "exec/scanner-context.inline.h"
+#include "exprs/expr.h"
+#include "runtime/runtime-filter.inline.h"
+#include "runtime/tuple-row.h"
+#include "util/decompress.h"
+
+#include "common/names.h"
+
+using namespace impala;
+using namespace impala::io;
+
+DEFINE_bool(enable_orc_scanner, true,
+    "If false, reading from ORC format tables is not supported");
+
+Status HdfsOrcScanner::IssueInitialRanges(HdfsScanNodeBase* scan_node,
+    const vector<HdfsFileDesc*>& files) {
+  for (HdfsFileDesc* file : files) {
+    // If the file size is less than 10 bytes, it is an invalid ORC file.
+    if (file->file_length < 10) {
+      return Status(Substitute("ORC file $0 has an invalid file length: $1",
+          file->filename, file->file_length));
+    }
+  }
+  return IssueFooterRanges(scan_node, THdfsFileFormat::ORC, files);
+}
+
+namespace impala {
+
+HdfsOrcScanner::OrcMemPool::OrcMemPool(HdfsOrcScanner* scanner)
+    : scanner_(scanner), mem_tracker_(scanner_->scan_node_->mem_tracker()) {
+}
+
+HdfsOrcScanner::OrcMemPool::~OrcMemPool() {
+  FreeAll();
+}
+
+void HdfsOrcScanner::OrcMemPool::FreeAll() {
+  int64_t total_bytes_released = 0;
+  for (auto it = chunk_sizes_.begin(); it != chunk_sizes_.end(); ++it) {
+    std::free(it->first);
+    total_bytes_released += it->second;
+  }
+  mem_tracker_->Release(total_bytes_released);
+  chunk_sizes_.clear();
+  ImpaladMetrics::MEM_POOL_TOTAL_BYTES->Increment(-total_bytes_released);
+}
+
+// The ORC reader does not check the result of malloc. We throw an exception if the
+// allocation fails, in order to stop the ORC reader.
+char* HdfsOrcScanner::OrcMemPool::malloc(uint64_t size) {
+  if (!mem_tracker_->TryConsume(size)) {
+    throw ResourceError(mem_tracker_->MemLimitExceeded(
+        scanner_->state_, "Failed to allocate memory required by ORC library", size));
+  }
+  char* addr = static_cast<char*>(std::malloc(size));
+  if (addr == nullptr) {
+    mem_tracker_->Release(size);
+    throw ResourceError(Status(TErrorCode::MEM_ALLOC_FAILED, size));
+  }
+  chunk_sizes_[addr] = size;
+  ImpaladMetrics::MEM_POOL_TOTAL_BYTES->Increment(size);
+  return addr;
+}
+
+void HdfsOrcScanner::OrcMemPool::free(char* p) {
+  DCHECK(chunk_sizes_.find(p) != chunk_sizes_.end()) << "invalid free!" << endl
+       << GetStackTrace();
+  std::free(p);
+  int64_t size = chunk_sizes_[p];
+  mem_tracker_->Release(size);
+  ImpaladMetrics::MEM_POOL_TOTAL_BYTES->Increment(-size);
+  chunk_sizes_.erase(p);
+}
+
+// TODO: improve this to use async IO (IMPALA-6636).
+void HdfsOrcScanner::ScanRangeInputStream::read(void* buf, uint64_t length,
+    uint64_t offset) {
+  const ScanRange* metadata_range = scanner_->metadata_range_;
+  const ScanRange* split_range =
+      reinterpret_cast<ScanRangeMetadata*>(metadata_range->meta_data())->original_split;
+  int64_t partition_id = scanner_->context_->partition_descriptor()->id();
+
+  // Set expected_local to false to avoid caching stale data (IMPALA-6830).
+  bool expected_local = false;
+  ScanRange* range = scanner_->scan_node_->AllocateScanRange(
+      metadata_range->fs(), scanner_->filename(), length, offset, partition_id,
+      split_range->disk_id(), expected_local,
+      BufferOpts::ReadInto(reinterpret_cast<uint8_t*>(buf), length));
+
+  unique_ptr<BufferDescriptor> io_buffer;
+  Status status;
+  {
+    SCOPED_TIMER(scanner_->state_->total_storage_wait_timer());
+    status = scanner_->state_->io_mgr()->Read(
+        scanner_->scan_node_->reader_context(), range, &io_buffer);
+  }
+  if (io_buffer != nullptr) scanner_->state_->io_mgr()->ReturnBuffer(move(io_buffer));
+  if (!status.ok()) throw ResourceError(status);
+}
+
+HdfsOrcScanner::HdfsOrcScanner(HdfsScanNodeBase* scan_node, RuntimeState* state)
+  : HdfsScanner(scan_node, state),
+    assemble_rows_timer_(scan_node_->materialize_tuple_timer()) {
+  assemble_rows_timer_.Stop();
+}
+
+HdfsOrcScanner::~HdfsOrcScanner() {
+}
+
+Status HdfsOrcScanner::Open(ScannerContext* context) {
+  RETURN_IF_ERROR(HdfsScanner::Open(context));
+  metadata_range_ = stream_->scan_range();
+  num_cols_counter_ =
+      ADD_COUNTER(scan_node_->runtime_profile(), "NumOrcColumns", TUnit::UNIT);
+  num_stripes_counter_ =
+      ADD_COUNTER(scan_node_->runtime_profile(), "NumOrcStripes", TUnit::UNIT);
+  num_scanners_with_no_reads_counter_ =
+      ADD_COUNTER(scan_node_->runtime_profile(), "NumScannersWithNoReads", TUnit::UNIT);
+  process_footer_timer_stats_ =
+      ADD_SUMMARY_STATS_TIMER(scan_node_->runtime_profile(), "OrcFooterProcessingTime");
+  scan_node_->IncNumScannersCodegenDisabled();
+
+  DCHECK(parse_status_.ok()) << "Invalid parse_status_" << parse_status_.GetDetail();
+  for (const FilterContext& ctx : context->filter_ctxs()) {
+    DCHECK(ctx.filter != nullptr);
+    filter_ctxs_.push_back(&ctx);
+  }
+  filter_stats_.resize(filter_ctxs_.size());
+  reader_mem_pool_.reset(new OrcMemPool(this));
+  reader_options_.setMemoryPool(*reader_mem_pool_);
+
+  // Each scan node can process multiple splits. Each split processes the footer once.
+  // We use a timer to measure the time taken to ProcessFileTail() per split and add
+  // this time to the averaged timer.
+  MonotonicStopWatch single_footer_process_timer;
+  single_footer_process_timer.Start();
+  // First process the file metadata in the footer.
+  Status footer_status = ProcessFileTail();
+  single_footer_process_timer.Stop();
+  process_footer_timer_stats_->UpdateCounter(single_footer_process_timer.ElapsedTime());
+
+  // Release I/O buffers immediately to make sure they are cleaned up
+  // in case we return a non-OK status anywhere below.
+  context_->ReleaseCompletedResources(true);
+  RETURN_IF_ERROR(footer_status);
+
+  // Update the ORC reader options based on the tuple descriptor.
+  RETURN_IF_ERROR(SelectColumns(scan_node_->tuple_desc()));
+
+  // Set top-level template tuple.
+  template_tuple_ = template_tuple_map_[scan_node_->tuple_desc()];
+  return Status::OK();
+}
+
+void HdfsOrcScanner::Close(RowBatch* row_batch) {
+  DCHECK(!is_closed_);
+  if (row_batch != nullptr) {
+    context_->ReleaseCompletedResources(true);
+    row_batch->tuple_data_pool()->AcquireData(template_tuple_pool_.get(), false);
+    if (scan_node_->HasRowBatchQueue()) {
+      static_cast<HdfsScanNode*>(scan_node_)->AddMaterializedRowBatch(
+          unique_ptr<RowBatch>(row_batch));
+    }
+  } else {
+    template_tuple_pool_->FreeAll();
+    context_->ReleaseCompletedResources(true);
+  }
+  scratch_batch_.reset(nullptr);
+
+  // Verify all resources (if any) have been transferred.
+  DCHECK_EQ(template_tuple_pool_->total_allocated_bytes(), 0);
+
+  assemble_rows_timer_.Stop();
+  assemble_rows_timer_.ReleaseCounter();
+
+  THdfsCompression::type compression_type = THdfsCompression::NONE;
+  if (reader_ != nullptr) {
+    compression_type = TranslateCompressionKind(reader_->getCompression());
+  }
+  scan_node_->RangeComplete(THdfsFileFormat::ORC, compression_type);
+
+  for (int i = 0; i < filter_ctxs_.size(); ++i) {
+    const FilterStats* stats = filter_ctxs_[i]->stats;
+    const LocalFilterStats& local = filter_stats_[i];
+    stats->IncrCounters(FilterStats::ROWS_KEY, local.total_possible,
+        local.considered, local.rejected);
+  }
+  CloseInternal();
+}
+
+Status HdfsOrcScanner::ProcessFileTail() {
+  unique_ptr<orc::InputStream> input_stream(new ScanRangeInputStream(this));
+  VLOG_FILE << "Processing FileTail of ORC file: " << input_stream->getName()
+      << ", length: " << input_stream->getLength();
+  try {
+    reader_ = orc::createReader(move(input_stream), reader_options_);
+  } catch (ResourceError& e) {  // errors thrown from the ORC scanner
+    parse_status_ = e.GetStatus();
+    return parse_status_;
+  } catch (std::exception& e) { // other errors thrown from the ORC library
+    string msg = Substitute("Encountered parse error in tail of ORC file $0: $1",
+        filename(), e.what());
+    parse_status_ = Status(msg);
+    return parse_status_;
+  }
+
+  if (reader_->getNumberOfRows() == 0)  return Status::OK();
+  if (reader_->getNumberOfStripes() == 0) {
+    return Status(Substitute("Invalid ORC file: $0. No stripes in this file but"
+        " numberOfRows in footer is $1", filename(), reader_->getNumberOfRows()));
+  }
+  return Status::OK();
+}
+
+inline THdfsCompression::type HdfsOrcScanner::TranslateCompressionKind(
+    orc::CompressionKind kind) {
+  switch (kind) {
+    case orc::CompressionKind::CompressionKind_NONE: return THdfsCompression::NONE;
+    // zlib as used in ORC corresponds to DEFLATE in Impala
+    case orc::CompressionKind::CompressionKind_ZLIB: return THdfsCompression::DEFLATE;
+    case orc::CompressionKind::CompressionKind_SNAPPY: return THdfsCompression::SNAPPY;
+    case orc::CompressionKind::CompressionKind_LZO: return THdfsCompression::LZO;
+    case orc::CompressionKind::CompressionKind_LZ4: return THdfsCompression::LZ4;
+    case orc::CompressionKind::CompressionKind_ZSTD: return THdfsCompression::ZSTD;
+    default:
+      VLOG_QUERY << "Unknown compression kind of orc::CompressionKind: " << kind;
+  }
+  return THdfsCompression::DEFAULT;
+}
+
+Status HdfsOrcScanner::SelectColumns(const TupleDescriptor* tuple_desc) {
+  list<uint64_t> selected_indices;
+  int num_columns = 0;
+  const orc::Type& root_type = reader_->getType();
+  // TODO: validate columns, e.g. the scale of decimal types
+  for (SlotDescriptor* slot_desc: tuple_desc->slots()) {
+    // Skip partition columns
+    if (slot_desc->col_pos() < scan_node_->num_partition_keys()) continue;
+
+    const SchemaPath &path = slot_desc->col_path();
+    DCHECK_EQ(path.size(), 1);
+    int col_idx = path[0];
+    // The first index in a path includes the table's partition keys
+    int col_idx_in_file = col_idx - scan_node_->num_partition_keys();
+    if (col_idx_in_file >= root_type.getSubtypeCount()) {
+      // In this case, we are selecting a column that is not in the file.
+      // Update the template tuple to put a NULL in this slot.
+      Tuple** template_tuple = &template_tuple_map_[tuple_desc];
+      if (*template_tuple == nullptr) {
+        *template_tuple =
+            Tuple::Create(tuple_desc->byte_size(), template_tuple_pool_.get());
+      }
+      (*template_tuple)->SetNull(slot_desc->null_indicator_offset());
+      continue;
+    }
+    selected_indices.push_back(col_idx_in_file);
+    const orc::Type* orc_type = root_type.getSubtype(col_idx_in_file);
+    const ColumnType& col_type = scan_node_->hdfs_table()->col_descs()[col_idx].type();
+    // TODO(IMPALA-6503): Support reading complex types from ORC format files
+    DCHECK(!col_type.IsComplexType()) << "Complex types are not supported yet";
+    RETURN_IF_ERROR(ValidateType(col_type, *orc_type));
+    col_id_slot_map_[orc_type->getColumnId()] = slot_desc;
+    ++num_columns;
+  }
+  COUNTER_SET(num_cols_counter_, static_cast<int64_t>(num_columns));
+  row_reader_options.include(selected_indices);
+  return Status::OK();
+}
+
+Status HdfsOrcScanner::ValidateType(const ColumnType& type, const orc::Type& orc_type) {
+  switch (orc_type.getKind()) {
+    case orc::TypeKind::BOOLEAN:
+      if (type.type == TYPE_BOOLEAN) return Status::OK();
+      break;
+    case orc::TypeKind::BYTE:
+      if (type.type == TYPE_TINYINT || type.type == TYPE_SMALLINT
+          || type.type == TYPE_INT || type.type == TYPE_BIGINT)
+        return Status::OK();
+      break;
+    case orc::TypeKind::SHORT:
+      if (type.type == TYPE_SMALLINT || type.type == TYPE_INT
+          || type.type == TYPE_BIGINT)
+        return Status::OK();
+      break;
+    case orc::TypeKind::INT:
+      if (type.type == TYPE_INT || type.type == TYPE_BIGINT) return Status::OK();
+      break;
+    case orc::TypeKind::LONG:
+      if (type.type == TYPE_BIGINT) return Status::OK();
+      break;
+    case orc::TypeKind::FLOAT:
+    case orc::TypeKind::DOUBLE:
+      if (type.type == TYPE_FLOAT || type.type == TYPE_DOUBLE) return Status::OK();
+      break;
+    case orc::TypeKind::STRING:
+    case orc::TypeKind::VARCHAR:
+    case orc::TypeKind::CHAR:
+      if (type.type == TYPE_STRING || type.type == TYPE_VARCHAR
+          || type.type == TYPE_CHAR)
+        return Status::OK();
+      break;
+    case orc::TypeKind::TIMESTAMP:
+      if (type.type == TYPE_TIMESTAMP) return Status::OK();
+      break;
+    case orc::TypeKind::DECIMAL: {
+      if (type.type != TYPE_DECIMAL || type.scale != orc_type.getScale()) break;
+      bool overflow = false;
+      int orc_precision = orc_type.getPrecision();
+      if (orc_precision == 0 || orc_precision > ColumnType::MAX_DECIMAL8_PRECISION) {
+        // For ORC decimals whose precision is larger than 18, the values can't fit
+        // into an int64 (10^19 > 2^63), so we should use int128 (16 bytes) here.
+        // The possible byte sizes for Impala decimals are 4, 8, 16.
+        // We mark it as overflow if the target byte size is not 16.
+        overflow = (type.GetByteSize() != 16);
+      } else if (orc_type.getPrecision() > ColumnType::MAX_DECIMAL4_PRECISION) {
+        // For ORC decimals with precision > 9 and <= 18, both int64 and int128 fit.
+        // We only mark it as overflow if the target byte size is 4.
+        overflow = (type.GetByteSize() == 4);
+      }
+      if (!overflow) return Status::OK();
+      return Status(Substitute(
+          "It can't be truncated to table column $2 for column $0 in ORC file '$1'",
+          orc_type.toString(), filename(), type.DebugString()));
+    }
+    default: break;
+  }
+  return Status(Substitute(
+      "Type mismatch: table column $0 is map to column $1 in ORC file '$2'",
+      type.DebugString(), orc_type.toString(), filename()));
+}
+
+Status HdfsOrcScanner::ProcessSplit() {
+  DCHECK(scan_node_->HasRowBatchQueue());
+  HdfsScanNode* scan_node = static_cast<HdfsScanNode*>(scan_node_);
+  do {
+    unique_ptr<RowBatch> batch = make_unique<RowBatch>(scan_node_->row_desc(),
+        state_->batch_size(), scan_node_->mem_tracker());
+    Status status = GetNextInternal(batch.get());
+    // Always add batch to the queue because it may contain data referenced by previously
+    // appended batches.
+    scan_node->AddMaterializedRowBatch(move(batch));
+    RETURN_IF_ERROR(status);
+    ++row_batches_produced_;
+    if ((row_batches_produced_ & (BATCHES_PER_FILTER_SELECTIVITY_CHECK - 1)) == 0) {
+      CheckFiltersEffectiveness();
+    }
+  } while (!eos_ && !scan_node_->ReachedLimit());
+  return Status::OK();
+}
+
+Status HdfsOrcScanner::GetNextInternal(RowBatch* row_batch) {
+  if (scan_node_->IsZeroSlotTableScan()) {
+    uint64_t file_rows = reader_->getNumberOfRows();
+    // There are no materialized slots, e.g. count(*) over the table.  We can serve
+    // this query from just the file metadata.  We don't need to read the column data.
+    if (stripe_rows_read_ == file_rows) {
+      eos_ = true;
+      return Status::OK();
+    }
+    assemble_rows_timer_.Start();
+    DCHECK_LT(stripe_rows_read_, file_rows);
+    int64_t rows_remaining = file_rows - stripe_rows_read_;
+    int max_tuples = min<int64_t>(row_batch->capacity(), rows_remaining);
+    TupleRow* current_row = row_batch->GetRow(row_batch->AddRow());
+    int num_to_commit = WriteTemplateTuples(current_row, max_tuples);
+    Status status = CommitRows(num_to_commit, row_batch);
+    assemble_rows_timer_.Stop();
+    RETURN_IF_ERROR(status);
+    stripe_rows_read_ += max_tuples;
+    COUNTER_ADD(scan_node_->rows_read_counter(), num_to_commit);
+    return Status::OK();
+  }
+
+  // reset tuple memory. We'll allocate it the first time we use it.
+  tuple_mem_ = nullptr;
+  tuple_ = nullptr;
+
+  // Transfer remaining tuples from the scratch batch.
+  if (ScratchBatchNotEmpty()) {
+    assemble_rows_timer_.Start();
+    RETURN_IF_ERROR(TransferScratchTuples(row_batch));
+    assemble_rows_timer_.Stop();
+    if (row_batch->AtCapacity()) return Status::OK();
+    DCHECK_EQ(scratch_batch_tuple_idx_, scratch_batch_->numElements);
+  }
+
+  while (advance_stripe_ || end_of_stripe_) {
+    context_->ReleaseCompletedResources(/* done */ true);
+    // Commit the rows to flush the row batch from the previous stripe
+    RETURN_IF_ERROR(CommitRows(0, row_batch));
+
+    RETURN_IF_ERROR(NextStripe());
+    DCHECK_LE(stripe_idx_, reader_->getNumberOfStripes());
+    if (stripe_idx_ == reader_->getNumberOfStripes()) {
+      eos_ = true;
+      DCHECK(parse_status_.ok());
+      return Status::OK();
+    }
+  }
+
+  // Apply any runtime filters to static tuples containing the partition keys for this
+  // partition. If any filter fails, we return immediately and stop processing this
+  // scan range.
+  if (!scan_node_->PartitionPassesFilters(context_->partition_descriptor()->id(),
+      FilterStats::ROW_GROUPS_KEY, context_->filter_ctxs())) {
+    eos_ = true;
+    DCHECK(parse_status_.ok());
+    return Status::OK();
+  }
+  assemble_rows_timer_.Start();
+  Status status = AssembleRows(row_batch);
+  assemble_rows_timer_.Stop();
+  RETURN_IF_ERROR(status);
+  if (!parse_status_.ok()) {
+    RETURN_IF_ERROR(state_->LogOrReturnError(parse_status_.msg()));
+    parse_status_ = Status::OK();
+  }
+  return Status::OK();
+}
+
+inline bool HdfsOrcScanner::ScratchBatchNotEmpty() {
+  return scratch_batch_ != nullptr
+      && scratch_batch_tuple_idx_ < scratch_batch_->numElements;
+}
+
+inline static bool CheckStripeOverlapsSplit(int64_t stripe_start, int64_t stripe_end,
+    int64_t split_start, int64_t split_end) {
+  return (split_start >= stripe_start && split_start < stripe_end) ||
+      (split_end > stripe_start && split_end <= stripe_end) ||
+      (split_start <= stripe_start && split_end >= stripe_end);
+}
+
+Status HdfsOrcScanner::NextStripe() {
+  const ScanRange* split_range = static_cast<ScanRangeMetadata*>(
+      metadata_range_->meta_data())->original_split;
+  int64_t split_offset = split_range->offset();
+  int64_t split_length = split_range->len();
+
+  bool start_with_first_stripe = stripe_idx_ == -1;
+  bool misaligned_stripe_skipped = false;
+
+  advance_stripe_ = false;
+  stripe_rows_read_ = 0;
+
+  // Loop until we have found a non-empty stripe.
+  while (true) {
+    // Reset the parse status for the next stripe.
+    parse_status_ = Status::OK();
+
+    ++stripe_idx_;
+    if (stripe_idx_ >= reader_->getNumberOfStripes()) {
+      if (start_with_first_stripe && misaligned_stripe_skipped) {
+        // We started with the first stripe and skipped all the stripes because they were
+        // misaligned. The execution flow won't reach this point if there is at least one
+        // non-empty stripe which this scanner can process.
+        COUNTER_ADD(num_scanners_with_no_reads_counter_, 1);
+      }
+      break;
+    }
+    unique_ptr<orc::StripeInformation> stripe = reader_->getStripe(stripe_idx_);
+    // Also check the file-level 'numberOfRows' to make sure 'select count(*)' and
+    // 'select *' behave consistently for corrupt files that have 'numberOfRows == 0'
+    // but some data in stripes.
+    if (stripe->getNumberOfRows() == 0 || reader_->getNumberOfRows() == 0) continue;
+
+    uint64_t stripe_offset = stripe->getOffset();
+    uint64_t stripe_len = stripe->getIndexLength() + stripe->getDataLength() +
+        stripe->getFooterLength();
+    int64_t stripe_mid_pos = stripe_offset + stripe_len / 2;
+    if (!(stripe_mid_pos >= split_offset &&
+        stripe_mid_pos < split_offset + split_length)) {
+      // The stripe's midpoint is not in this split, so the stripe will be handled by
+      // a different scanner. Mark whether the stripe overlaps with the split.
+      misaligned_stripe_skipped |= CheckStripeOverlapsSplit(stripe_offset,
+          stripe_offset + stripe_len, split_offset, split_offset + split_length);
+      continue;
+    }
+
+    // TODO: check if this stripe can be skipped by stats. e.g. IMPALA-6505
+
+    COUNTER_ADD(num_stripes_counter_, 1);
+    row_reader_options.range(stripe->getOffset(), stripe_len);
+    try {
+      row_reader_ = reader_->createRowReader(row_reader_options);
+    } catch (ResourceError& e) {  // errors throw from the orc scanner
+      parse_status_ = e.GetStatus();
+      return parse_status_;
+    } catch (std::exception& e) { // errors throw from the orc library
+      VLOG_QUERY << "Error in creating ORC column readers: " << e.what();
+      parse_status_ = Status(
+          Substitute("Error in creating ORC column readers: $0.", e.what()));
+      return parse_status_;
+    }
+    end_of_stripe_ = false;
+    VLOG_ROW << Substitute("Created RowReader for stripe(offset=$0, len=$1) in file $2",
+        stripe->getOffset(), stripe_len, filename());
+    break;
+  }
+
+  DCHECK(parse_status_.ok());
+  return Status::OK();
+}
+
+Status HdfsOrcScanner::AssembleRows(RowBatch* row_batch) {
+  bool continue_execution = !scan_node_->ReachedLimit() && !context_->cancelled();
+  if (!continue_execution)  return Status::CANCELLED;
+
+  scratch_batch_tuple_idx_ = 0;
+  scratch_batch_ = row_reader_->createRowBatch(row_batch->capacity());
+  DCHECK_EQ(scratch_batch_->numElements, 0);
+
+  int64_t num_rows_read = 0;
+  while (continue_execution) {  // one ORC scratch batch (ColumnVectorBatch) in a round
+    if (scratch_batch_tuple_idx_ == scratch_batch_->numElements) {
+      try {
+        if (!row_reader_->next(*scratch_batch_)) {
+          end_of_stripe_ = true;
+          break; // no more data to process
+        }
+      } catch (ResourceError& e) {
+        parse_status_ = e.GetStatus();
+        return parse_status_;
+      } catch (std::exception& e) {
+        VLOG_QUERY << "Encounter parse error: " << e.what();
+        parse_status_ = Status(Substitute("Encounter parse error: $0.", e.what()));
+        eos_ = true;
+        return parse_status_;
+      }
+      if (scratch_batch_->numElements == 0) {
+        RETURN_IF_ERROR(CommitRows(0, row_batch));
+        end_of_stripe_ = true;
+        return Status::OK();
+      }
+      num_rows_read += scratch_batch_->numElements;
+      scratch_batch_tuple_idx_ = 0;
+    }
+
+    RETURN_IF_ERROR(TransferScratchTuples(row_batch));
+    if (row_batch->AtCapacity()) break;
+    continue_execution &= !scan_node_->ReachedLimit() && !context_->cancelled();
+  }
+  stripe_rows_read_ += num_rows_read;
+  COUNTER_ADD(scan_node_->rows_read_counter(), num_rows_read);
+  return Status::OK();
+}
+
+Status HdfsOrcScanner::TransferScratchTuples(RowBatch* dst_batch) {
+  const TupleDescriptor* tuple_desc = scan_node_->tuple_desc();
+
+  ScalarExprEvaluator* const* conjunct_evals = conjunct_evals_->data();
+  int num_conjuncts = conjunct_evals_->size();
+
+  const orc::Type* root_type = &row_reader_->getSelectedType();
+  DCHECK_EQ(root_type->getKind(), orc::TypeKind::STRUCT);
+
+  DCHECK_LT(dst_batch->num_rows(), dst_batch->capacity());
+  if (tuple_ == nullptr) RETURN_IF_ERROR(AllocateTupleMem(dst_batch));
+  int row_id = dst_batch->num_rows();
+  int capacity = dst_batch->capacity();
+  int num_to_commit = 0;
+  TupleRow* row = dst_batch->GetRow(row_id);
+  Tuple* tuple = tuple_;  // tuple_ is updated in CommitRows
+
+  // TODO(IMPALA-6506): codegen the runtime filter + conjunct evaluation loop
+  // TODO: transfer the scratch_batch_ column by column in batches, and then evaluate
+  // the predicates in a later loop.
+  while (row_id < capacity && ScratchBatchNotEmpty()) {
+    DCHECK_LT((void*)tuple, (void*)tuple_mem_end_);
+    InitTuple(tuple_desc, template_tuple_, tuple);
+    RETURN_IF_ERROR(ReadRow(static_cast<const orc::StructVectorBatch&>(*scratch_batch_),
+        scratch_batch_tuple_idx_++, root_type, tuple, dst_batch));
+    row->SetTuple(scan_node_->tuple_idx(), tuple);
+    if (!EvalRuntimeFilters(row)) continue;
+    if (ExecNode::EvalConjuncts(conjunct_evals, num_conjuncts, row)) {
+      row = next_row(row);
+      tuple = next_tuple(tuple_desc->byte_size(), tuple);
+      ++row_id;
+      ++num_to_commit;
+    }
+  }
+  VLOG_ROW << Substitute("Transfer $0 rows from scratch batch to dst_batch ($1 rows)",
+      num_to_commit, dst_batch->num_rows());
+  return CommitRows(num_to_commit, dst_batch);
+}
+
+Status HdfsOrcScanner::AllocateTupleMem(RowBatch* row_batch) {
+  int64_t tuple_buffer_size;
+  RETURN_IF_ERROR(
+      row_batch->ResizeAndAllocateTupleBuffer(state_, &tuple_buffer_size, &tuple_mem_));
+  tuple_mem_end_ = tuple_mem_ + tuple_buffer_size;
+  tuple_ = reinterpret_cast<Tuple*>(tuple_mem_);
+  DCHECK_GT(row_batch->capacity(), 0);
+  return Status::OK();
+}
+
+inline Status HdfsOrcScanner::ReadRow(const orc::StructVectorBatch& batch, int row_idx,
+    const orc::Type* orc_type, Tuple* tuple, RowBatch* dst_batch) {
+  for (unsigned int c = 0; c < orc_type->getSubtypeCount(); ++c) {
+    orc::ColumnVectorBatch* col_batch = batch.fields[c];
+    const orc::Type* col_type = orc_type->getSubtype(c);
+    const SlotDescriptor* slot_desc = DCHECK_NOTNULL(
+        col_id_slot_map_[col_type->getColumnId()]);
+    if (col_batch->hasNulls && !col_batch->notNull[row_idx]) {
+      tuple->SetNull(slot_desc->null_indicator_offset());
+      continue;
+    }
+    void* slot_val_ptr = tuple->GetSlot(slot_desc->tuple_offset());
+    switch (col_type->getKind()) {
+      case orc::TypeKind::BOOLEAN: {
+        int64_t val = static_cast<const orc::LongVectorBatch*>(col_batch)->
+            data.data()[row_idx];
+        *(reinterpret_cast<bool*>(slot_val_ptr)) = (val != 0);
+        break;
+      }
+      case orc::TypeKind::BYTE:
+      case orc::TypeKind::SHORT:
+      case orc::TypeKind::INT:
+      case orc::TypeKind::LONG: {
+        const orc::LongVectorBatch* long_batch =
+            static_cast<const orc::LongVectorBatch*>(col_batch);
+        int64_t val = long_batch->data.data()[row_idx];
+        switch (slot_desc->type().type) {
+          case TYPE_TINYINT:
+            *(reinterpret_cast<int8_t*>(slot_val_ptr)) = val;
+            break;
+          case TYPE_SMALLINT:
+            *(reinterpret_cast<int16_t*>(slot_val_ptr)) = val;
+            break;
+          case TYPE_INT:
+            *(reinterpret_cast<int32_t*>(slot_val_ptr)) = val;
+            break;
+          case TYPE_BIGINT:
+            *(reinterpret_cast<int64_t*>(slot_val_ptr)) = val;
+            break;
+          default:
+            DCHECK(false) << "Illegal translation from impala type "
+                << slot_desc->DebugString() << " to orc INT";
+        }
+        break;
+      }
+      case orc::TypeKind::FLOAT:
+      case orc::TypeKind::DOUBLE: {
+        double val =
+            static_cast<const orc::DoubleVectorBatch*>(col_batch)->data.data()[row_idx];
+        if (slot_desc->type().type == TYPE_FLOAT) {
+          *(reinterpret_cast<float*>(slot_val_ptr)) = val;
+        } else {
+          DCHECK_EQ(slot_desc->type().type, TYPE_DOUBLE);
+          *(reinterpret_cast<double*>(slot_val_ptr)) = val;
+        }
+        break;
+      }
+      case orc::TypeKind::STRING:
+      case orc::TypeKind::VARCHAR:
+      case orc::TypeKind::CHAR: {
+        auto str_batch = static_cast<const orc::StringVectorBatch*>(col_batch);
+        const char* src_ptr = str_batch->data.data()[row_idx];
+        int64_t src_len = str_batch->length.data()[row_idx];
+        int dst_len = slot_desc->type().len;
+        if (slot_desc->type().type == TYPE_CHAR) {
+          int unpadded_len = min(dst_len, static_cast<int>(src_len));
+          char* dst_char = reinterpret_cast<char*>(slot_val_ptr);
+          memcpy(dst_char, src_ptr, unpadded_len);
+          StringValue::PadWithSpaces(dst_char, dst_len, unpadded_len);
+          break;
+        }
+        StringValue* dst = reinterpret_cast<StringValue*>(slot_val_ptr);
+        if (slot_desc->type().type == TYPE_VARCHAR && src_len > dst_len) {
+          dst->len = dst_len;
+        } else {
+          dst->len = src_len;
+        }
+        // Space in the StringVectorBatch is allocated by reader_mem_pool_ and will be
+        // reused for the next batch, so we allocate new space for this string.
+        uint8_t* buffer = dst_batch->tuple_data_pool()->TryAllocate(dst->len);
+        if (buffer == nullptr) {
+          string details = Substitute("Could not allocate string buffer of $0 bytes "
+              "for ORC file '$1'.", dst->len, filename());
+          return scan_node_->mem_tracker()->MemLimitExceeded(
+              state_, details, dst->len);
+        }
+        dst->ptr = reinterpret_cast<char*>(buffer);
+        memcpy(dst->ptr, src_ptr, dst->len);
+        break;
+      }
+      case orc::TypeKind::TIMESTAMP: {
+        const orc::TimestampVectorBatch* ts_batch =
+            static_cast<const orc::TimestampVectorBatch*>(col_batch);
+        int64_t secs = ts_batch->data.data()[row_idx];
+        int64_t nanos = ts_batch->nanoseconds.data()[row_idx];
+        *reinterpret_cast<TimestampValue*>(slot_val_ptr) =
+            TimestampValue::FromUnixTimeNanos(secs, nanos);
+        break;
+      }
+      case orc::TypeKind::DECIMAL: {
+        // For decimals whose precision is larger than 18, the value can't fit into
+        // an int64 (10^19 > 2^63), so we use int128 in this case.
+        if (col_type->getPrecision() == 0 || col_type->getPrecision() > 18) {
+          auto int128_batch = static_cast<const orc::Decimal128VectorBatch*>(col_batch);
+          orc::Int128 orc_val = int128_batch->values.data()[row_idx];
+
+          DCHECK_EQ(slot_desc->type().GetByteSize(), 16);
+          int128_t val = orc_val.getHighBits();
+          val <<= 64;
+          val |= orc_val.getLowBits();
+          // Use memcpy to avoid gcc generating unaligned instructions like movaps
+          // for int128_t, which would cause a segmentation fault when the address
+          // is not aligned to 16 bytes.
+          memcpy(slot_val_ptr, &val, sizeof(int128_t));
+        } else {
+          // Reminder: even decimal(1,1) is stored in an int64 batch.
+          auto int64_batch = static_cast<const orc::Decimal64VectorBatch*>(col_batch);
+          int64_t val = int64_batch->values.data()[row_idx];
+
+          switch (slot_desc->type().GetByteSize()) {
+            case 4:
+              reinterpret_cast<Decimal4Value*>(slot_val_ptr)->value() = val;
+              break;
+            case 8:
+              reinterpret_cast<Decimal8Value*>(slot_val_ptr)->value() = val;
+              break;
+            case 16:
+              reinterpret_cast<Decimal16Value*>(slot_val_ptr)->value() = val;
+              break;
+            default: DCHECK(false) << "invalid byte size";
+          }
+        }
+        break;
+      }
+      case orc::TypeKind::LIST:
+      case orc::TypeKind::MAP:
+      case orc::TypeKind::STRUCT:
+      case orc::TypeKind::UNION:
+      default:
+        DCHECK(false) << slot_desc->type().DebugString() << " mapped to ORC column "
+            << col_type->toString();
+    }
+  }
+  return Status::OK();
+}
+
+}

http://git-wip-us.apache.org/repos/asf/impala/blob/23743baa/be/src/exec/hdfs-orc-scanner.h
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-orc-scanner.h b/be/src/exec/hdfs-orc-scanner.h
new file mode 100644
index 0000000..837d92a
--- /dev/null
+++ b/be/src/exec/hdfs-orc-scanner.h
@@ -0,0 +1,224 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+
+#ifndef IMPALA_EXEC_HDFS_ORC_SCANNER_H
+#define IMPALA_EXEC_HDFS_ORC_SCANNER_H
+
+#include <orc/OrcFile.hh>
+
+#include "runtime/runtime-state.h"
+#include "exec/hdfs-scanner.h"
+#include "exec/hdfs-scan-node.h"
+#include "util/runtime-profile-counters.h"
+
+namespace impala {
+
+struct HdfsFileDesc;
+
+/// This scanner leverages the ORC library to parse ORC files located in HDFS. Data is
+/// transformed into Impala's in-memory representation, i.e. Tuples and RowBatches.
+///
+/// For the file format spec, see https://orc.apache.org/docs/spec-intro.html
+class HdfsOrcScanner : public HdfsScanner {
+ public:
+  /// Exception thrown from the ORC scanner to stop the orc::RowReader. It's used for
+  /// IO errors (e.g. cancellation) or memory errors (e.g. mem_limit exceeded). The
+  /// exact error message is recorded in parse_status_.
+  class ResourceError : public std::runtime_error {
+   public:
+    explicit ResourceError(const Status& status)
+      : runtime_error(status.msg().msg()), status_(status) {}
+    virtual ~ResourceError() {}
+    Status& GetStatus() { return status_; }
+
+   private:
+    Status status_;
+  };
+
+  class OrcMemPool : public orc::MemoryPool {
+   public:
+    OrcMemPool(HdfsOrcScanner* scanner);
+    virtual ~OrcMemPool();
+
+    char* malloc(uint64_t size) override;
+    void free(char* p) override;
+
+    void FreeAll();
+   private:
+
+    HdfsOrcScanner* scanner_;
+    MemTracker* mem_tracker_;
+    boost::unordered_map<char*, uint64_t> chunk_sizes_;
+  };
+
+  class ScanRangeInputStream : public orc::InputStream {
+   public:
+    ScanRangeInputStream(HdfsOrcScanner* scanner) {
+      this->scanner_ = scanner;
+      this->filename_ = scanner->filename();
+      this->file_desc_ = scanner->scan_node_->GetFileDesc(
+          scanner->context_->partition_descriptor()->id(), filename_);
+    }
+
+    uint64_t getLength() const {
+      return file_desc_->file_length;
+    }
+
+    uint64_t getNaturalReadSize() const {
+      return scanner_->state_->io_mgr()->max_read_buffer_size();
+    }
+
+    void read(void* buf, uint64_t length, uint64_t offset);
+
+    const std::string& getName() const {
+      return filename_;
+    }
+
+  private:
+    HdfsOrcScanner* scanner_;
+    HdfsFileDesc* file_desc_;
+    std::string filename_;
+  };
+
+  HdfsOrcScanner(HdfsScanNodeBase* scan_node, RuntimeState* state);
+  virtual ~HdfsOrcScanner();
+
+  /// Issue just the footer range for each file.  We'll then parse the footer and pick
+  /// out the columns we want.
+  static Status IssueInitialRanges(HdfsScanNodeBase* scan_node,
+      const std::vector<HdfsFileDesc*>& files) WARN_UNUSED_RESULT;
+
+  virtual Status Open(ScannerContext* context) override WARN_UNUSED_RESULT;
+  virtual Status ProcessSplit() override WARN_UNUSED_RESULT;
+  virtual void Close(RowBatch* row_batch) override;
+
+ private:
+  friend class HdfsOrcScannerTest;
+
+  /// End of the memory allocated for tuple_mem_; used as a guard when writing tuples.
+  uint8_t* tuple_mem_end_ = nullptr;
+
+  /// Index of the current stripe being processed. Initialized to -1 which indicates
+  /// that we have not started processing the first stripe yet (GetNext() has not yet
+  /// been called).
+  int32_t stripe_idx_ = -1;
+
+  /// Counts the number of rows processed for the current stripe.
+  int64_t stripe_rows_read_ = 0;
+
+  /// Indicates whether we should advance to the next stripe in the next GetNext().
+  /// Starts out as true to move to the very first stripe.
+  bool advance_stripe_ = true;
+
+  /// Indicates whether we are at the end of a stripe.
+  bool end_of_stripe_ = true;
+
+  /// Number of scratch batches processed so far.
+  int64_t row_batches_produced_ = 0;
+
+  /// Memory pool used by the ORC readers.
+  boost::scoped_ptr<OrcMemPool> reader_mem_pool_;
+
+  /// orc::Reader's responsibility is to read the footer and metadata from an ORC file.
+  /// It creates orc::RowReader for further materialization. orc::RowReader is used for
+  /// reading rows from the file.
+  std::unique_ptr<orc::Reader> reader_ = nullptr;
+  std::unique_ptr<orc::RowReader> row_reader_ = nullptr;
+
+  /// Orc reader will write slot values into this scratch batch for top-level tuples.
+  /// See AssembleRows().
+  std::unique_ptr<orc::ColumnVectorBatch> scratch_batch_;
+  int scratch_batch_tuple_idx_ = 0;
+
+  /// ReaderOptions used to create orc::Reader.
+  orc::ReaderOptions reader_options_;
+
+  /// RowReaderOptions used to create orc::RowReader.
+  orc::RowReaderOptions row_reader_options;
+
+  /// A column id is the pre-order index of the column in the orc::Type tree.
+  /// Map from column id to slot descriptor.
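+  /// For example (an illustrative assumption, not from a specific test file), the
+  /// schema struct<a:int,b:struct<c:string>> yields ids 0 (root), 1 (a), 2 (b), 3 (c).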
+  boost::unordered_map<int, const SlotDescriptor*> col_id_slot_map_;
+
+  /// Scan range for the metadata.
+  const io::ScanRange* metadata_range_ = nullptr;
+
+  /// Timer for materializing rows. This ignores time getting the next buffer.
+  ScopedTimer<MonotonicStopWatch> assemble_rows_timer_;
+
+  /// Average and min/max time spent processing the footer by each split.
+  RuntimeProfile::SummaryStatsCounter* process_footer_timer_stats_ = nullptr;
+
+  /// Number of columns that need to be read.
+  RuntimeProfile::Counter* num_cols_counter_ = nullptr;
+
+  /// Number of stripes that need to be read.
+  RuntimeProfile::Counter* num_stripes_counter_ = nullptr;
+
+  /// Number of scanners that end up doing no reads because their splits don't overlap
+  /// with the midpoint of any stripe in the file.
+  RuntimeProfile::Counter* num_scanners_with_no_reads_counter_ = nullptr;
+
+  const char *filename() const { return metadata_range_->file(); }
+
+  virtual Status GetNextInternal(RowBatch* row_batch) override WARN_UNUSED_RESULT;
+
+  /// Advances 'stripe_idx_' to the next non-empty stripe and initializes
+  /// row_reader_ to scan it.
+  Status NextStripe() WARN_UNUSED_RESULT;
+
+  /// Reads data using orc-reader to materialize instances of 'tuple_desc'.
+  /// Returns a non-OK status if a non-recoverable error was encountered and execution
+  /// of this query should be terminated immediately.
+  Status AssembleRows(RowBatch* row_batch) WARN_UNUSED_RESULT;
+
+  /// Function used by TransferScratchTuples() to read a single row from scratch_batch_
+  /// into 'tuple'.
+  Status ReadRow(const orc::StructVectorBatch& batch, int row_idx,
+      const orc::Type* orc_type, Tuple* tuple, RowBatch* dst_batch) WARN_UNUSED_RESULT;
+
+  /// Evaluates runtime filters and conjuncts (if any) against the tuples in
+  /// 'scratch_batch_', adds the surviving tuples to the given batch and commits them.
+  Status TransferScratchTuples(RowBatch* dst_batch) WARN_UNUSED_RESULT;
+
+  /// Process the file tail (footer and metadata) by creating the orc::Reader.
+  /// A non-OK status is returned (and stored in parse_status_) if parsing fails.
+  Status ProcessFileTail() WARN_UNUSED_RESULT;
+
+  /// Update reader options used in orc reader by the given tuple descriptor.
+  Status SelectColumns(const TupleDescriptor* tuple_desc) WARN_UNUSED_RESULT;
+
+  /// Validate whether the ColumnType is compatible with the orc type
+  Status ValidateType(const ColumnType& type, const orc::Type& orc_type)
+      WARN_UNUSED_RESULT;
+
+  /// Part of the HdfsScanner interface, not used by the ORC scanner.
+  Status InitNewRange() override WARN_UNUSED_RESULT { return Status::OK(); }
+
+  THdfsCompression::type TranslateCompressionKind(orc::CompressionKind kind);
+
+  inline bool ScratchBatchNotEmpty();
+
+  inline Status AllocateTupleMem(RowBatch* row_batch) WARN_UNUSED_RESULT;
+
+};
+
+} // namespace impala
+
+#endif
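
The header above only declares the ORC scanner's members; the bodies live in
be/src/exec/hdfs-orc-scanner.cc and are not part of this hunk. As a rough illustration of
the scratch-batch pattern described in the comments (materialize a batch of rows from the
ORC row reader into scratch_batch_, then copy only the rows that survive the conjuncts
into the output batch), here is a small self-contained C++ sketch. Every name in it, such
as ToyRowReader and passes_conjuncts, is invented for the example and is not an Impala or
ORC library API.

#include <iostream>
#include <vector>

// Toy stand-in for orc::RowReader, invented for this sketch.
struct ToyRowReader {
  int next_value = 0;
  // Fills 'scratch' with up to 'capacity' values; returns false at end of stream.
  bool Next(std::vector<int>* scratch, int capacity) {
    scratch->clear();
    for (int i = 0; i < capacity && next_value < 10; ++i) scratch->push_back(next_value++);
    return !scratch->empty();
  }
};

int main() {
  ToyRowReader row_reader;         // plays the role of row_reader_
  std::vector<int> scratch_batch;  // plays the role of scratch_batch_
  std::vector<int> output_batch;   // plays the role of the output RowBatch
  auto passes_conjuncts = [](int v) { return v % 2 == 0; };  // stand-in predicate

  // Materialize into the scratch batch, then transfer only the surviving rows.
  while (row_reader.Next(&scratch_batch, /*capacity=*/4)) {
    for (int v : scratch_batch) {
      if (passes_conjuncts(v)) output_batch.push_back(v);
    }
  }
  for (int v : output_batch) std::cout << v << ' ';  // prints: 0 2 4 6 8
  std::cout << std::endl;
  return 0;
}

Splitting the work between a materialization step and a transfer step is what the comments
above describe for AssembleRows() and TransferScratchTuples(); per those comments, the
transfer step is also where runtime filters and conjuncts are evaluated.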

http://git-wip-us.apache.org/repos/asf/impala/blob/23743baa/be/src/exec/hdfs-parquet-scanner-ir.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-parquet-scanner-ir.cc b/be/src/exec/hdfs-parquet-scanner-ir.cc
index f2355d8..2ba56c4 100644
--- a/be/src/exec/hdfs-parquet-scanner-ir.cc
+++ b/be/src/exec/hdfs-parquet-scanner-ir.cc
@@ -65,17 +65,3 @@ int HdfsParquetScanner::ProcessScratchBatch(RowBatch* dst_batch) {
   scratch_batch_->tuple_idx += (scratch_tuple - scratch_tuple_start) / tuple_size;
   return output_row - output_row_start;
 }
-
-bool HdfsParquetScanner::EvalRuntimeFilter(int i, TupleRow* row) {
-  LocalFilterStats* stats = &filter_stats_[i];
-  const FilterContext* ctx = filter_ctxs_[i];
-  ++stats->total_possible;
-  if (stats->enabled && ctx->filter->HasFilter()) {
-    ++stats->considered;
-    if (!ctx->Eval(row)) {
-      ++stats->rejected;
-      return false;
-    }
-  }
-  return true;
-}

http://git-wip-us.apache.org/repos/asf/impala/blob/23743baa/be/src/exec/hdfs-parquet-scanner.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-parquet-scanner.cc b/be/src/exec/hdfs-parquet-scanner.cc
index 0d79f53..ae22149 100644
--- a/be/src/exec/hdfs-parquet-scanner.cc
+++ b/be/src/exec/hdfs-parquet-scanner.cc
@@ -38,15 +38,6 @@ using std::move;
 using namespace impala;
 using namespace impala::io;
 
-DEFINE_double(parquet_min_filter_reject_ratio, 0.1, "(Advanced) If the percentage of "
-    "rows rejected by a runtime filter drops below this value, the filter is disabled.");
-
-// The number of row batches between checks to see if a filter is effective, and
-// should be disabled. Must be a power of two.
-constexpr int BATCHES_PER_FILTER_SELECTIVITY_CHECK = 16;
-static_assert(BitUtil::IsPowerOf2(BATCHES_PER_FILTER_SELECTIVITY_CHECK),
-    "BATCHES_PER_FILTER_SELECTIVITY_CHECK must be a power of two");
-
 // Max dictionary page header size in bytes. This is an estimate and only needs to be an
 // upper bound.
 const int MAX_DICT_HEADER_SIZE = 100;
@@ -57,7 +48,6 @@ const int MAX_DICT_HEADER_SIZE = 100;
 // THIS RECORDS INFORMATION ABOUT PAST BEHAVIOR. DO NOT CHANGE THIS CONSTANT.
 const int LEGACY_IMPALA_MAX_DICT_ENTRIES = 40000;
 
-const int64_t HdfsParquetScanner::FOOTER_SIZE;
 const int16_t HdfsParquetScanner::ROW_GROUP_END;
 const int16_t HdfsParquetScanner::INVALID_LEVEL;
 const int16_t HdfsParquetScanner::INVALID_POS;
@@ -69,71 +59,14 @@ const string PARQUET_MEM_LIMIT_EXCEEDED =
 
 Status HdfsParquetScanner::IssueInitialRanges(HdfsScanNodeBase* scan_node,
     const vector<HdfsFileDesc*>& files) {
-  vector<ScanRange*> footer_ranges;
-  for (int i = 0; i < files.size(); ++i) {
+  for (HdfsFileDesc* file : files) {
     // If the file size is less than 12 bytes, it is an invalid Parquet file.
-    if (files[i]->file_length < 12) {
+    if (file->file_length < 12) {
       return Status(Substitute("Parquet file $0 has an invalid file length: $1",
-          files[i]->filename, files[i]->file_length));
-    }
-    // Compute the offset of the file footer.
-    int64_t footer_size = min(FOOTER_SIZE, files[i]->file_length);
-    int64_t footer_start = files[i]->file_length - footer_size;
-    DCHECK_GE(footer_start, 0);
-
-    // Try to find the split with the footer.
-    ScanRange* footer_split = FindFooterSplit(files[i]);
-
-    for (int j = 0; j < files[i]->splits.size(); ++j) {
-      ScanRange* split = files[i]->splits[j];
-
-      DCHECK_LE(split->offset() + split->len(), files[i]->file_length);
-      // If there are no materialized slots (such as count(*) over the table), we can
-      // get the result with the file metadata alone and don't need to read any row
-      // groups. We only want a single node to process the file footer in this case,
-      // which is the node with the footer split.  If it's not a count(*), we create a
-      // footer range for the split always.
-      if (!scan_node->IsZeroSlotTableScan() || footer_split == split) {
-        ScanRangeMetadata* split_metadata =
-            static_cast<ScanRangeMetadata*>(split->meta_data());
-        // Each split is processed by first issuing a scan range for the file footer, which
-        // is done here, followed by scan ranges for the columns of each row group within
-        // the actual split (in InitColumns()). The original split is stored in the
-        // metadata associated with the footer range.
-        ScanRange* footer_range;
-        if (footer_split != nullptr) {
-          footer_range = scan_node->AllocateScanRange(files[i]->fs,
-              files[i]->filename.c_str(), footer_size, footer_start,
-              split_metadata->partition_id, footer_split->disk_id(),
-              footer_split->expected_local(),
-              BufferOpts(footer_split->try_cache(), files[i]->mtime), split);
-        } else {
-          // If we did not find the last split, we know it is going to be a remote read.
-          footer_range =
-              scan_node->AllocateScanRange(files[i]->fs, files[i]->filename.c_str(),
-                  footer_size, footer_start, split_metadata->partition_id, -1, false,
-                  BufferOpts::Uncached(), split);
-        }
-
-        footer_ranges.push_back(footer_range);
-      } else {
-        scan_node->RangeComplete(THdfsFileFormat::PARQUET, THdfsCompression::NONE);
-      }
+          file->filename, file->file_length));
     }
   }
-  // The threads that process the footer will also do the scan, so we mark all the files
-  // as complete here.
-  RETURN_IF_ERROR(scan_node->AddDiskIoRanges(footer_ranges, files.size()));
-  return Status::OK();
-}
-
-ScanRange* HdfsParquetScanner::FindFooterSplit(HdfsFileDesc* file) {
-  DCHECK(file != nullptr);
-  for (int i = 0; i < file->splits.size(); ++i) {
-    ScanRange* split = file->splits[i];
-    if (split->offset() + split->len() == file->file_length) return split;
-  }
-  return nullptr;
+  return IssueFooterRanges(scan_node, THdfsFileFormat::PARQUET, files);
 }
 
 namespace impala {
@@ -379,18 +312,6 @@ int HdfsParquetScanner::CountScalarColumns(const vector<ParquetColumnReader*>& c
   return num_columns;
 }
 
-void HdfsParquetScanner::CheckFiltersEffectiveness() {
-  for (int i = 0; i < filter_stats_.size(); ++i) {
-    LocalFilterStats* stats = &filter_stats_[i];
-    const RuntimeFilter* filter = filter_ctxs_[i]->filter;
-    double reject_ratio = stats->rejected / static_cast<double>(stats->considered);
-    if (filter->AlwaysTrue() ||
-        reject_ratio < FLAGS_parquet_min_filter_reject_ratio) {
-      stats->enabled = 0;
-    }
-  }
-}
-
 Status HdfsParquetScanner::ProcessSplit() {
   DCHECK(scan_node_->HasRowBatchQueue());
   HdfsScanNode* scan_node = static_cast<HdfsScanNode*>(scan_node_);
@@ -1126,104 +1047,6 @@ Status HdfsParquetScanner::Codegen(HdfsScanNodeBase* node,
   return Status::OK();
 }
 
-bool HdfsParquetScanner::EvalRuntimeFilters(TupleRow* row) {
-  int num_filters = filter_ctxs_.size();
-  for (int i = 0; i < num_filters; ++i) {
-    if (!EvalRuntimeFilter(i, row)) return false;
-  }
-  return true;
-}
-
-// ; Function Attrs: noinline
-// define i1 @EvalRuntimeFilters(%"class.impala::HdfsParquetScanner"* %this,
-//                               %"class.impala::TupleRow"* %row) #34 {
-// entry:
-//   %0 = call i1 @_ZN6impala18HdfsParquetScanner17EvalRuntimeFilterEiPNS_8TupleRowE.2(
-//       %"class.impala::HdfsParquetScanner"* %this, i32 0, %"class.impala::TupleRow"*
-//       %row)
-//   br i1 %0, label %continue, label %bail_out
-//
-// bail_out:                                         ; preds = %entry
-//   ret i1 false
-//
-// continue:                                         ; preds = %entry
-//   ret i1 true
-// }
-//
-// EvalRuntimeFilter() is the same as the cross-compiled version except EvalOneFilter()
-// is replaced with the one generated by CodegenEvalOneFilter().
-Status HdfsParquetScanner::CodegenEvalRuntimeFilters(
-    LlvmCodeGen* codegen, const vector<ScalarExpr*>& filter_exprs, llvm::Function** fn) {
-  llvm::LLVMContext& context = codegen->context();
-  LlvmBuilder builder(context);
-
-  *fn = nullptr;
-  llvm::Type* this_type = codegen->GetStructPtrType<HdfsParquetScanner>();
-  llvm::PointerType* tuple_row_ptr_type = codegen->GetStructPtrType<TupleRow>();
-  LlvmCodeGen::FnPrototype prototype(codegen, "EvalRuntimeFilters",
-      codegen->bool_type());
-  prototype.AddArgument(LlvmCodeGen::NamedVariable("this", this_type));
-  prototype.AddArgument(LlvmCodeGen::NamedVariable("row", tuple_row_ptr_type));
-
-  llvm::Value* args[2];
-  llvm::Function* eval_runtime_filters_fn = prototype.GeneratePrototype(&builder, args);
-  llvm::Value* this_arg = args[0];
-  llvm::Value* row_arg = args[1];
-
-  int num_filters = filter_exprs.size();
-  if (num_filters == 0) {
-    builder.CreateRet(codegen->true_value());
-  } else {
-    // row_rejected_block: jump target for when a filter is evaluated to false.
-    llvm::BasicBlock* row_rejected_block =
-        llvm::BasicBlock::Create(context, "row_rejected", eval_runtime_filters_fn);
-
-    DCHECK_GT(num_filters, 0);
-    for (int i = 0; i < num_filters; ++i) {
-      llvm::Function* eval_runtime_filter_fn =
-          codegen->GetFunction(IRFunction::PARQUET_SCANNER_EVAL_RUNTIME_FILTER, true);
-      DCHECK(eval_runtime_filter_fn != nullptr);
-
-      // Codegen function for inlining filter's expression evaluation and constant fold
-      // the type of the expression into the hashing function to avoid branches.
-      llvm::Function* eval_one_filter_fn;
-      DCHECK(filter_exprs[i] != nullptr);
-      RETURN_IF_ERROR(FilterContext::CodegenEval(codegen, filter_exprs[i],
-          &eval_one_filter_fn));
-      DCHECK(eval_one_filter_fn != nullptr);
-
-      int replaced = codegen->ReplaceCallSites(eval_runtime_filter_fn, eval_one_filter_fn,
-          "FilterContext4Eval");
-      DCHECK_EQ(replaced, 1);
-
-      llvm::Value* idx = codegen->GetI32Constant(i);
-      llvm::Value* passed_filter = builder.CreateCall(
-          eval_runtime_filter_fn, llvm::ArrayRef<llvm::Value*>({this_arg, idx, row_arg}));
-
-      llvm::BasicBlock* continue_block =
-          llvm::BasicBlock::Create(context, "continue", eval_runtime_filters_fn);
-      builder.CreateCondBr(passed_filter, continue_block, row_rejected_block);
-      builder.SetInsertPoint(continue_block);
-    }
-    builder.CreateRet(codegen->true_value());
-
-    builder.SetInsertPoint(row_rejected_block);
-    builder.CreateRet(codegen->false_value());
-
-    // Don't inline this function to avoid code bloat in ProcessScratchBatch().
-    // If there is any filter, EvalRuntimeFilters() is large enough to not benefit
-    // much from inlining.
-    eval_runtime_filters_fn->addFnAttr(llvm::Attribute::NoInline);
-  }
-
-  *fn = codegen->FinalizeFunction(eval_runtime_filters_fn);
-  if (*fn == nullptr) {
-    return Status("Codegen'd HdfsParquetScanner::EvalRuntimeFilters() failed "
-        "verification, see log");
-  }
-  return Status::OK();
-}
-
 bool HdfsParquetScanner::AssembleCollection(
     const vector<ParquetColumnReader*>& column_readers, int new_collection_rep_level,
     CollectionValueBuilder* coll_value_builder) {

http://git-wip-us.apache.org/repos/asf/impala/blob/23743baa/be/src/exec/hdfs-parquet-scanner.h
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-parquet-scanner.h b/be/src/exec/hdfs-parquet-scanner.h
index ccb109c..7fede3b 100644
--- a/be/src/exec/hdfs-parquet-scanner.h
+++ b/be/src/exec/hdfs-parquet-scanner.h
@@ -20,7 +20,6 @@
 #define IMPALA_EXEC_HDFS_PARQUET_SCANNER_H
 
 #include "codegen/impala-ir.h"
-#include "common/global-flags.h"
 #include "exec/hdfs-scanner.h"
 #include "exec/parquet-common.h"
 #include "exec/parquet-scratch-tuple-batch.h"
@@ -362,14 +361,6 @@ class HdfsParquetScanner : public HdfsScanner {
   friend class ScalarColumnReader;
   friend class BoolColumnReader;
 
-  /// Size of the file footer.  This is a guess.  If this value is too little, we will
-  /// need to issue another read.
-  static const int64_t FOOTER_SIZE = 1024 * 100;
-  static_assert(FOOTER_SIZE <= READ_SIZE_MIN_VALUE,
-      "FOOTER_SIZE can not be greater than READ_SIZE_MIN_VALUE.\n"
-      "You can increase FOOTER_SIZE if you want, "
-      "just don't forget to increase READ_SIZE_MIN_VALUE as well.");
-
   /// Index of the current row group being processed. Initialized to -1 which indicates
   /// that we have not started processing the first row group yet (GetNext() has not yet
   /// been called).
@@ -391,41 +382,11 @@ class HdfsParquetScanner : public HdfsScanner {
   /// the scanner. Stored in 'obj_pool_'.
   vector<ScalarExprEvaluator*> min_max_conjunct_evals_;
 
-  /// Cached runtime filter contexts, one for each filter that applies to this column,
-  /// owned by instances of this class.
-  vector<const FilterContext*> filter_ctxs_;
-
-  struct LocalFilterStats {
-    /// Total number of rows to which each filter was applied
-    int64_t considered;
-
-    /// Total number of rows that each filter rejected.
-    int64_t rejected;
-
-    /// Total number of rows that each filter could have been applied to (if it were
-    /// available from row 0).
-    int64_t total_possible;
-
-    /// Use known-width type to act as logical boolean.  Set to 1 if corresponding filter
-    /// in filter_ctxs_ should be applied, 0 if it was ineffective and was disabled.
-    uint8_t enabled;
-
-    /// Padding to ensure structs do not straddle cache-line boundary.
-    uint8_t padding[7];
-
-    LocalFilterStats() : considered(0), rejected(0), total_possible(0), enabled(1) { }
-  };
-
   /// Pool used for allocating caches of definition/repetition levels and tuples for
   /// dictionary filtering. The definition/repetition levels are populated by the
   /// level readers. The pool is freed in Close().
   boost::scoped_ptr<MemPool> perm_pool_;
 
-  /// Track statistics of each filter (one for each filter in filter_ctxs_) per scanner so
-  /// that expensive aggregation up to the scan node can be performed once, during
-  /// Close().
-  vector<LocalFilterStats> filter_stats_;
-
   /// Number of scratch batches processed so far.
   int64_t row_batches_produced_;
 
@@ -511,10 +472,6 @@ class HdfsParquetScanner : public HdfsScanner {
   Status EvaluateStatsConjuncts(const parquet::FileMetaData& file_metadata,
       const parquet::RowGroup& row_group, bool* skip_row_group) WARN_UNUSED_RESULT;
 
-  /// Check runtime filters' effectiveness every BATCHES_PER_FILTER_SELECTIVITY_CHECK
-  /// row batches. Will update 'filter_stats_'.
-  void CheckFiltersEffectiveness();
-
   /// Advances 'row_group_idx_' to the next non-empty row group and initializes
   /// the column readers to scan it. Recoverable errors are logged to the runtime
   /// state. Only returns a non-OK status if a non-recoverable error is encountered
@@ -548,24 +505,6 @@ class HdfsParquetScanner : public HdfsScanner {
   /// materialized tuples. This is a separate function so it can be codegened.
   int ProcessScratchBatch(RowBatch* dst_batch);
 
-  /// Evaluates 'row' against the i-th runtime filter for this scan node and returns
-  /// true if 'row' finds a match in the filter. Returns false otherwise.
-  bool EvalRuntimeFilter(int i, TupleRow* row);
-
-  /// Evaluates runtime filters (if any) against the given row. Returns true if
-  /// they passed, false otherwise. Maintains the runtime filter stats, determines
-  /// whether the filters are effective, and disables them if they are not. This is
-  /// replaced by generated code at runtime.
-  bool EvalRuntimeFilters(TupleRow* row);
-
-  /// Codegen EvalRuntimeFilters() by unrolling the loop in the interpreted version
-  /// and emitting a customized version of EvalRuntimeFilter() for each filter in
-  /// 'filter_ctxs'. Return error status on failure. The generated function is returned
-  /// via 'fn'.
-  static Status CodegenEvalRuntimeFilters(LlvmCodeGen* codegen,
-      const std::vector<ScalarExpr*>& filter_exprs, llvm::Function** fn)
-      WARN_UNUSED_RESULT;
-
   /// Reads data using 'column_readers' to materialize the tuples of a CollectionValue
   /// allocated from 'coll_value_builder'. Increases 'coll_items_read_counter_' by the
   /// number of items in this collection and descendant collections.
@@ -592,10 +531,6 @@ class HdfsParquetScanner : public HdfsScanner {
   inline bool ReadCollectionItem(const std::vector<ParquetColumnReader*>& column_readers,
       bool materialize_tuple, MemPool* pool, Tuple* tuple) const;
 
-  /// Find and return the last split in the file if it is assigned to this scan node.
-  /// Returns NULL otherwise.
-  static io::ScanRange* FindFooterSplit(HdfsFileDesc* file);
-
   /// Process the file footer and parse file_metadata_.  This should be called with the
   /// last FOOTER_SIZE bytes in context_.
   Status ProcessFooter() WARN_UNUSED_RESULT;

http://git-wip-us.apache.org/repos/asf/impala/blob/23743baa/be/src/exec/hdfs-scan-node-base.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-scan-node-base.cc b/be/src/exec/hdfs-scan-node-base.cc
index 80cb6c5..81642bf 100644
--- a/be/src/exec/hdfs-scan-node-base.cc
+++ b/be/src/exec/hdfs-scan-node-base.cc
@@ -23,6 +23,7 @@
 #include "exec/hdfs-rcfile-scanner.h"
 #include "exec/hdfs-avro-scanner.h"
 #include "exec/hdfs-parquet-scanner.h"
+#include "exec/hdfs-orc-scanner.h"
 
 #include <avro/errors.h>
 #include <avro/schema.h>
@@ -457,6 +458,8 @@ Status HdfsScanNodeBase::IssueInitialScanRanges(RuntimeState* state) {
   // Issue initial ranges for all file types.
   RETURN_IF_ERROR(HdfsParquetScanner::IssueInitialRanges(this,
       matching_per_type_files[THdfsFileFormat::PARQUET]));
+  RETURN_IF_ERROR(HdfsOrcScanner::IssueInitialRanges(this,
+      matching_per_type_files[THdfsFileFormat::ORC]));
   RETURN_IF_ERROR(HdfsTextScanner::IssueInitialRanges(this,
       matching_per_type_files[THdfsFileFormat::TEXT]));
   RETURN_IF_ERROR(BaseSequenceScanner::IssueInitialRanges(this,
@@ -588,6 +591,9 @@ Status HdfsScanNodeBase::CreateAndOpenScanner(HdfsPartitionDescriptor* partition
     case THdfsFileFormat::PARQUET:
       scanner->reset(new HdfsParquetScanner(this, runtime_state_));
       break;
+    case THdfsFileFormat::ORC:
+      scanner->reset(new HdfsOrcScanner(this, runtime_state_));
+      break;
     default:
       return Status(Substitute("Unknown Hdfs file format type: $0",
           partition->file_format()));

http://git-wip-us.apache.org/repos/asf/impala/blob/23743baa/be/src/exec/hdfs-scan-node-mt.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-scan-node-mt.cc b/be/src/exec/hdfs-scan-node-mt.cc
index 7ea4d80..d143e91 100644
--- a/be/src/exec/hdfs-scan-node-mt.cc
+++ b/be/src/exec/hdfs-scan-node-mt.cc
@@ -46,6 +46,7 @@ Status HdfsScanNodeMt::Prepare(RuntimeState* state) {
   // because the scanner of the corresponding file format does implement GetNext().
   for (const auto& files: per_type_files_) {
     if (!files.second.empty() && files.first != THdfsFileFormat::PARQUET
+        && files.first != THdfsFileFormat::ORC
         && files.first != THdfsFileFormat::TEXT) {
       stringstream msg;
       msg << "Unsupported file format with HdfsScanNodeMt: " << files.first;

http://git-wip-us.apache.org/repos/asf/impala/blob/23743baa/be/src/exec/hdfs-scanner-ir.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-scanner-ir.cc b/be/src/exec/hdfs-scanner-ir.cc
index ec1d2a3..0d34356 100644
--- a/be/src/exec/hdfs-scanner-ir.cc
+++ b/be/src/exec/hdfs-scanner-ir.cc
@@ -95,6 +95,20 @@ void StringToDecimalSymbolDummy() {
   StringToDecimal16(nullptr, 0, 0, 0, false, nullptr);
 }
 
+bool HdfsScanner::EvalRuntimeFilter(int i, TupleRow* row) {
+  LocalFilterStats* stats = &filter_stats_[i];
+  const FilterContext* ctx = filter_ctxs_[i];
+  ++stats->total_possible;
+  if (stats->enabled && ctx->filter->HasFilter()) {
+    ++stats->considered;
+    if (!ctx->Eval(row)) {
+      ++stats->rejected;
+      return false;
+    }
+  }
+  return true;
+}
+
 // Define the string parsing functions for llvm.  Stamp out the templated functions
 #ifdef IR_COMPILE
 using ParseResult = StringParser::ParseResult;

http://git-wip-us.apache.org/repos/asf/impala/blob/23743baa/be/src/exec/hdfs-scanner.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-scanner.cc b/be/src/exec/hdfs-scanner.cc
index 1809fe5..a4aee4d 100644
--- a/be/src/exec/hdfs-scanner.cc
+++ b/be/src/exec/hdfs-scanner.cc
@@ -35,10 +35,15 @@
 #include "common/names.h"
 
 using namespace impala;
+using namespace impala::io;
 using namespace strings;
 
+DEFINE_double(min_filter_reject_ratio, 0.1, "(Advanced) If the percentage of "
+    "rows rejected by a runtime filter drops below this value, the filter is disabled.");
+
 const char* FieldLocation::LLVM_CLASS_NAME = "struct.impala::FieldLocation";
 const char* HdfsScanner::LLVM_CLASS_NAME = "class.impala::HdfsScanner";
+const int64_t HdfsScanner::FOOTER_SIZE;
 
 HdfsScanner::HdfsScanner(HdfsScanNodeBase* scan_node, RuntimeState* state)
     : scan_node_(scan_node),
@@ -587,6 +592,96 @@ Status HdfsScanner::CodegenInitTuple(
   return Status::OK();
 }
 
+// ; Function Attrs: noinline
+// define i1 @EvalRuntimeFilters(%"class.impala::HdfsScanner"* %this,
+//                               %"class.impala::TupleRow"* %row) #34 {
+// entry:
+//   %0 = call i1 @_ZN6impala11HdfsScanner17EvalRuntimeFilterEiPNS_8TupleRowE.2(
+//       %"class.impala::HdfsScanner"* %this, i32 0, %"class.impala::TupleRow"*
+//       %row)
+//   br i1 %0, label %continue, label %bail_out
+//
+// bail_out:                                         ; preds = %entry
+//   ret i1 false
+//
+// continue:                                         ; preds = %entry
+//   ret i1 true
+// }
+//
+// EvalRuntimeFilter() is the same as the cross-compiled version except EvalOneFilter()
+// is replaced with the one generated by CodegenEvalOneFilter().
+Status HdfsScanner::CodegenEvalRuntimeFilters(
+    LlvmCodeGen* codegen, const vector<ScalarExpr*>& filter_exprs, llvm::Function** fn) {
+  llvm::LLVMContext& context = codegen->context();
+  LlvmBuilder builder(context);
+
+  *fn = nullptr;
+  llvm::Type* this_type = codegen->GetStructPtrType<HdfsScanner>();
+  llvm::PointerType* tuple_row_ptr_type = codegen->GetStructPtrType<TupleRow>();
+  LlvmCodeGen::FnPrototype prototype(codegen, "EvalRuntimeFilters",
+                                     codegen->bool_type());
+  prototype.AddArgument(LlvmCodeGen::NamedVariable("this", this_type));
+  prototype.AddArgument(LlvmCodeGen::NamedVariable("row", tuple_row_ptr_type));
+
+  llvm::Value* args[2];
+  llvm::Function* eval_runtime_filters_fn = prototype.GeneratePrototype(&builder, args);
+  llvm::Value* this_arg = args[0];
+  llvm::Value* row_arg = args[1];
+
+  int num_filters = filter_exprs.size();
+  if (num_filters == 0) {
+    builder.CreateRet(codegen->true_value());
+  } else {
+    // row_rejected_block: jump target for when a filter is evaluated to false.
+    llvm::BasicBlock* row_rejected_block =
+        llvm::BasicBlock::Create(context, "row_rejected", eval_runtime_filters_fn);
+
+    DCHECK_GT(num_filters, 0);
+    for (int i = 0; i < num_filters; ++i) {
+      llvm::Function* eval_runtime_filter_fn =
+          codegen->GetFunction(IRFunction::HDFS_SCANNER_EVAL_RUNTIME_FILTER, true);
+      DCHECK(eval_runtime_filter_fn != nullptr);
+
+      // Codegen function for inlining filter's expression evaluation and constant fold
+      // the type of the expression into the hashing function to avoid branches.
+      llvm::Function* eval_one_filter_fn;
+      DCHECK(filter_exprs[i] != nullptr);
+      RETURN_IF_ERROR(FilterContext::CodegenEval(codegen, filter_exprs[i],
+          &eval_one_filter_fn));
+      DCHECK(eval_one_filter_fn != nullptr);
+
+      int replaced = codegen->ReplaceCallSites(eval_runtime_filter_fn, eval_one_filter_fn,
+          "FilterContext4Eval");
+      DCHECK_EQ(replaced, 1);
+
+      llvm::Value* idx = codegen->GetI32Constant(i);
+      llvm::Value* passed_filter = builder.CreateCall(
+          eval_runtime_filter_fn, llvm::ArrayRef<llvm::Value*>({this_arg, idx, row_arg}));
+
+      llvm::BasicBlock* continue_block =
+          llvm::BasicBlock::Create(context, "continue", eval_runtime_filters_fn);
+      builder.CreateCondBr(passed_filter, continue_block, row_rejected_block);
+      builder.SetInsertPoint(continue_block);
+    }
+    builder.CreateRet(codegen->true_value());
+
+    builder.SetInsertPoint(row_rejected_block);
+    builder.CreateRet(codegen->false_value());
+
+    // Don't inline this function to avoid code bloat in ProcessScratchBatch().
+    // If there is any filter, EvalRuntimeFilters() is large enough to not benefit
+    // much from inlining.
+    eval_runtime_filters_fn->addFnAttr(llvm::Attribute::NoInline);
+  }
+
+  *fn = codegen->FinalizeFunction(eval_runtime_filters_fn);
+  if (*fn == nullptr) {
+    return Status("Codegen'd HdfsScanner::EvalRuntimeFilters() failed "
+        "verification, see log");
+  }
+  return Status::OK();
+}
+
 Status HdfsScanner::UpdateDecompressor(const THdfsCompression::type& compression) {
   // Check whether the file in the stream has different compression from the last one.
   if (compression != decompression_type_) {
@@ -671,3 +766,87 @@ void HdfsScanner::ReportColumnParseError(const SlotDescriptor* desc,
     if (state_->abort_on_error() && parse_status_.ok()) parse_status_ = Status(ss.str());
   }
 }
+
+void HdfsScanner::CheckFiltersEffectiveness() {
+  for (int i = 0; i < filter_stats_.size(); ++i) {
+    LocalFilterStats* stats = &filter_stats_[i];
+    const RuntimeFilter* filter = filter_ctxs_[i]->filter;
+    double reject_ratio = stats->rejected / static_cast<double>(stats->considered);
+    if (filter->AlwaysTrue() ||
+        reject_ratio < FLAGS_min_filter_reject_ratio) {
+      stats->enabled = 0;
+    }
+  }
+}
+
+Status HdfsScanner::IssueFooterRanges(HdfsScanNodeBase* scan_node,
+    const THdfsFileFormat::type& file_type, const vector<HdfsFileDesc*>& files) {
+  vector<ScanRange*> footer_ranges;
+  for (int i = 0; i < files.size(); ++i) {
+    // Compute the offset of the file footer.
+    int64_t footer_size = min(FOOTER_SIZE, files[i]->file_length);
+    int64_t footer_start = files[i]->file_length - footer_size;
+    DCHECK_GE(footer_start, 0);
+
+    // Try to find the split with the footer.
+    ScanRange* footer_split = FindFooterSplit(files[i]);
+
+    for (int j = 0; j < files[i]->splits.size(); ++j) {
+      ScanRange* split = files[i]->splits[j];
+
+      DCHECK_LE(split->offset() + split->len(), files[i]->file_length);
+      // If there are no materialized slots (such as count(*) over the table), we can
+      // get the result with the file metadata alone and don't need to read any row
+      // groups. We only want a single node to process the file footer in this case,
+      // which is the node with the footer split.  If it's not a count(*), we create a
+      // footer range for the split always.
+      if (!scan_node->IsZeroSlotTableScan() || footer_split == split) {
+        ScanRangeMetadata* split_metadata =
+            static_cast<ScanRangeMetadata*>(split->meta_data());
+        // Each split is processed by first issuing a scan range for the file footer, which
+        // is done here, followed by scan ranges for the columns of each row group within
+        // the actual split (in InitColumns()). The original split is stored in the
+        // metadata associated with the footer range.
+        ScanRange* footer_range;
+        if (footer_split != nullptr) {
+          footer_range = scan_node->AllocateScanRange(files[i]->fs,
+              files[i]->filename.c_str(), footer_size, footer_start,
+              split_metadata->partition_id, footer_split->disk_id(),
+              footer_split->expected_local(),
+              BufferOpts(footer_split->try_cache(), files[i]->mtime), split);
+        } else {
+          // If we did not find the last split, we know it is going to be a remote read.
+          footer_range =
+              scan_node->AllocateScanRange(files[i]->fs, files[i]->filename.c_str(),
+                   footer_size, footer_start, split_metadata->partition_id, -1, false,
+                   BufferOpts::Uncached(), split);
+        }
+
+        footer_ranges.push_back(footer_range);
+      } else {
+        scan_node->RangeComplete(file_type, THdfsCompression::NONE);
+      }
+    }
+  }
+  // The threads that process the footer will also do the scan, so we mark all the files
+  // as complete here.
+  RETURN_IF_ERROR(scan_node->AddDiskIoRanges(footer_ranges, files.size()));
+  return Status::OK();
+}
+
+ScanRange* HdfsScanner::FindFooterSplit(HdfsFileDesc* file) {
+  DCHECK(file != nullptr);
+  for (int i = 0; i < file->splits.size(); ++i) {
+    ScanRange* split = file->splits[i];
+    if (split->offset() + split->len() == file->file_length) return split;
+  }
+  return nullptr;
+}
+
+bool HdfsScanner::EvalRuntimeFilters(TupleRow* row) {
+  int num_filters = filter_ctxs_.size();
+  for (int i = 0; i < num_filters; ++i) {
+    if (!EvalRuntimeFilter(i, row)) return false;
+  }
+  return true;
+}
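
The pieces hoisted into HdfsScanner above (LocalFilterStats, EvalRuntimeFilter(),
EvalRuntimeFilters(), CheckFiltersEffectiveness()) are driven per row and per row batch by
the individual scanners, but those call sites are outside this hunk. The following
self-contained C++ sketch shows that intended usage pattern under stated assumptions:
every filter is evaluated per row, and every BATCHES_PER_FILTER_SELECTIVITY_CHECK batches
any filter whose reject ratio has fallen below the threshold is disabled. The names
ToyFilter, ToyStats, kBatchesPerCheck and kMinRejectRatio are invented for the example;
only the pattern mirrors the code above.

#include <cstdint>
#include <iostream>
#include <vector>

// Toy stand-ins, invented for this sketch, for a runtime filter and its local stats.
struct ToyFilter { int threshold; };  // "rejects" any value below 'threshold'
struct ToyStats { int64_t considered = 0; int64_t rejected = 0; bool enabled = true; };

constexpr int kBatchesPerCheck = 16;     // mirrors BATCHES_PER_FILTER_SELECTIVITY_CHECK
constexpr double kMinRejectRatio = 0.1;  // mirrors --min_filter_reject_ratio

// Per-row evaluation: returns false as soon as one enabled filter rejects the row.
bool EvalFilters(int row, const std::vector<ToyFilter>& filters,
    std::vector<ToyStats>* stats) {
  for (size_t i = 0; i < filters.size(); ++i) {
    ToyStats* s = &(*stats)[i];
    if (!s->enabled) continue;
    ++s->considered;
    if (row < filters[i].threshold) { ++s->rejected; return false; }
  }
  return true;
}

// Per-batch check: disable filters that reject too small a fraction of rows.
void CheckEffectiveness(std::vector<ToyStats>* stats) {
  for (ToyStats& s : *stats) {
    double reject_ratio =
        s.considered == 0 ? 0.0 : s.rejected / static_cast<double>(s.considered);
    if (reject_ratio < kMinRejectRatio) s.enabled = false;
  }
}

int main() {
  std::vector<ToyFilter> filters = {{1}};  // rejects almost nothing, so it gets disabled
  std::vector<ToyStats> stats(filters.size());
  int64_t batches = 0;
  for (int batch = 0; batch < 64; ++batch) {
    for (int row = 0; row < 100; ++row) EvalFilters(row, filters, &stats);
    // Cheap power-of-two modulus, as in the scanners: check every 16th batch.
    if ((++batches & (kBatchesPerCheck - 1)) == 0) CheckEffectiveness(&stats);
  }
  std::cout << "filter still enabled after 64 batches: " << std::boolalpha
            << stats[0].enabled << std::endl;  // prints: false
  return 0;
}

Keeping the statistics per scanner, as filter_stats_ does, means the expensive aggregation
up to the scan node only has to happen once, during Close(), exactly as the comment in
hdfs-scanner.h notes.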

http://git-wip-us.apache.org/repos/asf/impala/blob/23743baa/be/src/exec/hdfs-scanner.h
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-scanner.h b/be/src/exec/hdfs-scanner.h
index 6497457..73b8b70 100644
--- a/be/src/exec/hdfs-scanner.h
+++ b/be/src/exec/hdfs-scanner.h
@@ -25,6 +25,7 @@
 #include <boost/scoped_ptr.hpp>
 
 #include "codegen/impala-ir.h"
+#include "common/global-flags.h"
 #include "common/object-pool.h"
 #include "common/status.h"
 #include "exec/hdfs-scan-node-base.h"
@@ -44,6 +45,12 @@ class TextConverter;
 class TupleDescriptor;
 class SlotDescriptor;
 
+// The number of row batches between checks to see if a filter is effective, and
+// should be disabled. Must be a power of two.
+constexpr int BATCHES_PER_FILTER_SELECTIVITY_CHECK = 16;
+static_assert(BitUtil::IsPowerOf2(BATCHES_PER_FILTER_SELECTIVITY_CHECK),
+              "BATCHES_PER_FILTER_SELECTIVITY_CHECK must be a power of two");
+
 /// Intermediate structure used for two pass parsing approach. In the first pass,
 /// the FieldLocation structs are filled out and contain where all the fields start and
 /// their lengths.  In the second pass, the FieldLocation is used to write out the
@@ -287,6 +294,67 @@ class HdfsScanner {
   /// Jitted write tuples function pointer.  Null if codegen is disabled.
   WriteTuplesFn write_tuples_fn_ = nullptr;
 
+  struct LocalFilterStats {
+    /// Total number of rows to which each filter was applied
+    int64_t considered;
+
+    /// Total number of rows that each filter rejected.
+    int64_t rejected;
+
+    /// Total number of rows that each filter could have been applied to (if it were
+    /// available from row 0).
+    int64_t total_possible;
+
+    /// Use known-width type to act as logical boolean.  Set to 1 if corresponding filter
+    /// in filter_ctxs_ should be applied, 0 if it was ineffective and was disabled.
+    uint8_t enabled;
+
+    /// Padding to ensure structs do not straddle cache-line boundary.
+    uint8_t padding[7];
+
+    LocalFilterStats() : considered(0), rejected(0), total_possible(0), enabled(1) { }
+  };
+
+  /// Cached runtime filter contexts, one for each filter that applies to this column.
+  vector<const FilterContext *> filter_ctxs_;
+
+  /// Track statistics of each filter (one for each filter in filter_ctxs_) per scanner
+  /// so that expensive aggregation up to the scan node can be performed once, during
+  /// Close().
+  vector<LocalFilterStats> filter_stats_;
+
+  /// Size of the file footer for ORC and Parquet. This is a guess. If this value is too
+  /// small, we will need to issue another read.
+  static const int64_t FOOTER_SIZE = 1024 * 100;
+  static_assert(FOOTER_SIZE <= READ_SIZE_MIN_VALUE,
+      "FOOTER_SIZE can not be greater than READ_SIZE_MIN_VALUE.\n"
+      "You can increase FOOTER_SIZE if you want, "
+      "just don't forget to increase READ_SIZE_MIN_VALUE as well.");
+
+  /// Check runtime filters' effectiveness every BATCHES_PER_FILTER_SELECTIVITY_CHECK
+  /// row batches. Will update 'filter_stats_'.
+  void CheckFiltersEffectiveness();
+
+  /// Evaluates 'row' against the i-th runtime filter for this scan node and returns
+  /// true if 'row' finds a match in the filter. Returns false otherwise.
+  bool EvalRuntimeFilter(int i, TupleRow* row);
+
+  /// Evaluates runtime filters (if any) against the given row. Returns true if
+  /// they passed, false otherwise. Maintains the runtime filter stats, determines
+  /// whether the filters are effective, and disables them if they are not. This is
+  /// replaced by generated code at runtime.
+  bool EvalRuntimeFilters(TupleRow* row);
+
+  /// Find and return the last split in the file if it is assigned to this scan node.
+  /// Returns NULL otherwise.
+  static io::ScanRange* FindFooterSplit(HdfsFileDesc* file);
+
+  /// Issues just the footer range for each file. This function is only used by the
+  /// Parquet and ORC scanners. We then parse the footer and pick out the columns we want.
+  static Status IssueFooterRanges(HdfsScanNodeBase* scan_node,
+      const THdfsFileFormat::type& file_type, const std::vector<HdfsFileDesc*>& files)
+      WARN_UNUSED_RESULT;
+
   /// Implements GetNext(). Should be overridden by subclasses.
   /// Only valid to call if the parent scan node is multi-threaded.
   virtual Status GetNextInternal(RowBatch* row_batch) WARN_UNUSED_RESULT {
@@ -420,6 +488,14 @@ class HdfsScanner {
   static Status CodegenInitTuple(
       const HdfsScanNodeBase* node, LlvmCodeGen* codegen, llvm::Function** init_tuple_fn);
 
+  /// Codegen EvalRuntimeFilters() by unrolling the loop in the interpreted version
+  /// and emitting a customized version of EvalRuntimeFilter() for each filter in
+  /// 'filter_ctxs'. Return error status on failure. The generated function is returned
+  /// via 'fn'.
+  static Status CodegenEvalRuntimeFilters(LlvmCodeGen* codegen,
+      const std::vector<ScalarExpr*>& filter_exprs, llvm::Function** fn)
+      WARN_UNUSED_RESULT;
+
   /// Report parse error for column @ desc.   If abort_on_error is true, sets
   /// parse_status_ to the error message.
   void ReportColumnParseError(const SlotDescriptor* desc, const char* data, int len);

http://git-wip-us.apache.org/repos/asf/impala/blob/23743baa/be/src/util/backend-gflag-util.cc
----------------------------------------------------------------------
diff --git a/be/src/util/backend-gflag-util.cc b/be/src/util/backend-gflag-util.cc
index 0bbaa89..02e1ed8 100644
--- a/be/src/util/backend-gflag-util.cc
+++ b/be/src/util/backend-gflag-util.cc
@@ -27,6 +27,7 @@
 DECLARE_bool(load_catalog_in_background);
 DECLARE_bool(load_auth_to_local_rules);
 DECLARE_bool(enable_stats_extrapolation);
+DECLARE_bool(enable_orc_scanner);
 DECLARE_int32(non_impala_java_vlog);
 DECLARE_int32(num_metadata_loading_threads);
 DECLARE_int32(max_hdfs_partitions_parallel_load);
@@ -56,6 +57,7 @@ Status GetThriftBackendGflags(JNIEnv* jni_env, jbyteArray* cfg_bytes) {
   TBackendGflags cfg;
   cfg.__set_authorization_policy_file(FLAGS_authorization_policy_file);
   cfg.__set_load_catalog_in_background(FLAGS_load_catalog_in_background);
+  cfg.__set_enable_orc_scanner(FLAGS_enable_orc_scanner);
   cfg.__set_server_name(FLAGS_server_name);
   cfg.__set_sentry_config(FLAGS_sentry_config);
   cfg.__set_authorization_policy_provider_class(

http://git-wip-us.apache.org/repos/asf/impala/blob/23743baa/bin/bootstrap_toolchain.py
----------------------------------------------------------------------
diff --git a/bin/bootstrap_toolchain.py b/bin/bootstrap_toolchain.py
index a09c905..f54bf04 100755
--- a/bin/bootstrap_toolchain.py
+++ b/bin/bootstrap_toolchain.py
@@ -429,7 +429,7 @@ if __name__ == "__main__":
   packages = map(Package, ["llvm", "kudu",
       "avro", "binutils", "boost", "breakpad", "bzip2", "cmake", "crcutil",
       "flatbuffers", "gcc", "gflags", "glog", "gperftools", "gtest", "libev",
-      "lz4", "openldap", "openssl", "protobuf",
+      "lz4", "openldap", "openssl", "orc", "protobuf",
       "rapidjson", "re2", "snappy", "thrift", "tpc-h", "tpc-ds", "zlib"])
   packages.insert(0, Package("llvm", "5.0.1-asserts"))
   bootstrap(toolchain_root, packages)

http://git-wip-us.apache.org/repos/asf/impala/blob/23743baa/bin/impala-config.sh
----------------------------------------------------------------------
diff --git a/bin/impala-config.sh b/bin/impala-config.sh
index b2d9d15..f113e53 100755
--- a/bin/impala-config.sh
+++ b/bin/impala-config.sh
@@ -127,6 +127,8 @@ export IMPALA_OPENLDAP_VERSION=2.4.25
 unset IMPALA_OPENLDAP_URL
 export IMPALA_OPENSSL_VERSION=1.0.2l
 unset IMPALA_OPENSSL_URL
+export IMPALA_ORC_VERSION=1.4.3-p2
+unset IMPALA_ORC_URL
 export IMPALA_PROTOBUF_VERSION=2.6.1
 unset IMPALA_PROTOBUF_URL
 export IMPALA_POSTGRES_JDBC_DRIVER_VERSION=9.0-801

http://git-wip-us.apache.org/repos/asf/impala/blob/23743baa/cmake_modules/FindOrc.cmake
----------------------------------------------------------------------
diff --git a/cmake_modules/FindOrc.cmake b/cmake_modules/FindOrc.cmake
new file mode 100644
index 0000000..ef06396
--- /dev/null
+++ b/cmake_modules/FindOrc.cmake
@@ -0,0 +1,55 @@
+##############################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+##############################################################################
+
+# - Find Orc (headers and liborc.a) with ORC_ROOT hinting a location
+# This module defines
+#  ORC_INCLUDE_DIR, directory containing headers
+#  ORC_STATIC_LIB, path to liborc.a
+#  ORC_FOUND
+set(ORC_SEARCH_HEADER_PATHS
+  ${ORC_ROOT}/include
+  $ENV{IMPALA_HOME}/thirdparty/orc-$ENV{IMPALA_ORC_VERSION}/build/include)
+
+set(ORC_SEARCH_LIB_PATH
+  ${ORC_ROOT}/lib
+  $ENV{IMPALA_HOME}/thirdparty/orc-$ENV{IMPALA_ORC_VERSION}/build/lib)
+
+find_path(ORC_INCLUDE_DIR NAMES orc/OrcFile.hh OrcFile.hh PATHS
+  ${ORC_SEARCH_HEADER_PATHS}
+  # make sure we don't accidentally pick up a different version
+  NO_DEFAULT_PATH)
+
+find_library(ORC_STATIC_LIB NAMES liborc.a PATHS ${ORC_SEARCH_LIB_PATH})
+
+if(NOT ORC_STATIC_LIB)
+  message(FATAL_ERROR "ORC includes and libraries NOT found. "
+    "Looked for headers in ${ORC_SEARCH_HEADER_PATHS}, "
+    "and for libs in ${ORC_SEARCH_LIB_PATH}")
+  set(ORC_FOUND FALSE)
+else()
+  set(ORC_FOUND TRUE)
+endif ()
+
+set(ORC_FOUND ${ORC_STATIC_LIB_FOUND})
+
+mark_as_advanced(
+  ORC_INCLUDE_DIR
+  ORC_STATIC_LIB
+  ORC_FOUND
+)

http://git-wip-us.apache.org/repos/asf/impala/blob/23743baa/common/thrift/BackendGflags.thrift
----------------------------------------------------------------------
diff --git a/common/thrift/BackendGflags.thrift b/common/thrift/BackendGflags.thrift
index 412ca06..c98f50a 100644
--- a/common/thrift/BackendGflags.thrift
+++ b/common/thrift/BackendGflags.thrift
@@ -73,4 +73,6 @@ struct TBackendGflags {
   23: required double max_filter_error_rate
 
   24: required i64 min_buffer_size
+
+  25: required bool enable_orc_scanner
 }


[03/11] impala git commit: IMPALA-6842: [DOCS] Remove disable_admission_control startup flag

Posted by bh...@apache.org.
IMPALA-6842: [DOCS] Remove disable_admission_control startup flag

Change-Id: Idbd15823308dbce5d2d00e79607e5ebbdab3e38f
Reviewed-on: http://gerrit.cloudera.org:8080/10046
Reviewed-by: Alex Rodoni <ar...@cloudera.com>
Tested-by: Impala Public Jenkins <im...@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/1c896eff
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/1c896eff
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/1c896eff

Branch: refs/heads/2.x
Commit: 1c896effc6d9530e7752b07b8aa62f4aa5dfaf39
Parents: e94dfe4
Author: Bikramjeet Vig <bi...@cloudera.com>
Authored: Thu Apr 12 11:26:05 2018 -0700
Committer: Impala Public Jenkins <im...@gerrit.cloudera.org>
Committed: Fri Apr 13 03:26:26 2018 +0000

----------------------------------------------------------------------
 docs/topics/impala_admission.xml | 16 ----------------
 1 file changed, 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/impala/blob/1c896eff/docs/topics/impala_admission.xml
----------------------------------------------------------------------
diff --git a/docs/topics/impala_admission.xml b/docs/topics/impala_admission.xml
index f1359cf..5de246b 100644
--- a/docs/topics/impala_admission.xml
+++ b/docs/topics/impala_admission.xml
@@ -484,22 +484,6 @@ under the License.
                 <codeph>""</codeph> (empty string, meaning unlimited) </p>
             </dd>
           </dlentry>
-          <dlentry id="disable_admission_control">
-            <dt>
-              <codeph>disable_admission_control</codeph>
-            </dt>
-            <dd>
-              <indexterm audience="hidden">--disable_admission_control</indexterm>
-              <b>Purpose:</b> Turns off the admission control feature entirely,
-              regardless of other configuration option settings.
-              <p>
-                <b>Type:</b> Boolean </p>
-              <p>
-                <b>Default:</b>
-                <codeph>false</codeph>
-              </p>
-            </dd>
-          </dlentry>
           <dlentry id="disable_pool_max_requests">
             <dt>
               <codeph>disable_pool_max_requests</codeph>


[09/11] impala git commit: IMPALA-2717: fix output of formatted unicode to non-TTY

Posted by bh...@apache.org.
IMPALA-2717: fix output of formatted unicode to non-TTY

The bug is that PrettyOutputFormatter.format() returned a unicode
object, and Python cannot automatically write unicode objects to
output streams where there is no default encoding.

The fix is to convert the result to a regular string encoded as UTF-8,
which can be written to any output device. This makes the return type
consistent with DelimitedOutputFormatter.format().

Based on code by Marcell Szabo.

Testing:
Added a basic test.

Played around in an interactive shell to make sure that unicode
characters still work in interactive mode.

Change-Id: I9de641ecf767a2feef3b9f48b344ef2d55e17a7f
Reviewed-on: http://gerrit.cloudera.org:8080/9928
Reviewed-by: Tim Armstrong <ta...@cloudera.com>
Tested-by: Impala Public Jenkins <im...@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/371107ab
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/371107ab
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/371107ab

Branch: refs/heads/2.x
Commit: 371107abd66fd9f519c49559a850d10245958a2d
Parents: 1c896ef
Author: Tim Armstrong <ta...@cloudera.com>
Authored: Wed Apr 4 11:51:51 2018 -0700
Committer: Impala Public Jenkins <im...@gerrit.cloudera.org>
Committed: Fri Apr 13 03:26:26 2018 +0000

----------------------------------------------------------------------
 shell/impala_shell.py                 |  4 +++-
 shell/shell_output.py                 |  8 +++++++-
 tests/shell/test_shell_commandline.py | 25 +++++++++++++++++++++----
 3 files changed, 31 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/impala/blob/371107ab/shell/impala_shell.py
----------------------------------------------------------------------
diff --git a/shell/impala_shell.py b/shell/impala_shell.py
index 67391bf..a2dd89f 100755
--- a/shell/impala_shell.py
+++ b/shell/impala_shell.py
@@ -70,7 +70,9 @@ class CmdStatus:
   ERROR = False
 
 class ImpalaPrettyTable(prettytable.PrettyTable):
-  """Patched version of PrettyTable that TODO"""
+  """Patched version of PrettyTable with different unicode handling - instead of throwing
+  exceptions when a character can't be converted to unicode, it is replaced with a
+  placeholder character."""
   def _unicode(self, value):
     if not isinstance(value, basestring):
       value = str(value)

http://git-wip-us.apache.org/repos/asf/impala/blob/371107ab/shell/shell_output.py
----------------------------------------------------------------------
diff --git a/shell/shell_output.py b/shell/shell_output.py
index f0cecc8..8ab3bee 100644
--- a/shell/shell_output.py
+++ b/shell/shell_output.py
@@ -28,11 +28,16 @@ class PrettyOutputFormatter(object):
     self.prettytable = prettytable
 
   def format(self, rows):
+    """Returns string containing UTF-8-encoded representation of the table data."""
     # Clear rows that already exist in the table.
     self.prettytable.clear_rows()
     try:
       map(self.prettytable.add_row, rows)
-      return self.prettytable.get_string()
+      # PrettyTable.get_string() converts UTF-8-encoded strs added via add_row() into
+      # Python unicode strings. We need to convert it back to a UTF-8-encoded str for
+      # output, since Python won't do the encoding automatically when outputting to a
+      # non-terminal (see IMPALA-2717).
+      return self.prettytable.get_string().encode('utf-8')
     except Exception, e:
       # beeswax returns each row as a tab separated string. If a string column
       # value in a row has tabs, it will break the row split. Default to displaying
@@ -53,6 +58,7 @@ class DelimitedOutputFormatter(object):
         raise ValueError, error_msg
 
   def format(self, rows):
+    """Returns string containing UTF-8-encoded representation of the table data."""
     # csv.writer expects a file handle to the input.
     # cStringIO is used as the temporary buffer.
     temp_buffer = StringIO()

http://git-wip-us.apache.org/repos/asf/impala/blob/371107ab/tests/shell/test_shell_commandline.py
----------------------------------------------------------------------
diff --git a/tests/shell/test_shell_commandline.py b/tests/shell/test_shell_commandline.py
index f5f67c0..10513b6 100644
--- a/tests/shell/test_shell_commandline.py
+++ b/tests/shell/test_shell_commandline.py
@@ -33,6 +33,8 @@ from util import assert_var_substitution, run_impala_shell_cmd, ImpalaShell
 DEFAULT_QUERY = 'select 1'
 QUERY_FILE_PATH = os.path.join(os.environ['IMPALA_HOME'], 'tests', 'shell')
 
+RUSSIAN_CHARS = (u"А, Б, В, Г, Д, Е, Ё, Ж, З, И, Й, К, Л, М, Н, О, П, Р,"
+                 u"С, Т, У, Ф, Х, Ц,Ч, Ш, Щ, Ъ, Ы, Ь, Э, Ю, Я")
 
 @pytest.fixture
 def empty_table(unique_database, request):
@@ -406,12 +408,27 @@ class TestImpalaShell(ImpalaTestSuite):
 
   def test_international_characters(self):
     """Sanity test to ensure that the shell can read international characters."""
-    russian_chars = (u"А, Б, В, Г, Д, Е, Ё, Ж, З, И, Й, К, Л, М, Н, О, П, Р,"
-                     u"С, Т, У, Ф, Х, Ц,Ч, Ш, Щ, Ъ, Ы, Ь, Э, Ю, Я")
-    args = """-B -q "select '%s'" """ % russian_chars
+    args = """-B -q "select '%s'" """ % RUSSIAN_CHARS
     result = run_impala_shell_cmd(args.encode('utf-8'))
     assert 'UnicodeDecodeError' not in result.stderr
-    assert russian_chars.encode('utf-8') in result.stdout
+    assert RUSSIAN_CHARS.encode('utf-8') in result.stdout
+
+  def test_international_characters_prettyprint(self):
+    """IMPALA-2717: ensure we can handle international characters in pretty-printed
+    output"""
+    args = """-q "select '%s'" """ % RUSSIAN_CHARS
+    result = run_impala_shell_cmd(args.encode('utf-8'))
+    assert 'UnicodeDecodeError' not in result.stderr
+    assert RUSSIAN_CHARS.encode('utf-8') in result.stdout
+
+  def test_international_characters_prettyprint_tabs(self):
+    """IMPALA-2717: ensure we can handle international characters in pretty-printed
+    output when pretty-printing falls back to delimited output."""
+    args = """-q "select '%s\\t'" """ % RUSSIAN_CHARS
+    result = run_impala_shell_cmd(args.encode('utf-8'))
+    assert 'Reverting to tab delimited text' in result.stderr
+    assert 'UnicodeDecodeError' not in result.stderr
+    assert RUSSIAN_CHARS.encode('utf-8') in result.stdout
 
   @pytest.mark.execute_serially  # This tests invalidates metadata, and must run serially
   def test_config_file(self):


[05/11] impala git commit: IMPALA-4631: loosen monotonic clock DCHECK

Posted by bh...@apache.org.
IMPALA-4631: loosen monotonic clock DCHECK

We saw another build failure due to hitting one of these DCHECKs.
Let's further loosen the check to avoid such failures on misbehaving
systems.

Change-Id: I72d314518087aede16e8d702c2f904b679a55f6d
Reviewed-on: http://gerrit.cloudera.org:8080/10026
Tested-by: Impala Public Jenkins <im...@cloudera.com>
Reviewed-by: Tim Armstrong <ta...@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/22750d45
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/22750d45
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/22750d45

Branch: refs/heads/2.x
Commit: 22750d4561193cf28d8de28ce7c20fceca61f205
Parents: 08a01b9
Author: Tim Armstrong <ta...@cloudera.com>
Authored: Wed Apr 11 15:43:51 2018 -0700
Committer: Impala Public Jenkins <im...@gerrit.cloudera.org>
Committed: Fri Apr 13 03:26:26 2018 +0000

----------------------------------------------------------------------
 be/src/runtime/fragment-instance-state.cc | 7 ++++---
 be/src/util/runtime-profile-counters.h    | 6 +++---
 2 files changed, 7 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/impala/blob/22750d45/be/src/runtime/fragment-instance-state.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/fragment-instance-state.cc b/be/src/runtime/fragment-instance-state.cc
index acbce84..7322519 100644
--- a/be/src/runtime/fragment-instance-state.cc
+++ b/be/src/runtime/fragment-instance-state.cc
@@ -320,9 +320,10 @@ void FragmentInstanceState::Close() {
       RuntimeProfile::Counter* counter = timings_profile_->GetCounter(name);
       if (counter != nullptr) other_time += counter->value();
     }
-    // TODO: IMPALA-4631: Occasionally we see other_time = total_time + 1 for some reason
-    // we don't yet understand, so add 1 to total_time to avoid DCHECKing in that case.
-    DCHECK_LE(other_time, total_time + 1);
+    // TODO: IMPALA-4631: Occasionally we see other_time = total_time + ε where ε is 1,
+    // 2, or 3. It appears to be a bug with clocks on some virtualized systems. Add 3
+    // to total_time to avoid DCHECKing in that case.
+    DCHECK_LE(other_time, total_time + 3);
   }
 #endif
 }

http://git-wip-us.apache.org/repos/asf/impala/blob/22750d45/be/src/util/runtime-profile-counters.h
----------------------------------------------------------------------
diff --git a/be/src/util/runtime-profile-counters.h b/be/src/util/runtime-profile-counters.h
index 52410b5..de29318 100644
--- a/be/src/util/runtime-profile-counters.h
+++ b/be/src/util/runtime-profile-counters.h
@@ -303,9 +303,9 @@ class RuntimeProfile::EventSequence {
   void Start(int64_t start_time_ns) {
     offset_ = MonotonicStopWatch::Now() - start_time_ns;
     // TODO: IMPALA-4631: Occasionally we see MonotonicStopWatch::Now() return
-    // (start_time_ns - 1), even though 'start_time_ns' was obtained using
-    // MonotonicStopWatch::Now().
-    DCHECK_GE(offset_, -1);
+    // (start_time_ns - e), where e is 1, 2 or 3 even though 'start_time_ns' was
+    // obtained using MonotonicStopWatch::Now().
+    DCHECK_GE(offset_, -3);
     sw_.Start();
   }
 


[04/11] impala git commit: IMPALA-6623: [DOCS] ltrim and rtrim docs updated

Posted by bh...@apache.org.
IMPALA-6623: [DOCS] ltrim and rtrim docs updated

Change-Id: If4f7a04e3c64eade7a23cded21de5ff91c9c8c8c
Reviewed-on: http://gerrit.cloudera.org:8080/9984
Reviewed-by: Zoram Thanga <zo...@cloudera.com>
Reviewed-by: Alex Rodoni <ar...@cloudera.com>
Tested-by: Impala Public Jenkins <im...@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/4510512c
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/4510512c
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/4510512c

Branch: refs/heads/2.x
Commit: 4510512c8c172cddde99939123740085cb351e12
Parents: 4de7dbd
Author: Alex Rodoni <ar...@cloudera.com>
Authored: Tue Apr 10 17:00:13 2018 -0700
Committer: Impala Public Jenkins <im...@gerrit.cloudera.org>
Committed: Fri Apr 13 03:26:26 2018 +0000

----------------------------------------------------------------------
 docs/topics/impala_string_functions.xml | 12 ++++++++----
 1 file changed, 8 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/impala/blob/4510512c/docs/topics/impala_string_functions.xml
----------------------------------------------------------------------
diff --git a/docs/topics/impala_string_functions.xml b/docs/topics/impala_string_functions.xml
index fe08c8e..9ee2665 100644
--- a/docs/topics/impala_string_functions.xml
+++ b/docs/topics/impala_string_functions.xml
@@ -650,12 +650,14 @@ select instr('foo bar bletch', 'b', 1, null);
       <dlentry id="ltrim">
 
         <dt>
-          <codeph>ltrim(string a)</codeph>
+          <codeph>ltrim(string a [, string chars_to_trim])</codeph>
         </dt>
 
         <dd>
           <indexterm audience="hidden">ltrim() function</indexterm>
-          <b>Purpose:</b> Returns the argument string with any leading spaces removed from the left side.
+          <b>Purpose:</b> Returns the argument string with all occurrences
+          of characters specified by the second argument removed from
+          the left side. Removes spaces if the second argument is not specified.
           <p>
             <b>Return type:</b> <codeph>string</codeph>
           </p>
@@ -1039,12 +1041,14 @@ select replace('hello world','xyz','abc');
       <dlentry id="rtrim">
 
         <dt>
-          <codeph>rtrim(string a)</codeph>
+          <codeph>rtrim(string a [, string chars_to_trim])</codeph>
         </dt>
 
         <dd>
           <indexterm audience="hidden">rtrim() function</indexterm>
-          <b>Purpose:</b> Returns the argument string with any trailing spaces removed from the right side.
+          <b>Purpose:</b> Returns the argument string with all occurrences
+          of characters specified by the second argument removed from
+          the right side. Removes spaces if the second argument is not specified.
           <p>
             <b>Return type:</b> <codeph>string</codeph>
           </p>


[08/11] impala git commit: IMPALA-6710: [DOCS] Update the Partition Insert content

Posted by bh...@apache.org.
IMPALA-6710: [DOCS] Update the Partition Insert content

Added a section at the end for inserting into partitioned tables.

Change-Id: I4ccc8227579dabc321a949da95e8a59158528f20
Reviewed-on: http://gerrit.cloudera.org:8080/9977
Reviewed-by: Thomas Tauber-Marshall <tm...@cloudera.com>
Tested-by: Impala Public Jenkins <im...@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/4de7dbda
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/4de7dbda
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/4de7dbda

Branch: refs/heads/2.x
Commit: 4de7dbda7a809469be97fcb9c3fe390ce0b22f72
Parents: c609fa8
Author: Alex Rodoni <ar...@cloudera.com>
Authored: Tue Apr 10 14:10:51 2018 -0700
Committer: Impala Public Jenkins <im...@gerrit.cloudera.org>
Committed: Fri Apr 13 03:26:26 2018 +0000

----------------------------------------------------------------------
 docs/topics/impala_insert.xml | 257 +++++++++++++++++++++++++------------
 1 file changed, 173 insertions(+), 84 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/impala/blob/4de7dbda/docs/topics/impala_insert.xml
----------------------------------------------------------------------
diff --git a/docs/topics/impala_insert.xml b/docs/topics/impala_insert.xml
index 3880a70..bfd8c01 100644
--- a/docs/topics/impala_insert.xml
+++ b/docs/topics/impala_insert.xml
@@ -180,29 +180,6 @@ hint_with_brackets ::= [SHUFFLE] | [NOSHUFFLE]
         </p>
       </li>
 
-      <li>
-        <p>
-          For a partitioned table, the optional <codeph>PARTITION</codeph> clause identifies which partition or
-          partitions the new values go into. If a partition key column is given a constant value such as
-          <codeph>PARTITION (year=2012)</codeph> or <codeph>PARTITION (year=2012, month=2)</codeph>, all the
-          inserted rows use those same values for those partition key columns and you omit any corresponding
-          columns in the source table from the <codeph>SELECT</codeph> list. This form is known as <q>static
-          partitioning</q>.
-        </p>
-        <p>
-          If a partition key column is mentioned but not assigned a value, such as in <codeph>PARTITION (year,
-          region)</codeph> (both columns unassigned) or <codeph>PARTITION(year, region='CA')</codeph>
-          (<codeph>year</codeph> column unassigned), the unassigned columns are filled in with the final columns of
-          the <codeph>SELECT</codeph> list. In this case, the number of columns in the <codeph>SELECT</codeph> list
-          must equal the number of columns in the column permutation plus the number of partition key columns not
-          assigned a constant value. This form is known as <q>dynamic partitioning</q>.
-        </p>
-        <p>
-          See <xref href="impala_partitioning.xml#partition_static_dynamic"/> for examples and performance
-          characteristics of static and dynamic partitioned inserts.
-        </p>
-      </li>
-
       <li rev="1.2.2">
         An optional hint clause immediately either before the <codeph>SELECT</codeph> keyword or after the
         <codeph>INSERT</codeph> keyword, to fine-tune the behavior when doing an <codeph>INSERT ... SELECT</codeph>
@@ -385,28 +362,6 @@ Backend 0:RC_FILE not implemented.
 Remote error
 Backend 0:SEQUENCE_FILE not implemented. </codeblock>
 
-    <p>
-      Inserting data into partitioned tables requires slightly different syntax that divides the partitioning
-      columns from the others:
-    </p>
-
-<codeblock>create table t1 (i int) <b>partitioned by (x int, y string)</b>;
--- Select an INT column from another table.
--- All inserted rows will have the same x and y values, as specified in the INSERT statement.
--- This technique of specifying all the partition key values is known as static partitioning.
-insert into t1 <b>partition(x=10, y='a')</b> select c1 from some_other_table;
--- Select two INT columns from another table.
--- All inserted rows will have the same y value, as specified in the INSERT statement.
--- Values from c2 go into t1.x.
--- Any partitioning columns whose value is not specified are filled in
--- from the columns specified last in the SELECT list.
--- This technique of omitting some partition key values is known as dynamic partitioning.
-insert into t1 <b>partition(x, y='b')</b> select c1, c2 from some_other_table;
--- Select an INT and a STRING column from another table.
--- All inserted rows will have the same x value, as specified in the INSERT statement.
--- Values from c3 go into t1.y.
-insert into t1 <b>partition(x=20, y)</b> select c1, c3  from some_other_table;</codeblock>
-
     <p rev="1.1">
       The following examples show how you can copy the data in all the columns from one table to another, copy the
       data from only some columns, or specify the columns in the select list in a different order than they
@@ -434,42 +389,6 @@ insert into t2 (c2, c1) select c1, c2 from t1;
 -- But the number and type of selected columns must match the columns mentioned in the () part.
 alter table t2 replace columns (x int, y int);
 insert into t2 (y) select c1 from t1;
-
--- For partitioned tables, all the partitioning columns must be mentioned in the () column list
--- or a PARTITION clause; these columns cannot be defaulted to NULL.
-create table pt1 (x int, y int) partitioned by (z int);
--- The values from c1 are copied into the column x in the new table,
--- all in the same partition based on a constant value for z.
--- The values of y in the new table are all NULL.
-insert into pt1 (x) partition (z=5) select c1 from t1;
--- Again we omit the values for column y so they are all NULL.
--- The inserted x values can go into different partitions, based on
--- the different values inserted into the partitioning column z.
-insert into pt1 (x,z) select x, z from t2;
-</codeblock>
-
-    <p>
-      <codeph>SELECT *</codeph> for a partitioned table requires that all partition key columns in the source table
-      be declared as the last columns in the <codeph>CREATE TABLE</codeph> statement. You still include a
-      <codeph>PARTITION BY</codeph> clause listing all the partition key columns. These partition columns are
-      automatically mapped to the last columns from the <codeph>SELECT *</codeph> list.
-    </p>
-
-<codeblock>create table source (x int, y int, year int, month int, day int);
-create table destination (x int, y int) partitioned by (year int, month int, day int);
-...load some data into the unpartitioned source table...
--- Insert a single partition of data.
--- The SELECT * means you cannot specify partition (year=2014, month, day).
-insert overwrite destination partition (year, month, day) select * from source where year=2014;
--- Insert the data for all year/month/day combinations.
-insert overwrite destination partition (year, month, day) select * from source;
-
--- If one of the partition columns is omitted from the source table,
--- then you can specify a specific value for that column in the PARTITION clause.
--- Here the source table holds only data from 2014, and so does not include a year column.
-create table source_2014 (x int, y int, month, day);
-...load some data into the unpartitioned source_2014 table...
-insert overwrite destination partition (year=2014, month, day) select * from source_2014;
 </codeblock>
 
     <p conref="../shared/impala_common.xml#common/insert_sort_blurb"/>
@@ -741,7 +660,177 @@ Inserted 2 rows in 0.16s
       <p rev="1.3.1" conref="../shared/impala_common.xml#common/insert_inherit_permissions"/>
     </conbody>
   </concept>
+  <concept id="partition_insert">
+    <title>Inserting Into Partitioned Tables with PARTITION Clause</title>
+    <conbody>
+      <p>
+        For a partitioned table, the optional <codeph>PARTITION</codeph> clause
+        identifies which partition or partitions the values are inserted
+        into.
+      </p>
+      <p>
+        All examples in this section use the following table:
+      </p>
+<codeblock>CREATE TABLE t1 (w INT) PARTITIONED BY (x INT, y STRING);</codeblock>
+    </conbody>
 
-<!-- Values clause -->
-</concept>
-<!-- INSERT statement -->
+    <concept id="static_partition_insert">
+      <title>Static Partition Inserts</title>
+      <conbody>
+        <p>
+          In a static partition insert where a partition key column is given a
+          constant value, such as <codeph>PARTITION</codeph>
+          <codeph>(year=2012, month=2)</codeph>, the rows are inserted with the
+          same values specified for those partition key columns.
+        </p>
+        <p>
+          The number of columns in the <codeph>SELECT</codeph> list must equal
+          the number of columns in the column permutation.
+        </p>
+        <p>
+          The <codeph>PARTITION</codeph> clause must be used for static
+          partitioning inserts.
+        </p>
+        <p>
+          Example:
+        </p>
+        <p>
+          The following statement inserts the
+          <codeph>some_other_table.c1</codeph> values into the
+          <codeph>w</codeph> column. All inserted rows have the same
+          <codeph>x</codeph> value of <codeph>10</codeph> and the same
+          <codeph>y</codeph> value of
+          <codeph>'a'</codeph>.<codeblock>INSERT INTO t1 PARTITION (x=10, y='a')
+            SELECT c1 FROM some_other_table;</codeblock>
+        </p>
+      </conbody>
+    </concept>
+    <concept id="dynamic_partition_insert">
+        <title>Dynamic Partition Inserts</title>
+        <conbody>
+          <p>
+            In a dynamic partition insert where a partition key
+          column is in the <codeph>INSERT</codeph> statement but not assigned a
+          value, such as in <codeph>PARTITION (year, region)</codeph> (both
+          columns unassigned) or <codeph>PARTITION(year, region='CA')</codeph>
+            (<codeph>year</codeph> column unassigned), the unassigned columns
+          are filled in with the final columns of the <codeph>SELECT</codeph> or
+            <codeph>VALUES</codeph> clause. In this case, the number of columns
+          in the <codeph>SELECT</codeph> list must equal the number of columns
+          in the column permutation plus the number of partition key columns not
+          assigned a constant value.
+          </p>
+          <p>
+            See <xref
+              href="https://www.cloudera.com/documentation/enterprise/latest/topics/impala_partitioning.html#partition_static_dynamic"
+              format="html" scope="external"><u>Static and Dynamic Partitioning
+                Clauses</u></xref> for examples and performance characteristics
+            of static and dynamic partitioned inserts.
+          </p>
+          <p>
+            The following rules apply to dynamic partition
+            inserts.
+          </p>
+          <ul>
+            <li>
+              <p>
+                The columns are bound in the order they appear in the
+                  <codeph>INSERT</codeph> statement.
+              </p>
+              <p>
+                The table below shows the values inserted by
+                <codeph>INSERT</codeph> statements that list the columns in
+              different orders.
+              </p>
+            </li>
+          </ul>
+          <table id="table_vyx_dp3_ldb" colsep="1" rowsep="1" frame="all">
+            <tgroup cols="4" align="left">
+              <colspec colnum="1" colname="col1"/>
+              <colspec colnum="2" colname="col2"/>
+              <colspec colnum="3" colname="col3"/>
+              <colspec colnum="4" colname="col4"/>
+              <tbody>
+                <row>
+                  <entry/>
+                  <entry>Column <codeph>w</codeph> Value</entry>
+                  <entry>Column <codeph>x</codeph> Value</entry>
+                  <entry>Column <codeph>y</codeph> Value</entry>
+                </row>
+                <row>
+                  <entry><codeph>INSERT INTO t1 (w, x, y) VALUES (1, 2,
+                      'c');</codeph></entry>
+                  <entry><codeph>1</codeph></entry>
+                  <entry><codeph>2</codeph></entry>
+                  <entry><codeph>'c'</codeph></entry>
+                </row>
+                <row>
+                  <entry><codeph>INSERT INTO t1 (x,w) PARTITION (y) VALUES (1,
+                      2, 'c');</codeph></entry>
+                  <entry><codeph>2</codeph></entry>
+                  <entry><codeph>1</codeph></entry>
+                  <entry><codeph>'c'</codeph></entry>
+                </row>
+              </tbody>
+            </tgroup>
+          </table>
+          <ul>
+            <li>
+              When a partition clause is specified but the non-partition
+            columns are not specified in the <codeph>INSERT</codeph> statement,
+            as in the first example below, the non-partition columns are treated
+            as though they had been specified before the
+              <codeph>PARTITION</codeph> clause in the SQL.
+              <p>
+                Example: The following
+              three statements are equivalent, inserting <codeph>1</codeph> into
+                <codeph>w</codeph>, <codeph>2</codeph> into <codeph>x</codeph>,
+              and <codeph>'c'</codeph> into <codeph>y</codeph>.
+              </p>
+<codeblock>INSERT INTO t1 PARTITION (x, y) VALUES (1, 2, 'c');
+INSERT INTO t1 (w) PARTITION (x, y) VALUES (1, 2, 'c');
+INSERT INTO t1 PARTITION (x, y='c') VALUES (1, 2);</codeblock>
+            </li>
+            <li>
+              The <codeph>PARTITION</codeph> clause is not required for
+            dynamic partition inserts, but all the partition columns must be explicitly
+            present in the <codeph>INSERT</codeph> statement in the column list
+            or in the <codeph>PARTITION</codeph> clause. The partition columns
+            cannot be defaulted to <codeph>NULL</codeph>.
+              <p>
+                Example:
+              </p>
+              <p>The following statements are valid because the partition
+              columns, <codeph>x</codeph> and <codeph>y</codeph>, are present in
+              the <codeph>INSERT</codeph> statements, either in the
+                <codeph>PARTITION</codeph> clause or in the column
+              list.
+              </p>
+<codeblock>INSERT INTO t1 PARTITION (x, y) VALUES (1, 2, 'c');
+INSERT INTO t1 (w, x) PARTITION (y) VALUES (1, 2, 'c');</codeblock>
+              <p>
+                The following statement is not valid for the partitioned table as
+              defined above because the partition columns, <codeph>x</codeph>
+              and <codeph>y</codeph>, are not present in the
+                <codeph>INSERT</codeph> statement.
+              </p>
+<codeblock>INSERT INTO t1 VALUES (1, 2, 'c');</codeblock>
+          </li>
+            <li>
+              If a partition column does not exist in the source table, you can
+              specify a specific value for that column in the
+              <codeph>PARTITION</codeph> clause.
+              <p>
+                Example: The <codeph>source</codeph> table contains only the columns
+                <codeph>w</codeph> and <codeph>y</codeph>. The value,
+                <codeph>20</codeph>, specified in the <codeph>PARTITION</codeph>
+              clause, is inserted into the <codeph>x</codeph> column.
+              </p>
+<codeblock>INSERT INTO t1 PARTITION (x=20, y) SELECT * FROM source;</codeblock>
+          </li>
+          </ul>
+        </conbody>
+      </concept>
+    </concept>
+  </concept>

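As a quick check of the dynamic partition rules documented above, a minimal
sketch against the same t1 table; source_tbl and its c1/c2/c3 columns are
hypothetical:

  -- The column permutation (w) has 1 column and the partition keys x and y
  -- are both unassigned, so the SELECT list must supply 1 + 2 = 3 expressions.
  INSERT INTO t1 (w) PARTITION (x, y) SELECT c1, c2, c3 FROM source_tbl;
  -- With y fixed to a constant, only 1 + 1 = 2 expressions are needed.
  INSERT INTO t1 (w) PARTITION (x, y='b') SELECT c1, c2 FROM source_tbl;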

[07/11] impala git commit: Move some test_spilling debug actions to exhaustive

Posted by bh...@apache.org.
Move some test_spilling debug actions to exhaustive

Phil Z pointed out that these tests take a long time to run.
Based on experience, we only need the always/never variants
to catch the vast majority of spilling-related bugs, so
relegating the other variants to exhaustive should not cause any major
problems.

People making changes to the spilling code should run this test with
the exhaustive exploration strategy to flush out any problems in
advance, e.g.:

  impala-py.test tests/query_test/test_spilling.py -n2 --verbose \
    --workload_exploration_strategy=functional-query:exhaustive

Change-Id: I9ea9f6c299480f8dfc943635342e4473499cc8ad
Reviewed-on: http://gerrit.cloudera.org:8080/9976
Reviewed-by: Tim Armstrong <ta...@cloudera.com>
Reviewed-by: Philip Zeyliger <ph...@cloudera.com>
Tested-by: Impala Public Jenkins <im...@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/08a01b9f
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/08a01b9f
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/08a01b9f

Branch: refs/heads/2.x
Commit: 08a01b9fc18687e04502ed2ac04956140c18262e
Parents: 4510512
Author: Tim Armstrong <ta...@cloudera.com>
Authored: Tue Apr 10 12:01:55 2018 -0700
Committer: Impala Public Jenkins <im...@gerrit.cloudera.org>
Committed: Fri Apr 13 03:26:26 2018 +0000

----------------------------------------------------------------------
 tests/query_test/test_spilling.py | 17 +++++++++++++----
 1 file changed, 13 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/impala/blob/08a01b9f/tests/query_test/test_spilling.py
----------------------------------------------------------------------
diff --git a/tests/query_test/test_spilling.py b/tests/query_test/test_spilling.py
index 0b36429..8550f96 100644
--- a/tests/query_test/test_spilling.py
+++ b/tests/query_test/test_spilling.py
@@ -22,11 +22,17 @@ from tests.common.test_dimensions import (create_exec_option_dimension_from_dict
     create_parquet_dimension)
 
 # Test with denial of reservations at varying frequency.
-DEBUG_ACTION_DIMS = [None,
+# Always test with the minimal amount of spilling and with the absolute minimum
+# memory requirement.
+CORE_DEBUG_ACTION_DIMS = [None,
+  '-1:OPEN:SET_DENY_RESERVATION_PROBABILITY@1.0']
+
+# Test with different frequencies of denial on exhaustive to try to exercise more
+# interesting code paths.
+EXHAUSTIVE_DEBUG_ACTION_DIMS = [
   '-1:OPEN:SET_DENY_RESERVATION_PROBABILITY@0.1',
   '-1:OPEN:SET_DENY_RESERVATION_PROBABILITY@0.5',
-  '-1:OPEN:SET_DENY_RESERVATION_PROBABILITY@0.9',
-  '-1:OPEN:SET_DENY_RESERVATION_PROBABILITY@1.0']
+  '-1:OPEN:SET_DENY_RESERVATION_PROBABILITY@0.9']
 
 @pytest.mark.xfail(pytest.config.option.testing_remote_cluster,
                    reason='Queries may not spill on larger clusters')
@@ -40,10 +46,13 @@ class TestSpillingDebugActionDimensions(ImpalaTestSuite):
     super(TestSpillingDebugActionDimensions, cls).add_test_dimensions()
     cls.ImpalaTestMatrix.clear_constraints()
     cls.ImpalaTestMatrix.add_dimension(create_parquet_dimension('tpch'))
+    debug_action_dims = CORE_DEBUG_ACTION_DIMS
+    if cls.exploration_strategy() == 'exhaustive':
+      debug_action_dims = CORE_DEBUG_ACTION_DIMS + EXHAUSTIVE_DEBUG_ACTION_DIMS
     # Tests are calibrated so that they can execute and spill with this page size.
     cls.ImpalaTestMatrix.add_dimension(
         create_exec_option_dimension_from_dict({'default_spillable_buffer_size' : ['256k'],
-          'debug_action' : DEBUG_ACTION_DIMS}))
+          'debug_action' : debug_action_dims}))
 
   def test_spilling(self, vector):
     self.run_test_case('QueryTest/spilling', vector)

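For reference, a plain run (assuming the default core exploration strategy)
now exercises only the two CORE_DEBUG_ACTION_DIMS variants, i.e. no denial
and always-deny; the invocation below mirrors the one in the commit message,
with -n2 being just an example parallelism setting:

  impala-py.test tests/query_test/test_spilling.py -n2 --verbose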

[06/11] impala git commit: IMPALA-6831: [DOCS] Remove the note about TLS1.2 on RHEL 6 and CentOS 6

Posted by bh...@apache.org.
IMPALA-6831: [DOCS] Remove the note about TLS1.2 on RHEL 6 and CentOS 6

Change-Id: I710dd809e6a3f8dee89f4ed8997bba0c2aa2c783
Reviewed-on: http://gerrit.cloudera.org:8080/9983
Reviewed-by: Sailesh Mukil <sa...@cloudera.com>
Tested-by: Impala Public Jenkins <im...@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/c609fa83
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/c609fa83
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/c609fa83

Branch: refs/heads/2.x
Commit: c609fa83be710063dc335936d0e82ebd9a6e8629
Parents: 23743ba
Author: Alex Rodoni <ar...@cloudera.com>
Authored: Tue Apr 10 16:26:50 2018 -0700
Committer: Impala Public Jenkins <im...@gerrit.cloudera.org>
Committed: Fri Apr 13 03:26:26 2018 +0000

----------------------------------------------------------------------
 docs/topics/impala_ssl.xml | 10 ----------
 1 file changed, 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/impala/blob/c609fa83/docs/topics/impala_ssl.xml
----------------------------------------------------------------------
diff --git a/docs/topics/impala_ssl.xml b/docs/topics/impala_ssl.xml
index 9feb758..1548892 100644
--- a/docs/topics/impala_ssl.xml
+++ b/docs/topics/impala_ssl.xml
@@ -171,16 +171,6 @@ under the License.
           </p>
         </li>
       </ul>
-      <note>
-        <p>
-          As of <keyword keyref="impala210_full"/>, TLSv1.2 may not work for Impala on RHEL 6
-          or CentOS 6, even if OpenSSL 1.0.1 is available. The daemons fail to start, with a
-          socket error stating the TLS version is not supported. The underlying cause is related to
-          <xref href="https://bugzilla.redhat.com/show_bug.cgi?id=1497859" scope="external" format="html">Red Hat issue 1497859</xref>.
-          The issue applies if you build on a RHEL 6 or CentOS 6 system with OpenSSL 1.0.0, and
-          run on a RHEL 6 or CentOS 6 system with OpenSSL 1.0.1.
-        </p>
-      </note>
 
       <p>
         Along with specifying the version, you can also specify the allowed set of TLS ciphers