You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tajo.apache.org by jh...@apache.org on 2014/09/20 08:54:16 UTC

[2/2] git commit: TAJO-1016: Refactor worker rpc information. (jinho)

TAJO-1016: Refactor worker rpc information. (jinho)

Closes #125


Project: http://git-wip-us.apache.org/repos/asf/tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/28282b56
Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/28282b56
Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/28282b56

Branch: refs/heads/master
Commit: 28282b561ad0a75f9603936bf04f2aa5c99b6b58
Parents: 469820d
Author: jhkim <jh...@apache.org>
Authored: Sat Sep 20 15:53:06 2014 +0900
Committer: jhkim <jh...@apache.org>
Committed: Sat Sep 20 15:53:06 2014 +0900

----------------------------------------------------------------------
 CHANGES                                         |   2 +
 .../java/org/apache/tajo/client/TajoAdmin.java  |  36 ++--
 tajo-client/src/main/proto/ClientProtos.proto   |  37 ++--
 tajo-common/src/main/proto/tajo_protos.proto    |  10 ++
 .../tajo/master/DefaultTaskScheduler.java       |  18 +-
 .../apache/tajo/master/LazyTaskScheduler.java   |   4 +-
 .../apache/tajo/master/TajoContainerProxy.java  |   5 +-
 .../tajo/master/TajoMasterClientService.java    |  12 +-
 .../apache/tajo/master/TajoMasterService.java   |   9 +-
 .../master/cluster/WorkerConnectionInfo.java    | 178 +++++++++++++++++++
 .../master/event/TaskAttemptAssignedEvent.java  |  17 +-
 .../tajo/master/event/TaskRequestEvent.java     |  13 +-
 .../master/querymaster/QueryInProgress.java     |   6 +-
 .../master/querymaster/QueryJobManager.java     |  11 +-
 .../tajo/master/querymaster/QueryMaster.java    |  18 +-
 .../querymaster/QueryMasterManagerService.java  |   2 +-
 .../tajo/master/querymaster/QueryUnit.java      |  12 +-
 .../master/querymaster/QueryUnitAttempt.java    |  32 +---
 .../apache/tajo/master/rm/TajoRMContext.java    |  14 +-
 .../tajo/master/rm/TajoResourceTracker.java     |  24 +--
 .../master/rm/TajoWorkerResourceManager.java    |  33 ++--
 .../java/org/apache/tajo/master/rm/Worker.java  |  73 ++------
 .../org/apache/tajo/master/rm/WorkerEvent.java  |   6 +-
 .../tajo/master/rm/WorkerLivelinessMonitor.java |   4 +-
 .../tajo/master/rm/WorkerReconnectEvent.java    |   2 +-
 .../tajo/master/rm/WorkerResourceManager.java   |   6 +-
 .../tajo/master/rm/WorkerStatusEvent.java       |   2 +-
 .../tajo/worker/AbstractResourceAllocator.java  |  15 ++
 .../tajo/worker/ExecutionBlockContext.java      |  12 +-
 .../tajo/worker/TajoResourceAllocator.java      |  17 +-
 .../java/org/apache/tajo/worker/TajoWorker.java | 178 ++++++++++---------
 .../tajo/worker/TajoWorkerClientService.java    |   6 +-
 .../tajo/worker/TajoWorkerManagerService.java   |  31 +---
 .../main/java/org/apache/tajo/worker/Task.java  |   3 +-
 .../java/org/apache/tajo/worker/TaskRunner.java |  20 +--
 .../apache/tajo/worker/TaskRunnerManager.java   |  11 +-
 .../tajo/worker/WorkerHeartbeatService.java     |  57 +++---
 .../tajo/worker/event/TaskRunnerStartEvent.java |  18 +-
 .../main/proto/ResourceTrackerProtocol.proto    |  12 +-
 .../src/main/proto/TajoMasterProtocol.proto     |  36 ++--
 .../src/main/proto/TajoWorkerProtocol.proto     |  20 +--
 .../main/resources/webapps/admin/cluster.jsp    |  54 +++---
 .../src/main/resources/webapps/admin/index.jsp  |   4 +-
 .../src/main/resources/webapps/admin/query.jsp  |   8 +-
 .../resources/webapps/worker/querytasks.jsp     |   9 +-
 .../resources/webapps/worker/taskdetail.jsp     |   2 +
 .../src/main/resources/webapps/worker/tasks.jsp |   4 +-
 .../tajo/cluster/TestWorkerConnectionInfo.java  |  36 ++++
 .../tajo/master/rm/TestTajoResourceManager.java |   8 +-
 .../tajo/pullserver/TajoPullServerService.java  |  15 +-
 50 files changed, 632 insertions(+), 530 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tajo/blob/28282b56/CHANGES
----------------------------------------------------------------------
diff --git a/CHANGES b/CHANGES
index a5f31f9..5d92881 100644
--- a/CHANGES
+++ b/CHANGES
@@ -444,6 +444,8 @@ Release 0.9.0 - unreleased
 
   SUB TASKS
 
+    TAJO-1016: Refactor worker rpc information. (jinho)
+
     TAJO-1015: Add executionblock event in worker. (jinho)
 
     TAJO-783: Remove yarn-related code from tajo-core. (hyunsik)

http://git-wip-us.apache.org/repos/asf/tajo/blob/28282b56/tajo-client/src/main/java/org/apache/tajo/client/TajoAdmin.java
----------------------------------------------------------------------
diff --git a/tajo-client/src/main/java/org/apache/tajo/client/TajoAdmin.java b/tajo-client/src/main/java/org/apache/tajo/client/TajoAdmin.java
index 95dfc68..1acdb4d 100644
--- a/tajo-client/src/main/java/org/apache/tajo/client/TajoAdmin.java
+++ b/tajo-client/src/main/java/org/apache/tajo/client/TajoAdmin.java
@@ -276,14 +276,15 @@ public class TajoAdmin {
                            line5, line10, line10);
       writer.write(line);
       for (WorkerResourceInfo queryMaster : liveQueryMasters) {
-        String queryMasterHost = String.format("%s:%d",
-                                  queryMaster.getAllocatedHost(),
-                                  queryMaster.getQueryMasterPort());
-        String heap = String.format("%d MB", queryMaster.getMaxHeap()/1024/1024);
-        line = String.format(fmtQueryMasterLine, queryMasterHost,
-                             queryMaster.getClientPort(),
-                             queryMaster.getNumQueryMasterTasks(),
-                             heap, queryMaster.getWorkerStatus());
+        TajoProtos.WorkerConnectionInfoProto connInfo = queryMaster.getConnectionInfo();
+        String queryMasterHost = String.format("%s:%d", connInfo.getHost(), connInfo.getQueryMasterPort());
+        String heap = String.format("%d MB", queryMaster.getMaxHeap() / 1024 / 1024);
+        line = String.format(fmtQueryMasterLine,
+            queryMasterHost,
+            connInfo.getClientPort(),
+            queryMaster.getNumQueryMasterTasks(),
+            heap,
+            queryMaster.getWorkerStatus());
         writer.write(line);
       }
 
