You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by ta...@apache.org on 2016/05/12 22:09:56 UTC

[21/50] [abbrv] incubator-impala git commit: MT: Planner for multi-threaded execution

MT: Planner for multi-threaded execution

New classes:
- ParallelPlanner: creates build plans, assigns plans to cohorts
- JoinBuildSink: DataSink for plan fragments that materialize build sides
- ids for plans, hash tables, plan fragments

Tests: this adds a new test file section PARALLELPLANS and augments the tpc-h/-ds tests with
those sections.

In the interest of keeping this patch small I didn't augment other test files with that
section yet (which will happen at a later date, to cover more corner cases).

Change-Id: Ic3c34dd3f9190a131e6f03d901b4bfcd164a5174
Reviewed-on: http://gerrit.cloudera.org:8080/2846
Tested-by: Internal Jenkins
Reviewed-by: Marcel Kornacker <ma...@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/3b7d5b7c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/3b7d5b7c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/3b7d5b7c

Branch: refs/heads/master
Commit: 3b7d5b7c178fff8b269b6854842cc5dcf2e4d725
Parents: 8e64273
Author: Marcel Kornacker <ma...@cloudera.com>
Authored: Mon Mar 21 20:13:30 2016 -0700
Committer: Tim Armstrong <ta...@cloudera.com>
Committed: Thu May 12 14:17:56 2016 -0700

----------------------------------------------------------------------
 be/src/service/query-options.cc                 |   12 +
 be/src/service/query-options.h                  |    5 +-
 common/thrift/DataSinks.thrift                  |   12 +-
 common/thrift/Frontend.thrift                   |    5 +
 common/thrift/ImpalaInternalService.thrift      |    6 +
 common/thrift/ImpalaService.thrift              |    3 +
 common/thrift/Planner.thrift                    |   10 +
 common/thrift/Types.thrift                      |    1 +
 .../impala/analysis/FunctionCallExpr.java       |    3 +-
 .../impala/analysis/TupleDescriptor.java        |    3 +
 .../com/cloudera/impala/common/TreeNode.java    |   68 +-
 .../com/cloudera/impala/planner/CohortId.java   |   39 +
 .../com/cloudera/impala/planner/DataSink.java   |    1 -
 .../impala/planner/DistributedPlanner.java      |   66 +-
 .../cloudera/impala/planner/ExchangeNode.java   |    3 +-
 .../cloudera/impala/planner/HashJoinNode.java   |    6 +
 .../cloudera/impala/planner/JoinBuildSink.java  |  100 +
 .../com/cloudera/impala/planner/JoinNode.java   |    6 +
 .../cloudera/impala/planner/JoinTableId.java    |   44 +
 .../impala/planner/NestedLoopJoinNode.java      |    4 +
 .../impala/planner/ParallelPlanner.java         |  202 ++
 .../cloudera/impala/planner/PlanFragment.java   |  142 +-
 .../com/cloudera/impala/planner/PlanId.java     |   39 +
 .../com/cloudera/impala/planner/PlanNode.java   |   16 +-
 .../com/cloudera/impala/planner/Planner.java    |   14 +
 .../com/cloudera/impala/service/Frontend.java   |   24 +-
 .../impala/planner/PlannerTestBase.java         |   18 +-
 .../impala/testutil/TestFileParser.java         |   18 +-
 .../queries/PlannerTest/tpcds-all.test          | 3400 ++++++++++++++++--
 .../queries/PlannerTest/tpch-all.test           | 1472 +++++++-
 30 files changed, 5187 insertions(+), 555 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/3b7d5b7c/be/src/service/query-options.cc
----------------------------------------------------------------------
diff --git a/be/src/service/query-options.cc b/be/src/service/query-options.cc
index 8f5a682..45571bf 100644
--- a/be/src/service/query-options.cc
+++ b/be/src/service/query-options.cc
@@ -388,6 +388,18 @@ Status impala::SetQueryOption(const string& key, const string& value,
         }
         break;
       }
+      case TImpalaQueryOptions::MT_NUM_CORES: {
+        StringParser::ParseResult result;
+        const int32_t num_cores =
+            StringParser::StringToInt<int32_t>(value.c_str(), value.length(), &result);
+        if (result != StringParser::PARSE_SUCCESS || num_cores < 0 || num_cores > 128) {
+          return Status(
+              Substitute("$0 is not valid for mt_num_cores. Valid values are in "
+                "[0, 128].", value));
+        }
+        query_options->__set_mt_num_cores(num_cores);
+        break;
+      }
       default:
         // We hit this DCHECK(false) if we forgot to add the corresponding entry here
         // when we add a new query option.

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/3b7d5b7c/be/src/service/query-options.h
----------------------------------------------------------------------
diff --git a/be/src/service/query-options.h b/be/src/service/query-options.h
index c445658..a727c8d 100644
--- a/be/src/service/query-options.h
+++ b/be/src/service/query-options.h
@@ -32,7 +32,7 @@ class TQueryOptions;
 // the DCHECK.
 #define QUERY_OPTS_TABLE\
   DCHECK_EQ(_TImpalaQueryOptions_VALUES_TO_NAMES.size(),\
-      TImpalaQueryOptions::PARQUET_FALLBACK_SCHEMA_RESOLUTION + 1);\
+      TImpalaQueryOptions::MT_NUM_CORES + 1);\
   QUERY_OPT_FN(abort_on_default_limit_exceeded, ABORT_ON_DEFAULT_LIMIT_EXCEEDED)\
   QUERY_OPT_FN(abort_on_error, ABORT_ON_ERROR)\
   QUERY_OPT_FN(allow_unsupported_formats, ALLOW_UNSUPPORTED_FORMATS)\
@@ -75,7 +75,8 @@ class TQueryOptions;
   QUERY_OPT_FN(disable_row_runtime_filtering, DISABLE_ROW_RUNTIME_FILTERING)\
   QUERY_OPT_FN(max_num_runtime_filters, MAX_NUM_RUNTIME_FILTERS)\
   QUERY_OPT_FN(parquet_annotate_strings_utf8, PARQUET_ANNOTATE_STRINGS_UTF8)\
