You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tajo.apache.org by hy...@apache.org on 2015/01/09 12:46:50 UTC

[1/2] tajo git commit: TAJO-1291: Rename TajoMasterProtocol to QueryCoordinatorProtocol.

Repository: tajo
Updated Branches:
  refs/heads/master 50a8a663c -> 807868bd4


http://git-wip-us.apache.org/repos/asf/tajo/blob/807868bd/tajo-core/src/main/java/org/apache/tajo/querymaster/Stage.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/querymaster/Stage.java b/tajo-core/src/main/java/org/apache/tajo/querymaster/Stage.java
index 1ea7051..13394f8 100644
--- a/tajo-core/src/main/java/org/apache/tajo/querymaster/Stage.java
+++ b/tajo-core/src/main/java/org/apache/tajo/querymaster/Stage.java
@@ -45,7 +45,6 @@ import org.apache.tajo.engine.planner.enforce.Enforcer;
 import org.apache.tajo.engine.planner.global.DataChannel;
 import org.apache.tajo.engine.planner.global.ExecutionBlock;
 import org.apache.tajo.engine.planner.global.MasterPlan;
-import org.apache.tajo.ipc.TajoMasterProtocol;
 import org.apache.tajo.ipc.TajoWorkerProtocol;
 import org.apache.tajo.ipc.TajoWorkerProtocol.DistinctGroupbyEnforcer.MultipleAggregationStage;
 import org.apache.tajo.ipc.TajoWorkerProtocol.EnforceProperty;
@@ -838,22 +837,6 @@ public class Stage implements EventHandler<StageEvent> {
     }
 
     /**
-     * Getting the total memory of cluster
-     *
-     * @param stage
-     * @return mega bytes
-     */
-    private static int getClusterTotalMemory(Stage stage) {
-      List<TajoMasterProtocol.WorkerResourceProto> workers =
-          stage.context.getQueryMasterContext().getQueryMaster().getAllWorker();
-
-      int totalMem = 0;
-      for (TajoMasterProtocol.WorkerResourceProto worker : workers) {
-        totalMem += worker.getMemoryMB();
-      }
-      return totalMem;
-    }
-    /**
      * Getting the desire number of partitions according to the volume of input data.
      * This method is only used to determine the partition key number of hash join or aggregation.
      *

http://git-wip-us.apache.org/repos/asf/tajo/blob/807868bd/tajo-core/src/main/java/org/apache/tajo/util/JSPUtil.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/util/JSPUtil.java b/tajo-core/src/main/java/org/apache/tajo/util/JSPUtil.java
index d711258..82fb37f 100644
--- a/tajo-core/src/main/java/org/apache/tajo/util/JSPUtil.java
+++ b/tajo-core/src/main/java/org/apache/tajo/util/JSPUtil.java
@@ -25,7 +25,7 @@ import org.apache.tajo.catalog.statistics.TableStats;
 import org.apache.tajo.conf.TajoConf;
 import org.apache.tajo.master.TajoMaster.MasterContext;
 import org.apache.tajo.ha.HAService;
-import org.apache.tajo.querymaster.QueryInProgress;
+import org.apache.tajo.master.QueryInProgress;
 import org.apache.tajo.querymaster.QueryMasterTask;
 import org.apache.tajo.querymaster.Task;
 import org.apache.tajo.querymaster.Stage;

http://git-wip-us.apache.org/repos/asf/tajo/blob/807868bd/tajo-core/src/main/java/org/apache/tajo/worker/TajoResourceAllocator.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/TajoResourceAllocator.java b/tajo-core/src/main/java/org/apache/tajo/worker/TajoResourceAllocator.java
index 8241478..04b65d2 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/TajoResourceAllocator.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/TajoResourceAllocator.java
@@ -28,8 +28,10 @@ import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationAttemptIdPBImpl;
 import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.tajo.ExecutionBlockId;
 import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.ha.HAServiceUtil;
 import org.apache.tajo.ipc.ContainerProtocol;
-import org.apache.tajo.ipc.TajoMasterProtocol;
+import org.apache.tajo.ipc.QueryCoordinatorProtocol;
+import org.apache.tajo.ipc.QueryCoordinatorProtocol.*;
 import org.apache.tajo.ipc.TajoWorkerProtocol;
 import org.apache.tajo.master.*;
 import org.apache.tajo.master.cluster.WorkerConnectionInfo;
@@ -38,16 +40,18 @@ import org.apache.tajo.master.container.TajoContainerId;
 import org.apache.tajo.master.event.ContainerAllocationEvent;
 import org.apache.tajo.master.event.ContainerAllocatorEventType;
 import org.apache.tajo.master.event.StageContainerAllocationEvent;
+import org.apache.tajo.master.rm.TajoWorkerContainer;
+import org.apache.tajo.master.rm.TajoWorkerContainerId;
+import org.apache.tajo.master.rm.Worker;
+import org.apache.tajo.master.rm.WorkerResource;
 import org.apache.tajo.querymaster.QueryMasterTask;
 import org.apache.tajo.querymaster.Stage;
 import org.apache.tajo.querymaster.StageState;
-import org.apache.tajo.master.rm.*;
 import org.apache.tajo.rpc.CallFuture;
 import org.apache.tajo.rpc.NettyClientBase;
 import org.apache.tajo.rpc.NullCallback;
 import org.apache.tajo.rpc.RpcConnectionPool;
 import org.apache.tajo.util.ApplicationIdUtils;
-import org.apache.tajo.ha.HAServiceUtil;
 
 import java.net.InetSocketAddress;
 import java.util.*;
@@ -91,7 +95,7 @@ public class TajoResourceAllocator extends AbstractResourceAllocator {
                                            int memoryMBPerTask) {
     //TODO consider disk slot
 
-    TajoMasterProtocol.ClusterResourceSummary clusterResource = workerContext.getClusterResource();
+    ClusterResourceSummary clusterResource = workerContext.getClusterResource();
     int clusterSlots = clusterResource == null ? 0 : clusterResource.getTotalMemoryMB() / memoryMBPerTask;
     clusterSlots =  Math.max(1, clusterSlots - 1); // reserve query master slot
     LOG.info("CalculateNumberRequestContainer - Number of Tasks=" + numTasks +
@@ -249,20 +253,19 @@ public class TajoResourceAllocator extends AbstractResourceAllocator {
     @Override
     public void run() {
       LOG.info("Start TajoWorkerAllocationThread");
-      CallFuture<TajoMasterProtocol.WorkerResourceAllocationResponse> callBack =
-        new CallFuture<TajoMasterProtocol.WorkerResourceAllocationResponse>();
+      CallFuture<WorkerResourceAllocationResponse> callBack =
+        new CallFuture<WorkerResourceAllocationResponse>();
 
       //TODO consider task's resource usage pattern
       int requiredMemoryMB = tajoConf.getIntVar(TajoConf.ConfVars.TASK_DEFAULT_MEMORY);
       float requiredDiskSlots = tajoConf.getFloatVar(TajoConf.ConfVars.TASK_DEFAULT_DISK);
 
-      TajoMasterProtocol.WorkerResourceAllocationRequest request =
-        TajoMasterProtocol.WorkerResourceAllocationRequest.newBuilder()
+      WorkerResourceAllocationRequest request = WorkerResourceAllocationRequest.newBuilder()
           .setMinMemoryMBPerContainer(requiredMemoryMB)
           .setMaxMemoryMBPerContainer(requiredMemoryMB)
           .setNumContainers(event.getRequiredNum())
-          .setResourceRequestPriority(!event.isLeafQuery() ? TajoMasterProtocol.ResourceRequestPriority.MEMORY
-            : TajoMasterProtocol.ResourceRequestPriority.DISK)
+          .setResourceRequestPriority(!event.isLeafQuery() ?
+              ResourceRequestPriority.MEMORY : ResourceRequestPriority.DISK)
           .setMinDiskSlotPerContainer(requiredDiskSlots)
           .setMaxDiskSlotPerContainer(requiredDiskSlots)
           .setQueryId(event.getExecutionBlockId().getQueryId().getProto())
@@ -280,7 +283,7 @@ public class TajoResourceAllocator extends AbstractResourceAllocator {
           try {
             tmClient = connPool.getConnection(
               queryTaskContext.getQueryMasterContext().getWorkerContext().getTajoMasterAddress(),
-              TajoMasterProtocol.class, true);
+              QueryCoordinatorProtocol.class, true);
           } catch (Exception e) {
             queryTaskContext.getQueryMasterContext().getWorkerContext().
               setWorkerResourceTrackerAddr(HAServiceUtil.getResourceTrackerAddress(tajoConf));
@@ -288,15 +291,15 @@ public class TajoResourceAllocator extends AbstractResourceAllocator {
               setTajoMasterAddress(HAServiceUtil.getMasterUmbilicalAddress(tajoConf));
             tmClient = connPool.getConnection(
               queryTaskContext.getQueryMasterContext().getWorkerContext().getTajoMasterAddress(),
-              TajoMasterProtocol.class, true);
+              QueryCoordinatorProtocol.class, true);
           }
         } else {
           tmClient = connPool.getConnection(
             queryTaskContext.getQueryMasterContext().getWorkerContext().getTajoMasterAddress(),
-            TajoMasterProtocol.class, true);
+            QueryCoordinatorProtocol.class, true);
         }
 
-        TajoMasterProtocol.TajoMasterProtocolService masterClientService = tmClient.getStub();
+        QueryCoordinatorProtocolService masterClientService = tmClient.getStub();
         masterClientService.allocateWorkerResources(null, request, callBack);
       } catch (Exception e) {
         LOG.error(e.getMessage(), e);
@@ -304,7 +307,7 @@ public class TajoResourceAllocator extends AbstractResourceAllocator {
         connPool.releaseConnection(tmClient);
       }
 
-      TajoMasterProtocol.WorkerResourceAllocationResponse response = null;
+      WorkerResourceAllocationResponse response = null;
       while(!stopped.get()) {
         try {
           response = callBack.get(3, TimeUnit.SECONDS);
@@ -321,11 +324,11 @@ public class TajoResourceAllocator extends AbstractResourceAllocator {
       int numAllocatedContainers = 0;
 
       if(response != null) {
-        List<TajoMasterProtocol.WorkerAllocatedResource> allocatedResources = response.getWorkerAllocatedResourceList();
+        List<WorkerAllocatedResource> allocatedResources = response.getWorkerAllocatedResourceList();
         ExecutionBlockId executionBlockId = event.getExecutionBlockId();
 
         List<TajoContainer> containers = new ArrayList<TajoContainer>();
-        for(TajoMasterProtocol.WorkerAllocatedResource eachAllocatedResource: allocatedResources) {
+        for(WorkerAllocatedResource eachAllocatedResource: allocatedResources) {
           TajoWorkerContainer container = new TajoWorkerContainer();
           NodeId nodeId = NodeId.newInstance(eachAllocatedResource.getConnectionInfo().getHost(),
             eachAllocatedResource.getConnectionInfo().getPeerRpcPort());

http://git-wip-us.apache.org/repos/asf/tajo/blob/807868bd/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorker.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorker.java b/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorker.java
index 09a87e0..4003014 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorker.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorker.java
@@ -20,7 +20,6 @@ package org.apache.tajo.worker;
 
 import com.codahale.metrics.Gauge;
 import com.google.common.annotations.VisibleForTesting;
-
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -36,13 +35,13 @@ import org.apache.tajo.catalog.CatalogClient;
 import org.apache.tajo.catalog.CatalogService;
 import org.apache.tajo.conf.TajoConf;
 import org.apache.tajo.ha.HAServiceUtil;
-import org.apache.tajo.ipc.TajoMasterProtocol;
-import org.apache.tajo.master.cluster.WorkerConnectionInfo;
 import org.apache.tajo.ha.TajoMasterInfo;
-import org.apache.tajo.querymaster.QueryMaster;
-import org.apache.tajo.querymaster.QueryMasterManagerService;
+import org.apache.tajo.ipc.QueryCoordinatorProtocol.ClusterResourceSummary;
+import org.apache.tajo.master.cluster.WorkerConnectionInfo;
 import org.apache.tajo.master.rm.TajoWorkerResourceManager;
 import org.apache.tajo.pullserver.TajoPullServerService;
+import org.apache.tajo.querymaster.QueryMaster;
+import org.apache.tajo.querymaster.QueryMasterManagerService;
 import org.apache.tajo.rpc.RpcChannelFactory;
 import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos;
 import org.apache.tajo.rule.EvaluationContext;
@@ -50,7 +49,10 @@ import org.apache.tajo.rule.EvaluationFailedException;
 import org.apache.tajo.rule.SelfDiagnosisRuleEngine;
 import org.apache.tajo.rule.SelfDiagnosisRuleSession;
 import org.apache.tajo.storage.HashShuffleAppenderManager;
-import org.apache.tajo.util.*;
+import org.apache.tajo.util.CommonTestingUtil;
+import org.apache.tajo.util.JvmPauseMonitor;
+import org.apache.tajo.util.NetUtils;
+import org.apache.tajo.util.StringUtils;
 import org.apache.tajo.util.history.HistoryReader;
 import org.apache.tajo.util.history.HistoryWriter;
 import org.apache.tajo.util.metrics.TajoSystemMetrics;
@@ -115,7 +117,7 @@ public class TajoWorker extends CompositeService {
 
   private AtomicInteger numClusterNodes = new AtomicInteger();
 
-  private TajoMasterProtocol.ClusterResourceSummary clusterResource;
+  private ClusterResourceSummary clusterResource;
 
   private WorkerConnectionInfo connectionInfo;
 
@@ -516,13 +518,13 @@ public class TajoWorker extends CompositeService {
       return TajoWorker.this.numClusterNodes.get();
     }
 
-    public void setClusterResource(TajoMasterProtocol.ClusterResourceSummary clusterResource) {
+    public void setClusterResource(ClusterResourceSummary clusterResource) {
       synchronized (numClusterNodes) {
         TajoWorker.this.clusterResource = clusterResource;
       }
     }
 
-    public TajoMasterProtocol.ClusterResourceSummary getClusterResource() {
+    public ClusterResourceSummary getClusterResource() {
       synchronized (numClusterNodes) {
         return TajoWorker.this.clusterResource;
       }

http://git-wip-us.apache.org/repos/asf/tajo/blob/807868bd/tajo-core/src/main/java/org/apache/tajo/worker/WorkerHeartbeatService.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/WorkerHeartbeatService.java b/tajo-core/src/main/java/org/apache/tajo/worker/WorkerHeartbeatService.java
index c809921..b92c4cd 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/WorkerHeartbeatService.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/WorkerHeartbeatService.java
@@ -26,7 +26,10 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.service.AbstractService;
 import org.apache.tajo.conf.TajoConf;
-import org.apache.tajo.ipc.TajoMasterProtocol;
+import org.apache.tajo.ha.HAServiceUtil;
+import org.apache.tajo.ipc.QueryCoordinatorProtocol.ClusterResourceSummary;
+import org.apache.tajo.ipc.QueryCoordinatorProtocol.ServerStatusProto;
+import org.apache.tajo.ipc.QueryCoordinatorProtocol.TajoHeartbeatResponse;
 import org.apache.tajo.ipc.TajoResourceTrackerProtocol;
 import org.apache.tajo.rpc.CallFuture;
 import org.apache.tajo.rpc.NettyClientBase;
@@ -35,7 +38,6 @@ import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos;
 import org.apache.tajo.storage.DiskDeviceInfo;
 import org.apache.tajo.storage.DiskMountInfo;
 import org.apache.tajo.storage.DiskUtil;
-import org.apache.tajo.ha.HAServiceUtil;
 
 import java.io.File;
 import java.util.List;
@@ -98,8 +100,8 @@ public class WorkerHeartbeatService extends AbstractService {
 
   class WorkerHeartbeatThread extends Thread {
     private volatile AtomicBoolean stopped = new AtomicBoolean(false);
-    TajoMasterProtocol.ServerStatusProto.System systemInfo;
-    List<TajoMasterProtocol.ServerStatusProto.Disk> diskInfos = Lists.newArrayList();
+    ServerStatusProto.System systemInfo;
+    List<ServerStatusProto.Disk> diskInfos = Lists.newArrayList();
     float workerDiskSlots;
     int workerMemoryMB;
     List<DiskDeviceInfo> diskDeviceInfos;
@@ -137,7 +139,7 @@ public class WorkerHeartbeatService extends AbstractService {
         }
       }
 
-      systemInfo = TajoMasterProtocol.ServerStatusProto.System.newBuilder()
+      systemInfo = ServerStatusProto.System.newBuilder()
           .setAvailableProcessors(workerCpuCoreNum)
           .setFreeMemoryMB(0)
           .setMaxMemoryMB(0)
@@ -153,14 +155,14 @@ public class WorkerHeartbeatService extends AbstractService {
         if(sendDiskInfoCount == 0 && diskDeviceInfos != null) {
           getDiskUsageInfos();
         }
-        TajoMasterProtocol.ServerStatusProto.JvmHeap jvmHeap =
-            TajoMasterProtocol.ServerStatusProto.JvmHeap.newBuilder()
+        ServerStatusProto.JvmHeap jvmHeap =
+            ServerStatusProto.JvmHeap.newBuilder()
                 .setMaxHeap(Runtime.getRuntime().maxMemory())
                 .setFreeHeap(Runtime.getRuntime().freeMemory())
                 .setTotalHeap(Runtime.getRuntime().totalMemory())
                 .build();
 
-        TajoMasterProtocol.ServerStatusProto serverStatus = TajoMasterProtocol.ServerStatusProto.newBuilder()
+        ServerStatusProto serverStatus = ServerStatusProto.newBuilder()
             .addAllDisk(diskInfos)
             .setRunningTaskNum(
                 context.getTaskRunnerManager() == null ? 1 : context.getTaskRunnerManager().getNumTasks())
@@ -179,8 +181,7 @@ public class WorkerHeartbeatService extends AbstractService {
 
         NettyClientBase rmClient = null;
         try {
-          CallFuture<TajoMasterProtocol.TajoHeartbeatResponse> callBack =
-              new CallFuture<TajoMasterProtocol.TajoHeartbeatResponse>();
+          CallFuture<TajoHeartbeatResponse> callBack = new CallFuture<TajoHeartbeatResponse>();
 
           // In TajoMaster HA mode, if backup master be active status,
           // worker may fail to connect existing active master. Thus,
@@ -201,9 +202,9 @@ public class WorkerHeartbeatService extends AbstractService {
           TajoResourceTrackerProtocol.TajoResourceTrackerProtocolService resourceTracker = rmClient.getStub();
           resourceTracker.heartbeat(callBack.getController(), heartbeatProto, callBack);
 
-          TajoMasterProtocol.TajoHeartbeatResponse response = callBack.get(2, TimeUnit.SECONDS);
+          TajoHeartbeatResponse response = callBack.get(2, TimeUnit.SECONDS);
           if(response != null) {
-            TajoMasterProtocol.ClusterResourceSummary clusterResourceSummary = response.getClusterResourceSummary();
+            ClusterResourceSummary clusterResourceSummary = response.getClusterResourceSummary();
             if(clusterResourceSummary.getNumWorkers() > 0) {
               context.setNumClusterNodes(clusterResourceSummary.getNumWorkers());
             }
@@ -249,7 +250,7 @@ public class WorkerHeartbeatService extends AbstractService {
         if(mountInfos != null) {
           for(DiskMountInfo eachMount: mountInfos) {
             File eachFile = new File(eachMount.getMountPath());
-            diskInfos.add(TajoMasterProtocol.ServerStatusProto.Disk.newBuilder()
+            diskInfos.add(ServerStatusProto.Disk.newBuilder()
                 .setAbsolutePath(eachFile.getAbsolutePath())
                 .setTotalSpace(eachFile.getTotalSpace())
                 .setFreeSpace(eachFile.getFreeSpace())

http://git-wip-us.apache.org/repos/asf/tajo/blob/807868bd/tajo-core/src/main/java/org/apache/tajo/worker/rule/ConnectivityCheckerRuleForTajoWorker.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/rule/ConnectivityCheckerRuleForTajoWorker.java b/tajo-core/src/main/java/org/apache/tajo/worker/rule/ConnectivityCheckerRuleForTajoWorker.java
index 6eb710a..68890e3 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/rule/ConnectivityCheckerRuleForTajoWorker.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/rule/ConnectivityCheckerRuleForTajoWorker.java
@@ -18,23 +18,19 @@
 
 package org.apache.tajo.worker.rule;
 
-import java.net.InetSocketAddress;
-
 import org.apache.tajo.conf.TajoConf;
 import org.apache.tajo.conf.TajoConf.ConfVars;
 import org.apache.tajo.ha.HAServiceUtil;
-import org.apache.tajo.ipc.TajoMasterProtocol;
+import org.apache.tajo.ipc.QueryCoordinatorProtocol;
 import org.apache.tajo.rpc.NettyClientBase;
 import org.apache.tajo.rpc.RpcConnectionPool;
-import org.apache.tajo.rule.EvaluationContext;
-import org.apache.tajo.rule.EvaluationResult;
-import org.apache.tajo.rule.SelfDiagnosisRuleDefinition;
-import org.apache.tajo.rule.SelfDiagnosisRuleVisibility;
+import org.apache.tajo.rule.*;
 import org.apache.tajo.rule.EvaluationResult.EvaluationResultCode;
-import org.apache.tajo.rule.SelfDiagnosisRule;
 import org.apache.tajo.util.NetUtils;
 import org.apache.tajo.worker.TajoWorker;
 
+import java.net.InetSocketAddress;
+
 /**
  * With this rule, Tajo worker will check the connectivity to tajo master server.
  */
