You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tajo.apache.org by hy...@apache.org on 2013/08/26 14:29:11 UTC

[2/8] TAJO-127: Implement Tajo Resource Manager. (hyoungjunkim via hyunsik)

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/d48f2667/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TaskRunner.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TaskRunner.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TaskRunner.java
index a41b280..47ec7bc 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TaskRunner.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TaskRunner.java
@@ -26,20 +26,20 @@ import org.apache.hadoop.fs.LocalDirAllocator;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.impl.pb.ContainerIdPBImpl;
 import org.apache.hadoop.yarn.service.AbstractService;
 import org.apache.hadoop.yarn.util.ConverterUtils;
+import org.apache.tajo.ExecutionBlockId;
 import org.apache.tajo.QueryConf;
+import org.apache.tajo.QueryId;
 import org.apache.tajo.QueryUnitAttemptId;
-import org.apache.tajo.SubQueryId;
 import org.apache.tajo.TajoProtos.TaskAttemptState;
 import org.apache.tajo.conf.TajoConf.ConfVars;
 import org.apache.tajo.engine.query.QueryUnitRequestImpl;
-import org.apache.tajo.ipc.QueryMasterProtocol;
-import org.apache.tajo.ipc.QueryMasterProtocol.QueryMasterProtocolService.*;
+import org.apache.tajo.ipc.TajoWorkerProtocol;
+import org.apache.tajo.ipc.TajoWorkerProtocol.TajoWorkerProtocolService.Interface;
 import org.apache.tajo.rpc.CallFuture2;
 import org.apache.tajo.rpc.NullCallback;
 import org.apache.tajo.rpc.ProtoAsyncRpcClient;
@@ -51,7 +51,7 @@ import java.security.PrivilegedExceptionAction;
 import java.util.Map;
 import java.util.concurrent.*;
 
-import static org.apache.tajo.ipc.QueryMasterProtocol.*;
+import static org.apache.tajo.ipc.TajoWorkerProtocol.*;
 
 /**
  * The driver class for Tajo QueryUnit processing.
@@ -60,17 +60,17 @@ public class TaskRunner extends AbstractService {
   /** class logger */
   private static final Log LOG = LogFactory.getLog(TaskRunner.class);
 
-  private QueryConf conf;
+  private QueryConf queryConf;
 
   private volatile boolean stopped = false;
 
-  private final SubQueryId subQueryId;
-  private ApplicationId appId;
-  private final NodeId nodeId;
-  private final ContainerId containerId;
+  private ExecutionBlockId executionBlockId;
+  private QueryId queryId;
+  private NodeId nodeId;
+  private ContainerId containerId;
 
   // Cluster Management
-  private QueryMasterProtocol.QueryMasterProtocolService.Interface master;
+  private TajoWorkerProtocol.TajoWorkerProtocolService.Interface master;
 
   // for temporal or intermediate files
   private FileSystem localFS;
