You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by ta...@apache.org on 2018/07/07 01:31:37 UTC

[1/3] impala git commit: IMPALA-6625: Skip computing parquet conjuncts for non-Parquet scans

Repository: impala
Updated Branches:
  refs/heads/master fd0ba0fd2 -> 30d196fd5


IMPALA-6625: Skip computing parquet conjuncts for non-Parquet scans

This change ensures that the planner computes parquet conjuncts
only when for scans containing parquet files. Additionally, it
also handles PARQUET_DICTIONARY_FILTERING and
PARQUET_READ_STATISTICS query options in the planner.

Testing was carried out independently on parquet and non-parquet
scans:
  1. Parquet scans were tested via the existing parquet-filtering
     planner test. Additionally, a new test
     [parquet-filtering-disabled] was added to ensure that the
     explain plan generated skips parquet predicates based on the
     query options.
  2. Non-parquet scans were tested manually to ensure that the
     functions to compute parquet conjucts were not invoked.
     Additional test cases were added to the parquet-filtering
     planner test to scan non parquet tables and ensure that the
     plans do not contain conjuncts based on parquet statistics.
  3. A parquet partition was added to the alltypesmixedformat
     table in the functional database. Planner tests were added
     to ensure that Parquet conjuncts are constructed only when
     the Parquet partition is included in the query.

Change-Id: I9d6c26d42db090c8a15c602f6419ad6399c329e7
Reviewed-on: http://gerrit.cloudera.org:8080/10704
Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
Tested-by: Impala Public Jenkins <im...@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/c6f9b61e
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/c6f9b61e
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/c6f9b61e

Branch: refs/heads/master
Commit: c6f9b61ec2277e1ac16783ee345ad715f30118b9
Parents: fd0ba0f
Author: poojanilangekar <po...@cloudera.com>
Authored: Mon Jun 11 17:20:40 2018 -0700
Committer: Impala Public Jenkins <im...@cloudera.com>
Committed: Fri Jul 6 02:06:50 2018 +0000

----------------------------------------------------------------------
 .../apache/impala/catalog/FeCatalogUtils.java   |   2 +-
 .../org/apache/impala/planner/HdfsScanNode.java |  12 +-
 .../org/apache/impala/planner/PlannerTest.java  |   9 +
 testdata/bin/create-load-data.sh                |   8 +-
 testdata/bin/load-dependent-tables.sql          |   6 +
 .../queries/PlannerTest/constant-folding.test   |   2 -
 .../PlannerTest/fk-pk-join-detection.test       |   6 -
 .../PlannerTest/parquet-filtering-disabled.test | 311 +++++++++++++++++++
 .../queries/PlannerTest/parquet-filtering.test  | 120 +++++++
 .../PlannerTest/resource-requirements.test      |  30 +-
 .../queries/PlannerTest/tablesample.test        |   1 -
 .../queries/QueryTest/mixed-format.test         |   2 +-
 .../queries/QueryTest/show-stats.test           |   3 +-
 tests/query_test/test_rows_availability.py      |   5 +-
 14 files changed, 483 insertions(+), 34 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/impala/blob/c6f9b61e/fe/src/main/java/org/apache/impala/catalog/FeCatalogUtils.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/catalog/FeCatalogUtils.java b/fe/src/main/java/org/apache/impala/catalog/FeCatalogUtils.java
