You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tajo.apache.org by ji...@apache.org on 2015/01/09 15:34:22 UTC

[6/8] tajo git commit: TAJO-1291: Rename TajoMasterProtocol to QueryCoordinatorProtocol.

http://git-wip-us.apache.org/repos/asf/tajo/blob/807868bd/tajo-core/src/main/java/org/apache/tajo/querymaster/Stage.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/querymaster/Stage.java b/tajo-core/src/main/java/org/apache/tajo/querymaster/Stage.java
index 1ea7051..13394f8 100644
--- a/tajo-core/src/main/java/org/apache/tajo/querymaster/Stage.java
+++ b/tajo-core/src/main/java/org/apache/tajo/querymaster/Stage.java
@@ -45,7 +45,6 @@ import org.apache.tajo.engine.planner.enforce.Enforcer;
 import org.apache.tajo.engine.planner.global.DataChannel;
 import org.apache.tajo.engine.planner.global.ExecutionBlock;
 import org.apache.tajo.engine.planner.global.MasterPlan;
-import org.apache.tajo.ipc.TajoMasterProtocol;
 import org.apache.tajo.ipc.TajoWorkerProtocol;
 import org.apache.tajo.ipc.TajoWorkerProtocol.DistinctGroupbyEnforcer.MultipleAggregationStage;
 import org.apache.tajo.ipc.TajoWorkerProtocol.EnforceProperty;
