You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tajo.apache.org by ji...@apache.org on 2013/11/26 10:14:33 UTC
[1/4] TAJO-266: Extend ExecutionBlock and Task to support multiple
outputs. (jihoon)
Updated Branches:
refs/heads/DAG-execplan [created] 7c97735e1
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/7c97735e/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestPhysicalPlanner.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestPhysicalPlanner.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestPhysicalPlanner.java
index b57ef3a..76d54e0 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestPhysicalPlanner.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestPhysicalPlanner.java
@@ -25,7 +25,9 @@ import org.apache.commons.codec.binary.Base64;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
-import org.apache.tajo.*;
+import org.apache.tajo.LocalTajoTestingUtility;
+import org.apache.tajo.QueryUnitAttemptId;
+import org.apache.tajo.TajoTestingCluster;
import org.apache.tajo.algebra.Expr;
import org.apache.tajo.catalog.*;
import org.apache.tajo.catalog.proto.CatalogProtos.StoreType;
@@ -40,11 +42,13 @@ import org.apache.tajo.engine.eval.EvalTreeUtil;
import org.apache.tajo.engine.parser.SQLAnalyzer;
import org.apache.tajo.engine.planner.*;
import org.apache.tajo.engine.planner.enforce.Enforcer;
+import org.apache.tajo.engine.planner.global.DataChannel;
+import org.apache.tajo.engine.planner.global.ExecutionPlan;
import org.apache.tajo.engine.planner.global.MasterPlan;
import org.apache.tajo.engine.planner.logical.*;
-import org.apache.tajo.engine.planner.global.DataChannel;
import org.apache.tajo.master.TajoMaster;
import org.apache.tajo.storage.*;
+import org.apache.tajo.storage.Scanner;
import org.apache.tajo.storage.fragment.FileFragment;
import org.apache.tajo.storage.index.bst.BSTIndex;
import org.apache.tajo.util.CommonTestingUtil;
@@ -58,9 +62,7 @@ import org.junit.Test;
import java.io.File;
import java.io.IOException;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
+import java.util.*;
import static org.apache.tajo.ipc.TajoWorkerProtocol.PartitionType;
import static org.apache.tajo.ipc.TajoWorkerProtocol.SortEnforce.SortAlgorithm;
@@ -192,9 +194,11 @@ public class TestPhysicalPlanner {
LogicalNode rootNode =plan.getRootBlock().getRoot();
optimizer.optimize(plan);
-
- PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf, sm);
- PhysicalExec exec = phyPlanner.createPlan(ctx, rootNode);
+ ExecutionPlan execPlan = new ExecutionPlan();
+ execPlan.addPlan(rootNode);
+ PhysicalPlannerImpl phyPlanner = new PhysicalPlannerImpl(conf, sm);
+ PhysicalExec exec = phyPlanner.createPlanWithoutMaterialize(ctx, execPlan);
+ exec = ((PhysicalRootExec)exec).getChild(0);
Tuple tuple;
int i = 0;
@@ -221,9 +225,11 @@ public class TestPhysicalPlanner {
LogicalNode rootNode =plan.getRootBlock().getRoot();
optimizer.optimize(plan);
-
- PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf,sm);
- PhysicalExec exec = phyPlanner.createPlan(ctx, rootNode);
+ ExecutionPlan execPlan = new ExecutionPlan();
+ execPlan.addPlan(rootNode);
+ PhysicalPlannerImpl phyPlanner = new PhysicalPlannerImpl(conf,sm);
+ PhysicalExec exec = phyPlanner.createPlanWithoutMaterialize(ctx, execPlan);
+ exec = ((PhysicalRootExec)exec).getChild(0);
Tuple tuple;
int i = 0;
@@ -249,8 +255,11 @@ public class TestPhysicalPlanner {
optimizer.optimize(plan);
LogicalNode rootNode = plan.getRootBlock().getRoot();
- PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf,sm);
- PhysicalExec exec = phyPlanner.createPlan(ctx, rootNode);
+ ExecutionPlan execPlan = new ExecutionPlan();
+ execPlan.addPlan(rootNode);
+ PhysicalPlannerImpl phyPlanner = new PhysicalPlannerImpl(conf,sm);
+ PhysicalExec exec = phyPlanner.createPlanWithoutMaterialize(ctx, execPlan);
+ exec = ((PhysicalRootExec)exec).getChild(0);
int i = 0;
Tuple tuple;
@@ -279,8 +288,11 @@ public class TestPhysicalPlanner {
LogicalPlan plan = planner.createPlan(expr);
LogicalNode rootNode = optimizer.optimize(plan);
- PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf,sm);
- PhysicalExec exec = phyPlanner.createPlan(ctx, rootNode);
+ ExecutionPlan execPlan = new ExecutionPlan();
+ execPlan.addPlan(rootNode);
+ PhysicalPlannerImpl phyPlanner = new PhysicalPlannerImpl(conf,sm);
+ PhysicalExec exec = phyPlanner.createPlanWithoutMaterialize(ctx, execPlan);
+ exec = ((PhysicalRootExec)exec).getChild(0);
int i = 0;
Tuple tuple;
@@ -308,8 +320,11 @@ public class TestPhysicalPlanner {
LogicalPlan plan = planner.createPlan(context);
optimizer.optimize(plan);
- PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf,sm);
- PhysicalExec exec = phyPlanner.createPlan(ctx, plan.getRootBlock().getRoot());
+ ExecutionPlan execPlan = new ExecutionPlan();
+ execPlan.addPlan(plan.getRootBlock().getRoot());
+ PhysicalPlannerImpl phyPlanner = new PhysicalPlannerImpl(conf,sm);
+ PhysicalExec exec = phyPlanner.createPlanWithoutMaterialize(ctx, execPlan);
+ exec = ((PhysicalRootExec)exec).getChild(0);
/*HashAggregateExec hashAgg = (HashAggregateExec) exec;
@@ -371,10 +386,13 @@ public class TestPhysicalPlanner {
TableMeta outputMeta = CatalogUtil.newTableMeta(StoreType.CSV);
- PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf,sm);
- PhysicalExec exec = phyPlanner.createPlan(ctx, rootNode);
+ ExecutionPlan execPlan = new ExecutionPlan();
+ execPlan.addPlan(rootNode);
+ PhysicalPlannerImpl phyPlanner = new PhysicalPlannerImpl(conf,sm);
+ PhysicalExec exec = phyPlanner.createPlanWithoutMaterialize(ctx, execPlan);
+ exec = ((PhysicalRootExec)exec).getChild(0);
exec.init();
- exec.next();
+ while (exec.next() != null);
exec.close();
Scanner scanner = StorageManagerFactory.getStorageManager(conf).getFileScanner(outputMeta, rootNode.getOutSchema(),
@@ -411,10 +429,13 @@ public class TestPhysicalPlanner {
TableMeta outputMeta = CatalogUtil.newTableMeta(StoreType.RCFILE);
- PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf,sm);
- PhysicalExec exec = phyPlanner.createPlan(ctx, rootNode);
+ ExecutionPlan execPlan = new ExecutionPlan();
+ execPlan.addPlan(rootNode);
+ PhysicalPlannerImpl phyPlanner = new PhysicalPlannerImpl(conf,sm);
+ PhysicalExec exec = phyPlanner.createPlanWithoutMaterialize(ctx, execPlan);
+ exec = ((PhysicalRootExec)exec).getChild(0);
exec.init();
- exec.next();
+ while (exec.next() != null);
exec.close();
Scanner scanner = StorageManagerFactory.getStorageManager(conf).getFileScanner(outputMeta, rootNode.getOutSchema(),
@@ -445,24 +466,29 @@ public class TestPhysicalPlanner {
ctx.setEnforcer(new Enforcer());
Expr context = analyzer.parse(QUERIES[7]);
LogicalPlan plan = planner.createPlan(context);
+ LogicalNode rootNode = optimizer.optimize(plan);
int numPartitions = 3;
Column key1 = new Column("score.deptName", Type.TEXT);
Column key2 = new Column("score.class", Type.TEXT);
DataChannel dataChannel = new DataChannel(masterPlan.newExecutionBlockId(), masterPlan.newExecutionBlockId(),
- PartitionType.HASH_PARTITION, numPartitions);
+ ((LogicalRootNode)rootNode).getChild().getPID(), null, PartitionType.HASH_PARTITION, numPartitions);
+ dataChannel.setSchema(rootNode.getOutSchema());
dataChannel.setPartitionKey(new Column[]{key1, key2});
- ctx.setDataChannel(dataChannel);
- LogicalNode rootNode = optimizer.optimize(plan);
+ List<DataChannel> channels = new ArrayList<DataChannel>();
+ channels.add(dataChannel);
+ ctx.setOutgoingChannels(channels);
TableMeta outputMeta = CatalogUtil.newTableMeta(StoreType.CSV);
FileSystem fs = sm.getFileSystem();
- PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf,sm);
- PhysicalExec exec = phyPlanner.createPlan(ctx, rootNode);
+ ExecutionPlan execPlan = new ExecutionPlan();
+ execPlan.addPlan(rootNode);
+ PhysicalPlannerImpl phyPlanner = new PhysicalPlannerImpl(conf,sm);
+ PhysicalExec exec = phyPlanner.createPlan(ctx, execPlan);
exec.init();
- exec.next();
+ while (exec.next() != null);
exec.close();
Path path = new Path(workDir, "output");
@@ -505,20 +531,24 @@ public class TestPhysicalPlanner {
ctx.setEnforcer(new Enforcer());
Expr expr = analyzer.parse(QUERIES[14]);
LogicalPlan plan = planner.createPlan(expr);
- LogicalNode rootNode = plan.getRootBlock().getRoot();
+ LogicalNode rootNode = optimizer.optimize(plan);
int numPartitions = 1;
DataChannel dataChannel = new DataChannel(masterPlan.newExecutionBlockId(), masterPlan.newExecutionBlockId(),
- PartitionType.HASH_PARTITION, numPartitions);
+ ((LogicalRootNode)rootNode).getChild().getPID(), null, PartitionType.HASH_PARTITION, numPartitions);
+ dataChannel.setSchema(rootNode.getOutSchema());
dataChannel.setPartitionKey(new Column[]{});
- ctx.setDataChannel(dataChannel);
- optimizer.optimize(plan);
+ List<DataChannel> channels = new ArrayList<DataChannel>();
+ channels.add(dataChannel);
+ ctx.setOutgoingChannels(channels);
TableMeta outputMeta = CatalogUtil.newTableMeta(StoreType.CSV);
- PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf,sm);
- PhysicalExec exec = phyPlanner.createPlan(ctx, rootNode);
+ ExecutionPlan execPlan = new ExecutionPlan();
+ execPlan.addPlan(rootNode);
+ PhysicalPlannerImpl phyPlanner = new PhysicalPlannerImpl(conf,sm);
+ PhysicalExec exec = phyPlanner.createPlan(ctx, execPlan);
exec.init();
- exec.next();
+ while (exec.next() != null);
exec.close();
Path path = new Path(workDir, "output");
@@ -571,8 +601,11 @@ public class TestPhysicalPlanner {
}
}
- PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf,sm);
- PhysicalExec exec = phyPlanner.createPlan(ctx, rootNode);
+ ExecutionPlan execPlan = new ExecutionPlan();
+ execPlan.addPlan(rootNode);
+ PhysicalPlannerImpl phyPlanner = new PhysicalPlannerImpl(conf,sm);
+ PhysicalExec exec = phyPlanner.createPlanWithoutMaterialize(ctx, execPlan);
+ exec = ((PhysicalRootExec)exec).getChild(0);
exec.init();
Tuple tuple = exec.next();
@@ -606,8 +639,11 @@ public class TestPhysicalPlanner {
}
}
- PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf,sm);
- PhysicalExec exec = phyPlanner.createPlan(ctx, rootNode);
+ ExecutionPlan execPlan = new ExecutionPlan();
+ execPlan.addPlan(rootNode);
+ PhysicalPlannerImpl phyPlanner = new PhysicalPlannerImpl(conf,sm);
+ PhysicalExec exec = phyPlanner.createPlanWithoutMaterialize(ctx, execPlan);
+ exec = ((PhysicalRootExec)exec).getChild(0);
exec.init();
Tuple tuple = exec.next();
assertEquals(30, tuple.get(0).asInt8());
@@ -627,8 +663,11 @@ public class TestPhysicalPlanner {
LogicalPlan plan = planner.createPlan(context);
LogicalNode rootNode = optimizer.optimize(plan);
- PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf,sm);
- PhysicalExec exec = phyPlanner.createPlan(ctx, rootNode);
+ ExecutionPlan execPlan = new ExecutionPlan();
+ execPlan.addPlan(rootNode);
+ PhysicalPlannerImpl phyPlanner = new PhysicalPlannerImpl(conf,sm);
+ PhysicalExec exec = phyPlanner.createPlanWithoutMaterialize(ctx, execPlan);
+ exec = ((PhysicalRootExec)exec).getChild(0);
int count = 0;
exec.init();
@@ -650,11 +689,14 @@ public class TestPhysicalPlanner {
LogicalPlan plan = planner.createPlan(context);
LogicalNode rootNode = optimizer.optimize(plan);
LogicalRootNode root = (LogicalRootNode) rootNode;
- UnionNode union = new UnionNode(plan.newPID(), root.getChild(), root.getChild());
+ UnionNode union = new UnionNode(plan.newPID(), root.getChild(), clonePlan(plan, root.getChild()));
root.setChild(union);
- PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf,sm);
- PhysicalExec exec = phyPlanner.createPlan(ctx, root);
+ ExecutionPlan execPlan = new ExecutionPlan();
+ execPlan.addPlan(root);
+ PhysicalPlannerImpl phyPlanner = new PhysicalPlannerImpl(conf,sm);
+ PhysicalExec exec = phyPlanner.createPlanWithoutMaterialize(ctx, execPlan);
+ exec = ((PhysicalRootExec)exec).getChild(0);
int count = 0;
exec.init();
@@ -665,6 +707,31 @@ public class TestPhysicalPlanner {
assertEquals(200, count);
}
+ private LogicalNode clonePlan(LogicalPlan plan, LogicalNode node) {
+ try {
+ LogicalNode clone = (LogicalNode) node.clone();
+ Stack<LogicalNode> stack = new Stack<LogicalNode>();
+ stack.push(clone);
+ LogicalNode current;
+
+ while (!stack.isEmpty()) {
+ current = stack.pop();
+ current.setPid(plan.newPID());
+ if (current instanceof UnaryNode) {
+ stack.push(((UnaryNode) current).getChild());
+ } else if (current instanceof BinaryNode) {
+ stack.push(((BinaryNode) current).getLeftChild());
+ stack.push(((BinaryNode) current).getRightChild());
+ }
+ }
+
+ return clone;
+ } catch (CloneNotSupportedException e) {
+
+ }
+ return null;
+ }
+
@Test
public final void testEvalExpr() throws IOException, PlanningException {
Path workDir = CommonTestingUtil.getTestDir("target/test-data/testEvalExpr");
@@ -674,8 +741,11 @@ public class TestPhysicalPlanner {
LogicalPlan plan = planner.createPlan(expr);
LogicalNode rootNode = optimizer.optimize(plan);
- PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf, sm);
- PhysicalExec exec = phyPlanner.createPlan(ctx, rootNode);
+ ExecutionPlan execPlan = new ExecutionPlan();
+ execPlan.addPlan(rootNode);
+ PhysicalPlannerImpl phyPlanner = new PhysicalPlannerImpl(conf, sm);
+ PhysicalExec exec = phyPlanner.createPlanWithoutMaterialize(ctx, execPlan);
+ exec = ((PhysicalRootExec)exec).getChild(0);
Tuple tuple;
exec.init();
tuple = exec.next();
@@ -687,8 +757,11 @@ public class TestPhysicalPlanner {
plan = planner.createPlan(expr);
rootNode = optimizer.optimize(plan);
+ execPlan = new ExecutionPlan();
+ execPlan.addPlan(rootNode);
phyPlanner = new PhysicalPlannerImpl(conf, sm);
- exec = phyPlanner.createPlan(ctx, rootNode);
+ exec = phyPlanner.createPlanWithoutMaterialize(ctx, execPlan);
+ exec = ((PhysicalRootExec)exec).getChild(0);
exec.init();
tuple = exec.next();
exec.close();
@@ -710,8 +783,11 @@ public class TestPhysicalPlanner {
LogicalPlan plan = planner.createPlan(context);
LogicalNode rootNode = optimizer.optimize(plan);
- PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf, sm);
- PhysicalExec exec = phyPlanner.createPlan(ctx, rootNode);
+ ExecutionPlan execPlan = new ExecutionPlan();
+ execPlan.addPlan(rootNode);
+ PhysicalPlannerImpl phyPlanner = new PhysicalPlannerImpl(conf, sm);
+ PhysicalExec exec = phyPlanner.createPlanWithoutMaterialize(ctx, execPlan);
+ exec = ((PhysicalRootExec)exec).getChild(0);
exec.init();
while (exec.next() != null) {
}
@@ -738,8 +814,11 @@ public class TestPhysicalPlanner {
LogicalPlan plan = planner.createPlan(expr);
LogicalNode rootNode = optimizer.optimize(plan);
- PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf,sm);
- PhysicalExec exec = phyPlanner.createPlan(ctx, rootNode);
+ ExecutionPlan execPlan = new ExecutionPlan();
+ execPlan.addPlan(rootNode);
+ PhysicalPlannerImpl phyPlanner = new PhysicalPlannerImpl(conf,sm);
+ PhysicalExec exec = phyPlanner.createPlanWithoutMaterialize(ctx, execPlan);
+ exec = ((PhysicalRootExec)exec).getChild(0);
Tuple tuple;
int cnt = 0;
@@ -773,16 +852,21 @@ public class TestPhysicalPlanner {
SortNode sortNode = PlannerUtil.findTopNode(rootNode, NodeType.SORT);
DataChannel channel = new DataChannel(masterPlan.newExecutionBlockId(), masterPlan.newExecutionBlockId(),
- PartitionType.RANGE_PARTITION);
+ ((LogicalRootNode)rootNode).getChild().getPID(), null, PartitionType.RANGE_PARTITION);
+ channel.setSchema(rootNode.getOutSchema());
channel.setPartitionKey(PlannerUtil.sortSpecsToSchema(sortNode.getSortKeys()).toArray());
- ctx.setDataChannel(channel);
+ List<DataChannel> channels = new ArrayList<DataChannel>();
+ channels.add(channel);
+ ctx.setOutgoingChannels(channels);
- PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf,sm);
- PhysicalExec exec = phyPlanner.createPlan(ctx, rootNode);
+ ExecutionPlan execPlan = new ExecutionPlan();
+ execPlan.addPlan(rootNode);
+ PhysicalPlannerImpl phyPlanner = new PhysicalPlannerImpl(conf,sm);
+ PhysicalExec exec = phyPlanner.createPlan(ctx, execPlan);
Tuple tuple;
exec.init();
- exec.next();
+ while (exec.next() != null);
exec.close();
Schema keySchema = new Schema();
@@ -866,8 +950,11 @@ public class TestPhysicalPlanner {
new FileFragment[] {frags[0]}, workDir);
ctx.setEnforcer(enforcer);
- PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf,sm);
- PhysicalExec exec = phyPlanner.createPlan(ctx, rootNode);
+ ExecutionPlan execPlan = new ExecutionPlan();
+ execPlan.addPlan(rootNode);
+ PhysicalPlannerImpl phyPlanner = new PhysicalPlannerImpl(conf,sm);
+ PhysicalExec exec = phyPlanner.createPlanWithoutMaterialize(ctx, execPlan);
+ exec = ((PhysicalRootExec)exec).getChild(0);
exec.init();
exec.next();
exec.close();
@@ -887,8 +974,11 @@ public class TestPhysicalPlanner {
new FileFragment[] {frags[0]}, workDir);
ctx.setEnforcer(enforcer);
+ execPlan = new ExecutionPlan();
+ execPlan.addPlan(rootNode);
phyPlanner = new PhysicalPlannerImpl(conf,sm);
- exec = phyPlanner.createPlan(ctx, rootNode);
+ exec = phyPlanner.createPlanWithoutMaterialize(ctx, execPlan);
+ exec = ((PhysicalRootExec)exec).getChild(0);
exec.init();
exec.next();
exec.close();
@@ -914,8 +1004,11 @@ public class TestPhysicalPlanner {
new FileFragment[] {frags[0]}, workDir);
ctx.setEnforcer(enforcer);
- PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf,sm);
- PhysicalExec exec = phyPlanner.createPlan(ctx, rootNode);
+ ExecutionPlan execPlan = new ExecutionPlan();
+ execPlan.addPlan(rootNode);
+ PhysicalPlannerImpl phyPlanner = new PhysicalPlannerImpl(conf,sm);
+ PhysicalExec exec = phyPlanner.createPlanWithoutMaterialize(ctx, execPlan);
+ exec = ((PhysicalRootExec)exec).getChild(0);
exec.init();
exec.next();
exec.close();
@@ -935,8 +1028,11 @@ public class TestPhysicalPlanner {
new FileFragment[] {frags[0]}, workDir);
ctx.setEnforcer(enforcer);
+ execPlan = new ExecutionPlan();
+ execPlan.addPlan(rootNode);
phyPlanner = new PhysicalPlannerImpl(conf,sm);
- exec = phyPlanner.createPlan(ctx, rootNode);
+ exec = phyPlanner.createPlanWithoutMaterialize(ctx, execPlan);
+ exec = ((PhysicalRootExec)exec).getChild(0);
exec.init();
exec.next();
exec.close();
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/7c97735e/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestRightOuterHashJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestRightOuterHashJoinExec.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestRightOuterHashJoinExec.java
index d582e2b..42a0cb7 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestRightOuterHashJoinExec.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestRightOuterHashJoinExec.java
@@ -19,9 +19,8 @@
package org.apache.tajo.engine.planner.physical;
import org.apache.hadoop.fs.Path;
+import org.apache.tajo.LocalTajoTestingUtility;
import org.apache.tajo.TajoTestingCluster;
-import org.apache.tajo.storage.fragment.FileFragment;
-import org.apache.tajo.worker.TaskAttemptContext;
import org.apache.tajo.algebra.Expr;
import org.apache.tajo.catalog.*;
import org.apache.tajo.catalog.proto.CatalogProtos.StoreType;
@@ -31,22 +30,22 @@ import org.apache.tajo.datum.Datum;
import org.apache.tajo.datum.DatumFactory;
import org.apache.tajo.engine.parser.SQLAnalyzer;
import org.apache.tajo.engine.planner.LogicalPlanner;
-import org.apache.tajo.engine.planner.PhysicalPlanner;
import org.apache.tajo.engine.planner.PhysicalPlannerImpl;
import org.apache.tajo.engine.planner.PlanningException;
+import org.apache.tajo.engine.planner.global.ExecutionPlan;
import org.apache.tajo.engine.planner.logical.LogicalNode;
import org.apache.tajo.storage.*;
+import org.apache.tajo.storage.fragment.FileFragment;
import org.apache.tajo.util.CommonTestingUtil;
import org.apache.tajo.util.TUtil;
+import org.apache.tajo.worker.TaskAttemptContext;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
-import org.apache.tajo.LocalTajoTestingUtility;
import java.io.IOException;
import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
// this is not a physical operator in itself, but it uses the HashLeftOuterJoinExec with switched inputs order
public class TestRightOuterHashJoinExec {
@@ -232,10 +231,12 @@ public class TestRightOuterHashJoinExec {
Expr expr = analyzer.parse(QUERIES[0]);
LogicalNode plan = planner.createPlan(expr).getRootBlock().getRoot();
- PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf, sm);
- PhysicalExec exec = phyPlanner.createPlan(ctx, plan);
+ ExecutionPlan execPlan = new ExecutionPlan();
+ execPlan.addPlan(plan);
+ PhysicalPlannerImpl phyPlanner = new PhysicalPlannerImpl(conf, sm);
+ PhysicalExec exec = phyPlanner.createPlanWithoutMaterialize(ctx, execPlan);
- ProjectionExec proj = (ProjectionExec) exec;
+ ProjectionExec proj = (ProjectionExec) ((PhysicalRootExec)exec).getChild(0);
if (proj.getChild() instanceof RightOuterMergeJoinExec) {
//for this small data set this is not likely to happen
@@ -245,13 +246,13 @@ public class TestRightOuterHashJoinExec {
Tuple tuple;
int count = 0;
int i = 1;
- exec.init();
+ proj.init();
- while ((tuple = exec.next()) != null) {
+ while ((tuple = proj.next()) != null) {
//TODO check contents
count = count + 1;
}
- exec.close();
+ proj.close();
assertEquals(12, count);
}
}
@@ -272,10 +273,12 @@ public class TestRightOuterHashJoinExec {
Expr expr = analyzer.parse(QUERIES[1]);
LogicalNode plan = planner.createPlan(expr).getRootBlock().getRoot();
- PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf, sm);
- PhysicalExec exec = phyPlanner.createPlan(ctx, plan);
+ ExecutionPlan execPlan = new ExecutionPlan();
+ execPlan.addPlan(plan);
+ PhysicalPlannerImpl phyPlanner = new PhysicalPlannerImpl(conf, sm);
+ PhysicalExec exec = phyPlanner.createPlanWithoutMaterialize(ctx, execPlan);
- ProjectionExec proj = (ProjectionExec) exec;
+ ProjectionExec proj = (ProjectionExec) ((PhysicalRootExec)exec).getChild(0);
if (proj.getChild() instanceof RightOuterMergeJoinExec) {
//for this small data set this is not likely to happen
@@ -285,13 +288,13 @@ public class TestRightOuterHashJoinExec {
Tuple tuple;
int count = 0;
int i = 1;
- exec.init();
+ proj.init();
- while ((tuple = exec.next()) != null) {
+ while ((tuple = proj.next()) != null) {
//TODO check contents
count = count + 1;
}
- exec.close();
+ proj.close();
assertEquals(5, count);
}
}
@@ -312,10 +315,12 @@ public class TestRightOuterHashJoinExec {
Expr expr = analyzer.parse(QUERIES[2]);
LogicalNode plan = planner.createPlan(expr).getRootBlock().getRoot();
- PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf, sm);
- PhysicalExec exec = phyPlanner.createPlan(ctx, plan);
+ ExecutionPlan execPlan = new ExecutionPlan();
+ execPlan.addPlan(plan);
+ PhysicalPlannerImpl phyPlanner = new PhysicalPlannerImpl(conf, sm);
+ PhysicalExec exec = phyPlanner.createPlanWithoutMaterialize(ctx, execPlan);
- ProjectionExec proj = (ProjectionExec) exec;
+ ProjectionExec proj = (ProjectionExec) ((PhysicalRootExec)exec).getChild(0);
if (proj.getChild() instanceof RightOuterMergeJoinExec) {
//for this small data set this is not likely to happen
@@ -325,13 +330,13 @@ public class TestRightOuterHashJoinExec {
Tuple tuple;
int count = 0;
int i = 1;
- exec.init();
+ proj.init();
- while ((tuple = exec.next()) != null) {
+ while ((tuple = proj.next()) != null) {
//TODO check contents
count = count + 1;
}
- exec.close();
+ proj.close();
assertEquals(7, count);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/7c97735e/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestRightOuterMergeJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestRightOuterMergeJoinExec.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestRightOuterMergeJoinExec.java
index 5bbb4aa..948680d 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestRightOuterMergeJoinExec.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestRightOuterMergeJoinExec.java
@@ -21,8 +21,6 @@ package org.apache.tajo.engine.planner.physical;
import org.apache.hadoop.fs.Path;
import org.apache.tajo.LocalTajoTestingUtility;
import org.apache.tajo.TajoTestingCluster;
-import org.apache.tajo.storage.fragment.FileFragment;
-import org.apache.tajo.worker.TaskAttemptContext;
import org.apache.tajo.algebra.Expr;
import org.apache.tajo.catalog.*;
import org.apache.tajo.catalog.proto.CatalogProtos.StoreType;
@@ -33,12 +31,15 @@ import org.apache.tajo.datum.DatumFactory;
import org.apache.tajo.engine.parser.SQLAnalyzer;
import org.apache.tajo.engine.planner.*;
import org.apache.tajo.engine.planner.enforce.Enforcer;
+import org.apache.tajo.engine.planner.global.ExecutionPlan;
import org.apache.tajo.engine.planner.logical.JoinNode;
import org.apache.tajo.engine.planner.logical.LogicalNode;
import org.apache.tajo.engine.planner.logical.NodeType;
import org.apache.tajo.storage.*;
+import org.apache.tajo.storage.fragment.FileFragment;
import org.apache.tajo.util.CommonTestingUtil;
import org.apache.tajo.util.TUtil;
+import org.apache.tajo.worker.TaskAttemptContext;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
@@ -46,9 +47,7 @@ import org.junit.Test;
import java.io.IOException;
import static org.apache.tajo.ipc.TajoWorkerProtocol.JoinEnforce.JoinAlgorithm;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.*;
public class TestRightOuterMergeJoinExec {
private TajoConf conf;
@@ -309,20 +308,22 @@ public class TestRightOuterMergeJoinExec {
LocalTajoTestingUtility.newQueryUnitAttemptId(), merged, workDir);
ctx.setEnforcer(enforcer);
- PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf,sm);
- PhysicalExec exec = phyPlanner.createPlan(ctx, plan);
+ ExecutionPlan execPlan = new ExecutionPlan();
+ execPlan.addPlan(plan);
+ PhysicalPlannerImpl phyPlanner = new PhysicalPlannerImpl(conf,sm);
+ PhysicalExec exec = phyPlanner.createPlanWithoutMaterialize(ctx, execPlan);
- ProjectionExec proj = (ProjectionExec) exec;
+ ProjectionExec proj = (ProjectionExec) ((PhysicalRootExec)exec).getChild(0);
assertTrue(proj.getChild() instanceof RightOuterMergeJoinExec);
int count = 0;
- exec.init();
- while (exec.next() != null) {
+ proj.init();
+ while (proj.next() != null) {
//TODO check contents
count = count + 1;
}
- assertNull(exec.next());
- exec.close();
+ assertNull(proj.next());
+ proj.close();
assertEquals(12, count);
}
@@ -344,20 +345,22 @@ public class TestRightOuterMergeJoinExec {
LocalTajoTestingUtility.newQueryUnitAttemptId(), merged, workDir);
ctx.setEnforcer(enforcer);
- PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf,sm);
- PhysicalExec exec = phyPlanner.createPlan(ctx, plan);
+ ExecutionPlan execPlan = new ExecutionPlan();
+ execPlan.addPlan(plan);
+ PhysicalPlannerImpl phyPlanner = new PhysicalPlannerImpl(conf,sm);
+ PhysicalExec exec = phyPlanner.createPlanWithoutMaterialize(ctx, execPlan);
- ProjectionExec proj = (ProjectionExec) exec;
+ ProjectionExec proj = (ProjectionExec) ((PhysicalRootExec)exec).getChild(0);
assertTrue(proj.getChild() instanceof RightOuterMergeJoinExec);
int count = 0;
- exec.init();
- while (exec.next() != null) {
+ proj.init();
+ while (proj.next() != null) {
//TODO check contents
count = count + 1;
}
- assertNull(exec.next());
- exec.close();
+ assertNull(proj.next());
+ proj.close();
assertEquals(5, count);
}
@@ -378,19 +381,21 @@ public class TestRightOuterMergeJoinExec {
LocalTajoTestingUtility.newQueryUnitAttemptId(), merged, workDir);
ctx.setEnforcer(enforcer);
- PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf,sm);
- PhysicalExec exec = phyPlanner.createPlan(ctx, plan);
- ProjectionExec proj = (ProjectionExec) exec;
+ ExecutionPlan execPlan = new ExecutionPlan();
+ execPlan.addPlan(plan);
+ PhysicalPlannerImpl phyPlanner = new PhysicalPlannerImpl(conf,sm);
+ PhysicalExec exec = phyPlanner.createPlanWithoutMaterialize(ctx, execPlan);
+ ProjectionExec proj = (ProjectionExec) ((PhysicalRootExec)exec).getChild(0);
assertTrue(proj.getChild() instanceof RightOuterMergeJoinExec);
int count = 0;
- exec.init();
- while (exec.next() != null) {
+ proj.init();
+ while (proj.next() != null) {
//TODO check contents
count = count + 1;
}
- assertNull(exec.next());
- exec.close();
+ assertNull(proj.next());
+ proj.close();
assertEquals(7, count);
}
@@ -412,21 +417,23 @@ public class TestRightOuterMergeJoinExec {
LocalTajoTestingUtility.newQueryUnitAttemptId(), merged, workDir);
ctx.setEnforcer(enforcer);
- PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf,sm);
- PhysicalExec exec = phyPlanner.createPlan(ctx, plan);
+ ExecutionPlan execPlan = new ExecutionPlan();
+ execPlan.addPlan(plan);
+ PhysicalPlannerImpl phyPlanner = new PhysicalPlannerImpl(conf,sm);
+ PhysicalExec exec = phyPlanner.createPlanWithoutMaterialize(ctx, execPlan);
- ProjectionExec proj = (ProjectionExec) exec;
+ ProjectionExec proj = (ProjectionExec) ((PhysicalRootExec)exec).getChild(0);
assertTrue(proj.getChild() instanceof RightOuterMergeJoinExec);
int count = 0;
- exec.init();
+ proj.init();
- while (exec.next() != null) {
+ while (proj.next() != null) {
//TODO check contents
count = count + 1;
}
- assertNull(exec.next());
- exec.close();
+ assertNull(proj.next());
+ proj.close();
assertEquals(13, count);
}
@@ -448,20 +455,22 @@ public class TestRightOuterMergeJoinExec {
LocalTajoTestingUtility.newQueryUnitAttemptId(), merged, workDir);
ctx.setEnforcer(enforcer);
- PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf,sm);
- PhysicalExec exec = phyPlanner.createPlan(ctx, plan);
- ProjectionExec proj = (ProjectionExec) exec;
+ ExecutionPlan execPlan = new ExecutionPlan();
+ execPlan.addPlan(plan);
+ PhysicalPlannerImpl phyPlanner = new PhysicalPlannerImpl(conf,sm);
+ PhysicalExec exec = phyPlanner.createPlanWithoutMaterialize(ctx, execPlan);
+ ProjectionExec proj = (ProjectionExec) ((PhysicalRootExec)exec).getChild(0);
assertTrue(proj.getChild() instanceof RightOuterMergeJoinExec);
int count = 0;
- exec.init();
+ proj.init();
- while (exec.next() != null) {
+ while (proj.next() != null) {
//TODO check contents
count = count + 1;
}
- assertNull(exec.next());
- exec.close();
+ assertNull(proj.next());
+ proj.close();
assertEquals(0, count);
}
@@ -484,19 +493,21 @@ public class TestRightOuterMergeJoinExec {
LocalTajoTestingUtility.newQueryUnitAttemptId(), merged, workDir);
ctx.setEnforcer(enforcer);
- PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf,sm);
- PhysicalExec exec = phyPlanner.createPlan(ctx, plan);
- ProjectionExec proj = (ProjectionExec) exec;
+ ExecutionPlan execPlan = new ExecutionPlan();
+ execPlan.addPlan(plan);
+ PhysicalPlannerImpl phyPlanner = new PhysicalPlannerImpl(conf,sm);
+ PhysicalExec exec = phyPlanner.createPlanWithoutMaterialize(ctx, execPlan);
+ ProjectionExec proj = (ProjectionExec) ((PhysicalRootExec)exec).getChild(0);
assertTrue(proj.getChild() instanceof RightOuterMergeJoinExec);
int count = 0;
- exec.init();
+ proj.init();
- while (exec.next() != null) {
+ while (proj.next() != null) {
//TODO check contents
count = count + 1;
}
- exec.close();
+ proj.close();
assertEquals(7, count);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/7c97735e/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestSortExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestSortExec.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestSortExec.java
index 45badd5..77b93de 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestSortExec.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestSortExec.java
@@ -21,8 +21,6 @@ package org.apache.tajo.engine.planner.physical;
import org.apache.hadoop.fs.Path;
import org.apache.tajo.LocalTajoTestingUtility;
import org.apache.tajo.TajoTestingCluster;
-import org.apache.tajo.storage.fragment.FileFragment;
-import org.apache.tajo.worker.TaskAttemptContext;
import org.apache.tajo.algebra.Expr;
import org.apache.tajo.catalog.*;
import org.apache.tajo.catalog.proto.CatalogProtos.StoreType;
@@ -33,9 +31,12 @@ import org.apache.tajo.datum.DatumFactory;
import org.apache.tajo.engine.parser.SQLAnalyzer;
import org.apache.tajo.engine.planner.*;
import org.apache.tajo.engine.planner.enforce.Enforcer;
+import org.apache.tajo.engine.planner.global.ExecutionPlan;
import org.apache.tajo.engine.planner.logical.LogicalNode;
import org.apache.tajo.storage.*;
+import org.apache.tajo.storage.fragment.FileFragment;
import org.apache.tajo.util.CommonTestingUtil;
+import org.apache.tajo.worker.TaskAttemptContext;
import org.junit.After;
import org.junit.BeforeClass;
import org.junit.Test;
@@ -118,8 +119,11 @@ public class TestSortExec {
LogicalPlan plan = planner.createPlan(context);
LogicalNode rootNode = optimizer.optimize(plan);
- PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf, sm);
- PhysicalExec exec = phyPlanner.createPlan(ctx, rootNode);
+ ExecutionPlan execPlan = new ExecutionPlan();
+ execPlan.addPlan(rootNode);
+ PhysicalPlannerImpl phyPlanner = new PhysicalPlannerImpl(conf, sm);
+ PhysicalExec exec = phyPlanner.createPlanWithoutMaterialize(ctx, execPlan);
+ exec = ((PhysicalRootExec)exec).getChild(0);
Tuple tuple;
Datum preVal = null;
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/7c97735e/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/worker/TestRangeRetrieverHandler.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/worker/TestRangeRetrieverHandler.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/worker/TestRangeRetrieverHandler.java
index 6934872..39408a0 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/worker/TestRangeRetrieverHandler.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/worker/TestRangeRetrieverHandler.java
@@ -34,6 +34,7 @@ import org.apache.tajo.datum.Datum;
import org.apache.tajo.datum.DatumFactory;
import org.apache.tajo.engine.parser.SQLAnalyzer;
import org.apache.tajo.engine.planner.*;
+import org.apache.tajo.engine.planner.global.ExecutionPlan;
import org.apache.tajo.engine.planner.logical.LogicalNode;
import org.apache.tajo.engine.planner.physical.*;
import org.apache.tajo.storage.*;
@@ -105,7 +106,8 @@ public class TestRangeRetrieverHandler {
Path tableDir = StorageUtil.concatPath(testDir, "testGet", "table.csv");
fs.mkdirs(tableDir.getParent());
- Appender appender = sm.getAppender(employeeMeta, schema, tableDir);
+ Appender appender =
+ sm.getAppender(employeeMeta, schema, tableDir);
appender.init();
Tuple tuple = new VTuple(schema.getColumnNum());
@@ -137,10 +139,12 @@ public class TestRangeRetrieverHandler {
LogicalPlan plan = planner.createPlan(expr);
LogicalNode rootNode = optimizer.optimize(plan);
- PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf,sm);
- PhysicalExec exec = phyPlanner.createPlan(ctx, rootNode);
+ ExecutionPlan execPlan = new ExecutionPlan();
+ execPlan.addPlan(rootNode);
+ PhysicalPlannerImpl phyPlanner = new PhysicalPlannerImpl(conf,sm);
+ PhysicalExec exec = phyPlanner.createPlanWithoutMaterialize(ctx, execPlan);
- ProjectionExec proj = (ProjectionExec) exec;
+ ProjectionExec proj = (ProjectionExec) ((PhysicalRootExec)exec).getChild(0);
MemSortExec sort = (MemSortExec) proj.getChild();
SortSpec[] sortSpecs = sort.getPlan().getSortKeys();
@@ -149,7 +153,7 @@ public class TestRangeRetrieverHandler {
exec = idxStoreExec;
exec.init();
- exec.next();
+ while (exec.next() != null);
exec.close();
Schema keySchema = PlannerUtil.sortSpecsToSchema(sortSpecs);
@@ -250,10 +254,12 @@ public class TestRangeRetrieverHandler {
LogicalPlan plan = planner.createPlan(expr);
LogicalNode rootNode = optimizer.optimize(plan);
- PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf,sm);
- PhysicalExec exec = phyPlanner.createPlan(ctx, rootNode);
+ ExecutionPlan execPlan = new ExecutionPlan();
+ execPlan.addPlan(rootNode);
+ PhysicalPlannerImpl phyPlanner = new PhysicalPlannerImpl(conf,sm);
+ PhysicalExec exec = phyPlanner.createPlanWithoutMaterialize(ctx, execPlan);
- ProjectionExec proj = (ProjectionExec) exec;
+ ProjectionExec proj = (ProjectionExec) ((PhysicalRootExec)exec).getChild(0);
MemSortExec sort = (MemSortExec) proj.getChild();
SortSpec[] sortSpecs = sort.getPlan().getSortKeys();
IndexedStoreExec idxStoreExec = new IndexedStoreExec(ctx, sm, sort,
@@ -261,7 +267,7 @@ public class TestRangeRetrieverHandler {
exec = idxStoreExec;
exec.init();
- exec.next();
+ while (exec.next() != null);
exec.close();
Schema keySchema = PlannerUtil.sortSpecsToSchema(sortSpecs);
[2/4] TAJO-266: Extend ExecutionBlock and Task to support multiple
outputs. (jihoon)
Posted by ji...@apache.org.
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/7c97735e/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/Repartitioner.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/Repartitioner.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/Repartitioner.java
index 82d2be4..94d6863 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/Repartitioner.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/Repartitioner.java
@@ -32,9 +32,7 @@ import org.apache.tajo.conf.TajoConf.ConfVars;
import org.apache.tajo.engine.planner.PlannerUtil;
import org.apache.tajo.engine.planner.RangePartitionAlgorithm;
import org.apache.tajo.engine.planner.UniformRangePartition;
-import org.apache.tajo.engine.planner.global.DataChannel;
-import org.apache.tajo.engine.planner.global.ExecutionBlock;
-import org.apache.tajo.engine.planner.global.MasterPlan;
+import org.apache.tajo.engine.planner.global.*;
import org.apache.tajo.engine.planner.logical.*;
import org.apache.tajo.engine.utils.TupleUtil;
import org.apache.tajo.exception.InternalException;
@@ -72,7 +70,9 @@ public class Repartitioner {
QueryMasterTask.QueryMasterTaskContext masterContext = subQuery.getContext();
AbstractStorageManager storageManager = subQuery.getStorageManager();
- ScanNode[] scans = execBlock.getScanNodes();
+ InputContext srcContext = execBlock.getInputContext();
+ Preconditions.checkState(srcContext.size() == 2);
+ ScanNode[] scans = srcContext.getScanNodes();
Path tablePath;
FileFragment[] fragments = new FileFragment[2];
@@ -109,9 +109,9 @@ public class Repartitioner {
tasks[0] = new QueryUnit(subQuery.getContext().getConf(),
QueryIdFactory.newQueryUnitId(subQuery.getId(), 0),
subQuery.getMasterPlan().isLeaf(execBlock), subQuery.getEventHandler());
- tasks[0].setLogicalPlan(execBlock.getPlan());
- tasks[0].setFragment(scans[0].getCanonicalName(), fragments[0]);
- tasks[0].setFragment(scans[1].getCanonicalName(), fragments[1]);
+ tasks[0].setExecutionPlan(execBlock.getPlan());
+ tasks[0].setFragment2(fragments[0]);
+ tasks[0].setFragment2(fragments[1]);
} else if (leftSmall ^ rightSmall) {
LOG.info("[Distributed Join Strategy] : Broadcast Join");
int broadcastIdx = leftSmall ? 0 : 1;
@@ -201,8 +201,11 @@ public class Repartitioner {
private static QueryUnit [] createLeafTasksWithBroadcastTable(SubQuery subQuery, int baseScanId, FileFragment broadcasted) throws IOException {
ExecutionBlock execBlock = subQuery.getBlock();
- ScanNode[] scans = execBlock.getScanNodes();
- Preconditions.checkArgument(scans.length == 2, "Must be Join Query");
+ MasterPlan masterPlan = subQuery.getMasterPlan();
+ InputContext srcContext = execBlock.getInputContext();
+ Preconditions.checkArgument(srcContext.size() == 2, "Must be Join Query");
+
+ ScanNode[] scans = srcContext.getScanNodes();
TableMeta meta;
Path inputPath;
ScanNode scan = scans[baseScanId];
@@ -230,7 +233,7 @@ public class Repartitioner {
QueryUnit unit = new QueryUnit(subQuery.getContext().getConf(),
QueryIdFactory.newQueryUnitId(subQuery.getId(), taskId), subQuery.getMasterPlan().isLeaf(execBlock),
subQuery.getEventHandler());
- unit.setLogicalPlan(execBlock.getPlan());
+ unit.setExecutionPlan(execBlock.getPlan());
unit.setFragment2(fragment);
return unit;
}
@@ -242,7 +245,7 @@ public class Repartitioner {
tasks[i] = new QueryUnit(subQuery.getContext().getConf(),
QueryIdFactory.newQueryUnitId(subQuery.getId(), i), subQuery.getMasterPlan().isLeaf(execBlock),
subQuery.getEventHandler());
- tasks[i].setLogicalPlan(execBlock.getPlan());
+ tasks[i].setExecutionPlan(execBlock.getPlan());
for (FileFragment fragment : fragments) {
tasks[i].setFragment2(fragment);
}
@@ -315,7 +318,10 @@ public class Repartitioner {
return new QueryUnit[0];
}
- ScanNode scan = execBlock.getScanNodes()[0];
+ InputContext srcContext = execBlock.getInputContext();
+ // TODO: maybe we need to check the number of input sources
+// Preconditions.checkState(srcContext.size() == 1);
+ ScanNode scan = srcContext.getScanNodes()[0];
Path tablePath;
tablePath = subQuery.getContext().getStorageManager().getTablePath(scan.getTableName());
@@ -434,14 +440,13 @@ public class Repartitioner {
return new QueryUnit[0];
}
- ScanNode scan = execBlock.getScanNodes()[0];
+ InputContext srcContext = execBlock.getInputContext();
+ ScanNode scan = srcContext.getScanNodes()[0];
Path tablePath;
tablePath = subQuery.getContext().getStorageManager().getTablePath(scan.getTableName());
-
FileFragment frag = new FileFragment(scan.getCanonicalName(), tablePath, 0, 0);
-
Map<String, List<IntermediateEntry>> hashedByHost;
Map<Integer, List<URI>> finalFetchURI = new HashMap<Integer, List<URI>>();
@@ -565,14 +570,14 @@ public class Repartitioner {
public static QueryUnit [] createEmptyNonLeafTasks(SubQuery subQuery, int num,
FileFragment frag) {
- LogicalNode plan = subQuery.getBlock().getPlan();
+ ExecutionPlan plan = subQuery.getBlock().getPlan();
QueryUnit [] tasks = new QueryUnit[num];
for (int i = 0; i < num; i++) {
tasks[i] = new QueryUnit(subQuery.getContext().getConf(),
QueryIdFactory.newQueryUnitId(subQuery.getId(), i),
false, subQuery.getEventHandler());
tasks[i].setFragment2(frag);
- tasks[i].setLogicalPlan(plan);
+ tasks[i].setExecutionPlan(plan);
}
return tasks;
}
@@ -602,23 +607,26 @@ public class Repartitioner {
// TODO: the union handling is required when a join has unions as its child
MasterPlan masterPlan = subQuery.getMasterPlan();
keys = channel.getPartitionKey();
+ LogicalNode topnode = null;
if (!masterPlan.isRoot(subQuery.getBlock()) ) {
ExecutionBlock parentBlock = masterPlan.getParent(subQuery.getBlock());
- if (parentBlock.getPlan().getType() == NodeType.JOIN) {
+ topnode = parentBlock.getPlan().getChild(parentBlock.getPlan().getTerminalNode(), 0);
+ if (topnode.getType() == NodeType.JOIN) {
channel.setPartitionNum(desiredNum);
}
}
+ topnode = execBlock.getPlan().getChild(execBlock.getPlan().getTerminalNode(), 0);
// set the partition number for group by and sort
if (channel.getPartitionType() == HASH_PARTITION) {
- if (execBlock.getPlan().getType() == NodeType.GROUP_BY) {
- GroupbyNode groupby = (GroupbyNode) execBlock.getPlan();
+ if (topnode.getType() == NodeType.GROUP_BY) {
+ GroupbyNode groupby = (GroupbyNode) topnode;
keys = groupby.getGroupingColumns();
}
} else if (channel.getPartitionType() == RANGE_PARTITION) {
- if (execBlock.getPlan().getType() == NodeType.SORT) {
- SortNode sort = (SortNode) execBlock.getPlan();
+ if (topnode.getType() == NodeType.SORT) {
+ SortNode sort = (SortNode) topnode;
keys = new Column[sort.getSortKeys().length];
for (int i = 0; i < keys.length; i++) {
keys[i] = sort.getSortKeys()[i].getSortKey();
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/7c97735e/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java
index 4aa3866..c5efb4b 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java
@@ -169,8 +169,12 @@ public class SubQuery implements EventHandler<SubQueryEvent> {
return masterPlan;
}
- public DataChannel getDataChannel() {
- return masterPlan.getOutgoingChannels(getId()).iterator().next();
+ public List<DataChannel> getIncomingChannels() {
+ return masterPlan.getIncomingChannels(getId());
+ }
+
+ public List<DataChannel> getOutgoingChannels() {
+ return masterPlan.getOutgoingChannels(getId());
}
public EventHandler getEventHandler() {
@@ -495,7 +499,7 @@ public class SubQuery implements EventHandler<SubQueryEvent> {
}
// Is this subquery the first step of join?
- if (parent != null && parent.getScanNodes().length == 2) {
+ if (parent != null && masterPlan.getChildCount(parent.getId()) == 2) {
List<ExecutionBlock> childs = masterPlan.getChilds(parent);
// for inner
@@ -551,10 +555,11 @@ public class SubQuery implements EventHandler<SubQueryEvent> {
MasterPlan masterPlan = subQuery.getMasterPlan();
ExecutionBlock execBlock = subQuery.getBlock();
QueryUnit [] tasks;
- if (subQuery.getMasterPlan().isLeaf(execBlock.getId()) && execBlock.getScanNodes().length == 1) { // Case 1: Just Scan
+ if (subQuery.getMasterPlan().isLeaf(execBlock.getId()) &&
+ execBlock.getInputContext().size() == 1) {
tasks = createLeafTasks(subQuery);
- } else if (execBlock.getScanNodes().length > 1) { // Case 2: Join
+ } else if (execBlock.getInputContext().size() > 1) { // Case 2: Join
tasks = Repartitioner.createJoinTasks(subQuery);
} else { // Case 3: Others (Sort or Aggregation)
@@ -593,7 +598,7 @@ public class SubQuery implements EventHandler<SubQueryEvent> {
public static long getInputVolume(MasterPlan masterPlan, QueryMasterTask.QueryMasterTaskContext context, ExecutionBlock execBlock) {
Map<String, TableDesc> tableMap = context.getTableDescMap();
if (masterPlan.isLeaf(execBlock)) {
- ScanNode outerScan = execBlock.getScanNodes()[0];
+ ScanNode outerScan = execBlock.getInputContext().getScanNodes()[0];
TableStats stat = tableMap.get(outerScan.getCanonicalName()).getStats();
return stat.getNumBytes();
} else {
@@ -632,19 +637,18 @@ public class SubQuery implements EventHandler<SubQueryEvent> {
private static QueryUnit [] createLeafTasks(SubQuery subQuery) throws IOException {
ExecutionBlock execBlock = subQuery.getBlock();
- ScanNode[] scans = execBlock.getScanNodes();
- Preconditions.checkArgument(scans.length == 1, "Must be Scan Query");
TableMeta meta;
Path inputPath;
- ScanNode scan = scans[0];
- TableDesc desc = subQuery.context.getTableDescMap().get(scan.getCanonicalName());
+ Preconditions.checkState(execBlock.getInputContext().size() == 1);
+ ScanNode scanNode = execBlock.getInputContext().getScanNodes()[0];
+ TableDesc desc = subQuery.context.getTableDescMap().get(scanNode.getCanonicalName());
inputPath = desc.getPath();
meta = desc.getMeta();
// TODO - should be change the inner directory
- List<FileFragment> fragments = subQuery.getStorageManager().getSplits(scan.getCanonicalName(), meta, desc.getSchema(),
- inputPath);
+ List<FileFragment> fragments = subQuery.getStorageManager().getSplits(scanNode.getCanonicalName(),
+ meta, desc.getSchema(), inputPath);
QueryUnit queryUnit;
List<QueryUnit> queryUnits = new ArrayList<QueryUnit>();
@@ -663,7 +667,7 @@ public class SubQuery implements EventHandler<SubQueryEvent> {
QueryUnit unit = new QueryUnit(subQuery.context.getConf(),
QueryIdFactory.newQueryUnitId(subQuery.getId(), taskId), subQuery.masterPlan.isLeaf(execBlock),
subQuery.eventHandler);
- unit.setLogicalPlan(execBlock.getPlan());
+ unit.setExecutionPlan(execBlock.getPlan());
unit.setFragment2(fragment);
return unit;
}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/7c97735e/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoQueryEngine.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoQueryEngine.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoQueryEngine.java
index 1731854..e22f9d0 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoQueryEngine.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoQueryEngine.java
@@ -19,9 +19,8 @@
package org.apache.tajo.worker;
import org.apache.tajo.conf.TajoConf;
-import org.apache.tajo.engine.planner.PhysicalPlanner;
import org.apache.tajo.engine.planner.PhysicalPlannerImpl;
-import org.apache.tajo.engine.planner.logical.LogicalNode;
+import org.apache.tajo.engine.planner.global.ExecutionPlan;
import org.apache.tajo.engine.planner.physical.PhysicalExec;
import org.apache.tajo.exception.InternalException;
import org.apache.tajo.storage.AbstractStorageManager;
@@ -32,14 +31,14 @@ import java.io.IOException;
public class TajoQueryEngine {
private final AbstractStorageManager storageManager;
- private final PhysicalPlanner phyPlanner;
+ private final PhysicalPlannerImpl phyPlanner;
public TajoQueryEngine(TajoConf conf) throws IOException {
this.storageManager = StorageManagerFactory.getStorageManager(conf);
this.phyPlanner = new PhysicalPlannerImpl(conf, storageManager);
}
- public PhysicalExec createPlan(TaskAttemptContext ctx, LogicalNode plan)
+ public PhysicalExec createPlan(TaskAttemptContext ctx, ExecutionPlan plan)
throws InternalException {
return phyPlanner.createPlan(ctx, plan);
}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/7c97735e/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/Task.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/Task.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/Task.java
index f931615..7c2a140 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/Task.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/Task.java
@@ -39,7 +39,12 @@ import org.apache.tajo.catalog.statistics.TableStats;
import org.apache.tajo.conf.TajoConf;
import org.apache.tajo.engine.json.CoreGsonHelper;
import org.apache.tajo.engine.planner.PlannerUtil;
-import org.apache.tajo.engine.planner.logical.*;
+import org.apache.tajo.engine.planner.global.DataChannel;
+import org.apache.tajo.engine.planner.global.ExecutionPlan;
+import org.apache.tajo.engine.planner.global.InputContext;
+import org.apache.tajo.engine.planner.logical.NodeType;
+import org.apache.tajo.engine.planner.logical.ScanNode;
+import org.apache.tajo.engine.planner.logical.SortNode;
import org.apache.tajo.engine.planner.physical.PhysicalExec;
import org.apache.tajo.engine.query.QueryContext;
import org.apache.tajo.engine.query.QueryUnitRequest;
@@ -76,7 +81,7 @@ public class Task {
private final QueryUnitRequest request;
private final TaskAttemptContext context;
private List<Fetcher> fetcherRunners;
- private final LogicalNode plan;
+ private final ExecutionPlan plan;
private final Map<String, TableDesc> descs = Maps.newHashMap();
private PhysicalExec executor;
private boolean interQuery;
@@ -99,9 +104,13 @@ public class Task {
private AtomicBoolean progressFlag = new AtomicBoolean(false);
// TODO - to be refactored
- private PartitionType partitionType = null;
- private Schema finalSchema = null;
- private TupleComparator sortComp = null;
+ private static class TaskContext {
+ private PartitionType partitionType;
+ private Schema finalSchema;
+ private TupleComparator sortComp;
+ }
+
+ private List<TaskContext> taskContexts = new ArrayList<TaskContext>();
static final String OUTPUT_FILE_PREFIX="part-";
static final ThreadLocal<NumberFormat> OUTPUT_FILE_FORMAT_SUBQUERY =
@@ -145,25 +154,31 @@ public class Task {
this.context = new TaskAttemptContext(systemConf, taskId,
request.getFragments().toArray(new FragmentProto[request.getFragments().size()]), taskDir);
- this.context.setDataChannel(request.getDataChannel());
+ this.context.setIncomingChannels(request.getIncomingChannels());
+ this.context.setOutgoingChannels(request.getOutgoingChannels());
this.context.setEnforcer(request.getEnforcer());
- plan = CoreGsonHelper.fromJson(request.getSerializedData(), LogicalNode.class);
- LogicalNode [] scanNode = PlannerUtil.findAllNodes(plan, NodeType.SCAN);
- for (LogicalNode node : scanNode) {
- ScanNode scan = (ScanNode)node;
+ plan = CoreGsonHelper.fromJson(request.getSerializedData(), ExecutionPlan.class);
+ // TODO: add meta information to regenerate a table desc
+ InputContext srcContext = plan.getInputContext();
+
+ for (ScanNode scan : srcContext.getScanNodes()) {
descs.put(scan.getCanonicalName(), scan.getTableDesc());
}
interQuery = request.getProto().getInterQuery();
if (interQuery) {
context.setInterQuery();
- this.partitionType = context.getDataChannel().getPartitionType();
-
- if (partitionType == PartitionType.RANGE_PARTITION) {
- SortNode sortNode = (SortNode) PlannerUtil.findTopNode(plan, NodeType.SORT);
- this.finalSchema = PlannerUtil.sortSpecsToSchema(sortNode.getSortKeys());
- this.sortComp = new TupleComparator(finalSchema, sortNode.getSortKeys());
+ for (DataChannel outChannel : request.getOutgoingChannels()) {
+ TaskContext taskContext = new TaskContext();
+ taskContext.partitionType = outChannel.getPartitionType();
+ if (taskContext.partitionType == PartitionType.RANGE_PARTITION) {
+ SortNode sortNode = PlannerUtil.findTopNode(plan, NodeType.SORT);
+ Preconditions.checkArgument(sortNode != null);
+ taskContext.finalSchema = PlannerUtil.sortSpecsToSchema(sortNode.getSortKeys());
+ taskContext.sortComp = new TupleComparator(taskContext.finalSchema, sortNode.getSortKeys());
+ }
+ taskContexts.add(taskContext);
}
} else {
// The final result of a task will be written in a file named part-ss-nnnnnnn,
@@ -179,8 +194,12 @@ public class Task {
context.setState(TaskAttemptState.TA_PENDING);
LOG.info("==================================");
LOG.info("* Subquery " + request.getId() + " is initialized");
- LOG.info("* InterQuery: " + interQuery
- + (interQuery ? ", Use " + this.partitionType + " partitioning":""));
+ LOG.info("* InterQuery: " + interQuery);
+ if (interQuery) {
+ for (TaskContext taskContext : taskContexts) {
+ LOG.info(", Use " + taskContext.partitionType + " partitioning\":\"");
+ }
+ }
LOG.info("* Fragments (num: " + request.getFragments().size() + ")");
LOG.info("* Fetches (total:" + request.getFetches().size() + ") :");
@@ -281,7 +300,7 @@ public class Task {
if (context.getState() == TaskAttemptState.TA_SUCCEEDED) {
try {
- // context.getWorkDir() 지우기
+ // delete context.getWorkDir()
localFS.delete(context.getWorkDir(), true);
synchronized (taskRunnerContext.getTasks()) {
taskRunnerContext.getTasks().remove(this.getId());
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/7c97735e/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TaskAttemptContext.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TaskAttemptContext.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TaskAttemptContext.java
index 86d8cf2..bc66a62 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TaskAttemptContext.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TaskAttemptContext.java
@@ -61,7 +61,8 @@ public class TaskAttemptContext {
private boolean stopped = false;
private boolean interQuery = false;
private Path outputPath;
- private DataChannel dataChannel;
+ private List<DataChannel> incomingChannels;
+ private List<DataChannel> outgoingChannels;
private Enforcer enforcer;
public TaskAttemptContext(TajoConf conf, final QueryUnitAttemptId queryId,
@@ -105,12 +106,20 @@ public class TaskAttemptContext {
LOG.info("Query status of " + getTaskId() + " is changed to " + state);
}
- public void setDataChannel(DataChannel dataChannel) {
- this.dataChannel = dataChannel;
+ public void setOutgoingChannels(List<DataChannel> outgoingChannels) {
+ this.outgoingChannels = outgoingChannels;
}
- public DataChannel getDataChannel() {
- return dataChannel;
+ public void setIncomingChannels(List<DataChannel> incomingChannels) {
+ this.incomingChannels = incomingChannels;
+ }
+
+ public List<DataChannel> getOutgoingChannels() {
+ return outgoingChannels;
+ }
+
+ public List<DataChannel> getIncomingChannels() {
+ return incomingChannels;
}
public void setEnforcer(Enforcer enforcer) {
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/7c97735e/tajo-core/tajo-core-backend/src/main/proto/TajoWorkerProtocol.proto
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/proto/TajoWorkerProtocol.proto b/tajo-core/tajo-core-backend/src/main/proto/TajoWorkerProtocol.proto
index 987af25..5479e01 100644
--- a/tajo-core/tajo-core-backend/src/main/proto/TajoWorkerProtocol.proto
+++ b/tajo-core/tajo-core-backend/src/main/proto/TajoWorkerProtocol.proto
@@ -60,8 +60,9 @@ message QueryUnitRequestProto {
repeated Fetch fetches = 7;
optional bool shouldDie = 8;
optional KeyValueSetProto queryContext = 9;
- optional DataChannelProto dataChannel = 10;
- optional EnforcerProto enforcer = 11;
+ repeated DataChannelProto incomingChannels = 10;
+ repeated DataChannelProto outgoingChannels = 11;
+ optional EnforcerProto enforcer = 12;
}
message Fetch {
@@ -143,6 +144,9 @@ message DataChannelProto {
optional int32 partitionNum = 9 [default = 1];
optional StoreType storeType = 10 [default = CSV];
+
+ optional int32 srcPID = 11;
+ optional int32 targetPID = 12;
}
message RunExecutionBlockRequestProto {
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/7c97735e/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/TestLogicalPlan.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/TestLogicalPlan.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/TestLogicalPlan.java
index 025c84b..4d19d75 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/TestLogicalPlan.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/TestLogicalPlan.java
@@ -96,8 +96,8 @@ public class TestLogicalPlan {
SimpleDirectedGraph<String, LogicalPlan.BlockEdge> graph = plan.getQueryBlockGraph();
assertEquals(2, graph.getChildCount(root.getName()));
- assertEquals(root.getName(), graph.getParent(new1.getName()));
- assertEquals(root.getName(), graph.getParent(new2.getName()));
+ assertEquals(root.getName(), graph.getParent(new1.getName(), 0));
+ assertEquals(root.getName(), graph.getParent(new2.getName(), 0));
assertTrue(graph.isRoot(root.getName()));
assertFalse(graph.isRoot(new1.getName()));
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/7c97735e/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/global/TestExecutionPlan.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/global/TestExecutionPlan.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/global/TestExecutionPlan.java
new file mode 100644
index 0000000..e908863
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/global/TestExecutionPlan.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.planner.global;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.tajo.catalog.CatalogUtil;
+import org.apache.tajo.catalog.Column;
+import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.catalog.SortSpec;
+import org.apache.tajo.catalog.proto.CatalogProtos.StoreType;
+import org.apache.tajo.common.TajoDataTypes.Type;
+import org.apache.tajo.engine.json.CoreGsonHelper;
+import org.apache.tajo.engine.planner.logical.*;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+public class TestExecutionPlan {
+
+ @Test
+ public void testJson() {
+ Schema schema = new Schema();
+ schema.addColumn("id", Type.INT4);
+ schema.addColumn("name", Type.TEXT);
+ schema.addColumn("age", Type.INT2);
+
+ GroupbyNode groupbyNode = new GroupbyNode(3, new Column[]{schema.getColumn(1), schema.getColumn(2)});
+ ScanNode scanNode = new ScanNode(0,
+ CatalogUtil.newTableDesc("in", schema, CatalogUtil.newTableMeta(StoreType.CSV), new Path("in")));
+
+ groupbyNode.setChild(scanNode);
+
+ ExecutionPlan plan = new ExecutionPlan(new LogicalRootNode(4));
+ plan.addPlan(groupbyNode);
+
+ String json = plan.toJson();
+ ExecutionPlan fromJson = CoreGsonHelper.fromJson(json, ExecutionPlan.class);
+ assertEquals(plan, fromJson);
+ }
+
+ @Test
+ public void testAddPlan() {
+ Schema schema = new Schema();
+ schema.addColumn("id", Type.INT4);
+ schema.addColumn("name", Type.TEXT);
+ schema.addColumn("age", Type.INT2);
+
+ LogicalRootNode root1 = new LogicalRootNode(10);
+ GroupbyNode groupbyNode = new GroupbyNode(3, new Column[]{schema.getColumn(1), schema.getColumn(2)});
+ ScanNode scanNode = new ScanNode(0,
+ CatalogUtil.newTableDesc("in", schema, CatalogUtil.newTableMeta(StoreType.CSV), new Path("in")));
+ root1.setChild(groupbyNode);
+ groupbyNode.setChild(scanNode);
+
+ LogicalRootNode root2 = new LogicalRootNode(11);
+ SortNode sortNode = new SortNode(2, new SortSpec[]{new SortSpec(schema.getColumn(2))});
+ root2.setChild(sortNode);
+ sortNode.setChild(scanNode);
+
+ LogicalRootNode root3 = new LogicalRootNode(12);
+ JoinNode joinNode = new JoinNode(4);
+ ScanNode scanNode2 = new ScanNode(1,
+ CatalogUtil.newTableDesc("in2", schema, CatalogUtil.newTableMeta(StoreType.CSV), new Path("in2")));
+ root3.setChild(joinNode);
+ joinNode.setLeftChild(scanNode);
+ joinNode.setRightChild(scanNode2);
+
+ ExecutionPlan plan = new ExecutionPlan(new LogicalRootNode(5));
+ plan.addPlan(root1);
+ plan.addPlan(root2);
+ assertEquals(1, plan.getInputContext().size());
+ assertEquals(1, plan.getChildCount(groupbyNode));
+ assertEquals(1, plan.getChildCount(sortNode));
+ assertEquals(plan.getChild(groupbyNode, 0), plan.getChild(sortNode, 0));
+
+ plan.addPlan(root3);
+ assertEquals(2, plan.getInputContext().size());
+ assertEquals(3, plan.getParentCount(scanNode));
+
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/7c97735e/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/global/TestMasterPlan.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/global/TestMasterPlan.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/global/TestMasterPlan.java
index 960ee57..24f81dd 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/global/TestMasterPlan.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/global/TestMasterPlan.java
@@ -31,9 +31,9 @@ public class TestMasterPlan {
public void testConnect() {
MasterPlan masterPlan = new MasterPlan(LocalTajoTestingUtility.newQueryId(), null, null);
- ExecutionBlock eb1 = masterPlan.newExecutionBlock();
- ExecutionBlock eb2 = masterPlan.newExecutionBlock();
- ExecutionBlock eb3 = masterPlan.newExecutionBlock();
+ ExecutionBlock eb1 = masterPlan.newExecutionBlockForTest();
+ ExecutionBlock eb2 = masterPlan.newExecutionBlockForTest();
+ ExecutionBlock eb3 = masterPlan.newExecutionBlockForTest();
masterPlan.addConnect(eb1, eb2, TajoWorkerProtocol.PartitionType.LIST_PARTITION);
assertTrue(masterPlan.isConnected(eb1.getId(), eb2.getId()));
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/7c97735e/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestBNLJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestBNLJoinExec.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestBNLJoinExec.java
index 5f0457e..d816bb1 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestBNLJoinExec.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestBNLJoinExec.java
@@ -21,8 +21,6 @@ package org.apache.tajo.engine.planner.physical;
import org.apache.hadoop.fs.Path;
import org.apache.tajo.LocalTajoTestingUtility;
import org.apache.tajo.TajoTestingCluster;
-import org.apache.tajo.storage.fragment.FileFragment;
-import org.apache.tajo.worker.TaskAttemptContext;
import org.apache.tajo.algebra.Expr;
import org.apache.tajo.catalog.*;
import org.apache.tajo.catalog.proto.CatalogProtos.StoreType;
@@ -33,12 +31,15 @@ import org.apache.tajo.datum.DatumFactory;
import org.apache.tajo.engine.parser.SQLAnalyzer;
import org.apache.tajo.engine.planner.*;
import org.apache.tajo.engine.planner.enforce.Enforcer;
+import org.apache.tajo.engine.planner.global.ExecutionPlan;
import org.apache.tajo.engine.planner.logical.JoinNode;
import org.apache.tajo.engine.planner.logical.LogicalNode;
import org.apache.tajo.engine.planner.logical.NodeType;
import org.apache.tajo.storage.*;
+import org.apache.tajo.storage.fragment.FileFragment;
import org.apache.tajo.util.CommonTestingUtil;
import org.apache.tajo.util.TUtil;
+import org.apache.tajo.worker.TaskAttemptContext;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
@@ -151,11 +152,14 @@ public class TestBNLJoinExec {
LocalTajoTestingUtility.newQueryUnitAttemptId(), merged, workDir);
ctx.setEnforcer(enforcer);
- PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf,sm);
- PhysicalExec exec = phyPlanner.createPlan(ctx, plan);
+ ExecutionPlan execPlan = new ExecutionPlan();
+ execPlan.addPlan(plan);
+ PhysicalPlannerImpl phyPlanner = new PhysicalPlannerImpl(conf,sm);
+ PhysicalExec exec = phyPlanner.createPlanWithoutMaterialize(ctx, execPlan);
- ProjectionExec proj = (ProjectionExec) exec;
+ ProjectionExec proj = (ProjectionExec) ((PhysicalRootExec)exec).getChild(0);
assertTrue(proj.getChild() instanceof BNLJoinExec);
+ exec = proj;
int i = 0;
exec.init();
@@ -187,12 +191,14 @@ public class TestBNLJoinExec {
merged, workDir);
ctx.setEnforcer(enforcer);
+ ExecutionPlan execPlan = new ExecutionPlan();
+ execPlan.addPlan(plan);
+ PhysicalPlannerImpl phyPlanner = new PhysicalPlannerImpl(conf,sm);
+ PhysicalExec exec = phyPlanner.createPlanWithoutMaterialize(ctx, execPlan);
- PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf,sm);
- PhysicalExec exec = phyPlanner.createPlan(ctx, plan);
-
- ProjectionExec proj = (ProjectionExec) exec;
+ ProjectionExec proj = (ProjectionExec) ((PhysicalRootExec)exec).getChild(0);
assertTrue(proj.getChild() instanceof BNLJoinExec);
+ exec = proj;
Tuple tuple;
int i = 1;
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/7c97735e/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestBSTIndexExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestBSTIndexExec.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestBSTIndexExec.java
index c9499d6..f2877a0 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestBSTIndexExec.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestBSTIndexExec.java
@@ -23,9 +23,6 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.tajo.LocalTajoTestingUtility;
import org.apache.tajo.TajoTestingCluster;
-import org.apache.tajo.storage.fragment.FileFragment;
-import org.apache.tajo.storage.fragment.FragmentConvertor;
-import org.apache.tajo.worker.TaskAttemptContext;
import org.apache.tajo.algebra.Expr;
import org.apache.tajo.catalog.*;
import org.apache.tajo.catalog.proto.CatalogProtos.StoreType;
@@ -38,11 +35,15 @@ import org.apache.tajo.engine.planner.LogicalOptimizer;
import org.apache.tajo.engine.planner.LogicalPlan;
import org.apache.tajo.engine.planner.LogicalPlanner;
import org.apache.tajo.engine.planner.PhysicalPlannerImpl;
+import org.apache.tajo.engine.planner.global.ExecutionPlan;
import org.apache.tajo.engine.planner.logical.LogicalNode;
import org.apache.tajo.engine.planner.logical.ScanNode;
import org.apache.tajo.storage.*;
+import org.apache.tajo.storage.fragment.FileFragment;
+import org.apache.tajo.storage.fragment.FragmentConvertor;
import org.apache.tajo.storage.index.bst.BSTIndex;
import org.apache.tajo.util.CommonTestingUtil;
+import org.apache.tajo.worker.TaskAttemptContext;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
@@ -168,7 +169,10 @@ public class TestBSTIndexExec {
LogicalNode rootNode = optimizer.optimize(plan);
TmpPlanner phyPlanner = new TmpPlanner(conf, sm);
- PhysicalExec exec = phyPlanner.createPlan(ctx, rootNode);
+ ExecutionPlan execPlan = new ExecutionPlan();
+ execPlan.addPlan(rootNode);
+ PhysicalExec exec = phyPlanner.createPlanWithoutMaterialize(ctx, execPlan);
+ exec = ((PhysicalRootExec)exec).getChild(0);
int tupleCount = this.randomValues.get(rndKey);
int counter = 0;
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/7c97735e/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestExternalSortExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestExternalSortExec.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestExternalSortExec.java
index e236126..6aa0eef 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestExternalSortExec.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestExternalSortExec.java
@@ -21,8 +21,6 @@ package org.apache.tajo.engine.planner.physical;
import org.apache.hadoop.fs.Path;
import org.apache.tajo.LocalTajoTestingUtility;
import org.apache.tajo.TajoTestingCluster;
-import org.apache.tajo.storage.fragment.FileFragment;
-import org.apache.tajo.worker.TaskAttemptContext;
import org.apache.tajo.algebra.Expr;
import org.apache.tajo.catalog.*;
import org.apache.tajo.catalog.proto.CatalogProtos.StoreType;
@@ -33,9 +31,12 @@ import org.apache.tajo.datum.DatumFactory;
import org.apache.tajo.engine.parser.SQLAnalyzer;
import org.apache.tajo.engine.planner.*;
import org.apache.tajo.engine.planner.enforce.Enforcer;
+import org.apache.tajo.engine.planner.global.ExecutionPlan;
import org.apache.tajo.engine.planner.logical.LogicalNode;
import org.apache.tajo.storage.*;
+import org.apache.tajo.storage.fragment.FileFragment;
import org.apache.tajo.util.CommonTestingUtil;
+import org.apache.tajo.worker.TaskAttemptContext;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
@@ -120,10 +121,13 @@ public class TestExternalSortExec {
LogicalPlan plan = planner.createPlan(expr);
LogicalNode rootNode = plan.getRootBlock().getRoot();
- PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf,sm);
- PhysicalExec exec = phyPlanner.createPlan(ctx, rootNode);
+ ExecutionPlan execPlan = new ExecutionPlan();
+ execPlan.addPlan(rootNode);
+ PhysicalPlannerImpl phyPlanner = new PhysicalPlannerImpl(conf,sm);
+ PhysicalExec exec = phyPlanner.createPlanWithoutMaterialize(ctx, execPlan);
- ProjectionExec proj = (ProjectionExec) exec;
+ ProjectionExec proj = (ProjectionExec) ((PhysicalRootExec)exec).getChild(0);
+ exec = proj;
// TODO - should be planed with user's optimization hint
if (!(proj.getChild() instanceof ExternalSortExec)) {
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/7c97735e/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestFullOuterHashJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestFullOuterHashJoinExec.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestFullOuterHashJoinExec.java
index 63f14b7..e6aaddf 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestFullOuterHashJoinExec.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestFullOuterHashJoinExec.java
@@ -21,6 +21,7 @@ package org.apache.tajo.engine.planner.physical;
import org.apache.hadoop.fs.Path;
import org.apache.tajo.LocalTajoTestingUtility;
import org.apache.tajo.TajoTestingCluster;
+import org.apache.tajo.engine.planner.global.ExecutionPlan;
import org.apache.tajo.storage.fragment.FileFragment;
import org.apache.tajo.worker.TaskAttemptContext;
import org.apache.tajo.algebra.Expr;
@@ -263,11 +264,14 @@ public class TestFullOuterHashJoinExec {
LocalTajoTestingUtility.newQueryUnitAttemptId(), merged, workDir);
ctx.setEnforcer(enforcer);
- PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf, sm);
- PhysicalExec exec = phyPlanner.createPlan(ctx, plan);
+ ExecutionPlan execPlan = new ExecutionPlan();
+ execPlan.addPlan(plan);
+ PhysicalPlannerImpl phyPlanner = new PhysicalPlannerImpl(conf, sm);
+ PhysicalExec exec = phyPlanner.createPlanWithoutMaterialize(ctx, execPlan);
- ProjectionExec proj = (ProjectionExec) exec;
+ ProjectionExec proj = (ProjectionExec) ((PhysicalRootExec)exec).getChild(0);
assertTrue(proj.getChild() instanceof HashFullOuterJoinExec);
+ exec = proj;
int count = 0;
exec.init();
@@ -300,11 +304,14 @@ public class TestFullOuterHashJoinExec {
LocalTajoTestingUtility.newQueryUnitAttemptId(), merged, workDir);
ctx.setEnforcer(enforcer);
- PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf, sm);
- PhysicalExec exec = phyPlanner.createPlan(ctx, plan);
+ ExecutionPlan execPlan = new ExecutionPlan();
+ execPlan.addPlan(plan);
+ PhysicalPlannerImpl phyPlanner = new PhysicalPlannerImpl(conf, sm);
+ PhysicalExec exec = phyPlanner.createPlanWithoutMaterialize(ctx, execPlan);
- ProjectionExec proj = (ProjectionExec) exec;
+ ProjectionExec proj = (ProjectionExec) ((PhysicalRootExec)exec).getChild(0);
assertTrue(proj.getChild() instanceof HashFullOuterJoinExec);
+ exec = proj;
int count = 0;
exec.init();
@@ -336,11 +343,14 @@ public class TestFullOuterHashJoinExec {
LocalTajoTestingUtility.newQueryUnitAttemptId(), merged, workDir);
ctx.setEnforcer(enforcer);
- PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf, sm);
- PhysicalExec exec = phyPlanner.createPlan(ctx, plan);
+ ExecutionPlan execPlan = new ExecutionPlan();
+ execPlan.addPlan(plan);
+ PhysicalPlannerImpl phyPlanner = new PhysicalPlannerImpl(conf, sm);
+ PhysicalExec exec = phyPlanner.createPlanWithoutMaterialize(ctx, execPlan);
- ProjectionExec proj = (ProjectionExec) exec;
+ ProjectionExec proj = (ProjectionExec) ((PhysicalRootExec)exec).getChild(0);
assertTrue(proj.getChild() instanceof HashFullOuterJoinExec);
+ exec = proj;
int count = 0;
exec.init();
@@ -374,11 +384,14 @@ public class TestFullOuterHashJoinExec {
workDir);
ctx.setEnforcer(enforcer);
- PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf, sm);
- PhysicalExec exec = phyPlanner.createPlan(ctx, plan);
+ ExecutionPlan execPlan = new ExecutionPlan();
+ execPlan.addPlan(plan);
+ PhysicalPlannerImpl phyPlanner = new PhysicalPlannerImpl(conf, sm);
+ PhysicalExec exec = phyPlanner.createPlanWithoutMaterialize(ctx, execPlan);
- ProjectionExec proj = (ProjectionExec) exec;
+ ProjectionExec proj = (ProjectionExec) ((PhysicalRootExec)exec).getChild(0);
assertTrue(proj.getChild() instanceof HashFullOuterJoinExec);
+ exec = proj;
int count = 0;
exec.init();
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/7c97735e/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestFullOuterMergeJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestFullOuterMergeJoinExec.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestFullOuterMergeJoinExec.java
index c92d1c9..f474979 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestFullOuterMergeJoinExec.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestFullOuterMergeJoinExec.java
@@ -21,6 +21,7 @@ package org.apache.tajo.engine.planner.physical;
import org.apache.hadoop.fs.Path;
import org.apache.tajo.LocalTajoTestingUtility;
import org.apache.tajo.TajoTestingCluster;
+import org.apache.tajo.engine.planner.global.ExecutionPlan;
import org.apache.tajo.storage.fragment.FileFragment;
import org.apache.tajo.worker.TaskAttemptContext;
import org.apache.tajo.algebra.Expr;
@@ -306,11 +307,14 @@ public class TestFullOuterMergeJoinExec {
LocalTajoTestingUtility.newQueryUnitAttemptId(), merged, workDir);
ctx.setEnforcer(enforcer);
- PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf,sm);
- PhysicalExec exec = phyPlanner.createPlan(ctx, plan);
+ ExecutionPlan execPlan = new ExecutionPlan();
+ execPlan.addPlan(plan);
+ PhysicalPlannerImpl phyPlanner = new PhysicalPlannerImpl(conf,sm);
+ PhysicalExec exec = phyPlanner.createPlanWithoutMaterialize(ctx, execPlan);
- ProjectionExec proj = (ProjectionExec) exec;
+ ProjectionExec proj = (ProjectionExec) ((PhysicalRootExec)exec).getChild(0);
assertTrue(proj.getChild() instanceof MergeFullOuterJoinExec);
+ exec = proj;
int count = 0;
exec.init();
@@ -341,11 +345,14 @@ public class TestFullOuterMergeJoinExec {
LocalTajoTestingUtility.newQueryUnitAttemptId(), merged, workDir);
ctx.setEnforcer(enforcer);
- PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf,sm);
- PhysicalExec exec = phyPlanner.createPlan(ctx, plan);
+ ExecutionPlan execPlan = new ExecutionPlan();
+ execPlan.addPlan(plan);
+ PhysicalPlannerImpl phyPlanner = new PhysicalPlannerImpl(conf,sm);
+ PhysicalExec exec = phyPlanner.createPlanWithoutMaterialize(ctx, execPlan);
- ProjectionExec proj = (ProjectionExec) exec;
+ ProjectionExec proj = (ProjectionExec) ((PhysicalRootExec)exec).getChild(0);
assertTrue(proj.getChild() instanceof MergeFullOuterJoinExec);
+ exec = proj;
int count = 0;
exec.init();
@@ -376,12 +383,14 @@ public class TestFullOuterMergeJoinExec {
LocalTajoTestingUtility.newQueryUnitAttemptId(), merged, workDir);
ctx.setEnforcer(enforcer);
- PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf,sm);
- PhysicalExec exec = phyPlanner.createPlan(ctx, plan);
+ ExecutionPlan execPlan = new ExecutionPlan();
+ execPlan.addPlan(plan);
+ PhysicalPlannerImpl phyPlanner = new PhysicalPlannerImpl(conf,sm);
+ PhysicalExec exec = phyPlanner.createPlanWithoutMaterialize(ctx, execPlan);
- ProjectionExec proj = (ProjectionExec) exec;
+ ProjectionExec proj = (ProjectionExec) ((PhysicalRootExec)exec).getChild(0);
assertTrue(proj.getChild() instanceof MergeFullOuterJoinExec);
-
+ exec = proj;
int count = 0;
exec.init();
@@ -412,13 +421,16 @@ public class TestFullOuterMergeJoinExec {
LocalTajoTestingUtility.newQueryUnitAttemptId(), merged, workDir);
ctx.setEnforcer(enforcer);
- PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf,sm);
- PhysicalExec exec = phyPlanner.createPlan(ctx, plan);
+ ExecutionPlan execPlan = new ExecutionPlan();
+ execPlan.addPlan(plan);
+ PhysicalPlannerImpl phyPlanner = new PhysicalPlannerImpl(conf,sm);
+ PhysicalExec exec = phyPlanner.createPlanWithoutMaterialize(ctx, execPlan);
- ProjectionExec proj = (ProjectionExec) exec;
+ ProjectionExec proj = (ProjectionExec) ((PhysicalRootExec)exec).getChild(0);
// if it chose the hash join WITH REVERSED ORDER, convert to merge join exec
assertTrue(proj.getChild() instanceof MergeFullOuterJoinExec);
+ exec = proj;
int count = 0;
exec.init();
@@ -451,11 +463,14 @@ public class TestFullOuterMergeJoinExec {
LocalTajoTestingUtility.newQueryUnitAttemptId(), merged, workDir);
ctx.setEnforcer(enforcer);
- PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf,sm);
- PhysicalExec exec = phyPlanner.createPlan(ctx, plan);
+ ExecutionPlan execPlan = new ExecutionPlan();
+ execPlan.addPlan(plan);
+ PhysicalPlannerImpl phyPlanner = new PhysicalPlannerImpl(conf,sm);
+ PhysicalExec exec = phyPlanner.createPlanWithoutMaterialize(ctx, execPlan);
- ProjectionExec proj = (ProjectionExec) exec;
+ ProjectionExec proj = (ProjectionExec) ((PhysicalRootExec)exec).getChild(0);
assertTrue(proj.getChild() instanceof MergeFullOuterJoinExec);
+ exec = proj;
int count = 0;
exec.init();
@@ -488,11 +503,14 @@ public class TestFullOuterMergeJoinExec {
LocalTajoTestingUtility.newQueryUnitAttemptId(), merged, workDir);
ctx.setEnforcer(enforcer);
- PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf,sm);
- PhysicalExec exec = phyPlanner.createPlan(ctx, plan);
+ ExecutionPlan execPlan = new ExecutionPlan();
+ execPlan.addPlan(plan);
+ PhysicalPlannerImpl phyPlanner = new PhysicalPlannerImpl(conf,sm);
+ PhysicalExec exec = phyPlanner.createPlanWithoutMaterialize(ctx, execPlan);
- ProjectionExec proj = (ProjectionExec) exec;
+ ProjectionExec proj = (ProjectionExec) ((PhysicalRootExec)exec).getChild(0);
assertTrue(proj.getChild() instanceof MergeFullOuterJoinExec);
+ exec = proj;
int count = 0;
exec.init();
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/7c97735e/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestHashAntiJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestHashAntiJoinExec.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestHashAntiJoinExec.java
index 57f2376..79aa0f9 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestHashAntiJoinExec.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestHashAntiJoinExec.java
@@ -21,8 +21,6 @@ package org.apache.tajo.engine.planner.physical;
import org.apache.hadoop.fs.Path;
import org.apache.tajo.LocalTajoTestingUtility;
import org.apache.tajo.TajoTestingCluster;
-import org.apache.tajo.storage.fragment.FileFragment;
-import org.apache.tajo.worker.TaskAttemptContext;
import org.apache.tajo.algebra.Expr;
import org.apache.tajo.catalog.*;
import org.apache.tajo.catalog.proto.CatalogProtos.StoreType;
@@ -33,10 +31,13 @@ import org.apache.tajo.datum.DatumFactory;
import org.apache.tajo.engine.parser.SQLAnalyzer;
import org.apache.tajo.engine.planner.*;
import org.apache.tajo.engine.planner.enforce.Enforcer;
+import org.apache.tajo.engine.planner.global.ExecutionPlan;
import org.apache.tajo.engine.planner.logical.LogicalNode;
import org.apache.tajo.storage.*;
+import org.apache.tajo.storage.fragment.FileFragment;
import org.apache.tajo.util.CommonTestingUtil;
import org.apache.tajo.util.TUtil;
+import org.apache.tajo.worker.TaskAttemptContext;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
@@ -157,8 +158,11 @@ public class TestHashAntiJoinExec {
optimizer.optimize(plan);
LogicalNode rootNode = plan.getRootBlock().getRoot();
- PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf, sm);
- PhysicalExec exec = phyPlanner.createPlan(ctx, rootNode);
+ ExecutionPlan execPlan = new ExecutionPlan();
+ execPlan.addPlan(rootNode);
+ PhysicalPlannerImpl phyPlanner = new PhysicalPlannerImpl(conf, sm);
+ PhysicalExec exec = phyPlanner.createPlanWithoutMaterialize(ctx, execPlan);
+ exec = ((PhysicalRootExec)exec).getChild(0);
// replace an equal join with an hash anti join.
if (exec instanceof MergeJoinExec) {
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/7c97735e/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestHashJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestHashJoinExec.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestHashJoinExec.java
index 2a1af7c..68e5ef2 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestHashJoinExec.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestHashJoinExec.java
@@ -21,6 +21,7 @@ package org.apache.tajo.engine.planner.physical;
import org.apache.hadoop.fs.Path;
import org.apache.tajo.LocalTajoTestingUtility;
import org.apache.tajo.TajoTestingCluster;
+import org.apache.tajo.engine.planner.global.ExecutionPlan;
import org.apache.tajo.storage.fragment.FileFragment;
import org.apache.tajo.worker.TaskAttemptContext;
import org.apache.tajo.algebra.Expr;
@@ -150,11 +151,14 @@ public class TestHashJoinExec {
LocalTajoTestingUtility.newQueryUnitAttemptId(), merged, workDir);
ctx.setEnforcer(enforcer);
- PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf, sm);
- PhysicalExec exec = phyPlanner.createPlan(ctx, plan);
+ ExecutionPlan execPlan = new ExecutionPlan();
+ execPlan.addPlan(plan);
+ PhysicalPlannerImpl phyPlanner = new PhysicalPlannerImpl(conf, sm);
+ PhysicalExec exec = phyPlanner.createPlanWithoutMaterialize(ctx, execPlan);
- ProjectionExec proj = (ProjectionExec) exec;
+ ProjectionExec proj = (ProjectionExec) ((PhysicalRootExec)exec).getChild(0);
assertTrue(proj.getChild() instanceof HashJoinExec);
+ exec = proj;
Tuple tuple;
int count = 0;
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/7c97735e/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestHashSemiJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestHashSemiJoinExec.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestHashSemiJoinExec.java
index d03f3c2..cc177d5 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestHashSemiJoinExec.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestHashSemiJoinExec.java
@@ -21,6 +21,7 @@ package org.apache.tajo.engine.planner.physical;
import org.apache.hadoop.fs.Path;
import org.apache.tajo.LocalTajoTestingUtility;
import org.apache.tajo.TajoTestingCluster;
+import org.apache.tajo.engine.planner.global.ExecutionPlan;
import org.apache.tajo.storage.fragment.FileFragment;
import org.apache.tajo.worker.TaskAttemptContext;
import org.apache.tajo.algebra.Expr;
@@ -162,8 +163,11 @@ public class TestHashSemiJoinExec {
System.out.println(plan);
LogicalNode rootNode = plan.getRootBlock().getRoot();
- PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf, sm);
- PhysicalExec exec = phyPlanner.createPlan(ctx, rootNode);
+ ExecutionPlan execPlan = new ExecutionPlan();
+ execPlan.addPlan(rootNode);
+ PhysicalPlannerImpl phyPlanner = new PhysicalPlannerImpl(conf, sm);
+ PhysicalExec exec = phyPlanner.createPlanWithoutMaterialize(ctx, execPlan);
+ exec = ((PhysicalRootExec)exec).getChild(0);
// replace an equal join with an hash anti join.
if (exec instanceof MergeJoinExec) {
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/7c97735e/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestLeftOuterHashJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestLeftOuterHashJoinExec.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestLeftOuterHashJoinExec.java
index 4ac423b..8082b07 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestLeftOuterHashJoinExec.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestLeftOuterHashJoinExec.java
@@ -19,9 +19,8 @@
package org.apache.tajo.engine.planner.physical;
import org.apache.hadoop.fs.Path;
+import org.apache.tajo.LocalTajoTestingUtility;
import org.apache.tajo.TajoTestingCluster;
-import org.apache.tajo.storage.fragment.FileFragment;
-import org.apache.tajo.worker.TaskAttemptContext;
import org.apache.tajo.algebra.Expr;
import org.apache.tajo.catalog.*;
import org.apache.tajo.catalog.proto.CatalogProtos.StoreType;
@@ -32,16 +31,18 @@ import org.apache.tajo.datum.DatumFactory;
import org.apache.tajo.engine.parser.SQLAnalyzer;
import org.apache.tajo.engine.planner.*;
import org.apache.tajo.engine.planner.enforce.Enforcer;
+import org.apache.tajo.engine.planner.global.ExecutionPlan;
import org.apache.tajo.engine.planner.logical.JoinNode;
import org.apache.tajo.engine.planner.logical.LogicalNode;
import org.apache.tajo.engine.planner.logical.NodeType;
import org.apache.tajo.storage.*;
+import org.apache.tajo.storage.fragment.FileFragment;
import org.apache.tajo.util.CommonTestingUtil;
import org.apache.tajo.util.TUtil;
+import org.apache.tajo.worker.TaskAttemptContext;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
-import org.apache.tajo.LocalTajoTestingUtility;
import java.io.IOException;
@@ -264,19 +265,21 @@ public class TestLeftOuterHashJoinExec {
LocalTajoTestingUtility.newQueryUnitAttemptId(), merged, workDir);
ctx.setEnforcer(enforcer);
- PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf, sm);
- PhysicalExec exec = phyPlanner.createPlan(ctx, plan);
+ ExecutionPlan execPlan = new ExecutionPlan();
+ execPlan.addPlan(plan);
+ PhysicalPlannerImpl phyPlanner = new PhysicalPlannerImpl(conf, sm);
+ PhysicalExec exec = phyPlanner.createPlanWithoutMaterialize(ctx, execPlan);
- ProjectionExec proj = (ProjectionExec) exec;
+ ProjectionExec proj = (ProjectionExec) ((PhysicalRootExec)exec).getChild(0);
assertTrue(proj.getChild() instanceof HashLeftOuterJoinExec);
int count = 0;
- exec.init();
- while (exec.next() != null) {
+ proj.init();
+ while (proj.next() != null) {
//TODO check contents
count = count + 1;
}
- exec.close();
+ proj.close();
assertEquals(12, count);
}
@@ -296,10 +299,12 @@ public class TestLeftOuterHashJoinExec {
Expr expr = analyzer.parse(QUERIES[1]);
LogicalNode plan = planner.createPlan(expr).getRootBlock().getRoot();
- PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf, sm);
- PhysicalExec exec = phyPlanner.createPlan(ctx, plan);
+ ExecutionPlan execPlan = new ExecutionPlan();
+ execPlan.addPlan(plan);
+ PhysicalPlannerImpl phyPlanner = new PhysicalPlannerImpl(conf, sm);
+ PhysicalExec exec = phyPlanner.createPlanWithoutMaterialize(ctx, execPlan);
- ProjectionExec proj = (ProjectionExec) exec;
+ ProjectionExec proj = (ProjectionExec) ((PhysicalRootExec)exec).getChild(0);
if (proj.getChild() instanceof NLLeftOuterJoinExec) {
//for this small data set this is not likely to happen
@@ -309,13 +314,13 @@ public class TestLeftOuterHashJoinExec {
Tuple tuple;
int count = 0;
int i = 1;
- exec.init();
+ proj.init();
- while ((tuple = exec.next()) != null) {
+ while ((tuple = proj.next()) != null) {
//TODO check contents
count = count + 1;
}
- exec.close();
+ proj.close();
assertEquals(5, count);
}
}
@@ -336,10 +341,12 @@ public class TestLeftOuterHashJoinExec {
Expr expr = analyzer.parse(QUERIES[2]);
LogicalNode plan = planner.createPlan(expr).getRootBlock().getRoot();
- PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf, sm);
- PhysicalExec exec = phyPlanner.createPlan(ctx, plan);
+ ExecutionPlan execPlan = new ExecutionPlan();
+ execPlan.addPlan(plan);
+ PhysicalPlannerImpl phyPlanner = new PhysicalPlannerImpl(conf, sm);
+ PhysicalExec exec = phyPlanner.createPlanWithoutMaterialize(ctx, execPlan);
- ProjectionExec proj = (ProjectionExec) exec;
+ ProjectionExec proj = (ProjectionExec) ((PhysicalRootExec)exec).getChild(0);
if (proj.getChild() instanceof NLLeftOuterJoinExec) {
//for this small data set this is not likely to happen
@@ -349,13 +356,13 @@ public class TestLeftOuterHashJoinExec {
Tuple tuple;
int count = 0;
int i = 1;
- exec.init();
+ proj.init();
- while ((tuple = exec.next()) != null) {
+ while ((tuple = proj.next()) != null) {
//TODO check contents
count = count + 1;
}
- exec.close();
+ proj.close();
assertEquals(7, count);
}
}
@@ -377,10 +384,12 @@ public class TestLeftOuterHashJoinExec {
Expr expr = analyzer.parse(QUERIES[3]);
LogicalNode plan = planner.createPlan(expr).getRootBlock().getRoot();
- PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf, sm);
- PhysicalExec exec = phyPlanner.createPlan(ctx, plan);
+ ExecutionPlan execPlan = new ExecutionPlan();
+ execPlan.addPlan(plan);
+ PhysicalPlannerImpl phyPlanner = new PhysicalPlannerImpl(conf, sm);
+ PhysicalExec exec = phyPlanner.createPlanWithoutMaterialize(ctx, execPlan);
- ProjectionExec proj = (ProjectionExec) exec;
+ ProjectionExec proj = (ProjectionExec) ((PhysicalRootExec)exec).getChild(0);
if (proj.getChild() instanceof NLLeftOuterJoinExec) {
//for this small data set this is not likely to happen
@@ -390,13 +399,13 @@ public class TestLeftOuterHashJoinExec {
Tuple tuple;
int count = 0;
int i = 1;
- exec.init();
+ proj.init();
- while ((tuple = exec.next()) != null) {
+ while ((tuple = proj.next()) != null) {
//TODO check contents
count = count + 1;
}
- exec.close();
+ proj.close();
assertEquals(7, count);
}
}
@@ -418,10 +427,12 @@ public class TestLeftOuterHashJoinExec {
Expr expr = analyzer.parse(QUERIES[4]);
LogicalNode plan = planner.createPlan(expr).getRootBlock().getRoot();
- PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf, sm);
- PhysicalExec exec = phyPlanner.createPlan(ctx, plan);
+ ExecutionPlan execPlan = new ExecutionPlan();
+ execPlan.addPlan(plan);
+ PhysicalPlannerImpl phyPlanner = new PhysicalPlannerImpl(conf, sm);
+ PhysicalExec exec = phyPlanner.createPlanWithoutMaterialize(ctx, execPlan);
- ProjectionExec proj = (ProjectionExec) exec;
+ ProjectionExec proj = (ProjectionExec) ((PhysicalRootExec)exec).getChild(0);
if (proj.getChild() instanceof NLLeftOuterJoinExec) {
//for this small data set this is not likely to happen
@@ -431,13 +442,13 @@ public class TestLeftOuterHashJoinExec {
Tuple tuple;
int count = 0;
int i = 1;
- exec.init();
+ proj.init();
- while ((tuple = exec.next()) != null) {
+ while ((tuple = proj.next()) != null) {
//TODO check contents
count = count + 1;
}
- exec.close();
+ proj.close();
assertEquals(0, count);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/7c97735e/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestLeftOuterNLJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestLeftOuterNLJoinExec.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestLeftOuterNLJoinExec.java
index ce31851..54a3c49 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestLeftOuterNLJoinExec.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestLeftOuterNLJoinExec.java
@@ -19,9 +19,8 @@
package org.apache.tajo.engine.planner.physical;
import org.apache.hadoop.fs.Path;
+import org.apache.tajo.LocalTajoTestingUtility;
import org.apache.tajo.TajoTestingCluster;
-import org.apache.tajo.storage.fragment.FileFragment;
-import org.apache.tajo.worker.TaskAttemptContext;
import org.apache.tajo.algebra.Expr;
import org.apache.tajo.catalog.*;
import org.apache.tajo.catalog.proto.CatalogProtos.StoreType;
@@ -31,23 +30,23 @@ import org.apache.tajo.datum.Datum;
import org.apache.tajo.datum.DatumFactory;
import org.apache.tajo.engine.parser.SQLAnalyzer;
import org.apache.tajo.engine.planner.LogicalPlanner;
-import org.apache.tajo.engine.planner.PhysicalPlanner;
import org.apache.tajo.engine.planner.PhysicalPlannerImpl;
import org.apache.tajo.engine.planner.PlanningException;
+import org.apache.tajo.engine.planner.global.ExecutionPlan;
import org.apache.tajo.engine.planner.logical.LogicalNode;
import org.apache.tajo.storage.*;
+import org.apache.tajo.storage.fragment.FileFragment;
import org.apache.tajo.util.CommonTestingUtil;
import org.apache.tajo.util.TUtil;
+import org.apache.tajo.worker.TaskAttemptContext;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
-import org.apache.tajo.LocalTajoTestingUtility;
import java.io.IOException;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
public class TestLeftOuterNLJoinExec {
private TajoConf conf;
@@ -254,18 +253,19 @@ public class TestLeftOuterNLJoinExec {
Expr context = analyzer.parse(QUERIES[0]);
LogicalNode plan = planner.createPlan(context).getRootBlock().getRoot();
-
- PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf, sm);
- PhysicalExec exec = phyPlanner.createPlan(ctx, plan);
+ ExecutionPlan execPlan = new ExecutionPlan();
+ execPlan.addPlan(plan);
+ PhysicalPlannerImpl phyPlanner = new PhysicalPlannerImpl(conf, sm);
+ PhysicalExec exec = phyPlanner.createPlanWithoutMaterialize(ctx, execPlan);
//maybe plan results with hash join exec algorithm usage. Must convert from HashLeftOuterJoinExec into NLLeftOuterJoinExec
- ProjectionExec proj = (ProjectionExec) exec;
+ ProjectionExec proj = (ProjectionExec) ((PhysicalRootExec)exec).getChild(0);
if (proj.getChild() instanceof HashLeftOuterJoinExec) {
HashLeftOuterJoinExec join = proj.getChild();
NLLeftOuterJoinExec aJoin = new NLLeftOuterJoinExec(ctx, join.getPlan(), join.getLeftChild(), join.getRightChild());
proj.setChild(aJoin);
- exec = proj;
}
+ exec = proj;
int count = 0;
exec.init();
@@ -295,20 +295,19 @@ public class TestLeftOuterNLJoinExec {
Expr context = analyzer.parse(QUERIES[1]);
LogicalNode plan = planner.createPlan(context).getRootBlock().getRoot();
-
- PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf, sm);
- PhysicalExec exec = phyPlanner.createPlan(ctx, plan);
+ ExecutionPlan execPlan = new ExecutionPlan();
+ execPlan.addPlan(plan);
+ PhysicalPlannerImpl phyPlanner = new PhysicalPlannerImpl(conf, sm);
+ PhysicalExec exec = phyPlanner.createPlanWithoutMaterialize(ctx, execPlan);
//maybe plan results with hash join exec algorithm usage. Must convert from HashLeftOuterJoinExec into NLLeftOuterJoinExec
- ProjectionExec proj = (ProjectionExec) exec;
+ ProjectionExec proj = (ProjectionExec) ((PhysicalRootExec)exec).getChild(0);
if (proj.getChild() instanceof HashLeftOuterJoinExec) {
HashLeftOuterJoinExec join = (HashLeftOuterJoinExec) proj.getChild();
NLLeftOuterJoinExec aJoin = new NLLeftOuterJoinExec(ctx, join.getPlan(), join.getLeftChild(), join.getRightChild());
proj.setChild(aJoin);
- exec = proj;
-
}
-
+ exec = proj;
Tuple tuple;
int i = 1;
@@ -338,19 +337,20 @@ public class TestLeftOuterNLJoinExec {
Expr context = analyzer.parse(QUERIES[2]);
LogicalNode plan = planner.createPlan(context).getRootBlock().getRoot();
-
- PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf, sm);
- PhysicalExec exec = phyPlanner.createPlan(ctx, plan);
+ ExecutionPlan execPlan = new ExecutionPlan();
+ execPlan.addPlan(plan);
+ PhysicalPlannerImpl phyPlanner = new PhysicalPlannerImpl(conf, sm);
+ PhysicalExec exec = phyPlanner.createPlanWithoutMaterialize(ctx, execPlan);
//maybe plan results with hash join exec algorithm usage. Must convert from HashLeftOuterJoinExec into NLLeftOuterJoinExec
- ProjectionExec proj = (ProjectionExec) exec;
+ ProjectionExec proj = (ProjectionExec) ((PhysicalRootExec)exec).getChild(0);
if (proj.getChild() instanceof HashLeftOuterJoinExec) {
HashLeftOuterJoinExec join = (HashLeftOuterJoinExec) proj.getChild();
NLLeftOuterJoinExec aJoin = new NLLeftOuterJoinExec(ctx, join.getPlan(), join.getLeftChild(), join.getRightChild());
proj.setChild(aJoin);
- exec = proj;
-
+
}
+ exec = proj;
Tuple tuple;
@@ -382,19 +382,20 @@ public class TestLeftOuterNLJoinExec {
Expr context = analyzer.parse(QUERIES[3]);
LogicalNode plan = planner.createPlan(context).getRootBlock().getRoot();
-
- PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf, sm);
- PhysicalExec exec = phyPlanner.createPlan(ctx, plan);
+ ExecutionPlan execPlan = new ExecutionPlan();
+ execPlan.addPlan(plan);
+ PhysicalPlannerImpl phyPlanner = new PhysicalPlannerImpl(conf, sm);
+ PhysicalExec exec = phyPlanner.createPlanWithoutMaterialize(ctx, execPlan);
//maybe plan results with hash join exec algorithm usage. Must convert from HashLeftOuterJoinExec into NLLeftOuterJoinExec
- ProjectionExec proj = (ProjectionExec) exec;
+ ProjectionExec proj = (ProjectionExec) ((PhysicalRootExec)exec).getChild(0);
if (proj.getChild() instanceof HashLeftOuterJoinExec) {
HashLeftOuterJoinExec join = (HashLeftOuterJoinExec) proj.getChild();
NLLeftOuterJoinExec aJoin = new NLLeftOuterJoinExec(ctx, join.getPlan(), join.getLeftChild(), join.getRightChild());
proj.setChild(aJoin);
- exec = proj;
-
+
}
+ exec = proj;
Tuple tuple;
@@ -425,19 +426,20 @@ public class TestLeftOuterNLJoinExec {
Expr context = analyzer.parse(QUERIES[4]);
LogicalNode plan = planner.createPlan(context).getRootBlock().getRoot();
-
- PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf, sm);
- PhysicalExec exec = phyPlanner.createPlan(ctx, plan);
+ ExecutionPlan execPlan = new ExecutionPlan();
+ execPlan.addPlan(plan);
+ PhysicalPlannerImpl phyPlanner = new PhysicalPlannerImpl(conf, sm);
+ PhysicalExec exec = phyPlanner.createPlanWithoutMaterialize(ctx, execPlan);
//maybe plan results with hash join exec algorithm usage. Must convert from HashLeftOuterJoinExec into NLLeftOuterJoinExec
- ProjectionExec proj = (ProjectionExec) exec;
+ ProjectionExec proj = (ProjectionExec) ((PhysicalRootExec)exec).getChild(0);
if (proj.getChild() instanceof HashLeftOuterJoinExec) {
HashLeftOuterJoinExec join = (HashLeftOuterJoinExec) proj.getChild();
NLLeftOuterJoinExec aJoin = new NLLeftOuterJoinExec(ctx, join.getPlan(), join.getLeftChild(), join.getRightChild());
proj.setChild(aJoin);
- exec = proj;
-
+
}
+ exec = proj;
Tuple tuple;
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/7c97735e/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestMergeJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestMergeJoinExec.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestMergeJoinExec.java
index e6dd0a5..57afef5 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestMergeJoinExec.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestMergeJoinExec.java
@@ -22,8 +22,6 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.tajo.LocalTajoTestingUtility;
import org.apache.tajo.TajoTestingCluster;
-import org.apache.tajo.storage.fragment.FileFragment;
-import org.apache.tajo.worker.TaskAttemptContext;
import org.apache.tajo.algebra.Expr;
import org.apache.tajo.catalog.*;
import org.apache.tajo.catalog.proto.CatalogProtos.StoreType;
@@ -34,12 +32,15 @@ import org.apache.tajo.datum.DatumFactory;
import org.apache.tajo.engine.parser.SQLAnalyzer;
import org.apache.tajo.engine.planner.*;
import org.apache.tajo.engine.planner.enforce.Enforcer;
+import org.apache.tajo.engine.planner.global.ExecutionPlan;
import org.apache.tajo.engine.planner.logical.JoinNode;
import org.apache.tajo.engine.planner.logical.LogicalNode;
import org.apache.tajo.engine.planner.logical.NodeType;
import org.apache.tajo.storage.*;
+import org.apache.tajo.storage.fragment.FileFragment;
import org.apache.tajo.util.CommonTestingUtil;
import org.apache.tajo.util.TUtil;
+import org.apache.tajo.worker.TaskAttemptContext;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
@@ -165,10 +166,13 @@ public class TestMergeJoinExec {
LocalTajoTestingUtility.newQueryUnitAttemptId(), merged, workDir);
ctx.setEnforcer(enforcer);
- PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf,sm);
- PhysicalExec exec = phyPlanner.createPlan(ctx, root);
- ProjectionExec proj = (ProjectionExec) exec;
+ ExecutionPlan execPlan = new ExecutionPlan();
+ execPlan.addPlan(root);
+ PhysicalPlannerImpl phyPlanner = new PhysicalPlannerImpl(conf,sm);
+ PhysicalExec exec = phyPlanner.createPlanWithoutMaterialize(ctx, execPlan);
+ ProjectionExec proj = (ProjectionExec) ((PhysicalRootExec)exec).getChild(0);
assertTrue(proj.getChild() instanceof MergeJoinExec);
+ exec = proj;
Tuple tuple;
int count = 0;
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/7c97735e/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestNLJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestNLJoinExec.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestNLJoinExec.java
index 50d431c..a5270a3 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestNLJoinExec.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestNLJoinExec.java
@@ -21,8 +21,6 @@ package org.apache.tajo.engine.planner.physical;
import org.apache.hadoop.fs.Path;
import org.apache.tajo.LocalTajoTestingUtility;
import org.apache.tajo.TajoTestingCluster;
-import org.apache.tajo.storage.fragment.FileFragment;
-import org.apache.tajo.worker.TaskAttemptContext;
import org.apache.tajo.algebra.Expr;
import org.apache.tajo.catalog.*;
import org.apache.tajo.catalog.proto.CatalogProtos.StoreType;
@@ -32,15 +30,17 @@ import org.apache.tajo.datum.Datum;
import org.apache.tajo.datum.DatumFactory;
import org.apache.tajo.engine.parser.SQLAnalyzer;
import org.apache.tajo.engine.planner.LogicalPlanner;
-import org.apache.tajo.engine.planner.PhysicalPlanner;
import org.apache.tajo.engine.planner.PhysicalPlannerImpl;
import org.apache.tajo.engine.planner.PlanningException;
import org.apache.tajo.engine.planner.enforce.Enforcer;
+import org.apache.tajo.engine.planner.global.ExecutionPlan;
import org.apache.tajo.engine.planner.global.MasterPlan;
import org.apache.tajo.engine.planner.logical.LogicalNode;
import org.apache.tajo.storage.*;
+import org.apache.tajo.storage.fragment.FileFragment;
import org.apache.tajo.util.CommonTestingUtil;
import org.apache.tajo.util.TUtil;
+import org.apache.tajo.worker.TaskAttemptContext;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
@@ -153,8 +153,11 @@ public class TestNLJoinExec {
Expr context = analyzer.parse(QUERIES[0]);
LogicalNode plan = planner.createPlan(context).getRootBlock().getRoot();
- PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf, sm);
- PhysicalExec exec = phyPlanner.createPlan(ctx, plan);
+ ExecutionPlan execPlan = new ExecutionPlan();
+ execPlan.addPlan(plan);
+ PhysicalPlannerImpl phyPlanner = new PhysicalPlannerImpl(conf, sm);
+ PhysicalExec exec = phyPlanner.createPlanWithoutMaterialize(ctx, execPlan);
+ exec = ((PhysicalRootExec)exec).getChild(0);
int i = 0;
exec.init();
@@ -182,8 +185,11 @@ public class TestNLJoinExec {
LogicalNode plan = planner.createPlan(context).getRootBlock().getRoot();
//LogicalOptimizer.optimize(ctx, plan);
- PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf, sm);
- PhysicalExec exec = phyPlanner.createPlan(ctx, plan);
+ ExecutionPlan execPlan = new ExecutionPlan();
+ execPlan.addPlan(plan);
+ PhysicalPlannerImpl phyPlanner = new PhysicalPlannerImpl(conf, sm);
+ PhysicalExec exec = phyPlanner.createPlanWithoutMaterialize(ctx, execPlan);
+ exec = ((PhysicalRootExec)exec).getChild(0);
Tuple tuple;
int i = 1;
[3/4] TAJO-266: Extend ExecutionBlock and Task to support multiple
outputs. (jihoon)
Posted by ji...@apache.org.
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/7c97735e/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/ExecutionPlan.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/ExecutionPlan.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/ExecutionPlan.java
new file mode 100644
index 0000000..1c02d70
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/ExecutionPlan.java
@@ -0,0 +1,381 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.planner.global;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.gson.annotations.Expose;
+import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.engine.json.CoreGsonHelper;
+import org.apache.tajo.engine.planner.PlannerUtil;
+import org.apache.tajo.engine.planner.PlanningException;
+import org.apache.tajo.engine.planner.global.ExecutionPlanEdge.Tag;
+import org.apache.tajo.engine.planner.graph.SimpleDirectedGraph;
+import org.apache.tajo.engine.planner.logical.*;
+import org.apache.tajo.json.GsonObject;
+
+import java.util.*;
+import java.util.Map.Entry;
+
+/**
+ * ExecutionPlan is a DAG of logical nodes.
+ * If there are two input sources, the plan should include Join or Union.
+ * The terminalNode is used as the start position of the traversal, because there are multiple output destinations.
+ */
+public class ExecutionPlan implements GsonObject {
+ @Expose private InputContext inputContext;
+ @Expose private boolean hasUnionPlan;
+ @Expose private boolean hasJoinPlan;
+ @Expose private LogicalRootNode terminalNode;
+ @Expose private Map<Integer, LogicalNode> vertices = new HashMap<Integer, LogicalNode>();
+ @Expose private SimpleDirectedGraph<Integer, ExecutionPlanEdge> graph
+ = new SimpleDirectedGraph<Integer, ExecutionPlanEdge>();
+
+ @VisibleForTesting
+ public ExecutionPlan() {
+
+ }
+
+ public ExecutionPlan(LogicalRootNode terminalNode) {
+ this.terminalNode = PlannerUtil.clone(terminalNode);
+ }
+
+ public void setPlan(LogicalNode plan) {
+ this.clear();
+ this.addPlan(plan);
+ }
+
+ private void clear() {
+ for (ExecutionPlanEdge edge : graph.getEdgesAll()) {
+ graph.removeEdge(edge.getChildId(), edge.getParentId());
+ }
+ vertices.clear();
+ this.inputContext = null;
+ this.hasUnionPlan = false;
+ this.hasJoinPlan = false;
+ }
+
+ public void addPlan(LogicalNode plan) {
+ LogicalNode current = PlannerUtil.clone(plan);
+ if (current.getType() == NodeType.ROOT) {
+ terminalNode = (LogicalRootNode) current;
+ } else {
+ this.add(current, terminalNode, Tag.SINGLE);
+ terminalNode.setChild(current);
+ }
+ ExecutionPlanBuilder builder = new ExecutionPlanBuilder(this);
+ builder.visit(terminalNode);
+ }
+
+ public void add(LogicalNode child, LogicalNode parent, Tag tag) {
+ vertices.put(child.getPID(), child);
+ vertices.put(parent.getPID(), parent);
+ graph.addEdge(child.getPID(), parent.getPID(), new ExecutionPlanEdge(child, parent, tag));
+ }
+
+ public void setInputContext(InputContext contexts) {
+ this.inputContext = contexts;
+ }
+
+ public boolean hasJoinPlan() {
+ return this.hasJoinPlan;
+ }
+
+ public boolean hasUnionPlan() {
+ return this.hasUnionPlan;
+ }
+
+ public LogicalRootNode getTerminalNode() {
+ return terminalNode;
+ }
+
+ public InputContext getInputContext() {
+ return inputContext;
+ }
+
+ public String toString() {
+ return graph.toStringGraph(terminalNode.getPID());
+ }
+
+ public Tag getTag(LogicalNode child, LogicalNode parent) {
+ return graph.getEdge(child.getPID(), parent.getPID()).getTag();
+ }
+
+ public LogicalNode getChild(LogicalNode parent, Tag tag) {
+ List<ExecutionPlanEdge> incomingEdges = graph.getIncomingEdges(parent.getPID());
+ for (ExecutionPlanEdge inEdge : incomingEdges) {
+ if (inEdge.getTag() == tag) {
+ return vertices.get(inEdge.getChildId());
+ }
+ }
+ return null;
+ }
+
+ @Override
+ public String toJson() {
+ return CoreGsonHelper.toJson(this, ExecutionPlan.class);
+ }
+
+ public Schema getOutSchema(int i) {
+ return vertices.get(graph.getChild(terminalNode.getPID(), i)).getOutSchema();
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (o instanceof ExecutionPlan) {
+ ExecutionPlan other = (ExecutionPlan) o;
+ boolean eq = this.hasJoinPlan == other.hasJoinPlan;
+ eq &= this.hasUnionPlan == other.hasUnionPlan;
+ eq &= this.terminalNode.equals(other.terminalNode);
+ eq &= this.inputContext.equals(other.inputContext);
+ if (!eq) {
+ return false;
+ }
+
+ ExecutionPlanComparator comparator = new ExecutionPlanComparator(this, other);
+ eq &= comparator.compare();
+ return eq;
+ }
+ return false;
+ }
+
+ public LogicalNode getRootChild(int pid) {
+ for (Integer childId : graph.getChilds(terminalNode.getPID())) {
+ if (childId == pid) {
+ return vertices.get(childId);
+ }
+ }
+ return null;
+ }
+
+ public int getChildCount(LogicalNode node) {
+ return graph.getChildCount(node.getPID());
+ }
+
+ public LogicalNode getChild(LogicalNode node, int i) {
+ return vertices.get(graph.getChild(node.getPID(), i));
+ }
+
+ public int getParentCount(LogicalNode node) {
+ return graph.getParentCount(node.getPID());
+ }
+
+ public LogicalNode getParent(LogicalNode node, int i) {
+ return vertices.get(graph.getParent(node.getPID(), i));
+ }
+
+ public List<LogicalNode> getChilds(LogicalNode node) {
+ List<LogicalNode> childs = new ArrayList<LogicalNode>();
+ for (Integer childId : graph.getChilds(node.getPID())) {
+ childs.add(vertices.get(childId));
+ }
+ return childs;
+ }
+
+ public void remove(LogicalNode child, LogicalNode parent) {
+ this.graph.removeEdge(child.getPID(), parent.getPID());
+ }
+
+ private static class LogicalNodeIdAndTag {
+ @Expose int id;
+ @Expose Tag tag;
+
+ public LogicalNodeIdAndTag(int id, Tag tag) {
+ this.id = id;
+ this.tag = tag;
+ }
+ }
+
+ public static class ExecutionPlanJsonHelper implements GsonObject {
+ @Expose private final boolean hasJoinPlan;
+ @Expose private final boolean hasUnionPlan;
+ @Expose private final InputContext inputContext;
+ @Expose private final LogicalRootNode terminalNode;
+ @Expose Map<Integer, LogicalNode> vertices = new HashMap<Integer, LogicalNode>();
+ @Expose Map<Integer, List<LogicalNodeIdAndTag>> adjacentList = new HashMap<Integer, List<LogicalNodeIdAndTag>>();
+
+ public ExecutionPlanJsonHelper(ExecutionPlan plan) {
+ this.hasJoinPlan = plan.hasJoinPlan;
+ this.hasUnionPlan = plan.hasUnionPlan;
+ this.inputContext = plan.getInputContext();
+ this.terminalNode = plan.terminalNode;
+ this.vertices.putAll(plan.vertices);
+ Collection<ExecutionPlanEdge> edges = plan.graph.getEdgesAll();
+ int parentId, childId;
+ List<LogicalNodeIdAndTag> adjacents;
+
+ // convert the graph to an adjacent list
+ for (ExecutionPlanEdge edge : edges) {
+ childId = edge.getChildId();
+ parentId = edge.getParentId();
+
+ if (adjacentList.containsKey(childId)) {
+ adjacents = adjacentList.get(childId);
+ } else {
+ adjacents = new ArrayList<LogicalNodeIdAndTag>();
+ adjacentList.put(childId, adjacents);
+ }
+ adjacents.add(new LogicalNodeIdAndTag(parentId, edge.getTag()));
+ }
+ }
+
+ @Override
+ public String toJson() {
+ return CoreGsonHelper.toJson(this, ExecutionPlanJsonHelper.class);
+ }
+
+ public ExecutionPlan toExecutionPlan() {
+ ExecutionPlan plan = new ExecutionPlan(this.terminalNode);
+ plan.hasJoinPlan = this.hasJoinPlan;
+ plan.hasUnionPlan = this.hasUnionPlan;
+ plan.setInputContext(this.inputContext);
+ plan.vertices.putAll(this.vertices);
+
+ for (Entry<Integer, List<LogicalNodeIdAndTag>> e : this.adjacentList.entrySet()) {
+ LogicalNode child = this.vertices.get(e.getKey());
+ for (LogicalNodeIdAndTag idAndTag : e.getValue()) {
+ plan.add(child, this.vertices.get(idAndTag.id), idAndTag.tag);
+ }
+ }
+
+ return plan;
+ }
+ }
+
+ private static class ExecutionPlanComparator {
+ ExecutionPlan plan1;
+ ExecutionPlan plan2;
+ boolean equal = true;
+
+ public ExecutionPlanComparator(ExecutionPlan plan1, ExecutionPlan plan2) {
+ this.plan1 = plan1;
+ this.plan2 = plan2;
+ }
+
+ public boolean compare() {
+ Stack<Integer> s1 = new Stack<Integer>();
+ Stack<Integer> s2 = new Stack<Integer>();
+ s1.push(plan1.terminalNode.getPID());
+ s2.push(plan2.terminalNode.getPID());
+ return recursiveCompare(s1, s2);
+ }
+
+ private boolean recursiveCompare(Stack<Integer> s1, Stack<Integer> s2) {
+ Integer l1 = s1.pop();
+ Integer l2 = s2.pop();
+
+ if (l1.equals(l2)) {
+ if (plan1.graph.getChildCount(l1) == plan2.graph.getChildCount(l2)) {
+ if (plan1.graph.getChildCount(l1) > 0
+ && plan2.graph.getChildCount(l2) > 0) {
+ for (Integer child : plan1.graph.getChilds(l1)) {
+ s1.push(child);
+ }
+ for (Integer child : plan2.graph.getChilds(l2)) {
+ s2.push(child);
+ }
+ } else {
+ equal &= true;
+ return recursiveCompare(s1, s2);
+ }
+ } else {
+ equal = false;
+ }
+ } else {
+ equal = false;
+ }
+ return equal;
+ }
+ }
+
+ private static class ExecutionPlanBuilder implements LogicalNodeVisitor {
+ private ExecutionPlan plan;
+
+ public ExecutionPlanBuilder(ExecutionPlan plan) {
+ this.plan = plan;
+ }
+
+ @Override
+ public void visit(LogicalNode current) {
+ try {
+ Preconditions.checkArgument(current instanceof UnaryNode, "The current node should be an unary node");
+ visit(current, Tag.SINGLE);
+ } catch (PlanningException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ private void visit(LogicalNode current, Tag tag) throws PlanningException {
+ if (current instanceof UnaryNode) {
+ visitUnary((UnaryNode) current, tag);
+ } else if (current instanceof BinaryNode) {
+ visitBinary((BinaryNode) current, tag);
+ } else if (current instanceof ScanNode) {
+ visitScan((ScanNode) current, tag);
+ } else if (current instanceof TableSubQueryNode) {
+ visitTableSubQuery((TableSubQueryNode) current, tag);
+ }
+ }
+
+ private void visitScan(ScanNode node, Tag tag) throws PlanningException {
+ if (plan.inputContext == null) {
+ plan.inputContext = new InputContext();
+ }
+ plan.inputContext.addScanNode(node);
+ }
+
+ private void visitUnary(UnaryNode node, Tag tag) throws PlanningException {
+ if (node.getChild() != null) {
+ LogicalNode child = PlannerUtil.clone(node.getChild());
+ plan.add(child, node, tag);
+ node.setChild(null);
+ visit(child, tag);
+ }
+ }
+
+ private void visitBinary(BinaryNode node, Tag tag) throws PlanningException {
+ Preconditions.checkArgument(tag == Tag.SINGLE);
+
+ LogicalNode child;
+ if (node.getType() == NodeType.JOIN) {
+ plan.hasJoinPlan = true;
+ } else if (node.getType() == NodeType.UNION) {
+ plan.hasUnionPlan = true;
+ }
+ if (node.getLeftChild() != null) {
+ child = PlannerUtil.clone(node.getLeftChild());
+ plan.add(child, node, Tag.LEFT);
+ node.setLeftChild(null);
+ visit(child, Tag.LEFT);
+ }
+ if (node.getRightChild() != null) {
+ child = PlannerUtil.clone(node.getRightChild());
+ plan.add(child, node, Tag.RIGHT);
+ node.setRightChild(null);
+ visit(child, Tag.RIGHT);
+ }
+ }
+
+ private void visitTableSubQuery(TableSubQueryNode node, Tag tag) throws PlanningException {
+ LogicalNode child = PlannerUtil.clone(node.getSubQuery());
+ plan.add(child, node, tag);
+ visit(child, tag);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/7c97735e/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/ExecutionPlanEdge.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/ExecutionPlanEdge.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/ExecutionPlanEdge.java
new file mode 100644
index 0000000..c8b1415
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/ExecutionPlanEdge.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.planner.global;
+
+import org.apache.tajo.engine.planner.logical.LogicalNode;
+
+public class ExecutionPlanEdge {
+ public static enum Tag {
+ LEFT,
+ RIGHT,
+ SINGLE
+ }
+
+ private final Integer parentId;
+ private final Integer childId;
+ private final Tag tag;
+
+ public ExecutionPlanEdge(LogicalNode child, LogicalNode parent, Tag tag) {
+ this.parentId = parent.getPID();
+ this.childId = child.getPID();
+ this.tag = tag;
+ }
+
+ public Integer getParentId() {
+ return parentId;
+ }
+
+ public Integer getChildId() {
+ return childId;
+ }
+
+ public Tag getTag() {
+ return tag;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/7c97735e/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/GlobalPlanner.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/GlobalPlanner.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/GlobalPlanner.java
index f7540e7..9190da7 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/GlobalPlanner.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/GlobalPlanner.java
@@ -25,10 +25,12 @@ import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.Path;
import org.apache.tajo.algebra.JoinType;
import org.apache.tajo.catalog.*;
+import org.apache.tajo.catalog.proto.CatalogProtos.StoreType;
import org.apache.tajo.conf.TajoConf;
import org.apache.tajo.engine.eval.AggregationFunctionCallEval;
import org.apache.tajo.engine.eval.EvalTreeUtil;
import org.apache.tajo.engine.planner.*;
+import org.apache.tajo.engine.planner.global.ExecutionPlanEdge.Tag;
import org.apache.tajo.engine.planner.logical.*;
import org.apache.tajo.storage.AbstractStorageManager;
@@ -86,19 +88,25 @@ public class GlobalPlanner {
if (leftSubQuery.getSubQuery().getType() != NodeType.UNION) {
ExecutionBlock execBlock = masterPlan.newExecutionBlock();
execBlock.setPlan(leftSubQuery);
- DataChannel dataChannel = new DataChannel(execBlock, terminalBlock, NONE_PARTITION, 1);
+ int childPid = execBlock.getPlan().getChild(execBlock.getPlan().getTerminalNode(), 0).getPID();
+ DataChannel dataChannel = new DataChannel(execBlock, terminalBlock, childPid, null, NONE_PARTITION, 1,
+ execBlock.getPlan().getOutSchema(0));
masterPlan.addConnect(dataChannel);
}
if (rightSubQuery.getSubQuery().getType() != NodeType.UNION) {
ExecutionBlock execBlock = masterPlan.newExecutionBlock();
execBlock.setPlan(rightSubQuery);
- DataChannel dataChannel = new DataChannel(execBlock, terminalBlock, NONE_PARTITION, 1);
+ int childPid = execBlock.getPlan().getChild(execBlock.getPlan().getTerminalNode(), 0).getPID();
+ DataChannel dataChannel = new DataChannel(execBlock, terminalBlock, childPid, null, NONE_PARTITION, 1,
+ execBlock.getPlan().getOutSchema(0));
masterPlan.addConnect(dataChannel);
}
}
} else {
- DataChannel dataChannel = new DataChannel(globalPlanContext.topMostLeftExecBlock, terminalBlock, NONE_PARTITION, 1);
- dataChannel.setSchema(globalPlanContext.topmost.getOutSchema());
+ int childPid = globalPlanContext.topMostLeftExecBlock.getPlan().getChild(
+ globalPlanContext.topMostLeftExecBlock.getPlan().getTerminalNode(), 0).getPID();
+ DataChannel dataChannel = new DataChannel(globalPlanContext.topMostLeftExecBlock, terminalBlock, childPid, null,
+ NONE_PARTITION, 1, globalPlanContext.topMostLeftExecBlock.getPlan().getOutSchema(0));
masterPlan.addConnect(dataChannel);
}
masterPlan.setTerminal(terminalBlock);
@@ -110,8 +118,7 @@ public class GlobalPlanner {
throws PlanningException {
ExecutionBlock currentBlock = null;
- ExecutionBlock childBlock;
- childBlock = lastChildBlock;
+ ExecutionBlock childBlock = lastChildBlock;
NodeType shuffleRequiredNodeType = lastDistNode.getType();
if (shuffleRequiredNodeType == NodeType.GROUP_BY) {
@@ -130,20 +137,42 @@ public class GlobalPlanner {
public static ScanNode buildInputExecutor(LogicalPlan plan, DataChannel channel) {
Preconditions.checkArgument(channel.getSchema() != null,
- "Channel schema (" + channel.getSrcId().getId() +" -> "+ channel.getTargetId().getId()+") is not initialized");
+ "Channel schema (" + channel.getSrcId().getId() + " -> " + channel.getTargetId().getId() + ") is not initialized");
TableMeta meta = new TableMeta(channel.getStoreType(), new Options());
TableDesc desc = new TableDesc(channel.getSrcId().toString(), channel.getSchema(), meta, new Path("/"));
return new ScanNode(plan.newPID(), desc);
}
+ public static ScanNode buildInputExecutor(LogicalPlan plan, String tableName, Schema schema, StoreType storeType) {
+ TableMeta meta = new TableMeta(storeType, new Options());
+ TableDesc desc = new TableDesc(tableName, schema, meta, new Path("/"));
+ return new ScanNode(plan.newPID(), desc);
+ }
+
+ private static String getIntermediateTableName(ExecutionBlock block) {
+ return block.getId().toString();
+ }
+
private DataChannel createDataChannelFromJoin(ExecutionBlock leftBlock, ExecutionBlock rightBlock,
ExecutionBlock parent, JoinNode join, boolean leftTable) {
- ExecutionBlock childBlock = leftTable ? leftBlock : rightBlock;
+ ExecutionBlock childBlock;
+ int childPid, parentPid;
+ ExecutionPlan execPlan = parent.getPlan();
+ LogicalNode scanParent = execPlan.getParent(execPlan.getInputContext().getScanNodes()[0], 0);
+ if (leftTable) {
+ childBlock = leftBlock;
+ parentPid = execPlan.getChild(scanParent, Tag.LEFT).getPID();
+ } else {
+ childBlock = rightBlock;
+ parentPid = execPlan.getChild(scanParent, Tag.RIGHT).getPID();
+ }
+ childPid = childBlock.getPlan().getChild(childBlock.getPlan().getTerminalNode(), 0).getPID();
- DataChannel channel = new DataChannel(childBlock, parent, HASH_PARTITION, 32);
+ DataChannel channel = new DataChannel(childBlock, parent, childPid, parentPid, HASH_PARTITION, 32,
+ childBlock.getPlan().getOutSchema(0));
if (join.getJoinType() != JoinType.CROSS) {
Column [][] joinColumns = PlannerUtil.joinJoinKeyForEachTable(join.getJoinQual(),
- leftBlock.getPlan().getOutSchema(), rightBlock.getPlan().getOutSchema());
+ join.getLeftChild().getOutSchema(), join.getRightChild().getOutSchema());
if (leftTable) {
channel.setPartitionKey(joinColumns[0]);
} else {
@@ -194,7 +223,6 @@ public class GlobalPlanner {
}
// symmetric repartition join
-
ExecutionBlock leftBlock;
if (lastChildBlock == null) {
leftBlock = masterPlan.newExecutionBlock();
@@ -206,22 +234,21 @@ public class GlobalPlanner {
rightBlock.setPlan(rightNode);
currentBlock = masterPlan.newExecutionBlock();
-
- DataChannel leftChannel = createDataChannelFromJoin(leftBlock, rightBlock, currentBlock, joinNode, true);
- DataChannel rightChannel = createDataChannelFromJoin(leftBlock, rightBlock, currentBlock, joinNode, false);
-
- ScanNode leftScan = buildInputExecutor(masterPlan.getLogicalPlan(), leftChannel);
- ScanNode rightScan = buildInputExecutor(masterPlan.getLogicalPlan(), rightChannel);
-
+ ScanNode leftScan = buildInputExecutor(masterPlan.getLogicalPlan(),
+ getIntermediateTableName(leftBlock), leftBlock.getPlan().getOutSchema(0), StoreType.CSV);
+ ScanNode rightScan = buildInputExecutor(masterPlan.getLogicalPlan(),
+ getIntermediateTableName(rightBlock), rightBlock.getPlan().getOutSchema(0), StoreType.CSV);
joinNode.setLeftChild(leftScan);
joinNode.setRightChild(rightScan);
currentBlock.setPlan(joinNode);
+ DataChannel leftChannel = createDataChannelFromJoin(leftBlock, rightBlock, currentBlock, joinNode, true);
+ DataChannel rightChannel = createDataChannelFromJoin(leftBlock, rightBlock, currentBlock, joinNode, false);
+
masterPlan.addConnect(leftChannel);
masterPlan.addConnect(rightChannel);
return new ExecutionBlock[] { currentBlock, childBlock };
-
}
private ExecutionBlock [] buildGroupBy(MasterPlan masterPlan, LogicalNode lastDistNode, LogicalNode currentNode,
@@ -252,13 +279,9 @@ public class GlobalPlanner {
SortSpec [] sortSpecs = PlannerUtil.columnsToSortSpec(columnsForDistinct);
currentBlock.getEnforcer().enforceSortAggregation(groupByNode.getPID(), sortSpecs);
- DataChannel channel;
- channel = new DataChannel(childBlock, currentBlock, HASH_PARTITION, 32);
- channel.setPartitionKey(groupByNode.getGroupingColumns());
- channel.setSchema(groupByNode.getInSchema());
-
GroupbyNode secondGroupBy = groupByNode;
- ScanNode scanNode = buildInputExecutor(masterPlan.getLogicalPlan(), channel);
+ ScanNode scanNode = buildInputExecutor(masterPlan.getLogicalPlan(), getIntermediateTableName(childBlock),
+ childBlock.getPlan().getOutSchema(0), StoreType.CSV);
secondGroupBy.setChild(scanNode);
LogicalNode parent = PlannerUtil.findTopParentNode(currentNode, lastDistNode.getType());
@@ -266,24 +289,37 @@ public class GlobalPlanner {
((UnaryNode)parent).setChild(secondGroupBy);
}
- masterPlan.addConnect(channel);
currentBlock.setPlan(currentNode);
-
+
+ int childPid = childBlock.getPlan().getChild(childBlock.getPlan().getTerminalNode(), 0).getPID();
+ int parentPid = currentBlock.getInputContext().getScanNodes()[0].getPID();
+ DataChannel channel = new DataChannel(childBlock, currentBlock, childPid, parentPid, HASH_PARTITION, 32,
+ childBlock.getPlan().getOutSchema(0));
+ channel.setPartitionKey(groupByNode.getGroupingColumns());
+ masterPlan.addConnect(channel);
+
} else {
- GroupbyNode firstGroupBy = PlannerUtil.transformGroupbyTo2P(groupByNode);
+ GroupbyNode[] twoPhaseGroupby = PlannerUtil.transformGroupbyTo2Pv2(masterPlan.getLogicalPlan(), groupByNode);
+ GroupbyNode secondGroupBy = twoPhaseGroupby[0];
+ GroupbyNode firstGroupBy = twoPhaseGroupby[1];
firstGroupBy.setHavingCondition(null);
+ int childPid, parentPid;
if (firstGroupBy.getChild().getType() == NodeType.TABLE_SUBQUERY &&
((TableSubQueryNode)firstGroupBy.getChild()).getSubQuery().getType() == NodeType.UNION) {
- UnionNode unionNode = PlannerUtil.findTopNode(groupByNode, NodeType.UNION);
+ UnionNode unionNode = PlannerUtil.findTopNode(firstGroupBy, NodeType.UNION);
ConsecutiveUnionFinder finder = new ConsecutiveUnionFinder();
UnionsFinderContext finderContext = new UnionsFinderContext();
finder.visitChild(finderContext, masterPlan.getLogicalPlan(), unionNode, new Stack<LogicalNode>());
currentBlock = masterPlan.newExecutionBlock();
- GroupbyNode secondGroupBy = groupByNode;
+ LogicalNode parent = PlannerUtil.findTopParentNode(currentNode, lastDistNode.getType());
+ if (parent instanceof UnaryNode && parent != secondGroupBy) {
+ ((UnaryNode)parent).setChild(secondGroupBy);
+ }
+
for (UnionNode union : finderContext.unionList) {
TableSubQueryNode leftSubQuery = union.getLeftChild();
TableSubQueryNode rightSubQuery = union.getRightChild();
@@ -293,10 +329,14 @@ public class GlobalPlanner {
GroupbyNode g1 = PlannerUtil.clone(firstGroupBy);
g1.setChild(leftSubQuery);
execBlock.setPlan(g1);
- dataChannel = new DataChannel(execBlock, currentBlock, HASH_PARTITION, 32);
-
- ScanNode scanNode = buildInputExecutor(masterPlan.getLogicalPlan(), dataChannel);
+ ScanNode scanNode = buildInputExecutor(masterPlan.getLogicalPlan(),
+ getIntermediateTableName(execBlock), execBlock.getPlan().getOutSchema(0), StoreType.CSV);
secondGroupBy.setChild(scanNode);
+ currentBlock.setPlan(currentNode);
+ childPid = execBlock.getPlan().getChild(execBlock.getPlan().getTerminalNode(), 0).getPID();
+ parentPid = currentBlock.getInputContext().getScanNodes()[0].getPID();
+ dataChannel = new DataChannel(execBlock, currentBlock, childPid, parentPid, HASH_PARTITION, 32,
+ execBlock.getPlan().getOutSchema(0));
masterPlan.addConnect(dataChannel);
}
if (rightSubQuery.getSubQuery().getType() != NodeType.UNION) {
@@ -304,48 +344,48 @@ public class GlobalPlanner {
GroupbyNode g1 = PlannerUtil.clone(firstGroupBy);
g1.setChild(rightSubQuery);
execBlock.setPlan(g1);
- dataChannel = new DataChannel(execBlock, currentBlock, HASH_PARTITION, 32);
-
- ScanNode scanNode = buildInputExecutor(masterPlan.getLogicalPlan(), dataChannel);
+ ScanNode scanNode = buildInputExecutor(masterPlan.getLogicalPlan(),
+ getIntermediateTableName(execBlock), execBlock.getPlan().getOutSchema(0), StoreType.CSV);
secondGroupBy.setChild(scanNode);
+ currentBlock.setPlan(currentNode);
+ childPid = execBlock.getPlan().getChild(execBlock.getPlan().getTerminalNode(), 0).getPID();
+ parentPid = currentBlock.getInputContext().getScanNodes()[0].getPID();
+ dataChannel = new DataChannel(execBlock, currentBlock, childPid, parentPid, HASH_PARTITION, 32,
+ execBlock.getPlan().getOutSchema(0));
masterPlan.addConnect(dataChannel);
}
}
- LogicalNode parent = PlannerUtil.findTopParentNode(currentNode, lastDistNode.getType());
- if (parent instanceof UnaryNode && parent != secondGroupBy) {
- ((UnaryNode)parent).setChild(secondGroupBy);
- }
- currentBlock.setPlan(currentNode);
} else {
-
if (childBlock == null) { // first repartition node
childBlock = masterPlan.newExecutionBlock();
}
childBlock.setPlan(firstGroupBy);
+ LogicalNode parent = PlannerUtil.findTopParentNode(currentNode, lastDistNode.getType());
+ if (parent instanceof UnaryNode && parent != secondGroupBy) {
+ ((UnaryNode)parent).setChild(secondGroupBy);
+ }
+
currentBlock = masterPlan.newExecutionBlock();
+ ScanNode scanNode = buildInputExecutor(masterPlan.getLogicalPlan(),
+ getIntermediateTableName(childBlock), childBlock.getPlan().getOutSchema(0), StoreType.CSV);
+ secondGroupBy.setChild(scanNode);
+ currentBlock.setPlan(currentNode);
DataChannel channel;
+ childPid = childBlock.getPlan().getChild(childBlock.getPlan().getTerminalNode(), 0).getPID();
+ parentPid = currentBlock.getInputContext().getScanNodes()[0].getPID();
if (firstGroupBy.isEmptyGrouping()) {
- channel = new DataChannel(childBlock, currentBlock, HASH_PARTITION, 1);
+ channel = new DataChannel(childBlock, currentBlock, childPid, parentPid, HASH_PARTITION, 1,
+ childBlock.getPlan().getOutSchema(0));
channel.setPartitionKey(firstGroupBy.getGroupingColumns());
} else {
- channel = new DataChannel(childBlock, currentBlock, HASH_PARTITION, 32);
+ channel = new DataChannel(childBlock, currentBlock, childPid, parentPid, HASH_PARTITION, 32,
+ childBlock.getPlan().getOutSchema(0));
channel.setPartitionKey(firstGroupBy.getGroupingColumns());
}
- channel.setSchema(firstGroupBy.getOutSchema());
-
- GroupbyNode secondGroupBy = groupByNode;
- ScanNode scanNode = buildInputExecutor(masterPlan.getLogicalPlan(), channel);
- secondGroupBy.setChild(scanNode);
-
- LogicalNode parent = PlannerUtil.findTopParentNode(currentNode, lastDistNode.getType());
- if (parent instanceof UnaryNode && parent != secondGroupBy) {
- ((UnaryNode)parent).setChild(secondGroupBy);
- }
masterPlan.addConnect(channel);
- currentBlock.setPlan(currentNode);
}
}
@@ -356,19 +396,19 @@ public class GlobalPlanner {
LogicalNode childNode, ExecutionBlock childBlock) {
ExecutionBlock currentBlock = null;
- SortNode firstSort = (SortNode) lastDistNode;
+ SortNode[] twoPhaseSorts = PlannerUtil.transformSortTo2p(masterPlan.getLogicalPlan(), (SortNode) lastDistNode);
+ SortNode secondSort = twoPhaseSorts[0];
+ SortNode firstSort = twoPhaseSorts[1];
+
if (childBlock == null) {
childBlock = masterPlan.newExecutionBlock();
}
childBlock.setPlan(firstSort);
currentBlock = masterPlan.newExecutionBlock();
- DataChannel channel = new DataChannel(childBlock, currentBlock, RANGE_PARTITION, 32);
- channel.setPartitionKey(PlannerUtil.sortSpecsToSchema(firstSort.getSortKeys()).toArray());
- channel.setSchema(childNode.getOutSchema());
- SortNode secondSort = PlannerUtil.clone(lastDistNode);
- ScanNode secondScan = buildInputExecutor(masterPlan.getLogicalPlan(), channel);
+ ScanNode secondScan = buildInputExecutor(masterPlan.getLogicalPlan(),
+ getIntermediateTableName(childBlock), childBlock.getPlan().getOutSchema(0), StoreType.CSV);
secondSort.setChild(secondScan);
LimitNode limitAndSort;
@@ -377,7 +417,7 @@ public class GlobalPlanner {
limitAndSort = PlannerUtil.clone(limitOrNull);
limitAndSort.setChild(firstSort);
- if (childBlock.getPlan().getType() == NodeType.SORT) {
+ if (firstSort.getType() == NodeType.SORT) {
childBlock.setPlan(limitAndSort);
} else {
LogicalNode sortParent = PlannerUtil.findTopParentNode(childBlock.getPlan(), NodeType.SORT);
@@ -393,9 +433,15 @@ public class GlobalPlanner {
if (parent instanceof UnaryNode && parent != secondSort) {
((UnaryNode)parent).setChild(secondSort);
}
+ currentBlock.setPlan(currentNode);
+
+ int childPid = childBlock.getPlan().getChild(childBlock.getPlan().getTerminalNode(), 0).getPID();
+ int parentPid = currentBlock.getInputContext().getScanNodes()[0].getPID();
+ DataChannel channel = new DataChannel(childBlock, currentBlock, childPid, parentPid, RANGE_PARTITION, 32,
+ childBlock.getPlan().getOutSchema(0));
+ channel.setPartitionKey(PlannerUtil.sortSpecsToSchema(firstSort.getSortKeys()).toArray());
masterPlan.addConnect(channel);
- currentBlock.setPlan(currentNode);
return new ExecutionBlock[] { currentBlock, childBlock };
}
@@ -408,7 +454,8 @@ public class GlobalPlanner {
super.visitRoot(context, plan, node, stack);
if (context.lastRepartionableNode != null && context.lastRepartionableNode.getType() != NodeType.UNION) {
- context.topMostLeftExecBlock = buildRepartitionBlocks(context.plan, context.lastRepartionableNode, node, context.topmost, context.topMostLeftExecBlock);
+ context.topMostLeftExecBlock = buildRepartitionBlocks(context.plan, context.lastRepartionableNode, node,
+ context.topmost, context.topMostLeftExecBlock);
} else if (context.lastRepartionableNode != null && context.lastRepartionableNode.getType() == NodeType.UNION) {
} else {
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/7c97735e/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/InputContext.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/InputContext.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/InputContext.java
new file mode 100644
index 0000000..f7ac87c
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/InputContext.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.planner.global;
+
+import com.google.gson.annotations.Expose;
+import org.apache.tajo.engine.json.CoreGsonHelper;
+import org.apache.tajo.engine.planner.logical.ScanNode;
+import org.apache.tajo.json.GsonObject;
+import org.apache.tajo.util.TUtil;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class InputContext implements GsonObject {
+ @Expose private Map<Integer, ScanNode> scanNodes = new HashMap<Integer, ScanNode>();
+
+ public InputContext() {
+
+ }
+
+ @Override
+ public String toJson() {
+ return CoreGsonHelper.toJson(this, InputContext.class);
+ }
+
+ public ScanNode[] getScanNodes() {
+ return scanNodes.values().toArray(new ScanNode[scanNodes.size()]);
+ }
+
+ public ScanNode getScanNode(int pid) {
+ return scanNodes.get(pid);
+ }
+
+ public void setScanNodes(List<ScanNode> scanNodes) {
+ this.scanNodes.clear();
+ for (ScanNode scan : scanNodes) {
+ addScanNode(scan);
+ }
+ }
+
+ public void addScanNode(ScanNode scanNode) {
+ this.scanNodes.put(scanNode.getPID(), scanNode);
+ }
+
+ public int size() {
+ return scanNodes.size();
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (o instanceof InputContext) {
+ InputContext other = (InputContext) o;
+ return TUtil.checkEquals(this.getScanNodes(), other.getScanNodes());
+ }
+ return false;
+ }
+
+ public boolean contains(ScanNode scanNode) {
+ return this.scanNodes.containsKey(scanNode.getPID());
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/7c97735e/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/MasterPlan.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/MasterPlan.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/MasterPlan.java
index 6aaeb1d..2529d64 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/MasterPlan.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/MasterPlan.java
@@ -21,10 +21,12 @@
*/
package org.apache.tajo.engine.planner.global;
+import com.google.common.annotations.VisibleForTesting;
import org.apache.tajo.ExecutionBlockId;
import org.apache.tajo.QueryId;
import org.apache.tajo.engine.planner.LogicalPlan;
import org.apache.tajo.engine.planner.graph.SimpleDirectedGraph;
+import org.apache.tajo.engine.planner.logical.LogicalRootNode;
import org.apache.tajo.engine.query.QueryContext;
import java.util.ArrayList;
@@ -90,6 +92,14 @@ public class MasterPlan {
}
public ExecutionBlock newExecutionBlock() {
+ ExecutionBlock newExecBlock = new ExecutionBlock(newExecutionBlockId(),
+ (LogicalRootNode) plan.getRootBlock().getRoot());
+ execBlockMap.put(newExecBlock.getId(), newExecBlock);
+ return newExecBlock;
+ }
+
+ @VisibleForTesting
+ public ExecutionBlock newExecutionBlockForTest() {
ExecutionBlock newExecBlock = new ExecutionBlock(newExecutionBlockId());
execBlockMap.put(newExecBlock.getId(), newExecBlock);
return newExecBlock;
@@ -107,12 +117,14 @@ public class MasterPlan {
execBlockGraph.addEdge(dataChannel.getSrcId(), dataChannel.getTargetId(), dataChannel);
}
+ @VisibleForTesting
public void addConnect(ExecutionBlock src, ExecutionBlock target, PartitionType type) {
addConnect(src.getId(), target.getId(), type);
}
+ @VisibleForTesting
public void addConnect(ExecutionBlockId src, ExecutionBlockId target, PartitionType type) {
- addConnect(new DataChannel(src, target, type));
+ addConnect(new DataChannel(src, target, null, null, type));
}
public boolean isConnected(ExecutionBlock src, ExecutionBlock target) {
@@ -145,7 +157,8 @@ public class MasterPlan {
public boolean isRoot(ExecutionBlock execBlock) {
if (!execBlock.getId().equals(terminalBlock.getId())) {
- return execBlockGraph.getParent(execBlock.getId()).equals(terminalBlock.getId());
+ return execBlockGraph.getParentCount(execBlock.getId()) == 1 &&
+ execBlockGraph.getParent(execBlock.getId(), 0).equals(terminalBlock.getId());
} else {
return false;
}
@@ -172,7 +185,7 @@ public class MasterPlan {
}
public ExecutionBlock getParent(ExecutionBlock executionBlock) {
- return execBlockMap.get(execBlockGraph.getParent(executionBlock.getId()));
+ return execBlockMap.get(execBlockGraph.getParent(executionBlock.getId(), 0));
}
public List<ExecutionBlock> getChilds(ExecutionBlock execBlock) {
@@ -201,7 +214,7 @@ public class MasterPlan {
@Override
public String toString() {
- StringBuilder sb = new StringBuilder();
+ StringBuilder sb = new StringBuilder("\n");
ExecutionBlockCursor cursor = new ExecutionBlockCursor(this);
sb.append("-------------------------------------------------------------------------------\n");
sb.append("Execution Block Graph (TERMINAL - " + getTerminalBlock() + ")\n");
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/7c97735e/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/graph/DirectedGraph.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/graph/DirectedGraph.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/graph/DirectedGraph.java
index b6ff71b..23c25e9 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/graph/DirectedGraph.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/graph/DirectedGraph.java
@@ -45,7 +45,11 @@ public interface DirectedGraph<V, E> extends Graph<V, E> {
boolean isLeaf(V v);
- @Nullable V getParent(V v);
+ int getParentCount(V v);
+
+ @Nullable V getParent(V v, int idx);
+
+ List<V> getParents(V v);
int getChildCount(V v);
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/7c97735e/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/graph/SimpleDirectedGraph.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/graph/SimpleDirectedGraph.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/graph/SimpleDirectedGraph.java
index 12f33cb..8d0eecf 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/graph/SimpleDirectedGraph.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/graph/SimpleDirectedGraph.java
@@ -20,6 +20,7 @@ package org.apache.tajo.engine.planner.graph;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
+import com.google.gson.annotations.Expose;
import org.apache.tajo.annotation.Nullable;
import org.apache.tajo.util.TUtil;
@@ -33,9 +34,9 @@ import java.util.*;
*/
public class SimpleDirectedGraph<V, E> implements DirectedGraph<V,E> {
/** map: child -> parent */
- protected Map<V, Map<V, E>> directedEdges = TUtil.newLinkedHashMap();
+ @Expose protected Map<V, Map<V, E>> directedEdges = TUtil.newLinkedHashMap();
/** map: parent -> child */
- protected Map<V, Map<V, E>> reversedEdges = TUtil.newLinkedHashMap();
+ @Expose protected Map<V, Map<V, E>> reversedEdges = TUtil.newLinkedHashMap();
@Override
public int getVertexSize() {
@@ -157,22 +158,38 @@ public class SimpleDirectedGraph<V, E> implements DirectedGraph<V,E> {
}
@Override
- public @Nullable V getParent(V v) {
+ public boolean isRoot(V v) {
+ return !directedEdges.containsKey(v);
+ }
+
+ @Override
+ public boolean isLeaf(V v) {
+ return !reversedEdges.containsKey(v);
+ }
+
+ @Override
+ public int getParentCount(V v) {
if (directedEdges.containsKey(v)) {
- return directedEdges.get(v).keySet().iterator().next();
+ return directedEdges.get(v).size();
} else {
- return null;
+ return -1;
}
}
@Override
- public boolean isRoot(V v) {
- return !directedEdges.containsKey(v);
+ public V getParent(V v, int idx) {
+ return getParents(v).get(idx);
}
@Override
- public boolean isLeaf(V v) {
- return !reversedEdges.containsKey(v);
+ public List<V> getParents(V v) {
+ List<V> parentBlocks = new ArrayList<V>();
+ if (directedEdges.containsKey(v)) {
+ for (Map.Entry<V, E> entry: directedEdges.get(v).entrySet()) {
+ parentBlocks.add(entry.getKey());
+ }
+ }
+ return parentBlocks;
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/7c97735e/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/graph/SimpleUndirectedGraph.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/graph/SimpleUndirectedGraph.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/graph/SimpleUndirectedGraph.java
index 38a025d..c373cfc 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/graph/SimpleUndirectedGraph.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/graph/SimpleUndirectedGraph.java
@@ -76,11 +76,21 @@ public class SimpleUndirectedGraph<V, E> extends SimpleDirectedGraph<V, E> imple
}
@Override
- public V getParent(V v) {
+ public V getParent(V v, int idx) {
throw new UnsupportedOperationException("Cannot support getParent(V v) in UndirectedGraph");
}
@Override
+ public int getParentCount(V v) {
+ throw new UnsupportedOperationException("Cannot support getParentCount(V v) in UndirectedGraph");
+ }
+
+ @Override
+ public List<V> getParents(V v) {
+ throw new UnsupportedOperationException("Cannot support getParents(V v) in UndirectedGraph");
+ }
+
+ @Override
public boolean isRoot(V v) {
throw new UnsupportedOperationException("Cannot support isRoot(V v) in UndirectedGraph");
}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/7c97735e/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/ExceptNode.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/ExceptNode.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/ExceptNode.java
index 0520214..fbf0598 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/ExceptNode.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/ExceptNode.java
@@ -41,6 +41,23 @@ public class ExceptNode extends BinaryNode {
}
public String toString() {
- return getLeftChild().toString() + "\n EXCEPT \n" + getRightChild().toString();
+ StringBuffer sb = new StringBuffer();
+ if (leftChild != null) {
+ sb.append(getLeftChild().toString());
+ } else {
+ sb.append("null left child");
+ }
+ sb.append("\n EXCEPT \n");
+ if (rightChild != null) {
+ sb.append(getRightChild().toString());
+ } else {
+ sb.append("null right child");
+ }
+ return sb.toString();
+ }
+
+ @Override
+ public boolean deepEquals(Object o) {
+ return this.equals(o) && super.deepEquals(o);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/7c97735e/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/GroupbyNode.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/GroupbyNode.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/GroupbyNode.java
index 352780a..0f41142 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/GroupbyNode.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/GroupbyNode.java
@@ -105,7 +105,7 @@ public class GroupbyNode extends UnaryNode implements Projectable, Cloneable {
if(i < columns.length - 1)
sb.append(",");
}
-
+
if(hasHavingCondition()) {
sb.append("], \"having qual\": \"").append(havingCondition).append("\"");
}
@@ -122,8 +122,12 @@ public class GroupbyNode extends UnaryNode implements Projectable, Cloneable {
sb.append("\n \"out schema\": ").append(getOutSchema()).append(",");
sb.append("\n \"in schema\": ").append(getInSchema());
sb.append("}");
-
- return sb.toString() + "\n" + getChild().toString();
+
+ if (child != null) {
+ return sb.toString() + "\n" + getChild().toString();
+ } else {
+ return sb.toString();
+ }
}
@Override
@@ -139,7 +143,7 @@ public class GroupbyNode extends UnaryNode implements Projectable, Cloneable {
return false;
}
}
-
+
@Override
public Object clone() throws CloneNotSupportedException {
GroupbyNode grp = (GroupbyNode) super.clone();
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/7c97735e/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/IntersectNode.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/IntersectNode.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/IntersectNode.java
index 493698d..4128407 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/IntersectNode.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/IntersectNode.java
@@ -40,6 +40,18 @@ public class IntersectNode extends BinaryNode {
}
public String toString() {
- return getLeftChild().toString() + "\n INTERSECT \n" + getRightChild().toString();
+ StringBuffer sb = new StringBuffer();
+ if (leftChild != null) {
+ sb.append(getLeftChild().toString());
+ } else {
+ sb.append("null left child");
+ }
+ sb.append("\n INTERSECT \n");
+ if (rightChild != null) {
+ sb.append(getRightChild().toString());
+ } else {
+ sb.append("null right child");
+ }
+ return sb.toString();
}
}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/7c97735e/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/JoinNode.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/JoinNode.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/JoinNode.java
index 23a79df..78d9d42 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/JoinNode.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/JoinNode.java
@@ -149,7 +149,13 @@ public class JoinNode extends BinaryNode implements Projectable, Cloneable {
sb.append("\n\"out schema: ").append(getOutSchema());
sb.append("\n\"in schema: ").append(getInSchema());
- sb.append("\n" + getLeftChild().toString()).append(" and ").append(getRightChild());
+
+ if (leftChild != null) {
+ sb.append("\n"+ getLeftChild().toString());
+ }
+ if (rightChild != null) {
+ sb.append("\n" + getRightChild().toString());
+ }
return sb.toString();
}
}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/7c97735e/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/LimitNode.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/LimitNode.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/LimitNode.java
index a246c0e..b0fb802 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/LimitNode.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/LimitNode.java
@@ -61,7 +61,9 @@ public final class LimitNode extends UnaryNode implements Cloneable {
sb.append("\n \"out schema: ").append(getOutSchema())
.append("\n \"in schema: " + getInSchema());
- sb.append("\n").append(getChild().toString());
+ if (child != null) {
+ sb.append("\n").append(getChild().toString());
+ }
return sb.toString();
}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/7c97735e/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/LogicalNode.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/LogicalNode.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/LogicalNode.java
index 8b1a82a..aa18bfc 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/LogicalNode.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/LogicalNode.java
@@ -1,6 +1,6 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
+ * or more contributor license aggreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
@@ -21,7 +21,10 @@
*/
package org.apache.tajo.engine.planner.logical;
+import com.google.common.base.Objects;
import com.google.gson.annotations.Expose;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
import org.apache.tajo.catalog.Schema;
import org.apache.tajo.engine.json.CoreGsonHelper;
import org.apache.tajo.engine.planner.PlanString;
@@ -41,6 +44,10 @@ public abstract class LogicalNode implements Cloneable, GsonObject {
this.type = type;
}
+ public void setPid(int pid) {
+ this.pid = pid;
+ }
+
public int getPID() {
return pid;
}
@@ -76,6 +83,11 @@ public abstract class LogicalNode implements Cloneable, GsonObject {
public Schema getOutSchema() {
return this.outputSchema;
}
+
+ @Override
+ public int hashCode() {
+ return Objects.hashCode(pid, type, inputSchema, outputSchema, cost);
+ }
@Override
public boolean equals(Object obj) {
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/7c97735e/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/LogicalRootNode.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/LogicalRootNode.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/LogicalRootNode.java
index 72f5ee8..850fc43 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/LogicalRootNode.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/LogicalRootNode.java
@@ -26,7 +26,11 @@ public class LogicalRootNode extends UnaryNode implements Cloneable {
}
public String toString() {
- return "Logical Plan Root\n\n" + getChild().toString();
+ if (child != null) {
+ return "Logical Plan Root\n\n" + getChild().toString();
+ } else {
+ return "Logical Plan Root\n\n";
+ }
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/7c97735e/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/NodeType.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/NodeType.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/NodeType.java
index 44790ec..c6ab7ee 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/NodeType.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/NodeType.java
@@ -46,7 +46,6 @@ public enum NodeType {
UNION(UnionNode.class),
TABLE_SUBQUERY(TableSubQueryNode.class);
-
private final Class<? extends LogicalNode> baseClass;
NodeType(Class<? extends LogicalNode> baseClass) {
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/7c97735e/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/ProjectionNode.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/ProjectionNode.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/ProjectionNode.java
index 8ef2bda..a797485 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/ProjectionNode.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/ProjectionNode.java
@@ -76,8 +76,12 @@ public class ProjectionNode extends UnaryNode implements Projectable {
sb.append("\n \"out schema\": ").append(getOutSchema()).append(",");
sb.append("\n \"in schema\": ").append(getInSchema());
sb.append("}");
- return sb.toString()+"\n"
- + getChild().toString();
+ if (child != null) {
+ return sb.toString()+"\n"
+ + getChild().toString();
+ } else {
+ return sb.toString();
+ }
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/7c97735e/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/ScanNode.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/ScanNode.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/ScanNode.java
index 04c7b5a..c929f57 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/ScanNode.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/ScanNode.java
@@ -145,8 +145,8 @@ public class ScanNode extends RelationNode implements Projectable {
}
return false;
- }
-
+ }
+
@Override
public Object clone() throws CloneNotSupportedException {
ScanNode scanNode = (ScanNode) super.clone();
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/7c97735e/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/SelectionNode.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/SelectionNode.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/SelectionNode.java
index 699a561..33d3cab 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/SelectionNode.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/SelectionNode.java
@@ -70,7 +70,11 @@ public class SelectionNode extends UnaryNode implements Cloneable {
sb.append("\n \"out schema\": ").append(getOutSchema()).append(",");
sb.append("\n \"in schema\": ").append(getInSchema()).append("}");
- return sb.toString()+"\n"
- + getChild().toString();
+ if (child != null) {
+ return sb.toString()+"\n"
+ + getChild().toString();
+ } else {
+ return sb.toString();
+ }
}
}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/7c97735e/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/SortNode.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/SortNode.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/SortNode.java
index 46ceea8..ad1315f 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/SortNode.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/SortNode.java
@@ -93,7 +93,11 @@ public final class SortNode extends UnaryNode implements Cloneable {
sb.append("\n\"out schema: " + getOutSchema()
+ "\n\"in schema: " + getInSchema());
- return sb.toString()+"\n"
+ if (child != null) {
+ return sb.toString()+"\n"
+ getChild().toString();
+ } else {
+ return sb.toString();
+ }
}
}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/7c97735e/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/StoreTableNode.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/StoreTableNode.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/StoreTableNode.java
index 94447c0..7eeb3b7 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/StoreTableNode.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/StoreTableNode.java
@@ -172,7 +172,11 @@ public class StoreTableNode extends UnaryNode implements Cloneable {
.append("\n \"in schema\": ").append(getInSchema())
.append("}");
- return sb.toString() + "\n"
+ if (child != null) {
+ return sb.toString() + "\n"
+ getChild().toString();
+ } else {
+ return sb.toString();
+ }
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/7c97735e/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/UnionNode.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/UnionNode.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/UnionNode.java
index d0e8b02..6a88d3c 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/UnionNode.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/UnionNode.java
@@ -42,6 +42,23 @@ public class UnionNode extends BinaryNode {
}
public String toString() {
- return getLeftChild().toString() + "\n UNION \n" + getRightChild().toString();
+ StringBuffer sb = new StringBuffer();
+ if (leftChild != null) {
+ sb.append(getLeftChild().toString());
+ } else {
+ sb.append("null left child");
+ }
+ sb.append("\n UNION \n");
+ if (rightChild != null) {
+ sb.append(getRightChild().toString());
+ } else {
+ sb.append("null right child");
+ }
+ return sb.toString();
+ }
+
+ @Override
+ public boolean deepEquals(Object o) {
+ return this.equals(o) && super.deepEquals(o);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/7c97735e/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/IndexedStoreExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/IndexedStoreExec.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/IndexedStoreExec.java
index 596c470..bee0d47 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/IndexedStoreExec.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/IndexedStoreExec.java
@@ -23,13 +23,13 @@ import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RawLocalFileSystem;
-import org.apache.tajo.worker.TaskAttemptContext;
import org.apache.tajo.catalog.*;
import org.apache.tajo.catalog.proto.CatalogProtos;
import org.apache.tajo.conf.TajoConf;
import org.apache.tajo.engine.planner.PlannerUtil;
import org.apache.tajo.storage.*;
import org.apache.tajo.storage.index.bst.BSTIndex;
+import org.apache.tajo.worker.TaskAttemptContext;
import java.io.IOException;
@@ -44,6 +44,8 @@ public class IndexedStoreExec extends UnaryPhysicalExec {
private FileAppender appender;
private TableMeta meta;
+ private Tuple prevKeyTuple;
+
public IndexedStoreExec(final TaskAttemptContext context, final AbstractStorageManager sm,
final PhysicalExec child, final Schema inSchema, final Schema outSchema,
final SortSpec[] sortSpecs) throws IOException {
@@ -79,17 +81,17 @@ public class IndexedStoreExec extends UnaryPhysicalExec {
BSTIndex.TWO_LEVEL_INDEX, keySchema, comp);
this.indexWriter.setLoadNum(100);
this.indexWriter.open();
+ this.prevKeyTuple = null;
}
@Override
public Tuple next() throws IOException {
Tuple tuple;
Tuple keyTuple;
- Tuple prevKeyTuple = null;
long offset;
-
- while((tuple = child.next()) != null) {
+ tuple = child.next();
+ if (tuple != null) {
offset = appender.getOffset();
appender.addTuple(tuple);
keyTuple = new VTuple(keySchema.getColumnNum());
@@ -100,7 +102,7 @@ public class IndexedStoreExec extends UnaryPhysicalExec {
}
}
- return null;
+ return tuple;
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/7c97735e/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/MultiOutputExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/MultiOutputExec.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/MultiOutputExec.java
new file mode 100644
index 0000000..1c97d1c
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/MultiOutputExec.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.planner.physical;
+
+import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.storage.Tuple;
+import org.apache.tajo.worker.TaskAttemptContext;
+
+import java.io.IOException;
+
+public class MultiOutputExec extends UnaryPhysicalExec {
+ private Tuple buffer;
+ private int remainBufferCount;
+ private final int outputNumber;
+
+ public MultiOutputExec(TaskAttemptContext context, Schema outSchema, PhysicalExec child,
+ int outputNumber) {
+ super(context, outSchema, outSchema, child);
+ this.outputNumber = outputNumber;
+ }
+
+ @Override
+ public void init() throws IOException {
+ remainBufferCount = outputNumber;
+ }
+
+ @Override
+ public Tuple next() throws IOException {
+ if (remainBufferCount == 0) {
+ buffer = child.next();
+ remainBufferCount = outputNumber;
+ }
+ remainBufferCount--;
+ return buffer;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/7c97735e/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/PartitionedStoreExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/PartitionedStoreExec.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/PartitionedStoreExec.java
index 2e53229..1de113f 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/PartitionedStoreExec.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/PartitionedStoreExec.java
@@ -118,12 +118,26 @@ public final class PartitionedStoreExec extends UnaryPhysicalExec {
Tuple tuple;
Appender appender;
int partition;
- while ((tuple = child.next()) != null) {
+
+ tuple = child.next();
+ if (tuple != null) {
partition = partitioner.getPartition(tuple);
appender = getAppender(partition);
appender.addTuple(tuple);
}
-
+
+ return tuple;
+ }
+
+ @Override
+ public void rescan() throws IOException {
+ // nothing to do
+ }
+
+ @Override
+ public void close() throws IOException {
+ super.close();
+
List<TableStats> statSet = new ArrayList<TableStats>();
for (Map.Entry<Integer, Appender> entry : appenderMap.entrySet()) {
int partNum = entry.getKey();
@@ -135,16 +149,9 @@ public final class PartitionedStoreExec extends UnaryPhysicalExec {
context.addRepartition(partNum, getDataFile(partNum).getName());
}
}
-
+
// Collect and aggregated statistics data
TableStats aggregated = StatisticsUtil.aggregateTableStat(statSet);
context.setResultStats(aggregated);
-
- return null;
- }
-
- @Override
- public void rescan() throws IOException {
- // nothing to do
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/7c97735e/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/PhysicalRootExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/PhysicalRootExec.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/PhysicalRootExec.java
new file mode 100644
index 0000000..44c22fb
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/PhysicalRootExec.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.planner.physical;
+
+import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.storage.Tuple;
+import org.apache.tajo.worker.TaskAttemptContext;
+
+import java.io.IOException;
+import java.util.BitSet;
+import java.util.List;
+
+public class PhysicalRootExec extends PhysicalExec {
+ private final List<PhysicalExec> children;
+ private final static Schema nullSchema = new Schema();
+ private final BitSet trueSet;
+
+ public PhysicalRootExec(TaskAttemptContext context, List<PhysicalExec> children) {
+ super(context, nullSchema, nullSchema);
+ this.children = children;
+ trueSet = new BitSet(children.size());
+ trueSet.clear();
+ trueSet.flip(0, children.size());
+ }
+
+ @Override
+ public void init() throws IOException {
+ for (PhysicalExec child : children) {
+ child.init();
+ }
+ }
+
+ @Override
+ public Tuple next() throws IOException {
+ BitSet endFlags = new BitSet(children.size());
+ endFlags.clear();
+
+ do {
+ for (int i = 0; i < children.size(); i++) {
+ if (children.get(i).next() == null) {
+ endFlags.set(i);
+ }
+ }
+ } while (!endFlags.equals(trueSet));
+ return null;
+ }
+
+ @Override
+ public void rescan() throws IOException {
+ for (PhysicalExec child : children) {
+ child.rescan();
+ }
+ }
+
+ @Override
+ public void close() throws IOException {
+ for (PhysicalExec child : children) {
+ child.close();
+ }
+ }
+
+ public PhysicalExec getChild(int i) {
+ return children.get(i);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/7c97735e/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/StoreTableExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/StoreTableExec.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/StoreTableExec.java
index 673e0b5..706cdb3 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/StoreTableExec.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/StoreTableExec.java
@@ -81,11 +81,11 @@ public class StoreTableExec extends UnaryPhysicalExec {
*/
@Override
public Tuple next() throws IOException {
- while((tuple = child.next()) != null) {
+ tuple = child.next();
+ if (tuple != null) {
appender.addTuple(tuple);
}
-
- return null;
+ return tuple;
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/7c97735e/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/query/QueryUnitRequest.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/query/QueryUnitRequest.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/query/QueryUnitRequest.java
index 383a787..641b54e 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/query/QueryUnitRequest.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/query/QueryUnitRequest.java
@@ -45,6 +45,7 @@ public interface QueryUnitRequest extends ProtoObject<TajoWorkerProtocol.QueryUn
public boolean shouldDie();
public void setShouldDie();
public QueryContext getQueryContext();
- public DataChannel getDataChannel();
+ public List<DataChannel> getIncomingChannels();
+ public List<DataChannel> getOutgoingChannels();
public Enforcer getEnforcer();
}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/7c97735e/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/query/QueryUnitRequestImpl.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/query/QueryUnitRequestImpl.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/query/QueryUnitRequestImpl.java
index 3c3c3dd..f261b08 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/query/QueryUnitRequestImpl.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/query/QueryUnitRequestImpl.java
@@ -21,10 +21,10 @@ package org.apache.tajo.engine.query;
import org.apache.tajo.QueryUnitAttemptId;
import org.apache.tajo.engine.planner.enforce.Enforcer;
import org.apache.tajo.engine.planner.global.DataChannel;
+import org.apache.tajo.ipc.TajoWorkerProtocol.DataChannelProto;
import org.apache.tajo.ipc.TajoWorkerProtocol.Fetch;
import org.apache.tajo.ipc.TajoWorkerProtocol.QueryUnitRequestProto;
import org.apache.tajo.ipc.TajoWorkerProtocol.QueryUnitRequestProtoOrBuilder;
-import org.apache.tajo.storage.fragment.Fragment;
import java.net.URI;
import java.util.ArrayList;
@@ -44,10 +44,11 @@ public class QueryUnitRequestImpl implements QueryUnitRequest {
private List<Fetch> fetches;
private Boolean shouldDie;
private QueryContext queryContext;
- private DataChannel dataChannel;
+ private List<DataChannel> incomingChanels;
+ private List<DataChannel> outgoingChannels;
private Enforcer enforcer;
- private QueryUnitRequestProto proto = QueryUnitRequestProto.getDefaultInstance();
+ private QueryUnitRequestProto proto = QueryUnitRequestProto.getDefaultInstance();
private QueryUnitRequestProto.Builder builder = null;
private boolean viaProto = false;
@@ -59,9 +60,11 @@ public class QueryUnitRequestImpl implements QueryUnitRequest {
public QueryUnitRequestImpl(QueryUnitAttemptId id, List<FragmentProto> fragments,
String outputTable, boolean clusteredOutput,
- String serializedData, QueryContext queryContext, DataChannel channel, Enforcer enforcer) {
+ String serializedData, QueryContext queryContext, List<DataChannel> incomingChanels,
+ List<DataChannel> outgoingChannels, Enforcer enforcer) {
this();
- this.set(id, fragments, outputTable, clusteredOutput, serializedData, queryContext, channel, enforcer);
+ this.set(id, fragments, outputTable, clusteredOutput, serializedData, queryContext, incomingChanels,
+ outgoingChannels, enforcer);
}
public QueryUnitRequestImpl(QueryUnitRequestProto proto) {
@@ -73,7 +76,8 @@ public class QueryUnitRequestImpl implements QueryUnitRequest {
public void set(QueryUnitAttemptId id, List<FragmentProto> fragments,
String outputTable, boolean clusteredOutput,
- String serializedData, QueryContext queryContext, DataChannel dataChannel, Enforcer enforcer) {
+ String serializedData, QueryContext queryContext, List<DataChannel> incomingChannels,
+ List<DataChannel> outgoingChannels, Enforcer enforcer) {
this.id = id;
this.fragments = fragments;
this.outputTable = outputTable;
@@ -82,7 +86,8 @@ public class QueryUnitRequestImpl implements QueryUnitRequest {
this.isUpdated = true;
this.queryContext = queryContext;
this.queryContext = queryContext;
- this.dataChannel = dataChannel;
+ this.incomingChanels = incomingChannels;
+ this.outgoingChannels = outgoingChannels;
this.enforcer = enforcer;
}
@@ -206,22 +211,28 @@ public class QueryUnitRequestImpl implements QueryUnitRequest {
this.queryContext = queryContext;
}
- public void setDataChannel(DataChannel dataChannel) {
+ public void addIncomingChanel(DataChannel incomingChanel) {
maybeInitBuilder();
- this.dataChannel = dataChannel;
+ initIncomingChannels();
+ this.incomingChanels.add(incomingChanel);
+ }
+
+ public void addOutgoingChannel(DataChannel outgoingChannel) {
+ maybeInitBuilder();
+ initOutgoingChannels();
+ this.outgoingChannels.add(outgoingChannel);
}
@Override
- public DataChannel getDataChannel() {
- QueryUnitRequestProtoOrBuilder p = viaProto ? proto : builder;
- if (dataChannel != null) {
- return dataChannel;
- }
- if (!p.hasDataChannel()) {
- return null;
- }
- this.dataChannel = new DataChannel(p.getDataChannel());
- return this.dataChannel;
+ public List<DataChannel> getIncomingChannels() {
+ initIncomingChannels();
+ return this.incomingChanels;
+ }
+
+ @Override
+ public List<DataChannel> getOutgoingChannels() {
+ initOutgoingChannels();
+ return this.outgoingChannels;
}
@Override
@@ -242,6 +253,28 @@ public class QueryUnitRequestImpl implements QueryUnitRequest {
return this.fetches;
}
+
+ private void initIncomingChannels() {
+ if (this.incomingChanels != null) {
+ return;
+ }
+ QueryUnitRequestProtoOrBuilder p = viaProto ? proto : builder;
+ this.incomingChanels = new ArrayList<DataChannel>();
+ for (DataChannelProto channelProto : p.getIncomingChannelsList()) {
+ this.incomingChanels.add(new DataChannel(channelProto));
+ }
+ }
+
+ private void initOutgoingChannels() {
+ if (this.outgoingChannels != null) {
+ return;
+ }
+ QueryUnitRequestProtoOrBuilder p = viaProto ? proto : builder;
+ this.outgoingChannels = new ArrayList<DataChannel>();
+ for (DataChannelProto channelProto : p.getOutgoingChannelsList()) {
+ this.outgoingChannels.add(new DataChannel(channelProto));
+ }
+ }
private void initFetches() {
if (this.fetches != null) {
@@ -310,8 +343,15 @@ public class QueryUnitRequestImpl implements QueryUnitRequest {
if (this.queryContext != null) {
builder.setQueryContext(queryContext.getProto());
}
- if (this.dataChannel != null) {
- builder.setDataChannel(dataChannel.getProto());
+ if (this.incomingChanels != null) {
+ for (DataChannel inChannel : this.incomingChanels) {
+ builder.addIncomingChannels(inChannel.getProto());
+ }
+ }
+ if (this.outgoingChannels != null) {
+ for (DataChannel outChannel : this.outgoingChannels) {
+ builder.addOutgoingChannels(outChannel.getProto());
+ }
}
if (this.enforcer != null) {
builder.setEnforcer(enforcer.getProto());
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/7c97735e/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/DefaultTaskScheduler.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/DefaultTaskScheduler.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/DefaultTaskScheduler.java
index d432017..a214adb 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/DefaultTaskScheduler.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/DefaultTaskScheduler.java
@@ -467,9 +467,11 @@ public class DefaultTaskScheduler extends AbstractTaskScheduler {
new ArrayList<FragmentProto>(task.getAllFragments()),
"",
false,
- task.getLogicalPlan().toJson(),
+ task.getPlan().toJson(),
context.getQueryContext(),
- subQuery.getDataChannel(), subQuery.getBlock().getEnforcer());
+ subQuery.getIncomingChannels(),
+ subQuery.getOutgoingChannels(),
+ subQuery.getBlock().getEnforcer());
if (!subQuery.getMasterPlan().isRoot(subQuery.getBlock())) {
taskAssign.setInterQuery();
}
@@ -515,9 +517,10 @@ public class DefaultTaskScheduler extends AbstractTaskScheduler {
Lists.newArrayList(task.getAllFragments()),
"",
false,
- task.getLogicalPlan().toJson(),
+ task.getPlan().toJson(),
context.getQueryContext(),
- subQuery.getDataChannel(),
+ subQuery.getIncomingChannels(),
+ subQuery.getOutgoingChannels(),
subQuery.getBlock().getEnforcer());
if (!subQuery.getMasterPlan().isRoot(subQuery.getBlock())) {
taskAssign.setInterQuery();
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/7c97735e/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryMasterTask.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryMasterTask.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryMasterTask.java
index 5b23fb2..baace79 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryMasterTask.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryMasterTask.java
@@ -356,7 +356,7 @@ public class QueryMasterTask extends CompositeService {
// Create a subdirectories
defaultFS.mkdirs(new Path(stagingDir, TajoConstants.RESULT_DIR_NAME));
- LOG.info("The staging dir '" + outputDir + "' is created.");
+ LOG.info("The staging dir '" + stagingDir + "' is created.");
queryContext.setStagingDir(stagingDir);
/////////////////////////////////////////////////
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/7c97735e/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryUnit.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryUnit.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryUnit.java
index 1d40616..16b9816 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryUnit.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryUnit.java
@@ -28,9 +28,9 @@ import org.apache.hadoop.yarn.state.*;
import org.apache.tajo.QueryIdFactory;
import org.apache.tajo.QueryUnitAttemptId;
import org.apache.tajo.QueryUnitId;
-import org.apache.tajo.catalog.Schema;
import org.apache.tajo.catalog.statistics.TableStats;
-import org.apache.tajo.engine.planner.logical.*;
+import org.apache.tajo.engine.planner.global.ExecutionPlan;
+import org.apache.tajo.engine.planner.logical.ScanNode;
import org.apache.tajo.ipc.TajoWorkerProtocol.Partition;
import org.apache.tajo.master.TaskState;
import org.apache.tajo.master.event.*;
@@ -55,10 +55,8 @@ public class QueryUnit implements EventHandler<TaskEvent> {
private final Configuration systemConf;
private QueryUnitId taskId;
private EventHandler eventHandler;
- private StoreTableNode store = null;
- private LogicalNode plan = null;
- private List<ScanNode> scan;
-
+ private ExecutionPlan plan = null;
+
private Map<String, FragmentProto> fragMap;
private Map<String, Set<URI>> fetchMap;
@@ -118,7 +116,6 @@ public class QueryUnit implements EventHandler<TaskEvent> {
this.taskId = id;
this.eventHandler = eventHandler;
this.isLeafTask = isLeafTask;
- scan = new ArrayList<ScanNode>();
fetchMap = Maps.newHashMap();
fragMap = Maps.newHashMap();
partitions = new ArrayList<Partition>();
@@ -160,26 +157,9 @@ public class QueryUnit implements EventHandler<TaskEvent> {
}
}
- public void setLogicalPlan(LogicalNode plan) {
- this.plan = plan;
-
- LogicalNode node = plan;
- ArrayList<LogicalNode> s = new ArrayList<LogicalNode>();
- s.add(node);
- while (!s.isEmpty()) {
- node = s.remove(s.size()-1);
- if (node instanceof UnaryNode) {
- UnaryNode unary = (UnaryNode) node;
- s.add(s.size(), unary.getChild());
- } else if (node instanceof BinaryNode) {
- BinaryNode binary = (BinaryNode) node;
- s.add(s.size(), binary.getLeftChild());
- s.add(s.size(), binary.getRightChild());
- } else if (node instanceof ScanNode) {
- scan.add((ScanNode)node);
- }
- }
- }
+ public void setExecutionPlan(ExecutionPlan plan) {
+ this.plan = plan;
+ }
@Deprecated
public void setFragment(String tableId, FileFragment fragment) {
@@ -231,9 +211,9 @@ public class QueryUnit implements EventHandler<TaskEvent> {
return fragMap.values();
}
- public LogicalNode getLogicalPlan() {
- return this.plan;
- }
+ public ExecutionPlan getPlan() {
+ return this.plan;
+ }
public QueryUnitId getId() {
return taskId;
@@ -251,25 +231,17 @@ public class QueryUnit implements EventHandler<TaskEvent> {
return this.fetchMap.get(scan.getTableName());
}
- public String getOutputName() {
- return this.store.getTableName();
- }
-
- public Schema getOutputSchema() {
- return this.store.getOutSchema();
- }
-
- public StoreTableNode getStoreTableNode() {
- return this.store;
- }
-
+ public Collection<URI> getFetch(String sourceName) {
+ return this.fetchMap.get(sourceName);
+ }
+
public ScanNode[] getScanNodes() {
- return this.scan.toArray(new ScanNode[scan.size()]);
+ return this.plan.getInputContext().getScanNodes();
}
@Override
public String toString() {
- String str = new String(plan.getType() + " \n");
+ String str = new String(plan.getTerminalNode().getChild().getType() + " \n");
for (Entry<String, FragmentProto> e : fragMap.entrySet()) {
str += e.getKey() + " : ";
str += e.getValue() + " ";
[4/4] git commit: TAJO-266: Extend ExecutionBlock and Task to support
multiple outputs. (jihoon)
Posted by ji...@apache.org.
TAJO-266: Extend ExecutionBlock and Task to support multiple outputs. (jihoon)
Project: http://git-wip-us.apache.org/repos/asf/incubator-tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tajo/commit/7c97735e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tajo/tree/7c97735e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tajo/diff/7c97735e
Branch: refs/heads/DAG-execplan
Commit: 7c97735e1008e61f00ddbd41ba924a00d5b8baf3
Parents: 67e0d94
Author: Jihoon Son <ji...@apache.org>
Authored: Tue Nov 26 18:13:11 2013 +0900
Committer: Jihoon Son <ji...@apache.org>
Committed: Tue Nov 26 18:13:11 2013 +0900
----------------------------------------------------------------------
.../java/org/apache/tajo/catalog/Schema.java | 5 +
.../java/org/apache/tajo/catalog/TableDesc.java | 5 +
.../apache/tajo/engine/json/CoreGsonHelper.java | 4 +
.../tajo/engine/json/ExecutionPlanAdapter.java | 49 ++
.../tajo/engine/json/SourceContextAdapter.java | 47 ++
.../apache/tajo/engine/planner/LogicalPlan.java | 5 +-
.../tajo/engine/planner/PhysicalPlanner.java | 6 +-
.../engine/planner/PhysicalPlannerImpl.java | 463 +++++++++++--------
.../apache/tajo/engine/planner/PlannerUtil.java | 227 +++++++++
.../tajo/engine/planner/global/DataChannel.java | 50 +-
.../planner/global/DestinationContext.java | 104 +++++
.../engine/planner/global/ExecutionBlock.java | 73 +--
.../engine/planner/global/ExecutionPlan.java | 381 +++++++++++++++
.../planner/global/ExecutionPlanEdge.java | 51 ++
.../engine/planner/global/GlobalPlanner.java | 175 ++++---
.../engine/planner/global/InputContext.java | 78 ++++
.../tajo/engine/planner/global/MasterPlan.java | 21 +-
.../engine/planner/graph/DirectedGraph.java | 6 +-
.../planner/graph/SimpleDirectedGraph.java | 35 +-
.../planner/graph/SimpleUndirectedGraph.java | 12 +-
.../tajo/engine/planner/logical/ExceptNode.java | 19 +-
.../engine/planner/logical/GroupbyNode.java | 12 +-
.../engine/planner/logical/IntersectNode.java | 14 +-
.../tajo/engine/planner/logical/JoinNode.java | 8 +-
.../tajo/engine/planner/logical/LimitNode.java | 4 +-
.../engine/planner/logical/LogicalNode.java | 14 +-
.../engine/planner/logical/LogicalRootNode.java | 6 +-
.../tajo/engine/planner/logical/NodeType.java | 1 -
.../engine/planner/logical/ProjectionNode.java | 8 +-
.../tajo/engine/planner/logical/ScanNode.java | 4 +-
.../engine/planner/logical/SelectionNode.java | 8 +-
.../tajo/engine/planner/logical/SortNode.java | 6 +-
.../engine/planner/logical/StoreTableNode.java | 6 +-
.../tajo/engine/planner/logical/UnionNode.java | 19 +-
.../planner/physical/IndexedStoreExec.java | 12 +-
.../planner/physical/MultiOutputExec.java | 52 +++
.../planner/physical/PartitionedStoreExec.java | 27 +-
.../planner/physical/PhysicalRootExec.java | 81 ++++
.../engine/planner/physical/StoreTableExec.java | 6 +-
.../tajo/engine/query/QueryUnitRequest.java | 3 +-
.../tajo/engine/query/QueryUnitRequestImpl.java | 82 +++-
.../tajo/master/DefaultTaskScheduler.java | 11 +-
.../master/querymaster/QueryMasterTask.java | 2 +-
.../tajo/master/querymaster/QueryUnit.java | 60 +--
.../tajo/master/querymaster/Repartitioner.java | 52 ++-
.../tajo/master/querymaster/SubQuery.java | 30 +-
.../org/apache/tajo/worker/TajoQueryEngine.java | 7 +-
.../main/java/org/apache/tajo/worker/Task.java | 57 ++-
.../apache/tajo/worker/TaskAttemptContext.java | 19 +-
.../src/main/proto/TajoWorkerProtocol.proto | 8 +-
.../tajo/engine/planner/TestLogicalPlan.java | 4 +-
.../planner/global/TestExecutionPlan.java | 98 ++++
.../engine/planner/global/TestMasterPlan.java | 6 +-
.../planner/physical/TestBNLJoinExec.java | 24 +-
.../planner/physical/TestBSTIndexExec.java | 12 +-
.../planner/physical/TestExternalSortExec.java | 14 +-
.../physical/TestFullOuterHashJoinExec.java | 37 +-
.../physical/TestFullOuterMergeJoinExec.java | 56 ++-
.../planner/physical/TestHashAntiJoinExec.java | 12 +-
.../planner/physical/TestHashJoinExec.java | 10 +-
.../planner/physical/TestHashSemiJoinExec.java | 8 +-
.../physical/TestLeftOuterHashJoinExec.java | 77 +--
.../physical/TestLeftOuterNLJoinExec.java | 72 +--
.../planner/physical/TestMergeJoinExec.java | 14 +-
.../engine/planner/physical/TestNLJoinExec.java | 20 +-
.../planner/physical/TestPhysicalPlanner.java | 222 ++++++---
.../physical/TestRightOuterHashJoinExec.java | 51 +-
.../physical/TestRightOuterMergeJoinExec.java | 103 +++--
.../engine/planner/physical/TestSortExec.java | 12 +-
.../tajo/worker/TestRangeRetrieverHandler.java | 24 +-
70 files changed, 2512 insertions(+), 799 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/7c97735e/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/Schema.java
----------------------------------------------------------------------
diff --git a/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/Schema.java b/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/Schema.java
index 7c0de81..404af75 100644
--- a/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/Schema.java
+++ b/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/Schema.java
@@ -212,6 +212,11 @@ public class Schema implements ProtoObject<SchemaProto>, Cloneable, GsonObject {
}
}
+ @Override
+ public int hashCode() {
+ return getProto().hashCode();
+ }
+
@Override
public boolean equals(Object o) {
if (o instanceof Schema) {
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/7c97735e/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/TableDesc.java
----------------------------------------------------------------------
diff --git a/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/TableDesc.java b/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/TableDesc.java
index f59feef..5573de6 100644
--- a/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/TableDesc.java
+++ b/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/TableDesc.java
@@ -158,4 +158,9 @@ public class TableDesc implements ProtoObject<TableDescProto>, GsonObject, Clone
}
return builder.build();
}
+
+ @Override
+ public int hashCode() {
+ return getProto().hashCode();
+ }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/7c97735e/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/json/CoreGsonHelper.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/json/CoreGsonHelper.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/json/CoreGsonHelper.java
index 4dfb314..194d2b4 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/json/CoreGsonHelper.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/json/CoreGsonHelper.java
@@ -30,6 +30,8 @@ import org.apache.tajo.datum.Datum;
import org.apache.tajo.engine.eval.EvalNode;
import org.apache.tajo.engine.function.AggFunction;
import org.apache.tajo.engine.function.GeneralFunction;
+import org.apache.tajo.engine.planner.global.ExecutionPlan;
+import org.apache.tajo.engine.planner.global.InputContext;
import org.apache.tajo.engine.planner.logical.LogicalNode;
import org.apache.tajo.json.*;
import org.apache.tajo.util.TUtil;
@@ -56,6 +58,8 @@ public class CoreGsonHelper {
adapters.put(AggFunction.class, new FunctionAdapter());
adapters.put(Datum.class, new DatumAdapter());
adapters.put(DataType.class, new DataTypeAdapter());
+ adapters.put(ExecutionPlan.class, new ExecutionPlanAdapter());
+ adapters.put(InputContext.class, new SourceContextAdapter());
return adapters;
}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/7c97735e/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/json/ExecutionPlanAdapter.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/json/ExecutionPlanAdapter.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/json/ExecutionPlanAdapter.java
new file mode 100644
index 0000000..7ee8347
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/json/ExecutionPlanAdapter.java
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.json;
+
+import com.google.common.base.Preconditions;
+import com.google.gson.*;
+import org.apache.tajo.engine.planner.global.ExecutionPlan;
+import org.apache.tajo.engine.planner.global.ExecutionPlan.ExecutionPlanJsonHelper;
+import org.apache.tajo.json.GsonSerDerAdapter;
+
+import java.lang.reflect.Type;
+
+public class ExecutionPlanAdapter implements GsonSerDerAdapter<ExecutionPlan> {
+
+ @Override
+ public ExecutionPlan deserialize(JsonElement jsonElement, Type type,
+ JsonDeserializationContext context) throws JsonParseException {
+ JsonObject json = jsonElement.getAsJsonObject();
+ String typeName = json.get("type").getAsJsonPrimitive().getAsString();
+ Preconditions.checkState(typeName.equals("ExecutionPlan"));
+ ExecutionPlanJsonHelper helper = context.deserialize(json.get("body"), ExecutionPlanJsonHelper.class);
+ return helper.toExecutionPlan();
+ }
+
+ @Override
+ public JsonElement serialize(ExecutionPlan src, Type type, JsonSerializationContext context) {
+ ExecutionPlanJsonHelper helper = new ExecutionPlanJsonHelper(src);
+ JsonObject json = new JsonObject();
+ json.addProperty("type", "ExecutionPlan");
+ json.add("body", context.serialize(helper, ExecutionPlanJsonHelper.class));
+ return json;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/7c97735e/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/json/SourceContextAdapter.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/json/SourceContextAdapter.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/json/SourceContextAdapter.java
new file mode 100644
index 0000000..d92d504
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/json/SourceContextAdapter.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.json;
+
+import com.google.gson.*;
+import org.apache.tajo.engine.planner.global.InputContext;
+import org.apache.tajo.engine.planner.logical.ScanNode;
+import org.apache.tajo.json.GsonSerDerAdapter;
+
+import java.lang.reflect.Type;
+
+public class SourceContextAdapter implements GsonSerDerAdapter<InputContext> {
+
+ @Override
+ public InputContext deserialize(JsonElement jsonElement, Type typeOfT, JsonDeserializationContext context) throws JsonParseException {
+ JsonObject json = jsonElement.getAsJsonObject();
+ ScanNode[] scanNodes = context.deserialize(json.get("body"), ScanNode[].class);
+ InputContext srcContext = new InputContext();
+ for (ScanNode scan : scanNodes) {
+ srcContext.addScanNode(scan);
+ }
+ return srcContext;
+ }
+
+ @Override
+ public JsonElement serialize(InputContext src, Type typeOfSrc, JsonSerializationContext context) {
+ JsonObject json = new JsonObject();
+ json.add("body", context.serialize(src.getScanNodes(), ScanNode[].class));
+ return json;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/7c97735e/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/LogicalPlan.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/LogicalPlan.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/LogicalPlan.java
index 4a305ae..3295efd 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/LogicalPlan.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/LogicalPlan.java
@@ -18,6 +18,7 @@
package org.apache.tajo.engine.planner;
+import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import org.apache.tajo.algebra.*;
@@ -122,11 +123,13 @@ public class LogicalPlan {
}
public void connectBlocks(QueryBlock srcBlock, QueryBlock targetBlock, BlockType type) {
+ Preconditions.checkState(queryBlockGraph.getParentCount(srcBlock.getName()) <= 0,
+ "There should be only one parent block.");
queryBlockGraph.addEdge(srcBlock.getName(), targetBlock.getName(), new BlockEdge(srcBlock, targetBlock, type));
}
public QueryBlock getParentBlock(QueryBlock block) {
- return queryBlocks.get(queryBlockGraph.getParent(block.getName()));
+ return queryBlocks.get(queryBlockGraph.getParent(block.getName(), 0));
}
public List<QueryBlock> getChildBlocks(QueryBlock block) {
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/7c97735e/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PhysicalPlanner.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PhysicalPlanner.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PhysicalPlanner.java
index ebe47b4..90cfc9c 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PhysicalPlanner.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PhysicalPlanner.java
@@ -21,16 +21,16 @@
*/
package org.apache.tajo.engine.planner;
-import org.apache.tajo.worker.TaskAttemptContext;
-import org.apache.tajo.engine.planner.logical.LogicalNode;
+import org.apache.tajo.engine.planner.global.ExecutionPlan;
import org.apache.tajo.engine.planner.physical.PhysicalExec;
import org.apache.tajo.exception.InternalException;
+import org.apache.tajo.worker.TaskAttemptContext;
/**
* This class generates a physical execution plan.
*/
public interface PhysicalPlanner {
public PhysicalExec createPlan(TaskAttemptContext context,
- LogicalNode logicalPlan)
+ ExecutionPlan execPlan)
throws InternalException;
}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/7c97735e/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java
index db58e32..87daa8f 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -16,44 +16,45 @@
* limitations under the License.
*/
-/**
- *
- */
package org.apache.tajo.engine.planner;
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.ObjectArrays;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.Path;
-import org.apache.tajo.engine.planner.global.DataChannel;
-import org.apache.tajo.storage.fragment.FileFragment;
-import org.apache.tajo.storage.fragment.Fragment;
-import org.apache.tajo.storage.fragment.FragmentConvertor;
-import org.apache.tajo.worker.TaskAttemptContext;
import org.apache.tajo.catalog.Column;
import org.apache.tajo.catalog.SortSpec;
import org.apache.tajo.catalog.proto.CatalogProtos;
+import org.apache.tajo.catalog.proto.CatalogProtos.FragmentProto;
+import org.apache.tajo.catalog.proto.CatalogProtos.SortSpecProto;
import org.apache.tajo.conf.TajoConf;
import org.apache.tajo.engine.planner.enforce.Enforcer;
+import org.apache.tajo.engine.planner.global.DataChannel;
+import org.apache.tajo.engine.planner.global.ExecutionPlan;
+import org.apache.tajo.engine.planner.global.ExecutionPlanEdge.Tag;
import org.apache.tajo.engine.planner.logical.*;
import org.apache.tajo.engine.planner.physical.*;
import org.apache.tajo.exception.InternalException;
+import org.apache.tajo.ipc.TajoWorkerProtocol.EnforceProperty;
+import org.apache.tajo.ipc.TajoWorkerProtocol.EnforceProperty.EnforceType;
+import org.apache.tajo.ipc.TajoWorkerProtocol.GroupbyEnforce.GroupbyAlgorithm;
+import org.apache.tajo.ipc.TajoWorkerProtocol.JoinEnforce.JoinAlgorithm;
+import org.apache.tajo.ipc.TajoWorkerProtocol.PartitionType;
+import org.apache.tajo.ipc.TajoWorkerProtocol.SortEnforce;
import org.apache.tajo.storage.AbstractStorageManager;
import org.apache.tajo.storage.TupleComparator;
+import org.apache.tajo.storage.fragment.FileFragment;
+import org.apache.tajo.storage.fragment.FragmentConvertor;
import org.apache.tajo.util.IndexUtil;
import org.apache.tajo.util.TUtil;
+import org.apache.tajo.worker.TaskAttemptContext;
import java.io.IOException;
-import java.util.Collection;
+import java.util.ArrayList;
import java.util.List;
-import static org.apache.tajo.catalog.proto.CatalogProtos.FragmentProto;
-import static org.apache.tajo.ipc.TajoWorkerProtocol.*;
-import static org.apache.tajo.ipc.TajoWorkerProtocol.EnforceProperty.EnforceType;
-import static org.apache.tajo.ipc.TajoWorkerProtocol.GroupbyEnforce.GroupbyAlgorithm;
-import static org.apache.tajo.ipc.TajoWorkerProtocol.JoinEnforce.JoinAlgorithm;
-
public class PhysicalPlannerImpl implements PhysicalPlanner {
private static final Log LOG = LogFactory.getLog(PhysicalPlannerImpl.class);
private static final int UNGENERATED_PID = -1;
@@ -66,54 +67,74 @@ public class PhysicalPlannerImpl implements PhysicalPlanner {
this.sm = sm;
}
- public PhysicalExec createPlan(final TaskAttemptContext context, final LogicalNode logicalPlan)
+ public PhysicalExec createPlan(final TaskAttemptContext context, ExecutionPlan plan)
throws InternalException {
PhysicalExec execPlan;
try {
- execPlan = createPlanRecursive(context, logicalPlan);
- if (execPlan instanceof StoreTableExec
- || execPlan instanceof IndexedStoreExec
- || execPlan instanceof PartitionedStoreExec) {
- return execPlan;
- } else if (context.getDataChannel() != null) {
- return buildOutputOperator(context, logicalPlan, execPlan);
- } else {
- return execPlan;
- }
+ plan = checkOutputOperator(context, plan);
+ execPlan = createPlanRecursive(context, plan, plan.getTerminalNode());
+
+ return execPlan;
} catch (IOException ioe) {
throw new InternalException(ioe);
}
}
- private PhysicalExec buildOutputOperator(TaskAttemptContext context, LogicalNode plan,
- PhysicalExec execPlan) throws IOException {
- DataChannel channel = context.getDataChannel();
- StoreTableNode storeTableNode = new StoreTableNode(UNGENERATED_PID, channel.getTargetId().toString());
- storeTableNode.setStorageType(CatalogProtos.StoreType.CSV);
- storeTableNode.setInSchema(plan.getOutSchema());
- storeTableNode.setOutSchema(plan.getOutSchema());
- if (channel.getPartitionType() != PartitionType.NONE_PARTITION) {
- storeTableNode.setPartitions(channel.getPartitionType(), channel.getPartitionKey(), channel.getPartitionNum());
- } else {
- storeTableNode.setDefaultParition();
+ @VisibleForTesting
+ public PhysicalExec createPlanWithoutMaterialize(final TaskAttemptContext context, ExecutionPlan plan)
+ throws InternalException {
+ PhysicalExec execPlan;
+
+ try {
+ execPlan = createPlanRecursive(context, plan, plan.getTerminalNode());
+
+ return execPlan;
+ } catch (IOException ioe) {
+ throw new InternalException(ioe);
}
- storeTableNode.setChild(plan);
+ }
- PhysicalExec outExecPlan = createStorePlan(context, storeTableNode, execPlan);
- return outExecPlan;
+ private ExecutionPlan checkOutputOperator(TaskAttemptContext context, ExecutionPlan plan) {
+ LogicalNode root = plan.getTerminalNode();
+ List<DataChannel> channels = context.getOutgoingChannels();
+ for (DataChannel channel : channels) {
+ LogicalNode node = plan.getRootChild(channel.getSrcPID());
+ if (node.getType() != NodeType.STORE) {
+ StoreTableNode storeTableNode = new StoreTableNode(UNGENERATED_PID, channel.getTargetId().toString());
+ storeTableNode.setStorageType(CatalogProtos.StoreType.CSV);
+ storeTableNode.setInSchema(channel.getSchema());
+ storeTableNode.setOutSchema(channel.getSchema());
+ if (channel.getPartitionType() != PartitionType.NONE_PARTITION) {
+ storeTableNode.setPartitions(channel.getPartitionType(), channel.getPartitionKey(), channel.getPartitionNum());
+ } else {
+ storeTableNode.setDefaultParition();
+ }
+
+ plan.remove(node, root);
+ plan.add(node, storeTableNode, Tag.SINGLE);
+ plan.add(storeTableNode, root, Tag.SINGLE);
+ channel.updateSrcPID(storeTableNode.getPID());
+ }
+ }
+ return plan;
}
- private PhysicalExec createPlanRecursive(TaskAttemptContext ctx, LogicalNode logicalNode) throws IOException {
+ private PhysicalExec createPlanRecursive(TaskAttemptContext ctx, ExecutionPlan plan, LogicalNode logicalNode) throws IOException {
PhysicalExec leftExec;
PhysicalExec rightExec;
+ PhysicalExec currentExec;
switch (logicalNode.getType()) {
case ROOT:
LogicalRootNode rootNode = (LogicalRootNode) logicalNode;
- return createPlanRecursive(ctx, rootNode.getChild());
+ List<PhysicalExec> childExecs = new ArrayList<PhysicalExec>();
+ for (LogicalNode child : plan.getChilds(rootNode)) {
+ childExecs.add(createPlanRecursive(ctx, plan, child));
+ }
+ return new PhysicalRootExec(ctx, childExecs);
case EXPRS:
EvalExprNode evalExpr = (EvalExprNode) logicalNode;
@@ -121,60 +142,109 @@ public class PhysicalPlannerImpl implements PhysicalPlanner {
case STORE:
StoreTableNode storeNode = (StoreTableNode) logicalNode;
- leftExec = createPlanRecursive(ctx, storeNode.getChild());
+ leftExec = createPlanRecursive(ctx, plan, plan.getChilds(storeNode).get(0));
return createStorePlan(ctx, storeNode, leftExec);
case SELECTION:
SelectionNode selNode = (SelectionNode) logicalNode;
- leftExec = createPlanRecursive(ctx, selNode.getChild());
- return new SelectionExec(ctx, selNode, leftExec);
+ leftExec = createPlanRecursive(ctx, plan, plan.getChilds(selNode).get(0));
+ currentExec = new SelectionExec(ctx, selNode, leftExec);
+ if (plan.getParentCount(logicalNode) > 1) {
+ return new MultiOutputExec(ctx, currentExec.getSchema(), currentExec, plan.getParentCount(logicalNode));
+ } else {
+ return currentExec;
+ }
case PROJECTION:
ProjectionNode prjNode = (ProjectionNode) logicalNode;
- leftExec = createPlanRecursive(ctx, prjNode.getChild());
- return new ProjectionExec(ctx, prjNode, leftExec);
+ leftExec = createPlanRecursive(ctx, plan, plan.getChilds(prjNode).get(0));
+ currentExec = new ProjectionExec(ctx, prjNode, leftExec);
+ if (plan.getParentCount(logicalNode) > 1) {
+ return new MultiOutputExec(ctx, currentExec.getSchema(), currentExec, plan.getParentCount(logicalNode));
+ } else {
+ return currentExec;
+ }
case TABLE_SUBQUERY: {
TableSubQueryNode subQueryNode = (TableSubQueryNode) logicalNode;
- leftExec = createPlanRecursive(ctx, subQueryNode.getSubQuery());
- return leftExec;
+ leftExec = createPlanRecursive(ctx, plan, subQueryNode.getSubQuery());
+ if (plan.getParentCount(logicalNode) > 1) {
+ return new MultiOutputExec(ctx, leftExec.getSchema(), leftExec, plan.getParentCount(logicalNode));
+ } else {
+ return leftExec;
+ }
} case SCAN:
leftExec = createScanPlan(ctx, (ScanNode) logicalNode);
- return leftExec;
+ if (plan.getParentCount(logicalNode) > 1) {
+ return new MultiOutputExec(ctx, leftExec.getSchema(), leftExec, plan.getParentCount(logicalNode));
+ } else {
+ return leftExec;
+ }
case GROUP_BY:
GroupbyNode grpNode = (GroupbyNode) logicalNode;
- leftExec = createPlanRecursive(ctx, grpNode.getChild());
- return createGroupByPlan(ctx, grpNode, leftExec);
+ leftExec = createPlanRecursive(ctx, plan, plan.getChilds(grpNode).get(0));
+ currentExec = createGroupByPlan(ctx, plan, grpNode, leftExec);
+ if (plan.getParentCount(logicalNode) > 1) {
+ return new MultiOutputExec(ctx, currentExec.getSchema(), currentExec, plan.getParentCount(logicalNode));
+ } else {
+ return currentExec;
+ }
case SORT:
SortNode sortNode = (SortNode) logicalNode;
- leftExec = createPlanRecursive(ctx, sortNode.getChild());
- return createSortPlan(ctx, sortNode, leftExec);
+ leftExec = createPlanRecursive(ctx, plan, plan.getChilds(sortNode).get(0));
+ currentExec = createSortPlan(ctx, plan, sortNode, leftExec);
+ if (plan.getParentCount(logicalNode) > 1) {
+ return new MultiOutputExec(ctx, currentExec.getSchema(), currentExec, plan.getParentCount(logicalNode));
+ } else {
+ return currentExec;
+ }
case JOIN:
JoinNode joinNode = (JoinNode) logicalNode;
- leftExec = createPlanRecursive(ctx, joinNode.getLeftChild());
- rightExec = createPlanRecursive(ctx, joinNode.getRightChild());
- return createJoinPlan(ctx, joinNode, leftExec, rightExec);
+ List<LogicalNode> childs = plan.getChilds(joinNode);
+ leftExec = createPlanRecursive(ctx, plan, childs.get(0));
+ rightExec = createPlanRecursive(ctx, plan, childs.get(1));
+ currentExec = createJoinPlan(ctx, plan, joinNode, leftExec, rightExec);
+ if (plan.getParentCount(logicalNode) > 1) {
+ return new MultiOutputExec(ctx, currentExec.getSchema(), currentExec, plan.getParentCount(logicalNode));
+ } else {
+ return currentExec;
+ }
case UNION:
UnionNode unionNode = (UnionNode) logicalNode;
- leftExec = createPlanRecursive(ctx, unionNode.getLeftChild());
- rightExec = createPlanRecursive(ctx, unionNode.getRightChild());
- return new UnionExec(ctx, leftExec, rightExec);
+ childs = plan.getChilds(unionNode);
+ leftExec = createPlanRecursive(ctx, plan, childs.get(0));
+ rightExec = createPlanRecursive(ctx, plan, childs.get(1));
+ currentExec = new UnionExec(ctx, leftExec, rightExec);
+ if (plan.getParentCount(logicalNode) > 1) {
+ return new MultiOutputExec(ctx, currentExec.getSchema(), currentExec, plan.getParentCount(logicalNode));
+ } else {
+ return currentExec;
+ }
case LIMIT:
LimitNode limitNode = (LimitNode) logicalNode;
- leftExec = createPlanRecursive(ctx, limitNode.getChild());
- return new LimitExec(ctx, limitNode.getInSchema(),
+ leftExec = createPlanRecursive(ctx, plan, plan.getChilds(limitNode).get(0));
+ currentExec = new LimitExec(ctx, limitNode.getInSchema(),
limitNode.getOutSchema(), leftExec, limitNode);
+ if (plan.getParentCount(logicalNode) > 1) {
+ return new MultiOutputExec(ctx, currentExec.getSchema(), currentExec, plan.getParentCount(logicalNode));
+ } else {
+ return currentExec;
+ }
case BST_INDEX_SCAN:
IndexScanNode indexScanNode = (IndexScanNode) logicalNode;
- leftExec = createIndexScanExec(ctx, indexScanNode);
- return leftExec;
+ currentExec = createIndexScanExec(ctx, indexScanNode);
+ if (plan.getParentCount(logicalNode) > 1) {
+ return new MultiOutputExec(ctx, currentExec.getSchema(), currentExec, plan.getParentCount(logicalNode));
+ } else {
+ return currentExec;
+ }
default:
return null;
@@ -194,107 +264,108 @@ public class PhysicalPlannerImpl implements PhysicalPlanner {
return size;
}
- public PhysicalExec createJoinPlan(TaskAttemptContext context, JoinNode joinNode, PhysicalExec leftExec,
- PhysicalExec rightExec) throws IOException {
+ public PhysicalExec createJoinPlan(TaskAttemptContext context, ExecutionPlan plan, JoinNode joinNode,
+ PhysicalExec leftExec, PhysicalExec rightExec) throws IOException {
switch (joinNode.getJoinType()) {
case CROSS:
- return createCrossJoinPlan(context, joinNode, leftExec, rightExec);
+ return createCrossJoinPlan(context, plan, joinNode, leftExec, rightExec);
case INNER:
- return createInnerJoinPlan(context, joinNode, leftExec, rightExec);
+ return createInnerJoinPlan(context, plan, joinNode, leftExec, rightExec);
case LEFT_OUTER:
- return createLeftOuterJoinPlan(context, joinNode, leftExec, rightExec);
+ return createLeftOuterJoinPlan(context, plan, joinNode, leftExec, rightExec);
case RIGHT_OUTER:
- return createRightOuterJoinPlan(context, joinNode, leftExec, rightExec);
+ return createRightOuterJoinPlan(context, plan, joinNode, leftExec, rightExec);
case FULL_OUTER:
- return createFullOuterJoinPlan(context, joinNode, leftExec, rightExec);
+ return createFullOuterJoinPlan(context, plan, joinNode, leftExec, rightExec);
case LEFT_SEMI:
- return createLeftSemiJoinPlan(context, joinNode, leftExec, rightExec);
+ return createLeftSemiJoinPlan(context, plan, joinNode, leftExec, rightExec);
case RIGHT_SEMI:
- return createRightSemiJoinPlan(context, joinNode, leftExec, rightExec);
+ return createRightSemiJoinPlan(context, plan, joinNode, leftExec, rightExec);
case LEFT_ANTI:
- return createLeftAntiJoinPlan(context, joinNode, leftExec, rightExec);
+ return createLeftAntiJoinPlan(context, plan, joinNode, leftExec, rightExec);
case RIGHT_ANTI:
- return createRightAntiJoinPlan(context, joinNode, leftExec, rightExec);
+ return createRightAntiJoinPlan(context, plan, joinNode, leftExec, rightExec);
default:
throw new PhysicalPlanningException("Cannot support join type: " + joinNode.getJoinType().name());
}
}
- private PhysicalExec createCrossJoinPlan(TaskAttemptContext context, JoinNode plan,
+ private PhysicalExec createCrossJoinPlan(TaskAttemptContext context, ExecutionPlan plan, JoinNode join,
PhysicalExec leftExec, PhysicalExec rightExec) throws IOException {
Enforcer enforcer = context.getEnforcer();
- EnforceProperty property = getAlgorithmEnforceProperty(enforcer, plan);
+ EnforceProperty property = getAlgorithmEnforceProperty(enforcer, join);
if (property != null) {
JoinAlgorithm algorithm = property.getJoin().getAlgorithm();
switch (algorithm) {
case NESTED_LOOP_JOIN:
- LOG.info("Join (" + plan.getPID() +") chooses [Nested Loop Join]");
- return new NLJoinExec(context, plan, leftExec, rightExec);
+ LOG.info("Join (" + join.getPID() +") chooses [Nested Loop Join]");
+ return new NLJoinExec(context, join, leftExec, rightExec);
case BLOCK_NESTED_LOOP_JOIN:
- LOG.info("Join (" + plan.getPID() +") chooses [Block Nested Loop Join]");
- return new BNLJoinExec(context, plan, leftExec, rightExec);
+ LOG.info("Join (" + join.getPID() +") chooses [Block Nested Loop Join]");
+ return new BNLJoinExec(context, join, leftExec, rightExec);
default:
// fallback algorithm
LOG.error("Invalid Cross Join Algorithm Enforcer: " + algorithm.name());
- return new BNLJoinExec(context, plan, leftExec, rightExec);
+ return new BNLJoinExec(context, join, leftExec, rightExec);
}
} else {
- return new BNLJoinExec(context, plan, leftExec, rightExec);
+ return new BNLJoinExec(context, join, leftExec, rightExec);
}
}
- private PhysicalExec createInnerJoinPlan(TaskAttemptContext context, JoinNode plan,
+ private PhysicalExec createInnerJoinPlan(TaskAttemptContext context, ExecutionPlan plan, JoinNode node,
PhysicalExec leftExec, PhysicalExec rightExec) throws IOException {
Enforcer enforcer = context.getEnforcer();
- EnforceProperty property = getAlgorithmEnforceProperty(enforcer, plan);
+ EnforceProperty property = getAlgorithmEnforceProperty(enforcer, node);
if (property != null) {
JoinAlgorithm algorithm = property.getJoin().getAlgorithm();
switch (algorithm) {
case NESTED_LOOP_JOIN:
- LOG.info("Join (" + plan.getPID() +") chooses [Nested Loop Join]");
- return new NLJoinExec(context, plan, leftExec, rightExec);
+ LOG.info("Join (" + node.getPID() +") chooses [Nested Loop Join]");
+ return new NLJoinExec(context, node, leftExec, rightExec);
case BLOCK_NESTED_LOOP_JOIN:
- LOG.info("Join (" + plan.getPID() +") chooses [Block Nested Loop Join]");
- return new BNLJoinExec(context, plan, leftExec, rightExec);
+ LOG.info("Join (" + node.getPID() +") chooses [Block Nested Loop Join]");
+ return new BNLJoinExec(context, node, leftExec, rightExec);
case IN_MEMORY_HASH_JOIN:
- LOG.info("Join (" + plan.getPID() +") chooses [In-memory Hash Join]");
- return new HashJoinExec(context, plan, leftExec, rightExec);
+ LOG.info("Join (" + node.getPID() +") chooses [In-memory Hash Join]");
+ return new HashJoinExec(context, node, leftExec, rightExec);
case MERGE_JOIN:
- LOG.info("Join (" + plan.getPID() +") chooses [Sort Merge Join]");
- return createMergeInnerJoin(context, plan, leftExec, rightExec);
+ LOG.info("Join (" + node.getPID() +") chooses [Sort Merge Join]");
+ return createMergeInnerJoin(context, node, leftExec, rightExec);
case HYBRID_HASH_JOIN:
default:
LOG.error("Invalid Inner Join Algorithm Enforcer: " + algorithm.name());
LOG.error("Choose a fallback inner join algorithm: " + JoinAlgorithm.MERGE_JOIN.name());
- return createMergeInnerJoin(context, plan, leftExec, rightExec);
+ return createMergeInnerJoin(context, node, leftExec, rightExec);
}
} else {
- return createBestInnerJoinPlan(context, plan, leftExec, rightExec);
+ return createBestInnerJoinPlan(context, plan, node, leftExec, rightExec);
}
}
- private PhysicalExec createBestInnerJoinPlan(TaskAttemptContext context, JoinNode plan,
+ private PhysicalExec createBestInnerJoinPlan(TaskAttemptContext context, ExecutionPlan plan, JoinNode node,
PhysicalExec leftExec, PhysicalExec rightExec) throws IOException {
- String [] leftLineage = PlannerUtil.getLineage(plan.getLeftChild());
- String [] rightLineage = PlannerUtil.getLineage(plan.getRightChild());
+ List<LogicalNode> childs = plan.getChilds(node);
+ String [] leftLineage = PlannerUtil.getLineage(plan, childs.get(0));
+ String [] rightLineage = PlannerUtil.getLineage(plan, childs.get(1));
long leftSize = estimateSizeRecursive(context, leftLineage);
long rightSize = estimateSizeRecursive(context, rightLineage);
@@ -318,10 +389,10 @@ public class PhysicalPlannerImpl implements PhysicalPlanner {
selectedOuter = leftExec;
}
- LOG.info("Join (" + plan.getPID() +") chooses [InMemory Hash Join]");
- return new HashJoinExec(context, plan, selectedOuter, selectedInner);
+ LOG.info("Join (" + node.getPID() +") chooses [InMemory Hash Join]");
+ return new HashJoinExec(context, node, selectedOuter, selectedInner);
} else {
- return createMergeInnerJoin(context, plan, leftExec, rightExec);
+ return createMergeInnerJoin(context, node, leftExec, rightExec);
}
}
@@ -340,58 +411,58 @@ public class PhysicalPlannerImpl implements PhysicalPlanner {
return new MergeJoinExec(context, plan, outerSort, innerSort, sortSpecs[0], sortSpecs[1]);
}
- private PhysicalExec createLeftOuterJoinPlan(TaskAttemptContext context, JoinNode plan,
+ private PhysicalExec createLeftOuterJoinPlan(TaskAttemptContext context, ExecutionPlan plan, JoinNode join,
PhysicalExec leftExec, PhysicalExec rightExec) throws IOException {
Enforcer enforcer = context.getEnforcer();
- EnforceProperty property = getAlgorithmEnforceProperty(enforcer, plan);
+ EnforceProperty property = getAlgorithmEnforceProperty(enforcer, join);
if (property != null) {
JoinAlgorithm algorithm = property.getJoin().getAlgorithm();
switch (algorithm) {
case IN_MEMORY_HASH_JOIN:
- LOG.info("Left Outer Join (" + plan.getPID() +") chooses [Hash Join].");
- return new HashLeftOuterJoinExec(context, plan, leftExec, rightExec);
+ LOG.info("Left Outer Join (" + join.getPID() +") chooses [Hash Join].");
+ return new HashLeftOuterJoinExec(context, join, leftExec, rightExec);
case NESTED_LOOP_JOIN:
//the right operand is too large, so we opt for NL implementation of left outer join
- LOG.info("Left Outer Join (" + plan.getPID() +") chooses [Nested Loop Join].");
- return new NLLeftOuterJoinExec(context, plan, leftExec, rightExec);
+ LOG.info("Left Outer Join (" + join.getPID() +") chooses [Nested Loop Join].");
+ return new NLLeftOuterJoinExec(context, join, leftExec, rightExec);
default:
LOG.error("Invalid Left Outer Join Algorithm Enforcer: " + algorithm.name());
LOG.error("Choose a fallback inner join algorithm: " + JoinAlgorithm.IN_MEMORY_HASH_JOIN.name());
- return new HashLeftOuterJoinExec(context, plan, leftExec, rightExec);
+ return new HashLeftOuterJoinExec(context, join, leftExec, rightExec);
}
} else {
- return createBestLeftOuterJoinPlan(context, plan, leftExec, rightExec);
+ return createBestLeftOuterJoinPlan(context, plan, join, leftExec, rightExec);
}
}
- private PhysicalExec createBestLeftOuterJoinPlan(TaskAttemptContext context, JoinNode plan,
+ private PhysicalExec createBestLeftOuterJoinPlan(TaskAttemptContext context, ExecutionPlan plan, JoinNode join,
PhysicalExec leftExec, PhysicalExec rightExec) throws IOException {
- String [] rightLineage = PlannerUtil.getLineage(plan.getRightChild());
+ String [] rightLineage = PlannerUtil.getLineage(plan.getChild(join, 1));
long rightTableVolume = estimateSizeRecursive(context, rightLineage);
if (rightTableVolume < conf.getLongVar(TajoConf.ConfVars.EXECUTOR_OUTER_JOIN_INMEMORY_HASH_THRESHOLD)) {
// we can implement left outer join using hash join, using the right operand as the build relation
- LOG.info("Left Outer Join (" + plan.getPID() +") chooses [Hash Join].");
- return new HashLeftOuterJoinExec(context, plan, leftExec, rightExec);
+ LOG.info("Left Outer Join (" + join.getPID() +") chooses [Hash Join].");
+ return new HashLeftOuterJoinExec(context, join, leftExec, rightExec);
}
else {
//the right operand is too large, so we opt for NL implementation of left outer join
- LOG.info("Left Outer Join (" + plan.getPID() +") chooses [Nested Loop Join].");
- return new NLLeftOuterJoinExec(context, plan, leftExec, rightExec);
+ LOG.info("Left Outer Join (" + join.getPID() +") chooses [Nested Loop Join].");
+ return new NLLeftOuterJoinExec(context, join, leftExec, rightExec);
}
}
- private PhysicalExec createBestRightJoinPlan(TaskAttemptContext context, JoinNode plan,
+ private PhysicalExec createBestRightJoinPlan(TaskAttemptContext context, ExecutionPlan plan, JoinNode join,
PhysicalExec leftExec, PhysicalExec rightExec) throws IOException {
//if the left operand is small enough => implement it as a left outer hash join with exchanged operators (note:
// blocking, but merge join is blocking as well)
- String [] outerLineage4 = PlannerUtil.getLineage(plan.getLeftChild());
+ String [] outerLineage4 = PlannerUtil.getLineage(plan.getChild(join, 0));
long outerSize = estimateSizeRecursive(context, outerLineage4);
if (outerSize < conf.getLongVar(TajoConf.ConfVars.EXECUTOR_OUTER_JOIN_INMEMORY_HASH_THRESHOLD)){
- LOG.info("Right Outer Join (" + plan.getPID() +") chooses [Hash Join].");
- return new HashLeftOuterJoinExec(context, plan, rightExec, leftExec);
+ LOG.info("Right Outer Join (" + join.getPID() +") chooses [Hash Join].");
+ return new HashLeftOuterJoinExec(context, join, rightExec, leftExec);
} else {
- return createRightOuterMergeJoinPlan(context, plan, leftExec, rightExec);
+ return createRightOuterMergeJoinPlan(context, join, leftExec, rightExec);
}
}
@@ -408,56 +479,56 @@ public class PhysicalPlannerImpl implements PhysicalPlanner {
return new RightOuterMergeJoinExec(context, plan, outerSort2, innerSort2, sortSpecs2[0], sortSpecs2[1]);
}
- private PhysicalExec createRightOuterJoinPlan(TaskAttemptContext context, JoinNode plan,
+ private PhysicalExec createRightOuterJoinPlan(TaskAttemptContext context, ExecutionPlan plan, JoinNode join,
PhysicalExec leftExec, PhysicalExec rightExec) throws IOException {
Enforcer enforcer = context.getEnforcer();
- EnforceProperty property = getAlgorithmEnforceProperty(enforcer, plan);
+ EnforceProperty property = getAlgorithmEnforceProperty(enforcer, join);
if (property != null) {
JoinAlgorithm algorithm = property.getJoin().getAlgorithm();
switch (algorithm) {
case IN_MEMORY_HASH_JOIN:
- LOG.info("Right Outer Join (" + plan.getPID() +") chooses [Hash Join].");
- return new HashLeftOuterJoinExec(context, plan, rightExec, leftExec);
+ LOG.info("Right Outer Join (" + join.getPID() +") chooses [Hash Join].");
+ return new HashLeftOuterJoinExec(context, join, rightExec, leftExec);
case MERGE_JOIN:
- return createRightOuterMergeJoinPlan(context, plan, leftExec, rightExec);
+ return createRightOuterMergeJoinPlan(context, join, leftExec, rightExec);
default:
LOG.error("Invalid Right Outer Join Algorithm Enforcer: " + algorithm.name());
LOG.error("Choose a fallback merge join algorithm: " + JoinAlgorithm.MERGE_JOIN.name());
- return createRightOuterMergeJoinPlan(context, plan, leftExec, rightExec);
+ return createRightOuterMergeJoinPlan(context, join, leftExec, rightExec);
}
} else {
- return createBestRightJoinPlan(context, plan, leftExec, rightExec);
+ return createBestRightJoinPlan(context, plan, join, leftExec, rightExec);
}
}
- private PhysicalExec createFullOuterJoinPlan(TaskAttemptContext context, JoinNode plan,
+ private PhysicalExec createFullOuterJoinPlan(TaskAttemptContext context, ExecutionPlan plan, JoinNode join,
PhysicalExec leftExec, PhysicalExec rightExec) throws IOException {
Enforcer enforcer = context.getEnforcer();
- EnforceProperty property = getAlgorithmEnforceProperty(enforcer, plan);
+ EnforceProperty property = getAlgorithmEnforceProperty(enforcer, join);
if (property != null) {
JoinAlgorithm algorithm = property.getJoin().getAlgorithm();
switch (algorithm) {
case IN_MEMORY_HASH_JOIN:
- return createFullOuterHashJoinPlan(context, plan, leftExec, rightExec);
+ return createFullOuterHashJoinPlan(context, plan, join, leftExec, rightExec);
case MERGE_JOIN:
- return createFullOuterMergeJoinPlan(context, plan, leftExec, rightExec);
+ return createFullOuterMergeJoinPlan(context, join, leftExec, rightExec);
default:
LOG.error("Invalid Full Outer Join Algorithm Enforcer: " + algorithm.name());
LOG.error("Choose a fallback merge join algorithm: " + JoinAlgorithm.MERGE_JOIN.name());
- return createFullOuterMergeJoinPlan(context, plan, leftExec, rightExec);
+ return createFullOuterMergeJoinPlan(context, join, leftExec, rightExec);
}
} else {
- return createBestFullOuterJoinPlan(context, plan, leftExec, rightExec);
+ return createBestFullOuterJoinPlan(context, plan, join, leftExec, rightExec);
}
}
- private HashFullOuterJoinExec createFullOuterHashJoinPlan(TaskAttemptContext context, JoinNode plan,
+ private HashFullOuterJoinExec createFullOuterHashJoinPlan(TaskAttemptContext context, ExecutionPlan plan, JoinNode join,
PhysicalExec leftExec, PhysicalExec rightExec)
throws IOException {
- String [] leftLineage = PlannerUtil.getLineage(plan.getLeftChild());
- String [] rightLineage = PlannerUtil.getLineage(plan.getRightChild());
+ String [] leftLineage = PlannerUtil.getLineage(plan.getChild(join, 0));
+ String [] rightLineage = PlannerUtil.getLineage(plan.getChild(join, 1));
long outerSize2 = estimateSizeRecursive(context, leftLineage);
long innerSize2 = estimateSizeRecursive(context, rightLineage);
@@ -472,8 +543,8 @@ public class PhysicalPlannerImpl implements PhysicalPlanner {
selectedLeft = rightExec;
selectedRight = leftExec;
}
- LOG.info("Full Outer Join (" + plan.getPID() +") chooses [Hash Join]");
- return new HashFullOuterJoinExec(context, plan, selectedRight, selectedLeft);
+ LOG.info("Full Outer Join (" + join.getPID() +") chooses [Hash Join]");
+ return new HashFullOuterJoinExec(context, join, selectedRight, selectedLeft);
}
private MergeFullOuterJoinExec createFullOuterMergeJoinPlan(TaskAttemptContext context, JoinNode plan,
@@ -491,117 +562,117 @@ public class PhysicalPlannerImpl implements PhysicalPlanner {
return new MergeFullOuterJoinExec(context, plan, outerSort3, innerSort3, sortSpecs3[0], sortSpecs3[1]);
}
- private PhysicalExec createBestFullOuterJoinPlan(TaskAttemptContext context, JoinNode plan,
+ private PhysicalExec createBestFullOuterJoinPlan(TaskAttemptContext context, ExecutionPlan plan, JoinNode join,
PhysicalExec leftExec, PhysicalExec rightExec) throws IOException {
- String [] leftLineage = PlannerUtil.getLineage(plan.getLeftChild());
- String [] rightLineage = PlannerUtil.getLineage(plan.getRightChild());
+ String [] leftLineage = PlannerUtil.getLineage(plan.getChild(join, 0));
+ String [] rightLineage = PlannerUtil.getLineage(plan.getChild(join, 1));
long outerSize2 = estimateSizeRecursive(context, leftLineage);
long innerSize2 = estimateSizeRecursive(context, rightLineage);
final long threshold = 1048576 * 128;
if (outerSize2 < threshold || innerSize2 < threshold) {
- return createFullOuterHashJoinPlan(context, plan, leftExec, rightExec);
+ return createFullOuterHashJoinPlan(context, plan, join, leftExec, rightExec);
} else {
- return createFullOuterMergeJoinPlan(context, plan, leftExec, rightExec);
+ return createFullOuterMergeJoinPlan(context, join, leftExec, rightExec);
}
}
/**
* Left semi join means that the left side is the IN side table, and the right side is the FROM side table.
*/
- private PhysicalExec createLeftSemiJoinPlan(TaskAttemptContext context, JoinNode plan,
+ private PhysicalExec createLeftSemiJoinPlan(TaskAttemptContext context, ExecutionPlan plan, JoinNode join,
PhysicalExec leftExec, PhysicalExec rightExec) throws IOException {
Enforcer enforcer = context.getEnforcer();
- EnforceProperty property = getAlgorithmEnforceProperty(enforcer, plan);
+ EnforceProperty property = getAlgorithmEnforceProperty(enforcer, join);
if (property != null) {
JoinAlgorithm algorithm = property.getJoin().getAlgorithm();
switch (algorithm) {
case IN_MEMORY_HASH_JOIN:
- LOG.info("Left Semi Join (" + plan.getPID() +") chooses [In Memory Hash Join].");
- return new HashLeftSemiJoinExec(context, plan, leftExec, rightExec);
+ LOG.info("Left Semi Join (" + join.getPID() +") chooses [In Memory Hash Join].");
+ return new HashLeftSemiJoinExec(context, join, leftExec, rightExec);
default:
LOG.error("Invalid Left Semi Join Algorithm Enforcer: " + algorithm.name());
LOG.error("Choose a fallback inner join algorithm: " + JoinAlgorithm.IN_MEMORY_HASH_JOIN.name());
- return new HashLeftOuterJoinExec(context, plan, leftExec, rightExec);
+ return new HashLeftOuterJoinExec(context, join, leftExec, rightExec);
}
} else {
- LOG.info("Left Semi Join (" + plan.getPID() +") chooses [In Memory Hash Join].");
- return new HashLeftSemiJoinExec(context, plan, leftExec, rightExec);
+ LOG.info("Left Semi Join (" + join.getPID() +") chooses [In Memory Hash Join].");
+ return new HashLeftSemiJoinExec(context, join, leftExec, rightExec);
}
}
/**
* Left semi join means that the left side is the FROM side table, and the right side is the IN side table.
*/
- private PhysicalExec createRightSemiJoinPlan(TaskAttemptContext context, JoinNode plan,
+ private PhysicalExec createRightSemiJoinPlan(TaskAttemptContext context, ExecutionPlan plan, JoinNode join,
PhysicalExec leftExec, PhysicalExec rightExec) throws IOException {
Enforcer enforcer = context.getEnforcer();
- EnforceProperty property = getAlgorithmEnforceProperty(enforcer, plan);
+ EnforceProperty property = getAlgorithmEnforceProperty(enforcer, join);
if (property != null) {
JoinAlgorithm algorithm = property.getJoin().getAlgorithm();
switch (algorithm) {
case IN_MEMORY_HASH_JOIN:
- LOG.info("Left Semi Join (" + plan.getPID() +") chooses [In Memory Hash Join].");
- return new HashLeftSemiJoinExec(context, plan, rightExec, leftExec);
+ LOG.info("Left Semi Join (" + join.getPID() +") chooses [In Memory Hash Join].");
+ return new HashLeftSemiJoinExec(context, join, rightExec, leftExec);
default:
LOG.error("Invalid Left Semi Join Algorithm Enforcer: " + algorithm.name());
LOG.error("Choose a fallback inner join algorithm: " + JoinAlgorithm.IN_MEMORY_HASH_JOIN.name());
- return new HashLeftOuterJoinExec(context, plan, rightExec, leftExec);
+ return new HashLeftOuterJoinExec(context, join, rightExec, leftExec);
}
} else {
- LOG.info("Left Semi Join (" + plan.getPID() +") chooses [In Memory Hash Join].");
- return new HashLeftSemiJoinExec(context, plan, rightExec, leftExec);
+ LOG.info("Left Semi Join (" + join.getPID() +") chooses [In Memory Hash Join].");
+ return new HashLeftSemiJoinExec(context, join, rightExec, leftExec);
}
}
/**
* Left semi join means that the left side is the FROM side table, and the right side is the IN side table.
*/
- private PhysicalExec createLeftAntiJoinPlan(TaskAttemptContext context, JoinNode plan,
+ private PhysicalExec createLeftAntiJoinPlan(TaskAttemptContext context, ExecutionPlan plan, JoinNode join,
PhysicalExec leftExec, PhysicalExec rightExec) throws IOException {
Enforcer enforcer = context.getEnforcer();
- EnforceProperty property = getAlgorithmEnforceProperty(enforcer, plan);
+ EnforceProperty property = getAlgorithmEnforceProperty(enforcer, join);
if (property != null) {
JoinAlgorithm algorithm = property.getJoin().getAlgorithm();
switch (algorithm) {
case IN_MEMORY_HASH_JOIN:
- LOG.info("Left Semi Join (" + plan.getPID() +") chooses [In Memory Hash Join].");
- return new HashLeftAntiJoinExec(context, plan, leftExec, rightExec);
+ LOG.info("Left Semi Join (" + join.getPID() +") chooses [In Memory Hash Join].");
+ return new HashLeftAntiJoinExec(context, join, leftExec, rightExec);
default:
LOG.error("Invalid Left Semi Join Algorithm Enforcer: " + algorithm.name());
LOG.error("Choose a fallback inner join algorithm: " + JoinAlgorithm.IN_MEMORY_HASH_JOIN.name());
- return new HashLeftAntiJoinExec(context, plan, leftExec, rightExec);
+ return new HashLeftAntiJoinExec(context, join, leftExec, rightExec);
}
} else {
- LOG.info("Left Semi Join (" + plan.getPID() +") chooses [In Memory Hash Join].");
- return new HashLeftAntiJoinExec(context, plan, leftExec, rightExec);
+ LOG.info("Left Semi Join (" + join.getPID() +") chooses [In Memory Hash Join].");
+ return new HashLeftAntiJoinExec(context, join, leftExec, rightExec);
}
}
/**
* Left semi join means that the left side is the FROM side table, and the right side is the IN side table.
*/
- private PhysicalExec createRightAntiJoinPlan(TaskAttemptContext context, JoinNode plan,
+ private PhysicalExec createRightAntiJoinPlan(TaskAttemptContext context, ExecutionPlan plan, JoinNode join,
PhysicalExec leftExec, PhysicalExec rightExec) throws IOException {
Enforcer enforcer = context.getEnforcer();
- EnforceProperty property = getAlgorithmEnforceProperty(enforcer, plan);
+ EnforceProperty property = getAlgorithmEnforceProperty(enforcer, join);
if (property != null) {
JoinAlgorithm algorithm = property.getJoin().getAlgorithm();
switch (algorithm) {
case IN_MEMORY_HASH_JOIN:
- LOG.info("Left Semi Join (" + plan.getPID() +") chooses [In Memory Hash Join].");
- return new HashLeftSemiJoinExec(context, plan, rightExec, leftExec);
+ LOG.info("Left Semi Join (" + join.getPID() +") chooses [In Memory Hash Join].");
+ return new HashLeftSemiJoinExec(context, join, rightExec, leftExec);
default:
LOG.error("Invalid Left Semi Join Algorithm Enforcer: " + algorithm.name());
LOG.error("Choose a fallback inner join algorithm: " + JoinAlgorithm.IN_MEMORY_HASH_JOIN.name());
- return new HashLeftOuterJoinExec(context, plan, rightExec, leftExec);
+ return new HashLeftOuterJoinExec(context, join, rightExec, leftExec);
}
} else {
- LOG.info("Left Semi Join (" + plan.getPID() +") chooses [In Memory Hash Join].");
- return new HashLeftSemiJoinExec(context, plan, rightExec, leftExec);
+ LOG.info("Left Semi Join (" + join.getPID() +") chooses [In Memory Hash Join].");
+ return new HashLeftSemiJoinExec(context, join, rightExec, leftExec);
}
}
@@ -609,7 +680,13 @@ public class PhysicalPlannerImpl implements PhysicalPlanner {
StoreTableNode plan, PhysicalExec subOp) throws IOException {
if (plan.getPartitionType() == PartitionType.HASH_PARTITION
|| plan.getPartitionType() == PartitionType.RANGE_PARTITION) {
- switch (ctx.getDataChannel().getPartitionType()) {
+ DataChannel channel = null;
+ for (DataChannel outChannel : ctx.getOutgoingChannels()) {
+ if (outChannel.getSrcPID() == plan.getPID()) {
+ channel = outChannel;
+ }
+ }
+ switch (channel.getPartitionType()) {
case HASH_PARTITION:
return new PartitionedStoreExec(ctx, sm, plan, subOp);
@@ -620,10 +697,10 @@ public class PhysicalPlannerImpl implements PhysicalPlanner {
if (sortExec != null) {
sortSpecs = sortExec.getSortSpecs();
} else {
- Column[] columns = ctx.getDataChannel().getPartitionKey();
- SortSpec specs[] = new SortSpec[columns.length];
+ Column[] columns = channel.getPartitionKey();
+ sortSpecs= new SortSpec[columns.length];
for (int i = 0; i < columns.length; i++) {
- specs[i] = new SortSpec(columns[i]);
+ sortSpecs[i] = new SortSpec(columns[i]);
}
}
@@ -642,11 +719,12 @@ public class PhysicalPlannerImpl implements PhysicalPlanner {
Preconditions.checkNotNull(ctx.getTable(scanNode.getCanonicalName()),
"Error: There is no table matched to %s", scanNode.getCanonicalName() + "(" + scanNode.getTableName() + ")");
- FragmentProto [] fragments = ctx.getTables(scanNode.getCanonicalName());
+ FragmentProto[] fragments = ctx.getTables(scanNode.getCanonicalName());
return new SeqScanExec(ctx, sm, scanNode, fragments);
}
- public PhysicalExec createGroupByPlan(TaskAttemptContext context,GroupbyNode groupbyNode, PhysicalExec subOp)
+ public PhysicalExec createGroupByPlan(TaskAttemptContext context, ExecutionPlan plan, GroupbyNode groupbyNode,
+ PhysicalExec subOp)
throws IOException {
Enforcer enforcer = context.getEnforcer();
@@ -659,7 +737,7 @@ public class PhysicalPlannerImpl implements PhysicalPlanner {
return createSortAggregation(context, property, groupbyNode, subOp);
}
}
- return createBestAggregationPlan(context, groupbyNode, subOp);
+ return createBestAggregationPlan(context, plan, groupbyNode, subOp);
}
private PhysicalExec createInMemoryHashAggregation(TaskAttemptContext ctx,GroupbyNode groupbyNode, PhysicalExec subOp)
@@ -678,7 +756,7 @@ public class PhysicalPlannerImpl implements PhysicalPlanner {
}
if (property != null) {
- List<CatalogProtos.SortSpecProto> sortSpecProtos = property.getGroupby().getSortSpecsList();
+ List<SortSpecProto> sortSpecProtos = property.getGroupby().getSortSpecsList();
SortSpec[] enforcedSortSpecs = new SortSpec[sortSpecProtos.size()];
int i = 0;
@@ -692,20 +770,19 @@ public class PhysicalPlannerImpl implements PhysicalPlanner {
SortNode sortNode = new SortNode(-1, sortSpecs);
sortNode.setInSchema(subOp.getSchema());
sortNode.setOutSchema(subOp.getSchema());
- // SortExec sortExec = new SortExec(sortNode, child);
ExternalSortExec sortExec = new ExternalSortExec(ctx, sm, sortNode, subOp);
LOG.info("The planner chooses [Sort Aggregation] in (" + TUtil.arrayToString(sortSpecs) + ")");
return new SortAggregateExec(ctx, groupbyNode, sortExec);
}
- private PhysicalExec createBestAggregationPlan(TaskAttemptContext context, GroupbyNode groupbyNode,
+ private PhysicalExec createBestAggregationPlan(TaskAttemptContext context, ExecutionPlan plan, GroupbyNode groupbyNode,
PhysicalExec subOp) throws IOException {
Column[] grpColumns = groupbyNode.getGroupingColumns();
if (grpColumns.length == 0) {
return createInMemoryHashAggregation(context, groupbyNode, subOp);
}
- String [] outerLineage = PlannerUtil.getLineage(groupbyNode.getChild());
+ String [] outerLineage = PlannerUtil.getLineage(plan, plan.getChilds(groupbyNode).get(0));
long estimatedSize = estimateSizeRecursive(context, outerLineage);
final long threshold = conf.getLongVar(TajoConf.ConfVars.EXECUTOR_GROUPBY_INMEMORY_HASH_THRESHOLD);
@@ -719,7 +796,7 @@ public class PhysicalPlannerImpl implements PhysicalPlanner {
}
}
- public PhysicalExec createSortPlan(TaskAttemptContext context, SortNode sortNode,
+ public PhysicalExec createSortPlan(TaskAttemptContext context, ExecutionPlan plan, SortNode sortNode,
PhysicalExec child) throws IOException {
Enforcer enforcer = context.getEnforcer();
EnforceProperty property = getAlgorithmEnforceProperty(enforcer, sortNode);
@@ -732,16 +809,16 @@ public class PhysicalPlannerImpl implements PhysicalPlanner {
}
}
- return createBestSortPlan(context, sortNode, child);
+ return createBestSortPlan(context, plan, sortNode, child);
}
- public SortExec createBestSortPlan(TaskAttemptContext context, SortNode sortNode,
+ public SortExec createBestSortPlan(TaskAttemptContext context, ExecutionPlan plan, SortNode sortNode,
PhysicalExec child) throws IOException {
- String [] outerLineage = PlannerUtil.getLineage(sortNode.getChild());
+ String [] outerLineage = PlannerUtil.getLineage(plan, plan.getChilds(sortNode).get(0));
long estimatedSize = estimateSizeRecursive(context, outerLineage);
final long threshold = 1048576 * 2000;
- // if the relation size is less than the reshold,
+ // if the relation size is less than thereshold,
// the in-memory sort will be used.
if (estimatedSize <= threshold) {
return new MemSortExec(context, sortNode, child);
@@ -757,17 +834,19 @@ public class PhysicalPlannerImpl implements PhysicalPlanner {
Preconditions.checkNotNull(ctx.getTable(annotation.getCanonicalName()),
"Error: There is no table matched to %s", annotation.getCanonicalName());
- FragmentProto [] fragmentProtos = ctx.getTables(annotation.getTableName());
+ FragmentProto[] fragmentProtos = ctx.getTables(annotation.getTableName());
List<FileFragment> fragments =
FragmentConvertor.convert(ctx.getConf(), CatalogProtos.StoreType.CSV, fragmentProtos);
- String indexName = IndexUtil.getIndexNameOfFrag(fragments.get(0), annotation.getSortKeys());
+ String indexName = IndexUtil.getIndexNameOfFrag(fragments.get(0),
+ annotation.getSortKeys());
Path indexPath = new Path(sm.getTablePath(annotation.getTableName()), "index");
TupleComparator comp = new TupleComparator(annotation.getKeySchema(),
annotation.getSortKeys());
- return new BSTIndexScanExec(ctx, sm, annotation, fragments.get(0), new Path(indexPath, indexName),
- annotation.getKeySchema(), comp, annotation.getDatum());
+ return new BSTIndexScanExec(ctx, sm, annotation, fragments.get(0), new Path(
+ indexPath, indexName), annotation.getKeySchema(), comp,
+ annotation.getDatum());
}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/7c97735e/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PlannerUtil.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PlannerUtil.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PlannerUtil.java
index 9371463..3b8bd08 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PlannerUtil.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PlannerUtil.java
@@ -29,6 +29,7 @@ import org.apache.tajo.catalog.Schema;
import org.apache.tajo.catalog.SortSpec;
import org.apache.tajo.common.TajoDataTypes.DataType;
import org.apache.tajo.engine.eval.*;
+import org.apache.tajo.engine.planner.global.ExecutionPlan;
import org.apache.tajo.engine.planner.logical.*;
import org.apache.tajo.engine.exception.InvalidQueryException;
import org.apache.tajo.storage.TupleComparator;
@@ -61,6 +62,17 @@ public class PlannerUtil {
}
return tableNames;
}
+
+ public static String [] getLineage(ExecutionPlan plan, LogicalNode node) {
+ LogicalNode [] scans = PlannerUtil.findAllNodes(plan, node, NodeType.SCAN);
+ String [] tableNames = new String[scans.length];
+ ScanNode scan;
+ for (int i = 0; i < scans.length; i++) {
+ scan = (ScanNode) scans[i];
+ tableNames[i] = scan.getCanonicalName();
+ }
+ return tableNames;
+ }
/**
* Delete the logical node from a plan.
@@ -151,6 +163,97 @@ public class PlannerUtil {
parentNode.setChild(newNode);
}
+ public static GroupbyNode[] transformGroupbyTo2Pv2(LogicalPlan plan, GroupbyNode groupBy) {
+ Preconditions.checkNotNull(groupBy);
+
+ GroupbyNode parent = null, child = null;
+
+ // cloning groupby node
+ try {
+ parent = groupBy;
+ child = (GroupbyNode) groupBy.clone();
+ child.setPid(plan.newPID());
+ } catch (CloneNotSupportedException e) {
+ e.printStackTrace();
+ }
+
+ List<Target> firstStepTargets = Lists.newArrayList();
+ Target[] secondTargets = parent.getTargets();
+ Target[] firstTargets = child.getTargets();
+
+ Target second;
+ Target first;
+ int targetId = 0;
+ for (int i = 0; i < firstTargets.length; i++) {
+ second = secondTargets[i];
+ first = firstTargets[i];
+
+ List<AggregationFunctionCallEval> secondStepFunctions = EvalTreeUtil.findDistinctAggFunction(second.getEvalTree());
+ List<AggregationFunctionCallEval> firstStepFunctions = EvalTreeUtil.findDistinctAggFunction(first.getEvalTree());
+
+ if (firstStepFunctions.size() == 0) {
+ firstStepTargets.add(first);
+ targetId++;
+ } else {
+ for (AggregationFunctionCallEval func : firstStepFunctions) {
+ Target newTarget;
+
+ if (func.isDistinct()) {
+ List<Column> fields = EvalTreeUtil.findAllColumnRefs(func);
+ newTarget = new Target(new FieldEval(fields.get(0)));
+ String targetName = "column_" + (targetId++);
+ newTarget.setAlias(targetName);
+
+ AggregationFunctionCallEval secondFunc = null;
+ for (AggregationFunctionCallEval sf : secondStepFunctions) {
+ if (func.equals(sf)) {
+ secondFunc = sf;
+ break;
+ }
+ }
+
+ secondFunc.setArgs(new EvalNode [] {new FieldEval(
+ new Column(targetName, newTarget.getEvalTree().getValueType()))});
+ } else {
+ func.setFirstPhase();
+ newTarget = new Target(func);
+ String targetName = "column_" + (targetId++);
+ newTarget.setAlias(targetName);
+
+ AggregationFunctionCallEval secondFunc = null;
+ for (AggregationFunctionCallEval sf : secondStepFunctions) {
+ if (func.equals(sf)) {
+ secondFunc = sf;
+ break;
+ }
+ }
+ secondFunc.setArgs(new EvalNode [] {new FieldEval(
+ new Column(targetName, newTarget.getEvalTree().getValueType()))});
+ }
+ firstStepTargets.add(newTarget);
+ }
+ }
+
+ // Getting new target list and updating input/output schema from the new target list.
+ Target[] targetArray = firstStepTargets.toArray(new Target[firstStepTargets.size()]);
+ Schema targetSchema = PlannerUtil.targetToSchema(targetArray);
+ List<Target> newTarget = Lists.newArrayList();
+ for (Column column : parent.getGroupingColumns()) {
+ if (!targetSchema.contains(column.getQualifiedName())) {
+ newTarget.add(new Target(new FieldEval(column)));
+ }
+ }
+ targetArray = ObjectArrays.concat(targetArray, newTarget.toArray(new Target[newTarget.size()]), Target.class);
+
+ child.setTargets(targetArray);
+ child.setOutSchema(PlannerUtil.targetToSchema(targetArray));
+ // set the groupby chaining
+ groupBy.setInSchema(child.getOutSchema());
+
+ }
+ return new GroupbyNode[] {parent, child};
+ }
+
public static GroupbyNode transformGroupbyTo2P(GroupbyNode groupBy) {
Preconditions.checkNotNull(groupBy);
@@ -241,6 +344,21 @@ public class PlannerUtil {
return child;
}
+ public static SortNode[] transformSortTo2p(LogicalPlan plan, SortNode sort) {
+ Preconditions.checkArgument(sort != null);
+ SortNode parent = null, child = null;
+ try {
+ parent = sort;
+ child = (SortNode) sort.clone();
+ child.setPid(plan.newPID());
+ } catch (CloneNotSupportedException e) {
+ LOG.warn(e);
+ }
+
+ parent.setInSchema(child.getOutSchema());
+
+ return new SortNode[]{parent, child};
+ }
/**
* Find the top logical node matched to type from the given node
@@ -262,6 +380,88 @@ public class PlannerUtil {
return (T) finder.getFoundNodes().get(0);
}
+ private static class LogicalNodeFinderForExecPlan {
+ private NodeType type;
+ private ExecutionPlan plan;
+ private LogicalNode node;
+ private List<LogicalNode> foundNodes = Lists.newArrayList();
+
+ public LogicalNodeFinderForExecPlan(NodeType type, ExecutionPlan plan, LogicalNode node) {
+ this.type = type;
+ this.plan = plan;
+ this.node = node;
+ }
+
+ public LogicalNodeFinderForExecPlan(NodeType type, ExecutionPlan plan) {
+ this(type, plan, plan.getTerminalNode());
+ }
+
+ public void find() {
+ this.visit(node);
+ }
+
+ private void visit(LogicalNode node) {
+ if (plan.getChildCount(node) > 0) {
+ for (LogicalNode child : plan.getChilds(node)) {
+ this.visit(child);
+ }
+ }
+
+ if (node.getType() == type) {
+ foundNodes.add(node);
+ }
+ }
+
+ public List<LogicalNode> getFoundNodes() {
+ return foundNodes;
+ }
+ }
+
+ private static class ParentNodeFinderForExecPlan {
+ private NodeType type;
+ private ExecutionPlan plan;
+ private List<LogicalNode> foundNodes = Lists.newArrayList();
+
+ public ParentNodeFinderForExecPlan(NodeType type, ExecutionPlan plan) {
+ this.type = type;
+ this.plan = plan;
+ }
+
+ public void find() {
+ this.visit(plan.getTerminalNode());
+ }
+
+ private void visit(LogicalNode node) {
+ if (plan.getChildCount(node) > 0) {
+ for (LogicalNode child : plan.getChilds(node)) {
+ this.visit(child);
+ }
+ for (LogicalNode child : plan.getChilds(node)) {
+ if (child.getType() == type) {
+ foundNodes.add(child);
+ }
+ }
+ }
+ }
+
+ public List<LogicalNode> getFoundNodes() {
+ return foundNodes;
+ }
+ }
+
+ public static <T extends LogicalNode> T findTopNode(ExecutionPlan executionPlan, NodeType type) {
+ Preconditions.checkNotNull(executionPlan);
+ Preconditions.checkNotNull(type);
+
+ LogicalNodeFinderForExecPlan finder = new LogicalNodeFinderForExecPlan(type, executionPlan);
+ finder.find();
+
+ if (finder.getFoundNodes().size() == 0) {
+ return null;
+ }
+ return (T) finder.getFoundNodes().get(0);
+ }
+
/**
* Find the all logical node matched to type from the given node
*
@@ -282,6 +482,20 @@ public class PlannerUtil {
List<LogicalNode> founds = finder.getFoundNodes();
return founds.toArray(new LogicalNode[founds.size()]);
}
+
+ public static LogicalNode [] findAllNodes(ExecutionPlan plan, LogicalNode node, NodeType type) {
+ Preconditions.checkNotNull(node);
+ Preconditions.checkNotNull(type);
+
+ LogicalNodeFinderForExecPlan finder = new LogicalNodeFinderForExecPlan(type, plan, node);
+ finder.find();
+
+ if (finder.getFoundNodes().size() == 0) {
+ return new LogicalNode[] {};
+ }
+ List<LogicalNode> founds = finder.getFoundNodes();
+ return founds.toArray(new LogicalNode[founds.size()]);
+ }
/**
* Find a parent node of a given-typed operator.
@@ -303,6 +517,19 @@ public class PlannerUtil {
return (T) finder.getFoundNodes().get(0);
}
+ public static <T extends LogicalNode> T findTopParentNode(ExecutionPlan node, NodeType type) {
+ Preconditions.checkNotNull(node);
+ Preconditions.checkNotNull(type);
+
+ ParentNodeFinderForExecPlan finder = new ParentNodeFinderForExecPlan(type, node);
+ finder.find();
+
+ if (finder.getFoundNodes().size() == 0) {
+ return null;
+ }
+ return (T) finder.getFoundNodes().get(0);
+ }
+
public static boolean canBeEvaluated(EvalNode eval, LogicalNode node) {
Set<Column> columnRefs = EvalTreeUtil.findDistinctRefColumns(eval);
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/7c97735e/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/DataChannel.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/DataChannel.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/DataChannel.java
index 0401718..26cedd7 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/DataChannel.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/DataChannel.java
@@ -38,23 +38,31 @@ public class DataChannel {
private StoreType storeType = StoreType.CSV;
- public DataChannel(ExecutionBlockId srcId, ExecutionBlockId targetId) {
+ private Integer srcPID;
+ private Integer targetPID;
+
+ public DataChannel(ExecutionBlockId srcId, ExecutionBlockId targetId, Integer srcPID, Integer targetPID) {
this.srcId = srcId;
this.targetId = targetId;
+ this.srcPID = srcPID;
+ this.targetPID = targetPID;
}
- public DataChannel(ExecutionBlockId srcId, ExecutionBlockId targetId, PartitionType partitionType) {
- this(srcId, targetId);
+ public DataChannel(ExecutionBlockId srcId, ExecutionBlockId targetId, Integer srcPID, Integer targetPID,
+ PartitionType partitionType) {
+ this(srcId, targetId, srcPID, targetPID);
this.partitionType = partitionType;
}
- public DataChannel(ExecutionBlock src, ExecutionBlock target, PartitionType partitionType, int partNum) {
- this(src.getId(), target.getId(), partitionType, partNum);
- setSchema(src.getPlan().getOutSchema());
+ public DataChannel(ExecutionBlock src, ExecutionBlock target, Integer srcPID, Integer targetPID,
+ PartitionType partitionType, int partNum, Schema schema) {
+ this(src.getId(), target.getId(), srcPID, targetPID, partitionType, partNum);
+ setSchema(schema);
}
- public DataChannel(ExecutionBlockId srcId, ExecutionBlockId targetId, PartitionType partitionType, int partNum) {
- this(srcId, targetId, partitionType);
+ public DataChannel(ExecutionBlockId srcId, ExecutionBlockId targetId, Integer srcPID, Integer targetPID,
+ PartitionType partitionType, int partNum) {
+ this(srcId, targetId, srcPID, targetPID, partitionType);
this.partitionNum = partNum;
}
@@ -77,6 +85,12 @@ public class DataChannel {
if (proto.hasPartitionNum()) {
this.partitionNum = proto.getPartitionNum();
}
+ if (proto.hasSrcPID()) {
+ this.srcPID = proto.getSrcPID();
+ }
+ if (proto.hasTargetPID()) {
+ this.targetPID = proto.getTargetPID();
+ }
}
public ExecutionBlockId getSrcId() {
@@ -163,6 +177,12 @@ public class DataChannel {
if (partitionNum != null) {
builder.setPartitionNum(partitionNum);
}
+ if (srcPID != null) {
+ builder.setSrcPID(srcPID);
+ }
+ if (targetPID != null) {
+ builder.setTargetPID(targetPID);
+ }
return builder.build();
}
@@ -177,7 +197,7 @@ public class DataChannel {
public String toString() {
StringBuilder sb = new StringBuilder();
sb.append("[").append(srcId.getQueryId()).append("] ");
- sb.append(srcId.getId()).append(" => ").append(targetId.getId());
+ sb.append(srcId.getId()).append("."+srcPID).append(" => ").append(targetId.getId()).append("."+targetPID);
sb.append(" (type=").append(partitionType);
if (hasPartitionKey()) {
sb.append(", key=");
@@ -195,4 +215,16 @@ public class DataChannel {
sb.append(")");
return sb.toString();
}
+
+ public void updateSrcPID(int srcPID) {
+ this.srcPID = srcPID;
+ }
+
+ public Integer getSrcPID() {
+ return srcPID;
+ }
+
+ public Integer getTargetPID() {
+ return targetPID;
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/7c97735e/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/DestinationContext.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/DestinationContext.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/DestinationContext.java
new file mode 100644
index 0000000..3c2152c
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/DestinationContext.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.planner.global;
+
+import com.google.gson.annotations.Expose;
+import org.apache.tajo.catalog.Column;
+import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.catalog.SortSpec;
+import org.apache.tajo.catalog.proto.CatalogProtos.StoreType;
+import org.apache.tajo.engine.planner.logical.GroupbyNode;
+import org.apache.tajo.engine.planner.logical.LogicalNode;
+import org.apache.tajo.engine.planner.logical.NodeType;
+import org.apache.tajo.engine.planner.logical.SortNode;
+
+public class DestinationContext {
+ @Expose private String destTableName;
+ @Expose private StoreType storeType = StoreType.CSV;
+ @Expose private NodeType terminalNodeType;
+ @Expose private Schema outputSchema;
+ @Expose private Column[] groupingColumns;
+ @Expose private SortSpec[] sortSpecs;
+
+ public DestinationContext() {
+
+ }
+
+ public DestinationContext(LogicalNode node) {
+ this.set(node);
+ }
+
+ public void set(LogicalNode node) {
+ terminalNodeType = node.getType();
+ outputSchema = node.getOutSchema();
+ if (terminalNodeType.equals(NodeType.GROUP_BY)) {
+ groupingColumns = ((GroupbyNode)node).getGroupingColumns();
+ } else if (terminalNodeType.equals(NodeType.SORT)) {
+ sortSpecs = ((SortNode)node).getSortKeys();
+ }
+ }
+
+ public NodeType getTerminalNodeType() {
+ return terminalNodeType;
+ }
+
+ public void setTerminalNodeType(NodeType terminalNodeType) {
+ this.terminalNodeType = terminalNodeType;
+ }
+
+ public Schema getOutputSchema() {
+ return outputSchema;
+ }
+
+ public void setOutputSchema(Schema outputSchema) {
+ this.outputSchema = outputSchema;
+ }
+
+ public Column [] getGroupingColumns() {
+ return groupingColumns;
+ }
+
+ public void setGroupingColumns(Column [] groupingColumns) {
+ this.groupingColumns = groupingColumns;
+ }
+
+ public SortSpec[] getSortSpecs() {
+ return sortSpecs;
+ }
+
+ public void setSortSpecs(SortSpec[] sortSpecs) {
+ this.sortSpecs = sortSpecs;
+ }
+
+ public StoreType getStoreType() {
+ return storeType;
+ }
+
+ public void setStoreType(StoreType storeType) {
+ this.storeType = storeType;
+ }
+
+ public String getDestTableName() {
+ return destTableName;
+ }
+
+ public void setDestTableName(String destTableName) {
+ this.destTableName = destTableName;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/7c97735e/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/ExecutionBlock.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/ExecutionBlock.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/ExecutionBlock.java
index efa1c7f..7cfa478 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/ExecutionBlock.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/ExecutionBlock.java
@@ -14,12 +14,15 @@
package org.apache.tajo.engine.planner.global;
+import com.google.common.annotations.VisibleForTesting;
import org.apache.tajo.ExecutionBlockId;
-import org.apache.tajo.catalog.Schema;
import org.apache.tajo.engine.planner.enforce.Enforcer;
-import org.apache.tajo.engine.planner.logical.*;
+import org.apache.tajo.engine.planner.logical.LogicalNode;
+import org.apache.tajo.engine.planner.logical.LogicalRootNode;
-import java.util.*;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Set;
/**
* A distributed execution plan (DEP) is a direct acyclic graph (DAG) of ExecutionBlocks.
@@ -30,16 +33,17 @@ import java.util.*;
*/
public class ExecutionBlock {
private ExecutionBlockId executionBlockId;
- private LogicalNode plan = null;
- private StoreTableNode store = null;
- private List<ScanNode> scanlist = new ArrayList<ScanNode>();
+ private ExecutionPlan executionPlan;
private Enforcer enforcer = new Enforcer();
- private boolean hasJoinPlan;
- private boolean hasUnionPlan;
-
private Set<String> broadcasted = new HashSet<String>();
+ public ExecutionBlock(ExecutionBlockId executionBlockId, LogicalRootNode rootNode) {
+ this.executionBlockId = executionBlockId;
+ this.executionPlan = new ExecutionPlan(rootNode);
+ }
+
+ @VisibleForTesting
public ExecutionBlock(ExecutionBlockId executionBlockId) {
this.executionBlockId = executionBlockId;
}
@@ -49,64 +53,27 @@ public class ExecutionBlock {
}
public void setPlan(LogicalNode plan) {
- hasJoinPlan = false;
- hasUnionPlan = false;
- this.scanlist.clear();
- this.plan = plan;
-
- LogicalNode node = plan;
- ArrayList<LogicalNode> s = new ArrayList<LogicalNode>();
- s.add(node);
- while (!s.isEmpty()) {
- node = s.remove(s.size()-1);
- if (node instanceof UnaryNode) {
- UnaryNode unary = (UnaryNode) node;
- s.add(s.size(), unary.getChild());
- } else if (node instanceof BinaryNode) {
- BinaryNode binary = (BinaryNode) node;
- if (binary.getType() == NodeType.JOIN) {
- hasJoinPlan = true;
- } else if (binary.getType() == NodeType.UNION) {
- hasUnionPlan = true;
- }
- s.add(s.size(), binary.getLeftChild());
- s.add(s.size(), binary.getRightChild());
- } else if (node instanceof ScanNode) {
- scanlist.add((ScanNode)node);
- } else if (node instanceof TableSubQueryNode) {
- TableSubQueryNode subQuery = (TableSubQueryNode) node;
- s.add(s.size(), subQuery.getSubQuery());
- }
- }
+ executionPlan.setPlan(plan);
}
-
- public LogicalNode getPlan() {
- return plan;
+ public ExecutionPlan getPlan() {
+ return executionPlan;
}
public Enforcer getEnforcer() {
return enforcer;
}
- public StoreTableNode getStoreTableNode() {
- return store;
- }
-
- public ScanNode [] getScanNodes() {
- return this.scanlist.toArray(new ScanNode[scanlist.size()]);
- }
-
- public Schema getOutputSchema() {
- return store.getOutSchema();
+ public InputContext getInputContext() {
+ return executionPlan.getInputContext();
}
public boolean hasJoin() {
- return hasJoinPlan;
+ return executionPlan.hasJoinPlan();
}
public boolean hasUnion() {
- return hasUnionPlan;
+ return executionPlan.hasUnionPlan();
}
public void addBroadcastTables(Collection<String> tableNames) {