You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tajo.apache.org by hy...@apache.org on 2013/09/16 13:33:32 UTC

[5/7] TAJO-184: Refactor GlobalPlanner and global plan data structure. (hyunsik)

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/1b1d1e8c/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/SortNode.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/SortNode.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/SortNode.java
index e053c3c..e754a7f 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/SortNode.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/SortNode.java
@@ -22,6 +22,7 @@ import com.google.common.base.Preconditions;
 import com.google.gson.annotations.Expose;
 import org.apache.tajo.catalog.Schema;
 import org.apache.tajo.catalog.SortSpec;
+import org.apache.tajo.engine.planner.PlanString;
 import org.apache.tajo.util.TUtil;
 
 public final class SortNode extends UnaryNode implements Cloneable {
@@ -69,7 +70,22 @@ public final class SortNode extends UnaryNode implements Cloneable {
     
     return sort;
   }
-  
+
+  @Override
+  public PlanString getPlanString() {
+    PlanString planStr = new PlanString("Sort");
+    StringBuilder sb = new StringBuilder("Sort Keys: ");
+    for (int i = 0; i < sortKeys.length; i++) {
+      sb.append(sortKeys[i].getSortKey().getColumnName()).append(" ")
+          .append(sortKeys[i].isAscending() ? "asc" : "desc");
+      if( i < sortKeys.length - 1) {
+        sb.append(",");
+      }
+    }
+    planStr.addExplan(sb.toString());
+    return planStr;
+  }
+
   public String toString() {
     StringBuilder sb = new StringBuilder("Sort [key= ");
     for (int i = 0; i < sortKeys.length; i++) {    

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/1b1d1e8c/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/StoreTableNode.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/StoreTableNode.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/StoreTableNode.java
index 817ca35..a2dd097 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/StoreTableNode.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/StoreTableNode.java
@@ -22,10 +22,12 @@ import com.google.common.base.Preconditions;
 import com.google.gson.annotations.Expose;
 import org.apache.tajo.catalog.Column;
 import org.apache.tajo.catalog.Options;
-import org.apache.tajo.master.ExecutionBlock.PartitionType;
+import org.apache.tajo.engine.planner.PlanString;
 import org.apache.tajo.util.TUtil;
 
 import static org.apache.tajo.catalog.proto.CatalogProtos.StoreType;
+import static org.apache.tajo.ipc.TajoWorkerProtocol.PartitionType;
+import static org.apache.tajo.ipc.TajoWorkerProtocol.PartitionType.LIST_PARTITION;
 
 public class StoreTableNode extends UnaryNode implements Cloneable {
   @Expose private String tableName;
@@ -33,7 +35,6 @@ public class StoreTableNode extends UnaryNode implements Cloneable {
   @Expose private PartitionType partitionType;
   @Expose private int numPartitions;
   @Expose private Column [] partitionKeys;
-  @Expose private boolean local;
   @Expose private Options options;
   @Expose private boolean isCreatedTable = false;
   @Expose private boolean isOverwritten = false;
@@ -41,7 +42,6 @@ public class StoreTableNode extends UnaryNode implements Cloneable {
   public StoreTableNode(String tableName) {
     super(NodeType.STORE);
     this.tableName = tableName;
-    this.local = false;
   }
 
   public final String getTableName() {
@@ -59,14 +59,6 @@ public class StoreTableNode extends UnaryNode implements Cloneable {
   public StoreType getStorageType() {
     return this.storageType;
   }
-
-  public final void setLocal(boolean local) {
-    this.local = local;
-  }
-
-  public final boolean isLocal() {
-    return this.local;
-  }
     
   public final int getNumPartitions() {
     return this.numPartitions;
@@ -80,10 +72,10 @@ public class StoreTableNode extends UnaryNode implements Cloneable {
     return this.partitionKeys;
   }
 
-  public final void setListPartition() {
-    this.partitionType = PartitionType.LIST;
+  public final void setDefaultParition() {
+    this.partitionType = LIST_PARTITION;
     this.partitionKeys = null;
-    this.numPartitions = 0;
+    this.numPartitions = 1;
   }
   
   public final void setPartitions(PartitionType type, Column [] keys, int numPartitions) {
@@ -113,6 +105,16 @@ public class StoreTableNode extends UnaryNode implements Cloneable {
     return this.options;
   }
 
+
+  @Override
+  public PlanString getPlanString() {
+    PlanString planStr = new PlanString("Store");
+    planStr.appendTitle(" into ").appendTitle(tableName);
+    planStr.addExplan("Store type: " + storageType);
+
+    return planStr;
+  }
+
   public boolean isCreatedTable() {
     return isCreatedTable;
   }
@@ -152,7 +154,7 @@ public class StoreTableNode extends UnaryNode implements Cloneable {
     store.isOverwritten = isOverwritten;
     return store;
   }
-  
+
   public String toString() {
     StringBuilder sb = new StringBuilder();
     sb.append("\"Store\": {\"table\": \""+tableName);

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/1b1d1e8c/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/TableSubQueryNode.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/TableSubQueryNode.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/TableSubQueryNode.java
new file mode 100644
index 0000000..8ad8cf5
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/TableSubQueryNode.java
@@ -0,0 +1,123 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.planner.logical;
+
+import com.google.common.base.Objects;
+import com.google.gson.annotations.Expose;
+import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.engine.planner.PlanString;
+import org.apache.tajo.engine.planner.PlannerUtil;
+import org.apache.tajo.engine.planner.Target;
+
+public class TableSubQueryNode extends RelationNode implements Projectable {
+  @Expose private String tableName;
+  @Expose private LogicalNode subQuery;
+  @Expose private Target [] targets; // unused
+
+  public TableSubQueryNode(String tableName, LogicalNode subQuery) {
+    super(NodeType.TABLE_SUBQUERY);
+    this.tableName = tableName;
+    this.subQuery = subQuery;
+    setOutSchema(PlannerUtil.getQualifiedSchema(this.subQuery.getOutSchema(), tableName));
+    setInSchema(this.subQuery.getInSchema());
+  }
+
+  public String getTableName() {
+    return tableName;
+  }
+
+  @Override
+  public String getCanonicalName() {
+    return tableName;
+  }
+
+  @Override
+  public Schema getTableSchema() {
+    return getOutSchema();
+  }
+
+  public void setSubQuery(LogicalNode node) {
+    this.subQuery = node;
+    setInSchema(subQuery.getInSchema());
+  }
+
+  public LogicalNode getSubQuery() {
+    return subQuery;
+  }
+
+  @Override
+  public boolean hasTargets() {
+    return targets != null;
+  }
+
+  @Override
+  public void setTargets(Target[] targets) {
+    this.targets = targets;
+  }
+
+  @Override
+  public Target[] getTargets() {
+    return targets;
+  }
+
+  @Override
+  public PlanString getPlanString() {
+    PlanString planStr = new PlanString("TableSubQuery");
+    planStr.appendTitle(" as ").appendTitle(tableName);
+    return planStr;
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hashCode(tableName, subQuery);
+  }
+
+  @Override
+  public boolean equals(Object object) {
+    if (object instanceof TableSubQueryNode) {
+      TableSubQueryNode another = (TableSubQueryNode) object;
+      return tableName.equals(another.tableName) && subQuery.equals(another.subQuery);
+    }
+
+    return false;
+  }
+
+  @Override
+  public Object clone() throws CloneNotSupportedException {
+    TableSubQueryNode newTableSubQueryNode = (TableSubQueryNode) super.clone();
+    newTableSubQueryNode.tableName = tableName;
+    return newTableSubQueryNode;
+  }
+
+  @Override
+  public void preOrder(LogicalNodeVisitor visitor) {
+    visitor.visit(this);
+    subQuery.preOrder(visitor);
+  }
+
+  @Override
+  public void postOrder(LogicalNodeVisitor visitor) {
+    subQuery.preOrder(visitor);
+    visitor.visit(this);
+  }
+
+  public String toString() {
+    return "Table Subquery (alias = " + tableName + ")\n" + subQuery.toString();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/1b1d1e8c/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/UnaryNode.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/UnaryNode.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/UnaryNode.java
index 4380996..7f6e065 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/UnaryNode.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/UnaryNode.java
@@ -16,9 +16,6 @@
  * limitations under the License.
  */
 
-/**
- * 
- */
 package org.apache.tajo.engine.planner.logical;
 
 import com.google.gson.annotations.Expose;
@@ -42,8 +39,8 @@ public abstract class UnaryNode extends LogicalNode implements Cloneable {
 		this.child = subNode;
 	}
 	
-	public LogicalNode getChild() {
-		return this.child;
+	public <T extends LogicalNode> T getChild() {
+		return (T) this.child;
 	}
 	
 	@Override

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/1b1d1e8c/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/UnionNode.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/UnionNode.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/UnionNode.java
index c0aafbd..a62e91b 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/UnionNode.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/UnionNode.java
@@ -21,6 +21,8 @@
  */
 package org.apache.tajo.engine.planner.logical;
 
+import org.apache.tajo.engine.planner.PlanString;
+
 public class UnionNode extends BinaryNode {
 
   public UnionNode() {
@@ -33,6 +35,16 @@ public class UnionNode extends BinaryNode {
     setRightChild(inner);
   }
 
+
+  @Override
+  public PlanString getPlanString() {
+    PlanString planStr = new PlanString("Union");
+    planStr.appendTitle(" (L - " + ((TableSubQueryNode)getLeftChild()).getTableName());
+    planStr.appendTitle(", R - " + ((TableSubQueryNode)getRightChild()).getTableName());
+    planStr.appendTitle(")");
+    return planStr;
+  }
+
   public String toString() {
     return getLeftChild().toString() + "\n UNION \n" + getRightChild().toString();
   }

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/1b1d1e8c/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/join/JoinTree.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/join/JoinTree.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/join/JoinTree.java
index d76af7d..2ec055a 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/join/JoinTree.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/join/JoinTree.java
@@ -37,8 +37,8 @@ public class JoinTree {
     List<Column> left = EvalTreeUtil.findAllColumnRefs(node.getLeftExpr());
     List<Column> right = EvalTreeUtil.findAllColumnRefs(node.getRightExpr());
 
-    String ltbName = left.get(0).getTableName();
-    String rtbName = right.get(0).getTableName();
+    String ltbName = left.get(0).getQualifier();
+    String rtbName = right.get(0).getQualifier();
 
     Edge l2r = new Edge(ltbName, rtbName, node);
     Edge r2l = new Edge(rtbName, ltbName, node);

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/1b1d1e8c/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/BasicPhysicalExecutorVisitor.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/BasicPhysicalExecutorVisitor.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/BasicPhysicalExecutorVisitor.java
new file mode 100644
index 0000000..fc569d7
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/BasicPhysicalExecutorVisitor.java
@@ -0,0 +1,127 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.planner.physical;
+
+import java.util.Stack;
+
+public class BasicPhysicalExecutorVisitor<CONTEXT, RESULT> implements PhysicalExecutorVisitor<CONTEXT, RESULT> {
+
+  @Override
+  public RESULT visitChild(PhysicalExec exec, Stack<PhysicalExec> stack, CONTEXT context)
+      throws PhysicalPlanningException {
+
+    if (exec instanceof SeqScanExec) {
+      return visitSeqScan((SeqScanExec) exec, stack, context);
+    } else if (exec instanceof SelectionExec) {
+      return visitSelection((SelectionExec) exec, stack, context);
+    } else if (exec instanceof SortExec) {
+      return visitSort((SortExec) exec, stack, context);
+    } else if (exec instanceof SortAggregateExec) {
+      return visitSortAggregation((SortAggregateExec) exec, stack, context);
+    } else if (exec instanceof ProjectionExec) {
+      return visitProjection((ProjectionExec) exec, stack, context);
+    } else if (exec instanceof HashJoinExec) {
+      return visitHashJoin((HashJoinExec) exec, stack, context);
+    } else if (exec instanceof HashAntiJoinExec) {
+      return visitHashAntiJoin((HashAntiJoinExec) exec, stack, context);
+    } else if (exec instanceof HashSemiJoinExec) {
+      return visitHashSemiJoin((HashSemiJoinExec) exec, stack, context);
+    } else if (exec instanceof LimitExec) {
+      return visitLimit((LimitExec) exec, stack, context);
+    } else {
+      throw new PhysicalPlanningException("Unsupported Type: " + exec.getClass().getSimpleName());
+    }
+  }
+
+  private RESULT visitUnaryExecutor(UnaryPhysicalExec exec, Stack<PhysicalExec> stack, CONTEXT context)
+      throws PhysicalPlanningException {
+    stack.push(exec);
+    RESULT r = visitChild(exec.getChild(), stack, context);
+    stack.pop();
+    return r;
+  }
+
+  private RESULT visitBinaryExecutor(BinaryPhysicalExec exec, Stack<PhysicalExec> stack, CONTEXT context)
+      throws PhysicalPlanningException {
+    stack.push(exec);
+    RESULT r = visitChild(exec.getLeftChild(), stack, context);
+    visitChild(exec.getRightChild(), stack, context);
+    stack.pop();
+    return r;
+  }
+
+  @Override
+  public RESULT visitSortAggregation(SortAggregateExec exec, Stack<PhysicalExec> stack, CONTEXT context)
+      throws PhysicalPlanningException {
+    return visitUnaryExecutor(exec, stack, context);
+  }
+
+  @Override
+  public RESULT visitSeqScan(SeqScanExec exec, Stack<PhysicalExec> stack, CONTEXT context) {
+    return null;
+  }
+
+  @Override
+  public RESULT visitSort(SortExec exec, Stack<PhysicalExec> stack, CONTEXT context)
+      throws PhysicalPlanningException {
+    return visitUnaryExecutor(exec, stack, context);
+  }
+
+  @Override
+  public RESULT visitMergeJoin(MergeJoinExec exec, Stack<PhysicalExec> stack, CONTEXT context)
+      throws PhysicalPlanningException {
+    return visitBinaryExecutor(exec, stack, context);
+  }
+
+  @Override
+  public RESULT visitSelection(SelectionExec exec, Stack<PhysicalExec> stack, CONTEXT context)
+      throws PhysicalPlanningException {
+    return visitUnaryExecutor(exec, stack, context);
+  }
+
+  @Override
+  public RESULT visitProjection(ProjectionExec exec, Stack<PhysicalExec> stack, CONTEXT context)
+      throws PhysicalPlanningException {
+    return visitUnaryExecutor(exec, stack, context);
+  }
+
+  @Override
+  public RESULT visitHashJoin(HashJoinExec exec, Stack<PhysicalExec> stack, CONTEXT context)
+      throws PhysicalPlanningException {
+    return visitBinaryExecutor(exec, stack, context);
+  }
+
+  @Override
+  public RESULT visitHashSemiJoin(HashSemiJoinExec exec, Stack<PhysicalExec> stack, CONTEXT context)
+      throws PhysicalPlanningException {
+    return visitBinaryExecutor(exec, stack, context);
+  }
+
+  @Override
+  public RESULT visitHashAntiJoin(HashAntiJoinExec exec, Stack<PhysicalExec> stack, CONTEXT context)
+      throws PhysicalPlanningException {
+    return visitBinaryExecutor(exec, stack, context);
+  }
+
+  @Override
+  public RESULT visitLimit(LimitExec exec, Stack<PhysicalExec> stack, CONTEXT context)
+      throws PhysicalPlanningException {
+    return visitUnaryExecutor(exec, stack, context);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/1b1d1e8c/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/PhysicalExecutorVisitor.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/PhysicalExecutorVisitor.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/PhysicalExecutorVisitor.java
new file mode 100644
index 0000000..131fbe5
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/PhysicalExecutorVisitor.java
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.planner.physical;
+
+import org.apache.tajo.engine.planner.physical.*;
+
+import java.util.Stack;
+
+public interface PhysicalExecutorVisitor<CONTEXT, RESULT> {
+  RESULT visitChild(PhysicalExec exec, Stack<PhysicalExec> stack, CONTEXT context) throws PhysicalPlanningException;
+  RESULT visitSeqScan(SeqScanExec exec, Stack<PhysicalExec> stack, CONTEXT context);
+  RESULT visitSort(SortExec exec, Stack<PhysicalExec> stack, CONTEXT context) throws PhysicalPlanningException;
+  RESULT visitSortAggregation(SortAggregateExec exec, Stack<PhysicalExec> stack, CONTEXT context) throws PhysicalPlanningException;
+  RESULT visitMergeJoin(MergeJoinExec exec, Stack<PhysicalExec> stack, CONTEXT context) throws PhysicalPlanningException;
+  RESULT visitSelection(SelectionExec exec, Stack<PhysicalExec> stack, CONTEXT context) throws PhysicalPlanningException;
+  RESULT visitProjection(ProjectionExec exec, Stack<PhysicalExec> stack, CONTEXT context) throws PhysicalPlanningException;
+  RESULT visitHashJoin(HashJoinExec exec, Stack<PhysicalExec> stack, CONTEXT context) throws PhysicalPlanningException;
+  RESULT visitHashSemiJoin(HashSemiJoinExec exec, Stack<PhysicalExec> stack, CONTEXT context) throws PhysicalPlanningException;
+  RESULT visitHashAntiJoin(HashAntiJoinExec exec, Stack<PhysicalExec> stack, CONTEXT context) throws PhysicalPlanningException;
+  RESULT visitLimit(LimitExec exec, Stack<PhysicalExec> stack, CONTEXT context) throws PhysicalPlanningException;
+}

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/1b1d1e8c/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/PhysicalPlanUtil.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/PhysicalPlanUtil.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/PhysicalPlanUtil.java
new file mode 100644
index 0000000..bd773ed
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/PhysicalPlanUtil.java
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.planner.physical;
+
+import java.util.Stack;
+
+public class PhysicalPlanUtil {
+  public static <T extends PhysicalExec> T findExecutor(PhysicalExec plan, Class<? extends PhysicalExec> clazz)
+      throws PhysicalPlanningException {
+    return (T) new FindVisitor().visitChild(plan, new Stack<PhysicalExec>(), clazz);
+  }
+
+  private static class FindVisitor extends BasicPhysicalExecutorVisitor<Class<? extends PhysicalExec>, PhysicalExec> {
+    public PhysicalExec visitChild(PhysicalExec exec, Stack<PhysicalExec> stack, Class<? extends PhysicalExec> target)
+        throws PhysicalPlanningException {
+
+      if (target.isAssignableFrom(exec.getClass())) {
+        return exec;
+      } else {
+        return super.visitChild(exec, stack, target);
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/1b1d1e8c/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/PhysicalPlanningException.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/PhysicalPlanningException.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/PhysicalPlanningException.java
new file mode 100644
index 0000000..0d7554d
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/PhysicalPlanningException.java
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.planner.physical;
+
+import java.io.IOException;
+
+public class PhysicalPlanningException extends IOException {
+  public PhysicalPlanningException(String message) {
+    super(message);
+  }
+
+  public PhysicalPlanningException(IOException ioe) {
+    super(ioe);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/1b1d1e8c/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/SeqScanExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/SeqScanExec.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/SeqScanExec.java
index 9051219..2d736ce 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/SeqScanExec.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/SeqScanExec.java
@@ -136,6 +136,6 @@ public class SeqScanExec extends PhysicalExec {
   }
 
   public String getTableName() {
-    return plan.getTableId();
+    return plan.getTableName();
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/1b1d1e8c/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/rewrite/FilterPushDownRule.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/rewrite/FilterPushDownRule.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/rewrite/FilterPushDownRule.java
index 1f771e6..a33cbd7 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/rewrite/FilterPushDownRule.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/rewrite/FilterPushDownRule.java
@@ -43,8 +43,7 @@ public class FilterPushDownRule extends BasicLogicalPlanVisitor<List<EvalNode>>
   @Override
   public boolean isEligible(LogicalPlan plan) {
     for (LogicalPlan.QueryBlock block : plan.getQueryBlocks()) {
-      LogicalNode toBeOptimized = block.getRoot();
-      if (PlannerUtil.findTopNode(toBeOptimized, NodeType.SELECTION) != null) {
+      if (PlannerUtil.findTopNode(block.getRoot(), NodeType.SELECTION) != null) {
         return true;
       }
     }

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/1b1d1e8c/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/rewrite/ProjectionPushDownRule.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/rewrite/ProjectionPushDownRule.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/rewrite/ProjectionPushDownRule.java
index 4c2f9d7..1484f32 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/rewrite/ProjectionPushDownRule.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/rewrite/ProjectionPushDownRule.java
@@ -49,7 +49,7 @@ public class ProjectionPushDownRule extends BasicLogicalPlanVisitor<ProjectionPu
   public boolean isEligible(LogicalPlan plan) {
     LogicalNode toBeOptimized = plan.getRootBlock().getRoot();
 
-    if (PlannerUtil.checkIfDDLPlan(toBeOptimized) && !plan.getRootBlock().hasTableExpression()) {
+    if (PlannerUtil.checkIfDDLPlan(toBeOptimized) || !plan.getRootBlock().hasTableExpression()) {
       LOG.info("This query skips the logical optimization step.");
       return false;
     }
@@ -59,25 +59,29 @@ public class ProjectionPushDownRule extends BasicLogicalPlanVisitor<ProjectionPu
 
   @Override
   public LogicalPlan rewrite(LogicalPlan plan) throws PlanningException {
-    for (LogicalPlan.QueryBlock block : plan.getQueryBlocks()) {
-      NodeType nodeType = block.getRootType();
-      // skip a non-table-expression block.
-      if (!(nodeType == NodeType.INSERT || nodeType == NodeType.CREATE_TABLE || nodeType == NodeType.EXPRS)) {
-        Stack<LogicalNode> stack = new Stack<LogicalNode>();
-        PushDownContext context = new PushDownContext(block);
-        context.plan = plan;
-        if (block.getProjection() != null &&
-            block.getProjection().isAllProjected()) {
-          context.targetListManager = new TargetListManager(plan,
-              block.getProjectionNode().getTargets());
-        } else {
-          context.targetListManager= new TargetListManager(plan, block.getName());
-        }
-        context.upperRequired = new HashSet<Column>(block.getSchema().getColumns());
-        visitChild(plan, block.getRoot(), stack, context);
-      }
+    LogicalPlan.QueryBlock rootBlock = plan.getRootBlock();
+
+    LogicalPlan.QueryBlock topmostBlock;
+
+    // skip a non-table-expression block.
+    if (plan.getRootBlock().getRootType() == NodeType.INSERT) {
+      topmostBlock = plan.getChildBlocks(rootBlock).iterator().next();
+    } else {
+      topmostBlock = rootBlock;
     }
 
+    Stack<LogicalNode> stack = new Stack<LogicalNode>();
+    PushDownContext context = new PushDownContext(topmostBlock);
+    context.plan = plan;
+
+    if (topmostBlock.getProjection() != null && topmostBlock.getProjection().isAllProjected()) {
+      context.targetListManager = new TargetListManager(plan, topmostBlock.getProjectionNode().getTargets());
+    } else {
+      context.targetListManager= new TargetListManager(plan, topmostBlock.getName());
+    }
+
+    visitChild(plan, topmostBlock.getRoot(), stack, context);
+
     return plan;
   }
 
@@ -113,8 +117,24 @@ public class ProjectionPushDownRule extends BasicLogicalPlanVisitor<ProjectionPu
   @Override
   public LogicalNode visitProjection(LogicalPlan plan, ProjectionNode node, Stack<LogicalNode> stack,
                                         PushDownContext context) throws PlanningException {
-    for (Target target : node.getTargets()) {
-      context.upperRequired.add(target.getColumnSchema());
+    if (context.upperRequired == null) { // all projected
+      context.upperRequired = new HashSet<Column>();
+      for (Target target : node.getTargets()) {
+        context.upperRequired.add(target.getColumnSchema());
+      }
+    } else {
+      List<Target> projectedTarget = new ArrayList<Target>();
+      for (Target target : node.getTargets()) {
+        if (context.upperRequired.contains(target.getColumnSchema())) {
+          projectedTarget.add(target);
+        }
+      }
+      node.setTargets(projectedTarget.toArray(new Target[projectedTarget.size()]));
+
+      context.upperRequired = new HashSet<Column>();
+      for (Target target : node.getTargets()) {
+        context.upperRequired.add(target.getColumnSchema());
+      }
     }
 
     stack.push(node);
@@ -126,13 +146,14 @@ public class ProjectionPushDownRule extends BasicLogicalPlanVisitor<ProjectionPu
     // If all expressions are evaluated in the child operators and the last operator is projectable,
     // ProjectionNode will not be necessary. It eliminates ProjectionNode.
     if (context.targetListManager.isAllEvaluated() && (childNode instanceof Projectable)) {
+      child.setOutSchema(context.targetListManager.getUpdatedSchema());
       if (stack.isEmpty()) {
         // update the child node's output schemas
-        child.setOutSchema(context.targetListManager.getUpdatedSchema());
         context.queryBlock.setRoot(child);
+      } else if (stack.peek().getType() == NodeType.TABLE_SUBQUERY) {
+        ((TableSubQueryNode)stack.peek()).setSubQuery(childNode);
       } else {
         LogicalNode parent = stack.peek();
-        child.setOutSchema(context.targetListManager.getUpdatedSchema());
         PlannerUtil.deleteNode(parent, node);
       }
       return child;
@@ -242,6 +263,42 @@ public class ProjectionPushDownRule extends BasicLogicalPlanVisitor<ProjectionPu
   }
 
   @Override
+  public LogicalNode visitTableSubQuery(LogicalPlan plan, TableSubQueryNode node, Stack<LogicalNode> stack,
+                                        PushDownContext context) throws PlanningException {
+    LogicalPlan.QueryBlock subBlock = plan.getBlock(node.getSubQuery());
+    LogicalNode subRoot = subBlock.getRoot();
+
+    Stack<LogicalNode> newStack = new Stack<LogicalNode>();
+    newStack.push(node);
+    PushDownContext newContext = new PushDownContext(subBlock);
+    if (subBlock.getProjection() != null && subBlock.getProjection().isAllProjected()
+        && context.upperRequired.size() == 0) {
+      newContext.targetListManager = new TargetListManager(plan, subBlock.getProjectionNode().getTargets());
+    } else {
+     List<Target> projectedTarget = new ArrayList<Target>();
+      for (Target target : subBlock.getTargetListManager().getUnEvaluatedTargets()) {
+        for (Column column : context.upperRequired) {
+          if (column.hasQualifier() && !node.getTableName().equals(column.getQualifier())) {
+            continue;
+          }
+          if (target.getColumnSchema().getColumnName().equalsIgnoreCase(column.getColumnName())) {
+            projectedTarget.add(target);
+          }
+        }
+      }
+      newContext.targetListManager = new TargetListManager(plan, projectedTarget.toArray(new Target[projectedTarget.size()]));
+    }
+
+    newContext.upperRequired = new HashSet<Column>();
+    newContext.upperRequired.addAll(PlannerUtil.targetToSchema(newContext.targetListManager.getTargets()).getColumns());
+
+    LogicalNode child = visitChild(plan, subRoot, newStack, newContext);
+    newStack.pop();
+    node.setInSchema(PlannerUtil.getQualifiedSchema(child.getOutSchema(), node.getTableName()));
+    return pushDownProjectablePost(context, node, isTopmostProjectable(stack));
+  }
+
+  @Override
   public LogicalNode visitScan(LogicalPlan plan, ScanNode node, Stack<LogicalNode> stack, PushDownContext context)
       throws PlanningException {
     return pushDownProjectablePost(context, node, isTopmostProjectable(stack));
@@ -280,7 +337,7 @@ public class ProjectionPushDownRule extends BasicLogicalPlanVisitor<ProjectionPu
 
       if (!targetListManager.isEvaluated(i) && PlannerUtil.canBeEvaluated(expr, node)) {
 
-        if (node instanceof ScanNode) { // For ScanNode
+        if (node instanceof RelationNode) { // For ScanNode
 
           if (expr.getType() == EvalType.FIELD && !targetListManager.getTarget(i).hasAlias()) {
             targetListManager.setEvaluated(i);
@@ -306,7 +363,7 @@ public class ProjectionPushDownRule extends BasicLogicalPlanVisitor<ProjectionPu
 
     Projectable projectable = (Projectable) node;
     if (last) {
-      Preconditions.checkState(targetListManager.isAllEvaluated(), "Not all targets are evaluated.");
+      Preconditions.checkState(targetListManager.isAllEvaluated(), "Not all targets are evaluated");
       projectable.setTargets(targetListManager.getTargets());
       targetListManager.getUpdatedTarget();
       node.setOutSchema(targetListManager.getUpdatedSchema());
@@ -361,18 +418,38 @@ public class ProjectionPushDownRule extends BasicLogicalPlanVisitor<ProjectionPu
     return true;
   }
 
+  private TargetListManager buildSubBlockTargetList(LogicalPlan plan,
+      LogicalPlan.QueryBlock subQueryBlock, TableSubQueryNode subQueryNode, Set<Column> upperRequired) {
+    TargetListManager subBlockTargetList;
+    List<Target> projectedTarget = new ArrayList<Target>();
+    for (Target target : subQueryBlock.getTargetListManager().getUnEvaluatedTargets()) {
+      for (Column column : upperRequired) {
+        if (!subQueryNode.getTableName().equals(column.getQualifier())) {
+          continue;
+        }
+        if (target.getColumnSchema().getColumnName().equalsIgnoreCase(column.getColumnName())) {
+          projectedTarget.add(target);
+        }
+      }
+    }
+    subBlockTargetList = new TargetListManager(plan, projectedTarget.toArray(new Target[projectedTarget.size()]));
+    return subBlockTargetList;
+  }
+
   private BinaryNode pushDownSetNode(LogicalPlan plan, BinaryNode setNode, Stack<LogicalNode> stack,
                                             PushDownContext context) throws PlanningException {
 
-    LogicalPlan.QueryBlock leftBlock = plan.getBlock(setNode.getLeftChild());
+    LogicalPlan.QueryBlock currentBlock = plan.getBlock(setNode);
+    LogicalPlan.QueryBlock leftBlock = plan.getChildBlocks(currentBlock).get(0);
+    LogicalPlan.QueryBlock rightBlock = plan.getChildBlocks(currentBlock).get(1);
+
     PushDownContext leftContext = new PushDownContext(context, leftBlock);
-    leftContext.targetListManager = new TargetListManager(plan,
-        leftBlock.getTargetListManager().getUnEvaluatedTargets());
+    leftContext.targetListManager = buildSubBlockTargetList(plan, leftBlock,
+        (TableSubQueryNode) setNode.getLeftChild(), context.upperRequired);
 
-    LogicalPlan.QueryBlock rightBlock = plan.getBlock(setNode.getRightChild());
     PushDownContext rightContext = new PushDownContext(context, rightBlock);
-    rightContext.targetListManager = new TargetListManager(plan,
-        rightBlock.getTargetListManager().getUnEvaluatedTargets());
+    rightContext.targetListManager = buildSubBlockTargetList(plan, rightBlock,
+        (TableSubQueryNode) setNode.getRightChild(), context.upperRequired);
 
 
     stack.push(setNode);

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/1b1d1e8c/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/query/QueryUnitRequestImpl.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/query/QueryUnitRequestImpl.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/query/QueryUnitRequestImpl.java
index a0422af..0676277 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/query/QueryUnitRequestImpl.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/query/QueryUnitRequestImpl.java
@@ -18,6 +18,7 @@
 
 package org.apache.tajo.engine.query;
 
+import org.apache.tajo.DataChannel;
 import org.apache.tajo.QueryUnitAttemptId;
 import org.apache.tajo.ipc.TajoWorkerProtocol.Fetch;
 import org.apache.tajo.ipc.TajoWorkerProtocol.QueryUnitRequestProto;
@@ -42,6 +43,7 @@ public class QueryUnitRequestImpl implements QueryUnitRequest {
 	private List<Fetch> fetches;
   private Boolean shouldDie;
   private QueryContext queryContext;
+  private DataChannel dataChannel;
 	
 	private QueryUnitRequestProto proto = QueryUnitRequestProto.getDefaultInstance();
 	private QueryUnitRequestProto.Builder builder = null;
@@ -55,9 +57,9 @@ public class QueryUnitRequestImpl implements QueryUnitRequest {
 	
 	public QueryUnitRequestImpl(QueryUnitAttemptId id, List<Fragment> fragments,
 			String outputTable, boolean clusteredOutput,
-			String serializedData, QueryContext queryContext) {
+			String serializedData, QueryContext queryContext, DataChannel channel) {
 		this();
-		this.set(id, fragments, outputTable, clusteredOutput, serializedData, queryContext);
+		this.set(id, fragments, outputTable, clusteredOutput, serializedData, queryContext, channel);
 	}
 	
 	public QueryUnitRequestImpl(QueryUnitRequestProto proto) {
@@ -68,8 +70,8 @@ public class QueryUnitRequestImpl implements QueryUnitRequest {
 	}
 	
 	public void set(QueryUnitAttemptId id, List<Fragment> fragments,
-			String outputTable, boolean clusteredOutput, 
-			String serializedData, QueryContext queryContext) {
+			String outputTable, boolean clusteredOutput,
+			String serializedData, QueryContext queryContext, DataChannel dataChannel) {
 		this.id = id;
 		this.fragments = fragments;
 		this.outputTable = outputTable;
@@ -77,6 +79,8 @@ public class QueryUnitRequestImpl implements QueryUnitRequest {
 		this.serializedData = serializedData;
 		this.isUpdated = true;
     this.queryContext = queryContext;
+    this.queryContext = queryContext;
+    this.dataChannel = dataChannel;
 	}
 
 	@Override
@@ -198,6 +202,24 @@ public class QueryUnitRequestImpl implements QueryUnitRequest {
     maybeInitBuilder();
     this.queryContext = queryContext;
   }
+
+  public void setDataChannel(DataChannel dataChannel) {
+    maybeInitBuilder();
+    this.dataChannel = dataChannel;
+  }
+
+  @Override
+  public DataChannel getDataChannel() {
+    QueryUnitRequestProtoOrBuilder p = viaProto ? proto : builder;
+    if (dataChannel != null) {
+      return dataChannel;
+    }
+    if (!p.hasQueryContext()) {
+      return null;
+    }
+    this.dataChannel = new DataChannel(p.getDataChannel());
+    return this.dataChannel;
+  }
 	
 	public List<Fetch> getFetches() {
 	  initFetches();    
@@ -272,6 +294,9 @@ public class QueryUnitRequestImpl implements QueryUnitRequest {
     if (this.queryContext != null) {
       builder.setQueryContext(queryContext.getProto());
     }
+    if (this.dataChannel != null) {
+      builder.setDataChannel(dataChannel.getProto());
+    }
 	}
 
 	private void mergeLocalToProto() {

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/1b1d1e8c/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/query/ResultSetMetaDataImpl.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/query/ResultSetMetaDataImpl.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/query/ResultSetMetaDataImpl.java
index 9ebd158..7a9c4e0 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/query/ResultSetMetaDataImpl.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/query/ResultSetMetaDataImpl.java
@@ -179,7 +179,7 @@ public class ResultSetMetaDataImpl implements ResultSetMetaData {
    */
   @Override
   public String getTableName(int column) throws SQLException {
-    return meta.getSchema().getColumn(column - 1).getTableName();
+    return meta.getSchema().getColumn(column - 1).getQualifier();
   }
 
   /* (non-Javadoc)

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/1b1d1e8c/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/ipc/protocolrecords/QueryUnitRequest.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/ipc/protocolrecords/QueryUnitRequest.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/ipc/protocolrecords/QueryUnitRequest.java
index 0775c3c..a9f3706 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/ipc/protocolrecords/QueryUnitRequest.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/ipc/protocolrecords/QueryUnitRequest.java
@@ -21,6 +21,7 @@
  */
 package org.apache.tajo.ipc.protocolrecords;
 
+import org.apache.tajo.DataChannel;
 import org.apache.tajo.QueryUnitAttemptId;
 import org.apache.tajo.common.ProtoObject;
 import org.apache.tajo.ipc.TajoWorkerProtocol;
@@ -44,4 +45,5 @@ public interface QueryUnitRequest extends ProtoObject<TajoWorkerProtocol.QueryUn
   public boolean shouldDie();
   public void setShouldDie();
   public QueryContext getQueryContext();
+  public DataChannel getDataChannel();
 }

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/1b1d1e8c/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/ExecutionBlock.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/ExecutionBlock.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/ExecutionBlock.java
index a92ef75..d509156 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/ExecutionBlock.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/ExecutionBlock.java
@@ -17,10 +17,13 @@ package org.apache.tajo.master;
 import com.google.common.base.Preconditions;
 import org.apache.tajo.ExecutionBlockId;
 import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.engine.planner.PlannerUtil;
 import org.apache.tajo.engine.planner.logical.*;
 
 import java.util.*;
 
+import static org.apache.tajo.ipc.TajoWorkerProtocol.PartitionType;
+
 /**
  * A distributed execution plan (DEP) is a direct acyclic graph (DAG) of ExecutionBlocks.
  * An ExecutionBlock is a basic execution unit that could be distributed across a number of nodes.
@@ -29,17 +32,6 @@ import java.util.*;
  * In addition, it includes a logical plan to be executed in each node.
  */
 public class ExecutionBlock {
-
-  public static enum PartitionType {
-    /** for hash partitioning */
-    HASH,
-    LIST,
-    /** for map-side join */
-    BROADCAST,
-    /** for range partitioning */
-    RANGE
-  }
-
   private ExecutionBlockId executionBlockId;
   private LogicalNode plan = null;
   private StoreTableNode store = null;
@@ -47,9 +39,12 @@ public class ExecutionBlock {
   private ExecutionBlock parent;
   private Map<ScanNode, ExecutionBlock> childSubQueries = new HashMap<ScanNode, ExecutionBlock>();
   private PartitionType outputType;
+
   private boolean hasJoinPlan;
   private boolean hasUnionPlan;
 
+  private Set<String> broadcasted = new HashSet<String>();
+
   public ExecutionBlock(ExecutionBlockId executionBlockId) {
     this.executionBlockId = executionBlockId;
   }
@@ -58,10 +53,6 @@ public class ExecutionBlock {
     return executionBlockId;
   }
 
-  public String getOutputName() {
-    return store.getTableName();
-  }
-
   public void setPartitionType(PartitionType partitionType) {
     this.outputType = partitionType;
   }
@@ -72,10 +63,9 @@ public class ExecutionBlock {
 
   public void setPlan(LogicalNode plan) {
     hasJoinPlan = false;
-    Preconditions.checkArgument(plan.getType() == NodeType.STORE);
-
+    hasUnionPlan = false;
+    this.scanlist.clear();
     this.plan = plan;
-    store = (StoreTableNode) plan;
 
     LogicalNode node = plan;
     ArrayList<LogicalNode> s = new ArrayList<LogicalNode>();
@@ -96,6 +86,9 @@ public class ExecutionBlock {
         s.add(s.size(), binary.getRightChild());
       } else if (node instanceof ScanNode) {
         scanlist.add((ScanNode)node);
+      } else if (node instanceof TableSubQueryNode) {
+        TableSubQueryNode subQuery = (TableSubQueryNode) node;
+        s.add(s.size(), subQuery.getSubQuery());
       }
     }
   }
@@ -105,6 +98,10 @@ public class ExecutionBlock {
     return plan;
   }
 
+  public boolean isRoot() {
+    return !hasParentBlock() || !(getParentBlock().hasParentBlock()) && getParentBlock().hasUnion();
+  }
+
   public boolean hasParentBlock() {
     return parent != null;
   }
@@ -173,4 +170,24 @@ public class ExecutionBlock {
   public boolean hasUnion() {
     return hasUnionPlan;
   }
+
+  public void addBroadcastTables(Collection<String> tableNames) {
+    broadcasted.addAll(tableNames);
+  }
+
+  public void addBroadcastTable(String tableName) {
+    broadcasted.add(tableName);
+  }
+
+  public boolean isBroadcastTable(String tableName) {
+    return broadcasted.contains(tableName);
+  }
+
+  public Collection<String> getBroadcastTables() {
+    return broadcasted;
+  }
+
+  public String toString() {
+    return executionBlockId.toString();
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/1b1d1e8c/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/ExecutionBlockCursor.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/ExecutionBlockCursor.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/ExecutionBlockCursor.java
index fd3ae1e..51c825c 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/ExecutionBlockCursor.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/ExecutionBlockCursor.java
@@ -25,10 +25,12 @@ import java.util.Iterator;
  * For each call of nextBlock(), it retrieves a next ExecutionBlock in a postfix order.
  */
 public class ExecutionBlockCursor {
+  private MasterPlan masterPlan;
   private ArrayList<ExecutionBlock> orderedBlocks = new ArrayList<ExecutionBlock>();
   private int cursor = 0;
 
   public ExecutionBlockCursor(MasterPlan plan) {
+    this.masterPlan = plan;
     buildOrder(plan.getRoot());
   }
 
@@ -37,26 +39,14 @@ public class ExecutionBlockCursor {
   }
 
   private void buildOrder(ExecutionBlock current) {
-    if (current.hasChildBlock()) {
-      if (current.getChildNum() == 1) {
-        ExecutionBlock block = current.getChildBlocks().iterator().next();
+    if (!masterPlan.isLeaf(current.getId())) {
+      if (masterPlan.getChildCount(current.getId()) == 1) {
+        ExecutionBlock block = masterPlan.getChild(current, 0);
         buildOrder(block);
       } else {
-        Iterator<ExecutionBlock> it = current.getChildBlocks().iterator();
-        ExecutionBlock outer = it.next();
-        ExecutionBlock inner = it.next();
-
-        // Switch between outer and inner
-        // if an inner has a child and an outer doesn't.
-        // It is for left-deep-first search.
-        if (!outer.hasChildBlock() && inner.hasChildBlock()) {
-          ExecutionBlock tmp = outer;
-          outer = inner;
-          inner = tmp;
+        for (ExecutionBlock exec : masterPlan.getChilds(current)) {
+          buildOrder(exec);
         }
-
-        buildOrder(outer);
-        buildOrder(inner);
       }
     }
     orderedBlocks.add(current);

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/1b1d1e8c/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/GlobalEngine.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/GlobalEngine.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/GlobalEngine.java
index 2ddd891..5080599 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/GlobalEngine.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/GlobalEngine.java
@@ -55,7 +55,6 @@ import org.apache.tajo.storage.StorageUtil;
 import java.io.IOException;
 import java.sql.SQLException;
 import java.util.ArrayList;
-import java.util.Collection;
 import java.util.List;
 
 import static org.apache.tajo.ipc.ClientProtos.GetQueryStatusResponse;
@@ -107,7 +106,8 @@ public class GlobalEngine extends AbstractService {
       NoSuchQueryIdException, IllegalQueryStatusException,
       UnknownWorkerException, EmptyClusterException {
 
-    LOG.info(">>>>>SQL: " + sql);
+    LOG.info("SQL: " + sql);
+    QueryContext queryContext = new QueryContext();
 
     try {
       // setting environment variables
@@ -127,6 +127,10 @@ public class GlobalEngine extends AbstractService {
       final boolean hiveQueryMode = context.getConf().getBoolVar(TajoConf.ConfVars.HIVE_QUERY_MODE);
       LOG.info("hive.query.mode:" + hiveQueryMode);
 
+      if (hiveQueryMode) {
+        queryContext.setHiveQueryMode();
+      }
+
       Expr planningContext = hiveQueryMode ? converter.parse(sql) : analyzer.parse(sql);
       
       LogicalPlan plan = createLogicalPlan(planningContext);
@@ -139,7 +143,6 @@ public class GlobalEngine extends AbstractService {
         responseBuilder.setResultCode(ClientProtos.ResultCode.OK);
         responseBuilder.setState(TajoProtos.QueryState.QUERY_SUCCEEDED);
       } else {
-        QueryContext queryContext = new QueryContext();
         hookManager.doHooks(queryContext, plan);
 
         QueryJobManager queryJobManager = this.context.getQueryJobManager();
@@ -300,7 +303,7 @@ public class GlobalEngine extends AbstractService {
     void hook(QueryContext queryContext, LogicalPlan plan) throws Exception;
   }
 
-  public class DistributedQueryHookManager {
+  public static class DistributedQueryHookManager {
     private List<DistributedQueryHook> hooks = new ArrayList<DistributedQueryHook>();
     public void addHook(DistributedQueryHook hook) {
       hooks.add(hook);
@@ -319,7 +322,7 @@ public class GlobalEngine extends AbstractService {
     }
   }
 
-  private class CreateTableHook implements DistributedQueryHook {
+  public class CreateTableHook implements DistributedQueryHook {
 
     @Override
     public boolean isEligible(QueryContext queryContext, LogicalPlan plan) {
@@ -341,7 +344,7 @@ public class GlobalEngine extends AbstractService {
     }
   }
 
-  private class InsertHook implements DistributedQueryHook {
+  public static class InsertHook implements DistributedQueryHook {
 
     @Override
     public boolean isEligible(QueryContext queryContext, LogicalPlan plan) {
@@ -416,10 +419,8 @@ public class GlobalEngine extends AbstractService {
         ProjectionNode projectionNode = new ProjectionNode(targets);
         projectionNode.setInSchema(insertNode.getSubQuery().getOutSchema());
         projectionNode.setOutSchema(PlannerUtil.targetToSchema(targets));
-        Collection<QueryBlockGraph.BlockEdge> edges = plan.getConnectedBlocks(LogicalPlan.ROOT_BLOCK);
-        LogicalPlan.QueryBlock block = plan.getBlock(edges.iterator().next().getTargetBlock());
-        projectionNode.setChild(block.getRoot());
-
+        List<LogicalPlan.QueryBlock> blocks = plan.getChildBlocks(plan.getRootBlock());
+        projectionNode.setChild(blocks.get(0).getRoot());
 
         storeNode.setOutSchema(projectionNode.getOutSchema());
         storeNode.setInSchema(projectionNode.getOutSchema());
@@ -427,12 +428,10 @@ public class GlobalEngine extends AbstractService {
       } else {
         storeNode.setOutSchema(subQueryOutSchema);
         storeNode.setInSchema(subQueryOutSchema);
-        Collection<QueryBlockGraph.BlockEdge> edges = plan.getConnectedBlocks(LogicalPlan.ROOT_BLOCK);
-        LogicalPlan.QueryBlock block = plan.getBlock(edges.iterator().next().getTargetBlock());
-        storeNode.setChild(block.getRoot());
+        List<LogicalPlan.QueryBlock> childBlocks = plan.getChildBlocks(plan.getRootBlock());
+        storeNode.setChild(childBlocks.get(0).getRoot());
       }
 
-      storeNode.setListPartition();
       if (insertNode.hasStorageType()) {
         storeNode.setStorageType(insertNode.getStorageType());
       }