You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tajo.apache.org by ji...@apache.org on 2015/01/09 15:34:23 UTC

[7/8] tajo git commit: TAJO-1291: Rename TajoMasterProtocol to QueryCoordinatorProtocol.

TAJO-1291: Rename TajoMasterProtocol to QueryCoordinatorProtocol.

Closes #342


Project: http://git-wip-us.apache.org/repos/asf/tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/807868bd
Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/807868bd
Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/807868bd

Branch: refs/heads/index_support
Commit: 807868bd4d1ca1c8bd3aee33d31cca3d43dd2273
Parents: 50a8a66
Author: Hyunsik Choi <hy...@apache.org>
Authored: Fri Jan 9 20:44:21 2015 +0900
Committer: Hyunsik Choi <hy...@apache.org>
Committed: Fri Jan 9 20:44:21 2015 +0900

----------------------------------------------------------------------
 CHANGES                                         |   3 +
 tajo-core/pom.xml                               |   2 +-
 .../tajo/master/QueryCoordinatorService.java    | 160 ++++++++++
 .../org/apache/tajo/master/QueryInProgress.java | 228 +++++++++++++
 .../org/apache/tajo/master/QueryJobManager.java | 316 -------------------
 .../org/apache/tajo/master/QueryManager.java    | 315 ++++++++++++++++++
 .../apache/tajo/master/TajoContainerProxy.java  |  12 +-
 .../java/org/apache/tajo/master/TajoMaster.java |  16 +-
 .../tajo/master/TajoMasterClientService.java    |   9 +-
 .../apache/tajo/master/TajoMasterService.java   | 161 ----------
 .../apache/tajo/master/exec/QueryExecutor.java  |   4 +-
 .../tajo/master/rm/TajoResourceTracker.java     |  12 +-
 .../master/rm/TajoWorkerResourceManager.java    |  14 +-
 .../tajo/master/rm/WorkerResourceManager.java   |  17 +-
 .../apache/tajo/master/scheduler/Scheduler.java |   2 +-
 .../master/scheduler/SimpleFifoScheduler.java   |   8 +-
 .../tajo/querymaster/QueryInProgress.java       | 230 --------------
 .../apache/tajo/querymaster/QueryMaster.java    |  95 ++----
 .../java/org/apache/tajo/querymaster/Stage.java |  17 -
 .../main/java/org/apache/tajo/util/JSPUtil.java |   2 +-
 .../tajo/worker/TajoResourceAllocator.java      |  37 ++-
 .../java/org/apache/tajo/worker/TajoWorker.java |  20 +-
 .../tajo/worker/WorkerHeartbeatService.java     |  27 +-
 .../ConnectivityCheckerRuleForTajoWorker.java   |  14 +-
 .../main/proto/QueryCoordinatorProtocol.proto   | 147 +++++++++
 .../main/proto/ResourceTrackerProtocol.proto    |   2 +-
 .../src/main/proto/TajoMasterProtocol.proto     | 147 ---------
 .../src/main/resources/webapps/admin/index.jsp  |   2 +-
 .../src/main/resources/webapps/admin/query.jsp  |   2 +-
 .../org/apache/tajo/TajoTestingCluster.java     |   2 +-
 .../tajo/master/rm/TestTajoResourceManager.java |   3 +-
 31 files changed, 979 insertions(+), 1047 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tajo/blob/807868bd/CHANGES
----------------------------------------------------------------------
diff --git a/CHANGES b/CHANGES
index 4e38f78..89488da 100644
--- a/CHANGES
+++ b/CHANGES
@@ -27,6 +27,9 @@ Release 0.9.1 - unreleased
 
   IMPROVEMENT
 
+    TAJO-1291: Rename TajoMasterProtocol to QueryCoordinatorProtocol.
+    (hyunsik)
+
     TAJO-1286: Remove netty dependency from tajo-jdbc. (jihun)
 
     TAJO-1282: Cleanup the relationship of QueryInProgress and 

http://git-wip-us.apache.org/repos/asf/tajo/blob/807868bd/tajo-core/pom.xml
----------------------------------------------------------------------
diff --git a/tajo-core/pom.xml b/tajo-core/pom.xml
index a7205dd..05ccf07 100644
--- a/tajo-core/pom.xml
+++ b/tajo-core/pom.xml
@@ -166,7 +166,7 @@
                 <argument>src/main/proto/ContainerProtocol.proto</argument>
                 <argument>src/main/proto/ResourceTrackerProtocol.proto</argument>
                 <argument>src/main/proto/QueryMasterProtocol.proto</argument>
-                <argument>src/main/proto/TajoMasterProtocol.proto</argument>
+                <argument>src/main/proto/QueryCoordinatorProtocol.proto</argument>
                 <argument>src/main/proto/TajoWorkerProtocol.proto</argument>
                 <argument>src/main/proto/InternalTypes.proto</argument>
               </arguments>

