You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tajo.apache.org by ji...@apache.org on 2015/01/09 15:34:22 UTC
[6/8] tajo git commit: TAJO-1291: Rename TajoMasterProtocol to
QueryCoordinatorProtocol.
http://git-wip-us.apache.org/repos/asf/tajo/blob/807868bd/tajo-core/src/main/java/org/apache/tajo/querymaster/Stage.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/querymaster/Stage.java b/tajo-core/src/main/java/org/apache/tajo/querymaster/Stage.java
index 1ea7051..13394f8 100644
--- a/tajo-core/src/main/java/org/apache/tajo/querymaster/Stage.java
+++ b/tajo-core/src/main/java/org/apache/tajo/querymaster/Stage.java
@@ -45,7 +45,6 @@ import org.apache.tajo.engine.planner.enforce.Enforcer;
import org.apache.tajo.engine.planner.global.DataChannel;
import org.apache.tajo.engine.planner.global.ExecutionBlock;
import org.apache.tajo.engine.planner.global.MasterPlan;
-import org.apache.tajo.ipc.TajoMasterProtocol;
import org.apache.tajo.ipc.TajoWorkerProtocol;
import org.apache.tajo.ipc.TajoWorkerProtocol.DistinctGroupbyEnforcer.MultipleAggregationStage;
import org.apache.tajo.ipc.TajoWorkerProtocol.EnforceProperty;
@@ -838,22 +837,6 @@ public class Stage implements EventHandler<StageEvent> {
}
/**
- * Getting the total memory of cluster
- *
- * @param stage
- * @return mega bytes
- */
- private static int getClusterTotalMemory(Stage stage) {
- List<TajoMasterProtocol.WorkerResourceProto> workers =
- stage.context.getQueryMasterContext().getQueryMaster().getAllWorker();
-
- int totalMem = 0;
- for (TajoMasterProtocol.WorkerResourceProto worker : workers) {
- totalMem += worker.getMemoryMB();
- }
- return totalMem;
- }
- /**
* Getting the desire number of partitions according to the volume of input data.
* This method is only used to determine the partition key number of hash join or aggregation.
*
http://git-wip-us.apache.org/repos/asf/tajo/blob/807868bd/tajo-core/src/main/java/org/apache/tajo/util/JSPUtil.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/util/JSPUtil.java b/tajo-core/src/main/java/org/apache/tajo/util/JSPUtil.java
index d711258..82fb37f 100644
--- a/tajo-core/src/main/java/org/apache/tajo/util/JSPUtil.java
+++ b/tajo-core/src/main/java/org/apache/tajo/util/JSPUtil.java
@@ -25,7 +25,7 @@ import org.apache.tajo.catalog.statistics.TableStats;
import org.apache.tajo.conf.TajoConf;
import org.apache.tajo.master.TajoMaster.MasterContext;
import org.apache.tajo.ha.HAService;
-import org.apache.tajo.querymaster.QueryInProgress;
+import org.apache.tajo.master.QueryInProgress;
import org.apache.tajo.querymaster.QueryMasterTask;
import org.apache.tajo.querymaster.Task;
import org.apache.tajo.querymaster.Stage;
http://git-wip-us.apache.org/repos/asf/tajo/blob/807868bd/tajo-core/src/main/java/org/apache/tajo/worker/TajoResourceAllocator.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/TajoResourceAllocator.java b/tajo-core/src/main/java/org/apache/tajo/worker/TajoResourceAllocator.java
index 8241478..04b65d2 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/TajoResourceAllocator.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/TajoResourceAllocator.java
@@ -28,8 +28,10 @@ import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationAttemptIdPBImpl;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.tajo.ExecutionBlockId;
import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.ha.HAServiceUtil;
import org.apache.tajo.ipc.ContainerProtocol;
-import org.apache.tajo.ipc.TajoMasterProtocol;
+import org.apache.tajo.ipc.QueryCoordinatorProtocol;
+import org.apache.tajo.ipc.QueryCoordinatorProtocol.*;
import org.apache.tajo.ipc.TajoWorkerProtocol;
import org.apache.tajo.master.*;
import org.apache.tajo.master.cluster.WorkerConnectionInfo;
@@ -38,16 +40,18 @@ import org.apache.tajo.master.container.TajoContainerId;
import org.apache.tajo.master.event.ContainerAllocationEvent;
import org.apache.tajo.master.event.ContainerAllocatorEventType;
import org.apache.tajo.master.event.StageContainerAllocationEvent;
+import org.apache.tajo.master.rm.TajoWorkerContainer;
+import org.apache.tajo.master.rm.TajoWorkerContainerId;
+import org.apache.tajo.master.rm.Worker;
+import org.apache.tajo.master.rm.WorkerResource;
import org.apache.tajo.querymaster.QueryMasterTask;
import org.apache.tajo.querymaster.Stage;
import org.apache.tajo.querymaster.StageState;
-import org.apache.tajo.master.rm.*;
import org.apache.tajo.rpc.CallFuture;
import org.apache.tajo.rpc.NettyClientBase;
import org.apache.tajo.rpc.NullCallback;
import org.apache.tajo.rpc.RpcConnectionPool;
import org.apache.tajo.util.ApplicationIdUtils;
-import org.apache.tajo.ha.HAServiceUtil;
import java.net.InetSocketAddress;
import java.util.*;
@@ -91,7 +95,7 @@ public class TajoResourceAllocator extends AbstractResourceAllocator {
int memoryMBPerTask) {
//TODO consider disk slot
- TajoMasterProtocol.ClusterResourceSummary clusterResource = workerContext.getClusterResource();
+ ClusterResourceSummary clusterResource = workerContext.getClusterResource();
int clusterSlots = clusterResource == null ? 0 : clusterResource.getTotalMemoryMB() / memoryMBPerTask;
clusterSlots = Math.max(1, clusterSlots - 1); // reserve query master slot
LOG.info("CalculateNumberRequestContainer - Number of Tasks=" + numTasks +
@@ -249,20 +253,19 @@ public class TajoResourceAllocator extends AbstractResourceAllocator {
@Override
public void run() {
LOG.info("Start TajoWorkerAllocationThread");
- CallFuture<TajoMasterProtocol.WorkerResourceAllocationResponse> callBack =
- new CallFuture<TajoMasterProtocol.WorkerResourceAllocationResponse>();
+ CallFuture<WorkerResourceAllocationResponse> callBack =
+ new CallFuture<WorkerResourceAllocationResponse>();
//TODO consider task's resource usage pattern
int requiredMemoryMB = tajoConf.getIntVar(TajoConf.ConfVars.TASK_DEFAULT_MEMORY);
float requiredDiskSlots = tajoConf.getFloatVar(TajoConf.ConfVars.TASK_DEFAULT_DISK);
- TajoMasterProtocol.WorkerResourceAllocationRequest request =
- TajoMasterProtocol.WorkerResourceAllocationRequest.newBuilder()
+ WorkerResourceAllocationRequest request = WorkerResourceAllocationRequest.newBuilder()
.setMinMemoryMBPerContainer(requiredMemoryMB)
.setMaxMemoryMBPerContainer(requiredMemoryMB)
.setNumContainers(event.getRequiredNum())
- .setResourceRequestPriority(!event.isLeafQuery() ? TajoMasterProtocol.ResourceRequestPriority.MEMORY
- : TajoMasterProtocol.ResourceRequestPriority.DISK)
+ .setResourceRequestPriority(!event.isLeafQuery() ?
+ ResourceRequestPriority.MEMORY : ResourceRequestPriority.DISK)
.setMinDiskSlotPerContainer(requiredDiskSlots)
.setMaxDiskSlotPerContainer(requiredDiskSlots)
.setQueryId(event.getExecutionBlockId().getQueryId().getProto())
@@ -280,7 +283,7 @@ public class TajoResourceAllocator extends AbstractResourceAllocator {
try {
tmClient = connPool.getConnection(
queryTaskContext.getQueryMasterContext().getWorkerContext().getTajoMasterAddress(),
- TajoMasterProtocol.class, true);
+ QueryCoordinatorProtocol.class, true);
} catch (Exception e) {
queryTaskContext.getQueryMasterContext().getWorkerContext().
setWorkerResourceTrackerAddr(HAServiceUtil.getResourceTrackerAddress(tajoConf));
@@ -288,15 +291,15 @@ public class TajoResourceAllocator extends AbstractResourceAllocator {
setTajoMasterAddress(HAServiceUtil.getMasterUmbilicalAddress(tajoConf));
tmClient = connPool.getConnection(
queryTaskContext.getQueryMasterContext().getWorkerContext().getTajoMasterAddress(),
- TajoMasterProtocol.class, true);
+ QueryCoordinatorProtocol.class, true);
}
} else {
tmClient = connPool.getConnection(
queryTaskContext.getQueryMasterContext().getWorkerContext().getTajoMasterAddress(),
- TajoMasterProtocol.class, true);
+ QueryCoordinatorProtocol.class, true);
}
- TajoMasterProtocol.TajoMasterProtocolService masterClientService = tmClient.getStub();
+ QueryCoordinatorProtocolService masterClientService = tmClient.getStub();
masterClientService.allocateWorkerResources(null, request, callBack);
} catch (Exception e) {
LOG.error(e.getMessage(), e);
@@ -304,7 +307,7 @@ public class TajoResourceAllocator extends AbstractResourceAllocator {
connPool.releaseConnection(tmClient);
}
- TajoMasterProtocol.WorkerResourceAllocationResponse response = null;
+ WorkerResourceAllocationResponse response = null;
while(!stopped.get()) {
try {
response = callBack.get(3, TimeUnit.SECONDS);
@@ -321,11 +324,11 @@ public class TajoResourceAllocator extends AbstractResourceAllocator {
int numAllocatedContainers = 0;
if(response != null) {
- List<TajoMasterProtocol.WorkerAllocatedResource> allocatedResources = response.getWorkerAllocatedResourceList();
+ List<WorkerAllocatedResource> allocatedResources = response.getWorkerAllocatedResourceList();
ExecutionBlockId executionBlockId = event.getExecutionBlockId();
List<TajoContainer> containers = new ArrayList<TajoContainer>();
- for(TajoMasterProtocol.WorkerAllocatedResource eachAllocatedResource: allocatedResources) {
+ for(WorkerAllocatedResource eachAllocatedResource: allocatedResources) {
TajoWorkerContainer container = new TajoWorkerContainer();
NodeId nodeId = NodeId.newInstance(eachAllocatedResource.getConnectionInfo().getHost(),
eachAllocatedResource.getConnectionInfo().getPeerRpcPort());
http://git-wip-us.apache.org/repos/asf/tajo/blob/807868bd/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorker.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorker.java b/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorker.java
index 09a87e0..4003014 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorker.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorker.java
@@ -20,7 +20,6 @@ package org.apache.tajo.worker;
import com.codahale.metrics.Gauge;
import com.google.common.annotations.VisibleForTesting;
-
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
@@ -36,13 +35,13 @@ import org.apache.tajo.catalog.CatalogClient;
import org.apache.tajo.catalog.CatalogService;
import org.apache.tajo.conf.TajoConf;
import org.apache.tajo.ha.HAServiceUtil;
-import org.apache.tajo.ipc.TajoMasterProtocol;
-import org.apache.tajo.master.cluster.WorkerConnectionInfo;
import org.apache.tajo.ha.TajoMasterInfo;
-import org.apache.tajo.querymaster.QueryMaster;
-import org.apache.tajo.querymaster.QueryMasterManagerService;
+import org.apache.tajo.ipc.QueryCoordinatorProtocol.ClusterResourceSummary;
+import org.apache.tajo.master.cluster.WorkerConnectionInfo;
import org.apache.tajo.master.rm.TajoWorkerResourceManager;
import org.apache.tajo.pullserver.TajoPullServerService;
+import org.apache.tajo.querymaster.QueryMaster;
+import org.apache.tajo.querymaster.QueryMasterManagerService;
import org.apache.tajo.rpc.RpcChannelFactory;
import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos;
import org.apache.tajo.rule.EvaluationContext;
@@ -50,7 +49,10 @@ import org.apache.tajo.rule.EvaluationFailedException;
import org.apache.tajo.rule.SelfDiagnosisRuleEngine;
import org.apache.tajo.rule.SelfDiagnosisRuleSession;
import org.apache.tajo.storage.HashShuffleAppenderManager;
-import org.apache.tajo.util.*;
+import org.apache.tajo.util.CommonTestingUtil;
+import org.apache.tajo.util.JvmPauseMonitor;
+import org.apache.tajo.util.NetUtils;
+import org.apache.tajo.util.StringUtils;
import org.apache.tajo.util.history.HistoryReader;
import org.apache.tajo.util.history.HistoryWriter;
import org.apache.tajo.util.metrics.TajoSystemMetrics;
@@ -115,7 +117,7 @@ public class TajoWorker extends CompositeService {
private AtomicInteger numClusterNodes = new AtomicInteger();
- private TajoMasterProtocol.ClusterResourceSummary clusterResource;
+ private ClusterResourceSummary clusterResource;
private WorkerConnectionInfo connectionInfo;
@@ -516,13 +518,13 @@ public class TajoWorker extends CompositeService {
return TajoWorker.this.numClusterNodes.get();
}
- public void setClusterResource(TajoMasterProtocol.ClusterResourceSummary clusterResource) {
+ public void setClusterResource(ClusterResourceSummary clusterResource) {
synchronized (numClusterNodes) {
TajoWorker.this.clusterResource = clusterResource;
}
}
- public TajoMasterProtocol.ClusterResourceSummary getClusterResource() {
+ public ClusterResourceSummary getClusterResource() {
synchronized (numClusterNodes) {
return TajoWorker.this.clusterResource;
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/807868bd/tajo-core/src/main/java/org/apache/tajo/worker/WorkerHeartbeatService.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/WorkerHeartbeatService.java b/tajo-core/src/main/java/org/apache/tajo/worker/WorkerHeartbeatService.java
index c809921..b92c4cd 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/WorkerHeartbeatService.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/WorkerHeartbeatService.java
@@ -26,7 +26,10 @@ import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.service.AbstractService;
import org.apache.tajo.conf.TajoConf;
-import org.apache.tajo.ipc.TajoMasterProtocol;
+import org.apache.tajo.ha.HAServiceUtil;
+import org.apache.tajo.ipc.QueryCoordinatorProtocol.ClusterResourceSummary;
+import org.apache.tajo.ipc.QueryCoordinatorProtocol.ServerStatusProto;
+import org.apache.tajo.ipc.QueryCoordinatorProtocol.TajoHeartbeatResponse;
import org.apache.tajo.ipc.TajoResourceTrackerProtocol;
import org.apache.tajo.rpc.CallFuture;
import org.apache.tajo.rpc.NettyClientBase;
@@ -35,7 +38,6 @@ import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos;
import org.apache.tajo.storage.DiskDeviceInfo;
import org.apache.tajo.storage.DiskMountInfo;
import org.apache.tajo.storage.DiskUtil;
-import org.apache.tajo.ha.HAServiceUtil;
import java.io.File;
import java.util.List;
@@ -98,8 +100,8 @@ public class WorkerHeartbeatService extends AbstractService {
class WorkerHeartbeatThread extends Thread {
private volatile AtomicBoolean stopped = new AtomicBoolean(false);
- TajoMasterProtocol.ServerStatusProto.System systemInfo;
- List<TajoMasterProtocol.ServerStatusProto.Disk> diskInfos = Lists.newArrayList();
+ ServerStatusProto.System systemInfo;
+ List<ServerStatusProto.Disk> diskInfos = Lists.newArrayList();
float workerDiskSlots;
int workerMemoryMB;
List<DiskDeviceInfo> diskDeviceInfos;
@@ -137,7 +139,7 @@ public class WorkerHeartbeatService extends AbstractService {
}
}
- systemInfo = TajoMasterProtocol.ServerStatusProto.System.newBuilder()
+ systemInfo = ServerStatusProto.System.newBuilder()
.setAvailableProcessors(workerCpuCoreNum)
.setFreeMemoryMB(0)
.setMaxMemoryMB(0)
@@ -153,14 +155,14 @@ public class WorkerHeartbeatService extends AbstractService {
if(sendDiskInfoCount == 0 && diskDeviceInfos != null) {
getDiskUsageInfos();
}
- TajoMasterProtocol.ServerStatusProto.JvmHeap jvmHeap =
- TajoMasterProtocol.ServerStatusProto.JvmHeap.newBuilder()
+ ServerStatusProto.JvmHeap jvmHeap =
+ ServerStatusProto.JvmHeap.newBuilder()
.setMaxHeap(Runtime.getRuntime().maxMemory())
.setFreeHeap(Runtime.getRuntime().freeMemory())
.setTotalHeap(Runtime.getRuntime().totalMemory())
.build();
- TajoMasterProtocol.ServerStatusProto serverStatus = TajoMasterProtocol.ServerStatusProto.newBuilder()
+ ServerStatusProto serverStatus = ServerStatusProto.newBuilder()
.addAllDisk(diskInfos)
.setRunningTaskNum(
context.getTaskRunnerManager() == null ? 1 : context.getTaskRunnerManager().getNumTasks())
@@ -179,8 +181,7 @@ public class WorkerHeartbeatService extends AbstractService {
NettyClientBase rmClient = null;
try {
- CallFuture<TajoMasterProtocol.TajoHeartbeatResponse> callBack =
- new CallFuture<TajoMasterProtocol.TajoHeartbeatResponse>();
+ CallFuture<TajoHeartbeatResponse> callBack = new CallFuture<TajoHeartbeatResponse>();
// In TajoMaster HA mode, if backup master be active status,
// worker may fail to connect existing active master. Thus,
@@ -201,9 +202,9 @@ public class WorkerHeartbeatService extends AbstractService {
TajoResourceTrackerProtocol.TajoResourceTrackerProtocolService resourceTracker = rmClient.getStub();
resourceTracker.heartbeat(callBack.getController(), heartbeatProto, callBack);
- TajoMasterProtocol.TajoHeartbeatResponse response = callBack.get(2, TimeUnit.SECONDS);
+ TajoHeartbeatResponse response = callBack.get(2, TimeUnit.SECONDS);
if(response != null) {
- TajoMasterProtocol.ClusterResourceSummary clusterResourceSummary = response.getClusterResourceSummary();
+ ClusterResourceSummary clusterResourceSummary = response.getClusterResourceSummary();
if(clusterResourceSummary.getNumWorkers() > 0) {
context.setNumClusterNodes(clusterResourceSummary.getNumWorkers());
}
@@ -249,7 +250,7 @@ public class WorkerHeartbeatService extends AbstractService {
if(mountInfos != null) {
for(DiskMountInfo eachMount: mountInfos) {
File eachFile = new File(eachMount.getMountPath());
- diskInfos.add(TajoMasterProtocol.ServerStatusProto.Disk.newBuilder()
+ diskInfos.add(ServerStatusProto.Disk.newBuilder()
.setAbsolutePath(eachFile.getAbsolutePath())
.setTotalSpace(eachFile.getTotalSpace())
.setFreeSpace(eachFile.getFreeSpace())
http://git-wip-us.apache.org/repos/asf/tajo/blob/807868bd/tajo-core/src/main/java/org/apache/tajo/worker/rule/ConnectivityCheckerRuleForTajoWorker.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/rule/ConnectivityCheckerRuleForTajoWorker.java b/tajo-core/src/main/java/org/apache/tajo/worker/rule/ConnectivityCheckerRuleForTajoWorker.java
index 6eb710a..68890e3 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/rule/ConnectivityCheckerRuleForTajoWorker.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/rule/ConnectivityCheckerRuleForTajoWorker.java
@@ -18,23 +18,19 @@
package org.apache.tajo.worker.rule;
-import java.net.InetSocketAddress;
-
import org.apache.tajo.conf.TajoConf;
import org.apache.tajo.conf.TajoConf.ConfVars;
import org.apache.tajo.ha.HAServiceUtil;
-import org.apache.tajo.ipc.TajoMasterProtocol;
+import org.apache.tajo.ipc.QueryCoordinatorProtocol;
import org.apache.tajo.rpc.NettyClientBase;
import org.apache.tajo.rpc.RpcConnectionPool;
-import org.apache.tajo.rule.EvaluationContext;
-import org.apache.tajo.rule.EvaluationResult;
-import org.apache.tajo.rule.SelfDiagnosisRuleDefinition;
-import org.apache.tajo.rule.SelfDiagnosisRuleVisibility;
+import org.apache.tajo.rule.*;
import org.apache.tajo.rule.EvaluationResult.EvaluationResultCode;
-import org.apache.tajo.rule.SelfDiagnosisRule;
import org.apache.tajo.util.NetUtils;
import org.apache.tajo.worker.TajoWorker;
+import java.net.InetSocketAddress;
+
/**
* With this rule, Tajo worker will check the connectivity to tajo master server.
*/
@@ -54,7 +50,7 @@ public class ConnectivityCheckerRuleForTajoWorker implements SelfDiagnosisRule {
} else {
masterAddress = NetUtils.createSocketAddr(tajoConf.getVar(ConfVars.TAJO_MASTER_UMBILICAL_RPC_ADDRESS));
}
- masterClient = pool.getConnection(masterAddress, TajoMasterProtocol.class, true);
+ masterClient = pool.getConnection(masterAddress, QueryCoordinatorProtocol.class, true);
masterClient.getStub();
} finally {
http://git-wip-us.apache.org/repos/asf/tajo/blob/807868bd/tajo-core/src/main/proto/QueryCoordinatorProtocol.proto
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/proto/QueryCoordinatorProtocol.proto b/tajo-core/src/main/proto/QueryCoordinatorProtocol.proto
new file mode 100644
index 0000000..41a382f
--- /dev/null
+++ b/tajo-core/src/main/proto/QueryCoordinatorProtocol.proto
@@ -0,0 +1,147 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+//TajoWorker -> TajoMaster protocol
+
+option java_package = "org.apache.tajo.ipc";
+option java_outer_classname = "QueryCoordinatorProtocol";
+option java_generic_services = true;
+option java_generate_equals_and_hash = true;
+
+import "yarn_protos.proto";
+import "tajo_protos.proto";
+import "TajoIdProtos.proto";
+import "CatalogProtos.proto";
+import "PrimitiveProtos.proto";
+import "ContainerProtocol.proto";
+
+package hadoop.yarn;
+
+message ServerStatusProto {
+ message System {
+ required int32 availableProcessors = 1;
+ required int32 freeMemoryMB = 2;
+ required int32 maxMemoryMB = 3;
+ required int32 totalMemoryMB = 4;
+ }
+ message Disk {
+ required string absolutePath = 1;
+ required int64 totalSpace = 2;
+ required int64 freeSpace = 3;
+ required int64 usableSpace = 4;
+ }
+
+ message JvmHeap {
+ required int64 maxHeap = 1;
+ required int64 totalHeap = 2;
+ required int64 freeHeap = 3;
+ }
+
+ required System system = 1;
+ required float diskSlots = 2;
+ required int32 memoryResourceMB = 3;
+ repeated Disk disk = 4;
+ required int32 runningTaskNum = 5;
+ required JvmHeap jvmHeap = 6;
+ required BoolProto queryMasterMode = 7;
+ required BoolProto taskRunnerMode = 8;
+}
+
+message TajoHeartbeat {
+ required WorkerConnectionInfoProto connectionInfo = 1;
+ optional QueryIdProto queryId = 2;
+ optional QueryState state = 3;
+ optional TableDescProto resultDesc = 4;
+ optional string statusMessage = 5;
+ optional float queryProgress = 6;
+ optional int64 queryFinishTime = 7;
+}
+
+message TajoHeartbeatResponse {
+ message ResponseCommand {
+ required string command = 1;
+ repeated string params = 2;
+ }
+ required BoolProto heartbeatResult = 1;
+ required ClusterResourceSummary clusterResourceSummary = 2;
+ optional ResponseCommand responseCommand = 3;
+}
+
+message ClusterResourceSummary {
+ required int32 numWorkers = 1;
+ required int32 totalDiskSlots = 2;
+ required int32 totalCpuCoreSlots = 3;
+ required int32 totalMemoryMB = 4;
+
+ required int32 totalAvailableDiskSlots = 5;
+ required int32 totalAvailableCpuCoreSlots = 6;
+ required int32 totalAvailableMemoryMB = 7;
+}
+
+enum ResourceRequestPriority {
+ MEMORY = 1;
+ DISK = 2;
+}
+
+message WorkerResourceAllocationRequest {
+ required QueryIdProto queryId = 1;
+ required ResourceRequestPriority resourceRequestPriority = 2;
+
+ required int32 numContainers = 3;
+
+ required int32 maxMemoryMBPerContainer = 4;
+ required int32 minMemoryMBPerContainer = 5;
+
+ required float maxDiskSlotPerContainer = 6;
+ required float minDiskSlotPerContainer = 7;
+}
+
+message WorkerResourceProto {
+ required WorkerConnectionInfoProto connectionInfo = 1;
+ required int32 memoryMB = 2 ;
+ required float diskSlots = 3;
+}
+
+message WorkerResourcesRequest {
+ repeated WorkerResourceProto workerResources = 1;
+}
+
+message WorkerResourceReleaseRequest {
+ required ExecutionBlockIdProto executionBlockId = 1;
+ repeated TajoContainerIdProto containerIds = 2;
+}
+
+message WorkerAllocatedResource {
+ required TajoContainerIdProto containerId = 1;
+ required WorkerConnectionInfoProto connectionInfo = 2;
+
+ required int32 allocatedMemoryMB = 3;
+ required float allocatedDiskSlots = 4;
+}
+
+message WorkerResourceAllocationResponse {
+ required QueryIdProto queryId = 1;
+ repeated WorkerAllocatedResource workerAllocatedResource = 2;
+}
+
+service QueryCoordinatorProtocolService {
+ rpc heartbeat(TajoHeartbeat) returns (TajoHeartbeatResponse);
+ rpc allocateWorkerResources(WorkerResourceAllocationRequest) returns (WorkerResourceAllocationResponse);
+ rpc releaseWorkerResource(WorkerResourceReleaseRequest) returns (BoolProto);
+ rpc getAllWorkerResource(NullProto) returns (WorkerResourcesRequest);
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/tajo/blob/807868bd/tajo-core/src/main/proto/ResourceTrackerProtocol.proto
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/proto/ResourceTrackerProtocol.proto b/tajo-core/src/main/proto/ResourceTrackerProtocol.proto
index b2db46a..40aeab7 100644
--- a/tajo-core/src/main/proto/ResourceTrackerProtocol.proto
+++ b/tajo-core/src/main/proto/ResourceTrackerProtocol.proto
@@ -22,7 +22,7 @@ option java_outer_classname = "TajoResourceTrackerProtocol";
option java_generic_services = true;
option java_generate_equals_and_hash = true;
-import "TajoMasterProtocol.proto";
+import "QueryCoordinatorProtocol.proto";
import "ContainerProtocol.proto";
import "tajo_protos.proto";
http://git-wip-us.apache.org/repos/asf/tajo/blob/807868bd/tajo-core/src/main/proto/TajoMasterProtocol.proto
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/proto/TajoMasterProtocol.proto b/tajo-core/src/main/proto/TajoMasterProtocol.proto
deleted file mode 100644
index bc73596..0000000
--- a/tajo-core/src/main/proto/TajoMasterProtocol.proto
+++ /dev/null
@@ -1,147 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-//TajoWorker -> TajoMaster protocol
-
-option java_package = "org.apache.tajo.ipc";
-option java_outer_classname = "TajoMasterProtocol";
-option java_generic_services = true;
-option java_generate_equals_and_hash = true;
-
-import "yarn_protos.proto";
-import "tajo_protos.proto";
-import "TajoIdProtos.proto";
-import "CatalogProtos.proto";
-import "PrimitiveProtos.proto";
-import "ContainerProtocol.proto";
-
-package hadoop.yarn;
-
-message ServerStatusProto {
- message System {
- required int32 availableProcessors = 1;
- required int32 freeMemoryMB = 2;
- required int32 maxMemoryMB = 3;
- required int32 totalMemoryMB = 4;
- }
- message Disk {
- required string absolutePath = 1;
- required int64 totalSpace = 2;
- required int64 freeSpace = 3;
- required int64 usableSpace = 4;
- }
-
- message JvmHeap {
- required int64 maxHeap = 1;
- required int64 totalHeap = 2;
- required int64 freeHeap = 3;
- }
-
- required System system = 1;
- required float diskSlots = 2;
- required int32 memoryResourceMB = 3;
- repeated Disk disk = 4;
- required int32 runningTaskNum = 5;
- required JvmHeap jvmHeap = 6;
- required BoolProto queryMasterMode = 7;
- required BoolProto taskRunnerMode = 8;
-}
-
-message TajoHeartbeat {
- required WorkerConnectionInfoProto connectionInfo = 1;
- optional QueryIdProto queryId = 2;
- optional QueryState state = 3;
- optional TableDescProto resultDesc = 4;
- optional string statusMessage = 5;
- optional float queryProgress = 6;
- optional int64 queryFinishTime = 7;
-}
-
-message TajoHeartbeatResponse {
- message ResponseCommand {
- required string command = 1;
- repeated string params = 2;
- }
- required BoolProto heartbeatResult = 1;
- required ClusterResourceSummary clusterResourceSummary = 2;
- optional ResponseCommand responseCommand = 3;
-}
-
-message ClusterResourceSummary {
- required int32 numWorkers = 1;
- required int32 totalDiskSlots = 2;
- required int32 totalCpuCoreSlots = 3;
- required int32 totalMemoryMB = 4;
-
- required int32 totalAvailableDiskSlots = 5;
- required int32 totalAvailableCpuCoreSlots = 6;
- required int32 totalAvailableMemoryMB = 7;
-}
-
-enum ResourceRequestPriority {
- MEMORY = 1;
- DISK = 2;
-}
-
-message WorkerResourceAllocationRequest {
- required QueryIdProto queryId = 1;
- required ResourceRequestPriority resourceRequestPriority = 2;
-
- required int32 numContainers = 3;
-
- required int32 maxMemoryMBPerContainer = 4;
- required int32 minMemoryMBPerContainer = 5;
-
- required float maxDiskSlotPerContainer = 6;
- required float minDiskSlotPerContainer = 7;
-}
-
-message WorkerResourceProto {
- required WorkerConnectionInfoProto connectionInfo = 1;
- required int32 memoryMB = 2 ;
- required float diskSlots = 3;
-}
-
-message WorkerResourcesRequest {
- repeated WorkerResourceProto workerResources = 1;
-}
-
-message WorkerResourceReleaseRequest {
- required ExecutionBlockIdProto executionBlockId = 1;
- repeated TajoContainerIdProto containerIds = 2;
-}
-
-message WorkerAllocatedResource {
- required TajoContainerIdProto containerId = 1;
- required WorkerConnectionInfoProto connectionInfo = 2;
-
- required int32 allocatedMemoryMB = 3;
- required float allocatedDiskSlots = 4;
-}
-
-message WorkerResourceAllocationResponse {
- required QueryIdProto queryId = 1;
- repeated WorkerAllocatedResource workerAllocatedResource = 2;
-}
-
-service TajoMasterProtocolService {
- rpc heartbeat(TajoHeartbeat) returns (TajoHeartbeatResponse);
- rpc allocateWorkerResources(WorkerResourceAllocationRequest) returns (WorkerResourceAllocationResponse);
- rpc releaseWorkerResource(WorkerResourceReleaseRequest) returns (BoolProto);
- rpc getAllWorkerResource(NullProto) returns (WorkerResourcesRequest);
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/tajo/blob/807868bd/tajo-core/src/main/resources/webapps/admin/index.jsp
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/resources/webapps/admin/index.jsp b/tajo-core/src/main/resources/webapps/admin/index.jsp
index 00186d7..0defb3c 100644
--- a/tajo-core/src/main/resources/webapps/admin/index.jsp
+++ b/tajo-core/src/main/resources/webapps/admin/index.jsp
@@ -25,7 +25,7 @@
<%@ page import="org.apache.tajo.master.TajoMaster" %>
<%@ page import="org.apache.tajo.ha.HAService" %>
<%@ page import="org.apache.tajo.ha.TajoMasterInfo" %>
-<%@ page import="org.apache.tajo.querymaster.QueryInProgress" %>
+<%@ page import="org.apache.tajo.master.QueryInProgress" %>
<%@ page import="org.apache.tajo.master.rm.Worker" %>
<%@ page import="org.apache.tajo.master.rm.WorkerState" %>
<%@ page import="org.apache.tajo.util.NetUtils" %>
http://git-wip-us.apache.org/repos/asf/tajo/blob/807868bd/tajo-core/src/main/resources/webapps/admin/query.jsp
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/resources/webapps/admin/query.jsp b/tajo-core/src/main/resources/webapps/admin/query.jsp
index 4d8e5e6..85f7176 100644
--- a/tajo-core/src/main/resources/webapps/admin/query.jsp
+++ b/tajo-core/src/main/resources/webapps/admin/query.jsp
@@ -20,7 +20,7 @@
<%@ page language="java" contentType="text/html; charset=UTF-8" pageEncoding="UTF-8"%>
<%@ page import="org.apache.tajo.master.TajoMaster" %>
-<%@ page import="org.apache.tajo.querymaster.QueryInProgress" %>
+<%@ page import="org.apache.tajo.master.QueryInProgress" %>
<%@ page import="org.apache.tajo.master.rm.Worker" %>
<%@ page import="org.apache.tajo.util.JSPUtil" %>
<%@ page import="org.apache.tajo.util.StringUtils" %>
http://git-wip-us.apache.org/repos/asf/tajo/blob/807868bd/tajo-core/src/test/java/org/apache/tajo/TajoTestingCluster.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/TajoTestingCluster.java b/tajo-core/src/test/java/org/apache/tajo/TajoTestingCluster.java
index 0786912..e548b81 100644
--- a/tajo-core/src/test/java/org/apache/tajo/TajoTestingCluster.java
+++ b/tajo-core/src/test/java/org/apache/tajo/TajoTestingCluster.java
@@ -43,8 +43,8 @@ import org.apache.tajo.client.TajoClientUtil;
import org.apache.tajo.conf.TajoConf;
import org.apache.tajo.conf.TajoConf.ConfVars;
import org.apache.tajo.engine.planner.global.rewriter.GlobalPlanTestRuleProvider;
+import org.apache.tajo.master.QueryInProgress;
import org.apache.tajo.master.TajoMaster;
-import org.apache.tajo.querymaster.*;
import org.apache.tajo.querymaster.Query;
import org.apache.tajo.querymaster.Stage;
import org.apache.tajo.querymaster.StageState;
http://git-wip-us.apache.org/repos/asf/tajo/blob/807868bd/tajo-core/src/test/java/org/apache/tajo/master/rm/TestTajoResourceManager.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/master/rm/TestTajoResourceManager.java b/tajo-core/src/test/java/org/apache/tajo/master/rm/TestTajoResourceManager.java
index b8fbd67..a013d0b 100644
--- a/tajo-core/src/test/java/org/apache/tajo/master/rm/TestTajoResourceManager.java
+++ b/tajo-core/src/test/java/org/apache/tajo/master/rm/TestTajoResourceManager.java
@@ -19,12 +19,11 @@
package org.apache.tajo.master.rm;
import com.google.protobuf.RpcCallback;
-import org.apache.hadoop.yarn.proto.YarnProtos;
import org.apache.tajo.QueryId;
import org.apache.tajo.QueryIdFactory;
import org.apache.tajo.conf.TajoConf;
import org.apache.tajo.ipc.ContainerProtocol;
-import org.apache.tajo.ipc.TajoMasterProtocol.*;
+import org.apache.tajo.ipc.QueryCoordinatorProtocol.*;
import org.apache.tajo.master.cluster.WorkerConnectionInfo;
import org.apache.tajo.rpc.NullCallback;
import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos;