You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tajo.apache.org by hy...@apache.org on 2014/04/18 11:19:55 UTC

[33/51] [partial] TAJO-752: Escalate sub modules in tajo-core into the top-level modules. (hyunsik)

http://git-wip-us.apache.org/repos/asf/tajo/blob/6594ac1c/tajo-core/src/main/java/org/apache/tajo/engine/planner/logical/SelectionNode.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/logical/SelectionNode.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/logical/SelectionNode.java
new file mode 100644
index 0000000..b8a9680
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/logical/SelectionNode.java
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.planner.logical;
+
+import com.google.gson.annotations.Expose;
+import org.apache.tajo.engine.eval.EvalNode;
+import org.apache.tajo.engine.planner.PlanString;
+
+public class SelectionNode extends UnaryNode implements Cloneable {
+	@Expose private EvalNode qual;
+
+  public SelectionNode(int pid) {
+    super(pid, NodeType.SELECTION);
+  }
+
+	public EvalNode getQual() {
+		return this.qual;
+	}
+
+	public void setQual(EvalNode qual) {
+		this.qual = qual;
+	}
+
+  @Override
+  public PlanString getPlanString() {
+    PlanString planStr = new PlanString(this);
+    planStr.addExplan("Search Cond: " + getQual());
+    return planStr;
+  }
+
+  @Override
+  public boolean equals(Object obj) {
+    if (obj instanceof SelectionNode) {
+      SelectionNode other = (SelectionNode) obj;
+      return super.equals(other) 
+          && this.qual.equals(other.qual);
+    } else {
+      return false;
+    }
+  }
+
+  @Override
+  public Object clone() throws CloneNotSupportedException {
+    SelectionNode selNode = (SelectionNode) super.clone();
+    selNode.qual = (EvalNode) this.qual.clone();
+    
+    return selNode;
+  }
+
+  public String toString() {
+    return "Selection (filter=" + qual + ")";
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/6594ac1c/tajo-core/src/main/java/org/apache/tajo/engine/planner/logical/ShuffleFileWriteNode.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/logical/ShuffleFileWriteNode.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/logical/ShuffleFileWriteNode.java
new file mode 100644
index 0000000..c7ea454
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/logical/ShuffleFileWriteNode.java
@@ -0,0 +1,104 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.planner.logical;
+
+import com.google.common.base.Preconditions;
+import com.google.gson.annotations.Expose;
+import org.apache.tajo.catalog.Column;
+import org.apache.tajo.ipc.TajoWorkerProtocol;
+import org.apache.tajo.util.TUtil;
+
+import static org.apache.tajo.ipc.TajoWorkerProtocol.ShuffleType.NONE_SHUFFLE;
+
+/**
+ * ShuffeFileWriteNode is an expression for an intermediate data materialization step.
+ */
+public class ShuffleFileWriteNode extends PersistentStoreNode implements Cloneable {
+  @Expose private TajoWorkerProtocol.ShuffleType shuffleType = NONE_SHUFFLE;
+  @Expose private int numOutputs;
+  @Expose private Column [] shuffleKeys;
+
+  public ShuffleFileWriteNode(int pid) {
+    super(pid, NodeType.STORE);
+  }
+    
+  public final int getNumOutputs() {
+    return this.numOutputs;
+  }
+  
+  public final boolean hasShuffleKeys() {
+    return this.shuffleKeys != null;
+  }
+  
+  public final Column [] getShuffleKeys() {
+    return shuffleKeys;
+  }
+  
+  public final void setShuffle(TajoWorkerProtocol.ShuffleType type, Column[] keys, int numPartitions) {
+    Preconditions.checkArgument(keys.length >= 0, 
+        "At least one partition key must be specified.");
+    // In outer join, zero can be passed into this value because of empty tables.
+    // So, we should allow zero.
+    Preconditions.checkArgument(numPartitions >= 0,
+        "The number of partitions must be positive: %s", numPartitions);
+
+    this.shuffleType = type;
+    this.shuffleKeys = keys;
+    this.numOutputs = numPartitions;
+  }
+
+  public TajoWorkerProtocol.ShuffleType getShuffleType() {
+    return this.shuffleType;
+  }
+  
+  @Override
+  public boolean equals(Object obj) {
+    if (obj instanceof ShuffleFileWriteNode) {
+      ShuffleFileWriteNode other = (ShuffleFileWriteNode) obj;
+      boolean eq = super.equals(other);
+      eq = eq && this.numOutputs == other.numOutputs;
+      eq = eq && TUtil.checkEquals(shuffleKeys, other.shuffleKeys);
+      return eq;
+    } else {
+      return false;
+    }
+  }
+
+  @Override
+  public Object clone() throws CloneNotSupportedException {
+    ShuffleFileWriteNode store = (ShuffleFileWriteNode) super.clone();
+    store.numOutputs = numOutputs;
+    store.shuffleKeys = shuffleKeys != null ? shuffleKeys.clone() : null;
+    return store;
+  }
+
+  public String toString() {
+    StringBuilder sb = new StringBuilder("Shuffle Write (type=" + shuffleType.name().toLowerCase());
+    if (storageType != null) {
+      sb.append(", storage="+ storageType.name());
+    }
+    sb.append(", part number=").append(numOutputs);
+    if (shuffleKeys != null) {
+      sb.append(", keys: ").append(TUtil.arrayToString(shuffleKeys));
+    }
+    sb.append(")");
+    
+    return sb.toString();
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tajo/blob/6594ac1c/tajo-core/src/main/java/org/apache/tajo/engine/planner/logical/SortNode.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/logical/SortNode.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/logical/SortNode.java
new file mode 100644
index 0000000..a732710
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/logical/SortNode.java
@@ -0,0 +1,94 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.planner.logical;
+
+import com.google.common.base.Preconditions;
+import com.google.gson.annotations.Expose;
+import org.apache.tajo.catalog.SortSpec;
+import org.apache.tajo.engine.planner.PlanString;
+import org.apache.tajo.util.TUtil;
+
+public final class SortNode extends UnaryNode implements Cloneable {
+	@Expose private SortSpec [] sortKeys;
+
+  public SortNode(int pid) {
+    super(pid, NodeType.SORT);
+  }
+
+  public void setSortSpecs(SortSpec[] sortSpecs) {
+    Preconditions.checkArgument(sortSpecs.length > 0, "At least one sort key must be specified");
+    this.sortKeys = sortSpecs;
+  }
+  
+  public SortSpec[] getSortKeys() {
+    return this.sortKeys;
+  }
+  
+  @Override 
+  public boolean equals(Object obj) {
+    if (obj instanceof SortNode) {
+      SortNode other = (SortNode) obj;
+      boolean eq = super.equals(other);
+      eq = eq && TUtil.checkEquals(sortKeys, other.sortKeys);
+      return eq;
+    } else {
+      return false;
+    }
+  }
+
+  @Override
+  public Object clone() throws CloneNotSupportedException {
+    SortNode sort = (SortNode) super.clone();
+    sort.sortKeys = sortKeys.clone();
+    
+    return sort;
+  }
+
+  @Override
+  public PlanString getPlanString() {
+    PlanString planStr = new PlanString(this);
+    StringBuilder sb = new StringBuilder("Sort Keys: ");
+    for (int i = 0; i < sortKeys.length; i++) {
+      sb.append(sortKeys[i].getSortKey().getSimpleName()).append(" ")
+          .append(sortKeys[i].isAscending() ? "asc" : "desc");
+      if( i < sortKeys.length - 1) {
+        sb.append(",");
+      }
+    }
+    planStr.addExplan(sb.toString());
+    return planStr;
+  }
+
+  public String toString() {
+    StringBuilder sb = new StringBuilder("Sort [key= ");
+    for (int i = 0; i < sortKeys.length; i++) {    
+      sb.append(sortKeys[i].getSortKey().getQualifiedName()).append(" ")
+          .append(sortKeys[i].isAscending() ? "asc" : "desc");
+      if(i < sortKeys.length - 1) {
+        sb.append(",");
+      }
+    }
+    sb.append("]");
+
+    sb.append("\n\"out schema: " + getOutSchema()
+        + "\n\"in schema: " + getInSchema());
+    return sb.toString()+"\n"
+        + getChild().toString();
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/6594ac1c/tajo-core/src/main/java/org/apache/tajo/engine/planner/logical/StoreTableNode.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/logical/StoreTableNode.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/logical/StoreTableNode.java
new file mode 100644
index 0000000..b0f7b7a
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/logical/StoreTableNode.java
@@ -0,0 +1,100 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.planner.logical;
+
+import com.google.gson.annotations.Expose;
+import org.apache.tajo.catalog.partition.PartitionMethodDesc;
+import org.apache.tajo.engine.planner.PlanString;
+import org.apache.tajo.util.TUtil;
+
+public class StoreTableNode extends PersistentStoreNode implements Cloneable {
+  @Expose protected String tableName;
+  @Expose private PartitionMethodDesc partitionDesc;
+
+  public StoreTableNode(int pid) {
+    super(pid, NodeType.STORE);
+  }
+
+  protected StoreTableNode(int pid, NodeType nodeType) {
+    super(pid, nodeType);
+  }
+
+  public boolean hasTargetTable() {
+    return tableName != null;
+  }
+
+  public void setTableName(String tableName) {
+    this.tableName = tableName;
+  }
+
+  public final String getTableName() {
+    return this.tableName;
+  }
+
+  public boolean hasPartition() {
+    return this.partitionDesc != null;
+  }
+
+  public PartitionMethodDesc getPartitionMethod() {
+    return partitionDesc;
+  }
+
+  public void setPartitionMethod(PartitionMethodDesc partitionDesc) {
+    this.partitionDesc = partitionDesc;
+  }
+
+  @Override
+  public PlanString getPlanString() {
+    PlanString planStr = new PlanString(this);
+    planStr.appendTitle(" into ").appendTitle(tableName);
+    planStr.addExplan("Store type: " + storageType);
+
+    return planStr;
+  }
+  
+  @Override
+  public boolean equals(Object obj) {
+    if (obj instanceof StoreTableNode) {
+      StoreTableNode other = (StoreTableNode) obj;
+      boolean eq = super.equals(other);
+      eq = eq && this.tableName.equals(other.tableName);
+      eq = eq && TUtil.checkEquals(partitionDesc, other.partitionDesc);
+      return eq;
+    } else {
+      return false;
+    }
+  }
+
+  @Override
+  public Object clone() throws CloneNotSupportedException {
+    StoreTableNode store = (StoreTableNode) super.clone();
+    store.tableName = tableName;
+    store.partitionDesc = partitionDesc != null ? (PartitionMethodDesc) partitionDesc.clone() : null;
+    return store;
+  }
+
+  public String toString() {
+    StringBuilder sb = new StringBuilder("Store Table (table=").append(tableName);
+    if (storageType != null) {
+      sb.append(", storage="+ storageType.name());
+    }
+    sb.append(")");
+    return sb.toString();
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tajo/blob/6594ac1c/tajo-core/src/main/java/org/apache/tajo/engine/planner/logical/TableSubQueryNode.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/logical/TableSubQueryNode.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/logical/TableSubQueryNode.java
new file mode 100644
index 0000000..4d0090b
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/logical/TableSubQueryNode.java
@@ -0,0 +1,181 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.planner.logical;
+
+import com.google.common.base.Objects;
+import com.google.gson.annotations.Expose;
+import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.engine.planner.PlanString;
+import org.apache.tajo.engine.planner.PlannerUtil;
+import org.apache.tajo.engine.planner.Target;
+import org.apache.tajo.engine.utils.SchemaUtil;
+
+public class TableSubQueryNode extends RelationNode implements Projectable {
+  @Expose private String tableName;
+  @Expose private LogicalNode subQuery;
+  @Expose private Target [] targets; // unused
+
+  public TableSubQueryNode(int pid) {
+    super(pid, NodeType.TABLE_SUBQUERY);
+  }
+
+  public void init(String tableName, LogicalNode subQuery) {
+    this.tableName = tableName;
+    if (subQuery != null) {
+      this.subQuery = subQuery;
+      setOutSchema(SchemaUtil.clone(this.subQuery.getOutSchema()));
+      setInSchema(SchemaUtil.clone(this.subQuery.getOutSchema()));
+      getInSchema().setQualifier(this.tableName);
+      getOutSchema().setQualifier(this.tableName);
+    }
+  }
+
+  @Override
+  public boolean hasAlias() {
+    return false;
+  }
+
+  @Override
+  public String getAlias() {
+    return null;
+  }
+
+  public String getTableName() {
+    return tableName;
+  }
+
+  @Override
+  public String getCanonicalName() {
+    return tableName;
+  }
+
+  @Override
+  public Schema getTableSchema() {
+    // an output schema can be determined by targets. So, an input schema of
+    // TableSubQueryNode is only eligible for table schema.
+    //
+    // TODO - but, a derived table can have column alias. For that, we should improve here.
+    //
+    // example) select * from (select col1, col2, col3 from t1) view (c1, c2);
+
+    return getInSchema();
+  }
+
+  public void setSubQuery(LogicalNode node) {
+    this.subQuery = node;
+    setInSchema(SchemaUtil.clone(this.subQuery.getOutSchema()));
+    getInSchema().setQualifier(this.tableName);
+    if (hasTargets()) {
+      setOutSchema(PlannerUtil.targetToSchema(targets));
+    } else {
+      setOutSchema(SchemaUtil.clone(this.subQuery.getOutSchema()));
+    }
+    getOutSchema().setQualifier(this.tableName);
+  }
+
+  public LogicalNode getSubQuery() {
+    return subQuery;
+  }
+
+  @Override
+  public boolean hasTargets() {
+    return targets != null;
+  }
+
+  @Override
+  public void setTargets(Target[] targets) {
+    this.targets = targets;
+    setOutSchema(PlannerUtil.targetToSchema(targets));
+  }
+
+  @Override
+  public Target[] getTargets() {
+    return targets;
+  }
+
+  @Override
+  public PlanString getPlanString() {
+    PlanString planStr = new PlanString(this);
+    planStr.appendTitle(" as ").appendTitle(tableName);
+
+    if (hasTargets()) {
+      StringBuilder sb = new StringBuilder("Targets: ");
+      for (int i = 0; i < targets.length; i++) {
+        sb.append(targets[i]);
+        if( i < targets.length - 1) {
+          sb.append(", ");
+        }
+      }
+      planStr.addExplan(sb.toString());
+      if (getOutSchema() != null) {
+        planStr.addExplan("out schema: " + getOutSchema().toString());
+      }
+      if (getInSchema() != null) {
+        planStr.addExplan("in  schema: " + getInSchema().toString());
+      }
+    }
+
+    return planStr;
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hashCode(tableName, subQuery);
+  }
+
+  @Override
+  public boolean equals(Object object) {
+    if (object instanceof TableSubQueryNode) {
+      TableSubQueryNode another = (TableSubQueryNode) object;
+      return tableName.equals(another.tableName) && subQuery.equals(another.subQuery);
+    }
+
+    return false;
+  }
+
+  @Override
+  public Object clone() throws CloneNotSupportedException {
+    TableSubQueryNode newTableSubQueryNode = (TableSubQueryNode) super.clone();
+    newTableSubQueryNode.tableName = tableName;
+    newTableSubQueryNode.subQuery = (LogicalNode) subQuery.clone();
+    if (hasTargets()) {
+      newTableSubQueryNode.targets = new Target[targets.length];
+      for (int i = 0; i < targets.length; i++) {
+        newTableSubQueryNode.targets[i] = (Target) targets[i].clone();
+      }
+    }
+    return newTableSubQueryNode;
+  }
+
+  @Override
+  public void preOrder(LogicalNodeVisitor visitor) {
+    visitor.visit(this);
+    subQuery.preOrder(visitor);
+  }
+
+  @Override
+  public void postOrder(LogicalNodeVisitor visitor) {
+    subQuery.preOrder(visitor);
+    visitor.visit(this);
+  }
+
+  public String toString() {
+    return "Inline view (name=" + tableName + ")";
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/6594ac1c/tajo-core/src/main/java/org/apache/tajo/engine/planner/logical/UnaryNode.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/logical/UnaryNode.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/logical/UnaryNode.java
new file mode 100644
index 0000000..0b06e9e
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/logical/UnaryNode.java
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.planner.logical;
+
+import com.google.gson.annotations.Expose;
+import org.apache.tajo.util.TUtil;
+
+
+public abstract class UnaryNode extends LogicalNode implements Cloneable {
+	@Expose LogicalNode child;
+	
+	/**
+	 * @param type
+	 */
+	public UnaryNode(int pid, NodeType type) {
+		super(pid, type);
+	}
+	
+	public void setChild(LogicalNode subNode) {
+		this.child = subNode;
+	}
+	
+	public <T extends LogicalNode> T getChild() {
+		return (T) this.child;
+	}
+
+  @Override
+  public boolean deepEquals(Object o) {
+    if (o instanceof UnaryNode) {
+      UnaryNode u = (UnaryNode) o;
+      return equals(o) && TUtil.checkEquals(child, u.child);
+    }
+    return false;
+  }
+	
+	@Override
+  public Object clone() throws CloneNotSupportedException {
+	  UnaryNode unary = (UnaryNode) super.clone();
+	  unary.child = (LogicalNode) (child == null ? null : child.clone());
+	  
+	  return unary;
+	}
+	
+	public void preOrder(LogicalNodeVisitor visitor) {
+	  visitor.visit(this);
+	  child.preOrder(visitor);
+  }
+	
+	public void postOrder(LogicalNodeVisitor visitor) {
+	  child.postOrder(visitor);
+	  visitor.visit(this);
+	}
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/6594ac1c/tajo-core/src/main/java/org/apache/tajo/engine/planner/logical/UnionNode.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/logical/UnionNode.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/logical/UnionNode.java
new file mode 100644
index 0000000..49183d0
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/logical/UnionNode.java
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * 
+ */
+package org.apache.tajo.engine.planner.logical;
+
+import org.apache.tajo.engine.planner.PlanString;
+
+public class UnionNode extends BinaryNode {
+
+  public UnionNode(int pid) {
+    super(pid, NodeType.UNION);
+  }
+
+  @Override
+  public PlanString getPlanString() {
+    PlanString planStr = new PlanString(this);
+    return planStr;
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/6594ac1c/tajo-core/src/main/java/org/apache/tajo/engine/planner/logical/join/Edge.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/logical/join/Edge.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/logical/join/Edge.java
new file mode 100644
index 0000000..c8bca03
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/logical/join/Edge.java
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.planner.logical.join;
+
+import org.apache.tajo.engine.eval.EvalNode;
+
+public class Edge {
+  private String src;
+  private String target;
+  private EvalNode joinQual;
+
+  public Edge(String src, String target, EvalNode joinQual) {
+    this.src = src;
+    this.target = target;
+    this.joinQual = joinQual;
+  }
+
+  public String getSrc() {
+    return this.src;
+  }
+
+  public String getTarget() {
+    return this.target;
+  }
+
+  public EvalNode getJoinQual() {
+    return this.joinQual;
+  }
+
+  @Override
+  public String toString() {
+    return "(" + src + "=> " + target + ", " + joinQual + ")";
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/6594ac1c/tajo-core/src/main/java/org/apache/tajo/engine/planner/logical/join/FoundJoinOrder.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/logical/join/FoundJoinOrder.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/logical/join/FoundJoinOrder.java
new file mode 100644
index 0000000..5ae34f7
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/logical/join/FoundJoinOrder.java
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.planner.logical.join;
+
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.tajo.engine.planner.logical.JoinNode;
+
+/**
+ * It contains the result of join enumeration.
+ */
+@InterfaceStability.Evolving
+public class FoundJoinOrder {
+  private JoinNode joinNode;
+  private double cost;
+
+  public FoundJoinOrder(JoinNode joinNode, double cost) {
+    this.joinNode = joinNode;
+    this.cost = cost;
+  }
+
+  /**
+   * @return a ordered join operators
+   */
+  public JoinNode getOrderedJoin() {
+    return this.joinNode;
+  }
+
+  public double getCost() {
+    return cost;
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/6594ac1c/tajo-core/src/main/java/org/apache/tajo/engine/planner/logical/join/GreedyHeuristicJoinOrderAlgorithm.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/logical/join/GreedyHeuristicJoinOrderAlgorithm.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/logical/join/GreedyHeuristicJoinOrderAlgorithm.java
new file mode 100644
index 0000000..f2bcd77
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/logical/join/GreedyHeuristicJoinOrderAlgorithm.java
@@ -0,0 +1,262 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.planner.logical.join;
+
+import org.apache.tajo.algebra.JoinType;
+import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.engine.eval.AlgebraicUtil;
+import org.apache.tajo.engine.planner.LogicalPlan;
+import org.apache.tajo.engine.planner.PlannerUtil;
+import org.apache.tajo.engine.planner.PlanningException;
+import org.apache.tajo.engine.planner.logical.*;
+import org.apache.tajo.engine.utils.SchemaUtil;
+
+import java.util.LinkedHashSet;
+import java.util.Set;
+
+/**
+ * This is a greedy heuristic algorithm to find a bushy join tree. This algorithm finds
+ * the best join order with join conditions and pushed-down join conditions to
+ * all join operators.
+ */
+public class GreedyHeuristicJoinOrderAlgorithm implements JoinOrderAlgorithm {
+  public static double DEFAULT_SELECTION_FACTOR = 0.1;
+
+  @Override
+  public FoundJoinOrder findBestOrder(LogicalPlan plan, LogicalPlan.QueryBlock block, JoinGraph joinGraph,
+                                      Set<String> relationsWithoutQual) throws PlanningException {
+
+    // Setup a remain relation set to be joined
+    // Why we should use LinkedHashSet? - it should keep the deterministic for the order of joins.
+    // Otherwise, join orders can be different even if join costs are the same to each other.
+    Set<LogicalNode> remainRelations = new LinkedHashSet<LogicalNode>();
+    for (RelationNode relation : block.getRelations()) {
+      remainRelations.add(relation);
+    }
+
+    LogicalNode latestJoin;
+    JoinEdge bestPair;
+
+    while (remainRelations.size() > 1) {
+      // Find the best join pair among all joinable operators in candidate set.
+      bestPair = getBestPair(plan, joinGraph, remainRelations);
+
+      remainRelations.remove(bestPair.getLeftRelation()); // remainRels = remainRels \ Ti
+      remainRelations.remove(bestPair.getRightRelation()); // remainRels = remainRels \ Tj
+
+      latestJoin = createJoinNode(plan, bestPair);
+      remainRelations.add(latestJoin);
+
+      // all logical nodes should be registered to corresponding blocks
+      block.registerNode(latestJoin);
+    }
+
+    JoinNode joinTree = (JoinNode) remainRelations.iterator().next();
+    // all generated nodes should be registered to corresponding blocks
+    block.registerNode(joinTree);
+    return new FoundJoinOrder(joinTree, getCost(joinTree));
+  }
+
+  private static JoinNode createJoinNode(LogicalPlan plan, JoinEdge joinEdge) {
+    LogicalNode left = joinEdge.getLeftRelation();
+    LogicalNode right = joinEdge.getRightRelation();
+
+    JoinNode joinNode = plan.createNode(JoinNode.class);
+
+    if (PlannerUtil.isCommutativeJoin(joinEdge.getJoinType())) {
+      // if only one operator is relation
+      if ((left instanceof RelationNode) && !(right instanceof RelationNode)) {
+        // for left deep
+        joinNode.init(joinEdge.getJoinType(), right, left);
+      } else {
+        // if both operators are relation or if both are relations
+        // we don't need to concern the left-right position.
+        joinNode.init(joinEdge.getJoinType(), left, right);
+      }
+    } else {
+      joinNode.init(joinEdge.getJoinType(), left, right);
+    }
+
+    Schema mergedSchema = SchemaUtil.merge(joinNode.getLeftChild().getOutSchema(),
+        joinNode.getRightChild().getOutSchema());
+    joinNode.setInSchema(mergedSchema);
+    joinNode.setOutSchema(mergedSchema);
+    if (joinEdge.hasJoinQual()) {
+      joinNode.setJoinQual(AlgebraicUtil.createSingletonExprFromCNF(joinEdge.getJoinQual()));
+    }
+    return joinNode;
+  }
+
+  /**
+   * Find the best join pair among all joinable operators in candidate set.
+   *
+   * @param plan a logical plan
+   * @param graph a join graph which consists of vertices and edges, where vertex is relation and
+   *              each edge is join condition.
+   * @param candidateSet candidate operators to be joined.
+   * @return The best join pair among them
+   * @throws PlanningException
+   */
+  private JoinEdge getBestPair(LogicalPlan plan, JoinGraph graph, Set<LogicalNode> candidateSet)
+      throws PlanningException {
+    double minCost = Double.MAX_VALUE;
+    JoinEdge bestJoin = null;
+
+    double minNonCrossJoinCost = Double.MAX_VALUE;
+    JoinEdge bestNonCrossJoin = null;
+
+    for (LogicalNode outer : candidateSet) {
+      for (LogicalNode inner : candidateSet) {
+        if (outer.equals(inner)) {
+          continue;
+        }
+
+        JoinEdge foundJoin = findJoin(plan, graph, outer, inner);
+        if (foundJoin == null) {
+          continue;
+        }
+        double cost = getCost(foundJoin);
+
+        if (cost < minCost) {
+          minCost = cost;
+          bestJoin = foundJoin;
+        }
+
+        // Keep the min cost join
+        // But, if there exists a qualified join, the qualified join must be chosen
+        // rather than cross join regardless of cost.
+        if (foundJoin.hasJoinQual()) {
+          if (cost < minNonCrossJoinCost) {
+            minNonCrossJoinCost = cost;
+            bestNonCrossJoin = foundJoin;
+          }
+        }
+      }
+    }
+
+    if (bestNonCrossJoin != null) {
+      return bestNonCrossJoin;
+    } else {
+      return bestJoin;
+    }
+  }
+
+  /**
+   * Find a join between two logical operator trees
+   *
+   * @return If there is no join condition between two relation, it returns NULL value.
+   */
+  private static JoinEdge findJoin(LogicalPlan plan, JoinGraph graph, LogicalNode outer, LogicalNode inner)
+      throws PlanningException {
+    JoinEdge foundJoinEdge = null;
+
+    for (String outerName : PlannerUtil.getRelationLineageWithinQueryBlock(plan, outer)) {
+      for (String innerName : PlannerUtil.getRelationLineageWithinQueryBlock(plan, inner)) {
+
+        // Find all joins between two relations and merge them into one join if possible
+        if (graph.hasEdge(outerName, innerName)) {
+          JoinEdge existJoinEdge = graph.getEdge(outerName, innerName);
+          if (foundJoinEdge == null) {
+            foundJoinEdge = new JoinEdge(existJoinEdge.getJoinType(), outer, inner,
+                existJoinEdge.getJoinQual());
+          } else {
+            foundJoinEdge.addJoinQual(AlgebraicUtil.createSingletonExprFromCNF(
+                existJoinEdge.getJoinQual()));
+          }
+        }
+      }
+    }
+
+    if (foundJoinEdge == null) {
+      foundJoinEdge = new JoinEdge(JoinType.CROSS, outer, inner);
+    }
+
+    return foundJoinEdge;
+  }
+
+  /**
+   * Getting a cost of one join
+   * @param joinEdge
+   * @return
+   */
+  public static double getCost(JoinEdge joinEdge) {
+    double filterFactor = 1;
+    if (joinEdge.hasJoinQual()) {
+      // TODO - should consider join type
+      // TODO - should statistic information obtained from query history
+      filterFactor = filterFactor * Math.pow(DEFAULT_SELECTION_FACTOR, joinEdge.getJoinQual().length);
+      return getCost(joinEdge.getLeftRelation()) * getCost(joinEdge.getRightRelation()) * filterFactor;
+    } else {
+      // make cost bigger if cross join
+      return Math.pow(getCost(joinEdge.getLeftRelation()) * getCost(joinEdge.getRightRelation()), 2);
+    }
+  }
+
+  // TODO - costs of other operator operators (e.g., group-by and sort) should be computed in proper manners.
+  public static double getCost(LogicalNode node) {
+    switch (node.getType()) {
+
+    case PROJECTION:
+      ProjectionNode projectionNode = (ProjectionNode) node;
+      return getCost(projectionNode.getChild());
+
+    case JOIN:
+      JoinNode joinNode = (JoinNode) node;
+      double filterFactor = 1;
+      if (joinNode.hasJoinQual()) {
+        filterFactor = Math.pow(DEFAULT_SELECTION_FACTOR,
+            AlgebraicUtil.toConjunctiveNormalFormArray(joinNode.getJoinQual()).length);
+        return getCost(joinNode.getLeftChild()) * getCost(joinNode.getRightChild()) * filterFactor;
+      } else {
+        return Math.pow(getCost(joinNode.getLeftChild()) * getCost(joinNode.getRightChild()), 2);
+      }
+
+    case SELECTION:
+      SelectionNode selectionNode = (SelectionNode) node;
+      return getCost(selectionNode.getChild()) *
+          Math.pow(DEFAULT_SELECTION_FACTOR, AlgebraicUtil.toConjunctiveNormalFormArray(selectionNode.getQual()).length);
+
+    case TABLE_SUBQUERY:
+      TableSubQueryNode subQueryNode = (TableSubQueryNode) node;
+      return getCost(subQueryNode.getSubQuery());
+
+    case SCAN:
+      ScanNode scanNode = (ScanNode) node;
+      if (scanNode.getTableDesc().getStats() != null) {
+        double cost = ((ScanNode)node).getTableDesc().getStats().getNumBytes();
+        return cost;
+      } else {
+        return Long.MAX_VALUE;
+      }
+
+    case UNION:
+      UnionNode unionNode = (UnionNode) node;
+      return getCost(unionNode.getLeftChild()) + getCost(unionNode.getRightChild());
+
+    case EXCEPT:
+    case INTERSECT:
+      throw new UnsupportedOperationException("getCost() does not support EXCEPT or INTERSECT yet");
+
+    default:
+      // all binary operators (join, union, except, and intersect) are handled in the above cases.
+      // So, we need to handle only unary nodes in default.
+      return getCost(((UnaryNode) node).getChild());
+    }
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tajo/blob/6594ac1c/tajo-core/src/main/java/org/apache/tajo/engine/planner/logical/join/JoinEdge.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/logical/join/JoinEdge.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/logical/join/JoinEdge.java
new file mode 100644
index 0000000..e5c29f0
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/logical/join/JoinEdge.java
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.planner.logical.join;
+
+import com.google.common.collect.Sets;
+import org.apache.tajo.algebra.JoinType;
+import org.apache.tajo.engine.eval.EvalNode;
+import org.apache.tajo.engine.planner.logical.LogicalNode;
+import org.apache.tajo.engine.planner.logical.RelationNode;
+import org.apache.tajo.util.TUtil;
+
+import java.util.Collections;
+import java.util.Set;
+
+public class JoinEdge {
+  private final JoinType joinType;
+  private final LogicalNode leftRelation;
+  private final LogicalNode rightRelation;
+  private final Set<EvalNode> joinQual = Sets.newHashSet();
+
+  public JoinEdge(JoinType joinType, LogicalNode leftRelation, LogicalNode rightRelation) {
+    this.joinType = joinType;
+    this.leftRelation = leftRelation;
+    this.rightRelation = rightRelation;
+  }
+
+  public JoinEdge(JoinType joinType, LogicalNode leftRelation, LogicalNode rightRelation,
+                  EvalNode ... condition) {
+    this(joinType, leftRelation, rightRelation);
+    Collections.addAll(joinQual, condition);
+  }
+
+  public JoinType getJoinType() {
+    return joinType;
+  }
+
+  public LogicalNode getLeftRelation() {
+    return leftRelation;
+  }
+
+  public LogicalNode getRightRelation() {
+    return rightRelation;
+  }
+
+  public boolean hasJoinQual() {
+    return joinQual.size() > 0;
+  }
+
+  public void addJoinQual(EvalNode joinQual) {
+    this.joinQual.add(joinQual);
+  }
+
+  public EvalNode [] getJoinQual() {
+    return joinQual.toArray(new EvalNode[joinQual.size()]);
+  }
+
+  public String toString() {
+    return leftRelation + " " + joinType + " " + rightRelation + " ON " + TUtil.collectionToString(joinQual);
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/6594ac1c/tajo-core/src/main/java/org/apache/tajo/engine/planner/logical/join/JoinGraph.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/logical/join/JoinGraph.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/logical/join/JoinGraph.java
new file mode 100644
index 0000000..77e03ea
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/logical/join/JoinGraph.java
@@ -0,0 +1,115 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.planner.logical.join;
+
+import com.google.common.collect.Sets;
+import org.apache.tajo.catalog.CatalogUtil;
+import org.apache.tajo.catalog.Column;
+import org.apache.tajo.engine.eval.AlgebraicUtil;
+import org.apache.tajo.engine.eval.EvalNode;
+import org.apache.tajo.engine.eval.EvalTreeUtil;
+import org.apache.tajo.engine.planner.LogicalPlan;
+import org.apache.tajo.engine.planner.NamedExprsManager;
+import org.apache.tajo.engine.planner.PlannerUtil;
+import org.apache.tajo.engine.planner.PlanningException;
+import org.apache.tajo.engine.planner.graph.SimpleUndirectedGraph;
+import org.apache.tajo.engine.planner.logical.JoinNode;
+
+import java.util.Collection;
+import java.util.Set;
+
+public class JoinGraph extends SimpleUndirectedGraph<String, JoinEdge> {
+
+  private String [] guessRelationsFromJoinQual(LogicalPlan.QueryBlock block, EvalNode joinCondition)
+      throws PlanningException {
+
+    // Note that we can guarantee that each join qual used here is a singleton.
+    // This is because we use dissect a join qual into conjunctive normal forms.
+    // In other words, each join qual has a form 'col1 = col2'.
+    Column leftExpr = EvalTreeUtil.findAllColumnRefs(joinCondition.getLeftExpr()).get(0);
+    Column rightExpr = EvalTreeUtil.findAllColumnRefs(joinCondition.getRightExpr()).get(0);
+
+    // 0 - left table, 1 - right table
+    String [] relationNames = new String[2];
+
+    NamedExprsManager namedExprsMgr = block.getNamedExprsManager();
+    if (leftExpr.hasQualifier()) {
+      relationNames[0] = leftExpr.getQualifier();
+    } else {
+      if (namedExprsMgr.isAliasedName(leftExpr.getSimpleName())) {
+        String columnName = namedExprsMgr.getOriginalName(leftExpr.getSimpleName());
+        String qualifier = CatalogUtil.extractQualifier(columnName);
+        relationNames[0] = qualifier;
+      } else {
+        throw new PlanningException("Cannot expect a referenced relation: " + leftExpr);
+      }
+    }
+
+    if (rightExpr.hasQualifier()) {
+      relationNames[1] = rightExpr.getQualifier();
+    } else {
+      if (namedExprsMgr.isAliasedName(rightExpr.getSimpleName())) {
+        String columnName = namedExprsMgr.getOriginalName(rightExpr.getSimpleName());
+        String qualifier = CatalogUtil.extractQualifier(columnName);
+        relationNames[1] = qualifier;
+      } else {
+        throw new PlanningException("Cannot expect a referenced relation: " + rightExpr);
+      }
+    }
+
+    return relationNames;
+  }
+  public Collection<EvalNode> addJoin(LogicalPlan plan, LogicalPlan.QueryBlock block,
+                                      JoinNode joinNode) throws PlanningException {
+    Set<EvalNode> cnf = Sets.newHashSet(AlgebraicUtil.toConjunctiveNormalFormArray(joinNode.getJoinQual()));
+    Set<EvalNode> nonJoinQuals = Sets.newHashSet();
+    for (EvalNode singleQual : cnf) {
+      if (EvalTreeUtil.isJoinQual(singleQual, true)) {
+
+        String [] relations = guessRelationsFromJoinQual(block, singleQual);
+        String leftExprRelName = relations[0];
+        String rightExprRelName = relations[1];
+
+        Collection<String> leftLineage = PlannerUtil.getRelationLineageWithinQueryBlock(plan, joinNode.getLeftChild());
+
+        boolean isLeftExprForLeftTable = leftLineage.contains(leftExprRelName);
+        JoinEdge edge;
+        edge = getEdge(leftExprRelName, rightExprRelName);
+
+        if (edge != null) {
+          edge.addJoinQual(singleQual);
+        } else {
+          if (isLeftExprForLeftTable) {
+            edge = new JoinEdge(joinNode.getJoinType(),
+                block.getRelation(leftExprRelName), block.getRelation(rightExprRelName), singleQual);
+            addEdge(leftExprRelName, rightExprRelName, edge);
+          } else {
+            edge = new JoinEdge(joinNode.getJoinType(),
+                block.getRelation(rightExprRelName), block.getRelation(leftExprRelName), singleQual);
+            addEdge(rightExprRelName, leftExprRelName, edge);
+          }
+        }
+      } else {
+        nonJoinQuals.add(singleQual);
+      }
+    }
+    cnf.retainAll(nonJoinQuals);
+    return cnf;
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/6594ac1c/tajo-core/src/main/java/org/apache/tajo/engine/planner/logical/join/JoinOrderAlgorithm.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/logical/join/JoinOrderAlgorithm.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/logical/join/JoinOrderAlgorithm.java
new file mode 100644
index 0000000..eafa671
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/logical/join/JoinOrderAlgorithm.java
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.planner.logical.join;
+
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.tajo.engine.planner.LogicalPlan;
+import org.apache.tajo.engine.planner.PlanningException;
+
+import java.util.Set;
+
+/**
+ * An interface for join order algorithms
+ */
+@InterfaceStability.Evolving
+public interface JoinOrderAlgorithm {
+
+  /**
+   *
+   * @param plan
+   * @param block
+   * @param joinGraph A join graph represents join conditions and their connections among relations.
+   *                  Given a graph, each vertex represents a relation, and each edge contains a join condition.
+   *                  A join graph does not contain relations that do not have any corresponding join condition.
+   * @param relationsWithoutQual The names of relations that do not have any corresponding join condition.
+   * @return
+   * @throws PlanningException
+   */
+  FoundJoinOrder findBestOrder(LogicalPlan plan, LogicalPlan.QueryBlock block, JoinGraph joinGraph,
+                               Set<String> relationsWithoutQual) throws PlanningException;
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/6594ac1c/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/AggregationExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/AggregationExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/AggregationExec.java
new file mode 100644
index 0000000..208973e
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/AggregationExec.java
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.planner.physical;
+
+import org.apache.tajo.catalog.Column;
+import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.engine.eval.AggregationFunctionCallEval;
+import org.apache.tajo.engine.planner.logical.GroupbyNode;
+import org.apache.tajo.worker.TaskAttemptContext;
+
+import java.io.IOException;
+
+public abstract class AggregationExec extends UnaryPhysicalExec {
+  protected GroupbyNode plan;
+
+  protected final int groupingKeyNum;
+  protected int groupingKeyIds[];
+  protected final int aggFunctionsNum;
+  protected final AggregationFunctionCallEval aggFunctions[];
+
+  protected Schema evalSchema;
+
+  public AggregationExec(final TaskAttemptContext context, GroupbyNode plan,
+                         PhysicalExec child) throws IOException {
+    super(context, plan.getInSchema(), plan.getOutSchema(), child);
+    this.plan = plan;
+
+    evalSchema = plan.getOutSchema();
+
+    final Column [] keyColumns = plan.getGroupingColumns();
+    groupingKeyNum = keyColumns.length;
+    groupingKeyIds = new int[groupingKeyNum];
+    Column col;
+    for (int idx = 0; idx < plan.getGroupingColumns().length; idx++) {
+      col = keyColumns[idx];
+      groupingKeyIds[idx] = inSchema.getColumnId(col.getQualifiedName());
+    }
+
+    if (plan.hasAggFunctions()) {
+      aggFunctions = plan.getAggFunctions();
+      aggFunctionsNum = aggFunctions.length;
+    } else {
+      aggFunctions = new AggregationFunctionCallEval[0];
+      aggFunctionsNum = 0;
+    }
+  }
+
+  @Override
+  public void close() throws IOException {
+    super.close();
+    plan = null;
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/6594ac1c/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/BNLJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/BNLJoinExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/BNLJoinExec.java
new file mode 100644
index 0000000..60a7c19
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/BNLJoinExec.java
@@ -0,0 +1,223 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.planner.physical;
+
+import org.apache.tajo.engine.eval.EvalNode;
+import org.apache.tajo.engine.planner.PlannerUtil;
+import org.apache.tajo.engine.planner.Projector;
+import org.apache.tajo.engine.planner.logical.JoinNode;
+import org.apache.tajo.storage.FrameTuple;
+import org.apache.tajo.storage.Tuple;
+import org.apache.tajo.storage.VTuple;
+import org.apache.tajo.worker.TaskAttemptContext;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+
+public class BNLJoinExec extends BinaryPhysicalExec {
+  // from logical plan
+  private JoinNode plan;
+  private final boolean hasJoinQual;
+  private EvalNode joinQual;
+
+  private List<Tuple> leftTupleSlots;
+  private List<Tuple> rightTupleSlots;
+  private Iterator<Tuple> leftIterator;
+  private Iterator<Tuple> rightIterator;
+
+  private boolean leftEnd;
+  private boolean rightEnd;
+
+  // temporal tuples and states for nested loop join
+  private FrameTuple frameTuple;
+  private Tuple leftTuple = null;
+  private Tuple outputTuple = null;
+  private Tuple rightNext = null;
+
+  private final int TUPLE_SLOT_SIZE = 10000;
+
+  // projection
+  private Projector projector;
+
+  public BNLJoinExec(final TaskAttemptContext context, final JoinNode plan,
+                     final PhysicalExec leftExec, PhysicalExec rightExec) {
+    super(context, plan.getInSchema(), plan.getOutSchema(), leftExec, rightExec);
+    this.plan = plan;
+    this.joinQual = plan.getJoinQual();
+    if (joinQual != null) { // if join type is not 'cross join'
+      hasJoinQual = true;
+    } else {
+      hasJoinQual = false;
+    }
+    this.leftTupleSlots = new ArrayList<Tuple>(TUPLE_SLOT_SIZE);
+    this.rightTupleSlots = new ArrayList<Tuple>(TUPLE_SLOT_SIZE);
+    this.leftIterator = leftTupleSlots.iterator();
+    this.rightIterator = rightTupleSlots.iterator();
+    this.rightEnd = false;
+    this.leftEnd = false;
+
+    // for projection
+    if (!plan.hasTargets()) {
+      plan.setTargets(PlannerUtil.schemaToTargets(outSchema));
+    }
+
+    projector = new Projector(inSchema, outSchema, plan.getTargets());
+
+    // for join
+    frameTuple = new FrameTuple();
+    outputTuple = new VTuple(outSchema.size());
+  }
+
+  public JoinNode getPlan() {
+    return plan;
+  }
+
+  public Tuple next() throws IOException {
+
+    if (leftTupleSlots.isEmpty()) {
+      for (int k = 0; k < TUPLE_SLOT_SIZE; k++) {
+        Tuple t = leftChild.next();
+        if (t == null) {
+          leftEnd = true;
+          break;
+        }
+        leftTupleSlots.add(t);
+      }
+      leftIterator = leftTupleSlots.iterator();
+      leftTuple = leftIterator.next();
+    }
+
+    if (rightTupleSlots.isEmpty()) {
+      for (int k = 0; k < TUPLE_SLOT_SIZE; k++) {
+        Tuple t = rightChild.next();
+        if (t == null) {
+          rightEnd = true;
+          break;
+        }
+        rightTupleSlots.add(t);
+      }
+      rightIterator = rightTupleSlots.iterator();
+    }
+
+    if((rightNext = rightChild.next()) == null){
+      rightEnd = true;
+    }
+
+    while (true) {
+      if (!rightIterator.hasNext()) { // if leftIterator ended
+        if (leftIterator.hasNext()) { // if rightTupleslot remains
+          leftTuple = leftIterator.next();
+          rightIterator = rightTupleSlots.iterator();
+        } else {
+          if (rightEnd) {
+            rightChild.rescan();
+            rightEnd = false;
+            
+            if (leftEnd) {
+              return null;
+            }
+            leftTupleSlots.clear();
+            for (int k = 0; k < TUPLE_SLOT_SIZE; k++) {
+              Tuple t = leftChild.next();
+              if (t == null) {
+                leftEnd = true;
+                break;
+              }
+              leftTupleSlots.add(t);
+            }
+            if (leftTupleSlots.isEmpty()) {
+              return null;
+            }
+            leftIterator = leftTupleSlots.iterator();
+            leftTuple = leftIterator.next();
+            
+          } else {
+            leftIterator = leftTupleSlots.iterator();
+            leftTuple = leftIterator.next();
+          }
+          
+          rightTupleSlots.clear();
+          if (rightNext != null) {
+            rightTupleSlots.add(rightNext);
+            for (int k = 1; k < TUPLE_SLOT_SIZE; k++) { // fill right
+              Tuple t = rightChild.next();
+              if (t == null) {
+                rightEnd = true;
+                break;
+              }
+              rightTupleSlots.add(t);
+            }
+          } else {
+            for (int k = 0; k < TUPLE_SLOT_SIZE; k++) { // fill right
+              Tuple t = rightChild.next();
+              if (t == null) {
+                rightEnd = true;
+                break;
+              }
+              rightTupleSlots.add(t);
+            }
+          }
+          
+          if ((rightNext = rightChild.next()) == null) {
+            rightEnd = true;
+          }
+          rightIterator = rightTupleSlots.iterator();
+        }
+      }
+
+      frameTuple.set(leftTuple, rightIterator.next());
+      if (hasJoinQual) {
+        if (joinQual.eval(inSchema, frameTuple).isTrue()) {
+          projector.eval(frameTuple, outputTuple);
+          return outputTuple;
+        }
+      } else {
+        projector.eval(frameTuple, outputTuple);
+        return outputTuple;
+      }
+    }
+  }
+
+  @Override
+  public void rescan() throws IOException {
+    super.rescan();
+    rightEnd = false;
+    rightTupleSlots.clear();
+    leftTupleSlots.clear();
+    rightIterator = rightTupleSlots.iterator();
+    leftIterator = leftTupleSlots.iterator();
+  }
+
+  @Override
+  public void close() throws IOException {
+    super.close();
+
+    rightTupleSlots.clear();
+    leftTupleSlots.clear();
+    rightTupleSlots = null;
+    leftTupleSlots = null;
+    rightIterator = null;
+    leftIterator = null;
+    plan = null;
+    joinQual = null;
+    projector = null;
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/6594ac1c/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/BSTIndexScanExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/BSTIndexScanExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/BSTIndexScanExec.java
new file mode 100644
index 0000000..35de707
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/BSTIndexScanExec.java
@@ -0,0 +1,143 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.planner.physical;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.datum.Datum;
+import org.apache.tajo.engine.eval.EvalNode;
+import org.apache.tajo.engine.planner.Projector;
+import org.apache.tajo.engine.planner.logical.ScanNode;
+import org.apache.tajo.storage.*;
+import org.apache.tajo.storage.fragment.FileFragment;
+import org.apache.tajo.storage.index.bst.BSTIndex;
+import org.apache.tajo.worker.TaskAttemptContext;
+
+import java.io.IOException;
+
+public class BSTIndexScanExec extends PhysicalExec {
+  private ScanNode scanNode;
+  private SeekableScanner fileScanner;
+  
+  private EvalNode qual;
+  private BSTIndex.BSTIndexReader reader;
+  
+  private Projector projector;
+  
+  private Datum[] datum = null;
+  
+  private boolean initialize = true;
+
+  private float progress;
+
+  public BSTIndexScanExec(TaskAttemptContext context,
+                          AbstractStorageManager sm , ScanNode scanNode ,
+       FileFragment fragment, Path fileName , Schema keySchema,
+       TupleComparator comparator , Datum[] datum) throws IOException {
+    super(context, scanNode.getInSchema(), scanNode.getOutSchema());
+    this.scanNode = scanNode;
+    this.qual = scanNode.getQual();
+    this.datum = datum;
+
+    this.fileScanner = StorageManagerFactory.getSeekableScanner(context.getConf(),
+        scanNode.getTableDesc().getMeta(), scanNode.getInSchema(), fragment, outSchema);
+    this.fileScanner.init();
+    this.projector = new Projector(inSchema, outSchema, scanNode.getTargets());
+
+    this.reader = new BSTIndex(sm.getFileSystem().getConf()).
+        getIndexReader(fileName, keySchema, comparator);
+    this.reader.open();
+  }
+
+  @Override
+  public void init() throws IOException {
+    progress = 0.0f;
+  }
+
+  @Override
+  public Tuple next() throws IOException {
+    if(initialize) {
+      //TODO : more complicated condition
+      Tuple key = new VTuple(datum.length);
+      key.put(datum);
+      long offset = reader.find(key);
+      if (offset == -1) {
+        reader.close();
+        fileScanner.close();
+        return null;
+      }else {
+        fileScanner.seek(offset);
+      }
+      initialize = false;
+    } else {
+      if(!reader.isCurInMemory()) {
+        return null;
+      }
+      long offset = reader.next();
+      if(offset == -1 ) {
+        reader.close();
+        fileScanner.close();
+        return null;
+      } else { 
+      fileScanner.seek(offset);
+      }
+    }
+    Tuple tuple;
+    Tuple outTuple = new VTuple(this.outSchema.size());
+    if (!scanNode.hasQual()) {
+      if ((tuple = fileScanner.next()) != null) {
+        projector.eval(tuple, outTuple);
+        return outTuple;
+      } else {
+        return null;
+      }
+    } else {
+       while(reader.isCurInMemory() && (tuple = fileScanner.next()) != null) {
+         if (qual.eval(inSchema, tuple).isTrue()) {
+           projector.eval(tuple, outTuple);
+           return outTuple;
+         } else {
+           fileScanner.seek(reader.next());
+         }
+       }
+     }
+
+    return null;
+  }
+  @Override
+  public void rescan() throws IOException {
+    fileScanner.reset();
+  }
+
+  @Override
+  public void close() throws IOException {
+    IOUtils.cleanup(null, reader, fileScanner);
+    reader = null;
+    fileScanner = null;
+    scanNode = null;
+    qual = null;
+    projector = null;
+  }
+
+  @Override
+  public float getProgress() {
+    return progress;
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/6594ac1c/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/BasicPhysicalExecutorVisitor.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/BasicPhysicalExecutorVisitor.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/BasicPhysicalExecutorVisitor.java
new file mode 100644
index 0000000..f6f3e52
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/BasicPhysicalExecutorVisitor.java
@@ -0,0 +1,265 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.planner.physical;
+
+import java.util.Stack;
+
+public class BasicPhysicalExecutorVisitor<CONTEXT, RESULT> implements PhysicalExecutorVisitor<CONTEXT, RESULT> {
+
+  public RESULT visit(PhysicalExec exec, Stack<PhysicalExec> stack, CONTEXT context)
+      throws PhysicalPlanningException {
+
+    // Please keep all physical executors except for abstract class.
+    // They should be ordered in an lexicography order of their names for easy code maintenance.
+    if (exec instanceof BNLJoinExec) {
+      return visitBNLJoin(context, (BNLJoinExec) exec, stack);
+    } else if (exec instanceof BSTIndexScanExec) {
+      return visitBSTIndexScan(context, (BSTIndexScanExec) exec, stack);
+    } else if (exec instanceof EvalExprExec) {
+      return visitEvalExpr(context, (EvalExprExec) exec, stack);
+    } else if (exec instanceof ExternalSortExec) {
+      return visitExternalSort(context, (ExternalSortExec) exec, stack);
+    } else if (exec instanceof HashAggregateExec) {
+      return visitHashAggregate(context, (HashAggregateExec) exec, stack);
+    } else if (exec instanceof HashBasedColPartitionStoreExec) {
+      return visitHashBasedColPartitionStore(context, (HashBasedColPartitionStoreExec) exec, stack);
+    } else if (exec instanceof HashFullOuterJoinExec) {
+      return visitHashFullOuterJoin(context, (HashFullOuterJoinExec) exec, stack);
+    } else if (exec instanceof HashJoinExec) {
+      return visitHashJoin(context, (HashJoinExec) exec, stack);
+    } else if (exec instanceof HashLeftAntiJoinExec) {
+      return visitHashLeftAntiJoin(context, (HashLeftAntiJoinExec) exec, stack);
+    } else if (exec instanceof HashLeftOuterJoinExec) {
+      return visitHashLeftOuterJoin(context, (HashLeftOuterJoinExec) exec, stack);
+    } else if (exec instanceof HashLeftSemiJoinExec) {
+      return visitLeftHashSemiJoin(context, (HashLeftSemiJoinExec) exec, stack);
+    } else if (exec instanceof HashShuffleFileWriteExec) {
+      return visitHashShuffleFileWrite(context, (HashShuffleFileWriteExec) exec, stack);
+    } else if (exec instanceof HavingExec) {
+      return visitHaving(context, (HavingExec) exec, stack);
+    } else if (exec instanceof LimitExec) {
+      return visitLimit(context, (LimitExec) exec, stack);
+    } else if (exec instanceof MemSortExec) {
+      return visitMemSort(context, (MemSortExec) exec, stack);
+    } else if (exec instanceof MergeFullOuterJoinExec) {
+      return visitMergeFullOuterJoin(context, (MergeFullOuterJoinExec) exec, stack);
+    } else if (exec instanceof MergeJoinExec) {
+      return visitMergeJoin(context, (MergeJoinExec) exec, stack);
+    } else if (exec instanceof NLJoinExec) {
+      return visitNLJoin(context, (NLJoinExec) exec, stack);
+    } else if (exec instanceof NLLeftOuterJoinExec) {
+      return visitNLLeftOuterJoin(context, (NLLeftOuterJoinExec) exec, stack);
+    } else if (exec instanceof ProjectionExec) {
+      return visitProjection(context, (ProjectionExec) exec, stack);
+    } else if (exec instanceof RangeShuffleFileWriteExec) {
+      return visitRangeShuffleFileWrite(context, (RangeShuffleFileWriteExec) exec, stack);
+    } else if (exec instanceof RightOuterMergeJoinExec) {
+      return visitRightOuterMergeJoin(context, (RightOuterMergeJoinExec) exec, stack);
+    } else if (exec instanceof SelectionExec) {
+      return visitSelection(context, (SelectionExec) exec, stack);
+    } else if (exec instanceof SeqScanExec) {
+      return visitSeqScan(context, (SeqScanExec) exec, stack);
+    } else if (exec instanceof SortAggregateExec) {
+      return visitSortAggregate(context, (SortAggregateExec) exec, stack);
+    } else if (exec instanceof SortBasedColPartitionStoreExec) {
+      return visitSortBasedColPartitionStore(context, (SortBasedColPartitionStoreExec) exec, stack);
+    } else if (exec instanceof StoreTableExec) {
+      return visitStoreTable(context, (StoreTableExec) exec, stack);
+    }
+
+    throw new PhysicalPlanningException("Unsupported Type: " + exec.getClass().getSimpleName());
+  }
+
+  private RESULT visitUnaryExecutor(CONTEXT context, UnaryPhysicalExec exec, Stack<PhysicalExec> stack)
+      throws PhysicalPlanningException {
+    stack.push(exec);
+    RESULT r = visit(exec.getChild(), stack, context);
+    stack.pop();
+    return r;
+  }
+
+  private RESULT visitBinaryExecutor(CONTEXT context, BinaryPhysicalExec exec, Stack<PhysicalExec> stack)
+      throws PhysicalPlanningException {
+    stack.push(exec);
+    RESULT r = visit(exec.getLeftChild(), stack, context);
+    visit(exec.getRightChild(), stack, context);
+    stack.pop();
+    return r;
+  }
+
+  @Override
+  public RESULT visitBNLJoin(CONTEXT context, BNLJoinExec exec, Stack<PhysicalExec> stack)
+      throws PhysicalPlanningException {
+    return visitBinaryExecutor(context, exec, stack);
+  }
+
+  @Override
+  public RESULT visitBSTIndexScan(CONTEXT context, BSTIndexScanExec exec, Stack<PhysicalExec> stack)
+      throws PhysicalPlanningException {
+    return null;
+  }
+
+  @Override
+  public RESULT visitEvalExpr(CONTEXT context, EvalExprExec exec, Stack<PhysicalExec> stack)
+      throws PhysicalPlanningException {
+    return null;
+  }
+
+  @Override
+  public RESULT visitExternalSort(CONTEXT context, ExternalSortExec exec, Stack<PhysicalExec> stack)
+      throws PhysicalPlanningException {
+    return visitUnaryExecutor(context, exec, stack);
+  }
+
+  @Override
+  public RESULT visitHashAggregate(CONTEXT context, HashAggregateExec exec, Stack<PhysicalExec> stack)
+      throws PhysicalPlanningException {
+    return visitUnaryExecutor(context, exec, stack);
+  }
+
+  @Override
+  public RESULT visitHashBasedColPartitionStore(CONTEXT context, HashBasedColPartitionStoreExec exec,
+                                                Stack<PhysicalExec> stack) throws PhysicalPlanningException {
+    return visitUnaryExecutor(context, exec, stack);
+  }
+
+  @Override
+  public RESULT visitHashFullOuterJoin(CONTEXT context, HashFullOuterJoinExec exec, Stack<PhysicalExec> stack)
+      throws PhysicalPlanningException {
+    return visitBinaryExecutor(context, exec, stack);
+  }
+
+  @Override
+  public RESULT visitHashJoin(CONTEXT context, HashJoinExec exec, Stack<PhysicalExec> stack)
+      throws PhysicalPlanningException {
+    return visitBinaryExecutor(context, exec, stack);
+  }
+
+  @Override
+  public RESULT visitHashLeftAntiJoin(CONTEXT context, HashLeftAntiJoinExec exec, Stack<PhysicalExec> stack)
+      throws PhysicalPlanningException {
+    return visitBinaryExecutor(context, exec, stack);
+  }
+
+  @Override
+  public RESULT visitHashLeftOuterJoin(CONTEXT context, HashLeftOuterJoinExec exec, Stack<PhysicalExec> stack)
+      throws PhysicalPlanningException {
+    return visitBinaryExecutor(context, exec, stack);
+  }
+
+  @Override
+  public RESULT visitLeftHashSemiJoin(CONTEXT context, HashLeftSemiJoinExec exec, Stack<PhysicalExec> stack)
+      throws PhysicalPlanningException {
+    return visitBinaryExecutor(context, exec, stack);
+  }
+
+  @Override
+  public RESULT visitHashShuffleFileWrite(CONTEXT context, HashShuffleFileWriteExec exec, Stack<PhysicalExec> stack)
+      throws PhysicalPlanningException {
+    return visitUnaryExecutor(context, exec, stack);
+  }
+
+  @Override
+  public RESULT visitHaving(CONTEXT context, HavingExec exec, Stack<PhysicalExec> stack)
+      throws PhysicalPlanningException {
+    return visitUnaryExecutor(context, exec, stack);
+  }
+
+  @Override
+  public RESULT visitLimit(CONTEXT context, LimitExec exec, Stack<PhysicalExec> stack)
+      throws PhysicalPlanningException {
+    return visitUnaryExecutor(context, exec, stack);
+  }
+
+  @Override
+  public RESULT visitMemSort(CONTEXT context, MemSortExec exec, Stack<PhysicalExec> stack) throws
+      PhysicalPlanningException {
+    return visitUnaryExecutor(context, exec, stack);
+  }
+
+  @Override
+  public RESULT visitMergeFullOuterJoin(CONTEXT context, MergeFullOuterJoinExec exec, Stack<PhysicalExec> stack)
+      throws PhysicalPlanningException {
+    return visitBinaryExecutor(context, exec, stack);
+  }
+
+  @Override
+  public RESULT visitMergeJoin(CONTEXT context, MergeJoinExec exec, Stack<PhysicalExec> stack)
+      throws PhysicalPlanningException {
+    return visitBinaryExecutor(context, exec, stack);
+  }
+
+  @Override
+  public RESULT visitNLJoin(CONTEXT context, NLJoinExec exec, Stack<PhysicalExec> stack) throws
+      PhysicalPlanningException {
+    return visitBinaryExecutor(context, exec, stack);
+  }
+
+  @Override
+  public RESULT visitNLLeftOuterJoin(CONTEXT context, NLLeftOuterJoinExec exec, Stack<PhysicalExec> stack)
+      throws PhysicalPlanningException {
+    return visitBinaryExecutor(context, exec, stack);
+  }
+
+  @Override
+  public RESULT visitProjection(CONTEXT context, ProjectionExec exec, Stack<PhysicalExec> stack)
+      throws PhysicalPlanningException {
+    return visitUnaryExecutor(context, exec, stack);
+  }
+
+  @Override
+  public RESULT visitRangeShuffleFileWrite(CONTEXT context, RangeShuffleFileWriteExec exec, Stack<PhysicalExec> stack)
+      throws PhysicalPlanningException {
+    return visitUnaryExecutor(context, exec, stack);
+  }
+
+  @Override
+  public RESULT visitRightOuterMergeJoin(CONTEXT context, RightOuterMergeJoinExec exec, Stack<PhysicalExec> stack)
+      throws PhysicalPlanningException {
+    return visitBinaryExecutor(context, exec, stack);
+  }
+
+  @Override
+  public RESULT visitSelection(CONTEXT context, SelectionExec exec, Stack<PhysicalExec> stack)
+      throws PhysicalPlanningException {
+    return visitUnaryExecutor(context, exec, stack);
+  }
+
+  @Override
+  public RESULT visitSeqScan(CONTEXT context, SeqScanExec exec, Stack<PhysicalExec> stack) {
+    return null;
+  }
+
+  @Override
+  public RESULT visitSortAggregate(CONTEXT context, SortAggregateExec exec, Stack<PhysicalExec> stack)
+      throws PhysicalPlanningException {
+    return visitUnaryExecutor(context, exec, stack);
+  }
+
+  @Override
+  public RESULT visitSortBasedColPartitionStore(CONTEXT context, SortBasedColPartitionStoreExec exec,
+                                                Stack<PhysicalExec> stack) throws PhysicalPlanningException {
+    return visitUnaryExecutor(context, exec, stack);
+  }
+
+  @Override
+  public RESULT visitStoreTable(CONTEXT context, StoreTableExec exec, Stack<PhysicalExec> stack) throws PhysicalPlanningException {
+    return visitUnaryExecutor(context, exec, stack);
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/6594ac1c/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/BinaryPhysicalExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/BinaryPhysicalExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/BinaryPhysicalExec.java
new file mode 100644
index 0000000..628c18c
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/BinaryPhysicalExec.java
@@ -0,0 +1,109 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.planner.physical;
+
+import org.apache.tajo.catalog.statistics.TableStats;
+import org.apache.tajo.worker.TaskAttemptContext;
+import org.apache.tajo.catalog.Schema;
+
+import java.io.IOException;
+
+public abstract class BinaryPhysicalExec extends PhysicalExec {
+  protected PhysicalExec leftChild;
+  protected PhysicalExec rightChild;
+  protected float progress;
+  protected TableStats inputStats;
+
+  public BinaryPhysicalExec(final TaskAttemptContext context,
+                            final Schema inSchema, final Schema outSchema,
+                            final PhysicalExec outer, final PhysicalExec inner) {
+    super(context, inSchema, outSchema);
+    this.leftChild = outer;
+    this.rightChild = inner;
+    this.inputStats = new TableStats();
+  }
+
+  public PhysicalExec getLeftChild() {
+    return leftChild;
+  }
+
+  public PhysicalExec getRightChild() {
+    return rightChild;
+  }
+
+  @Override
+  public void init() throws IOException {
+    leftChild.init();
+    rightChild.init();
+    progress = 0.0f;
+  }
+
+  @Override
+  public void rescan() throws IOException {
+    leftChild.rescan();
+    rightChild.rescan();
+  }
+
+  @Override
+  public void close() throws IOException {
+    leftChild.close();
+    rightChild.close();
+
+    getInputStats();
+    
+    leftChild = null;
+    rightChild = null;
+    
+    progress = 1.0f;
+  }
+
+  @Override
+  public float getProgress() {
+    if (leftChild == null) {
+      return progress;
+    }
+    return leftChild.getProgress() * 0.5f + rightChild.getProgress() * 0.5f;
+  }
+
+  @Override
+  public TableStats getInputStats() {
+    if (leftChild == null) {
+      return inputStats;
+    }
+    TableStats leftInputStats = leftChild.getInputStats();
+    inputStats.setNumBytes(0);
+    inputStats.setReadBytes(0);
+    inputStats.setNumRows(0);
+
+    if (leftInputStats != null) {
+      inputStats.setNumBytes(leftInputStats.getNumBytes());
+      inputStats.setReadBytes(leftInputStats.getReadBytes());
+      inputStats.setNumRows(leftInputStats.getNumRows());
+    }
+
+    TableStats rightInputStats = rightChild.getInputStats();
+    if (rightInputStats != null) {
+      inputStats.setNumBytes(inputStats.getNumBytes() + rightInputStats.getNumBytes());
+      inputStats.setReadBytes(inputStats.getReadBytes() + rightInputStats.getReadBytes());
+      inputStats.setNumRows(inputStats.getNumRows() + rightInputStats.getNumRows());
+    }
+
+    return inputStats;
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/6594ac1c/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ColPartitionStoreExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ColPartitionStoreExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ColPartitionStoreExec.java
new file mode 100644
index 0000000..fe36905
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ColPartitionStoreExec.java
@@ -0,0 +1,100 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.planner.physical;
+
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.tajo.catalog.CatalogUtil;
+import org.apache.tajo.catalog.Column;
+import org.apache.tajo.catalog.TableMeta;
+import org.apache.tajo.engine.planner.logical.InsertNode;
+import org.apache.tajo.engine.planner.logical.CreateTableNode;
+import org.apache.tajo.engine.planner.logical.NodeType;
+import org.apache.tajo.engine.planner.logical.StoreTableNode;
+import org.apache.tajo.storage.StorageUtil;
+import org.apache.tajo.worker.TaskAttemptContext;
+
+import java.io.IOException;
+
+public abstract class ColPartitionStoreExec extends UnaryPhysicalExec {
+  protected final TableMeta meta;
+  protected final StoreTableNode plan;
+  protected Path storeTablePath;
+
+  protected final int keyNum;
+  protected final int [] keyIds;
+  protected final String [] keyNames;
+
+  public ColPartitionStoreExec(TaskAttemptContext context, StoreTableNode plan, PhysicalExec child) {
+    super(context, plan.getInSchema(), plan.getOutSchema(), child);
+    this.plan = plan;
+
+    if (plan.getType() == NodeType.CREATE_TABLE) {
+      this.outSchema = ((CreateTableNode)plan).getTableSchema();
+    }
+
+    // set table meta
+    if (this.plan.hasOptions()) {
+      meta = CatalogUtil.newTableMeta(plan.getStorageType(), plan.getOptions());
+    } else {
+      meta = CatalogUtil.newTableMeta(plan.getStorageType());
+    }
+
+    // Find column index to name subpartition directory path
+    keyNum = this.plan.getPartitionMethod().getExpressionSchema().size();
+
+    keyIds = new int[keyNum];
+    keyNames = new String[keyNum];
+    for (int i = 0; i < keyNum; i++) {
+      Column column = this.plan.getPartitionMethod().getExpressionSchema().getColumn(i);
+      keyNames[i] = column.getSimpleName();
+
+      if (this.plan.getType() == NodeType.INSERT) {
+        InsertNode insertNode = ((InsertNode)plan);
+        int idx = insertNode.getTableSchema().getColumnId(column.getQualifiedName());
+        keyIds[i] = idx;
+      } else if (this.plan.getType() == NodeType.CREATE_TABLE) {
+        CreateTableNode createTable = (CreateTableNode) plan;
+        int idx = createTable.getLogicalSchema().getColumnId(column.getQualifiedName());
+        keyIds[i] = idx;
+      } else {
+        // We can get partition column from a logical schema.
+        // Don't use output schema because it is rewritten.
+        keyIds[i] = plan.getOutSchema().getColumnId(column.getQualifiedName());
+      }
+    }
+  }
+
+  @Override
+  public void init() throws IOException {
+    super.init();
+
+    storeTablePath = context.getOutputPath();
+    FileSystem fs = storeTablePath.getFileSystem(context.getConf());
+    if (!fs.exists(storeTablePath.getParent())) {
+      fs.mkdirs(storeTablePath.getParent());
+    }
+  }
+
+
+  protected Path getDataFile(String partition) {
+    return StorageUtil.concatPath(storeTablePath.getParent(), partition, storeTablePath.getName());
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/6594ac1c/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/EvalExprExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/EvalExprExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/EvalExprExec.java
new file mode 100644
index 0000000..a843bce
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/EvalExprExec.java
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.planner.physical;
+
+import org.apache.tajo.engine.planner.Target;
+import org.apache.tajo.engine.planner.logical.EvalExprNode;
+import org.apache.tajo.storage.Tuple;
+import org.apache.tajo.storage.VTuple;
+import org.apache.tajo.worker.TaskAttemptContext;
+
+import java.io.IOException;
+
+public class EvalExprExec extends PhysicalExec {
+  private final EvalExprNode plan;
+  private float progress;
+
+  public EvalExprExec(final TaskAttemptContext context, final EvalExprNode plan) {
+    super(context, plan.getInSchema(), plan.getOutSchema());
+    this.plan = plan;
+  }
+
+  @Override
+  public void init() throws IOException {
+    progress = 0.0f;
+  }
+
+  @Override
+  public Tuple next() throws IOException {    
+    Target [] targets = plan.getTargets();
+    Tuple t = new VTuple(targets.length);
+    for (int i = 0; i < targets.length; i++) {
+      t.put(i, targets[i].getEvalTree().eval(inSchema, null));
+    }
+    return t;
+  }
+
+  @Override
+  public void rescan() throws IOException {    
+  }
+
+  @Override
+  public void close() throws IOException {
+    progress = 1.0f;
+  }
+
+  @Override
+  public float getProgress() {
+    return progress;
+  }
+}