You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tajo.apache.org by ji...@apache.org on 2013/10/25 07:22:41 UTC

[2/2] git commit: TAJO-275: Separating QueryMaster and TaskRunner roles in worker. (Keuntae Park via jihoon)

TAJO-275: Separating QueryMaster and TaskRunner roles in worker. (Keuntae Park via jihoon)


Project: http://git-wip-us.apache.org/repos/asf/incubator-tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tajo/commit/5470bcea
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tajo/tree/5470bcea
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tajo/diff/5470bcea

Branch: refs/heads/master
Commit: 5470bcea1ae3ec4c627f7f6fc7efa2c1c19e06cd
Parents: fdf0979
Author: Jihoon Son <ji...@apache.org>
Authored: Fri Oct 25 14:21:18 2013 +0900
Committer: Jihoon Son <ji...@apache.org>
Committed: Fri Oct 25 14:21:18 2013 +0900

----------------------------------------------------------------------
 CHANGES.txt                                     |   2 +
 pom.xml                                         |   1 +
 .../java/org/apache/tajo/conf/TajoConf.java     |   2 +
 tajo-core/tajo-core-backend/pom.xml             |   1 +
 .../org/apache/tajo/master/GlobalEngine.java    |  19 +-
 .../apache/tajo/master/TajoContainerProxy.java  |   6 +-
 .../apache/tajo/master/TajoMasterService.java   |  10 +-
 .../master/querymaster/QueryInProgress.java     |  20 +-
 .../tajo/master/querymaster/QueryInfo.java      |   2 +-
 .../master/querymaster/QueryJobManager.java     |   7 +-
 .../tajo/master/querymaster/QueryMaster.java    |  14 +-
 .../querymaster/QueryMasterManagerService.java  | 208 +++++++++++++++++
 .../master/rm/TajoWorkerResourceManager.java    | 115 +++++----
 .../apache/tajo/master/rm/WorkerResource.java   |  66 ++++--
 .../tajo/master/rm/WorkerResourceManager.java   |   3 +
 .../tajo/master/rm/YarnTajoResourceManager.java |   4 +
 .../tajo/worker/TajoResourceAllocator.java      |   6 +-
 .../java/org/apache/tajo/worker/TajoWorker.java | 233 ++++++++++++++-----
 .../tajo/worker/TajoWorkerManagerService.java   |  96 --------
 .../main/java/org/apache/tajo/worker/Task.java  |  14 +-
 .../java/org/apache/tajo/worker/TaskRunner.java |  12 +-
 .../apache/tajo/worker/TaskRunnerManager.java   |   4 +-
 .../src/main/proto/QueryMasterProtocol.proto    |  41 ++++
 .../src/main/proto/TajoMasterProtocol.proto     |  39 ++--
 .../src/main/proto/TajoWorkerProtocol.proto     |   8 -
 .../src/main/resources/tajo-default.xml         |  13 ++
 .../main/resources/webapps/admin/cluster.jsp    | 112 +++++++--
 .../src/main/resources/webapps/admin/index.jsp  |  87 +++++--
 .../src/main/resources/webapps/admin/query.jsp  |  29 ++-
 .../src/main/resources/webapps/worker/env.jsp   |   2 +-
 .../src/main/resources/webapps/worker/index.jsp |  71 +++---
 .../resources/webapps/worker/querydetail.jsp    |   2 +-
 .../main/resources/webapps/worker/queryplan.jsp |   2 +-
 .../resources/webapps/worker/querytasks.jsp     |   2 +-
 .../org/apache/tajo/TajoTestingCluster.java     |   4 +-
 tajo-dist/src/main/bin/start-tajo.sh            |   3 +-
 tajo-dist/src/main/bin/stop-tajo.sh             |   3 +-
 tajo-dist/src/main/bin/tajo                     |  19 +-
 tajo-dist/src/main/bin/tajo-config.sh           |   4 +-
 tajo-dist/src/main/conf/tajo-env.sh             |   6 +
 40 files changed, 905 insertions(+), 387 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/5470bcea/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 5b6a248..7defd7b 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -59,6 +59,8 @@ Release 0.2.0 - unreleased
 
   IMPROVEMENTS
 
+    TAJO-275: Separating QueryMaster and TaskRunner roles in worker. (Keuntae Park via jihoon)
+
     TAJO-270: Boolean datum compatible to apache hive. (jinho)
 
     TAJO-261: Rearrange default port numbers and config names. (hyunsik)

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/5470bcea/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 1815958..7038e40 100644
--- a/pom.xml
+++ b/pom.xml
@@ -306,6 +306,7 @@
           <excludes>
             <exclude>CHANGES.txt</exclude>
             <exclude>**/workers</exclude>
