Posted to commits@impala.apache.org by kw...@apache.org on 2016/09/30 02:14:36 UTC

[19/61] [partial] incubator-impala git commit: IMPALA-3786: Replace "cloudera" with "apache" (part 1)

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b544f019/fe/src/main/java/com/cloudera/impala/planner/PlanFragment.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/com/cloudera/impala/planner/PlanFragment.java b/fe/src/main/java/com/cloudera/impala/planner/PlanFragment.java
deleted file mode 100644
index 48a71dc..0000000
--- a/fe/src/main/java/com/cloudera/impala/planner/PlanFragment.java
+++ /dev/null
@@ -1,388 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package com.cloudera.impala.planner;
-
-import java.util.List;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.cloudera.impala.analysis.Analyzer;
-import com.cloudera.impala.analysis.BinaryPredicate;
-import com.cloudera.impala.analysis.Expr;
-import com.cloudera.impala.analysis.JoinOperator;
-import com.cloudera.impala.analysis.SlotRef;
-import com.cloudera.impala.catalog.HdfsFileFormat;
-import com.cloudera.impala.catalog.HdfsTable;
-import com.cloudera.impala.common.AnalysisException;
-import com.cloudera.impala.common.InternalException;
-import com.cloudera.impala.common.NotImplementedException;
-import com.cloudera.impala.common.TreeNode;
-import com.cloudera.impala.planner.JoinNode.DistributionMode;
-import com.cloudera.impala.thrift.TExplainLevel;
-import com.cloudera.impala.thrift.TPartitionType;
-import com.cloudera.impala.thrift.TPlan;
-import com.cloudera.impala.thrift.TPlanFragment;
-import com.cloudera.impala.thrift.TPlanFragmentTree;
-import com.google.common.base.Preconditions;
-import com.google.common.base.Predicates;
-import com.google.common.collect.Lists;
-
-/**
- * PlanFragments form a tree structure via their ExchangeNodes. A tree of fragments
- * connected in that way forms a plan. The output of a plan is produced by the root
- * fragment and is either the result of the query or an intermediate result
- * needed by a different plan (such as a hash table).
- *
- * Plans are grouped into cohorts based on the consumer of their output: all
- * plans that materialize intermediate results for a particular consumer plan
- * are grouped into a single cohort.
- *
- * A PlanFragment encapsulates the specific tree of execution nodes that
- * are used to produce the output of the plan fragment, as well as output exprs,
- * destination node, etc. If there are no output exprs, the full row that is
- * produced by the plan root is marked as materialized.
- *
- * A plan fragment can have one or many instances, each of which in turn is executed by
- * an individual node and the output sent to a specific instance of the destination
- * fragment (or, in the case of the root fragment, is materialized in some form).
- *
- * A hash-partitioned plan fragment is the result of one or more hash-partitioning data
- * streams being received by plan nodes in this fragment. In the future, a fragment's
- * data partition could also be hash partitioned based on a scan node that is reading
- * from a physically hash-partitioned table.
- *
- * The sequence of calls is:
- * - c'tor
- * - assemble with getters, etc.
- * - finalize()
- * - toThrift()
- *
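- * For example, a minimal assembly might look like the following (a hypothetical
- * sketch; the identifiers are illustrative):
- *
- *   PlanFragment fragment = new PlanFragment(
- *       idGenerator.getNextId(), planRoot, DataPartition.UNPARTITIONED);
- *   fragment.setOutputExprs(outputExprs);
- *   fragment.finalize(analyzer);
- *   TPlanFragment thrift = fragment.toThrift();
- *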
- * TODO: the tree of PlanNodes is connected across fragment boundaries, which makes
- *   it impossible to search for things within a fragment (using TreeNode functions);
- *   fix that
- */
-public class PlanFragment extends TreeNode<PlanFragment> {
-  private final static Logger LOG = LoggerFactory.getLogger(PlanFragment.class);
-
-  private final PlanFragmentId fragmentId_;
-  private PlanId planId_;
-  private CohortId cohortId_;
-
-  // root of plan tree executed by this fragment
-  private PlanNode planRoot_;
-
-  // exchange node to which this fragment sends its output
-  private ExchangeNode destNode_;
-
-  // if null, outputs the entire row produced by planRoot_
-  private List<Expr> outputExprs_;
-
-  // created in finalize() or set in setSink()
-  private DataSink sink_;
-
-  // specification of the partition of the input of this fragment;
-  // an UNPARTITIONED fragment is executed on only a single node
-  // TODO: improve this comment, "input" is a bit misleading
-  private DataPartition dataPartition_;
-
-  // specification of how the output of this fragment is partitioned (i.e., how
-  // it's sent to its destination);
-  // if the output is UNPARTITIONED, it is being broadcast
-  private DataPartition outputPartition_;
-
-  /**
-   * C'tor for fragment with specific partition; the output is by default broadcast.
-   */
-  public PlanFragment(PlanFragmentId id, PlanNode root, DataPartition partition) {
-    fragmentId_ = id;
-    planRoot_ = root;
-    dataPartition_ = partition;
-    outputPartition_ = DataPartition.UNPARTITIONED;
-    setFragmentInPlanTree(planRoot_);
-  }
-
-  /**
-   * Assigns 'this' as fragment of all PlanNodes in the plan tree rooted at node.
-   * Does not traverse the children of ExchangeNodes because those must belong to a
-   * different fragment.
-   */
-  public void setFragmentInPlanTree(PlanNode node) {
-    if (node == null) return;
-    node.setFragment(this);
-    if (node instanceof ExchangeNode) return;
-    for (PlanNode child : node.getChildren()) setFragmentInPlanTree(child);
-  }
-
-  /**
-   * Collect all PlanNodes that belong to the exec tree of this fragment.
-   */
-  public void collectPlanNodes(List<PlanNode> nodes) {
-    Preconditions.checkNotNull(nodes);
-    collectPlanNodesHelper(planRoot_, nodes);
-  }
-
-  private void collectPlanNodesHelper(PlanNode root, List<PlanNode> nodes) {
-    if (root == null) return;
-    nodes.add(root);
-    if (root instanceof ExchangeNode) return;
-    for (PlanNode child: root.getChildren()) collectPlanNodesHelper(child, nodes);
-  }
-
-  public void setOutputExprs(List<Expr> outputExprs) {
-    outputExprs_ = Expr.cloneList(outputExprs);
-  }
-  public List<Expr> getOutputExprs() { return outputExprs_; }
-
-  /**
-   * Finalize plan tree and create stream sink, if needed.
-   * If this fragment is hash partitioned, ensures that the corresponding partition
-   * exprs of all hash-partitioning senders are cast to identical types.
-   * Otherwise, the hashes generated for identical partition values may differ
-   * among senders if the partition-expr types are not identical.
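-   * For example (illustrative): if one sender hash-partitions on a SMALLINT expr
-   * and another on an INT expr, both are cast to a common type so that identical
-   * values produce identical hashes on all senders.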
-   */
-  public void finalize(Analyzer analyzer)
-      throws InternalException, NotImplementedException {
-    if (destNode_ != null) {
-      Preconditions.checkState(sink_ == null);
-      // we're streaming to an exchange node
-      DataStreamSink streamSink = new DataStreamSink(destNode_, outputPartition_);
-      streamSink.setFragment(this);
-      sink_ = streamSink;
-    }
-
-    if (!dataPartition_.isHashPartitioned()) return;
-
-    // This fragment is hash partitioned. Gather all exchange nodes and ensure
-    // that all hash-partitioning senders hash on expr values of the same type.
-    List<ExchangeNode> exchNodes = Lists.newArrayList();
-    planRoot_.collect(Predicates.instanceOf(ExchangeNode.class), exchNodes);
-
-    // Contains partition-expr lists of all hash-partitioning sender fragments.
-    List<List<Expr>> senderPartitionExprs = Lists.newArrayList();
-    for (ExchangeNode exchNode: exchNodes) {
-      Preconditions.checkState(!exchNode.getChildren().isEmpty());
-      PlanFragment senderFragment = exchNode.getChild(0).getFragment();
-      Preconditions.checkNotNull(senderFragment);
-      if (!senderFragment.getOutputPartition().isHashPartitioned()) continue;
-      List<Expr> partExprs = senderFragment.getOutputPartition().getPartitionExprs();
-      // All hash-partitioning senders must have compatible partition exprs, otherwise
-      // this fragment's data partition must not be hash partitioned.
-      Preconditions.checkState(
-          partExprs.size() == dataPartition_.getPartitionExprs().size());
-      senderPartitionExprs.add(partExprs);
-    }
-
-    // Cast all corresponding hash partition exprs of all hash-partitioning senders
-    // to their compatible types. Also cast the data partition's exprs for consistency,
-    // although not strictly necessary. They should already be type identical to the
-    // exprs of one of the senders and they are not directly used for hashing in the BE.
-    senderPartitionExprs.add(dataPartition_.getPartitionExprs());
-    try {
-      analyzer.castToUnionCompatibleTypes(senderPartitionExprs);
-    } catch (AnalysisException e) {
-      // Should never happen. Analysis should have ensured type compatibility already.
-      throw new IllegalStateException(e);
-    }
-  }
-
-  /**
-   * Return the number of nodes on which the plan fragment will execute.
-   * invalid: -1
-   */
-  public int getNumNodes() {
-    return dataPartition_ == DataPartition.UNPARTITIONED ? 1 : planRoot_.getNumNodes();
-  }
-
-  /**
-   * Estimates the per-node number of distinct values of exprs based on the data
-   * partition of this fragment and its number of nodes. Returns -1 for an invalid
-   * estimate, e.g., because getNumDistinctValues() failed on one of the exprs.
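-   * For example (illustrative numbers): with 10 nodes and an expr that appears in
-   * the partition exprs and has 1000 distinct values overall, the per-node
-   * estimate is max(1000 / 10, 1) = 100.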
-   */
-  public long getNumDistinctValues(List<Expr> exprs) {
-    Preconditions.checkNotNull(dataPartition_);
-    long result = 1;
-    int numNodes = getNumNodes();
-    Preconditions.checkState(numNodes >= 0);
-    // The number of nodes is zero for empty tables.
-    if (numNodes == 0) return 0;
-    for (Expr expr: exprs) {
-      long numDistinct = expr.getNumDistinctValues();
-      if (numDistinct == -1) {
-        result = -1;
-        break;
-      }
-      if (dataPartition_.getPartitionExprs().contains(expr)) {
-        numDistinct = (long)Math.max((double) numDistinct / (double) numNodes, 1L);
-      }
-      result = PlanNode.multiplyCardinalities(result, numDistinct);
-    }
-    return result;
-  }
-
-  public TPlanFragment toThrift() {
-    TPlanFragment result = new TPlanFragment();
-    result.setDisplay_name(fragmentId_.toString());
-    if (planRoot_ != null) result.setPlan(planRoot_.treeToThrift());
-    if (outputExprs_ != null) {
-      result.setOutput_exprs(Expr.treesToThrift(outputExprs_));
-    }
-    if (sink_ != null) result.setOutput_sink(sink_.toThrift());
-    result.setPartition(dataPartition_.toThrift());
-    return result;
-  }
-
-  public TPlanFragmentTree treeToThrift() {
-    TPlanFragmentTree result = new TPlanFragmentTree();
-    treeToThriftHelper(result);
-    return result;
-  }
-
-  private void treeToThriftHelper(TPlanFragmentTree plan) {
-    plan.addToFragments(toThrift());
-    for (PlanFragment child: children_) {
-      child.treeToThriftHelper(plan);
-    }
-  }
-
-  public String getExplainString(TExplainLevel detailLevel) {
-    return getExplainString("", "", detailLevel);
-  }
-
-  /**
-   * The root of the output tree will be prefixed by rootPrefix and the remaining plan
-   * output will be prefixed by prefix.
-   */
-  protected final String getExplainString(String rootPrefix, String prefix,
-      TExplainLevel detailLevel) {
-    StringBuilder str = new StringBuilder();
-    Preconditions.checkState(dataPartition_ != null);
-    String detailPrefix = prefix + "|  ";  // sink detail
-    if (detailLevel == TExplainLevel.VERBOSE) {
-      // we're printing a new tree, start over with the indentation
-      prefix = "  ";
-      rootPrefix = "  ";
-      detailPrefix = prefix + "|  ";
-      str.append(String.format("%s:PLAN FRAGMENT [%s]\n", fragmentId_.toString(),
-          dataPartition_.getExplainString()));
-      if (sink_ != null && sink_ instanceof DataStreamSink) {
-        str.append(sink_.getExplainString(rootPrefix, prefix, detailLevel) + "\n");
-      }
-    }
-
-    String planRootPrefix = rootPrefix;
-    // Always print sinks other than DataStreamSinks.
-    if (sink_ != null && !(sink_ instanceof DataStreamSink)) {
-      str.append(sink_.getExplainString(rootPrefix, detailPrefix, detailLevel));
-      if (detailLevel.ordinal() >= TExplainLevel.STANDARD.ordinal()) {
-        str.append(prefix + "|\n");
-      }
-      // we already used the root prefix for the sink
-      planRootPrefix = prefix;
-    }
-    if (planRoot_ != null) {
-      str.append(planRoot_.getExplainString(planRootPrefix, prefix, detailLevel));
-    }
-    return str.toString();
-  }
-
-  /** Returns true if this fragment is partitioned. */
-  public boolean isPartitioned() {
-    return (dataPartition_.getType() != TPartitionType.UNPARTITIONED);
-  }
-
-  public PlanFragmentId getId() { return fragmentId_; }
-  public PlanId getPlanId() { return planId_; }
-  public void setPlanId(PlanId id) { planId_ = id; }
-  public CohortId getCohortId() { return cohortId_; }
-  public void setCohortId(CohortId id) { cohortId_ = id; }
-  public PlanFragment getDestFragment() {
-    if (destNode_ == null) return null;
-    return destNode_.getFragment();
-  }
-  public ExchangeNode getDestNode() { return destNode_; }
-  public DataPartition getDataPartition() { return dataPartition_; }
-  public void setDataPartition(DataPartition dataPartition) {
-    this.dataPartition_ = dataPartition;
-  }
-  public DataPartition getOutputPartition() { return outputPartition_; }
-  public void setOutputPartition(DataPartition outputPartition) {
-    this.outputPartition_ = outputPartition;
-  }
-  public PlanNode getPlanRoot() { return planRoot_; }
-  public void setPlanRoot(PlanNode root) {
-    planRoot_ = root;
-    setFragmentInPlanTree(planRoot_);
-  }
-
-  public void setDestination(ExchangeNode destNode) {
-    destNode_ = destNode;
-    PlanFragment dest = getDestFragment();
-    Preconditions.checkNotNull(dest);
-    dest.addChild(this);
-  }
-
-  public boolean hasSink() { return sink_ != null; }
-  public DataSink getSink() { return sink_; }
-  public void setSink(DataSink sink) {
-    Preconditions.checkState(this.sink_ == null);
-    Preconditions.checkNotNull(sink);
-    sink.setFragment(this);
-    this.sink_ = sink;
-  }
-
-  /**
-   * Adds a node as the new root to the plan tree. Connects the existing
-   * root as the child of newRoot.
-   */
-  public void addPlanRoot(PlanNode newRoot) {
-    Preconditions.checkState(newRoot.getChildren().size() == 1);
-    newRoot.setChild(0, planRoot_);
-    planRoot_ = newRoot;
-    planRoot_.setFragment(this);
-  }
-
-  /**
-   * Verify that the tree of PlanFragments and their contained tree of
-   * PlanNodes is constructed correctly.
-   */
-  public void verifyTree() {
-    // PlanNode.fragment_ is set correctly
-    List<PlanNode> nodes = Lists.newArrayList();
-    collectPlanNodes(nodes);
-    List<PlanNode> exchNodes = Lists.newArrayList();
-    for (PlanNode node: nodes) {
-      if (node instanceof ExchangeNode) exchNodes.add(node);
-      Preconditions.checkState(node.getFragment() == this);
-    }
-
-    // all ExchangeNodes have registered input fragments
-    Preconditions.checkState(exchNodes.size() == getChildren().size());
-    List<PlanFragment> childFragments = Lists.newArrayList();
-    for (PlanNode exchNode: exchNodes) {
-      PlanFragment childFragment = exchNode.getChild(0).getFragment();
-      Preconditions.checkState(!childFragments.contains(childFragment));
-      childFragments.add(childFragment);
-      Preconditions.checkState(childFragment.getDestNode() == exchNode);
-    }
-    // all registered children are accounted for
-    Preconditions.checkState(getChildren().containsAll(childFragments));
-
-    for (PlanFragment child: getChildren()) child.verifyTree();
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b544f019/fe/src/main/java/com/cloudera/impala/planner/PlanFragmentId.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/com/cloudera/impala/planner/PlanFragmentId.java b/fe/src/main/java/com/cloudera/impala/planner/PlanFragmentId.java
deleted file mode 100644
index 98b08fe..0000000
--- a/fe/src/main/java/com/cloudera/impala/planner/PlanFragmentId.java
+++ /dev/null
@@ -1,42 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package com.cloudera.impala.planner;
-
-import com.cloudera.impala.common.Id;
-import com.cloudera.impala.common.IdGenerator;
-
-public class PlanFragmentId extends Id<PlanFragmentId> {
-  // Construction only allowed via an IdGenerator.
-  protected PlanFragmentId(int id) {
-    super(id);
-  }
-
-  public static IdGenerator<PlanFragmentId> createGenerator() {
-    return new IdGenerator<PlanFragmentId>() {
-      @Override
-      public PlanFragmentId getNextId() { return new PlanFragmentId(nextId_++); }
-      @Override
-      public PlanFragmentId getMaxId() { return new PlanFragmentId(nextId_ - 1); }
-    };
-  }
-
-  @Override
-  public String toString() {
-    return String.format("F%02d", id_);
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b544f019/fe/src/main/java/com/cloudera/impala/planner/PlanId.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/com/cloudera/impala/planner/PlanId.java b/fe/src/main/java/com/cloudera/impala/planner/PlanId.java
deleted file mode 100644
index 2cecbd8..0000000
--- a/fe/src/main/java/com/cloudera/impala/planner/PlanId.java
+++ /dev/null
@@ -1,42 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package com.cloudera.impala.planner;
-
-import com.cloudera.impala.common.Id;
-import com.cloudera.impala.common.IdGenerator;
-
-public class PlanId extends Id<PlanId> {
-  // Construction only allowed via an IdGenerator.
-  protected PlanId(int id) {
-    super(id);
-  }
-
-  public static IdGenerator<PlanId> createGenerator() {
-    return new IdGenerator<PlanId>() {
-      @Override
-      public PlanId getNextId() { return new PlanId(nextId_++); }
-      @Override
-      public PlanId getMaxId() { return new PlanId(nextId_ - 1); }
-    };
-  }
-
-  @Override
-  public String toString() {
-    return String.format("%02d", id_);
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b544f019/fe/src/main/java/com/cloudera/impala/planner/PlanNode.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/com/cloudera/impala/planner/PlanNode.java b/fe/src/main/java/com/cloudera/impala/planner/PlanNode.java
deleted file mode 100644
index d38f10a..0000000
--- a/fe/src/main/java/com/cloudera/impala/planner/PlanNode.java
+++ /dev/null
@@ -1,715 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package com.cloudera.impala.planner;
-
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.List;
-import java.util.Set;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.cloudera.impala.analysis.Analyzer;
-import com.cloudera.impala.analysis.Expr;
-import com.cloudera.impala.analysis.ExprId;
-import com.cloudera.impala.analysis.ExprSubstitutionMap;
-import com.cloudera.impala.analysis.TupleDescriptor;
-import com.cloudera.impala.analysis.TupleId;
-import com.cloudera.impala.common.ImpalaException;
-import com.cloudera.impala.common.PrintUtils;
-import com.cloudera.impala.common.TreeNode;
-import com.cloudera.impala.planner.RuntimeFilterGenerator.RuntimeFilter;
-import com.cloudera.impala.thrift.TExecStats;
-import com.cloudera.impala.thrift.TExplainLevel;
-import com.cloudera.impala.thrift.TPlan;
-import com.cloudera.impala.thrift.TPlanNode;
-import com.cloudera.impala.thrift.TQueryOptions;
-import com.google.common.base.Joiner;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Sets;
-import com.google.common.math.LongMath;
-
-/**
- * Each PlanNode represents a single relational operator
- * and encapsulates the information needed by the planner to
- * make optimization decisions.
- *
- * finalize(): Computes internal state, such as keys for scan nodes; gets called once on
- * the root of the plan tree before the call to toThrift(). Also finalizes the set
- * of conjuncts, such that each remaining one requires all of its referenced slots to
- * be materialized (ie, can be evaluated by calling GetValue(), rather than being
- * implicitly evaluated as part of a scan key).
- *
- * conjuncts_: Each node has a list of conjuncts that can be executed in the context of
- * this node, ie, they only reference tuples materialized by this node or one of
- * its children (= are bound by tupleIds_).
- */
-abstract public class PlanNode extends TreeNode<PlanNode> {
-  private final static Logger LOG = LoggerFactory.getLogger(PlanNode.class);
-
-  // TODO: Retrieve from the query options instead of using a default.
-  protected final static int DEFAULT_BATCH_SIZE = 1024;
-
-  // String used for this node in getExplainString().
-  protected String displayName_;
-
-  // unique w/in plan tree; assigned by planner, and not necessarily in c'tor
-  protected PlanNodeId id_;
-
-  protected long limit_; // max. # of rows to be returned; 0: no limit_
-
-  // ids materialized by the tree rooted at this node
-  protected ArrayList<TupleId> tupleIds_;
-
-  // ids of the TblRefs "materialized" by this node; identical with tupleIds_
-  // if the tree rooted at this node only materializes BaseTblRefs;
-  // useful during plan generation
-  protected ArrayList<TupleId> tblRefIds_;
-
-  // A set of nullable TupleIds produced by this node. It is a subset of tupleIds_.
-  // A tuple is nullable within a particular plan tree if it's the "nullable" side of
-  // an outer join, which has nothing to do with the schema.
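-  // For example (illustrative): in "t1 LEFT OUTER JOIN t2 ON ...", t2's tuple is
-  // on the nullable side and its id would appear in this set.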
-  protected Set<TupleId> nullableTupleIds_ = Sets.newHashSet();
-
-  protected List<Expr> conjuncts_ = Lists.newArrayList();
-
-  // Fragment that this PlanNode is executed in. Valid only after this PlanNode has been
-  // assigned to a fragment. Set and maintained by enclosing PlanFragment.
-  protected PlanFragment fragment_;
-
-  // if set, needs to be applied by parent node to reference this node's output
-  protected ExprSubstitutionMap outputSmap_;
-
-  // global state of planning wrt conjunct assignment; used by planner as a shortcut
-  // to avoid having to pass assigned conjuncts back and forth
-  // (the planner uses this to save and reset the global state in between join tree
-  // alternatives)
-  // TODO for 2.3: Save this state in the PlannerContext instead.
-  protected Set<ExprId> assignedConjuncts_;
-
-  // estimate of the output cardinality of this node; set in computeStats();
-  // invalid: -1
-  protected long cardinality_;
-
-  // number of nodes on which the plan tree rooted at this node would execute;
-  // set in computeStats(); invalid: -1
-  protected int numNodes_;
-
-  // sum of tupleIds_' avgSerializedSizes; set in computeStats()
-  protected float avgRowSize_;
-
-  // estimated per-host memory requirement for this node;
-  // set in computeCosts(); invalid: -1
-  protected long perHostMemCost_ = -1;
-
-  // Runtime filters assigned to this node.
-  protected List<RuntimeFilter> runtimeFilters_ = Lists.newArrayList();
-
-  protected PlanNode(PlanNodeId id, List<TupleId> tupleIds, String displayName) {
-    this(id, displayName);
-    tupleIds_.addAll(tupleIds);
-    tblRefIds_.addAll(tupleIds);
-  }
-
-  /**
-   * Deferred id_ assignment.
-   */
-  protected PlanNode(String displayName) {
-    this(null, displayName);
-  }
-
-  protected PlanNode(PlanNodeId id, String displayName) {
-    id_ = id;
-    limit_ = -1;
-    tupleIds_ = Lists.newArrayList();
-    tblRefIds_ = Lists.newArrayList();
-    cardinality_ = -1;
-    numNodes_ = -1;
-    displayName_ = displayName;
-  }
-
-  /**
-   * Copy c'tor. Also passes in new id_.
-   */
-  protected PlanNode(PlanNodeId id, PlanNode node, String displayName) {
-    id_ = id;
-    limit_ = node.limit_;
-    tupleIds_ = Lists.newArrayList(node.tupleIds_);
-    tblRefIds_ = Lists.newArrayList(node.tblRefIds_);
-    nullableTupleIds_ = Sets.newHashSet(node.nullableTupleIds_);
-    conjuncts_ = Expr.cloneList(node.conjuncts_);
-    cardinality_ = -1;
-    numNodes_ = -1;
-    displayName_ = displayName;
-  }
-
-  /**
-   * Sets tblRefIds_, tupleIds_, and nullableTupleIds_.
-   * The default implementation is a no-op.
-   */
-  public void computeTupleIds() {
-    Preconditions.checkState(children_.isEmpty() || !tupleIds_.isEmpty());
-  }
-
-  /**
-   * Clears tblRefIds_, tupleIds_, and nullableTupleIds_.
-   */
-  protected void clearTupleIds() {
-    tblRefIds_.clear();
-    tupleIds_.clear();
-    nullableTupleIds_.clear();
-  }
-
-  public PlanNodeId getId() { return id_; }
-  public void setId(PlanNodeId id) {
-    Preconditions.checkState(id_ == null);
-    id_ = id;
-  }
-  public long getLimit() { return limit_; }
-  public boolean hasLimit() { return limit_ > -1; }
-  public long getPerHostMemCost() { return perHostMemCost_; }
-  public long getCardinality() { return cardinality_; }
-  public int getNumNodes() { return numNodes_; }
-  public float getAvgRowSize() { return avgRowSize_; }
-  public void setFragment(PlanFragment fragment) { fragment_ = fragment; }
-  public PlanFragment getFragment() { return fragment_; }
-  public List<Expr> getConjuncts() { return conjuncts_; }
-  public ExprSubstitutionMap getOutputSmap() { return outputSmap_; }
-  public void setOutputSmap(ExprSubstitutionMap smap) { outputSmap_ = smap; }
-  public Set<ExprId> getAssignedConjuncts() { return assignedConjuncts_; }
-  public void setAssignedConjuncts(Set<ExprId> conjuncts) {
-    assignedConjuncts_ = conjuncts;
-  }
-
-  /**
-   * Sets limit_ to the given limit only if limit_ hasn't been set, or the new
-   * limit is lower.
-   * @param limit
-   */
-  public void setLimit(long limit) {
-    if (limit_ == -1 || (limit != -1 && limit_ > limit)) limit_ = limit;
-  }
-
-  public void unsetLimit() { limit_ = -1; }
-
-  public ArrayList<TupleId> getTupleIds() {
-    Preconditions.checkState(tupleIds_ != null);
-    return tupleIds_;
-  }
-
-  public ArrayList<TupleId> getTblRefIds() { return tblRefIds_; }
-  public void setTblRefIds(ArrayList<TupleId> ids) { tblRefIds_ = ids; }
-
-  public Set<TupleId> getNullableTupleIds() {
-    Preconditions.checkState(nullableTupleIds_ != null);
-    return nullableTupleIds_;
-  }
-
-  public void addConjuncts(List<Expr> conjuncts) {
-    if (conjuncts == null) return;
-    conjuncts_.addAll(conjuncts);
-  }
-
-  public void transferConjuncts(PlanNode recipient) {
-    recipient.conjuncts_.addAll(conjuncts_);
-    conjuncts_.clear();
-  }
-
-  public String getExplainString() {
-    return getExplainString("", "", TExplainLevel.VERBOSE);
-  }
-
-  protected void setDisplayName(String s) { displayName_ = s; }
-
-  final protected String getDisplayLabel() {
-    return String.format("%s:%s", id_.toString(), displayName_);
-  }
-
-  /**
-   * Subclasses can override to provide a node specific detail string that
-   * is displayed to the user.
-   * e.g. scan can return the table name.
-   */
-  protected String getDisplayLabelDetail() { return ""; }
-
-  /**
-   * Generate the explain plan tree. The plan will be in the form of:
-   *
-   * root
-   * |
-   * |----child 3
-   * |      limit:1
-   * |
-   * |----child 2
-   * |      limit:2
-   * |
-   * child 1
-   *
-   * The root node header line will be prefixed by rootPrefix and the remaining plan
-   * output will be prefixed by prefix.
-   */
-  protected final String getExplainString(String rootPrefix, String prefix,
-      TExplainLevel detailLevel) {
-    StringBuilder expBuilder = new StringBuilder();
-    String detailPrefix = prefix;
-    String filler;
-    boolean printFiller = (detailLevel.ordinal() >= TExplainLevel.STANDARD.ordinal());
-
-    // Do not traverse into the children of an Exchange node to avoid crossing
-    // fragment boundaries.
-    boolean traverseChildren = !children_.isEmpty() &&
-        !(this instanceof ExchangeNode && detailLevel == TExplainLevel.VERBOSE);
-
-    if (traverseChildren) {
-      detailPrefix += "|  ";
-      filler = prefix + "|";
-    } else {
-      detailPrefix += "   ";
-      filler = prefix;
-    }
-
-    // Print the current node
-    // The plan node header line will be prefixed by rootPrefix and the remaining details
-    // will be prefixed by detailPrefix.
-    expBuilder.append(getNodeExplainString(rootPrefix, detailPrefix, detailLevel));
-
-    if (detailLevel.ordinal() >= TExplainLevel.STANDARD.ordinal() &&
-        !(this instanceof SortNode)) {
-      if (limit_ != -1) expBuilder.append(detailPrefix + "limit: " + limit_ + "\n");
-      expBuilder.append(getOffsetExplainString(detailPrefix));
-    }
-
-    // Output cardinality, cost estimates and tuple Ids only when explain plan level
-    // is extended or above.
-    if (detailLevel.ordinal() >= TExplainLevel.EXTENDED.ordinal()) {
-      // Print estimated output cardinality and memory cost.
-      expBuilder.append(PrintUtils.printHosts(detailPrefix, numNodes_));
-      expBuilder.append(PrintUtils.printMemCost(" ", perHostMemCost_) + "\n");
-
-      // Print tuple ids and row size.
-      expBuilder.append(detailPrefix + "tuple-ids=");
-      for (int i = 0; i < tupleIds_.size(); ++i) {
-        TupleId tupleId = tupleIds_.get(i);
-        String nullIndicator = nullableTupleIds_.contains(tupleId) ? "N" : "";
-        expBuilder.append(tupleId.asInt() + nullIndicator);
-        if (i + 1 != tupleIds_.size()) expBuilder.append(",");
-      }
-      expBuilder.append(" row-size=" + PrintUtils.printBytes(Math.round(avgRowSize_)));
-      expBuilder.append(PrintUtils.printCardinality(" ", cardinality_));
-      expBuilder.append("\n");
-    }
-
-    // Print the children. Do not traverse into the children of an Exchange node to
-    // avoid crossing fragment boundaries.
-    if (traverseChildren) {
-      if (printFiller) expBuilder.append(filler + "\n");
-      String childHeadlinePrefix = prefix + "|--";
-      String childDetailPrefix = prefix + "|  ";
-      for (int i = children_.size() - 1; i >= 1; --i) {
-        PlanNode child = getChild(i);
-        if (fragment_ != child.fragment_) {
-          // we're crossing a fragment boundary
-          expBuilder.append(
-              child.fragment_.getExplainString(
-                childHeadlinePrefix, childDetailPrefix, detailLevel));
-        } else {
-          expBuilder.append(
-              child.getExplainString(childHeadlinePrefix, childDetailPrefix,
-                  detailLevel));
-        }
-        if (printFiller) expBuilder.append(filler + "\n");
-      }
-      expBuilder.append(children_.get(0).getExplainString(prefix, prefix, detailLevel));
-    }
-    return expBuilder.toString();
-  }
-
-  /**
-   * Return the node-specific details.
-   * Subclass should override this function.
-   * Each line should be prefixed by detailPrefix.
-   */
-  protected String getNodeExplainString(String rootPrefix, String detailPrefix,
-      TExplainLevel detailLevel) {
-    return "";
-  }
-
-  /**
-   * Return the offset_ details, if applicable. This is available separately from
-   * 'getNodeExplainString' because we want to output 'limit: ...' (which can be printed
-   * from PlanNode) before 'offset: ...', which is only printed from SortNodes right
-   * now.
-   */
-  protected String getOffsetExplainString(String prefix) {
-    return "";
-  }
-
-  // Convert this plan node, including all children, to its Thrift representation.
-  public TPlan treeToThrift() {
-    TPlan result = new TPlan();
-    treeToThriftHelper(result);
-    return result;
-  }
-
-  // Append a flattened version of this plan node, including all children, to 'container'.
-  private void treeToThriftHelper(TPlan container) {
-    TPlanNode msg = new TPlanNode();
-    msg.node_id = id_.asInt();
-    msg.limit = limit_;
-
-    TExecStats estimatedStats = new TExecStats();
-    estimatedStats.setCardinality(cardinality_);
-    estimatedStats.setMemory_used(perHostMemCost_);
-    msg.setLabel(getDisplayLabel());
-    msg.setLabel_detail(getDisplayLabelDetail());
-    msg.setEstimated_stats(estimatedStats);
-
-    Preconditions.checkState(tupleIds_.size() > 0);
-    msg.setRow_tuples(Lists.<Integer>newArrayListWithCapacity(tupleIds_.size()));
-    msg.setNullable_tuples(Lists.<Boolean>newArrayListWithCapacity(tupleIds_.size()));
-    for (TupleId tid: tupleIds_) {
-      msg.addToRow_tuples(tid.asInt());
-      msg.addToNullable_tuples(nullableTupleIds_.contains(tid));
-    }
-    for (Expr e: conjuncts_) {
-      msg.addToConjuncts(e.treeToThrift());
-    }
-    // Serialize any runtime filters
-    for (RuntimeFilter filter: runtimeFilters_) {
-      msg.addToRuntime_filters(filter.toThrift());
-    }
-    toThrift(msg);
-    container.addToNodes(msg);
-    // For the purpose of the BE consider ExchangeNodes to have no children.
-    if (this instanceof ExchangeNode) {
-      msg.num_children = 0;
-      return;
-    } else {
-      msg.num_children = children_.size();
-      for (PlanNode child: children_) {
-        child.treeToThriftHelper(container);
-      }
-    }
-  }
-
-  /**
-   * Computes the full internal state, including smap and planner-relevant statistics
-   * (calls computeStats()), marks all slots referenced by this node as materialized
-   * and computes the mem layout of all materialized tuples (with the assumption that
-   * slots that are needed by ancestor PlanNodes have already been marked).
-   * Also performs final expr substitution with childrens' smaps and computes internal
-   * state required for toThrift(). This is called directly after construction.
-   * Throws if an expr substitution or evaluation fails.
-   */
-  public void init(Analyzer analyzer) throws ImpalaException {
-    assignConjuncts(analyzer);
-    computeStats(analyzer);
-    createDefaultSmap(analyzer);
-  }
-
-  /**
-   * Assign remaining unassigned conjuncts.
-   */
-  protected void assignConjuncts(Analyzer analyzer) {
-    List<Expr> unassigned = analyzer.getUnassignedConjuncts(this);
-    conjuncts_.addAll(unassigned);
-    analyzer.markConjunctsAssigned(unassigned);
-  }
-
-  /**
-   * Returns an smap that combines the childrens' smaps.
-   */
-  protected ExprSubstitutionMap getCombinedChildSmap() {
-    if (getChildren().size() == 0) return new ExprSubstitutionMap();
-    if (getChildren().size() == 1) return getChild(0).getOutputSmap();
-    ExprSubstitutionMap result = ExprSubstitutionMap.combine(
-        getChild(0).getOutputSmap(), getChild(1).getOutputSmap());
-    for (int i = 2; i < getChildren().size(); ++i) {
-      result = ExprSubstitutionMap.combine(result, getChild(i).getOutputSmap());
-    }
-    return result;
-  }
-
-  /**
-   * Sets outputSmap_ to compose(existing smap, combined child smap). Also
-   * substitutes conjuncts_ using the combined child smap.
-   */
-  protected void createDefaultSmap(Analyzer analyzer) {
-    ExprSubstitutionMap combinedChildSmap = getCombinedChildSmap();
-    outputSmap_ =
-        ExprSubstitutionMap.compose(outputSmap_, combinedChildSmap, analyzer);
-    conjuncts_ = Expr.substituteList(conjuncts_, outputSmap_, analyzer, false);
-  }
-
-  /**
-   * Computes planner statistics: avgRowSize_, numNodes_, cardinality_.
-   * Subclasses need to override this.
-   * Assumes that it has already been called on all children.
-   * and that DescriptorTable.computePhysMemLayout() has been called.
-   * This is broken out of init() so that it can be called separately
-   * from init() (to facilitate inserting additional nodes during plan
-   * partitioning w/o the need to call init() recursively on the whole tree again).
-   */
-  protected void computeStats(Analyzer analyzer) {
-    avgRowSize_ = 0.0F;
-    for (TupleId tid: tupleIds_) {
-      TupleDescriptor desc = analyzer.getTupleDesc(tid);
-      avgRowSize_ += desc.getAvgSerializedSize();
-    }
-    if (!children_.isEmpty()) numNodes_ = getChild(0).numNodes_;
-  }
-
-  protected long capAtLimit(long cardinality) {
-    if (hasLimit()) {
-      if (cardinality == -1) {
-        return limit_;
-      } else {
-        return Math.min(cardinality, limit_);
-      }
-    }
-    return cardinality;
-  }
-
-  /**
-   * Call computeMemLayout() for all materialized tuples.
-   */
-  protected void computeMemLayout(Analyzer analyzer) {
-    for (TupleId id: tupleIds_) {
-      analyzer.getDescTbl().getTupleDesc(id).computeMemLayout();
-    }
-  }
-
-  /**
-   * Returns the estimated combined selectivity of all conjuncts. Uses heuristics to
-   * address the following estimation challenges:
-   * 1. The individual selectivities of conjuncts may be unknown.
-   * 2. Two selectivities, whether known or unknown, could be correlated. Assuming
-   *    independence can lead to significant underestimation.
-   *
-   * The first issue is addressed by using a single default selectivity that is
-   * representative of all conjuncts with unknown selectivities.
-   * The second issue is addressed by an exponential backoff when multiplying each
-   * additional selectivity into the final result.
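-   * For example (illustrative numbers): selectivities [0.1, 0.5] combine as
-   * 0.1^(1/1) * 0.5^(1/2) ~= 0.07, rather than 0.05 under full independence.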
-   */
-  static protected double computeCombinedSelectivity(List<Expr> conjuncts) {
-    // Collect all estimated selectivities.
-    List<Double> selectivities = Lists.newArrayList();
-    for (Expr e: conjuncts) {
-      if (e.hasSelectivity()) selectivities.add(e.getSelectivity());
-    }
-    if (selectivities.size() != conjuncts.size()) {
-      // Some conjuncts have no estimated selectivity. Use a single default
-      // representative selectivity for all those conjuncts.
-      selectivities.add(Expr.DEFAULT_SELECTIVITY);
-    }
-    // Sort the selectivities to get a consistent estimate, regardless of the original
-    // conjunct order. Sort in ascending order such that the most selective conjunct
-    // is fully applied.
-    Collections.sort(selectivities);
-    double result = 1.0;
-    for (int i = 0; i < selectivities.size(); ++i) {
-      // Exponential backoff for each selectivity multiplied into the final result.
-      result *= Math.pow(selectivities.get(i), 1.0 / (double) (i + 1));
-    }
-    // Bound result in [0, 1]
-    return Math.max(0.0, Math.min(1.0, result));
-  }
-
-  protected double computeSelectivity() {
-    return computeCombinedSelectivity(conjuncts_);
-  }
-
-  // Convert this plan node into msg (excluding children), which requires setting
-  // the node type and the node-specific field.
-  protected abstract void toThrift(TPlanNode msg);
-
-  protected String debugString() {
-    // not using Objects.toStrHelper because
-    // PlanNode.debugString() is embedded in the debug strings of the subclasses
-    StringBuilder output = new StringBuilder();
-    output.append("preds=" + Expr.debugString(conjuncts_));
-    output.append(" limit=" + Long.toString(limit_));
-    return output.toString();
-  }
-
-  protected String getExplainString(List<? extends Expr> exprs) {
-    if (exprs == null) return "";
-    StringBuilder output = new StringBuilder();
-    for (int i = 0; i < exprs.size(); ++i) {
-      if (i > 0) output.append(", ");
-      output.append(exprs.get(i).toSql());
-    }
-    return output.toString();
-  }
-
-  /**
-   * Returns true if stats-related variables are valid.
-   */
-  protected boolean hasValidStats() {
-    return (numNodes_ == -1 || numNodes_ >= 0) &&
-           (cardinality_ == -1 || cardinality_ >= 0);
-  }
-
-  /**
-   * Computes and returns the sum of two cardinalities. If an overflow occurs,
-   * the maximum Long value is returned (Long.MAX_VALUE).
-   */
-  public static long addCardinalities(long a, long b) {
-    try {
-      return LongMath.checkedAdd(a, b);
-    } catch (ArithmeticException e) {
-      LOG.warn("overflow when adding cardinalities: " + a + ", " + b);
-      return Long.MAX_VALUE;
-    }
-  }
-
-  /**
-   * Computes and returns the product of two cardinalities. If an overflow
-   * occurs, the maximum Long value is returned (Long.MAX_VALUE).
-   */
-  public static long multiplyCardinalities(long a, long b) {
-    try {
-      return LongMath.checkedMultiply(a, b);
-    } catch (ArithmeticException e) {
-      LOG.warn("overflow when multiplying cardinalities: " + a + ", " + b);
-      return Long.MAX_VALUE;
-    }
-  }
-
-  /**
-   * Returns true if this plan node can output its first row only after consuming
-   * all rows of all its children. This method is used to group plan nodes
-   * into pipelined units for resource estimation.
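-   * For example (illustrative): a sort must consume all of its input before it
-   * can emit its first row, while a scan or exchange can stream rows as they
-   * arrive.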
-   */
-  public boolean isBlockingNode() { return false; }
-
-  /**
-   * Estimates the cost of executing this PlanNode. Currently only sets perHostMemCost_.
-   * May only be called after this PlanNode has been placed in a PlanFragment because
-   * the cost computation is dependent on the enclosing fragment's data partition.
-   */
-  public void computeCosts(TQueryOptions queryOptions) {
-    perHostMemCost_ = 0;
-  }
-
-  /**
-   * The input cardinality is the sum of output cardinalities of its children.
-   * For scan nodes the input cardinality is the expected number of rows scanned.
-   */
-  public long getInputCardinality() {
-    long sum = 0;
-    for (PlanNode p : children_) {
-      long tmp = p.getCardinality();
-      if (tmp == -1) return -1;
-      sum = addCardinalities(sum, tmp);
-    }
-    return sum;
-  }
-
-  protected void addRuntimeFilter(RuntimeFilter filter) { runtimeFilters_.add(filter); }
-
-  protected Collection<RuntimeFilter> getRuntimeFilters() { return runtimeFilters_; }
-
-  protected String getRuntimeFilterExplainString(boolean isBuildNode) {
-    if (runtimeFilters_.isEmpty()) return "";
-    final String applyNodeFilterFormat = "%s -> %s";
-    final String buildNodeFilterFormat = "%s <- %s";
-    String format = isBuildNode ? buildNodeFilterFormat : applyNodeFilterFormat;
-    StringBuilder output = new StringBuilder();
-    List<String> filtersStr = Lists.newArrayList();
-    for (RuntimeFilter filter: runtimeFilters_) {
-      Expr expr = null;
-      if (isBuildNode) {
-        expr = filter.getSrcExpr();
-      } else {
-        expr = filter.getTargetExpr(getId());
-      }
-      Preconditions.checkNotNull(expr);
-      filtersStr.add(String.format(format, filter.getFilterId(), expr.toSql()));
-    }
-    output.append(Joiner.on(", ").join(filtersStr) + "\n");
-    return output.toString();
-  }
-
-  /**
-   * Sorts a list of conjuncts into the estimated cheapest order in which to
-   * evaluate them, based on estimates of the evaluation cost and selectivity of
-   * the expressions. Should be called during PlanNode.init for any PlanNode that
-   * could have a conjunct list.
-   *
-   * The conjuncts are sorted by repeatedly iterating over them and choosing the conjunct
-   * that would result in the least total estimated work were it to be applied before the
-   * remaining conjuncts.
-   *
-   * As in computeCombinedSelectivity, the selectivities are exponentially backed off over
-   * the iterations, to reflect the possibility that the conjuncts may be correlated, and
-   * Exprs without selectivity estimates are given a reasonable default.
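-   * For example (illustrative numbers): for conjunct A (cost 1, selectivity 0.1)
-   * and conjunct B (cost 10, selectivity 0.9), evaluating A first is estimated as
-   * 1 + 10 * 0.1 = 2 vs. 10 + 1 * 0.9 = 10.9 for B first, so A is ordered first.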
-   */
-  public static <T extends Expr> List<T> orderConjunctsByCost(List<T> conjuncts) {
-    if (conjuncts.size() <= 1) return conjuncts;
-
-    float totalCost = 0;
-    int numWithoutSel = 0;
-    List<T> remaining = Lists.newArrayListWithCapacity(conjuncts.size());
-    for (T e : conjuncts) {
-      Preconditions.checkState(e.hasCost());
-      totalCost += e.getCost();
-      remaining.add(e);
-      if (!e.hasSelectivity()) {
-        ++numWithoutSel;
-      }
-    }
-
-    // We distribute the DEFAULT_SELECTIVITY over the conjuncts without a selectivity
-    // estimate so that their combined selectivities equal DEFAULT_SELECTIVITY, i.e.
-    // Math.pow(defaultSel, numWithoutSel) = Expr.DEFAULT_SELECTIVITY
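-    // For example, assuming Expr.DEFAULT_SELECTIVITY = 0.1 and three conjuncts
-    // without estimates, each is assigned 0.1^(1/3) ~= 0.46.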
-    double defaultSel = Expr.DEFAULT_SELECTIVITY;
-    if (numWithoutSel != 0) {
-      defaultSel = Math.pow(Math.E, Math.log(Expr.DEFAULT_SELECTIVITY) / numWithoutSel);
-    }
-
-    List<T> sortedConjuncts = Lists.newArrayListWithCapacity(conjuncts.size());
-    while (!remaining.isEmpty()) {
-      double smallestCost = Float.MAX_VALUE;
-      T bestConjunct = null;
-      double backoffExp = 1.0 / (double) (sortedConjuncts.size() + 1);
-      for (T e : remaining) {
-        double sel = Math.pow(e.hasSelectivity() ? e.getSelectivity() : defaultSel,
-            backoffExp);
-
-        // The cost of evaluating this conjunct first is estimated as the cost of
-        // applying this conjunct to all rows plus the cost of applying all the
-        // remaining conjuncts to the number of rows we expect to remain given
-        // this conjunct's selectivity, exponentially backed off.
-        double cost = e.getCost() + (totalCost - e.getCost()) * sel;
-        if (cost < smallestCost) {
-          smallestCost = cost;
-          bestConjunct = e;
-        }
-      }
-
-      sortedConjuncts.add(bestConjunct);
-      remaining.remove(bestConjunct);
-      totalCost -= bestConjunct.getCost();
-    }
-
-    return sortedConjuncts;
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b544f019/fe/src/main/java/com/cloudera/impala/planner/PlanNodeId.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/com/cloudera/impala/planner/PlanNodeId.java b/fe/src/main/java/com/cloudera/impala/planner/PlanNodeId.java
deleted file mode 100644
index d161e2b..0000000
--- a/fe/src/main/java/com/cloudera/impala/planner/PlanNodeId.java
+++ /dev/null
@@ -1,42 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package com.cloudera.impala.planner;
-
-import com.cloudera.impala.common.Id;
-import com.cloudera.impala.common.IdGenerator;
-
-public class PlanNodeId extends Id<PlanNodeId> {
-  // Construction only allowed via an IdGenerator.
-  protected PlanNodeId(int id) {
-    super(id);
-  }
-
-  public static IdGenerator<PlanNodeId> createGenerator() {
-    return new IdGenerator<PlanNodeId>() {
-      @Override
-      public PlanNodeId getNextId() { return new PlanNodeId(nextId_++); }
-      @Override
-      public PlanNodeId getMaxId() { return new PlanNodeId(nextId_ - 1); }
-    };
-  }
-
-  @Override
-  public String toString() {
-    return String.format("%02d", id_);
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b544f019/fe/src/main/java/com/cloudera/impala/planner/Planner.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/com/cloudera/impala/planner/Planner.java b/fe/src/main/java/com/cloudera/impala/planner/Planner.java
deleted file mode 100644
index df90df3..0000000
--- a/fe/src/main/java/com/cloudera/impala/planner/Planner.java
+++ /dev/null
@@ -1,456 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package com.cloudera.impala.planner;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.cloudera.impala.analysis.AnalysisContext;
-import com.cloudera.impala.analysis.Analyzer;
-import com.cloudera.impala.analysis.ColumnLineageGraph;
-import com.cloudera.impala.analysis.Expr;
-import com.cloudera.impala.analysis.ExprSubstitutionMap;
-import com.cloudera.impala.analysis.InsertStmt;
-import com.cloudera.impala.analysis.JoinOperator;
-import com.cloudera.impala.analysis.QueryStmt;
-import com.cloudera.impala.catalog.HBaseTable;
-import com.cloudera.impala.catalog.Table;
-import com.cloudera.impala.common.ImpalaException;
-import com.cloudera.impala.common.PrintUtils;
-import com.cloudera.impala.common.RuntimeEnv;
-import com.cloudera.impala.thrift.TExplainLevel;
-import com.cloudera.impala.thrift.TQueryCtx;
-import com.cloudera.impala.thrift.TQueryExecRequest;
-import com.cloudera.impala.thrift.TRuntimeFilterMode;
-import com.cloudera.impala.thrift.TTableName;
-import com.cloudera.impala.util.MaxRowsProcessedVisitor;
-import com.google.common.base.Joiner;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
-
-/**
- * Creates an executable plan from an analyzed parse tree and query options.
- */
-public class Planner {
-  private final static Logger LOG = LoggerFactory.getLogger(Planner.class);
-
-  private final PlannerContext ctx_;
-
-  public Planner(AnalysisContext.AnalysisResult analysisResult, TQueryCtx queryCtx) {
-    ctx_ = new PlannerContext(analysisResult, queryCtx);
-  }
-
-  /**
-   * Returns a list of plan fragments for executing an analyzed parse tree.
-   * May return a single-node or distributed executable plan. If enabled (through a
-   * query option), computes runtime filters for dynamic partition pruning.
-   *
-   * Plan generation may fail and throw for the following reasons:
-   * 1. Expr evaluation failed, e.g., during partition pruning.
-   * 2. A certain feature is not yet implemented, e.g., physical join implementation for
-   *    outer/semi joins without equi conjuncts.
-   * 3. Expr substitution failed, e.g., because an expr was substituted with a type that
-   *    renders the containing expr semantically invalid. Analysis should have ensured
-   *    that such an expr substitution during plan generation never fails. If it does,
-   *    that typically means there is a bug in analysis, or a broken/missing smap.
-   */
-  public ArrayList<PlanFragment> createPlan() throws ImpalaException {
-    SingleNodePlanner singleNodePlanner = new SingleNodePlanner(ctx_);
-    DistributedPlanner distributedPlanner = new DistributedPlanner(ctx_);
-    PlanNode singleNodePlan = singleNodePlanner.createSingleNodePlan();
-    ctx_.getRootAnalyzer().getTimeline().markEvent("Single node plan created");
-    ArrayList<PlanFragment> fragments = null;
-
-    // Determine the maximum number of rows processed by any node in the plan tree
-    MaxRowsProcessedVisitor visitor = new MaxRowsProcessedVisitor();
-    singleNodePlan.accept(visitor);
-    long maxRowsProcessed = visitor.get() == -1 ? Long.MAX_VALUE : visitor.get();
-    boolean isSmallQuery =
-        maxRowsProcessed < ctx_.getQueryOptions().exec_single_node_rows_threshold;
-    if (isSmallQuery) {
-      // Execute on a single node and disable codegen for small results
-      ctx_.getQueryOptions().setNum_nodes(1);
-      ctx_.getQueryOptions().setDisable_codegen(true);
-      if (maxRowsProcessed < ctx_.getQueryOptions().batch_size ||
-          maxRowsProcessed < 1024 && ctx_.getQueryOptions().batch_size == 0) {
-        // Only one scanner thread for small queries
-        ctx_.getQueryOptions().setNum_scanner_threads(1);
-      }
-      // disable runtime filters
-      ctx_.getQueryOptions().setRuntime_filter_mode(TRuntimeFilterMode.OFF);
-    }
-
-    // Join rewrites.
-    invertJoins(singleNodePlan, ctx_.isSingleNodeExec());
-    singleNodePlan = useNljForSingularRowBuilds(singleNodePlan, ctx_.getRootAnalyzer());
-
-    // create runtime filters
-    if (ctx_.getQueryOptions().getRuntime_filter_mode() != TRuntimeFilterMode.OFF) {
-      // Always compute filters, even if the BE won't always use all of them.
-      RuntimeFilterGenerator.generateRuntimeFilters(ctx_.getRootAnalyzer(),
-          singleNodePlan, ctx_.getQueryOptions().getMax_num_runtime_filters());
-      ctx_.getRootAnalyzer().getTimeline().markEvent(
-          "Runtime filters computed");
-    }
-
-    if (ctx_.isSingleNodeExec()) {
-      // create one fragment containing the entire single-node plan tree
-      fragments = Lists.newArrayList(new PlanFragment(
-          ctx_.getNextFragmentId(), singleNodePlan, DataPartition.UNPARTITIONED));
-    } else {
-      singleNodePlanner.validatePlan(singleNodePlan);
-      // create distributed plan
-      fragments = distributedPlanner.createPlanFragments(singleNodePlan);
-    }
-
-    PlanFragment rootFragment = fragments.get(fragments.size() - 1);
-    rootFragment.verifyTree();
-    ExprSubstitutionMap rootNodeSmap = rootFragment.getPlanRoot().getOutputSmap();
-    List<Expr> resultExprs = null;
-    if (ctx_.isInsertOrCtas()) {
-      InsertStmt insertStmt = ctx_.getAnalysisResult().getInsertStmt();
-      insertStmt.substituteResultExprs(rootNodeSmap, ctx_.getRootAnalyzer());
-      if (!ctx_.isSingleNodeExec()) {
-        // repartition on partition keys
-        rootFragment = distributedPlanner.createInsertFragment(
-            rootFragment, insertStmt, ctx_.getRootAnalyzer(), fragments);
-      }
-      // set up table sink for root fragment
-      rootFragment.setSink(insertStmt.createDataSink());
-      resultExprs = insertStmt.getResultExprs();
-    } else {
-      if (ctx_.isUpdate()) {
-        // Set up update sink for root fragment
-        rootFragment.setSink(ctx_.getAnalysisResult().getUpdateStmt().createDataSink());
-      } else if (ctx_.isDelete()) {
-        // Set up delete sink for root fragment
-        rootFragment.setSink(ctx_.getAnalysisResult().getDeleteStmt().createDataSink());
-      }
-      QueryStmt queryStmt = ctx_.getQueryStmt();
-      queryStmt.substituteResultExprs(rootNodeSmap, ctx_.getRootAnalyzer());
-      resultExprs = queryStmt.getResultExprs();
-    }
-    rootFragment.setOutputExprs(resultExprs);
-
-    LOG.debug("desctbl: " + ctx_.getRootAnalyzer().getDescTbl().debugString());
-    LOG.debug("resultexprs: " + Expr.debugString(rootFragment.getOutputExprs()));
-    LOG.debug("finalize plan fragments");
-    for (PlanFragment fragment: fragments) {
-      fragment.finalize(ctx_.getRootAnalyzer());
-    }
-
-    Collections.reverse(fragments);
-    ctx_.getRootAnalyzer().getTimeline().markEvent("Distributed plan created");
-
-    ColumnLineageGraph graph = ctx_.getRootAnalyzer().getColumnLineageGraph();
-    if (RuntimeEnv.INSTANCE.computeLineage() || RuntimeEnv.INSTANCE.isTestEnv()) {
-      // Compute the column lineage graph
-      if (ctx_.isInsertOrCtas()) {
-        Table targetTable = ctx_.getAnalysisResult().getInsertStmt().getTargetTable();
-        Preconditions.checkNotNull(targetTable);
-        graph.addTargetColumnLabels(targetTable);
-        List<Expr> exprs = Lists.newArrayList();
-        if (targetTable instanceof HBaseTable) {
-          exprs.addAll(resultExprs);
-        } else {
-          exprs.addAll(ctx_.getAnalysisResult().getInsertStmt().getPartitionKeyExprs());
-          exprs.addAll(resultExprs.subList(0,
-              targetTable.getNonClusteringColumns().size()));
-        }
-        graph.computeLineageGraph(exprs, ctx_.getRootAnalyzer());
-      } else {
-        graph.addTargetColumnLabels(ctx_.getQueryStmt().getColLabels());
-        graph.computeLineageGraph(resultExprs, ctx_.getRootAnalyzer());
-      }
-      LOG.trace("lineage: " + graph.debugString());
-      ctx_.getRootAnalyzer().getTimeline().markEvent("Lineage info computed");
-    }
-
-    return fragments;
-  }
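
As a hedged illustration of the small-query fallback in createPlan() above, a
minimal self-contained sketch (the class name, helper, and the default
exec_single_node_rows_threshold of 100 rows are assumptions, not taken from
this file):

    // Sketch of the single-node fallback heuristic; numbers are hypothetical.
    public class SmallQuerySketch {
      static boolean isSmallQuery(long maxRowsProcessed, long threshold) {
        // A visitor result of -1 means "unknown" and is treated as
        // Long.MAX_VALUE, so unknown row counts never qualify as small.
        long rows = (maxRowsProcessed == -1) ? Long.MAX_VALUE : maxRowsProcessed;
        return rows < threshold;
      }

      public static void main(String[] args) {
        System.out.println(isSmallQuery(42, 100));   // true: single node
        System.out.println(isSmallQuery(-1, 100));   // false: unknown stats
        System.out.println(isSmallQuery(5000, 100)); // false: distributed
      }
    }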
-
-  /**
-   * Return a list of plans, each represented by the root of their fragment trees.
-   * TODO: roll into createPlan()
-   */
-  public List<PlanFragment> createParallelPlans() throws ImpalaException {
-    ArrayList<PlanFragment> distrPlan = createPlan();
-    Preconditions.checkNotNull(distrPlan);
-    ParallelPlanner planner = new ParallelPlanner(ctx_);
-    List<PlanFragment> parallelPlans = planner.createPlans(distrPlan.get(0));
-    ctx_.getRootAnalyzer().getTimeline().markEvent("Parallel plans created");
-    return parallelPlans;
-  }
-
-  /**
-   * Return combined explain string for all plan fragments.
-   * Includes the estimated resource requirements from the request if set.
-   */
-  public String getExplainString(ArrayList<PlanFragment> fragments,
-      TQueryExecRequest request, TExplainLevel explainLevel) {
-    StringBuilder str = new StringBuilder();
-    boolean hasHeader = false;
-    if (request.isSetPer_host_mem_req() && request.isSetPer_host_vcores()) {
-      str.append(
-          String.format("Estimated Per-Host Requirements: Memory=%s VCores=%s\n",
-          PrintUtils.printBytes(request.getPer_host_mem_req()),
-          request.per_host_vcores));
-      hasHeader = true;
-    }
-
-    // IMPALA-1983 In the case of corrupt stats, issue a warning for all queries except
-    // child queries of 'compute stats'.
-    if (!request.query_ctx.isSetParent_query_id() &&
-        request.query_ctx.isSetTables_with_corrupt_stats() &&
-        !request.query_ctx.getTables_with_corrupt_stats().isEmpty()) {
-      List<String> tableNames = Lists.newArrayList();
-      for (TTableName tableName: request.query_ctx.getTables_with_corrupt_stats()) {
-        tableNames.add(tableName.db_name + "." + tableName.table_name);
-      }
-      str.append(
-          "WARNING: The following tables have potentially corrupt table statistics.\n" +
-          "Drop and re-compute statistics to resolve this problem.\n" +
-          Joiner.on(", ").join(tableNames) + "\n");
-      hasHeader = true;
-    }
-
-    // Append warning about tables missing stats except for child queries of
-    // 'compute stats'. The parent_query_id is only set for compute stats child queries.
-    if (!request.query_ctx.isSetParent_query_id() &&
-        request.query_ctx.isSetTables_missing_stats() &&
-        !request.query_ctx.getTables_missing_stats().isEmpty()) {
-      List<String> tableNames = Lists.newArrayList();
-      for (TTableName tableName: request.query_ctx.getTables_missing_stats()) {
-        tableNames.add(tableName.db_name + "." + tableName.table_name);
-      }
-      str.append("WARNING: The following tables are missing relevant table " +
-          "and/or column statistics.\n" + Joiner.on(", ").join(tableNames) + "\n");
-      hasHeader = true;
-    }
-
-    if (request.query_ctx.isDisable_spilling()) {
-      str.append("WARNING: Spilling is disabled for this query as a safety guard.\n" +
-          "Reason: Query option disable_unsafe_spills is set, at least one table\n" +
-          "is missing relevant stats, and no plan hints were given.\n");
-      hasHeader = true;
-    }
-    if (hasHeader) str.append("\n");
-
-    if (explainLevel.ordinal() < TExplainLevel.VERBOSE.ordinal()) {
-      // Print the non-fragmented parallel plan.
-      str.append(fragments.get(0).getExplainString(explainLevel));
-    } else {
-      // Print the fragmented parallel plan.
-      for (int i = 0; i < fragments.size(); ++i) {
-        PlanFragment fragment = fragments.get(i);
-        str.append(fragment.getExplainString(explainLevel));
-        if (explainLevel == TExplainLevel.VERBOSE && i + 1 != fragments.size()) {
-          str.append("\n");
-        }
-      }
-    }
-    return str.toString();
-  }
-
-  /**
-   * Returns true if the fragments are for a trivial, coordinator-only query:
-   * Case 1: Only an EmptySetNode, e.g., the query has a LIMIT 0.
-   * Case 2: Query has only constant exprs.
-   */
-  private static boolean isTrivialCoordOnlyPlan(List<PlanFragment> fragments) {
-    Preconditions.checkNotNull(fragments);
-    Preconditions.checkState(!fragments.isEmpty());
-    if (fragments.size() > 1) return false;
-    PlanNode root = fragments.get(0).getPlanRoot();
-    if (root instanceof EmptySetNode) return true;
-    if (root instanceof UnionNode && ((UnionNode) root).isConstantUnion()) return true;
-    return false;
-  }
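
Two concrete instances of the cases above, assuming standard planner behavior:
a statement like SELECT * FROM t LIMIT 0 plans as a lone EmptySetNode (case 1),
while SELECT 1 + 1 plans as a constant UnionNode with no table inputs (case 2).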
-
-  /**
-   * Estimates the per-host memory and CPU requirements for the given plan fragments,
-   * and sets the results in request.
-   * Optionally excludes the requirements for unpartitioned fragments.
-   * TODO: The LOG.warn() messages should eventually become Preconditions checks
-   * once resource estimation is more robust.
-   * TODO: Revisit and possibly remove during MT work, particularly references to vcores.
-   */
-  public void computeResourceReqs(List<PlanFragment> fragments,
-      boolean excludeUnpartitionedFragments,
-      TQueryExecRequest request) {
-    Preconditions.checkState(!fragments.isEmpty());
-    Preconditions.checkNotNull(request);
-
-    // Compute pipelined plan node sets.
-    ArrayList<PipelinedPlanNodeSet> planNodeSets =
-        PipelinedPlanNodeSet.computePlanNodeSets(fragments.get(0).getPlanRoot());
-
-    // Compute the max of the per-host mem and vcores requirements.
-    // Note that the max mem and vcores may come from different plan node sets.
-    long maxPerHostMem = Long.MIN_VALUE;
-    int maxPerHostVcores = Integer.MIN_VALUE;
-    for (PipelinedPlanNodeSet planNodeSet: planNodeSets) {
-      if (!planNodeSet.computeResourceEstimates(
-          excludeUnpartitionedFragments, ctx_.getQueryOptions())) {
-        continue;
-      }
-      long perHostMem = planNodeSet.getPerHostMem();
-      int perHostVcores = planNodeSet.getPerHostVcores();
-      if (perHostMem > maxPerHostMem) maxPerHostMem = perHostMem;
-      if (perHostVcores > maxPerHostVcores) maxPerHostVcores = perHostVcores;
-    }
-
-    // Do not ask for more cores than are in the RuntimeEnv.
-    maxPerHostVcores = Math.min(maxPerHostVcores, RuntimeEnv.INSTANCE.getNumCores());
-
-    // Special case for some trivial coordinator-only queries (IMPALA-3053, IMPALA-1092).
-    if (isTrivialCoordOnlyPlan(fragments)) {
-      maxPerHostMem = 1024;
-      maxPerHostVcores = 1;
-    }
-
-    // Set costs to zero if there are only unpartitioned fragments and
-    // excludeUnpartitionedFragments is true.
-    // TODO: handle this case with a better indication for unknown, e.g. -1 or not set.
-    if (maxPerHostMem == Long.MIN_VALUE || maxPerHostVcores == Integer.MIN_VALUE) {
-      boolean allUnpartitioned = true;
-      for (PlanFragment fragment: fragments) {
-        if (fragment.isPartitioned()) {
-          allUnpartitioned = false;
-          break;
-        }
-      }
-      if (allUnpartitioned && excludeUnpartitionedFragments) {
-        maxPerHostMem = 0;
-        maxPerHostVcores = 0;
-      }
-    }
-
-    if (maxPerHostMem < 0 || maxPerHostMem == Long.MIN_VALUE) {
-      LOG.warn("Invalid per-host memory requirement: " + maxPerHostMem);
-    }
-    if (maxPerHostVcores < 0 || maxPerHostVcores == Integer.MIN_VALUE) {
-      LOG.warn("Invalid per-host virtual cores requirement: " + maxPerHostVcores);
-    }
-    request.setPer_host_mem_req(maxPerHostMem);
-    request.setPer_host_vcores((short) maxPerHostVcores);
-
-    LOG.debug("Estimated per-host peak memory requirement: " + maxPerHostMem);
-    LOG.debug("Estimated per-host virtual cores requirement: " + maxPerHostVcores);
-  }
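
A toy, hedged illustration of the note above that the max memory and max
vcores can come from different plan node sets (all names and numbers here are
hypothetical):

    // Hypothetical per-host estimates for two pipelined plan node sets.
    public class ResourceMaxSketch {
      public static void main(String[] args) {
        long[] perHostMem = { 2L << 30, 3L << 30 };  // set A: 2 GiB, set B: 3 GiB
        int[] perHostVcores = { 4, 2 };              // set A: 4, set B: 2
        long maxMem = Long.MIN_VALUE;
        int maxVcores = Integer.MIN_VALUE;
        for (int i = 0; i < perHostMem.length; ++i) {
          maxMem = Math.max(maxMem, perHostMem[i]);
          maxVcores = Math.max(maxVcores, perHostVcores[i]);
        }
        // maxMem (3 GiB) comes from set B while maxVcores (4) comes from set A.
        System.out.println(maxMem + " bytes, " + maxVcores + " vcores");
      }
    }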
-
-  /**
-   * Traverses the plan tree rooted at 'root' and inverts outer and semi joins
-   * in the following situations:
-   * 1. If the left-hand side is a SingularRowSrcNode then we invert the join because
-   *    the build side is then guaranteed to have only a single row.
-   * 2. There is no backend support for distributed non-equi right outer/semi joins,
-   *    so we invert them (any distributed left semi/outer join is ok).
-   * 3. Invert semi/outer joins if the right-hand side is estimated to have a higher
-   *    cardinality*avgSerializedSize. Do not invert if relevant stats are missing.
-   * The first two inversion rules are independent of the presence/absence of stats.
-   * Left Null Aware Anti Joins are never inverted due to lack of backend support.
-   * Joins that originate from query blocks with a straight join hint are not inverted.
-   * The 'isLocalPlan' parameter indicates whether the plan tree rooted at 'root'
-   * will be executed locally within one machine, i.e., without any data exchanges.
-   */
-  private void invertJoins(PlanNode root, boolean isLocalPlan) {
-    if (root instanceof SubplanNode) {
-      invertJoins(root.getChild(0), isLocalPlan);
-      invertJoins(root.getChild(1), true);
-    } else {
-      for (PlanNode child: root.getChildren()) invertJoins(child, isLocalPlan);
-    }
-
-    if (root instanceof JoinNode) {
-      JoinNode joinNode = (JoinNode) root;
-      JoinOperator joinOp = joinNode.getJoinOp();
-
-      // 1. No inversion allowed due to straight join.
-      // 2. The null-aware left anti-join operator is not considered for inversion.
-      //    There is no backend support for a null-aware right anti-join because
-      //    we cannot execute it efficiently.
-      if (joinNode.isStraightJoin() || joinOp.isNullAwareLeftAntiJoin()) {
-        // Re-compute tuple ids since their order must correspond to the order of children.
-        root.computeTupleIds();
-        return;
-      }
-
-      if (joinNode.getChild(0) instanceof SingularRowSrcNode) {
-        // Always place a singular row src on the build side because it
-        // only produces a single row.
-        joinNode.invertJoin();
-      } else if (!isLocalPlan && joinNode instanceof NestedLoopJoinNode &&
-          (joinOp.isRightSemiJoin() || joinOp.isRightOuterJoin())) {
-        // The current join is a distributed non-equi right outer or semi join
-        // which has no backend support. Invert the join to make it executable.
-        joinNode.invertJoin();
-      } else {
-        // Invert the join if doing so reduces the size of the materialized rhs
-        // (may also reduce network costs depending on the join strategy).
-        // Only consider this optimization if both the lhs/rhs cardinalities are known.
-        long lhsCard = joinNode.getChild(0).getCardinality();
-        long rhsCard = joinNode.getChild(1).getCardinality();
-        float lhsAvgRowSize = joinNode.getChild(0).getAvgRowSize();
-        float rhsAvgRowSize = joinNode.getChild(1).getAvgRowSize();
-        if (lhsCard != -1 && rhsCard != -1 &&
-            lhsCard * lhsAvgRowSize < rhsCard * rhsAvgRowSize) {
-          joinNode.invertJoin();
-        }
-      }
-    }
-
-    // Re-compute tuple ids because the backend assumes that their order corresponds to
-    // the order of children.
-    root.computeTupleIds();
-  }
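
To make inversion rule 3 concrete, a small sketch with hypothetical statistics
(the class, helper, and numbers are illustrative, not from this file):

    // Sketch of rule 3: invert when the lhs would be the cheaper build side.
    public class JoinInversionSketch {
      static boolean shouldInvert(long lhsCard, float lhsAvgRowSize,
          long rhsCard, float rhsAvgRowSize) {
        // A cardinality of -1 means stats are missing; never invert then.
        return lhsCard != -1 && rhsCard != -1
            && lhsCard * lhsAvgRowSize < rhsCard * rhsAvgRowSize;
      }

      public static void main(String[] args) {
        // lhs: 1,000 rows * 200 bytes = 200 KB; rhs: 1,000,000 rows * 50
        // bytes = 50 MB. The smaller lhs becomes the build side via inversion.
        System.out.println(shouldInvert(1000, 200f, 1_000_000, 50f)); // true
      }
    }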
-
-  /**
-   * Converts hash joins to nested-loop joins if the right-side is a SingularRowSrcNode.
-   * Does not convert Null Aware Anti Joins because we only support that join op with
-   * a hash join.
-   * Throws if JoinNode.init() fails on the new nested-loop join node.
-   */
-  private PlanNode useNljForSingularRowBuilds(PlanNode root, Analyzer analyzer)
-      throws ImpalaException {
-    for (int i = 0; i < root.getChildren().size(); ++i) {
-      root.setChild(i, useNljForSingularRowBuilds(root.getChild(i), analyzer));
-    }
-    if (!(root instanceof JoinNode)) return root;
-    if (root instanceof NestedLoopJoinNode) return root;
-    if (!(root.getChild(1) instanceof SingularRowSrcNode)) return root;
-    JoinNode joinNode = (JoinNode) root;
-    if (joinNode.getJoinOp().isNullAwareLeftAntiJoin()) {
-      Preconditions.checkState(joinNode instanceof HashJoinNode);
-      return root;
-    }
-    List<Expr> otherJoinConjuncts = Lists.newArrayList(joinNode.getOtherJoinConjuncts());
-    otherJoinConjuncts.addAll(joinNode.getEqJoinConjuncts());
-    JoinNode newJoinNode = new NestedLoopJoinNode(joinNode.getChild(0),
-        joinNode.getChild(1), joinNode.isStraightJoin(),
-        joinNode.getDistributionModeHint(), joinNode.getJoinOp(), otherJoinConjuncts);
-    newJoinNode.getConjuncts().addAll(joinNode.getConjuncts());
-    newJoinNode.setId(joinNode.getId());
-    newJoinNode.init(analyzer);
-    return newJoinNode;
-  }
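
The rationale for the conversion above: a SingularRowSrcNode yields exactly one
build row per subplan iteration, so building a hash table for it would be pure
overhead. Merging the equi-join conjuncts into the nested-loop join's other
join conjuncts preserves the join semantics while avoiding the hash table build.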
-}

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b544f019/fe/src/main/java/com/cloudera/impala/planner/PlannerContext.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/com/cloudera/impala/planner/PlannerContext.java b/fe/src/main/java/com/cloudera/impala/planner/PlannerContext.java
deleted file mode 100644
index fc11287..0000000
--- a/fe/src/main/java/com/cloudera/impala/planner/PlannerContext.java
+++ /dev/null
@@ -1,100 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package com.cloudera.impala.planner;
-
-import java.util.LinkedList;
-
-import com.cloudera.impala.analysis.AnalysisContext;
-import com.cloudera.impala.analysis.Analyzer;
-import com.cloudera.impala.analysis.QueryStmt;
-import com.cloudera.impala.common.IdGenerator;
-import com.cloudera.impala.thrift.TQueryCtx;
-import com.cloudera.impala.thrift.TQueryOptions;
-import com.google.common.collect.Lists;
-
-/**
- * Contains the analysis result of a query as well as planning-specific
- * parameters and state such as plan-node and plan-fragment id generators.
- */
-public class PlannerContext {
-  // Estimate of the overhead imposed by storing data in a hash tbl;
-  // used for determining whether a broadcast join is feasible.
-  public static final double HASH_TBL_SPACE_OVERHEAD = 1.1;
-
-  // The maximum fraction of remaining memory that a sort node can use during execution.
-  public static final double SORT_MEM_MAX_FRACTION = 0.80;
-
-  // Assumed average number of items in a nested collection, since we currently have no
-  // statistics on nested fields. The motivation for this constant is to avoid
-  // pathological plan choices that could result from a SubplanNode having an unknown
-  // cardinality (due to UnnestNodes not knowing their cardinality), or from a ScanNode
-  // significantly underestimating its output cardinality because intermediate collections
-  // are not accounted for at all. For example, we will place a table ref plan with a
-  // SubplanNode on the build side of a join due to an unknown cardinality if the other
-  // input is a base table scan with stats.
-  // The constant value was chosen arbitrarily to not be "too high" or "too low".
-  // TODO: Compute stats for nested types and pick them up here.
-  public static final long AVG_COLLECTION_SIZE = 10;
-
-  private final IdGenerator<PlanNodeId> nodeIdGenerator_ = PlanNodeId.createGenerator();
-  private final IdGenerator<PlanFragmentId> fragmentIdGenerator_ =
-      PlanFragmentId.createGenerator();
-
-  // Keeps track of subplan nesting. Maintained with push/popSubplan().
-  private final LinkedList<SubplanNode> subplans_ = Lists.newLinkedList();
-
-  private final TQueryCtx queryCtx_;
-  private final AnalysisContext.AnalysisResult analysisResult_;
-  private final QueryStmt queryStmt_;
-
-  public PlannerContext(AnalysisContext.AnalysisResult analysisResult,
-      TQueryCtx queryCtx) {
-    analysisResult_ = analysisResult;
-    queryCtx_ = queryCtx;
-    if (isInsertOrCtas()) {
-      queryStmt_ = analysisResult.getInsertStmt().getQueryStmt();
-    } else if (analysisResult.isUpdateStmt()) {
-      queryStmt_ = analysisResult.getUpdateStmt().getQueryStmt();
-    } else if (analysisResult.isDeleteStmt()) {
-      queryStmt_ = analysisResult.getDeleteStmt().getQueryStmt();
-    } else {
-      queryStmt_ = analysisResult.getQueryStmt();
-    }
-  }
-
-  public QueryStmt getQueryStmt() { return queryStmt_; }
-  public TQueryCtx getQueryCtx() { return queryCtx_; }
-  public TQueryOptions getQueryOptions() {
-    return queryCtx_.getRequest().getQuery_options();
-  }
-  public AnalysisContext.AnalysisResult getAnalysisResult() { return analysisResult_; }
-  public Analyzer getRootAnalyzer() { return analysisResult_.getAnalyzer(); }
-  public boolean isSingleNodeExec() { return getQueryOptions().num_nodes == 1; }
-  public PlanNodeId getNextNodeId() { return nodeIdGenerator_.getNextId(); }
-  public PlanFragmentId getNextFragmentId() { return fragmentIdGenerator_.getNextId(); }
-  public boolean isInsertOrCtas() {
-    return analysisResult_.isInsertStmt() || analysisResult_.isCreateTableAsSelectStmt();
-  }
-
-  public boolean hasSubplan() { return !subplans_.isEmpty(); }
-  public SubplanNode getSubplan() { return subplans_.getFirst(); }
-  public boolean pushSubplan(SubplanNode n) { return subplans_.offerFirst(n); }
-  public void popSubplan() { subplans_.removeFirst(); }
-  public boolean isUpdate() { return analysisResult_.isUpdateStmt(); }
-  public boolean isDelete() { return analysisResult_.isDeleteStmt(); }
-}
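
A hedged worked example of how constants like HASH_TBL_SPACE_OVERHEAD and
AVG_COLLECTION_SIZE might enter a planner estimate (the actual formulas live
elsewhere, e.g. in DistributedPlanner and the cardinality code; the class name
and numbers here are illustrative only):

    // Illustrative only: how the two PlannerContext constants scale estimates.
    public class PlannerConstantsSketch {
      static final double HASH_TBL_SPACE_OVERHEAD = 1.1;
      static final long AVG_COLLECTION_SIZE = 10;

      public static void main(String[] args) {
        long rhsDataSize = 100L << 20;  // assume a 100 MiB build side
        double hashTblBytes = rhsDataSize * HASH_TBL_SPACE_OVERHEAD;
        // Each node receiving a broadcast must hold ~110 MiB in its hash
        // table, which feeds the broadcast-vs-partitioned feasibility check.
        System.out.printf("hash table estimate: %.0f bytes%n", hashTblBytes);

        long parentCard = 1_000_000;  // rows in the parent table (assumed)
        // With no stats on nested fields, each collection is assumed to
        // contain AVG_COLLECTION_SIZE items.
        long unnestCard = parentCard * AVG_COLLECTION_SIZE;
        System.out.println("assumed unnest cardinality: " + unnestCard);
      }
    }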