You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tajo.apache.org by hy...@apache.org on 2014/09/21 20:20:49 UTC
[03/10] TAJO-1016: Refactor worker rpc information. (jinho)
http://git-wip-us.apache.org/repos/asf/tajo/blob/28282b56/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorkerManagerService.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorkerManagerService.java b/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorkerManagerService.java
index 472ce1b..48f4f66 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorkerManagerService.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorkerManagerService.java
@@ -32,6 +32,7 @@ import org.apache.tajo.TajoIdProtos;
import org.apache.tajo.conf.TajoConf;
import org.apache.tajo.engine.query.QueryContext;
import org.apache.tajo.ipc.TajoWorkerProtocol;
+import org.apache.tajo.master.cluster.WorkerConnectionInfo;
import org.apache.tajo.rpc.AsyncRpcServer;
import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos;
import org.apache.tajo.util.NetUtils;
@@ -46,7 +47,6 @@ public class TajoWorkerManagerService extends CompositeService
private AsyncRpcServer rpcServer;
private InetSocketAddress bindAddr;
- private String addr;
private int port;
private TajoWorker.WorkerContext workerContext;
@@ -74,14 +74,12 @@ public class TajoWorkerManagerService extends CompositeService
this.rpcServer.start();
this.bindAddr = NetUtils.getConnectAddress(rpcServer.getListenAddress());
- this.addr = bindAddr.getHostName() + ":" + bindAddr.getPort();
-
this.port = bindAddr.getPort();
} catch (Exception e) {
LOG.error(e.getMessage(), e);
}
// Get the master address
- LOG.info("TajoWorkerManagerService is bind to " + addr);
+ LOG.info("TajoWorkerManagerService is bind to " + bindAddr);
tajoConf.setVar(TajoConf.ConfVars.WORKER_PEER_RPC_ADDRESS, NetUtils.normalizeInetSocketAddress(bindAddr));
super.init(tajoConf);
}
@@ -104,10 +102,6 @@ public class TajoWorkerManagerService extends CompositeService
return bindAddr;
}
- public String getHostAndPort() {
- return bindAddr.getHostName() + ":" + bindAddr.getPort();
- }
-
@Override
public void ping(RpcController controller,
TajoIdProtos.QueryUnitAttemptIdProto attemptId,
@@ -122,24 +116,11 @@ public class TajoWorkerManagerService extends CompositeService
workerContext.getWorkerSystemMetrics().counter("query", "executedExecutionBlocksNum").inc();
try {
-
- String[] params = new String[7];
- params[0] = "standby"; //mode(never used)
- params[1] = request.getExecutionBlockId().toString();
- // NodeId has a form of hostname:port.
- params[2] = request.getNodeId();
- params[3] = request.getContainerId();
-
- // QueryMaster's address
- params[4] = request.getQueryMasterHost();
- params[5] = String.valueOf(request.getQueryMasterPort());
- params[6] = request.getQueryOutputPath();
-
- ExecutionBlockId executionBlockId = new ExecutionBlockId(request.getExecutionBlockId());
workerContext.getTaskRunnerManager().getEventHandler().handle(new TaskRunnerStartEvent(
- params
- , executionBlockId,
- new QueryContext(workerContext.getConf(), request.getQueryContext()),
+ new WorkerConnectionInfo(request.getQueryMaster())
+ , new ExecutionBlockId(request.getExecutionBlockId())
+ , request.getContainerId()
+ , new QueryContext(workerContext.getConf(), request.getQueryContext()),
request.getPlanJson()
));
done.run(TajoWorker.TRUE_PROTO);
http://git-wip-us.apache.org/repos/asf/tajo/blob/28282b56/tajo-core/src/main/java/org/apache/tajo/worker/Task.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/Task.java b/tajo-core/src/main/java/org/apache/tajo/worker/Task.java
index c9c83d1..66e0f87 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/Task.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/Task.java
@@ -307,7 +307,7 @@ public class Task {
public TaskStatusProto getReport() {
TaskStatusProto.Builder builder = TaskStatusProto.newBuilder();
- builder.setWorkerName(executionBlockContext.getTaskRunner(taskRunnerId).getNodeId().toString());
+ builder.setWorkerName(executionBlockContext.getWorkerContext().getConnectionInfo().getHostAndPeerRpcPort());
builder.setId(context.getTaskId().getProto())
.setProgress(context.getProgress())
.setState(context.getState());
@@ -323,6 +323,7 @@ public class Task {
public boolean isRunning(){
return context.getState() == TaskAttemptState.TA_RUNNING;
}
+
public boolean isProgressChanged() {
return context.isProgressChanged();
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/28282b56/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunner.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunner.java b/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunner.java
index ea8ed82..e4771a6 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunner.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunner.java
@@ -25,7 +25,6 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.yarn.api.records.ContainerId;
-import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.impl.pb.ContainerIdPBImpl;
import org.apache.hadoop.yarn.util.ConverterUtils;
import org.apache.tajo.ExecutionBlockId;
@@ -53,7 +52,6 @@ public class TaskRunner extends AbstractService {
private volatile boolean stopped = false;
private Path baseDirPath;
- private NodeId nodeId;
private ContainerId containerId;
// for Fetcher
@@ -69,7 +67,7 @@ public class TaskRunner extends AbstractService {
private TaskRunnerHistory history;
- public TaskRunner(ExecutionBlockContext executionBlockContext, String[] args) {
+ public TaskRunner(ExecutionBlockContext executionBlockContext, String containerId) {
super(TaskRunner.class.getName());
ThreadFactoryBuilder builder = new ThreadFactoryBuilder();
@@ -78,16 +76,7 @@ public class TaskRunner extends AbstractService {
this.fetchLauncher = Executors.newFixedThreadPool(
systemConf.getIntVar(ConfVars.SHUFFLE_FETCHER_PARALLEL_EXECUTION_MAX_NUM), fetcherFactory);
try {
- // QueryBlockId from String
- // NodeId has a form of hostname:port.
- this.nodeId = ConverterUtils.toNodeId(args[2]);
- this.containerId = ConverterUtils.toContainerId(args[3]);
-
-
- // QueryMaster's address
- //String host = args[4];
- //int port = Integer.parseInt(args[5]);
-
+ this.containerId = ConverterUtils.toContainerId(containerId);
this.executionBlockContext = executionBlockContext;
this.history = executionBlockContext.createTaskRunnerHistory(this);
this.history.setState(getServiceState());
@@ -101,10 +90,6 @@ public class TaskRunner extends AbstractService {
return getId(getContext().getExecutionBlockId(), containerId);
}
- public NodeId getNodeId(){
- return nodeId;
- }
-
public ContainerId getContainerId(){
return containerId;
}
@@ -212,6 +197,7 @@ public class TaskRunner extends AbstractService {
GetTaskRequestProto request = GetTaskRequestProto.newBuilder()
.setExecutionBlockId(getExecutionBlockId().getProto())
.setContainerId(((ContainerIdPBImpl) containerId).getProto())
+ .setWorkerId(getContext().getWorkerContext().getConnectionInfo().getId())
.build();
qmClientService.getTask(null, request, callFuture);
http://git-wip-us.apache.org/repos/asf/tajo/blob/28282b56/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunnerManager.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunnerManager.java b/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunnerManager.java
index c3713d1..faadf58 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunnerManager.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunnerManager.java
@@ -37,7 +37,6 @@ import org.jboss.netty.util.HashedWheelTimer;
import org.jboss.netty.util.Timer;
import java.io.IOException;
-import java.net.InetSocketAddress;
import java.util.*;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -167,14 +166,10 @@ public class TaskRunnerManager extends CompositeService implements EventHandler<
if (event instanceof TaskRunnerStartEvent) {
TaskRunnerStartEvent startEvent = (TaskRunnerStartEvent) event;
ExecutionBlockContext context = executionBlockContextMap.get(event.getExecutionBlockId());
- String[] params = startEvent.getParams();
+
if(context == null){
try {
- // QueryMaster's address
- String host = params[4];
- int port = Integer.parseInt(params[5]);
-
- context = new ExecutionBlockContext(this, startEvent, new InetSocketAddress(host, port));
+ context = new ExecutionBlockContext(this, startEvent, startEvent.getQueryMaster());
} catch (Throwable e) {
LOG.fatal(e.getMessage(), e);
throw new RuntimeException(e);
@@ -182,7 +177,7 @@ public class TaskRunnerManager extends CompositeService implements EventHandler<
executionBlockContextMap.put(event.getExecutionBlockId(), context);
}
- TaskRunner taskRunner = new TaskRunner(context, params);
+ TaskRunner taskRunner = new TaskRunner(context, startEvent.getContainerId());
LOG.info("Start TaskRunner:" + taskRunner.getId());
taskRunnerMap.put(taskRunner.getId(), taskRunner);
taskRunnerHistoryMap.put(taskRunner.getId(), taskRunner.getHistory());
http://git-wip-us.apache.org/repos/asf/tajo/blob/28282b56/tajo-core/src/main/java/org/apache/tajo/worker/WorkerHeartbeatService.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/WorkerHeartbeatService.java b/tajo-core/src/main/java/org/apache/tajo/worker/WorkerHeartbeatService.java
index 47f2261..6a90f74 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/WorkerHeartbeatService.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/WorkerHeartbeatService.java
@@ -19,6 +19,7 @@
package org.apache.tajo.worker;
import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
import com.google.protobuf.ServiceException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -37,7 +38,6 @@ import org.apache.tajo.storage.v2.DiskUtil;
import org.apache.tajo.util.HAServiceUtil;
import java.io.File;
-import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
@@ -68,30 +68,38 @@ public class WorkerHeartbeatService extends AbstractService {
}
@Override
- public void serviceInit(Configuration conf) {
+ public void serviceInit(Configuration conf) throws Exception {
Preconditions.checkArgument(conf instanceof TajoConf, "Configuration must be a TajoConf instance.");
this.systemConf = (TajoConf) conf;
connectionPool = RpcConnectionPool.getPool(systemConf);
+ super.serviceInit(conf);
+ }
+
+ @Override
+ public void serviceStart() throws Exception {
thread = new WorkerHeartbeatThread();
thread.start();
- super.init(conf);
+ super.serviceStart();
}
@Override
- public void serviceStop() {
- thread.stopped.set(true);
+ public void serviceStop() throws Exception {
+ if(thread.stopped.getAndSet(true)){
+ return;
+ }
+
synchronized (thread) {
thread.notifyAll();
}
- super.stop();
+
+ super.serviceStop();
}
class WorkerHeartbeatThread extends Thread {
private volatile AtomicBoolean stopped = new AtomicBoolean(false);
TajoMasterProtocol.ServerStatusProto.System systemInfo;
- List<TajoMasterProtocol.ServerStatusProto.Disk> diskInfos =
- new ArrayList<TajoMasterProtocol.ServerStatusProto.Disk>();
+ List<TajoMasterProtocol.ServerStatusProto.Disk> diskInfos = Lists.newArrayList();
float workerDiskSlots;
int workerMemoryMB;
List<DiskDeviceInfo> diskDeviceInfos;
@@ -140,26 +148,6 @@ public class WorkerHeartbeatService extends AbstractService {
public void run() {
LOG.info("Worker Resource Heartbeat Thread start.");
int sendDiskInfoCount = 0;
- int pullServerPort = 0;
-
- String hostName = null;
- int peerRpcPort = 0;
- int queryMasterPort = 0;
- int clientPort = 0;
-
- if(context.getTajoWorkerManagerService() != null) {
- hostName = context.getTajoWorkerManagerService().getBindAddr().getHostName();
- peerRpcPort = context.getTajoWorkerManagerService().getBindAddr().getPort();
- }
- if(context.getQueryMasterManagerService() != null) {
- hostName = context.getQueryMasterManagerService().getBindAddr().getHostName();
- queryMasterPort = context.getQueryMasterManagerService().getBindAddr().getPort();
- }
- if(context.getTajoWorkerClientService() != null) {
- clientPort = context.getTajoWorkerClientService().getBindAddr().getPort();
- }
-
- pullServerPort = context.getPullServerPort();
while(!stopped.get()) {
if(sendDiskInfoCount == 0 && diskDeviceInfos != null) {
@@ -185,12 +173,7 @@ public class WorkerHeartbeatService extends AbstractService {
.build();
NodeHeartbeat heartbeatProto = NodeHeartbeat.newBuilder()
- .setTajoWorkerHost(hostName)
- .setTajoQueryMasterPort(queryMasterPort)
- .setPeerRpcPort(peerRpcPort)
- .setTajoWorkerClientPort(clientPort)
- .setTajoWorkerHttpPort(context.getHttpPort())
- .setTajoWorkerPullServerPort(pullServerPort)
+ .setConnectionInfo(context.getConnectionInfo().getProto())
.setServerStatus(serverStatus)
.build();
@@ -241,8 +224,10 @@ public class WorkerHeartbeatService extends AbstractService {
}
try {
- synchronized (WorkerHeartbeatThread.this){
- wait(10 * 1000);
+ if(!stopped.get()){
+ synchronized (thread){
+ thread.wait(10 * 1000);
+ }
}
} catch (InterruptedException e) {
break;
http://git-wip-us.apache.org/repos/asf/tajo/blob/28282b56/tajo-core/src/main/java/org/apache/tajo/worker/event/TaskRunnerStartEvent.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/event/TaskRunnerStartEvent.java b/tajo-core/src/main/java/org/apache/tajo/worker/event/TaskRunnerStartEvent.java
index 8c9fa51..ff63754 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/event/TaskRunnerStartEvent.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/event/TaskRunnerStartEvent.java
@@ -20,25 +20,33 @@ package org.apache.tajo.worker.event;
import org.apache.tajo.ExecutionBlockId;
import org.apache.tajo.engine.query.QueryContext;
+import org.apache.tajo.master.cluster.WorkerConnectionInfo;
public class TaskRunnerStartEvent extends TaskRunnerEvent {
private final QueryContext queryContext;
- private final String[] params;
+ private final WorkerConnectionInfo queryMaster;
+ private final String containerId;
private final String plan;
- public TaskRunnerStartEvent(String[] params,
+ public TaskRunnerStartEvent(WorkerConnectionInfo queryMaster,
ExecutionBlockId executionBlockId,
+ String containerId,
QueryContext context,
String plan) {
super(EventType.START, executionBlockId);
- this.params = params;
+ this.queryMaster = queryMaster;
+ this.containerId = containerId;
this.queryContext = context;
this.plan = plan;
}
- public String[] getParams(){
- return this.params;
+ public WorkerConnectionInfo getQueryMaster() {
+ return queryMaster;
+ }
+
+ public String getContainerId() {
+ return containerId;
}
public QueryContext getQueryContext() {
http://git-wip-us.apache.org/repos/asf/tajo/blob/28282b56/tajo-core/src/main/proto/ResourceTrackerProtocol.proto
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/proto/ResourceTrackerProtocol.proto b/tajo-core/src/main/proto/ResourceTrackerProtocol.proto
index d46d09a..b117cac 100644
--- a/tajo-core/src/main/proto/ResourceTrackerProtocol.proto
+++ b/tajo-core/src/main/proto/ResourceTrackerProtocol.proto
@@ -23,16 +23,12 @@ option java_generic_services = true;
option java_generate_equals_and_hash = true;
import "TajoMasterProtocol.proto";
+import "tajo_protos.proto";
message NodeHeartbeat {
- required string tajoWorkerHost = 1;
- required int32 peerRpcPort = 2;
- required int32 tajoQueryMasterPort = 3;
- optional ServerStatusProto serverStatus = 4;
- optional int32 tajoWorkerClientPort = 5;
- optional string statusMessage = 6;
- optional int32 tajoWorkerPullServerPort = 7;
- optional int32 tajoWorkerHttpPort = 8;
+ required WorkerConnectionInfoProto connectionInfo = 1;
+ optional ServerStatusProto serverStatus = 2;
+ optional string statusMessage = 3;
}
service TajoResourceTrackerProtocolService {
http://git-wip-us.apache.org/repos/asf/tajo/blob/28282b56/tajo-core/src/main/proto/TajoMasterProtocol.proto
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/proto/TajoMasterProtocol.proto b/tajo-core/src/main/proto/TajoMasterProtocol.proto
index 8fccbaf..7283543 100644
--- a/tajo-core/src/main/proto/TajoMasterProtocol.proto
+++ b/tajo-core/src/main/proto/TajoMasterProtocol.proto
@@ -60,14 +60,12 @@ message ServerStatusProto {
}
message TajoHeartbeat {
- required string tajoWorkerHost = 1;
- required int32 tajoQueryMasterPort = 2;
- optional int32 tajoWorkerClientPort = 3;
- optional QueryIdProto queryId = 4;
- optional QueryState state = 5;
- optional string statusMessage = 6;
- optional float queryProgress = 7;
- optional int64 queryFinishTime = 8;
+ required WorkerConnectionInfoProto connectionInfo = 1;
+ optional QueryIdProto queryId = 2;
+ optional QueryState state = 3;
+ optional string statusMessage = 4;
+ optional float queryProgress = 5;
+ optional int64 queryFinishTime = 6;
}
message TajoHeartbeatResponse {
@@ -110,12 +108,9 @@ message WorkerResourceAllocationRequest {
}
message WorkerResourceProto {
- required string host = 1;
- required int32 peerRpcPort = 2;
- required int32 queryMasterPort = 3;
- required int32 infoPort = 4;
- required int32 memoryMB = 5 ;
- required float diskSlots = 6;
+ required WorkerConnectionInfoProto connectionInfo = 1;
+ required int32 memoryMB = 2 ;
+ required float diskSlots = 3;
}
message WorkerResourcesRequest {
@@ -129,15 +124,10 @@ message WorkerResourceReleaseRequest {
message WorkerAllocatedResource {
required hadoop.yarn.ContainerIdProto containerId = 1;
- required string nodeId = 2;
- required string workerHost = 3;
- required int32 peerRpcPort = 4;
- required int32 queryMasterPort = 5;
- required int32 clientPort = 6;
- required int32 workerPullServerPort = 7;
-
- required int32 allocatedMemoryMB = 8;
- required float allocatedDiskSlots = 9;
+ required WorkerConnectionInfoProto connectionInfo = 2;
+
+ required int32 allocatedMemoryMB = 3;
+ required float allocatedDiskSlots = 4;
}
message WorkerResourceAllocationResponse {
http://git-wip-us.apache.org/repos/asf/tajo/blob/28282b56/tajo-core/src/main/proto/TajoWorkerProtocol.proto
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/proto/TajoWorkerProtocol.proto b/tajo-core/src/main/proto/TajoWorkerProtocol.proto
index dff2733..bde2459 100644
--- a/tajo-core/src/main/proto/TajoWorkerProtocol.proto
+++ b/tajo-core/src/main/proto/TajoWorkerProtocol.proto
@@ -168,8 +168,9 @@ message QueryExecutionRequestProto {
}
message GetTaskRequestProto {
- required hadoop.yarn.ContainerIdProto containerId = 1;
- required ExecutionBlockIdProto executionBlockId = 2;
+ required int32 workerId = 1;
+ required hadoop.yarn.ContainerIdProto containerId = 2;
+ required ExecutionBlockIdProto executionBlockId = 3;
}
enum ShuffleType {
@@ -202,14 +203,13 @@ message DataChannelProto {
message RunExecutionBlockRequestProto {
required ExecutionBlockIdProto executionBlockId = 1;
- required string queryMasterHost = 2;
- required int32 queryMasterPort = 3;
- required string nodeId = 4;
- required string containerId = 5;
- optional string queryOutputPath = 6;
-
- required KeyValueSetProto queryContext = 7;
- required string planJson = 8;
+ required WorkerConnectionInfoProto queryMaster = 2;
+ required string nodeId = 3;
+ required string containerId = 4;
+ optional string queryOutputPath = 5;
+
+ required KeyValueSetProto queryContext = 6;
+ required string planJson = 7;
}
message ExecutionBlockListProto {
http://git-wip-us.apache.org/repos/asf/tajo/blob/28282b56/tajo-core/src/main/resources/webapps/admin/cluster.jsp
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/resources/webapps/admin/cluster.jsp b/tajo-core/src/main/resources/webapps/admin/cluster.jsp
index 0317759..6fe21a2 100644
--- a/tajo-core/src/main/resources/webapps/admin/cluster.jsp
+++ b/tajo-core/src/main/resources/webapps/admin/cluster.jsp
@@ -20,20 +20,21 @@
<%@ page language="java" contentType="text/html; charset=UTF-8" pageEncoding="UTF-8"%>
<%@ page import="org.apache.tajo.master.TajoMaster" %>
+<%@ page import="org.apache.tajo.master.cluster.WorkerConnectionInfo" %>
<%@ page import="org.apache.tajo.master.ha.HAService" %>
<%@ page import="org.apache.tajo.master.ha.TajoMasterInfo" %>
<%@ page import="org.apache.tajo.master.rm.Worker" %>
<%@ page import="org.apache.tajo.master.rm.WorkerResource" %>
<%@ page import="org.apache.tajo.master.rm.WorkerState" %>
<%@ page import="org.apache.tajo.util.JSPUtil" %>
+<%@ page import="org.apache.tajo.util.TUtil" %>
<%@ page import="org.apache.tajo.webapp.StaticHttpServer" %>
<%@ page import="java.util.*" %>
-<%@ page import="org.apache.tajo.util.TUtil" %>
<%
TajoMaster master = (TajoMaster) StaticHttpServer.getInstance().getAttribute("tajo.info.server.object");
- Map<String, Worker> workers = master.getContext().getResourceManager().getWorkers();
- List<String> wokerKeys = new ArrayList<String>(workers.keySet());
+ Map<Integer, Worker> workers = master.getContext().getResourceManager().getWorkers();
+ List<Integer> wokerKeys = new ArrayList<Integer>(workers.keySet());
Collections.sort(wokerKeys);
int runningQueryMasterTasks = 0;
@@ -175,17 +176,19 @@
<%
int no = 1;
for(Worker queryMaster: liveQueryMasters) {
- WorkerResource resource = queryMaster.getResource();
- String queryMasterHttp = "http://" + queryMaster.getHostName() + ":" + queryMaster.getHttpPort() + "/index.jsp";
+ WorkerResource resource = queryMaster.getResource();
+ WorkerConnectionInfo connectionInfo = queryMaster.getConnectionInfo();
+ String queryMasterHttp = "http://" + connectionInfo.getHost()
+ + ":" + connectionInfo.getHttpInfoPort() + "/index.jsp";
%>
<tr>
- <td width='30' align='right'><%=no++%></td>
- <td><a href='<%=queryMasterHttp%>'><%=queryMaster.getHostName() + ":" + queryMaster.getQueryMasterPort()%></a></td>
- <td width='100' align='center'><%=queryMaster.getClientPort()%></td>
- <td width='200' align='right'><%=resource.getNumQueryMasterTasks()%></td>
- <td width='200' align='center'><%=resource.getFreeHeap()/1024/1024%>/<%=resource.getTotalHeap()/1024/1024%>/<%=resource.getMaxHeap()/1024/1024%> MB</td>
- <td width='100' align='right'><%=JSPUtil.getElapsedTime(queryMaster.getLastHeartbeatTime(), System.currentTimeMillis())%></td>
- <td width='100' align='center'><%=queryMaster.getState()%></td>
+ <td width='30' align='right'><%=no++%></td>
+ <td><a href='<%=queryMasterHttp%>'><%=connectionInfo.getHost() + ":" + connectionInfo.getQueryMasterPort()%></a></td>
+ <td width='100' align='center'><%=connectionInfo.getClientPort()%></td>
+ <td width='200' align='right'><%=resource.getNumQueryMasterTasks()%></td>
+ <td width='200' align='center'><%=resource.getFreeHeap()/1024/1024%>/<%=resource.getTotalHeap()/1024/1024%>/<%=resource.getMaxHeap()/1024/1024%> MB</td>
+ <td width='100' align='right'><%=JSPUtil.getElapsedTime(queryMaster.getLastHeartbeatTime(), System.currentTimeMillis())%></td>
+ <td width='100' align='center'><%=queryMaster.getState()%></td>
</tr>
<%
} //end fo for
@@ -210,7 +213,7 @@
%>
<tr>
<td width='30' align='right'><%=no++%></td>
- <td><%=queryMaster.getHostName() + ":" + queryMaster.getQueryMasterPort()%></td>
+ <td><%=queryMaster.getConnectionInfo().getHost() + ":" + queryMaster.getConnectionInfo().getQueryMasterPort()%></td>
</tr>
<%
} //end fo for
@@ -236,19 +239,20 @@
<%
int no = 1;
for(Worker worker: liveWorkers) {
- WorkerResource resource = worker.getResource();
- String workerHttp = "http://" + worker.getHostName() + ":" + worker.getHttpPort() + "/index.jsp";
+ WorkerResource resource = worker.getResource();
+ WorkerConnectionInfo connectionInfo = worker.getConnectionInfo();
+ String workerHttp = "http://" + connectionInfo.getHost() + ":" + connectionInfo.getHttpInfoPort() + "/index.jsp";
%>
<tr>
- <td width='30' align='right'><%=no++%></td>
- <td><a href='<%=workerHttp%>'><%=worker.getHostName() + ":" + worker.getPeerRpcPort()%></a></td>
- <td width='80' align='center'><%=worker.getPullServerPort()%></td>
- <td width='100' align='right'><%=resource.getNumRunningTasks()%></td>
- <td width='150' align='center'><%=resource.getUsedMemoryMB()%>/<%=resource.getMemoryMB()%></td>
- <td width='100' align='center'><%=resource.getUsedDiskSlots()%>/<%=resource.getDiskSlots()%></td>
- <td width='200' align='center'><%=resource.getFreeHeap()/1024/1024%>/<%=resource.getTotalHeap()/1024/1024%>/<%=resource.getMaxHeap()/1024/1024%> MB</td>
- <td width='100' align='right'><%=JSPUtil.getElapsedTime(worker.getLastHeartbeatTime(), System.currentTimeMillis())%></td>
- <td width='100' align='center'><%=worker.getState()%></td>
+ <td width='30' align='right'><%=no++%></td>
+ <td><a href='<%=workerHttp%>'><%=connectionInfo.getHostAndPeerRpcPort()%></a></td>
+ <td width='80' align='center'><%=connectionInfo.getPullServerPort()%></td>
+ <td width='100' align='right'><%=resource.getNumRunningTasks()%></td>
+ <td width='150' align='center'><%=resource.getUsedMemoryMB()%>/<%=resource.getMemoryMB()%></td>
+ <td width='100' align='center'><%=resource.getUsedDiskSlots()%>/<%=resource.getDiskSlots()%></td>
+ <td width='200' align='center'><%=resource.getFreeHeap()/1024/1024%>/<%=resource.getTotalHeap()/1024/1024%>/<%=resource.getMaxHeap()/1024/1024%> MB</td>
+ <td width='100' align='right'><%=JSPUtil.getElapsedTime(worker.getLastHeartbeatTime(), System.currentTimeMillis())%></td>
+ <td width='100' align='center'><%=worker.getState()%></td>
</tr>
<%
} //end fo for
@@ -279,7 +283,7 @@
%>
<tr>
<td width='30' align='right'><%=no++%></td>
- <td><%=worker.getHostName() + ":" + worker.getPeerRpcPort()%></td>
+ <td><%=worker.getConnectionInfo().getHostAndPeerRpcPort()%></td>
</tr>
<%
} //end fo for
http://git-wip-us.apache.org/repos/asf/tajo/blob/28282b56/tajo-core/src/main/resources/webapps/admin/index.jsp
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/resources/webapps/admin/index.jsp b/tajo-core/src/main/resources/webapps/admin/index.jsp
index 7ab1482..ce4d7dc 100644
--- a/tajo-core/src/main/resources/webapps/admin/index.jsp
+++ b/tajo-core/src/main/resources/webapps/admin/index.jsp
@@ -38,8 +38,8 @@
<%
TajoMaster master = (TajoMaster) StaticHttpServer.getInstance().getAttribute("tajo.info.server.object");
- Map<String, Worker> workers = master.getContext().getResourceManager().getWorkers();
- Map<String, Worker> inactiveWorkers = master.getContext().getResourceManager().getInactiveWorkers();
+ Map<Integer, Worker> workers = master.getContext().getResourceManager().getWorkers();
+ Map<Integer, Worker> inactiveWorkers = master.getContext().getResourceManager().getInactiveWorkers();
int numWorkers = 0;
int numLiveWorkers = 0;
http://git-wip-us.apache.org/repos/asf/tajo/blob/28282b56/tajo-core/src/main/resources/webapps/admin/query.jsp
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/resources/webapps/admin/query.jsp b/tajo-core/src/main/resources/webapps/admin/query.jsp
index e7b402f..9ddc90c 100644
--- a/tajo-core/src/main/resources/webapps/admin/query.jsp
+++ b/tajo-core/src/main/resources/webapps/admin/query.jsp
@@ -43,17 +43,17 @@
SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
- Map<String, Worker> workers = master.getContext().getResourceManager().getWorkers();
+ Map<Integer, Worker> workers = master.getContext().getResourceManager().getWorkers();
Map<String, Integer> portMap = new HashMap<String, Integer>();
- Collection<String> queryMasters = master.getContext().getResourceManager().getQueryMasters();
+ Collection<Integer> queryMasters = master.getContext().getResourceManager().getQueryMasters();
if (queryMasters == null || queryMasters.isEmpty()) {
queryMasters = master.getContext().getResourceManager().getWorkers().keySet();
}
- for(String eachQueryMasterKey: queryMasters) {
+ for(int eachQueryMasterKey: queryMasters) {
Worker queryMaster = workers.get(eachQueryMasterKey);
if(queryMaster != null) {
- portMap.put(queryMaster.getHostName(), queryMaster.getHttpPort());
+ portMap.put(queryMaster.getConnectionInfo().getHost(), queryMaster.getConnectionInfo().getHttpInfoPort());
}
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/28282b56/tajo-core/src/main/resources/webapps/worker/querytasks.jsp
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/resources/webapps/worker/querytasks.jsp b/tajo-core/src/main/resources/webapps/worker/querytasks.jsp
index 1a325da..6e74b99 100644
--- a/tajo-core/src/main/resources/webapps/worker/querytasks.jsp
+++ b/tajo-core/src/main/resources/webapps/worker/querytasks.jsp
@@ -67,10 +67,10 @@
List<TajoMasterProtocol.WorkerResourceProto> allWorkers = tajoWorker.getWorkerContext()
.getQueryMasterManagerService().getQueryMaster().getAllWorker();
- Map<String, TajoMasterProtocol.WorkerResourceProto> workerMap = new HashMap<String, TajoMasterProtocol.WorkerResourceProto>();
+ Map<Integer, TajoMasterProtocol.WorkerResourceProto> workerMap = new HashMap<Integer, TajoMasterProtocol.WorkerResourceProto>();
if(allWorkers != null) {
for(TajoMasterProtocol.WorkerResourceProto eachWorker: allWorkers) {
- workerMap.put(eachWorker.getHost(), eachWorker);
+ workerMap.put(eachWorker.getConnectionInfo().getId(), eachWorker);
}
}
QueryMasterTask queryMasterTask = tajoWorker.getWorkerContext()
@@ -201,12 +201,13 @@
String queryUnitHost = eachQueryUnit.getSucceededHost() == null ? "-" : eachQueryUnit.getSucceededHost();
if(eachQueryUnit.getSucceededHost() != null) {
- TajoMasterProtocol.WorkerResourceProto worker = workerMap.get(eachQueryUnit.getSucceededHost());
+ TajoMasterProtocol.WorkerResourceProto worker =
+ workerMap.get(eachQueryUnit.getLastAttempt().getWorkerConnectionInfo().getId());
if(worker != null) {
QueryUnitAttempt lastAttempt = eachQueryUnit.getLastAttempt();
if(lastAttempt != null) {
QueryUnitAttemptId lastAttemptId = lastAttempt.getId();
- queryUnitHost = "<a href='http://" + eachQueryUnit.getSucceededHost() + ":" + worker.getInfoPort() + "/taskdetail.jsp?queryUnitAttemptId=" + lastAttemptId + "'>" + eachQueryUnit.getSucceededHost() + "</a>";
+ queryUnitHost = "<a href='http://" + eachQueryUnit.getSucceededHost() + ":" + worker.getConnectionInfo().getHttpInfoPort() + "/taskdetail.jsp?queryUnitAttemptId=" + lastAttemptId + "'>" + eachQueryUnit.getSucceededHost() + "</a>";
}
}
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/28282b56/tajo-core/src/main/resources/webapps/worker/taskdetail.jsp
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/resources/webapps/worker/taskdetail.jsp b/tajo-core/src/main/resources/webapps/worker/taskdetail.jsp
index e20ab03..d84664f 100644
--- a/tajo-core/src/main/resources/webapps/worker/taskdetail.jsp
+++ b/tajo-core/src/main/resources/webapps/worker/taskdetail.jsp
@@ -22,7 +22,9 @@
<%@ page import="org.apache.commons.lang.StringUtils" %>
<%@ page import="org.apache.tajo.QueryUnitAttemptId" %>
<%@ page import="org.apache.tajo.ipc.TajoWorkerProtocol" %>
+<%@ page import="org.apache.tajo.util.JSPUtil" %>
<%@ page import="org.apache.tajo.util.TajoIdUtils" %>
+<%@ page import="org.apache.tajo.webapp.StaticHttpServer" %>
<%@ page import="org.apache.tajo.worker.*" %>
<%@ page import="java.text.SimpleDateFormat" %>
<%@ page import="java.util.List" %>
http://git-wip-us.apache.org/repos/asf/tajo/blob/28282b56/tajo-core/src/main/resources/webapps/worker/tasks.jsp
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/resources/webapps/worker/tasks.jsp b/tajo-core/src/main/resources/webapps/worker/tasks.jsp
index b5fb9d7..ae05047 100644
--- a/tajo-core/src/main/resources/webapps/worker/tasks.jsp
+++ b/tajo-core/src/main/resources/webapps/worker/tasks.jsp
@@ -65,9 +65,9 @@
<tr><th>Id</th><th>StartTime</th><th>FinishTime</th><th>RunTime</th><th>Status</th></tr>
<%
if (taskRunner != null) {
- TaskRunner.TaskRunnerContext taskRunnerContext = taskRunner.getContext();
+ ExecutionBlockContext context = taskRunner.getContext();
- for (Map.Entry<QueryUnitAttemptId, Task> entry : taskRunnerContext.getTasks().entrySet()) {
+ for (Map.Entry<QueryUnitAttemptId, Task> entry : context.getTasks().entrySet()) {
QueryUnitAttemptId queryUnitId = entry.getKey();
TaskHistory eachTask = entry.getValue().createTaskHistory();
%>
http://git-wip-us.apache.org/repos/asf/tajo/blob/28282b56/tajo-core/src/test/java/org/apache/tajo/cluster/TestWorkerConnectionInfo.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/cluster/TestWorkerConnectionInfo.java b/tajo-core/src/test/java/org/apache/tajo/cluster/TestWorkerConnectionInfo.java
new file mode 100644
index 0000000..03be125
--- /dev/null
+++ b/tajo-core/src/test/java/org/apache/tajo/cluster/TestWorkerConnectionInfo.java
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.cluster;
+
+import org.apache.tajo.master.cluster.WorkerConnectionInfo;
+import org.junit.Test;
+
+import static org.junit.Assert.*;
+
+public class TestWorkerConnectionInfo {
+
+ @Test
+ public void testWorkerId() {
+ WorkerConnectionInfo worker = new WorkerConnectionInfo("host", 28091, 28092, 21000, 28093, 28080);
+ WorkerConnectionInfo worker2 = new WorkerConnectionInfo("host2", 28091, 28092, 21000, 28093, 28080);
+
+ assertNotEquals(worker.getId(), worker2.getId());
+ assertEquals(worker.getId(), new WorkerConnectionInfo("host", 28091, 28092, 21000, 28093, 28080).getId());
+ }
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/28282b56/tajo-core/src/test/java/org/apache/tajo/master/rm/TestTajoResourceManager.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/master/rm/TestTajoResourceManager.java b/tajo-core/src/test/java/org/apache/tajo/master/rm/TestTajoResourceManager.java
index 09d674a..0423894 100644
--- a/tajo-core/src/test/java/org/apache/tajo/master/rm/TestTajoResourceManager.java
+++ b/tajo-core/src/test/java/org/apache/tajo/master/rm/TestTajoResourceManager.java
@@ -24,6 +24,7 @@ import org.apache.tajo.QueryId;
import org.apache.tajo.QueryIdFactory;
import org.apache.tajo.conf.TajoConf;
import org.apache.tajo.ipc.TajoMasterProtocol.*;
+import org.apache.tajo.master.cluster.WorkerConnectionInfo;
import org.apache.tajo.rpc.NullCallback;
import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos;
import org.junit.Test;
@@ -94,11 +95,10 @@ public class TestTajoResourceManager {
.setRunningTaskNum(0)
.build();
+ WorkerConnectionInfo connectionInfo =
+ new WorkerConnectionInfo("host" + (i + 1), 28091, 28092, 21000 + i, 28093, 28080);
NodeHeartbeat tajoHeartbeat = NodeHeartbeat.newBuilder()
- .setTajoWorkerHost("host" + (i + 1))
- .setTajoQueryMasterPort(21000)
- .setTajoWorkerHttpPort(28080 + i)
- .setPeerRpcPort(12345)
+ .setConnectionInfo(connectionInfo.getProto())
.setServerStatus(serverStatus)
.build();
http://git-wip-us.apache.org/repos/asf/tajo/blob/28282b56/tajo-yarn-pullserver/src/main/java/org/apache/tajo/pullserver/TajoPullServerService.java
----------------------------------------------------------------------
diff --git a/tajo-yarn-pullserver/src/main/java/org/apache/tajo/pullserver/TajoPullServerService.java b/tajo-yarn-pullserver/src/main/java/org/apache/tajo/pullserver/TajoPullServerService.java
index 720f0ca..3fa67ae 100644
--- a/tajo-yarn-pullserver/src/main/java/org/apache/tajo/pullserver/TajoPullServerService.java
+++ b/tajo-yarn-pullserver/src/main/java/org/apache/tajo/pullserver/TajoPullServerService.java
@@ -215,11 +215,10 @@ public class TajoPullServerService extends AbstractService {
selector = RpcChannelFactory.createServerChannelFactory("PullServerAuxService", workerNum);
localFS = new LocalFileSystem();
- super.init(conf);
- this.getConfig().setInt(TajoConf.ConfVars.PULLSERVER_PORT.varname
+ conf.setInt(TajoConf.ConfVars.PULLSERVER_PORT.varname
, TajoConf.ConfVars.PULLSERVER_PORT.defaultIntVal);
-
+ super.init(conf);
LOG.info("Tajo PullServer initialized: readaheadLength=" + readaheadLength);
} catch (Throwable t) {
LOG.error(t);
@@ -228,8 +227,7 @@ public class TajoPullServerService extends AbstractService {
// TODO change AbstractService to throw InterruptedException
@Override
- public synchronized void start() {
- Configuration conf = getConfig();
+ public synchronized void serviceInit(Configuration conf) throws Exception {
ServerBootstrap bootstrap = new ServerBootstrap(selector);
try {
@@ -248,11 +246,11 @@ public class TajoPullServerService extends AbstractService {
conf.set(ConfVars.PULLSERVER_PORT.varname, Integer.toString(port));
pipelineFact.PullServer.setPort(port);
LOG.info(getName() + " listening on port " + port);
- super.start();
sslFileBufferSize = conf.getInt(SUFFLE_SSL_FILE_BUFFER_SIZE_KEY,
DEFAULT_SUFFLE_SSL_FILE_BUFFER_SIZE);
+
if (STANDALONE) {
File pullServerPortFile = getPullServerPortFile();
if (pullServerPortFile.exists()) {
@@ -272,6 +270,7 @@ public class TajoPullServerService extends AbstractService {
IOUtils.closeStream(out);
}
}
+ super.serviceInit(conf);
LOG.info("TajoPullServerService started: port=" + port);
}
@@ -487,9 +486,7 @@ public class TajoPullServerService extends AbstractService {
}
ProcessingStatus processingStatus = new ProcessingStatus(request.getUri().toString());
- synchronized(processingStatusMap) {
- processingStatusMap.put(request.getUri().toString(), processingStatus);
- }
+ processingStatusMap.put(request.getUri().toString(), processingStatus);
// Parsing the URL into key-values
final Map<String, List<String>> params =
new QueryStringDecoder(request.getUri()).getParameters();