You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tajo.apache.org by hy...@apache.org on 2013/09/16 13:33:31 UTC

[4/7] TAJO-184: Refactor GlobalPlanner and global plan data structure. (hyunsik)

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/1b1d1e8c/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/GlobalPlanner.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/GlobalPlanner.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/GlobalPlanner.java
index 7c374d2..d5e51ef 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/GlobalPlanner.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/GlobalPlanner.java
@@ -18,681 +18,432 @@
 
 package org.apache.tajo.master;
 
+import com.google.common.base.Preconditions;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.yarn.event.EventHandler;
-import org.apache.tajo.ExecutionBlockId;
-import org.apache.tajo.QueryId;
-import org.apache.tajo.QueryIdFactory;
+import org.apache.hadoop.fs.Path;
+import org.apache.tajo.DataChannel;
 import org.apache.tajo.catalog.*;
-import org.apache.tajo.catalog.proto.CatalogProtos.StoreType;
 import org.apache.tajo.conf.TajoConf;
-import org.apache.tajo.engine.planner.FromTable;
-import org.apache.tajo.engine.planner.PlannerUtil;
+import org.apache.tajo.engine.planner.*;
 import org.apache.tajo.engine.planner.global.MasterPlan;
 import org.apache.tajo.engine.planner.logical.*;
-import org.apache.tajo.master.ExecutionBlock.PartitionType;
 import org.apache.tajo.storage.AbstractStorageManager;
-import org.apache.tajo.util.TajoIdUtils;
 
 import java.io.IOException;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
+import java.util.*;
 