@@ -838,22 +837,6 @@ public class Stage implements EventHandler<StageEvent> {
     }
 
     /**
-     * Getting the total memory of cluster
-     *
-     * @param stage
-     * @return mega bytes
-     */
-    private static int getClusterTotalMemory(Stage stage) {
-      List<TajoMasterProtocol.WorkerResourceProto> workers =
-          stage.context.getQueryMasterContext().getQueryMaster().getAllWorker();
-
-      int totalMem = 0;
-      for (TajoMasterProtocol.WorkerResourceProto worker : workers) {
-        totalMem += worker.getMemoryMB();
-      }
-      return totalMem;
-    }
-    /**
      * Getting the desire number of partitions according to the volume of input data.
      * This method is only used to determine the partition key number of hash join or aggregation.
      *

http://git-wip-us.apache.org/repos/asf/tajo/blob/807868bd/tajo-core/src/main/java/org/apache/tajo/util/JSPUtil.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/util/JSPUtil.java b/tajo-core/src/main/java/org/apache/tajo/util/JSPUtil.java
index d711258..82fb37f 100644
--- a/tajo-core/src/main/java/org/apache/tajo/util/JSPUtil.java
+++ b/tajo-core/src/main/java/org/apache/tajo/util/JSPUtil.java
@@ -25,7 +25,7 @@ import org.apache.tajo.catalog.statistics.TableStats;
 import org.apache.tajo.conf.TajoConf;
 import org.apache.tajo.master.TajoMaster.MasterContext;
 import org.apache.tajo.ha.HAService;
-import org.apache.tajo.querymaster.QueryInProgress;
+import org.apache.tajo.master.QueryInProgress;
 import org.apache.tajo.querymaster.QueryMasterTask;
 import org.apache.tajo.querymaster.Task;
 import org.apache.tajo.querymaster.Stage;

http://git-wip-us.apache.org/repos/asf/tajo/blob/807868bd/tajo-core/src/main/java/org/apache/tajo/worker/TajoResourceAllocator.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/TajoResourceAllocator.java b/tajo-core/src/main/java/org/apache/tajo/worker/TajoResourceAllocator.java
index 8241478..04b65d2 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/TajoResourceAllocator.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/TajoResourceAllocator.java
@@ -28,8 +28,10 @@ import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationAttemptIdPBImpl;
 import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.tajo.ExecutionBlockId;
 import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.ha.HAServiceUtil;
 import org.apache.tajo.ipc.ContainerProtocol;
-import org.apache.tajo.ipc.TajoMasterProtocol;
+import org.apache.tajo.ipc.QueryCoordinatorProtocol;
+import org.apache.tajo.ipc.QueryCoordinatorProtocol.*;
 import org.apache.tajo.ipc.TajoWorkerProtocol;
 import org.apache.tajo.master.*;
 import org.apache.tajo.master.cluster.WorkerConnectionInfo;
@@ -38,16 +40,18 @@ import org.apache.tajo.master.container.TajoContainerId;
 import org.apache.tajo.master.event.ContainerAllocationEvent;
 import org.apache.tajo.master.event.ContainerAllocatorEventType;
 import org.apache.tajo.master.event.StageContainerAllocationEvent;
+import org.apache.tajo.master.rm.TajoWorkerContainer;
+import org.apache.tajo.master.rm.TajoWorkerContainerId;
+import org.apache.tajo.master.rm.Worker;
+import org.apache.tajo.master.rm.WorkerResource;
 import org.apache.tajo.querymaster.QueryMasterTask;
 import org.apache.tajo.querymaster.Stage;
 import org.apache.tajo.querymaster.StageState;
-import org.apache.tajo.master.rm.*;
 import org.apache.tajo.rpc.CallFuture;
 import org.apache.tajo.rpc.NettyClientBase;
 import org.apache.tajo.rpc.NullCallback;
 import org.apache.tajo.rpc.RpcConnectionPool;
 import org.apache.tajo.util.ApplicationIdUtils;
-import org.apache.tajo.ha.HAServiceUtil;
 
 import java.net.InetSocketAddress;
 import java.util.*;
@@ -91,7 +95,7 @@ public class TajoResourceAllocator extends AbstractResourceAllocator {
                                            int memoryMBPerTask) {
     //TODO consider disk slot
 
-    TajoMasterProtocol.ClusterResourceSummary clusterResource = workerContext.getClusterResource();
+    ClusterResourceSummary clusterResource = workerContext.getClusterResource();
     int clusterSlots = clusterResource == null ? 0 : clusterResource.getTotalMemoryMB() / memoryMBPerTask;
     clusterSlots =  Math.max(1, clusterSlots - 1); // reserve query master slot
     LOG.info("CalculateNumberRequestContainer - Number of Tasks=" + numTasks +
@@ -249,20 +253,19 @@ public class TajoResourceAllocator extends AbstractResourceAllocator {
     @Override
     public void run() {
       LOG.info("Start TajoWorkerAllocationThread");
-      CallFuture<TajoMasterProtocol.WorkerResourceAllocationResponse> callBack =
-        new CallFuture<TajoMasterProtocol.WorkerResourceAllocationResponse>();
+      CallFuture<WorkerResourceAllocationResponse> callBack =
+        new CallFuture<WorkerResourceAllocationResponse>();
 
       //TODO consider task's resource usage pattern
       int requiredMemoryMB = tajoConf.getIntVar(TajoConf.ConfVars.TASK_DEFAULT_MEMORY);
       float requiredDiskSlots = tajoConf.getFloatVar(TajoConf.ConfVars.TASK_DEFAULT_DISK);
 
-      TajoMasterProtocol.WorkerResourceAllocationRequest request =
-        TajoMasterProtocol.WorkerResourceAllocationRequest.newBuilder()
+      WorkerResourceAllocationRequest request = WorkerResourceAllocationRequest.newBuilder()
           .setMinMemoryMBPerContainer(requiredMemoryMB)
           .setMaxMemoryMBPerContainer(requiredMemoryMB)
           .setNumContainers(event.getRequiredNum())
-          .setResourceRequestPriority(!event.isLeafQuery() ? TajoMasterProtocol.ResourceRequestPriority.MEMORY
-            : TajoMasterProtocol.ResourceRequestPriority.DISK)
+          .setResourceRequestPriority(!event.isLeafQuery() ?
+              ResourceRequestPriority.MEMORY : ResourceRequestPriority.DISK)
           .setMinDiskSlotPerContainer(requiredDiskSlots)
           .setMaxDiskSlotPerContainer(requiredDiskSlots)
           .setQueryId(event.getExecutionBlockId().getQueryId().getProto())
@@ -280,7 +283,7 @@ public class TajoResourceAllocator extends AbstractResourceAllocator {
           try {
             tmClient = connPool.getConnection(
               queryTaskContext.getQueryMasterContext().getWorkerContext().getTajoMasterAddress(),
-              TajoMasterProtocol.class, true);
+              QueryCoordinatorProtocol.class, true);
           } catch (Exception e) {
             queryTaskContext.getQueryMasterContext().getWorkerContext().
               setWorkerResourceTrackerAddr(HAServiceUtil.getResourceTrackerAddress(tajoConf));
@@ -288,15 +291,15 @@ public class TajoResourceAllocator extends AbstractResourceAllocator {
               setTajoMasterAddress(HAServiceUtil.getMasterUmbilicalAddress(tajoConf));
             tmClient = connPool.getConnection(
               queryTaskContext.getQueryMasterContext().getWorkerContext().getTajoMasterAddress(),
-              TajoMasterProtocol.class, true);
+              QueryCoordinatorProtocol.class, true);
           }
         } else {
           tmClient = connPool.getConnection(
             queryTaskContext.getQueryMasterContext().getWorkerContext().getTajoMasterAddress(),
-            TajoMasterProtocol.class, true);
+            QueryCoordinatorProtocol.class, true);
         }
 
-        TajoMasterProtocol.TajoMasterProtocolService masterClientService = tmClient.getStub();
+        QueryCoordinatorProtocolService masterClientService = tmClient.getStub();
         masterClientService.allocateWorkerResources(null, request, callBack);
       } catch (Exception e) {
         LOG.error(e.getMessage(), e);
@@ -304,7 +307,7 @@ public class TajoResourceAllocator extends AbstractResourceAllocator {
         connPool.releaseConnection(tmClient);
       }
 
-      TajoMasterProtocol.WorkerResourceAllocationResponse response = null;
+      WorkerResourceAllocationResponse response = null;
       while(!stopped.get()) {
         try {
           response = callBack.get(3, TimeUnit.SECONDS);
@@ -321,11 +324,11 @@ public class TajoResourceAllocator extends AbstractResourceAllocator {
       int numAllocatedContainers = 0;
 
       if(response != null) {
-        List<TajoMasterProtocol.WorkerAllocatedResource> allocatedResources = response.getWorkerAllocatedResourceList();
+        List<WorkerAllocatedResource> allocatedResources = response.getWorkerAllocatedResourceList();
         ExecutionBlockId executionBlockId = event.getExecutionBlockId();
 
         List<TajoContainer> containers = new ArrayList<TajoContainer>();
-        for(TajoMasterProtocol.WorkerAllocatedResource eachAllocatedResource: allocatedResources) {
+        for(WorkerAllocatedResource eachAllocatedResource: allocatedResources) {
           TajoWorkerContainer container = new TajoWorkerContainer();
           NodeId nodeId = NodeId.newInstance(eachAllocatedResource.getConnectionInfo().getHost(),
             eachAllocatedResource.getConnectionInfo().getPeerRpcPort());

http://git-wip-us.apache.org/repos/asf/tajo/blob/807868bd/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorker.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorker.java b/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorker.java
index 09a87e0..4003014 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorker.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorker.java
@@ -20,7 +20,6 @@ package org.apache.tajo.worker;
 
 import com.codahale.metrics.Gauge;
 import com.google.common.annotations.VisibleForTesting;
-
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -36,13 +35,13 @@ import org.apache.tajo.catalog.CatalogClient;
 import org.apache.tajo.catalog.CatalogService;
 import org.apache.tajo.conf.TajoConf;
 import org.apache.tajo.ha.HAServiceUtil;
-import org.apache.tajo.ipc.TajoMasterProtocol;
-import org.apache.tajo.master.cluster.WorkerConnectionInfo;
 import org.apache.tajo.ha.TajoMasterInfo;
-import org.apache.tajo.querymaster.QueryMaster;
-import org.apache.tajo.querymaster.QueryMasterManagerService;
+import org.apache.tajo.ipc.QueryCoordinatorProtocol.ClusterResourceSummary;
+import org.apache.tajo.master.cluster.WorkerConnectionInfo;
 import org.apache.tajo.master.rm.TajoWorkerResourceManager;
 import org.apache.tajo.pullserver.TajoPullServerService;
+import org.apache.tajo.querymaster.QueryMaster;
+import org.apache.tajo.querymaster.QueryMasterManagerService;
 import org.apache.tajo.rpc.RpcChannelFactory;
 import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos;
 import org.apache.tajo.rule.EvaluationContext;
@@ -50,7 +49,10 @@ import org.apache.tajo.rule.EvaluationFailedException;
 import org.apache.tajo.rule.SelfDiagnosisRuleEngine;
 import org.apache.tajo.rule.SelfDiagnosisRuleSession;
 import org.apache.tajo.storage.HashShuffleAppenderManager;
-import org.apache.tajo.util.*;
+import org.apache.tajo.util.CommonTestingUtil;
+import org.apache.tajo.util.JvmPauseMonitor;
+import org.apache.tajo.util.NetUtils;
+import org.apache.tajo.util.StringUtils;
 import org.apache.tajo.util.history.HistoryReader;
 import org.apache.tajo.util.history.HistoryWriter;
 import org.apache.tajo.util.metrics.TajoSystemMetrics;
@@ -115,7 +117,7 @@ public class TajoWorker extends CompositeService {
 
   private AtomicInteger numClusterNodes = new AtomicInteger();
 
-  private TajoMasterProtocol.ClusterResourceSummary clusterResource;
+  private ClusterResourceSummary clusterResource;
 
   private WorkerConnectionInfo connectionInfo;
 
@@ -516,13 +518,13 @@ public class TajoWorker extends CompositeService {
       return TajoWorker.this.numClusterNodes.get();
     }
 
-    public void setClusterResource(TajoMasterProtocol.ClusterResourceSummary clusterResource) {
+    public void setClusterResource(ClusterResourceSummary clusterResource) {
       synchronized (numClusterNodes) {
         TajoWorker.this.clusterResource = clusterResource;
       }
     }
 
-    public TajoMasterProtocol.ClusterResourceSummary getClusterResource() {
+    public ClusterResourceSummary getClusterResource() {
       synchronized (numClusterNodes) {
         return TajoWorker.this.clusterResource;
       }

http://git-wip-us.apache.org/repos/asf/tajo/blob/807868bd/tajo-core/src/main/java/org/apache/tajo/worker/WorkerHeartbeatService.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/WorkerHeartbeatService.java b/tajo-core/src/main/java/org/apache/tajo/worker/WorkerHeartbeatService.java
index c809921..b92c4cd 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/WorkerHeartbeatService.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/WorkerHeartbeatService.java
@@ -26,7 +26,10 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.service.AbstractService;
 import org.apache.tajo.conf.TajoConf;
-import org.apache.tajo.ipc.TajoMasterProtocol;
+import org.apache.tajo.ha.HAServiceUtil;
+import org.apache.tajo.ipc.QueryCoordinatorProtocol.ClusterResourceSummary;
+import org.apache.tajo.ipc.QueryCoordinatorProtocol.ServerStatusProto;
+import org.apache.tajo.ipc.QueryCoordinatorProtocol.TajoHeartbeatResponse;
 import org.apache.tajo.ipc.TajoResourceTrackerProtocol;
 import org.apache.tajo.rpc.CallFuture;
 import org.apache.tajo.rpc.NettyClientBase;
@@ -35,7 +38,6 @@ import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos;
 import org.apache.tajo.storage.DiskDeviceInfo;
 import org.apache.tajo.storage.DiskMountInfo;
 import org.apache.tajo.storage.DiskUtil;
-import org.apache.tajo.ha.HAServiceUtil;
 
 import java.io.File;
 import java.util.List;
@@ -98,8 +100,8 @@ public class WorkerHeartbeatService extends AbstractService {
 
   class WorkerHeartbeatThread extends Thread {
     private volatile AtomicBoolean stopped = new AtomicBoolean(false);
-    TajoMasterProtocol.ServerStatusProto.System systemInfo;
-    List<TajoMasterProtocol.ServerStatusProto.Disk> diskInfos = Lists.newArrayList();
+    ServerStatusProto.System systemInfo;
+    List<ServerStatusProto.Disk> diskInfos = Lists.newArrayList();
     float workerDiskSlots;
     int workerMemoryMB;
     List<DiskDeviceInfo> diskDeviceInfos;
@@ -137,7 +139,7 @@ public class WorkerHeartbeatService extends AbstractService {
         }
       }
 
-      systemInfo = TajoMasterProtocol.ServerStatusProto.System.newBuilder()
+      systemInfo = ServerStatusProto.System.newBuilder()
           .setAvailableProcessors(workerCpuCoreNum)
           .setFreeMemoryMB(0)
           .setMaxMemoryMB(0)
@@ -153,14 +155,14 @@ public class WorkerHeartbeatService extends AbstractService {
         if(sendDiskInfoCount == 0 && diskDeviceInfos != null) {
           getDiskUsageInfos();
         }
-        TajoMasterProtocol.ServerStatusProto.JvmHeap jvmHeap =
-            TajoMasterProtocol.ServerStatusProto.JvmHeap.newBuilder()
+        ServerStatusProto.JvmHeap jvmHeap =
+            ServerStatusProto.JvmHeap.newBuilder()
                 .setMaxHeap(Runtime.getRuntime().maxMemory())
                 .setFreeHeap(Runtime.getRuntime().freeMemory())
                 .setTotalHeap(Runtime.getRuntime().totalMemory())
                 .build();
 
-        TajoMasterProtocol.ServerStatusProto serverStatus = TajoMasterProtocol.ServerStatusProto.newBuilder()
+        ServerStatusProto serverStatus = ServerStatusProto.newBuilder()
             .addAllDisk(diskInfos)
             .setRunningTaskNum(
                 context.getTaskRunnerManager() == null ? 1 : context.getTaskRunnerManager().getNumTasks())
@@ -179,8 +181,7 @@ public class WorkerHeartbeatService extends AbstractService {
 
         NettyClientBase rmClient = null;
         try {
-          CallFuture<TajoMasterProtocol.TajoHeartbeatResponse> callBack =
-              new CallFuture<TajoMasterProtocol.TajoHeartbeatResponse>();
+          CallFuture<TajoHeartbeatResponse> callBack = new CallFuture<TajoHeartbeatResponse>();
 
           // In TajoMaster HA mode, if backup master be active status,
           // worker may fail to connect existing active master. Thus,
@@ -201,9 +202,9 @@ public class WorkerHeartbeatService extends AbstractService {
           TajoResourceTrackerProtocol.TajoResourceTrackerProtocolService resourceTracker = rmClient.getStub();
           resourceTracker.heartbeat(callBack.getController(), heartbeatProto, callBack);
 
-          TajoMasterProtocol.TajoHeartbeatResponse response = callBack.get(2, TimeUnit.SECONDS);
+          TajoHeartbeatResponse response = callBack.get(2, TimeUnit.SECONDS);
           if(response != null) {
-            TajoMasterProtocol.ClusterResourceSummary clusterResourceSummary = response.getClusterResourceSummary();
+            ClusterResourceSummary clusterResourceSummary = response.getClusterResourceSummary();
             if(clusterResourceSummary.getNumWorkers() > 0) {
               context.setNumClusterNodes(clusterResourceSummary.getNumWorkers());
             }
@@ -249,7 +250,7 @@ public class WorkerHeartbeatService extends AbstractService {
         if(mountInfos != null) {
           for(DiskMountInfo eachMount: mountInfos) {
             File eachFile = new File(eachMount.getMountPath());
-            diskInfos.add(TajoMasterProtocol.ServerStatusProto.Disk.newBuilder()
+            diskInfos.add(ServerStatusProto.Disk.newBuilder()
                 .setAbsolutePath(eachFile.getAbsolutePath())
                 .setTotalSpace(eachFile.getTotalSpace())
                 .setFreeSpace(eachFile.getFreeSpace())

http://git-wip-us.apache.org/repos/asf/tajo/blob/807868bd/tajo-core/src/main/java/org/apache/tajo/worker/rule/ConnectivityCheckerRuleForTajoWorker.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/rule/ConnectivityCheckerRuleForTajoWorker.java b/tajo-core/src/main/java/org/apache/tajo/worker/rule/ConnectivityCheckerRuleForTajoWorker.java
index 6eb710a..68890e3 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/rule/ConnectivityCheckerRuleForTajoWorker.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/rule/ConnectivityCheckerRuleForTajoWorker.java
@@ -18,23 +18,19 @@
 
 package org.apache.tajo.worker.rule;
 
-import java.net.InetSocketAddress;
-
 import org.apache.tajo.conf.TajoConf;
 import org.apache.tajo.conf.TajoConf.ConfVars;
 import org.apache.tajo.ha.HAServiceUtil;
-import org.apache.tajo.ipc.TajoMasterProtocol;
+import org.apache.tajo.ipc.QueryCoordinatorProtocol;
 import org.apache.tajo.rpc.NettyClientBase;
 import org.apache.tajo.rpc.RpcConnectionPool;
-import org.apache.tajo.rule.EvaluationContext;
-import org.apache.tajo.rule.EvaluationResult;
-import org.apache.tajo.rule.SelfDiagnosisRuleDefinition;
-import org.apache.tajo.rule.SelfDiagnosisRuleVisibility;
+import org.apache.tajo.rule.*;
 import org.apache.tajo.rule.EvaluationResult.EvaluationResultCode;
-import org.apache.tajo.rule.SelfDiagnosisRule;
 import org.apache.tajo.util.NetUtils;
 import org.apache.tajo.worker.TajoWorker;
 
+import java.net.InetSocketAddress;
+
 /**
  * With this rule, Tajo worker will check the connectivity to tajo master server.
  */
