You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tajo.apache.org by hy...@apache.org on 2014/09/21 20:20:49 UTC

[03/10] TAJO-1016: Refactor worker rpc information. (jinho)

http://git-wip-us.apache.org/repos/asf/tajo/blob/28282b56/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorkerManagerService.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorkerManagerService.java b/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorkerManagerService.java
index 472ce1b..48f4f66 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorkerManagerService.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorkerManagerService.java
@@ -32,6 +32,7 @@ import org.apache.tajo.TajoIdProtos;
 import org.apache.tajo.conf.TajoConf;
 import org.apache.tajo.engine.query.QueryContext;
 import org.apache.tajo.ipc.TajoWorkerProtocol;
+import org.apache.tajo.master.cluster.WorkerConnectionInfo;
 import org.apache.tajo.rpc.AsyncRpcServer;
 import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos;
 import org.apache.tajo.util.NetUtils;
@@ -46,7 +47,6 @@ public class TajoWorkerManagerService extends CompositeService
 
   private AsyncRpcServer rpcServer;
   private InetSocketAddress bindAddr;
-  private String addr;
   private int port;
 
   private TajoWorker.WorkerContext workerContext;
@@ -74,14 +74,12 @@ public class TajoWorkerManagerService extends CompositeService
       this.rpcServer.start();
 
       this.bindAddr = NetUtils.getConnectAddress(rpcServer.getListenAddress());
-      this.addr = bindAddr.getHostName() + ":" + bindAddr.getPort();
-
       this.port = bindAddr.getPort();
     } catch (Exception e) {
       LOG.error(e.getMessage(), e);
     }
     // Get the master address
-    LOG.info("TajoWorkerManagerService is bind to " + addr);
+    LOG.info("TajoWorkerManagerService is bind to " + bindAddr);
     tajoConf.setVar(TajoConf.ConfVars.WORKER_PEER_RPC_ADDRESS, NetUtils.normalizeInetSocketAddress(bindAddr));
     super.init(tajoConf);
   }
@@ -104,10 +102,6 @@ public class TajoWorkerManagerService extends CompositeService
     return bindAddr;
   }
 
