You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by ab...@apache.org on 2017/05/02 01:12:48 UTC

[1/2] incubator-impala git commit: Bump Kudu version to 238249c

Repository: incubator-impala
Updated Branches:
  refs/heads/master ac2217b69 -> 77304530f


Bump Kudu version to 238249c

This will pull in the Kudu client partitioner API, which is needed for
IMPALA-3742.

Change-Id: I92587a8061ce70ecd9dac4889bda550636982767
Reviewed-on: http://gerrit.cloudera.org:8080/6718
Reviewed-by: Matthew Jacobs <mj...@cloudera.com>
Tested-by: Impala Public Jenkins


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/a2a0825b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/a2a0825b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/a2a0825b

Branch: refs/heads/master
Commit: a2a0825b2fb54b1b23e0ed43512230f447fd9038
Parents: ac2217b
Author: Thomas Tauber-Marshall <tm...@cloudera.com>
Authored: Fri Apr 21 08:14:11 2017 -0700
Committer: Impala Public Jenkins <im...@gerrit.cloudera.org>
Committed: Mon May 1 20:04:55 2017 +0000

----------------------------------------------------------------------
 bin/impala-config.sh | 5 ++---
 1 file changed, 2 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a2a0825b/bin/impala-config.sh
----------------------------------------------------------------------
diff --git a/bin/impala-config.sh b/bin/impala-config.sh
index 91d9106..53002fe 100755
--- a/bin/impala-config.sh
+++ b/bin/impala-config.sh
@@ -72,8 +72,7 @@ fi
 # moving to a different build of the toolchain, e.g. when a version is bumped or a
 # compile option is changed. The build id can be found in the output of the toolchain
 # build jobs, it is constructed from the build number and toolchain git hash prefix.
-export IMPALA_TOOLCHAIN_BUILD_ID=374-72f1e9bb85
-
+export IMPALA_TOOLCHAIN_BUILD_ID=380-e31515725e
 # Versions of toolchain dependencies.
 # -----------------------------------
 export IMPALA_AVRO_VERSION=1.7.4-p4
@@ -121,7 +120,7 @@ if [[ $OSTYPE == "darwin"* ]]; then
 fi
 
 # Kudu version in the toolchain; provides libkudu_client.so and minicluster binaries.
-export IMPALA_KUDU_VERSION=16dd6e4
+export IMPALA_KUDU_VERSION=238249c
 
 # Kudu version used to identify Java client jar from maven
 export KUDU_JAVA_VERSION=1.4.0-cdh5.12.0-SNAPSHOT


[2/2] incubator-impala git commit: IMPALA-5003: Constant propagation in scan conjuncts

Posted by ab...@apache.org.
IMPALA-5003: Constant propagation in scan conjuncts

Implements constant propagation within conjuncts and applies the
optimization to scan conjuncts and collection conjuncts within Hdfs
scan nodes.  The optimization is applied during planning.  At scan
nodes in particular, we want to optimize to enable partition pruning.
In certain cases, the conjuncts may reduce to a FALSE predicate, in
which case the scan is now replaced by an EmptySetNode.

Testing: Expanded the planner test cases to cover constant
propagation.  Added Kudu, data source, Hdfs and HBase tests to validate
that EmptySetNodes are created.

Change-Id: I79750a8edb945effee2a519fa3b8192b77042cb4
Reviewed-on: http://gerrit.cloudera.org:8080/6389
Tested-by: Impala Public Jenkins
Reviewed-by: Alex Behm <al...@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/77304530
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/77304530
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/77304530

Branch: refs/heads/master
Commit: 77304530f1e4146e045bae9c7037c7ec8e29e9af
Parents: a2a0825
Author: Zach Amsden <za...@cloudera.com>
Authored: Tue Mar 14 17:01:45 2017 +0000
Committer: Alex Behm <al...@cloudera.com>
Committed: Tue May 2 01:12:14 2017 +0000

----------------------------------------------------------------------
 .../apache/impala/analysis/AnalysisContext.java |  36 +-
 .../org/apache/impala/analysis/Analyzer.java    |  29 +-
 .../java/org/apache/impala/analysis/Expr.java   | 109 ++++++
 .../org/apache/impala/analysis/SelectList.java  |   2 +-
 .../impala/planner/DataSourceScanNode.java      |   5 +-
 .../org/apache/impala/planner/HdfsScanNode.java |   5 +
 .../org/apache/impala/planner/KuduScanNode.java |  13 +-
 .../impala/planner/SingleNodePlanner.java       |  49 +--
 .../org/apache/impala/planner/PlannerTest.java  |   5 +
 .../queries/PlannerTest/analytic-fns.test       |   2 +-
 .../queries/PlannerTest/conjunct-ordering.test  |   9 +-
 .../PlannerTest/constant-propagation.test       | 361 +++++++++++++++++++
 .../queries/PlannerTest/data-source-tables.test |  13 +
 .../queries/PlannerTest/hdfs.test               |   9 +-
 .../queries/PlannerTest/joins.test              |  37 +-
 .../queries/PlannerTest/kudu.test               |  24 +-
 .../PlannerTest/predicate-propagation.test      |   8 +-
 .../queries/PlannerTest/subquery-rewrite.test   |   2 +-
 .../queries/QueryTest/data-source-tables.test   |  11 +-
 19 files changed, 636 insertions(+), 93 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/77304530/fe/src/main/java/org/apache/impala/analysis/AnalysisContext.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/analysis/AnalysisContext.java b/fe/src/main/java/org/apache/impala/analysis/AnalysisContext.java
index 6b2281f..c6ee814 100644
--- a/fe/src/main/java/org/apache/impala/analysis/AnalysisContext.java
+++ b/fe/src/main/java/org/apache/impala/analysis/AnalysisContext.java
@@ -35,14 +35,7 @@ import org.apache.impala.catalog.Type;
 import org.apache.impala.common.AnalysisException;
 import org.apache.impala.common.InternalException;
 import org.apache.impala.common.Pair;
-import org.apache.impala.rewrite.BetweenToCompoundRule;
-import org.apache.impala.rewrite.ExprRewriteRule;
 import org.apache.impala.rewrite.ExprRewriter;
-import org.apache.impala.rewrite.ExtractCommonConjunctRule;
-import org.apache.impala.rewrite.FoldConstantsRule;
-import org.apache.impala.rewrite.NormalizeBinaryPredicatesRule;
-import org.apache.impala.rewrite.NormalizeExprsRule;
-import org.apache.impala.rewrite.SimplifyConditionalsRule;
 import org.apache.impala.thrift.TAccessEvent;
 import org.apache.impala.thrift.TLineageGraph;
 import org.apache.impala.thrift.TQueryCtx;
