You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tajo.apache.org by hy...@apache.org on 2013/10/16 11:28:26 UTC

[2/5] TAJO-255: Cleanup exceptions of engine. (hyunsik)

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/3f45f0cd/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/query/exception/InvalidQueryException.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/query/exception/InvalidQueryException.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/query/exception/InvalidQueryException.java
deleted file mode 100644
index 5673c5f..0000000
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/query/exception/InvalidQueryException.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.engine.query.exception;
-
-public class InvalidQueryException extends RuntimeException {
-	private static final long serialVersionUID = -7085849718839416246L;
-
-  public InvalidQueryException() {
-    super();
-  }
-
-	public InvalidQueryException(String message) {
-    super(message);
-  }
-	
-	public InvalidQueryException(Throwable t) {
-		super(t);
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/3f45f0cd/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/query/exception/NotSupportQueryException.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/query/exception/NotSupportQueryException.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/query/exception/NotSupportQueryException.java
deleted file mode 100644
index c60aa1f..0000000
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/query/exception/NotSupportQueryException.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-/**
- * 
- */
-package org.apache.tajo.engine.query.exception;
-
-
-public class NotSupportQueryException extends InvalidQueryException {
-	private static final long serialVersionUID = 4079784008765680410L;
-
-	/**
-	 * @param query
-	 */
-	public NotSupportQueryException(String query) {
-		super("Unsupported query: "+query);
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/3f45f0cd/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/query/exception/SQLParseError.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/query/exception/SQLParseError.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/query/exception/SQLParseError.java
deleted file mode 100644
index 625981b..0000000
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/query/exception/SQLParseError.java
+++ /dev/null
@@ -1,107 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.engine.query.exception;
-
-
-import org.antlr.v4.runtime.Token;
-import org.apache.commons.lang.StringUtils;
-
-public class SQLParseError extends RuntimeException {
-  private String header;
-  private String errorLine;
-  private int charPositionInLine;
-  private int line;
-  private Token offendingToken;
-  private String detailedMessage;
-
-  public SQLParseError(Token offendingToken,
-                       int line, int charPositionInLine,
-                       String msg,
-                       String errorLine) {
-    super(msg);
-    this.offendingToken = offendingToken;
-    this.charPositionInLine = charPositionInLine;
-    this.line = line;
-    this.errorLine = errorLine;
-    this.header = msg;
-  }
-
-  @Override
-  public String getMessage() {
-    if (detailedMessage == null) {
-      if (offendingToken != null) {
-        detailedMessage = getDetailedMessageWithLocation();
-      } else {
-        StringBuilder sb = new StringBuilder();
-        sb.append("ERROR: ").append(header).append("\n");
-        sb.append("LINE: ").append(errorLine);
-        detailedMessage = sb.toString();
-      }
-    }
-
-    return detailedMessage;
-  }
-
-  public String getMessageHeader(){
-    return this.header;
-  }
-
-  private String getDetailedMessageWithLocation() {
-    StringBuilder sb = new StringBuilder();
-    int displayLimit = 80;
-    String queryPrefix = "LINE " + line + ":" + charPositionInLine + " ";
-    String prefixPadding = StringUtils.repeat(" ", queryPrefix.length());
-    String locationString;
-
-    int tokenLength = offendingToken.getStopIndex() - offendingToken.getStartIndex() + 1;
-    if(tokenLength > 0){
-      locationString = StringUtils.repeat(" ", charPositionInLine) + StringUtils.repeat("^", tokenLength);
-    } else {
-      locationString = StringUtils.repeat(" ", charPositionInLine) + "^";
-    }
-
-    sb.append("ERROR: ").append(header).append("\n");
-    sb.append(queryPrefix);
-
-    if (errorLine.length() > displayLimit) {
-      int padding = (displayLimit / 2);
-
-      String ellipsis = " ... ";
-      int startPos = locationString.length() - padding - 1;
-      if (startPos <= 0) {
-        startPos = 0;
-        sb.append(errorLine.substring(startPos, displayLimit)).append(ellipsis).append("\n");
-        sb.append(prefixPadding).append(locationString);
-      } else if (errorLine.length() - (locationString.length() + padding) <= 0) {
-        startPos = errorLine.length() - displayLimit - 1;
-        sb.append(ellipsis).append(errorLine.substring(startPos)).append("\n");
-        sb.append(prefixPadding).append(StringUtils.repeat(" ", ellipsis.length()))
-            .append(locationString.substring(startPos));
-      } else {
-        sb.append(ellipsis).append(errorLine.substring(startPos, startPos + displayLimit)).append(ellipsis).append("\n");
-        sb.append(prefixPadding).append(StringUtils.repeat(" ", ellipsis.length()))
-            .append(locationString.substring(startPos));
-      }
-    } else {
-      sb.append(errorLine).append("\n");
-      sb.append(prefixPadding).append(locationString);
-    }
-    return sb.toString();
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/3f45f0cd/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/query/exception/SQLSyntaxError.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/query/exception/SQLSyntaxError.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/query/exception/SQLSyntaxError.java
deleted file mode 100644
index 619b11d..0000000
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/query/exception/SQLSyntaxError.java
+++ /dev/null
@@ -1,49 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.engine.query.exception;
-
-
-public class SQLSyntaxError extends InvalidQueryException {
-  private static final long serialVersionUID = 5388279335175632066L;
-
-  private String errorMessage;
-  private String detailedMessage;
-  private SQLParseError parseError;
-
-  public SQLSyntaxError(String errorMessage) {
-    this.errorMessage = errorMessage;
-  }
-
-  public SQLSyntaxError(SQLParseError e) {
-    this.errorMessage = e.getMessageHeader();
-    this.parseError = e;
-  }
-
-  @Override
-  public String getMessage() {
-    if (detailedMessage == null) {
-      if (parseError != null) {
-        detailedMessage = parseError.getMessage();
-      } else {
-        detailedMessage = String.format("ERROR: %s\n", errorMessage);
-      }
-    }
-    return detailedMessage;
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/3f45f0cd/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/query/exception/UndefinedFunctionException.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/query/exception/UndefinedFunctionException.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/query/exception/UndefinedFunctionException.java
deleted file mode 100644
index c7f4e98..0000000
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/query/exception/UndefinedFunctionException.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-/**
- * 
- */
-package org.apache.tajo.engine.query.exception;
-
-
-public class UndefinedFunctionException extends InvalidQueryException {
-	private static final long serialVersionUID = 113593927391549716L;
-
-	/**
-	 * @param signature
-	 */
-	public UndefinedFunctionException(String signature) {
-		super("Error: call to undefined function "+signature+"()");	
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/3f45f0cd/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/ipc/protocolrecords/QueryUnitRequest.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/ipc/protocolrecords/QueryUnitRequest.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/ipc/protocolrecords/QueryUnitRequest.java
deleted file mode 100644
index 971f13a..0000000
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/ipc/protocolrecords/QueryUnitRequest.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-/**
- * 
- */
-package org.apache.tajo.ipc.protocolrecords;
-
-import org.apache.tajo.DataChannel;
-import org.apache.tajo.QueryUnitAttemptId;
-import org.apache.tajo.common.ProtoObject;
-import org.apache.tajo.engine.planner.enforce.Enforcer;
-import org.apache.tajo.ipc.TajoWorkerProtocol;
-import org.apache.tajo.master.QueryContext;
-import org.apache.tajo.storage.Fragment;
-
-import java.net.URI;
-import java.util.List;
-
-public interface QueryUnitRequest extends ProtoObject<TajoWorkerProtocol.QueryUnitRequestProto> {
-
-	public QueryUnitAttemptId getId();
-	public List<Fragment> getFragments();
-	public String getOutputTableId();
-	public boolean isClusteredOutput();
-	public String getSerializedData();
-	public boolean isInterQuery();
-	public void setInterQuery();
-	public void addFetch(String name, URI uri);
-	public List<TajoWorkerProtocol.Fetch> getFetches();
-  public boolean shouldDie();
-  public void setShouldDie();
-  public QueryContext getQueryContext();
-  public DataChannel getDataChannel();
-  public Enforcer getEnforcer();
-}

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/3f45f0cd/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/ExecutionBlock.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/ExecutionBlock.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/ExecutionBlock.java
deleted file mode 100644
index 3c03e50..0000000
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/ExecutionBlock.java
+++ /dev/null
@@ -1,160 +0,0 @@
-/**
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.master;
-
-import org.apache.tajo.ExecutionBlockId;
-import org.apache.tajo.catalog.Schema;
-import org.apache.tajo.engine.planner.enforce.Enforcer;
-import org.apache.tajo.engine.planner.logical.*;
-
-import java.util.*;
-
-import static org.apache.tajo.ipc.TajoWorkerProtocol.PartitionType;
-
-/**
- * A distributed execution plan (DEP) is a direct acyclic graph (DAG) of ExecutionBlocks.
- * An ExecutionBlock is a basic execution unit that could be distributed across a number of nodes.
- * An ExecutionBlock class contains input information (e.g., child execution blocks or input
- * tables), and output information (e.g., partition type, partition key, and partition number).
- * In addition, it includes a logical plan to be executed in each node.
- */
-public class ExecutionBlock {
-  private ExecutionBlockId executionBlockId;
-  private LogicalNode plan = null;
-  private StoreTableNode store = null;
-  private List<ScanNode> scanlist = new ArrayList<ScanNode>();
-  private ExecutionBlock parent;
-  private Map<ScanNode, ExecutionBlock> childSubQueries = new HashMap<ScanNode, ExecutionBlock>();
-  private PartitionType outputType;
-  private Enforcer enforcer = new Enforcer();
-
-  private boolean hasJoinPlan;
-  private boolean hasUnionPlan;
-
-  private Set<String> broadcasted = new HashSet<String>();
-
-  public ExecutionBlock(ExecutionBlockId executionBlockId) {
-    this.executionBlockId = executionBlockId;
-  }
-
-  public ExecutionBlockId getId() {
-    return executionBlockId;
-  }
-
-  public PartitionType getPartitionType() {
-    return outputType;
-  }
-
-  public void setPlan(LogicalNode plan) {
-    hasJoinPlan = false;
-    hasUnionPlan = false;
-    this.scanlist.clear();
-    this.plan = plan;
-
-    LogicalNode node = plan;
-    ArrayList<LogicalNode> s = new ArrayList<LogicalNode>();
-    s.add(node);
-    while (!s.isEmpty()) {
-      node = s.remove(s.size()-1);
-      if (node instanceof UnaryNode) {
-        UnaryNode unary = (UnaryNode) node;
-        s.add(s.size(), unary.getChild());
-      } else if (node instanceof BinaryNode) {
-        BinaryNode binary = (BinaryNode) node;
-        if (binary.getType() == NodeType.JOIN) {
-          hasJoinPlan = true;
-        } else if (binary.getType() == NodeType.UNION) {
-          hasUnionPlan = true;
-        }
-        s.add(s.size(), binary.getLeftChild());
-        s.add(s.size(), binary.getRightChild());
-      } else if (node instanceof ScanNode) {
-        scanlist.add((ScanNode)node);
-      } else if (node instanceof TableSubQueryNode) {
-        TableSubQueryNode subQuery = (TableSubQueryNode) node;
-        s.add(s.size(), subQuery.getSubQuery());
-      }
-    }
-  }
-
-
-  public LogicalNode getPlan() {
-    return plan;
-  }
-
-  public Enforcer getEnforcer() {
-    return enforcer;
-  }
-
-  public boolean isRoot() {
-    return !hasParentBlock() || !(getParentBlock().hasParentBlock()) && getParentBlock().hasUnion();
-  }
-
-  public boolean hasParentBlock() {
-    return parent != null;
-  }
-
-  public ExecutionBlock getParentBlock() {
-    return parent;
-  }
-
-  public Collection<ExecutionBlock> getChildBlocks() {
-    return Collections.unmodifiableCollection(childSubQueries.values());
-  }
-
-  public boolean isLeafBlock() {
-    return childSubQueries.size() == 0;
-  }
-
-  public StoreTableNode getStoreTableNode() {
-    return store;
-  }
-
-  public ScanNode [] getScanNodes() {
-    return this.scanlist.toArray(new ScanNode[scanlist.size()]);
-  }
-
-  public Schema getOutputSchema() {
-    return store.getOutSchema();
-  }
-
-  public boolean hasJoin() {
-    return hasJoinPlan;
-  }
-
-  public boolean hasUnion() {
-    return hasUnionPlan;
-  }
-
-  public void addBroadcastTables(Collection<String> tableNames) {
-    broadcasted.addAll(tableNames);
-  }
-
-  public void addBroadcastTable(String tableName) {
-    broadcasted.add(tableName);
-  }
-
-  public boolean isBroadcastTable(String tableName) {
-    return broadcasted.contains(tableName);
-  }
-
-  public Collection<String> getBroadcastTables() {
-    return broadcasted;
-  }
-
-  public String toString() {
-    return executionBlockId.toString();
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/3f45f0cd/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/ExecutionBlockCursor.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/ExecutionBlockCursor.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/ExecutionBlockCursor.java
deleted file mode 100644
index 51c825c..0000000
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/ExecutionBlockCursor.java
+++ /dev/null
@@ -1,74 +0,0 @@
-/**
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.master;
-
-import org.apache.tajo.engine.planner.global.MasterPlan;
-
-import java.util.ArrayList;
-import java.util.Iterator;
-
-/**
- * A distributed execution plan (DEP) is a direct acyclic graph (DAG) of ExecutionBlocks.
- * This class is a pointer to an ExecutionBlock that the query engine should execute.
- * For each call of nextBlock(), it retrieves a next ExecutionBlock in a postfix order.
- */
-public class ExecutionBlockCursor {
-  private MasterPlan masterPlan;
-  private ArrayList<ExecutionBlock> orderedBlocks = new ArrayList<ExecutionBlock>();
-  private int cursor = 0;
-
-  public ExecutionBlockCursor(MasterPlan plan) {
-    this.masterPlan = plan;
-    buildOrder(plan.getRoot());
-  }
-
-  public int size() {
-    return orderedBlocks.size();
-  }
-
-  private void buildOrder(ExecutionBlock current) {
-    if (!masterPlan.isLeaf(current.getId())) {
-      if (masterPlan.getChildCount(current.getId()) == 1) {
-        ExecutionBlock block = masterPlan.getChild(current, 0);
-        buildOrder(block);
-      } else {
-        for (ExecutionBlock exec : masterPlan.getChilds(current)) {
-          buildOrder(exec);
-        }
-      }
-    }
-    orderedBlocks.add(current);
-  }
-
-  public boolean hasNext() {
-    return cursor < orderedBlocks.size();
-  }
-
-  public ExecutionBlock nextBlock() {
-    return orderedBlocks.get(cursor++);
-  }
-
-  public ExecutionBlock peek() {
-    return orderedBlocks.get(cursor);
-  }
-
-  public ExecutionBlock peek(int skip) {
-    return  orderedBlocks.get(cursor + skip);
-  }
-
-  public void reset() {
-    cursor = 0;
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/3f45f0cd/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/GlobalEngine.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/GlobalEngine.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/GlobalEngine.java
index 8c4aeed..4c12981 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/GlobalEngine.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/GlobalEngine.java
@@ -38,14 +38,12 @@ import org.apache.tajo.conf.TajoConf;
 import org.apache.tajo.datum.NullDatum;
 import org.apache.tajo.engine.eval.ConstEval;
 import org.apache.tajo.engine.eval.FieldEval;
-import org.apache.tajo.engine.exception.EmptyClusterException;
 import org.apache.tajo.engine.exception.IllegalQueryStatusException;
-import org.apache.tajo.engine.exception.NoSuchQueryIdException;
-import org.apache.tajo.engine.exception.UnknownWorkerException;
 import org.apache.tajo.engine.parser.HiveConverter;
 import org.apache.tajo.engine.parser.SQLAnalyzer;
 import org.apache.tajo.engine.planner.*;
 import org.apache.tajo.engine.planner.logical.*;
+import org.apache.tajo.engine.query.QueryContext;
 import org.apache.tajo.ipc.ClientProtos;
 import org.apache.tajo.master.TajoMaster.MasterContext;
 import org.apache.tajo.master.querymaster.QueryInfo;
@@ -103,9 +101,7 @@ public class GlobalEngine extends AbstractService {
   }
 
   public GetQueryStatusResponse executeQuery(String sql)
-      throws InterruptedException, IOException,
-      NoSuchQueryIdException, IllegalQueryStatusException,
-      UnknownWorkerException, EmptyClusterException {
+      throws InterruptedException, IOException, IllegalQueryStatusException {
 
     LOG.info("SQL: " + sql);
     QueryContext queryContext = new QueryContext();

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/3f45f0cd/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/GlobalPlanner.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/GlobalPlanner.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/GlobalPlanner.java
deleted file mode 100644
index 6e4ab25..0000000
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/GlobalPlanner.java
+++ /dev/null
@@ -1,590 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.master;
-
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Sets;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.fs.Path;
-import org.apache.tajo.DataChannel;
-import org.apache.tajo.algebra.JoinType;
-import org.apache.tajo.catalog.*;
-import org.apache.tajo.conf.TajoConf;
-import org.apache.tajo.engine.eval.AggregationFunctionCallEval;
-import org.apache.tajo.engine.eval.EvalTreeUtil;
-import org.apache.tajo.engine.planner.*;
-import org.apache.tajo.engine.planner.global.MasterPlan;
-import org.apache.tajo.engine.planner.logical.*;
-import org.apache.tajo.storage.AbstractStorageManager;
-
-import java.io.IOException;
-import java.util.*;
-
-import static org.apache.tajo.ipc.TajoWorkerProtocol.PartitionType.*;
-
-public class GlobalPlanner {
-  private static Log LOG = LogFactory.getLog(GlobalPlanner.class);
-
-  private TajoConf conf;
-  private AbstractStorageManager sm;
-
-  public GlobalPlanner(final TajoConf conf, final AbstractStorageManager sm)
-      throws IOException {
-    this.conf = conf;
-    this.sm = sm;
-  }
-
-  public class GlobalPlanContext {
-    MasterPlan plan;
-    Set<String> broadcastTables = new HashSet<String>();
-    LogicalNode topmost;
-    LogicalNode lastRepartionableNode;
-    ExecutionBlock topMostLeftExecBlock;
-  }
-
-  /**
-   * Builds a master plan from the given logical plan.
-   */
-  public void build(MasterPlan masterPlan)
-      throws IOException, PlanningException {
-
-    DistributedPlannerVisitor planner = new DistributedPlannerVisitor();
-    GlobalPlanContext globalPlanContext = new GlobalPlanContext();
-    globalPlanContext.plan = masterPlan;
-    LOG.info(masterPlan.getLogicalPlan());
-
-    LogicalNode rootNode = PlannerUtil.clone(masterPlan.getLogicalPlan().getRootBlock().getRoot());
-    planner.visitChild(globalPlanContext, masterPlan.getLogicalPlan(), rootNode, new Stack<LogicalNode>());
-
-    ExecutionBlock terminalBlock = masterPlan.createTerminalBlock();
-
-    if (globalPlanContext.lastRepartionableNode != null
-        && globalPlanContext.lastRepartionableNode.getType() == NodeType.UNION) {
-      UnionNode unionNode = (UnionNode) globalPlanContext.lastRepartionableNode;
-      ConsecutiveUnionFinder finder = new ConsecutiveUnionFinder();
-      UnionsFinderContext finderContext = new UnionsFinderContext();
-      finder.visitChild(finderContext, masterPlan.getLogicalPlan(), unionNode, new Stack<LogicalNode>());
-
-      for (UnionNode union : finderContext.unionList) {
-        TableSubQueryNode leftSubQuery = union.getLeftChild();
-        TableSubQueryNode rightSubQuery = union.getRightChild();
-        if (leftSubQuery.getSubQuery().getType() != NodeType.UNION) {
-          ExecutionBlock execBlock = masterPlan.newExecutionBlock();
-          execBlock.setPlan(leftSubQuery);
-          DataChannel dataChannel = new DataChannel(execBlock, terminalBlock, NONE_PARTITION, 1);
-          masterPlan.addConnect(dataChannel);
-        }
-        if (rightSubQuery.getSubQuery().getType() != NodeType.UNION) {
-          ExecutionBlock execBlock = masterPlan.newExecutionBlock();
-          execBlock.setPlan(rightSubQuery);
-          DataChannel dataChannel = new DataChannel(execBlock, terminalBlock, NONE_PARTITION, 1);
-          masterPlan.addConnect(dataChannel);
-        }
-      }
-    } else {
-      DataChannel dataChannel = new DataChannel(globalPlanContext.topMostLeftExecBlock, terminalBlock, NONE_PARTITION, 1);
-      dataChannel.setSchema(globalPlanContext.topmost.getOutSchema());
-      masterPlan.addConnect(dataChannel);
-    }
-    masterPlan.setTerminal(terminalBlock);
-    LOG.info(masterPlan);
-  }
-
-  private ExecutionBlock buildRepartitionBlocks(MasterPlan masterPlan, LogicalNode lastDistNode, LogicalNode curNode,
-                                                LogicalNode childNode, ExecutionBlock lastChildBlock)
-      throws PlanningException {
-
-    ExecutionBlock currentBlock = null;
-    ExecutionBlock childBlock;
-    childBlock = lastChildBlock;
-
-    NodeType shuffleRequiredNodeType = lastDistNode.getType();
-    if (shuffleRequiredNodeType == NodeType.GROUP_BY) {
-      ExecutionBlock [] blocks = buildGroupBy(masterPlan, lastDistNode, curNode, childNode, childBlock);
-      currentBlock = blocks[0];
-    } else if (shuffleRequiredNodeType == NodeType.SORT) {
-      ExecutionBlock [] blocks = buildSortPlan(masterPlan, lastDistNode, curNode, childNode, childBlock);
-      currentBlock = blocks[0];
-    } else if (shuffleRequiredNodeType == NodeType.JOIN) {
-      ExecutionBlock [] blocks = buildJoinPlan(masterPlan, lastDistNode, childBlock, lastChildBlock);
-      currentBlock = blocks[0];
-    }
-
-    return currentBlock;
-  }
-
-  public static ScanNode buildInputExecutor(LogicalPlan plan, DataChannel channel) {
-    Preconditions.checkArgument(channel.getSchema() != null,
-        "Channel schema (" + channel.getSrcId().getId() +" -> "+ channel.getTargetId().getId()+") is not initialized");
-    TableMeta meta = new TableMetaImpl(channel.getSchema(), channel.getStoreType(), new Options());
-    TableDesc desc = new TableDescImpl(channel.getSrcId().toString(), meta, new Path("/"));
-    return new ScanNode(plan.newPID(), desc);
-  }
-
-  private DataChannel createDataChannelFromJoin(ExecutionBlock leftBlock, ExecutionBlock rightBlock,
-                                                ExecutionBlock parent, JoinNode join, boolean leftTable) {
-    ExecutionBlock childBlock = leftTable ? leftBlock : rightBlock;
-
-    DataChannel channel = new DataChannel(childBlock, parent, HASH_PARTITION, 32);
-    if (join.getJoinType() != JoinType.CROSS) {
-      Column [][] joinColumns = PlannerUtil.joinJoinKeyForEachTable(join.getJoinQual(),
-          leftBlock.getPlan().getOutSchema(), rightBlock.getPlan().getOutSchema());
-      if (leftTable) {
-        channel.setPartitionKey(joinColumns[0]);
-      } else {
-        channel.setPartitionKey(joinColumns[1]);
-      }
-    }
-    return channel;
-  }
-
-  private ExecutionBlock [] buildJoinPlan(MasterPlan masterPlan, LogicalNode lastDistNode,
-                                          ExecutionBlock childBlock, ExecutionBlock lastChildBlock)
-      throws PlanningException {
-    ExecutionBlock currentBlock;
-
-    JoinNode joinNode = (JoinNode) lastDistNode;
-    LogicalNode leftNode = joinNode.getLeftChild();
-    LogicalNode rightNode = joinNode.getRightChild();
-
-    boolean leftBroadcasted = false;
-    boolean rightBroadcasted = false;
-
-    if (leftNode.getType() == NodeType.SCAN && rightNode.getType() == NodeType.SCAN ) {
-      ScanNode leftScan = (ScanNode) leftNode;
-      ScanNode rightScan = (ScanNode) rightNode;
-
-      TableMeta leftMeta = leftScan.getTableDesc().getMeta();
-      TableMeta rightMeta = rightScan.getTableDesc().getMeta();
-      long broadcastThreshold = conf.getLongVar(TajoConf.ConfVars.BROADCAST_JOIN_THRESHOLD);
-
-      if (leftMeta.getStat().getNumBytes() < broadcastThreshold) {
-        leftBroadcasted = true;
-      }
-      if (rightMeta.getStat().getNumBytes() < broadcastThreshold) {
-        rightBroadcasted = true;
-      }
-
-      if (leftBroadcasted || rightBroadcasted) {
-        currentBlock = masterPlan.newExecutionBlock();
-        currentBlock.setPlan(joinNode);
-        if (leftBroadcasted) {
-          currentBlock.addBroadcastTable(leftScan.getCanonicalName());
-        }
-        if (rightBroadcasted) {
-          currentBlock.addBroadcastTable(rightScan.getCanonicalName());
-        }
-        return new ExecutionBlock[] { currentBlock, childBlock };
-      }
-    }
-
-    // symmetric repartition join
-
-    ExecutionBlock leftBlock;
-    if (lastChildBlock == null) {
-      leftBlock = masterPlan.newExecutionBlock();
-      leftBlock.setPlan(leftNode);
-    } else {
-      leftBlock = lastChildBlock;
-    }
-    ExecutionBlock rightBlock = masterPlan.newExecutionBlock();
-    rightBlock.setPlan(rightNode);
-
-    currentBlock = masterPlan.newExecutionBlock();
-
-    DataChannel leftChannel = createDataChannelFromJoin(leftBlock, rightBlock, currentBlock, joinNode, true);
-    DataChannel rightChannel = createDataChannelFromJoin(leftBlock, rightBlock, currentBlock, joinNode, false);
-
-    ScanNode leftScan = buildInputExecutor(masterPlan.getLogicalPlan(), leftChannel);
-    ScanNode rightScan = buildInputExecutor(masterPlan.getLogicalPlan(), rightChannel);
-
-    joinNode.setLeftChild(leftScan);
-    joinNode.setRightChild(rightScan);
-    currentBlock.setPlan(joinNode);
-
-    masterPlan.addConnect(leftChannel);
-    masterPlan.addConnect(rightChannel);
-
-    return new ExecutionBlock[] { currentBlock, childBlock };
-
-  }
-
-  private ExecutionBlock [] buildGroupBy(MasterPlan masterPlan, LogicalNode lastDistNode, LogicalNode currentNode,
-                                         LogicalNode childNode, ExecutionBlock childBlock) throws PlanningException {
-    ExecutionBlock currentBlock = null;
-    GroupbyNode groupByNode = (GroupbyNode) lastDistNode;
-
-    if (groupByNode.isDistinct()) {
-      if (childBlock == null) { // first repartition node
-        childBlock = masterPlan.newExecutionBlock();
-      }
-      childBlock.setPlan(groupByNode.getChild());
-      currentBlock = masterPlan.newExecutionBlock();
-
-      LinkedHashSet<Column> columnsForDistinct = new LinkedHashSet<Column>();
-
-      for (Target target : groupByNode.getTargets()) {
-        List<AggregationFunctionCallEval> functions = EvalTreeUtil.findDistinctAggFunction(target.getEvalTree());
-        for (AggregationFunctionCallEval function : functions) {
-          if (function.isDistinct()) {
-            columnsForDistinct.addAll(EvalTreeUtil.findDistinctRefColumns(function));
-          }
-        }
-      }
-
-      Set<Column> existingColumns = Sets.newHashSet(groupByNode.getGroupingColumns());
-      columnsForDistinct.removeAll(existingColumns); // remove existing grouping columns
-      SortSpec [] sortSpecs = PlannerUtil.columnsToSortSpec(columnsForDistinct);
-      currentBlock.getEnforcer().enforceSortAggregation(groupByNode.getPID(), sortSpecs);
-
-      DataChannel channel;
-      channel = new DataChannel(childBlock, currentBlock, HASH_PARTITION, 32);
-      channel.setPartitionKey(groupByNode.getGroupingColumns());
-      channel.setSchema(groupByNode.getInSchema());
-
-      GroupbyNode secondGroupBy = groupByNode;
-      ScanNode scanNode = buildInputExecutor(masterPlan.getLogicalPlan(), channel);
-      secondGroupBy.setChild(scanNode);
-
-      LogicalNode parent = PlannerUtil.findTopParentNode(currentNode, lastDistNode.getType());
-      if (parent instanceof UnaryNode && parent != secondGroupBy) {
-        ((UnaryNode)parent).setChild(secondGroupBy);
-      }
-
-      masterPlan.addConnect(channel);
-      currentBlock.setPlan(currentNode);
-      
-    } else {
-
-      GroupbyNode firstGroupBy = PlannerUtil.transformGroupbyTo2P(groupByNode);
-      firstGroupBy.setHavingCondition(null);
-
-      if (firstGroupBy.getChild().getType() == NodeType.TABLE_SUBQUERY &&
-          ((TableSubQueryNode)firstGroupBy.getChild()).getSubQuery().getType() == NodeType.UNION) {
-
-        UnionNode unionNode = PlannerUtil.findTopNode(groupByNode, NodeType.UNION);
-        ConsecutiveUnionFinder finder = new ConsecutiveUnionFinder();
-        UnionsFinderContext finderContext = new UnionsFinderContext();
-        finder.visitChild(finderContext, masterPlan.getLogicalPlan(), unionNode, new Stack<LogicalNode>());
-
-        currentBlock = masterPlan.newExecutionBlock();
-        GroupbyNode secondGroupBy = groupByNode;
-        for (UnionNode union : finderContext.unionList) {
-          TableSubQueryNode leftSubQuery = union.getLeftChild();
-          TableSubQueryNode rightSubQuery = union.getRightChild();
-          DataChannel dataChannel;
-          if (leftSubQuery.getSubQuery().getType() != NodeType.UNION) {
-            ExecutionBlock execBlock = masterPlan.newExecutionBlock();
-            GroupbyNode g1 = PlannerUtil.clone(firstGroupBy);
-            g1.setChild(leftSubQuery);
-            execBlock.setPlan(g1);
-            dataChannel = new DataChannel(execBlock, currentBlock, HASH_PARTITION, 32);
-
-            ScanNode scanNode = buildInputExecutor(masterPlan.getLogicalPlan(), dataChannel);
-            secondGroupBy.setChild(scanNode);
-            masterPlan.addConnect(dataChannel);
-          }
-          if (rightSubQuery.getSubQuery().getType() != NodeType.UNION) {
-            ExecutionBlock execBlock = masterPlan.newExecutionBlock();
-            GroupbyNode g1 = PlannerUtil.clone(firstGroupBy);
-            g1.setChild(rightSubQuery);
-            execBlock.setPlan(g1);
-            dataChannel = new DataChannel(execBlock, currentBlock, HASH_PARTITION, 32);
-
-            ScanNode scanNode = buildInputExecutor(masterPlan.getLogicalPlan(), dataChannel);
-            secondGroupBy.setChild(scanNode);
-            masterPlan.addConnect(dataChannel);
-          }
-        }
-        LogicalNode parent = PlannerUtil.findTopParentNode(currentNode, lastDistNode.getType());
-        if (parent instanceof UnaryNode && parent != secondGroupBy) {
-          ((UnaryNode)parent).setChild(secondGroupBy);
-        }
-        currentBlock.setPlan(currentNode);
-      } else {
-
-        if (childBlock == null) { // first repartition node
-          childBlock = masterPlan.newExecutionBlock();
-        }
-        childBlock.setPlan(firstGroupBy);
-
-        currentBlock = masterPlan.newExecutionBlock();
-
-        DataChannel channel;
-        if (firstGroupBy.isEmptyGrouping()) {
-          channel = new DataChannel(childBlock, currentBlock, HASH_PARTITION, 1);
-          channel.setPartitionKey(firstGroupBy.getGroupingColumns());
-        } else {
-          channel = new DataChannel(childBlock, currentBlock, HASH_PARTITION, 32);
-          channel.setPartitionKey(firstGroupBy.getGroupingColumns());
-        }
-        channel.setSchema(firstGroupBy.getOutSchema());
-
-        GroupbyNode secondGroupBy = groupByNode;
-        ScanNode scanNode = buildInputExecutor(masterPlan.getLogicalPlan(), channel);
-        secondGroupBy.setChild(scanNode);
-
-        LogicalNode parent = PlannerUtil.findTopParentNode(currentNode, lastDistNode.getType());
-        if (parent instanceof UnaryNode && parent != secondGroupBy) {
-          ((UnaryNode)parent).setChild(secondGroupBy);
-        }
-
-        masterPlan.addConnect(channel);
-        currentBlock.setPlan(currentNode);
-      }
-    }
-
-    return new ExecutionBlock [] {currentBlock, childBlock};
-  }
-
-  private ExecutionBlock [] buildSortPlan(MasterPlan masterPlan, LogicalNode lastDistNode, LogicalNode currentNode,
-                                          LogicalNode childNode, ExecutionBlock childBlock) {
-    ExecutionBlock currentBlock = null;
-
-    SortNode firstSort = (SortNode) lastDistNode;
-    if (childBlock == null) {
-      childBlock = masterPlan.newExecutionBlock();
-    }
-    childBlock.setPlan(firstSort);
-
-    currentBlock = masterPlan.newExecutionBlock();
-    DataChannel channel = new DataChannel(childBlock, currentBlock, RANGE_PARTITION, 32);
-    channel.setPartitionKey(PlannerUtil.sortSpecsToSchema(firstSort.getSortKeys()).toArray());
-    channel.setSchema(childNode.getOutSchema());
-
-    SortNode secondSort = PlannerUtil.clone(lastDistNode);
-    ScanNode secondScan = buildInputExecutor(masterPlan.getLogicalPlan(), channel);
-    secondSort.setChild(secondScan);
-
-    LimitNode limitAndSort;
-    LimitNode limitOrNull = PlannerUtil.findTopNode(currentNode, NodeType.LIMIT);
-    if (limitOrNull != null) {
-      limitAndSort = PlannerUtil.clone(limitOrNull);
-      limitAndSort.setChild(firstSort);
-
-      if (childBlock.getPlan().getType() == NodeType.SORT) {
-        childBlock.setPlan(limitAndSort);
-      } else {
-        LogicalNode sortParent = PlannerUtil.findTopParentNode(childBlock.getPlan(), NodeType.SORT);
-        if (sortParent != null) {
-          if (sortParent instanceof UnaryNode) {
-            ((UnaryNode)sortParent).setChild(limitAndSort);
-          }
-        }
-      }
-    }
-
-    LogicalNode parent = PlannerUtil.findTopParentNode(currentNode, lastDistNode.getType());
-    if (parent instanceof UnaryNode && parent != secondSort) {
-      ((UnaryNode)parent).setChild(secondSort);
-    }
-
-    masterPlan.addConnect(channel);
-    currentBlock.setPlan(currentNode);
-
-    return new ExecutionBlock[] { currentBlock, childBlock };
-  }
-
-  public class DistributedPlannerVisitor extends BasicLogicalPlanVisitor<GlobalPlanContext, LogicalNode> {
-
-    @Override
-    public LogicalNode visitRoot(GlobalPlanContext context, LogicalPlan plan, LogicalRootNode node,
-                                 Stack<LogicalNode> stack) throws PlanningException {
-      super.visitRoot(context, plan, node, stack);
-
-      if (context.lastRepartionableNode != null && context.lastRepartionableNode.getType() != NodeType.UNION) {
-        context.topMostLeftExecBlock = buildRepartitionBlocks(context.plan, context.lastRepartionableNode, node, context.topmost, context.topMostLeftExecBlock);
-      } else if (context.lastRepartionableNode != null && context.lastRepartionableNode.getType() == NodeType.UNION) {
-
-      } else {
-        ExecutionBlock execBlock = context.plan.newExecutionBlock();
-        execBlock.setPlan(node);
-        context.topMostLeftExecBlock = execBlock;
-      }
-
-      context.topmost = node;
-      return node;
-    }
-
-    @Override
-    public LogicalNode visitProjection(GlobalPlanContext context, LogicalPlan plan, ProjectionNode node,
-                                       Stack<LogicalNode> stack) throws PlanningException {
-      super.visitProjection(context, plan, node, stack);
-      context.topmost = node;
-      return node;
-    }
-
-    @Override
-    public LogicalNode visitLimit(GlobalPlanContext context, LogicalPlan plan, LimitNode node, Stack<LogicalNode> stack)
-        throws PlanningException {
-      super.visitLimit(context, plan, node, stack);
-      context.topmost = node;
-      return node;
-    }
-
-    @Override
-    public LogicalNode visitSort(GlobalPlanContext context, LogicalPlan plan, SortNode node, Stack<LogicalNode> stack)
-        throws PlanningException {
-
-      super.visitSort(context, plan, node, stack);
-
-      if (context.lastRepartionableNode != null) {
-        context.topMostLeftExecBlock = buildRepartitionBlocks(context.plan, context.lastRepartionableNode, node, context.topmost,
-            context.topMostLeftExecBlock);
-      }
-
-      context.topmost = node;
-      context.lastRepartionableNode = node;
-
-      return node;
-    }
-
-    @Override
-    public LogicalNode visitGroupBy(GlobalPlanContext context, LogicalPlan plan, GroupbyNode node,
-                                    Stack<LogicalNode> stack) throws PlanningException {
-      super.visitGroupBy(context, plan, node, stack);
-
-      if (context.lastRepartionableNode != null) {
-        context.topMostLeftExecBlock = buildRepartitionBlocks(context.plan, context.lastRepartionableNode, node,
-            context.topmost, context.topMostLeftExecBlock);
-      }
-
-      context.topmost = node;
-      context.lastRepartionableNode = node;
-      return node;
-    }
-
-    @Override
-    public LogicalNode visitFilter(GlobalPlanContext context, LogicalPlan plan, SelectionNode node,
-                                   Stack<LogicalNode> stack) throws PlanningException {
-      super.visitFilter(context, plan, node, stack);
-      context.topmost = node;
-      return node;
-    }
-
-    @Override
-    public LogicalNode visitJoin(GlobalPlanContext context, LogicalPlan plan, JoinNode node, Stack<LogicalNode> stack)
-        throws PlanningException {
-      super.visitJoin(context, plan, node, stack);
-
-      if (context.lastRepartionableNode != null) {
-        context.topMostLeftExecBlock = buildRepartitionBlocks(context.plan, context.lastRepartionableNode, node, context.topmost,
-            context.topMostLeftExecBlock);
-      }
-
-      context.topmost = node;
-      context.lastRepartionableNode = node;
-
-      return node;
-    }
-
-    @Override
-    public LogicalNode visitUnion(GlobalPlanContext context, LogicalPlan plan, UnionNode node,
-                                  Stack<LogicalNode> stack) throws PlanningException {
-      super.visitUnion(context, plan, node, stack);
-
-      if (context.lastRepartionableNode != null && context.lastRepartionableNode.getType() != NodeType.UNION) {
-        context.topMostLeftExecBlock = buildRepartitionBlocks(context.plan, context.lastRepartionableNode, node,
-            context.topmost, context.topMostLeftExecBlock);
-      }
-
-      context.topmost = node;
-      context.lastRepartionableNode = node;
-      return node;
-    }
-
-    @Override
-    public LogicalNode visitExcept(GlobalPlanContext context, LogicalPlan plan, ExceptNode node,
-                                   Stack<LogicalNode> stack) throws PlanningException {
-      super.visitExcept(context, plan, node, stack);
-      context.topmost = node;
-      return node;
-    }
-
-    @Override
-    public LogicalNode visitIntersect(GlobalPlanContext context, LogicalPlan plan, IntersectNode node,
-                                      Stack<LogicalNode> stack) throws PlanningException {
-      super.visitIntersect(context, plan, node, stack);
-      context.topmost = node;
-      return node;
-    }
-
-    @Override
-    public LogicalNode visitTableSubQuery(GlobalPlanContext context, LogicalPlan plan, TableSubQueryNode node,
-                                          Stack<LogicalNode> stack) throws PlanningException {
-      super.visitTableSubQuery(context, plan, node, stack);
-      context.topmost = node;
-      return node;
-    }
-
-    @Override
-    public LogicalNode visitScan(GlobalPlanContext context, LogicalPlan plan, ScanNode node, Stack<LogicalNode> stack)
-        throws PlanningException {
-      context.topmost = node;
-      return node;
-    }
-
-    @Override
-    public LogicalNode visitStoreTable(GlobalPlanContext context, LogicalPlan plan, StoreTableNode node,
-                                       Stack<LogicalNode> stack) throws PlanningException {
-      super.visitStoreTable(context, plan, node, stack);
-      context.topmost = node;
-      return node;
-    }
-
-    @Override
-    public LogicalNode visitInsert(GlobalPlanContext context, LogicalPlan plan, InsertNode node,
-                                   Stack<LogicalNode> stack)
-        throws PlanningException {
-      super.visitInsert(context, plan, node, stack);
-      context.topmost = node;
-      return node;
-    }
-  }
-
-  private class UnionsFinderContext {
-    List<UnionNode> unionList = new ArrayList<UnionNode>();
-  }
-
-  @SuppressWarnings("unused")
-  private class ConsecutiveUnionFinder extends BasicLogicalPlanVisitor<UnionsFinderContext, LogicalNode> {
-    @Override
-    public LogicalNode visitUnion(UnionsFinderContext context, LogicalPlan plan, UnionNode node,
-                                  Stack<LogicalNode> stack)
-        throws PlanningException {
-      if (node.getType() == NodeType.UNION) {
-        context.unionList.add(node);
-      }
-
-      stack.push(node);
-      TableSubQueryNode leftSubQuery = node.getLeftChild();
-      TableSubQueryNode rightSubQuery = node.getRightChild();
-      if (leftSubQuery.getSubQuery().getType() == NodeType.UNION) {
-        visitChild(context, plan, leftSubQuery, stack);
-      }
-      if (rightSubQuery.getSubQuery().getType() == NodeType.UNION) {
-        visitChild(context, plan, rightSubQuery, stack);
-      }
-      stack.pop();
-
-      return node;
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/3f45f0cd/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/QueryContext.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/QueryContext.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/QueryContext.java
deleted file mode 100644
index f921a15..0000000
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/QueryContext.java
+++ /dev/null
@@ -1,179 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.master;
-
-import org.apache.hadoop.fs.Path;
-import org.apache.tajo.catalog.Options;
-import org.apache.tajo.conf.TajoConf;
-import org.apache.tajo.engine.planner.PlannerUtil;
-import org.apache.tajo.engine.planner.logical.NodeType;
-
-import static org.apache.tajo.catalog.proto.CatalogProtos.KeyValueSetProto;
-
-public class QueryContext extends Options {
-
-  public static final String COMMAND_TYPE = "tajo.query.command";
-
-  public static final String STAGING_DIR = "tajo.query.staging_dir";
-
-  public static final String USER_NAME = "tajo.query.username";
-
-  public static final String OUTPUT_TABLE_NAME = "tajo.query.output.table";
-  public static final String OUTPUT_TABLE_PATH = "tajo.query.output.path";
-  public static final String OUTPUT_OVERWRITE = "tajo.query.output.overwrite";
-  public static final String OUTPUT_AS_DIRECTORY = "tajo.query.output.asdirectory";
-
-  public static final String TRUE_VALUE = "1";
-  public static final String FALSE_VALUE = "0";
-
-  public QueryContext() {}
-
-  public QueryContext(KeyValueSetProto proto) {
-    super(proto);
-  }
-
-  public void put(TajoConf.ConfVars key, String value) {
-    put(key.varname, value);
-  }
-
-  public String get(TajoConf.ConfVars key) {
-    return get(key.varname);
-  }
-
-  public String get(String key) {
-    return super.get(key);
-  }
-
-  public void setBool(String key, boolean val) {
-    put(key, val ? TRUE_VALUE : FALSE_VALUE);
-  }
-
-  public boolean getBool(String key) {
-    String strVal = get(key);
-    return strVal != null ? strVal.equals(TRUE_VALUE) : false;
-  }
-
-  public void setUser(String username) {
-    put(USER_NAME, username);
-  }
-
-  public String getUser() {
-    return get(USER_NAME);
-  }
-
-  public void setStagingDir(Path path) {
-    put(STAGING_DIR, path.toUri().toString());
-  }
-
-  public Path getStagingDir() {
-    String strVal = get(STAGING_DIR);
-    return strVal != null ? new Path(strVal) : null;
-  }
-
-  /**
-   * The fact that QueryContext has an output table means this query has a target table.
-   * In other words, this query is 'CREATE TABLE' or 'INSERT (OVERWRITE) INTO <table name>' statement.
-   * This config is not set if a query has INSERT (OVERWRITE) INTO LOCATION '/path/..'.
-   */
-  public boolean hasOutputTable() {
-    return get(OUTPUT_TABLE_NAME) != null;
-  }
-
-  /**
-   * Set a target table name
-   *
-   * @param tableName The target table name
-   */
-  public void setOutputTable(String tableName) {
-    put(OUTPUT_TABLE_NAME, PlannerUtil.normalizeTableName(tableName));
-  }
-
-  public String getOutputTable() {
-    String strVal = get(OUTPUT_TABLE_NAME);
-    return strVal != null ? strVal : null;
-  }
-
-  /**
-   * The fact that QueryContext has an output path means this query will write the output to a specific directory.
-   * In other words, this query is 'CREATE TABLE' or 'INSERT (OVERWRITE) INTO (<table name>|LOCATION)' statement.
-   *
-   * @return
-   */
-  public boolean hasOutputPath() {
-    return get(OUTPUT_TABLE_PATH) != null;
-  }
-
-  public void setOutputPath(Path path) {
-    put(OUTPUT_TABLE_PATH, path.toUri().toString());
-  }
-
-  public Path getOutputPath() {
-    String strVal = get(OUTPUT_TABLE_PATH);
-    return strVal != null ? new Path(strVal) : null;
-  }
-
-  public void setOutputOverwrite() {
-    setBool(OUTPUT_OVERWRITE, true);
-  }
-
-  public boolean isOutputOverwrite() {
-    return getBool(OUTPUT_OVERWRITE);
-  }
-
-  public void setFileOutput() {
-    setBool(OUTPUT_AS_DIRECTORY, true);
-  }
-
-  public boolean isFileOutput() {
-    return getBool(OUTPUT_AS_DIRECTORY);
-  }
-
-  public void setCommandType(NodeType nodeType) {
-    put(COMMAND_TYPE, nodeType.name());
-  }
-
-  public NodeType getCommandType() {
-    String strVal = get(COMMAND_TYPE);
-    return strVal != null ? NodeType.valueOf(strVal) : null;
-  }
-
-  public void setCreateTable() {
-    setCommandType(NodeType.CREATE_TABLE);
-  }
-
-  public boolean isCreateTable() {
-    return getCommandType() == NodeType.CREATE_TABLE;
-  }
-
-  public void setInsert() {
-    setCommandType(NodeType.INSERT);
-  }
-
-  public boolean isInsert() {
-    return getCommandType() == NodeType.INSERT;
-  }
-
-  public void setHiveQueryMode() {
-    setBool("hive.query.mode", true);
-  }
-
-  public boolean isHiveQueryMode() {
-    return getBool("hive.query.mode");
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/3f45f0cd/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TaskSchedulerImpl.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TaskSchedulerImpl.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TaskSchedulerImpl.java
index cdbd803..e09dd69 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TaskSchedulerImpl.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TaskSchedulerImpl.java
@@ -32,7 +32,7 @@ import org.apache.tajo.QueryUnitAttemptId;
 import org.apache.tajo.engine.planner.logical.ScanNode;
 import org.apache.tajo.engine.query.QueryUnitRequestImpl;
 import org.apache.tajo.ipc.TajoWorkerProtocol;
-import org.apache.tajo.ipc.protocolrecords.QueryUnitRequest;
+import org.apache.tajo.engine.query.QueryUnitRequest;
 import org.apache.tajo.master.event.TaskAttemptAssignedEvent;
 import org.apache.tajo.master.event.TaskRequestEvent;
 import org.apache.tajo.master.event.TaskScheduleEvent;

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/3f45f0cd/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/QueryStartEvent.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/QueryStartEvent.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/QueryStartEvent.java
index 4618da6..2c8b822 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/QueryStartEvent.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/QueryStartEvent.java
@@ -20,7 +20,7 @@ package org.apache.tajo.master.event;
 
 import org.apache.hadoop.yarn.event.AbstractEvent;
 import org.apache.tajo.QueryId;
-import org.apache.tajo.master.QueryContext;
+import org.apache.tajo.engine.query.QueryContext;
 
 /**
  * This event is conveyed to QueryMaster.

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/3f45f0cd/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/Query.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/Query.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/Query.java
index 38fbfe2..715f00c 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/Query.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/Query.java
@@ -28,7 +28,7 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.yarn.Clock;
 import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.state.*;
-import org.apache.tajo.DataChannel;
+import org.apache.tajo.engine.planner.global.DataChannel;
 import org.apache.tajo.ExecutionBlockId;
 import org.apache.tajo.QueryId;
 import org.apache.tajo.TajoConstants;
@@ -39,9 +39,9 @@ import org.apache.tajo.catalog.TableDescImpl;
 import org.apache.tajo.catalog.TableMeta;
 import org.apache.tajo.conf.TajoConf;
 import org.apache.tajo.engine.planner.global.MasterPlan;
-import org.apache.tajo.master.ExecutionBlock;
-import org.apache.tajo.master.ExecutionBlockCursor;
-import org.apache.tajo.master.QueryContext;
+import org.apache.tajo.engine.planner.global.ExecutionBlock;
+import org.apache.tajo.engine.planner.global.ExecutionBlockCursor;
+import org.apache.tajo.engine.query.QueryContext;
 import org.apache.tajo.master.event.*;
 import org.apache.tajo.storage.AbstractStorageManager;
 

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/3f45f0cd/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryInProgress.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryInProgress.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryInProgress.java
index 6ce0fed..d603615 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryInProgress.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryInProgress.java
@@ -29,7 +29,7 @@ import org.apache.tajo.QueryId;
 import org.apache.tajo.TajoProtos;
 import org.apache.tajo.engine.planner.logical.LogicalRootNode;
 import org.apache.tajo.ipc.TajoWorkerProtocol;
-import org.apache.tajo.master.QueryContext;
+import org.apache.tajo.engine.query.QueryContext;
 import org.apache.tajo.master.TajoAsyncDispatcher;
 import org.apache.tajo.master.TajoMaster;
 import org.apache.tajo.master.rm.WorkerResource;

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/3f45f0cd/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryJobManager.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryJobManager.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryJobManager.java
index 75a58bc..faa5a5c 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryJobManager.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryJobManager.java
@@ -28,7 +28,7 @@ import org.apache.tajo.QueryId;
 import org.apache.tajo.QueryIdFactory;
 import org.apache.tajo.engine.planner.logical.LogicalRootNode;
 import org.apache.tajo.ipc.TajoMasterProtocol;
-import org.apache.tajo.master.QueryContext;
+import org.apache.tajo.engine.query.QueryContext;
 import org.apache.tajo.master.TajoMaster;
 import org.apache.tajo.master.rm.WorkerResource;
 

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/3f45f0cd/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryMaster.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryMaster.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryMaster.java
index acb5dc8..94f0bc2 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryMaster.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryMaster.java
@@ -18,6 +18,7 @@
 
 package org.apache.tajo.master.querymaster;
 
+import com.google.common.collect.Maps;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -30,8 +31,7 @@ import org.apache.hadoop.yarn.service.Service;
 import org.apache.tajo.QueryId;
 import org.apache.tajo.TajoProtos;
 import org.apache.tajo.conf.TajoConf;
-import org.apache.tajo.ipc.TajoMasterProtocol;
-import org.apache.tajo.master.GlobalPlanner;
+import org.apache.tajo.engine.planner.global.GlobalPlanner;
 import org.apache.tajo.master.TajoAsyncDispatcher;
 import org.apache.tajo.master.event.QueryStartEvent;
 import org.apache.tajo.rpc.CallFuture2;
@@ -62,9 +62,9 @@ public class QueryMaster extends CompositeService implements EventHandler {
 
   private TajoConf systemConf;
 
-  private Map<QueryId, QueryMasterTask> queryMasterTasks = new HashMap<QueryId, QueryMasterTask>();
+  private Map<QueryId, QueryMasterTask> queryMasterTasks = Maps.newConcurrentMap();
 
-  private Map<QueryId, QueryMasterTask> finishedQueryMasterTasks = new HashMap<QueryId, QueryMasterTask>();
+  private Map<QueryId, QueryMasterTask> finishedQueryMasterTasks = Maps.newConcurrentMap();
 
   private ClientSessionTimeoutCheckThread clientSessionTimeoutCheckThread;
 
@@ -255,16 +255,17 @@ public class QueryMaster extends CompositeService implements EventHandler {
     }
 
     public void stopQuery(QueryId queryId) {
+      LOG.error(">>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> " + queryId + "<<>>>><<<<<<>>>>");
       QueryMasterTask queryMasterTask;
-      synchronized(queryMasterTasks) {
-        queryMasterTask = queryMasterTasks.remove(queryId);
-      }
+      queryMasterTask = queryMasterTasks.remove(queryId);
+      finishedQueryMasterTasks.put(queryId, queryMasterTask);
+
       if(queryMasterTask != null) {
         TajoHeartbeat queryHeartbeat = buildTajoHeartBeat(queryMasterTask);
-        CallFuture2 futuer = new CallFuture2();
-        workerContext.getTajoMasterRpcClient().heartbeat(null, queryHeartbeat, futuer);
+        CallFuture2 future = new CallFuture2();
+        workerContext.getTajoMasterRpcClient().heartbeat(null, queryHeartbeat, future);
         try {
-          futuer.get(3000, TimeUnit.SECONDS);
+          future.get(3, TimeUnit.SECONDS);
         } catch (Throwable e) {
           LOG.warn(e);
         }
@@ -274,8 +275,6 @@ public class QueryMaster extends CompositeService implements EventHandler {
         } catch (Exception e) {
           LOG.error(e.getMessage(), e);
         }
-
-        finishedQueryMasterTasks.put(queryId, queryMasterTask);
       } else {
         LOG.warn("No query info:" + queryId);
       }
@@ -386,7 +385,7 @@ public class QueryMaster extends CompositeService implements EventHandler {
       LOG.info("FinishedQueryMasterTaskCleanThread started: expireIntervalTime=" + expireIntervalTime);
       while(!queryMasterStop.get()) {
         try {
-          Thread.sleep(60 * 1000 * 60);   //hourly
+          Thread.sleep(60 * 1000 * 60);   // hourly
         } catch (InterruptedException e) {
           break;
         }

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/3f45f0cd/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryMasterTask.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryMasterTask.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryMasterTask.java
index adc50df..c386bf2 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryMasterTask.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryMasterTask.java
@@ -42,10 +42,11 @@ import org.apache.tajo.engine.planner.logical.LogicalNode;
 import org.apache.tajo.engine.planner.logical.NodeType;
 import org.apache.tajo.engine.planner.logical.ScanNode;
 import org.apache.tajo.master.GlobalEngine;
-import org.apache.tajo.master.QueryContext;
+import org.apache.tajo.engine.query.QueryContext;
 import org.apache.tajo.master.TajoAsyncDispatcher;
 import org.apache.tajo.master.event.*;
 import org.apache.tajo.master.rm.TajoWorkerResourceManager;
+import org.apache.tajo.rpc.CallFuture2;
 import org.apache.tajo.rpc.NullCallback;
 import org.apache.tajo.storage.AbstractStorageManager;
 import org.apache.tajo.worker.AbstractResourceAllocator;
@@ -55,6 +56,8 @@ import org.apache.tajo.worker.YarnResourceAllocator;
 import java.io.IOException;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
 
@@ -158,8 +161,14 @@ public class QueryMasterTask extends CompositeService {
 
     LOG.info("Stopping QueryMasterTask:" + queryId);
 
+    CallFuture2 future = new CallFuture2();
     queryMasterContext.getWorkerContext().getTajoMasterRpcClient()
-        .stopQueryMaster(null, queryId.getProto(), NullCallback.get());
+        .stopQueryMaster(null, queryId.getProto(), future);
+    try {
+      future.get(3, TimeUnit.SECONDS);
+    } catch (Throwable t) {
+      LOG.warn(t);
+    }
 
     super.stop();
 

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/3f45f0cd/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/Repartitioner.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/Repartitioner.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/Repartitioner.java
index 194cf09..10795ea 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/Repartitioner.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/Repartitioner.java
@@ -23,7 +23,7 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.tajo.DataChannel;
+import org.apache.tajo.engine.planner.global.DataChannel;
 import org.apache.tajo.ExecutionBlockId;
 import org.apache.tajo.QueryIdFactory;
 import org.apache.tajo.catalog.*;
@@ -38,7 +38,7 @@ import org.apache.tajo.engine.planner.global.MasterPlan;
 import org.apache.tajo.engine.planner.logical.*;
 import org.apache.tajo.engine.utils.TupleUtil;
 import org.apache.tajo.exception.InternalException;
-import org.apache.tajo.master.ExecutionBlock;
+import org.apache.tajo.engine.planner.global.ExecutionBlock;
 import org.apache.tajo.master.querymaster.QueryUnit.IntermediateEntry;
 import org.apache.tajo.storage.AbstractStorageManager;
 import org.apache.tajo.storage.Fragment;

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/3f45f0cd/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java
index cc38062..cb35954 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java
@@ -30,7 +30,7 @@ import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.state.*;
 import org.apache.hadoop.yarn.util.Records;
-import org.apache.tajo.DataChannel;
+import org.apache.tajo.engine.planner.global.DataChannel;
 import org.apache.tajo.ExecutionBlockId;
 import org.apache.tajo.QueryIdFactory;
 import org.apache.tajo.QueryUnitId;
@@ -49,7 +49,7 @@ import org.apache.tajo.engine.planner.logical.GroupbyNode;
 import org.apache.tajo.engine.planner.logical.NodeType;
 import org.apache.tajo.engine.planner.logical.ScanNode;
 import org.apache.tajo.engine.planner.logical.StoreTableNode;
-import org.apache.tajo.master.ExecutionBlock;
+import org.apache.tajo.engine.planner.global.ExecutionBlock;
 import org.apache.tajo.master.TaskRunnerGroupEvent;
 import org.apache.tajo.master.TaskRunnerGroupEvent.EventType;
 import org.apache.tajo.master.TaskScheduler;

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/3f45f0cd/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/scheduler/event/ScheduleTaskEvent.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/scheduler/event/ScheduleTaskEvent.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/scheduler/event/ScheduleTaskEvent.java
deleted file mode 100644
index e1e074d..0000000
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/scheduler/event/ScheduleTaskEvent.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.scheduler.event;
-
-import org.apache.tajo.QueryUnitAttemptId;
-
-public class ScheduleTaskEvent extends SchedulerEvent {
-  private final QueryUnitAttemptId attemptId;
-
-  public ScheduleTaskEvent(QueryUnitAttemptId id) {
-    super(SchedulerEventType.SCHEDULE);
-    this.attemptId = id;
-  }
-
-  public QueryUnitAttemptId getTaskAttemptId() {
-    return attemptId;
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/3f45f0cd/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/scheduler/event/SchedulerEvent.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/scheduler/event/SchedulerEvent.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/scheduler/event/SchedulerEvent.java
deleted file mode 100644
index 664a2d6..0000000
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/scheduler/event/SchedulerEvent.java
+++ /dev/null
@@ -1,28 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.scheduler.event;
-
-import org.apache.hadoop.yarn.event.AbstractEvent;
-
-public class SchedulerEvent extends AbstractEvent<SchedulerEventType> {
-
-  public SchedulerEvent(SchedulerEventType schedulerEventType) {
-    super(schedulerEventType);
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/3f45f0cd/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/scheduler/event/SchedulerEventType.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/scheduler/event/SchedulerEventType.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/scheduler/event/SchedulerEventType.java
deleted file mode 100644
index 2e49c47..0000000
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/scheduler/event/SchedulerEventType.java
+++ /dev/null
@@ -1,27 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.scheduler.event;
-
-/**
- * Event Type for Scheduler
- */
-public enum SchedulerEventType {
-  SCHEDULE,
-  RESCHEDULE
-}

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/3f45f0cd/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoQueryEngine.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoQueryEngine.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoQueryEngine.java
index 5cf2e7c..1731854 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoQueryEngine.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoQueryEngine.java
@@ -18,9 +18,6 @@
 
 package org.apache.tajo.worker;
 
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.tajo.TaskAttemptContext;
 import org.apache.tajo.conf.TajoConf;
 import org.apache.tajo.engine.planner.PhysicalPlanner;
 import org.apache.tajo.engine.planner.PhysicalPlannerImpl;

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/3f45f0cd/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoWorker.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoWorker.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoWorker.java
index d9984b8..838811a 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoWorker.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoWorker.java
@@ -18,6 +18,7 @@
 
 package org.apache.tajo.worker;
 
+import org.apache.commons.lang.exception.ExceptionUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/3f45f0cd/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoWorkerClientService.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoWorkerClientService.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoWorkerClientService.java
index a77ad2a..2e73b2b 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoWorkerClientService.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoWorkerClientService.java
@@ -197,16 +197,16 @@ public class TajoWorkerClientService extends AbstractService {
             TajoIdProtos.QueryIdProto request) throws ServiceException {
       final QueryId queryId = new QueryId(request);
       LOG.info("Stop Query:" + queryId);
-      Thread t = new Thread() {
-        public void run() {
+//      Thread t = new Thread() {
+//        public void run() {
 //          try {
 //            Thread.sleep(1000);   //wait tile return to rpc response
 //          } catch (InterruptedException e) {
 //          }
           workerContext.getQueryMaster().getContext().stopQuery(queryId);
-        }
-      };
-      t.start();
+//        }
+//      };
+//      t.start();
       return BOOL_TRUE;
     }
   }

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/3f45f0cd/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoWorkerManagerService.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoWorkerManagerService.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoWorkerManagerService.java
index 29f44eb..76011dc 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoWorkerManagerService.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoWorkerManagerService.java
@@ -31,7 +31,7 @@ import org.apache.tajo.QueryUnitAttemptId;
 import org.apache.tajo.TajoIdProtos;
 import org.apache.tajo.conf.TajoConf;
 import org.apache.tajo.ipc.TajoWorkerProtocol;
-import org.apache.tajo.master.QueryContext;
+import org.apache.tajo.engine.query.QueryContext;
 import org.apache.tajo.master.TaskSchedulerImpl;
 import org.apache.tajo.master.event.*;
 import org.apache.tajo.master.querymaster.QueryMaster;

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/3f45f0cd/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/Task.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/Task.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/Task.java
index 0a45cfb..d845f4f 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/Task.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/Task.java
@@ -30,16 +30,13 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.yarn.util.ConverterUtils;
-import org.apache.tajo.DataChannel;
 import org.apache.tajo.QueryUnitAttemptId;
 import org.apache.tajo.TajoConstants;
 import org.apache.tajo.TajoProtos.TaskAttemptState;
-import org.apache.tajo.TaskAttemptContext;
 import org.apache.tajo.catalog.Schema;
 import org.apache.tajo.catalog.TableMeta;
 import org.apache.tajo.catalog.statistics.TableStat;
 import org.apache.tajo.conf.TajoConf;
-import org.apache.tajo.engine.exception.UnfinishedTaskException;
 import org.apache.tajo.engine.json.CoreGsonHelper;
 import org.apache.tajo.engine.planner.PlannerUtil;
 import org.apache.tajo.engine.planner.logical.LogicalNode;
@@ -48,8 +45,8 @@ import org.apache.tajo.engine.planner.logical.StoreTableNode;
 import org.apache.tajo.engine.planner.physical.PhysicalExec;
 import org.apache.tajo.ipc.TajoWorkerProtocol.*;
 import org.apache.tajo.ipc.TajoWorkerProtocol.TajoWorkerProtocolService.Interface;
-import org.apache.tajo.ipc.protocolrecords.QueryUnitRequest;
-import org.apache.tajo.master.QueryContext;
+import org.apache.tajo.engine.query.QueryUnitRequest;
+import org.apache.tajo.engine.query.QueryContext;
 import org.apache.tajo.rpc.NullCallback;
 import org.apache.tajo.storage.Fragment;
 import org.apache.tajo.storage.StorageUtil;
@@ -300,12 +297,11 @@ public class Task {
 
   public void cleanUp() {
     // remove itself from worker
-    // 끝난건지 확인
+
     if (context.getState() == TaskAttemptState.TA_SUCCEEDED) {
       try {
         // context.getWorkDir() 지우기
         localFS.delete(context.getWorkDir(), true);
-        // tasks에서 자기 지우기
         synchronized (taskRunnerContext.getTasks()) {
           taskRunnerContext.getTasks().remove(this.getId());
         }
@@ -313,8 +309,7 @@ public class Task {
         e.printStackTrace();
       }
     } else {
-      LOG.error(new UnfinishedTaskException("QueryUnitAttemptId: "
-          + context.getTaskId() + " status: " + context.getState()));
+      LOG.error("QueryUnitAttemptId: " + context.getTaskId() + " status: " + context.getState());
     }
   }