You are viewing a plain-text version of this content. The canonical version is available at its original link, which was not preserved in this plain-text copy.
Posted to commits@tajo.apache.org by jh...@apache.org on 2014/09/20 08:54:15 UTC
[1/2] TAJO-1016: Refactor worker rpc information. (jinho)
Repository: tajo
Updated Branches:
refs/heads/master 469820db1 -> 28282b561
http://git-wip-us.apache.org/repos/asf/tajo/blob/28282b56/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorkerManagerService.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorkerManagerService.java b/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorkerManagerService.java
index 472ce1b..48f4f66 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorkerManagerService.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorkerManagerService.java
@@ -32,6 +32,7 @@ import org.apache.tajo.TajoIdProtos;
import org.apache.tajo.conf.TajoConf;
import org.apache.tajo.engine.query.QueryContext;
import org.apache.tajo.ipc.TajoWorkerProtocol;
+import org.apache.tajo.master.cluster.WorkerConnectionInfo;
import org.apache.tajo.rpc.AsyncRpcServer;
import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos;
import org.apache.tajo.util.NetUtils;
@@ -46,7 +47,6 @@ public class TajoWorkerManagerService extends CompositeService
private AsyncRpcServer rpcServer;
private InetSocketAddress bindAddr;
- private String addr;
private int port;
private TajoWorker.WorkerContext workerContext;
@@ -74,14 +74,12 @@ public class TajoWorkerManagerService extends CompositeService
this.rpcServer.start();
this.bindAddr = NetUtils.getConnectAddress(rpcServer.getListenAddress());
- this.addr = bindAddr.getHostName() + ":" + bindAddr.getPort();
-
this.port = bindAddr.getPort();
} catch (Exception e) {
LOG.error(e.getMessage(), e);
}
// Get the master address
- LOG.info("TajoWorkerManagerService is bind to " + addr);
+ LOG.info("TajoWorkerManagerService is bind to " + bindAddr);
tajoConf.setVar(TajoConf.ConfVars.WORKER_PEER_RPC_ADDRESS, NetUtils.normalizeInetSocketAddress(bindAddr));
super.init(tajoConf);
}
@@ -104,10 +102,6 @@ public class TajoWorkerManagerService extends CompositeService
return bindAddr;
}
- public String getHostAndPort() {
- return bindAddr.getHostName() + ":" + bindAddr.getPort();
- }
-
@Override
public void ping(RpcController controller,
TajoIdProtos.QueryUnitAttemptIdProto attemptId,
@@ -122,24 +116,11 @@ public class TajoWorkerManagerService extends CompositeService
workerContext.getWorkerSystemMetrics().counter("query", "executedExecutionBlocksNum").inc();
try {
-
- String[] params = new String[7];
- params[0] = "standby"; //mode(never used)
- params[1] = request.getExecutionBlockId().toString();
- // NodeId has a form of hostname:port.
- params[2] = request.getNodeId();
- params[3] = request.getContainerId();
-
- // QueryMaster's address
- params[4] = request.getQueryMasterHost();
- params[5] = String.valueOf(request.getQueryMasterPort());
- params[6] = request.getQueryOutputPath();
-
- ExecutionBlockId executionBlockId = new ExecutionBlockId(request.getExecutionBlockId());
workerContext.getTaskRunnerManager().getEventHandler().handle(new TaskRunnerStartEvent(
- params
- , executionBlockId,
- new QueryContext(workerContext.getConf(), request.getQueryContext()),
+ new WorkerConnectionInfo(request.getQueryMaster())
+ , new ExecutionBlockId(request.getExecutionBlockId())
+ , request.getContainerId()
+ , new QueryContext(workerContext.getConf(), request.getQueryContext()),
request.getPlanJson()
));
done.run(TajoWorker.TRUE_PROTO);
http://git-wip-us.apache.org/repos/asf/tajo/blob/28282b56/tajo-core/src/main/java/org/apache/tajo/worker/Task.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/Task.java b/tajo-core/src/main/java/org/apache/tajo/worker/Task.java
index c9c83d1..66e0f87 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/Task.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/Task.java
@@ -307,7 +307,7 @@ public class Task {
public TaskStatusProto getReport() {
TaskStatusProto.Builder builder = TaskStatusProto.newBuilder();
- builder.setWorkerName(executionBlockContext.getTaskRunner(taskRunnerId).getNodeId().toString());
+ builder.setWorkerName(executionBlockContext.getWorkerContext().getConnectionInfo().getHostAndPeerRpcPort());
builder.setId(context.getTaskId().getProto())
.setProgress(context.getProgress())
.setState(context.getState());
@@ -323,6 +323,7 @@ public class Task {
public boolean isRunning(){
return context.getState() == TaskAttemptState.TA_RUNNING;
}
+
public boolean isProgressChanged() {
return context.isProgressChanged();
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/28282b56/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunner.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunner.java b/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunner.java
index ea8ed82..e4771a6 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunner.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunner.java
@@ -25,7 +25,6 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.yarn.api.records.ContainerId;
-import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.impl.pb.ContainerIdPBImpl;
import org.apache.hadoop.yarn.util.ConverterUtils;
import org.apache.tajo.ExecutionBlockId;
@@ -53,7 +52,6 @@ public class TaskRunner extends AbstractService {
private volatile boolean stopped = false;
private Path baseDirPath;
- private NodeId nodeId;
private ContainerId containerId;
// for Fetcher
@@ -69,7 +67,7 @@ public class TaskRunner extends AbstractService {
private TaskRunnerHistory history;
- public TaskRunner(ExecutionBlockContext executionBlockContext, String[] args) {
+ public TaskRunner(ExecutionBlockContext executionBlockContext, String containerId) {
super(TaskRunner.class.getName());
ThreadFactoryBuilder builder = new ThreadFactoryBuilder();
@@ -78,16 +76,7 @@ public class TaskRunner extends AbstractService {
this.fetchLauncher = Executors.newFixedThreadPool(
systemConf.getIntVar(ConfVars.SHUFFLE_FETCHER_PARALLEL_EXECUTION_MAX_NUM), fetcherFactory);
try {
- // QueryBlockId from String
- // NodeId has a form of hostname:port.
- this.nodeId = ConverterUtils.toNodeId(args[2]);
- this.containerId = ConverterUtils.toContainerId(args[3]);
-
-
- // QueryMaster's address
- //String host = args[4];
- //int port = Integer.parseInt(args[5]);
-
+ this.containerId = ConverterUtils.toContainerId(containerId);
this.executionBlockContext = executionBlockContext;
this.history = executionBlockContext.createTaskRunnerHistory(this);
this.history.setState(getServiceState());
@@ -101,10 +90,6 @@ public class TaskRunner extends AbstractService {
return getId(getContext().getExecutionBlockId(), containerId);
}
- public NodeId getNodeId(){
- return nodeId;
- }
-
public ContainerId getContainerId(){
return containerId;
}
@@ -212,6 +197,7 @@ public class TaskRunner extends AbstractService {
GetTaskRequestProto request = GetTaskRequestProto.newBuilder()
.setExecutionBlockId(getExecutionBlockId().getProto())
.setContainerId(((ContainerIdPBImpl) containerId).getProto())
+ .setWorkerId(getContext().getWorkerContext().getConnectionInfo().getId())
.build();
qmClientService.getTask(null, request, callFuture);
http://git-wip-us.apache.org/repos/asf/tajo/blob/28282b56/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunnerManager.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunnerManager.java b/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunnerManager.java
index c3713d1..faadf58 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunnerManager.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunnerManager.java
@@ -37,7 +37,6 @@ import org.jboss.netty.util.HashedWheelTimer;
import org.jboss.netty.util.Timer;
import java.io.IOException;
-import java.net.InetSocketAddress;
import java.util.*;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -167,14 +166,10 @@ public class TaskRunnerManager extends CompositeService implements EventHandler<
if (event instanceof TaskRunnerStartEvent) {
TaskRunnerStartEvent startEvent = (TaskRunnerStartEvent) event;
ExecutionBlockContext context = executionBlockContextMap.get(event.getExecutionBlockId());
- String[] params = startEvent.getParams();
+
if(context == null){
try {
- // QueryMaster's address
- String host = params[4];
- int port = Integer.parseInt(params[5]);
-
- context = new ExecutionBlockContext(this, startEvent, new InetSocketAddress(host, port));
+ context = new ExecutionBlockContext(this, startEvent, startEvent.getQueryMaster());
} catch (Throwable e) {
LOG.fatal(e.getMessage(), e);
throw new RuntimeException(e);
@@ -182,7 +177,7 @@ public class TaskRunnerManager extends CompositeService implements EventHandler<
executionBlockContextMap.put(event.getExecutionBlockId(), context);
}
- TaskRunner taskRunner = new TaskRunner(context, params);
+ TaskRunner taskRunner = new TaskRunner(context, startEvent.getContainerId());
LOG.info("Start TaskRunner:" + taskRunner.getId());
taskRunnerMap.put(taskRunner.getId(), taskRunner);
taskRunnerHistoryMap.put(taskRunner.getId(), taskRunner.getHistory());
http://git-wip-us.apache.org/repos/asf/tajo/blob/28282b56/tajo-core/src/main/java/org/apache/tajo/worker/WorkerHeartbeatService.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/WorkerHeartbeatService.java b/tajo-core/src/main/java/org/apache/tajo/worker/WorkerHeartbeatService.java
index 47f2261..6a90f74 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/WorkerHeartbeatService.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/WorkerHeartbeatService.java
@@ -19,6 +19,7 @@
package org.apache.tajo.worker;
import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
import com.google.protobuf.ServiceException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -37,7 +38,6 @@ import org.apache.tajo.storage.v2.DiskUtil;
import org.apache.tajo.util.HAServiceUtil;
import java.io.File;
-import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
@@ -68,30 +68,38 @@ public class WorkerHeartbeatService extends AbstractService {
}
@Override
- public void serviceInit(Configuration conf) {
+ public void serviceInit(Configuration conf) throws Exception {
Preconditions.checkArgument(conf instanceof TajoConf, "Configuration must be a TajoConf instance.");
this.systemConf = (TajoConf) conf;
connectionPool = RpcConnectionPool.getPool(systemConf);
+ super.serviceInit(conf);
+ }
+
+ @Override
+ public void serviceStart() throws Exception {
thread = new WorkerHeartbeatThread();
thread.start();
- super.init(conf);
+ super.serviceStart();
}
@Override
- public void serviceStop() {
- thread.stopped.set(true);
+ public void serviceStop() throws Exception {
+ if(thread.stopped.getAndSet(true)){
+ return;
+ }
+
synchronized (thread) {
thread.notifyAll();
}
- super.stop();
+
+ super.serviceStop();
}
class WorkerHeartbeatThread extends Thread {
private volatile AtomicBoolean stopped = new AtomicBoolean(false);
TajoMasterProtocol.ServerStatusProto.System systemInfo;
- List<TajoMasterProtocol.ServerStatusProto.Disk> diskInfos =
- new ArrayList<TajoMasterProtocol.ServerStatusProto.Disk>();
+ List<TajoMasterProtocol.ServerStatusProto.Disk> diskInfos = Lists.newArrayList();
float workerDiskSlots;
int workerMemoryMB;
List<DiskDeviceInfo> diskDeviceInfos;
@@ -140,26 +148,6 @@ public class WorkerHeartbeatService extends AbstractService {
public void run() {
LOG.info("Worker Resource Heartbeat Thread start.");
int sendDiskInfoCount = 0;
- int pullServerPort = 0;
-
- String hostName = null;
- int peerRpcPort = 0;
- int queryMasterPort = 0;
- int clientPort = 0;
-
- if(context.getTajoWorkerManagerService() != null) {
- hostName = context.getTajoWorkerManagerService().getBindAddr().getHostName();
- peerRpcPort = context.getTajoWorkerManagerService().getBindAddr().getPort();
- }
- if(context.getQueryMasterManagerService() != null) {
- hostName = context.getQueryMasterManagerService().getBindAddr().getHostName();
- queryMasterPort = context.getQueryMasterManagerService().getBindAddr().getPort();
- }
- if(context.getTajoWorkerClientService() != null) {
- clientPort = context.getTajoWorkerClientService().getBindAddr().getPort();
- }
-
- pullServerPort = context.getPullServerPort();
while(!stopped.get()) {
if(sendDiskInfoCount == 0 && diskDeviceInfos != null) {
@@ -185,12 +173,7 @@ public class WorkerHeartbeatService extends AbstractService {
.build();
NodeHeartbeat heartbeatProto = NodeHeartbeat.newBuilder()
- .setTajoWorkerHost(hostName)
- .setTajoQueryMasterPort(queryMasterPort)
- .setPeerRpcPort(peerRpcPort)
- .setTajoWorkerClientPort(clientPort)
- .setTajoWorkerHttpPort(context.getHttpPort())
- .setTajoWorkerPullServerPort(pullServerPort)
+ .setConnectionInfo(context.getConnectionInfo().getProto())
.setServerStatus(serverStatus)
.build();
@@ -241,8 +224,10 @@ public class WorkerHeartbeatService extends AbstractService {
}
try {
- synchronized (WorkerHeartbeatThread.this){
- wait(10 * 1000);
+ if(!stopped.get()){
+ synchronized (thread){
+ thread.wait(10 * 1000);
+ }
}
} catch (InterruptedException e) {
break;
http://git-wip-us.apache.org/repos/asf/tajo/blob/28282b56/tajo-core/src/main/java/org/apache/tajo/worker/event/TaskRunnerStartEvent.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/event/TaskRunnerStartEvent.java b/tajo-core/src/main/java/org/apache/tajo/worker/event/TaskRunnerStartEvent.java
index 8c9fa51..ff63754 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/event/TaskRunnerStartEvent.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/event/TaskRunnerStartEvent.java
@@ -20,25 +20,33 @@ package org.apache.tajo.worker.event;
import org.apache.tajo.ExecutionBlockId;
import org.apache.tajo.engine.query.QueryContext;
+import org.apache.tajo.master.cluster.WorkerConnectionInfo;
public class TaskRunnerStartEvent extends TaskRunnerEvent {
private final QueryContext queryContext;
- private final String[] params;
+ private final WorkerConnectionInfo queryMaster;
+ private final String containerId;
private final String plan;
- public TaskRunnerStartEvent(String[] params,
+ public TaskRunnerStartEvent(WorkerConnectionInfo queryMaster,
ExecutionBlockId executionBlockId,
+ String containerId,
QueryContext context,
String plan) {
super(EventType.START, executionBlockId);
- this.params = params;
+ this.queryMaster = queryMaster;
+ this.containerId = containerId;
this.queryContext = context;
this.plan = plan;
}
- public String[] getParams(){
- return this.params;
+ public WorkerConnectionInfo getQueryMaster() {
+ return queryMaster;
+ }
+
+ public String getContainerId() {
+ return containerId;
}
public QueryContext getQueryContext() {
http://git-wip-us.apache.org/repos/asf/tajo/blob/28282b56/tajo-core/src/main/proto/ResourceTrackerProtocol.proto
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/proto/ResourceTrackerProtocol.proto b/tajo-core/src/main/proto/ResourceTrackerProtocol.proto
index d46d09a..b117cac 100644
--- a/tajo-core/src/main/proto/ResourceTrackerProtocol.proto
+++ b/tajo-core/src/main/proto/ResourceTrackerProtocol.proto
@@ -23,16 +23,12 @@ option java_generic_services = true;
option java_generate_equals_and_hash = true;
import "TajoMasterProtocol.proto";
+import "tajo_protos.proto";
message NodeHeartbeat {
- required string tajoWorkerHost = 1;
- required int32 peerRpcPort = 2;
- required int32 tajoQueryMasterPort = 3;
- optional ServerStatusProto serverStatus = 4;
- optional int32 tajoWorkerClientPort = 5;
- optional string statusMessage = 6;
- optional int32 tajoWorkerPullServerPort = 7;
- optional int32 tajoWorkerHttpPort = 8;
+ required WorkerConnectionInfoProto connectionInfo = 1;
+ optional ServerStatusProto serverStatus = 2;
+ optional string statusMessage = 3;
}
service TajoResourceTrackerProtocolService {
http://git-wip-us.apache.org/repos/asf/tajo/blob/28282b56/tajo-core/src/main/proto/TajoMasterProtocol.proto
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/proto/TajoMasterProtocol.proto b/tajo-core/src/main/proto/TajoMasterProtocol.proto
index 8fccbaf..7283543 100644
--- a/tajo-core/src/main/proto/TajoMasterProtocol.proto
+++ b/tajo-core/src/main/proto/TajoMasterProtocol.proto
@@ -60,14 +60,12 @@ message ServerStatusProto {
}
message TajoHeartbeat {
- required string tajoWorkerHost = 1;
- required int32 tajoQueryMasterPort = 2;
- optional int32 tajoWorkerClientPort = 3;
- optional QueryIdProto queryId = 4;
- optional QueryState state = 5;
- optional string statusMessage = 6;
- optional float queryProgress = 7;
- optional int64 queryFinishTime = 8;
+ required WorkerConnectionInfoProto connectionInfo = 1;
+ optional QueryIdProto queryId = 2;
+ optional QueryState state = 3;
+ optional string statusMessage = 4;
+ optional float queryProgress = 5;
+ optional int64 queryFinishTime = 6;
}
message TajoHeartbeatResponse {
@@ -110,12 +108,9 @@ message WorkerResourceAllocationRequest {
}
message WorkerResourceProto {
- required string host = 1;
- required int32 peerRpcPort = 2;
- required int32 queryMasterPort = 3;
- required int32 infoPort = 4;
- required int32 memoryMB = 5 ;
- required float diskSlots = 6;
+ required WorkerConnectionInfoProto connectionInfo = 1;
+ required int32 memoryMB = 2 ;
+ required float diskSlots = 3;
}
message WorkerResourcesRequest {
@@ -129,15 +124,10 @@ message WorkerResourceReleaseRequest {
message WorkerAllocatedResource {
required hadoop.yarn.ContainerIdProto containerId = 1;
- required string nodeId = 2;
- required string workerHost = 3;
- required int32 peerRpcPort = 4;
- required int32 queryMasterPort = 5;
- required int32 clientPort = 6;
- required int32 workerPullServerPort = 7;
-
- required int32 allocatedMemoryMB = 8;
- required float allocatedDiskSlots = 9;
+ required WorkerConnectionInfoProto connectionInfo = 2;
+
+ required int32 allocatedMemoryMB = 3;
+ required float allocatedDiskSlots = 4;
}
message WorkerResourceAllocationResponse {
http://git-wip-us.apache.org/repos/asf/tajo/blob/28282b56/tajo-core/src/main/proto/TajoWorkerProtocol.proto
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/proto/TajoWorkerProtocol.proto b/tajo-core/src/main/proto/TajoWorkerProtocol.proto
index dff2733..bde2459 100644
--- a/tajo-core/src/main/proto/TajoWorkerProtocol.proto
+++ b/tajo-core/src/main/proto/TajoWorkerProtocol.proto
@@ -168,8 +168,9 @@ message QueryExecutionRequestProto {
}
message GetTaskRequestProto {
- required hadoop.yarn.ContainerIdProto containerId = 1;
- required ExecutionBlockIdProto executionBlockId = 2;
+ required int32 workerId = 1;
+ required hadoop.yarn.ContainerIdProto containerId = 2;
+ required ExecutionBlockIdProto executionBlockId = 3;
}
enum ShuffleType {
@@ -202,14 +203,13 @@ message DataChannelProto {
message RunExecutionBlockRequestProto {
required ExecutionBlockIdProto executionBlockId = 1;
- required string queryMasterHost = 2;
- required int32 queryMasterPort = 3;
- required string nodeId = 4;
- required string containerId = 5;
- optional string queryOutputPath = 6;
-
- required KeyValueSetProto queryContext = 7;
- required string planJson = 8;
+ required WorkerConnectionInfoProto queryMaster = 2;
+ required string nodeId = 3;
+ required string containerId = 4;
+ optional string queryOutputPath = 5;
+
+ required KeyValueSetProto queryContext = 6;
+ required string planJson = 7;
}
message ExecutionBlockListProto {
http://git-wip-us.apache.org/repos/asf/tajo/blob/28282b56/tajo-core/src/main/resources/webapps/admin/cluster.jsp
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/resources/webapps/admin/cluster.jsp b/tajo-core/src/main/resources/webapps/admin/cluster.jsp
index 0317759..6fe21a2 100644
--- a/tajo-core/src/main/resources/webapps/admin/cluster.jsp
+++ b/tajo-core/src/main/resources/webapps/admin/cluster.jsp
@@ -20,20 +20,21 @@
<%@ page language="java" contentType="text/html; charset=UTF-8" pageEncoding="UTF-8"%>
<%@ page import="org.apache.tajo.master.TajoMaster" %>
+<%@ page import="org.apache.tajo.master.cluster.WorkerConnectionInfo" %>
<%@ page import="org.apache.tajo.master.ha.HAService" %>
<%@ page import="org.apache.tajo.master.ha.TajoMasterInfo" %>
<%@ page import="org.apache.tajo.master.rm.Worker" %>
<%@ page import="org.apache.tajo.master.rm.WorkerResource" %>
<%@ page import="org.apache.tajo.master.rm.WorkerState" %>
<%@ page import="org.apache.tajo.util.JSPUtil" %>
+<%@ page import="org.apache.tajo.util.TUtil" %>
<%@ page import="org.apache.tajo.webapp.StaticHttpServer" %>
<%@ page import="java.util.*" %>
-<%@ page import="org.apache.tajo.util.TUtil" %>
<%
TajoMaster master = (TajoMaster) StaticHttpServer.getInstance().getAttribute("tajo.info.server.object");
- Map<String, Worker> workers = master.getContext().getResourceManager().getWorkers();
- List<String> wokerKeys = new ArrayList<String>(workers.keySet());
+ Map<Integer, Worker> workers = master.getContext().getResourceManager().getWorkers();
+ List<Integer> wokerKeys = new ArrayList<Integer>(workers.keySet());
Collections.sort(wokerKeys);
int runningQueryMasterTasks = 0;
@@ -175,17 +176,19 @@
<%
int no = 1;
for(Worker queryMaster: liveQueryMasters) {
- WorkerResource resource = queryMaster.getResource();
- String queryMasterHttp = "http://" + queryMaster.getHostName() + ":" + queryMaster.getHttpPort() + "/index.jsp";
+ WorkerResource resource = queryMaster.getResource();
+ WorkerConnectionInfo connectionInfo = queryMaster.getConnectionInfo();
+ String queryMasterHttp = "http://" + connectionInfo.getHost()
+ + ":" + connectionInfo.getHttpInfoPort() + "/index.jsp";
%>
<tr>
- <td width='30' align='right'><%=no++%></td>
- <td><a href='<%=queryMasterHttp%>'><%=queryMaster.getHostName() + ":" + queryMaster.getQueryMasterPort()%></a></td>
- <td width='100' align='center'><%=queryMaster.getClientPort()%></td>
- <td width='200' align='right'><%=resource.getNumQueryMasterTasks()%></td>
- <td width='200' align='center'><%=resource.getFreeHeap()/1024/1024%>/<%=resource.getTotalHeap()/1024/1024%>/<%=resource.getMaxHeap()/1024/1024%> MB</td>
- <td width='100' align='right'><%=JSPUtil.getElapsedTime(queryMaster.getLastHeartbeatTime(), System.currentTimeMillis())%></td>
- <td width='100' align='center'><%=queryMaster.getState()%></td>
+ <td width='30' align='right'><%=no++%></td>
+ <td><a href='<%=queryMasterHttp%>'><%=connectionInfo.getHost() + ":" + connectionInfo.getQueryMasterPort()%></a></td>
+ <td width='100' align='center'><%=connectionInfo.getClientPort()%></td>
+ <td width='200' align='right'><%=resource.getNumQueryMasterTasks()%></td>
+ <td width='200' align='center'><%=resource.getFreeHeap()/1024/1024%>/<%=resource.getTotalHeap()/1024/1024%>/<%=resource.getMaxHeap()/1024/1024%> MB</td>
+ <td width='100' align='right'><%=JSPUtil.getElapsedTime(queryMaster.getLastHeartbeatTime(), System.currentTimeMillis())%></td>
+ <td width='100' align='center'><%=queryMaster.getState()%></td>
</tr>
<%
} //end fo for
@@ -210,7 +213,7 @@
%>
<tr>
<td width='30' align='right'><%=no++%></td>
- <td><%=queryMaster.getHostName() + ":" + queryMaster.getQueryMasterPort()%></td>
+ <td><%=queryMaster.getConnectionInfo().getHost() + ":" + queryMaster.getConnectionInfo().getQueryMasterPort()%></td>
</tr>
<%
} //end fo for
@@ -236,19 +239,20 @@
<%
int no = 1;
for(Worker worker: liveWorkers) {
- WorkerResource resource = worker.getResource();
- String workerHttp = "http://" + worker.getHostName() + ":" + worker.getHttpPort() + "/index.jsp";
+ WorkerResource resource = worker.getResource();
+ WorkerConnectionInfo connectionInfo = worker.getConnectionInfo();
+ String workerHttp = "http://" + connectionInfo.getHost() + ":" + connectionInfo.getHttpInfoPort() + "/index.jsp";
%>
<tr>
- <td width='30' align='right'><%=no++%></td>
- <td><a href='<%=workerHttp%>'><%=worker.getHostName() + ":" + worker.getPeerRpcPort()%></a></td>
- <td width='80' align='center'><%=worker.getPullServerPort()%></td>
- <td width='100' align='right'><%=resource.getNumRunningTasks()%></td>
- <td width='150' align='center'><%=resource.getUsedMemoryMB()%>/<%=resource.getMemoryMB()%></td>
- <td width='100' align='center'><%=resource.getUsedDiskSlots()%>/<%=resource.getDiskSlots()%></td>
- <td width='200' align='center'><%=resource.getFreeHeap()/1024/1024%>/<%=resource.getTotalHeap()/1024/1024%>/<%=resource.getMaxHeap()/1024/1024%> MB</td>
- <td width='100' align='right'><%=JSPUtil.getElapsedTime(worker.getLastHeartbeatTime(), System.currentTimeMillis())%></td>
- <td width='100' align='center'><%=worker.getState()%></td>
+ <td width='30' align='right'><%=no++%></td>
+ <td><a href='<%=workerHttp%>'><%=connectionInfo.getHostAndPeerRpcPort()%></a></td>
+ <td width='80' align='center'><%=connectionInfo.getPullServerPort()%></td>
+ <td width='100' align='right'><%=resource.getNumRunningTasks()%></td>
+ <td width='150' align='center'><%=resource.getUsedMemoryMB()%>/<%=resource.getMemoryMB()%></td>
+ <td width='100' align='center'><%=resource.getUsedDiskSlots()%>/<%=resource.getDiskSlots()%></td>
+ <td width='200' align='center'><%=resource.getFreeHeap()/1024/1024%>/<%=resource.getTotalHeap()/1024/1024%>/<%=resource.getMaxHeap()/1024/1024%> MB</td>
+ <td width='100' align='right'><%=JSPUtil.getElapsedTime(worker.getLastHeartbeatTime(), System.currentTimeMillis())%></td>
+ <td width='100' align='center'><%=worker.getState()%></td>
</tr>
<%
} //end fo for
@@ -279,7 +283,7 @@
%>
<tr>
<td width='30' align='right'><%=no++%></td>
- <td><%=worker.getHostName() + ":" + worker.getPeerRpcPort()%></td>
+ <td><%=worker.getConnectionInfo().getHostAndPeerRpcPort()%></td>
</tr>
<%
} //end fo for
http://git-wip-us.apache.org/repos/asf/tajo/blob/28282b56/tajo-core/src/main/resources/webapps/admin/index.jsp
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/resources/webapps/admin/index.jsp b/tajo-core/src/main/resources/webapps/admin/index.jsp
index 7ab1482..ce4d7dc 100644
--- a/tajo-core/src/main/resources/webapps/admin/index.jsp
+++ b/tajo-core/src/main/resources/webapps/admin/index.jsp
@@ -38,8 +38,8 @@
<%
TajoMaster master = (TajoMaster) StaticHttpServer.getInstance().getAttribute("tajo.info.server.object");
- Map<String, Worker> workers = master.getContext().getResourceManager().getWorkers();
- Map<String, Worker> inactiveWorkers = master.getContext().getResourceManager().getInactiveWorkers();
+ Map<Integer, Worker> workers = master.getContext().getResourceManager().getWorkers();
+ Map<Integer, Worker> inactiveWorkers = master.getContext().getResourceManager().getInactiveWorkers();
int numWorkers = 0;
int numLiveWorkers = 0;
http://git-wip-us.apache.org/repos/asf/tajo/blob/28282b56/tajo-core/src/main/resources/webapps/admin/query.jsp
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/resources/webapps/admin/query.jsp b/tajo-core/src/main/resources/webapps/admin/query.jsp
index e7b402f..9ddc90c 100644
--- a/tajo-core/src/main/resources/webapps/admin/query.jsp
+++ b/tajo-core/src/main/resources/webapps/admin/query.jsp
@@ -43,17 +43,17 @@
SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
- Map<String, Worker> workers = master.getContext().getResourceManager().getWorkers();
+ Map<Integer, Worker> workers = master.getContext().getResourceManager().getWorkers();
Map<String, Integer> portMap = new HashMap<String, Integer>();
- Collection<String> queryMasters = master.getContext().getResourceManager().getQueryMasters();
+ Collection<Integer> queryMasters = master.getContext().getResourceManager().getQueryMasters();
if (queryMasters == null || queryMasters.isEmpty()) {
queryMasters = master.getContext().getResourceManager().getWorkers().keySet();
}
- for(String eachQueryMasterKey: queryMasters) {
+ for(int eachQueryMasterKey: queryMasters) {
Worker queryMaster = workers.get(eachQueryMasterKey);
if(queryMaster != null) {
- portMap.put(queryMaster.getHostName(), queryMaster.getHttpPort());
+ portMap.put(queryMaster.getConnectionInfo().getHost(), queryMaster.getConnectionInfo().getHttpInfoPort());
}
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/28282b56/tajo-core/src/main/resources/webapps/worker/querytasks.jsp
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/resources/webapps/worker/querytasks.jsp b/tajo-core/src/main/resources/webapps/worker/querytasks.jsp
index 1a325da..6e74b99 100644
--- a/tajo-core/src/main/resources/webapps/worker/querytasks.jsp
+++ b/tajo-core/src/main/resources/webapps/worker/querytasks.jsp
@@ -67,10 +67,10 @@
List<TajoMasterProtocol.WorkerResourceProto> allWorkers = tajoWorker.getWorkerContext()
.getQueryMasterManagerService().getQueryMaster().getAllWorker();
- Map<String, TajoMasterProtocol.WorkerResourceProto> workerMap = new HashMap<String, TajoMasterProtocol.WorkerResourceProto>();
+ Map<Integer, TajoMasterProtocol.WorkerResourceProto> workerMap = new HashMap<Integer, TajoMasterProtocol.WorkerResourceProto>();
if(allWorkers != null) {
for(TajoMasterProtocol.WorkerResourceProto eachWorker: allWorkers) {
- workerMap.put(eachWorker.getHost(), eachWorker);
+ workerMap.put(eachWorker.getConnectionInfo().getId(), eachWorker);
}
}
QueryMasterTask queryMasterTask = tajoWorker.getWorkerContext()
@@ -201,12 +201,13 @@
String queryUnitHost = eachQueryUnit.getSucceededHost() == null ? "-" : eachQueryUnit.getSucceededHost();
if(eachQueryUnit.getSucceededHost() != null) {
- TajoMasterProtocol.WorkerResourceProto worker = workerMap.get(eachQueryUnit.getSucceededHost());
+ TajoMasterProtocol.WorkerResourceProto worker =
+ workerMap.get(eachQueryUnit.getLastAttempt().getWorkerConnectionInfo().getId());
if(worker != null) {
QueryUnitAttempt lastAttempt = eachQueryUnit.getLastAttempt();
if(lastAttempt != null) {
QueryUnitAttemptId lastAttemptId = lastAttempt.getId();
- queryUnitHost = "<a href='http://" + eachQueryUnit.getSucceededHost() + ":" + worker.getInfoPort() + "/taskdetail.jsp?queryUnitAttemptId=" + lastAttemptId + "'>" + eachQueryUnit.getSucceededHost() + "</a>";
+ queryUnitHost = "<a href='http://" + eachQueryUnit.getSucceededHost() + ":" + worker.getConnectionInfo().getHttpInfoPort() + "/taskdetail.jsp?queryUnitAttemptId=" + lastAttemptId + "'>" + eachQueryUnit.getSucceededHost() + "</a>";
}
}
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/28282b56/tajo-core/src/main/resources/webapps/worker/taskdetail.jsp
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/resources/webapps/worker/taskdetail.jsp b/tajo-core/src/main/resources/webapps/worker/taskdetail.jsp
index e20ab03..d84664f 100644
--- a/tajo-core/src/main/resources/webapps/worker/taskdetail.jsp
+++ b/tajo-core/src/main/resources/webapps/worker/taskdetail.jsp
@@ -22,7 +22,9 @@
<%@ page import="org.apache.commons.lang.StringUtils" %>
<%@ page import="org.apache.tajo.QueryUnitAttemptId" %>
<%@ page import="org.apache.tajo.ipc.TajoWorkerProtocol" %>
+<%@ page import="org.apache.tajo.util.JSPUtil" %>
<%@ page import="org.apache.tajo.util.TajoIdUtils" %>
+<%@ page import="org.apache.tajo.webapp.StaticHttpServer" %>
<%@ page import="org.apache.tajo.worker.*" %>
<%@ page import="java.text.SimpleDateFormat" %>
<%@ page import="java.util.List" %>
http://git-wip-us.apache.org/repos/asf/tajo/blob/28282b56/tajo-core/src/main/resources/webapps/worker/tasks.jsp
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/resources/webapps/worker/tasks.jsp b/tajo-core/src/main/resources/webapps/worker/tasks.jsp
index b5fb9d7..ae05047 100644
--- a/tajo-core/src/main/resources/webapps/worker/tasks.jsp
+++ b/tajo-core/src/main/resources/webapps/worker/tasks.jsp
@@ -65,9 +65,9 @@
<tr><th>Id</th><th>StartTime</th><th>FinishTime</th><th>RunTime</th><th>Status</th></tr>
<%
if (taskRunner != null) {
- TaskRunner.TaskRunnerContext taskRunnerContext = taskRunner.getContext();
+ ExecutionBlockContext context = taskRunner.getContext();
- for (Map.Entry<QueryUnitAttemptId, Task> entry : taskRunnerContext.getTasks().entrySet()) {
+ for (Map.Entry<QueryUnitAttemptId, Task> entry : context.getTasks().entrySet()) {
QueryUnitAttemptId queryUnitId = entry.getKey();
TaskHistory eachTask = entry.getValue().createTaskHistory();
%>
http://git-wip-us.apache.org/repos/asf/tajo/blob/28282b56/tajo-core/src/test/java/org/apache/tajo/cluster/TestWorkerConnectionInfo.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/cluster/TestWorkerConnectionInfo.java b/tajo-core/src/test/java/org/apache/tajo/cluster/TestWorkerConnectionInfo.java
new file mode 100644
index 0000000..03be125
--- /dev/null
+++ b/tajo-core/src/test/java/org/apache/tajo/cluster/TestWorkerConnectionInfo.java
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.cluster;
+
+import org.apache.tajo.master.cluster.WorkerConnectionInfo;
+import org.junit.Test;
+
+import static org.junit.Assert.*;
+
+/**
+ * Checks {@link WorkerConnectionInfo#getId()}: for the hosts used here, a different host yields a
+ * different id, and repeating the exact same constructor arguments yields the same id.
+ */
+public class TestWorkerConnectionInfo {
+
+  @Test
+  public void testWorkerId() {
+    WorkerConnectionInfo first = new WorkerConnectionInfo("host", 28091, 28092, 21000, 28093, 28080);
+    WorkerConnectionInfo otherHost = new WorkerConnectionInfo("host2", 28091, 28092, 21000, 28093, 28080);
+    WorkerConnectionInfo sameAsFirst = new WorkerConnectionInfo("host", 28091, 28092, 21000, 28093, 28080);
+
+    assertNotEquals(first.getId(), otherHost.getId());
+    assertEquals(first.getId(), sameAsFirst.getId());
+  }
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/28282b56/tajo-core/src/test/java/org/apache/tajo/master/rm/TestTajoResourceManager.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/master/rm/TestTajoResourceManager.java b/tajo-core/src/test/java/org/apache/tajo/master/rm/TestTajoResourceManager.java
index 09d674a..0423894 100644
--- a/tajo-core/src/test/java/org/apache/tajo/master/rm/TestTajoResourceManager.java
+++ b/tajo-core/src/test/java/org/apache/tajo/master/rm/TestTajoResourceManager.java
@@ -24,6 +24,7 @@ import org.apache.tajo.QueryId;
import org.apache.tajo.QueryIdFactory;
import org.apache.tajo.conf.TajoConf;
import org.apache.tajo.ipc.TajoMasterProtocol.*;
+import org.apache.tajo.master.cluster.WorkerConnectionInfo;
import org.apache.tajo.rpc.NullCallback;
import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos;
import org.junit.Test;
@@ -94,11 +95,10 @@ public class TestTajoResourceManager {
.setRunningTaskNum(0)
.build();
+ WorkerConnectionInfo connectionInfo =
+ new WorkerConnectionInfo("host" + (i + 1), 28091, 28092, 21000 + i, 28093, 28080);
NodeHeartbeat tajoHeartbeat = NodeHeartbeat.newBuilder()
- .setTajoWorkerHost("host" + (i + 1))
- .setTajoQueryMasterPort(21000)
- .setTajoWorkerHttpPort(28080 + i)
- .setPeerRpcPort(12345)
+ .setConnectionInfo(connectionInfo.getProto())
.setServerStatus(serverStatus)
.build();
http://git-wip-us.apache.org/repos/asf/tajo/blob/28282b56/tajo-yarn-pullserver/src/main/java/org/apache/tajo/pullserver/TajoPullServerService.java
----------------------------------------------------------------------
diff --git a/tajo-yarn-pullserver/src/main/java/org/apache/tajo/pullserver/TajoPullServerService.java b/tajo-yarn-pullserver/src/main/java/org/apache/tajo/pullserver/TajoPullServerService.java
index 720f0ca..3fa67ae 100644
--- a/tajo-yarn-pullserver/src/main/java/org/apache/tajo/pullserver/TajoPullServerService.java
+++ b/tajo-yarn-pullserver/src/main/java/org/apache/tajo/pullserver/TajoPullServerService.java
@@ -215,11 +215,10 @@ public class TajoPullServerService extends AbstractService {
selector = RpcChannelFactory.createServerChannelFactory("PullServerAuxService", workerNum);
localFS = new LocalFileSystem();
- super.init(conf);
- this.getConfig().setInt(TajoConf.ConfVars.PULLSERVER_PORT.varname
+ conf.setInt(TajoConf.ConfVars.PULLSERVER_PORT.varname
, TajoConf.ConfVars.PULLSERVER_PORT.defaultIntVal);
-
+ super.init(conf);
LOG.info("Tajo PullServer initialized: readaheadLength=" + readaheadLength);
} catch (Throwable t) {
LOG.error(t);
@@ -228,8 +227,7 @@ public class TajoPullServerService extends AbstractService {
// TODO change AbstractService to throw InterruptedException
@Override
- public synchronized void start() {
- Configuration conf = getConfig();
+ public synchronized void serviceInit(Configuration conf) throws Exception {
ServerBootstrap bootstrap = new ServerBootstrap(selector);
try {
@@ -248,11 +246,11 @@ public class TajoPullServerService extends AbstractService {
conf.set(ConfVars.PULLSERVER_PORT.varname, Integer.toString(port));
pipelineFact.PullServer.setPort(port);
LOG.info(getName() + " listening on port " + port);
- super.start();
sslFileBufferSize = conf.getInt(SUFFLE_SSL_FILE_BUFFER_SIZE_KEY,
DEFAULT_SUFFLE_SSL_FILE_BUFFER_SIZE);
+
if (STANDALONE) {
File pullServerPortFile = getPullServerPortFile();
if (pullServerPortFile.exists()) {
@@ -272,6 +270,7 @@ public class TajoPullServerService extends AbstractService {
IOUtils.closeStream(out);
}
}
+ super.serviceInit(conf);
LOG.info("TajoPullServerService started: port=" + port);
}
@@ -487,9 +486,7 @@ public class TajoPullServerService extends AbstractService {
}
ProcessingStatus processingStatus = new ProcessingStatus(request.getUri().toString());
- synchronized(processingStatusMap) {
- processingStatusMap.put(request.getUri().toString(), processingStatus);
- }
+ processingStatusMap.put(request.getUri().toString(), processingStatus);
// Parsing the URL into key-values
final Map<String, List<String>> params =
new QueryStringDecoder(request.getUri()).getParameters();
[2/2] git commit: TAJO-1016: Refactor worker rpc information. (jinho)
Posted by jh...@apache.org.
TAJO-1016: Refactor worker rpc information. (jinho)
Closes #125
Project: http://git-wip-us.apache.org/repos/asf/tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/28282b56
Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/28282b56
Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/28282b56
Branch: refs/heads/master
Commit: 28282b561ad0a75f9603936bf04f2aa5c99b6b58
Parents: 469820d
Author: jhkim <jh...@apache.org>
Authored: Sat Sep 20 15:53:06 2014 +0900
Committer: jhkim <jh...@apache.org>
Committed: Sat Sep 20 15:53:06 2014 +0900
----------------------------------------------------------------------
CHANGES | 2 +
.../java/org/apache/tajo/client/TajoAdmin.java | 36 ++--
tajo-client/src/main/proto/ClientProtos.proto | 37 ++--
tajo-common/src/main/proto/tajo_protos.proto | 10 ++
.../tajo/master/DefaultTaskScheduler.java | 18 +-
.../apache/tajo/master/LazyTaskScheduler.java | 4 +-
.../apache/tajo/master/TajoContainerProxy.java | 5 +-
.../tajo/master/TajoMasterClientService.java | 12 +-
.../apache/tajo/master/TajoMasterService.java | 9 +-
.../master/cluster/WorkerConnectionInfo.java | 178 +++++++++++++++++++
.../master/event/TaskAttemptAssignedEvent.java | 17 +-
.../tajo/master/event/TaskRequestEvent.java | 13 +-
.../master/querymaster/QueryInProgress.java | 6 +-
.../master/querymaster/QueryJobManager.java | 11 +-
.../tajo/master/querymaster/QueryMaster.java | 18 +-
.../querymaster/QueryMasterManagerService.java | 2 +-
.../tajo/master/querymaster/QueryUnit.java | 12 +-
.../master/querymaster/QueryUnitAttempt.java | 32 +---
.../apache/tajo/master/rm/TajoRMContext.java | 14 +-
.../tajo/master/rm/TajoResourceTracker.java | 24 +--
.../master/rm/TajoWorkerResourceManager.java | 33 ++--
.../java/org/apache/tajo/master/rm/Worker.java | 73 ++------
.../org/apache/tajo/master/rm/WorkerEvent.java | 6 +-
.../tajo/master/rm/WorkerLivelinessMonitor.java | 4 +-
.../tajo/master/rm/WorkerReconnectEvent.java | 2 +-
.../tajo/master/rm/WorkerResourceManager.java | 6 +-
.../tajo/master/rm/WorkerStatusEvent.java | 2 +-
.../tajo/worker/AbstractResourceAllocator.java | 15 ++
.../tajo/worker/ExecutionBlockContext.java | 12 +-
.../tajo/worker/TajoResourceAllocator.java | 17 +-
.../java/org/apache/tajo/worker/TajoWorker.java | 178 ++++++++++---------
.../tajo/worker/TajoWorkerClientService.java | 6 +-
.../tajo/worker/TajoWorkerManagerService.java | 31 +---
.../main/java/org/apache/tajo/worker/Task.java | 3 +-
.../java/org/apache/tajo/worker/TaskRunner.java | 20 +--
.../apache/tajo/worker/TaskRunnerManager.java | 11 +-
.../tajo/worker/WorkerHeartbeatService.java | 57 +++---
.../tajo/worker/event/TaskRunnerStartEvent.java | 18 +-
.../main/proto/ResourceTrackerProtocol.proto | 12 +-
.../src/main/proto/TajoMasterProtocol.proto | 36 ++--
.../src/main/proto/TajoWorkerProtocol.proto | 20 +--
.../main/resources/webapps/admin/cluster.jsp | 54 +++---
.../src/main/resources/webapps/admin/index.jsp | 4 +-
.../src/main/resources/webapps/admin/query.jsp | 8 +-
.../resources/webapps/worker/querytasks.jsp | 9 +-
.../resources/webapps/worker/taskdetail.jsp | 2 +
.../src/main/resources/webapps/worker/tasks.jsp | 4 +-
.../tajo/cluster/TestWorkerConnectionInfo.java | 36 ++++
.../tajo/master/rm/TestTajoResourceManager.java | 8 +-
.../tajo/pullserver/TajoPullServerService.java | 15 +-
50 files changed, 632 insertions(+), 530 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tajo/blob/28282b56/CHANGES
----------------------------------------------------------------------
diff --git a/CHANGES b/CHANGES
index a5f31f9..5d92881 100644
--- a/CHANGES
+++ b/CHANGES
@@ -444,6 +444,8 @@ Release 0.9.0 - unreleased
SUB TASKS
+ TAJO-1016: Refactor worker rpc information. (jinho)
+
TAJO-1015: Add executionblock event in worker. (jinho)
TAJO-783: Remove yarn-related code from tajo-core. (hyunsik)
http://git-wip-us.apache.org/repos/asf/tajo/blob/28282b56/tajo-client/src/main/java/org/apache/tajo/client/TajoAdmin.java
----------------------------------------------------------------------
diff --git a/tajo-client/src/main/java/org/apache/tajo/client/TajoAdmin.java b/tajo-client/src/main/java/org/apache/tajo/client/TajoAdmin.java
index 95dfc68..1acdb4d 100644
--- a/tajo-client/src/main/java/org/apache/tajo/client/TajoAdmin.java
+++ b/tajo-client/src/main/java/org/apache/tajo/client/TajoAdmin.java
@@ -276,14 +276,15 @@ public class TajoAdmin {
line5, line10, line10);
writer.write(line);
for (WorkerResourceInfo queryMaster : liveQueryMasters) {
- String queryMasterHost = String.format("%s:%d",
- queryMaster.getAllocatedHost(),
- queryMaster.getQueryMasterPort());
- String heap = String.format("%d MB", queryMaster.getMaxHeap()/1024/1024);
- line = String.format(fmtQueryMasterLine, queryMasterHost,
- queryMaster.getClientPort(),
- queryMaster.getNumQueryMasterTasks(),
- heap, queryMaster.getWorkerStatus());
+ TajoProtos.WorkerConnectionInfoProto connInfo = queryMaster.getConnectionInfo();
+ String queryMasterHost = String.format("%s:%d", connInfo.getHost(), connInfo.getQueryMasterPort());
+ String heap = String.format("%d MB", queryMaster.getMaxHeap() / 1024 / 1024);
+ line = String.format(fmtQueryMasterLine,
+ queryMasterHost,
+ connInfo.getClientPort(),
+ queryMaster.getNumQueryMasterTasks(),
+ heap,
+ queryMaster.getWorkerStatus());
writer.write(line);
}
@@ -301,12 +302,12 @@ public class TajoAdmin {
writer.write(line);
for (WorkerResourceInfo queryMaster : deadQueryMasters) {
- String queryMasterHost = String.format("%s:%d",
- queryMaster.getAllocatedHost(),
- queryMaster.getQueryMasterPort());
- line = String.format(fmtQueryMasterLine, queryMasterHost,
- queryMaster.getClientPort(),
- queryMaster.getWorkerStatus());
+ TajoProtos.WorkerConnectionInfoProto connInfo = queryMaster.getConnectionInfo();
+ String queryMasterHost = String.format("%s:%d", connInfo.getHost(), connInfo.getQueryMasterPort());
+ line = String.format(fmtQueryMasterLine,
+ queryMasterHost,
+ connInfo.getClientPort(),
+ queryMaster.getWorkerStatus());
writer.write(line);
}
@@ -358,9 +359,8 @@ public class TajoAdmin {
writer.write(line);
for (WorkerResourceInfo worker : workers) {
- String workerHost = String.format("%s:%d",
- worker.getAllocatedHost(),
- worker.getPeerRpcPort());
+ TajoProtos.WorkerConnectionInfoProto connInfo = worker.getConnectionInfo();
+ String workerHost = String.format("%s:%d", connInfo.getHost(), connInfo.getPeerRpcPort());
String mem = String.format("%d/%d", worker.getUsedMemoryMB(),
worker.getMemoryMB());
String disk = String.format("%.2f/%.2f", worker.getUsedDiskSlots(),
@@ -369,7 +369,7 @@ public class TajoAdmin {
worker.getMaxHeap()/1024/1024);
line = String.format(fmtWorkerLine, workerHost,
- worker.getPullServerPort(),
+ connInfo.getPullServerPort(),
worker.getNumRunningTasks(),
mem, disk, heap, worker.getWorkerStatus());
writer.write(line);
http://git-wip-us.apache.org/repos/asf/tajo/blob/28282b56/tajo-client/src/main/proto/ClientProtos.proto
----------------------------------------------------------------------
diff --git a/tajo-client/src/main/proto/ClientProtos.proto b/tajo-client/src/main/proto/ClientProtos.proto
index c66b228..0359685 100644
--- a/tajo-client/src/main/proto/ClientProtos.proto
+++ b/tajo-client/src/main/proto/ClientProtos.proto
@@ -157,27 +157,22 @@ message GetClusterInfoRequest {
}
message WorkerResourceInfo {
- required string allocatedHost = 1;
- required int32 peerRpcPort = 2;
- required int32 queryMasterPort = 3;
- required int32 clientPort = 4;
- required int32 pullServerPort = 5;
- required int32 httpPort = 6;
- required float diskSlots = 7;
- required int32 cpuCoreSlots = 8;
- required int32 memoryMB = 9;
- required float usedDiskSlots = 10;
- required int32 usedMemoryMB = 11;
- required int32 usedCpuCoreSlots = 12;
- required int64 maxHeap = 13;
- required int64 freeHeap = 14;
- required int64 totalHeap = 15;
- required int32 numRunningTasks = 16;
- required string workerStatus = 17;
- required int64 lastHeartbeat = 18;
- required bool queryMasterMode = 19;
- required bool taskRunnerMode = 20;
- required int32 numQueryMasterTasks = 21;
+ required WorkerConnectionInfoProto connectionInfo = 1;
+ required float diskSlots = 2;
+ required int32 cpuCoreSlots = 3;
+ required int32 memoryMB = 4;
+ required float usedDiskSlots = 5;
+ required int32 usedMemoryMB = 6;
+ required int32 usedCpuCoreSlots = 7;
+ required int64 maxHeap = 8;
+ required int64 freeHeap = 9;
+ required int64 totalHeap = 10;
+ required int32 numRunningTasks = 11;
+ required string workerStatus = 12;
+ required int64 lastHeartbeat = 13;
+ required bool queryMasterMode = 14;
+ required bool taskRunnerMode = 15;
+ required int32 numQueryMasterTasks = 16;
}
message GetClusterInfoResponse {
http://git-wip-us.apache.org/repos/asf/tajo/blob/28282b56/tajo-common/src/main/proto/tajo_protos.proto
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/proto/tajo_protos.proto b/tajo-common/src/main/proto/tajo_protos.proto
index edd27fc..b6cd9ef 100644
--- a/tajo-common/src/main/proto/tajo_protos.proto
+++ b/tajo-common/src/main/proto/tajo_protos.proto
@@ -52,4 +52,14 @@ enum FetcherState {
FETCH_FETCHING = 1;
FETCH_FINISHED = 2;
FETCH_FAILED = 3;
+}
+
+message WorkerConnectionInfoProto {
+ required int32 id = 1;
+ required string host = 2;
+ required int32 peerRpcPort = 3;
+ required int32 pullServerPort = 4;
+ optional int32 queryMasterPort = 5;
+ required int32 clientPort = 6;
+ required int32 httpInfoPort = 7;
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/tajo/blob/28282b56/tajo-core/src/main/java/org/apache/tajo/master/DefaultTaskScheduler.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/DefaultTaskScheduler.java b/tajo-core/src/main/java/org/apache/tajo/master/DefaultTaskScheduler.java
index 7684df2..2cb8878 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/DefaultTaskScheduler.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/DefaultTaskScheduler.java
@@ -33,6 +33,7 @@ import org.apache.tajo.engine.planner.global.MasterPlan;
import org.apache.tajo.engine.query.QueryUnitRequest;
import org.apache.tajo.engine.query.QueryUnitRequestImpl;
import org.apache.tajo.ipc.TajoWorkerProtocol;
+import org.apache.tajo.master.cluster.WorkerConnectionInfo;
import org.apache.tajo.master.event.*;
import org.apache.tajo.master.event.QueryUnitAttemptScheduleEvent.QueryUnitAttemptScheduleContext;
import org.apache.tajo.master.event.TaskSchedulerEvent.EventType;
@@ -744,13 +745,15 @@ public class DefaultTaskScheduler extends AbstractTaskScheduler {
}
// getting the hostname of requested node
- String host = container.getTaskHostName();
+ WorkerConnectionInfo connectionInfo =
+ context.getMasterContext().getResourceAllocator().getWorkerConnectionInfo(taskRequest.getWorkerId());
+ String host = connectionInfo.getHost();
// if there are no worker matched to the hostname a task request
if(!leafTaskHostMapping.containsKey(host)){
- host = NetUtils.normalizeHost(host);
+ String normalizedHost = NetUtils.normalizeHost(host);
- if(!leafTaskHostMapping.containsKey(host) && !taskRequests.isEmpty()){
+ if(!leafTaskHostMapping.containsKey(normalizedHost) && !taskRequests.isEmpty()){
// this case means one of either cases:
// * there are no blocks which reside in this node.
// * all blocks which reside in this node are consumed, and this task runner requests a remote task.
@@ -826,8 +829,7 @@ public class DefaultTaskScheduler extends AbstractTaskScheduler {
}
context.getMasterContext().getEventHandler().handle(new TaskAttemptAssignedEvent(attemptId,
- taskRequest.getContainerId(),
- host, container.getTaskPort()));
+ taskRequest.getContainerId(), connectionInfo));
assignedRequest.add(attemptId);
scheduledObjectNum--;
@@ -891,10 +893,10 @@ public class DefaultTaskScheduler extends AbstractTaskScheduler {
}
}
- ContainerProxy container = context.getMasterContext().getResourceAllocator().getContainer(
- taskRequest.getContainerId());
+ WorkerConnectionInfo connectionInfo = context.getMasterContext().getResourceAllocator().
+ getWorkerConnectionInfo(taskRequest.getWorkerId());
context.getMasterContext().getEventHandler().handle(new TaskAttemptAssignedEvent(attemptId,
- taskRequest.getContainerId(), container.getTaskHostName(), container.getTaskPort()));
+ taskRequest.getContainerId(), connectionInfo));
taskRequest.getCallback().run(taskAssign.getProto());
totalAssigned++;
scheduledObjectNum--;
http://git-wip-us.apache.org/repos/asf/tajo/blob/28282b56/tajo-core/src/main/java/org/apache/tajo/master/LazyTaskScheduler.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/LazyTaskScheduler.java b/tajo-core/src/main/java/org/apache/tajo/master/LazyTaskScheduler.java
index 6552998..f7953e0 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/LazyTaskScheduler.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/LazyTaskScheduler.java
@@ -469,8 +469,6 @@ public class LazyTaskScheduler extends AbstractTaskScheduler {
private void assignTask(QueryUnitAttemptScheduleContext attemptContext, QueryUnitAttempt taskAttempt) {
QueryUnitAttemptId attemptId = taskAttempt.getId();
- ContainerProxy containerProxy = context.getMasterContext().getResourceAllocator().
- getContainer(attemptContext.getContainerId());
QueryUnitRequest taskAssign = new QueryUnitRequestImpl(
attemptId,
new ArrayList<FragmentProto>(taskAttempt.getQueryUnit().getAllFragments()),
@@ -495,7 +493,7 @@ public class LazyTaskScheduler extends AbstractTaskScheduler {
}
context.getMasterContext().getEventHandler().handle(new TaskAttemptAssignedEvent(attemptId,
- attemptContext.getContainerId(), attemptContext.getHost(), containerProxy.getTaskPort()));
+ attemptContext.getContainerId(), taskAttempt.getWorkerConnectionInfo()));
totalAssigned++;
attemptContext.getCallback().run(taskAssign.getProto());
http://git-wip-us.apache.org/repos/asf/tajo/blob/28282b56/tajo-core/src/main/java/org/apache/tajo/master/TajoContainerProxy.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/TajoContainerProxy.java b/tajo-core/src/main/java/org/apache/tajo/master/TajoContainerProxy.java
index c317ba5..c236c20 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/TajoContainerProxy.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/TajoContainerProxy.java
@@ -59,7 +59,7 @@ public class TajoContainerProxy extends ContainerProxy {
context.getResourceAllocator().addContainer(containerID, this);
this.hostName = container.getNodeId().getHost();
- this.port = ((TajoWorkerContainer)container).getWorkerResource().getPullServerPort();
+ this.port = ((TajoWorkerContainer)container).getWorkerResource().getConnectionInfo().getPullServerPort();
this.state = ContainerState.RUNNING;
if (LOG.isDebugEnabled()) {
@@ -102,8 +102,7 @@ public class TajoContainerProxy extends ContainerProxy {
TajoWorkerProtocol.RunExecutionBlockRequestProto request =
TajoWorkerProtocol.RunExecutionBlockRequestProto.newBuilder()
.setExecutionBlockId(executionBlockId.getProto())
- .setQueryMasterHost(myAddr.getHostName())
- .setQueryMasterPort(myAddr.getPort())
+ .setQueryMaster(context.getQueryMasterContext().getWorkerContext().getConnectionInfo().getProto())
.setNodeId(container.getNodeId().toString())
.setContainerId(container.getId().toString())
.setQueryOutputPath(context.getStagingDir().toString())
http://git-wip-us.apache.org/repos/asf/tajo/blob/28282b56/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterClientService.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterClientService.java b/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterClientService.java
index 7d80a88..e69393a 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterClientService.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterClientService.java
@@ -503,9 +503,9 @@ public class TajoMasterClientService extends AbstractService {
context.getSessionManager().touch(request.getSessionId().getId());
GetClusterInfoResponse.Builder builder= GetClusterInfoResponse.newBuilder();
- Map<String, Worker> workers = context.getResourceManager().getWorkers();
+ Map<Integer, Worker> workers = context.getResourceManager().getWorkers();
- List<String> wokerKeys = new ArrayList<String>(workers.keySet());
+ List<Integer> wokerKeys = new ArrayList<Integer>(workers.keySet());
Collections.sort(wokerKeys);
WorkerResourceInfo.Builder workerBuilder
@@ -513,7 +513,8 @@ public class TajoMasterClientService extends AbstractService {
for(Worker worker: workers.values()) {
WorkerResource workerResource = worker.getResource();
- workerBuilder.setAllocatedHost(worker.getHostName());
+
+ workerBuilder.setConnectionInfo(worker.getConnectionInfo().getProto());
workerBuilder.setDiskSlots(workerResource.getDiskSlots());
workerBuilder.setCpuCoreSlots(workerResource.getCpuCoreSlots());
workerBuilder.setMemoryMB(workerResource.getMemoryMB());
@@ -524,11 +525,6 @@ public class TajoMasterClientService extends AbstractService {
workerBuilder.setWorkerStatus(worker.getState().toString());
workerBuilder.setQueryMasterMode(workerResource.isQueryMasterMode());
workerBuilder.setTaskRunnerMode(workerResource.isTaskRunnerMode());
- workerBuilder.setPeerRpcPort(worker.getPeerRpcPort());
- workerBuilder.setQueryMasterPort(worker.getQueryMasterPort());
- workerBuilder.setClientPort(worker.getClientPort());
- workerBuilder.setPullServerPort(worker.getPullServerPort());
- workerBuilder.setHttpPort(worker.getHttpPort());
workerBuilder.setMaxHeap(workerResource.getMaxHeap());
workerBuilder.setFreeHeap(workerResource.getFreeHeap());
workerBuilder.setTotalHeap(workerResource.getTotalHeap());
http://git-wip-us.apache.org/repos/asf/tajo/blob/28282b56/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterService.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterService.java b/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterService.java
index 5e9f729..ddf24d3 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterService.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterService.java
@@ -28,6 +28,7 @@ import org.apache.tajo.QueryId;
import org.apache.tajo.TajoIdProtos;
import org.apache.tajo.conf.TajoConf;
import org.apache.tajo.ipc.TajoMasterProtocol;
+import org.apache.tajo.master.cluster.WorkerConnectionInfo;
import org.apache.tajo.master.querymaster.QueryJobManager;
import org.apache.tajo.master.rm.Worker;
import org.apache.tajo.master.rm.WorkerResource;
@@ -97,7 +98,7 @@ public class TajoMasterService extends AbstractService {
RpcController controller,
TajoMasterProtocol.TajoHeartbeat request, RpcCallback<TajoMasterProtocol.TajoHeartbeatResponse> done) {
if(LOG.isDebugEnabled()) {
- LOG.debug("Received QueryHeartbeat:" + request.getTajoWorkerHost() + ":" + request.getTajoQueryMasterPort());
+ LOG.debug("Received QueryHeartbeat:" + new WorkerConnectionInfo(request.getConnectionInfo()));
}
TajoMasterProtocol.TajoHeartbeatResponse.ResponseCommand command = null;
@@ -156,13 +157,9 @@ public class TajoMasterService extends AbstractService {
TajoMasterProtocol.WorkerResourceProto.Builder workerResource =
TajoMasterProtocol.WorkerResourceProto.newBuilder();
- workerResource.setHost(worker.getHostName());
- workerResource.setPeerRpcPort(worker.getPeerRpcPort());
- workerResource.setInfoPort(worker.getHttpPort());
- workerResource.setQueryMasterPort(worker.getQueryMasterPort());
+ workerResource.setConnectionInfo(worker.getConnectionInfo().getProto());
workerResource.setMemoryMB(resource.getMemoryMB());
workerResource.setDiskSlots(resource.getDiskSlots());
- workerResource.setQueryMasterPort(worker.getQueryMasterPort());
builder.addWorkerResources(workerResource);
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/28282b56/tajo-core/src/main/java/org/apache/tajo/master/cluster/WorkerConnectionInfo.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/cluster/WorkerConnectionInfo.java b/tajo-core/src/main/java/org/apache/tajo/master/cluster/WorkerConnectionInfo.java
new file mode 100644
index 0000000..78d4978
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/master/cluster/WorkerConnectionInfo.java
@@ -0,0 +1,223 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.master.cluster;
+
+import org.apache.tajo.common.ProtoObject;
+
+import static org.apache.tajo.TajoProtos.WorkerConnectionInfoProto;
+
+/**
+ * Host name and service ports of a single Tajo worker.
+ *
+ * <p>Identity ({@link #equals}, {@link #hashCode}, {@link #compareTo}) is based on the host name and
+ * peer rpc port only; the other ports are not compared. For instances built from explicit values,
+ * {@link #getId()} is derived from the same two fields, so equal instances get equal ids. The id is a
+ * hash value, so two different workers are not guaranteed to receive different ids.
+ */
+public class WorkerConnectionInfo implements ProtoObject<WorkerConnectionInfoProto>, Comparable<WorkerConnectionInfo> {
+
+  /** Seed of the id / hash code computation. Changing it changes every derived worker id. */
+  private static final int HASH_SEED = 8501;
+  /** Multiplier of the id / hash code computation. Changing it changes every derived worker id. */
+  private static final int HASH_PRIME = 493217;
+
+  /**
+   * worker id; a hash of host and peer rpc port unless restored from a proto
+   */
+  private int id;
+  /**
+   * Hostname
+   */
+  private String host;
+  /**
+   * Peer rpc port
+   */
+  private int peerRpcPort;
+  /**
+   * pull server port
+   */
+  private int pullServerPort;
+  /**
+   * QueryMaster rpc port
+   */
+  private int queryMasterPort;
+  /**
+   * the port of client rpc which provides a client API
+   */
+  private int clientPort;
+  /**
+   * http info port
+   */
+  private int httpInfoPort;
+
+  /**
+   * Creates an instance with a {@code null} host, id 0 and all ports 0. No setters exist, so the
+   * instance stays in this state; {@link #getProto()} cannot serialize it because host is required.
+   */
+  public WorkerConnectionInfo() {
+  }
+
+  /**
+   * Restores an instance from its protobuf form, keeping the id carried by the proto.
+   * {@code queryMasterPort} is optional in the proto; when absent, the proto default 0 is used.
+   *
+   * @param proto serialized connection info
+   */
+  public WorkerConnectionInfo(WorkerConnectionInfoProto proto) {
+    this();
+    this.id = proto.getId();
+    this.host = proto.getHost();
+    this.peerRpcPort = proto.getPeerRpcPort();
+    this.pullServerPort = proto.getPullServerPort();
+    this.clientPort = proto.getClientPort();
+    this.httpInfoPort = proto.getHttpInfoPort();
+    this.queryMasterPort = proto.getQueryMasterPort();
+  }
+
+  /**
+   * Creates connection info from explicit values; the id is derived from {@code host} and
+   * {@code peerRpcPort}.
+   *
+   * @throws NullPointerException if {@code host} is null
+   */
+  public WorkerConnectionInfo(String host, int peerRpcPort, int pullServerPort, int clientPort,
+                              int queryMasterPort, int httpInfoPort) {
+    this();
+    if (host == null) {
+      throw new NullPointerException("host must not be null");
+    }
+    this.host = host;
+    this.peerRpcPort = peerRpcPort;
+    this.pullServerPort = pullServerPort;
+    this.clientPort = clientPort;
+    this.queryMasterPort = queryMasterPort;
+    this.httpInfoPort = httpInfoPort;
+    // Use the static helper instead of the overridable hashCode(): calling an overridable method
+    // from a constructor would let a subclass run code against a partially constructed object.
+    this.id = computeId(host, peerRpcPort);
+  }
+
+  public String getHost() {
+    return host;
+  }
+
+  public int getPeerRpcPort() {
+    return peerRpcPort;
+  }
+
+  public int getPullServerPort() {
+    return pullServerPort;
+  }
+
+  public int getQueryMasterPort() {
+    return queryMasterPort;
+  }
+
+  public int getClientPort() {
+    return clientPort;
+  }
+
+  public int getHttpInfoPort() {
+    return httpInfoPort;
+  }
+
+  public int getId() {
+    return id;
+  }
+
+  /** Returns {@code host:peerRpcPort}. */
+  public String getHostAndPeerRpcPort() {
+    return this.getHost() + ":" + this.getPeerRpcPort();
+  }
+
+  @Override
+  public WorkerConnectionInfoProto getProto() {
+    WorkerConnectionInfoProto.Builder builder = WorkerConnectionInfoProto.newBuilder();
+    builder.setId(id)
+        .setHost(host)
+        .setPeerRpcPort(peerRpcPort)
+        .setPullServerPort(pullServerPort)
+        .setClientPort(clientPort)
+        .setHttpInfoPort(httpInfoPort)
+        .setQueryMasterPort(queryMasterPort);
+    return builder.build();
+  }
+
+  /** Hashes host and peer rpc port; equal to the id of an instance built from explicit values. */
+  @Override
+  public int hashCode() {
+    return computeId(host, peerRpcPort);
+  }
+
+  /** Hash of host and peer rpc port used by hashCode() and id derivation; a null host hashes as 0. */
+  private static int computeId(String host, int peerRpcPort) {
+    int result = HASH_SEED;
+    result = HASH_PRIME * result + (host == null ? 0 : host.hashCode());
+    result = HASH_PRIME * result + peerRpcPort;
+    return result;
+  }
+
+  /** Two instances are equal when they have the same class, host and peer rpc port. */
+  @Override
+  public boolean equals(Object obj) {
+    if (this == obj) {
+      return true;
+    }
+    if (obj == null || getClass() != obj.getClass()) {
+      return false;
+    }
+    WorkerConnectionInfo other = (WorkerConnectionInfo) obj;
+    boolean sameHost = (host == null) ? other.host == null : host.equals(other.host);
+    return sameHost && peerRpcPort == other.peerRpcPort;
+  }
+
+  /** Orders by host name, then by peer rpc port; a {@code null} host sorts first. */
+  @Override
+  public int compareTo(WorkerConnectionInfo other) {
+    int hostCompare = compareHosts(host, other.host);
+    if (hostCompare != 0) {
+      return hostCompare;
+    }
+    if (peerRpcPort < other.peerRpcPort) {
+      return -1;
+    }
+    return peerRpcPort == other.peerRpcPort ? 0 : 1;
+  }
+
+  /** Null-safe string comparison where {@code null} sorts before any non-null value. */
+  private static int compareHosts(String left, String right) {
+    if (left == null) {
+      return right == null ? 0 : -1;
+    }
+    return right == null ? 1 : left.compareTo(right);
+  }
+
+  @Override
+  public String toString() {
+    StringBuilder builder = new StringBuilder();
+    builder.append("id:").append(id).append(", ")
+        .append("host:").append(host).append(", ")
+        .append("PeerRpcPort:").append(peerRpcPort).append(", ")
+        .append("PullServerPort:").append(pullServerPort).append(", ")
+        .append("ClientPort:").append(clientPort).append(", ")
+        .append("QueryMasterPort:").append(queryMasterPort).append(", ")
+        .append("HttpInfoPort:").append(httpInfoPort);
+    return builder.toString();
+  }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/tajo/blob/28282b56/tajo-core/src/main/java/org/apache/tajo/master/event/TaskAttemptAssignedEvent.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/event/TaskAttemptAssignedEvent.java b/tajo-core/src/main/java/org/apache/tajo/master/event/TaskAttemptAssignedEvent.java
index 4934633..e0928c5 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/event/TaskAttemptAssignedEvent.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/event/TaskAttemptAssignedEvent.java
@@ -20,29 +20,24 @@ package org.apache.tajo.master.event;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.tajo.QueryUnitAttemptId;
+import org.apache.tajo.master.cluster.WorkerConnectionInfo;
public class TaskAttemptAssignedEvent extends TaskAttemptEvent {
private final ContainerId cId;
- private final String hostName;
- private final int pullServerPort;
+ private final WorkerConnectionInfo workerConnectionInfo;
public TaskAttemptAssignedEvent(QueryUnitAttemptId id, ContainerId cId,
- String hostname, int pullServerPort) {
+ WorkerConnectionInfo connectionInfo) {
super(id, TaskAttemptEventType.TA_ASSIGNED);
this.cId = cId;
- this.hostName = hostname;
- this.pullServerPort = pullServerPort;
+ this.workerConnectionInfo = connectionInfo;
}
public ContainerId getContainerId() {
return cId;
}
- public String getHostName() {
- return hostName;
- }
-
- public int getPullServerPort() {
- return pullServerPort;
+ public WorkerConnectionInfo getWorkerConnectionInfo(){
+ return workerConnectionInfo;
}
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/28282b56/tajo-core/src/main/java/org/apache/tajo/master/event/TaskRequestEvent.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/event/TaskRequestEvent.java b/tajo-core/src/main/java/org/apache/tajo/master/event/TaskRequestEvent.java
index 9be7cab..2197c33 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/event/TaskRequestEvent.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/event/TaskRequestEvent.java
@@ -31,24 +31,31 @@ public class TaskRequestEvent extends AbstractEvent<TaskRequestEventType> {
TASK_REQ
}
- private final ContainerId workerId;
+ private final int workerId;
+ private final ContainerId containerId;
private final ExecutionBlockId executionBlockId;
private final RpcCallback<QueryUnitRequestProto> callback;
- public TaskRequestEvent(ContainerId workerId,
+ public TaskRequestEvent(int workerId,
+ ContainerId containerId,
ExecutionBlockId executionBlockId,
RpcCallback<QueryUnitRequestProto> callback) {
super(TaskRequestEventType.TASK_REQ);
this.workerId = workerId;
+ this.containerId = containerId;
this.executionBlockId = executionBlockId;
this.callback = callback;
}
- public ContainerId getContainerId() {
+ public int getWorkerId() {
return this.workerId;
}
+ public ContainerId getContainerId() {
+ return this.containerId;
+ }
+
public ExecutionBlockId getExecutionBlockId() {
return executionBlockId;
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/28282b56/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryInProgress.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryInProgress.java b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryInProgress.java
index 261200e..877a20a 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryInProgress.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryInProgress.java
@@ -172,9 +172,9 @@ public class QueryInProgress extends CompositeService {
return false;
}
- queryInfo.setQueryMaster(resource.getWorkerHost());
- queryInfo.setQueryMasterPort(resource.getQueryMasterPort());
- queryInfo.setQueryMasterclientPort(resource.getClientPort());
+ queryInfo.setQueryMaster(resource.getConnectionInfo().getHost());
+ queryInfo.setQueryMasterPort(resource.getConnectionInfo().getQueryMasterPort());
+ queryInfo.setQueryMasterclientPort(resource.getConnectionInfo().getClientPort());
getEventHandler().handle(new QueryJobEvent(QueryJobEvent.Type.QUERY_MASTER_START, queryInfo));
http://git-wip-us.apache.org/repos/asf/tajo/blob/28282b56/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryJobManager.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryJobManager.java b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryJobManager.java
index acaefc9..e4f47cd 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryJobManager.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryJobManager.java
@@ -31,6 +31,7 @@ import org.apache.tajo.engine.planner.logical.LogicalRootNode;
import org.apache.tajo.engine.query.QueryContext;
import org.apache.tajo.ipc.TajoMasterProtocol;
import org.apache.tajo.master.TajoMaster;
+import org.apache.tajo.master.cluster.WorkerConnectionInfo;
import org.apache.tajo.master.session.Session;
import org.apache.tajo.scheduler.SimpleFifoScheduler;
@@ -241,11 +242,11 @@ public class QueryJobManager extends CompositeService {
private QueryInfo makeQueryInfoFromHeartbeat(TajoMasterProtocol.TajoHeartbeat queryHeartbeat) {
QueryInfo queryInfo = new QueryInfo(new QueryId(queryHeartbeat.getQueryId()));
- if(queryHeartbeat.getTajoWorkerHost() != null) {
- queryInfo.setQueryMaster(queryHeartbeat.getTajoWorkerHost());
- queryInfo.setQueryMasterPort(queryHeartbeat.getTajoQueryMasterPort());
- queryInfo.setQueryMasterclientPort(queryHeartbeat.getTajoWorkerClientPort());
- }
+ WorkerConnectionInfo connectionInfo = new WorkerConnectionInfo(queryHeartbeat.getConnectionInfo());
+
+ queryInfo.setQueryMaster(connectionInfo.getHost());
+ queryInfo.setQueryMasterPort(connectionInfo.getQueryMasterPort());
+ queryInfo.setQueryMasterclientPort(connectionInfo.getClientPort());
queryInfo.setLastMessage(queryHeartbeat.getStatusMessage());
queryInfo.setQueryState(queryHeartbeat.getState());
queryInfo.setProgress(queryHeartbeat.getQueryProgress());
http://git-wip-us.apache.org/repos/asf/tajo/blob/28282b56/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMaster.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMaster.java b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMaster.java
index b54675c..b8c39e0 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMaster.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMaster.java
@@ -192,9 +192,8 @@ public class QueryMaster extends CompositeService implements EventHandler {
for (TajoMasterProtocol.WorkerResourceProto worker : workers) {
try {
- if (worker.getPeerRpcPort() == 0) continue;
-
- rpc = connPool.getConnection(NetUtils.createSocketAddr(worker.getHost(), worker.getPeerRpcPort()),
+ TajoProtos.WorkerConnectionInfoProto connectionInfo = worker.getConnectionInfo();
+ rpc = connPool.getConnection(NetUtils.createSocketAddr(connectionInfo.getHost(), connectionInfo.getPeerRpcPort()),
TajoWorkerProtocol.class, true);
TajoWorkerProtocol.TajoWorkerProtocolService tajoWorkerProtocolService = rpc.getStub();
@@ -214,9 +213,8 @@ public class QueryMaster extends CompositeService implements EventHandler {
for (TajoMasterProtocol.WorkerResourceProto worker : workers) {
try {
- if (worker.getPeerRpcPort() == 0) continue;
-
- rpc = connPool.getConnection(NetUtils.createSocketAddr(worker.getHost(), worker.getPeerRpcPort()),
+ TajoProtos.WorkerConnectionInfoProto connectionInfo = worker.getConnectionInfo();
+ rpc = connPool.getConnection(NetUtils.createSocketAddr(connectionInfo.getHost(), connectionInfo.getPeerRpcPort()),
TajoWorkerProtocol.class, true);
TajoWorkerProtocol.TajoWorkerProtocolService tajoWorkerProtocolService = rpc.getStub();
@@ -299,9 +297,7 @@ public class QueryMaster extends CompositeService implements EventHandler {
TajoMasterProtocol.TajoMasterProtocolService masterClientService = tmClient.getStub();
TajoHeartbeat.Builder queryHeartbeatBuilder = TajoHeartbeat.newBuilder()
- .setTajoWorkerHost(workerContext.getQueryMasterManagerService().getBindAddr().getHostName())
- .setTajoQueryMasterPort(workerContext.getQueryMasterManagerService().getBindAddr().getPort())
- .setTajoWorkerClientPort(workerContext.getTajoWorkerClientService().getBindAddr().getPort())
+ .setConnectionInfo(workerContext.getConnectionInfo().getProto())
.setState(state)
.setQueryId(queryId.getProto());
@@ -460,9 +456,7 @@ public class QueryMaster extends CompositeService implements EventHandler {
private TajoHeartbeat buildTajoHeartBeat(QueryMasterTask queryMasterTask) {
TajoHeartbeat.Builder builder = TajoHeartbeat.newBuilder();
- builder.setTajoWorkerHost(workerContext.getQueryMasterManagerService().getBindAddr().getHostName());
- builder.setTajoQueryMasterPort(workerContext.getQueryMasterManagerService().getBindAddr().getPort());
- builder.setTajoWorkerClientPort(workerContext.getTajoWorkerClientService().getBindAddr().getPort());
+ builder.setConnectionInfo(workerContext.getConnectionInfo().getProto());
builder.setState(queryMasterTask.getState());
builder.setQueryId(queryMasterTask.getQueryId().getProto());
http://git-wip-us.apache.org/repos/asf/tajo/blob/28282b56/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMasterManagerService.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMasterManagerService.java b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMasterManagerService.java
index 862dfef..f953995 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMasterManagerService.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMasterManagerService.java
@@ -133,7 +133,7 @@ public class QueryMasterManagerService extends CompositeService
ContainerId cid =
queryMasterTask.getQueryTaskContext().getResourceAllocator().makeContainerId(request.getContainerId());
LOG.debug("getTask:" + cid + ", ebId:" + ebId);
- queryMasterTask.handleTaskRequestEvent(new TaskRequestEvent(cid, ebId, done));
+ queryMasterTask.handleTaskRequestEvent(new TaskRequestEvent(request.getWorkerId(), cid, ebId, done));
}
} catch (Exception e) {
LOG.error(e.getMessage(), e);
http://git-wip-us.apache.org/repos/asf/tajo/blob/28282b56/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryUnit.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryUnit.java b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryUnit.java
index f41fd0e..03c6d30 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryUnit.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryUnit.java
@@ -522,8 +522,8 @@ public class QueryUnit implements EventHandler<TaskEvent> {
QueryUnitAttempt attempt = task.attempts.get(attemptEvent.getTaskAttemptId());
task.successfulAttempt = attemptEvent.getTaskAttemptId();
- task.succeededHost = attempt.getHost();
- task.succeededPullServerPort = attempt.getPullServerPort();
+ task.succeededHost = attempt.getWorkerConnectionInfo().getHost();
+ task.succeededPullServerPort = attempt.getWorkerConnectionInfo().getPullServerPort();
task.finishTask();
task.eventHandler.handle(new SubQueryTaskEvent(event.getTaskId(), TaskState.SUCCEEDED));
@@ -537,7 +537,7 @@ public class QueryUnit implements EventHandler<TaskEvent> {
TaskTAttemptEvent attemptEvent = (TaskTAttemptEvent) event;
QueryUnitAttempt attempt = task.attempts.get(attemptEvent.getTaskAttemptId());
task.launchTime = System.currentTimeMillis();
- task.succeededHost = attempt.getHost();
+ task.succeededHost = attempt.getWorkerConnectionInfo().getHost();
}
}
@@ -632,9 +632,12 @@ public class QueryUnit implements EventHandler<TaskEvent> {
public static class PullHost implements Cloneable {
String host;
int port;
+ int hashCode;
+
public PullHost(String pullServerAddr, int pullServerPort){
this.host = pullServerAddr;
this.port = pullServerPort;
+ this.hashCode = Objects.hashCode(host, port);
}
public String getHost() {
return host;
@@ -650,7 +653,7 @@ public class QueryUnit implements EventHandler<TaskEvent> {
@Override
public int hashCode() {
- return Objects.hashCode(host, port);
+ return hashCode;
}
@Override
@@ -668,6 +671,7 @@ public class QueryUnit implements EventHandler<TaskEvent> {
PullHost newPullHost = (PullHost) super.clone();
newPullHost.host = host;
newPullHost.port = port;
+ newPullHost.hashCode = Objects.hashCode(newPullHost.host, newPullHost.port);
return newPullHost;
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/28282b56/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryUnitAttempt.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryUnitAttempt.java b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryUnitAttempt.java
index a4fa12f..db6f130 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryUnitAttempt.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryUnitAttempt.java
@@ -29,6 +29,7 @@ import org.apache.tajo.TajoProtos.TaskAttemptState;
import org.apache.tajo.catalog.proto.CatalogProtos;
import org.apache.tajo.catalog.statistics.TableStats;
import org.apache.tajo.ipc.TajoWorkerProtocol.TaskCompletionReport;
+import org.apache.tajo.master.cluster.WorkerConnectionInfo;
import org.apache.tajo.master.event.*;
import org.apache.tajo.master.event.QueryUnitAttemptScheduleEvent.QueryUnitAttemptScheduleContext;
import org.apache.tajo.master.event.TaskSchedulerEvent.EventType;
@@ -55,8 +56,7 @@ public class QueryUnitAttempt implements EventHandler<TaskAttemptEvent> {
final EventHandler eventHandler;
private ContainerId containerId;
- private String hostName;
- private int port;
+ private WorkerConnectionInfo workerConnectionInfo;
private int expire;
private final Lock readLock;
@@ -210,30 +210,14 @@ public class QueryUnitAttempt implements EventHandler<TaskAttemptEvent> {
return this.queryUnit;
}
- public String getHost() {
- return this.hostName;
- }
-
- public int getPort() {
- return this.port;
+ public WorkerConnectionInfo getWorkerConnectionInfo() {
+ return this.workerConnectionInfo;
}
public void setContainerId(ContainerId containerId) {
this.containerId = containerId;
}
- public void setHost(String host) {
- this.hostName = host;
- }
-
- public void setPullServerPort(int port) {
- this.port = port;
- }
-
- public int getPullServerPort() {
- return port;
- }
-
public synchronized void setExpireTime(int expire) {
this.expire = expire;
}
@@ -277,7 +261,7 @@ public class QueryUnitAttempt implements EventHandler<TaskAttemptEvent> {
if (report.getShuffleFileOutputsCount() > 0) {
this.getQueryUnit().setShuffleFileOutputs(report.getShuffleFileOutputsList());
- PullHost host = new PullHost(getHost(), getPullServerPort());
+ PullHost host = new PullHost(getWorkerConnectionInfo().getHost(), getWorkerConnectionInfo().getPullServerPort());
for (ShuffleFileOutput p : report.getShuffleFileOutputsList()) {
IntermediateEntry entry = new IntermediateEntry(getId().getQueryUnitId().getId(),
getId().getId(), p.getPartId(), host, p.getVolume());
@@ -325,8 +309,7 @@ public class QueryUnitAttempt implements EventHandler<TaskAttemptEvent> {
TaskAttemptEvent event) {
TaskAttemptAssignedEvent castEvent = (TaskAttemptAssignedEvent) event;
taskAttempt.containerId = castEvent.getContainerId();
- taskAttempt.setHost(castEvent.getHostName());
- taskAttempt.setPullServerPort(castEvent.getPullServerPort());
+ taskAttempt.workerConnectionInfo = castEvent.getWorkerConnectionInfo();
taskAttempt.eventHandler.handle(
new TaskTAttemptEvent(taskAttempt.getId(),
TaskEventType.T_ATTEMPT_LAUNCHED));
@@ -415,7 +398,8 @@ public class QueryUnitAttempt implements EventHandler<TaskAttemptEvent> {
TaskFatalErrorEvent errorEvent = (TaskFatalErrorEvent) event;
taskAttempt.eventHandler.handle(new TaskTAttemptEvent(taskAttempt.getId(), TaskEventType.T_ATTEMPT_FAILED));
taskAttempt.addDiagnosticInfo(errorEvent.errorMessage());
- LOG.error(taskAttempt.getId() + " FROM " + taskAttempt.getHost() + " >> " + errorEvent.errorMessage());
+ LOG.error(taskAttempt.getId() + " FROM " + taskAttempt.getWorkerConnectionInfo().getHost()
+ + " >> " + errorEvent.errorMessage());
}
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/28282b56/tajo-core/src/main/java/org/apache/tajo/master/rm/TajoRMContext.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/rm/TajoRMContext.java b/tajo-core/src/main/java/org/apache/tajo/master/rm/TajoRMContext.java
index 2229f04..5d07ff2 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/rm/TajoRMContext.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/rm/TajoRMContext.java
@@ -37,16 +37,16 @@ public class TajoRMContext {
final Dispatcher rmDispatcher;
/** map between workerIds and running workers */
- private final ConcurrentMap<String, Worker> workers = new ConcurrentHashMap<String, Worker>();
+ private final ConcurrentMap<Integer, Worker> workers = Maps.newConcurrentMap();
/** map between workerIds and inactive workers */
- private final ConcurrentMap<String, Worker> inactiveWorkers = new ConcurrentHashMap<String, Worker>();
+ private final ConcurrentMap<Integer, Worker> inactiveWorkers = Maps.newConcurrentMap();
/** map between queryIds and query master ContainerId */
private final ConcurrentMap<QueryId, ContainerIdProto> qmContainerMap = Maps.newConcurrentMap();
- private final Set<String> liveQueryMasterWorkerResources =
- Collections.newSetFromMap(new ConcurrentHashMap<String, Boolean>());
+ private final Set<Integer> liveQueryMasterWorkerResources =
+ Collections.newSetFromMap(new ConcurrentHashMap<Integer, Boolean>());
private final Set<QueryId> stoppedQueryIds =
Collections.newSetFromMap(new ConcurrentHashMap<QueryId, Boolean>());
@@ -62,14 +62,14 @@ public class TajoRMContext {
/**
* @return The Map for active workers
*/
- public ConcurrentMap<String, Worker> getWorkers() {
+ public ConcurrentMap<Integer, Worker> getWorkers() {
return workers;
}
/**
* @return The Map for inactive workers
*/
- public ConcurrentMap<String, Worker> getInactiveWorkers() {
+ public ConcurrentMap<Integer, Worker> getInactiveWorkers() {
return inactiveWorkers;
}
@@ -81,7 +81,7 @@ public class TajoRMContext {
return qmContainerMap;
}
- public Set<String> getQueryMasterWorker() {
+ public Set<Integer> getQueryMasterWorker() {
return liveQueryMasterWorkerResources;
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/28282b56/tajo-core/src/main/java/org/apache/tajo/master/rm/TajoResourceTracker.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/rm/TajoResourceTracker.java b/tajo-core/src/main/java/org/apache/tajo/master/rm/TajoResourceTracker.java
index 4bd7adb..831ce43 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/rm/TajoResourceTracker.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/rm/TajoResourceTracker.java
@@ -28,6 +28,7 @@ import org.apache.hadoop.service.AbstractService;
import org.apache.tajo.conf.TajoConf;
import org.apache.tajo.ipc.TajoMasterProtocol;
import org.apache.tajo.ipc.TajoResourceTrackerProtocol;
+import org.apache.tajo.master.cluster.WorkerConnectionInfo;
import org.apache.tajo.rpc.AsyncRpcServer;
import org.apache.tajo.util.NetUtils;
import org.apache.tajo.util.ProtoUtil;
@@ -111,9 +112,9 @@ public class TajoResourceTracker extends AbstractService implements TajoResource
/** The response builder */
private static final Builder builder = TajoHeartbeatResponse.newBuilder().setHeartbeatResult(ProtoUtil.TRUE);
- private static WorkerStatusEvent createStatusEvent(String workerKey, NodeHeartbeat heartbeat) {
+ private static WorkerStatusEvent createStatusEvent(int workerId, NodeHeartbeat heartbeat) {
return new WorkerStatusEvent(
- workerKey,
+ workerId,
heartbeat.getServerStatus().getRunningTaskNum(),
heartbeat.getServerStatus().getJvmHeap().getMaxHeap(),
heartbeat.getServerStatus().getJvmHeap().getFreeHeap(),
@@ -128,7 +129,7 @@ public class TajoResourceTracker extends AbstractService implements TajoResource
try {
// get a workerId from the heartbeat
- String workerId = createWorkerId(heartbeat);
+ int workerId = heartbeat.getConnectionInfo().getId();
if(rmContext.getWorkers().containsKey(workerId)) { // if worker is running
@@ -145,7 +146,7 @@ public class TajoResourceTracker extends AbstractService implements TajoResource
// create new worker instance
Worker newWorker = createWorkerResource(heartbeat);
- String newWorkerId = newWorker.getWorkerId();
+ int newWorkerId = newWorker.getWorkerId();
// add the new worker to the list of active workers
rmContext.getWorkers().putIfAbsent(newWorkerId, newWorker);
@@ -178,10 +179,6 @@ public class TajoResourceTracker extends AbstractService implements TajoResource
}
}
- private static final String createWorkerId(NodeHeartbeat heartbeat) {
- return heartbeat.getTajoWorkerHost() + ":" + heartbeat.getTajoQueryMasterPort() + ":" + heartbeat.getPeerRpcPort();
- }
-
private Worker createWorkerResource(NodeHeartbeat request) {
boolean queryMasterMode = request.getServerStatus().getQueryMasterMode().getValue();
boolean taskRunnerMode = request.getServerStatus().getTaskRunnerMode().getValue();
@@ -204,14 +201,7 @@ public class TajoResourceTracker extends AbstractService implements TajoResource
workerResource.setCpuCoreSlots(4);
}
- Worker worker = new Worker(rmContext, workerResource);
- worker.setHostName(request.getTajoWorkerHost());
- worker.setHttpPort(request.getTajoWorkerHttpPort());
- worker.setPeerRpcPort(request.getPeerRpcPort());
- worker.setQueryMasterPort(request.getTajoQueryMasterPort());
- worker.setClientPort(request.getTajoWorkerClientPort());
- worker.setPullServerPort(request.getTajoWorkerPullServerPort());
- return worker;
+ return new Worker(rmContext, workerResource, new WorkerConnectionInfo(request.getConnectionInfo()));
}
public TajoMasterProtocol.ClusterResourceSummary getClusterResourceSummary() {
@@ -224,7 +214,7 @@ public class TajoResourceTracker extends AbstractService implements TajoResource
int totalAvailableMemoryMB = 0;
synchronized(rmContext) {
- for(String eachWorker: rmContext.getWorkers().keySet()) {
+ for(int eachWorker: rmContext.getWorkers().keySet()) {
Worker worker = rmContext.getWorkers().get(eachWorker);
WorkerResource resource = worker.getResource();
if(worker != null) {
http://git-wip-us.apache.org/repos/asf/tajo/blob/28282b56/tajo-core/src/main/java/org/apache/tajo/master/rm/TajoWorkerResourceManager.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/rm/TajoWorkerResourceManager.java b/tajo-core/src/main/java/org/apache/tajo/master/rm/TajoWorkerResourceManager.java
index 3915225..0e3ccad 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/rm/TajoWorkerResourceManager.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/rm/TajoWorkerResourceManager.java
@@ -134,7 +134,7 @@ public class TajoWorkerResourceManager extends CompositeService implements Worke
@Override
public void handle(WorkerEvent event) {
- String workerId = event.getWorkerId();
+ int workerId = event.getWorkerId();
Worker node = this.rmContext.getWorkers().get(workerId);
if (node != null) {
try {
@@ -147,16 +147,16 @@ public class TajoWorkerResourceManager extends CompositeService implements Worke
}
@Override
- public Map<String, Worker> getWorkers() {
+ public Map<Integer, Worker> getWorkers() {
return ImmutableMap.copyOf(rmContext.getWorkers());
}
@Override
- public Map<String, Worker> getInactiveWorkers() {
+ public Map<Integer, Worker> getInactiveWorkers() {
return ImmutableMap.copyOf(rmContext.getInactiveWorkers());
}
- public Collection<String> getQueryMasters() {
+ public Collection<Integer> getQueryMasters() {
return Collections.unmodifiableSet(rmContext.getQueryMasterWorker());
}
@@ -303,8 +303,8 @@ public class TajoWorkerResourceManager extends CompositeService implements Worke
new ArrayList<WorkerAllocatedResource>();
for(AllocatedWorkerResource allocatedResource: allocatedWorkerResources) {
- NodeId nodeId = NodeId.newInstance(allocatedResource.worker.getHostName(),
- allocatedResource.worker.getPeerRpcPort());
+ NodeId nodeId = NodeId.newInstance(allocatedResource.worker.getConnectionInfo().getHost(),
+ allocatedResource.worker.getConnectionInfo().getPeerRpcPort());
TajoWorkerContainerId containerId = new TajoWorkerContainerId();
@@ -315,12 +315,7 @@ public class TajoWorkerResourceManager extends CompositeService implements Worke
ContainerIdProto containerIdProto = containerId.getProto();
allocatedResources.add(WorkerAllocatedResource.newBuilder()
.setContainerId(containerIdProto)
- .setNodeId(nodeId.toString())
- .setWorkerHost(allocatedResource.worker.getHostName())
- .setQueryMasterPort(allocatedResource.worker.getQueryMasterPort())
- .setClientPort(allocatedResource.worker.getClientPort())
- .setPeerRpcPort(allocatedResource.worker.getPeerRpcPort())
- .setWorkerPullServerPort(allocatedResource.worker.getPullServerPort())
+ .setConnectionInfo(allocatedResource.worker.getConnectionInfo().getProto())
.setAllocatedMemoryMB(allocatedResource.allocatedMemoryMB)
.setAllocatedDiskSlots(allocatedResource.allocatedDiskSlots)
.build());
@@ -339,7 +334,7 @@ public class TajoWorkerResourceManager extends CompositeService implements Worke
if(LOG.isDebugEnabled()) {
LOG.debug("=========================================");
LOG.debug("Available Workers");
- for(String liveWorker: rmContext.getWorkers().keySet()) {
+ for(int liveWorker: rmContext.getWorkers().keySet()) {
LOG.debug(rmContext.getWorkers().get(liveWorker).toString());
}
LOG.debug("=========================================");
@@ -367,7 +362,7 @@ public class TajoWorkerResourceManager extends CompositeService implements Worke
if(resourceRequestPriority == TajoMasterProtocol.ResourceRequestPriority.MEMORY) {
synchronized(rmContext) {
- List<String> randomWorkers = new ArrayList<String>(rmContext.getWorkers().keySet());
+ List<Integer> randomWorkers = new ArrayList<Integer>(rmContext.getWorkers().keySet());
Collections.shuffle(randomWorkers);
int numContainers = resourceRequest.request.getNumContainers();
@@ -377,7 +372,7 @@ public class TajoWorkerResourceManager extends CompositeService implements Worke
resourceRequest.request.getMinDiskSlotPerContainer());
int liveWorkerSize = randomWorkers.size();
- Set<String> insufficientWorkers = new HashSet<String>();
+ Set<Integer> insufficientWorkers = new HashSet<Integer>();
boolean stop = false;
boolean checkMax = true;
while(!stop) {
@@ -394,7 +389,7 @@ public class TajoWorkerResourceManager extends CompositeService implements Worke
}
int compareAvailableMemory = checkMax ? maxMemoryMB : minMemoryMB;
- for(String eachWorker: randomWorkers) {
+ for(int eachWorker: randomWorkers) {
if(allocatedResources >= numContainers) {
stop = true;
break;
@@ -436,7 +431,7 @@ public class TajoWorkerResourceManager extends CompositeService implements Worke
}
} else {
synchronized(rmContext) {
- List<String> randomWorkers = new ArrayList<String>(rmContext.getWorkers().keySet());
+ List<Integer> randomWorkers = new ArrayList<Integer>(rmContext.getWorkers().keySet());
Collections.shuffle(randomWorkers);
int numContainers = resourceRequest.request.getNumContainers();
@@ -446,7 +441,7 @@ public class TajoWorkerResourceManager extends CompositeService implements Worke
resourceRequest.request.getMinMemoryMBPerContainer());
int liveWorkerSize = randomWorkers.size();
- Set<String> insufficientWorkers = new HashSet<String>();
+ Set<Integer> insufficientWorkers = new HashSet<Integer>();
boolean stop = false;
boolean checkMax = true;
while(!stop) {
@@ -463,7 +458,7 @@ public class TajoWorkerResourceManager extends CompositeService implements Worke
}
float compareAvailableDisk = checkMax ? maxDiskSlots : minDiskSlots;
- for(String eachWorker: randomWorkers) {
+ for(int eachWorker: randomWorkers) {
if(allocatedResources >= numContainers) {
stop = true;
break;
http://git-wip-us.apache.org/repos/asf/tajo/blob/28282b56/tajo-core/src/main/java/org/apache/tajo/master/rm/Worker.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/rm/Worker.java b/tajo-core/src/main/java/org/apache/tajo/master/rm/Worker.java
index de6ee9e..edded4d 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/rm/Worker.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/rm/Worker.java
@@ -22,6 +22,7 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.state.*;
+import org.apache.tajo.master.cluster.WorkerConnectionInfo;
import java.util.EnumSet;
import java.util.concurrent.locks.ReentrantReadWriteLock;
@@ -39,24 +40,15 @@ public class Worker implements EventHandler<WorkerEvent>, Comparable<Worker> {
/** context of {@link org.apache.tajo.master.rm.TajoWorkerResourceManager} */
private final TajoRMContext rmContext;
- /** Hostname */
- private String hostName;
- /** QueryMaster rpc port */
- private int qmRpcPort;
- /** Peer rpc port */
- private int peerRpcPort;
- /** http info port */
- private int httpInfoPort;
- /** the port of QueryMaster client rpc which provides an client API */
- private int qmClientPort;
- /** pull server port */
- private int pullServerPort;
/** last heartbeat time */
private long lastHeartbeatTime;
/** Resource capability */
private WorkerResource resource;
+ /** Worker connection information */
+ private WorkerConnectionInfo connectionInfo;
+
private static final ReconnectNodeTransition RECONNECT_NODE_TRANSITION = new ReconnectNodeTransition();
private static final StatusUpdateTransition STATUS_UPDATE_TRANSITION = new StatusUpdateTransition();
@@ -99,9 +91,10 @@ public class Worker implements EventHandler<WorkerEvent>, Comparable<Worker> {
private final StateMachine<WorkerState, WorkerEventType, WorkerEvent> stateMachine =
stateMachineFactory.make(this, WorkerState.NEW);
- public Worker(TajoRMContext rmContext, WorkerResource resource) {
+ public Worker(TajoRMContext rmContext, WorkerResource resource, WorkerConnectionInfo connectionInfo) {
this.rmContext = rmContext;
+ this.connectionInfo = connectionInfo;
this.lastHeartbeatTime = System.currentTimeMillis();
this.resource = resource;
@@ -110,56 +103,12 @@ public class Worker implements EventHandler<WorkerEvent>, Comparable<Worker> {
this.writeLock = lock.writeLock();
}
- public String getWorkerId() {
- return hostName + ":" + qmRpcPort + ":" + peerRpcPort;
- }
-
- public String getHostName() {
- return hostName;
- }
-
- public void setHostName(String allocatedHost) {
- this.hostName = allocatedHost;
- }
-
- public int getPeerRpcPort() {
- return peerRpcPort;
- }
-
- public void setPeerRpcPort(int peerRpcPort) {
- this.peerRpcPort = peerRpcPort;
- }
-
- public int getQueryMasterPort() {
- return qmRpcPort;
- }
-
- public void setQueryMasterPort(int queryMasterPort) {
- this.qmRpcPort = queryMasterPort;
- }
-
- public int getClientPort() {
- return qmClientPort;
- }
-
- public void setClientPort(int clientPort) {
- this.qmClientPort = clientPort;
- }
-
- public int getPullServerPort() {
- return pullServerPort;
- }
-
- public void setPullServerPort(int pullServerPort) {
- this.pullServerPort = pullServerPort;
- }
-
- public int getHttpPort() {
- return httpInfoPort;
+ public int getWorkerId() {
+ return connectionInfo.getId();
}
- public void setHttpPort(int port) {
- this.httpInfoPort = port;
+ public WorkerConnectionInfo getConnectionInfo() {
+ return connectionInfo;
}
public void setLastHeartbeatTime(long lastheartbeatReportTime) {
@@ -209,7 +158,7 @@ public class Worker implements EventHandler<WorkerEvent>, Comparable<Worker> {
if(o == null) {
return 1;
}
- return getWorkerId().compareTo(o.getWorkerId());
+ return connectionInfo.compareTo(o.connectionInfo);
}
public static class AddNodeTransition implements SingleArcTransition<Worker, WorkerEvent> {
http://git-wip-us.apache.org/repos/asf/tajo/blob/28282b56/tajo-core/src/main/java/org/apache/tajo/master/rm/WorkerEvent.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/rm/WorkerEvent.java b/tajo-core/src/main/java/org/apache/tajo/master/rm/WorkerEvent.java
index 389c3be..c208990 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/rm/WorkerEvent.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/rm/WorkerEvent.java
@@ -24,14 +24,14 @@ import org.apache.hadoop.yarn.event.AbstractEvent;
* WorkerEvent describes all kinds of events which sent to {@link Worker}.
*/
public class WorkerEvent extends AbstractEvent<WorkerEventType> {
- private final String workerId;
+ private final int workerId;
- public WorkerEvent(String workerId, WorkerEventType workerEventType) {
+ public WorkerEvent(int workerId, WorkerEventType workerEventType) {
super(workerEventType);
this.workerId = workerId;
}
- public String getWorkerId() {
+ public int getWorkerId() {
return workerId;
}
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/28282b56/tajo-core/src/main/java/org/apache/tajo/master/rm/WorkerLivelinessMonitor.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/rm/WorkerLivelinessMonitor.java b/tajo-core/src/main/java/org/apache/tajo/master/rm/WorkerLivelinessMonitor.java
index e3524d6..2751886 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/rm/WorkerLivelinessMonitor.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/rm/WorkerLivelinessMonitor.java
@@ -30,7 +30,7 @@ import org.apache.tajo.conf.TajoConf;
* It periodically checks the latest heartbeat time of {@link Worker}.
* If the latest heartbeat time is expired, it produces EXPIRE event to a corresponding {@link Worker}.
*/
-public class WorkerLivelinessMonitor extends AbstractLivelinessMonitor<String> {
+public class WorkerLivelinessMonitor extends AbstractLivelinessMonitor<Integer> {
private EventHandler dispatcher;
@@ -50,7 +50,7 @@ public class WorkerLivelinessMonitor extends AbstractLivelinessMonitor<String> {
}
@Override
- protected void expire(String id) {
+ protected void expire(Integer id) {
dispatcher.handle(new WorkerEvent(id, WorkerEventType.EXPIRE));
}
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/28282b56/tajo-core/src/main/java/org/apache/tajo/master/rm/WorkerReconnectEvent.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/rm/WorkerReconnectEvent.java b/tajo-core/src/main/java/org/apache/tajo/master/rm/WorkerReconnectEvent.java
index 46f286d..3828b6a 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/rm/WorkerReconnectEvent.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/rm/WorkerReconnectEvent.java
@@ -24,7 +24,7 @@ package org.apache.tajo.master.rm;
*/
public class WorkerReconnectEvent extends WorkerEvent {
private final Worker worker;
- public WorkerReconnectEvent(String workerId, Worker worker) {
+ public WorkerReconnectEvent(int workerId, Worker worker) {
super(workerId, WorkerEventType.RECONNECTED);
this.worker = worker;
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/28282b56/tajo-core/src/main/java/org/apache/tajo/master/rm/WorkerResourceManager.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/rm/WorkerResourceManager.java b/tajo-core/src/main/java/org/apache/tajo/master/rm/WorkerResourceManager.java
index 54fe11c..8e8ac51 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/rm/WorkerResourceManager.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/rm/WorkerResourceManager.java
@@ -86,13 +86,13 @@ public interface WorkerResourceManager extends Service {
*
* @return a Map instance containing active workers
*/
- public Map<String, Worker> getWorkers();
+ public Map<Integer, Worker> getWorkers();
/**
*
* @return a Map instance containing inactive workers
*/
- public Map<String, Worker> getInactiveWorkers();
+ public Map<Integer, Worker> getInactiveWorkers();
public void stop();
@@ -106,5 +106,5 @@ public interface WorkerResourceManager extends Service {
*
* @return WorkerIds on which QueryMasters are running
*/
- Collection<String> getQueryMasters();
+ Collection<Integer> getQueryMasters();
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/28282b56/tajo-core/src/main/java/org/apache/tajo/master/rm/WorkerStatusEvent.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/rm/WorkerStatusEvent.java b/tajo-core/src/main/java/org/apache/tajo/master/rm/WorkerStatusEvent.java
index 8c3d7c1..f1ab401 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/rm/WorkerStatusEvent.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/rm/WorkerStatusEvent.java
@@ -28,7 +28,7 @@ public class WorkerStatusEvent extends WorkerEvent {
private final long freeHeap;
private final long totalHeap;
- public WorkerStatusEvent(String workerId, int runningTaskNum, long maxHeap, long freeHeap, long totalHeap) {
+ public WorkerStatusEvent(int workerId, int runningTaskNum, long maxHeap, long freeHeap, long totalHeap) {
super(workerId, WorkerEventType.STATE_UPDATE);
this.runningTaskNum = runningTaskNum;
this.maxHeap = maxHeap;
http://git-wip-us.apache.org/repos/asf/tajo/blob/28282b56/tajo-core/src/main/java/org/apache/tajo/worker/AbstractResourceAllocator.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/AbstractResourceAllocator.java b/tajo-core/src/main/java/org/apache/tajo/worker/AbstractResourceAllocator.java
index 55aa8c4..ca71c53 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/AbstractResourceAllocator.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/AbstractResourceAllocator.java
@@ -22,10 +22,25 @@ import com.google.common.collect.Maps;
import org.apache.hadoop.service.CompositeService;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.tajo.master.ContainerProxy;
+import org.apache.tajo.master.cluster.WorkerConnectionInfo;
import java.util.Map;
+import java.util.concurrent.ConcurrentMap;
public abstract class AbstractResourceAllocator extends CompositeService implements ResourceAllocator {
+ /**
+ * A key is worker id, and a value is a worker connection information.
+ */
+ protected ConcurrentMap<Integer, WorkerConnectionInfo> workerInfoMap = Maps.newConcurrentMap();
+
+ public WorkerConnectionInfo getWorkerConnectionInfo(int workerId) {
+ return workerInfoMap.get(workerId);
+ }
+
+ public void addWorkerConnectionInfo(WorkerConnectionInfo connectionInfo) {
+ workerInfoMap.putIfAbsent(connectionInfo.getId(), connectionInfo);
+ }
+
private Map<ContainerId, ContainerProxy> containers = Maps.newConcurrentMap();
public AbstractResourceAllocator() {
http://git-wip-us.apache.org/repos/asf/tajo/blob/28282b56/tajo-core/src/main/java/org/apache/tajo/worker/ExecutionBlockContext.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/ExecutionBlockContext.java b/tajo-core/src/main/java/org/apache/tajo/worker/ExecutionBlockContext.java
index d4b9861..1ec8a88 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/ExecutionBlockContext.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/ExecutionBlockContext.java
@@ -33,12 +33,14 @@ import org.apache.tajo.TajoProtos;
import org.apache.tajo.conf.TajoConf;
import org.apache.tajo.engine.query.QueryContext;
import org.apache.tajo.ipc.QueryMasterProtocol;
+import org.apache.tajo.master.cluster.WorkerConnectionInfo;
import org.apache.tajo.rpc.NettyClientBase;
import org.apache.tajo.rpc.NullCallback;
import org.apache.tajo.rpc.RpcChannelFactory;
import org.apache.tajo.rpc.RpcConnectionPool;
import org.apache.tajo.storage.HashShuffleAppenderManager;
import org.apache.tajo.storage.StorageUtil;
+import org.apache.tajo.util.NetUtils;
import org.apache.tajo.util.Pair;
import org.apache.tajo.worker.event.TaskRunnerStartEvent;
import org.jboss.netty.channel.socket.ClientSocketChannelFactory;
@@ -79,6 +81,7 @@ public class ExecutionBlockContext {
private TajoQueryEngine queryEngine;
private RpcConnectionPool connPool;
private InetSocketAddress qmMasterAddr;
+ private WorkerConnectionInfo queryMaster;
private TajoConf systemConf;
// for the doAs block
private UserGroupInformation taskOwner;
@@ -92,12 +95,12 @@ public class ExecutionBlockContext {
private final ConcurrentMap<String, TaskRunnerHistory> histories = Maps.newConcurrentMap();
- public ExecutionBlockContext(TaskRunnerManager manager, TaskRunnerStartEvent event, InetSocketAddress queryMaster)
+ public ExecutionBlockContext(TaskRunnerManager manager, TaskRunnerStartEvent event, WorkerConnectionInfo queryMaster)
throws Throwable {
this.manager = manager;
this.executionBlockId = event.getExecutionBlockId();
this.connPool = RpcConnectionPool.getPool(manager.getTajoConf());
- this.qmMasterAddr = queryMaster;
+ this.queryMaster = queryMaster;
this.systemConf = manager.getTajoConf();
this.reporter = new Reporter();
this.defaultFS = TajoConf.getTajoRootDir(systemConf).getFileSystem(systemConf);
@@ -118,6 +121,7 @@ public class ExecutionBlockContext {
LOG.info("Tajo Root Dir: " + systemConf.getVar(TajoConf.ConfVars.ROOT_DIR));
LOG.info("Worker Local Dir: " + systemConf.getVar(TajoConf.ConfVars.WORKER_TEMPORAL_DIR));
+ this.qmMasterAddr = NetUtils.createSocketAddr(queryMaster.getHost(), queryMaster.getQueryMasterPort());
LOG.info("QueryMaster Address:" + qmMasterAddr);
UserGroupInformation.setConfiguration(systemConf);
@@ -329,8 +333,8 @@ public class ExecutionBlockContext {
intermediateBuilder.clear();
intermediateBuilder.setEbId(ebId.getProto())
- .setHost(getWorkerContext().getTajoWorkerManagerService().getBindAddr().getHostName() + ":" +
- getWorkerContext().getPullServerPort())
+ .setHost(getWorkerContext().getConnectionInfo().getHost() + ":" +
+ getWorkerContext().getConnectionInfo().getPullServerPort())
.setTaskId(-1)
.setAttemptId(-1)
.setPartId(eachShuffle.getPartId())
http://git-wip-us.apache.org/repos/asf/tajo/blob/28282b56/tajo-core/src/main/java/org/apache/tajo/worker/TajoResourceAllocator.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/TajoResourceAllocator.java b/tajo-core/src/main/java/org/apache/tajo/worker/TajoResourceAllocator.java
index 2cc8f0c..2220089 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/TajoResourceAllocator.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/TajoResourceAllocator.java
@@ -34,6 +34,7 @@ import org.apache.tajo.conf.TajoConf;
import org.apache.tajo.ipc.TajoMasterProtocol;
import org.apache.tajo.ipc.TajoWorkerProtocol;
import org.apache.tajo.master.*;
+import org.apache.tajo.master.cluster.WorkerConnectionInfo;
import org.apache.tajo.master.event.ContainerAllocationEvent;
import org.apache.tajo.master.event.ContainerAllocatorEventType;
import org.apache.tajo.master.event.SubQueryContainerAllocationEvent;
@@ -129,6 +130,8 @@ public class TajoResourceAllocator extends AbstractResourceAllocator {
LOG.warn(e.getMessage());
}
}
+
+ workerInfoMap.clear();
super.stop();
}
@@ -325,8 +328,8 @@ public class TajoResourceAllocator extends AbstractResourceAllocator {
List<Container> containers = new ArrayList<Container>();
for(TajoMasterProtocol.WorkerAllocatedResource eachAllocatedResource: allocatedResources) {
TajoWorkerContainer container = new TajoWorkerContainer();
- NodeId nodeId = NodeId.newInstance(eachAllocatedResource.getWorkerHost(),
- eachAllocatedResource.getPeerRpcPort());
+ NodeId nodeId = NodeId.newInstance(eachAllocatedResource.getConnectionInfo().getHost(),
+ eachAllocatedResource.getConnectionInfo().getPeerRpcPort());
TajoWorkerContainerId containerId = new TajoWorkerContainerId();
@@ -343,14 +346,10 @@ public class TajoResourceAllocator extends AbstractResourceAllocator {
workerResource.setMemoryMB(eachAllocatedResource.getAllocatedMemoryMB());
workerResource.setDiskSlots(eachAllocatedResource.getAllocatedDiskSlots());
- Worker worker = new Worker(null, workerResource);
- worker.setHostName(nodeId.getHost());
- worker.setPeerRpcPort(nodeId.getPort());
- worker.setQueryMasterPort(eachAllocatedResource.getQueryMasterPort());
- worker.setPullServerPort(eachAllocatedResource.getWorkerPullServerPort());
-
+ Worker worker = new Worker(null, workerResource,
+ new WorkerConnectionInfo(eachAllocatedResource.getConnectionInfo()));
container.setWorkerResource(worker);
-
+ addWorkerConnectionInfo(worker.getConnectionInfo());
containers.add(container);
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/28282b56/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorker.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorker.java b/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorker.java
index a8d661b..280fc2b 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorker.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorker.java
@@ -32,8 +32,10 @@ import org.apache.hadoop.yarn.util.RackResolver;
import org.apache.tajo.TajoConstants;
import org.apache.tajo.catalog.CatalogClient;
import org.apache.tajo.catalog.CatalogService;
+import org.apache.tajo.common.exception.NotImplementedException;
import org.apache.tajo.conf.TajoConf;
import org.apache.tajo.ipc.TajoMasterProtocol;
+import org.apache.tajo.master.cluster.WorkerConnectionInfo;
import org.apache.tajo.master.ha.TajoMasterInfo;
import org.apache.tajo.master.querymaster.QueryMaster;
import org.apache.tajo.master.querymaster.QueryMasterManagerService;
@@ -94,12 +96,13 @@ public class TajoWorker extends CompositeService {
private TajoPullServerService pullService;
- private int pullServerPort;
-
+ @Deprecated
private boolean yarnContainerMode;
+ @Deprecated
private boolean queryMasterMode;
+ @Deprecated
private boolean taskRunnerMode;
private WorkerHeartbeatService workerHeartbeatThread;
@@ -110,7 +113,7 @@ public class TajoWorker extends CompositeService {
private TajoMasterProtocol.ClusterResourceSummary clusterResource;
- private int httpPort;
+ private WorkerConnectionInfo connectionInfo;
private ThreadMXBean threadBean = ManagementFactory.getThreadMXBean();
@@ -199,71 +202,52 @@ public class TajoWorker extends CompositeService {
this.dispatcher = new AsyncDispatcher();
addIfService(dispatcher);
+ tajoWorkerManagerService = new TajoWorkerManagerService(workerContext, peerRpcPort);
+ addIfService(tajoWorkerManagerService);
+
// querymaster worker
tajoWorkerClientService = new TajoWorkerClientService(workerContext, clientPort);
- addService(tajoWorkerClientService);
+ addIfService(tajoWorkerClientService);
queryMasterManagerService = new QueryMasterManagerService(workerContext, qmManagerPort);
- addService(queryMasterManagerService);
+ addIfService(queryMasterManagerService);
// taskrunner worker
taskRunnerManager = new TaskRunnerManager(workerContext, dispatcher);
addService(taskRunnerManager);
- tajoWorkerManagerService = new TajoWorkerManagerService(workerContext, peerRpcPort);
- addService(tajoWorkerManagerService);
-
- if(!yarnContainerMode) {
- if(taskRunnerMode && !TajoPullServerService.isStandalone()) {
- pullService = new TajoPullServerService();
- addService(pullService);
- }
+ workerHeartbeatThread = new WorkerHeartbeatService(workerContext);
+ addIfService(workerHeartbeatThread);
- if (!systemConf.get(CommonTestingUtil.TAJO_TEST_KEY, "FALSE").equalsIgnoreCase("TRUE")) {
- try {
- httpPort = systemConf.getSocketAddrVar(ConfVars.WORKER_INFO_ADDRESS).getPort();
- if(queryMasterMode && !taskRunnerMode) {
- //If QueryMaster and TaskRunner run on single host, http port conflicts
- httpPort = systemConf.getSocketAddrVar(ConfVars.WORKER_QM_INFO_ADDRESS).getPort();
- }
- webServer = StaticHttpServer.getInstance(this ,"worker", null, httpPort ,
- true, null, systemConf, null);
- webServer.start();
- httpPort = webServer.getPort();
- LOG.info("Worker info server started:" + httpPort);
-
- deletionService = new DeletionService(getMountPath().size(), 0);
- if(systemConf.getBoolVar(ConfVars.WORKER_TEMPORAL_DIR_CLEANUP)){
- getWorkerContext().cleanupTemporalDirectories();
- }
- } catch (IOException e) {
- LOG.error(e.getMessage(), e);
- }
- }
+ int httpPort = 0;
+ if(taskRunnerMode && !TajoPullServerService.isStandalone()) {
+ pullService = new TajoPullServerService();
+ addIfService(pullService);
}
- LOG.info("Tajo Worker started: queryMaster=" + queryMasterMode + " taskRunner=" + taskRunnerMode +
- ", qmRpcPort=" + qmManagerPort +
- ",yarnContainer=" + yarnContainerMode + ", clientPort=" + clientPort +
- ", peerRpcPort=" + peerRpcPort + ":" + qmManagerPort + ",httpPort" + httpPort);
+ if (!systemConf.get(CommonTestingUtil.TAJO_TEST_KEY, "FALSE").equalsIgnoreCase("TRUE")) {
+ httpPort = initWebServer();
+ }
super.serviceInit(conf);
- tajoMasterInfo = new TajoMasterInfo();
- if (systemConf.getBoolVar(TajoConf.ConfVars.TAJO_MASTER_HA_ENABLE)) {
- tajoMasterInfo.setTajoMasterAddress(HAServiceUtil.getMasterUmbilicalAddress(systemConf));
- tajoMasterInfo.setWorkerResourceTrackerAddr(HAServiceUtil.getResourceTrackerAddress(systemConf));
+ int pullServerPort;
+ if(pullService != null){
+ pullServerPort = pullService.getPort();
} else {
- tajoMasterInfo.setTajoMasterAddress(NetUtils.createSocketAddr(systemConf.getVar(ConfVars
- .TAJO_MASTER_UMBILICAL_RPC_ADDRESS)));
- tajoMasterInfo.setWorkerResourceTrackerAddr(NetUtils.createSocketAddr(systemConf.getVar(ConfVars
- .RESOURCE_TRACKER_RPC_ADDRESS)));
+ pullServerPort = getStandAlonePullServerPort();
}
- connectToCatalog();
- workerHeartbeatThread = new WorkerHeartbeatService(workerContext);
- workerHeartbeatThread.init(conf);
- addIfService(workerHeartbeatThread);
+ this.connectionInfo = new WorkerConnectionInfo(
+ tajoWorkerManagerService.getBindAddr().getHostName(),
+ tajoWorkerManagerService.getBindAddr().getPort(),
+ pullServerPort,
+ tajoWorkerClientService.getBindAddr().getPort(),
+ queryMasterManagerService.getBindAddr().getPort(),
+ httpPort);
+
+ LOG.info("Tajo Worker is initialized. \r\nQueryMaster=" + queryMasterMode + " TaskRunner=" + taskRunnerMode
+ + " connection :" + connectionInfo.toString());
try {
hashShuffleAppenderManager = new HashShuffleAppenderManager(systemConf);
@@ -300,14 +284,57 @@ public class TajoWorker extends CompositeService {
});
}
+ private int initWebServer() {
+ int httpPort = systemConf.getSocketAddrVar(ConfVars.WORKER_INFO_ADDRESS).getPort();
+ try {
+ if (queryMasterMode && !taskRunnerMode) {
+ //If QueryMaster and TaskRunner run on single host, http port conflicts
+ httpPort = systemConf.getSocketAddrVar(ConfVars.WORKER_QM_INFO_ADDRESS).getPort();
+ }
+ webServer = StaticHttpServer.getInstance(this, "worker", null, httpPort,
+ true, null, systemConf, null);
+ webServer.start();
+ httpPort = webServer.getPort();
+ LOG.info("Worker info server started:" + httpPort);
+ } catch (IOException e) {
+ LOG.error(e.getMessage(), e);
+ }
+ return httpPort;
+ }
+
+ private void initCleanupService() throws IOException {
+ deletionService = new DeletionService(getMountPath().size(), 0);
+ if (systemConf.getBoolVar(ConfVars.WORKER_TEMPORAL_DIR_CLEANUP)) {
+ getWorkerContext().cleanupTemporalDirectories();
+ }
+ }
+
public WorkerContext getWorkerContext() {
return workerContext;
}
@Override
public void serviceStart() throws Exception {
+
+ tajoMasterInfo = new TajoMasterInfo();
+ if (systemConf.getBoolVar(TajoConf.ConfVars.TAJO_MASTER_HA_ENABLE)) {
+ tajoMasterInfo.setTajoMasterAddress(HAServiceUtil.getMasterUmbilicalAddress(systemConf));
+ tajoMasterInfo.setWorkerResourceTrackerAddr(HAServiceUtil.getResourceTrackerAddress(systemConf));
+ } else {
+ tajoMasterInfo.setTajoMasterAddress(NetUtils.createSocketAddr(systemConf.getVar(ConfVars
+ .TAJO_MASTER_UMBILICAL_RPC_ADDRESS)));
+ tajoMasterInfo.setWorkerResourceTrackerAddr(NetUtils.createSocketAddr(systemConf.getVar(ConfVars
+ .RESOURCE_TRACKER_RPC_ADDRESS)));
+ }
+ connectToCatalog();
+
+ if (!systemConf.get(CommonTestingUtil.TAJO_TEST_KEY, "FALSE").equalsIgnoreCase("TRUE")) {
+ initCleanupService();
+ }
+
initWorkerMetrics();
super.serviceStart();
+ LOG.info("Tajo Worker is started");
}
@Override
@@ -319,7 +346,7 @@ public class TajoWorker extends CompositeService {
if(webServer != null) {
try {
webServer.stop();
- } catch (Exception e) {
+ } catch (Throwable e) {
LOG.error(e.getMessage(), e);
}
}
@@ -336,7 +363,7 @@ public class TajoWorker extends CompositeService {
if(webServer != null && webServer.isAlive()) {
try {
webServer.stop();
- } catch (Exception e) {
+ } catch (Throwable e) {
}
}
@@ -381,15 +408,19 @@ public class TajoWorker extends CompositeService {
return catalogClient;
}
- public int getHttpPort() {
- return httpPort;
+ public TajoPullServerService getPullService() {
+ return pullService;
+ }
+
+ public WorkerConnectionInfo getConnectionInfo() {
+ return connectionInfo;
}
public String getWorkerName() {
if (queryMasterMode) {
return getQueryMasterManagerService().getHostAndPort();
} else {
- return getTajoWorkerManagerService().getHostAndPort();
+ return connectionInfo.getHostAndPeerRpcPort();
}
}
@@ -444,6 +475,7 @@ public class TajoWorker extends CompositeService {
}
}
+ @Deprecated
public boolean isYarnContainerMode() {
return yarnContainerMode;
}
@@ -503,45 +535,20 @@ public class TajoWorker extends CompositeService {
public HashShuffleAppenderManager getHashShuffleAppenderManager() {
return hashShuffleAppenderManager;
}
-
- public int getPullServerPort() {
- if (pullService != null) {
- long startTime = System.currentTimeMillis();
- while (true) {
- int pullServerPort = pullService.getPort();
- if (pullServerPort > 0) {
- return pullServerPort;
- }
- try {
- Thread.sleep(1000);
- } catch (InterruptedException e) {
- }
- if (System.currentTimeMillis() - startTime > 30 * 1000) {
- LOG.fatal("TajoWorker stopped cause can't get PullServer port.");
- System.exit(-1);
- }
- }
- } else {
- if (pullServerPort != 0) {
- return pullServerPort;
- } else {
- loadPullServerPort();
- return pullServerPort;
- }
- }
- }
}
- private void loadPullServerPort() {
- // get pull server port
+ private int getStandAlonePullServerPort() {
long startTime = System.currentTimeMillis();
+ int pullServerPort;
+
+ //wait for pull server bring up
while (true) {
pullServerPort = TajoPullServerService.readPullServerPort();
if (pullServerPort > 0) {
break;
}
try {
- Thread.sleep(1000);
+ Thread.sleep(500);
} catch (InterruptedException e) {
}
if (System.currentTimeMillis() - startTime > 30 * 1000) {
@@ -549,6 +556,7 @@ public class TajoWorker extends CompositeService {
System.exit(-1);
}
}
+ return pullServerPort;
}
public void stopWorkerForce() {
http://git-wip-us.apache.org/repos/asf/tajo/blob/28282b56/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorkerClientService.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorkerClientService.java b/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorkerClientService.java
index d25013c..fb4f861 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorkerClientService.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorkerClientService.java
@@ -54,7 +54,7 @@ public class TajoWorkerClientService extends AbstractService {
private BlockingRpcServer rpcServer;
private InetSocketAddress bindAddr;
- private String addr;
+
private int port;
private TajoConf conf;
private TajoWorker.WorkerContext workerContext;
@@ -88,14 +88,12 @@ public class TajoWorkerClientService extends AbstractService {
this.rpcServer.start();
this.bindAddr = NetUtils.getConnectAddress(rpcServer.getListenAddress());
- this.addr = bindAddr.getHostName() + ":" + bindAddr.getPort();
-
this.port = bindAddr.getPort();
} catch (Exception e) {
LOG.error(e.getMessage(), e);
}
// Get the master address
- LOG.info(TajoWorkerClientService.class.getSimpleName() + " is bind to " + addr);
+ LOG.info(TajoWorkerClientService.class.getSimpleName() + " is bind to " + bindAddr);
super.init(conf);
}