@@ -62,7 +55,7 @@ public class AnalysisContext {
   private final ImpaladCatalog catalog_;
   private final TQueryCtx queryCtx_;
   private final AuthorizationConfig authzConfig_;
-  private final ExprRewriter rewriter_;
+  private final ExprRewriter customRewriter_;
 
   // Timeline of important events in the planning process, used for debugging
   // and profiling
@@ -76,22 +69,7 @@ public class AnalysisContext {
     catalog_ = catalog;
     queryCtx_ = queryCtx;
     authzConfig_ = authzConfig;
-    List<ExprRewriteRule> rules = Lists.newArrayList();
-    // BetweenPredicates must be rewritten to be executable. Other non-essential
-    // expr rewrites can be disabled via a query option. When rewrites are enabled
-    // BetweenPredicates should be rewritten first to help trigger other rules.
-    rules.add(BetweenToCompoundRule.INSTANCE);
-    // Binary predicates must be rewritten to a canonical form for both Kudu predicate
-    // pushdown and Parquet row group pruning based on min/max statistics.
-    rules.add(NormalizeBinaryPredicatesRule.INSTANCE);
-    if (queryCtx.getClient_request().getQuery_options().enable_expr_rewrites) {
-      rules.add(FoldConstantsRule.INSTANCE);
-      rules.add(NormalizeExprsRule.INSTANCE);
-      rules.add(ExtractCommonConjunctRule.INSTANCE);
-      // Relies on FoldConstantsRule and NormalizeExprsRule.
-      rules.add(SimplifyConditionalsRule.INSTANCE);
-    }
-    rewriter_ = new ExprRewriter(rules);
+    customRewriter_ = null;
   }
 
   /**
@@ -102,7 +80,7 @@ public class AnalysisContext {
     catalog_ = catalog;
     queryCtx_ = queryCtx;
     authzConfig_ = authzConfig;
-    rewriter_ = rewriter;
+    customRewriter_ = rewriter;
   }
 
   static public class AnalysisResult {
@@ -401,10 +379,12 @@ public class AnalysisContext {
 
       // Apply expr and subquery rewrites.
       boolean reAnalyze = false;
+      ExprRewriter rewriter = (customRewriter_ != null) ? customRewriter_ :
+          analyzer.getExprRewriter();
       if (analysisResult_.requiresExprRewrite()) {
-        rewriter_.reset();
-        analysisResult_.stmt_.rewriteExprs(rewriter_);
-        reAnalyze = rewriter_.changed();
+        rewriter.reset();
+        analysisResult_.stmt_.rewriteExprs(rewriter);
+        reAnalyze = rewriter.changed();
       }
       if (analysisResult_.requiresSubqueryRewrite()) {
         StmtRewriter.rewrite(analysisResult_);

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/77304530/fe/src/main/java/org/apache/impala/analysis/Analyzer.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/analysis/Analyzer.java b/fe/src/main/java/org/apache/impala/analysis/Analyzer.java
index ef008b7..9360941 100644
--- a/fe/src/main/java/org/apache/impala/analysis/Analyzer.java
+++ b/fe/src/main/java/org/apache/impala/analysis/Analyzer.java
@@ -60,7 +60,12 @@ import org.apache.impala.common.RuntimeEnv;
 import org.apache.impala.planner.PlanNode;
 import org.apache.impala.rewrite.BetweenToCompoundRule;
 import org.apache.impala.rewrite.ExprRewriter;
+import org.apache.impala.rewrite.ExprRewriteRule;
+import org.apache.impala.rewrite.ExtractCommonConjunctRule;
 import org.apache.impala.rewrite.FoldConstantsRule;
+import org.apache.impala.rewrite.NormalizeBinaryPredicatesRule;
+import org.apache.impala.rewrite.NormalizeExprsRule;
+import org.apache.impala.rewrite.SimplifyConditionalsRule;
 import org.apache.impala.service.FeSupport;
 import org.apache.impala.thrift.TAccessEvent;
 import org.apache.impala.thrift.TCatalogObjectType;
@@ -298,16 +303,35 @@ public class Analyzer {
     // TODO: Investigate what to do with other catalog objects.
     private final HashMap<TableName, Table> referencedTables_ = Maps.newHashMap();
 
-    // Expr rewriter for foldinc constants.
+    // Expr rewriter for folding constants.
     private final ExprRewriter constantFolder_ =
         new ExprRewriter(FoldConstantsRule.INSTANCE);
 
+    // Expr rewriter for normalizing and rewriting expressions.
+    private final ExprRewriter exprRewriter_;
+
     public GlobalState(ImpaladCatalog catalog, TQueryCtx queryCtx,
         AuthorizationConfig authzConfig) {
       this.catalog = catalog;
       this.queryCtx = queryCtx;
       this.authzConfig = authzConfig;
       this.lineageGraph = new ColumnLineageGraph();
+      List<ExprRewriteRule> rules = Lists.newArrayList();
+      // BetweenPredicates must be rewritten to be executable. Other non-essential
+      // expr rewrites can be disabled via a query option. When rewrites are enabled
+      // BetweenPredicates should be rewritten first to help trigger other rules.
+      rules.add(BetweenToCompoundRule.INSTANCE);
+      // Binary predicates must be rewritten to a canonical form for both Kudu predicate
+      // pushdown and Parquet row group pruning based on min/max statistics.
+      rules.add(NormalizeBinaryPredicatesRule.INSTANCE);
+      if (queryCtx.getClient_request().getQuery_options().enable_expr_rewrites) {
+        rules.add(FoldConstantsRule.INSTANCE);
+        rules.add(NormalizeExprsRule.INSTANCE);
+        rules.add(ExtractCommonConjunctRule.INSTANCE);
+        // Relies on FoldConstantsRule and NormalizeExprsRule.
+        rules.add(SimplifyConditionalsRule.INSTANCE);
+      }
+      exprRewriter_ = new ExprRewriter(rules);
     }
   };
 
@@ -657,6 +681,7 @@ public class Analyzer {
 
   public TableRef getTableRef(TupleId tid) { return tableRefMap_.get(tid); }
   public ExprRewriter getConstantFolder() { return globalState_.constantFolder_; }
+  public ExprRewriter getExprRewriter() { return globalState_.exprRewriter_; }
 
   /**
    * Given a "table alias"."column alias", return the SlotDescriptor
@@ -1051,7 +1076,7 @@ public class Analyzer {
           conjunct = rewriter.rewrite(conjunct, this);
           // analyze this conjunct here: we know it can't contain references to select list
           // aliases and having it analyzed is needed for the following EvalPredicate() call
-          conjunct.analyze(this);;
+          conjunct.analyze(this);
         }
         if (!FeSupport.EvalPredicate(conjunct, globalState_.queryCtx)) {
           if (fromHavingClause) {

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/77304530/fe/src/main/java/org/apache/impala/analysis/Expr.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/analysis/Expr.java b/fe/src/main/java/org/apache/impala/analysis/Expr.java
index e28ab48..6d6442a 100644
--- a/fe/src/main/java/org/apache/impala/analysis/Expr.java
+++ b/fe/src/main/java/org/apache/impala/analysis/Expr.java
@@ -19,11 +19,13 @@ package org.apache.impala.analysis;
 
 import java.lang.reflect.Method;
 import java.util.ArrayList;
+import java.util.BitSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.ListIterator;
 import java.util.Set;
 
+import org.apache.impala.analysis.BoolLiteral;
 import org.apache.impala.catalog.Catalog;
 import org.apache.impala.catalog.Function;
 import org.apache.impala.catalog.Function.CompareMode;
@@ -32,6 +34,7 @@ import org.apache.impala.catalog.ScalarType;
 import org.apache.impala.catalog.Type;
 import org.apache.impala.common.AnalysisException;
 import org.apache.impala.common.TreeNode;
+import org.apache.impala.rewrite.ExprRewriter;
 import org.apache.impala.thrift.TExpr;
 import org.apache.impala.thrift.TExprNode;
 import org.slf4j.Logger;
@@ -49,6 +52,8 @@ import com.google.common.collect.Sets;
  *
  */
 abstract public class Expr extends TreeNode<Expr> implements ParseNode, Cloneable {
+  private final static Logger LOG = LoggerFactory.getLogger(Expr.class);
+
   // Limits on the number of expr children and the depth of an expr tree. These maximum
   // values guard against crashes due to stack overflows (IMPALA-432) and were
   // experimentally determined to be safe.
@@ -151,6 +156,14 @@ abstract public class Expr extends TreeNode<Expr> implements ParseNode, Cloneabl
         }
       };
 
+  public final static com.google.common.base.Predicate<Expr> IS_FALSE_LITERAL =
+      new com.google.common.base.Predicate<Expr>() {
+        @Override
+        public boolean apply(Expr arg) {
+          return arg instanceof BoolLiteral && !((BoolLiteral)arg).getValue();
+        }
+      };
+
   public final static com.google.common.base.Predicate<Expr> IS_EQ_BINARY_PREDICATE =
       new com.google.common.base.Predicate<Expr>() {
         @Override
@@ -915,6 +928,102 @@ abstract public class Expr extends TreeNode<Expr> implements ParseNode, Cloneabl
     }
   }
 