-  QUERY_OPT_FN(parquet_fallback_schema_resolution, PARQUET_FALLBACK_SCHEMA_RESOLUTION);
+  QUERY_OPT_FN(parquet_fallback_schema_resolution, PARQUET_FALLBACK_SCHEMA_RESOLUTION)\
+  QUERY_OPT_FN(mt_num_cores, MT_NUM_CORES);
 
 /// Converts a TQueryOptions struct into a map of key, value pairs.
 void TQueryOptionsToMap(const TQueryOptions& query_options,

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/3b7d5b7c/common/thrift/DataSinks.thrift
----------------------------------------------------------------------
diff --git a/common/thrift/DataSinks.thrift b/common/thrift/DataSinks.thrift
index 18019af..dd381fa 100644
--- a/common/thrift/DataSinks.thrift
+++ b/common/thrift/DataSinks.thrift
@@ -22,7 +22,8 @@ include "Partitions.thrift"
 
 enum TDataSinkType {
   DATA_STREAM_SINK,
-  TABLE_SINK
+  TABLE_SINK,
+  JOIN_BUILD_SINK
 }
 
 enum TSinkAction {
@@ -73,6 +74,14 @@ struct TKuduTableSink {
   2: optional bool ignore_not_found_or_duplicate;
 }
 
+// Sink to create the build side of a JoinNode.
+struct TJoinBuildSink {
+  1: required Types.TJoinTableId join_table_id
+
+  // only set for hash join build sinks
+  2: required list<Exprs.TExpr> build_exprs
+}
+
 // Union type of all table sinks.
 struct TTableSink {
   1: required Types.TTableId  target_table_id
@@ -86,4 +95,5 @@ struct TDataSink {
   1: required TDataSinkType type
   2: optional TDataStreamSink stream_sink
   3: optional TTableSink table_sink
+  4: optional TJoinBuildSink join_build_sink
 }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/3b7d5b7c/common/thrift/Frontend.thrift
----------------------------------------------------------------------
diff --git a/common/thrift/Frontend.thrift b/common/thrift/Frontend.thrift
index 1842694..6363fcf 100644
--- a/common/thrift/Frontend.thrift
+++ b/common/thrift/Frontend.thrift
@@ -347,6 +347,11 @@ struct TQueryExecRequest {
   // it is unpartitioned.
   2: required list<Planner.TPlanFragment> fragments
 
+  // Multi-threaded execution: sequence of plans; the last one materializes
+  // the query result
+  // TODO: this will eventually supersede 'fragments'
+  14: optional list<Planner.TPlanFragmentTree> mt_plans
+
   // Specifies the destination fragment of the output of each fragment.
   // parent_fragment_idx.size() == fragments.size() - 1 and
   // fragments[i] sends its output to fragments[dest_fragment_idx[i-1]]

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/3b7d5b7c/common/thrift/ImpalaInternalService.thrift
----------------------------------------------------------------------
diff --git a/common/thrift/ImpalaInternalService.thrift b/common/thrift/ImpalaInternalService.thrift
index 1fcce1b..6c2fc3e 100644
--- a/common/thrift/ImpalaInternalService.thrift
+++ b/common/thrift/ImpalaInternalService.thrift
@@ -180,6 +180,12 @@ struct TQueryOptions {
   // is always, since fields IDs are NYI). Valid values are "position" (default) and
   // "name".
   43: optional TParquetFallbackSchemaResolution parquet_fallback_schema_resolution = 0
+
+  // Multi-threaded execution: number of cores per query per node.
+  // > 1: multi-threaded execution mode, with given number of cores
+  // 1: single-threaded execution mode
+  // 0: multi-threaded execution mode, number of cores is the pool default
+  44: optional i32 mt_num_cores = 1
 }
 
 // Impala currently has two types of sessions: Beeswax and HiveServer2

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/3b7d5b7c/common/thrift/ImpalaService.thrift
----------------------------------------------------------------------
diff --git a/common/thrift/ImpalaService.thrift b/common/thrift/ImpalaService.thrift
index 3f5273e..c9535eb 100644
--- a/common/thrift/ImpalaService.thrift
+++ b/common/thrift/ImpalaService.thrift
@@ -209,6 +209,9 @@ enum TImpalaQueryOptions {
   // Determines how to resolve Parquet files' schemas in the absence of field IDs (which
   // is always, since fields IDs are NYI). Valid values are "position" and "name".
   PARQUET_FALLBACK_SCHEMA_RESOLUTION
+
+  // Multi-threaded execution: number of cores per query per node
+  MT_NUM_CORES
 }
 
 // The summary of an insert.

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/3b7d5b7c/common/thrift/Planner.thrift
----------------------------------------------------------------------
diff --git a/common/thrift/Planner.thrift b/common/thrift/Planner.thrift
index cf2cc04..43a08a9 100644
--- a/common/thrift/Planner.thrift
+++ b/common/thrift/Planner.thrift
@@ -76,3 +76,13 @@ struct TScanRangeLocations {
   // non-empty list
   2: list<TScanRangeLocation> locations
 }
+
+// A plan: tree of plan fragments that materializes either a query result or the build
+// side of a join used by another plan; it consists of a sequence of plan fragments.
+// TODO: rename both this and PlanNodes.TPlan (TPlan should be something like TExecPlan
+// or TExecTree)
+struct TPlanFragmentTree {
+  1: required i32 cohort_id
+
+  2: required list<TPlanFragment> fragments
+}

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/3b7d5b7c/common/thrift/Types.thrift
----------------------------------------------------------------------
diff --git a/common/thrift/Types.thrift b/common/thrift/Types.thrift
index 8b2c5a2..85a14dc 100644
--- a/common/thrift/Types.thrift
+++ b/common/thrift/Types.thrift
@@ -20,6 +20,7 @@ typedef i32 TPlanNodeId
 typedef i32 TTupleId
 typedef i32 TSlotId
 typedef i32 TTableId
+typedef i32 TJoinTableId
 
 // TODO: Consider moving unrelated enums to better locations.
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/3b7d5b7c/fe/src/main/java/com/cloudera/impala/analysis/FunctionCallExpr.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/com/cloudera/impala/analysis/FunctionCallExpr.java b/fe/src/main/java/com/cloudera/impala/analysis/FunctionCallExpr.java
index 7950d4c..ca4097b 100644
--- a/fe/src/main/java/com/cloudera/impala/analysis/FunctionCallExpr.java
+++ b/fe/src/main/java/com/cloudera/impala/analysis/FunctionCallExpr.java
@@ -33,6 +33,7 @@ import com.cloudera.impala.thrift.TFunctionBinaryType;
 import com.google.common.base.Joiner;
 import com.google.common.base.Objects;
 import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
 
 public class FunctionCallExpr extends Expr {
   private final FunctionName fnName_;
@@ -66,7 +67,7 @@ public class FunctionCallExpr extends Expr {
     fnName_ = fnName;
     params_ = params;
     isMergeAggFn_ = isMergeAggFn;
-    if (params.exprs() != null) children_ = params_.exprs();
+    if (params.exprs() != null) children_ = Lists.newArrayList(params_.exprs());
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/3b7d5b7c/fe/src/main/java/com/cloudera/impala/analysis/TupleDescriptor.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/com/cloudera/impala/analysis/TupleDescriptor.java b/fe/src/main/java/com/cloudera/impala/analysis/TupleDescriptor.java
index e042af1..63bcde5 100644
--- a/fe/src/main/java/com/cloudera/impala/analysis/TupleDescriptor.java
+++ b/fe/src/main/java/com/cloudera/impala/analysis/TupleDescriptor.java
@@ -110,10 +110,12 @@ public class TupleDescriptor {
     if (path_ == null) return null;
     return path_.getRootTable();
   }
+
   public TableName getTableName() {
     Table t = getTable();
     return (t == null) ? null : t.getTableName();
   }
+
   public void setPath(Path p) {
     Preconditions.checkNotNull(p);
     Preconditions.checkState(p.isResolved());
@@ -128,6 +130,7 @@ public class TupleDescriptor {
       type_ = Path.getTypeAsStruct(p.destType());
     }
   }
+
   public Path getPath() { return path_; }
   public void setType(StructType type) { type_ = type; }
   public StructType getType() { return type_; }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/3b7d5b7c/fe/src/main/java/com/cloudera/impala/common/TreeNode.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/com/cloudera/impala/common/TreeNode.java b/fe/src/main/java/com/cloudera/impala/common/TreeNode.java
index a7d46ce..37e7995 100644
--- a/fe/src/main/java/com/cloudera/impala/common/TreeNode.java
+++ b/fe/src/main/java/com/cloudera/impala/common/TreeNode.java
@@ -21,12 +21,11 @@ import java.util.List;
 import com.cloudera.impala.util.Visitor;
 import com.google.common.base.Predicate;
 
-public class TreeNode<NodeType extends TreeNode<NodeType>> {
-  protected List<NodeType> children_;
-
-  protected TreeNode() {
-    this.children_ = new ArrayList<NodeType>();
-  }
+/**
+ * Generic tree structure. Only concrete subclasses of this can be instantiated.
+ */
+public abstract class TreeNode<NodeType extends TreeNode<NodeType>> {
+  protected ArrayList<NodeType> children_ = new ArrayList<NodeType>();
 
   public NodeType getChild(int i) {
     return hasChild(i) ? children_.get(i) : null;
@@ -36,13 +35,17 @@ public class TreeNode<NodeType extends TreeNode<NodeType>> {
     children_.add(n);
   }
 
+  public void removeChild(NodeType n) { children_.remove(n); }
+
+  public void clearChildren() { children_.clear(); }
+
   public void addChildren(List<? extends NodeType> l) {
     children_.addAll(l);
   }
 
   public boolean hasChild(int i) { return children_.size() > i; }
   public void setChild(int index, NodeType n) { children_.set(index, n); }
-  public List<NodeType> getChildren() { return children_; }
+  public ArrayList<NodeType> getChildren() { return children_; }
 
   /**
    * Count the total number of nodes in this tree. Leaf node will return 1.
@@ -50,9 +53,7 @@ public class TreeNode<NodeType extends TreeNode<NodeType>> {
    */
   public int numNodes() {
     int numNodes = 1;
-    for (NodeType child: children_) {
-      numNodes += child.numNodes();
-    }
+    for (NodeType child: children_) numNodes += child.numNodes();
     return numNodes;
   }
 
@@ -72,10 +73,7 @@ public class TreeNode<NodeType extends TreeNode<NodeType>> {
       matches.add((D) this);
       return;
     }
-
-    for (NodeType child: children_) {
-      child.collect(predicate, matches);
-    }
+    for (NodeType child: children_) child.collect(predicate, matches);
   }
 
   /**
@@ -89,10 +87,7 @@ public class TreeNode<NodeType extends TreeNode<NodeType>> {
       matches.add((D) this);
       return;
     }
-
-    for (NodeType child: children_) {
-      child.collect(cl, matches);
-    }
+    for (NodeType child: children_) child.collect(cl, matches);
   }
 
   /**
@@ -102,13 +97,8 @@ public class TreeNode<NodeType extends TreeNode<NodeType>> {
    */
   public <C extends TreeNode<NodeType>, D extends C> void collectAll(
       Predicate<? super C> predicate, List<D> matches) {
-    if (predicate.apply((C) this)) {
-      matches.add((D) this);
-    }
-
-    for (NodeType child: children_) {
-      child.collectAll(predicate, matches);
-    }
+    if (predicate.apply((C) this)) matches.add((D) this);
+    for (NodeType child: children_) child.collectAll(predicate, matches);
   }
 
   /**
@@ -117,9 +107,7 @@ public class TreeNode<NodeType extends TreeNode<NodeType>> {
    */
   public static <C extends TreeNode<C>, D extends C> void collect(
       Collection<C> nodeList, Predicate<? super C> predicate, Collection<D> matches) {
-    for (C node: nodeList) {
-      node.collect(predicate, matches);
-    }
+    for (C node: nodeList) node.collect(predicate, matches);
   }
 
   /**
@@ -128,9 +116,7 @@ public class TreeNode<NodeType extends TreeNode<NodeType>> {
    */
   public static <C extends TreeNode<C>, D extends C> void collect(
       Collection<C> nodeList, Class cl, Collection<D> matches) {
-    for (C node: nodeList) {
-      node.collect(cl, matches);
-    }
+    for (C node: nodeList) node.collect(cl, matches);
   }
 
   /**
@@ -139,9 +125,7 @@ public class TreeNode<NodeType extends TreeNode<NodeType>> {
   public <C extends TreeNode<NodeType>> boolean contains(
       Predicate<? super C> predicate) {
     if (predicate.apply((C) this)) return true;
-    for (NodeType child: children_) {
-      if (child.contains(predicate)) return true;
-    }
+    for (NodeType child: children_) if (child.contains(predicate)) return true;
     return false;
   }
 
@@ -150,9 +134,7 @@ public class TreeNode<NodeType extends TreeNode<NodeType>> {
    */
   public boolean contains(Class cl) {
     if (cl.equals(getClass())) return true;
-    for (NodeType child: children_) {
-      if (child.contains(cl)) return true;
-    }
+    for (NodeType child: children_) if (child.contains(cl)) return true;
     return false;
   }
 
@@ -162,9 +144,7 @@ public class TreeNode<NodeType extends TreeNode<NodeType>> {
    */
   public static <C extends TreeNode<C>, D extends C> boolean contains(
       Collection<C> nodeList, Predicate<? super C> predicate) {
-    for (C node: nodeList) {
-      if (node.contains(predicate)) return true;
-    }
+    for (C node: nodeList) if (node.contains(predicate)) return true;
     return false;
   }
 
@@ -173,9 +153,7 @@ public class TreeNode<NodeType extends TreeNode<NodeType>> {
    */
   public static <C extends TreeNode<C>> boolean contains(
       List<C> nodeList, Class cl) {
-    for (C node: nodeList) {
-      if (node.contains(cl)) return true;
-    }
+    for (C node: nodeList) if (node.contains(cl)) return true;
     return false;
   }
 
@@ -196,8 +174,6 @@ public class TreeNode<NodeType extends TreeNode<NodeType>> {
    */
   public <C extends TreeNode<NodeType>> void accept(Visitor<C> visitor) {
     visitor.visit((C) this);
-    for (NodeType p: children_) {
-      p.accept(visitor);
-    }
+    for (NodeType p: children_) p.accept(visitor);
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/3b7d5b7c/fe/src/main/java/com/cloudera/impala/planner/CohortId.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/com/cloudera/impala/planner/CohortId.java b/fe/src/main/java/com/cloudera/impala/planner/CohortId.java
new file mode 100644
index 0000000..2b5b2e4
--- /dev/null
+++ b/fe/src/main/java/com/cloudera/impala/planner/CohortId.java
@@ -0,0 +1,39 @@
+// Copyright 2016 Cloudera Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.cloudera.impala.planner;
+
+import com.cloudera.impala.common.Id;
+import com.cloudera.impala.common.IdGenerator;
+
+public class CohortId extends Id<CohortId> {
+  // Construction only allowed via an IdGenerator.
+  protected CohortId(int id) {
+    super(id);
+  }
+
+  public static IdGenerator<CohortId> createGenerator() {
+    return new IdGenerator<CohortId>() {
+      @Override
+      public CohortId getNextId() { return new CohortId(nextId_++); }
+      @Override
+      public CohortId getMaxId() { return new CohortId(nextId_ - 1); }
+    };
+  }
+
+  @Override
+  public String toString() {
+    return String.format("%02d", id_);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/3b7d5b7c/fe/src/main/java/com/cloudera/impala/planner/DataSink.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/com/cloudera/impala/planner/DataSink.java b/fe/src/main/java/com/cloudera/impala/planner/DataSink.java
index 084dd96..a298781 100644
--- a/fe/src/main/java/com/cloudera/impala/planner/DataSink.java
+++ b/fe/src/main/java/com/cloudera/impala/planner/DataSink.java
@@ -29,7 +29,6 @@ import com.cloudera.impala.thrift.TExplainLevel;
  * The destination could be another plan fragment on a remote machine,
  * or a table into which the rows are to be inserted
  * (i.e., the destination of the last fragment of an INSERT statement).
- *
  */
 public abstract class DataSink {
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/3b7d5b7c/fe/src/main/java/com/cloudera/impala/planner/DistributedPlanner.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/com/cloudera/impala/planner/DistributedPlanner.java b/fe/src/main/java/com/cloudera/impala/planner/DistributedPlanner.java
index d1a3069..bdf32e9 100644
--- a/fe/src/main/java/com/cloudera/impala/planner/DistributedPlanner.java
+++ b/fe/src/main/java/com/cloudera/impala/planner/DistributedPlanner.java
@@ -230,7 +230,7 @@ public class DistributedPlanner {
 
     Preconditions.checkState(partitionHint == null || partitionHint);
     ExchangeNode exchNode = new ExchangeNode(ctx_.getNextNodeId());
-    exchNode.addChild(inputFragment.getPlanRoot(), false);
+    exchNode.addChild(inputFragment.getPlanRoot());
     exchNode.init(analyzer);
     Preconditions.checkState(exchNode.hasValidStats());
     DataPartition partition = DataPartition.hashPartitioned(nonConstPartitionExprs);
@@ -251,7 +251,7 @@ public class DistributedPlanner {
       throws ImpalaException {
     Preconditions.checkState(inputFragment.isPartitioned());
     ExchangeNode mergePlan = new ExchangeNode(ctx_.getNextNodeId());
-    mergePlan.addChild(inputFragment.getPlanRoot(), false);
+    mergePlan.addChild(inputFragment.getPlanRoot());
     mergePlan.init(ctx_.getRootAnalyzer());
     Preconditions.checkState(mergePlan.hasValidStats());
     PlanFragment fragment = new PlanFragment(ctx_.getNextFragmentId(), mergePlan,
@@ -292,7 +292,7 @@ public class DistributedPlanner {
       throws ImpalaException {
     node.setDistributionMode(DistributionMode.BROADCAST);
     node.setChild(0, leftChildFragment.getPlanRoot());
-    connectChildFragment(node, 1, rightChildFragment);
+    connectChildFragment(node, 1, leftChildFragment, rightChildFragment);
     leftChildFragment.setPlanRoot(node);
     return leftChildFragment;
   }
@@ -323,11 +323,11 @@ public class DistributedPlanner {
             lhsJoinExprs, rhsJoinExprs, analyzer)) {
       node.setChild(0, leftChildFragment.getPlanRoot());
       node.setChild(1, rightChildFragment.getPlanRoot());
-      // Redirect fragments sending to rightFragment to leftFragment.
-      for (PlanFragment fragment: fragments) {
-        if (fragment.getDestFragment() == rightChildFragment) {
-          fragment.setDestination(fragment.getDestNode());
-        }
+      // fix up PlanNode.fragment_ for the migrated PlanNode tree of the rhs child
+      leftChildFragment.setFragmentInPlanTree(node.getChild(1));
+      // Relocate input fragments of rightChildFragment to leftChildFragment.
+      for (PlanFragment rhsInput: rightChildFragment.getChildren()) {
+        leftChildFragment.getChildren().add(rhsInput);
       }
       // Remove right fragment because its plan tree has been merged into leftFragment.
       fragments.remove(rightChildFragment);
@@ -346,7 +346,7 @@ public class DistributedPlanner {
           leftChildFragment.getDataPartition(), rhsJoinExprs, analyzer);
       if (rhsJoinPartition != null) {
         node.setChild(0, leftChildFragment.getPlanRoot());
-        connectChildFragment(node, 1, rightChildFragment);
+        connectChildFragment(node, 1, leftChildFragment, rightChildFragment);
         rightChildFragment.setOutputPartition(rhsJoinPartition);
         leftChildFragment.setPlanRoot(node);
         return leftChildFragment;
@@ -360,7 +360,7 @@ public class DistributedPlanner {
           rightChildFragment.getDataPartition(), lhsJoinExprs, analyzer);
       if (lhsJoinPartition != null) {
         node.setChild(1, rightChildFragment.getPlanRoot());
-        connectChildFragment(node, 0, leftChildFragment);
+        connectChildFragment(node, 0, rightChildFragment, leftChildFragment);
         leftChildFragment.setOutputPartition(lhsJoinPartition);
         rightChildFragment.setPlanRoot(node);
         return rightChildFragment;
@@ -379,11 +379,11 @@ public class DistributedPlanner {
     // on their respective join exprs.
     // The new fragment is hash-partitioned on the lhs input join exprs.
     ExchangeNode lhsExchange = new ExchangeNode(ctx_.getNextNodeId());
-    lhsExchange.addChild(leftChildFragment.getPlanRoot(), false);
+    lhsExchange.addChild(leftChildFragment.getPlanRoot());
     lhsExchange.computeStats(null);
     node.setChild(0, lhsExchange);
     ExchangeNode rhsExchange = new ExchangeNode(ctx_.getNextNodeId());
-    rhsExchange.addChild(rightChildFragment.getPlanRoot(), false);
+    rhsExchange.addChild(rightChildFragment.getPlanRoot());
     rhsExchange.computeStats(null);
     node.setChild(1, rhsExchange);
 
@@ -502,7 +502,7 @@ public class DistributedPlanner {
       // the join; the build input is provided by an ExchangeNode, which is the
       // destination of the rightChildFragment's output
       node.setChild(0, leftChildFragment.getPlanRoot());
-      connectChildFragment(node, 1, rightChildFragment);
+      connectChildFragment(node, 1, leftChildFragment, rightChildFragment);
       leftChildFragment.setPlanRoot(node);
       hjFragment = leftChildFragment;
     } else {
@@ -624,15 +624,22 @@ public class DistributedPlanner {
       if (!childFragments.get(i).isPartitioned()) ++numUnpartitionedChildFragments;
     }
 
+    // remove all children to avoid them being tagged with the wrong
+    // fragment (in the PlanFragment c'tor; we haven't created ExchangeNodes yet)
+    unionNode.clearChildren();
+
     // If all child fragments are unpartitioned, return a single unpartitioned fragment
     // with a UnionNode that merges all child fragments.
     if (numUnpartitionedChildFragments == childFragments.size()) {
-      // Absorb the plan trees of all childFragments into unionNode.
-      for (int i = 0; i < childFragments.size(); ++i) {
-        unionNode.setChild(i, childFragments.get(i).getPlanRoot());
-      }
       PlanFragment unionFragment = new PlanFragment(ctx_.getNextFragmentId(),
           unionNode, DataPartition.UNPARTITIONED);
+      // Absorb the plan trees of all childFragments into unionNode
+      // and fix up the fragment tree in the process.
+      for (int i = 0; i < childFragments.size(); ++i) {
+        unionNode.addChild(childFragments.get(i).getPlanRoot());
+        unionFragment.setFragmentInPlanTree(unionNode.getChild(i));
+        unionFragment.addChildren(childFragments.get(i).getChildren());
+      }
       unionNode.init(ctx_.getRootAnalyzer());
       // All child fragments have been absorbed into unionFragment.
       fragments.removeAll(childFragments);
@@ -640,22 +647,24 @@ public class DistributedPlanner {
     }
 
     // There is at least one partitioned child fragment.
+    PlanFragment unionFragment = new PlanFragment(
+        ctx_.getNextFragmentId(), unionNode, DataPartition.RANDOM);
     for (int i = 0; i < childFragments.size(); ++i) {
       PlanFragment childFragment = childFragments.get(i);
       if (childFragment.isPartitioned()) {
-        // Absorb the plan trees of all partitioned child fragments into unionNode.
-        unionNode.setChild(i, childFragment.getPlanRoot());
+        // absorb the plan trees of all partitioned child fragments into unionNode
+        unionNode.addChild(childFragment.getPlanRoot());
+        unionFragment.setFragmentInPlanTree(unionNode.getChild(i));
+        unionFragment.addChildren(childFragment.getChildren());
         fragments.remove(childFragment);
       } else {
+        // dummy entry for subsequent addition of the ExchangeNode
+        unionNode.addChild(null);
         // Connect the unpartitioned child fragments to unionNode via a random exchange.
-        connectChildFragment(unionNode, i, childFragment);
+        connectChildFragment(unionNode, i, unionFragment, childFragment);
         childFragment.setOutputPartition(DataPartition.RANDOM);
       }
     }
-
-    // Fragment contains the UnionNode that consumes the data of all child fragments.
-    PlanFragment unionFragment = new PlanFragment(ctx_.getNextFragmentId(),
-        unionNode, DataPartition.RANDOM);
     unionNode.reorderOperands(ctx_.getRootAnalyzer());
     unionNode.init(ctx_.getRootAnalyzer());
     return unionFragment;
@@ -678,13 +687,14 @@ public class DistributedPlanner {
 
   /**
    * Replace node's child at index childIdx with an ExchangeNode that receives its
-   * input from childFragment.
+   * input from childFragment. parentFragment contains node and the new ExchangeNode.
    */
   private void connectChildFragment(PlanNode node, int childIdx,
-      PlanFragment childFragment) throws ImpalaException {
+      PlanFragment parentFragment, PlanFragment childFragment) throws ImpalaException {
     ExchangeNode exchangeNode = new ExchangeNode(ctx_.getNextNodeId());
-    exchangeNode.addChild(childFragment.getPlanRoot(), false);
+    exchangeNode.addChild(childFragment.getPlanRoot());
     exchangeNode.init(ctx_.getRootAnalyzer());
+    exchangeNode.setFragment(parentFragment);
     node.setChild(childIdx, exchangeNode);
     childFragment.setDestination(exchangeNode);
   }
@@ -703,7 +713,7 @@ public class DistributedPlanner {
       PlanFragment childFragment, DataPartition parentPartition)
       throws ImpalaException {
     ExchangeNode exchangeNode = new ExchangeNode(ctx_.getNextNodeId());
-    exchangeNode.addChild(childFragment.getPlanRoot(), false);
+    exchangeNode.addChild(childFragment.getPlanRoot());
     exchangeNode.init(ctx_.getRootAnalyzer());
     PlanFragment parentFragment = new PlanFragment(ctx_.getNextFragmentId(),
         exchangeNode, parentPartition);

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/3b7d5b7c/fe/src/main/java/com/cloudera/impala/planner/ExchangeNode.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/com/cloudera/impala/planner/ExchangeNode.java b/fe/src/main/java/com/cloudera/impala/planner/ExchangeNode.java
index 22601b5..590f718 100644
--- a/fe/src/main/java/com/cloudera/impala/planner/ExchangeNode.java
+++ b/fe/src/main/java/com/cloudera/impala/planner/ExchangeNode.java
@@ -71,7 +71,7 @@ public class ExchangeNode extends PlanNode {
     Preconditions.checkState(conjuncts_.isEmpty());
   }
 
-  public void addChild(PlanNode node, boolean copyConjuncts) {
+  public void addChild(PlanNode node) {
     // This ExchangeNode 'inherits' several parameters from its children.
     // Ensure that all children agree on them.
     if (!children_.isEmpty()) {
@@ -84,7 +84,6 @@ public class ExchangeNode extends PlanNode {
       tupleIds_ = Lists.newArrayList(node.tupleIds_);
       nullableTupleIds_ = Sets.newHashSet(node.nullableTupleIds_);
     }
-    if (copyConjuncts) conjuncts_.addAll(Expr.cloneList(node.conjuncts_));
     children_.add(node);
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/3b7d5b7c/fe/src/main/java/com/cloudera/impala/planner/HashJoinNode.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/com/cloudera/impala/planner/HashJoinNode.java b/fe/src/main/java/com/cloudera/impala/planner/HashJoinNode.java
index 10e0460..e641119 100644
--- a/fe/src/main/java/com/cloudera/impala/planner/HashJoinNode.java
+++ b/fe/src/main/java/com/cloudera/impala/planner/HashJoinNode.java
@@ -143,6 +143,12 @@ public class HashJoinNode extends JoinNode {
     output.append(String.format("%s%s [%s]\n", prefix, getDisplayLabel(),
         getDisplayLabelDetail()));
 
+    if (detailLevel.ordinal() > TExplainLevel.STANDARD.ordinal()) {
+      if (joinTableId_.isValid()) {
+        output.append(
+            detailPrefix + "hash-table-id=" + joinTableId_.toString() + "\n");
+      }
+    }
     if (detailLevel.ordinal() > TExplainLevel.MINIMAL.ordinal()) {
       output.append(detailPrefix + "hash predicates: ");
       for (int i = 0; i < eqJoinConjuncts_.size(); ++i) {

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/3b7d5b7c/fe/src/main/java/com/cloudera/impala/planner/JoinBuildSink.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/com/cloudera/impala/planner/JoinBuildSink.java b/fe/src/main/java/com/cloudera/impala/planner/JoinBuildSink.java
new file mode 100644
index 0000000..2971bf8
--- /dev/null
+++ b/fe/src/main/java/com/cloudera/impala/planner/JoinBuildSink.java
@@ -0,0 +1,100 @@
+// Copyright 2016 Cloudera Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.cloudera.impala.planner;
+
+import java.util.List;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.cloudera.impala.analysis.Analyzer;
+import com.cloudera.impala.analysis.BinaryPredicate;
+import com.cloudera.impala.analysis.Expr;
+import com.cloudera.impala.common.ImpalaException;
+import com.cloudera.impala.thrift.TDataSink;
+import com.cloudera.impala.thrift.TDataSinkType;
+import com.cloudera.impala.thrift.TExplainLevel;
+import com.cloudera.impala.thrift.TJoinBuildSink;
+import com.cloudera.impala.thrift.TPlanNode;
+import com.cloudera.impala.thrift.TPlanNodeType;
+import com.cloudera.impala.thrift.TQueryOptions;
+import com.google.common.base.Objects;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+
+/**
+ * Sink to materialize the build side of a join.
+ */
+public class JoinBuildSink extends DataSink {
+  private final static Logger LOG = LoggerFactory.getLogger(JoinBuildSink.class);
+
+  // id of join's build-side table assigned during planning
+  private final JoinTableId joinTableId_;
+
+  private final List<Expr> buildExprs_ = Lists.newArrayList();
+
+  /**
+   * Creates sink for build side of 'joinNode' (extracts buildExprs_ from joinNode).
+   */
+  public JoinBuildSink(JoinTableId joinTableId, JoinNode joinNode) {
+    Preconditions.checkState(joinTableId.isValid());
+    joinTableId_ = joinTableId;
+    Preconditions.checkNotNull(joinNode);
+    Preconditions.checkState(joinNode instanceof JoinNode);
+    if (!(joinNode instanceof HashJoinNode)) return;
+    for (Expr eqJoinConjunct: joinNode.getEqJoinConjuncts()) {
+      BinaryPredicate p = (BinaryPredicate) eqJoinConjunct;
+      // by convention the build exprs are the rhs of the join conjuncts
+      buildExprs_.add(p.getChild(1).clone());
+    }
+  }
+
+  public JoinTableId getJoinTableId() { return joinTableId_; }
+
+  @Override
+  protected TDataSink toThrift() {
+    TDataSink result = new TDataSink(TDataSinkType.JOIN_BUILD_SINK);
+    TJoinBuildSink tBuildSink = new TJoinBuildSink();
+    tBuildSink.setJoin_table_id(joinTableId_.asInt());
+    for (Expr buildExpr: buildExprs_) {
+      tBuildSink.addToBuild_exprs(buildExpr.treeToThrift());
+    }
+    result.setJoin_build_sink(tBuildSink);
+    return result;
+  }
+
+  @Override
+  public String getExplainString(String prefix, String detailPrefix,
+      TExplainLevel detailLevel) {
+    StringBuilder output = new StringBuilder();
+    output.append(String.format("%s%s\n", prefix, "JOIN BUILD"));
+    if (detailLevel.ordinal() > TExplainLevel.MINIMAL.ordinal()) {
+      output.append(
+          detailPrefix + "join-table-id=" + joinTableId_.toString()
+            + " plan-id=" + fragment_.getPlanId().toString()
+            + " cohort-id=" + fragment_.getCohortId().toString() + "\n");
+      if (!buildExprs_.isEmpty()) {
+        output.append(detailPrefix + "build expressions: ")
+            .append(Expr.toSql(buildExprs_) + "\n");
+      }
+    }
+    return output.toString();
+  }
+
+  @Override
+  public void computeCosts() {
+    // TODO: implement?
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/3b7d5b7c/fe/src/main/java/com/cloudera/impala/planner/JoinNode.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/com/cloudera/impala/planner/JoinNode.java b/fe/src/main/java/com/cloudera/impala/planner/JoinNode.java
index 19e4724..34de765 100644
--- a/fe/src/main/java/com/cloudera/impala/planner/JoinNode.java
+++ b/fe/src/main/java/com/cloudera/impala/planner/JoinNode.java
@@ -59,6 +59,10 @@ public abstract class JoinNode extends PlanNode {
   protected List<BinaryPredicate> eqJoinConjuncts_;
   protected List<Expr> otherJoinConjuncts_;
 
+  // if valid, the rhs input is materialized outside of this node and is assigned
+  // joinTableId_
+  protected JoinTableId joinTableId_ = JoinTableId.INVALID;
+
   public enum DistributionMode {
     NONE("NONE"),
     BROADCAST("BROADCAST"),
@@ -129,6 +133,8 @@ public abstract class JoinNode extends PlanNode {
   public DistributionMode getDistributionModeHint() { return distrModeHint_; }
   public DistributionMode getDistributionMode() { return distrMode_; }
   public void setDistributionMode(DistributionMode distrMode) { distrMode_ = distrMode; }
+  public JoinTableId getJoinTableId() { return joinTableId_; }
+  public void setJoinTableId(JoinTableId id) { joinTableId_ = id; }
 
   @Override
   public void init(Analyzer analyzer) throws ImpalaException {

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/3b7d5b7c/fe/src/main/java/com/cloudera/impala/planner/JoinTableId.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/com/cloudera/impala/planner/JoinTableId.java b/fe/src/main/java/com/cloudera/impala/planner/JoinTableId.java
new file mode 100644
index 0000000..4e0f542
--- /dev/null
+++ b/fe/src/main/java/com/cloudera/impala/planner/JoinTableId.java
@@ -0,0 +1,44 @@
+// Copyright 2016 Cloudera Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.cloudera.impala.planner;
+
+import com.cloudera.impala.common.Id;
+import com.cloudera.impala.common.IdGenerator;
+
+public class JoinTableId extends Id<JoinTableId> {
+  // Construction only allowed via an IdGenerator.
+  protected JoinTableId(int id) {
+    super(id);
+  }
+
+  public static JoinTableId INVALID;
+  static {
+    INVALID = new JoinTableId(Id.INVALID_ID);
+  }
+
+  public static IdGenerator<JoinTableId> createGenerator() {
+    return new IdGenerator<JoinTableId>() {
+      @Override
+      public JoinTableId getNextId() { return new JoinTableId(nextId_++); }
+      @Override
+      public JoinTableId getMaxId() { return new JoinTableId(nextId_ - 1); }
+    };
+  }
+
+  @Override
+  public String toString() {
+    return String.format("%02d", id_);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/3b7d5b7c/fe/src/main/java/com/cloudera/impala/planner/NestedLoopJoinNode.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/com/cloudera/impala/planner/NestedLoopJoinNode.java b/fe/src/main/java/com/cloudera/impala/planner/NestedLoopJoinNode.java
index 474a60c..fd9ef2f 100644
--- a/fe/src/main/java/com/cloudera/impala/planner/NestedLoopJoinNode.java
+++ b/fe/src/main/java/com/cloudera/impala/planner/NestedLoopJoinNode.java
@@ -93,6 +93,10 @@ public class NestedLoopJoinNode extends JoinNode {
           displayName_, getDisplayLabelDetail()));
     }
     if (detailLevel.ordinal() >= TExplainLevel.STANDARD.ordinal()) {
+      if (joinTableId_.isValid()) {
+          output.append(
+              detailPrefix + "join table id: " + joinTableId_.toString() + "\n");
+      }
       if (!otherJoinConjuncts_.isEmpty()) {
         output.append(detailPrefix + "join predicates: ")
         .append(getExplainString(otherJoinConjuncts_) + "\n");

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/3b7d5b7c/fe/src/main/java/com/cloudera/impala/planner/ParallelPlanner.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/com/cloudera/impala/planner/ParallelPlanner.java b/fe/src/main/java/com/cloudera/impala/planner/ParallelPlanner.java
new file mode 100644
index 0000000..f66ce15
--- /dev/null
+++ b/fe/src/main/java/com/cloudera/impala/planner/ParallelPlanner.java
@@ -0,0 +1,202 @@
+// Copyright 2016 Cloudera Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.cloudera.impala.planner;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.cloudera.impala.common.IdGenerator;
+import com.google.common.base.Joiner;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+
+/**
+ * The parallel planner is responsible for breaking up a single distributed plan
+ * (= tree of PlanFragments) into a (logical) tree of distributed plans. The root
+ * of that tree produces the query result, all the other ones produce intermediate
+ * join build sides. All plans that produce intermediate join build sides (one per join
+ * node in the recipient) for a single recipient plan are grouped together into a
+ * cohort. Since each plan may only produce a build side for at most one recipient
+ * plan, each plan belongs to exactly one cohort.
+ *
+ * TODO: if the input to the JoinBuildSink is the result of a grouping aggregation
+ * on the join keys, the AggregationNode should materialize the final hash table
+ * directly (instead of reading the hash table content and feeding it into a
+ * JoinBuildSink to build another hash table)
+ *
+ * TODO: instead of cohort ids, create a Plan class that is a subclass of TreeNode?
+ */
+public class ParallelPlanner {
+  private final static Logger LOG = LoggerFactory.getLogger(ParallelPlanner.class);
+
+  private final IdGenerator<JoinTableId> joinTableIdGenerator_ =
+      JoinTableId.createGenerator();
+  private final IdGenerator<PlanId> planIdGenerator_ = PlanId.createGenerator();
+  private final IdGenerator<CohortId> cohortIdGenerator_ = CohortId.createGenerator();
+  private final PlannerContext ctx_;
+
+  private List<PlanFragment> planRoots_ = Lists.newArrayList();
+
+  public ParallelPlanner(PlannerContext ctx) { ctx_ = ctx; }
+
+  /**
+   * Given a distributed plan, return list of plans ready for parallel execution.
+   * The last plan in the sequence materializes the query result, the preceding
+   * plans materialize the build sides of joins.
+   * Assigns cohortId and planId for all fragments.
+   * TODO: create class DistributedPlan with a PlanFragment member, so we don't
+   * need to distinguish PlanFragment and Plan through comments?
+   */
+  public List<PlanFragment> createPlans(PlanFragment root) {
+    root.setPlanId(planIdGenerator_.getNextId());
+    root.setCohortId(cohortIdGenerator_.getNextId());
+    planRoots_.add(root);
+    createBuildPlans(root, null);
+    return planRoots_;
+  }
+
+  /**
+   * Recursively traverse tree of fragments of 'plan' from top to bottom and
+   * move all build inputs of joins into separate plans. 'buildCohortId' is the
+   * cohort id of the build plans of 'fragment' and may be null if the plan
+   * to which 'fragment' belongs has so far not required any build plans.
+   * Assign fragment's plan id and cohort id to children.
+   */
+  private void createBuildPlans(PlanFragment fragment, CohortId buildCohortId) {
+    LOG.info("createbuildplans fragment " + fragment.getId().toString());
+    List<JoinNode> joins = Lists.newArrayList();
+    collectJoins(fragment.getPlanRoot(), joins);
+    if (!joins.isEmpty()) {
+      List<String> joinIds = Lists.newArrayList();
+      for (JoinNode join: joins) joinIds.add(join.getId().toString());
+      LOG.info("collected joins " + Joiner.on(" ").join(joinIds));
+
+      if (buildCohortId == null) buildCohortId = cohortIdGenerator_.getNextId();
+      for (JoinNode join: joins) createBuildPlan(join, buildCohortId);
+    }
+
+    if (!fragment.getChildren().isEmpty()) {
+      List<String> ids = Lists.newArrayList();
+      for (PlanFragment c: fragment.getChildren()) ids.add(c.getId().toString());
+      LOG.info("collected children " + Joiner.on(" ").join(ids) + " parent "
+          + fragment.getId().toString());
+    }
+    for (PlanFragment child: fragment.getChildren()) {
+      child.setPlanId(fragment.getPlanId());
+      child.setCohortId(fragment.getCohortId());
+      createBuildPlans(child, buildCohortId);
+    }
+  }
+
+  /**
+   * Collect all JoinNodes that aren't themselves the build side of a join node
+   * in this fragment or the rhs of a SubplanNode.
+   */
+  private void collectJoins(PlanNode node, List<JoinNode> result) {
+    if (node instanceof JoinNode) {
+      result.add((JoinNode)node);
+      // for joins, only descend through the probe side;
+      // we're recursively traversing the build side when constructing the build plan
+      // in createBuildPlan()
+      collectJoins(node.getChild(0), result);
+      return;
+    }
+    if (node instanceof ExchangeNode) return;
+    if (node instanceof SubplanNode) {
+      collectJoins(node.getChild(0), result);
+      return;
+    }
+    for (PlanNode child: node.getChildren()) collectJoins(child, result);
+  }
+
+  /**
+   * Collect all ExchangeNodes in this fragment.
+   */
+  private void collectExchangeNodes(PlanNode node, List<ExchangeNode> result) {
+    if (node instanceof ExchangeNode) {
+      result.add((ExchangeNode)node);
+      return;
+    }
+    for (PlanNode child: node.getChildren()) collectExchangeNodes(child, result);
+  }
+
+  /**
+   * Create new plan that materializes build input of 'join' and assign it 'cohortId'.
+   * In the process, moves all fragments required for this materialization from tree
+   * rooted at 'join's fragment into the new plan.
+   * Also assigns the new plan a plan id.
+   */
+  private void createBuildPlan(JoinNode join, CohortId cohortId) {
+    LOG.info("createbuildplan " + join.getId().toString());
+    Preconditions.checkNotNull(cohortId);
+    // collect all ExchangeNodes on the build side and their corresponding input
+    // fragments
+    final List<ExchangeNode> exchNodes = Lists.newArrayList();
+    collectExchangeNodes(join.getChild(1), exchNodes);
+
+    com.google.common.base.Predicate<PlanFragment> isInputFragment =
+        new com.google.common.base.Predicate<PlanFragment>() {
+          public boolean apply(PlanFragment f) {
+            // we're starting with the fragment containing the join, which might
+            // be terminal
+            if (f.getDestNode() == null) return false;
+            for (ExchangeNode exch: exchNodes) {
+              if (exch.getId() == f.getDestNode().getId()) return true;
+            }
+            return false;
+          }
+        };
+    List<PlanFragment> inputFragments = Lists.newArrayList();
+    join.getFragment().collect(isInputFragment, inputFragments);
+    Preconditions.checkState(exchNodes.size() == inputFragments.size());
+
+    // Create new fragment with JoinBuildSink that consumes the output of the
+    // join's rhs input (the one that materializes the build side).
+    // The new fragment has the same data partition as the join node's fragment.
+    JoinBuildSink buildSink =
+        new JoinBuildSink(joinTableIdGenerator_.getNextId(), join);
+    join.setJoinTableId(buildSink.getJoinTableId());
+    // c'tor fixes up PlanNode.fragment_
+    PlanFragment buildFragment = new PlanFragment(ctx_.getNextFragmentId(),
+        join.getChild(1), join.getFragment().getDataPartition());
+    buildFragment.setSink(buildSink);
+
+    // move input fragments
+    for (int i = 0; i < exchNodes.size(); ++i) {
+      LOG.info("re-link fragment " + inputFragments.get(i).getId().toString() + " to "
+          + exchNodes.get(i).getFragment().getId().toString());
+      Preconditions.checkState(exchNodes.get(i).getFragment() == buildFragment);
+      join.getFragment().removeChild(inputFragments.get(i));
+      buildFragment.getChildren().add(inputFragments.get(i));
+    }
+
+    // assign plan and cohort id
+    buildFragment.setPlanId(planIdGenerator_.getNextId());
+    PlanId parentPlanId = join.getFragment().getPlanId();
+    buildFragment.setCohortId(cohortId);
+
+    planRoots_.add(buildFragment);
+    LOG.info("new build fragment " + buildFragment.getId().toString());
+    LOG.info("in cohort " + buildFragment.getCohortId().toString());
+    LOG.info("for join node " + join.getId().toString());
+    createBuildPlans(buildFragment, null);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/3b7d5b7c/fe/src/main/java/com/cloudera/impala/planner/PlanFragment.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/com/cloudera/impala/planner/PlanFragment.java b/fe/src/main/java/com/cloudera/impala/planner/PlanFragment.java
index f702271..85a4d40 100644
--- a/fe/src/main/java/com/cloudera/impala/planner/PlanFragment.java
+++ b/fe/src/main/java/com/cloudera/impala/planner/PlanFragment.java
@@ -29,26 +29,36 @@ import com.cloudera.impala.catalog.HdfsTable;
 import com.cloudera.impala.common.AnalysisException;
 import com.cloudera.impala.common.InternalException;
 import com.cloudera.impala.common.NotImplementedException;
+import com.cloudera.impala.common.TreeNode;
 import com.cloudera.impala.planner.JoinNode.DistributionMode;
 import com.cloudera.impala.thrift.TExplainLevel;
 import com.cloudera.impala.thrift.TPartitionType;
+import com.cloudera.impala.thrift.TPlan;
 import com.cloudera.impala.thrift.TPlanFragment;
+import com.cloudera.impala.thrift.TPlanFragmentTree;
 import com.google.common.base.Preconditions;
 import com.google.common.base.Predicates;
 import com.google.common.collect.Lists;
 
 /**
- * A PlanFragment is part of a tree of such fragments that together make
- * up a complete execution plan for a single query. Each plan fragment can have
- * one or many instances, each of which in turn is executed by a single node and the
- * output sent to a specific instance of the destination fragment (or, in the case
- * of the root fragment, is materialized in some form).
+ * PlanFragments form a tree structure via their ExchangeNodes. A tree of fragments
+ * connected in that way forms a plan. The output of a plan is produced by the root
+ * fragment and is either the result of the query or an intermediate result
+ * needed by a different plan (such as a hash table).
  *
- * The plan fragment encapsulates the specific tree of execution nodes that
+ * Plans are grouped into cohorts based on the consumer of their output: all
+ * plans that materialize intermediate results for a particular consumer plan
+ * are grouped into a single cohort.
+ *
+ * A PlanFragment encapsulates the specific tree of execution nodes that
  * are used to produce the output of the plan fragment, as well as output exprs,
  * destination node, etc. If there are no output exprs, the full row that is
  * is produced by the plan root is marked as materialized.
  *
+ * A plan fragment can have one or many instances, each of which in turn is executed by
+ * an individual node and the output sent to a specific instance of the destination
+ * fragment (or, in the case of the root fragment, is materialized in some form).
+ *
  * A hash-partitioned plan fragment is the result of one or more hash-partitioning data
  * streams being received by plan nodes in this fragment. In the future, a fragment's
  * data partition could also be hash partitioned based on a scan node that is reading
@@ -59,11 +69,17 @@ import com.google.common.collect.Lists;
  * - assemble with getters, etc.
  * - finalize()
  * - toThrift()
+ *
+ * TODO: the tree of PlanNodes is connected across fragment boundaries, which makes
+ *   it impossible to search for things within a fragment (using TreeNode functions);
+ *   fix that
  */
-public class PlanFragment {
+public class PlanFragment extends TreeNode<PlanFragment> {
   private final static Logger LOG = LoggerFactory.getLogger(PlanFragment.class);
 
   private final PlanFragmentId fragmentId_;
+  private PlanId planId_;
+  private CohortId cohortId_;
 
   // root of plan tree executed by this fragment
   private PlanNode planRoot_;
@@ -103,14 +119,26 @@ public class PlanFragment {
    * Does not traverse the children of ExchangeNodes because those must belong to a
    * different fragment.
    */
-  private void setFragmentInPlanTree(PlanNode node) {
+  public void setFragmentInPlanTree(PlanNode node) {
     if (node == null) return;
     node.setFragment(this);
-    if (!(node instanceof ExchangeNode)) {
-      for (PlanNode child : node.getChildren()) {
-        setFragmentInPlanTree(child);
-      }
-    }
+    if (node instanceof ExchangeNode) return;
+    for (PlanNode child : node.getChildren()) setFragmentInPlanTree(child);
+  }
+
+  /**
+   * Collect all PlanNodes that belong to the exec tree of this fragment.
+   */
+  public void collectPlanNodes(List<PlanNode> nodes) {
+    Preconditions.checkNotNull(nodes);
+    collectPlanNodesHelper(planRoot_, nodes);
+  }
+
+  private void collectPlanNodesHelper(PlanNode root, List<PlanNode> nodes) {
+    if (root == null) return;
+    nodes.add(root);
+    if (root instanceof ExchangeNode) return;
+    for (PlanNode child: root.getChildren()) collectPlanNodesHelper(child, nodes);
   }
 
   public void setOutputExprs(List<Expr> outputExprs) {
@@ -216,31 +244,56 @@ public class PlanFragment {
     return result;
   }
 
-  public String getExplainString(TExplainLevel explainLevel) {
+  public TPlanFragmentTree treeToThrift() {
+    TPlanFragmentTree result = new TPlanFragmentTree();
+    treeToThriftHelper(result);
+    return result;
+  }
+
+  private void treeToThriftHelper(TPlanFragmentTree plan) {
+    plan.addToFragments(toThrift());
+    for (PlanFragment child: children_) {
+      child.treeToThriftHelper(plan);
+    }
+  }
+
+  public String getExplainString(TExplainLevel detailLevel) {
+    return getExplainString("", "", detailLevel);
+  }
+
+  /**
+   * The root of the output tree will be prefixed by rootPrefix and the remaining plan
+   * output will be prefixed by prefix.
+   */
+  protected final String getExplainString(String rootPrefix, String prefix,
+      TExplainLevel detailLevel) {
     StringBuilder str = new StringBuilder();
     Preconditions.checkState(dataPartition_ != null);
-    String rootPrefix = "";
-    String prefix = "";
-    String detailPrefix = "|  ";
-    if (explainLevel == TExplainLevel.VERBOSE) {
+    String detailPrefix = prefix + "|  ";  // sink detail
+    if (detailLevel == TExplainLevel.VERBOSE) {
+      // we're printing a new tree, start over with the indentation
       prefix = "  ";
       rootPrefix = "  ";
       detailPrefix = prefix + "|  ";
       str.append(String.format("%s:PLAN FRAGMENT [%s]\n", fragmentId_.toString(),
           dataPartition_.getExplainString()));
       if (sink_ != null && sink_ instanceof DataStreamSink) {
-        str.append(sink_.getExplainString(prefix, detailPrefix, explainLevel) + "\n");
+        str.append(sink_.getExplainString(rootPrefix, prefix, detailLevel) + "\n");
       }
     }
-    // Always print table sinks.
-    if (sink_ != null && sink_ instanceof TableSink) {
-      str.append(sink_.getExplainString(prefix, detailPrefix, explainLevel));
-      if (explainLevel.ordinal() >= TExplainLevel.STANDARD.ordinal()) {
+
+    String planRootPrefix = rootPrefix;
+    // Always print sinks other than DataStreamSinks.
+    if (sink_ != null && !(sink_ instanceof DataStreamSink)) {
+      str.append(sink_.getExplainString(rootPrefix, detailPrefix, detailLevel));
+      if (detailLevel.ordinal() >= TExplainLevel.STANDARD.ordinal()) {
         str.append(prefix + "|\n");
       }
+      // we already used the root prefix for the sink
+      planRootPrefix = prefix;
     }
     if (planRoot_ != null) {
-      str.append(planRoot_.getExplainString(rootPrefix, prefix, explainLevel));
+      str.append(planRoot_.getExplainString(planRootPrefix, prefix, detailLevel));
     }
     return str.toString();
   }
@@ -251,6 +304,10 @@ public class PlanFragment {
   }
 
   public PlanFragmentId getId() { return fragmentId_; }
+  public PlanId getPlanId() { return planId_; }
+  public void setPlanId(PlanId id) { planId_ = id; }
+  public CohortId getCohortId() { return cohortId_; }
+  public void setCohortId(CohortId id) { cohortId_ = id; }
   public PlanFragment getDestFragment() {
     if (destNode_ == null) return null;
     return destNode_.getFragment();
@@ -270,7 +327,13 @@ public class PlanFragment {
     setFragmentInPlanTree(planRoot_);
   }
 
-  public void setDestination(ExchangeNode destNode) { destNode_ = destNode; }
+  public void setDestination(ExchangeNode destNode) {
+    destNode_ = destNode;
+    PlanFragment dest = getDestFragment();
+    Preconditions.checkNotNull(dest);
+    dest.addChild(this);
+  }
+
   public boolean hasSink() { return sink_ != null; }
   public DataSink getSink() { return sink_; }
   public void setSink(DataSink sink) {
@@ -290,4 +353,33 @@ public class PlanFragment {
     planRoot_ = newRoot;
     planRoot_.setFragment(this);
   }
+
+  /**
+   * Verify that the tree of PlanFragments and their contained tree of
+   * PlanNodes is constructed correctly.
+   */
+  public void verifyTree() {
+    // PlanNode.fragment_ is set correctly
+    List<PlanNode> nodes = Lists.newArrayList();
+    collectPlanNodes(nodes);
+    List<PlanNode> exchNodes = Lists.newArrayList();
+    for (PlanNode node: nodes) {
+      if (node instanceof ExchangeNode) exchNodes.add(node);
+      Preconditions.checkState(node.getFragment() == this);
+    }
+
+    // all ExchangeNodes have registered input fragments
+    Preconditions.checkState(exchNodes.size() == getChildren().size());
+    List<PlanFragment> childFragments = Lists.newArrayList();
+    for (PlanNode exchNode: exchNodes) {
+      PlanFragment childFragment = exchNode.getChild(0).getFragment();
+      Preconditions.checkState(!childFragments.contains(childFragment));
+      childFragments.add(childFragment);
+      Preconditions.checkState(childFragment.getDestNode() == exchNode);
+    }
+    // all registered children are accounted for
+    Preconditions.checkState(getChildren().containsAll(childFragments));
+
+    for (PlanFragment child: getChildren()) child.verifyTree();
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/3b7d5b7c/fe/src/main/java/com/cloudera/impala/planner/PlanId.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/com/cloudera/impala/planner/PlanId.java b/fe/src/main/java/com/cloudera/impala/planner/PlanId.java
new file mode 100644
index 0000000..1c7213a
--- /dev/null
+++ b/fe/src/main/java/com/cloudera/impala/planner/PlanId.java
@@ -0,0 +1,39 @@
+// Copyright 2016 Cloudera Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.cloudera.impala.planner;
+
+import com.cloudera.impala.common.Id;
+import com.cloudera.impala.common.IdGenerator;
+
+public class PlanId extends Id<PlanId> {
+  // Construction only allowed via an IdGenerator.
+  protected PlanId(int id) {
+    super(id);
+  }
+
+  public static IdGenerator<PlanId> createGenerator() {
+    return new IdGenerator<PlanId>() {
+      @Override
+      public PlanId getNextId() { return new PlanId(nextId_++); }
+      @Override
+      public PlanId getMaxId() { return new PlanId(nextId_ - 1); }
+    };
+  }
+
+  @Override
+  public String toString() {
+    return String.format("%02d", id_);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/3b7d5b7c/fe/src/main/java/com/cloudera/impala/planner/PlanNode.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/com/cloudera/impala/planner/PlanNode.java b/fe/src/main/java/com/cloudera/impala/planner/PlanNode.java
index c393b4f..895df78 100644
--- a/fe/src/main/java/com/cloudera/impala/planner/PlanNode.java
+++ b/fe/src/main/java/com/cloudera/impala/planner/PlanNode.java
@@ -314,9 +314,17 @@ abstract public class PlanNode extends TreeNode<PlanNode> {
       String childHeadlinePrefix = prefix + "|--";
       String childDetailPrefix = prefix + "|  ";
       for (int i = children_.size() - 1; i >= 1; --i) {
-        expBuilder.append(
-            children_.get(i).getExplainString(childHeadlinePrefix, childDetailPrefix,
-                detailLevel));
+        PlanNode child = getChild(i);
+        if (fragment_ != child.fragment_) {
+          // we're crossing a fragment boundary
+          expBuilder.append(
+              child.fragment_.getExplainString(
+                childHeadlinePrefix, childDetailPrefix, detailLevel));
+        } else {
+          expBuilder.append(
+              child.getExplainString(childHeadlinePrefix, childDetailPrefix,
+                  detailLevel));
+        }
         if (printFiller) expBuilder.append(filler + "\n");
       }
       expBuilder.append(children_.get(0).getExplainString(prefix, prefix, detailLevel));
@@ -327,7 +335,7 @@ abstract public class PlanNode extends TreeNode<PlanNode> {
   /**
    * Return the node-specific details.
    * Subclass should override this function.
-   * Each line should be prefix by detailPrefix.
+   * Each line should be prefixed by detailPrefix.
    */
   protected String getNodeExplainString(String rootPrefix, String detailPrefix,
       TExplainLevel detailLevel) {

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/3b7d5b7c/fe/src/main/java/com/cloudera/impala/planner/Planner.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/com/cloudera/impala/planner/Planner.java b/fe/src/main/java/com/cloudera/impala/planner/Planner.java
index cc6f923..f639c59 100644
--- a/fe/src/main/java/com/cloudera/impala/planner/Planner.java
+++ b/fe/src/main/java/com/cloudera/impala/planner/Planner.java
@@ -96,6 +96,7 @@ public class Planner {
     }
 
     PlanFragment rootFragment = fragments.get(fragments.size() - 1);
+    rootFragment.verifyTree();
     ExprSubstitutionMap rootNodeSmap = rootFragment.getPlanRoot().getOutputSmap();
     List<Expr> resultExprs = null;
     if (ctx_.isInsertOrCtas()) {
@@ -161,6 +162,19 @@ public class Planner {
   }
 
   /**
+   * Return a list of plans, each represented by the root of their fragment trees.
+   * TODO: roll into createPlan()
+   */
+  public List<PlanFragment> createParallelPlans() throws ImpalaException {
+    ArrayList<PlanFragment> distrPlan = createPlan();
+    Preconditions.checkNotNull(distrPlan);
+    ParallelPlanner planner = new ParallelPlanner(ctx_);
+    List<PlanFragment> parallelPlans = planner.createPlans(distrPlan.get(0));
+    ctx_.getRootAnalyzer().getTimeline().markEvent("Parallel plans created");
+    return parallelPlans;
+  }
+
+  /**
    * Return combined explain string for all plan fragments.
    * Includes the estimated resource requirements from the request if set.
    */

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/3b7d5b7c/fe/src/main/java/com/cloudera/impala/service/Frontend.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/com/cloudera/impala/service/Frontend.java b/fe/src/main/java/com/cloudera/impala/service/Frontend.java
index 4d21895..fc1f2fc 100644
--- a/fe/src/main/java/com/cloudera/impala/service/Frontend.java
+++ b/fe/src/main/java/com/cloudera/impala/service/Frontend.java
@@ -118,6 +118,7 @@ import com.cloudera.impala.thrift.TLoadDataReq;
 import com.cloudera.impala.thrift.TLoadDataResp;
 import com.cloudera.impala.thrift.TMetadataOpRequest;
 import com.cloudera.impala.thrift.TPlanFragment;
+import com.cloudera.impala.thrift.TPlanFragmentTree;
 import com.cloudera.impala.thrift.TQueryCtx;
 import com.cloudera.impala.thrift.TQueryExecRequest;
 import com.cloudera.impala.thrift.TResetMetadataRequest;
@@ -947,6 +948,23 @@ public class Frontend {
     // create plan
     LOG.debug("create plan");
     Planner planner = new Planner(analysisResult, queryCtx);
+    if (RuntimeEnv.INSTANCE.isTestEnv()
+        && queryCtx.request.query_options.mt_num_cores != 1) {
+      // TODO: this is just to be able to run tests; implement this
+      List<PlanFragment> planRoots = planner.createParallelPlans();
+      for (PlanFragment planRoot: planRoots) {
+        TPlanFragmentTree thriftPlan = planRoot.treeToThrift();
+        queryExecRequest.addToMt_plans(thriftPlan);
+      }
+      queryExecRequest.setDesc_tbl(analysisResult.getAnalyzer().getDescTbl().toThrift());
+      queryExecRequest.setQuery_ctx(queryCtx);
+      explainString.append(planner.getExplainString(
+          Lists.newArrayList(planRoots.get(0)), queryExecRequest,
+          TExplainLevel.STANDARD));
+      queryExecRequest.setQuery_plan(explainString.toString());
+      result.setQuery_exec_request(queryExecRequest);
+      return result;
+    }
     ArrayList<PlanFragment> fragments = planner.createPlan();
 
     List<ScanNode> scanNodes = Lists.newArrayList();
@@ -954,11 +972,11 @@ public class Frontend {
     // queryExecRequest.dest_fragment_idx
     Map<PlanFragment, Integer> fragmentIdx = Maps.newHashMap();
 
-    for (int fragmentId = 0; fragmentId < fragments.size(); ++fragmentId) {
-      PlanFragment fragment = fragments.get(fragmentId);
+    for (int idx = 0; idx < fragments.size(); ++idx) {
+      PlanFragment fragment = fragments.get(idx);
       Preconditions.checkNotNull(fragment.getPlanRoot());
       fragment.getPlanRoot().collect(Predicates.instanceOf(ScanNode.class), scanNodes);
-      fragmentIdx.put(fragment, fragmentId);
+      fragmentIdx.put(fragment, idx);
     }
 
     // set fragment destinations

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/3b7d5b7c/fe/src/test/java/com/cloudera/impala/planner/PlannerTestBase.java
----------------------------------------------------------------------
diff --git a/fe/src/test/java/com/cloudera/impala/planner/PlannerTestBase.java b/fe/src/test/java/com/cloudera/impala/planner/PlannerTestBase.java
index 7abd81a..ce8980e 100644
--- a/fe/src/test/java/com/cloudera/impala/planner/PlannerTestBase.java
+++ b/fe/src/test/java/com/cloudera/impala/planner/PlannerTestBase.java
@@ -375,10 +375,9 @@ public class PlannerTestBase {
    * locations to actualScanRangeLocations; compares both to the appropriate sections
    * of 'testCase'.
    */
-  private void RunTestCase(TestCase testCase, StringBuilder errorLog,
+  private void runTestCase(TestCase testCase, StringBuilder errorLog,
       StringBuilder actualOutput, String dbName, TQueryOptions options)
       throws CatalogException {
-
     if (options == null) {
       options = defaultQueryOptions();
     } else {
@@ -396,11 +395,13 @@ public class PlannerTestBase {
     queryCtx.request.query_options = options;
     // Test single node plan, scan range locations, and column lineage.
     TExecRequest singleNodeExecRequest =
-        testPlan(testCase, true, queryCtx, errorLog, actualOutput);
+        testPlan(testCase, Section.PLAN, queryCtx, errorLog, actualOutput);
     checkScanRangeLocations(testCase, singleNodeExecRequest, errorLog, actualOutput);
     checkColumnLineage(testCase, singleNodeExecRequest, errorLog, actualOutput);
     // Test distributed plan.
-    testPlan(testCase, false, queryCtx, errorLog, actualOutput);
+    testPlan(testCase, Section.DISTRIBUTEDPLAN, queryCtx, errorLog, actualOutput);
+    // Test parallel plans.
+    testPlan(testCase, Section.PARALLELPLANS, queryCtx, errorLog, actualOutput);
   }
 
   /**
@@ -410,17 +411,20 @@ public class PlannerTestBase {
    * Returns the produced exec request or null if there was an error generating
    * the plan.
    */
-  private TExecRequest testPlan(TestCase testCase, boolean singleNodePlan,
+  private TExecRequest testPlan(TestCase testCase, Section section,
       TQueryCtx queryCtx, StringBuilder errorLog, StringBuilder actualOutput) {
-    Section section = (singleNodePlan) ? Section.PLAN : Section.DISTRIBUTEDPLAN;
     String query = testCase.getQuery();
     queryCtx.request.setStmt(query);
     if (section == Section.PLAN) {
       queryCtx.request.getQuery_options().setNum_nodes(1);
     } else {
+      // for distributed and parallel execution we want to run on all available nodes
       queryCtx.request.getQuery_options().setNum_nodes(
           ImpalaInternalServiceConstants.NUM_NODES_ALL);
     }
+    if (section == Section.PARALLELPLANS) {
+      queryCtx.request.query_options.mt_num_cores = 2;
+    }
     ArrayList<String> expectedPlan = testCase.getSectionContents(section);
     boolean sectionExists = expectedPlan != null && !expectedPlan.isEmpty();
     String expectedErrorMsg = getExpectedErrorMessage(expectedPlan);
@@ -614,7 +618,7 @@ public class PlannerTestBase {
       actualOutput.append(testCase.getSectionAsString(Section.QUERY, true, "\n"));
       actualOutput.append("\n");
       try {
-        RunTestCase(testCase, errorLog, actualOutput, dbName, options);
+        runTestCase(testCase, errorLog, actualOutput, dbName, options);
       } catch (CatalogException e) {
         errorLog.append(String.format("Failed to plan query\n%s\n%s",
             testCase.getQuery(), e.getMessage()));

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/3b7d5b7c/fe/src/test/java/com/cloudera/impala/testutil/TestFileParser.java
----------------------------------------------------------------------
diff --git a/fe/src/test/java/com/cloudera/impala/testutil/TestFileParser.java b/fe/src/test/java/com/cloudera/impala/testutil/TestFileParser.java
index cac975a..eb1ca6d 100644
--- a/fe/src/test/java/com/cloudera/impala/testutil/TestFileParser.java
+++ b/fe/src/test/java/com/cloudera/impala/testutil/TestFileParser.java
@@ -61,6 +61,7 @@ public class TestFileParser {
     RESULTS,
     PLAN,
     DISTRIBUTEDPLAN,
+    PARALLELPLANS,
     FILEERRORS,
     PARTITIONS,
     SETUP,
@@ -177,10 +178,14 @@ public class TestFileParser {
      * Returns false if the current test case is invalid due to missing sections or query
      */
     public boolean isValid() {
-      return !getQuery().isEmpty() && (!getSectionContents(Section.PLAN).isEmpty() ||
-          !getSectionContents(Section.DISTRIBUTEDPLAN).isEmpty() ||
-          !getSectionContents(Section.LINEAGE).isEmpty());
+      return !getQuery().isEmpty()
+          && (!getSectionContents(Section.PLAN).isEmpty()
+            || !getSectionContents(Section.DISTRIBUTEDPLAN).isEmpty()
+            || !getSectionContents(Section.PARALLELPLANS).isEmpty()
+            || !getSectionContents(Section.LINEAGE).isEmpty());
     }
+
+    public boolean isEmpty() { return expectedResultSections.isEmpty(); }
   }
 
   private final List<TestCase> testCases = Lists.newArrayList();
@@ -243,7 +248,7 @@ public class TestFileParser {
       ++lineNum;
       if (line.startsWith("====") && sectionCount > 0) {
         currentTestCase.addSection(currentSection, sectionContents);
-        if(!currentTestCase.isValid()) {
+        if (!currentTestCase.isValid()) {
           throw new IllegalStateException("Invalid test case" +
               " at line " + currentTestCase.startLineNum + " detected.");
         }
@@ -288,7 +293,7 @@ public class TestFileParser {
       }
     }
 
-    if(!currentTestCase.isValid()) {
+    if (!currentTestCase.isEmpty() && !currentTestCase.isValid()) {
       throw new IllegalStateException("Invalid test case" +
           " at line " + currentTestCase.startLineNum + " detected.");
     }
@@ -308,7 +313,8 @@ public class TestFileParser {
       open(table);
       testCases.clear();
       while (scanner.hasNextLine()) {
-        testCases.add(parseOneTestCase());
+        TestCase testCase = parseOneTestCase();
+        if (!testCase.isEmpty()) testCases.add(testCase);
       }
     } finally {
       close();