You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tajo.apache.org by ji...@apache.org on 2016/03/04 08:12:48 UTC
tajo git commit: TAJO-2082: Aggregation on a derived table which includes union can cause incorrect result.
Repository: tajo
Updated Branches:
refs/heads/master fdb76ed2c -> 7b0af7448
TAJO-2082: Aggregation on a derived table which includes union can cause incorrect result.
Closes #969
Project: http://git-wip-us.apache.org/repos/asf/tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/7b0af744
Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/7b0af744
Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/7b0af744
Branch: refs/heads/master
Commit: 7b0af74483521615f302d2a3376556dad325297f
Parents: fdb76ed
Author: Jihoon Son <ji...@apache.org>
Authored: Fri Mar 4 16:12:16 2016 +0900
Committer: Jihoon Son <ji...@apache.org>
Committed: Fri Mar 4 16:12:16 2016 +0900
----------------------------------------------------------------------
CHANGES | 3 +
.../TooLargeInputForCrossJoinException.java | 2 +-
.../apache/tajo/util/graph/DirectedGraph.java | 3 +-
.../tajo/util/graph/DirectedGraphVisitor.java | 4 +-
.../tajo/util/graph/SimpleDirectedGraph.java | 13 +-
.../util/graph/TestSimpleDirectedGraph.java | 3 +-
.../tajo/engine/planner/global/DataChannel.java | 8 +
.../engine/planner/global/ExecutionBlock.java | 161 +++++++++-----
.../engine/planner/global/GlobalPlanner.java | 21 +-
.../tajo/engine/planner/global/MasterPlan.java | 51 ++++-
.../global/builder/DistinctGroupbyBuilder.java | 3 +-
.../rewriter/rules/BroadcastJoinRule.java | 9 +-
.../planner/physical/ExternalSortExec.java | 2 +-
.../apache/tajo/querymaster/Repartitioner.java | 29 ++-
.../java/org/apache/tajo/querymaster/Stage.java | 222 +++++++++++--------
.../plan/rewrite/SelfDescSchemaBuildPhase.java | 2 +-
.../org/apache/tajo/plan/util/PlannerUtil.java | 2 +-
17 files changed, 343 insertions(+), 195 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tajo/blob/7b0af744/CHANGES
----------------------------------------------------------------------
diff --git a/CHANGES b/CHANGES
index 7892aa2..592bfe3 100644
--- a/CHANGES
+++ b/CHANGES
@@ -106,6 +106,9 @@ Release 0.12.0 - unreleased
BUG FIXES
+ TAJO-2082: Aggregation on a derived table which includes union can cause
+ incorrect result. (jihoon)
+
TAJO-2081: Incorrect task locality on single node. (jinho)
TAJO-2080: ArrayIndexOutOfBoundsException when performing aggregation on an
http://git-wip-us.apache.org/repos/asf/tajo/blob/7b0af744/tajo-common/src/main/java/org/apache/tajo/exception/TooLargeInputForCrossJoinException.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/exception/TooLargeInputForCrossJoinException.java b/tajo-common/src/main/java/org/apache/tajo/exception/TooLargeInputForCrossJoinException.java
index 55d5f46..d958cc6 100644
--- a/tajo-common/src/main/java/org/apache/tajo/exception/TooLargeInputForCrossJoinException.java
+++ b/tajo-common/src/main/java/org/apache/tajo/exception/TooLargeInputForCrossJoinException.java
@@ -33,6 +33,6 @@ public class TooLargeInputForCrossJoinException extends TajoException {
}
public TooLargeInputForCrossJoinException(String[] relations, long currentBroadcastThreshold) {
- super(ResultCode.TOO_LARGE_INPUT_FOR_CROSS_JOIN, StringUtils.join(relations), currentBroadcastThreshold + " MB");
+ super(ResultCode.TOO_LARGE_INPUT_FOR_CROSS_JOIN, StringUtils.join(relations), currentBroadcastThreshold + " KB");
}
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/7b0af744/tajo-common/src/main/java/org/apache/tajo/util/graph/DirectedGraph.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/util/graph/DirectedGraph.java b/tajo-common/src/main/java/org/apache/tajo/util/graph/DirectedGraph.java
index d8d5ced..ae61ed8 100644
--- a/tajo-common/src/main/java/org/apache/tajo/util/graph/DirectedGraph.java
+++ b/tajo-common/src/main/java/org/apache/tajo/util/graph/DirectedGraph.java
@@ -19,6 +19,7 @@
package org.apache.tajo.util.graph;
import org.apache.tajo.annotation.Nullable;
+import org.apache.tajo.exception.TajoException;
import java.util.List;
@@ -60,5 +61,5 @@ public interface DirectedGraph<V, E> extends Graph<V, E> {
/**
* It visits all vertices in a post-order traverse way.
*/
- <CONTEXT> void accept(CONTEXT context, V src, DirectedGraphVisitor<CONTEXT, V> visitor);
+ <CONTEXT> void accept(CONTEXT context, V src, DirectedGraphVisitor<CONTEXT, V> visitor) throws TajoException;
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/7b0af744/tajo-common/src/main/java/org/apache/tajo/util/graph/DirectedGraphVisitor.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/util/graph/DirectedGraphVisitor.java b/tajo-common/src/main/java/org/apache/tajo/util/graph/DirectedGraphVisitor.java
index 8e0ce87..86f1856 100644
--- a/tajo-common/src/main/java/org/apache/tajo/util/graph/DirectedGraphVisitor.java
+++ b/tajo-common/src/main/java/org/apache/tajo/util/graph/DirectedGraphVisitor.java
@@ -18,8 +18,10 @@
package org.apache.tajo.util.graph;
+import org.apache.tajo.exception.TajoException;
+
import java.util.Stack;
public interface DirectedGraphVisitor<CONTEXT, V> {
- void visit(CONTEXT context, Stack<V> stack, V v);
+ void visit(CONTEXT context, Stack<V> stack, V v) throws TajoException;
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/7b0af744/tajo-common/src/main/java/org/apache/tajo/util/graph/SimpleDirectedGraph.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/util/graph/SimpleDirectedGraph.java b/tajo-common/src/main/java/org/apache/tajo/util/graph/SimpleDirectedGraph.java
index b5e36e7..e1ba137 100644
--- a/tajo-common/src/main/java/org/apache/tajo/util/graph/SimpleDirectedGraph.java
+++ b/tajo-common/src/main/java/org/apache/tajo/util/graph/SimpleDirectedGraph.java
@@ -21,6 +21,8 @@ package org.apache.tajo.util.graph;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import org.apache.tajo.annotation.Nullable;
+import org.apache.tajo.exception.TajoException;
+import org.apache.tajo.exception.TajoRuntimeException;
import org.apache.tajo.util.TUtil;
import java.util.*;
@@ -219,13 +221,14 @@ public class SimpleDirectedGraph<V, E> implements DirectedGraph<V,E> {
}
@Override
- public <CONTEXT> void accept(CONTEXT context, V source, DirectedGraphVisitor<CONTEXT, V> visitor) {
+ public <CONTEXT> void accept(CONTEXT context, V source, DirectedGraphVisitor<CONTEXT, V> visitor)
+ throws TajoException {
Stack<V> stack = new Stack<>();
visitRecursive(context, stack, source, visitor);
}
private <CONTEXT> void visitRecursive(CONTEXT context, Stack<V> stack, V current,
- DirectedGraphVisitor<CONTEXT, V> visitor) {
+ DirectedGraphVisitor<CONTEXT, V> visitor) throws TajoException {
stack.push(current);
for (V child : getChilds(current)) {
visitRecursive(context, stack, child, visitor);
@@ -249,7 +252,11 @@ public class SimpleDirectedGraph<V, E> implements DirectedGraph<V,E> {
public String toStringGraph(V vertex) {
StringBuilder sb = new StringBuilder();
QueryGraphTopologyStringBuilder visitor = new QueryGraphTopologyStringBuilder();
- accept(null, vertex, visitor);
+ try {
+ accept(null, vertex, visitor);
+ } catch (TajoException e) {
+ throw new TajoRuntimeException(e);
+ }
Stack<DepthString> depthStrings = visitor.getDepthStrings();
while(!depthStrings.isEmpty()) {
sb.append(printDepthString(depthStrings.pop()));
http://git-wip-us.apache.org/repos/asf/tajo/blob/7b0af744/tajo-common/src/test/java/org/apache/tajo/util/graph/TestSimpleDirectedGraph.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/test/java/org/apache/tajo/util/graph/TestSimpleDirectedGraph.java b/tajo-common/src/test/java/org/apache/tajo/util/graph/TestSimpleDirectedGraph.java
index 45cde2a..9ebb69b 100644
--- a/tajo-common/src/test/java/org/apache/tajo/util/graph/TestSimpleDirectedGraph.java
+++ b/tajo-common/src/test/java/org/apache/tajo/util/graph/TestSimpleDirectedGraph.java
@@ -20,6 +20,7 @@ package org.apache.tajo.util.graph;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.tajo.exception.TajoException;
import org.apache.tajo.util.graph.DirectedGraphVisitor;
import org.apache.tajo.util.graph.SimpleDirectedGraph;
import org.junit.Test;
@@ -34,7 +35,7 @@ public class TestSimpleDirectedGraph {
private static final Log LOG = LogFactory.getLog(TestSimpleDirectedGraph.class);
@Test
- public final void test() {
+ public final void test() throws TajoException {
SimpleDirectedGraph<String, Integer> graph = new SimpleDirectedGraph<>();
// root
http://git-wip-us.apache.org/repos/asf/tajo/blob/7b0af744/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/DataChannel.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/DataChannel.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/DataChannel.java
index c779d2f..10e9973 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/DataChannel.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/DataChannel.java
@@ -99,6 +99,14 @@ public class DataChannel {
return shuffleType;
}
+ public boolean isHashShuffle() {
+ return shuffleType == ShuffleType.HASH_SHUFFLE || shuffleType == ShuffleType.SCATTERED_HASH_SHUFFLE;
+ }
+
+ public boolean isRangeShuffle() {
+ return shuffleType == ShuffleType.RANGE_SHUFFLE;
+ }
+
public boolean needShuffle() {
return shuffleType != ShuffleType.NONE_SHUFFLE;
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/7b0af744/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/ExecutionBlock.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/ExecutionBlock.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/ExecutionBlock.java
index 573f5aa..fde05c5 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/ExecutionBlock.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/ExecutionBlock.java
@@ -16,7 +16,10 @@ package org.apache.tajo.engine.planner.global;
import org.apache.tajo.ExecutionBlockId;
import org.apache.tajo.engine.planner.enforce.Enforcer;
+import org.apache.tajo.exception.TajoException;
+import org.apache.tajo.plan.LogicalPlan;
import org.apache.tajo.plan.logical.*;
+import org.apache.tajo.plan.visitor.BasicLogicalPlanVisitor;
import java.util.*;
@@ -30,19 +33,14 @@ import java.util.*;
public class ExecutionBlock {
private ExecutionBlockId executionBlockId;
private LogicalNode plan = null;
- private StoreTableNode store = null;
- private List<ScanNode> scanlist = new ArrayList<>();
private Enforcer enforcer = new Enforcer();
-
// Actual ScanNode's ExecutionBlockId -> Delegated ScanNode's ExecutionBlockId.
private Map<ExecutionBlockId, ExecutionBlockId> unionScanMap = new HashMap<>();
- private boolean hasJoinPlan;
- private boolean hasUnionPlan;
- private boolean isUnionOnly;
-
private Map<String, ScanNode> broadcastRelations = new HashMap<>();
+ private PlanContext planContext;
+
/*
* An execution block is null-supplying or preserved-row when its output is used as an input for outer join.
* These properties are decided based on the type of parent execution block's outer join.
@@ -98,52 +96,16 @@ public class ExecutionBlock {
return executionBlockId;
}
- public void setPlan(LogicalNode plan) {
- hasJoinPlan = false;
- hasUnionPlan = false;
- isUnionOnly = true;
- this.scanlist.clear();
+ public void setPlan(LogicalNode plan) throws TajoException {
this.plan = plan;
if (plan == null) {
return;
}
- LogicalNode node = plan;
- ArrayList<LogicalNode> s = new ArrayList<>();
- s.add(node);
- while (!s.isEmpty()) {
- node = s.remove(s.size()-1);
- // TODO: the below code should be improved to handle every case
- if (isUnionOnly && node.getType() != NodeType.ROOT && node.getType() != NodeType.TABLE_SUBQUERY &&
- node.getType() != NodeType.SCAN && node.getType() != NodeType.PARTITIONS_SCAN &&
- node.getType() != NodeType.UNION && node.getType() != NodeType.PROJECTION) {
- isUnionOnly = false;
- }
- if (node instanceof UnaryNode) {
- UnaryNode unary = (UnaryNode) node;
- s.add(s.size(), unary.getChild());
- } else if (node instanceof BinaryNode) {
- BinaryNode binary = (BinaryNode) node;
- if (binary.getType() == NodeType.JOIN) {
- hasJoinPlan = true;
- } else if (binary.getType() == NodeType.UNION) {
- hasUnionPlan = true;
- }
- s.add(s.size(), binary.getLeftChild());
- s.add(s.size(), binary.getRightChild());
- } else if (node instanceof ScanNode) {
- scanlist.add((ScanNode)node);
- } else if (node instanceof TableSubQueryNode) {
- TableSubQueryNode subQuery = (TableSubQueryNode) node;
- s.add(s.size(), subQuery.getSubQuery());
- } else if (node instanceof StoreTableNode) {
- store = (StoreTableNode)node;
- }
- }
- if (!hasUnionPlan) {
- isUnionOnly = false;
- }
+ final PlanVisitor visitor = new PlanVisitor();
+ planContext = new PlanContext();
+ visitor.visit(planContext, null, null, plan, new Stack<>());
}
public void addUnionScan(ExecutionBlockId realScanEbId, ExecutionBlockId delegatedScanEbId) {
@@ -163,12 +125,12 @@ public class ExecutionBlock {
}
public StoreTableNode getStoreTableNode() {
- return store;
+ return planContext.store;
}
public int getNonBroadcastRelNum() {
int nonBroadcastRelNum = 0;
- for (ScanNode scanNode : scanlist) {
+ for (ScanNode scanNode : planContext.scanlist) {
if (!broadcastRelations.containsKey(scanNode.getCanonicalName())) {
nonBroadcastRelNum++;
}
@@ -177,19 +139,23 @@ public class ExecutionBlock {
}
public ScanNode [] getScanNodes() {
- return this.scanlist.toArray(new ScanNode[scanlist.size()]);
+ return planContext.scanlist.toArray(new ScanNode[planContext.scanlist.size()]);
}
public boolean hasJoin() {
- return hasJoinPlan;
+ return planContext.hasJoinPlan;
}
public boolean hasUnion() {
- return hasUnionPlan;
+ return planContext.hasUnionPlan;
+ }
+
+ public boolean hasAgg() {
+ return planContext.hasAggPlan;
}
public boolean isUnionOnly() {
- return isUnionOnly;
+ return planContext.isUnionOnly();
}
public void addBroadcastRelation(ScanNode relationNode) {
@@ -235,4 +201,93 @@ public class ExecutionBlock {
public boolean isPreservedRow() {
return preservedRow;
}
+
+ private class PlanContext {
+ StoreTableNode store = null;
+
+ List<ScanNode> scanlist = new ArrayList<>();
+
+ boolean hasJoinPlan = false;
+ boolean hasUnionPlan = false;
+ boolean hasAggPlan = false;
+ boolean hasSortPlan = false;
+
+ boolean isUnionOnly() {
+ return hasUnionPlan && !hasJoinPlan && !hasAggPlan && !hasSortPlan;
+ }
+ }
+
+ private class PlanVisitor extends BasicLogicalPlanVisitor<PlanContext, LogicalNode> {
+
+ @Override
+ public LogicalNode visitJoin(PlanContext context, LogicalPlan plan, LogicalPlan.QueryBlock block, JoinNode node,
+ Stack<LogicalNode> stack) throws TajoException {
+ context.hasJoinPlan = true;
+ return super.visitJoin(context, plan, block, node, stack);
+ }
+
+ @Override
+ public LogicalNode visitGroupBy(PlanContext context, LogicalPlan plan, LogicalPlan.QueryBlock block, GroupbyNode node,
+ Stack<LogicalNode> stack) throws TajoException {
+ context.hasAggPlan = true;
+ return super.visitGroupBy(context, plan, block, node, stack);
+ }
+
+ @Override
+ public LogicalNode visitWindowAgg(PlanContext context, LogicalPlan plan, LogicalPlan.QueryBlock block, WindowAggNode node,
+ Stack<LogicalNode> stack) throws TajoException {
+ context.hasAggPlan = true;
+ return super.visitWindowAgg(context, plan, block, node, stack);
+ }
+
+ @Override
+ public LogicalNode visitDistinctGroupby(PlanContext context, LogicalPlan plan, LogicalPlan.QueryBlock block,
+ DistinctGroupbyNode node, Stack<LogicalNode> stack) throws TajoException {
+ context.hasAggPlan = true;
+ return super.visitDistinctGroupby(context, plan, block, node, stack);
+ }
+
+ @Override
+ public LogicalNode visitSort(PlanContext context, LogicalPlan plan, LogicalPlan.QueryBlock block, SortNode node,
+ Stack<LogicalNode> stack) throws TajoException {
+ context.hasSortPlan = true;
+ return super.visitSort(context, plan, block, node, stack);
+ }
+
+ @Override
+ public LogicalNode visitUnion(PlanContext context, LogicalPlan plan, LogicalPlan.QueryBlock block, UnionNode node,
+ Stack<LogicalNode> stack) throws TajoException {
+ context.hasUnionPlan = true;
+ return super.visitUnion(context, plan, block, node, stack);
+ }
+
+ @Override
+ public LogicalNode visitStoreTable(PlanContext context, LogicalPlan plan, LogicalPlan.QueryBlock block, StoreTableNode node,
+ Stack<LogicalNode> stack) throws TajoException {
+ context.store = node;
+ return super.visitStoreTable(context, plan, block, node, stack);
+ }
+
+ @Override
+ public LogicalNode visitScan(PlanContext context, LogicalPlan plan, LogicalPlan.QueryBlock block, ScanNode node,
+ Stack<LogicalNode> stack) throws TajoException {
+ context.scanlist.add(node);
+ return super.visitScan(context, plan, block, node, stack);
+ }
+
+ @Override
+ public LogicalNode visitPartitionedTableScan(PlanContext context, LogicalPlan plan, LogicalPlan.QueryBlock block,
+ PartitionedTableScanNode node, Stack<LogicalNode> stack)
+ throws TajoException {
+ context.scanlist.add(node);
+ return super.visitPartitionedTableScan(context, plan, block, node, stack);
+ }
+
+ @Override
+ public LogicalNode visitIndexScan(PlanContext context, LogicalPlan plan, LogicalPlan.QueryBlock block, IndexScanNode node,
+ Stack<LogicalNode> stack) throws TajoException {
+ context.scanlist.add(node);
+ return super.visitIndexScan(context, plan, block, node, stack);
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/7b0af744/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/GlobalPlanner.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/GlobalPlanner.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/GlobalPlanner.java
index bf41d5b..463d015 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/GlobalPlanner.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/GlobalPlanner.java
@@ -199,7 +199,7 @@ public class GlobalPlanner {
}
private ExecutionBlock buildJoinPlan(GlobalPlanContext context, JoinNode joinNode,
- ExecutionBlock leftBlock, ExecutionBlock rightBlock) {
+ ExecutionBlock leftBlock, ExecutionBlock rightBlock) throws TajoException {
MasterPlan masterPlan = context.plan;
ExecutionBlock currentBlock;
@@ -596,7 +596,7 @@ public class GlobalPlanner {
}
private ExecutionBlock buildGroupBy(GlobalPlanContext context, ExecutionBlock lastBlock,
- GroupbyNode groupbyNode) {
+ GroupbyNode groupbyNode) throws TajoException {
MasterPlan masterPlan = context.plan;
ExecutionBlock currentBlock;
@@ -675,7 +675,7 @@ public class GlobalPlanner {
}
private ExecutionBlock buildGroupbyAndUnionPlan(MasterPlan masterPlan, ExecutionBlock lastBlock,
- GroupbyNode firstPhaseGroupBy, GroupbyNode secondPhaseGroupBy) {
+ GroupbyNode firstPhaseGroupBy, GroupbyNode secondPhaseGroupBy) throws TajoException {
DataChannel lastDataChannel = null;
// It pushes down the first phase group-by operator into all child blocks.
@@ -715,7 +715,7 @@ public class GlobalPlanner {
}
private ExecutionBlock buildTwoPhaseGroupby(MasterPlan masterPlan, ExecutionBlock latestBlock,
- GroupbyNode firstPhaseGroupby, GroupbyNode secondPhaseGroupby) {
+ GroupbyNode firstPhaseGroupby, GroupbyNode secondPhaseGroupby) throws TajoException {
ExecutionBlock childBlock = latestBlock;
childBlock.setPlan(firstPhaseGroupby);
@@ -780,7 +780,7 @@ public class GlobalPlanner {
return firstPhaseGroupBy;
}
- private ExecutionBlock buildSortPlan(GlobalPlanContext context, ExecutionBlock childBlock, SortNode currentNode) {
+ private ExecutionBlock buildSortPlan(GlobalPlanContext context, ExecutionBlock childBlock, SortNode currentNode) throws TajoException {
MasterPlan masterPlan = context.plan;
ExecutionBlock currentBlock;
@@ -860,7 +860,7 @@ public class GlobalPlanner {
*/
private ExecutionBlock buildShuffleAndStorePlanNoPartitionedTableWithUnion(GlobalPlanContext context,
StoreTableNode currentNode,
- ExecutionBlock childBlock) {
+ ExecutionBlock childBlock) throws TajoException {
for (ExecutionBlock grandChildBlock : context.plan.getChilds(childBlock)) {
StoreTableNode copy = PlannerUtil.clone(context.plan.getLogicalPlan(), currentNode);
copy.setChild(grandChildBlock.getPlan());
@@ -875,7 +875,7 @@ public class GlobalPlanner {
*/
private ExecutionBlock buildShuffleAndStorePlanToPartitionedTableWithUnion(GlobalPlanContext context,
StoreTableNode currentNode,
- ExecutionBlock lastBlock) {
+ ExecutionBlock lastBlock) throws TajoException {
MasterPlan masterPlan = context.plan;
DataChannel lastChannel = null;
@@ -899,7 +899,7 @@ public class GlobalPlanner {
*/
private ExecutionBlock buildShuffleAndStorePlanToPartitionedTable(GlobalPlanContext context,
StoreTableNode currentNode,
- ExecutionBlock lastBlock) {
+ ExecutionBlock lastBlock) throws TajoException {
MasterPlan masterPlan = context.plan;
ExecutionBlock nextBlock = masterPlan.newExecutionBlock();
@@ -920,7 +920,7 @@ public class GlobalPlanner {
private ExecutionBlock buildNoPartitionedStorePlan(GlobalPlanContext context,
StoreTableNode currentNode,
- ExecutionBlock childBlock) {
+ ExecutionBlock childBlock) throws TajoException {
if (hasUnionChild(currentNode)) { // when the below is union
return buildShuffleAndStorePlanNoPartitionedTableWithUnion(context, currentNode, childBlock);
} else {
@@ -1240,7 +1240,8 @@ public class GlobalPlanner {
return node;
}
- private LogicalNode handleUnaryNode(GlobalPlanContext context, LogicalNode child, LogicalNode node) {
+ private LogicalNode handleUnaryNode(GlobalPlanContext context, LogicalNode child, LogicalNode node)
+ throws TajoException {
ExecutionBlock execBlock = context.execBlockMap.remove(child.getPID());
execBlock.setPlan(node);
context.execBlockMap.put(node.getPID(), execBlock);
http://git-wip-us.apache.org/repos/asf/tajo/blob/7b0af744/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/MasterPlan.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/MasterPlan.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/MasterPlan.java
index feaba76..a7b03e7 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/MasterPlan.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/MasterPlan.java
@@ -23,8 +23,10 @@ package org.apache.tajo.engine.planner.global;
import org.apache.tajo.ExecutionBlockId;
import org.apache.tajo.QueryId;
+import org.apache.tajo.annotation.NotNull;
import org.apache.tajo.engine.planner.enforce.Enforcer;
import org.apache.tajo.engine.query.QueryContext;
+import org.apache.tajo.exception.TajoException;
import org.apache.tajo.plan.LogicalPlan;
import org.apache.tajo.plan.serder.PlanProto.EnforceProperty;
import org.apache.tajo.plan.serder.PlanProto.ShuffleType;
@@ -47,6 +49,44 @@ public class MasterPlan {
private SimpleDirectedGraph<ExecutionBlockId, DataChannel> execBlockGraph =
new SimpleDirectedGraph<>();
+ private Map<ExecutionBlockId, ShuffleContext> shuffleInfo = new HashMap<>();
+
+ /**
+ *
+ */
+ public class ShuffleContext {
+ ExecutionBlockId parentEbId;
+ int partitionNum;
+
+ public ShuffleContext(ExecutionBlockId parentEbId, int partitionNum) {
+ this.parentEbId = parentEbId;
+ this.partitionNum = partitionNum;
+ }
+
+ public ExecutionBlockId getParentEbId() {
+ return parentEbId;
+ }
+
+ public int getPartitionNum() {
+ return partitionNum;
+ }
+ }
+
+ /**
+ *
+ * @param ebId
+ * @param partitionNum
+ */
+ public void addShuffleInfo(ExecutionBlockId ebId, int partitionNum) {
+ ExecutionBlockId parentId = getParent(getExecBlock(ebId)).getId();
+ shuffleInfo.put(parentId, new ShuffleContext(ebId, partitionNum));
+ }
+
+ public Optional<ShuffleContext> getShuffleInfo(ExecutionBlockId ebId) {
+ ExecutionBlockId parentId = getParent(getExecBlock(ebId)).getId();
+ return shuffleInfo.containsKey(parentId) ? Optional.of(shuffleInfo.get(parentId)) : Optional.empty();
+ }
+
public ExecutionBlockId newExecutionBlockId() {
return new ExecutionBlockId(queryId, nextId.incrementAndGet());
}
@@ -215,14 +255,14 @@ public class MasterPlan {
return getChild(executionBlock.getId(), idx);
}
- public <CONTEXT> void accept(CONTEXT context, ExecutionBlockId v, DirectedGraphVisitor<CONTEXT, ExecutionBlockId> visitor) {
+ public <CONTEXT> void accept(CONTEXT context, ExecutionBlockId v, DirectedGraphVisitor<CONTEXT,
+ ExecutionBlockId> visitor) throws TajoException {
execBlockGraph.accept(context, v, visitor);
}
@Override
public String toString() {
StringBuilder sb = new StringBuilder();
- ExecutionBlockCursor cursor = new ExecutionBlockCursor(this);
sb.append("-------------------------------------------------------------------------------\n");
sb.append("Execution Block Graph (TERMINAL - " + getTerminalBlock() + ")\n");
sb.append("-------------------------------------------------------------------------------\n");
@@ -285,12 +325,7 @@ public class MasterPlan {
sb.append("\n[Enforcers]\n");
int i = 0;
List<EnforceProperty> enforceProperties = block.getEnforcer().getProperties();
- Collections.sort(enforceProperties, new Comparator<EnforceProperty>() {
- @Override
- public int compare(EnforceProperty o1, EnforceProperty o2) {
- return o1.toString().compareTo(o2.toString());
- }
- });
+ Collections.sort(enforceProperties, (e1, e2) -> e1.toString().compareTo(e2.toString()));
for (EnforceProperty enforce : enforceProperties) {
sb.append(" ").append(i++).append(": ");
sb.append(Enforcer.toString(enforce));
http://git-wip-us.apache.org/repos/asf/tajo/blob/7b0af744/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/builder/DistinctGroupbyBuilder.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/builder/DistinctGroupbyBuilder.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/builder/DistinctGroupbyBuilder.java
index 592ea2b..8f7673b 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/builder/DistinctGroupbyBuilder.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/builder/DistinctGroupbyBuilder.java
@@ -30,6 +30,7 @@ import org.apache.tajo.engine.planner.global.ExecutionBlock;
import org.apache.tajo.engine.planner.global.GlobalPlanner;
import org.apache.tajo.engine.planner.global.GlobalPlanner.GlobalPlanContext;
import org.apache.tajo.engine.planner.global.MasterPlan;
+import org.apache.tajo.exception.TajoException;
import org.apache.tajo.exception.TajoInternalError;
import org.apache.tajo.plan.serder.PlanProto.DistinctGroupbyEnforcer.DistinctAggregationAlgorithm;
import org.apache.tajo.plan.serder.PlanProto.DistinctGroupbyEnforcer.MultipleAggregationStage;
@@ -711,7 +712,7 @@ public class DistinctGroupbyBuilder {
private ExecutionBlock buildDistinctGroupbyAndUnionPlan(MasterPlan masterPlan, ExecutionBlock lastBlock,
DistinctGroupbyNode firstPhaseGroupBy,
- DistinctGroupbyNode secondPhaseGroupBy) {
+ DistinctGroupbyNode secondPhaseGroupBy) throws TajoException {
DataChannel lastDataChannel = null;
// It pushes down the first phase group-by operator into all child blocks.
http://git-wip-us.apache.org/repos/asf/tajo/blob/7b0af744/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/rewriter/rules/BroadcastJoinRule.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/rewriter/rules/BroadcastJoinRule.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/rewriter/rules/BroadcastJoinRule.java
index a19704b..d390740 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/rewriter/rules/BroadcastJoinRule.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/rewriter/rules/BroadcastJoinRule.java
@@ -175,7 +175,8 @@ public class BroadcastJoinRule implements GlobalPlanRewriteRule {
}
@Override
- public void visit(Context context, Stack<ExecutionBlockId> stack, ExecutionBlockId executionBlockId) {
+ public void visit(Context context, Stack<ExecutionBlockId> stack, ExecutionBlockId executionBlockId)
+ throws TajoException {
ExecutionBlock current = plan.getExecBlock(executionBlockId);
if (plan.isLeaf(current)) {
@@ -209,7 +210,7 @@ public class BroadcastJoinRule implements GlobalPlanRewriteRule {
*
* @param current
*/
- private void visitNonLeafNode(Context context, ExecutionBlock current) {
+ private void visitNonLeafNode(Context context, ExecutionBlock current) throws TajoException {
// At non-leaf execution blocks, merge broadcastable children's plan with the current plan.
if (!plan.isTerminal(current)) {
@@ -423,7 +424,7 @@ public class BroadcastJoinRule implements GlobalPlanRewriteRule {
* @param parent parent block who has join nodes
* @return
*/
- private ExecutionBlock mergeTwoPhaseJoinIfPossible(MasterPlan plan, ExecutionBlock child, ExecutionBlock parent) {
+ private ExecutionBlock mergeTwoPhaseJoinIfPossible(MasterPlan plan, ExecutionBlock child, ExecutionBlock parent) throws TajoException {
ScanNode scanForChild = findScanForChildEb(child, parent);
parentFinder.set(scanForChild);
@@ -446,7 +447,7 @@ public class BroadcastJoinRule implements GlobalPlanRewriteRule {
}
private void addUnionNodeIfNecessary(Map<ExecutionBlockId, ExecutionBlockId> unionScanMap, MasterPlan plan,
- ExecutionBlock child, ExecutionBlock current) {
+ ExecutionBlock child, ExecutionBlock current) throws TajoException {
if (unionScanMap != null) {
List<ExecutionBlockId> unionScans = new ArrayList<>();
ExecutionBlockId representativeId = null;
http://git-wip-us.apache.org/repos/asf/tajo/blob/7b0af744/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ExternalSortExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ExternalSortExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ExternalSortExec.java
index 0e89928..ff629c3 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ExternalSortExec.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ExternalSortExec.java
@@ -85,7 +85,7 @@ public class ExternalSortExec extends SortExec {
/** the defaultFanout of external sort */
private final int defaultFanout;
/** It's the size of in-memory table. If memory consumption exceeds it, store the memory table into a disk. */
- private int sortBufferBytesNum;
+ private final long sortBufferBytesNum;
/** the number of available cores */
private final int allocatedCoreNum;
/** If there are available multiple cores, it tries parallel merge. */
http://git-wip-us.apache.org/repos/asf/tajo/blob/7b0af744/tajo-core/src/main/java/org/apache/tajo/querymaster/Repartitioner.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/querymaster/Repartitioner.java b/tajo-core/src/main/java/org/apache/tajo/querymaster/Repartitioner.java
index ec7ed2d..4e4251a 100644
--- a/tajo-core/src/main/java/org/apache/tajo/querymaster/Repartitioner.java
+++ b/tajo-core/src/main/java/org/apache/tajo/querymaster/Repartitioner.java
@@ -50,7 +50,6 @@ import org.apache.tajo.plan.logical.SortNode.SortPurpose;
import org.apache.tajo.plan.serder.PlanProto.DistinctGroupbyEnforcer.MultipleAggregationStage;
import org.apache.tajo.plan.serder.PlanProto.EnforceProperty;
import org.apache.tajo.plan.util.PlannerUtil;
-import org.apache.tajo.pullserver.TajoPullServerService;
import org.apache.tajo.querymaster.Task.IntermediateEntry;
import org.apache.tajo.querymaster.Task.PullHost;
import org.apache.tajo.storage.*;
@@ -321,12 +320,12 @@ public class Repartitioner {
if (tbNameToInterm.containsKey(scanEbId)) {
tbNameToInterm.get(scanEbId).add(intermediateEntry);
} else {
- tbNameToInterm.put(scanEbId, new ArrayList<>(Arrays.asList(intermediateEntry)));
+ tbNameToInterm.put(scanEbId, Lists.newArrayList(intermediateEntry));
}
} else {
Map<ExecutionBlockId, List<IntermediateEntry>> tbNameToInterm =
new HashMap<>();
- tbNameToInterm.put(scanEbId, new ArrayList<>(Arrays.asList(intermediateEntry)));
+ tbNameToInterm.put(scanEbId, Lists.newArrayList(intermediateEntry));
hashEntries.put(intermediateEntry.getPartId(), tbNameToInterm);
}
}
@@ -606,10 +605,9 @@ public class Repartitioner {
MasterPlan masterPlan, Stage stage, int maxNum)
throws IOException {
DataChannel channel = masterPlan.getIncomingChannels(stage.getBlock().getId()).get(0);
- if (channel.getShuffleType() == HASH_SHUFFLE
- || channel.getShuffleType() == SCATTERED_HASH_SHUFFLE) {
+ if (channel.isHashShuffle()) {
scheduleHashShuffledFetches(schedulerContext, masterPlan, stage, channel, maxNum);
- } else if (channel.getShuffleType() == RANGE_SHUFFLE) {
+ } else if (channel.isRangeShuffle()) {
scheduleRangeShuffledFetches(schedulerContext, masterPlan, stage, channel, maxNum);
} else {
throw new TajoInternalError("Cannot support partition type");
@@ -698,10 +696,8 @@ public class Repartitioner {
TupleUtil.setMaxRangeIfNull(sortSpecs, sortSchema, totalStat.getColumnStats(), ranges);
if (LOG.isDebugEnabled()) {
- if (ranges != null) {
- for (TupleRange eachRange : ranges) {
- LOG.debug(stage.getId() + " range: " + eachRange.getStart() + " ~ " + eachRange.getEnd());
- }
+ for (TupleRange eachRange : ranges) {
+ LOG.debug(stage.getId() + " range: " + eachRange.getStart() + " ~ " + eachRange.getEnd());
}
}
}
@@ -999,7 +995,7 @@ public class Repartitioner {
int partId = eachInterm.getPartId();
List<IntermediateEntry> partitionInterms = partitionIntermMap.get(partId);
if (partitionInterms == null) {
- partitionInterms = Arrays.asList(eachInterm);
+ partitionInterms = Lists.newArrayList(eachInterm);
partitionIntermMap.put(partId, partitionInterms);
} else {
partitionInterms.add(eachInterm);
@@ -1078,7 +1074,7 @@ public class Repartitioner {
fetchListVolume = 0;
}
FetchImpl fetch = new FetchImpl(fetchName, currentInterm.getPullHost(), SCATTERED_HASH_SHUFFLE,
- ebId, currentInterm.getPartId(), Arrays.asList(currentInterm));
+ ebId, currentInterm.getPartId(), Lists.newArrayList(currentInterm));
fetch.setOffset(eachSplit.getFirst());
fetch.setLength(eachSplit.getSecond());
fetchListForSingleTask.add(fetch.getProto());
@@ -1219,7 +1215,7 @@ public class Repartitioner {
if (hashed.containsKey(entry.getPartId())) {
hashed.get(entry.getPartId()).add(entry);
} else {
- hashed.put(entry.getPartId(), Arrays.asList(entry));
+ hashed.put(entry.getPartId(), Lists.newArrayList(entry));
}
}
@@ -1235,7 +1231,7 @@ public class Repartitioner {
if (hashed.containsKey(host)) {
hashed.get(host).add(entry);
} else {
- hashed.put(host, Arrays.asList(entry));
+ hashed.put(host, Lists.newArrayList(entry));
}
}
@@ -1258,12 +1254,12 @@ public class Repartitioner {
}
// set the partition number for group by and sort
- if (channel.getShuffleType() == HASH_SHUFFLE) {
+ if (channel.isHashShuffle()) {
if (execBlock.getPlan().getType() == NodeType.GROUP_BY ||
execBlock.getPlan().getType() == NodeType.DISTINCT_GROUP_BY) {
keys = channel.getShuffleKeys();
}
- } else if (channel.getShuffleType() == RANGE_SHUFFLE) {
+ } else if (channel.isRangeShuffle()) {
if (execBlock.getPlan().getType() == NodeType.SORT) {
SortNode sort = (SortNode) execBlock.getPlan();
keys = new Column[sort.getSortKeys().length];
@@ -1278,6 +1274,7 @@ public class Repartitioner {
channel.setShuffleOutputNum(1);
} else {
channel.setShuffleKeys(keys);
+ // NOTE: desiredNum is not used in Sort anymore.
channel.setShuffleOutputNum(desiredNum);
}
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/7b0af744/tajo-core/src/main/java/org/apache/tajo/querymaster/Stage.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/querymaster/Stage.java b/tajo-core/src/main/java/org/apache/tajo/querymaster/Stage.java
index f1813c9..08ff184 100644
--- a/tajo-core/src/main/java/org/apache/tajo/querymaster/Stage.java
+++ b/tajo-core/src/main/java/org/apache/tajo/querymaster/Stage.java
@@ -42,14 +42,17 @@ import org.apache.tajo.engine.planner.enforce.Enforcer;
import org.apache.tajo.engine.planner.global.DataChannel;
import org.apache.tajo.engine.planner.global.ExecutionBlock;
import org.apache.tajo.engine.planner.global.MasterPlan;
+import org.apache.tajo.engine.planner.global.MasterPlan.ShuffleContext;
import org.apache.tajo.error.Errors.SerializedException;
import org.apache.tajo.exception.ErrorUtil;
import org.apache.tajo.exception.TajoException;
+import org.apache.tajo.exception.TajoInternalError;
import org.apache.tajo.ipc.TajoWorkerProtocol;
import org.apache.tajo.master.TaskState;
import org.apache.tajo.master.event.*;
import org.apache.tajo.master.event.TaskAttemptToSchedulerEvent.TaskAttemptScheduleContext;
import org.apache.tajo.plan.logical.*;
+import org.apache.tajo.plan.serder.PlanProto;
import org.apache.tajo.plan.serder.PlanProto.DistinctGroupbyEnforcer.MultipleAggregationStage;
import org.apache.tajo.plan.serder.PlanProto.EnforceProperty;
import org.apache.tajo.plan.util.PlannerUtil;
@@ -855,6 +858,7 @@ public class Stage implements EventHandler<StageEvent> {
ExecutionBlock parent = stage.getMasterPlan().getParent(stage.getBlock());
DataChannel channel = stage.getMasterPlan().getChannel(stage.getId(), parent.getId());
setShuffleIfNecessary(stage, channel);
+ // TODO: verify changed shuffle plan
initTaskScheduler(stage);
// execute pre-processing asyncronously
stage.getContext().getQueryMasterContext().getSingleEventExecutor()
@@ -920,8 +924,8 @@ public class Stage implements EventHandler<StageEvent> {
* methods and the number of partitions to a given Stage.
*/
private static void setShuffleIfNecessary(Stage stage, DataChannel channel) {
- if (channel.getShuffleType() != ShuffleType.NONE_SHUFFLE) {
- int numTasks = calculateShuffleOutputNum(stage, channel);
+ if (channel.isHashShuffle()) {
+ int numTasks = calculateShuffleOutputNum(stage);
Repartitioner.setShuffleOutputNumForTwoPhase(stage, numTasks, channel);
}
}
@@ -933,122 +937,154 @@ public class Stage implements EventHandler<StageEvent> {
* @param stage
* @return
*/
- public static int calculateShuffleOutputNum(Stage stage, DataChannel channel) {
+ public static int calculateShuffleOutputNum(Stage stage) {
MasterPlan masterPlan = stage.getMasterPlan();
- ExecutionBlock parent = masterPlan.getParent(stage.getBlock());
- LogicalNode grpNode = null;
- if (parent != null) {
- grpNode = PlannerUtil.findMostBottomNode(parent.getPlan(), NodeType.GROUP_BY);
- if (grpNode == null) {
- grpNode = PlannerUtil.findMostBottomNode(parent.getPlan(), NodeType.DISTINCT_GROUP_BY);
+ // For test
+ if (masterPlan.getContext().containsKey(SessionVars.TEST_MIN_TASK_NUM)) {
+ int partitionNum = masterPlan.getContext().getInt(SessionVars.TEST_MIN_TASK_NUM);
+ LOG.info(stage.getId() + ", The determined number of partitions is " + partitionNum + " for test");
+ return partitionNum;
+ }
+
+ Optional<ShuffleContext> optional = masterPlan.getShuffleInfo(stage.getId());
+ if (optional.isPresent()) {
+ LOG.info("# of partitions is determined as " + optional.get().getPartitionNum() +
+ "to match with sibling eb's partition number");
+ return optional.get().getPartitionNum();
+
+ } else {
+ ExecutionBlock parent = masterPlan.getParent(stage.getBlock());
+ int partitionNum;
+
+ if (parent != null) {
+ // We assume this execution block is the first stage of a join if two or more tables are included in this block.
+ if (parent.hasJoin()) {
+ if (parent.getNonBroadcastRelNum() > 1) {
+ // repartition join
+ partitionNum = calculatePartitionNumForRepartitionJoin(parent, stage);
+ LOG.info(stage.getId() + ", The determined number of partitions for repartition join is " + partitionNum);
+ } else {
+ // broadcast join
+ // partition number is calculated using the volume of the large table
+ partitionNum = calculatePartitionNumDefault(parent, stage);
+ LOG.info(stage.getId() + ", The determined number of partitions for broadcast join is " + partitionNum);
+ }
+
+ } else {
+ // Is this stage the first step of group-by?
+ if (parent.hasAgg()) {
+ LogicalNode grpNode = PlannerUtil.findMostBottomNode(parent.getPlan(), NodeType.GROUP_BY,
+ NodeType.DISTINCT_GROUP_BY, NodeType.WINDOW_AGG);
+ if (grpNode == null) {
+ throw new TajoInternalError("Cannot find aggregation plan for " + stage.getId());
+ }
+
+ if (!hasGroupKeys(stage, grpNode)) {
+ LOG.info(stage.getId() + ", No Grouping Column - determinedTaskNum is set to 1");
+ partitionNum = 1;
+ } else {
+ partitionNum = calculatePartitionNumForAgg(parent, stage);
+ LOG.info(stage.getId() + ", The determined number of aggregation partitions is " + partitionNum);
+ }
+
+ } else {
+ // NOTE: the below code might be executed during sort, but the partition number is not used anymore for sort.
+ LOG.info("============>>>>> Unexpected Case! <<<<<================");
+ partitionNum = calculatePartitionNumDefault(parent, stage);
+ LOG.info(stage.getId() + ", The determined number of partitions is " + partitionNum);
+ }
+
+ }
+ } else {
+ // This case means that the parent eb does not exist even though data shuffle is required after the current eb.
+ throw new TajoInternalError("Cannot find parent execution block of " + stage.block.getId());
}
+
+ // Record the partition number for sibling execution blocks
+ masterPlan.addShuffleInfo(stage.getId(), partitionNum);
+ return partitionNum;
}
+ }
+
+ private static int calculatePartitionNumForRepartitionJoin(ExecutionBlock parent, Stage currentStage) {
+ List<ExecutionBlock> childs = currentStage.masterPlan.getChilds(parent);
- // We assume this execution block the first stage of join if two or more tables are included in this block,
- if (parent != null && (parent.getNonBroadcastRelNum()) >= 2) {
- List<ExecutionBlock> childs = masterPlan.getChilds(parent);
+ // for outer
+ ExecutionBlock outer = childs.get(0);
+ long outerVolume = getInputVolume(currentStage.masterPlan, currentStage.context, outer);
- // for outer
- ExecutionBlock outer = childs.get(0);
- long outerVolume = getInputVolume(stage.masterPlan, stage.context, outer);
+ // for inner
+ ExecutionBlock inner = childs.get(1);
+ long innerVolume = getInputVolume(currentStage.masterPlan, currentStage.context, inner);
+ LOG.info(currentStage.getId() + ", Outer volume: " + Math.ceil((double) outerVolume / 1048576) + "MB, "
+ + "Inner volume: " + Math.ceil((double) innerVolume / 1048576) + "MB");
- // for inner
- ExecutionBlock inner = childs.get(1);
- long innerVolume = getInputVolume(stage.masterPlan, stage.context, inner);
- LOG.info(stage.getId() + ", Outer volume: " + Math.ceil((double) outerVolume / 1048576) + "MB, "
- + "Inner volume: " + Math.ceil((double) innerVolume / 1048576) + "MB");
+ long bigger = Math.max(outerVolume, innerVolume);
- long bigger = Math.max(outerVolume, innerVolume);
+ int mb = (int) Math.ceil((double) bigger / 1048576);
+ LOG.info(currentStage.getId() + ", Bigger Table's volume is approximately " + mb + " MB");
- int mb = (int) Math.ceil((double) bigger / 1048576);
- LOG.info(stage.getId() + ", Bigger Table's volume is approximately " + mb + " MB");
+ return (int) Math.ceil((double) mb /
+ currentStage.masterPlan.getContext().getInt(SessionVars.JOIN_PER_SHUFFLE_SIZE));
+ }
- int taskNum = (int) Math.ceil((double) mb / masterPlan.getContext().getInt(SessionVars.JOIN_PER_SHUFFLE_SIZE));
+ private static int calculatePartitionNumForAgg(ExecutionBlock parent, Stage stage) {
+ int volumeByMB = getInputVolumeMB(parent, stage);
+ LOG.info(stage.getId() + ", Table's volume is approximately " + volumeByMB + " MB");
+ // determine the number of tasks
+ return (int) Math.ceil((double) volumeByMB /
+ stage.masterPlan.getContext().getInt(SessionVars.GROUPBY_PER_SHUFFLE_SIZE));
- if (masterPlan.getContext().containsKey(SessionVars.TEST_MIN_TASK_NUM)) {
- taskNum = masterPlan.getContext().getInt(SessionVars.TEST_MIN_TASK_NUM);
- LOG.warn("!!!!! TESTCASE MODE !!!!!");
- }
+ }
- // The shuffle output numbers of join may be inconsistent by execution block order.
- // Thus, we need to compare the number with DataChannel output numbers.
- // If the number is right, the number and DataChannel output numbers will be consistent.
- int outerShuffleOutputNum = 0, innerShuffleOutputNum = 0;
- for (DataChannel eachChannel : masterPlan.getOutgoingChannels(outer.getId())) {
- outerShuffleOutputNum = Math.max(outerShuffleOutputNum, eachChannel.getShuffleOutputNum());
- }
- for (DataChannel eachChannel : masterPlan.getOutgoingChannels(inner.getId())) {
- innerShuffleOutputNum = Math.max(innerShuffleOutputNum, eachChannel.getShuffleOutputNum());
+ private static boolean hasGroupKeys(Stage currentStage, LogicalNode aggNode) {
+ if (aggNode.getType() == NodeType.GROUP_BY) {
+ return ((GroupbyNode)aggNode).getGroupingColumns().length > 0;
+ } else if (aggNode.getType() == NodeType.DISTINCT_GROUP_BY) {
+ // Find current distinct stage node.
+ DistinctGroupbyNode distinctNode = PlannerUtil.findMostBottomNode(currentStage.getBlock().getPlan(),
+ NodeType.DISTINCT_GROUP_BY);
+ if (distinctNode == null) {
+ LOG.warn(currentStage.getId() + ", Can't find current DistinctGroupbyNode");
+ distinctNode = (DistinctGroupbyNode)aggNode;
}
- if (outerShuffleOutputNum != innerShuffleOutputNum
- && taskNum != outerShuffleOutputNum
- && taskNum != innerShuffleOutputNum) {
- LOG.info(stage.getId() + ", Change determined number of join partitions cause difference of outputNum" +
- ", originTaskNum=" + taskNum + ", changedTaskNum=" + Math.max(outerShuffleOutputNum, innerShuffleOutputNum) +
- ", outerShuffleOutptNum=" + outerShuffleOutputNum +
- ", innerShuffleOutputNum=" + innerShuffleOutputNum);
- taskNum = Math.max(outerShuffleOutputNum, innerShuffleOutputNum);
- }
-
- LOG.info(stage.getId() + ", The determined number of join partitions is " + taskNum);
-
- return taskNum;
- // Is this stage the first step of group-by?
- } else if (grpNode != null) {
- boolean hasGroupColumns = true;
- if (grpNode.getType() == NodeType.GROUP_BY) {
- hasGroupColumns = ((GroupbyNode)grpNode).getGroupingColumns().length > 0;
- } else if (grpNode.getType() == NodeType.DISTINCT_GROUP_BY) {
- // Find current distinct stage node.
- DistinctGroupbyNode distinctNode = PlannerUtil.findMostBottomNode(stage.getBlock().getPlan(), NodeType.DISTINCT_GROUP_BY);
- if (distinctNode == null) {
- LOG.warn(stage.getId() + ", Can't find current DistinctGroupbyNode");
- distinctNode = (DistinctGroupbyNode)grpNode;
- }
- hasGroupColumns = distinctNode.getGroupingColumns().length > 0;
+ boolean hasGroupColumns = distinctNode.getGroupingColumns().length > 0;
- Enforcer enforcer = stage.getBlock().getEnforcer();
- if (enforcer == null) {
- LOG.warn(stage.getId() + ", DistinctGroupbyNode's enforcer is null.");
- }
+ Enforcer enforcer = currentStage.getBlock().getEnforcer();
+ if (enforcer == null) {
+ LOG.warn(currentStage.getId() + ", DistinctGroupbyNode's enforcer is null.");
+ } else {
EnforceProperty property = PhysicalPlannerImpl.getAlgorithmEnforceProperty(enforcer, distinctNode);
if (property != null) {
if (property.getDistinct().getIsMultipleAggregation()) {
MultipleAggregationStage multiAggStage = property.getDistinct().getMultipleAggregationStage();
- if (multiAggStage != MultipleAggregationStage.THRID_STAGE) {
- hasGroupColumns = true;
- }
+ hasGroupColumns = multiAggStage != MultipleAggregationStage.THRID_STAGE;
}
}
}
- if (!hasGroupColumns) {
- LOG.info(stage.getId() + ", No Grouping Column - determinedTaskNum is set to 1");
- return 1;
- } else {
- long volume = getInputVolume(stage.masterPlan, stage.context, stage.block);
-
- int volumeByMB = (int) Math.ceil((double) volume / StorageUnit.MB);
- LOG.info(stage.getId() + ", Table's volume is approximately " + volumeByMB + " MB");
- // determine the number of task
- int taskNum = (int) Math.ceil((double) volumeByMB /
- masterPlan.getContext().getInt(SessionVars.GROUPBY_PER_SHUFFLE_SIZE));
- LOG.info(stage.getId() + ", The determined number of aggregation partitions is " + taskNum);
- return taskNum;
- }
+ return hasGroupColumns;
} else {
- LOG.info("============>>>>> Unexpected Case! <<<<<================");
- long volume = getInputVolume(stage.masterPlan, stage.context, stage.block);
-
- int mb = (int) Math.ceil((double)volume / 1048576);
- LOG.info(stage.getId() + ", Table's volume is approximately " + mb + " MB");
- // determine the number of task per 128MB
- int taskNum = (int) Math.ceil((double)mb / 128);
- LOG.info(stage.getId() + ", The determined number of partitions is " + taskNum);
- return taskNum;
+ return ((WindowAggNode) aggNode).hasPartitionKeys();
}
}
+ private static int calculatePartitionNumDefault(ExecutionBlock parent, Stage currentStage) {
+ int mb = getInputVolumeMB(parent, currentStage);
+ LOG.info(currentStage.getId() + ", Table's volume is approximately " + mb + " MB");
+ // determine the number of tasks per 128 MB
+ return (int) Math.ceil((double)mb / 128);
+ }
+
+ private static int getInputVolumeMB(ExecutionBlock parent, Stage currentStage) {
+ // NOTE: Get input volume from the parent EB.
+ // If the parent EB contains a UNION query, the volume of the whole input for the UNION is returned.
+ // Otherwise, only the input volume of the current EB is returned.
+ long volume = getInputVolume(currentStage.masterPlan, currentStage.context, parent);
+
+ return (int) Math.ceil((double)volume / StorageUnit.MB);
+ }
+
private static void schedule(Stage stage) throws IOException, TajoException {
MasterPlan masterPlan = stage.getMasterPlan();
ExecutionBlock execBlock = stage.getBlock();
http://git-wip-us.apache.org/repos/asf/tajo/blob/7b0af744/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/SelfDescSchemaBuildPhase.java
----------------------------------------------------------------------
diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/SelfDescSchemaBuildPhase.java b/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/SelfDescSchemaBuildPhase.java
index 48affc5..59adfc5 100644
--- a/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/SelfDescSchemaBuildPhase.java
+++ b/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/SelfDescSchemaBuildPhase.java
@@ -398,7 +398,7 @@ public class SelfDescSchemaBuildPhase extends LogicalPlanPreprocessPhase {
* @param columns a set of columns
* @return schema build from columns
*/
- private Schema buildSchemaFromColumnSet(Set<Column> columns) {
+ private Schema buildSchemaFromColumnSet(Set<Column> columns) throws TajoException {
SchemaGraph schemaGraph = new SchemaGraph();
Set<ColumnVertex> rootVertexes = new HashSet<>();
Schema schema = new Schema();
http://git-wip-us.apache.org/repos/asf/tajo/blob/7b0af744/tajo-plan/src/main/java/org/apache/tajo/plan/util/PlannerUtil.java
----------------------------------------------------------------------
diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/util/PlannerUtil.java b/tajo-plan/src/main/java/org/apache/tajo/plan/util/PlannerUtil.java
index 239becc..6897e17 100644
--- a/tajo-plan/src/main/java/org/apache/tajo/plan/util/PlannerUtil.java
+++ b/tajo-plan/src/main/java/org/apache/tajo/plan/util/PlannerUtil.java
@@ -414,7 +414,7 @@ public class PlannerUtil {
* @param type to find
* @return a found logical node
*/
- public static <T extends LogicalNode> T findMostBottomNode(LogicalNode node, NodeType type) {
+ public static <T extends LogicalNode> T findMostBottomNode(LogicalNode node, NodeType... type) {
Preconditions.checkNotNull(node);
Preconditions.checkNotNull(type);