@@ -301,12 +302,12 @@ public class TajoAdmin {
       writer.write(line);
 
       for (WorkerResourceInfo queryMaster : deadQueryMasters) {
-        String queryMasterHost = String.format("%s:%d",
-                                  queryMaster.getAllocatedHost(),
-                                  queryMaster.getQueryMasterPort());
-        line = String.format(fmtQueryMasterLine, queryMasterHost,
-                             queryMaster.getClientPort(),
-                             queryMaster.getWorkerStatus());
+        TajoProtos.WorkerConnectionInfoProto connInfo = queryMaster.getConnectionInfo();
+        String queryMasterHost = String.format("%s:%d", connInfo.getHost(), connInfo.getQueryMasterPort());
+        line = String.format(fmtQueryMasterLine,
+            queryMasterHost,
+            connInfo.getClientPort(),
+            queryMaster.getWorkerStatus());
         writer.write(line);
       }
 
@@ -358,9 +359,8 @@ public class TajoAdmin {
     writer.write(line);
 
     for (WorkerResourceInfo worker : workers) {
-      String workerHost = String.format("%s:%d",
-          worker.getAllocatedHost(),
-          worker.getPeerRpcPort());
+      TajoProtos.WorkerConnectionInfoProto connInfo = worker.getConnectionInfo();
+      String workerHost = String.format("%s:%d", connInfo.getHost(), connInfo.getPeerRpcPort());
       String mem = String.format("%d/%d", worker.getUsedMemoryMB(),
           worker.getMemoryMB());
       String disk = String.format("%.2f/%.2f", worker.getUsedDiskSlots(),
@@ -369,7 +369,7 @@ public class TajoAdmin {
           worker.getMaxHeap()/1024/1024);
 
       line = String.format(fmtWorkerLine, workerHost,
-          worker.getPullServerPort(),
+          connInfo.getPullServerPort(),
           worker.getNumRunningTasks(),
           mem, disk, heap, worker.getWorkerStatus());
       writer.write(line);

http://git-wip-us.apache.org/repos/asf/tajo/blob/28282b56/tajo-client/src/main/proto/ClientProtos.proto
----------------------------------------------------------------------
diff --git a/tajo-client/src/main/proto/ClientProtos.proto b/tajo-client/src/main/proto/ClientProtos.proto
index c66b228..0359685 100644
--- a/tajo-client/src/main/proto/ClientProtos.proto
+++ b/tajo-client/src/main/proto/ClientProtos.proto
@@ -157,27 +157,22 @@ message GetClusterInfoRequest {
 }
 
 message WorkerResourceInfo {
-  required string allocatedHost = 1;
-  required int32 peerRpcPort = 2;
-  required int32 queryMasterPort = 3;
-  required int32 clientPort = 4;
-  required int32 pullServerPort = 5;
-  required int32 httpPort = 6;
-  required float diskSlots = 7;
-  required int32 cpuCoreSlots = 8;
-  required int32 memoryMB = 9;
-  required float usedDiskSlots = 10;
-  required int32 usedMemoryMB = 11;
-  required int32 usedCpuCoreSlots = 12;
-  required int64 maxHeap = 13;
-  required int64 freeHeap = 14;
-  required int64 totalHeap = 15;
-  required int32 numRunningTasks = 16;
-  required string workerStatus = 17;
-  required int64 lastHeartbeat = 18;
-  required bool queryMasterMode = 19;
-  required bool taskRunnerMode = 20;
-  required int32 numQueryMasterTasks = 21;
+  required WorkerConnectionInfoProto connectionInfo = 1;
+  required float diskSlots = 2;
+  required int32 cpuCoreSlots = 3;
+  required int32 memoryMB = 4;
+  required float usedDiskSlots = 5;
+  required int32 usedMemoryMB = 6;
+  required int32 usedCpuCoreSlots = 7;
+  required int64 maxHeap = 8;
+  required int64 freeHeap = 9;
+  required int64 totalHeap = 10;
+  required int32 numRunningTasks = 11;
+  required string workerStatus = 12;
+  required int64 lastHeartbeat = 13;
+  required bool queryMasterMode = 14;
+  required bool taskRunnerMode = 15;
+  required int32 numQueryMasterTasks = 16;
 }
 
 message GetClusterInfoResponse {

http://git-wip-us.apache.org/repos/asf/tajo/blob/28282b56/tajo-common/src/main/proto/tajo_protos.proto
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/proto/tajo_protos.proto b/tajo-common/src/main/proto/tajo_protos.proto
index edd27fc..b6cd9ef 100644
--- a/tajo-common/src/main/proto/tajo_protos.proto
+++ b/tajo-common/src/main/proto/tajo_protos.proto
@@ -52,4 +52,14 @@ enum FetcherState {
   FETCH_FETCHING = 1;
   FETCH_FINISHED = 2;
   FETCH_FAILED = 3;
+}
+
+message WorkerConnectionInfoProto {
+    required int32 id = 1;
+    required string host = 2;
+    required int32 peerRpcPort = 3;
+    required int32 pullServerPort = 4;
+    optional int32 queryMasterPort = 5;
+    required int32 clientPort = 6;
+    required int32 httpInfoPort = 7;
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tajo/blob/28282b56/tajo-core/src/main/java/org/apache/tajo/master/DefaultTaskScheduler.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/DefaultTaskScheduler.java b/tajo-core/src/main/java/org/apache/tajo/master/DefaultTaskScheduler.java
index 7684df2..2cb8878 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/DefaultTaskScheduler.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/DefaultTaskScheduler.java
@@ -33,6 +33,7 @@ import org.apache.tajo.engine.planner.global.MasterPlan;
 import org.apache.tajo.engine.query.QueryUnitRequest;
 import org.apache.tajo.engine.query.QueryUnitRequestImpl;
 import org.apache.tajo.ipc.TajoWorkerProtocol;
+import org.apache.tajo.master.cluster.WorkerConnectionInfo;
 import org.apache.tajo.master.event.*;
 import org.apache.tajo.master.event.QueryUnitAttemptScheduleEvent.QueryUnitAttemptScheduleContext;
 import org.apache.tajo.master.event.TaskSchedulerEvent.EventType;
@@ -744,13 +745,15 @@ public class DefaultTaskScheduler extends AbstractTaskScheduler {
         }
 
         // getting the hostname of requested node
-        String host = container.getTaskHostName();
+        WorkerConnectionInfo connectionInfo =
+            context.getMasterContext().getResourceAllocator().getWorkerConnectionInfo(taskRequest.getWorkerId());
+        String host = connectionInfo.getHost();
 
         // if there are no worker matched to the hostname a task request
         if(!leafTaskHostMapping.containsKey(host)){
-          host = NetUtils.normalizeHost(host);
+          String normalizedHost = NetUtils.normalizeHost(host);
 
-          if(!leafTaskHostMapping.containsKey(host) && !taskRequests.isEmpty()){
+          if(!leafTaskHostMapping.containsKey(normalizedHost) && !taskRequests.isEmpty()){
             // this case means one of either cases:
             // * there are no blocks which reside in this node.
             // * all blocks which reside in this node are consumed, and this task runner requests a remote task.
@@ -826,8 +829,7 @@ public class DefaultTaskScheduler extends AbstractTaskScheduler {
           }
 
           context.getMasterContext().getEventHandler().handle(new TaskAttemptAssignedEvent(attemptId,
-              taskRequest.getContainerId(),
-              host, container.getTaskPort()));
+              taskRequest.getContainerId(), connectionInfo));
           assignedRequest.add(attemptId);
 
           scheduledObjectNum--;
@@ -891,10 +893,10 @@ public class DefaultTaskScheduler extends AbstractTaskScheduler {
             }
           }
 
-          ContainerProxy container = context.getMasterContext().getResourceAllocator().getContainer(
-              taskRequest.getContainerId());
+          WorkerConnectionInfo connectionInfo = context.getMasterContext().getResourceAllocator().
+              getWorkerConnectionInfo(taskRequest.getWorkerId());
           context.getMasterContext().getEventHandler().handle(new TaskAttemptAssignedEvent(attemptId,
-              taskRequest.getContainerId(), container.getTaskHostName(), container.getTaskPort()));
+              taskRequest.getContainerId(), connectionInfo));
           taskRequest.getCallback().run(taskAssign.getProto());
           totalAssigned++;
           scheduledObjectNum--;

http://git-wip-us.apache.org/repos/asf/tajo/blob/28282b56/tajo-core/src/main/java/org/apache/tajo/master/LazyTaskScheduler.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/LazyTaskScheduler.java b/tajo-core/src/main/java/org/apache/tajo/master/LazyTaskScheduler.java
index 6552998..f7953e0 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/LazyTaskScheduler.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/LazyTaskScheduler.java
@@ -469,8 +469,6 @@ public class LazyTaskScheduler extends AbstractTaskScheduler {
 
   private void assignTask(QueryUnitAttemptScheduleContext attemptContext, QueryUnitAttempt taskAttempt) {
     QueryUnitAttemptId attemptId = taskAttempt.getId();
-    ContainerProxy containerProxy = context.getMasterContext().getResourceAllocator().
-        getContainer(attemptContext.getContainerId());
     QueryUnitRequest taskAssign = new QueryUnitRequestImpl(
         attemptId,
         new ArrayList<FragmentProto>(taskAttempt.getQueryUnit().getAllFragments()),
@@ -495,7 +493,7 @@ public class LazyTaskScheduler extends AbstractTaskScheduler {
     }
 
     context.getMasterContext().getEventHandler().handle(new TaskAttemptAssignedEvent(attemptId,
-        attemptContext.getContainerId(), attemptContext.getHost(), containerProxy.getTaskPort()));
+        attemptContext.getContainerId(), taskAttempt.getWorkerConnectionInfo()));
 
     totalAssigned++;
     attemptContext.getCallback().run(taskAssign.getProto());

http://git-wip-us.apache.org/repos/asf/tajo/blob/28282b56/tajo-core/src/main/java/org/apache/tajo/master/TajoContainerProxy.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/TajoContainerProxy.java b/tajo-core/src/main/java/org/apache/tajo/master/TajoContainerProxy.java
index c317ba5..c236c20 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/TajoContainerProxy.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/TajoContainerProxy.java
@@ -59,7 +59,7 @@ public class TajoContainerProxy extends ContainerProxy {
     context.getResourceAllocator().addContainer(containerID, this);
 
     this.hostName = container.getNodeId().getHost();
-    this.port = ((TajoWorkerContainer)container).getWorkerResource().getPullServerPort();
+    this.port = ((TajoWorkerContainer)container).getWorkerResource().getConnectionInfo().getPullServerPort();
     this.state = ContainerState.RUNNING;
 
     if (LOG.isDebugEnabled()) {
@@ -102,8 +102,7 @@ public class TajoContainerProxy extends ContainerProxy {
       TajoWorkerProtocol.RunExecutionBlockRequestProto request =
           TajoWorkerProtocol.RunExecutionBlockRequestProto.newBuilder()
               .setExecutionBlockId(executionBlockId.getProto())
-              .setQueryMasterHost(myAddr.getHostName())
-              .setQueryMasterPort(myAddr.getPort())
+              .setQueryMaster(context.getQueryMasterContext().getWorkerContext().getConnectionInfo().getProto())
               .setNodeId(container.getNodeId().toString())
               .setContainerId(container.getId().toString())
               .setQueryOutputPath(context.getStagingDir().toString())

http://git-wip-us.apache.org/repos/asf/tajo/blob/28282b56/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterClientService.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterClientService.java b/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterClientService.java
index 7d80a88..e69393a 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterClientService.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterClientService.java
@@ -503,9 +503,9 @@ public class TajoMasterClientService extends AbstractService {
         context.getSessionManager().touch(request.getSessionId().getId());
         GetClusterInfoResponse.Builder builder= GetClusterInfoResponse.newBuilder();
 
-        Map<String, Worker> workers = context.getResourceManager().getWorkers();
+        Map<Integer, Worker> workers = context.getResourceManager().getWorkers();
 
-        List<String> wokerKeys = new ArrayList<String>(workers.keySet());
+        List<Integer> wokerKeys = new ArrayList<Integer>(workers.keySet());
         Collections.sort(wokerKeys);
 
         WorkerResourceInfo.Builder workerBuilder
@@ -513,7 +513,8 @@ public class TajoMasterClientService extends AbstractService {
 
         for(Worker worker: workers.values()) {
           WorkerResource workerResource = worker.getResource();
-          workerBuilder.setAllocatedHost(worker.getHostName());
+
+          workerBuilder.setConnectionInfo(worker.getConnectionInfo().getProto());
           workerBuilder.setDiskSlots(workerResource.getDiskSlots());
           workerBuilder.setCpuCoreSlots(workerResource.getCpuCoreSlots());
           workerBuilder.setMemoryMB(workerResource.getMemoryMB());
@@ -524,11 +525,6 @@ public class TajoMasterClientService extends AbstractService {
           workerBuilder.setWorkerStatus(worker.getState().toString());
           workerBuilder.setQueryMasterMode(workerResource.isQueryMasterMode());
           workerBuilder.setTaskRunnerMode(workerResource.isTaskRunnerMode());
-          workerBuilder.setPeerRpcPort(worker.getPeerRpcPort());
-          workerBuilder.setQueryMasterPort(worker.getQueryMasterPort());
-          workerBuilder.setClientPort(worker.getClientPort());
-          workerBuilder.setPullServerPort(worker.getPullServerPort());
-          workerBuilder.setHttpPort(worker.getHttpPort());
           workerBuilder.setMaxHeap(workerResource.getMaxHeap());
           workerBuilder.setFreeHeap(workerResource.getFreeHeap());
           workerBuilder.setTotalHeap(workerResource.getTotalHeap());

http://git-wip-us.apache.org/repos/asf/tajo/blob/28282b56/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterService.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterService.java b/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterService.java
index 5e9f729..ddf24d3 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterService.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterService.java
@@ -28,6 +28,7 @@ import org.apache.tajo.QueryId;
 import org.apache.tajo.TajoIdProtos;
 import org.apache.tajo.conf.TajoConf;
 import org.apache.tajo.ipc.TajoMasterProtocol;
+import org.apache.tajo.master.cluster.WorkerConnectionInfo;
 import org.apache.tajo.master.querymaster.QueryJobManager;
 import org.apache.tajo.master.rm.Worker;
 import org.apache.tajo.master.rm.WorkerResource;
@@ -97,7 +98,7 @@ public class TajoMasterService extends AbstractService {
         RpcController controller,
         TajoMasterProtocol.TajoHeartbeat request, RpcCallback<TajoMasterProtocol.TajoHeartbeatResponse> done) {
       if(LOG.isDebugEnabled()) {
-        LOG.debug("Received QueryHeartbeat:" + request.getTajoWorkerHost() + ":" + request.getTajoQueryMasterPort());
+        LOG.debug("Received QueryHeartbeat:" + new WorkerConnectionInfo(request.getConnectionInfo()));
       }
 
       TajoMasterProtocol.TajoHeartbeatResponse.ResponseCommand command = null;
@@ -156,13 +157,9 @@ public class TajoMasterService extends AbstractService {
         TajoMasterProtocol.WorkerResourceProto.Builder workerResource =
             TajoMasterProtocol.WorkerResourceProto.newBuilder();
 
-        workerResource.setHost(worker.getHostName());
-        workerResource.setPeerRpcPort(worker.getPeerRpcPort());
-        workerResource.setInfoPort(worker.getHttpPort());
-        workerResource.setQueryMasterPort(worker.getQueryMasterPort());
+        workerResource.setConnectionInfo(worker.getConnectionInfo().getProto());
         workerResource.setMemoryMB(resource.getMemoryMB());
         workerResource.setDiskSlots(resource.getDiskSlots());
-        workerResource.setQueryMasterPort(worker.getQueryMasterPort());
 
         builder.addWorkerResources(workerResource);
       }

http://git-wip-us.apache.org/repos/asf/tajo/blob/28282b56/tajo-core/src/main/java/org/apache/tajo/master/cluster/WorkerConnectionInfo.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/cluster/WorkerConnectionInfo.java b/tajo-core/src/main/java/org/apache/tajo/master/cluster/WorkerConnectionInfo.java
new file mode 100644
index 0000000..78d4978
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/master/cluster/WorkerConnectionInfo.java
@@ -0,0 +1,178 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.master.cluster;
+
+import org.apache.tajo.common.ProtoObject;
+
+import static org.apache.tajo.TajoProtos.WorkerConnectionInfoProto;
+
+public class WorkerConnectionInfo implements ProtoObject<WorkerConnectionInfoProto>, Comparable<WorkerConnectionInfo> {
+
+  /**
+   * Unique worker id; the host/port constructor sets it to hashCode(), which covers host and peer RPC port
+   */
+  private int id;
+  /**
+   * Hostname
+   */
+  private String host;
+  /**
+   * Peer rpc port
+   */
+  private int peerRpcPort;
+  /**
+   * pull server port
+   */
+  private int pullServerPort;
+  /**
+   * QueryMaster rpc port
+   */
+  private int queryMasterPort;
+  /**
+   * Port of the client RPC service, which provides a client API
+   */
+  private int clientPort;
+  /**
+   * http info port
+   */
+  private int httpInfoPort;
+
+  public WorkerConnectionInfo() {
+  }
+
+  public WorkerConnectionInfo(WorkerConnectionInfoProto proto) {
+    this();
+    this.id = proto.getId();
+    this.host = proto.getHost();
+    this.peerRpcPort = proto.getPeerRpcPort();
+    this.pullServerPort = proto.getPullServerPort();
+    this.clientPort = proto.getClientPort();
+    this.httpInfoPort = proto.getHttpInfoPort();
+    this.queryMasterPort = proto.getQueryMasterPort();
+  }
+
+  public WorkerConnectionInfo(String host, int peerRpcPort, int pullServerPort, int clientPort,
+                              int queryMasterPort, int httpInfoPort) {
+    this();
+    this.host = host;
+    this.peerRpcPort = peerRpcPort;
+    this.pullServerPort = pullServerPort;
+    this.clientPort = clientPort;
+    this.queryMasterPort = queryMasterPort;
+    this.httpInfoPort = httpInfoPort;
+    this.id = hashCode();
+  }
+
+  public String getHost() {
+    return host;
+  }
+
+  public int getPeerRpcPort() {
+    return peerRpcPort;
+  }
+
+  public int getPullServerPort() {
+    return pullServerPort;
+  }
+
+  public int getQueryMasterPort() {
+    return queryMasterPort;
+  }
+
+  public int getClientPort() {
+    return clientPort;
+  }
+
+  public int getHttpInfoPort() {
+    return httpInfoPort;
+  }
+
+  public int getId() {
+    return id;
+  }
+
+  public String getHostAndPeerRpcPort() {
+    return this.getHost() + ":" + this.getPeerRpcPort();
+  }
+
+  @Override
+  public WorkerConnectionInfoProto getProto() {
+    WorkerConnectionInfoProto.Builder builder = WorkerConnectionInfoProto.newBuilder();
+    builder.setId(id)
+        .setHost(host)
+        .setPeerRpcPort(peerRpcPort)
+        .setPullServerPort(pullServerPort)
+        .setClientPort(clientPort)
+        .setHttpInfoPort(httpInfoPort)
+        .setQueryMasterPort(queryMasterPort);
+    return builder.build();
+  }
+
+  @Override
+  public int hashCode() {
+    final int prime = 493217;
+    int result = 8501;
+    result = prime * result + this.getHost().hashCode();
+    result = prime * result + this.getPeerRpcPort();
+    return result;
+  }
+
+  @Override
+  public boolean equals(Object obj) {
+    if (this == obj)
+      return true;
+    if (obj == null)
+      return false;
+    if (getClass() != obj.getClass())
+      return false;
+    WorkerConnectionInfo other = (WorkerConnectionInfo) obj;
+    if (!this.getHost().equals(other.getHost()))
+      return false;
+    if (this.getPeerRpcPort() != other.getPeerRpcPort())
+      return false;
+    return true;
+  }
+
+  @Override
+  public int compareTo(WorkerConnectionInfo other) {
+    int hostCompare = this.getHost().compareTo(other.getHost());
+    if (hostCompare == 0) {
+      if (this.getPeerRpcPort() > other.getPeerRpcPort()) {
+        return 1;
+      } else if (this.getPeerRpcPort() < other.getPeerRpcPort()) {
+        return -1;
+      }
+      return 0;
+    }
+    return hostCompare;
+  }
+
+  @Override
+  public String toString() {
+    StringBuilder builder = new StringBuilder();
+    builder.append("id:").append(id).append(", ")
+        .append("host:").append(host).append(", ")
+        .append("PeerRpcPort:").append(peerRpcPort).append(", ")
+        .append("PullServerPort:").append(pullServerPort).append(", ")
+        .append("ClientPort:").append(clientPort).append(", ")
+        .append("QueryMasterPort:").append(queryMasterPort).append(", ")
+        .append("HttpInfoPort:").append(httpInfoPort);
+    return builder.toString();
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tajo/blob/28282b56/tajo-core/src/main/java/org/apache/tajo/master/event/TaskAttemptAssignedEvent.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/event/TaskAttemptAssignedEvent.java b/tajo-core/src/main/java/org/apache/tajo/master/event/TaskAttemptAssignedEvent.java
index 4934633..e0928c5 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/event/TaskAttemptAssignedEvent.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/event/TaskAttemptAssignedEvent.java
@@ -20,29 +20,24 @@ package org.apache.tajo.master.event;
 
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.tajo.QueryUnitAttemptId;
+import org.apache.tajo.master.cluster.WorkerConnectionInfo;
 
 public class TaskAttemptAssignedEvent extends TaskAttemptEvent {
   private final ContainerId cId;
-  private final String hostName;
-  private final int pullServerPort;
+  private final WorkerConnectionInfo workerConnectionInfo;
 
   public TaskAttemptAssignedEvent(QueryUnitAttemptId id, ContainerId cId,
-                                  String hostname, int pullServerPort) {
+                                  WorkerConnectionInfo connectionInfo) {
     super(id, TaskAttemptEventType.TA_ASSIGNED);
     this.cId = cId;
-    this.hostName = hostname;
-    this.pullServerPort = pullServerPort;
+    this.workerConnectionInfo = connectionInfo;
   }
 
   public ContainerId getContainerId() {
     return cId;
   }
 
-  public String getHostName() {
-    return hostName;
-  }
-
-  public int getPullServerPort() {
-    return pullServerPort;
+  public WorkerConnectionInfo getWorkerConnectionInfo(){
+    return workerConnectionInfo;
   }
 }

http://git-wip-us.apache.org/repos/asf/tajo/blob/28282b56/tajo-core/src/main/java/org/apache/tajo/master/event/TaskRequestEvent.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/event/TaskRequestEvent.java b/tajo-core/src/main/java/org/apache/tajo/master/event/TaskRequestEvent.java
index 9be7cab..2197c33 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/event/TaskRequestEvent.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/event/TaskRequestEvent.java
@@ -31,24 +31,31 @@ public class TaskRequestEvent extends AbstractEvent<TaskRequestEventType> {
     TASK_REQ
   }
 
-  private final ContainerId workerId;
+  private final int workerId;
+  private final ContainerId containerId;
   private final ExecutionBlockId executionBlockId;
 
   private final RpcCallback<QueryUnitRequestProto> callback;
 
-  public TaskRequestEvent(ContainerId workerId,
+  public TaskRequestEvent(int workerId,
+                          ContainerId containerId,
                           ExecutionBlockId executionBlockId,
                           RpcCallback<QueryUnitRequestProto> callback) {
     super(TaskRequestEventType.TASK_REQ);
     this.workerId = workerId;
+    this.containerId = containerId;
     this.executionBlockId = executionBlockId;
     this.callback = callback;
   }
 
-  public ContainerId getContainerId() {
+  public int getWorkerId() {
     return this.workerId;
   }
 
+  public ContainerId getContainerId() {
+    return this.containerId;
+  }
+
   public ExecutionBlockId getExecutionBlockId() {
     return executionBlockId;
   }

http://git-wip-us.apache.org/repos/asf/tajo/blob/28282b56/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryInProgress.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryInProgress.java b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryInProgress.java
index 261200e..877a20a 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryInProgress.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryInProgress.java
@@ -172,9 +172,9 @@ public class QueryInProgress extends CompositeService {
         return false;
       }
 
-      queryInfo.setQueryMaster(resource.getWorkerHost());
-      queryInfo.setQueryMasterPort(resource.getQueryMasterPort());
-      queryInfo.setQueryMasterclientPort(resource.getClientPort());
+      queryInfo.setQueryMaster(resource.getConnectionInfo().getHost());
+      queryInfo.setQueryMasterPort(resource.getConnectionInfo().getQueryMasterPort());
+      queryInfo.setQueryMasterclientPort(resource.getConnectionInfo().getClientPort());
 
       getEventHandler().handle(new QueryJobEvent(QueryJobEvent.Type.QUERY_MASTER_START, queryInfo));
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/28282b56/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryJobManager.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryJobManager.java b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryJobManager.java
index acaefc9..e4f47cd 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryJobManager.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryJobManager.java
@@ -31,6 +31,7 @@ import org.apache.tajo.engine.planner.logical.LogicalRootNode;
 import org.apache.tajo.engine.query.QueryContext;
 import org.apache.tajo.ipc.TajoMasterProtocol;
 import org.apache.tajo.master.TajoMaster;
+import org.apache.tajo.master.cluster.WorkerConnectionInfo;
 import org.apache.tajo.master.session.Session;
 import org.apache.tajo.scheduler.SimpleFifoScheduler;
 
@@ -241,11 +242,11 @@ public class QueryJobManager extends CompositeService {
 
   private QueryInfo makeQueryInfoFromHeartbeat(TajoMasterProtocol.TajoHeartbeat queryHeartbeat) {
     QueryInfo queryInfo = new QueryInfo(new QueryId(queryHeartbeat.getQueryId()));
-    if(queryHeartbeat.getTajoWorkerHost() != null) {
-      queryInfo.setQueryMaster(queryHeartbeat.getTajoWorkerHost());
-      queryInfo.setQueryMasterPort(queryHeartbeat.getTajoQueryMasterPort());
-      queryInfo.setQueryMasterclientPort(queryHeartbeat.getTajoWorkerClientPort());
-    }
+    WorkerConnectionInfo connectionInfo = new WorkerConnectionInfo(queryHeartbeat.getConnectionInfo());
+
+    queryInfo.setQueryMaster(connectionInfo.getHost());
+    queryInfo.setQueryMasterPort(connectionInfo.getQueryMasterPort());
+    queryInfo.setQueryMasterclientPort(connectionInfo.getClientPort());
     queryInfo.setLastMessage(queryHeartbeat.getStatusMessage());
     queryInfo.setQueryState(queryHeartbeat.getState());
     queryInfo.setProgress(queryHeartbeat.getQueryProgress());

http://git-wip-us.apache.org/repos/asf/tajo/blob/28282b56/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMaster.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMaster.java b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMaster.java
index b54675c..b8c39e0 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMaster.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMaster.java
@@ -192,9 +192,8 @@ public class QueryMaster extends CompositeService implements EventHandler {
 
     for (TajoMasterProtocol.WorkerResourceProto worker : workers) {
       try {
-        if (worker.getPeerRpcPort() == 0) continue;
-
-        rpc = connPool.getConnection(NetUtils.createSocketAddr(worker.getHost(), worker.getPeerRpcPort()),
+        TajoProtos.WorkerConnectionInfoProto connectionInfo = worker.getConnectionInfo();
+        rpc = connPool.getConnection(NetUtils.createSocketAddr(connectionInfo.getHost(), connectionInfo.getPeerRpcPort()),
             TajoWorkerProtocol.class, true);
         TajoWorkerProtocol.TajoWorkerProtocolService tajoWorkerProtocolService = rpc.getStub();
 
@@ -214,9 +213,8 @@ public class QueryMaster extends CompositeService implements EventHandler {
 
     for (TajoMasterProtocol.WorkerResourceProto worker : workers) {
       try {
-        if (worker.getPeerRpcPort() == 0) continue;
-
-        rpc = connPool.getConnection(NetUtils.createSocketAddr(worker.getHost(), worker.getPeerRpcPort()),
+        TajoProtos.WorkerConnectionInfoProto connectionInfo = worker.getConnectionInfo();
+        rpc = connPool.getConnection(NetUtils.createSocketAddr(connectionInfo.getHost(), connectionInfo.getPeerRpcPort()),
             TajoWorkerProtocol.class, true);
         TajoWorkerProtocol.TajoWorkerProtocolService tajoWorkerProtocolService = rpc.getStub();
 
@@ -299,9 +297,7 @@ public class QueryMaster extends CompositeService implements EventHandler {
       TajoMasterProtocol.TajoMasterProtocolService masterClientService = tmClient.getStub();
 
       TajoHeartbeat.Builder queryHeartbeatBuilder = TajoHeartbeat.newBuilder()
-          .setTajoWorkerHost(workerContext.getQueryMasterManagerService().getBindAddr().getHostName())
-          .setTajoQueryMasterPort(workerContext.getQueryMasterManagerService().getBindAddr().getPort())
-          .setTajoWorkerClientPort(workerContext.getTajoWorkerClientService().getBindAddr().getPort())
+          .setConnectionInfo(workerContext.getConnectionInfo().getProto())
           .setState(state)
           .setQueryId(queryId.getProto());
 
@@ -460,9 +456,7 @@ public class QueryMaster extends CompositeService implements EventHandler {
   private TajoHeartbeat buildTajoHeartBeat(QueryMasterTask queryMasterTask) {
     TajoHeartbeat.Builder builder = TajoHeartbeat.newBuilder();
 
-    builder.setTajoWorkerHost(workerContext.getQueryMasterManagerService().getBindAddr().getHostName());
-    builder.setTajoQueryMasterPort(workerContext.getQueryMasterManagerService().getBindAddr().getPort());
-    builder.setTajoWorkerClientPort(workerContext.getTajoWorkerClientService().getBindAddr().getPort());
+    builder.setConnectionInfo(workerContext.getConnectionInfo().getProto());
     builder.setState(queryMasterTask.getState());
     builder.setQueryId(queryMasterTask.getQueryId().getProto());
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/28282b56/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMasterManagerService.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMasterManagerService.java b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMasterManagerService.java
index 862dfef..f953995 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMasterManagerService.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMasterManagerService.java
@@ -133,7 +133,7 @@ public class QueryMasterManagerService extends CompositeService
         ContainerId cid =
             queryMasterTask.getQueryTaskContext().getResourceAllocator().makeContainerId(request.getContainerId());
         LOG.debug("getTask:" + cid + ", ebId:" + ebId);
-        queryMasterTask.handleTaskRequestEvent(new TaskRequestEvent(cid, ebId, done));
+        queryMasterTask.handleTaskRequestEvent(new TaskRequestEvent(request.getWorkerId(), cid, ebId, done));
       }
     } catch (Exception e) {
       LOG.error(e.getMessage(), e);

http://git-wip-us.apache.org/repos/asf/tajo/blob/28282b56/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryUnit.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryUnit.java b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryUnit.java
index f41fd0e..03c6d30 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryUnit.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryUnit.java
@@ -522,8 +522,8 @@ public class QueryUnit implements EventHandler<TaskEvent> {
       QueryUnitAttempt attempt = task.attempts.get(attemptEvent.getTaskAttemptId());
 
       task.successfulAttempt = attemptEvent.getTaskAttemptId();
-      task.succeededHost = attempt.getHost();
-      task.succeededPullServerPort = attempt.getPullServerPort();
+      task.succeededHost = attempt.getWorkerConnectionInfo().getHost();
+      task.succeededPullServerPort = attempt.getWorkerConnectionInfo().getPullServerPort();
 
       task.finishTask();
       task.eventHandler.handle(new SubQueryTaskEvent(event.getTaskId(), TaskState.SUCCEEDED));
@@ -537,7 +537,7 @@ public class QueryUnit implements EventHandler<TaskEvent> {
       TaskTAttemptEvent attemptEvent = (TaskTAttemptEvent) event;
       QueryUnitAttempt attempt = task.attempts.get(attemptEvent.getTaskAttemptId());
       task.launchTime = System.currentTimeMillis();
-      task.succeededHost = attempt.getHost();
+      task.succeededHost = attempt.getWorkerConnectionInfo().getHost();
     }
   }
 
@@ -632,9 +632,12 @@ public class QueryUnit implements EventHandler<TaskEvent> {
   public static class PullHost implements Cloneable {
     String host;
     int port;
+    int hashCode;
+
     public PullHost(String pullServerAddr, int pullServerPort){
       this.host = pullServerAddr;
       this.port = pullServerPort;
+      this.hashCode = Objects.hashCode(host, port);
     }
     public String getHost() {
       return host;
@@ -650,7 +653,7 @@ public class QueryUnit implements EventHandler<TaskEvent> {
 
     @Override
     public int hashCode() {
-      return Objects.hashCode(host, port);
+      return hashCode;
     }
 
     @Override
@@ -668,6 +671,7 @@ public class QueryUnit implements EventHandler<TaskEvent> {
       PullHost newPullHost = (PullHost) super.clone();
       newPullHost.host = host;
       newPullHost.port = port;
+      newPullHost.hashCode = Objects.hashCode(newPullHost.host, newPullHost.port);
       return newPullHost;
     }
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/28282b56/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryUnitAttempt.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryUnitAttempt.java b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryUnitAttempt.java
index a4fa12f..db6f130 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryUnitAttempt.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryUnitAttempt.java
@@ -29,6 +29,7 @@ import org.apache.tajo.TajoProtos.TaskAttemptState;
 import org.apache.tajo.catalog.proto.CatalogProtos;
 import org.apache.tajo.catalog.statistics.TableStats;
 import org.apache.tajo.ipc.TajoWorkerProtocol.TaskCompletionReport;
+import org.apache.tajo.master.cluster.WorkerConnectionInfo;
 import org.apache.tajo.master.event.*;
 import org.apache.tajo.master.event.QueryUnitAttemptScheduleEvent.QueryUnitAttemptScheduleContext;
 import org.apache.tajo.master.event.TaskSchedulerEvent.EventType;
@@ -55,8 +56,7 @@ public class QueryUnitAttempt implements EventHandler<TaskAttemptEvent> {
   final EventHandler eventHandler;
 
   private ContainerId containerId;
-  private String hostName;
-  private int port;
+  private WorkerConnectionInfo workerConnectionInfo;
   private int expire;
 
   private final Lock readLock;
@@ -210,30 +210,14 @@ public class QueryUnitAttempt implements EventHandler<TaskAttemptEvent> {
     return this.queryUnit;
   }
 
-  public String getHost() {
-    return this.hostName;
-  }
-
-  public int getPort() {
-    return this.port;
+  public WorkerConnectionInfo getWorkerConnectionInfo() {
+    return this.workerConnectionInfo;
   }
 
   public void setContainerId(ContainerId containerId) {
     this.containerId = containerId;
   }
 
-  public void setHost(String host) {
-    this.hostName = host;
-  }
-
-  public void setPullServerPort(int port) {
-    this.port = port;
-  }
-
-  public int getPullServerPort() {
-    return port;
-  }
-
   public synchronized void setExpireTime(int expire) {
     this.expire = expire;
   }
@@ -277,7 +261,7 @@ public class QueryUnitAttempt implements EventHandler<TaskAttemptEvent> {
     if (report.getShuffleFileOutputsCount() > 0) {
       this.getQueryUnit().setShuffleFileOutputs(report.getShuffleFileOutputsList());
 
-      PullHost host = new PullHost(getHost(), getPullServerPort());
+      PullHost host = new PullHost(getWorkerConnectionInfo().getHost(), getWorkerConnectionInfo().getPullServerPort());
       for (ShuffleFileOutput p : report.getShuffleFileOutputsList()) {
         IntermediateEntry entry = new IntermediateEntry(getId().getQueryUnitId().getId(),
             getId().getId(), p.getPartId(), host, p.getVolume());
@@ -325,8 +309,7 @@ public class QueryUnitAttempt implements EventHandler<TaskAttemptEvent> {
                            TaskAttemptEvent event) {
       TaskAttemptAssignedEvent castEvent = (TaskAttemptAssignedEvent) event;
       taskAttempt.containerId = castEvent.getContainerId();
-      taskAttempt.setHost(castEvent.getHostName());
-      taskAttempt.setPullServerPort(castEvent.getPullServerPort());
+      taskAttempt.workerConnectionInfo = castEvent.getWorkerConnectionInfo();
       taskAttempt.eventHandler.handle(
           new TaskTAttemptEvent(taskAttempt.getId(),
               TaskEventType.T_ATTEMPT_LAUNCHED));
@@ -415,7 +398,8 @@ public class QueryUnitAttempt implements EventHandler<TaskAttemptEvent> {
       TaskFatalErrorEvent errorEvent = (TaskFatalErrorEvent) event;
       taskAttempt.eventHandler.handle(new TaskTAttemptEvent(taskAttempt.getId(), TaskEventType.T_ATTEMPT_FAILED));
       taskAttempt.addDiagnosticInfo(errorEvent.errorMessage());
-      LOG.error(taskAttempt.getId() + " FROM " + taskAttempt.getHost() + " >> " + errorEvent.errorMessage());
+      LOG.error(taskAttempt.getId() + " FROM " + taskAttempt.getWorkerConnectionInfo().getHost()
+          + " >> " + errorEvent.errorMessage());
     }
   }
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/28282b56/tajo-core/src/main/java/org/apache/tajo/master/rm/TajoRMContext.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/rm/TajoRMContext.java b/tajo-core/src/main/java/org/apache/tajo/master/rm/TajoRMContext.java
index 2229f04..5d07ff2 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/rm/TajoRMContext.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/rm/TajoRMContext.java
@@ -37,16 +37,16 @@ public class TajoRMContext {
   final Dispatcher rmDispatcher;
 
   /** map between workerIds and running workers */
-  private final ConcurrentMap<String, Worker> workers = new ConcurrentHashMap<String, Worker>();
+  private final ConcurrentMap<Integer, Worker> workers = Maps.newConcurrentMap();
 
   /** map between workerIds and inactive workers */
-  private final ConcurrentMap<String, Worker> inactiveWorkers = new ConcurrentHashMap<String, Worker>();
+  private final ConcurrentMap<Integer, Worker> inactiveWorkers = Maps.newConcurrentMap();
 
   /** map between queryIds and query master ContainerId */
   private final ConcurrentMap<QueryId, ContainerIdProto> qmContainerMap = Maps.newConcurrentMap();
 
-  private final Set<String> liveQueryMasterWorkerResources =
-      Collections.newSetFromMap(new ConcurrentHashMap<String, Boolean>());
+  private final Set<Integer> liveQueryMasterWorkerResources =
+      Collections.newSetFromMap(new ConcurrentHashMap<Integer, Boolean>());
 
   private final Set<QueryId> stoppedQueryIds =
       Collections.newSetFromMap(new ConcurrentHashMap<QueryId, Boolean>());
@@ -62,14 +62,14 @@ public class TajoRMContext {
   /**
    * @return The Map for active workers
    */
-  public ConcurrentMap<String, Worker> getWorkers() {
+  public ConcurrentMap<Integer, Worker> getWorkers() {
     return workers;
   }
 
   /**
    * @return The Map for inactive workers
    */
-  public ConcurrentMap<String, Worker> getInactiveWorkers() {
+  public ConcurrentMap<Integer, Worker> getInactiveWorkers() {
     return inactiveWorkers;
   }
 
@@ -81,7 +81,7 @@ public class TajoRMContext {
     return qmContainerMap;
   }
 
-  public Set<String> getQueryMasterWorker() {
+  public Set<Integer> getQueryMasterWorker() {
     return liveQueryMasterWorkerResources;
   }
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/28282b56/tajo-core/src/main/java/org/apache/tajo/master/rm/TajoResourceTracker.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/rm/TajoResourceTracker.java b/tajo-core/src/main/java/org/apache/tajo/master/rm/TajoResourceTracker.java
index 4bd7adb..831ce43 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/rm/TajoResourceTracker.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/rm/TajoResourceTracker.java
@@ -28,6 +28,7 @@ import org.apache.hadoop.service.AbstractService;
 import org.apache.tajo.conf.TajoConf;
 import org.apache.tajo.ipc.TajoMasterProtocol;
 import org.apache.tajo.ipc.TajoResourceTrackerProtocol;
+import org.apache.tajo.master.cluster.WorkerConnectionInfo;
 import org.apache.tajo.rpc.AsyncRpcServer;
 import org.apache.tajo.util.NetUtils;
 import org.apache.tajo.util.ProtoUtil;
@@ -111,9 +112,9 @@ public class TajoResourceTracker extends AbstractService implements TajoResource
   /** The response builder */
   private static final Builder builder = TajoHeartbeatResponse.newBuilder().setHeartbeatResult(ProtoUtil.TRUE);
 
-  private static WorkerStatusEvent createStatusEvent(String workerKey, NodeHeartbeat heartbeat) {
+  private static WorkerStatusEvent createStatusEvent(int workerId, NodeHeartbeat heartbeat) {
     return new WorkerStatusEvent(
-        workerKey,
+        workerId,
         heartbeat.getServerStatus().getRunningTaskNum(),
         heartbeat.getServerStatus().getJvmHeap().getMaxHeap(),
         heartbeat.getServerStatus().getJvmHeap().getFreeHeap(),
@@ -128,7 +129,7 @@ public class TajoResourceTracker extends AbstractService implements TajoResource
 
     try {
       // get a workerId from the heartbeat
-      String workerId = createWorkerId(heartbeat);
+      int workerId = heartbeat.getConnectionInfo().getId();
 
       if(rmContext.getWorkers().containsKey(workerId)) { // if worker is running
 
@@ -145,7 +146,7 @@ public class TajoResourceTracker extends AbstractService implements TajoResource
 
         // create new worker instance
         Worker newWorker = createWorkerResource(heartbeat);
-        String newWorkerId = newWorker.getWorkerId();
+        int newWorkerId = newWorker.getWorkerId();
         // add the new worker to the list of active workers
         rmContext.getWorkers().putIfAbsent(newWorkerId, newWorker);
 
@@ -178,10 +179,6 @@ public class TajoResourceTracker extends AbstractService implements TajoResource
     }
   }
 
-  private static final String createWorkerId(NodeHeartbeat heartbeat) {
-    return heartbeat.getTajoWorkerHost() + ":" + heartbeat.getTajoQueryMasterPort() + ":" + heartbeat.getPeerRpcPort();
-  }
-
   private Worker createWorkerResource(NodeHeartbeat request) {
     boolean queryMasterMode = request.getServerStatus().getQueryMasterMode().getValue();
     boolean taskRunnerMode = request.getServerStatus().getTaskRunnerMode().getValue();
@@ -204,14 +201,7 @@ public class TajoResourceTracker extends AbstractService implements TajoResource
       workerResource.setCpuCoreSlots(4);
     }
 
-    Worker worker = new Worker(rmContext, workerResource);
-    worker.setHostName(request.getTajoWorkerHost());
-    worker.setHttpPort(request.getTajoWorkerHttpPort());
-    worker.setPeerRpcPort(request.getPeerRpcPort());
-    worker.setQueryMasterPort(request.getTajoQueryMasterPort());
-    worker.setClientPort(request.getTajoWorkerClientPort());
-    worker.setPullServerPort(request.getTajoWorkerPullServerPort());
-    return worker;
+    return new Worker(rmContext, workerResource, new WorkerConnectionInfo(request.getConnectionInfo()));
   }
 
   public TajoMasterProtocol.ClusterResourceSummary getClusterResourceSummary() {
@@ -224,7 +214,7 @@ public class TajoResourceTracker extends AbstractService implements TajoResource
     int totalAvailableMemoryMB = 0;
 
     synchronized(rmContext) {
-      for(String eachWorker: rmContext.getWorkers().keySet()) {
+      for(int eachWorker: rmContext.getWorkers().keySet()) {
         Worker worker = rmContext.getWorkers().get(eachWorker);
         WorkerResource resource = worker.getResource();
         if(worker != null) {

http://git-wip-us.apache.org/repos/asf/tajo/blob/28282b56/tajo-core/src/main/java/org/apache/tajo/master/rm/TajoWorkerResourceManager.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/rm/TajoWorkerResourceManager.java b/tajo-core/src/main/java/org/apache/tajo/master/rm/TajoWorkerResourceManager.java
index 3915225..0e3ccad 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/rm/TajoWorkerResourceManager.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/rm/TajoWorkerResourceManager.java
@@ -134,7 +134,7 @@ public class TajoWorkerResourceManager extends CompositeService implements Worke
 
     @Override
     public void handle(WorkerEvent event) {
-      String workerId = event.getWorkerId();
+      int workerId = event.getWorkerId();
       Worker node = this.rmContext.getWorkers().get(workerId);
       if (node != null) {
         try {
@@ -147,16 +147,16 @@ public class TajoWorkerResourceManager extends CompositeService implements Worke
   }
 
   @Override
-  public Map<String, Worker> getWorkers() {
+  public Map<Integer, Worker> getWorkers() {
     return ImmutableMap.copyOf(rmContext.getWorkers());
   }
 
   @Override
-  public Map<String, Worker> getInactiveWorkers() {
+  public Map<Integer, Worker> getInactiveWorkers() {
     return ImmutableMap.copyOf(rmContext.getInactiveWorkers());
   }
 
-  public Collection<String> getQueryMasters() {
+  public Collection<Integer> getQueryMasters() {
     return Collections.unmodifiableSet(rmContext.getQueryMasterWorker());
   }
 
@@ -303,8 +303,8 @@ public class TajoWorkerResourceManager extends CompositeService implements Worke
                   new ArrayList<WorkerAllocatedResource>();
 
               for(AllocatedWorkerResource allocatedResource: allocatedWorkerResources) {
-                NodeId nodeId = NodeId.newInstance(allocatedResource.worker.getHostName(),
-                    allocatedResource.worker.getPeerRpcPort());
+                NodeId nodeId = NodeId.newInstance(allocatedResource.worker.getConnectionInfo().getHost(),
+                    allocatedResource.worker.getConnectionInfo().getPeerRpcPort());
 
                 TajoWorkerContainerId containerId = new TajoWorkerContainerId();
 
@@ -315,12 +315,7 @@ public class TajoWorkerResourceManager extends CompositeService implements Worke
                 ContainerIdProto containerIdProto = containerId.getProto();
                 allocatedResources.add(WorkerAllocatedResource.newBuilder()
                     .setContainerId(containerIdProto)
-                    .setNodeId(nodeId.toString())
-                    .setWorkerHost(allocatedResource.worker.getHostName())
-                    .setQueryMasterPort(allocatedResource.worker.getQueryMasterPort())
-                    .setClientPort(allocatedResource.worker.getClientPort())
-                    .setPeerRpcPort(allocatedResource.worker.getPeerRpcPort())
-                    .setWorkerPullServerPort(allocatedResource.worker.getPullServerPort())
+                    .setConnectionInfo(allocatedResource.worker.getConnectionInfo().getProto())
                     .setAllocatedMemoryMB(allocatedResource.allocatedMemoryMB)
                     .setAllocatedDiskSlots(allocatedResource.allocatedDiskSlots)
                     .build());
@@ -339,7 +334,7 @@ public class TajoWorkerResourceManager extends CompositeService implements Worke
               if(LOG.isDebugEnabled()) {
                 LOG.debug("=========================================");
                 LOG.debug("Available Workers");
-                for(String liveWorker: rmContext.getWorkers().keySet()) {
+                for(int liveWorker: rmContext.getWorkers().keySet()) {
                   LOG.debug(rmContext.getWorkers().get(liveWorker).toString());
                 }
                 LOG.debug("=========================================");
@@ -367,7 +362,7 @@ public class TajoWorkerResourceManager extends CompositeService implements Worke
 
     if(resourceRequestPriority == TajoMasterProtocol.ResourceRequestPriority.MEMORY) {
       synchronized(rmContext) {
-        List<String> randomWorkers = new ArrayList<String>(rmContext.getWorkers().keySet());
+        List<Integer> randomWorkers = new ArrayList<Integer>(rmContext.getWorkers().keySet());
         Collections.shuffle(randomWorkers);
 
         int numContainers = resourceRequest.request.getNumContainers();
@@ -377,7 +372,7 @@ public class TajoWorkerResourceManager extends CompositeService implements Worke
             resourceRequest.request.getMinDiskSlotPerContainer());
 
         int liveWorkerSize = randomWorkers.size();
-        Set<String> insufficientWorkers = new HashSet<String>();
+        Set<Integer> insufficientWorkers = new HashSet<Integer>();
         boolean stop = false;
         boolean checkMax = true;
         while(!stop) {
@@ -394,7 +389,7 @@ public class TajoWorkerResourceManager extends CompositeService implements Worke
           }
           int compareAvailableMemory = checkMax ? maxMemoryMB : minMemoryMB;
 
-          for(String eachWorker: randomWorkers) {
+          for(int eachWorker: randomWorkers) {
             if(allocatedResources >= numContainers) {
               stop = true;
               break;
@@ -436,7 +431,7 @@ public class TajoWorkerResourceManager extends CompositeService implements Worke
       }
     } else {
       synchronized(rmContext) {
-        List<String> randomWorkers = new ArrayList<String>(rmContext.getWorkers().keySet());
+        List<Integer> randomWorkers = new ArrayList<Integer>(rmContext.getWorkers().keySet());
         Collections.shuffle(randomWorkers);
 
         int numContainers = resourceRequest.request.getNumContainers();
@@ -446,7 +441,7 @@ public class TajoWorkerResourceManager extends CompositeService implements Worke
             resourceRequest.request.getMinMemoryMBPerContainer());
 
         int liveWorkerSize = randomWorkers.size();
-        Set<String> insufficientWorkers = new HashSet<String>();
+        Set<Integer> insufficientWorkers = new HashSet<Integer>();
         boolean stop = false;
         boolean checkMax = true;
         while(!stop) {
@@ -463,7 +458,7 @@ public class TajoWorkerResourceManager extends CompositeService implements Worke
           }
           float compareAvailableDisk = checkMax ? maxDiskSlots : minDiskSlots;
 
-          for(String eachWorker: randomWorkers) {
+          for(int eachWorker: randomWorkers) {
             if(allocatedResources >= numContainers) {
               stop = true;
               break;

http://git-wip-us.apache.org/repos/asf/tajo/blob/28282b56/tajo-core/src/main/java/org/apache/tajo/master/rm/Worker.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/rm/Worker.java b/tajo-core/src/main/java/org/apache/tajo/master/rm/Worker.java
index de6ee9e..edded4d 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/rm/Worker.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/rm/Worker.java
@@ -22,6 +22,7 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.state.*;
+import org.apache.tajo.master.cluster.WorkerConnectionInfo;
 
 import java.util.EnumSet;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
@@ -39,24 +40,15 @@ public class Worker implements EventHandler<WorkerEvent>, Comparable<Worker> {
   /** context of {@link org.apache.tajo.master.rm.TajoWorkerResourceManager} */
   private final TajoRMContext rmContext;
 
-  /** Hostname */
-  private String hostName;
-  /** QueryMaster rpc port */
-  private int qmRpcPort;
-  /** Peer rpc port */
-  private int peerRpcPort;
-  /** http info port */
-  private int httpInfoPort;
-  /** the port of QueryMaster client rpc which provides an client API */
-  private int qmClientPort;
-  /** pull server port */
-  private int pullServerPort;
   /** last heartbeat time */
   private long lastHeartbeatTime;
 
   /** Resource capability */
   private WorkerResource resource;
 
+  /** Worker connection information */
+  private WorkerConnectionInfo connectionInfo;
+
   private static final ReconnectNodeTransition RECONNECT_NODE_TRANSITION = new ReconnectNodeTransition();
   private static final StatusUpdateTransition STATUS_UPDATE_TRANSITION = new StatusUpdateTransition();
 
@@ -99,9 +91,10 @@ public class Worker implements EventHandler<WorkerEvent>, Comparable<Worker> {
   private final StateMachine<WorkerState, WorkerEventType, WorkerEvent> stateMachine =
       stateMachineFactory.make(this, WorkerState.NEW);
 
-  public Worker(TajoRMContext rmContext, WorkerResource resource) {
+  public Worker(TajoRMContext rmContext, WorkerResource resource, WorkerConnectionInfo connectionInfo) {
     this.rmContext = rmContext;
 
+    this.connectionInfo = connectionInfo;
     this.lastHeartbeatTime = System.currentTimeMillis();
     this.resource = resource;
 
@@ -110,56 +103,12 @@ public class Worker implements EventHandler<WorkerEvent>, Comparable<Worker> {
     this.writeLock = lock.writeLock();
   }
 
-  public String getWorkerId() {
-    return hostName + ":" + qmRpcPort + ":" + peerRpcPort;
-  }
-
-  public String getHostName() {
-    return hostName;
-  }
-
-  public void setHostName(String allocatedHost) {
-    this.hostName = allocatedHost;
-  }
-
-  public int getPeerRpcPort() {
-    return peerRpcPort;
-  }
-
-  public void setPeerRpcPort(int peerRpcPort) {
-    this.peerRpcPort = peerRpcPort;
-  }
-
-  public int getQueryMasterPort() {
-    return qmRpcPort;
-  }
-
-  public void setQueryMasterPort(int queryMasterPort) {
-    this.qmRpcPort = queryMasterPort;
-  }
-
-  public int getClientPort() {
-    return qmClientPort;
-  }
-
-  public void setClientPort(int clientPort) {
-    this.qmClientPort = clientPort;
-  }
-
-  public int getPullServerPort() {
-    return pullServerPort;
-  }
-
-  public void setPullServerPort(int pullServerPort) {
-    this.pullServerPort = pullServerPort;
-  }
-
-  public int getHttpPort() {
-    return httpInfoPort;
+  public int getWorkerId() {
+    return connectionInfo.getId();
   }
 
-  public void setHttpPort(int port) {
-    this.httpInfoPort = port;
+  public WorkerConnectionInfo getConnectionInfo() {
+    return connectionInfo;
   }
 
   public void setLastHeartbeatTime(long lastheartbeatReportTime) {
@@ -209,7 +158,7 @@ public class Worker implements EventHandler<WorkerEvent>, Comparable<Worker> {
     if(o == null) {
       return 1;
     }
-    return getWorkerId().compareTo(o.getWorkerId());
+    return connectionInfo.compareTo(o.connectionInfo);
   }
 
   public static class AddNodeTransition implements SingleArcTransition<Worker, WorkerEvent> {

http://git-wip-us.apache.org/repos/asf/tajo/blob/28282b56/tajo-core/src/main/java/org/apache/tajo/master/rm/WorkerEvent.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/rm/WorkerEvent.java b/tajo-core/src/main/java/org/apache/tajo/master/rm/WorkerEvent.java
index 389c3be..c208990 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/rm/WorkerEvent.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/rm/WorkerEvent.java
@@ -24,14 +24,14 @@ import org.apache.hadoop.yarn.event.AbstractEvent;
  * WorkerEvent describes all kinds of events which sent to {@link Worker}.
  */
 public class WorkerEvent extends AbstractEvent<WorkerEventType> {
-  private final String workerId;
+  private final int workerId;
 
-  public WorkerEvent(String workerId, WorkerEventType workerEventType) {
+  public WorkerEvent(int workerId, WorkerEventType workerEventType) {
     super(workerEventType);
     this.workerId = workerId;
   }
 
-  public String getWorkerId() {
+  public int getWorkerId() {
     return workerId;
   }
 }

http://git-wip-us.apache.org/repos/asf/tajo/blob/28282b56/tajo-core/src/main/java/org/apache/tajo/master/rm/WorkerLivelinessMonitor.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/rm/WorkerLivelinessMonitor.java b/tajo-core/src/main/java/org/apache/tajo/master/rm/WorkerLivelinessMonitor.java
index e3524d6..2751886 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/rm/WorkerLivelinessMonitor.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/rm/WorkerLivelinessMonitor.java
@@ -30,7 +30,7 @@ import org.apache.tajo.conf.TajoConf;
  * It periodically checks the latest heartbeat time of {@link Worker}.
  * If the latest heartbeat time is expired, it produces EXPIRE event to a corresponding {@link Worker}.
  */
-public class WorkerLivelinessMonitor extends AbstractLivelinessMonitor<String> {
+public class WorkerLivelinessMonitor extends AbstractLivelinessMonitor<Integer> {
 
   private EventHandler dispatcher;
 
@@ -50,7 +50,7 @@ public class WorkerLivelinessMonitor extends AbstractLivelinessMonitor<String> {
   }
 
   @Override
-  protected void expire(String id) {
+  protected void expire(Integer id) {
     dispatcher.handle(new WorkerEvent(id, WorkerEventType.EXPIRE));
   }
 }

http://git-wip-us.apache.org/repos/asf/tajo/blob/28282b56/tajo-core/src/main/java/org/apache/tajo/master/rm/WorkerReconnectEvent.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/rm/WorkerReconnectEvent.java b/tajo-core/src/main/java/org/apache/tajo/master/rm/WorkerReconnectEvent.java
index 46f286d..3828b6a 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/rm/WorkerReconnectEvent.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/rm/WorkerReconnectEvent.java
@@ -24,7 +24,7 @@ package org.apache.tajo.master.rm;
  */
 public class WorkerReconnectEvent extends WorkerEvent {
   private final Worker worker;
-  public WorkerReconnectEvent(String workerId, Worker worker) {
+  public WorkerReconnectEvent(int workerId, Worker worker) {
     super(workerId, WorkerEventType.RECONNECTED);
     this.worker = worker;
   }

http://git-wip-us.apache.org/repos/asf/tajo/blob/28282b56/tajo-core/src/main/java/org/apache/tajo/master/rm/WorkerResourceManager.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/rm/WorkerResourceManager.java b/tajo-core/src/main/java/org/apache/tajo/master/rm/WorkerResourceManager.java
index 54fe11c..8e8ac51 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/rm/WorkerResourceManager.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/rm/WorkerResourceManager.java
@@ -86,13 +86,13 @@ public interface WorkerResourceManager extends Service {
    *
    * @return a Map instance containing active workers
    */
-  public Map<String, Worker> getWorkers();
+  public Map<Integer, Worker> getWorkers();
 
   /**
    *
    * @return a Map instance containing inactive workers
    */
-  public Map<String, Worker> getInactiveWorkers();
+  public Map<Integer, Worker> getInactiveWorkers();
 
   public void stop();
 
@@ -106,5 +106,5 @@ public interface WorkerResourceManager extends Service {
    *
    * @return WorkerIds on which QueryMasters are running
    */
-  Collection<String> getQueryMasters();
+  Collection<Integer> getQueryMasters();
 }

http://git-wip-us.apache.org/repos/asf/tajo/blob/28282b56/tajo-core/src/main/java/org/apache/tajo/master/rm/WorkerStatusEvent.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/rm/WorkerStatusEvent.java b/tajo-core/src/main/java/org/apache/tajo/master/rm/WorkerStatusEvent.java
index 8c3d7c1..f1ab401 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/rm/WorkerStatusEvent.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/rm/WorkerStatusEvent.java
@@ -28,7 +28,7 @@ public class WorkerStatusEvent extends WorkerEvent {
   private final long freeHeap;
   private final long totalHeap;
 
-  public WorkerStatusEvent(String workerId, int runningTaskNum, long maxHeap, long freeHeap, long totalHeap) {
+  public WorkerStatusEvent(int workerId, int runningTaskNum, long maxHeap, long freeHeap, long totalHeap) {
     super(workerId, WorkerEventType.STATE_UPDATE);
     this.runningTaskNum = runningTaskNum;
     this.maxHeap = maxHeap;

http://git-wip-us.apache.org/repos/asf/tajo/blob/28282b56/tajo-core/src/main/java/org/apache/tajo/worker/AbstractResourceAllocator.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/AbstractResourceAllocator.java b/tajo-core/src/main/java/org/apache/tajo/worker/AbstractResourceAllocator.java
index 55aa8c4..ca71c53 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/AbstractResourceAllocator.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/AbstractResourceAllocator.java
@@ -22,10 +22,25 @@ import com.google.common.collect.Maps;
 import org.apache.hadoop.service.CompositeService;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.tajo.master.ContainerProxy;
+import org.apache.tajo.master.cluster.WorkerConnectionInfo;
 
 import java.util.Map;
+import java.util.concurrent.ConcurrentMap;
 
 public abstract class AbstractResourceAllocator extends CompositeService implements ResourceAllocator {
+  /**
+   * Maps a worker id to that worker's connection information.
+   */
+  protected ConcurrentMap<Integer, WorkerConnectionInfo> workerInfoMap = Maps.newConcurrentMap();
+
+  public WorkerConnectionInfo getWorkerConnectionInfo(int workerId) {
+    return workerInfoMap.get(workerId);
+  }
+
+  public void addWorkerConnectionInfo(WorkerConnectionInfo connectionInfo) {
+    workerInfoMap.putIfAbsent(connectionInfo.getId(), connectionInfo);
+  }
+
   private Map<ContainerId, ContainerProxy> containers = Maps.newConcurrentMap();
 
   public AbstractResourceAllocator() {

http://git-wip-us.apache.org/repos/asf/tajo/blob/28282b56/tajo-core/src/main/java/org/apache/tajo/worker/ExecutionBlockContext.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/ExecutionBlockContext.java b/tajo-core/src/main/java/org/apache/tajo/worker/ExecutionBlockContext.java
index d4b9861..1ec8a88 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/ExecutionBlockContext.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/ExecutionBlockContext.java
@@ -33,12 +33,14 @@ import org.apache.tajo.TajoProtos;
 import org.apache.tajo.conf.TajoConf;
 import org.apache.tajo.engine.query.QueryContext;
 import org.apache.tajo.ipc.QueryMasterProtocol;
+import org.apache.tajo.master.cluster.WorkerConnectionInfo;
 import org.apache.tajo.rpc.NettyClientBase;
 import org.apache.tajo.rpc.NullCallback;
 import org.apache.tajo.rpc.RpcChannelFactory;
 import org.apache.tajo.rpc.RpcConnectionPool;
 import org.apache.tajo.storage.HashShuffleAppenderManager;
 import org.apache.tajo.storage.StorageUtil;
+import org.apache.tajo.util.NetUtils;
 import org.apache.tajo.util.Pair;
 import org.apache.tajo.worker.event.TaskRunnerStartEvent;
 import org.jboss.netty.channel.socket.ClientSocketChannelFactory;
@@ -79,6 +81,7 @@ public class ExecutionBlockContext {
   private TajoQueryEngine queryEngine;
   private RpcConnectionPool connPool;
   private InetSocketAddress qmMasterAddr;
+  private WorkerConnectionInfo queryMaster;
   private TajoConf systemConf;
   // for the doAs block
   private UserGroupInformation taskOwner;
@@ -92,12 +95,12 @@ public class ExecutionBlockContext {
 
   private final ConcurrentMap<String, TaskRunnerHistory> histories = Maps.newConcurrentMap();
 
-  public ExecutionBlockContext(TaskRunnerManager manager, TaskRunnerStartEvent event, InetSocketAddress queryMaster)
+  public ExecutionBlockContext(TaskRunnerManager manager, TaskRunnerStartEvent event, WorkerConnectionInfo queryMaster)
       throws Throwable {
     this.manager = manager;
     this.executionBlockId = event.getExecutionBlockId();
     this.connPool = RpcConnectionPool.getPool(manager.getTajoConf());
-    this.qmMasterAddr = queryMaster;
+    this.queryMaster = queryMaster;
     this.systemConf = manager.getTajoConf();
     this.reporter = new Reporter();
     this.defaultFS = TajoConf.getTajoRootDir(systemConf).getFileSystem(systemConf);
@@ -118,6 +121,7 @@ public class ExecutionBlockContext {
     LOG.info("Tajo Root Dir: " + systemConf.getVar(TajoConf.ConfVars.ROOT_DIR));
     LOG.info("Worker Local Dir: " + systemConf.getVar(TajoConf.ConfVars.WORKER_TEMPORAL_DIR));
 
+    this.qmMasterAddr = NetUtils.createSocketAddr(queryMaster.getHost(), queryMaster.getQueryMasterPort());
     LOG.info("QueryMaster Address:" + qmMasterAddr);
 
     UserGroupInformation.setConfiguration(systemConf);
@@ -329,8 +333,8 @@ public class ExecutionBlockContext {
         intermediateBuilder.clear();
 
         intermediateBuilder.setEbId(ebId.getProto())
-            .setHost(getWorkerContext().getTajoWorkerManagerService().getBindAddr().getHostName() + ":" +
-                getWorkerContext().getPullServerPort())
+            .setHost(getWorkerContext().getConnectionInfo().getHost() + ":" +
+                getWorkerContext().getConnectionInfo().getPullServerPort())
             .setTaskId(-1)
             .setAttemptId(-1)
             .setPartId(eachShuffle.getPartId())

http://git-wip-us.apache.org/repos/asf/tajo/blob/28282b56/tajo-core/src/main/java/org/apache/tajo/worker/TajoResourceAllocator.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/TajoResourceAllocator.java b/tajo-core/src/main/java/org/apache/tajo/worker/TajoResourceAllocator.java
index 2cc8f0c..2220089 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/TajoResourceAllocator.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/TajoResourceAllocator.java
@@ -34,6 +34,7 @@ import org.apache.tajo.conf.TajoConf;
 import org.apache.tajo.ipc.TajoMasterProtocol;
 import org.apache.tajo.ipc.TajoWorkerProtocol;
 import org.apache.tajo.master.*;
+import org.apache.tajo.master.cluster.WorkerConnectionInfo;
 import org.apache.tajo.master.event.ContainerAllocationEvent;
 import org.apache.tajo.master.event.ContainerAllocatorEventType;
 import org.apache.tajo.master.event.SubQueryContainerAllocationEvent;
@@ -129,6 +130,8 @@ public class TajoResourceAllocator extends AbstractResourceAllocator {
         LOG.warn(e.getMessage());
       }
     }
+
+    workerInfoMap.clear();
     super.stop();
   }
 
@@ -325,8 +328,8 @@ public class TajoResourceAllocator extends AbstractResourceAllocator {
         List<Container> containers = new ArrayList<Container>();
         for(TajoMasterProtocol.WorkerAllocatedResource eachAllocatedResource: allocatedResources) {
           TajoWorkerContainer container = new TajoWorkerContainer();
-          NodeId nodeId = NodeId.newInstance(eachAllocatedResource.getWorkerHost(),
-              eachAllocatedResource.getPeerRpcPort());
+          NodeId nodeId = NodeId.newInstance(eachAllocatedResource.getConnectionInfo().getHost(),
+              eachAllocatedResource.getConnectionInfo().getPeerRpcPort());
 
           TajoWorkerContainerId containerId = new TajoWorkerContainerId();
 
@@ -343,14 +346,10 @@ public class TajoResourceAllocator extends AbstractResourceAllocator {
           workerResource.setMemoryMB(eachAllocatedResource.getAllocatedMemoryMB());
           workerResource.setDiskSlots(eachAllocatedResource.getAllocatedDiskSlots());
 
-          Worker worker = new Worker(null, workerResource);
-          worker.setHostName(nodeId.getHost());
-          worker.setPeerRpcPort(nodeId.getPort());
-          worker.setQueryMasterPort(eachAllocatedResource.getQueryMasterPort());
-          worker.setPullServerPort(eachAllocatedResource.getWorkerPullServerPort());
-
+          Worker worker = new Worker(null, workerResource,
+              new WorkerConnectionInfo(eachAllocatedResource.getConnectionInfo()));
           container.setWorkerResource(worker);
-
+          addWorkerConnectionInfo(worker.getConnectionInfo());
           containers.add(container);
         }
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/28282b56/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorker.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorker.java b/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorker.java
index a8d661b..280fc2b 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorker.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorker.java
@@ -32,8 +32,10 @@ import org.apache.hadoop.yarn.util.RackResolver;
 import org.apache.tajo.TajoConstants;
 import org.apache.tajo.catalog.CatalogClient;
 import org.apache.tajo.catalog.CatalogService;
+import org.apache.tajo.common.exception.NotImplementedException;
 import org.apache.tajo.conf.TajoConf;
 import org.apache.tajo.ipc.TajoMasterProtocol;
+import org.apache.tajo.master.cluster.WorkerConnectionInfo;
 import org.apache.tajo.master.ha.TajoMasterInfo;
 import org.apache.tajo.master.querymaster.QueryMaster;
 import org.apache.tajo.master.querymaster.QueryMasterManagerService;
@@ -94,12 +96,13 @@ public class TajoWorker extends CompositeService {
 
   private TajoPullServerService pullService;
 
-  private int pullServerPort;
-
+  @Deprecated
   private boolean yarnContainerMode;
 
+  @Deprecated
   private boolean queryMasterMode;
 
+  @Deprecated
   private boolean taskRunnerMode;
 
   private WorkerHeartbeatService workerHeartbeatThread;
@@ -110,7 +113,7 @@ public class TajoWorker extends CompositeService {
 
   private TajoMasterProtocol.ClusterResourceSummary clusterResource;
 
-  private int httpPort;
+  private WorkerConnectionInfo connectionInfo;
 
   private ThreadMXBean threadBean = ManagementFactory.getThreadMXBean();
 
@@ -199,71 +202,52 @@ public class TajoWorker extends CompositeService {
     this.dispatcher = new AsyncDispatcher();
     addIfService(dispatcher);
 
+    tajoWorkerManagerService = new TajoWorkerManagerService(workerContext, peerRpcPort);
+    addIfService(tajoWorkerManagerService);
+
     // querymaster worker
     tajoWorkerClientService = new TajoWorkerClientService(workerContext, clientPort);
-    addService(tajoWorkerClientService);
+    addIfService(tajoWorkerClientService);
 
     queryMasterManagerService = new QueryMasterManagerService(workerContext, qmManagerPort);
-    addService(queryMasterManagerService);
+    addIfService(queryMasterManagerService);
 
     // taskrunner worker
     taskRunnerManager = new TaskRunnerManager(workerContext, dispatcher);
     addService(taskRunnerManager);
 
-    tajoWorkerManagerService = new TajoWorkerManagerService(workerContext, peerRpcPort);
-    addService(tajoWorkerManagerService);
-
-    if(!yarnContainerMode) {
-      if(taskRunnerMode && !TajoPullServerService.isStandalone()) {
-        pullService = new TajoPullServerService();
-        addService(pullService);
-      }
+    workerHeartbeatThread = new WorkerHeartbeatService(workerContext);
+    addIfService(workerHeartbeatThread);
 
-      if (!systemConf.get(CommonTestingUtil.TAJO_TEST_KEY, "FALSE").equalsIgnoreCase("TRUE")) {
-        try {
-          httpPort = systemConf.getSocketAddrVar(ConfVars.WORKER_INFO_ADDRESS).getPort();
-          if(queryMasterMode && !taskRunnerMode) {
-            //If QueryMaster and TaskRunner run on single host, http port conflicts
-            httpPort = systemConf.getSocketAddrVar(ConfVars.WORKER_QM_INFO_ADDRESS).getPort();
-          }
-          webServer = StaticHttpServer.getInstance(this ,"worker", null, httpPort ,
-              true, null, systemConf, null);
-          webServer.start();
-          httpPort = webServer.getPort();
-          LOG.info("Worker info server started:" + httpPort);
-
-          deletionService = new DeletionService(getMountPath().size(), 0);
-          if(systemConf.getBoolVar(ConfVars.WORKER_TEMPORAL_DIR_CLEANUP)){
-            getWorkerContext().cleanupTemporalDirectories();
-          }
-        } catch (IOException e) {
-          LOG.error(e.getMessage(), e);
-        }
-      }
+    int httpPort = 0;
+    if(taskRunnerMode && !TajoPullServerService.isStandalone()) {
+      pullService = new TajoPullServerService();
+      addIfService(pullService);
     }
 
-    LOG.info("Tajo Worker started: queryMaster=" + queryMasterMode + " taskRunner=" + taskRunnerMode +
-        ", qmRpcPort=" + qmManagerPort +
-        ",yarnContainer=" + yarnContainerMode + ", clientPort=" + clientPort +
-        ", peerRpcPort=" + peerRpcPort + ":" + qmManagerPort + ",httpPort" + httpPort);
+    if (!systemConf.get(CommonTestingUtil.TAJO_TEST_KEY, "FALSE").equalsIgnoreCase("TRUE")) {
+      httpPort = initWebServer();
+    }
 
     super.serviceInit(conf);
 
-    tajoMasterInfo = new TajoMasterInfo();
-    if (systemConf.getBoolVar(TajoConf.ConfVars.TAJO_MASTER_HA_ENABLE)) {
-      tajoMasterInfo.setTajoMasterAddress(HAServiceUtil.getMasterUmbilicalAddress(systemConf));
-      tajoMasterInfo.setWorkerResourceTrackerAddr(HAServiceUtil.getResourceTrackerAddress(systemConf));
+    int pullServerPort;
+    if(pullService != null){
+      pullServerPort = pullService.getPort();
     } else {
-      tajoMasterInfo.setTajoMasterAddress(NetUtils.createSocketAddr(systemConf.getVar(ConfVars
-          .TAJO_MASTER_UMBILICAL_RPC_ADDRESS)));
-      tajoMasterInfo.setWorkerResourceTrackerAddr(NetUtils.createSocketAddr(systemConf.getVar(ConfVars
-          .RESOURCE_TRACKER_RPC_ADDRESS)));
+      pullServerPort = getStandAlonePullServerPort();
     }
-    connectToCatalog();
 
-    workerHeartbeatThread = new WorkerHeartbeatService(workerContext);
-    workerHeartbeatThread.init(conf);
-    addIfService(workerHeartbeatThread);
+    this.connectionInfo = new WorkerConnectionInfo(
+        tajoWorkerManagerService.getBindAddr().getHostName(),
+        tajoWorkerManagerService.getBindAddr().getPort(),
+        pullServerPort,
+        tajoWorkerClientService.getBindAddr().getPort(),
+        queryMasterManagerService.getBindAddr().getPort(),
+        httpPort);
+
+    LOG.info("Tajo Worker is initialized. \r\nQueryMaster=" + queryMasterMode + " TaskRunner=" + taskRunnerMode
+        + " connection :" + connectionInfo.toString());
 
     try {
       hashShuffleAppenderManager = new HashShuffleAppenderManager(systemConf);
@@ -300,14 +284,57 @@ public class TajoWorker extends CompositeService {
     });
   }
 
+  private int initWebServer() {
+    int httpPort = systemConf.getSocketAddrVar(ConfVars.WORKER_INFO_ADDRESS).getPort();
+    try {
+      if (queryMasterMode && !taskRunnerMode) {
+        //If QueryMaster and TaskRunner run on single host, http port conflicts
+        httpPort = systemConf.getSocketAddrVar(ConfVars.WORKER_QM_INFO_ADDRESS).getPort();
+      }
+      webServer = StaticHttpServer.getInstance(this, "worker", null, httpPort,
+          true, null, systemConf, null);
+      webServer.start();
+      httpPort = webServer.getPort();
+      LOG.info("Worker info server started:" + httpPort);
+    } catch (IOException e) {
+      LOG.error(e.getMessage(), e);
+    }
+    return httpPort;
+  }
+
+  private void initCleanupService() throws IOException {
+    deletionService = new DeletionService(getMountPath().size(), 0);
+    if (systemConf.getBoolVar(ConfVars.WORKER_TEMPORAL_DIR_CLEANUP)) {
+      getWorkerContext().cleanupTemporalDirectories();
+    }
+  }
+
   public WorkerContext getWorkerContext() {
     return workerContext;
   }
 
   @Override
   public void serviceStart() throws Exception {
+
+    tajoMasterInfo = new TajoMasterInfo();
+    if (systemConf.getBoolVar(TajoConf.ConfVars.TAJO_MASTER_HA_ENABLE)) {
+      tajoMasterInfo.setTajoMasterAddress(HAServiceUtil.getMasterUmbilicalAddress(systemConf));
+      tajoMasterInfo.setWorkerResourceTrackerAddr(HAServiceUtil.getResourceTrackerAddress(systemConf));
+    } else {
+      tajoMasterInfo.setTajoMasterAddress(NetUtils.createSocketAddr(systemConf.getVar(ConfVars
+          .TAJO_MASTER_UMBILICAL_RPC_ADDRESS)));
+      tajoMasterInfo.setWorkerResourceTrackerAddr(NetUtils.createSocketAddr(systemConf.getVar(ConfVars
+          .RESOURCE_TRACKER_RPC_ADDRESS)));
+    }
+    connectToCatalog();
+
+    if (!systemConf.get(CommonTestingUtil.TAJO_TEST_KEY, "FALSE").equalsIgnoreCase("TRUE")) {
+      initCleanupService();
+    }
+
     initWorkerMetrics();
     super.serviceStart();
+    LOG.info("Tajo Worker is started");
   }
 
   @Override
@@ -319,7 +346,7 @@ public class TajoWorker extends CompositeService {
     if(webServer != null) {
       try {
         webServer.stop();
-      } catch (Exception e) {
+      } catch (Throwable e) {
         LOG.error(e.getMessage(), e);
       }
     }
@@ -336,7 +363,7 @@ public class TajoWorker extends CompositeService {
     if(webServer != null && webServer.isAlive()) {
       try {
         webServer.stop();
-      } catch (Exception e) {
+      } catch (Throwable e) {
       }
     }
 
@@ -381,15 +408,19 @@ public class TajoWorker extends CompositeService {
       return catalogClient;
     }
 
-    public int getHttpPort() {
-      return httpPort;
+    public TajoPullServerService getPullService() {
+      return pullService;
+    }
+
+    public WorkerConnectionInfo getConnectionInfo() {
+      return connectionInfo;
     }
 
     public String getWorkerName() {
       if (queryMasterMode) {
         return getQueryMasterManagerService().getHostAndPort();
       } else {
-        return getTajoWorkerManagerService().getHostAndPort();
+        return connectionInfo.getHostAndPeerRpcPort();
       }
     }
 
@@ -444,6 +475,7 @@ public class TajoWorker extends CompositeService {
       }
     }
 
+    @Deprecated
     public boolean isYarnContainerMode() {
       return yarnContainerMode;
     }
@@ -503,45 +535,20 @@ public class TajoWorker extends CompositeService {
     public HashShuffleAppenderManager getHashShuffleAppenderManager() {
       return hashShuffleAppenderManager;
     }
-
-    public int getPullServerPort() {
-      if (pullService != null) {
-        long startTime = System.currentTimeMillis();
-        while (true) {
-          int pullServerPort = pullService.getPort();
-          if (pullServerPort > 0) {
-            return pullServerPort;
-          }
-          try {
-            Thread.sleep(1000);
-          } catch (InterruptedException e) {
-          }
-          if (System.currentTimeMillis() - startTime > 30 * 1000) {
-            LOG.fatal("TajoWorker stopped cause can't get PullServer port.");
-            System.exit(-1);
-          }
-        }
-      } else {
-        if (pullServerPort != 0) {
-          return pullServerPort;
-        } else {
-          loadPullServerPort();
-          return pullServerPort;
-        }
-      }
-    }
   }
 
-  private void loadPullServerPort() {
-    // get pull server port
+  private int getStandAlonePullServerPort() {
     long startTime = System.currentTimeMillis();
+    int pullServerPort;
+
+    // wait for the standalone pull server to come up and publish its port
     while (true) {
       pullServerPort = TajoPullServerService.readPullServerPort();
       if (pullServerPort > 0) {
         break;
       }
       try {
-        Thread.sleep(1000);
+        Thread.sleep(500);
       } catch (InterruptedException e) {
       }
       if (System.currentTimeMillis() - startTime > 30 * 1000) {
@@ -549,6 +556,7 @@ public class TajoWorker extends CompositeService {
         System.exit(-1);
       }
     }
+    return pullServerPort;
   }
 
   public void stopWorkerForce() {

http://git-wip-us.apache.org/repos/asf/tajo/blob/28282b56/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorkerClientService.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorkerClientService.java b/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorkerClientService.java
index d25013c..fb4f861 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorkerClientService.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorkerClientService.java
@@ -54,7 +54,7 @@ public class TajoWorkerClientService extends AbstractService {
 
   private BlockingRpcServer rpcServer;
   private InetSocketAddress bindAddr;
-  private String addr;
+
   private int port;
   private TajoConf conf;
   private TajoWorker.WorkerContext workerContext;
@@ -88,14 +88,12 @@ public class TajoWorkerClientService extends AbstractService {
       this.rpcServer.start();
 
       this.bindAddr = NetUtils.getConnectAddress(rpcServer.getListenAddress());
-      this.addr = bindAddr.getHostName() + ":" + bindAddr.getPort();
-
       this.port = bindAddr.getPort();
     } catch (Exception e) {
       LOG.error(e.getMessage(), e);
     }
     // Get the master address
-    LOG.info(TajoWorkerClientService.class.getSimpleName() + " is bind to " + addr);
+    LOG.info(TajoWorkerClientService.class.getSimpleName() + " is bind to " + bindAddr);
 
     super.init(conf);
   }