You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by ta...@apache.org on 2017/11/23 04:19:26 UTC

[2/5] incubator-impala git commit: IMPALA-4985: use parquet stats of nested types for dynamic pruning

IMPALA-4985: use parquet stats of nested types for dynamic pruning

Currently, parquet row-groups can be pruned at run-time using
min/max stats when predicates (in, binary) are specified for
column scalar types. This patch extends pruning to nested types
for the same class of predicates. A nested value is an instance
of a nested type (struct, array, map). A nested value consists of
other nested and scalar values (as declared by its type).
Predicates that can be used for row-group pruning must be applied to
nested scalar values. In addition, the parent of the nested scalar
must also be required, that is, not empty. The latter requirement
is conservative: some filters that could be used for pruning are
not used for correctness reasons.

Testing:
- extended nested-types-parquet-stats e2e test cases.

Change-Id: I0c99e20cb080b504442cd5376ea3e046016158fe
Reviewed-on: http://gerrit.cloudera.org:8080/8480
Reviewed-by: Alex Behm <al...@cloudera.com>
Tested-by: Impala Public Jenkins


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/21a96ed2
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/21a96ed2
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/21a96ed2

Branch: refs/heads/master
Commit: 21a96ed2e39537db39cda14ebdacc83a8c4c89f3
Parents: 223707d
Author: Vuk Ercegovac <ve...@cloudera.com>
Authored: Wed Nov 1 10:35:43 2017 -0700
Committer: Impala Public Jenkins <im...@gerrit.cloudera.org>
Committed: Wed Nov 22 22:00:16 2017 +0000

----------------------------------------------------------------------
 be/src/exec/hdfs-parquet-scanner.h              |   2 +-
 .../impala/analysis/CollectionStructType.java   |   1 +
 .../org/apache/impala/analysis/SelectStmt.java  |   8 +
 .../org/apache/impala/analysis/SlotRef.java     |  20 ++
 .../org/apache/impala/planner/HdfsScanNode.java |  82 ++++--
 .../queries/PlannerTest/constant-folding.test   |   2 +-
 .../queries/PlannerTest/mt-dop-validation.test  |   6 +-
 .../queries/PlannerTest/parquet-filtering.test  | 275 ++++++++++++++++++
 .../QueryTest/nested-types-parquet-stats.test   | 288 ++++++++++++++++++-
 tests/query_test/test_nested_types.py           |   3 -
 10 files changed, 649 insertions(+), 38 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/21a96ed2/be/src/exec/hdfs-parquet-scanner.h
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-parquet-scanner.h b/be/src/exec/hdfs-parquet-scanner.h
index 0eea458..99b5a60 100644
--- a/be/src/exec/hdfs-parquet-scanner.h
+++ b/be/src/exec/hdfs-parquet-scanner.h
@@ -188,7 +188,7 @@ class BoolColumnReader;
 ///   0), but we don't distinguish between these two cases yet.
 ///   TODO: fix this (IMPALA-2272)
 ///
-///   The column readers that materialize this structure form a tree analagous to the
+///   The column readers that materialize this structure form a tree analogous to the
 ///   materialized output:
 ///     CollectionColumnReader slot_id=0 node="repeated group list (d=2 r=1)"
 ///       CollectionColumnReader slot_id=1 node="repeated group list (d=4 r=2)"

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/21a96ed2/fe/src/main/java/org/apache/impala/analysis/CollectionStructType.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/analysis/CollectionStructType.java b/fe/src/main/java/org/apache/impala/analysis/CollectionStructType.java
index a982774..d05a6b2 100644
--- a/fe/src/main/java/org/apache/impala/analysis/CollectionStructType.java
+++ b/fe/src/main/java/org/apache/impala/analysis/CollectionStructType.java
@@ -75,4 +75,5 @@ public class CollectionStructType extends StructType {
 
   public StructField getOptionalField() { return optionalField_; }
   public boolean isMapStruct() { return isMapStruct_; }
+  public boolean isArrayStruct() { return !isMapStruct_; }
 }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/21a96ed2/fe/src/main/java/org/apache/impala/analysis/SelectStmt.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/analysis/SelectStmt.java b/fe/src/main/java/org/apache/impala/analysis/SelectStmt.java
index 068e09a..2ba5105 100644
--- a/fe/src/main/java/org/apache/impala/analysis/SelectStmt.java
+++ b/fe/src/main/java/org/apache/impala/analysis/SelectStmt.java
@@ -291,6 +291,14 @@ public class SelectStmt extends QueryStmt {
    * - collection table ref represents the rhs of an inner/cross/semi join
    * - collection table ref's parent tuple is not outer joined
    *
+   * Example: table T has field A which is of type array<array<int>>.
+   * 1) ... T join T.A a join a.item a_nest ... : all nodes on the path T -> a -> a_nest
+   *                                              are required so are checked for !empty.
+   * 2) ... T left outer join T.A a join a.item a_nest ... : no !empty.
+   * 3) ... T join T.A a left outer join a.item a_nest ... : a checked for !empty.
+   * 4) ... T left outer join T.A a left outer join a.item a_nest ... : no !empty.
+   *
+   *
    * TODO: In some cases, it is possible to generate !empty() predicates for a correlated
    * table ref, but in general, that is not correct for non-trivial query blocks.
    * For example, if the block with the correlated ref has an aggregation then adding a

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/21a96ed2/fe/src/main/java/org/apache/impala/analysis/SlotRef.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/analysis/SlotRef.java b/fe/src/main/java/org/apache/impala/analysis/SlotRef.java
index f4b2144..b4505ba 100644
--- a/fe/src/main/java/org/apache/impala/analysis/SlotRef.java
+++ b/fe/src/main/java/org/apache/impala/analysis/SlotRef.java
@@ -150,6 +150,26 @@ public class SlotRef extends Expr {
     return "<slot " + Integer.toString(desc_.getId().asInt()) + ">";
   }
 
