You are viewing a plain-text version of this content; the hyperlink to its canonical version was not preserved in this copy.
Posted to commits@impala.apache.org by ta...@apache.org on 2016/05/12 22:09:56 UTC
[21/50] [abbrv] incubator-impala git commit: MT: Planner for multi-threaded execution
MT: Planner for multi-threaded execution
New classes:
- ParallelPlanner: creates separate plans that materialize join build sides, and assigns plans to cohorts
- JoinBuildSink: DataSink for plan fragments that materialize build sides
- new ID types for plans, join hash tables, and plan cohorts (PlanId, JoinTableId, CohortId)
Tests: this adds a new test-file section, PARALLELPLANS, and adds that section to the
TPC-H and TPC-DS planner tests (tpch-all.test and tpcds-all.test).
To keep this patch small, other test files have not been augmented with that section yet;
that will happen later, to cover more corner cases.
Change-Id: Ic3c34dd3f9190a131e6f03d901b4bfcd164a5174
Reviewed-on: http://gerrit.cloudera.org:8080/2846
Tested-by: Internal Jenkins
Reviewed-by: Marcel Kornacker <ma...@cloudera.com>
Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/3b7d5b7c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/3b7d5b7c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/3b7d5b7c
Branch: refs/heads/master
Commit: 3b7d5b7c178fff8b269b6854842cc5dcf2e4d725
Parents: 8e64273
Author: Marcel Kornacker <ma...@cloudera.com>
Authored: Mon Mar 21 20:13:30 2016 -0700
Committer: Tim Armstrong <ta...@cloudera.com>
Committed: Thu May 12 14:17:56 2016 -0700
----------------------------------------------------------------------
be/src/service/query-options.cc | 12 +
be/src/service/query-options.h | 5 +-
common/thrift/DataSinks.thrift | 12 +-
common/thrift/Frontend.thrift | 5 +
common/thrift/ImpalaInternalService.thrift | 6 +
common/thrift/ImpalaService.thrift | 3 +
common/thrift/Planner.thrift | 10 +
common/thrift/Types.thrift | 1 +
.../impala/analysis/FunctionCallExpr.java | 3 +-
.../impala/analysis/TupleDescriptor.java | 3 +
.../com/cloudera/impala/common/TreeNode.java | 68 +-
.../com/cloudera/impala/planner/CohortId.java | 39 +
.../com/cloudera/impala/planner/DataSink.java | 1 -
.../impala/planner/DistributedPlanner.java | 66 +-
.../cloudera/impala/planner/ExchangeNode.java | 3 +-
.../cloudera/impala/planner/HashJoinNode.java | 6 +
.../cloudera/impala/planner/JoinBuildSink.java | 100 +
.../com/cloudera/impala/planner/JoinNode.java | 6 +
.../cloudera/impala/planner/JoinTableId.java | 44 +
.../impala/planner/NestedLoopJoinNode.java | 4 +
.../impala/planner/ParallelPlanner.java | 202 ++
.../cloudera/impala/planner/PlanFragment.java | 142 +-
.../com/cloudera/impala/planner/PlanId.java | 39 +
.../com/cloudera/impala/planner/PlanNode.java | 16 +-
.../com/cloudera/impala/planner/Planner.java | 14 +
.../com/cloudera/impala/service/Frontend.java | 24 +-
.../impala/planner/PlannerTestBase.java | 18 +-
.../impala/testutil/TestFileParser.java | 18 +-
.../queries/PlannerTest/tpcds-all.test | 3400 ++++++++++++++++--
.../queries/PlannerTest/tpch-all.test | 1472 +++++++-
30 files changed, 5187 insertions(+), 555 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/3b7d5b7c/be/src/service/query-options.cc
----------------------------------------------------------------------
diff --git a/be/src/service/query-options.cc b/be/src/service/query-options.cc
index 8f5a682..45571bf 100644
--- a/be/src/service/query-options.cc
+++ b/be/src/service/query-options.cc
@@ -388,6 +388,18 @@ Status impala::SetQueryOption(const string& key, const string& value,
}
break;
}
+ case TImpalaQueryOptions::MT_NUM_CORES: {
+ StringParser::ParseResult result;
+ const int32_t num_cores =
+ StringParser::StringToInt<int32_t>(value.c_str(), value.length(), &result);
+ if (result != StringParser::PARSE_SUCCESS || num_cores < 0 || num_cores > 128) {
+ return Status(
+ Substitute("$0 is not valid for mt_num_cores. Valid values are in "
+ "[0, 128].", value));
+ }
+ query_options->__set_mt_num_cores(num_cores);
+ break;
+ }
default:
// We hit this DCHECK(false) if we forgot to add the corresponding entry here
// when we add a new query option.
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/3b7d5b7c/be/src/service/query-options.h
----------------------------------------------------------------------
diff --git a/be/src/service/query-options.h b/be/src/service/query-options.h
index c445658..a727c8d 100644
--- a/be/src/service/query-options.h
+++ b/be/src/service/query-options.h
@@ -32,7 +32,7 @@ class TQueryOptions;
// the DCHECK.
#define QUERY_OPTS_TABLE\
DCHECK_EQ(_TImpalaQueryOptions_VALUES_TO_NAMES.size(),\
- TImpalaQueryOptions::PARQUET_FALLBACK_SCHEMA_RESOLUTION + 1);\
+ TImpalaQueryOptions::MT_NUM_CORES + 1);\
QUERY_OPT_FN(abort_on_default_limit_exceeded, ABORT_ON_DEFAULT_LIMIT_EXCEEDED)\
QUERY_OPT_FN(abort_on_error, ABORT_ON_ERROR)\
QUERY_OPT_FN(allow_unsupported_formats, ALLOW_UNSUPPORTED_FORMATS)\
@@ -75,7 +75,8 @@ class TQueryOptions;
QUERY_OPT_FN(disable_row_runtime_filtering, DISABLE_ROW_RUNTIME_FILTERING)\
QUERY_OPT_FN(max_num_runtime_filters, MAX_NUM_RUNTIME_FILTERS)\
QUERY_OPT_FN(parquet_annotate_strings_utf8, PARQUET_ANNOTATE_STRINGS_UTF8)\
- QUERY_OPT_FN(parquet_fallback_schema_resolution, PARQUET_FALLBACK_SCHEMA_RESOLUTION);
+ QUERY_OPT_FN(parquet_fallback_schema_resolution, PARQUET_FALLBACK_SCHEMA_RESOLUTION)\
+ QUERY_OPT_FN(mt_num_cores, MT_NUM_CORES);
/// Converts a TQueryOptions struct into a map of key, value pairs.
void TQueryOptionsToMap(const TQueryOptions& query_options,
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/3b7d5b7c/common/thrift/DataSinks.thrift
----------------------------------------------------------------------
diff --git a/common/thrift/DataSinks.thrift b/common/thrift/DataSinks.thrift
index 18019af..dd381fa 100644
--- a/common/thrift/DataSinks.thrift
+++ b/common/thrift/DataSinks.thrift
@@ -22,7 +22,8 @@ include "Partitions.thrift"
enum TDataSinkType {
DATA_STREAM_SINK,
- TABLE_SINK
+ TABLE_SINK,
+ JOIN_BUILD_SINK
}
enum TSinkAction {
@@ -73,6 +74,14 @@ struct TKuduTableSink {
2: optional bool ignore_not_found_or_duplicate;
}
+// Sink to create the build side of a JoinNode.
+struct TJoinBuildSink {
+ 1: required Types.TJoinTableId join_table_id
+
+ // only set for hash join build sinks
+ 2: required list<Exprs.TExpr> build_exprs
+}
+
// Union type of all table sinks.
struct TTableSink {
1: required Types.TTableId target_table_id
@@ -86,4 +95,5 @@ struct TDataSink {
1: required TDataSinkType type
2: optional TDataStreamSink stream_sink
3: optional TTableSink table_sink
+ 4: optional TJoinBuildSink join_build_sink
}
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/3b7d5b7c/common/thrift/Frontend.thrift
----------------------------------------------------------------------
diff --git a/common/thrift/Frontend.thrift b/common/thrift/Frontend.thrift
index 1842694..6363fcf 100644
--- a/common/thrift/Frontend.thrift
+++ b/common/thrift/Frontend.thrift
@@ -347,6 +347,11 @@ struct TQueryExecRequest {
// it is unpartitioned.
2: required list<Planner.TPlanFragment> fragments
+ // Multi-threaded execution: sequence of plans; the last one materializes
+ // the query result
+ // TODO: this will eventually supercede 'fragments'
+ 14: optional list<Planner.TPlanFragmentTree> mt_plans
+
// Specifies the destination fragment of the output of each fragment.
// parent_fragment_idx.size() == fragments.size() - 1 and
// fragments[i] sends its output to fragments[dest_fragment_idx[i-1]]
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/3b7d5b7c/common/thrift/ImpalaInternalService.thrift
----------------------------------------------------------------------
diff --git a/common/thrift/ImpalaInternalService.thrift b/common/thrift/ImpalaInternalService.thrift
index 1fcce1b..6c2fc3e 100644
--- a/common/thrift/ImpalaInternalService.thrift
+++ b/common/thrift/ImpalaInternalService.thrift
@@ -180,6 +180,12 @@ struct TQueryOptions {
// is always, since fields IDs are NYI). Valid values are "position" (default) and
// "name".
43: optional TParquetFallbackSchemaResolution parquet_fallback_schema_resolution = 0
+
+ // Multi-threaded execution: number of cores per query per node.
+ // > 1: multi-threaded execution mode, with given number of cores
+ // 1: single-threaded execution mode
+ // 0: multi-threaded execution mode, number of cores is the pool default
+ 44: optional i32 mt_num_cores = 1
}
// Impala currently has two types of sessions: Beeswax and HiveServer2
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/3b7d5b7c/common/thrift/ImpalaService.thrift
----------------------------------------------------------------------
diff --git a/common/thrift/ImpalaService.thrift b/common/thrift/ImpalaService.thrift
index 3f5273e..c9535eb 100644
--- a/common/thrift/ImpalaService.thrift
+++ b/common/thrift/ImpalaService.thrift
@@ -209,6 +209,9 @@ enum TImpalaQueryOptions {
// Determines how to resolve Parquet files' schemas in the absence of field IDs (which
// is always, since fields IDs are NYI). Valid values are "position" and "name".
PARQUET_FALLBACK_SCHEMA_RESOLUTION
+
+ // Multi-threaded execution: number of cores per machine
+ MT_NUM_CORES
}
// The summary of an insert.
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/3b7d5b7c/common/thrift/Planner.thrift
----------------------------------------------------------------------
diff --git a/common/thrift/Planner.thrift b/common/thrift/Planner.thrift
index cf2cc04..43a08a9 100644
--- a/common/thrift/Planner.thrift
+++ b/common/thrift/Planner.thrift
@@ -76,3 +76,13 @@ struct TScanRangeLocations {
// non-empty list
2: list<TScanRangeLocation> locations
}
+
+// A plan: tree of plan fragments that materializes either a query result or the build
+// side of a join used by another plan; it consists of a sequence of plan fragments.
+// TODO: rename both this and PlanNodes.TPlan (TPlan should be something like TExecPlan
+// or TExecTree)
+struct TPlanFragmentTree {
+ 1: required i32 cohort_id
+
+ 2: required list<TPlanFragment> fragments
+}
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/3b7d5b7c/common/thrift/Types.thrift
----------------------------------------------------------------------
diff --git a/common/thrift/Types.thrift b/common/thrift/Types.thrift
index 8b2c5a2..85a14dc 100644
--- a/common/thrift/Types.thrift
+++ b/common/thrift/Types.thrift
@@ -20,6 +20,7 @@ typedef i32 TPlanNodeId
typedef i32 TTupleId
typedef i32 TSlotId
typedef i32 TTableId
+typedef i32 TJoinTableId
// TODO: Consider moving unrelated enums to better locations.
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/3b7d5b7c/fe/src/main/java/com/cloudera/impala/analysis/FunctionCallExpr.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/com/cloudera/impala/analysis/FunctionCallExpr.java b/fe/src/main/java/com/cloudera/impala/analysis/FunctionCallExpr.java
index 7950d4c..ca4097b 100644
--- a/fe/src/main/java/com/cloudera/impala/analysis/FunctionCallExpr.java
+++ b/fe/src/main/java/com/cloudera/impala/analysis/FunctionCallExpr.java
@@ -33,6 +33,7 @@ import com.cloudera.impala.thrift.TFunctionBinaryType;
import com.google.common.base.Joiner;
import com.google.common.base.Objects;
import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
public class FunctionCallExpr extends Expr {
private final FunctionName fnName_;
@@ -66,7 +67,7 @@ public class FunctionCallExpr extends Expr {
fnName_ = fnName;
params_ = params;
isMergeAggFn_ = isMergeAggFn;
- if (params.exprs() != null) children_ = params_.exprs();
+ if (params.exprs() != null) children_ = Lists.newArrayList(params_.exprs());
}
/**
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/3b7d5b7c/fe/src/main/java/com/cloudera/impala/analysis/TupleDescriptor.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/com/cloudera/impala/analysis/TupleDescriptor.java b/fe/src/main/java/com/cloudera/impala/analysis/TupleDescriptor.java
index e042af1..63bcde5 100644
--- a/fe/src/main/java/com/cloudera/impala/analysis/TupleDescriptor.java
+++ b/fe/src/main/java/com/cloudera/impala/analysis/TupleDescriptor.java
@@ -110,10 +110,12 @@ public class TupleDescriptor {
if (path_ == null) return null;
return path_.getRootTable();
}
+
public TableName getTableName() {
Table t = getTable();
return (t == null) ? null : t.getTableName();
}
+
public void setPath(Path p) {
Preconditions.checkNotNull(p);
Preconditions.checkState(p.isResolved());
@@ -128,6 +130,7 @@ public class TupleDescriptor {
type_ = Path.getTypeAsStruct(p.destType());
}
}
+
public Path getPath() { return path_; }
public void setType(StructType type) { type_ = type; }
public StructType getType() { return type_; }
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/3b7d5b7c/fe/src/main/java/com/cloudera/impala/common/TreeNode.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/com/cloudera/impala/common/TreeNode.java b/fe/src/main/java/com/cloudera/impala/common/TreeNode.java
index a7d46ce..37e7995 100644
--- a/fe/src/main/java/com/cloudera/impala/common/TreeNode.java
+++ b/fe/src/main/java/com/cloudera/impala/common/TreeNode.java
@@ -21,12 +21,11 @@ import java.util.List;
import com.cloudera.impala.util.Visitor;
import com.google.common.base.Predicate;
-public class TreeNode<NodeType extends TreeNode<NodeType>> {
- protected List<NodeType> children_;
-
- protected TreeNode() {
- this.children_ = new ArrayList<NodeType>();
- }
+/**
+ * Generic tree structure. Only concrete subclasses of this can be instantiated.
+ */
+public abstract class TreeNode<NodeType extends TreeNode<NodeType>> {
+ protected ArrayList<NodeType> children_ = new ArrayList<NodeType>();
public NodeType getChild(int i) {
return hasChild(i) ? children_.get(i) : null;
@@ -36,13 +35,17 @@ public class TreeNode<NodeType extends TreeNode<NodeType>> {
children_.add(n);
}
+ public void removeChild(NodeType n) { children_.remove(n); }
+
+ public void clearChildren() { children_.clear(); }
+
public void addChildren(List<? extends NodeType> l) {
children_.addAll(l);
}
public boolean hasChild(int i) { return children_.size() > i; }
public void setChild(int index, NodeType n) { children_.set(index, n); }
- public List<NodeType> getChildren() { return children_; }
+ public ArrayList<NodeType> getChildren() { return children_; }
/**
* Count the total number of nodes in this tree. Leaf node will return 1.
@@ -50,9 +53,7 @@ public class TreeNode<NodeType extends TreeNode<NodeType>> {
*/
public int numNodes() {
int numNodes = 1;
- for (NodeType child: children_) {
- numNodes += child.numNodes();
- }
+ for (NodeType child: children_) numNodes += child.numNodes();
return numNodes;
}
@@ -72,10 +73,7 @@ public class TreeNode<NodeType extends TreeNode<NodeType>> {
matches.add((D) this);
return;
}
-
- for (NodeType child: children_) {
- child.collect(predicate, matches);
- }
+ for (NodeType child: children_) child.collect(predicate, matches);
}
/**
@@ -89,10 +87,7 @@ public class TreeNode<NodeType extends TreeNode<NodeType>> {
matches.add((D) this);
return;
}
-
- for (NodeType child: children_) {
- child.collect(cl, matches);
- }
+ for (NodeType child: children_) child.collect(cl, matches);
}
/**
@@ -102,13 +97,8 @@ public class TreeNode<NodeType extends TreeNode<NodeType>> {
*/
public <C extends TreeNode<NodeType>, D extends C> void collectAll(
Predicate<? super C> predicate, List<D> matches) {
- if (predicate.apply((C) this)) {
- matches.add((D) this);
- }
-
- for (NodeType child: children_) {
- child.collectAll(predicate, matches);
- }
+ if (predicate.apply((C) this)) matches.add((D) this);
+ for (NodeType child: children_) child.collectAll(predicate, matches);
}
/**
@@ -117,9 +107,7 @@ public class TreeNode<NodeType extends TreeNode<NodeType>> {
*/
public static <C extends TreeNode<C>, D extends C> void collect(
Collection<C> nodeList, Predicate<? super C> predicate, Collection<D> matches) {
- for (C node: nodeList) {
- node.collect(predicate, matches);
- }
+ for (C node: nodeList) node.collect(predicate, matches);
}
/**
@@ -128,9 +116,7 @@ public class TreeNode<NodeType extends TreeNode<NodeType>> {
*/
public static <C extends TreeNode<C>, D extends C> void collect(
Collection<C> nodeList, Class cl, Collection<D> matches) {
- for (C node: nodeList) {
- node.collect(cl, matches);
- }
+ for (C node: nodeList) node.collect(cl, matches);
}
/**
@@ -139,9 +125,7 @@ public class TreeNode<NodeType extends TreeNode<NodeType>> {
public <C extends TreeNode<NodeType>> boolean contains(
Predicate<? super C> predicate) {
if (predicate.apply((C) this)) return true;
- for (NodeType child: children_) {
- if (child.contains(predicate)) return true;
- }
+ for (NodeType child: children_) if (child.contains(predicate)) return true;
return false;
}
@@ -150,9 +134,7 @@ public class TreeNode<NodeType extends TreeNode<NodeType>> {
*/
public boolean contains(Class cl) {
if (cl.equals(getClass())) return true;
- for (NodeType child: children_) {
- if (child.contains(cl)) return true;
- }
+ for (NodeType child: children_) if (child.contains(cl)) return true;
return false;
}
@@ -162,9 +144,7 @@ public class TreeNode<NodeType extends TreeNode<NodeType>> {
*/
public static <C extends TreeNode<C>, D extends C> boolean contains(
Collection<C> nodeList, Predicate<? super C> predicate) {
- for (C node: nodeList) {
- if (node.contains(predicate)) return true;
- }
+ for (C node: nodeList) if (node.contains(predicate)) return true;
return false;
}
@@ -173,9 +153,7 @@ public class TreeNode<NodeType extends TreeNode<NodeType>> {
*/
public static <C extends TreeNode<C>> boolean contains(
List<C> nodeList, Class cl) {
- for (C node: nodeList) {
- if (node.contains(cl)) return true;
- }
+ for (C node: nodeList) if (node.contains(cl)) return true;
return false;
}
@@ -196,8 +174,6 @@ public class TreeNode<NodeType extends TreeNode<NodeType>> {
*/
public <C extends TreeNode<NodeType>> void accept(Visitor<C> visitor) {
visitor.visit((C) this);
- for (NodeType p: children_) {
- p.accept(visitor);
- }
+ for (NodeType p: children_) p.accept(visitor);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/3b7d5b7c/fe/src/main/java/com/cloudera/impala/planner/CohortId.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/com/cloudera/impala/planner/CohortId.java b/fe/src/main/java/com/cloudera/impala/planner/CohortId.java
new file mode 100644
index 0000000..2b5b2e4
--- /dev/null
+++ b/fe/src/main/java/com/cloudera/impala/planner/CohortId.java
@@ -0,0 +1,39 @@
+// Copyright 2016 Cloudera Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.cloudera.impala.planner;
+
+import com.cloudera.impala.common.Id;
+import com.cloudera.impala.common.IdGenerator;
+
+public class CohortId extends Id<CohortId> {
+ // Construction only allowed via an IdGenerator.
+ protected CohortId(int id) {
+ super(id);
+ }
+
+ public static IdGenerator<CohortId> createGenerator() {
+ return new IdGenerator<CohortId>() {
+ @Override
+ public CohortId getNextId() { return new CohortId(nextId_++); }
+ @Override
+ public CohortId getMaxId() { return new CohortId(nextId_ - 1); }
+ };
+ }
+
+ @Override
+ public String toString() {
+ return String.format("%02d", id_);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/3b7d5b7c/fe/src/main/java/com/cloudera/impala/planner/DataSink.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/com/cloudera/impala/planner/DataSink.java b/fe/src/main/java/com/cloudera/impala/planner/DataSink.java
index 084dd96..a298781 100644
--- a/fe/src/main/java/com/cloudera/impala/planner/DataSink.java
+++ b/fe/src/main/java/com/cloudera/impala/planner/DataSink.java
@@ -29,7 +29,6 @@ import com.cloudera.impala.thrift.TExplainLevel;
* The destination could be another plan fragment on a remote machine,
* or a table into which the rows are to be inserted
* (i.e., the destination of the last fragment of an INSERT statement).
- *
*/
public abstract class DataSink {
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/3b7d5b7c/fe/src/main/java/com/cloudera/impala/planner/DistributedPlanner.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/com/cloudera/impala/planner/DistributedPlanner.java b/fe/src/main/java/com/cloudera/impala/planner/DistributedPlanner.java
index d1a3069..bdf32e9 100644
--- a/fe/src/main/java/com/cloudera/impala/planner/DistributedPlanner.java
+++ b/fe/src/main/java/com/cloudera/impala/planner/DistributedPlanner.java
@@ -230,7 +230,7 @@ public class DistributedPlanner {
Preconditions.checkState(partitionHint == null || partitionHint);
ExchangeNode exchNode = new ExchangeNode(ctx_.getNextNodeId());
- exchNode.addChild(inputFragment.getPlanRoot(), false);
+ exchNode.addChild(inputFragment.getPlanRoot());
exchNode.init(analyzer);
Preconditions.checkState(exchNode.hasValidStats());
DataPartition partition = DataPartition.hashPartitioned(nonConstPartitionExprs);
@@ -251,7 +251,7 @@ public class DistributedPlanner {
throws ImpalaException {
Preconditions.checkState(inputFragment.isPartitioned());
ExchangeNode mergePlan = new ExchangeNode(ctx_.getNextNodeId());
- mergePlan.addChild(inputFragment.getPlanRoot(), false);
+ mergePlan.addChild(inputFragment.getPlanRoot());
mergePlan.init(ctx_.getRootAnalyzer());
Preconditions.checkState(mergePlan.hasValidStats());
PlanFragment fragment = new PlanFragment(ctx_.getNextFragmentId(), mergePlan,
@@ -292,7 +292,7 @@ public class DistributedPlanner {
throws ImpalaException {
node.setDistributionMode(DistributionMode.BROADCAST);
node.setChild(0, leftChildFragment.getPlanRoot());
- connectChildFragment(node, 1, rightChildFragment);
+ connectChildFragment(node, 1, leftChildFragment, rightChildFragment);
leftChildFragment.setPlanRoot(node);
return leftChildFragment;
}
@@ -323,11 +323,11 @@ public class DistributedPlanner {
lhsJoinExprs, rhsJoinExprs, analyzer)) {
node.setChild(0, leftChildFragment.getPlanRoot());
node.setChild(1, rightChildFragment.getPlanRoot());
- // Redirect fragments sending to rightFragment to leftFragment.
- for (PlanFragment fragment: fragments) {
- if (fragment.getDestFragment() == rightChildFragment) {
- fragment.setDestination(fragment.getDestNode());
- }
+ // fix up PlanNode.fragment_ for the migrated PlanNode tree of the rhs child
+ leftChildFragment.setFragmentInPlanTree(node.getChild(1));
+ // Relocate input fragments of rightChildFragment to leftChildFragment.
+ for (PlanFragment rhsInput: rightChildFragment.getChildren()) {
+ leftChildFragment.getChildren().add(rhsInput);
}
// Remove right fragment because its plan tree has been merged into leftFragment.
fragments.remove(rightChildFragment);
@@ -346,7 +346,7 @@ public class DistributedPlanner {
leftChildFragment.getDataPartition(), rhsJoinExprs, analyzer);
if (rhsJoinPartition != null) {
node.setChild(0, leftChildFragment.getPlanRoot());
- connectChildFragment(node, 1, rightChildFragment);
+ connectChildFragment(node, 1, leftChildFragment, rightChildFragment);
rightChildFragment.setOutputPartition(rhsJoinPartition);
leftChildFragment.setPlanRoot(node);
return leftChildFragment;
@@ -360,7 +360,7 @@ public class DistributedPlanner {
rightChildFragment.getDataPartition(), lhsJoinExprs, analyzer);
if (lhsJoinPartition != null) {
node.setChild(1, rightChildFragment.getPlanRoot());
- connectChildFragment(node, 0, leftChildFragment);
+ connectChildFragment(node, 0, rightChildFragment, leftChildFragment);
leftChildFragment.setOutputPartition(lhsJoinPartition);
rightChildFragment.setPlanRoot(node);
return rightChildFragment;
@@ -379,11 +379,11 @@ public class DistributedPlanner {
// on their respective join exprs.
// The new fragment is hash-partitioned on the lhs input join exprs.
ExchangeNode lhsExchange = new ExchangeNode(ctx_.getNextNodeId());
- lhsExchange.addChild(leftChildFragment.getPlanRoot(), false);
+ lhsExchange.addChild(leftChildFragment.getPlanRoot());
lhsExchange.computeStats(null);
node.setChild(0, lhsExchange);
ExchangeNode rhsExchange = new ExchangeNode(ctx_.getNextNodeId());
- rhsExchange.addChild(rightChildFragment.getPlanRoot(), false);
+ rhsExchange.addChild(rightChildFragment.getPlanRoot());
rhsExchange.computeStats(null);
node.setChild(1, rhsExchange);
@@ -502,7 +502,7 @@ public class DistributedPlanner {
// the join; the build input is provided by an ExchangeNode, which is the
// destination of the rightChildFragment's output
node.setChild(0, leftChildFragment.getPlanRoot());
- connectChildFragment(node, 1, rightChildFragment);
+ connectChildFragment(node, 1, leftChildFragment, rightChildFragment);
leftChildFragment.setPlanRoot(node);
hjFragment = leftChildFragment;
} else {
@@ -624,15 +624,22 @@ public class DistributedPlanner {
if (!childFragments.get(i).isPartitioned()) ++numUnpartitionedChildFragments;
}
+ // remove all children to avoid them being tagged with the wrong
+ // fragment (in the PlanFragment c'tor; we haven't created ExchangeNodes yet)
+ unionNode.clearChildren();
+
// If all child fragments are unpartitioned, return a single unpartitioned fragment
// with a UnionNode that merges all child fragments.
if (numUnpartitionedChildFragments == childFragments.size()) {
- // Absorb the plan trees of all childFragments into unionNode.
- for (int i = 0; i < childFragments.size(); ++i) {
- unionNode.setChild(i, childFragments.get(i).getPlanRoot());
- }
PlanFragment unionFragment = new PlanFragment(ctx_.getNextFragmentId(),
unionNode, DataPartition.UNPARTITIONED);
+ // Absorb the plan trees of all childFragments into unionNode
+ // and fix up the fragment tree in the process.
+ for (int i = 0; i < childFragments.size(); ++i) {
+ unionNode.addChild(childFragments.get(i).getPlanRoot());
+ unionFragment.setFragmentInPlanTree(unionNode.getChild(i));
+ unionFragment.addChildren(childFragments.get(i).getChildren());
+ }
unionNode.init(ctx_.getRootAnalyzer());
// All child fragments have been absorbed into unionFragment.
fragments.removeAll(childFragments);
@@ -640,22 +647,24 @@ public class DistributedPlanner {
}
// There is at least one partitioned child fragment.
+ PlanFragment unionFragment = new PlanFragment(
+ ctx_.getNextFragmentId(), unionNode, DataPartition.RANDOM);
for (int i = 0; i < childFragments.size(); ++i) {
PlanFragment childFragment = childFragments.get(i);
if (childFragment.isPartitioned()) {
- // Absorb the plan trees of all partitioned child fragments into unionNode.
- unionNode.setChild(i, childFragment.getPlanRoot());
+ // absorb the plan trees of all partitioned child fragments into unionNode
+ unionNode.addChild(childFragment.getPlanRoot());
+ unionFragment.setFragmentInPlanTree(unionNode.getChild(i));
+ unionFragment.addChildren(childFragment.getChildren());
fragments.remove(childFragment);
} else {
+ // dummy entry for subsequent addition of the ExchangeNode
+ unionNode.addChild(null);
// Connect the unpartitioned child fragments to unionNode via a random exchange.
- connectChildFragment(unionNode, i, childFragment);
+ connectChildFragment(unionNode, i, unionFragment, childFragment);
childFragment.setOutputPartition(DataPartition.RANDOM);
}
}
-
- // Fragment contains the UnionNode that consumes the data of all child fragments.
- PlanFragment unionFragment = new PlanFragment(ctx_.getNextFragmentId(),
- unionNode, DataPartition.RANDOM);
unionNode.reorderOperands(ctx_.getRootAnalyzer());
unionNode.init(ctx_.getRootAnalyzer());
return unionFragment;
@@ -678,13 +687,14 @@ public class DistributedPlanner {
/**
* Replace node's child at index childIdx with an ExchangeNode that receives its
- * input from childFragment.
+ * input from childFragment. ParentFragment contains node and the new ExchangeNode.
*/
private void connectChildFragment(PlanNode node, int childIdx,
- PlanFragment childFragment) throws ImpalaException {
+ PlanFragment parentFragment, PlanFragment childFragment) throws ImpalaException {
ExchangeNode exchangeNode = new ExchangeNode(ctx_.getNextNodeId());
- exchangeNode.addChild(childFragment.getPlanRoot(), false);
+ exchangeNode.addChild(childFragment.getPlanRoot());
exchangeNode.init(ctx_.getRootAnalyzer());
+ exchangeNode.setFragment(parentFragment);
node.setChild(childIdx, exchangeNode);
childFragment.setDestination(exchangeNode);
}
@@ -703,7 +713,7 @@ public class DistributedPlanner {
PlanFragment childFragment, DataPartition parentPartition)
throws ImpalaException {
ExchangeNode exchangeNode = new ExchangeNode(ctx_.getNextNodeId());
- exchangeNode.addChild(childFragment.getPlanRoot(), false);
+ exchangeNode.addChild(childFragment.getPlanRoot());
exchangeNode.init(ctx_.getRootAnalyzer());
PlanFragment parentFragment = new PlanFragment(ctx_.getNextFragmentId(),
exchangeNode, parentPartition);
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/3b7d5b7c/fe/src/main/java/com/cloudera/impala/planner/ExchangeNode.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/com/cloudera/impala/planner/ExchangeNode.java b/fe/src/main/java/com/cloudera/impala/planner/ExchangeNode.java
index 22601b5..590f718 100644
--- a/fe/src/main/java/com/cloudera/impala/planner/ExchangeNode.java
+++ b/fe/src/main/java/com/cloudera/impala/planner/ExchangeNode.java
@@ -71,7 +71,7 @@ public class ExchangeNode extends PlanNode {
Preconditions.checkState(conjuncts_.isEmpty());
}
- public void addChild(PlanNode node, boolean copyConjuncts) {
+ public void addChild(PlanNode node) {
// This ExchangeNode 'inherits' several parameters from its children.
// Ensure that all children agree on them.
if (!children_.isEmpty()) {
@@ -84,7 +84,6 @@ public class ExchangeNode extends PlanNode {
tupleIds_ = Lists.newArrayList(node.tupleIds_);
nullableTupleIds_ = Sets.newHashSet(node.nullableTupleIds_);
}
- if (copyConjuncts) conjuncts_.addAll(Expr.cloneList(node.conjuncts_));
children_.add(node);
}
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/3b7d5b7c/fe/src/main/java/com/cloudera/impala/planner/HashJoinNode.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/com/cloudera/impala/planner/HashJoinNode.java b/fe/src/main/java/com/cloudera/impala/planner/HashJoinNode.java
index 10e0460..e641119 100644
--- a/fe/src/main/java/com/cloudera/impala/planner/HashJoinNode.java
+++ b/fe/src/main/java/com/cloudera/impala/planner/HashJoinNode.java
@@ -143,6 +143,12 @@ public class HashJoinNode extends JoinNode {
output.append(String.format("%s%s [%s]\n", prefix, getDisplayLabel(),
getDisplayLabelDetail()));
+ if (detailLevel.ordinal() > TExplainLevel.STANDARD.ordinal()) {
+ if (joinTableId_.isValid()) {
+ output.append(
+ detailPrefix + "hash-table-id=" + joinTableId_.toString() + "\n");
+ }
+ }
if (detailLevel.ordinal() > TExplainLevel.MINIMAL.ordinal()) {
output.append(detailPrefix + "hash predicates: ");
for (int i = 0; i < eqJoinConjuncts_.size(); ++i) {
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/3b7d5b7c/fe/src/main/java/com/cloudera/impala/planner/JoinBuildSink.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/com/cloudera/impala/planner/JoinBuildSink.java b/fe/src/main/java/com/cloudera/impala/planner/JoinBuildSink.java
new file mode 100644
index 0000000..2971bf8
--- /dev/null
+++ b/fe/src/main/java/com/cloudera/impala/planner/JoinBuildSink.java
@@ -0,0 +1,92 @@
+// Copyright 2016 Cloudera Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.cloudera.impala.planner;
+
+import java.util.List;
+
+import com.cloudera.impala.analysis.BinaryPredicate;
+import com.cloudera.impala.analysis.Expr;
+import com.cloudera.impala.thrift.TDataSink;
+import com.cloudera.impala.thrift.TDataSinkType;
+import com.cloudera.impala.thrift.TExplainLevel;
+import com.cloudera.impala.thrift.TJoinBuildSink;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+
+/**
+ * Sink to materialize the build side of a join.
+ */
+public class JoinBuildSink extends DataSink {
+ // id of join's build-side table assigned during planning
+ private final JoinTableId joinTableId_;
+
+ // build-side exprs (rhs of each eq-join conjunct); only populated for hash joins
+ private final List<Expr> buildExprs_ = Lists.newArrayList();
+
+ /**
+ * Creates sink for build side of 'joinNode' (extracts buildExprs_ from joinNode).
+ * 'joinTableId' must be non-null and valid; 'joinNode' must be non-null.
+ */
+ public JoinBuildSink(JoinTableId joinTableId, JoinNode joinNode) {
+ Preconditions.checkNotNull(joinTableId);
+ Preconditions.checkState(joinTableId.isValid());
+ Preconditions.checkNotNull(joinNode);
+ joinTableId_ = joinTableId;
+ // only hash joins contribute build exprs; other join types leave the list empty
+ if (!(joinNode instanceof HashJoinNode)) return;
+ for (Expr eqJoinConjunct: joinNode.getEqJoinConjuncts()) {
+ BinaryPredicate p = (BinaryPredicate) eqJoinConjunct;
+ // by convention the build exprs are the rhs of the join conjuncts
+ buildExprs_.add(p.getChild(1).clone());
+ }
+ }
+
+ public JoinTableId getJoinTableId() { return joinTableId_; }
+
+ @Override
+ protected TDataSink toThrift() {
+ TDataSink result = new TDataSink(TDataSinkType.JOIN_BUILD_SINK);
+ TJoinBuildSink tBuildSink = new TJoinBuildSink();
+ tBuildSink.setJoin_table_id(joinTableId_.asInt());
+ for (Expr buildExpr: buildExprs_) {
+ tBuildSink.addToBuild_exprs(buildExpr.treeToThrift());
+ }
+ result.setJoin_build_sink(tBuildSink);
+ return result;
+ }
+
+ @Override
+ public String getExplainString(String prefix, String detailPrefix,
+ TExplainLevel detailLevel) {
+ StringBuilder output = new StringBuilder();
+ output.append(String.format("%s%s\n", prefix, "JOIN BUILD"));
+ if (detailLevel.ordinal() > TExplainLevel.MINIMAL.ordinal()) {
+ output.append(
+ detailPrefix + "join-table-id=" + joinTableId_.toString()
+ + " plan-id=" + fragment_.getPlanId().toString()
+ + " cohort-id=" + fragment_.getCohortId().toString() + "\n");
+ if (!buildExprs_.isEmpty()) {
+ output.append(detailPrefix + "build expressions: ")
+ .append(Expr.toSql(buildExprs_) + "\n");
+ }
+ }
+ return output.toString();
+ }
+
+ @Override
+ public void computeCosts() {
+ // TODO: implement?
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/3b7d5b7c/fe/src/main/java/com/cloudera/impala/planner/JoinNode.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/com/cloudera/impala/planner/JoinNode.java b/fe/src/main/java/com/cloudera/impala/planner/JoinNode.java
index 19e4724..34de765 100644
--- a/fe/src/main/java/com/cloudera/impala/planner/JoinNode.java
+++ b/fe/src/main/java/com/cloudera/impala/planner/JoinNode.java
@@ -59,6 +59,10 @@ public abstract class JoinNode extends PlanNode {
protected List<BinaryPredicate> eqJoinConjuncts_;
protected List<Expr> otherJoinConjuncts_;
+ // if valid, the rhs input is materialized outside of this node and is assigned
+ // joinTableId_
+ protected JoinTableId joinTableId_ = JoinTableId.INVALID;
+
public enum DistributionMode {
NONE("NONE"),
BROADCAST("BROADCAST"),
@@ -129,6 +133,8 @@ public abstract class JoinNode extends PlanNode {
public DistributionMode getDistributionModeHint() { return distrModeHint_; }
public DistributionMode getDistributionMode() { return distrMode_; }
public void setDistributionMode(DistributionMode distrMode) { distrMode_ = distrMode; }
+ public JoinTableId getJoinTableId() { return joinTableId_; }
+ public void setJoinTableId(JoinTableId id) { joinTableId_ = id; }
@Override
public void init(Analyzer analyzer) throws ImpalaException {
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/3b7d5b7c/fe/src/main/java/com/cloudera/impala/planner/JoinTableId.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/com/cloudera/impala/planner/JoinTableId.java b/fe/src/main/java/com/cloudera/impala/planner/JoinTableId.java
new file mode 100644
index 0000000..4e0f542
--- /dev/null
+++ b/fe/src/main/java/com/cloudera/impala/planner/JoinTableId.java
@@ -0,0 +1,46 @@
+// Copyright 2016 Cloudera Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.cloudera.impala.planner;
+
+import com.cloudera.impala.common.Id;
+import com.cloudera.impala.common.IdGenerator;
+
+/**
+ * Identifies a join's build-side table that is materialized outside of the join
+ * node (e.g. by a JoinBuildSink). INVALID means no such table was assigned.
+ */
+public class JoinTableId extends Id<JoinTableId> {
+ // Sentinel for "no join table"; final so that it cannot be reassigned.
+ public static final JoinTableId INVALID = new JoinTableId(Id.INVALID_ID);
+
+ // Construction only allowed via an IdGenerator (or for the INVALID sentinel).
+ protected JoinTableId(int id) {
+ super(id);
+ }
+
+ public static IdGenerator<JoinTableId> createGenerator() {
+ return new IdGenerator<JoinTableId>() {
+ @Override
+ public JoinTableId getNextId() { return new JoinTableId(nextId_++); }
+ @Override
+ public JoinTableId getMaxId() { return new JoinTableId(nextId_ - 1); }
+ };
+ }
+
+ @Override
+ public String toString() {
+ return String.format("%02d", id_);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/3b7d5b7c/fe/src/main/java/com/cloudera/impala/planner/NestedLoopJoinNode.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/com/cloudera/impala/planner/NestedLoopJoinNode.java b/fe/src/main/java/com/cloudera/impala/planner/NestedLoopJoinNode.java
index 474a60c..fd9ef2f 100644
--- a/fe/src/main/java/com/cloudera/impala/planner/NestedLoopJoinNode.java
+++ b/fe/src/main/java/com/cloudera/impala/planner/NestedLoopJoinNode.java
@@ -93,6 +93,10 @@ public class NestedLoopJoinNode extends JoinNode {
displayName_, getDisplayLabelDetail()));
}
if (detailLevel.ordinal() >= TExplainLevel.STANDARD.ordinal()) {
+ if (joinTableId_.isValid()) {
+ output.append(
+ detailPrefix + "join table id: " + joinTableId_.toString() + "\n");
+ }
if (!otherJoinConjuncts_.isEmpty()) {
output.append(detailPrefix + "join predicates: ")
.append(getExplainString(otherJoinConjuncts_) + "\n");
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/3b7d5b7c/fe/src/main/java/com/cloudera/impala/planner/ParallelPlanner.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/com/cloudera/impala/planner/ParallelPlanner.java b/fe/src/main/java/com/cloudera/impala/planner/ParallelPlanner.java
new file mode 100644
index 0000000..f66ce15
--- /dev/null
+++ b/fe/src/main/java/com/cloudera/impala/planner/ParallelPlanner.java
@@ -0,0 +1,208 @@
+// Copyright 2016 Cloudera Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.cloudera.impala.planner;
+
+import java.util.List;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.cloudera.impala.common.IdGenerator;
+import com.google.common.base.Joiner;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Predicate;
+import com.google.common.collect.Lists;
+
+/**
+ * The parallel planner is responsible for breaking up a single distributed plan
+ * (= tree of PlanFragments) into a (logical) tree of distributed plans. The root
+ * of that tree produces the query result, all the other ones produce intermediate
+ * join build sides. All plans that produce intermediate join build sides (one per join
+ * node in the recipient) for a single recipient plan are grouped together into a
+ * cohort. Since each plan may only produce a build side for at most one recipient
+ * plan, each plan belongs to exactly one cohort.
+ *
+ * TODO: if the input to the JoinBuildSink is the result of a grouping aggregation
+ * on the join keys, the AggregationNode should materialize the final hash table
+ * directly (instead of reading the hash table content and feeding it into a
+ * JoinBuildSink to build another hash table)
+ *
+ * TODO: instead of cohort ids, create a Plan class that is a subclass of TreeNode?
+ */
+public class ParallelPlanner {
+ private final static Logger LOG = LoggerFactory.getLogger(ParallelPlanner.class);
+
+ private final IdGenerator<JoinTableId> joinTableIdGenerator_ =
+ JoinTableId.createGenerator();
+ private final IdGenerator<PlanId> planIdGenerator_ = PlanId.createGenerator();
+ private final IdGenerator<CohortId> cohortIdGenerator_ = CohortId.createGenerator();
+ private final PlannerContext ctx_;
+
+ // roots of all plans created by createPlans(); the query-result plan comes first
+ private final List<PlanFragment> planRoots_ = Lists.newArrayList();
+
+ public ParallelPlanner(PlannerContext ctx) { ctx_ = ctx; }
+
+ /**
+ * Given a distributed plan, return list of plans ready for parallel execution.
+ * The first plan in the list is 'root' and materializes the query result; the
+ * subsequent plans materialize the build sides of joins.
+ * Assigns cohortId and planId for all fragments.
+ * Must be called at most once per ParallelPlanner instance.
+ * TODO: create class DistributedPlan with a PlanFragment member, so we don't
+ * need to distinguish PlanFragment and Plan through comments?
+ */
+ public List<PlanFragment> createPlans(PlanFragment root) {
+ Preconditions.checkState(planRoots_.isEmpty());
+ root.setPlanId(planIdGenerator_.getNextId());
+ root.setCohortId(cohortIdGenerator_.getNextId());
+ planRoots_.add(root);
+ createBuildPlans(root, null);
+ return planRoots_;
+ }
+
+ /**
+ * Recursively traverse the tree of fragments rooted at 'fragment' from top to
+ * bottom and move all build inputs of joins into separate plans. 'buildCohortId'
+ * is the cohort id of the build plans of 'fragment' and may be null if the plan
+ * to which 'fragment' belongs has so far not required any build plans.
+ * Assigns fragment's plan id and cohort id to its children.
+ */
+ private void createBuildPlans(PlanFragment fragment, CohortId buildCohortId) {
+ LOG.trace("createbuildplans fragment {}", fragment.getId());
+ List<JoinNode> joins = Lists.newArrayList();
+ collectJoins(fragment.getPlanRoot(), joins);
+ if (!joins.isEmpty()) {
+ if (LOG.isTraceEnabled()) {
+ List<String> joinIds = Lists.newArrayList();
+ for (JoinNode join: joins) joinIds.add(join.getId().toString());
+ LOG.trace("collected joins " + Joiner.on(" ").join(joinIds));
+ }
+ // all build plans needed by this plan share one cohort, allocated lazily
+ if (buildCohortId == null) buildCohortId = cohortIdGenerator_.getNextId();
+ for (JoinNode join: joins) createBuildPlan(join, buildCohortId);
+ }
+
+ if (LOG.isTraceEnabled() && !fragment.getChildren().isEmpty()) {
+ List<String> ids = Lists.newArrayList();
+ for (PlanFragment c: fragment.getChildren()) ids.add(c.getId().toString());
+ LOG.trace("collected children " + Joiner.on(" ").join(ids) + " parent "
+ + fragment.getId().toString());
+ }
+ for (PlanFragment child: fragment.getChildren()) {
+ child.setPlanId(fragment.getPlanId());
+ child.setCohortId(fragment.getCohortId());
+ createBuildPlans(child, buildCohortId);
+ }
+ }
+
+ /**
+ * Collect all JoinNodes that aren't themselves the build side of a join node
+ * in this fragment or the rhs of a SubplanNode.
+ */
+ private void collectJoins(PlanNode node, List<JoinNode> result) {
+ if (node instanceof JoinNode) {
+ result.add((JoinNode)node);
+ // for joins, only descend through the probe side;
+ // we're recursively traversing the build side when constructing the build plan
+ // in createBuildPlan()
+ collectJoins(node.getChild(0), result);
+ return;
+ }
+ // ExchangeNodes mark the fragment boundary
+ if (node instanceof ExchangeNode) return;
+ if (node instanceof SubplanNode) {
+ collectJoins(node.getChild(0), result);
+ return;
+ }
+ for (PlanNode child: node.getChildren()) collectJoins(child, result);
+ }
+
+ /**
+ * Collect all ExchangeNodes in this fragment.
+ */
+ private void collectExchangeNodes(PlanNode node, List<ExchangeNode> result) {
+ if (node instanceof ExchangeNode) {
+ result.add((ExchangeNode)node);
+ return;
+ }
+ for (PlanNode child: node.getChildren()) collectExchangeNodes(child, result);
+ }
+
+ /**
+ * Create new plan that materializes build input of 'join' and assign it 'cohortId'.
+ * In the process, moves all fragments required for this materialization from tree
+ * rooted at 'join's fragment into the new plan.
+ * Also assigns the new plan a plan id.
+ */
+ private void createBuildPlan(JoinNode join, CohortId cohortId) {
+ LOG.trace("createbuildplan {}", join.getId());
+ Preconditions.checkNotNull(cohortId);
+ // collect all ExchangeNodes on the build side and their corresponding input
+ // fragments
+ final List<ExchangeNode> exchNodes = Lists.newArrayList();
+ collectExchangeNodes(join.getChild(1), exchNodes);
+
+ Predicate<PlanFragment> isInputFragment = new Predicate<PlanFragment>() {
+ @Override
+ public boolean apply(PlanFragment f) {
+ // we're starting with the fragment containing the join, which might
+ // be terminal
+ if (f.getDestNode() == null) return false;
+ for (ExchangeNode exch: exchNodes) {
+ // compare ids by value, not by object identity
+ if (exch.getId().equals(f.getDestNode().getId())) return true;
+ }
+ return false;
+ }
+ };
+ List<PlanFragment> inputFragments = Lists.newArrayList();
+ join.getFragment().collect(isInputFragment, inputFragments);
+ Preconditions.checkState(exchNodes.size() == inputFragments.size());
+
+ // Create new fragment with JoinBuildSink that consumes the output of the
+ // join's rhs input (the one that materializes the build side).
+ // The new fragment has the same data partition as the join node's fragment.
+ JoinBuildSink buildSink =
+ new JoinBuildSink(joinTableIdGenerator_.getNextId(), join);
+ join.setJoinTableId(buildSink.getJoinTableId());
+ // c'tor fixes up PlanNode.fragment_
+ PlanFragment buildFragment = new PlanFragment(ctx_.getNextFragmentId(),
+ join.getChild(1), join.getFragment().getDataPartition());
+ buildFragment.setSink(buildSink);
+
+ // the build-side ExchangeNodes now belong to the new fragment, so their input
+ // fragments are moved over as well
+ for (ExchangeNode exch: exchNodes) {
+ Preconditions.checkState(exch.getFragment() == buildFragment);
+ }
+ for (PlanFragment input: inputFragments) {
+ LOG.trace("re-link fragment {} to {}", input.getId(), buildFragment.getId());
+ join.getFragment().removeChild(input);
+ buildFragment.getChildren().add(input);
+ }
+
+ // assign plan and cohort id
+ buildFragment.setPlanId(planIdGenerator_.getNextId());
+ buildFragment.setCohortId(cohortId);
+
+ planRoots_.add(buildFragment);
+ LOG.trace("new build fragment {}", buildFragment.getId());
+ LOG.trace("in cohort {}", buildFragment.getCohortId());
+ LOG.trace("for join node {}", join.getId());
+ // the build plan may itself contain joins that require their own build plans
+ createBuildPlans(buildFragment, null);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/3b7d5b7c/fe/src/main/java/com/cloudera/impala/planner/PlanFragment.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/com/cloudera/impala/planner/PlanFragment.java b/fe/src/main/java/com/cloudera/impala/planner/PlanFragment.java
index f702271..85a4d40 100644
--- a/fe/src/main/java/com/cloudera/impala/planner/PlanFragment.java
+++ b/fe/src/main/java/com/cloudera/impala/planner/PlanFragment.java
@@ -29,26 +29,36 @@ import com.cloudera.impala.catalog.HdfsTable;
import com.cloudera.impala.common.AnalysisException;
import com.cloudera.impala.common.InternalException;
import com.cloudera.impala.common.NotImplementedException;
+import com.cloudera.impala.common.TreeNode;
import com.cloudera.impala.planner.JoinNode.DistributionMode;
import com.cloudera.impala.thrift.TExplainLevel;
import com.cloudera.impala.thrift.TPartitionType;
+import com.cloudera.impala.thrift.TPlan;
import com.cloudera.impala.thrift.TPlanFragment;
+import com.cloudera.impala.thrift.TPlanFragmentTree;
import com.google.common.base.Preconditions;
import com.google.common.base.Predicates;
import com.google.common.collect.Lists;
/**
- * A PlanFragment is part of a tree of such fragments that together make
- * up a complete execution plan for a single query. Each plan fragment can have
- * one or many instances, each of which in turn is executed by a single node and the
- * output sent to a specific instance of the destination fragment (or, in the case
- * of the root fragment, is materialized in some form).
+ * PlanFragments form a tree structure via their ExchangeNodes. A tree of fragments
+ * connected in that way forms a plan. The output of a plan is produced by the root
+ * fragment and is either the result of the query or an intermediate result
+ * needed by a different plan (such as a hash table).
*
- * The plan fragment encapsulates the specific tree of execution nodes that
+ * Plans are grouped into cohorts based on the consumer of their output: all
+ * plans that materialize intermediate results for a particular consumer plan
+ * are grouped into a single cohort.
+ *
+ * A PlanFragment encapsulates the specific tree of execution nodes that
* are used to produce the output of the plan fragment, as well as output exprs,
* destination node, etc. If there are no output exprs, the full row that is
* is produced by the plan root is marked as materialized.
*
+ * A plan fragment can have one or many instances, each of which in turn is executed by
+ * an individual node and the output sent to a specific instance of the destination
+ * fragment (or, in the case of the root fragment, is materialized in some form).
+ *
* A hash-partitioned plan fragment is the result of one or more hash-partitioning data
* streams being received by plan nodes in this fragment. In the future, a fragment's
* data partition could also be hash partitioned based on a scan node that is reading
@@ -59,11 +69,17 @@ import com.google.common.collect.Lists;
* - assemble with getters, etc.
* - finalize()
* - toThrift()
+ *
+ * TODO: the tree of PlanNodes is connected across fragment boundaries, which makes
+ * it impossible to search for things within a fragment (using TreeNode functions);
+ * fix that
*/
-public class PlanFragment {
+public class PlanFragment extends TreeNode<PlanFragment> {
private final static Logger LOG = LoggerFactory.getLogger(PlanFragment.class);
private final PlanFragmentId fragmentId_;
+ private PlanId planId_;
+ private CohortId cohortId_;
// root of plan tree executed by this fragment
private PlanNode planRoot_;
@@ -103,14 +119,26 @@ public class PlanFragment {
* Does not traverse the children of ExchangeNodes because those must belong to a
* different fragment.
*/
- private void setFragmentInPlanTree(PlanNode node) {
+ public void setFragmentInPlanTree(PlanNode node) {
if (node == null) return;
node.setFragment(this);
- if (!(node instanceof ExchangeNode)) {
- for (PlanNode child : node.getChildren()) {
- setFragmentInPlanTree(child);
- }
- }
+ if (node instanceof ExchangeNode) return;
+ for (PlanNode child : node.getChildren()) setFragmentInPlanTree(child);
+ }
+
+ /**
+ * Collect all PlanNodes that belong to the exec tree of this fragment.
+ */
+ public void collectPlanNodes(List<PlanNode> nodes) {
+ Preconditions.checkNotNull(nodes);
+ collectPlanNodesHelper(planRoot_, nodes);
+ }
+
+ private void collectPlanNodesHelper(PlanNode root, List<PlanNode> nodes) {
+ if (root == null) return;
+ nodes.add(root);
+ if (root instanceof ExchangeNode) return;
+ for (PlanNode child: root.getChildren()) collectPlanNodesHelper(child, nodes);
}
public void setOutputExprs(List<Expr> outputExprs) {
@@ -216,31 +244,56 @@ public class PlanFragment {
return result;
}
- public String getExplainString(TExplainLevel explainLevel) {
+ public TPlanFragmentTree treeToThrift() {
+ TPlanFragmentTree result = new TPlanFragmentTree();
+ treeToThriftHelper(result);
+ return result;
+ }
+
+ private void treeToThriftHelper(TPlanFragmentTree plan) {
+ plan.addToFragments(toThrift());
+ for (PlanFragment child: children_) {
+ child.treeToThriftHelper(plan);
+ }
+ }
+
+ public String getExplainString(TExplainLevel detailLevel) {
+ return getExplainString("", "", detailLevel);
+ }
+
+ /**
+ * The root of the output tree will be prefixed by rootPrefix and the remaining plan
+ * output will be prefixed by prefix.
+ */
+ protected final String getExplainString(String rootPrefix, String prefix,
+ TExplainLevel detailLevel) {
StringBuilder str = new StringBuilder();
Preconditions.checkState(dataPartition_ != null);
- String rootPrefix = "";
- String prefix = "";
- String detailPrefix = "| ";
- if (explainLevel == TExplainLevel.VERBOSE) {
+ String detailPrefix = prefix + "| "; // sink detail
+ if (detailLevel == TExplainLevel.VERBOSE) {
+ // we're printing a new tree, start over with the indentation
prefix = " ";
rootPrefix = " ";
detailPrefix = prefix + "| ";
str.append(String.format("%s:PLAN FRAGMENT [%s]\n", fragmentId_.toString(),
dataPartition_.getExplainString()));
if (sink_ != null && sink_ instanceof DataStreamSink) {
- str.append(sink_.getExplainString(prefix, detailPrefix, explainLevel) + "\n");
+ str.append(sink_.getExplainString(rootPrefix, prefix, detailLevel) + "\n");
}
}
- // Always print table sinks.
- if (sink_ != null && sink_ instanceof TableSink) {
- str.append(sink_.getExplainString(prefix, detailPrefix, explainLevel));
- if (explainLevel.ordinal() >= TExplainLevel.STANDARD.ordinal()) {
+
+ String planRootPrefix = rootPrefix;
+ // Always print sinks other than DataStreamSinks.
+ if (sink_ != null && !(sink_ instanceof DataStreamSink)) {
+ str.append(sink_.getExplainString(rootPrefix, detailPrefix, detailLevel));
+ if (detailLevel.ordinal() >= TExplainLevel.STANDARD.ordinal()) {
str.append(prefix + "|\n");
}
+ // we already used the root prefix for the sink
+ planRootPrefix = prefix;
}
if (planRoot_ != null) {
- str.append(planRoot_.getExplainString(rootPrefix, prefix, explainLevel));
+ str.append(planRoot_.getExplainString(planRootPrefix, prefix, detailLevel));
}
return str.toString();
}
@@ -251,6 +304,10 @@ public class PlanFragment {
}
public PlanFragmentId getId() { return fragmentId_; }
+ public PlanId getPlanId() { return planId_; }
+ public void setPlanId(PlanId id) { planId_ = id; }
+ public CohortId getCohortId() { return cohortId_; }
+ public void setCohortId(CohortId id) { cohortId_ = id; }
public PlanFragment getDestFragment() {
if (destNode_ == null) return null;
return destNode_.getFragment();
@@ -270,7 +327,13 @@ public class PlanFragment {
setFragmentInPlanTree(planRoot_);
}
- public void setDestination(ExchangeNode destNode) { destNode_ = destNode; }
+ public void setDestination(ExchangeNode destNode) {
+ destNode_ = destNode;
+ PlanFragment dest = getDestFragment();
+ Preconditions.checkNotNull(dest);
+ dest.addChild(this);
+ }
+
public boolean hasSink() { return sink_ != null; }
public DataSink getSink() { return sink_; }
public void setSink(DataSink sink) {
@@ -290,4 +353,33 @@ public class PlanFragment {
planRoot_ = newRoot;
planRoot_.setFragment(this);
}
+
+ /**
+ * Verify that the tree of PlanFragments and their contained tree of
+ * PlanNodes is constructed correctly.
+ */
+ public void verifyTree() {
+ // PlanNode.fragment_ is set correctly
+ List<PlanNode> nodes = Lists.newArrayList();
+ collectPlanNodes(nodes);
+ List<PlanNode> exchNodes = Lists.newArrayList();
+ for (PlanNode node: nodes) {
+ if (node instanceof ExchangeNode) exchNodes.add(node);
+ Preconditions.checkState(node.getFragment() == this);
+ }
+
+ // all ExchangeNodes have registered input fragments
+ Preconditions.checkState(exchNodes.size() == getChildren().size());
+ List<PlanFragment> childFragments = Lists.newArrayList();
+ for (PlanNode exchNode: exchNodes) {
+ PlanFragment childFragment = exchNode.getChild(0).getFragment();
+ Preconditions.checkState(!childFragments.contains(childFragment));
+ childFragments.add(childFragment);
+ Preconditions.checkState(childFragment.getDestNode() == exchNode);
+ }
+ // all registered children are accounted for
+ Preconditions.checkState(getChildren().containsAll(childFragments));
+
+ for (PlanFragment child: getChildren()) child.verifyTree();
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/3b7d5b7c/fe/src/main/java/com/cloudera/impala/planner/PlanId.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/com/cloudera/impala/planner/PlanId.java b/fe/src/main/java/com/cloudera/impala/planner/PlanId.java
new file mode 100644
index 0000000..1c7213a
--- /dev/null
+++ b/fe/src/main/java/com/cloudera/impala/planner/PlanId.java
@@ -0,0 +1,45 @@
+// Copyright 2016 Cloudera Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.cloudera.impala.planner;
+
+import com.cloudera.impala.common.Id;
+import com.cloudera.impala.common.IdGenerator;
+
+/**
+ * Identifies a plan, i.e. a tree of PlanFragments; all fragments of a plan
+ * share the same PlanId (see PlanFragment.setPlanId()).
+ */
+public class PlanId extends Id<PlanId> {
+ // Instances are normally obtained from the generator returned by createGenerator().
+ protected PlanId(int value) {
+ super(value);
+ }
+
+ /**
+ * Returns a new generator that hands out PlanIds with increasing values.
+ */
+ public static IdGenerator<PlanId> createGenerator() {
+ return new IdGenerator<PlanId>() {
+ @Override
+ public PlanId getNextId() { return new PlanId(nextId_++); }
+ @Override
+ public PlanId getMaxId() { return new PlanId(nextId_ - 1); }
+ };
+ }
+
+ /** Formats the id as a zero-padded number with at least two digits. */
+ @Override
+ public String toString() { return String.format("%02d", id_); }
+}
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/3b7d5b7c/fe/src/main/java/com/cloudera/impala/planner/PlanNode.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/com/cloudera/impala/planner/PlanNode.java b/fe/src/main/java/com/cloudera/impala/planner/PlanNode.java
index c393b4f..895df78 100644
--- a/fe/src/main/java/com/cloudera/impala/planner/PlanNode.java
+++ b/fe/src/main/java/com/cloudera/impala/planner/PlanNode.java
@@ -314,9 +314,17 @@ abstract public class PlanNode extends TreeNode<PlanNode> {
String childHeadlinePrefix = prefix + "|--";
String childDetailPrefix = prefix + "| ";
for (int i = children_.size() - 1; i >= 1; --i) {
- expBuilder.append(
- children_.get(i).getExplainString(childHeadlinePrefix, childDetailPrefix,
- detailLevel));
+ PlanNode child = getChild(i);
+ if (fragment_ != child.fragment_) {
+ // we're crossing a fragment boundary
+ expBuilder.append(
+ child.fragment_.getExplainString(
+ childHeadlinePrefix, childDetailPrefix, detailLevel));
+ } else {
+ expBuilder.append(
+ child.getExplainString(childHeadlinePrefix, childDetailPrefix,
+ detailLevel));
+ }
if (printFiller) expBuilder.append(filler + "\n");
}
expBuilder.append(children_.get(0).getExplainString(prefix, prefix, detailLevel));
@@ -327,7 +335,7 @@ abstract public class PlanNode extends TreeNode<PlanNode> {
/**
* Return the node-specific details.
* Subclass should override this function.
- * Each line should be prefix by detailPrefix.
+ * Each line should be prefixed by detailPrefix.
*/
protected String getNodeExplainString(String rootPrefix, String detailPrefix,
TExplainLevel detailLevel) {
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/3b7d5b7c/fe/src/main/java/com/cloudera/impala/planner/Planner.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/com/cloudera/impala/planner/Planner.java b/fe/src/main/java/com/cloudera/impala/planner/Planner.java
index cc6f923..f639c59 100644
--- a/fe/src/main/java/com/cloudera/impala/planner/Planner.java
+++ b/fe/src/main/java/com/cloudera/impala/planner/Planner.java
@@ -96,6 +96,7 @@ public class Planner {
}
PlanFragment rootFragment = fragments.get(fragments.size() - 1);
+ rootFragment.verifyTree();
ExprSubstitutionMap rootNodeSmap = rootFragment.getPlanRoot().getOutputSmap();
List<Expr> resultExprs = null;
if (ctx_.isInsertOrCtas()) {
@@ -161,6 +162,19 @@ public class Planner {
}
/**
+ * Return a list of plans, each represented by the root of their fragment trees.
+ * TODO: roll into createPlan()
+ */
+ public List<PlanFragment> createParallelPlans() throws ImpalaException {
+ ArrayList<PlanFragment> distrPlan = createPlan();
+ Preconditions.checkNotNull(distrPlan);
+ ParallelPlanner planner = new ParallelPlanner(ctx_);
+ List<PlanFragment> parallelPlans = planner.createPlans(distrPlan.get(0));
+ ctx_.getRootAnalyzer().getTimeline().markEvent("Parallel plans created");
+ return parallelPlans;
+ }
+
+ /**
* Return combined explain string for all plan fragments.
* Includes the estimated resource requirements from the request if set.
*/
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/3b7d5b7c/fe/src/main/java/com/cloudera/impala/service/Frontend.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/com/cloudera/impala/service/Frontend.java b/fe/src/main/java/com/cloudera/impala/service/Frontend.java
index 4d21895..fc1f2fc 100644
--- a/fe/src/main/java/com/cloudera/impala/service/Frontend.java
+++ b/fe/src/main/java/com/cloudera/impala/service/Frontend.java
@@ -118,6 +118,7 @@ import com.cloudera.impala.thrift.TLoadDataReq;
import com.cloudera.impala.thrift.TLoadDataResp;
import com.cloudera.impala.thrift.TMetadataOpRequest;
import com.cloudera.impala.thrift.TPlanFragment;
+import com.cloudera.impala.thrift.TPlanFragmentTree;
import com.cloudera.impala.thrift.TQueryCtx;
import com.cloudera.impala.thrift.TQueryExecRequest;
import com.cloudera.impala.thrift.TResetMetadataRequest;
@@ -947,6 +948,23 @@ public class Frontend {
// create plan
LOG.debug("create plan");
Planner planner = new Planner(analysisResult, queryCtx);
+ if (RuntimeEnv.INSTANCE.isTestEnv()
+ && queryCtx.request.query_options.mt_num_cores != 1) {
+ // TODO: this is just to be able to run tests; implement this
+ List<PlanFragment> planRoots = planner.createParallelPlans();
+ for (PlanFragment planRoot: planRoots) {
+ TPlanFragmentTree thriftPlan = planRoot.treeToThrift();
+ queryExecRequest.addToMt_plans(thriftPlan);
+ }
+ queryExecRequest.setDesc_tbl(analysisResult.getAnalyzer().getDescTbl().toThrift());
+ queryExecRequest.setQuery_ctx(queryCtx);
+ explainString.append(planner.getExplainString(
+ Lists.newArrayList(planRoots.get(0)), queryExecRequest,
+ TExplainLevel.STANDARD));
+ queryExecRequest.setQuery_plan(explainString.toString());
+ result.setQuery_exec_request(queryExecRequest);
+ return result;
+ }
ArrayList<PlanFragment> fragments = planner.createPlan();
List<ScanNode> scanNodes = Lists.newArrayList();
@@ -954,11 +972,11 @@ public class Frontend {
// queryExecRequest.dest_fragment_idx
Map<PlanFragment, Integer> fragmentIdx = Maps.newHashMap();
- for (int fragmentId = 0; fragmentId < fragments.size(); ++fragmentId) {
- PlanFragment fragment = fragments.get(fragmentId);
+ for (int idx = 0; idx < fragments.size(); ++idx) {
+ PlanFragment fragment = fragments.get(idx);
Preconditions.checkNotNull(fragment.getPlanRoot());
fragment.getPlanRoot().collect(Predicates.instanceOf(ScanNode.class), scanNodes);
- fragmentIdx.put(fragment, fragmentId);
+ fragmentIdx.put(fragment, idx);
}
// set fragment destinations
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/3b7d5b7c/fe/src/test/java/com/cloudera/impala/planner/PlannerTestBase.java
----------------------------------------------------------------------
diff --git a/fe/src/test/java/com/cloudera/impala/planner/PlannerTestBase.java b/fe/src/test/java/com/cloudera/impala/planner/PlannerTestBase.java
index 7abd81a..ce8980e 100644
--- a/fe/src/test/java/com/cloudera/impala/planner/PlannerTestBase.java
+++ b/fe/src/test/java/com/cloudera/impala/planner/PlannerTestBase.java
@@ -375,10 +375,9 @@ public class PlannerTestBase {
* locations to actualScanRangeLocations; compares both to the appropriate sections
* of 'testCase'.
*/
- private void RunTestCase(TestCase testCase, StringBuilder errorLog,
+ private void runTestCase(TestCase testCase, StringBuilder errorLog,
StringBuilder actualOutput, String dbName, TQueryOptions options)
throws CatalogException {
-
if (options == null) {
options = defaultQueryOptions();
} else {
@@ -396,11 +395,13 @@ public class PlannerTestBase {
queryCtx.request.query_options = options;
// Test single node plan, scan range locations, and column lineage.
TExecRequest singleNodeExecRequest =
- testPlan(testCase, true, queryCtx, errorLog, actualOutput);
+ testPlan(testCase, Section.PLAN, queryCtx, errorLog, actualOutput);
checkScanRangeLocations(testCase, singleNodeExecRequest, errorLog, actualOutput);
checkColumnLineage(testCase, singleNodeExecRequest, errorLog, actualOutput);
// Test distributed plan.
- testPlan(testCase, false, queryCtx, errorLog, actualOutput);
+ testPlan(testCase, Section.DISTRIBUTEDPLAN, queryCtx, errorLog, actualOutput);
+ // test parallel plans
+ testPlan(testCase, Section.PARALLELPLANS, queryCtx, errorLog, actualOutput);
}
/**
@@ -410,17 +411,20 @@ public class PlannerTestBase {
* Returns the produced exec request or null if there was an error generating
* the plan.
*/
- private TExecRequest testPlan(TestCase testCase, boolean singleNodePlan,
+ private TExecRequest testPlan(TestCase testCase, Section section,
TQueryCtx queryCtx, StringBuilder errorLog, StringBuilder actualOutput) {
- Section section = (singleNodePlan) ? Section.PLAN : Section.DISTRIBUTEDPLAN;
String query = testCase.getQuery();
queryCtx.request.setStmt(query);
if (section == Section.PLAN) {
queryCtx.request.getQuery_options().setNum_nodes(1);
} else {
+ // for distributed and parallel execution we want to run on all available nodes
queryCtx.request.getQuery_options().setNum_nodes(
ImpalaInternalServiceConstants.NUM_NODES_ALL);
}
+ if (section == Section.PARALLELPLANS) {
+ queryCtx.request.query_options.mt_num_cores = 2;
+ }
ArrayList<String> expectedPlan = testCase.getSectionContents(section);
boolean sectionExists = expectedPlan != null && !expectedPlan.isEmpty();
String expectedErrorMsg = getExpectedErrorMessage(expectedPlan);
@@ -614,7 +618,7 @@ public class PlannerTestBase {
actualOutput.append(testCase.getSectionAsString(Section.QUERY, true, "\n"));
actualOutput.append("\n");
try {
- RunTestCase(testCase, errorLog, actualOutput, dbName, options);
+ runTestCase(testCase, errorLog, actualOutput, dbName, options);
} catch (CatalogException e) {
errorLog.append(String.format("Failed to plan query\n%s\n%s",
testCase.getQuery(), e.getMessage()));
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/3b7d5b7c/fe/src/test/java/com/cloudera/impala/testutil/TestFileParser.java
----------------------------------------------------------------------
diff --git a/fe/src/test/java/com/cloudera/impala/testutil/TestFileParser.java b/fe/src/test/java/com/cloudera/impala/testutil/TestFileParser.java
index cac975a..eb1ca6d 100644
--- a/fe/src/test/java/com/cloudera/impala/testutil/TestFileParser.java
+++ b/fe/src/test/java/com/cloudera/impala/testutil/TestFileParser.java
@@ -61,6 +61,7 @@ public class TestFileParser {
RESULTS,
PLAN,
DISTRIBUTEDPLAN,
+ PARALLELPLANS,
FILEERRORS,
PARTITIONS,
SETUP,
@@ -177,10 +178,14 @@ public class TestFileParser {
* Returns false if the current test case is invalid due to missing sections or query
*/
public boolean isValid() {
- return !getQuery().isEmpty() && (!getSectionContents(Section.PLAN).isEmpty() ||
- !getSectionContents(Section.DISTRIBUTEDPLAN).isEmpty() ||
- !getSectionContents(Section.LINEAGE).isEmpty());
+ // Valid = a non-empty query plus at least one non-empty PLAN, DISTRIBUTEDPLAN,
+ // PARALLELPLANS or LINEAGE section (the sections the planner tests compare against).
+ return !getQuery().isEmpty()
+ && (!getSectionContents(Section.PLAN).isEmpty()
+ || !getSectionContents(Section.DISTRIBUTEDPLAN).isEmpty()
+ || !getSectionContents(Section.PARALLELPLANS).isEmpty()
+ || !getSectionContents(Section.LINEAGE).isEmpty());
}
+
+ /**
+ * Returns true if expectedResultSections is empty, presumably meaning no section was
+ * added to this test case via addSection(). The parser uses this to skip empty test
+ * cases (e.g. trailing content after the last "====") instead of validating or
+ * returning them.
+ */
+ public boolean isEmpty() { return expectedResultSections.isEmpty(); }
}
private final List<TestCase> testCases = Lists.newArrayList();
@@ -243,7 +248,7 @@ public class TestFileParser {
++lineNum;
if (line.startsWith("====") && sectionCount > 0) {
currentTestCase.addSection(currentSection, sectionContents);
- if(!currentTestCase.isValid()) {
+ if (!currentTestCase.isValid()) {
throw new IllegalStateException("Invalid test case" +
" at line " + currentTestCase.startLineNum + " detected.");
}
@@ -288,7 +293,7 @@ public class TestFileParser {
}
}
- if(!currentTestCase.isValid()) {
+ if (!currentTestCase.isEmpty() && !currentTestCase.isValid()) {
throw new IllegalStateException("Invalid test case" +
" at line " + currentTestCase.startLineNum + " detected.");
}
@@ -308,7 +313,8 @@ public class TestFileParser {
open(table);
testCases.clear();
while (scanner.hasNextLine()) {
- testCases.add(parseOneTestCase());
+ TestCase testCase = parseOneTestCase();
+ if (!testCase.isEmpty()) testCases.add(testCase);
}
} finally {
close();