Posted to commits@impala.apache.org by kw...@apache.org on 2016/09/30 02:14:37 UTC

[20/61] [partial] incubator-impala git commit: IMPALA-3786: Replace "cloudera" with "apache" (part 1)

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b544f019/fe/src/main/java/com/cloudera/impala/planner/HdfsTableSink.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/com/cloudera/impala/planner/HdfsTableSink.java b/fe/src/main/java/com/cloudera/impala/planner/HdfsTableSink.java
deleted file mode 100644
index 7b97773..0000000
--- a/fe/src/main/java/com/cloudera/impala/planner/HdfsTableSink.java
+++ /dev/null
@@ -1,157 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package com.cloudera.impala.planner;
-
-import java.util.List;
-
-import com.cloudera.impala.analysis.Expr;
-import com.cloudera.impala.catalog.HdfsFileFormat;
-import com.cloudera.impala.catalog.HdfsTable;
-import com.cloudera.impala.catalog.Table;
-import com.cloudera.impala.common.PrintUtils;
-import com.cloudera.impala.thrift.TDataSink;
-import com.cloudera.impala.thrift.TDataSinkType;
-import com.cloudera.impala.thrift.TExplainLevel;
-import com.cloudera.impala.thrift.THdfsTableSink;
-import com.cloudera.impala.thrift.TTableSink;
-import com.cloudera.impala.thrift.TTableSinkType;
-import com.google.common.base.Preconditions;
-
-/**
- * Base class for Hdfs data sinks such as HdfsTextTableSink.
- *
- */
-public class HdfsTableSink extends TableSink {
-  // Default number of partitions used for computeCosts() in the absence of column stats.
-  protected final long DEFAULT_NUM_PARTITIONS = 10;
-
-  // Exprs for computing the output partition(s).
-  protected final List<Expr> partitionKeyExprs_;
-  // Whether to overwrite the existing partition(s).
-  protected final boolean overwrite_;
-
-  public HdfsTableSink(Table targetTable, List<Expr> partitionKeyExprs,
-      boolean overwrite) {
-    super(targetTable, Op.INSERT);
-    Preconditions.checkState(targetTable instanceof HdfsTable);
-    partitionKeyExprs_ = partitionKeyExprs;
-    overwrite_ = overwrite;
-  }
-
-  @Override
-  public void computeCosts() {
-    HdfsTable table = (HdfsTable) targetTable_;
-    // TODO: Estimate the memory requirements more accurately by partition type.
-    HdfsFileFormat format = table.getMajorityFormat();
-    PlanNode inputNode = fragment_.getPlanRoot();
-    int numNodes = fragment_.getNumNodes();
-    // Compute the per-host number of partitions, taking the number of nodes
-    // and the data partition of the fragment executing this sink into account.
-    long numPartitions = fragment_.getNumDistinctValues(partitionKeyExprs_);
-    if (numPartitions == -1) numPartitions = DEFAULT_NUM_PARTITIONS;
-    long perPartitionMemReq = getPerPartitionMemReq(format);
-
-    // The estimate is based purely on the per-partition mem req if the input cardinality_
-    // or the avg row size is unknown.
-    if (inputNode.getCardinality() == -1 || inputNode.getAvgRowSize() == -1) {
-      perHostMemCost_ = numPartitions * perPartitionMemReq;
-      return;
-    }
-
-    // The per-partition estimate may be higher than the memory required to buffer
-    // the entire input data.
-    long perHostInputCardinality = Math.max(1L, inputNode.getCardinality() / numNodes);
-    long perHostInputBytes =
-        (long) Math.ceil(perHostInputCardinality * inputNode.getAvgRowSize());
-    perHostMemCost_ = Math.min(perHostInputBytes, numPartitions * perPartitionMemReq);
-  }
-
-  /**
-   * Returns the per-partition memory requirement for inserting into the given
-   * file format.
-   */
-  private long getPerPartitionMemReq(HdfsFileFormat format) {
-    switch (format) {
-      // Writing to a Parquet table requires up to 1GB of buffer per partition.
-      // TODO: The per-partition memory requirement is configurable in the QueryOptions.
-      case PARQUET: return 1024L * 1024L * 1024L;
-      case TEXT: return 100L * 1024L;
-      default:
-        Preconditions.checkState(false, "Unsupported TableSink format " +
-            format.toString());
-    }
-    return 0;
-  }
-
-  @Override
-  public String getExplainString(String prefix, String detailPrefix,
-      TExplainLevel explainLevel) {
-    StringBuilder output = new StringBuilder();
-    String overwriteStr = ", OVERWRITE=" + (overwrite_ ? "true" : "false");
-    String partitionKeyStr = "";
-    if (!partitionKeyExprs_.isEmpty()) {
-      StringBuilder tmpBuilder = new StringBuilder(", PARTITION-KEYS=(");
-      for (Expr expr: partitionKeyExprs_) {
-        tmpBuilder.append(expr.toSql() + ",");
-      }
-      tmpBuilder.deleteCharAt(tmpBuilder.length() - 1);
-      tmpBuilder.append(")");
-      partitionKeyStr = tmpBuilder.toString();
-    }
-    output.append(String.format("%sWRITE TO HDFS [%s%s%s]\n", prefix,
-        targetTable_.getFullName(), overwriteStr, partitionKeyStr));
-    // Report the total number of partitions, independent of the number of nodes
-    // and the data partition of the fragment executing this sink.
-    if (explainLevel.ordinal() > TExplainLevel.MINIMAL.ordinal()) {
-      long totalNumPartitions = Expr.getNumDistinctValues(partitionKeyExprs_);
-      if (totalNumPartitions == -1) {
-        output.append(detailPrefix + "partitions=unavailable");
-      } else {
-        output.append(detailPrefix + "partitions="
-            + (totalNumPartitions == 0 ? 1 : totalNumPartitions));
-      }
-      output.append("\n");
-      if (explainLevel.ordinal() >= TExplainLevel.EXTENDED.ordinal()) {
-        output.append(PrintUtils.printHosts(detailPrefix, fragment_.getNumNodes()));
-        output.append(PrintUtils.printMemCost(" ", perHostMemCost_));
-        output.append("\n");
-      }
-    }
-    return output.toString();
-  }
-
-  @Override
-  protected TDataSink toThrift() {
-    TDataSink result = new TDataSink(TDataSinkType.TABLE_SINK);
-    THdfsTableSink hdfsTableSink = new THdfsTableSink(
-        Expr.treesToThrift(partitionKeyExprs_), overwrite_);
-    HdfsTable table = (HdfsTable) targetTable_;
-    StringBuilder error = new StringBuilder();
-    int skipHeaderLineCount = table.parseSkipHeaderLineCount(error);
-    // Errors will be caught during analysis.
-    Preconditions.checkState(error.length() == 0);
-    if (skipHeaderLineCount > 0) {
-      hdfsTableSink.setSkip_header_line_count(skipHeaderLineCount);
-    }
-    TTableSink tTableSink = new TTableSink(targetTable_.getId().asInt(),
-        TTableSinkType.HDFS, sinkOp_.toThrift());
-    tTableSink.hdfs_table_sink = hdfsTableSink;
-    result.table_sink = tTableSink;
-    return result;
-  }
-}
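
The cost model in HdfsTableSink.computeCosts() above boils down to a partition-based estimate capped by the bytes each host would have to buffer. A minimal standalone sketch of that arithmetic follows; it is not part of the commit, and the class and method names are illustrative only:

public class HdfsSinkMemEstimate {
  static final long DEFAULT_NUM_PARTITIONS = 10;
  static final long PARQUET_PER_PARTITION_BYTES = 1024L * 1024L * 1024L;  // 1GB
  static final long TEXT_PER_PARTITION_BYTES = 100L * 1024L;              // 100KB

  // -1 means "unknown", mirroring the planner's convention.
  static long estimatePerHostMemCost(long numPartitions, long perPartitionBytes,
      long inputCardinality, double avgRowSize, int numNodes) {
    if (numPartitions == -1) numPartitions = DEFAULT_NUM_PARTITIONS;
    long partitionBasedEstimate = numPartitions * perPartitionBytes;
    // Without input stats, fall back to the purely partition-based estimate.
    if (inputCardinality == -1 || avgRowSize == -1) return partitionBasedEstimate;
    // Otherwise cap it by the bytes each host would buffer for its share of the input.
    long perHostCardinality = Math.max(1L, inputCardinality / numNodes);
    long perHostInputBytes = (long) Math.ceil(perHostCardinality * avgRowSize);
    return Math.min(perHostInputBytes, partitionBasedEstimate);
  }

  public static void main(String[] args) {
    // 5 output partitions, Parquet, 1M rows of ~100 bytes spread over 10 nodes:
    // the ~10MB per-host input caps the 5GB partition-based estimate.
    System.out.println(estimatePerHostMemCost(5, PARQUET_PER_PARTITION_BYTES,
        1_000_000L, 100.0, 10));
  }
}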

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b544f019/fe/src/main/java/com/cloudera/impala/planner/JoinBuildSink.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/com/cloudera/impala/planner/JoinBuildSink.java b/fe/src/main/java/com/cloudera/impala/planner/JoinBuildSink.java
deleted file mode 100644
index 25da277..0000000
--- a/fe/src/main/java/com/cloudera/impala/planner/JoinBuildSink.java
+++ /dev/null
@@ -1,103 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package com.cloudera.impala.planner;
-
-import java.util.List;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.cloudera.impala.analysis.Analyzer;
-import com.cloudera.impala.analysis.BinaryPredicate;
-import com.cloudera.impala.analysis.Expr;
-import com.cloudera.impala.common.ImpalaException;
-import com.cloudera.impala.thrift.TDataSink;
-import com.cloudera.impala.thrift.TDataSinkType;
-import com.cloudera.impala.thrift.TExplainLevel;
-import com.cloudera.impala.thrift.TJoinBuildSink;
-import com.cloudera.impala.thrift.TPlanNode;
-import com.cloudera.impala.thrift.TPlanNodeType;
-import com.cloudera.impala.thrift.TQueryOptions;
-import com.google.common.base.Objects;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
-
-/**
- * Sink to materialize the build side of a join.
- */
-public class JoinBuildSink extends DataSink {
-  private final static Logger LOG = LoggerFactory.getLogger(JoinBuildSink.class);
-
-  // id of join's build-side table assigned during planning
-  private final JoinTableId joinTableId_;
-
-  private final List<Expr> buildExprs_ = Lists.newArrayList();
-
-  /**
-   * Creates sink for build side of 'joinNode' (extracts buildExprs_ from joinNode).
-   */
-  public JoinBuildSink(JoinTableId joinTableId, JoinNode joinNode) {
-    Preconditions.checkState(joinTableId.isValid());
-    joinTableId_ = joinTableId;
-    Preconditions.checkNotNull(joinNode);
-    Preconditions.checkState(joinNode instanceof JoinNode);
-    if (!(joinNode instanceof HashJoinNode)) return;
-    for (Expr eqJoinConjunct: joinNode.getEqJoinConjuncts()) {
-      BinaryPredicate p = (BinaryPredicate) eqJoinConjunct;
-      // by convention the build exprs are the rhs of the join conjuncts
-      buildExprs_.add(p.getChild(1).clone());
-    }
-  }
-
-  public JoinTableId getJoinTableId() { return joinTableId_; }
-
-  @Override
-  protected TDataSink toThrift() {
-    TDataSink result = new TDataSink(TDataSinkType.JOIN_BUILD_SINK);
-    TJoinBuildSink tBuildSink = new TJoinBuildSink();
-    tBuildSink.setJoin_table_id(joinTableId_.asInt());
-    for (Expr buildExpr: buildExprs_) {
-      tBuildSink.addToBuild_exprs(buildExpr.treeToThrift());
-    }
-    result.setJoin_build_sink(tBuildSink);
-    return result;
-  }
-
-  @Override
-  public String getExplainString(String prefix, String detailPrefix,
-      TExplainLevel detailLevel) {
-    StringBuilder output = new StringBuilder();
-    output.append(String.format("%s%s\n", prefix, "JOIN BUILD"));
-    if (detailLevel.ordinal() > TExplainLevel.MINIMAL.ordinal()) {
-      output.append(
-          detailPrefix + "join-table-id=" + joinTableId_.toString()
-            + " plan-id=" + fragment_.getPlanId().toString()
-            + " cohort-id=" + fragment_.getCohortId().toString() + "\n");
-      if (!buildExprs_.isEmpty()) {
-        output.append(detailPrefix + "build expressions: ")
-            .append(Expr.toSql(buildExprs_) + "\n");
-      }
-    }
-    return output.toString();
-  }
-
-  @Override
-  public void computeCosts() {
-    // TODO: implement?
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b544f019/fe/src/main/java/com/cloudera/impala/planner/JoinNode.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/com/cloudera/impala/planner/JoinNode.java b/fe/src/main/java/com/cloudera/impala/planner/JoinNode.java
deleted file mode 100644
index ebc9b51..0000000
--- a/fe/src/main/java/com/cloudera/impala/planner/JoinNode.java
+++ /dev/null
@@ -1,508 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package com.cloudera.impala.planner;
-
-import java.util.Collections;
-import java.util.List;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.cloudera.impala.analysis.Analyzer;
-import com.cloudera.impala.analysis.BinaryPredicate;
-import com.cloudera.impala.analysis.Expr;
-import com.cloudera.impala.analysis.JoinOperator;
-import com.cloudera.impala.analysis.SlotDescriptor;
-import com.cloudera.impala.analysis.SlotRef;
-import com.cloudera.impala.catalog.ColumnStats;
-import com.cloudera.impala.catalog.Table;
-import com.cloudera.impala.common.ImpalaException;
-import com.google.common.base.Preconditions;
-
-/**
- * Logical join operator. Subclasses correspond to implementations of the join operator
- * (e.g. hash join, nested-loop join).
- */
-public abstract class JoinNode extends PlanNode {
-  private final static Logger LOG = LoggerFactory.getLogger(JoinNode.class);
-
-  // Default per-host memory requirement used if no valid stats are available.
-  // TODO: Come up with a more useful heuristic (e.g., based on scanned partitions).
-  protected final static long DEFAULT_PER_HOST_MEM = 2L * 1024L * 1024L * 1024L;
-
-  // Slop in percent allowed when comparing stats for the purpose of determining whether
-  // an equi-join condition is a foreign/primary key join.
-  protected final static double FK_PK_MAX_STATS_DELTA_PERC = 0.05;
-
-  protected JoinOperator joinOp_;
-
-  // Indicates if this join originates from a query block with a straight join hint.
-  protected final boolean isStraightJoin_;
-
-  // User-provided hint for the distribution mode. Set to 'NONE' if no hints were given.
-  protected final DistributionMode distrModeHint_;
-
-  protected DistributionMode distrMode_ = DistributionMode.NONE;
-
-  // Join conjuncts. eqJoinConjuncts_ are conjuncts of the form <lhs> = <rhs>;
-  // otherJoinConjuncts_ are non-equi join conjuncts. For an inner join, join conjuncts
-  // are conjuncts from the ON, USING or WHERE clauses. For other join types (e.g. outer
-  // and semi joins) these include only conjuncts from the ON and USING clauses.
-  protected List<BinaryPredicate> eqJoinConjuncts_;
-  protected List<Expr> otherJoinConjuncts_;
-
-  // if valid, the rhs input is materialized outside of this node and is assigned
-  // joinTableId_
-  protected JoinTableId joinTableId_ = JoinTableId.INVALID;
-
-  public enum DistributionMode {
-    NONE("NONE"),
-    BROADCAST("BROADCAST"),
-    PARTITIONED("PARTITIONED");
-
-    private final String description_;
-
-    private DistributionMode(String description) {
-      description_ = description;
-    }
-
-    @Override
-    public String toString() { return description_; }
-  }
-
-  public JoinNode(PlanNode outer, PlanNode inner, boolean isStraightJoin,
-      DistributionMode distrMode, JoinOperator joinOp,
-      List<BinaryPredicate> eqJoinConjuncts, List<Expr> otherJoinConjuncts,
-      String displayName) {
-    super(displayName);
-    Preconditions.checkNotNull(otherJoinConjuncts);
-    isStraightJoin_ = isStraightJoin;
-    distrModeHint_ = distrMode;
-    joinOp_ = joinOp;
-    children_.add(outer);
-    children_.add(inner);
-    eqJoinConjuncts_ = eqJoinConjuncts;
-    otherJoinConjuncts_ = otherJoinConjuncts;
-    computeTupleIds();
-  }
-
-  @Override
-  public void computeTupleIds() {
-    Preconditions.checkState(children_.size() == 2);
-    clearTupleIds();
-    PlanNode outer = children_.get(0);
-    PlanNode inner = children_.get(1);
-
-    // Only retain the non-semi-joined tuples of the inputs.
-    switch (joinOp_) {
-      case LEFT_ANTI_JOIN:
-      case LEFT_SEMI_JOIN:
-      case NULL_AWARE_LEFT_ANTI_JOIN: {
-        tupleIds_.addAll(outer.getTupleIds());
-        break;
-      }
-      case RIGHT_ANTI_JOIN:
-      case RIGHT_SEMI_JOIN: {
-        tupleIds_.addAll(inner.getTupleIds());
-        break;
-      }
-      default: {
-        tupleIds_.addAll(outer.getTupleIds());
-        tupleIds_.addAll(inner.getTupleIds());
-        break;
-      }
-    }
-    tblRefIds_.addAll(outer.getTblRefIds());
-    tblRefIds_.addAll(inner.getTblRefIds());
-
-    // Inherit all the nullable tuples from the children.
-    // Mark tuples that form the "nullable" side of the outer join as nullable.
-    nullableTupleIds_.addAll(inner.getNullableTupleIds());
-    nullableTupleIds_.addAll(outer.getNullableTupleIds());
-    if (joinOp_.equals(JoinOperator.FULL_OUTER_JOIN)) {
-      nullableTupleIds_.addAll(outer.getTupleIds());
-      nullableTupleIds_.addAll(inner.getTupleIds());
-    } else if (joinOp_.equals(JoinOperator.LEFT_OUTER_JOIN)) {
-      nullableTupleIds_.addAll(inner.getTupleIds());
-    } else if (joinOp_.equals(JoinOperator.RIGHT_OUTER_JOIN)) {
-      nullableTupleIds_.addAll(outer.getTupleIds());
-    }
-  }
-
-  public JoinOperator getJoinOp() { return joinOp_; }
-  public List<BinaryPredicate> getEqJoinConjuncts() { return eqJoinConjuncts_; }
-  public List<Expr> getOtherJoinConjuncts() { return otherJoinConjuncts_; }
-  public boolean isStraightJoin() { return isStraightJoin_; }
-  public DistributionMode getDistributionModeHint() { return distrModeHint_; }
-  public DistributionMode getDistributionMode() { return distrMode_; }
-  public void setDistributionMode(DistributionMode distrMode) { distrMode_ = distrMode; }
-  public JoinTableId getJoinTableId() { return joinTableId_; }
-  public void setJoinTableId(JoinTableId id) { joinTableId_ = id; }
-
-  @Override
-  public void init(Analyzer analyzer) throws ImpalaException {
-    // Do not call super.init() to defer computeStats() until all conjuncts
-    // have been collected.
-    assignConjuncts(analyzer);
-    createDefaultSmap(analyzer);
-    assignedConjuncts_ = analyzer.getAssignedConjuncts();
-    otherJoinConjuncts_ = Expr.substituteList(otherJoinConjuncts_,
-        getCombinedChildSmap(), analyzer, false);
-  }
-
-  /**
-   * Returns the estimated cardinality of an inner or outer join.
-   *
-   * We estimate the cardinality based on equality join predicates of the form
-   * "L.c = R.d", with L being a table from child(0) and R a table from child(1).
-   * For each such join predicate we try to determine whether it is a foreign/primary
-   * key (FK/PK) join condition, and either use a special FK/PK estimation or a generic
-   * estimation method. We maintain the minimum cardinality for each method separately,
-   * and finally return in order of preference:
-   * - the FK/PK estimate, if there was at least one FK/PK predicate
-   * - the generic estimate, if there was at least one predicate with sufficient stats
-   * - otherwise, we optimistically assume a FK/PK join with a join selectivity of 1,
-   *   and return |child(0)|
-   *
-   * FK/PK estimation:
-   * cardinality = |child(0)| * (|child(1)| / |R|) * (NDV(R.d) / NDV(L.c))
-   * - the cardinality of a FK/PK join must be <= |child(0)|
-   * - |child(1)| / |R| captures the reduction in join cardinality due to
-   *   predicates on the PK side
-   * - NDV(R.d) / NDV(L.c) adjusts the join cardinality to avoid underestimation
-   *   due to an independence assumption if the PK side has a higher NDV than the FK
-   *   side. The rationale is that rows filtered from the PK side do not necessarily
-   *   have a match on the FK side, and therefore would not affect the join cardinality.
-   *   TODO: Revisit this pessimistic adjustment that tends to overestimate.
-   *
-   * Generic estimation:
-   * cardinality = |child(0)| * |child(1)| / max(NDV(L.c), NDV(R.d))
-   * - case A: NDV(L.c) <= NDV(R.d)
-   *   every row from child(0) joins with |child(1)| / NDV(R.d) rows
-   * - case B: NDV(L.c) > NDV(R.d)
-   *   every row from child(1) joins with |child(0)| / NDV(L.c) rows
-   * - we adjust the NDVs from both sides to account for predicates that might
-   *   have reduced the cardinality and NDVs
-   */
-  private long getJoinCardinality(Analyzer analyzer) {
-    Preconditions.checkState(
-        joinOp_ == JoinOperator.INNER_JOIN || joinOp_.isOuterJoin());
-
-    long lhsCard = getChild(0).cardinality_;
-    long rhsCard = getChild(1).cardinality_;
-    if (lhsCard == -1 || rhsCard == -1) return -1;
-
-    // Minimum of estimated join cardinalities for FK/PK join conditions.
-    long fkPkJoinCard = -1;
-    // Minimum of estimated join cardinalities for other join conditions.
-    long genericJoinCard = -1;
-    for (Expr eqJoinConjunct: eqJoinConjuncts_) {
-      SlotStats lhsStats = SlotStats.create(eqJoinConjunct.getChild(0));
-      SlotStats rhsStats = SlotStats.create(eqJoinConjunct.getChild(1));
-      // Ignore the equi-join conjunct if we have no relevant table or column stats.
-      if (lhsStats == null || rhsStats == null) continue;
-
-      // We assume a FK/PK join based on the following intuitions:
-      // 1. NDV(L.c) <= NDV(R.d)
-      //    The reasoning is that a FK/PK join is unlikely if the foreign key
-      //    side has a higher NDV than the primary key side. We may miss true
-      //    FK/PK joins due to inaccurate and/or stale stats.
-      // 2. R.d is probably a primary key.
-      //    Requires that NDV(R.d) is very close to |R|.
-      // The idea is that, by default, we assume that every join is a FK/PK join unless
-      // we have compelling evidence that suggests otherwise, so by using || we give the
-      // FK/PK assumption more chances to succeed.
-      if (lhsStats.ndv <= rhsStats.ndv * (1.0 + FK_PK_MAX_STATS_DELTA_PERC) ||
-          Math.abs(rhsStats.numRows - rhsStats.ndv) / (double) rhsStats.numRows
-            <= FK_PK_MAX_STATS_DELTA_PERC) {
-        // Adjust the join selectivity based on the NDV ratio to avoid underestimating
-        // the cardinality if the PK side has a higher NDV than the FK side.
-        double ndvRatio = (double) rhsStats.ndv / (double) lhsStats.ndv;
-        double rhsSelectivity = (double) rhsCard / (double) rhsStats.numRows;
-        long joinCard = (long) Math.ceil(lhsCard * rhsSelectivity * ndvRatio);
-        // FK/PK join cardinality must be <= the lhs cardinality.
-        joinCard = Math.min(lhsCard, joinCard);
-        if (fkPkJoinCard == -1) {
-          fkPkJoinCard = joinCard;
-        } else {
-          fkPkJoinCard = Math.min(fkPkJoinCard, joinCard);
-        }
-      } else {
-        // Adjust the NDVs on both sides to account for predicates. Intuitively, the NDVs
-        // should only decrease, so we bail if the adjustment would lead to an increase.
-        // TODO: Adjust the NDVs more systematically throughout the plan tree to
-        // get a more accurate NDV at this plan node.
-        if (lhsCard > lhsStats.numRows || rhsCard > rhsStats.numRows) continue;
-        double lhsAdjNdv = lhsStats.ndv * ((double)lhsCard / lhsStats.numRows);
-        double rhsAdjNdv = rhsStats.ndv * ((double)rhsCard / rhsStats.numRows);
-        // Generic join cardinality estimation.
-        long joinCard = (long) Math.ceil(
-            (lhsCard / Math.max(lhsAdjNdv, rhsAdjNdv)) * rhsCard);
-        if (genericJoinCard == -1) {
-          genericJoinCard = joinCard;
-        } else {
-          genericJoinCard = Math.min(genericJoinCard, joinCard);
-        }
-      }
-    }
-
-    if (fkPkJoinCard != -1) {
-      return fkPkJoinCard;
-    } else if (genericJoinCard != -1) {
-      return genericJoinCard;
-    } else {
-      // Optimistic FK/PK assumption with join selectivity of 1.
-      return lhsCard;
-    }
-  }
-
-  /**
-   * Class combining column and table stats for a particular slot. Contains the NDV
-   * for the slot and the number of rows in the originating table.
-   */
-  private static class SlotStats {
-    // Number of distinct values of the slot.
-    public final long ndv;
-    // Number of rows in the originating table.
-    public final long numRows;
-
-    public SlotStats(long ndv, long numRows) {
-      // Cap NDV at num rows of the table.
-      this.ndv = Math.min(ndv, numRows);
-      this.numRows = numRows;
-    }
-
-    /**
-     * Returns a new SlotStats object from the given expr that is guaranteed
-     * to have valid stats.
-     * Returns null if 'e' is not a SlotRef or a cast SlotRef, or if there are no
-     * valid table/column stats for 'e'.
-     */
-    public static SlotStats create(Expr e) {
-      // We need both the table and column stats, but 'e' might not directly reference
-      // a scan slot, e.g., if 'e' references a grouping slot of an agg. So we look for
-      // that source scan slot, traversing through materialization points if necessary.
-      SlotDescriptor slotDesc = e.findSrcScanSlot();
-      if (slotDesc == null) return null;
-      Table table = slotDesc.getParent().getTable();
-      if (table == null || table.getNumRows() == -1) return null;
-      if (!slotDesc.getStats().hasNumDistinctValues()) return null;
-      return new SlotStats(
-          slotDesc.getStats().getNumDistinctValues(), table.getNumRows());
-    }
-  }
-
-  /**
-   * Returns the estimated cardinality of a semi join node.
-   * For a left semi join between child(0) and child(1), we look for equality join
-   * conditions "L.c = R.d" (with L being from child(0) and R from child(1)) and use as
-   * the cardinality estimate the minimum of
-   *   |child(0)| * Min(NDV(L.c), NDV(R.d)) / NDV(L.c)
-   * over all suitable join conditions. The reasoning is that:
-   * - each row in child(0) is returned at most once
-   * - the probability of a row in child(0) having a match in R is
-   *   Min(NDV(L.c), NDV(R.d)) / NDV(L.c)
-   *
-   * For a left anti join we estimate the cardinality as the minimum of:
-   *   |L| * Max(NDV(L.c) - NDV(R.d), NDV(L.c)) / NDV(L.c)
-   * over all suitable join conditions. The reasoning is that:
-   * - each row in child(0) is returned at most once
-   * - if NDV(L.c) > NDV(R.d) then the probability of a row in L not having a match
-   *   in child(1) is (NDV(L.c) - NDV(R.d)) / NDV(L.c)
-   * - otherwise, we conservatively use |L| to avoid underestimation
-   *
-   * We analogously estimate the cardinality for right semi/anti joins, and treat the
-   * null-aware anti join like a regular anti join.
-   *
-   * TODO: In order to take into account additional conjuncts in the child subtrees,
-   * adjust NDV(L.c) by |child(0)| / |L| and NDV(R.d) by |child(1)| / |R|.
-   * The adjustment is currently too dangerous because other planner bugs can compound
-   * into bad plans, causing perf regressions (IMPALA-976).
-   */
-  private long getSemiJoinCardinality() {
-    Preconditions.checkState(joinOp_.isSemiJoin());
-
-    // Return -1 if the cardinality of the returned side is unknown.
-    long cardinality;
-    if (joinOp_ == JoinOperator.RIGHT_SEMI_JOIN
-        || joinOp_ == JoinOperator.RIGHT_ANTI_JOIN) {
-      if (getChild(1).cardinality_ == -1) return -1;
-      cardinality = getChild(1).cardinality_;
-    } else {
-      if (getChild(0).cardinality_ == -1) return -1;
-      cardinality = getChild(0).cardinality_;
-    }
-    double minSelectivity = 1.0;
-    for (Expr eqJoinPredicate: eqJoinConjuncts_) {
-      long lhsNdv = getNdv(eqJoinPredicate.getChild(0));
-      lhsNdv = Math.min(lhsNdv, getChild(0).cardinality_);
-      long rhsNdv = getNdv(eqJoinPredicate.getChild(1));
-      rhsNdv = Math.min(rhsNdv, getChild(1).cardinality_);
-
-      // Skip conjuncts with unknown NDV on either side.
-      if (lhsNdv == -1 || rhsNdv == -1) continue;
-
-      double selectivity = 1.0;
-      switch (joinOp_) {
-        case LEFT_SEMI_JOIN: {
-          selectivity = (double) Math.min(lhsNdv, rhsNdv) / (double) (lhsNdv);
-          break;
-        }
-        case RIGHT_SEMI_JOIN: {
-          selectivity = (double) Math.min(lhsNdv, rhsNdv) / (double) (rhsNdv);
-          break;
-        }
-        case LEFT_ANTI_JOIN:
-        case NULL_AWARE_LEFT_ANTI_JOIN: {
-          selectivity = (double) Math.max(lhsNdv - rhsNdv, lhsNdv) / (double) lhsNdv;
-          break;
-        }
-        case RIGHT_ANTI_JOIN: {
-          selectivity = (double) Math.max(rhsNdv - lhsNdv, rhsNdv) / (double) rhsNdv;
-          break;
-        }
-        default: Preconditions.checkState(false);
-      }
-      minSelectivity = Math.min(minSelectivity, selectivity);
-    }
-
-    Preconditions.checkState(cardinality != -1);
-    return Math.round(cardinality * minSelectivity);
-  }
-
-  /**
-   * Unwraps the SlotRef in expr and returns its NDV.
-   * Returns -1 if the NDV is unknown or if expr is not a SlotRef.
-   */
-  private long getNdv(Expr expr) {
-    SlotRef slotRef = expr.unwrapSlotRef(false);
-    if (slotRef == null) return -1;
-    SlotDescriptor slotDesc = slotRef.getDesc();
-    if (slotDesc == null) return -1;
-    ColumnStats stats = slotDesc.getStats();
-    if (!stats.hasNumDistinctValues()) return -1;
-    return stats.getNumDistinctValues();
-  }
-
-  @Override
-  public void computeStats(Analyzer analyzer) {
-    super.computeStats(analyzer);
-    if (joinOp_.isSemiJoin()) {
-      cardinality_ = getSemiJoinCardinality();
-    } else if (joinOp_.isInnerJoin() || joinOp_.isOuterJoin()){
-      cardinality_ = getJoinCardinality(analyzer);
-    } else {
-      Preconditions.checkState(joinOp_.isCrossJoin());
-      long leftCard = getChild(0).cardinality_;
-      long rightCard = getChild(1).cardinality_;
-      if (leftCard != -1 && rightCard != -1) {
-        cardinality_ = multiplyCardinalities(leftCard, rightCard);
-      }
-    }
-
-    // Impose lower/upper bounds on the cardinality based on the join type.
-    long leftCard = getChild(0).cardinality_;
-    long rightCard = getChild(1).cardinality_;
-    switch (joinOp_) {
-      case LEFT_SEMI_JOIN: {
-        if (leftCard != -1) {
-          cardinality_ = Math.min(leftCard, cardinality_);
-        }
-        break;
-      }
-      case RIGHT_SEMI_JOIN: {
-        if (rightCard != -1) {
-          cardinality_ = Math.min(rightCard, cardinality_);
-        }
-        break;
-      }
-      case LEFT_OUTER_JOIN: {
-        if (leftCard != -1) {
-          cardinality_ = Math.max(leftCard, cardinality_);
-        }
-        break;
-      }
-      case RIGHT_OUTER_JOIN: {
-        if (rightCard != -1) {
-          cardinality_ = Math.max(rightCard, cardinality_);
-        }
-        break;
-      }
-      case FULL_OUTER_JOIN: {
-        if (leftCard != -1 && rightCard != -1) {
-          long cardinalitySum = addCardinalities(leftCard, rightCard);
-          cardinality_ = Math.max(cardinalitySum, cardinality_);
-        }
-        break;
-      }
-      case LEFT_ANTI_JOIN:
-      case NULL_AWARE_LEFT_ANTI_JOIN: {
-        if (leftCard != -1) {
-          cardinality_ = Math.min(leftCard, cardinality_);
-        }
-        break;
-      }
-      case RIGHT_ANTI_JOIN: {
-        if (rightCard != -1) {
-          cardinality_ = Math.min(rightCard, cardinality_);
-        }
-        break;
-      }
-      case CROSS_JOIN: {
-        if (getChild(0).cardinality_ == -1 || getChild(1).cardinality_ == -1) {
-          cardinality_ = -1;
-        } else {
-          cardinality_ = multiplyCardinalities(getChild(0).cardinality_,
-              getChild(1).cardinality_);
-        }
-        break;
-      }
-    }
-    cardinality_ = capAtLimit(cardinality_);
-    Preconditions.checkState(hasValidStats());
-    LOG.debug("stats Join: cardinality=" + Long.toString(cardinality_));
-  }
-
-  /**
-   * Inverts the join op, swaps our children, and swaps the children
-   * of all eqJoinConjuncts_. All modifications are in place.
-   */
-  public void invertJoin() {
-    joinOp_ = joinOp_.invert();
-    Collections.swap(children_, 0, 1);
-    for (BinaryPredicate p: eqJoinConjuncts_) p.reverse();
-  }
-
-  public boolean hasConjuncts() {
-    return !eqJoinConjuncts_.isEmpty() || !otherJoinConjuncts_.isEmpty() ||
-        !conjuncts_.isEmpty();
-  }
-
-  @Override
-  protected String getDisplayLabelDetail() {
-    StringBuilder output = new StringBuilder(joinOp_.toString());
-    if (distrMode_ != DistributionMode.NONE) output.append(", " + distrMode_.toString());
-    return output.toString();
-  }
-
-  protected void orderJoinConjunctsByCost() {
-    conjuncts_ = orderConjunctsByCost(conjuncts_);
-    eqJoinConjuncts_ = orderConjunctsByCost(eqJoinConjuncts_);
-    otherJoinConjuncts_ = orderConjunctsByCost(otherJoinConjuncts_);
-  }
-}
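
The comment on getJoinCardinality() above describes two estimates: a FK/PK estimate and a generic one. A minimal standalone sketch of both formulas, with a small worked example in main(), follows; it is not part of the commit, all names are illustrative, and -1 stands for "unknown" as in the planner code:

public class JoinCardEstimate {
  // FK/PK estimate: |child(0)| * (|child(1)| / |R|) * (NDV(R.d) / NDV(L.c)),
  // capped at |child(0)|.
  static long fkPkEstimate(long lhsCard, long rhsCard, long rhsNumRows,
      long lhsNdv, long rhsNdv) {
    double ndvRatio = (double) rhsNdv / (double) lhsNdv;
    double rhsSelectivity = (double) rhsCard / (double) rhsNumRows;
    long joinCard = (long) Math.ceil(lhsCard * rhsSelectivity * ndvRatio);
    return Math.min(lhsCard, joinCard);
  }

  // Generic estimate: |child(0)| * |child(1)| / max(NDV(L.c), NDV(R.d)),
  // with the NDVs scaled down to account for predicates below the join.
  static long genericEstimate(long lhsCard, long rhsCard, long lhsNumRows,
      long rhsNumRows, long lhsNdv, long rhsNdv) {
    double lhsAdjNdv = lhsNdv * ((double) lhsCard / lhsNumRows);
    double rhsAdjNdv = rhsNdv * ((double) rhsCard / rhsNumRows);
    return (long) Math.ceil((lhsCard / Math.max(lhsAdjNdv, rhsAdjNdv)) * rhsCard);
  }

  public static void main(String[] args) {
    // FK/PK example: 1M fact rows joining a 10K-row dimension filtered to 1K rows,
    // NDV 10K on both sides -> roughly 100K result rows, capped at 1M.
    System.out.println(fkPkEstimate(1_000_000L, 1_000L, 10_000L, 10_000L, 10_000L));
    // Generic example: both sides have 100K rows, no filtering, NDVs 500 and 2000.
    System.out.println(genericEstimate(100_000L, 100_000L, 100_000L, 100_000L,
        500L, 2_000L));
  }
}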

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b544f019/fe/src/main/java/com/cloudera/impala/planner/JoinTableId.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/com/cloudera/impala/planner/JoinTableId.java b/fe/src/main/java/com/cloudera/impala/planner/JoinTableId.java
deleted file mode 100644
index 5cf7a2b..0000000
--- a/fe/src/main/java/com/cloudera/impala/planner/JoinTableId.java
+++ /dev/null
@@ -1,47 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package com.cloudera.impala.planner;
-
-import com.cloudera.impala.common.Id;
-import com.cloudera.impala.common.IdGenerator;
-
-public class JoinTableId extends Id<JoinTableId> {
-  // Construction only allowed via an IdGenerator.
-  protected JoinTableId(int id) {
-    super(id);
-  }
-
-  public static JoinTableId INVALID;
-  static {
-    INVALID = new JoinTableId(Id.INVALID_ID);
-  }
-
-  public static IdGenerator<JoinTableId> createGenerator() {
-    return new IdGenerator<JoinTableId>() {
-      @Override
-      public JoinTableId getNextId() { return new JoinTableId(nextId_++); }
-      @Override
-      public JoinTableId getMaxId() { return new JoinTableId(nextId_ - 1); }
-    };
-  }
-
-  @Override
-  public String toString() {
-    return String.format("%02d", id_);
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b544f019/fe/src/main/java/com/cloudera/impala/planner/KuduScanNode.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/com/cloudera/impala/planner/KuduScanNode.java b/fe/src/main/java/com/cloudera/impala/planner/KuduScanNode.java
deleted file mode 100644
index 4f654a9..0000000
--- a/fe/src/main/java/com/cloudera/impala/planner/KuduScanNode.java
+++ /dev/null
@@ -1,358 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package com.cloudera.impala.planner;
-
-import java.io.IOException;
-import java.util.List;
-import java.util.ListIterator;
-import java.util.Set;
-
-import org.apache.kudu.ColumnSchema;
-import org.apache.kudu.Schema;
-import org.apache.kudu.client.KuduClient;
-import org.apache.kudu.client.KuduClient.KuduClientBuilder;
-import org.apache.kudu.client.KuduPredicate;
-import org.apache.kudu.client.KuduPredicate.ComparisonOp;
-import org.apache.kudu.client.KuduScanToken;
-import org.apache.kudu.client.KuduScanToken.KuduScanTokenBuilder;
-import org.apache.kudu.client.LocatedTablet;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.cloudera.impala.analysis.Analyzer;
-import com.cloudera.impala.analysis.BinaryPredicate;
-import com.cloudera.impala.analysis.BoolLiteral;
-import com.cloudera.impala.analysis.Expr;
-import com.cloudera.impala.analysis.LiteralExpr;
-import com.cloudera.impala.analysis.NullLiteral;
-import com.cloudera.impala.analysis.NumericLiteral;
-import com.cloudera.impala.analysis.SlotDescriptor;
-import com.cloudera.impala.analysis.SlotRef;
-import com.cloudera.impala.analysis.StringLiteral;
-import com.cloudera.impala.analysis.TupleDescriptor;
-import com.cloudera.impala.catalog.KuduTable;
-import com.cloudera.impala.common.ImpalaRuntimeException;
-import com.cloudera.impala.thrift.TExplainLevel;
-import com.cloudera.impala.thrift.TKuduScanNode;
-import com.cloudera.impala.thrift.TNetworkAddress;
-import com.cloudera.impala.thrift.TPlanNode;
-import com.cloudera.impala.thrift.TPlanNodeType;
-import com.cloudera.impala.thrift.TScanRange;
-import com.cloudera.impala.thrift.TScanRangeLocation;
-import com.cloudera.impala.thrift.TScanRangeLocations;
-import com.google.common.base.Charsets;
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Sets;
-
-/**
- * Scan of a single Kudu table.
- *
- * Extracts predicates that can be pushed down to Kudu. Currently only binary predicates
- * that have a constant expression on one side and a slot ref on the other can be
- * evaluated by Kudu.
- *
- * Uses the Kudu ScanToken API to generate a set of Kudu "scan tokens", which are used
- * for scheduling and initializing the scanners. Scan tokens are opaque objects that
- * represent a scan of some Kudu data on a tablet (currently one token represents one
- * tablet). Each token contains the tablet locations and all information needed to
- * produce a Kudu scanner, including the projected columns and pushed-down predicates.
- *
- * After KUDU-1065 is resolved, Kudu will also prune the tablets that don't need to be
- * scanned, and only tokens for the tablets that do need scanning will be returned.
- */
-public class KuduScanNode extends ScanNode {
-  private final static Logger LOG = LoggerFactory.getLogger(KuduScanNode.class);
-
-  private final KuduTable kuduTable_;
-
-  // Indexes for the set of hosts that will be used for the query.
-  // From analyzer.getHostIndex().getIndex(address)
-  private final Set<Integer> hostIndexSet_ = Sets.newHashSet();
-
-  // List of conjuncts that can be pushed down to Kudu, after they have been normalized
-  // by BinaryPredicate.normalizeSlotRefComparison(). Used for computing stats and
-  // explain strings.
-  private final List<Expr> kuduConjuncts_ = Lists.newArrayList();
-
-  // Exprs in kuduConjuncts_ converted to KuduPredicates.
-  private final List<KuduPredicate> kuduPredicates_ = Lists.newArrayList();
-
-  public KuduScanNode(PlanNodeId id, TupleDescriptor desc) {
-    super(id, desc, "SCAN KUDU");
-    kuduTable_ = (KuduTable) desc_.getTable();
-  }
-
-  @Override
-  public void init(Analyzer analyzer) throws ImpalaRuntimeException {
-    assignConjuncts(analyzer);
-    analyzer.createEquivConjuncts(tupleIds_.get(0), conjuncts_);
-    conjuncts_ = orderConjunctsByCost(conjuncts_);
-
-    try (KuduClient client =
-         new KuduClientBuilder(kuduTable_.getKuduMasterAddresses()).build()) {
-      org.apache.kudu.client.KuduTable rpcTable =
-          client.openTable(kuduTable_.getKuduTableName());
-      validateSchema(rpcTable);
-
-      // Extract predicates that can be evaluated by Kudu.
-      extractKuduConjuncts(analyzer, client, rpcTable);
-
-      // Materialize the slots of the remaining conjuncts (i.e. those not pushed to Kudu)
-      analyzer.materializeSlots(conjuncts_);
-
-      // Creates Kudu scan tokens and sets the scan range locations.
-      computeScanRangeLocations(analyzer, client, rpcTable);
-    } catch (Exception e) {
-      throw new ImpalaRuntimeException("Unable to initialize the Kudu scan node", e);
-    }
-
-    computeMemLayout(analyzer);
-    computeStats(analyzer);
-  }
-
-  /**
-   * Validate the columns Impala expects are actually in the Kudu table.
-   */
-  private void validateSchema(org.apache.kudu.client.KuduTable rpcTable)
-      throws ImpalaRuntimeException {
-    Schema tableSchema = rpcTable.getSchema();
-    for (SlotDescriptor desc: getTupleDesc().getSlots()) {
-      String colName = desc.getColumn().getName();
-      try {
-        tableSchema.getColumn(colName);
-      } catch (Exception e) {
-        throw new ImpalaRuntimeException("Column '" + colName + "' not found in kudu " +
-            "table " + rpcTable.getName());
-      }
-    }
-  }
-
-  /**
-   * Compute the scan range locations for the given table using the scan tokens.
-   */
-  private void computeScanRangeLocations(Analyzer analyzer,
-      KuduClient client, org.apache.kudu.client.KuduTable rpcTable)
-      throws ImpalaRuntimeException {
-    scanRanges_ = Lists.newArrayList();
-
-    List<KuduScanToken> scanTokens = createScanTokens(client, rpcTable);
-    for (KuduScanToken token: scanTokens) {
-      LocatedTablet tablet = token.getTablet();
-      List<TScanRangeLocation> locations = Lists.newArrayList();
-      if (tablet.getReplicas().isEmpty()) {
-        throw new ImpalaRuntimeException(String.format(
-            "At least one tablet does not have any replicas. Tablet ID: %s",
-            new String(tablet.getTabletId(), Charsets.UTF_8)));
-      }
-
-      for (LocatedTablet.Replica replica: tablet.getReplicas()) {
-        TNetworkAddress address =
-            new TNetworkAddress(replica.getRpcHost(), replica.getRpcPort());
-        // Use the network address to look up the host in the global list
-        Integer hostIndex = analyzer.getHostIndex().getIndex(address);
-        locations.add(new TScanRangeLocation(hostIndex));
-        hostIndexSet_.add(hostIndex);
-      }
-
-      TScanRange scanRange = new TScanRange();
-      try {
-        scanRange.setKudu_scan_token(token.serialize());
-      } catch (IOException e) {
-        throw new ImpalaRuntimeException("Unable to serialize Kudu scan token=" +
-            token.toString(), e);
-      }
-
-      TScanRangeLocations locs = new TScanRangeLocations();
-      locs.setScan_range(scanRange);
-      locs.locations = locations;
-      scanRanges_.add(locs);
-    }
-  }
-
-  /**
-   * Returns KuduScanTokens for this scan given the projected columns and predicates that
-   * will be pushed to Kudu.
-   */
-  private List<KuduScanToken> createScanTokens(KuduClient client,
-      org.apache.kudu.client.KuduTable rpcTable) {
-    List<String> projectedCols = Lists.newArrayList();
-    for (SlotDescriptor desc: getTupleDesc().getSlots()) {
-      if (desc.isMaterialized()) projectedCols.add(desc.getColumn().getName());
-    }
-
-    KuduScanTokenBuilder tokenBuilder = client.newScanTokenBuilder(rpcTable);
-    tokenBuilder.setProjectedColumnNames(projectedCols);
-    for (KuduPredicate predicate: kuduPredicates_) tokenBuilder.addPredicate(predicate);
-    return tokenBuilder.build();
-  }
-
-  @Override
-  protected double computeSelectivity() {
-    List<Expr> allConjuncts = Lists.newArrayList(
-        Iterables.concat(conjuncts_, kuduConjuncts_));
-    return computeCombinedSelectivity(allConjuncts);
-  }
-
-  @Override
-  protected void computeStats(Analyzer analyzer) {
-    super.computeStats(analyzer);
-    // Update the number of nodes to reflect the hosts that have relevant data.
-    numNodes_ = hostIndexSet_.size();
-
-    // Update the cardinality
-    inputCardinality_ = cardinality_ = kuduTable_.getNumRows();
-    cardinality_ *= computeSelectivity();
-    cardinality_ = Math.min(Math.max(1, cardinality_), kuduTable_.getNumRows());
-    cardinality_ = capAtLimit(cardinality_);
-    LOG.debug("computeStats KuduScan: cardinality=" + Long.toString(cardinality_));
-  }
-
-  @Override
-  protected String getNodeExplainString(String prefix, String detailPrefix,
-      TExplainLevel detailLevel) {
-    StringBuilder result = new StringBuilder();
-
-    String aliasStr = desc_.hasExplicitAlias() ? " " + desc_.getAlias() : "";
-    result.append(String.format("%s%s:%s [%s%s]\n", prefix, id_.toString(), displayName_,
-        kuduTable_.getFullName(), aliasStr));
-
-    switch (detailLevel) {
-      case MINIMAL: break;
-      case STANDARD: // Fallthrough intended.
-      case EXTENDED: // Fallthrough intended.
-      case VERBOSE: {
-        if (!conjuncts_.isEmpty()) {
-          result.append(detailPrefix + "predicates: " + getExplainString(conjuncts_)
-              + "\n");
-        }
-        if (!kuduConjuncts_.isEmpty()) {
-          result.append(detailPrefix + "kudu predicates: " + getExplainString(
-              kuduConjuncts_) + "\n");
-        }
-      }
-    }
-    return result.toString();
-  }
-
-  @Override
-  protected void toThrift(TPlanNode node) {
-    node.node_type = TPlanNodeType.KUDU_SCAN_NODE;
-    node.kudu_scan_node = new TKuduScanNode(desc_.getId().asInt());
-  }
-
-  /**
-   * Extracts predicates from conjuncts_ that can be pushed down to Kudu. Currently only
-   * binary predicates that have a constant expression on one side and a slot ref on the
-   * other can be evaluated by Kudu. Only looks at comparisons of constants (i.e., the
-   * bounds of the result can be evaluated with Expr::GetValue(NULL)). If a conjunct can
-   * be converted into this form, the normalized expr is added to kuduConjuncts_, a
-   * KuduPredicate is added to kuduPredicates_, and the original expr from conjuncts_ is
-   * removed.
-   */
-  private void extractKuduConjuncts(Analyzer analyzer,
-      KuduClient client, org.apache.kudu.client.KuduTable rpcTable) {
-    ListIterator<Expr> it = conjuncts_.listIterator();
-    while (it.hasNext()) {
-      if (tryConvertKuduPredicate(analyzer, rpcTable, it.next())) it.remove();
-    }
-  }
-
-  /**
-   * If 'expr' can be converted to a KuduPredicate, returns true and updates
-   * kuduPredicates_ and kuduConjuncts_.
-   */
-  private boolean tryConvertKuduPredicate(Analyzer analyzer,
-      org.apache.kudu.client.KuduTable table, Expr expr) {
-    if (!(expr instanceof BinaryPredicate)) return false;
-    BinaryPredicate predicate = (BinaryPredicate) expr;
-
-    // TODO KUDU-931 look into handling implicit/explicit casts on the SlotRef.
-    predicate = BinaryPredicate.normalizeSlotRefComparison(predicate, analyzer);
-    if (predicate == null) return false;
-    ComparisonOp op = getKuduOperator(((BinaryPredicate)predicate).getOp());
-    if (op == null) return false;
-
-    SlotRef ref = (SlotRef) predicate.getChild(0);
-    LiteralExpr literal = (LiteralExpr) predicate.getChild(1);
-
-    // Cannot push predicates with null literal values (KUDU-1595).
-    if (literal instanceof NullLiteral) return false;
-
-    String colName = ref.getDesc().getColumn().getName();
-    ColumnSchema column = table.getSchema().getColumn(colName);
-    KuduPredicate kuduPredicate = null;
-    switch (literal.getType().getPrimitiveType()) {
-      case BOOLEAN: {
-        kuduPredicate = KuduPredicate.newComparisonPredicate(column, op,
-            ((BoolLiteral)literal).getValue());
-        break;
-      }
-      case TINYINT:
-      case SMALLINT:
-      case INT: {
-        kuduPredicate = KuduPredicate.newComparisonPredicate(column, op,
-            ((NumericLiteral)literal).getIntValue());
-        break;
-      }
-      case BIGINT: {
-        kuduPredicate = KuduPredicate.newComparisonPredicate(column, op,
-            ((NumericLiteral)literal).getLongValue());
-        break;
-      }
-      case FLOAT: {
-        kuduPredicate = KuduPredicate.newComparisonPredicate(column, op,
-            (float)((NumericLiteral)literal).getDoubleValue());
-        break;
-      }
-      case DOUBLE: {
-        kuduPredicate = KuduPredicate.newComparisonPredicate(column, op,
-            ((NumericLiteral)literal).getDoubleValue());
-        break;
-      }
-      case STRING:
-      case VARCHAR:
-      case CHAR: {
-        kuduPredicate = KuduPredicate.newComparisonPredicate(column, op,
-            ((StringLiteral)literal).getStringValue());
-        break;
-      }
-      default: break;
-    }
-    if (kuduPredicate == null) return false;
-
-    kuduConjuncts_.add(predicate);
-    kuduPredicates_.add(kuduPredicate);
-    return true;
-  }
-
-  /**
-   * Returns a Kudu comparison operator for the BinaryPredicate operator, or null if
-   * the operation is not supported by Kudu.
-   */
-  private static KuduPredicate.ComparisonOp getKuduOperator(BinaryPredicate.Operator op) {
-    switch (op) {
-      case GT: return ComparisonOp.GREATER;
-      case LT: return ComparisonOp.LESS;
-      case GE: return ComparisonOp.GREATER_EQUAL;
-      case LE: return ComparisonOp.LESS_EQUAL;
-      case EQ: return ComparisonOp.EQUAL;
-      default: return null;
-    }
-  }
-}
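
The scan-token flow described in the class comment above can be sketched end to end against the Kudu client calls already used in this file (KuduClientBuilder, newScanTokenBuilder, KuduPredicate.newComparisonPredicate). The sketch below is not part of the commit; the master address, table name, and column name are placeholders:

import java.util.Arrays;
import java.util.List;

import org.apache.kudu.ColumnSchema;
import org.apache.kudu.client.KuduClient;
import org.apache.kudu.client.KuduPredicate;
import org.apache.kudu.client.KuduPredicate.ComparisonOp;
import org.apache.kudu.client.KuduScanToken;
import org.apache.kudu.client.KuduTable;
import org.apache.kudu.client.LocatedTablet;

public class KuduScanTokenSketch {
  public static void main(String[] args) throws Exception {
    try (KuduClient client =
        new KuduClient.KuduClientBuilder("kudu-master:7051").build()) {
      KuduTable table = client.openTable("example_table");

      // Equivalent of a pushed-down conjunct such as "col >= 10".
      ColumnSchema column = table.getSchema().getColumn("col");
      KuduPredicate predicate =
          KuduPredicate.newComparisonPredicate(column, ComparisonOp.GREATER_EQUAL, 10L);

      // One scan token per tablet; each token carries the projection, predicates, and
      // tablet locations, and can be serialized for embedding in a scan range.
      List<KuduScanToken> tokens = client.newScanTokenBuilder(table)
          .setProjectedColumnNames(Arrays.asList("col"))
          .addPredicate(predicate)
          .build();
      for (KuduScanToken token : tokens) {
        for (LocatedTablet.Replica replica : token.getTablet().getReplicas()) {
          // Replica addresses are what the planner maps to scan range locations.
          System.out.println(replica.getRpcHost() + ":" + replica.getRpcPort());
        }
        byte[] serialized = token.serialize();  // opaque token handed to the scanner
      }
    }
  }
}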

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b544f019/fe/src/main/java/com/cloudera/impala/planner/KuduTableSink.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/com/cloudera/impala/planner/KuduTableSink.java b/fe/src/main/java/com/cloudera/impala/planner/KuduTableSink.java
deleted file mode 100644
index 8e8ac63..0000000
--- a/fe/src/main/java/com/cloudera/impala/planner/KuduTableSink.java
+++ /dev/null
@@ -1,88 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-
-package com.cloudera.impala.planner;
-
-import java.util.ArrayList;
-import java.util.List;
-
-import com.cloudera.impala.catalog.Table;
-import com.cloudera.impala.common.PrintUtils;
-import com.cloudera.impala.thrift.TDataSink;
-import com.cloudera.impala.thrift.TDataSinkType;
-import com.cloudera.impala.thrift.TExplainLevel;
-import com.cloudera.impala.thrift.TKuduTableSink;
-import com.cloudera.impala.thrift.TTableSink;
-import com.cloudera.impala.thrift.TTableSinkType;
-import com.google.common.collect.Lists;
-
-/**
- * Class used to represent a sink that transports
- * data from a plan fragment into a Kudu table using a Kudu client.
- */
-public class KuduTableSink extends TableSink {
-
-  // Optional list of referenced Kudu table column indices. The result expression at
-  // position i maps to the Kudu schema column index targetColIdxs_[i].
-  private ArrayList<Integer> targetColIdxs_;
-
-  private final boolean ignoreNotFoundOrDuplicate_;
-
-  public KuduTableSink(Table targetTable, Op sinkOp,
-      List<Integer> referencedColumns, boolean ignoreNotFoundOrDuplicate) {
-    super(targetTable, sinkOp);
-    targetColIdxs_ = referencedColumns != null
-        ? Lists.newArrayList(referencedColumns) : null;
-    ignoreNotFoundOrDuplicate_ = ignoreNotFoundOrDuplicate;
-  }
-
-  @Override
-  public String getExplainString(String prefix, String detailPrefix,
-      TExplainLevel explainLevel) {
-    StringBuilder output = new StringBuilder();
-    output.append(prefix + sinkOp_.toExplainString());
-    output.append(" KUDU [" + targetTable_.getFullName() + "]\n");
-    output.append(detailPrefix);
-    if (sinkOp_ == Op.INSERT) {
-      output.append("check unique keys: ");
-    } else {
-      output.append("check keys exist: ");
-    }
-    output.append(ignoreNotFoundOrDuplicate_);
-    output.append("\n");
-    if (explainLevel.ordinal() >= TExplainLevel.EXTENDED.ordinal()) {
-      output.append(PrintUtils.printHosts(detailPrefix, fragment_.getNumNodes()));
-      output.append(PrintUtils.printMemCost(" ", perHostMemCost_));
-      output.append("\n");
-    }
-    return output.toString();
-  }
-
-  @Override
-  protected TDataSink toThrift() {
-    TDataSink result = new TDataSink(TDataSinkType.TABLE_SINK);
-    TTableSink tTableSink = new TTableSink(targetTable_.getId().asInt(),
-        TTableSinkType.KUDU, sinkOp_.toThrift());
-    TKuduTableSink tKuduSink = new TKuduTableSink();
-    tKuduSink.setReferenced_columns(targetColIdxs_);
-    tKuduSink.setIgnore_not_found_or_duplicate(ignoreNotFoundOrDuplicate_);
-    tTableSink.setKudu_table_sink(tKuduSink);
-    result.table_sink = tTableSink;
-    return result;
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b544f019/fe/src/main/java/com/cloudera/impala/planner/NestedLoopJoinNode.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/com/cloudera/impala/planner/NestedLoopJoinNode.java b/fe/src/main/java/com/cloudera/impala/planner/NestedLoopJoinNode.java
deleted file mode 100644
index e989438..0000000
--- a/fe/src/main/java/com/cloudera/impala/planner/NestedLoopJoinNode.java
+++ /dev/null
@@ -1,133 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package com.cloudera.impala.planner;
-
-import java.util.Collections;
-import java.util.List;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.cloudera.impala.analysis.Analyzer;
-import com.cloudera.impala.analysis.BinaryPredicate;
-import com.cloudera.impala.analysis.Expr;
-import com.cloudera.impala.analysis.JoinOperator;
-import com.cloudera.impala.common.ImpalaException;
-import com.cloudera.impala.thrift.TExplainLevel;
-import com.cloudera.impala.thrift.TNestedLoopJoinNode;
-import com.cloudera.impala.thrift.TPlanNode;
-import com.cloudera.impala.thrift.TPlanNodeType;
-import com.cloudera.impala.thrift.TQueryOptions;
-import com.google.common.base.Objects;
-import com.google.common.base.Preconditions;
-
-/**
- * Nested-loop join between left child and right child.
- * Initially, the join operator fully materializes the right input in memory.
- * Subsequently, for every row from the left input it identifies the matching rows
- * from the right hand side and produces the join result according to the join operator.
- * The nested-loop join is used when there are no equi-join predicates. Hence,
- * eqJoinConjuncts_ should be empty and all the join conjuncts are stored in
- * otherJoinConjuncts_. Currently, all join operators are supported except for
- * null-aware anti join.
- *
- * Note: The operator does not spill to disk when there is not enough memory to hold the
- * right input.
- */
-public class NestedLoopJoinNode extends JoinNode {
-  private final static Logger LOG = LoggerFactory.getLogger(NestedLoopJoinNode.class);
-
-  public NestedLoopJoinNode(PlanNode outer, PlanNode inner, boolean isStraightJoin,
-      DistributionMode distrMode, JoinOperator joinOp, List<Expr> otherJoinConjuncts) {
-    super(outer, inner, isStraightJoin, distrMode, joinOp,
-        Collections.<BinaryPredicate>emptyList(), otherJoinConjuncts,
-        "NESTED LOOP JOIN");
-  }
-
-  @Override
-  public void init(Analyzer analyzer) throws ImpalaException {
-    super.init(analyzer);
-    Preconditions.checkState(eqJoinConjuncts_.isEmpty());
-    // Set the proper join operator based on whether predicates are assigned or not.
-    if (conjuncts_.isEmpty() && otherJoinConjuncts_.isEmpty() && !joinOp_.isSemiJoin() &&
-        !joinOp_.isOuterJoin()) {
-      joinOp_ = JoinOperator.CROSS_JOIN;
-    } else if (joinOp_.isCrossJoin()) {
-      // A cross join with predicates is an inner join.
-      joinOp_ = JoinOperator.INNER_JOIN;
-    }
-    orderJoinConjunctsByCost();
-    computeStats(analyzer);
-  }
-
-  @Override
-  public void computeCosts(TQueryOptions queryOptions) {
-    if (getChild(1).getCardinality() == -1 || getChild(1).getAvgRowSize() == -1
-        || numNodes_ == 0) {
-      perHostMemCost_ = DEFAULT_PER_HOST_MEM;
-      return;
-    }
-    perHostMemCost_ =
-        (long) Math.ceil(getChild(1).cardinality_ * getChild(1).avgRowSize_);
-  }
-
-  @Override
-  protected String getNodeExplainString(String prefix, String detailPrefix,
-      TExplainLevel detailLevel) {
-    StringBuilder output = new StringBuilder();
-    String labelDetail = getDisplayLabelDetail();
-    if (labelDetail == null) {
-      output.append(prefix + getDisplayLabel() + "\n");
-    } else {
-      output.append(String.format("%s%s:%s [%s]\n", prefix, id_.toString(),
-          displayName_, getDisplayLabelDetail()));
-    }
-    if (detailLevel.ordinal() >= TExplainLevel.STANDARD.ordinal()) {
-      if (joinTableId_.isValid()) {
-          output.append(
-              detailPrefix + "join table id: " + joinTableId_.toString() + "\n");
-      }
-      if (!otherJoinConjuncts_.isEmpty()) {
-        output.append(detailPrefix + "join predicates: ")
-        .append(getExplainString(otherJoinConjuncts_) + "\n");
-      }
-      if (!conjuncts_.isEmpty()) {
-        output.append(detailPrefix + "predicates: ")
-        .append(getExplainString(conjuncts_) + "\n");
-      }
-    }
-    return output.toString();
-  }
-
-  @Override
-  protected void toThrift(TPlanNode msg) {
-    msg.node_type = TPlanNodeType.NESTED_LOOP_JOIN_NODE;
-    msg.nested_loop_join_node = new TNestedLoopJoinNode();
-    msg.nested_loop_join_node.join_op = joinOp_.toThrift();
-    for (Expr e: otherJoinConjuncts_) {
-      msg.nested_loop_join_node.addToJoin_conjuncts(e.treeToThrift());
-    }
-  }
-
-  @Override
-  protected String debugString() {
-    return Objects.toStringHelper(this)
-        .addValue(super.debugString())
-        .toString();
-  }
-}

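The computeCosts() method above estimates per-host memory as the build-side cardinality times the average row size, since the nested-loop join fully materializes the right input and never spills. A minimal worked example in Java, using hypothetical cardinality and row-size values (not taken from any real table):

  long buildCardinality = 1000000L;   // hypothetical row count of the right (build) child
  double avgRowSize = 48.0;           // hypothetical average row width in bytes
  long perHostMemCost = (long) Math.ceil(buildCardinality * avgRowSize);
  // perHostMemCost == 48000000 bytes (~46 MB) per host; if the build child's
  // cardinality or average row size is unknown (-1), the code above falls back
  // to DEFAULT_PER_HOST_MEM instead.
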
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b544f019/fe/src/main/java/com/cloudera/impala/planner/ParallelPlanner.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/com/cloudera/impala/planner/ParallelPlanner.java b/fe/src/main/java/com/cloudera/impala/planner/ParallelPlanner.java
deleted file mode 100644
index 905d68d..0000000
--- a/fe/src/main/java/com/cloudera/impala/planner/ParallelPlanner.java
+++ /dev/null
@@ -1,205 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package com.cloudera.impala.planner;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.cloudera.impala.common.IdGenerator;
-import com.google.common.base.Joiner;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-
-/**
- * The parallel planner is responsible for breaking up a single distributed plan
- * (= tree of PlanFragments) into a (logical) tree of distributed plans. The root
- * of that tree produces the query result; all the other plans produce intermediate
- * join build sides. All plans that produce intermediate join build sides (one per join
- * node in the recipient) for a single recipient plan are grouped together into a
- * cohort. Since each plan may only produce a build side for at most one recipient
- * plan, each plan belongs to exactly one cohort.
- *
- * TODO: if the input to the JoinBuildSink is the result of a grouping aggregation
- * on the join keys, the AggregationNode should materialize the final hash table
- * directly (instead of reading the hash table content and feeding it into a
- * JoinBuildSink to build another hash table)
- *
- * TODO: instead of cohort ids, create a Plan class that is a subclass of TreeNode?
- */
-public class ParallelPlanner {
-  private final static Logger LOG = LoggerFactory.getLogger(ParallelPlanner.class);
-
-  private final IdGenerator<JoinTableId> joinTableIdGenerator_ =
-      JoinTableId.createGenerator();
-  private final IdGenerator<PlanId> planIdGenerator_ = PlanId.createGenerator();
-  private final IdGenerator<CohortId> cohortIdGenerator_ = CohortId.createGenerator();
-  private final PlannerContext ctx_;
-
-  private List<PlanFragment> planRoots_ = Lists.newArrayList();
-
-  public ParallelPlanner(PlannerContext ctx) { ctx_ = ctx; }
-
-  /**
-   * Given a distributed plan, return a list of plans ready for parallel execution.
-   * The last plan in the sequence materializes the query result, the preceding
-   * plans materialize the build sides of joins.
-   * Assigns cohortId and planId for all fragments.
-   * TODO: create class DistributedPlan with a PlanFragment member, so we don't
-   * need to distinguish PlanFragment and Plan through comments?
-   */
-  public List<PlanFragment> createPlans(PlanFragment root) {
-    root.setPlanId(planIdGenerator_.getNextId());
-    root.setCohortId(cohortIdGenerator_.getNextId());
-    planRoots_.add(root);
-    createBuildPlans(root, null);
-    return planRoots_;
-  }
-
-  /**
-   * Recursively traverse tree of fragments of 'plan' from top to bottom and
-   * move all build inputs of joins into separate plans. 'buildCohortId' is the
-   * cohort id of the build plans of 'fragment' and may be null if the plan
-   * to which 'fragment' belongs has so far not required any build plans.
-   * Assign fragment's plan id and cohort id to children.
-   */
-  private void createBuildPlans(PlanFragment fragment, CohortId buildCohortId) {
-    LOG.info("createbuildplans fragment " + fragment.getId().toString());
-    List<JoinNode> joins = Lists.newArrayList();
-    collectJoins(fragment.getPlanRoot(), joins);
-    if (!joins.isEmpty()) {
-      List<String> joinIds = Lists.newArrayList();
-      for (JoinNode join: joins) joinIds.add(join.getId().toString());
-      LOG.info("collected joins " + Joiner.on(" ").join(joinIds));
-
-      if (buildCohortId == null) buildCohortId = cohortIdGenerator_.getNextId();
-      for (JoinNode join: joins) createBuildPlan(join, buildCohortId);
-    }
-
-    if (!fragment.getChildren().isEmpty()) {
-      List<String> ids = Lists.newArrayList();
-      for (PlanFragment c: fragment.getChildren()) ids.add(c.getId().toString());
-      LOG.info("collected children " + Joiner.on(" ").join(ids) + " parent "
-          + fragment.getId().toString());
-    }
-    for (PlanFragment child: fragment.getChildren()) {
-      child.setPlanId(fragment.getPlanId());
-      child.setCohortId(fragment.getCohortId());
-      createBuildPlans(child, buildCohortId);
-    }
-  }
-
-  /**
-   * Collect all JoinNodes that aren't themselves the build side of a join node
-   * in this fragment or the rhs of a SubplanNode.
-   */
-  private void collectJoins(PlanNode node, List<JoinNode> result) {
-    if (node instanceof JoinNode) {
-      result.add((JoinNode)node);
-      // for joins, only descend through the probe side;
-      // we're recursively traversing the build side when constructing the build plan
-      // in createBuildPlan()
-      collectJoins(node.getChild(0), result);
-      return;
-    }
-    if (node instanceof ExchangeNode) return;
-    if (node instanceof SubplanNode) {
-      collectJoins(node.getChild(0), result);
-      return;
-    }
-    for (PlanNode child: node.getChildren()) collectJoins(child, result);
-  }
-
-  /**
-   * Collect all ExchangeNodes in this fragment.
-   */
-  private void collectExchangeNodes(PlanNode node, List<ExchangeNode> result) {
-    if (node instanceof ExchangeNode) {
-      result.add((ExchangeNode)node);
-      return;
-    }
-    for (PlanNode child: node.getChildren()) collectExchangeNodes(child, result);
-  }
-
-  /**
-   * Create new plan that materializes build input of 'join' and assign it 'cohortId'.
-   * In the process, moves all fragments required for this materialization from tree
-   * rooted at 'join's fragment into the new plan.
-   * Also assigns the new plan a plan id.
-   */
-  private void createBuildPlan(JoinNode join, CohortId cohortId) {
-    LOG.info("createbuildplan " + join.getId().toString());
-    Preconditions.checkNotNull(cohortId);
-    // collect all ExchangeNodes on the build side and their corresponding input
-    // fragments
-    final List<ExchangeNode> exchNodes = Lists.newArrayList();
-    collectExchangeNodes(join.getChild(1), exchNodes);
-
-    com.google.common.base.Predicate<PlanFragment> isInputFragment =
-        new com.google.common.base.Predicate<PlanFragment>() {
-          public boolean apply(PlanFragment f) {
-            // we're starting with the fragment containing the join, which might
-            // be terminal
-            if (f.getDestNode() == null) return false;
-            for (ExchangeNode exch: exchNodes) {
-              if (exch.getId() == f.getDestNode().getId()) return true;
-            }
-            return false;
-          }
-        };
-    List<PlanFragment> inputFragments = Lists.newArrayList();
-    join.getFragment().collect(isInputFragment, inputFragments);
-    Preconditions.checkState(exchNodes.size() == inputFragments.size());
-
-    // Create new fragment with JoinBuildSink that consumes the output of the
-    // join's rhs input (the one that materializes the build side).
-    // The new fragment has the same data partition as the join node's fragment.
-    JoinBuildSink buildSink =
-        new JoinBuildSink(joinTableIdGenerator_.getNextId(), join);
-    join.setJoinTableId(buildSink.getJoinTableId());
-    // c'tor fixes up PlanNode.fragment_
-    PlanFragment buildFragment = new PlanFragment(ctx_.getNextFragmentId(),
-        join.getChild(1), join.getFragment().getDataPartition());
-    buildFragment.setSink(buildSink);
-
-    // move input fragments
-    for (int i = 0; i < exchNodes.size(); ++i) {
-      LOG.info("re-link fragment " + inputFragments.get(i).getId().toString() + " to "
-          + exchNodes.get(i).getFragment().getId().toString());
-      Preconditions.checkState(exchNodes.get(i).getFragment() == buildFragment);
-      join.getFragment().removeChild(inputFragments.get(i));
-      buildFragment.getChildren().add(inputFragments.get(i));
-    }
-
-    // assign plan and cohort id
-    buildFragment.setPlanId(planIdGenerator_.getNextId());
-    PlanId parentPlanId = join.getFragment().getPlanId();
-    buildFragment.setCohortId(cohortId);
-
-    planRoots_.add(buildFragment);
-    LOG.info("new build fragment " + buildFragment.getId().toString());
-    LOG.info("in cohort " + buildFragment.getCohortId().toString());
-    LOG.info("for join node " + join.getId().toString());
-    createBuildPlans(buildFragment, null);
-  }
-
-}

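A minimal sketch of how this planner might be driven, assuming a PlannerContext 'ctx' and the root PlanFragment 'distributedPlanRoot' of a distributed plan are already available; both variable names are illustrative only:

  ParallelPlanner parallelPlanner = new ParallelPlanner(ctx);
  List<PlanFragment> planRoots = parallelPlanner.createPlans(distributedPlanRoot);
  // The returned list contains the plan that materializes the query result plus
  // one build plan per join node, with every fragment annotated with a PlanId
  // and a CohortId by createPlans()/createBuildPlans().
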
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b544f019/fe/src/main/java/com/cloudera/impala/planner/PipelinedPlanNodeSet.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/com/cloudera/impala/planner/PipelinedPlanNodeSet.java b/fe/src/main/java/com/cloudera/impala/planner/PipelinedPlanNodeSet.java
deleted file mode 100644
index 6714213..0000000
--- a/fe/src/main/java/com/cloudera/impala/planner/PipelinedPlanNodeSet.java
+++ /dev/null
@@ -1,215 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package com.cloudera.impala.planner;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Set;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.cloudera.impala.thrift.TQueryOptions;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Sets;
-
-/**
- * Represents a set of PlanNodes and DataSinks that execute and consume resources
- * concurrently. PlanNodes and DataSinks in such a pipelined plan node set may belong
- * to different plan fragments because data is streamed across fragments.
- *
- * For example, a series of left-deep joins consists of two plan node sets. The first
- * set contains all build-side nodes. The second set contains the leftmost
- * scan. Both sets contain all join nodes because they execute and consume
- * resources during the build and probe phases. Similarly, all nodes below a 'blocking'
- * node (e.g., an AggregationNode) are placed into a different plan node set than the
- * nodes above it, but the blocking node itself belongs to both sets.
- */
-public class PipelinedPlanNodeSet {
-  private final static Logger LOG = LoggerFactory.getLogger(PipelinedPlanNodeSet.class);
-
-  // Minimum per-host resource requirements to ensure that no plan node set can have
-  // estimates of zero, even if the contained PlanNodes have estimates of zero.
-  public static final long MIN_PER_HOST_MEM = 10 * 1024 * 1024;
-  public static final int MIN_PER_HOST_VCORES = 1;
-
-  // List of plan nodes that execute and consume resources concurrently.
-  private final ArrayList<PlanNode> planNodes = Lists.newArrayList();
-
-  // DataSinks that execute and consume resources concurrently.
-  // Primarily used for estimating the cost of insert queries.
-  private final List<DataSink> dataSinks = Lists.newArrayList();
-
-  // Estimated per-host memory and CPU requirements.
-  // Valid after computeResourceEstimates().
-  private long perHostMem = MIN_PER_HOST_MEM;
-  private int perHostVcores = MIN_PER_HOST_VCORES;
-
-  public void add(PlanNode node) {
-    Preconditions.checkNotNull(node.getFragment());
-    planNodes.add(node);
-  }
-
-  public void addSink(DataSink sink) {
-    Preconditions.checkNotNull(sink);
-    dataSinks.add(sink);
-  }
-
-  /**
-   * Computes the estimated per-host memory and CPU requirements of this plan node set.
-   * Optionally excludes unpartitioned fragments from the estimation.
-   * Returns true if at least one plan node was included in the estimation.
-   * Otherwise returns false indicating the estimates are invalid.
-   */
-  public boolean computeResourceEstimates(boolean excludeUnpartitionedFragments,
-      TQueryOptions queryOptions) {
-    Set<PlanFragment> uniqueFragments = Sets.newHashSet();
-
-    // Distinguish the per-host memory estimates for scan nodes and non-scan nodes to
-    // get a tighter estimate on the amount of memory required by multiple concurrent
-    // scans. The memory required by all concurrent scans of the same type (Hdfs/Hbase)
-    // cannot exceed the per-host upper memory bound for that scan type. Intuitively,
-    // the number of I/O buffers is limited by the disk bandwidth.
-    long perHostHbaseScanMem = 0L;
-    long perHostHdfsScanMem = 0L;
-    long perHostNonScanMem = 0L;
-
-    for (int i = 0; i < planNodes.size(); ++i) {
-      PlanNode node = planNodes.get(i);
-      PlanFragment fragment = node.getFragment();
-      if (!fragment.isPartitioned() && excludeUnpartitionedFragments) continue;
-      node.computeCosts(queryOptions);
-      uniqueFragments.add(fragment);
-      if (node.getPerHostMemCost() < 0) {
-        LOG.warn(String.format("Invalid per-host memory requirement %s of node %s.\n" +
-            "PlanNode stats are: numNodes_=%s ", node.getPerHostMemCost(),
-            node.getClass().getSimpleName(), node.getNumNodes()));
-      }
-      if (node instanceof HBaseScanNode) {
-        perHostHbaseScanMem += node.getPerHostMemCost();
-      } else if (node instanceof HdfsScanNode) {
-        perHostHdfsScanMem += node.getPerHostMemCost();
-      } else {
-        perHostNonScanMem += node.getPerHostMemCost();
-      }
-    }
-
-    // The memory required by concurrent scans cannot exceed the upper memory bound
-    // for that scan type.
-    // TODO: In the future, we may want to restrict scanner concurrency based on a
-    // memory limit. This estimation will need to account for that as well.
-    perHostHbaseScanMem =
-        Math.min(perHostHbaseScanMem, HBaseScanNode.getPerHostMemUpperBound());
-    perHostHdfsScanMem =
-        Math.min(perHostHdfsScanMem, HdfsScanNode.getPerHostMemUpperBound());
-
-    long perHostDataSinkMem = 0L;
-    for (int i = 0; i < dataSinks.size(); ++i) {
-      DataSink sink = dataSinks.get(i);
-      PlanFragment fragment = sink.getFragment();
-      if (!fragment.isPartitioned() && excludeUnpartitionedFragments) continue;
-      // Sanity check that this plan-node set has at least one PlanNode of fragment.
-      Preconditions.checkState(uniqueFragments.contains(fragment));
-      sink.computeCosts();
-      if (sink.getPerHostMemCost() < 0) {
-        LOG.warn(String.format("Invalid per-host memory requirement %s of sink %s.\n",
-            sink.getPerHostMemCost(), sink.getClass().getSimpleName()));
-      }
-      perHostDataSinkMem += sink.getPerHostMemCost();
-    }
-
-    // Combine the memory estimates of all sinks, scan nodes, and non-scan nodes.
-    long perHostMem = perHostHdfsScanMem + perHostHbaseScanMem + perHostNonScanMem +
-        perHostDataSinkMem;
-
-    // The backend needs at least one thread per fragment.
-    int perHostVcores = uniqueFragments.size();
-
-    // This plan node set might only have unpartitioned fragments.
-    // Only set estimates if they are valid.
-    if (perHostMem >= 0 && perHostVcores >= 0) {
-      this.perHostMem = perHostMem;
-      this.perHostVcores = perHostVcores;
-      return true;
-    }
-    return false;
-  }
-
-  public long getPerHostMem() { return perHostMem; }
-  public int getPerHostVcores() { return perHostVcores; }
-
-  /**
-   * Computes and returns the pipelined plan node sets of the given plan.
-   */
-  public static ArrayList<PipelinedPlanNodeSet> computePlanNodeSets(PlanNode root) {
-    ArrayList<PipelinedPlanNodeSet> planNodeSets =
-        Lists.newArrayList(new PipelinedPlanNodeSet());
-    computePlanNodeSets(root, planNodeSets.get(0), null, planNodeSets);
-    return planNodeSets;
-  }
-
-  /**
-   * Populates 'planNodeSets' by recursively traversing the plan tree rooted at 'node'.
-   * The plan node sets are computed top-down. As a result, the plan node sets are added
-   * in reverse order of their runtime execution.
-   *
-   * Nodes are generally added to lhsSet. Joins are treated specially in that their
-   * left child is added to lhsSet and their right child to rhsSet to make sure
-   * that concurrent join builds end up in the same plan node set.
-   */
-  private static void computePlanNodeSets(PlanNode node, PipelinedPlanNodeSet lhsSet,
-      PipelinedPlanNodeSet rhsSet, ArrayList<PipelinedPlanNodeSet> planNodeSets) {
-    lhsSet.add(node);
-    if (node == node.getFragment().getPlanRoot() && node.getFragment().hasSink()) {
-      lhsSet.addSink(node.getFragment().getSink());
-    }
-
-    if (node instanceof HashJoinNode) {
-      // Create a new set for the right-hand sides of joins if necessary.
-      if (rhsSet == null) {
-        rhsSet = new PipelinedPlanNodeSet();
-        planNodeSets.add(rhsSet);
-      }
-      // The join node itself is added to the lhsSet (above) and the rhsSet.
-      rhsSet.add(node);
-      computePlanNodeSets(node.getChild(1), rhsSet, null, planNodeSets);
-      computePlanNodeSets(node.getChild(0), lhsSet, rhsSet, planNodeSets);
-      return;
-    }
-
-    if (node.isBlockingNode()) {
-      // We add blocking nodes to two plan node sets because they require resources while
-      // consuming their input (execution of the preceding set) and while they
-      // emit their output (execution of the following set).
-      lhsSet = new PipelinedPlanNodeSet();
-      lhsSet.add(node);
-      planNodeSets.add(lhsSet);
-      // Join builds under this blocking node belong in a new rhsSet.
-      rhsSet = null;
-    }
-
-    // Assume that non-join, non-blocking nodes with multiple children
-    // (e.g., ExchangeNodes) consume their inputs in an arbitrary order,
-    // i.e., all child subtrees execute concurrently.
-    // TODO: This is not true for UnionNodes anymore. Fix the estimates accordingly.
-    for (PlanNode child: node.getChildren()) {
-      computePlanNodeSets(child, lhsSet, rhsSet, planNodeSets);
-    }
-  }
-}
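A hedged sketch of how the per-host estimates exposed by this class could be aggregated into query-level requirements, assuming a plan tree root 'planRoot' and query options 'queryOptions' are in scope; the driver loop itself is illustrative and not part of the class above:

  long maxPerHostMem = PipelinedPlanNodeSet.MIN_PER_HOST_MEM;
  int maxPerHostVcores = PipelinedPlanNodeSet.MIN_PER_HOST_VCORES;
  for (PipelinedPlanNodeSet nodeSet :
      PipelinedPlanNodeSet.computePlanNodeSets(planRoot)) {
    // Skip sets with invalid estimates, e.g. sets that contain only
    // unpartitioned fragments when those are excluded.
    if (!nodeSet.computeResourceEstimates(true, queryOptions)) continue;
    maxPerHostMem = Math.max(maxPerHostMem, nodeSet.getPerHostMem());
    maxPerHostVcores = Math.max(maxPerHostVcores, nodeSet.getPerHostVcores());
  }
  // Different sets do not run concurrently with each other, so the query-level
  // requirement is the maximum over all sets rather than their sum.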