-import static org.apache.tajo.conf.TajoConf.ConfVars;
+import static org.apache.tajo.ipc.TajoWorkerProtocol.PartitionType.*;
 
 public class GlobalPlanner {
   private static Log LOG = LogFactory.getLog(GlobalPlanner.class);
 
   private TajoConf conf;
   private AbstractStorageManager sm;
-  private QueryId queryId;
 
-  public GlobalPlanner(final TajoConf conf,
-                       final AbstractStorageManager sm,
-                       final EventHandler eventHandler)
+  public GlobalPlanner(final TajoConf conf, final AbstractStorageManager sm)
       throws IOException {
     this.conf = conf;
     this.sm = sm;
   }
 
+  public class GlobalPlanContext {
+    MasterPlan plan;
+    Set<String> broadcastTables = new HashSet<String>();
+    LogicalNode topmost;
+    LogicalNode lastRepartionableNode;
+    ExecutionBlock topMostLeftExecBlock;
+    ExecutionBlock topMostRightExecBlock;
+  }
+
   /**
    * Builds a master plan from the given logical plan.
-   * @param queryId
-   * @param rootNode
-   * @return
-   * @throws IOException
    */
-  public MasterPlan build(QueryId queryId, LogicalRootNode rootNode)
-      throws IOException {
-    this.queryId = queryId;
-
-    String outputTableName = null;
-    if (rootNode.getChild().getType() == NodeType.STORE) {
-      // create table queries are executed by the master
-      StoreTableNode storeTableNode = (StoreTableNode) rootNode.getChild();
-      outputTableName = storeTableNode.getTableName();
+  public void build(MasterPlan masterPlan)
+      throws IOException, PlanningException {
+
+    NewPlanner planner = new NewPlanner();
+    GlobalPlanContext globalPlanContext = new GlobalPlanContext();
+    globalPlanContext.plan = masterPlan;
+    LOG.info(masterPlan.getLogicalPlan());
+
+    LogicalNode rootNode = PlannerUtil.clone(masterPlan.getLogicalPlan().getRootBlock().getRoot());
+    planner.visitChild(masterPlan.getLogicalPlan(), rootNode, new Stack<LogicalNode>(), globalPlanContext);
+
+    ExecutionBlock terminalBlock = masterPlan.createTerminalBlock();
+
+    if (globalPlanContext.lastRepartionableNode != null
+        && globalPlanContext.lastRepartionableNode.getType() == NodeType.UNION) {
+      UnionNode unionNode = (UnionNode) globalPlanContext.lastRepartionableNode;
+      ConsecutiveUnionFinder finder = new ConsecutiveUnionFinder();
+      UnionsFinderContext finderContext = new UnionsFinderContext();
+      finder.visitChild(masterPlan.getLogicalPlan(), unionNode, new Stack<LogicalNode>(), finderContext);
+
+      for (UnionNode union : finderContext.unionList) {
+        TableSubQueryNode leftSubQuery = union.getLeftChild();
+        TableSubQueryNode rightSubQuery = union.getRightChild();
+        if (leftSubQuery.getSubQuery().getType() != NodeType.UNION) {
+          ExecutionBlock execBlock = masterPlan.newExecutionBlock();
+          execBlock.setPlan(leftSubQuery);
+          DataChannel dataChannel = new DataChannel(execBlock, terminalBlock, NONE_PARTITION, 1);
+          masterPlan.addConnect(dataChannel);
+        }
+        if (rightSubQuery.getSubQuery().getType() != NodeType.UNION) {
+          ExecutionBlock execBlock = masterPlan.newExecutionBlock();
+          execBlock.setPlan(rightSubQuery);
+          DataChannel dataChannel = new DataChannel(execBlock, terminalBlock, NONE_PARTITION, 1);
+          masterPlan.addConnect(dataChannel);
+        }
+      }
+    } else {
+      DataChannel dataChannel = new DataChannel(globalPlanContext.topMostLeftExecBlock, terminalBlock, NONE_PARTITION, 1);
+      dataChannel.setSchema(globalPlanContext.topmost.getOutSchema());
+      masterPlan.addConnect(dataChannel);
     }
+    masterPlan.setTerminal(terminalBlock);
+    LOG.info(masterPlan);
+  }
 
-    // insert store at the subnode of the root
-    UnaryNode root = rootNode;
-    if (root.getChild().getType() != NodeType.STORE) {
-      ExecutionBlockId executionBlockId = QueryIdFactory.newExecutionBlockId(this.queryId);
-      outputTableName = executionBlockId.toString();
-      insertStore(executionBlockId.toString(),root).setLocal(false);
+  public static ScanNode buildInputExecutor(DataChannel channel) {
+    Preconditions.checkArgument(channel.getSchema() != null,
+        "Channel schema (" + channel.getSrcId().getId() +" -> "+ channel.getTargetId().getId()+") is not initialized");
+    TableMeta meta = new TableMetaImpl(channel.getSchema(), channel.getStoreType(), new Options());
+    TableDesc desc = new TableDescImpl(channel.getSrcId().toString(), meta, new Path("/"));
+    return new ScanNode(new FromTable(desc));
+  }
+
+  public class NewPlanner extends BasicLogicalPlanVisitor<GlobalPlanContext> {
+
+    @Override
+    public LogicalNode visitRoot(LogicalPlan plan, LogicalRootNode node, Stack<LogicalNode> stack,
+                                 GlobalPlanContext data) throws PlanningException {
+      super.visitRoot(plan, node, stack, data);
+
+      if (data.lastRepartionableNode != null && data.lastRepartionableNode.getType() != NodeType.UNION) {
+        data.topMostLeftExecBlock = addChannel(data.plan, data.lastRepartionableNode, node, data.topmost, data.topMostLeftExecBlock);
+      } else if (data.lastRepartionableNode != null && data.lastRepartionableNode.getType() == NodeType.UNION) {
+
+      } else {
+        ExecutionBlock execBlock = data.plan.newExecutionBlock();
+        execBlock.setPlan(node);
+        data.topMostLeftExecBlock = execBlock;
+      }
+
+      data.topmost = node;
+      return node;
     }
-    
-    // convert 2-phase plan
-    LogicalNode twoPhased = convertTo2Phase(rootNode);
 
-    // make query graph
-    MasterPlan globalPlan = convertToGlobalPlan(twoPhased);
-    globalPlan.setOutputTableName(outputTableName);
+    @Override
+    public LogicalNode visitProjection(LogicalPlan plan, ProjectionNode node, Stack<LogicalNode> stack,
+                                       GlobalPlanContext data) throws PlanningException {
+      super.visitProjection(plan, node, stack, data);
+      data.topmost = node;
+      return node;
+    }
 
-    return globalPlan;
-  }
-  
-  private StoreTableNode insertStore(String tableId, LogicalNode parent) {
-    StoreTableNode store = new StoreTableNode(tableId);
-    store.setLocal(true);
-    PlannerUtil.insertNode(parent, store);
-    return store;
-  }
-  
-  /**
-   * Transforms a logical plan to a two-phase plan. 
-   * Store nodes are inserted for every logical nodes except store and scan nodes
-   */
-  private class GlobalPlanBuilder implements LogicalNodeVisitor {
     @Override
-    public void visit(LogicalNode node) {
-      String tableId;
-      StoreTableNode store;
-      if (node.getType() == NodeType.GROUP_BY) {
-        // transform group by to two-phase plan 
-        GroupbyNode groupby = (GroupbyNode) node;
-        // insert a store for the child of first group by
-        if (groupby.getChild().getType() != NodeType.UNION &&
-            groupby.getChild().getType() != NodeType.STORE &&
-            groupby.getChild().getType() != NodeType.SCAN) {
-          tableId = QueryIdFactory.newExecutionBlockId(queryId).toString();
-          insertStore(tableId, groupby);
-        }
-        tableId = QueryIdFactory.newExecutionBlockId(queryId).toString();
-        // insert (a store for the first group by) and (a second group by)
-        PlannerUtil.transformGroupbyTo2PWithStore((GroupbyNode)node, tableId);
-      } else if (node.getType() == NodeType.SORT) {
-        // transform sort to two-phase plan 
-        SortNode sort = (SortNode) node;
-        // insert a store for the child of first sort
-        if (sort.getChild().getType() != NodeType.UNION &&
-            sort.getChild().getType() != NodeType.STORE &&
-            sort.getChild().getType() != NodeType.SCAN) {
-          tableId = QueryIdFactory.newExecutionBlockId(queryId).toString();
-          insertStore(tableId, sort);
-        }
-        tableId = QueryIdFactory.newExecutionBlockId(queryId).toString();
-        // insert (a store for the first sort) and (a second sort)
-        PlannerUtil.transformSortTo2PWithStore((SortNode)node, tableId);
-      } else if (node.getType() == NodeType.JOIN) {
-        // transform join to two-phase plan 
-        // the first phase of two-phase join can be any logical nodes
-        JoinNode join = (JoinNode) node;
-
-        if (join.getRightChild().getType() == NodeType.SCAN &&
-            join.getLeftChild().getType() == NodeType.SCAN) {
-          ScanNode outerScan = (ScanNode) join.getRightChild();
-          ScanNode innerScan = (ScanNode) join.getLeftChild();
-
-
-          TableMeta outerMeta = outerScan.getFromTable().getTableDesc().getMeta();
-          TableMeta innerMeta = innerScan.getFromTable().getTableDesc().getMeta();
-          long threshold = conf.getLongVar(ConfVars.BROADCAST_JOIN_THRESHOLD);
-
-
-          // if the broadcast join is available
-          boolean outerSmall = false;
-          boolean innerSmall = false;
-          if (!outerScan.isLocal() && outerMeta.getStat() != null &&
-              outerMeta.getStat().getNumBytes() <= threshold) {
-            outerSmall = true;
-            LOG.info("The relation (" + outerScan.getTableId() +
-                ") is less than " + threshold);
+    public LogicalNode visitLimit(LogicalPlan plan, LimitNode node, Stack<LogicalNode> stack, GlobalPlanContext data) throws PlanningException {
+      super.visitLimit(plan, node, stack, data);
+      data.topmost = node;
+      return node;
+    }
+
+    private ExecutionBlock addChannel(MasterPlan masterPlan, LogicalNode lastDistNode, LogicalNode curNode,
+                                      LogicalNode childNode, ExecutionBlock lastChildBlock) throws PlanningException {
+      ExecutionBlock currentBlock = null;
+      ExecutionBlock childBlock;
+
+      childBlock = lastChildBlock;
+
+      NodeType shuffleRequiredNodeType = lastDistNode.getType();
+      if (shuffleRequiredNodeType == NodeType.GROUP_BY) {
+        GroupbyNode groupByNode = (GroupbyNode) lastDistNode;
+
+        GroupbyNode firstGroupBy = PlannerUtil.transformGroupbyTo2P(groupByNode);
+
+        if (firstGroupBy.getChild().getType() == NodeType.TABLE_SUBQUERY &&
+            ((TableSubQueryNode)firstGroupBy.getChild()).getSubQuery().getType() == NodeType.UNION) {
+
+          UnionNode unionNode = PlannerUtil.findTopNode(groupByNode, NodeType.UNION);
+          ConsecutiveUnionFinder finder = new ConsecutiveUnionFinder();
+          UnionsFinderContext finderContext = new UnionsFinderContext();
+          finder.visitChild(masterPlan.getLogicalPlan(), unionNode, new Stack<LogicalNode>(), finderContext);
+
+          currentBlock = masterPlan.newExecutionBlock();
+          GroupbyNode secondGroupBy = groupByNode;
+          for (UnionNode union : finderContext.unionList) {
+            TableSubQueryNode leftSubQuery = union.getLeftChild();
+            TableSubQueryNode rightSubQuery = union.getRightChild();
+            DataChannel dataChannel;
+            if (leftSubQuery.getSubQuery().getType() != NodeType.UNION) {
+              ExecutionBlock execBlock = masterPlan.newExecutionBlock();
+              GroupbyNode g1 = PlannerUtil.clone(firstGroupBy);
+              g1.setChild(leftSubQuery);
+              execBlock.setPlan(g1);
+              dataChannel = new DataChannel(execBlock, currentBlock, HASH_PARTITION, 32);
+
+              ScanNode scanNode = buildInputExecutor(dataChannel);
+              secondGroupBy.setChild(scanNode);
+              masterPlan.addConnect(dataChannel);
+            }
+            if (rightSubQuery.getSubQuery().getType() != NodeType.UNION) {
+              ExecutionBlock execBlock = masterPlan.newExecutionBlock();
+              GroupbyNode g1 = PlannerUtil.clone(firstGroupBy);
+              g1.setChild(rightSubQuery);
+              execBlock.setPlan(g1);
+              dataChannel = new DataChannel(execBlock, currentBlock, HASH_PARTITION, 32);
+
+              ScanNode scanNode = buildInputExecutor(dataChannel);
+              secondGroupBy.setChild(scanNode);
+              masterPlan.addConnect(dataChannel);
+            }
           }
-          if (!innerScan.isLocal() && innerMeta.getStat() != null &&
-              innerMeta.getStat().getNumBytes() <= threshold) {
-            innerSmall = true;
-            LOG.info("The relation (" + innerScan.getTableId() +
-                ") is less than " + threshold);
+          LogicalNode parent = PlannerUtil.findTopParentNode(curNode, lastDistNode.getType());
+          if (parent instanceof UnaryNode && parent != secondGroupBy) {
+            ((UnaryNode)parent).setChild(secondGroupBy);
           }
+          currentBlock.setPlan(curNode);
+        } else {
 
-          if (outerSmall && innerSmall) {
-            if (outerMeta.getStat().getNumBytes() <=
-                innerMeta.getStat().getNumBytes()) {
-              outerScan.setBroadcast();
-              LOG.info("The relation " + outerScan.getTableId()
-                  + " is broadcasted");
-            } else {
-              innerScan.setBroadcast();
-              LOG.info("The relation " + innerScan.getTableId()
-                  + " is broadcasted");
-            }
+          if (childBlock == null) { // first repartition node
+            childBlock = masterPlan.newExecutionBlock();
+          }
+          childBlock.setPlan(firstGroupBy);
+
+          currentBlock = masterPlan.newExecutionBlock();
+
+          DataChannel channel;
+          if (firstGroupBy.isEmptyGrouping()) {
+            channel = new DataChannel(childBlock, currentBlock, HASH_PARTITION, 1);
+            channel.setPartitionKey(firstGroupBy.getGroupingColumns());
           } else {
-            if (outerSmall) {
-              outerScan.setBroadcast();
-              LOG.info("The relation (" + outerScan.getTableId()
-                  + ") is broadcasted");
-            } else if (innerSmall) {
-              innerScan.setBroadcast();
-              LOG.info("The relation (" + innerScan.getTableId()
-                  + ") is broadcasted");
-            }
+            channel = new DataChannel(childBlock, currentBlock, HASH_PARTITION, 32);
+            channel.setPartitionKey(firstGroupBy.getGroupingColumns());
           }
+          channel.setSchema(firstGroupBy.getOutSchema());
 
-          if (outerScan.isBroadcast() || innerScan.isBroadcast()) {
-            return;
+          GroupbyNode secondGroupBy = groupByNode;
+          ScanNode scanNode = buildInputExecutor(channel);
+          secondGroupBy.setChild(scanNode);
+
+          LogicalNode parent = PlannerUtil.findTopParentNode(curNode, lastDistNode.getType());
+          if (parent instanceof UnaryNode && parent != secondGroupBy) {
+            ((UnaryNode)parent).setChild(secondGroupBy);
           }
-        }
 
-        // insert stores for the first phase
-        if (join.getLeftChild().getType() != NodeType.UNION &&
-            join.getLeftChild().getType() != NodeType.STORE) {
-          tableId = QueryIdFactory.newExecutionBlockId(queryId).toString();
-          store = new StoreTableNode(tableId);
-          store.setLocal(true);
-          PlannerUtil.insertOuterNode(node, store);
+          masterPlan.addConnect(channel);
+          currentBlock.setPlan(curNode);
         }
-        if (join.getRightChild().getType() != NodeType.UNION &&
-            join.getRightChild().getType() != NodeType.STORE) {
-          tableId = QueryIdFactory.newExecutionBlockId(queryId).toString();
-          store = new StoreTableNode(tableId);
-          store.setLocal(true);
-          PlannerUtil.insertInnerNode(node, store);
+      } else if (shuffleRequiredNodeType == NodeType.SORT) {
+        SortNode firstSort = (SortNode) lastDistNode;
+        if (childBlock == null) {
+          childBlock = masterPlan.newExecutionBlock();
         }
-      } else if (node.getType() == NodeType.UNION) {
-        // not two-phase transform
-        UnionNode union = (UnionNode) node;
-        // insert stores
-        if (union.getLeftChild().getType() != NodeType.UNION &&
-            union.getLeftChild().getType() != NodeType.STORE) {
-          tableId = QueryIdFactory.newExecutionBlockId(queryId).toString();
-          store = new StoreTableNode(tableId);
-          if(union.getLeftChild().getType() == NodeType.GROUP_BY) {
-            /*This case is for cube by operator
-             * TODO : more complicated conidtion*/
-            store.setLocal(true);
+        childBlock.setPlan(firstSort);
+
+        currentBlock = masterPlan.newExecutionBlock();
+        DataChannel channel = new DataChannel(childBlock, currentBlock, RANGE_PARTITION, 32);
+        channel.setPartitionKey(PlannerUtil.sortSpecsToSchema(firstSort.getSortKeys()).toArray());
+        channel.setSchema(childNode.getOutSchema());
+
+        SortNode secondSort = PlannerUtil.clone(lastDistNode);
+        ScanNode secondScan = buildInputExecutor(channel);
+        secondSort.setChild(secondScan);
+
+        LimitNode limitAndSort;
+        LimitNode limitOrNull = PlannerUtil.findTopNode(curNode, NodeType.LIMIT);
+        if (limitOrNull != null) {
+          limitAndSort = PlannerUtil.clone(limitOrNull);
+          limitAndSort.setChild(firstSort);
+
+          if (childBlock.getPlan().getType() == NodeType.SORT) {
+            childBlock.setPlan(limitAndSort);
           } else {
-            /* This case is for union query*/
-            store.setLocal(false);
+            LogicalNode sortParent = PlannerUtil.findTopParentNode(childBlock.getPlan(), NodeType.SORT);
+            if (sortParent != null) {
+              if (sortParent instanceof UnaryNode) {
+                ((UnaryNode)sortParent).setChild(limitAndSort);
+              }
+            }
           }
-          PlannerUtil.insertOuterNode(node, store);
         }
-        if (union.getRightChild().getType() != NodeType.UNION &&
-            union.getRightChild().getType() != NodeType.STORE) {
-          tableId = QueryIdFactory.newExecutionBlockId(queryId).toString();
-          store = new StoreTableNode(tableId);
-          if(union.getRightChild().getType() == NodeType.GROUP_BY) {
-            /*This case is for cube by operator
-             * TODO : more complicated conidtion*/
-            store.setLocal(true);
-          }else {
-            /* This case is for union query*/
-            store.setLocal(false);
-          }
-          PlannerUtil.insertInnerNode(node, store);
+
+        LogicalNode parent = PlannerUtil.findTopParentNode(curNode, lastDistNode.getType());
+        if (parent instanceof UnaryNode && parent != secondSort) {
+          ((UnaryNode)parent).setChild(secondSort);
         }
-      } else if (node instanceof UnaryNode) {
-        UnaryNode unary = (UnaryNode)node;
-        if (unary.getType() != NodeType.STORE &&
-            unary.getChild().getType() != NodeType.STORE) {
-          tableId = QueryIdFactory.newExecutionBlockId(queryId).toString();
-          insertStore(tableId, unary);
+
+        masterPlan.addConnect(channel);
+        currentBlock.setPlan(curNode);
+      } else if (shuffleRequiredNodeType == NodeType.JOIN) {
+        JoinNode joinNode = (JoinNode) lastDistNode;
+        LogicalNode leftNode = joinNode.getLeftChild();
+        LogicalNode rightNode = joinNode.getRightChild();
+
+        ExecutionBlock leftBlock = null;
+        if (lastChildBlock == null) {
+          leftBlock = masterPlan.newExecutionBlock();
+          leftBlock.setPlan(leftNode);
+        } else {
+          leftBlock = lastChildBlock;
         }
+        ExecutionBlock rightBlock = masterPlan.newExecutionBlock();
+        rightBlock.setPlan(rightNode);
+
+        currentBlock = masterPlan.newExecutionBlock();
+
+        DataChannel leftChannel = new DataChannel(leftBlock, currentBlock, HASH_PARTITION, 32);
+        DataChannel rightChannel = new DataChannel(rightBlock, currentBlock, HASH_PARTITION, 32);
+
+        ScanNode leftScan = buildInputExecutor(leftChannel);
+        ScanNode rightScan = buildInputExecutor(rightChannel);
+
+        joinNode.setLeftChild(leftScan);
+        joinNode.setRightChild(rightScan);
+        currentBlock.setPlan(joinNode);
+
+        masterPlan.addConnect(leftChannel);
+        masterPlan.addConnect(rightChannel);
       }
+
+      return currentBlock;
     }
-  }
 
-  /**
-   * Convert the logical plan to a two-phase plan by the post-order traverse.
-   * 
-   * @param logicalPlan
-   * @return
-   */
-  private LogicalNode convertTo2Phase(LogicalNode logicalPlan) {
-    LogicalRootNode root = (LogicalRootNode) logicalPlan;
-    root.postOrder(new GlobalPlanBuilder());
-    return logicalPlan;
-  }
-  
-  private Map<StoreTableNode, ExecutionBlock> convertMap =
-      new HashMap<StoreTableNode, ExecutionBlock>();
-  
-  /**
-   * Logical plan을 후위 탐색하면서 SubQuery 생성
-   * 
-   * @param node 현재 방문 중인 노드
-   * @throws IOException
-   */
-  private void recursiveBuildSubQuery(LogicalNode node)
-      throws IOException {
-    ExecutionBlock subQuery;
-    StoreTableNode store;
-    if (node instanceof UnaryNode) {
-      recursiveBuildSubQuery(((UnaryNode) node).getChild());
-      
-      if (node.getType() == NodeType.STORE) {
-        store = (StoreTableNode) node;
-        ExecutionBlockId id;
-        if (store.getTableName().startsWith(ExecutionBlockId.EB_ID_PREFIX)) {
-          id = TajoIdUtils.createExecutionBlockId(store.getTableName());
-        } else {
-          id = QueryIdFactory.newExecutionBlockId(queryId);
-        }
-        subQuery = new ExecutionBlock(id);
-
-        switch (store.getChild().getType()) {
-        case BST_INDEX_SCAN:
-        case SCAN:  // store - scan
-          subQuery = makeScanSubQuery(subQuery);
-          subQuery.setPlan(node);
-          break;
-        case SELECTION:
-        case PROJECTION:
-        case LIMIT:
-          subQuery = makeUnarySubQuery(store, node, subQuery);
-          subQuery.setPlan(node);
-          break;
-        case GROUP_BY:
-          subQuery = makeGroupbySubQuery(store, node, subQuery);
-          subQuery.setPlan(node);
-          break;
-        case SORT:
-          subQuery = makeSortSubQuery(store, node, subQuery);
-          subQuery.setPlan(node);
-          break;
-        case JOIN:  // store - join
-          subQuery = makeJoinSubQuery(store, node, subQuery);
-          subQuery.setPlan(node);
-          break;
-        case UNION:
-          subQuery = makeUnionSubQuery(store, node, subQuery);
-          subQuery.setPlan(node);
-          break;
-        default:
-          subQuery = null;
-          break;
-        }
+    @Override
+    public LogicalNode visitSort(LogicalPlan plan, SortNode node, Stack<LogicalNode> stack, GlobalPlanContext data)
+        throws PlanningException {
 
-        convertMap.put(store, subQuery);
+      super.visitSort(plan, node, stack, data);
+
+      if (data.lastRepartionableNode != null) {
+        data.topMostLeftExecBlock = addChannel(data.plan, data.lastRepartionableNode, node, data.topmost, data.topMostLeftExecBlock);
       }
-    } else if (node instanceof BinaryNode) {
-      recursiveBuildSubQuery(((BinaryNode) node).getLeftChild());
-      recursiveBuildSubQuery(((BinaryNode) node).getRightChild());
-    } else if (node instanceof ScanNode) {
 
-    } else {
+      data.topmost = node;
+      data.lastRepartionableNode = node;
 
-    }
-  }
-  
-  private ExecutionBlock makeScanSubQuery(ExecutionBlock block) {
-    block.setPartitionType(PartitionType.LIST);
-    return block;
-  }
-  
-  /**
-   * Unifiable node(selection, projection)을 자식 플랜과 같은 SubQuery로 생성
-   * 
-   * @param rootStore 생성할 SubQuery의 store
-   * @param plan logical plan
-   * @param unit 생성할 SubQuery
-   * @return
-   * @throws IOException
-   */
-  private ExecutionBlock makeUnarySubQuery(StoreTableNode rootStore,
-                                     LogicalNode plan, ExecutionBlock unit) throws IOException {
-    ScanNode newScan;
-    ExecutionBlock prev;
-    UnaryNode unary = (UnaryNode) plan;
-    UnaryNode child = (UnaryNode) unary.getChild();
-    StoreTableNode prevStore = (StoreTableNode)child.getChild();
-
-    // add scan
-    newScan = GlobalPlannerUtils.newScanPlan(prevStore.getOutSchema(),
-        prevStore.getTableName(), sm.getTablePath(prevStore.getTableName()));
-    newScan.setLocal(true);
-    child.setChild(newScan);
-    prev = convertMap.get(prevStore);
-
-    if (prev != null) {
-      prev.setParentBlock(unit);
-      unit.addChildBlock(newScan, prev);
-      prev.setPartitionType(PartitionType.LIST);
+      return node;
     }
 
-    unit.setPartitionType(PartitionType.LIST);
+    @Override
+    public LogicalNode visitGroupBy(LogicalPlan plan, GroupbyNode node, Stack<LogicalNode> stack, GlobalPlanContext data) throws PlanningException {
+      super.visitGroupBy(plan, node, stack, data);
 
-    return unit;
-  }
-  
-  /**
-   * Two-phase SubQuery 생성.
-   * 
-   * @param rootStore 생성할 SubQuery의 store
-   * @param plan logical plan
-   * @param unit 생성할 SubQuery
-   * @return
-   * @throws IOException
-   */
-  private ExecutionBlock makeGroupbySubQuery(StoreTableNode rootStore,
-                                       LogicalNode plan, ExecutionBlock unit) throws IOException {
-    UnaryNode unary = (UnaryNode) plan;
-    UnaryNode unaryChild;
-    StoreTableNode prevStore;
-    ScanNode newScan;
-    ExecutionBlock prev;
-    unaryChild = (UnaryNode) unary.getChild();  // groupby
-    NodeType curType = unaryChild.getType();
-    if (unaryChild.getChild().getType() == NodeType.STORE) {
-      // store - groupby - store
-      unaryChild = (UnaryNode) unaryChild.getChild(); // store
-      prevStore = (StoreTableNode) unaryChild;
-      newScan = GlobalPlannerUtils.newScanPlan(prevStore.getOutSchema(),
-          prevStore.getTableName(),
-          sm.getTablePath(prevStore.getTableName()));
-      newScan.setLocal(true);
-      ((UnaryNode) unary.getChild()).setChild(newScan);
-      prev = convertMap.get(prevStore);
-      if (prev != null) {
-        prev.setParentBlock(unit);
-        unit.addChildBlock(newScan, prev);
+      if (data.lastRepartionableNode != null) {
+        data.topMostLeftExecBlock = addChannel(data.plan, data.lastRepartionableNode, node, data.topmost, data.topMostLeftExecBlock);
       }
 
-      if (unaryChild.getChild().getType() == curType) {
-        // the second phase
-        unit.setPartitionType(PartitionType.LIST);
-        if (prev != null) {
-          prev.setPartitionType(PartitionType.HASH);
-        }
-      } else {
-        // the first phase
-        unit.setPartitionType(PartitionType.HASH);
-        if (prev != null) {
-          prev.setPartitionType(PartitionType.LIST);
-        }
-      }
-    } else if (unaryChild.getChild().getType() == NodeType.SCAN) {
-      // the first phase
-      // store - groupby - scan
-      unit.setPartitionType(PartitionType.HASH);
-    } else if (unaryChild.getChild().getType() == NodeType.UNION) {
-      _handleUnionNode(rootStore, (UnionNode)unaryChild.getChild(), unit,
-          null, PartitionType.LIST);
-    } else {
-      // error
+      data.topmost = node;
+      data.lastRepartionableNode = node;
+      return node;
     }
-    return unit;
-  }
-  
-  /**
-   *
-   *
-   * @param rootStore 생성할 SubQuery의 store
-   * @param plan logical plan
-   * @param unit 생성할 SubQuery
-   * @return
-   * @throws IOException
-   */
-  private ExecutionBlock makeUnionSubQuery(StoreTableNode rootStore,
-                                     LogicalNode plan, ExecutionBlock unit) throws IOException {
-    UnaryNode unary = (UnaryNode) plan;
-    StoreTableNode outerStore, innerStore;
-    ExecutionBlock prev;
-    UnionNode union = (UnionNode) unary.getChild();
-    unit.setPartitionType(PartitionType.LIST);
-    
-    if (union.getLeftChild().getType() == NodeType.STORE) {
-      outerStore = (StoreTableNode) union.getLeftChild();
-      TableMeta outerMeta = CatalogUtil.newTableMeta(outerStore.getOutSchema(),
-          StoreType.CSV);
-      insertOuterScan(union, outerStore.getTableName(), outerMeta);
-      prev = convertMap.get(outerStore);
-      if (prev != null) {
-        prev.getStoreTableNode().setTableName(rootStore.getTableName());
-        prev.setPartitionType(PartitionType.LIST);
-        prev.setParentBlock(unit);
-        unit.addChildBlock((ScanNode) union.getLeftChild(), prev);
-      }
-    } else if (union.getLeftChild().getType() == NodeType.UNION) {
-      _handleUnionNode(rootStore, union, unit, null, PartitionType.LIST);
+
+    @Override
+    public LogicalNode visitFilter(LogicalPlan plan, SelectionNode node, Stack<LogicalNode> stack, GlobalPlanContext data) throws PlanningException {
+      super.visitFilter(plan, node, stack, data);
+      data.topmost = node;
+      return node;
     }
-    
-    if (union.getRightChild().getType() == NodeType.STORE) {
-      innerStore = (StoreTableNode) union.getRightChild();
-      TableMeta innerMeta = CatalogUtil.newTableMeta(innerStore.getOutSchema(),
-          StoreType.CSV);
-      insertInnerScan(union, innerStore.getTableName(), innerMeta);
-      prev = convertMap.get(innerStore);
-      if (prev != null) {
-        prev.getStoreTableNode().setTableName(rootStore.getTableName());
-        prev.setPartitionType(PartitionType.LIST);
-        prev.setParentBlock(unit);
-        unit.addChildBlock((ScanNode) union.getRightChild(), prev);
+
+    @Override
+    public LogicalNode visitJoin(LogicalPlan plan, JoinNode node, Stack<LogicalNode> stack, GlobalPlanContext data) throws PlanningException {
+      super.visitJoin(plan, node, stack, data);
+
+      if (data.lastRepartionableNode != null) {
+        data.topMostLeftExecBlock = addChannel(data.plan, data.lastRepartionableNode, node, data.topmost, data.topMostLeftExecBlock);
       }
-    } else if (union.getRightChild().getType() == NodeType.UNION) {
-      _handleUnionNode(rootStore, union, unit, null, PartitionType.LIST);
+
+      data.topmost = node;
+      data.lastRepartionableNode = node;
+
+      return node;
     }
 
-    return unit;
-  }
+    @Override
+    public LogicalNode visitUnion(LogicalPlan plan, UnionNode node, Stack<LogicalNode> stack, GlobalPlanContext data) throws PlanningException {
+      super.visitUnion(plan, node, stack, data);
 
-  private ExecutionBlock makeSortSubQuery(StoreTableNode rootStore,
-                                    LogicalNode plan, ExecutionBlock unit) throws IOException {
-
-    UnaryNode unary = (UnaryNode) plan;
-    UnaryNode unaryChild;
-    StoreTableNode prevStore;
-    ScanNode newScan;
-    ExecutionBlock prev;
-    unaryChild = (UnaryNode) unary.getChild();  // groupby
-    NodeType curType = unaryChild.getType();
-    if (unaryChild.getChild().getType() == NodeType.STORE) {
-      // store - groupby - store
-      unaryChild = (UnaryNode) unaryChild.getChild(); // store
-      prevStore = (StoreTableNode) unaryChild;
-      newScan = GlobalPlannerUtils.newScanPlan(prevStore.getOutSchema(),
-          prevStore.getTableName(), sm.getTablePath(prevStore.getTableName()));
-      newScan.setLocal(true);
-      ((UnaryNode) unary.getChild()).setChild(newScan);
-      prev = convertMap.get(prevStore);
-      if (prev != null) {
-        prev.setParentBlock(unit);
-        unit.addChildBlock(newScan, prev);
-        if (unaryChild.getChild().getType() == curType) {
-          // TODO - this is duplicated code
-          prev.setPartitionType(PartitionType.RANGE);
-        } else {
-          prev.setPartitionType(PartitionType.LIST);
-        }
+      if (data.lastRepartionableNode != null && data.lastRepartionableNode.getType() != NodeType.UNION) {
+        data.topMostLeftExecBlock = addChannel(data.plan, data.lastRepartionableNode, node, data.topmost,
+            data.topMostLeftExecBlock);
       }
-      if (unaryChild.getChild().getType() == curType) {
-        // the second phase
-        unit.setPartitionType(PartitionType.LIST);
-      } else {
-        // the first phase
-        unit.setPartitionType(PartitionType.HASH);
-      }
-    } else if (unaryChild.getChild().getType() == NodeType.SCAN) {
-      // the first phase
-      // store - sort - scan
-      unit.setPartitionType(PartitionType.RANGE);
-    } else if (unaryChild.getChild().getType() == NodeType.UNION) {
-      _handleUnionNode(rootStore, (UnionNode)unaryChild.getChild(), unit,
-          null, PartitionType.LIST);
-    } else {
-      // error
+
+      data.topmost = node;
+      data.lastRepartionableNode = node;
+      return node;
     }
-    return unit;
-  }
-  
-  private ExecutionBlock makeJoinSubQuery(StoreTableNode rootStore,
-                                    LogicalNode plan, ExecutionBlock unit) throws IOException {
-    UnaryNode unary = (UnaryNode)plan;
-    StoreTableNode outerStore, innerStore;
-    ExecutionBlock prev;
-    JoinNode join = (JoinNode) unary.getChild();
-    Schema outerSchema = join.getLeftChild().getOutSchema();
-    Schema innerSchema = join.getRightChild().getOutSchema();
-    unit.setPartitionType(PartitionType.LIST);
-
-    List<Column> outerCollist = new ArrayList<Column>();
-    List<Column> innerCollist = new ArrayList<Column>();
-    
-    // TODO: set partition for store nodes
-    if (join.hasJoinQual()) {
-      // getting repartition keys
-      List<Column[]> cols = PlannerUtil.getJoinKeyPairs(join.getJoinQual(), outerSchema, innerSchema);
-      for (Column [] pair : cols) {
-        outerCollist.add(pair[0]);
-        innerCollist.add(pair[1]);
-      }
-    } else {
-      // broadcast
+
+    @Override
+    public LogicalNode visitExcept(LogicalPlan plan, ExceptNode node, Stack<LogicalNode> stack,
+                                   GlobalPlanContext data) throws PlanningException {
+      super.visitExcept(plan, node, stack, data);
+      data.topmost = node;
+      return node;
     }
-    
-    Column[] outerCols = new Column[outerCollist.size()];
-    Column[] innerCols = new Column[innerCollist.size()];
-    outerCols = outerCollist.toArray(outerCols);
-    innerCols = innerCollist.toArray(innerCols);
-    
-    // outer
-    if (join.getLeftChild().getType() == NodeType.STORE) {
-      outerStore = (StoreTableNode) join.getLeftChild();
-      TableMeta outerMeta = CatalogUtil.newTableMeta(outerStore.getOutSchema(),
-          StoreType.CSV);
-      insertOuterScan(join, outerStore.getTableName(), outerMeta);
-      prev = convertMap.get(outerStore);
-      if (prev != null) {
-        prev.setPartitionType(PartitionType.HASH);
-        prev.setParentBlock(unit);
-        unit.addChildBlock((ScanNode) join.getLeftChild(), prev);
-      }
-      outerStore.setPartitions(PartitionType.HASH, outerCols, 32);
-    } else if (join.getLeftChild().getType() == NodeType.UNION) {
-      _handleUnionNode(rootStore, (UnionNode)join.getLeftChild(), unit,
-          outerCols, PartitionType.HASH);
-    } else {
 
+    @Override
+    public LogicalNode visitIntersect(LogicalPlan plan, IntersectNode node, Stack<LogicalNode> stack,
+                                      GlobalPlanContext data) throws PlanningException {
+      super.visitIntersect(plan, node, stack, data);
+      data.topmost = node;
+      return node;
     }
-    
-    // inner
-    if (join.getRightChild().getType() == NodeType.STORE) {
-      innerStore = (StoreTableNode) join.getRightChild();
-      TableMeta innerMeta = CatalogUtil.newTableMeta(innerStore.getOutSchema(),
-          StoreType.CSV);
-      insertInnerScan(join, innerStore.getTableName(), innerMeta);
-      prev = convertMap.get(innerStore);
-      if (prev != null) {
-        prev.setPartitionType(PartitionType.HASH);
-        prev.setParentBlock(unit);
-        unit.addChildBlock((ScanNode) join.getRightChild(), prev);
-      }
-      innerStore.setPartitions(PartitionType.HASH, innerCols, 32);
-    } else if (join.getRightChild().getType() == NodeType.UNION) {
-      _handleUnionNode(rootStore, (UnionNode)join.getRightChild(), unit,
-          innerCols, PartitionType.HASH);
+
+    @Override
+    public LogicalNode visitTableSubQuery(LogicalPlan plan, TableSubQueryNode node, Stack<LogicalNode> stack,
+                                          GlobalPlanContext data) throws PlanningException {
+      super.visitTableSubQuery(plan, node, stack, data);
+      data.topmost = node;
+      return node;
     }
-    
-    return unit;
-  }
-  
-  /**
-   * Recursive하게 union의 자식 plan들을 설정
-   * 
-   * @param rootStore 생성할 SubQuery의 store
-   * @param union union을 root로 하는 logical plan
-   * @param cur 생성할 SubQuery
-   * @param cols partition 정보를 설정하기 위한 column array
-   * @param prevOutputType 자식 SubQuery의 partition type
-   * @throws IOException
-   */
-  private void _handleUnionNode(StoreTableNode rootStore, UnionNode union, 
-      ExecutionBlock cur, Column[] cols, PartitionType prevOutputType)
-          throws IOException {
-    StoreTableNode store;
-    TableMeta meta;
-    ExecutionBlock prev;
-    
-    if (union.getLeftChild().getType() == NodeType.STORE) {
-      store = (StoreTableNode) union.getLeftChild();
-      meta = CatalogUtil.newTableMeta(store.getOutSchema(), StoreType.CSV);
-      insertOuterScan(union, store.getTableName(), meta);
-      prev = convertMap.get(store);
-      if (prev != null) {
-        prev.getStoreTableNode().setTableName(rootStore.getTableName());
-        prev.setPartitionType(prevOutputType);
-        prev.setParentBlock(cur);
-        cur.addChildBlock((ScanNode) union.getLeftChild(), prev);
-      }
-      if (cols != null) {
-        store.setPartitions(PartitionType.LIST, cols, 32);
-      }
-    } else if (union.getLeftChild().getType() == NodeType.UNION) {
-      _handleUnionNode(rootStore, (UnionNode)union.getLeftChild(), cur, cols,
-          prevOutputType);
+
+    @Override
+    public LogicalNode visitScan(LogicalPlan plan, ScanNode node, Stack<LogicalNode> stack, GlobalPlanContext data)
+        throws PlanningException {
+      data.topmost = node;
+      return node;
     }
-    
-    if (union.getRightChild().getType() == NodeType.STORE) {
-      store = (StoreTableNode) union.getRightChild();
-      meta = CatalogUtil.newTableMeta(store.getOutSchema(), StoreType.CSV);
-      insertInnerScan(union, store.getTableName(), meta);
-      prev = convertMap.get(store);
-      if (prev != null) {
-        prev.getStoreTableNode().setTableName(rootStore.getTableName());
-        prev.setPartitionType(prevOutputType);
-        prev.setParentBlock(cur);
-        cur.addChildBlock((ScanNode) union.getRightChild(), prev);
-      }
-      if (cols != null) {
-        store.setPartitions(PartitionType.LIST, cols, 32);
-      }
-    } else if (union.getRightChild().getType() == NodeType.UNION) {
-      _handleUnionNode(rootStore, (UnionNode)union.getRightChild(), cur, cols,
-          prevOutputType);
+
+    @Override
+    public LogicalNode visitStoreTable(LogicalPlan plan, StoreTableNode node, Stack<LogicalNode> stack,
+                                       GlobalPlanContext data) throws PlanningException {
+      super.visitStoreTable(plan, node, stack, data);
+      data.topmost = node;
+      return node;
+    }
+
+    @Override
+    public LogicalNode visitInsert(LogicalPlan plan, InsertNode node, Stack<LogicalNode> stack, GlobalPlanContext data)
+        throws PlanningException {
+      super.visitInsert(plan, node, stack, data);
+      data.topmost = node;
+      return node;
     }
   }
-  
-  private LogicalNode insertOuterScan(BinaryNode parent, String tableId,
-      TableMeta meta) throws IOException {
-    TableDesc desc = CatalogUtil.newTableDesc(tableId, meta, sm.getTablePath(tableId));
-    ScanNode scan = new ScanNode(new FromTable(desc));
-    scan.setLocal(true);
-    scan.setInSchema(meta.getSchema());
-    scan.setOutSchema(meta.getSchema());
-    parent.setLeftChild(scan);
-    return parent;
-  }
-  
-  private LogicalNode insertInnerScan(BinaryNode parent, String tableId, 
-      TableMeta meta) throws IOException {
-    TableDesc desc = CatalogUtil.newTableDesc(tableId, meta, sm.getTablePath(tableId));
-    ScanNode scan = new ScanNode(new FromTable(desc));
-    scan.setLocal(true);
-    scan.setInSchema(meta.getSchema());
-    scan.setOutSchema(meta.getSchema());
-    parent.setRightChild(scan);
-    return parent;
+
+  private class UnionsFinderContext {
+    List<UnionNode> unionList = new ArrayList<UnionNode>();
   }
-  
-  private MasterPlan convertToGlobalPlan(LogicalNode logicalPlan) throws IOException {
-    recursiveBuildSubQuery(logicalPlan);
-    ExecutionBlock root;
 
-    root = convertMap.get(((LogicalRootNode)logicalPlan).getChild());
-    root.getStoreTableNode().setLocal(false);
+  private class ConsecutiveUnionFinder extends BasicLogicalPlanVisitor<UnionsFinderContext> {
+    public LogicalNode visitUnion(LogicalPlan plan, UnionNode node, Stack<LogicalNode> stack, UnionsFinderContext data)
+        throws PlanningException {
+      if (node.getType() == NodeType.UNION) {
+        data.unionList.add(node);
+      }
+
+      stack.push(node);
+      TableSubQueryNode leftSubQuery = node.getLeftChild();
+      TableSubQueryNode rightSubQuery = node.getRightChild();
+      if (leftSubQuery.getSubQuery().getType() == NodeType.UNION) {
+        visitChild(plan, leftSubQuery, stack, data);
+      }
+      if (rightSubQuery.getSubQuery().getType() == NodeType.UNION) {
+        visitChild(plan, rightSubQuery, stack, data);
+      }
+      stack.pop();
 
-    return new MasterPlan(root);
+      return node;
+    }
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/1b1d1e8c/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/QueryContext.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/QueryContext.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/QueryContext.java
index 8e89938..f921a15 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/QueryContext.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/QueryContext.java
@@ -168,4 +168,12 @@ public class QueryContext extends Options {
   public boolean isInsert() {
     return getCommandType() == NodeType.INSERT;
   }
+
+  public void setHiveQueryMode() {
+    setBool("hive.query.mode", true);
+  }
+
+  public boolean isHiveQueryMode() {
+    return getBool("hive.query.mode");
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/1b1d1e8c/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TajoAsyncDispatcher.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TajoAsyncDispatcher.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TajoAsyncDispatcher.java
index 33a1e53..5875a6c 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TajoAsyncDispatcher.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TajoAsyncDispatcher.java
@@ -129,8 +129,6 @@ public class TajoAsyncDispatcher extends AbstractService  implements Dispatcher
       LOG.debug("Dispatching the event " + event.getClass().getName() + "."
           + event.toString());
     }
-//    LOG.info("====> Dispatching the event " + event.getClass().getName() + "."
-//        + event.toString() );
     Class<? extends Enum> type = event.getType().getDeclaringClass();
 
     try{

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/1b1d1e8c/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TaskSchedulerImpl.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TaskSchedulerImpl.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TaskSchedulerImpl.java
index 574122b..24eea42 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TaskSchedulerImpl.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TaskSchedulerImpl.java
@@ -40,6 +40,7 @@ import org.apache.tajo.master.event.TaskSchedulerEvent;
 import org.apache.tajo.master.event.TaskSchedulerEvent.EventType;
 import org.apache.tajo.master.querymaster.QueryMasterTask;
 import org.apache.tajo.master.querymaster.QueryUnit;
+import org.apache.tajo.master.querymaster.SubQuery;
 import org.apache.tajo.storage.Fragment;
 import org.apache.tajo.util.NetUtils;
 
@@ -454,17 +455,19 @@ public class TaskSchedulerImpl extends AbstractService
           }
         }
 
+        SubQuery subQuery = context.getQuery().getSubQuery(attemptId.getQueryUnitId().getExecutionBlockId());
+
         if (attemptId != null) {
-          QueryUnit task = context.getQuery()
-              .getSubQuery(attemptId.getQueryUnitId().getExecutionBlockId()).getQueryUnit(attemptId.getQueryUnitId());
+          QueryUnit task = subQuery.getQueryUnit(attemptId.getQueryUnitId());
           QueryUnitRequest taskAssign = new QueryUnitRequestImpl(
               attemptId,
               new ArrayList<Fragment>(task.getAllFragments()),
-              task.getOutputName(),
+              "",
               false,
               task.getLogicalPlan().toJson(),
-              context.getQueryContext());
-          if (task.getStoreTableNode().isLocal()) {
+              context.getQueryContext(),
+              subQuery.getDataChannel());
+          if (!subQuery.getBlock().isRoot()) {
             taskAssign.setInterQuery();
           }
 
@@ -500,23 +503,24 @@ public class TaskSchedulerImpl extends AbstractService
           LOG.debug("Assigned based on * match");
 
           QueryUnit task;
-          task = context.getSubQuery(
-              attemptId.getQueryUnitId().getExecutionBlockId()).getQueryUnit(attemptId.getQueryUnitId());
+          SubQuery subQuery = context.getSubQuery(attemptId.getQueryUnitId().getExecutionBlockId());
+          task = subQuery.getQueryUnit(attemptId.getQueryUnitId());
           QueryUnitRequest taskAssign = new QueryUnitRequestImpl(
               attemptId,
               Lists.newArrayList(task.getAllFragments()),
-              task.getOutputName(),
+              "",
               false,
               task.getLogicalPlan().toJson(),
-              context.getQueryContext());
-          if (task.getStoreTableNode().isLocal()) {
+              context.getQueryContext(),
+              subQuery.getDataChannel());
+          if (!subQuery.getBlock().isRoot()) {
             taskAssign.setInterQuery();
           }
           for (ScanNode scan : task.getScanNodes()) {
             Collection<URI> fetches = task.getFetch(scan);
             if (fetches != null) {
               for (URI fetch : fetches) {
-                taskAssign.addFetch(scan.getTableId(), fetch);
+                taskAssign.addFetch(scan.getTableName(), fetch);
               }
             }
           }

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/1b1d1e8c/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/QueryStartEvent.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/QueryStartEvent.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/QueryStartEvent.java
index 53c08c0..4618da6 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/QueryStartEvent.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/QueryStartEvent.java
@@ -32,12 +32,14 @@ public class QueryStartEvent extends AbstractEvent {
 
   private QueryId queryId;
   private QueryContext queryContext;
+  private String sql;
   private String logicalPlanJson;
 
-  public QueryStartEvent(QueryId queryId, QueryContext queryContext, String logicalPlanJson) {
+  public QueryStartEvent(QueryId queryId, QueryContext queryContext, String sql, String logicalPlanJson) {
     super(EventType.QUERY_START);
     this.queryId = queryId;
     this.queryContext = queryContext;
+    this.sql = sql;
     this.logicalPlanJson = logicalPlanJson;
   }
 
@@ -49,6 +51,10 @@ public class QueryStartEvent extends AbstractEvent {
     return this.queryContext;
   }
 
+  public String getSql() {
+    return this.sql;
+  }
+
   public String getLogicalPlanJson() {
     return logicalPlanJson;
   }

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/1b1d1e8c/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/Query.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/Query.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/Query.java
index 4ba95b0..4199e17 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/Query.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/Query.java
@@ -27,6 +27,7 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.yarn.Clock;
 import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.state.*;
+import org.apache.tajo.DataChannel;
 import org.apache.tajo.ExecutionBlockId;
 import org.apache.tajo.QueryId;
 import org.apache.tajo.TajoConstants;
@@ -36,11 +37,11 @@ import org.apache.tajo.catalog.TableDesc;
 import org.apache.tajo.catalog.TableDescImpl;
 import org.apache.tajo.conf.TajoConf;
 import org.apache.tajo.engine.planner.global.MasterPlan;
+import org.apache.tajo.master.ExecutionBlock;
 import org.apache.tajo.master.ExecutionBlockCursor;
 import org.apache.tajo.master.QueryContext;
 import org.apache.tajo.master.event.*;
 import org.apache.tajo.storage.AbstractStorageManager;
-import org.apache.tajo.util.TUtil;
 
 import java.io.IOException;
 import java.util.*;
@@ -159,7 +160,7 @@ public class Query implements EventHandler<QueryEvent> {
       }
 
       float totalProgress = 0;
-      float proportion = 1.0f / (float)getExecutionBlockCursor().size();
+      float proportion = 1.0f / (float)(getExecutionBlockCursor().size() - 1); // minus one is due to
 
       for (int i = 0; i < subProgresses.length; i++) {
         totalProgress += subProgresses[i] * proportion;
@@ -240,7 +241,7 @@ public class Query implements EventHandler<QueryEvent> {
   }
 
   public Collection<SubQuery> getSubQueries() {
-    return Collections.unmodifiableCollection(this.subqueries.values());
+    return this.subqueries.values();
   }
 
   public QueryState getState() {
@@ -272,8 +273,8 @@ public class Query implements EventHandler<QueryEvent> {
 
     @Override
     public void transition(Query query, QueryEvent queryEvent) {
-      SubQuery subQuery = new SubQuery(query.context, query.getExecutionBlockCursor().nextBlock(),
-          query.sm);
+      SubQuery subQuery = new SubQuery(query.context, query.getPlan(),
+          query.getExecutionBlockCursor().nextBlock(), query.sm);
       subQuery.setPriority(query.priority--);
       query.addSubQuery(subQuery);
       LOG.debug("Schedule unit plan: \n" + subQuery.getBlock().getPlan());
@@ -292,11 +293,12 @@ public class Query implements EventHandler<QueryEvent> {
       query.completedSubQueryCount++;
       SubQueryCompletedEvent castEvent = (SubQueryCompletedEvent) event;
       ExecutionBlockCursor cursor = query.getExecutionBlockCursor();
-
+      MasterPlan masterPlan = query.getPlan();
       // if the subquery is succeeded
       if (castEvent.getFinalState() == SubQueryState.SUCCEEDED) {
-        if (cursor.hasNext()) {
-          SubQuery nextSubQuery = new SubQuery(query.context, cursor.nextBlock(), query.sm);
+        ExecutionBlock nextBlock = cursor.nextBlock();
+        if (!query.getPlan().isTerminal(nextBlock) || !query.getPlan().isRoot(nextBlock)) {
+          SubQuery nextSubQuery = new SubQuery(query.context, query.getPlan(), nextBlock, query.sm);
           nextSubQuery.setPriority(query.priority--);
           query.addSubQuery(nextSubQuery);
           nextSubQuery.handle(new SubQueryEvent(nextSubQuery.getId(),
@@ -310,7 +312,7 @@ public class Query implements EventHandler<QueryEvent> {
 
         } else { // Finish a query
           if (query.checkQueryForCompleted() == QueryState.QUERY_SUCCEEDED) {
-
+            DataChannel finalChannel = masterPlan.getChannel(castEvent.getExecutionBlockId(), nextBlock.getId());
             Path finalOutputDir = commitOutputData(query);
             TableDesc finalTableDesc = buildOrUpdateResultTableDesc(query, castEvent.getExecutionBlockId(), finalOutputDir);
 
@@ -361,7 +363,8 @@ public class Query implements EventHandler<QueryEvent> {
     /**
      * It builds a table desc and update the table desc if necessary.
      */
-    public TableDesc buildOrUpdateResultTableDesc(Query query, ExecutionBlockId finalExecBlockId, Path finalOutputDir) {
+    public TableDesc buildOrUpdateResultTableDesc(Query query, ExecutionBlockId finalExecBlockId,
+                                                  Path finalOutputDir) {
       // Determine the output table name
       SubQuery subQuery = query.getSubQuery(finalExecBlockId);
       QueryContext queryContext = query.context.getQueryContext();
@@ -450,33 +453,4 @@ public class Query implements EventHandler<QueryEvent> {
       writeLock.unlock();
     }
   }
-
-  public static interface QueryHook {
-    QueryState getTargetState();
-    void onEvent(Query query);
-  }
-
-  public static class QueryHookManager {
-    Map<QueryState, List<QueryHook>> hookList = TUtil.newHashMap();
-
-    public void addHook(QueryHook hook) {
-      if (hookList.containsKey(hook.getTargetState())) {
-        hookList.get(hook.getTargetState()).add(hook);
-      } else {
-        hookList.put(hook.getTargetState(), TUtil.newList(hook));
-      }
-    }
-
-    public void doHooks(Query query) {
-      QueryState finalState = query.checkQueryForCompleted();
-      List<QueryHook> list = hookList.get(finalState);
-      if (list != null) {
-        for (QueryHook hook : list) {
-          hook.onEvent(query);
-        }
-      } else {
-        LOG.error("QueryHookManager cannot deal with " + finalState + " event");
-      }
-    }
-  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/1b1d1e8c/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryInProgress.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryInProgress.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryInProgress.java
index c54f8da..53dfb6a 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryInProgress.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryInProgress.java
@@ -207,6 +207,7 @@ public class QueryInProgress extends CompositeService {
           TajoWorkerProtocol.QueryExecutionRequestProto.newBuilder()
               .setQueryId(queryId.getProto())
               .setQueryContext(queryContext.getProto())
+              .setSql(PrimitiveProtos.StringProto.newBuilder().setValue(queryInfo.getSql()))
               .setLogicalPlanJson(PrimitiveProtos.StringProto.newBuilder().setValue(plan.toJson()).build())
               .build(), NullCallback.get());
       querySubmitted.set(true);

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/1b1d1e8c/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryMaster.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryMaster.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryMaster.java
index 6611102..29dcb1d 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryMaster.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryMaster.java
@@ -30,7 +30,7 @@ import org.apache.hadoop.yarn.service.Service;
 import org.apache.tajo.QueryId;
 import org.apache.tajo.TajoProtos;
 import org.apache.tajo.conf.TajoConf;
-import org.apache.tajo.engine.planner.global.GlobalOptimizer;
+import org.apache.tajo.conf.TajoConf;
 import org.apache.tajo.ipc.TajoMasterProtocol;
 import org.apache.tajo.master.GlobalPlanner;
 import org.apache.tajo.master.TajoAsyncDispatcher;
@@ -55,8 +55,6 @@ public class QueryMaster extends CompositeService implements EventHandler {
 
   private GlobalPlanner globalPlanner;
 
-  private GlobalOptimizer globalOptimizer;
-
   private AbstractStorageManager storageManager;
 
   private TajoConf systemConf;
@@ -93,8 +91,7 @@ public class QueryMaster extends CompositeService implements EventHandler {
 
       this.storageManager = StorageManagerFactory.getStorageManager(systemConf);
 
-      globalPlanner = new GlobalPlanner(systemConf, storageManager, dispatcher.getEventHandler());
-      globalOptimizer = new GlobalOptimizer();
+      globalPlanner = new GlobalPlanner(systemConf, storageManager);
 
       dispatcher.register(QueryStartEvent.EventType.class, new QueryStartEventHandler());
 
@@ -217,9 +214,6 @@ public class QueryMaster extends CompositeService implements EventHandler {
     public GlobalPlanner getGlobalPlanner() {
       return globalPlanner;
     }
-    public GlobalOptimizer getGlobalOptimizer() {
-      return globalOptimizer;
-    }
 
     public TajoWorker.WorkerContext getWorkerContext() {
       return workerContext;
@@ -253,9 +247,8 @@ public class QueryMaster extends CompositeService implements EventHandler {
     @Override
     public void handle(QueryStartEvent event) {
       LOG.info("Start QueryStartEventHandler:" + event.getQueryId());
-      //To change body of implemented methods use File | Settings | File Templates.
       QueryMasterTask queryMasterTask = new QueryMasterTask(queryMasterContext,
-          event.getQueryId(), event.getQueryContext(), event.getLogicalPlanJson());
+          event.getQueryId(), event.getQueryContext(), event.getSql(), event.getLogicalPlanJson());
 
       queryMasterTask.init(systemConf);
       queryMasterTask.start();
@@ -280,17 +273,21 @@ public class QueryMaster extends CompositeService implements EventHandler {
         }
         synchronized(queryMasterTasks) {
           for(QueryMasterTask eachTask: tempTasks) {
-            TajoMasterProtocol.TajoHeartbeat queryHeartbeat = TajoMasterProtocol.TajoHeartbeat.newBuilder()
-                .setTajoWorkerHost(workerContext.getTajoWorkerManagerService().getBindAddr().getHostName())
-                .setTajoWorkerPort(workerContext.getTajoWorkerManagerService().getBindAddr().getPort())
-                .setTajoWorkerClientPort(workerContext.getTajoWorkerClientService().getBindAddr().getPort())
-                .setState(eachTask.getState())
-                .setQueryId(eachTask.getQueryId().getProto())
-                .setQueryProgress(eachTask.getQuery().getProgress())
-                .setQueryFinishTime(eachTask.getQuery().getFinishTime())
-                .build();
-
-            workerContext.getTajoMasterRpcClient().heartbeat(null, queryHeartbeat, NullCallback.get());
+            try {
+              TajoMasterProtocol.TajoHeartbeat queryHeartbeat = TajoMasterProtocol.TajoHeartbeat.newBuilder()
+                  .setTajoWorkerHost(workerContext.getTajoWorkerManagerService().getBindAddr().getHostName())
+                  .setTajoWorkerPort(workerContext.getTajoWorkerManagerService().getBindAddr().getPort())
+                  .setTajoWorkerClientPort(workerContext.getTajoWorkerClientService().getBindAddr().getPort())
+                  .setState(eachTask.getState())
+                  .setQueryId(eachTask.getQueryId().getProto())
+                  .setQueryProgress(eachTask.getQuery().getProgress())
+                  .setQueryFinishTime(eachTask.getQuery().getFinishTime())
+                  .build();
+
+              workerContext.getTajoMasterRpcClient().heartbeat(null, queryHeartbeat, NullCallback.get());
+            } catch (Throwable t) {
+              t.printStackTrace();
+            }
           }
         }
         synchronized(queryMasterStop) {
@@ -309,7 +306,7 @@ public class QueryMaster extends CompositeService implements EventHandler {
   class ClientSessionTimeoutCheckThread extends Thread {
     public void run() {
       LOG.info("ClientSessionTimeoutCheckThread started");
-      while(true) {
+      while(!queryMasterStop.get()) {
         try {
           Thread.sleep(1000);
         } catch (InterruptedException e) {
@@ -337,5 +334,4 @@ public class QueryMaster extends CompositeService implements EventHandler {
       }
     }
   }
-
 }

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/1b1d1e8c/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryMasterTask.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryMasterTask.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryMasterTask.java
index a4fabcf..e760626 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryMasterTask.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryMasterTask.java
@@ -30,16 +30,20 @@ import org.apache.hadoop.yarn.Clock;
 import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.service.CompositeService;
 import org.apache.tajo.*;
+import org.apache.tajo.algebra.Expr;
+import org.apache.tajo.catalog.CatalogService;
 import org.apache.tajo.catalog.TableDesc;
 import org.apache.tajo.conf.TajoConf;
-import org.apache.tajo.engine.json.CoreGsonHelper;
-import org.apache.tajo.engine.planner.PlannerUtil;
+import org.apache.tajo.engine.parser.HiveConverter;
+import org.apache.tajo.engine.parser.SQLAnalyzer;
+import org.apache.tajo.engine.planner.*;
 import org.apache.tajo.engine.planner.global.MasterPlan;
 import org.apache.tajo.engine.planner.logical.LogicalNode;
-import org.apache.tajo.engine.planner.logical.LogicalRootNode;
 import org.apache.tajo.engine.planner.logical.NodeType;
 import org.apache.tajo.engine.planner.logical.ScanNode;
 import org.apache.tajo.master.QueryContext;
+import org.apache.tajo.master.GlobalEngine;
+import org.apache.tajo.master.QueryContext;
 import org.apache.tajo.master.TajoAsyncDispatcher;
 import org.apache.tajo.master.event.*;
 import org.apache.tajo.master.rm.TajoWorkerResourceManager;
@@ -74,6 +78,8 @@ public class QueryMasterTask extends CompositeService {
 
   private MasterPlan masterPlan;
 
+  private String sql;
+
   private String logicalPlanJson;
 
   private TajoAsyncDispatcher dispatcher;
@@ -91,11 +97,12 @@ public class QueryMasterTask extends CompositeService {
   private AtomicBoolean stopped = new AtomicBoolean(false);
 
   public QueryMasterTask(QueryMaster.QueryMasterContext queryMasterContext,
-                         QueryId queryId, QueryContext queryContext, String logicalPlanJson) {
+                         QueryId queryId, QueryContext queryContext, String sql, String logicalPlanJson) {
     super(QueryMasterTask.class.getName());
     this.queryMasterContext = queryMasterContext;
     this.queryId = queryId;
     this.queryContext = queryContext;
+    this.sql = sql;
     this.logicalPlanJson = logicalPlanJson;
     this.querySubmitTime = System.currentTimeMillis();
   }
@@ -227,17 +234,44 @@ public class QueryMasterTask extends CompositeService {
       return;
     }
 
+    CatalogService catalog = getQueryTaskContext().getQueryMasterContext().getWorkerContext().getCatalog();
+    LogicalPlanner planner = new LogicalPlanner(catalog);
+    LogicalOptimizer optimizer = new LogicalOptimizer();
+    Expr expr;
+    if (queryContext.isHiveQueryMode()) {
+      HiveConverter hiveConverter = new HiveConverter();
+      expr = hiveConverter.parse(sql);
+    } else {
+      SQLAnalyzer analyzer = new SQLAnalyzer();
+      expr = analyzer.parse(sql);
+    }
+    LogicalPlan plan = null;
     try {
-      LogicalRootNode logicalNodeRoot = (LogicalRootNode) CoreGsonHelper.fromJson(logicalPlanJson, LogicalNode.class);
-      LogicalNode[] scanNodes = PlannerUtil.findAllNodes(logicalNodeRoot, NodeType.SCAN);
-      if(scanNodes != null) {
-        for(LogicalNode eachScanNode: scanNodes) {
-          ScanNode scanNode = (ScanNode)eachScanNode;
-          tableDescMap.put(scanNode.getFromTable().getTableName(), scanNode.getFromTable().getTableDesc());
+      plan = planner.createPlan(expr);
+      optimizer.optimize(plan);
+    } catch (PlanningException e) {
+      e.printStackTrace();
+    }
+
+    GlobalEngine.DistributedQueryHookManager hookManager = new GlobalEngine.DistributedQueryHookManager();
+    hookManager.addHook(new GlobalEngine.InsertHook());
+    hookManager.doHooks(queryContext, plan);
+
+    try {
+
+      for (LogicalPlan.QueryBlock block : plan.getQueryBlocks()) {
+        LogicalNode[] scanNodes = PlannerUtil.findAllNodes(block.getRoot(), NodeType.SCAN);
+        if(scanNodes != null) {
+          for(LogicalNode eachScanNode: scanNodes) {
+            ScanNode scanNode = (ScanNode)eachScanNode;
+            tableDescMap.put(scanNode.getFromTable().getTableName(), scanNode.getFromTable().getTableDesc());
+          }
         }
       }
-      MasterPlan globalPlan = queryMasterContext.getGlobalPlanner().build(queryId, logicalNodeRoot);
-      this.masterPlan = queryMasterContext.getGlobalOptimizer().optimize(globalPlan);
+
+      MasterPlan masterPlan = new MasterPlan(queryId, queryContext, plan);
+      queryMasterContext.getGlobalPlanner().build(masterPlan);
+      //this.masterPlan = queryMasterContext.getGlobalOptimizer().optimize(masterPlan);
 
       query = new Query(queryTaskContext, queryId, querySubmitTime,
           "", queryTaskContext.getEventHandler(), masterPlan);
@@ -306,9 +340,9 @@ public class QueryMasterTask extends CompositeService {
       LOG.info("The staging dir '" + outputDir + "' is created.");
       queryContext.setStagingDir(stagingDir);
 
-      ////////////////////////////////////////////////////
-      // Check and Create An Output Directory If Necessary
-      ////////////////////////////////////////////////////
+      /////////////////////////////////////////////////
+      // Check and Create Output Directory If Necessary
+      /////////////////////////////////////////////////
       if (queryContext.hasOutputPath()) {
         outputDir = queryContext.getOutputPath();
         if (queryContext.isOutputOverwrite()) {

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/1b1d1e8c/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryUnit.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryUnit.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryUnit.java
index 00dcc0b..5a30c04 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryUnit.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryUnit.java
@@ -156,10 +156,7 @@ public class QueryUnit implements EventHandler<TaskEvent> {
   }
 	
 	public void setLogicalPlan(LogicalNode plan) {
-    Preconditions.checkArgument(plan.getType() == NodeType.STORE);
-    
 	  this.plan = plan;
-    store = (StoreTableNode) plan;
 
 	  LogicalNode node = plan;
 	  ArrayList<LogicalNode> s = new ArrayList<LogicalNode>();
@@ -250,7 +247,7 @@ public class QueryUnit implements EventHandler<TaskEvent> {
 	}
 	
 	public Collection<URI> getFetch(ScanNode scan) {
-	  return this.fetchMap.get(scan.getTableId());
+	  return this.fetchMap.get(scan.getTableName());
 	}
 
 	public String getOutputName() {