You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tajo.apache.org by hy...@apache.org on 2013/09/23 14:15:45 UTC
[2/2] git commit: TAJO-194: LogicalNode should have an identifier to distinguish each logical node instance. (hyunsik)
TAJO-194: LogicalNode should have an identifier to distinguish each logical node instance. (hyunsik)
Project: http://git-wip-us.apache.org/repos/asf/incubator-tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tajo/commit/fc018de8
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tajo/tree/fc018de8
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tajo/diff/fc018de8
Branch: refs/heads/master
Commit: fc018de823dd34d769eb73f3c42e089b0d992b81
Parents: 44bb57d
Author: Hyunsik Choi <hy...@apache.org>
Authored: Mon Sep 23 21:04:02 2013 +0900
Committer: Hyunsik Choi <hy...@apache.org>
Committed: Mon Sep 23 21:04:02 2013 +0900
----------------------------------------------------------------------
CHANGES.txt | 3 +
.../apache/tajo/engine/planner/InsertNode.java | 8 +-
.../apache/tajo/engine/planner/LogicalPlan.java | 44 +++---
.../tajo/engine/planner/LogicalPlanner.java | 91 +++++------
.../engine/planner/PhysicalPlannerImpl.java | 11 +-
.../apache/tajo/engine/planner/PlannerUtil.java | 22 ---
.../tajo/engine/planner/TargetListManager.java | 2 +-
.../tajo/engine/planner/logical/BinaryNode.java | 13 +-
.../engine/planner/logical/CreateTableNode.java | 4 +-
.../engine/planner/logical/DropTableNode.java | 4 +-
.../engine/planner/logical/EvalExprNode.java | 4 +-
.../tajo/engine/planner/logical/ExceptNode.java | 8 +-
.../engine/planner/logical/GroupbyNode.java | 13 +-
.../engine/planner/logical/IndexScanNode.java | 4 +-
.../engine/planner/logical/IntersectNode.java | 9 +-
.../tajo/engine/planner/logical/JoinNode.java | 8 +-
.../tajo/engine/planner/logical/LimitNode.java | 8 +-
.../engine/planner/logical/LogicalNode.java | 24 +--
.../engine/planner/logical/LogicalRootNode.java | 4 +-
.../tajo/engine/planner/logical/NodeType.java | 2 -
.../tajo/engine/planner/logical/PipeType.java | 24 ---
.../engine/planner/logical/ProjectionNode.java | 12 +-
.../engine/planner/logical/ReceiveNode.java | 126 ----------------
.../engine/planner/logical/RelationNode.java | 4 +-
.../engine/planner/logical/RepartitionType.java | 25 ----
.../tajo/engine/planner/logical/ScanNode.java | 12 +-
.../engine/planner/logical/SelectionNode.java | 8 +-
.../tajo/engine/planner/logical/SendNode.java | 149 -------------------
.../tajo/engine/planner/logical/SortNode.java | 15 +-
.../engine/planner/logical/StoreIndexNode.java | 4 +-
.../engine/planner/logical/StoreTableNode.java | 8 +-
.../planner/logical/TableSubQueryNode.java | 4 +-
.../tajo/engine/planner/logical/UnaryNode.java | 8 +-
.../tajo/engine/planner/logical/UnionNode.java | 8 +-
.../org/apache/tajo/master/GlobalEngine.java | 4 +-
.../org/apache/tajo/master/GlobalPlanner.java | 16 +-
.../apache/tajo/master/GlobalPlannerUtils.java | 44 ------
.../java/org/apache/tajo/util/IndexUtil.java | 5 +-
.../engine/planner/logical/TestReceiveNode.java | 61 --------
.../engine/planner/logical/TestSendNode.java | 49 ------
.../planner/physical/TestMergeJoinExec.java | 14 +-
.../planner/physical/TestPhysicalPlanner.java | 2 +-
42 files changed, 170 insertions(+), 718 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/fc018de8/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 0d9fc2b..223ffa1 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -29,6 +29,9 @@ Release 0.2.0 - unreleased
IMPROVEMENTS
+ TAJO-194: LogicalNode should have an identifier to distinguish each
+ logical node instance. (hyunsik)
+
TAJO-183: Creating too many TableMetaProto objects might lead a potential
memory leak. (jihoon)
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/fc018de8/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/InsertNode.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/InsertNode.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/InsertNode.java
index ed13fb2..ae88630 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/InsertNode.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/InsertNode.java
@@ -39,16 +39,16 @@ public class InsertNode extends LogicalNode implements Cloneable {
@Expose private LogicalNode subQuery;
- public InsertNode(TableDesc desc, LogicalNode subQuery) {
- super(NodeType.INSERT);
+ public InsertNode(int pid, TableDesc desc, LogicalNode subQuery) {
+ super(pid, NodeType.INSERT);
this.targetTableDesc = desc;
this.subQuery = subQuery;
this.setInSchema(subQuery.getOutSchema());
this.setOutSchema(subQuery.getOutSchema());
}
- public InsertNode(Path location, LogicalNode subQuery) {
- super(NodeType.INSERT);
+ public InsertNode(int pid, Path location, LogicalNode subQuery) {
+ super(pid, NodeType.INSERT);
this.path = location;
this.subQuery = subQuery;
this.setInSchema(subQuery.getOutSchema());
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/fc018de8/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/LogicalPlan.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/LogicalPlan.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/LogicalPlan.java
index c29f60c..7e7b45e 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/LogicalPlan.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/LogicalPlan.java
@@ -40,18 +40,16 @@ public class LogicalPlan {
public static final char VIRTUAL_TABLE_PREFIX='@';
/** it indicates the root block */
public static final String ROOT_BLOCK = VIRTUAL_TABLE_PREFIX + "ROOT";
- /** it indicates a table itself */
- public static final String TABLE_SELF = VIRTUAL_TABLE_PREFIX + "SELF";
-
- public static final String ANONYMOUS_TABLE_PREFIX = VIRTUAL_TABLE_PREFIX + "NONAME_";
- public static Integer anonymousBlockId = 0;
- public static Integer anonymousColumnId = 0;
+ public static final String NONAME_BLOCK_PREFIX = VIRTUAL_TABLE_PREFIX + "NONAME_";
+ private int nextPid = 0;
+ private Integer noNameBlockId = 0;
+ private Integer noNameColumnId = 0;
/** a map from between a block name to a block plan */
private Map<String, QueryBlock> queryBlocks = new LinkedHashMap<String, QueryBlock>();
- private Map<LogicalNode, QueryBlock> queryBlockByNode = new HashMap<LogicalNode, QueryBlock>();
+ private Map<Integer, LogicalNode> nodeMap = new HashMap<Integer, LogicalNode>();
+ private Map<Integer, QueryBlock> queryBlockByPID = new HashMap<Integer, QueryBlock>();
private SimpleDirectedGraph<String, BlockEdge> queryBlockGraph = new SimpleDirectedGraph<String, BlockEdge>();
- private Set<LogicalNode> visited = new HashSet<LogicalNode>();
public LogicalPlan(LogicalPlanner planner) {
this.planner = planner;
@@ -69,12 +67,16 @@ public class LogicalPlan {
return block;
}
- public QueryBlock newAnonymousBlock() {
- return newAndGetBlock(ANONYMOUS_TABLE_PREFIX + (anonymousBlockId++));
+ public int newPID() {
+ return nextPid++;
+ }
+
+ public QueryBlock newNoNameBlock() {
+ return newAndGetBlock(NONAME_BLOCK_PREFIX + (noNameBlockId++));
}
- public String newAnonymousColumnName() {
- return "?_" + (anonymousColumnId ++);
+ public String newNonameColumnName() {
+ return "?_" + (noNameColumnId++);
}
/**
@@ -95,17 +97,17 @@ public class LogicalPlan {
}
public QueryBlock getBlock(LogicalNode node) {
- return queryBlockByNode.get(node);
+ return queryBlockByPID.get(node.getPID());
}
public void removeBlock(QueryBlock block) {
queryBlocks.remove(block.getName());
- List<LogicalNode> tobeRemoved = new ArrayList<LogicalNode>();
- for (Map.Entry<LogicalNode, QueryBlock> entry : queryBlockByNode.entrySet()) {
+ List<Integer> tobeRemoved = new ArrayList<Integer>();
+ for (Map.Entry<Integer, QueryBlock> entry : queryBlockByPID.entrySet()) {
tobeRemoved.add(entry.getKey());
}
- for (LogicalNode rn : tobeRemoved) {
- queryBlockByNode.remove(rn);
+ for (Integer rn : tobeRemoved) {
+ queryBlockByPID.remove(rn);
}
}
@@ -353,7 +355,7 @@ public class LogicalPlan {
LogicalRootNode rootNode = (LogicalRootNode) blockRoot;
rootType = rootNode.getChild().getType();
}
- queryBlockByNode.put(blockRoot, this);
+ queryBlockByPID.put(blockRoot.getPID(), this);
}
public <T extends LogicalNode> T getRoot() {
@@ -510,10 +512,12 @@ public class LogicalPlan {
}
public boolean postVisit(LogicalNode node, Stack<OpType> path) {
- if (visited.contains(node)) {
+ if (nodeMap.containsKey(node.getPID())) {
return false;
}
+ nodeMap.put(node.getPID(), node);
+
// if an added operator is a relation, add it to relation set.
switch (node.getType()) {
case STORE:
@@ -721,7 +725,7 @@ public class LogicalPlan {
} else {
Target [] addedTargets = new Target[aggrFunctions.size()];
for (int i = 0; i < aggrFunctions.size(); i++) {
- Target aggrFunctionTarget = new Target(aggrFunctions.get(i), newAnonymousColumnName());
+ Target aggrFunctionTarget = new Target(aggrFunctions.get(i), newNonameColumnName());
addedTargets[i] = aggrFunctionTarget;
EvalTreeUtil.replace(havingCondition, aggrFunctions.get(i),
new FieldEval(aggrFunctionTarget.getColumnSchema()));
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/fc018de8/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/LogicalPlanner.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/LogicalPlanner.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/LogicalPlanner.java
index 6dd3e77..52f0547 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/LogicalPlanner.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/LogicalPlanner.java
@@ -96,7 +96,7 @@ public class LogicalPlanner extends BaseAlgebraVisitor<LogicalPlanner.PlanContex
PlanContext context = new PlanContext(plan, rootBlock);
subroot = visitChild(context, stack, expr);
- LogicalRootNode root = new LogicalRootNode();
+ LogicalRootNode root = new LogicalRootNode(plan.newPID());
root.setInSchema(subroot.getOutSchema());
root.setOutSchema(subroot.getOutSchema());
root.setChild(subroot);
@@ -140,13 +140,14 @@ public class LogicalPlanner extends BaseAlgebraVisitor<LogicalPlanner.PlanContex
}
}
- public TableSubQueryNode visitTableSubQuery(PlanContext context, Stack<OpType> stack, TableSubQuery expr) throws PlanningException {
+ public TableSubQueryNode visitTableSubQuery(PlanContext context, Stack<OpType> stack, TableSubQuery expr)
+ throws PlanningException {
QueryBlock newBlock = context.plan.newAndGetBlock(expr.getName());
PlanContext newContext = new PlanContext(context.plan, newBlock);
Stack<OpType> newStack = new Stack<OpType>();
LogicalNode child = visitChild(newContext, newStack, expr.getSubQuery());
context.plan.connectBlocks(newContext.block, context.block, BlockType.TableSubQuery);
- return new TableSubQueryNode(expr.getName(), child);
+ return new TableSubQueryNode(context.plan.newPID(), expr.getName(), child);
}
@@ -162,9 +163,9 @@ public class LogicalPlanner extends BaseAlgebraVisitor<LogicalPlanner.PlanContex
ScanNode scanNode;
if (relation.hasAlias()) {
- scanNode = new ScanNode(desc, relation.getAlias());
+ scanNode = new ScanNode(context.plan.newPID(), desc, relation.getAlias());
} else {
- scanNode = new ScanNode(desc);
+ scanNode = new ScanNode(context.plan.newPID(), desc);
}
return scanNode;
@@ -186,7 +187,7 @@ public class LogicalPlanner extends BaseAlgebraVisitor<LogicalPlanner.PlanContex
for (int i = 1; i < relations.size(); i++) {
left = current;
right = visitChild(context, stack, relations.getRelations()[i]);
- current = createCatasianProduct(left, right);
+ current = createCatasianProduct(context.plan, left, right);
}
}
@@ -207,7 +208,7 @@ public class LogicalPlanner extends BaseAlgebraVisitor<LogicalPlanner.PlanContex
stack.pop();
// Phase 3: build this plan
- JoinNode joinNode = new JoinNode(join.getJoinType(), left, right);
+ JoinNode joinNode = new JoinNode(plan.newPID(), join.getJoinType(), left, right);
// Set A merged input schema
Schema merged;
@@ -255,8 +256,8 @@ public class LogicalPlanner extends BaseAlgebraVisitor<LogicalPlanner.PlanContex
return njQual;
}
- private static LogicalNode createCatasianProduct(LogicalNode left, LogicalNode right) {
- JoinNode join = new JoinNode(JoinType.CROSS, left, right);
+ private static LogicalNode createCatasianProduct(LogicalPlan plan, LogicalNode left, LogicalNode right) {
+ JoinNode join = new JoinNode(plan.newPID(), JoinType.CROSS, left, right);
Schema joinSchema = SchemaUtil.merge(
join.getLeftChild().getOutSchema(),
join.getRightChild().getOutSchema());
@@ -319,27 +320,27 @@ public class LogicalPlanner extends BaseAlgebraVisitor<LogicalPlanner.PlanContex
QueryBlock block = context.block;
// 2. Build Child Plans
- PlanContext leftContext = new PlanContext(plan, plan.newAnonymousBlock());
+ PlanContext leftContext = new PlanContext(plan, plan.newNoNameBlock());
Stack<OpType> leftStack = new Stack<OpType>();
LogicalNode left = visitChild(leftContext, leftStack, setOperation.getLeft());
- TableSubQueryNode leftSubQuery = new TableSubQueryNode(leftContext.block.getName(), left);
+ TableSubQueryNode leftSubQuery = new TableSubQueryNode(plan.newPID(), leftContext.block.getName(), left);
context.plan.connectBlocks(leftContext.block, context.block, BlockType.TableSubQuery);
- PlanContext rightContext = new PlanContext(plan, plan.newAnonymousBlock());
+ PlanContext rightContext = new PlanContext(plan, plan.newNoNameBlock());
Stack<OpType> rightStack = new Stack<OpType>();
LogicalNode right = visitChild(rightContext, rightStack, setOperation.getRight());
- TableSubQueryNode rightSubQuery = new TableSubQueryNode(rightContext.block.getName(), right);
+ TableSubQueryNode rightSubQuery = new TableSubQueryNode(plan.newPID(), rightContext.block.getName(), right);
context.plan.connectBlocks(rightContext.block, context.block, BlockType.TableSubQuery);
verifySetStatement(setOperation.getType(), leftContext.block, rightContext.block);
BinaryNode setOp;
if (setOperation.getType() == OpType.Union) {
- setOp = new UnionNode(leftSubQuery, rightSubQuery);
+ setOp = new UnionNode(plan.newPID(), leftSubQuery, rightSubQuery);
} else if (setOperation.getType() == OpType.Except) {
- setOp = new ExceptNode(leftSubQuery, rightSubQuery);
+ setOp = new ExceptNode(plan.newPID(), leftSubQuery, rightSubQuery);
} else if (setOperation.getType() == OpType.Intersect) {
- setOp = new IntersectNode(leftSubQuery, rightSubQuery);
+ setOp = new IntersectNode(plan.newPID(), leftSubQuery, rightSubQuery);
} else {
throw new VerifyException("Invalid Type: " + setOperation.getType());
}
@@ -395,7 +396,7 @@ public class LogicalPlanner extends BaseAlgebraVisitor<LogicalPlanner.PlanContex
// 3. build this plan:
EvalNode searchCondition = createEvalTree(plan, block, selection.getQual());
- SelectionNode selectionNode = new SelectionNode(searchCondition);
+ SelectionNode selectionNode = new SelectionNode(plan.newPID(), searchCondition);
// 4. set child plan, update input/output schemas:
selectionNode.setChild(child);
@@ -435,7 +436,7 @@ public class LogicalPlanner extends BaseAlgebraVisitor<LogicalPlanner.PlanContex
groupElements[i].getType(),
annotateGroupingColumn(plan, block.getName(), groupElements[i].getColumns(), null));
}
- GroupbyNode groupingNode = new GroupbyNode(annotatedElements[0].getColumns());
+ GroupbyNode groupingNode = new GroupbyNode(plan.newPID(), annotatedElements[0].getColumns());
if (aggregation.hasHavingCondition()) {
groupingNode.setHavingCondition(
createEvalTree(plan, block, aggregation.getHavingCondition()));
@@ -471,7 +472,7 @@ public class LogicalPlanner extends BaseAlgebraVisitor<LogicalPlanner.PlanContex
UnionNode union;
try {
if ((cuboids.size() - idx) > 2) {
- GroupbyNode g1 = new GroupbyNode(cuboids.get(idx));
+ GroupbyNode g1 = new GroupbyNode(plan.newPID(), cuboids.get(idx));
Target[] clone = cloneTargets(block.getCurrentTargets());
g1.setTargets(clone);
@@ -481,13 +482,13 @@ public class LogicalPlanner extends BaseAlgebraVisitor<LogicalPlanner.PlanContex
g1.setOutSchema(outSchema);
LogicalNode right = createGroupByUnion(plan, block, subNode, cuboids, idx+1);
- union = new UnionNode(g1, right);
+ union = new UnionNode(plan.newPID(), g1, right);
union.setInSchema(g1.getOutSchema());
union.setOutSchema(g1.getOutSchema());
return union;
} else {
- GroupbyNode g1 = new GroupbyNode(cuboids.get(idx));
+ GroupbyNode g1 = new GroupbyNode(plan.newPID(), cuboids.get(idx));
Target[] clone = cloneTargets(block.getCurrentTargets());
g1.setTargets(clone);
g1.setChild((LogicalNode) subNode.clone());
@@ -495,14 +496,14 @@ public class LogicalPlanner extends BaseAlgebraVisitor<LogicalPlanner.PlanContex
Schema outSchema = getProjectedSchema(plan, clone);
g1.setOutSchema(outSchema);
- GroupbyNode g2 = new GroupbyNode(cuboids.get(idx+1));
+ GroupbyNode g2 = new GroupbyNode(plan.newPID(), cuboids.get(idx+1));
clone = cloneTargets(block.getCurrentTargets());
g2.setTargets(clone);
g2.setChild((LogicalNode) subNode.clone());
g2.setInSchema(g1.getChild().getOutSchema());
outSchema = getProjectedSchema(plan, clone);
g2.setOutSchema(outSchema);
- union = new UnionNode(g1, g2);
+ union = new UnionNode(plan.newPID(), g1, g2);
union.setInSchema(g1.getOutSchema());
union.setOutSchema(g1.getOutSchema());
@@ -575,7 +576,7 @@ public class LogicalPlanner extends BaseAlgebraVisitor<LogicalPlanner.PlanContex
// 2. Build Child Plans:
stack.push(OpType.Sort);
LogicalNode child = visitChild(context, stack, sort.getChild());
- child = insertGroupbyNodeIfUnresolved(block, child, stack);
+ child = insertGroupbyNodeIfUnresolved(plan, block, child, stack);
stack.pop();
// 3. Build this plan:
@@ -587,7 +588,7 @@ public class LogicalPlanner extends BaseAlgebraVisitor<LogicalPlanner.PlanContex
annotatedSortSpecs[i] = new SortSpec(column, sortSpecs[i].isAscending(),
sortSpecs[i].isNullFirst());
}
- SortNode sortNode = new SortNode(annotatedSortSpecs);
+ SortNode sortNode = new SortNode(context.plan.newPID(), annotatedSortSpecs);
// 4. Set Child Plan, Update Input/Output Schemas:
sortNode.setChild(child);
@@ -611,7 +612,7 @@ public class LogicalPlanner extends BaseAlgebraVisitor<LogicalPlanner.PlanContex
// build limit plan
EvalNode firstFetchNum = createEvalTree(plan, block, limit.getFetchFirstNum());
firstFetchNum.eval(null, null, null);
- LimitNode limitNode = new LimitNode(firstFetchNum.terminate(null).asInt8());
+ LimitNode limitNode = new LimitNode(context.plan.newPID(), firstFetchNum.terminate(null).asInt8());
// set child plan and update input/output schemas.
limitNode.setChild(child);
@@ -638,7 +639,7 @@ public class LogicalPlanner extends BaseAlgebraVisitor<LogicalPlanner.PlanContex
if (!projection.hasChild()) {
EvalExprNode evalOnly =
- new EvalExprNode(annotateTargets(plan, block, projection.getTargets()));
+ new EvalExprNode(context.plan.newPID(), annotateTargets(plan, block, projection.getTargets()));
evalOnly.setOutSchema(getProjectedSchema(plan, evalOnly.getExprs()));
block.setProjectionNode(evalOnly);
for (int i = 0; i < evalOnly.getTargets().length; i++) {
@@ -650,7 +651,7 @@ public class LogicalPlanner extends BaseAlgebraVisitor<LogicalPlanner.PlanContex
// 2: Build Child Plans
stack.push(OpType.Projection);
LogicalNode child = visitChild(context, stack, projection.getChild());
- child = insertGroupbyNodeIfUnresolved(block, child, stack);
+ child = insertGroupbyNodeIfUnresolved(plan, block, child, stack);
stack.pop();
// All targets must be evaluable before the projection.
@@ -659,9 +660,9 @@ public class LogicalPlanner extends BaseAlgebraVisitor<LogicalPlanner.PlanContex
ProjectionNode projectionNode;
if (projection.isAllProjected()) {
- projectionNode = new ProjectionNode(PlannerUtil.schemaToTargets(child.getOutSchema()));
+ projectionNode = new ProjectionNode(context.plan.newPID(), PlannerUtil.schemaToTargets(child.getOutSchema()));
} else {
- projectionNode = new ProjectionNode(block.getCurrentTargets());
+ projectionNode = new ProjectionNode(context.plan.newPID(), block.getCurrentTargets());
}
block.setProjectionNode(projectionNode);
@@ -674,7 +675,7 @@ public class LogicalPlanner extends BaseAlgebraVisitor<LogicalPlanner.PlanContex
} else {
if (projection.isDistinct()) {
Schema outSchema = projectionNode.getOutSchema();
- GroupbyNode dupRemoval = new GroupbyNode(outSchema.toArray());
+ GroupbyNode dupRemoval = new GroupbyNode(plan.newPID(), outSchema.toArray());
dupRemoval.setTargets(block.getTargetListManager().getTargets());
dupRemoval.setInSchema(child.getOutSchema());
dupRemoval.setOutSchema(outSchema);
@@ -690,11 +691,11 @@ public class LogicalPlanner extends BaseAlgebraVisitor<LogicalPlanner.PlanContex
* Insert a group-by operator before a sort or a projection operator.
* It is used only when a group-by clause is not given.
*/
- private LogicalNode insertGroupbyNodeIfUnresolved(QueryBlock block,
+ private LogicalNode insertGroupbyNodeIfUnresolved(LogicalPlan plan, QueryBlock block,
LogicalNode child, Stack<OpType> stack) throws PlanningException {
if (!block.isGroupingResolved()) {
- GroupbyNode groupbyNode = new GroupbyNode(new Column[] {});
+ GroupbyNode groupbyNode = new GroupbyNode(plan.newPID(), new Column[] {});
groupbyNode.setTargets(block.getCurrentTargets());
groupbyNode.setChild(child);
groupbyNode.setInSchema(child.getOutSchema());
@@ -731,7 +732,7 @@ public class LogicalPlanner extends BaseAlgebraVisitor<LogicalPlanner.PlanContex
stack.add(OpType.CreateTable);
LogicalNode subQuery = visitChild(context, stack, expr.getSubQuery());
stack.pop();
- StoreTableNode storeNode = new StoreTableNode(tableName);
+ StoreTableNode storeNode = new StoreTableNode(context.plan.newPID(), tableName);
storeNode.setCreateTable();
storeNode.setChild(subQuery);
@@ -759,7 +760,7 @@ public class LogicalPlanner extends BaseAlgebraVisitor<LogicalPlanner.PlanContex
return storeNode;
} else {
- CreateTableNode createTableNode = new CreateTableNode(expr.getTableName(),
+ CreateTableNode createTableNode = new CreateTableNode(context.plan.newPID(), expr.getTableName(),
convertTableElementsSchema(expr.getTableElements()));
if (expr.isExternal()) {
@@ -836,25 +837,25 @@ public class LogicalPlanner extends BaseAlgebraVisitor<LogicalPlanner.PlanContex
return column;
}
- protected LogicalNode visitInsert(PlanContext ctx, Stack<OpType> stack, Insert expr) throws PlanningException {
+ protected LogicalNode visitInsert(PlanContext context, Stack<OpType> stack, Insert expr) throws PlanningException {
stack.push(expr.getType());
- QueryBlock newQueryBlock = ctx.plan.newAnonymousBlock();
- PlanContext newContext = new PlanContext(ctx.plan, newQueryBlock);
+ QueryBlock newQueryBlock = context.plan.newNoNameBlock();
+ PlanContext newContext = new PlanContext(context.plan, newQueryBlock);
Stack<OpType> subStack = new Stack<OpType>();
LogicalNode subQuery = visitChild(newContext, subStack, expr.getSubQuery());
- ctx.plan.connectBlocks(newQueryBlock, ctx.block, BlockType.TableSubQuery);
+ context.plan.connectBlocks(newQueryBlock, context.block, BlockType.TableSubQuery);
stack.pop();
InsertNode insertNode = null;
if (expr.hasTableName()) {
TableDesc desc = catalog.getTableDesc(expr.getTableName());
- ctx.block.addRelation(new ScanNode(desc));
+ context.block.addRelation(new ScanNode(context.plan.newPID(), desc));
Schema targetSchema = new Schema();
if (expr.hasTargetColumns()) {
String [] targetColumnNames = expr.getTargetColumns();
for (int i = 0; i < targetColumnNames.length; i++) {
- Column targetColumn = ctx.plan.resolveColumn(ctx.block, null, new ColumnReferenceExpr(targetColumnNames[i]));
+ Column targetColumn = context.plan.resolveColumn(context.block, null, new ColumnReferenceExpr(targetColumnNames[i]));
targetSchema.addColumn(targetColumn);
}
} else {
@@ -865,13 +866,13 @@ public class LogicalPlanner extends BaseAlgebraVisitor<LogicalPlanner.PlanContex
}
ensureDomains(targetSchema, subQuery.getOutSchema());
- insertNode = new InsertNode(desc, subQuery);
+ insertNode = new InsertNode(context.plan.newPID(), desc, subQuery);
insertNode.setTargetSchema(targetSchema);
insertNode.setOutSchema(targetSchema);
}
if (expr.hasLocation()) {
- insertNode = new InsertNode(new Path(expr.getLocation()), subQuery);
+ insertNode = new InsertNode(context.plan.newPID(), new Path(expr.getLocation()), subQuery);
if (expr.hasStorageType()) {
insertNode.setStorageType(CatalogUtil.getStoreType(expr.getStorageType()));
}
@@ -906,7 +907,7 @@ public class LogicalPlanner extends BaseAlgebraVisitor<LogicalPlanner.PlanContex
@Override
public LogicalNode visitDropTable(PlanContext context, Stack<OpType> stack, DropTable dropTable) {
- DropTableNode dropTableNode = new DropTableNode(dropTable.getTableName());
+ DropTableNode dropTableNode = new DropTableNode(context.plan.newPID(), dropTable.getTableName());
return dropTableNode;
}
@@ -1180,7 +1181,7 @@ public class LogicalPlanner extends BaseAlgebraVisitor<LogicalPlanner.PlanContex
if (t.hasAlias()) {
name = t.getAlias();
} else if (t.getEvalTree().getName().equals("?")) {
- name = plan.newAnonymousColumnName();
+ name = plan.newNonameColumnName();
} else {
name = t.getEvalTree().getName();
}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/fc018de8/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java
index b118852..4813fd7 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java
@@ -45,9 +45,12 @@ import static org.apache.tajo.ipc.TajoWorkerProtocol.PartitionType;
public class PhysicalPlannerImpl implements PhysicalPlanner {
private static final Log LOG = LogFactory.getLog(PhysicalPlannerImpl.class);
+ private static final int UNGENERATED_PID = -1;
+
protected final TajoConf conf;
protected final AbstractStorageManager sm;
+
public PhysicalPlannerImpl(final TajoConf conf, final AbstractStorageManager sm) {
this.conf = conf;
this.sm = sm;
@@ -76,7 +79,7 @@ public class PhysicalPlannerImpl implements PhysicalPlanner {
private PhysicalExec buildOutputOperator(TaskAttemptContext context, LogicalNode plan,
PhysicalExec execPlan) throws IOException {
DataChannel channel = context.getDataChannel();
- StoreTableNode storeTableNode = new StoreTableNode(channel.getTargetId().toString());
+ StoreTableNode storeTableNode = new StoreTableNode(UNGENERATED_PID, channel.getTargetId().toString());
storeTableNode.setStorageType(CatalogProtos.StoreType.CSV);
storeTableNode.setInSchema(execPlan.getSchema());
storeTableNode.setOutSchema(execPlan.getSchema());
@@ -220,10 +223,10 @@ public class PhysicalPlannerImpl implements PhysicalPlanner {
SortSpec[][] sortSpecs = PlannerUtil.getSortKeysFromJoinQual(
joinNode.getJoinQual(), outer.getSchema(), inner.getSchema());
ExternalSortExec outerSort = new ExternalSortExec(ctx, sm,
- new SortNode(sortSpecs[0], outer.getSchema(), outer.getSchema()),
+ new SortNode(UNGENERATED_PID, sortSpecs[0], outer.getSchema(), outer.getSchema()),
outer);
ExternalSortExec innerSort = new ExternalSortExec(ctx, sm,
- new SortNode(sortSpecs[1], inner.getSchema(), inner.getSchema()),
+ new SortNode(UNGENERATED_PID, sortSpecs[1], inner.getSchema(), inner.getSchema()),
inner);
LOG.info("The planner chooses [Merge Join]");
@@ -295,7 +298,7 @@ public class PhysicalPlannerImpl implements PhysicalPlanner {
for (int i = 0; i < grpColumns.length; i++) {
specs[i] = new SortSpec(grpColumns[i], true, false);
}
- SortNode sortNode = new SortNode(specs);
+ SortNode sortNode = new SortNode(UNGENERATED_PID, specs);
sortNode.setInSchema(subOp.getSchema());
sortNode.setOutSchema(subOp.getSchema());
// SortExec sortExec = new SortExec(sortNode, child);
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/fc018de8/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PlannerUtil.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PlannerUtil.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PlannerUtil.java
index 2c9a5eb..7b39d26 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PlannerUtil.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PlannerUtil.java
@@ -66,21 +66,6 @@ public class PlannerUtil {
return tableNames;
}
- public static LogicalNode insertNode(LogicalNode parent, LogicalNode newNode) {
- Preconditions.checkArgument(parent instanceof UnaryNode);
- Preconditions.checkArgument(newNode instanceof UnaryNode);
-
- UnaryNode p = (UnaryNode) parent;
- LogicalNode c = p.getChild();
- UnaryNode m = (UnaryNode) newNode;
- m.setInSchema(c.getOutSchema());
- m.setOutSchema(c.getOutSchema());
- m.setChild(c);
- p.setChild(m);
-
- return p;
- }
-
/**
* Delete the logical node from a plan.
*
@@ -191,13 +176,6 @@ public class PlannerUtil {
return child;
}
- private static LogicalNode insertStore(LogicalNode parent, String tableName) {
- StoreTableNode store = new StoreTableNode(tableName);
- insertNode(parent, store);
-
- return parent;
- }
-
/**
* Find the top logical node matched to type from the given node
*
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/fc018de8/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/TargetListManager.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/TargetListManager.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/TargetListManager.java
index 970ab93..ccb64ce 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/TargetListManager.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/TargetListManager.java
@@ -155,7 +155,7 @@ public class TargetListManager {
if (t.hasAlias()) {
name = t.getAlias();
} else if (t.getEvalTree().getName().equals("?")) {
- name = plan.newAnonymousColumnName();
+ name = plan.newNonameColumnName();
} else {
name = t.getEvalTree().getName();
}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/fc018de8/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/BinaryNode.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/BinaryNode.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/BinaryNode.java
index e2724e7..37b0fdf 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/BinaryNode.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/BinaryNode.java
@@ -27,16 +27,9 @@ import org.apache.tajo.json.GsonObject;
public abstract class BinaryNode extends LogicalNode implements Cloneable, GsonObject {
@Expose LogicalNode leftChild = null;
@Expose LogicalNode rightChild = null;
-
- public BinaryNode() {
- super();
- }
-
- /**
- * @param opType
- */
- public BinaryNode(NodeType opType) {
- super(opType);
+
+ public BinaryNode(int pid, NodeType nodeType) {
+ super(pid, nodeType);
}
public <T extends LogicalNode> T getLeftChild() {
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/fc018de8/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/CreateTableNode.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/CreateTableNode.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/CreateTableNode.java
index f7ff8ef..50656c5 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/CreateTableNode.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/CreateTableNode.java
@@ -36,8 +36,8 @@ public class CreateTableNode extends LogicalNode implements Cloneable {
@Expose private Options options;
@Expose private boolean external;
- public CreateTableNode(String tableName, Schema schema) {
- super(NodeType.CREATE_TABLE);
+ public CreateTableNode(int pid, String tableName, Schema schema) {
+ super(pid, NodeType.CREATE_TABLE);
this.tableName = tableName;
this.schema = schema;
}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/fc018de8/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/DropTableNode.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/DropTableNode.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/DropTableNode.java
index 708c03a..490bfe7 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/DropTableNode.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/DropTableNode.java
@@ -23,8 +23,8 @@ import org.apache.tajo.engine.planner.PlanString;
public class DropTableNode extends LogicalNode {
private String tableName;
- public DropTableNode(String tableName) {
- super(NodeType.DROP_TABLE);
+ public DropTableNode(int pid, String tableName) {
+ super(pid, NodeType.DROP_TABLE);
this.tableName = tableName;
}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/fc018de8/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/EvalExprNode.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/EvalExprNode.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/EvalExprNode.java
index bb323dc..5bc41bf 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/EvalExprNode.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/EvalExprNode.java
@@ -29,8 +29,8 @@ import org.apache.tajo.util.TUtil;
public class EvalExprNode extends LogicalNode implements Projectable {
@Expose private Target[] exprs;
- public EvalExprNode(Target[] exprs) {
- super(NodeType.EXPRS);
+ public EvalExprNode(int pid, Target[] exprs) {
+ super(pid, NodeType.EXPRS);
this.exprs = exprs;
}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/fc018de8/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/ExceptNode.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/ExceptNode.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/ExceptNode.java
index fd3c0d0..0520214 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/ExceptNode.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/ExceptNode.java
@@ -25,12 +25,8 @@ import org.apache.tajo.engine.planner.PlanString;
public class ExceptNode extends BinaryNode {
- public ExceptNode() {
- super(NodeType.EXCEPT);
- }
-
- public ExceptNode(LogicalNode outer, LogicalNode inner) {
- this();
+ public ExceptNode(int pid, LogicalNode outer, LogicalNode inner) {
+ super(pid, NodeType.EXCEPT);
setLeftChild(outer);
setRightChild(inner);
}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/fc018de8/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/GroupbyNode.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/GroupbyNode.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/GroupbyNode.java
index 5c4e28e..4f9b6d7 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/GroupbyNode.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/GroupbyNode.java
@@ -32,18 +32,13 @@ public class GroupbyNode extends UnaryNode implements Projectable, Cloneable {
@Expose private EvalNode havingCondition = null;
@Expose private Target [] targets;
- public GroupbyNode() {
- super();
- }
-
- public GroupbyNode(final Column [] columns) {
- super(NodeType.GROUP_BY);
+ public GroupbyNode(int pid, final Column [] columns) {
+ super(pid, NodeType.GROUP_BY);
this.columns = columns;
}
- public GroupbyNode(final Column [] columns,
- final EvalNode havingCondition) {
- this(columns);
+ public GroupbyNode(int pid, final Column [] columns, final EvalNode havingCondition) {
+ this(pid, columns);
this.havingCondition = havingCondition;
}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/fc018de8/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/IndexScanNode.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/IndexScanNode.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/IndexScanNode.java
index 8ece090..7272630 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/IndexScanNode.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/IndexScanNode.java
@@ -30,9 +30,9 @@ public class IndexScanNode extends ScanNode {
@Expose private Schema keySchema = null;
@Expose private Datum[] datum = null;
- public IndexScanNode(ScanNode scanNode ,
+ public IndexScanNode(int pid, ScanNode scanNode ,
Schema keySchema , Datum[] datum, SortSpec[] sortKeys ) {
- super(scanNode.getTableDesc());
+ super(pid, scanNode.getTableDesc());
setQual(scanNode.getQual());
setInSchema(scanNode.getInSchema());
setOutSchema(scanNode.getOutSchema());
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/fc018de8/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/IntersectNode.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/IntersectNode.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/IntersectNode.java
index 0882e56..493698d 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/IntersectNode.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/IntersectNode.java
@@ -24,13 +24,8 @@ package org.apache.tajo.engine.planner.logical;
import org.apache.tajo.engine.planner.PlanString;
public class IntersectNode extends BinaryNode {
-
- public IntersectNode() {
- super(NodeType.INTERSECT);
- }
-
- public IntersectNode(LogicalNode outer, LogicalNode inner) {
- this();
+ public IntersectNode(int pid, LogicalNode outer, LogicalNode inner) {
+ super(pid, NodeType.INTERSECT);
setLeftChild(outer);
setRightChild(inner);
}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/fc018de8/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/JoinNode.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/JoinNode.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/JoinNode.java
index 45f40f9..58214ac 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/JoinNode.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/JoinNode.java
@@ -32,14 +32,14 @@ public class JoinNode extends BinaryNode implements Projectable, Cloneable {
@Expose private EvalNode joinQual;
@Expose private Target[] targets;
- public JoinNode(JoinType joinType, LogicalNode left) {
- super(NodeType.JOIN);
+ public JoinNode(int pid, JoinType joinType, LogicalNode left) {
+ super(pid, NodeType.JOIN);
this.joinType = joinType;
setLeftChild(left);
}
- public JoinNode(JoinType joinType, LogicalNode left, LogicalNode right) {
- super(NodeType.JOIN);
+ public JoinNode(int pid, JoinType joinType, LogicalNode left, LogicalNode right) {
+ super(pid, NodeType.JOIN);
this.joinType = joinType;
setLeftChild(left);
setRightChild(right);
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/fc018de8/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/LimitNode.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/LimitNode.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/LimitNode.java
index 697f1fc..2e1057d 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/LimitNode.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/LimitNode.java
@@ -24,12 +24,8 @@ import org.apache.tajo.engine.planner.PlanString;
public final class LimitNode extends UnaryNode implements Cloneable {
@Expose private long fetchFirstNum;
- public LimitNode() {
- super();
- }
-
- public LimitNode(long fetchFirstNum) {
- super(NodeType.LIMIT);
+ public LimitNode(int pid, long fetchFirstNum) {
+ super(pid, NodeType.LIMIT);
this.fetchFirstNum = fetchFirstNum;
}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/fc018de8/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/LogicalNode.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/LogicalNode.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/LogicalNode.java
index 7d7e86d..042d9ef 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/LogicalNode.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/LogicalNode.java
@@ -29,18 +29,21 @@ import org.apache.tajo.json.GsonObject;
import org.apache.tajo.util.TUtil;
public abstract class LogicalNode implements Cloneable, GsonObject {
+ @Expose private int pid;
@Expose private NodeType type;
@Expose private Schema inputSchema;
@Expose private Schema outputSchema;
@Expose private double cost = 0;
-
- public LogicalNode() {
- }
- public LogicalNode(NodeType type) {
- this.type = type;
+ public LogicalNode(int pid, NodeType type) {
+ this.pid = pid;
+ this.type = type;
}
+
+ public int getPID() {
+ return pid;
+ }
public NodeType getType() {
return this.type;
@@ -79,7 +82,8 @@ public abstract class LogicalNode implements Cloneable, GsonObject {
if (obj instanceof LogicalNode) {
LogicalNode other = (LogicalNode) obj;
- boolean eq = this.type == other.type;
+ boolean eq = this.pid == other.pid;
+ eq = this.type == other.type;
eq = eq && TUtil.checkEquals(this.inputSchema, other.inputSchema);
eq = eq && TUtil.checkEquals(this.outputSchema, other.outputSchema);
eq = eq && this.cost == other.cost;
@@ -93,12 +97,10 @@ public abstract class LogicalNode implements Cloneable, GsonObject {
@Override
public Object clone() throws CloneNotSupportedException {
LogicalNode node = (LogicalNode)super.clone();
+ node.pid = pid;
node.type = type;
- node.inputSchema =
- (Schema) (inputSchema != null ? inputSchema.clone() : null);
- node.outputSchema =
- (Schema) (outputSchema != null ? outputSchema.clone() : null);
-
+ node.inputSchema = (Schema) (inputSchema != null ? inputSchema.clone() : null);
+ node.outputSchema = (Schema) (outputSchema != null ? outputSchema.clone() : null);
return node;
}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/fc018de8/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/LogicalRootNode.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/LogicalRootNode.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/LogicalRootNode.java
index e5373db..9cad5af 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/LogicalRootNode.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/LogicalRootNode.java
@@ -21,8 +21,8 @@ package org.apache.tajo.engine.planner.logical;
import org.apache.tajo.engine.planner.PlanString;
public class LogicalRootNode extends UnaryNode implements Cloneable {
- public LogicalRootNode() {
- super(NodeType.ROOT);
+ public LogicalRootNode(int pid) {
+ super(pid, NodeType.ROOT);
}
public String toString() {
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/fc018de8/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/NodeType.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/NodeType.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/NodeType.java
index 792fee8..44790ec 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/NodeType.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/NodeType.java
@@ -38,9 +38,7 @@ public enum NodeType {
LIMIT(LimitNode.class),
JOIN(JoinNode.class),
PROJECTION(ProjectionNode.class),
- RECEIVE(ReceiveNode.class),
ROOT(LogicalRootNode.class),
- SEND(SendNode.class),
SCAN(ScanNode.class),
SELECTION(SelectionNode.class),
STORE(StoreTableNode.class),
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/fc018de8/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/PipeType.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/PipeType.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/PipeType.java
deleted file mode 100644
index 0355b21..0000000
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/PipeType.java
+++ /dev/null
@@ -1,24 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.engine.planner.logical;
-
-public enum PipeType {
- PULL,
- PUSH
-}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/fc018de8/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/ProjectionNode.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/ProjectionNode.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/ProjectionNode.java
index 935f188..a5aa804 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/ProjectionNode.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/ProjectionNode.java
@@ -32,18 +32,10 @@ public class ProjectionNode extends UnaryNode implements Projectable {
@Expose private boolean distinct = false;
/**
- * This method is for gson.
- */
- @SuppressWarnings("unused")
- private ProjectionNode() {
- super();
- }
-
- /**
* @param targets they should be all evaluated ones.
*/
- public ProjectionNode(Target [] targets) {
- super(NodeType.PROJECTION);
+ public ProjectionNode(int pid, Target [] targets) {
+ super(pid, NodeType.PROJECTION);
this.targets = targets;
}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/fc018de8/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/ReceiveNode.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/ReceiveNode.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/ReceiveNode.java
deleted file mode 100644
index a37ad5f..0000000
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/ReceiveNode.java
+++ /dev/null
@@ -1,126 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-/**
- *
- */
-package org.apache.tajo.engine.planner.logical;
-
-import com.google.common.base.Objects;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-import com.google.gson.Gson;
-import com.google.gson.annotations.Expose;
-import org.apache.tajo.engine.json.CoreGsonHelper;
-import org.apache.tajo.engine.planner.PlanString;
-
-import java.net.URI;
-import java.util.*;
-import java.util.Map.Entry;
-
-public final class ReceiveNode extends LogicalNode implements Cloneable {
- @Expose private PipeType pipeType;
- @Expose private RepartitionType repaType;
- @Expose private Map<String, List<URI>> fetchMap;
-
- private ReceiveNode() {
- super(NodeType.RECEIVE);
- }
- public ReceiveNode(PipeType pipeType, RepartitionType shuffleType) {
- this();
- this.pipeType = pipeType;
- this.repaType = shuffleType;
- this.fetchMap = Maps.newHashMap();
- }
-
- public PipeType getPipeType() {
- return this.pipeType;
- }
-
- public RepartitionType getRepartitionType() {
- return this.repaType;
- }
-
- public void addData(String name, URI uri) {
- if (fetchMap.containsKey(name)) {
- fetchMap.get(name).add(uri);
- } else {
- fetchMap.put(name, Lists.newArrayList(uri));
- }
- }
-
- public Collection<URI> getSrcURIs(String name) {
- return Collections.unmodifiableList(fetchMap.get(name));
- }
-
- public Collection<Entry<String, List<URI>>> getAllDataSet() {
- return Collections.unmodifiableSet(fetchMap.entrySet());
- }
-
-
- @Override
- public PlanString getPlanString() {
- return new PlanString("Receive ");
- }
-
- @Override
- public boolean equals(Object obj) {
- if (obj instanceof ReceiveNode) {
- ReceiveNode other = (ReceiveNode) obj;
- return pipeType == other.pipeType && repaType == other.repaType;
- } else {
- return false;
- }
- }
-
- @Override
- public int hashCode() {
- return Objects.hashCode(pipeType, repaType);
- }
-
- @Override
- public Object clone() throws CloneNotSupportedException {
- ReceiveNode receive = (ReceiveNode) super.clone();
- receive.pipeType = pipeType;
- receive.repaType = repaType;
- receive.fetchMap = Maps.newHashMap();
- // Both String and URI are immutable, but a list is mutable.
- for (Entry<String, List<URI>> entry : fetchMap.entrySet()) {
- receive.fetchMap
- .put(entry.getKey(), new ArrayList<URI>(entry.getValue()));
- }
-
- return receive;
- }
-
- @Override
- public String toString() {
- Gson gson = CoreGsonHelper.getPrettyInstance();
- return gson.toJson(this);
- }
-
- @Override
- public void preOrder(LogicalNodeVisitor visitor) {
- visitor.visit(this);
- }
-
- @Override
- public void postOrder(LogicalNodeVisitor visitor) {
- visitor.visit(this);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/fc018de8/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/RelationNode.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/RelationNode.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/RelationNode.java
index 583e472..6a30aa6 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/RelationNode.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/RelationNode.java
@@ -22,8 +22,8 @@ import org.apache.tajo.catalog.Schema;
public abstract class RelationNode extends LogicalNode {
- public RelationNode(NodeType nodeType) {
- super(nodeType);
+ public RelationNode(int pid, NodeType nodeType) {
+ super(pid, nodeType);
assert(nodeType == NodeType.SCAN || nodeType == NodeType.TABLE_SUBQUERY);
}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/fc018de8/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/RepartitionType.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/RepartitionType.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/RepartitionType.java
deleted file mode 100644
index d317fb8..0000000
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/RepartitionType.java
+++ /dev/null
@@ -1,25 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.engine.planner.logical;
-
-public enum RepartitionType {
- NONE,
- HASH,
- SORT
-}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/fc018de8/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/ScanNode.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/ScanNode.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/ScanNode.java
index 3596e2d..04c7b5a 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/ScanNode.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/ScanNode.java
@@ -34,20 +34,16 @@ public class ScanNode extends RelationNode implements Projectable {
@Expose private Schema renamedSchema;
@Expose private EvalNode qual;
@Expose private Target[] targets;
-
- public ScanNode() {
- super(NodeType.SCAN);
- }
- public ScanNode(TableDesc desc) {
- super(NodeType.SCAN);
+ public ScanNode(int pid, TableDesc desc) {
+ super(pid, NodeType.SCAN);
this.tableDesc = desc;
this.setInSchema(tableDesc.getSchema());
this.setOutSchema(tableDesc.getSchema());
}
- public ScanNode(TableDesc desc, String alias) {
- this(desc);
+ public ScanNode(int pid, TableDesc desc, String alias) {
+ this(pid, desc);
this.alias = PlannerUtil.normalizeTableName(alias);
renamedSchema = getOutSchema();
renamedSchema.setQualifier(this.alias, true);
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/fc018de8/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/SelectionNode.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/SelectionNode.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/SelectionNode.java
index 2435453..a12fc06 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/SelectionNode.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/SelectionNode.java
@@ -25,12 +25,8 @@ import org.apache.tajo.engine.planner.PlanString;
public class SelectionNode extends UnaryNode implements Cloneable {
@Expose private EvalNode qual;
- public SelectionNode() {
- super();
- }
-
- public SelectionNode(EvalNode qual) {
- super(NodeType.SELECTION);
+ public SelectionNode(int pid, EvalNode qual) {
+ super(pid, NodeType.SELECTION);
setQual(qual);
}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/fc018de8/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/SendNode.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/SendNode.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/SendNode.java
deleted file mode 100644
index 96e6fc5..0000000
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/SendNode.java
+++ /dev/null
@@ -1,149 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-/**
- *
- */
-package org.apache.tajo.engine.planner.logical;
-
-import com.google.common.base.Objects;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Maps;
-import com.google.gson.Gson;
-import com.google.gson.annotations.Expose;
-import org.apache.tajo.catalog.Column;
-import org.apache.tajo.engine.json.CoreGsonHelper;
-import org.apache.tajo.engine.planner.PlanString;
-import org.apache.tajo.util.TUtil;
-
-import java.net.URI;
-import java.util.Iterator;
-import java.util.Map;
-import java.util.Map.Entry;
-
-/**
- * This logical node means that the worker sends intermediate data to
- * some destined one or more workers.
- */
-public class SendNode extends UnaryNode {
- @Expose private PipeType pipeType;
- @Expose private RepartitionType repaType;
- /** This will be used for pipeType == PUSH. */
- @Expose private Map<Integer, URI> destURIs;
- @Expose private Column[] partitionKeys;
- @Expose private int numPartitions;
-
- private SendNode() {
- super(NodeType.SEND);
- }
-
- public SendNode(PipeType pipeType, RepartitionType repaType) {
- this();
- this.pipeType = pipeType;
- this.repaType = repaType;
- this.destURIs = Maps.newHashMap();
- }
-
- public PipeType getPipeType() {
- return this.pipeType;
- }
-
- public RepartitionType getRepartitionType() {
- return this.repaType;
- }
-
- public URI getDestURI(int partition) {
- return this.destURIs.get(partition);
- }
-
- public void setPartitionKeys(Column [] keys, int numPartitions) {
- Preconditions.checkState(repaType != RepartitionType.NONE,
- "Hash or Sort repartition only requires the partition keys");
- Preconditions.checkArgument(keys.length > 0,
- "At least one partition key must be specified.");
- Preconditions.checkArgument(numPartitions > 0,
- "The number of partitions must be positive: %s", numPartitions);
- this.partitionKeys = keys;
- this.numPartitions = numPartitions;
- }
-
- public boolean hasPartitionKeys() {
- return this.partitionKeys != null;
- }
-
- public Column [] getPartitionKeys() {
- return this.partitionKeys;
- }
-
- public int getPartitionsNum() {
- return this.numPartitions;
- }
-
- public Iterator<Entry<Integer, URI>> getAllDestURIs() {
- return this.destURIs.entrySet().iterator();
- }
-
- public void putDestURI(int partition, URI uri) {
- this.destURIs.put(partition, uri);
- }
-
- public void setDestURIs(Map<Integer, URI> destURIs) {
- this.destURIs = destURIs;
- }
-
- @Override
- public PlanString getPlanString() {
- return null;
- }
-
- @Override
- public boolean equals(Object obj) {
- if (obj instanceof SendNode) {
- SendNode other = (SendNode) obj;
- return pipeType == other.pipeType
- && repaType == other.repaType
- && TUtil.checkEquals(destURIs, other.destURIs);
- } else {
- return false;
- }
- }
-
- @Override
- public int hashCode() {
- return Objects.hashCode(pipeType, repaType, destURIs);
- }
-
- @Override
- public Object clone() throws CloneNotSupportedException {
- SendNode send = (SendNode) super.clone();
- send.pipeType = pipeType;
- send.repaType = repaType;
- send.destURIs = Maps.newHashMap();
- for (Entry<Integer, URI> entry : destURIs.entrySet()) {
- send.destURIs.put(entry.getKey(), entry.getValue());
- }
-
- return send;
- }
-
- @Override
- public String toString() {
- Gson gson = CoreGsonHelper.getPrettyInstance();
- return gson.toJson(this);
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/fc018de8/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/SortNode.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/SortNode.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/SortNode.java
index e754a7f..c3a457e 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/SortNode.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/SortNode.java
@@ -26,22 +26,17 @@ import org.apache.tajo.engine.planner.PlanString;
import org.apache.tajo.util.TUtil;
public final class SortNode extends UnaryNode implements Cloneable {
- @Expose
- private SortSpec [] sortKeys;
-
- public SortNode() {
- super();
- }
+ @Expose private SortSpec [] sortKeys;
- public SortNode(SortSpec[] sortKeys) {
- super(NodeType.SORT);
+ public SortNode(int pid, SortSpec[] sortKeys) {
+ super(pid, NodeType.SORT);
Preconditions.checkArgument(sortKeys.length > 0,
"At least one sort key must be specified");
this.sortKeys = sortKeys;
}
- public SortNode(SortSpec[] sortKeys, Schema inSchema, Schema outSchema) {
- this(sortKeys);
+ public SortNode(int pid, SortSpec[] sortKeys, Schema inSchema, Schema outSchema) {
+ this(pid, sortKeys);
this.setInSchema(inSchema);
this.setOutSchema(outSchema);
}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/fc018de8/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/StoreIndexNode.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/StoreIndexNode.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/StoreIndexNode.java
index 3c44374..c3d7307 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/StoreIndexNode.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/StoreIndexNode.java
@@ -20,8 +20,8 @@ package org.apache.tajo.engine.planner.logical;
public class StoreIndexNode extends StoreTableNode {
- public StoreIndexNode(String tableName) {
- super(tableName);
+ public StoreIndexNode(int pid, String tableName) {
+ super(pid, tableName);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/fc018de8/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/StoreTableNode.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/StoreTableNode.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/StoreTableNode.java
index a2dd097..6872316 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/StoreTableNode.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/StoreTableNode.java
@@ -39,8 +39,8 @@ public class StoreTableNode extends UnaryNode implements Cloneable {
@Expose private boolean isCreatedTable = false;
@Expose private boolean isOverwritten = false;
- public StoreTableNode(String tableName) {
- super(NodeType.STORE);
+ public StoreTableNode(int pid, String tableName) {
+ super(pid, NodeType.STORE);
this.tableName = tableName;
}
@@ -48,10 +48,6 @@ public class StoreTableNode extends UnaryNode implements Cloneable {
return this.tableName;
}
- public final void setTableName(String tableName) {
- this.tableName = tableName;
- }
-
public void setStorageType(StoreType storageType) {
this.storageType = storageType;
}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/fc018de8/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/TableSubQueryNode.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/TableSubQueryNode.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/TableSubQueryNode.java
index e246466..39ae1e2 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/TableSubQueryNode.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/TableSubQueryNode.java
@@ -30,8 +30,8 @@ public class TableSubQueryNode extends RelationNode implements Projectable {
@Expose private LogicalNode subQuery;
@Expose private Target [] targets; // unused
- public TableSubQueryNode(String tableName, LogicalNode subQuery) {
- super(NodeType.TABLE_SUBQUERY);
+ public TableSubQueryNode(int pid, String tableName, LogicalNode subQuery) {
+ super(pid, NodeType.TABLE_SUBQUERY);
this.tableName = PlannerUtil.normalizeTableName(tableName);
this.subQuery = subQuery;
setOutSchema((Schema) this.subQuery.getOutSchema().clone());
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/fc018de8/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/UnaryNode.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/UnaryNode.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/UnaryNode.java
index 7f6e065..2e091be 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/UnaryNode.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/UnaryNode.java
@@ -24,15 +24,11 @@ import com.google.gson.annotations.Expose;
public abstract class UnaryNode extends LogicalNode implements Cloneable {
@Expose LogicalNode child;
- public UnaryNode() {
- super();
- }
-
/**
* @param type
*/
- public UnaryNode(NodeType type) {
- super(type);
+ public UnaryNode(int pid, NodeType type) {
+ super(pid, type);
}
public void setChild(LogicalNode subNode) {
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/fc018de8/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/UnionNode.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/UnionNode.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/UnionNode.java
index a62e91b..d0e8b02 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/UnionNode.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/UnionNode.java
@@ -25,12 +25,8 @@ import org.apache.tajo.engine.planner.PlanString;
public class UnionNode extends BinaryNode {
- public UnionNode() {
- super(NodeType.UNION);
- }
-
- public UnionNode(LogicalNode outer, LogicalNode inner) {
- this();
+ public UnionNode(int pid, LogicalNode outer, LogicalNode inner) {
+ super(pid, NodeType.UNION);
setLeftChild(outer);
setRightChild(inner);
}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/fc018de8/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/GlobalEngine.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/GlobalEngine.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/GlobalEngine.java
index c8e3d91..7e60097 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/GlobalEngine.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/GlobalEngine.java
@@ -373,7 +373,7 @@ public class GlobalEngine extends AbstractService {
queryContext.setFileOutput();
}
- storeNode = new StoreTableNode(outputTableName);
+ storeNode = new StoreTableNode(plan.newPID(), outputTableName);
queryContext.setOutputPath(outputPath);
if (insertNode.isOverwrite()) {
@@ -416,7 +416,7 @@ public class GlobalEngine extends AbstractService {
}
- ProjectionNode projectionNode = new ProjectionNode(targets);
+ ProjectionNode projectionNode = new ProjectionNode(plan.newPID(), targets);
projectionNode.setInSchema(insertNode.getSubQuery().getOutSchema());
projectionNode.setOutSchema(PlannerUtil.targetToSchema(targets));
List<LogicalPlan.QueryBlock> blocks = plan.getChildBlocks(plan.getRootBlock());
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/fc018de8/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/GlobalPlanner.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/GlobalPlanner.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/GlobalPlanner.java
index 48f45f6..620e813 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/GlobalPlanner.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/GlobalPlanner.java
@@ -104,12 +104,12 @@ public class GlobalPlanner {
LOG.info(masterPlan);
}
- public static ScanNode buildInputExecutor(DataChannel channel) {
+ public static ScanNode buildInputExecutor(LogicalPlan plan, DataChannel channel) {
Preconditions.checkArgument(channel.getSchema() != null,
"Channel schema (" + channel.getSrcId().getId() +" -> "+ channel.getTargetId().getId()+") is not initialized");
TableMeta meta = new TableMetaImpl(channel.getSchema(), channel.getStoreType(), new Options());
TableDesc desc = new TableDescImpl(channel.getSrcId().toString(), meta, new Path("/"));
- return new ScanNode(desc);
+ return new ScanNode(plan.newPID(), desc);
}
public class DistributedPlannerVisitor extends BasicLogicalPlanVisitor<GlobalPlanContext> {
@@ -183,7 +183,7 @@ public class GlobalPlanner {
execBlock.setPlan(g1);
dataChannel = new DataChannel(execBlock, currentBlock, HASH_PARTITION, 32);
- ScanNode scanNode = buildInputExecutor(dataChannel);
+ ScanNode scanNode = buildInputExecutor(masterPlan.getLogicalPlan(), dataChannel);
secondGroupBy.setChild(scanNode);
masterPlan.addConnect(dataChannel);
}
@@ -194,7 +194,7 @@ public class GlobalPlanner {
execBlock.setPlan(g1);
dataChannel = new DataChannel(execBlock, currentBlock, HASH_PARTITION, 32);
- ScanNode scanNode = buildInputExecutor(dataChannel);
+ ScanNode scanNode = buildInputExecutor(masterPlan.getLogicalPlan(), dataChannel);
secondGroupBy.setChild(scanNode);
masterPlan.addConnect(dataChannel);
}
@@ -224,7 +224,7 @@ public class GlobalPlanner {
channel.setSchema(firstGroupBy.getOutSchema());
GroupbyNode secondGroupBy = groupByNode;
- ScanNode scanNode = buildInputExecutor(channel);
+ ScanNode scanNode = buildInputExecutor(masterPlan.getLogicalPlan(), channel);
secondGroupBy.setChild(scanNode);
LogicalNode parent = PlannerUtil.findTopParentNode(curNode, lastDistNode.getType());
@@ -248,7 +248,7 @@ public class GlobalPlanner {
channel.setSchema(childNode.getOutSchema());
SortNode secondSort = PlannerUtil.clone(lastDistNode);
- ScanNode secondScan = buildInputExecutor(channel);
+ ScanNode secondScan = buildInputExecutor(masterPlan.getLogicalPlan(), channel);
secondSort.setChild(secondScan);
LimitNode limitAndSort;
@@ -296,8 +296,8 @@ public class GlobalPlanner {
DataChannel leftChannel = new DataChannel(leftBlock, currentBlock, HASH_PARTITION, 32);
DataChannel rightChannel = new DataChannel(rightBlock, currentBlock, HASH_PARTITION, 32);
- ScanNode leftScan = buildInputExecutor(leftChannel);
- ScanNode rightScan = buildInputExecutor(rightChannel);
+ ScanNode leftScan = buildInputExecutor(masterPlan.getLogicalPlan(), leftChannel);
+ ScanNode rightScan = buildInputExecutor(masterPlan.getLogicalPlan(), rightChannel);
joinNode.setLeftChild(leftScan);
joinNode.setRightChild(rightScan);
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/fc018de8/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/GlobalPlannerUtils.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/GlobalPlannerUtils.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/GlobalPlannerUtils.java
deleted file mode 100644
index 899925c..0000000
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/GlobalPlannerUtils.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.master;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.fs.Path;
-import org.apache.tajo.catalog.CatalogUtil;
-import org.apache.tajo.catalog.Schema;
-import org.apache.tajo.catalog.TableDesc;
-import org.apache.tajo.catalog.TableMeta;
-import org.apache.tajo.catalog.proto.CatalogProtos.StoreType;
-import org.apache.tajo.engine.planner.logical.ScanNode;
-
-public class GlobalPlannerUtils {
- private static Log LOG = LogFactory.getLog(GlobalPlannerUtils.class);
-
- public static ScanNode newScanPlan(Schema inputSchema,
- String inputTableId,
- Path inputPath) {
- TableMeta meta = CatalogUtil.newTableMeta(inputSchema, StoreType.CSV);
- TableDesc desc = CatalogUtil.newTableDesc(inputTableId, meta, inputPath);
- ScanNode newScan = new ScanNode(desc);
- newScan.setInSchema(desc.getMeta().getSchema());
- newScan.setOutSchema(desc.getMeta().getSchema());
- return newScan;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/fc018de8/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/util/IndexUtil.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/util/IndexUtil.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/util/IndexUtil.java
index 59ba129..62bcf2d 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/util/IndexUtil.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/util/IndexUtil.java
@@ -25,6 +25,7 @@ import org.apache.tajo.catalog.SortSpec;
import org.apache.tajo.datum.Datum;
import org.apache.tajo.engine.eval.*;
import org.apache.tajo.engine.json.CoreGsonHelper;
+import org.apache.tajo.engine.planner.LogicalPlan;
import org.apache.tajo.engine.planner.logical.IndexScanNode;
import org.apache.tajo.engine.planner.logical.ScanNode;
import org.apache.tajo.storage.Fragment;
@@ -55,7 +56,7 @@ public class IndexUtil {
return builder.toString();
}
- public static IndexScanNode indexEval( ScanNode scanNode,
+ public static IndexScanNode indexEval(LogicalPlan plan, ScanNode scanNode,
Iterator<Entry<String, String>> iter ) {
EvalNode qual = scanNode.getQual();
@@ -112,7 +113,7 @@ public class IndexUtil {
datum[i] = ((ConstEval)(nodeList.get(i).getRightExpr())).getValue();
}
- return new IndexScanNode(scanNode, keySchema , datum , maxIndex);
+ return new IndexScanNode(plan.newPID(), scanNode, keySchema , datum , maxIndex);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/fc018de8/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/logical/TestReceiveNode.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/logical/TestReceiveNode.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/logical/TestReceiveNode.java
deleted file mode 100644
index cbfabf8..0000000
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/logical/TestReceiveNode.java
+++ /dev/null
@@ -1,61 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.engine.planner.logical;
-
-import com.google.common.collect.Lists;
-import org.junit.Test;
-
-import java.net.URI;
-import java.util.List;
-
-import static org.junit.Assert.assertEquals;
-
-public class TestReceiveNode {
- @Test
- public final void testReceiveNode() throws CloneNotSupportedException {
- ReceiveNode rec = new ReceiveNode(PipeType.PULL, RepartitionType.HASH);
-
- URI uri1 = URI.create("http://192.168.0.1:2190/?part=0");
- URI uri2 = URI.create("http://192.168.0.2:2190/?part=1");
- URI uri3 = URI.create("http://192.168.0.3:2190/?part=2");
- URI uri4 = URI.create("http://192.168.0.4:2190/?part=3");
- List<URI> set1 = Lists.newArrayList(uri1, uri2);
- List<URI> set2 = Lists.newArrayList(uri3, uri4);
-
- rec.addData("test1", set1.get(0));
- rec.addData("test1", set1.get(1));
- rec.addData("test2", set2.get(0));
- rec.addData("test2", set2.get(1));
-
- assertEquals(NodeType.RECEIVE, rec.getType());
- assertEquals(PipeType.PULL, rec.getPipeType());
- assertEquals(RepartitionType.HASH, rec.getRepartitionType());
- assertEquals(set1, Lists.newArrayList(rec.getSrcURIs("test1")));
- assertEquals(set2, Lists.newArrayList(rec.getSrcURIs("test2")));
-
- ReceiveNode rec2 = (ReceiveNode) rec.clone();
- assertEquals(NodeType.RECEIVE, rec2.getType());
- assertEquals(PipeType.PULL, rec2.getPipeType());
- assertEquals(RepartitionType.HASH, rec2.getRepartitionType());
- assertEquals(set1, Lists.newArrayList(rec2.getSrcURIs("test1")));
- assertEquals(set2, Lists.newArrayList(rec2.getSrcURIs("test2")));
-
- assertEquals(rec, rec2);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/fc018de8/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/logical/TestSendNode.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/logical/TestSendNode.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/logical/TestSendNode.java
deleted file mode 100644
index 66197de..0000000
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/logical/TestSendNode.java
+++ /dev/null
@@ -1,49 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.engine.planner.logical;
-
-import org.junit.Test;
-
-import java.net.URI;
-
-import static org.junit.Assert.assertEquals;
-
-public class TestSendNode {
- @Test
- public final void testSendNode() throws CloneNotSupportedException {
- SendNode send = new SendNode(PipeType.PULL, RepartitionType.HASH);
- send.putDestURI(0, URI.create("http://localhost:2190"));
- send.putDestURI(1, URI.create("http://localhost:2191"));
-
- assertEquals(NodeType.SEND, send.getType());
- assertEquals(PipeType.PULL, send.getPipeType());
- assertEquals(RepartitionType.HASH, send.getRepartitionType());
- assertEquals(URI.create("http://localhost:2190"), send.getDestURI(0));
- assertEquals(URI.create("http://localhost:2191"), send.getDestURI(1));
-
- SendNode send2 = (SendNode) send.clone();
- assertEquals(NodeType.SEND, send2.getType());
- assertEquals(PipeType.PULL, send2.getPipeType());
- assertEquals(RepartitionType.HASH, send2.getRepartitionType());
- assertEquals(URI.create("http://localhost:2190"), send2.getDestURI(0));
- assertEquals(URI.create("http://localhost:2191"), send2.getDestURI(1));
-
- assertEquals(send, send2);
- }
-}