You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tajo.apache.org by hy...@apache.org on 2015/01/09 12:46:50 UTC
[1/2] tajo git commit: TAJO-1291: Rename TajoMasterProtocol to
QueryCoordinatorProtocol.
Repository: tajo
Updated Branches:
refs/heads/master 50a8a663c -> 807868bd4
http://git-wip-us.apache.org/repos/asf/tajo/blob/807868bd/tajo-core/src/main/java/org/apache/tajo/querymaster/Stage.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/querymaster/Stage.java b/tajo-core/src/main/java/org/apache/tajo/querymaster/Stage.java
index 1ea7051..13394f8 100644
--- a/tajo-core/src/main/java/org/apache/tajo/querymaster/Stage.java
+++ b/tajo-core/src/main/java/org/apache/tajo/querymaster/Stage.java
@@ -45,7 +45,6 @@ import org.apache.tajo.engine.planner.enforce.Enforcer;
import org.apache.tajo.engine.planner.global.DataChannel;
import org.apache.tajo.engine.planner.global.ExecutionBlock;
import org.apache.tajo.engine.planner.global.MasterPlan;
-import org.apache.tajo.ipc.TajoMasterProtocol;
import org.apache.tajo.ipc.TajoWorkerProtocol;
import org.apache.tajo.ipc.TajoWorkerProtocol.DistinctGroupbyEnforcer.MultipleAggregationStage;
import org.apache.tajo.ipc.TajoWorkerProtocol.EnforceProperty;
@@ -838,22 +837,6 @@ public class Stage implements EventHandler<StageEvent> {
}
/**
- * Getting the total memory of cluster
- *
- * @param stage
- * @return mega bytes
- */
- private static int getClusterTotalMemory(Stage stage) {
- List<TajoMasterProtocol.WorkerResourceProto> workers =
- stage.context.getQueryMasterContext().getQueryMaster().getAllWorker();
-
- int totalMem = 0;
- for (TajoMasterProtocol.WorkerResourceProto worker : workers) {
- totalMem += worker.getMemoryMB();
- }
- return totalMem;
- }
- /**
* Getting the desire number of partitions according to the volume of input data.
* This method is only used to determine the partition key number of hash join or aggregation.
*
http://git-wip-us.apache.org/repos/asf/tajo/blob/807868bd/tajo-core/src/main/java/org/apache/tajo/util/JSPUtil.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/util/JSPUtil.java b/tajo-core/src/main/java/org/apache/tajo/util/JSPUtil.java
index d711258..82fb37f 100644
--- a/tajo-core/src/main/java/org/apache/tajo/util/JSPUtil.java
+++ b/tajo-core/src/main/java/org/apache/tajo/util/JSPUtil.java
@@ -25,7 +25,7 @@ import org.apache.tajo.catalog.statistics.TableStats;
import org.apache.tajo.conf.TajoConf;
import org.apache.tajo.master.TajoMaster.MasterContext;
import org.apache.tajo.ha.HAService;
-import org.apache.tajo.querymaster.QueryInProgress;
+import org.apache.tajo.master.QueryInProgress;
import org.apache.tajo.querymaster.QueryMasterTask;
import org.apache.tajo.querymaster.Task;
import org.apache.tajo.querymaster.Stage;
http://git-wip-us.apache.org/repos/asf/tajo/blob/807868bd/tajo-core/src/main/java/org/apache/tajo/worker/TajoResourceAllocator.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/TajoResourceAllocator.java b/tajo-core/src/main/java/org/apache/tajo/worker/TajoResourceAllocator.java
index 8241478..04b65d2 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/TajoResourceAllocator.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/TajoResourceAllocator.java
@@ -28,8 +28,10 @@ import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationAttemptIdPBImpl;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.tajo.ExecutionBlockId;
import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.ha.HAServiceUtil;
import org.apache.tajo.ipc.ContainerProtocol;
-import org.apache.tajo.ipc.TajoMasterProtocol;
+import org.apache.tajo.ipc.QueryCoordinatorProtocol;
+import org.apache.tajo.ipc.QueryCoordinatorProtocol.*;
import org.apache.tajo.ipc.TajoWorkerProtocol;
import org.apache.tajo.master.*;
import org.apache.tajo.master.cluster.WorkerConnectionInfo;
@@ -38,16 +40,18 @@ import org.apache.tajo.master.container.TajoContainerId;
import org.apache.tajo.master.event.ContainerAllocationEvent;
import org.apache.tajo.master.event.ContainerAllocatorEventType;
import org.apache.tajo.master.event.StageContainerAllocationEvent;
+import org.apache.tajo.master.rm.TajoWorkerContainer;
+import org.apache.tajo.master.rm.TajoWorkerContainerId;
+import org.apache.tajo.master.rm.Worker;
+import org.apache.tajo.master.rm.WorkerResource;
import org.apache.tajo.querymaster.QueryMasterTask;
import org.apache.tajo.querymaster.Stage;
import org.apache.tajo.querymaster.StageState;
-import org.apache.tajo.master.rm.*;
import org.apache.tajo.rpc.CallFuture;
import org.apache.tajo.rpc.NettyClientBase;
import org.apache.tajo.rpc.NullCallback;
import org.apache.tajo.rpc.RpcConnectionPool;
import org.apache.tajo.util.ApplicationIdUtils;
-import org.apache.tajo.ha.HAServiceUtil;
import java.net.InetSocketAddress;
import java.util.*;
@@ -91,7 +95,7 @@ public class TajoResourceAllocator extends AbstractResourceAllocator {
int memoryMBPerTask) {
//TODO consider disk slot
- TajoMasterProtocol.ClusterResourceSummary clusterResource = workerContext.getClusterResource();
+ ClusterResourceSummary clusterResource = workerContext.getClusterResource();
int clusterSlots = clusterResource == null ? 0 : clusterResource.getTotalMemoryMB() / memoryMBPerTask;
clusterSlots = Math.max(1, clusterSlots - 1); // reserve query master slot
LOG.info("CalculateNumberRequestContainer - Number of Tasks=" + numTasks +
@@ -249,20 +253,19 @@ public class TajoResourceAllocator extends AbstractResourceAllocator {
@Override
public void run() {
LOG.info("Start TajoWorkerAllocationThread");
- CallFuture<TajoMasterProtocol.WorkerResourceAllocationResponse> callBack =
- new CallFuture<TajoMasterProtocol.WorkerResourceAllocationResponse>();
+ CallFuture<WorkerResourceAllocationResponse> callBack =
+ new CallFuture<WorkerResourceAllocationResponse>();
//TODO consider task's resource usage pattern
int requiredMemoryMB = tajoConf.getIntVar(TajoConf.ConfVars.TASK_DEFAULT_MEMORY);
float requiredDiskSlots = tajoConf.getFloatVar(TajoConf.ConfVars.TASK_DEFAULT_DISK);
- TajoMasterProtocol.WorkerResourceAllocationRequest request =
- TajoMasterProtocol.WorkerResourceAllocationRequest.newBuilder()
+ WorkerResourceAllocationRequest request = WorkerResourceAllocationRequest.newBuilder()
.setMinMemoryMBPerContainer(requiredMemoryMB)
.setMaxMemoryMBPerContainer(requiredMemoryMB)
.setNumContainers(event.getRequiredNum())
- .setResourceRequestPriority(!event.isLeafQuery() ? TajoMasterProtocol.ResourceRequestPriority.MEMORY
- : TajoMasterProtocol.ResourceRequestPriority.DISK)
+ .setResourceRequestPriority(!event.isLeafQuery() ?
+ ResourceRequestPriority.MEMORY : ResourceRequestPriority.DISK)
.setMinDiskSlotPerContainer(requiredDiskSlots)
.setMaxDiskSlotPerContainer(requiredDiskSlots)
.setQueryId(event.getExecutionBlockId().getQueryId().getProto())
@@ -280,7 +283,7 @@ public class TajoResourceAllocator extends AbstractResourceAllocator {
try {
tmClient = connPool.getConnection(
queryTaskContext.getQueryMasterContext().getWorkerContext().getTajoMasterAddress(),
- TajoMasterProtocol.class, true);
+ QueryCoordinatorProtocol.class, true);
} catch (Exception e) {
queryTaskContext.getQueryMasterContext().getWorkerContext().
setWorkerResourceTrackerAddr(HAServiceUtil.getResourceTrackerAddress(tajoConf));
@@ -288,15 +291,15 @@ public class TajoResourceAllocator extends AbstractResourceAllocator {
setTajoMasterAddress(HAServiceUtil.getMasterUmbilicalAddress(tajoConf));
tmClient = connPool.getConnection(
queryTaskContext.getQueryMasterContext().getWorkerContext().getTajoMasterAddress(),
- TajoMasterProtocol.class, true);
+ QueryCoordinatorProtocol.class, true);
}
} else {
tmClient = connPool.getConnection(
queryTaskContext.getQueryMasterContext().getWorkerContext().getTajoMasterAddress(),
- TajoMasterProtocol.class, true);
+ QueryCoordinatorProtocol.class, true);
}
- TajoMasterProtocol.TajoMasterProtocolService masterClientService = tmClient.getStub();
+ QueryCoordinatorProtocolService masterClientService = tmClient.getStub();
masterClientService.allocateWorkerResources(null, request, callBack);
} catch (Exception e) {
LOG.error(e.getMessage(), e);
@@ -304,7 +307,7 @@ public class TajoResourceAllocator extends AbstractResourceAllocator {
connPool.releaseConnection(tmClient);
}
- TajoMasterProtocol.WorkerResourceAllocationResponse response = null;
+ WorkerResourceAllocationResponse response = null;
while(!stopped.get()) {
try {
response = callBack.get(3, TimeUnit.SECONDS);
@@ -321,11 +324,11 @@ public class TajoResourceAllocator extends AbstractResourceAllocator {
int numAllocatedContainers = 0;
if(response != null) {
- List<TajoMasterProtocol.WorkerAllocatedResource> allocatedResources = response.getWorkerAllocatedResourceList();
+ List<WorkerAllocatedResource> allocatedResources = response.getWorkerAllocatedResourceList();
ExecutionBlockId executionBlockId = event.getExecutionBlockId();
List<TajoContainer> containers = new ArrayList<TajoContainer>();
- for(TajoMasterProtocol.WorkerAllocatedResource eachAllocatedResource: allocatedResources) {
+ for(WorkerAllocatedResource eachAllocatedResource: allocatedResources) {
TajoWorkerContainer container = new TajoWorkerContainer();
NodeId nodeId = NodeId.newInstance(eachAllocatedResource.getConnectionInfo().getHost(),
eachAllocatedResource.getConnectionInfo().getPeerRpcPort());
http://git-wip-us.apache.org/repos/asf/tajo/blob/807868bd/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorker.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorker.java b/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorker.java
index 09a87e0..4003014 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorker.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorker.java
@@ -20,7 +20,6 @@ package org.apache.tajo.worker;
import com.codahale.metrics.Gauge;
import com.google.common.annotations.VisibleForTesting;
-
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
@@ -36,13 +35,13 @@ import org.apache.tajo.catalog.CatalogClient;
import org.apache.tajo.catalog.CatalogService;
import org.apache.tajo.conf.TajoConf;
import org.apache.tajo.ha.HAServiceUtil;
-import org.apache.tajo.ipc.TajoMasterProtocol;
-import org.apache.tajo.master.cluster.WorkerConnectionInfo;
import org.apache.tajo.ha.TajoMasterInfo;
-import org.apache.tajo.querymaster.QueryMaster;
-import org.apache.tajo.querymaster.QueryMasterManagerService;
+import org.apache.tajo.ipc.QueryCoordinatorProtocol.ClusterResourceSummary;
+import org.apache.tajo.master.cluster.WorkerConnectionInfo;
import org.apache.tajo.master.rm.TajoWorkerResourceManager;
import org.apache.tajo.pullserver.TajoPullServerService;
+import org.apache.tajo.querymaster.QueryMaster;
+import org.apache.tajo.querymaster.QueryMasterManagerService;
import org.apache.tajo.rpc.RpcChannelFactory;
import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos;
import org.apache.tajo.rule.EvaluationContext;
@@ -50,7 +49,10 @@ import org.apache.tajo.rule.EvaluationFailedException;
import org.apache.tajo.rule.SelfDiagnosisRuleEngine;
import org.apache.tajo.rule.SelfDiagnosisRuleSession;
import org.apache.tajo.storage.HashShuffleAppenderManager;
-import org.apache.tajo.util.*;
+import org.apache.tajo.util.CommonTestingUtil;
+import org.apache.tajo.util.JvmPauseMonitor;
+import org.apache.tajo.util.NetUtils;
+import org.apache.tajo.util.StringUtils;
import org.apache.tajo.util.history.HistoryReader;
import org.apache.tajo.util.history.HistoryWriter;
import org.apache.tajo.util.metrics.TajoSystemMetrics;
@@ -115,7 +117,7 @@ public class TajoWorker extends CompositeService {
private AtomicInteger numClusterNodes = new AtomicInteger();
- private TajoMasterProtocol.ClusterResourceSummary clusterResource;
+ private ClusterResourceSummary clusterResource;
private WorkerConnectionInfo connectionInfo;
@@ -516,13 +518,13 @@ public class TajoWorker extends CompositeService {
return TajoWorker.this.numClusterNodes.get();
}
- public void setClusterResource(TajoMasterProtocol.ClusterResourceSummary clusterResource) {
+ public void setClusterResource(ClusterResourceSummary clusterResource) {
synchronized (numClusterNodes) {
TajoWorker.this.clusterResource = clusterResource;
}
}
- public TajoMasterProtocol.ClusterResourceSummary getClusterResource() {
+ public ClusterResourceSummary getClusterResource() {
synchronized (numClusterNodes) {
return TajoWorker.this.clusterResource;
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/807868bd/tajo-core/src/main/java/org/apache/tajo/worker/WorkerHeartbeatService.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/WorkerHeartbeatService.java b/tajo-core/src/main/java/org/apache/tajo/worker/WorkerHeartbeatService.java
index c809921..b92c4cd 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/WorkerHeartbeatService.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/WorkerHeartbeatService.java
@@ -26,7 +26,10 @@ import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.service.AbstractService;
import org.apache.tajo.conf.TajoConf;
-import org.apache.tajo.ipc.TajoMasterProtocol;
+import org.apache.tajo.ha.HAServiceUtil;
+import org.apache.tajo.ipc.QueryCoordinatorProtocol.ClusterResourceSummary;
+import org.apache.tajo.ipc.QueryCoordinatorProtocol.ServerStatusProto;
+import org.apache.tajo.ipc.QueryCoordinatorProtocol.TajoHeartbeatResponse;
import org.apache.tajo.ipc.TajoResourceTrackerProtocol;
import org.apache.tajo.rpc.CallFuture;
import org.apache.tajo.rpc.NettyClientBase;
@@ -35,7 +38,6 @@ import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos;
import org.apache.tajo.storage.DiskDeviceInfo;
import org.apache.tajo.storage.DiskMountInfo;
import org.apache.tajo.storage.DiskUtil;
-import org.apache.tajo.ha.HAServiceUtil;
import java.io.File;
import java.util.List;
@@ -98,8 +100,8 @@ public class WorkerHeartbeatService extends AbstractService {
class WorkerHeartbeatThread extends Thread {
private volatile AtomicBoolean stopped = new AtomicBoolean(false);
- TajoMasterProtocol.ServerStatusProto.System systemInfo;
- List<TajoMasterProtocol.ServerStatusProto.Disk> diskInfos = Lists.newArrayList();
+ ServerStatusProto.System systemInfo;
+ List<ServerStatusProto.Disk> diskInfos = Lists.newArrayList();
float workerDiskSlots;
int workerMemoryMB;
List<DiskDeviceInfo> diskDeviceInfos;
@@ -137,7 +139,7 @@ public class WorkerHeartbeatService extends AbstractService {
}
}
- systemInfo = TajoMasterProtocol.ServerStatusProto.System.newBuilder()
+ systemInfo = ServerStatusProto.System.newBuilder()
.setAvailableProcessors(workerCpuCoreNum)
.setFreeMemoryMB(0)
.setMaxMemoryMB(0)
@@ -153,14 +155,14 @@ public class WorkerHeartbeatService extends AbstractService {
if(sendDiskInfoCount == 0 && diskDeviceInfos != null) {
getDiskUsageInfos();
}
- TajoMasterProtocol.ServerStatusProto.JvmHeap jvmHeap =
- TajoMasterProtocol.ServerStatusProto.JvmHeap.newBuilder()
+ ServerStatusProto.JvmHeap jvmHeap =
+ ServerStatusProto.JvmHeap.newBuilder()
.setMaxHeap(Runtime.getRuntime().maxMemory())
.setFreeHeap(Runtime.getRuntime().freeMemory())
.setTotalHeap(Runtime.getRuntime().totalMemory())
.build();
- TajoMasterProtocol.ServerStatusProto serverStatus = TajoMasterProtocol.ServerStatusProto.newBuilder()
+ ServerStatusProto serverStatus = ServerStatusProto.newBuilder()
.addAllDisk(diskInfos)
.setRunningTaskNum(
context.getTaskRunnerManager() == null ? 1 : context.getTaskRunnerManager().getNumTasks())
@@ -179,8 +181,7 @@ public class WorkerHeartbeatService extends AbstractService {
NettyClientBase rmClient = null;
try {
- CallFuture<TajoMasterProtocol.TajoHeartbeatResponse> callBack =
- new CallFuture<TajoMasterProtocol.TajoHeartbeatResponse>();
+ CallFuture<TajoHeartbeatResponse> callBack = new CallFuture<TajoHeartbeatResponse>();
// In TajoMaster HA mode, if backup master be active status,
// worker may fail to connect existing active master. Thus,
@@ -201,9 +202,9 @@ public class WorkerHeartbeatService extends AbstractService {
TajoResourceTrackerProtocol.TajoResourceTrackerProtocolService resourceTracker = rmClient.getStub();
resourceTracker.heartbeat(callBack.getController(), heartbeatProto, callBack);
- TajoMasterProtocol.TajoHeartbeatResponse response = callBack.get(2, TimeUnit.SECONDS);
+ TajoHeartbeatResponse response = callBack.get(2, TimeUnit.SECONDS);
if(response != null) {
- TajoMasterProtocol.ClusterResourceSummary clusterResourceSummary = response.getClusterResourceSummary();
+ ClusterResourceSummary clusterResourceSummary = response.getClusterResourceSummary();
if(clusterResourceSummary.getNumWorkers() > 0) {
context.setNumClusterNodes(clusterResourceSummary.getNumWorkers());
}
@@ -249,7 +250,7 @@ public class WorkerHeartbeatService extends AbstractService {
if(mountInfos != null) {
for(DiskMountInfo eachMount: mountInfos) {
File eachFile = new File(eachMount.getMountPath());
- diskInfos.add(TajoMasterProtocol.ServerStatusProto.Disk.newBuilder()
+ diskInfos.add(ServerStatusProto.Disk.newBuilder()
.setAbsolutePath(eachFile.getAbsolutePath())
.setTotalSpace(eachFile.getTotalSpace())
.setFreeSpace(eachFile.getFreeSpace())
http://git-wip-us.apache.org/repos/asf/tajo/blob/807868bd/tajo-core/src/main/java/org/apache/tajo/worker/rule/ConnectivityCheckerRuleForTajoWorker.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/rule/ConnectivityCheckerRuleForTajoWorker.java b/tajo-core/src/main/java/org/apache/tajo/worker/rule/ConnectivityCheckerRuleForTajoWorker.java
index 6eb710a..68890e3 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/rule/ConnectivityCheckerRuleForTajoWorker.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/rule/ConnectivityCheckerRuleForTajoWorker.java
@@ -18,23 +18,19 @@
package org.apache.tajo.worker.rule;
-import java.net.InetSocketAddress;
-
import org.apache.tajo.conf.TajoConf;
import org.apache.tajo.conf.TajoConf.ConfVars;
import org.apache.tajo.ha.HAServiceUtil;
-import org.apache.tajo.ipc.TajoMasterProtocol;
+import org.apache.tajo.ipc.QueryCoordinatorProtocol;
import org.apache.tajo.rpc.NettyClientBase;
import org.apache.tajo.rpc.RpcConnectionPool;
-import org.apache.tajo.rule.EvaluationContext;
-import org.apache.tajo.rule.EvaluationResult;
-import org.apache.tajo.rule.SelfDiagnosisRuleDefinition;
-import org.apache.tajo.rule.SelfDiagnosisRuleVisibility;
+import org.apache.tajo.rule.*;
import org.apache.tajo.rule.EvaluationResult.EvaluationResultCode;
-import org.apache.tajo.rule.SelfDiagnosisRule;
import org.apache.tajo.util.NetUtils;
import org.apache.tajo.worker.TajoWorker;
+import java.net.InetSocketAddress;
+
/**
* With this rule, Tajo worker will check the connectivity to tajo master server.
*/
@@ -54,7 +50,7 @@ public class ConnectivityCheckerRuleForTajoWorker implements SelfDiagnosisRule {
} else {
masterAddress = NetUtils.createSocketAddr(tajoConf.getVar(ConfVars.TAJO_MASTER_UMBILICAL_RPC_ADDRESS));
}
- masterClient = pool.getConnection(masterAddress, TajoMasterProtocol.class, true);
+ masterClient = pool.getConnection(masterAddress, QueryCoordinatorProtocol.class, true);
masterClient.getStub();
} finally {
http://git-wip-us.apache.org/repos/asf/tajo/blob/807868bd/tajo-core/src/main/proto/QueryCoordinatorProtocol.proto
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/proto/QueryCoordinatorProtocol.proto b/tajo-core/src/main/proto/QueryCoordinatorProtocol.proto
new file mode 100644
index 0000000..41a382f
--- /dev/null
+++ b/tajo-core/src/main/proto/QueryCoordinatorProtocol.proto
@@ -0,0 +1,147 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+//TajoWorker -> TajoMaster protocol
+
+option java_package = "org.apache.tajo.ipc";
+option java_outer_classname = "QueryCoordinatorProtocol";
+option java_generic_services = true;
+option java_generate_equals_and_hash = true;
+
+import "yarn_protos.proto";
+import "tajo_protos.proto";
+import "TajoIdProtos.proto";
+import "CatalogProtos.proto";
+import "PrimitiveProtos.proto";
+import "ContainerProtocol.proto";
+
+package hadoop.yarn;
+
+message ServerStatusProto {
+ message System {
+ required int32 availableProcessors = 1;
+ required int32 freeMemoryMB = 2;
+ required int32 maxMemoryMB = 3;
+ required int32 totalMemoryMB = 4;
+ }
+ message Disk {
+ required string absolutePath = 1;
+ required int64 totalSpace = 2;
+ required int64 freeSpace = 3;
+ required int64 usableSpace = 4;
+ }
+
+ message JvmHeap {
+ required int64 maxHeap = 1;
+ required int64 totalHeap = 2;
+ required int64 freeHeap = 3;
+ }
+
+ required System system = 1;
+ required float diskSlots = 2;
+ required int32 memoryResourceMB = 3;
+ repeated Disk disk = 4;
+ required int32 runningTaskNum = 5;
+ required JvmHeap jvmHeap = 6;
+ required BoolProto queryMasterMode = 7;
+ required BoolProto taskRunnerMode = 8;
+}
+
+message TajoHeartbeat {
+ required WorkerConnectionInfoProto connectionInfo = 1;
+ optional QueryIdProto queryId = 2;
+ optional QueryState state = 3;
+ optional TableDescProto resultDesc = 4;
+ optional string statusMessage = 5;
+ optional float queryProgress = 6;
+ optional int64 queryFinishTime = 7;
+}
+
+message TajoHeartbeatResponse {
+ message ResponseCommand {
+ required string command = 1;
+ repeated string params = 2;
+ }
+ required BoolProto heartbeatResult = 1;
+ required ClusterResourceSummary clusterResourceSummary = 2;
+ optional ResponseCommand responseCommand = 3;
+}
+
+message ClusterResourceSummary {
+ required int32 numWorkers = 1;
+ required int32 totalDiskSlots = 2;
+ required int32 totalCpuCoreSlots = 3;
+ required int32 totalMemoryMB = 4;
+
+ required int32 totalAvailableDiskSlots = 5;
+ required int32 totalAvailableCpuCoreSlots = 6;
+ required int32 totalAvailableMemoryMB = 7;
+}
+
+enum ResourceRequestPriority {
+ MEMORY = 1;
+ DISK = 2;
+}
+
+message WorkerResourceAllocationRequest {
+ required QueryIdProto queryId = 1;
+ required ResourceRequestPriority resourceRequestPriority = 2;
+
+ required int32 numContainers = 3;
+
+ required int32 maxMemoryMBPerContainer = 4;
+ required int32 minMemoryMBPerContainer = 5;
+
+ required float maxDiskSlotPerContainer = 6;
+ required float minDiskSlotPerContainer = 7;
+}
+
+message WorkerResourceProto {
+ required WorkerConnectionInfoProto connectionInfo = 1;
+ required int32 memoryMB = 2 ;
+ required float diskSlots = 3;
+}
+
+message WorkerResourcesRequest {
+ repeated WorkerResourceProto workerResources = 1;
+}
+
+message WorkerResourceReleaseRequest {
+ required ExecutionBlockIdProto executionBlockId = 1;
+ repeated TajoContainerIdProto containerIds = 2;
+}
+
+message WorkerAllocatedResource {
+ required TajoContainerIdProto containerId = 1;
+ required WorkerConnectionInfoProto connectionInfo = 2;
+
+ required int32 allocatedMemoryMB = 3;
+ required float allocatedDiskSlots = 4;
+}
+
+message WorkerResourceAllocationResponse {
+ required QueryIdProto queryId = 1;
+ repeated WorkerAllocatedResource workerAllocatedResource = 2;
+}
+
+service QueryCoordinatorProtocolService {
+ rpc heartbeat(TajoHeartbeat) returns (TajoHeartbeatResponse);
+ rpc allocateWorkerResources(WorkerResourceAllocationRequest) returns (WorkerResourceAllocationResponse);
+ rpc releaseWorkerResource(WorkerResourceReleaseRequest) returns (BoolProto);
+ rpc getAllWorkerResource(NullProto) returns (WorkerResourcesRequest);
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/tajo/blob/807868bd/tajo-core/src/main/proto/ResourceTrackerProtocol.proto
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/proto/ResourceTrackerProtocol.proto b/tajo-core/src/main/proto/ResourceTrackerProtocol.proto
index b2db46a..40aeab7 100644
--- a/tajo-core/src/main/proto/ResourceTrackerProtocol.proto
+++ b/tajo-core/src/main/proto/ResourceTrackerProtocol.proto
@@ -22,7 +22,7 @@ option java_outer_classname = "TajoResourceTrackerProtocol";
option java_generic_services = true;
option java_generate_equals_and_hash = true;
-import "TajoMasterProtocol.proto";
+import "QueryCoordinatorProtocol.proto";
import "ContainerProtocol.proto";
import "tajo_protos.proto";
http://git-wip-us.apache.org/repos/asf/tajo/blob/807868bd/tajo-core/src/main/proto/TajoMasterProtocol.proto
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/proto/TajoMasterProtocol.proto b/tajo-core/src/main/proto/TajoMasterProtocol.proto
deleted file mode 100644
index bc73596..0000000
--- a/tajo-core/src/main/proto/TajoMasterProtocol.proto
+++ /dev/null
@@ -1,147 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-//TajoWorker -> TajoMaster protocol
-
-option java_package = "org.apache.tajo.ipc";
-option java_outer_classname = "TajoMasterProtocol";
-option java_generic_services = true;
-option java_generate_equals_and_hash = true;
-
-import "yarn_protos.proto";
-import "tajo_protos.proto";
-import "TajoIdProtos.proto";
-import "CatalogProtos.proto";
-import "PrimitiveProtos.proto";
-import "ContainerProtocol.proto";
-
-package hadoop.yarn;
-
-message ServerStatusProto {
- message System {
- required int32 availableProcessors = 1;
- required int32 freeMemoryMB = 2;
- required int32 maxMemoryMB = 3;
- required int32 totalMemoryMB = 4;
- }
- message Disk {
- required string absolutePath = 1;
- required int64 totalSpace = 2;
- required int64 freeSpace = 3;
- required int64 usableSpace = 4;
- }
-
- message JvmHeap {
- required int64 maxHeap = 1;
- required int64 totalHeap = 2;
- required int64 freeHeap = 3;
- }
-
- required System system = 1;
- required float diskSlots = 2;
- required int32 memoryResourceMB = 3;
- repeated Disk disk = 4;
- required int32 runningTaskNum = 5;
- required JvmHeap jvmHeap = 6;
- required BoolProto queryMasterMode = 7;
- required BoolProto taskRunnerMode = 8;
-}
-
-message TajoHeartbeat {
- required WorkerConnectionInfoProto connectionInfo = 1;
- optional QueryIdProto queryId = 2;
- optional QueryState state = 3;
- optional TableDescProto resultDesc = 4;
- optional string statusMessage = 5;
- optional float queryProgress = 6;
- optional int64 queryFinishTime = 7;
-}
-
-message TajoHeartbeatResponse {
- message ResponseCommand {
- required string command = 1;
- repeated string params = 2;
- }
- required BoolProto heartbeatResult = 1;
- required ClusterResourceSummary clusterResourceSummary = 2;
- optional ResponseCommand responseCommand = 3;
-}
-
-message ClusterResourceSummary {
- required int32 numWorkers = 1;
- required int32 totalDiskSlots = 2;
- required int32 totalCpuCoreSlots = 3;
- required int32 totalMemoryMB = 4;
-
- required int32 totalAvailableDiskSlots = 5;
- required int32 totalAvailableCpuCoreSlots = 6;
- required int32 totalAvailableMemoryMB = 7;
-}
-
-enum ResourceRequestPriority {
- MEMORY = 1;
- DISK = 2;
-}
-
-message WorkerResourceAllocationRequest {
- required QueryIdProto queryId = 1;
- required ResourceRequestPriority resourceRequestPriority = 2;
-
- required int32 numContainers = 3;
-
- required int32 maxMemoryMBPerContainer = 4;
- required int32 minMemoryMBPerContainer = 5;
-
- required float maxDiskSlotPerContainer = 6;
- required float minDiskSlotPerContainer = 7;
-}
-
-message WorkerResourceProto {
- required WorkerConnectionInfoProto connectionInfo = 1;
- required int32 memoryMB = 2 ;
- required float diskSlots = 3;
-}
-
-message WorkerResourcesRequest {
- repeated WorkerResourceProto workerResources = 1;
-}
-
-message WorkerResourceReleaseRequest {
- required ExecutionBlockIdProto executionBlockId = 1;
- repeated TajoContainerIdProto containerIds = 2;
-}
-
-message WorkerAllocatedResource {
- required TajoContainerIdProto containerId = 1;
- required WorkerConnectionInfoProto connectionInfo = 2;
-
- required int32 allocatedMemoryMB = 3;
- required float allocatedDiskSlots = 4;
-}
-
-message WorkerResourceAllocationResponse {
- required QueryIdProto queryId = 1;
- repeated WorkerAllocatedResource workerAllocatedResource = 2;
-}
-
-service TajoMasterProtocolService {
- rpc heartbeat(TajoHeartbeat) returns (TajoHeartbeatResponse);
- rpc allocateWorkerResources(WorkerResourceAllocationRequest) returns (WorkerResourceAllocationResponse);
- rpc releaseWorkerResource(WorkerResourceReleaseRequest) returns (BoolProto);
- rpc getAllWorkerResource(NullProto) returns (WorkerResourcesRequest);
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/tajo/blob/807868bd/tajo-core/src/main/resources/webapps/admin/index.jsp
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/resources/webapps/admin/index.jsp b/tajo-core/src/main/resources/webapps/admin/index.jsp
index 00186d7..0defb3c 100644
--- a/tajo-core/src/main/resources/webapps/admin/index.jsp
+++ b/tajo-core/src/main/resources/webapps/admin/index.jsp
@@ -25,7 +25,7 @@
<%@ page import="org.apache.tajo.master.TajoMaster" %>
<%@ page import="org.apache.tajo.ha.HAService" %>
<%@ page import="org.apache.tajo.ha.TajoMasterInfo" %>
-<%@ page import="org.apache.tajo.querymaster.QueryInProgress" %>
+<%@ page import="org.apache.tajo.master.QueryInProgress" %>
<%@ page import="org.apache.tajo.master.rm.Worker" %>
<%@ page import="org.apache.tajo.master.rm.WorkerState" %>
<%@ page import="org.apache.tajo.util.NetUtils" %>
http://git-wip-us.apache.org/repos/asf/tajo/blob/807868bd/tajo-core/src/main/resources/webapps/admin/query.jsp
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/resources/webapps/admin/query.jsp b/tajo-core/src/main/resources/webapps/admin/query.jsp
index 4d8e5e6..85f7176 100644
--- a/tajo-core/src/main/resources/webapps/admin/query.jsp
+++ b/tajo-core/src/main/resources/webapps/admin/query.jsp
@@ -20,7 +20,7 @@
<%@ page language="java" contentType="text/html; charset=UTF-8" pageEncoding="UTF-8"%>
<%@ page import="org.apache.tajo.master.TajoMaster" %>
-<%@ page import="org.apache.tajo.querymaster.QueryInProgress" %>
+<%@ page import="org.apache.tajo.master.QueryInProgress" %>
<%@ page import="org.apache.tajo.master.rm.Worker" %>
<%@ page import="org.apache.tajo.util.JSPUtil" %>
<%@ page import="org.apache.tajo.util.StringUtils" %>
http://git-wip-us.apache.org/repos/asf/tajo/blob/807868bd/tajo-core/src/test/java/org/apache/tajo/TajoTestingCluster.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/TajoTestingCluster.java b/tajo-core/src/test/java/org/apache/tajo/TajoTestingCluster.java
index 0786912..e548b81 100644
--- a/tajo-core/src/test/java/org/apache/tajo/TajoTestingCluster.java
+++ b/tajo-core/src/test/java/org/apache/tajo/TajoTestingCluster.java
@@ -43,8 +43,8 @@ import org.apache.tajo.client.TajoClientUtil;
import org.apache.tajo.conf.TajoConf;
import org.apache.tajo.conf.TajoConf.ConfVars;
import org.apache.tajo.engine.planner.global.rewriter.GlobalPlanTestRuleProvider;
+import org.apache.tajo.master.QueryInProgress;
import org.apache.tajo.master.TajoMaster;
-import org.apache.tajo.querymaster.*;
import org.apache.tajo.querymaster.Query;
import org.apache.tajo.querymaster.Stage;
import org.apache.tajo.querymaster.StageState;
http://git-wip-us.apache.org/repos/asf/tajo/blob/807868bd/tajo-core/src/test/java/org/apache/tajo/master/rm/TestTajoResourceManager.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/master/rm/TestTajoResourceManager.java b/tajo-core/src/test/java/org/apache/tajo/master/rm/TestTajoResourceManager.java
index b8fbd67..a013d0b 100644
--- a/tajo-core/src/test/java/org/apache/tajo/master/rm/TestTajoResourceManager.java
+++ b/tajo-core/src/test/java/org/apache/tajo/master/rm/TestTajoResourceManager.java
@@ -19,12 +19,11 @@
package org.apache.tajo.master.rm;
import com.google.protobuf.RpcCallback;
-import org.apache.hadoop.yarn.proto.YarnProtos;
import org.apache.tajo.QueryId;
import org.apache.tajo.QueryIdFactory;
import org.apache.tajo.conf.TajoConf;
import org.apache.tajo.ipc.ContainerProtocol;
-import org.apache.tajo.ipc.TajoMasterProtocol.*;
+import org.apache.tajo.ipc.QueryCoordinatorProtocol.*;
import org.apache.tajo.master.cluster.WorkerConnectionInfo;
import org.apache.tajo.rpc.NullCallback;
import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos;
[2/2] tajo git commit: TAJO-1291: Rename TajoMasterProtocol to
QueryCoordinatorProtocol.
Posted by hy...@apache.org.
TAJO-1291: Rename TajoMasterProtocol to QueryCoordinatorProtocol.
Closes #342
Project: http://git-wip-us.apache.org/repos/asf/tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/807868bd
Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/807868bd
Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/807868bd
Branch: refs/heads/master
Commit: 807868bd4d1ca1c8bd3aee33d31cca3d43dd2273
Parents: 50a8a66
Author: Hyunsik Choi <hy...@apache.org>
Authored: Fri Jan 9 20:44:21 2015 +0900
Committer: Hyunsik Choi <hy...@apache.org>
Committed: Fri Jan 9 20:44:21 2015 +0900
----------------------------------------------------------------------
CHANGES | 3 +
tajo-core/pom.xml | 2 +-
.../tajo/master/QueryCoordinatorService.java | 160 ++++++++++
.../org/apache/tajo/master/QueryInProgress.java | 228 +++++++++++++
.../org/apache/tajo/master/QueryJobManager.java | 316 -------------------
.../org/apache/tajo/master/QueryManager.java | 315 ++++++++++++++++++
.../apache/tajo/master/TajoContainerProxy.java | 12 +-
.../java/org/apache/tajo/master/TajoMaster.java | 16 +-
.../tajo/master/TajoMasterClientService.java | 9 +-
.../apache/tajo/master/TajoMasterService.java | 161 ----------
.../apache/tajo/master/exec/QueryExecutor.java | 4 +-
.../tajo/master/rm/TajoResourceTracker.java | 12 +-
.../master/rm/TajoWorkerResourceManager.java | 14 +-
.../tajo/master/rm/WorkerResourceManager.java | 17 +-
.../apache/tajo/master/scheduler/Scheduler.java | 2 +-
.../master/scheduler/SimpleFifoScheduler.java | 8 +-
.../tajo/querymaster/QueryInProgress.java | 230 --------------
.../apache/tajo/querymaster/QueryMaster.java | 95 ++----
.../java/org/apache/tajo/querymaster/Stage.java | 17 -
.../main/java/org/apache/tajo/util/JSPUtil.java | 2 +-
.../tajo/worker/TajoResourceAllocator.java | 37 ++-
.../java/org/apache/tajo/worker/TajoWorker.java | 20 +-
.../tajo/worker/WorkerHeartbeatService.java | 27 +-
.../ConnectivityCheckerRuleForTajoWorker.java | 14 +-
.../main/proto/QueryCoordinatorProtocol.proto | 147 +++++++++
.../main/proto/ResourceTrackerProtocol.proto | 2 +-
.../src/main/proto/TajoMasterProtocol.proto | 147 ---------
.../src/main/resources/webapps/admin/index.jsp | 2 +-
.../src/main/resources/webapps/admin/query.jsp | 2 +-
.../org/apache/tajo/TajoTestingCluster.java | 2 +-
.../tajo/master/rm/TestTajoResourceManager.java | 3 +-
31 files changed, 979 insertions(+), 1047 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tajo/blob/807868bd/CHANGES
----------------------------------------------------------------------
diff --git a/CHANGES b/CHANGES
index 4e38f78..89488da 100644
--- a/CHANGES
+++ b/CHANGES
@@ -27,6 +27,9 @@ Release 0.9.1 - unreleased
IMPROVEMENT
+ TAJO-1291: Rename TajoMasterProtocol to QueryCoordinatorProtocol.
+ (hyunsik)
+
TAJO-1286: Remove netty dependency from tajo-jdbc. (jihun)
TAJO-1282: Cleanup the relationship of QueryInProgress and
http://git-wip-us.apache.org/repos/asf/tajo/blob/807868bd/tajo-core/pom.xml
----------------------------------------------------------------------
diff --git a/tajo-core/pom.xml b/tajo-core/pom.xml
index a7205dd..05ccf07 100644
--- a/tajo-core/pom.xml
+++ b/tajo-core/pom.xml
@@ -166,7 +166,7 @@
<argument>src/main/proto/ContainerProtocol.proto</argument>
<argument>src/main/proto/ResourceTrackerProtocol.proto</argument>
<argument>src/main/proto/QueryMasterProtocol.proto</argument>
- <argument>src/main/proto/TajoMasterProtocol.proto</argument>
+ <argument>src/main/proto/QueryCoordinatorProtocol.proto</argument>
<argument>src/main/proto/TajoWorkerProtocol.proto</argument>
<argument>src/main/proto/InternalTypes.proto</argument>
</arguments>
http://git-wip-us.apache.org/repos/asf/tajo/blob/807868bd/tajo-core/src/main/java/org/apache/tajo/master/QueryCoordinatorService.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/QueryCoordinatorService.java b/tajo-core/src/main/java/org/apache/tajo/master/QueryCoordinatorService.java
new file mode 100644
index 0000000..1cb3842
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/master/QueryCoordinatorService.java
@@ -0,0 +1,160 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.master;
+
+import com.google.protobuf.RpcCallback;
+import com.google.protobuf.RpcController;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.service.AbstractService;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.ipc.ContainerProtocol;
+import org.apache.tajo.ipc.QueryCoordinatorProtocol;
+import org.apache.tajo.ipc.QueryCoordinatorProtocol.*;
+import org.apache.tajo.master.cluster.WorkerConnectionInfo;
+import org.apache.tajo.master.rm.Worker;
+import org.apache.tajo.master.rm.WorkerResource;
+import org.apache.tajo.rpc.AsyncRpcServer;
+import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos;
+import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos.BoolProto;
+import org.apache.tajo.util.NetUtils;
+
+import java.net.InetSocketAddress;
+import java.util.Collection;
+import java.util.List;
+
+public class QueryCoordinatorService extends AbstractService {
+ private final static Log LOG = LogFactory.getLog(QueryCoordinatorService.class);
+
+ private final TajoMaster.MasterContext context;
+ private final TajoConf conf;
+ private final ProtocolServiceHandler masterHandler;
+ private AsyncRpcServer server;
+ private InetSocketAddress bindAddress;
+
+ private final BoolProto BOOL_TRUE = BoolProto.newBuilder().setValue(true).build();
+ private final BoolProto BOOL_FALSE = BoolProto.newBuilder().setValue(false).build();
+
+ public QueryCoordinatorService(TajoMaster.MasterContext context) {
+ super(QueryCoordinatorService.class.getName());
+ this.context = context;
+ this.conf = context.getConf();
+ this.masterHandler = new ProtocolServiceHandler();
+ }
+
+ @Override
+ public void start() {
+ String confMasterServiceAddr = conf.getVar(TajoConf.ConfVars.TAJO_MASTER_UMBILICAL_RPC_ADDRESS);
+ InetSocketAddress initIsa = NetUtils.createSocketAddr(confMasterServiceAddr);
+ int workerNum = conf.getIntVar(TajoConf.ConfVars.MASTER_RPC_SERVER_WORKER_THREAD_NUM);
+ try {
+ server = new AsyncRpcServer(QueryCoordinatorProtocol.class, masterHandler, initIsa, workerNum);
+ } catch (Exception e) {
+ LOG.error(e);
+ }
+ server.start();
+ bindAddress = NetUtils.getConnectAddress(server.getListenAddress());
+ this.conf.setVar(TajoConf.ConfVars.TAJO_MASTER_UMBILICAL_RPC_ADDRESS,
+ NetUtils.normalizeInetSocketAddress(bindAddress));
+ LOG.info("Instantiated TajoMasterService at " + this.bindAddress);
+ super.start();
+ }
+
+ @Override
+ public void stop() {
+ if(server != null) {
+ server.shutdown();
+ server = null;
+ }
+ super.stop();
+ }
+
+ public InetSocketAddress getBindAddress() {
+ return bindAddress;
+ }
+
+ /**
+ * Actual protocol service handler
+ */
+ private class ProtocolServiceHandler implements QueryCoordinatorProtocolService.Interface {
+
+ @Override
+ public void heartbeat(
+ RpcController controller,
+ TajoHeartbeat request, RpcCallback<QueryCoordinatorProtocol.TajoHeartbeatResponse> done) {
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("Received QueryHeartbeat:" + new WorkerConnectionInfo(request.getConnectionInfo()));
+ }
+
+ QueryCoordinatorProtocol.TajoHeartbeatResponse.ResponseCommand command = null;
+
+ QueryManager queryManager = context.getQueryJobManager();
+ command = queryManager.queryHeartbeat(request);
+
+ QueryCoordinatorProtocol.TajoHeartbeatResponse.Builder builder = QueryCoordinatorProtocol.TajoHeartbeatResponse.newBuilder();
+ builder.setHeartbeatResult(BOOL_TRUE);
+ if(command != null) {
+ builder.setResponseCommand(command);
+ }
+
+ builder.setClusterResourceSummary(context.getResourceManager().getClusterResourceSummary());
+ done.run(builder.build());
+ }
+
+ @Override
+ public void allocateWorkerResources(
+ RpcController controller,
+ QueryCoordinatorProtocol.WorkerResourceAllocationRequest request,
+ RpcCallback<QueryCoordinatorProtocol.WorkerResourceAllocationResponse> done) {
+ context.getResourceManager().allocateWorkerResources(request, done);
+ }
+
+ @Override
+ public void releaseWorkerResource(RpcController controller, WorkerResourceReleaseRequest request,
+ RpcCallback<PrimitiveProtos.BoolProto> done) {
+ List<ContainerProtocol.TajoContainerIdProto> containerIds = request.getContainerIdsList();
+
+ for(ContainerProtocol.TajoContainerIdProto eachContainer: containerIds) {
+ context.getResourceManager().releaseWorkerResource(eachContainer);
+ }
+ done.run(BOOL_TRUE);
+ }
+
+ @Override
+ public void getAllWorkerResource(RpcController controller, PrimitiveProtos.NullProto request,
+ RpcCallback<WorkerResourcesRequest> done) {
+
+ WorkerResourcesRequest.Builder builder = WorkerResourcesRequest.newBuilder();
+ Collection<Worker> workers = context.getResourceManager().getWorkers().values();
+
+ for(Worker worker: workers) {
+ WorkerResource resource = worker.getResource();
+
+ WorkerResourceProto.Builder workerResource = WorkerResourceProto.newBuilder();
+
+ workerResource.setConnectionInfo(worker.getConnectionInfo().getProto());
+ workerResource.setMemoryMB(resource.getMemoryMB());
+ workerResource.setDiskSlots(resource.getDiskSlots());
+
+ builder.addWorkerResources(workerResource);
+ }
+ done.run(builder.build());
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/807868bd/tajo-core/src/main/java/org/apache/tajo/master/QueryInProgress.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/QueryInProgress.java b/tajo-core/src/main/java/org/apache/tajo/master/QueryInProgress.java
new file mode 100644
index 0000000..73d8cb2
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/master/QueryInProgress.java
@@ -0,0 +1,228 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.master;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.tajo.QueryId;
+import org.apache.tajo.TajoProtos;
+import org.apache.tajo.engine.query.QueryContext;
+import org.apache.tajo.ipc.QueryCoordinatorProtocol.WorkerAllocatedResource;
+import org.apache.tajo.ipc.QueryMasterProtocol;
+import org.apache.tajo.ipc.QueryMasterProtocol.QueryMasterProtocolService;
+import org.apache.tajo.ipc.TajoWorkerProtocol;
+import org.apache.tajo.ipc.TajoWorkerProtocol.QueryExecutionRequestProto;
+import org.apache.tajo.master.rm.WorkerResourceManager;
+import org.apache.tajo.plan.logical.LogicalRootNode;
+import org.apache.tajo.querymaster.QueryJobEvent;
+import org.apache.tajo.rpc.NettyClientBase;
+import org.apache.tajo.rpc.NullCallback;
+import org.apache.tajo.rpc.RpcConnectionPool;
+import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos;
+import org.apache.tajo.session.Session;
+import org.apache.tajo.util.NetUtils;
+
+import java.net.InetSocketAddress;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+public class QueryInProgress {
+ private static final Log LOG = LogFactory.getLog(QueryInProgress.class.getName());
+
+ private QueryId queryId;
+
+ private Session session;
+
+ private LogicalRootNode plan;
+
+ private AtomicBoolean querySubmitted = new AtomicBoolean(false);
+
+ private AtomicBoolean stopped = new AtomicBoolean(false);
+
+ private QueryInfo queryInfo;
+
+ private final TajoMaster.MasterContext masterContext;
+
+ private NettyClientBase queryMasterRpc;
+
+ private QueryMasterProtocolService queryMasterRpcClient;
+
+ public QueryInProgress(
+ TajoMaster.MasterContext masterContext,
+ Session session,
+ QueryContext queryContext,
+ QueryId queryId, String sql, String jsonExpr, LogicalRootNode plan) {
+
+ this.masterContext = masterContext;
+ this.session = session;
+ this.queryId = queryId;
+ this.plan = plan;
+
+ queryInfo = new QueryInfo(queryId, queryContext, sql, jsonExpr);
+ queryInfo.setStartTime(System.currentTimeMillis());
+ }
+
+ public synchronized void kill() {
+ getQueryInfo().setQueryState(TajoProtos.QueryState.QUERY_KILLED);
+ if(queryMasterRpcClient != null){
+ queryMasterRpcClient.killQuery(null, queryId.getProto(), NullCallback.get());
+ }
+ }
+
+ public void stopProgress() {
+ if(stopped.getAndSet(true)) {
+ return;
+ }
+
+ LOG.info("=========================================================");
+ LOG.info("Stop query:" + queryId);
+
+ masterContext.getResourceManager().releaseQueryMaster(queryId);
+
+ if(queryMasterRpc != null) {
+ RpcConnectionPool.getPool(masterContext.getConf()).closeConnection(queryMasterRpc);
+ }
+
+ masterContext.getHistoryWriter().appendHistory(queryInfo);
+ }
+
+ public boolean startQueryMaster() {
+ try {
+ LOG.info("Initializing QueryInProgress for QueryID=" + queryId);
+ WorkerResourceManager resourceManager = masterContext.getResourceManager();
+ WorkerAllocatedResource resource = resourceManager.allocateQueryMaster(this);
+
+ // if no resource to allocate a query master
+ if(resource == null) {
+ LOG.info("No Available Resources for QueryMaster");
+ return false;
+ }
+
+ queryInfo.setQueryMaster(resource.getConnectionInfo().getHost());
+ queryInfo.setQueryMasterPort(resource.getConnectionInfo().getQueryMasterPort());
+ queryInfo.setQueryMasterclientPort(resource.getConnectionInfo().getClientPort());
+ queryInfo.setQueryMasterInfoPort(resource.getConnectionInfo().getHttpInfoPort());
+
+ return true;
+ } catch (Exception e) {
+ catchException(e);
+ return false;
+ }
+ }
+
+ private void connectQueryMaster() throws Exception {
+ InetSocketAddress addr = NetUtils.createSocketAddr(queryInfo.getQueryMasterHost(), queryInfo.getQueryMasterPort());
+ LOG.info("Connect to QueryMaster:" + addr);
+ queryMasterRpc =
+ RpcConnectionPool.getPool(masterContext.getConf()).getConnection(addr, QueryMasterProtocol.class, true);
+ queryMasterRpcClient = queryMasterRpc.getStub();
+ }
+
+ public synchronized void submmitQueryToMaster() {
+ if(querySubmitted.get()) {
+ return;
+ }
+
+ try {
+ if(queryMasterRpcClient == null) {
+ connectQueryMaster();
+ }
+ if(queryMasterRpcClient == null) {
+ LOG.info("No QueryMaster conneciton info.");
+ //TODO wait
+ return;
+ }
+ LOG.info("Call executeQuery to :" +
+ queryInfo.getQueryMasterHost() + ":" + queryInfo.getQueryMasterPort() + "," + queryId);
+
+ QueryExecutionRequestProto.Builder builder = TajoWorkerProtocol.QueryExecutionRequestProto.newBuilder();
+ builder.setQueryId(queryId.getProto())
+ .setQueryContext(queryInfo.getQueryContext().getProto())
+ .setSession(session.getProto())
+ .setExprInJson(PrimitiveProtos.StringProto.newBuilder().setValue(queryInfo.getJsonExpr()))
+ .setLogicalPlanJson(PrimitiveProtos.StringProto.newBuilder().setValue(plan.toJson()).build());
+
+ queryMasterRpcClient.executeQuery(null, builder.build(), NullCallback.get());
+ querySubmitted.set(true);
+ } catch (Exception e) {
+ LOG.error(e.getMessage(), e);
+ }
+ }
+
+ public void catchException(Exception e) {
+ LOG.error(e.getMessage(), e);
+ queryInfo.setQueryState(TajoProtos.QueryState.QUERY_FAILED);
+ queryInfo.setLastMessage(StringUtils.stringifyException(e));
+ }
+
+ public QueryId getQueryId() {
+ return queryId;
+ }
+
+ public QueryInfo getQueryInfo() {
+ return this.queryInfo;
+ }
+
+ public boolean isStarted() {
+ return !stopped.get() && this.querySubmitted.get();
+ }
+
+ public void heartbeat(QueryInfo queryInfo) {
+ LOG.info("Received QueryMaster heartbeat:" + queryInfo);
+
+ // to avoid partial update by different heartbeats
+ synchronized (this.queryInfo) {
+
+ // terminal state will let client to retrieve a query result
+ // So, we must set the query result before changing query state
+ if (isFinishState(queryInfo.getQueryState())) {
+ if (queryInfo.hasResultdesc()) {
+ this.queryInfo.setResultDesc(queryInfo.getResultDesc());
+ }
+ }
+
+ this.queryInfo.setQueryState(queryInfo.getQueryState());
+ this.queryInfo.setProgress(queryInfo.getProgress());
+ this.queryInfo.setFinishTime(queryInfo.getFinishTime());
+
+ // Update diagnosis message
+ if (queryInfo.getLastMessage() != null && !queryInfo.getLastMessage().isEmpty()) {
+ this.queryInfo.setLastMessage(queryInfo.getLastMessage());
+ LOG.info(queryId + queryInfo.getLastMessage());
+ }
+
+ // if any error occurs, print outs the error message
+ if (this.queryInfo.getQueryState() == TajoProtos.QueryState.QUERY_FAILED) {
+ LOG.warn(queryId + " failed, " + queryInfo.getLastMessage());
+ }
+
+
+ if (isFinishState(this.queryInfo.getQueryState())) {
+ masterContext.getQueryJobManager().getEventHandler().handle(
+ new QueryJobEvent(QueryJobEvent.Type.QUERY_JOB_STOP, this.queryInfo));
+ }
+ }
+ }
+
+ private boolean isFinishState(TajoProtos.QueryState state) {
+ return state == TajoProtos.QueryState.QUERY_FAILED ||
+ state == TajoProtos.QueryState.QUERY_KILLED ||
+ state == TajoProtos.QueryState.QUERY_SUCCEEDED;
+ }
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/807868bd/tajo-core/src/main/java/org/apache/tajo/master/QueryJobManager.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/QueryJobManager.java b/tajo-core/src/main/java/org/apache/tajo/master/QueryJobManager.java
deleted file mode 100644
index 6a8da27..0000000
--- a/tajo-core/src/main/java/org/apache/tajo/master/QueryJobManager.java
+++ /dev/null
@@ -1,316 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.master;
-
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.service.CompositeService;
-import org.apache.hadoop.yarn.event.AsyncDispatcher;
-import org.apache.hadoop.yarn.event.EventHandler;
-import org.apache.tajo.QueryId;
-import org.apache.tajo.QueryIdFactory;
-import org.apache.tajo.catalog.TableDesc;
-import org.apache.tajo.engine.query.QueryContext;
-import org.apache.tajo.ipc.TajoMasterProtocol;
-import org.apache.tajo.master.cluster.WorkerConnectionInfo;
-import org.apache.tajo.master.scheduler.SimpleFifoScheduler;
-import org.apache.tajo.plan.logical.LogicalRootNode;
-import org.apache.tajo.querymaster.QueryInProgress;
-import org.apache.tajo.querymaster.QueryJobEvent;
-import org.apache.tajo.session.Session;
-
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Map;
-import java.util.concurrent.atomic.AtomicLong;
-
-/**
- * QueryJobManager manages all scheduled and running queries.
- * It receives all Query related events and routes them to each QueryInProgress.
- */
-public class QueryJobManager extends CompositeService {
- private static final Log LOG = LogFactory.getLog(QueryJobManager.class.getName());
-
- // TajoMaster Context
- private final TajoMaster.MasterContext masterContext;
-
- private AsyncDispatcher dispatcher;
-
- private SimpleFifoScheduler scheduler;
-
- private final Map<QueryId, QueryInProgress> submittedQueries = Maps.newConcurrentMap();
-
- private final Map<QueryId, QueryInProgress> runningQueries = Maps.newConcurrentMap();
-
- private AtomicLong minExecutionTime = new AtomicLong(Long.MAX_VALUE);
- private AtomicLong maxExecutionTime = new AtomicLong();
- private AtomicLong avgExecutionTime = new AtomicLong();
- private AtomicLong executedQuerySize = new AtomicLong();
-
- public QueryJobManager(final TajoMaster.MasterContext masterContext) {
- super(QueryJobManager.class.getName());
- this.masterContext = masterContext;
- }
-
- @Override
- public void serviceInit(Configuration conf) throws Exception {
- try {
- this.dispatcher = new AsyncDispatcher();
- addService(this.dispatcher);
-
- this.dispatcher.register(QueryJobEvent.Type.class, new QueryJobManagerEventHandler());
-
- this.scheduler = new SimpleFifoScheduler(this);
- } catch (Exception e) {
- catchException(null, e);
- }
-
- super.serviceInit(conf);
- }
-
- @Override
- public void serviceStop() throws Exception {
- synchronized(runningQueries) {
- for(QueryInProgress eachQueryInProgress: runningQueries.values()) {
- eachQueryInProgress.stopProgress();
- }
- }
- this.scheduler.stop();
- super.serviceStop();
- }
-
- @Override
- public void serviceStart() throws Exception {
- this.scheduler.start();
- super.serviceStart();
- }
-
- public EventHandler getEventHandler() {
- return dispatcher.getEventHandler();
- }
-
- public Collection<QueryInProgress> getSubmittedQueries() {
- synchronized (submittedQueries){
- return Collections.unmodifiableCollection(submittedQueries.values());
- }
- }
-
- public Collection<QueryInProgress> getRunningQueries() {
- synchronized (runningQueries){
- return Collections.unmodifiableCollection(runningQueries.values());
- }
- }
-
- public synchronized Collection<QueryInfo> getFinishedQueries() {
- try {
- return this.masterContext.getHistoryReader().getQueries(null);
- } catch (Throwable e) {
- LOG.error(e);
- return Lists.newArrayList();
- }
- }
-
-
- public synchronized QueryInfo getFinishedQuery(QueryId queryId) {
- try {
- return this.masterContext.getHistoryReader().getQueryInfo(queryId.toString());
- } catch (Throwable e) {
- LOG.error(e);
- return null;
- }
- }
-
- public QueryInfo createNewQueryJob(Session session, QueryContext queryContext, String sql,
- String jsonExpr, LogicalRootNode plan)
- throws Exception {
- QueryId queryId = QueryIdFactory.newQueryId(masterContext.getResourceManager().getSeedQueryId());
- QueryInProgress queryInProgress = new QueryInProgress(masterContext, session, queryContext, queryId, sql,
- jsonExpr, plan);
-
- synchronized (submittedQueries) {
- queryInProgress.getQueryInfo().setQueryMaster("");
- submittedQueries.put(queryInProgress.getQueryId(), queryInProgress);
- }
-
- scheduler.addQuery(queryInProgress);
- return queryInProgress.getQueryInfo();
- }
-
- public QueryInfo startQueryJob(QueryId queryId) throws Exception {
-
- QueryInProgress queryInProgress;
-
- synchronized (submittedQueries) {
- queryInProgress = submittedQueries.remove(queryId);
- }
-
- synchronized (runningQueries) {
- runningQueries.put(queryInProgress.getQueryId(), queryInProgress);
- }
-
- if (queryInProgress.startQueryMaster()) {
- dispatcher.getEventHandler().handle(new QueryJobEvent(QueryJobEvent.Type.QUERY_MASTER_START,
- queryInProgress.getQueryInfo()));
- } else {
- stopQuery(queryId);
- }
-
- return queryInProgress.getQueryInfo();
- }
-
- class QueryJobManagerEventHandler implements EventHandler<QueryJobEvent> {
-
- @Override
- public void handle(QueryJobEvent event) {
- QueryInProgress queryInProgress = getQueryInProgress(event.getQueryInfo().getQueryId());
-
-
- if (queryInProgress == null) {
- LOG.warn("No query info in running queries.[" + event.getQueryInfo().getQueryId() + "]");
- return;
- }
-
-
- if (event.getType() == QueryJobEvent.Type.QUERY_MASTER_START) {
- queryInProgress.submmitQueryToMaster();
-
- } else if (event.getType() == QueryJobEvent.Type.QUERY_JOB_STOP) {
- stopQuery(event.getQueryInfo().getQueryId());
-
- } else if (event.getType() == QueryJobEvent.Type.QUERY_JOB_KILL) {
- scheduler.removeQuery(queryInProgress.getQueryId());
- queryInProgress.kill();
- stopQuery(queryInProgress.getQueryId());
-
- } else if (event.getType() == QueryJobEvent.Type.QUERY_JOB_HEARTBEAT) {
- queryInProgress.heartbeat(event.getQueryInfo());
- }
- }
- }
-
- public QueryInProgress getQueryInProgress(QueryId queryId) {
- QueryInProgress queryInProgress;
- synchronized (submittedQueries) {
- queryInProgress = submittedQueries.get(queryId);
- }
-
- if (queryInProgress == null) {
- synchronized (runningQueries) {
- queryInProgress = runningQueries.get(queryId);
- }
- }
- return queryInProgress;
- }
-
- public void stopQuery(QueryId queryId) {
- LOG.info("Stop QueryInProgress:" + queryId);
- QueryInProgress queryInProgress = getQueryInProgress(queryId);
- if(queryInProgress != null) {
- queryInProgress.stopProgress();
- synchronized(submittedQueries) {
- submittedQueries.remove(queryId);
- }
-
- synchronized(runningQueries) {
- runningQueries.remove(queryId);
- }
-
- QueryInfo queryInfo = queryInProgress.getQueryInfo();
- long executionTime = queryInfo.getFinishTime() - queryInfo.getStartTime();
- if (executionTime < minExecutionTime.get()) {
- minExecutionTime.set(executionTime);
- }
-
- if (executionTime > maxExecutionTime.get()) {
- maxExecutionTime.set(executionTime);
- }
-
- long totalExecutionTime = executedQuerySize.get() * avgExecutionTime.get();
- if (totalExecutionTime > 0) {
- avgExecutionTime.set((totalExecutionTime + executionTime) / (executedQuerySize.get() + 1));
- } else {
- avgExecutionTime.set(executionTime);
- }
- executedQuerySize.incrementAndGet();
- } else {
- LOG.warn("No QueryInProgress while query stopping: " + queryId);
- }
- }
-
- public long getMinExecutionTime() {
- if (getExecutedQuerySize() == 0) return 0;
- return minExecutionTime.get();
- }
-
- public long getMaxExecutionTime() {
- return maxExecutionTime.get();
- }
-
- public long getAvgExecutionTime() {
- return avgExecutionTime.get();
- }
-
- public long getExecutedQuerySize() {
- return executedQuerySize.get();
- }
-
- private void catchException(QueryId queryId, Exception e) {
- LOG.error(e.getMessage(), e);
- QueryInProgress queryInProgress = runningQueries.get(queryId);
- queryInProgress.catchException(e);
- }
-
- public synchronized TajoMasterProtocol.TajoHeartbeatResponse.ResponseCommand queryHeartbeat(
- TajoMasterProtocol.TajoHeartbeat queryHeartbeat) {
- QueryInProgress queryInProgress = getQueryInProgress(new QueryId(queryHeartbeat.getQueryId()));
- if(queryInProgress == null) {
- return null;
- }
-
- QueryInfo queryInfo = makeQueryInfoFromHeartbeat(queryHeartbeat);
- getEventHandler().handle(new QueryJobEvent(QueryJobEvent.Type.QUERY_JOB_HEARTBEAT, queryInfo));
-
- return null;
- }
-
- private QueryInfo makeQueryInfoFromHeartbeat(TajoMasterProtocol.TajoHeartbeat queryHeartbeat) {
- QueryInfo queryInfo = new QueryInfo(new QueryId(queryHeartbeat.getQueryId()));
- WorkerConnectionInfo connectionInfo = new WorkerConnectionInfo(queryHeartbeat.getConnectionInfo());
-
- queryInfo.setQueryMaster(connectionInfo.getHost());
- queryInfo.setQueryMasterPort(connectionInfo.getQueryMasterPort());
- queryInfo.setQueryMasterclientPort(connectionInfo.getClientPort());
- queryInfo.setLastMessage(queryHeartbeat.getStatusMessage());
- queryInfo.setQueryState(queryHeartbeat.getState());
- queryInfo.setProgress(queryHeartbeat.getQueryProgress());
-
- if (queryHeartbeat.hasQueryFinishTime()) {
- queryInfo.setFinishTime(queryHeartbeat.getQueryFinishTime());
- }
-
- if (queryHeartbeat.hasResultDesc()) {
- queryInfo.setResultDesc(new TableDesc(queryHeartbeat.getResultDesc()));
- }
-
- return queryInfo;
- }
-}
http://git-wip-us.apache.org/repos/asf/tajo/blob/807868bd/tajo-core/src/main/java/org/apache/tajo/master/QueryManager.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/QueryManager.java b/tajo-core/src/main/java/org/apache/tajo/master/QueryManager.java
new file mode 100644
index 0000000..296be04
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/master/QueryManager.java
@@ -0,0 +1,315 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.master;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.service.CompositeService;
+import org.apache.hadoop.yarn.event.AsyncDispatcher;
+import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.tajo.QueryId;
+import org.apache.tajo.QueryIdFactory;
+import org.apache.tajo.catalog.TableDesc;
+import org.apache.tajo.engine.query.QueryContext;
+import org.apache.tajo.ipc.QueryCoordinatorProtocol;
+import org.apache.tajo.master.cluster.WorkerConnectionInfo;
+import org.apache.tajo.master.scheduler.SimpleFifoScheduler;
+import org.apache.tajo.plan.logical.LogicalRootNode;
+import org.apache.tajo.querymaster.QueryJobEvent;
+import org.apache.tajo.session.Session;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * QueryManager manages all scheduled and running queries.
+ * It receives all Query related events and routes them to each QueryInProgress.
+ */
+public class QueryManager extends CompositeService {
+ private static final Log LOG = LogFactory.getLog(QueryManager.class.getName());
+
+ // TajoMaster Context
+ private final TajoMaster.MasterContext masterContext;
+
+ private AsyncDispatcher dispatcher;
+
+ private SimpleFifoScheduler scheduler;
+
+ private final Map<QueryId, QueryInProgress> submittedQueries = Maps.newConcurrentMap();
+
+ private final Map<QueryId, QueryInProgress> runningQueries = Maps.newConcurrentMap();
+
+ private AtomicLong minExecutionTime = new AtomicLong(Long.MAX_VALUE);
+ private AtomicLong maxExecutionTime = new AtomicLong();
+ private AtomicLong avgExecutionTime = new AtomicLong();
+ private AtomicLong executedQuerySize = new AtomicLong();
+
+ public QueryManager(final TajoMaster.MasterContext masterContext) {
+ super(QueryManager.class.getName());
+ this.masterContext = masterContext;
+ }
+
+ @Override
+ public void serviceInit(Configuration conf) throws Exception {
+ try {
+ this.dispatcher = new AsyncDispatcher();
+ addService(this.dispatcher);
+
+ this.dispatcher.register(QueryJobEvent.Type.class, new QueryJobManagerEventHandler());
+
+ this.scheduler = new SimpleFifoScheduler(this);
+ } catch (Exception e) {
+ catchException(null, e);
+ }
+
+ super.serviceInit(conf);
+ }
+
+ @Override
+ public void serviceStop() throws Exception {
+ synchronized(runningQueries) {
+ for(QueryInProgress eachQueryInProgress: runningQueries.values()) {
+ eachQueryInProgress.stopProgress();
+ }
+ }
+ this.scheduler.stop();
+ super.serviceStop();
+ }
+
+ @Override
+ public void serviceStart() throws Exception {
+ this.scheduler.start();
+ super.serviceStart();
+ }
+
+ public EventHandler getEventHandler() {
+ return dispatcher.getEventHandler();
+ }
+
+ public Collection<QueryInProgress> getSubmittedQueries() {
+ synchronized (submittedQueries){
+ return Collections.unmodifiableCollection(submittedQueries.values());
+ }
+ }
+
+ public Collection<QueryInProgress> getRunningQueries() {
+ synchronized (runningQueries){
+ return Collections.unmodifiableCollection(runningQueries.values());
+ }
+ }
+
+ public synchronized Collection<QueryInfo> getFinishedQueries() {
+ try {
+ return this.masterContext.getHistoryReader().getQueries(null);
+ } catch (Throwable e) {
+ LOG.error(e);
+ return Lists.newArrayList();
+ }
+ }
+
+
+ public synchronized QueryInfo getFinishedQuery(QueryId queryId) {
+ try {
+ return this.masterContext.getHistoryReader().getQueryInfo(queryId.toString());
+ } catch (Throwable e) {
+ LOG.error(e);
+ return null;
+ }
+ }
+
+ public QueryInfo scheduleQuery(Session session, QueryContext queryContext, String sql,
+ String jsonExpr, LogicalRootNode plan)
+ throws Exception {
+ QueryId queryId = QueryIdFactory.newQueryId(masterContext.getResourceManager().getSeedQueryId());
+ QueryInProgress queryInProgress = new QueryInProgress(masterContext, session, queryContext, queryId, sql,
+ jsonExpr, plan);
+
+ synchronized (submittedQueries) {
+ queryInProgress.getQueryInfo().setQueryMaster("");
+ submittedQueries.put(queryInProgress.getQueryId(), queryInProgress);
+ }
+
+ scheduler.addQuery(queryInProgress);
+ return queryInProgress.getQueryInfo();
+ }
+
+ public QueryInfo startQueryJob(QueryId queryId) throws Exception {
+
+ QueryInProgress queryInProgress;
+
+ synchronized (submittedQueries) {
+ queryInProgress = submittedQueries.remove(queryId);
+ }
+
+ synchronized (runningQueries) {
+ runningQueries.put(queryInProgress.getQueryId(), queryInProgress);
+ }
+
+ if (queryInProgress.startQueryMaster()) {
+ dispatcher.getEventHandler().handle(new QueryJobEvent(QueryJobEvent.Type.QUERY_MASTER_START,
+ queryInProgress.getQueryInfo()));
+ } else {
+ stopQuery(queryId);
+ }
+
+ return queryInProgress.getQueryInfo();
+ }
+
+ class QueryJobManagerEventHandler implements EventHandler<QueryJobEvent> {
+
+ @Override
+ public void handle(QueryJobEvent event) {
+ QueryInProgress queryInProgress = getQueryInProgress(event.getQueryInfo().getQueryId());
+
+
+ if (queryInProgress == null) {
+ LOG.warn("No query info in running queries.[" + event.getQueryInfo().getQueryId() + "]");
+ return;
+ }
+
+
+ if (event.getType() == QueryJobEvent.Type.QUERY_MASTER_START) {
+ queryInProgress.submmitQueryToMaster();
+
+ } else if (event.getType() == QueryJobEvent.Type.QUERY_JOB_STOP) {
+ stopQuery(event.getQueryInfo().getQueryId());
+
+ } else if (event.getType() == QueryJobEvent.Type.QUERY_JOB_KILL) {
+ scheduler.removeQuery(queryInProgress.getQueryId());
+ queryInProgress.kill();
+ stopQuery(queryInProgress.getQueryId());
+
+ } else if (event.getType() == QueryJobEvent.Type.QUERY_JOB_HEARTBEAT) {
+ queryInProgress.heartbeat(event.getQueryInfo());
+ }
+ }
+ }
+
+ public QueryInProgress getQueryInProgress(QueryId queryId) {
+ QueryInProgress queryInProgress;
+ synchronized (submittedQueries) {
+ queryInProgress = submittedQueries.get(queryId);
+ }
+
+ if (queryInProgress == null) {
+ synchronized (runningQueries) {
+ queryInProgress = runningQueries.get(queryId);
+ }
+ }
+ return queryInProgress;
+ }
+
+ public void stopQuery(QueryId queryId) {
+ LOG.info("Stop QueryInProgress:" + queryId);
+ QueryInProgress queryInProgress = getQueryInProgress(queryId);
+ if(queryInProgress != null) {
+ queryInProgress.stopProgress();
+ synchronized(submittedQueries) {
+ submittedQueries.remove(queryId);
+ }
+
+ synchronized(runningQueries) {
+ runningQueries.remove(queryId);
+ }
+
+ QueryInfo queryInfo = queryInProgress.getQueryInfo();
+ long executionTime = queryInfo.getFinishTime() - queryInfo.getStartTime();
+ if (executionTime < minExecutionTime.get()) {
+ minExecutionTime.set(executionTime);
+ }
+
+ if (executionTime > maxExecutionTime.get()) {
+ maxExecutionTime.set(executionTime);
+ }
+
+ long totalExecutionTime = executedQuerySize.get() * avgExecutionTime.get();
+ if (totalExecutionTime > 0) {
+ avgExecutionTime.set((totalExecutionTime + executionTime) / (executedQuerySize.get() + 1));
+ } else {
+ avgExecutionTime.set(executionTime);
+ }
+ executedQuerySize.incrementAndGet();
+ } else {
+ LOG.warn("No QueryInProgress while query stopping: " + queryId);
+ }
+ }
+
+ public long getMinExecutionTime() {
+ if (getExecutedQuerySize() == 0) return 0;
+ return minExecutionTime.get();
+ }
+
+ public long getMaxExecutionTime() {
+ return maxExecutionTime.get();
+ }
+
+ public long getAvgExecutionTime() {
+ return avgExecutionTime.get();
+ }
+
+ public long getExecutedQuerySize() {
+ return executedQuerySize.get();
+ }
+
+ private void catchException(QueryId queryId, Exception e) {
+ LOG.error(e.getMessage(), e);
+ QueryInProgress queryInProgress = runningQueries.get(queryId);
+ queryInProgress.catchException(e);
+ }
+
+ public synchronized QueryCoordinatorProtocol.TajoHeartbeatResponse.ResponseCommand queryHeartbeat(
+ QueryCoordinatorProtocol.TajoHeartbeat queryHeartbeat) {
+ QueryInProgress queryInProgress = getQueryInProgress(new QueryId(queryHeartbeat.getQueryId()));
+ if(queryInProgress == null) {
+ return null;
+ }
+
+ QueryInfo queryInfo = makeQueryInfoFromHeartbeat(queryHeartbeat);
+ getEventHandler().handle(new QueryJobEvent(QueryJobEvent.Type.QUERY_JOB_HEARTBEAT, queryInfo));
+
+ return null;
+ }
+
+ private QueryInfo makeQueryInfoFromHeartbeat(QueryCoordinatorProtocol.TajoHeartbeat queryHeartbeat) {
+ QueryInfo queryInfo = new QueryInfo(new QueryId(queryHeartbeat.getQueryId()));
+ WorkerConnectionInfo connectionInfo = new WorkerConnectionInfo(queryHeartbeat.getConnectionInfo());
+
+ queryInfo.setQueryMaster(connectionInfo.getHost());
+ queryInfo.setQueryMasterPort(connectionInfo.getQueryMasterPort());
+ queryInfo.setQueryMasterclientPort(connectionInfo.getClientPort());
+ queryInfo.setLastMessage(queryHeartbeat.getStatusMessage());
+ queryInfo.setQueryState(queryHeartbeat.getState());
+ queryInfo.setProgress(queryHeartbeat.getQueryProgress());
+
+ if (queryHeartbeat.hasQueryFinishTime()) {
+ queryInfo.setFinishTime(queryHeartbeat.getQueryFinishTime());
+ }
+
+ if (queryHeartbeat.hasResultDesc()) {
+ queryInfo.setResultDesc(new TableDesc(queryHeartbeat.getResultDesc()));
+ }
+
+ return queryInfo;
+ }
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/807868bd/tajo-core/src/main/java/org/apache/tajo/master/TajoContainerProxy.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/TajoContainerProxy.java b/tajo-core/src/main/java/org/apache/tajo/master/TajoContainerProxy.java
index 7209080..2ffd7ca 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/TajoContainerProxy.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/TajoContainerProxy.java
@@ -26,7 +26,7 @@ import org.apache.tajo.TaskAttemptId;
import org.apache.tajo.conf.TajoConf;
import org.apache.tajo.engine.query.QueryContext;
import org.apache.tajo.ipc.ContainerProtocol;
-import org.apache.tajo.ipc.TajoMasterProtocol;
+import org.apache.tajo.ipc.QueryCoordinatorProtocol;
import org.apache.tajo.ipc.TajoWorkerProtocol;
import org.apache.tajo.querymaster.QueryMasterTask;
import org.apache.tajo.master.container.TajoContainer;
@@ -177,23 +177,23 @@ public class TajoContainerProxy extends ContainerProxy {
if (conf.getBoolVar(TajoConf.ConfVars.TAJO_MASTER_HA_ENABLE)) {
try {
tmClient = connPool.getConnection(context.getQueryMasterContext().getWorkerContext().getTajoMasterAddress(),
- TajoMasterProtocol.class, true);
+ QueryCoordinatorProtocol.class, true);
} catch (Exception e) {
context.getQueryMasterContext().getWorkerContext().setWorkerResourceTrackerAddr(
HAServiceUtil.getResourceTrackerAddress(conf));
context.getQueryMasterContext().getWorkerContext().setTajoMasterAddress(
HAServiceUtil.getMasterUmbilicalAddress(conf));
tmClient = connPool.getConnection(context.getQueryMasterContext().getWorkerContext().getTajoMasterAddress(),
- TajoMasterProtocol.class, true);
+ QueryCoordinatorProtocol.class, true);
}
} else {
tmClient = connPool.getConnection(context.getQueryMasterContext().getWorkerContext().getTajoMasterAddress(),
- TajoMasterProtocol.class, true);
+ QueryCoordinatorProtocol.class, true);
}
- TajoMasterProtocol.TajoMasterProtocolService masterClientService = tmClient.getStub();
+ QueryCoordinatorProtocol.QueryCoordinatorProtocolService masterClientService = tmClient.getStub();
masterClientService.releaseWorkerResource(null,
- TajoMasterProtocol.WorkerResourceReleaseRequest.newBuilder()
+ QueryCoordinatorProtocol.WorkerResourceReleaseRequest.newBuilder()
.setExecutionBlockId(executionBlockId.getProto())
.addAllContainerIds(containerIdProtos)
.build(),
http://git-wip-us.apache.org/repos/asf/tajo/blob/807868bd/tajo-core/src/main/java/org/apache/tajo/master/TajoMaster.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/TajoMaster.java b/tajo-core/src/main/java/org/apache/tajo/master/TajoMaster.java
index c054599..786025a 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/TajoMaster.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/TajoMaster.java
@@ -114,14 +114,14 @@ public class TajoMaster extends CompositeService {
private GlobalEngine globalEngine;
private AsyncDispatcher dispatcher;
private TajoMasterClientService tajoMasterClientService;
- private TajoMasterService tajoMasterService;
+ private QueryCoordinatorService tajoMasterService;
private SessionManager sessionManager;
private WorkerResourceManager resourceManager;
//Web Server
private StaticHttpServer webServer;
- private QueryJobManager queryJobManager;
+ private QueryManager queryManager;
private ThreadMXBean threadBean = ManagementFactory.getThreadMXBean();
@@ -183,13 +183,13 @@ public class TajoMaster extends CompositeService {
globalEngine = new GlobalEngine(context);
addIfService(globalEngine);
- queryJobManager = new QueryJobManager(context);
- addIfService(queryJobManager);
+ queryManager = new QueryManager(context);
+ addIfService(queryManager);
tajoMasterClientService = new TajoMasterClientService(context);
addIfService(tajoMasterClientService);
- tajoMasterService = new TajoMasterService(context);
+ tajoMasterService = new QueryCoordinatorService(context);
addIfService(tajoMasterService);
} catch (Exception e) {
LOG.error(e.getMessage(), e);
@@ -441,8 +441,8 @@ public class TajoMaster extends CompositeService {
return clock;
}
- public QueryJobManager getQueryJobManager() {
- return queryJobManager;
+ public QueryManager getQueryJobManager() {
+ return queryManager;
}
public WorkerResourceManager getResourceManager() {
@@ -469,7 +469,7 @@ public class TajoMaster extends CompositeService {
return storeManager;
}
- public TajoMasterService getTajoMasterService() {
+ public QueryCoordinatorService getTajoMasterService() {
return tajoMasterService;
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/807868bd/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterClientService.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterClientService.java b/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterClientService.java
index 93326be..fcc016a 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterClientService.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterClientService.java
@@ -45,7 +45,6 @@ import org.apache.tajo.ipc.TajoMasterClientProtocol;
import org.apache.tajo.ipc.TajoMasterClientProtocol.TajoMasterClientProtocolService;
import org.apache.tajo.master.TajoMaster.MasterContext;
import org.apache.tajo.master.exec.NonForwardQueryResultScanner;
-import org.apache.tajo.querymaster.QueryInProgress;
import org.apache.tajo.querymaster.QueryJobEvent;
import org.apache.tajo.master.rm.Worker;
import org.apache.tajo.master.rm.WorkerResource;
@@ -567,8 +566,8 @@ public class TajoMasterClientService extends AbstractService {
context.getSessionManager().touch(request.getSessionId().getId());
QueryId queryId = new QueryId(request.getQueryId());
- QueryJobManager queryJobManager = context.getQueryJobManager();
- QueryInProgress queryInProgress = queryJobManager.getQueryInProgress(queryId);
+ QueryManager queryManager = context.getQueryJobManager();
+ QueryInProgress queryInProgress = queryManager.getQueryInProgress(queryId);
QueryInfo queryInfo = null;
if (queryInProgress == null) {
@@ -598,8 +597,8 @@ public class TajoMasterClientService extends AbstractService {
try {
context.getSessionManager().touch(request.getSessionId().getId());
QueryId queryId = new QueryId(request.getQueryId());
- QueryJobManager queryJobManager = context.getQueryJobManager();
- queryJobManager.getEventHandler().handle(new QueryJobEvent(QueryJobEvent.Type.QUERY_JOB_KILL,
+ QueryManager queryManager = context.getQueryJobManager();
+ queryManager.getEventHandler().handle(new QueryJobEvent(QueryJobEvent.Type.QUERY_JOB_KILL,
new QueryInfo(queryId)));
return BOOL_TRUE;
} catch (Throwable t) {
http://git-wip-us.apache.org/repos/asf/tajo/blob/807868bd/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterService.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterService.java b/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterService.java
deleted file mode 100644
index 02bdfa1..0000000
--- a/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterService.java
+++ /dev/null
@@ -1,161 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.master;
-
-import com.google.protobuf.RpcCallback;
-import com.google.protobuf.RpcController;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.service.AbstractService;
-import org.apache.tajo.QueryId;
-import org.apache.tajo.TajoIdProtos;
-import org.apache.tajo.conf.TajoConf;
-import org.apache.tajo.ipc.ContainerProtocol;
-import org.apache.tajo.ipc.TajoMasterProtocol;
-import org.apache.tajo.master.cluster.WorkerConnectionInfo;
-import org.apache.tajo.master.rm.Worker;
-import org.apache.tajo.master.rm.WorkerResource;
-import org.apache.tajo.rpc.AsyncRpcServer;
-import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos;
-import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos.BoolProto;
-import org.apache.tajo.util.NetUtils;
-
-import java.net.InetSocketAddress;
-import java.util.Collection;
-import java.util.List;
-
-public class TajoMasterService extends AbstractService {
- private final static Log LOG = LogFactory.getLog(TajoMasterService.class);
-
- private final TajoMaster.MasterContext context;
- private final TajoConf conf;
- private final TajoMasterServiceHandler masterHandler;
- private AsyncRpcServer server;
- private InetSocketAddress bindAddress;
-
- private final BoolProto BOOL_TRUE = BoolProto.newBuilder().setValue(true).build();
- private final BoolProto BOOL_FALSE = BoolProto.newBuilder().setValue(false).build();
-
- public TajoMasterService(TajoMaster.MasterContext context) {
- super(TajoMasterService.class.getName());
- this.context = context;
- this.conf = context.getConf();
- this.masterHandler = new TajoMasterServiceHandler();
- }
-
- @Override
- public void start() {
- String confMasterServiceAddr = conf.getVar(TajoConf.ConfVars.TAJO_MASTER_UMBILICAL_RPC_ADDRESS);
- InetSocketAddress initIsa = NetUtils.createSocketAddr(confMasterServiceAddr);
- int workerNum = conf.getIntVar(TajoConf.ConfVars.MASTER_RPC_SERVER_WORKER_THREAD_NUM);
- try {
- server = new AsyncRpcServer(TajoMasterProtocol.class, masterHandler, initIsa, workerNum);
- } catch (Exception e) {
- LOG.error(e);
- }
- server.start();
- bindAddress = NetUtils.getConnectAddress(server.getListenAddress());
- this.conf.setVar(TajoConf.ConfVars.TAJO_MASTER_UMBILICAL_RPC_ADDRESS,
- NetUtils.normalizeInetSocketAddress(bindAddress));
- LOG.info("Instantiated TajoMasterService at " + this.bindAddress);
- super.start();
- }
-
- @Override
- public void stop() {
- if(server != null) {
- server.shutdown();
- server = null;
- }
- super.stop();
- }
-
- public InetSocketAddress getBindAddress() {
- return bindAddress;
- }
-
- public class TajoMasterServiceHandler
- implements TajoMasterProtocol.TajoMasterProtocolService.Interface {
- @Override
- public void heartbeat(
- RpcController controller,
- TajoMasterProtocol.TajoHeartbeat request, RpcCallback<TajoMasterProtocol.TajoHeartbeatResponse> done) {
- if(LOG.isDebugEnabled()) {
- LOG.debug("Received QueryHeartbeat:" + new WorkerConnectionInfo(request.getConnectionInfo()));
- }
-
- TajoMasterProtocol.TajoHeartbeatResponse.ResponseCommand command = null;
-
- QueryJobManager queryJobManager = context.getQueryJobManager();
- command = queryJobManager.queryHeartbeat(request);
-
- TajoMasterProtocol.TajoHeartbeatResponse.Builder builder = TajoMasterProtocol.TajoHeartbeatResponse.newBuilder();
- builder.setHeartbeatResult(BOOL_TRUE);
- if(command != null) {
- builder.setResponseCommand(command);
- }
-
- builder.setClusterResourceSummary(context.getResourceManager().getClusterResourceSummary());
- done.run(builder.build());
- }
-
- @Override
- public void allocateWorkerResources(
- RpcController controller,
- TajoMasterProtocol.WorkerResourceAllocationRequest request,
- RpcCallback<TajoMasterProtocol.WorkerResourceAllocationResponse> done) {
- context.getResourceManager().allocateWorkerResources(request, done);
- }
-
- @Override
- public void releaseWorkerResource(RpcController controller,
- TajoMasterProtocol.WorkerResourceReleaseRequest request,
- RpcCallback<PrimitiveProtos.BoolProto> done) {
- List<ContainerProtocol.TajoContainerIdProto> containerIds = request.getContainerIdsList();
-
- for(ContainerProtocol.TajoContainerIdProto eachContainer: containerIds) {
- context.getResourceManager().releaseWorkerResource(eachContainer);
- }
- done.run(BOOL_TRUE);
- }
-
- @Override
- public void getAllWorkerResource(RpcController controller, PrimitiveProtos.NullProto request,
- RpcCallback<TajoMasterProtocol.WorkerResourcesRequest> done) {
-
- TajoMasterProtocol.WorkerResourcesRequest.Builder builder =
- TajoMasterProtocol.WorkerResourcesRequest.newBuilder();
- Collection<Worker> workers = context.getResourceManager().getWorkers().values();
-
- for(Worker worker: workers) {
- WorkerResource resource = worker.getResource();
-
- TajoMasterProtocol.WorkerResourceProto.Builder workerResource =
- TajoMasterProtocol.WorkerResourceProto.newBuilder();
-
- workerResource.setConnectionInfo(worker.getConnectionInfo().getProto());
- workerResource.setMemoryMB(resource.getMemoryMB());
- workerResource.setDiskSlots(resource.getDiskSlots());
-
- builder.addWorkerResources(workerResource);
- }
- done.run(builder.build());
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/tajo/blob/807868bd/tajo-core/src/main/java/org/apache/tajo/master/exec/QueryExecutor.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/exec/QueryExecutor.java b/tajo-core/src/main/java/org/apache/tajo/master/exec/QueryExecutor.java
index 2fbebc1..0860d63 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/exec/QueryExecutor.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/exec/QueryExecutor.java
@@ -388,10 +388,10 @@ public class QueryExecutor {
context.getSystemMetrics().counter("Query", "numDMLQuery").inc();
hookManager.doHooks(queryContext, plan);
- QueryJobManager queryJobManager = this.context.getQueryJobManager();
+ QueryManager queryManager = this.context.getQueryJobManager();
QueryInfo queryInfo;
- queryInfo = queryJobManager.createNewQueryJob(session, queryContext, sql, jsonExpr, rootNode);
+ queryInfo = queryManager.scheduleQuery(session, queryContext, sql, jsonExpr, rootNode);
if(queryInfo == null) {
responseBuilder.setQueryId(QueryIdFactory.NULL_QUERY_ID.getProto());
http://git-wip-us.apache.org/repos/asf/tajo/blob/807868bd/tajo-core/src/main/java/org/apache/tajo/master/rm/TajoResourceTracker.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/rm/TajoResourceTracker.java b/tajo-core/src/main/java/org/apache/tajo/master/rm/TajoResourceTracker.java
index 831ce43..519aa9d 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/rm/TajoResourceTracker.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/rm/TajoResourceTracker.java
@@ -26,7 +26,8 @@ import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.service.AbstractService;
import org.apache.tajo.conf.TajoConf;
-import org.apache.tajo.ipc.TajoMasterProtocol;
+import org.apache.tajo.ipc.QueryCoordinatorProtocol.ClusterResourceSummary;
+import org.apache.tajo.ipc.QueryCoordinatorProtocol.TajoHeartbeatResponse;
import org.apache.tajo.ipc.TajoResourceTrackerProtocol;
import org.apache.tajo.master.cluster.WorkerConnectionInfo;
import org.apache.tajo.rpc.AsyncRpcServer;
@@ -36,8 +37,6 @@ import org.apache.tajo.util.ProtoUtil;
import java.io.IOError;
import java.net.InetSocketAddress;
-import static org.apache.tajo.ipc.TajoMasterProtocol.TajoHeartbeatResponse;
-import static org.apache.tajo.ipc.TajoMasterProtocol.TajoHeartbeatResponse.Builder;
import static org.apache.tajo.ipc.TajoResourceTrackerProtocol.NodeHeartbeat;
import static org.apache.tajo.ipc.TajoResourceTrackerProtocol.TajoResourceTrackerProtocolService;
@@ -110,7 +109,8 @@ public class TajoResourceTracker extends AbstractService implements TajoResource
}
/** The response builder */
- private static final Builder builder = TajoHeartbeatResponse.newBuilder().setHeartbeatResult(ProtoUtil.TRUE);
+ private static final TajoHeartbeatResponse.Builder builder =
+ TajoHeartbeatResponse.newBuilder().setHeartbeatResult(ProtoUtil.TRUE);
private static WorkerStatusEvent createStatusEvent(int workerId, NodeHeartbeat heartbeat) {
return new WorkerStatusEvent(
@@ -204,7 +204,7 @@ public class TajoResourceTracker extends AbstractService implements TajoResource
return new Worker(rmContext, workerResource, new WorkerConnectionInfo(request.getConnectionInfo()));
}
- public TajoMasterProtocol.ClusterResourceSummary getClusterResourceSummary() {
+ public ClusterResourceSummary getClusterResourceSummary() {
int totalDiskSlots = 0;
int totalCpuCoreSlots = 0;
int totalMemoryMB = 0;
@@ -230,7 +230,7 @@ public class TajoResourceTracker extends AbstractService implements TajoResource
}
}
- return TajoMasterProtocol.ClusterResourceSummary.newBuilder()
+ return ClusterResourceSummary.newBuilder()
.setNumWorkers(rmContext.getWorkers().size())
.setTotalCpuCoreSlots(totalCpuCoreSlots)
.setTotalDiskSlots(totalDiskSlots)
http://git-wip-us.apache.org/repos/asf/tajo/blob/807868bd/tajo-core/src/main/java/org/apache/tajo/master/rm/TajoWorkerResourceManager.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/rm/TajoWorkerResourceManager.java b/tajo-core/src/main/java/org/apache/tajo/master/rm/TajoWorkerResourceManager.java
index 9f2a3d5..e5cf66c 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/rm/TajoWorkerResourceManager.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/rm/TajoWorkerResourceManager.java
@@ -34,9 +34,9 @@ import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.tajo.QueryId;
import org.apache.tajo.conf.TajoConf;
import org.apache.tajo.ipc.ContainerProtocol;
-import org.apache.tajo.ipc.TajoMasterProtocol;
+import org.apache.tajo.ipc.QueryCoordinatorProtocol.*;
+import org.apache.tajo.master.QueryInProgress;
import org.apache.tajo.master.TajoMaster;
-import org.apache.tajo.querymaster.QueryInProgress;
import org.apache.tajo.rpc.CallFuture;
import org.apache.tajo.util.ApplicationIdUtils;
@@ -49,8 +49,6 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
-import static org.apache.tajo.ipc.TajoMasterProtocol.*;
-
/**
* It manages all resources of tajo workers.
@@ -162,7 +160,7 @@ public class TajoWorkerResourceManager extends CompositeService implements Worke
}
@Override
- public TajoMasterProtocol.ClusterResourceSummary getClusterResourceSummary() {
+ public ClusterResourceSummary getClusterResourceSummary() {
return resourceTracker.getClusterResourceSummary();
}
@@ -204,7 +202,7 @@ public class TajoWorkerResourceManager extends CompositeService implements Worke
builder.setMinMemoryMBPerContainer(queryMasterDefaultMemoryMB);
builder.setMaxDiskSlotPerContainer(queryMasterDefaultDiskSlot);
builder.setMinDiskSlotPerContainer(queryMasterDefaultDiskSlot);
- builder.setResourceRequestPriority(TajoMasterProtocol.ResourceRequestPriority.MEMORY);
+ builder.setResourceRequestPriority(ResourceRequestPriority.MEMORY);
builder.setNumContainers(1);
return builder.build();
}
@@ -358,10 +356,10 @@ public class TajoWorkerResourceManager extends CompositeService implements Worke
int allocatedResources = 0;
- TajoMasterProtocol.ResourceRequestPriority resourceRequestPriority
+ ResourceRequestPriority resourceRequestPriority
= resourceRequest.request.getResourceRequestPriority();
- if(resourceRequestPriority == TajoMasterProtocol.ResourceRequestPriority.MEMORY) {
+ if(resourceRequestPriority == ResourceRequestPriority.MEMORY) {
synchronized(rmContext) {
List<Integer> randomWorkers = new ArrayList<Integer>(rmContext.getWorkers().keySet());
Collections.shuffle(randomWorkers);
http://git-wip-us.apache.org/repos/asf/tajo/blob/807868bd/tajo-core/src/main/java/org/apache/tajo/master/rm/WorkerResourceManager.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/rm/WorkerResourceManager.java b/tajo-core/src/main/java/org/apache/tajo/master/rm/WorkerResourceManager.java
index 79ec0ac..662b699 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/rm/WorkerResourceManager.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/rm/WorkerResourceManager.java
@@ -22,16 +22,15 @@ import com.google.protobuf.RpcCallback;
import org.apache.hadoop.service.Service;
import org.apache.tajo.QueryId;
import org.apache.tajo.ipc.ContainerProtocol;
-import org.apache.tajo.ipc.TajoMasterProtocol;
-import org.apache.tajo.querymaster.QueryInProgress;
+import org.apache.tajo.ipc.QueryCoordinatorProtocol;
+import org.apache.tajo.ipc.QueryCoordinatorProtocol.ClusterResourceSummary;
+import org.apache.tajo.ipc.QueryCoordinatorProtocol.WorkerResourceAllocationRequest;
+import org.apache.tajo.master.QueryInProgress;
import java.io.IOException;
import java.util.Collection;
import java.util.Map;
-import static org.apache.tajo.ipc.TajoMasterProtocol.WorkerAllocatedResource;
-import static org.apache.tajo.ipc.TajoMasterProtocol.WorkerResourceAllocationResponse;
-
/**
* An interface of WorkerResourceManager which allows TajoMaster to request allocation for containers
* and release the allocated containers.
@@ -45,7 +44,7 @@ public interface WorkerResourceManager extends Service {
* @return A allocated container resource
*/
@Deprecated
- public WorkerAllocatedResource allocateQueryMaster(QueryInProgress queryInProgress);
+ public QueryCoordinatorProtocol.WorkerAllocatedResource allocateQueryMaster(QueryInProgress queryInProgress);
/**
* Request one or more resource containers. You can set the number of containers and resource capabilities, such as
@@ -55,8 +54,8 @@ public interface WorkerResourceManager extends Service {
* @param request Request description
* @param rpcCallBack Callback function
*/
- public void allocateWorkerResources(TajoMasterProtocol.WorkerResourceAllocationRequest request,
- RpcCallback<WorkerResourceAllocationResponse> rpcCallBack);
+ public void allocateWorkerResources(WorkerResourceAllocationRequest request,
+ RpcCallback<QueryCoordinatorProtocol.WorkerResourceAllocationResponse> rpcCallBack);
/**
* Release a container
@@ -100,7 +99,7 @@ public interface WorkerResourceManager extends Service {
*
* @return The overall summary of cluster resources
*/
- public TajoMasterProtocol.ClusterResourceSummary getClusterResourceSummary();
+ public ClusterResourceSummary getClusterResourceSummary();
/**
*
http://git-wip-us.apache.org/repos/asf/tajo/blob/807868bd/tajo-core/src/main/java/org/apache/tajo/master/scheduler/Scheduler.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/scheduler/Scheduler.java b/tajo-core/src/main/java/org/apache/tajo/master/scheduler/Scheduler.java
index 02203a9..19493d7 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/scheduler/Scheduler.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/scheduler/Scheduler.java
@@ -19,7 +19,7 @@
package org.apache.tajo.master.scheduler;
import org.apache.tajo.QueryId;
-import org.apache.tajo.querymaster.QueryInProgress;
+import org.apache.tajo.master.QueryInProgress;
import java.util.List;
http://git-wip-us.apache.org/repos/asf/tajo/blob/807868bd/tajo-core/src/main/java/org/apache/tajo/master/scheduler/SimpleFifoScheduler.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/scheduler/SimpleFifoScheduler.java b/tajo-core/src/main/java/org/apache/tajo/master/scheduler/SimpleFifoScheduler.java
index a091ed5..6cb98eb 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/scheduler/SimpleFifoScheduler.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/scheduler/SimpleFifoScheduler.java
@@ -21,8 +21,8 @@ package org.apache.tajo.master.scheduler;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.tajo.QueryId;
-import org.apache.tajo.querymaster.QueryInProgress;
-import org.apache.tajo.master.QueryJobManager;
+import org.apache.tajo.master.QueryInProgress;
+import org.apache.tajo.master.QueryManager;
import java.util.*;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -32,10 +32,10 @@ public class SimpleFifoScheduler implements Scheduler {
private LinkedList<QuerySchedulingInfo> pool = new LinkedList<QuerySchedulingInfo>();
private final Thread queryProcessor;
private AtomicBoolean stopped = new AtomicBoolean();
- private QueryJobManager manager;
+ private QueryManager manager;
private Comparator<QuerySchedulingInfo> COMPARATOR = new SchedulingAlgorithms.FifoComparator();
- public SimpleFifoScheduler(QueryJobManager manager) {
+ public SimpleFifoScheduler(QueryManager manager) {
this.manager = manager;
this.queryProcessor = new Thread(new QueryProcessor());
this.queryProcessor.setName("Query Processor");
http://git-wip-us.apache.org/repos/asf/tajo/blob/807868bd/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryInProgress.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryInProgress.java b/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryInProgress.java
deleted file mode 100644
index f83f244..0000000
--- a/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryInProgress.java
+++ /dev/null
@@ -1,230 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.querymaster;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.util.StringUtils;
-import org.apache.tajo.QueryId;
-import org.apache.tajo.TajoProtos;
-import org.apache.tajo.engine.query.QueryContext;
-import org.apache.tajo.ipc.QueryMasterProtocol;
-import org.apache.tajo.ipc.QueryMasterProtocol.QueryMasterProtocolService;
-import org.apache.tajo.ipc.TajoWorkerProtocol;
-import org.apache.tajo.ipc.TajoWorkerProtocol.QueryExecutionRequestProto;
-import org.apache.tajo.master.QueryInfo;
-import org.apache.tajo.master.TajoMaster;
-import org.apache.tajo.master.rm.WorkerResourceManager;
-import org.apache.tajo.plan.logical.LogicalRootNode;
-import org.apache.tajo.rpc.NettyClientBase;
-import org.apache.tajo.rpc.NullCallback;
-import org.apache.tajo.rpc.RpcConnectionPool;
-import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos;
-import org.apache.tajo.session.Session;
-import org.apache.tajo.util.NetUtils;
-
-import java.net.InetSocketAddress;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-import static org.apache.tajo.ipc.TajoMasterProtocol.WorkerAllocatedResource;
-
-public class QueryInProgress {
- private static final Log LOG = LogFactory.getLog(QueryInProgress.class.getName());
-
- private QueryId queryId;
-
- private Session session;
-
- private LogicalRootNode plan;
-
- private AtomicBoolean querySubmitted = new AtomicBoolean(false);
-
- private AtomicBoolean stopped = new AtomicBoolean(false);
-
- private QueryInfo queryInfo;
-
- private final TajoMaster.MasterContext masterContext;
-
- private NettyClientBase queryMasterRpc;
-
- private QueryMasterProtocolService queryMasterRpcClient;
-
- public QueryInProgress(
- TajoMaster.MasterContext masterContext,
- Session session,
- QueryContext queryContext,
- QueryId queryId, String sql, String jsonExpr, LogicalRootNode plan) {
-
- this.masterContext = masterContext;
- this.session = session;
- this.queryId = queryId;
- this.plan = plan;
-
- queryInfo = new QueryInfo(queryId, queryContext, sql, jsonExpr);
- queryInfo.setStartTime(System.currentTimeMillis());
- }
-
- public synchronized void kill() {
- getQueryInfo().setQueryState(TajoProtos.QueryState.QUERY_KILLED);
- if(queryMasterRpcClient != null){
- queryMasterRpcClient.killQuery(null, queryId.getProto(), NullCallback.get());
- }
- }
-
- public void stopProgress() {
- if(stopped.getAndSet(true)) {
- return;
- }
-
- LOG.info("=========================================================");
- LOG.info("Stop query:" + queryId);
-
- masterContext.getResourceManager().releaseQueryMaster(queryId);
-
- if(queryMasterRpc != null) {
- RpcConnectionPool.getPool(masterContext.getConf()).closeConnection(queryMasterRpc);
- }
-
- masterContext.getHistoryWriter().appendHistory(queryInfo);
- }
-
- public boolean startQueryMaster() {
- try {
- LOG.info("Initializing QueryInProgress for QueryID=" + queryId);
- WorkerResourceManager resourceManager = masterContext.getResourceManager();
- WorkerAllocatedResource resource = resourceManager.allocateQueryMaster(this);
-
- // if no resource to allocate a query master
- if(resource == null) {
- LOG.info("No Available Resources for QueryMaster");
- return false;
- }
-
- queryInfo.setQueryMaster(resource.getConnectionInfo().getHost());
- queryInfo.setQueryMasterPort(resource.getConnectionInfo().getQueryMasterPort());
- queryInfo.setQueryMasterclientPort(resource.getConnectionInfo().getClientPort());
- queryInfo.setQueryMasterInfoPort(resource.getConnectionInfo().getHttpInfoPort());
-
- return true;
- } catch (Exception e) {
- catchException(e);
- return false;
- }
- }
-
- private void connectQueryMaster() throws Exception {
- InetSocketAddress addr = NetUtils.createSocketAddr(queryInfo.getQueryMasterHost(), queryInfo.getQueryMasterPort());
- LOG.info("Connect to QueryMaster:" + addr);
- queryMasterRpc =
- RpcConnectionPool.getPool(masterContext.getConf()).getConnection(addr, QueryMasterProtocol.class, true);
- queryMasterRpcClient = queryMasterRpc.getStub();
- }
-
- public synchronized void submmitQueryToMaster() {
- if(querySubmitted.get()) {
- return;
- }
-
- try {
- if(queryMasterRpcClient == null) {
- connectQueryMaster();
- }
- if(queryMasterRpcClient == null) {
- LOG.info("No QueryMaster conneciton info.");
- //TODO wait
- return;
- }
- LOG.info("Call executeQuery to :" +
- queryInfo.getQueryMasterHost() + ":" + queryInfo.getQueryMasterPort() + "," + queryId);
-
- QueryExecutionRequestProto.Builder builder = TajoWorkerProtocol.QueryExecutionRequestProto.newBuilder();
- builder.setQueryId(queryId.getProto())
- .setQueryContext(queryInfo.getQueryContext().getProto())
- .setSession(session.getProto())
- .setExprInJson(PrimitiveProtos.StringProto.newBuilder().setValue(queryInfo.getJsonExpr()))
- .setLogicalPlanJson(PrimitiveProtos.StringProto.newBuilder().setValue(plan.toJson()).build());
-
- queryMasterRpcClient.executeQuery(null, builder.build(), NullCallback.get());
- querySubmitted.set(true);
- } catch (Exception e) {
- LOG.error(e.getMessage(), e);
- }
- }
-
- public void catchException(Exception e) {
- LOG.error(e.getMessage(), e);
- queryInfo.setQueryState(TajoProtos.QueryState.QUERY_FAILED);
- queryInfo.setLastMessage(StringUtils.stringifyException(e));
- }
-
- public QueryId getQueryId() {
- return queryId;
- }
-
- public QueryInfo getQueryInfo() {
- return this.queryInfo;
- }
-
- public boolean isStarted() {
- return !stopped.get() && this.querySubmitted.get();
- }
-
- public void heartbeat(QueryInfo queryInfo) {
- LOG.info("Received QueryMaster heartbeat:" + queryInfo);
-
- // to avoid partial update by different heartbeats
- synchronized (this.queryInfo) {
-
- // terminal state will let client to retrieve a query result
- // So, we must set the query result before changing query state
- if (isFinishState(queryInfo.getQueryState())) {
- if (queryInfo.hasResultdesc()) {
- this.queryInfo.setResultDesc(queryInfo.getResultDesc());
- }
- }
-
- this.queryInfo.setQueryState(queryInfo.getQueryState());
- this.queryInfo.setProgress(queryInfo.getProgress());
- this.queryInfo.setFinishTime(queryInfo.getFinishTime());
-
- // Update diagnosis message
- if (queryInfo.getLastMessage() != null && !queryInfo.getLastMessage().isEmpty()) {
- this.queryInfo.setLastMessage(queryInfo.getLastMessage());
- LOG.info(queryId + queryInfo.getLastMessage());
- }
-
- // if any error occurs, print outs the error message
- if (this.queryInfo.getQueryState() == TajoProtos.QueryState.QUERY_FAILED) {
- LOG.warn(queryId + " failed, " + queryInfo.getLastMessage());
- }
-
-
- if (isFinishState(this.queryInfo.getQueryState())) {
- masterContext.getQueryJobManager().getEventHandler().handle(
- new QueryJobEvent(QueryJobEvent.Type.QUERY_JOB_STOP, this.queryInfo));
- }
- }
- }
-
- private boolean isFinishState(TajoProtos.QueryState state) {
- return state == TajoProtos.QueryState.QUERY_FAILED ||
- state == TajoProtos.QueryState.QUERY_KILLED ||
- state == TajoProtos.QueryState.QUERY_SUCCEEDED;
- }
-}
http://git-wip-us.apache.org/repos/asf/tajo/blob/807868bd/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMaster.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMaster.java b/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMaster.java
index 02760a3..53390a1 100644
--- a/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMaster.java
+++ b/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMaster.java
@@ -34,7 +34,8 @@ import org.apache.tajo.conf.TajoConf;
import org.apache.tajo.engine.planner.global.GlobalPlanner;
import org.apache.tajo.engine.query.QueryContext;
import org.apache.tajo.ha.HAServiceUtil;
-import org.apache.tajo.ipc.TajoMasterProtocol;
+import org.apache.tajo.ipc.QueryCoordinatorProtocol;
+import org.apache.tajo.ipc.QueryCoordinatorProtocol.*;
import org.apache.tajo.ipc.TajoWorkerProtocol;
import org.apache.tajo.master.event.QueryStartEvent;
import org.apache.tajo.master.event.QueryStopEvent;
@@ -56,10 +57,6 @@ import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
-import static org.apache.tajo.ipc.TajoMasterProtocol.TajoHeartbeat;
-import static org.apache.tajo.ipc.TajoMasterProtocol.TajoHeartbeatResponse;
-
-// TODO - when exception, send error status to QueryJobManager
public class QueryMaster extends CompositeService implements EventHandler {
private static final Log LOG = LogFactory.getLog(QueryMaster.class.getName());
@@ -182,12 +179,12 @@ public class QueryMaster extends CompositeService implements EventHandler {
}
LOG.info("cleanup executionBlocks: " + cleanupMessage);
NettyClientBase rpc = null;
- List<TajoMasterProtocol.WorkerResourceProto> workers = getAllWorker();
+ List<WorkerResourceProto> workers = getAllWorker();
TajoWorkerProtocol.ExecutionBlockListProto.Builder builder = TajoWorkerProtocol.ExecutionBlockListProto.newBuilder();
builder.addAllExecutionBlockId(Lists.newArrayList(executionBlockIds));
TajoWorkerProtocol.ExecutionBlockListProto executionBlockListProto = builder.build();
- for (TajoMasterProtocol.WorkerResourceProto worker : workers) {
+ for (WorkerResourceProto worker : workers) {
try {
TajoProtos.WorkerConnectionInfoProto connectionInfo = worker.getConnectionInfo();
rpc = connPool.getConnection(NetUtils.createSocketAddr(connectionInfo.getHost(), connectionInfo.getPeerRpcPort()),
@@ -206,9 +203,9 @@ public class QueryMaster extends CompositeService implements EventHandler {
private void cleanup(QueryId queryId) {
LOG.info("cleanup query resources : " + queryId);
NettyClientBase rpc = null;
- List<TajoMasterProtocol.WorkerResourceProto> workers = getAllWorker();
+ List<WorkerResourceProto> workers = getAllWorker();
- for (TajoMasterProtocol.WorkerResourceProto worker : workers) {
+ for (WorkerResourceProto worker : workers) {
try {
TajoProtos.WorkerConnectionInfoProto connectionInfo = worker.getConnectionInfo();
rpc = connPool.getConnection(NetUtils.createSocketAddr(connectionInfo.getHost(), connectionInfo.getPeerRpcPort()),
@@ -224,7 +221,7 @@ public class QueryMaster extends CompositeService implements EventHandler {
}
}
- public List<TajoMasterProtocol.WorkerResourceProto> getAllWorker() {
+ public List<WorkerResourceProto> getAllWorker() {
NettyClientBase rpc = null;
try {
@@ -235,78 +232,34 @@ public class QueryMaster extends CompositeService implements EventHandler {
if (systemConf.getBoolVar(TajoConf.ConfVars.TAJO_MASTER_HA_ENABLE)) {
try {
rpc = connPool.getConnection(queryMasterContext.getWorkerContext().getTajoMasterAddress(),
- TajoMasterProtocol.class, true);
+ QueryCoordinatorProtocol.class, true);
} catch (Exception e) {
queryMasterContext.getWorkerContext().setWorkerResourceTrackerAddr(
HAServiceUtil.getResourceTrackerAddress(systemConf));
queryMasterContext.getWorkerContext().setTajoMasterAddress(
HAServiceUtil.getMasterUmbilicalAddress(systemConf));
rpc = connPool.getConnection(queryMasterContext.getWorkerContext().getTajoMasterAddress(),
- TajoMasterProtocol.class, true);
+ QueryCoordinatorProtocol.class, true);
}
} else {
rpc = connPool.getConnection(queryMasterContext.getWorkerContext().getTajoMasterAddress(),
- TajoMasterProtocol.class, true);
+ QueryCoordinatorProtocol.class, true);
}
- TajoMasterProtocol.TajoMasterProtocolService masterService = rpc.getStub();
+ QueryCoordinatorProtocolService masterService = rpc.getStub();
- CallFuture<TajoMasterProtocol.WorkerResourcesRequest> callBack =
- new CallFuture<TajoMasterProtocol.WorkerResourcesRequest>();
+ CallFuture<WorkerResourcesRequest> callBack = new CallFuture<WorkerResourcesRequest>();
masterService.getAllWorkerResource(callBack.getController(),
PrimitiveProtos.NullProto.getDefaultInstance(), callBack);
- TajoMasterProtocol.WorkerResourcesRequest workerResourcesRequest = callBack.get(2, TimeUnit.SECONDS);
+ WorkerResourcesRequest workerResourcesRequest = callBack.get(2, TimeUnit.SECONDS);
return workerResourcesRequest.getWorkerResourcesList();
} catch (Exception e) {
LOG.error(e.getMessage(), e);
} finally {
connPool.releaseConnection(rpc);
}
- return new ArrayList<TajoMasterProtocol.WorkerResourceProto>();
- }
-
- public void reportQueryStatusToQueryMaster(QueryId queryId, TajoProtos.QueryState state) {
- LOG.info("Send QueryMaster Ready to QueryJobManager:" + queryId);
- NettyClientBase tmClient = null;
- try {
- // In TajoMaster HA mode, if backup master be active status,
- // worker may fail to connect existing active master. Thus,
- // if worker can't connect the master, worker should try to connect another master and
- // update master address in worker context.
- if (systemConf.getBoolVar(TajoConf.ConfVars.TAJO_MASTER_HA_ENABLE)) {
- try {
- tmClient = connPool.getConnection(queryMasterContext.getWorkerContext().getTajoMasterAddress(),
- TajoMasterProtocol.class, true);
- } catch (Exception e) {
- queryMasterContext.getWorkerContext().setWorkerResourceTrackerAddr(
- HAServiceUtil.getResourceTrackerAddress(systemConf));
- queryMasterContext.getWorkerContext().setTajoMasterAddress(
- HAServiceUtil.getMasterUmbilicalAddress(systemConf));
- tmClient = connPool.getConnection(queryMasterContext.getWorkerContext().getTajoMasterAddress(),
- TajoMasterProtocol.class, true);
- }
- } else {
- tmClient = connPool.getConnection(queryMasterContext.getWorkerContext().getTajoMasterAddress(),
- TajoMasterProtocol.class, true);
- }
-
- TajoMasterProtocol.TajoMasterProtocolService masterClientService = tmClient.getStub();
-
- TajoHeartbeat.Builder queryHeartbeatBuilder = TajoHeartbeat.newBuilder()
- .setConnectionInfo(workerContext.getConnectionInfo().getProto())
- .setState(state)
- .setQueryId(queryId.getProto());
-
- CallFuture<TajoHeartbeatResponse> callBack =
- new CallFuture<TajoHeartbeatResponse>();
-
- masterClientService.heartbeat(callBack.getController(), queryHeartbeatBuilder.build(), callBack);
- } catch (Exception e) {
- LOG.error(e.getMessage(), e);
- } finally {
- connPool.releaseConnection(tmClient);
- }
+ return new ArrayList<WorkerResourceProto>();
}
@Override
@@ -407,19 +360,19 @@ public class QueryMaster extends CompositeService implements EventHandler {
if (systemConf.getBoolVar(TajoConf.ConfVars.TAJO_MASTER_HA_ENABLE)) {
try {
tmClient = connPool.getConnection(queryMasterContext.getWorkerContext().getTajoMasterAddress(),
- TajoMasterProtocol.class, true);
+ QueryCoordinatorProtocol.class, true);
} catch (Exception e) {
queryMasterContext.getWorkerContext().setWorkerResourceTrackerAddr(HAServiceUtil.getResourceTrackerAddress(systemConf));
queryMasterContext.getWorkerContext().setTajoMasterAddress(HAServiceUtil.getMasterUmbilicalAddress(systemConf));
tmClient = connPool.getConnection(queryMasterContext.getWorkerContext().getTajoMasterAddress(),
- TajoMasterProtocol.class, true);
+ QueryCoordinatorProtocol.class, true);
}
} else {
tmClient = connPool.getConnection(queryMasterContext.getWorkerContext().getTajoMasterAddress(),
- TajoMasterProtocol.class, true);
+ QueryCoordinatorProtocol.class, true);
}
- TajoMasterProtocol.TajoMasterProtocolService masterClientService = tmClient.getStub();
+ QueryCoordinatorProtocolService masterClientService = tmClient.getStub();
masterClientService.heartbeat(future.getController(), queryHeartbeat, future);
} catch (Exception e) {
//this function will be closed in new thread.
@@ -524,24 +477,24 @@ public class QueryMaster extends CompositeService implements EventHandler {
if (systemConf.getBoolVar(TajoConf.ConfVars.TAJO_MASTER_HA_ENABLE)) {
try {
tmClient = connPool.getConnection(queryMasterContext.getWorkerContext().getTajoMasterAddress(),
- TajoMasterProtocol.class, true);
+ QueryCoordinatorProtocol.class, true);
} catch (Exception e) {
queryMasterContext.getWorkerContext().setWorkerResourceTrackerAddr(
HAServiceUtil.getResourceTrackerAddress(systemConf));
queryMasterContext.getWorkerContext().setTajoMasterAddress(
HAServiceUtil.getMasterUmbilicalAddress(systemConf));
tmClient = connPool.getConnection(queryMasterContext.getWorkerContext().getTajoMasterAddress(),
- TajoMasterProtocol.class, true);
+ QueryCoordinatorProtocol.class, true);
}
} else {
tmClient = connPool.getConnection(queryMasterContext.getWorkerContext().getTajoMasterAddress(),
- TajoMasterProtocol.class, true);
+ QueryCoordinatorProtocol.class, true);
}
- TajoMasterProtocol.TajoMasterProtocolService masterClientService = tmClient.getStub();
+ QueryCoordinatorProtocolService masterClientService = tmClient.getStub();
- CallFuture<TajoHeartbeatResponse> callBack =
- new CallFuture<TajoHeartbeatResponse>();
+ CallFuture<QueryCoordinatorProtocol.TajoHeartbeatResponse> callBack =
+ new CallFuture<QueryCoordinatorProtocol.TajoHeartbeatResponse>();
TajoHeartbeat queryHeartbeat = buildTajoHeartBeat(eachTask);
masterClientService.heartbeat(callBack.getController(), queryHeartbeat, callBack);