You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tajo.apache.org by jh...@apache.org on 2014/09/20 08:54:16 UTC
[2/2] git commit: TAJO-1016: Refactor worker rpc information. (jinho)
TAJO-1016: Refactor worker rpc information. (jinho)
Closes #125
Project: http://git-wip-us.apache.org/repos/asf/tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/28282b56
Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/28282b56
Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/28282b56
Branch: refs/heads/master
Commit: 28282b561ad0a75f9603936bf04f2aa5c99b6b58
Parents: 469820d
Author: jhkim <jh...@apache.org>
Authored: Sat Sep 20 15:53:06 2014 +0900
Committer: jhkim <jh...@apache.org>
Committed: Sat Sep 20 15:53:06 2014 +0900
----------------------------------------------------------------------
CHANGES | 2 +
.../java/org/apache/tajo/client/TajoAdmin.java | 36 ++--
tajo-client/src/main/proto/ClientProtos.proto | 37 ++--
tajo-common/src/main/proto/tajo_protos.proto | 10 ++
.../tajo/master/DefaultTaskScheduler.java | 18 +-
.../apache/tajo/master/LazyTaskScheduler.java | 4 +-
.../apache/tajo/master/TajoContainerProxy.java | 5 +-
.../tajo/master/TajoMasterClientService.java | 12 +-
.../apache/tajo/master/TajoMasterService.java | 9 +-
.../master/cluster/WorkerConnectionInfo.java | 178 +++++++++++++++++++
.../master/event/TaskAttemptAssignedEvent.java | 17 +-
.../tajo/master/event/TaskRequestEvent.java | 13 +-
.../master/querymaster/QueryInProgress.java | 6 +-
.../master/querymaster/QueryJobManager.java | 11 +-
.../tajo/master/querymaster/QueryMaster.java | 18 +-
.../querymaster/QueryMasterManagerService.java | 2 +-
.../tajo/master/querymaster/QueryUnit.java | 12 +-
.../master/querymaster/QueryUnitAttempt.java | 32 +---
.../apache/tajo/master/rm/TajoRMContext.java | 14 +-
.../tajo/master/rm/TajoResourceTracker.java | 24 +--
.../master/rm/TajoWorkerResourceManager.java | 33 ++--
.../java/org/apache/tajo/master/rm/Worker.java | 73 ++------
.../org/apache/tajo/master/rm/WorkerEvent.java | 6 +-
.../tajo/master/rm/WorkerLivelinessMonitor.java | 4 +-
.../tajo/master/rm/WorkerReconnectEvent.java | 2 +-
.../tajo/master/rm/WorkerResourceManager.java | 6 +-
.../tajo/master/rm/WorkerStatusEvent.java | 2 +-
.../tajo/worker/AbstractResourceAllocator.java | 15 ++
.../tajo/worker/ExecutionBlockContext.java | 12 +-
.../tajo/worker/TajoResourceAllocator.java | 17 +-
.../java/org/apache/tajo/worker/TajoWorker.java | 178 ++++++++++---------
.../tajo/worker/TajoWorkerClientService.java | 6 +-
.../tajo/worker/TajoWorkerManagerService.java | 31 +---
.../main/java/org/apache/tajo/worker/Task.java | 3 +-
.../java/org/apache/tajo/worker/TaskRunner.java | 20 +--
.../apache/tajo/worker/TaskRunnerManager.java | 11 +-
.../tajo/worker/WorkerHeartbeatService.java | 57 +++---
.../tajo/worker/event/TaskRunnerStartEvent.java | 18 +-
.../main/proto/ResourceTrackerProtocol.proto | 12 +-
.../src/main/proto/TajoMasterProtocol.proto | 36 ++--
.../src/main/proto/TajoWorkerProtocol.proto | 20 +--
.../main/resources/webapps/admin/cluster.jsp | 54 +++---
.../src/main/resources/webapps/admin/index.jsp | 4 +-
.../src/main/resources/webapps/admin/query.jsp | 8 +-
.../resources/webapps/worker/querytasks.jsp | 9 +-
.../resources/webapps/worker/taskdetail.jsp | 2 +
.../src/main/resources/webapps/worker/tasks.jsp | 4 +-
.../tajo/cluster/TestWorkerConnectionInfo.java | 36 ++++
.../tajo/master/rm/TestTajoResourceManager.java | 8 +-
.../tajo/pullserver/TajoPullServerService.java | 15 +-
50 files changed, 632 insertions(+), 530 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tajo/blob/28282b56/CHANGES
----------------------------------------------------------------------
diff --git a/CHANGES b/CHANGES
index a5f31f9..5d92881 100644
--- a/CHANGES
+++ b/CHANGES
@@ -444,6 +444,8 @@ Release 0.9.0 - unreleased
SUB TASKS
+ TAJO-1016: Refactor worker rpc information. (jinho)
+
TAJO-1015: Add executionblock event in worker. (jinho)
TAJO-783: Remove yarn-related code from tajo-core. (hyunsik)
http://git-wip-us.apache.org/repos/asf/tajo/blob/28282b56/tajo-client/src/main/java/org/apache/tajo/client/TajoAdmin.java
----------------------------------------------------------------------
diff --git a/tajo-client/src/main/java/org/apache/tajo/client/TajoAdmin.java b/tajo-client/src/main/java/org/apache/tajo/client/TajoAdmin.java
index 95dfc68..1acdb4d 100644
--- a/tajo-client/src/main/java/org/apache/tajo/client/TajoAdmin.java
+++ b/tajo-client/src/main/java/org/apache/tajo/client/TajoAdmin.java
@@ -276,14 +276,15 @@ public class TajoAdmin {
line5, line10, line10);
writer.write(line);
for (WorkerResourceInfo queryMaster : liveQueryMasters) {
- String queryMasterHost = String.format("%s:%d",
- queryMaster.getAllocatedHost(),
- queryMaster.getQueryMasterPort());
- String heap = String.format("%d MB", queryMaster.getMaxHeap()/1024/1024);
- line = String.format(fmtQueryMasterLine, queryMasterHost,
- queryMaster.getClientPort(),
- queryMaster.getNumQueryMasterTasks(),
- heap, queryMaster.getWorkerStatus());
+ TajoProtos.WorkerConnectionInfoProto connInfo = queryMaster.getConnectionInfo();
+ String queryMasterHost = String.format("%s:%d", connInfo.getHost(), connInfo.getQueryMasterPort());
+ String heap = String.format("%d MB", queryMaster.getMaxHeap() / 1024 / 1024);
+ line = String.format(fmtQueryMasterLine,
+ queryMasterHost,
+ connInfo.getClientPort(),
+ queryMaster.getNumQueryMasterTasks(),
+ heap,
+ queryMaster.getWorkerStatus());
writer.write(line);
}
@@ -301,12 +302,12 @@ public class TajoAdmin {
writer.write(line);
for (WorkerResourceInfo queryMaster : deadQueryMasters) {
- String queryMasterHost = String.format("%s:%d",
- queryMaster.getAllocatedHost(),
- queryMaster.getQueryMasterPort());
- line = String.format(fmtQueryMasterLine, queryMasterHost,
- queryMaster.getClientPort(),
- queryMaster.getWorkerStatus());
+ TajoProtos.WorkerConnectionInfoProto connInfo = queryMaster.getConnectionInfo();
+ String queryMasterHost = String.format("%s:%d", connInfo.getHost(), connInfo.getQueryMasterPort());
+ line = String.format(fmtQueryMasterLine,
+ queryMasterHost,
+ connInfo.getClientPort(),
+ queryMaster.getWorkerStatus());
writer.write(line);
}
@@ -358,9 +359,8 @@ public class TajoAdmin {
writer.write(line);
for (WorkerResourceInfo worker : workers) {
- String workerHost = String.format("%s:%d",
- worker.getAllocatedHost(),
- worker.getPeerRpcPort());
+ TajoProtos.WorkerConnectionInfoProto connInfo = worker.getConnectionInfo();
+ String workerHost = String.format("%s:%d", connInfo.getHost(), connInfo.getPeerRpcPort());
String mem = String.format("%d/%d", worker.getUsedMemoryMB(),
worker.getMemoryMB());
String disk = String.format("%.2f/%.2f", worker.getUsedDiskSlots(),
@@ -369,7 +369,7 @@ public class TajoAdmin {
worker.getMaxHeap()/1024/1024);
line = String.format(fmtWorkerLine, workerHost,
- worker.getPullServerPort(),
+ connInfo.getPullServerPort(),
worker.getNumRunningTasks(),
mem, disk, heap, worker.getWorkerStatus());
writer.write(line);
http://git-wip-us.apache.org/repos/asf/tajo/blob/28282b56/tajo-client/src/main/proto/ClientProtos.proto
----------------------------------------------------------------------
diff --git a/tajo-client/src/main/proto/ClientProtos.proto b/tajo-client/src/main/proto/ClientProtos.proto
index c66b228..0359685 100644
--- a/tajo-client/src/main/proto/ClientProtos.proto
+++ b/tajo-client/src/main/proto/ClientProtos.proto
@@ -157,27 +157,22 @@ message GetClusterInfoRequest {
}
message WorkerResourceInfo {
- required string allocatedHost = 1;
- required int32 peerRpcPort = 2;
- required int32 queryMasterPort = 3;
- required int32 clientPort = 4;
- required int32 pullServerPort = 5;
- required int32 httpPort = 6;
- required float diskSlots = 7;
- required int32 cpuCoreSlots = 8;
- required int32 memoryMB = 9;
- required float usedDiskSlots = 10;
- required int32 usedMemoryMB = 11;
- required int32 usedCpuCoreSlots = 12;
- required int64 maxHeap = 13;
- required int64 freeHeap = 14;
- required int64 totalHeap = 15;
- required int32 numRunningTasks = 16;
- required string workerStatus = 17;
- required int64 lastHeartbeat = 18;
- required bool queryMasterMode = 19;
- required bool taskRunnerMode = 20;
- required int32 numQueryMasterTasks = 21;
+ required WorkerConnectionInfoProto connectionInfo = 1;
+ required float diskSlots = 2;
+ required int32 cpuCoreSlots = 3;
+ required int32 memoryMB = 4;
+ required float usedDiskSlots = 5;
+ required int32 usedMemoryMB = 6;
+ required int32 usedCpuCoreSlots = 7;
+ required int64 maxHeap = 8;
+ required int64 freeHeap = 9;
+ required int64 totalHeap = 10;
+ required int32 numRunningTasks = 11;
+ required string workerStatus = 12;
+ required int64 lastHeartbeat = 13;
+ required bool queryMasterMode = 14;
+ required bool taskRunnerMode = 15;
+ required int32 numQueryMasterTasks = 16;
}
message GetClusterInfoResponse {
http://git-wip-us.apache.org/repos/asf/tajo/blob/28282b56/tajo-common/src/main/proto/tajo_protos.proto
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/proto/tajo_protos.proto b/tajo-common/src/main/proto/tajo_protos.proto
index edd27fc..b6cd9ef 100644
--- a/tajo-common/src/main/proto/tajo_protos.proto
+++ b/tajo-common/src/main/proto/tajo_protos.proto
@@ -52,4 +52,14 @@ enum FetcherState {
FETCH_FETCHING = 1;
FETCH_FINISHED = 2;
FETCH_FAILED = 3;
+}
+
+message WorkerConnectionInfoProto {
+ required int32 id = 1;
+ required string host = 2;
+ required int32 peerRpcPort = 3;
+ required int32 pullServerPort = 4;
+ optional int32 queryMasterPort = 5;
+ required int32 clientPort = 6;
+ required int32 httpInfoPort = 7;
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/tajo/blob/28282b56/tajo-core/src/main/java/org/apache/tajo/master/DefaultTaskScheduler.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/DefaultTaskScheduler.java b/tajo-core/src/main/java/org/apache/tajo/master/DefaultTaskScheduler.java
index 7684df2..2cb8878 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/DefaultTaskScheduler.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/DefaultTaskScheduler.java
@@ -33,6 +33,7 @@ import org.apache.tajo.engine.planner.global.MasterPlan;
import org.apache.tajo.engine.query.QueryUnitRequest;
import org.apache.tajo.engine.query.QueryUnitRequestImpl;
import org.apache.tajo.ipc.TajoWorkerProtocol;
+import org.apache.tajo.master.cluster.WorkerConnectionInfo;
import org.apache.tajo.master.event.*;
import org.apache.tajo.master.event.QueryUnitAttemptScheduleEvent.QueryUnitAttemptScheduleContext;
import org.apache.tajo.master.event.TaskSchedulerEvent.EventType;
@@ -744,13 +745,15 @@ public class DefaultTaskScheduler extends AbstractTaskScheduler {
}
// getting the hostname of requested node
- String host = container.getTaskHostName();
+ WorkerConnectionInfo connectionInfo =
+ context.getMasterContext().getResourceAllocator().getWorkerConnectionInfo(taskRequest.getWorkerId());
+ String host = connectionInfo.getHost();
// if there are no worker matched to the hostname a task request
if(!leafTaskHostMapping.containsKey(host)){
- host = NetUtils.normalizeHost(host);
+ String normalizedHost = NetUtils.normalizeHost(host);
- if(!leafTaskHostMapping.containsKey(host) && !taskRequests.isEmpty()){
+ if(!leafTaskHostMapping.containsKey(normalizedHost) && !taskRequests.isEmpty()){
// this case means one of either cases:
// * there are no blocks which reside in this node.
// * all blocks which reside in this node are consumed, and this task runner requests a remote task.
@@ -826,8 +829,7 @@ public class DefaultTaskScheduler extends AbstractTaskScheduler {
}
context.getMasterContext().getEventHandler().handle(new TaskAttemptAssignedEvent(attemptId,
- taskRequest.getContainerId(),
- host, container.getTaskPort()));
+ taskRequest.getContainerId(), connectionInfo));
assignedRequest.add(attemptId);
scheduledObjectNum--;
@@ -891,10 +893,10 @@ public class DefaultTaskScheduler extends AbstractTaskScheduler {
}
}
- ContainerProxy container = context.getMasterContext().getResourceAllocator().getContainer(
- taskRequest.getContainerId());
+ WorkerConnectionInfo connectionInfo = context.getMasterContext().getResourceAllocator().
+ getWorkerConnectionInfo(taskRequest.getWorkerId());
context.getMasterContext().getEventHandler().handle(new TaskAttemptAssignedEvent(attemptId,
- taskRequest.getContainerId(), container.getTaskHostName(), container.getTaskPort()));
+ taskRequest.getContainerId(), connectionInfo));
taskRequest.getCallback().run(taskAssign.getProto());
totalAssigned++;
scheduledObjectNum--;
http://git-wip-us.apache.org/repos/asf/tajo/blob/28282b56/tajo-core/src/main/java/org/apache/tajo/master/LazyTaskScheduler.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/LazyTaskScheduler.java b/tajo-core/src/main/java/org/apache/tajo/master/LazyTaskScheduler.java
index 6552998..f7953e0 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/LazyTaskScheduler.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/LazyTaskScheduler.java
@@ -469,8 +469,6 @@ public class LazyTaskScheduler extends AbstractTaskScheduler {
private void assignTask(QueryUnitAttemptScheduleContext attemptContext, QueryUnitAttempt taskAttempt) {
QueryUnitAttemptId attemptId = taskAttempt.getId();
- ContainerProxy containerProxy = context.getMasterContext().getResourceAllocator().
- getContainer(attemptContext.getContainerId());
QueryUnitRequest taskAssign = new QueryUnitRequestImpl(
attemptId,
new ArrayList<FragmentProto>(taskAttempt.getQueryUnit().getAllFragments()),
@@ -495,7 +493,7 @@ public class LazyTaskScheduler extends AbstractTaskScheduler {
}
context.getMasterContext().getEventHandler().handle(new TaskAttemptAssignedEvent(attemptId,
- attemptContext.getContainerId(), attemptContext.getHost(), containerProxy.getTaskPort()));
+ attemptContext.getContainerId(), taskAttempt.getWorkerConnectionInfo()));
totalAssigned++;
attemptContext.getCallback().run(taskAssign.getProto());
http://git-wip-us.apache.org/repos/asf/tajo/blob/28282b56/tajo-core/src/main/java/org/apache/tajo/master/TajoContainerProxy.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/TajoContainerProxy.java b/tajo-core/src/main/java/org/apache/tajo/master/TajoContainerProxy.java
index c317ba5..c236c20 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/TajoContainerProxy.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/TajoContainerProxy.java
@@ -59,7 +59,7 @@ public class TajoContainerProxy extends ContainerProxy {
context.getResourceAllocator().addContainer(containerID, this);
this.hostName = container.getNodeId().getHost();
- this.port = ((TajoWorkerContainer)container).getWorkerResource().getPullServerPort();
+ this.port = ((TajoWorkerContainer)container).getWorkerResource().getConnectionInfo().getPullServerPort();
this.state = ContainerState.RUNNING;
if (LOG.isDebugEnabled()) {
@@ -102,8 +102,7 @@ public class TajoContainerProxy extends ContainerProxy {
TajoWorkerProtocol.RunExecutionBlockRequestProto request =
TajoWorkerProtocol.RunExecutionBlockRequestProto.newBuilder()
.setExecutionBlockId(executionBlockId.getProto())
- .setQueryMasterHost(myAddr.getHostName())
- .setQueryMasterPort(myAddr.getPort())
+ .setQueryMaster(context.getQueryMasterContext().getWorkerContext().getConnectionInfo().getProto())
.setNodeId(container.getNodeId().toString())
.setContainerId(container.getId().toString())
.setQueryOutputPath(context.getStagingDir().toString())
http://git-wip-us.apache.org/repos/asf/tajo/blob/28282b56/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterClientService.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterClientService.java b/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterClientService.java
index 7d80a88..e69393a 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterClientService.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterClientService.java
@@ -503,9 +503,9 @@ public class TajoMasterClientService extends AbstractService {
context.getSessionManager().touch(request.getSessionId().getId());
GetClusterInfoResponse.Builder builder= GetClusterInfoResponse.newBuilder();
- Map<String, Worker> workers = context.getResourceManager().getWorkers();
+ Map<Integer, Worker> workers = context.getResourceManager().getWorkers();
- List<String> wokerKeys = new ArrayList<String>(workers.keySet());
+ List<Integer> wokerKeys = new ArrayList<Integer>(workers.keySet());
Collections.sort(wokerKeys);
WorkerResourceInfo.Builder workerBuilder
@@ -513,7 +513,8 @@ public class TajoMasterClientService extends AbstractService {
for(Worker worker: workers.values()) {
WorkerResource workerResource = worker.getResource();
- workerBuilder.setAllocatedHost(worker.getHostName());
+
+ workerBuilder.setConnectionInfo(worker.getConnectionInfo().getProto());
workerBuilder.setDiskSlots(workerResource.getDiskSlots());
workerBuilder.setCpuCoreSlots(workerResource.getCpuCoreSlots());
workerBuilder.setMemoryMB(workerResource.getMemoryMB());
@@ -524,11 +525,6 @@ public class TajoMasterClientService extends AbstractService {
workerBuilder.setWorkerStatus(worker.getState().toString());
workerBuilder.setQueryMasterMode(workerResource.isQueryMasterMode());
workerBuilder.setTaskRunnerMode(workerResource.isTaskRunnerMode());
- workerBuilder.setPeerRpcPort(worker.getPeerRpcPort());
- workerBuilder.setQueryMasterPort(worker.getQueryMasterPort());
- workerBuilder.setClientPort(worker.getClientPort());
- workerBuilder.setPullServerPort(worker.getPullServerPort());
- workerBuilder.setHttpPort(worker.getHttpPort());
workerBuilder.setMaxHeap(workerResource.getMaxHeap());
workerBuilder.setFreeHeap(workerResource.getFreeHeap());
workerBuilder.setTotalHeap(workerResource.getTotalHeap());
http://git-wip-us.apache.org/repos/asf/tajo/blob/28282b56/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterService.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterService.java b/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterService.java
index 5e9f729..ddf24d3 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterService.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterService.java
@@ -28,6 +28,7 @@ import org.apache.tajo.QueryId;
import org.apache.tajo.TajoIdProtos;
import org.apache.tajo.conf.TajoConf;
import org.apache.tajo.ipc.TajoMasterProtocol;
+import org.apache.tajo.master.cluster.WorkerConnectionInfo;
import org.apache.tajo.master.querymaster.QueryJobManager;
import org.apache.tajo.master.rm.Worker;
import org.apache.tajo.master.rm.WorkerResource;
@@ -97,7 +98,7 @@ public class TajoMasterService extends AbstractService {
RpcController controller,
TajoMasterProtocol.TajoHeartbeat request, RpcCallback<TajoMasterProtocol.TajoHeartbeatResponse> done) {
if(LOG.isDebugEnabled()) {
- LOG.debug("Received QueryHeartbeat:" + request.getTajoWorkerHost() + ":" + request.getTajoQueryMasterPort());
+ LOG.debug("Received QueryHeartbeat:" + new WorkerConnectionInfo(request.getConnectionInfo()));
}
TajoMasterProtocol.TajoHeartbeatResponse.ResponseCommand command = null;
@@ -156,13 +157,9 @@ public class TajoMasterService extends AbstractService {
TajoMasterProtocol.WorkerResourceProto.Builder workerResource =
TajoMasterProtocol.WorkerResourceProto.newBuilder();
- workerResource.setHost(worker.getHostName());
- workerResource.setPeerRpcPort(worker.getPeerRpcPort());
- workerResource.setInfoPort(worker.getHttpPort());
- workerResource.setQueryMasterPort(worker.getQueryMasterPort());
+ workerResource.setConnectionInfo(worker.getConnectionInfo().getProto());
workerResource.setMemoryMB(resource.getMemoryMB());
workerResource.setDiskSlots(resource.getDiskSlots());
- workerResource.setQueryMasterPort(worker.getQueryMasterPort());
builder.addWorkerResources(workerResource);
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/28282b56/tajo-core/src/main/java/org/apache/tajo/master/cluster/WorkerConnectionInfo.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/cluster/WorkerConnectionInfo.java b/tajo-core/src/main/java/org/apache/tajo/master/cluster/WorkerConnectionInfo.java
new file mode 100644
index 0000000..78d4978
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/master/cluster/WorkerConnectionInfo.java
@@ -0,0 +1,178 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.master.cluster;
+
+import org.apache.tajo.common.ProtoObject;
+
+import static org.apache.tajo.TajoProtos.WorkerConnectionInfoProto;
+
+public class WorkerConnectionInfo implements ProtoObject<WorkerConnectionInfoProto>, Comparable<WorkerConnectionInfo> {
+
+ /**
+ * unique worker id
+ */
+ private int id;
+ /**
+ * Hostname
+ */
+ private String host;
+ /**
+ * Peer rpc port
+ */
+ private int peerRpcPort;
+ /**
+ * pull server port
+ */
+ private int pullServerPort;
+ /**
+ * QueryMaster rpc port
+ */
+ private int queryMasterPort;
+ /**
+ * the port of client rpc which provides a client API
+ */
+ private int clientPort;
+ /**
+ * http info port
+ */
+ private int httpInfoPort;
+
+ public WorkerConnectionInfo() {
+ }
+
+ public WorkerConnectionInfo(WorkerConnectionInfoProto proto) {
+ this();
+ this.id = proto.getId();
+ this.host = proto.getHost();
+ this.peerRpcPort = proto.getPeerRpcPort();
+ this.pullServerPort = proto.getPullServerPort();
+ this.clientPort = proto.getClientPort();
+ this.httpInfoPort = proto.getHttpInfoPort();
+ this.queryMasterPort = proto.getQueryMasterPort();
+ }
+
+ public WorkerConnectionInfo(String host, int peerRpcPort, int pullServerPort, int clientPort,
+ int queryMasterPort, int httpInfoPort) {
+ this();
+ this.host = host;
+ this.peerRpcPort = peerRpcPort;
+ this.pullServerPort = pullServerPort;
+ this.clientPort = clientPort;
+ this.queryMasterPort = queryMasterPort;
+ this.httpInfoPort = httpInfoPort;
+ this.id = hashCode();
+ }
+
+ public String getHost() {
+ return host;
+ }
+
+ public int getPeerRpcPort() {
+ return peerRpcPort;
+ }
+
+ public int getPullServerPort() {
+ return pullServerPort;
+ }
+
+ public int getQueryMasterPort() {
+ return queryMasterPort;
+ }
+
+ public int getClientPort() {
+ return clientPort;
+ }
+
+ public int getHttpInfoPort() {
+ return httpInfoPort;
+ }
+
+ public int getId() {
+ return id;
+ }
+
+ public String getHostAndPeerRpcPort() {
+ return this.getHost() + ":" + this.getPeerRpcPort();
+ }
+
+ @Override
+ public WorkerConnectionInfoProto getProto() {
+ WorkerConnectionInfoProto.Builder builder = WorkerConnectionInfoProto.newBuilder();
+ builder.setId(id)
+ .setHost(host)
+ .setPeerRpcPort(peerRpcPort)
+ .setPullServerPort(pullServerPort)
+ .setClientPort(clientPort)
+ .setHttpInfoPort(httpInfoPort)
+ .setQueryMasterPort(queryMasterPort);
+ return builder.build();
+ }
+
+ @Override
+ public int hashCode() {
+ final int prime = 493217;
+ int result = 8501;
+ result = prime * result + this.getHost().hashCode();
+ result = prime * result + this.getPeerRpcPort();
+ return result;
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (this == obj)
+ return true;
+ if (obj == null)
+ return false;
+ if (getClass() != obj.getClass())
+ return false;
+ WorkerConnectionInfo other = (WorkerConnectionInfo) obj;
+ if (!this.getHost().equals(other.getHost()))
+ return false;
+ if (this.getPeerRpcPort() != other.getPeerRpcPort())
+ return false;
+ return true;
+ }
+
+ @Override
+ public int compareTo(WorkerConnectionInfo other) {
+ int hostCompare = this.getHost().compareTo(other.getHost());
+ if (hostCompare == 0) {
+ if (this.getPeerRpcPort() > other.getPeerRpcPort()) {
+ return 1;
+ } else if (this.getPeerRpcPort() < other.getPeerRpcPort()) {
+ return -1;
+ }
+ return 0;
+ }
+ return hostCompare;
+ }
+
+ @Override
+ public String toString() {
+ StringBuilder builder = new StringBuilder();
+ builder.append("id:").append(id).append(", ")
+ .append("host:").append(host).append(", ")
+ .append("PeerRpcPort:").append(peerRpcPort).append(", ")
+ .append("PullServerPort:").append(pullServerPort).append(", ")
+ .append("ClientPort:").append(clientPort).append(", ")
+ .append("QueryMasterPort:").append(queryMasterPort).append(", ")
+ .append("HttpInfoPort:").append(httpInfoPort);
+ return builder.toString();
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/tajo/blob/28282b56/tajo-core/src/main/java/org/apache/tajo/master/event/TaskAttemptAssignedEvent.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/event/TaskAttemptAssignedEvent.java b/tajo-core/src/main/java/org/apache/tajo/master/event/TaskAttemptAssignedEvent.java
index 4934633..e0928c5 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/event/TaskAttemptAssignedEvent.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/event/TaskAttemptAssignedEvent.java
@@ -20,29 +20,24 @@ package org.apache.tajo.master.event;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.tajo.QueryUnitAttemptId;
+import org.apache.tajo.master.cluster.WorkerConnectionInfo;
public class TaskAttemptAssignedEvent extends TaskAttemptEvent {
private final ContainerId cId;
- private final String hostName;
- private final int pullServerPort;
+ private final WorkerConnectionInfo workerConnectionInfo;
public TaskAttemptAssignedEvent(QueryUnitAttemptId id, ContainerId cId,
- String hostname, int pullServerPort) {
+ WorkerConnectionInfo connectionInfo) {
super(id, TaskAttemptEventType.TA_ASSIGNED);
this.cId = cId;
- this.hostName = hostname;
- this.pullServerPort = pullServerPort;
+ this.workerConnectionInfo = connectionInfo;
}
public ContainerId getContainerId() {
return cId;
}
- public String getHostName() {
- return hostName;
- }
-
- public int getPullServerPort() {
- return pullServerPort;
+ public WorkerConnectionInfo getWorkerConnectionInfo(){
+ return workerConnectionInfo;
}
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/28282b56/tajo-core/src/main/java/org/apache/tajo/master/event/TaskRequestEvent.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/event/TaskRequestEvent.java b/tajo-core/src/main/java/org/apache/tajo/master/event/TaskRequestEvent.java
index 9be7cab..2197c33 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/event/TaskRequestEvent.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/event/TaskRequestEvent.java
@@ -31,24 +31,31 @@ public class TaskRequestEvent extends AbstractEvent<TaskRequestEventType> {
TASK_REQ
}
- private final ContainerId workerId;
+ private final int workerId;
+ private final ContainerId containerId;
private final ExecutionBlockId executionBlockId;
private final RpcCallback<QueryUnitRequestProto> callback;
- public TaskRequestEvent(ContainerId workerId,
+ public TaskRequestEvent(int workerId,
+ ContainerId containerId,
ExecutionBlockId executionBlockId,
RpcCallback<QueryUnitRequestProto> callback) {
super(TaskRequestEventType.TASK_REQ);
this.workerId = workerId;
+ this.containerId = containerId;
this.executionBlockId = executionBlockId;
this.callback = callback;
}
- public ContainerId getContainerId() {
+ public int getWorkerId() {
return this.workerId;
}
+ public ContainerId getContainerId() {
+ return this.containerId;
+ }
+
public ExecutionBlockId getExecutionBlockId() {
return executionBlockId;
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/28282b56/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryInProgress.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryInProgress.java b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryInProgress.java
index 261200e..877a20a 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryInProgress.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryInProgress.java
@@ -172,9 +172,9 @@ public class QueryInProgress extends CompositeService {
return false;
}
- queryInfo.setQueryMaster(resource.getWorkerHost());
- queryInfo.setQueryMasterPort(resource.getQueryMasterPort());
- queryInfo.setQueryMasterclientPort(resource.getClientPort());
+ queryInfo.setQueryMaster(resource.getConnectionInfo().getHost());
+ queryInfo.setQueryMasterPort(resource.getConnectionInfo().getQueryMasterPort());
+ queryInfo.setQueryMasterclientPort(resource.getConnectionInfo().getClientPort());
getEventHandler().handle(new QueryJobEvent(QueryJobEvent.Type.QUERY_MASTER_START, queryInfo));
http://git-wip-us.apache.org/repos/asf/tajo/blob/28282b56/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryJobManager.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryJobManager.java b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryJobManager.java
index acaefc9..e4f47cd 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryJobManager.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryJobManager.java
@@ -31,6 +31,7 @@ import org.apache.tajo.engine.planner.logical.LogicalRootNode;
import org.apache.tajo.engine.query.QueryContext;
import org.apache.tajo.ipc.TajoMasterProtocol;
import org.apache.tajo.master.TajoMaster;
+import org.apache.tajo.master.cluster.WorkerConnectionInfo;
import org.apache.tajo.master.session.Session;
import org.apache.tajo.scheduler.SimpleFifoScheduler;
@@ -241,11 +242,11 @@ public class QueryJobManager extends CompositeService {
private QueryInfo makeQueryInfoFromHeartbeat(TajoMasterProtocol.TajoHeartbeat queryHeartbeat) {
QueryInfo queryInfo = new QueryInfo(new QueryId(queryHeartbeat.getQueryId()));
- if(queryHeartbeat.getTajoWorkerHost() != null) {
- queryInfo.setQueryMaster(queryHeartbeat.getTajoWorkerHost());
- queryInfo.setQueryMasterPort(queryHeartbeat.getTajoQueryMasterPort());
- queryInfo.setQueryMasterclientPort(queryHeartbeat.getTajoWorkerClientPort());
- }
+ WorkerConnectionInfo connectionInfo = new WorkerConnectionInfo(queryHeartbeat.getConnectionInfo());
+
+ queryInfo.setQueryMaster(connectionInfo.getHost());
+ queryInfo.setQueryMasterPort(connectionInfo.getQueryMasterPort());
+ queryInfo.setQueryMasterclientPort(connectionInfo.getClientPort());
queryInfo.setLastMessage(queryHeartbeat.getStatusMessage());
queryInfo.setQueryState(queryHeartbeat.getState());
queryInfo.setProgress(queryHeartbeat.getQueryProgress());
http://git-wip-us.apache.org/repos/asf/tajo/blob/28282b56/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMaster.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMaster.java b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMaster.java
index b54675c..b8c39e0 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMaster.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMaster.java
@@ -192,9 +192,8 @@ public class QueryMaster extends CompositeService implements EventHandler {
for (TajoMasterProtocol.WorkerResourceProto worker : workers) {
try {
- if (worker.getPeerRpcPort() == 0) continue;
-
- rpc = connPool.getConnection(NetUtils.createSocketAddr(worker.getHost(), worker.getPeerRpcPort()),
+ TajoProtos.WorkerConnectionInfoProto connectionInfo = worker.getConnectionInfo();
+ rpc = connPool.getConnection(NetUtils.createSocketAddr(connectionInfo.getHost(), connectionInfo.getPeerRpcPort()),
TajoWorkerProtocol.class, true);
TajoWorkerProtocol.TajoWorkerProtocolService tajoWorkerProtocolService = rpc.getStub();
@@ -214,9 +213,8 @@ public class QueryMaster extends CompositeService implements EventHandler {
for (TajoMasterProtocol.WorkerResourceProto worker : workers) {
try {
- if (worker.getPeerRpcPort() == 0) continue;
-
- rpc = connPool.getConnection(NetUtils.createSocketAddr(worker.getHost(), worker.getPeerRpcPort()),
+ TajoProtos.WorkerConnectionInfoProto connectionInfo = worker.getConnectionInfo();
+ rpc = connPool.getConnection(NetUtils.createSocketAddr(connectionInfo.getHost(), connectionInfo.getPeerRpcPort()),
TajoWorkerProtocol.class, true);
TajoWorkerProtocol.TajoWorkerProtocolService tajoWorkerProtocolService = rpc.getStub();
@@ -299,9 +297,7 @@ public class QueryMaster extends CompositeService implements EventHandler {
TajoMasterProtocol.TajoMasterProtocolService masterClientService = tmClient.getStub();
TajoHeartbeat.Builder queryHeartbeatBuilder = TajoHeartbeat.newBuilder()
- .setTajoWorkerHost(workerContext.getQueryMasterManagerService().getBindAddr().getHostName())
- .setTajoQueryMasterPort(workerContext.getQueryMasterManagerService().getBindAddr().getPort())
- .setTajoWorkerClientPort(workerContext.getTajoWorkerClientService().getBindAddr().getPort())
+ .setConnectionInfo(workerContext.getConnectionInfo().getProto())
.setState(state)
.setQueryId(queryId.getProto());
@@ -460,9 +456,7 @@ public class QueryMaster extends CompositeService implements EventHandler {
private TajoHeartbeat buildTajoHeartBeat(QueryMasterTask queryMasterTask) {
TajoHeartbeat.Builder builder = TajoHeartbeat.newBuilder();
- builder.setTajoWorkerHost(workerContext.getQueryMasterManagerService().getBindAddr().getHostName());
- builder.setTajoQueryMasterPort(workerContext.getQueryMasterManagerService().getBindAddr().getPort());
- builder.setTajoWorkerClientPort(workerContext.getTajoWorkerClientService().getBindAddr().getPort());
+ builder.setConnectionInfo(workerContext.getConnectionInfo().getProto());
builder.setState(queryMasterTask.getState());
builder.setQueryId(queryMasterTask.getQueryId().getProto());
http://git-wip-us.apache.org/repos/asf/tajo/blob/28282b56/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMasterManagerService.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMasterManagerService.java b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMasterManagerService.java
index 862dfef..f953995 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMasterManagerService.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMasterManagerService.java
@@ -133,7 +133,7 @@ public class QueryMasterManagerService extends CompositeService
ContainerId cid =
queryMasterTask.getQueryTaskContext().getResourceAllocator().makeContainerId(request.getContainerId());
LOG.debug("getTask:" + cid + ", ebId:" + ebId);
- queryMasterTask.handleTaskRequestEvent(new TaskRequestEvent(cid, ebId, done));
+ queryMasterTask.handleTaskRequestEvent(new TaskRequestEvent(request.getWorkerId(), cid, ebId, done));
}
} catch (Exception e) {
LOG.error(e.getMessage(), e);
http://git-wip-us.apache.org/repos/asf/tajo/blob/28282b56/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryUnit.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryUnit.java b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryUnit.java
index f41fd0e..03c6d30 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryUnit.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryUnit.java
@@ -522,8 +522,8 @@ public class QueryUnit implements EventHandler<TaskEvent> {
QueryUnitAttempt attempt = task.attempts.get(attemptEvent.getTaskAttemptId());
task.successfulAttempt = attemptEvent.getTaskAttemptId();
- task.succeededHost = attempt.getHost();
- task.succeededPullServerPort = attempt.getPullServerPort();
+ task.succeededHost = attempt.getWorkerConnectionInfo().getHost();
+ task.succeededPullServerPort = attempt.getWorkerConnectionInfo().getPullServerPort();
task.finishTask();
task.eventHandler.handle(new SubQueryTaskEvent(event.getTaskId(), TaskState.SUCCEEDED));
@@ -537,7 +537,7 @@ public class QueryUnit implements EventHandler<TaskEvent> {
TaskTAttemptEvent attemptEvent = (TaskTAttemptEvent) event;
QueryUnitAttempt attempt = task.attempts.get(attemptEvent.getTaskAttemptId());
task.launchTime = System.currentTimeMillis();
- task.succeededHost = attempt.getHost();
+ task.succeededHost = attempt.getWorkerConnectionInfo().getHost();
}
}
@@ -632,9 +632,12 @@ public class QueryUnit implements EventHandler<TaskEvent> {
public static class PullHost implements Cloneable {
String host;
int port;
+ int hashCode;
+
public PullHost(String pullServerAddr, int pullServerPort){
this.host = pullServerAddr;
this.port = pullServerPort;
+ this.hashCode = Objects.hashCode(host, port);
}
public String getHost() {
return host;
@@ -650,7 +653,7 @@ public class QueryUnit implements EventHandler<TaskEvent> {
@Override
public int hashCode() {
- return Objects.hashCode(host, port);
+ return hashCode;
}
@Override
@@ -668,6 +671,7 @@ public class QueryUnit implements EventHandler<TaskEvent> {
PullHost newPullHost = (PullHost) super.clone();
newPullHost.host = host;
newPullHost.port = port;
+ newPullHost.hashCode = Objects.hashCode(newPullHost.host, newPullHost.port);
return newPullHost;
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/28282b56/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryUnitAttempt.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryUnitAttempt.java b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryUnitAttempt.java
index a4fa12f..db6f130 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryUnitAttempt.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryUnitAttempt.java
@@ -29,6 +29,7 @@ import org.apache.tajo.TajoProtos.TaskAttemptState;
import org.apache.tajo.catalog.proto.CatalogProtos;
import org.apache.tajo.catalog.statistics.TableStats;
import org.apache.tajo.ipc.TajoWorkerProtocol.TaskCompletionReport;
+import org.apache.tajo.master.cluster.WorkerConnectionInfo;
import org.apache.tajo.master.event.*;
import org.apache.tajo.master.event.QueryUnitAttemptScheduleEvent.QueryUnitAttemptScheduleContext;
import org.apache.tajo.master.event.TaskSchedulerEvent.EventType;
@@ -55,8 +56,7 @@ public class QueryUnitAttempt implements EventHandler<TaskAttemptEvent> {
final EventHandler eventHandler;
private ContainerId containerId;
- private String hostName;
- private int port;
+ private WorkerConnectionInfo workerConnectionInfo;
private int expire;
private final Lock readLock;
@@ -210,30 +210,14 @@ public class QueryUnitAttempt implements EventHandler<TaskAttemptEvent> {
return this.queryUnit;
}
- public String getHost() {
- return this.hostName;
- }
-
- public int getPort() {
- return this.port;
+ public WorkerConnectionInfo getWorkerConnectionInfo() {
+ return this.workerConnectionInfo;
}
public void setContainerId(ContainerId containerId) {
this.containerId = containerId;
}
- public void setHost(String host) {
- this.hostName = host;
- }
-
- public void setPullServerPort(int port) {
- this.port = port;
- }
-
- public int getPullServerPort() {
- return port;
- }
-
public synchronized void setExpireTime(int expire) {
this.expire = expire;
}
@@ -277,7 +261,7 @@ public class QueryUnitAttempt implements EventHandler<TaskAttemptEvent> {
if (report.getShuffleFileOutputsCount() > 0) {
this.getQueryUnit().setShuffleFileOutputs(report.getShuffleFileOutputsList());
- PullHost host = new PullHost(getHost(), getPullServerPort());
+ PullHost host = new PullHost(getWorkerConnectionInfo().getHost(), getWorkerConnectionInfo().getPullServerPort());
for (ShuffleFileOutput p : report.getShuffleFileOutputsList()) {
IntermediateEntry entry = new IntermediateEntry(getId().getQueryUnitId().getId(),
getId().getId(), p.getPartId(), host, p.getVolume());
@@ -325,8 +309,7 @@ public class QueryUnitAttempt implements EventHandler<TaskAttemptEvent> {
TaskAttemptEvent event) {
TaskAttemptAssignedEvent castEvent = (TaskAttemptAssignedEvent) event;
taskAttempt.containerId = castEvent.getContainerId();
- taskAttempt.setHost(castEvent.getHostName());
- taskAttempt.setPullServerPort(castEvent.getPullServerPort());
+ taskAttempt.workerConnectionInfo = castEvent.getWorkerConnectionInfo();
taskAttempt.eventHandler.handle(
new TaskTAttemptEvent(taskAttempt.getId(),
TaskEventType.T_ATTEMPT_LAUNCHED));
@@ -415,7 +398,8 @@ public class QueryUnitAttempt implements EventHandler<TaskAttemptEvent> {
TaskFatalErrorEvent errorEvent = (TaskFatalErrorEvent) event;
taskAttempt.eventHandler.handle(new TaskTAttemptEvent(taskAttempt.getId(), TaskEventType.T_ATTEMPT_FAILED));
taskAttempt.addDiagnosticInfo(errorEvent.errorMessage());
- LOG.error(taskAttempt.getId() + " FROM " + taskAttempt.getHost() + " >> " + errorEvent.errorMessage());
+ LOG.error(taskAttempt.getId() + " FROM " + taskAttempt.getWorkerConnectionInfo().getHost()
+ + " >> " + errorEvent.errorMessage());
}
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/28282b56/tajo-core/src/main/java/org/apache/tajo/master/rm/TajoRMContext.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/rm/TajoRMContext.java b/tajo-core/src/main/java/org/apache/tajo/master/rm/TajoRMContext.java
index 2229f04..5d07ff2 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/rm/TajoRMContext.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/rm/TajoRMContext.java
@@ -37,16 +37,16 @@ public class TajoRMContext {
final Dispatcher rmDispatcher;
/** map between workerIds and running workers */
- private final ConcurrentMap<String, Worker> workers = new ConcurrentHashMap<String, Worker>();
+ private final ConcurrentMap<Integer, Worker> workers = Maps.newConcurrentMap();
/** map between workerIds and inactive workers */
- private final ConcurrentMap<String, Worker> inactiveWorkers = new ConcurrentHashMap<String, Worker>();
+ private final ConcurrentMap<Integer, Worker> inactiveWorkers = Maps.newConcurrentMap();
/** map between queryIds and query master ContainerId */
private final ConcurrentMap<QueryId, ContainerIdProto> qmContainerMap = Maps.newConcurrentMap();
- private final Set<String> liveQueryMasterWorkerResources =
- Collections.newSetFromMap(new ConcurrentHashMap<String, Boolean>());
+ private final Set<Integer> liveQueryMasterWorkerResources =
+ Collections.newSetFromMap(new ConcurrentHashMap<Integer, Boolean>());
private final Set<QueryId> stoppedQueryIds =
Collections.newSetFromMap(new ConcurrentHashMap<QueryId, Boolean>());
@@ -62,14 +62,14 @@ public class TajoRMContext {
/**
* @return The Map for active workers
*/
- public ConcurrentMap<String, Worker> getWorkers() {
+ public ConcurrentMap<Integer, Worker> getWorkers() {
return workers;
}
/**
* @return The Map for inactive workers
*/
- public ConcurrentMap<String, Worker> getInactiveWorkers() {
+ public ConcurrentMap<Integer, Worker> getInactiveWorkers() {
return inactiveWorkers;
}
@@ -81,7 +81,7 @@ public class TajoRMContext {
return qmContainerMap;
}
- public Set<String> getQueryMasterWorker() {
+ public Set<Integer> getQueryMasterWorker() {
return liveQueryMasterWorkerResources;
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/28282b56/tajo-core/src/main/java/org/apache/tajo/master/rm/TajoResourceTracker.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/rm/TajoResourceTracker.java b/tajo-core/src/main/java/org/apache/tajo/master/rm/TajoResourceTracker.java
index 4bd7adb..831ce43 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/rm/TajoResourceTracker.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/rm/TajoResourceTracker.java
@@ -28,6 +28,7 @@ import org.apache.hadoop.service.AbstractService;
import org.apache.tajo.conf.TajoConf;
import org.apache.tajo.ipc.TajoMasterProtocol;
import org.apache.tajo.ipc.TajoResourceTrackerProtocol;
+import org.apache.tajo.master.cluster.WorkerConnectionInfo;
import org.apache.tajo.rpc.AsyncRpcServer;
import org.apache.tajo.util.NetUtils;
import org.apache.tajo.util.ProtoUtil;
@@ -111,9 +112,9 @@ public class TajoResourceTracker extends AbstractService implements TajoResource
/** The response builder */
private static final Builder builder = TajoHeartbeatResponse.newBuilder().setHeartbeatResult(ProtoUtil.TRUE);
- private static WorkerStatusEvent createStatusEvent(String workerKey, NodeHeartbeat heartbeat) {
+ private static WorkerStatusEvent createStatusEvent(int workerId, NodeHeartbeat heartbeat) {
return new WorkerStatusEvent(
- workerKey,
+ workerId,
heartbeat.getServerStatus().getRunningTaskNum(),
heartbeat.getServerStatus().getJvmHeap().getMaxHeap(),
heartbeat.getServerStatus().getJvmHeap().getFreeHeap(),
@@ -128,7 +129,7 @@ public class TajoResourceTracker extends AbstractService implements TajoResource
try {
// get a workerId from the heartbeat
- String workerId = createWorkerId(heartbeat);
+ int workerId = heartbeat.getConnectionInfo().getId();
if(rmContext.getWorkers().containsKey(workerId)) { // if worker is running
@@ -145,7 +146,7 @@ public class TajoResourceTracker extends AbstractService implements TajoResource
// create new worker instance
Worker newWorker = createWorkerResource(heartbeat);
- String newWorkerId = newWorker.getWorkerId();
+ int newWorkerId = newWorker.getWorkerId();
// add the new worker to the list of active workers
rmContext.getWorkers().putIfAbsent(newWorkerId, newWorker);
@@ -178,10 +179,6 @@ public class TajoResourceTracker extends AbstractService implements TajoResource
}
}
- private static final String createWorkerId(NodeHeartbeat heartbeat) {
- return heartbeat.getTajoWorkerHost() + ":" + heartbeat.getTajoQueryMasterPort() + ":" + heartbeat.getPeerRpcPort();
- }
-
private Worker createWorkerResource(NodeHeartbeat request) {
boolean queryMasterMode = request.getServerStatus().getQueryMasterMode().getValue();
boolean taskRunnerMode = request.getServerStatus().getTaskRunnerMode().getValue();
@@ -204,14 +201,7 @@ public class TajoResourceTracker extends AbstractService implements TajoResource
workerResource.setCpuCoreSlots(4);
}
- Worker worker = new Worker(rmContext, workerResource);
- worker.setHostName(request.getTajoWorkerHost());
- worker.setHttpPort(request.getTajoWorkerHttpPort());
- worker.setPeerRpcPort(request.getPeerRpcPort());
- worker.setQueryMasterPort(request.getTajoQueryMasterPort());
- worker.setClientPort(request.getTajoWorkerClientPort());
- worker.setPullServerPort(request.getTajoWorkerPullServerPort());
- return worker;
+ return new Worker(rmContext, workerResource, new WorkerConnectionInfo(request.getConnectionInfo()));
}
public TajoMasterProtocol.ClusterResourceSummary getClusterResourceSummary() {
@@ -224,7 +214,7 @@ public class TajoResourceTracker extends AbstractService implements TajoResource
int totalAvailableMemoryMB = 0;
synchronized(rmContext) {
- for(String eachWorker: rmContext.getWorkers().keySet()) {
+ for(int eachWorker: rmContext.getWorkers().keySet()) {
Worker worker = rmContext.getWorkers().get(eachWorker);
WorkerResource resource = worker.getResource();
if(worker != null) {
http://git-wip-us.apache.org/repos/asf/tajo/blob/28282b56/tajo-core/src/main/java/org/apache/tajo/master/rm/TajoWorkerResourceManager.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/rm/TajoWorkerResourceManager.java b/tajo-core/src/main/java/org/apache/tajo/master/rm/TajoWorkerResourceManager.java
index 3915225..0e3ccad 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/rm/TajoWorkerResourceManager.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/rm/TajoWorkerResourceManager.java
@@ -134,7 +134,7 @@ public class TajoWorkerResourceManager extends CompositeService implements Worke
@Override
public void handle(WorkerEvent event) {
- String workerId = event.getWorkerId();
+ int workerId = event.getWorkerId();
Worker node = this.rmContext.getWorkers().get(workerId);
if (node != null) {
try {
@@ -147,16 +147,16 @@ public class TajoWorkerResourceManager extends CompositeService implements Worke
}
@Override
- public Map<String, Worker> getWorkers() {
+ public Map<Integer, Worker> getWorkers() {
return ImmutableMap.copyOf(rmContext.getWorkers());
}
@Override
- public Map<String, Worker> getInactiveWorkers() {
+ public Map<Integer, Worker> getInactiveWorkers() {
return ImmutableMap.copyOf(rmContext.getInactiveWorkers());
}
- public Collection<String> getQueryMasters() {
+ public Collection<Integer> getQueryMasters() {
return Collections.unmodifiableSet(rmContext.getQueryMasterWorker());
}
@@ -303,8 +303,8 @@ public class TajoWorkerResourceManager extends CompositeService implements Worke
new ArrayList<WorkerAllocatedResource>();
for(AllocatedWorkerResource allocatedResource: allocatedWorkerResources) {
- NodeId nodeId = NodeId.newInstance(allocatedResource.worker.getHostName(),
- allocatedResource.worker.getPeerRpcPort());
+ NodeId nodeId = NodeId.newInstance(allocatedResource.worker.getConnectionInfo().getHost(),
+ allocatedResource.worker.getConnectionInfo().getPeerRpcPort());
TajoWorkerContainerId containerId = new TajoWorkerContainerId();
@@ -315,12 +315,7 @@ public class TajoWorkerResourceManager extends CompositeService implements Worke
ContainerIdProto containerIdProto = containerId.getProto();
allocatedResources.add(WorkerAllocatedResource.newBuilder()
.setContainerId(containerIdProto)
- .setNodeId(nodeId.toString())
- .setWorkerHost(allocatedResource.worker.getHostName())
- .setQueryMasterPort(allocatedResource.worker.getQueryMasterPort())
- .setClientPort(allocatedResource.worker.getClientPort())
- .setPeerRpcPort(allocatedResource.worker.getPeerRpcPort())
- .setWorkerPullServerPort(allocatedResource.worker.getPullServerPort())
+ .setConnectionInfo(allocatedResource.worker.getConnectionInfo().getProto())
.setAllocatedMemoryMB(allocatedResource.allocatedMemoryMB)
.setAllocatedDiskSlots(allocatedResource.allocatedDiskSlots)
.build());
@@ -339,7 +334,7 @@ public class TajoWorkerResourceManager extends CompositeService implements Worke
if(LOG.isDebugEnabled()) {
LOG.debug("=========================================");
LOG.debug("Available Workers");
- for(String liveWorker: rmContext.getWorkers().keySet()) {
+ for(int liveWorker: rmContext.getWorkers().keySet()) {
LOG.debug(rmContext.getWorkers().get(liveWorker).toString());
}
LOG.debug("=========================================");
@@ -367,7 +362,7 @@ public class TajoWorkerResourceManager extends CompositeService implements Worke
if(resourceRequestPriority == TajoMasterProtocol.ResourceRequestPriority.MEMORY) {
synchronized(rmContext) {
- List<String> randomWorkers = new ArrayList<String>(rmContext.getWorkers().keySet());
+ List<Integer> randomWorkers = new ArrayList<Integer>(rmContext.getWorkers().keySet());
Collections.shuffle(randomWorkers);
int numContainers = resourceRequest.request.getNumContainers();
@@ -377,7 +372,7 @@ public class TajoWorkerResourceManager extends CompositeService implements Worke
resourceRequest.request.getMinDiskSlotPerContainer());
int liveWorkerSize = randomWorkers.size();
- Set<String> insufficientWorkers = new HashSet<String>();
+ Set<Integer> insufficientWorkers = new HashSet<Integer>();
boolean stop = false;
boolean checkMax = true;
while(!stop) {
@@ -394,7 +389,7 @@ public class TajoWorkerResourceManager extends CompositeService implements Worke
}
int compareAvailableMemory = checkMax ? maxMemoryMB : minMemoryMB;
- for(String eachWorker: randomWorkers) {
+ for(int eachWorker: randomWorkers) {
if(allocatedResources >= numContainers) {
stop = true;
break;
@@ -436,7 +431,7 @@ public class TajoWorkerResourceManager extends CompositeService implements Worke
}
} else {
synchronized(rmContext) {
- List<String> randomWorkers = new ArrayList<String>(rmContext.getWorkers().keySet());
+ List<Integer> randomWorkers = new ArrayList<Integer>(rmContext.getWorkers().keySet());
Collections.shuffle(randomWorkers);
int numContainers = resourceRequest.request.getNumContainers();
@@ -446,7 +441,7 @@ public class TajoWorkerResourceManager extends CompositeService implements Worke
resourceRequest.request.getMinMemoryMBPerContainer());
int liveWorkerSize = randomWorkers.size();
- Set<String> insufficientWorkers = new HashSet<String>();
+ Set<Integer> insufficientWorkers = new HashSet<Integer>();
boolean stop = false;
boolean checkMax = true;
while(!stop) {
@@ -463,7 +458,7 @@ public class TajoWorkerResourceManager extends CompositeService implements Worke
}
float compareAvailableDisk = checkMax ? maxDiskSlots : minDiskSlots;
- for(String eachWorker: randomWorkers) {
+ for(int eachWorker: randomWorkers) {
if(allocatedResources >= numContainers) {
stop = true;
break;
http://git-wip-us.apache.org/repos/asf/tajo/blob/28282b56/tajo-core/src/main/java/org/apache/tajo/master/rm/Worker.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/rm/Worker.java b/tajo-core/src/main/java/org/apache/tajo/master/rm/Worker.java
index de6ee9e..edded4d 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/rm/Worker.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/rm/Worker.java
@@ -22,6 +22,7 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.state.*;
+import org.apache.tajo.master.cluster.WorkerConnectionInfo;
import java.util.EnumSet;
import java.util.concurrent.locks.ReentrantReadWriteLock;
@@ -39,24 +40,15 @@ public class Worker implements EventHandler<WorkerEvent>, Comparable<Worker> {
/** context of {@link org.apache.tajo.master.rm.TajoWorkerResourceManager} */
private final TajoRMContext rmContext;
- /** Hostname */
- private String hostName;
- /** QueryMaster rpc port */
- private int qmRpcPort;
- /** Peer rpc port */
- private int peerRpcPort;
- /** http info port */
- private int httpInfoPort;
- /** the port of QueryMaster client rpc which provides an client API */
- private int qmClientPort;
- /** pull server port */
- private int pullServerPort;
/** last heartbeat time */
private long lastHeartbeatTime;
/** Resource capability */
private WorkerResource resource;
+ /** Worker connection information */
+ private WorkerConnectionInfo connectionInfo;
+
private static final ReconnectNodeTransition RECONNECT_NODE_TRANSITION = new ReconnectNodeTransition();
private static final StatusUpdateTransition STATUS_UPDATE_TRANSITION = new StatusUpdateTransition();
@@ -99,9 +91,10 @@ public class Worker implements EventHandler<WorkerEvent>, Comparable<Worker> {
private final StateMachine<WorkerState, WorkerEventType, WorkerEvent> stateMachine =
stateMachineFactory.make(this, WorkerState.NEW);
- public Worker(TajoRMContext rmContext, WorkerResource resource) {
+ public Worker(TajoRMContext rmContext, WorkerResource resource, WorkerConnectionInfo connectionInfo) {
this.rmContext = rmContext;
+ this.connectionInfo = connectionInfo;
this.lastHeartbeatTime = System.currentTimeMillis();
this.resource = resource;
@@ -110,56 +103,12 @@ public class Worker implements EventHandler<WorkerEvent>, Comparable<Worker> {
this.writeLock = lock.writeLock();
}
- public String getWorkerId() {
- return hostName + ":" + qmRpcPort + ":" + peerRpcPort;
- }
-
- public String getHostName() {
- return hostName;
- }
-
- public void setHostName(String allocatedHost) {
- this.hostName = allocatedHost;
- }
-
- public int getPeerRpcPort() {
- return peerRpcPort;
- }
-
- public void setPeerRpcPort(int peerRpcPort) {
- this.peerRpcPort = peerRpcPort;
- }
-
- public int getQueryMasterPort() {
- return qmRpcPort;
- }
-
- public void setQueryMasterPort(int queryMasterPort) {
- this.qmRpcPort = queryMasterPort;
- }
-
- public int getClientPort() {
- return qmClientPort;
- }
-
- public void setClientPort(int clientPort) {
- this.qmClientPort = clientPort;
- }
-
- public int getPullServerPort() {
- return pullServerPort;
- }
-
- public void setPullServerPort(int pullServerPort) {
- this.pullServerPort = pullServerPort;
- }
-
- public int getHttpPort() {
- return httpInfoPort;
+ public int getWorkerId() {
+ return connectionInfo.getId();
}
- public void setHttpPort(int port) {
- this.httpInfoPort = port;
+ public WorkerConnectionInfo getConnectionInfo() {
+ return connectionInfo;
}
public void setLastHeartbeatTime(long lastheartbeatReportTime) {
@@ -209,7 +158,7 @@ public class Worker implements EventHandler<WorkerEvent>, Comparable<Worker> {
if(o == null) {
return 1;
}
- return getWorkerId().compareTo(o.getWorkerId());
+ return connectionInfo.compareTo(o.connectionInfo);
}
public static class AddNodeTransition implements SingleArcTransition<Worker, WorkerEvent> {
http://git-wip-us.apache.org/repos/asf/tajo/blob/28282b56/tajo-core/src/main/java/org/apache/tajo/master/rm/WorkerEvent.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/rm/WorkerEvent.java b/tajo-core/src/main/java/org/apache/tajo/master/rm/WorkerEvent.java
index 389c3be..c208990 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/rm/WorkerEvent.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/rm/WorkerEvent.java
@@ -24,14 +24,14 @@ import org.apache.hadoop.yarn.event.AbstractEvent;
* WorkerEvent describes all kinds of events which sent to {@link Worker}.
*/
public class WorkerEvent extends AbstractEvent<WorkerEventType> {
- private final String workerId;
+ private final int workerId;
- public WorkerEvent(String workerId, WorkerEventType workerEventType) {
+ public WorkerEvent(int workerId, WorkerEventType workerEventType) {
super(workerEventType);
this.workerId = workerId;
}
- public String getWorkerId() {
+ public int getWorkerId() {
return workerId;
}
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/28282b56/tajo-core/src/main/java/org/apache/tajo/master/rm/WorkerLivelinessMonitor.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/rm/WorkerLivelinessMonitor.java b/tajo-core/src/main/java/org/apache/tajo/master/rm/WorkerLivelinessMonitor.java
index e3524d6..2751886 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/rm/WorkerLivelinessMonitor.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/rm/WorkerLivelinessMonitor.java
@@ -30,7 +30,7 @@ import org.apache.tajo.conf.TajoConf;
* It periodically checks the latest heartbeat time of {@link Worker}.
* If the latest heartbeat time is expired, it produces EXPIRE event to a corresponding {@link Worker}.
*/
-public class WorkerLivelinessMonitor extends AbstractLivelinessMonitor<String> {
+public class WorkerLivelinessMonitor extends AbstractLivelinessMonitor<Integer> {
private EventHandler dispatcher;
@@ -50,7 +50,7 @@ public class WorkerLivelinessMonitor extends AbstractLivelinessMonitor<String> {
}
@Override
- protected void expire(String id) {
+ protected void expire(Integer id) {
dispatcher.handle(new WorkerEvent(id, WorkerEventType.EXPIRE));
}
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/28282b56/tajo-core/src/main/java/org/apache/tajo/master/rm/WorkerReconnectEvent.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/rm/WorkerReconnectEvent.java b/tajo-core/src/main/java/org/apache/tajo/master/rm/WorkerReconnectEvent.java
index 46f286d..3828b6a 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/rm/WorkerReconnectEvent.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/rm/WorkerReconnectEvent.java
@@ -24,7 +24,7 @@ package org.apache.tajo.master.rm;
*/
public class WorkerReconnectEvent extends WorkerEvent {
private final Worker worker;
- public WorkerReconnectEvent(String workerId, Worker worker) {
+ public WorkerReconnectEvent(int workerId, Worker worker) {
super(workerId, WorkerEventType.RECONNECTED);
this.worker = worker;
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/28282b56/tajo-core/src/main/java/org/apache/tajo/master/rm/WorkerResourceManager.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/rm/WorkerResourceManager.java b/tajo-core/src/main/java/org/apache/tajo/master/rm/WorkerResourceManager.java
index 54fe11c..8e8ac51 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/rm/WorkerResourceManager.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/rm/WorkerResourceManager.java
@@ -86,13 +86,13 @@ public interface WorkerResourceManager extends Service {
*
* @return a Map instance containing active workers
*/
- public Map<String, Worker> getWorkers();
+ public Map<Integer, Worker> getWorkers();
/**
*
* @return a Map instance containing inactive workers
*/
- public Map<String, Worker> getInactiveWorkers();
+ public Map<Integer, Worker> getInactiveWorkers();
public void stop();
@@ -106,5 +106,5 @@ public interface WorkerResourceManager extends Service {
*
* @return WorkerIds on which QueryMasters are running
*/
- Collection<String> getQueryMasters();
+ Collection<Integer> getQueryMasters();
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/28282b56/tajo-core/src/main/java/org/apache/tajo/master/rm/WorkerStatusEvent.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/rm/WorkerStatusEvent.java b/tajo-core/src/main/java/org/apache/tajo/master/rm/WorkerStatusEvent.java
index 8c3d7c1..f1ab401 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/rm/WorkerStatusEvent.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/rm/WorkerStatusEvent.java
@@ -28,7 +28,7 @@ public class WorkerStatusEvent extends WorkerEvent {
private final long freeHeap;
private final long totalHeap;
- public WorkerStatusEvent(String workerId, int runningTaskNum, long maxHeap, long freeHeap, long totalHeap) {
+ public WorkerStatusEvent(int workerId, int runningTaskNum, long maxHeap, long freeHeap, long totalHeap) {
super(workerId, WorkerEventType.STATE_UPDATE);
this.runningTaskNum = runningTaskNum;
this.maxHeap = maxHeap;
http://git-wip-us.apache.org/repos/asf/tajo/blob/28282b56/tajo-core/src/main/java/org/apache/tajo/worker/AbstractResourceAllocator.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/AbstractResourceAllocator.java b/tajo-core/src/main/java/org/apache/tajo/worker/AbstractResourceAllocator.java
index 55aa8c4..ca71c53 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/AbstractResourceAllocator.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/AbstractResourceAllocator.java
@@ -22,10 +22,25 @@ import com.google.common.collect.Maps;
import org.apache.hadoop.service.CompositeService;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.tajo.master.ContainerProxy;
+import org.apache.tajo.master.cluster.WorkerConnectionInfo;
import java.util.Map;
+import java.util.concurrent.ConcurrentMap;
public abstract class AbstractResourceAllocator extends CompositeService implements ResourceAllocator {
+ /**
+ * Maps each worker id to that worker's connection information.
+ */
+ protected ConcurrentMap<Integer, WorkerConnectionInfo> workerInfoMap = Maps.newConcurrentMap();
+
+ public WorkerConnectionInfo getWorkerConnectionInfo(int workerId) { // looks up the connection info registered under workerId
+ return workerInfoMap.get(workerId); // ConcurrentMap#get: yields null when nothing is registered for this id
+ }
+
+ public void addWorkerConnectionInfo(WorkerConnectionInfo connectionInfo) { // registers a worker's connection info, keyed by connectionInfo.getId()
+ workerInfoMap.putIfAbsent(connectionInfo.getId(), connectionInfo); // first registration wins: an existing entry for the same id is left untouched
+ }
+
private Map<ContainerId, ContainerProxy> containers = Maps.newConcurrentMap();
public AbstractResourceAllocator() {
http://git-wip-us.apache.org/repos/asf/tajo/blob/28282b56/tajo-core/src/main/java/org/apache/tajo/worker/ExecutionBlockContext.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/ExecutionBlockContext.java b/tajo-core/src/main/java/org/apache/tajo/worker/ExecutionBlockContext.java
index d4b9861..1ec8a88 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/ExecutionBlockContext.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/ExecutionBlockContext.java
@@ -33,12 +33,14 @@ import org.apache.tajo.TajoProtos;
import org.apache.tajo.conf.TajoConf;
import org.apache.tajo.engine.query.QueryContext;
import org.apache.tajo.ipc.QueryMasterProtocol;
+import org.apache.tajo.master.cluster.WorkerConnectionInfo;
import org.apache.tajo.rpc.NettyClientBase;
import org.apache.tajo.rpc.NullCallback;
import org.apache.tajo.rpc.RpcChannelFactory;
import org.apache.tajo.rpc.RpcConnectionPool;
import org.apache.tajo.storage.HashShuffleAppenderManager;
import org.apache.tajo.storage.StorageUtil;
+import org.apache.tajo.util.NetUtils;
import org.apache.tajo.util.Pair;
import org.apache.tajo.worker.event.TaskRunnerStartEvent;
import org.jboss.netty.channel.socket.ClientSocketChannelFactory;
@@ -79,6 +81,7 @@ public class ExecutionBlockContext {
private TajoQueryEngine queryEngine;
private RpcConnectionPool connPool;
private InetSocketAddress qmMasterAddr;
+ private WorkerConnectionInfo queryMaster;
private TajoConf systemConf;
// for the doAs block
private UserGroupInformation taskOwner;
@@ -92,12 +95,12 @@ public class ExecutionBlockContext {
private final ConcurrentMap<String, TaskRunnerHistory> histories = Maps.newConcurrentMap();
- public ExecutionBlockContext(TaskRunnerManager manager, TaskRunnerStartEvent event, InetSocketAddress queryMaster)
+ public ExecutionBlockContext(TaskRunnerManager manager, TaskRunnerStartEvent event, WorkerConnectionInfo queryMaster)
throws Throwable {
this.manager = manager;
this.executionBlockId = event.getExecutionBlockId();
this.connPool = RpcConnectionPool.getPool(manager.getTajoConf());
- this.qmMasterAddr = queryMaster;
+ this.queryMaster = queryMaster;
this.systemConf = manager.getTajoConf();
this.reporter = new Reporter();
this.defaultFS = TajoConf.getTajoRootDir(systemConf).getFileSystem(systemConf);
@@ -118,6 +121,7 @@ public class ExecutionBlockContext {
LOG.info("Tajo Root Dir: " + systemConf.getVar(TajoConf.ConfVars.ROOT_DIR));
LOG.info("Worker Local Dir: " + systemConf.getVar(TajoConf.ConfVars.WORKER_TEMPORAL_DIR));
+ this.qmMasterAddr = NetUtils.createSocketAddr(queryMaster.getHost(), queryMaster.getQueryMasterPort());
LOG.info("QueryMaster Address:" + qmMasterAddr);
UserGroupInformation.setConfiguration(systemConf);
@@ -329,8 +333,8 @@ public class ExecutionBlockContext {
intermediateBuilder.clear();
intermediateBuilder.setEbId(ebId.getProto())
- .setHost(getWorkerContext().getTajoWorkerManagerService().getBindAddr().getHostName() + ":" +
- getWorkerContext().getPullServerPort())
+ .setHost(getWorkerContext().getConnectionInfo().getHost() + ":" +
+ getWorkerContext().getConnectionInfo().getPullServerPort())
.setTaskId(-1)
.setAttemptId(-1)
.setPartId(eachShuffle.getPartId())
http://git-wip-us.apache.org/repos/asf/tajo/blob/28282b56/tajo-core/src/main/java/org/apache/tajo/worker/TajoResourceAllocator.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/TajoResourceAllocator.java b/tajo-core/src/main/java/org/apache/tajo/worker/TajoResourceAllocator.java
index 2cc8f0c..2220089 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/TajoResourceAllocator.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/TajoResourceAllocator.java
@@ -34,6 +34,7 @@ import org.apache.tajo.conf.TajoConf;
import org.apache.tajo.ipc.TajoMasterProtocol;
import org.apache.tajo.ipc.TajoWorkerProtocol;
import org.apache.tajo.master.*;
+import org.apache.tajo.master.cluster.WorkerConnectionInfo;
import org.apache.tajo.master.event.ContainerAllocationEvent;
import org.apache.tajo.master.event.ContainerAllocatorEventType;
import org.apache.tajo.master.event.SubQueryContainerAllocationEvent;
@@ -129,6 +130,8 @@ public class TajoResourceAllocator extends AbstractResourceAllocator {
LOG.warn(e.getMessage());
}
}
+
+ workerInfoMap.clear();
super.stop();
}
@@ -325,8 +328,8 @@ public class TajoResourceAllocator extends AbstractResourceAllocator {
List<Container> containers = new ArrayList<Container>();
for(TajoMasterProtocol.WorkerAllocatedResource eachAllocatedResource: allocatedResources) {
TajoWorkerContainer container = new TajoWorkerContainer();
- NodeId nodeId = NodeId.newInstance(eachAllocatedResource.getWorkerHost(),
- eachAllocatedResource.getPeerRpcPort());
+ NodeId nodeId = NodeId.newInstance(eachAllocatedResource.getConnectionInfo().getHost(),
+ eachAllocatedResource.getConnectionInfo().getPeerRpcPort());
TajoWorkerContainerId containerId = new TajoWorkerContainerId();
@@ -343,14 +346,10 @@ public class TajoResourceAllocator extends AbstractResourceAllocator {
workerResource.setMemoryMB(eachAllocatedResource.getAllocatedMemoryMB());
workerResource.setDiskSlots(eachAllocatedResource.getAllocatedDiskSlots());
- Worker worker = new Worker(null, workerResource);
- worker.setHostName(nodeId.getHost());
- worker.setPeerRpcPort(nodeId.getPort());
- worker.setQueryMasterPort(eachAllocatedResource.getQueryMasterPort());
- worker.setPullServerPort(eachAllocatedResource.getWorkerPullServerPort());
-
+ Worker worker = new Worker(null, workerResource,
+ new WorkerConnectionInfo(eachAllocatedResource.getConnectionInfo()));
container.setWorkerResource(worker);
-
+ addWorkerConnectionInfo(worker.getConnectionInfo());
containers.add(container);
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/28282b56/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorker.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorker.java b/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorker.java
index a8d661b..280fc2b 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorker.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorker.java
@@ -32,8 +32,10 @@ import org.apache.hadoop.yarn.util.RackResolver;
import org.apache.tajo.TajoConstants;
import org.apache.tajo.catalog.CatalogClient;
import org.apache.tajo.catalog.CatalogService;
+import org.apache.tajo.common.exception.NotImplementedException;
import org.apache.tajo.conf.TajoConf;
import org.apache.tajo.ipc.TajoMasterProtocol;
+import org.apache.tajo.master.cluster.WorkerConnectionInfo;
import org.apache.tajo.master.ha.TajoMasterInfo;
import org.apache.tajo.master.querymaster.QueryMaster;
import org.apache.tajo.master.querymaster.QueryMasterManagerService;
@@ -94,12 +96,13 @@ public class TajoWorker extends CompositeService {
private TajoPullServerService pullService;
- private int pullServerPort;
-
+ @Deprecated
private boolean yarnContainerMode;
+ @Deprecated
private boolean queryMasterMode;
+ @Deprecated
private boolean taskRunnerMode;
private WorkerHeartbeatService workerHeartbeatThread;
@@ -110,7 +113,7 @@ public class TajoWorker extends CompositeService {
private TajoMasterProtocol.ClusterResourceSummary clusterResource;
- private int httpPort;
+ private WorkerConnectionInfo connectionInfo;
private ThreadMXBean threadBean = ManagementFactory.getThreadMXBean();
@@ -199,71 +202,52 @@ public class TajoWorker extends CompositeService {
this.dispatcher = new AsyncDispatcher();
addIfService(dispatcher);
+ tajoWorkerManagerService = new TajoWorkerManagerService(workerContext, peerRpcPort);
+ addIfService(tajoWorkerManagerService);
+
// querymaster worker
tajoWorkerClientService = new TajoWorkerClientService(workerContext, clientPort);
- addService(tajoWorkerClientService);
+ addIfService(tajoWorkerClientService);
queryMasterManagerService = new QueryMasterManagerService(workerContext, qmManagerPort);
- addService(queryMasterManagerService);
+ addIfService(queryMasterManagerService);
// taskrunner worker
taskRunnerManager = new TaskRunnerManager(workerContext, dispatcher);
addService(taskRunnerManager);
- tajoWorkerManagerService = new TajoWorkerManagerService(workerContext, peerRpcPort);
- addService(tajoWorkerManagerService);
-
- if(!yarnContainerMode) {
- if(taskRunnerMode && !TajoPullServerService.isStandalone()) {
- pullService = new TajoPullServerService();
- addService(pullService);
- }
+ workerHeartbeatThread = new WorkerHeartbeatService(workerContext);
+ addIfService(workerHeartbeatThread);
- if (!systemConf.get(CommonTestingUtil.TAJO_TEST_KEY, "FALSE").equalsIgnoreCase("TRUE")) {
- try {
- httpPort = systemConf.getSocketAddrVar(ConfVars.WORKER_INFO_ADDRESS).getPort();
- if(queryMasterMode && !taskRunnerMode) {
- //If QueryMaster and TaskRunner run on single host, http port conflicts
- httpPort = systemConf.getSocketAddrVar(ConfVars.WORKER_QM_INFO_ADDRESS).getPort();
- }
- webServer = StaticHttpServer.getInstance(this ,"worker", null, httpPort ,
- true, null, systemConf, null);
- webServer.start();
- httpPort = webServer.getPort();
- LOG.info("Worker info server started:" + httpPort);
-
- deletionService = new DeletionService(getMountPath().size(), 0);
- if(systemConf.getBoolVar(ConfVars.WORKER_TEMPORAL_DIR_CLEANUP)){
- getWorkerContext().cleanupTemporalDirectories();
- }
- } catch (IOException e) {
- LOG.error(e.getMessage(), e);
- }
- }
+ int httpPort = 0;
+ if(taskRunnerMode && !TajoPullServerService.isStandalone()) {
+ pullService = new TajoPullServerService();
+ addIfService(pullService);
}
- LOG.info("Tajo Worker started: queryMaster=" + queryMasterMode + " taskRunner=" + taskRunnerMode +
- ", qmRpcPort=" + qmManagerPort +
- ",yarnContainer=" + yarnContainerMode + ", clientPort=" + clientPort +
- ", peerRpcPort=" + peerRpcPort + ":" + qmManagerPort + ",httpPort" + httpPort);
+ if (!systemConf.get(CommonTestingUtil.TAJO_TEST_KEY, "FALSE").equalsIgnoreCase("TRUE")) {
+ httpPort = initWebServer();
+ }
super.serviceInit(conf);
- tajoMasterInfo = new TajoMasterInfo();
- if (systemConf.getBoolVar(TajoConf.ConfVars.TAJO_MASTER_HA_ENABLE)) {
- tajoMasterInfo.setTajoMasterAddress(HAServiceUtil.getMasterUmbilicalAddress(systemConf));
- tajoMasterInfo.setWorkerResourceTrackerAddr(HAServiceUtil.getResourceTrackerAddress(systemConf));
+ int pullServerPort;
+ if(pullService != null){
+ pullServerPort = pullService.getPort();
} else {
- tajoMasterInfo.setTajoMasterAddress(NetUtils.createSocketAddr(systemConf.getVar(ConfVars
- .TAJO_MASTER_UMBILICAL_RPC_ADDRESS)));
- tajoMasterInfo.setWorkerResourceTrackerAddr(NetUtils.createSocketAddr(systemConf.getVar(ConfVars
- .RESOURCE_TRACKER_RPC_ADDRESS)));
+ pullServerPort = getStandAlonePullServerPort();
}
- connectToCatalog();
- workerHeartbeatThread = new WorkerHeartbeatService(workerContext);
- workerHeartbeatThread.init(conf);
- addIfService(workerHeartbeatThread);
+ this.connectionInfo = new WorkerConnectionInfo(
+ tajoWorkerManagerService.getBindAddr().getHostName(),
+ tajoWorkerManagerService.getBindAddr().getPort(),
+ pullServerPort,
+ tajoWorkerClientService.getBindAddr().getPort(),
+ queryMasterManagerService.getBindAddr().getPort(),
+ httpPort);
+
+ LOG.info("Tajo Worker is initialized. \r\nQueryMaster=" + queryMasterMode + " TaskRunner=" + taskRunnerMode
+ + " connection :" + connectionInfo.toString());
try {
hashShuffleAppenderManager = new HashShuffleAppenderManager(systemConf);
@@ -300,14 +284,57 @@ public class TajoWorker extends CompositeService {
});
}
+ private int initWebServer() { // creates and starts the worker info HTTP server; returns the HTTP port
+ int httpPort = systemConf.getSocketAddrVar(ConfVars.WORKER_INFO_ADDRESS).getPort(); // start from the configured WORKER_INFO_ADDRESS port
+ try {
+ if (queryMasterMode && !taskRunnerMode) {
+ //If QueryMaster and TaskRunner run on single host, http port conflicts
+ httpPort = systemConf.getSocketAddrVar(ConfVars.WORKER_QM_INFO_ADDRESS).getPort(); // query-master-only worker: use the WORKER_QM_INFO_ADDRESS port instead
+ }
+ webServer = StaticHttpServer.getInstance(this, "worker", null, httpPort,
+ true, null, systemConf, null);
+ webServer.start();
+ httpPort = webServer.getPort(); // from here on, report the port the started server returns rather than the configured one
+ LOG.info("Worker info server started:" + httpPort);
+ } catch (IOException e) {
+ LOG.error(e.getMessage(), e); // the failure is only logged, not rethrown; the method still returns a port below
+ }
+ return httpPort; // the server's reported port on success; otherwise the last configured value assigned above
+ }
+
+ private void initCleanupService() throws IOException { // creates the DeletionService and optionally cleans the worker's temporal directories
+ deletionService = new DeletionService(getMountPath().size(), 0); // first argument is the number of entries returned by getMountPath()
+ if (systemConf.getBoolVar(ConfVars.WORKER_TEMPORAL_DIR_CLEANUP)) { // cleanup runs only when this configuration flag is true
+ getWorkerContext().cleanupTemporalDirectories();
+ }
+ }
+
public WorkerContext getWorkerContext() { // plain accessor for this worker's WorkerContext
return workerContext;
}
@Override
public void serviceStart() throws Exception { // resolves master addresses, connects to the catalog, then starts the services
+
+ tajoMasterInfo = new TajoMasterInfo(); // master address resolution now happens at start time (these lines were moved out of init by this commit)
+ if (systemConf.getBoolVar(TajoConf.ConfVars.TAJO_MASTER_HA_ENABLE)) { // HA enabled: both addresses come from HAServiceUtil
+ tajoMasterInfo.setTajoMasterAddress(HAServiceUtil.getMasterUmbilicalAddress(systemConf));
+ tajoMasterInfo.setWorkerResourceTrackerAddr(HAServiceUtil.getResourceTrackerAddress(systemConf));
+ } else { // HA disabled: both addresses are read directly from the configuration
+ tajoMasterInfo.setTajoMasterAddress(NetUtils.createSocketAddr(systemConf.getVar(ConfVars
+ .TAJO_MASTER_UMBILICAL_RPC_ADDRESS)));
+ tajoMasterInfo.setWorkerResourceTrackerAddr(NetUtils.createSocketAddr(systemConf.getVar(ConfVars
+ .RESOURCE_TRACKER_RPC_ADDRESS)));
+ }
+ connectToCatalog();
+
+ if (!systemConf.get(CommonTestingUtil.TAJO_TEST_KEY, "FALSE").equalsIgnoreCase("TRUE")) { // skipped when the TAJO_TEST_KEY setting is "TRUE" (case-insensitive)
+ initCleanupService();
+ }
+
initWorkerMetrics();
super.serviceStart(); // delegates to the CompositeService superclass
+ LOG.info("Tajo Worker is started");
}
@Override
@@ -319,7 +346,7 @@ public class TajoWorker extends CompositeService {
if(webServer != null) {
try {
webServer.stop();
- } catch (Exception e) {
+ } catch (Throwable e) {
LOG.error(e.getMessage(), e);
}
}
@@ -336,7 +363,7 @@ public class TajoWorker extends CompositeService {
if(webServer != null && webServer.isAlive()) {
try {
webServer.stop();
- } catch (Exception e) {
+ } catch (Throwable e) {
}
}
@@ -381,15 +408,19 @@ public class TajoWorker extends CompositeService {
return catalogClient;
}
- public int getHttpPort() {
- return httpPort;
+ public TajoPullServerService getPullService() { // accessor for the embedded pull server
+ return pullService; // null unless init created one (only when taskRunnerMode && !TajoPullServerService.isStandalone())
+ }
+
+ public WorkerConnectionInfo getConnectionInfo() { // accessor for this worker's host/port bundle
+ return connectionInfo; // assigned during init after super.serviceInit(conf); NOTE(review): presumably null before that point, confirm no earlier callers
}
public String getWorkerName() { // the worker's name depends on whether it runs as a query master
if (queryMasterMode) {
return getQueryMasterManagerService().getHostAndPort(); // query master: named by the QueryMasterManagerService's host-and-port string
} else {
- return getTajoWorkerManagerService().getHostAndPort();
+ return connectionInfo.getHostAndPeerRpcPort(); // otherwise taken from connectionInfo; presumably host plus peer RPC port, per the accessor name
}
}
@@ -444,6 +475,7 @@ public class TajoWorker extends CompositeService {
}
}
+ @Deprecated
public boolean isYarnContainerMode() { // accessor for the yarnContainerMode flag; the backing field is marked @Deprecated in this same commit
return yarnContainerMode;
}
@@ -503,45 +535,20 @@ public class TajoWorker extends CompositeService {
public HashShuffleAppenderManager getHashShuffleAppenderManager() { // plain accessor for the worker's HashShuffleAppenderManager
return hashShuffleAppenderManager;
}
-
- public int getPullServerPort() {
- if (pullService != null) {
- long startTime = System.currentTimeMillis();
- while (true) {
- int pullServerPort = pullService.getPort();
- if (pullServerPort > 0) {
- return pullServerPort;
- }
- try {
- Thread.sleep(1000);
- } catch (InterruptedException e) {
- }
- if (System.currentTimeMillis() - startTime > 30 * 1000) {
- LOG.fatal("TajoWorker stopped cause can't get PullServer port.");
- System.exit(-1);
- }
- }
- } else {
- if (pullServerPort != 0) {
- return pullServerPort;
- } else {
- loadPullServerPort();
- return pullServerPort;
- }
- }
- }
}
- private void loadPullServerPort() {
- // get pull server port
+ private int getStandAlonePullServerPort() {
long startTime = System.currentTimeMillis();
+ int pullServerPort;
+
+ // wait for the pull server to come up
while (true) {
pullServerPort = TajoPullServerService.readPullServerPort();
if (pullServerPort > 0) {
break;
}
try {
- Thread.sleep(1000);
+ Thread.sleep(500);
} catch (InterruptedException e) {
}
if (System.currentTimeMillis() - startTime > 30 * 1000) {
@@ -549,6 +556,7 @@ public class TajoWorker extends CompositeService {
System.exit(-1);
}
}
+ return pullServerPort;
}
public void stopWorkerForce() {
http://git-wip-us.apache.org/repos/asf/tajo/blob/28282b56/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorkerClientService.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorkerClientService.java b/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorkerClientService.java
index d25013c..fb4f861 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorkerClientService.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorkerClientService.java
@@ -54,7 +54,7 @@ public class TajoWorkerClientService extends AbstractService {
private BlockingRpcServer rpcServer;
private InetSocketAddress bindAddr;
- private String addr;
+
private int port;
private TajoConf conf;
private TajoWorker.WorkerContext workerContext;
@@ -88,14 +88,12 @@ public class TajoWorkerClientService extends AbstractService {
this.rpcServer.start();
this.bindAddr = NetUtils.getConnectAddress(rpcServer.getListenAddress());
- this.addr = bindAddr.getHostName() + ":" + bindAddr.getPort();
-
this.port = bindAddr.getPort();
} catch (Exception e) {
LOG.error(e.getMessage(), e);
}
// Get the master address
- LOG.info(TajoWorkerClientService.class.getSimpleName() + " is bind to " + addr);
+ LOG.info(TajoWorkerClientService.class.getSimpleName() + " is bind to " + bindAddr);
super.init(conf);
}