+  /**
+   * Checks if this slotRef refers to an array "pos" pseudo-column.
+   *
+   * Note: checking whether the column is null distinguishes between top-level columns
+   * and nested types. This check more specifically looks just for a reference to the
+   * "pos" field of an array type.
+   */
+  public boolean isArrayPosRef() {
+    TupleDescriptor parent = getDesc().getParent();
+    if (parent == null) return false;
+    Type parentType = parent.getType();
+    if (parentType instanceof CollectionStructType) {
+      if (((CollectionStructType)parentType).isArrayStruct() &&
+          getDesc().getLabel().equals(Path.ARRAY_POS_FIELD_NAME)) {
+        return true;
+      }
+    }
+    return false;
+  }
+
   @Override
   protected void toThrift(TExprNode msg) {
     msg.node_type = TExprNodeType.SLOT_REF;

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/21a96ed2/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java b/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java
index 44f58eb..1e0dd72 100644
--- a/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java
+++ b/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java
@@ -37,6 +37,7 @@ import org.apache.impala.analysis.FunctionCallExpr;
 import org.apache.impala.analysis.FunctionName;
 import org.apache.impala.analysis.FunctionParams;
 import org.apache.impala.analysis.InPredicate;
+import org.apache.impala.analysis.IsNotEmptyPredicate;
 import org.apache.impala.analysis.LiteralExpr;
 import org.apache.impala.analysis.NullLiteral;
 import org.apache.impala.analysis.SlotDescriptor;
@@ -173,6 +174,10 @@ public class HdfsScanNode extends ScanNode {
   private final Map<TupleDescriptor, List<Expr>> collectionConjuncts_ =
       Maps.newLinkedHashMap();
 
+  // TupleDescriptors of collection slots that have an IsNotEmptyPredicate. See
+  // SelectStmt#registerIsNotEmptyPredicates.
+  private final Set<TupleDescriptor> notEmptyCollections_ = Sets.newHashSet();
+
   // Map from SlotIds to indices in PlanNodes.conjuncts_ that are eligible for
   // dictionary filtering
   private Map<Integer, List<Integer>> dictionaryFilterConjuncts_ =
@@ -398,6 +403,7 @@ public class HdfsScanNode extends ScanNode {
    */
   private void assignCollectionConjuncts(Analyzer analyzer) {
     collectionConjuncts_.clear();
+    addNotEmptyCollections(conjuncts_);
     assignCollectionConjuncts(desc_, analyzer);
   }
 
@@ -426,13 +432,13 @@ public class HdfsScanNode extends ScanNode {
     // We only support slot refs on the left hand side of the predicate, a rewriting
     // rule makes sure that all compatible exprs are rewritten into this form. Only
     // implicit casts are supported.
-    SlotRef slot = binaryPred.getChild(0).unwrapSlotRef(true);
-    if (slot == null) return;
+    SlotRef slotRef = binaryPred.getChild(0).unwrapSlotRef(true);
+    if (slotRef == null) return;
 
     // This node is a table scan, so this must be a scanning slot.
-    Preconditions.checkState(slot.getDesc().isScanSlot());
-    // If the column is null, then this can be a 'pos' scanning slot of a nested type.
-    if (slot.getDesc().getColumn() == null) return;
+    Preconditions.checkState(slotRef.getDesc().isScanSlot());
+    // Skip the slot ref if it refers to an array's "pos" field.
+    if (slotRef.isArrayPosRef()) return;
 
     Expr constExpr = binaryPred.getChild(1);
     // Only constant exprs can be evaluated against parquet::Statistics. This includes
@@ -444,24 +450,23 @@ public class HdfsScanNode extends ScanNode {
     if (op == BinaryPredicate.Operator.LT || op == BinaryPredicate.Operator.LE ||
         op == BinaryPredicate.Operator.GE || op == BinaryPredicate.Operator.GT) {
       minMaxOriginalConjuncts_.add(binaryPred);
-      buildStatsPredicate(analyzer, slot, binaryPred, op);
+      buildStatsPredicate(analyzer, slotRef, binaryPred, op);
     } else if (op == BinaryPredicate.Operator.EQ) {
       minMaxOriginalConjuncts_.add(binaryPred);
       // TODO: this could be optimized for boolean columns.
-      buildStatsPredicate(analyzer, slot, binaryPred, BinaryPredicate.Operator.LE);
-      buildStatsPredicate(analyzer, slot, binaryPred, BinaryPredicate.Operator.GE);
+      buildStatsPredicate(analyzer, slotRef, binaryPred, BinaryPredicate.Operator.LE);
+      buildStatsPredicate(analyzer, slotRef, binaryPred, BinaryPredicate.Operator.GE);
     }
   }
 
   private void tryComputeInListMinMaxPredicate(Analyzer analyzer, InPredicate inPred) {
-    // Retrieve the left side of the IN predicate. It must be a simple slot to
-    // proceed.
-    SlotRef slot = inPred.getBoundSlot();
-    if (slot == null) return;
+    // Retrieve the left side of the IN predicate. It must be a simple slot to proceed.
+    SlotRef slotRef = inPred.getBoundSlot();
+    if (slotRef == null) return;
     // This node is a table scan, so this must be a scanning slot.
-    Preconditions.checkState(slot.getDesc().isScanSlot());
-    // If the column is null, then this can be a 'pos' scanning slot of a nested type.
-    if (slot.getDesc().getColumn() == null) return;
+    Preconditions.checkState(slotRef.getDesc().isScanSlot());
+    // Skip the slot ref if it refers to an array's "pos" field.
+    if (slotRef.isArrayPosRef()) return;
     if (inPred.isNotIn()) return;
 
     ArrayList<Expr> children = inPred.getChildren();
@@ -488,8 +493,30 @@ public class HdfsScanNode extends ScanNode {
         children.get(0).clone(), max.clone());
 
     minMaxOriginalConjuncts_.add(inPred);
-    buildStatsPredicate(analyzer, slot, minBound, minBound.getOp());
-    buildStatsPredicate(analyzer, slot, maxBound, maxBound.getOp());
+    buildStatsPredicate(analyzer, slotRef, minBound, minBound.getOp());
+    buildStatsPredicate(analyzer, slotRef, maxBound, maxBound.getOp());
+  }
+
+  private void tryComputeMinMaxPredicate(Analyzer analyzer, Expr pred) {
+    if (pred instanceof BinaryPredicate) {
+      tryComputeBinaryMinMaxPredicate(analyzer, (BinaryPredicate) pred);
+    } else if (pred instanceof InPredicate) {
+      tryComputeInListMinMaxPredicate(analyzer, (InPredicate) pred);
+    }
+  }
+
+  /**
+   * Populates notEmptyCollections_ based on IsNotEmptyPredicates in the given conjuncts.
+   */
+  private void addNotEmptyCollections(List<Expr> conjuncts) {
+    for (Expr expr : conjuncts) {
+      if (expr instanceof IsNotEmptyPredicate) {
+        SlotRef ref = (SlotRef)((IsNotEmptyPredicate)expr).getChild(0);
+        Preconditions.checkState(ref.getDesc().getType().isComplexType());
+        Preconditions.checkState(ref.getDesc().getItemTupleDesc() != null);
+        notEmptyCollections_.add(ref.getDesc().getItemTupleDesc());
+      }
+    }
   }
 
   /**
@@ -505,11 +532,17 @@ public class HdfsScanNode extends ScanNode {
     minMaxTuple_ = descTbl.createTupleDescriptor(tupleName);
     minMaxTuple_.setPath(desc_.getPath());
 
-    for (Expr pred: conjuncts_) {
-      if (pred instanceof BinaryPredicate) {
-        tryComputeBinaryMinMaxPredicate(analyzer, (BinaryPredicate) pred);
-      } else if (pred instanceof InPredicate) {
-        tryComputeInListMinMaxPredicate(analyzer, (InPredicate) pred);
+    // Adds predicates for scalar, top-level columns.
+    for (Expr pred: conjuncts_) tryComputeMinMaxPredicate(analyzer, pred);
+
+    // Adds predicates for collections.
+    for (Map.Entry<TupleDescriptor, List<Expr>> entry: collectionConjuncts_.entrySet()) {
+      // Adds only predicates for collections that are filtered by an IsNotEmptyPredicate.
+      // It is assumed that analysis adds these filters such that they are correct, but
+      // potentially conservative. See the tests for examples that could benefit from
+      // being more aggressive (yet still correct).
+      if (notEmptyCollections_.contains(entry.getKey())) {
+        for (Expr pred: entry.getValue()) tryComputeMinMaxPredicate(analyzer, pred);
       }
     }
     minMaxTuple_.computeMemLayout();
@@ -517,7 +550,7 @@ public class HdfsScanNode extends ScanNode {
 
   /**
    * Recursively collects and assigns conjuncts bound by tuples materialized in a
-   * collection-typed slot.
+   * collection-typed slot. As conjuncts are seen, collect non-empty nested collections.
    *
    * Limitation: Conjuncts that must first be migrated into inline views and that cannot
    * be captured by slot binding will not be assigned here, but in an UnnestNode.
@@ -525,7 +558,7 @@ public class HdfsScanNode extends ScanNode {
    * non-SlotRef exprs in the inline-view's select list. We only capture value transfers
    * between slots, and not between arbitrary exprs.
    *
-   * TODO for 2.3: The logic for gathering conjuncts and deciding which ones should be
+   * TODO: The logic for gathering conjuncts and deciding which ones should be
    * marked as assigned needs to be clarified and consolidated in one place. The code
    * below is rather different from the code for assigning the top-level conjuncts in
    * init() although the performed tasks is conceptually identical. Refactoring the
@@ -560,6 +593,7 @@ public class HdfsScanNode extends ScanNode {
       if (!collectionConjuncts.isEmpty()) {
         analyzer.materializeSlots(collectionConjuncts);
         collectionConjuncts_.put(itemTupleDesc, collectionConjuncts);
+        addNotEmptyCollections(collectionConjuncts);
       }
       // Recursively look for collection-typed slots in nested tuple descriptors.
       assignCollectionConjuncts(itemTupleDesc, analyzer);

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/21a96ed2/testdata/workloads/functional-planner/queries/PlannerTest/constant-folding.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/constant-folding.test b/testdata/workloads/functional-planner/queries/PlannerTest/constant-folding.test
index 1e156ba..1a8b6a7 100644
--- a/testdata/workloads/functional-planner/queries/PlannerTest/constant-folding.test
+++ b/testdata/workloads/functional-planner/queries/PlannerTest/constant-folding.test
@@ -53,7 +53,7 @@ PLAN-ROOT SINK
    stats-rows=150000 extrapolated-rows=disabled
    table stats: rows=150000 size=292.35MB
    columns missing stats: c_orders
-   parquet statistics predicates: c_custkey > 10
+   parquet statistics predicates: c_custkey > 10, o_orderkey = 4
    parquet dictionary predicates: c_custkey > 10
    mem-estimate=176.00MB mem-reservation=0B
    tuple-ids=0 row-size=24B cardinality=15000

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/21a96ed2/testdata/workloads/functional-planner/queries/PlannerTest/mt-dop-validation.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/mt-dop-validation.test b/testdata/workloads/functional-planner/queries/PlannerTest/mt-dop-validation.test
index dcebf07..506921a 100644
--- a/testdata/workloads/functional-planner/queries/PlannerTest/mt-dop-validation.test
+++ b/testdata/workloads/functional-planner/queries/PlannerTest/mt-dop-validation.test
@@ -242,7 +242,7 @@ PLAN-ROOT SINK
    stats-rows=150000 extrapolated-rows=disabled
    table stats: rows=150000 size=292.35MB
    columns missing stats: c_orders
-   parquet statistics predicates: c_custkey < 10
+   parquet statistics predicates: c_custkey < 10, o_orderkey < 5, l_linenumber < 3
    parquet dictionary predicates: c_custkey < 10
    mem-estimate=88.00MB mem-reservation=0B
    tuple-ids=0 row-size=254B cardinality=15000
@@ -302,7 +302,7 @@ Per-Host Resources: mem-estimate=264.00MB mem-reservation=0B
    stats-rows=150000 extrapolated-rows=disabled
    table stats: rows=150000 size=292.35MB
    columns missing stats: c_orders
-   parquet statistics predicates: c_custkey < 10
+   parquet statistics predicates: c_custkey < 10, o_orderkey < 5, l_linenumber < 3
    parquet dictionary predicates: c_custkey < 10
    mem-estimate=88.00MB mem-reservation=0B
    tuple-ids=0 row-size=254B cardinality=15000
@@ -353,6 +353,7 @@ PLAN-ROOT SINK
    stats-rows=150000 extrapolated-rows=disabled
    table stats: rows=150000 size=292.35MB
    columns missing stats: c_orders, c_orders
+   parquet statistics predicates: o1.o_orderkey < 5
    mem-estimate=88.00MB mem-reservation=0B
    tuple-ids=0 row-size=270B cardinality=150000
 ---- PARALLELPLANS
@@ -403,6 +404,7 @@ Per-Host Resources: mem-estimate=269.81MB mem-reservation=5.81MB
    stats-rows=150000 extrapolated-rows=disabled
    table stats: rows=150000 size=292.35MB
    columns missing stats: c_orders, c_orders
+   parquet statistics predicates: o1.o_orderkey < 5
    mem-estimate=88.00MB mem-reservation=0B
    tuple-ids=0 row-size=270B cardinality=150000
 ====

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/21a96ed2/testdata/workloads/functional-planner/queries/PlannerTest/parquet-filtering.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/parquet-filtering.test b/testdata/workloads/functional-planner/queries/PlannerTest/parquet-filtering.test
index 4165e70..bdb9102 100644
--- a/testdata/workloads/functional-planner/queries/PlannerTest/parquet-filtering.test
+++ b/testdata/workloads/functional-planner/queries/PlannerTest/parquet-filtering.test
@@ -88,3 +88,278 @@ PLAN-ROOT SINK
    mem-estimate=48.00MB mem-reservation=0B
    tuple-ids=0 row-size=24B cardinality=unavailable
 ====
+# Test collection types where all collections on the path are required (inner
+# join descent). Expect the scan node to include !empty checks for both collections and
+# the min-max filtering for the leaf predicate.
+select id from functional_parquet.complextypestbl c, c.nested_struct.c.d cn, cn.item a
+where a.item.e < -10;
+---- PLAN
+F00:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
+|  Per-Host Resources: mem-estimate=32.00MB mem-reservation=0B
+PLAN-ROOT SINK
+|  mem-estimate=0B mem-reservation=0B
+|
+01:SUBPLAN
+|  mem-estimate=0B mem-reservation=0B
+|  tuple-ids=2,1,0 row-size=44B cardinality=unavailable
+|
+|--08:NESTED LOOP JOIN [CROSS JOIN]
+|  |  mem-estimate=24B mem-reservation=0B
+|  |  tuple-ids=2,1,0 row-size=44B cardinality=100
+|  |
+|  |--02:SINGULAR ROW SRC
+|  |     parent-subplan=01
+|  |     mem-estimate=0B mem-reservation=0B
+|  |     tuple-ids=0 row-size=24B cardinality=1
+|  |
+|  04:SUBPLAN
+|  |  mem-estimate=0B mem-reservation=0B
+|  |  tuple-ids=2,1 row-size=20B cardinality=100
+|  |
+|  |--07:NESTED LOOP JOIN [CROSS JOIN]
+|  |  |  mem-estimate=16B mem-reservation=0B
+|  |  |  tuple-ids=2,1 row-size=20B cardinality=10
+|  |  |
+|  |  |--05:SINGULAR ROW SRC
+|  |  |     parent-subplan=04
+|  |  |     mem-estimate=0B mem-reservation=0B
+|  |  |     tuple-ids=1 row-size=16B cardinality=1
+|  |  |
+|  |  06:UNNEST [cn.item a]
+|  |     parent-subplan=04
+|  |     mem-estimate=0B mem-reservation=0B
+|  |     tuple-ids=2 row-size=0B cardinality=10
+|  |
+|  03:UNNEST [c.nested_struct.c.d cn]
+|     parent-subplan=01
+|     mem-estimate=0B mem-reservation=0B
+|     tuple-ids=1 row-size=0B cardinality=10
+|
+00:SCAN HDFS [functional_parquet.complextypestbl c]
+   partitions=1/1 files=2 size=6.92KB
+   predicates: !empty(c.nested_struct.c.d)
+   predicates on cn: !empty(cn.item)
+   predicates on a: a.item.e < -10
+   stats-rows=unavailable extrapolated-rows=disabled
+   table stats: rows=unavailable size=6.92KB
+   columns missing stats: id
+   parquet statistics predicates: a.item.e < -10
+   mem-estimate=32.00MB mem-reservation=0B
+   tuple-ids=0 row-size=24B cardinality=unavailable
+====
+# Test collection types where the lower collection in the path is optional
+# (outer join descent) and the upper is required (inner join descent).
+# Expect the scan node to include !empty test for the root, but no min-max
+# filter for the leaf (since it does not have a !empty check).
+select id from functional_parquet.complextypestbl c, c.nested_struct.c.d cn
+left outer join cn.item a
+where a.item.e < -10;
+---- PLAN
+F00:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
+|  Per-Host Resources: mem-estimate=32.00MB mem-reservation=0B
+PLAN-ROOT SINK
+|  mem-estimate=0B mem-reservation=0B
+|
+01:SUBPLAN
+|  mem-estimate=0B mem-reservation=0B
+|  tuple-ids=2N,1,0 row-size=44B cardinality=unavailable
+|
+|--08:SUBPLAN
+|  |  mem-estimate=0B mem-reservation=0B
+|  |  tuple-ids=2N,1,0 row-size=44B cardinality=10
+|  |
+|  |--06:NESTED LOOP JOIN [RIGHT OUTER JOIN]
+|  |  |  predicates: a.item.e < -10
+|  |  |  mem-estimate=40B mem-reservation=0B
+|  |  |  tuple-ids=2N,1,0 row-size=44B cardinality=1
+|  |  |
+|  |  |--04:SINGULAR ROW SRC
+|  |  |     parent-subplan=08
+|  |  |     mem-estimate=0B mem-reservation=0B
+|  |  |     tuple-ids=1,0 row-size=40B cardinality=1
+|  |  |
+|  |  05:UNNEST [cn.item a]
+|  |     parent-subplan=08
+|  |     mem-estimate=0B mem-reservation=0B
+|  |     tuple-ids=2 row-size=0B cardinality=10
+|  |
+|  07:NESTED LOOP JOIN [CROSS JOIN]
+|  |  mem-estimate=24B mem-reservation=0B
+|  |  tuple-ids=1,0 row-size=40B cardinality=10
+|  |
+|  |--02:SINGULAR ROW SRC
+|  |     parent-subplan=01
+|  |     mem-estimate=0B mem-reservation=0B
+|  |     tuple-ids=0 row-size=24B cardinality=1
+|  |
+|  03:UNNEST [c.nested_struct.c.d cn]
+|     parent-subplan=01
+|     mem-estimate=0B mem-reservation=0B
+|     tuple-ids=1 row-size=0B cardinality=10
+|
+00:SCAN HDFS [functional_parquet.complextypestbl c]
+   partitions=1/1 files=2 size=6.92KB
+   predicates: !empty(c.nested_struct.c.d)
+   predicates on a: a.item.e < -10
+   stats-rows=unavailable extrapolated-rows=disabled
+   table stats: rows=unavailable size=6.92KB
+   columns missing stats: id
+   mem-estimate=32.00MB mem-reservation=0B
+   tuple-ids=0 row-size=24B cardinality=unavailable
+====
+# Tests collection types where the outer is optional (outer join descent)
+# and the inner is required (inner join descent). In this case, !empty is
+# not pushed for either collection, so there is no min-max pruning either.
+select id from functional_parquet.complextypestbl c
+left outer join c.nested_struct.c.d cn, cn.item a where a.item.e < -10;
+---- PLAN
+F00:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
+|  Per-Host Resources: mem-estimate=32.00MB mem-reservation=0B
+PLAN-ROOT SINK
+|  mem-estimate=0B mem-reservation=0B
+|
+01:SUBPLAN
+|  mem-estimate=0B mem-reservation=0B
+|  tuple-ids=2,1N,0 row-size=44B cardinality=unavailable
+|
+|--08:SUBPLAN
+|  |  mem-estimate=0B mem-reservation=0B
+|  |  tuple-ids=2,1N,0 row-size=44B cardinality=10
+|  |
+|  |--06:NESTED LOOP JOIN [CROSS JOIN]
+|  |  |  mem-estimate=40B mem-reservation=0B
+|  |  |  tuple-ids=2,1N,0 row-size=44B cardinality=10
+|  |  |
+|  |  |--04:SINGULAR ROW SRC
+|  |  |     parent-subplan=08
+|  |  |     mem-estimate=0B mem-reservation=0B
+|  |  |     tuple-ids=1N,0 row-size=40B cardinality=1
+|  |  |
+|  |  05:UNNEST [cn.item a]
+|  |     parent-subplan=08
+|  |     mem-estimate=0B mem-reservation=0B
+|  |     tuple-ids=2 row-size=0B cardinality=10
+|  |
+|  07:NESTED LOOP JOIN [RIGHT OUTER JOIN]
+|  |  mem-estimate=24B mem-reservation=0B
+|  |  tuple-ids=1N,0 row-size=40B cardinality=1
+|  |
+|  |--02:SINGULAR ROW SRC
+|  |     parent-subplan=01
+|  |     mem-estimate=0B mem-reservation=0B
+|  |     tuple-ids=0 row-size=24B cardinality=1
+|  |
+|  03:UNNEST [c.nested_struct.c.d cn]
+|     parent-subplan=01
+|     mem-estimate=0B mem-reservation=0B
+|     tuple-ids=1 row-size=0B cardinality=10
+|
+00:SCAN HDFS [functional_parquet.complextypestbl c]
+   partitions=1/1 files=2 size=6.92KB
+   predicates on a: a.item.e < -10
+   stats-rows=unavailable extrapolated-rows=disabled
+   table stats: rows=unavailable size=6.92KB
+   columns missing stats: id
+   mem-estimate=32.00MB mem-reservation=0B
+   tuple-ids=0 row-size=24B cardinality=unavailable
+====
+# Test collections so that each level has a filter applied.
+select c_custkey from tpch_nested_parquet.customer c, c.c_orders o,
+o.o_lineitems l where c_custkey > 0 and o.o_orderkey > 0 and l.l_partkey > 0;
+---- PLAN
+F00:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
+|  Per-Host Resources: mem-estimate=176.00MB mem-reservation=0B
+PLAN-ROOT SINK
+|  mem-estimate=0B mem-reservation=0B
+|
+01:SUBPLAN
+|  mem-estimate=0B mem-reservation=0B
+|  tuple-ids=2,1,0 row-size=56B cardinality=1500000
+|
+|--08:NESTED LOOP JOIN [CROSS JOIN]
+|  |  mem-estimate=24B mem-reservation=0B
+|  |  tuple-ids=2,1,0 row-size=56B cardinality=100
+|  |
+|  |--02:SINGULAR ROW SRC
+|  |     parent-subplan=01
+|  |     mem-estimate=0B mem-reservation=0B
+|  |     tuple-ids=0 row-size=24B cardinality=1
+|  |
+|  04:SUBPLAN
+|  |  mem-estimate=0B mem-reservation=0B
+|  |  tuple-ids=2,1 row-size=32B cardinality=100
+|  |
+|  |--07:NESTED LOOP JOIN [CROSS JOIN]
+|  |  |  mem-estimate=24B mem-reservation=0B
+|  |  |  tuple-ids=2,1 row-size=32B cardinality=10
+|  |  |
+|  |  |--05:SINGULAR ROW SRC
+|  |  |     parent-subplan=04
+|  |  |     mem-estimate=0B mem-reservation=0B
+|  |  |     tuple-ids=1 row-size=24B cardinality=1
+|  |  |
+|  |  06:UNNEST [o.o_lineitems l]
+|  |     parent-subplan=04
+|  |     mem-estimate=0B mem-reservation=0B
+|  |     tuple-ids=2 row-size=0B cardinality=10
+|  |
+|  03:UNNEST [c.c_orders o]
+|     parent-subplan=01
+|     mem-estimate=0B mem-reservation=0B
+|     tuple-ids=1 row-size=0B cardinality=10
+|
+00:SCAN HDFS [tpch_nested_parquet.customer c]
+   partitions=1/1 files=4 size=292.36MB
+   predicates: c_custkey > 0, !empty(c.c_orders)
+   predicates on o: !empty(o.o_lineitems), o.o_orderkey > 0
+   predicates on l: l.l_partkey > 0
+   stats-rows=150000 extrapolated-rows=disabled
+   table stats: rows=150000 size=292.36MB
+   columns missing stats: c_orders
+   parquet statistics predicates: c_custkey > 0, o.o_orderkey > 0, l.l_partkey > 0
+   parquet dictionary predicates: c_custkey > 0
+   mem-estimate=176.00MB mem-reservation=0B
+   tuple-ids=0 row-size=24B cardinality=15000
+====
+# Test collections in a way that would incorrectly apply a min-max
+# filter at the scan. Expect no min-max filter and no !empty tests.
+select count(*) from functional_parquet.complextypestbl c left outer join
+(select * from c.int_array where item > 10) v;
+---- PLAN
+F00:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
+|  Per-Host Resources: mem-estimate=26.00MB mem-reservation=0B
+PLAN-ROOT SINK
+|  mem-estimate=0B mem-reservation=0B
+|
+05:AGGREGATE [FINALIZE]
+|  output: count(*)
+|  mem-estimate=10.00MB mem-reservation=0B spill-buffer=2.00MB
+|  tuple-ids=3 row-size=8B cardinality=1
+|
+01:SUBPLAN
+|  mem-estimate=0B mem-reservation=0B
+|  tuple-ids=1N,0 row-size=20B cardinality=unavailable
+|
+|--04:NESTED LOOP JOIN [RIGHT OUTER JOIN]
+|  |  mem-estimate=16B mem-reservation=0B
+|  |  tuple-ids=1N,0 row-size=20B cardinality=1
+|  |
+|  |--02:SINGULAR ROW SRC
+|  |     parent-subplan=01
+|  |     mem-estimate=0B mem-reservation=0B
+|  |     tuple-ids=0 row-size=16B cardinality=1
+|  |
+|  03:UNNEST [c.int_array]
+|     parent-subplan=01
+|     mem-estimate=0B mem-reservation=0B
+|     tuple-ids=1 row-size=4B cardinality=10
+|
+00:SCAN HDFS [functional_parquet.complextypestbl c]
+   partitions=1/1 files=2 size=6.92KB
+   predicates on int_array: item > 10
+   stats-rows=unavailable extrapolated-rows=disabled
+   table stats: rows=unavailable size=6.92KB
+   column stats: unavailable
+   mem-estimate=16.00MB mem-reservation=0B
+   tuple-ids=0 row-size=16B cardinality=unavailable
+====
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/21a96ed2/testdata/workloads/functional-query/queries/QueryTest/nested-types-parquet-stats.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-query/queries/QueryTest/nested-types-parquet-stats.test b/testdata/workloads/functional-query/queries/QueryTest/nested-types-parquet-stats.test
index c8ba303..32a9a28 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/nested-types-parquet-stats.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/nested-types-parquet-stats.test
@@ -5,8 +5,8 @@ select count(*) from functional_parquet.complextypestbl where id < 1
 ---- RESULTS
 0
 ---- RUNTIME_PROFILE
-row_regex: .*NumRowGroups: 2 .*
-row_regex: .*NumStatsFilteredRowGroups: 2 .*
+aggregation(SUM, NumRowGroups): 2
+aggregation(SUM, NumStatsFilteredRowGroups): 2
 ====
 ---- QUERY
 # Filter root-level scalar column in file with nested types.
@@ -15,16 +15,290 @@ from   functional_parquet.complextypestbl, complextypestbl.int_array
 where  id < 0;
 ---- RESULTS
 ---- RUNTIME_PROFILE
-row_regex: .*NumRowGroups: 2 .*
-row_regex: .*NumStatsFilteredRowGroups: 2 .*
+aggregation(SUM, NumRowGroups): 2
+aggregation(SUM, NumStatsFilteredRowGroups): 2
 ====
 ---- QUERY
-# Nested columns do not support stats based filtering.
+# Row-group skipping based on min-max stats for collection types.
+# Cases tested:
+# - collection required vs. not required (use outer joins)
+# - collection type: array, map, struct
+#
+# Array collection.
 select id, int_array.item
 from   functional_parquet.complextypestbl, functional_parquet.complextypestbl.int_array
 where  int_array.item < -1;
 ---- RESULTS
 ---- RUNTIME_PROFILE
-row_regex: .*NumRowGroups: 2 .*
-row_regex: .*NumStatsFilteredRowGroups: 0 .*
+aggregation(SUM, NumRowGroups): 2
+aggregation(SUM, NumStatsFilteredRowGroups): 2
+====
+---- QUERY
+# Map collection
+select id, int_map.key
+from   functional_parquet.complextypestbl, functional_parquet.complextypestbl.int_map
+where  int_map.value < -1;
+---- RESULTS
+---- RUNTIME_PROFILE
+aggregation(SUM, NumRowGroups): 2
+aggregation(SUM, NumStatsFilteredRowGroups): 2
+====
+---- QUERY
+# Multiple nesting, all filtered
+select id from functional_parquet.complextypestbl c, c.int_array_array cn, cn.item bottom
+where bottom.item < -2;
+---- RESULTS
+---- RUNTIME_PROFILE
+aggregation(SUM, NumRowGroups): 2
+aggregation(SUM, NumStatsFilteredRowGroups): 2
+====
+---- QUERY
+# Multiple nesting, some filtered. Expect some groups filtered, not all. < filter.
+select id from functional_parquet.complextypestbl c, c.int_array_array cn, cn.item bottom
+where bottom.item < -1;
+---- RESULTS
+8
+---- TYPES
+bigint
+---- RUNTIME_PROFILE
+aggregation(SUM, NumRowGroups): 2
+aggregation(SUM, NumStatsFilteredRowGroups): 1
+====
+---- QUERY
+# Multiple nesting, some filtered. Expect some groups filtered, not all. = filter.
+select id from functional_parquet.complextypestbl c, c.int_array_array cn, cn.item bottom
+where bottom.item = -2;
+---- RESULTS
+8
+---- TYPES
+bigint
+---- RUNTIME_PROFILE
+aggregation(SUM, NumRowGroups): 2
+aggregation(SUM, NumStatsFilteredRowGroups): 1
+====
+---- QUERY
+# Multiple nesting, some filtered. Expect some groups filtered, not all. IN filter.
+select id from functional_parquet.complextypestbl c, c.int_array_array cn, cn.item bottom
+where bottom.item in (5,6);
+---- RESULTS
+7
+7
+---- TYPES
+bigint
+---- RUNTIME_PROFILE
+aggregation(SUM, NumRowGroups): 2
+aggregation(SUM, NumStatsFilteredRowGroups): 1
+====
+---- QUERY
+# Multiple nesting, some filtered. Expect some groups filtered, not all. complex filter.
+select id from functional_parquet.complextypestbl c, c.int_array_array cn, cn.item bottom
+where bottom.item > -2 and bottom.item in (-2, -1);
+---- RESULTS
+8
+---- TYPES
+bigint
+---- RUNTIME_PROFILE
+aggregation(SUM, NumRowGroups): 2
+aggregation(SUM, NumStatsFilteredRowGroups): 1
+====
+---- QUERY
+# Multiple nesting, some filtered. Expect some groups filtered, not all. complex filter.
+select distinct id from functional_parquet.complextypestbl c, c.int_array_array cn, cn.item bottom
+where bottom.item > 2 and bottom.item not in (4,5,6);
+---- RESULTS
+1
+2
+---- TYPES
+bigint
+---- RUNTIME_PROFILE
+aggregation(SUM, NumRowGroups): 2
+aggregation(SUM, NumStatsFilteredRowGroups): 1
+====
+---- QUERY
+# Multiple nesting, mixed collection type
+select id from functional_parquet.complextypestbl c, c.int_map_array cn, cn.item m
+where m.value < -2;
+---- RESULTS
+---- RUNTIME_PROFILE
+aggregation(SUM, NumRowGroups): 2
+aggregation(SUM, NumStatsFilteredRowGroups): 2
+====
+---- QUERY
+# Scalar in struct
+select id from functional_parquet.complextypestbl c where c.nested_struct.a < -10;
+---- RESULTS
+---- RUNTIME_PROFILE
+aggregation(SUM, NumRowGroups): 2
+aggregation(SUM, NumStatsFilteredRowGroups): 2
+====
+---- QUERY
+# Collection in struct
+select id from functional_parquet.complextypestbl c, c.nested_struct.b a
+where a.item < -1;
+---- RESULTS
+---- RUNTIME_PROFILE
+aggregation(SUM, NumRowGroups): 2
+aggregation(SUM, NumStatsFilteredRowGroups): 2
+====
+---- QUERY
+# Collection in struct
+select id from functional_parquet.complextypestbl c, c.nested_struct.b a
+where -1 > a.item;
+---- RESULTS
+---- RUNTIME_PROFILE
+aggregation(SUM, NumRowGroups): 2
+aggregation(SUM, NumStatsFilteredRowGroups): 2
+====
+---- QUERY
+# Deeply nested collection, all required.
+select id from functional_parquet.complextypestbl c, c.nested_struct.c.d cn, cn.item a
+where a.item.e < -10;
+---- RESULTS
+---- RUNTIME_PROFILE
+aggregation(SUM, NumRowGroups): 2
+aggregation(SUM, NumStatsFilteredRowGroups): 2
+====
+---- QUERY
+# Deeply nested collection, top required, bottom not required.
+# TODO: with more aggressive pruning, this should skip groups as well.
+#       see the false pruning example below for a case that needs the
+#       more restrictive !empty guard. See org.apache.impala.planner.HdfsScanNode
+#       for more details on when/why !empty guards are inserted during analysis.
+select id from functional_parquet.complextypestbl c, c.nested_struct.c.d cn
+left outer join cn.item a
+where a.item.e < -10;
+---- RESULTS
+---- RUNTIME_PROFILE
+aggregation(SUM, NumRowGroups): 2
+aggregation(SUM, NumStatsFilteredRowGroups): 0
+====
+---- QUERY
+# Deeply nested collection, top not required, bottom required (no !empty guard)
+select id from functional_parquet.complextypestbl c
+left outer join c.nested_struct.c.d cn, cn.item a where a.item.e < -10;
+---- RESULTS
+---- RUNTIME_PROFILE
+aggregation(SUM, NumRowGroups): 2
+aggregation(SUM, NumStatsFilteredRowGroups): 0
+====
+---- QUERY
+# Multiple collections, all required.
+select id from functional_parquet.complextypestbl c, c.nested_struct.c.d a, a.item aa,
+c.nested_struct.g.value.h.i b where aa.e < -10;
+---- RESULTS
+---- RUNTIME_PROFILE
+aggregation(SUM, NumRowGroups): 2
+aggregation(SUM, NumStatsFilteredRowGroups): 2
+====
+---- QUERY
+# Multiple collections, all required just on filtering path.
+select id from functional_parquet.complextypestbl c, c.nested_struct.c.d a,
+a.item aa left outer join c.nested_struct.g.value.h.i b where aa.e < -10;
+---- RESULTS
+---- RUNTIME_PROFILE
+aggregation(SUM, NumRowGroups): 2
+aggregation(SUM, NumStatsFilteredRowGroups): 2
+====
+---- QUERY
+# Multiple collections, not required on filtering path.
+select id from functional_parquet.complextypestbl c, c.nested_struct.c.d a
+left outer join a.item aa, c.nested_struct.g.value.h.i b where aa.e < -10;
+---- RESULTS
+---- RUNTIME_PROFILE
+aggregation(SUM, NumRowGroups): 2
+aggregation(SUM, NumStatsFilteredRowGroups): 0
+====
+---- QUERY
+# False pruning example. There is one table that's scanned (complextypestbl).
+# As a result, there is one scan. The predicate on the complex type is pushed
+# down to the scan, but the field is not required. If we erronenously prune,
+# nothing will be returned instead of the expected left-outer-join result.
+select count(*) from functional_parquet.complextypestbl c left outer join
+(select * from c.int_array where item > 10) v;
+---- RESULTS
+8
+---- TYPES
+BIGINT
+---- RUNTIME_PROFILE
+aggregation(SUM, NumRowGroups): 2
+aggregation(SUM, NumStatsFilteredRowGroups): 0
+====
+---- QUERY
+# Similar, but filter is not selective, so the outer nested type is repeated for
+# several tuples. Again, no pruning is expected.
+select count(*) from functional_parquet.complextypestbl c left outer join
+(select * from c.int_array where item > -10) v;
+---- RESULTS
+12
+---- TYPES
+BIGINT
+---- RUNTIME_PROFILE
+aggregation(SUM, NumRowGroups): 2
+aggregation(SUM, NumStatsFilteredRowGroups): 0
+====
+---- QUERY
+#
+# Min-max collection filtering for tpch data.
+#
+# Array, single level, struct type:
+select c.c_custkey from tpch_nested_parquet.customer c, c.c_orders o
+where o.o_orderkey < 0;
+---- RESULTS
+---- RUNTIME_PROFILE
+aggregation(SUM, NumRowGroups): 4
+aggregation(SUM, NumStatsFilteredRowGroups): 4
+====
+---- QUERY
+# Array, multi-level, all required.
+select c.c_custkey from tpch_nested_parquet.customer c, c.c_orders o, o.o_lineitems l
+where l.l_partkey < 0;
+---- RESULTS
+---- RUNTIME_PROFILE
+aggregation(SUM, NumRowGroups): 4
+aggregation(SUM, NumStatsFilteredRowGroups): 4
+====
+---- QUERY
+# Array, single level, optional
+select c.c_custkey from tpch_nested_parquet.customer c left outer join c.c_orders o
+where o.o_orderkey < 0;
+---- RESULTS
+---- RUNTIME_PROFILE
+aggregation(SUM, NumRowGroups): 4
+aggregation(SUM, NumStatsFilteredRowGroups): 0
+====
+---- QUERY
+# Array, multi-level, all optional
+select c.c_custkey from tpch_nested_parquet.customer c left outer join c.c_orders o
+left outer join o.o_lineitems l where l.l_partkey < 0;
+---- RESULTS
+---- RUNTIME_PROFILE
+aggregation(SUM, NumRowGroups): 4
+aggregation(SUM, NumStatsFilteredRowGroups): 0
+====
+---- QUERY
+# Array, multi-level, optional top
+select c.c_custkey from tpch_nested_parquet.customer c left outer join c.c_orders o,
+o.o_lineitems l where l.l_partkey < 0;
+---- RESULTS
+---- RUNTIME_PROFILE
+aggregation(SUM, NumRowGroups): 4
+aggregation(SUM, NumStatsFilteredRowGroups): 0
+====
+---- QUERY
+# Array, multi-level, optional bottom, filter bottom
+select c.c_custkey from tpch_nested_parquet.customer c, c.c_orders o left outer join
+o.o_lineitems l where l.l_partkey < 0;
+---- RESULTS
+---- RUNTIME_PROFILE
+aggregation(SUM, NumRowGroups): 4
+aggregation(SUM, NumStatsFilteredRowGroups): 0
+====
+---- QUERY
+# Array, multi-level, optional bottom, filter top
+select c.c_custkey from tpch_nested_parquet.customer c, c.c_orders o left outer join
+o.o_lineitems l where o.o_orderkey < 0;
+---- RESULTS
+---- RUNTIME_PROFILE
+aggregation(SUM, NumRowGroups): 4
+aggregation(SUM, NumStatsFilteredRowGroups): 4
 ====

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/21a96ed2/tests/query_test/test_nested_types.py
----------------------------------------------------------------------
diff --git a/tests/query_test/test_nested_types.py b/tests/query_test/test_nested_types.py
index 20def2c..68021d5 100644
--- a/tests/query_test/test_nested_types.py
+++ b/tests/query_test/test_nested_types.py
@@ -80,9 +80,6 @@ class TestNestedTypes(ImpalaTestSuite):
 
   def test_parquet_stats(self, vector):
     """Queries that test evaluation of Parquet row group statistics."""
-    # The test makes assumptions about the number of row groups that are processed and
-    # skipped inside a fragment, so we ensure that the tests run in a single fragment.
-    vector.get_value('exec_option')['num_nodes'] = 1
     self.run_test_case('QueryTest/nested-types-parquet-stats', vector)
 
   @SkipIfIsilon.hive