http://git-wip-us.apache.org/repos/asf/tajo/blob/807868bd/tajo-core/src/main/java/org/apache/tajo/master/QueryCoordinatorService.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/QueryCoordinatorService.java b/tajo-core/src/main/java/org/apache/tajo/master/QueryCoordinatorService.java
new file mode 100644
index 0000000..1cb3842
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/master/QueryCoordinatorService.java
@@ -0,0 +1,160 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.master;
+
+import com.google.protobuf.RpcCallback;
+import com.google.protobuf.RpcController;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.service.AbstractService;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.ipc.ContainerProtocol;
+import org.apache.tajo.ipc.QueryCoordinatorProtocol;
+import org.apache.tajo.ipc.QueryCoordinatorProtocol.*;
+import org.apache.tajo.master.cluster.WorkerConnectionInfo;
+import org.apache.tajo.master.rm.Worker;
+import org.apache.tajo.master.rm.WorkerResource;
+import org.apache.tajo.rpc.AsyncRpcServer;
+import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos;
+import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos.BoolProto;
+import org.apache.tajo.util.NetUtils;
+
+import java.net.InetSocketAddress;
+import java.util.Collection;
+import java.util.List;
+
+/**
+ * RPC endpoint that serves {@link QueryCoordinatorProtocol} requests: query heartbeats
+ * and the allocation, release and listing of worker resources.
+ */
+public class QueryCoordinatorService extends AbstractService {
+  private final static Log LOG = LogFactory.getLog(QueryCoordinatorService.class);
+
+  private final TajoMaster.MasterContext context;
+  private final TajoConf conf;
+  private final ProtocolServiceHandler masterHandler;
+  private AsyncRpcServer server;
+  private InetSocketAddress bindAddress;
+
+  // Prebuilt responses shared by the handler methods.
+  private static final BoolProto BOOL_TRUE = BoolProto.newBuilder().setValue(true).build();
+  private static final BoolProto BOOL_FALSE = BoolProto.newBuilder().setValue(false).build();
+
+  /**
+   * @param context master context supplying the configuration, the query manager and
+   *                the resource manager used by the request handlers
+   */
+  public QueryCoordinatorService(TajoMaster.MasterContext context) {
+    super(QueryCoordinatorService.class.getName());
+    this.context = context;
+    this.conf = context.getConf();
+    this.masterHandler = new ProtocolServiceHandler();
+  }
+
+  /**
+   * Creates and starts the RPC server on the configured umbilical address, then writes
+   * the address actually bound back into the configuration.
+   *
+   * @throws IllegalStateException if the RPC server cannot be created
+   */
+  @Override
+  public void start() {
+    String confMasterServiceAddr = conf.getVar(TajoConf.ConfVars.TAJO_MASTER_UMBILICAL_RPC_ADDRESS);
+    InetSocketAddress initIsa = NetUtils.createSocketAddr(confMasterServiceAddr);
+    int workerNum = conf.getIntVar(TajoConf.ConfVars.MASTER_RPC_SERVER_WORKER_THREAD_NUM);
+    try {
+      server = new AsyncRpcServer(QueryCoordinatorProtocol.class, masterHandler, initIsa, workerNum);
+    } catch (Exception e) {
+      // Fail fast with the real cause; carrying on would only end in a
+      // NullPointerException on server.start() below.
+      throw new IllegalStateException("Cannot create the RPC server at " + initIsa, e);
+    }
+    server.start();
+    bindAddress = NetUtils.getConnectAddress(server.getListenAddress());
+    this.conf.setVar(TajoConf.ConfVars.TAJO_MASTER_UMBILICAL_RPC_ADDRESS,
+        NetUtils.normalizeInetSocketAddress(bindAddress));
+    LOG.info("Instantiated QueryCoordinatorService at " + this.bindAddress);
+    super.start();
+  }
+
+  /**
+   * Shuts the RPC server down if one is running and clears the reference before
+   * delegating to {@code super.stop()}.
+   */
+  @Override
+  public void stop() {
+    if(server != null) {
+      server.shutdown();
+      server = null;
+    }
+    super.stop();
+  }
+
+  /**
+   * @return the address resolved in {@link #start()}, or {@code null} if the service
+   *         has not been started yet
+   */
+  public InetSocketAddress getBindAddress() {
+    return bindAddress;
+  }
+
+  /**
+   * Actual protocol service handler
+   */
+  private class ProtocolServiceHandler implements QueryCoordinatorProtocolService.Interface {
+
+    /**
+     * Passes the heartbeat to the query manager and replies with a positive result,
+     * the response command if the query manager returned one, and the current
+     * cluster resource summary.
+     */
+    @Override
+    public void heartbeat(
+        RpcController controller,
+        TajoHeartbeat request, RpcCallback<QueryCoordinatorProtocol.TajoHeartbeatResponse> done) {
+      if(LOG.isDebugEnabled()) {
+        LOG.debug("Received QueryHeartbeat:" + new WorkerConnectionInfo(request.getConnectionInfo()));
+      }
+
+      QueryCoordinatorProtocol.TajoHeartbeatResponse.ResponseCommand command =
+          context.getQueryJobManager().queryHeartbeat(request);
+
+      QueryCoordinatorProtocol.TajoHeartbeatResponse.Builder builder = QueryCoordinatorProtocol.TajoHeartbeatResponse.newBuilder();
+      builder.setHeartbeatResult(BOOL_TRUE);
+      if(command != null) {
+        builder.setResponseCommand(command);
+      }
+
+      builder.setClusterResourceSummary(context.getResourceManager().getClusterResourceSummary());
+      done.run(builder.build());
+    }
+
+    /**
+     * Delegates the allocation request to the resource manager, which is handed the
+     * callback and is responsible for answering it.
+     */
+    @Override
+    public void allocateWorkerResources(
+        RpcController controller,
+        QueryCoordinatorProtocol.WorkerResourceAllocationRequest request,
+        RpcCallback<QueryCoordinatorProtocol.WorkerResourceAllocationResponse> done) {
+      context.getResourceManager().allocateWorkerResources(request, done);
+    }
+
+    /**
+     * Releases every container listed in the request and then replies {@code true}.
+     */
+    @Override
+    public void releaseWorkerResource(RpcController controller, WorkerResourceReleaseRequest request,
+                                           RpcCallback<PrimitiveProtos.BoolProto> done) {
+      List<ContainerProtocol.TajoContainerIdProto> containerIds = request.getContainerIdsList();
+
+      for(ContainerProtocol.TajoContainerIdProto eachContainer: containerIds) {
+        context.getResourceManager().releaseWorkerResource(eachContainer);
+      }
+      done.run(BOOL_TRUE);
+    }
+
+    /**
+     * Replies with one entry per worker known to the resource manager, carrying the
+     * worker's connection info, memory (MB) and disk slots.
+     */
+    @Override
+    public void getAllWorkerResource(RpcController controller, PrimitiveProtos.NullProto request,
+                                     RpcCallback<WorkerResourcesRequest> done) {
+
+      WorkerResourcesRequest.Builder builder = WorkerResourcesRequest.newBuilder();
+      Collection<Worker> workers = context.getResourceManager().getWorkers().values();
+
+      for(Worker worker: workers) {
+        WorkerResource resource = worker.getResource();
+
+        WorkerResourceProto.Builder workerResource = WorkerResourceProto.newBuilder();
+
+        workerResource.setConnectionInfo(worker.getConnectionInfo().getProto());
+        workerResource.setMemoryMB(resource.getMemoryMB());
+        workerResource.setDiskSlots(resource.getDiskSlots());
+
+        builder.addWorkerResources(workerResource);
+      }
+      done.run(builder.build());
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/807868bd/tajo-core/src/main/java/org/apache/tajo/master/QueryInProgress.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/QueryInProgress.java b/tajo-core/src/main/java/org/apache/tajo/master/QueryInProgress.java
new file mode 100644
index 0000000..73d8cb2
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/master/QueryInProgress.java
@@ -0,0 +1,228 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.master;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.tajo.QueryId;
+import org.apache.tajo.TajoProtos;
+import org.apache.tajo.engine.query.QueryContext;
+import org.apache.tajo.ipc.QueryCoordinatorProtocol.WorkerAllocatedResource;
+import org.apache.tajo.ipc.QueryMasterProtocol;
+import org.apache.tajo.ipc.QueryMasterProtocol.QueryMasterProtocolService;
+import org.apache.tajo.ipc.TajoWorkerProtocol;
+import org.apache.tajo.ipc.TajoWorkerProtocol.QueryExecutionRequestProto;
+import org.apache.tajo.master.rm.WorkerResourceManager;
+import org.apache.tajo.plan.logical.LogicalRootNode;
+import org.apache.tajo.querymaster.QueryJobEvent;
+import org.apache.tajo.rpc.NettyClientBase;
+import org.apache.tajo.rpc.NullCallback;
+import org.apache.tajo.rpc.RpcConnectionPool;
+import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos;
+import org.apache.tajo.session.Session;
+import org.apache.tajo.util.NetUtils;
+
+import java.net.InetSocketAddress;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * Master-side record of one query. It asks the resource manager for a QueryMaster,
+ * submits the query to it over RPC, and folds the QueryMaster's heartbeats into the
+ * {@link QueryInfo} it owns.
+ */
+public class QueryInProgress {
+  private static final Log LOG = LogFactory.getLog(QueryInProgress.class.getName());
+
+  private final QueryId queryId;
+
+  private final Session session;
+
+  private final LogicalRootNode plan;
+
+  // Set once executeQuery has been sent to the QueryMaster.
+  private final AtomicBoolean querySubmitted = new AtomicBoolean(false);
+
+  // Set by the first call to stopProgress(); later calls return immediately.
+  private final AtomicBoolean stopped = new AtomicBoolean(false);
+
+  private final QueryInfo queryInfo;
+
+  private final TajoMaster.MasterContext masterContext;
+
+  private NettyClientBase queryMasterRpc;
+
+  private QueryMasterProtocolService queryMasterRpcClient;
+
+  /**
+   * Creates the record and stamps the query's start time with the current time.
+   *
+   * @param masterContext master context (resource manager, configuration, history writer)
+   * @param session       session the query belongs to
+   * @param queryContext  query context stored in the {@link QueryInfo}
+   * @param queryId       identifier of the query
+   * @param sql           SQL text stored in the {@link QueryInfo}
+   * @param jsonExpr      JSON form of the expression, sent to the QueryMaster on submission
+   * @param plan          logical plan, serialized to JSON on submission
+   */
+  public QueryInProgress(
+      TajoMaster.MasterContext masterContext,
+      Session session,
+      QueryContext queryContext,
+      QueryId queryId, String sql, String jsonExpr, LogicalRootNode plan) {
+
+    this.masterContext = masterContext;
+    this.session = session;
+    this.queryId = queryId;
+    this.plan = plan;
+
+    queryInfo = new QueryInfo(queryId, queryContext, sql, jsonExpr);
+    queryInfo.setStartTime(System.currentTimeMillis());
+  }
+
+  /**
+   * Marks the query as killed and, if a QueryMaster connection exists, asks the
+   * QueryMaster to kill it. The RPC reply is ignored.
+   */
+  public synchronized void kill() {
+    getQueryInfo().setQueryState(TajoProtos.QueryState.QUERY_KILLED);
+    if(queryMasterRpcClient != null){
+      queryMasterRpcClient.killQuery(null, queryId.getProto(), NullCallback.get());
+    }
+  }
+
+  /**
+   * Releases the QueryMaster resource, closes the RPC connection if one was opened and
+   * appends the query to the history. Only the first call has any effect.
+   */
+  public void stopProgress() {
+    if(stopped.getAndSet(true)) {
+      return;
+    }
+
+    LOG.info("=========================================================");
+    LOG.info("Stop query:" + queryId);
+
+    masterContext.getResourceManager().releaseQueryMaster(queryId);
+
+    if(queryMasterRpc != null) {
+      RpcConnectionPool.getPool(masterContext.getConf()).closeConnection(queryMasterRpc);
+    }
+
+    masterContext.getHistoryWriter().appendHistory(queryInfo);
+  }
+
+  /**
+   * Asks the resource manager for a QueryMaster and records its host and ports in the
+   * {@link QueryInfo}.
+   *
+   * @return {@code true} if a QueryMaster was allocated; {@code false} if no resource
+   *         was available or an exception occurred (the latter also marks the query
+   *         as failed via {@link #catchException(Exception)})
+   */
+  public boolean startQueryMaster() {
+    try {
+      LOG.info("Initializing QueryInProgress for QueryID=" + queryId);
+      WorkerResourceManager resourceManager = masterContext.getResourceManager();
+      WorkerAllocatedResource resource = resourceManager.allocateQueryMaster(this);
+
+      // if no resource to allocate a query master
+      if(resource == null) {
+        LOG.info("No Available Resources for QueryMaster");
+        return false;
+      }
+
+      queryInfo.setQueryMaster(resource.getConnectionInfo().getHost());
+      queryInfo.setQueryMasterPort(resource.getConnectionInfo().getQueryMasterPort());
+      queryInfo.setQueryMasterclientPort(resource.getConnectionInfo().getClientPort());
+      queryInfo.setQueryMasterInfoPort(resource.getConnectionInfo().getHttpInfoPort());
+
+      return true;
+    } catch (Exception e) {
+      catchException(e);
+      return false;
+    }
+  }
+
+  /**
+   * Opens a pooled RPC connection to the QueryMaster address recorded in the
+   * {@link QueryInfo} and keeps both the connection and its stub.
+   */
+  private void connectQueryMaster() throws Exception {
+    InetSocketAddress addr = NetUtils.createSocketAddr(queryInfo.getQueryMasterHost(), queryInfo.getQueryMasterPort());
+    LOG.info("Connect to QueryMaster:" + addr);
+    queryMasterRpc =
+        RpcConnectionPool.getPool(masterContext.getConf()).getConnection(addr, QueryMasterProtocol.class, true);
+    queryMasterRpcClient = queryMasterRpc.getStub();
+  }
+
+  /**
+   * Sends the query to its QueryMaster, connecting first if necessary. Does nothing if
+   * the query was already submitted. The misspelled method name is kept because callers
+   * outside this class use it.
+   *
+   * NOTE(review): a failure here is only logged; the query state is left unchanged.
+   * Confirm whether it should be marked failed through catchException(e).
+   */
+  public synchronized void submmitQueryToMaster() {
+    if(querySubmitted.get()) {
+      return;
+    }
+
+    try {
+      if(queryMasterRpcClient == null) {
+        connectQueryMaster();
+      }
+      if(queryMasterRpcClient == null) {
+        LOG.info("No QueryMaster connection info.");
+        //TODO wait
+        return;
+      }
+      LOG.info("Call executeQuery to :" +
+          queryInfo.getQueryMasterHost() + ":" + queryInfo.getQueryMasterPort() + "," + queryId);
+
+      QueryExecutionRequestProto.Builder builder = TajoWorkerProtocol.QueryExecutionRequestProto.newBuilder();
+      builder.setQueryId(queryId.getProto())
+          .setQueryContext(queryInfo.getQueryContext().getProto())
+          .setSession(session.getProto())
+          .setExprInJson(PrimitiveProtos.StringProto.newBuilder().setValue(queryInfo.getJsonExpr()))
+          .setLogicalPlanJson(PrimitiveProtos.StringProto.newBuilder().setValue(plan.toJson()).build());
+
+      queryMasterRpcClient.executeQuery(null, builder.build(), NullCallback.get());
+      querySubmitted.set(true);
+    } catch (Exception e) {
+      LOG.error(e.getMessage(), e);
+    }
+  }
+
+  /**
+   * Logs the exception, marks the query as failed and stores the stack trace as the
+   * query's last message.
+   */
+  public void catchException(Exception e) {
+    LOG.error(e.getMessage(), e);
+    queryInfo.setQueryState(TajoProtos.QueryState.QUERY_FAILED);
+    queryInfo.setLastMessage(StringUtils.stringifyException(e));
+  }
+
+  public QueryId getQueryId() {
+    return queryId;
+  }
+
+  public QueryInfo getQueryInfo() {
+    return this.queryInfo;
+  }
+
+  /**
+   * @return {@code true} once the query has been submitted and as long as
+   *         {@link #stopProgress()} has not been called
+   */
+  public boolean isStarted() {
+    return !stopped.get() && this.querySubmitted.get();
+  }
+
+  /**
+   * Applies a heartbeat to the owned {@link QueryInfo}: result descriptor (terminal
+   * states only), state, progress, finish time and last message. When the resulting
+   * state is terminal, a QUERY_JOB_STOP event is sent to the query manager.
+   *
+   * @param queryInfo snapshot built from the heartbeat; not retained
+   */
+  public void heartbeat(QueryInfo queryInfo) {
+    LOG.info("Received QueryMaster heartbeat:" + queryInfo);
+
+    // to avoid partial update by different heartbeats
+    synchronized (this.queryInfo) {
+
+      // terminal state will let client to retrieve a query result
+      // So, we must set the query result before changing query state
+      if (isFinishState(queryInfo.getQueryState())) {
+        if (queryInfo.hasResultdesc()) {
+          this.queryInfo.setResultDesc(queryInfo.getResultDesc());
+        }
+      }
+
+      this.queryInfo.setQueryState(queryInfo.getQueryState());
+      this.queryInfo.setProgress(queryInfo.getProgress());
+      this.queryInfo.setFinishTime(queryInfo.getFinishTime());
+
+      // Update diagnosis message
+      if (queryInfo.getLastMessage() != null && !queryInfo.getLastMessage().isEmpty()) {
+        this.queryInfo.setLastMessage(queryInfo.getLastMessage());
+        // Separator added: the id and the message used to run together in the log.
+        LOG.info(queryId + ": " + queryInfo.getLastMessage());
+      }
+
+      // if any error occurs, print outs the error message
+      if (this.queryInfo.getQueryState() == TajoProtos.QueryState.QUERY_FAILED) {
+        LOG.warn(queryId + " failed, " + queryInfo.getLastMessage());
+      }
+
+
+      if (isFinishState(this.queryInfo.getQueryState())) {
+        masterContext.getQueryJobManager().getEventHandler().handle(
+            new QueryJobEvent(QueryJobEvent.Type.QUERY_JOB_STOP, this.queryInfo));
+      }
+    }
+  }
+
+  /** @return whether the state is one of FAILED, KILLED or SUCCEEDED */
+  private boolean isFinishState(TajoProtos.QueryState state) {
+    return state == TajoProtos.QueryState.QUERY_FAILED ||
+        state == TajoProtos.QueryState.QUERY_KILLED ||
+        state == TajoProtos.QueryState.QUERY_SUCCEEDED;
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/807868bd/tajo-core/src/main/java/org/apache/tajo/master/QueryJobManager.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/QueryJobManager.java b/tajo-core/src/main/java/org/apache/tajo/master/QueryJobManager.java
deleted file mode 100644
index 6a8da27..0000000
--- a/tajo-core/src/main/java/org/apache/tajo/master/QueryJobManager.java
+++ /dev/null
@@ -1,316 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.master;
-
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.service.CompositeService;
-import org.apache.hadoop.yarn.event.AsyncDispatcher;
-import org.apache.hadoop.yarn.event.EventHandler;
-import org.apache.tajo.QueryId;
-import org.apache.tajo.QueryIdFactory;
-import org.apache.tajo.catalog.TableDesc;
-import org.apache.tajo.engine.query.QueryContext;
-import org.apache.tajo.ipc.TajoMasterProtocol;
-import org.apache.tajo.master.cluster.WorkerConnectionInfo;
-import org.apache.tajo.master.scheduler.SimpleFifoScheduler;
-import org.apache.tajo.plan.logical.LogicalRootNode;
-import org.apache.tajo.querymaster.QueryInProgress;
-import org.apache.tajo.querymaster.QueryJobEvent;
-import org.apache.tajo.session.Session;
-
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Map;
-import java.util.concurrent.atomic.AtomicLong;
-
-/**
- * QueryJobManager manages all scheduled and running queries.
- * It receives all Query related events and routes them to each QueryInProgress.
- */
-public class QueryJobManager extends CompositeService {
-  private static final Log LOG = LogFactory.getLog(QueryJobManager.class.getName());
-
-  // TajoMaster Context
-  private final TajoMaster.MasterContext masterContext;
-
-  private AsyncDispatcher dispatcher;
-
-  private SimpleFifoScheduler scheduler;
-
-  private final Map<QueryId, QueryInProgress> submittedQueries = Maps.newConcurrentMap();
-
-  private final Map<QueryId, QueryInProgress> runningQueries = Maps.newConcurrentMap();
-
-  private AtomicLong minExecutionTime = new AtomicLong(Long.MAX_VALUE);
-  private AtomicLong maxExecutionTime = new AtomicLong();
-  private AtomicLong avgExecutionTime = new AtomicLong();
-  private AtomicLong executedQuerySize = new AtomicLong();
-
-  public QueryJobManager(final TajoMaster.MasterContext masterContext) {
-    super(QueryJobManager.class.getName());
-    this.masterContext = masterContext;
-  }
-
-  @Override
-  public void serviceInit(Configuration conf) throws Exception {
-    try {
-      this.dispatcher = new AsyncDispatcher();
-      addService(this.dispatcher);
-
-      this.dispatcher.register(QueryJobEvent.Type.class, new QueryJobManagerEventHandler());
-
-      this.scheduler = new SimpleFifoScheduler(this);
-    } catch (Exception e) {
-      catchException(null, e);
-    }
-
-    super.serviceInit(conf);
-  }
-
-  @Override
-  public void serviceStop() throws Exception {
-    synchronized(runningQueries) {
-      for(QueryInProgress eachQueryInProgress: runningQueries.values()) {
-        eachQueryInProgress.stopProgress();
-      }
-    }
-    this.scheduler.stop();
-    super.serviceStop();
-  }
-
-  @Override
-  public void serviceStart() throws Exception {
-    this.scheduler.start();
-    super.serviceStart();
-  }
-
-  public EventHandler getEventHandler() {
-    return dispatcher.getEventHandler();
-  }
-
-  public Collection<QueryInProgress> getSubmittedQueries() {
-    synchronized (submittedQueries){
-      return Collections.unmodifiableCollection(submittedQueries.values());
-    }
-  }
-
-  public Collection<QueryInProgress> getRunningQueries() {
-    synchronized (runningQueries){
-      return Collections.unmodifiableCollection(runningQueries.values());
-    }
-  }
-
-  public synchronized Collection<QueryInfo> getFinishedQueries() {
-    try {
-      return this.masterContext.getHistoryReader().getQueries(null);
-    } catch (Throwable e) {
-      LOG.error(e);
-      return Lists.newArrayList();
-    }
-  }
-
-
-  public synchronized QueryInfo getFinishedQuery(QueryId queryId) {
-    try {
-      return this.masterContext.getHistoryReader().getQueryInfo(queryId.toString());
-    } catch (Throwable e) {
-      LOG.error(e);
-      return null;
-    }
-  }
-
-  public QueryInfo createNewQueryJob(Session session, QueryContext queryContext, String sql,
-                                     String jsonExpr, LogicalRootNode plan)
-      throws Exception {
-    QueryId queryId = QueryIdFactory.newQueryId(masterContext.getResourceManager().getSeedQueryId());
-    QueryInProgress queryInProgress = new QueryInProgress(masterContext, session, queryContext, queryId, sql,
-        jsonExpr, plan);
-
-    synchronized (submittedQueries) {
-      queryInProgress.getQueryInfo().setQueryMaster("");
-      submittedQueries.put(queryInProgress.getQueryId(), queryInProgress);
-    }
-
-    scheduler.addQuery(queryInProgress);
-    return queryInProgress.getQueryInfo();
-  }
-
-  public QueryInfo startQueryJob(QueryId queryId) throws Exception {
-
-    QueryInProgress queryInProgress;
-
-    synchronized (submittedQueries) {
-      queryInProgress = submittedQueries.remove(queryId);
-    }
-
-    synchronized (runningQueries) {
-      runningQueries.put(queryInProgress.getQueryId(), queryInProgress);
-    }
-
-    if (queryInProgress.startQueryMaster()) {
-      dispatcher.getEventHandler().handle(new QueryJobEvent(QueryJobEvent.Type.QUERY_MASTER_START,
-          queryInProgress.getQueryInfo()));
-    } else {
-      stopQuery(queryId);
-    }
-
-    return queryInProgress.getQueryInfo();
-  }
-
-  class QueryJobManagerEventHandler implements EventHandler<QueryJobEvent> {
-
-    @Override
-    public void handle(QueryJobEvent event) {
-      QueryInProgress queryInProgress = getQueryInProgress(event.getQueryInfo().getQueryId());
-
-
-      if (queryInProgress == null) {
-        LOG.warn("No query info in running queries.[" + event.getQueryInfo().getQueryId() + "]");
-        return;
-      }
-
-
-      if (event.getType() == QueryJobEvent.Type.QUERY_MASTER_START) {
-        queryInProgress.submmitQueryToMaster();
-
-      } else if (event.getType() == QueryJobEvent.Type.QUERY_JOB_STOP) {
-        stopQuery(event.getQueryInfo().getQueryId());
-
-      } else if (event.getType() == QueryJobEvent.Type.QUERY_JOB_KILL) {
-        scheduler.removeQuery(queryInProgress.getQueryId());
-        queryInProgress.kill();
-        stopQuery(queryInProgress.getQueryId());
-
-      } else if (event.getType() == QueryJobEvent.Type.QUERY_JOB_HEARTBEAT) {
-        queryInProgress.heartbeat(event.getQueryInfo());
-      }
-    }
-  }
-
-  public QueryInProgress getQueryInProgress(QueryId queryId) {
-    QueryInProgress queryInProgress;
-    synchronized (submittedQueries) {
-      queryInProgress = submittedQueries.get(queryId);
-    }
-
-    if (queryInProgress == null) {
-      synchronized (runningQueries) {
-        queryInProgress = runningQueries.get(queryId);
-      }
-    }
-    return queryInProgress;
-  }
-
-  public void stopQuery(QueryId queryId) {
-    LOG.info("Stop QueryInProgress:" + queryId);
-    QueryInProgress queryInProgress = getQueryInProgress(queryId);
-    if(queryInProgress != null) {
-      queryInProgress.stopProgress();
-      synchronized(submittedQueries) {
-        submittedQueries.remove(queryId);
-      }
-
-      synchronized(runningQueries) {
-        runningQueries.remove(queryId);
-      }
-
-      QueryInfo queryInfo = queryInProgress.getQueryInfo();
-      long executionTime = queryInfo.getFinishTime() - queryInfo.getStartTime();
-      if (executionTime < minExecutionTime.get()) {
-        minExecutionTime.set(executionTime);
-      }
-
-      if (executionTime > maxExecutionTime.get()) {
-        maxExecutionTime.set(executionTime);
-      }
-
-      long totalExecutionTime = executedQuerySize.get() * avgExecutionTime.get();
-      if (totalExecutionTime > 0) {
-        avgExecutionTime.set((totalExecutionTime + executionTime) / (executedQuerySize.get() + 1));
-      } else {
-        avgExecutionTime.set(executionTime);
-      }
-      executedQuerySize.incrementAndGet();
-    } else {
-      LOG.warn("No QueryInProgress while query stopping: " + queryId);
-    }
-  }
-
-  public long getMinExecutionTime() {
-    if (getExecutedQuerySize() == 0) return 0;
-    return minExecutionTime.get();
-  }
-
-  public long getMaxExecutionTime() {
-    return maxExecutionTime.get();
-  }
-
-  public long getAvgExecutionTime() {
-    return avgExecutionTime.get();
-  }
-
-  public long getExecutedQuerySize() {
-    return executedQuerySize.get();
-  }
-
-  private void catchException(QueryId queryId, Exception e) {
-    LOG.error(e.getMessage(), e);
-    QueryInProgress queryInProgress = runningQueries.get(queryId);
-    queryInProgress.catchException(e);
-  }
-
-  public synchronized TajoMasterProtocol.TajoHeartbeatResponse.ResponseCommand queryHeartbeat(
-      TajoMasterProtocol.TajoHeartbeat queryHeartbeat) {
-    QueryInProgress queryInProgress = getQueryInProgress(new QueryId(queryHeartbeat.getQueryId()));
-    if(queryInProgress == null) {
-      return null;
-    }
-
-    QueryInfo queryInfo = makeQueryInfoFromHeartbeat(queryHeartbeat);
-    getEventHandler().handle(new QueryJobEvent(QueryJobEvent.Type.QUERY_JOB_HEARTBEAT, queryInfo));
-
-    return null;
-  }
-
-  private QueryInfo makeQueryInfoFromHeartbeat(TajoMasterProtocol.TajoHeartbeat queryHeartbeat) {
-    QueryInfo queryInfo = new QueryInfo(new QueryId(queryHeartbeat.getQueryId()));
-    WorkerConnectionInfo connectionInfo = new WorkerConnectionInfo(queryHeartbeat.getConnectionInfo());
-
-    queryInfo.setQueryMaster(connectionInfo.getHost());
-    queryInfo.setQueryMasterPort(connectionInfo.getQueryMasterPort());
-    queryInfo.setQueryMasterclientPort(connectionInfo.getClientPort());
-    queryInfo.setLastMessage(queryHeartbeat.getStatusMessage());
-    queryInfo.setQueryState(queryHeartbeat.getState());
-    queryInfo.setProgress(queryHeartbeat.getQueryProgress());
-
-    if (queryHeartbeat.hasQueryFinishTime()) {
-      queryInfo.setFinishTime(queryHeartbeat.getQueryFinishTime());
-    }
-
-    if (queryHeartbeat.hasResultDesc()) {
-      queryInfo.setResultDesc(new TableDesc(queryHeartbeat.getResultDesc()));
-    }
-
-    return queryInfo;
-  }
-}

http://git-wip-us.apache.org/repos/asf/tajo/blob/807868bd/tajo-core/src/main/java/org/apache/tajo/master/QueryManager.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/QueryManager.java b/tajo-core/src/main/java/org/apache/tajo/master/QueryManager.java
new file mode 100644
index 0000000..296be04
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/master/QueryManager.java
@@ -0,0 +1,315 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.master;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.service.CompositeService;
+import org.apache.hadoop.yarn.event.AsyncDispatcher;
+import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.tajo.QueryId;
+import org.apache.tajo.QueryIdFactory;
+import org.apache.tajo.catalog.TableDesc;
+import org.apache.tajo.engine.query.QueryContext;
+import org.apache.tajo.ipc.QueryCoordinatorProtocol;
+import org.apache.tajo.master.cluster.WorkerConnectionInfo;
+import org.apache.tajo.master.scheduler.SimpleFifoScheduler;
+import org.apache.tajo.plan.logical.LogicalRootNode;
+import org.apache.tajo.querymaster.QueryJobEvent;
+import org.apache.tajo.session.Session;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * QueryManager manages all scheduled and running queries.
+ * It receives all Query related events and routes them to each QueryInProgress.
+ */
+public class QueryManager extends CompositeService {
+  private static final Log LOG = LogFactory.getLog(QueryManager.class.getName());
+
+  // TajoMaster Context
+  private final TajoMaster.MasterContext masterContext;
+
+  private AsyncDispatcher dispatcher;
+
+  private SimpleFifoScheduler scheduler;
+
+  private final Map<QueryId, QueryInProgress> submittedQueries = Maps.newConcurrentMap();
+
+  private final Map<QueryId, QueryInProgress> runningQueries = Maps.newConcurrentMap();
+
+  private AtomicLong minExecutionTime = new AtomicLong(Long.MAX_VALUE);
+  private AtomicLong maxExecutionTime = new AtomicLong();
+  private AtomicLong avgExecutionTime = new AtomicLong();
+  private AtomicLong executedQuerySize = new AtomicLong();
+
+  /**
+   * Creates a query manager bound to the given master context. The service is
+   * registered under this class' fully qualified name.
+   *
+   * @param masterContext context of the owning TajoMaster
+   */
+  public QueryManager(final TajoMaster.MasterContext masterContext) {
+    super(QueryManager.class.getName());
+    this.masterContext = masterContext;
+  }
+
+  /**
+   * Creates the event dispatcher and the FIFO scheduler, registers the query job
+   * event handler, and then initializes the child services.
+   *
+   * @param conf configuration handed on to the child services
+   * @throws Exception if a component cannot be created; the original failure is
+   *         logged and rethrown unchanged
+   */
+  @Override
+  public void serviceInit(Configuration conf) throws Exception {
+    try {
+      this.dispatcher = new AsyncDispatcher();
+      addService(this.dispatcher);
+
+      this.dispatcher.register(QueryJobEvent.Type.class, new QueryJobManagerEventHandler());
+
+      this.scheduler = new SimpleFifoScheduler(this);
+    } catch (Exception e) {
+      // No query exists yet, so there is no QueryInProgress to attach the failure to.
+      // Log it and propagate the real cause so the service never runs half-initialized.
+      LOG.error(e.getMessage(), e);
+      throw e;
+    }
+
+    super.serviceInit(conf);
+  }
+
+  /**
+   * Stops every running query, then the scheduler, then the child services.
+   * Safe to call after a failed or partial initialization: the scheduler is only
+   * stopped if it was created, and the child services are stopped even when an
+   * earlier step throws.
+   *
+   * @throws Exception if stopping a query, the scheduler or a child service fails
+   */
+  @Override
+  public void serviceStop() throws Exception {
+    try {
+      synchronized (runningQueries) {
+        for (QueryInProgress eachQueryInProgress : runningQueries.values()) {
+          eachQueryInProgress.stopProgress();
+        }
+      }
+      // The scheduler is created in serviceInit(); it is still null when stop is
+      // invoked after an initialization failure.
+      if (this.scheduler != null) {
+        this.scheduler.stop();
+      }
+    } finally {
+      super.serviceStop();
+    }
+  }
+
+  /**
+   * Starts the scheduler first and then the child services registered with this
+   * composite service.
+   *
+   * @throws Exception if the scheduler or a child service fails to start
+   */
+  @Override
+  public void serviceStart() throws Exception {
+    this.scheduler.start();
+    super.serviceStart();
+  }
+
+  /**
+   * Returns the event handler of the internal dispatcher; events passed to it are
+   * delivered to the handlers registered in {@code serviceInit}.
+   *
+   * @return the dispatcher's event handler
+   */
+  public EventHandler getEventHandler() {
+    return dispatcher.getEventHandler();
+  }
+
+  /**
+   * Returns the queries that were scheduled but have not been started yet.
+   *
+   * @return a read-only view backed by the submitted-query map
+   */
+  public Collection<QueryInProgress> getSubmittedQueries() {
+    final Collection<QueryInProgress> readOnlyView;
+    synchronized (submittedQueries) {
+      readOnlyView = Collections.unmodifiableCollection(submittedQueries.values());
+    }
+    return readOnlyView;
+  }
+
+  /**
+   * Returns the queries that have been started and not yet stopped.
+   *
+   * @return a read-only view backed by the running-query map
+   */
+  public Collection<QueryInProgress> getRunningQueries() {
+    final Collection<QueryInProgress> readOnlyView;
+    synchronized (runningQueries) {
+      readOnlyView = Collections.unmodifiableCollection(runningQueries.values());
+    }
+    return readOnlyView;
+  }
+
+  /**
+   * Reads the finished queries from the master's history reader.
+   *
+   * @return the stored query infos, or an empty list if the history cannot be read
+   */
+  public synchronized Collection<QueryInfo> getFinishedQueries() {
+    try {
+      return this.masterContext.getHistoryReader().getQueries(null);
+    } catch (Throwable e) {
+      // Pass the throwable as the second argument so the stack trace is logged too.
+      LOG.error(e.getMessage(), e);
+      return Lists.newArrayList();
+    }
+  }
+
+
+  /**
+   * Looks up a single finished query in the master's history reader.
+   *
+   * @param queryId id of the query to look up
+   * @return whatever the history reader returns for the id, or {@code null} if the
+   *         lookup throws
+   */
+  public synchronized QueryInfo getFinishedQuery(QueryId queryId) {
+    try {
+      return this.masterContext.getHistoryReader().getQueryInfo(queryId.toString());
+    } catch (Throwable e) {
+      // Pass the throwable as the second argument so the stack trace is logged too.
+      LOG.error(e.getMessage(), e);
+      return null;
+    }
+  }
+
+  /**
+   * Registers a new query under a freshly generated id and hands it to the scheduler.
+   *
+   * @param session      session that issued the query
+   * @param queryContext context of the query
+   * @param sql          SQL text of the query
+   * @param jsonExpr     JSON form of the query expression
+   * @param plan         root of the logical plan
+   * @return the query info of the newly submitted query
+   * @throws Exception if the query cannot be created or scheduled
+   */
+  public QueryInfo scheduleQuery(Session session, QueryContext queryContext, String sql,
+                                 String jsonExpr, LogicalRootNode plan)
+      throws Exception {
+    final QueryId newQueryId =
+        QueryIdFactory.newQueryId(masterContext.getResourceManager().getSeedQueryId());
+    final QueryInProgress pending =
+        new QueryInProgress(masterContext, session, queryContext, newQueryId, sql, jsonExpr, plan);
+
+    synchronized (submittedQueries) {
+      // The query master is not known until the query is actually started.
+      pending.getQueryInfo().setQueryMaster("");
+      submittedQueries.put(pending.getQueryId(), pending);
+    }
+
+    scheduler.addQuery(pending);
+    return pending.getQueryInfo();
+  }
+
+  /**
+   * Moves a submitted query to the running set and starts its query master.
+   * On a successful start a QUERY_MASTER_START event is dispatched; otherwise the
+   * query is stopped immediately.
+   *
+   * @param queryId id of a query previously registered through scheduleQuery
+   * @return the query info of the started (or immediately stopped) query
+   * @throws IllegalStateException if the query is no longer in the submitted set,
+   *         e.g. because it was stopped before it could be started
+   * @throws Exception if starting the query master fails
+   */
+  public QueryInfo startQueryJob(QueryId queryId) throws Exception {
+
+    QueryInProgress queryInProgress;
+
+    synchronized (submittedQueries) {
+      queryInProgress = submittedQueries.remove(queryId);
+    }
+
+    // remove() yields null when the query is unknown or was already removed;
+    // fail with a descriptive message instead of a bare NullPointerException.
+    if (queryInProgress == null) {
+      throw new IllegalStateException("No submitted query to start: " + queryId);
+    }
+
+    synchronized (runningQueries) {
+      runningQueries.put(queryInProgress.getQueryId(), queryInProgress);
+    }
+
+    if (queryInProgress.startQueryMaster()) {
+      dispatcher.getEventHandler().handle(new QueryJobEvent(QueryJobEvent.Type.QUERY_MASTER_START,
+          queryInProgress.getQueryInfo()));
+    } else {
+      stopQuery(queryId);
+    }
+
+    return queryInProgress.getQueryInfo();
+  }
+
+  /**
+   * Routes each {@link QueryJobEvent} to the QueryInProgress it refers to.
+   * Events for a query that is neither submitted nor running are logged and dropped.
+   */
+  class QueryJobManagerEventHandler implements EventHandler<QueryJobEvent> {
+
+    @Override
+    public void handle(QueryJobEvent event) {
+      QueryInProgress target = getQueryInProgress(event.getQueryInfo().getQueryId());
+      if (target == null) {
+        LOG.warn("No query info in running queries.[" + event.getQueryInfo().getQueryId() + "]");
+        return;
+      }
+
+      if (QueryJobEvent.Type.QUERY_MASTER_START == event.getType()) {
+        target.submmitQueryToMaster();
+        return;
+      }
+
+      if (QueryJobEvent.Type.QUERY_JOB_STOP == event.getType()) {
+        stopQuery(event.getQueryInfo().getQueryId());
+        return;
+      }
+
+      if (QueryJobEvent.Type.QUERY_JOB_KILL == event.getType()) {
+        // Take the query out of the scheduler first, then kill it and clean up.
+        scheduler.removeQuery(target.getQueryId());
+        target.kill();
+        stopQuery(target.getQueryId());
+        return;
+      }
+
+      if (QueryJobEvent.Type.QUERY_JOB_HEARTBEAT == event.getType()) {
+        target.heartbeat(event.getQueryInfo());
+      }
+    }
+  }
+
+  /**
+   * Finds a query by id, checking the submitted queries first and the running
+   * queries second.
+   *
+   * @param queryId id of the query to find
+   * @return the matching query, or {@code null} if it is in neither set
+   */
+  public QueryInProgress getQueryInProgress(QueryId queryId) {
+    QueryInProgress submitted;
+    synchronized (submittedQueries) {
+      submitted = submittedQueries.get(queryId);
+    }
+    if (submitted != null) {
+      return submitted;
+    }
+
+    synchronized (runningQueries) {
+      return runningQueries.get(queryId);
+    }
+  }
+
+  /**
+   * Stops the given query, removes it from the submitted and running sets and
+   * folds its execution time into the min/max/avg statistics.
+   * Unknown query ids are logged and otherwise ignored.
+   *
+   * @param queryId id of the query to stop
+   */
+  public void stopQuery(QueryId queryId) {
+    LOG.info("Stop QueryInProgress:" + queryId);
+    QueryInProgress queryInProgress = getQueryInProgress(queryId);
+    if (queryInProgress == null) {
+      LOG.warn("No QueryInProgress while query stopping: " + queryId);
+      return;
+    }
+
+    queryInProgress.stopProgress();
+    synchronized (submittedQueries) {
+      submittedQueries.remove(queryId);
+    }
+
+    synchronized (runningQueries) {
+      runningQueries.remove(queryId);
+    }
+
+    QueryInfo queryInfo = queryInProgress.getQueryInfo();
+    // NOTE(review): if the finish time was never set this difference may be
+    // negative and would skew the statistics -- confirm against QueryInfo.
+    recordExecutionTime(queryInfo.getFinishTime() - queryInfo.getStartTime());
+  }
+
+  /**
+   * Folds one execution time into the statistics as a single atomic step, so that
+   * concurrent stopQuery calls cannot overwrite each other's updates.
+   */
+  private void recordExecutionTime(long executionTime) {
+    // The counter field is never reassigned, so it serves as the lock guarding
+    // all four statistics.
+    synchronized (executedQuerySize) {
+      if (executionTime < minExecutionTime.get()) {
+        minExecutionTime.set(executionTime);
+      }
+
+      if (executionTime > maxExecutionTime.get()) {
+        maxExecutionTime.set(executionTime);
+      }
+
+      // Running mean over all finished queries; with no earlier query this is
+      // simply executionTime.
+      long finishedSoFar = executedQuerySize.get();
+      long totalExecutionTime = finishedSoFar * avgExecutionTime.get();
+      avgExecutionTime.set((totalExecutionTime + executionTime) / (finishedSoFar + 1));
+
+      // Published last: getMinExecutionTime() trusts the minimum only once the
+      // count is non-zero.
+      executedQuerySize.set(finishedSoFar + 1);
+    }
+  }
+
+  /**
+   * Returns the smallest execution time recorded by stopQuery.
+   *
+   * @return the minimum, or 0 while no query has been recorded yet
+   */
+  public long getMinExecutionTime() {
+    return getExecutedQuerySize() == 0 ? 0 : minExecutionTime.get();
+  }
+
+  /**
+   * Returns the largest execution time recorded by stopQuery.
+   *
+   * @return the maximum; 0 until a larger value has been recorded
+   */
+  public long getMaxExecutionTime() {
+    return maxExecutionTime.get();
+  }
+
+  /**
+   * Returns the average execution time maintained by stopQuery.
+   *
+   * @return the current average, using integer arithmetic
+   */
+  public long getAvgExecutionTime() {
+    return avgExecutionTime.get();
+  }
+
+  /**
+   * Returns how many queries have been stopped and counted in the statistics.
+   *
+   * @return the number of recorded queries
+   */
+  public long getExecutedQuerySize() {
+    return executedQuerySize.get();
+  }
+
+  /**
+   * Logs the exception and, when it belongs to a running query, forwards it to
+   * that query.
+   *
+   * @param queryId id of the affected query; may be {@code null} when the failure
+   *                is not tied to any query
+   * @param e       the exception to report
+   */
+  private void catchException(QueryId queryId, Exception e) {
+    LOG.error(e.getMessage(), e);
+    if (queryId == null) {
+      // The running-query map is a concurrent map, which rejects null keys.
+      return;
+    }
+
+    QueryInProgress queryInProgress = runningQueries.get(queryId);
+    if (queryInProgress != null) {
+      queryInProgress.catchException(e);
+    }
+  }
+
+  /**
+   * Accepts a heartbeat and, if the query is still known, dispatches it as a
+   * QUERY_JOB_HEARTBEAT event. Heartbeats for unknown queries are ignored.
+   *
+   * @param queryHeartbeat heartbeat message carrying the query id and its status
+   * @return always {@code null}; no response command is produced here
+   */
+  public synchronized QueryCoordinatorProtocol.TajoHeartbeatResponse.ResponseCommand queryHeartbeat(
+      QueryCoordinatorProtocol.TajoHeartbeat queryHeartbeat) {
+    QueryInProgress known = getQueryInProgress(new QueryId(queryHeartbeat.getQueryId()));
+    if (known != null) {
+      QueryInfo heartbeatInfo = makeQueryInfoFromHeartbeat(queryHeartbeat);
+      getEventHandler().handle(
+          new QueryJobEvent(QueryJobEvent.Type.QUERY_JOB_HEARTBEAT, heartbeatInfo));
+    }
+
+    return null;
+  }
+
+  /**
+   * Builds a QueryInfo snapshot from the fields of a heartbeat message.
+   * The finish time and result description are copied only when the heartbeat
+   * carries them.
+   *
+   * @param queryHeartbeat heartbeat message to convert
+   * @return a new QueryInfo populated from the heartbeat
+   */
+  private QueryInfo makeQueryInfoFromHeartbeat(QueryCoordinatorProtocol.TajoHeartbeat queryHeartbeat) {
+    QueryInfo info = new QueryInfo(new QueryId(queryHeartbeat.getQueryId()));
+    WorkerConnectionInfo masterConnection = new WorkerConnectionInfo(queryHeartbeat.getConnectionInfo());
+
+    // Where the sender of the heartbeat can be reached.
+    info.setQueryMaster(masterConnection.getHost());
+    info.setQueryMasterPort(masterConnection.getQueryMasterPort());
+    info.setQueryMasterclientPort(masterConnection.getClientPort());
+
+    // Current status as reported by the heartbeat.
+    info.setLastMessage(queryHeartbeat.getStatusMessage());
+    info.setQueryState(queryHeartbeat.getState());
+    info.setProgress(queryHeartbeat.getQueryProgress());
+
+    // Optional fields.
+    if (queryHeartbeat.hasQueryFinishTime()) {
+      info.setFinishTime(queryHeartbeat.getQueryFinishTime());
+    }
+    if (queryHeartbeat.hasResultDesc()) {
+      info.setResultDesc(new TableDesc(queryHeartbeat.getResultDesc()));
+    }
+
+    return info;
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/807868bd/tajo-core/src/main/java/org/apache/tajo/master/TajoContainerProxy.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/TajoContainerProxy.java b/tajo-core/src/main/java/org/apache/tajo/master/TajoContainerProxy.java
index 7209080..2ffd7ca 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/TajoContainerProxy.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/TajoContainerProxy.java
@@ -26,7 +26,7 @@ import org.apache.tajo.TaskAttemptId;
 import org.apache.tajo.conf.TajoConf;
 import org.apache.tajo.engine.query.QueryContext;
 import org.apache.tajo.ipc.ContainerProtocol;
-import org.apache.tajo.ipc.TajoMasterProtocol;
+import org.apache.tajo.ipc.QueryCoordinatorProtocol;
 import org.apache.tajo.ipc.TajoWorkerProtocol;
 import org.apache.tajo.querymaster.QueryMasterTask;
 import org.apache.tajo.master.container.TajoContainer;
@@ -177,23 +177,23 @@ public class TajoContainerProxy extends ContainerProxy {
       if (conf.getBoolVar(TajoConf.ConfVars.TAJO_MASTER_HA_ENABLE)) {
         try {
           tmClient = connPool.getConnection(context.getQueryMasterContext().getWorkerContext().getTajoMasterAddress(),
-              TajoMasterProtocol.class, true);
+              QueryCoordinatorProtocol.class, true);
         } catch (Exception e) {
           context.getQueryMasterContext().getWorkerContext().setWorkerResourceTrackerAddr(
               HAServiceUtil.getResourceTrackerAddress(conf));
           context.getQueryMasterContext().getWorkerContext().setTajoMasterAddress(
               HAServiceUtil.getMasterUmbilicalAddress(conf));
           tmClient = connPool.getConnection(context.getQueryMasterContext().getWorkerContext().getTajoMasterAddress(),
-              TajoMasterProtocol.class, true);
+              QueryCoordinatorProtocol.class, true);
         }
       } else {
         tmClient = connPool.getConnection(context.getQueryMasterContext().getWorkerContext().getTajoMasterAddress(),
-            TajoMasterProtocol.class, true);
+            QueryCoordinatorProtocol.class, true);
       }
 
-      TajoMasterProtocol.TajoMasterProtocolService masterClientService = tmClient.getStub();
+      QueryCoordinatorProtocol.QueryCoordinatorProtocolService masterClientService = tmClient.getStub();
         masterClientService.releaseWorkerResource(null,
-          TajoMasterProtocol.WorkerResourceReleaseRequest.newBuilder()
+            QueryCoordinatorProtocol.WorkerResourceReleaseRequest.newBuilder()
               .setExecutionBlockId(executionBlockId.getProto())
               .addAllContainerIds(containerIdProtos)
               .build(),

http://git-wip-us.apache.org/repos/asf/tajo/blob/807868bd/tajo-core/src/main/java/org/apache/tajo/master/TajoMaster.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/TajoMaster.java b/tajo-core/src/main/java/org/apache/tajo/master/TajoMaster.java
index c054599..786025a 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/TajoMaster.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/TajoMaster.java
@@ -114,14 +114,14 @@ public class TajoMaster extends CompositeService {
   private GlobalEngine globalEngine;
   private AsyncDispatcher dispatcher;
   private TajoMasterClientService tajoMasterClientService;
-  private TajoMasterService tajoMasterService;
+  private QueryCoordinatorService tajoMasterService;
   private SessionManager sessionManager;
 
   private WorkerResourceManager resourceManager;
   //Web Server
   private StaticHttpServer webServer;
 
-  private QueryJobManager queryJobManager;
+  private QueryManager queryManager;
 
   private ThreadMXBean threadBean = ManagementFactory.getThreadMXBean();
 
@@ -183,13 +183,13 @@ public class TajoMaster extends CompositeService {
       globalEngine = new GlobalEngine(context);
       addIfService(globalEngine);
 
-      queryJobManager = new QueryJobManager(context);
-      addIfService(queryJobManager);
+      queryManager = new QueryManager(context);
+      addIfService(queryManager);
 
       tajoMasterClientService = new TajoMasterClientService(context);
       addIfService(tajoMasterClientService);
 
-      tajoMasterService = new TajoMasterService(context);
+      tajoMasterService = new QueryCoordinatorService(context);
       addIfService(tajoMasterService);
     } catch (Exception e) {
       LOG.error(e.getMessage(), e);
@@ -441,8 +441,8 @@ public class TajoMaster extends CompositeService {
       return clock;
     }
 
-    public QueryJobManager getQueryJobManager() {
-      return queryJobManager;
+    public QueryManager getQueryJobManager() {
+      return queryManager;
     }
 
     public WorkerResourceManager getResourceManager() {
@@ -469,7 +469,7 @@ public class TajoMaster extends CompositeService {
       return storeManager;
     }
 
-    public TajoMasterService getTajoMasterService() {
+    public QueryCoordinatorService getTajoMasterService() {
       return tajoMasterService;
     }
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/807868bd/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterClientService.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterClientService.java b/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterClientService.java
index 93326be..fcc016a 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterClientService.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterClientService.java
@@ -45,7 +45,6 @@ import org.apache.tajo.ipc.TajoMasterClientProtocol;
 import org.apache.tajo.ipc.TajoMasterClientProtocol.TajoMasterClientProtocolService;
 import org.apache.tajo.master.TajoMaster.MasterContext;
 import org.apache.tajo.master.exec.NonForwardQueryResultScanner;
-import org.apache.tajo.querymaster.QueryInProgress;
 import org.apache.tajo.querymaster.QueryJobEvent;
 import org.apache.tajo.master.rm.Worker;
 import org.apache.tajo.master.rm.WorkerResource;
@@ -567,8 +566,8 @@ public class TajoMasterClientService extends AbstractService {
         context.getSessionManager().touch(request.getSessionId().getId());
         QueryId queryId = new QueryId(request.getQueryId());
 
-        QueryJobManager queryJobManager = context.getQueryJobManager();
-        QueryInProgress queryInProgress = queryJobManager.getQueryInProgress(queryId);
+        QueryManager queryManager = context.getQueryJobManager();
+        QueryInProgress queryInProgress = queryManager.getQueryInProgress(queryId);
 
         QueryInfo queryInfo = null;
         if (queryInProgress == null) {
@@ -598,8 +597,8 @@ public class TajoMasterClientService extends AbstractService {
       try {
         context.getSessionManager().touch(request.getSessionId().getId());
         QueryId queryId = new QueryId(request.getQueryId());
-        QueryJobManager queryJobManager = context.getQueryJobManager();
-        queryJobManager.getEventHandler().handle(new QueryJobEvent(QueryJobEvent.Type.QUERY_JOB_KILL,
+        QueryManager queryManager = context.getQueryJobManager();
+        queryManager.getEventHandler().handle(new QueryJobEvent(QueryJobEvent.Type.QUERY_JOB_KILL,
             new QueryInfo(queryId)));
         return BOOL_TRUE;
       } catch (Throwable t) {

http://git-wip-us.apache.org/repos/asf/tajo/blob/807868bd/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterService.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterService.java b/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterService.java
deleted file mode 100644
index 02bdfa1..0000000
--- a/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterService.java
+++ /dev/null
@@ -1,161 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.master;
-
-import com.google.protobuf.RpcCallback;
-import com.google.protobuf.RpcController;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.service.AbstractService;
-import org.apache.tajo.QueryId;
-import org.apache.tajo.TajoIdProtos;
-import org.apache.tajo.conf.TajoConf;
-import org.apache.tajo.ipc.ContainerProtocol;
-import org.apache.tajo.ipc.TajoMasterProtocol;
-import org.apache.tajo.master.cluster.WorkerConnectionInfo;
-import org.apache.tajo.master.rm.Worker;
-import org.apache.tajo.master.rm.WorkerResource;
-import org.apache.tajo.rpc.AsyncRpcServer;
-import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos;
-import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos.BoolProto;
-import org.apache.tajo.util.NetUtils;
-
-import java.net.InetSocketAddress;
-import java.util.Collection;
-import java.util.List;
-
-public class TajoMasterService extends AbstractService {
-  private final static Log LOG = LogFactory.getLog(TajoMasterService.class);
-
-  private final TajoMaster.MasterContext context;
-  private final TajoConf conf;
-  private final TajoMasterServiceHandler masterHandler;
-  private AsyncRpcServer server;
-  private InetSocketAddress bindAddress;
-
-  private final BoolProto BOOL_TRUE = BoolProto.newBuilder().setValue(true).build();
-  private final BoolProto BOOL_FALSE = BoolProto.newBuilder().setValue(false).build();
-
-  public TajoMasterService(TajoMaster.MasterContext context) {
-    super(TajoMasterService.class.getName());
-    this.context = context;
-    this.conf = context.getConf();
-    this.masterHandler = new TajoMasterServiceHandler();
-  }
-
-  @Override
-  public void start() {
-    String confMasterServiceAddr = conf.getVar(TajoConf.ConfVars.TAJO_MASTER_UMBILICAL_RPC_ADDRESS);
-    InetSocketAddress initIsa = NetUtils.createSocketAddr(confMasterServiceAddr);
-    int workerNum = conf.getIntVar(TajoConf.ConfVars.MASTER_RPC_SERVER_WORKER_THREAD_NUM);
-    try {
-      server = new AsyncRpcServer(TajoMasterProtocol.class, masterHandler, initIsa, workerNum);
-    } catch (Exception e) {
-      LOG.error(e);
-    }
-    server.start();
-    bindAddress = NetUtils.getConnectAddress(server.getListenAddress());
-    this.conf.setVar(TajoConf.ConfVars.TAJO_MASTER_UMBILICAL_RPC_ADDRESS,
-        NetUtils.normalizeInetSocketAddress(bindAddress));
-    LOG.info("Instantiated TajoMasterService at " + this.bindAddress);
-    super.start();
-  }
-
-  @Override
-  public void stop() {
-    if(server != null) {
-      server.shutdown();
-      server = null;
-    }
-    super.stop();
-  }
-
-  public InetSocketAddress getBindAddress() {
-    return bindAddress;
-  }
-
-  public class TajoMasterServiceHandler
-      implements TajoMasterProtocol.TajoMasterProtocolService.Interface {
-    @Override
-    public void heartbeat(
-        RpcController controller,
-        TajoMasterProtocol.TajoHeartbeat request, RpcCallback<TajoMasterProtocol.TajoHeartbeatResponse> done) {
-      if(LOG.isDebugEnabled()) {
-        LOG.debug("Received QueryHeartbeat:" + new WorkerConnectionInfo(request.getConnectionInfo()));
-      }
-
-      TajoMasterProtocol.TajoHeartbeatResponse.ResponseCommand command = null;
-
-      QueryJobManager queryJobManager = context.getQueryJobManager();
-      command = queryJobManager.queryHeartbeat(request);
-
-      TajoMasterProtocol.TajoHeartbeatResponse.Builder builder = TajoMasterProtocol.TajoHeartbeatResponse.newBuilder();
-      builder.setHeartbeatResult(BOOL_TRUE);
-      if(command != null) {
-        builder.setResponseCommand(command);
-      }
-
-      builder.setClusterResourceSummary(context.getResourceManager().getClusterResourceSummary());
-      done.run(builder.build());
-    }
-
-    @Override
-    public void allocateWorkerResources(
-        RpcController controller,
-        TajoMasterProtocol.WorkerResourceAllocationRequest request,
-        RpcCallback<TajoMasterProtocol.WorkerResourceAllocationResponse> done) {
-      context.getResourceManager().allocateWorkerResources(request, done);
-    }
-
-    @Override
-    public void releaseWorkerResource(RpcController controller,
-                                           TajoMasterProtocol.WorkerResourceReleaseRequest request,
-                                           RpcCallback<PrimitiveProtos.BoolProto> done) {
-      List<ContainerProtocol.TajoContainerIdProto> containerIds = request.getContainerIdsList();
-
-      for(ContainerProtocol.TajoContainerIdProto eachContainer: containerIds) {
-        context.getResourceManager().releaseWorkerResource(eachContainer);
-      }
-      done.run(BOOL_TRUE);
-    }
-
-    @Override
-    public void getAllWorkerResource(RpcController controller, PrimitiveProtos.NullProto request,
-                                     RpcCallback<TajoMasterProtocol.WorkerResourcesRequest> done) {
-
-      TajoMasterProtocol.WorkerResourcesRequest.Builder builder =
-          TajoMasterProtocol.WorkerResourcesRequest.newBuilder();
-      Collection<Worker> workers = context.getResourceManager().getWorkers().values();
-
-      for(Worker worker: workers) {
-        WorkerResource resource = worker.getResource();
-
-        TajoMasterProtocol.WorkerResourceProto.Builder workerResource =
-            TajoMasterProtocol.WorkerResourceProto.newBuilder();
-
-        workerResource.setConnectionInfo(worker.getConnectionInfo().getProto());
-        workerResource.setMemoryMB(resource.getMemoryMB());
-        workerResource.setDiskSlots(resource.getDiskSlots());
-
-        builder.addWorkerResources(workerResource);
-      }
-      done.run(builder.build());
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/tajo/blob/807868bd/tajo-core/src/main/java/org/apache/tajo/master/exec/QueryExecutor.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/exec/QueryExecutor.java b/tajo-core/src/main/java/org/apache/tajo/master/exec/QueryExecutor.java
index 2fbebc1..0860d63 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/exec/QueryExecutor.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/exec/QueryExecutor.java
@@ -388,10 +388,10 @@ public class QueryExecutor {
     context.getSystemMetrics().counter("Query", "numDMLQuery").inc();
     hookManager.doHooks(queryContext, plan);
 
-    QueryJobManager queryJobManager = this.context.getQueryJobManager();
+    QueryManager queryManager = this.context.getQueryJobManager();
     QueryInfo queryInfo;
 
-    queryInfo = queryJobManager.createNewQueryJob(session, queryContext, sql, jsonExpr, rootNode);
+    queryInfo = queryManager.scheduleQuery(session, queryContext, sql, jsonExpr, rootNode);
 
     if(queryInfo == null) {
       responseBuilder.setQueryId(QueryIdFactory.NULL_QUERY_ID.getProto());

http://git-wip-us.apache.org/repos/asf/tajo/blob/807868bd/tajo-core/src/main/java/org/apache/tajo/master/rm/TajoResourceTracker.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/rm/TajoResourceTracker.java b/tajo-core/src/main/java/org/apache/tajo/master/rm/TajoResourceTracker.java
index 831ce43..519aa9d 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/rm/TajoResourceTracker.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/rm/TajoResourceTracker.java
@@ -26,7 +26,8 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.service.AbstractService;
 import org.apache.tajo.conf.TajoConf;
-import org.apache.tajo.ipc.TajoMasterProtocol;
+import org.apache.tajo.ipc.QueryCoordinatorProtocol.ClusterResourceSummary;
+import org.apache.tajo.ipc.QueryCoordinatorProtocol.TajoHeartbeatResponse;
 import org.apache.tajo.ipc.TajoResourceTrackerProtocol;
 import org.apache.tajo.master.cluster.WorkerConnectionInfo;
 import org.apache.tajo.rpc.AsyncRpcServer;
@@ -36,8 +37,6 @@ import org.apache.tajo.util.ProtoUtil;
 import java.io.IOError;
 import java.net.InetSocketAddress;
 
-import static org.apache.tajo.ipc.TajoMasterProtocol.TajoHeartbeatResponse;
-import static org.apache.tajo.ipc.TajoMasterProtocol.TajoHeartbeatResponse.Builder;
 import static org.apache.tajo.ipc.TajoResourceTrackerProtocol.NodeHeartbeat;
 import static org.apache.tajo.ipc.TajoResourceTrackerProtocol.TajoResourceTrackerProtocolService;
 
@@ -110,7 +109,8 @@ public class TajoResourceTracker extends AbstractService implements TajoResource
   }
 
   /** The response builder */
-  private static final Builder builder = TajoHeartbeatResponse.newBuilder().setHeartbeatResult(ProtoUtil.TRUE);
+  private static final TajoHeartbeatResponse.Builder builder =
+      TajoHeartbeatResponse.newBuilder().setHeartbeatResult(ProtoUtil.TRUE);
 
   private static WorkerStatusEvent createStatusEvent(int workerId, NodeHeartbeat heartbeat) {
     return new WorkerStatusEvent(
@@ -204,7 +204,7 @@ public class TajoResourceTracker extends AbstractService implements TajoResource
     return new Worker(rmContext, workerResource, new WorkerConnectionInfo(request.getConnectionInfo()));
   }
 
-  public TajoMasterProtocol.ClusterResourceSummary getClusterResourceSummary() {
+  public ClusterResourceSummary getClusterResourceSummary() {
     int totalDiskSlots = 0;
     int totalCpuCoreSlots = 0;
     int totalMemoryMB = 0;
@@ -230,7 +230,7 @@ public class TajoResourceTracker extends AbstractService implements TajoResource
       }
     }
 
-    return TajoMasterProtocol.ClusterResourceSummary.newBuilder()
+    return ClusterResourceSummary.newBuilder()
         .setNumWorkers(rmContext.getWorkers().size())
         .setTotalCpuCoreSlots(totalCpuCoreSlots)
         .setTotalDiskSlots(totalDiskSlots)

http://git-wip-us.apache.org/repos/asf/tajo/blob/807868bd/tajo-core/src/main/java/org/apache/tajo/master/rm/TajoWorkerResourceManager.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/rm/TajoWorkerResourceManager.java b/tajo-core/src/main/java/org/apache/tajo/master/rm/TajoWorkerResourceManager.java
index 9f2a3d5..e5cf66c 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/rm/TajoWorkerResourceManager.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/rm/TajoWorkerResourceManager.java
@@ -34,9 +34,9 @@ import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.tajo.QueryId;
 import org.apache.tajo.conf.TajoConf;
 import org.apache.tajo.ipc.ContainerProtocol;
-import org.apache.tajo.ipc.TajoMasterProtocol;
+import org.apache.tajo.ipc.QueryCoordinatorProtocol.*;
+import org.apache.tajo.master.QueryInProgress;
 import org.apache.tajo.master.TajoMaster;
-import org.apache.tajo.querymaster.QueryInProgress;
 import org.apache.tajo.rpc.CallFuture;
 import org.apache.tajo.util.ApplicationIdUtils;
 
@@ -49,8 +49,6 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 
-import static org.apache.tajo.ipc.TajoMasterProtocol.*;
-
 
 /**
  * It manages all resources of tajo workers.
@@ -162,7 +160,7 @@ public class TajoWorkerResourceManager extends CompositeService implements Worke
   }
 
   @Override
-  public TajoMasterProtocol.ClusterResourceSummary getClusterResourceSummary() {
+  public ClusterResourceSummary getClusterResourceSummary() {
     return resourceTracker.getClusterResourceSummary();
   }
 
@@ -204,7 +202,7 @@ public class TajoWorkerResourceManager extends CompositeService implements Worke
     builder.setMinMemoryMBPerContainer(queryMasterDefaultMemoryMB);
     builder.setMaxDiskSlotPerContainer(queryMasterDefaultDiskSlot);
     builder.setMinDiskSlotPerContainer(queryMasterDefaultDiskSlot);
-    builder.setResourceRequestPriority(TajoMasterProtocol.ResourceRequestPriority.MEMORY);
+    builder.setResourceRequestPriority(ResourceRequestPriority.MEMORY);
     builder.setNumContainers(1);
     return builder.build();
   }
@@ -358,10 +356,10 @@ public class TajoWorkerResourceManager extends CompositeService implements Worke
 
     int allocatedResources = 0;
 
-    TajoMasterProtocol.ResourceRequestPriority resourceRequestPriority
+    ResourceRequestPriority resourceRequestPriority
       = resourceRequest.request.getResourceRequestPriority();
 
-    if(resourceRequestPriority == TajoMasterProtocol.ResourceRequestPriority.MEMORY) {
+    if(resourceRequestPriority == ResourceRequestPriority.MEMORY) {
       synchronized(rmContext) {
         List<Integer> randomWorkers = new ArrayList<Integer>(rmContext.getWorkers().keySet());
         Collections.shuffle(randomWorkers);

http://git-wip-us.apache.org/repos/asf/tajo/blob/807868bd/tajo-core/src/main/java/org/apache/tajo/master/rm/WorkerResourceManager.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/rm/WorkerResourceManager.java b/tajo-core/src/main/java/org/apache/tajo/master/rm/WorkerResourceManager.java
index 79ec0ac..662b699 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/rm/WorkerResourceManager.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/rm/WorkerResourceManager.java
@@ -22,16 +22,15 @@ import com.google.protobuf.RpcCallback;
 import org.apache.hadoop.service.Service;
 import org.apache.tajo.QueryId;
 import org.apache.tajo.ipc.ContainerProtocol;
-import org.apache.tajo.ipc.TajoMasterProtocol;
-import org.apache.tajo.querymaster.QueryInProgress;
+import org.apache.tajo.ipc.QueryCoordinatorProtocol;
+import org.apache.tajo.ipc.QueryCoordinatorProtocol.ClusterResourceSummary;
+import org.apache.tajo.ipc.QueryCoordinatorProtocol.WorkerResourceAllocationRequest;
+import org.apache.tajo.master.QueryInProgress;
 
 import java.io.IOException;
 import java.util.Collection;
 import java.util.Map;
 
-import static org.apache.tajo.ipc.TajoMasterProtocol.WorkerAllocatedResource;
-import static org.apache.tajo.ipc.TajoMasterProtocol.WorkerResourceAllocationResponse;
-
 /**
  * An interface of WorkerResourceManager which allows TajoMaster to request allocation for containers
  * and release the allocated containers.
@@ -45,7 +44,7 @@ public interface WorkerResourceManager extends Service {
    * @return A allocated container resource
    */
   @Deprecated
-  public WorkerAllocatedResource allocateQueryMaster(QueryInProgress queryInProgress);
+  public QueryCoordinatorProtocol.WorkerAllocatedResource allocateQueryMaster(QueryInProgress queryInProgress);
 
   /**
    * Request one or more resource containers. You can set the number of containers and resource capabilities, such as
@@ -55,8 +54,8 @@ public interface WorkerResourceManager extends Service {
    * @param request Request description
    * @param rpcCallBack Callback function
    */
-  public void allocateWorkerResources(TajoMasterProtocol.WorkerResourceAllocationRequest request,
-      RpcCallback<WorkerResourceAllocationResponse> rpcCallBack);
+  public void allocateWorkerResources(WorkerResourceAllocationRequest request,
+      RpcCallback<QueryCoordinatorProtocol.WorkerResourceAllocationResponse> rpcCallBack);
 
   /**
    * Release a container
@@ -100,7 +99,7 @@ public interface WorkerResourceManager extends Service {
    *
    * @return The overall summary of cluster resources
    */
-  public TajoMasterProtocol.ClusterResourceSummary getClusterResourceSummary();
+  public ClusterResourceSummary getClusterResourceSummary();
 
   /**
    *

http://git-wip-us.apache.org/repos/asf/tajo/blob/807868bd/tajo-core/src/main/java/org/apache/tajo/master/scheduler/Scheduler.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/scheduler/Scheduler.java b/tajo-core/src/main/java/org/apache/tajo/master/scheduler/Scheduler.java
index 02203a9..19493d7 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/scheduler/Scheduler.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/scheduler/Scheduler.java
@@ -19,7 +19,7 @@
 package org.apache.tajo.master.scheduler;
 
 import org.apache.tajo.QueryId;
-import org.apache.tajo.querymaster.QueryInProgress;
+import org.apache.tajo.master.QueryInProgress;
 
 import java.util.List;
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/807868bd/tajo-core/src/main/java/org/apache/tajo/master/scheduler/SimpleFifoScheduler.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/scheduler/SimpleFifoScheduler.java b/tajo-core/src/main/java/org/apache/tajo/master/scheduler/SimpleFifoScheduler.java
index a091ed5..6cb98eb 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/scheduler/SimpleFifoScheduler.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/scheduler/SimpleFifoScheduler.java
@@ -21,8 +21,8 @@ package org.apache.tajo.master.scheduler;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.tajo.QueryId;
-import org.apache.tajo.querymaster.QueryInProgress;
-import org.apache.tajo.master.QueryJobManager;
+import org.apache.tajo.master.QueryInProgress;
+import org.apache.tajo.master.QueryManager;
 
 import java.util.*;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -32,10 +32,10 @@ public class SimpleFifoScheduler implements Scheduler {
   private LinkedList<QuerySchedulingInfo> pool = new LinkedList<QuerySchedulingInfo>();
   private final Thread queryProcessor;
   private AtomicBoolean stopped = new AtomicBoolean();
-  private QueryJobManager manager;
+  private QueryManager manager;
   private Comparator<QuerySchedulingInfo> COMPARATOR = new SchedulingAlgorithms.FifoComparator();
 
-  public SimpleFifoScheduler(QueryJobManager manager) {
+  public SimpleFifoScheduler(QueryManager manager) {
     this.manager = manager;
     this.queryProcessor = new Thread(new QueryProcessor());
     this.queryProcessor.setName("Query Processor");

http://git-wip-us.apache.org/repos/asf/tajo/blob/807868bd/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryInProgress.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryInProgress.java b/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryInProgress.java
deleted file mode 100644
index f83f244..0000000
--- a/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryInProgress.java
+++ /dev/null
@@ -1,230 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.querymaster;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.util.StringUtils;
-import org.apache.tajo.QueryId;
-import org.apache.tajo.TajoProtos;
-import org.apache.tajo.engine.query.QueryContext;
-import org.apache.tajo.ipc.QueryMasterProtocol;
-import org.apache.tajo.ipc.QueryMasterProtocol.QueryMasterProtocolService;
-import org.apache.tajo.ipc.TajoWorkerProtocol;
-import org.apache.tajo.ipc.TajoWorkerProtocol.QueryExecutionRequestProto;
-import org.apache.tajo.master.QueryInfo;
-import org.apache.tajo.master.TajoMaster;
-import org.apache.tajo.master.rm.WorkerResourceManager;
-import org.apache.tajo.plan.logical.LogicalRootNode;
-import org.apache.tajo.rpc.NettyClientBase;
-import org.apache.tajo.rpc.NullCallback;
-import org.apache.tajo.rpc.RpcConnectionPool;
-import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos;
-import org.apache.tajo.session.Session;
-import org.apache.tajo.util.NetUtils;
-
-import java.net.InetSocketAddress;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-import static org.apache.tajo.ipc.TajoMasterProtocol.WorkerAllocatedResource;
-
-public class QueryInProgress {
-  private static final Log LOG = LogFactory.getLog(QueryInProgress.class.getName());
-
-  private QueryId queryId;
-
-  private Session session;
-
-  private LogicalRootNode plan;
-
-  private AtomicBoolean querySubmitted = new AtomicBoolean(false);
-
-  private AtomicBoolean stopped = new AtomicBoolean(false);
-
-  private QueryInfo queryInfo;
-
-  private final TajoMaster.MasterContext masterContext;
-
-  private NettyClientBase queryMasterRpc;
-
-  private QueryMasterProtocolService queryMasterRpcClient;
-
-  public QueryInProgress(
-      TajoMaster.MasterContext masterContext,
-      Session session,
-      QueryContext queryContext,
-      QueryId queryId, String sql, String jsonExpr, LogicalRootNode plan) {
-
-    this.masterContext = masterContext;
-    this.session = session;
-    this.queryId = queryId;
-    this.plan = plan;
-
-    queryInfo = new QueryInfo(queryId, queryContext, sql, jsonExpr);
-    queryInfo.setStartTime(System.currentTimeMillis());
-  }
-
-  public synchronized void kill() {
-    getQueryInfo().setQueryState(TajoProtos.QueryState.QUERY_KILLED);
-    if(queryMasterRpcClient != null){
-      queryMasterRpcClient.killQuery(null, queryId.getProto(), NullCallback.get());
-    }
-  }
-
-  public void stopProgress() {
-    if(stopped.getAndSet(true)) {
-      return;
-    }
-
-    LOG.info("=========================================================");
-    LOG.info("Stop query:" + queryId);
-
-    masterContext.getResourceManager().releaseQueryMaster(queryId);
-
-    if(queryMasterRpc != null) {
-      RpcConnectionPool.getPool(masterContext.getConf()).closeConnection(queryMasterRpc);
-    }
-
-    masterContext.getHistoryWriter().appendHistory(queryInfo);
-  }
-
-  public boolean startQueryMaster() {
-    try {
-      LOG.info("Initializing QueryInProgress for QueryID=" + queryId);
-      WorkerResourceManager resourceManager = masterContext.getResourceManager();
-      WorkerAllocatedResource resource = resourceManager.allocateQueryMaster(this);
-
-      // if no resource to allocate a query master
-      if(resource == null) {
-        LOG.info("No Available Resources for QueryMaster");
-        return false;
-      }
-
-      queryInfo.setQueryMaster(resource.getConnectionInfo().getHost());
-      queryInfo.setQueryMasterPort(resource.getConnectionInfo().getQueryMasterPort());
-      queryInfo.setQueryMasterclientPort(resource.getConnectionInfo().getClientPort());
-      queryInfo.setQueryMasterInfoPort(resource.getConnectionInfo().getHttpInfoPort());
-
-      return true;
-    } catch (Exception e) {
-      catchException(e);
-      return false;
-    }
-  }
-
-  private void connectQueryMaster() throws Exception {
-    InetSocketAddress addr = NetUtils.createSocketAddr(queryInfo.getQueryMasterHost(), queryInfo.getQueryMasterPort());
-    LOG.info("Connect to QueryMaster:" + addr);
-    queryMasterRpc =
-        RpcConnectionPool.getPool(masterContext.getConf()).getConnection(addr, QueryMasterProtocol.class, true);
-    queryMasterRpcClient = queryMasterRpc.getStub();
-  }
-
-  public synchronized void submmitQueryToMaster() {
-    if(querySubmitted.get()) {
-      return;
-    }
-
-    try {
-      if(queryMasterRpcClient == null) {
-        connectQueryMaster();
-      }
-      if(queryMasterRpcClient == null) {
-        LOG.info("No QueryMaster conneciton info.");
-        //TODO wait
-        return;
-      }
-      LOG.info("Call executeQuery to :" +
-          queryInfo.getQueryMasterHost() + ":" + queryInfo.getQueryMasterPort() + "," + queryId);
-
-      QueryExecutionRequestProto.Builder builder = TajoWorkerProtocol.QueryExecutionRequestProto.newBuilder();
-      builder.setQueryId(queryId.getProto())
-          .setQueryContext(queryInfo.getQueryContext().getProto())
-          .setSession(session.getProto())
-          .setExprInJson(PrimitiveProtos.StringProto.newBuilder().setValue(queryInfo.getJsonExpr()))
-          .setLogicalPlanJson(PrimitiveProtos.StringProto.newBuilder().setValue(plan.toJson()).build());
-
-      queryMasterRpcClient.executeQuery(null, builder.build(), NullCallback.get());
-      querySubmitted.set(true);
-    } catch (Exception e) {
-      LOG.error(e.getMessage(), e);
-    }
-  }
-
-  public void catchException(Exception e) {
-    LOG.error(e.getMessage(), e);
-    queryInfo.setQueryState(TajoProtos.QueryState.QUERY_FAILED);
-    queryInfo.setLastMessage(StringUtils.stringifyException(e));
-  }
-
-  public QueryId getQueryId() {
-    return queryId;
-  }
-
-  public QueryInfo getQueryInfo() {
-    return this.queryInfo;
-  }
-
-  public boolean isStarted() {
-    return !stopped.get() && this.querySubmitted.get();
-  }
-
-  public void heartbeat(QueryInfo queryInfo) {
-    LOG.info("Received QueryMaster heartbeat:" + queryInfo);
-
-    // to avoid partial update by different heartbeats
-    synchronized (this.queryInfo) {
-
-      // terminal state will let client to retrieve a query result
-      // So, we must set the query result before changing query state
-      if (isFinishState(queryInfo.getQueryState())) {
-        if (queryInfo.hasResultdesc()) {
-          this.queryInfo.setResultDesc(queryInfo.getResultDesc());
-        }
-      }
-
-      this.queryInfo.setQueryState(queryInfo.getQueryState());
-      this.queryInfo.setProgress(queryInfo.getProgress());
-      this.queryInfo.setFinishTime(queryInfo.getFinishTime());
-
-      // Update diagnosis message
-      if (queryInfo.getLastMessage() != null && !queryInfo.getLastMessage().isEmpty()) {
-        this.queryInfo.setLastMessage(queryInfo.getLastMessage());
-        LOG.info(queryId + queryInfo.getLastMessage());
-      }
-
-      // if any error occurs, print outs the error message
-      if (this.queryInfo.getQueryState() == TajoProtos.QueryState.QUERY_FAILED) {
-        LOG.warn(queryId + " failed, " + queryInfo.getLastMessage());
-      }
-
-
-      if (isFinishState(this.queryInfo.getQueryState())) {
-        masterContext.getQueryJobManager().getEventHandler().handle(
-            new QueryJobEvent(QueryJobEvent.Type.QUERY_JOB_STOP, this.queryInfo));
-      }
-    }
-  }
-
-  private boolean isFinishState(TajoProtos.QueryState state) {
-    return state == TajoProtos.QueryState.QUERY_FAILED ||
-        state == TajoProtos.QueryState.QUERY_KILLED ||
-        state == TajoProtos.QueryState.QUERY_SUCCEEDED;
-  }
-}

http://git-wip-us.apache.org/repos/asf/tajo/blob/807868bd/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMaster.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMaster.java b/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMaster.java
index 02760a3..53390a1 100644
--- a/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMaster.java
+++ b/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMaster.java
@@ -34,7 +34,8 @@ import org.apache.tajo.conf.TajoConf;
 import org.apache.tajo.engine.planner.global.GlobalPlanner;
 import org.apache.tajo.engine.query.QueryContext;
 import org.apache.tajo.ha.HAServiceUtil;
-import org.apache.tajo.ipc.TajoMasterProtocol;
+import org.apache.tajo.ipc.QueryCoordinatorProtocol;
+import org.apache.tajo.ipc.QueryCoordinatorProtocol.*;
 import org.apache.tajo.ipc.TajoWorkerProtocol;
 import org.apache.tajo.master.event.QueryStartEvent;
 import org.apache.tajo.master.event.QueryStopEvent;
@@ -56,10 +57,6 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 
-import static org.apache.tajo.ipc.TajoMasterProtocol.TajoHeartbeat;
-import static org.apache.tajo.ipc.TajoMasterProtocol.TajoHeartbeatResponse;
-
-// TODO - when exception, send error status to QueryJobManager
 public class QueryMaster extends CompositeService implements EventHandler {
   private static final Log LOG = LogFactory.getLog(QueryMaster.class.getName());
 
@@ -182,12 +179,12 @@ public class QueryMaster extends CompositeService implements EventHandler {
     }
     LOG.info("cleanup executionBlocks: " + cleanupMessage);
     NettyClientBase rpc = null;
-    List<TajoMasterProtocol.WorkerResourceProto> workers = getAllWorker();
+    List<WorkerResourceProto> workers = getAllWorker();
     TajoWorkerProtocol.ExecutionBlockListProto.Builder builder = TajoWorkerProtocol.ExecutionBlockListProto.newBuilder();
     builder.addAllExecutionBlockId(Lists.newArrayList(executionBlockIds));
     TajoWorkerProtocol.ExecutionBlockListProto executionBlockListProto = builder.build();
 
-    for (TajoMasterProtocol.WorkerResourceProto worker : workers) {
+    for (WorkerResourceProto worker : workers) {
       try {
         TajoProtos.WorkerConnectionInfoProto connectionInfo = worker.getConnectionInfo();
         rpc = connPool.getConnection(NetUtils.createSocketAddr(connectionInfo.getHost(), connectionInfo.getPeerRpcPort()),
@@ -206,9 +203,9 @@ public class QueryMaster extends CompositeService implements EventHandler {
   private void cleanup(QueryId queryId) {
     LOG.info("cleanup query resources : " + queryId);
     NettyClientBase rpc = null;
-    List<TajoMasterProtocol.WorkerResourceProto> workers = getAllWorker();
+    List<WorkerResourceProto> workers = getAllWorker();
 
-    for (TajoMasterProtocol.WorkerResourceProto worker : workers) {
+    for (WorkerResourceProto worker : workers) {
       try {
         TajoProtos.WorkerConnectionInfoProto connectionInfo = worker.getConnectionInfo();
         rpc = connPool.getConnection(NetUtils.createSocketAddr(connectionInfo.getHost(), connectionInfo.getPeerRpcPort()),
@@ -224,7 +221,7 @@ public class QueryMaster extends CompositeService implements EventHandler {
     }
   }
 
-  public List<TajoMasterProtocol.WorkerResourceProto> getAllWorker() {
+  public List<WorkerResourceProto> getAllWorker() {
 
     NettyClientBase rpc = null;
     try {
@@ -235,78 +232,34 @@ public class QueryMaster extends CompositeService implements EventHandler {
       if (systemConf.getBoolVar(TajoConf.ConfVars.TAJO_MASTER_HA_ENABLE)) {
         try {
           rpc = connPool.getConnection(queryMasterContext.getWorkerContext().getTajoMasterAddress(),
-              TajoMasterProtocol.class, true);
+              QueryCoordinatorProtocol.class, true);
         } catch (Exception e) {
           queryMasterContext.getWorkerContext().setWorkerResourceTrackerAddr(
               HAServiceUtil.getResourceTrackerAddress(systemConf));
           queryMasterContext.getWorkerContext().setTajoMasterAddress(
               HAServiceUtil.getMasterUmbilicalAddress(systemConf));
           rpc = connPool.getConnection(queryMasterContext.getWorkerContext().getTajoMasterAddress(),
-              TajoMasterProtocol.class, true);
+              QueryCoordinatorProtocol.class, true);
         }
       } else {
         rpc = connPool.getConnection(queryMasterContext.getWorkerContext().getTajoMasterAddress(),
-            TajoMasterProtocol.class, true);
+            QueryCoordinatorProtocol.class, true);
       }
 
-      TajoMasterProtocol.TajoMasterProtocolService masterService = rpc.getStub();
+      QueryCoordinatorProtocolService masterService = rpc.getStub();
 
-      CallFuture<TajoMasterProtocol.WorkerResourcesRequest> callBack =
-          new CallFuture<TajoMasterProtocol.WorkerResourcesRequest>();
+      CallFuture<WorkerResourcesRequest> callBack = new CallFuture<WorkerResourcesRequest>();
       masterService.getAllWorkerResource(callBack.getController(),
           PrimitiveProtos.NullProto.getDefaultInstance(), callBack);
 
-      TajoMasterProtocol.WorkerResourcesRequest workerResourcesRequest = callBack.get(2, TimeUnit.SECONDS);
+      WorkerResourcesRequest workerResourcesRequest = callBack.get(2, TimeUnit.SECONDS);
       return workerResourcesRequest.getWorkerResourcesList();
     } catch (Exception e) {
       LOG.error(e.getMessage(), e);
     } finally {
       connPool.releaseConnection(rpc);
     }
-    return new ArrayList<TajoMasterProtocol.WorkerResourceProto>();
-  }
-
-  public void reportQueryStatusToQueryMaster(QueryId queryId, TajoProtos.QueryState state) {
-    LOG.info("Send QueryMaster Ready to QueryJobManager:" + queryId);
-    NettyClientBase tmClient = null;
-    try {
-      // In TajoMaster HA mode, if backup master be active status,
-      // worker may fail to connect existing active master. Thus,
-      // if worker can't connect the master, worker should try to connect another master and
-      // update master address in worker context.
-      if (systemConf.getBoolVar(TajoConf.ConfVars.TAJO_MASTER_HA_ENABLE)) {
-        try {
-          tmClient = connPool.getConnection(queryMasterContext.getWorkerContext().getTajoMasterAddress(),
-              TajoMasterProtocol.class, true);
-        } catch (Exception e) {
-          queryMasterContext.getWorkerContext().setWorkerResourceTrackerAddr(
-              HAServiceUtil.getResourceTrackerAddress(systemConf));
-          queryMasterContext.getWorkerContext().setTajoMasterAddress(
-              HAServiceUtil.getMasterUmbilicalAddress(systemConf));
-          tmClient = connPool.getConnection(queryMasterContext.getWorkerContext().getTajoMasterAddress(),
-              TajoMasterProtocol.class, true);
-        }
-      } else {
-        tmClient = connPool.getConnection(queryMasterContext.getWorkerContext().getTajoMasterAddress(),
-            TajoMasterProtocol.class, true);
-      }
-
-      TajoMasterProtocol.TajoMasterProtocolService masterClientService = tmClient.getStub();
-
-      TajoHeartbeat.Builder queryHeartbeatBuilder = TajoHeartbeat.newBuilder()
-          .setConnectionInfo(workerContext.getConnectionInfo().getProto())
-          .setState(state)
-          .setQueryId(queryId.getProto());
-
-      CallFuture<TajoHeartbeatResponse> callBack =
-          new CallFuture<TajoHeartbeatResponse>();
-
-      masterClientService.heartbeat(callBack.getController(), queryHeartbeatBuilder.build(), callBack);
-    } catch (Exception e) {
-      LOG.error(e.getMessage(), e);
-    } finally {
-      connPool.releaseConnection(tmClient);
-    }
+    return new ArrayList<WorkerResourceProto>();
   }
 
   @Override
@@ -407,19 +360,19 @@ public class QueryMaster extends CompositeService implements EventHandler {
         if (systemConf.getBoolVar(TajoConf.ConfVars.TAJO_MASTER_HA_ENABLE)) {
           try {
             tmClient = connPool.getConnection(queryMasterContext.getWorkerContext().getTajoMasterAddress(),
-                TajoMasterProtocol.class, true);
+                QueryCoordinatorProtocol.class, true);
           } catch (Exception e) {
             queryMasterContext.getWorkerContext().setWorkerResourceTrackerAddr(HAServiceUtil.getResourceTrackerAddress(systemConf));
             queryMasterContext.getWorkerContext().setTajoMasterAddress(HAServiceUtil.getMasterUmbilicalAddress(systemConf));
             tmClient = connPool.getConnection(queryMasterContext.getWorkerContext().getTajoMasterAddress(),
-                TajoMasterProtocol.class, true);
+                QueryCoordinatorProtocol.class, true);
           }
         } else {
           tmClient = connPool.getConnection(queryMasterContext.getWorkerContext().getTajoMasterAddress(),
-              TajoMasterProtocol.class, true);
+              QueryCoordinatorProtocol.class, true);
         }
 
-        TajoMasterProtocol.TajoMasterProtocolService masterClientService = tmClient.getStub();
+        QueryCoordinatorProtocolService masterClientService = tmClient.getStub();
         masterClientService.heartbeat(future.getController(), queryHeartbeat, future);
       }  catch (Exception e) {
         //this function will be closed in new thread.
@@ -524,24 +477,24 @@ public class QueryMaster extends CompositeService implements EventHandler {
               if (systemConf.getBoolVar(TajoConf.ConfVars.TAJO_MASTER_HA_ENABLE)) {
                 try {
                   tmClient = connPool.getConnection(queryMasterContext.getWorkerContext().getTajoMasterAddress(),
-                      TajoMasterProtocol.class, true);
+                      QueryCoordinatorProtocol.class, true);
                 } catch (Exception e) {
                   queryMasterContext.getWorkerContext().setWorkerResourceTrackerAddr(
                       HAServiceUtil.getResourceTrackerAddress(systemConf));
                   queryMasterContext.getWorkerContext().setTajoMasterAddress(
                       HAServiceUtil.getMasterUmbilicalAddress(systemConf));
                   tmClient = connPool.getConnection(queryMasterContext.getWorkerContext().getTajoMasterAddress(),
-                      TajoMasterProtocol.class, true);
+                      QueryCoordinatorProtocol.class, true);
                 }
               } else {
                 tmClient = connPool.getConnection(queryMasterContext.getWorkerContext().getTajoMasterAddress(),
-                    TajoMasterProtocol.class, true);
+                    QueryCoordinatorProtocol.class, true);
               }
 
-              TajoMasterProtocol.TajoMasterProtocolService masterClientService = tmClient.getStub();
+              QueryCoordinatorProtocolService masterClientService = tmClient.getStub();
 
-              CallFuture<TajoHeartbeatResponse> callBack =
-                  new CallFuture<TajoHeartbeatResponse>();
+              CallFuture<QueryCoordinatorProtocol.TajoHeartbeatResponse> callBack =
+                  new CallFuture<QueryCoordinatorProtocol.TajoHeartbeatResponse>();
 
               TajoHeartbeat queryHeartbeat = buildTajoHeartBeat(eachTask);
               masterClientService.heartbeat(callBack.getController(), queryHeartbeat, callBack);