index bf4d230..dc16629 100644
--- a/fe/src/main/java/org/apache/impala/catalog/FeCatalogUtils.java
+++ b/fe/src/main/java/org/apache/impala/catalog/FeCatalogUtils.java
@@ -268,7 +268,7 @@ public abstract class FeCatalogUtils {
    */
   public static HdfsFileFormat getMajorityFormat(
       Iterable<? extends FeFsPartition> partitions) {
-    Map<HdfsFileFormat, Integer> numPartitionsByFormat = Maps.newHashMap();
+    Map<HdfsFileFormat, Integer> numPartitionsByFormat = Maps.newTreeMap();
     for (FeFsPartition partition: partitions) {
       HdfsFileFormat format = partition.getInputFormatDescriptor().getFileFormat();
       Integer numPartitions = numPartitionsByFormat.get(format);

http://git-wip-us.apache.org/repos/asf/impala/blob/c6f9b61e/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java b/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java
index d1d7fd8..ac6c85a 100644
--- a/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java
+++ b/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java
@@ -357,7 +357,6 @@ public class HdfsScanNode extends ScanNode {
     checkForSupportedFileFormats();
 
     assignCollectionConjuncts(analyzer);
-    computeDictionaryFilterConjuncts(analyzer);
 
     // compute scan range locations with optional sampling
     computeScanRangeLocations(analyzer);
@@ -376,7 +375,16 @@ public class HdfsScanNode extends ScanNode {
     }
 
     if (fileFormats_.contains(HdfsFileFormat.PARQUET)) {
-      computeMinMaxTupleAndConjuncts(analyzer);
+      // Compute min-max conjuncts only if the PARQUET_READ_STATISTICS query option is
+      // set to true.
+      if (analyzer.getQueryOptions().parquet_read_statistics) {
+        computeMinMaxTupleAndConjuncts(analyzer);
+      }
+      // Compute dictionary conjuncts only if the PARQUET_DICTIONARY_FILTERING query
+      // option is set to true.
+      if (analyzer.getQueryOptions().parquet_dictionary_filtering) {
+        computeDictionaryFilterConjuncts(analyzer);
+      }
     }
 
     if (canApplyParquetCountStarOptimization(analyzer, fileFormats_)) {

http://git-wip-us.apache.org/repos/asf/impala/blob/c6f9b61e/fe/src/test/java/org/apache/impala/planner/PlannerTest.java
----------------------------------------------------------------------
diff --git a/fe/src/test/java/org/apache/impala/planner/PlannerTest.java b/fe/src/test/java/org/apache/impala/planner/PlannerTest.java
index 2667e74..063169d 100644
--- a/fe/src/test/java/org/apache/impala/planner/PlannerTest.java
+++ b/fe/src/test/java/org/apache/impala/planner/PlannerTest.java
@@ -321,6 +321,15 @@ public class PlannerTest extends PlannerTestBase {
   }
 
   @Test
+  public void testParquetFilteringDisabled() {
+    TQueryOptions options = new TQueryOptions();
+    options.setParquet_dictionary_filtering(false);
+    options.setParquet_read_statistics(false);
+    runPlannerTestFile("parquet-filtering-disabled", options,
+        ImmutableSet.of(PlannerTestOption.EXTENDED_EXPLAIN));
+  }
+
+  @Test
   public void testKudu() {
     Assume.assumeTrue(RuntimeEnv.INSTANCE.isKuduSupported());
     addTestDb("kudu_planner_test", "Test DB for Kudu Planner.");

http://git-wip-us.apache.org/repos/asf/impala/blob/c6f9b61e/testdata/bin/create-load-data.sh
----------------------------------------------------------------------
diff --git a/testdata/bin/create-load-data.sh b/testdata/bin/create-load-data.sh
index bb95f48..1953daf 100755
--- a/testdata/bin/create-load-data.sh
+++ b/testdata/bin/create-load-data.sh
@@ -309,19 +309,21 @@ function copy-and-load-dependent-tables {
   # temporary location for that table to use. Should find a better way to handle this.
   echo COPYING AND LOADING DATA FOR DEPENDENT TABLES
   hadoop fs -rm -r -f /test-warehouse/alltypesmixedformat \
-    /tmp/alltypes_rc /tmp/alltypes_seq
+    /tmp/alltypes_rc /tmp/alltypes_seq /tmp/alltypes_parquet
   hadoop fs -mkdir -p /tmp/alltypes_seq/year=2009 \
-    /tmp/alltypes_rc/year=2009
+    /tmp/alltypes_rc/year=2009 /tmp/alltypes_parquet/year=2009
 
   # The file written by hive to /test-warehouse will be strangely replicated rather than
   # erasure coded if EC is not set in /tmp
   if [[ -n "${HDFS_ERASURECODE_POLICY:-}" ]]; then
     hdfs ec -setPolicy -policy "${HDFS_ERASURECODE_POLICY}" -path "/tmp/alltypes_rc"
     hdfs ec -setPolicy -policy "${HDFS_ERASURECODE_POLICY}" -path "/tmp/alltypes_seq"
+    hdfs ec -setPolicy -policy "${HDFS_ERASURECODE_POLICY}" -path "/tmp/alltypes_parquet"
   fi
 
   hadoop fs -cp /test-warehouse/alltypes_seq/year=2009/month=2/ /tmp/alltypes_seq/year=2009
   hadoop fs -cp /test-warehouse/alltypes_rc/year=2009/month=3/ /tmp/alltypes_rc/year=2009
+  hadoop fs -cp /test-warehouse/alltypes_parquet/year=2009/month=4/ /tmp/alltypes_parquet/year=2009
 
   # Create a hidden file in AllTypesSmall
   hadoop fs -cp -f /test-warehouse/zipcode_incomes/DEC_00_SF3_P077_with_ann_noheader.csv \
@@ -345,7 +347,7 @@ function copy-and-load-dependent-tables {
   #
   # See: logs/data_loading/copy-and-load-dependent-tables.log)
   # See also: IMPALA-4345
-  hadoop fs -chmod -R 777 /tmp/alltypes_rc /tmp/alltypes_seq
+  hadoop fs -chmod -R 777 /tmp/alltypes_rc /tmp/alltypes_seq /tmp/alltypes_parquet
 
   # For tables that rely on loading data from local fs test-wareload-house
   # TODO: Find a good way to integrate this with the normal data loading scripts

http://git-wip-us.apache.org/repos/asf/impala/blob/c6f9b61e/testdata/bin/load-dependent-tables.sql
----------------------------------------------------------------------
diff --git a/testdata/bin/load-dependent-tables.sql b/testdata/bin/load-dependent-tables.sql
index 9de462f..e94def7 100644
--- a/testdata/bin/load-dependent-tables.sql
+++ b/testdata/bin/load-dependent-tables.sql
@@ -55,6 +55,10 @@ ALTER TABLE alltypesmixedformat SET FILEFORMAT RCFILE;
 LOAD DATA INPATH '/tmp/alltypes_rc/year=2009/month=3/'
 OVERWRITE INTO TABLE alltypesmixedformat PARTITION (year=2009, month=3);
 
+ALTER TABLE alltypesmixedformat SET FILEFORMAT PARQUET;
+LOAD DATA INPATH '/tmp/alltypes_parquet/year=2009/month=4'
+OVERWRITE INTO TABLE alltypesmixedformat PARTITION (year=2009, month=4);
+
 ALTER TABLE alltypesmixedformat PARTITION (year=2009, month=1)
   SET SERDEPROPERTIES('field.delim'=',', 'escape.delim'='\\');
 ALTER TABLE alltypesmixedformat PARTITION (year=2009, month=1)
@@ -65,6 +69,8 @@ ALTER TABLE alltypesmixedformat PARTITION (year=2009, month=2)
   SET FILEFORMAT SEQUENCEFILE;
 ALTER TABLE alltypesmixedformat PARTITION (year=2009, month=3)
   SET FILEFORMAT RCFILE;
+ALTER TABLE alltypesmixedformat PARTITION (year=2009, month=4)
+  SET FILEFORMAT PARQUET;
 
 DROP TABLE IF EXISTS functional_parquet.chars_formats;
 CREATE EXTERNAL TABLE functional_parquet.chars_formats

http://git-wip-us.apache.org/repos/asf/impala/blob/c6f9b61e/testdata/workloads/functional-planner/queries/PlannerTest/constant-folding.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/constant-folding.test b/testdata/workloads/functional-planner/queries/PlannerTest/constant-folding.test
index 9552caf..8e73f89 100644
--- a/testdata/workloads/functional-planner/queries/PlannerTest/constant-folding.test
+++ b/testdata/workloads/functional-planner/queries/PlannerTest/constant-folding.test
@@ -156,7 +156,6 @@ PLAN-ROOT SINK
 |       partitions: 24/24 rows=7300
 |       columns: all
 |     extrapolated-rows=disabled max-scan-range-rows=310
-|     parquet dictionary predicates: CAST(b.double_col AS DECIMAL(3,2)) > 11.1
 |     mem-estimate=128.00MB mem-reservation=32.00KB thread-reservation=1
 |     tuple-ids=1 row-size=20B cardinality=730
 |
@@ -197,7 +196,6 @@ PLAN-ROOT SINK
 |       partitions: 24/24 rows=7300
 |       columns: all
 |     extrapolated-rows=disabled max-scan-range-rows=310
-|     parquet dictionary predicates: CAST(b.double_col AS DECIMAL(3,2)) > 11.1
 |     mem-estimate=128.00MB mem-reservation=32.00KB thread-reservation=1
 |     tuple-ids=1 row-size=20B cardinality=730
 |

http://git-wip-us.apache.org/repos/asf/impala/blob/c6f9b61e/testdata/workloads/functional-planner/queries/PlannerTest/fk-pk-join-detection.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/fk-pk-join-detection.test b/testdata/workloads/functional-planner/queries/PlannerTest/fk-pk-join-detection.test
index 1a2bfe8..fdd6792 100644
--- a/testdata/workloads/functional-planner/queries/PlannerTest/fk-pk-join-detection.test
+++ b/testdata/workloads/functional-planner/queries/PlannerTest/fk-pk-join-detection.test
@@ -23,7 +23,6 @@ PLAN-ROOT SINK
 |       table: rows=100000 size=12.60MB
 |       columns: all
 |     extrapolated-rows=disabled max-scan-range-rows=100000
-|     parquet dictionary predicates: c_salutation = 'Mrs.'
 |     mem-estimate=48.00MB mem-reservation=8.00MB thread-reservation=1
 |     tuple-ids=1 row-size=255B cardinality=16667
 |
@@ -64,7 +63,6 @@ PLAN-ROOT SINK
 |       table: rows=100000 size=12.60MB
 |       columns: all
 |     extrapolated-rows=disabled max-scan-range-rows=100000
-|     parquet dictionary predicates: c_salutation = 'Mrs.'
 |     mem-estimate=48.00MB mem-reservation=8.00MB thread-reservation=1
 |     tuple-ids=1 row-size=255B cardinality=16667
 |
@@ -104,7 +102,6 @@ PLAN-ROOT SINK
 |       table: rows=100000 size=12.60MB
 |       columns: all
 |     extrapolated-rows=disabled max-scan-range-rows=100000
-|     parquet dictionary predicates: c_salutation = 'Mrs.'
 |     mem-estimate=48.00MB mem-reservation=8.00MB thread-reservation=1
 |     tuple-ids=1 row-size=255B cardinality=16667
 |
@@ -144,7 +141,6 @@ PLAN-ROOT SINK
 |       table: rows=287514 size=31.19MB
 |       columns: all
 |     extrapolated-rows=disabled max-scan-range-rows=287514
-|     parquet dictionary predicates: sr_return_quantity < 10
 |     mem-estimate=80.00MB mem-reservation=8.00MB thread-reservation=1
 |     tuple-ids=1 row-size=88B cardinality=28751
 |
@@ -221,7 +217,6 @@ PLAN-ROOT SINK
 |       table: rows=73049 size=9.84MB
 |       columns: all
 |     extrapolated-rows=disabled max-scan-range-rows=73049
-|     parquet dictionary predicates: a.d_holiday = 'Y'
 |     mem-estimate=48.00MB mem-reservation=8.00MB thread-reservation=1
 |     tuple-ids=0 row-size=303B cardinality=36525
 |
@@ -303,7 +298,6 @@ PLAN-ROOT SINK
 |  |       table: rows=73049 size=9.84MB
 |  |       columns: all
 |  |     extrapolated-rows=disabled max-scan-range-rows=73049
-|  |     parquet dictionary predicates: d1.d_fy_week_seq = 1000
 |  |     mem-estimate=48.00MB mem-reservation=8.00MB thread-reservation=1
 |  |     tuple-ids=3 row-size=8B cardinality=7
 |  |

http://git-wip-us.apache.org/repos/asf/impala/blob/c6f9b61e/testdata/workloads/functional-planner/queries/PlannerTest/parquet-filtering-disabled.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/parquet-filtering-disabled.test b/testdata/workloads/functional-planner/queries/PlannerTest/parquet-filtering-disabled.test
new file mode 100644
index 0000000..4cccd06
--- /dev/null
+++ b/testdata/workloads/functional-planner/queries/PlannerTest/parquet-filtering-disabled.test
@@ -0,0 +1,311 @@
+# All queries in this test are run with PARQUET_DICTIONARY_FILTERING and
+# PARQUET_READ_STATISTICS disabled. The expected behavior is for the planner to skip
+# assigning statistics and dictionary conjuncts while querying parquet files.
+# Parquet predicates to be skipped:
+# parquet statistics predicate on int_col
+# parquet dictionary predicate on int_col
+select count(*) from functional_parquet.alltypes
+where int_col > 1 and int_col * rand() > 50 and int_col is null
+and int_col > tinyint_col;
+---- PLAN
+F00:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
+|  Per-Host Resources: mem-estimate=42.00MB mem-reservation=16.00KB thread-reservation=2
+PLAN-ROOT SINK
+|  mem-estimate=0B mem-reservation=0B thread-reservation=0
+|
+01:AGGREGATE [FINALIZE]
+|  output: count(*)
+|  mem-estimate=10.00MB mem-reservation=0B spill-buffer=2.00MB thread-reservation=0
+|  tuple-ids=1 row-size=8B cardinality=1
+|
+00:SCAN HDFS [functional_parquet.alltypes]
+   partitions=24/24 files=24 size=188.29KB
+   predicates: int_col IS NULL, int_col > 1, int_col > tinyint_col, int_col * rand() > 50
+   stored statistics:
+     table: rows=unavailable size=unavailable
+     partitions: 0/24 rows=unavailable
+     columns: unavailable
+   extrapolated-rows=disabled max-scan-range-rows=unavailable
+   mem-estimate=32.00MB mem-reservation=16.00KB thread-reservation=1
+   tuple-ids=0 row-size=5B cardinality=unavailable
+====
+# Parquet predicates to be skipped:
+# parquet statistics predicate on bigint_col, double_col, float_col, id, tinyint_col,
+#                                 string_col, smallint_col & date_string_col
+# parquet dictionary predicate on bool_col, bigint_col, double_col, float_col, id,
+#                                 tinyint_col, string_col, smallint_col, int_col &
+#                                 date_string_col
+select count(*) from functional_parquet.alltypes
+where id = 1 and bool_col and tinyint_col < 50 and smallint_col in (1,2,3,4,5)
+and mod(int_col,2) = 1 and bigint_col < 5000 and float_col > 50.00
+and double_col > 100.00 and date_string_col > '1993-10-01'
+and string_col in ('aaaa', 'bbbb', 'cccc')
+and timestamp_cmp(timestamp_col, '2016-11-20 00:00:00') = 1
+and year > 2000 and month < 12;
+---- PLAN
+F00:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
+|  Per-Host Resources: mem-estimate=138.00MB mem-reservation=88.00KB thread-reservation=2
+PLAN-ROOT SINK
+|  mem-estimate=0B mem-reservation=0B thread-reservation=0
+|
+01:AGGREGATE [FINALIZE]
+|  output: count(*)
+|  mem-estimate=10.00MB mem-reservation=0B spill-buffer=2.00MB thread-reservation=0
+|  tuple-ids=1 row-size=8B cardinality=1
+|
+00:SCAN HDFS [functional_parquet.alltypes]
+   partitions=22/24 files=22 size=172.28KB
+   predicates: bool_col, bigint_col < 5000, double_col > 100.00, float_col > 50.00, id = 1, tinyint_col < 50, string_col IN ('aaaa', 'bbbb', 'cccc'), smallint_col IN (1, 2, 3, 4, 5), mod(int_col, 2) = 1, timestamp_cmp(timestamp_col, TIMESTAMP '2016-11-20 00:00:00') = 1, date_string_col > '1993-10-01'
+   stored statistics:
+     table: rows=unavailable size=unavailable
+     partitions: 0/22 rows=unavailable
+     columns missing stats: id, bool_col, tinyint_col, smallint_col, int_col, bigint_col, float_col, double_col, date_string_col, string_col, timestamp_col
+   extrapolated-rows=disabled max-scan-range-rows=unavailable
+   mem-estimate=128.00MB mem-reservation=88.00KB thread-reservation=1
+   tuple-ids=0 row-size=80B cardinality=unavailable
+====
+# Parquet predicates to be skipped:
+# parquet dictionary predicates on id, string_col & int_col
+select count(*) from functional_parquet.alltypes
+where id NOT IN (0,1,2) and string_col IN ('aaaa', 'bbbb', 'cccc', NULL)
+and mod(int_col,50) IN (0,1)
+and id IN (int_col);
+---- PLAN
+F00:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
+|  Per-Host Resources: mem-estimate=58.00MB mem-reservation=24.00KB thread-reservation=2
+PLAN-ROOT SINK
+|  mem-estimate=0B mem-reservation=0B thread-reservation=0
+|
+01:AGGREGATE [FINALIZE]
+|  output: count(*)
+|  mem-estimate=10.00MB mem-reservation=0B spill-buffer=2.00MB thread-reservation=0
+|  tuple-ids=1 row-size=8B cardinality=1
+|
+00:SCAN HDFS [functional_parquet.alltypes]
+   partitions=24/24 files=24 size=188.29KB
+   predicates: id IN (int_col), id NOT IN (0, 1, 2), string_col IN ('aaaa', 'bbbb', 'cccc', NULL), mod(int_col, 50) IN (0, 1)
+   stored statistics:
+     table: rows=unavailable size=unavailable
+     partitions: 0/24 rows=unavailable
+     columns: unavailable
+   extrapolated-rows=disabled max-scan-range-rows=unavailable
+   mem-estimate=48.00MB mem-reservation=24.00KB thread-reservation=1
+   tuple-ids=0 row-size=24B cardinality=unavailable
+====
+# Nested parquet predicates to be skipped:
+# parquet statistics predicates on a.item.e
+# parquet dictionary predicates on a.item.e
+select id from functional_parquet.complextypestbl c, c.nested_struct.c.d cn, cn.item a
+where a.item.e < -10;
+---- PLAN
+F00:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
+|  Per-Host Resources: mem-estimate=48.00MB mem-reservation=24.00KB thread-reservation=2
+PLAN-ROOT SINK
+|  mem-estimate=0B mem-reservation=0B thread-reservation=0
+|
+01:SUBPLAN
+|  mem-estimate=0B mem-reservation=0B thread-reservation=0
+|  tuple-ids=2,1,0 row-size=44B cardinality=unavailable
+|
+|--08:NESTED LOOP JOIN [CROSS JOIN]
+|  |  mem-estimate=24B mem-reservation=0B thread-reservation=0
+|  |  tuple-ids=2,1,0 row-size=44B cardinality=100
+|  |
+|  |--02:SINGULAR ROW SRC
+|  |     parent-subplan=01
+|  |     mem-estimate=0B mem-reservation=0B thread-reservation=0
+|  |     tuple-ids=0 row-size=24B cardinality=1
+|  |
+|  04:SUBPLAN
+|  |  mem-estimate=0B mem-reservation=0B thread-reservation=0
+|  |  tuple-ids=2,1 row-size=20B cardinality=100
+|  |
+|  |--07:NESTED LOOP JOIN [CROSS JOIN]
+|  |  |  mem-estimate=16B mem-reservation=0B thread-reservation=0
+|  |  |  tuple-ids=2,1 row-size=20B cardinality=10
+|  |  |
+|  |  |--05:SINGULAR ROW SRC
+|  |  |     parent-subplan=04
+|  |  |     mem-estimate=0B mem-reservation=0B thread-reservation=0
+|  |  |     tuple-ids=1 row-size=16B cardinality=1
+|  |  |
+|  |  06:UNNEST [cn.item a]
+|  |     parent-subplan=04
+|  |     mem-estimate=0B mem-reservation=0B thread-reservation=0
+|  |     tuple-ids=2 row-size=0B cardinality=10
+|  |
+|  03:UNNEST [c.nested_struct.c.d cn]
+|     parent-subplan=01
+|     mem-estimate=0B mem-reservation=0B thread-reservation=0
+|     tuple-ids=1 row-size=0B cardinality=10
+|
+00:SCAN HDFS [functional_parquet.complextypestbl c]
+   partitions=1/1 files=2 size=6.92KB
+   predicates: !empty(c.nested_struct.c.d)
+   predicates on cn: !empty(cn.item)
+   predicates on a: a.item.e < -10
+   stored statistics:
+     table: rows=unavailable size=unavailable
+     columns missing stats: id
+   extrapolated-rows=disabled max-scan-range-rows=unavailable
+   mem-estimate=48.00MB mem-reservation=24.00KB thread-reservation=1
+   tuple-ids=0 row-size=24B cardinality=unavailable
+====
+# Parquet predicates to be skipped at each level:
+# parquet statistics predicates on c_custkey, o.o_orderkey & l.l_partkey
+# parquet dictionary predicates on c_custkey, o.o_orderkey & l.l_partkey 
+select c_custkey from tpch_nested_parquet.customer c, c.c_orders o,
+o.o_lineitems l where c_custkey > 0 and o.o_orderkey > 0 and l.l_partkey > 0;
+---- PLAN
+F00:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
+|  Per-Host Resources: mem-estimate=264.00MB mem-reservation=16.00MB thread-reservation=2
+PLAN-ROOT SINK
+|  mem-estimate=0B mem-reservation=0B thread-reservation=0
+|
+01:SUBPLAN
+|  mem-estimate=0B mem-reservation=0B thread-reservation=0
+|  tuple-ids=2,1,0 row-size=56B cardinality=1500000
+|
+|--08:NESTED LOOP JOIN [CROSS JOIN]
+|  |  mem-estimate=24B mem-reservation=0B thread-reservation=0
+|  |  tuple-ids=2,1,0 row-size=56B cardinality=100
+|  |
+|  |--02:SINGULAR ROW SRC
+|  |     parent-subplan=01
+|  |     mem-estimate=0B mem-reservation=0B thread-reservation=0
+|  |     tuple-ids=0 row-size=24B cardinality=1
+|  |
+|  04:SUBPLAN
+|  |  mem-estimate=0B mem-reservation=0B thread-reservation=0
+|  |  tuple-ids=2,1 row-size=32B cardinality=100
+|  |
+|  |--07:NESTED LOOP JOIN [CROSS JOIN]
+|  |  |  mem-estimate=24B mem-reservation=0B thread-reservation=0
+|  |  |  tuple-ids=2,1 row-size=32B cardinality=10
+|  |  |
+|  |  |--05:SINGULAR ROW SRC
+|  |  |     parent-subplan=04
+|  |  |     mem-estimate=0B mem-reservation=0B thread-reservation=0
+|  |  |     tuple-ids=1 row-size=24B cardinality=1
+|  |  |
+|  |  06:UNNEST [o.o_lineitems l]
+|  |     parent-subplan=04
+|  |     mem-estimate=0B mem-reservation=0B thread-reservation=0
+|  |     tuple-ids=2 row-size=0B cardinality=10
+|  |
+|  03:UNNEST [c.c_orders o]
+|     parent-subplan=01
+|     mem-estimate=0B mem-reservation=0B thread-reservation=0
+|     tuple-ids=1 row-size=0B cardinality=10
+|
+00:SCAN HDFS [tpch_nested_parquet.customer c]
+   partitions=1/1 files=4 size=288.98MB
+   predicates: c_custkey > 0, !empty(c.c_orders)
+   predicates on o: !empty(o.o_lineitems), o.o_orderkey > 0
+   predicates on l: l.l_partkey > 0
+   stored statistics:
+     table: rows=150000 size=288.98MB
+     columns missing stats: c_orders
+   extrapolated-rows=disabled max-scan-range-rows=44229
+   mem-estimate=264.00MB mem-reservation=16.00MB thread-reservation=1
+   tuple-ids=0 row-size=24B cardinality=15000
+====
+# Parquet filtering to be skipped on multiple collections at the same nested level:
+# parquet statistics filtering on l.l_shipdate, l.l_receiptdate, l.l_shipmode
+#                                 & l.l_returnflag
+# parquet dictionary filtering on l.l_shipmode, l.l_receiptdate, l.l_shipmode
+#                                 & l.l_returnflag
+select c_name, o.o_clerk from tpch_nested_parquet.customer c,
+c.c_orders o, o.o_lineitems l
+where l.l_shipdate = '1994-08-19' and
+l.l_receiptdate = '1994-08-24' and l.l_shipmode = 'RAIL' and l.l_returnflag = 'R' and
+l.l_comment is null;
+---- PLAN
+F00:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
+|  Per-Host Resources: mem-estimate=616.00MB mem-reservation=32.00MB thread-reservation=2
+PLAN-ROOT SINK
+|  mem-estimate=0B mem-reservation=0B thread-reservation=0
+|
+01:SUBPLAN
+|  mem-estimate=0B mem-reservation=0B thread-reservation=0
+|  tuple-ids=2,1,0 row-size=162B cardinality=15000000
+|
+|--08:NESTED LOOP JOIN [CROSS JOIN]
+|  |  mem-estimate=50B mem-reservation=0B thread-reservation=0
+|  |  tuple-ids=2,1,0 row-size=162B cardinality=100
+|  |
+|  |--02:SINGULAR ROW SRC
+|  |     parent-subplan=01
+|  |     mem-estimate=0B mem-reservation=0B thread-reservation=0
+|  |     tuple-ids=0 row-size=50B cardinality=1
+|  |
+|  04:SUBPLAN
+|  |  mem-estimate=0B mem-reservation=0B thread-reservation=0
+|  |  tuple-ids=2,1 row-size=112B cardinality=100
+|  |
+|  |--07:NESTED LOOP JOIN [CROSS JOIN]
+|  |  |  mem-estimate=32B mem-reservation=0B thread-reservation=0
+|  |  |  tuple-ids=2,1 row-size=112B cardinality=10
+|  |  |
+|  |  |--05:SINGULAR ROW SRC
+|  |  |     parent-subplan=04
+|  |  |     mem-estimate=0B mem-reservation=0B thread-reservation=0
+|  |  |     tuple-ids=1 row-size=32B cardinality=1
+|  |  |
+|  |  06:UNNEST [o.o_lineitems l]
+|  |     parent-subplan=04
+|  |     mem-estimate=0B mem-reservation=0B thread-reservation=0
+|  |     tuple-ids=2 row-size=0B cardinality=10
+|  |
+|  03:UNNEST [c.c_orders o]
+|     parent-subplan=01
+|     mem-estimate=0B mem-reservation=0B thread-reservation=0
+|     tuple-ids=1 row-size=0B cardinality=10
+|
+00:SCAN HDFS [tpch_nested_parquet.customer c]
+   partitions=1/1 files=4 size=288.98MB
+   predicates: !empty(c.c_orders)
+   predicates on o: !empty(o.o_lineitems)
+   predicates on l: l.l_shipdate = '1994-08-19', l.l_receiptdate = '1994-08-24', l.l_shipmode = 'RAIL', l.l_returnflag = 'R', l.l_comment IS NULL
+   stored statistics:
+     table: rows=150000 size=288.98MB
+     columns missing stats: c_orders
+   extrapolated-rows=disabled max-scan-range-rows=44229
+   mem-estimate=616.00MB mem-reservation=32.00MB thread-reservation=1
+   tuple-ids=0 row-size=50B cardinality=150000
+====
+# Parquet filtering to be skipped on a mixed file format table:
+# parquet statistics predicates on bigint_col, double_col, float_col, id, tinyint_col,
+#                                  string_col, smallint_col & date_string_col
+# parquet dictionary predicates on bool_col, bigint_col, double_col, float_col, id,
+#                                  tinyint_col, string_col, smallint_col, int_col,
+#                                  timestamp_col & date_string_col
+select count(*) from functional.alltypesmixedformat
+where id = 1 and bool_col and tinyint_col < 50 and smallint_col in (1,2,3,4,5)
+and mod(int_col,2) = 1 and bigint_col < 5000 and float_col > 50.00
+and double_col > 100.00 and date_string_col > '1993-10-01'
+and string_col in ('aaaa', 'bbbb', 'cccc')
+and timestamp_cmp(timestamp_col, '2016-11-20 00:00:00') = 1
+and year > 2000 and month < 12;
+---- PLAN
+F00:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
+|  Per-Host Resources: mem-estimate=42.00MB mem-reservation=88.00KB thread-reservation=2
+PLAN-ROOT SINK
+|  mem-estimate=0B mem-reservation=0B thread-reservation=0
+|
+01:AGGREGATE [FINALIZE]
+|  output: count(*)
+|  mem-estimate=10.00MB mem-reservation=0B spill-buffer=2.00MB thread-reservation=0
+|  tuple-ids=1 row-size=8B cardinality=1
+|
+00:SCAN HDFS [functional.alltypesmixedformat]
+   partitions=4/4 files=4 size=66.61KB
+   predicates: bool_col, bigint_col < 5000, double_col > 100.00, float_col > 50.00, id = 1, tinyint_col < 50, string_col IN ('aaaa', 'bbbb', 'cccc'), smallint_col IN (1, 2, 3, 4, 5), mod(int_col, 2) = 1, timestamp_cmp(timestamp_col, TIMESTAMP '2016-11-20 00:00:00') = 1, date_string_col > '1993-10-01'
+   stored statistics:
+     table: rows=unavailable size=unavailable
+     partitions: 0/4 rows=unavailable
+     columns missing stats: id, bool_col, tinyint_col, smallint_col, int_col, bigint_col, float_col, double_col, date_string_col, string_col, timestamp_col
+   extrapolated-rows=disabled max-scan-range-rows=unavailable
+   mem-estimate=32.00MB mem-reservation=88.00KB thread-reservation=1
+   tuple-ids=0 row-size=80B cardinality=unavailable
+====

http://git-wip-us.apache.org/repos/asf/impala/blob/c6f9b61e/testdata/workloads/functional-planner/queries/PlannerTest/parquet-filtering.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/parquet-filtering.test b/testdata/workloads/functional-planner/queries/PlannerTest/parquet-filtering.test
index 0987336..8df8716 100644
--- a/testdata/workloads/functional-planner/queries/PlannerTest/parquet-filtering.test
+++ b/testdata/workloads/functional-planner/queries/PlannerTest/parquet-filtering.test
@@ -30,6 +30,32 @@ PLAN-ROOT SINK
    mem-estimate=32.00MB mem-reservation=16.00KB thread-reservation=1
    tuple-ids=0 row-size=5B cardinality=unavailable
 ====
+# Test non-parquet types to ensure that parquet predicates are skipped
+select count(*) from functional.alltypes
+where int_col > 1 and int_col * rand() > 50 and int_col is null
+and int_col > tinyint_col;
+---- PLAN
+F00:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
+|  Per-Host Resources: mem-estimate=138.00MB mem-reservation=32.00KB thread-reservation=2
+PLAN-ROOT SINK
+|  mem-estimate=0B mem-reservation=0B thread-reservation=0
+|
+01:AGGREGATE [FINALIZE]
+|  output: count(*)
+|  mem-estimate=10.00MB mem-reservation=0B spill-buffer=2.00MB thread-reservation=0
+|  tuple-ids=1 row-size=8B cardinality=1
+|
+00:SCAN HDFS [functional.alltypes]
+   partitions=24/24 files=24 size=478.45KB
+   predicates: int_col IS NULL, int_col > 1, int_col > tinyint_col, int_col * rand() > 50
+   stored statistics:
+     table: rows=7300 size=478.45KB
+     partitions: 24/24 rows=7300
+     columns: all
+   extrapolated-rows=disabled max-scan-range-rows=310
+   mem-estimate=128.00MB mem-reservation=32.00KB thread-reservation=1
+   tuple-ids=0 row-size=5B cardinality=730
+====
 # Test a variety of types
 select count(*) from functional_parquet.alltypes
 where id = 1 and bool_col and tinyint_col < 50 and smallint_col in (1,2,3,4,5)
@@ -62,6 +88,36 @@ PLAN-ROOT SINK
    mem-estimate=128.00MB mem-reservation=88.00KB thread-reservation=1
    tuple-ids=0 row-size=80B cardinality=unavailable
 ====
+# Test non-parquet files for a variety of predicates
+select count(*) from functional.alltypes
+where id = 1 and bool_col and tinyint_col < 50 and smallint_col in (1,2,3,4,5)
+and mod(int_col,2) = 1 and bigint_col < 5000 and float_col > 50.00
+and double_col > 100.00 and date_string_col > '1993-10-01'
+and string_col in ('aaaa', 'bbbb', 'cccc')
+and timestamp_cmp(timestamp_col, '2016-11-20 00:00:00') = 1
+and year > 2000 and month < 12;
+---- PLAN
+F00:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
+|  Per-Host Resources: mem-estimate=138.00MB mem-reservation=32.00KB thread-reservation=2
+PLAN-ROOT SINK
+|  mem-estimate=0B mem-reservation=0B thread-reservation=0
+|
+01:AGGREGATE [FINALIZE]
+|  output: count(*)
+|  mem-estimate=10.00MB mem-reservation=0B spill-buffer=2.00MB thread-reservation=0
+|  tuple-ids=1 row-size=8B cardinality=1
+|
+00:SCAN HDFS [functional.alltypes]
+   partitions=22/24 files=22 size=437.72KB
+   predicates: id = 1, string_col IN ('aaaa', 'bbbb', 'cccc'), smallint_col IN (1, 2, 3, 4, 5), bool_col, bigint_col < 5000, double_col > 100.00, float_col > 50.00, tinyint_col < 50, mod(int_col, 2) = 1, timestamp_cmp(timestamp_col, TIMESTAMP '2016-11-20 00:00:00') = 1, date_string_col > '1993-10-01'
+   stored statistics:
+     table: rows=7300 size=478.45KB
+     partitions: 22/22 rows=6680
+     columns: all
+   extrapolated-rows=disabled max-scan-range-rows=339
+   mem-estimate=128.00MB mem-reservation=32.00KB thread-reservation=1
+   tuple-ids=0 row-size=89B cardinality=1
+====
 # Test negative cases for IN predicate min/max filtering
 #  - NOT IN
 #  - IN list with NULL
@@ -442,3 +498,67 @@ PLAN-ROOT SINK
    mem-estimate=616.00MB mem-reservation=32.00MB thread-reservation=1
    tuple-ids=0 row-size=50B cardinality=150000
 ====
+# Test a variety of predicates on a mixed format table.
+# Scan multiple partitions with atleast one Parquet partition.
+select count(*) from functional.alltypesmixedformat
+where id = 1 and bool_col and tinyint_col < 50 and smallint_col in (1,2,3,4,5)
+and mod(int_col,2) = 1 and bigint_col < 5000 and float_col > 50.00
+and double_col > 100.00 and date_string_col > '1993-10-01'
+and string_col in ('aaaa', 'bbbb', 'cccc')
+and timestamp_cmp(timestamp_col, '2016-11-20 00:00:00') = 1
+and year > 2000 and month < 12;
+---- PLAN
+F00:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
+|  Per-Host Resources: mem-estimate=42.00MB mem-reservation=88.00KB thread-reservation=2
+PLAN-ROOT SINK
+|  mem-estimate=0B mem-reservation=0B thread-reservation=0
+|
+01:AGGREGATE [FINALIZE]
+|  output: count(*)
+|  mem-estimate=10.00MB mem-reservation=0B spill-buffer=2.00MB thread-reservation=0
+|  tuple-ids=1 row-size=8B cardinality=1
+|
+00:SCAN HDFS [functional.alltypesmixedformat]
+   partitions=4/4 files=4 size=66.61KB
+   predicates: bool_col, bigint_col < 5000, double_col > 100.00, float_col > 50.00, id = 1, tinyint_col < 50, string_col IN ('aaaa', 'bbbb', 'cccc'), smallint_col IN (1, 2, 3, 4, 5), mod(int_col, 2) = 1, timestamp_cmp(timestamp_col, TIMESTAMP '2016-11-20 00:00:00') = 1, date_string_col > '1993-10-01'
+   stored statistics:
+     table: rows=unavailable size=unavailable
+     partitions: 0/4 rows=unavailable
+     columns missing stats: id, bool_col, tinyint_col, smallint_col, int_col, bigint_col, float_col, double_col, date_string_col, string_col, timestamp_col
+   extrapolated-rows=disabled max-scan-range-rows=unavailable
+   parquet statistics predicates: bigint_col < 5000, double_col > 100.00, float_col > 50.00, id = 1, tinyint_col < 50, string_col IN ('aaaa', 'bbbb', 'cccc'), smallint_col IN (1, 2, 3, 4, 5), date_string_col > '1993-10-01'
+   parquet dictionary predicates: bool_col, bigint_col < 5000, double_col > 100.00, float_col > 50.00, id = 1, tinyint_col < 50, string_col IN ('aaaa', 'bbbb', 'cccc'), smallint_col IN (1, 2, 3, 4, 5), mod(int_col, 2) = 1, timestamp_cmp(timestamp_col, TIMESTAMP '2016-11-20 00:00:00') = 1, date_string_col > '1993-10-01'
+   mem-estimate=32.00MB mem-reservation=88.00KB thread-reservation=1
+   tuple-ids=0 row-size=80B cardinality=unavailable
+====
+# Test a variety of predicates on a mixed format table.
+# Scan all partitions other than the parquet partition.
+select count(*) from functional.alltypesmixedformat
+where id = 1 and bool_col and tinyint_col < 50 and smallint_col in (1,2,3,4,5)
+and mod(int_col,2) = 1 and bigint_col < 5000 and float_col > 50.00
+and double_col > 100.00 and date_string_col > '1993-10-01'
+and string_col in ('aaaa', 'bbbb', 'cccc')
+and timestamp_cmp(timestamp_col, '2016-11-20 00:00:00') = 1
+and year != 2009 and month != 4;
+---- PLAN
+F00:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
+|  Per-Host Resources: mem-estimate=10.00MB mem-reservation=0B thread-reservation=1
+PLAN-ROOT SINK
+|  mem-estimate=0B mem-reservation=0B thread-reservation=0
+|
+01:AGGREGATE [FINALIZE]
+|  output: count(*)
+|  mem-estimate=10.00MB mem-reservation=0B spill-buffer=2.00MB thread-reservation=0
+|  tuple-ids=1 row-size=8B cardinality=0
+|
+00:SCAN HDFS [functional.alltypesmixedformat]
+   partitions=0/4 files=0 size=0B
+   predicates: bool_col, bigint_col < 5000, double_col > 100.00, float_col > 50.00, id = 1, tinyint_col < 50, string_col IN ('aaaa', 'bbbb', 'cccc'), smallint_col IN (1, 2, 3, 4, 5), mod(int_col, 2) = 1, timestamp_cmp(timestamp_col, TIMESTAMP '2016-11-20 00:00:00') = 1, date_string_col > '1993-10-01'
+   stored statistics:
+     table: rows=unavailable size=unavailable
+     partitions: 0/0 rows=unavailable
+     columns missing stats: id, bool_col, tinyint_col, smallint_col, int_col, bigint_col, float_col, double_col, date_string_col, string_col, timestamp_col
+   extrapolated-rows=disabled max-scan-range-rows=0
+   mem-estimate=0B mem-reservation=0B thread-reservation=0
+   tuple-ids=0 row-size=80B cardinality=0
+====

http://git-wip-us.apache.org/repos/asf/impala/blob/c6f9b61e/testdata/workloads/functional-planner/queries/PlannerTest/resource-requirements.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/resource-requirements.test b/testdata/workloads/functional-planner/queries/PlannerTest/resource-requirements.test
index 59c7fa2..ee9a790 100644
--- a/testdata/workloads/functional-planner/queries/PlannerTest/resource-requirements.test
+++ b/testdata/workloads/functional-planner/queries/PlannerTest/resource-requirements.test
@@ -1255,27 +1255,27 @@ PLAN-ROOT SINK
 # Mixed table format scan
 select * from functional.alltypesmixedformat
 ---- PLAN
-Max Per-Host Resource Reservation: Memory=32.00KB Threads=2
+Max Per-Host Resource Reservation: Memory=88.00KB Threads=2
 Per-Host Resource Estimates: Memory=16.00MB
 WARNING: The following tables are missing relevant table and/or column statistics.
 functional.alltypesmixedformat
 
 F00:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
-|  Per-Host Resources: mem-estimate=16.00MB mem-reservation=32.00KB thread-reservation=2
+|  Per-Host Resources: mem-estimate=16.00MB mem-reservation=88.00KB thread-reservation=2
 PLAN-ROOT SINK
 |  mem-estimate=0B mem-reservation=0B thread-reservation=0
 |
 00:SCAN HDFS [functional.alltypesmixedformat]
-   partitions=3/3 files=3 size=58.17KB
+   partitions=4/4 files=4 size=66.61KB
    stored statistics:
      table: rows=unavailable size=unavailable
-     partitions: 0/3 rows=unavailable
+     partitions: 0/4 rows=unavailable
      columns missing stats: id, bool_col, tinyint_col, smallint_col, int_col, bigint_col, float_col, double_col, date_string_col, string_col, timestamp_col
    extrapolated-rows=disabled max-scan-range-rows=unavailable
-   mem-estimate=16.00MB mem-reservation=32.00KB thread-reservation=1
+   mem-estimate=16.00MB mem-reservation=88.00KB thread-reservation=1
    tuple-ids=0 row-size=88B cardinality=unavailable
 ---- DISTRIBUTEDPLAN
-Max Per-Host Resource Reservation: Memory=32.00KB Threads=3
+Max Per-Host Resource Reservation: Memory=88.00KB Threads=3
 Per-Host Resource Estimates: Memory=16.00MB
 WARNING: The following tables are missing relevant table and/or column statistics.
 functional.alltypesmixedformat
@@ -1290,18 +1290,18 @@ PLAN-ROOT SINK
 |  tuple-ids=0 row-size=88B cardinality=unavailable
 |
 F00:PLAN FRAGMENT [RANDOM] hosts=3 instances=3
-Per-Host Resources: mem-estimate=16.00MB mem-reservation=32.00KB thread-reservation=2
+Per-Host Resources: mem-estimate=16.00MB mem-reservation=88.00KB thread-reservation=2
 00:SCAN HDFS [functional.alltypesmixedformat, RANDOM]
-   partitions=3/3 files=3 size=58.17KB
+   partitions=4/4 files=4 size=66.61KB
    stored statistics:
      table: rows=unavailable size=unavailable
-     partitions: 0/3 rows=unavailable
+     partitions: 0/4 rows=unavailable
      columns missing stats: id, bool_col, tinyint_col, smallint_col, int_col, bigint_col, float_col, double_col, date_string_col, string_col, timestamp_col
    extrapolated-rows=disabled max-scan-range-rows=unavailable
-   mem-estimate=16.00MB mem-reservation=32.00KB thread-reservation=1
+   mem-estimate=16.00MB mem-reservation=88.00KB thread-reservation=1
    tuple-ids=0 row-size=88B cardinality=unavailable
 ---- PARALLELPLANS
-Max Per-Host Resource Reservation: Memory=64.00KB Threads=5
+Max Per-Host Resource Reservation: Memory=176.00KB Threads=5
 Per-Host Resource Estimates: Memory=32.00MB
 WARNING: The following tables are missing relevant table and/or column statistics.
 functional.alltypesmixedformat
@@ -1316,15 +1316,15 @@ PLAN-ROOT SINK
 |  tuple-ids=0 row-size=88B cardinality=unavailable
 |
 F00:PLAN FRAGMENT [RANDOM] hosts=3 instances=6
-Per-Host Resources: mem-estimate=32.00MB mem-reservation=64.00KB thread-reservation=4
+Per-Host Resources: mem-estimate=32.00MB mem-reservation=176.00KB thread-reservation=4
 00:SCAN HDFS [functional.alltypesmixedformat, RANDOM]
-   partitions=3/3 files=3 size=58.17KB
+   partitions=4/4 files=4 size=66.61KB
    stored statistics:
      table: rows=unavailable size=unavailable
-     partitions: 0/3 rows=unavailable
+     partitions: 0/4 rows=unavailable
      columns missing stats: id, bool_col, tinyint_col, smallint_col, int_col, bigint_col, float_col, double_col, date_string_col, string_col, timestamp_col
    extrapolated-rows=disabled max-scan-range-rows=unavailable
-   mem-estimate=16.00MB mem-reservation=32.00KB thread-reservation=1
+   mem-estimate=16.00MB mem-reservation=88.00KB thread-reservation=1
    tuple-ids=0 row-size=88B cardinality=unavailable
 ====
 # HBase scan

http://git-wip-us.apache.org/repos/asf/impala/blob/c6f9b61e/testdata/workloads/functional-planner/queries/PlannerTest/tablesample.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/tablesample.test b/testdata/workloads/functional-planner/queries/PlannerTest/tablesample.test
index c63e985..4469598 100644
--- a/testdata/workloads/functional-planner/queries/PlannerTest/tablesample.test
+++ b/testdata/workloads/functional-planner/queries/PlannerTest/tablesample.test
@@ -52,7 +52,6 @@ PLAN-ROOT SINK
      partitions: 24/24 rows=7300
      columns: all
    extrapolated-rows=disabled max-scan-range-rows=621
-   parquet dictionary predicates: id < 10
    mem-estimate=80.00MB mem-reservation=32.00KB thread-reservation=1
    tuple-ids=0 row-size=97B cardinality=365
 ====

http://git-wip-us.apache.org/repos/asf/impala/blob/c6f9b61e/testdata/workloads/functional-query/queries/QueryTest/mixed-format.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-query/queries/QueryTest/mixed-format.test b/testdata/workloads/functional-query/queries/QueryTest/mixed-format.test
index cd48e8c..0b693e1 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/mixed-format.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/mixed-format.test
@@ -6,7 +6,7 @@ select count(*), sum(int_col) from functional.alltypesmixedformat
 ---- TYPES
 bigint, bigint
 ---- RESULTS
-900,4050
+1200,5400
 ====
 ---- QUERY
 # Restrict set of partitions (still multi-format)

http://git-wip-us.apache.org/repos/asf/impala/blob/c6f9b61e/testdata/workloads/functional-query/queries/QueryTest/show-stats.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-query/queries/QueryTest/show-stats.test b/testdata/workloads/functional-query/queries/QueryTest/show-stats.test
index 11f8264..ca25047 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/show-stats.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/show-stats.test
@@ -86,7 +86,8 @@ YEAR, MONTH, #ROWS, #FILES, SIZE, BYTES CACHED, CACHE REPLICATION, FORMAT, INCRE
 '2009','1',-1,1,regex:.+KB,'NOT CACHED','NOT CACHED','TEXT','false','$NAMENODE/test-warehouse/alltypesmixedformat/year=2009/month=1'
 '2009','2',-1,1,regex:.+KB,'NOT CACHED','NOT CACHED','SEQUENCE_FILE','false','$NAMENODE/test-warehouse/alltypesmixedformat/year=2009/month=2'
 '2009','3',-1,1,regex:.+KB,'NOT CACHED','NOT CACHED','RC_FILE','false','$NAMENODE/test-warehouse/alltypesmixedformat/year=2009/month=3'
-'Total','',-1,3,regex:.+KB,'0B','','','',''
+'2009','4',-1,1,regex:.+KB,'NOT CACHED','NOT CACHED','PARQUET','false','$NAMENODE/test-warehouse/alltypesmixedformat/year=2009/month=4'
+'Total','',-1,4,regex:.+KB,'0B','','','',''
 ---- TYPES
 STRING, STRING, BIGINT, BIGINT, STRING, STRING, STRING, STRING, STRING, STRING
 ====

http://git-wip-us.apache.org/repos/asf/impala/blob/c6f9b61e/tests/query_test/test_rows_availability.py
----------------------------------------------------------------------
diff --git a/tests/query_test/test_rows_availability.py b/tests/query_test/test_rows_availability.py
index d8db865..06b4d65 100644
--- a/tests/query_test/test_rows_availability.py
+++ b/tests/query_test/test_rows_availability.py
@@ -105,8 +105,9 @@ class TestRowsAvailability(ImpalaTestSuite):
 
   @staticmethod
   def __parse_time_ms(duration):
-    """Parses a duration string of the form 1h2h3m4s5.6ms into milliseconds."""
-    matches = re.findall(r'([0-9]+h)?([0-9]+m)?([0-9]+s)?([0-9]+(\.[0-9]+)?ms)?',
+    """Parses a duration string of the form 1h2h3m4s5.6ms7.8ns into milliseconds."""
+    matches = re.findall(r'([0-9]+h)?([0-9]+m)?([0-9]+s)?'\
+                         '([0-9]+(\.[0-9]+)?ms)?([0-9]+(\.[0-9]+)?ns)?',
                          duration)
     # Expect exactly two matches because all groups are optional in the regex.
     if matches is None or len(matches) != 2:


[2/3] impala git commit: Bump toolchain version, include libunwind

Posted by ta...@apache.org.
Bump toolchain version, include libunwind

Change-Id: I0b26f6a342dd7ba282c3f6c4de93745aff2dd095
Reviewed-on: http://gerrit.cloudera.org:8080/10755
Reviewed-by: Lars Volker <lv...@cloudera.com>
Tested-by: Impala Public Jenkins <im...@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/837d3868
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/837d3868
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/837d3868

Branch: refs/heads/master
Commit: 837d386886759f7e884b145e74df83bfc013b2e2
Parents: c6f9b61
Author: Lars Volker <lv...@cloudera.com>
Authored: Mon Jun 4 15:14:52 2018 -0700
Committer: Impala Public Jenkins <im...@cloudera.com>
Committed: Fri Jul 6 22:06:03 2018 +0000

----------------------------------------------------------------------
 CMakeLists.txt                    |  8 +++++++
 be/CMakeLists.txt                 |  1 +
 bin/bootstrap_toolchain.py        |  2 +-
 bin/impala-config.sh              |  6 +++--
 cmake_modules/FindLibUnwind.cmake | 40 ++++++++++++++++++++++++++++++++++
 5 files changed, 54 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/impala/blob/837d3868/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/CMakeLists.txt b/CMakeLists.txt
index 2b8d563..77bf210 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -88,6 +88,7 @@ set_dep_root(GLOG)
 set_dep_root(GPERFTOOLS)
 set_dep_root(GTEST)
 set_dep_root(LIBEV)
+set_dep_root(LIBUNWIND)
 set_dep_root(LLVM)
 set(LLVM_DEBUG_ROOT $ENV{IMPALA_TOOLCHAIN}/llvm-$ENV{IMPALA_LLVM_DEBUG_VERSION})
 set_dep_root(LZ4)
@@ -343,6 +344,13 @@ endif()
 
 ###################################################################
 
+## libunwind
+if (NOT APPLE)
+  find_package(LibUnwind REQUIRED)
+  include_directories(SYSTEM ${LIBUNWIND_INCLUDE_DIR})
+  IMPALA_ADD_THIRDPARTY_LIB(libunwind ${LIBUNWIND_INCLUDE_DIR} ${LIBUNWIND_STATIC_LIB} "")
+endif()
+
 # Required for KRPC_GENERATE, which converts protobuf to stubs.
 find_package(KRPC REQUIRED)
 

http://git-wip-us.apache.org/repos/asf/impala/blob/837d3868/be/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/be/CMakeLists.txt b/be/CMakeLists.txt
index de94f5b..da78dad 100644
--- a/be/CMakeLists.txt
+++ b/be/CMakeLists.txt
@@ -443,6 +443,7 @@ set (IMPALA_DEPENDENCIES
   gflags
   krb5
   libev
+  libunwind
   pprof
   breakpad_client
   hdfs

http://git-wip-us.apache.org/repos/asf/impala/blob/837d3868/bin/bootstrap_toolchain.py
----------------------------------------------------------------------
diff --git a/bin/bootstrap_toolchain.py b/bin/bootstrap_toolchain.py
index 4a6009e..d2a3e71 100755
--- a/bin/bootstrap_toolchain.py
+++ b/bin/bootstrap_toolchain.py
@@ -426,7 +426,7 @@ if __name__ == "__main__":
   # their download starts as soon as possible.
   packages = map(Package, ["llvm", "kudu",
       "avro", "binutils", "boost", "breakpad", "bzip2", "cctz", "cmake", "crcutil",
-      "flatbuffers", "gcc", "gflags", "glog", "gperftools", "gtest", "libev",
+      "flatbuffers", "gcc", "gflags", "glog", "gperftools", "gtest", "libev", "libunwind",
       "lz4", "openldap", "openssl", "orc", "protobuf",
       "rapidjson", "re2", "snappy", "thrift", "tpc-h", "tpc-ds", "zlib"])
   packages.insert(0, Package("llvm", "5.0.1-asserts"))

http://git-wip-us.apache.org/repos/asf/impala/blob/837d3868/bin/impala-config.sh
----------------------------------------------------------------------
diff --git a/bin/impala-config.sh b/bin/impala-config.sh
index b62c6a6..771ca2b 100755
--- a/bin/impala-config.sh
+++ b/bin/impala-config.sh
@@ -68,7 +68,7 @@ fi
 # moving to a different build of the toolchain, e.g. when a version is bumped or a
 # compile option is changed. The build id can be found in the output of the toolchain
 # build jobs, it is constructed from the build number and toolchain git hash prefix.
-export IMPALA_TOOLCHAIN_BUILD_ID=107-764a0ddc79
+export IMPALA_TOOLCHAIN_BUILD_ID=137-93cacec18d
 # Versions of toolchain dependencies.
 # -----------------------------------
 export IMPALA_AVRO_VERSION=1.7.4-p4
@@ -103,6 +103,8 @@ export IMPALA_GTEST_VERSION=1.6.0
 unset IMPALA_GTEST_URL
 export IMPALA_LIBEV_VERSION=4.20
 unset IMPALA_LIBEV_URL
+export IMPALA_LIBUNWIND_VERSION=1.3-rc1-p3
+unset IMPALA_LIBUNWIND_URL
 export IMPALA_LLVM_VERSION=5.0.1
 unset IMPALA_LLVM_URL
 export IMPALA_LLVM_ASAN_VERSION=5.0.1
@@ -120,7 +122,7 @@ export IMPALA_OPENSSL_VERSION=1.0.2l
 unset IMPALA_OPENSSL_URL
 export IMPALA_ORC_VERSION=1.4.3-p2
 unset IMPALA_ORC_URL
-export IMPALA_PROTOBUF_VERSION=2.6.1
+export IMPALA_PROTOBUF_VERSION=3.5.1
 unset IMPALA_PROTOBUF_URL
 export IMPALA_POSTGRES_JDBC_DRIVER_VERSION=9.0-801
 unset IMPALA_POSTGRES_JDBC_DRIVER_URL

http://git-wip-us.apache.org/repos/asf/impala/blob/837d3868/cmake_modules/FindLibUnwind.cmake
----------------------------------------------------------------------
diff --git a/cmake_modules/FindLibUnwind.cmake b/cmake_modules/FindLibUnwind.cmake
new file mode 100644
index 0000000..53f012c
--- /dev/null
+++ b/cmake_modules/FindLibUnwind.cmake
@@ -0,0 +1,40 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+# - Find libunwind (libunwind.h, libunwind.so)
+#
+# This module defines
+#  LIBUNWIND_INCLUDE_DIR, directory containing headers
+#  LIBUNWIND_SHARED_LIB, path to libunwind's shared library
+#  LIBUNWIND_STATIC_LIB, path to libunwind's static library
+
+find_path(LIBUNWIND_INCLUDE_DIR libunwind.h
+  ${LIBUNWIND_ROOT}/include
+  NO_CMAKE_SYSTEM_PATH
+  NO_SYSTEM_ENVIRONMENT_PATH)
+find_library(LIBUNWIND_SHARED_LIB unwind
+  ${LIBUNWIND_ROOT}/lib
+  NO_CMAKE_SYSTEM_PATH
+  NO_SYSTEM_ENVIRONMENT_PATH)
+find_library(LIBUNWIND_STATIC_LIB libunwind.a
+  ${LIBUNWIND_ROOT}/lib
+  NO_CMAKE_SYSTEM_PATH
+  NO_SYSTEM_ENVIRONMENT_PATH)
+
+include(FindPackageHandleStandardArgs)
+find_package_handle_standard_args(LIBUNWIND REQUIRED_VARS
+  LIBUNWIND_SHARED_LIB LIBUNWIND_STATIC_LIB LIBUNWIND_INCLUDE_DIR)


[3/3] impala git commit: IMPALA-6642 (Part 2): clean up start-impala-cluster.py

Posted by ta...@apache.org.
IMPALA-6642 (Part 2): clean up start-impala-cluster.py

We clean up start-impala-cluster.py in general in this patch by using
logging instead of "print" and formatting strings using the format()
function. We make sure to include a timestamp in each log message in
order to make it easier to debug failures in custom cluster tests that
happen when starting the cluster.

Change-Id: I60169203c61ae6bc0a3ccd3dea355799b603efe5
Reviewed-on: http://gerrit.cloudera.org:8080/10780
Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
Tested-by: Impala Public Jenkins <im...@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/30d196fd
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/30d196fd
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/30d196fd

Branch: refs/heads/master
Commit: 30d196fd50f44519a5289d94388bdcbe970923d1
Parents: 837d386
Author: Taras Bobrovytsky <ta...@apache.org>
Authored: Mon Jun 18 14:34:35 2018 -0700
Committer: Impala Public Jenkins <im...@cloudera.com>
Committed: Sat Jul 7 01:10:36 2018 +0000

----------------------------------------------------------------------
 bin/start-impala-cluster.py    | 227 +++++++++++++++++++++++-------------
 tests/common/impala_service.py |   3 +-
 2 files changed, 147 insertions(+), 83 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/impala/blob/30d196fd/bin/start-impala-cluster.py
----------------------------------------------------------------------
diff --git a/bin/start-impala-cluster.py b/bin/start-impala-cluster.py
index 6856594..4d34d45 100755
--- a/bin/start-impala-cluster.py
+++ b/bin/start-impala-cluster.py
@@ -20,18 +20,24 @@
 # Starts up an Impala cluster (ImpalaD + State Store) with the specified number of
 # ImpalaD instances. Each ImpalaD runs on a different port allowing this to be run
 # on a single machine.
+import logging
 import os
 import psutil
 import sys
+from datetime import datetime
 from getpass import getuser
 from time import sleep, time
 from optparse import OptionParser, SUPPRESS_HELP
 from testdata.common import cgroups
 from tests.common.environ import specific_build_type_timeout
 
-KUDU_MASTER_HOSTS = os.getenv('KUDU_MASTER_HOSTS', '127.0.0.1')
-DEFAULT_IMPALA_MAX_LOG_FILES = os.environ.get('IMPALA_MAX_LOG_FILES', 10)
+logging.basicConfig(level=logging.ERROR, format="%(asctime)s %(threadName)s: %(message)s",
+    datefmt="%H:%M:%S")
+LOG = logging.getLogger(os.path.splitext(os.path.basename(__file__))[0])
+LOG.setLevel(level=logging.DEBUG)
 
+KUDU_MASTER_HOSTS = os.getenv("KUDU_MASTER_HOSTS", "127.0.0.1")
+DEFAULT_IMPALA_MAX_LOG_FILES = os.environ.get("IMPALA_MAX_LOG_FILES", 10)
 
 # Options
 parser = OptionParser()
@@ -43,7 +49,7 @@ parser.add_option("--use_exclusive_coordinators", dest="use_exclusive_coordinato
                   action="store_true", default=False, help="If true, coordinators only "
                   "coordinate queries and execute coordinator fragments. If false, "
                   "coordinators also act as executors.")
-parser.add_option("--build_type", dest="build_type", default= 'latest',
+parser.add_option("--build_type", dest="build_type", default= "latest",
                   help="Build type to use - debug / release / latest")
 parser.add_option("--impalad_args", dest="impalad_args", action="append", type="string",
                   default=[],
@@ -67,10 +73,10 @@ parser.add_option("-r", "--restart_impalad_only", dest="restart_impalad_only",
 parser.add_option("--in-process", dest="inprocess", action="store_true", default=False,
                   help="Start all Impala backends and state store in a single process.")
 parser.add_option("--log_dir", dest="log_dir",
-                  default=os.environ['IMPALA_CLUSTER_LOGS_DIR'],
+                  default=os.environ["IMPALA_CLUSTER_LOGS_DIR"],
                   help="Directory to store output logs to.")
-parser.add_option('--max_log_files', default=DEFAULT_IMPALA_MAX_LOG_FILES,
-                  help='Max number of log files before rotation occurs.')
+parser.add_option("--max_log_files", default=DEFAULT_IMPALA_MAX_LOG_FILES,
+                  help="Max number of log files before rotation occurs.")
 parser.add_option("-v", "--verbose", dest="verbose", action="store_true", default=False,
                   help="Prints all output to stderr/stdout.")
 parser.add_option("--log_level", type="int", dest="log_level", default=1,
@@ -92,21 +98,19 @@ parser.add_option("--per_impalad_args", dest="per_impalad_args", type="string"
 
 options, args = parser.parse_args()
 
-IMPALA_HOME = os.environ['IMPALA_HOME']
-KNOWN_BUILD_TYPES = ['debug', 'release', 'latest']
+IMPALA_HOME = os.environ["IMPALA_HOME"]
+KNOWN_BUILD_TYPES = ["debug", "release", "latest"]
 IMPALAD_PATH = os.path.join(IMPALA_HOME,
-    'bin/start-impalad.sh -build_type=%s' % options.build_type)
+    "bin/start-impalad.sh -build_type={build_type}".format(
+        build_type=options.build_type))
 STATE_STORE_PATH = os.path.join(IMPALA_HOME,
-    'bin/start-statestored.sh -build_type=%s' % options.build_type)
+    "bin/start-statestored.sh -build_type={build_type}".format(
+        build_type=options.build_type))
 CATALOGD_PATH = os.path.join(IMPALA_HOME,
-    'bin/start-catalogd.sh -build_type=%s' % options.build_type)
+    "bin/start-catalogd.sh -build_type={build_type}".format(
+        build_type=options.build_type))
 MINI_IMPALA_CLUSTER_PATH = IMPALAD_PATH + " -in-process"
 
-IMPALA_SHELL = os.path.join(IMPALA_HOME, 'bin/impala-shell.sh')
-IMPALAD_PORTS = ("-beeswax_port=%d -hs2_port=%d  -be_port=%d -krpc_port=%d "
-                 "-state_store_subscriber_port=%d -webserver_port=%d")
-JVM_ARGS = "-jvm_debug_port=%s -jvm_args=%s"
-BE_LOGGING_ARGS = "-log_filename=%s -log_dir=%s -v=%s -logbufsecs=5 -max_log_files=%s"
 CLUSTER_WAIT_TIMEOUT_IN_SECONDS = 240
 # Kills have a timeout to prevent automated scripts from hanging indefinitely.
 # It is set to a high value to avoid failing if processes are slow to shut down.
@@ -143,14 +147,18 @@ def check_process_exists(binary, attempts=1):
 def exec_impala_process(cmd, args, stderr_log_file_path):
   redirect_output = str()
   if options.verbose:
-    args += ' -logtostderr=1'
+    args += " -logtostderr=1"
   else:
-    redirect_output = "1>%s" % stderr_log_file_path
-  cmd = '%s %s %s 2>&1 &' % (cmd, args, redirect_output)
+    redirect_output = "1>{stderr_log_file_path}".format(
+        stderr_log_file_path=stderr_log_file_path)
+  cmd = "{cmd} {args} {redirect_output} 2>&1 &".format(
+      cmd=cmd,
+      args=args,
+      redirect_output=redirect_output)
   os.system(cmd)
 
 def kill_cluster_processes(force=False):
-  binaries = ['catalogd', 'impalad', 'statestored']
+  binaries = ["catalogd", "impalad", "statestored"]
   kill_matching_processes(binaries, force)
 
 def kill_matching_processes(binary_names, force=False):
@@ -170,50 +178,75 @@ def kill_matching_processes(binary_names, force=False):
     try:
       process.wait(KILL_TIMEOUT_IN_SECONDS)
     except psutil.TimeoutExpired:
-      raise RuntimeError("Unable to kill %s (pid %d) after %d seconds." % (process.name,
-          process.pid, KILL_TIMEOUT_IN_SECONDS))
+      raise RuntimeError(("Unable to kill {process_name} (pid {process_pid}) "
+          "after {num_seconds} seconds.").format(
+              process_name=process.name,
+              process_pid=process.pid,
+              num_seconds=KILL_TIMEOUT_IN_SECONDS))
 
 def start_statestore():
-  print "Starting State Store logging to %s/statestored.INFO" % options.log_dir
+  LOG.info("Starting State Store logging to {log_dir}/statestored.INFO".format(
+      log_dir=options.log_dir))
   stderr_log_file_path = os.path.join(options.log_dir, "statestore-error.log")
-  args = "%s %s" % (build_impalad_logging_args(0, "statestored"),
-                    " ".join(options.state_store_args))
+  args = "{impalad_logging_args} {state_store_args}".format(
+      impalad_logging_args=build_impalad_logging_args(0, "statestored"),
+      state_store_args=" ".join(options.state_store_args))
   exec_impala_process(STATE_STORE_PATH, args, stderr_log_file_path)
   if not check_process_exists("statestored", 10):
     raise RuntimeError("Unable to start statestored. Check log or file permissions"
                        " for more details.")
 
 def start_catalogd():
-  print "Starting Catalog Service logging to %s/catalogd.INFO" % options.log_dir
+  LOG.info("Starting Catalog Service logging to {log_dir}/catalogd.INFO".format(
+      log_dir=options.log_dir))
   stderr_log_file_path = os.path.join(options.log_dir, "catalogd-error.log")
-  args = "%s %s %s" % (build_impalad_logging_args(0, "catalogd"),
-                       " ".join(options.catalogd_args),
-                       build_jvm_args(options.cluster_size))
+  args = "{impalad_logging_args} {catalogd_args} {jvm_args}".format(
+      impalad_logging_args=build_impalad_logging_args(0, "catalogd"),
+      catalogd_args=" ".join(options.catalogd_args),
+      jvm_args=build_jvm_args(options.cluster_size))
   exec_impala_process(CATALOGD_PATH, args, stderr_log_file_path)
   if not check_process_exists("catalogd", 10):
     raise RuntimeError("Unable to start catalogd. Check log or file permissions"
                        " for more details.")
 
 def build_impalad_port_args(instance_num):
+  IMPALAD_PORTS = (
+      "-beeswax_port={beeswax_port} "
+      "-hs2_port={hs2_port} "
+      "-be_port={be_port} "
+      "-krpc_port={krpc_port} "
+      "-state_store_subscriber_port={state_store_subscriber_port} "
+      "-webserver_port={webserver_port}")
   BASE_BEESWAX_PORT = 21000
   BASE_HS2_PORT = 21050
   BASE_BE_PORT = 22000
   BASE_KRPC_PORT = 27000
   BASE_STATE_STORE_SUBSCRIBER_PORT = 23000
   BASE_WEBSERVER_PORT = 25000
-  return IMPALAD_PORTS % (BASE_BEESWAX_PORT + instance_num, BASE_HS2_PORT + instance_num,
-                          BASE_BE_PORT + instance_num,
-                          BASE_KRPC_PORT + instance_num,
-                          BASE_STATE_STORE_SUBSCRIBER_PORT + instance_num,
-                          BASE_WEBSERVER_PORT + instance_num)
+  return IMPALAD_PORTS.format(
+      beeswax_port=BASE_BEESWAX_PORT + instance_num,
+      hs2_port=BASE_HS2_PORT + instance_num,
+      be_port=BASE_BE_PORT + instance_num,
+      krpc_port=BASE_KRPC_PORT + instance_num,
+      state_store_subscriber_port=BASE_STATE_STORE_SUBSCRIBER_PORT + instance_num,
+      webserver_port=BASE_WEBSERVER_PORT + instance_num)
 
 def build_impalad_logging_args(instance_num, service_name):
-  return BE_LOGGING_ARGS % (service_name, options.log_dir, options.log_level,
-                            options.max_log_files)
+  return ("-log_filename={log_filename} "
+      "-log_dir={log_dir} "
+      "-v={log_level} "
+      "-logbufsecs=5 "
+      "-max_log_files={max_log_files}").format(
+          log_filename=service_name,
+          log_dir=options.log_dir,
+          log_level=options.log_level,
+          max_log_files=options.max_log_files)
 
 def build_jvm_args(instance_num):
   BASE_JVM_DEBUG_PORT = 30000
-  return JVM_ARGS % (BASE_JVM_DEBUG_PORT + instance_num, options.jvm_args)
+  return "-jvm_debug_port={jvm_debug_port} -jvm_args={jvm_args}".format(
+      jvm_debug_port=BASE_JVM_DEBUG_PORT + instance_num,
+      jvm_args=options.jvm_args)
 
 def start_impalad_instances(cluster_size, num_coordinators, use_exclusive_coordinators):
   """Start 'cluster_size' impalad instances. The first 'num_coordinator' instances will
@@ -253,45 +286,62 @@ def start_impalad_instances(cluster_size, num_coordinators, use_exclusive_coordi
       # The first impalad always logs to impalad.INFO
       service_name = "impalad"
     else:
-      service_name = "impalad_node%s" % i
+      service_name = "impalad_node{node_num}".format(node_num=i)
       # Sleep between instance startup: simultaneous starts hurt the minikdc
       # Yes, this is a hack, but it's easier than modifying the minikdc...
       # TODO: is this really necessary?
       sleep(1)
 
-    print "Starting Impala Daemon logging to %s/%s.INFO" % (options.log_dir,
-        service_name)
+    LOG.info("Starting Impala Daemon logging to {log_dir}/{service_name}.INFO".format(
+        log_dir=options.log_dir,
+        service_name=service_name))
 
     # impalad args from the --impalad_args flag. Also replacing '#ID' with the instance.
     param_args = (" ".join(options.impalad_args)).replace("#ID", str(i))
-    args = "--mem_limit=%s %s %s %s %s" %\
-          (mem_limit,  # Goes first so --impalad_args will override it.
-           build_impalad_logging_args(i, service_name), build_jvm_args(i),
-           build_impalad_port_args(i), param_args)
+    args = ("--mem_limit={mem_limit} "
+        "{impala_logging_args} "
+        "{jvm_args} "
+        "{impala_port_args} "
+        "{param_args}").format(
+            mem_limit=mem_limit,  # Goes first so --impalad_args will override it.
+            impala_logging_args=build_impalad_logging_args(i, service_name),
+            jvm_args=build_jvm_args(i),
+            impala_port_args=build_impalad_port_args(i),
+            param_args=param_args)
     if options.kudu_master_hosts:
       # Must be prepended, otherwise the java options interfere.
-      args = "-kudu_master_hosts %s %s" % (options.kudu_master_hosts, args)
+      args = "-kudu_master_hosts {kudu_master_hosts} {args}".format(
+          kudu_master_hosts=options.kudu_master_hosts,
+          args=args)
 
     if "kudu_client_rpc_timeout" not in args:
-      args = "-kudu_client_rpc_timeout_ms %s %s" % (KUDU_RPC_TIMEOUT, args)
+      args = "-kudu_client_rpc_timeout_ms {kudu_rpc_timeout} {args}".format(
+          kudu_rpc_timeout=KUDU_RPC_TIMEOUT,
+          args=args)
 
     if i >= num_coordinators:
-      args = "-is_coordinator=false %s" % (args)
+      args = "-is_coordinator=false {args}".format(args=args)
     elif use_exclusive_coordinators:
       # Coordinator instance that doesn't execute non-coordinator fragments
-      args = "-is_executor=false %s" % (args)
+      args = "-is_executor=false {args}".format(args=args)
 
     if i < len(delay_list):
-      args = "-stress_catalog_init_delay_ms=%s %s" % (delay_list[i], args)
+      args = "-stress_catalog_init_delay_ms={delay} {args}".format(
+          delay=delay_list[i],
+          args=args)
 
     if options.disable_krpc:
-      args = "-use_krpc=false %s" % (args)
+      args = "-use_krpc=false {args}".format(args=args)
 
     # Appended at the end so they can override previous args.
     if i < len(per_impalad_args):
-      args = "%s %s" % (args, per_impalad_args[i])
+      args = "{args} {per_impalad_args}".format(
+          args=args,
+          per_impalad_args=per_impalad_args[i])
 
-    stderr_log_file_path = os.path.join(options.log_dir, '%s-error.log' % service_name)
+    stderr_log_file_path = os.path.join(
+        options.log_dir,
+        "{service_name}-error.log".format(service_name=service_name))
     exec_impala_process(IMPALAD_PATH, args, stderr_log_file_path)
 
 def wait_for_impala_process_count(impala_cluster, retries=10):
@@ -309,8 +359,9 @@ def wait_for_impala_process_count(impala_cluster, retries=10):
   msg = str()
   if len(impala_cluster.impalads) < options.cluster_size:
     impalads_found = len(impala_cluster.impalads)
-    msg += "Expected %d impalad(s), only %d found\n" %\
-        (options.cluster_size, impalads_found)
+    msg += "Expected {expected_num} impalad(s), only {actual_num} found\n".format(
+        expected_num=options.cluster_size,
+        actual_num=impalads_found)
   if not impala_cluster.statestored:
     msg += "statestored failed to start.\n"
   if not impala_cluster.catalogd:
@@ -341,8 +392,8 @@ def wait_for_cluster_web(timeout_in_seconds=CLUSTER_WAIT_TIMEOUT_IN_SECONDS):
   for impalad in impala_cluster.impalads:
     impalad.service.wait_for_num_known_live_backends(expected_num_backends,
         timeout=CLUSTER_WAIT_TIMEOUT_IN_SECONDS, interval=2)
-    if impalad._get_arg_value('is_coordinator', default='true') == 'true' and \
-       impalad._get_arg_value('stress_catalog_init_delay_ms', default=0) == 0:
+    if impalad._get_arg_value("is_coordinator", default="true") == "true" and \
+       impalad._get_arg_value("stress_catalog_init_delay_ms", default=0) == 0:
       wait_for_catalog(impalad)
 
 def wait_for_catalog(impalad, timeout_in_seconds=CLUSTER_WAIT_TIMEOUT_IN_SECONDS):
@@ -356,29 +407,35 @@ def wait_for_catalog(impalad, timeout_in_seconds=CLUSTER_WAIT_TIMEOUT_IN_SECONDS
   while (time() - start_time < timeout_in_seconds):
     try:
       num_dbs, num_tbls = impalad.service.get_metric_values(
-          ['catalog.num-databases', 'catalog.num-tables'])
+          ["catalog.num-databases", "catalog.num-tables"])
       client_beeswax = impalad.service.create_beeswax_client()
       client_hs2 = impalad.service.create_hs2_client()
       break
     except Exception as e:
-      print 'Client services not ready.'
-      print 'Waiting for catalog cache: (%s DBs / %s tables). Trying again ...' %\
-        (num_dbs, num_tbls)
+      LOG.exception(("Client services not ready. Waiting for catalog cache: "
+          "({num_dbs} DBs / {num_tbls} tables). Trying again ...").format(
+              num_dbs=num_dbs,
+              num_tbls=num_tbls))
     finally:
       if client_beeswax is not None: client_beeswax.close()
     sleep(0.5)
 
   if client_beeswax is None or client_hs2 is None:
-    raise RuntimeError('Unable to open client ports within %s seconds.'\
-                       % timeout_in_seconds)
+    raise RuntimeError("Unable to open client ports within {num_seconds} seconds.".format(
+        num_seconds=timeout_in_seconds))
 
 def wait_for_cluster_cmdline(timeout_in_seconds=CLUSTER_WAIT_TIMEOUT_IN_SECONDS):
   """Checks if the cluster is "ready" by executing a simple query in a loop"""
   start_time = time()
-  while os.system('%s -i localhost:21000 -q "%s"' %  (IMPALA_SHELL, 'select 1')) != 0:
+  IMPALA_SHELL = os.path.join(IMPALA_HOME, "bin/impala-shell.sh")
+  cmd = "{impala_shell} -i localhost:21000 -q '{query}'".format(
+      impala_shell=IMPALA_SHELL,
+      query="select 1")
+  while os.system(cmd) != 0:
     if time() - timeout_in_seconds > start_time:
-      raise RuntimeError('Cluster did not start within %d seconds' % timeout_in_seconds)
-    print 'Cluster not yet available. Sleeping...'
+      raise RuntimeError("Cluster did not start within {num_seconds} seconds".format(
+        num_seconds=timeout_in_seconds))
+    LOG.info("Cluster not yet available. Sleeping...")
     sleep(2)
 
 if __name__ == "__main__":
@@ -387,32 +444,35 @@ if __name__ == "__main__":
     sys.exit(0)
 
   if options.build_type not in KNOWN_BUILD_TYPES:
-    print 'Invalid build type %s' % options.build_type
-    print 'Valid values: %s' % ', '.join(KNOWN_BUILD_TYPES)
+    LOG.error("Invalid build type {0}".format(options.build_type))
+    LOG.error("Valid values: {0}".format(", ".join(KNOWN_BUILD_TYPES)))
     sys.exit(1)
 
   if options.cluster_size < 0:
-    print 'Please specify a cluster size >= 0'
+    LOG.error("Please specify a cluster size >= 0")
     sys.exit(1)
 
   if options.num_coordinators <= 0:
-    print 'Please specify a valid number of coordinators > 0'
+    LOG.error("Please specify a valid number of coordinators > 0")
     sys.exit(1)
 
-  if options.use_exclusive_coordinators and options.num_coordinators >= options.cluster_size:
-    print 'Cannot start an Impala cluster with no executors'
+  if (options.use_exclusive_coordinators and
+      options.num_coordinators >= options.cluster_size):
+    LOG.error("Cannot start an Impala cluster with no executors")
     sys.exit(1)
 
   if not os.path.isdir(options.log_dir):
-    print 'Log dir does not exist or is not a directory: %s' % options.log_dir
+    LOG.error("Log dir does not exist or is not a directory: {log_dir}".format(
+        log_dir=options.log_dir))
     sys.exit(1)
 
   # Kill existing cluster processes based on the current configuration.
   if options.restart_impalad_only:
     if options.inprocess:
-      print 'Cannot perform individual component restarts using an in-process cluster'
+      LOG.error(
+          "Cannot perform individual component restarts using an in-process cluster")
       sys.exit(1)
-    kill_matching_processes(['impalad'], force=options.force_kill)
+    kill_matching_processes(["impalad"], force=options.force_kill)
   else:
     kill_cluster_processes(force=options.force_kill)
 
@@ -420,7 +480,8 @@ if __name__ == "__main__":
     import json
     wait_for_cluster = wait_for_cluster_web
   except ImportError:
-    print "json module not found, checking for cluster startup through the command-line"
+    LOG.exception("json module not found, checking "
+        "for cluster startup through the command-line")
     wait_for_cluster = wait_for_cluster_cmdline
 
   # If ImpalaCluster cannot be imported, fall back to the command-line to check
@@ -430,10 +491,11 @@ if __name__ == "__main__":
     if options.restart_impalad_only:
       impala_cluster = ImpalaCluster()
       if not impala_cluster.statestored or not impala_cluster.catalogd:
-        print 'No running statestored or catalogd detected. Restarting entire cluster.'
+        LOG.info("No running statestored or catalogd detected. "
+            "Restarting entire cluster.")
         options.restart_impalad_only = False
   except ImportError:
-    print 'ImpalaCluster module not found.'
+    LOG.exception("ImpalaCluster module not found.")
     # TODO: Update this code path to work similar to the ImpalaCluster code path when
     # restarting only impalad processes. Specifically, we should do a full cluster
     # restart if either the statestored or catalogd processes are down, even if
@@ -452,14 +514,15 @@ if __name__ == "__main__":
     # Check for the cluster to be ready.
     wait_for_cluster()
   except Exception, e:
-    print 'Error starting cluster: %s' % e
+    LOG.exception("Error starting cluster")
     sys.exit(1)
 
   if options.use_exclusive_coordinators == True:
     executors = options.cluster_size - options.num_coordinators
   else:
     executors = options.cluster_size
-  print 'Impala Cluster Running with %d nodes (%d coordinators, %d executors).' % (
-      options.cluster_size,
-      min(options.cluster_size, options.num_coordinators),
-      executors)
+  LOG.info(("Impala Cluster Running with {num_nodes} nodes "
+      "({num_coordinators} coordinators, {num_executors} executors).").format(
+          num_nodes=options.cluster_size,
+          num_coordinators=min(options.cluster_size, options.num_coordinators),
+          num_executors=executors))

http://git-wip-us.apache.org/repos/asf/impala/blob/30d196fd/tests/common/impala_service.py
----------------------------------------------------------------------
diff --git a/tests/common/impala_service.py b/tests/common/impala_service.py
index 9772b13..0934f78 100644
--- a/tests/common/impala_service.py
+++ b/tests/common/impala_service.py
@@ -35,7 +35,8 @@ from RuntimeProfile.ttypes import TRuntimeProfileTree
 import base64
 import zlib
 
-logging.basicConfig(level=logging.ERROR, format='%(threadName)s: %(message)s')
+logging.basicConfig(level=logging.ERROR, format='%(asctime)s %(threadName)s: %(message)s',
+    datefmt='%H:%M:%S')
 LOG = logging.getLogger('impala_service')
 LOG.setLevel(level=logging.DEBUG)