Posted to commits@impala.apache.org by kw...@apache.org on 2016/09/30 02:14:39 UTC

[22/61] [partial] incubator-impala git commit: IMPALA-3786: Replace "cloudera" with "apache" (part 1)

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b544f019/fe/src/main/java/com/cloudera/impala/planner/DataPartition.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/com/cloudera/impala/planner/DataPartition.java b/fe/src/main/java/com/cloudera/impala/planner/DataPartition.java
deleted file mode 100644
index 3320c2b..0000000
--- a/fe/src/main/java/com/cloudera/impala/planner/DataPartition.java
+++ /dev/null
@@ -1,131 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package com.cloudera.impala.planner;
-
-import java.util.List;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.cloudera.impala.analysis.Analyzer;
-import com.cloudera.impala.analysis.Expr;
-import com.cloudera.impala.analysis.ExprSubstitutionMap;
-import com.cloudera.impala.thrift.TDataPartition;
-import com.cloudera.impala.thrift.TPartitionType;
-import com.google.common.base.Joiner;
-import com.google.common.base.Objects;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
-
-/**
- * Specification of the partition of a single stream of data.
- * Examples of such streams of data are: the scan of a table; the output
- * of a plan fragment; etc. (i.e., this is not restricted to direct exchanges
- * between two fragments, which in the backend is facilitated by the classes
- * DataStreamSender/DataStreamMgr/DataStreamRecvr).
- */
-public class DataPartition {
-  private final static Logger LOG = LoggerFactory.getLogger(DataPartition.class);
-
-  private final TPartitionType type_;
-
-  // for hash partition: exprs used to compute hash value
-  private List<Expr> partitionExprs_;
-
-  private DataPartition(TPartitionType type, List<Expr> exprs) {
-    Preconditions.checkNotNull(exprs);
-    Preconditions.checkState(!exprs.isEmpty());
-    Preconditions.checkState(type == TPartitionType.HASH_PARTITIONED
-        || type == TPartitionType.RANGE_PARTITIONED);
-    type_ = type;
-    partitionExprs_ = exprs;
-  }
-
-  private DataPartition(TPartitionType type) {
-    Preconditions.checkState(type == TPartitionType.UNPARTITIONED
-        || type == TPartitionType.RANDOM);
-    type_ = type;
-    partitionExprs_ = Lists.newArrayList();
-  }
-
-  public final static DataPartition UNPARTITIONED =
-      new DataPartition(TPartitionType.UNPARTITIONED);
-
-  public final static DataPartition RANDOM =
-      new DataPartition(TPartitionType.RANDOM);
-
-  public static DataPartition hashPartitioned(List<Expr> exprs) {
-    return new DataPartition(TPartitionType.HASH_PARTITIONED, exprs);
-  }
-
-  public boolean isPartitioned() { return type_ != TPartitionType.UNPARTITIONED; }
-  public boolean isHashPartitioned() { return type_ == TPartitionType.HASH_PARTITIONED; }
-  public TPartitionType getType() { return type_; }
-  public List<Expr> getPartitionExprs() { return partitionExprs_; }
-
-  public void substitute(ExprSubstitutionMap smap, Analyzer analyzer) {
-    partitionExprs_ = Expr.substituteList(partitionExprs_, smap, analyzer, false);
-  }
-
-  public TDataPartition toThrift() {
-    TDataPartition result = new TDataPartition(type_);
-    if (partitionExprs_ != null) {
-      result.setPartition_exprs(Expr.treesToThrift(partitionExprs_));
-    }
-    return result;
-  }
-
-  @Override
-  public boolean equals(Object obj) {
-    if (obj == null) return false;
-    if (obj.getClass() != this.getClass()) return false;
-    DataPartition other = (DataPartition) obj;
-    if (type_ != other.type_) return false;
-    return Expr.equalLists(partitionExprs_, other.partitionExprs_);
-  }
-
-  public String debugString() {
-    return Objects.toStringHelper(this)
-        .add("type_", type_)
-        .addValue(Expr.debugString(partitionExprs_))
-        .toString();
-  }
-
-  public String getExplainString() {
-    StringBuilder str = new StringBuilder();
-    str.append(getPartitionShortName(type_));
-    if (!partitionExprs_.isEmpty()) {
-      List<String> strings = Lists.newArrayList();
-      for (Expr expr: partitionExprs_) {
-        strings.add(expr.toSql());
-      }
-      str.append("(" + Joiner.on(",").join(strings) +")");
-    }
-    return str.toString();
-  }
-
-  private String getPartitionShortName(TPartitionType partition) {
-    switch (partition) {
-      case RANDOM: return "RANDOM";
-      case HASH_PARTITIONED: return "HASH";
-      case RANGE_PARTITIONED: return "RANGE";
-      case UNPARTITIONED: return "UNPARTITIONED";
-      default: return "";
-    }
-  }
-}
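
The private constructors above make the static factories the only way to
obtain a DataPartition. A minimal sketch of how planner code might pick a
partition for a fragment's output (a hedged illustration; choosePartition()
is a hypothetical helper, not part of the deleted file):

    import java.util.List;

    import com.cloudera.impala.analysis.Expr;
    import com.cloudera.impala.planner.DataPartition;

    final class PartitionExample {
      // Hash-partition on the given exprs, or fall back to UNPARTITIONED
      // when there is nothing to hash on (hashPartitioned() requires a
      // non-empty expr list, per the Preconditions in the constructor).
      static DataPartition choosePartition(List<Expr> partitionExprs) {
        if (partitionExprs.isEmpty()) return DataPartition.UNPARTITIONED;
        return DataPartition.hashPartitioned(partitionExprs);
      }
    }

This mirrors what createInsertFragment() in the DistributedPlanner diff
further down does when choosing an output partition.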

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b544f019/fe/src/main/java/com/cloudera/impala/planner/DataSink.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/com/cloudera/impala/planner/DataSink.java b/fe/src/main/java/com/cloudera/impala/planner/DataSink.java
deleted file mode 100644
index ff81b50..0000000
--- a/fe/src/main/java/com/cloudera/impala/planner/DataSink.java
+++ /dev/null
@@ -1,64 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package com.cloudera.impala.planner;
-
-import java.util.List;
-
-import com.cloudera.impala.analysis.Expr;
-import com.cloudera.impala.catalog.HBaseTable;
-import com.cloudera.impala.catalog.HdfsTable;
-import com.cloudera.impala.catalog.KuduTable;
-import com.cloudera.impala.catalog.Table;
-import com.cloudera.impala.thrift.TDataSink;
-import com.cloudera.impala.thrift.TExplainLevel;
-
-/**
- * A DataSink describes the destination of a plan fragment's output rows.
- * The destination could be another plan fragment on a remote machine,
- * or a table into which the rows are to be inserted
- * (i.e., the destination of the last fragment of an INSERT statement).
- */
-public abstract class DataSink {
-
-  // Estimated per-host memory requirement for this sink;
-  // set in computeCosts(). -1 indicates an invalid/unset value.
-  protected long perHostMemCost_ = -1;
-
-  // Fragment that this DataSink belongs to. Set by the PlanFragment enclosing this sink.
-  protected PlanFragment fragment_;
-
-  /**
-   * Return an explain string for the DataSink. Each line of the explain will be prefixed
-   * by "prefix".
-   */
-  public abstract String getExplainString(String prefix, String detailPrefix,
-      TExplainLevel explainLevel);
-
-  protected abstract TDataSink toThrift();
-
-  public void setFragment(PlanFragment fragment) { fragment_ = fragment; }
-  public PlanFragment getFragment() { return fragment_; }
-  public long getPerHostMemCost() { return perHostMemCost_; }
-
-  /**
-   * Estimates the cost of executing this DataSink. Currently only sets perHostMemCost.
-   */
-  public void computeCosts() {
-    perHostMemCost_ = 0;
-  }
-}
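
A concrete sink only has to provide an explain string and a thrift
representation, and may override computeCosts() to estimate memory. A
minimal sketch of a subclass (a hypothetical, illustration-only sink that
discards its input; not part of the deleted sources):

    package com.cloudera.impala.planner;

    import com.cloudera.impala.thrift.TDataSink;
    import com.cloudera.impala.thrift.TExplainLevel;

    // Hypothetical sink used only to illustrate the DataSink contract.
    public class DiscardingSink extends DataSink {
      @Override
      public String getExplainString(String prefix, String detailPrefix,
          TExplainLevel explainLevel) {
        return prefix + "DISCARD SINK\n";
      }

      @Override
      protected TDataSink toThrift() {
        // A real sink would populate a TDataSink with its TDataSinkType
        // here (see DataStreamSink.toThrift() further down in this commit).
        throw new UnsupportedOperationException("illustration only");
      }

      @Override
      public void computeCosts() {
        perHostMemCost_ = 0;  // discarding rows needs no per-host memory
      }
    }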

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b544f019/fe/src/main/java/com/cloudera/impala/planner/DataSourceScanNode.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/com/cloudera/impala/planner/DataSourceScanNode.java b/fe/src/main/java/com/cloudera/impala/planner/DataSourceScanNode.java
deleted file mode 100644
index ab92605..0000000
--- a/fe/src/main/java/com/cloudera/impala/planner/DataSourceScanNode.java
+++ /dev/null
@@ -1,371 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package com.cloudera.impala.planner;
-
-import java.util.List;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.cloudera.impala.analysis.Analyzer;
-import com.cloudera.impala.analysis.BinaryPredicate;
-import com.cloudera.impala.analysis.BoolLiteral;
-import com.cloudera.impala.analysis.CompoundPredicate;
-import com.cloudera.impala.analysis.Expr;
-import com.cloudera.impala.analysis.LiteralExpr;
-import com.cloudera.impala.analysis.NumericLiteral;
-import com.cloudera.impala.analysis.SlotRef;
-import com.cloudera.impala.analysis.StringLiteral;
-import com.cloudera.impala.analysis.TupleDescriptor;
-import com.cloudera.impala.catalog.DataSource;
-import com.cloudera.impala.catalog.DataSourceTable;
-import com.cloudera.impala.common.ImpalaException;
-import com.cloudera.impala.common.InternalException;
-import com.cloudera.impala.extdatasource.ExternalDataSourceExecutor;
-import com.cloudera.impala.extdatasource.thrift.TBinaryPredicate;
-import com.cloudera.impala.extdatasource.thrift.TColumnDesc;
-import com.cloudera.impala.extdatasource.thrift.TComparisonOp;
-import com.cloudera.impala.extdatasource.thrift.TPrepareParams;
-import com.cloudera.impala.extdatasource.thrift.TPrepareResult;
-import com.cloudera.impala.service.FeSupport;
-import com.cloudera.impala.thrift.TCacheJarResult;
-import com.cloudera.impala.thrift.TColumnValue;
-import com.cloudera.impala.thrift.TDataSourceScanNode;
-import com.cloudera.impala.thrift.TErrorCode;
-import com.cloudera.impala.thrift.TExplainLevel;
-import com.cloudera.impala.thrift.TNetworkAddress;
-import com.cloudera.impala.thrift.TPlanNode;
-import com.cloudera.impala.thrift.TPlanNodeType;
-import com.cloudera.impala.thrift.TQueryOptions;
-import com.cloudera.impala.thrift.TScanRange;
-import com.cloudera.impala.thrift.TScanRangeLocation;
-import com.cloudera.impala.thrift.TScanRangeLocations;
-import com.cloudera.impala.thrift.TStatus;
-import com.google.common.base.Joiner;
-import com.google.common.base.Objects;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Lists;
-
-/**
- * Scan of a table provided by an external data source.
- */
-public class DataSourceScanNode extends ScanNode {
-  private final static Logger LOG = LoggerFactory.getLogger(DataSourceScanNode.class);
-  private final TupleDescriptor desc_;
-  private final DataSourceTable table_;
-
-  // The converted conjuncts_ that were accepted by the data source. A conjunct can
-  // be converted if it contains only disjunctive predicates of the form
-  // <slotref> <op> <constant>.
-  private List<List<TBinaryPredicate>> acceptedPredicates_;
-
-  // The conjuncts that were accepted by the data source and removed from conjuncts_ in
-  // removeAcceptedConjuncts(). Only used in getNodeExplainString() to print the
-  // conjuncts applied by the data source.
-  private List<Expr> acceptedConjuncts_;
-
-  // The estimated number of rows, as returned by prepare().
-  private long numRowsEstimate_;
-
-  public DataSourceScanNode(PlanNodeId id, TupleDescriptor desc) {
-    super(id, desc, "SCAN DATA SOURCE");
-    desc_ = desc;
-    table_ = (DataSourceTable) desc_.getTable();
-    acceptedPredicates_ = null;
-    acceptedConjuncts_ = null;
-  }
-
-  @Override
-  public void init(Analyzer analyzer) throws ImpalaException {
-    checkForSupportedFileFormats();
-    assignConjuncts(analyzer);
-    analyzer.createEquivConjuncts(tupleIds_.get(0), conjuncts_);
-    prepareDataSource();
-    conjuncts_ = orderConjunctsByCost(conjuncts_);
-    computeStats(analyzer);
-    // materialize slots in remaining conjuncts_
-    analyzer.materializeSlots(conjuncts_);
-    computeMemLayout(analyzer);
-    computeScanRangeLocations(analyzer);
-  }
-
-  /**
-   * Returns a thrift TColumnValue representing the literal from a binary
-   * predicate, or null if the type cannot be represented.
-   */
-  public static TColumnValue literalToColumnValue(LiteralExpr expr) {
-    switch (expr.getType().getPrimitiveType()) {
-      case BOOLEAN:
-        return new TColumnValue().setBool_val(((BoolLiteral) expr).getValue());
-      case TINYINT:
-        return new TColumnValue().setByte_val(
-            (byte) ((NumericLiteral) expr).getLongValue());
-      case SMALLINT:
-        return new TColumnValue().setShort_val(
-            (short) ((NumericLiteral) expr).getLongValue());
-      case INT:
-        return new TColumnValue().setInt_val(
-            (int) ((NumericLiteral) expr).getLongValue());
-      case BIGINT:
-        return new TColumnValue().setLong_val(((NumericLiteral) expr).getLongValue());
-      case FLOAT:
-      case DOUBLE:
-        return new TColumnValue().setDouble_val(
-            ((NumericLiteral) expr).getDoubleValue());
-      case STRING:
-        return new TColumnValue().setString_val(((StringLiteral) expr).getValue());
-      case DECIMAL:
-      case DATE:
-      case DATETIME:
-      case TIMESTAMP:
-        // TODO: we support DECIMAL and TIMESTAMP but there is no way to specify them in SQL.
-        return null;
-      default:
-        Preconditions.checkState(false);
-        return null;
-    }
-  }
-
-  /**
-   * Calls prepare() on the data source to determine accepted predicates and get
-   * stats. The accepted predicates are moved from conjuncts_ into acceptedConjuncts_
-   * and the associated TBinaryPredicates are set in acceptedPredicates_.
-   */
-  private void prepareDataSource() throws InternalException {
-    // Binary predicates that will be offered to the data source.
-    List<List<TBinaryPredicate>> offeredPredicates = Lists.newArrayList();
-    // The index into conjuncts_ for each element in offeredPredicates.
-    List<Integer> conjunctsIdx = Lists.newArrayList();
-    for (int i = 0; i < conjuncts_.size(); ++i) {
-      Expr conjunct = conjuncts_.get(i);
-      List<TBinaryPredicate> disjuncts = getDisjuncts(conjunct);
-      if (disjuncts != null) {
-        offeredPredicates.add(disjuncts);
-        conjunctsIdx.add(i);
-      }
-    }
-
-    String hdfsLocation = table_.getDataSource().getHdfs_location();
-    TCacheJarResult cacheResult = FeSupport.CacheJar(hdfsLocation);
-    TStatus cacheJarStatus = cacheResult.getStatus();
-    if (cacheJarStatus.getStatus_code() != TErrorCode.OK) {
-      throw new InternalException(String.format(
-          "Unable to cache data source library at location '%s'. Check that the file " +
-          "exists and is readable. Message: %s",
-          hdfsLocation, Joiner.on("\n").join(cacheJarStatus.getError_msgs())));
-    }
-    String localPath = cacheResult.getLocal_path();
-    String className = table_.getDataSource().getClass_name();
-    String apiVersion = table_.getDataSource().getApi_version();
-    TPrepareResult prepareResult;
-    TStatus prepareStatus;
-    try {
-      ExternalDataSourceExecutor executor = new ExternalDataSourceExecutor(
-          localPath, className, apiVersion, table_.getInitString());
-      TPrepareParams prepareParams = new TPrepareParams();
-      prepareParams.setInit_string(table_.getInitString());
-      prepareParams.setPredicates(offeredPredicates);
-      // TODO: Include DB (i.e. getFullName())?
-      prepareParams.setTable_name(table_.getName());
-      prepareResult = executor.prepare(prepareParams);
-      prepareStatus = prepareResult.getStatus();
-    } catch (Exception e) {
-      throw new InternalException(String.format(
-          "Error calling prepare() on data source %s",
-          DataSource.debugString(table_.getDataSource())), e);
-    }
-    if (prepareStatus.getStatus_code() != TErrorCode.OK) {
-      throw new InternalException(String.format(
-          "Data source %s returned an error from prepare(): %s",
-          DataSource.debugString(table_.getDataSource()),
-          Joiner.on("\n").join(prepareStatus.getError_msgs())));
-    }
-
-    numRowsEstimate_ = prepareResult.getNum_rows_estimate();
-    acceptedPredicates_ = Lists.newArrayList();
-    List<Integer> acceptedPredicatesIdx = prepareResult.isSetAccepted_conjuncts() ?
-        prepareResult.getAccepted_conjuncts() : ImmutableList.<Integer>of();
-    for (Integer acceptedIdx: acceptedPredicatesIdx) {
-      acceptedPredicates_.add(offeredPredicates.get(acceptedIdx));
-    }
-    removeAcceptedConjuncts(acceptedPredicatesIdx, conjunctsIdx);
-  }
-
-  /**
-   * Converts the conjunct to a list of TBinaryPredicates if it contains only
-   * disjunctive predicates of the form {slotref} {op} {constant} that can be represented
-   * by TBinaryPredicates. If the Expr cannot be converted, null is returned.
-   * TODO: Move this to Expr.
-   */
-  private List<TBinaryPredicate> getDisjuncts(Expr conjunct) {
-    List<TBinaryPredicate> disjuncts = Lists.newArrayList();
-    if (getDisjunctsHelper(conjunct, disjuncts)) return disjuncts;
-    return null;
-  }
-
-  // Recursive helper method for getDisjuncts().
-  private boolean getDisjunctsHelper(Expr conjunct,
-      List<TBinaryPredicate> predicates) {
-    if (conjunct instanceof BinaryPredicate) {
-      if (conjunct.getChildren().size() != 2) return false;
-      SlotRef slotRef = null;
-      LiteralExpr literalExpr = null;
-      TComparisonOp op = null;
-      if ((conjunct.getChild(0).unwrapSlotRef(true) instanceof SlotRef) &&
-          (conjunct.getChild(1) instanceof LiteralExpr)) {
-        slotRef = conjunct.getChild(0).unwrapSlotRef(true);
-        literalExpr = (LiteralExpr) conjunct.getChild(1);
-        op = ((BinaryPredicate) conjunct).getOp().getThriftOp();
-      } else if ((conjunct.getChild(1).unwrapSlotRef(true) instanceof SlotRef) &&
-                 (conjunct.getChild(0) instanceof LiteralExpr)) {
-        slotRef = conjunct.getChild(1).unwrapSlotRef(true);
-        literalExpr = (LiteralExpr) conjunct.getChild(0);
-        op = ((BinaryPredicate) conjunct).getOp().converse().getThriftOp();
-      } else {
-        return false;
-      }
-
-      TColumnValue val = literalToColumnValue(literalExpr);
-      if (val == null) return false; // unsupported literal type, e.g. DECIMAL
-
-      String colName = Joiner.on(".").join(slotRef.getResolvedPath().getRawPath());
-      TColumnDesc col = new TColumnDesc().setName(colName).setType(
-          slotRef.getType().toThrift());
-      predicates.add(new TBinaryPredicate().setCol(col).setOp(op).setValue(val));
-      return true;
-    } else if (conjunct instanceof CompoundPredicate) {
-      CompoundPredicate compoundPredicate = ((CompoundPredicate) conjunct);
-      if (compoundPredicate.getOp() != CompoundPredicate.Operator.OR) return false;
-      if (!getDisjunctsHelper(conjunct.getChild(0), predicates)) return false;
-      if (!getDisjunctsHelper(conjunct.getChild(1), predicates)) return false;
-      return true;
-    } else {
-      return false;
-    }
-  }
-
-  @Override
-  public void computeStats(Analyzer analyzer) {
-    super.computeStats(analyzer);
-    inputCardinality_ = numRowsEstimate_;
-    cardinality_ = numRowsEstimate_;
-    cardinality_ *= computeSelectivity();
-    cardinality_ = Math.max(1, cardinality_);
-    cardinality_ = capAtLimit(cardinality_);
-
-    LOG.debug("computeStats DataSourceScan: cardinality=" + Long.toString(cardinality_));
-
-    numNodes_ = table_.getNumNodes();
-    LOG.debug("computeStats DataSourceScan: #nodes=" + Integer.toString(numNodes_));
-  }
-
-  @Override
-  protected String debugString() {
-    return Objects.toStringHelper(this)
-        .add("tid", desc_.getId().asInt())
-        .add("tblName", table_.getFullName())
-        .add("dataSource", DataSource.debugString(table_.getDataSource()))
-        .add("initString", table_.getInitString())
-        .addValue(super.debugString())
-        .toString();
-  }
-
-  /**
-   * Removes the predicates from conjuncts_ that were accepted by the data source.
-   * Stores the accepted conjuncts in acceptedConjuncts_.
-   */
-  private void removeAcceptedConjuncts(List<Integer> acceptedPredicatesIdx,
-      List<Integer> conjunctsIdx) {
-    acceptedConjuncts_ = Lists.newArrayList();
-    // Because conjuncts_ is modified in place using positional indexes from
-    // conjunctsIdx, we remove the accepted predicates in reverse order.
-    for (int i = acceptedPredicatesIdx.size() - 1; i >= 0; --i) {
-      int acceptedPredIdx = acceptedPredicatesIdx.get(i);
-      int conjunctIdx = conjunctsIdx.get(acceptedPredIdx);
-      acceptedConjuncts_.add(conjuncts_.remove(conjunctIdx));
-    }
-    // Lists.reverse() returns a view of the list in the original order. We
-    // print these in the explain string, and it's convenient to have the
-    // predicates appear in the same order that they were specified.
-    acceptedConjuncts_ = Lists.reverse(acceptedConjuncts_);
-  }
-
-  @Override
-  protected void toThrift(TPlanNode msg) {
-    Preconditions.checkNotNull(acceptedPredicates_);
-    msg.node_type = TPlanNodeType.DATA_SOURCE_NODE;
-    msg.data_source_node = new TDataSourceScanNode(desc_.getId().asInt(),
-        table_.getDataSource(), table_.getInitString(), acceptedPredicates_);
-  }
-
-  /**
-   * Create a single scan range for the localhost.
-   */
-  private void computeScanRangeLocations(Analyzer analyzer) {
-    // TODO: Does the port matter?
-    TNetworkAddress networkAddress = addressToTNetworkAddress("localhost:12345");
-    Integer hostIndex = analyzer.getHostIndex().getIndex(networkAddress);
-    scanRanges_ = Lists.newArrayList(
-        new TScanRangeLocations(
-            new TScanRange(), Lists.newArrayList(new TScanRangeLocation(hostIndex))));
-  }
-
-  @Override
-  public void computeCosts(TQueryOptions queryOptions) {
-    // TODO: What's a good estimate of memory consumption?
-    perHostMemCost_ = 1024L * 1024L * 1024L;
-  }
-
-  /**
-   * Returns the per-host upper bound of memory that any number of concurrent scan nodes
-   * will use. Used for estimating the per-host memory requirement of queries.
-   */
-  public static long getPerHostMemUpperBound() {
-    // TODO: What's a good estimate of memory consumption?
-    return 1024L * 1024L * 1024L;
-  }
-
-  @Override
-  protected String getNodeExplainString(String prefix, String detailPrefix,
-      TExplainLevel detailLevel) {
-    StringBuilder output = new StringBuilder();
-    String aliasStr = "";
-    if (!table_.getFullName().equalsIgnoreCase(desc_.getAlias()) &&
-        !table_.getName().equalsIgnoreCase(desc_.getAlias())) {
-      aliasStr = " " + desc_.getAlias();
-    }
-    output.append(String.format("%s%s:%s [%s%s]\n", prefix, id_.toString(),
-        displayName_, table_.getFullName(), aliasStr));
-
-    if (!acceptedConjuncts_.isEmpty()) {
-      output.append(prefix + "data source predicates: " +
-          getExplainString(acceptedConjuncts_) + "\n");
-    }
-    if (!conjuncts_.isEmpty()) {
-      output.append(prefix + "predicates: " + getExplainString(conjuncts_) + "\n");
-    }
-
-    // Add table and column stats in verbose mode.
-    if (detailLevel == TExplainLevel.VERBOSE) {
-      output.append(getStatsExplainString(prefix, detailLevel));
-      output.append("\n");
-    }
-    return output.toString();
-  }
-}
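
To make the conversion rules in getDisjuncts() and literalToColumnValue()
above concrete, here is how a few conjuncts would fare (assuming a
hypothetical table with INT columns x and y and a DECIMAL column d):

    x < 10               ->  [ {x, LT, 10} ]                 offered
    x < 10 OR x > 100    ->  [ {x, LT, 10}, {x, GT, 100} ]   offered
    10 > x               ->  [ {x, LT, 10} ]                 op converse()'d
    x < y                ->  null (no literal operand)
    d = 1.5              ->  null (literalToColumnValue() returns null
                              for DECIMAL)

An AND such as x < 10 AND y = 2 never reaches getDisjuncts() as a single
Expr; the analyzer has already split it into two separate conjuncts.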

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b544f019/fe/src/main/java/com/cloudera/impala/planner/DataStreamSink.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/com/cloudera/impala/planner/DataStreamSink.java b/fe/src/main/java/com/cloudera/impala/planner/DataStreamSink.java
deleted file mode 100644
index 514a791..0000000
--- a/fe/src/main/java/com/cloudera/impala/planner/DataStreamSink.java
+++ /dev/null
@@ -1,61 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package com.cloudera.impala.planner;
-
-import com.cloudera.impala.thrift.TDataSink;
-import com.cloudera.impala.thrift.TDataSinkType;
-import com.cloudera.impala.thrift.TDataStreamSink;
-import com.cloudera.impala.thrift.TExplainLevel;
-import com.google.common.base.Preconditions;
-
-/**
- * Data sink that forwards data to an exchange node.
- */
-public class DataStreamSink extends DataSink {
-  private final ExchangeNode exchNode_;
-  private final DataPartition outputPartition_;
-
-  public DataStreamSink(ExchangeNode exchNode, DataPartition partition) {
-    Preconditions.checkNotNull(exchNode);
-    Preconditions.checkNotNull(partition);
-    exchNode_ = exchNode;
-    outputPartition_ = partition;
-  }
-
-  @Override
-  public String getExplainString(String prefix, String detailPrefix,
-      TExplainLevel detailLevel) {
-    StringBuilder output = new StringBuilder();
-    output.append(
-        String.format("%sDATASTREAM SINK [FRAGMENT=%s, EXCHANGE=%s, %s]",
-        prefix, exchNode_.getFragment().getId().toString(),
-        exchNode_.getId().toString(), exchNode_.getDisplayLabelDetail()));
-    return output.toString();
-  }
-
-  @Override
-  protected TDataSink toThrift() {
-    TDataSink result = new TDataSink(TDataSinkType.DATA_STREAM_SINK);
-    TDataStreamSink tStreamSink =
-        new TDataStreamSink(exchNode_.getId().asInt(), outputPartition_.toThrift());
-    result.setStream_sink(tStreamSink);
-    return result;
-  }
-
-  public DataPartition getOutputPartition() { return outputPartition_; }
-}
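
The caller side of this sink appears in the DistributedPlanner diff below:
the planner builds an ExchangeNode over the producing fragment's plan root
and then calls inputFragment.setDestination(...) on it. A minimal sketch of
the wiring, assuming code inside com.cloudera.impala.planner (so the
protected toThrift() is visible) and an already-constructed exchange node:

    // 'exchNode' is assumed to be an ExchangeNode built by the planner.
    DataStreamSink sink =
        new DataStreamSink(exchNode, DataPartition.UNPARTITIONED);
    TDataSink thriftSink = sink.toThrift();  // DATA_STREAM_SINK + partition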

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b544f019/fe/src/main/java/com/cloudera/impala/planner/DistributedPlanner.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/com/cloudera/impala/planner/DistributedPlanner.java b/fe/src/main/java/com/cloudera/impala/planner/DistributedPlanner.java
deleted file mode 100644
index b38b018..0000000
--- a/fe/src/main/java/com/cloudera/impala/planner/DistributedPlanner.java
+++ /dev/null
@@ -1,1019 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package com.cloudera.impala.planner;
-
-import java.util.ArrayList;
-import java.util.List;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.cloudera.impala.analysis.AggregateInfo;
-import com.cloudera.impala.analysis.AnalysisContext;
-import com.cloudera.impala.analysis.Analyzer;
-import com.cloudera.impala.analysis.Expr;
-import com.cloudera.impala.analysis.InsertStmt;
-import com.cloudera.impala.analysis.JoinOperator;
-import com.cloudera.impala.analysis.QueryStmt;
-import com.cloudera.impala.common.ImpalaException;
-import com.cloudera.impala.common.InternalException;
-import com.cloudera.impala.planner.JoinNode.DistributionMode;
-import com.cloudera.impala.planner.RuntimeFilterGenerator.RuntimeFilter;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
-
-
-/**
- * The distributed planner is responsible for creating an executable, distributed plan
- * from a single-node plan that can be sent to the backend.
- */
-public class DistributedPlanner {
-  private final static Logger LOG = LoggerFactory.getLogger(DistributedPlanner.class);
-
-  private final PlannerContext ctx_;
-
-  public DistributedPlanner(PlannerContext ctx) {
-    ctx_ = ctx;
-  }
-
-  /**
-   * Create plan fragments for a single-node plan considering a set of execution options.
-   * The fragments are returned in a list such that element i of that list can
-   * only consume output of the following fragments j > i.
-   *
-   * TODO: take data partition of the plan fragments into account; in particular,
-   * coordinate between hash partitioning for aggregation and hash partitioning
-   * for analytic computation more generally than what createQueryPlan() does
-   * right now (the coordination only happens if the same select block does both
-   * the aggregation and analytic computation).
-   */
-  public ArrayList<PlanFragment> createPlanFragments(
-      PlanNode singleNodePlan) throws ImpalaException {
-    Preconditions.checkState(!ctx_.isSingleNodeExec());
-    AnalysisContext.AnalysisResult analysisResult = ctx_.getAnalysisResult();
-    QueryStmt queryStmt = ctx_.getQueryStmt();
-    ArrayList<PlanFragment> fragments = Lists.newArrayList();
-    // For inserts, CTAS, updates and deletes, unless there is a limit, leave the
-    // root fragment partitioned; otherwise merge everything into a single
-    // coordinator fragment, so we can pass it back to the client.
-    boolean isPartitioned = false;
-    if ((analysisResult.isInsertStmt() || analysisResult.isCreateTableAsSelectStmt()
-        || analysisResult.isUpdateStmt() || analysisResult.isDeleteStmt())
-        && !singleNodePlan.hasLimit()) {
-      Preconditions.checkState(!queryStmt.hasOffset());
-      isPartitioned = true;
-    }
-    LOG.debug("create plan fragments");
-    long perNodeMemLimit = ctx_.getQueryOptions().mem_limit;
-    LOG.debug("memlimit=" + Long.toString(perNodeMemLimit));
-    createPlanFragments(singleNodePlan, isPartitioned, perNodeMemLimit, fragments);
-    return fragments;
-  }
-
-  /**
-   * Return plan fragment that produces result of 'root'; recursively creates
-   * all input fragments to the returned fragment.
-   * If a new fragment is created, it is appended to 'fragments', so that
-   * each fragment is preceded by those from which it consumes the output.
-   * If 'isPartitioned' is false, the returned fragment is unpartitioned;
-   * otherwise it may be partitioned, depending on whether its inputs are
-   * partitioned; the partition function is derived from the inputs.
-   */
-  private PlanFragment createPlanFragments(
-      PlanNode root, boolean isPartitioned,
-      long perNodeMemLimit, ArrayList<PlanFragment> fragments)
-      throws ImpalaException {
-    ArrayList<PlanFragment> childFragments = Lists.newArrayList();
-    for (PlanNode child: root.getChildren()) {
-      // allow child fragments to be partitioned, unless they contain a limit clause
-      // (the result set with the limit constraint needs to be computed centrally);
-      // merge later if needed
-      boolean childIsPartitioned = !child.hasLimit();
-      // Do not fragment the subplan of a SubplanNode since it is executed locally.
-      if (root instanceof SubplanNode && child == root.getChild(1)) continue;
-      childFragments.add(
-          createPlanFragments(
-            child, childIsPartitioned, perNodeMemLimit, fragments));
-    }
-
-    PlanFragment result = null;
-    if (root instanceof ScanNode) {
-      result = createScanFragment(root);
-      fragments.add(result);
-    } else if (root instanceof HashJoinNode) {
-      Preconditions.checkState(childFragments.size() == 2);
-      result = createHashJoinFragment(
-          (HashJoinNode) root, childFragments.get(1), childFragments.get(0),
-          perNodeMemLimit, fragments);
-    } else if (root instanceof NestedLoopJoinNode) {
-      Preconditions.checkState(childFragments.size() == 2);
-      result = createNestedLoopJoinFragment(
-          (NestedLoopJoinNode) root, childFragments.get(1), childFragments.get(0),
-          perNodeMemLimit, fragments);
-    } else if (root instanceof SubplanNode) {
-      Preconditions.checkState(childFragments.size() == 1);
-      result = createSubplanNodeFragment((SubplanNode) root, childFragments.get(0));
-    } else if (root instanceof SelectNode) {
-      result = createSelectNodeFragment((SelectNode) root, childFragments);
-    } else if (root instanceof UnionNode) {
-      result = createUnionNodeFragment((UnionNode) root, childFragments, fragments);
-    } else if (root instanceof AggregationNode) {
-      result = createAggregationFragment(
-          (AggregationNode) root, childFragments.get(0), fragments);
-    } else if (root instanceof SortNode) {
-      if (((SortNode) root).isAnalyticSort()) {
-        // don't parallelize this like a regular SortNode
-        result = createAnalyticFragment(
-            root, childFragments.get(0), fragments);
-      } else {
-        result = createOrderByFragment(
-            (SortNode) root, childFragments.get(0), fragments);
-      }
-    } else if (root instanceof AnalyticEvalNode) {
-      result = createAnalyticFragment(root, childFragments.get(0), fragments);
-    } else if (root instanceof EmptySetNode) {
-      result = new PlanFragment(
-          ctx_.getNextFragmentId(), root, DataPartition.UNPARTITIONED);
-    } else {
-      throw new InternalException(
-          "Cannot create plan fragment for this node type: " + root.getExplainString());
-    }
-    // move 'result' to the end; it depends on all of its children
-    fragments.remove(result);
-    fragments.add(result);
-
-    if (!isPartitioned && result.isPartitioned()) {
-      result = createMergeFragment(result);
-      fragments.add(result);
-    }
-
-    return result;
-  }
-
-  /**
-   * Returns the product of the distinct value estimates of the individual exprs
-   * or -1 if any of them doesn't have a distinct value estimate.
-   */
-  private long getNumDistinctValues(List<Expr> exprs) {
-    long result = 1;
-    for (Expr expr: exprs) {
-      result *= expr.getNumDistinctValues();
-      if (result < 0) return -1;
-    }
-    return result;
-  }
-
-  /**
-   * Decides whether to repartition the output of 'inputFragment' before feeding its
-   * data into the table sink of the given 'insertStmt'. The decision obeys the
-   * shuffle/noshuffle plan hints if present. Otherwise, returns a plan fragment that
-   * partitions the output of 'inputFragment' on the partition exprs of 'insertStmt',
-   * unless the expected number of partitions is less than the number of nodes on which
-   * inputFragment runs, or the target table is unpartitioned.
-   * For inserts into unpartitioned tables or inserts with only constant partition exprs,
-   * the shuffle hint leads to a plan that merges all rows at the coordinator where
-   * the table sink is executed.
-   * If this function ends up creating a new fragment, it is appended to 'fragments'.
-   */
-  public PlanFragment createInsertFragment(
-      PlanFragment inputFragment, InsertStmt insertStmt, Analyzer analyzer,
-      ArrayList<PlanFragment> fragments)
-      throws ImpalaException {
-    if (insertStmt.hasNoShuffleHint()) return inputFragment;
-
-    List<Expr> partitionExprs = Lists.newArrayList(insertStmt.getPartitionKeyExprs());
-    // Ignore constants for the sake of partitioning.
-    Expr.removeConstants(partitionExprs);
-
-    // Do nothing if the input fragment is already appropriately partitioned.
-    DataPartition inputPartition = inputFragment.getDataPartition();
-    if (!partitionExprs.isEmpty() &&
-        analyzer.equivSets(inputPartition.getPartitionExprs(), partitionExprs)) {
-      return inputFragment;
-    }
-
-    // Make a cost-based decision only if no user hint was supplied.
-    if (!insertStmt.hasShuffleHint()) {
-      // If the existing partition exprs are a subset of the table partition exprs,
-      // check whether the data is already distributed across all nodes. If so,
-      // don't repartition.
-      if (Expr.isSubset(inputPartition.getPartitionExprs(), partitionExprs)) {
-        long numPartitions = getNumDistinctValues(inputPartition.getPartitionExprs());
-        if (numPartitions >= inputFragment.getNumNodes()) return inputFragment;
-      }
-
-      // Don't repartition if we know we have fewer partitions than nodes
-      // (i.e., default to repartitioning if col stats are missing).
-      // TODO: We want to repartition if the resulting files would otherwise
-      // be very small (less than some reasonable multiple of the recommended block size).
-      // In order to do that, we need to come up with an estimate of the avg row size
-      // in the particular file format of the output table/partition.
-      // We should always know how many nodes our input is running on.
-      long numPartitions = getNumDistinctValues(partitionExprs);
-      Preconditions.checkState(inputFragment.getNumNodes() != -1);
-      if (numPartitions > 0 && numPartitions <= inputFragment.getNumNodes()) {
-        return inputFragment;
-      }
-    }
-
-    ExchangeNode exchNode =
-        new ExchangeNode(ctx_.getNextNodeId(), inputFragment.getPlanRoot());
-    exchNode.init(analyzer);
-    Preconditions.checkState(exchNode.hasValidStats());
-    DataPartition partition;
-    if (partitionExprs.isEmpty()) {
-      partition = DataPartition.UNPARTITIONED;
-    } else {
-      partition = DataPartition.hashPartitioned(partitionExprs);
-    }
-    PlanFragment fragment =
-        new PlanFragment(ctx_.getNextFragmentId(), exchNode, partition);
-    inputFragment.setDestination(exchNode);
-    inputFragment.setOutputPartition(partition);
-    fragments.add(fragment);
-    return fragment;
-  }
-
-  /**
-   * Return unpartitioned fragment that merges the input fragment's output via
-   * an ExchangeNode.
-   * Requires that input fragment be partitioned.
-   */
-  private PlanFragment createMergeFragment(PlanFragment inputFragment)
-      throws ImpalaException {
-    Preconditions.checkState(inputFragment.isPartitioned());
-    ExchangeNode mergePlan =
-        new ExchangeNode(ctx_.getNextNodeId(), inputFragment.getPlanRoot());
-    mergePlan.init(ctx_.getRootAnalyzer());
-    Preconditions.checkState(mergePlan.hasValidStats());
-    PlanFragment fragment = new PlanFragment(ctx_.getNextFragmentId(), mergePlan,
-        DataPartition.UNPARTITIONED);
-    inputFragment.setDestination(mergePlan);
-    return fragment;
-  }
-
-  /**
-   * Create new randomly-partitioned fragment containing a single scan node.
-   * TODO: take bucketing into account to produce a naturally hash-partitioned
-   * fragment
-   * TODO: hbase scans are range-partitioned on the row key
-   */
-  private PlanFragment createScanFragment(PlanNode node) {
-    return new PlanFragment(ctx_.getNextFragmentId(), node, DataPartition.RANDOM);
-  }
-
-  /**
-   * Adds the SubplanNode as the new plan root to the child fragment and returns
-   * the child fragment.
-   */
-  private PlanFragment createSubplanNodeFragment(SubplanNode node,
-      PlanFragment childFragment) {
-    node.setChild(0, childFragment.getPlanRoot());
-    childFragment.setPlanRoot(node);
-    return childFragment;
-  }
-
-  /**
-   * Modifies the leftChildFragment to execute a nested loop join. The right child input
-   * provided by an ExchangeNode, which is the destination of the rightChildFragment's
-   * output.
-   */
-  private PlanFragment createNestedLoopJoinFragment(NestedLoopJoinNode node,
-      PlanFragment rightChildFragment, PlanFragment leftChildFragment,
-      long perNodeMemLimit, ArrayList<PlanFragment> fragments)
-      throws ImpalaException {
-    node.setDistributionMode(DistributionMode.BROADCAST);
-    node.setChild(0, leftChildFragment.getPlanRoot());
-    connectChildFragment(node, 1, leftChildFragment, rightChildFragment);
-    leftChildFragment.setPlanRoot(node);
-    return leftChildFragment;
-  }
-
-  /**
-   * Helper function to produce a partitioning hash-join fragment
-   */
-  private PlanFragment createPartitionedHashJoinFragment(HashJoinNode node,
-      Analyzer analyzer, boolean lhsHasCompatPartition, boolean rhsHasCompatPartition,
-      PlanFragment leftChildFragment, PlanFragment rightChildFragment,
-      List<Expr> lhsJoinExprs, List<Expr> rhsJoinExprs,
-      ArrayList<PlanFragment> fragments) throws ImpalaException {
-    node.setDistributionMode(HashJoinNode.DistributionMode.PARTITIONED);
-    // The lhs and rhs input fragments are already partitioned on the join exprs.
-    // Combine the lhs/rhs input fragments into leftChildFragment by placing the join
-    // node into leftChildFragment and setting its lhs/rhs children to the plan root of
-    // the lhs/rhs child fragment, respectively. No new child fragments or exchanges
-    // are created, and the rhs fragment is removed.
-    // TODO: Relax the isCompatPartition() check below. The check is conservative and
-    // may reject partitions that could be made physically compatible. Fix this by
-    // removing equivalent duplicates from partition exprs and impose a canonical order
-    // on partition exprs (both using the canonical equivalence class representatives).
-    if (lhsHasCompatPartition
-        && rhsHasCompatPartition
-        && isCompatPartition(
-            leftChildFragment.getDataPartition(),
-            rightChildFragment.getDataPartition(),
-            lhsJoinExprs, rhsJoinExprs, analyzer)) {
-      node.setChild(0, leftChildFragment.getPlanRoot());
-      node.setChild(1, rightChildFragment.getPlanRoot());
-      // fix up PlanNode.fragment_ for the migrated PlanNode tree of the rhs child
-      leftChildFragment.setFragmentInPlanTree(node.getChild(1));
-      // Relocate input fragments of rightChildFragment to leftChildFragment.
-      for (PlanFragment rhsInput: rightChildFragment.getChildren()) {
-        leftChildFragment.getChildren().add(rhsInput);
-      }
-      // Remove right fragment because its plan tree has been merged into leftFragment.
-      fragments.remove(rightChildFragment);
-      leftChildFragment.setPlanRoot(node);
-      return leftChildFragment;
-    }
-
-    // The lhs input fragment is already partitioned on the join exprs.
-    // Make the HashJoin the new root of leftChildFragment and set the join's
-    // first child to the lhs plan root. The second child of the join is an
-    // ExchangeNode that is fed by the rhsInputFragment whose sink repartitions
-    // its data by the rhs join exprs.
-    DataPartition rhsJoinPartition = null;
-    if (lhsHasCompatPartition) {
-      rhsJoinPartition = getCompatPartition(lhsJoinExprs,
-          leftChildFragment.getDataPartition(), rhsJoinExprs, analyzer);
-      if (rhsJoinPartition != null) {
-        node.setChild(0, leftChildFragment.getPlanRoot());
-        connectChildFragment(node, 1, leftChildFragment, rightChildFragment);
-        rightChildFragment.setOutputPartition(rhsJoinPartition);
-        leftChildFragment.setPlanRoot(node);
-        return leftChildFragment;
-      }
-    }
-
-    // Same as above but with rhs and lhs reversed.
-    DataPartition lhsJoinPartition = null;
-    if (rhsHasCompatPartition) {
-      lhsJoinPartition = getCompatPartition(rhsJoinExprs,
-          rightChildFragment.getDataPartition(), lhsJoinExprs, analyzer);
-      if (lhsJoinPartition != null) {
-        node.setChild(1, rightChildFragment.getPlanRoot());
-        connectChildFragment(node, 0, rightChildFragment, leftChildFragment);
-        leftChildFragment.setOutputPartition(lhsJoinPartition);
-        rightChildFragment.setPlanRoot(node);
-        return rightChildFragment;
-      }
-    }
-
-    Preconditions.checkState(lhsJoinPartition == null);
-    Preconditions.checkState(rhsJoinPartition == null);
-    lhsJoinPartition = DataPartition.hashPartitioned(Expr.cloneList(lhsJoinExprs));
-    rhsJoinPartition = DataPartition.hashPartitioned(Expr.cloneList(rhsJoinExprs));
-
-    // Neither lhs nor rhs are already partitioned on the join exprs.
-    // Create a new parent fragment containing a HashJoin node with two
-    // ExchangeNodes as inputs; the latter are the destinations of the
-    // left- and rightChildFragments, which now partition their output
-    // on their respective join exprs.
-    // The new fragment is hash-partitioned on the lhs input join exprs.
-    ExchangeNode lhsExchange =
-        new ExchangeNode(ctx_.getNextNodeId(), leftChildFragment.getPlanRoot());
-    lhsExchange.computeStats(null);
-    node.setChild(0, lhsExchange);
-    ExchangeNode rhsExchange =
-        new ExchangeNode(ctx_.getNextNodeId(), rightChildFragment.getPlanRoot());
-    rhsExchange.computeStats(null);
-    node.setChild(1, rhsExchange);
-
-    // Connect the child fragments in a new fragment, and set the data partition
-    // of the new fragment and its child fragments.
-    PlanFragment joinFragment =
-        new PlanFragment(ctx_.getNextFragmentId(), node, lhsJoinPartition);
-    leftChildFragment.setDestination(lhsExchange);
-    leftChildFragment.setOutputPartition(lhsJoinPartition);
-    rightChildFragment.setDestination(rhsExchange);
-    rightChildFragment.setOutputPartition(rhsJoinPartition);
-    return joinFragment;
-  }
-
-  /**
-   * Creates either a broadcast join or a repartitioning join, depending on the
-   * expected cost.
-   * If any of the inputs to the cost computation is unknown, it assumes the cost
-   * will be 0. Costs being equal, it'll favor broadcast over partitioned joins.
-   * If perNodeMemLimit > 0 and the size of the hash table for a broadcast join is
-   * expected to exceed that mem limit, switches to partitioned join instead.
-   * TODO: revisit the choice of broadcast as the default
-   * TODO: don't create a broadcast join if we already anticipate that this will
-   * exceed the query's memory budget.
-   */
-  private PlanFragment createHashJoinFragment(
-      HashJoinNode node, PlanFragment rightChildFragment,
-      PlanFragment leftChildFragment, long perNodeMemLimit,
-      ArrayList<PlanFragment> fragments)
-      throws ImpalaException {
-    // For both join types, the total cost is calculated as the amount of data
-    // sent over the network, plus the amount of data inserted into the hash table.
-    // broadcast: send the rightChildFragment's output to each node executing
-    // the leftChildFragment, and build a hash table with it on each node.
-    Analyzer analyzer = ctx_.getRootAnalyzer();
-    PlanNode rhsTree = rightChildFragment.getPlanRoot();
-    long rhsDataSize = 0;
-    long broadcastCost = Long.MAX_VALUE;
-    if (rhsTree.getCardinality() != -1) {
-      rhsDataSize = Math.round(
-          rhsTree.getCardinality() * ExchangeNode.getAvgSerializedRowSize(rhsTree));
-      if (leftChildFragment.getNumNodes() != -1) {
-        broadcastCost = 2 * rhsDataSize * leftChildFragment.getNumNodes();
-      }
-    }
-    LOG.debug("broadcast: cost=" + Long.toString(broadcastCost));
-    LOG.debug("card=" + Long.toString(rhsTree.getCardinality()) + " row_size="
-        + Float.toString(rhsTree.getAvgRowSize()) + " #nodes="
-        + Integer.toString(leftChildFragment.getNumNodes()));
-
-    // repartition: both left- and rightChildFragment are partitioned on the
-    // join exprs, and a hash table is built with the rightChildFragment's output.
-    PlanNode lhsTree = leftChildFragment.getPlanRoot();
-    long partitionCost = Long.MAX_VALUE;
-    List<Expr> lhsJoinExprs = Lists.newArrayList();
-    List<Expr> rhsJoinExprs = Lists.newArrayList();
-    for (Expr joinConjunct: node.getEqJoinConjuncts()) {
-      // no remapping necessary
-      lhsJoinExprs.add(joinConjunct.getChild(0).clone());
-      rhsJoinExprs.add(joinConjunct.getChild(1).clone());
-    }
-    boolean lhsHasCompatPartition = false;
-    boolean rhsHasCompatPartition = false;
-    if (lhsTree.getCardinality() != -1 && rhsTree.getCardinality() != -1) {
-      lhsHasCompatPartition = analyzer.equivSets(lhsJoinExprs,
-          leftChildFragment.getDataPartition().getPartitionExprs());
-      rhsHasCompatPartition = analyzer.equivSets(rhsJoinExprs,
-          rightChildFragment.getDataPartition().getPartitionExprs());
-
-      double lhsNetworkCost = (lhsHasCompatPartition) ? 0.0 :
-        Math.round(
-            lhsTree.getCardinality() * ExchangeNode.getAvgSerializedRowSize(lhsTree));
-      double rhsNetworkCost = (rhsHasCompatPartition) ? 0.0 : rhsDataSize;
-      partitionCost = Math.round(lhsNetworkCost + rhsNetworkCost + rhsDataSize);
-    }
-    LOG.debug("partition: cost=" + Long.toString(partitionCost));
-    LOG.debug("lhs card=" + Long.toString(lhsTree.getCardinality()) + " row_size="
-        + Float.toString(lhsTree.getAvgRowSize()));
-    LOG.debug("rhs card=" + Long.toString(rhsTree.getCardinality()) + " row_size="
-        + Float.toString(rhsTree.getAvgRowSize()));
-    LOG.debug(rhsTree.getExplainString());
-
-    boolean doBroadcast = false;
-    // we do a broadcast join if
-    // - we're not doing a full outer or right outer/semi/anti join (those require
-    //   the left-hand side to be partitioned for correctness),
-    // - and we're explicitly told to do so, or it's cheaper and we weren't
-    //   explicitly told to do a partitioned join,
-    // - and the expected size of the hash tbl doesn't exceed perNodeMemLimit;
-    // - or we are doing a null-aware left anti join (broadcast is required for
-    //   correctness)
-    // we do a "<=" comparison of the costs so that we default to broadcast joins if
-    // we're unable to estimate the cost
-    if ((node.getJoinOp() != JoinOperator.RIGHT_OUTER_JOIN
-        && node.getJoinOp() != JoinOperator.FULL_OUTER_JOIN
-        && node.getJoinOp() != JoinOperator.RIGHT_SEMI_JOIN
-        && node.getJoinOp() != JoinOperator.RIGHT_ANTI_JOIN
-        // a broadcast join hint overrides the check to see if the hash table
-        // size is less than the pernode memlimit
-        && (node.getDistributionModeHint() == DistributionMode.BROADCAST
-            || perNodeMemLimit == 0
-            || Math.round(rhsDataSize * PlannerContext.HASH_TBL_SPACE_OVERHEAD)
-                <= perNodeMemLimit)
-        // a broadcast join hint overrides the check to see if performing a broadcast
-        // join is more costly than a partitioned join
-        && (node.getDistributionModeHint() == DistributionMode.BROADCAST
-            || (node.getDistributionModeHint() != DistributionMode.PARTITIONED
-                && broadcastCost <= partitionCost)))
-        || node.getJoinOp().isNullAwareLeftAntiJoin()) {
-      doBroadcast = true;
-    }
-
-    PlanFragment hjFragment = null;
-    if (doBroadcast) {
-      node.setDistributionMode(HashJoinNode.DistributionMode.BROADCAST);
-      // Doesn't create a new fragment, but modifies leftChildFragment to execute
-      // the join; the build input is provided by an ExchangeNode, which is the
-      // destination of the rightChildFragment's output
-      node.setChild(0, leftChildFragment.getPlanRoot());
-      connectChildFragment(node, 1, leftChildFragment, rightChildFragment);
-      leftChildFragment.setPlanRoot(node);
-      hjFragment = leftChildFragment;
-    } else {
-      hjFragment = createPartitionedHashJoinFragment(node, analyzer,
-          lhsHasCompatPartition, rhsHasCompatPartition, leftChildFragment,
-          rightChildFragment, lhsJoinExprs, rhsJoinExprs, fragments);
-    }
-
-    for (RuntimeFilter filter: node.getRuntimeFilters()) {
-      filter.setIsBroadcast(doBroadcast);
-      filter.computeHasLocalTargets();
-      // Work around IMPALA-3450, where cardinalities might be wrong in single-node plans
-      // with UNION and LIMITs.
-      // TODO: Remove.
-      filter.computeNdvEstimate();
-    }
-    return hjFragment;
-  }
-
-  /**
-   * Returns true if the lhs and rhs partitions are physically compatible for executing
-   * a partitioned join with the given lhs/rhs join exprs. Physical compatibility means
-   * that lhs/rhs exchange nodes hashing on exactly those partition expressions are
-   * guaranteed to send two rows with identical partition-expr values to the same node.
-   * The requirements for physical compatibility are:
-   * 1. Number of exprs must be the same
-   * 2. The lhs partition exprs are identical to the lhs join exprs and the rhs partition
-   *    exprs are identical to the rhs join exprs
-   * 3. Or for each expr in the lhs partition, there must be an equivalent expr in the
-   *    rhs partition at the same ordinal position within the expr list
-   * (4. The expr types must be identical, but that is enforced later in PlanFragment)
-   * Conditions 2 and 3 are similar but not the same due to outer joins, e.g., for full
-   * outer joins condition 3 can never be met, but condition 2 can.
-   * TODO: Move parts of this function into DataPartition as appropriate.
-   */
-  private boolean isCompatPartition(DataPartition lhsPartition,
-      DataPartition rhsPartition, List<Expr> lhsJoinExprs, List<Expr> rhsJoinExprs,
-      Analyzer analyzer) {
-    List<Expr> lhsPartExprs = lhsPartition.getPartitionExprs();
-    List<Expr> rhsPartExprs = rhsPartition.getPartitionExprs();
-    // 1. Sizes must be equal.
-    if (lhsPartExprs.size() != rhsPartExprs.size()) return false;
-    // 2. Lhs/rhs join exprs are identical to lhs/rhs partition exprs.
-    Preconditions.checkState(lhsJoinExprs.size() == rhsJoinExprs.size());
-    if (lhsJoinExprs.size() == lhsPartExprs.size()) {
-      if (lhsJoinExprs.equals(lhsPartExprs) && rhsJoinExprs.equals(rhsPartExprs)) {
-        return true;
-      }
-    }
-    // 3. Each lhs part expr must have an equivalent expr at the same position
-    // in the rhs part exprs.
-    for (int i = 0; i < lhsPartExprs.size(); ++i) {
-      if (!analyzer.equivExprs(lhsPartExprs.get(i), rhsPartExprs.get(i))) return false;
-    }
-    return true;
-  }
-
-  /**
-   * Returns a new data partition that is suitable for creating an exchange node to feed
-   * a partitioned hash join. The hash join is assumed to be placed in a fragment with an
-   * existing data partition that is compatible with either the lhs or rhs join exprs
-   * (srcPartition belongs to the fragment and srcJoinExprs are the compatible exprs).
-   * The returned partition uses the given joinExprs which are assumed to be the lhs or
-   * rhs join exprs, whichever srcJoinExprs are not.
-   * The returned data partition has two important properties to ensure correctness:
-   * 1. It has exactly the same number of hash exprs as the srcPartition (IMPALA-1307),
-   *    possibly by removing redundant exprs from joinExprs or adding some joinExprs
-   *    multiple times to match the srcPartition
-   * 2. The hash exprs are ordered based on their corresponding 'matches' in
-   *    the existing srcPartition (IMPALA-1324).
-   * Returns null if no compatible data partition could be constructed.
-   * TODO: Move parts of this function into DataPartition as appropriate.
-   * TODO: Make comment less operational and more semantic.
-   */
-  private DataPartition getCompatPartition(List<Expr> srcJoinExprs,
-      DataPartition srcPartition, List<Expr> joinExprs, Analyzer analyzer) {
-    Preconditions.checkState(srcPartition.isHashPartitioned());
-    List<Expr> srcPartExprs = srcPartition.getPartitionExprs();
-    List<Expr> resultPartExprs = Lists.newArrayList();
-    for (int i = 0; i < srcPartExprs.size(); ++i) {
-      for (int j = 0; j < srcJoinExprs.size(); ++j) {
-        if (analyzer.equivExprs(srcPartExprs.get(i), srcJoinExprs.get(j))) {
-          resultPartExprs.add(joinExprs.get(j).clone());
-          break;
-        }
-      }
-    }
-    if (resultPartExprs.size() != srcPartExprs.size()) return null;
-    return DataPartition.hashPartitioned(resultPartExprs);
-  }
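-
-  // Illustrative example (hypothetical tables t1, t2): given
-  //   srcJoinExprs = [t1.a, t1.b], joinExprs = [t2.x, t2.y]
-  // and srcPartition = hash(t1.b, t1.a), the result is hash(t2.y, t2.x):
-  // the same number of exprs as srcPartition (IMPALA-1307), ordered by the
-  // positions of their matches in srcPartition (IMPALA-1324). If some
-  // srcPartExpr has no equivalent among the srcJoinExprs, the sizes differ
-  // and null is returned.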
-
-  /**
-   * Returns a new fragment with a UnionNode as its root. The data partition of the
-   * returned fragment and how the data of the child fragments is consumed depends on the
-   * data partitions of the child fragments:
-   * - All child fragments are unpartitioned or partitioned: The returned fragment has an
-   *   UNPARTITIONED or RANDOM data partition, respectively. The UnionNode absorbs the
-   *   plan trees of all child fragments.
-   * - Mixed partitioned/unpartitioned child fragments: The returned fragment is
-   *   RANDOM partitioned. The plan trees of all partitioned child fragments are absorbed
-   *   into the UnionNode. All unpartitioned child fragments are connected to the
-   *   UnionNode via a RANDOM exchange, and remain unchanged otherwise.
-   */
-  private PlanFragment createUnionNodeFragment(UnionNode unionNode,
-      ArrayList<PlanFragment> childFragments, ArrayList<PlanFragment> fragments)
-      throws ImpalaException {
-    Preconditions.checkState(unionNode.getChildren().size() == childFragments.size());
-
-    // A UnionNode could be left with no children, or with only constant selects,
-    // if all of its operands were dropped because of constant predicates that
-    // evaluated to false.
-    if (unionNode.getChildren().isEmpty()) {
-      return new PlanFragment(
-          ctx_.getNextFragmentId(), unionNode, DataPartition.UNPARTITIONED);
-    }
-
-    Preconditions.checkState(!childFragments.isEmpty());
-    int numUnpartitionedChildFragments = 0;
-    for (int i = 0; i < childFragments.size(); ++i) {
-      if (!childFragments.get(i).isPartitioned()) ++numUnpartitionedChildFragments;
-    }
-
-    // remove all children to avoid them being tagged with the wrong
-    // fragment (in the PlanFragment c'tor; we haven't created ExchangeNodes yet)
-    unionNode.clearChildren();
-
-    // If all child fragments are unpartitioned, return a single unpartitioned fragment
-    // with a UnionNode that merges all child fragments.
-    if (numUnpartitionedChildFragments == childFragments.size()) {
-      PlanFragment unionFragment = new PlanFragment(ctx_.getNextFragmentId(),
-          unionNode, DataPartition.UNPARTITIONED);
-      // Absorb the plan trees of all childFragments into unionNode
-      // and fix up the fragment tree in the process.
-      for (int i = 0; i < childFragments.size(); ++i) {
-        unionNode.addChild(childFragments.get(i).getPlanRoot());
-        unionFragment.setFragmentInPlanTree(unionNode.getChild(i));
-        unionFragment.addChildren(childFragments.get(i).getChildren());
-      }
-      unionNode.init(ctx_.getRootAnalyzer());
-      // All child fragments have been absorbed into unionFragment.
-      fragments.removeAll(childFragments);
-      return unionFragment;
-    }
-
-    // There is at least one partitioned child fragment.
-    PlanFragment unionFragment = new PlanFragment(
-        ctx_.getNextFragmentId(), unionNode, DataPartition.RANDOM);
-    for (int i = 0; i < childFragments.size(); ++i) {
-      PlanFragment childFragment = childFragments.get(i);
-      if (childFragment.isPartitioned()) {
-        // absorb the plan trees of all partitioned child fragments into unionNode
-        unionNode.addChild(childFragment.getPlanRoot());
-        unionFragment.setFragmentInPlanTree(unionNode.getChild(i));
-        unionFragment.addChildren(childFragment.getChildren());
-        fragments.remove(childFragment);
-      } else {
-        // dummy entry for subsequent addition of the ExchangeNode
-        unionNode.addChild(null);
-        // Connect the unpartitioned child fragments to unionNode via a random exchange.
-        connectChildFragment(unionNode, i, unionFragment, childFragment);
-        childFragment.setOutputPartition(DataPartition.RANDOM);
-      }
-    }
-    unionNode.reorderOperands(ctx_.getRootAnalyzer());
-    unionNode.init(ctx_.getRootAnalyzer());
-    return unionFragment;
-  }
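-
-  // Illustrative plan shape (hypothetical): a union over two partitioned scan
-  // fragments and one unpartitioned constant-select fragment becomes
-  //   union fragment (RANDOM):
-  //     UnionNode
-  //       |-- ScanNode t1   (plan tree absorbed into the union fragment)
-  //       |-- ScanNode t2   (plan tree absorbed into the union fragment)
-  //       |-- ExchangeNode  (fed by the unpartitioned child via RANDOM exchange)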
-
-  /**
-   * Adds the SelectNode as the new plan root to the child fragment and returns
-   * the child fragment.
-   */
-  private PlanFragment createSelectNodeFragment(SelectNode selectNode,
-      ArrayList<PlanFragment> childFragments) {
-    Preconditions.checkState(selectNode.getChildren().size() == childFragments.size());
-    PlanFragment childFragment = childFragments.get(0);
-    // set the child explicitly, an ExchangeNode might have been inserted
-    // (whereas selectNode.child[0] would point to the original child)
-    selectNode.setChild(0, childFragment.getPlanRoot());
-    childFragment.setPlanRoot(selectNode);
-    return childFragment;
-  }
-
-  /**
-   * Replace node's child at index childIdx with an ExchangeNode that receives its
-   * input from childFragment. parentFragment contains 'node' and the new ExchangeNode.
-   */
-  private void connectChildFragment(PlanNode node, int childIdx,
-      PlanFragment parentFragment, PlanFragment childFragment) throws ImpalaException {
-    ExchangeNode exchangeNode =
-        new ExchangeNode(ctx_.getNextNodeId(), childFragment.getPlanRoot());
-    exchangeNode.init(ctx_.getRootAnalyzer());
-    exchangeNode.setFragment(parentFragment);
-    node.setChild(childIdx, exchangeNode);
-    childFragment.setDestination(exchangeNode);
-  }
-
-  /**
-   * Create a new fragment containing a single ExchangeNode that consumes the output
-   * of childFragment, set the destination of childFragment to the new parent
-   * and the output partition of childFragment to that of the new parent.
-   * TODO: the output partition of a child isn't necessarily the same as the data
-   * partition of the receiving parent (if there is more materialization happening
-   * in the parent, such as during distinct aggregation). Do we care about the data
-   * partition of the parent being applicable to the *output* of the parent (it's
-   * correct for the input)?
-   */
-  private PlanFragment createParentFragment(
-      PlanFragment childFragment, DataPartition parentPartition)
-      throws ImpalaException {
-    ExchangeNode exchangeNode =
-        new ExchangeNode(ctx_.getNextNodeId(), childFragment.getPlanRoot());
-    exchangeNode.init(ctx_.getRootAnalyzer());
-    PlanFragment parentFragment = new PlanFragment(ctx_.getNextFragmentId(),
-        exchangeNode, parentPartition);
-    childFragment.setDestination(exchangeNode);
-    childFragment.setOutputPartition(parentPartition);
-    return parentFragment;
-  }
-
-  /**
-   * Returns a fragment that materializes the aggregation result of 'node'.
-   * If the child fragment is partitioned, the result fragment will be partitioned on
-   * the grouping exprs of 'node'.
-   * If 'node' is phase 1 of a 2-phase DISTINCT aggregation, this will simply
-   * add 'node' to the child fragment and return the child fragment; the new
-   * fragment will be created by the subsequent call of createAggregationFragment()
-   * for the phase 2 AggregationNode.
-   */
-  private PlanFragment createAggregationFragment(AggregationNode node,
-      PlanFragment childFragment, ArrayList<PlanFragment> fragments)
-      throws ImpalaException {
-    if (!childFragment.isPartitioned()) {
-      // nothing to distribute; do full aggregation directly within childFragment
-      childFragment.addPlanRoot(node);
-      return childFragment;
-    }
-
-    if (node.getAggInfo().isDistinctAgg()) {
-      // 'node' is phase 1 of a DISTINCT aggregation; the actual agg fragment
-      // will get created in the next createAggregationFragment() call
-      // for the parent AggregationNode
-      childFragment.addPlanRoot(node);
-      return childFragment;
-    }
-
-    // Check if 'node' is phase 2 of a DISTINCT aggregation.
-    boolean isDistinct = node.getChild(0) instanceof AggregationNode
-          && ((AggregationNode)(node.getChild(0))).getAggInfo().isDistinctAgg();
-    if (isDistinct) {
-      return createPhase2DistinctAggregationFragment(node, childFragment, fragments);
-    } else {
-      return createMergeAggregationFragment(node, childFragment);
-    }
-  }
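-
-  // Illustrative example (hypothetical query): for
-  //   SELECT count(DISTINCT x) FROM t
-  // the phase-1 AggregationNode (grouping by x) is simply appended to the
-  // partitioned child fragment here; the phase-2 AggregationNode then arrives
-  // with the phase-1 node as its child and is routed to
-  // createPhase2DistinctAggregationFragment().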
-
-  /**
-   * Returns a fragment that materializes the final result of an aggregation where
-   * 'childFragment' is a partitioned fragment and 'node' is not part of a distinct
-   * aggregation.
-   */
-  private PlanFragment createMergeAggregationFragment(
-      AggregationNode node, PlanFragment childFragment)
-      throws ImpalaException {
-    Preconditions.checkArgument(childFragment.isPartitioned());
-    ArrayList<Expr> groupingExprs = node.getAggInfo().getGroupingExprs();
-    boolean hasGrouping = !groupingExprs.isEmpty();
-
-    DataPartition parentPartition = null;
-    if (hasGrouping) {
-      // the parent fragment is partitioned on the grouping exprs;
-      // substitute grouping exprs to reference the *output* of the agg, not the input
-      List<Expr> partitionExprs = node.getAggInfo().getPartitionExprs();
-      if (partitionExprs == null) partitionExprs = groupingExprs;
-      partitionExprs = Expr.substituteList(partitionExprs,
-          node.getAggInfo().getIntermediateSmap(), ctx_.getRootAnalyzer(), false);
-      boolean childHasCompatPartition = ctx_.getRootAnalyzer().equivSets(partitionExprs,
-            childFragment.getDataPartition().getPartitionExprs());
-      if (childHasCompatPartition) {
-        // The data is already partitioned on the required expressions, we can just do
-        // the aggregation in the child fragment without an extra merge step.
-        childFragment.addPlanRoot(node);
-        return childFragment;
-      }
-      parentPartition = DataPartition.hashPartitioned(partitionExprs);
-    } else {
-      // the parent fragment is unpartitioned
-      parentPartition = DataPartition.UNPARTITIONED;
-    }
-
-    // the original aggregation materializes the intermediate agg tuple and goes
-    // into the child fragment; merge aggregation materializes the output agg tuple
-    // and goes into a parent fragment
-    childFragment.addPlanRoot(node);
-    node.setIntermediateTuple();
-    node.setIsPreagg(ctx_);
-
-    // if there is a limit, we need to transfer it from the pre-aggregation
-    // node in the child fragment to the merge aggregation node in the parent
-    long limit = node.getLimit();
-    node.unsetLimit();
-    node.unsetNeedsFinalize();
-
-    // place a merge aggregation step in a new fragment
-    PlanFragment mergeFragment = createParentFragment(childFragment, parentPartition);
-    AggregationNode mergeAggNode = new AggregationNode(ctx_.getNextNodeId(),
-        mergeFragment.getPlanRoot(), node.getAggInfo().getMergeAggInfo());
-    mergeAggNode.init(ctx_.getRootAnalyzer());
-    mergeAggNode.setLimit(limit);
-
-    // HAVING predicates can only be evaluated after the merge agg step
-    node.transferConjuncts(mergeAggNode);
-    // Recompute stats after transferring the conjuncts_ (order is important).
-    node.computeStats(ctx_.getRootAnalyzer());
-    mergeFragment.getPlanRoot().computeStats(ctx_.getRootAnalyzer());
-    mergeAggNode.computeStats(ctx_.getRootAnalyzer());
-    // Set new plan root after updating stats.
-    mergeFragment.addPlanRoot(mergeAggNode);
-
-    return mergeFragment;
-  }
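-
-  // Illustrative example (hypothetical query): for
-  //   SELECT c, count(*) FROM t GROUP BY c HAVING count(*) > 10
-  // over a child fragment not partitioned on c, the child keeps the
-  // preaggregation (intermediate tuple, limit and finalize removed) and a new
-  // fragment hash-partitioned on c receives ExchangeNode -> merge
-  // AggregationNode, which evaluates the HAVING conjunct and any limit.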
-
-  /**
-   * Returns a fragment that materializes the final result of a distinct aggregation
-   * where 'childFragment' is a partitioned fragment with the first phase aggregation
-   * as its root and 'node' is the second phase of the distinct aggregation.
-   */
-  private PlanFragment createPhase2DistinctAggregationFragment(AggregationNode node,
-      PlanFragment childFragment, ArrayList<PlanFragment> fragments)
-      throws ImpalaException {
-    ArrayList<Expr> groupingExprs = node.getAggInfo().getGroupingExprs();
-    boolean hasGrouping = !groupingExprs.isEmpty();
-
-    // The first-phase aggregation node is already in the child fragment.
-    Preconditions.checkState(node.getChild(0) == childFragment.getPlanRoot());
-
-    AggregateInfo firstPhaseAggInfo = ((AggregationNode) node.getChild(0)).getAggInfo();
-    List<Expr> partitionExprs = null;
-    if (hasGrouping) {
-      // We need to do
-      // - child fragment:
-      //   * phase-1 aggregation
-      // - merge fragment, hash-partitioned on grouping exprs:
-      //   * merge agg of phase 1
-      //   * phase 2 agg
-      // The output partition exprs of the child are the (input) grouping exprs of the
-      // parent. The grouping exprs reference the output tuple of the 1st phase, but the
-      // partitioning happens on the intermediate tuple of the 1st phase.
-      partitionExprs = Expr.substituteList(
-          groupingExprs, firstPhaseAggInfo.getOutputToIntermediateSmap(),
-          ctx_.getRootAnalyzer(), false);
-    } else {
-      // We need to do
-      // - child fragment:
-      //   * phase-1 aggregation
-      // - merge fragment 1, hash-partitioned on distinct exprs:
-      //   * merge agg of phase 1
-      //   * phase 2 agg
-      // - merge fragment 2, unpartitioned:
-      //   * merge agg of phase 2
-      partitionExprs = Expr.substituteList(firstPhaseAggInfo.getGroupingExprs(),
-          firstPhaseAggInfo.getIntermediateSmap(), ctx_.getRootAnalyzer(), false);
-    }
-
-    PlanFragment mergeFragment = null;
-    boolean childHasCompatPartition = ctx_.getRootAnalyzer().equivSets(partitionExprs,
-        childFragment.getDataPartition().getPartitionExprs());
-    if (childHasCompatPartition) {
-      // The data is already partitioned on the required expressions, we can skip the
-      // phase 1 merge step.
-      childFragment.addPlanRoot(node);
-      mergeFragment = childFragment;
-    } else {
-      DataPartition mergePartition = DataPartition.hashPartitioned(partitionExprs);
-      // Convert the existing node to a preaggregation.
-      AggregationNode preaggNode = (AggregationNode)node.getChild(0);
-      preaggNode.setIsPreagg(ctx_);
-
-      // place a merge aggregation step for the 1st phase in a new fragment
-      mergeFragment = createParentFragment(childFragment, mergePartition);
-      AggregateInfo phase1MergeAggInfo = firstPhaseAggInfo.getMergeAggInfo();
-      AggregationNode phase1MergeAggNode =
-          new AggregationNode(ctx_.getNextNodeId(), preaggNode, phase1MergeAggInfo);
-      phase1MergeAggNode.init(ctx_.getRootAnalyzer());
-      phase1MergeAggNode.unsetNeedsFinalize();
-      phase1MergeAggNode.setIntermediateTuple();
-      mergeFragment.addPlanRoot(phase1MergeAggNode);
-
-      // the 2nd-phase aggregation consumes the output of the merge agg;
-      // if there is a limit, it had already been placed with the 2nd aggregation
-      // step (which is where it should be)
-      mergeFragment.addPlanRoot(node);
-    }
-
-    if (!hasGrouping) {
-      // place the merge aggregation of the 2nd phase in an unpartitioned fragment;
-      // add preceding merge fragment at end
-      if (mergeFragment != childFragment) fragments.add(mergeFragment);
-
-      node.unsetNeedsFinalize();
-      node.setIntermediateTuple();
-      // Any limit should be placed in the final merge aggregation node
-      long limit = node.getLimit();
-      node.unsetLimit();
-      mergeFragment = createParentFragment(mergeFragment, DataPartition.UNPARTITIONED);
-      AggregateInfo phase2MergeAggInfo = node.getAggInfo().getMergeAggInfo();
-      AggregationNode phase2MergeAggNode = new AggregationNode(ctx_.getNextNodeId(), node,
-          phase2MergeAggInfo);
-      phase2MergeAggNode.init(ctx_.getRootAnalyzer());
-      // Transfer having predicates. If hasGrouping == true, the predicates should
-      // instead be evaluated by the 2nd phase agg (the predicates are already there).
-      node.transferConjuncts(phase2MergeAggNode);
-      phase2MergeAggNode.setLimit(limit);
-      mergeFragment.addPlanRoot(phase2MergeAggNode);
-    }
-    return mergeFragment;
-  }
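-
-  // Illustrative example (hypothetical query): SELECT count(DISTINCT x) FROM t
-  // produces
-  //   child fragment:              scan -> phase-1 agg grouping by x (preagg)
-  //   merge fragment 1, hash(x):   merge agg of phase 1 -> phase-2 agg
-  //   merge fragment 2, unpart.:   merge agg of phase 2 (finalize, limit)
-  // With grouping, e.g. SELECT c, count(DISTINCT x) FROM t GROUP BY c, the
-  // merge fragment is hash-partitioned on c instead and the unpartitioned
-  // third fragment is not needed.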
-
-  /**
-   * Returns a fragment that produces the output of either an AnalyticEvalNode
-   * or of the SortNode that provides the input to an AnalyticEvalNode.
-   * ('node' can be either an AnalyticEvalNode or a SortNode).
-   * The returned fragment is either partitioned on the Partition By exprs or
-   * unpartitioned in the absence of such exprs.
-   */
-  private PlanFragment createAnalyticFragment(PlanNode node,
-      PlanFragment childFragment, ArrayList<PlanFragment> fragments)
-      throws ImpalaException {
-    Preconditions.checkState(
-        node instanceof SortNode || node instanceof AnalyticEvalNode);
-    if (node instanceof AnalyticEvalNode) {
-      AnalyticEvalNode analyticNode = (AnalyticEvalNode) node;
-      if (analyticNode.getPartitionExprs().isEmpty()
-          && analyticNode.getOrderByElements().isEmpty()) {
-        // no Partition-By/Order-By exprs: compute analytic exprs in single
-        // unpartitioned fragment
-        PlanFragment fragment = childFragment;
-        if (childFragment.isPartitioned()) {
-          fragment = createParentFragment(childFragment, DataPartition.UNPARTITIONED);
-        }
-        fragment.addPlanRoot(analyticNode);
-        return fragment;
-      } else {
-        childFragment.addPlanRoot(analyticNode);
-        return childFragment;
-      }
-    }
-
-    SortNode sortNode = (SortNode) node;
-    Preconditions.checkState(sortNode.isAnalyticSort());
-    PlanFragment analyticFragment = childFragment;
-    if (sortNode.getInputPartition() != null) {
-      // make sure the childFragment's output is partitioned as required by the sortNode
-      sortNode.getInputPartition().substitute(
-          childFragment.getPlanRoot().getOutputSmap(), ctx_.getRootAnalyzer());
-      if (!childFragment.getDataPartition().equals(sortNode.getInputPartition())) {
-        analyticFragment =
-            createParentFragment(childFragment, sortNode.getInputPartition());
-      }
-    }
-    analyticFragment.addPlanRoot(sortNode);
-    return analyticFragment;
-  }
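-
-  // Illustrative example (hypothetical): for
-  //   ROW_NUMBER() OVER (PARTITION BY d ORDER BY ts)
-  // the analytic sort requires its input hash-partitioned on d; if the child
-  // fragment is not partitioned that way, a new hash(d) fragment with an
-  // exchange is created and the SortNode (and, in a later call, the
-  // AnalyticEvalNode) is placed there. With an empty OVER (), everything is
-  // computed in a single unpartitioned fragment.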
-
-  /**
-   * Returns a new unpartitioned fragment that materializes the result of the given
-   * SortNode. If the child fragment is partitioned, returns a new fragment with a
-   * sort-merging exchange that merges the results of the partitioned sorts.
-   * The offset and limit are adjusted in the child and parent plan nodes to produce
-   * the correct result.
-   */
-  private PlanFragment createOrderByFragment(SortNode node,
-      PlanFragment childFragment, ArrayList<PlanFragment> fragments)
-      throws ImpalaException {
-    node.setChild(0, childFragment.getPlanRoot());
-    childFragment.addPlanRoot(node);
-    if (!childFragment.isPartitioned()) return childFragment;
-
-    // Remember original offset and limit.
-    boolean hasLimit = node.hasLimit();
-    long limit = node.getLimit();
-    long offset = node.getOffset();
-
-    // Create a new fragment for a sort-merging exchange.
-    PlanFragment mergeFragment =
-        createParentFragment(childFragment, DataPartition.UNPARTITIONED);
-    ExchangeNode exchNode = (ExchangeNode) mergeFragment.getPlanRoot();
-
-    // Set limit, offset and merge parameters in the exchange node.
-    exchNode.unsetLimit();
-    if (hasLimit) exchNode.setLimit(limit);
-    exchNode.setMergeInfo(node.getSortInfo(), offset);
-
-    // Child nodes should not process the offset. If there is a limit,
-    // the child nodes need only return (offset + limit) rows.
-    SortNode childSortNode = (SortNode) childFragment.getPlanRoot();
-    Preconditions.checkState(node == childSortNode);
-    if (hasLimit) {
-      childSortNode.unsetLimit();
-      childSortNode.setLimit(limit + offset);
-    }
-    childSortNode.setOffset(0);
-    childSortNode.computeStats(ctx_.getRootAnalyzer());
-    exchNode.computeStats(ctx_.getRootAnalyzer());
-
-    return mergeFragment;
-  }
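-
-  // Worked example (hypothetical query): ORDER BY c LIMIT 10 OFFSET 5 over a
-  // partitioned child yields
-  //   child SortNode:       limit = 15 (limit + offset), offset = 0
-  //   merging ExchangeNode: limit = 10, offset = 5
-  // i.e., each sender returns its top 15 rows and the merging exchange skips
-  // the first 5 rows of the merged stream.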
-}

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b544f019/fe/src/main/java/com/cloudera/impala/planner/EmptySetNode.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/com/cloudera/impala/planner/EmptySetNode.java b/fe/src/main/java/com/cloudera/impala/planner/EmptySetNode.java
deleted file mode 100644
index ed9dc70..0000000
--- a/fe/src/main/java/com/cloudera/impala/planner/EmptySetNode.java
+++ /dev/null
@@ -1,71 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package com.cloudera.impala.planner;
-
-import java.util.ArrayList;
-
-import com.cloudera.impala.analysis.Analyzer;
-import com.cloudera.impala.analysis.TupleId;
-import com.cloudera.impala.thrift.TExplainLevel;
-import com.cloudera.impala.thrift.TPlanNode;
-import com.cloudera.impala.thrift.TPlanNodeType;
-import com.google.common.base.Preconditions;
-
-/**
- * Node that returns an empty result set. Used for planning query blocks with a constant
- * predicate evaluating to false or a limit 0. The result set will have zero rows, but
- * the row descriptor must still include a materialized tuple so that the backend can
- * construct a valid empty row batch.
- */
-public class EmptySetNode extends PlanNode {
-  public EmptySetNode(PlanNodeId id, ArrayList<TupleId> tupleIds) {
-    super(id, tupleIds, "EMPTYSET");
-    Preconditions.checkArgument(tupleIds.size() > 0);
-  }
-
-  @Override
-  public void computeStats(Analyzer analyzer) {
-    avgRowSize_ = 0;
-    cardinality_ = 0;
-    perHostMemCost_ = 0;
-    numNodes_ = 1;
-  }
-
-  @Override
-  public void init(Analyzer analyzer) {
-    Preconditions.checkState(conjuncts_.isEmpty());
-    // If the physical output tuple produced by an AnalyticEvalNode wasn't created,
-    // the logical output tuple is returned by getMaterializedTupleIds(). It needs
-    // to be set as materialized (even though it isn't) to avoid failing precondition
-    // checks when generating the thrift for slot refs that may reference this tuple.
-    for (TupleId id: tupleIds_) analyzer.getTupleDesc(id).setIsMaterialized(true);
-    computeMemLayout(analyzer);
-    computeStats(analyzer);
-  }
-
-  @Override
-  protected String getNodeExplainString(String prefix, String detailPrefix,
-      TExplainLevel detailLevel) {
-    return String.format("%s%s:%s\n", prefix, id_.toString(), displayName_);
-  }
-
-  @Override
-  protected void toThrift(TPlanNode msg) {
-    msg.node_type = TPlanNodeType.EMPTY_SET_NODE;
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b544f019/fe/src/main/java/com/cloudera/impala/planner/ExchangeNode.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/com/cloudera/impala/planner/ExchangeNode.java b/fe/src/main/java/com/cloudera/impala/planner/ExchangeNode.java
deleted file mode 100644
index eeef5fe..0000000
--- a/fe/src/main/java/com/cloudera/impala/planner/ExchangeNode.java
+++ /dev/null
@@ -1,204 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package com.cloudera.impala.planner;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.cloudera.impala.analysis.Analyzer;
-import com.cloudera.impala.analysis.Expr;
-import com.cloudera.impala.analysis.SortInfo;
-import com.cloudera.impala.analysis.TupleId;
-import com.cloudera.impala.common.ImpalaException;
-import com.cloudera.impala.thrift.TExchangeNode;
-import com.cloudera.impala.thrift.TExplainLevel;
-import com.cloudera.impala.thrift.TPlanNode;
-import com.cloudera.impala.thrift.TPlanNodeType;
-import com.cloudera.impala.thrift.TSortInfo;
-import com.google.common.base.Preconditions;
-
-/**
- * Receiver side of a 1:n data stream. Logically, an ExchangeNode consumes the data
- * produced by its children. For each of the sending child nodes the actual data
- * transmission is performed by the DataStreamSink of the PlanFragment housing
- * that child node. Typically, an ExchangeNode only has a single sender child but,
- * e.g., for distributed union queries an ExchangeNode may have one sender child per
- * union operand.
- *
- * If an (optional) SortInfo field is set, the ExchangeNode will merge its
- * inputs on the parameters specified in the SortInfo object. It is assumed that the
- * inputs are also sorted individually on the same SortInfo parameters.
- */
-public class ExchangeNode extends PlanNode {
-  private final static Logger LOG = LoggerFactory.getLogger(ExchangeNode.class);
-
-  // The serialization overhead per tuple in bytes when sent over an exchange.
-  // Currently it accounts only for the tuple_offset entry per tuple (4B) in a
-  // BE TRowBatch. If we modify the RowBatch serialization, then we need to
-  // update this constant as well.
-  private static final double PER_TUPLE_SERIALIZATION_OVERHEAD = 4.0;
-
-  // The parameters based on which sorted input streams are merged by this
-  // exchange node. Null if this exchange does not merge sorted streams
-  private SortInfo mergeInfo_;
-
-  // Offset after which the exchange begins returning rows. Currently valid
-  // only if mergeInfo_ is non-null, i.e., this is a merging exchange node.
-  private long offset_;
-
-  public ExchangeNode(PlanNodeId id, PlanNode input) {
-    super(id, "EXCHANGE");
-    offset_ = 0;
-    children_.add(input);
-    // Only apply the limit at the receiver if there are multiple senders.
-    if (input.getFragment().isPartitioned()) limit_ = input.limit_;
-    computeTupleIds();
-  }
-
-  @Override
-  public void computeTupleIds() {
-    clearTupleIds();
-    tupleIds_.addAll(getChild(0).getTupleIds());
-    tblRefIds_.addAll(getChild(0).getTblRefIds());
-    nullableTupleIds_.addAll(getChild(0).getNullableTupleIds());
-  }
-
-  @Override
-  public void init(Analyzer analyzer) throws ImpalaException {
-    super.init(analyzer);
-    Preconditions.checkState(conjuncts_.isEmpty());
-  }
-
-  @Override
-  public void computeStats(Analyzer analyzer) {
-    Preconditions.checkState(!children_.isEmpty(),
-        "ExchangeNode must have at least one child");
-    cardinality_ = 0;
-    for (PlanNode child: children_) {
-      if (child.getCardinality() == -1) {
-        cardinality_ = -1;
-        break;
-      }
-      cardinality_ = addCardinalities(cardinality_, child.getCardinality());
-    }
-
-    if (hasLimit()) {
-      if (cardinality_ == -1) {
-        cardinality_ = limit_;
-      } else {
-        cardinality_ = Math.min(limit_, cardinality_);
-      }
-    }
-
-    // Apply the offset correction if there's a valid cardinality
-    if (cardinality_ > -1) {
-      cardinality_ = Math.max(0, cardinality_ - offset_);
-    }
-
-    // Pick the max numNodes_ and avgRowSize_ of all children.
-    numNodes_ = Integer.MIN_VALUE;
-    avgRowSize_ = Integer.MIN_VALUE;
-    for (PlanNode child: children_) {
-      numNodes_ = Math.max(child.numNodes_, numNodes_);
-      avgRowSize_ = Math.max(child.avgRowSize_, avgRowSize_);
-    }
-  }
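-
-  // Worked example: child cardinalities 1000 and 500 give cardinality_ = 1500;
-  // a limit of 1200 reduces that to min(1200, 1500) = 1200; a merging offset
-  // of 300 leaves max(0, 1200 - 300) = 900. If any child cardinality is
-  // unknown (-1) and a limit is set, the limit is used directly.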
-
-  /**
-   * Set the parameters used to merge sorted input streams. This can be called
-   * after init().
-   */
-  public void setMergeInfo(SortInfo info, long offset) {
-    mergeInfo_ = info;
-    offset_ = offset;
-    displayName_ = "MERGING-EXCHANGE";
-  }
-
-  @Override
-  protected String getNodeExplainString(String prefix, String detailPrefix,
-      TExplainLevel detailLevel) {
-    StringBuilder output = new StringBuilder();
-    output.append(String.format("%s%s [%s]\n", prefix,
-        getDisplayLabel(), getDisplayLabelDetail()));
-
-    if (offset_ > 0) {
-      output.append(detailPrefix + "offset: ").append(offset_).append("\n");
-    }
-
-    if (mergeInfo_ != null && detailLevel.ordinal() > TExplainLevel.MINIMAL.ordinal()) {
-      output.append(detailPrefix + "order by: ");
-      for (int i = 0; i < mergeInfo_.getOrderingExprs().size(); ++i) {
-        if (i > 0) output.append(", ");
-        output.append(mergeInfo_.getOrderingExprs().get(i).toSql() + " ");
-        output.append(mergeInfo_.getIsAscOrder().get(i) ? "ASC" : "DESC");
-
-        Boolean nullsFirstParam = mergeInfo_.getNullsFirstParams().get(i);
-        if (nullsFirstParam != null) {
-          output.append(nullsFirstParam ? " NULLS FIRST" : " NULLS LAST");
-        }
-      }
-      output.append("\n");
-    }
-    return output.toString();
-  }
-
-  @Override
-  protected String getDisplayLabelDetail() {
-    // For the non-fragmented explain levels, print the data partition
-    // of the data stream sink that sends to this exchange node.
-    Preconditions.checkState(!children_.isEmpty());
-    DataSink sink = getChild(0).getFragment().getSink();
-    if (sink == null) return "";
-    Preconditions.checkState(sink instanceof DataStreamSink);
-    DataStreamSink streamSink = (DataStreamSink) sink;
-    if (!streamSink.getOutputPartition().isPartitioned() &&
-        fragment_.isPartitioned()) {
-      // If the output of the sink is not partitioned but the target fragment is
-      // partitioned, then the data exchange is broadcast.
-      return "BROADCAST";
-    } else {
-      return streamSink.getOutputPartition().getExplainString();
-    }
-  }
-
-  /**
-   * Returns the average size of rows produced by 'exchInput' when serialized for
-   * transmission through an exchange.
-   */
-  public static double getAvgSerializedRowSize(PlanNode exchInput) {
-    return exchInput.getAvgRowSize() +
-        (exchInput.getTupleIds().size() * PER_TUPLE_SERIALIZATION_OVERHEAD);
-  }
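-
-  // Worked example: a row with avgRowSize 24.0 bytes spanning two tuples
-  // serializes to roughly 24.0 + 2 * 4.0 = 32.0 bytes.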
-
-  @Override
-  protected void toThrift(TPlanNode msg) {
-    msg.node_type = TPlanNodeType.EXCHANGE_NODE;
-    msg.exchange_node = new TExchangeNode();
-    for (TupleId tid: tupleIds_) {
-      msg.exchange_node.addToInput_row_tuples(tid.asInt());
-    }
-
-    if (mergeInfo_ != null) {
-      TSortInfo sortInfo = new TSortInfo(
-          Expr.treesToThrift(mergeInfo_.getOrderingExprs()), mergeInfo_.getIsAscOrder(),
-          mergeInfo_.getNullsFirst());
-      msg.exchange_node.setSort_info(sortInfo);
-      msg.exchange_node.setOffset(offset_);
-    }
-  }
-}