You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tajo.apache.org by hy...@apache.org on 2013/09/16 13:33:32 UTC
[5/7] TAJO-184: Refactor GlobalPlanner and global plan data
structure. (hyunsik)
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/1b1d1e8c/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/SortNode.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/SortNode.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/SortNode.java
index e053c3c..e754a7f 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/SortNode.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/SortNode.java
@@ -22,6 +22,7 @@ import com.google.common.base.Preconditions;
import com.google.gson.annotations.Expose;
import org.apache.tajo.catalog.Schema;
import org.apache.tajo.catalog.SortSpec;
+import org.apache.tajo.engine.planner.PlanString;
import org.apache.tajo.util.TUtil;
public final class SortNode extends UnaryNode implements Cloneable {
@@ -69,7 +70,22 @@ public final class SortNode extends UnaryNode implements Cloneable {
return sort;
}
-
+
+  /**
+   * Describes this sort node for plan output: the title is "Sort" and a single
+   * explanation line lists every sort key as "&lt;column&gt; asc|desc", comma-separated.
+   *
+   * @return the plan string for this node
+   */
+  @Override
+  public PlanString getPlanString() {
+    PlanString description = new PlanString("Sort");
+    StringBuilder keyList = new StringBuilder("Sort Keys: ");
+    int keyCount = sortKeys.length;
+    for (int idx = 0; idx < keyCount; idx++) {
+      // A separator goes between entries, never after the last one.
+      if (idx > 0) {
+        keyList.append(",");
+      }
+      keyList.append(sortKeys[idx].getSortKey().getColumnName());
+      keyList.append(" ");
+      if (sortKeys[idx].isAscending()) {
+        keyList.append("asc");
+      } else {
+        keyList.append("desc");
+      }
+    }
+    description.addExplan(keyList.toString());
+    return description;
+  }
+
public String toString() {
StringBuilder sb = new StringBuilder("Sort [key= ");
for (int i = 0; i < sortKeys.length; i++) {
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/1b1d1e8c/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/StoreTableNode.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/StoreTableNode.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/StoreTableNode.java
index 817ca35..a2dd097 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/StoreTableNode.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/StoreTableNode.java
@@ -22,10 +22,12 @@ import com.google.common.base.Preconditions;
import com.google.gson.annotations.Expose;
import org.apache.tajo.catalog.Column;
import org.apache.tajo.catalog.Options;
-import org.apache.tajo.master.ExecutionBlock.PartitionType;
+import org.apache.tajo.engine.planner.PlanString;
import org.apache.tajo.util.TUtil;
import static org.apache.tajo.catalog.proto.CatalogProtos.StoreType;
+import static org.apache.tajo.ipc.TajoWorkerProtocol.PartitionType;
+import static org.apache.tajo.ipc.TajoWorkerProtocol.PartitionType.LIST_PARTITION;
public class StoreTableNode extends UnaryNode implements Cloneable {
@Expose private String tableName;
@@ -33,7 +35,6 @@ public class StoreTableNode extends UnaryNode implements Cloneable {
@Expose private PartitionType partitionType;
@Expose private int numPartitions;
@Expose private Column [] partitionKeys;
- @Expose private boolean local;
@Expose private Options options;
@Expose private boolean isCreatedTable = false;
@Expose private boolean isOverwritten = false;
@@ -41,7 +42,6 @@ public class StoreTableNode extends UnaryNode implements Cloneable {
public StoreTableNode(String tableName) {
super(NodeType.STORE);
this.tableName = tableName;
- this.local = false;
}
public final String getTableName() {
@@ -59,14 +59,6 @@ public class StoreTableNode extends UnaryNode implements Cloneable {
public StoreType getStorageType() {
return this.storageType;
}
-
- public final void setLocal(boolean local) {
- this.local = local;
- }
-
- public final boolean isLocal() {
- return this.local;
- }
public final int getNumPartitions() {
return this.numPartitions;
@@ -80,10 +72,10 @@ public class StoreTableNode extends UnaryNode implements Cloneable {
return this.partitionKeys;
}
- public final void setListPartition() {
- this.partitionType = PartitionType.LIST;
+ public final void setDefaultParition() {
+ this.partitionType = LIST_PARTITION;
this.partitionKeys = null;
- this.numPartitions = 0;
+ this.numPartitions = 1;
}
public final void setPartitions(PartitionType type, Column [] keys, int numPartitions) {
@@ -113,6 +105,16 @@ public class StoreTableNode extends UnaryNode implements Cloneable {
return this.options;
}
+
+  /**
+   * Describes this store node for plan output: the title reads
+   * "Store into &lt;tableName&gt;" and one explanation line reports the storage type.
+   *
+   * @return the plan string for this node
+   */
+  @Override
+  public PlanString getPlanString() {
+    PlanString description = new PlanString("Store");
+    description.appendTitle(" into ");
+    description.appendTitle(tableName);
+
+    String storeTypeLine = "Store type: " + storageType;
+    description.addExplan(storeTypeLine);
+    return description;
+  }
+
public boolean isCreatedTable() {
return isCreatedTable;
}
@@ -152,7 +154,7 @@ public class StoreTableNode extends UnaryNode implements Cloneable {
store.isOverwritten = isOverwritten;
return store;
}
-
+
public String toString() {
StringBuilder sb = new StringBuilder();
sb.append("\"Store\": {\"table\": \""+tableName);
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/1b1d1e8c/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/TableSubQueryNode.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/TableSubQueryNode.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/TableSubQueryNode.java
new file mode 100644
index 0000000..8ad8cf5
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/TableSubQueryNode.java
@@ -0,0 +1,123 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.planner.logical;
+
+import com.google.common.base.Objects;
+import com.google.gson.annotations.Expose;
+import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.engine.planner.PlanString;
+import org.apache.tajo.engine.planner.PlannerUtil;
+import org.apache.tajo.engine.planner.Target;
+
+/**
+ * Logical plan node that exposes the result of a subquery as a named relation.
+ *
+ * <p>The node keeps the alias under which the subquery is referenced, the root
+ * of the subquery's logical plan, and an optional target list (required by
+ * {@link Projectable}).</p>
+ */
+public class TableSubQueryNode extends RelationNode implements Projectable {
+  @Expose private String tableName;
+  @Expose private LogicalNode subQuery;
+  @Expose private Target [] targets; // unused
+
+  /**
+   * Creates a table-subquery node.
+   *
+   * <p>The output schema is the subquery's output schema qualified with
+   * {@code tableName}; the input schema is taken from the subquery as is.</p>
+   *
+   * @param tableName alias under which the subquery is exposed
+   * @param subQuery  root node of the subquery plan
+   */
+  public TableSubQueryNode(String tableName, LogicalNode subQuery) {
+    super(NodeType.TABLE_SUBQUERY);
+    this.tableName = tableName;
+    this.subQuery = subQuery;
+    setOutSchema(PlannerUtil.getQualifiedSchema(this.subQuery.getOutSchema(), tableName));
+    setInSchema(this.subQuery.getInSchema());
+  }
+
+  /** @return the alias given to this subquery */
+  public String getTableName() {
+    return tableName;
+  }
+
+  /** @return the same alias as {@link #getTableName()} */
+  @Override
+  public String getCanonicalName() {
+    return tableName;
+  }
+
+  /** @return this node's output schema, which doubles as its table schema */
+  @Override
+  public Schema getTableSchema() {
+    return getOutSchema();
+  }
+
+  /**
+   * Replaces the subquery root and refreshes the input schema from it.
+   * The output schema is left untouched by this method.
+   *
+   * @param node new root node of the subquery plan
+   */
+  public void setSubQuery(LogicalNode node) {
+    this.subQuery = node;
+    setInSchema(subQuery.getInSchema());
+  }
+
+  /** @return the root node of the subquery plan */
+  public LogicalNode getSubQuery() {
+    return subQuery;
+  }
+
+  /** @return {@code true} once a target array has been set */
+  @Override
+  public boolean hasTargets() {
+    return targets != null;
+  }
+
+  @Override
+  public void setTargets(Target[] targets) {
+    this.targets = targets;
+  }
+
+  /** @return the target array, or {@code null} if none has been set */
+  @Override
+  public Target[] getTargets() {
+    return targets;
+  }
+
+  /**
+   * @return a plan string titled "TableSubQuery as &lt;tableName&gt;"
+   */
+  @Override
+  public PlanString getPlanString() {
+    PlanString planStr = new PlanString("TableSubQuery");
+    planStr.appendTitle(" as ").appendTitle(tableName);
+    return planStr;
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hashCode(tableName, subQuery);
+  }
+
+  /**
+   * Two table-subquery nodes are equal when both their alias and their
+   * subquery are equal; targets are not compared.
+   */
+  @Override
+  public boolean equals(Object object) {
+    if (object instanceof TableSubQueryNode) {
+      TableSubQueryNode another = (TableSubQueryNode) object;
+      return tableName.equals(another.tableName) && subQuery.equals(another.subQuery);
+    }
+
+    return false;
+  }
+
+  /**
+   * Clones this node through {@code super.clone()}.
+   * NOTE(review): subQuery and targets are not copied explicitly here, so
+   * whether they end up shared depends on the superclass clone - confirm.
+   */
+  @Override
+  public Object clone() throws CloneNotSupportedException {
+    TableSubQueryNode newTableSubQueryNode = (TableSubQueryNode) super.clone();
+    newTableSubQueryNode.tableName = tableName;
+    return newTableSubQueryNode;
+  }
+
+  /** Visits this node first, then walks the subquery in pre-order. */
+  @Override
+  public void preOrder(LogicalNodeVisitor visitor) {
+    visitor.visit(this);
+    subQuery.preOrder(visitor);
+  }
+
+  /** Walks the subquery in post-order first, then visits this node. */
+  @Override
+  public void postOrder(LogicalNodeVisitor visitor) {
+    // The subtree must be traversed post-order as well; delegating to
+    // preOrder here would visit the subquery's nodes in the wrong order.
+    subQuery.postOrder(visitor);
+    visitor.visit(this);
+  }
+
+  @Override
+  public String toString() {
+    return "Table Subquery (alias = " + tableName + ")\n" + subQuery.toString();
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/1b1d1e8c/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/UnaryNode.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/UnaryNode.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/UnaryNode.java
index 4380996..7f6e065 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/UnaryNode.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/UnaryNode.java
@@ -16,9 +16,6 @@
* limitations under the License.
*/
-/**
- *
- */
package org.apache.tajo.engine.planner.logical;
import com.google.gson.annotations.Expose;
@@ -42,8 +39,8 @@ public abstract class UnaryNode extends LogicalNode implements Cloneable {
this.child = subNode;
}
- public LogicalNode getChild() {
- return this.child;
+ public <T extends LogicalNode> T getChild() {
+ return (T) this.child;
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/1b1d1e8c/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/UnionNode.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/UnionNode.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/UnionNode.java
index c0aafbd..a62e91b 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/UnionNode.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/UnionNode.java
@@ -21,6 +21,8 @@
*/
package org.apache.tajo.engine.planner.logical;
+import org.apache.tajo.engine.planner.PlanString;
+
public class UnionNode extends BinaryNode {
public UnionNode() {
@@ -33,6 +35,16 @@ public class UnionNode extends BinaryNode {
setRightChild(inner);
}
+
+  /**
+   * Describes this union for plan output with a title of the form
+   * "Union (L - &lt;left table&gt;, R - &lt;right table&gt;)".
+   *
+   * <p>Both children are cast to {@link TableSubQueryNode}; a child of any
+   * other type makes the cast fail with a ClassCastException.</p>
+   *
+   * @return the plan string for this node
+   */
+  @Override
+  public PlanString getPlanString() {
+    PlanString description = new PlanString("Union");
+
+    TableSubQueryNode leftSub = (TableSubQueryNode) getLeftChild();
+    description.appendTitle(" (L - " + leftSub.getTableName());
+
+    TableSubQueryNode rightSub = (TableSubQueryNode) getRightChild();
+    description.appendTitle(", R - " + rightSub.getTableName());
+
+    description.appendTitle(")");
+    return description;
+  }
+
public String toString() {
return getLeftChild().toString() + "\n UNION \n" + getRightChild().toString();
}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/1b1d1e8c/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/join/JoinTree.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/join/JoinTree.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/join/JoinTree.java
index d76af7d..2ec055a 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/join/JoinTree.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/join/JoinTree.java
@@ -37,8 +37,8 @@ public class JoinTree {
List<Column> left = EvalTreeUtil.findAllColumnRefs(node.getLeftExpr());
List<Column> right = EvalTreeUtil.findAllColumnRefs(node.getRightExpr());
- String ltbName = left.get(0).getTableName();
- String rtbName = right.get(0).getTableName();
+ String ltbName = left.get(0).getQualifier();
+ String rtbName = right.get(0).getQualifier();
Edge l2r = new Edge(ltbName, rtbName, node);
Edge r2l = new Edge(rtbName, ltbName, node);
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/1b1d1e8c/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/BasicPhysicalExecutorVisitor.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/BasicPhysicalExecutorVisitor.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/BasicPhysicalExecutorVisitor.java
new file mode 100644
index 0000000..fc569d7
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/BasicPhysicalExecutorVisitor.java
@@ -0,0 +1,127 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.planner.physical;
+
+import java.util.Stack;
+
+/**
+ * Default {@link PhysicalExecutorVisitor} that walks a physical executor tree.
+ *
+ * <p>{@link #visitChild} dispatches on the runtime type of the executor to the
+ * matching {@code visitXxx} method. Each default {@code visitXxx} pushes the
+ * executor onto the stack, descends into its child (or children) through
+ * {@code visitChild}, pops it again and returns the result obtained from below.
+ * Subclasses override individual methods to attach behavior.</p>
+ *
+ * @param <CONTEXT> type of the caller-supplied object handed to every visit method
+ * @param <RESULT>  type of the value produced by the traversal
+ */
+public class BasicPhysicalExecutorVisitor<CONTEXT, RESULT> implements PhysicalExecutorVisitor<CONTEXT, RESULT> {
+
+  /**
+   * Routes {@code exec} to the visit method for its runtime type.
+   *
+   * @param exec    executor to visit
+   * @param stack   executors currently being descended through
+   * @param context caller-supplied context
+   * @return whatever the selected visit method returns
+   * @throws PhysicalPlanningException if the executor type has no visit method
+   */
+  @Override
+  public RESULT visitChild(PhysicalExec exec, Stack<PhysicalExec> stack, CONTEXT context)
+      throws PhysicalPlanningException {
+
+    if (exec instanceof SeqScanExec) {
+      return visitSeqScan((SeqScanExec) exec, stack, context);
+    } else if (exec instanceof SelectionExec) {
+      return visitSelection((SelectionExec) exec, stack, context);
+    } else if (exec instanceof SortExec) {
+      return visitSort((SortExec) exec, stack, context);
+    } else if (exec instanceof SortAggregateExec) {
+      return visitSortAggregation((SortAggregateExec) exec, stack, context);
+    } else if (exec instanceof ProjectionExec) {
+      return visitProjection((ProjectionExec) exec, stack, context);
+    } else if (exec instanceof MergeJoinExec) {
+      // visitMergeJoin is part of the visitor interface, so merge joins must be
+      // dispatched here instead of being rejected as an unsupported type.
+      return visitMergeJoin((MergeJoinExec) exec, stack, context);
+    } else if (exec instanceof HashJoinExec) {
+      return visitHashJoin((HashJoinExec) exec, stack, context);
+    } else if (exec instanceof HashAntiJoinExec) {
+      return visitHashAntiJoin((HashAntiJoinExec) exec, stack, context);
+    } else if (exec instanceof HashSemiJoinExec) {
+      return visitHashSemiJoin((HashSemiJoinExec) exec, stack, context);
+    } else if (exec instanceof LimitExec) {
+      return visitLimit((LimitExec) exec, stack, context);
+    } else {
+      throw new PhysicalPlanningException("Unsupported Type: " + exec.getClass().getSimpleName());
+    }
+  }
+
+  /**
+   * Descends into the single child of {@code exec} with {@code exec} on the stack.
+   *
+   * @return the result of visiting the child
+   */
+  private RESULT visitUnaryExecutor(UnaryPhysicalExec exec, Stack<PhysicalExec> stack, CONTEXT context)
+      throws PhysicalPlanningException {
+    stack.push(exec);
+    RESULT r = visitChild(exec.getChild(), stack, context);
+    stack.pop();
+    return r;
+  }
+
+  /**
+   * Descends into both children of {@code exec}, left first, with {@code exec}
+   * on the stack. Both subtrees are always visited.
+   *
+   * @return the left child's result when it is non-null, otherwise the right
+   *         child's result, so a value produced only in the right subtree is
+   *         not lost
+   */
+  private RESULT visitBinaryExecutor(BinaryPhysicalExec exec, Stack<PhysicalExec> stack, CONTEXT context)
+      throws PhysicalPlanningException {
+    stack.push(exec);
+    RESULT leftResult = visitChild(exec.getLeftChild(), stack, context);
+    RESULT rightResult = visitChild(exec.getRightChild(), stack, context);
+    stack.pop();
+    return leftResult != null ? leftResult : rightResult;
+  }
+
+  @Override
+  public RESULT visitSortAggregation(SortAggregateExec exec, Stack<PhysicalExec> stack, CONTEXT context)
+      throws PhysicalPlanningException {
+    return visitUnaryExecutor(exec, stack, context);
+  }
+
+  /** Scans are not descended into; the default result is {@code null}. */
+  @Override
+  public RESULT visitSeqScan(SeqScanExec exec, Stack<PhysicalExec> stack, CONTEXT context) {
+    return null;
+  }
+
+  @Override
+  public RESULT visitSort(SortExec exec, Stack<PhysicalExec> stack, CONTEXT context)
+      throws PhysicalPlanningException {
+    return visitUnaryExecutor(exec, stack, context);
+  }
+
+  @Override
+  public RESULT visitMergeJoin(MergeJoinExec exec, Stack<PhysicalExec> stack, CONTEXT context)
+      throws PhysicalPlanningException {
+    return visitBinaryExecutor(exec, stack, context);
+  }
+
+  @Override
+  public RESULT visitSelection(SelectionExec exec, Stack<PhysicalExec> stack, CONTEXT context)
+      throws PhysicalPlanningException {
+    return visitUnaryExecutor(exec, stack, context);
+  }
+
+  @Override
+  public RESULT visitProjection(ProjectionExec exec, Stack<PhysicalExec> stack, CONTEXT context)
+      throws PhysicalPlanningException {
+    return visitUnaryExecutor(exec, stack, context);
+  }
+
+  @Override
+  public RESULT visitHashJoin(HashJoinExec exec, Stack<PhysicalExec> stack, CONTEXT context)
+      throws PhysicalPlanningException {
+    return visitBinaryExecutor(exec, stack, context);
+  }
+
+  @Override
+  public RESULT visitHashSemiJoin(HashSemiJoinExec exec, Stack<PhysicalExec> stack, CONTEXT context)
+      throws PhysicalPlanningException {
+    return visitBinaryExecutor(exec, stack, context);
+  }
+
+  @Override
+  public RESULT visitHashAntiJoin(HashAntiJoinExec exec, Stack<PhysicalExec> stack, CONTEXT context)
+      throws PhysicalPlanningException {
+    return visitBinaryExecutor(exec, stack, context);
+  }
+
+  @Override
+  public RESULT visitLimit(LimitExec exec, Stack<PhysicalExec> stack, CONTEXT context)
+      throws PhysicalPlanningException {
+    return visitUnaryExecutor(exec, stack, context);
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/1b1d1e8c/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/PhysicalExecutorVisitor.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/PhysicalExecutorVisitor.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/PhysicalExecutorVisitor.java
new file mode 100644
index 0000000..131fbe5
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/PhysicalExecutorVisitor.java
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.planner.physical;
+
+import org.apache.tajo.engine.planner.physical.*;
+
+import java.util.Stack;
+
+/**
+ * Visitor over physical executors. Every method receives the executor being
+ * visited, a stack of executors and a caller-supplied context, and returns a
+ * RESULT value.
+ *
+ * @param <CONTEXT> type of the object passed along to every visit method
+ * @param <RESULT>  type of the value each visit method returns
+ */
+public interface PhysicalExecutorVisitor<CONTEXT, RESULT> {
+ /** Visits an executor of any type; implementations pick the specific visit method. */
+ RESULT visitChild(PhysicalExec exec, Stack<PhysicalExec> stack, CONTEXT context) throws PhysicalPlanningException;
+ /** Visits a sequential scan; unlike the other methods it declares no checked exception. */
+ RESULT visitSeqScan(SeqScanExec exec, Stack<PhysicalExec> stack, CONTEXT context);
+ /** Visits a sort executor. */
+ RESULT visitSort(SortExec exec, Stack<PhysicalExec> stack, CONTEXT context) throws PhysicalPlanningException;
+ /** Visits a sort-based aggregation executor. */
+ RESULT visitSortAggregation(SortAggregateExec exec, Stack<PhysicalExec> stack, CONTEXT context) throws PhysicalPlanningException;
+ /** Visits a merge join executor. */
+ RESULT visitMergeJoin(MergeJoinExec exec, Stack<PhysicalExec> stack, CONTEXT context) throws PhysicalPlanningException;
+ /** Visits a selection executor. */
+ RESULT visitSelection(SelectionExec exec, Stack<PhysicalExec> stack, CONTEXT context) throws PhysicalPlanningException;
+ /** Visits a projection executor. */
+ RESULT visitProjection(ProjectionExec exec, Stack<PhysicalExec> stack, CONTEXT context) throws PhysicalPlanningException;
+ /** Visits a hash join executor. */
+ RESULT visitHashJoin(HashJoinExec exec, Stack<PhysicalExec> stack, CONTEXT context) throws PhysicalPlanningException;
+ /** Visits a hash semi-join executor. */
+ RESULT visitHashSemiJoin(HashSemiJoinExec exec, Stack<PhysicalExec> stack, CONTEXT context) throws PhysicalPlanningException;
+ /** Visits a hash anti-join executor. */
+ RESULT visitHashAntiJoin(HashAntiJoinExec exec, Stack<PhysicalExec> stack, CONTEXT context) throws PhysicalPlanningException;
+ /** Visits a limit executor. */
+ RESULT visitLimit(LimitExec exec, Stack<PhysicalExec> stack, CONTEXT context) throws PhysicalPlanningException;
+}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/1b1d1e8c/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/PhysicalPlanUtil.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/PhysicalPlanUtil.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/PhysicalPlanUtil.java
new file mode 100644
index 0000000..bd773ed
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/PhysicalPlanUtil.java
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.planner.physical;
+
+import java.util.Stack;
+
+/**
+ * Static helpers for inspecting physical execution plans.
+ */
+public class PhysicalPlanUtil {
+  /**
+   * Looks for an executor of the given class in {@code plan}.
+   *
+   * <p>The lookup is carried out by a {@link BasicPhysicalExecutorVisitor}
+   * subclass started at {@code plan} with an empty stack; the value it returns
+   * is cast to the caller's expected type without a runtime check.</p>
+   *
+   * @param plan  executor at which the lookup starts
+   * @param clazz class (or superclass) of the executor being looked for
+   * @return the executor reported by the visitor, possibly {@code null}
+   * @throws PhysicalPlanningException if the traversal meets an unsupported executor
+   */
+  public static <T extends PhysicalExec> T findExecutor(PhysicalExec plan, Class<? extends PhysicalExec> clazz)
+      throws PhysicalPlanningException {
+    FindVisitor finder = new FindVisitor();
+    Stack<PhysicalExec> visiting = new Stack<PhysicalExec>();
+    PhysicalExec found = finder.visitChild(plan, visiting, clazz);
+    return (T) found;
+  }
+
+  /**
+   * Visitor whose context is the class being looked for and whose result is
+   * the matching executor.
+   */
+  private static class FindVisitor extends BasicPhysicalExecutorVisitor<Class<? extends PhysicalExec>, PhysicalExec> {
+    /**
+     * Returns {@code exec} itself when its class is assignable to
+     * {@code target}; otherwise lets the base visitor continue below it.
+     */
+    @Override
+    public PhysicalExec visitChild(PhysicalExec exec, Stack<PhysicalExec> stack, Class<? extends PhysicalExec> target)
+        throws PhysicalPlanningException {
+
+      boolean matches = target.isAssignableFrom(exec.getClass());
+      if (!matches) {
+        return super.visitChild(exec, stack, target);
+      }
+      return exec;
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/1b1d1e8c/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/PhysicalPlanningException.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/PhysicalPlanningException.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/PhysicalPlanningException.java
new file mode 100644
index 0000000..0d7554d
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/PhysicalPlanningException.java
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.planner.physical;
+
+import java.io.IOException;
+
+/**
+ * Checked exception for failures raised while handling physical plans.
+ * It is an {@link IOException}, so callers that already handle I/O errors
+ * catch it as well.
+ */
+public class PhysicalPlanningException extends IOException {
+  /**
+   * Creates an exception carrying only a description.
+   *
+   * @param description text explaining what went wrong
+   */
+  public PhysicalPlanningException(String description) {
+    super(description);
+  }
+
+  /**
+   * Creates an exception that wraps a lower-level I/O failure as its cause.
+   *
+   * @param cause the underlying I/O exception
+   */
+  public PhysicalPlanningException(IOException cause) {
+    super(cause);
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/1b1d1e8c/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/SeqScanExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/SeqScanExec.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/SeqScanExec.java
index 9051219..2d736ce 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/SeqScanExec.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/SeqScanExec.java
@@ -136,6 +136,6 @@ public class SeqScanExec extends PhysicalExec {
}
public String getTableName() {
- return plan.getTableId();
+ return plan.getTableName();
}
}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/1b1d1e8c/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/rewrite/FilterPushDownRule.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/rewrite/FilterPushDownRule.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/rewrite/FilterPushDownRule.java
index 1f771e6..a33cbd7 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/rewrite/FilterPushDownRule.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/rewrite/FilterPushDownRule.java
@@ -43,8 +43,7 @@ public class FilterPushDownRule extends BasicLogicalPlanVisitor<List<EvalNode>>
@Override
public boolean isEligible(LogicalPlan plan) {
for (LogicalPlan.QueryBlock block : plan.getQueryBlocks()) {
- LogicalNode toBeOptimized = block.getRoot();
- if (PlannerUtil.findTopNode(toBeOptimized, NodeType.SELECTION) != null) {
+ if (PlannerUtil.findTopNode(block.getRoot(), NodeType.SELECTION) != null) {
return true;
}
}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/1b1d1e8c/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/rewrite/ProjectionPushDownRule.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/rewrite/ProjectionPushDownRule.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/rewrite/ProjectionPushDownRule.java
index 4c2f9d7..1484f32 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/rewrite/ProjectionPushDownRule.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/rewrite/ProjectionPushDownRule.java
@@ -49,7 +49,7 @@ public class ProjectionPushDownRule extends BasicLogicalPlanVisitor<ProjectionPu
public boolean isEligible(LogicalPlan plan) {
LogicalNode toBeOptimized = plan.getRootBlock().getRoot();
- if (PlannerUtil.checkIfDDLPlan(toBeOptimized) && !plan.getRootBlock().hasTableExpression()) {
+ if (PlannerUtil.checkIfDDLPlan(toBeOptimized) || !plan.getRootBlock().hasTableExpression()) {
LOG.info("This query skips the logical optimization step.");
return false;
}
@@ -59,25 +59,29 @@ public class ProjectionPushDownRule extends BasicLogicalPlanVisitor<ProjectionPu
@Override
public LogicalPlan rewrite(LogicalPlan plan) throws PlanningException {
- for (LogicalPlan.QueryBlock block : plan.getQueryBlocks()) {
- NodeType nodeType = block.getRootType();
- // skip a non-table-expression block.
- if (!(nodeType == NodeType.INSERT || nodeType == NodeType.CREATE_TABLE || nodeType == NodeType.EXPRS)) {
- Stack<LogicalNode> stack = new Stack<LogicalNode>();
- PushDownContext context = new PushDownContext(block);
- context.plan = plan;
- if (block.getProjection() != null &&
- block.getProjection().isAllProjected()) {
- context.targetListManager = new TargetListManager(plan,
- block.getProjectionNode().getTargets());
- } else {
- context.targetListManager= new TargetListManager(plan, block.getName());
- }
- context.upperRequired = new HashSet<Column>(block.getSchema().getColumns());
- visitChild(plan, block.getRoot(), stack, context);
- }
+ LogicalPlan.QueryBlock rootBlock = plan.getRootBlock();
+
+ LogicalPlan.QueryBlock topmostBlock;
+
+ // skip a non-table-expression block.
+ if (plan.getRootBlock().getRootType() == NodeType.INSERT) {
+ topmostBlock = plan.getChildBlocks(rootBlock).iterator().next();
+ } else {
+ topmostBlock = rootBlock;
}
+ Stack<LogicalNode> stack = new Stack<LogicalNode>();
+ PushDownContext context = new PushDownContext(topmostBlock);
+ context.plan = plan;
+
+ if (topmostBlock.getProjection() != null && topmostBlock.getProjection().isAllProjected()) {
+ context.targetListManager = new TargetListManager(plan, topmostBlock.getProjectionNode().getTargets());
+ } else {
+ context.targetListManager= new TargetListManager(plan, topmostBlock.getName());
+ }
+
+ visitChild(plan, topmostBlock.getRoot(), stack, context);
+
return plan;
}
@@ -113,8 +117,24 @@ public class ProjectionPushDownRule extends BasicLogicalPlanVisitor<ProjectionPu
@Override
public LogicalNode visitProjection(LogicalPlan plan, ProjectionNode node, Stack<LogicalNode> stack,
PushDownContext context) throws PlanningException {
- for (Target target : node.getTargets()) {
- context.upperRequired.add(target.getColumnSchema());
+ if (context.upperRequired == null) { // all projected
+ context.upperRequired = new HashSet<Column>();
+ for (Target target : node.getTargets()) {
+ context.upperRequired.add(target.getColumnSchema());
+ }
+ } else {
+ List<Target> projectedTarget = new ArrayList<Target>();
+ for (Target target : node.getTargets()) {
+ if (context.upperRequired.contains(target.getColumnSchema())) {
+ projectedTarget.add(target);
+ }
+ }
+ node.setTargets(projectedTarget.toArray(new Target[projectedTarget.size()]));
+
+ context.upperRequired = new HashSet<Column>();
+ for (Target target : node.getTargets()) {
+ context.upperRequired.add(target.getColumnSchema());
+ }
}
stack.push(node);
@@ -126,13 +146,14 @@ public class ProjectionPushDownRule extends BasicLogicalPlanVisitor<ProjectionPu
// If all expressions are evaluated in the child operators and the last operator is projectable,
// ProjectionNode will not be necessary. It eliminates ProjectionNode.
if (context.targetListManager.isAllEvaluated() && (childNode instanceof Projectable)) {
+ child.setOutSchema(context.targetListManager.getUpdatedSchema());
if (stack.isEmpty()) {
// update the child node's output schemas
- child.setOutSchema(context.targetListManager.getUpdatedSchema());
context.queryBlock.setRoot(child);
+ } else if (stack.peek().getType() == NodeType.TABLE_SUBQUERY) {
+ ((TableSubQueryNode)stack.peek()).setSubQuery(childNode);
} else {
LogicalNode parent = stack.peek();
- child.setOutSchema(context.targetListManager.getUpdatedSchema());
PlannerUtil.deleteNode(parent, node);
}
return child;
@@ -242,6 +263,42 @@ public class ProjectionPushDownRule extends BasicLogicalPlanVisitor<ProjectionPu
}
@Override
+ public LogicalNode visitTableSubQuery(LogicalPlan plan, TableSubQueryNode node, Stack<LogicalNode> stack,
+                                       PushDownContext context) throws PlanningException {
+   // Pushes projection into the query block behind a table subquery: the inner block is
+   // visited with its own stack and context, after which this node is handled as a
+   // projectable of the outer block.
+   // NOTE(review): context.upperRequired is dereferenced below without a null check, while
+   // visitProjection treats null as a possible state - confirm a projection always runs first.
+   LogicalPlan.QueryBlock innerBlock = plan.getBlock(node.getSubQuery());
+   LogicalNode innerRoot = innerBlock.getRoot();
+
+   // The inner traversal sees this subquery node as its only ancestor.
+   Stack<LogicalNode> innerStack = new Stack<LogicalNode>();
+   innerStack.push(node);
+   PushDownContext innerContext = new PushDownContext(innerBlock);
+
+   boolean keepWholeProjection = innerBlock.getProjection() != null
+       && innerBlock.getProjection().isAllProjected()
+       && context.upperRequired.isEmpty();
+   if (keepWholeProjection) {
+     innerContext.targetListManager =
+         new TargetListManager(plan, innerBlock.getProjectionNode().getTargets());
+   } else {
+     // Keep only the inner targets whose column name (case-insensitive) is required above.
+     // A required column that carries a qualifier must name this subquery's table.
+     // NOTE(review): a target is added once per matching required column, so two matching
+     // columns would add it twice - confirm that cannot happen.
+     List<Target> kept = new ArrayList<Target>();
+     for (Target candidate : innerBlock.getTargetListManager().getUnEvaluatedTargets()) {
+       for (Column required : context.upperRequired) {
+         boolean qualifierFits = !required.hasQualifier()
+             || node.getTableName().equals(required.getQualifier());
+         if (qualifierFits
+             && candidate.getColumnSchema().getColumnName().equalsIgnoreCase(required.getColumnName())) {
+           kept.add(candidate);
+         }
+       }
+     }
+     Target[] keptTargets = kept.toArray(new Target[kept.size()]);
+     innerContext.targetListManager = new TargetListManager(plan, keptTargets);
+   }
+
+   // The inner block is asked for exactly the columns of the targets chosen above.
+   innerContext.upperRequired = new HashSet<Column>(
+       PlannerUtil.targetToSchema(innerContext.targetListManager.getTargets()).getColumns());
+
+   LogicalNode innerResult = visitChild(plan, innerRoot, innerStack, innerContext);
+   innerStack.pop();
+   // The subquery's input schema is the inner result's output, qualified by this node's table name.
+   node.setInSchema(PlannerUtil.getQualifiedSchema(innerResult.getOutSchema(), node.getTableName()));
+   return pushDownProjectablePost(context, node, isTopmostProjectable(stack));
+ }
+
+ @Override
public LogicalNode visitScan(LogicalPlan plan, ScanNode node, Stack<LogicalNode> stack, PushDownContext context)
throws PlanningException {
return pushDownProjectablePost(context, node, isTopmostProjectable(stack));
@@ -280,7 +337,7 @@ public class ProjectionPushDownRule extends BasicLogicalPlanVisitor<ProjectionPu
if (!targetListManager.isEvaluated(i) && PlannerUtil.canBeEvaluated(expr, node)) {
- if (node instanceof ScanNode) { // For ScanNode
+ if (node instanceof RelationNode) { // For RelationNode
if (expr.getType() == EvalType.FIELD && !targetListManager.getTarget(i).hasAlias()) {
targetListManager.setEvaluated(i);
@@ -306,7 +363,7 @@ public class ProjectionPushDownRule extends BasicLogicalPlanVisitor<ProjectionPu
Projectable projectable = (Projectable) node;
if (last) {
- Preconditions.checkState(targetListManager.isAllEvaluated(), "Not all targets are evaluated.");
+ Preconditions.checkState(targetListManager.isAllEvaluated(), "Not all targets are evaluated");
projectable.setTargets(targetListManager.getTargets());
targetListManager.getUpdatedTarget();
node.setOutSchema(targetListManager.getUpdatedSchema());
@@ -361,18 +418,38 @@ public class ProjectionPushDownRule extends BasicLogicalPlanVisitor<ProjectionPu
return true;
}
+ /**
+  * Builds the target list for one child block of a set operation (called by
+  * pushDownSetNode for the left and the right side).
+  *
+  * Only the unevaluated targets of the child block are considered. A target is kept for
+  * every required column whose qualifier equals the subquery node's table name and whose
+  * column name matches the target's column name, ignoring case.
+  * NOTE(review): unlike visitTableSubQuery there is no hasQualifier() test here; whether an
+  * unqualified required column can match depends on what getQualifier() returns for it - confirm.
+  *
+  * @param plan the logical plan handed to the new TargetListManager
+  * @param subQueryBlock the child query block whose targets are filtered
+  * @param subQueryNode the subquery node whose table name required columns must be qualified with
+  * @param upperRequired the columns required by the operators above
+  * @return a TargetListManager over the selected targets
+  */
+ private TargetListManager buildSubBlockTargetList(LogicalPlan plan,
+     LogicalPlan.QueryBlock subQueryBlock, TableSubQueryNode subQueryNode, Set<Column> upperRequired) {
+   List<Target> selected = new ArrayList<Target>();
+   for (Target candidate : subQueryBlock.getTargetListManager().getUnEvaluatedTargets()) {
+     for (Column required : upperRequired) {
+       boolean sameRelation = subQueryNode.getTableName().equals(required.getQualifier());
+       if (sameRelation
+           && candidate.getColumnSchema().getColumnName().equalsIgnoreCase(required.getColumnName())) {
+         selected.add(candidate);
+       }
+     }
+   }
+   Target[] selectedTargets = selected.toArray(new Target[selected.size()]);
+   return new TargetListManager(plan, selectedTargets);
+ }
+
private BinaryNode pushDownSetNode(LogicalPlan plan, BinaryNode setNode, Stack<LogicalNode> stack,
PushDownContext context) throws PlanningException {
- LogicalPlan.QueryBlock leftBlock = plan.getBlock(setNode.getLeftChild());
+ LogicalPlan.QueryBlock currentBlock = plan.getBlock(setNode);
+ LogicalPlan.QueryBlock leftBlock = plan.getChildBlocks(currentBlock).get(0);
+ LogicalPlan.QueryBlock rightBlock = plan.getChildBlocks(currentBlock).get(1);
+
PushDownContext leftContext = new PushDownContext(context, leftBlock);
- leftContext.targetListManager = new TargetListManager(plan,
- leftBlock.getTargetListManager().getUnEvaluatedTargets());
+ leftContext.targetListManager = buildSubBlockTargetList(plan, leftBlock,
+ (TableSubQueryNode) setNode.getLeftChild(), context.upperRequired);
- LogicalPlan.QueryBlock rightBlock = plan.getBlock(setNode.getRightChild());
PushDownContext rightContext = new PushDownContext(context, rightBlock);
- rightContext.targetListManager = new TargetListManager(plan,
- rightBlock.getTargetListManager().getUnEvaluatedTargets());
+ rightContext.targetListManager = buildSubBlockTargetList(plan, rightBlock,
+ (TableSubQueryNode) setNode.getRightChild(), context.upperRequired);
stack.push(setNode);
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/1b1d1e8c/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/query/QueryUnitRequestImpl.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/query/QueryUnitRequestImpl.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/query/QueryUnitRequestImpl.java
index a0422af..0676277 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/query/QueryUnitRequestImpl.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/query/QueryUnitRequestImpl.java
@@ -18,6 +18,7 @@
package org.apache.tajo.engine.query;
+import org.apache.tajo.DataChannel;
import org.apache.tajo.QueryUnitAttemptId;
import org.apache.tajo.ipc.TajoWorkerProtocol.Fetch;
import org.apache.tajo.ipc.TajoWorkerProtocol.QueryUnitRequestProto;
@@ -42,6 +43,7 @@ public class QueryUnitRequestImpl implements QueryUnitRequest {
private List<Fetch> fetches;
private Boolean shouldDie;
private QueryContext queryContext;
+ private DataChannel dataChannel;
private QueryUnitRequestProto proto = QueryUnitRequestProto.getDefaultInstance();
private QueryUnitRequestProto.Builder builder = null;
@@ -55,9 +57,9 @@ public class QueryUnitRequestImpl implements QueryUnitRequest {
public QueryUnitRequestImpl(QueryUnitAttemptId id, List<Fragment> fragments,
String outputTable, boolean clusteredOutput,
- String serializedData, QueryContext queryContext) {
+ String serializedData, QueryContext queryContext, DataChannel channel) {
this();
- this.set(id, fragments, outputTable, clusteredOutput, serializedData, queryContext);
+ this.set(id, fragments, outputTable, clusteredOutput, serializedData, queryContext, channel);
}
public QueryUnitRequestImpl(QueryUnitRequestProto proto) {
@@ -68,8 +70,8 @@ public class QueryUnitRequestImpl implements QueryUnitRequest {
}
public void set(QueryUnitAttemptId id, List<Fragment> fragments,
- String outputTable, boolean clusteredOutput,
- String serializedData, QueryContext queryContext) {
+ String outputTable, boolean clusteredOutput,
+ String serializedData, QueryContext queryContext, DataChannel dataChannel) {
this.id = id;
this.fragments = fragments;
this.outputTable = outputTable;
@@ -77,6 +79,7 @@ public class QueryUnitRequestImpl implements QueryUnitRequest {
this.serializedData = serializedData;
this.isUpdated = true;
this.queryContext = queryContext;
+ this.dataChannel = dataChannel;
}
@Override
@@ -198,6 +202,24 @@ public class QueryUnitRequestImpl implements QueryUnitRequest {
maybeInitBuilder();
this.queryContext = queryContext;
}
+
+ public void setDataChannel(DataChannel dataChannel) { // replaces the locally held channel; this method itself writes nothing to the builder
+ maybeInitBuilder(); // NOTE(review): presumably makes the proto builder available before a mutation - confirm
+ this.dataChannel = dataChannel;
+ }
+
+ /**
+  * Returns the data channel of this request, decoding it from the underlying
+  * protobuf message on first access and caching the result in the local field.
+  *
+  * @return the cached or freshly decoded channel, or null when neither the local
+  *         field nor the protobuf message carries a data channel
+  */
+ @Override
+ public DataChannel getDataChannel() {
+   if (dataChannel != null) {
+     return dataChannel;
+   }
+   QueryUnitRequestProtoOrBuilder p = viaProto ? proto : builder;
+   // Test the data-channel field itself: the presence of a query context says nothing
+   // about whether a channel was serialized into the message.
+   if (!p.hasDataChannel()) {
+     return null;
+   }
+   this.dataChannel = new DataChannel(p.getDataChannel());
+   return this.dataChannel;
+ }
public List<Fetch> getFetches() {
initFetches();
@@ -272,6 +294,9 @@ public class QueryUnitRequestImpl implements QueryUnitRequest {
if (this.queryContext != null) {
builder.setQueryContext(queryContext.getProto());
}
+ if (this.dataChannel != null) {
+ builder.setDataChannel(dataChannel.getProto());
+ }
}
private void mergeLocalToProto() {
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/1b1d1e8c/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/query/ResultSetMetaDataImpl.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/query/ResultSetMetaDataImpl.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/query/ResultSetMetaDataImpl.java
index 9ebd158..7a9c4e0 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/query/ResultSetMetaDataImpl.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/query/ResultSetMetaDataImpl.java
@@ -179,7 +179,7 @@ public class ResultSetMetaDataImpl implements ResultSetMetaData {
*/
@Override
public String getTableName(int column) throws SQLException {
- return meta.getSchema().getColumn(column - 1).getTableName();
+ return meta.getSchema().getColumn(column - 1).getQualifier(); // index is shifted down by one for the schema lookup; the column's qualifier is reported as its table name
}
/* (non-Javadoc)
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/1b1d1e8c/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/ipc/protocolrecords/QueryUnitRequest.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/ipc/protocolrecords/QueryUnitRequest.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/ipc/protocolrecords/QueryUnitRequest.java
index 0775c3c..a9f3706 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/ipc/protocolrecords/QueryUnitRequest.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/ipc/protocolrecords/QueryUnitRequest.java
@@ -21,6 +21,7 @@
*/
package org.apache.tajo.ipc.protocolrecords;
+import org.apache.tajo.DataChannel;
import org.apache.tajo.QueryUnitAttemptId;
import org.apache.tajo.common.ProtoObject;
import org.apache.tajo.ipc.TajoWorkerProtocol;
@@ -44,4 +45,5 @@ public interface QueryUnitRequest extends ProtoObject<TajoWorkerProtocol.QueryUn
public boolean shouldDie();
public void setShouldDie();
public QueryContext getQueryContext();
+ public DataChannel getDataChannel();
}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/1b1d1e8c/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/ExecutionBlock.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/ExecutionBlock.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/ExecutionBlock.java
index a92ef75..d509156 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/ExecutionBlock.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/ExecutionBlock.java
@@ -17,10 +17,13 @@ package org.apache.tajo.master;
import com.google.common.base.Preconditions;
import org.apache.tajo.ExecutionBlockId;
import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.engine.planner.PlannerUtil;
import org.apache.tajo.engine.planner.logical.*;
import java.util.*;
+import static org.apache.tajo.ipc.TajoWorkerProtocol.PartitionType;
+
/**
* A distributed execution plan (DEP) is a direct acyclic graph (DAG) of ExecutionBlocks.
* An ExecutionBlock is a basic execution unit that could be distributed across a number of nodes.
@@ -29,17 +32,6 @@ import java.util.*;
* In addition, it includes a logical plan to be executed in each node.
*/
public class ExecutionBlock {
-
- public static enum PartitionType {
- /** for hash partitioning */
- HASH,
- LIST,
- /** for map-side join */
- BROADCAST,
- /** for range partitioning */
- RANGE
- }
-
private ExecutionBlockId executionBlockId;
private LogicalNode plan = null;
private StoreTableNode store = null;
@@ -47,9 +39,12 @@ public class ExecutionBlock {
private ExecutionBlock parent;
private Map<ScanNode, ExecutionBlock> childSubQueries = new HashMap<ScanNode, ExecutionBlock>();
private PartitionType outputType;
+
private boolean hasJoinPlan;
private boolean hasUnionPlan;
+ private Set<String> broadcasted = new HashSet<String>();
+
public ExecutionBlock(ExecutionBlockId executionBlockId) {
this.executionBlockId = executionBlockId;
}
@@ -58,10 +53,6 @@ public class ExecutionBlock {
return executionBlockId;
}
- public String getOutputName() {
- return store.getTableName();
- }
-
public void setPartitionType(PartitionType partitionType) {
this.outputType = partitionType;
}
@@ -72,10 +63,9 @@ public class ExecutionBlock {
public void setPlan(LogicalNode plan) {
hasJoinPlan = false;
- Preconditions.checkArgument(plan.getType() == NodeType.STORE);
-
+ hasUnionPlan = false;
+ this.scanlist.clear();
this.plan = plan;
- store = (StoreTableNode) plan;
LogicalNode node = plan;
ArrayList<LogicalNode> s = new ArrayList<LogicalNode>();
@@ -96,6 +86,9 @@ public class ExecutionBlock {
s.add(s.size(), binary.getRightChild());
} else if (node instanceof ScanNode) {
scanlist.add((ScanNode)node);
+ } else if (node instanceof TableSubQueryNode) {
+ TableSubQueryNode subQuery = (TableSubQueryNode) node;
+ s.add(s.size(), subQuery.getSubQuery());
}
}
}
@@ -105,6 +98,10 @@ public class ExecutionBlock {
return plan;
}
+ /**
+  * Tells whether this block counts as a root of the execution plan.
+  *
+  * @return true if this block has no parent, or if its parent has no parent of its
+  *         own and that parent has a union plan
+  */
+ public boolean isRoot() {
+   if (!hasParentBlock()) {
+     return true;
+   }
+   // Otherwise: the direct child of a parentless block that has a union plan.
+   return !getParentBlock().hasParentBlock() && getParentBlock().hasUnion();
+ }
+
public boolean hasParentBlock() {
return parent != null;
}
@@ -173,4 +170,24 @@ public class ExecutionBlock {
public boolean hasUnion() {
return hasUnionPlan;
}
+
+ public void addBroadcastTables(Collection<String> tableNames) { // adds every given table name to the broadcast set
+ broadcasted.addAll(tableNames);
+ }
+
+ public void addBroadcastTable(String tableName) { // adds a single table name to the broadcast set
+ broadcasted.add(tableName);
+ }
+
+ public boolean isBroadcastTable(String tableName) { // true when the name was previously added to the broadcast set
+ return broadcasted.contains(tableName);
+ }
+
+ public Collection<String> getBroadcastTables() { // returns the internal set itself, not a copy: changes made by the caller are seen by this block
+ return broadcasted;
+ }
+
+ /**
+  * Returns the string form of this block's ExecutionBlockId.
+  */
+ @Override
+ public String toString() {
+   return executionBlockId.toString();
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/1b1d1e8c/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/ExecutionBlockCursor.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/ExecutionBlockCursor.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/ExecutionBlockCursor.java
index fd3ae1e..51c825c 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/ExecutionBlockCursor.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/ExecutionBlockCursor.java
@@ -25,10 +25,12 @@ import java.util.Iterator;
* For each call of nextBlock(), it retrieves a next ExecutionBlock in a postfix order.
*/
public class ExecutionBlockCursor {
+ private MasterPlan masterPlan;
private ArrayList<ExecutionBlock> orderedBlocks = new ArrayList<ExecutionBlock>();
private int cursor = 0;
public ExecutionBlockCursor(MasterPlan plan) {
+ this.masterPlan = plan;
buildOrder(plan.getRoot());
}
@@ -37,26 +39,14 @@ public class ExecutionBlockCursor {
}
private void buildOrder(ExecutionBlock current) {
- if (current.hasChildBlock()) {
- if (current.getChildNum() == 1) {
- ExecutionBlock block = current.getChildBlocks().iterator().next();
+ if (!masterPlan.isLeaf(current.getId())) {
+ if (masterPlan.getChildCount(current.getId()) == 1) {
+ ExecutionBlock block = masterPlan.getChild(current, 0);
buildOrder(block);
} else {
- Iterator<ExecutionBlock> it = current.getChildBlocks().iterator();
- ExecutionBlock outer = it.next();
- ExecutionBlock inner = it.next();
-
- // Switch between outer and inner
- // if an inner has a child and an outer doesn't.
- // It is for left-deep-first search.
- if (!outer.hasChildBlock() && inner.hasChildBlock()) {
- ExecutionBlock tmp = outer;
- outer = inner;
- inner = tmp;
+ for (ExecutionBlock exec : masterPlan.getChilds(current)) {
+ buildOrder(exec);
}
-
- buildOrder(outer);
- buildOrder(inner);
}
}
orderedBlocks.add(current);
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/1b1d1e8c/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/GlobalEngine.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/GlobalEngine.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/GlobalEngine.java
index 2ddd891..5080599 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/GlobalEngine.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/GlobalEngine.java
@@ -55,7 +55,6 @@ import org.apache.tajo.storage.StorageUtil;
import java.io.IOException;
import java.sql.SQLException;
import java.util.ArrayList;
-import java.util.Collection;
import java.util.List;
import static org.apache.tajo.ipc.ClientProtos.GetQueryStatusResponse;
@@ -107,7 +106,8 @@ public class GlobalEngine extends AbstractService {
NoSuchQueryIdException, IllegalQueryStatusException,
UnknownWorkerException, EmptyClusterException {
- LOG.info(">>>>>SQL: " + sql);
+ LOG.info("SQL: " + sql);
+ QueryContext queryContext = new QueryContext();
try {
// setting environment variables
@@ -127,6 +127,10 @@ public class GlobalEngine extends AbstractService {
final boolean hiveQueryMode = context.getConf().getBoolVar(TajoConf.ConfVars.HIVE_QUERY_MODE);
LOG.info("hive.query.mode:" + hiveQueryMode);
+ if (hiveQueryMode) {
+ queryContext.setHiveQueryMode();
+ }
+
Expr planningContext = hiveQueryMode ? converter.parse(sql) : analyzer.parse(sql);
LogicalPlan plan = createLogicalPlan(planningContext);
@@ -139,7 +143,6 @@ public class GlobalEngine extends AbstractService {
responseBuilder.setResultCode(ClientProtos.ResultCode.OK);
responseBuilder.setState(TajoProtos.QueryState.QUERY_SUCCEEDED);
} else {
- QueryContext queryContext = new QueryContext();
hookManager.doHooks(queryContext, plan);
QueryJobManager queryJobManager = this.context.getQueryJobManager();
@@ -300,7 +303,7 @@ public class GlobalEngine extends AbstractService {
void hook(QueryContext queryContext, LogicalPlan plan) throws Exception;
}
- public class DistributedQueryHookManager {
+ public static class DistributedQueryHookManager {
private List<DistributedQueryHook> hooks = new ArrayList<DistributedQueryHook>();
public void addHook(DistributedQueryHook hook) {
hooks.add(hook);
@@ -319,7 +322,7 @@ public class GlobalEngine extends AbstractService {
}
}
- private class CreateTableHook implements DistributedQueryHook {
+ public class CreateTableHook implements DistributedQueryHook {
@Override
public boolean isEligible(QueryContext queryContext, LogicalPlan plan) {
@@ -341,7 +344,7 @@ public class GlobalEngine extends AbstractService {
}
}
- private class InsertHook implements DistributedQueryHook {
+ public static class InsertHook implements DistributedQueryHook {
@Override
public boolean isEligible(QueryContext queryContext, LogicalPlan plan) {
@@ -416,10 +419,8 @@ public class GlobalEngine extends AbstractService {
ProjectionNode projectionNode = new ProjectionNode(targets);
projectionNode.setInSchema(insertNode.getSubQuery().getOutSchema());
projectionNode.setOutSchema(PlannerUtil.targetToSchema(targets));
- Collection<QueryBlockGraph.BlockEdge> edges = plan.getConnectedBlocks(LogicalPlan.ROOT_BLOCK);
- LogicalPlan.QueryBlock block = plan.getBlock(edges.iterator().next().getTargetBlock());
- projectionNode.setChild(block.getRoot());
-
+ List<LogicalPlan.QueryBlock> blocks = plan.getChildBlocks(plan.getRootBlock());
+ projectionNode.setChild(blocks.get(0).getRoot());
storeNode.setOutSchema(projectionNode.getOutSchema());
storeNode.setInSchema(projectionNode.getOutSchema());
@@ -427,12 +428,10 @@ public class GlobalEngine extends AbstractService {
} else {
storeNode.setOutSchema(subQueryOutSchema);
storeNode.setInSchema(subQueryOutSchema);
- Collection<QueryBlockGraph.BlockEdge> edges = plan.getConnectedBlocks(LogicalPlan.ROOT_BLOCK);
- LogicalPlan.QueryBlock block = plan.getBlock(edges.iterator().next().getTargetBlock());
- storeNode.setChild(block.getRoot());
+ List<LogicalPlan.QueryBlock> childBlocks = plan.getChildBlocks(plan.getRootBlock());
+ storeNode.setChild(childBlocks.get(0).getRoot());
}
- storeNode.setListPartition();
if (insertNode.hasStorageType()) {
storeNode.setStorageType(insertNode.getStorageType());
}