-  public String getHostAndPort() {
-    return bindAddr.getHostName() + ":" + bindAddr.getPort();
-  }
-
   @Override
   public void ping(RpcController controller,
                    TajoIdProtos.QueryUnitAttemptIdProto attemptId,
@@ -122,24 +116,11 @@ public class TajoWorkerManagerService extends CompositeService
     workerContext.getWorkerSystemMetrics().counter("query", "executedExecutionBlocksNum").inc();
 
     try {
-
-      String[] params = new String[7];
-      params[0] = "standby";  //mode(never used)
-      params[1] = request.getExecutionBlockId().toString();
-      // NodeId has a form of hostname:port.
-      params[2] = request.getNodeId();
-      params[3] = request.getContainerId();
-
-      // QueryMaster's address
-      params[4] = request.getQueryMasterHost();
-      params[5] = String.valueOf(request.getQueryMasterPort());
-      params[6] = request.getQueryOutputPath();
-
-      ExecutionBlockId executionBlockId = new ExecutionBlockId(request.getExecutionBlockId());
       workerContext.getTaskRunnerManager().getEventHandler().handle(new TaskRunnerStartEvent(
-          params
-          , executionBlockId,
-          new QueryContext(workerContext.getConf(), request.getQueryContext()),
+          new WorkerConnectionInfo(request.getQueryMaster())
+          , new ExecutionBlockId(request.getExecutionBlockId())
+          , request.getContainerId()
+          , new QueryContext(workerContext.getConf(), request.getQueryContext()),
           request.getPlanJson()
       ));
       done.run(TajoWorker.TRUE_PROTO);

http://git-wip-us.apache.org/repos/asf/tajo/blob/28282b56/tajo-core/src/main/java/org/apache/tajo/worker/Task.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/Task.java b/tajo-core/src/main/java/org/apache/tajo/worker/Task.java
index c9c83d1..66e0f87 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/Task.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/Task.java
@@ -307,7 +307,7 @@ public class Task {
 
   public TaskStatusProto getReport() {
     TaskStatusProto.Builder builder = TaskStatusProto.newBuilder();
-    builder.setWorkerName(executionBlockContext.getTaskRunner(taskRunnerId).getNodeId().toString());
+    builder.setWorkerName(executionBlockContext.getWorkerContext().getConnectionInfo().getHostAndPeerRpcPort());
     builder.setId(context.getTaskId().getProto())
         .setProgress(context.getProgress())
         .setState(context.getState());
@@ -323,6 +323,7 @@ public class Task {
   public boolean isRunning(){
     return context.getState() == TaskAttemptState.TA_RUNNING;
   }
+
   public boolean isProgressChanged() {
     return context.isProgressChanged();
   }

http://git-wip-us.apache.org/repos/asf/tajo/blob/28282b56/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunner.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunner.java b/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunner.java
index ea8ed82..e4771a6 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunner.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunner.java
@@ -25,7 +25,6 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.service.AbstractService;
 import org.apache.hadoop.yarn.api.records.ContainerId;
-import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.impl.pb.ContainerIdPBImpl;
 import org.apache.hadoop.yarn.util.ConverterUtils;
 import org.apache.tajo.ExecutionBlockId;
@@ -53,7 +52,6 @@ public class TaskRunner extends AbstractService {
   private volatile boolean stopped = false;
   private Path baseDirPath;
 
-  private NodeId nodeId;
   private ContainerId containerId;
 
   // for Fetcher
@@ -69,7 +67,7 @@ public class TaskRunner extends AbstractService {
 
   private TaskRunnerHistory history;
 
-  public TaskRunner(ExecutionBlockContext executionBlockContext, String[] args) {
+  public TaskRunner(ExecutionBlockContext executionBlockContext, String containerId) {
     super(TaskRunner.class.getName());
 
     ThreadFactoryBuilder builder = new ThreadFactoryBuilder();
@@ -78,16 +76,7 @@ public class TaskRunner extends AbstractService {
     this.fetchLauncher = Executors.newFixedThreadPool(
         systemConf.getIntVar(ConfVars.SHUFFLE_FETCHER_PARALLEL_EXECUTION_MAX_NUM), fetcherFactory);
     try {
-      // QueryBlockId from String
-      // NodeId has a form of hostname:port.
-      this.nodeId = ConverterUtils.toNodeId(args[2]);
-      this.containerId = ConverterUtils.toContainerId(args[3]);
-
-
-      // QueryMaster's address
-      //String host = args[4];
-      //int port = Integer.parseInt(args[5]);
-
+      this.containerId = ConverterUtils.toContainerId(containerId);
       this.executionBlockContext = executionBlockContext;
       this.history = executionBlockContext.createTaskRunnerHistory(this);
       this.history.setState(getServiceState());
@@ -101,10 +90,6 @@ public class TaskRunner extends AbstractService {
     return getId(getContext().getExecutionBlockId(), containerId);
   }
 
-  public NodeId getNodeId(){
-    return nodeId;
-  }
-
   public ContainerId getContainerId(){
     return containerId;
   }
@@ -212,6 +197,7 @@ public class TaskRunner extends AbstractService {
                 GetTaskRequestProto request = GetTaskRequestProto.newBuilder()
                     .setExecutionBlockId(getExecutionBlockId().getProto())
                     .setContainerId(((ContainerIdPBImpl) containerId).getProto())
+                    .setWorkerId(getContext().getWorkerContext().getConnectionInfo().getId())
                     .build();
 
                 qmClientService.getTask(null, request, callFuture);

http://git-wip-us.apache.org/repos/asf/tajo/blob/28282b56/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunnerManager.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunnerManager.java b/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunnerManager.java
index c3713d1..faadf58 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunnerManager.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunnerManager.java
@@ -37,7 +37,6 @@ import org.jboss.netty.util.HashedWheelTimer;
 import org.jboss.netty.util.Timer;
 
 import java.io.IOException;
-import java.net.InetSocketAddress;
 import java.util.*;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -167,14 +166,10 @@ public class TaskRunnerManager extends CompositeService implements EventHandler<
     if (event instanceof TaskRunnerStartEvent) {
       TaskRunnerStartEvent startEvent = (TaskRunnerStartEvent) event;
       ExecutionBlockContext context = executionBlockContextMap.get(event.getExecutionBlockId());
-      String[] params = startEvent.getParams();
+
       if(context == null){
         try {
-          // QueryMaster's address
-          String host = params[4];
-          int port = Integer.parseInt(params[5]);
-
-          context = new ExecutionBlockContext(this, startEvent, new InetSocketAddress(host, port));
+          context = new ExecutionBlockContext(this, startEvent, startEvent.getQueryMaster());
         } catch (Throwable e) {
           LOG.fatal(e.getMessage(), e);
           throw new RuntimeException(e);
@@ -182,7 +177,7 @@ public class TaskRunnerManager extends CompositeService implements EventHandler<
         executionBlockContextMap.put(event.getExecutionBlockId(), context);
       }
 
-      TaskRunner taskRunner = new TaskRunner(context, params);
+      TaskRunner taskRunner = new TaskRunner(context, startEvent.getContainerId());
       LOG.info("Start TaskRunner:" + taskRunner.getId());
       taskRunnerMap.put(taskRunner.getId(), taskRunner);
       taskRunnerHistoryMap.put(taskRunner.getId(), taskRunner.getHistory());

http://git-wip-us.apache.org/repos/asf/tajo/blob/28282b56/tajo-core/src/main/java/org/apache/tajo/worker/WorkerHeartbeatService.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/WorkerHeartbeatService.java b/tajo-core/src/main/java/org/apache/tajo/worker/WorkerHeartbeatService.java
index 47f2261..6a90f74 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/WorkerHeartbeatService.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/WorkerHeartbeatService.java
@@ -19,6 +19,7 @@
 package org.apache.tajo.worker;
 
 import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
 import com.google.protobuf.ServiceException;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -37,7 +38,6 @@ import org.apache.tajo.storage.v2.DiskUtil;
 import org.apache.tajo.util.HAServiceUtil;
 
 import java.io.File;
-import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
@@ -68,30 +68,38 @@ public class WorkerHeartbeatService extends AbstractService {
   }
 
   @Override
-  public void serviceInit(Configuration conf) {
+  public void serviceInit(Configuration conf) throws Exception {
     Preconditions.checkArgument(conf instanceof TajoConf, "Configuration must be a TajoConf instance.");
     this.systemConf = (TajoConf) conf;
 
     connectionPool = RpcConnectionPool.getPool(systemConf);
+    super.serviceInit(conf);
+  }
+
+  @Override
+  public void serviceStart() throws Exception {
     thread = new WorkerHeartbeatThread();
     thread.start();
-    super.init(conf);
+    super.serviceStart();
   }
 
   @Override
-  public void serviceStop() {
-    thread.stopped.set(true);
+  public void serviceStop() throws Exception {
+    if(thread.stopped.getAndSet(true)){
+      return;
+    }
+
     synchronized (thread) {
       thread.notifyAll();
     }
-    super.stop();
+
+    super.serviceStop();
   }
 
   class WorkerHeartbeatThread extends Thread {
     private volatile AtomicBoolean stopped = new AtomicBoolean(false);
     TajoMasterProtocol.ServerStatusProto.System systemInfo;
-    List<TajoMasterProtocol.ServerStatusProto.Disk> diskInfos =
-        new ArrayList<TajoMasterProtocol.ServerStatusProto.Disk>();
+    List<TajoMasterProtocol.ServerStatusProto.Disk> diskInfos = Lists.newArrayList();
     float workerDiskSlots;
     int workerMemoryMB;
     List<DiskDeviceInfo> diskDeviceInfos;
@@ -140,26 +148,6 @@ public class WorkerHeartbeatService extends AbstractService {
     public void run() {
       LOG.info("Worker Resource Heartbeat Thread start.");
       int sendDiskInfoCount = 0;
-      int pullServerPort = 0;
-
-      String hostName = null;
-      int peerRpcPort = 0;
-      int queryMasterPort = 0;
-      int clientPort = 0;
-
-      if(context.getTajoWorkerManagerService() != null) {
-        hostName = context.getTajoWorkerManagerService().getBindAddr().getHostName();
-        peerRpcPort = context.getTajoWorkerManagerService().getBindAddr().getPort();
-      }
-      if(context.getQueryMasterManagerService() != null) {
-        hostName = context.getQueryMasterManagerService().getBindAddr().getHostName();
-        queryMasterPort = context.getQueryMasterManagerService().getBindAddr().getPort();
-      }
-      if(context.getTajoWorkerClientService() != null) {
-        clientPort = context.getTajoWorkerClientService().getBindAddr().getPort();
-      }
-
-      pullServerPort = context.getPullServerPort();
 
       while(!stopped.get()) {
         if(sendDiskInfoCount == 0 && diskDeviceInfos != null) {
@@ -185,12 +173,7 @@ public class WorkerHeartbeatService extends AbstractService {
             .build();
 
         NodeHeartbeat heartbeatProto = NodeHeartbeat.newBuilder()
-            .setTajoWorkerHost(hostName)
-            .setTajoQueryMasterPort(queryMasterPort)
-            .setPeerRpcPort(peerRpcPort)
-            .setTajoWorkerClientPort(clientPort)
-            .setTajoWorkerHttpPort(context.getHttpPort())
-            .setTajoWorkerPullServerPort(pullServerPort)
+            .setConnectionInfo(context.getConnectionInfo().getProto())
             .setServerStatus(serverStatus)
             .build();
 
@@ -241,8 +224,10 @@ public class WorkerHeartbeatService extends AbstractService {
         }
 
         try {
-          synchronized (WorkerHeartbeatThread.this){
-            wait(10 * 1000);
+          if(!stopped.get()){
+            synchronized (thread){
+              thread.wait(10 * 1000);
+            }
           }
         } catch (InterruptedException e) {
           break;

http://git-wip-us.apache.org/repos/asf/tajo/blob/28282b56/tajo-core/src/main/java/org/apache/tajo/worker/event/TaskRunnerStartEvent.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/event/TaskRunnerStartEvent.java b/tajo-core/src/main/java/org/apache/tajo/worker/event/TaskRunnerStartEvent.java
index 8c9fa51..ff63754 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/event/TaskRunnerStartEvent.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/event/TaskRunnerStartEvent.java
@@ -20,25 +20,33 @@ package org.apache.tajo.worker.event;
 
 import org.apache.tajo.ExecutionBlockId;
 import org.apache.tajo.engine.query.QueryContext;
+import org.apache.tajo.master.cluster.WorkerConnectionInfo;
 
 public class TaskRunnerStartEvent extends TaskRunnerEvent {
 
   private final QueryContext queryContext;
-  private final String[] params;
+  private final WorkerConnectionInfo queryMaster;
+  private final String containerId;
   private final String plan;
 
-  public TaskRunnerStartEvent(String[] params,
+  public TaskRunnerStartEvent(WorkerConnectionInfo queryMaster,
                               ExecutionBlockId executionBlockId,
+                              String containerId,
                               QueryContext context,
                               String plan) {
     super(EventType.START, executionBlockId);
-    this.params = params;
+    this.queryMaster = queryMaster;
+    this.containerId = containerId;
     this.queryContext = context;
     this.plan = plan;
   }
 
-  public String[] getParams(){
-    return this.params;
+  public WorkerConnectionInfo getQueryMaster() {
+    return queryMaster;
+  }
+
+  public String getContainerId() {
+    return containerId;
   }
 
   public QueryContext getQueryContext() {

http://git-wip-us.apache.org/repos/asf/tajo/blob/28282b56/tajo-core/src/main/proto/ResourceTrackerProtocol.proto
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/proto/ResourceTrackerProtocol.proto b/tajo-core/src/main/proto/ResourceTrackerProtocol.proto
index d46d09a..b117cac 100644
--- a/tajo-core/src/main/proto/ResourceTrackerProtocol.proto
+++ b/tajo-core/src/main/proto/ResourceTrackerProtocol.proto
@@ -23,16 +23,12 @@ option java_generic_services = true;
 option java_generate_equals_and_hash = true;
 
 import "TajoMasterProtocol.proto";
+import "tajo_protos.proto";
 
 message NodeHeartbeat {
-  required string tajoWorkerHost = 1;
-  required int32 peerRpcPort = 2;
-  required int32 tajoQueryMasterPort = 3;
-  optional ServerStatusProto serverStatus = 4;
-  optional int32 tajoWorkerClientPort = 5;
-  optional string statusMessage = 6;
-  optional int32 tajoWorkerPullServerPort = 7;
-  optional int32 tajoWorkerHttpPort = 8;
+  required WorkerConnectionInfoProto connectionInfo = 1;
+  optional ServerStatusProto serverStatus = 2;
+  optional string statusMessage = 3;
 }
 
 service TajoResourceTrackerProtocolService {

http://git-wip-us.apache.org/repos/asf/tajo/blob/28282b56/tajo-core/src/main/proto/TajoMasterProtocol.proto
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/proto/TajoMasterProtocol.proto b/tajo-core/src/main/proto/TajoMasterProtocol.proto
index 8fccbaf..7283543 100644
--- a/tajo-core/src/main/proto/TajoMasterProtocol.proto
+++ b/tajo-core/src/main/proto/TajoMasterProtocol.proto
@@ -60,14 +60,12 @@ message ServerStatusProto {
 }
 
 message TajoHeartbeat {
-  required string tajoWorkerHost = 1;
-  required int32 tajoQueryMasterPort = 2;
-  optional int32 tajoWorkerClientPort = 3;
-  optional QueryIdProto queryId = 4;
-  optional QueryState state = 5;
-  optional string statusMessage = 6;
-  optional float queryProgress = 7;
-  optional int64 queryFinishTime = 8;
+  required WorkerConnectionInfoProto connectionInfo = 1;
+  optional QueryIdProto queryId = 2;
+  optional QueryState state = 3;
+  optional string statusMessage = 4;
+  optional float queryProgress = 5;
+  optional int64 queryFinishTime = 6;
 }
 
 message TajoHeartbeatResponse {
@@ -110,12 +108,9 @@ message WorkerResourceAllocationRequest {
 }
 
 message WorkerResourceProto {
-    required string host = 1;
-    required int32 peerRpcPort = 2;
-    required int32 queryMasterPort = 3;
-    required int32 infoPort = 4;
-    required int32 memoryMB = 5 ;
-    required float diskSlots = 6;
+    required WorkerConnectionInfoProto connectionInfo = 1;
+    required int32 memoryMB = 2 ;
+    required float diskSlots = 3;
 }
 
 message WorkerResourcesRequest {
@@ -129,15 +124,10 @@ message WorkerResourceReleaseRequest {
 
 message WorkerAllocatedResource {
     required hadoop.yarn.ContainerIdProto containerId = 1;
-    required string nodeId = 2;
-    required string workerHost = 3;
-    required int32 peerRpcPort = 4;
-    required int32 queryMasterPort = 5;
-    required int32 clientPort = 6;
-    required int32 workerPullServerPort = 7;
-
-    required int32 allocatedMemoryMB = 8;
-    required float allocatedDiskSlots = 9;
+    required WorkerConnectionInfoProto connectionInfo = 2;
+
+    required int32 allocatedMemoryMB = 3;
+    required float allocatedDiskSlots = 4;
 }
 
 message WorkerResourceAllocationResponse {

http://git-wip-us.apache.org/repos/asf/tajo/blob/28282b56/tajo-core/src/main/proto/TajoWorkerProtocol.proto
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/proto/TajoWorkerProtocol.proto b/tajo-core/src/main/proto/TajoWorkerProtocol.proto
index dff2733..bde2459 100644
--- a/tajo-core/src/main/proto/TajoWorkerProtocol.proto
+++ b/tajo-core/src/main/proto/TajoWorkerProtocol.proto
@@ -168,8 +168,9 @@ message QueryExecutionRequestProto {
 }
 
 message GetTaskRequestProto {
-    required hadoop.yarn.ContainerIdProto containerId = 1;
-    required ExecutionBlockIdProto executionBlockId = 2;
+    required int32 workerId = 1;
+    required hadoop.yarn.ContainerIdProto containerId = 2;
+    required ExecutionBlockIdProto executionBlockId = 3;
 }
 
 enum ShuffleType {
@@ -202,14 +203,13 @@ message DataChannelProto {
 
 message RunExecutionBlockRequestProto {
     required ExecutionBlockIdProto executionBlockId = 1;
-    required string queryMasterHost = 2;
-    required int32 queryMasterPort = 3;
-    required string nodeId = 4;
-    required string containerId = 5;
-    optional string queryOutputPath = 6;
-
-    required KeyValueSetProto queryContext = 7;
-    required string planJson = 8;
+    required WorkerConnectionInfoProto queryMaster = 2;
+    required string nodeId = 3;
+    required string containerId = 4;
+    optional string queryOutputPath = 5;
+
+    required KeyValueSetProto queryContext = 6;
+    required string planJson = 7;
 }
 
 message ExecutionBlockListProto {

http://git-wip-us.apache.org/repos/asf/tajo/blob/28282b56/tajo-core/src/main/resources/webapps/admin/cluster.jsp
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/resources/webapps/admin/cluster.jsp b/tajo-core/src/main/resources/webapps/admin/cluster.jsp
index 0317759..6fe21a2 100644
--- a/tajo-core/src/main/resources/webapps/admin/cluster.jsp
+++ b/tajo-core/src/main/resources/webapps/admin/cluster.jsp
@@ -20,20 +20,21 @@
 <%@ page language="java" contentType="text/html; charset=UTF-8" pageEncoding="UTF-8"%>
 
 <%@ page import="org.apache.tajo.master.TajoMaster" %>
+<%@ page import="org.apache.tajo.master.cluster.WorkerConnectionInfo" %>
 <%@ page import="org.apache.tajo.master.ha.HAService" %>
 <%@ page import="org.apache.tajo.master.ha.TajoMasterInfo" %>
 <%@ page import="org.apache.tajo.master.rm.Worker" %>
 <%@ page import="org.apache.tajo.master.rm.WorkerResource" %>
 <%@ page import="org.apache.tajo.master.rm.WorkerState" %>
 <%@ page import="org.apache.tajo.util.JSPUtil" %>
+<%@ page import="org.apache.tajo.util.TUtil" %>
 <%@ page import="org.apache.tajo.webapp.StaticHttpServer" %>
 <%@ page import="java.util.*" %>
-<%@ page import="org.apache.tajo.util.TUtil" %>
 
 <%
   TajoMaster master = (TajoMaster) StaticHttpServer.getInstance().getAttribute("tajo.info.server.object");
-  Map<String, Worker> workers = master.getContext().getResourceManager().getWorkers();
-  List<String> wokerKeys = new ArrayList<String>(workers.keySet());
+  Map<Integer, Worker> workers = master.getContext().getResourceManager().getWorkers();
+  List<Integer> wokerKeys = new ArrayList<Integer>(workers.keySet());
   Collections.sort(wokerKeys);
 
   int runningQueryMasterTasks = 0;
@@ -175,17 +176,19 @@
 <%
     int no = 1;
     for(Worker queryMaster: liveQueryMasters) {
-      WorkerResource resource = queryMaster.getResource();
-          String queryMasterHttp = "http://" + queryMaster.getHostName() + ":" + queryMaster.getHttpPort() + "/index.jsp";
+        WorkerResource resource = queryMaster.getResource();
+        WorkerConnectionInfo connectionInfo = queryMaster.getConnectionInfo();
+        String queryMasterHttp = "http://" + connectionInfo.getHost()
+                + ":" + connectionInfo.getHttpInfoPort() + "/index.jsp";
 %>
     <tr>
-      <td width='30' align='right'><%=no++%></td>
-      <td><a href='<%=queryMasterHttp%>'><%=queryMaster.getHostName() + ":" + queryMaster.getQueryMasterPort()%></a></td>
-      <td width='100' align='center'><%=queryMaster.getClientPort()%></td>
-      <td width='200' align='right'><%=resource.getNumQueryMasterTasks()%></td>
-      <td width='200' align='center'><%=resource.getFreeHeap()/1024/1024%>/<%=resource.getTotalHeap()/1024/1024%>/<%=resource.getMaxHeap()/1024/1024%> MB</td>
-      <td width='100' align='right'><%=JSPUtil.getElapsedTime(queryMaster.getLastHeartbeatTime(), System.currentTimeMillis())%></td>
-      <td width='100' align='center'><%=queryMaster.getState()%></td>
+        <td width='30' align='right'><%=no++%></td>
+        <td><a href='<%=queryMasterHttp%>'><%=connectionInfo.getHost() + ":" + connectionInfo.getQueryMasterPort()%></a></td>
+        <td width='100' align='center'><%=connectionInfo.getClientPort()%></td>
+        <td width='200' align='right'><%=resource.getNumQueryMasterTasks()%></td>
+        <td width='200' align='center'><%=resource.getFreeHeap()/1024/1024%>/<%=resource.getTotalHeap()/1024/1024%>/<%=resource.getMaxHeap()/1024/1024%> MB</td>
+        <td width='100' align='right'><%=JSPUtil.getElapsedTime(queryMaster.getLastHeartbeatTime(), System.currentTimeMillis())%></td>
+        <td width='100' align='center'><%=queryMaster.getState()%></td>
     </tr>
 <%
     } //end fo for
@@ -210,7 +213,7 @@
 %>
     <tr>
       <td width='30' align='right'><%=no++%></td>
-      <td><%=queryMaster.getHostName() + ":" + queryMaster.getQueryMasterPort()%></td>
+      <td><%=queryMaster.getConnectionInfo().getHost() + ":" + queryMaster.getConnectionInfo().getQueryMasterPort()%></td>
     </tr>
 <%
       } //end fo for
@@ -236,19 +239,20 @@
 <%
     int no = 1;
     for(Worker worker: liveWorkers) {
-      WorkerResource resource = worker.getResource();
-          String workerHttp = "http://" + worker.getHostName() + ":" + worker.getHttpPort() + "/index.jsp";
+        WorkerResource resource = worker.getResource();
+        WorkerConnectionInfo connectionInfo = worker.getConnectionInfo();
+        String workerHttp = "http://" + connectionInfo.getHost() + ":" + connectionInfo.getHttpInfoPort() + "/index.jsp";
 %>
     <tr>
-      <td width='30' align='right'><%=no++%></td>
-      <td><a href='<%=workerHttp%>'><%=worker.getHostName() + ":" + worker.getPeerRpcPort()%></a></td>
-      <td width='80' align='center'><%=worker.getPullServerPort()%></td>
-      <td width='100' align='right'><%=resource.getNumRunningTasks()%></td>
-      <td width='150' align='center'><%=resource.getUsedMemoryMB()%>/<%=resource.getMemoryMB()%></td>
-      <td width='100' align='center'><%=resource.getUsedDiskSlots()%>/<%=resource.getDiskSlots()%></td>
-      <td width='200' align='center'><%=resource.getFreeHeap()/1024/1024%>/<%=resource.getTotalHeap()/1024/1024%>/<%=resource.getMaxHeap()/1024/1024%> MB</td>
-      <td width='100' align='right'><%=JSPUtil.getElapsedTime(worker.getLastHeartbeatTime(), System.currentTimeMillis())%></td>
-      <td width='100' align='center'><%=worker.getState()%></td>
+        <td width='30' align='right'><%=no++%></td>
+        <td><a href='<%=workerHttp%>'><%=connectionInfo.getHostAndPeerRpcPort()%></a></td>
+        <td width='80' align='center'><%=connectionInfo.getPullServerPort()%></td>
+        <td width='100' align='right'><%=resource.getNumRunningTasks()%></td>
+        <td width='150' align='center'><%=resource.getUsedMemoryMB()%>/<%=resource.getMemoryMB()%></td>
+        <td width='100' align='center'><%=resource.getUsedDiskSlots()%>/<%=resource.getDiskSlots()%></td>
+        <td width='200' align='center'><%=resource.getFreeHeap()/1024/1024%>/<%=resource.getTotalHeap()/1024/1024%>/<%=resource.getMaxHeap()/1024/1024%> MB</td>
+        <td width='100' align='right'><%=JSPUtil.getElapsedTime(worker.getLastHeartbeatTime(), System.currentTimeMillis())%></td>
+        <td width='100' align='center'><%=worker.getState()%></td>
     </tr>
 <%
     } //end fo for
@@ -279,7 +283,7 @@
 %>
     <tr>
       <td width='30' align='right'><%=no++%></td>
-      <td><%=worker.getHostName() + ":" + worker.getPeerRpcPort()%></td>
+      <td><%=worker.getConnectionInfo().getHostAndPeerRpcPort()%></td>
     </tr>
 <%
       } //end fo for

http://git-wip-us.apache.org/repos/asf/tajo/blob/28282b56/tajo-core/src/main/resources/webapps/admin/index.jsp
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/resources/webapps/admin/index.jsp b/tajo-core/src/main/resources/webapps/admin/index.jsp
index 7ab1482..ce4d7dc 100644
--- a/tajo-core/src/main/resources/webapps/admin/index.jsp
+++ b/tajo-core/src/main/resources/webapps/admin/index.jsp
@@ -38,8 +38,8 @@
 
 <%
   TajoMaster master = (TajoMaster) StaticHttpServer.getInstance().getAttribute("tajo.info.server.object");
-  Map<String, Worker> workers = master.getContext().getResourceManager().getWorkers();
-  Map<String, Worker> inactiveWorkers = master.getContext().getResourceManager().getInactiveWorkers();
+  Map<Integer, Worker> workers = master.getContext().getResourceManager().getWorkers();
+  Map<Integer, Worker> inactiveWorkers = master.getContext().getResourceManager().getInactiveWorkers();
 
   int numWorkers = 0;
   int numLiveWorkers = 0;

http://git-wip-us.apache.org/repos/asf/tajo/blob/28282b56/tajo-core/src/main/resources/webapps/admin/query.jsp
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/resources/webapps/admin/query.jsp b/tajo-core/src/main/resources/webapps/admin/query.jsp
index e7b402f..9ddc90c 100644
--- a/tajo-core/src/main/resources/webapps/admin/query.jsp
+++ b/tajo-core/src/main/resources/webapps/admin/query.jsp
@@ -43,17 +43,17 @@
 
   SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
 
-  Map<String, Worker> workers = master.getContext().getResourceManager().getWorkers();
+  Map<Integer, Worker> workers = master.getContext().getResourceManager().getWorkers();
   Map<String, Integer> portMap = new HashMap<String, Integer>();
 
-  Collection<String> queryMasters = master.getContext().getResourceManager().getQueryMasters();
+  Collection<Integer> queryMasters = master.getContext().getResourceManager().getQueryMasters();
   if (queryMasters == null || queryMasters.isEmpty()) {
     queryMasters = master.getContext().getResourceManager().getWorkers().keySet();
   }
-  for(String eachQueryMasterKey: queryMasters) {
+  for(int eachQueryMasterKey: queryMasters) {
     Worker queryMaster = workers.get(eachQueryMasterKey);
     if(queryMaster != null) {
-      portMap.put(queryMaster.getHostName(), queryMaster.getHttpPort());
+      portMap.put(queryMaster.getConnectionInfo().getHost(), queryMaster.getConnectionInfo().getHttpInfoPort());
     }
   }
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/28282b56/tajo-core/src/main/resources/webapps/worker/querytasks.jsp
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/resources/webapps/worker/querytasks.jsp b/tajo-core/src/main/resources/webapps/worker/querytasks.jsp
index 1a325da..6e74b99 100644
--- a/tajo-core/src/main/resources/webapps/worker/querytasks.jsp
+++ b/tajo-core/src/main/resources/webapps/worker/querytasks.jsp
@@ -67,10 +67,10 @@
   List<TajoMasterProtocol.WorkerResourceProto> allWorkers = tajoWorker.getWorkerContext()
             .getQueryMasterManagerService().getQueryMaster().getAllWorker();
 
-  Map<String, TajoMasterProtocol.WorkerResourceProto> workerMap = new HashMap<String, TajoMasterProtocol.WorkerResourceProto>();
+  Map<Integer, TajoMasterProtocol.WorkerResourceProto> workerMap = new HashMap<Integer, TajoMasterProtocol.WorkerResourceProto>();
   if(allWorkers != null) {
     for(TajoMasterProtocol.WorkerResourceProto eachWorker: allWorkers) {
-      workerMap.put(eachWorker.getHost(), eachWorker);
+      workerMap.put(eachWorker.getConnectionInfo().getId(), eachWorker);
     }
   }
   QueryMasterTask queryMasterTask = tajoWorker.getWorkerContext()
@@ -201,12 +201,13 @@
 
           String queryUnitHost = eachQueryUnit.getSucceededHost() == null ? "-" : eachQueryUnit.getSucceededHost();
           if(eachQueryUnit.getSucceededHost() != null) {
-              TajoMasterProtocol.WorkerResourceProto worker = workerMap.get(eachQueryUnit.getSucceededHost());
+              TajoMasterProtocol.WorkerResourceProto worker =
+                      workerMap.get(eachQueryUnit.getLastAttempt().getWorkerConnectionInfo().getId());
               if(worker != null) {
                   QueryUnitAttempt lastAttempt = eachQueryUnit.getLastAttempt();
                   if(lastAttempt != null) {
                     QueryUnitAttemptId lastAttemptId = lastAttempt.getId();
-                    queryUnitHost = "<a href='http://" + eachQueryUnit.getSucceededHost() + ":" + worker.getInfoPort() + "/taskdetail.jsp?queryUnitAttemptId=" + lastAttemptId + "'>" + eachQueryUnit.getSucceededHost() + "</a>";
+                    queryUnitHost = "<a href='http://" + eachQueryUnit.getSucceededHost() + ":" + worker.getConnectionInfo().getHttpInfoPort() + "/taskdetail.jsp?queryUnitAttemptId=" + lastAttemptId + "'>" + eachQueryUnit.getSucceededHost() + "</a>";
                   }
               }
           }

http://git-wip-us.apache.org/repos/asf/tajo/blob/28282b56/tajo-core/src/main/resources/webapps/worker/taskdetail.jsp
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/resources/webapps/worker/taskdetail.jsp b/tajo-core/src/main/resources/webapps/worker/taskdetail.jsp
index e20ab03..d84664f 100644
--- a/tajo-core/src/main/resources/webapps/worker/taskdetail.jsp
+++ b/tajo-core/src/main/resources/webapps/worker/taskdetail.jsp
@@ -22,7 +22,9 @@
 <%@ page import="org.apache.commons.lang.StringUtils" %>
 <%@ page import="org.apache.tajo.QueryUnitAttemptId" %>
 <%@ page import="org.apache.tajo.ipc.TajoWorkerProtocol" %>
+<%@ page import="org.apache.tajo.util.JSPUtil" %>
 <%@ page import="org.apache.tajo.util.TajoIdUtils" %>
+<%@ page import="org.apache.tajo.webapp.StaticHttpServer" %>
 <%@ page import="org.apache.tajo.worker.*" %>
 <%@ page import="java.text.SimpleDateFormat" %>
 <%@ page import="java.util.List" %>

http://git-wip-us.apache.org/repos/asf/tajo/blob/28282b56/tajo-core/src/main/resources/webapps/worker/tasks.jsp
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/resources/webapps/worker/tasks.jsp b/tajo-core/src/main/resources/webapps/worker/tasks.jsp
index b5fb9d7..ae05047 100644
--- a/tajo-core/src/main/resources/webapps/worker/tasks.jsp
+++ b/tajo-core/src/main/resources/webapps/worker/tasks.jsp
@@ -65,9 +65,9 @@
         <tr><th>Id</th><th>StartTime</th><th>FinishTime</th><th>RunTime</th><th>Status</th></tr>
         <%
             if (taskRunner != null) {
-                TaskRunner.TaskRunnerContext taskRunnerContext = taskRunner.getContext();
+                ExecutionBlockContext context = taskRunner.getContext();
 
-                for (Map.Entry<QueryUnitAttemptId, Task> entry : taskRunnerContext.getTasks().entrySet()) {
+                for (Map.Entry<QueryUnitAttemptId, Task> entry : context.getTasks().entrySet()) {
                     QueryUnitAttemptId queryUnitId = entry.getKey();
                     TaskHistory eachTask = entry.getValue().createTaskHistory();
         %>

http://git-wip-us.apache.org/repos/asf/tajo/blob/28282b56/tajo-core/src/test/java/org/apache/tajo/cluster/TestWorkerConnectionInfo.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/cluster/TestWorkerConnectionInfo.java b/tajo-core/src/test/java/org/apache/tajo/cluster/TestWorkerConnectionInfo.java
new file mode 100644
index 0000000..03be125
--- /dev/null
+++ b/tajo-core/src/test/java/org/apache/tajo/cluster/TestWorkerConnectionInfo.java
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.cluster;
+
+import org.apache.tajo.master.cluster.WorkerConnectionInfo;
+import org.junit.Test;
+
+import static org.junit.Assert.*;
+
+public class TestWorkerConnectionInfo {
+
+  @Test
+  public void testWorkerId() {
+    WorkerConnectionInfo worker = new WorkerConnectionInfo("host", 28091, 28092, 21000, 28093, 28080);
+    WorkerConnectionInfo worker2 = new WorkerConnectionInfo("host2", 28091, 28092, 21000, 28093, 28080);
+
+    assertNotEquals(worker.getId(), worker2.getId());
+    assertEquals(worker.getId(), new WorkerConnectionInfo("host", 28091, 28092, 21000, 28093, 28080).getId());
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/28282b56/tajo-core/src/test/java/org/apache/tajo/master/rm/TestTajoResourceManager.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/master/rm/TestTajoResourceManager.java b/tajo-core/src/test/java/org/apache/tajo/master/rm/TestTajoResourceManager.java
index 09d674a..0423894 100644
--- a/tajo-core/src/test/java/org/apache/tajo/master/rm/TestTajoResourceManager.java
+++ b/tajo-core/src/test/java/org/apache/tajo/master/rm/TestTajoResourceManager.java
@@ -24,6 +24,7 @@ import org.apache.tajo.QueryId;
 import org.apache.tajo.QueryIdFactory;
 import org.apache.tajo.conf.TajoConf;
 import org.apache.tajo.ipc.TajoMasterProtocol.*;
+import org.apache.tajo.master.cluster.WorkerConnectionInfo;
 import org.apache.tajo.rpc.NullCallback;
 import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos;
 import org.junit.Test;
@@ -94,11 +95,10 @@ public class TestTajoResourceManager {
           .setRunningTaskNum(0)
           .build();
 
+      WorkerConnectionInfo connectionInfo =
+          new WorkerConnectionInfo("host" + (i + 1), 28091, 28092, 21000 + i, 28093, 28080);
       NodeHeartbeat tajoHeartbeat = NodeHeartbeat.newBuilder()
-          .setTajoWorkerHost("host" + (i + 1))
-          .setTajoQueryMasterPort(21000)
-          .setTajoWorkerHttpPort(28080 + i)
-          .setPeerRpcPort(12345)
+          .setConnectionInfo(connectionInfo.getProto())
           .setServerStatus(serverStatus)
           .build();
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/28282b56/tajo-yarn-pullserver/src/main/java/org/apache/tajo/pullserver/TajoPullServerService.java
----------------------------------------------------------------------
diff --git a/tajo-yarn-pullserver/src/main/java/org/apache/tajo/pullserver/TajoPullServerService.java b/tajo-yarn-pullserver/src/main/java/org/apache/tajo/pullserver/TajoPullServerService.java
index 720f0ca..3fa67ae 100644
--- a/tajo-yarn-pullserver/src/main/java/org/apache/tajo/pullserver/TajoPullServerService.java
+++ b/tajo-yarn-pullserver/src/main/java/org/apache/tajo/pullserver/TajoPullServerService.java
@@ -215,11 +215,10 @@ public class TajoPullServerService extends AbstractService {
       selector = RpcChannelFactory.createServerChannelFactory("PullServerAuxService", workerNum);
 
       localFS = new LocalFileSystem();
-      super.init(conf);
 
-      this.getConfig().setInt(TajoConf.ConfVars.PULLSERVER_PORT.varname
+      conf.setInt(TajoConf.ConfVars.PULLSERVER_PORT.varname
           , TajoConf.ConfVars.PULLSERVER_PORT.defaultIntVal);
-
+      super.init(conf);
       LOG.info("Tajo PullServer initialized: readaheadLength=" + readaheadLength);
     } catch (Throwable t) {
       LOG.error(t);
@@ -228,8 +227,7 @@ public class TajoPullServerService extends AbstractService {
 
   // TODO change AbstractService to throw InterruptedException
   @Override
-  public synchronized void start() {
-    Configuration conf = getConfig();
+  public synchronized void serviceInit(Configuration conf) throws Exception {
     ServerBootstrap bootstrap = new ServerBootstrap(selector);
 
     try {
@@ -248,11 +246,11 @@ public class TajoPullServerService extends AbstractService {
     conf.set(ConfVars.PULLSERVER_PORT.varname, Integer.toString(port));
     pipelineFact.PullServer.setPort(port);
     LOG.info(getName() + " listening on port " + port);
-    super.start();
 
     sslFileBufferSize = conf.getInt(SUFFLE_SSL_FILE_BUFFER_SIZE_KEY,
                                     DEFAULT_SUFFLE_SSL_FILE_BUFFER_SIZE);
 
+
     if (STANDALONE) {
       File pullServerPortFile = getPullServerPortFile();
       if (pullServerPortFile.exists()) {
@@ -272,6 +270,7 @@ public class TajoPullServerService extends AbstractService {
         IOUtils.closeStream(out);
       }
     }
+    super.serviceInit(conf);
     LOG.info("TajoPullServerService started: port=" + port);
   }
 
@@ -487,9 +486,7 @@ public class TajoPullServerService extends AbstractService {
       }
 
       ProcessingStatus processingStatus = new ProcessingStatus(request.getUri().toString());
-      synchronized(processingStatusMap) {
-        processingStatusMap.put(request.getUri().toString(), processingStatus);
-      }
+      processingStatusMap.put(request.getUri().toString(), processingStatus);
       // Parsing the URL into key-values
       final Map<String, List<String>> params =
           new QueryStringDecoder(request.getUri()).getParameters();