You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tajo.apache.org by hy...@apache.org on 2014/04/18 11:19:55 UTC
[33/51] [partial] TAJO-752: Escalate sub modules in tajo-core into
the top-level modules. (hyunsik)
http://git-wip-us.apache.org/repos/asf/tajo/blob/6594ac1c/tajo-core/src/main/java/org/apache/tajo/engine/planner/logical/SelectionNode.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/logical/SelectionNode.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/logical/SelectionNode.java
new file mode 100644
index 0000000..b8a9680
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/logical/SelectionNode.java
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.planner.logical;
+
+import com.google.gson.annotations.Expose;
+import org.apache.tajo.engine.eval.EvalNode;
+import org.apache.tajo.engine.planner.PlanString;
+
+/** Logical plan node of type SELECTION that carries a search condition ({@code qual}). */
+public class SelectionNode extends UnaryNode implements Cloneable {
+  @Expose private EvalNode qual; // stays null until setQual() is called
+
+  public SelectionNode(int pid) {
+    super(pid, NodeType.SELECTION);
+  }
+
+  public EvalNode getQual() {
+    return this.qual;
+  }
+
+  public void setQual(EvalNode qual) {
+    this.qual = qual;
+  }
+
+  @Override
+  public PlanString getPlanString() {
+    PlanString planStr = new PlanString(this);
+    planStr.addExplan("Search Cond: " + getQual());
+    return planStr;
+  }
+
+  @Override
+  public boolean equals(Object obj) {
+    if (!(obj instanceof SelectionNode)) {
+      return false;
+    }
+    SelectionNode other = (SelectionNode) obj;
+    // qual may still be unset, so compare it null-safely instead of dereferencing it.
+    return super.equals(other) && (qual == null ? other.qual == null : qual.equals(other.qual));
+  }
+
+  @Override
+  public Object clone() throws CloneNotSupportedException {
+    SelectionNode selNode = (SelectionNode) super.clone();
+    // Clone the predicate only when one has been set.
+    selNode.qual = qual == null ? null : (EvalNode) qual.clone();
+    return selNode;
+  }
+
+  public String toString() {
+    return "Selection (filter=" + qual + ")";
+  }
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/6594ac1c/tajo-core/src/main/java/org/apache/tajo/engine/planner/logical/ShuffleFileWriteNode.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/logical/ShuffleFileWriteNode.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/logical/ShuffleFileWriteNode.java
new file mode 100644
index 0000000..c7ea454
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/logical/ShuffleFileWriteNode.java
@@ -0,0 +1,104 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.planner.logical;
+
+import com.google.common.base.Preconditions;
+import com.google.gson.annotations.Expose;
+import org.apache.tajo.catalog.Column;
+import org.apache.tajo.ipc.TajoWorkerProtocol;
+import org.apache.tajo.util.TUtil;
+
+import static org.apache.tajo.ipc.TajoWorkerProtocol.ShuffleType.NONE_SHUFFLE;
+
+/**
+ * ShuffeFileWriteNode is an expression for an intermediate data materialization step.
+ */
+public class ShuffleFileWriteNode extends PersistentStoreNode implements Cloneable {
+ @Expose private TajoWorkerProtocol.ShuffleType shuffleType = NONE_SHUFFLE;
+ @Expose private int numOutputs;
+ @Expose private Column [] shuffleKeys;
+
+ public ShuffleFileWriteNode(int pid) {
+ super(pid, NodeType.STORE);
+ }
+
+ public final int getNumOutputs() {
+ return this.numOutputs;
+ }
+
+ public final boolean hasShuffleKeys() {
+ return this.shuffleKeys != null;
+ }
+
+ public final Column [] getShuffleKeys() {
+ return shuffleKeys;
+ }
+
+ public final void setShuffle(TajoWorkerProtocol.ShuffleType type, Column[] keys, int numPartitions) {
+ Preconditions.checkArgument(keys.length >= 0,
+ "At least one partition key must be specified.");
+ // In outer join, zero can be passed into this value because of empty tables.
+ // So, we should allow zero.
+ Preconditions.checkArgument(numPartitions >= 0,
+ "The number of partitions must be positive: %s", numPartitions);
+
+ this.shuffleType = type;
+ this.shuffleKeys = keys;
+ this.numOutputs = numPartitions;
+ }
+
+ public TajoWorkerProtocol.ShuffleType getShuffleType() {
+ return this.shuffleType;
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (obj instanceof ShuffleFileWriteNode) {
+ ShuffleFileWriteNode other = (ShuffleFileWriteNode) obj;
+ boolean eq = super.equals(other);
+ eq = eq && this.numOutputs == other.numOutputs;
+ eq = eq && TUtil.checkEquals(shuffleKeys, other.shuffleKeys);
+ return eq;
+ } else {
+ return false;
+ }
+ }
+
+ @Override
+ public Object clone() throws CloneNotSupportedException {
+ ShuffleFileWriteNode store = (ShuffleFileWriteNode) super.clone();
+ store.numOutputs = numOutputs;
+ store.shuffleKeys = shuffleKeys != null ? shuffleKeys.clone() : null;
+ return store;
+ }
+
+ public String toString() {
+ StringBuilder sb = new StringBuilder("Shuffle Write (type=" + shuffleType.name().toLowerCase());
+ if (storageType != null) {
+ sb.append(", storage="+ storageType.name());
+ }
+ sb.append(", part number=").append(numOutputs);
+ if (shuffleKeys != null) {
+ sb.append(", keys: ").append(TUtil.arrayToString(shuffleKeys));
+ }
+ sb.append(")");
+
+ return sb.toString();
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/tajo/blob/6594ac1c/tajo-core/src/main/java/org/apache/tajo/engine/planner/logical/SortNode.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/logical/SortNode.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/logical/SortNode.java
new file mode 100644
index 0000000..a732710
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/logical/SortNode.java
@@ -0,0 +1,94 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.planner.logical;
+
+import com.google.common.base.Preconditions;
+import com.google.gson.annotations.Expose;
+import org.apache.tajo.catalog.SortSpec;
+import org.apache.tajo.engine.planner.PlanString;
+import org.apache.tajo.util.TUtil;
+
+/** Logical plan node of type SORT that carries the sort specifications. */
+public final class SortNode extends UnaryNode implements Cloneable {
+  @Expose private SortSpec [] sortKeys; // null until setSortSpecs() is called
+
+  public SortNode(int pid) {
+    super(pid, NodeType.SORT);
+  }
+
+  /** Stores the given sort specifications; an empty array is rejected by the precondition. */
+  public void setSortSpecs(SortSpec[] sortSpecs) {
+    Preconditions.checkArgument(sortSpecs.length > 0, "At least one sort key must be specified");
+    this.sortKeys = sortSpecs;
+  }
+
+  public SortSpec[] getSortKeys() {
+    return this.sortKeys;
+  }
+
+  @Override
+  public boolean equals(Object obj) {
+    if (!(obj instanceof SortNode)) {
+      return false;
+    }
+    SortNode other = (SortNode) obj;
+    return super.equals(other) && TUtil.checkEquals(sortKeys, other.sortKeys);
+  }
+
+  @Override
+  public Object clone() throws CloneNotSupportedException {
+    SortNode sort = (SortNode) super.clone();
+    // sortKeys may not be set yet; keep null as null instead of failing with an NPE.
+    sort.sortKeys = sortKeys == null ? null : sortKeys.clone();
+    return sort;
+  }
+
+  @Override
+  public PlanString getPlanString() {
+    PlanString planStr = new PlanString(this);
+    StringBuilder sb = new StringBuilder("Sort Keys: ");
+    for (int i = 0; i < sortKeys.length; i++) {
+      sb.append(sortKeys[i].getSortKey().getSimpleName()).append(" ")
+          .append(sortKeys[i].isAscending() ? "asc" : "desc");
+      if( i < sortKeys.length - 1) {
+        sb.append(",");
+      }
+    }
+    planStr.addExplan(sb.toString());
+    return planStr;
+  }
+
+  @Override
+  public String toString() {
+    StringBuilder sb = new StringBuilder("Sort [key= ");
+    for (int i = 0; i < sortKeys.length; i++) {
+      sb.append(sortKeys[i].getSortKey().getQualifiedName()).append(" ")
+          .append(sortKeys[i].isAscending() ? "asc" : "desc");
+      if(i < sortKeys.length - 1) {
+        sb.append(",");
+      }
+    }
+    sb.append("]");
+
+    sb.append("\n\"out schema: " + getOutSchema()
+        + "\n\"in schema: " + getInSchema());
+    return sb.toString()+"\n"
+        + getChild().toString();
+  }
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/6594ac1c/tajo-core/src/main/java/org/apache/tajo/engine/planner/logical/StoreTableNode.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/logical/StoreTableNode.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/logical/StoreTableNode.java
new file mode 100644
index 0000000..b0f7b7a
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/logical/StoreTableNode.java
@@ -0,0 +1,100 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.planner.logical;
+
+import com.google.gson.annotations.Expose;
+import org.apache.tajo.catalog.partition.PartitionMethodDesc;
+import org.apache.tajo.engine.planner.PlanString;
+import org.apache.tajo.util.TUtil;
+
+/** Store node that additionally records a target table name and an optional partition method. */
+public class StoreTableNode extends PersistentStoreNode implements Cloneable {
+  @Expose protected String tableName;
+  @Expose private PartitionMethodDesc partitionDesc;
+
+  public StoreTableNode(int pid) {
+    super(pid, NodeType.STORE);
+  }
+
+  protected StoreTableNode(int pid, NodeType nodeType) {
+    super(pid, nodeType);
+  }
+
+  public boolean hasTargetTable() {
+    return tableName != null;
+  }
+
+  public void setTableName(String tableName) {
+    this.tableName = tableName;
+  }
+
+  public final String getTableName() {
+    return this.tableName;
+  }
+
+  public boolean hasPartition() {
+    return this.partitionDesc != null;
+  }
+
+  public PartitionMethodDesc getPartitionMethod() {
+    return partitionDesc;
+  }
+
+  public void setPartitionMethod(PartitionMethodDesc partitionDesc) {
+    this.partitionDesc = partitionDesc;
+  }
+
+  @Override
+  public PlanString getPlanString() {
+    PlanString planStr = new PlanString(this);
+    planStr.appendTitle(" into ").appendTitle(tableName);
+    planStr.addExplan("Store type: " + storageType);
+    return planStr;
+  }
+
+  @Override
+  public boolean equals(Object obj) {
+    if (!(obj instanceof StoreTableNode)) {
+      return false;
+    }
+    StoreTableNode other = (StoreTableNode) obj;
+    // tableName may be null (see hasTargetTable()), so it goes through the same
+    // TUtil.checkEquals helper as partitionDesc instead of being dereferenced.
+    return super.equals(other)
+        && TUtil.checkEquals(tableName, other.tableName)
+        && TUtil.checkEquals(partitionDesc, other.partitionDesc);
+  }
+
+  @Override
+  public Object clone() throws CloneNotSupportedException {
+    StoreTableNode store = (StoreTableNode) super.clone();
+    store.tableName = tableName;
+    store.partitionDesc = partitionDesc != null ? (PartitionMethodDesc) partitionDesc.clone() : null;
+    return store;
+  }
+
+  @Override
+  public String toString() {
+    StringBuilder sb = new StringBuilder("Store Table (table=").append(tableName);
+    if (storageType != null) {
+      sb.append(", storage="+ storageType.name());
+    }
+    return sb.append(")").toString();
+  }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/tajo/blob/6594ac1c/tajo-core/src/main/java/org/apache/tajo/engine/planner/logical/TableSubQueryNode.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/logical/TableSubQueryNode.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/logical/TableSubQueryNode.java
new file mode 100644
index 0000000..4d0090b
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/logical/TableSubQueryNode.java
@@ -0,0 +1,181 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.planner.logical;
+
+import com.google.common.base.Objects;
+import com.google.gson.annotations.Expose;
+import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.engine.planner.PlanString;
+import org.apache.tajo.engine.planner.PlannerUtil;
+import org.apache.tajo.engine.planner.Target;
+import org.apache.tajo.engine.utils.SchemaUtil;
+
+public class TableSubQueryNode extends RelationNode implements Projectable {
+  @Expose private String tableName;
+  @Expose private LogicalNode subQuery;
+  @Expose private Target [] targets; // optional; null until setTargets() is called
+
+  public TableSubQueryNode(int pid) {
+    super(pid, NodeType.TABLE_SUBQUERY);
+  }
+
+  public void init(String tableName, LogicalNode subQuery) {
+    this.tableName = tableName;
+    if (subQuery != null) {
+      this.subQuery = subQuery;
+      setOutSchema(SchemaUtil.clone(this.subQuery.getOutSchema()));
+      setInSchema(SchemaUtil.clone(this.subQuery.getOutSchema()));
+      getInSchema().setQualifier(this.tableName);
+      getOutSchema().setQualifier(this.tableName);
+    }
+  }
+
+  @Override
+  public boolean hasAlias() {
+    return false;
+  }
+
+  @Override
+  public String getAlias() {
+    return null;
+  }
+
+  public String getTableName() {
+    return tableName;
+  }
+
+  @Override
+  public String getCanonicalName() {
+    return tableName;
+  }
+
+  @Override
+  public Schema getTableSchema() {
+    // an output schema can be determined by targets. So, an input schema of
+    // TableSubQueryNode is only eligible for table schema.
+    //
+    // TODO - but, a derived table can have column alias. For that, we should improve here.
+    //
+    // example) select * from (select col1, col2, col3 from t1) view (c1, c2);
+    return getInSchema();
+  }
+
+  public void setSubQuery(LogicalNode node) {
+    this.subQuery = node;
+    setInSchema(SchemaUtil.clone(this.subQuery.getOutSchema()));
+    getInSchema().setQualifier(this.tableName);
+    if (hasTargets()) {
+      setOutSchema(PlannerUtil.targetToSchema(targets));
+    } else {
+      setOutSchema(SchemaUtil.clone(this.subQuery.getOutSchema()));
+    }
+    getOutSchema().setQualifier(this.tableName);
+  }
+
+  public LogicalNode getSubQuery() {
+    return subQuery;
+  }
+
+  @Override
+  public boolean hasTargets() {
+    return targets != null;
+  }
+
+  @Override
+  public void setTargets(Target[] targets) {
+    this.targets = targets;
+    setOutSchema(PlannerUtil.targetToSchema(targets));
+  }
+
+  @Override
+  public Target[] getTargets() {
+    return targets;
+  }
+
+  @Override
+  public PlanString getPlanString() {
+    PlanString planStr = new PlanString(this);
+    planStr.appendTitle(" as ").appendTitle(tableName);
+    if (hasTargets()) {
+      StringBuilder sb = new StringBuilder("Targets: ");
+      for (int i = 0; i < targets.length; i++) {
+        sb.append(targets[i]);
+        if( i < targets.length - 1) {
+          sb.append(", ");
+        }
+      }
+      planStr.addExplan(sb.toString());
+      if (getOutSchema() != null) {
+        planStr.addExplan("out schema: " + getOutSchema().toString());
+      }
+      if (getInSchema() != null) {
+        planStr.addExplan("in schema: " + getInSchema().toString());
+      }
+    }
+    return planStr;
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hashCode(tableName, subQuery);
+  }
+
+  @Override
+  public boolean equals(Object object) {
+    if (object instanceof TableSubQueryNode) {
+      TableSubQueryNode another = (TableSubQueryNode) object;
+      // Null-safe like hashCode(): tableName and subQuery may still be unset.
+      return Objects.equal(tableName, another.tableName)
+          && Objects.equal(subQuery, another.subQuery);
+    }
+    return false;
+  }
+
+  @Override
+  public Object clone() throws CloneNotSupportedException {
+    TableSubQueryNode newTableSubQueryNode = (TableSubQueryNode) super.clone();
+    newTableSubQueryNode.tableName = tableName;
+    // init() leaves subQuery null when it is given null, so guard the clone call.
+    newTableSubQueryNode.subQuery = subQuery == null ? null : (LogicalNode) subQuery.clone();
+    if (hasTargets()) {
+      newTableSubQueryNode.targets = new Target[targets.length];
+      for (int i = 0; i < targets.length; i++) {
+        newTableSubQueryNode.targets[i] = (Target) targets[i].clone();
+      }
+    }
+    return newTableSubQueryNode;
+  }
+
+  @Override
+  public void preOrder(LogicalNodeVisitor visitor) {
+    visitor.visit(this);
+    subQuery.preOrder(visitor);
+  }
+
+  @Override
+  public void postOrder(LogicalNodeVisitor visitor) {
+    // The subquery subtree must itself be walked in post-order before this node is visited.
+    subQuery.postOrder(visitor);
+    visitor.visit(this);
+  }
+
+  public String toString() {
+    return "Inline view (name=" + tableName + ")";
+  }
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/6594ac1c/tajo-core/src/main/java/org/apache/tajo/engine/planner/logical/UnaryNode.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/logical/UnaryNode.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/logical/UnaryNode.java
new file mode 100644
index 0000000..0b06e9e
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/logical/UnaryNode.java
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.planner.logical;
+
+import com.google.gson.annotations.Expose;
+import org.apache.tajo.util.TUtil;
+
+
+public abstract class UnaryNode extends LogicalNode implements Cloneable {
+  @Expose LogicalNode child; // may be null; clone() handles that case
+
+  /**
+   * Forwards {@code pid} and {@code type} to the LogicalNode constructor.
+   */
+  public UnaryNode(int pid, NodeType type) {
+    super(pid, type);
+  }
+
+  public void setChild(LogicalNode subNode) {
+    this.child = subNode;
+  }
+
+  public <T extends LogicalNode> T getChild() {
+    return (T) this.child; // unchecked: the caller chooses T
+  }
+
+  @Override
+  public boolean deepEquals(Object o) {
+    if (!(o instanceof UnaryNode)) {
+      return false;
+    }
+    // Node-level equality first, then the children are compared via TUtil.checkEquals.
+    return equals(o) && TUtil.checkEquals(child, ((UnaryNode) o).child);
+  }
+
+  @Override
+  public Object clone() throws CloneNotSupportedException {
+    UnaryNode copy = (UnaryNode) super.clone();
+    // A node without a child yields a copy without a child.
+    copy.child = child == null ? null : (LogicalNode) child.clone();
+    return copy;
+  }
+
+  public void preOrder(LogicalNodeVisitor visitor) {
+    visitor.visit(this); // this node first, then the child subtree
+    child.preOrder(visitor);
+  }
+
+  public void postOrder(LogicalNodeVisitor visitor) {
+    child.postOrder(visitor); // child subtree first, then this node
+    visitor.visit(this);
+  }
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/6594ac1c/tajo-core/src/main/java/org/apache/tajo/engine/planner/logical/UnionNode.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/logical/UnionNode.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/logical/UnionNode.java
new file mode 100644
index 0000000..49183d0
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/logical/UnionNode.java
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ *
+ */
+package org.apache.tajo.engine.planner.logical;
+
+import org.apache.tajo.engine.planner.PlanString;
+
+public class UnionNode extends BinaryNode {
+
+  public UnionNode(int pid) {
+    super(pid, NodeType.UNION);
+  }
+
+  /** Returns a plan string built from this node alone, with no extra explanation lines. */
+  @Override
+  public PlanString getPlanString() {
+    return new PlanString(this);
+  }
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/6594ac1c/tajo-core/src/main/java/org/apache/tajo/engine/planner/logical/join/Edge.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/logical/join/Edge.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/logical/join/Edge.java
new file mode 100644
index 0000000..c8bca03
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/logical/join/Edge.java
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.planner.logical.join;
+
+import org.apache.tajo.engine.eval.EvalNode;
+
+public class Edge { // immutable: every field is assigned exactly once, in the constructor
+  private final String src;
+  private final String target;
+  private final EvalNode joinQual; // qualifier attached to the src/target pair
+
+  public Edge(String src, String target, EvalNode joinQual) {
+    this.src = src;
+    this.target = target;
+    this.joinQual = joinQual;
+  }
+
+  public String getSrc() {
+    return this.src;
+  }
+
+  public String getTarget() {
+    return this.target;
+  }
+
+  public EvalNode getJoinQual() {
+    return this.joinQual;
+  }
+
+  @Override
+  public String toString() {
+    return "(" + src + "=> " + target + ", " + joinQual + ")";
+  }
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/6594ac1c/tajo-core/src/main/java/org/apache/tajo/engine/planner/logical/join/FoundJoinOrder.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/logical/join/FoundJoinOrder.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/logical/join/FoundJoinOrder.java
new file mode 100644
index 0000000..5ae34f7
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/logical/join/FoundJoinOrder.java
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.planner.logical.join;
+
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.tajo.engine.planner.logical.JoinNode;
+
+/**
+ * It contains the result of join enumeration.
+ */
+@InterfaceStability.Evolving
+public class FoundJoinOrder {
+  private final JoinNode joinNode;
+  private final double cost; // cost supplied together with joinNode at construction
+
+  public FoundJoinOrder(JoinNode joinNode, double cost) {
+    this.joinNode = joinNode;
+    this.cost = cost;
+  }
+
+  /**
+   * @return the join node given at construction, i.e. the ordered join operators
+   */
+  public JoinNode getOrderedJoin() {
+    return this.joinNode;
+  }
+
+  public double getCost() {
+    return cost;
+  }
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/6594ac1c/tajo-core/src/main/java/org/apache/tajo/engine/planner/logical/join/GreedyHeuristicJoinOrderAlgorithm.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/logical/join/GreedyHeuristicJoinOrderAlgorithm.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/logical/join/GreedyHeuristicJoinOrderAlgorithm.java
new file mode 100644
index 0000000..f2bcd77
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/logical/join/GreedyHeuristicJoinOrderAlgorithm.java
@@ -0,0 +1,262 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.planner.logical.join;
+
+import org.apache.tajo.algebra.JoinType;
+import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.engine.eval.AlgebraicUtil;
+import org.apache.tajo.engine.planner.LogicalPlan;
+import org.apache.tajo.engine.planner.PlannerUtil;
+import org.apache.tajo.engine.planner.PlanningException;
+import org.apache.tajo.engine.planner.logical.*;
+import org.apache.tajo.engine.utils.SchemaUtil;
+
+import java.util.LinkedHashSet;
+import java.util.Set;
+
+/**
+ * This is a greedy heuristic algorithm to find a bushy join tree. This algorithm finds
+ * the best join order with join conditions and pushed-down join conditions to
+ * all join operators.
+ */
+public class GreedyHeuristicJoinOrderAlgorithm implements JoinOrderAlgorithm {
+ public static double DEFAULT_SELECTION_FACTOR = 0.1;
+
+  /**
+   * Greedily builds a join tree: the best pair among the remaining operators is joined,
+   * the pair is replaced by the new join node, and this repeats until one operator is left.
+   *
+   * @param plan the logical plan used to create join nodes
+   * @param block the query block that supplies the relations; every created node is registered to it
+   * @param joinGraph the join graph consulted for join conditions between relations
+   * @param relationsWithoutQual not read by this implementation
+   * @return the resulting join tree together with its cost
+   * @throws PlanningException propagated from the search for the best join pair
+   */
+  @Override
+  public FoundJoinOrder findBestOrder(LogicalPlan plan, LogicalPlan.QueryBlock block, JoinGraph joinGraph,
+                                      Set<String> relationsWithoutQual) throws PlanningException {
+
+    // A LinkedHashSet keeps the iteration order deterministic. Otherwise, join orders
+    // could differ between runs even when the join costs are equal.
+    Set<LogicalNode> pending = new LinkedHashSet<LogicalNode>();
+    for (RelationNode relation : block.getRelations()) {
+      pending.add(relation);
+    }
+
+    while (pending.size() > 1) {
+      // Find the best join pair among all joinable operators still pending.
+      JoinEdge chosen = getBestPair(plan, joinGraph, pending);
+
+      // Replace both operands of the chosen pair with the join node built from them.
+      pending.remove(chosen.getLeftRelation());
+      pending.remove(chosen.getRightRelation());
+      LogicalNode joined = createJoinNode(plan, chosen);
+      pending.add(joined);
+
+      // all logical nodes should be registered to corresponding blocks
+      block.registerNode(joined);
+    }
+
+    JoinNode root = (JoinNode) pending.iterator().next();
+    // all generated nodes should be registered to corresponding blocks
+    block.registerNode(root);
+    return new FoundJoinOrder(root, getCost(root));
+  }
+
+  /**
+   * Creates a join node for the given edge. The merged output schemas of both children
+   * become the input and the output schema of the join, and the join qualifiers of the
+   * edge (if any) are set as the join condition.
+   */
+  private static JoinNode createJoinNode(LogicalPlan plan, JoinEdge joinEdge) {
+    LogicalNode leftOperand = joinEdge.getLeftRelation();
+    LogicalNode rightOperand = joinEdge.getRightRelation();
+
+    JoinNode created = plan.createNode(JoinNode.class);
+
+    // Only for a commutative join whose left operand is a relation while the right one
+    // is not, the operands are swapped so that the non-relation operand becomes the left
+    // child (left deep). In every other case the operands keep their positions.
+    boolean swapOperands = PlannerUtil.isCommutativeJoin(joinEdge.getJoinType())
+        && (leftOperand instanceof RelationNode)
+        && !(rightOperand instanceof RelationNode);
+    if (swapOperands) {
+      created.init(joinEdge.getJoinType(), rightOperand, leftOperand);
+    } else {
+      created.init(joinEdge.getJoinType(), leftOperand, rightOperand);
+    }
+
+    Schema merged = SchemaUtil.merge(created.getLeftChild().getOutSchema(),
+        created.getRightChild().getOutSchema());
+    created.setInSchema(merged);
+    created.setOutSchema(merged);
+
+    if (joinEdge.hasJoinQual()) {
+      created.setJoinQual(AlgebraicUtil.createSingletonExprFromCNF(joinEdge.getJoinQual()));
+    }
+    return created;
+  }
+
+  /**
+   * Picks the join to perform next among all ordered pairs of distinct candidates.
+   * The cheapest join that has a join qualifier wins regardless of the cost of other
+   * joins; only when no pair has a qualifier is the cheapest join overall returned.
+   *
+   * @param plan a logical plan
+   * @param graph a join graph which consists of vertices and edges, where each vertex is
+   *              a relation and each edge is a join condition.
+   * @param candidateSet candidate operators to be joined.
+   * @return the best join pair among them
+   * @throws PlanningException propagated from the lookup of a join between two candidates
+   */
+  private JoinEdge getBestPair(LogicalPlan plan, JoinGraph graph, Set<LogicalNode> candidateSet)
+      throws PlanningException {
+    JoinEdge cheapest = null;
+    double cheapestCost = Double.MAX_VALUE;
+
+    JoinEdge cheapestQualified = null;
+    double cheapestQualifiedCost = Double.MAX_VALUE;
+
+    for (LogicalNode outer : candidateSet) {
+      for (LogicalNode inner : candidateSet) {
+        if (outer.equals(inner)) {
+          continue;
+        }
+
+        JoinEdge candidate = findJoin(plan, graph, outer, inner);
+        if (candidate == null) {
+          continue;
+        }
+        double candidateCost = getCost(candidate);
+
+        // track the cheapest join overall ...
+        if (candidateCost < cheapestCost) {
+          cheapestCost = candidateCost;
+          cheapest = candidate;
+        }
+
+        // ... and, separately, the cheapest join that carries a join qualifier
+        if (candidate.hasJoinQual() && candidateCost < cheapestQualifiedCost) {
+          cheapestQualifiedCost = candidateCost;
+          cheapestQualified = candidate;
+        }
+      }
+    }
+
+    // A qualified join must be chosen rather than a cross join regardless of cost.
+    return cheapestQualified != null ? cheapestQualified : cheapest;
+  }
+
+  /**
+   * Builds the join edge between two logical operator trees by merging every edge that the
+   * join graph holds between a relation below <code>outer</code> and a relation below
+   * <code>inner</code>.
+   *
+   * @return the merged join edge; if the graph has no edge between the two operator trees,
+   *         a CROSS join edge without any join qualifier
+   */
+  private static JoinEdge findJoin(LogicalPlan plan, JoinGraph graph, LogicalNode outer, LogicalNode inner)
+      throws PlanningException {
+    JoinEdge merged = null;
+
+    for (String outerName : PlannerUtil.getRelationLineageWithinQueryBlock(plan, outer)) {
+      for (String innerName : PlannerUtil.getRelationLineageWithinQueryBlock(plan, inner)) {
+        if (!graph.hasEdge(outerName, innerName)) {
+          continue;
+        }
+
+        JoinEdge existing = graph.getEdge(outerName, innerName);
+        if (merged == null) {
+          // the first edge found determines the join type of the merged edge
+          merged = new JoinEdge(existing.getJoinType(), outer, inner, existing.getJoinQual());
+        } else {
+          // further edges only contribute their qualifiers as one additional expression
+          merged.addJoinQual(AlgebraicUtil.createSingletonExprFromCNF(existing.getJoinQual()));
+        }
+      }
+    }
+
+    return merged != null ? merged : new JoinEdge(JoinType.CROSS, outer, inner);
+  }
+
+  /**
+   * Computes the cost of one join edge from the costs of its two operands.
+   *
+   * @param joinEdge the join edge to be estimated
+   * @return with join qualifiers, the product of both operand costs multiplied by
+   *         DEFAULT_SELECTION_FACTOR raised to the number of qualifiers; without any
+   *         qualifier, the squared product of both operand costs
+   */
+  public static double getCost(JoinEdge joinEdge) {
+    if (!joinEdge.hasJoinQual()) {
+      // make cost bigger if cross join
+      return Math.pow(getCost(joinEdge.getLeftRelation()) * getCost(joinEdge.getRightRelation()), 2);
+    }
+
+    // TODO - should consider join type
+    // TODO - should statistic information obtained from query history
+    double selectivity = Math.pow(DEFAULT_SELECTION_FACTOR, joinEdge.getJoinQual().length);
+    return getCost(joinEdge.getLeftRelation()) * getCost(joinEdge.getRightRelation()) * selectivity;
+  }
+
+  /**
+   * Computes the cost of a logical operator tree by recursing over its children.
+   * A scan costs the number of bytes from its table statistics (Long.MAX_VALUE when the
+   * table has no statistics); selections and qualified joins scale the cost of their
+   * input by DEFAULT_SELECTION_FACTOR per conjunct; joins without qualifier square the
+   * product of their inputs; a union adds up both inputs.
+   *
+   * @throws UnsupportedOperationException for EXCEPT and INTERSECT nodes
+   */
+  // TODO - costs of other operator operators (e.g., group-by and sort) should be computed in proper manners.
+  public static double getCost(LogicalNode node) {
+    switch (node.getType()) {
+
+    case PROJECTION:
+      return getCost(((ProjectionNode) node).getChild());
+
+    case JOIN: {
+      JoinNode join = (JoinNode) node;
+      if (join.hasJoinQual()) {
+        int conjuncts = AlgebraicUtil.toConjunctiveNormalFormArray(join.getJoinQual()).length;
+        double selectivity = Math.pow(DEFAULT_SELECTION_FACTOR, conjuncts);
+        return getCost(join.getLeftChild()) * getCost(join.getRightChild()) * selectivity;
+      }
+      return Math.pow(getCost(join.getLeftChild()) * getCost(join.getRightChild()), 2);
+    }
+
+    case SELECTION: {
+      SelectionNode selection = (SelectionNode) node;
+      return getCost(selection.getChild()) *
+          Math.pow(DEFAULT_SELECTION_FACTOR,
+              AlgebraicUtil.toConjunctiveNormalFormArray(selection.getQual()).length);
+    }
+
+    case TABLE_SUBQUERY:
+      return getCost(((TableSubQueryNode) node).getSubQuery());
+
+    case SCAN: {
+      ScanNode scan = (ScanNode) node;
+      if (scan.getTableDesc().getStats() == null) {
+        // no statistics available for this table
+        return Long.MAX_VALUE;
+      }
+      return scan.getTableDesc().getStats().getNumBytes();
+    }
+
+    case UNION: {
+      UnionNode union = (UnionNode) node;
+      return getCost(union.getLeftChild()) + getCost(union.getRightChild());
+    }
+
+    case EXCEPT:
+    case INTERSECT:
+      throw new UnsupportedOperationException("getCost() does not support EXCEPT or INTERSECT yet");
+
+    default:
+      // all binary operators (join, union, except, and intersect) are handled in the above cases.
+      // So, we need to handle only unary nodes in default.
+      return getCost(((UnaryNode) node).getChild());
+    }
+  }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/tajo/blob/6594ac1c/tajo-core/src/main/java/org/apache/tajo/engine/planner/logical/join/JoinEdge.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/logical/join/JoinEdge.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/logical/join/JoinEdge.java
new file mode 100644
index 0000000..e5c29f0
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/logical/join/JoinEdge.java
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.planner.logical.join;
+
+import com.google.common.collect.Sets;
+import org.apache.tajo.algebra.JoinType;
+import org.apache.tajo.engine.eval.EvalNode;
+import org.apache.tajo.engine.planner.logical.LogicalNode;
+import org.apache.tajo.engine.planner.logical.RelationNode;
+import org.apache.tajo.util.TUtil;
+
+import java.util.Collections;
+import java.util.Set;
+
+/**
+ * A join between two logical operators: the join type, both operands, and the set of
+ * join qualifiers collected for this pair of operands.
+ */
+public class JoinEdge {
+  private final JoinType joinType;
+  private final LogicalNode leftRelation;
+  private final LogicalNode rightRelation;
+  // A set, so adding an equal qualifier twice keeps a single entry.
+  private final Set<EvalNode> joinQual = Sets.newHashSet();
+
+  /**
+   * Creates an edge without any join qualifier.
+   */
+  public JoinEdge(JoinType joinType, LogicalNode leftRelation, LogicalNode rightRelation) {
+    this.joinType = joinType;
+    this.leftRelation = leftRelation;
+    this.rightRelation = rightRelation;
+  }
+
+  /**
+   * Creates an edge that starts with the given join qualifiers.
+   *
+   * @param condition the initial join qualifiers; may be empty
+   */
+  public JoinEdge(JoinType joinType, LogicalNode leftRelation, LogicalNode rightRelation,
+                  EvalNode ... condition) {
+    this(joinType, leftRelation, rightRelation);
+    Collections.addAll(joinQual, condition);
+  }
+
+  public JoinType getJoinType() {
+    return joinType;
+  }
+
+  public LogicalNode getLeftRelation() {
+    return leftRelation;
+  }
+
+  public LogicalNode getRightRelation() {
+    return rightRelation;
+  }
+
+  /**
+   * @return true if at least one join qualifier has been added to this edge
+   */
+  public boolean hasJoinQual() {
+    return !joinQual.isEmpty();
+  }
+
+  /**
+   * Adds one more join qualifier to this edge.
+   */
+  public void addJoinQual(EvalNode joinQual) {
+    this.joinQual.add(joinQual);
+  }
+
+  /**
+   * @return a newly allocated array holding all join qualifiers of this edge;
+   *         an empty array if there is none
+   */
+  public EvalNode [] getJoinQual() {
+    return joinQual.toArray(new EvalNode[joinQual.size()]);
+  }
+
+  @Override
+  public String toString() {
+    return leftRelation + " " + joinType + " " + rightRelation + " ON " + TUtil.collectionToString(joinQual);
+  }
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/6594ac1c/tajo-core/src/main/java/org/apache/tajo/engine/planner/logical/join/JoinGraph.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/logical/join/JoinGraph.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/logical/join/JoinGraph.java
new file mode 100644
index 0000000..77e03ea
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/logical/join/JoinGraph.java
@@ -0,0 +1,115 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.planner.logical.join;
+
+import com.google.common.collect.Sets;
+import org.apache.tajo.catalog.CatalogUtil;
+import org.apache.tajo.catalog.Column;
+import org.apache.tajo.engine.eval.AlgebraicUtil;
+import org.apache.tajo.engine.eval.EvalNode;
+import org.apache.tajo.engine.eval.EvalTreeUtil;
+import org.apache.tajo.engine.planner.LogicalPlan;
+import org.apache.tajo.engine.planner.NamedExprsManager;
+import org.apache.tajo.engine.planner.PlannerUtil;
+import org.apache.tajo.engine.planner.PlanningException;
+import org.apache.tajo.engine.planner.graph.SimpleUndirectedGraph;
+import org.apache.tajo.engine.planner.logical.JoinNode;
+
+import java.util.Collection;
+import java.util.Set;
+
+public class JoinGraph extends SimpleUndirectedGraph<String, JoinEdge> {
+
+  /**
+   * Determines the relation names referenced by the two sides of a join qualifier.
+   *
+   * @param block the query block whose named-expression manager resolves aliased columns
+   * @param joinCondition a single join qualifier; the first column reference found on
+   *                      each side is used
+   * @return a two-element array: index 0 is the relation of the left expression,
+   *         index 1 the relation of the right expression
+   * @throws PlanningException if a column is neither qualified nor an aliased name
+   */
+  private String [] guessRelationsFromJoinQual(LogicalPlan.QueryBlock block, EvalNode joinCondition)
+      throws PlanningException {
+
+    // Note that we can guarantee that each join qual used here is a singleton.
+    // This is because we dissect a join qual into conjunctive normal forms.
+    // In other words, each join qual has a form 'col1 = col2'.
+    Column leftExpr = EvalTreeUtil.findAllColumnRefs(joinCondition.getLeftExpr()).get(0);
+    Column rightExpr = EvalTreeUtil.findAllColumnRefs(joinCondition.getRightExpr()).get(0);
+
+    NamedExprsManager namedExprsMgr = block.getNamedExprsManager();
+
+    // 0 - left table, 1 - right table
+    String [] relationNames = new String[2];
+    relationNames[0] = resolveRelationName(namedExprsMgr, leftExpr);
+    relationNames[1] = resolveRelationName(namedExprsMgr, rightExpr);
+    return relationNames;
+  }
+
+  /**
+   * Resolves the relation name of one column: its own qualifier if it has one, otherwise
+   * the qualifier extracted from the original name behind its alias.
+   */
+  private static String resolveRelationName(NamedExprsManager namedExprsMgr, Column column)
+      throws PlanningException {
+    if (column.hasQualifier()) {
+      return column.getQualifier();
+    }
+    if (namedExprsMgr.isAliasedName(column.getSimpleName())) {
+      String columnName = namedExprsMgr.getOriginalName(column.getSimpleName());
+      return CatalogUtil.extractQualifier(columnName);
+    }
+    throw new PlanningException("Cannot expect a referenced relation: " + column);
+  }
+  /**
+   * Adds the join qualifiers of a join node to this graph. The join condition is split
+   * into its conjuncts; every conjunct that is a join qualifier is attached to the edge
+   * between the two relations it references, creating that edge when it does not exist.
+   *
+   * @return the conjuncts of the join condition that are not join qualifiers
+   */
+  public Collection<EvalNode> addJoin(LogicalPlan plan, LogicalPlan.QueryBlock block,
+                                      JoinNode joinNode) throws PlanningException {
+    Set<EvalNode> cnf = Sets.newHashSet(AlgebraicUtil.toConjunctiveNormalFormArray(joinNode.getJoinQual()));
+    Set<EvalNode> remaining = Sets.newHashSet();
+
+    for (EvalNode conjunct : cnf) {
+      if (!EvalTreeUtil.isJoinQual(conjunct, true)) {
+        remaining.add(conjunct);
+        continue;
+      }
+
+      String [] relations = guessRelationsFromJoinQual(block, conjunct);
+      String leftExprRelName = relations[0];
+      String rightExprRelName = relations[1];
+
+      Collection<String> leftLineage =
+          PlannerUtil.getRelationLineageWithinQueryBlock(plan, joinNode.getLeftChild());
+      boolean leftExprOnLeftSide = leftLineage.contains(leftExprRelName);
+
+      JoinEdge edge = getEdge(leftExprRelName, rightExprRelName);
+      if (edge != null) {
+        edge.addJoinQual(conjunct);
+        continue;
+      }
+
+      // Orient the new edge so that its first relation is the one found below the
+      // left child of the join node.
+      String firstName = leftExprOnLeftSide ? leftExprRelName : rightExprRelName;
+      String secondName = leftExprOnLeftSide ? rightExprRelName : leftExprRelName;
+      edge = new JoinEdge(joinNode.getJoinType(),
+          block.getRelation(firstName), block.getRelation(secondName), conjunct);
+      addEdge(firstName, secondName, edge);
+    }
+
+    cnf.retainAll(remaining);
+    return cnf;
+  }
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/6594ac1c/tajo-core/src/main/java/org/apache/tajo/engine/planner/logical/join/JoinOrderAlgorithm.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/logical/join/JoinOrderAlgorithm.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/logical/join/JoinOrderAlgorithm.java
new file mode 100644
index 0000000..eafa671
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/logical/join/JoinOrderAlgorithm.java
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.planner.logical.join;
+
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.tajo.engine.planner.LogicalPlan;
+import org.apache.tajo.engine.planner.PlanningException;
+
+import java.util.Set;
+
+/**
+ * An interface for join order algorithms. An implementation decides in which order
+ * the relations of a query block are joined.
+ */
+@InterfaceStability.Evolving
+public interface JoinOrderAlgorithm {
+
+ /**
+ * Finds a join order for the relations of a query block.
+ *
+ * @param plan The logical plan that the query block belongs to.
+ * @param block The query block whose relations are to be ordered.
+ * @param joinGraph A join graph represents join conditions and their connections among relations.
+ * Given a graph, each vertex represents a relation, and each edge contains a join condition.
+ * A join graph does not contain relations that do not have any corresponding join condition.
+ * @param relationsWithoutQual The names of relations that do not have any corresponding join condition.
+ * @return The found join order, i.e., the result of join enumeration.
+ * @throws PlanningException If planning fails while the join order is being determined.
+ */
+ FoundJoinOrder findBestOrder(LogicalPlan plan, LogicalPlan.QueryBlock block, JoinGraph joinGraph,
+ Set<String> relationsWithoutQual) throws PlanningException;
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/6594ac1c/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/AggregationExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/AggregationExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/AggregationExec.java
new file mode 100644
index 0000000..208973e
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/AggregationExec.java
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.planner.physical;
+
+import org.apache.tajo.catalog.Column;
+import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.engine.eval.AggregationFunctionCallEval;
+import org.apache.tajo.engine.planner.logical.GroupbyNode;
+import org.apache.tajo.worker.TaskAttemptContext;
+
+import java.io.IOException;
+
+/**
+ * Base class of aggregation executors. It resolves the positions of the grouping
+ * columns within the input schema and keeps the aggregation functions of the plan.
+ */
+public abstract class AggregationExec extends UnaryPhysicalExec {
+  protected GroupbyNode plan;
+
+  protected final int groupingKeyNum;
+  protected int[] groupingKeyIds;
+  protected final int aggFunctionsNum;
+  protected final AggregationFunctionCallEval[] aggFunctions;
+
+  protected Schema evalSchema;
+
+  public AggregationExec(final TaskAttemptContext context, GroupbyNode plan,
+                         PhysicalExec child) throws IOException {
+    super(context, plan.getInSchema(), plan.getOutSchema(), child);
+    this.plan = plan;
+    this.evalSchema = plan.getOutSchema();
+
+    // column id in the input schema for each grouping column, in grouping order
+    final Column [] keyColumns = plan.getGroupingColumns();
+    this.groupingKeyNum = keyColumns.length;
+    this.groupingKeyIds = new int[groupingKeyNum];
+    int position = 0;
+    for (Column keyColumn : keyColumns) {
+      groupingKeyIds[position++] = inSchema.getColumnId(keyColumn.getQualifiedName());
+    }
+
+    // an empty array stands for "no aggregation function"
+    this.aggFunctions = plan.hasAggFunctions()
+        ? plan.getAggFunctions()
+        : new AggregationFunctionCallEval[0];
+    this.aggFunctionsNum = aggFunctions.length;
+  }
+
+  @Override
+  public void close() throws IOException {
+    super.close();
+    this.plan = null;
+  }
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/6594ac1c/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/BNLJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/BNLJoinExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/BNLJoinExec.java
new file mode 100644
index 0000000..60a7c19
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/BNLJoinExec.java
@@ -0,0 +1,223 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.planner.physical;
+
+import org.apache.tajo.engine.eval.EvalNode;
+import org.apache.tajo.engine.planner.PlannerUtil;
+import org.apache.tajo.engine.planner.Projector;
+import org.apache.tajo.engine.planner.logical.JoinNode;
+import org.apache.tajo.storage.FrameTuple;
+import org.apache.tajo.storage.Tuple;
+import org.apache.tajo.storage.VTuple;
+import org.apache.tajo.worker.TaskAttemptContext;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+
+public class BNLJoinExec extends BinaryPhysicalExec {
+ // from logical plan
+ private JoinNode plan;
+ private final boolean hasJoinQual;
+ private EvalNode joinQual;
+
+ private List<Tuple> leftTupleSlots;
+ private List<Tuple> rightTupleSlots;
+ private Iterator<Tuple> leftIterator;
+ private Iterator<Tuple> rightIterator;
+
+ private boolean leftEnd;
+ private boolean rightEnd;
+
+ // temporary tuples and state for the nested loop join
+ private FrameTuple frameTuple;
+ private Tuple leftTuple = null;
+ private Tuple outputTuple = null;
+ private Tuple rightNext = null;
+
+ private final int TUPLE_SLOT_SIZE = 10000;
+
+ // projection
+ private Projector projector;
+
+  /**
+   * Creates a block nested loop join over the two child executors.
+   *
+   * @param plan the join node that provides the schemas, the join condition and the targets;
+   *             if it has no targets, targets derived from the output schema are set on it
+   */
+  public BNLJoinExec(final TaskAttemptContext context, final JoinNode plan,
+                     final PhysicalExec leftExec, PhysicalExec rightExec) {
+    super(context, plan.getInSchema(), plan.getOutSchema(), leftExec, rightExec);
+    this.plan = plan;
+    this.joinQual = plan.getJoinQual();
+    // without a join condition every pair of tuples is emitted (cross join)
+    this.hasJoinQual = this.joinQual != null;
+
+    // both tuple blocks start out empty and are filled by the first call of next()
+    this.leftEnd = false;
+    this.rightEnd = false;
+    this.leftTupleSlots = new ArrayList<Tuple>(TUPLE_SLOT_SIZE);
+    this.rightTupleSlots = new ArrayList<Tuple>(TUPLE_SLOT_SIZE);
+    this.leftIterator = leftTupleSlots.iterator();
+    this.rightIterator = rightTupleSlots.iterator();
+
+    // for projection
+    if (!plan.hasTargets()) {
+      plan.setTargets(PlannerUtil.schemaToTargets(outSchema));
+    }
+    this.projector = new Projector(inSchema, outSchema, plan.getTargets());
+
+    // for join
+    this.frameTuple = new FrameTuple();
+    this.outputTuple = new VTuple(outSchema.size());
+  }
+
+  /**
+   * @return the join node this executor was created from; null after close()
+   */
+  public JoinNode getPlan() {
+    return this.plan;
+  }
+
+  /**
+   * Returns the next joined tuple. Tuples of both children are loaded in blocks of up to
+   * TUPLE_SLOT_SIZE tuples; every tuple of the current left block is paired with every
+   * tuple of the current right block, and pairs that satisfy the join condition (all pairs
+   * when there is no condition) are projected into the output tuple.
+   *
+   * @return the output tuple, which is the same instance on every call, or null when an
+   *         input is empty or all pairs have been processed
+   * @throws IOException propagated from the child executors
+   */
+  public Tuple next() throws IOException {
+
+    if (leftTupleSlots.isEmpty()) {
+      for (int k = 0; k < TUPLE_SLOT_SIZE; k++) {
+        Tuple t = leftChild.next();
+        if (t == null) {
+          leftEnd = true;
+          break;
+        }
+        leftTupleSlots.add(t);
+      }
+      if (leftTupleSlots.isEmpty()) {
+        // The left child produced no tuple, so there is nothing to join. Calling
+        // next() on the empty iterator would throw NoSuchElementException.
+        return null;
+      }
+      leftIterator = leftTupleSlots.iterator();
+      leftTuple = leftIterator.next();
+    }
+
+    if (rightTupleSlots.isEmpty()) {
+      for (int k = 0; k < TUPLE_SLOT_SIZE; k++) {
+        Tuple t = rightChild.next();
+        if (t == null) {
+          rightEnd = true;
+          break;
+        }
+        rightTupleSlots.add(t);
+      }
+      if (rightTupleSlots.isEmpty()) {
+        // The right child produced no tuple, so no pair can be formed. Without this
+        // check the loop below would call next() on an empty iterator.
+        return null;
+      }
+      rightIterator = rightTupleSlots.iterator();
+
+      // Read the look-ahead tuple only right after a block has been loaded. Reading it
+      // on every call would overwrite a pending look-ahead tuple and silently drop
+      // tuples of a right side that is larger than one block.
+      if ((rightNext = rightChild.next()) == null) {
+        rightEnd = true;
+      }
+    }
+
+    while (true) {
+      if (!rightIterator.hasNext()) { // the right block is exhausted for the current left tuple
+        if (leftIterator.hasNext()) { // the left block still has tuples
+          leftTuple = leftIterator.next();
+          rightIterator = rightTupleSlots.iterator();
+        } else {
+          if (rightEnd) {
+            // the whole right side has been paired with the current left block:
+            // restart the right side and move on to the next left block
+            rightChild.rescan();
+            rightEnd = false;
+
+            if (leftEnd) {
+              return null;
+            }
+            leftTupleSlots.clear();
+            for (int k = 0; k < TUPLE_SLOT_SIZE; k++) {
+              Tuple t = leftChild.next();
+              if (t == null) {
+                leftEnd = true;
+                break;
+              }
+              leftTupleSlots.add(t);
+            }
+            if (leftTupleSlots.isEmpty()) {
+              return null;
+            }
+            leftIterator = leftTupleSlots.iterator();
+            leftTuple = leftIterator.next();
+
+          } else {
+            // more right blocks remain: replay the current left block against the next one
+            leftIterator = leftTupleSlots.iterator();
+            leftTuple = leftIterator.next();
+          }
+
+          rightTupleSlots.clear();
+          if (rightNext != null) {
+            // the look-ahead tuple is the first tuple of the new right block
+            rightTupleSlots.add(rightNext);
+            for (int k = 1; k < TUPLE_SLOT_SIZE; k++) { // fill right
+              Tuple t = rightChild.next();
+              if (t == null) {
+                rightEnd = true;
+                break;
+              }
+              rightTupleSlots.add(t);
+            }
+          } else {
+            for (int k = 0; k < TUPLE_SLOT_SIZE; k++) { // fill right
+              Tuple t = rightChild.next();
+              if (t == null) {
+                rightEnd = true;
+                break;
+              }
+              rightTupleSlots.add(t);
+            }
+          }
+
+          if ((rightNext = rightChild.next()) == null) {
+            rightEnd = true;
+          }
+          rightIterator = rightTupleSlots.iterator();
+        }
+      }
+
+      frameTuple.set(leftTuple, rightIterator.next());
+      if (hasJoinQual) {
+        if (joinQual.eval(inSchema, frameTuple).isTrue()) {
+          projector.eval(frameTuple, outputTuple);
+          return outputTuple;
+        }
+      } else {
+        projector.eval(frameTuple, outputTuple);
+        return outputTuple;
+      }
+    }
+  }
+
+  /**
+   * Resets this executor so that the next call of next() starts the join over.
+   * Both tuple blocks are dropped and all end-of-input and look-ahead state is cleared.
+   */
+  @Override
+  public void rescan() throws IOException {
+    super.rescan();
+    // Both end flags have to be cleared. A stale leftEnd would make next() stop
+    // after the first left block once the left side had been exhausted before.
+    leftEnd = false;
+    rightEnd = false;
+    // drop state that belongs to the previous pass
+    rightNext = null;
+    leftTuple = null;
+    rightTupleSlots.clear();
+    leftTupleSlots.clear();
+    rightIterator = rightTupleSlots.iterator();
+    leftIterator = leftTupleSlots.iterator();
+  }
+
+  /**
+   * Closes this executor via the superclass and drops every reference it holds.
+   */
+  @Override
+  public void close() throws IOException {
+    super.close();
+
+    // empty the tuple blocks before releasing them
+    leftTupleSlots.clear();
+    rightTupleSlots.clear();
+    leftTupleSlots = null;
+    rightTupleSlots = null;
+    leftIterator = null;
+    rightIterator = null;
+
+    this.plan = null;
+    this.joinQual = null;
+    this.projector = null;
+  }
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/6594ac1c/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/BSTIndexScanExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/BSTIndexScanExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/BSTIndexScanExec.java
new file mode 100644
index 0000000..35de707
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/BSTIndexScanExec.java
@@ -0,0 +1,143 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.planner.physical;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.datum.Datum;
+import org.apache.tajo.engine.eval.EvalNode;
+import org.apache.tajo.engine.planner.Projector;
+import org.apache.tajo.engine.planner.logical.ScanNode;
+import org.apache.tajo.storage.*;
+import org.apache.tajo.storage.fragment.FileFragment;
+import org.apache.tajo.storage.index.bst.BSTIndex;
+import org.apache.tajo.worker.TaskAttemptContext;
+
+import java.io.IOException;
+
+public class BSTIndexScanExec extends PhysicalExec {
+ private ScanNode scanNode;
+ private SeekableScanner fileScanner;
+
+ private EvalNode qual;
+ private BSTIndex.BSTIndexReader reader;
+
+ private Projector projector;
+
+ private Datum[] datum = null;
+
+ private boolean initialize = true;
+
+ private float progress;
+
+ public BSTIndexScanExec(TaskAttemptContext context,
+ AbstractStorageManager sm , ScanNode scanNode ,
+ FileFragment fragment, Path fileName , Schema keySchema,
+ TupleComparator comparator , Datum[] datum) throws IOException {
+ super(context, scanNode.getInSchema(), scanNode.getOutSchema());
+ this.scanNode = scanNode;
+ this.qual = scanNode.getQual();
+ this.datum = datum;
+
+ this.fileScanner = StorageManagerFactory.getSeekableScanner(context.getConf(),
+ scanNode.getTableDesc().getMeta(), scanNode.getInSchema(), fragment, outSchema);
+ this.fileScanner.init();
+ this.projector = new Projector(inSchema, outSchema, scanNode.getTargets());
+
+ this.reader = new BSTIndex(sm.getFileSystem().getConf()).
+ getIndexReader(fileName, keySchema, comparator);
+ this.reader.open();
+ }
+
+ @Override
+ public void init() throws IOException {
+ progress = 0.0f;
+ }
+
+ @Override
+ public Tuple next() throws IOException {
+ if(initialize) {
+ //TODO : more complicated condition
+ Tuple key = new VTuple(datum.length);
+ key.put(datum);
+ long offset = reader.find(key);
+ if (offset == -1) {
+ reader.close();
+ fileScanner.close();
+ return null;
+ }else {
+ fileScanner.seek(offset);
+ }
+ initialize = false;
+ } else {
+ if(!reader.isCurInMemory()) {
+ return null;
+ }
+ long offset = reader.next();
+ if(offset == -1 ) {
+ reader.close();
+ fileScanner.close();
+ return null;
+ } else {
+ fileScanner.seek(offset);
+ }
+ }
+ Tuple tuple;
+ Tuple outTuple = new VTuple(this.outSchema.size());
+ if (!scanNode.hasQual()) {
+ if ((tuple = fileScanner.next()) != null) {
+ projector.eval(tuple, outTuple);
+ return outTuple;
+ } else {
+ return null;
+ }
+ } else {
+ while(reader.isCurInMemory() && (tuple = fileScanner.next()) != null) {
+ if (qual.eval(inSchema, tuple).isTrue()) {
+ projector.eval(tuple, outTuple);
+ return outTuple;
+ } else {
+ fileScanner.seek(reader.next());
+ }
+ }
+ }
+
+ return null;
+ }
+ @Override
+ public void rescan() throws IOException {
+ fileScanner.reset();
+ }
+
+ @Override
+ public void close() throws IOException {
+ IOUtils.cleanup(null, reader, fileScanner);
+ reader = null;
+ fileScanner = null;
+ scanNode = null;
+ qual = null;
+ projector = null;
+ }
+
+ @Override
+ public float getProgress() {
+ return progress;
+ }
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/6594ac1c/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/BasicPhysicalExecutorVisitor.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/BasicPhysicalExecutorVisitor.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/BasicPhysicalExecutorVisitor.java
new file mode 100644
index 0000000..f6f3e52
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/BasicPhysicalExecutorVisitor.java
@@ -0,0 +1,265 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.planner.physical;
+
+import java.util.Stack;
+
+ /**
+  * Default {@link PhysicalExecutorVisitor} implementation. {@link #visit}
+  * dispatches on the runtime class of an executor to the matching
+  * {@code visitXxx} method. The default {@code visitXxx} methods either walk
+  * into the executor's children (unary and binary executors) or return null
+  * (executors handled here without children).
+  *
+  * @param <CONTEXT> type of the context object passed through the traversal
+  * @param <RESULT>  type of the value returned by the visit methods
+  */
+ public class BasicPhysicalExecutorVisitor<CONTEXT, RESULT> implements PhysicalExecutorVisitor<CONTEXT, RESULT> {
+
+   /**
+    * Routes the executor to the visit method matching its runtime class.
+    *
+    * @param exec    executor to visit
+    * @param stack   stack of the executors currently being visited
+    * @param context traversal context
+    * @return whatever the selected visit method returns
+    * @throws PhysicalPlanningException if the executor class is not handled
+    */
+   public RESULT visit(PhysicalExec exec, Stack<PhysicalExec> stack, CONTEXT context)
+       throws PhysicalPlanningException {
+
+     // Please keep all physical executors except for abstract class.
+     // They should be ordered in an lexicography order of their names for easy code maintenance.
+     // The checks are evaluated top to bottom and the first match wins.
+     if (exec instanceof BNLJoinExec) {
+       return visitBNLJoin(context, (BNLJoinExec) exec, stack);
+     }
+     if (exec instanceof BSTIndexScanExec) {
+       return visitBSTIndexScan(context, (BSTIndexScanExec) exec, stack);
+     }
+     if (exec instanceof EvalExprExec) {
+       return visitEvalExpr(context, (EvalExprExec) exec, stack);
+     }
+     if (exec instanceof ExternalSortExec) {
+       return visitExternalSort(context, (ExternalSortExec) exec, stack);
+     }
+     if (exec instanceof HashAggregateExec) {
+       return visitHashAggregate(context, (HashAggregateExec) exec, stack);
+     }
+     if (exec instanceof HashBasedColPartitionStoreExec) {
+       return visitHashBasedColPartitionStore(context, (HashBasedColPartitionStoreExec) exec, stack);
+     }
+     if (exec instanceof HashFullOuterJoinExec) {
+       return visitHashFullOuterJoin(context, (HashFullOuterJoinExec) exec, stack);
+     }
+     if (exec instanceof HashJoinExec) {
+       return visitHashJoin(context, (HashJoinExec) exec, stack);
+     }
+     if (exec instanceof HashLeftAntiJoinExec) {
+       return visitHashLeftAntiJoin(context, (HashLeftAntiJoinExec) exec, stack);
+     }
+     if (exec instanceof HashLeftOuterJoinExec) {
+       return visitHashLeftOuterJoin(context, (HashLeftOuterJoinExec) exec, stack);
+     }
+     if (exec instanceof HashLeftSemiJoinExec) {
+       return visitLeftHashSemiJoin(context, (HashLeftSemiJoinExec) exec, stack);
+     }
+     if (exec instanceof HashShuffleFileWriteExec) {
+       return visitHashShuffleFileWrite(context, (HashShuffleFileWriteExec) exec, stack);
+     }
+     if (exec instanceof HavingExec) {
+       return visitHaving(context, (HavingExec) exec, stack);
+     }
+     if (exec instanceof LimitExec) {
+       return visitLimit(context, (LimitExec) exec, stack);
+     }
+     if (exec instanceof MemSortExec) {
+       return visitMemSort(context, (MemSortExec) exec, stack);
+     }
+     if (exec instanceof MergeFullOuterJoinExec) {
+       return visitMergeFullOuterJoin(context, (MergeFullOuterJoinExec) exec, stack);
+     }
+     if (exec instanceof MergeJoinExec) {
+       return visitMergeJoin(context, (MergeJoinExec) exec, stack);
+     }
+     if (exec instanceof NLJoinExec) {
+       return visitNLJoin(context, (NLJoinExec) exec, stack);
+     }
+     if (exec instanceof NLLeftOuterJoinExec) {
+       return visitNLLeftOuterJoin(context, (NLLeftOuterJoinExec) exec, stack);
+     }
+     if (exec instanceof ProjectionExec) {
+       return visitProjection(context, (ProjectionExec) exec, stack);
+     }
+     if (exec instanceof RangeShuffleFileWriteExec) {
+       return visitRangeShuffleFileWrite(context, (RangeShuffleFileWriteExec) exec, stack);
+     }
+     if (exec instanceof RightOuterMergeJoinExec) {
+       return visitRightOuterMergeJoin(context, (RightOuterMergeJoinExec) exec, stack);
+     }
+     if (exec instanceof SelectionExec) {
+       return visitSelection(context, (SelectionExec) exec, stack);
+     }
+     if (exec instanceof SeqScanExec) {
+       return visitSeqScan(context, (SeqScanExec) exec, stack);
+     }
+     if (exec instanceof SortAggregateExec) {
+       return visitSortAggregate(context, (SortAggregateExec) exec, stack);
+     }
+     if (exec instanceof SortBasedColPartitionStoreExec) {
+       return visitSortBasedColPartitionStore(context, (SortBasedColPartitionStoreExec) exec, stack);
+     }
+     if (exec instanceof StoreTableExec) {
+       return visitStoreTable(context, (StoreTableExec) exec, stack);
+     }
+
+     throw new PhysicalPlanningException("Unsupported Type: " + exec.getClass().getSimpleName());
+   }
+
+   /**
+    * Visits the single child of a unary executor while the executor itself
+    * is on the stack, and returns the child's result.
+    */
+   private RESULT visitUnaryExecutor(CONTEXT context, UnaryPhysicalExec exec, Stack<PhysicalExec> stack)
+       throws PhysicalPlanningException {
+     stack.push(exec);
+     RESULT childResult = visit(exec.getChild(), stack, context);
+     stack.pop();
+     return childResult;
+   }
+
+   /**
+    * Visits the left and then the right child of a binary executor while the
+    * executor itself is on the stack. Only the left child's result is
+    * returned; the right child's result is discarded.
+    */
+   private RESULT visitBinaryExecutor(CONTEXT context, BinaryPhysicalExec exec, Stack<PhysicalExec> stack)
+       throws PhysicalPlanningException {
+     stack.push(exec);
+     RESULT leftResult = visit(exec.getLeftChild(), stack, context);
+     visit(exec.getRightChild(), stack, context);
+     stack.pop();
+     return leftResult;
+   }
+
+   /** Visits both children of the join. */
+   @Override
+   public RESULT visitBNLJoin(CONTEXT context, BNLJoinExec exec, Stack<PhysicalExec> stack)
+       throws PhysicalPlanningException {
+     return visitBinaryExecutor(context, exec, stack);
+   }
+
+   /** No child is visited for this executor; returns null. */
+   @Override
+   public RESULT visitBSTIndexScan(CONTEXT context, BSTIndexScanExec exec, Stack<PhysicalExec> stack)
+       throws PhysicalPlanningException {
+     return null;
+   }
+
+   /** No child is visited for this executor; returns null. */
+   @Override
+   public RESULT visitEvalExpr(CONTEXT context, EvalExprExec exec, Stack<PhysicalExec> stack)
+       throws PhysicalPlanningException {
+     return null;
+   }
+
+   /** Visits the single child. */
+   @Override
+   public RESULT visitExternalSort(CONTEXT context, ExternalSortExec exec, Stack<PhysicalExec> stack)
+       throws PhysicalPlanningException {
+     return visitUnaryExecutor(context, exec, stack);
+   }
+
+   /** Visits the single child. */
+   @Override
+   public RESULT visitHashAggregate(CONTEXT context, HashAggregateExec exec, Stack<PhysicalExec> stack)
+       throws PhysicalPlanningException {
+     return visitUnaryExecutor(context, exec, stack);
+   }
+
+   /** Visits the single child. */
+   @Override
+   public RESULT visitHashBasedColPartitionStore(CONTEXT context, HashBasedColPartitionStoreExec exec,
+                                                 Stack<PhysicalExec> stack) throws PhysicalPlanningException {
+     return visitUnaryExecutor(context, exec, stack);
+   }
+
+   /** Visits both children of the join. */
+   @Override
+   public RESULT visitHashFullOuterJoin(CONTEXT context, HashFullOuterJoinExec exec, Stack<PhysicalExec> stack)
+       throws PhysicalPlanningException {
+     return visitBinaryExecutor(context, exec, stack);
+   }
+
+   /** Visits both children of the join. */
+   @Override
+   public RESULT visitHashJoin(CONTEXT context, HashJoinExec exec, Stack<PhysicalExec> stack)
+       throws PhysicalPlanningException {
+     return visitBinaryExecutor(context, exec, stack);
+   }
+
+   /** Visits both children of the join. */
+   @Override
+   public RESULT visitHashLeftAntiJoin(CONTEXT context, HashLeftAntiJoinExec exec, Stack<PhysicalExec> stack)
+       throws PhysicalPlanningException {
+     return visitBinaryExecutor(context, exec, stack);
+   }
+
+   /** Visits both children of the join. */
+   @Override
+   public RESULT visitHashLeftOuterJoin(CONTEXT context, HashLeftOuterJoinExec exec, Stack<PhysicalExec> stack)
+       throws PhysicalPlanningException {
+     return visitBinaryExecutor(context, exec, stack);
+   }
+
+   /** Visits both children of the join. */
+   @Override
+   public RESULT visitLeftHashSemiJoin(CONTEXT context, HashLeftSemiJoinExec exec, Stack<PhysicalExec> stack)
+       throws PhysicalPlanningException {
+     return visitBinaryExecutor(context, exec, stack);
+   }
+
+   /** Visits the single child. */
+   @Override
+   public RESULT visitHashShuffleFileWrite(CONTEXT context, HashShuffleFileWriteExec exec, Stack<PhysicalExec> stack)
+       throws PhysicalPlanningException {
+     return visitUnaryExecutor(context, exec, stack);
+   }
+
+   /** Visits the single child. */
+   @Override
+   public RESULT visitHaving(CONTEXT context, HavingExec exec, Stack<PhysicalExec> stack)
+       throws PhysicalPlanningException {
+     return visitUnaryExecutor(context, exec, stack);
+   }
+
+   /** Visits the single child. */
+   @Override
+   public RESULT visitLimit(CONTEXT context, LimitExec exec, Stack<PhysicalExec> stack)
+       throws PhysicalPlanningException {
+     return visitUnaryExecutor(context, exec, stack);
+   }
+
+   /** Visits the single child. */
+   @Override
+   public RESULT visitMemSort(CONTEXT context, MemSortExec exec, Stack<PhysicalExec> stack) throws
+       PhysicalPlanningException {
+     return visitUnaryExecutor(context, exec, stack);
+   }
+
+   /** Visits both children of the join. */
+   @Override
+   public RESULT visitMergeFullOuterJoin(CONTEXT context, MergeFullOuterJoinExec exec, Stack<PhysicalExec> stack)
+       throws PhysicalPlanningException {
+     return visitBinaryExecutor(context, exec, stack);
+   }
+
+   /** Visits both children of the join. */
+   @Override
+   public RESULT visitMergeJoin(CONTEXT context, MergeJoinExec exec, Stack<PhysicalExec> stack)
+       throws PhysicalPlanningException {
+     return visitBinaryExecutor(context, exec, stack);
+   }
+
+   /** Visits both children of the join. */
+   @Override
+   public RESULT visitNLJoin(CONTEXT context, NLJoinExec exec, Stack<PhysicalExec> stack) throws
+       PhysicalPlanningException {
+     return visitBinaryExecutor(context, exec, stack);
+   }
+
+   /** Visits both children of the join. */
+   @Override
+   public RESULT visitNLLeftOuterJoin(CONTEXT context, NLLeftOuterJoinExec exec, Stack<PhysicalExec> stack)
+       throws PhysicalPlanningException {
+     return visitBinaryExecutor(context, exec, stack);
+   }
+
+   /** Visits the single child. */
+   @Override
+   public RESULT visitProjection(CONTEXT context, ProjectionExec exec, Stack<PhysicalExec> stack)
+       throws PhysicalPlanningException {
+     return visitUnaryExecutor(context, exec, stack);
+   }
+
+   /** Visits the single child. */
+   @Override
+   public RESULT visitRangeShuffleFileWrite(CONTEXT context, RangeShuffleFileWriteExec exec, Stack<PhysicalExec> stack)
+       throws PhysicalPlanningException {
+     return visitUnaryExecutor(context, exec, stack);
+   }
+
+   /** Visits both children of the join. */
+   @Override
+   public RESULT visitRightOuterMergeJoin(CONTEXT context, RightOuterMergeJoinExec exec, Stack<PhysicalExec> stack)
+       throws PhysicalPlanningException {
+     return visitBinaryExecutor(context, exec, stack);
+   }
+
+   /** Visits the single child. */
+   @Override
+   public RESULT visitSelection(CONTEXT context, SelectionExec exec, Stack<PhysicalExec> stack)
+       throws PhysicalPlanningException {
+     return visitUnaryExecutor(context, exec, stack);
+   }
+
+   /** No child is visited for this executor; returns null. */
+   @Override
+   public RESULT visitSeqScan(CONTEXT context, SeqScanExec exec, Stack<PhysicalExec> stack) {
+     return null;
+   }
+
+   /** Visits the single child. */
+   @Override
+   public RESULT visitSortAggregate(CONTEXT context, SortAggregateExec exec, Stack<PhysicalExec> stack)
+       throws PhysicalPlanningException {
+     return visitUnaryExecutor(context, exec, stack);
+   }
+
+   /** Visits the single child. */
+   @Override
+   public RESULT visitSortBasedColPartitionStore(CONTEXT context, SortBasedColPartitionStoreExec exec,
+                                                 Stack<PhysicalExec> stack) throws PhysicalPlanningException {
+     return visitUnaryExecutor(context, exec, stack);
+   }
+
+   /** Visits the single child. */
+   @Override
+   public RESULT visitStoreTable(CONTEXT context, StoreTableExec exec, Stack<PhysicalExec> stack) throws PhysicalPlanningException {
+     return visitUnaryExecutor(context, exec, stack);
+   }
+ }
http://git-wip-us.apache.org/repos/asf/tajo/blob/6594ac1c/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/BinaryPhysicalExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/BinaryPhysicalExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/BinaryPhysicalExec.java
new file mode 100644
index 0000000..628c18c
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/BinaryPhysicalExec.java
@@ -0,0 +1,109 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.planner.physical;
+
+import org.apache.tajo.catalog.statistics.TableStats;
+import org.apache.tajo.worker.TaskAttemptContext;
+import org.apache.tajo.catalog.Schema;
+
+import java.io.IOException;
+
+ /**
+  * Base class for physical operators with two inputs. It owns the left and
+  * right child operators, forwards the lifecycle calls (init, rescan, close)
+  * to both, and derives progress and input statistics from them.
+  *
+  * After {@link #close()} the child references are null; the progress and
+  * input statistics captured during close are returned from then on.
+  */
+ public abstract class BinaryPhysicalExec extends PhysicalExec {
+   protected PhysicalExec leftChild;
+   protected PhysicalExec rightChild;
+   protected float progress;
+   // Reused holder for the combined statistics of both children.
+   protected TableStats inputStats;
+
+   /**
+    * @param context   task attempt context
+    * @param inSchema  input schema of this operator
+    * @param outSchema output schema of this operator
+    * @param outer     operator used as the left child
+    * @param inner     operator used as the right child
+    */
+   public BinaryPhysicalExec(final TaskAttemptContext context,
+                             final Schema inSchema, final Schema outSchema,
+                             final PhysicalExec outer, final PhysicalExec inner) {
+     super(context, inSchema, outSchema);
+     this.leftChild = outer;
+     this.rightChild = inner;
+     this.inputStats = new TableStats();
+   }
+
+   public PhysicalExec getLeftChild() {
+     return leftChild;
+   }
+
+   public PhysicalExec getRightChild() {
+     return rightChild;
+   }
+
+   /** Initializes the left child, then the right child, and resets progress to 0. */
+   @Override
+   public void init() throws IOException {
+     leftChild.init();
+     rightChild.init();
+     progress = 0.0f;
+   }
+
+   /** Rescans the left child, then the right child. */
+   @Override
+   public void rescan() throws IOException {
+     leftChild.rescan();
+     rightChild.rescan();
+   }
+
+   /**
+    * Closes both children, captures their final input statistics, drops the
+    * child references and marks this operator as complete.
+    *
+    * @throws IOException if closing a child fails; the right child is still
+    *                     closed when closing the left child throws
+    */
+   @Override
+   public void close() throws IOException {
+     try {
+       leftChild.close();
+     } finally {
+       // Always give the right child a chance to release its resources,
+       // even when closing the left child failed.
+       rightChild.close();
+     }
+
+     // Snapshot the statistics while the children are still reachable;
+     // getInputStats() returns this snapshot once leftChild is null.
+     getInputStats();
+
+     leftChild = null;
+     rightChild = null;
+
+     progress = 1.0f;
+   }
+
+   /**
+    * @return the stored progress once the children have been released,
+    *         otherwise the mean of the two children's progress
+    */
+   @Override
+   public float getProgress() {
+     if (leftChild == null) {
+       return progress;
+     }
+     return leftChild.getProgress() * 0.5f + rightChild.getProgress() * 0.5f;
+   }
+
+   /**
+    * Recomputes the combined input statistics (bytes, read bytes, rows) as
+    * the sum of both children's statistics; a child reporting null
+    * contributes nothing. Once the children have been released, the last
+    * computed value is returned unchanged.
+    *
+    * @return the shared statistics holder of this operator
+    */
+   @Override
+   public TableStats getInputStats() {
+     if (leftChild == null) {
+       return inputStats;
+     }
+     TableStats leftInputStats = leftChild.getInputStats();
+     inputStats.setNumBytes(0);
+     inputStats.setReadBytes(0);
+     inputStats.setNumRows(0);
+
+     if (leftInputStats != null) {
+       inputStats.setNumBytes(leftInputStats.getNumBytes());
+       inputStats.setReadBytes(leftInputStats.getReadBytes());
+       inputStats.setNumRows(leftInputStats.getNumRows());
+     }
+
+     TableStats rightInputStats = rightChild.getInputStats();
+     if (rightInputStats != null) {
+       inputStats.setNumBytes(inputStats.getNumBytes() + rightInputStats.getNumBytes());
+       inputStats.setReadBytes(inputStats.getReadBytes() + rightInputStats.getReadBytes());
+       inputStats.setNumRows(inputStats.getNumRows() + rightInputStats.getNumRows());
+     }
+
+     return inputStats;
+   }
+ }
http://git-wip-us.apache.org/repos/asf/tajo/blob/6594ac1c/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ColPartitionStoreExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ColPartitionStoreExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ColPartitionStoreExec.java
new file mode 100644
index 0000000..fe36905
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ColPartitionStoreExec.java
@@ -0,0 +1,100 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.planner.physical;
+
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.tajo.catalog.CatalogUtil;
+import org.apache.tajo.catalog.Column;
+import org.apache.tajo.catalog.TableMeta;
+import org.apache.tajo.engine.planner.logical.InsertNode;
+import org.apache.tajo.engine.planner.logical.CreateTableNode;
+import org.apache.tajo.engine.planner.logical.NodeType;
+import org.apache.tajo.engine.planner.logical.StoreTableNode;
+import org.apache.tajo.storage.StorageUtil;
+import org.apache.tajo.worker.TaskAttemptContext;
+
+import java.io.IOException;
+
+ /**
+  * Base class for store operators that write their output into one
+  * sub-directory per partition. The constructor builds the table meta and
+  * resolves, for every column of the partition method's expression schema,
+  * its simple name and its column id; {@link #init()} prepares the output
+  * location.
+  */
+ public abstract class ColPartitionStoreExec extends UnaryPhysicalExec {
+   protected final TableMeta meta;
+   protected final StoreTableNode plan;
+   protected Path storeTablePath;
+
+   // Number of partition key columns, with their column ids and simple names.
+   protected final int keyNum;
+   protected final int [] keyIds;
+   protected final String [] keyNames;
+
+   /**
+    * @param context task attempt context
+    * @param plan    store plan node providing schemas, storage type, options
+    *                and the partition method
+    * @param child   operator producing the tuples to store
+    */
+   public ColPartitionStoreExec(TaskAttemptContext context, StoreTableNode plan, PhysicalExec child) {
+     super(context, plan.getInSchema(), plan.getOutSchema(), child);
+     this.plan = plan;
+
+     // For CREATE TABLE the output schema is replaced by the table schema.
+     if (plan.getType() == NodeType.CREATE_TABLE) {
+       this.outSchema = ((CreateTableNode)plan).getTableSchema();
+     }
+
+     // Table meta: pass the options along only when the plan carries any.
+     meta = this.plan.hasOptions()
+         ? CatalogUtil.newTableMeta(plan.getStorageType(), plan.getOptions())
+         : CatalogUtil.newTableMeta(plan.getStorageType());
+
+     // Resolve the partition key columns used to name sub-partition directories.
+     keyNum = this.plan.getPartitionMethod().getExpressionSchema().size();
+     keyIds = new int[keyNum];
+     keyNames = new String[keyNum];
+
+     for (int keyIdx = 0; keyIdx < keyNum; keyIdx++) {
+       Column keyColumn = this.plan.getPartitionMethod().getExpressionSchema().getColumn(keyIdx);
+       keyNames[keyIdx] = keyColumn.getSimpleName();
+
+       int columnId;
+       if (this.plan.getType() == NodeType.INSERT) {
+         // INSERT: look the column up in the insert node's table schema.
+         InsertNode insertNode = ((InsertNode)plan);
+         columnId = insertNode.getTableSchema().getColumnId(keyColumn.getQualifiedName());
+       } else if (this.plan.getType() == NodeType.CREATE_TABLE) {
+         // CREATE TABLE: look the column up in the logical schema.
+         CreateTableNode createTable = (CreateTableNode) plan;
+         columnId = createTable.getLogicalSchema().getColumnId(keyColumn.getQualifiedName());
+       } else {
+         // Any other node type: look the column up in the plan's out schema.
+         // NOTE(review): the previous comment here advised against using the
+         // output schema "because it is rewritten", yet the code resolves
+         // against plan.getOutSchema() - confirm which one is intended.
+         columnId = plan.getOutSchema().getColumnId(keyColumn.getQualifiedName());
+       }
+       keyIds[keyIdx] = columnId;
+     }
+   }
+
+   /**
+    * Initializes the operator through the superclass, takes the output path
+    * from the context and creates the parent directory of that path when it
+    * does not exist yet.
+    */
+   @Override
+   public void init() throws IOException {
+     super.init();
+
+     storeTablePath = context.getOutputPath();
+     FileSystem fs = storeTablePath.getFileSystem(context.getConf());
+     Path parentDir = storeTablePath.getParent();
+     if (!fs.exists(parentDir)) {
+       fs.mkdirs(parentDir);
+     }
+   }
+
+
+   /**
+    * Builds the data file path for a partition by concatenating the parent
+    * of the store path, the partition and the name of the store path.
+    *
+    * @param partition partition path element
+    * @return the concatenated path
+    */
+   protected Path getDataFile(String partition) {
+     Path parentDir = storeTablePath.getParent();
+     String fileName = storeTablePath.getName();
+     return StorageUtil.concatPath(parentDir, partition, fileName);
+   }
+ }
http://git-wip-us.apache.org/repos/asf/tajo/blob/6594ac1c/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/EvalExprExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/EvalExprExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/EvalExprExec.java
new file mode 100644
index 0000000..a843bce
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/EvalExprExec.java
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.planner.physical;
+
+import org.apache.tajo.engine.planner.Target;
+import org.apache.tajo.engine.planner.logical.EvalExprNode;
+import org.apache.tajo.storage.Tuple;
+import org.apache.tajo.storage.VTuple;
+import org.apache.tajo.worker.TaskAttemptContext;
+
+import java.io.IOException;
+
+ /**
+  * Physical operator that has no child and produces tuples by evaluating the
+  * target expressions of an {@link EvalExprNode}; the expressions are
+  * evaluated with a null input tuple.
+  */
+ public class EvalExprExec extends PhysicalExec {
+   private final EvalExprNode plan;
+   private float progress;
+
+   /**
+    * @param context task attempt context
+    * @param plan    plan node supplying the schemas and the targets
+    */
+   public EvalExprExec(final TaskAttemptContext context, final EvalExprNode plan) {
+     super(context, plan.getInSchema(), plan.getOutSchema());
+     this.plan = plan;
+   }
+
+   /** Resets the reported progress to 0. */
+   @Override
+   public void init() throws IOException {
+     progress = 0.0f;
+   }
+
+   /**
+    * Evaluates every target and returns the values as a new tuple, one field
+    * per target in target order. Every call builds and returns a tuple; this
+    * method never returns null.
+    *
+    * @return a freshly allocated tuple holding the evaluated targets
+    */
+   @Override
+   public Tuple next() throws IOException {
+     final Target [] targets = plan.getTargets();
+     final Tuple evaluated = new VTuple(targets.length);
+
+     int fieldId = 0;
+     for (Target target : targets) {
+       evaluated.put(fieldId, target.getEvalTree().eval(inSchema, null));
+       fieldId++;
+     }
+     return evaluated;
+   }
+
+   /** Nothing to rewind: this operator keeps no scan position. */
+   @Override
+   public void rescan() throws IOException {
+   }
+
+   /** Marks the operator as complete. */
+   @Override
+   public void close() throws IOException {
+     progress = 1.0f;
+   }
+
+   @Override
+   public float getProgress() {
+     return progress;
+   }
+ }