+  // Arbitrary max exprs considered for constant propagation due to O(n^2) complexity.
+  private final static int CONST_PROPAGATION_EXPR_LIMIT = 200;
+
+  /**
+   * Propagates constant expressions of the form <slot ref> = <constant> to other
+   * uses of that slot ref in 'conjuncts', modifying the list in place. Only one
+   * round of substitution is performed. Only conjuncts whose index is set in
+   * 'candidates' act as substitution sources; every other conjunct is a target.
+   * Returns a BitSet with a bit set for each index whose conjunct was replaced.
+   */
+  private static BitSet propagateConstants(List<Expr> conjuncts, BitSet candidates,
+      Analyzer analyzer) {
+    Preconditions.checkState(conjuncts.size() <= candidates.size());
+    BitSet changed = new BitSet(conjuncts.size());
+    for (int i = candidates.nextSetBit(0); i >= 0; i = candidates.nextSetBit(i+1)) {
+      if (!(conjuncts.get(i) instanceof BinaryPredicate)) continue;
+      BinaryPredicate bp = (BinaryPredicate) conjuncts.get(i);
+      if (bp.getOp() != BinaryPredicate.Operator.EQ) continue;
+      SlotRef slotRef = bp.getBoundSlot();
+      if (slotRef == null || !bp.getChild(1).isConstant()) continue;
+      Expr subst = bp.getSlotBinding(slotRef.getSlotId());
+      ExprSubstitutionMap smap = new ExprSubstitutionMap();
+      smap.put(slotRef, subst);
+      for (int j = 0; j < conjuncts.size(); ++j) {
+        // Skip the source conjunct itself: don't rewrite with our own substitution!
+        if (j == i) continue;
+        Expr toRewrite = conjuncts.get(j);
+        Expr rewritten = toRewrite.substitute(smap, analyzer, true);
+        if (!rewritten.equals(toRewrite)) {
+          conjuncts.set(j, rewritten);
+          changed.set(j, true);
+        }
+      }
+    }
+    return changed;
+  }
+
+  /**
+   * Propagates constants, performs expr rewriting and removes duplicates, modifying
+   * 'conjuncts' in place. Returns false if a conjunct was reduced to a FALSE or NULL
+   * literal, in which case 'conjuncts' holds only that literal; true otherwise.
+   * An AnalysisException thrown during rewrite is logged and otherwise ignored,
+   * leaving 'conjuncts' rewritten as far as possible up to the exception.
+   */
+  public static boolean optimizeConjuncts(List<Expr> conjuncts, Analyzer analyzer) {
+    Preconditions.checkNotNull(conjuncts);
+    try {
+      BitSet candidates = new BitSet(conjuncts.size());
+      candidates.set(0, Math.min(conjuncts.size(), CONST_PROPAGATION_EXPR_LIMIT));
+      int transfers = 0;
+
+      // Constant propagation may make other slots constant, so repeat the process
+      // until there are no more changes.
+      while (!candidates.isEmpty()) {
+        BitSet changed = propagateConstants(conjuncts, candidates, analyzer);
+        candidates.clear();
+        int pruned = 0;
+        for (int i = changed.nextSetBit(0); i >= 0; i = changed.nextSetBit(i+1)) {
+          // When propagating constants, we may de-normalize expressions, so we
+          // must normalize binary predicates.  Any additional rules will be
+          // applied by the rewriter.
+          int index = i - pruned;  // shift for conjuncts removed in this pass
+          Preconditions.checkState(index >= 0);
+          ExprRewriter rewriter = analyzer.getExprRewriter();
+          Expr rewritten = rewriter.rewrite(conjuncts.get(index), analyzer);
+          // Re-analyze to add implicit casts and update cost
+          rewritten.reset();
+          rewritten.analyze(analyzer);
+          if (!rewritten.isConstant()) {
+            conjuncts.set(index, rewritten);
+            if (++transfers < CONST_PROPAGATION_EXPR_LIMIT) candidates.set(index, true);
+            continue;
+          }
+          // Remove constant boolean literal expressions.  N.B. - we may have
+          // expressions determined to be constant which can not yet be discarded
+          // because they can't be evaluated if expr rewriting is turned off.
+          if (rewritten instanceof NullLiteral ||
+              Expr.IS_FALSE_LITERAL.apply(rewritten)) {
+            conjuncts.clear();
+            conjuncts.add(rewritten);
+            return false;
+          }
+          if (Expr.IS_TRUE_LITERAL.apply(rewritten)) {
+            pruned++;
+            conjuncts.remove(index);
+          }
+        }
+      }
+    } catch (AnalysisException e) {
+      LOG.warn("Not able to analyze after rewrite: " + e.toString() + " conjuncts: " +
+          Expr.debugString(conjuncts));
+    }
+    Expr.removeDuplicates(conjuncts);
+    return true;
+  }
+
   /**
    * Returns true if expr is fully bound by tid, otherwise false.
    */

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/77304530/fe/src/main/java/org/apache/impala/analysis/SelectList.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/analysis/SelectList.java b/fe/src/main/java/org/apache/impala/analysis/SelectList.java
index 1e7780e..9794f59 100644
--- a/fe/src/main/java/org/apache/impala/analysis/SelectList.java
+++ b/fe/src/main/java/org/apache/impala/analysis/SelectList.java
@@ -29,7 +29,7 @@ import com.google.common.collect.Lists;
  * Select list items plus optional distinct clause and optional plan hints.
  */
 public class SelectList {
-  private List<PlanHint> planHints_ = Lists.newArrayList();;
+  private List<PlanHint> planHints_ = Lists.newArrayList();
   private boolean isDistinct_;
 
   /////////////////////////////////////////

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/77304530/fe/src/main/java/org/apache/impala/planner/DataSourceScanNode.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/planner/DataSourceScanNode.java b/fe/src/main/java/org/apache/impala/planner/DataSourceScanNode.java
index ab80439..6dc8967 100644
--- a/fe/src/main/java/org/apache/impala/planner/DataSourceScanNode.java
+++ b/fe/src/main/java/org/apache/impala/planner/DataSourceScanNode.java
@@ -83,10 +83,11 @@ public class DataSourceScanNode extends ScanNode {
   // The number of rows estimate as returned by prepare().
   private long numRowsEstimate_;
 
-  public DataSourceScanNode(PlanNodeId id, TupleDescriptor desc) {
+  public DataSourceScanNode(PlanNodeId id, TupleDescriptor desc, List<Expr> conjuncts) {
     super(id, desc, "SCAN DATA SOURCE");
     desc_ = desc;
     table_ = (DataSourceTable) desc_.getTable();
+    conjuncts_ = conjuncts;
     acceptedPredicates_ = null;
     acceptedConjuncts_ = null;
   }
@@ -94,8 +95,6 @@ public class DataSourceScanNode extends ScanNode {
   @Override
   public void init(Analyzer analyzer) throws ImpalaException {
     checkForSupportedFileFormats();
-    assignConjuncts(analyzer);
-    analyzer.createEquivConjuncts(tupleIds_.get(0), conjuncts_);
     prepareDataSource();
     conjuncts_ = orderConjunctsByCost(conjuncts_);
     computeStats(analyzer);

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/77304530/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java b/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java
index 9ff1a6b..9545828 100644
--- a/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java
+++ b/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java
@@ -414,6 +414,11 @@ public class HdfsScanNode extends ScanNode {
       for (Expr conjunct: collectionConjuncts) {
         if (!analyzer.evalAfterJoin(conjunct)) analyzer.markConjunctAssigned(conjunct);
       }
+
+      if (analyzer.getQueryCtx().client_request.getQuery_options().enable_expr_rewrites) {
+        Expr.optimizeConjuncts(collectionConjuncts, analyzer);
+      }
+
       if (!collectionConjuncts.isEmpty()) {
         analyzer.materializeSlots(collectionConjuncts);
         collectionConjuncts_.put(itemTupleDesc, collectionConjuncts);

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/77304530/fe/src/main/java/org/apache/impala/planner/KuduScanNode.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/planner/KuduScanNode.java b/fe/src/main/java/org/apache/impala/planner/KuduScanNode.java
index bc3cdc0..bb4181e 100644
--- a/fe/src/main/java/org/apache/impala/planner/KuduScanNode.java
+++ b/fe/src/main/java/org/apache/impala/planner/KuduScanNode.java
@@ -100,23 +100,14 @@ public class KuduScanNode extends ScanNode {
   // Exprs in kuduConjuncts_ converted to KuduPredicates.
   private final List<KuduPredicate> kuduPredicates_ = Lists.newArrayList();
 
-  public KuduScanNode(PlanNodeId id, TupleDescriptor desc) {
+  public KuduScanNode(PlanNodeId id, TupleDescriptor desc, List<Expr> conjuncts) {
     super(id, desc, "SCAN KUDU");
     kuduTable_ = (KuduTable) desc_.getTable();
+    conjuncts_ = conjuncts;
   }
 
   @Override
   public void init(Analyzer analyzer) throws ImpalaRuntimeException {
-    conjuncts_.clear();
-    // Add bound predicates.
-    conjuncts_.addAll(analyzer.getBoundPredicates(desc_.getId()));
-    // Add unassigned predicates.
-    List<Expr> unassigned = analyzer.getUnassignedConjuncts(this);
-    conjuncts_.addAll(unassigned);
-    analyzer.markConjunctsAssigned(unassigned);
-    // Add equivalence predicates.
-    analyzer.createEquivConjuncts(tupleIds_.get(0), conjuncts_);
-    Expr.removeDuplicates(conjuncts_);
     conjuncts_ = orderConjunctsByCost(conjuncts_);
 
     try (KuduClient client = KuduUtil.createKuduClient(kuduTable_.getKuduMasterHosts())) {

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/77304530/fe/src/main/java/org/apache/impala/planner/SingleNodePlanner.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/planner/SingleNodePlanner.java b/fe/src/main/java/org/apache/impala/planner/SingleNodePlanner.java
index 5c8218c..af6eb93 100644
--- a/fe/src/main/java/org/apache/impala/planner/SingleNodePlanner.java
+++ b/fe/src/main/java/org/apache/impala/planner/SingleNodePlanner.java
@@ -1194,21 +1194,9 @@ public class SingleNodePlanner {
    * Otherwise, a HdfsScanNode will be created.
    */
   private PlanNode createHdfsScanPlan(TableRef hdfsTblRef, boolean fastPartitionKeyScans,
-      Analyzer analyzer) throws ImpalaException {
+      List<Expr> conjuncts, Analyzer analyzer) throws ImpalaException {
     TupleDescriptor tupleDesc = hdfsTblRef.getDesc();
 
-    // Get all predicates bound by the tuple.
-    List<Expr> conjuncts = Lists.newArrayList();
-    conjuncts.addAll(analyzer.getBoundPredicates(tupleDesc.getId()));
-
-    // Also add remaining unassigned conjuncts
-    List<Expr> unassigned = analyzer.getUnassignedConjuncts(tupleDesc.getId().asList());
-    conjuncts.addAll(unassigned);
-    analyzer.markConjunctsAssigned(unassigned);
-
-    analyzer.createEquivConjuncts(tupleDesc.getId(), conjuncts);
-    Expr.removeDuplicates(conjuncts);
-
     // Do partition pruning before deciding which slots to materialize,
     // We might end up removing some predicates.
     HdfsPartitionPruner pruner = new HdfsPartitionPruner(tupleDesc);
@@ -1270,18 +1258,42 @@ public class SingleNodePlanner {
   private PlanNode createScanNode(TableRef tblRef, boolean fastPartitionKeyScans,
       Analyzer analyzer) throws ImpalaException {
     ScanNode scanNode = null;
+
+    // Get all predicates bound by the tuple.
+    List<Expr> conjuncts = Lists.newArrayList();
+    TupleId tid = tblRef.getId();
+    conjuncts.addAll(analyzer.getBoundPredicates(tid));
+
+    // Also add remaining unassigned conjuncts
+    List<Expr> unassigned = analyzer.getUnassignedConjuncts(tid.asList());
+    conjuncts.addAll(unassigned);
+    analyzer.markConjunctsAssigned(unassigned);
+    analyzer.createEquivConjuncts(tid, conjuncts);
+
+    // Perform constant propagation and optimization if rewriting is enabled
+    if (analyzer.getQueryCtx().client_request.query_options.enable_expr_rewrites) {
+      if (!Expr.optimizeConjuncts(conjuncts, analyzer)) {
+        // Conjuncts implied false; convert to EmptySetNode
+        EmptySetNode node = new EmptySetNode(ctx_.getNextNodeId(), tid.asList());
+        node.init(analyzer);
+        return node;
+      }
+    } else {
+      Expr.removeDuplicates(conjuncts);
+    }
+
     Table table = tblRef.getTable();
     if (table instanceof HdfsTable) {
-      return createHdfsScanPlan(tblRef, fastPartitionKeyScans, analyzer);
+      return createHdfsScanPlan(tblRef, fastPartitionKeyScans, conjuncts, analyzer);
     } else if (table instanceof DataSourceTable) {
-      scanNode = new DataSourceScanNode(ctx_.getNextNodeId(), tblRef.getDesc());
+      scanNode = new DataSourceScanNode(ctx_.getNextNodeId(), tblRef.getDesc(), conjuncts);
       scanNode.init(analyzer);
       return scanNode;
     } else if (table instanceof HBaseTable) {
       // HBase table
       scanNode = new HBaseScanNode(ctx_.getNextNodeId(), tblRef.getDesc());
     } else if (tblRef.getTable() instanceof KuduTable) {
-      scanNode = new KuduScanNode(ctx_.getNextNodeId(), tblRef.getDesc());
+      scanNode = new KuduScanNode(ctx_.getNextNodeId(), tblRef.getDesc(), conjuncts);
       scanNode.init(analyzer);
       return scanNode;
     } else {
@@ -1290,11 +1302,6 @@ public class SingleNodePlanner {
     }
     // TODO: move this to HBaseScanNode.init();
     Preconditions.checkState(scanNode instanceof HBaseScanNode);
-
-    List<Expr> conjuncts = analyzer.getUnassignedConjuncts(scanNode);
-    // mark conjuncts_ assigned here; they will either end up inside a
-    // ValueRange or will be evaluated directly by the node
-    analyzer.markConjunctsAssigned(conjuncts);
     List<ValueRange> keyRanges = Lists.newArrayList();
     // determine scan predicates for clustering cols
     for (int i = 0; i < tblRef.getTable().getNumClusteringCols(); ++i) {

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/77304530/fe/src/test/java/org/apache/impala/planner/PlannerTest.java
----------------------------------------------------------------------
diff --git a/fe/src/test/java/org/apache/impala/planner/PlannerTest.java b/fe/src/test/java/org/apache/impala/planner/PlannerTest.java
index 80ba3b2..a67d353 100644
--- a/fe/src/test/java/org/apache/impala/planner/PlannerTest.java
+++ b/fe/src/test/java/org/apache/impala/planner/PlannerTest.java
@@ -59,6 +59,11 @@ public class PlannerTest extends PlannerTestBase {
   }
 
   @Test
+  public void testConstantPropagataion() {
+    runPlannerTestFile("constant-propagation");
+  }
+
+  @Test
   public void testEmpty() {
     runPlannerTestFile("empty");
   }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/77304530/testdata/workloads/functional-planner/queries/PlannerTest/analytic-fns.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/analytic-fns.test b/testdata/workloads/functional-planner/queries/PlannerTest/analytic-fns.test
index cb7dd01..c6d9c1a 100644
--- a/testdata/workloads/functional-planner/queries/PlannerTest/analytic-fns.test
+++ b/testdata/workloads/functional-planner/queries/PlannerTest/analytic-fns.test
@@ -1986,7 +1986,7 @@ PLAN-ROOT SINK
 |
 00:SCAN HDFS [functional.alltypestiny]
    partitions=1/4 files=1 size=115B
-   predicates: functional.alltypestiny.id = 1, id = tinyint_col, functional.alltypestiny.tinyint_col = 1
+   predicates: functional.alltypestiny.id = 1, functional.alltypestiny.tinyint_col = 1
 ====
 # Don't propagate predicates through an inline view with an analytic
 # function that has a complex (non SlotRef) partition by clause for consistency with

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/77304530/testdata/workloads/functional-planner/queries/PlannerTest/conjunct-ordering.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/conjunct-ordering.test b/testdata/workloads/functional-planner/queries/PlannerTest/conjunct-ordering.test
index 9b998c6..7084b26 100644
--- a/testdata/workloads/functional-planner/queries/PlannerTest/conjunct-ordering.test
+++ b/testdata/workloads/functional-planner/queries/PlannerTest/conjunct-ordering.test
@@ -72,13 +72,13 @@ PLAN-ROOT SINK
 # Check that an IN predicate costs more than a single numeric comparison.
 select *
 from functional.alltypes a
-where a.int_col IN (1, 2, 3, 4, 5, 6, 7, 8, 9) and a.int_col = 1
+where a.int_col IN (1, 2, 3, 4, 5, 6, 7, 8, 9) and a.id = 1
 ---- PLAN
 PLAN-ROOT SINK
 |
 00:SCAN HDFS [functional.alltypes a]
    partitions=24/24 files=24 size=478.45KB
-   predicates: a.int_col = 1, a.int_col IN (1, 2, 3, 4, 5, 6, 7, 8, 9)
+   predicates: a.id = 1, a.int_col IN (1, 2, 3, 4, 5, 6, 7, 8, 9)
 ====
 # Check that a timestamp comparison costs more than a numeric comparison.
 select *
@@ -94,15 +94,14 @@ PLAN-ROOT SINK
 # Check that string comparisons are ordered by string length.
 select *
 from functional.alltypes a
-where a.string_col = "looooooooooooooooong string" and
-      a.string_col = "medium string" and
+where a.date_string_col = "looooooooooooooooong string" and
       a.string_col = "a"
 ---- PLAN
 PLAN-ROOT SINK
 |
 00:SCAN HDFS [functional.alltypes a]
    partitions=24/24 files=24 size=478.45KB
-   predicates: a.string_col = 'a', a.string_col = 'medium string', a.string_col = 'looooooooooooooooong string'
+   predicates: a.string_col = 'a', a.date_string_col = 'looooooooooooooooong string'
 ====
 # Check that timestamp arithmetic adds cost.
 select *

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/77304530/testdata/workloads/functional-planner/queries/PlannerTest/constant-propagation.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/constant-propagation.test b/testdata/workloads/functional-planner/queries/PlannerTest/constant-propagation.test
new file mode 100644
index 0000000..f9b9977
--- /dev/null
+++ b/testdata/workloads/functional-planner/queries/PlannerTest/constant-propagation.test
@@ -0,0 +1,361 @@
+# Check that impossible conditions are detected
+select *
+from functional.alltypes a
+where a.string_col = "looooooooooooooooong string" and
+      a.string_col = "mediumstring" and
+      a.string_col = "a"
+---- PLAN
+PLAN-ROOT SINK
+|
+00:EMPTYSET
+====
+# Test multiple forward propagation
+select * from functional.widetable_250_cols a
+where a.int_col1 = 10 and a.int_col2 = a.int_col1 + 1 and a.int_col3 = a.int_col2 * 5
+      and a.int_col4 = a.int_col3 * 2;
+---- PLAN
+PLAN-ROOT SINK
+|
+00:SCAN HDFS [functional.widetable_250_cols a]
+   partitions=1/1 files=1 size=28.69KB
+   predicates: a.int_col1 = 10, a.int_col2 = 11, a.int_col3 = 55, a.int_col4 = 110
+====
+# Test multiple forward propagation; the last equality has the expression on the left
+select * from functional.widetable_250_cols a
+where a.int_col1 = 10 and a.int_col2 = a.int_col1 + 1 and a.int_col3 = a.int_col2 * 5
+      and a.int_col3 * -7 = a.int_col4
+---- PLAN
+PLAN-ROOT SINK
+|
+00:SCAN HDFS [functional.widetable_250_cols a]
+   partitions=1/1 files=1 size=28.69KB
+   predicates: a.int_col1 = 10, a.int_col2 = 11, a.int_col3 = 55, a.int_col4 = -385
+====
+# Test multiple forward propagation; the last two equalities have the expression on the left
+select * from functional.widetable_250_cols a
+where a.int_col1 = 10 and a.int_col2 = a.int_col1 + 1 and a.int_col2 * 5 = a.int_col3
+      and a.int_col3 * -9 = a.int_col4
+---- PLAN
+PLAN-ROOT SINK
+|
+00:SCAN HDFS [functional.widetable_250_cols a]
+   partitions=1/1 files=1 size=28.69KB
+   predicates: a.int_col1 = 10, a.int_col2 = 11, a.int_col3 = 55, a.int_col4 = -495
+====
+# Test multiple forward propagation, and a reversed propagation
+# (which fails as we can't rewrite 55 = a.int_col4 / 10)
+select * from functional.widetable_250_cols a
+where a.int_col1 = 10 and a.int_col2 = a.int_col1 + 1 and a.int_col3 = a.int_col2 * 5
+      and a.int_col4 / 10 = a.int_col3
+---- PLAN
+PLAN-ROOT SINK
+|
+00:SCAN HDFS [functional.widetable_250_cols a]
+   partitions=1/1 files=1 size=28.69KB
+   predicates: a.int_col1 = 10, a.int_col2 = 11, a.int_col3 = 55, 55 = a.int_col4 / 10
+====
+# Another impossibility (a.int_col2 = 11, but a.int_col3 * -7 = 55 * -7 = -385)
+select * from functional.widetable_250_cols a
+where a.int_col1 = 10 and a.int_col2 = a.int_col1 + 1 and a.int_col3 = a.int_col2 * 5
+      and a.int_col3 * -7 = a.int_col2;
+---- PLAN
+PLAN-ROOT SINK
+|
+00:EMPTYSET
+====
+# An inline view which takes false conjuncts
+select count(*) from
+  (select id, int_col from functional.alltypes) T
+  where T.int_col = 10 and T.int_col = 12
+---- PLAN
+PLAN-ROOT SINK
+|
+01:AGGREGATE [FINALIZE]
+|  output: count(*)
+|
+00:EMPTYSET
+====
+# An inline view which becomes false at the scan node
+select count(*) from
+  (select id, int_col from functional.alltypes where int_col = 10) T
+  where T.int_col = 12
+---- PLAN
+PLAN-ROOT SINK
+|
+01:AGGREGATE [FINALIZE]
+|  output: count(*)
+|
+00:EMPTYSET
+====
+# Explicit casts are not considered for propagation
+select * from functional.alltypes a
+where cast(a.int_col as string) = 'abc' and cast(int_col as string) > 'xyz'
+---- PLAN
+PLAN-ROOT SINK
+|
+00:SCAN HDFS [functional.alltypes a]
+   partitions=24/24 files=24 size=478.45KB
+   predicates: CAST(a.int_col AS STRING) = 'abc', CAST(int_col AS STRING) > 'xyz'
+====
+# Implicit casts are considered for propagation
+select * from functional.alltypes a
+where a.tinyint_col = 10000 and a.tinyint_col < 10000
+---- PLAN
+PLAN-ROOT SINK
+|
+00:EMPTYSET
+====
+# Constant predicates always get removed
+select count(*) from
+  (select sum(int_col) over(partition by int_col), int_col
+   from functional.alltypes where int_col = 10 group by int_col limit 10)  T
+where T.int_col = 12 and T.int_col > 1
+---- PLAN
+PLAN-ROOT SINK
+|
+03:AGGREGATE [FINALIZE]
+|  output: count(*)
+|
+02:SELECT
+|  predicates: int_col = 12, int_col > 1
+|
+01:AGGREGATE [FINALIZE]
+|  group by: int_col
+|  limit: 10
+|
+00:SCAN HDFS [functional.alltypes]
+   partitions=24/24 files=24 size=478.45KB
+   predicates: int_col = 10
+====
+# Many constant predicates removed
+select count(*) from
+  (select * from functional.alltypes where int_col = 10) T
+where T.int_col = 10 and T.int_col > 1 and T.int_col = 10 and T.int_col = T.int_col
+  and 2 * T.int_col = 20
+---- PLAN
+PLAN-ROOT SINK
+|
+01:AGGREGATE [FINALIZE]
+|  output: count(*)
+|
+00:SCAN HDFS [functional.alltypes]
+   partitions=24/24 files=24 size=478.45KB
+   predicates: int_col = 10
+====
+# All true predicates elided
+select count(*) from
+  (select * from functional.alltypes) T
+where 2 * 5 = 10 and 3 - 11 = -8 and 7 * 5 = 35
+---- PLAN
+PLAN-ROOT SINK
+|
+01:AGGREGATE [FINALIZE]
+|  output: count(*)
+|
+00:SCAN HDFS [functional.alltypes]
+   partitions=24/24 files=24 size=478.45KB
+====
+# Many redundant / duplicate predicates
+ select count(*) from
+  (select * from functional.alltypes where int_col = 10) T
+where T.int_col = 10 and T.int_col > 1 and T.int_col = 10 and T.int_col = T.int_col and
+ T.int_col = 10 and T.int_col > 1 and T.int_col = 10 and T.int_col = T.int_col and
+ T.int_col = 10 and T.int_col > 2 and T.int_col = 10 and T.int_col = T.int_col and
+ T.int_col = 10 and T.int_col > 3 and T.int_col = 10 and T.int_col = T.int_col and
+ T.int_col = 10 and T.int_col > 4 and T.int_col = 10 and T.int_col = T.int_col and
+ T.int_col = 10 and T.int_col > 5 and T.int_col = 10 and T.int_col = T.int_col and
+ T.int_col = 10 and T.int_col > 6 and T.int_col = 10 and T.int_col = T.int_col and
+ T.int_col = 10 and T.int_col > 7 and T.int_col = 10 and T.int_col = T.int_col and
+ T.int_col = 10 and T.int_col > 8 and T.int_col = 10 and T.int_col = T.int_col and
+ T.int_col = 10 and T.int_col > 9 and T.int_col = 10 and T.int_col = T.int_col and
+ T.int_col = 10 and T.int_col > 1 and T.int_col = 10 and T.int_col = T.int_col and
+ T.int_col = 10 and T.int_col > 2 and T.int_col = 10 and T.int_col = T.int_col and
+ T.int_col = 10 and T.int_col > 3 and T.int_col = 10 and T.int_col = T.int_col and
+ T.int_col = 10 and T.int_col > 4 and T.int_col = 10 and T.int_col = T.int_col and
+ T.int_col = 10 and T.int_col > 5 and T.int_col = 10 and T.int_col = T.int_col and
+ T.int_col = 10 and T.int_col > 6 and T.int_col = 10 and T.int_col = T.int_col and
+ T.int_col = 10 and T.int_col > 7 and T.int_col = 10 and T.int_col = T.int_col and
+ T.int_col = 10 and T.int_col > 8 and T.int_col = 10 and T.int_col = T.int_col and
+ T.int_col = 10 and T.int_col > 9 and T.int_col = 10 and T.int_col = T.int_col and
+ T.int_col = 10 and T.int_col > 1 and T.int_col = 10 and T.int_col = T.int_col and
+ T.int_col = 10 and T.int_col > 2 and T.int_col = 10 and T.int_col = T.int_col and
+ T.int_col = 10 and T.int_col > 3 and T.int_col = 10 and T.int_col = T.int_col and
+ T.int_col = 10 and T.int_col > 4 and T.int_col = 10 and T.int_col = T.int_col and
+ T.int_col = 10 and T.int_col > 5 and T.int_col = 10 and T.int_col = T.int_col and
+ T.int_col = 10 and T.int_col > 6 and T.int_col = 10 and T.int_col = T.int_col and
+ T.int_col = 10 and T.int_col > 7 and T.int_col = 10 and T.int_col = T.int_col and
+ T.int_col = 10 and T.int_col > 8 and T.int_col = 10 and T.int_col = T.int_col and
+ T.int_col = 10 and T.int_col > 9 and T.int_col = 10 and T.int_col = T.int_col and
+ T.int_col = 10 and T.int_col > 0 and T.int_col = 10 and T.int_col = T.int_col and
+ T.int_col = 10 and T.int_col > 0 and T.int_col = 10 and T.int_col = T.int_col and
+ T.int_col = 10 and T.int_col > 0 and T.int_col = 10 and T.int_col = T.int_col and
+ T.int_col = 10 and T.int_col > 0 and T.int_col = 10 and T.int_col = T.int_col and
+ T.int_col = 10 and T.int_col > 0 and T.int_col = 10 and T.int_col = T.int_col and
+ T.int_col = 10 and T.int_col > 0 and T.int_col = 10 and T.int_col = T.int_col and
+ T.int_col = 10 and T.int_col > 0 and T.int_col = 10 and T.int_col = T.int_col and
+ T.int_col = 10 and T.int_col > 0 and T.int_col = 10 and T.int_col = T.int_col and
+ T.int_col = 10 and T.int_col > 0 and T.int_col = 10 and T.int_col = T.int_col and
+ T.int_col = 10 and T.int_col > 0 and T.int_col = 10 and T.int_col = T.int_col and
+ T.int_col = 10 and T.int_col > 0 and T.int_col = 10 and T.int_col = T.int_col and
+ T.int_col = 10 and T.int_col > 0 and T.int_col = 10 and T.int_col = T.int_col and
+ T.int_col = 10 and T.int_col > 0 and T.int_col = 10 and T.int_col = T.int_col
+---- PLAN
+PLAN-ROOT SINK
+|
+01:AGGREGATE [FINALIZE]
+|  output: count(*)
+|
+00:SCAN HDFS [functional.alltypes]
+   partitions=24/24 files=24 size=478.45KB
+   predicates: int_col = 10
+====
+# Non-trivial expr substitution (const false)
+select count(*) from
+  (select * from functional.alltypes where int_col = 10) T
+where T.int_col = 10 and T.int_col > 1 and T.int_col = 10 and T.int_col = T.int_col and
+coalesce(NULL, T.int_col) + T.int_col * T.int_col = 100
+---- PLAN
+PLAN-ROOT SINK
+|
+01:AGGREGATE [FINALIZE]
+|  output: count(*)
+|
+00:EMPTYSET
+====
+# Non-trivial expr substitution (const true)
+select count(*) from
+  (select * from functional.alltypes where int_col = 10) T
+where T.int_col = 10 and T.int_col > 1 and T.int_col = 10 and T.int_col = T.int_col and
+  (coalesce(NULL, T.int_col) + T.int_col * T.int_col = 100 OR
+   coalesce(NULL, T.int_col) + T.int_col = 20)
+---- PLAN
+PLAN-ROOT SINK
+|
+01:AGGREGATE [FINALIZE]
+|  output: count(*)
+|
+00:SCAN HDFS [functional.alltypes]
+   partitions=24/24 files=24 size=478.45KB
+   predicates: int_col = 10
+====
+# Non-trivial expr substitution (non-constant)
+select count(*) from
+  (select * from functional.alltypes where int_col = 10) T
+where T.int_col = 10 and T.int_col > 1 and T.int_col = 10 and T.int_col = T.int_col and
+(coalesce(NULL, T.int_col) + random() * T.tinyint_col = 100 OR
+ coalesce(NULL, T.int_col) + T.int_col = 20)
+---- PLAN
+PLAN-ROOT SINK
+|
+01:AGGREGATE [FINALIZE]
+|  output: count(*)
+|
+00:SCAN HDFS [functional.alltypes]
+   partitions=24/24 files=24 size=478.45KB
+   predicates: int_col = 10, TRUE OR 10 + random() * functional.alltypes.tinyint_col = 100
+====
+# Collection predicates within HDFS scan nodes get optimized
+select 1
+from tpch_nested_parquet.customer c, c.c_orders o, o.o_lineitems
+where l_partkey < l_suppkey and c.c_nationkey = 10 and o_orderkey = 4 and l_suppkey = 10
+---- PLAN
+PLAN-ROOT SINK
+|
+01:SUBPLAN
+|
+|--08:NESTED LOOP JOIN [CROSS JOIN]
+|  |
+|  |--02:SINGULAR ROW SRC
+|  |
+|  04:SUBPLAN
+|  |
+|  |--07:NESTED LOOP JOIN [CROSS JOIN]
+|  |  |
+|  |  |--05:SINGULAR ROW SRC
+|  |  |
+|  |  06:UNNEST [o.o_lineitems]
+|  |
+|  03:UNNEST [c.c_orders o]
+|
+00:SCAN HDFS [tpch_nested_parquet.customer c]
+   partitions=1/1 files=4 size=292.36MB
+   predicates: c.c_nationkey = 10, !empty(c.c_orders)
+   predicates on o: !empty(o.o_lineitems), o_orderkey = 4
+   predicates on o_lineitems: l_partkey < 10, l_suppkey = 10
+====
+# Nested predicates also get propagated
+select 1
+from tpch_nested_parquet.customer c, c.c_orders o, o.o_lineitems
+where l_partkey < l_suppkey and c.c_nationkey = 10 and o_orderkey = o_shippriority
+  and l_suppkey = 10 and o_shippriority = c_nationkey
+---- PLAN
+PLAN-ROOT SINK
+|
+01:SUBPLAN
+|
+|--08:NESTED LOOP JOIN [INNER JOIN]
+|  |  join predicates: o_shippriority = c_nationkey
+|  |
+|  |--02:SINGULAR ROW SRC
+|  |
+|  04:SUBPLAN
+|  |
+|  |--07:NESTED LOOP JOIN [CROSS JOIN]
+|  |  |
+|  |  |--05:SINGULAR ROW SRC
+|  |  |
+|  |  06:UNNEST [o.o_lineitems]
+|  |
+|  03:UNNEST [c.c_orders o]
+|
+00:SCAN HDFS [tpch_nested_parquet.customer c]
+   partitions=1/1 files=4 size=292.36MB
+   predicates: c.c_nationkey = 10, !empty(c.c_orders)
+   predicates on o: !empty(o.o_lineitems), o.o_orderkey = 10, o.o_shippriority = 10
+   predicates on o_lineitems: l_partkey < 10, l_suppkey = 10
+====
+# Using IS NULL
+select count(*) from functional.alltypes where id = 10 and bool_col is null
+   and id = bool_col;
+---- PLAN
+PLAN-ROOT SINK
+|
+01:AGGREGATE [FINALIZE]
+|  output: count(*)
+|
+00:EMPTYSET
+====
+# Using = null
+select count(*) from functional.alltypes where id = 10 and bool_col = null and id = bool_col
+---- PLAN
+PLAN-ROOT SINK
+|
+01:AGGREGATE [FINALIZE]
+|  output: count(*)
+|
+00:EMPTYSET
+====
+# IS NULL and >
+select count(*) from functional.alltypes where id > 0 and bool_col is null
+   and id = bool_col;
+---- PLAN
+PLAN-ROOT SINK
+|
+01:AGGREGATE [FINALIZE]
+|  output: count(*)
+|
+00:SCAN HDFS [functional.alltypes]
+   partitions=24/24 files=24 size=478.45KB
+   predicates: bool_col IS NULL, functional.alltypes.id IS NULL, id > 0, functional.alltypes.bool_col > 0, id = bool_col
+====
+# = NULL and >
+select count(*) from functional.alltypes where id > 0 and bool_col = null
+   and id = bool_col
+---- PLAN
+PLAN-ROOT SINK
+|
+01:AGGREGATE [FINALIZE]
+|  output: count(*)
+|
+00:EMPTYSET
+====

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/77304530/testdata/workloads/functional-planner/queries/PlannerTest/data-source-tables.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/data-source-tables.test b/testdata/workloads/functional-planner/queries/PlannerTest/data-source-tables.test
index b814446..7cc5c04 100644
--- a/testdata/workloads/functional-planner/queries/PlannerTest/data-source-tables.test
+++ b/testdata/workloads/functional-planner/queries/PlannerTest/data-source-tables.test
@@ -83,3 +83,16 @@ PLAN-ROOT SINK
 data source predicates: id IS NOT DISTINCT FROM 1, tinyint_col IS DISTINCT FROM 2, int_col IS NOT DISTINCT FROM 4
 predicates: bigint_col IS NOT DISTINCT FROM 5, bool_col IS NOT DISTINCT FROM TRUE, smallint_col IS DISTINCT FROM 3
 ====
+# EmptySet datasource
+select * from functional.alltypes_datasource
+where id <=> 1 and id = 2
+and bool_col <=> true
+and tinyint_col IS DISTINCT FROM 2
+and smallint_col IS DISTINCT FROM 3
+and int_col is not distinct from 4
+and bigint_col is not distinct from 5
+---- PLAN
+PLAN-ROOT SINK
+|
+00:EMPTYSET
+====

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/77304530/testdata/workloads/functional-planner/queries/PlannerTest/hdfs.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/hdfs.test b/testdata/workloads/functional-planner/queries/PlannerTest/hdfs.test
index 03b2cdc..5ed6f6b 100644
--- a/testdata/workloads/functional-planner/queries/PlannerTest/hdfs.test
+++ b/testdata/workloads/functional-planner/queries/PlannerTest/hdfs.test
@@ -496,16 +496,14 @@ select * from functional.alltypesagg where day is null and day = 10
 ---- PLAN
 PLAN-ROOT SINK
 |
-00:SCAN HDFS [functional.alltypesagg]
-   partitions=0/11 files=0 size=0B
+00:EMPTYSET
 ====
 # partition key predicates which are in conjunctive normal form (case 1)
 select * from functional.alltypesagg where day <=> null and day = 10
 ---- PLAN
 PLAN-ROOT SINK
 |
-00:SCAN HDFS [functional.alltypesagg]
-   partitions=0/11 files=0 size=0B
+00:EMPTYSET
 ====
 # partition key predicates which are in conjunctive normal form (case 2)
 select * from functional.alltypesagg where day is null and month = 1
@@ -916,8 +914,7 @@ select * from scale_db.num_partitions_1234_blocks_per_partition_1 where j = 1 an
 ---- PLAN
 PLAN-ROOT SINK
 |
-00:SCAN HDFS [scale_db.num_partitions_1234_blocks_per_partition_1]
-   partitions=0/1234 files=0 size=0B
+00:EMPTYSET
 ====
 select * from scale_db.num_partitions_1234_blocks_per_partition_1 where j <=> 1 and j <=> 2
 ---- PLAN

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/77304530/testdata/workloads/functional-planner/queries/PlannerTest/joins.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/joins.test b/testdata/workloads/functional-planner/queries/PlannerTest/joins.test
index f28ecb0..0fdb19d 100644
--- a/testdata/workloads/functional-planner/queries/PlannerTest/joins.test
+++ b/testdata/workloads/functional-planner/queries/PlannerTest/joins.test
@@ -329,7 +329,6 @@ where a.day >= 6
 and a.tinyint_col = 15
 and b.id = '5'
 and b.tinyint_col = 5
-and b.tinyint_col > 123
 and a.tinyint_col + b.tinyint_col < 15
 ---- PLAN
 PLAN-ROOT SINK
@@ -342,7 +341,7 @@ PLAN-ROOT SINK
 |--01:SCAN HBASE [functional_hbase.stringids b]
 |     start key: 5
 |     stop key: 5\0
-|     predicates: b.tinyint_col > 123, b.tinyint_col = 5
+|     predicates: b.tinyint_col = 5
 |
 00:SCAN HDFS [functional.alltypesagg a]
    partitions=5/11 files=5 size=372.38KB
@@ -372,13 +371,45 @@ PLAN-ROOT SINK
 |  01:SCAN HBASE [functional_hbase.stringids b]
 |     start key: 5
 |     stop key: 5\0
-|     predicates: b.tinyint_col > 123, b.tinyint_col = 5
+|     predicates: b.tinyint_col = 5
 |
 00:SCAN HDFS [functional.alltypesagg a]
    partitions=5/11 files=5 size=372.38KB
    predicates: a.tinyint_col = 15
    runtime filters: RF000 -> a.int_col, RF001 -> a.id
 ====
+# hbase-hdfs join with scan filtering (contradictory predicates on b.tinyint_col)
+select *
+from functional.alltypesagg a join functional_hbase.stringids b
+       on (a.id = cast(b.id as int) and a.int_col = b.int_col)
+where a.day >= 6
+and a.tinyint_col = 15
+and b.id = '5'
+and b.tinyint_col = 5
+and b.tinyint_col > 123
+and a.tinyint_col + b.tinyint_col < 15
+---- PLAN
+PLAN-ROOT SINK
+|
+02:HASH JOIN [INNER JOIN]
+|  hash predicates: a.int_col = b.int_col, a.id = CAST(b.id AS INT)
+|  other predicates: a.tinyint_col + b.tinyint_col < 15
+|  runtime filters: RF000 <- b.int_col, RF001 <- CAST(b.id AS INT)
+|
+|--01:EMPTYSET
+|
+00:SCAN HDFS [functional.alltypesagg a]
+   partitions=5/11 files=5 size=372.38KB
+   predicates: a.tinyint_col = 15
+   runtime filters: RF000 -> a.int_col, RF001 -> a.id
+---- SCANRANGELOCATIONS
+NODE 0:
+  HDFS SPLIT hdfs://localhost:20500/test-warehouse/alltypesagg/year=2010/month=1/day=10/100110.txt 0:76263
+  HDFS SPLIT hdfs://localhost:20500/test-warehouse/alltypesagg/year=2010/month=1/day=6/100106.txt 0:76263
+  HDFS SPLIT hdfs://localhost:20500/test-warehouse/alltypesagg/year=2010/month=1/day=7/100107.txt 0:76263
+  HDFS SPLIT hdfs://localhost:20500/test-warehouse/alltypesagg/year=2010/month=1/day=8/100108.txt 0:76263
+  HDFS SPLIT hdfs://localhost:20500/test-warehouse/alltypesagg/year=2010/month=1/day=9/100109.txt 0:76263
+====
 # left join followed by right join and then aggregate
 select x.tinyint_col, count(x.day)
 from (

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/77304530/testdata/workloads/functional-planner/queries/PlannerTest/kudu.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/kudu.test b/testdata/workloads/functional-planner/queries/PlannerTest/kudu.test
index c99de37..a978b8b 100644
--- a/testdata/workloads/functional-planner/queries/PlannerTest/kudu.test
+++ b/testdata/workloads/functional-planner/queries/PlannerTest/kudu.test
@@ -114,6 +114,17 @@ PLAN-ROOT SINK
 00:SCAN KUDU [functional_kudu.testtbl]
    kudu predicates: id <= 20, id >= 10, zip < 50, zip <= 30, zip <= 5, zip > 1, zip >= 0, name = 'foo'
 ====
+# Constant propagation works for Kudu
+select * from functional_kudu.alltypes t
+where t.int_col = 10 and t.bigint_col = t.int_col * 100 and
+      cast(t.tinyint_col as bigint) = t.bigint_col
+---- PLAN
+PLAN-ROOT SINK
+|
+00:SCAN KUDU [functional_kudu.alltypes t]
+   predicates: CAST(t.tinyint_col AS BIGINT) = 1000
+   kudu predicates: t.bigint_col = 1000, t.int_col = 10
+====
 # Test constant folding.
 select * from functional_kudu.testtbl
 where id < 10 + 30  and cast(sin(id) as boolean) = true and 20 * 3 >= id and 10 * 10 + 3 > id
@@ -182,6 +193,15 @@ PLAN-ROOT SINK
    predicates: CAST(sin(id) AS BOOLEAN) = TRUE
    kudu predicates: name IS NULL
 ====
+# Derived EmptySets for Kudu
+select * from functional_kudu.alltypes t
+where t.int_col = 10 and t.bigint_col = t.int_col * 100 and
+      CAST(t.int_col as BIGINT) = t.bigint_col
+---- PLAN
+PLAN-ROOT SINK
+|
+00:EMPTYSET
+====
 # IMPALA-3856: KuduScanNode crash when pushing predicates including a cast
 select o_orderkey from tpch_kudu.orders where o_orderkey < 10.0 order by 1
 ---- PLAN
@@ -210,7 +230,7 @@ PLAN-ROOT SINK
 # IMPALA-4213: Planner not pushing some predicates with constant exprs to Kudu
 select count(*) from functional_kudu.alltypes
 where id < 1475059765 + 10
-and 1475059765 + 100 < id
+and 1475059765 - 100 < id
 ---- PLAN
 PLAN-ROOT SINK
 |
@@ -218,7 +238,7 @@ PLAN-ROOT SINK
 |  output: count(*)
 |
 00:SCAN KUDU [functional_kudu.alltypes]
-   kudu predicates: id < 1475059775, id > 1475059865
+   kudu predicates: id < 1475059775, id > 1475059665
 ====
 # IMPALA-2521: clustered insert into table adds sort node.
 insert into table functional_kudu.alltypes /*+ clustered */

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/77304530/testdata/workloads/functional-planner/queries/PlannerTest/predicate-propagation.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/predicate-propagation.test b/testdata/workloads/functional-planner/queries/PlannerTest/predicate-propagation.test
index 1d0bfe5..bd2d705 100644
--- a/testdata/workloads/functional-planner/queries/PlannerTest/predicate-propagation.test
+++ b/testdata/workloads/functional-planner/queries/PlannerTest/predicate-propagation.test
@@ -1235,7 +1235,7 @@ select * from
 functional.alltypes a
 left anti join
   (select id from functional.alltypestiny
-   where id > 20) b
+   where id > -20) b
 on (a.id = b.id)
 where a.id < 10
 ---- PLAN
@@ -1246,7 +1246,7 @@ PLAN-ROOT SINK
 |
 |--01:SCAN HDFS [functional.alltypestiny]
 |     partitions=4/4 files=4 size=460B
-|     predicates: functional.alltypestiny.id < 10, id > 20
+|     predicates: functional.alltypestiny.id < 10, id > -20
 |
 00:SCAN HDFS [functional.alltypes a]
    partitions=24/24 files=24 size=478.45KB
@@ -1255,7 +1255,7 @@ PLAN-ROOT SINK
 # Anti-joins have a uni-directional value transfer (IMPALA-1249).
 select * from
   (select id from functional.alltypes
-   where id > 20) a
+   where id > -20) a
 right anti join functional.alltypestiny b
 on (a.id = b.id)
 where b.id < 10
@@ -1271,7 +1271,7 @@ PLAN-ROOT SINK
 |
 00:SCAN HDFS [functional.alltypes]
    partitions=24/24 files=24 size=478.45KB
-   predicates: functional.alltypes.id < 10, id > 20
+   predicates: functional.alltypes.id < 10, id > -20
 ====
 # Test proper predicate assignment with predicate propagation when the
 # generated predicate is bound by an outer joined tuple (IMPALA-2018)

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/77304530/testdata/workloads/functional-planner/queries/PlannerTest/subquery-rewrite.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/subquery-rewrite.test b/testdata/workloads/functional-planner/queries/PlannerTest/subquery-rewrite.test
index 44e59a0..ee8f59a 100644
--- a/testdata/workloads/functional-planner/queries/PlannerTest/subquery-rewrite.test
+++ b/testdata/workloads/functional-planner/queries/PlannerTest/subquery-rewrite.test
@@ -1267,7 +1267,7 @@ PLAN-ROOT SINK
 |
 00:SCAN HDFS [functional.alltypesagg g]
    partitions=11/11 files=11 size=814.73KB
-   predicates: g.bigint_col = 1, g.bigint_col < 1000
+   predicates: g.bigint_col = 1
 ====
 # Aggregate subquery in an IS NULL predicate
 select *

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/77304530/testdata/workloads/functional-query/queries/QueryTest/data-source-tables.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-query/queries/QueryTest/data-source-tables.test b/testdata/workloads/functional-query/queries/QueryTest/data-source-tables.test
index 19868be..e88d928 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/data-source-tables.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/data-source-tables.test
@@ -48,8 +48,8 @@ BIGINT
 ====
 ---- QUERY
 select string_col from alltypes_datasource
-where string_col = 'VALIDATE_PREDICATES##id LT 1 && id GT 1 && id LE 1 && id GE 1 && id EQ 1 && id NE 1'
-      and id < 1 and id > 1 and id <= 1 and id >= 1 and id = 1 and id != 1
+where string_col = 'VALIDATE_PREDICATES##id LT 1 && id GT 1 && id LE 1 && id GE 1 && int_col EQ 1 && id NE 1'
+      and id < 1 and id > 1 and id <= 1 and id >= 1 and int_col = 1 and id != 1
 ---- RESULTS
 'SUCCESS'
 ---- TYPES
@@ -57,8 +57,8 @@ STRING
 ====
 ---- QUERY
 select string_col from alltypes_datasource
-where string_col = 'VALIDATE_PREDICATES##id LT 1 && id GT 1 && id LE 1 && id GE 1 && id EQ 1 && id NE 1'
-      and 1 > id and 1 < id and 1 >= id and 1 <= id and 1 = id and 1 != id
+where string_col = 'VALIDATE_PREDICATES##id LT 1 && id GT 1 && id LE 1 && id GE 1 && int_col EQ 1 && id NE 1'
+      and 1 > id and 1 < id and 1 >= id and 1 <= id and 1 = int_col and 1 != id
 ---- RESULTS
 'SUCCESS'
 ---- TYPES
@@ -66,8 +66,9 @@ STRING
 ====
 ---- QUERY
 # Test that <=>, IS DISTINCT FROM, and IS NOT DISTINCT FROM all can be validated
+# Note the duplicate predicate 1 IS NOT DISTINCT FROM id is removed.
 select string_col from alltypes_datasource
-where string_col = 'VALIDATE_PREDICATES##id NOT_DISTINCT 1 && id DISTINCT_FROM 1 && id NOT_DISTINCT 1'
+where string_col = 'VALIDATE_PREDICATES##id NOT_DISTINCT 1 && id DISTINCT_FROM 1'
       and 1 <=> id and 1 IS DISTINCT FROM id and 1 IS NOT DISTINCT FROM id
 ---- RESULTS
 'SUCCESS'