@@ -94,7 +94,7 @@ public class TaskRunner extends AbstractService {
   private Thread taskLauncher;
 
   // Contains the object references related for TaskRunner
-  private WorkerContext workerContext;
+  private TaskRunnerContext taskRunnerContext;
   // for the doAs block
   private UserGroupInformation taskOwner;
 
@@ -102,34 +102,89 @@ public class TaskRunner extends AbstractService {
   private String baseDir;
   private Path baseDirPath;
 
+  private ProtoAsyncRpcClient client;
+
+  private TaskRunnerManager taskRunnerManager;
+
   public TaskRunner(
-      final SubQueryId subQueryId,
+      final ExecutionBlockId executionBlockId,
       final NodeId nodeId,
       UserGroupInformation taskOwner,
       Interface master, ContainerId containerId) {
     super(TaskRunner.class.getName());
-    this.subQueryId = subQueryId;
-    this.appId = subQueryId.getQueryId().getApplicationId();
+    this.executionBlockId = executionBlockId;
+    this.queryId = executionBlockId.getQueryId();
     this.nodeId = nodeId;
     this.taskOwner = taskOwner;
     this.master = master;
     this.containerId = containerId;
   }
 
-  @Override
-  public void init(Configuration _conf) {
-    this.conf = (QueryConf) _conf;
+  public TaskRunner(TaskRunnerManager taskRunnerManager, QueryConf conf, String[] args) {
+    super(TaskRunner.class.getName());
 
+    this.taskRunnerManager = taskRunnerManager;
     try {
-      this.workerContext = new WorkerContext();
+      final ExecutionBlockId executionBlockId = TajoIdUtils.createExecutionBlockId(args[1]);
+
+      conf.setOutputPath(new Path(args[6]));
+
+      LOG.info("NM Local Dir: " + conf.get(ConfVars.TASK_LOCAL_DIR.varname));
+      LOG.info("OUTPUT DIR: " + conf.getOutputPath());
+      LOG.info("Tajo Root Dir: " + conf.getVar(ConfVars.ROOT_DIR));
+
+      UserGroupInformation.setConfiguration(conf);
+
+      // Parse the NodeId and ContainerId from their string forms.
+      // NodeId has a form of hostname:port.
+      NodeId nodeId = ConverterUtils.toNodeId(args[2]);
+      this.containerId = ConverterUtils.toContainerId(args[3]);
+
+      // QueryMaster's address
+      String host = args[4];
+      int port = Integer.parseInt(args[5]);
+      final InetSocketAddress masterAddr = NetUtils.createSocketAddrForHost(host, port);
+
+      LOG.info("QueryMaster Address:" + masterAddr);
+      // TODO - 'load credential' should be implemented
+      // Getting taskOwner
+      UserGroupInformation taskOwner =
+          UserGroupInformation.createRemoteUser(conf.getVar(ConfVars.QUERY_USERNAME));
+      //taskOwner.addToken(token);
+
+      // Create the TajoWorkerProtocol RPC client while running as the task owner.
+      this.client =
+          taskOwner.doAs(new PrivilegedExceptionAction<ProtoAsyncRpcClient>() {
+            @Override
+            public ProtoAsyncRpcClient run() throws Exception {
+              return new ProtoAsyncRpcClient(TajoWorkerProtocol.class, masterAddr);
+            }
+          });
+      this.master = client.getStub();
+
+      this.executionBlockId = executionBlockId;
+      this.queryId = executionBlockId.getQueryId();
+      this.nodeId = nodeId;
+      this.taskOwner = taskOwner;
+
+      this.taskRunnerContext = new TaskRunnerContext();
+    } catch (Exception e) {
+      LOG.error(e.getMessage(), e);
+    }
+  }
+
+  @Override
+  public void init(Configuration conf) {
+    this.queryConf = (QueryConf)conf;
 
+    try {
       // initialize DFS and LocalFileSystems
-      defaultFS = FileSystem.get(URI.create(conf.getVar(ConfVars.ROOT_DIR)),conf);
+      defaultFS = FileSystem.get(URI.create(queryConf.getVar(ConfVars.ROOT_DIR)),conf);
       localFS = FileSystem.getLocal(conf);
 
       // the base dir for an output dir
-      baseDir = ConverterUtils.toString(appId)
-          + "/output" + "/" + subQueryId.getId();
+      baseDir = queryId.toString()
+          + "/output" + "/" + executionBlockId.getId();
 
       // initialize LocalDirAllocator
       lDirAllocator = new LocalDirAllocator(ConfVars.TASK_LOCAL_DIR.varname);
@@ -139,9 +194,7 @@ public class TaskRunner extends AbstractService {
 
       // Setup QueryEngine according to the query plan
       // Here, we can setup row-based query engine or columnar query engine.
-      this.queryEngine = new TajoQueryEngine(conf);
-
-      Runtime.getRuntime().addShutdownHook(new Thread(new ShutdownHook()));
+      this.queryEngine = new TajoQueryEngine(queryConf);
     } catch (Throwable t) {
       LOG.error(t);
     }
@@ -152,39 +205,46 @@ public class TaskRunner extends AbstractService {
   @Override
   public void start() {
     run();
+    super.start();
   }
 
   @Override
   public void stop() {
-    if (!isStopped()) {
-      // If TaskRunner is stopped, all running or pending tasks will be marked as failed.
-      for (Task task : tasks.values()) {
-        if (task.getStatus() == TaskAttemptState.TA_PENDING ||
-            task.getStatus() == TaskAttemptState.TA_RUNNING) {
-          task.setState(TaskAttemptState.TA_FAILED);
-        }
+    if(isStopped()) {
+      return;
+    }
+    // If this flag becomes true, taskLauncher will be terminated.
+    this.stopped = true;
+
+    // If TaskRunner is stopped, all running or pending tasks will be marked as failed.
+    for (Task task : tasks.values()) {
+      if (task.getStatus() == TaskAttemptState.TA_PENDING ||
+          task.getStatus() == TaskAttemptState.TA_RUNNING) {
+        task.setState(TaskAttemptState.TA_FAILED);
       }
+    }
 
-      // If this flag become true, taskLauncher will be terminated.
-      this.stopped = true;
+    if(client != null) {
+      client.close();
+      client = null;
+    }
 
-      LOG.info("STOPPED: " + nodeId);
-      synchronized (this) {
-        notifyAll();
-      }
+    LOG.info("Stop TaskRunner: " + executionBlockId);
+    synchronized (this) {
+      notifyAll();
     }
   }
 
-  class WorkerContext {
-    public QueryConf getConf() {
-      return conf;
+  public class TaskRunnerContext {
+    public QueryConf getQueryConf() {
+      return queryConf;
     }
 
     public String getNodeId() {
       return nodeId.toString();
     }
 
-    public QueryMasterProtocolService.Interface getMaster() {
+    public TajoWorkerProtocolService.Interface getMaster() {
       return master;
     }
 
@@ -219,9 +279,17 @@ public class TaskRunner extends AbstractService {
     public Path getBaseDir() {
       return baseDirPath;
     }
+
+    public ExecutionBlockId getExecutionBlockId() {
+      return executionBlockId;
+    }
+  }
+
+  public TaskRunnerContext getContext() {
+    return taskRunnerContext;
   }
 
-  static void fatalError(QueryMasterProtocolService.Interface proxy,
+  static void fatalError(TajoWorkerProtocolService.Interface proxy,
                          QueryUnitAttemptId taskAttemptId, String message) {
     TaskFatalErrorReport.Builder builder = TaskFatalErrorReport.newBuilder()
         .setId(taskAttemptId.getProto())
@@ -245,17 +313,27 @@ public class TaskRunner extends AbstractService {
             try {
               if (callFuture == null) {
                 callFuture = new CallFuture2<QueryUnitRequestProto>();
-                master.getTask(null, ((ContainerIdPBImpl) containerId).getProto(),
-                    callFuture);
+                LOG.info("====>Request GetTask:" + executionBlockId + "," + containerId);
+                GetTaskRequestProto request = GetTaskRequestProto.newBuilder()
+                    .setExecutionBlockId(executionBlockId.getProto())
+                    .setContainerId(((ContainerIdPBImpl) containerId).getProto())
+                    .build();
+                master.getTask(null, request, callFuture);
               }
               try {
                 // wait for an assigning task for 3 seconds
                 taskRequest = callFuture.get(3, TimeUnit.SECONDS);
+              } catch (InterruptedException e) {
+                if(stopped) {
+                  break;
+                }
               } catch (TimeoutException te) {
+                if(stopped) {
+                  break;
+                }
                 // if there has been no assigning task for a given period,
                 // TaskRunner will retry to request an assigning task.
-                LOG.error(te);
-
+                LOG.warn("Timeout getResource:" + executionBlockId + ", but retry", te);
                 continue;
               }
 
@@ -264,9 +342,12 @@ public class TaskRunner extends AbstractService {
                 // If TaskRunner receives the terminal signal, TaskRunner will be terminated
                 // immediately.
                 if (taskRequest.getShouldDie()) {
-                  LOG.info("received ShouldDie flag");
+                  LOG.info("Received ShouldDie flag:" + executionBlockId);
                   stop();
-
+                  if(taskRunnerManager != null) {
+                    // notify the TaskRunnerManager
+                    taskRunnerManager.stopTask(executionBlockId);
+                  }
                 } else {
 
                   LOG.info("Accumulated Received Task: " + (++receivedNum));
@@ -280,7 +361,7 @@ public class TaskRunner extends AbstractService {
                   LOG.info("Initializing: " + taskAttemptId);
                   Task task;
                   try {
-                    task = new Task(taskAttemptId, workerContext, master,
+                    task = new Task(taskAttemptId, taskRunnerContext, master,
                         new QueryUnitRequestImpl(taskRequest));
                     tasks.put(taskAttemptId, task);
 
@@ -291,7 +372,7 @@ public class TaskRunner extends AbstractService {
                     // task.run() is a blocking call.
                     task.run();
                   } catch (Throwable t) {
-                    fatalError(workerContext.getMaster(), taskAttemptId, t.getMessage());
+                    fatalError(taskRunnerContext.getMaster(), taskAttemptId, t.getMessage());
                   } finally {
                     callFuture = null;
                     taskRequest = null;
@@ -318,14 +399,6 @@ public class TaskRunner extends AbstractService {
     }
   }
 
-  private class ShutdownHook implements Runnable {
-    @Override
-    public void run() {
-      LOG.info("received SIGINT Signal");
-      stop();
-    }
-  }
-
   /**
    * @return true if a stop has been requested.
    */
@@ -333,68 +406,7 @@ public class TaskRunner extends AbstractService {
     return this.stopped;
   }
 
-  /**
-   * TaskRunner takes 5 arguments as follows:
-   * <ol>
-   * <li>1st: SubQueryId</li>
-   * <li>2nd: NodeId</li>
-   * <li>3nd: ContainerId</li>
-   * <li>4th: QueryMaster hostname</li>
-   * <li>5th: QueryMaster port</li>
-   * </ol>
-   */
-  public static void main(String[] args) throws Exception {
-    // Restore QueryConf
-    final QueryConf conf = new QueryConf();
-    conf.addResource(new Path(QueryConf.FILENAME));
-
-    LOG.info("MiniTajoYarn NM Local Dir: " + conf.get(ConfVars.TASK_LOCAL_DIR.varname));
-    LOG.info("OUTPUT DIR: " + conf.getOutputPath());
-    LOG.info("Tajo Root Dir: " + conf.getVar(ConfVars.ROOT_DIR));
-
-    UserGroupInformation.setConfiguration(conf);
-
-    // SubQueryId from String
-    final SubQueryId subQueryId = TajoIdUtils.newSubQueryId(args[0]);
-    // NodeId has a form of hostname:port.
-    NodeId nodeId = ConverterUtils.toNodeId(args[1]);
-    ContainerId containerId = ConverterUtils.toContainerId(args[2]);
-
-    // QueryMaster's address
-    String host = args[3];
-    int port = Integer.parseInt(args[4]);
-    final InetSocketAddress masterAddr =
-            NetUtils.createSocketAddrForHost(host, port);
-
-    // TODO - 'load credential' should be implemented
-    // Getting taskOwner
-    UserGroupInformation taskOwner =
-        UserGroupInformation.createRemoteUser(conf.getVar(ConfVars.QUERY_USERNAME));
-    //taskOwner.addToken(token);
-
-    // QueryMasterService RPC
-    ProtoAsyncRpcClient client;
-    QueryMasterProtocolService.Interface master;
-
-    // initialize MasterWorkerProtocol as an actual task owner.
-    client =
-        taskOwner.doAs(new PrivilegedExceptionAction<ProtoAsyncRpcClient>() {
-          @Override
-          public ProtoAsyncRpcClient run() throws Exception {
-            return new ProtoAsyncRpcClient(QueryMasterProtocol.class, masterAddr);
-          }
-        });
-    master = client.getStub();
-
-
-    TaskRunner taskRunner = new TaskRunner(subQueryId, nodeId, taskOwner, master, containerId);
-    try {
-      taskRunner.init(conf);
-      taskRunner.start();
-    } finally {
-      client.close();
-      LOG.info("TaskRunner (" + nodeId + ") main thread exiting");
-      System.exit(0);
-    }
+  public ExecutionBlockId getExecutionBlockId() {
+    return this.executionBlockId;
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/d48f2667/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TaskRunnerManager.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TaskRunnerManager.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TaskRunnerManager.java
new file mode 100644
index 0000000..dcd44df
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TaskRunnerManager.java
@@ -0,0 +1,108 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.worker;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.service.CompositeService;
+import org.apache.tajo.ExecutionBlockId;
+import org.apache.tajo.QueryConf;
+import org.apache.tajo.conf.TajoConf;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+public class TaskRunnerManager extends CompositeService {
+  private static final Log LOG = LogFactory.getLog(TaskRunnerManager.class);
+
+  private Map<ExecutionBlockId, TaskRunner> taskRunnerMap = new HashMap<ExecutionBlockId, TaskRunner>();
+  private TajoWorker.WorkerContext workerContext;
+  private TajoConf tajoConf;
+  private AtomicBoolean stop = new AtomicBoolean(false);
+
+  public TaskRunnerManager(TajoWorker.WorkerContext workerContext) {
+    super(TaskRunnerManager.class.getName());
+
+    this.workerContext = workerContext;
+  }
+
+  @Override
+  public void init(Configuration conf) {
+    tajoConf = (TajoConf)conf;
+    super.init(tajoConf);
+  }
+
+  @Override
+  public void start() {
+    super.start();
+  }
+
+  @Override
+  public void stop() {
+    if(stop.get()) {
+      return;
+    }
+    stop.set(true);
+    synchronized(taskRunnerMap) {
+      for(TaskRunner eachTaskRunner: taskRunnerMap.values()) {
+        if(!eachTaskRunner.isStopped()) {
+          eachTaskRunner.stop();
+        }
+      }
+    }
+    super.stop();
+    if(!workerContext.isStandbyMode()) {
+      workerContext.stopWorker(true);
+    }
+  }
+
+  public void stopTask(ExecutionBlockId executionBlockId) {
+    LOG.info("Stop Task:" + executionBlockId);
+    synchronized(taskRunnerMap) {
+      taskRunnerMap.remove(executionBlockId);
+    }
+    if(!workerContext.isStandbyMode()) {
+      stop();
+    }
+  }
+
+  public void startTask(final String[] params) {
+    //TODO change to use event dispatcher
+    Thread t = new Thread() {
+      public void run() {
+        try {
+          QueryConf queryConf = new QueryConf(tajoConf);
+          TaskRunner taskRunner = new TaskRunner(TaskRunnerManager.this, queryConf, params);
+          synchronized(taskRunnerMap) {
+            taskRunnerMap.put(taskRunner.getContext().getExecutionBlockId(), taskRunner);
+          }
+          taskRunner.init(queryConf);
+          taskRunner.start();
+        } catch (Exception e) {
+          LOG.error(e.getMessage(), e);
+          throw new RuntimeException(e.getMessage(), e);
+        }
+      }
+    };
+
+    t.start();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/d48f2667/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/YarnResourceAllocator.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/YarnResourceAllocator.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/YarnResourceAllocator.java
new file mode 100644
index 0000000..9470a88
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/YarnResourceAllocator.java
@@ -0,0 +1,106 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.worker;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.impl.pb.ContainerIdPBImpl;
+import org.apache.hadoop.yarn.client.YarnClient;
+import org.apache.hadoop.yarn.client.YarnClientImpl;
+import org.apache.hadoop.yarn.ipc.YarnRPC;
+import org.apache.hadoop.yarn.proto.YarnProtos;
+import org.apache.tajo.QueryConf;
+import org.apache.tajo.master.TaskRunnerGroupEvent;
+import org.apache.tajo.master.TaskRunnerLauncher;
+import org.apache.tajo.master.YarnTaskRunnerLauncherImpl;
+import org.apache.tajo.master.event.ContainerAllocatorEventType;
+import org.apache.tajo.master.querymaster.QueryMasterTask;
+import org.apache.tajo.master.rm.YarnRMContainerAllocator;
+
+public class YarnResourceAllocator extends AbstractResourceAllocator {
+  private YarnRMContainerAllocator rmAllocator;
+
+  private TaskRunnerLauncher taskRunnerLauncher;
+
+  private YarnRPC yarnRPC;
+
+  private YarnClient yarnClient;
+
+  private static final Log LOG = LogFactory.getLog(YarnResourceAllocator.class.getName());
+
+  private QueryMasterTask.QueryContext queryContext;
+
+  private QueryConf queryConf;
+
+  public YarnResourceAllocator(QueryMasterTask.QueryContext queryContext) {
+    this.queryContext = queryContext;
+  }
+
+  @Override
+  public ContainerId makeContainerId(YarnProtos.ContainerIdProto containerId) {
+    return new ContainerIdPBImpl(containerId);
+  }
+
+  @Override
+  public void allocateTaskWorker() {
+    // TODO: not implemented yet; this method is currently a no-op.
+  }
+
+  @Override
+  public void init(Configuration conf) {
+    queryConf = (QueryConf)conf;
+
+    yarnRPC = YarnRPC.create(queryConf);
+
+    connectYarnClient();
+
+    taskRunnerLauncher = new YarnTaskRunnerLauncherImpl(queryContext, yarnRPC);
+    addService((org.apache.hadoop.yarn.service.Service) taskRunnerLauncher);
+    queryContext.getDispatcher().register(TaskRunnerGroupEvent.EventType.class, taskRunnerLauncher);
+
+    rmAllocator = new YarnRMContainerAllocator(queryContext);
+    addService(rmAllocator);
+    queryContext.getDispatcher().register(ContainerAllocatorEventType.class, rmAllocator);
+    super.init(conf);
+  }
+
+  @Override
+  public void stop() {
+    try {
+      this.yarnClient.stop();
+    } catch (Exception e) {
+      LOG.error(e.getMessage(), e);
+    }
+    super.stop();
+  }
+
+  @Override
+  public void start() {
+    super.start();
+  }
+
+  private void connectYarnClient() {
+    this.yarnClient = new YarnClientImpl();
+    this.yarnClient.init(queryConf);
+    this.yarnClient.start();
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/d48f2667/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/dataserver/retriever/AdvancedDataRetriever.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/dataserver/retriever/AdvancedDataRetriever.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/dataserver/retriever/AdvancedDataRetriever.java
index d602d57..2ef0c4c 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/dataserver/retriever/AdvancedDataRetriever.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/dataserver/retriever/AdvancedDataRetriever.java
@@ -22,14 +22,14 @@ import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.jboss.netty.channel.ChannelHandlerContext;
-import org.jboss.netty.handler.codec.http.HttpRequest;
-import org.jboss.netty.handler.codec.http.QueryStringDecoder;
+import org.apache.tajo.ExecutionBlockId;
 import org.apache.tajo.QueryUnitAttemptId;
 import org.apache.tajo.QueryUnitId;
-import org.apache.tajo.SubQueryId;
 import org.apache.tajo.util.TajoIdUtils;
 import org.apache.tajo.worker.dataserver.FileAccessForbiddenException;
+import org.jboss.netty.channel.ChannelHandlerContext;
+import org.jboss.netty.handler.codec.http.HttpRequest;
+import org.jboss.netty.handler.codec.http.QueryStringDecoder;
 
 import java.io.File;
 import java.io.FileNotFoundException;
@@ -81,7 +81,7 @@ public class AdvancedDataRetriever implements DataRetriever {
       List<String> qids = splitMaps(params.get("qid"));
       for (String qid : qids) {
         String[] ids = qid.split("_");
-        SubQueryId suid = TajoIdUtils.newSubQueryId(params.get("sid").get(0));
+        ExecutionBlockId suid = TajoIdUtils.createExecutionBlockId(params.get("sid").get(0));
         QueryUnitId quid = new QueryUnitId(suid, Integer.parseInt(ids[0]));
         QueryUnitAttemptId attemptId = new QueryUnitAttemptId(quid,
             Integer.parseInt(ids[1]));

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/d48f2667/tajo-core/tajo-core-backend/src/main/proto/ClientProtocol.proto
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/proto/ClientProtocol.proto b/tajo-core/tajo-core-backend/src/main/proto/ClientProtocol.proto
index 61c14c4..43d99ef 100644
--- a/tajo-core/tajo-core-backend/src/main/proto/ClientProtocol.proto
+++ b/tajo-core/tajo-core-backend/src/main/proto/ClientProtocol.proto
@@ -51,13 +51,13 @@ message UpdateQueryResponse {
 
 message SubmitQueryResponse {
   required ResultCode resultCode = 1;
-  optional ApplicationAttemptIdProto queryId = 2;
+  optional string queryId = 2;
   optional string errorMessage = 3;
 }
 
 message GetQueryResultRequest {
   optional SessionIdProto sessionId = 1;
-  required ApplicationAttemptIdProto queryId = 2;
+  required string queryId = 2;
 }
 
 message GetQueryResultResponse {
@@ -70,7 +70,7 @@ message GetQueryListRequest {
 }
 
 message BriefQueryStatus {
-  required ApplicationAttemptIdProto queryId = 1;
+  required string queryId = 1;
   required QueryState state = 2;
   required int32 executionTime = 3;
 }
@@ -81,12 +81,12 @@ message GetQueryListResponse {
 
 message GetQueryStatusRequest {
   optional SessionIdProto sessionId = 1;
-  required ApplicationAttemptIdProto queryId = 2;
+  required string queryId = 2;
 }
 
 message GetQueryStatusResponse {
   required ResultCode resultCode = 1;
-  required ApplicationAttemptIdProto queryId = 2;
+  required string queryId = 2;
   optional QueryState state = 3;
   optional float progress = 4;
   optional int64 submitTime = 5;
@@ -142,7 +142,7 @@ service ClientProtocolService {
   rpc getQueryResult(GetQueryResultRequest) returns (GetQueryResultResponse);
   rpc getQueryList(GetQueryListRequest) returns (GetQueryListResponse);
   rpc getQueryStatus(GetQueryStatusRequest) returns (GetQueryStatusResponse);
-  rpc killQuery(ApplicationAttemptIdProto) returns (BoolProto);
+  rpc killQuery(StringProto) returns (BoolProto);
   rpc getClusterInfo(GetClusterInfoRequest) returns (GetClusterInfoResponse);
   rpc existTable(StringProto) returns (BoolProto);
   rpc getTableList(GetTableListRequest) returns (GetTableListResponse);

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/d48f2667/tajo-core/tajo-core-backend/src/main/proto/ClientProtos.proto
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/proto/ClientProtos.proto b/tajo-core/tajo-core-backend/src/main/proto/ClientProtos.proto
index 2c5c2b6..f3b1005 100644
--- a/tajo-core/tajo-core-backend/src/main/proto/ClientProtos.proto
+++ b/tajo-core/tajo-core-backend/src/main/proto/ClientProtos.proto
@@ -51,13 +51,13 @@ message UpdateQueryResponse {
 
 message SubmitQueryResponse {
   required ResultCode resultCode = 1;
-  optional ApplicationAttemptIdProto queryId = 2;
+  optional QueryIdProto queryId = 2;
   optional string errorMessage = 3;
 }
 
 message GetQueryResultRequest {
   optional SessionIdProto sessionId = 1;
-  required ApplicationAttemptIdProto queryId = 2;
+  required QueryIdProto queryId = 2;
 }
 
 message GetQueryResultResponse {
@@ -70,7 +70,7 @@ message GetQueryListRequest {
 }
 
 message BriefQueryStatus {
-  required ApplicationAttemptIdProto queryId = 1;
+  required QueryIdProto queryId = 1;
   required QueryState state = 2;
   required int32 executionTime = 3;
 }
@@ -81,12 +81,12 @@ message GetQueryListResponse {
 
 message GetQueryStatusRequest {
   optional SessionIdProto sessionId = 1;
-  required ApplicationAttemptIdProto queryId = 2;
+  required QueryIdProto queryId = 2;
 }
 
 message GetQueryStatusResponse {
   required ResultCode resultCode = 1;
-  required ApplicationAttemptIdProto queryId = 2;
+  required QueryIdProto queryId = 2;
   optional QueryState state = 3;
   optional float progress = 4;
   optional int64 submitTime = 5;

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/d48f2667/tajo-core/tajo-core-backend/src/main/proto/QueryMasterClientProtocol.proto
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/proto/QueryMasterClientProtocol.proto b/tajo-core/tajo-core-backend/src/main/proto/QueryMasterClientProtocol.proto
index 9337078..7da83bc 100644
--- a/tajo-core/tajo-core-backend/src/main/proto/QueryMasterClientProtocol.proto
+++ b/tajo-core/tajo-core-backend/src/main/proto/QueryMasterClientProtocol.proto
@@ -32,5 +32,5 @@ service QueryMasterClientProtocolService {
   rpc updateSessionVariables(UpdateSessionVariableRequest) returns (BoolProto);
   rpc getQueryResult(GetQueryResultRequest) returns (GetQueryResultResponse);
   rpc getQueryStatus(GetQueryStatusRequest) returns (GetQueryStatusResponse);
-  rpc killQuery(ApplicationAttemptIdProto) returns (BoolProto);
+  rpc killQuery(QueryIdProto) returns (BoolProto);
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/d48f2667/tajo-core/tajo-core-backend/src/main/proto/QueryMasterManagerProtocol.proto
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/proto/QueryMasterManagerProtocol.proto b/tajo-core/tajo-core-backend/src/main/proto/QueryMasterManagerProtocol.proto
deleted file mode 100644
index 08fc5c9..0000000
--- a/tajo-core/tajo-core-backend/src/main/proto/QueryMasterManagerProtocol.proto
+++ /dev/null
@@ -1,50 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-option java_package = "org.apache.tajo.ipc";
-option java_outer_classname = "QueryMasterManagerProtocol";
-option java_generic_services = true;
-option java_generate_equals_and_hash = true;
-
-import "yarn_protos.proto";
-import "tajo_protos.proto";
-import "TajoIdProtos.proto";
-import "CatalogProtos.proto";
-import "PrimitiveProtos.proto";
-
-message QueryHeartbeat {
-  required ApplicationAttemptIdProto queryId = 1;
-  required string queryMasterHost = 2;
-  required int32 queryMasterPort = 3;
-  required int32 queryMasterClientPort = 4;
-  required QueryState state = 5;
-  optional string statusMessage = 6;
-}
-
-message QueryHeartbeatResponse {
-  message ResponseCommand {
-      required string command = 1;
-      repeated string params = 2;
-  }
-  required BoolProto heartbeatResult = 1;
-  optional ResponseCommand responseCommand = 3;
-}
-
-service QueryMasterManagerProtocolService {
-  rpc queryHeartbeat(QueryHeartbeat) returns (QueryHeartbeatResponse);
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/d48f2667/tajo-core/tajo-core-backend/src/main/proto/QueryMasterProtocol.proto
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/proto/QueryMasterProtocol.proto b/tajo-core/tajo-core-backend/src/main/proto/QueryMasterProtocol.proto
deleted file mode 100644
index b6a0602..0000000
--- a/tajo-core/tajo-core-backend/src/main/proto/QueryMasterProtocol.proto
+++ /dev/null
@@ -1,132 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-option java_package = "org.apache.tajo.ipc";
-option java_outer_classname = "QueryMasterProtocol";
-option java_generic_services = true;
-option java_generate_equals_and_hash = true;
-
-import "yarn_protos.proto";
-import "tajo_protos.proto";
-import "TajoIdProtos.proto";
-import "CatalogProtos.proto";
-import "PrimitiveProtos.proto";
-
-message TaskStatusProto {
-  required QueryUnitAttemptIdProto id = 1;
-  required string workerName = 2;
-  required float progress = 3;
-  required TaskAttemptState state = 4;
-  optional StatSetProto stats = 5;
-  optional TableStatProto resultStats = 6;
-  repeated Partition partitions = 7;
-}
-
-message TaskCompletionReport {
-  required QueryUnitAttemptIdProto id = 1;
-  optional StatSetProto stats = 2;
-  optional TableStatProto resultStats = 3;
-  repeated Partition partitions = 4;
-}
-
-message TaskFatalErrorReport {
-  required QueryUnitAttemptIdProto id = 1;
-  optional string error_message = 2;
-}
-
-message QueryUnitRequestProto {
-    required QueryUnitAttemptIdProto id = 1;
-    repeated FragmentProto fragments = 2;
-    required string outputTable = 3;
-    required bool clusteredOutput = 4;
-    required string serializedData = 5;
-    optional bool interQuery = 6 [default = false];
-    repeated Fetch fetches = 7;
-    optional bool shouldDie = 8;
-}
-
-message Fetch {
-    required string name = 1;
-    required string urls = 2;
-}
-
-message QueryUnitResponseProto {
-    required string id = 1;
-    required QueryState status = 2;
-}
-
-message StatusReportProto {
-  required int64 timestamp = 1;
-  required string serverName = 2;
-  repeated TaskStatusProto status = 3;
-  repeated QueryUnitAttemptIdProto pings = 4;
-}
-
-message CommandRequestProto {
-    repeated Command command = 1;
-}
-
-message CommandResponseProto {
-}
-
-message Command {
-    required QueryUnitAttemptIdProto id = 1;
-    required CommandType type = 2;
-}
-
-enum CommandType {
-    PREPARE = 0;
-    LAUNCH = 1;
-    STOP = 2;
-    FINALIZE = 3;
-}
-
-message Partition {
-    required int32 partitionKey = 1;
-    optional string fileName = 2;
-}
-
-message ServerStatusProto {
-    message System {
-        required int32 availableProcessors = 1;
-        required int64 freeMemory = 2;
-        required int64 maxMemory = 3;
-        required int64 totalMemory = 4;
-    }
-    message Disk {
-        required string absolutePath = 1;
-        required int64 totalSpace = 2;
-        required int64 freeSpace = 3;
-        required int64 usableSpace = 4;
-    }
-    required System system = 1;
-    repeated Disk disk = 2;
-    required int32 taskNum = 3;
-}
-
-service QueryMasterProtocolService {
-  //from Worker
-  rpc getTask(ContainerIdProto) returns (QueryUnitRequestProto);
-  rpc statusUpdate (TaskStatusProto) returns (BoolProto);
-  rpc ping (QueryUnitAttemptIdProto) returns (BoolProto);
-  rpc fatalError(TaskFatalErrorReport) returns (BoolProto);
-  rpc done (TaskCompletionReport) returns (BoolProto);
-
-  //from QueryMasterManager
-  rpc executeQuery(StringProto) returns (BoolProto);
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/d48f2667/tajo-core/tajo-core-backend/src/main/proto/TajoIdProtos.proto
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/proto/TajoIdProtos.proto b/tajo-core/tajo-core-backend/src/main/proto/TajoIdProtos.proto
index 04c67f2..a87c825 100644
--- a/tajo-core/tajo-core-backend/src/main/proto/TajoIdProtos.proto
+++ b/tajo-core/tajo-core-backend/src/main/proto/TajoIdProtos.proto
@@ -21,15 +21,18 @@ option java_outer_classname = "TajoIdProtos";
 option java_generic_services = false;
 option java_generate_equals_and_hash = true;
 
-import "yarn_protos.proto";
+message QueryIdProto {
+    required string id = 1;
+    required int32 seq = 2;
+}
 
-message SubQueryIdProto {
-    required ApplicationAttemptIdProto queryId = 1;
+message ExecutionBlockIdProto {
+    required QueryIdProto queryId = 1;
     required int32 id = 2;
 }
 
 message QueryUnitIdProto {
-    required SubQueryIdProto subQueryId = 1;
+    required ExecutionBlockIdProto executionBlockId = 1;
     required int32 id = 2;
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/d48f2667/tajo-core/tajo-core-backend/src/main/proto/TajoMasterClientProtocol.proto
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/proto/TajoMasterClientProtocol.proto b/tajo-core/tajo-core-backend/src/main/proto/TajoMasterClientProtocol.proto
index ef7e711..26dbbed 100644
--- a/tajo-core/tajo-core-backend/src/main/proto/TajoMasterClientProtocol.proto
+++ b/tajo-core/tajo-core-backend/src/main/proto/TajoMasterClientProtocol.proto
@@ -16,6 +16,7 @@
  * limitations under the License.
  */
 
+//TajoClient -> TajoMaster Protocol
 option java_package = "org.apache.tajo.ipc";
 option java_outer_classname = "TajoMasterClientProtocol";
 option java_generic_services = true;
@@ -30,12 +31,12 @@ import "ClientProtos.proto";
 
 service TajoMasterClientProtocolService {
   rpc updateSessionVariables(UpdateSessionVariableRequest) returns (BoolProto);
-  rpc submitQuery(QueryRequest) returns (SubmitQueryResponse);
+  rpc submitQuery(QueryRequest) returns (GetQueryStatusResponse);
   rpc updateQuery(QueryRequest) returns (UpdateQueryResponse);
   rpc getQueryResult(GetQueryResultRequest) returns (GetQueryResultResponse);
   rpc getQueryList(GetQueryListRequest) returns (GetQueryListResponse);
   rpc getQueryStatus(GetQueryStatusRequest) returns (GetQueryStatusResponse);
-  rpc killQuery(ApplicationAttemptIdProto) returns (BoolProto);
+  rpc killQuery(QueryIdProto) returns (BoolProto);
   rpc getClusterInfo(GetClusterInfoRequest) returns (GetClusterInfoResponse);
   rpc existTable(StringProto) returns (BoolProto);
   rpc getTableList(GetTableListRequest) returns (GetTableListResponse);

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/d48f2667/tajo-core/tajo-core-backend/src/main/proto/TajoMasterProtocol.proto
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/proto/TajoMasterProtocol.proto b/tajo-core/tajo-core-backend/src/main/proto/TajoMasterProtocol.proto
new file mode 100644
index 0000000..0153c8d
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/main/proto/TajoMasterProtocol.proto
@@ -0,0 +1,98 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+//TajoWorker -> TajoMaster protocol
+
+option java_package = "org.apache.tajo.ipc";
+option java_outer_classname = "TajoMasterProtocol";
+option java_generic_services = true;
+option java_generate_equals_and_hash = true;
+
+import "yarn_protos.proto";
+import "tajo_protos.proto";
+import "TajoIdProtos.proto";
+import "CatalogProtos.proto";
+import "PrimitiveProtos.proto";
+
+message ServerStatusProto {
+    message System {
+        required int32 availableProcessors = 1;
+        required int32 freeMemoryMB = 2;
+        required int32 maxMemoryMB = 3;
+        required int32 totalMemoryMB = 4;
+    }
+    message Disk {
+        required string absolutePath = 1;
+        required int64 totalSpace = 2;
+        required int64 freeSpace = 3;
+        required int64 usableSpace = 4;
+    }
+    required System system = 1;
+    required int32 diskSlots = 2;
+    repeated Disk disk = 3;
+    required int32 runningTaskNum = 4;
+}
+
+message TajoHeartbeat {
+  required string tajoWorkerHost = 1;
+  required int32 tajoWorkerPort = 2;
+  optional ServerStatusProto serverStatus = 3;
+  optional int32 tajoWorkerClientPort = 4;
+  optional QueryIdProto queryId = 5;
+  optional QueryState state = 6;
+  optional string statusMessage = 7;
+}
+
+message TajoHeartbeatResponse {
+  message ResponseCommand {
+      required string command = 1;
+      repeated string params = 2;
+  }
+  required BoolProto heartbeatResult = 1;
+  optional ResponseCommand responseCommand = 3;
+}
+
+message WorkerResourceAllocationRequest {
+    required ExecutionBlockIdProto executionBlockId = 1;
+    required int32 numWorks = 2;
+    required int32 memoryMBSlots = 3 ;
+    required int32 diskSlots = 4;
+}
+
+message WorkerResourceProto {
+    required string workerHostAndPort = 1;
+    required ExecutionBlockIdProto executionBlockId = 2;
+    required int32 memoryMBSlots = 3 ;
+    required int32 diskSlots = 4;
+}
+
+message WorkerResourceReleaseRequest {
+    repeated WorkerResourceProto workerResources = 1;
+}
+
+message WorkerResourceAllocationResponse {
+    required ExecutionBlockIdProto executionBlockId = 1;
+    repeated string allocatedWorks = 2;
+}
+
+service TajoMasterProtocolService {
+  rpc heartbeat(TajoHeartbeat) returns (TajoHeartbeatResponse);
+  rpc allocateWorkerResources(WorkerResourceAllocationRequest) returns (WorkerResourceAllocationResponse);
+  rpc releaseWorkerResource(WorkerResourceReleaseRequest) returns (BoolProto);
+  rpc stopQueryMaster(QueryIdProto) returns (BoolProto);
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/d48f2667/tajo-core/tajo-core-backend/src/main/proto/TajoWorkerProtocol.proto
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/proto/TajoWorkerProtocol.proto b/tajo-core/tajo-core-backend/src/main/proto/TajoWorkerProtocol.proto
new file mode 100644
index 0000000..88a2029
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/main/proto/TajoWorkerProtocol.proto
@@ -0,0 +1,137 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+// TajoMaster -> TajoWorker, TajoWorker(QueryMaster) <-> TajoWorker Protocol
+option java_package = "org.apache.tajo.ipc";
+option java_outer_classname = "TajoWorkerProtocol";
+option java_generic_services = true;
+option java_generate_equals_and_hash = true;
+
+import "yarn_protos.proto";
+import "tajo_protos.proto";
+import "TajoIdProtos.proto";
+import "CatalogProtos.proto";
+import "PrimitiveProtos.proto";
+
+message TaskStatusProto {
+  required QueryUnitAttemptIdProto id = 1;
+  required string workerName = 2;
+  required float progress = 3;
+  required TaskAttemptState state = 4;
+  optional StatSetProto stats = 5;
+  optional TableStatProto resultStats = 6;
+  repeated Partition partitions = 7;
+}
+
+message TaskCompletionReport {
+  required QueryUnitAttemptIdProto id = 1;
+  optional StatSetProto stats = 2;
+  optional TableStatProto resultStats = 3;
+  repeated Partition partitions = 4;
+}
+
+message TaskFatalErrorReport {
+  required QueryUnitAttemptIdProto id = 1;
+  optional string error_message = 2;
+}
+
+message QueryUnitRequestProto {
+    required QueryUnitAttemptIdProto id = 1;
+    repeated FragmentProto fragments = 2;
+    required string outputTable = 3;
+    required bool clusteredOutput = 4;
+    required string serializedData = 5;
+    optional bool interQuery = 6 [default = false];
+    repeated Fetch fetches = 7;
+    optional bool shouldDie = 8;
+}
+
+message Fetch {
+    required string name = 1;
+    required string urls = 2;
+}
+
+message QueryUnitResponseProto {
+    required string id = 1;
+    required QueryState status = 2;
+}
+
+message StatusReportProto {
+  required int64 timestamp = 1;
+  required string serverName = 2;
+  repeated TaskStatusProto status = 3;
+  repeated QueryUnitAttemptIdProto pings = 4;
+}
+
+message CommandRequestProto {
+    repeated Command command = 1;
+}
+
+message CommandResponseProto {
+}
+
+message Command {
+    required QueryUnitAttemptIdProto id = 1;
+    required CommandType type = 2;
+}
+
+enum CommandType {
+    PREPARE = 0;
+    LAUNCH = 1;
+    STOP = 2;
+    FINALIZE = 3;
+}
+
+message Partition {
+    required int32 partitionKey = 1;
+    optional string fileName = 2;
+}
+
+message QueryExecutionRequestProto {
+    required QueryIdProto queryId = 1;
+    required StringProto logicalPlanJson = 2;
+}
+
+message GetTaskRequestProto {
+    required ContainerIdProto containerId = 1;
+    required ExecutionBlockIdProto executionBlockId = 2;
+}
+
+message RunExecutionBlockRequestProto {
+    required string executionBlockId = 1;
+    required string queryMasterHost = 2;
+    required int32 queryMasterPort = 3;
+    required string nodeId = 4;
+    required string containerId = 5;
+    optional string queryOutputPath = 6;
+}
+
+service TajoWorkerProtocolService {
+  //from Worker
+  rpc getTask(GetTaskRequestProto) returns (QueryUnitRequestProto);
+  rpc statusUpdate (TaskStatusProto) returns (BoolProto);
+  rpc ping (QueryUnitAttemptIdProto) returns (BoolProto);
+  rpc fatalError(TaskFatalErrorReport) returns (BoolProto);
+  rpc done (TaskCompletionReport) returns (BoolProto);
+
+  //from TajoMaster's QueryJobManager
+  rpc executeQuery(QueryExecutionRequestProto) returns (BoolProto);
+
+  //from QueryMaster(Worker)
+  rpc executeExecutionBlock(RunExecutionBlockRequestProto) returns (BoolProto);
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/d48f2667/tajo-core/tajo-core-backend/src/main/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/resources/log4j.properties b/tajo-core/tajo-core-backend/src/main/resources/log4j.properties
index 2b42975..007c8f5 100644
--- a/tajo-core/tajo-core-backend/src/main/resources/log4j.properties
+++ b/tajo-core/tajo-core-backend/src/main/resources/log4j.properties
@@ -23,3 +23,6 @@ log4j.threshhold=INFO
 log4j.appender.stdout=org.apache.log4j.ConsoleAppender
 log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
 log4j.appender.stdout.layout.ConversionPattern=%d{ISO8601} %-5p %c{2} (%F:%M(%L)) - %m%n
+
+log4j.logger.org.apache.hadoop=WARN
+log4j.logger.org.apache.hadoop.conf=ERROR
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/d48f2667/tajo-core/tajo-core-backend/src/main/resources/tajo-default.xml
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/resources/tajo-default.xml b/tajo-core/tajo-core-backend/src/main/resources/tajo-default.xml
index a1a111c..e8ad503 100644
--- a/tajo-core/tajo-core-backend/src/main/resources/tajo-default.xml
+++ b/tajo-core/tajo-core-backend/src/main/resources/tajo-default.xml
@@ -43,4 +43,72 @@
   </property>
 
 
+
+  <property>
+    <name>tajo.master.clientservice.addr</name>
+    <value>127.0.0.1:9004</value>
+  </property>
+
+  <property>
+    <name>tajo.master.manager.addr</name>
+    <value>127.0.0.1:9005</value>
+    <description>rpc port for tajo worker</description>
+  </property>
+
+  <property>
+    <name>tajo.query.session.timeout</name>
+    <value>60000</value>
+    <description>ms</description>
+  </property>
+
+  <property>
+    <name>tajo.resource.manager</name>
+    <value>org.apache.tajo.master.rm.TajoWorkerResourceManager</value>
+    <description>This can be org.apache.tajo.master.rm.TajoWorkerResourceManager or org.apache.tajo.master.rm.YarnTajoResourceManager</description>
+  </property>
+
+  <property>
+    <name>tajo.querymaster.memoryMB</name>
+    <value>512</value>
+    <description>the memory slot size for a QueryMaster</description>
+  </property>
+
+  <property>
+    <name>tajo.worker.slots.use.os.info</name>
+    <value>true</value>
+    <description>If true, Tajo system obtains the physical resource information from OS.
+                 If false, the physical resource information is obtained from the below configs.</description>
+  </property>
+
+  <!-- Default Node's Physical information -->
+  <!-- The below configs are used if tajo.worker.slots.use.os.info is set to true. -->
+  <property>
+    <name>tajo.worker.slots.os.memory.ratio</name>
+    <value>0.8f</value>
+    <description>The ratio of allocatable memory to the total system memory</description>
+  </property>
+
+  <property>
+    <name>tajo.worker.slots.memoryMB</name>
+    <value>2048</value>
+    <description></description>
+  </property>
+
+  <property>
+    <name>tajo.worker.slots.disk</name>
+    <value>2</value>
+    <description>The number of disks on a worker</description>
+  </property>
+
+  <property>
+    <name>tajo.worker.slots.disk.concurrency</name>
+    <value>4</value>
+    <description>the maximum concurrency number per disk slot</description>
+  </property>
+
+  <property>
+    <name>tajo.worker.slots.cpu.core</name>
+    <value>4</value>
+    <description>The number of CPU cores on a worker</description>
+  </property>
 </configuration>

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/d48f2667/tajo-core/tajo-core-backend/src/test/java/log4j.properties
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/log4j.properties b/tajo-core/tajo-core-backend/src/test/java/log4j.properties
index c1ac487..749124c 100644
--- a/tajo-core/tajo-core-backend/src/test/java/log4j.properties
+++ b/tajo-core/tajo-core-backend/src/test/java/log4j.properties
@@ -23,3 +23,6 @@ log4j.threshhold=ALL
 log4j.appender.stdout=org.apache.log4j.ConsoleAppender
 log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
 log4j.appender.stdout.layout.ConversionPattern=%d{ISO8601} %-5p %c{2} (%F:%M(%L)) - %m%n
+
+log4j.logger.org.apache.hadoop=WARN
+log4j.logger.org.apache.hadoop.conf=ERROR
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/d48f2667/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/LocalTajoTestingUtility.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/LocalTajoTestingUtility.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/LocalTajoTestingUtility.java
index 5e8d11d..1667813 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/LocalTajoTestingUtility.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/LocalTajoTestingUtility.java
@@ -23,7 +23,10 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.tajo.catalog.*;
+import org.apache.tajo.catalog.CatalogUtil;
+import org.apache.tajo.catalog.Options;
+import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.catalog.TableMeta;
 import org.apache.tajo.catalog.proto.CatalogProtos;
 import org.apache.tajo.client.TajoClient;
 import org.apache.tajo.conf.TajoConf;
@@ -83,7 +86,11 @@ public class LocalTajoTestingUtility {
   }
 
   public void shutdown() throws IOException {
-    client.close();
-    util.shutdownMiniCluster();
+    if(client != null) {
+      client.close();
+    }
+    if(util != null) {
+      util.shutdownMiniCluster();
+    }
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/d48f2667/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/MiniTajoYarnCluster.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/MiniTajoYarnCluster.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/MiniTajoYarnCluster.java
index 5b7267f..37e2721 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/MiniTajoYarnCluster.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/MiniTajoYarnCluster.java
@@ -110,9 +110,6 @@ public class MiniTajoYarnCluster extends MiniYARNCluster {
 
     conf.setInt("yarn.nodemanager.delete.debug-delay-sec", 600);
 
-    // Disable virtual memory constraints for containers
-    conf.setBoolean("yarn.nodemanager.vmem-check-enabled", false);
-
     super.init(conf);
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/d48f2667/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/TajoTestingCluster.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/TajoTestingCluster.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/TajoTestingCluster.java
index 32b1f56..041043d 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/TajoTestingCluster.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/TajoTestingCluster.java
@@ -27,9 +27,8 @@ import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.LocalFileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hdfs.DFSConfigKeys;
-import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.util.ShutdownHookManager;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.tajo.catalog.*;
 import org.apache.tajo.catalog.proto.CatalogProtos;
@@ -37,12 +36,17 @@ import org.apache.tajo.client.TajoClient;
 import org.apache.tajo.conf.TajoConf;
 import org.apache.tajo.conf.TajoConf.ConfVars;
 import org.apache.tajo.master.TajoMaster;
+import org.apache.tajo.master.rm.TajoWorkerResourceManager;
 import org.apache.tajo.util.NetUtils;
+import org.apache.tajo.worker.TajoWorker;
 
 import java.io.*;
+import java.net.InetAddress;
 import java.net.InetSocketAddress;
 import java.net.URL;
 import java.sql.ResultSet;
+import java.util.ArrayList;
+import java.util.List;
 import java.util.UUID;
 
 public class TajoTestingCluster {
@@ -54,8 +58,9 @@ public class TajoTestingCluster {
   private MiniDFSCluster dfsCluster;
 	private MiniCatalogServer catalogServer;
 
-
   private TajoMaster tajoMaster;
+  private List<TajoWorker> tajoWorkers = new ArrayList<TajoWorker>();
+  private boolean standbyWorkerMode = false;
 
 	// If non-null, then already a cluster running.
 	private File clusterTestBuildDir = null;
@@ -73,7 +78,10 @@ public class TajoTestingCluster {
 	public static final String DEFAULT_TEST_DIRECTORY = "target/test-data";
 
 	public TajoTestingCluster() {
-		this.conf = new TajoConf();
+    this.conf = new TajoConf();
+    this.standbyWorkerMode =
+        this.conf.get("tajo.resource.manager", TajoWorkerResourceManager.class.getCanonicalName())
+            .indexOf(TajoWorkerResourceManager.class.getName()) >= 0;
 	}
 
 	public TajoConf getConfiguration() {
@@ -113,7 +121,7 @@ public class TajoTestingCluster {
 		String dirStr = getTestDir(randomStr).toString();
 		File dir = new File(dirStr).getAbsoluteFile();
 		// Have it cleaned up on exit
-		dir.deleteOnExit();
+		//dir.deleteOnExit();
 		return dir;
 	}
 
@@ -155,8 +163,7 @@ public class TajoTestingCluster {
     System.setProperty(MiniDFSCluster.PROP_TEST_BUILD_DATA,
         this.clusterTestBuildDir.toString());
 
-    conf.setInt(DFSConfigKeys.DFS_REPLICATION_KEY, 1);
-    MiniDFSCluster.Builder builder = new MiniDFSCluster.Builder(new HdfsConfiguration(conf));
+    MiniDFSCluster.Builder builder = new MiniDFSCluster.Builder(conf);
     builder.hosts(hosts);
     builder.numDataNodes(servers);
     builder.format(true);
@@ -210,7 +217,7 @@ public class TajoTestingCluster {
     catalogServer = new MiniCatalogServer(conf);
     CatalogServer catServer = catalogServer.getCatalogServer();
     InetSocketAddress sockAddr = catServer.getBindAddress();
-    c.setVar(ConfVars.CATALOG_ADDRESS, NetUtils.normalizeInetSocketAddress(sockAddr));
+    c.setVar(ConfVars.CATALOG_ADDRESS, NetUtils.getIpPortString(sockAddr));
 
     return this.catalogServer;
   }
@@ -232,11 +239,12 @@ public class TajoTestingCluster {
     TajoConf c = getConfiguration();
     c.setVar(ConfVars.TASKRUNNER_LISTENER_ADDRESS, "localhost:0");
     c.setVar(ConfVars.CLIENT_SERVICE_ADDRESS, "localhost:0");
-    c.setVar(ConfVars.QUERY_MASTER_MANAGER_SERVICE_ADDRESS, "localhost:0");
+    c.setVar(ConfVars.TAJO_MASTER_SERVICE_ADDRESS, "localhost:0");
 
     c.setVar(ConfVars.CATALOG_ADDRESS, "localhost:0");
     c.set(CatalogConstants.STORE_CLASS, "org.apache.tajo.catalog.store.MemStore");
     c.set(CatalogConstants.JDBC_URI, "jdbc:derby:target/test-data/tcat/db");
+
     LOG.info("derby repository is set to "+conf.get(CatalogConstants.JDBC_URI));
 
     if (!local) {
@@ -253,11 +261,38 @@ public class TajoTestingCluster {
 
     this.conf.setVar(ConfVars.TASKRUNNER_LISTENER_ADDRESS, c.getVar(ConfVars.TASKRUNNER_LISTENER_ADDRESS));
     this.conf.setVar(ConfVars.CLIENT_SERVICE_ADDRESS, c.getVar(ConfVars.CLIENT_SERVICE_ADDRESS));
+
+    InetSocketAddress tajoMasterAddress = tajoMaster.getContext().getTajoMasterService().getBindAddress();
+
+    this.conf.setVar(ConfVars.TAJO_MASTER_SERVICE_ADDRESS,
+        tajoMasterAddress.getHostName() + ":" + tajoMasterAddress.getPort());
+
     this.conf.setVar(ConfVars.CATALOG_ADDRESS, c.getVar(ConfVars.CATALOG_ADDRESS));
 
+    if(standbyWorkerMode) {
+      startTajoWorkers(numSlaves);
+    }
     LOG.info("Mini Tajo cluster is up");
   }
 
+  private void startTajoWorkers(int numSlaves) throws Exception {
+    for(int i = 0; i < 1; i++) {
+      TajoWorker tajoWorker = new TajoWorker("all");
+
+      TajoConf workerConf  = new TajoConf(this.conf);
+
+      workerConf.setInt("tajo.worker.info.port", 0);
+      workerConf.setInt("tajo.worker.client.rpc.port", 0);
+      workerConf.setInt("tajo.worker.manager.rpc.port", 0);
+      workerConf.setInt(TajoConf.ConfVars.PULLSERVER_PORT.varname, 0);
+
+      tajoWorker.startWorker(workerConf, new String[]{"standby"});
+
+      LOG.info("=====> MiniTajoCluster Worker #" + (i + 1) + " started.");
+      tajoWorkers.add(tajoWorker);
+    }
+  }
+
   public void restartTajoCluster(int numSlaves) throws Exception {
     tajoMaster.stop();
     tajoMaster.start();
@@ -273,6 +308,10 @@ public class TajoTestingCluster {
     if(this.tajoMaster != null) {
       this.tajoMaster.stop();
     }
+    for(TajoWorker eachWorker: tajoWorkers) {
+      eachWorker.stopWorkerForce();
+    }
+    tajoWorkers.clear();
     this.tajoMaster= null;
   }
 
@@ -297,7 +336,8 @@ public class TajoTestingCluster {
    */
   public void startMiniCluster(final int numSlaves)
       throws Exception {
-    startMiniCluster(numSlaves, null);
+    String localHostName = InetAddress.getLocalHost().getHostName();
+    startMiniCluster(numSlaves, new String[] {localHostName});
   }
 
   public void startMiniCluster(final int numSlaves,
@@ -331,17 +371,19 @@ public class TajoTestingCluster {
     startMiniDFSCluster(numDataNodes, this.clusterTestBuildDir, dataNodeHosts);
     this.dfsCluster.waitClusterUp();
 
+    if(!standbyWorkerMode) {
+      startMiniYarnCluster();
+    }
+
+    startMiniTajoCluster(this.clusterTestBuildDir, numSlaves, false);
+  }
+
+  private void startMiniYarnCluster() throws Exception {
     LOG.info("Starting up YARN cluster");
     // Scheduler properties required for YARN to work
     conf.set("yarn.scheduler.capacity.root.queues", "default");
     conf.set("yarn.scheduler.capacity.root.default.capacity", "100");
 
-    // fixed thread OOM
-    conf.setInt(YarnConfiguration.RM_CLIENT_THREAD_COUNT, 2);
-    conf.setInt(YarnConfiguration.RM_SCHEDULER_CLIENT_THREAD_COUNT, 2);
-    conf.setInt(YarnConfiguration.RM_RESOURCE_TRACKER_CLIENT_THREAD_COUNT, 2);
-    conf.setInt(YarnConfiguration.NM_CONTAINER_MGR_THREAD_COUNT, 2);
-
     conf.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, 384);
     conf.setInt(YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_MB, 3000);
     conf.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES, 1);
@@ -368,8 +410,6 @@ public class TajoTestingCluster {
       yarnCluster.getConfig().writeXml(os);
       os.close();
     }
-
-    startMiniTajoCluster(this.clusterTestBuildDir, numSlaves, false);
   }
 
   public void startMiniClusterInLocal(final int numSlaves) throws Exception {
@@ -413,9 +453,12 @@ public class TajoTestingCluster {
     }
 
     if(this.clusterTestBuildDir != null && this.clusterTestBuildDir.exists()) {
-      LocalFileSystem localFS = LocalFileSystem.getLocal(conf);
-      localFS.delete(
-          new Path(clusterTestBuildDir.toString()), true);
+      if(!ShutdownHookManager.get().isShutdownInProgress()) {
+        //TODO clean test dir when ShutdownInProgress
+        LocalFileSystem localFS = LocalFileSystem.getLocal(conf);
+        localFS.delete(
+            new Path(clusterTestBuildDir.toString()), true);
+      }
       this.clusterTestBuildDir = null;
     }
 
@@ -457,6 +500,12 @@ public class TajoTestingCluster {
                               String query) throws Exception {
     TpchTestBase instance = TpchTestBase.getInstance();
     TajoTestingCluster util = instance.getTestingCluster();
+    while(true) {
+      if(util.getMaster().isMasterRunning()) {
+        break;
+      }
+      Thread.sleep(1000);
+    }
     TajoConf conf = util.getConfiguration();
     TajoClient client = new TajoClient(conf);
 
@@ -503,33 +552,4 @@ public class TajoTestingCluster {
       Closeables.closeQuietly(writer);
     }
   }
-
-
-	/**
-	 * @param args
-	 * @throws Exception
-	 */
-	public static void main(String[] args) throws Exception {
-		TajoTestingCluster cluster = new TajoTestingCluster();
-    File f = cluster.setupClusterTestBuildDir();
-    System.out.println("first setupClusterTestBuildDir: " + f);
-    f = cluster.setupClusterTestBuildDir();
-    System.out.println("second setupClusterTestBuildDir: " + f);
-    f = cluster.getTestDir();
-    System.out.println("getTestDir() after second: " + f);
-    f = cluster.getTestDir("abc");
-    System.out.println("getTestDir(\"abc\") after second: " + f);
-
-    cluster.initTestDir();
-    f = cluster.getTestDir();
-    System.out.println("getTestDir() after initTestDir: " + f);
-    f = cluster.getTestDir("abc");
-    System.out.println("getTestDir(\"abc\") after initTestDir: " + f);
-    f = cluster.setupClusterTestBuildDir();
-    System.out.println("setupClusterTestBuildDir() after initTestDir: " + f);
-
-    TajoTestingCluster cluster2 = new TajoTestingCluster();
-    File f2 = cluster2.setupClusterTestBuildDir();
-    System.out.println("first setupClusterTestBuildDir of cluster2: " + f2);
-	}
 }

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/d48f2667/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/TestQueryIdFactory.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/TestQueryIdFactory.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/TestQueryIdFactory.java
index b4d920f..7b82952 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/TestQueryIdFactory.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/TestQueryIdFactory.java
@@ -27,7 +27,6 @@ public class TestQueryIdFactory {
   
   @Before
   public void setup() {
-    QueryIdFactory.reset();
   }
 
   @Test
@@ -40,15 +39,15 @@ public class TestQueryIdFactory {
   @Test
   public void testNewSubQueryId() {
     QueryId qid = QueryIdFactory.newQueryId();
-    SubQueryId subqid1 = QueryIdFactory.newSubQueryId(qid);
-    SubQueryId subqid2 = QueryIdFactory.newSubQueryId(qid);
+    ExecutionBlockId subqid1 = QueryIdFactory.newExecutionBlockId(qid);
+    ExecutionBlockId subqid2 = QueryIdFactory.newExecutionBlockId(qid);
     assertTrue(subqid1.compareTo(subqid2) < 0);
   }
   
   @Test
   public void testNewQueryUnitId() {
     QueryId qid = QueryIdFactory.newQueryId();
-    SubQueryId subid = QueryIdFactory.newSubQueryId(qid);
+    ExecutionBlockId subid = QueryIdFactory.newExecutionBlockId(qid);
     QueryUnitId quid1 = QueryIdFactory.newQueryUnitId(subid);
     QueryUnitId quid2 = QueryIdFactory.newQueryUnitId(subid);
     assertTrue(quid1.compareTo(quid2) < 0);

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/d48f2667/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/TestTajoIds.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/TestTajoIds.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/TestTajoIds.java
index 386fe02..1997159 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/TestTajoIds.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/TestTajoIds.java
@@ -20,8 +20,8 @@ package org.apache.tajo;
 
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.util.BuilderUtils;
-import org.junit.Test;
 import org.apache.tajo.util.TajoIdUtils;
+import org.junit.Test;
 
 import static org.junit.Assert.*;
 
@@ -31,10 +31,10 @@ public class TestTajoIds {
     long ts1 = 1315890136000l;
     long ts2 = 1315890136001l;
 
-    QueryId j1 = createQueryId(ts1, 2, 1);
-    QueryId j2 = createQueryId(ts1, 1, 2);
-    QueryId j3 = createQueryId(ts2, 1, 2);
-    QueryId j4 = createQueryId(ts1, 2, 1);
+    QueryId j1 = createQueryId(ts1, 2);
+    QueryId j2 = createQueryId(ts1, 1);
+    QueryId j3 = createQueryId(ts2, 1);
+    QueryId j4 = createQueryId(ts1, 2);
 
     assertTrue(j1.equals(j4));
     assertFalse(j1.equals(j2));
@@ -48,42 +48,42 @@ public class TestTajoIds {
     assertFalse(j1.hashCode() == j2.hashCode());
     assertFalse(j1.hashCode() == j3.hashCode());
 
-    QueryId j5 = createQueryId(ts1, 231415, 2);
-    assertEquals("q_" + ts1 + "_0002_000001", j1.toString());
-    assertEquals("q_" + ts1 + "_231415_000002", j5.toString());
+    QueryId j5 = createQueryId(ts1, 231415);
+    assertEquals("q_" + ts1 + "_0002", j1.toString());
+    assertEquals("q_" + ts1 + "_231415", j5.toString());
   }
 
   @Test
   public void testQueryIds() {
     long timeId = 1315890136000l;
     
-    QueryId queryId = createQueryId(timeId, 1, 1);
-    assertEquals("q_" + timeId + "_0001_000001", queryId.toString());
+    QueryId queryId = createQueryId(timeId, 1);
+    assertEquals("q_" + timeId + "_0001", queryId.toString());
     
-    SubQueryId subId = TajoIdUtils.newSubQueryId(queryId, 2);
-    assertEquals("sq_" + timeId +"_0001_000001_02", subId.toString());
+    ExecutionBlockId subId = QueryIdFactory.newExecutionBlockId(queryId, 2);
+    assertEquals("eb_" + timeId +"_0001_000002", subId.toString());
     
     QueryUnitId qId = new QueryUnitId(subId, 5);
-    assertEquals("t_" + timeId + "_0001_000001_02_000005", qId.toString());
+    assertEquals("t_" + timeId + "_0001_000002_000005", qId.toString());
 
     QueryUnitAttemptId attemptId = new QueryUnitAttemptId(qId, 4);
-    assertEquals("ta_" + timeId + "_0001_000001_02_000005_04", attemptId.toString());
+    assertEquals("ta_" + timeId + "_0001_000002_000005_04", attemptId.toString());
   }
 
   @Test
   public void testEqualsObject() {
     long timeId = System.currentTimeMillis();
     
-    QueryId queryId1 = createQueryId(timeId, 1, 1);
-    QueryId queryId2 = createQueryId(timeId, 2, 2);
+    QueryId queryId1 = createQueryId(timeId, 1);
+    QueryId queryId2 = createQueryId(timeId, 2);
     assertNotSame(queryId1, queryId2);    
-    QueryId queryId3 = createQueryId(timeId, 1, 1);
+    QueryId queryId3 = createQueryId(timeId, 1);
     assertEquals(queryId1, queryId3);
     
-    SubQueryId sid1 = TajoIdUtils.newSubQueryId(queryId1, 1);
-    SubQueryId sid2 = TajoIdUtils.newSubQueryId(queryId1, 2);    
+    ExecutionBlockId sid1 = QueryIdFactory.newExecutionBlockId(queryId1, 1);
+    ExecutionBlockId sid2 = QueryIdFactory.newExecutionBlockId(queryId1, 2);
     assertNotSame(sid1, sid2);
-    SubQueryId sid3 = TajoIdUtils.newSubQueryId(queryId1, 1);
+    ExecutionBlockId sid3 = QueryIdFactory.newExecutionBlockId(queryId1, 1);
     assertEquals(sid1, sid3);
     
     QueryUnitId qid1 = new QueryUnitId(sid1, 9);
@@ -97,16 +97,16 @@ public class TestTajoIds {
   public void testCompareTo() {
     long time = System.currentTimeMillis();
     
-    QueryId queryId1 = createQueryId(time, 1, 1);
-    QueryId queryId2 = createQueryId(time, 2, 2);
-    QueryId queryId3 = createQueryId(time, 1, 1);
+    QueryId queryId1 = createQueryId(time, 1);
+    QueryId queryId2 = createQueryId(time, 2);
+    QueryId queryId3 = createQueryId(time, 1);
     assertEquals(-1, queryId1.compareTo(queryId2));
     assertEquals(1, queryId2.compareTo(queryId1));
     assertEquals(0, queryId3.compareTo(queryId1));
-    
-    SubQueryId sid1 = TajoIdUtils.newSubQueryId(queryId1, 1);
-    SubQueryId sid2 = TajoIdUtils.newSubQueryId(queryId1, 2);    
-    SubQueryId sid3 = TajoIdUtils.newSubQueryId(queryId1, 1);
+
+    ExecutionBlockId sid1 = QueryIdFactory.newExecutionBlockId(queryId1, 1);
+    ExecutionBlockId sid2 = QueryIdFactory.newExecutionBlockId(queryId1, 2);
+    ExecutionBlockId sid3 = QueryIdFactory.newExecutionBlockId(queryId1, 1);
     assertEquals(-1, sid1.compareTo(sid2));
     assertEquals(1, sid2.compareTo(sid1));
     assertEquals(0, sid3.compareTo(sid1));
@@ -121,33 +121,33 @@ public class TestTajoIds {
   
   @Test
   public void testConstructFromString() {
-    QueryIdFactory.reset();
+//    QueryIdFactory.reset();
     QueryId qid1 = QueryIdFactory.newQueryId();
-    QueryId qid2 = TajoIdUtils.createQueryId(qid1.toString());
+    QueryId qid2 = TajoIdUtils.parseQueryId(qid1.toString());
     assertEquals(qid1, qid2);
-    
-    SubQueryId sub1 = QueryIdFactory.newSubQueryId(qid1);
-    SubQueryId sub2 = TajoIdUtils.newSubQueryId(sub1.toString());
+
+    ExecutionBlockId sub1 = QueryIdFactory.newExecutionBlockId(qid1);
+    ExecutionBlockId sub2 = TajoIdUtils.createExecutionBlockId(sub1.toString());
     assertEquals(sub1, sub2);
     
     QueryUnitId u1 = QueryIdFactory.newQueryUnitId(sub1);
-    QueryUnitId u2 = new QueryUnitId(u1.toString());
+    QueryUnitId u2 = new QueryUnitId(u1.getProto());
     assertEquals(u1, u2);
 
     QueryUnitAttemptId attempt1 = new QueryUnitAttemptId(u1, 1);
-    QueryUnitAttemptId attempt2 = new QueryUnitAttemptId(attempt1.toString());
+    QueryUnitAttemptId attempt2 = new QueryUnitAttemptId(attempt1.getProto());
     assertEquals(attempt1, attempt2);
   }
 
   @Test
   public void testConstructFromPB() {
-    QueryIdFactory.reset();
+//    QueryIdFactory.reset();
     QueryId qid1 = QueryIdFactory.newQueryId();
     QueryId qid2 = new QueryId(qid1.getProto());
     assertEquals(qid1, qid2);
 
-    SubQueryId sub1 = QueryIdFactory.newSubQueryId(qid1);
-    SubQueryId sub2 = new SubQueryId(sub1.getProto());
+    ExecutionBlockId sub1 = QueryIdFactory.newExecutionBlockId(qid1);
+    ExecutionBlockId sub2 = TajoIdUtils.createExecutionBlockId(sub1.toString());
     assertEquals(sub1, sub2);
 
     QueryUnitId u1 = QueryIdFactory.newQueryUnitId(sub1);
@@ -159,9 +159,9 @@ public class TestTajoIds {
     assertEquals(attempt1, attempt2);
   }
 
-  public static QueryId createQueryId(long timestamp, int id, int attemptId) {
+  public static QueryId createQueryId(long timestamp, int id) {
     ApplicationId appId = BuilderUtils.newApplicationId(timestamp, id);
 
-    return TajoIdUtils.createQueryId(appId, attemptId);
+    return QueryIdFactory.newQueryId(appId.toString());
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/d48f2667/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/TpchTestBase.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/TpchTestBase.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/TpchTestBase.java
index c761103..ad3d676 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/TpchTestBase.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/TpchTestBase.java
@@ -49,7 +49,6 @@ public class TpchTestBase {
     try {
       testBase = new TpchTestBase();
       testBase.setUp();
-      Runtime.getRuntime().addShutdownHook(new ShutdownHook());
     } catch (Exception e) {
       LOG.error(e.getMessage(), e);
     }
@@ -107,19 +106,11 @@ public class TpchTestBase {
     return util.getTestingCluster();
   }
 
-  public static class ShutdownHook extends Thread {
-
-    @Override
-    public void run() {
-      try {
-        testBase.tearDown();
-      } catch (IOException e) {
-        LOG.error(e);
-      }
+  public void tearDown() throws IOException {
+    try {
+      Thread.sleep(2000);
+    } catch (InterruptedException e) {
     }
-  }
-
-  private void tearDown() throws IOException {
     util.shutdown();
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/d48f2667/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/plan/global/TestGlobalQueryPlanner.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/plan/global/TestGlobalQueryPlanner.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/plan/global/TestGlobalQueryPlanner.java
index cc75726..ede73c5 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/plan/global/TestGlobalQueryPlanner.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/plan/global/TestGlobalQueryPlanner.java
@@ -136,7 +136,6 @@ public class TestGlobalQueryPlanner {
       catalog.addTable(desc);
     }
 
-    QueryIdFactory.reset();
     queryId = QueryIdFactory.newQueryId();
     dispatcher.stop();
   }

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/d48f2667/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/global/TestGlobalQueryOptimizer.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/global/TestGlobalQueryOptimizer.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/global/TestGlobalQueryOptimizer.java
index 4455763..c665b44 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/global/TestGlobalQueryOptimizer.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/global/TestGlobalQueryOptimizer.java
@@ -16,9 +16,6 @@
  * limitations under the License.
  */
 
-/**
- * 
- */
 package org.apache.tajo.engine.planner.global;
 
 import org.apache.hadoop.fs.FileSystem;
@@ -128,11 +125,11 @@ public class TestGlobalQueryOptimizer {
       catalog.addTable(desc);
     }
 
-    QueryIdFactory.reset();
+    //QueryIdFactory.reset();
     queryId = QueryIdFactory.newQueryId();
     optimizer = new GlobalOptimizer();
   }
-  
+
   @AfterClass
   public static void terminate() throws IOException {
     util.shutdownCatalogCluster();
@@ -147,7 +144,7 @@ public class TestGlobalQueryOptimizer {
 
     MasterPlan globalPlan = planner.build(queryId, (LogicalRootNode) rootNode);
     globalPlan = optimizer.optimize(globalPlan);
-    
+
     ExecutionBlock unit = globalPlan.getRoot();
     StoreTableNode store = unit.getStoreTableNode();
     assertEquals(NodeType.PROJECTION, store.getChild().getType());
@@ -156,14 +153,14 @@ public class TestGlobalQueryOptimizer {
     SortNode sort = (SortNode) proj.getChild();
     assertEquals(NodeType.SCAN, sort.getChild().getType());
     ScanNode scan = (ScanNode) sort.getChild();
-    
+
     assertTrue(unit.hasChildBlock());
     unit = unit.getChildBlock(scan);
     store = unit.getStoreTableNode();
     assertEquals(NodeType.SORT, store.getChild().getType());
     sort = (SortNode) store.getChild();
     assertEquals(NodeType.JOIN, sort.getChild().getType());
-    
+
     assertTrue(unit.hasChildBlock());
     for (ScanNode prevscan : unit.getScanNodes()) {
       ExecutionBlock prev = unit.getChildBlock(prevscan);
@@ -171,4 +168,4 @@ public class TestGlobalQueryOptimizer {
       assertEquals(NodeType.SCAN, store.getChild().getType());
     }
   }
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/d48f2667/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestPhysicalPlanner.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestPhysicalPlanner.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestPhysicalPlanner.java
index dbb3862..75e3b1e 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestPhysicalPlanner.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestPhysicalPlanner.java
@@ -25,7 +25,6 @@ import org.apache.commons.codec.binary.Base64;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.tajo.QueryIdFactory;
 import org.apache.tajo.QueryUnitAttemptId;
 import org.apache.tajo.TajoTestingCluster;
 import org.apache.tajo.TaskAttemptContext;
@@ -78,7 +77,6 @@ public class TestPhysicalPlanner {
 
   @BeforeClass
   public static void setUp() throws Exception {
-    QueryIdFactory.reset();
     util = new TajoTestingCluster();
 
     util.startCatalogCluster();

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/d48f2667/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/master/TestRepartitioner.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/master/TestRepartitioner.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/master/TestRepartitioner.java
index a8924dd..843df23 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/master/TestRepartitioner.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/master/TestRepartitioner.java
@@ -18,14 +18,13 @@
 
 package org.apache.tajo.master;
 
+import org.apache.tajo.ExecutionBlockId;
 import org.apache.tajo.QueryId;
-import org.apache.tajo.SubQueryId;
 import org.apache.tajo.TestTajoIds;
 import org.apache.tajo.master.ExecutionBlock.PartitionType;
 import org.apache.tajo.master.querymaster.QueryUnit;
 import org.apache.tajo.master.querymaster.Repartitioner;
 import org.apache.tajo.util.TUtil;
-import org.apache.tajo.util.TajoIdUtils;
 import org.jboss.netty.handler.codec.http.QueryStringDecoder;
 import org.junit.Test;
 
@@ -37,10 +36,10 @@ import static junit.framework.Assert.assertEquals;
 public class TestRepartitioner {
   @Test
   public void testCreateHashFetchURL() throws Exception {
-    QueryId q1 = TestTajoIds.createQueryId(1315890136000l, 2, 1);
+    QueryId q1 = TestTajoIds.createQueryId(1315890136000l, 2);
     String hostName = "tajo1";
     int port = 1234;
-    SubQueryId sid = TajoIdUtils.createSubQueryId(q1, 2);
+    ExecutionBlockId sid = new ExecutionBlockId(q1, 2);
     int partitionId = 2;
 
     List<QueryUnit.IntermediateEntry> intermediateEntries = TUtil.newList();

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/d48f2667/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/worker/TaskRunnerTest.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/worker/TaskRunnerTest.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/worker/TaskRunnerTest.java
index 05a269e..952cb0f 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/worker/TaskRunnerTest.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/worker/TaskRunnerTest.java
@@ -25,31 +25,27 @@ import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
 import org.apache.hadoop.yarn.util.BuilderUtils;
-import org.apache.tajo.QueryConf;
-import org.apache.tajo.QueryId;
-import org.apache.tajo.SubQueryId;
-import org.apache.tajo.TestTajoIds;
-import org.apache.tajo.ipc.QueryMasterProtocol.QueryMasterProtocolService;
+import org.apache.tajo.*;
+import org.apache.tajo.ipc.TajoWorkerProtocol.TajoWorkerProtocolService;
 import org.apache.tajo.rpc.ProtoAsyncRpcClient;
-import org.apache.tajo.util.TajoIdUtils;
 
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
 public class TaskRunnerTest {
   long ts1 = 1315890136000l;
-  QueryId q1 = TestTajoIds.createQueryId(ts1, 2, 5);
-  SubQueryId sq1 = TajoIdUtils.createSubQueryId(q1, 5);
+  QueryId q1 = TestTajoIds.createQueryId(ts1, 2);
+  ExecutionBlockId sq1 = QueryIdFactory.newExecutionBlockId(q1, 5);
 
   //@Test
   public void testInit() throws Exception {
     ProtoAsyncRpcClient mockClient = mock(ProtoAsyncRpcClient.class);
     mockClient.close();
 
-    QueryMasterProtocolService.Interface mockMaster =
-        mock(QueryMasterProtocolService.Interface.class);
+    TajoWorkerProtocolService.Interface mockMaster =
+        mock(TajoWorkerProtocolService.Interface.class);
     ApplicationAttemptId appAttemptId = BuilderUtils.newApplicationAttemptId(
-        q1.getApplicationId(), q1.getAttemptId());
+        BuilderUtils.newApplicationId(Integer.parseInt(q1.getId()), q1.getSeq()), 1);
     ContainerId cId = BuilderUtils.newContainerId(appAttemptId, 1);
 
     NodeId nodeId = RecordFactoryProvider.getRecordFactory(null).

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/d48f2667/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/worker/TestRangeRetrieverHandler.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/worker/TestRangeRetrieverHandler.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/worker/TestRangeRetrieverHandler.java
index 09ab483..cf1e9ae 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/worker/TestRangeRetrieverHandler.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/worker/TestRangeRetrieverHandler.java
@@ -23,7 +23,6 @@ import com.google.common.collect.Maps;
 import org.apache.commons.codec.binary.Base64;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.tajo.QueryIdFactory;
 import org.apache.tajo.TajoTestingCluster;
 import org.apache.tajo.TaskAttemptContext;
 import org.apache.tajo.algebra.Expr;
@@ -73,7 +72,6 @@ public class TestRangeRetrieverHandler {
 
   @Before
   public void setUp() throws Exception {
-    QueryIdFactory.reset();
     util = new TajoTestingCluster();
     conf = util.getConfiguration();
     testDir = CommonTestingUtil.getTestDir("target/test-data/TestRangeRetrieverHandler");

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/d48f2667/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/worker/dataserver/TestHttpDataServer.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/worker/dataserver/TestHttpDataServer.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/worker/dataserver/TestHttpDataServer.java
index 6e6fdae..b70dda2 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/worker/dataserver/TestHttpDataServer.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/worker/dataserver/TestHttpDataServer.java
@@ -19,15 +19,15 @@
 package org.apache.tajo.worker.dataserver;
 
 import org.apache.hadoop.net.NetUtils;
-import org.junit.Before;
-import org.junit.Test;
+import org.apache.tajo.ExecutionBlockId;
 import org.apache.tajo.QueryIdFactory;
 import org.apache.tajo.QueryUnitId;
-import org.apache.tajo.SubQueryId;
 import org.apache.tajo.util.CommonTestingUtil;
 import org.apache.tajo.worker.InterDataRetriever;
 import org.apache.tajo.worker.dataserver.retriever.DataRetriever;
 import org.apache.tajo.worker.dataserver.retriever.DirectoryRetriever;
+import org.junit.Before;
+import org.junit.Test;
 
 import java.io.*;
 import java.net.InetSocketAddress;
@@ -77,8 +77,7 @@ public class TestHttpDataServer {
   
   @Test
   public final void testInterDataRetriver() throws Exception {
-    QueryIdFactory.reset();
-    SubQueryId schid = QueryIdFactory.newSubQueryId(
+    ExecutionBlockId schid = QueryIdFactory.newExecutionBlockId(
             QueryIdFactory.newQueryId());
     QueryUnitId qid1 = QueryIdFactory.newQueryUnitId(schid);
     QueryUnitId qid2 = QueryIdFactory.newQueryUnitId(schid);
@@ -119,8 +118,7 @@ public class TestHttpDataServer {
   
   @Test(expected = FileNotFoundException.class)
   public final void testNoSuchFile() throws Exception {
-    QueryIdFactory.reset();
-    SubQueryId schid = QueryIdFactory.newSubQueryId(
+    ExecutionBlockId schid = QueryIdFactory.newExecutionBlockId(
             QueryIdFactory.newQueryId());
     QueryUnitId qid1 = QueryIdFactory.newQueryUnitId(schid);
     QueryUnitId qid2 = QueryIdFactory.newQueryUnitId(schid);