You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tajo.apache.org by ji...@apache.org on 2013/11/26 10:14:36 UTC
[4/4] git commit: TAJO-266: Extend ExecutionBlock and Task to support
multiple outputs. (jihoon)
TAJO-266: Extend ExecutionBlock and Task to support multiple outputs. (jihoon)
Project: http://git-wip-us.apache.org/repos/asf/incubator-tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tajo/commit/7c97735e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tajo/tree/7c97735e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tajo/diff/7c97735e
Branch: refs/heads/DAG-execplan
Commit: 7c97735e1008e61f00ddbd41ba924a00d5b8baf3
Parents: 67e0d94
Author: Jihoon Son <ji...@apache.org>
Authored: Tue Nov 26 18:13:11 2013 +0900
Committer: Jihoon Son <ji...@apache.org>
Committed: Tue Nov 26 18:13:11 2013 +0900
----------------------------------------------------------------------
.../java/org/apache/tajo/catalog/Schema.java | 5 +
.../java/org/apache/tajo/catalog/TableDesc.java | 5 +
.../apache/tajo/engine/json/CoreGsonHelper.java | 4 +
.../tajo/engine/json/ExecutionPlanAdapter.java | 49 ++
.../tajo/engine/json/SourceContextAdapter.java | 47 ++
.../apache/tajo/engine/planner/LogicalPlan.java | 5 +-
.../tajo/engine/planner/PhysicalPlanner.java | 6 +-
.../engine/planner/PhysicalPlannerImpl.java | 463 +++++++++++--------
.../apache/tajo/engine/planner/PlannerUtil.java | 227 +++++++++
.../tajo/engine/planner/global/DataChannel.java | 50 +-
.../planner/global/DestinationContext.java | 104 +++++
.../engine/planner/global/ExecutionBlock.java | 73 +--
.../engine/planner/global/ExecutionPlan.java | 381 +++++++++++++++
.../planner/global/ExecutionPlanEdge.java | 51 ++
.../engine/planner/global/GlobalPlanner.java | 175 ++++---
.../engine/planner/global/InputContext.java | 78 ++++
.../tajo/engine/planner/global/MasterPlan.java | 21 +-
.../engine/planner/graph/DirectedGraph.java | 6 +-
.../planner/graph/SimpleDirectedGraph.java | 35 +-
.../planner/graph/SimpleUndirectedGraph.java | 12 +-
.../tajo/engine/planner/logical/ExceptNode.java | 19 +-
.../engine/planner/logical/GroupbyNode.java | 12 +-
.../engine/planner/logical/IntersectNode.java | 14 +-
.../tajo/engine/planner/logical/JoinNode.java | 8 +-
.../tajo/engine/planner/logical/LimitNode.java | 4 +-
.../engine/planner/logical/LogicalNode.java | 14 +-
.../engine/planner/logical/LogicalRootNode.java | 6 +-
.../tajo/engine/planner/logical/NodeType.java | 1 -
.../engine/planner/logical/ProjectionNode.java | 8 +-
.../tajo/engine/planner/logical/ScanNode.java | 4 +-
.../engine/planner/logical/SelectionNode.java | 8 +-
.../tajo/engine/planner/logical/SortNode.java | 6 +-
.../engine/planner/logical/StoreTableNode.java | 6 +-
.../tajo/engine/planner/logical/UnionNode.java | 19 +-
.../planner/physical/IndexedStoreExec.java | 12 +-
.../planner/physical/MultiOutputExec.java | 52 +++
.../planner/physical/PartitionedStoreExec.java | 27 +-
.../planner/physical/PhysicalRootExec.java | 81 ++++
.../engine/planner/physical/StoreTableExec.java | 6 +-
.../tajo/engine/query/QueryUnitRequest.java | 3 +-
.../tajo/engine/query/QueryUnitRequestImpl.java | 82 +++-
.../tajo/master/DefaultTaskScheduler.java | 11 +-
.../master/querymaster/QueryMasterTask.java | 2 +-
.../tajo/master/querymaster/QueryUnit.java | 60 +--
.../tajo/master/querymaster/Repartitioner.java | 52 ++-
.../tajo/master/querymaster/SubQuery.java | 30 +-
.../org/apache/tajo/worker/TajoQueryEngine.java | 7 +-
.../main/java/org/apache/tajo/worker/Task.java | 57 ++-
.../apache/tajo/worker/TaskAttemptContext.java | 19 +-
.../src/main/proto/TajoWorkerProtocol.proto | 8 +-
.../tajo/engine/planner/TestLogicalPlan.java | 4 +-
.../planner/global/TestExecutionPlan.java | 98 ++++
.../engine/planner/global/TestMasterPlan.java | 6 +-
.../planner/physical/TestBNLJoinExec.java | 24 +-
.../planner/physical/TestBSTIndexExec.java | 12 +-
.../planner/physical/TestExternalSortExec.java | 14 +-
.../physical/TestFullOuterHashJoinExec.java | 37 +-
.../physical/TestFullOuterMergeJoinExec.java | 56 ++-
.../planner/physical/TestHashAntiJoinExec.java | 12 +-
.../planner/physical/TestHashJoinExec.java | 10 +-
.../planner/physical/TestHashSemiJoinExec.java | 8 +-
.../physical/TestLeftOuterHashJoinExec.java | 77 +--
.../physical/TestLeftOuterNLJoinExec.java | 72 +--
.../planner/physical/TestMergeJoinExec.java | 14 +-
.../engine/planner/physical/TestNLJoinExec.java | 20 +-
.../planner/physical/TestPhysicalPlanner.java | 222 ++++++---
.../physical/TestRightOuterHashJoinExec.java | 51 +-
.../physical/TestRightOuterMergeJoinExec.java | 103 +++--
.../engine/planner/physical/TestSortExec.java | 12 +-
.../tajo/worker/TestRangeRetrieverHandler.java | 24 +-
70 files changed, 2512 insertions(+), 799 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/7c97735e/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/Schema.java
----------------------------------------------------------------------
diff --git a/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/Schema.java b/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/Schema.java
index 7c0de81..404af75 100644
--- a/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/Schema.java
+++ b/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/Schema.java
@@ -212,6 +212,11 @@ public class Schema implements ProtoObject<SchemaProto>, Cloneable, GsonObject {
}
}
+ @Override
+ public int hashCode() {
+ return getProto().hashCode();
+ }
+
@Override
public boolean equals(Object o) {
if (o instanceof Schema) {
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/7c97735e/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/TableDesc.java
----------------------------------------------------------------------
diff --git a/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/TableDesc.java b/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/TableDesc.java
index f59feef..5573de6 100644
--- a/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/TableDesc.java
+++ b/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/TableDesc.java
@@ -158,4 +158,9 @@ public class TableDesc implements ProtoObject<TableDescProto>, GsonObject, Clone
}
return builder.build();
}
+
+ @Override
+ public int hashCode() {
+ return getProto().hashCode();
+ }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/7c97735e/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/json/CoreGsonHelper.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/json/CoreGsonHelper.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/json/CoreGsonHelper.java
index 4dfb314..194d2b4 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/json/CoreGsonHelper.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/json/CoreGsonHelper.java
@@ -30,6 +30,8 @@ import org.apache.tajo.datum.Datum;
import org.apache.tajo.engine.eval.EvalNode;
import org.apache.tajo.engine.function.AggFunction;
import org.apache.tajo.engine.function.GeneralFunction;
+import org.apache.tajo.engine.planner.global.ExecutionPlan;
+import org.apache.tajo.engine.planner.global.InputContext;
import org.apache.tajo.engine.planner.logical.LogicalNode;
import org.apache.tajo.json.*;
import org.apache.tajo.util.TUtil;
@@ -56,6 +58,8 @@ public class CoreGsonHelper {
adapters.put(AggFunction.class, new FunctionAdapter());
adapters.put(Datum.class, new DatumAdapter());
adapters.put(DataType.class, new DataTypeAdapter());
+ adapters.put(ExecutionPlan.class, new ExecutionPlanAdapter());
+ adapters.put(InputContext.class, new SourceContextAdapter());
return adapters;
}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/7c97735e/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/json/ExecutionPlanAdapter.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/json/ExecutionPlanAdapter.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/json/ExecutionPlanAdapter.java
new file mode 100644
index 0000000..7ee8347
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/json/ExecutionPlanAdapter.java
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.json;
+
+import com.google.common.base.Preconditions;
+import com.google.gson.*;
+import org.apache.tajo.engine.planner.global.ExecutionPlan;
+import org.apache.tajo.engine.planner.global.ExecutionPlan.ExecutionPlanJsonHelper;
+import org.apache.tajo.json.GsonSerDerAdapter;
+
+import java.lang.reflect.Type;
+
+public class ExecutionPlanAdapter implements GsonSerDerAdapter<ExecutionPlan> {
+
+ @Override
+ public ExecutionPlan deserialize(JsonElement jsonElement, Type type,
+ JsonDeserializationContext context) throws JsonParseException {
+ JsonObject json = jsonElement.getAsJsonObject();
+ String typeName = json.get("type").getAsJsonPrimitive().getAsString();
+ Preconditions.checkState(typeName.equals("ExecutionPlan"));
+ ExecutionPlanJsonHelper helper = context.deserialize(json.get("body"), ExecutionPlanJsonHelper.class);
+ return helper.toExecutionPlan();
+ }
+
+ @Override
+ public JsonElement serialize(ExecutionPlan src, Type type, JsonSerializationContext context) {
+ ExecutionPlanJsonHelper helper = new ExecutionPlanJsonHelper(src);
+ JsonObject json = new JsonObject();
+ json.addProperty("type", "ExecutionPlan");
+ json.add("body", context.serialize(helper, ExecutionPlanJsonHelper.class));
+ return json;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/7c97735e/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/json/SourceContextAdapter.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/json/SourceContextAdapter.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/json/SourceContextAdapter.java
new file mode 100644
index 0000000..d92d504
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/json/SourceContextAdapter.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.json;
+
+import com.google.gson.*;
+import org.apache.tajo.engine.planner.global.InputContext;
+import org.apache.tajo.engine.planner.logical.ScanNode;
+import org.apache.tajo.json.GsonSerDerAdapter;
+
+import java.lang.reflect.Type;
+
+public class SourceContextAdapter implements GsonSerDerAdapter<InputContext> {
+
+ @Override
+ public InputContext deserialize(JsonElement jsonElement, Type typeOfT, JsonDeserializationContext context) throws JsonParseException {
+ JsonObject json = jsonElement.getAsJsonObject();
+ ScanNode[] scanNodes = context.deserialize(json.get("body"), ScanNode[].class);
+ InputContext srcContext = new InputContext();
+ for (ScanNode scan : scanNodes) {
+ srcContext.addScanNode(scan);
+ }
+ return srcContext;
+ }
+
+ @Override
+ public JsonElement serialize(InputContext src, Type typeOfSrc, JsonSerializationContext context) {
+ JsonObject json = new JsonObject();
+ json.add("body", context.serialize(src.getScanNodes(), ScanNode[].class));
+ return json;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/7c97735e/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/LogicalPlan.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/LogicalPlan.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/LogicalPlan.java
index 4a305ae..3295efd 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/LogicalPlan.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/LogicalPlan.java
@@ -18,6 +18,7 @@
package org.apache.tajo.engine.planner;
+import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import org.apache.tajo.algebra.*;
@@ -122,11 +123,13 @@ public class LogicalPlan {
}
public void connectBlocks(QueryBlock srcBlock, QueryBlock targetBlock, BlockType type) {
+ Preconditions.checkState(queryBlockGraph.getParentCount(srcBlock.getName()) <= 0,
+ "There should be only one parent block.");
queryBlockGraph.addEdge(srcBlock.getName(), targetBlock.getName(), new BlockEdge(srcBlock, targetBlock, type));
}
public QueryBlock getParentBlock(QueryBlock block) {
- return queryBlocks.get(queryBlockGraph.getParent(block.getName()));
+ return queryBlocks.get(queryBlockGraph.getParent(block.getName(), 0));
}
public List<QueryBlock> getChildBlocks(QueryBlock block) {
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/7c97735e/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PhysicalPlanner.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PhysicalPlanner.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PhysicalPlanner.java
index ebe47b4..90cfc9c 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PhysicalPlanner.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PhysicalPlanner.java
@@ -21,16 +21,16 @@
*/
package org.apache.tajo.engine.planner;
-import org.apache.tajo.worker.TaskAttemptContext;
-import org.apache.tajo.engine.planner.logical.LogicalNode;
+import org.apache.tajo.engine.planner.global.ExecutionPlan;
import org.apache.tajo.engine.planner.physical.PhysicalExec;
import org.apache.tajo.exception.InternalException;
+import org.apache.tajo.worker.TaskAttemptContext;
/**
* This class generates a physical execution plan.
*/
public interface PhysicalPlanner {
public PhysicalExec createPlan(TaskAttemptContext context,
- LogicalNode logicalPlan)
+ ExecutionPlan execPlan)
throws InternalException;
}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/7c97735e/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java
index db58e32..87daa8f 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -16,44 +16,45 @@
* limitations under the License.
*/
-/**
- *
- */
package org.apache.tajo.engine.planner;
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.ObjectArrays;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.Path;
-import org.apache.tajo.engine.planner.global.DataChannel;
-import org.apache.tajo.storage.fragment.FileFragment;
-import org.apache.tajo.storage.fragment.Fragment;
-import org.apache.tajo.storage.fragment.FragmentConvertor;
-import org.apache.tajo.worker.TaskAttemptContext;
import org.apache.tajo.catalog.Column;
import org.apache.tajo.catalog.SortSpec;
import org.apache.tajo.catalog.proto.CatalogProtos;
+import org.apache.tajo.catalog.proto.CatalogProtos.FragmentProto;
+import org.apache.tajo.catalog.proto.CatalogProtos.SortSpecProto;
import org.apache.tajo.conf.TajoConf;
import org.apache.tajo.engine.planner.enforce.Enforcer;
+import org.apache.tajo.engine.planner.global.DataChannel;
+import org.apache.tajo.engine.planner.global.ExecutionPlan;
+import org.apache.tajo.engine.planner.global.ExecutionPlanEdge.Tag;
import org.apache.tajo.engine.planner.logical.*;
import org.apache.tajo.engine.planner.physical.*;
import org.apache.tajo.exception.InternalException;
+import org.apache.tajo.ipc.TajoWorkerProtocol.EnforceProperty;
+import org.apache.tajo.ipc.TajoWorkerProtocol.EnforceProperty.EnforceType;
+import org.apache.tajo.ipc.TajoWorkerProtocol.GroupbyEnforce.GroupbyAlgorithm;
+import org.apache.tajo.ipc.TajoWorkerProtocol.JoinEnforce.JoinAlgorithm;
+import org.apache.tajo.ipc.TajoWorkerProtocol.PartitionType;
+import org.apache.tajo.ipc.TajoWorkerProtocol.SortEnforce;
import org.apache.tajo.storage.AbstractStorageManager;
import org.apache.tajo.storage.TupleComparator;
+import org.apache.tajo.storage.fragment.FileFragment;
+import org.apache.tajo.storage.fragment.FragmentConvertor;
import org.apache.tajo.util.IndexUtil;
import org.apache.tajo.util.TUtil;
+import org.apache.tajo.worker.TaskAttemptContext;
import java.io.IOException;
-import java.util.Collection;
+import java.util.ArrayList;
import java.util.List;
-import static org.apache.tajo.catalog.proto.CatalogProtos.FragmentProto;
-import static org.apache.tajo.ipc.TajoWorkerProtocol.*;
-import static org.apache.tajo.ipc.TajoWorkerProtocol.EnforceProperty.EnforceType;
-import static org.apache.tajo.ipc.TajoWorkerProtocol.GroupbyEnforce.GroupbyAlgorithm;
-import static org.apache.tajo.ipc.TajoWorkerProtocol.JoinEnforce.JoinAlgorithm;
-
public class PhysicalPlannerImpl implements PhysicalPlanner {
private static final Log LOG = LogFactory.getLog(PhysicalPlannerImpl.class);
private static final int UNGENERATED_PID = -1;
@@ -66,54 +67,74 @@ public class PhysicalPlannerImpl implements PhysicalPlanner {
this.sm = sm;
}
- public PhysicalExec createPlan(final TaskAttemptContext context, final LogicalNode logicalPlan)
+ public PhysicalExec createPlan(final TaskAttemptContext context, ExecutionPlan plan)
throws InternalException {
PhysicalExec execPlan;
try {
- execPlan = createPlanRecursive(context, logicalPlan);
- if (execPlan instanceof StoreTableExec
- || execPlan instanceof IndexedStoreExec
- || execPlan instanceof PartitionedStoreExec) {
- return execPlan;
- } else if (context.getDataChannel() != null) {
- return buildOutputOperator(context, logicalPlan, execPlan);
- } else {
- return execPlan;
- }
+ plan = checkOutputOperator(context, plan);
+ execPlan = createPlanRecursive(context, plan, plan.getTerminalNode());
+
+ return execPlan;
} catch (IOException ioe) {
throw new InternalException(ioe);
}
}
- private PhysicalExec buildOutputOperator(TaskAttemptContext context, LogicalNode plan,
- PhysicalExec execPlan) throws IOException {
- DataChannel channel = context.getDataChannel();
- StoreTableNode storeTableNode = new StoreTableNode(UNGENERATED_PID, channel.getTargetId().toString());
- storeTableNode.setStorageType(CatalogProtos.StoreType.CSV);
- storeTableNode.setInSchema(plan.getOutSchema());
- storeTableNode.setOutSchema(plan.getOutSchema());
- if (channel.getPartitionType() != PartitionType.NONE_PARTITION) {
- storeTableNode.setPartitions(channel.getPartitionType(), channel.getPartitionKey(), channel.getPartitionNum());
- } else {
- storeTableNode.setDefaultParition();
+ @VisibleForTesting
+ public PhysicalExec createPlanWithoutMaterialize(final TaskAttemptContext context, ExecutionPlan plan)
+ throws InternalException {
+ PhysicalExec execPlan;
+
+ try {
+ execPlan = createPlanRecursive(context, plan, plan.getTerminalNode());
+
+ return execPlan;
+ } catch (IOException ioe) {
+ throw new InternalException(ioe);
}
- storeTableNode.setChild(plan);
+ }
- PhysicalExec outExecPlan = createStorePlan(context, storeTableNode, execPlan);
- return outExecPlan;
+ private ExecutionPlan checkOutputOperator(TaskAttemptContext context, ExecutionPlan plan) {
+ LogicalNode root = plan.getTerminalNode();
+ List<DataChannel> channels = context.getOutgoingChannels();
+ for (DataChannel channel : channels) {
+ LogicalNode node = plan.getRootChild(channel.getSrcPID());
+ if (node.getType() != NodeType.STORE) {
+ StoreTableNode storeTableNode = new StoreTableNode(UNGENERATED_PID, channel.getTargetId().toString());
+ storeTableNode.setStorageType(CatalogProtos.StoreType.CSV);
+ storeTableNode.setInSchema(channel.getSchema());
+ storeTableNode.setOutSchema(channel.getSchema());
+ if (channel.getPartitionType() != PartitionType.NONE_PARTITION) {
+ storeTableNode.setPartitions(channel.getPartitionType(), channel.getPartitionKey(), channel.getPartitionNum());
+ } else {
+ storeTableNode.setDefaultParition();
+ }
+
+ plan.remove(node, root);
+ plan.add(node, storeTableNode, Tag.SINGLE);
+ plan.add(storeTableNode, root, Tag.SINGLE);
+ channel.updateSrcPID(storeTableNode.getPID());
+ }
+ }
+ return plan;
}
- private PhysicalExec createPlanRecursive(TaskAttemptContext ctx, LogicalNode logicalNode) throws IOException {
+ private PhysicalExec createPlanRecursive(TaskAttemptContext ctx, ExecutionPlan plan, LogicalNode logicalNode) throws IOException {
PhysicalExec leftExec;
PhysicalExec rightExec;
+ PhysicalExec currentExec;
switch (logicalNode.getType()) {
case ROOT:
LogicalRootNode rootNode = (LogicalRootNode) logicalNode;
- return createPlanRecursive(ctx, rootNode.getChild());
+ List<PhysicalExec> childExecs = new ArrayList<PhysicalExec>();
+ for (LogicalNode child : plan.getChilds(rootNode)) {
+ childExecs.add(createPlanRecursive(ctx, plan, child));
+ }
+ return new PhysicalRootExec(ctx, childExecs);
case EXPRS:
EvalExprNode evalExpr = (EvalExprNode) logicalNode;
@@ -121,60 +142,109 @@ public class PhysicalPlannerImpl implements PhysicalPlanner {
case STORE:
StoreTableNode storeNode = (StoreTableNode) logicalNode;
- leftExec = createPlanRecursive(ctx, storeNode.getChild());
+ leftExec = createPlanRecursive(ctx, plan, plan.getChilds(storeNode).get(0));
return createStorePlan(ctx, storeNode, leftExec);
case SELECTION:
SelectionNode selNode = (SelectionNode) logicalNode;
- leftExec = createPlanRecursive(ctx, selNode.getChild());
- return new SelectionExec(ctx, selNode, leftExec);
+ leftExec = createPlanRecursive(ctx, plan, plan.getChilds(selNode).get(0));
+ currentExec = new SelectionExec(ctx, selNode, leftExec);
+ if (plan.getParentCount(logicalNode) > 1) {
+ return new MultiOutputExec(ctx, currentExec.getSchema(), currentExec, plan.getParentCount(logicalNode));
+ } else {
+ return currentExec;
+ }
case PROJECTION:
ProjectionNode prjNode = (ProjectionNode) logicalNode;
- leftExec = createPlanRecursive(ctx, prjNode.getChild());
- return new ProjectionExec(ctx, prjNode, leftExec);
+ leftExec = createPlanRecursive(ctx, plan, plan.getChilds(prjNode).get(0));
+ currentExec = new ProjectionExec(ctx, prjNode, leftExec);
+ if (plan.getParentCount(logicalNode) > 1) {
+ return new MultiOutputExec(ctx, currentExec.getSchema(), currentExec, plan.getParentCount(logicalNode));
+ } else {
+ return currentExec;
+ }
case TABLE_SUBQUERY: {
TableSubQueryNode subQueryNode = (TableSubQueryNode) logicalNode;
- leftExec = createPlanRecursive(ctx, subQueryNode.getSubQuery());
- return leftExec;
+ leftExec = createPlanRecursive(ctx, plan, subQueryNode.getSubQuery());
+ if (plan.getParentCount(logicalNode) > 1) {
+ return new MultiOutputExec(ctx, leftExec.getSchema(), leftExec, plan.getParentCount(logicalNode));
+ } else {
+ return leftExec;
+ }
} case SCAN:
leftExec = createScanPlan(ctx, (ScanNode) logicalNode);
- return leftExec;
+ if (plan.getParentCount(logicalNode) > 1) {
+ return new MultiOutputExec(ctx, leftExec.getSchema(), leftExec, plan.getParentCount(logicalNode));
+ } else {
+ return leftExec;
+ }
case GROUP_BY:
GroupbyNode grpNode = (GroupbyNode) logicalNode;
- leftExec = createPlanRecursive(ctx, grpNode.getChild());
- return createGroupByPlan(ctx, grpNode, leftExec);
+ leftExec = createPlanRecursive(ctx, plan, plan.getChilds(grpNode).get(0));
+ currentExec = createGroupByPlan(ctx, plan, grpNode, leftExec);
+ if (plan.getParentCount(logicalNode) > 1) {
+ return new MultiOutputExec(ctx, currentExec.getSchema(), currentExec, plan.getParentCount(logicalNode));
+ } else {
+ return currentExec;
+ }
case SORT:
SortNode sortNode = (SortNode) logicalNode;
- leftExec = createPlanRecursive(ctx, sortNode.getChild());
- return createSortPlan(ctx, sortNode, leftExec);
+ leftExec = createPlanRecursive(ctx, plan, plan.getChilds(sortNode).get(0));
+ currentExec = createSortPlan(ctx, plan, sortNode, leftExec);
+ if (plan.getParentCount(logicalNode) > 1) {
+ return new MultiOutputExec(ctx, currentExec.getSchema(), currentExec, plan.getParentCount(logicalNode));
+ } else {
+ return currentExec;
+ }
case JOIN:
JoinNode joinNode = (JoinNode) logicalNode;
- leftExec = createPlanRecursive(ctx, joinNode.getLeftChild());
- rightExec = createPlanRecursive(ctx, joinNode.getRightChild());
- return createJoinPlan(ctx, joinNode, leftExec, rightExec);
+ List<LogicalNode> childs = plan.getChilds(joinNode);
+ leftExec = createPlanRecursive(ctx, plan, childs.get(0));
+ rightExec = createPlanRecursive(ctx, plan, childs.get(1));
+ currentExec = createJoinPlan(ctx, plan, joinNode, leftExec, rightExec);
+ if (plan.getParentCount(logicalNode) > 1) {
+ return new MultiOutputExec(ctx, currentExec.getSchema(), currentExec, plan.getParentCount(logicalNode));
+ } else {
+ return currentExec;
+ }
case UNION:
UnionNode unionNode = (UnionNode) logicalNode;
- leftExec = createPlanRecursive(ctx, unionNode.getLeftChild());
- rightExec = createPlanRecursive(ctx, unionNode.getRightChild());
- return new UnionExec(ctx, leftExec, rightExec);
+ childs = plan.getChilds(unionNode);
+ leftExec = createPlanRecursive(ctx, plan, childs.get(0));
+ rightExec = createPlanRecursive(ctx, plan, childs.get(1));
+ currentExec = new UnionExec(ctx, leftExec, rightExec);
+ if (plan.getParentCount(logicalNode) > 1) {
+ return new MultiOutputExec(ctx, currentExec.getSchema(), currentExec, plan.getParentCount(logicalNode));
+ } else {
+ return currentExec;
+ }
case LIMIT:
LimitNode limitNode = (LimitNode) logicalNode;
- leftExec = createPlanRecursive(ctx, limitNode.getChild());
- return new LimitExec(ctx, limitNode.getInSchema(),
+ leftExec = createPlanRecursive(ctx, plan, plan.getChilds(limitNode).get(0));
+ currentExec = new LimitExec(ctx, limitNode.getInSchema(),
limitNode.getOutSchema(), leftExec, limitNode);
+ if (plan.getParentCount(logicalNode) > 1) {
+ return new MultiOutputExec(ctx, currentExec.getSchema(), currentExec, plan.getParentCount(logicalNode));
+ } else {
+ return currentExec;
+ }
case BST_INDEX_SCAN:
IndexScanNode indexScanNode = (IndexScanNode) logicalNode;
- leftExec = createIndexScanExec(ctx, indexScanNode);
- return leftExec;
+ currentExec = createIndexScanExec(ctx, indexScanNode);
+ if (plan.getParentCount(logicalNode) > 1) {
+ return new MultiOutputExec(ctx, currentExec.getSchema(), currentExec, plan.getParentCount(logicalNode));
+ } else {
+ return currentExec;
+ }
default:
return null;
@@ -194,107 +264,108 @@ public class PhysicalPlannerImpl implements PhysicalPlanner {
return size;
}
- public PhysicalExec createJoinPlan(TaskAttemptContext context, JoinNode joinNode, PhysicalExec leftExec,
- PhysicalExec rightExec) throws IOException {
+ public PhysicalExec createJoinPlan(TaskAttemptContext context, ExecutionPlan plan, JoinNode joinNode,
+ PhysicalExec leftExec, PhysicalExec rightExec) throws IOException {
switch (joinNode.getJoinType()) {
case CROSS:
- return createCrossJoinPlan(context, joinNode, leftExec, rightExec);
+ return createCrossJoinPlan(context, plan, joinNode, leftExec, rightExec);
case INNER:
- return createInnerJoinPlan(context, joinNode, leftExec, rightExec);
+ return createInnerJoinPlan(context, plan, joinNode, leftExec, rightExec);
case LEFT_OUTER:
- return createLeftOuterJoinPlan(context, joinNode, leftExec, rightExec);
+ return createLeftOuterJoinPlan(context, plan, joinNode, leftExec, rightExec);
case RIGHT_OUTER:
- return createRightOuterJoinPlan(context, joinNode, leftExec, rightExec);
+ return createRightOuterJoinPlan(context, plan, joinNode, leftExec, rightExec);
case FULL_OUTER:
- return createFullOuterJoinPlan(context, joinNode, leftExec, rightExec);
+ return createFullOuterJoinPlan(context, plan, joinNode, leftExec, rightExec);
case LEFT_SEMI:
- return createLeftSemiJoinPlan(context, joinNode, leftExec, rightExec);
+ return createLeftSemiJoinPlan(context, plan, joinNode, leftExec, rightExec);
case RIGHT_SEMI:
- return createRightSemiJoinPlan(context, joinNode, leftExec, rightExec);
+ return createRightSemiJoinPlan(context, plan, joinNode, leftExec, rightExec);
case LEFT_ANTI:
- return createLeftAntiJoinPlan(context, joinNode, leftExec, rightExec);
+ return createLeftAntiJoinPlan(context, plan, joinNode, leftExec, rightExec);
case RIGHT_ANTI:
- return createRightAntiJoinPlan(context, joinNode, leftExec, rightExec);
+ return createRightAntiJoinPlan(context, plan, joinNode, leftExec, rightExec);
default:
throw new PhysicalPlanningException("Cannot support join type: " + joinNode.getJoinType().name());
}
}
- private PhysicalExec createCrossJoinPlan(TaskAttemptContext context, JoinNode plan,
+ private PhysicalExec createCrossJoinPlan(TaskAttemptContext context, ExecutionPlan plan, JoinNode join,
PhysicalExec leftExec, PhysicalExec rightExec) throws IOException {
Enforcer enforcer = context.getEnforcer();
- EnforceProperty property = getAlgorithmEnforceProperty(enforcer, plan);
+ EnforceProperty property = getAlgorithmEnforceProperty(enforcer, join);
if (property != null) {
JoinAlgorithm algorithm = property.getJoin().getAlgorithm();
switch (algorithm) {
case NESTED_LOOP_JOIN:
- LOG.info("Join (" + plan.getPID() +") chooses [Nested Loop Join]");
- return new NLJoinExec(context, plan, leftExec, rightExec);
+ LOG.info("Join (" + join.getPID() +") chooses [Nested Loop Join]");
+ return new NLJoinExec(context, join, leftExec, rightExec);
case BLOCK_NESTED_LOOP_JOIN:
- LOG.info("Join (" + plan.getPID() +") chooses [Block Nested Loop Join]");
- return new BNLJoinExec(context, plan, leftExec, rightExec);
+ LOG.info("Join (" + join.getPID() +") chooses [Block Nested Loop Join]");
+ return new BNLJoinExec(context, join, leftExec, rightExec);
default:
// fallback algorithm
LOG.error("Invalid Cross Join Algorithm Enforcer: " + algorithm.name());
- return new BNLJoinExec(context, plan, leftExec, rightExec);
+ return new BNLJoinExec(context, join, leftExec, rightExec);
}
} else {
- return new BNLJoinExec(context, plan, leftExec, rightExec);
+ return new BNLJoinExec(context, join, leftExec, rightExec);
}
}
- private PhysicalExec createInnerJoinPlan(TaskAttemptContext context, JoinNode plan,
+ private PhysicalExec createInnerJoinPlan(TaskAttemptContext context, ExecutionPlan plan, JoinNode node,
PhysicalExec leftExec, PhysicalExec rightExec) throws IOException {
Enforcer enforcer = context.getEnforcer();
- EnforceProperty property = getAlgorithmEnforceProperty(enforcer, plan);
+ EnforceProperty property = getAlgorithmEnforceProperty(enforcer, node);
if (property != null) {
JoinAlgorithm algorithm = property.getJoin().getAlgorithm();
switch (algorithm) {
case NESTED_LOOP_JOIN:
- LOG.info("Join (" + plan.getPID() +") chooses [Nested Loop Join]");
- return new NLJoinExec(context, plan, leftExec, rightExec);
+ LOG.info("Join (" + node.getPID() +") chooses [Nested Loop Join]");
+ return new NLJoinExec(context, node, leftExec, rightExec);
case BLOCK_NESTED_LOOP_JOIN:
- LOG.info("Join (" + plan.getPID() +") chooses [Block Nested Loop Join]");
- return new BNLJoinExec(context, plan, leftExec, rightExec);
+ LOG.info("Join (" + node.getPID() +") chooses [Block Nested Loop Join]");
+ return new BNLJoinExec(context, node, leftExec, rightExec);
case IN_MEMORY_HASH_JOIN:
- LOG.info("Join (" + plan.getPID() +") chooses [In-memory Hash Join]");
- return new HashJoinExec(context, plan, leftExec, rightExec);
+ LOG.info("Join (" + node.getPID() +") chooses [In-memory Hash Join]");
+ return new HashJoinExec(context, node, leftExec, rightExec);
case MERGE_JOIN:
- LOG.info("Join (" + plan.getPID() +") chooses [Sort Merge Join]");
- return createMergeInnerJoin(context, plan, leftExec, rightExec);
+ LOG.info("Join (" + node.getPID() +") chooses [Sort Merge Join]");
+ return createMergeInnerJoin(context, node, leftExec, rightExec);
case HYBRID_HASH_JOIN:
default:
LOG.error("Invalid Inner Join Algorithm Enforcer: " + algorithm.name());
LOG.error("Choose a fallback inner join algorithm: " + JoinAlgorithm.MERGE_JOIN.name());
- return createMergeInnerJoin(context, plan, leftExec, rightExec);
+ return createMergeInnerJoin(context, node, leftExec, rightExec);
}
} else {
- return createBestInnerJoinPlan(context, plan, leftExec, rightExec);
+ return createBestInnerJoinPlan(context, plan, node, leftExec, rightExec);
}
}
- private PhysicalExec createBestInnerJoinPlan(TaskAttemptContext context, JoinNode plan,
+ private PhysicalExec createBestInnerJoinPlan(TaskAttemptContext context, ExecutionPlan plan, JoinNode node,
PhysicalExec leftExec, PhysicalExec rightExec) throws IOException {
- String [] leftLineage = PlannerUtil.getLineage(plan.getLeftChild());
- String [] rightLineage = PlannerUtil.getLineage(plan.getRightChild());
+ List<LogicalNode> childs = plan.getChilds(node);
+ String [] leftLineage = PlannerUtil.getLineage(plan, childs.get(0));
+ String [] rightLineage = PlannerUtil.getLineage(plan, childs.get(1));
long leftSize = estimateSizeRecursive(context, leftLineage);
long rightSize = estimateSizeRecursive(context, rightLineage);
@@ -318,10 +389,10 @@ public class PhysicalPlannerImpl implements PhysicalPlanner {
selectedOuter = leftExec;
}
- LOG.info("Join (" + plan.getPID() +") chooses [InMemory Hash Join]");
- return new HashJoinExec(context, plan, selectedOuter, selectedInner);
+ LOG.info("Join (" + node.getPID() +") chooses [InMemory Hash Join]");
+ return new HashJoinExec(context, node, selectedOuter, selectedInner);
} else {
- return createMergeInnerJoin(context, plan, leftExec, rightExec);
+ return createMergeInnerJoin(context, node, leftExec, rightExec);
}
}
@@ -340,58 +411,58 @@ public class PhysicalPlannerImpl implements PhysicalPlanner {
return new MergeJoinExec(context, plan, outerSort, innerSort, sortSpecs[0], sortSpecs[1]);
}
- private PhysicalExec createLeftOuterJoinPlan(TaskAttemptContext context, JoinNode plan,
+ private PhysicalExec createLeftOuterJoinPlan(TaskAttemptContext context, ExecutionPlan plan, JoinNode join,
PhysicalExec leftExec, PhysicalExec rightExec) throws IOException {
Enforcer enforcer = context.getEnforcer();
- EnforceProperty property = getAlgorithmEnforceProperty(enforcer, plan);
+ EnforceProperty property = getAlgorithmEnforceProperty(enforcer, join);
if (property != null) {
JoinAlgorithm algorithm = property.getJoin().getAlgorithm();
switch (algorithm) {
case IN_MEMORY_HASH_JOIN:
- LOG.info("Left Outer Join (" + plan.getPID() +") chooses [Hash Join].");
- return new HashLeftOuterJoinExec(context, plan, leftExec, rightExec);
+ LOG.info("Left Outer Join (" + join.getPID() +") chooses [Hash Join].");
+ return new HashLeftOuterJoinExec(context, join, leftExec, rightExec);
case NESTED_LOOP_JOIN:
//the right operand is too large, so we opt for NL implementation of left outer join
- LOG.info("Left Outer Join (" + plan.getPID() +") chooses [Nested Loop Join].");
- return new NLLeftOuterJoinExec(context, plan, leftExec, rightExec);
+ LOG.info("Left Outer Join (" + join.getPID() +") chooses [Nested Loop Join].");
+ return new NLLeftOuterJoinExec(context, join, leftExec, rightExec);
default:
LOG.error("Invalid Left Outer Join Algorithm Enforcer: " + algorithm.name());
LOG.error("Choose a fallback inner join algorithm: " + JoinAlgorithm.IN_MEMORY_HASH_JOIN.name());
- return new HashLeftOuterJoinExec(context, plan, leftExec, rightExec);
+ return new HashLeftOuterJoinExec(context, join, leftExec, rightExec);
}
} else {
- return createBestLeftOuterJoinPlan(context, plan, leftExec, rightExec);
+ return createBestLeftOuterJoinPlan(context, plan, join, leftExec, rightExec);
}
}
- private PhysicalExec createBestLeftOuterJoinPlan(TaskAttemptContext context, JoinNode plan,
+ private PhysicalExec createBestLeftOuterJoinPlan(TaskAttemptContext context, ExecutionPlan plan, JoinNode join,
PhysicalExec leftExec, PhysicalExec rightExec) throws IOException {
- String [] rightLineage = PlannerUtil.getLineage(plan.getRightChild());
+ String [] rightLineage = PlannerUtil.getLineage(plan.getChild(join, 1));
long rightTableVolume = estimateSizeRecursive(context, rightLineage);
if (rightTableVolume < conf.getLongVar(TajoConf.ConfVars.EXECUTOR_OUTER_JOIN_INMEMORY_HASH_THRESHOLD)) {
// we can implement left outer join using hash join, using the right operand as the build relation
- LOG.info("Left Outer Join (" + plan.getPID() +") chooses [Hash Join].");
- return new HashLeftOuterJoinExec(context, plan, leftExec, rightExec);
+ LOG.info("Left Outer Join (" + join.getPID() +") chooses [Hash Join].");
+ return new HashLeftOuterJoinExec(context, join, leftExec, rightExec);
}
else {
//the right operand is too large, so we opt for NL implementation of left outer join
- LOG.info("Left Outer Join (" + plan.getPID() +") chooses [Nested Loop Join].");
- return new NLLeftOuterJoinExec(context, plan, leftExec, rightExec);
+ LOG.info("Left Outer Join (" + join.getPID() +") chooses [Nested Loop Join].");
+ return new NLLeftOuterJoinExec(context, join, leftExec, rightExec);
}
}
- private PhysicalExec createBestRightJoinPlan(TaskAttemptContext context, JoinNode plan,
+ private PhysicalExec createBestRightJoinPlan(TaskAttemptContext context, ExecutionPlan plan, JoinNode join,
PhysicalExec leftExec, PhysicalExec rightExec) throws IOException {
//if the left operand is small enough => implement it as a left outer hash join with exchanged operators (note:
// blocking, but merge join is blocking as well)
- String [] outerLineage4 = PlannerUtil.getLineage(plan.getLeftChild());
+ String [] outerLineage4 = PlannerUtil.getLineage(plan.getChild(join, 0));
long outerSize = estimateSizeRecursive(context, outerLineage4);
if (outerSize < conf.getLongVar(TajoConf.ConfVars.EXECUTOR_OUTER_JOIN_INMEMORY_HASH_THRESHOLD)){
- LOG.info("Right Outer Join (" + plan.getPID() +") chooses [Hash Join].");
- return new HashLeftOuterJoinExec(context, plan, rightExec, leftExec);
+ LOG.info("Right Outer Join (" + join.getPID() +") chooses [Hash Join].");
+ return new HashLeftOuterJoinExec(context, join, rightExec, leftExec);
} else {
- return createRightOuterMergeJoinPlan(context, plan, leftExec, rightExec);
+ return createRightOuterMergeJoinPlan(context, join, leftExec, rightExec);
}
}
@@ -408,56 +479,56 @@ public class PhysicalPlannerImpl implements PhysicalPlanner {
return new RightOuterMergeJoinExec(context, plan, outerSort2, innerSort2, sortSpecs2[0], sortSpecs2[1]);
}
- private PhysicalExec createRightOuterJoinPlan(TaskAttemptContext context, JoinNode plan,
+ private PhysicalExec createRightOuterJoinPlan(TaskAttemptContext context, ExecutionPlan plan, JoinNode join,
PhysicalExec leftExec, PhysicalExec rightExec) throws IOException {
Enforcer enforcer = context.getEnforcer();
- EnforceProperty property = getAlgorithmEnforceProperty(enforcer, plan);
+ EnforceProperty property = getAlgorithmEnforceProperty(enforcer, join);
if (property != null) {
JoinAlgorithm algorithm = property.getJoin().getAlgorithm();
switch (algorithm) {
case IN_MEMORY_HASH_JOIN:
- LOG.info("Right Outer Join (" + plan.getPID() +") chooses [Hash Join].");
- return new HashLeftOuterJoinExec(context, plan, rightExec, leftExec);
+ LOG.info("Right Outer Join (" + join.getPID() +") chooses [Hash Join].");
+ return new HashLeftOuterJoinExec(context, join, rightExec, leftExec);
case MERGE_JOIN:
- return createRightOuterMergeJoinPlan(context, plan, leftExec, rightExec);
+ return createRightOuterMergeJoinPlan(context, join, leftExec, rightExec);
default:
LOG.error("Invalid Right Outer Join Algorithm Enforcer: " + algorithm.name());
LOG.error("Choose a fallback merge join algorithm: " + JoinAlgorithm.MERGE_JOIN.name());
- return createRightOuterMergeJoinPlan(context, plan, leftExec, rightExec);
+ return createRightOuterMergeJoinPlan(context, join, leftExec, rightExec);
}
} else {
- return createBestRightJoinPlan(context, plan, leftExec, rightExec);
+ return createBestRightJoinPlan(context, plan, join, leftExec, rightExec);
}
}
- private PhysicalExec createFullOuterJoinPlan(TaskAttemptContext context, JoinNode plan,
+ private PhysicalExec createFullOuterJoinPlan(TaskAttemptContext context, ExecutionPlan plan, JoinNode join,
PhysicalExec leftExec, PhysicalExec rightExec) throws IOException {
Enforcer enforcer = context.getEnforcer();
- EnforceProperty property = getAlgorithmEnforceProperty(enforcer, plan);
+ EnforceProperty property = getAlgorithmEnforceProperty(enforcer, join);
if (property != null) {
JoinAlgorithm algorithm = property.getJoin().getAlgorithm();
switch (algorithm) {
case IN_MEMORY_HASH_JOIN:
- return createFullOuterHashJoinPlan(context, plan, leftExec, rightExec);
+ return createFullOuterHashJoinPlan(context, plan, join, leftExec, rightExec);
case MERGE_JOIN:
- return createFullOuterMergeJoinPlan(context, plan, leftExec, rightExec);
+ return createFullOuterMergeJoinPlan(context, join, leftExec, rightExec);
default:
LOG.error("Invalid Full Outer Join Algorithm Enforcer: " + algorithm.name());
LOG.error("Choose a fallback merge join algorithm: " + JoinAlgorithm.MERGE_JOIN.name());
- return createFullOuterMergeJoinPlan(context, plan, leftExec, rightExec);
+ return createFullOuterMergeJoinPlan(context, join, leftExec, rightExec);
}
} else {
- return createBestFullOuterJoinPlan(context, plan, leftExec, rightExec);
+ return createBestFullOuterJoinPlan(context, plan, join, leftExec, rightExec);
}
}
- private HashFullOuterJoinExec createFullOuterHashJoinPlan(TaskAttemptContext context, JoinNode plan,
+ private HashFullOuterJoinExec createFullOuterHashJoinPlan(TaskAttemptContext context, ExecutionPlan plan, JoinNode join,
PhysicalExec leftExec, PhysicalExec rightExec)
throws IOException {
- String [] leftLineage = PlannerUtil.getLineage(plan.getLeftChild());
- String [] rightLineage = PlannerUtil.getLineage(plan.getRightChild());
+ String [] leftLineage = PlannerUtil.getLineage(plan.getChild(join, 0));
+ String [] rightLineage = PlannerUtil.getLineage(plan.getChild(join, 1));
long outerSize2 = estimateSizeRecursive(context, leftLineage);
long innerSize2 = estimateSizeRecursive(context, rightLineage);
@@ -472,8 +543,8 @@ public class PhysicalPlannerImpl implements PhysicalPlanner {
selectedLeft = rightExec;
selectedRight = leftExec;
}
- LOG.info("Full Outer Join (" + plan.getPID() +") chooses [Hash Join]");
- return new HashFullOuterJoinExec(context, plan, selectedRight, selectedLeft);
+ LOG.info("Full Outer Join (" + join.getPID() +") chooses [Hash Join]");
+ return new HashFullOuterJoinExec(context, join, selectedRight, selectedLeft);
}
private MergeFullOuterJoinExec createFullOuterMergeJoinPlan(TaskAttemptContext context, JoinNode plan,
@@ -491,117 +562,117 @@ public class PhysicalPlannerImpl implements PhysicalPlanner {
return new MergeFullOuterJoinExec(context, plan, outerSort3, innerSort3, sortSpecs3[0], sortSpecs3[1]);
}
- private PhysicalExec createBestFullOuterJoinPlan(TaskAttemptContext context, JoinNode plan,
+ private PhysicalExec createBestFullOuterJoinPlan(TaskAttemptContext context, ExecutionPlan plan, JoinNode join,
PhysicalExec leftExec, PhysicalExec rightExec) throws IOException {
- String [] leftLineage = PlannerUtil.getLineage(plan.getLeftChild());
- String [] rightLineage = PlannerUtil.getLineage(plan.getRightChild());
+ String [] leftLineage = PlannerUtil.getLineage(plan.getChild(join, 0));
+ String [] rightLineage = PlannerUtil.getLineage(plan.getChild(join, 1));
long outerSize2 = estimateSizeRecursive(context, leftLineage);
long innerSize2 = estimateSizeRecursive(context, rightLineage);
final long threshold = 1048576 * 128;
if (outerSize2 < threshold || innerSize2 < threshold) {
- return createFullOuterHashJoinPlan(context, plan, leftExec, rightExec);
+ return createFullOuterHashJoinPlan(context, plan, join, leftExec, rightExec);
} else {
- return createFullOuterMergeJoinPlan(context, plan, leftExec, rightExec);
+ return createFullOuterMergeJoinPlan(context, join, leftExec, rightExec);
}
}
/**
* Left semi join means that the left side is the IN side table, and the right side is the FROM side table.
*/
- private PhysicalExec createLeftSemiJoinPlan(TaskAttemptContext context, JoinNode plan,
+ private PhysicalExec createLeftSemiJoinPlan(TaskAttemptContext context, ExecutionPlan plan, JoinNode join,
PhysicalExec leftExec, PhysicalExec rightExec) throws IOException {
Enforcer enforcer = context.getEnforcer();
- EnforceProperty property = getAlgorithmEnforceProperty(enforcer, plan);
+ EnforceProperty property = getAlgorithmEnforceProperty(enforcer, join);
if (property != null) {
JoinAlgorithm algorithm = property.getJoin().getAlgorithm();
switch (algorithm) {
case IN_MEMORY_HASH_JOIN:
- LOG.info("Left Semi Join (" + plan.getPID() +") chooses [In Memory Hash Join].");
- return new HashLeftSemiJoinExec(context, plan, leftExec, rightExec);
+ LOG.info("Left Semi Join (" + join.getPID() +") chooses [In Memory Hash Join].");
+ return new HashLeftSemiJoinExec(context, join, leftExec, rightExec);
default:
LOG.error("Invalid Left Semi Join Algorithm Enforcer: " + algorithm.name());
LOG.error("Choose a fallback inner join algorithm: " + JoinAlgorithm.IN_MEMORY_HASH_JOIN.name());
- return new HashLeftOuterJoinExec(context, plan, leftExec, rightExec);
+ return new HashLeftOuterJoinExec(context, join, leftExec, rightExec);
}
} else {
- LOG.info("Left Semi Join (" + plan.getPID() +") chooses [In Memory Hash Join].");
- return new HashLeftSemiJoinExec(context, plan, leftExec, rightExec);
+ LOG.info("Left Semi Join (" + join.getPID() +") chooses [In Memory Hash Join].");
+ return new HashLeftSemiJoinExec(context, join, leftExec, rightExec);
}
}
/**
* Left semi join means that the left side is the FROM side table, and the right side is the IN side table.
*/
- private PhysicalExec createRightSemiJoinPlan(TaskAttemptContext context, JoinNode plan,
+ private PhysicalExec createRightSemiJoinPlan(TaskAttemptContext context, ExecutionPlan plan, JoinNode join,
PhysicalExec leftExec, PhysicalExec rightExec) throws IOException {
Enforcer enforcer = context.getEnforcer();
- EnforceProperty property = getAlgorithmEnforceProperty(enforcer, plan);
+ EnforceProperty property = getAlgorithmEnforceProperty(enforcer, join);
if (property != null) {
JoinAlgorithm algorithm = property.getJoin().getAlgorithm();
switch (algorithm) {
case IN_MEMORY_HASH_JOIN:
- LOG.info("Left Semi Join (" + plan.getPID() +") chooses [In Memory Hash Join].");
- return new HashLeftSemiJoinExec(context, plan, rightExec, leftExec);
+ LOG.info("Right Semi Join (" + join.getPID() +") chooses [In Memory Hash Join].");
+ return new HashLeftSemiJoinExec(context, join, rightExec, leftExec);
default:
LOG.error("Invalid Left Semi Join Algorithm Enforcer: " + algorithm.name());
LOG.error("Choose a fallback inner join algorithm: " + JoinAlgorithm.IN_MEMORY_HASH_JOIN.name());
- return new HashLeftOuterJoinExec(context, plan, rightExec, leftExec);
+ return new HashLeftOuterJoinExec(context, join, rightExec, leftExec);
}
} else {
- LOG.info("Left Semi Join (" + plan.getPID() +") chooses [In Memory Hash Join].");
- return new HashLeftSemiJoinExec(context, plan, rightExec, leftExec);
+ LOG.info("Right Semi Join (" + join.getPID() +") chooses [In Memory Hash Join].");
+ return new HashLeftSemiJoinExec(context, join, rightExec, leftExec);
}
}
/**
* Left semi join means that the left side is the FROM side table, and the right side is the IN side table.
*/
- private PhysicalExec createLeftAntiJoinPlan(TaskAttemptContext context, JoinNode plan,
+ private PhysicalExec createLeftAntiJoinPlan(TaskAttemptContext context, ExecutionPlan plan, JoinNode join,
PhysicalExec leftExec, PhysicalExec rightExec) throws IOException {
Enforcer enforcer = context.getEnforcer();
- EnforceProperty property = getAlgorithmEnforceProperty(enforcer, plan);
+ EnforceProperty property = getAlgorithmEnforceProperty(enforcer, join);
if (property != null) {
JoinAlgorithm algorithm = property.getJoin().getAlgorithm();
switch (algorithm) {
case IN_MEMORY_HASH_JOIN:
- LOG.info("Left Semi Join (" + plan.getPID() +") chooses [In Memory Hash Join].");
- return new HashLeftAntiJoinExec(context, plan, leftExec, rightExec);
+ LOG.info("Left Anti Join (" + join.getPID() +") chooses [In Memory Hash Join].");
+ return new HashLeftAntiJoinExec(context, join, leftExec, rightExec);
default:
LOG.error("Invalid Left Semi Join Algorithm Enforcer: " + algorithm.name());
LOG.error("Choose a fallback inner join algorithm: " + JoinAlgorithm.IN_MEMORY_HASH_JOIN.name());
- return new HashLeftAntiJoinExec(context, plan, leftExec, rightExec);
+ return new HashLeftAntiJoinExec(context, join, leftExec, rightExec);
}
} else {
- LOG.info("Left Semi Join (" + plan.getPID() +") chooses [In Memory Hash Join].");
- return new HashLeftAntiJoinExec(context, plan, leftExec, rightExec);
+ LOG.info("Left Anti Join (" + join.getPID() +") chooses [In Memory Hash Join].");
+ return new HashLeftAntiJoinExec(context, join, leftExec, rightExec);
}
}
/**
* Left semi join means that the left side is the FROM side table, and the right side is the IN side table.
*/
- private PhysicalExec createRightAntiJoinPlan(TaskAttemptContext context, JoinNode plan,
+ private PhysicalExec createRightAntiJoinPlan(TaskAttemptContext context, ExecutionPlan plan, JoinNode join,
PhysicalExec leftExec, PhysicalExec rightExec) throws IOException {
Enforcer enforcer = context.getEnforcer();
- EnforceProperty property = getAlgorithmEnforceProperty(enforcer, plan);
+ EnforceProperty property = getAlgorithmEnforceProperty(enforcer, join);
if (property != null) {
JoinAlgorithm algorithm = property.getJoin().getAlgorithm();
switch (algorithm) {
case IN_MEMORY_HASH_JOIN:
- LOG.info("Left Semi Join (" + plan.getPID() +") chooses [In Memory Hash Join].");
- return new HashLeftSemiJoinExec(context, plan, rightExec, leftExec);
+ LOG.info("Right Anti Join (" + join.getPID() +") chooses [In Memory Hash Join].");
+ return new HashLeftSemiJoinExec(context, join, rightExec, leftExec);
default:
LOG.error("Invalid Left Semi Join Algorithm Enforcer: " + algorithm.name());
LOG.error("Choose a fallback inner join algorithm: " + JoinAlgorithm.IN_MEMORY_HASH_JOIN.name());
- return new HashLeftOuterJoinExec(context, plan, rightExec, leftExec);
+ return new HashLeftOuterJoinExec(context, join, rightExec, leftExec);
}
} else {
- LOG.info("Left Semi Join (" + plan.getPID() +") chooses [In Memory Hash Join].");
- return new HashLeftSemiJoinExec(context, plan, rightExec, leftExec);
+ LOG.info("Right Anti Join (" + join.getPID() +") chooses [In Memory Hash Join].");
+ return new HashLeftSemiJoinExec(context, join, rightExec, leftExec);
}
}
@@ -609,7 +680,13 @@ public class PhysicalPlannerImpl implements PhysicalPlanner {
StoreTableNode plan, PhysicalExec subOp) throws IOException {
if (plan.getPartitionType() == PartitionType.HASH_PARTITION
|| plan.getPartitionType() == PartitionType.RANGE_PARTITION) {
- switch (ctx.getDataChannel().getPartitionType()) {
+ DataChannel channel = null;
+ for (DataChannel outChannel : ctx.getOutgoingChannels()) {
+ if (outChannel.getSrcPID() == plan.getPID()) {
+ channel = outChannel;
+ }
+ }
+ switch (channel.getPartitionType()) {
case HASH_PARTITION:
return new PartitionedStoreExec(ctx, sm, plan, subOp);
@@ -620,10 +697,10 @@ public class PhysicalPlannerImpl implements PhysicalPlanner {
if (sortExec != null) {
sortSpecs = sortExec.getSortSpecs();
} else {
- Column[] columns = ctx.getDataChannel().getPartitionKey();
- SortSpec specs[] = new SortSpec[columns.length];
+ Column[] columns = channel.getPartitionKey();
+ sortSpecs = new SortSpec[columns.length];
for (int i = 0; i < columns.length; i++) {
- specs[i] = new SortSpec(columns[i]);
+ sortSpecs[i] = new SortSpec(columns[i]);
}
}
@@ -642,11 +719,12 @@ public class PhysicalPlannerImpl implements PhysicalPlanner {
Preconditions.checkNotNull(ctx.getTable(scanNode.getCanonicalName()),
"Error: There is no table matched to %s", scanNode.getCanonicalName() + "(" + scanNode.getTableName() + ")");
- FragmentProto [] fragments = ctx.getTables(scanNode.getCanonicalName());
+ FragmentProto[] fragments = ctx.getTables(scanNode.getCanonicalName());
return new SeqScanExec(ctx, sm, scanNode, fragments);
}
- public PhysicalExec createGroupByPlan(TaskAttemptContext context,GroupbyNode groupbyNode, PhysicalExec subOp)
+ public PhysicalExec createGroupByPlan(TaskAttemptContext context, ExecutionPlan plan, GroupbyNode groupbyNode,
+ PhysicalExec subOp)
throws IOException {
Enforcer enforcer = context.getEnforcer();
@@ -659,7 +737,7 @@ public class PhysicalPlannerImpl implements PhysicalPlanner {
return createSortAggregation(context, property, groupbyNode, subOp);
}
}
- return createBestAggregationPlan(context, groupbyNode, subOp);
+ return createBestAggregationPlan(context, plan, groupbyNode, subOp);
}
private PhysicalExec createInMemoryHashAggregation(TaskAttemptContext ctx,GroupbyNode groupbyNode, PhysicalExec subOp)
@@ -678,7 +756,7 @@ public class PhysicalPlannerImpl implements PhysicalPlanner {
}
if (property != null) {
- List<CatalogProtos.SortSpecProto> sortSpecProtos = property.getGroupby().getSortSpecsList();
+ List<SortSpecProto> sortSpecProtos = property.getGroupby().getSortSpecsList();
SortSpec[] enforcedSortSpecs = new SortSpec[sortSpecProtos.size()];
int i = 0;
@@ -692,20 +770,19 @@ public class PhysicalPlannerImpl implements PhysicalPlanner {
SortNode sortNode = new SortNode(-1, sortSpecs);
sortNode.setInSchema(subOp.getSchema());
sortNode.setOutSchema(subOp.getSchema());
- // SortExec sortExec = new SortExec(sortNode, child);
ExternalSortExec sortExec = new ExternalSortExec(ctx, sm, sortNode, subOp);
LOG.info("The planner chooses [Sort Aggregation] in (" + TUtil.arrayToString(sortSpecs) + ")");
return new SortAggregateExec(ctx, groupbyNode, sortExec);
}
- private PhysicalExec createBestAggregationPlan(TaskAttemptContext context, GroupbyNode groupbyNode,
+ private PhysicalExec createBestAggregationPlan(TaskAttemptContext context, ExecutionPlan plan, GroupbyNode groupbyNode,
PhysicalExec subOp) throws IOException {
Column[] grpColumns = groupbyNode.getGroupingColumns();
if (grpColumns.length == 0) {
return createInMemoryHashAggregation(context, groupbyNode, subOp);
}
- String [] outerLineage = PlannerUtil.getLineage(groupbyNode.getChild());
+ String [] outerLineage = PlannerUtil.getLineage(plan, plan.getChilds(groupbyNode).get(0));
long estimatedSize = estimateSizeRecursive(context, outerLineage);
final long threshold = conf.getLongVar(TajoConf.ConfVars.EXECUTOR_GROUPBY_INMEMORY_HASH_THRESHOLD);
@@ -719,7 +796,7 @@ public class PhysicalPlannerImpl implements PhysicalPlanner {
}
}
- public PhysicalExec createSortPlan(TaskAttemptContext context, SortNode sortNode,
+ public PhysicalExec createSortPlan(TaskAttemptContext context, ExecutionPlan plan, SortNode sortNode,
PhysicalExec child) throws IOException {
Enforcer enforcer = context.getEnforcer();
EnforceProperty property = getAlgorithmEnforceProperty(enforcer, sortNode);
@@ -732,16 +809,16 @@ public class PhysicalPlannerImpl implements PhysicalPlanner {
}
}
- return createBestSortPlan(context, sortNode, child);
+ return createBestSortPlan(context, plan, sortNode, child);
}
- public SortExec createBestSortPlan(TaskAttemptContext context, SortNode sortNode,
+ public SortExec createBestSortPlan(TaskAttemptContext context, ExecutionPlan plan, SortNode sortNode,
PhysicalExec child) throws IOException {
- String [] outerLineage = PlannerUtil.getLineage(sortNode.getChild());
+ String [] outerLineage = PlannerUtil.getLineage(plan, plan.getChilds(sortNode).get(0));
long estimatedSize = estimateSizeRecursive(context, outerLineage);
final long threshold = 1048576 * 2000;
- // if the relation size is less than the reshold,
+ // if the relation size is less than the threshold,
// the in-memory sort will be used.
if (estimatedSize <= threshold) {
return new MemSortExec(context, sortNode, child);
@@ -757,17 +834,19 @@ public class PhysicalPlannerImpl implements PhysicalPlanner {
Preconditions.checkNotNull(ctx.getTable(annotation.getCanonicalName()),
"Error: There is no table matched to %s", annotation.getCanonicalName());
- FragmentProto [] fragmentProtos = ctx.getTables(annotation.getTableName());
+ FragmentProto[] fragmentProtos = ctx.getTables(annotation.getTableName());
List<FileFragment> fragments =
FragmentConvertor.convert(ctx.getConf(), CatalogProtos.StoreType.CSV, fragmentProtos);
- String indexName = IndexUtil.getIndexNameOfFrag(fragments.get(0), annotation.getSortKeys());
+ String indexName = IndexUtil.getIndexNameOfFrag(fragments.get(0),
+ annotation.getSortKeys());
Path indexPath = new Path(sm.getTablePath(annotation.getTableName()), "index");
TupleComparator comp = new TupleComparator(annotation.getKeySchema(),
annotation.getSortKeys());
- return new BSTIndexScanExec(ctx, sm, annotation, fragments.get(0), new Path(indexPath, indexName),
- annotation.getKeySchema(), comp, annotation.getDatum());
+ return new BSTIndexScanExec(ctx, sm, annotation, fragments.get(0), new Path(
+ indexPath, indexName), annotation.getKeySchema(), comp,
+ annotation.getDatum());
}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/7c97735e/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PlannerUtil.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PlannerUtil.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PlannerUtil.java
index 9371463..3b8bd08 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PlannerUtil.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PlannerUtil.java
@@ -29,6 +29,7 @@ import org.apache.tajo.catalog.Schema;
import org.apache.tajo.catalog.SortSpec;
import org.apache.tajo.common.TajoDataTypes.DataType;
import org.apache.tajo.engine.eval.*;
+import org.apache.tajo.engine.planner.global.ExecutionPlan;
import org.apache.tajo.engine.planner.logical.*;
import org.apache.tajo.engine.exception.InvalidQueryException;
import org.apache.tajo.storage.TupleComparator;
@@ -61,6 +62,17 @@ public class PlannerUtil {
}
return tableNames;
}
+
+ public static String [] getLineage(ExecutionPlan plan, LogicalNode node) {
+ LogicalNode [] scans = PlannerUtil.findAllNodes(plan, node, NodeType.SCAN);
+ String [] tableNames = new String[scans.length];
+ ScanNode scan;
+ for (int i = 0; i < scans.length; i++) {
+ scan = (ScanNode) scans[i];
+ tableNames[i] = scan.getCanonicalName();
+ }
+ return tableNames;
+ }
/**
* Delete the logical node from a plan.
@@ -151,6 +163,97 @@ public class PlannerUtil {
parentNode.setChild(newNode);
}
+ public static GroupbyNode[] transformGroupbyTo2Pv2(LogicalPlan plan, GroupbyNode groupBy) {
+ Preconditions.checkNotNull(groupBy);
+
+ GroupbyNode parent = null, child = null;
+
+ // cloning groupby node
+ try {
+ parent = groupBy;
+ child = (GroupbyNode) groupBy.clone();
+ child.setPid(plan.newPID());
+ } catch (CloneNotSupportedException e) {
+ e.printStackTrace();
+ }
+
+ List<Target> firstStepTargets = Lists.newArrayList();
+ Target[] secondTargets = parent.getTargets();
+ Target[] firstTargets = child.getTargets();
+
+ Target second;
+ Target first;
+ int targetId = 0;
+ for (int i = 0; i < firstTargets.length; i++) {
+ second = secondTargets[i];
+ first = firstTargets[i];
+
+ List<AggregationFunctionCallEval> secondStepFunctions = EvalTreeUtil.findDistinctAggFunction(second.getEvalTree());
+ List<AggregationFunctionCallEval> firstStepFunctions = EvalTreeUtil.findDistinctAggFunction(first.getEvalTree());
+
+ if (firstStepFunctions.size() == 0) {
+ firstStepTargets.add(first);
+ targetId++;
+ } else {
+ for (AggregationFunctionCallEval func : firstStepFunctions) {
+ Target newTarget;
+
+ if (func.isDistinct()) {
+ List<Column> fields = EvalTreeUtil.findAllColumnRefs(func);
+ newTarget = new Target(new FieldEval(fields.get(0)));
+ String targetName = "column_" + (targetId++);
+ newTarget.setAlias(targetName);
+
+ AggregationFunctionCallEval secondFunc = null;
+ for (AggregationFunctionCallEval sf : secondStepFunctions) {
+ if (func.equals(sf)) {
+ secondFunc = sf;
+ break;
+ }
+ }
+
+ secondFunc.setArgs(new EvalNode [] {new FieldEval(
+ new Column(targetName, newTarget.getEvalTree().getValueType()))});
+ } else {
+ func.setFirstPhase();
+ newTarget = new Target(func);
+ String targetName = "column_" + (targetId++);
+ newTarget.setAlias(targetName);
+
+ AggregationFunctionCallEval secondFunc = null;
+ for (AggregationFunctionCallEval sf : secondStepFunctions) {
+ if (func.equals(sf)) {
+ secondFunc = sf;
+ break;
+ }
+ }
+ secondFunc.setArgs(new EvalNode [] {new FieldEval(
+ new Column(targetName, newTarget.getEvalTree().getValueType()))});
+ }
+ firstStepTargets.add(newTarget);
+ }
+ }
+
+ // Getting new target list and updating input/output schema from the new target list.
+ Target[] targetArray = firstStepTargets.toArray(new Target[firstStepTargets.size()]);
+ Schema targetSchema = PlannerUtil.targetToSchema(targetArray);
+ List<Target> newTarget = Lists.newArrayList();
+ for (Column column : parent.getGroupingColumns()) {
+ if (!targetSchema.contains(column.getQualifiedName())) {
+ newTarget.add(new Target(new FieldEval(column)));
+ }
+ }
+ targetArray = ObjectArrays.concat(targetArray, newTarget.toArray(new Target[newTarget.size()]), Target.class);
+
+ child.setTargets(targetArray);
+ child.setOutSchema(PlannerUtil.targetToSchema(targetArray));
+ // set the groupby chaining
+ groupBy.setInSchema(child.getOutSchema());
+
+ }
+ return new GroupbyNode[] {parent, child};
+ }
+
public static GroupbyNode transformGroupbyTo2P(GroupbyNode groupBy) {
Preconditions.checkNotNull(groupBy);
@@ -241,6 +344,21 @@ public class PlannerUtil {
return child;
}
+ public static SortNode[] transformSortTo2p(LogicalPlan plan, SortNode sort) {
+ Preconditions.checkArgument(sort != null);
+ SortNode parent = null, child = null;
+ try {
+ parent = sort;
+ child = (SortNode) sort.clone();
+ child.setPid(plan.newPID());
+ } catch (CloneNotSupportedException e) {
+ LOG.warn(e);
+ }
+
+ parent.setInSchema(child.getOutSchema());
+
+ return new SortNode[]{parent, child};
+ }
/**
* Find the top logical node matched to type from the given node
@@ -262,6 +380,88 @@ public class PlannerUtil {
return (T) finder.getFoundNodes().get(0);
}
+ private static class LogicalNodeFinderForExecPlan {
+ private NodeType type;
+ private ExecutionPlan plan;
+ private LogicalNode node;
+ private List<LogicalNode> foundNodes = Lists.newArrayList();
+
+ public LogicalNodeFinderForExecPlan(NodeType type, ExecutionPlan plan, LogicalNode node) {
+ this.type = type;
+ this.plan = plan;
+ this.node = node;
+ }
+
+ public LogicalNodeFinderForExecPlan(NodeType type, ExecutionPlan plan) {
+ this(type, plan, plan.getTerminalNode());
+ }
+
+ public void find() {
+ this.visit(node);
+ }
+
+ private void visit(LogicalNode node) {
+ if (plan.getChildCount(node) > 0) {
+ for (LogicalNode child : plan.getChilds(node)) {
+ this.visit(child);
+ }
+ }
+
+ if (node.getType() == type) {
+ foundNodes.add(node);
+ }
+ }
+
+ public List<LogicalNode> getFoundNodes() {
+ return foundNodes;
+ }
+ }
+
+ private static class ParentNodeFinderForExecPlan {
+ private NodeType type;
+ private ExecutionPlan plan;
+ private List<LogicalNode> foundNodes = Lists.newArrayList();
+
+ public ParentNodeFinderForExecPlan(NodeType type, ExecutionPlan plan) {
+ this.type = type;
+ this.plan = plan;
+ }
+
+ public void find() {
+ this.visit(plan.getTerminalNode());
+ }
+
+ private void visit(LogicalNode node) {
+ if (plan.getChildCount(node) > 0) {
+ for (LogicalNode child : plan.getChilds(node)) {
+ this.visit(child);
+ }
+ for (LogicalNode child : plan.getChilds(node)) {
+ if (child.getType() == type) {
+ foundNodes.add(child);
+ }
+ }
+ }
+ }
+
+ public List<LogicalNode> getFoundNodes() {
+ return foundNodes;
+ }
+ }
+
+ public static <T extends LogicalNode> T findTopNode(ExecutionPlan executionPlan, NodeType type) {
+ Preconditions.checkNotNull(executionPlan);
+ Preconditions.checkNotNull(type);
+
+ LogicalNodeFinderForExecPlan finder = new LogicalNodeFinderForExecPlan(type, executionPlan);
+ finder.find();
+
+ if (finder.getFoundNodes().size() == 0) {
+ return null;
+ }
+ return (T) finder.getFoundNodes().get(0);
+ }
+
/**
* Find the all logical node matched to type from the given node
*
@@ -282,6 +482,20 @@ public class PlannerUtil {
List<LogicalNode> founds = finder.getFoundNodes();
return founds.toArray(new LogicalNode[founds.size()]);
}
+
+ public static LogicalNode [] findAllNodes(ExecutionPlan plan, LogicalNode node, NodeType type) {
+ Preconditions.checkNotNull(node);
+ Preconditions.checkNotNull(type);
+
+ LogicalNodeFinderForExecPlan finder = new LogicalNodeFinderForExecPlan(type, plan, node);
+ finder.find();
+
+ if (finder.getFoundNodes().size() == 0) {
+ return new LogicalNode[] {};
+ }
+ List<LogicalNode> founds = finder.getFoundNodes();
+ return founds.toArray(new LogicalNode[founds.size()]);
+ }
/**
* Find a parent node of a given-typed operator.
@@ -303,6 +517,19 @@ public class PlannerUtil {
return (T) finder.getFoundNodes().get(0);
}
+ public static <T extends LogicalNode> T findTopParentNode(ExecutionPlan node, NodeType type) {
+ Preconditions.checkNotNull(node);
+ Preconditions.checkNotNull(type);
+
+ ParentNodeFinderForExecPlan finder = new ParentNodeFinderForExecPlan(type, node);
+ finder.find();
+
+ if (finder.getFoundNodes().size() == 0) {
+ return null;
+ }
+ return (T) finder.getFoundNodes().get(0);
+ }
+
public static boolean canBeEvaluated(EvalNode eval, LogicalNode node) {
Set<Column> columnRefs = EvalTreeUtil.findDistinctRefColumns(eval);
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/7c97735e/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/DataChannel.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/DataChannel.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/DataChannel.java
index 0401718..26cedd7 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/DataChannel.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/DataChannel.java
@@ -38,23 +38,31 @@ public class DataChannel {
private StoreType storeType = StoreType.CSV;
- public DataChannel(ExecutionBlockId srcId, ExecutionBlockId targetId) {
+ private Integer srcPID;
+ private Integer targetPID;
+
+ public DataChannel(ExecutionBlockId srcId, ExecutionBlockId targetId, Integer srcPID, Integer targetPID) {
this.srcId = srcId;
this.targetId = targetId;
+ this.srcPID = srcPID;
+ this.targetPID = targetPID;
}
- public DataChannel(ExecutionBlockId srcId, ExecutionBlockId targetId, PartitionType partitionType) {
- this(srcId, targetId);
+ public DataChannel(ExecutionBlockId srcId, ExecutionBlockId targetId, Integer srcPID, Integer targetPID,
+ PartitionType partitionType) {
+ this(srcId, targetId, srcPID, targetPID);
this.partitionType = partitionType;
}
- public DataChannel(ExecutionBlock src, ExecutionBlock target, PartitionType partitionType, int partNum) {
- this(src.getId(), target.getId(), partitionType, partNum);
- setSchema(src.getPlan().getOutSchema());
+ public DataChannel(ExecutionBlock src, ExecutionBlock target, Integer srcPID, Integer targetPID,
+ PartitionType partitionType, int partNum, Schema schema) {
+ this(src.getId(), target.getId(), srcPID, targetPID, partitionType, partNum);
+ setSchema(schema);
}
- public DataChannel(ExecutionBlockId srcId, ExecutionBlockId targetId, PartitionType partitionType, int partNum) {
- this(srcId, targetId, partitionType);
+ public DataChannel(ExecutionBlockId srcId, ExecutionBlockId targetId, Integer srcPID, Integer targetPID,
+ PartitionType partitionType, int partNum) {
+ this(srcId, targetId, srcPID, targetPID, partitionType);
this.partitionNum = partNum;
}
@@ -77,6 +85,12 @@ public class DataChannel {
if (proto.hasPartitionNum()) {
this.partitionNum = proto.getPartitionNum();
}
+ if (proto.hasSrcPID()) {
+ this.srcPID = proto.getSrcPID();
+ }
+ if (proto.hasTargetPID()) {
+ this.targetPID = proto.getTargetPID();
+ }
}
public ExecutionBlockId getSrcId() {
@@ -163,6 +177,12 @@ public class DataChannel {
if (partitionNum != null) {
builder.setPartitionNum(partitionNum);
}
+ if (srcPID != null) {
+ builder.setSrcPID(srcPID);
+ }
+ if (targetPID != null) {
+ builder.setTargetPID(targetPID);
+ }
return builder.build();
}
@@ -177,7 +197,7 @@ public class DataChannel {
public String toString() {
StringBuilder sb = new StringBuilder();
sb.append("[").append(srcId.getQueryId()).append("] ");
- sb.append(srcId.getId()).append(" => ").append(targetId.getId());
+ sb.append(srcId.getId()).append("."+srcPID).append(" => ").append(targetId.getId()).append("."+targetPID);
sb.append(" (type=").append(partitionType);
if (hasPartitionKey()) {
sb.append(", key=");
@@ -195,4 +215,16 @@ public class DataChannel {
sb.append(")");
return sb.toString();
}
+
+ public void updateSrcPID(int srcPID) {
+ this.srcPID = srcPID;
+ }
+
+ public Integer getSrcPID() {
+ return srcPID;
+ }
+
+ public Integer getTargetPID() {
+ return targetPID;
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/7c97735e/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/DestinationContext.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/DestinationContext.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/DestinationContext.java
new file mode 100644
index 0000000..3c2152c
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/DestinationContext.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.planner.global;
+
+import com.google.gson.annotations.Expose;
+import org.apache.tajo.catalog.Column;
+import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.catalog.SortSpec;
+import org.apache.tajo.catalog.proto.CatalogProtos.StoreType;
+import org.apache.tajo.engine.planner.logical.GroupbyNode;
+import org.apache.tajo.engine.planner.logical.LogicalNode;
+import org.apache.tajo.engine.planner.logical.NodeType;
+import org.apache.tajo.engine.planner.logical.SortNode;
+
+public class DestinationContext {
+ @Expose private String destTableName;
+ @Expose private StoreType storeType = StoreType.CSV;
+ @Expose private NodeType terminalNodeType;
+ @Expose private Schema outputSchema;
+ @Expose private Column[] groupingColumns;
+ @Expose private SortSpec[] sortSpecs;
+
+ public DestinationContext() {
+
+ }
+
+ public DestinationContext(LogicalNode node) {
+ this.set(node);
+ }
+
+ public void set(LogicalNode node) {
+ terminalNodeType = node.getType();
+ outputSchema = node.getOutSchema();
+ if (terminalNodeType.equals(NodeType.GROUP_BY)) {
+ groupingColumns = ((GroupbyNode)node).getGroupingColumns();
+ } else if (terminalNodeType.equals(NodeType.SORT)) {
+ sortSpecs = ((SortNode)node).getSortKeys();
+ }
+ }
+
+ public NodeType getTerminalNodeType() {
+ return terminalNodeType;
+ }
+
+ public void setTerminalNodeType(NodeType terminalNodeType) {
+ this.terminalNodeType = terminalNodeType;
+ }
+
+ public Schema getOutputSchema() {
+ return outputSchema;
+ }
+
+ public void setOutputSchema(Schema outputSchema) {
+ this.outputSchema = outputSchema;
+ }
+
+ public Column [] getGroupingColumns() {
+ return groupingColumns;
+ }
+
+ public void setGroupingColumns(Column [] groupingColumns) {
+ this.groupingColumns = groupingColumns;
+ }
+
+ public SortSpec[] getSortSpecs() {
+ return sortSpecs;
+ }
+
+ public void setSortSpecs(SortSpec[] sortSpecs) {
+ this.sortSpecs = sortSpecs;
+ }
+
+ public StoreType getStoreType() {
+ return storeType;
+ }
+
+ public void setStoreType(StoreType storeType) {
+ this.storeType = storeType;
+ }
+
+ public String getDestTableName() {
+ return destTableName;
+ }
+
+ public void setDestTableName(String destTableName) {
+ this.destTableName = destTableName;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/7c97735e/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/ExecutionBlock.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/ExecutionBlock.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/ExecutionBlock.java
index efa1c7f..7cfa478 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/ExecutionBlock.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/ExecutionBlock.java
@@ -14,12 +14,15 @@
package org.apache.tajo.engine.planner.global;
+import com.google.common.annotations.VisibleForTesting;
import org.apache.tajo.ExecutionBlockId;
-import org.apache.tajo.catalog.Schema;
import org.apache.tajo.engine.planner.enforce.Enforcer;
-import org.apache.tajo.engine.planner.logical.*;
+import org.apache.tajo.engine.planner.logical.LogicalNode;
+import org.apache.tajo.engine.planner.logical.LogicalRootNode;
-import java.util.*;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Set;
/**
* A distributed execution plan (DEP) is a direct acyclic graph (DAG) of ExecutionBlocks.
@@ -30,16 +33,17 @@ import java.util.*;
*/
public class ExecutionBlock {
private ExecutionBlockId executionBlockId;
- private LogicalNode plan = null;
- private StoreTableNode store = null;
- private List<ScanNode> scanlist = new ArrayList<ScanNode>();
+ private ExecutionPlan executionPlan;
private Enforcer enforcer = new Enforcer();
- private boolean hasJoinPlan;
- private boolean hasUnionPlan;
-
private Set<String> broadcasted = new HashSet<String>();
+ public ExecutionBlock(ExecutionBlockId executionBlockId, LogicalRootNode rootNode) {
+ this.executionBlockId = executionBlockId;
+ this.executionPlan = new ExecutionPlan(rootNode);
+ }
+
+ @VisibleForTesting
public ExecutionBlock(ExecutionBlockId executionBlockId) {
this.executionBlockId = executionBlockId;
}
@@ -49,64 +53,27 @@ public class ExecutionBlock {
}
public void setPlan(LogicalNode plan) {
- hasJoinPlan = false;
- hasUnionPlan = false;
- this.scanlist.clear();
- this.plan = plan;
-
- LogicalNode node = plan;
- ArrayList<LogicalNode> s = new ArrayList<LogicalNode>();
- s.add(node);
- while (!s.isEmpty()) {
- node = s.remove(s.size()-1);
- if (node instanceof UnaryNode) {
- UnaryNode unary = (UnaryNode) node;
- s.add(s.size(), unary.getChild());
- } else if (node instanceof BinaryNode) {
- BinaryNode binary = (BinaryNode) node;
- if (binary.getType() == NodeType.JOIN) {
- hasJoinPlan = true;
- } else if (binary.getType() == NodeType.UNION) {
- hasUnionPlan = true;
- }
- s.add(s.size(), binary.getLeftChild());
- s.add(s.size(), binary.getRightChild());
- } else if (node instanceof ScanNode) {
- scanlist.add((ScanNode)node);
- } else if (node instanceof TableSubQueryNode) {
- TableSubQueryNode subQuery = (TableSubQueryNode) node;
- s.add(s.size(), subQuery.getSubQuery());
- }
- }
+ executionPlan.setPlan(plan);
}
-
- public LogicalNode getPlan() {
- return plan;
+ public ExecutionPlan getPlan() {
+ return executionPlan;
}
public Enforcer getEnforcer() {
return enforcer;
}
- public StoreTableNode getStoreTableNode() {
- return store;
- }
-
- public ScanNode [] getScanNodes() {
- return this.scanlist.toArray(new ScanNode[scanlist.size()]);
- }
-
- public Schema getOutputSchema() {
- return store.getOutSchema();
+ public InputContext getInputContext() {
+ return executionPlan.getInputContext();
}
public boolean hasJoin() {
- return hasJoinPlan;
+ return executionPlan.hasJoinPlan();
}
public boolean hasUnion() {
- return hasUnionPlan;
+ return executionPlan.hasUnionPlan();
}
public void addBroadcastTables(Collection<String> tableNames) {