@@ -54,7 +50,7 @@ public class ConnectivityCheckerRuleForTajoWorker implements SelfDiagnosisRule {
       } else {
         masterAddress = NetUtils.createSocketAddr(tajoConf.getVar(ConfVars.TAJO_MASTER_UMBILICAL_RPC_ADDRESS));
       }
-      masterClient = pool.getConnection(masterAddress, TajoMasterProtocol.class, true);
+      masterClient = pool.getConnection(masterAddress, QueryCoordinatorProtocol.class, true);
       
       masterClient.getStub();
     } finally {

http://git-wip-us.apache.org/repos/asf/tajo/blob/807868bd/tajo-core/src/main/proto/QueryCoordinatorProtocol.proto
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/proto/QueryCoordinatorProtocol.proto b/tajo-core/src/main/proto/QueryCoordinatorProtocol.proto
new file mode 100644
index 0000000..41a382f
--- /dev/null
+++ b/tajo-core/src/main/proto/QueryCoordinatorProtocol.proto
@@ -0,0 +1,147 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+//TajoWorker -> TajoMaster protocol
+
+option java_package = "org.apache.tajo.ipc";
+option java_outer_classname = "QueryCoordinatorProtocol";
+option java_generic_services = true;
+option java_generate_equals_and_hash = true;
+
+import "yarn_protos.proto";
+import "tajo_protos.proto";
+import "TajoIdProtos.proto";
+import "CatalogProtos.proto";
+import "PrimitiveProtos.proto";
+import "ContainerProtocol.proto";
+
+package hadoop.yarn;
+
+message ServerStatusProto {
+    message System {
+        required int32 availableProcessors = 1;
+        required int32 freeMemoryMB = 2;
+        required int32 maxMemoryMB = 3;
+        required int32 totalMemoryMB = 4;
+    }
+    message Disk {
+        required string absolutePath = 1;
+        required int64 totalSpace = 2;
+        required int64 freeSpace = 3;
+        required int64 usableSpace = 4;
+    }
+
+    message JvmHeap {
+        required int64 maxHeap = 1;
+        required int64 totalHeap = 2;
+        required int64 freeHeap = 3;
+    }
+
+    required System system = 1;
+    required float diskSlots = 2;
+    required int32 memoryResourceMB = 3;
+    repeated Disk disk = 4;
+    required int32 runningTaskNum = 5;
+    required JvmHeap jvmHeap = 6;
+    required BoolProto queryMasterMode = 7;
+    required BoolProto taskRunnerMode = 8;
+}
+
+message TajoHeartbeat {
+  required WorkerConnectionInfoProto connectionInfo = 1;
+  optional QueryIdProto queryId = 2;
+  optional QueryState state = 3;
+  optional TableDescProto resultDesc = 4;
+  optional string statusMessage = 5;
+  optional float queryProgress = 6;
+  optional int64 queryFinishTime = 7;
+}
+
+message TajoHeartbeatResponse {
+  message ResponseCommand {
+      required string command = 1;
+      repeated string params = 2;
+  }
+  required BoolProto heartbeatResult = 1;
+  required ClusterResourceSummary clusterResourceSummary = 2;
+  optional ResponseCommand responseCommand = 3;
+}
+
+message ClusterResourceSummary {
+  required int32 numWorkers = 1;
+  required int32 totalDiskSlots = 2;
+  required int32 totalCpuCoreSlots = 3;
+  required int32 totalMemoryMB = 4;
+
+  required int32 totalAvailableDiskSlots = 5;
+  required int32 totalAvailableCpuCoreSlots = 6;
+  required int32 totalAvailableMemoryMB = 7;
+}
+
+enum ResourceRequestPriority {
+    MEMORY = 1;
+    DISK = 2;
+}
+
+message WorkerResourceAllocationRequest {
+    required QueryIdProto queryId = 1;
+    required ResourceRequestPriority resourceRequestPriority = 2;
+
+    required int32 numContainers = 3;
+
+    required int32 maxMemoryMBPerContainer = 4;
+    required int32 minMemoryMBPerContainer = 5;
+
+    required float maxDiskSlotPerContainer = 6;
+    required float minDiskSlotPerContainer = 7;
+}
+
+message WorkerResourceProto {
+    required WorkerConnectionInfoProto connectionInfo = 1;
+    required int32 memoryMB = 2 ;
+    required float diskSlots = 3;
+}
+
+message WorkerResourcesRequest {
+    repeated WorkerResourceProto workerResources = 1;
+}
+
+message WorkerResourceReleaseRequest {
+    required ExecutionBlockIdProto executionBlockId = 1;
+    repeated TajoContainerIdProto containerIds = 2;
+}
+
+message WorkerAllocatedResource {
+    required TajoContainerIdProto containerId = 1;
+    required WorkerConnectionInfoProto connectionInfo = 2;
+
+    required int32 allocatedMemoryMB = 3;
+    required float allocatedDiskSlots = 4;
+}
+
+message WorkerResourceAllocationResponse {
+    required QueryIdProto queryId = 1;
+    repeated WorkerAllocatedResource workerAllocatedResource = 2;
+}
+
+service QueryCoordinatorProtocolService {
+  rpc heartbeat(TajoHeartbeat) returns (TajoHeartbeatResponse);
+  rpc allocateWorkerResources(WorkerResourceAllocationRequest) returns (WorkerResourceAllocationResponse);
+  rpc releaseWorkerResource(WorkerResourceReleaseRequest) returns (BoolProto);
+  rpc getAllWorkerResource(NullProto) returns (WorkerResourcesRequest);
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tajo/blob/807868bd/tajo-core/src/main/proto/ResourceTrackerProtocol.proto
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/proto/ResourceTrackerProtocol.proto b/tajo-core/src/main/proto/ResourceTrackerProtocol.proto
index b2db46a..40aeab7 100644
--- a/tajo-core/src/main/proto/ResourceTrackerProtocol.proto
+++ b/tajo-core/src/main/proto/ResourceTrackerProtocol.proto
@@ -22,7 +22,7 @@ option java_outer_classname = "TajoResourceTrackerProtocol";
 option java_generic_services = true;
 option java_generate_equals_and_hash = true;
 
-import "TajoMasterProtocol.proto";
+import "QueryCoordinatorProtocol.proto";
 import "ContainerProtocol.proto";
 import "tajo_protos.proto";
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/807868bd/tajo-core/src/main/proto/TajoMasterProtocol.proto
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/proto/TajoMasterProtocol.proto b/tajo-core/src/main/proto/TajoMasterProtocol.proto
deleted file mode 100644
index bc73596..0000000
--- a/tajo-core/src/main/proto/TajoMasterProtocol.proto
+++ /dev/null
@@ -1,147 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-//TajoWorker -> TajoMaster protocol
-
-option java_package = "org.apache.tajo.ipc";
-option java_outer_classname = "TajoMasterProtocol";
-option java_generic_services = true;
-option java_generate_equals_and_hash = true;
-
-import "yarn_protos.proto";
-import "tajo_protos.proto";
-import "TajoIdProtos.proto";
-import "CatalogProtos.proto";
-import "PrimitiveProtos.proto";
-import "ContainerProtocol.proto";
-
-package hadoop.yarn;
-
-message ServerStatusProto {
-    message System {
-        required int32 availableProcessors = 1;
-        required int32 freeMemoryMB = 2;
-        required int32 maxMemoryMB = 3;
-        required int32 totalMemoryMB = 4;
-    }
-    message Disk {
-        required string absolutePath = 1;
-        required int64 totalSpace = 2;
-        required int64 freeSpace = 3;
-        required int64 usableSpace = 4;
-    }
-
-    message JvmHeap {
-        required int64 maxHeap = 1;
-        required int64 totalHeap = 2;
-        required int64 freeHeap = 3;
-    }
-
-    required System system = 1;
-    required float diskSlots = 2;
-    required int32 memoryResourceMB = 3;
-    repeated Disk disk = 4;
-    required int32 runningTaskNum = 5;
-    required JvmHeap jvmHeap = 6;
-    required BoolProto queryMasterMode = 7;
-    required BoolProto taskRunnerMode = 8;
-}
-
-message TajoHeartbeat {
-  required WorkerConnectionInfoProto connectionInfo = 1;
-  optional QueryIdProto queryId = 2;
-  optional QueryState state = 3;
-  optional TableDescProto resultDesc = 4;
-  optional string statusMessage = 5;
-  optional float queryProgress = 6;
-  optional int64 queryFinishTime = 7;
-}
-
-message TajoHeartbeatResponse {
-  message ResponseCommand {
-      required string command = 1;
-      repeated string params = 2;
-  }
-  required BoolProto heartbeatResult = 1;
-  required ClusterResourceSummary clusterResourceSummary = 2;
-  optional ResponseCommand responseCommand = 3;
-}
-
-message ClusterResourceSummary {
-  required int32 numWorkers = 1;
-  required int32 totalDiskSlots = 2;
-  required int32 totalCpuCoreSlots = 3;
-  required int32 totalMemoryMB = 4;
-
-  required int32 totalAvailableDiskSlots = 5;
-  required int32 totalAvailableCpuCoreSlots = 6;
-  required int32 totalAvailableMemoryMB = 7;
-}
-
-enum ResourceRequestPriority {
-    MEMORY = 1;
-    DISK = 2;
-}
-
-message WorkerResourceAllocationRequest {
-    required QueryIdProto queryId = 1;
-    required ResourceRequestPriority resourceRequestPriority = 2;
-
-    required int32 numContainers = 3;
-
-    required int32 maxMemoryMBPerContainer = 4;
-    required int32 minMemoryMBPerContainer = 5;
-
-    required float maxDiskSlotPerContainer = 6;
-    required float minDiskSlotPerContainer = 7;
-}
-
-message WorkerResourceProto {
-    required WorkerConnectionInfoProto connectionInfo = 1;
-    required int32 memoryMB = 2 ;
-    required float diskSlots = 3;
-}
-
-message WorkerResourcesRequest {
-    repeated WorkerResourceProto workerResources = 1;
-}
-
-message WorkerResourceReleaseRequest {
-    required ExecutionBlockIdProto executionBlockId = 1;
-    repeated TajoContainerIdProto containerIds = 2;
-}
-
-message WorkerAllocatedResource {
-    required TajoContainerIdProto containerId = 1;
-    required WorkerConnectionInfoProto connectionInfo = 2;
-
-    required int32 allocatedMemoryMB = 3;
-    required float allocatedDiskSlots = 4;
-}
-
-message WorkerResourceAllocationResponse {
-    required QueryIdProto queryId = 1;
-    repeated WorkerAllocatedResource workerAllocatedResource = 2;
-}
-
-service TajoMasterProtocolService {
-  rpc heartbeat(TajoHeartbeat) returns (TajoHeartbeatResponse);
-  rpc allocateWorkerResources(WorkerResourceAllocationRequest) returns (WorkerResourceAllocationResponse);
-  rpc releaseWorkerResource(WorkerResourceReleaseRequest) returns (BoolProto);
-  rpc getAllWorkerResource(NullProto) returns (WorkerResourcesRequest);
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tajo/blob/807868bd/tajo-core/src/main/resources/webapps/admin/index.jsp
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/resources/webapps/admin/index.jsp b/tajo-core/src/main/resources/webapps/admin/index.jsp
index 00186d7..0defb3c 100644
--- a/tajo-core/src/main/resources/webapps/admin/index.jsp
+++ b/tajo-core/src/main/resources/webapps/admin/index.jsp
@@ -25,7 +25,7 @@
 <%@ page import="org.apache.tajo.master.TajoMaster" %>
 <%@ page import="org.apache.tajo.ha.HAService" %>
 <%@ page import="org.apache.tajo.ha.TajoMasterInfo" %>
-<%@ page import="org.apache.tajo.querymaster.QueryInProgress" %>
+<%@ page import="org.apache.tajo.master.QueryInProgress" %>
 <%@ page import="org.apache.tajo.master.rm.Worker" %>
 <%@ page import="org.apache.tajo.master.rm.WorkerState" %>
 <%@ page import="org.apache.tajo.util.NetUtils" %>

http://git-wip-us.apache.org/repos/asf/tajo/blob/807868bd/tajo-core/src/main/resources/webapps/admin/query.jsp
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/resources/webapps/admin/query.jsp b/tajo-core/src/main/resources/webapps/admin/query.jsp
index 4d8e5e6..85f7176 100644
--- a/tajo-core/src/main/resources/webapps/admin/query.jsp
+++ b/tajo-core/src/main/resources/webapps/admin/query.jsp
@@ -20,7 +20,7 @@
 <%@ page language="java" contentType="text/html; charset=UTF-8" pageEncoding="UTF-8"%>
 
 <%@ page import="org.apache.tajo.master.TajoMaster" %>
-<%@ page import="org.apache.tajo.querymaster.QueryInProgress" %>
+<%@ page import="org.apache.tajo.master.QueryInProgress" %>
 <%@ page import="org.apache.tajo.master.rm.Worker" %>
 <%@ page import="org.apache.tajo.util.JSPUtil" %>
 <%@ page import="org.apache.tajo.util.StringUtils" %>

http://git-wip-us.apache.org/repos/asf/tajo/blob/807868bd/tajo-core/src/test/java/org/apache/tajo/TajoTestingCluster.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/TajoTestingCluster.java b/tajo-core/src/test/java/org/apache/tajo/TajoTestingCluster.java
index 0786912..e548b81 100644
--- a/tajo-core/src/test/java/org/apache/tajo/TajoTestingCluster.java
+++ b/tajo-core/src/test/java/org/apache/tajo/TajoTestingCluster.java
@@ -43,8 +43,8 @@ import org.apache.tajo.client.TajoClientUtil;
 import org.apache.tajo.conf.TajoConf;
 import org.apache.tajo.conf.TajoConf.ConfVars;
 import org.apache.tajo.engine.planner.global.rewriter.GlobalPlanTestRuleProvider;
+import org.apache.tajo.master.QueryInProgress;
 import org.apache.tajo.master.TajoMaster;
-import org.apache.tajo.querymaster.*;
 import org.apache.tajo.querymaster.Query;
 import org.apache.tajo.querymaster.Stage;
 import org.apache.tajo.querymaster.StageState;

http://git-wip-us.apache.org/repos/asf/tajo/blob/807868bd/tajo-core/src/test/java/org/apache/tajo/master/rm/TestTajoResourceManager.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/master/rm/TestTajoResourceManager.java b/tajo-core/src/test/java/org/apache/tajo/master/rm/TestTajoResourceManager.java
index b8fbd67..a013d0b 100644
--- a/tajo-core/src/test/java/org/apache/tajo/master/rm/TestTajoResourceManager.java
+++ b/tajo-core/src/test/java/org/apache/tajo/master/rm/TestTajoResourceManager.java
@@ -19,12 +19,11 @@
 package org.apache.tajo.master.rm;
 
 import com.google.protobuf.RpcCallback;
-import org.apache.hadoop.yarn.proto.YarnProtos;
 import org.apache.tajo.QueryId;
 import org.apache.tajo.QueryIdFactory;
 import org.apache.tajo.conf.TajoConf;
 import org.apache.tajo.ipc.ContainerProtocol;
-import org.apache.tajo.ipc.TajoMasterProtocol.*;
+import org.apache.tajo.ipc.QueryCoordinatorProtocol.*;
 import org.apache.tajo.master.cluster.WorkerConnectionInfo;
 import org.apache.tajo.rpc.NullCallback;
 import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos;


[2/2] tajo git commit: TAJO-1291: Rename TajoMasterProtocol to QueryCoordinatorProtocol.

Posted by hy...@apache.org.
TAJO-1291: Rename TajoMasterProtocol to QueryCoordinatorProtocol.

Closes #342


Project: http://git-wip-us.apache.org/repos/asf/tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/807868bd
Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/807868bd
Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/807868bd

Branch: refs/heads/master
Commit: 807868bd4d1ca1c8bd3aee33d31cca3d43dd2273
Parents: 50a8a66
Author: Hyunsik Choi <hy...@apache.org>
Authored: Fri Jan 9 20:44:21 2015 +0900
Committer: Hyunsik Choi <hy...@apache.org>
Committed: Fri Jan 9 20:44:21 2015 +0900

----------------------------------------------------------------------
 CHANGES                                         |   3 +
 tajo-core/pom.xml                               |   2 +-
 .../tajo/master/QueryCoordinatorService.java    | 160 ++++++++++
 .../org/apache/tajo/master/QueryInProgress.java | 228 +++++++++++++
 .../org/apache/tajo/master/QueryJobManager.java | 316 -------------------
 .../org/apache/tajo/master/QueryManager.java    | 315 ++++++++++++++++++
 .../apache/tajo/master/TajoContainerProxy.java  |  12 +-
 .../java/org/apache/tajo/master/TajoMaster.java |  16 +-
 .../tajo/master/TajoMasterClientService.java    |   9 +-
 .../apache/tajo/master/TajoMasterService.java   | 161 ----------
 .../apache/tajo/master/exec/QueryExecutor.java  |   4 +-
 .../tajo/master/rm/TajoResourceTracker.java     |  12 +-
 .../master/rm/TajoWorkerResourceManager.java    |  14 +-
 .../tajo/master/rm/WorkerResourceManager.java   |  17 +-
 .../apache/tajo/master/scheduler/Scheduler.java |   2 +-
 .../master/scheduler/SimpleFifoScheduler.java   |   8 +-
 .../tajo/querymaster/QueryInProgress.java       | 230 --------------
 .../apache/tajo/querymaster/QueryMaster.java    |  95 ++----
 .../java/org/apache/tajo/querymaster/Stage.java |  17 -
 .../main/java/org/apache/tajo/util/JSPUtil.java |   2 +-
 .../tajo/worker/TajoResourceAllocator.java      |  37 ++-
 .../java/org/apache/tajo/worker/TajoWorker.java |  20 +-
 .../tajo/worker/WorkerHeartbeatService.java     |  27 +-
 .../ConnectivityCheckerRuleForTajoWorker.java   |  14 +-
 .../main/proto/QueryCoordinatorProtocol.proto   | 147 +++++++++
 .../main/proto/ResourceTrackerProtocol.proto    |   2 +-
 .../src/main/proto/TajoMasterProtocol.proto     | 147 ---------
 .../src/main/resources/webapps/admin/index.jsp  |   2 +-
 .../src/main/resources/webapps/admin/query.jsp  |   2 +-
 .../org/apache/tajo/TajoTestingCluster.java     |   2 +-
 .../tajo/master/rm/TestTajoResourceManager.java |   3 +-
 31 files changed, 979 insertions(+), 1047 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tajo/blob/807868bd/CHANGES
----------------------------------------------------------------------
diff --git a/CHANGES b/CHANGES
index 4e38f78..89488da 100644
--- a/CHANGES
+++ b/CHANGES
@@ -27,6 +27,9 @@ Release 0.9.1 - unreleased
 
   IMPROVEMENT
 
+    TAJO-1291: Rename TajoMasterProtocol to QueryCoordinatorProtocol.
+    (hyunsik)
+
     TAJO-1286: Remove netty dependency from tajo-jdbc. (jihun)
 
     TAJO-1282: Cleanup the relationship of QueryInProgress and 

http://git-wip-us.apache.org/repos/asf/tajo/blob/807868bd/tajo-core/pom.xml
----------------------------------------------------------------------
diff --git a/tajo-core/pom.xml b/tajo-core/pom.xml
index a7205dd..05ccf07 100644
--- a/tajo-core/pom.xml
+++ b/tajo-core/pom.xml
@@ -166,7 +166,7 @@
                 <argument>src/main/proto/ContainerProtocol.proto</argument>
                 <argument>src/main/proto/ResourceTrackerProtocol.proto</argument>
                 <argument>src/main/proto/QueryMasterProtocol.proto</argument>
-                <argument>src/main/proto/TajoMasterProtocol.proto</argument>
+                <argument>src/main/proto/QueryCoordinatorProtocol.proto</argument>
                 <argument>src/main/proto/TajoWorkerProtocol.proto</argument>
                 <argument>src/main/proto/InternalTypes.proto</argument>
               </arguments>

http://git-wip-us.apache.org/repos/asf/tajo/blob/807868bd/tajo-core/src/main/java/org/apache/tajo/master/QueryCoordinatorService.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/QueryCoordinatorService.java b/tajo-core/src/main/java/org/apache/tajo/master/QueryCoordinatorService.java
new file mode 100644
index 0000000..1cb3842
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/master/QueryCoordinatorService.java
@@ -0,0 +1,160 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.master;
+
+import com.google.protobuf.RpcCallback;
+import com.google.protobuf.RpcController;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.service.AbstractService;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.ipc.ContainerProtocol;
+import org.apache.tajo.ipc.QueryCoordinatorProtocol;
+import org.apache.tajo.ipc.QueryCoordinatorProtocol.*;
+import org.apache.tajo.master.cluster.WorkerConnectionInfo;
+import org.apache.tajo.master.rm.Worker;
+import org.apache.tajo.master.rm.WorkerResource;
+import org.apache.tajo.rpc.AsyncRpcServer;
+import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos;
+import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos.BoolProto;
+import org.apache.tajo.util.NetUtils;
+
+import java.net.InetSocketAddress;
+import java.util.Collection;
+import java.util.List;
+
+public class QueryCoordinatorService extends AbstractService { // Hosts the RPC server that serves QueryCoordinatorProtocol.
+  private final static Log LOG = LogFactory.getLog(QueryCoordinatorService.class);
+
+  private final TajoMaster.MasterContext context; // gives access to the conf, the query manager and the resource manager
+  private final TajoConf conf; // taken from context in the constructor
+  private final ProtocolServiceHandler masterHandler; // handler instance handed to the RPC server in start()
+  private AsyncRpcServer server; // assigned in start(); shut down and set back to null in stop()
+  private InetSocketAddress bindAddress; // derived from the server's listen address in start()
+
+  private final BoolProto BOOL_TRUE = BoolProto.newBuilder().setValue(true).build(); // prebuilt "true" reply reused by the handlers
+  private final BoolProto BOOL_FALSE = BoolProto.newBuilder().setValue(false).build(); // NOTE(review): never referenced in this class
+
+  public QueryCoordinatorService(TajoMaster.MasterContext context) {
+    super(QueryCoordinatorService.class.getName());
+    this.context = context;
+    this.conf = context.getConf();
+    this.masterHandler = new ProtocolServiceHandler();
+  }
+
+  @Override
+  public void start() { // builds and starts the RPC server, then records the address it actually bound to
+    String confMasterServiceAddr = conf.getVar(TajoConf.ConfVars.TAJO_MASTER_UMBILICAL_RPC_ADDRESS);
+    InetSocketAddress initIsa = NetUtils.createSocketAddr(confMasterServiceAddr);
+    int workerNum = conf.getIntVar(TajoConf.ConfVars.MASTER_RPC_SERVER_WORKER_THREAD_NUM);
+    try {
+      server = new AsyncRpcServer(QueryCoordinatorProtocol.class, masterHandler, initIsa, workerNum);
+    } catch (Exception e) {
+      LOG.error(e); // NOTE(review): failure is only logged and execution continues; presumably no stack trace is emitted - confirm
+    }
+    server.start(); // NOTE(review): server is still null here if the constructor above threw
+    bindAddress = NetUtils.getConnectAddress(server.getListenAddress());
+    this.conf.setVar(TajoConf.ConfVars.TAJO_MASTER_UMBILICAL_RPC_ADDRESS, // overwrite the configured value with the bound address
+        NetUtils.normalizeInetSocketAddress(bindAddress));
+    LOG.info("Instantiated TajoMasterService at " + this.bindAddress); // NOTE(review): message still uses the pre-rename name
+    super.start();
+  }
+
+  @Override
+  public void stop() { // shuts the RPC server down if one exists; the null check makes a repeated call harmless for the server
+    if(server != null) {
+      server.shutdown();
+      server = null;
+    }
+    super.stop();
+  }
+
+  public InetSocketAddress getBindAddress() { // null until start() has assigned it
+    return bindAddress;
+  }
+
+  /**
+   * Handler passed to the RPC server in start(); each RPC delegates to the query manager or the resource manager.
+   */
+  private class ProtocolServiceHandler implements QueryCoordinatorProtocolService.Interface {
+
+    @Override
+    public void heartbeat( // forwards the heartbeat to the query manager and replies with the cluster resource summary
+        RpcController controller,
+        TajoHeartbeat request, RpcCallback<QueryCoordinatorProtocol.TajoHeartbeatResponse> done) {
+      if(LOG.isDebugEnabled()) {
+        LOG.debug("Received QueryHeartbeat:" + new WorkerConnectionInfo(request.getConnectionInfo()));
+      }
+
+      QueryCoordinatorProtocol.TajoHeartbeatResponse.ResponseCommand command = null;
+
+      QueryManager queryManager = context.getQueryJobManager();
+      command = queryManager.queryHeartbeat(request);
+
+      QueryCoordinatorProtocol.TajoHeartbeatResponse.Builder builder = QueryCoordinatorProtocol.TajoHeartbeatResponse.newBuilder();
+      builder.setHeartbeatResult(BOOL_TRUE); // the heartbeat result is always reported as true
+      if(command != null) { // a response command is attached only when the query manager returned one
+        builder.setResponseCommand(command);
+      }
+
+      builder.setClusterResourceSummary(context.getResourceManager().getClusterResourceSummary());
+      done.run(builder.build());
+    }
+
+    @Override
+    public void allocateWorkerResources( // pure delegation: the callback is passed through and not invoked in this method
+        RpcController controller,
+        QueryCoordinatorProtocol.WorkerResourceAllocationRequest request,
+        RpcCallback<QueryCoordinatorProtocol.WorkerResourceAllocationResponse> done) {
+      context.getResourceManager().allocateWorkerResources(request, done);
+    }
+
+    @Override
+    public void releaseWorkerResource(RpcController controller, WorkerResourceReleaseRequest request,
+                                           RpcCallback<PrimitiveProtos.BoolProto> done) {
+      List<ContainerProtocol.TajoContainerIdProto> containerIds = request.getContainerIdsList();
+
+      for(ContainerProtocol.TajoContainerIdProto eachContainer: containerIds) { // release every container named in the request
+        context.getResourceManager().releaseWorkerResource(eachContainer);
+      }
+      done.run(BOOL_TRUE); // the reply is always true; no per-container outcome is reported
+    }
+
+    @Override
+    public void getAllWorkerResource(RpcController controller, PrimitiveProtos.NullProto request,
+                                     RpcCallback<WorkerResourcesRequest> done) {
+
+      WorkerResourcesRequest.Builder builder = WorkerResourcesRequest.newBuilder();
+      Collection<Worker> workers = context.getResourceManager().getWorkers().values();
+
+      for(Worker worker: workers) { // one WorkerResourceProto per worker known to the resource manager
+        WorkerResource resource = worker.getResource();
+
+        WorkerResourceProto.Builder workerResource = WorkerResourceProto.newBuilder();
+
+        workerResource.setConnectionInfo(worker.getConnectionInfo().getProto());
+        workerResource.setMemoryMB(resource.getMemoryMB());
+        workerResource.setDiskSlots(resource.getDiskSlots());
+
+        builder.addWorkerResources(workerResource);
+      }
+      done.run(builder.build());
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/807868bd/tajo-core/src/main/java/org/apache/tajo/master/QueryInProgress.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/QueryInProgress.java b/tajo-core/src/main/java/org/apache/tajo/master/QueryInProgress.java
new file mode 100644
index 0000000..73d8cb2
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/master/QueryInProgress.java
@@ -0,0 +1,228 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.master;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.tajo.QueryId;
+import org.apache.tajo.TajoProtos;
+import org.apache.tajo.engine.query.QueryContext;
+import org.apache.tajo.ipc.QueryCoordinatorProtocol.WorkerAllocatedResource;
+import org.apache.tajo.ipc.QueryMasterProtocol;
+import org.apache.tajo.ipc.QueryMasterProtocol.QueryMasterProtocolService;
+import org.apache.tajo.ipc.TajoWorkerProtocol;
+import org.apache.tajo.ipc.TajoWorkerProtocol.QueryExecutionRequestProto;
+import org.apache.tajo.master.rm.WorkerResourceManager;
+import org.apache.tajo.plan.logical.LogicalRootNode;
+import org.apache.tajo.querymaster.QueryJobEvent;
+import org.apache.tajo.rpc.NettyClientBase;
+import org.apache.tajo.rpc.NullCallback;
+import org.apache.tajo.rpc.RpcConnectionPool;
+import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos;
+import org.apache.tajo.session.Session;
+import org.apache.tajo.util.NetUtils;
+
+import java.net.InetSocketAddress;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+public class QueryInProgress { // Master-side record of one query: holds its QueryInfo and the RPC client to its QueryMaster.
+  private static final Log LOG = LogFactory.getLog(QueryInProgress.class.getName());
+
+  private QueryId queryId;
+
+  private Session session; // serialized into the execution request in submmitQueryToMaster()
+
+  private LogicalRootNode plan; // sent to the QueryMaster as JSON in submmitQueryToMaster()
+
+  private AtomicBoolean querySubmitted = new AtomicBoolean(false); // set once executeQuery has been issued
+
+  private AtomicBoolean stopped = new AtomicBoolean(false); // set by the first stopProgress() call
+
+  private QueryInfo queryInfo; // created in the constructor; also used as the lock object in heartbeat()
+
+  private final TajoMaster.MasterContext masterContext;
+
+  private NettyClientBase queryMasterRpc; // connection obtained from the pool in connectQueryMaster()
+
+  private QueryMasterProtocolService queryMasterRpcClient; // stub from queryMasterRpc; null until connected
+
+  public QueryInProgress(
+      TajoMaster.MasterContext masterContext,
+      Session session,
+      QueryContext queryContext,
+      QueryId queryId, String sql, String jsonExpr, LogicalRootNode plan) {
+
+    this.masterContext = masterContext;
+    this.session = session;
+    this.queryId = queryId;
+    this.plan = plan;
+
+    queryInfo = new QueryInfo(queryId, queryContext, sql, jsonExpr);
+    queryInfo.setStartTime(System.currentTimeMillis()); // start time is the moment this object is constructed
+  }
+
+  public synchronized void kill() { // marks the query as killed and, if already connected, asks the QueryMaster to kill it
+    getQueryInfo().setQueryState(TajoProtos.QueryState.QUERY_KILLED);
+    if(queryMasterRpcClient != null){ // no RPC is sent when the QueryMaster connection was never established
+      queryMasterRpcClient.killQuery(null, queryId.getProto(), NullCallback.get());
+    }
+  }
+
+  public void stopProgress() { // releases the QueryMaster, closes the connection and appends the query to the history
+    if(stopped.getAndSet(true)) { // only the first caller proceeds; later calls return immediately
+      return;
+    }
+
+    LOG.info("=========================================================");
+    LOG.info("Stop query:" + queryId);
+
+    masterContext.getResourceManager().releaseQueryMaster(queryId);
+
+    if(queryMasterRpc != null) {
+      RpcConnectionPool.getPool(masterContext.getConf()).closeConnection(queryMasterRpc);
+    }
+
+    masterContext.getHistoryWriter().appendHistory(queryInfo);
+  }
+
+  public boolean startQueryMaster() { // allocates a QueryMaster and records where it runs; false on no resource or on error
+    try {
+      LOG.info("Initializing QueryInProgress for QueryID=" + queryId);
+      WorkerResourceManager resourceManager = masterContext.getResourceManager();
+      WorkerAllocatedResource resource = resourceManager.allocateQueryMaster(this);
+
+      // if no resource to allocate a query master
+      if(resource == null) {
+        LOG.info("No Available Resources for QueryMaster");
+        return false;
+      }
+
+      queryInfo.setQueryMaster(resource.getConnectionInfo().getHost());
+      queryInfo.setQueryMasterPort(resource.getConnectionInfo().getQueryMasterPort());
+      queryInfo.setQueryMasterclientPort(resource.getConnectionInfo().getClientPort());
+      queryInfo.setQueryMasterInfoPort(resource.getConnectionInfo().getHttpInfoPort());
+
+      return true;
+    } catch (Exception e) {
+      catchException(e); // logs the error and marks the query as QUERY_FAILED
+      return false;
+    }
+  }
+
+  private void connectQueryMaster() throws Exception { // opens the RPC connection to the host/port stored in queryInfo
+    InetSocketAddress addr = NetUtils.createSocketAddr(queryInfo.getQueryMasterHost(), queryInfo.getQueryMasterPort());
+    LOG.info("Connect to QueryMaster:" + addr);
+    queryMasterRpc =
+        RpcConnectionPool.getPool(masterContext.getConf()).getConnection(addr, QueryMasterProtocol.class, true);
+    queryMasterRpcClient = queryMasterRpc.getStub();
+  }
+
+  public synchronized void submmitQueryToMaster() { // sends the execution request to the QueryMaster, at most once on success
+    if(querySubmitted.get()) {
+      return;
+    }
+
+    try {
+      if(queryMasterRpcClient == null) { // connect lazily on first use
+        connectQueryMaster();
+      }
+      if(queryMasterRpcClient == null) { // still null only if getStub() returned null
+        LOG.info("No QueryMaster conneciton info.");
+        //TODO wait
+        return;
+      }
+      LOG.info("Call executeQuery to :" +
+          queryInfo.getQueryMasterHost() + ":" + queryInfo.getQueryMasterPort() + "," + queryId);
+
+      QueryExecutionRequestProto.Builder builder = TajoWorkerProtocol.QueryExecutionRequestProto.newBuilder();
+      builder.setQueryId(queryId.getProto())
+          .setQueryContext(queryInfo.getQueryContext().getProto())
+          .setSession(session.getProto())
+          .setExprInJson(PrimitiveProtos.StringProto.newBuilder().setValue(queryInfo.getJsonExpr()))
+          .setLogicalPlanJson(PrimitiveProtos.StringProto.newBuilder().setValue(plan.toJson()).build());
+
+      queryMasterRpcClient.executeQuery(null, builder.build(), NullCallback.get());
+      querySubmitted.set(true); // reached only when no exception was thrown above
+    } catch (Exception e) {
+      LOG.error(e.getMessage(), e); // NOTE(review): only logged; catchException() is not called, so the state is not set to failed
+    }
+  }
+
+  public void catchException(Exception e) { // logs the exception, marks the query failed and stores the stack trace as last message
+    LOG.error(e.getMessage(), e);
+    queryInfo.setQueryState(TajoProtos.QueryState.QUERY_FAILED);
+    queryInfo.setLastMessage(StringUtils.stringifyException(e));
+  }
+
+  public QueryId getQueryId() {
+    return queryId;
+  }
+
+  public QueryInfo getQueryInfo() {
+    return this.queryInfo;
+  }
+
+  public boolean isStarted() { // true once the query was submitted and stopProgress() has not run yet
+    return !stopped.get() && this.querySubmitted.get();
+  }
+
+  public void heartbeat(QueryInfo queryInfo) { // merges the reported state into this.queryInfo; parameter shadows the field
+    LOG.info("Received QueryMaster heartbeat:" + queryInfo);
+
+    // to avoid partial update by different heartbeats
+    synchronized (this.queryInfo) {
+
+      // terminal state will let client to retrieve a query result
+      // So, we must set the query result before changing query state
+      if (isFinishState(queryInfo.getQueryState())) {
+        if (queryInfo.hasResultdesc()) {
+          this.queryInfo.setResultDesc(queryInfo.getResultDesc());
+        }
+      }
+
+      this.queryInfo.setQueryState(queryInfo.getQueryState());
+      this.queryInfo.setProgress(queryInfo.getProgress());
+      this.queryInfo.setFinishTime(queryInfo.getFinishTime()); // copied on every heartbeat, whatever the reported state
+
+      // Update diagnosis message
+      if (queryInfo.getLastMessage() != null && !queryInfo.getLastMessage().isEmpty()) {
+        this.queryInfo.setLastMessage(queryInfo.getLastMessage());
+        LOG.info(queryId + queryInfo.getLastMessage()); // NOTE(review): id and message are concatenated without a separator
+      }
+
+      // if any error occurs, print outs the error message
+      if (this.queryInfo.getQueryState() == TajoProtos.QueryState.QUERY_FAILED) {
+        LOG.warn(queryId + " failed, " + queryInfo.getLastMessage()); // logs the incoming message, not the stored one
+      }
+
+
+      if (isFinishState(this.queryInfo.getQueryState())) { // a terminal state raises QUERY_JOB_STOP on the query manager's handler
+        masterContext.getQueryJobManager().getEventHandler().handle(
+            new QueryJobEvent(QueryJobEvent.Type.QUERY_JOB_STOP, this.queryInfo));
+      }
+    }
+  }
+
+  private boolean isFinishState(TajoProtos.QueryState state) { // terminal states: failed, killed or succeeded
+    return state == TajoProtos.QueryState.QUERY_FAILED ||
+        state == TajoProtos.QueryState.QUERY_KILLED ||
+        state == TajoProtos.QueryState.QUERY_SUCCEEDED;
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/807868bd/tajo-core/src/main/java/org/apache/tajo/master/QueryJobManager.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/QueryJobManager.java b/tajo-core/src/main/java/org/apache/tajo/master/QueryJobManager.java
deleted file mode 100644
index 6a8da27..0000000
--- a/tajo-core/src/main/java/org/apache/tajo/master/QueryJobManager.java
+++ /dev/null
@@ -1,316 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.master;
-
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.service.CompositeService;
-import org.apache.hadoop.yarn.event.AsyncDispatcher;
-import org.apache.hadoop.yarn.event.EventHandler;
-import org.apache.tajo.QueryId;
-import org.apache.tajo.QueryIdFactory;
-import org.apache.tajo.catalog.TableDesc;
-import org.apache.tajo.engine.query.QueryContext;
-import org.apache.tajo.ipc.TajoMasterProtocol;
-import org.apache.tajo.master.cluster.WorkerConnectionInfo;
-import org.apache.tajo.master.scheduler.SimpleFifoScheduler;
-import org.apache.tajo.plan.logical.LogicalRootNode;
-import org.apache.tajo.querymaster.QueryInProgress;
-import org.apache.tajo.querymaster.QueryJobEvent;
-import org.apache.tajo.session.Session;
-
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Map;
-import java.util.concurrent.atomic.AtomicLong;
-
-/**
- * QueryJobManager manages all scheduled and running queries.
- * It receives all Query related events and routes them to each QueryInProgress.
- */
-public class QueryJobManager extends CompositeService {
-  private static final Log LOG = LogFactory.getLog(QueryJobManager.class.getName());
-
-  // TajoMaster Context
-  private final TajoMaster.MasterContext masterContext;
-
-  private AsyncDispatcher dispatcher;
-
-  private SimpleFifoScheduler scheduler;
-
-  private final Map<QueryId, QueryInProgress> submittedQueries = Maps.newConcurrentMap();
-
-  private final Map<QueryId, QueryInProgress> runningQueries = Maps.newConcurrentMap();
-
-  private AtomicLong minExecutionTime = new AtomicLong(Long.MAX_VALUE);
-  private AtomicLong maxExecutionTime = new AtomicLong();
-  private AtomicLong avgExecutionTime = new AtomicLong();
-  private AtomicLong executedQuerySize = new AtomicLong();
-
-  public QueryJobManager(final TajoMaster.MasterContext masterContext) {
-    super(QueryJobManager.class.getName());
-    this.masterContext = masterContext;
-  }
-
-  @Override
-  public void serviceInit(Configuration conf) throws Exception {
-    try {
-      this.dispatcher = new AsyncDispatcher();
-      addService(this.dispatcher);
-
-      this.dispatcher.register(QueryJobEvent.Type.class, new QueryJobManagerEventHandler());
-
-      this.scheduler = new SimpleFifoScheduler(this);
-    } catch (Exception e) {
-      catchException(null, e);
-    }
-
-    super.serviceInit(conf);
-  }
-
-  @Override
-  public void serviceStop() throws Exception {
-    synchronized(runningQueries) {
-      for(QueryInProgress eachQueryInProgress: runningQueries.values()) {
-        eachQueryInProgress.stopProgress();
-      }
-    }
-    this.scheduler.stop();
-    super.serviceStop();
-  }
-
-  @Override
-  public void serviceStart() throws Exception {
-    this.scheduler.start();
-    super.serviceStart();
-  }
-
-  public EventHandler getEventHandler() {
-    return dispatcher.getEventHandler();
-  }
-
-  public Collection<QueryInProgress> getSubmittedQueries() {
-    synchronized (submittedQueries){
-      return Collections.unmodifiableCollection(submittedQueries.values());
-    }
-  }
-
-  public Collection<QueryInProgress> getRunningQueries() {
-    synchronized (runningQueries){
-      return Collections.unmodifiableCollection(runningQueries.values());
-    }
-  }
-
-  public synchronized Collection<QueryInfo> getFinishedQueries() {
-    try {
-      return this.masterContext.getHistoryReader().getQueries(null);
-    } catch (Throwable e) {
-      LOG.error(e);
-      return Lists.newArrayList();
-    }
-  }
-
-
-  public synchronized QueryInfo getFinishedQuery(QueryId queryId) {
-    try {
-      return this.masterContext.getHistoryReader().getQueryInfo(queryId.toString());
-    } catch (Throwable e) {
-      LOG.error(e);
-      return null;
-    }
-  }
-
-  public QueryInfo createNewQueryJob(Session session, QueryContext queryContext, String sql,
-                                     String jsonExpr, LogicalRootNode plan)
-      throws Exception {
-    QueryId queryId = QueryIdFactory.newQueryId(masterContext.getResourceManager().getSeedQueryId());
-    QueryInProgress queryInProgress = new QueryInProgress(masterContext, session, queryContext, queryId, sql,
-        jsonExpr, plan);
-
-    synchronized (submittedQueries) {
-      queryInProgress.getQueryInfo().setQueryMaster("");
-      submittedQueries.put(queryInProgress.getQueryId(), queryInProgress);
-    }
-
-    scheduler.addQuery(queryInProgress);
-    return queryInProgress.getQueryInfo();
-  }
-
-  public QueryInfo startQueryJob(QueryId queryId) throws Exception {
-
-    QueryInProgress queryInProgress;
-
-    synchronized (submittedQueries) {
-      queryInProgress = submittedQueries.remove(queryId);
-    }
-
-    synchronized (runningQueries) {
-      runningQueries.put(queryInProgress.getQueryId(), queryInProgress);
-    }
-
-    if (queryInProgress.startQueryMaster()) {
-      dispatcher.getEventHandler().handle(new QueryJobEvent(QueryJobEvent.Type.QUERY_MASTER_START,
-          queryInProgress.getQueryInfo()));
-    } else {
-      stopQuery(queryId);
-    }
-
-    return queryInProgress.getQueryInfo();
-  }
-
-  class QueryJobManagerEventHandler implements EventHandler<QueryJobEvent> {
-
-    @Override
-    public void handle(QueryJobEvent event) {
-      QueryInProgress queryInProgress = getQueryInProgress(event.getQueryInfo().getQueryId());
-
-
-      if (queryInProgress == null) {
-        LOG.warn("No query info in running queries.[" + event.getQueryInfo().getQueryId() + "]");
-        return;
-      }
-
-
-      if (event.getType() == QueryJobEvent.Type.QUERY_MASTER_START) {
-        queryInProgress.submmitQueryToMaster();
-
-      } else if (event.getType() == QueryJobEvent.Type.QUERY_JOB_STOP) {
-        stopQuery(event.getQueryInfo().getQueryId());
-
-      } else if (event.getType() == QueryJobEvent.Type.QUERY_JOB_KILL) {
-        scheduler.removeQuery(queryInProgress.getQueryId());
-        queryInProgress.kill();
-        stopQuery(queryInProgress.getQueryId());
-
-      } else if (event.getType() == QueryJobEvent.Type.QUERY_JOB_HEARTBEAT) {
-        queryInProgress.heartbeat(event.getQueryInfo());
-      }
-    }
-  }
-
-  public QueryInProgress getQueryInProgress(QueryId queryId) {
-    QueryInProgress queryInProgress;
-    synchronized (submittedQueries) {
-      queryInProgress = submittedQueries.get(queryId);
-    }
-
-    if (queryInProgress == null) {
-      synchronized (runningQueries) {
-        queryInProgress = runningQueries.get(queryId);
-      }
-    }
-    return queryInProgress;
-  }
-
-  public void stopQuery(QueryId queryId) {
-    LOG.info("Stop QueryInProgress:" + queryId);
-    QueryInProgress queryInProgress = getQueryInProgress(queryId);
-    if(queryInProgress != null) {
-      queryInProgress.stopProgress();
-      synchronized(submittedQueries) {
-        submittedQueries.remove(queryId);
-      }
-
-      synchronized(runningQueries) {
-        runningQueries.remove(queryId);
-      }
-
-      QueryInfo queryInfo = queryInProgress.getQueryInfo();
-      long executionTime = queryInfo.getFinishTime() - queryInfo.getStartTime();
-      if (executionTime < minExecutionTime.get()) {
-        minExecutionTime.set(executionTime);
-      }
-
-      if (executionTime > maxExecutionTime.get()) {
-        maxExecutionTime.set(executionTime);
-      }
-
-      long totalExecutionTime = executedQuerySize.get() * avgExecutionTime.get();
-      if (totalExecutionTime > 0) {
-        avgExecutionTime.set((totalExecutionTime + executionTime) / (executedQuerySize.get() + 1));
-      } else {
-        avgExecutionTime.set(executionTime);
-      }
-      executedQuerySize.incrementAndGet();
-    } else {
-      LOG.warn("No QueryInProgress while query stopping: " + queryId);
-    }
-  }
-
-  public long getMinExecutionTime() {
-    if (getExecutedQuerySize() == 0) return 0;
-    return minExecutionTime.get();
-  }
-
-  public long getMaxExecutionTime() {
-    return maxExecutionTime.get();
-  }
-
-  public long getAvgExecutionTime() {
-    return avgExecutionTime.get();
-  }
-
-  public long getExecutedQuerySize() {
-    return executedQuerySize.get();
-  }
-
-  private void catchException(QueryId queryId, Exception e) {
-    LOG.error(e.getMessage(), e);
-    QueryInProgress queryInProgress = runningQueries.get(queryId);
-    queryInProgress.catchException(e);
-  }
-
-  public synchronized TajoMasterProtocol.TajoHeartbeatResponse.ResponseCommand queryHeartbeat(
-      TajoMasterProtocol.TajoHeartbeat queryHeartbeat) {
-    QueryInProgress queryInProgress = getQueryInProgress(new QueryId(queryHeartbeat.getQueryId()));
-    if(queryInProgress == null) {
-      return null;
-    }
-
-    QueryInfo queryInfo = makeQueryInfoFromHeartbeat(queryHeartbeat);
-    getEventHandler().handle(new QueryJobEvent(QueryJobEvent.Type.QUERY_JOB_HEARTBEAT, queryInfo));
-
-    return null;
-  }
-
-  private QueryInfo makeQueryInfoFromHeartbeat(TajoMasterProtocol.TajoHeartbeat queryHeartbeat) {
-    QueryInfo queryInfo = new QueryInfo(new QueryId(queryHeartbeat.getQueryId()));
-    WorkerConnectionInfo connectionInfo = new WorkerConnectionInfo(queryHeartbeat.getConnectionInfo());
-
-    queryInfo.setQueryMaster(connectionInfo.getHost());
-    queryInfo.setQueryMasterPort(connectionInfo.getQueryMasterPort());
-    queryInfo.setQueryMasterclientPort(connectionInfo.getClientPort());
-    queryInfo.setLastMessage(queryHeartbeat.getStatusMessage());
-    queryInfo.setQueryState(queryHeartbeat.getState());
-    queryInfo.setProgress(queryHeartbeat.getQueryProgress());
-
-    if (queryHeartbeat.hasQueryFinishTime()) {
-      queryInfo.setFinishTime(queryHeartbeat.getQueryFinishTime());
-    }
-
-    if (queryHeartbeat.hasResultDesc()) {
-      queryInfo.setResultDesc(new TableDesc(queryHeartbeat.getResultDesc()));
-    }
-
-    return queryInfo;
-  }
-}

http://git-wip-us.apache.org/repos/asf/tajo/blob/807868bd/tajo-core/src/main/java/org/apache/tajo/master/QueryManager.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/QueryManager.java b/tajo-core/src/main/java/org/apache/tajo/master/QueryManager.java
new file mode 100644
index 0000000..296be04
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/master/QueryManager.java
@@ -0,0 +1,315 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.master;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.service.CompositeService;
+import org.apache.hadoop.yarn.event.AsyncDispatcher;
+import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.tajo.QueryId;
+import org.apache.tajo.QueryIdFactory;
+import org.apache.tajo.catalog.TableDesc;
+import org.apache.tajo.engine.query.QueryContext;
+import org.apache.tajo.ipc.QueryCoordinatorProtocol;
+import org.apache.tajo.master.cluster.WorkerConnectionInfo;
+import org.apache.tajo.master.scheduler.SimpleFifoScheduler;
+import org.apache.tajo.plan.logical.LogicalRootNode;
+import org.apache.tajo.querymaster.QueryJobEvent;
+import org.apache.tajo.session.Session;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * QueryManager manages all scheduled and running queries.
+ * It receives all Query related events and routes them to each QueryInProgress.
+ */
+public class QueryManager extends CompositeService {
+  private static final Log LOG = LogFactory.getLog(QueryManager.class.getName());
+
+  // TajoMaster Context
+  private final TajoMaster.MasterContext masterContext;
+  // Created in serviceInit(); either may still be null if initialization failed.
+  private AsyncDispatcher dispatcher;
+  private SimpleFifoScheduler scheduler;
+  // Queries accepted by scheduleQuery() but not started yet, and queries moved over by startQueryJob().
+  private final Map<QueryId, QueryInProgress> submittedQueries = Maps.newConcurrentMap();
+  private final Map<QueryId, QueryInProgress> runningQueries = Maps.newConcurrentMap();
+  // Statistics over (finishTime - startTime) of stopped queries; written only under statsLock.
+  private final Object statsLock = new Object();
+  private final AtomicLong minExecutionTime = new AtomicLong(Long.MAX_VALUE);
+  private final AtomicLong maxExecutionTime = new AtomicLong();
+  private final AtomicLong avgExecutionTime = new AtomicLong();
+  private final AtomicLong executedQuerySize = new AtomicLong();
+
+  /** Creates the manager; the dispatcher and the scheduler are set up later in serviceInit(). */
+  public QueryManager(final TajoMaster.MasterContext masterContext) {
+    super(QueryManager.class.getName());
+    this.masterContext = masterContext;
+  }
+
+  /** Creates the dispatcher, registers the QueryJobEvent handler and creates the scheduler. */
+  @Override
+  public void serviceInit(Configuration conf) throws Exception {
+    try {
+      this.dispatcher = new AsyncDispatcher();
+      addService(this.dispatcher);
+      this.dispatcher.register(QueryJobEvent.Type.class, new QueryJobManagerEventHandler());
+      this.scheduler = new SimpleFifoScheduler(this);
+    } catch (Exception e) {
+      // No query exists yet, so catchException() only logs; rethrow so init fails with the real cause.
+      catchException(null, e);
+      throw e;
+    }
+    super.serviceInit(conf);
+  }
+
+  /** Stops all running queries, then the scheduler (if it was created) and the child services. */
+  @Override
+  public void serviceStop() throws Exception {
+    synchronized(runningQueries) {
+      for(QueryInProgress eachQueryInProgress: runningQueries.values()) {
+        eachQueryInProgress.stopProgress();
+      }
+    }
+    if (this.scheduler != null) {
+      this.scheduler.stop();
+    }
+    super.serviceStop();
+  }
+
+  /** Starts the scheduler, then the child services. */
+  @Override
+  public void serviceStart() throws Exception {
+    this.scheduler.start();
+    super.serviceStart();
+  }
+
+  /** Returns the dispatcher's event handler, through which QueryJobEvents reach this manager. */
+  public EventHandler getEventHandler() {
+    return dispatcher.getEventHandler();
+  }
+
+  /** Returns a read-only view of the queries that were submitted but not started yet. */
+  public Collection<QueryInProgress> getSubmittedQueries() {
+    synchronized (submittedQueries){
+      return Collections.unmodifiableCollection(submittedQueries.values());
+    }
+  }
+
+  /** Returns a read-only view of the running queries. */
+  public Collection<QueryInProgress> getRunningQueries() {
+    synchronized (runningQueries){
+      return Collections.unmodifiableCollection(runningQueries.values());
+    }
+  }
+
+  /** Returns the finished queries from the history reader, or an empty list if reading them fails. */
+  public synchronized Collection<QueryInfo> getFinishedQueries() {
+    try {
+      return this.masterContext.getHistoryReader().getQueries(null);
+    } catch (Throwable e) {
+      LOG.error(e.getMessage(), e);
+      return Lists.newArrayList();
+    }
+  }
+
+  /** Returns the history reader's record of the given query, or null if the lookup fails. */
+  public synchronized QueryInfo getFinishedQuery(QueryId queryId) {
+    try {
+      return this.masterContext.getHistoryReader().getQueryInfo(queryId.toString());
+    } catch (Throwable e) {
+      LOG.error(e.getMessage(), e);
+      return null;
+    }
+  }
+
+  /** Registers a new query under a newly generated id, hands it to the scheduler and returns its info. */
+  public QueryInfo scheduleQuery(Session session, QueryContext queryContext, String sql,
+                                 String jsonExpr, LogicalRootNode plan)
+      throws Exception {
+    QueryId queryId = QueryIdFactory.newQueryId(masterContext.getResourceManager().getSeedQueryId());
+    QueryInProgress queryInProgress = new QueryInProgress(masterContext, session, queryContext, queryId, sql,
+        jsonExpr, plan);
+    synchronized (submittedQueries) {
+      queryInProgress.getQueryInfo().setQueryMaster("");
+      submittedQueries.put(queryInProgress.getQueryId(), queryInProgress);
+    }
+    scheduler.addQuery(queryInProgress);
+    return queryInProgress.getQueryInfo();
+  }
+
+  /**
+   * Moves a submitted query to the running set and starts its query master; the query is stopped again
+   * if that fails. NOTE(review): throws NullPointerException if queryId is not in submittedQueries.
+   */
+  public QueryInfo startQueryJob(QueryId queryId) throws Exception {
+    QueryInProgress queryInProgress;
+    synchronized (submittedQueries) {
+      queryInProgress = submittedQueries.remove(queryId);
+    }
+    synchronized (runningQueries) {
+      runningQueries.put(queryInProgress.getQueryId(), queryInProgress);
+    }
+    if (queryInProgress.startQueryMaster()) {
+      dispatcher.getEventHandler().handle(new QueryJobEvent(QueryJobEvent.Type.QUERY_MASTER_START,
+          queryInProgress.getQueryInfo()));
+    } else {
+      stopQuery(queryId);
+    }
+    return queryInProgress.getQueryInfo();
+  }
+
+  /** Routes every QueryJobEvent to the QueryInProgress it refers to; unknown queries are only logged. */
+  class QueryJobManagerEventHandler implements EventHandler<QueryJobEvent> {
+    @Override
+    public void handle(QueryJobEvent event) {
+      QueryInProgress queryInProgress = getQueryInProgress(event.getQueryInfo().getQueryId());
+      if (queryInProgress == null) {
+        LOG.warn("No query info in running queries.[" + event.getQueryInfo().getQueryId() + "]");
+        return;
+      }
+
+      if (event.getType() == QueryJobEvent.Type.QUERY_MASTER_START) {
+        queryInProgress.submmitQueryToMaster();
+      } else if (event.getType() == QueryJobEvent.Type.QUERY_JOB_STOP) {
+        stopQuery(event.getQueryInfo().getQueryId());
+      } else if (event.getType() == QueryJobEvent.Type.QUERY_JOB_KILL) {
+        scheduler.removeQuery(queryInProgress.getQueryId());
+        queryInProgress.kill();
+        stopQuery(queryInProgress.getQueryId());
+      } else if (event.getType() == QueryJobEvent.Type.QUERY_JOB_HEARTBEAT) {
+        queryInProgress.heartbeat(event.getQueryInfo());
+      }
+    }
+  }
+
+  /** Looks the query up among the submitted queries first, then the running ones; null if unknown. */
+  public QueryInProgress getQueryInProgress(QueryId queryId) {
+    QueryInProgress queryInProgress;
+    synchronized (submittedQueries) {
+      queryInProgress = submittedQueries.get(queryId);
+    }
+    if (queryInProgress == null) {
+      synchronized (runningQueries) {
+        queryInProgress = runningQueries.get(queryId);
+      }
+    }
+    return queryInProgress;
+  }
+
+  /** Stops the query, drops it from both maps and folds its execution time into the statistics. */
+  public void stopQuery(QueryId queryId) {
+    LOG.info("Stop QueryInProgress:" + queryId);
+    QueryInProgress queryInProgress = getQueryInProgress(queryId);
+    if(queryInProgress != null) {
+      queryInProgress.stopProgress();
+      synchronized(submittedQueries) {
+        submittedQueries.remove(queryId);
+      }
+      synchronized(runningQueries) {
+        runningQueries.remove(queryId);
+      }
+
+      QueryInfo queryInfo = queryInProgress.getQueryInfo();
+      long executionTime = queryInfo.getFinishTime() - queryInfo.getStartTime();
+      // Called by the event handler and by startQueryJob(): keep the read-modify-write steps together.
+      synchronized (statsLock) {
+        long executed = executedQuerySize.get();
+        minExecutionTime.set(Math.min(minExecutionTime.get(), executionTime));
+        maxExecutionTime.set(Math.max(maxExecutionTime.get(), executionTime));
+        // Running mean over the count, so a previous average of zero is weighted correctly too.
+        avgExecutionTime.set((executed * avgExecutionTime.get() + executionTime) / (executed + 1));
+        executedQuerySize.incrementAndGet();
+      }
+    } else {
+      LOG.warn("No QueryInProgress while query stopping: " + queryId);
+    }
+  }
+
+  /** Returns the shortest execution time recorded so far, or 0 if no query has been stopped yet. */
+  public long getMinExecutionTime() {
+    if (getExecutedQuerySize() == 0) return 0;
+    return minExecutionTime.get();
+  }
+
+  /** Returns the longest execution time recorded so far. */
+  public long getMaxExecutionTime() {
+    return maxExecutionTime.get();
+  }
+
+  /** Returns the (integer) average execution time of the stopped queries. */
+  public long getAvgExecutionTime() {
+    return avgExecutionTime.get();
+  }
+
+  /** Returns how many queries have been stopped through stopQuery(). */
+  public long getExecutedQuerySize() {
+    return executedQuerySize.get();
+  }
+
+  /** Logs the exception and forwards it to the running query with the given id, if there is one. */
+  private void catchException(QueryId queryId, Exception e) {
+    LOG.error(e.getMessage(), e);
+    // A concurrent map rejects null keys, and the query may not be running (any more).
+    QueryInProgress queryInProgress = queryId == null ? null : runningQueries.get(queryId);
+    if (queryInProgress != null) {
+      queryInProgress.catchException(e);
+    }
+  }
+
+  /** Posts a QUERY_JOB_HEARTBEAT event for the query named in the heartbeat, if known; returns null. */
+  public synchronized QueryCoordinatorProtocol.TajoHeartbeatResponse.ResponseCommand queryHeartbeat(
+      QueryCoordinatorProtocol.TajoHeartbeat queryHeartbeat) {
+    QueryInProgress queryInProgress = getQueryInProgress(new QueryId(queryHeartbeat.getQueryId()));
+    if(queryInProgress == null) {
+      return null;
+    }
+    QueryInfo queryInfo = makeQueryInfoFromHeartbeat(queryHeartbeat);
+    getEventHandler().handle(new QueryJobEvent(QueryJobEvent.Type.QUERY_JOB_HEARTBEAT, queryInfo));
+    return null;
+  }
+
+  /** Copies the heartbeat's connection info, message, state, progress and optional fields into a QueryInfo. */
+  private QueryInfo makeQueryInfoFromHeartbeat(QueryCoordinatorProtocol.TajoHeartbeat queryHeartbeat) {
+    QueryInfo queryInfo = new QueryInfo(new QueryId(queryHeartbeat.getQueryId()));
+    WorkerConnectionInfo connectionInfo = new WorkerConnectionInfo(queryHeartbeat.getConnectionInfo());
+
+    queryInfo.setQueryMaster(connectionInfo.getHost());
+    queryInfo.setQueryMasterPort(connectionInfo.getQueryMasterPort());
+    queryInfo.setQueryMasterclientPort(connectionInfo.getClientPort());
+    queryInfo.setLastMessage(queryHeartbeat.getStatusMessage());
+    queryInfo.setQueryState(queryHeartbeat.getState());
+    queryInfo.setProgress(queryHeartbeat.getQueryProgress());
+
+    if (queryHeartbeat.hasQueryFinishTime()) {
+      queryInfo.setFinishTime(queryHeartbeat.getQueryFinishTime());
+    }
+    if (queryHeartbeat.hasResultDesc()) {
+      queryInfo.setResultDesc(new TableDesc(queryHeartbeat.getResultDesc()));
+    }
+    return queryInfo;
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/807868bd/tajo-core/src/main/java/org/apache/tajo/master/TajoContainerProxy.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/TajoContainerProxy.java b/tajo-core/src/main/java/org/apache/tajo/master/TajoContainerProxy.java
index 7209080..2ffd7ca 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/TajoContainerProxy.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/TajoContainerProxy.java
@@ -26,7 +26,7 @@ import org.apache.tajo.TaskAttemptId;
 import org.apache.tajo.conf.TajoConf;
 import org.apache.tajo.engine.query.QueryContext;
 import org.apache.tajo.ipc.ContainerProtocol;
-import org.apache.tajo.ipc.TajoMasterProtocol;
+import org.apache.tajo.ipc.QueryCoordinatorProtocol;
 import org.apache.tajo.ipc.TajoWorkerProtocol;
 import org.apache.tajo.querymaster.QueryMasterTask;
 import org.apache.tajo.master.container.TajoContainer;
@@ -177,23 +177,23 @@ public class TajoContainerProxy extends ContainerProxy {
       if (conf.getBoolVar(TajoConf.ConfVars.TAJO_MASTER_HA_ENABLE)) {
         try {
           tmClient = connPool.getConnection(context.getQueryMasterContext().getWorkerContext().getTajoMasterAddress(),
-              TajoMasterProtocol.class, true);
+              QueryCoordinatorProtocol.class, true);
         } catch (Exception e) {
           context.getQueryMasterContext().getWorkerContext().setWorkerResourceTrackerAddr(
               HAServiceUtil.getResourceTrackerAddress(conf));
           context.getQueryMasterContext().getWorkerContext().setTajoMasterAddress(
               HAServiceUtil.getMasterUmbilicalAddress(conf));
           tmClient = connPool.getConnection(context.getQueryMasterContext().getWorkerContext().getTajoMasterAddress(),
-              TajoMasterProtocol.class, true);
+              QueryCoordinatorProtocol.class, true);
         }
       } else {
         tmClient = connPool.getConnection(context.getQueryMasterContext().getWorkerContext().getTajoMasterAddress(),
-            TajoMasterProtocol.class, true);
+            QueryCoordinatorProtocol.class, true);
       }
 
-      TajoMasterProtocol.TajoMasterProtocolService masterClientService = tmClient.getStub();
+      QueryCoordinatorProtocol.QueryCoordinatorProtocolService masterClientService = tmClient.getStub();
         masterClientService.releaseWorkerResource(null,
-          TajoMasterProtocol.WorkerResourceReleaseRequest.newBuilder()
+            QueryCoordinatorProtocol.WorkerResourceReleaseRequest.newBuilder()
               .setExecutionBlockId(executionBlockId.getProto())
               .addAllContainerIds(containerIdProtos)
               .build(),

http://git-wip-us.apache.org/repos/asf/tajo/blob/807868bd/tajo-core/src/main/java/org/apache/tajo/master/TajoMaster.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/TajoMaster.java b/tajo-core/src/main/java/org/apache/tajo/master/TajoMaster.java
index c054599..786025a 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/TajoMaster.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/TajoMaster.java
@@ -114,14 +114,14 @@ public class TajoMaster extends CompositeService {
   private GlobalEngine globalEngine;
   private AsyncDispatcher dispatcher;
   private TajoMasterClientService tajoMasterClientService;
-  private TajoMasterService tajoMasterService;
+  private QueryCoordinatorService tajoMasterService;
   private SessionManager sessionManager;
 
   private WorkerResourceManager resourceManager;
   //Web Server
   private StaticHttpServer webServer;
 
-  private QueryJobManager queryJobManager;
+  private QueryManager queryManager;
 
   private ThreadMXBean threadBean = ManagementFactory.getThreadMXBean();
 
@@ -183,13 +183,13 @@ public class TajoMaster extends CompositeService {
       globalEngine = new GlobalEngine(context);
       addIfService(globalEngine);
 
-      queryJobManager = new QueryJobManager(context);
-      addIfService(queryJobManager);
+      queryManager = new QueryManager(context);
+      addIfService(queryManager);
 
       tajoMasterClientService = new TajoMasterClientService(context);
       addIfService(tajoMasterClientService);
 
-      tajoMasterService = new TajoMasterService(context);
+      tajoMasterService = new QueryCoordinatorService(context);
       addIfService(tajoMasterService);
     } catch (Exception e) {
       LOG.error(e.getMessage(), e);
@@ -441,8 +441,8 @@ public class TajoMaster extends CompositeService {
       return clock;
     }
 
-    public QueryJobManager getQueryJobManager() {
-      return queryJobManager;
+    public QueryManager getQueryJobManager() {
+      return queryManager;
     }
 
     public WorkerResourceManager getResourceManager() {
@@ -469,7 +469,7 @@ public class TajoMaster extends CompositeService {
       return storeManager;
     }
 
-    public TajoMasterService getTajoMasterService() {
+    public QueryCoordinatorService getTajoMasterService() {
       return tajoMasterService;
     }
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/807868bd/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterClientService.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterClientService.java b/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterClientService.java
index 93326be..fcc016a 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterClientService.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterClientService.java
@@ -45,7 +45,6 @@ import org.apache.tajo.ipc.TajoMasterClientProtocol;
 import org.apache.tajo.ipc.TajoMasterClientProtocol.TajoMasterClientProtocolService;
 import org.apache.tajo.master.TajoMaster.MasterContext;
 import org.apache.tajo.master.exec.NonForwardQueryResultScanner;
-import org.apache.tajo.querymaster.QueryInProgress;
 import org.apache.tajo.querymaster.QueryJobEvent;
 import org.apache.tajo.master.rm.Worker;
 import org.apache.tajo.master.rm.WorkerResource;
@@ -567,8 +566,8 @@ public class TajoMasterClientService extends AbstractService {
         context.getSessionManager().touch(request.getSessionId().getId());
         QueryId queryId = new QueryId(request.getQueryId());
 
-        QueryJobManager queryJobManager = context.getQueryJobManager();
-        QueryInProgress queryInProgress = queryJobManager.getQueryInProgress(queryId);
+        QueryManager queryManager = context.getQueryJobManager();
+        QueryInProgress queryInProgress = queryManager.getQueryInProgress(queryId);
 
         QueryInfo queryInfo = null;
         if (queryInProgress == null) {
@@ -598,8 +597,8 @@ public class TajoMasterClientService extends AbstractService {
       try {
         context.getSessionManager().touch(request.getSessionId().getId());
         QueryId queryId = new QueryId(request.getQueryId());
-        QueryJobManager queryJobManager = context.getQueryJobManager();
-        queryJobManager.getEventHandler().handle(new QueryJobEvent(QueryJobEvent.Type.QUERY_JOB_KILL,
+        QueryManager queryManager = context.getQueryJobManager();
+        queryManager.getEventHandler().handle(new QueryJobEvent(QueryJobEvent.Type.QUERY_JOB_KILL,
             new QueryInfo(queryId)));
         return BOOL_TRUE;
       } catch (Throwable t) {

http://git-wip-us.apache.org/repos/asf/tajo/blob/807868bd/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterService.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterService.java b/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterService.java
deleted file mode 100644
index 02bdfa1..0000000
--- a/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterService.java
+++ /dev/null
@@ -1,161 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.master;
-
-import com.google.protobuf.RpcCallback;
-import com.google.protobuf.RpcController;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.service.AbstractService;
-import org.apache.tajo.QueryId;
-import org.apache.tajo.TajoIdProtos;
-import org.apache.tajo.conf.TajoConf;
-import org.apache.tajo.ipc.ContainerProtocol;
-import org.apache.tajo.ipc.TajoMasterProtocol;
-import org.apache.tajo.master.cluster.WorkerConnectionInfo;
-import org.apache.tajo.master.rm.Worker;
-import org.apache.tajo.master.rm.WorkerResource;
-import org.apache.tajo.rpc.AsyncRpcServer;
-import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos;
-import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos.BoolProto;
-import org.apache.tajo.util.NetUtils;
-
-import java.net.InetSocketAddress;
-import java.util.Collection;
-import java.util.List;
-
-public class TajoMasterService extends AbstractService {
-  private final static Log LOG = LogFactory.getLog(TajoMasterService.class);
-
-  private final TajoMaster.MasterContext context;
-  private final TajoConf conf;
-  private final TajoMasterServiceHandler masterHandler;
-  private AsyncRpcServer server;
-  private InetSocketAddress bindAddress;
-
-  private final BoolProto BOOL_TRUE = BoolProto.newBuilder().setValue(true).build();
-  private final BoolProto BOOL_FALSE = BoolProto.newBuilder().setValue(false).build();
-
-  public TajoMasterService(TajoMaster.MasterContext context) {
-    super(TajoMasterService.class.getName());
-    this.context = context;
-    this.conf = context.getConf();
-    this.masterHandler = new TajoMasterServiceHandler();
-  }
-
-  @Override
-  public void start() {
-    String confMasterServiceAddr = conf.getVar(TajoConf.ConfVars.TAJO_MASTER_UMBILICAL_RPC_ADDRESS);
-    InetSocketAddress initIsa = NetUtils.createSocketAddr(confMasterServiceAddr);
-    int workerNum = conf.getIntVar(TajoConf.ConfVars.MASTER_RPC_SERVER_WORKER_THREAD_NUM);
-    try {
-      server = new AsyncRpcServer(TajoMasterProtocol.class, masterHandler, initIsa, workerNum);
-    } catch (Exception e) {
-      LOG.error(e);
-    }
-    server.start();
-    bindAddress = NetUtils.getConnectAddress(server.getListenAddress());
-    this.conf.setVar(TajoConf.ConfVars.TAJO_MASTER_UMBILICAL_RPC_ADDRESS,
-        NetUtils.normalizeInetSocketAddress(bindAddress));
-    LOG.info("Instantiated TajoMasterService at " + this.bindAddress);
-    super.start();
-  }
-
-  @Override
-  public void stop() {
-    if(server != null) {
-      server.shutdown();
-      server = null;
-    }
-    super.stop();
-  }
-
-  public InetSocketAddress getBindAddress() {
-    return bindAddress;
-  }
-
-  public class TajoMasterServiceHandler
-      implements TajoMasterProtocol.TajoMasterProtocolService.Interface {
-    @Override
-    public void heartbeat(
-        RpcController controller,
-        TajoMasterProtocol.TajoHeartbeat request, RpcCallback<TajoMasterProtocol.TajoHeartbeatResponse> done) {
-      if(LOG.isDebugEnabled()) {
-        LOG.debug("Received QueryHeartbeat:" + new WorkerConnectionInfo(request.getConnectionInfo()));
-      }
-
-      TajoMasterProtocol.TajoHeartbeatResponse.ResponseCommand command = null;
-
-      QueryJobManager queryJobManager = context.getQueryJobManager();
-      command = queryJobManager.queryHeartbeat(request);
-
-      TajoMasterProtocol.TajoHeartbeatResponse.Builder builder = TajoMasterProtocol.TajoHeartbeatResponse.newBuilder();
-      builder.setHeartbeatResult(BOOL_TRUE);
-      if(command != null) {
-        builder.setResponseCommand(command);
-      }
-
-      builder.setClusterResourceSummary(context.getResourceManager().getClusterResourceSummary());
-      done.run(builder.build());
-    }
-
-    @Override
-    public void allocateWorkerResources(
-        RpcController controller,
-        TajoMasterProtocol.WorkerResourceAllocationRequest request,
-        RpcCallback<TajoMasterProtocol.WorkerResourceAllocationResponse> done) {
-      context.getResourceManager().allocateWorkerResources(request, done);
-    }
-
-    @Override
-    public void releaseWorkerResource(RpcController controller,
-                                           TajoMasterProtocol.WorkerResourceReleaseRequest request,
-                                           RpcCallback<PrimitiveProtos.BoolProto> done) {
-      List<ContainerProtocol.TajoContainerIdProto> containerIds = request.getContainerIdsList();
-
-      for(ContainerProtocol.TajoContainerIdProto eachContainer: containerIds) {
-        context.getResourceManager().releaseWorkerResource(eachContainer);
-      }
-      done.run(BOOL_TRUE);
-    }
-
-    @Override
-    public void getAllWorkerResource(RpcController controller, PrimitiveProtos.NullProto request,
-                                     RpcCallback<TajoMasterProtocol.WorkerResourcesRequest> done) {
-
-      TajoMasterProtocol.WorkerResourcesRequest.Builder builder =
-          TajoMasterProtocol.WorkerResourcesRequest.newBuilder();
-      Collection<Worker> workers = context.getResourceManager().getWorkers().values();
-
-      for(Worker worker: workers) {
-        WorkerResource resource = worker.getResource();
-
-        TajoMasterProtocol.WorkerResourceProto.Builder workerResource =
-            TajoMasterProtocol.WorkerResourceProto.newBuilder();
-
-        workerResource.setConnectionInfo(worker.getConnectionInfo().getProto());
-        workerResource.setMemoryMB(resource.getMemoryMB());
-        workerResource.setDiskSlots(resource.getDiskSlots());
-
-        builder.addWorkerResources(workerResource);
-      }
-      done.run(builder.build());
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/tajo/blob/807868bd/tajo-core/src/main/java/org/apache/tajo/master/exec/QueryExecutor.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/exec/QueryExecutor.java b/tajo-core/src/main/java/org/apache/tajo/master/exec/QueryExecutor.java
index 2fbebc1..0860d63 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/exec/QueryExecutor.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/exec/QueryExecutor.java
@@ -388,10 +388,10 @@ public class QueryExecutor {
     context.getSystemMetrics().counter("Query", "numDMLQuery").inc();
     hookManager.doHooks(queryContext, plan);
 
-    QueryJobManager queryJobManager = this.context.getQueryJobManager();
+    QueryManager queryManager = this.context.getQueryJobManager();
     QueryInfo queryInfo;
 
-    queryInfo = queryJobManager.createNewQueryJob(session, queryContext, sql, jsonExpr, rootNode);
+    queryInfo = queryManager.scheduleQuery(session, queryContext, sql, jsonExpr, rootNode);
 
     if(queryInfo == null) {
       responseBuilder.setQueryId(QueryIdFactory.NULL_QUERY_ID.getProto());

http://git-wip-us.apache.org/repos/asf/tajo/blob/807868bd/tajo-core/src/main/java/org/apache/tajo/master/rm/TajoResourceTracker.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/rm/TajoResourceTracker.java b/tajo-core/src/main/java/org/apache/tajo/master/rm/TajoResourceTracker.java
index 831ce43..519aa9d 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/rm/TajoResourceTracker.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/rm/TajoResourceTracker.java
@@ -26,7 +26,8 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.service.AbstractService;
 import org.apache.tajo.conf.TajoConf;
-import org.apache.tajo.ipc.TajoMasterProtocol;
+import org.apache.tajo.ipc.QueryCoordinatorProtocol.ClusterResourceSummary;
+import org.apache.tajo.ipc.QueryCoordinatorProtocol.TajoHeartbeatResponse;
 import org.apache.tajo.ipc.TajoResourceTrackerProtocol;
 import org.apache.tajo.master.cluster.WorkerConnectionInfo;
 import org.apache.tajo.rpc.AsyncRpcServer;
@@ -36,8 +37,6 @@ import org.apache.tajo.util.ProtoUtil;
 import java.io.IOError;
 import java.net.InetSocketAddress;
 
-import static org.apache.tajo.ipc.TajoMasterProtocol.TajoHeartbeatResponse;
-import static org.apache.tajo.ipc.TajoMasterProtocol.TajoHeartbeatResponse.Builder;
 import static org.apache.tajo.ipc.TajoResourceTrackerProtocol.NodeHeartbeat;
 import static org.apache.tajo.ipc.TajoResourceTrackerProtocol.TajoResourceTrackerProtocolService;
 
@@ -110,7 +109,8 @@ public class TajoResourceTracker extends AbstractService implements TajoResource
   }
 
   /** The response builder */
-  private static final Builder builder = TajoHeartbeatResponse.newBuilder().setHeartbeatResult(ProtoUtil.TRUE);
+  private static final TajoHeartbeatResponse.Builder builder =
+      TajoHeartbeatResponse.newBuilder().setHeartbeatResult(ProtoUtil.TRUE);
 
   private static WorkerStatusEvent createStatusEvent(int workerId, NodeHeartbeat heartbeat) {
     return new WorkerStatusEvent(
@@ -204,7 +204,7 @@ public class TajoResourceTracker extends AbstractService implements TajoResource
     return new Worker(rmContext, workerResource, new WorkerConnectionInfo(request.getConnectionInfo()));
   }
 
-  public TajoMasterProtocol.ClusterResourceSummary getClusterResourceSummary() {
+  public ClusterResourceSummary getClusterResourceSummary() {
     int totalDiskSlots = 0;
     int totalCpuCoreSlots = 0;
     int totalMemoryMB = 0;
@@ -230,7 +230,7 @@ public class TajoResourceTracker extends AbstractService implements TajoResource
       }
     }
 
-    return TajoMasterProtocol.ClusterResourceSummary.newBuilder()
+    return ClusterResourceSummary.newBuilder()
         .setNumWorkers(rmContext.getWorkers().size())
         .setTotalCpuCoreSlots(totalCpuCoreSlots)
         .setTotalDiskSlots(totalDiskSlots)

http://git-wip-us.apache.org/repos/asf/tajo/blob/807868bd/tajo-core/src/main/java/org/apache/tajo/master/rm/TajoWorkerResourceManager.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/rm/TajoWorkerResourceManager.java b/tajo-core/src/main/java/org/apache/tajo/master/rm/TajoWorkerResourceManager.java
index 9f2a3d5..e5cf66c 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/rm/TajoWorkerResourceManager.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/rm/TajoWorkerResourceManager.java
@@ -34,9 +34,9 @@ import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.tajo.QueryId;
 import org.apache.tajo.conf.TajoConf;
 import org.apache.tajo.ipc.ContainerProtocol;
-import org.apache.tajo.ipc.TajoMasterProtocol;
+import org.apache.tajo.ipc.QueryCoordinatorProtocol.*;
+import org.apache.tajo.master.QueryInProgress;
 import org.apache.tajo.master.TajoMaster;
-import org.apache.tajo.querymaster.QueryInProgress;
 import org.apache.tajo.rpc.CallFuture;
 import org.apache.tajo.util.ApplicationIdUtils;
 
@@ -49,8 +49,6 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 
-import static org.apache.tajo.ipc.TajoMasterProtocol.*;
-
 
 /**
  * It manages all resources of tajo workers.
@@ -162,7 +160,7 @@ public class TajoWorkerResourceManager extends CompositeService implements Worke
   }
 
   @Override
-  public TajoMasterProtocol.ClusterResourceSummary getClusterResourceSummary() {
+  public ClusterResourceSummary getClusterResourceSummary() {
     return resourceTracker.getClusterResourceSummary();
   }
 
@@ -204,7 +202,7 @@ public class TajoWorkerResourceManager extends CompositeService implements Worke
     builder.setMinMemoryMBPerContainer(queryMasterDefaultMemoryMB);
     builder.setMaxDiskSlotPerContainer(queryMasterDefaultDiskSlot);
     builder.setMinDiskSlotPerContainer(queryMasterDefaultDiskSlot);
-    builder.setResourceRequestPriority(TajoMasterProtocol.ResourceRequestPriority.MEMORY);
+    builder.setResourceRequestPriority(ResourceRequestPriority.MEMORY);
     builder.setNumContainers(1);
     return builder.build();
   }
@@ -358,10 +356,10 @@ public class TajoWorkerResourceManager extends CompositeService implements Worke
 
     int allocatedResources = 0;
 
-    TajoMasterProtocol.ResourceRequestPriority resourceRequestPriority
+    ResourceRequestPriority resourceRequestPriority
       = resourceRequest.request.getResourceRequestPriority();
 
-    if(resourceRequestPriority == TajoMasterProtocol.ResourceRequestPriority.MEMORY) {
+    if(resourceRequestPriority == ResourceRequestPriority.MEMORY) {
       synchronized(rmContext) {
         List<Integer> randomWorkers = new ArrayList<Integer>(rmContext.getWorkers().keySet());
         Collections.shuffle(randomWorkers);

http://git-wip-us.apache.org/repos/asf/tajo/blob/807868bd/tajo-core/src/main/java/org/apache/tajo/master/rm/WorkerResourceManager.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/rm/WorkerResourceManager.java b/tajo-core/src/main/java/org/apache/tajo/master/rm/WorkerResourceManager.java
index 79ec0ac..662b699 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/rm/WorkerResourceManager.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/rm/WorkerResourceManager.java
@@ -22,16 +22,15 @@ import com.google.protobuf.RpcCallback;
 import org.apache.hadoop.service.Service;
 import org.apache.tajo.QueryId;
 import org.apache.tajo.ipc.ContainerProtocol;
-import org.apache.tajo.ipc.TajoMasterProtocol;
-import org.apache.tajo.querymaster.QueryInProgress;
+import org.apache.tajo.ipc.QueryCoordinatorProtocol;
+import org.apache.tajo.ipc.QueryCoordinatorProtocol.ClusterResourceSummary;
+import org.apache.tajo.ipc.QueryCoordinatorProtocol.WorkerResourceAllocationRequest;
+import org.apache.tajo.master.QueryInProgress;
 
 import java.io.IOException;
 import java.util.Collection;
 import java.util.Map;
 
-import static org.apache.tajo.ipc.TajoMasterProtocol.WorkerAllocatedResource;
-import static org.apache.tajo.ipc.TajoMasterProtocol.WorkerResourceAllocationResponse;
-
 /**
  * An interface of WorkerResourceManager which allows TajoMaster to request allocation for containers
  * and release the allocated containers.
@@ -45,7 +44,7 @@ public interface WorkerResourceManager extends Service {
    * @return A allocated container resource
    */
   @Deprecated
-  public WorkerAllocatedResource allocateQueryMaster(QueryInProgress queryInProgress);
+  public QueryCoordinatorProtocol.WorkerAllocatedResource allocateQueryMaster(QueryInProgress queryInProgress);
 
   /**
    * Request one or more resource containers. You can set the number of containers and resource capabilities, such as
@@ -55,8 +54,8 @@ public interface WorkerResourceManager extends Service {
    * @param request Request description
    * @param rpcCallBack Callback function
    */
-  public void allocateWorkerResources(TajoMasterProtocol.WorkerResourceAllocationRequest request,
-      RpcCallback<WorkerResourceAllocationResponse> rpcCallBack);
+  public void allocateWorkerResources(WorkerResourceAllocationRequest request,
+      RpcCallback<QueryCoordinatorProtocol.WorkerResourceAllocationResponse> rpcCallBack);
 
   /**
    * Release a container
@@ -100,7 +99,7 @@ public interface WorkerResourceManager extends Service {
    *
    * @return The overall summary of cluster resources
    */
-  public TajoMasterProtocol.ClusterResourceSummary getClusterResourceSummary();
+  public ClusterResourceSummary getClusterResourceSummary();
 
   /**
    *

http://git-wip-us.apache.org/repos/asf/tajo/blob/807868bd/tajo-core/src/main/java/org/apache/tajo/master/scheduler/Scheduler.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/scheduler/Scheduler.java b/tajo-core/src/main/java/org/apache/tajo/master/scheduler/Scheduler.java
index 02203a9..19493d7 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/scheduler/Scheduler.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/scheduler/Scheduler.java
@@ -19,7 +19,7 @@
 package org.apache.tajo.master.scheduler;
 
 import org.apache.tajo.QueryId;
-import org.apache.tajo.querymaster.QueryInProgress;
+import org.apache.tajo.master.QueryInProgress;
 
 import java.util.List;
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/807868bd/tajo-core/src/main/java/org/apache/tajo/master/scheduler/SimpleFifoScheduler.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/scheduler/SimpleFifoScheduler.java b/tajo-core/src/main/java/org/apache/tajo/master/scheduler/SimpleFifoScheduler.java
index a091ed5..6cb98eb 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/scheduler/SimpleFifoScheduler.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/scheduler/SimpleFifoScheduler.java
@@ -21,8 +21,8 @@ package org.apache.tajo.master.scheduler;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.tajo.QueryId;
-import org.apache.tajo.querymaster.QueryInProgress;
-import org.apache.tajo.master.QueryJobManager;
+import org.apache.tajo.master.QueryInProgress;
+import org.apache.tajo.master.QueryManager;
 
 import java.util.*;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -32,10 +32,10 @@ public class SimpleFifoScheduler implements Scheduler {
   private LinkedList<QuerySchedulingInfo> pool = new LinkedList<QuerySchedulingInfo>();
   private final Thread queryProcessor;
   private AtomicBoolean stopped = new AtomicBoolean();
-  private QueryJobManager manager;
+  private QueryManager manager;
   private Comparator<QuerySchedulingInfo> COMPARATOR = new SchedulingAlgorithms.FifoComparator();
 
-  public SimpleFifoScheduler(QueryJobManager manager) {
+  public SimpleFifoScheduler(QueryManager manager) {
     this.manager = manager;
     this.queryProcessor = new Thread(new QueryProcessor());
     this.queryProcessor.setName("Query Processor");

http://git-wip-us.apache.org/repos/asf/tajo/blob/807868bd/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryInProgress.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryInProgress.java b/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryInProgress.java
deleted file mode 100644
index f83f244..0000000
--- a/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryInProgress.java
+++ /dev/null
@@ -1,230 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.querymaster;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.util.StringUtils;
-import org.apache.tajo.QueryId;
-import org.apache.tajo.TajoProtos;
-import org.apache.tajo.engine.query.QueryContext;
-import org.apache.tajo.ipc.QueryMasterProtocol;
-import org.apache.tajo.ipc.QueryMasterProtocol.QueryMasterProtocolService;
-import org.apache.tajo.ipc.TajoWorkerProtocol;
-import org.apache.tajo.ipc.TajoWorkerProtocol.QueryExecutionRequestProto;
-import org.apache.tajo.master.QueryInfo;
-import org.apache.tajo.master.TajoMaster;
-import org.apache.tajo.master.rm.WorkerResourceManager;
-import org.apache.tajo.plan.logical.LogicalRootNode;
-import org.apache.tajo.rpc.NettyClientBase;
-import org.apache.tajo.rpc.NullCallback;
-import org.apache.tajo.rpc.RpcConnectionPool;
-import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos;
-import org.apache.tajo.session.Session;
-import org.apache.tajo.util.NetUtils;
-
-import java.net.InetSocketAddress;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-import static org.apache.tajo.ipc.TajoMasterProtocol.WorkerAllocatedResource;
-
-public class QueryInProgress {
-  private static final Log LOG = LogFactory.getLog(QueryInProgress.class.getName());
-
-  private QueryId queryId;
-
-  private Session session;
-
-  private LogicalRootNode plan;
-
-  private AtomicBoolean querySubmitted = new AtomicBoolean(false);
-
-  private AtomicBoolean stopped = new AtomicBoolean(false);
-
-  private QueryInfo queryInfo;
-
-  private final TajoMaster.MasterContext masterContext;
-
-  private NettyClientBase queryMasterRpc;
-
-  private QueryMasterProtocolService queryMasterRpcClient;
-
-  public QueryInProgress(
-      TajoMaster.MasterContext masterContext,
-      Session session,
-      QueryContext queryContext,
-      QueryId queryId, String sql, String jsonExpr, LogicalRootNode plan) {
-
-    this.masterContext = masterContext;
-    this.session = session;
-    this.queryId = queryId;
-    this.plan = plan;
-
-    queryInfo = new QueryInfo(queryId, queryContext, sql, jsonExpr);
-    queryInfo.setStartTime(System.currentTimeMillis());
-  }
-
-  public synchronized void kill() {
-    getQueryInfo().setQueryState(TajoProtos.QueryState.QUERY_KILLED);
-    if(queryMasterRpcClient != null){
-      queryMasterRpcClient.killQuery(null, queryId.getProto(), NullCallback.get());
-    }
-  }
-
-  public void stopProgress() {
-    if(stopped.getAndSet(true)) {
-      return;
-    }
-
-    LOG.info("=========================================================");
-    LOG.info("Stop query:" + queryId);
-
-    masterContext.getResourceManager().releaseQueryMaster(queryId);
-
-    if(queryMasterRpc != null) {
-      RpcConnectionPool.getPool(masterContext.getConf()).closeConnection(queryMasterRpc);
-    }
-
-    masterContext.getHistoryWriter().appendHistory(queryInfo);
-  }
-
-  public boolean startQueryMaster() {
-    try {
-      LOG.info("Initializing QueryInProgress for QueryID=" + queryId);
-      WorkerResourceManager resourceManager = masterContext.getResourceManager();
-      WorkerAllocatedResource resource = resourceManager.allocateQueryMaster(this);
-
-      // if no resource to allocate a query master
-      if(resource == null) {
-        LOG.info("No Available Resources for QueryMaster");
-        return false;
-      }
-
-      queryInfo.setQueryMaster(resource.getConnectionInfo().getHost());
-      queryInfo.setQueryMasterPort(resource.getConnectionInfo().getQueryMasterPort());
-      queryInfo.setQueryMasterclientPort(resource.getConnectionInfo().getClientPort());
-      queryInfo.setQueryMasterInfoPort(resource.getConnectionInfo().getHttpInfoPort());
-
-      return true;
-    } catch (Exception e) {
-      catchException(e);
-      return false;
-    }
-  }
-
-  private void connectQueryMaster() throws Exception {
-    InetSocketAddress addr = NetUtils.createSocketAddr(queryInfo.getQueryMasterHost(), queryInfo.getQueryMasterPort());
-    LOG.info("Connect to QueryMaster:" + addr);
-    queryMasterRpc =
-        RpcConnectionPool.getPool(masterContext.getConf()).getConnection(addr, QueryMasterProtocol.class, true);
-    queryMasterRpcClient = queryMasterRpc.getStub();
-  }
-
-  public synchronized void submmitQueryToMaster() {
-    if(querySubmitted.get()) {
-      return;
-    }
-
-    try {
-      if(queryMasterRpcClient == null) {
-        connectQueryMaster();
-      }
-      if(queryMasterRpcClient == null) {
-        LOG.info("No QueryMaster conneciton info.");
-        //TODO wait
-        return;
-      }
-      LOG.info("Call executeQuery to :" +
-          queryInfo.getQueryMasterHost() + ":" + queryInfo.getQueryMasterPort() + "," + queryId);
-
-      QueryExecutionRequestProto.Builder builder = TajoWorkerProtocol.QueryExecutionRequestProto.newBuilder();
-      builder.setQueryId(queryId.getProto())
-          .setQueryContext(queryInfo.getQueryContext().getProto())
-          .setSession(session.getProto())
-          .setExprInJson(PrimitiveProtos.StringProto.newBuilder().setValue(queryInfo.getJsonExpr()))
-          .setLogicalPlanJson(PrimitiveProtos.StringProto.newBuilder().setValue(plan.toJson()).build());
-
-      queryMasterRpcClient.executeQuery(null, builder.build(), NullCallback.get());
-      querySubmitted.set(true);
-    } catch (Exception e) {
-      LOG.error(e.getMessage(), e);
-    }
-  }
-
-  public void catchException(Exception e) {
-    LOG.error(e.getMessage(), e);
-    queryInfo.setQueryState(TajoProtos.QueryState.QUERY_FAILED);
-    queryInfo.setLastMessage(StringUtils.stringifyException(e));
-  }
-
-  public QueryId getQueryId() {
-    return queryId;
-  }
-
-  public QueryInfo getQueryInfo() {
-    return this.queryInfo;
-  }
-
-  public boolean isStarted() {
-    return !stopped.get() && this.querySubmitted.get();
-  }
-
-  public void heartbeat(QueryInfo queryInfo) {
-    LOG.info("Received QueryMaster heartbeat:" + queryInfo);
-
-    // to avoid partial update by different heartbeats
-    synchronized (this.queryInfo) {
-
-      // terminal state will let client to retrieve a query result
-      // So, we must set the query result before changing query state
-      if (isFinishState(queryInfo.getQueryState())) {
-        if (queryInfo.hasResultdesc()) {
-          this.queryInfo.setResultDesc(queryInfo.getResultDesc());
-        }
-      }
-
-      this.queryInfo.setQueryState(queryInfo.getQueryState());
-      this.queryInfo.setProgress(queryInfo.getProgress());
-      this.queryInfo.setFinishTime(queryInfo.getFinishTime());
-
-      // Update diagnosis message
-      if (queryInfo.getLastMessage() != null && !queryInfo.getLastMessage().isEmpty()) {
-        this.queryInfo.setLastMessage(queryInfo.getLastMessage());
-        LOG.info(queryId + queryInfo.getLastMessage());
-      }
-
-      // if any error occurs, print outs the error message
-      if (this.queryInfo.getQueryState() == TajoProtos.QueryState.QUERY_FAILED) {
-        LOG.warn(queryId + " failed, " + queryInfo.getLastMessage());
-      }
-
-
-      if (isFinishState(this.queryInfo.getQueryState())) {
-        masterContext.getQueryJobManager().getEventHandler().handle(
-            new QueryJobEvent(QueryJobEvent.Type.QUERY_JOB_STOP, this.queryInfo));
-      }
-    }
-  }
-
-  private boolean isFinishState(TajoProtos.QueryState state) {
-    return state == TajoProtos.QueryState.QUERY_FAILED ||
-        state == TajoProtos.QueryState.QUERY_KILLED ||
-        state == TajoProtos.QueryState.QUERY_SUCCEEDED;
-  }
-}

http://git-wip-us.apache.org/repos/asf/tajo/blob/807868bd/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMaster.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMaster.java b/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMaster.java
index 02760a3..53390a1 100644
--- a/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMaster.java
+++ b/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMaster.java
@@ -34,7 +34,8 @@ import org.apache.tajo.conf.TajoConf;
 import org.apache.tajo.engine.planner.global.GlobalPlanner;
 import org.apache.tajo.engine.query.QueryContext;
 import org.apache.tajo.ha.HAServiceUtil;
-import org.apache.tajo.ipc.TajoMasterProtocol;
+import org.apache.tajo.ipc.QueryCoordinatorProtocol;
+import org.apache.tajo.ipc.QueryCoordinatorProtocol.*;
 import org.apache.tajo.ipc.TajoWorkerProtocol;
 import org.apache.tajo.master.event.QueryStartEvent;
 import org.apache.tajo.master.event.QueryStopEvent;
@@ -56,10 +57,6 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 
-import static org.apache.tajo.ipc.TajoMasterProtocol.TajoHeartbeat;
-import static org.apache.tajo.ipc.TajoMasterProtocol.TajoHeartbeatResponse;
-
-// TODO - when exception, send error status to QueryJobManager
 public class QueryMaster extends CompositeService implements EventHandler {
   private static final Log LOG = LogFactory.getLog(QueryMaster.class.getName());
 
@@ -182,12 +179,12 @@ public class QueryMaster extends CompositeService implements EventHandler {
     }
     LOG.info("cleanup executionBlocks: " + cleanupMessage);
     NettyClientBase rpc = null;
-    List<TajoMasterProtocol.WorkerResourceProto> workers = getAllWorker();
+    List<WorkerResourceProto> workers = getAllWorker();
     TajoWorkerProtocol.ExecutionBlockListProto.Builder builder = TajoWorkerProtocol.ExecutionBlockListProto.newBuilder();
     builder.addAllExecutionBlockId(Lists.newArrayList(executionBlockIds));
     TajoWorkerProtocol.ExecutionBlockListProto executionBlockListProto = builder.build();
 
-    for (TajoMasterProtocol.WorkerResourceProto worker : workers) {
+    for (WorkerResourceProto worker : workers) {
       try {
         TajoProtos.WorkerConnectionInfoProto connectionInfo = worker.getConnectionInfo();
         rpc = connPool.getConnection(NetUtils.createSocketAddr(connectionInfo.getHost(), connectionInfo.getPeerRpcPort()),
@@ -206,9 +203,9 @@ public class QueryMaster extends CompositeService implements EventHandler {
   private void cleanup(QueryId queryId) {
     LOG.info("cleanup query resources : " + queryId);
     NettyClientBase rpc = null;
-    List<TajoMasterProtocol.WorkerResourceProto> workers = getAllWorker();
+    List<WorkerResourceProto> workers = getAllWorker();
 
-    for (TajoMasterProtocol.WorkerResourceProto worker : workers) {
+    for (WorkerResourceProto worker : workers) {
       try {
         TajoProtos.WorkerConnectionInfoProto connectionInfo = worker.getConnectionInfo();
         rpc = connPool.getConnection(NetUtils.createSocketAddr(connectionInfo.getHost(), connectionInfo.getPeerRpcPort()),
@@ -224,7 +221,7 @@ public class QueryMaster extends CompositeService implements EventHandler {
     }
   }
 
-  public List<TajoMasterProtocol.WorkerResourceProto> getAllWorker() {
+  public List<WorkerResourceProto> getAllWorker() {
 
     NettyClientBase rpc = null;
     try {
@@ -235,78 +232,34 @@ public class QueryMaster extends CompositeService implements EventHandler {
       if (systemConf.getBoolVar(TajoConf.ConfVars.TAJO_MASTER_HA_ENABLE)) {
         try {
           rpc = connPool.getConnection(queryMasterContext.getWorkerContext().getTajoMasterAddress(),
-              TajoMasterProtocol.class, true);
+              QueryCoordinatorProtocol.class, true);
         } catch (Exception e) {
           queryMasterContext.getWorkerContext().setWorkerResourceTrackerAddr(
               HAServiceUtil.getResourceTrackerAddress(systemConf));
           queryMasterContext.getWorkerContext().setTajoMasterAddress(
               HAServiceUtil.getMasterUmbilicalAddress(systemConf));
           rpc = connPool.getConnection(queryMasterContext.getWorkerContext().getTajoMasterAddress(),
-              TajoMasterProtocol.class, true);
+              QueryCoordinatorProtocol.class, true);
         }
       } else {
         rpc = connPool.getConnection(queryMasterContext.getWorkerContext().getTajoMasterAddress(),
-            TajoMasterProtocol.class, true);
+            QueryCoordinatorProtocol.class, true);
       }
 
-      TajoMasterProtocol.TajoMasterProtocolService masterService = rpc.getStub();
+      QueryCoordinatorProtocolService masterService = rpc.getStub();
 
-      CallFuture<TajoMasterProtocol.WorkerResourcesRequest> callBack =
-          new CallFuture<TajoMasterProtocol.WorkerResourcesRequest>();
+      CallFuture<WorkerResourcesRequest> callBack = new CallFuture<WorkerResourcesRequest>();
       masterService.getAllWorkerResource(callBack.getController(),
           PrimitiveProtos.NullProto.getDefaultInstance(), callBack);
 
-      TajoMasterProtocol.WorkerResourcesRequest workerResourcesRequest = callBack.get(2, TimeUnit.SECONDS);
+      WorkerResourcesRequest workerResourcesRequest = callBack.get(2, TimeUnit.SECONDS);
       return workerResourcesRequest.getWorkerResourcesList();
     } catch (Exception e) {
       LOG.error(e.getMessage(), e);
     } finally {
       connPool.releaseConnection(rpc);
     }
-    return new ArrayList<TajoMasterProtocol.WorkerResourceProto>();
-  }
-
-  public void reportQueryStatusToQueryMaster(QueryId queryId, TajoProtos.QueryState state) {
-    LOG.info("Send QueryMaster Ready to QueryJobManager:" + queryId);
-    NettyClientBase tmClient = null;
-    try {
-      // In TajoMaster HA mode, if backup master be active status,
-      // worker may fail to connect existing active master. Thus,
-      // if worker can't connect the master, worker should try to connect another master and
-      // update master address in worker context.
-      if (systemConf.getBoolVar(TajoConf.ConfVars.TAJO_MASTER_HA_ENABLE)) {
-        try {
-          tmClient = connPool.getConnection(queryMasterContext.getWorkerContext().getTajoMasterAddress(),
-              TajoMasterProtocol.class, true);
-        } catch (Exception e) {
-          queryMasterContext.getWorkerContext().setWorkerResourceTrackerAddr(
-              HAServiceUtil.getResourceTrackerAddress(systemConf));
-          queryMasterContext.getWorkerContext().setTajoMasterAddress(
-              HAServiceUtil.getMasterUmbilicalAddress(systemConf));
-          tmClient = connPool.getConnection(queryMasterContext.getWorkerContext().getTajoMasterAddress(),
-              TajoMasterProtocol.class, true);
-        }
-      } else {
-        tmClient = connPool.getConnection(queryMasterContext.getWorkerContext().getTajoMasterAddress(),
-            TajoMasterProtocol.class, true);
-      }
-
-      TajoMasterProtocol.TajoMasterProtocolService masterClientService = tmClient.getStub();
-
-      TajoHeartbeat.Builder queryHeartbeatBuilder = TajoHeartbeat.newBuilder()
-          .setConnectionInfo(workerContext.getConnectionInfo().getProto())
-          .setState(state)
-          .setQueryId(queryId.getProto());
-
-      CallFuture<TajoHeartbeatResponse> callBack =
-          new CallFuture<TajoHeartbeatResponse>();
-
-      masterClientService.heartbeat(callBack.getController(), queryHeartbeatBuilder.build(), callBack);
-    } catch (Exception e) {
-      LOG.error(e.getMessage(), e);
-    } finally {
-      connPool.releaseConnection(tmClient);
-    }
+    return new ArrayList<WorkerResourceProto>();
   }
 
   @Override
@@ -407,19 +360,19 @@ public class QueryMaster extends CompositeService implements EventHandler {
         if (systemConf.getBoolVar(TajoConf.ConfVars.TAJO_MASTER_HA_ENABLE)) {
           try {
             tmClient = connPool.getConnection(queryMasterContext.getWorkerContext().getTajoMasterAddress(),
-                TajoMasterProtocol.class, true);
+                QueryCoordinatorProtocol.class, true);
           } catch (Exception e) {
             queryMasterContext.getWorkerContext().setWorkerResourceTrackerAddr(HAServiceUtil.getResourceTrackerAddress(systemConf));
             queryMasterContext.getWorkerContext().setTajoMasterAddress(HAServiceUtil.getMasterUmbilicalAddress(systemConf));
             tmClient = connPool.getConnection(queryMasterContext.getWorkerContext().getTajoMasterAddress(),
-                TajoMasterProtocol.class, true);
+                QueryCoordinatorProtocol.class, true);
           }
         } else {
           tmClient = connPool.getConnection(queryMasterContext.getWorkerContext().getTajoMasterAddress(),
-              TajoMasterProtocol.class, true);
+              QueryCoordinatorProtocol.class, true);
         }
 
-        TajoMasterProtocol.TajoMasterProtocolService masterClientService = tmClient.getStub();
+        QueryCoordinatorProtocolService masterClientService = tmClient.getStub();
         masterClientService.heartbeat(future.getController(), queryHeartbeat, future);
       }  catch (Exception e) {
         //this function will be closed in new thread.
@@ -524,24 +477,24 @@ public class QueryMaster extends CompositeService implements EventHandler {
               if (systemConf.getBoolVar(TajoConf.ConfVars.TAJO_MASTER_HA_ENABLE)) {
                 try {
                   tmClient = connPool.getConnection(queryMasterContext.getWorkerContext().getTajoMasterAddress(),
-                      TajoMasterProtocol.class, true);
+                      QueryCoordinatorProtocol.class, true);
                 } catch (Exception e) {
                   queryMasterContext.getWorkerContext().setWorkerResourceTrackerAddr(
                       HAServiceUtil.getResourceTrackerAddress(systemConf));
                   queryMasterContext.getWorkerContext().setTajoMasterAddress(
                       HAServiceUtil.getMasterUmbilicalAddress(systemConf));
                   tmClient = connPool.getConnection(queryMasterContext.getWorkerContext().getTajoMasterAddress(),
-                      TajoMasterProtocol.class, true);
+                      QueryCoordinatorProtocol.class, true);
                 }
               } else {
                 tmClient = connPool.getConnection(queryMasterContext.getWorkerContext().getTajoMasterAddress(),
-                    TajoMasterProtocol.class, true);
+                    QueryCoordinatorProtocol.class, true);
               }
 
-              TajoMasterProtocol.TajoMasterProtocolService masterClientService = tmClient.getStub();
+              QueryCoordinatorProtocolService masterClientService = tmClient.getStub();
 
-              CallFuture<TajoHeartbeatResponse> callBack =
-                  new CallFuture<TajoHeartbeatResponse>();
+              CallFuture<QueryCoordinatorProtocol.TajoHeartbeatResponse> callBack =
+                  new CallFuture<QueryCoordinatorProtocol.TajoHeartbeatResponse>();
 
               TajoHeartbeat queryHeartbeat = buildTajoHeartBeat(eachTask);
               masterClientService.heartbeat(callBack.getController(), queryHeartbeat, callBack);