+            <exclude>**/querymasters</exclude>
             <exclude>**/*.sql</exclude>
             <exclude>**/*.hiveql</exclude>
             <exclude>**/*.schema</exclude>

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/5470bcea/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java b/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
index 475877b..360d7f0 100644
--- a/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
+++ b/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
@@ -83,8 +83,10 @@ public class TajoConf extends YarnConfiguration {
 
     // Tajo Worker Service Addresses
     WORKER_INFO_ADDRESS("tajo.worker.info-http.address", "0.0.0.0:28080"),
+    WORKER_QM_INFO_ADDRESS("tajo.worker.qm-info-http.address", "0.0.0.0:28081"),
     WORKER_PEER_RPC_ADDRESS("tajo.worker.peer-rpc.address", "0.0.0.0:28091"),
     WORKER_CLIENT_RPC_ADDRESS("tajo.worker.client-rpc.address", "0.0.0.0:28092"),
+    WORKER_QM_RPC_ADDRESS("tajo.worker.qm-rpc.address", "0.0.0.0:28093"),
 
     // Tajo Worker Temporal Directories
     WORKER_TEMPORAL_DIR("tajo.worker.tmpdir.locations", "/tmp/tajo-${user.name}/tmpdir"),

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/5470bcea/tajo-core/tajo-core-backend/pom.xml
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/pom.xml b/tajo-core/tajo-core-backend/pom.xml
index f888d1e..f674f84 100644
--- a/tajo-core/tajo-core-backend/pom.xml
+++ b/tajo-core/tajo-core-backend/pom.xml
@@ -129,6 +129,7 @@
                 <argument>--java_out=target/generated-sources/proto</argument>
                 <argument>src/main/proto/tajo_protos.proto</argument>
                 <argument>src/main/proto/ClientProtos.proto</argument>
+                <argument>src/main/proto/QueryMasterProtocol.proto</argument>
                 <argument>src/main/proto/QueryMasterClientProtocol.proto</argument>
                 <argument>src/main/proto/TajoMasterProtocol.proto</argument>
                 <argument>src/main/proto/TajoWorkerProtocol.proto</argument>

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/5470bcea/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/GlobalEngine.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/GlobalEngine.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/GlobalEngine.java
index c3eb5f7..a92b1a0 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/GlobalEngine.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/GlobalEngine.java
@@ -149,13 +149,20 @@ public class GlobalEngine extends AbstractService {
 
         queryInfo = queryJobManager.createNewQueryJob(queryContext, sql, rootNode);
 
-        responseBuilder.setQueryId(queryInfo.getQueryId().getProto());
-        responseBuilder.setResultCode(ClientProtos.ResultCode.OK);
-        responseBuilder.setState(queryInfo.getQueryState());
-        if(queryInfo.getQueryMasterHost() != null) {
-          responseBuilder.setQueryMasterHost(queryInfo.getQueryMasterHost());
+        if(queryInfo == null) {
+          responseBuilder.setQueryId(QueryIdFactory.NULL_QUERY_ID.getProto());
+          responseBuilder.setResultCode(ClientProtos.ResultCode.ERROR);
+          responseBuilder.setState(TajoProtos.QueryState.QUERY_ERROR);
+          responseBuilder.setErrorMessage("Fail starting QueryMaster.");
+        } else {
+          responseBuilder.setQueryId(queryInfo.getQueryId().getProto());
+          responseBuilder.setResultCode(ClientProtos.ResultCode.OK);
+          responseBuilder.setState(queryInfo.getQueryState());
+          if(queryInfo.getQueryMasterHost() != null) {
+            responseBuilder.setQueryMasterHost(queryInfo.getQueryMasterHost());
+          }
+          responseBuilder.setQueryMasterPort(queryInfo.getQueryMasterClientPort());
         }
-        responseBuilder.setQueryMasterPort(queryInfo.getQueryMasterClientPort());
       }
       GetQueryStatusResponse response = responseBuilder.build();
 

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/5470bcea/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TajoContainerProxy.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TajoContainerProxy.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TajoContainerProxy.java
index 846343d..be9d047 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TajoContainerProxy.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TajoContainerProxy.java
@@ -65,7 +65,7 @@ public class TajoContainerProxy extends ContainerProxy {
     AsyncRpcClient tajoWorkerRpc = null;
     try {
       InetSocketAddress myAddr= context.getQueryMasterContext().getWorkerContext()
-          .getTajoWorkerManagerService().getBindAddr();
+          .getQueryMasterManagerService().getBindAddr();
 
       InetSocketAddress addr = new InetSocketAddress(container.getNodeId().getHost(), container.getNodeId().getPort());
       tajoWorkerRpc = new AsyncRpcClient(TajoWorkerProtocol.class, addr);
@@ -151,7 +151,9 @@ public class TajoContainerProxy extends ContainerProxy {
 
     for(WorkerResource eahWorkerResource: workerResources) {
       workerResourceProtos.add(TajoMasterProtocol.WorkerResourceProto.newBuilder()
-          .setWorkerHostAndPort(eahWorkerResource.getId())
+          .setHost(eahWorkerResource.getAllocatedHost())
+          .setQueryMasterPort(eahWorkerResource.getQueryMasterPort())
+          .setPeerRpcPort(eahWorkerResource.getPeerRpcPort())
           .setExecutionBlockId(executionBlockId.getProto())
           .setMemoryMBSlots(eahWorkerResource.getMemoryMBSlots())
           .setDiskSlots(eahWorkerResource.getDiskSlots())

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/5470bcea/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TajoMasterService.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TajoMasterService.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TajoMasterService.java
index 04b562c..cf193de 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TajoMasterService.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TajoMasterService.java
@@ -93,7 +93,8 @@ public class TajoMasterService extends AbstractService {
         RpcController controller,
         TajoMasterProtocol.TajoHeartbeat request, RpcCallback<TajoMasterProtocol.TajoHeartbeatResponse> done) {
       if(LOG.isDebugEnabled()) {
-        LOG.debug("Received QueryHeartbeat:" + request.getTajoWorkerHost() + ":" + request.getTajoWorkerPort());
+        LOG.debug("Received QueryHeartbeat:" + request.getTajoWorkerHost() + ":" +
+            request.getTajoQueryMasterPort() + ":" + request.getPeerRpcPort());
       }
 
       TajoMasterProtocol.TajoHeartbeatResponse.ResponseCommand command = null;
@@ -137,9 +138,10 @@ public class TajoMasterService extends AbstractService {
       List<TajoMasterProtocol.WorkerResourceProto> workerResources = request.getWorkerResourcesList();
       for(TajoMasterProtocol.WorkerResourceProto eachWorkerResource: workerResources) {
         WorkerResource workerResource = new WorkerResource();
-        String[] tokens = eachWorkerResource.getWorkerHostAndPort().split(":");
-        workerResource.setAllocatedHost(tokens[0]);
-        workerResource.setPeerRpcPort(Integer.parseInt(tokens[1]));
+        workerResource.setAllocatedHost(eachWorkerResource.getHost());
+
+        workerResource.setPeerRpcPort(eachWorkerResource.getPeerRpcPort());
+        workerResource.setQueryMasterPort(eachWorkerResource.getQueryMasterPort());
         workerResource.setMemoryMBSlots(eachWorkerResource.getMemoryMBSlots());
         workerResource.setDiskSlots(eachWorkerResource.getDiskSlots());
 

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/5470bcea/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryInProgress.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryInProgress.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryInProgress.java
index 0098a7c..0eb9120 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryInProgress.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryInProgress.java
@@ -28,8 +28,10 @@ import org.apache.hadoop.yarn.service.CompositeService;
 import org.apache.tajo.QueryId;
 import org.apache.tajo.TajoProtos;
 import org.apache.tajo.engine.planner.logical.LogicalRootNode;
-import org.apache.tajo.ipc.TajoWorkerProtocol;
+import org.apache.tajo.ipc.QueryMasterProtocol;
+import org.apache.tajo.ipc.QueryMasterProtocol.*;
 import org.apache.tajo.engine.query.QueryContext;
+import org.apache.tajo.ipc.TajoWorkerProtocol;
 import org.apache.tajo.master.TajoAsyncDispatcher;
 import org.apache.tajo.master.TajoMaster;
 import org.apache.tajo.master.rm.WorkerResource;
@@ -62,7 +64,7 @@ public class QueryInProgress extends CompositeService {
 
   private AsyncRpcClient queryMasterRpc;
 
-  private TajoWorkerProtocol.TajoWorkerProtocolService queryMasterRpcClient;
+  private QueryMasterProtocolService queryMasterRpcClient;
 
   public QueryInProgress(
       TajoMaster.MasterContext masterContext,
@@ -139,18 +141,22 @@ public class QueryInProgress extends CompositeService {
     return dispatcher.getEventHandler();
   }
 
-  public void startQueryMaster() {
+  public boolean startQueryMaster() {
     try {
       LOG.info("Initializing QueryInProgress for QueryID=" + queryId);
       WorkerResourceManager resourceManager = masterContext.getResourceManager();
       WorkerResource queryMasterResource = resourceManager.allocateQueryMaster(this);
 
-      if(queryMasterResource != null) {
-        queryInfo.setQueryMasterResource(queryMasterResource);
+      if(queryMasterResource == null) {
+        return false;
       }
+      queryInfo.setQueryMasterResource(queryMasterResource);
       getEventHandler().handle(new QueryJobEvent(QueryJobEvent.Type.QUERY_MASTER_START, queryInfo));
+
+      return true;
     } catch (Exception e) {
       catchException(e);
+      return false;
     }
   }
 
@@ -169,7 +175,7 @@ public class QueryInProgress extends CompositeService {
     }
   }
 
-  public TajoWorkerProtocol.TajoWorkerProtocolService getQueryMasterRpcClient() {
+  public QueryMasterProtocolService getQueryMasterRpcClient() {
     return queryMasterRpcClient;
   }
 
@@ -180,7 +186,7 @@ public class QueryInProgress extends CompositeService {
           queryInfo.getQueryMasterHost() + ":" + queryInfo.getQueryMasterPort());
       LOG.info("Connect to QueryMaster:" + addr);
       //TODO Get Connection from pool
-      queryMasterRpc = new AsyncRpcClient(TajoWorkerProtocol.class, addr);
+      queryMasterRpc = new AsyncRpcClient(QueryMasterProtocol.class, addr);
       queryMasterRpcClient = queryMasterRpc.getStub();
     }
   }

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/5470bcea/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryInfo.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryInfo.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryInfo.java
index f89a017..058352f 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryInfo.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryInfo.java
@@ -66,7 +66,7 @@ public class QueryInfo {
     if(queryMasterResource == null) {
       return 0;
     }
-    return queryMasterResource.getPeerRpcPort();
+    return queryMasterResource.getQueryMasterPort();
   }
 
   public int getQueryMasterClientPort() {

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/5470bcea/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryJobManager.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryJobManager.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryJobManager.java
index f5f029f..41df7c2 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryJobManager.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryJobManager.java
@@ -107,7 +107,9 @@ public class QueryJobManager extends CompositeService {
     queryInProgress.init(getConfig());
     queryInProgress.start();
 
-    queryInProgress.startQueryMaster();
+    if(!queryInProgress.startQueryMaster()) {
+      return null;
+    }
 
     return queryInProgress.getQueryInfo();
   }
@@ -171,7 +173,8 @@ public class QueryJobManager extends CompositeService {
     if(queryHeartbeat.getTajoWorkerHost() != null) {
       WorkerResource queryMasterResource = new WorkerResource();
       queryMasterResource.setAllocatedHost(queryHeartbeat.getTajoWorkerHost());
-      queryMasterResource.setPeerRpcPort(queryHeartbeat.getTajoWorkerPort());
+      queryMasterResource.setPeerRpcPort(queryHeartbeat.getPeerRpcPort());
+      queryMasterResource.setQueryMasterPort(queryHeartbeat.getTajoQueryMasterPort());
       queryMasterResource.setClientPort(queryHeartbeat.getTajoWorkerClientPort());
       queryMasterResource.setPullServerPort(queryHeartbeat.getTajoWorkerPullServerPort());
       queryInfo.setQueryMasterResource(queryMasterResource);

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/5470bcea/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryMaster.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryMaster.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryMaster.java
index 2860b17..1198f3e 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryMaster.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryMaster.java
@@ -153,7 +153,7 @@ public class QueryMaster extends CompositeService implements EventHandler {
     super.stop();
 
     LOG.info("QueryMaster stop");
-    if(!queryMasterContext.getWorkerContext().isStandbyMode()) {
+    if(queryMasterContext.getWorkerContext().isYarnContainerMode()) {
       queryMasterContext.getWorkerContext().stopWorker(true);
     }
   }
@@ -162,8 +162,9 @@ public class QueryMaster extends CompositeService implements EventHandler {
     LOG.info("Send QueryMaster Ready to QueryJobManager:" + queryId);
     try {
       TajoHeartbeat.Builder queryHeartbeatBuilder = TajoHeartbeat.newBuilder()
-          .setTajoWorkerHost(workerContext.getTajoWorkerManagerService().getBindAddr().getHostName())
-          .setTajoWorkerPort(workerContext.getTajoWorkerManagerService().getBindAddr().getPort())
+          .setTajoWorkerHost(workerContext.getQueryMasterManagerService().getBindAddr().getHostName())
+          .setPeerRpcPort(workerContext.getPeerRpcPort())
+          .setTajoQueryMasterPort(workerContext.getQueryMasterManagerService().getBindAddr().getPort())
           .setTajoWorkerClientPort(workerContext.getTajoWorkerClientService().getBindAddr().getPort())
           .setState(state)
           .setQueryId(queryId.getProto());
@@ -280,7 +281,7 @@ public class QueryMaster extends CompositeService implements EventHandler {
       } else {
         LOG.warn("No query info:" + queryId);
       }
-      if(!workerContext.isStandbyMode()) {
+      if(workerContext.isYarnContainerMode()) {
         stop();
       }
     }
@@ -288,8 +289,9 @@ public class QueryMaster extends CompositeService implements EventHandler {
 
   private TajoHeartbeat buildTajoHeartBeat(QueryMasterTask queryMasterTask) {
     TajoHeartbeat queryHeartbeat = TajoHeartbeat.newBuilder()
-        .setTajoWorkerHost(workerContext.getTajoWorkerManagerService().getBindAddr().getHostName())
-        .setTajoWorkerPort(workerContext.getTajoWorkerManagerService().getBindAddr().getPort())
+        .setTajoWorkerHost(workerContext.getQueryMasterManagerService().getBindAddr().getHostName())
+        .setTajoQueryMasterPort(workerContext.getQueryMasterManagerService().getBindAddr().getPort())
+        .setPeerRpcPort(workerContext.getPeerRpcPort())
         .setTajoWorkerClientPort(workerContext.getTajoWorkerClientService().getBindAddr().getPort())
         .setState(queryMasterTask.getState())
         .setQueryId(queryMasterTask.getQueryId().getProto())

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/5470bcea/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryMasterManagerService.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryMasterManagerService.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryMasterManagerService.java
new file mode 100644
index 0000000..4149749
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryMasterManagerService.java
@@ -0,0 +1,208 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.master.querymaster;
+
+import com.google.protobuf.RpcCallback;
+import com.google.protobuf.RpcController;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.service.CompositeService;
+import org.apache.tajo.ExecutionBlockId;
+import org.apache.tajo.QueryId;
+import org.apache.tajo.QueryUnitAttemptId;
+import org.apache.tajo.TajoIdProtos;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.engine.query.QueryContext;
+import org.apache.tajo.ipc.QueryMasterProtocol;
+import org.apache.tajo.ipc.TajoWorkerProtocol;
+import org.apache.tajo.master.TaskSchedulerImpl;
+import org.apache.tajo.master.event.*;
+import org.apache.tajo.rpc.AsyncRpcServer;
+import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos;
+import org.apache.tajo.util.NetUtils;
+import org.apache.tajo.worker.TajoWorker;
+
+import java.net.InetSocketAddress;
+
+public class QueryMasterManagerService extends CompositeService
+    implements QueryMasterProtocol.QueryMasterProtocolService.Interface {
+  private static final Log LOG = LogFactory.getLog(QueryMasterManagerService.class.getName());
+
+  private AsyncRpcServer rpcServer;
+  private InetSocketAddress bindAddr;
+  private String addr;
+  private int port;
+
+  private QueryMaster queryMaster;
+
+  private TajoWorker.WorkerContext workerContext;
+
+  public QueryMasterManagerService(TajoWorker.WorkerContext workerContext, int port) {
+    super(QueryMasterManagerService.class.getName());
+    this.workerContext = workerContext;
+    this.port = port;
+  }
+
+  public QueryMaster getQueryMaster() {
+    return queryMaster;
+  }
+
+  @Override
+  public void init(Configuration conf) {
+    try {
+      // Setup RPC server
+      InetSocketAddress initIsa =
+          new InetSocketAddress("0.0.0.0", port);
+      if (initIsa.getAddress() == null) {
+        throw new IllegalArgumentException("Failed resolve of " + initIsa);
+      }
+
+      this.rpcServer = new AsyncRpcServer(QueryMasterProtocol.class, this, initIsa);
+      this.rpcServer.start();
+
+      this.bindAddr = NetUtils.getConnectAddress(rpcServer.getListenAddress());
+      this.addr = bindAddr.getHostName() + ":" + bindAddr.getPort();
+
+      this.port = bindAddr.getPort();
+
+      queryMaster = new QueryMaster(workerContext);
+      addService(queryMaster);
+
+    } catch (Exception e) {
+      LOG.error(e.getMessage(), e);
+    }
+    // Get the master address
+    LOG.info("QueryMasterManagerService is bind to " + addr);
+    ((TajoConf)conf).setVar(TajoConf.ConfVars.WORKER_QM_RPC_ADDRESS, addr);
+
+    super.init(conf);
+  }
+
+  @Override
+  public void start() {
+    super.start();
+  }
+
+  @Override
+  public void stop() {
+    if(rpcServer != null) {
+      rpcServer.shutdown();
+    }
+    LOG.info("QueryMasterManagerService stopped");
+    super.stop();
+  }
+
+  public InetSocketAddress getBindAddr() {
+    return bindAddr;
+  }
+
+  public String getHostAndPort() {
+    return bindAddr.getHostName() + ":" + bindAddr.getPort();
+  }
+
+  @Override
+  public void getTask(RpcController controller, TajoWorkerProtocol.GetTaskRequestProto request,
+                      RpcCallback<TajoWorkerProtocol.QueryUnitRequestProto> done) {
+    try {
+      ExecutionBlockId ebId = new ExecutionBlockId(request.getExecutionBlockId());
+      QueryMasterTask queryMasterTask = workerContext.getQueryMaster().getQueryMasterTask(ebId.getQueryId());
+      ContainerId cid =
+          queryMasterTask.getQueryTaskContext().getResourceAllocator().makeContainerId(request.getContainerId());
+
+      if(queryMasterTask == null || queryMasterTask.isStopped()) {
+        LOG.debug("getTask:" + cid + ", ebId:" + ebId + ", but query is finished.");
+        done.run(TaskSchedulerImpl.stopTaskRunnerReq);
+      } else {
+        LOG.debug("getTask:" + cid + ", ebId:" + ebId);
+        queryMasterTask.handleTaskRequestEvent(new TaskRequestEvent(cid, ebId, done));
+      }
+    } catch (Exception e) {
+      LOG.error(e.getMessage(), e);
+    }
+  }
+
+  @Override
+  public void statusUpdate(RpcController controller, TajoWorkerProtocol.TaskStatusProto request,
+                           RpcCallback<PrimitiveProtos.BoolProto> done) {
+    try {
+      QueryMasterTask queryMasterTask = queryMaster.getQueryMasterTask(
+          new QueryId(request.getId().getQueryUnitId().getExecutionBlockId().getQueryId()));
+      queryMasterTask.getEventHandler().handle(
+          new TaskAttemptStatusUpdateEvent(new QueryUnitAttemptId(request.getId()), request));
+      done.run(TajoWorker.TRUE_PROTO);
+    } catch (Exception e) {
+      LOG.error(e.getMessage(), e);
+      done.run(TajoWorker.FALSE_PROTO);
+    }
+  }
+
+  @Override
+  public void ping(RpcController controller,
+                   TajoIdProtos.QueryUnitAttemptIdProto attemptId,
+                   RpcCallback<PrimitiveProtos.BoolProto> done) {
+    done.run(TajoWorker.TRUE_PROTO);
+  }
+
+  @Override
+  public void fatalError(RpcController controller, TajoWorkerProtocol.TaskFatalErrorReport report,
+                         RpcCallback<PrimitiveProtos.BoolProto> done) {
+    try {
+      QueryMasterTask queryMasterTask = queryMaster.getQueryMasterTask(
+          new QueryId(report.getId().getQueryUnitId().getExecutionBlockId().getQueryId()));
+      queryMasterTask.getEventHandler().handle(new TaskFatalErrorEvent(report));
+      done.run(TajoWorker.TRUE_PROTO);
+    } catch (Exception e) {
+      LOG.error(e.getMessage(), e);
+      done.run(TajoWorker.FALSE_PROTO);
+    }
+  }
+
+  @Override
+  public void done(RpcController controller, TajoWorkerProtocol.TaskCompletionReport report,
+                   RpcCallback<PrimitiveProtos.BoolProto> done) {
+    try {
+      QueryMasterTask queryMasterTask = queryMaster.getQueryMasterTask(
+          new QueryId(report.getId().getQueryUnitId().getExecutionBlockId().getQueryId()));
+      queryMasterTask.getEventHandler().handle(new TaskCompletionEvent(report));
+      done.run(TajoWorker.TRUE_PROTO);
+    } catch (Exception e) {
+      LOG.error(e.getMessage(), e);
+      done.run(TajoWorker.FALSE_PROTO);
+    }
+  }
+
+  @Override
+  public void executeQuery(RpcController controller,
+                           TajoWorkerProtocol.QueryExecutionRequestProto request,
+                           RpcCallback<PrimitiveProtos.BoolProto> done) {
+    try {
+      QueryId queryId = new QueryId(request.getQueryId());
+      LOG.info("Receive executeQuery request:" + queryId);
+      queryMaster.handle(new QueryStartEvent(queryId,
+          new QueryContext(request.getQueryContext()), request.getSql().getValue(),
+          request.getLogicalPlanJson().getValue()));
+      done.run(TajoWorker.TRUE_PROTO);
+    } catch (Exception e) {
+      LOG.error(e.getMessage(), e);
+      done.run(TajoWorker.FALSE_PROTO);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/5470bcea/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/rm/TajoWorkerResourceManager.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/rm/TajoWorkerResourceManager.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/rm/TajoWorkerResourceManager.java
index c992e43..136f6ea 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/rm/TajoWorkerResourceManager.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/rm/TajoWorkerResourceManager.java
@@ -25,11 +25,11 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.tajo.ExecutionBlockId;
 import org.apache.tajo.QueryId;
 import org.apache.tajo.QueryIdFactory;
-import org.apache.tajo.conf.TajoConf;
 import org.apache.tajo.ipc.TajoMasterProtocol;
 import org.apache.tajo.master.TajoMaster;
 import org.apache.tajo.master.querymaster.QueryInProgress;
 import org.apache.tajo.master.querymaster.QueryJobEvent;
+import org.apache.tajo.worker.TajoWorker;
 
 import java.io.IOException;
 import java.util.*;
@@ -42,10 +42,18 @@ public class TajoWorkerResourceManager implements WorkerResourceManager {
 
   private TajoMaster.MasterContext masterContext;
 
+  //all workers(include querymaster)
   private Map<String, WorkerResource> allWorkerResourceMap = new HashMap<String, WorkerResource>();
-  private Set<String> liveWorkerResources = new HashSet<String>();
+
+  //all workers(include querymaster)
   private Set<String> deadWorkerResources = new HashSet<String>();
 
+  //worker only
+  private Set<String> liveWorkerResources = new HashSet<String>();
+
+  //querymaster only
+  private Set<String> liveQueryMasterWorkerResources = new HashSet<String>();
+
   private Map<QueryId, WorkerResource> queryMasterMap = new HashMap<QueryId, WorkerResource>();
 
   private final Object workerResourceLock = new Object();
@@ -60,15 +68,9 @@ public class TajoWorkerResourceManager implements WorkerResourceManager {
 
   private AtomicBoolean stopped = new AtomicBoolean(false);
 
-  private int queryMasterMemoryMB;
-
-  private int queryMasterDiskSlot;
-
   public TajoWorkerResourceManager(TajoMaster.MasterContext masterContext) {
     this.masterContext = masterContext;
     this.queryIdSeed = String.valueOf(System.currentTimeMillis());
-    this.queryMasterMemoryMB = masterContext.getConf().getIntVar(TajoConf.ConfVars.YARN_RM_QUERY_MASTER_MEMORY_MB);
-    this.queryMasterDiskSlot = masterContext.getConf().getIntVar(TajoConf.ConfVars.YARN_RM_QUERY_MASTER_DISKS);
 
     requestQueue = new LinkedBlockingDeque<WorkerResourceRequest>();
     reAllocationList = new ArrayList<WorkerResourceRequest>();
@@ -81,6 +83,10 @@ public class TajoWorkerResourceManager implements WorkerResourceManager {
     return Collections.unmodifiableMap(allWorkerResourceMap);
   }
 
+  public Collection<String> getQueryMasters() {
+    return Collections.unmodifiableSet(liveQueryMasterWorkerResources);
+  }
+
   public int getNumClusterSlots() {
     int numSlots = 0;
     synchronized(workerResourceLock) {
@@ -105,18 +111,28 @@ public class TajoWorkerResourceManager implements WorkerResourceManager {
 
   @Override
   public WorkerResource allocateQueryMaster(QueryInProgress queryInProgress) {
-    List<WorkerResource> workerResources = chooseWorkers(true, queryMasterMemoryMB, queryMasterDiskSlot, 1);
-    if(workerResources.size() == 0) {
-      //TODO if resource available, assign worker.
-      LOG.warn("No available resource for querymaster:" + queryInProgress.getQueryId());
-      return null;
-    }
-    WorkerResource queryMasterWorker = workerResources.get(0);
     synchronized(workerResourceLock) {
+      if(liveQueryMasterWorkerResources.size() == 0) {
+        LOG.warn("No available resource for querymaster:" + queryInProgress.getQueryId());
+        return null;
+      }
+      WorkerResource queryMasterWorker = null;
+      int minTasks = Integer.MAX_VALUE;
+      for(String eachQueryMaster: liveQueryMasterWorkerResources) {
+        WorkerResource queryMaster = allWorkerResourceMap.get(eachQueryMaster);
+        if(queryMaster != null && queryMaster.getNumQueryMasterTasks() < minTasks) {
+          queryMasterWorker = queryMaster;
+          minTasks = queryMaster.getNumQueryMasterTasks();
+        }
+      }
+      if(queryMasterWorker == null) {
+        return null;
+      }
+      queryMasterWorker.addNumQueryMasterTask();
       queryMasterMap.put(queryInProgress.getQueryId(), queryMasterWorker);
+      LOG.info(queryInProgress.getQueryId() + "'s QueryMaster is " + queryMasterWorker);
+      return queryMasterWorker;
     }
-    LOG.info(queryInProgress.getQueryId() + "'s QueryMaster is " + queryMasterWorker);
-    return queryMasterWorker;
   }
 
   @Override
@@ -132,7 +148,7 @@ public class TajoWorkerResourceManager implements WorkerResourceManager {
       //add queue
       TajoMasterProtocol.WorkerResourceAllocationRequest request =
           TajoMasterProtocol.WorkerResourceAllocationRequest.newBuilder()
-            .setMemoryMBSlots(queryMasterMemoryMB)
+            .setMemoryMBSlots(1)
             .setDiskSlots(1)
             .setExecutionBlockId(QueryIdFactory.newExecutionBlockId(QueryIdFactory.NULL_QUERY_ID, 0).getProto())
             .setNumWorks(1)
@@ -207,7 +223,7 @@ public class TajoWorkerResourceManager implements WorkerResourceManager {
                 ", liveWorkers=" + liveWorkerResources.size());
           }
 
-          List<WorkerResource> workerResources = chooseWorkers(false,
+          List<WorkerResource> workerResources = chooseWorkers(
               resourceRequest.request.getMemoryMBSlots(),
               resourceRequest.request.getDiskSlots(),
               resourceRequest.request.getNumWorks());
@@ -223,7 +239,9 @@ public class TajoWorkerResourceManager implements WorkerResourceManager {
 
               for(WorkerResource eachWorker: workerResources) {
                 workerHosts.add(TajoMasterProtocol.WorkerAllocatedResource.newBuilder()
-                    .setWorkerHostAndPort(eachWorker.getAllocatedHost() + ":" + eachWorker.getPeerRpcPort())
+                    .setWorkerHost(eachWorker.getAllocatedHost())
+                    .setQueryMasterPort(eachWorker.getQueryMasterPort())
+                    .setPeerRpcPort(eachWorker.getPeerRpcPort())
                     .setWorkerPullServerPort(eachWorker.getPullServerPort())
                     .build());
               }
@@ -252,8 +270,7 @@ public class TajoWorkerResourceManager implements WorkerResourceManager {
     }
   }
 
-  private List<WorkerResource> chooseWorkers(boolean queryMaster,
-                                             int requiredMemoryMBSlots, int requiredDiskSlots,
+  private List<WorkerResource> chooseWorkers(int requiredMemoryMBSlots, int requiredDiskSlots,
                                              int numWorkerSlots) {
     List<WorkerResource> selectedWorkers = new ArrayList<WorkerResource>();
 
@@ -275,15 +292,8 @@ public class TajoWorkerResourceManager implements WorkerResourceManager {
           } else {
             WorkerResource workerResource = allWorkerResourceMap.get(eachWorker);
             if(workerResource.getAvailableMemoryMBSlots() >= requiredMemoryMBSlots) {
-                //TODO check disk slot
-                // && workerResource.getAvailableDiskSlots() >= requiredDiskSlots) {
-              if(queryMaster && workerResource.isQueryMasterAllocated()) {
-                insufficientWorkers.add(eachWorker);
-                continue;
-              }
               workerResource.addUsedMemoryMBSlots(requiredMemoryMBSlots);
               //workerResource.addUsedDiskSlots(requiredDiskSlots);
-              workerResource.setQueryMasterAllocated(queryMaster);
               selectedWorkers.add(workerResource);
               selectedCount++;
             } else {
@@ -336,28 +346,33 @@ public class TajoWorkerResourceManager implements WorkerResourceManager {
         return;
       } else {
         queryMasterWorkerResource = queryMasterMap.remove(queryId);
+        queryMasterWorkerResource.releaseQueryMasterTask();
       }
     }
 
-    WorkerResource workerResource = new WorkerResource();
-    workerResource.copyId(queryMasterWorkerResource);
-    workerResource.setMemoryMBSlots(queryMasterMemoryMB);
-    workerResource.setDiskSlots(queryMasterDiskSlot);
-    workerResource.setCpuCoreSlots(0);
-    workerResource.setQueryMasterAllocated(queryMasterWorkerResource.isQueryMasterAllocated());
-    releaseWorkerResource(queryId, workerResource);
     LOG.info("release QueryMaster resource:" + queryId + "," + queryMasterWorkerResource);
   }
 
   public void workerHeartbeat(TajoMasterProtocol.TajoHeartbeat request) {
     synchronized(workerResourceLock) {
-      String hostAndPort = request.getTajoWorkerHost() + ":" + request.getTajoWorkerPort();
-      if(allWorkerResourceMap.containsKey(hostAndPort)) {
-        if(deadWorkerResources.contains(hostAndPort)) {
-          deadWorkerResources.remove(hostAndPort);
-          liveWorkerResources.add(hostAndPort);
+      String workerKey = request.getTajoWorkerHost() + ":" + request.getTajoQueryMasterPort() + ":"
+          + request.getPeerRpcPort();
+      boolean queryMasterMode = request.getServerStatus().getQueryMasterMode().getValue();
+      boolean taskRunnerMode = request.getServerStatus().getTaskRunnerMode().getValue();
+
+      if(allWorkerResourceMap.containsKey(workerKey)) {
+        WorkerResource workerResource = allWorkerResourceMap.get(workerKey);
+
+        if(deadWorkerResources.contains(workerKey)) {
+          deadWorkerResources.remove(workerKey);
+          if(queryMasterMode) {
+            liveQueryMasterWorkerResources.add(workerKey);
+            workerResource.setNumRunningTasks(0);
+          }
+          if(taskRunnerMode) {
+            liveWorkerResources.add(workerKey);
+          }
         }
-        WorkerResource workerResource = allWorkerResourceMap.get(hostAndPort);
         workerResource.setLastHeartbeat(System.currentTimeMillis());
         workerResource.setWorkerStatus(WorkerStatus.LIVE);
         workerResource.setNumRunningTasks(request.getServerStatus().getRunningTaskNum());
@@ -365,12 +380,14 @@ public class TajoWorkerResourceManager implements WorkerResourceManager {
         workerResource.setFreeHeap(request.getServerStatus().getJvmHeap().getFreeHeap());
         workerResource.setTotalHeap(request.getServerStatus().getJvmHeap().getTotalHeap());
       } else {
+        //initial connection
         WorkerResource workerResource = new WorkerResource();
         workerResource.setAllocatedHost(request.getTajoWorkerHost());
+        workerResource.setQueryMasterMode(queryMasterMode);
+        workerResource.setTaskRunnerMode(taskRunnerMode);
 
-        int[] ports = new int[] { request.getTajoWorkerPort(), request.getTajoWorkerClientPort() };
-
-        workerResource.setPeerRpcPort(request.getTajoWorkerPort());
+        workerResource.setQueryMasterPort(request.getTajoQueryMasterPort());
+        workerResource.setPeerRpcPort(request.getPeerRpcPort());
         workerResource.setClientPort(request.getTajoWorkerClientPort());
         workerResource.setPullServerPort(request.getTajoWorkerPullServerPort());
         workerResource.setHttpPort(request.getTajoWorkerHttpPort());
@@ -392,7 +409,13 @@ public class TajoWorkerResourceManager implements WorkerResourceManager {
         }
 
         allWorkerResourceMap.put(workerResource.getId(), workerResource);
-        liveWorkerResources.add(hostAndPort);
+        if(queryMasterMode) {
+          liveQueryMasterWorkerResources.add(workerKey);
+        }
+
+        if(taskRunnerMode) {
+          liveWorkerResources.add(workerKey);
+        }
 
         LOG.info("TajoWorker:" + workerResource + " added in live TajoWorker list");
 

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/5470bcea/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/rm/WorkerResource.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/rm/WorkerResource.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/rm/WorkerResource.java
index fad129a..bd7db6d 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/rm/WorkerResource.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/rm/WorkerResource.java
@@ -20,7 +20,9 @@ package org.apache.tajo.master.rm;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.tajo.worker.TajoWorker;
 
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
@@ -29,6 +31,7 @@ public class WorkerResource {
 
   private String allocatedHost;
   private int peerRpcPort;
+  private int queryMasterPort;
   private int clientPort;
   private int pullServerPort;
   private int httpPort;
@@ -47,8 +50,6 @@ public class WorkerResource {
 
   private int numRunningTasks;
 
-  private boolean queryMasterAllocated;
-
   private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
   private final Lock rlock = lock.readLock();
   private final Lock wlock = lock.writeLock();
@@ -57,13 +58,14 @@ public class WorkerResource {
 
   private long lastHeartbeat;
 
-  public String getId() {
-    return allocatedHost + ":" + peerRpcPort;
-  }
+  private boolean queryMasterMode;
 
-  public void copyId(WorkerResource workerResource) {
-    peerRpcPort = workerResource.getPeerRpcPort();
-    allocatedHost = workerResource.getAllocatedHost();
+  private boolean taskRunnerMode;
+
+  private AtomicInteger numQueryMasterTasks = new AtomicInteger(0);
+
+  public String getId() {
+    return allocatedHost + ":" + queryMasterPort + ":" + peerRpcPort;
   }
 
   public String getAllocatedHost() {
@@ -140,7 +142,7 @@ public class WorkerResource {
   }
 
   public String portsToStr() {
-    return peerRpcPort + "," + clientPort + "," + pullServerPort;
+    return queryMasterPort + "," + peerRpcPort + "," + clientPort + "," + pullServerPort;
   }
 
   public void setLastHeartbeat(long heartbeatTime) {
@@ -194,19 +196,23 @@ public class WorkerResource {
     return lastHeartbeat;
   }
 
-  public boolean isQueryMasterAllocated() {
-    return queryMasterAllocated;
+  public boolean isQueryMasterMode() {
+    return queryMasterMode;
   }
 
-  public void setQueryMasterAllocated(boolean queryMasterAllocated) {
-    this.queryMasterAllocated = queryMasterAllocated;
+  public void setQueryMasterMode(boolean queryMasterMode) {
+    this.queryMasterMode = queryMasterMode;
   }
 
-  public void releaseResource(WorkerResource workerResource) {
-    if(workerResource.isQueryMasterAllocated()) {
-        queryMasterAllocated = false;
-    }
+  public boolean isTaskRunnerMode() {
+    return taskRunnerMode;
+  }
 
+  public void setTaskRunnerMode(boolean taskRunnerMode) {
+    this.taskRunnerMode = taskRunnerMode;
+  }
+
+  public void releaseResource(WorkerResource workerResource) {
     try {
       wlock.lock();
       usedMemoryMBSlots = usedMemoryMBSlots - workerResource.getMemoryMBSlots();
@@ -222,7 +228,7 @@ public class WorkerResource {
 
   public int getSlots() {
     //TODO what is slot? 512MB = 1slot?
-    return getMemoryMBSlots() / 512;
+    return getMemoryMBSlots()/512;
   }
 
   public int getAvaliableSlots() {
@@ -243,6 +249,14 @@ public class WorkerResource {
     this.peerRpcPort = peerRpcPort;
   }
 
+  public int getQueryMasterPort() {
+    return queryMasterPort;
+  }
+
+  public void setQueryMasterPort(int queryMasterPort) {
+    this.queryMasterPort = queryMasterPort;
+  }
+  
   public int getClientPort() {
     return clientPort;
   }
@@ -298,4 +312,20 @@ public class WorkerResource {
   public void setNumRunningTasks(int numRunningTasks) {
     this.numRunningTasks = numRunningTasks;
   }
+
+  public int getNumQueryMasterTasks() {
+    return numQueryMasterTasks.get();
+  }
+
+  public void setNumQueryMasterTasks(int numQueryMasterTasks) {
+    this.numQueryMasterTasks.set(numQueryMasterTasks);
+  }
+
+  public void addNumQueryMasterTask() {
+    numQueryMasterTasks.getAndIncrement();
+  }
+
+  public void releaseQueryMasterTask() {
+    numQueryMasterTasks.getAndDecrement();
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/5470bcea/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/rm/WorkerResourceManager.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/rm/WorkerResourceManager.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/rm/WorkerResourceManager.java
index 93772ec..2e66f98 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/rm/WorkerResourceManager.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/rm/WorkerResourceManager.java
@@ -25,6 +25,7 @@ import org.apache.tajo.ipc.TajoMasterProtocol;
 import org.apache.tajo.master.querymaster.QueryInProgress;
 
 import java.io.IOException;
+import java.util.Collection;
 import java.util.Map;
 
 public interface WorkerResourceManager {
@@ -62,4 +63,6 @@ public interface WorkerResourceManager {
   public void stop();
 
   public int getNumClusterSlots();
+
+  Collection<String> getQueryMasters();
 }

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/5470bcea/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/rm/YarnTajoResourceManager.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/rm/YarnTajoResourceManager.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/rm/YarnTajoResourceManager.java
index 0baa55c..802e5ed 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/rm/YarnTajoResourceManager.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/rm/YarnTajoResourceManager.java
@@ -79,6 +79,10 @@ public class YarnTajoResourceManager implements WorkerResourceManager {
     return new HashMap<String, WorkerResource>();
   }
 
+  public Collection<String> getQueryMasters() {
+    return new ArrayList<String>();
+  }
+
   public int getNumClusterSlots() {
     return 0;
   }

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/5470bcea/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoResourceAllocator.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoResourceAllocator.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoResourceAllocator.java
index e42de27..46a5807 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoResourceAllocator.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoResourceAllocator.java
@@ -261,10 +261,9 @@ public class TajoResourceAllocator extends AbstractResourceAllocator {
         for(TajoMasterProtocol.WorkerAllocatedResource eachWorker: workerHosts) {
           TajoWorkerContainer container = new TajoWorkerContainer();
           NodeIdPBImpl nodeId = new NodeIdPBImpl();
-          String[] tokens = eachWorker.getWorkerHostAndPort().split(":");
 
-          nodeId.setHost(tokens[0]);
-          nodeId.setPort(Integer.parseInt(tokens[1]));
+          nodeId.setHost(eachWorker.getWorkerHost());
+          nodeId.setPort(eachWorker.getPeerRpcPort());
 
           TajoWorkerContainerId containerId = new TajoWorkerContainerId();
 
@@ -278,6 +277,7 @@ public class TajoResourceAllocator extends AbstractResourceAllocator {
           WorkerResource workerResource = new WorkerResource();
           workerResource.setAllocatedHost(nodeId.getHost());
           workerResource.setPeerRpcPort(nodeId.getPort());
+          workerResource.setQueryMasterPort(eachWorker.getQueryMasterPort());
           workerResource.setPullServerPort(eachWorker.getWorkerPullServerPort());
           workerResource.setMemoryMBSlots(requiredMemoryMBSlot);
           workerResource.setDiskSlots(requiredDiskSlots);

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/5470bcea/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoWorker.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoWorker.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoWorker.java
index 6152da7..92e30bf 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoWorker.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoWorker.java
@@ -33,6 +33,7 @@ import org.apache.tajo.catalog.CatalogService;
 import org.apache.tajo.conf.TajoConf;
 import org.apache.tajo.ipc.TajoMasterProtocol;
 import org.apache.tajo.master.querymaster.QueryMaster;
+import org.apache.tajo.master.querymaster.QueryMasterManagerService;
 import org.apache.tajo.master.rm.TajoWorkerResourceManager;
 import org.apache.tajo.pullserver.TajoPullServerService;
 import org.apache.tajo.rpc.AsyncRpcClient;
@@ -61,6 +62,12 @@ public class TajoWorker extends CompositeService {
   public static PrimitiveProtos.BoolProto TRUE_PROTO = PrimitiveProtos.BoolProto.newBuilder().setValue(true).build();
   public static PrimitiveProtos.BoolProto FALSE_PROTO = PrimitiveProtos.BoolProto.newBuilder().setValue(false).build();
 
+  public static final String WORKER_MODE_YARN_TASKRUNNER = "tr";
+  public static final String WORKER_MODE_YARN_QUERYMASTER = "qm";
+  public static final String WORKER_MODE_STANDBY = "standby";
+  public static final String WORKER_MODE_QUERY_MASTER = "standby-qm";
+  public static final String WORKER_MODE_TASKRUNNER = "standby-tr";
+
   private static final Log LOG = LogFactory.getLog(TajoWorker.class);
 
   private TajoConf systemConf;
@@ -69,6 +76,8 @@ public class TajoWorker extends CompositeService {
 
   private TajoWorkerClientService tajoWorkerClientService;
 
+  private QueryMasterManagerService queryMasterManagerService;
+
   private TajoWorkerManagerService tajoWorkerManagerService;
 
   private InetSocketAddress tajoMasterAddress;
@@ -86,7 +95,11 @@ public class TajoWorker extends CompositeService {
 
   private TajoPullServerService pullService;
 
-  private String daemonMode;
+  private boolean yarnContainerMode;
+
+  private boolean queryMasterMode;
+
+  private boolean taskRunnerMode;
 
   private WorkerHeartbeatThread workerHeartbeatThread;
 
@@ -100,11 +113,48 @@ public class TajoWorker extends CompositeService {
 
   private ThreadMXBean threadBean = ManagementFactory.getThreadMXBean();
 
-  public TajoWorker(String daemonMode) throws Exception {
+  private String[] cmdArgs;
+
+  public TajoWorker() throws Exception {
     super(TajoWorker.class.getName());
-    this.daemonMode = daemonMode;
   }
 
+  public void startWorker(TajoConf systemConf, String[] args) {
+    this.systemConf = systemConf;
+    this.cmdArgs = args;
+    setWorkerMode(args);
+    init(systemConf);
+    start();
+  }
+
+  private void setWorkerMode(String[] args) {
+    if(args.length < 1) {
+      queryMasterMode = systemConf.getBoolean("tajo.worker.mode.querymaster", true);
+      taskRunnerMode = systemConf.getBoolean("tajo.worker.mode.taskrunner", true);
+    } else {
+      if(WORKER_MODE_STANDBY.equals(args[0])) {
+        queryMasterMode = true;
+        taskRunnerMode = true;
+      } else if(WORKER_MODE_YARN_TASKRUNNER.equals(args[0])) {
+        yarnContainerMode = true;
+        queryMasterMode = true;
+      } else if(WORKER_MODE_YARN_QUERYMASTER.equals(args[0])) {
+        yarnContainerMode = true;
+        taskRunnerMode = true;
+      } else if(WORKER_MODE_QUERY_MASTER.equals(args[0])) {
+        yarnContainerMode = false;
+        queryMasterMode = true;
+      } else {
+        yarnContainerMode = false;
+        taskRunnerMode = true;
+      }
+    }
+    if(!queryMasterMode && !taskRunnerMode) {
+      LOG.fatal("Worker daemon exit cause no worker mode(querymaster/taskrunner) property");
+      System.exit(0);
+    }
+  }
+  
   @Override
   public void init(Configuration conf) {
     Runtime.getRuntime().addShutdownHook(new Thread(new ShutdownHook()));
@@ -122,35 +172,52 @@ public class TajoWorker extends CompositeService {
     }
     int clientPort = systemConf.getSocketAddrVar(ConfVars.WORKER_CLIENT_RPC_ADDRESS).getPort();
     int peerRpcPort = systemConf.getSocketAddrVar(ConfVars.WORKER_PEER_RPC_ADDRESS).getPort();
+    int qmManagerPort = systemConf.getSocketAddrVar(ConfVars.WORKER_QM_RPC_ADDRESS).getPort();
 
     if(randomPort) {
       clientPort = 0;
       peerRpcPort = 0;
+      qmManagerPort = 0;
       systemConf.setIntVar(ConfVars.PULLSERVER_PORT, 0);
     }
 
-    if(!"qm".equals(daemonMode)) {
+    if(queryMasterMode) {
+      //querymaster worker
+      tajoWorkerClientService = new TajoWorkerClientService(workerContext, clientPort);
+      addService(tajoWorkerClientService);
+
+      queryMasterManagerService = new QueryMasterManagerService(workerContext, qmManagerPort);
+      addService(queryMasterManagerService);
+    }
+
+    if(taskRunnerMode) {
+      //taskrunner worker
       taskRunnerManager = new TaskRunnerManager(workerContext);
       addService(taskRunnerManager);
-    }
 
-    if(workerContext.isStandbyMode()) {
-      pullService = new TajoPullServerService();
-      addService(pullService);
+      if(!yarnContainerMode) {
+        tajoWorkerManagerService = new TajoWorkerManagerService(workerContext, peerRpcPort);
+        addService(tajoWorkerManagerService);
+      }
     }
 
-    if(!"tr".equals(daemonMode)) {
-      tajoWorkerClientService = new TajoWorkerClientService(workerContext, clientPort);
-      addService(tajoWorkerClientService);
+    LOG.info("Tajo Worker started: queryMaster=" + queryMasterMode + " taskRunner=" + taskRunnerMode +
+        ",yarnContainer=" + yarnContainerMode + ", clientPort=" + clientPort +
+        ", peerRpcPort=" + peerRpcPort + ":" + qmManagerPort);
 
-      tajoWorkerManagerService = new TajoWorkerManagerService(workerContext, peerRpcPort);
-      addService(tajoWorkerManagerService);
-      LOG.info("Tajo worker started: mode=" + daemonMode + ", clientPort=" + clientPort + ", peerRpcPort="
-          + peerRpcPort);
+    if(!yarnContainerMode) {
+      if(taskRunnerMode) {
+        pullService = new TajoPullServerService();
+        addService(pullService);
+      }
 
       if (!systemConf.get(CommonTestingUtil.TAJO_TEST, "FALSE").equalsIgnoreCase("TRUE")) {
         try {
           httpPort = systemConf.getSocketAddrVar(ConfVars.WORKER_INFO_ADDRESS).getPort();
+          if(queryMasterMode && !taskRunnerMode) {
+            //If QueryMaster and TaskRunner run on single host, http port conflicts
+            httpPort = systemConf.getSocketAddrVar(ConfVars.WORKER_QM_INFO_ADDRESS).getPort();
+          }
           webServer = StaticHttpServer.getInstance(this ,"worker", null, httpPort ,
               true, null, systemConf, null);
           webServer.start();
@@ -160,14 +227,27 @@ public class TajoWorker extends CompositeService {
           LOG.error(e.getMessage(), e);
         }
       }
-      LOG.info("Tajo worker started: mode=" + daemonMode + ", clientPort=" + clientPort + ", peerRpcPort="
-          + peerRpcPort);
+    }
+
+    super.init(conf);
+
+    if(yarnContainerMode && queryMasterMode) {
+      String tajoMasterAddress = cmdArgs[2];
+      connectToTajoMaster(tajoMasterAddress);
+      connectToCatalog();
 
+      QueryId queryId = TajoIdUtils.parseQueryId(cmdArgs[1]);
+      queryMasterManagerService.getQueryMaster().reportQueryStatusToQueryMaster(
+          queryId, TajoProtos.QueryState.QUERY_MASTER_LAUNCHED);
+    } else if(yarnContainerMode && taskRunnerMode) { //TaskRunner mode
+      taskRunnerManager.startTask(cmdArgs);
     } else {
-      LOG.info("Tajo worker started: mode=" + daemonMode);
+      connectToTajoMaster(systemConf.getVar(ConfVars.TAJO_MASTER_UMBILICAL_RPC_ADDRESS));
+      connectToCatalog();
+      workerHeartbeatThread = new WorkerHeartbeatThread();
+      workerHeartbeatThread.start();
     }
 
-    super.init(conf);
   }
 
   public WorkerContext getWorkerContext() {
@@ -216,13 +296,20 @@ public class TajoWorker extends CompositeService {
 
   public class WorkerContext {
     public QueryMaster getQueryMaster() {
-      return tajoWorkerManagerService.getQueryMaster();
+      if(queryMasterManagerService == null) {
+        return null;
+      }
+      return queryMasterManagerService.getQueryMaster();
     }
 
     public TajoWorkerManagerService getTajoWorkerManagerService() {
       return tajoWorkerManagerService;
     }
 
+    public QueryMasterManagerService getQueryMasterManagerService() {
+      return queryMasterManagerService;
+    }
+
     public TajoWorkerClientService getTajoWorkerClientService() {
       return tajoWorkerClientService;
     }
@@ -244,7 +331,11 @@ public class TajoWorker extends CompositeService {
     }
 
     public String getWorkerName() {
-      return getTajoWorkerManagerService().getHostAndPort();
+      if(queryMasterMode) {
+        return getQueryMasterManagerService().getHostAndPort();
+      } else {
+        return getTajoWorkerManagerService().getHostAndPort();
+      }
     }
     public void stopWorker(boolean force) {
       stop();
@@ -253,8 +344,8 @@ public class TajoWorker extends CompositeService {
       }
     }
 
-    public boolean isStandbyMode() {
-      return !"qm".equals(daemonMode) && !"tr".equals(daemonMode);
+    public boolean isYarnContainerMode() {
+      return yarnContainerMode;
     }
 
     public void setNumClusterNodes(int numClusterNodes) {
@@ -272,32 +363,24 @@ public class TajoWorker extends CompositeService {
     public int getNumClusterSlots() {
       return TajoWorker.this.numClusterSlots.get();
     }
-  }
-
-  public void stopWorkerForce() {
-    stop();
-  }
 
-  private void setWorkerMode(String[] params) {
-    if("qm".equals(daemonMode)) { //QueryMaster mode
+    public int getPeerRpcPort() {
+      return getTajoWorkerManagerService() == null ? 0 : getTajoWorkerManagerService().getBindAddr().getPort();
+    }
 
-      String tajoMasterAddress = params[2];
-      connectToTajoMaster(tajoMasterAddress);
-      connectToCatalog();
+    public boolean isQueryMasterMode() {
+      return queryMasterMode;
+    }
 
-      QueryId queryId = TajoIdUtils.parseQueryId(params[1]);
-      tajoWorkerManagerService.getQueryMaster().reportQueryStatusToQueryMaster(
-          queryId, TajoProtos.QueryState.QUERY_MASTER_LAUNCHED);
-    } else if("tr".equals(daemonMode)) { //TaskRunner mode
-      taskRunnerManager.startTask(params);
-    } else { //Standby mode
-      connectToTajoMaster(systemConf.getVar(ConfVars.TAJO_MASTER_UMBILICAL_RPC_ADDRESS));
-      connectToCatalog();
-      workerHeartbeatThread = new WorkerHeartbeatThread();
-      workerHeartbeatThread.start();
+    public boolean isTaskRunnerMode() {
+      return taskRunnerMode;
     }
   }
 
+  public void stopWorkerForce() {
+    stop();
+  }
+
   private void connectToTajoMaster(String tajoMasterAddrString) {
     LOG.info("Connecting to TajoMaster (" + tajoMasterAddrString +")");
     this.tajoMasterAddress = NetUtils.createSocketAddr(tajoMasterAddrString);
@@ -323,13 +406,14 @@ public class TajoWorker extends CompositeService {
     try {
       catalogClient = new CatalogClient(systemConf);
     } catch (IOException e) {
-      e.printStackTrace();
+      LOG.error(e.getMessage(), e);
     }
   }
 
   class WorkerHeartbeatThread extends Thread {
     TajoMasterProtocol.ServerStatusProto.System systemInfo;
-    List<TajoMasterProtocol.ServerStatusProto.Disk> diskInfos = new ArrayList<TajoMasterProtocol.ServerStatusProto.Disk>();
+    List<TajoMasterProtocol.ServerStatusProto.Disk> diskInfos =
+        new ArrayList<TajoMasterProtocol.ServerStatusProto.Disk>();
     int workerDisksNum;
     List<File> mountPaths;
 
@@ -338,7 +422,7 @@ public class TajoWorker extends CompositeService {
       int workerCpuCoreNum;
 
       boolean dedicatedResource = systemConf.getBoolVar(ConfVars.WORKER_RESOURCE_DEDICATED);
-
+      
       try {
         mountPaths = getMountPath();
       } catch (Exception e) {
@@ -378,7 +462,38 @@ public class TajoWorker extends CompositeService {
       int sendDiskInfoCount = 0;
       int pullServerPort = 0;
       if(pullService != null) {
-        pullServerPort = pullService.getPort();
+        long startTime = System.currentTimeMillis();
+        while(true) {
+          pullServerPort = pullService.getPort();
+          if(pullServerPort > 0) {
+            break;
+          }
+          //waiting while pull server init
+          try {
+            Thread.sleep(100);
+          } catch (InterruptedException e) {
+          }
+          if(System.currentTimeMillis() - startTime > 30 * 1000) {
+            LOG.fatal("Too long push server init.");
+            System.exit(0);
+          }
+        }
+      }
+
+      String hostName = null;
+      int peerRpcPort = 0;
+      int queryMasterPort = 0;
+      int clientPort = 0;
+      if(workerContext.getTajoWorkerManagerService() != null) {
+        hostName = workerContext.getTajoWorkerManagerService().getBindAddr().getHostName();
+        peerRpcPort = workerContext.getTajoWorkerManagerService().getBindAddr().getPort();
+      }
+      if(workerContext.getQueryMasterManagerService() != null) {
+        hostName = workerContext.getQueryMasterManagerService().getBindAddr().getHostName();
+        queryMasterPort = workerContext.getQueryMasterManagerService().getBindAddr().getPort();
+      }
+      if(workerContext.getTajoWorkerClientService() != null) {
+        clientPort = workerContext.getTajoWorkerClientService().getBindAddr().getPort();
       }
 
       while(true) {
@@ -406,12 +521,15 @@ public class TajoWorker extends CompositeService {
             .setSystem(systemInfo)
             .setDiskSlots(workerDisksNum)
             .setJvmHeap(jvmHeap)
+            .setQueryMasterMode(PrimitiveProtos.BoolProto.newBuilder().setValue(queryMasterMode))
+            .setTaskRunnerMode(PrimitiveProtos.BoolProto.newBuilder().setValue(taskRunnerMode))
             .build();
 
         TajoMasterProtocol.TajoHeartbeat heartbeatProto = TajoMasterProtocol.TajoHeartbeat.newBuilder()
-            .setTajoWorkerHost(workerContext.getTajoWorkerManagerService().getBindAddr().getHostName())
-            .setTajoWorkerPort(workerContext.getTajoWorkerManagerService().getBindAddr().getPort())
-            .setTajoWorkerClientPort(workerContext.getTajoWorkerClientService().getBindAddr().getPort())
+            .setTajoWorkerHost(hostName)
+            .setTajoQueryMasterPort(queryMasterPort)
+            .setPeerRpcPort(peerRpcPort)
+            .setTajoWorkerClientPort(clientPort)
             .setTajoWorkerHttpPort(httpPort)
             .setTajoWorkerPullServerPort(pullServerPort)
             .setServerStatus(serverStatus)
@@ -463,12 +581,6 @@ public class TajoWorker extends CompositeService {
     }
   }
 
-  public void startWorker(TajoConf tajoConf, String[] args) {
-    init(tajoConf);
-    start();
-    setWorkerMode(args);
-  }
-
   String getThreadTaskName(long id, String name) {
     if (name == null) {
       return Long.toString(id);
@@ -549,18 +661,11 @@ public class TajoWorker extends CompositeService {
   public static void main(String[] args) throws Exception {
     StringUtils.startupShutdownMessage(TajoWorker.class, args, LOG);
 
-    if(args.length < 1) {
-      LOG.error("Wrong startup params");
-      System.exit(-1);
-    }
-
-    String workerMode = args[0];
-
     TajoConf tajoConf = new TajoConf();
     tajoConf.addResource(new Path(TajoConstants.SYSTEM_CONF_FILENAME));
 
     try {
-      TajoWorker tajoWorker = new TajoWorker(workerMode);
+      TajoWorker tajoWorker = new TajoWorker();
       tajoWorker.startWorker(tajoConf, args);
     } catch (Throwable t) {
       LOG.fatal("Error starting TajoWorker", t);

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/5470bcea/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoWorkerManagerService.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoWorkerManagerService.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoWorkerManagerService.java
index c3e7130..4190dab 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoWorkerManagerService.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoWorkerManagerService.java
@@ -51,8 +51,6 @@ public class TajoWorkerManagerService extends CompositeService
   private String addr;
   private int port;
 
-  private QueryMaster queryMaster;
-
   private TajoWorker.WorkerContext workerContext;
 
   public TajoWorkerManagerService(TajoWorker.WorkerContext workerContext, int port) {
@@ -61,10 +59,6 @@ public class TajoWorkerManagerService extends CompositeService
     this.port = port;
   }
 
-  public QueryMaster getQueryMaster() {
-    return queryMaster;
-  }
-
   @Override
   public void init(Configuration conf) {
     TajoConf tajoConf = (TajoConf) conf;
@@ -83,10 +77,6 @@ public class TajoWorkerManagerService extends CompositeService
       this.addr = bindAddr.getHostName() + ":" + bindAddr.getPort();
 
       this.port = bindAddr.getPort();
-
-      queryMaster = new QueryMaster(workerContext);
-      addService(queryMaster);
-
     } catch (Exception e) {
       LOG.error(e.getMessage(), e);
     }
@@ -119,99 +109,13 @@ public class TajoWorkerManagerService extends CompositeService
   }
 
   @Override
-  public void getTask(RpcController controller, TajoWorkerProtocol.GetTaskRequestProto request,
-                      RpcCallback<TajoWorkerProtocol.QueryUnitRequestProto> done) {
-    try {
-      ExecutionBlockId ebId = new ExecutionBlockId(request.getExecutionBlockId());
-      QueryMasterTask queryMasterTask = workerContext.getQueryMaster().getQueryMasterTask(ebId.getQueryId());
-      ContainerId cid =
-          queryMasterTask.getQueryTaskContext().getResourceAllocator().makeContainerId(request.getContainerId());
-
-      if(queryMasterTask == null || queryMasterTask.isStopped()) {
-        LOG.debug("getTask:" + cid + ", ebId:" + ebId + ", but query is finished.");
-        done.run(TaskSchedulerImpl.stopTaskRunnerReq);
-      } else {
-        LOG.debug("getTask:" + cid + ", ebId:" + ebId);
-        queryMasterTask.handleTaskRequestEvent(new TaskRequestEvent(cid, ebId, done));
-      }
-    } catch (Exception e) {
-      LOG.error(e.getMessage(), e);
-    }
-  }
-
-  @Override
-  public void statusUpdate(RpcController controller, TajoWorkerProtocol.TaskStatusProto request,
-                           RpcCallback<PrimitiveProtos.BoolProto> done) {
-    try {
-      QueryMasterTask queryMasterTask = queryMaster.getQueryMasterTask(
-          new QueryId(request.getId().getQueryUnitId().getExecutionBlockId().getQueryId()));
-      queryMasterTask.getEventHandler().handle(
-          new TaskAttemptStatusUpdateEvent(new QueryUnitAttemptId(request.getId()), request));
-      done.run(TajoWorker.TRUE_PROTO);
-    } catch (Exception e) {
-      LOG.error(e.getMessage(), e);
-      done.run(TajoWorker.FALSE_PROTO);
-    }
-  }
-
-  @Override
   public void ping(RpcController controller,
                    TajoIdProtos.QueryUnitAttemptIdProto attemptId,
                    RpcCallback<PrimitiveProtos.BoolProto> done) {
-    // TODO - to be completed
-//      QueryUnitAttemptId attemptId = new QueryUnitAttemptId(attemptIdProto);
-//    context.getQuery(attemptId.getQueryId()).getSubQuery(attemptId.getExecutionBlockId()).
-//        getQueryUnit(attemptId.getQueryUnitId()).getAttempt(attemptId).
-//        resetExpireTime();
     done.run(TajoWorker.TRUE_PROTO);
   }
 
   @Override
-  public void fatalError(RpcController controller, TajoWorkerProtocol.TaskFatalErrorReport report,
-                         RpcCallback<PrimitiveProtos.BoolProto> done) {
-    try {
-      QueryMasterTask queryMasterTask = queryMaster.getQueryMasterTask(
-          new QueryId(report.getId().getQueryUnitId().getExecutionBlockId().getQueryId()));
-      queryMasterTask.getEventHandler().handle(new TaskFatalErrorEvent(report));
-      done.run(TajoWorker.TRUE_PROTO);
-    } catch (Exception e) {
-      LOG.error(e.getMessage(), e);
-      done.run(TajoWorker.FALSE_PROTO);
-    }
-  }
-
-  @Override
-  public void done(RpcController controller, TajoWorkerProtocol.TaskCompletionReport report,
-                   RpcCallback<PrimitiveProtos.BoolProto> done) {
-    try {
-      QueryMasterTask queryMasterTask = queryMaster.getQueryMasterTask(
-          new QueryId(report.getId().getQueryUnitId().getExecutionBlockId().getQueryId()));
-      queryMasterTask.getEventHandler().handle(new TaskCompletionEvent(report));
-      done.run(TajoWorker.TRUE_PROTO);
-    } catch (Exception e) {
-      LOG.error(e.getMessage(), e);
-      done.run(TajoWorker.FALSE_PROTO);
-    }
-  }
-
-  @Override
-  public void executeQuery(RpcController controller,
-                           TajoWorkerProtocol.QueryExecutionRequestProto request,
-                           RpcCallback<PrimitiveProtos.BoolProto> done) {
-    try {
-      QueryId queryId = new QueryId(request.getQueryId());
-      LOG.info("Receive executeQuery request:" + queryId);
-      queryMaster.handle(new QueryStartEvent(queryId,
-          new QueryContext(request.getQueryContext()), request.getSql().getValue(),
-          request.getLogicalPlanJson().getValue()));
-      done.run(TajoWorker.TRUE_PROTO);
-    } catch (Exception e) {
-      LOG.error(e.getMessage(), e);
-      done.run(TajoWorker.FALSE_PROTO);
-    }
-  }
-
-  @Override
   public void executeExecutionBlock(RpcController controller,
                                     TajoWorkerProtocol.RunExecutionBlockRequestProto request,
                                     RpcCallback<PrimitiveProtos.BoolProto> done) {

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/5470bcea/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/Task.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/Task.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/Task.java
index e30f8c4..e77086d 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/Task.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/Task.java
@@ -40,10 +40,11 @@ import org.apache.tajo.engine.planner.logical.LogicalNode;
 import org.apache.tajo.engine.planner.logical.SortNode;
 import org.apache.tajo.engine.planner.logical.StoreTableNode;
 import org.apache.tajo.engine.planner.physical.PhysicalExec;
-import org.apache.tajo.ipc.TajoWorkerProtocol.*;
-import org.apache.tajo.ipc.TajoWorkerProtocol.TajoWorkerProtocolService.Interface;
+import org.apache.tajo.ipc.QueryMasterProtocol;
+import org.apache.tajo.ipc.QueryMasterProtocol.*;
 import org.apache.tajo.engine.query.QueryUnitRequest;
 import org.apache.tajo.engine.query.QueryContext;
+import org.apache.tajo.ipc.TajoWorkerProtocol.*;
 import org.apache.tajo.rpc.NullCallback;
 import org.apache.tajo.storage.Fragment;
 import org.apache.tajo.storage.StorageUtil;
@@ -68,7 +69,7 @@ public class Task {
   private final QueryContext queryContext;
   private final FileSystem localFS;
   private final TaskRunner.TaskRunnerContext taskRunnerContext;
-  private final Interface masterProxy;
+  private final QueryMasterProtocol.QueryMasterProtocolService.Interface masterProxy;
   private final LocalDirAllocator lDirAllocator;
   private final QueryUnitAttemptId taskId;
 
@@ -125,7 +126,8 @@ public class Task {
       };
 
   public Task(QueryUnitAttemptId taskId,
-              final TaskRunner.TaskRunnerContext worker, final Interface masterProxy,
+              final TaskRunner.TaskRunnerContext worker,
+              final QueryMasterProtocolService.Interface masterProxy,
               final QueryUnitRequest request) throws IOException {
     this.request = request;
     this.reporter = new Reporter(masterProxy);
@@ -544,12 +546,12 @@ public class Task {
   }
 
   protected class Reporter implements Runnable {
-    private Interface masterStub;
+    private QueryMasterProtocolService.Interface masterStub;
     private Thread pingThread;
     private Object lock = new Object();
     private static final int PROGRESS_INTERVAL = 3000;
 
-    public Reporter(Interface masterStub) {
+    public Reporter(QueryMasterProtocolService.Interface masterStub) {
       this.masterStub = masterStub;
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/5470bcea/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TaskRunner.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TaskRunner.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TaskRunner.java
index a5bb55c..f0c9033 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TaskRunner.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TaskRunner.java
@@ -38,6 +38,8 @@ import org.apache.tajo.TajoProtos.TaskAttemptState;
 import org.apache.tajo.conf.TajoConf;
 import org.apache.tajo.conf.TajoConf.ConfVars;
 import org.apache.tajo.engine.query.QueryUnitRequestImpl;
+import org.apache.tajo.ipc.QueryMasterProtocol;
+import org.apache.tajo.ipc.QueryMasterProtocol.*;
 import org.apache.tajo.ipc.TajoWorkerProtocol;
 import org.apache.tajo.rpc.AsyncRpcClient;
 import org.apache.tajo.rpc.CallFuture;
@@ -68,7 +70,7 @@ public class TaskRunner extends AbstractService {
   private ContainerId containerId;
 
   // Cluster Management
-  private TajoWorkerProtocol.TajoWorkerProtocolService.Interface master;
+  private QueryMasterProtocolService.Interface master;
 
   // for temporal or intermediate files
   private FileSystem localFS;
@@ -134,12 +136,12 @@ public class TaskRunner extends AbstractService {
       UserGroupInformation taskOwner = UserGroupInformation.createRemoteUser(conf.getVar(ConfVars.USERNAME));
       //taskOwner.addToken(token);
 
-      // initialize MasterWorkerProtocol as an actual task owner.
+      // initialize QueryMasterProtocol as an actual task owner.
       this.client =
           taskOwner.doAs(new PrivilegedExceptionAction<AsyncRpcClient>() {
             @Override
             public AsyncRpcClient run() throws Exception {
-              return new AsyncRpcClient(TajoWorkerProtocol.class, masterAddr);
+              return new AsyncRpcClient(QueryMasterProtocol.class, masterAddr);
             }
           });
       this.master = client.getStub();
@@ -235,7 +237,7 @@ public class TaskRunner extends AbstractService {
       return nodeId.toString();
     }
 
-    public TajoWorkerProtocolService.Interface getMaster() {
+    public QueryMasterProtocolService.Interface getMaster() {
       return master;
     }
 
@@ -280,7 +282,7 @@ public class TaskRunner extends AbstractService {
     return taskRunnerContext;
   }
 
-  static void fatalError(TajoWorkerProtocolService.Interface proxy,
+  static void fatalError(QueryMasterProtocolService.Interface proxy,
                          QueryUnitAttemptId taskAttemptId, String message) {
     TaskFatalErrorReport.Builder builder = TaskFatalErrorReport.newBuilder()
         .setId(taskAttemptId.getProto())

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/5470bcea/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TaskRunnerManager.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TaskRunnerManager.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TaskRunnerManager.java
index 85b1e6b..1ea213d 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TaskRunnerManager.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TaskRunnerManager.java
@@ -74,7 +74,7 @@ public class TaskRunnerManager extends CompositeService {
       finishedTaskCleanThread.interrupted();
     }
     super.stop();
-    if(!workerContext.isStandbyMode()) {
+    if(workerContext.isYarnContainerMode()) {
       workerContext.stopWorker(true);
     }
   }
@@ -87,7 +87,7 @@ public class TaskRunnerManager extends CompositeService {
         finishedTaskRunnerMap.put(id, taskRunner);
       }
     }
-    if(!workerContext.isStandbyMode()) {
+    if(workerContext.isYarnContainerMode()) {
       stop();
     }
   }

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/5470bcea/tajo-core/tajo-core-backend/src/main/proto/QueryMasterProtocol.proto
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/proto/QueryMasterProtocol.proto b/tajo-core/tajo-core-backend/src/main/proto/QueryMasterProtocol.proto
new file mode 100644
index 0000000..fe5bf03
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/main/proto/QueryMasterProtocol.proto
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+option java_package = "org.apache.tajo.ipc";
+option java_outer_classname = "QueryMasterProtocol";
+option java_generic_services = true;
+option java_generate_equals_and_hash = true;
+
+import "yarn_protos.proto";
+import "tajo_protos.proto";
+import "TajoIdProtos.proto";
+import "CatalogProtos.proto";
+import "PrimitiveProtos.proto";
+import "TajoWorkerProtocol.proto";
+
+service QueryMasterProtocolService {
+  //from Worker
+  rpc getTask(GetTaskRequestProto) returns (QueryUnitRequestProto);
+  rpc statusUpdate (TaskStatusProto) returns (BoolProto);
+  rpc ping (QueryUnitAttemptIdProto) returns (BoolProto);
+  rpc fatalError(TaskFatalErrorReport) returns (BoolProto);
+  rpc done (TaskCompletionReport) returns (BoolProto);
+
+  //from TajoMaster's QueryJobManager
+  rpc executeQuery(QueryExecutionRequestProto) returns (BoolProto);
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/5470bcea/tajo-core/tajo-core-backend/src/main/proto/TajoMasterProtocol.proto
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/proto/TajoMasterProtocol.proto b/tajo-core/tajo-core-backend/src/main/proto/TajoMasterProtocol.proto
index cdf78a6..f9b15a7 100644
--- a/tajo-core/tajo-core-backend/src/main/proto/TajoMasterProtocol.proto
+++ b/tajo-core/tajo-core-backend/src/main/proto/TajoMasterProtocol.proto
@@ -54,20 +54,23 @@ message ServerStatusProto {
     repeated Disk disk = 3;
     required int32 runningTaskNum = 4;
     required JvmHeap jvmHeap = 5;
+    required BoolProto queryMasterMode = 6;
+    required BoolProto taskRunnerMode = 7;
 }
 
 message TajoHeartbeat {
   required string tajoWorkerHost = 1;
-  required int32 tajoWorkerPort = 2;
-  optional ServerStatusProto serverStatus = 3;
-  optional int32 tajoWorkerClientPort = 4;
-  optional QueryIdProto queryId = 5;
-  optional QueryState state = 6;
-  optional string statusMessage = 7;
-  optional int32 tajoWorkerPullServerPort = 8;
-  optional int32 tajoWorkerHttpPort = 9;
-  optional float queryProgress = 10;
-  optional int64 queryFinishTime = 11;
+  required int32 peerRpcPort = 2;
+  required int32 tajoQueryMasterPort = 3;
+  optional ServerStatusProto serverStatus = 4;
+  optional int32 tajoWorkerClientPort = 5;
+  optional QueryIdProto queryId = 6;
+  optional QueryState state = 7;
+  optional string statusMessage = 8;
+  optional int32 tajoWorkerPullServerPort = 9;
+  optional int32 tajoWorkerHttpPort = 10;
+  optional float queryProgress = 11;
+  optional int64 queryFinishTime = 12;
 }
 
 message TajoHeartbeatResponse {
@@ -89,10 +92,12 @@ message WorkerResourceAllocationRequest {
 }
 
 message WorkerResourceProto {
-    required string workerHostAndPort = 1;
-    required ExecutionBlockIdProto executionBlockId = 2;
-    required int32 memoryMBSlots = 3 ;
-    required int32 diskSlots = 4;
+    required string host = 1;
+    required int32 peerRpcPort = 2;
+    required int32 queryMasterPort = 3;
+    required ExecutionBlockIdProto executionBlockId = 4;
+    required int32 memoryMBSlots = 5 ;
+    required int32 diskSlots = 6;
 }
 
 message WorkerResourceReleaseRequest {
@@ -100,8 +105,10 @@ message WorkerResourceReleaseRequest {
 }
 
 message WorkerAllocatedResource {
-    required string workerHostAndPort = 1;
-    required int32 workerPullServerPort = 2;
+    required string workerHost = 1;
+    required int32 peerRpcPort = 2;
+    required int32 queryMasterPort = 3;
+    required int32 workerPullServerPort = 4;
 }
 
 message WorkerResourceAllocationResponse {

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/5470bcea/tajo-core/tajo-core-backend/src/main/proto/TajoWorkerProtocol.proto
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/proto/TajoWorkerProtocol.proto b/tajo-core/tajo-core-backend/src/main/proto/TajoWorkerProtocol.proto
index 4d75e46..109dfa9 100644
--- a/tajo-core/tajo-core-backend/src/main/proto/TajoWorkerProtocol.proto
+++ b/tajo-core/tajo-core-backend/src/main/proto/TajoWorkerProtocol.proto
@@ -155,15 +155,7 @@ message RunExecutionBlockRequestProto {
 }
 
 service TajoWorkerProtocolService {
-  //from Worker
-  rpc getTask(GetTaskRequestProto) returns (QueryUnitRequestProto);
-  rpc statusUpdate (TaskStatusProto) returns (BoolProto);
   rpc ping (QueryUnitAttemptIdProto) returns (BoolProto);
-  rpc fatalError(TaskFatalErrorReport) returns (BoolProto);
-  rpc done (TaskCompletionReport) returns (BoolProto);
-
-  //from TajoMaster's QueryJobManager
-  rpc executeQuery(QueryExecutionRequestProto) returns (BoolProto);
 
   //from QueryMaster(Worker)
   rpc executeExecutionBlock(RunExecutionBlockRequestProto) returns (BoolProto);

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/5470bcea/tajo-core/tajo-core-backend/src/main/resources/tajo-default.xml
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/resources/tajo-default.xml b/tajo-core/tajo-core-backend/src/main/resources/tajo-default.xml
index ab24740..0ca3f9c 100644
--- a/tajo-core/tajo-core-backend/src/main/resources/tajo-default.xml
+++ b/tajo-core/tajo-core-backend/src/main/resources/tajo-default.xml
@@ -24,4 +24,17 @@
     <name>tajo.worker.tmpdir.locations</name>
     <value>/tmp/tajo-${user.name}/tmpdir</value>
   </property>
+
+  <property>
+    <name>tajo.worker.mode.querymaster</name>
+    <value>true</value>
+    <description></description>
+  </property>
+
+  <property>
+    <name>tajo.worker.mode.taskrunner</name>
+    <value>true</value>
+    <description></description>
+  </property>
+
 </configuration>
\ No newline at end of file