Posted to commits@impala.apache.org by ab...@apache.org on 2016/08/18 03:25:54 UTC

[2/2] incubator-impala git commit: IMPALA-3063: Separate join inversion from join ordering.

IMPALA-3063: Separate join inversion from join ordering.

Before this change, joins were inverted during join ordering.
That approach was unnecessarily complex because it required
modifying the global analysis state for correct conjunct
placement, etc. However, join inversion is independent of join
ordering, and the existing approach could lead to generating
invalid plans with distributed non-equi right outer/semi joins,
which we cannot execute in the backend.

After this change, joins are inverted in a separate pass over
the single-node plan. This simplifies the inversion
logic and allows us to avoid generating those invalid plans.
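Below is a rough sketch of what such a separate pass might look like, built on a toy plan-tree model. It walks the already-ordered tree bottom-up and swaps a join's inputs, inverting its operator, in two cases. The first is when the join would otherwise be an unexecutable distributed non-equi right outer/semi join. The second is when the swap puts the smaller known input on the build (right) side. The class names, the cardinality heuristic, and the executability rule are assumptions for illustration. This is not the code added to Planner.java.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/** Hypothetical toy model of a join-inversion pass that runs after join ordering. */
public class JoinInversionSketch {
  enum JoinOp {
    INNER, LEFT_OUTER, RIGHT_OUTER, LEFT_SEMI, RIGHT_SEMI;

    /** The operator that preserves semantics when the two inputs are swapped. */
    JoinOp invert() {
      switch (this) {
        case LEFT_OUTER: return RIGHT_OUTER;
        case RIGHT_OUTER: return LEFT_OUTER;
        case LEFT_SEMI: return RIGHT_SEMI;
        case RIGHT_SEMI: return LEFT_SEMI;
        default: return this;  // INNER is symmetric
      }
    }

    boolean isRightOuterOrSemi() { return this == RIGHT_OUTER || this == RIGHT_SEMI; }
  }

  static class Node {
    final String label;
    final List<Node> children = new ArrayList<>();
    final long cardinality;  // -1 means unknown
    JoinOp joinOp;           // null for non-join (e.g. scan) nodes
    boolean hasEqConjuncts;
    boolean isStraightJoin;

    Node(String label, long cardinality) {
      this.label = label;
      this.cardinality = cardinality;
    }

    static Node join(JoinOp op, Node lhs, Node rhs, boolean hasEqConjuncts,
        boolean isStraightJoin) {
      Node n = new Node("join", -1);
      n.joinOp = op;
      n.children.add(lhs);
      n.children.add(rhs);
      n.hasEqConjuncts = hasEqConjuncts;
      n.isStraightJoin = isStraightJoin;
      return n;
    }
  }

  /** Assumed rule: distributed non-equi right outer/semi joins cannot be executed. */
  static boolean isExecutable(JoinOp op, boolean hasEqConjuncts, boolean distributed) {
    return !(distributed && !hasEqConjuncts && op.isRightOuterOrSemi());
  }

  /** Post-order pass over an already-ordered plan tree; modifies joins in place. */
  static void invertJoins(Node root, boolean distributed) {
    for (Node child : root.children) invertJoins(child, distributed);
    if (root.joinOp == null || root.isStraightJoin) return;  // respect straight_join
    long lhsCard = root.children.get(0).cardinality;
    long rhsCard = root.children.get(1).cardinality;
    JoinOp inverted = root.joinOp.invert();
    boolean mustInvert = !isExecutable(root.joinOp, root.hasEqConjuncts, distributed);
    // Heuristic: move the smaller input to the build side, but never create an
    // unexecutable join in the process.
    boolean shouldInvert = lhsCard != -1 && rhsCard != -1 && lhsCard < rhsCard
        && isExecutable(inverted, root.hasEqConjuncts, distributed);
    if (mustInvert || shouldInvert) {
      root.joinOp = inverted;
      Collections.swap(root.children, 0, 1);
      // A real planner would also recompute this node's tuple ids and stats here.
    }
  }
}
```

In this sketch the decision uses only local information: the join op, the child cardinalities, and whether equi-join conjuncts exist. That is the kind of simplification the commit message describes, since no global analysis state is modified.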

Note that this change is not purely a separation of functionality,
for the following reasons:
1. Our join cardinality estimation is not symmetric, i.e., A JOIN B
may not give the same estimate as B JOIN A due to our FK/PK detection
heuristic. In the context of this patch this means that an inverted
join may have a different cardinality estimate, so plans may change
depending on whether the inversion is done during join ordering or after.
2. We currently only invert outer/semi/anti joins based on the join op of the
rhs table ref. In this patch I want to preserve the existing behavior as
much as possible, but when doing the join inversion in a separate pass we
may see a join op in a JoinNode that is different from that of the rhs table
ref. So in some situations the inversion behavior based on the join op could
be different, and this patch contains some examples.

This patch also moves the logic of converting hash joins to
nested-loop joins into a separate pass over the single-node plan.
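Here is a minimal sketch of such a standalone rewrite pass, again on a toy model. It rebuilds the tree bottom-up and replaces a hash join with a nested-loop join. In doing so it folds the equi-join conjuncts into the ordinary join conjuncts, mirroring the NestedLoopJoinNode constructor in this diff, which takes only otherJoinConjuncts. The commit message does not say when a hash join is converted. The criterion used here (a build side estimated at no more than one row) is therefore a hypothetical placeholder. All class names are hypothetical, and conjuncts are modeled as plain strings.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical toy model of a pass that turns hash joins into nested-loop joins. */
public class NljConversionSketch {
  static class PlanNode {
    final List<PlanNode> children = new ArrayList<>();
    long cardinality = -1;  // -1 means unknown
  }

  static class ScanNode extends PlanNode {
    ScanNode(long cardinality) { this.cardinality = cardinality; }
  }

  static class HashJoinNode extends PlanNode {
    final List<String> eqJoinConjuncts;
    final List<String> otherJoinConjuncts;

    HashJoinNode(PlanNode outer, PlanNode inner, List<String> eqJoinConjuncts,
        List<String> otherJoinConjuncts) {
      children.add(outer);
      children.add(inner);
      this.eqJoinConjuncts = eqJoinConjuncts;
      this.otherJoinConjuncts = otherJoinConjuncts;
    }
  }

  static class NestedLoopJoinNode extends PlanNode {
    final List<String> joinConjuncts;  // evaluated per row pair, any predicate shape

    NestedLoopJoinNode(PlanNode outer, PlanNode inner, List<String> joinConjuncts) {
      children.add(outer);
      children.add(inner);
      this.joinConjuncts = joinConjuncts;
    }
  }

  /** Rewrites the subtree bottom-up and returns its (possibly replaced) root. */
  static PlanNode convertHashJoins(PlanNode root) {
    for (int i = 0; i < root.children.size(); ++i) {
      root.children.set(i, convertHashJoins(root.children.get(i)));
    }
    if (!(root instanceof HashJoinNode)) return root;
    HashJoinNode hashJoin = (HashJoinNode) root;
    PlanNode build = hashJoin.children.get(1);
    // Hypothetical criterion: a hash table is not worth building for <= 1 row.
    boolean tinyBuild = build.cardinality >= 0 && build.cardinality <= 1;
    if (!tinyBuild) return root;
    // A nested-loop join can evaluate arbitrary predicates, so the equi-join
    // conjuncts simply become ordinary join conjuncts.
    List<String> conjuncts = new ArrayList<>(hashJoin.otherJoinConjuncts);
    conjuncts.addAll(hashJoin.eqJoinConjuncts);
    return new NestedLoopJoinNode(hashJoin.children.get(0), build, conjuncts);
  }
}
```

Returning the new root from each call keeps the rewrite purely structural, so it can run as its own pass after the single-node plan is complete.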

Change-Id: If86db7753fc585bb4c69612745ec0103278888a4
Reviewed-on: http://gerrit.cloudera.org:8080/3846
Reviewed-by: Alex Behm <al...@cloudera.com>
Tested-by: Internal Jenkins


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/532b1fe1
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/532b1fe1
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/532b1fe1

Branch: refs/heads/master
Commit: 532b1fe1186725b8e81fff93b59fc7cebf563c8b
Parents: dd906a8
Author: Alex Behm <al...@cloudera.com>
Authored: Fri Jul 29 19:45:06 2016 -0700
Committer: Internal Jenkins <cl...@gerrit.cloudera.org>
Committed: Thu Aug 18 03:25:16 2016 +0000

----------------------------------------------------------------------
 be/src/exec/blocking-join-node.h                |   2 +
 .../com/cloudera/impala/analysis/Analyzer.java  |   5 +
 .../impala/analysis/BinaryPredicate.java        |  10 +
 .../cloudera/impala/analysis/SelectList.java    |  12 +-
 .../com/cloudera/impala/analysis/TableRef.java  |  29 ---
 .../impala/planner/AggregationNode.java         |   1 -
 .../impala/planner/AnalyticEvalNode.java        |  15 +-
 .../impala/planner/DistributedPlanner.java      |  24 +--
 .../cloudera/impala/planner/ExchangeNode.java   |  34 ++--
 .../cloudera/impala/planner/HashJoinNode.java   |  10 +-
 .../cloudera/impala/planner/HdfsScanNode.java   |   3 -
 .../com/cloudera/impala/planner/JoinNode.java   |  60 +++++-
 .../impala/planner/NestedLoopJoinNode.java      |  10 +-
 .../com/cloudera/impala/planner/PlanNode.java   |  37 ++--
 .../com/cloudera/impala/planner/Planner.java    | 118 +++++++++++-
 .../com/cloudera/impala/planner/SelectNode.java |  13 +-
 .../impala/planner/SingleNodePlanner.java       | 178 +++++-------------
 .../impala/planner/SingularRowSrcNode.java      |  13 +-
 .../com/cloudera/impala/planner/SortNode.java   |   2 +-
 .../cloudera/impala/planner/SubplanNode.java    |  16 +-
 .../com/cloudera/impala/planner/UnionNode.java  |   2 +-
 .../com/cloudera/impala/planner/UnnestNode.java |   3 +-
 .../queries/PlannerTest/implicit-joins.test     |  26 +--
 .../queries/PlannerTest/join-order.test         |  38 ++--
 .../queries/PlannerTest/joins.test              |  66 +++----
 .../queries/PlannerTest/nested-collections.test |  22 +--
 .../queries/PlannerTest/nested-loop-join.test   |  53 ++++++
 .../queries/PlannerTest/tpcds-all.test          | 182 +++++++++++++++++++
 .../queries/PlannerTest/tpch-nested.test        |  36 ++--
 29 files changed, 659 insertions(+), 361 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/532b1fe1/be/src/exec/blocking-join-node.h
----------------------------------------------------------------------
diff --git a/be/src/exec/blocking-join-node.h b/be/src/exec/blocking-join-node.h
index b3d25c1..14b1c72 100644
--- a/be/src/exec/blocking-join-node.h
+++ b/be/src/exec/blocking-join-node.h
@@ -38,6 +38,8 @@ class TupleRow;
 /// Abstract base class for join nodes that block while consuming all rows from their
 /// right child in Open(). There is no implementation of Reset() because the Open()
 /// sufficiently covers setting members into a 'reset' state.
+/// TODO: Remove the restriction that the tuples in the join's output row have to
+/// correspond to the order of its child exec nodes. See the DCHECKs in Init().
 class BlockingJoinNode : public ExecNode {
  public:
   BlockingJoinNode(const std::string& node_name, const TJoinOp::type join_op,

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/532b1fe1/fe/src/main/java/com/cloudera/impala/analysis/Analyzer.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/com/cloudera/impala/analysis/Analyzer.java b/fe/src/main/java/com/cloudera/impala/analysis/Analyzer.java
index 5c9ac6f..a931489 100644
--- a/fe/src/main/java/com/cloudera/impala/analysis/Analyzer.java
+++ b/fe/src/main/java/com/cloudera/impala/analysis/Analyzer.java
@@ -121,6 +121,9 @@ public class Analyzer {
 
   private final User user_;
 
+  // Indicates whether this query block contains a straight join hint.
+  private boolean isStraightJoin_ = false;
+
   // Whether to use Hive's auto-generated column labels.
   private boolean useHiveColLabels_ = false;
 
@@ -2445,6 +2448,8 @@ public class Analyzer {
     enablePrivChecks_ = value;
   }
   public void setAuthErrMsg(String errMsg) { authErrorMsg_ = errMsg; }
+  public void setIsStraightJoin() { isStraightJoin_ = true; }
+  public boolean isStraightJoin() { return isStraightJoin_; }
   public void setIsExplain() { globalState_.isExplain = true; }
   public boolean isExplain() { return globalState_.isExplain; }
   public void setUseHiveColLabels(boolean useHiveColLabels) {

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/532b1fe1/fe/src/main/java/com/cloudera/impala/analysis/BinaryPredicate.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/com/cloudera/impala/analysis/BinaryPredicate.java b/fe/src/main/java/com/cloudera/impala/analysis/BinaryPredicate.java
index fe0590d..02b5c12 100644
--- a/fe/src/main/java/com/cloudera/impala/analysis/BinaryPredicate.java
+++ b/fe/src/main/java/com/cloudera/impala/analysis/BinaryPredicate.java
@@ -18,6 +18,7 @@
 package com.cloudera.impala.analysis;
 
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
 
 import org.slf4j.Logger;
@@ -355,6 +356,15 @@ public class BinaryPredicate extends Predicate {
     return new BinaryPredicate(newOp, getChild(0), getChild(1));
   }
 
+  /**
+   * Swaps the first with the second child in-place. Only valid to call for
+   * equivalence and not equal predicates.
+   */
+  public void reverse() {
+    Preconditions.checkState(op_.isEquivalence() || op_ == Operator.NE);
+    Collections.swap(children_, 0, 1);
+  }
+
   @Override
   public boolean equals(Object obj) {
     if (!super.equals(obj)) return false;
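The precondition in reverse() matters because swapping operands preserves meaning only for symmetric comparisons. For example, a = b is equivalent to b = a, but a < b is not equivalent to b < a. The toy model below illustrates that check. For the sketch only, it assumes that the equivalence operators are = and IS NOT DISTINCT FROM. The Predicate and Op names are hypothetical stand-ins for BinaryPredicate and its Operator.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

/** Hypothetical toy model of an in-place operand swap on a binary predicate. */
public class ReversePredicateSketch {
  enum Op {
    EQ("="), NE("!="), NOT_DISTINCT("IS NOT DISTINCT FROM"), LT("<"), GT(">");

    final String symbol;

    Op(String symbol) { this.symbol = symbol; }

    // Assumption for this sketch: = and IS NOT DISTINCT FROM are the equivalence ops.
    boolean isEquivalence() { return this == EQ || this == NOT_DISTINCT; }
  }

  static class Predicate {
    final Op op;
    final List<String> children;

    Predicate(Op op, String lhs, String rhs) {
      this.op = op;
      this.children = new ArrayList<>(Arrays.asList(lhs, rhs));
    }

    /** Swaps the operands in place; only meaning-preserving for symmetric operators. */
    void reverse() {
      if (!(op.isEquivalence() || op == Op.NE)) {
        throw new IllegalStateException("cannot reverse asymmetric operator " + op.symbol);
      }
      Collections.swap(children, 0, 1);
    }

    @Override
    public String toString() {
      return children.get(0) + " " + op.symbol + " " + children.get(1);
    }
  }
}
```

An asymmetric operator would need its operator flipped as well (< becomes >), which is a different operation from the in-place swap shown here.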

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/532b1fe1/fe/src/main/java/com/cloudera/impala/analysis/SelectList.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/com/cloudera/impala/analysis/SelectList.java b/fe/src/main/java/com/cloudera/impala/analysis/SelectList.java
index fa34104..429d488 100644
--- a/fe/src/main/java/com/cloudera/impala/analysis/SelectList.java
+++ b/fe/src/main/java/com/cloudera/impala/analysis/SelectList.java
@@ -33,28 +33,22 @@ public class SelectList {
 
   private final List<SelectListItem> items_;
 
-  // Set in analyzePlanHints() based on planHints_.
-  private boolean isStraightJoin_;
-
   // END: Members that need to be reset()
   /////////////////////////////////////////
 
   public SelectList(List<SelectListItem> items) {
     isDistinct_ = false;
     items_ = items;
-    isStraightJoin_ = false;
   }
 
   public SelectList() {
     isDistinct_ = false;
     items_ = Lists.newArrayList();
-    isStraightJoin_ = false;
   }
 
   public SelectList(List<SelectListItem> items, boolean isDistinct,
       List<String> planHints) {
     isDistinct_ = isDistinct;
-    isStraightJoin_ = false;
     items_ = items;
     planHints_ = planHints;
   }
@@ -70,7 +64,6 @@ public class SelectList {
       items_.add(item.clone());
     }
     isDistinct_ = other.isDistinct_;
-    isStraightJoin_ = other.isStraightJoin_;
   }
 
   public List<SelectListItem> getItems() { return items_; }
@@ -78,7 +71,6 @@ public class SelectList {
   public List<String> getPlanHints() { return planHints_; }
   public boolean isDistinct() { return isDistinct_; }
   public void setIsDistinct(boolean value) { isDistinct_ = value; }
-  public boolean isStraightJoin() { return isStraightJoin_; }
   public boolean hasPlanHints() { return planHints_ != null; }
 
   public void analyzePlanHints(Analyzer analyzer) {
@@ -87,8 +79,7 @@ public class SelectList {
       if (!hint.equalsIgnoreCase("straight_join")) {
         analyzer.addWarning("PLAN hint not recognized: " + hint);
       }
-      isStraightJoin_ = true;
-      analyzer.setHasPlanHints();
+      analyzer.setIsStraightJoin();
     }
   }
 
@@ -99,6 +90,5 @@ public class SelectList {
     for (SelectListItem item: items_) {
       if (!item.isStar()) item.getExpr().reset();
     }
-    isStraightJoin_ = false;
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/532b1fe1/fe/src/main/java/com/cloudera/impala/analysis/TableRef.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/com/cloudera/impala/analysis/TableRef.java b/fe/src/main/java/com/cloudera/impala/analysis/TableRef.java
index 8d4157d..0ff0575 100644
--- a/fe/src/main/java/com/cloudera/impala/analysis/TableRef.java
+++ b/fe/src/main/java/com/cloudera/impala/analysis/TableRef.java
@@ -25,9 +25,7 @@ import java.util.Set;
 import com.cloudera.impala.catalog.HdfsTable;
 import com.cloudera.impala.catalog.Table;
 import com.cloudera.impala.common.AnalysisException;
-import com.cloudera.impala.common.Pair;
 import com.cloudera.impala.planner.JoinNode.DistributionMode;
-import com.cloudera.impala.planner.PlanNode;
 import com.cloudera.impala.thrift.TReplicaPreference;
 import com.google.common.base.Joiner;
 import com.google.common.base.Preconditions;
@@ -511,33 +509,6 @@ public class TableRef implements ParseNode {
     }
   }
 
-  /**
-   * Inverts the join whose rhs is represented by this table ref. If necessary, this
-   * function modifies the registered analysis state associated with this table ref,
-   * as well as the chain of left table references in refPlans as appropriate.
-   * Requires that this is the very first join in a series of joins.
-   */
-  public void invertJoin(List<Pair<TableRef, PlanNode>> refPlans, Analyzer analyzer) {
-    // Assert that this is the first join in a series of joins.
-    Preconditions.checkState(leftTblRef_.leftTblRef_ == null);
-    // Find a table ref that references 'this' as its left table (if any) and change
-    // it to reference 'this.leftTblRef_ 'instead, because 'this.leftTblRef_' will
-    // become the new rhs of the inverted join.
-    for (Pair<TableRef, PlanNode> refPlan: refPlans) {
-      if (refPlan.first.leftTblRef_ == this) {
-        refPlan.first.setLeftTblRef(leftTblRef_);
-        break;
-      }
-    }
-    if (joinOp_.isOuterJoin()) analyzer.invertOuterJoinState(this, leftTblRef_);
-    leftTblRef_.setJoinOp(getJoinOp().invert());
-    leftTblRef_.setLeftTblRef(this);
-    leftTblRef_.setOnClause(onClause_);
-    joinOp_ = null;
-    leftTblRef_ = null;
-    onClause_ = null;
-  }
-
   protected String tableRefToSql() {
     String aliasSql = null;
     String alias = getExplicitAlias();

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/532b1fe1/fe/src/main/java/com/cloudera/impala/planner/AggregationNode.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/com/cloudera/impala/planner/AggregationNode.java b/fe/src/main/java/com/cloudera/impala/planner/AggregationNode.java
index feeb139..f6bf8a0 100644
--- a/fe/src/main/java/com/cloudera/impala/planner/AggregationNode.java
+++ b/fe/src/main/java/com/cloudera/impala/planner/AggregationNode.java
@@ -71,7 +71,6 @@ public class AggregationNode extends PlanNode {
     super(id, aggInfo.getOutputTupleId().asList(), "AGGREGATE");
     aggInfo_ = aggInfo;
     children_.add(input);
-    nullableTupleIds_.addAll(input.getNullableTupleIds());
     needsFinalize_ = true;
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/532b1fe1/fe/src/main/java/com/cloudera/impala/planner/AnalyticEvalNode.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/com/cloudera/impala/planner/AnalyticEvalNode.java b/fe/src/main/java/com/cloudera/impala/planner/AnalyticEvalNode.java
index 82ade29..ccbdaa2 100644
--- a/fe/src/main/java/com/cloudera/impala/planner/AnalyticEvalNode.java
+++ b/fe/src/main/java/com/cloudera/impala/planner/AnalyticEvalNode.java
@@ -74,10 +74,8 @@ public class AnalyticEvalNode extends PlanNode {
       AnalyticWindow analyticWindow, TupleDescriptor intermediateTupleDesc,
       TupleDescriptor outputTupleDesc, ExprSubstitutionMap logicalToPhysicalSmap,
       Expr partitionByEq, Expr orderByEq, TupleDescriptor bufferedTupleDesc) {
-    super(id, input.getTupleIds(), "ANALYTIC");
+    super(id, "ANALYTIC");
     Preconditions.checkState(!tupleIds_.contains(outputTupleDesc.getId()));
-    // we're materializing the input row augmented with the analytic output tuple
-    tupleIds_.add(outputTupleDesc.getId());
     analyticFnCalls_ = analyticFnCalls;
     partitionExprs_ = partitionExprs;
     orderByElements_ = orderByElements;
@@ -89,7 +87,16 @@ public class AnalyticEvalNode extends PlanNode {
     orderByEq_ = orderByEq;
     bufferedTupleDesc_ = bufferedTupleDesc;
     children_.add(input);
-    nullableTupleIds_.addAll(input.getNullableTupleIds());
+    computeTupleIds();
+  }
+
+  @Override
+  public void computeTupleIds() {
+    clearTupleIds();
+    tupleIds_.addAll(getChild(0).getTupleIds());
+    // we're materializing the input row augmented with the analytic output tuple
+    tupleIds_.add(outputTupleDesc_.getId());
+    nullableTupleIds_.addAll(getChild(0).getNullableTupleIds());
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/532b1fe1/fe/src/main/java/com/cloudera/impala/planner/DistributedPlanner.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/com/cloudera/impala/planner/DistributedPlanner.java b/fe/src/main/java/com/cloudera/impala/planner/DistributedPlanner.java
index b71fe3c..d546245 100644
--- a/fe/src/main/java/com/cloudera/impala/planner/DistributedPlanner.java
+++ b/fe/src/main/java/com/cloudera/impala/planner/DistributedPlanner.java
@@ -232,8 +232,8 @@ public class DistributedPlanner {
     }
 
     Preconditions.checkState(partitionHint == null || partitionHint);
-    ExchangeNode exchNode = new ExchangeNode(ctx_.getNextNodeId());
-    exchNode.addChild(inputFragment.getPlanRoot());
+    ExchangeNode exchNode =
+        new ExchangeNode(ctx_.getNextNodeId(), inputFragment.getPlanRoot());
     exchNode.init(analyzer);
     Preconditions.checkState(exchNode.hasValidStats());
     DataPartition partition = DataPartition.hashPartitioned(nonConstPartitionExprs);
@@ -253,8 +253,8 @@ public class DistributedPlanner {
   private PlanFragment createMergeFragment(PlanFragment inputFragment)
       throws ImpalaException {
     Preconditions.checkState(inputFragment.isPartitioned());
-    ExchangeNode mergePlan = new ExchangeNode(ctx_.getNextNodeId());
-    mergePlan.addChild(inputFragment.getPlanRoot());
+    ExchangeNode mergePlan =
+        new ExchangeNode(ctx_.getNextNodeId(), inputFragment.getPlanRoot());
     mergePlan.init(ctx_.getRootAnalyzer());
     Preconditions.checkState(mergePlan.hasValidStats());
     PlanFragment fragment = new PlanFragment(ctx_.getNextFragmentId(), mergePlan,
@@ -381,12 +381,12 @@ public class DistributedPlanner {
     // left- and rightChildFragments, which now partition their output
     // on their respective join exprs.
     // The new fragment is hash-partitioned on the lhs input join exprs.
-    ExchangeNode lhsExchange = new ExchangeNode(ctx_.getNextNodeId());
-    lhsExchange.addChild(leftChildFragment.getPlanRoot());
+    ExchangeNode lhsExchange =
+        new ExchangeNode(ctx_.getNextNodeId(), leftChildFragment.getPlanRoot());
     lhsExchange.computeStats(null);
     node.setChild(0, lhsExchange);
-    ExchangeNode rhsExchange = new ExchangeNode(ctx_.getNextNodeId());
-    rhsExchange.addChild(rightChildFragment.getPlanRoot());
+    ExchangeNode rhsExchange =
+        new ExchangeNode(ctx_.getNextNodeId(), rightChildFragment.getPlanRoot());
     rhsExchange.computeStats(null);
     node.setChild(1, rhsExchange);
 
@@ -694,8 +694,8 @@ public class DistributedPlanner {
    */
   private void connectChildFragment(PlanNode node, int childIdx,
       PlanFragment parentFragment, PlanFragment childFragment) throws ImpalaException {
-    ExchangeNode exchangeNode = new ExchangeNode(ctx_.getNextNodeId());
-    exchangeNode.addChild(childFragment.getPlanRoot());
+    ExchangeNode exchangeNode =
+        new ExchangeNode(ctx_.getNextNodeId(), childFragment.getPlanRoot());
     exchangeNode.init(ctx_.getRootAnalyzer());
     exchangeNode.setFragment(parentFragment);
     node.setChild(childIdx, exchangeNode);
@@ -715,8 +715,8 @@ public class DistributedPlanner {
   private PlanFragment createParentFragment(
       PlanFragment childFragment, DataPartition parentPartition)
       throws ImpalaException {
-    ExchangeNode exchangeNode = new ExchangeNode(ctx_.getNextNodeId());
-    exchangeNode.addChild(childFragment.getPlanRoot());
+    ExchangeNode exchangeNode =
+        new ExchangeNode(ctx_.getNextNodeId(), childFragment.getPlanRoot());
     exchangeNode.init(ctx_.getRootAnalyzer());
     PlanFragment parentFragment = new PlanFragment(ctx_.getNextFragmentId(),
         exchangeNode, parentPartition);

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/532b1fe1/fe/src/main/java/com/cloudera/impala/planner/ExchangeNode.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/com/cloudera/impala/planner/ExchangeNode.java b/fe/src/main/java/com/cloudera/impala/planner/ExchangeNode.java
index 2593935..eeef5fe 100644
--- a/fe/src/main/java/com/cloudera/impala/planner/ExchangeNode.java
+++ b/fe/src/main/java/com/cloudera/impala/planner/ExchangeNode.java
@@ -31,8 +31,6 @@ import com.cloudera.impala.thrift.TPlanNode;
 import com.cloudera.impala.thrift.TPlanNodeType;
 import com.cloudera.impala.thrift.TSortInfo;
 import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Sets;
 
 /**
  * Receiver side of a 1:n data stream. Logically, an ExchangeNode consumes the data
@@ -63,9 +61,21 @@ public class ExchangeNode extends PlanNode {
   // only if mergeInfo_ is non-null, i.e. this is a merging exchange node.
   private long offset_;
 
-  public ExchangeNode(PlanNodeId id) {
+  public ExchangeNode(PlanNodeId id, PlanNode input) {
     super(id, "EXCHANGE");
     offset_ = 0;
+    children_.add(input);
+    // Only apply the limit at the receiver if there are multiple senders.
+    if (input.getFragment().isPartitioned()) limit_ = input.limit_;
+    computeTupleIds();
+  }
+
+  @Override
+  public void computeTupleIds() {
+    clearTupleIds();
+    tupleIds_.addAll(getChild(0).getTupleIds());
+    tblRefIds_.addAll(getChild(0).getTblRefIds());
+    nullableTupleIds_.addAll(getChild(0).getNullableTupleIds());
   }
 
   @Override
@@ -74,22 +84,6 @@ public class ExchangeNode extends PlanNode {
     Preconditions.checkState(conjuncts_.isEmpty());
   }
 
-  public void addChild(PlanNode node) {
-    // This ExchangeNode 'inherits' several parameters from its children.
-    // Ensure that all children agree on them.
-    if (!children_.isEmpty()) {
-      Preconditions.checkState(limit_ == node.limit_);
-      Preconditions.checkState(tupleIds_.equals(node.tupleIds_));
-      Preconditions.checkState(nullableTupleIds_.equals(node.nullableTupleIds_));
-    } else {
-      // Only apply the limit at the receiver if there are multiple senders.
-      if (node.getFragment().isPartitioned()) limit_ = node.limit_;
-      tupleIds_ = Lists.newArrayList(node.tupleIds_);
-      nullableTupleIds_ = Sets.newHashSet(node.nullableTupleIds_);
-    }
-    children_.add(node);
-  }
-
   @Override
   public void computeStats(Analyzer analyzer) {
     Preconditions.checkState(!children_.isEmpty(),
@@ -193,8 +187,6 @@ public class ExchangeNode extends PlanNode {
 
   @Override
   protected void toThrift(TPlanNode msg) {
-    Preconditions.checkState(!children_.isEmpty(),
-        "ExchangeNode must have at least one child");
     msg.node_type = TPlanNodeType.EXCHANGE_NODE;
     msg.exchange_node = new TExchangeNode();
     for (TupleId tid: tupleIds_) {
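The new ExchangeNode constructor derives a node's tuple ids from its child instead of copying them once. The computeTupleIds() overrides added to AnalyticEvalNode and JoinNode do the same. A plausible reason is sketched below with a toy node class (all names hypothetical). Once a pass such as join inversion swaps children in place, ids copied at construction time go stale, whereas a recompute step can refresh them bottom-up. The sketch ignores details such as semi-join tuple filtering and nullable tuple ids.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/** Hypothetical toy model of recomputing derived tuple ids after a tree change. */
public class ComputeTupleIdsSketch {
  static class Node {
    final List<Node> children = new ArrayList<>();
    final List<Integer> tupleIds = new ArrayList<>();
    // Tuples this node adds itself (e.g. a scan tuple or an analytic output tuple).
    private final List<Integer> ownTupleIds;

    Node(List<Integer> ownTupleIds, Node... inputs) {
      this.ownTupleIds = new ArrayList<>(ownTupleIds);
      Collections.addAll(children, inputs);
      computeTupleIds();
    }

    /** Output row layout = children's tuples in child order, then this node's own. */
    void computeTupleIds() {
      tupleIds.clear();
      for (Node child : children) tupleIds.addAll(child.tupleIds);
      tupleIds.addAll(ownTupleIds);
    }
  }

  /** Refreshes tuple ids bottom-up after an in-place modification of the tree. */
  static void recomputeTupleIds(Node root) {
    for (Node child : root.children) recomputeTupleIds(child);
    root.computeTupleIds();
  }
}
```

For example, after swapping the children of a join below an exchange, the exchange still reports the old order [0, 1] until recomputeTupleIds() runs, after which it reports [1, 0].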

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/532b1fe1/fe/src/main/java/com/cloudera/impala/planner/HashJoinNode.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/com/cloudera/impala/planner/HashJoinNode.java b/fe/src/main/java/com/cloudera/impala/planner/HashJoinNode.java
index 1a69c59..906f732 100644
--- a/fe/src/main/java/com/cloudera/impala/planner/HashJoinNode.java
+++ b/fe/src/main/java/com/cloudera/impala/planner/HashJoinNode.java
@@ -49,15 +49,16 @@ import com.google.common.collect.Lists;
 public class HashJoinNode extends JoinNode {
   private final static Logger LOG = LoggerFactory.getLogger(HashJoinNode.class);
 
-  public HashJoinNode(
-      PlanNode outer, PlanNode inner, DistributionMode distrMode, JoinOperator joinOp,
+  public HashJoinNode(PlanNode outer, PlanNode inner, boolean isStraightJoin,
+      DistributionMode distrMode, JoinOperator joinOp,
       List<BinaryPredicate> eqJoinConjuncts, List<Expr> otherJoinConjuncts) {
-    super(outer, inner, distrMode, joinOp, eqJoinConjuncts, otherJoinConjuncts,
-        "HASH JOIN");
+    super(outer, inner, isStraightJoin, distrMode, joinOp, eqJoinConjuncts,
+        otherJoinConjuncts, "HASH JOIN");
     Preconditions.checkNotNull(eqJoinConjuncts);
     Preconditions.checkState(joinOp_ != JoinOperator.CROSS_JOIN);
   }
 
+  @Override
   public List<BinaryPredicate> getEqJoinConjuncts() { return eqJoinConjuncts_; }
 
   @Override
@@ -103,6 +104,7 @@ public class HashJoinNode extends JoinNode {
     }
     eqJoinConjuncts_ = newEqJoinConjuncts;
     orderJoinConjunctsByCost();
+    computeStats(analyzer);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/532b1fe1/fe/src/main/java/com/cloudera/impala/planner/HdfsScanNode.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/com/cloudera/impala/planner/HdfsScanNode.java b/fe/src/main/java/com/cloudera/impala/planner/HdfsScanNode.java
index b8337fd..31ec600 100644
--- a/fe/src/main/java/com/cloudera/impala/planner/HdfsScanNode.java
+++ b/fe/src/main/java/com/cloudera/impala/planner/HdfsScanNode.java
@@ -28,7 +28,6 @@ import org.slf4j.LoggerFactory;
 import com.cloudera.impala.analysis.Analyzer;
 import com.cloudera.impala.analysis.Expr;
 import com.cloudera.impala.analysis.SlotDescriptor;
-import com.cloudera.impala.analysis.SlotId;
 import com.cloudera.impala.analysis.TableRef;
 import com.cloudera.impala.analysis.TupleDescriptor;
 import com.cloudera.impala.analysis.TupleId;
@@ -42,7 +41,6 @@ import com.cloudera.impala.common.ImpalaException;
 import com.cloudera.impala.common.NotImplementedException;
 import com.cloudera.impala.common.PrintUtils;
 import com.cloudera.impala.common.RuntimeEnv;
-import com.cloudera.impala.planner.RuntimeFilterGenerator.RuntimeFilter;
 import com.cloudera.impala.thrift.TExplainLevel;
 import com.cloudera.impala.thrift.TExpr;
 import com.cloudera.impala.thrift.THdfsFileBlock;
@@ -391,7 +389,6 @@ public class HdfsScanNode extends ScanNode {
         totalFiles_ += p.getFileDescriptors().size();
         totalBytes_ += p.getSize();
       }
-
       if (!partitions_.isEmpty() && !hasValidPartitionCardinality) {
         // if none of the partitions knew its number of rows, we fall back on
         // the table stats

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/532b1fe1/fe/src/main/java/com/cloudera/impala/planner/JoinNode.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/com/cloudera/impala/planner/JoinNode.java b/fe/src/main/java/com/cloudera/impala/planner/JoinNode.java
index 98f2e57..ebc9b51 100644
--- a/fe/src/main/java/com/cloudera/impala/planner/JoinNode.java
+++ b/fe/src/main/java/com/cloudera/impala/planner/JoinNode.java
@@ -17,6 +17,7 @@
 
 package com.cloudera.impala.planner;
 
+import java.util.Collections;
 import java.util.List;
 
 import org.slf4j.Logger;
@@ -50,6 +51,9 @@ public abstract class JoinNode extends PlanNode {
 
   protected JoinOperator joinOp_;
 
+  // Indicates if this join originates from a query block with a straight join hint.
+  protected final boolean isStraightJoin_;
+
   // User-provided hint for the distribution mode. Set to 'NONE' if no hints were given.
   protected final DistributionMode distrModeHint_;
 
@@ -81,13 +85,28 @@ public abstract class JoinNode extends PlanNode {
     public String toString() { return description_; }
   }
 
-  public JoinNode(PlanNode outer, PlanNode inner, DistributionMode distrMode,
-      JoinOperator joinOp, List<BinaryPredicate> eqJoinConjuncts,
-      List<Expr> otherJoinConjuncts, String displayName) {
+  public JoinNode(PlanNode outer, PlanNode inner, boolean isStraightJoin,
+      DistributionMode distrMode, JoinOperator joinOp,
+      List<BinaryPredicate> eqJoinConjuncts, List<Expr> otherJoinConjuncts,
+      String displayName) {
     super(displayName);
     Preconditions.checkNotNull(otherJoinConjuncts);
-    joinOp_ = joinOp;
+    isStraightJoin_ = isStraightJoin;
     distrModeHint_ = distrMode;
+    joinOp_ = joinOp;
+    children_.add(outer);
+    children_.add(inner);
+    eqJoinConjuncts_ = eqJoinConjuncts;
+    otherJoinConjuncts_ = otherJoinConjuncts;
+    computeTupleIds();
+  }
+
+  @Override
+  public void computeTupleIds() {
+    Preconditions.checkState(children_.size() == 2);
+    clearTupleIds();
+    PlanNode outer = children_.get(0);
+    PlanNode inner = children_.get(1);
 
     // Only retain the non-semi-joined tuples of the inputs.
     switch (joinOp_) {
@@ -111,11 +130,6 @@ public abstract class JoinNode extends PlanNode {
     tblRefIds_.addAll(outer.getTblRefIds());
     tblRefIds_.addAll(inner.getTblRefIds());
 
-    otherJoinConjuncts_ = otherJoinConjuncts;
-    eqJoinConjuncts_ = eqJoinConjuncts;
-    children_.add(outer);
-    children_.add(inner);
-
     // Inherits all the nullable tuple from the children
     // Mark tuples that form the "nullable" side of the outer join as nullable.
     nullableTupleIds_.addAll(inner.getNullableTupleIds());
@@ -133,6 +147,7 @@ public abstract class JoinNode extends PlanNode {
   public JoinOperator getJoinOp() { return joinOp_; }
   public List<BinaryPredicate> getEqJoinConjuncts() { return eqJoinConjuncts_; }
   public List<Expr> getOtherJoinConjuncts() { return otherJoinConjuncts_; }
+  public boolean isStraightJoin() { return isStraightJoin_; }
   public DistributionMode getDistributionModeHint() { return distrModeHint_; }
   public DistributionMode getDistributionMode() { return distrMode_; }
   public void setDistributionMode(DistributionMode distrMode) { distrMode_ = distrMode; }
@@ -141,7 +156,10 @@ public abstract class JoinNode extends PlanNode {
 
   @Override
   public void init(Analyzer analyzer) throws ImpalaException {
-    super.init(analyzer);
+    // Do not call super.init() to defer computeStats() until all conjuncts
+    // have been collected.
+    assignConjuncts(analyzer);
+    createDefaultSmap(analyzer);
     assignedConjuncts_ = analyzer.getAssignedConjuncts();
     otherJoinConjuncts_ = Expr.substituteList(otherJoinConjuncts_,
         getCombinedChildSmap(), analyzer, false);
@@ -388,6 +406,13 @@ public abstract class JoinNode extends PlanNode {
       cardinality_ = getSemiJoinCardinality();
     } else if (joinOp_.isInnerJoin() || joinOp_.isOuterJoin()){
       cardinality_ = getJoinCardinality(analyzer);
+    } else {
+      Preconditions.checkState(joinOp_.isCrossJoin());
+      long leftCard = getChild(0).cardinality_;
+      long rightCard = getChild(1).cardinality_;
+      if (leftCard != -1 && rightCard != -1) {
+        cardinality_ = multiplyCardinalities(leftCard, rightCard);
+      }
     }
 
     // Impose lower/upper bounds on the cardinality based on the join type.
@@ -453,6 +478,21 @@ public abstract class JoinNode extends PlanNode {
     LOG.debug("stats Join: cardinality=" + Long.toString(cardinality_));
   }
 
+  /**
+   * Inverts the join op, swaps our children, and swaps the children
+   * of all eqJoinConjuncts_. All modifications are in place.
+   */
+  public void invertJoin() {
+    joinOp_ = joinOp_.invert();
+    Collections.swap(children_, 0, 1);
+    for (BinaryPredicate p: eqJoinConjuncts_) p.reverse();
+  }
+
+  public boolean hasConjuncts() {
+    return !eqJoinConjuncts_.isEmpty() || !otherJoinConjuncts_.isEmpty() ||
+        !conjuncts_.isEmpty();
+  }
+
   @Override
   protected String getDisplayLabelDetail() {
     StringBuilder output = new StringBuilder(joinOp_.toString());
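JoinNode.invertJoin() relies on a simple fact: swapping a join's inputs while inverting its operator leaves the result unchanged up to column order. For example, A LEFT OUTER JOIN B returns the same rows as B RIGHT OUTER JOIN A. The brute-force checker below is a hypothetical helper, not Impala code. It evaluates a naive equi-join on small integer inputs and compares both forms after swapping the output columns back. That column swap corresponds to the output tuple order changing, which the new TODO in blocking-join-node.h refers to.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/** Hypothetical brute-force check that join inversion preserves results. */
public class JoinInversionCheck {
  enum Op {
    INNER, LEFT_OUTER, RIGHT_OUTER, LEFT_SEMI, RIGHT_SEMI;

    Op invert() {
      switch (this) {
        case LEFT_OUTER: return RIGHT_OUTER;
        case RIGHT_OUTER: return LEFT_OUTER;
        case LEFT_SEMI: return RIGHT_SEMI;
        case RIGHT_SEMI: return LEFT_SEMI;
        default: return this;  // INNER is symmetric
      }
    }
  }

  /** Naive equi-join; rows are "l|r" (NULL on the padded side) or one value for semi joins. */
  static List<String> join(Op op, int[] left, int[] right) {
    List<String> rows = new ArrayList<>();
    boolean[] rightMatched = new boolean[right.length];
    boolean emitsPairs = op == Op.INNER || op == Op.LEFT_OUTER || op == Op.RIGHT_OUTER;
    for (int l : left) {
      boolean matched = false;
      for (int j = 0; j < right.length; ++j) {
        if (l != right[j]) continue;
        matched = true;
        rightMatched[j] = true;
        if (emitsPairs) rows.add(l + "|" + right[j]);
      }
      if (op == Op.LEFT_SEMI && matched) rows.add(String.valueOf(l));
      if (op == Op.LEFT_OUTER && !matched) rows.add(l + "|NULL");
    }
    for (int j = 0; j < right.length; ++j) {
      if (op == Op.RIGHT_SEMI && rightMatched[j]) rows.add(String.valueOf(right[j]));
      if (op == Op.RIGHT_OUTER && !rightMatched[j]) rows.add("NULL|" + right[j]);
    }
    Collections.sort(rows);  // compare as multisets
    return rows;
  }

  /** Swaps the two columns of each "x|y" row; single-column rows are unchanged. */
  static List<String> swapColumns(List<String> rows) {
    List<String> out = new ArrayList<>();
    for (String row : rows) {
      int bar = row.indexOf('|');
      out.add(bar < 0 ? row : row.substring(bar + 1) + "|" + row.substring(0, bar));
    }
    Collections.sort(out);
    return out;
  }

  /** True if "a op b" equals "b op.invert() a" after restoring the column order. */
  static boolean inversionPreservesResult(Op op, int[] a, int[] b) {
    return join(op, a, b).equals(swapColumns(join(op.invert(), b, a)));
  }
}
```

For instance, inversionPreservesResult(Op.LEFT_OUTER, new int[]{1, 2, 2}, new int[]{2, 3}) is true. Swapping the inputs of that LEFT_OUTER join without inverting the operator, however, changes the result.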

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/532b1fe1/fe/src/main/java/com/cloudera/impala/planner/NestedLoopJoinNode.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/com/cloudera/impala/planner/NestedLoopJoinNode.java b/fe/src/main/java/com/cloudera/impala/planner/NestedLoopJoinNode.java
index 7a0c4f6..e989438 100644
--- a/fe/src/main/java/com/cloudera/impala/planner/NestedLoopJoinNode.java
+++ b/fe/src/main/java/com/cloudera/impala/planner/NestedLoopJoinNode.java
@@ -52,10 +52,11 @@ import com.google.common.base.Preconditions;
 public class NestedLoopJoinNode extends JoinNode {
   private final static Logger LOG = LoggerFactory.getLogger(NestedLoopJoinNode.class);
 
-  public NestedLoopJoinNode(PlanNode outer, PlanNode inner, DistributionMode distrMode,
-      JoinOperator joinOp, List<Expr> otherJoinConjuncts) {
-    super(outer, inner, distrMode, joinOp, Collections.<BinaryPredicate>emptyList(),
-        otherJoinConjuncts, "NESTED LOOP JOIN");
+  public NestedLoopJoinNode(PlanNode outer, PlanNode inner, boolean isStraightJoin,
+      DistributionMode distrMode, JoinOperator joinOp, List<Expr> otherJoinConjuncts) {
+    super(outer, inner, isStraightJoin, distrMode, joinOp,
+        Collections.<BinaryPredicate>emptyList(), otherJoinConjuncts,
+        "NESTED LOOP JOIN");
   }
 
   @Override
@@ -71,6 +72,7 @@ public class NestedLoopJoinNode extends JoinNode {
       joinOp_ = JoinOperator.INNER_JOIN;
     }
     orderJoinConjunctsByCost();
+    computeStats(analyzer);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/532b1fe1/fe/src/main/java/com/cloudera/impala/planner/PlanNode.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/com/cloudera/impala/planner/PlanNode.java b/fe/src/main/java/com/cloudera/impala/planner/PlanNode.java
index 3eae642..d38f10a 100644
--- a/fe/src/main/java/com/cloudera/impala/planner/PlanNode.java
+++ b/fe/src/main/java/com/cloudera/impala/planner/PlanNode.java
@@ -123,27 +123,17 @@ abstract public class PlanNode extends TreeNode<PlanNode> {
   // Runtime filters assigned to this node.
   protected List<RuntimeFilter> runtimeFilters_ = Lists.newArrayList();
 
-  protected PlanNode(PlanNodeId id, ArrayList<TupleId> tupleIds, String displayName) {
-    id_ = id;
-    limit_ = -1;
-    // make a copy, just to be on the safe side
-    tupleIds_ = Lists.newArrayList(tupleIds);
-    tblRefIds_ = Lists.newArrayList(tupleIds);
-    cardinality_ = -1;
-    numNodes_ = -1;
-    displayName_ = displayName;
+  protected PlanNode(PlanNodeId id, List<TupleId> tupleIds, String displayName) {
+    this(id, displayName);
+    tupleIds_.addAll(tupleIds);
+    tblRefIds_.addAll(tupleIds);
   }
 
   /**
    * Deferred id_ assignment.
    */
   protected PlanNode(String displayName) {
-    limit_ = -1;
-    tupleIds_ = Lists.newArrayList();
-    tblRefIds_ = Lists.newArrayList();
-    cardinality_ = -1;
-    numNodes_ = -1;
-    displayName_ = displayName;
+    this(null, displayName);
   }
 
   protected PlanNode(PlanNodeId id, String displayName) {
@@ -171,6 +161,23 @@ abstract public class PlanNode extends TreeNode<PlanNode> {
     displayName_ = displayName;
   }
 
+  /**
+   * Sets tblRefIds_, tupleIds_, and nullableTupleIds_.
+   * The default implementation is a no-op.
+   */
+  public void computeTupleIds() {
+    Preconditions.checkState(children_.isEmpty() || !tupleIds_.isEmpty());
+  }
+
+  /**
+   * Clears tblRefIds_, tupleIds_, and nullableTupleIds_.
+   */
+  protected void clearTupleIds() {
+    tblRefIds_.clear();
+    tupleIds_.clear();
+    nullableTupleIds_.clear();
+  }
+
   public PlanNodeId getId() { return id_; }
   public void setId(PlanNodeId id) {
     Preconditions.checkState(id_ == null);
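The PlanNode change replaces three copies of the field initialization with constructor chaining into the (id, displayName) constructor, and adds an overridable computeTupleIds() hook plus a clearTupleIds() helper. The sketch below models that shape. PlanNodeSketch and LeafNodeSketch are hypothetical names, plain Integers stand in for PlanNodeId and TupleId, and the Preconditions check of the real default hook is omitted.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, stripped-down model of the refactored PlanNode constructors.
abstract class PlanNodeSketch {
  protected Integer id_;  // null means "assign the id later"
  protected final String displayName_;
  protected final List<Integer> tupleIds_ = new ArrayList<>();
  protected final List<Integer> tblRefIds_ = new ArrayList<>();
  protected final List<Integer> nullableTupleIds_ = new ArrayList<>();

  // The one constructor that actually initializes fields.
  protected PlanNodeSketch(Integer id, String displayName) {
    id_ = id;
    displayName_ = displayName;
  }

  // Deferred id assignment simply delegates with a null id.
  protected PlanNodeSketch(String displayName) { this(null, displayName); }

  // Nodes with fixed tuple ids delegate, then append (copy) the given ids.
  protected PlanNodeSketch(Integer id, List<Integer> tupleIds, String displayName) {
    this(id, displayName);
    tupleIds_.addAll(tupleIds);
    tblRefIds_.addAll(tupleIds);
  }

  // Default hook: nothing to recompute for nodes whose ids are fixed at construction.
  public void computeTupleIds() {}

  protected void clearTupleIds() {
    tblRefIds_.clear();
    tupleIds_.clear();
    nullableTupleIds_.clear();
  }
}

// A leaf node, e.g. a scan, that only needs the base-class constructors.
final class LeafNodeSketch extends PlanNodeSketch {
  LeafNodeSketch(Integer id, List<Integer> tupleIds) { super(id, tupleIds, "SCAN"); }
  LeafNodeSketch(String displayName) { super(displayName); }
}
```

Because the id lists are now final fields filled with addAll() rather than reassigned, subclasses can no longer alias a child's list by accident. The old SelectNode constructor did exactly that with `this.tblRefIds_ = child.tblRefIds_`.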

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/532b1fe1/fe/src/main/java/com/cloudera/impala/planner/Planner.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/com/cloudera/impala/planner/Planner.java b/fe/src/main/java/com/cloudera/impala/planner/Planner.java
index f639c59..4d1087b 100644
--- a/fe/src/main/java/com/cloudera/impala/planner/Planner.java
+++ b/fe/src/main/java/com/cloudera/impala/planner/Planner.java
@@ -4,24 +4,26 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 
-import com.cloudera.impala.analysis.ExprSubstitutionMap;
-import com.cloudera.impala.analysis.QueryStmt;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import com.cloudera.impala.analysis.AnalysisContext;
+import com.cloudera.impala.analysis.Analyzer;
 import com.cloudera.impala.analysis.ColumnLineageGraph;
 import com.cloudera.impala.analysis.Expr;
+import com.cloudera.impala.analysis.ExprSubstitutionMap;
 import com.cloudera.impala.analysis.InsertStmt;
+import com.cloudera.impala.analysis.JoinOperator;
+import com.cloudera.impala.analysis.QueryStmt;
 import com.cloudera.impala.catalog.HBaseTable;
 import com.cloudera.impala.catalog.Table;
 import com.cloudera.impala.common.ImpalaException;
 import com.cloudera.impala.common.PrintUtils;
 import com.cloudera.impala.common.RuntimeEnv;
 import com.cloudera.impala.thrift.TExplainLevel;
-import com.cloudera.impala.thrift.TRuntimeFilterMode;
 import com.cloudera.impala.thrift.TQueryCtx;
 import com.cloudera.impala.thrift.TQueryExecRequest;
+import com.cloudera.impala.thrift.TRuntimeFilterMode;
 import com.cloudera.impala.thrift.TTableName;
 import com.cloudera.impala.util.MaxRowsProcessedVisitor;
 import com.google.common.base.Joiner;
@@ -76,8 +78,16 @@ public class Planner {
         // Only one scanner thread for small queries
         ctx_.getQueryOptions().setNum_scanner_threads(1);
       }
-    } else if (
-      ctx_.getQueryOptions().getRuntime_filter_mode() != TRuntimeFilterMode.OFF) {
+      // disable runtime filters
+      ctx_.getQueryOptions().setRuntime_filter_mode(TRuntimeFilterMode.OFF);
+    }
+
+    // Join rewrites.
+    invertJoins(singleNodePlan, ctx_.isSingleNodeExec());
+    singleNodePlan = useNljForSingularRowBuilds(singleNodePlan, ctx_.getRootAnalyzer());
+
+    // create runtime filters
+    if (ctx_.getQueryOptions().getRuntime_filter_mode() != TRuntimeFilterMode.OFF) {
       // Always compute filters, even if the BE won't always use all of them.
       RuntimeFilterGenerator.generateRuntimeFilters(ctx_.getRootAnalyzer(),
           singleNodePlan, ctx_.getQueryOptions().getMax_num_runtime_filters());
@@ -328,4 +338,102 @@ public class Planner {
     LOG.debug("Estimated per-host peak memory requirement: " + maxPerHostMem);
     LOG.debug("Estimated per-host virtual cores requirement: " + maxPerHostVcores);
   }
+
+  /**
+   * Traverses the plan tree rooted at 'root' and inverts outer and semi joins
+   * in the following situations:
+   * 1. If the left-hand side is a SingularRowSrcNode then we invert the join because
+   *    then the build side is guaranteed to have only a single row.
+   * 2. There is no backend support for distributed non-equi right outer/semi joins,
+   *    so we invert them (any distributed left semi/outer join is ok).
+   * 3. Invert semi/outer joins if the right-hand side is estimated to have a higher
+   *    cardinality*avgSerializedSize. Do not invert if relevant stats are missing.
+   * The first two inversion rules are independent of the presence/absence of stats.
+   * Left Null Aware Anti Joins are never inverted due to lack of backend support.
+   * Joins that originate from query blocks with a straight join hint are not inverted.
+   * The 'isLocalPlan' parameter indicates whether the plan tree rooted at 'root'
+   * will be executed locally within one machine, i.e., without any data exchanges.
+   */
+  private void invertJoins(PlanNode root, boolean isLocalPlan) {
+    if (root instanceof SubplanNode) {
+      invertJoins(root.getChild(0), isLocalPlan);
+      invertJoins(root.getChild(1), true);
+    } else {
+      for (PlanNode child: root.getChildren()) invertJoins(child, isLocalPlan);
+    }
+
+    if (root instanceof JoinNode) {
+      JoinNode joinNode = (JoinNode) root;
+      JoinOperator joinOp = joinNode.getJoinOp();
+
+      // 1. No inversion allowed due to straight join.
+      // 2. The null-aware left anti-join operator is not considered for inversion.
+      //    There is no backend support for a null-aware right anti-join because
+      //    we cannot execute it efficiently.
+      if (joinNode.isStraightJoin() || joinOp.isNullAwareLeftAntiJoin()) {
+        // Re-compute tuple ids since their order must correspond to the order of children.
+        root.computeTupleIds();
+        return;
+      }
+
+      if (joinNode.getChild(0) instanceof SingularRowSrcNode) {
+        // Always place a singular row src on the build side because it
+        // only produces a single row.
+        joinNode.invertJoin();
+      } else if (!isLocalPlan && joinNode instanceof NestedLoopJoinNode &&
+          (joinOp.isRightSemiJoin() || joinOp.isRightOuterJoin())) {
+        // The current join is a distributed non-equi right outer or semi join
+        // which has no backend support. Invert the join to make it executable.
+        joinNode.invertJoin();
+      } else {
+        // Invert the join if doing so reduces the size of the materialized rhs
+        // (may also reduce network costs depending on the join strategy).
+        // Only consider this optimization if both the lhs/rhs cardinalities are known.
+        long lhsCard = joinNode.getChild(0).getCardinality();
+        long rhsCard = joinNode.getChild(1).getCardinality();
+        float lhsAvgRowSize = joinNode.getChild(0).getAvgRowSize();
+        float rhsAvgRowSize = joinNode.getChild(1).getAvgRowSize();
+        if (lhsCard != -1 && rhsCard != -1 &&
+            lhsCard * lhsAvgRowSize < rhsCard * rhsAvgRowSize &&
+            // TODO: Do not invert inner joins. Relax this restriction.
+            !(joinOp.isInnerJoin() && joinNode.hasConjuncts())) {
+          joinNode.invertJoin();
+        }
+      }
+    }
+
+    // Re-compute tuple ids because the backend assumes that their order corresponds to
+    // the order of children.
+    root.computeTupleIds();
+  }
+
+  /**
+   * Converts hash joins to nested-loop joins if the right-side is a SingularRowSrcNode.
+   * Does not convert Null Aware Anti Joins because we only support that join op with
+   * a hash join.
+   * Throws if JoinNode.init() fails on the new nested-loop join node.
+   */
+  private PlanNode useNljForSingularRowBuilds(PlanNode root, Analyzer analyzer)
+      throws ImpalaException {
+    for (int i = 0; i < root.getChildren().size(); ++i) {
+      root.setChild(i, useNljForSingularRowBuilds(root.getChild(i), analyzer));
+    }
+    if (!(root instanceof JoinNode)) return root;
+    if (root instanceof NestedLoopJoinNode) return root;
+    if (!(root.getChild(1) instanceof SingularRowSrcNode)) return root;
+    JoinNode joinNode = (JoinNode) root;
+    if (joinNode.getJoinOp().isNullAwareLeftAntiJoin()) {
+      Preconditions.checkState(joinNode instanceof HashJoinNode);
+      return root;
+    }
+    List<Expr> otherJoinConjuncts = Lists.newArrayList(joinNode.getOtherJoinConjuncts());
+    otherJoinConjuncts.addAll(joinNode.getEqJoinConjuncts());
+    JoinNode newJoinNode = new NestedLoopJoinNode(joinNode.getChild(0), joinNode.getChild(1),
+        joinNode.isStraightJoin(), joinNode.getDistributionModeHint(),
+        joinNode.getJoinOp(), otherJoinConjuncts);
+    newJoinNode.getConjuncts().addAll(joinNode.getConjuncts());
+    newJoinNode.setId(joinNode.getId());
+    newJoinNode.init(analyzer);
+    return newJoinNode;
+  }
 }
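The per-node decision in invertJoins() is a pure function of the join op, a few flags and the two children's stats. The sketch below restates it in that form so the rule ordering is easy to see. InvertDecisionSketch, Op, Input and shouldInvert() are hypothetical names, and the flags stand in for the instanceof checks on NestedLoopJoinNode and SingularRowSrcNode. As in the patch, rules 1 and 2 do not look at stats, while rule 3 is skipped when either cardinality is unknown (-1) and is never applied to an inner join that has conjuncts.

```java
// Hypothetical restatement of the inversion rules in Planner.invertJoins().
final class InvertDecisionSketch {
  enum Op { INNER, LEFT_OUTER, RIGHT_OUTER, LEFT_SEMI, RIGHT_SEMI, NULL_AWARE_LEFT_ANTI }

  static final class Input {
    final long cardinality;  // -1 means unknown (missing stats)
    final float avgRowSize;
    final boolean isSingularRowSrc;

    Input(long cardinality, float avgRowSize, boolean isSingularRowSrc) {
      this.cardinality = cardinality;
      this.avgRowSize = avgRowSize;
      this.isSingularRowSrc = isSingularRowSrc;
    }
  }

  static boolean shouldInvert(Op op, boolean isStraightJoin, boolean isNestedLoopJoin,
      boolean isLocalPlan, boolean hasConjuncts, Input lhs, Input rhs) {
    // Straight-join hint, or no backend support for a null-aware right anti join.
    if (isStraightJoin || op == Op.NULL_AWARE_LEFT_ANTI) return false;
    // Rule 1: a singular row src always goes to the build (right) side.
    if (lhs.isSingularRowSrc) return true;
    // Rule 2: distributed non-equi right outer/semi joins cannot be executed.
    if (!isLocalPlan && isNestedLoopJoin && (op == Op.RIGHT_OUTER || op == Op.RIGHT_SEMI)) {
      return true;
    }
    // Rule 3: cost-based, only when both cardinalities are known.
    if (lhs.cardinality == -1 || rhs.cardinality == -1) return false;
    boolean rhsIsBigger =
        lhs.cardinality * lhs.avgRowSize < rhs.cardinality * rhs.avgRowSize;
    return rhsIsBigger && !(op == Op.INNER && hasConjuncts);
  }
}
```

For example, a LEFT OUTER JOIN whose lhs is 10 rows of 8 bytes and whose rhs is 1000 rows of 8 bytes is inverted, so the smaller input becomes the build side.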

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/532b1fe1/fe/src/main/java/com/cloudera/impala/planner/SelectNode.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/com/cloudera/impala/planner/SelectNode.java b/fe/src/main/java/com/cloudera/impala/planner/SelectNode.java
index 44355be..b418224 100644
--- a/fe/src/main/java/com/cloudera/impala/planner/SelectNode.java
+++ b/fe/src/main/java/com/cloudera/impala/planner/SelectNode.java
@@ -36,11 +36,18 @@ public class SelectNode extends PlanNode {
   private final static Logger LOG = LoggerFactory.getLogger(SelectNode.class);
 
   protected SelectNode(PlanNodeId id, PlanNode child, List<Expr> conjuncts) {
-    super(id, child.getTupleIds(), "SELECT");
+    super(id, "SELECT");
     addChild(child);
-    this.tblRefIds_ = child.tblRefIds_;
-    this.nullableTupleIds_ = child.nullableTupleIds_;
     conjuncts_.addAll(conjuncts);
+    computeTupleIds();
+  }
+
+  @Override
+  public void computeTupleIds() {
+    clearTupleIds();
+    tblRefIds_.addAll(getChild(0).getTblRefIds());
+    tupleIds_.addAll(getChild(0).getTupleIds());
+    nullableTupleIds_.addAll(getChild(0).getNullableTupleIds());
   }
 
   @Override
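SelectNode now derives its ids from its child in computeTupleIds() instead of copying them once in the constructor. This matters because invertJoins() swaps join children after the plan is built: a join's tuple ids follow the order of its children, so they, and the ids of every node above them, must be recomputed bottom-up. The sketch below illustrates that with a hypothetical toy tree. Node, Select, Join and recomputeAll() are illustrative names, and a join's ids are modeled simply as the concatenation of its children's ids.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical model of why tuple ids are recomputed after swapping join children.
final class TupleIdOrderSketch {
  static class Node {
    final List<Node> children = new ArrayList<>();
    final List<Integer> tupleIds = new ArrayList<>();

    Node() {}
    Node(List<Integer> ids) { tupleIds.addAll(ids); }  // leaf with fixed ids

    void computeTupleIds() {}  // leaves keep their fixed ids
  }

  // Like SelectNode: copies the ids of its single child.
  static final class Select extends Node {
    Select(Node child) { children.add(child); computeTupleIds(); }

    @Override void computeTupleIds() {
      tupleIds.clear();
      tupleIds.addAll(children.get(0).tupleIds);
    }
  }

  // A join's ids are its children's ids, in child order.
  static final class Join extends Node {
    Join(Node lhs, Node rhs) { children.add(lhs); children.add(rhs); computeTupleIds(); }

    @Override void computeTupleIds() {
      tupleIds.clear();
      for (Node c : children) tupleIds.addAll(c.tupleIds);
    }

    void swapChildren() { Collections.swap(children, 0, 1); }
  }

  // Post-order: a node is recomputed only after all of its children,
  // matching the order in which invertJoins() calls computeTupleIds().
  static void recomputeAll(Node root) {
    for (Node c : root.children) recomputeAll(c);
    root.computeTupleIds();
  }
}
```

Right after swapChildren() the join and the Select above it still report the stale order [1, 2, 3]; after recomputeAll() both report [2, 3, 1].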

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/532b1fe1/fe/src/main/java/com/cloudera/impala/planner/SingleNodePlanner.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/com/cloudera/impala/planner/SingleNodePlanner.java b/fe/src/main/java/com/cloudera/impala/planner/SingleNodePlanner.java
index 71794ae..2212d35 100644
--- a/fe/src/main/java/com/cloudera/impala/planner/SingleNodePlanner.java
+++ b/fe/src/main/java/com/cloudera/impala/planner/SingleNodePlanner.java
@@ -150,9 +150,7 @@ public class SingleNodePlanner {
   /**
    * Validates a single-node plan by checking that it does not contain right or
    * full outer joins with no equi-join conjuncts that are not inside the right child
-   * of a SubplanNode. Throws an InternalException if plan validation fails.
-   *
-   * TODO for 2.3: Temporary solution; the planner should avoid generating invalid plans.
+   * of a SubplanNode. Throws a NotImplementedException if plan validation fails.
    */
   public void validatePlan(PlanNode planNode) throws NotImplementedException {
     if (planNode instanceof NestedLoopJoinNode) {
@@ -343,18 +341,10 @@ public class SingleNodePlanner {
       TableRef ref = entry.first;
       JoinOperator joinOp = ref.getJoinOp();
 
-      // The rhs table of an outer/semi join can appear as the left-most input if we
-      // invert the lhs/rhs and the join op. However, we may only consider this inversion
-      // for the very first join in refPlans, otherwise we could reorder tables/joins
-      // across outer/semi joins which is generally incorrect. The null-aware
-      // left anti-join operator is never considered for inversion because we can't
-      // execute the null-aware right anti-join efficiently.
+      // Avoid reordering outer/semi joins which is generally incorrect.
       // TODO: Allow the rhs of any cross join as the leftmost table. This needs careful
       // consideration of the joinOps that result from such a re-ordering (IMPALA-1281).
-      if (((joinOp.isOuterJoin() || joinOp.isSemiJoin() || joinOp.isCrossJoin()) &&
-          ref != parentRefPlans.get(1).first) || joinOp.isNullAwareLeftAntiJoin()) {
-        continue;
-      }
+      if (joinOp.isOuterJoin() || joinOp.isSemiJoin() || joinOp.isCrossJoin()) continue;
 
       PlanNode plan = entry.second;
       if (plan.getCardinality() == -1) {
@@ -395,7 +385,6 @@ public class SingleNodePlanner {
    * are in decreasing order of selectiveness (percentage of rows they eliminate).
    * Creates and adds subplan nodes as soon as the tuple ids required by at least one
    * subplan ref are materialized by a join node added during plan generation.
-   * The leftmostRef's join will be inverted if it is an outer/semi/cross join.
    */
   private PlanNode createJoinPlan(Analyzer analyzer, TableRef leftmostRef,
       List<Pair<TableRef, PlanNode>> refPlans, List<SubplanRef> subplanRefs)
@@ -414,22 +403,6 @@ public class SingleNodePlanner {
     }
     Preconditions.checkNotNull(root);
 
-    // If the leftmostTblRef is an outer/semi/cross join, we must invert it.
-    boolean planHasInvertedJoin = false;
-    if (leftmostRef.getJoinOp().isOuterJoin()
-        || leftmostRef.getJoinOp().isSemiJoin()
-        || leftmostRef.getJoinOp().isCrossJoin()) {
-      // TODO: Revisit the interaction of join inversion here and the analysis state
-      // that is changed in analyzer.invertOuterJoin(). Changing the analysis state
-      // should not be necessary because the semantics of an inverted outer join do
-      // not change.
-      leftmostRef.invertJoin(refPlans, analyzer);
-      planHasInvertedJoin = true;
-      // Avoid swapping the refPlan elements in-place.
-      refPlans = Lists.newArrayList(refPlans);
-      Collections.swap(refPlans, 0, 1);
-    }
-
     // Maps from a TableRef in refPlans with an outer/semi join op to the set of
     // TableRefs that precede it refPlans (i.e., in FROM-clause order).
     // The map is used to place outer/semi joins at a fixed position in the plan tree
@@ -470,37 +443,8 @@ public class SingleNodePlanner {
           if (!requiredRefs.equals(joinedRefs)) break;
         }
 
-        PlanNode rhsPlan = entry.second;
-        boolean invertJoin = false;
-        if (joinOp.isOuterJoin() || joinOp.isSemiJoin() || joinOp.isCrossJoin()) {
-          // Invert the join if doing so reduces the size of build-side hash table
-          // (may also reduce network costs depending on the join strategy).
-          // Only consider this optimization if both the lhs/rhs cardinalities are known.
-          // The null-aware left anti-join operator is never considered for inversion
-          // because we can't execute the null-aware right anti-join efficiently.
-          long lhsCard = root.getCardinality();
-          long rhsCard = rhsPlan.getCardinality();
-          if (lhsCard != -1 && rhsCard != -1 &&
-              lhsCard * root.getAvgRowSize() < rhsCard * rhsPlan.getAvgRowSize() &&
-              !joinOp.isNullAwareLeftAntiJoin()) {
-            invertJoin = true;
-          }
-        }
-        // Always place singular row src nodes on the build side, unless we need a
-        // null-aware left anti join which cannot be inverted.
-        if (root instanceof SingularRowSrcNode && !joinOp.isNullAwareLeftAntiJoin()) {
-          invertJoin = true;
-        }
-
         analyzer.setAssignedConjuncts(root.getAssignedConjuncts());
-        PlanNode candidate = null;
-        if (invertJoin) {
-          ref.setJoinOp(ref.getJoinOp().invert());
-          candidate = createJoinNode(analyzer, rhsPlan, root, ref, null);
-          planHasInvertedJoin = true;
-        } else {
-          candidate = createJoinNode(analyzer, root, rhsPlan, null, ref);
-        }
+        PlanNode candidate = createJoinNode(root, entry.second, ref, analyzer);
         if (candidate == null) continue;
         LOG.trace("cardinality=" + Long.toString(candidate.getCardinality()));
 
@@ -524,14 +468,7 @@ public class SingleNodePlanner {
         }
       }
       if (newRoot == null) {
-        // Currently, it should not be possible to invert a join for a plan that turns
-        // out to be non-executable because (1) the joins we consider for inversion are
-        // barriers in the join order, and (2) the caller of this function only considers
-        // other leftmost table refs if a plan turns out to be non-executable.
-        // TODO: This preconditions check will need to be changed to undo the in-place
-        // modifications made to table refs for join inversion, if the caller decides to
-        // explore more leftmost table refs.
-        Preconditions.checkState(!planHasInvertedJoin);
+        // Could not generate a valid plan.
         return null;
       }
 
@@ -552,7 +489,7 @@ public class SingleNodePlanner {
       // TODO: Once we have stats on nested collections, we should consider the join
       // order in conjunction with the placement of SubplanNodes, i.e., move the creation
       // of SubplanNodes into the join-ordering loop above.
-      root = createSubplan(root, subplanRefs, false, false, analyzer);
+      root = createSubplan(root, subplanRefs, false, analyzer);
       // assign node ids after running through the possible choices in order to end up
       // with a dense sequence of node ids
       if (root instanceof SubplanNode) root.getChild(0).setId(ctx_.getNextNodeId());
@@ -577,8 +514,8 @@ public class SingleNodePlanner {
     for (int i = 1; i < parentRefPlans.size(); ++i) {
       TableRef innerRef = parentRefPlans.get(i).first;
       PlanNode innerPlan = parentRefPlans.get(i).second;
-      root = createJoinNode(analyzer, root, innerPlan, null, innerRef);
-      if (root != null) root = createSubplan(root, subplanRefs, true, false, analyzer);
+      root = createJoinNode(root, innerPlan, innerRef, analyzer);
+      if (root != null) root = createSubplan(root, subplanRefs, false, analyzer);
       if (root instanceof SubplanNode) root.getChild(0).setId(ctx_.getNextNodeId());
       root.setId(ctx_.getNextNodeId());
     }
@@ -641,13 +578,12 @@ public class SingleNodePlanner {
 
     // Separate table refs into parent refs (uncorrelated or absolute) and
     // subplan refs (correlated or relative), and generate their plan.
-    boolean isStraightJoin = selectStmt.getSelectList().isStraightJoin();
     List<TableRef> parentRefs = Lists.newArrayList();
     List<SubplanRef> subplanRefs = Lists.newArrayList();
     computeParentAndSubplanRefs(
-        selectStmt.getTableRefs(), isStraightJoin, parentRefs, subplanRefs);
-    PlanNode root = createTableRefsPlan(parentRefs, subplanRefs, isStraightJoin,
-        fastPartitionKeyScans, analyzer);
+        selectStmt.getTableRefs(), analyzer.isStraightJoin(), parentRefs, subplanRefs);
+    PlanNode root = createTableRefsPlan(parentRefs, subplanRefs, fastPartitionKeyScans,
+        analyzer);
     // add aggregation, if any
     if (aggInfo != null) root = createAggregationPlan(selectStmt, analyzer, root);
 
@@ -799,17 +735,16 @@ public class SingleNodePlanner {
    * metadata instead of table scans.
    */
   private PlanNode createTableRefsPlan(List<TableRef> parentRefs,
-      List<SubplanRef> subplanRefs, boolean isStraightJoin,
-      boolean fastPartitionKeyScans, Analyzer analyzer) throws ImpalaException {
+      List<SubplanRef> subplanRefs, boolean fastPartitionKeyScans,
+      Analyzer analyzer) throws ImpalaException {
     // create plans for our table refs; use a list here instead of a map to
     // maintain a deterministic order of traversing the TableRefs during join
     // plan generation (helps with tests)
     List<Pair<TableRef, PlanNode>> parentRefPlans = Lists.newArrayList();
     for (TableRef ref: parentRefs) {
-      PlanNode root =
-          createTableRefNode(ref, isStraightJoin, fastPartitionKeyScans, analyzer);
+      PlanNode root = createTableRefNode(ref, fastPartitionKeyScans, analyzer);
       Preconditions.checkNotNull(root);
-      root = createSubplan(root, subplanRefs, isStraightJoin, true, analyzer);
+      root = createSubplan(root, subplanRefs, true, analyzer);
       parentRefPlans.add(new Pair<TableRef, PlanNode>(ref, root));
     }
     // save state of conjunct assignment; needed for join plan generation
@@ -818,7 +753,7 @@ public class SingleNodePlanner {
     }
 
     PlanNode root = null;
-    if (!isStraightJoin) {
+    if (!analyzer.isStraightJoin()) {
       Set<ExprId> assignedConjuncts = analyzer.getAssignedConjuncts();
       root = createCheapestJoinPlan(analyzer, parentRefPlans, subplanRefs);
       // If createCheapestJoinPlan() failed to produce an executable plan, then we need
@@ -826,7 +761,7 @@ public class SingleNodePlanner {
       // to not incorrectly miss conjuncts.
       if (root == null) analyzer.setAssignedConjuncts(assignedConjuncts);
     }
-    if (isStraightJoin || root == null) {
+    if (analyzer.isStraightJoin() || root == null) {
       // we didn't have enough stats to do a cost-based join plan, or the STRAIGHT_JOIN
       // keyword was in the select list: use the FROM clause order instead
       root = createFromClauseJoinPlan(analyzer, parentRefPlans, subplanRefs);
@@ -856,8 +791,7 @@ public class SingleNodePlanner {
    *   the SubplanNode generated here
    */
   private PlanNode createSubplan(PlanNode root, List<SubplanRef> subplanRefs,
-      boolean isStraightJoin, boolean assignId, Analyzer analyzer)
-      throws ImpalaException {
+      boolean assignId, Analyzer analyzer) throws ImpalaException {
     Preconditions.checkNotNull(root);
     List<TableRef> applicableRefs = extractApplicableRefs(root, subplanRefs);
     if (applicableRefs.isEmpty()) return root;
@@ -878,8 +812,7 @@ public class SingleNodePlanner {
     // their containing SubplanNode. Also, further plan generation relies on knowing
     // whether we are in a subplan context or not (see computeParentAndSubplanRefs()).
     ctx_.pushSubplan(subplanNode);
-    PlanNode subplan =
-        createTableRefsPlan(applicableRefs, subplanRefs, isStraightJoin, false, analyzer);
+    PlanNode subplan = createTableRefsPlan(applicableRefs, subplanRefs, false, analyzer);
     ctx_.popSubplan();
     subplanNode.setSubplan(subplan);
     subplanNode.init(analyzer);
@@ -1445,55 +1378,31 @@ public class SingleNodePlanner {
   }
 
   /**
-   * Create a node to join outer with inner. Either the outer or the inner may be a plan
-   * created from a table ref (but not both), and the corresponding outer/innerRef
-   * should be non-null.
-   * Throws if the JoinNode.init() failed, or the required physical join implementation
-   * is missing, e.g., an outer/semi join without equi conjuncts.
+   * Creates a new node to join outer with inner. Collects and assigns join conjuncts
+   * as well as regular conjuncts. Calls init() on the new join node.
+   * Throws if the JoinNode.init() fails.
    */
-  private PlanNode createJoinNode(
-      Analyzer analyzer, PlanNode outer, PlanNode inner, TableRef outerRef,
-      TableRef innerRef) throws ImpalaException {
-    Preconditions.checkState(innerRef != null ^ outerRef != null);
-    TableRef tblRef = (innerRef != null) ? innerRef : outerRef;
-
+  private PlanNode createJoinNode(PlanNode outer, PlanNode inner,
+      TableRef innerRef, Analyzer analyzer) throws ImpalaException {
     // get eq join predicates for the TableRefs' ids (not the PlanNodes' ids, which
     // are materialized)
-    List<BinaryPredicate> eqJoinConjuncts = Collections.emptyList();
-    if (innerRef != null) {
-      eqJoinConjuncts = getHashLookupJoinConjuncts(
-          outer.getTblRefIds(), inner.getTblRefIds(), analyzer);
-      // Outer joins should only use On-clause predicates as eqJoinConjuncts.
-      if (!innerRef.getJoinOp().isOuterJoin()) {
-        analyzer.createEquivConjuncts(outer.getTblRefIds(), inner.getTblRefIds(),
-            eqJoinConjuncts);
-      }
-    } else {
-      eqJoinConjuncts = getHashLookupJoinConjuncts(
-          inner.getTblRefIds(), outer.getTblRefIds(), analyzer);
-      // Outer joins should only use On-clause predicates as eqJoinConjuncts.
-      if (!outerRef.getJoinOp().isOuterJoin()) {
-        analyzer.createEquivConjuncts(inner.getTblRefIds(), outer.getTblRefIds(),
-            eqJoinConjuncts);
-      }
-      // Reverse the lhs/rhs of the join conjuncts.
-      for (BinaryPredicate eqJoinConjunct: eqJoinConjuncts) {
-        Expr swapTmp = eqJoinConjunct.getChild(0);
-        eqJoinConjunct.setChild(0, eqJoinConjunct.getChild(1));
-        eqJoinConjunct.setChild(1, swapTmp);
-      }
+    List<BinaryPredicate> eqJoinConjuncts = getHashLookupJoinConjuncts(
+        outer.getTblRefIds(), inner.getTblRefIds(), analyzer);
+    // Outer joins should only use On-clause predicates as eqJoinConjuncts.
+    if (!innerRef.getJoinOp().isOuterJoin()) {
+      analyzer.createEquivConjuncts(outer.getTblRefIds(), inner.getTblRefIds(),
+          eqJoinConjuncts);
     }
-
-    if (!eqJoinConjuncts.isEmpty() && tblRef.getJoinOp() == JoinOperator.CROSS_JOIN) {
-      tblRef.setJoinOp(JoinOperator.INNER_JOIN);
+    if (!eqJoinConjuncts.isEmpty() && innerRef.getJoinOp() == JoinOperator.CROSS_JOIN) {
+      innerRef.setJoinOp(JoinOperator.INNER_JOIN);
     }
 
     List<Expr> otherJoinConjuncts = Lists.newArrayList();
-    if (tblRef.getJoinOp().isOuterJoin()) {
+    if (innerRef.getJoinOp().isOuterJoin()) {
       // Also assign conjuncts from On clause. All remaining unassigned conjuncts
       // that can be evaluated by this join are assigned in createSelectPlan().
-      otherJoinConjuncts = analyzer.getUnassignedOjConjuncts(tblRef);
-    } else if (tblRef.getJoinOp().isSemiJoin()) {
+      otherJoinConjuncts = analyzer.getUnassignedOjConjuncts(innerRef);
+    } else if (innerRef.getJoinOp().isSemiJoin()) {
       // Unassigned conjuncts bound by the invisible tuple id of a semi join must have
       // come from the join's On-clause, and therefore, must be added to the other join
       // conjuncts to produce correct results.
@@ -1502,7 +1411,7 @@ public class SingleNodePlanner {
       List<TupleId> tblRefIds = Lists.newArrayList(outer.getTblRefIds());
       tblRefIds.addAll(inner.getTblRefIds());
       otherJoinConjuncts = analyzer.getUnassignedConjuncts(tblRefIds, false);
-      if (tblRef.getJoinOp().isNullAwareLeftAntiJoin()) {
+      if (innerRef.getJoinOp().isNullAwareLeftAntiJoin()) {
         boolean hasNullMatchingEqOperator = false;
         // Keep only the null-matching eq conjunct in the eqJoinConjuncts and move
         // all the others in otherJoinConjuncts. The BE relies on this
@@ -1528,15 +1437,16 @@ public class SingleNodePlanner {
     // (build side) is a singular row src. A singular row src has a cardinality of 1, so
     // a nested-loop join is certainly cheaper than a hash join.
     JoinNode result = null;
-    Preconditions.checkState(!tblRef.getJoinOp().isNullAwareLeftAntiJoin()
+    Preconditions.checkState(!innerRef.getJoinOp().isNullAwareLeftAntiJoin()
         || !(inner instanceof SingularRowSrcNode));
     if (eqJoinConjuncts.isEmpty() || inner instanceof SingularRowSrcNode) {
       otherJoinConjuncts.addAll(eqJoinConjuncts);
-      result = new NestedLoopJoinNode(outer, inner, tblRef.getDistributionMode(),
-          tblRef.getJoinOp(), otherJoinConjuncts);
+      result = new NestedLoopJoinNode(outer, inner, analyzer.isStraightJoin(),
+          innerRef.getDistributionMode(), innerRef.getJoinOp(), otherJoinConjuncts);
     } else {
-      result = new HashJoinNode(outer, inner, tblRef.getDistributionMode(),
-          tblRef.getJoinOp(), eqJoinConjuncts, otherJoinConjuncts);
+      result = new HashJoinNode(outer, inner, analyzer.isStraightJoin(),
+          innerRef.getDistributionMode(), innerRef.getJoinOp(), eqJoinConjuncts,
+          otherJoinConjuncts);
     }
     result.init(analyzer);
     return result;
@@ -1553,8 +1463,8 @@ public class SingleNodePlanner {
    * Throws if a PlanNode.init() failed or if planning of the given
    * table ref is not implemented.
    */
-  private PlanNode createTableRefNode(TableRef tblRef, boolean isStraightJoin,
-      boolean fastPartitionKeyScans, Analyzer analyzer) throws ImpalaException {
+  private PlanNode createTableRefNode(TableRef tblRef, boolean fastPartitionKeyScans,
+      Analyzer analyzer) throws ImpalaException {
     PlanNode result = null;
     if (tblRef instanceof BaseTableRef) {
       result = createScanNode(tblRef, fastPartitionKeyScans, analyzer);
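Because join inversion now happens after createJoinNode(), a singular row src can end up on the build side of a hash join only after the single-node plan exists. Planner.useNljForSingularRowBuilds() handles this with a post-order rewrite. It converts children first, then replaces a hash join whose right child is a SingularRowSrcNode with a nested-loop join and folds the equi-join conjuncts into the other join conjuncts. The null-aware left anti join is left alone. The toy model below shows only that tree rewrite. All class names are hypothetical, string conjuncts stand in for Expr objects, and node ids, conjunct copying and init() are omitted.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical tree model of the hash-join to nested-loop-join rewrite.
final class NljRewriteSketch {
  static class Node {
    final List<Node> children = new ArrayList<>();
  }

  static final class Scan extends Node {}
  static final class SingularRowSrc extends Node {}

  static class Join extends Node {
    final boolean nullAwareLeftAnti;
    final List<String> eqConjuncts = new ArrayList<>();
    final List<String> otherConjuncts = new ArrayList<>();

    Join(Node lhs, Node rhs, boolean nullAwareLeftAnti) {
      children.add(lhs);
      children.add(rhs);
      this.nullAwareLeftAnti = nullAwareLeftAnti;
    }
  }

  static final class HashJoin extends Join {
    HashJoin(Node lhs, Node rhs, boolean naaj) { super(lhs, rhs, naaj); }
  }

  static final class NestedLoopJoin extends Join {
    NestedLoopJoin(Node lhs, Node rhs, boolean naaj) { super(lhs, rhs, naaj); }
  }

  static Node rewrite(Node root) {
    // Rewrite the children first so nested joins are converted bottom-up.
    for (int i = 0; i < root.children.size(); ++i) {
      root.children.set(i, rewrite(root.children.get(i)));
    }
    if (!(root instanceof HashJoin)) return root;
    HashJoin hj = (HashJoin) root;
    if (!(hj.children.get(1) instanceof SingularRowSrc)) return root;
    // The null-aware left anti join is only supported as a hash join.
    if (hj.nullAwareLeftAnti) return root;
    NestedLoopJoin nlj = new NestedLoopJoin(hj.children.get(0), hj.children.get(1), false);
    // A nested-loop join has no eq conjuncts: evaluate them as other join conjuncts.
    nlj.otherConjuncts.addAll(hj.otherConjuncts);
    nlj.otherConjuncts.addAll(hj.eqConjuncts);
    return nlj;
  }
}
```

Returning the (possibly new) node and having the caller store it with set() mirrors the `root.setChild(i, ...)` pattern in the patch, which avoids needing parent pointers.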

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/532b1fe1/fe/src/main/java/com/cloudera/impala/planner/SingularRowSrcNode.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/com/cloudera/impala/planner/SingularRowSrcNode.java b/fe/src/main/java/com/cloudera/impala/planner/SingularRowSrcNode.java
index f940635..88b3d7d 100644
--- a/fe/src/main/java/com/cloudera/impala/planner/SingularRowSrcNode.java
+++ b/fe/src/main/java/com/cloudera/impala/planner/SingularRowSrcNode.java
@@ -23,7 +23,6 @@ import com.cloudera.impala.thrift.TExplainLevel;
 import com.cloudera.impala.thrift.TPlanNode;
 import com.cloudera.impala.thrift.TPlanNodeType;
 import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
 
 /**
  * A SingularRowSrcNode returns the current row that is being processed by its
@@ -36,10 +35,16 @@ public class SingularRowSrcNode extends PlanNode {
 
   protected SingularRowSrcNode(PlanNodeId id, SubplanNode containingSubplanNode) {
     super(id, "SINGULAR ROW SRC");
-    tupleIds_ = Lists.newArrayList(containingSubplanNode.getChild(0).tupleIds_);
-    tblRefIds_ = Lists.newArrayList(containingSubplanNode.getChild(0).tblRefIds_);
-    nullableTupleIds_.addAll(containingSubplanNode.getChild(0).getNullableTupleIds());
     containingSubplanNode_ = containingSubplanNode;
+    computeTupleIds();
+  }
+
+  @Override
+  public void computeTupleIds() {
+    clearTupleIds();
+    tupleIds_.addAll(containingSubplanNode_.getChild(0).getTupleIds());
+    tblRefIds_.addAll(containingSubplanNode_.getChild(0).getTblRefIds());
+    nullableTupleIds_.addAll(containingSubplanNode_.getChild(0).getNullableTupleIds());
   }
 
   @Override

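The SingularRowSrcNode and SubplanNode changes above stop copying tuple ids once in the constructor. Instead, each node overrides computeTupleIds(), which clears the ids and re-derives them from the node's current inputs. Presumably this lets a later pass over the single-node plan, such as join inversion, refresh derived ids after it rewires children. The sketch below uses hypothetical, simplified classes to show why re-deriving matters: swapping a join's inputs changes the order of its output tuple ids. Node, Scan, Join and Join.invert() are illustrative names, not Impala's API.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified stand-in for a plan node; not Impala's PlanNode.
abstract class Node {
  protected final List<Integer> tupleIds = new ArrayList<>();
  protected final List<Node> children = new ArrayList<>();

  // Drops previously derived ids so they can be rebuilt from scratch.
  protected void clearTupleIds() { tupleIds.clear(); }

  // Re-derives this node's tuple ids from its current inputs.
  public abstract void computeTupleIds();

  public List<Integer> getTupleIds() { return tupleIds; }
}

// A leaf whose single tuple id is fixed at construction.
class Scan extends Node {
  Scan(int tupleId) { tupleIds.add(tupleId); }

  @Override
  public void computeTupleIds() { /* nothing to derive for a leaf */ }
}

// A binary join whose output carries its children's tuples in child order.
class Join extends Node {
  Join(Node lhs, Node rhs) {
    children.add(lhs);
    children.add(rhs);
    computeTupleIds();
  }

  @Override
  public void computeTupleIds() {
    clearTupleIds();
    for (Node child : children) tupleIds.addAll(child.getTupleIds());
  }

  // Hypothetical inversion step: swap the inputs, then refresh the derived
  // ids so they match the new child order.
  void invert() {
    Node lhs = children.get(0);
    children.set(0, children.get(1));
    children.set(1, lhs);
    computeTupleIds();
  }
}
```

For example, a Join over Scan(0) and Scan(1) reports tuple ids [0, 1]. After invert() it reports [1, 0]. Without the computeTupleIds() call, the node would keep the stale order.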
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/532b1fe1/fe/src/main/java/com/cloudera/impala/planner/SortNode.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/com/cloudera/impala/planner/SortNode.java b/fe/src/main/java/com/cloudera/impala/planner/SortNode.java
index 68c447d..02f0df6 100644
--- a/fe/src/main/java/com/cloudera/impala/planner/SortNode.java
+++ b/fe/src/main/java/com/cloudera/impala/planner/SortNode.java
@@ -67,7 +67,7 @@ public class SortNode extends PlanNode {
 
   public SortNode(PlanNodeId id, PlanNode input, SortInfo info, boolean useTopN,
       long offset) {
-    super(id, Lists.newArrayList(info.getSortTupleDescriptor().getId()),
+    super(id, info.getSortTupleDescriptor().getId().asList(),
         getDisplayName(useTopN, false));
     info_ = info;
     useTopN_ = useTopN;

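SortNode, and likewise UnionNode and UnnestNode, now build their single-tuple id lists with asList() instead of Lists.newArrayList(...). The implementation of asList() is not part of this excerpt. The following is only a guess at its shape, using a hypothetical TupleIdSketch class. It returns a fresh mutable list rather than an immutable singleton such as Collections.singletonList(), on the assumption that plan nodes may later clear and refill their tuple id lists, as the new computeTupleIds() overrides do via clearTupleIds().

```java
import java.util.ArrayList;

// Hypothetical id type used only for illustration; Impala's TupleId and its
// asList() may be implemented differently.
final class TupleIdSketch {
  private final int id;

  TupleIdSketch(int id) { this.id = id; }

  int asInt() { return id; }

  // Returns a new, mutable list containing only this id. Each call returns
  // its own copy, so clearing or extending it never affects other callers.
  ArrayList<TupleIdSketch> asList() {
    ArrayList<TupleIdSketch> list = new ArrayList<>();
    list.add(this);
    return list;
  }
}
```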
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/532b1fe1/fe/src/main/java/com/cloudera/impala/planner/SubplanNode.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/com/cloudera/impala/planner/SubplanNode.java b/fe/src/main/java/com/cloudera/impala/planner/SubplanNode.java
index bbc7d78..4512268 100644
--- a/fe/src/main/java/com/cloudera/impala/planner/SubplanNode.java
+++ b/fe/src/main/java/com/cloudera/impala/planner/SubplanNode.java
@@ -35,6 +35,8 @@ import com.google.common.base.Preconditions;
  * evaluate any conjuncts.
  */
 public class SubplanNode extends PlanNode {
+  private PlanNode subplan_;
+
   public SubplanNode(PlanNode input) {
     super("SUBPLAN");
     children_.add(input);
@@ -48,10 +50,18 @@ public class SubplanNode extends PlanNode {
    */
   public void setSubplan(PlanNode subplan) {
     Preconditions.checkState(children_.size() == 1);
+    subplan_ = subplan;
     children_.add(subplan);
-    tblRefIds_.addAll(subplan.getTblRefIds());
-    tupleIds_.addAll(subplan.getTupleIds());
-    nullableTupleIds_.addAll(subplan.getNullableTupleIds());
+    computeTupleIds();
+  }
+
+  @Override
+  public void computeTupleIds() {
+    Preconditions.checkNotNull(subplan_);
+    clearTupleIds();
+    tblRefIds_.addAll(subplan_.getTblRefIds());
+    tupleIds_.addAll(subplan_.getTupleIds());
+    nullableTupleIds_.addAll(subplan_.getNullableTupleIds());
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/532b1fe1/fe/src/main/java/com/cloudera/impala/planner/UnionNode.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/com/cloudera/impala/planner/UnionNode.java b/fe/src/main/java/com/cloudera/impala/planner/UnionNode.java
index e056b46..ef67277 100644
--- a/fe/src/main/java/com/cloudera/impala/planner/UnionNode.java
+++ b/fe/src/main/java/com/cloudera/impala/planner/UnionNode.java
@@ -62,7 +62,7 @@ public class UnionNode extends PlanNode {
   protected final TupleId tupleId_;
 
   protected UnionNode(PlanNodeId id, TupleId tupleId) {
-    super(id, Lists.newArrayList(tupleId), "UNION");
+    super(id, tupleId.asList(), "UNION");
     tupleId_ = tupleId;
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/532b1fe1/fe/src/main/java/com/cloudera/impala/planner/UnnestNode.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/com/cloudera/impala/planner/UnnestNode.java b/fe/src/main/java/com/cloudera/impala/planner/UnnestNode.java
index b403527..3a45882 100644
--- a/fe/src/main/java/com/cloudera/impala/planner/UnnestNode.java
+++ b/fe/src/main/java/com/cloudera/impala/planner/UnnestNode.java
@@ -27,7 +27,6 @@ import com.cloudera.impala.thrift.TPlanNodeType;
 import com.cloudera.impala.thrift.TUnnestNode;
 import com.google.common.base.Joiner;
 import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
 
 /**
  * An UnnestNode scans over a collection materialized in memory, and returns
@@ -41,7 +40,7 @@ public class UnnestNode extends PlanNode {
 
   public UnnestNode(PlanNodeId id, SubplanNode containingSubplanNode,
       CollectionTableRef tblRef) {
-    super(id, Lists.newArrayList(tblRef.getDesc().getId()), "UNNEST");
+    super(id, tblRef.getDesc().getId().asList(), "UNNEST");
     containingSubplanNode_ = containingSubplanNode;
     tblRef_ = tblRef;
     collectionExpr_ = tblRef_.getCollectionExpr();

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/532b1fe1/testdata/workloads/functional-planner/queries/PlannerTest/implicit-joins.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/implicit-joins.test b/testdata/workloads/functional-planner/queries/PlannerTest/implicit-joins.test
index 7a9f9a2..1b8a973 100644
--- a/testdata/workloads/functional-planner/queries/PlannerTest/implicit-joins.test
+++ b/testdata/workloads/functional-planner/queries/PlannerTest/implicit-joins.test
@@ -178,20 +178,20 @@ where a.id = d.id and b.id = c.id
 |
 05:NESTED LOOP JOIN [CROSS JOIN]
 |
-|--00:SCAN HDFS [functional.alltypestiny a]
-|     partitions=4/4 files=4 size=460B
-|     runtime filters: RF000 -> a.id
-|
-04:HASH JOIN [INNER JOIN]
-|  hash predicates: b.id = c.id
-|  runtime filters: RF001 <- c.id
-|
-|--02:SCAN HDFS [functional.alltypestiny c]
-|     partitions=4/4 files=4 size=460B
+|--04:HASH JOIN [INNER JOIN]
+|  |  hash predicates: b.id = c.id
+|  |  runtime filters: RF001 <- c.id
+|  |
+|  |--02:SCAN HDFS [functional.alltypestiny c]
+|  |     partitions=4/4 files=4 size=460B
+|  |
+|  01:SCAN HDFS [functional.alltypes b]
+|     partitions=24/24 files=24 size=478.45KB
+|     runtime filters: RF001 -> b.id
 |
-01:SCAN HDFS [functional.alltypes b]
-   partitions=24/24 files=24 size=478.45KB
-   runtime filters: RF001 -> b.id
+00:SCAN HDFS [functional.alltypestiny a]
+   partitions=4/4 files=4 size=460B
+   runtime filters: RF000 -> a.id
 ====
 # Do not allow a non-equi outer join
 select count(*)

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/532b1fe1/testdata/workloads/functional-planner/queries/PlannerTest/join-order.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/join-order.test b/testdata/workloads/functional-planner/queries/PlannerTest/join-order.test
index d43b81f..460e61c 100644
--- a/testdata/workloads/functional-planner/queries/PlannerTest/join-order.test
+++ b/testdata/workloads/functional-planner/queries/PlannerTest/join-order.test
@@ -843,24 +843,24 @@ left semi join functional.alltypesagg t4 on (t3.id = t4.id)
 inner join functional.alltypes t5 on (t3.id = t5.id)
 right join functional.alltypestiny t6 on (t5.id = t6.id)
 ---- PLAN
-11:AGGREGATE [FINALIZE]
+13:AGGREGATE [FINALIZE]
 |  output: count(*)
 |
-10:HASH JOIN [LEFT OUTER JOIN]
+12:HASH JOIN [LEFT OUTER JOIN]
 |  hash predicates: t6.id = t5.id
 |
-|--09:HASH JOIN [INNER JOIN]
+|--11:HASH JOIN [INNER JOIN]
 |  |  hash predicates: t3.id = t5.id
 |  |  runtime filters: RF000 <- t5.id
 |  |
 |  |--04:SCAN HDFS [functional.alltypes t5]
 |  |     partitions=24/24 files=24 size=478.45KB
 |  |
-|  08:HASH JOIN [RIGHT SEMI JOIN]
+|  10:HASH JOIN [RIGHT SEMI JOIN]
 |  |  hash predicates: t4.id = t3.id
 |  |  runtime filters: RF001 <- t3.id
 |  |
-|  |--07:HASH JOIN [INNER JOIN]
+|  |--09:HASH JOIN [INNER JOIN]
 |  |  |  hash predicates: t2.id = t3.id
 |  |  |  runtime filters: RF002 <- t3.id
 |  |  |
@@ -868,7 +868,7 @@ right join functional.alltypestiny t6 on (t5.id = t6.id)
 |  |  |     partitions=4/4 files=4 size=6.32KB
 |  |  |     runtime filters: RF000 -> t3.id
 |  |  |
-|  |  06:HASH JOIN [RIGHT OUTER JOIN]
+|  |  08:HASH JOIN [RIGHT OUTER JOIN]
 |  |  |  hash predicates: t2.id = t1.id
 |  |  |  runtime filters: RF003 <- t1.id
 |  |  |
@@ -896,21 +896,21 @@ inner join functional.alltypessmall t4 on (t3.id = t4.id)
 left semi join functional.alltypes t5 on (t4.id = t5.id)
 inner join functional.alltypestiny t6 on (t3.id = t6.id)
 ---- PLAN
-12:AGGREGATE [FINALIZE]
+13:AGGREGATE [FINALIZE]
 |  output: count(*)
 |
-11:HASH JOIN [INNER JOIN]
+12:HASH JOIN [INNER JOIN]
 |  hash predicates: t3.id = t6.id
 |  runtime filters: RF000 <- t6.id
 |
 |--05:SCAN HDFS [functional.alltypestiny t6]
 |     partitions=4/4 files=4 size=460B
 |
-10:HASH JOIN [RIGHT SEMI JOIN]
+11:HASH JOIN [RIGHT SEMI JOIN]
 |  hash predicates: t5.id = t4.id
 |  runtime filters: RF001 <- t4.id
 |
-|--09:HASH JOIN [INNER JOIN]
+|--10:HASH JOIN [INNER JOIN]
 |  |  hash predicates: t2.id = t3.id
 |  |  runtime filters: RF002 <- t3.id
 |  |
@@ -918,7 +918,7 @@ inner join functional.alltypestiny t6 on (t3.id = t6.id)
 |  |     partitions=11/11 files=11 size=814.73KB
 |  |     runtime filters: RF000 -> t3.id
 |  |
-|  08:HASH JOIN [INNER JOIN]
+|  09:HASH JOIN [INNER JOIN]
 |  |  hash predicates: t2.id = t4.id
 |  |  runtime filters: RF003 <- t4.id
 |  |
@@ -926,7 +926,7 @@ inner join functional.alltypestiny t6 on (t3.id = t6.id)
 |  |     partitions=4/4 files=4 size=6.32KB
 |  |     runtime filters: RF000 -> t4.id, RF002 -> t4.id
 |  |
-|  07:HASH JOIN [RIGHT OUTER JOIN]
+|  08:HASH JOIN [RIGHT OUTER JOIN]
 |  |  hash predicates: t2.id = t1.id
 |  |  runtime filters: RF004 <- t1.id
 |  |
@@ -950,20 +950,20 @@ inner join functional.alltypessmall t4 on (t3.id = t4.id)
 left anti join functional.alltypes t5 on (t4.id = t5.id)
 inner join functional.alltypestiny t6 on (t3.id = t6.id)
 ---- PLAN
-12:AGGREGATE [FINALIZE]
+13:AGGREGATE [FINALIZE]
 |  output: count(*)
 |
-11:HASH JOIN [INNER JOIN]
+12:HASH JOIN [INNER JOIN]
 |  hash predicates: t3.id = t6.id
 |  runtime filters: RF000 <- t6.id
 |
 |--05:SCAN HDFS [functional.alltypestiny t6]
 |     partitions=4/4 files=4 size=460B
 |
-10:HASH JOIN [RIGHT ANTI JOIN]
+11:HASH JOIN [RIGHT ANTI JOIN]
 |  hash predicates: t5.id = t4.id
 |
-|--09:HASH JOIN [INNER JOIN]
+|--10:HASH JOIN [INNER JOIN]
 |  |  hash predicates: t2.id = t3.id
 |  |  runtime filters: RF001 <- t3.id
 |  |
@@ -971,7 +971,7 @@ inner join functional.alltypestiny t6 on (t3.id = t6.id)
 |  |     partitions=11/11 files=11 size=814.73KB
 |  |     runtime filters: RF000 -> t3.id
 |  |
-|  08:HASH JOIN [INNER JOIN]
+|  09:HASH JOIN [INNER JOIN]
 |  |  hash predicates: t2.id = t4.id
 |  |  runtime filters: RF002 <- t4.id
 |  |
@@ -979,7 +979,7 @@ inner join functional.alltypestiny t6 on (t3.id = t6.id)
 |  |     partitions=4/4 files=4 size=6.32KB
 |  |     runtime filters: RF000 -> t4.id, RF001 -> t4.id
 |  |
-|  07:HASH JOIN [RIGHT OUTER JOIN]
+|  08:HASH JOIN [RIGHT OUTER JOIN]
 |  |  hash predicates: t2.id = t1.id
 |  |  runtime filters: RF003 <- t1.id
 |  |
@@ -1498,4 +1498,4 @@ and a.timestamp_col <=> now()
    partitions=24/24 files=24 size=478.45KB
    predicates: date_string_col IS NOT DISTINCT FROM ''
    runtime filters: RF000 -> functional.alltypes.id
-====
\ No newline at end of file
+====

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/532b1fe1/testdata/workloads/functional-planner/queries/PlannerTest/joins.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/joins.test b/testdata/workloads/functional-planner/queries/PlannerTest/joins.test
index f14363f..3946688 100644
--- a/testdata/workloads/functional-planner/queries/PlannerTest/joins.test
+++ b/testdata/workloads/functional-planner/queries/PlannerTest/joins.test
@@ -1536,19 +1536,19 @@ where c.id != b.id
 04:NESTED LOOP JOIN [INNER JOIN]
 |  predicates: c.id != b.id
 |
-|--03:HASH JOIN [INNER JOIN]
-|  |  hash predicates: a.id = b.id
-|  |  runtime filters: RF000 <- b.id
-|  |
-|  |--01:SCAN HDFS [functional.alltypestiny b]
-|  |     partitions=4/4 files=4 size=460B
-|  |
-|  00:SCAN HDFS [functional.alltypestiny a]
+|--02:SCAN HDFS [functional.alltypes c]
+|     partitions=24/24 files=24 size=478.45KB
+|
+03:HASH JOIN [INNER JOIN]
+|  hash predicates: a.id = b.id
+|  runtime filters: RF000 <- b.id
+|
+|--01:SCAN HDFS [functional.alltypestiny b]
 |     partitions=4/4 files=4 size=460B
-|     runtime filters: RF000 -> a.id
 |
-02:SCAN HDFS [functional.alltypes c]
-   partitions=24/24 files=24 size=478.45KB
+00:SCAN HDFS [functional.alltypestiny a]
+   partitions=4/4 files=4 size=460B
+   runtime filters: RF000 -> a.id
 ====
 # Tests the generation of a distributed plan in which the input fragments
 # of a join node have compatible but not the same number of partitioning exprs with
@@ -2018,33 +2018,33 @@ left anti join functional.alltypesagg e
   on c.string_col != e.string_col
 where a.id < 10
 ---- PLAN
-08:NESTED LOOP JOIN [LEFT ANTI JOIN]
+08:NESTED LOOP JOIN [RIGHT ANTI JOIN]
 |  join predicates: c.string_col != e.string_col
 |
-|--04:SCAN HDFS [functional.alltypesagg e]
-|     partitions=11/11 files=11 size=814.73KB
-|
-07:NESTED LOOP JOIN [LEFT SEMI JOIN]
-|  join predicates: b.bigint_col > d.bigint_col
-|
-|--03:SCAN HDFS [functional.alltypesagg d]
-|     partitions=11/11 files=11 size=814.73KB
-|
-06:NESTED LOOP JOIN [RIGHT OUTER JOIN]
-|  join predicates: a.int_col != c.int_col OR a.tinyint_col > c.tinyint_col
-|
-|--05:NESTED LOOP JOIN [INNER JOIN]
-|  |  predicates: a.id < b.id
+|--07:NESTED LOOP JOIN [RIGHT SEMI JOIN]
+|  |  join predicates: b.bigint_col > d.bigint_col
 |  |
-|  |--00:SCAN HDFS [functional.alltypestiny a]
-|  |     partitions=4/4 files=4 size=460B
-|  |     predicates: a.id < 10
+|  |--06:NESTED LOOP JOIN [RIGHT OUTER JOIN]
+|  |  |  join predicates: a.int_col != c.int_col OR a.tinyint_col > c.tinyint_col
+|  |  |
+|  |  |--05:NESTED LOOP JOIN [INNER JOIN]
+|  |  |  |  predicates: a.id < b.id
+|  |  |  |
+|  |  |  |--00:SCAN HDFS [functional.alltypestiny a]
+|  |  |  |     partitions=4/4 files=4 size=460B
+|  |  |  |     predicates: a.id < 10
+|  |  |  |
+|  |  |  01:SCAN HDFS [functional.alltypessmall b]
+|  |  |     partitions=4/4 files=4 size=6.32KB
+|  |  |
+|  |  02:SCAN HDFS [functional.alltypes c]
+|  |     partitions=24/24 files=24 size=478.45KB
 |  |
-|  01:SCAN HDFS [functional.alltypessmall b]
-|     partitions=4/4 files=4 size=6.32KB
+|  03:SCAN HDFS [functional.alltypesagg d]
+|     partitions=11/11 files=11 size=814.73KB
 |
-02:SCAN HDFS [functional.alltypes c]
-   partitions=24/24 files=24 size=478.45KB
+04:SCAN HDFS [functional.alltypesagg e]
+   partitions=11/11 files=11 size=814.73KB
 ====
 # Regression test for IMPALA-2495: Crash: impala::InPredicate::SetLookupPrepare
 select count(id) from functional.alltypestiny t1

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/532b1fe1/testdata/workloads/functional-planner/queries/PlannerTest/nested-collections.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/nested-collections.test b/testdata/workloads/functional-planner/queries/PlannerTest/nested-collections.test
index a8cbc3c..d900fc0 100644
--- a/testdata/workloads/functional-planner/queries/PlannerTest/nested-collections.test
+++ b/testdata/workloads/functional-planner/queries/PlannerTest/nested-collections.test
@@ -83,7 +83,7 @@ where c_nationkey = n_nationkey and s_nationkey = n_nationkey
 |     runtime filters: RF000 -> s.s_nationkey, RF001 -> s_comment
 |
 05:SCAN HDFS [tpch_nested_parquet.customer c]
-   partitions=1/1 files=4 size=554.13MB
+   partitions=1/1 files=4 size=577.87MB
    runtime filters: RF000 -> c_nationkey, RF001 -> c.c_comment, RF002 -> c.c_nationkey, RF003 -> c_comment
 ====
 # Test subplans: Cross join of parent and relative ref.
@@ -584,17 +584,17 @@ select 1 from functional.allcomplextypes a
 |  |
 |  |--05:UNNEST [a.struct_array_col d]
 |  |
-|  08:HASH JOIN [INNER JOIN]
-|  |  hash predicates: a.year = c.value
+|  08:NESTED LOOP JOIN [INNER JOIN]
+|  |  predicates: (a.id < 1 OR b.item > 2)
 |  |
-|  |--04:UNNEST [a.int_map_col c]
+|  |--03:UNNEST [a.int_array_col b]
 |  |
 |  07:NESTED LOOP JOIN [INNER JOIN]
-|  |  predicates: (a.id < 1 OR b.item > 2)
+|  |  join predicates: c.value = a.year
 |  |
 |  |--02:SINGULAR ROW SRC
 |  |
-|  03:UNNEST [a.int_array_col b]
+|  04:UNNEST [a.int_map_col c]
 |
 00:SCAN HDFS [functional.allcomplextypes a]
    partitions=0/0 files=0 size=0B
@@ -1101,8 +1101,8 @@ where b.id = d.value
 |  11:UNNEST [a.struct_array_col x]
 |
 16:HASH JOIN [INNER JOIN]
-|  hash predicates: c.item = d.value
-|  runtime filters: RF000 <- d.value
+|  hash predicates: a.id = b.id
+|  runtime filters: RF000 <- b.id
 |
 |--06:SUBPLAN
 |  |
@@ -1209,7 +1209,7 @@ limit 10
 |  03:UNNEST [c.c_orders o]
 |
 00:SCAN HDFS [tpch_nested_parquet.customer c]
-   partitions=1/1 files=4 size=554.13MB
+   partitions=1/1 files=4 size=577.87MB
    predicates: c_custkey < 10, !empty(c.c_orders)
    predicates on o: !empty(o.o_lineitems), o_orderkey < 5
    predicates on o_lineitems: l_linenumber < 3
@@ -1306,7 +1306,7 @@ where c.c_custkey = o.o_orderkey and c.c_custkey = o.o_shippriority
 |  03:UNNEST [c.c_orders o]
 |
 00:SCAN HDFS [tpch_nested_parquet.customer c]
-   partitions=1/1 files=4 size=554.13MB
+   partitions=1/1 files=4 size=577.87MB
    predicates: c.c_custkey = c.c_nationkey, !empty(c.c_orders)
    predicates on o: !empty(o.o_lineitems), o.o_orderkey = o.o_shippriority
    predicates on l: l.l_partkey = l.l_linenumber, l.l_partkey = l.l_suppkey
@@ -1787,5 +1787,5 @@ where o.o_orderkey is null and o.o_orderstatus <=> o_orderpriority
 |  03:UNNEST [c.c_orders o]
 |
 00:SCAN HDFS [tpch_nested_parquet.customer c]
-   partitions=1/1 files=4 size=554.13MB
+   partitions=1/1 files=4 size=577.87MB
 ====

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/532b1fe1/testdata/workloads/functional-planner/queries/PlannerTest/nested-loop-join.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/nested-loop-join.test b/testdata/workloads/functional-planner/queries/PlannerTest/nested-loop-join.test
index 17be43a..c7f9830 100644
--- a/testdata/workloads/functional-planner/queries/PlannerTest/nested-loop-join.test
+++ b/testdata/workloads/functional-planner/queries/PlannerTest/nested-loop-join.test
@@ -149,3 +149,56 @@ where e.id < 10
 ---- DISTRIBUTEDPLAN
 not implemented: Error generating a valid execution plan for this query. A RIGHT ANTI JOIN type with no equi-join predicates can only be executed with a single node plan.
 ====
+# Right semi and outer joins are inverted to make them executable.
+# Same query as above but without the straight join hint.
+select count(*)
+from functional.alltypestiny a inner join functional.alltypessmall b on a.id < b.id
+right outer join functional.alltypesagg c on a.int_col != c.int_col
+right semi join functional.alltypes d on c.tinyint_col < d.tinyint_col
+right anti join functional.alltypesnopart e on d.tinyint_col > e.tinyint_col
+where e.id < 10
+---- DISTRIBUTEDPLAN
+15:AGGREGATE [FINALIZE]
+|  output: count:merge(*)
+|
+14:EXCHANGE [UNPARTITIONED]
+|
+09:AGGREGATE
+|  output: count(*)
+|
+08:NESTED LOOP JOIN [LEFT ANTI JOIN, BROADCAST]
+|  join predicates: d.tinyint_col > e.tinyint_col
+|
+|--13:EXCHANGE [BROADCAST]
+|  |
+|  07:NESTED LOOP JOIN [LEFT SEMI JOIN, BROADCAST]
+|  |  join predicates: c.tinyint_col < d.tinyint_col
+|  |
+|  |--12:EXCHANGE [BROADCAST]
+|  |  |
+|  |  06:NESTED LOOP JOIN [LEFT OUTER JOIN, BROADCAST]
+|  |  |  join predicates: a.int_col != c.int_col
+|  |  |
+|  |  |--11:EXCHANGE [BROADCAST]
+|  |  |  |
+|  |  |  05:NESTED LOOP JOIN [INNER JOIN, BROADCAST]
+|  |  |  |  predicates: a.id < b.id
+|  |  |  |
+|  |  |  |--10:EXCHANGE [BROADCAST]
+|  |  |  |  |
+|  |  |  |  00:SCAN HDFS [functional.alltypestiny a]
+|  |  |  |     partitions=4/4 files=4 size=460B
+|  |  |  |
+|  |  |  01:SCAN HDFS [functional.alltypessmall b]
+|  |  |     partitions=4/4 files=4 size=6.32KB
+|  |  |
+|  |  02:SCAN HDFS [functional.alltypesagg c]
+|  |     partitions=11/11 files=11 size=814.73KB
+|  |
+|  03:SCAN HDFS [functional.alltypes d]
+|     partitions=24/24 files=24 size=478.45KB
+|
+04:SCAN HDFS [functional.alltypesnopart e]
+   partitions=1/1 files=0 size=0B
+   predicates: e.id < 10
+====
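The new nested-loop-join test expects each RIGHT OUTER/SEMI/ANTI join in the query to appear in the distributed plan as the corresponding LEFT variant with its inputs swapped. For example, node 08 becomes a LEFT ANTI JOIN whose probe side is the scan of e. A minimal sketch of that operator mapping follows. JoinOp, invert() and InversionSketch are hypothetical names chosen for illustration, not necessarily Impala's classes. The rule that only the right variants are inverted for distributed nested-loop joins is a simplification drawn from the commit message and the error text in the preceding test case.

```java
// Hypothetical, simplified join operator; Impala's real enum may differ.
enum JoinOp {
  INNER_JOIN, CROSS_JOIN,
  LEFT_OUTER_JOIN, RIGHT_OUTER_JOIN, FULL_OUTER_JOIN,
  LEFT_SEMI_JOIN, RIGHT_SEMI_JOIN,
  LEFT_ANTI_JOIN, RIGHT_ANTI_JOIN;

  // The operator that produces the same result once the two inputs are swapped.
  JoinOp invert() {
    switch (this) {
      case LEFT_OUTER_JOIN: return RIGHT_OUTER_JOIN;
      case RIGHT_OUTER_JOIN: return LEFT_OUTER_JOIN;
      case LEFT_SEMI_JOIN: return RIGHT_SEMI_JOIN;
      case RIGHT_SEMI_JOIN: return LEFT_SEMI_JOIN;
      case LEFT_ANTI_JOIN: return RIGHT_ANTI_JOIN;
      case RIGHT_ANTI_JOIN: return LEFT_ANTI_JOIN;
      default: return this; // inner, cross and full outer are symmetric
    }
  }

  boolean isRightVariant() {
    return this == RIGHT_OUTER_JOIN || this == RIGHT_SEMI_JOIN
        || this == RIGHT_ANTI_JOIN;
  }
}

final class InversionSketch {
  // Simplified rule: a right outer/semi/anti nested-loop join cannot be
  // executed distributed, so swap its inputs and use the left variant.
  static JoinOp opForDistributedNestedLoopJoin(JoinOp op) {
    return op.isRightVariant() ? op.invert() : op;
  }
}
```

Inverting twice always returns the original operator, so a separate pass can apply the mapping to any join node without tracking how that node was produced during join ordering.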