You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by sa...@apache.org on 2017/11/19 22:55:57 UTC

[4/5] incubator-impala git commit: IMPALA-5976: Remove equivalence class computation in FE

IMPALA-5976: Remove equivalence class computation in FE

Equivalence classes are used to determine the equivalences between slots.
The concept is ill-defined and the current implementation is inefficient.
This patch removes the equivalence class computation and directly uses the
information from the value transfer graph instead.
The value transfer graph is reimplemented using Tarjan's strongly connected
components algorithm and BFS with adjacency lists, which speeds up the
computation on both condensed and sparse graphs.

Testing: The patch passes the existing tests. In planner tests, the
equivalence between the SCC-condensed graph and the uncondensed graph is
checked. A test case is added for the helper class IntArrayList. An
outer-join edge case is added to the planner tests. On a query with 1800
union operations, the equivalence class computation time is reduced from
7m57s to 65ms and the planning time is reduced from 8m5s to 13s.

Change-Id: If4cb1d8be46efa8fd61a97048cc79dabe2ffa51a
Reviewed-on: http://gerrit.cloudera.org:8080/8317
Reviewed-by: Alex Behm <al...@cloudera.com>
Tested-by: Impala Public Jenkins


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/5e9b4e2f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/5e9b4e2f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/5e9b4e2f

Branch: refs/heads/master
Commit: 5e9b4e2fd2a459a3b61e8939e3085ad8e1a5795e
Parents: 1033426
Author: Tianyi Wang <tw...@cloudera.com>
Authored: Tue Oct 17 15:55:26 2017 -0700
Committer: Impala Public Jenkins <im...@gerrit.cloudera.org>
Committed: Sat Nov 18 09:07:06 2017 +0000

----------------------------------------------------------------------
 .../apache/impala/analysis/AggregateInfo.java   |   2 +-
 .../impala/analysis/AggregateInfoBase.java      |   2 +-
 .../apache/impala/analysis/AnalyticExpr.java    |   6 +-
 .../org/apache/impala/analysis/Analyzer.java    | 860 ++++++-------------
 .../impala/analysis/BetweenPredicate.java       |   6 +-
 .../apache/impala/analysis/BinaryPredicate.java |  10 +-
 .../org/apache/impala/analysis/BoolLiteral.java |   7 +-
 .../org/apache/impala/analysis/CaseExpr.java    |   6 +-
 .../org/apache/impala/analysis/CastExpr.java    |  16 +-
 .../impala/analysis/CompoundPredicate.java      |   5 +-
 .../apache/impala/analysis/ExistsPredicate.java |   5 +-
 .../java/org/apache/impala/analysis/Expr.java   |  99 ++-
 .../impala/analysis/FunctionCallExpr.java       |  12 +-
 .../apache/impala/analysis/InlineViewRef.java   |   2 +-
 .../apache/impala/analysis/IsNullPredicate.java |   5 +-
 .../apache/impala/analysis/LikePredicate.java   |   5 +-
 .../org/apache/impala/analysis/NullLiteral.java |   6 -
 .../apache/impala/analysis/NumericLiteral.java  |   5 +-
 .../org/apache/impala/analysis/QueryStmt.java   |   2 +-
 .../org/apache/impala/analysis/SlotRef.java     |  25 +-
 .../apache/impala/analysis/StringLiteral.java   |   6 +-
 .../org/apache/impala/analysis/Subquery.java    |   6 +-
 .../impala/analysis/TimestampLiteral.java       |   6 +-
 .../impala/analysis/TupleIsNullPredicate.java   |   6 +-
 .../apache/impala/planner/AnalyticPlanner.java  |   6 +-
 .../impala/planner/DistributedPlanner.java      |  57 +-
 .../org/apache/impala/planner/PlanFragment.java |  19 -
 .../impala/planner/SingleNodePlanner.java       |  22 +-
 .../main/java/org/apache/impala/util/Graph.java | 387 +++++++++
 .../org/apache/impala/util/IntArrayList.java    |  97 +++
 .../org/apache/impala/util/IntIterator.java     |  40 +
 .../apache/impala/util/IntArrayListTest.java    | 102 +++
 .../queries/PlannerTest/outer-joins.test        | 105 +++
 33 files changed, 1168 insertions(+), 777 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/5e9b4e2f/fe/src/main/java/org/apache/impala/analysis/AggregateInfo.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/analysis/AggregateInfo.java b/fe/src/main/java/org/apache/impala/analysis/AggregateInfo.java