@@ -54,7 +50,7 @@ public class ConnectivityCheckerRuleForTajoWorker implements SelfDiagnosisRule {
       } else {
         masterAddress = NetUtils.createSocketAddr(tajoConf.getVar(ConfVars.TAJO_MASTER_UMBILICAL_RPC_ADDRESS));
       }
-      masterClient = pool.getConnection(masterAddress, TajoMasterProtocol.class, true);
+      masterClient = pool.getConnection(masterAddress, QueryCoordinatorProtocol.class, true);
       
       masterClient.getStub();
     } finally {

http://git-wip-us.apache.org/repos/asf/tajo/blob/807868bd/tajo-core/src/main/proto/QueryCoordinatorProtocol.proto
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/proto/QueryCoordinatorProtocol.proto b/tajo-core/src/main/proto/QueryCoordinatorProtocol.proto
new file mode 100644
index 0000000..41a382f
--- /dev/null
+++ b/tajo-core/src/main/proto/QueryCoordinatorProtocol.proto
@@ -0,0 +1,147 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+//TajoWorker -> TajoMaster protocol
+
+option java_package = "org.apache.tajo.ipc";
+option java_outer_classname = "QueryCoordinatorProtocol";
+option java_generic_services = true;
+option java_generate_equals_and_hash = true;
+
+import "yarn_protos.proto";
+import "tajo_protos.proto";
+import "TajoIdProtos.proto";
+import "CatalogProtos.proto";
+import "PrimitiveProtos.proto";
+import "ContainerProtocol.proto";
+
+package hadoop.yarn;
+
+message ServerStatusProto {
+    message System {
+        required int32 availableProcessors = 1;
+        required int32 freeMemoryMB = 2;
+        required int32 maxMemoryMB = 3;
+        required int32 totalMemoryMB = 4;
+    }
+    message Disk {
+        required string absolutePath = 1;
+        required int64 totalSpace = 2;
+        required int64 freeSpace = 3;
+        required int64 usableSpace = 4;
+    }
+
+    message JvmHeap {
+        required int64 maxHeap = 1;
+        required int64 totalHeap = 2;
+        required int64 freeHeap = 3;
+    }
+
+    required System system = 1;
+    required float diskSlots = 2;
+    required int32 memoryResourceMB = 3;
+    repeated Disk disk = 4;
+    required int32 runningTaskNum = 5;
+    required JvmHeap jvmHeap = 6;
+    required BoolProto queryMasterMode = 7;
+    required BoolProto taskRunnerMode = 8;
+}
+
+message TajoHeartbeat {
+  required WorkerConnectionInfoProto connectionInfo = 1;
+  optional QueryIdProto queryId = 2;
+  optional QueryState state = 3;
+  optional TableDescProto resultDesc = 4;
+  optional string statusMessage = 5;
+  optional float queryProgress = 6;
+  optional int64 queryFinishTime = 7;
+}
+
+message TajoHeartbeatResponse {
+  message ResponseCommand {
+      required string command = 1;
+      repeated string params = 2;
+  }
+  required BoolProto heartbeatResult = 1;
+  required ClusterResourceSummary clusterResourceSummary = 2;
+  optional ResponseCommand responseCommand = 3;
+}
+
+message ClusterResourceSummary {
+  required int32 numWorkers = 1;
+  required int32 totalDiskSlots = 2;
+  required int32 totalCpuCoreSlots = 3;
+  required int32 totalMemoryMB = 4;
+
+  required int32 totalAvailableDiskSlots = 5;
+  required int32 totalAvailableCpuCoreSlots = 6;
+  required int32 totalAvailableMemoryMB = 7;
+}
+
+enum ResourceRequestPriority {
+    MEMORY = 1;
+    DISK = 2;
+}
+
+message WorkerResourceAllocationRequest {
+    required QueryIdProto queryId = 1;
+    required ResourceRequestPriority resourceRequestPriority = 2;
+
+    required int32 numContainers = 3;
+
+    required int32 maxMemoryMBPerContainer = 4;
+    required int32 minMemoryMBPerContainer = 5;
+
+    required float maxDiskSlotPerContainer = 6;
+    required float minDiskSlotPerContainer = 7;
+}
+
+message WorkerResourceProto {
+    required WorkerConnectionInfoProto connectionInfo = 1;
+    required int32 memoryMB = 2 ;
+    required float diskSlots = 3;
+}
+
+message WorkerResourcesRequest {
+    repeated WorkerResourceProto workerResources = 1;
+}
+
+message WorkerResourceReleaseRequest {
+    required ExecutionBlockIdProto executionBlockId = 1;
+    repeated TajoContainerIdProto containerIds = 2;
+}
+
+message WorkerAllocatedResource {
+    required TajoContainerIdProto containerId = 1;
+    required WorkerConnectionInfoProto connectionInfo = 2;
+
+    required int32 allocatedMemoryMB = 3;
+    required float allocatedDiskSlots = 4;
+}
+
+message WorkerResourceAllocationResponse {
+    required QueryIdProto queryId = 1;
+    repeated WorkerAllocatedResource workerAllocatedResource = 2;
+}
+
+service QueryCoordinatorProtocolService {
+  rpc heartbeat(TajoHeartbeat) returns (TajoHeartbeatResponse);
+  rpc allocateWorkerResources(WorkerResourceAllocationRequest) returns (WorkerResourceAllocationResponse);
+  rpc releaseWorkerResource(WorkerResourceReleaseRequest) returns (BoolProto);
+  rpc getAllWorkerResource(NullProto) returns (WorkerResourcesRequest);
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tajo/blob/807868bd/tajo-core/src/main/proto/ResourceTrackerProtocol.proto
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/proto/ResourceTrackerProtocol.proto b/tajo-core/src/main/proto/ResourceTrackerProtocol.proto
index b2db46a..40aeab7 100644
--- a/tajo-core/src/main/proto/ResourceTrackerProtocol.proto
+++ b/tajo-core/src/main/proto/ResourceTrackerProtocol.proto
@@ -22,7 +22,7 @@ option java_outer_classname = "TajoResourceTrackerProtocol";
 option java_generic_services = true;
 option java_generate_equals_and_hash = true;
 
-import "TajoMasterProtocol.proto";
+import "QueryCoordinatorProtocol.proto";
 import "ContainerProtocol.proto";
 import "tajo_protos.proto";
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/807868bd/tajo-core/src/main/proto/TajoMasterProtocol.proto
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/proto/TajoMasterProtocol.proto b/tajo-core/src/main/proto/TajoMasterProtocol.proto
deleted file mode 100644
index bc73596..0000000
--- a/tajo-core/src/main/proto/TajoMasterProtocol.proto
+++ /dev/null
@@ -1,147 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-//TajoWorker -> TajoMaster protocol
-
-option java_package = "org.apache.tajo.ipc";
-option java_outer_classname = "TajoMasterProtocol";
-option java_generic_services = true;
-option java_generate_equals_and_hash = true;
-
-import "yarn_protos.proto";
-import "tajo_protos.proto";
-import "TajoIdProtos.proto";
-import "CatalogProtos.proto";
-import "PrimitiveProtos.proto";
-import "ContainerProtocol.proto";
-
-package hadoop.yarn;
-
-message ServerStatusProto {
-    message System {
-        required int32 availableProcessors = 1;
-        required int32 freeMemoryMB = 2;
-        required int32 maxMemoryMB = 3;
-        required int32 totalMemoryMB = 4;
-    }
-    message Disk {
-        required string absolutePath = 1;
-        required int64 totalSpace = 2;
-        required int64 freeSpace = 3;
-        required int64 usableSpace = 4;
-    }
-
-    message JvmHeap {
-        required int64 maxHeap = 1;
-        required int64 totalHeap = 2;
-        required int64 freeHeap = 3;
-    }
-
-    required System system = 1;
-    required float diskSlots = 2;
-    required int32 memoryResourceMB = 3;
-    repeated Disk disk = 4;
-    required int32 runningTaskNum = 5;
-    required JvmHeap jvmHeap = 6;
-    required BoolProto queryMasterMode = 7;
-    required BoolProto taskRunnerMode = 8;
-}
-
-message TajoHeartbeat {
-  required WorkerConnectionInfoProto connectionInfo = 1;
-  optional QueryIdProto queryId = 2;
-  optional QueryState state = 3;
-  optional TableDescProto resultDesc = 4;
-  optional string statusMessage = 5;
-  optional float queryProgress = 6;
-  optional int64 queryFinishTime = 7;
-}
-
-message TajoHeartbeatResponse {
-  message ResponseCommand {
-      required string command = 1;
-      repeated string params = 2;
-  }
-  required BoolProto heartbeatResult = 1;
-  required ClusterResourceSummary clusterResourceSummary = 2;
-  optional ResponseCommand responseCommand = 3;
-}
-
-message ClusterResourceSummary {
-  required int32 numWorkers = 1;
-  required int32 totalDiskSlots = 2;
-  required int32 totalCpuCoreSlots = 3;
-  required int32 totalMemoryMB = 4;
-
-  required int32 totalAvailableDiskSlots = 5;
-  required int32 totalAvailableCpuCoreSlots = 6;
-  required int32 totalAvailableMemoryMB = 7;
-}
-
-enum ResourceRequestPriority {
-    MEMORY = 1;
-    DISK = 2;
-}
-
-message WorkerResourceAllocationRequest {
-    required QueryIdProto queryId = 1;
-    required ResourceRequestPriority resourceRequestPriority = 2;
-
-    required int32 numContainers = 3;
-
-    required int32 maxMemoryMBPerContainer = 4;
-    required int32 minMemoryMBPerContainer = 5;
-
-    required float maxDiskSlotPerContainer = 6;
-    required float minDiskSlotPerContainer = 7;
-}
-
-message WorkerResourceProto {
-    required WorkerConnectionInfoProto connectionInfo = 1;
-    required int32 memoryMB = 2 ;
-    required float diskSlots = 3;
-}
-
-message WorkerResourcesRequest {
-    repeated WorkerResourceProto workerResources = 1;
-}
-
-message WorkerResourceReleaseRequest {
-    required ExecutionBlockIdProto executionBlockId = 1;
-    repeated TajoContainerIdProto containerIds = 2;
-}
-
-message WorkerAllocatedResource {
-    required TajoContainerIdProto containerId = 1;
-    required WorkerConnectionInfoProto connectionInfo = 2;
-
-    required int32 allocatedMemoryMB = 3;
-    required float allocatedDiskSlots = 4;
-}
-
-message WorkerResourceAllocationResponse {
-    required QueryIdProto queryId = 1;
-    repeated WorkerAllocatedResource workerAllocatedResource = 2;
-}
-
-service TajoMasterProtocolService {
-  rpc heartbeat(TajoHeartbeat) returns (TajoHeartbeatResponse);
-  rpc allocateWorkerResources(WorkerResourceAllocationRequest) returns (WorkerResourceAllocationResponse);
-  rpc releaseWorkerResource(WorkerResourceReleaseRequest) returns (BoolProto);
-  rpc getAllWorkerResource(NullProto) returns (WorkerResourcesRequest);
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tajo/blob/807868bd/tajo-core/src/main/resources/webapps/admin/index.jsp
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/resources/webapps/admin/index.jsp b/tajo-core/src/main/resources/webapps/admin/index.jsp
index 00186d7..0defb3c 100644
--- a/tajo-core/src/main/resources/webapps/admin/index.jsp
+++ b/tajo-core/src/main/resources/webapps/admin/index.jsp
@@ -25,7 +25,7 @@
 <%@ page import="org.apache.tajo.master.TajoMaster" %>
 <%@ page import="org.apache.tajo.ha.HAService" %>
 <%@ page import="org.apache.tajo.ha.TajoMasterInfo" %>
-<%@ page import="org.apache.tajo.querymaster.QueryInProgress" %>
+<%@ page import="org.apache.tajo.master.QueryInProgress" %>
 <%@ page import="org.apache.tajo.master.rm.Worker" %>
 <%@ page import="org.apache.tajo.master.rm.WorkerState" %>
 <%@ page import="org.apache.tajo.util.NetUtils" %>

http://git-wip-us.apache.org/repos/asf/tajo/blob/807868bd/tajo-core/src/main/resources/webapps/admin/query.jsp
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/resources/webapps/admin/query.jsp b/tajo-core/src/main/resources/webapps/admin/query.jsp
index 4d8e5e6..85f7176 100644
--- a/tajo-core/src/main/resources/webapps/admin/query.jsp
+++ b/tajo-core/src/main/resources/webapps/admin/query.jsp
@@ -20,7 +20,7 @@
 <%@ page language="java" contentType="text/html; charset=UTF-8" pageEncoding="UTF-8"%>
 
 <%@ page import="org.apache.tajo.master.TajoMaster" %>
-<%@ page import="org.apache.tajo.querymaster.QueryInProgress" %>
+<%@ page import="org.apache.tajo.master.QueryInProgress" %>
 <%@ page import="org.apache.tajo.master.rm.Worker" %>
 <%@ page import="org.apache.tajo.util.JSPUtil" %>
 <%@ page import="org.apache.tajo.util.StringUtils" %>

http://git-wip-us.apache.org/repos/asf/tajo/blob/807868bd/tajo-core/src/test/java/org/apache/tajo/TajoTestingCluster.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/TajoTestingCluster.java b/tajo-core/src/test/java/org/apache/tajo/TajoTestingCluster.java
index 0786912..e548b81 100644
--- a/tajo-core/src/test/java/org/apache/tajo/TajoTestingCluster.java
+++ b/tajo-core/src/test/java/org/apache/tajo/TajoTestingCluster.java
@@ -43,8 +43,8 @@ import org.apache.tajo.client.TajoClientUtil;
 import org.apache.tajo.conf.TajoConf;
 import org.apache.tajo.conf.TajoConf.ConfVars;
 import org.apache.tajo.engine.planner.global.rewriter.GlobalPlanTestRuleProvider;
+import org.apache.tajo.master.QueryInProgress;
 import org.apache.tajo.master.TajoMaster;
-import org.apache.tajo.querymaster.*;
 import org.apache.tajo.querymaster.Query;
 import org.apache.tajo.querymaster.Stage;
 import org.apache.tajo.querymaster.StageState;

http://git-wip-us.apache.org/repos/asf/tajo/blob/807868bd/tajo-core/src/test/java/org/apache/tajo/master/rm/TestTajoResourceManager.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/master/rm/TestTajoResourceManager.java b/tajo-core/src/test/java/org/apache/tajo/master/rm/TestTajoResourceManager.java
index b8fbd67..a013d0b 100644
--- a/tajo-core/src/test/java/org/apache/tajo/master/rm/TestTajoResourceManager.java
+++ b/tajo-core/src/test/java/org/apache/tajo/master/rm/TestTajoResourceManager.java
@@ -19,12 +19,11 @@
 package org.apache.tajo.master.rm;
 
 import com.google.protobuf.RpcCallback;
-import org.apache.hadoop.yarn.proto.YarnProtos;
 import org.apache.tajo.QueryId;
 import org.apache.tajo.QueryIdFactory;
 import org.apache.tajo.conf.TajoConf;
 import org.apache.tajo.ipc.ContainerProtocol;
-import org.apache.tajo.ipc.TajoMasterProtocol.*;
+import org.apache.tajo.ipc.QueryCoordinatorProtocol.*;
 import org.apache.tajo.master.cluster.WorkerConnectionInfo;
 import org.apache.tajo.rpc.NullCallback;
 import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos;