You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tajo.apache.org by hy...@apache.org on 2013/09/26 08:26:28 UTC
git commit: TAJO-205: Repartitioner occasionally chooses a partition
number as one. (hyunsik)
Updated Branches:
refs/heads/master 17287ef58 -> 9ca979223
TAJO-205: Repartitioner occasionally chooses a partition number as one. (hyunsik)
Project: http://git-wip-us.apache.org/repos/asf/incubator-tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tajo/commit/9ca97922
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tajo/tree/9ca97922
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tajo/diff/9ca97922
Branch: refs/heads/master
Commit: 9ca97922386ef2954016efd54e94fd81c2f31f91
Parents: 17287ef
Author: Hyunsik Choi <hy...@apache.org>
Authored: Thu Sep 26 15:26:06 2013 +0900
Committer: Hyunsik Choi <hy...@apache.org>
Committed: Thu Sep 26 15:26:06 2013 +0900
----------------------------------------------------------------------
CHANGES.txt | 3 +
.../java/org/apache/tajo/conf/TajoConf.java | 11 +-
.../apache/tajo/engine/planner/InsertNode.java | 2 +-
.../engine/planner/PhysicalPlannerImpl.java | 8 +-
.../apache/tajo/engine/planner/PlannerUtil.java | 27 +-
.../tajo/engine/planner/global/MasterPlan.java | 4 +
.../engine/planner/physical/BNLJoinExec.java | 3 +-
.../planner/physical/ExternalSortExec.java | 8 +-
.../org/apache/tajo/master/GlobalPlanner.java | 387 +++++++++++--------
.../tajo/master/querymaster/Repartitioner.java | 22 +-
.../tajo/master/querymaster/SubQuery.java | 15 +-
.../src/main/proto/TajoWorkerProtocol.proto | 3 +-
.../src/main/resources/tajo-default.xml | 5 +
.../tajo/engine/query/TestTableSubQuery.java | 21 +-
.../src/test/resources/tajo-default.xml | 2 +-
15 files changed, 305 insertions(+), 216 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/9ca97922/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 9400154..c6e88de 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -153,6 +153,9 @@ Release 0.2.0 - unreleased
BUG FIXES
+ TAJO-205: Repartitioner occasionally chooses a partition number as one.
+ (hyunsik)
+
TAJO-191: INSERT OVERWRITE INTO statement should follow the table meta in
catalog. (jinho)
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/9ca97922/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java b/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
index 855e096..f429a26 100644
--- a/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
+++ b/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
@@ -107,19 +107,20 @@ public class TajoConf extends YarnConfiguration {
//////////////////////////////////
// Physical Executors
//////////////////////////////////
- EXT_SORT_BUFFER("tajo.extsort.buffer", 400000),
+ EXTENAL_SORT_BUFFER_NUM("tajo.sort.external.buffer", 1000000),
BROADCAST_JOIN_THRESHOLD("tajo.join.broadcast.threshold", (long)5 * 1048576),
- HASH_AGGREGATION_THRESHOLD("tajo.aggregation.hash.threshold", (long)512 * 1048576),
+ INMEMORY_HASH_TABLE_DEFAULT_SIZE("tajo.join.inmemory.table.num", (long)1000000),
+ INMEMORY_HASH_JOIN_THRESHOLD("tajo.join.memhash, threshold", (long)256 * 1048576),
+ INMEMORY_HASH_AGGREGATION_THRESHOLD("tajo.aggregation.hash.threshold", (long)256 * 1048576),
//////////////////////////////////////////
// Distributed Query Execution Parameters
//////////////////////////////////////////
- SORT_BUFFER_SIZE("tajo.sort.mb", 128),
JOIN_TASK_VOLUME("tajo.join.task-volume.mb", 128),
- SORT_TASK_VOLUME("tajo.sort.task-volume.mb", 256),
+ SORT_TASK_VOLUME("tajo.sort.task-volume.mb", 128),
AGGREGATION_TASK_VOLUME("tajo.task-aggregation.volume.mb", 128),
- JOIN_PARTITION_VOLUME("tajo.join.part-volume.mb", 64),
+ JOIN_PARTITION_VOLUME("tajo.join.part-volume.mb", 128),
SORT_PARTITION_VOLUME("tajo.sort.part-volume.mb", 256),
AGGREGATION_PARTITION_VOLUME("tajo.aggregation.part-volume.mb", 256),
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/9ca97922/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/InsertNode.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/InsertNode.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/InsertNode.java
index ae88630..4a3e479 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/InsertNode.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/InsertNode.java
@@ -160,7 +160,7 @@ public class InsertNode extends LogicalNode implements Cloneable {
sb.append("INTO");
if (hasTargetTable()) {
- sb.append(targetTableDesc);
+ sb.append(targetTableDesc.getName());
}
if (hasPath()) {
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/9ca97922/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java
index ddedd80..c1a1b2f 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java
@@ -87,8 +87,8 @@ public class PhysicalPlannerImpl implements PhysicalPlanner {
DataChannel channel = context.getDataChannel();
StoreTableNode storeTableNode = new StoreTableNode(UNGENERATED_PID, channel.getTargetId().toString());
storeTableNode.setStorageType(CatalogProtos.StoreType.CSV);
- storeTableNode.setInSchema(execPlan.getSchema());
- storeTableNode.setOutSchema(execPlan.getSchema());
+ storeTableNode.setInSchema(plan.getOutSchema());
+ storeTableNode.setOutSchema(plan.getOutSchema());
if (channel.getPartitionType() != PartitionType.NONE_PARTITION) {
storeTableNode.setPartitions(channel.getPartitionType(), channel.getPartitionKey(), channel.getPartitionNum());
} else {
@@ -281,7 +281,7 @@ public class PhysicalPlannerImpl implements PhysicalPlanner {
long leftSize = estimateSizeRecursive(context, leftLineage);
long rightSize = estimateSizeRecursive(context, rightLineage);
- final long threshold = 1048576 * 128; // 64MB
+ final long threshold = conf.getLongVar(TajoConf.ConfVars.INMEMORY_HASH_JOIN_THRESHOLD);
boolean hashJoin = false;
if (leftSize < threshold || rightSize < threshold) {
@@ -412,7 +412,7 @@ public class PhysicalPlannerImpl implements PhysicalPlanner {
String [] outerLineage = PlannerUtil.getLineage(groupbyNode.getChild());
long estimatedSize = estimateSizeRecursive(context, outerLineage);
- final long threshold = conf.getLongVar(TajoConf.ConfVars.HASH_AGGREGATION_THRESHOLD);
+ final long threshold = conf.getLongVar(TajoConf.ConfVars.INMEMORY_HASH_AGGREGATION_THRESHOLD);
// if the relation size is less than the threshold,
// the hash aggregation will be used.
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/9ca97922/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PlannerUtil.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PlannerUtil.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PlannerUtil.java
index 4907c40..60243bc 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PlannerUtil.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PlannerUtil.java
@@ -452,16 +452,31 @@ public class PlannerUtil {
return new SortSpec[][] {outerSortSpec, innerSortSpec};
}
- public static TupleComparator[] getComparatorsFromJoinQual(EvalNode joinQual, Schema outer, Schema inner) {
- SortSpec[][] sortSpecs = getSortKeysFromJoinQual(joinQual, outer, inner);
+ public static TupleComparator[] getComparatorsFromJoinQual(EvalNode joinQual, Schema leftSchema, Schema rightSchema) {
+ SortSpec[][] sortSpecs = getSortKeysFromJoinQual(joinQual, leftSchema, rightSchema);
TupleComparator [] comparators = new TupleComparator[2];
- comparators[0] = new TupleComparator(outer, sortSpecs[0]);
- comparators[1] = new TupleComparator(inner, sortSpecs[1]);
+ comparators[0] = new TupleComparator(leftSchema, sortSpecs[0]);
+ comparators[1] = new TupleComparator(rightSchema, sortSpecs[1]);
return comparators;
}
- public static List<Column []> getJoinKeyPairs(EvalNode joinQual, Schema outer, Schema inner) {
- JoinKeyPairFinder finder = new JoinKeyPairFinder(outer, inner);
+ /**
+ * @return the first array contains left table's columns, and the second array contains right table's columns.
+ */
+ public static Column [][] joinJoinKeyForEachTable(EvalNode joinQual, Schema leftSchema, Schema rightSchema) {
+ List<Column []> joinKeys = getJoinKeyPairs(joinQual, leftSchema, rightSchema);
+ Column [] leftColumns = new Column[joinKeys.size()];
+ Column [] rightColumns = new Column[joinKeys.size()];
+ for (int i = 0; i < joinKeys.size(); i++) {
+ leftColumns[i] = joinKeys.get(i)[0];
+ rightColumns[i] = joinKeys.get(i)[1];
+ }
+
+ return new Column[][] {leftColumns, rightColumns};
+ }
+
+ public static List<Column []> getJoinKeyPairs(EvalNode joinQual, Schema leftSchema, Schema rightSchema) {
+ JoinKeyPairFinder finder = new JoinKeyPairFinder(leftSchema, rightSchema);
joinQual.preOrder(finder);
return finder.getPairs();
}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/9ca97922/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/MasterPlan.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/MasterPlan.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/MasterPlan.java
index 2f4001e..6d30d00 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/MasterPlan.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/MasterPlan.java
@@ -170,6 +170,10 @@ public class MasterPlan {
execBlockGraph.disconnect(src, target);
}
+ public ExecutionBlock getParent(ExecutionBlock executionBlock) {
+ return execBlockMap.get(execBlockGraph.getParent(executionBlock.getId()));
+ }
+
public List<ExecutionBlock> getChilds(ExecutionBlock execBlock) {
return getChilds(execBlock.getId());
}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/9ca97922/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/BNLJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/BNLJoinExec.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/BNLJoinExec.java
index ba01b52..21f8749 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/BNLJoinExec.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/BNLJoinExec.java
@@ -60,8 +60,7 @@ public class BNLJoinExec extends BinaryPhysicalExec {
public BNLJoinExec(final TaskAttemptContext context, final JoinNode plan,
final PhysicalExec outer, PhysicalExec inner) {
- super(context, SchemaUtil.merge(outer.getSchema(), inner.getSchema()),
- SchemaUtil.merge(outer.getSchema(), inner.getSchema()), outer, inner);
+ super(context, SchemaUtil.merge(outer.getSchema(), inner.getSchema()), plan.getOutSchema(), outer, inner);
this.plan = plan;
this.joinQual = plan.getJoinQual();
if (joinQual != null) { // if join type is not 'cross join'
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/9ca97922/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/ExternalSortExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/ExternalSortExec.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/ExternalSortExec.java
index 8fcd527..4742136 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/ExternalSortExec.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/ExternalSortExec.java
@@ -42,7 +42,7 @@ public class ExternalSortExec extends SortExec {
private final TableMeta meta;
private final Path sortTmpDir;
- private int SORT_BUFFER_SIZE;
+ private int MEM_TUPLE_NUM;
public ExternalSortExec(final TaskAttemptContext context,
final AbstractStorageManager sm, final SortNode plan, final PhysicalExec child)
@@ -50,8 +50,8 @@ public class ExternalSortExec extends SortExec {
super(context, plan.getInSchema(), plan.getOutSchema(), child, plan.getSortKeys());
this.plan = plan;
- this.SORT_BUFFER_SIZE = context.getConf().getIntVar(ConfVars.EXT_SORT_BUFFER);
- this.tupleSlots = new ArrayList<Tuple>(SORT_BUFFER_SIZE);
+ this.MEM_TUPLE_NUM = context.getConf().getIntVar(ConfVars.EXTENAL_SORT_BUFFER_NUM);
+ this.tupleSlots = new ArrayList<Tuple>(MEM_TUPLE_NUM);
this.sortTmpDir = new Path(context.getWorkDir(), UUID.randomUUID().toString());
this.localFS = FileSystem.getLocal(context.getConf());
@@ -96,7 +96,7 @@ public class ExternalSortExec extends SortExec {
Tuple tuple;
while ((tuple = child.next()) != null) { // partition sort start
tupleSlots.add(new VTuple(tuple));
- if (tupleSlots.size() == SORT_BUFFER_SIZE) {
+ if (tupleSlots.size() == MEM_TUPLE_NUM) {
sortAndStoreChunk(chunkId, tupleSlots);
chunkId++;
}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/9ca97922/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/GlobalPlanner.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/GlobalPlanner.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/GlobalPlanner.java
index 620e813..37c77b7 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/GlobalPlanner.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/GlobalPlanner.java
@@ -23,6 +23,7 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.Path;
import org.apache.tajo.DataChannel;
+import org.apache.tajo.algebra.JoinType;
import org.apache.tajo.catalog.*;
import org.apache.tajo.conf.TajoConf;
import org.apache.tajo.engine.planner.*;
@@ -53,7 +54,6 @@ public class GlobalPlanner {
LogicalNode topmost;
LogicalNode lastRepartionableNode;
ExecutionBlock topMostLeftExecBlock;
- ExecutionBlock topMostRightExecBlock;
}
/**
@@ -104,6 +104,29 @@ public class GlobalPlanner {
LOG.info(masterPlan);
}
+ private ExecutionBlock buildRepartitionBlocks(MasterPlan masterPlan, LogicalNode lastDistNode, LogicalNode curNode,
+ LogicalNode childNode, ExecutionBlock lastChildBlock)
+ throws PlanningException {
+
+ ExecutionBlock currentBlock = null;
+ ExecutionBlock childBlock;
+ childBlock = lastChildBlock;
+
+ NodeType shuffleRequiredNodeType = lastDistNode.getType();
+ if (shuffleRequiredNodeType == NodeType.GROUP_BY) {
+ ExecutionBlock [] blocks = buildGroupBy(masterPlan, lastDistNode, curNode, childNode, childBlock);
+ currentBlock = blocks[0];
+ } else if (shuffleRequiredNodeType == NodeType.SORT) {
+ ExecutionBlock [] blocks = buildSortPlan(masterPlan, lastDistNode, curNode, childNode, childBlock);
+ currentBlock = blocks[0];
+ } else if (shuffleRequiredNodeType == NodeType.JOIN) {
+ ExecutionBlock [] blocks = buildJoinPlan(masterPlan, lastDistNode, childBlock, lastChildBlock);
+ currentBlock = blocks[0];
+ }
+
+ return currentBlock;
+ }
+
public static ScanNode buildInputExecutor(LogicalPlan plan, DataChannel channel) {
Preconditions.checkArgument(channel.getSchema() != null,
"Channel schema (" + channel.getSrcId().getId() +" -> "+ channel.getTargetId().getId()+") is not initialized");
@@ -112,6 +135,193 @@ public class GlobalPlanner {
return new ScanNode(plan.newPID(), desc);
}
+ private DataChannel createDataChannelFromJoin(ExecutionBlock leftBlock, ExecutionBlock rightBlock,
+ ExecutionBlock parent, JoinNode join, boolean leftTable) {
+ ExecutionBlock childBlock = leftTable ? leftBlock : rightBlock;
+
+ DataChannel channel = new DataChannel(childBlock, parent, HASH_PARTITION, 32);
+ if (join.getJoinType() != JoinType.CROSS) {
+ Column [][] joinColumns = PlannerUtil.joinJoinKeyForEachTable(join.getJoinQual(),
+ leftBlock.getPlan().getOutSchema(), rightBlock.getPlan().getOutSchema());
+ if (leftTable) {
+ channel.setPartitionKey(joinColumns[0]);
+ } else {
+ channel.setPartitionKey(joinColumns[1]);
+ }
+ }
+ return channel;
+ }
+
+ private ExecutionBlock [] buildJoinPlan(MasterPlan masterPlan, LogicalNode lastDistNode,
+ ExecutionBlock childBlock, ExecutionBlock lastChildBlock)
+ throws PlanningException {
+ ExecutionBlock currentBlock;
+
+ JoinNode joinNode = (JoinNode) lastDistNode;
+ LogicalNode leftNode = joinNode.getLeftChild();
+ LogicalNode rightNode = joinNode.getRightChild();
+
+ ExecutionBlock leftBlock;
+ if (lastChildBlock == null) {
+ leftBlock = masterPlan.newExecutionBlock();
+ leftBlock.setPlan(leftNode);
+ } else {
+ leftBlock = lastChildBlock;
+ }
+ ExecutionBlock rightBlock = masterPlan.newExecutionBlock();
+ rightBlock.setPlan(rightNode);
+
+ currentBlock = masterPlan.newExecutionBlock();
+
+ DataChannel leftChannel = createDataChannelFromJoin(leftBlock, rightBlock, currentBlock, joinNode, true);
+ DataChannel rightChannel = createDataChannelFromJoin(leftBlock, rightBlock, currentBlock, joinNode, false);
+
+ ScanNode leftScan = buildInputExecutor(masterPlan.getLogicalPlan(), leftChannel);
+ ScanNode rightScan = buildInputExecutor(masterPlan.getLogicalPlan(), rightChannel);
+
+ joinNode.setLeftChild(leftScan);
+ joinNode.setRightChild(rightScan);
+ currentBlock.setPlan(joinNode);
+
+ masterPlan.addConnect(leftChannel);
+ masterPlan.addConnect(rightChannel);
+
+ return new ExecutionBlock[] { currentBlock, childBlock };
+ }
+
+ private ExecutionBlock [] buildGroupBy(MasterPlan masterPlan, LogicalNode lastDistNode, LogicalNode currentNode,
+ LogicalNode childNode, ExecutionBlock childBlock) throws PlanningException {
+ ExecutionBlock currentBlock = null;
+ GroupbyNode groupByNode = (GroupbyNode) lastDistNode;
+
+ GroupbyNode firstGroupBy = PlannerUtil.transformGroupbyTo2P(groupByNode);
+ firstGroupBy.setHavingCondition(null);
+
+ if (firstGroupBy.getChild().getType() == NodeType.TABLE_SUBQUERY &&
+ ((TableSubQueryNode)firstGroupBy.getChild()).getSubQuery().getType() == NodeType.UNION) {
+
+ UnionNode unionNode = PlannerUtil.findTopNode(groupByNode, NodeType.UNION);
+ ConsecutiveUnionFinder finder = new ConsecutiveUnionFinder();
+ UnionsFinderContext finderContext = new UnionsFinderContext();
+ finder.visitChild(masterPlan.getLogicalPlan(), unionNode, new Stack<LogicalNode>(), finderContext);
+
+ currentBlock = masterPlan.newExecutionBlock();
+ GroupbyNode secondGroupBy = groupByNode;
+ for (UnionNode union : finderContext.unionList) {
+ TableSubQueryNode leftSubQuery = union.getLeftChild();
+ TableSubQueryNode rightSubQuery = union.getRightChild();
+ DataChannel dataChannel;
+ if (leftSubQuery.getSubQuery().getType() != NodeType.UNION) {
+ ExecutionBlock execBlock = masterPlan.newExecutionBlock();
+ GroupbyNode g1 = PlannerUtil.clone(firstGroupBy);
+ g1.setChild(leftSubQuery);
+ execBlock.setPlan(g1);
+ dataChannel = new DataChannel(execBlock, currentBlock, HASH_PARTITION, 32);
+
+ ScanNode scanNode = buildInputExecutor(masterPlan.getLogicalPlan(), dataChannel);
+ secondGroupBy.setChild(scanNode);
+ masterPlan.addConnect(dataChannel);
+ }
+ if (rightSubQuery.getSubQuery().getType() != NodeType.UNION) {
+ ExecutionBlock execBlock = masterPlan.newExecutionBlock();
+ GroupbyNode g1 = PlannerUtil.clone(firstGroupBy);
+ g1.setChild(rightSubQuery);
+ execBlock.setPlan(g1);
+ dataChannel = new DataChannel(execBlock, currentBlock, HASH_PARTITION, 32);
+
+ ScanNode scanNode = buildInputExecutor(masterPlan.getLogicalPlan(), dataChannel);
+ secondGroupBy.setChild(scanNode);
+ masterPlan.addConnect(dataChannel);
+ }
+ }
+ LogicalNode parent = PlannerUtil.findTopParentNode(currentNode, lastDistNode.getType());
+ if (parent instanceof UnaryNode && parent != secondGroupBy) {
+ ((UnaryNode)parent).setChild(secondGroupBy);
+ }
+ currentBlock.setPlan(currentNode);
+ } else {
+
+ if (childBlock == null) { // first repartition node
+ childBlock = masterPlan.newExecutionBlock();
+ }
+ childBlock.setPlan(firstGroupBy);
+
+ currentBlock = masterPlan.newExecutionBlock();
+
+ DataChannel channel;
+ if (firstGroupBy.isEmptyGrouping()) {
+ channel = new DataChannel(childBlock, currentBlock, HASH_PARTITION, 1);
+ channel.setPartitionKey(firstGroupBy.getGroupingColumns());
+ } else {
+ channel = new DataChannel(childBlock, currentBlock, HASH_PARTITION, 32);
+ channel.setPartitionKey(firstGroupBy.getGroupingColumns());
+ }
+ channel.setSchema(firstGroupBy.getOutSchema());
+
+ GroupbyNode secondGroupBy = groupByNode;
+ ScanNode scanNode = buildInputExecutor(masterPlan.getLogicalPlan(), channel);
+ secondGroupBy.setChild(scanNode);
+
+ LogicalNode parent = PlannerUtil.findTopParentNode(currentNode, lastDistNode.getType());
+ if (parent instanceof UnaryNode && parent != secondGroupBy) {
+ ((UnaryNode)parent).setChild(secondGroupBy);
+ }
+
+ masterPlan.addConnect(channel);
+ currentBlock.setPlan(currentNode);
+ }
+
+ return new ExecutionBlock [] {currentBlock, childBlock};
+ }
+
+ private ExecutionBlock [] buildSortPlan(MasterPlan masterPlan, LogicalNode lastDistNode, LogicalNode currentNode,
+ LogicalNode childNode, ExecutionBlock childBlock) {
+ ExecutionBlock currentBlock = null;
+
+ SortNode firstSort = (SortNode) lastDistNode;
+ if (childBlock == null) {
+ childBlock = masterPlan.newExecutionBlock();
+ }
+ childBlock.setPlan(firstSort);
+
+ currentBlock = masterPlan.newExecutionBlock();
+ DataChannel channel = new DataChannel(childBlock, currentBlock, RANGE_PARTITION, 32);
+ channel.setPartitionKey(PlannerUtil.sortSpecsToSchema(firstSort.getSortKeys()).toArray());
+ channel.setSchema(childNode.getOutSchema());
+
+ SortNode secondSort = PlannerUtil.clone(lastDistNode);
+ ScanNode secondScan = buildInputExecutor(masterPlan.getLogicalPlan(), channel);
+ secondSort.setChild(secondScan);
+
+ LimitNode limitAndSort;
+ LimitNode limitOrNull = PlannerUtil.findTopNode(currentNode, NodeType.LIMIT);
+ if (limitOrNull != null) {
+ limitAndSort = PlannerUtil.clone(limitOrNull);
+ limitAndSort.setChild(firstSort);
+
+ if (childBlock.getPlan().getType() == NodeType.SORT) {
+ childBlock.setPlan(limitAndSort);
+ } else {
+ LogicalNode sortParent = PlannerUtil.findTopParentNode(childBlock.getPlan(), NodeType.SORT);
+ if (sortParent != null) {
+ if (sortParent instanceof UnaryNode) {
+ ((UnaryNode)sortParent).setChild(limitAndSort);
+ }
+ }
+ }
+ }
+
+ LogicalNode parent = PlannerUtil.findTopParentNode(currentNode, lastDistNode.getType());
+ if (parent instanceof UnaryNode && parent != secondSort) {
+ ((UnaryNode)parent).setChild(secondSort);
+ }
+
+ masterPlan.addConnect(channel);
+ currentBlock.setPlan(currentNode);
+
+ return new ExecutionBlock[] { currentBlock, childBlock };
+ }
+
public class DistributedPlannerVisitor extends BasicLogicalPlanVisitor<GlobalPlanContext> {
@Override
@@ -120,7 +330,7 @@ public class GlobalPlanner {
super.visitRoot(plan, node, stack, context);
if (context.lastRepartionableNode != null && context.lastRepartionableNode.getType() != NodeType.UNION) {
- context.topMostLeftExecBlock = addChannel(context.plan, context.lastRepartionableNode, node, context.topmost, context.topMostLeftExecBlock);
+ context.topMostLeftExecBlock = buildRepartitionBlocks(context.plan, context.lastRepartionableNode, node, context.topmost, context.topMostLeftExecBlock);
} else if (context.lastRepartionableNode != null && context.lastRepartionableNode.getType() == NodeType.UNION) {
} else {
@@ -142,174 +352,13 @@ public class GlobalPlanner {
}
@Override
- public LogicalNode visitLimit(LogicalPlan plan, LimitNode node, Stack<LogicalNode> stack, GlobalPlanContext context) throws PlanningException {
+ public LogicalNode visitLimit(LogicalPlan plan, LimitNode node, Stack<LogicalNode> stack, GlobalPlanContext context)
+ throws PlanningException {
super.visitLimit(plan, node, stack, context);
context.topmost = node;
return node;
}
- private ExecutionBlock addChannel(MasterPlan masterPlan, LogicalNode lastDistNode, LogicalNode curNode,
- LogicalNode childNode, ExecutionBlock lastChildBlock) throws PlanningException {
- ExecutionBlock currentBlock = null;
- ExecutionBlock childBlock;
-
- childBlock = lastChildBlock;
-
- NodeType shuffleRequiredNodeType = lastDistNode.getType();
- if (shuffleRequiredNodeType == NodeType.GROUP_BY) {
- GroupbyNode groupByNode = (GroupbyNode) lastDistNode;
-
- GroupbyNode firstGroupBy = PlannerUtil.transformGroupbyTo2P(groupByNode);
- firstGroupBy.setHavingCondition(null);
-
- if (firstGroupBy.getChild().getType() == NodeType.TABLE_SUBQUERY &&
- ((TableSubQueryNode)firstGroupBy.getChild()).getSubQuery().getType() == NodeType.UNION) {
-
- UnionNode unionNode = PlannerUtil.findTopNode(groupByNode, NodeType.UNION);
- ConsecutiveUnionFinder finder = new ConsecutiveUnionFinder();
- UnionsFinderContext finderContext = new UnionsFinderContext();
- finder.visitChild(masterPlan.getLogicalPlan(), unionNode, new Stack<LogicalNode>(), finderContext);
-
- currentBlock = masterPlan.newExecutionBlock();
- GroupbyNode secondGroupBy = groupByNode;
- for (UnionNode union : finderContext.unionList) {
- TableSubQueryNode leftSubQuery = union.getLeftChild();
- TableSubQueryNode rightSubQuery = union.getRightChild();
- DataChannel dataChannel;
- if (leftSubQuery.getSubQuery().getType() != NodeType.UNION) {
- ExecutionBlock execBlock = masterPlan.newExecutionBlock();
- GroupbyNode g1 = PlannerUtil.clone(firstGroupBy);
- g1.setChild(leftSubQuery);
- execBlock.setPlan(g1);
- dataChannel = new DataChannel(execBlock, currentBlock, HASH_PARTITION, 32);
-
- ScanNode scanNode = buildInputExecutor(masterPlan.getLogicalPlan(), dataChannel);
- secondGroupBy.setChild(scanNode);
- masterPlan.addConnect(dataChannel);
- }
- if (rightSubQuery.getSubQuery().getType() != NodeType.UNION) {
- ExecutionBlock execBlock = masterPlan.newExecutionBlock();
- GroupbyNode g1 = PlannerUtil.clone(firstGroupBy);
- g1.setChild(rightSubQuery);
- execBlock.setPlan(g1);
- dataChannel = new DataChannel(execBlock, currentBlock, HASH_PARTITION, 32);
-
- ScanNode scanNode = buildInputExecutor(masterPlan.getLogicalPlan(), dataChannel);
- secondGroupBy.setChild(scanNode);
- masterPlan.addConnect(dataChannel);
- }
- }
- LogicalNode parent = PlannerUtil.findTopParentNode(curNode, lastDistNode.getType());
- if (parent instanceof UnaryNode && parent != secondGroupBy) {
- ((UnaryNode)parent).setChild(secondGroupBy);
- }
- currentBlock.setPlan(curNode);
- } else {
-
- if (childBlock == null) { // first repartition node
- childBlock = masterPlan.newExecutionBlock();
- }
- childBlock.setPlan(firstGroupBy);
-
- currentBlock = masterPlan.newExecutionBlock();
-
- DataChannel channel;
- if (firstGroupBy.isEmptyGrouping()) {
- channel = new DataChannel(childBlock, currentBlock, HASH_PARTITION, 1);
- channel.setPartitionKey(firstGroupBy.getGroupingColumns());
- } else {
- channel = new DataChannel(childBlock, currentBlock, HASH_PARTITION, 32);
- channel.setPartitionKey(firstGroupBy.getGroupingColumns());
- }
- channel.setSchema(firstGroupBy.getOutSchema());
-
- GroupbyNode secondGroupBy = groupByNode;
- ScanNode scanNode = buildInputExecutor(masterPlan.getLogicalPlan(), channel);
- secondGroupBy.setChild(scanNode);
-
- LogicalNode parent = PlannerUtil.findTopParentNode(curNode, lastDistNode.getType());
- if (parent instanceof UnaryNode && parent != secondGroupBy) {
- ((UnaryNode)parent).setChild(secondGroupBy);
- }
-
- masterPlan.addConnect(channel);
- currentBlock.setPlan(curNode);
- }
- } else if (shuffleRequiredNodeType == NodeType.SORT) {
- SortNode firstSort = (SortNode) lastDistNode;
- if (childBlock == null) {
- childBlock = masterPlan.newExecutionBlock();
- }
- childBlock.setPlan(firstSort);
-
- currentBlock = masterPlan.newExecutionBlock();
- DataChannel channel = new DataChannel(childBlock, currentBlock, RANGE_PARTITION, 32);
- channel.setPartitionKey(PlannerUtil.sortSpecsToSchema(firstSort.getSortKeys()).toArray());
- channel.setSchema(childNode.getOutSchema());
-
- SortNode secondSort = PlannerUtil.clone(lastDistNode);
- ScanNode secondScan = buildInputExecutor(masterPlan.getLogicalPlan(), channel);
- secondSort.setChild(secondScan);
-
- LimitNode limitAndSort;
- LimitNode limitOrNull = PlannerUtil.findTopNode(curNode, NodeType.LIMIT);
- if (limitOrNull != null) {
- limitAndSort = PlannerUtil.clone(limitOrNull);
- limitAndSort.setChild(firstSort);
-
- if (childBlock.getPlan().getType() == NodeType.SORT) {
- childBlock.setPlan(limitAndSort);
- } else {
- LogicalNode sortParent = PlannerUtil.findTopParentNode(childBlock.getPlan(), NodeType.SORT);
- if (sortParent != null) {
- if (sortParent instanceof UnaryNode) {
- ((UnaryNode)sortParent).setChild(limitAndSort);
- }
- }
- }
- }
-
- LogicalNode parent = PlannerUtil.findTopParentNode(curNode, lastDistNode.getType());
- if (parent instanceof UnaryNode && parent != secondSort) {
- ((UnaryNode)parent).setChild(secondSort);
- }
-
- masterPlan.addConnect(channel);
- currentBlock.setPlan(curNode);
- } else if (shuffleRequiredNodeType == NodeType.JOIN) {
- JoinNode joinNode = (JoinNode) lastDistNode;
- LogicalNode leftNode = joinNode.getLeftChild();
- LogicalNode rightNode = joinNode.getRightChild();
-
- ExecutionBlock leftBlock;
- if (lastChildBlock == null) {
- leftBlock = masterPlan.newExecutionBlock();
- leftBlock.setPlan(leftNode);
- } else {
- leftBlock = lastChildBlock;
- }
- ExecutionBlock rightBlock = masterPlan.newExecutionBlock();
- rightBlock.setPlan(rightNode);
-
- currentBlock = masterPlan.newExecutionBlock();
-
- DataChannel leftChannel = new DataChannel(leftBlock, currentBlock, HASH_PARTITION, 32);
- DataChannel rightChannel = new DataChannel(rightBlock, currentBlock, HASH_PARTITION, 32);
-
- ScanNode leftScan = buildInputExecutor(masterPlan.getLogicalPlan(), leftChannel);
- ScanNode rightScan = buildInputExecutor(masterPlan.getLogicalPlan(), rightChannel);
-
- joinNode.setLeftChild(leftScan);
- joinNode.setRightChild(rightScan);
- currentBlock.setPlan(joinNode);
-
- masterPlan.addConnect(leftChannel);
- masterPlan.addConnect(rightChannel);
- }
-
- return currentBlock;
- }
-
@Override
public LogicalNode visitSort(LogicalPlan plan, SortNode node, Stack<LogicalNode> stack, GlobalPlanContext context)
throws PlanningException {
@@ -317,7 +366,7 @@ public class GlobalPlanner {
super.visitSort(plan, node, stack, context);
if (context.lastRepartionableNode != null) {
- context.topMostLeftExecBlock = addChannel(context.plan, context.lastRepartionableNode, node, context.topmost,
+ context.topMostLeftExecBlock = buildRepartitionBlocks(context.plan, context.lastRepartionableNode, node, context.topmost,
context.topMostLeftExecBlock);
}
@@ -333,7 +382,7 @@ public class GlobalPlanner {
super.visitGroupBy(plan, node, stack, context);
if (context.lastRepartionableNode != null) {
- context.topMostLeftExecBlock = addChannel(context.plan, context.lastRepartionableNode, node, context.topmost,
+ context.topMostLeftExecBlock = buildRepartitionBlocks(context.plan, context.lastRepartionableNode, node, context.topmost,
context.topMostLeftExecBlock);
}
@@ -356,7 +405,7 @@ public class GlobalPlanner {
super.visitJoin(plan, node, stack, context);
if (context.lastRepartionableNode != null) {
- context.topMostLeftExecBlock = addChannel(context.plan, context.lastRepartionableNode, node, context.topmost,
+ context.topMostLeftExecBlock = buildRepartitionBlocks(context.plan, context.lastRepartionableNode, node, context.topmost,
context.topMostLeftExecBlock);
}
@@ -372,7 +421,7 @@ public class GlobalPlanner {
super.visitUnion(plan, node, stack, context);
if (context.lastRepartionableNode != null && context.lastRepartionableNode.getType() != NodeType.UNION) {
- context.topMostLeftExecBlock = addChannel(context.plan, context.lastRepartionableNode, node, context.topmost,
+ context.topMostLeftExecBlock = buildRepartitionBlocks(context.plan, context.lastRepartionableNode, node, context.topmost,
context.topMostLeftExecBlock);
}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/9ca97922/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/Repartitioner.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/Repartitioner.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/Repartitioner.java
index 78c9e3e..7911925 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/Repartitioner.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/Repartitioner.java
@@ -594,22 +594,18 @@ public class Repartitioner {
return hashed;
}
- public static SubQuery setPartitionNumberForTwoPhase(SubQuery subQuery, final int n, DataChannel channel) {
+ public static SubQuery setPartitionNumberForTwoPhase(SubQuery subQuery, final int desiredNum, DataChannel channel) {
ExecutionBlock execBlock = subQuery.getBlock();
Column[] keys = null;
// if the next query is join,
// set the partition number for the current logicalUnit
// TODO: the union handling is required when a join has unions as its child
MasterPlan masterPlan = subQuery.getMasterPlan();
- ExecutionBlock parentBlock = execBlock.getParentBlock();
- if (parentBlock != null) {
- if (parentBlock.getStoreTableNode().getChild().getType() == NodeType.JOIN) {
- execBlock.getStoreTableNode().setPartitions(execBlock.getPartitionType(),
- execBlock.getStoreTableNode().getPartitionKeys(), n);
- keys = execBlock.getStoreTableNode().getPartitionKeys();
-
- masterPlan.getOutgoingChannels(subQuery.getId()).iterator().next()
- .setPartition(execBlock.getPartitionType(), execBlock.getStoreTableNode().getPartitionKeys(), n);
+ keys = channel.getPartitionKey();
+ if (!masterPlan.isRoot(subQuery.getBlock()) ) {
+ ExecutionBlock parentBlock = masterPlan.getParent(subQuery.getBlock());
+ if (parentBlock.getPlan().getType() == NodeType.JOIN) {
+ channel.setPartitionNum(desiredNum);
}
}
@@ -631,9 +627,11 @@ public class Repartitioner {
}
if (keys != null) {
if (keys.length == 0) {
- channel.setPartition(execBlock.getPartitionType(), new Column[]{}, 1);
+ channel.setPartitionKey(new Column[] {});
+ channel.setPartitionNum(1);
} else {
- channel.setPartition(execBlock.getPartitionType(), keys, n);
+ channel.setPartitionKey(keys);
+ channel.setPartitionNum(desiredNum);
}
}
return subQuery;
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/9ca97922/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java
index a45d38c..254fd7c 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java
@@ -65,6 +65,7 @@ import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import static org.apache.tajo.conf.TajoConf.ConfVars;
+import static org.apache.tajo.ipc.TajoWorkerProtocol.PartitionType;
/**
@@ -422,7 +423,8 @@ public class SubQuery implements EventHandler<SubQueryEvent> {
subQuery.finish();
state = SubQueryState.SUCCEEDED;
} else {
- DataChannel channel = subQuery.getMasterPlan().getOutgoingChannels(subQuery.getId()).get(0);
+ ExecutionBlock parent = subQuery.getMasterPlan().getParent(subQuery.getBlock());
+ DataChannel channel = subQuery.getMasterPlan().getChannel(subQuery.getId(), parent.getId());
setRepartitionIfNecessary(subQuery, channel);
createTasks(subQuery);
@@ -458,7 +460,7 @@ public class SubQuery implements EventHandler<SubQueryEvent> {
* methods and the number of partitions to a given subquery.
*/
private static void setRepartitionIfNecessary(SubQuery subQuery, DataChannel channel) {
- if (subQuery.getBlock().hasParentBlock()) {
+ if (channel.getPartitionType() != PartitionType.NONE_PARTITION) {
int numTasks = calculatePartitionNum(subQuery, channel);
Repartitioner.setPartitionNumberForTwoPhase(subQuery, numTasks, channel);
}
@@ -473,7 +475,8 @@ public class SubQuery implements EventHandler<SubQueryEvent> {
*/
public static int calculatePartitionNum(SubQuery subQuery, DataChannel channel) {
TajoConf conf = subQuery.context.getConf();
- ExecutionBlock parent = subQuery.getBlock().getParentBlock();
+ MasterPlan masterPlan = subQuery.getMasterPlan();
+ ExecutionBlock parent = masterPlan.getParent(subQuery.getBlock());
GroupbyNode grpNode = null;
if (parent != null) {
@@ -482,14 +485,14 @@ public class SubQuery implements EventHandler<SubQueryEvent> {
// Is this subquery the first step of join?
if (parent != null && parent.getScanNodes().length == 2) {
- Iterator<ExecutionBlock> child = parent.getChildBlocks().iterator();
+ List<ExecutionBlock> childs = masterPlan.getChilds(parent);
// for outer
- ExecutionBlock outer = child.next();
+ ExecutionBlock outer = childs.get(0);
long outerVolume = getInputVolume(subQuery.masterPlan, subQuery.context, outer);
// for inner
- ExecutionBlock inner = child.next();
+ ExecutionBlock inner = childs.get(1);
long innerVolume = getInputVolume(subQuery.masterPlan, subQuery.context, inner);
LOG.info("Outer volume: " + Math.ceil((double)outerVolume / 1048576));
LOG.info("Inner volume: " + Math.ceil((double)innerVolume / 1048576));
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/9ca97922/tajo-core/tajo-core-backend/src/main/proto/TajoWorkerProtocol.proto
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/proto/TajoWorkerProtocol.proto b/tajo-core/tajo-core-backend/src/main/proto/TajoWorkerProtocol.proto
index e70f780..ba5f342 100644
--- a/tajo-core/tajo-core-backend/src/main/proto/TajoWorkerProtocol.proto
+++ b/tajo-core/tajo-core-backend/src/main/proto/TajoWorkerProtocol.proto
@@ -133,13 +133,14 @@ enum TransmitType {
message DataChannelProto {
required ExecutionBlockIdProto srcId = 1;
required ExecutionBlockIdProto targetId = 2;
+
required TransmitType transmitType = 3 [default = PULL_TRANSMIT];
required PartitionType partitionType = 4;
optional SchemaProto schema = 5;
repeated ColumnProto partitionKey = 7;
- optional int32 partitionNum = 8 [default = 1];
+ optional int32 partitionNum = 9 [default = 1];
optional StoreType storeType = 10 [default = CSV];
}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/9ca97922/tajo-core/tajo-core-backend/src/main/resources/tajo-default.xml
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/resources/tajo-default.xml b/tajo-core/tajo-core-backend/src/main/resources/tajo-default.xml
index a4e39ff..bf52919 100644
--- a/tajo-core/tajo-core-backend/src/main/resources/tajo-default.xml
+++ b/tajo-core/tajo-core-backend/src/main/resources/tajo-default.xml
@@ -42,6 +42,11 @@
</property>
<property>
+ <name>tajo.master.manager.addr</name>
+ <value>127.0.0.1:9004</value>
+ </property>
+
+ <property>
<name>tajo.query.session.timeout</name>
<value>60000</value>
<description>ms</description>
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/9ca97922/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/query/TestTableSubQuery.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/query/TestTableSubQuery.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/query/TestTableSubQuery.java
index 1635398..db886ba 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/query/TestTableSubQuery.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/query/TestTableSubQuery.java
@@ -18,19 +18,17 @@
package org.apache.tajo.engine.query;
-import org.apache.tajo.IntegrationTest;
import org.apache.tajo.TpchTestBase;
-import org.apache.tajo.client.ResultSetUtil;
import org.junit.BeforeClass;
import org.junit.Test;
-import org.junit.experimental.categories.Category;
import java.io.IOException;
import java.sql.ResultSet;
+import java.util.HashMap;
+import java.util.Map;
import static org.junit.Assert.*;
-@Category(IntegrationTest.class)
public class TestTableSubQuery {
private static TpchTestBase tpch;
public TestTableSubQuery() throws IOException {
@@ -78,6 +76,19 @@ public class TestTableSubQuery {
"FROM\n" +
"(SELECT * FROM nation WHERE n_name LIKE 'A%') A " +
"JOIN region B ON A.n_regionkey=B.r_regionkey");
- System.out.println(ResultSetUtil.prettyFormat(res));
+
+ Map<String,String> expected = new HashMap<String, String>();
+ expected.put("ARGENTINA", "AMERICA");
+ expected.put("ALGERIA", "AFRICA");
+ try {
+ assertNotNull(res);
+ assertTrue(res.next());
+ assertTrue(expected.get(res.getString("n_name")).equals(res.getString("r_name")));
+ assertTrue(res.next());
+ assertTrue(expected.get(res.getString("n_name")).equals(res.getString("r_name")));
+ assertFalse(res.next());
+ } finally {
+ res.close();
+ }
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/9ca97922/tajo-core/tajo-core-backend/src/test/resources/tajo-default.xml
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/resources/tajo-default.xml b/tajo-core/tajo-core-backend/src/test/resources/tajo-default.xml
index 7012cd9..a7a7f64 100644
--- a/tajo-core/tajo-core-backend/src/test/resources/tajo-default.xml
+++ b/tajo-core/tajo-core-backend/src/test/resources/tajo-default.xml
@@ -48,7 +48,7 @@
<property>
<name>tajo.master.manager.addr</name>
- <value>127.0.0.1</value>
+ <value>127.0.0.1:0</value>
</property>
</configuration>