index 3b0ad9c..2e142ff 100644
--- a/fe/src/main/java/org/apache/impala/analysis/AggregateInfo.java
+++ b/fe/src/main/java/org/apache/impala/analysis/AggregateInfo.java
@@ -574,7 +574,7 @@ public class AggregateInfo extends AggregateInfoBase {
           new SlotRef(outputTupleDesc_.getSlots().get(i)),
           new SlotRef(intermediateTupleDesc_.getSlots().get(i)));
       if (i < groupingExprs_.size()) {
-        analyzer.createAuxEquivPredicate(
+        analyzer.createAuxEqPredicate(
             new SlotRef(outputTupleDesc_.getSlots().get(i)),
             new SlotRef(intermediateTupleDesc_.getSlots().get(i)));
       }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/5e9b4e2f/fe/src/main/java/org/apache/impala/analysis/AggregateInfoBase.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/analysis/AggregateInfoBase.java b/fe/src/main/java/org/apache/impala/analysis/AggregateInfoBase.java
index d6ef084..8aa977f 100644
--- a/fe/src/main/java/org/apache/impala/analysis/AggregateInfoBase.java
+++ b/fe/src/main/java/org/apache/impala/analysis/AggregateInfoBase.java
@@ -133,7 +133,7 @@ public abstract class AggregateInfoBase {
         // it'll simply show up as a gratuitous HAVING predicate
         // (which would actually be incorrect if the constant happens to be NULL)
         if (!expr.isConstant()) {
-          analyzer.createAuxEquivPredicate(new SlotRef(slotDesc), expr.clone());
+          analyzer.createAuxEqPredicate(new SlotRef(slotDesc), expr.clone());
         }
       } else {
         Preconditions.checkArgument(expr instanceof FunctionCallExpr);

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/5e9b4e2f/fe/src/main/java/org/apache/impala/analysis/AnalyticExpr.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/analysis/AnalyticExpr.java b/fe/src/main/java/org/apache/impala/analysis/AnalyticExpr.java
index 28de6da..e9a4f1b 100644
--- a/fe/src/main/java/org/apache/impala/analysis/AnalyticExpr.java
+++ b/fe/src/main/java/org/apache/impala/analysis/AnalyticExpr.java
@@ -126,9 +126,9 @@ public class AnalyticExpr extends Expr {
   public AnalyticWindow getWindow() { return window_; }
 
   @Override
-  public boolean equals(Object obj) {
-    if (!super.equals(obj)) return false;
-    AnalyticExpr o = (AnalyticExpr)obj;
+  public boolean localEquals(Expr that) {
+    if (!super.localEquals(that)) return false;
+    AnalyticExpr o = (AnalyticExpr)that;
     if (!fnCall_.equals(o.getFnCall())) return false;
     if ((window_ == null) != (o.window_ == null)) return false;
     if (window_ != null) {

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/5e9b4e2f/fe/src/main/java/org/apache/impala/analysis/Analyzer.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/analysis/Analyzer.java b/fe/src/main/java/org/apache/impala/analysis/Analyzer.java
index df5cf97..0ecc958 100644
--- a/fe/src/main/java/org/apache/impala/analysis/Analyzer.java
+++ b/fe/src/main/java/org/apache/impala/analysis/Analyzer.java
@@ -75,9 +75,8 @@ import org.apache.impala.thrift.TLineageGraph;
 import org.apache.impala.thrift.TNetworkAddress;
 import org.apache.impala.thrift.TQueryCtx;
 import org.apache.impala.thrift.TQueryOptions;
-import org.apache.impala.util.DisjointSet;
-import org.apache.impala.util.ListMap;
-import org.apache.impala.util.TSessionStateUtil;
+import org.apache.impala.util.*;
+import org.apache.impala.util.Graph.*;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -100,13 +99,15 @@ import com.google.common.collect.Sets;
  * conjuncts are referenced by their id (ie, there are no containers other than the one
  * holding the referenced conjuncts), to make substitute() simple.
  *
- * Slot equivalence classes:
- * Equivalence of individual slots is computed based on registered equality predicates;
- * those predicates are either present directly in the query or are implied by the
- * syntactic elements used in query (example: a GROUP BY clause has implied equality
- * predicates between the grouping exprs and the grouping slots of the aggregation
- * output tuple).
- * Implied equality predicates are registered with createAuxEquivPredicate(); they are
+ * Slot value transfers:
+ * Slot A has a value transfer to slot B if a predicate on A can be applied to B at some
+ * point in the plan. Determining the lowest correct placement of that predicate is
+ * subject to the conventional assignment rules.
+ * Each slot is contained in exactly one equivalence class. A slot equivalence class is a
+ * set of slots where each pair of slots has a mutual value transfer. Equivalence classes
+ * are assigned an arbitrary id to distinguish them from another.
+ *
+ * Implied mutual value transfers are registered with createAuxEqPredicate(); they are
  * never assigned during plan generation.
  * Also tracks each catalog object access, so authorization checks can be performed once
  * analysis is complete.
@@ -286,22 +287,8 @@ public class Analyzer {
     // warnings are added which will not be displayed.
     public boolean warningsRetrieved = false;
 
-    public final IdGenerator<EquivalenceClassId> equivClassIdGenerator =
-        EquivalenceClassId.createGenerator();
-
-    // map from equivalence class id to the list of its member slots
-    private final Map<EquivalenceClassId, ArrayList<SlotId>> equivClassMembers =
-        Maps.newHashMap();
-
-    // map from slot id to its equivalence class id;
-    // only visible at the root Analyzer
-    private final Map<SlotId, EquivalenceClassId> equivClassBySlotId = Maps.newHashMap();
-
-    // map for each slot to the canonical slot of its equivalence class
-    private final ExprSubstitutionMap equivClassSmap = new ExprSubstitutionMap();
-
-    // represents the direct and transitive value transfers between slots
-    private ValueTransferGraph valueTransferGraph;
+    // The SCC-condensed graph representation of all slot value transfers.
+    private SccCondensedGraph valueTransferGraph;
 
     private final List<Pair<SlotId, SlotId>> registeredValueTransfers =
         Lists.newArrayList();
@@ -1169,13 +1156,13 @@ public class Analyzer {
   }
 
   /**
-   * Create and register an auxiliary predicate to express an equivalence between two
-   * exprs (BinaryPredicate with EQ); this predicate does not need to be assigned, but
-   * it's used for equivalence class computation.
-   * Does nothing if the lhs or rhs expr are NULL. Registering an equivalence with NULL
-   * would be incorrect, because <expr> = NULL is false (even NULL = NULL).
+   * Create and register an auxiliary predicate to express a mutual value transfer
+   * between two exprs (BinaryPredicate with EQ); this predicate does not need to be
+   * assigned, but it's used for value transfer computation.
+   * Does nothing if the lhs or rhs expr are NULL. Registering with NULL would be
+   * incorrect, because <expr> = NULL is false (even NULL = NULL).
    */
-  public void createAuxEquivPredicate(Expr lhs, Expr rhs) {
+  public void createAuxEqPredicate(Expr lhs, Expr rhs) {
     // Check the expr type as well as the class because  NullLiteral could have been
     // implicitly cast to a type different than NULL.
     if (lhs instanceof NullLiteral || rhs instanceof NullLiteral ||
@@ -1186,7 +1173,7 @@ public class Analyzer {
     BinaryPredicate p = new BinaryPredicate(BinaryPredicate.Operator.EQ, lhs, rhs);
     p.setIsAuxExpr();
     if (LOG.isTraceEnabled()) {
-      LOG.trace("register equiv predicate: " + p.toSql() + " " + p.debugString());
+      LOG.trace("register eq predicate: " + p.toSql() + " " + p.debugString());
     }
     registerConjunct(p);
   }
@@ -1491,8 +1478,8 @@ public class Analyzer {
 
   /**
    * Returns a list of predicates that are fully bound by destTid. Predicates are derived
-   * by replacing the slots of a source predicate with slots of the destTid, if for each
-   * source slot there is an equivalent slot in destTid.
+   * by replacing the slots of a source predicate with slots of the destTid, if every
+   * source slot has a value transfer to a slot in destTid.
    * In particular, the returned list contains predicates that must be evaluated
    * at a join node (bound to outer-joined tuple) but can also be safely evaluated by a
    * plan node materializing destTid. Such predicates are not marked as assigned.
@@ -1519,25 +1506,32 @@ public class Analyzer {
       // Generate slot-mappings to bind srcConjunct to destTid.
       TupleId srcTid = srcTids.get(0);
       List<List<SlotId>> allDestSids =
-          getEquivDestSlotIds(srcTid, srcSids, destTid, ignoreSlots);
+          getValueTransferDestSlotIds(srcTid, srcSids, destTid, ignoreSlots);
       if (allDestSids.isEmpty()) continue;
 
-      // Indicates whether the source slots have equivalent slots that belong
-      // to an outer-joined tuple.
+      // Indicates whether there is value transfer from the source slots to slots that
+      // belong to an outer-joined tuple.
       boolean hasOuterJoinedTuple = false;
       for (SlotId srcSid: srcSids) {
-        if (hasOuterJoinedTuple(globalState_.equivClassBySlotId.get(srcSid))) {
-          hasOuterJoinedTuple = true;
-          break;
+        for (SlotId dst : getValueTransferTargets(srcSid)) {
+          if (isOuterJoined(getTupleId(dst))) {
+            hasOuterJoinedTuple = true;
+            break;
+          }
         }
+        if (hasOuterJoinedTuple) break;
       }
 
       // It is incorrect to propagate predicates into a plan subtree that is on the
       // nullable side of an outer join if the predicate evaluates to true when all
-      // its referenced tuples are NULL. The check below is conservative because the
-      // outer-joined tuple making 'hasOuterJoinedTuple' true could be in a parent block
-      // of 'srcConjunct', in which case it is safe to propagate 'srcConjunct' within
-      // child blocks of the outer-joined parent block.
+      // its referenced tuples are NULL. For example:
+      // select * from (select A.a, B.b, B.col from A left join B on A.a=B.b) v
+      // where v.col is null
+      // In this query (v.col is null) should not be evaluated at the scanner of B.
+      // The check below is conservative because the outer-joined tuple making
+      // 'hasOuterJoinedTuple' true could be in a parent block of 'srcConjunct', in which
+      // case it is safe to propagate 'srcConjunct' within child blocks of the
+      // outer-joined parent block.
       // TODO: Make the check precise by considering the blocks (analyzers) where the
       // outer-joined tuples in the dest slot's equivalence classes appear
       // relative to 'srcConjunct'.
@@ -1635,10 +1629,10 @@ public class Analyzer {
   }
 
   /**
-   * For each equivalence class, adds/removes predicates from conjuncts such that it
+   * For each slot equivalence class, adds/removes predicates from conjuncts such that it
    * contains a minimum set of <lhsSlot> = <rhsSlot> predicates that establish the known
-   * equivalences between slots in lhsTids and rhsTids which must be disjoint.
-   * Preserves original conjuncts when possible. Assumes that predicates for establishing
+   * equivalences between slots in lhsTids and rhsTids which must be disjoint. Preserves
+   * original conjuncts when possible. Assumes that predicates for establishing
    * equivalences among slots in only lhsTids and only rhsTids have already been
    * established. This function adds the remaining predicates to "connect" the disjoint
    * equivalent slot sets of lhsTids and rhsTids.
@@ -1650,17 +1644,13 @@ public class Analyzer {
    * TODO: Consider caching the DisjointSet during plan generation instead of
    * re-creating it here on every invocation.
    */
-  public <T extends Expr> void createEquivConjuncts(List<TupleId> lhsTids,
-      List<TupleId> rhsTids, List<T> conjuncts) {
+  public void createEquivConjuncts(List<TupleId> lhsTids,
+      List<TupleId> rhsTids, List<BinaryPredicate> conjuncts) {
     Preconditions.checkState(Collections.disjoint(lhsTids, rhsTids));
-
-    // Equivalence classes only containing slots belonging to lhsTids.
-    Map<EquivalenceClassId, List<SlotId>> lhsEquivClasses =
-        getEquivClasses(lhsTids);
-
-    // Equivalence classes only containing slots belonging to rhsTids.
-    Map<EquivalenceClassId, List<SlotId>> rhsEquivClasses =
-        getEquivClasses(rhsTids);
+    // A map from equivalence class IDs to equivalence classes. The equivalence classes
+    // only contain slots in lhsTids/rhsTids.
+    Map<Integer, List<SlotId>> lhsEquivClasses = getEquivClassesOnTuples(lhsTids);
+    Map<Integer, List<SlotId>> rhsEquivClasses = getEquivClassesOnTuples(rhsTids);
 
     // Maps from a slot id to its set of equivalent slots. Used to track equivalences
     // that have been established by predicates assigned/generated to plan nodes
@@ -1682,15 +1672,15 @@ public class Analyzer {
 
     // Update partialEquivSlots based on equality predicates in 'conjuncts'. Removes
     // redundant conjuncts, unless they reference outer-joined slots (see below).
-    Iterator<T> conjunctIter = conjuncts.iterator();
+    Iterator<BinaryPredicate> conjunctIter = conjuncts.iterator();
     while (conjunctIter.hasNext()) {
       Expr conjunct = conjunctIter.next();
       Pair<SlotId, SlotId> eqSlots = BinaryPredicate.getEqSlots(conjunct);
       if (eqSlots == null) continue;
-      EquivalenceClassId firstEqClassId = getEquivClassId(eqSlots.first);
-      EquivalenceClassId secondEqClassId = getEquivClassId(eqSlots.second);
+      int firstEqClassId = getEquivClassId(eqSlots.first);
+      int secondEqClassId = getEquivClassId(eqSlots.second);
       // slots may not be in the same eq class due to outer joins
-      if (!firstEqClassId.equals(secondEqClassId)) continue;
+      if (firstEqClassId != secondEqClassId) continue;
 
       // Retain an otherwise redundant predicate if it references a slot of an
       // outer-joined tuple that is not already referenced by another join predicate
@@ -1722,7 +1712,7 @@ public class Analyzer {
 
     // For each equivalence class, construct a new predicate to 'connect' the disjoint
     // slot sets.
-    for (Map.Entry<EquivalenceClassId, List<SlotId>> rhsEquivClass:
+    for (Map.Entry<Integer, List<SlotId>> rhsEquivClass:
       rhsEquivClasses.entrySet()) {
       List<SlotId> lhsSlots = lhsEquivClasses.get(rhsEquivClass.getKey());
       if (lhsSlots == null) continue;
@@ -1733,19 +1723,16 @@ public class Analyzer {
       // Do not create a new predicate from slots that are full outer joined because that
       // predicate may be incorrectly assigned to a node below the associated full outer
       // join.
-      if (isFullOuterJoined(lhsSlots.get(0)) || isFullOuterJoined(rhsSlots.get(0))) {
-        continue;
+      if (!isFullOuterJoined(lhsSlots.get(0)) && !isFullOuterJoined(rhsSlots.get(0))) {
+        conjuncts.add(createInferredEqPred(lhsSlots.get(0), rhsSlots.get(0)));
       }
-      T newEqPred = (T) createInferredEqPred(lhsSlots.get(0), rhsSlots.get(0));
-      if (!hasMutualValueTransfer(lhsSlots.get(0), rhsSlots.get(0))) continue;
-      conjuncts.add(newEqPred);
     }
   }
 
   /**
-   * For each equivalence class, adds/removes predicates from conjuncts such that
-   * it contains a minimum set of <slot> = <slot> predicates that establish
-   * the known equivalences between slots belonging to tid. Preserves original
+   * For each slot equivalence class, adds/removes predicates from conjuncts such that it
+   * contains a minimum set of <slot> = <slot> predicates that establish the known
+   * equivalences between slots belonging to tid. Preserves original
    * conjuncts when possible.
    * The intent of this function is to enable construction of a minimum spanning tree
    * to cover the known slot equivalences. This function should be called to add
@@ -1772,10 +1759,10 @@ public class Analyzer {
       Expr conjunct = conjunctIter.next();
       Pair<SlotId, SlotId> eqSlots = BinaryPredicate.getEqSlots(conjunct);
       if (eqSlots == null) continue;
-      EquivalenceClassId firstEqClassId = getEquivClassId(eqSlots.first);
-      EquivalenceClassId secondEqClassId = getEquivClassId(eqSlots.second);
+      int firstEqClassId = getEquivClassId(eqSlots.first);
+      int secondEqClassId = getEquivClassId(eqSlots.second);
       // slots may not be in the same eq class due to outer joins
-      if (!firstEqClassId.equals(secondEqClassId)) continue;
+      if (firstEqClassId != secondEqClassId) continue;
       // update equivalences and remove redundant conjuncts
       if (!partialEquivSlots.union(eqSlots.first, eqSlots.second)) conjunctIter.remove();
     }
@@ -1795,9 +1782,9 @@ public class Analyzer {
 
     // These are the equivalences that need to be established by constructing conjuncts
     // to form a minimum spanning tree.
-    Map<EquivalenceClassId, List<SlotId>> targetEquivClasses =
-        getEquivClasses(Lists.newArrayList(tid));
-    for (Map.Entry<EquivalenceClassId, List<SlotId>> targetEquivClass:
+    Map<Integer, List<SlotId>> targetEquivClasses =
+        getEquivClassesOnTuples(Lists.newArrayList(tid));
+    for (Map.Entry<Integer, List<SlotId>> targetEquivClass:
       targetEquivClasses.entrySet()) {
       // Loop over all pairs of equivalent slots and merge their disjoint slots sets,
       // creating missing equality predicates as necessary.
@@ -1808,7 +1795,6 @@ public class Analyzer {
         for (int j = 0; j < i; ++j) {
           SlotId lhs = slotIds.get(j);
           if (!partialEquivSlots.union(lhs, rhs)) continue;
-          if (!hasMutualValueTransfer(lhs, rhs)) continue;
           conjuncts.add((T) createInferredEqPred(lhs, rhs));
           // Check for early termination.
           if (partialEquivSlots.get(lhs).size() == slotIds.size()) {
@@ -1826,20 +1812,22 @@ public class Analyzer {
   }
 
   /**
-   * Returns a map of partial equivalence classes that only contains slot ids belonging
-   * to the given tuple ids. Only contains equivalence classes with more than one member.
+   * Returns a map of slot equivalence classes on the set of slots in the given tuples.
+   * Only contains equivalence classes with more than one member.
    */
-  private Map<EquivalenceClassId, List<SlotId>> getEquivClasses(List<TupleId> tids) {
-    Map<EquivalenceClassId, List<SlotId>> result = Maps.newHashMap();
+  private Map<Integer, List<SlotId>> getEquivClassesOnTuples(List<TupleId> tids) {
+    Map<Integer, List<SlotId>> result = Maps.newHashMap();
+    SccCondensedGraph g = globalState_.valueTransferGraph;
     for (TupleId tid: tids) {
       for (SlotDescriptor slotDesc: getTupleDesc(tid).getSlots()) {
-        EquivalenceClassId eqClassId = getEquivClassId(slotDesc.getId());
+        if (slotDesc.getId().asInt() >= g.numVertices()) continue;
+        int sccId = g.sccId(slotDesc.getId().asInt());
         // Ignore equivalence classes that are empty or only have a single member.
-        if (globalState_.equivClassMembers.get(eqClassId).size() <= 1) continue;
-        List<SlotId> slotIds = result.get(eqClassId);
+        if (g.sccMembersBySccId(sccId).length <= 1) continue;
+        List<SlotId> slotIds = result.get(sccId);
         if (slotIds == null) {
           slotIds = Lists.newArrayList();
-          result.put(eqClassId, slotIds);
+          result.put(sccId, slotIds);
         }
         slotIds.add(slotDesc.getId());
       }
@@ -1849,13 +1837,13 @@ public class Analyzer {
 
   /**
    * Returns a list of slot mappings from srcTid to destTid for the purpose of predicate
-   * propagation. Each mapping assigns every slot in srcSids to an equivalent slot in
-   * destTid. Does not generate all possible mappings, but limits the results to
-   * useful and/or non-redundant mappings, i.e., those mappings that would improve
-   * the performance of query execution.
+   * propagation. Each mapping assigns every slot in srcSids to a slot in destTid which
+   * has a value transfer from srcSid. Does not generate all possible mappings, but limits
+   * the results to useful and/or non-redundant mappings, i.e., those mappings that would
+   * improve the performance of query execution.
    */
-  private List<List<SlotId>> getEquivDestSlotIds(TupleId srcTid, List<SlotId> srcSids,
-      TupleId destTid, Set<SlotId> ignoreSlots) {
+  private List<List<SlotId>> getValueTransferDestSlotIds(TupleId srcTid,
+      List<SlotId> srcSids, TupleId destTid, Set<SlotId> ignoreSlots) {
     List<List<SlotId>> allDestSids = Lists.newArrayList();
     TupleDescriptor destTupleDesc = getTupleDesc(destTid);
     if (srcSids.size() == 1) {
@@ -1902,18 +1890,6 @@ public class Analyzer {
   }
 
   /**
-   * Returns true if the equivalence class identified by 'eqClassId' contains
-   * a slot belonging to an outer-joined tuple.
-   */
-  private boolean hasOuterJoinedTuple(EquivalenceClassId eqClassId) {
-    ArrayList<SlotId> eqClass = globalState_.equivClassMembers.get(eqClassId);
-    for (SlotId s: eqClass) {
-      if (isOuterJoined(getTupleId(s))) return true;
-    }
-    return false;
-  }
-
-  /**
    * Returns true if 'p' evaluates to true when all its referenced slots are NULL,
    * false otherwise.
    * TODO: Can we avoid dealing with the exceptions thrown by analysis and eval?
@@ -1991,143 +1967,220 @@ public class Analyzer {
   }
 
   /**
-   * Populate globalState.valueTransfer based on the registered equi-join predicates
-   * of the form <slotref> = <slotref>.
+   * Compute the value transfer graph based on the registered value transfers and eq-join
+   * predicates.
    */
-  public void computeEquivClasses() {
-    globalState_.valueTransferGraph = new ValueTransferGraph();
-    globalState_.valueTransferGraph.computeValueTransfers();
-
+  public void computeValueTransferGraph() {
+    WritableGraph directValueTransferGraph =
+        new WritableGraph(globalState_.descTbl.getMaxSlotId().asInt() + 1);
+    constructValueTransfersFromEqPredicates(directValueTransferGraph);
+    for (Pair<SlotId, SlotId> p : globalState_.registeredValueTransfers) {
+      directValueTransferGraph.addEdge(p.first.asInt(), p.second.asInt());
+    }
+    globalState_.valueTransferGraph =
+        SccCondensedGraph.condensedReflexiveTransitiveClosure(directValueTransferGraph);
     // Validate the value-transfer graph in single-node planner tests.
     if (RuntimeEnv.INSTANCE.isTestEnv() && getQueryOptions().num_nodes == 1) {
-      Preconditions.checkState(validateValueTransferGraph(),
-          "Failed to validate value-transfer graph.");
-    }
-
-    // we start out by assigning each slot to its own equiv class
-    int numSlots = globalState_.descTbl.getMaxSlotId().asInt() + 1;
-    for (int i = 0; i < numSlots; ++i) {
-      EquivalenceClassId id = globalState_.equivClassIdGenerator.getNextId();
-      globalState_.equivClassMembers.put(id, Lists.newArrayList(new SlotId(i)));
-    }
-
-    // merge two classes if there is a value transfer between all members of the
-    // combined class; do this until there's nothing left to merge
-    boolean merged;
-    do {
-      merged = false;
-      for (Map.Entry<EquivalenceClassId, ArrayList<SlotId>> e1:
-          globalState_.equivClassMembers.entrySet()) {
-        for (Map.Entry<EquivalenceClassId, ArrayList<SlotId>> e2:
-            globalState_.equivClassMembers.entrySet()) {
-          if (e1.getKey() == e2.getKey()) continue;
-          List<SlotId> class1Members = e1.getValue();
-          if (class1Members.isEmpty()) continue;
-          List<SlotId> class2Members = e2.getValue();
-          if (class2Members.isEmpty()) continue;
-
-          // check whether we can transfer values between all members
-          boolean canMerge = true;
-          for (SlotId class1Slot: class1Members) {
-            for (SlotId class2Slot: class2Members) {
-              if (!hasValueTransfer(class1Slot, class2Slot)
-                  && !hasValueTransfer(class2Slot, class1Slot)) {
-                canMerge = false;
-                break;
-              }
-            }
-            if (!canMerge) break;
-          }
-          if (!canMerge) continue;
+      RandomAccessibleGraph reference =
+          directValueTransferGraph.toRandomAccessible().reflexiveTransitiveClosure();
+      if (!globalState_.valueTransferGraph.validate(reference)) {
+        String tc = reference.print();
+        String condensedTc = globalState_.valueTransferGraph.print();
+        throw new IllegalStateException("Condensed transitive closure doesn't equal to "
+            + "uncondensed transitive closure. Uncondensed Graph:\n" + tc +
+            "\nCondensed Graph:\n" + condensedTc);
+      }
+    }
+  }
 
-          // merge classes 1 and 2 by transfering 2 into 1
-          class1Members.addAll(class2Members);
-          class2Members.clear();
-          merged = true;
+  /**
+   * Add value-transfer edges to 'g' based on the registered equi-join conjuncts.
+   */
+  private void constructValueTransfersFromEqPredicates(WritableGraph g) {
+    for (ExprId id : globalState_.conjuncts.keySet()) {
+      Expr e = globalState_.conjuncts.get(id);
+      Pair<SlotId, SlotId> slotIds = BinaryPredicate.getEqSlots(e);
+      if (slotIds == null) continue;
+
+      TableRef sjTblRef = globalState_.sjClauseByConjunct.get(id);
+      Preconditions.checkState(sjTblRef == null || sjTblRef.getJoinOp().isSemiJoin());
+      boolean isAntiJoin = sjTblRef != null && sjTblRef.getJoinOp().isAntiJoin();
+
+      TableRef ojTblRef = globalState_.ojClauseByConjunct.get(id);
+      Preconditions.checkState(ojTblRef == null || ojTblRef.getJoinOp().isOuterJoin());
+      if (ojTblRef == null && !isAntiJoin) {
+        // this eq predicate doesn't involve any outer or anti join, ie, it is true for
+        // each result row;
+        // value transfer is not legal if the receiving slot is in an enclosed
+        // scope of the source slot and the receiving slot's block has a limit
+        Analyzer firstBlock = globalState_.blockBySlot.get(slotIds.first);
+        Analyzer secondBlock = globalState_.blockBySlot.get(slotIds.second);
+        if (LOG.isTraceEnabled()) {
+          LOG.trace("value transfer: from " + slotIds.first.toString());
         }
+        if (!(secondBlock.hasLimitOffsetClause_ &&
+            secondBlock.ancestors_.contains(firstBlock))) {
+          g.addEdge(slotIds.first.asInt(), slotIds.second.asInt());
+        }
+        if (!(firstBlock.hasLimitOffsetClause_ &&
+            firstBlock.ancestors_.contains(secondBlock))) {
+          g.addEdge(slotIds.second.asInt(), slotIds.first.asInt());
+        }
+        continue;
       }
-    } while (merged);
-
-    // populate equivClassSmap
-    for (EquivalenceClassId id: globalState_.equivClassMembers.keySet()) {
-      List<SlotId> members = globalState_.equivClassMembers.get(id);
-      if (members.isEmpty()) continue;
-      SlotDescriptor canonicalSlotDesc =
-          globalState_.descTbl.getSlotDesc(members.get(0));
-      for (SlotId slotId: globalState_.equivClassMembers.get(id)) {
-        SlotDescriptor slotDesc = globalState_.descTbl.getSlotDesc(slotId);
-        globalState_.equivClassSmap.put(
-            new SlotRef(slotDesc), new SlotRef(canonicalSlotDesc));
+      // Outer or semi-joined table ref.
+      TableRef tblRef = (ojTblRef != null) ? ojTblRef : sjTblRef;
+      Preconditions.checkNotNull(tblRef);
+
+      if (tblRef.getJoinOp() == JoinOperator.FULL_OUTER_JOIN) {
+        // full outer joins don't guarantee any value transfer
+        continue;
       }
-    }
 
-    // populate equivClassBySlotId
-    for (EquivalenceClassId id: globalState_.equivClassMembers.keySet()) {
-      for (SlotId slotId: globalState_.equivClassMembers.get(id)) {
-        globalState_.equivClassBySlotId.put(slotId, id);
+      // this is some form of outer or anti join
+      SlotId outerSlot, innerSlot;
+      if (tblRef.getId() == getTupleId(slotIds.first)) {
+        innerSlot = slotIds.first;
+        outerSlot = slotIds.second;
+      } else if (tblRef.getId() == getTupleId(slotIds.second)) {
+        innerSlot = slotIds.second;
+        outerSlot = slotIds.first;
+      } else {
+        // this eq predicate is part of an OJ/AJ clause but doesn't reference
+        // the joined table -> ignore this, we can't reason about when it'll
+        // actually be true
+        continue;
+      }
+      // value transfer is always legal because the outer and inner slot must come from
+      // the same block; transitive value transfers into inline views with a limit are
+      // prevented because the inline view's aux predicates won't transfer values into
+      // the inline view's block (handled in the non-outer/anti-join case above)
+      // TODO: We could propagate predicates into anti-joined plan subtrees by
+      // inverting the condition (paying special attention to NULLs).
+      if (tblRef.getJoinOp() == JoinOperator.LEFT_OUTER_JOIN
+          || tblRef.getJoinOp() == JoinOperator.LEFT_ANTI_JOIN
+          || tblRef.getJoinOp() == JoinOperator.NULL_AWARE_LEFT_ANTI_JOIN) {
+        g.addEdge(outerSlot.asInt(), innerSlot.asInt());
+      } else if (tblRef.getJoinOp() == JoinOperator.RIGHT_OUTER_JOIN
+          || tblRef.getJoinOp() == JoinOperator.RIGHT_ANTI_JOIN) {
+        g.addEdge(innerSlot.asInt(), outerSlot.asInt());
       }
     }
   }
 
+
   /**
-   * Returns the ids of slots that are in the same equivalence class as slotId
-   * and are part of a tuple in tupleIds.
+   * Returns the equivalence class of the given slot id.
+   * Time complexity: O(V) where V = number of slots
    */
-  public List<SlotId> getEquivSlots(SlotId slotId, List<TupleId> tupleIds) {
-    List<SlotId> result = Lists.newArrayList();
-    EquivalenceClassId classId = globalState_.equivClassBySlotId.get(slotId);
-    for (SlotId memberId: globalState_.equivClassMembers.get(classId)) {
-      if (tupleIds.contains(
-          globalState_.descTbl.getSlotDesc(memberId).getParent().getId())) {
-        result.add(memberId);
-      }
+  public List<SlotId> getEquivClass(SlotId sid) {
+    SccCondensedGraph g = globalState_.valueTransferGraph;
+    if (sid.asInt() >= g.numVertices()) return Collections.singletonList(sid);
+    ArrayList<SlotId> result = new ArrayList<>();
+    for (int dst: g.sccMembersByVid(sid.asInt())) {
+      result.add(new SlotId(dst));
     }
     return result;
   }
 
   /**
-   * Returns the ids of all the slots for which there is a value transfer from 'srcSid'.
+   * Returns sorted slot IDs with value transfers from 'srcSid'.
+   * Time complexity: O(V) where V = number of slots
    */
   public List<SlotId> getValueTransferTargets(SlotId srcSid) {
-    List<SlotId> result = Lists.newArrayList();
-    int maxSlotId = globalState_.valueTransferGraph.getNumSlots();
-    if (srcSid.asInt() >= maxSlotId) return result;
-    for (SlotDescriptor slot: globalState_.descTbl.getSlotDescs()) {
-      SlotId targetSid = slot.getId();
-      if (targetSid.asInt() >= maxSlotId) continue;
-      if (hasValueTransfer(srcSid, targetSid)) result.add(targetSid);
-    }
+    SccCondensedGraph g = globalState_.valueTransferGraph;
+    if (srcSid.asInt() >= g.numVertices()) return Collections.singletonList(srcSid);
+    ArrayList<SlotId> result = new ArrayList<>();
+    for (IntIterator dstIt = g.dstIter(srcSid.asInt()); dstIt.hasNext(); dstIt.next()) {
+      result.add(new SlotId(dstIt.peek()));
+    }
+    // Unsorted result drastically changes the runtime filter assignment and results in
+    // worse plan.
+    // TODO: Investigate the call sites and remove this sort.
+    Collections.sort(result);
     return result;
   }
 
-  public EquivalenceClassId getEquivClassId(SlotId slotId) {
-    return globalState_.equivClassBySlotId.get(slotId);
+  /** Get the id of the equivalence class of the given slot. */
+  private int getEquivClassId(SlotId sid) {
+    SccCondensedGraph g = globalState_.valueTransferGraph;
+    return sid.asInt() >= g.numVertices() ?
+        sid.asInt() : g.sccId(sid.asInt());
   }
 
-  public ExprSubstitutionMap getEquivClassSmap() { return globalState_.equivClassSmap; }
+  /**
+   * Returns whether there is a value transfer between two SlotRefs.
+   * It's used for {@link Expr#matches(Expr, SlotRef.Comparator)}.
+   */
+  private final SlotRef.Comparator VALUE_TRANSFER_SLOTREF_CMP = new SlotRef.Comparator() {
+      @Override
+      public boolean matches(SlotRef a, SlotRef b) {
+        return hasValueTransfer(a.getSlotId(), b.getSlotId());
+      }
+    };
+
+  /**
+   * Returns whether there is a mutual value transfer between two SlotRefs.
+   * It's used for {@link Expr#matches(Expr, SlotRef.Comparator)}.
+   */
+  private final SlotRef.Comparator MUTUAL_VALUE_TRANSFER_SLOTREF_CMP =
+      new SlotRef.Comparator() {
+        @Override
+        public boolean matches(SlotRef a, SlotRef b) {
+          return hasMutualValueTransfer(a.getSlotId(), b.getSlotId());
+        }
+      };
 
   /**
-   * Return true if l1 is equivalent to l2 when both lists are interpreted as sets.
-   * For this check, each SlotRefs in both l1 and l2 is replaced with its canonical
-   * equivalence-class representative, and then duplicate exprs are removed.
+   * Returns true if e1 has a (mutual) value transfer to e2. An expr e1 has a value
+   * transfer to e2 if the tree structures of the two exprs are the same, ignoring
+   * implicit casts, and for each pair of corresponding slots there is a value transfer
+   * from the slot in e1 to the slot in e2.
    */
-  public boolean equivSets(List<Expr> l1, List<Expr> l2) {
-    List<Expr> substL1 =
-        Expr.substituteList(l1, globalState_.equivClassSmap, this, false);
-    Expr.removeDuplicates(substL1);
-    List<Expr> substL2 =
-        Expr.substituteList(l2, globalState_.equivClassSmap, this, false);
-    Expr.removeDuplicates(substL2);
-    return Expr.equalSets(substL1, substL2);
+  public boolean exprsHaveValueTransfer(Expr e1, Expr e2, boolean mutual) {
+    return e1.matches(e2, mutual ?
+        MUTUAL_VALUE_TRANSFER_SLOTREF_CMP : VALUE_TRANSFER_SLOTREF_CMP);
   }
 
   /**
-   * Returns true if e1 and e2 are equivalent expressions.
+   * Return true if two sets of exprs have (mutual) value transfer. Set l1 has value
+   * transfer to set l2 if there is a 1-to-1 value transfer between exprs in l1 and l2.
    */
-  public boolean equivExprs(Expr e1, Expr e2) {
-    Expr substE1 = e1.substitute(globalState_.equivClassSmap, this, false);
-    Expr substE2 = e2.substitute(globalState_.equivClassSmap, this, false);
-    return substE1.equals(substE2);
+  public boolean setsHaveValueTransfer(List<Expr> l1, List<Expr> l2, boolean mutual) {
+    l1 = Expr.removeDuplicates(l1, MUTUAL_VALUE_TRANSFER_SLOTREF_CMP);
+    l2 = Expr.removeDuplicates(l2, MUTUAL_VALUE_TRANSFER_SLOTREF_CMP);
+    if (l1.size() != l2.size()) return false;
+    for (Expr e2 : l2) {
+      boolean foundInL1 = false;
+      for (Expr e1 : l1) {
+        if (e1.matches(e2, mutual ?
+            MUTUAL_VALUE_TRANSFER_SLOTREF_CMP : VALUE_TRANSFER_SLOTREF_CMP)) {
+          foundInL1 = true;
+          break;
+        }
+      }
+      if (!foundInL1) return false;
+    }
+    return true;
+  }
+
+  /**
+   * Compute the intersection of l1 and l2. Two exprs are considered identical if they
+   * have mutual value transfer. Return the intersecting l1 elements in i1 and the
+   * intersecting l2 elements in i2.
+   */
+  public void exprIntersect(List<Expr> l1, List<Expr> l2, List<Expr> i1, List<Expr> i2) {
+    i1.clear();
+    i2.clear();
+    for (Expr e1 : l1) {
+      for (Expr e2 : l2) {
+        if (e1.matches(e2, MUTUAL_VALUE_TRANSFER_SLOTREF_CMP)) {
+          i1.add(e1);
+          i2.add(e2);
+          break;
+        }
+      }
+    }
   }
 
   /**
@@ -2497,34 +2550,14 @@ public class Analyzer {
   public int decrementCallDepth() { return --callDepth_; }
   public int getCallDepth() { return callDepth_; }
 
-  public boolean hasMutualValueTransfer(SlotId slotA, SlotId slotB) {
-    return hasValueTransfer(slotA, slotB) && hasValueTransfer(slotB, slotA);
+  public boolean hasMutualValueTransfer(SlotId a, SlotId b) {
+    return hasValueTransfer(a, b) && hasValueTransfer(b, a);
   }
 
   public boolean hasValueTransfer(SlotId a, SlotId b) {
-    return globalState_.valueTransferGraph.hasValueTransfer(a, b);
-  }
-
-  public boolean validateValueTransferGraph() {
-    StringBuilder actual = new StringBuilder();
-    StringBuilder expected = new StringBuilder();
-    boolean res = globalState_.valueTransferGraph.validate(actual, expected);
-    return res;
-  }
-
-  /**
-   * Assign all remaining unassigned slots to their own equivalence classes.
-   */
-  public void createIdentityEquivClasses() {
-    int numSlots = globalState_.descTbl.getMaxSlotId().asInt() + 1;
-    for (int i = 0; i < numSlots; ++i) {
-      SlotId slotId = new SlotId(i);
-      if (globalState_.equivClassBySlotId.get(slotId) == null) {
-        EquivalenceClassId classId = globalState_.equivClassIdGenerator.getNextId();
-        globalState_.equivClassMembers.put(classId, Lists.newArrayList(slotId));
-        globalState_.equivClassBySlotId.put(slotId, classId);
-      }
-    }
+    SccCondensedGraph g = globalState_.valueTransferGraph;
+    return a.equals(b) || (a.asInt() < g.numVertices() && b.asInt() < g.numVertices()
+        && g.hasEdge(a.asInt(), b.asInt()));
   }
 
   public Map<String, View> getLocalViews() { return localViews_; }
@@ -2578,365 +2611,4 @@ public class Analyzer {
         .onTable(tableName.getDb(), tableName.getTbl())
         .allOf(priv).toRequest());
   }
-
-  /**
-   * Efficiently computes and stores the transitive closure of the value transfer graph
-   * of slots. After calling computeValueTransfers(), value transfers between slots can
-   * be queried via hasValueTransfer(). Value transfers can be uni-directional due to
-   * outer joins, inline views with a limit, or unions.
-   */
-  private class ValueTransferGraph {
-    // Represents all bi-directional value transfers. Each disjoint set is a complete
-    // subgraph of value transfers. Maps from a slot id to its set of slots with mutual
-    // value transfers (in the original slot domain). Since the value transfer graph is
-    // a DAG these disjoint sets represent all the strongly connected components.
-    private final DisjointSet<SlotId> completeSubGraphs_ = new DisjointSet<SlotId>();
-
-    // Maps each slot id in the original slot domain to a slot id in the new slot domain
-    // created by coalescing complete subgraphs into a single slot, and retaining only
-    // slots that have value transfers.
-    // Used for representing a condensed value-transfer graph with dense slot ids.
-    private int[] coalescedSlots_;
-
-    // Used for generating slot ids in the new slot domain.
-    private int nextCoalescedSlotId_ = 0;
-
-    // Condensed DAG of value transfers in the new slot domain.
-    private boolean[][] valueTransfer_;
-
-    // Number of slots registered at the time when the value transfer graph was
-    // computed.
-    private final int numSlots_ = globalState_.descTbl.getMaxSlotId().asInt() + 1;
-
-    public int getNumSlots() { return numSlots_; }
-
-    /**
-     * Computes all direct and transitive value transfers based on the registered
-     * conjuncts of the form <slotref> = <slotref>. The high-level steps are:
-     * 1. Identify complete subgraphs based on bi-directional value transfers, and
-     *    coalesce the slots of each complete subgraph into a single slot.
-     * 2. Map the remaining uni-directional value transfers into the new slot domain.
-     * 3. Identify the connected components of the uni-directional value transfers.
-     *    This step partitions the value transfers into disjoint sets.
-     * 4. Compute the transitive closure of each partition from (3) in the new slot
-     *    domain separately. Hopefully, the partitions are small enough to afford
-     *    the O(N^3) complexity of the Floyd-Warshall transitive closure computation.
-     * The condensed graph is not transformed back into the original slot domain because
-     * of the potential performance penalty. Instead, hasValueTransfer() consults
-     * coalescedSlots_, valueTransfer_, and completeSubGraphs_ which can together
-     * determine any value transfer in the original slot domain in constant time.
-     */
-    public void computeValueTransfers() {
-      long start = System.currentTimeMillis();
-
-      // Step1: Compute complete subgraphs and get uni-directional value transfers.
-      List<Pair<SlotId, SlotId>> origValueTransfers = Lists.newArrayList();
-      partitionValueTransfers(completeSubGraphs_, origValueTransfers);
-
-      // Coalesce complete subgraphs into a single slot and assign new slot ids.
-      coalescedSlots_ = new int[numSlots_];
-      Arrays.fill(coalescedSlots_, -1);
-      for (Set<SlotId> equivClass: completeSubGraphs_.getSets()) {
-        int representative = nextCoalescedSlotId_;
-        for (SlotId slotId: equivClass) {
-          coalescedSlots_[slotId.asInt()] = representative;
-        }
-        ++nextCoalescedSlotId_;
-      }
-
-      // Step 2: Map uni-directional value transfers onto the new slot domain, and
-      // store the connected components in graphPartitions.
-      List<Pair<Integer, Integer>> coalescedValueTransfers = Lists.newArrayList();
-      // A graph partition is a set of slot ids that are connected by uni-directional
-      // value transfers. The graph corresponding to a graph partition is a DAG.
-      DisjointSet<Integer> graphPartitions = new DisjointSet<Integer>();
-      mapSlots(origValueTransfers, coalescedValueTransfers, graphPartitions);
-      mapSlots(globalState_.registeredValueTransfers, coalescedValueTransfers,
-          graphPartitions);
-
-      // Step 3: Group the coalesced value transfers by the graph partition they
-      // belong to. Maps from the graph partition to its list of value transfers.
-      // TODO: Implement a specialized DisjointSet data structure to avoid this step.
-      Map<Set<Integer>, List<Pair<Integer, Integer>>> partitionedValueTransfers =
-          Maps.newHashMap();
-      for (Pair<Integer, Integer> vt: coalescedValueTransfers) {
-        Set<Integer> partition = graphPartitions.get(vt.first.intValue());
-        List<Pair<Integer, Integer>> l = partitionedValueTransfers.get(partition);
-        if (l == null) {
-          l = Lists.newArrayList();
-          partitionedValueTransfers.put(partition, l);
-        }
-        l.add(vt);
-      }
-
-      // Initialize the value transfer graph.
-      int numCoalescedSlots = nextCoalescedSlotId_ + 1;
-      valueTransfer_ = new boolean[numCoalescedSlots][numCoalescedSlots];
-      for (int i = 0; i < numCoalescedSlots; ++i) {
-        valueTransfer_[i][i] = true;
-      }
-
-      // Step 4: Compute the transitive closure for each graph partition.
-      for (Map.Entry<Set<Integer>, List<Pair<Integer, Integer>>> graphPartition:
-        partitionedValueTransfers.entrySet()) {
-        // Set value transfers of this partition.
-        for (Pair<Integer, Integer> vt: graphPartition.getValue()) {
-          valueTransfer_[vt.first][vt.second] = true;
-        }
-        Set<Integer> partitionSlotIds = graphPartition.getKey();
-        // No transitive value transfers.
-        if (partitionSlotIds.size() <= 2) continue;
-
-        // Indirection vector into valueTransfer_. Contains one entry for each distinct
-        // slot id referenced in a value transfer of this partition.
-        int[] p = new int[partitionSlotIds.size()];
-        int numPartitionSlots = 0;
-        for (Integer slotId: partitionSlotIds) {
-          p[numPartitionSlots++] = slotId;
-        }
-        // Compute the transitive closure of this graph partition.
-        for (int j = 0; j < numPartitionSlots; ++j) {
-          for (int i = 0; i < numPartitionSlots; ++i) {
-            // Our graphs are typically sparse so this filters out a lot of iterations.
-            if (!valueTransfer_[p[i]][p[j]]) continue;
-            for (int k = 0; k < numPartitionSlots; ++k) {
-              valueTransfer_[p[i]][p[k]] |= valueTransfer_[p[j]][p[k]];
-            }
-          }
-        }
-      }
-
-      long end = System.currentTimeMillis();
-      if (LOG.isDebugEnabled()) {
-        LOG.trace("Time taken in computeValueTransfers(): " + (end - start) + "ms");
-      }
-    }
-
-    /**
-     * Bulk updates the value transfer graph based on the given list of new mutual value
-     * transfers. The first element of each pair must be an existing slot id already in
-     * the value transfer graph and the second element must be a new slot id not yet in
-     * the value transfer graph. In particular, this requirement means that no new
-     * complete subgraphs may be introduced by the new slots.
-     */
-    public void bulkUpdate(List<Pair<SlotId, SlotId>> mutualValueTransfers) {
-      // Requires an existing value transfer graph.
-      Preconditions.checkState(valueTransfer_ != null);
-      int oldNumSlots = coalescedSlots_.length;
-      int maxNumSlots = globalState_.descTbl.getMaxSlotId().asInt() + 1;
-      // Expand the coalesced slots to the new maximum number of slots,
-      // and initalize the new entries with -1.
-      coalescedSlots_ = Arrays.copyOf(coalescedSlots_, maxNumSlots);
-      Arrays.fill(coalescedSlots_, oldNumSlots, maxNumSlots, -1);
-      for (Pair<SlotId, SlotId> valTrans: mutualValueTransfers) {
-        SlotId existingSid = valTrans.first;
-        SlotId newSid = valTrans.second;
-        // New slot id must not already be registered in the value transfer graph.
-        Preconditions.checkState(completeSubGraphs_.get(newSid) == null);
-        Preconditions.checkState(coalescedSlots_[newSid.asInt()] == -1);
-        completeSubGraphs_.union(existingSid, newSid);
-        coalescedSlots_[newSid.asInt()] = coalescedSlots_[existingSid.asInt()];
-      }
-    }
-
-    /**
-     * Returns true if slotA always has the same value as slotB or the tuple
-     * containing slotB is NULL.
-     */
-    public boolean hasValueTransfer(SlotId slotA, SlotId slotB) {
-      if (slotA.equals(slotB)) return true;
-      int mappedSrcId = coalescedSlots_[slotA.asInt()];
-      int mappedDestId = coalescedSlots_[slotB.asInt()];
-      if (mappedSrcId == -1 || mappedDestId == -1) return false;
-      if (valueTransfer_[mappedSrcId][mappedDestId]) return true;
-      Set<SlotId> eqSlots = completeSubGraphs_.get(slotA);
-      if (eqSlots == null) return false;
-      return eqSlots.contains(slotB);
-    }
-
-    /**
-     * Maps the slots of the given origValueTransfers to the coalescedSlots_. For most
-     * queries the uni-directional value transfers only reference a fraction of the
-     * original slots, so we assign new slot ids as necessary to make them dense. Returns
-     * the new list of value transfers in coalescedValueTransfers. Also adds each new
-     * value transfer into the given graphPartitions (via union() on the slots).
-     */
-    private void mapSlots(List<Pair<SlotId, SlotId>> origValueTransfers,
-        List<Pair<Integer, Integer>> coalescedValueTransfers,
-        DisjointSet<Integer> graphPartitions) {
-      for (Pair<SlotId, SlotId> vt: origValueTransfers) {
-        int src = coalescedSlots_[vt.first.asInt()];
-        if (src == -1) {
-          src = nextCoalescedSlotId_;
-          coalescedSlots_[vt.first.asInt()] = nextCoalescedSlotId_;
-          ++nextCoalescedSlotId_;
-        }
-        int dest = coalescedSlots_[vt.second.asInt()];
-        if (dest == -1) {
-          dest = nextCoalescedSlotId_;
-          coalescedSlots_[vt.second.asInt()] = nextCoalescedSlotId_;
-          ++nextCoalescedSlotId_;
-        }
-        coalescedValueTransfers.add(
-            new Pair<Integer, Integer>(Integer.valueOf(src), Integer.valueOf(dest)));
-        graphPartitions.union(Integer.valueOf(src), Integer.valueOf(dest));
-      }
-    }
-
-    /**
-     * Transforms the registered equality predicates of the form <slotref> = <slotref>
-     * into disjoint sets of slots with mutual value transfers (completeSubGraphs),
-     * and a list of remaining uni-directional value transfers (valueTransfers).
-     * Both completeSubGraphs and valueTransfers use the original slot ids.
-     *
-     * For debugging: If completeSubGraphs is null, adds all value transfers including
-     * bi-directional ones into valueTransfers.
-     */
-    private void partitionValueTransfers(DisjointSet<SlotId> completeSubGraphs,
-        List<Pair<SlotId, SlotId>> valueTransfers) {
-      // transform equality predicates into a transfer graph
-      for (ExprId id: globalState_.conjuncts.keySet()) {
-        Expr e = globalState_.conjuncts.get(id);
-        Pair<SlotId, SlotId> slotIds = BinaryPredicate.getEqSlots(e);
-        if (slotIds == null) continue;
-
-        boolean isAntiJoin = false;
-        TableRef sjTblRef = globalState_.sjClauseByConjunct.get(id);
-        Preconditions.checkState(sjTblRef == null || sjTblRef.getJoinOp().isSemiJoin());
-        isAntiJoin = sjTblRef != null && sjTblRef.getJoinOp().isAntiJoin();
-
-        TableRef ojTblRef = globalState_.ojClauseByConjunct.get(id);
-        Preconditions.checkState(ojTblRef == null || ojTblRef.getJoinOp().isOuterJoin());
-        if (ojTblRef == null && !isAntiJoin) {
-          // this eq predicate doesn't involve any outer or anti join, ie, it is true for
-          // each result row;
-          // value transfer is not legal if the receiving slot is in an enclosed
-          // scope of the source slot and the receiving slot's block has a limit
-          Analyzer firstBlock = globalState_.blockBySlot.get(slotIds.first);
-          Analyzer secondBlock = globalState_.blockBySlot.get(slotIds.second);
-          if (LOG.isTraceEnabled()) {
-            LOG.trace("value transfer: from " + slotIds.first.toString());
-          }
-          Pair<SlotId, SlotId> firstToSecond = null;
-          Pair<SlotId, SlotId> secondToFirst = null;
-          if (!(secondBlock.hasLimitOffsetClause_ &&
-              secondBlock.ancestors_.contains(firstBlock))) {
-            firstToSecond = new Pair<SlotId, SlotId>(slotIds.first, slotIds.second);
-          }
-          if (!(firstBlock.hasLimitOffsetClause_ &&
-              firstBlock.ancestors_.contains(secondBlock))) {
-            secondToFirst = new Pair<SlotId, SlotId>(slotIds.second, slotIds.first);
-          }
-          // Add bi-directional value transfers to the completeSubGraphs, or
-          // uni-directional value transfers to valueTransfers.
-          if (firstToSecond != null && secondToFirst != null
-              && completeSubGraphs != null) {
-            completeSubGraphs.union(slotIds.first, slotIds.second);
-          } else {
-            if (firstToSecond != null) valueTransfers.add(firstToSecond);
-            if (secondToFirst != null) valueTransfers.add(secondToFirst);
-          }
-          continue;
-        }
-        // Outer or semi-joined table ref.
-        TableRef tblRef = (ojTblRef != null) ? ojTblRef : sjTblRef;
-        Preconditions.checkNotNull(tblRef);
-
-        if (tblRef.getJoinOp() == JoinOperator.FULL_OUTER_JOIN) {
-          // full outer joins don't guarantee any value transfer
-          continue;
-        }
-
-        // this is some form of outer or anti join
-        SlotId outerSlot, innerSlot;
-        if (tblRef.getId() == getTupleId(slotIds.first)) {
-          innerSlot = slotIds.first;
-          outerSlot = slotIds.second;
-        } else if (tblRef.getId() == getTupleId(slotIds.second)) {
-          innerSlot = slotIds.second;
-          outerSlot = slotIds.first;
-        } else {
-          // this eq predicate is part of an OJ/AJ clause but doesn't reference
-          // the joined table -> ignore this, we can't reason about when it'll
-          // actually be true
-          continue;
-        }
-
-        // value transfer is always legal because the outer and inner slot must come from
-        // the same block; transitive value transfers into inline views with a limit are
-        // prevented because the inline view's aux predicates won't transfer values into
-        // the inline view's block (handled in the 'tableRef == null' case above)
-        // TODO: We could propagate predicates into anti-joined plan subtrees by
-        // inverting the condition (paying special attention to NULLs).
-        if (tblRef.getJoinOp() == JoinOperator.LEFT_OUTER_JOIN
-            || tblRef.getJoinOp() == JoinOperator.LEFT_ANTI_JOIN
-            || tblRef.getJoinOp() == JoinOperator.NULL_AWARE_LEFT_ANTI_JOIN) {
-          valueTransfers.add(new Pair<SlotId, SlotId>(outerSlot, innerSlot));
-        } else if (tblRef.getJoinOp() == JoinOperator.RIGHT_OUTER_JOIN
-            || tblRef.getJoinOp() == JoinOperator.RIGHT_ANTI_JOIN) {
-          valueTransfers.add(new Pair<SlotId, SlotId>(innerSlot, outerSlot));
-        }
-      }
-    }
-
-    /**
-     * Debug utility to validate the correctness of the value transfer graph using a
-     * brute-force transitive closure algorithm.
-     * Returns true if this value transfer graph is identical to one computed with
-     * the brute-force method, false otherwise.
-     * Writes string representations of the expected and actual value transfer
-     * matrices into expected and actual, respectively.
-     */
-    public boolean validate(StringBuilder expected, StringBuilder actual) {
-      Preconditions.checkState(expected.length() == 0 && actual.length() == 0);
-      int numSlots = globalState_.descTbl.getMaxSlotId().asInt() + 1;
-      boolean[][] expectedValueTransfer = new boolean[numSlots][numSlots];
-      for (int i = 0; i < numSlots; ++i) {
-        expectedValueTransfer[i][i] = true;
-      }
-
-      // Initialize expectedValueTransfer with the value transfers from conjuncts_.
-      List<Pair<SlotId, SlotId>> valueTransfers = Lists.newArrayList();
-      partitionValueTransfers(null, valueTransfers);
-      for (Pair<SlotId, SlotId> vt: valueTransfers) {
-        expectedValueTransfer[vt.first.asInt()][vt.second.asInt()] = true;
-      }
-      // Set registered value transfers in expectedValueTransfer.
-      for (Pair<SlotId, SlotId> vt: globalState_.registeredValueTransfers) {
-        expectedValueTransfer[vt.first.asInt()][vt.second.asInt()] = true;
-      }
-
-      // Compute the expected transitive closure.
-      boolean changed = false;
-      do {
-        changed = false;
-        for (int i = 0; i < numSlots; ++i) {
-          for (int j = 0; j < numSlots; ++j) {
-            for (int k = 0; k < numSlots; ++k) {
-              if (expectedValueTransfer[i][j] && expectedValueTransfer[j][k]
-                  && !expectedValueTransfer[i][k]) {
-                expectedValueTransfer[i][k] = true;
-                changed = true;
-              }
-            }
-          }
-        }
-      } while (changed);
-
-      // Populate actual value transfer graph.
-      boolean[][] actualValueTransfer = new boolean[numSlots][numSlots];
-      for (int i = 0; i < numSlots; ++i) {
-        for (int j = 0; j < numSlots; ++j) {
-          actualValueTransfer[i][j] = hasValueTransfer(new SlotId(i), new SlotId(j));
-        }
-      }
-
-      // Print matrices and string-compare them.
-      PrintUtils.printMatrix(expectedValueTransfer, 3, expected);
-      PrintUtils.printMatrix(actualValueTransfer, 3, actual);
-      String expectedStr = expected.toString();
-      String actualStr = actual.toString();
-      return expectedStr.equals(actualStr);
-    }
-  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/5e9b4e2f/fe/src/main/java/org/apache/impala/analysis/BetweenPredicate.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/analysis/BetweenPredicate.java b/fe/src/main/java/org/apache/impala/analysis/BetweenPredicate.java
index 4cedeac..269217c 100644
--- a/fe/src/main/java/org/apache/impala/analysis/BetweenPredicate.java
+++ b/fe/src/main/java/org/apache/impala/analysis/BetweenPredicate.java
@@ -76,9 +76,9 @@ public class BetweenPredicate extends Predicate {
   }
 
   @Override
-  public boolean equals(Object obj) {
-    if (!super.equals(obj)) return false;
-    return isNotBetween_ == ((BetweenPredicate)obj).isNotBetween_;
+  public boolean localEquals(Expr that) {
+    return super.localEquals(that) &&
+        isNotBetween_ == ((BetweenPredicate)that).isNotBetween_;
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/5e9b4e2f/fe/src/main/java/org/apache/impala/analysis/BinaryPredicate.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/analysis/BinaryPredicate.java b/fe/src/main/java/org/apache/impala/analysis/BinaryPredicate.java
index afdfaa6..444c003 100644
--- a/fe/src/main/java/org/apache/impala/analysis/BinaryPredicate.java
+++ b/fe/src/main/java/org/apache/impala/analysis/BinaryPredicate.java
@@ -31,13 +31,13 @@ import org.apache.impala.common.Reference;
 import org.apache.impala.extdatasource.thrift.TComparisonOp;
 import org.apache.impala.thrift.TExprNode;
 import org.apache.impala.thrift.TExprNodeType;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import com.google.common.base.Objects;
 import com.google.common.base.Preconditions;
 import com.google.common.base.Predicates;
 import com.google.common.collect.Lists;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * Most predicates with two operands.
@@ -333,10 +333,8 @@ public class BinaryPredicate extends Predicate {
   }
 
   @Override
-  public boolean equals(Object obj) {
-    if (!super.equals(obj)) return false;
-    BinaryPredicate other = (BinaryPredicate) obj;
-    return op_.equals(other.op_);
+  public boolean localEquals(Expr that) {
+    return super.localEquals(that) && op_.equals(((BinaryPredicate)that).op_);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/5e9b4e2f/fe/src/main/java/org/apache/impala/analysis/BoolLiteral.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/analysis/BoolLiteral.java b/fe/src/main/java/org/apache/impala/analysis/BoolLiteral.java
index 838f120..cec702c 100644
--- a/fe/src/main/java/org/apache/impala/analysis/BoolLiteral.java
+++ b/fe/src/main/java/org/apache/impala/analysis/BoolLiteral.java
@@ -60,11 +60,8 @@ public class BoolLiteral extends LiteralExpr {
   }
 
   @Override
-  public boolean equals(Object obj) {
-    if (!super.equals(obj)) {
-      return false;
-    }
-    return ((BoolLiteral) obj).value_ == value_;
+  public boolean localEquals(Expr that) {
+    return super.localEquals(that) && ((BoolLiteral) that).value_ == value_;
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/5e9b4e2f/fe/src/main/java/org/apache/impala/analysis/CaseExpr.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/analysis/CaseExpr.java b/fe/src/main/java/org/apache/impala/analysis/CaseExpr.java
index 237076a..a51ca60 100644
--- a/fe/src/main/java/org/apache/impala/analysis/CaseExpr.java
+++ b/fe/src/main/java/org/apache/impala/analysis/CaseExpr.java
@@ -188,9 +188,9 @@ public class CaseExpr extends Expr {
   }
 
   @Override
-  public boolean equals(Object obj) {
-    if (!super.equals(obj)) return false;
-    CaseExpr expr = (CaseExpr) obj;
+  public boolean localEquals(Expr that) {
+    if (!super.localEquals(that)) return false;
+    CaseExpr expr = (CaseExpr)that;
     return hasCaseExpr_ == expr.hasCaseExpr_
         && hasElseExpr_ == expr.hasElseExpr_
         && isDecode() == expr.isDecode();

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/5e9b4e2f/fe/src/main/java/org/apache/impala/analysis/CastExpr.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/analysis/CastExpr.java b/fe/src/main/java/org/apache/impala/analysis/CastExpr.java
index f185696..29e1736 100644
--- a/fe/src/main/java/org/apache/impala/analysis/CastExpr.java
+++ b/fe/src/main/java/org/apache/impala/analysis/CastExpr.java
@@ -297,17 +297,11 @@ public class CastExpr extends Expr {
   }
 
   @Override
-  public boolean equals(Object obj) {
-    if (this == obj) return true;
-    if (obj instanceof CastExpr) {
-      CastExpr other = (CastExpr) obj;
-      return isImplicit_ == other.isImplicit_
-          && type_.equals(other.type_)
-          && super.equals(obj);
-    }
-    // Ignore implicit casts when comparing expr trees.
-    if (isImplicit_) return getChild(0).equals(obj);
-    return false;
+  public boolean localEquals(Expr that) {
+    if (!super.localEquals(that)) return false;
+    CastExpr other = (CastExpr) that;
+    return isImplicit_ == other.isImplicit_
+        && type_.equals(other.type_);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/5e9b4e2f/fe/src/main/java/org/apache/impala/analysis/CompoundPredicate.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/analysis/CompoundPredicate.java b/fe/src/main/java/org/apache/impala/analysis/CompoundPredicate.java
index ba1ef8d..f0ff00f 100644
--- a/fe/src/main/java/org/apache/impala/analysis/CompoundPredicate.java
+++ b/fe/src/main/java/org/apache/impala/analysis/CompoundPredicate.java
@@ -89,9 +89,8 @@ public class CompoundPredicate extends Predicate {
   public Operator getOp() { return op_; }
 
   @Override
-  public boolean equals(Object obj) {
-    if (!super.equals(obj)) return false;
-    return ((CompoundPredicate) obj).op_ == op_;
+  public boolean localEquals(Expr that) {
+    return super.localEquals(that) && ((CompoundPredicate) that).op_ == op_;
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/5e9b4e2f/fe/src/main/java/org/apache/impala/analysis/ExistsPredicate.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/analysis/ExistsPredicate.java b/fe/src/main/java/org/apache/impala/analysis/ExistsPredicate.java
index 5acd04a..381848c 100644
--- a/fe/src/main/java/org/apache/impala/analysis/ExistsPredicate.java
+++ b/fe/src/main/java/org/apache/impala/analysis/ExistsPredicate.java
@@ -61,9 +61,8 @@ public class ExistsPredicate extends Predicate {
   }
 
   @Override
-  public boolean equals(Object o) {
-    if (!super.equals(o)) return false;
-    return notExists_ == ((ExistsPredicate)o).notExists_;
+  public boolean localEquals(Expr that) {
+    return super.localEquals(that) && notExists_ == ((ExistsPredicate)that).notExists_;
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/5e9b4e2f/fe/src/main/java/org/apache/impala/analysis/Expr.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/analysis/Expr.java b/fe/src/main/java/org/apache/impala/analysis/Expr.java
index 1eba24f..a235662 100644
--- a/fe/src/main/java/org/apache/impala/analysis/Expr.java
+++ b/fe/src/main/java/org/apache/impala/analysis/Expr.java
@@ -310,7 +310,7 @@ abstract public class Expr extends TreeNode<Expr> implements ParseNode, Cloneabl
   /**
    * Perform semantic analysis of node and all of its children.
    * Throws exception if any errors found.
-   * @see org.apache.impala.parser.ParseNode#analyze(org.apache.impala.parser.Analyzer)
+   * @see ParseNode#analyze(Analyzer)
    */
   public final void analyze(Analyzer analyzer) throws AnalysisException {
     if (isAnalyzed()) return;
@@ -688,23 +688,45 @@ abstract public class Expr extends TreeNode<Expr> implements ParseNode, Cloneabl
   }
 
   /**
-   * Returns true if two expressions are equal. The equality comparison works on analyzed
-   * as well as unanalyzed exprs by ignoring implicit casts (see CastExpr.equals()).
+   * Returns true if this expr matches 'that'. Two exprs match if:
+   * 1. The tree structures ignoring implicit casts are the same.
+   * 2. For every pair of corresponding SlotRefs, slotRefCmp.matches() returns true.
+   * 3. For every pair of corresponding non-SlotRef exprs, localEquals() returns true.
    */
-  @Override
-  public boolean equals(Object obj) {
-    if (obj == null) return false;
-    if (obj.getClass() != this.getClass()) return false;
-    // don't compare type, this could be called pre-analysis
-    Expr expr = (Expr) obj;
-    if (children_.size() != expr.children_.size()) return false;
+  public boolean matches(Expr that, SlotRef.Comparator slotRefCmp) {
+    if (that == null) return false;
+    if (this instanceof CastExpr && ((CastExpr)this).isImplicit()) {
+      return children_.get(0).matches(that, slotRefCmp);
+    }
+    if (that instanceof CastExpr && ((CastExpr)that).isImplicit()) {
+      return matches(((CastExpr) that).children_.get(0), slotRefCmp);
+    }
+    if (this instanceof SlotRef && that instanceof SlotRef) {
+      return slotRefCmp.matches((SlotRef)this, (SlotRef)that);
+    }
+    if (!localEquals(that)) return false;
+    if (children_.size() != that.children_.size()) return false;
     for (int i = 0; i < children_.size(); ++i) {
-      if (!children_.get(i).equals(expr.children_.get(i))) return false;
+      if (!children_.get(i).matches(that.children_.get(i), slotRefCmp)) return false;
     }
-    if (fn_ == null && expr.fn_ == null) return true;
-    if (fn_ == null || expr.fn_ == null) return false; // One null, one not
-    // Both fn_'s are not null
-    return fn_.equals(expr.fn_);
+    return true;
+  }
+
+  /**
+   * Local eq comparator. Returns true if this expr is equal to 'that' ignoring children.
+   */
+  protected boolean localEquals(Expr that) {
+    return getClass() == that.getClass() &&
+        (fn_ == null ? that.fn_ == null : fn_.equals(that.fn_));
+  }
+
+  /**
+   * Returns true if two expressions are equal. The equality comparison works on analyzed
+   * as well as unanalyzed exprs by ignoring implicit casts.
+   */
+  @Override
+  public final boolean equals(Object obj) {
+    return obj instanceof Expr && matches((Expr) obj, SlotRef.SLOTREF_EQ_CMP);
   }
 
   /**
@@ -738,7 +760,7 @@ abstract public class Expr extends TreeNode<Expr> implements ParseNode, Cloneabl
   }
 
   /**
-   * Return the intersection of l1 and l2.599
+   * Return the intersection of l1 and l2.
    */
   public static <C extends Expr> List<C> intersect(List<C> l1, List<C> l2) {
     List<C> result = new ArrayList<C>();
@@ -748,32 +770,6 @@ abstract public class Expr extends TreeNode<Expr> implements ParseNode, Cloneabl
     return result;
   }
 
-  /**
-   * Compute the intersection of l1 and l2, given the smap, and
-   * return the intersecting l1 elements in i1 and the intersecting l2 elements in i2.
-   */
-  public static void intersect(Analyzer analyzer,
-      List<Expr> l1, List<Expr> l2, ExprSubstitutionMap smap,
-      List<Expr> i1, List<Expr> i2) {
-    i1.clear();
-    i2.clear();
-    List<Expr> s1List = Expr.substituteList(l1, smap, analyzer, false);
-    Preconditions.checkState(s1List.size() == l1.size());
-    List<Expr> s2List = Expr.substituteList(l2, smap, analyzer, false);
-    Preconditions.checkState(s2List.size() == l2.size());
-    for (int i = 0; i < s1List.size(); ++i) {
-      Expr s1 = s1List.get(i);
-      for (int j = 0; j < s2List.size(); ++j) {
-        Expr s2 = s2List.get(j);
-        if (s1.equals(s2)) {
-          i1.add(l1.get(i));
-          i2.add(l2.get(j));
-          break;
-        }
-      }
-    }
-  }
-
   @Override
   public int hashCode() {
     if (id_ == null) {
@@ -954,6 +950,25 @@ abstract public class Expr extends TreeNode<Expr> implements ParseNode, Cloneabl
   }
 
   /**
+   * Return a new list without duplicate exprs (according to matches() using cmp).
+   */
+  public static <C extends Expr> List<C> removeDuplicates(List<C> l,
+      SlotRef.Comparator cmp) {
+    List<C> newList = Lists.newArrayList();
+    for (C expr: l) {
+      boolean exists = false;
+      for (C newExpr : newList) {
+        if (newExpr.matches(expr, cmp)) {
+          exists = true;
+          break;
+        }
+      }
+      if (!exists) newList.add(expr);
+    }
+    return newList;
+  }
+
+  /**
    * Removes constant exprs
    */
   public static <C extends Expr> void removeConstants(List<C> l) {

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/5e9b4e2f/fe/src/main/java/org/apache/impala/analysis/FunctionCallExpr.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/analysis/FunctionCallExpr.java b/fe/src/main/java/org/apache/impala/analysis/FunctionCallExpr.java
index 209cfbb..9574a02 100644
--- a/fe/src/main/java/org/apache/impala/analysis/FunctionCallExpr.java
+++ b/fe/src/main/java/org/apache/impala/analysis/FunctionCallExpr.java
@@ -179,13 +179,13 @@ public class FunctionCallExpr extends Expr {
   }
 
   @Override
-  public boolean equals(Object obj) {
-    if (!super.equals(obj)) return false;
-    FunctionCallExpr o = (FunctionCallExpr)obj;
+  public boolean localEquals(Expr that) {
+    if (!super.localEquals(that)) return false;
+    FunctionCallExpr o = (FunctionCallExpr)that;
     return fnName_.equals(o.fnName_) &&
-           params_.isDistinct() == o.params_.isDistinct() &&
-           params_.isIgnoreNulls() == o.params_.isIgnoreNulls() &&
-           params_.isStar() == o.params_.isStar();
+        params_.isDistinct() == o.params_.isDistinct() &&
+        params_.isIgnoreNulls() == o.params_.isIgnoreNulls() &&
+        params_.isStar() == o.params_.isStar();
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/5e9b4e2f/fe/src/main/java/org/apache/impala/analysis/InlineViewRef.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/analysis/InlineViewRef.java b/fe/src/main/java/org/apache/impala/analysis/InlineViewRef.java
index 3f6bc68..df6779d 100644
--- a/fe/src/main/java/org/apache/impala/analysis/InlineViewRef.java
+++ b/fe/src/main/java/org/apache/impala/analysis/InlineViewRef.java
@@ -207,7 +207,7 @@ public class InlineViewRef extends TableRef {
       smap_.put(slotRef, colExpr);
       baseTblSmap_.put(slotRef, queryStmt_.getBaseTblResultExprs().get(i));
       if (createAuxPredicate(colExpr)) {
-        analyzer.createAuxEquivPredicate(new SlotRef(slotDesc), colExpr.clone());
+        analyzer.createAuxEqPredicate(new SlotRef(slotDesc), colExpr.clone());
       }
     }
     if (LOG.isTraceEnabled()) {

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/5e9b4e2f/fe/src/main/java/org/apache/impala/analysis/IsNullPredicate.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/analysis/IsNullPredicate.java b/fe/src/main/java/org/apache/impala/analysis/IsNullPredicate.java
index fe160c8..5a1fe99 100644
--- a/fe/src/main/java/org/apache/impala/analysis/IsNullPredicate.java
+++ b/fe/src/main/java/org/apache/impala/analysis/IsNullPredicate.java
@@ -78,9 +78,8 @@ public class IsNullPredicate extends Predicate {
   }
 
   @Override
-  public boolean equals(Object obj) {
-    if (!super.equals(obj)) return false;
-    return ((IsNullPredicate) obj).isNotNull_ == isNotNull_;
+  public boolean localEquals(Expr that) {
+    return super.localEquals(that) && ((IsNullPredicate) that).isNotNull_ == isNotNull_;
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/5e9b4e2f/fe/src/main/java/org/apache/impala/analysis/LikePredicate.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/analysis/LikePredicate.java b/fe/src/main/java/org/apache/impala/analysis/LikePredicate.java
index 32ccddb..74d5fc6 100644
--- a/fe/src/main/java/org/apache/impala/analysis/LikePredicate.java
+++ b/fe/src/main/java/org/apache/impala/analysis/LikePredicate.java
@@ -98,9 +98,8 @@ public class LikePredicate extends Predicate {
   }
 
   @Override
-  public boolean equals(Object obj) {
-    if (!super.equals(obj)) return false;
-    return ((LikePredicate) obj).op_ == op_;
+  public boolean localEquals(Expr that) {
+    return super.localEquals(that) && ((LikePredicate) that).op_ == op_;
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/5e9b4e2f/fe/src/main/java/org/apache/impala/analysis/NullLiteral.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/analysis/NullLiteral.java b/fe/src/main/java/org/apache/impala/analysis/NullLiteral.java
index e2ea869..ba35262 100644
--- a/fe/src/main/java/org/apache/impala/analysis/NullLiteral.java
+++ b/fe/src/main/java/org/apache/impala/analysis/NullLiteral.java
@@ -48,12 +48,6 @@ public class NullLiteral extends LiteralExpr {
   }
 
   @Override
-  public boolean equals(Object obj) {
-    if (!super.equals(obj)) return false;
-    return obj instanceof NullLiteral;
-  }
-
-  @Override
   public int hashCode() { return 0; }
 
   @Override

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/5e9b4e2f/fe/src/main/java/org/apache/impala/analysis/NumericLiteral.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/analysis/NumericLiteral.java b/fe/src/main/java/org/apache/impala/analysis/NumericLiteral.java
index 98b9dbd..80d2347 100644
--- a/fe/src/main/java/org/apache/impala/analysis/NumericLiteral.java
+++ b/fe/src/main/java/org/apache/impala/analysis/NumericLiteral.java
@@ -131,9 +131,8 @@ public class NumericLiteral extends LiteralExpr {
   }
 
   @Override
-  public boolean equals(Object obj) {
-    if (!super.equals(obj)) return false;
-    return ((NumericLiteral) obj).value_.equals(value_);
+  public boolean localEquals(Expr that) {
+    return super.localEquals(that) && ((NumericLiteral) that).value_.equals(value_);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/5e9b4e2f/fe/src/main/java/org/apache/impala/analysis/QueryStmt.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/analysis/QueryStmt.java b/fe/src/main/java/org/apache/impala/analysis/QueryStmt.java
index 69b9625..6f21a33 100644
--- a/fe/src/main/java/org/apache/impala/analysis/QueryStmt.java
+++ b/fe/src/main/java/org/apache/impala/analysis/QueryStmt.java
@@ -260,7 +260,7 @@ public abstract class QueryStmt extends StatementBase {
         analyzer.registerValueTransfer(
             inputSlotRef.getSlotId(), outputSlotRef.getSlotId());
       } else {
-        analyzer.createAuxEquivPredicate(outputSlotRef, inputSlotRef);
+        analyzer.createAuxEqPredicate(outputSlotRef, inputSlotRef);
       }
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/5e9b4e2f/fe/src/main/java/org/apache/impala/analysis/SlotRef.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/analysis/SlotRef.java b/fe/src/main/java/org/apache/impala/analysis/SlotRef.java
index f90d7c6..f4b2144 100644
--- a/fe/src/main/java/org/apache/impala/analysis/SlotRef.java
+++ b/fe/src/main/java/org/apache/impala/analysis/SlotRef.java
@@ -29,8 +29,6 @@ import org.apache.impala.common.AnalysisException;
 import org.apache.impala.thrift.TExprNode;
 import org.apache.impala.thrift.TExprNodeType;
 import org.apache.impala.thrift.TSlotRef;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import com.google.common.base.Joiner;
 import com.google.common.base.Objects;
@@ -182,19 +180,30 @@ public class SlotRef extends Expr {
   }
 
   @Override
-  public boolean equals(Object obj) {
-    if (!super.equals(obj)) return false;
-    SlotRef other = (SlotRef) obj;
+  public boolean localEquals(Expr that) {
+    if (!super.localEquals(that)) return false;
+    SlotRef other = (SlotRef) that;
     // check slot ids first; if they're both set we only need to compare those
     // (regardless of how the ref was constructed)
     if (desc_ != null && other.desc_ != null) {
       return desc_.getId().equals(other.desc_.getId());
     }
-    if ((label_ == null) != (other.label_ == null)) return false;
-    if (!label_.equalsIgnoreCase(other.label_)) return false;
-    return true;
+    return label_ == null ? other.label_ == null : label_.equalsIgnoreCase(other.label_);
   }
 
+  /** Used for {@link Expr#matches(Expr, Comparator)} */
+  interface Comparator {
+    boolean matches(SlotRef a, SlotRef b);
+  }
+
+  /**
+   * A wrapper around localEquals() used for {@link Expr#matches(Expr, Comparator)}.
+   */
+  static final Comparator SLOTREF_EQ_CMP = new Comparator() {
+    @Override
+    public boolean matches(SlotRef a, SlotRef b) { return a.localEquals(b); }
+  };
+
   @Override
   public boolean isBoundByTupleIds(List<TupleId> tids) {
     Preconditions.checkState(desc_ != null);

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/5e9b4e2f/fe/src/main/java/org/apache/impala/analysis/StringLiteral.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/analysis/StringLiteral.java b/fe/src/main/java/org/apache/impala/analysis/StringLiteral.java
index 1dbe5a4..72a5ca2 100644
--- a/fe/src/main/java/org/apache/impala/analysis/StringLiteral.java
+++ b/fe/src/main/java/org/apache/impala/analysis/StringLiteral.java
@@ -60,9 +60,9 @@ public class StringLiteral extends LiteralExpr {
   }
 
   @Override
-  public boolean equals(Object obj) {
-    if (!super.equals(obj)) return false;
-    StringLiteral other = (StringLiteral) obj;
+  public boolean localEquals(Expr that) {
+    if (!super.localEquals(that)) return false;
+    StringLiteral other = (StringLiteral) that;
     return needsUnescaping_ == other.needsUnescaping_ && value_.equals(other.value_);
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/5e9b4e2f/fe/src/main/java/org/apache/impala/analysis/Subquery.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/analysis/Subquery.java b/fe/src/main/java/org/apache/impala/analysis/Subquery.java
index bad3d7e..ca15020 100644
--- a/fe/src/main/java/org/apache/impala/analysis/Subquery.java
+++ b/fe/src/main/java/org/apache/impala/analysis/Subquery.java
@@ -154,9 +154,9 @@ public class Subquery extends Expr {
    * TODO: Switch to a less restrictive implementation.
    */
   @Override
-  public boolean equals(Object o) {
-    if (!super.equals(o)) return false;
-    return stmt_.toSql().equals(((Subquery)o).stmt_.toSql());
+  public boolean localEquals(Expr that) {
+    return super.localEquals(that) &&
+        stmt_.toSql().equals(((Subquery)that).stmt_.toSql());
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/5e9b4e2f/fe/src/main/java/org/apache/impala/analysis/TimestampLiteral.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/analysis/TimestampLiteral.java b/fe/src/main/java/org/apache/impala/analysis/TimestampLiteral.java
index 25f111e..4a6f434 100644
--- a/fe/src/main/java/org/apache/impala/analysis/TimestampLiteral.java
+++ b/fe/src/main/java/org/apache/impala/analysis/TimestampLiteral.java
@@ -57,9 +57,9 @@ public class TimestampLiteral extends LiteralExpr {
   }
 
   @Override
-  public boolean equals(Object obj) {
-    if (!super.equals(obj)) return false;
-    return Arrays.equals(value_, ((TimestampLiteral) obj).value_);
+  public boolean localEquals(Expr that) {
+    return super.localEquals(that) &&
+        Arrays.equals(value_, ((TimestampLiteral) that).value_);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/5e9b4e2f/fe/src/main/java/org/apache/impala/analysis/TupleIsNullPredicate.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/analysis/TupleIsNullPredicate.java b/fe/src/main/java/org/apache/impala/analysis/TupleIsNullPredicate.java
index 8da0237..25d6c51 100644
--- a/fe/src/main/java/org/apache/impala/analysis/TupleIsNullPredicate.java
+++ b/fe/src/main/java/org/apache/impala/analysis/TupleIsNullPredicate.java
@@ -83,9 +83,9 @@ public class TupleIsNullPredicate extends Predicate {
   }
 
   @Override
-  public boolean equals(Object o) {
-    if (!super.equals(o)) return false;
-    TupleIsNullPredicate other = (TupleIsNullPredicate) o;
+  public boolean localEquals(Expr that) {
+    if (!super.localEquals(that)) return false;
+    TupleIsNullPredicate other = (TupleIsNullPredicate) that;
     return other.tupleIds_.containsAll(tupleIds_) &&
         tupleIds_.containsAll(other.tupleIds_);
   }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/5e9b4e2f/fe/src/main/java/org/apache/impala/planner/AnalyticPlanner.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/planner/AnalyticPlanner.java b/fe/src/main/java/org/apache/impala/planner/AnalyticPlanner.java
index 41ff9d2..1140a7d 100644
--- a/fe/src/main/java/org/apache/impala/planner/AnalyticPlanner.java
+++ b/fe/src/main/java/org/apache/impala/planner/AnalyticPlanner.java
@@ -121,9 +121,6 @@ public class AnalyticPlanner {
             i == 0 ? partitionGroup.partitionByExprs : null);
       }
     }
-
-    // create equiv classes for newly added slots
-    analyzer_.createIdentityEquivClasses();
     return root;
   }
 
@@ -197,8 +194,7 @@ public class AnalyticPlanner {
     for (PartitionGroup pg: partitionGroups) {
       List<Expr> l1 = Lists.newArrayList();
       List<Expr> l2 = Lists.newArrayList();
-      Expr.intersect(analyzer_, pg.partitionByExprs, groupingExprs,
-          analyzer_.getEquivClassSmap(), l1, l2);
+      analyzer_.exprIntersect(pg.partitionByExprs, groupingExprs, l1, l2);
       // TODO: also look at l2 and take the max?
       long ndv = Expr.getNumDistinctValues(l1);
       if (ndv < 0 || ndv < numNodes || ndv < maxNdv) continue;