You are viewing a plain-text version of this content. (The canonical link to the original message was not preserved in this plain-text copy.)
Posted to commits@tajo.apache.org by hy...@apache.org on 2013/08/29 04:35:49 UTC

git commit: TAJO-137: Unreleased resources and wrong allocation requests in TajoWorkerResourceManager. (hyoungjunkim via hyunsik)

Updated Branches:
  refs/heads/master 4e47471c3 -> ce84eba43


TAJO-137: Unreleased resources and wrong allocation requests in TajoWorkerResourceManager. (hyoungjunkim via hyunsik)


Project: http://git-wip-us.apache.org/repos/asf/incubator-tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tajo/commit/ce84eba4
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tajo/tree/ce84eba4
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tajo/diff/ce84eba4

Branch: refs/heads/master
Commit: ce84eba436aaf639920bb38ff8d5081d01c23686
Parents: 4e47471
Author: Hyunsik Choi <hy...@apache.org>
Authored: Thu Aug 29 11:20:47 2013 +0900
Committer: Hyunsik Choi <hy...@apache.org>
Committed: Thu Aug 29 11:20:47 2013 +0900

----------------------------------------------------------------------
 CHANGES.txt                                     |   3 +
 pom.xml                                         |   1 +
 tajo-core/tajo-core-backend/pom.xml             |  38 +++-
 .../apache/tajo/master/TajoAsyncDispatcher.java |   4 +-
 .../apache/tajo/master/TajoContainerProxy.java  |   7 +-
 .../apache/tajo/master/TajoMasterService.java   |  18 +-
 .../apache/tajo/master/TaskSchedulerImpl.java   |   3 +-
 .../master/querymaster/QueryInProgress.java     |  32 +--
 .../tajo/master/querymaster/QueryInfo.java      |   4 +-
 .../master/querymaster/QueryJobManager.java     |   8 +-
 .../tajo/master/querymaster/SubQuery.java       |   8 +-
 .../master/rm/TajoWorkerResourceManager.java    | 120 ++++++++---
 .../apache/tajo/master/rm/WorkerResource.java   |  83 +++++---
 .../tajo/master/rm/WorkerResourceManager.java   |   5 +
 .../master/rm/YarnRMContainerAllocator.java     |   4 +-
 .../tajo/master/rm/YarnTajoResourceManager.java |   8 +
 .../tajo/worker/AbstractResourceAllocator.java  |  25 ---
 .../apache/tajo/worker/ResourceAllocator.java   |   1 +
 .../tajo/worker/TajoResourceAllocator.java      |  16 +-
 .../java/org/apache/tajo/worker/TajoWorker.java |  83 ++++++--
 .../java/org/apache/tajo/worker/TaskRunner.java |  12 +-
 .../apache/tajo/worker/TaskRunnerManager.java   |  12 +-
 .../tajo/worker/YarnResourceAllocator.java      |   7 +-
 .../src/main/proto/TajoMasterProtocol.proto     |  12 +-
 .../src/main/resources/webapps/admin/index.jsp  | 210 +++++--------------
 25 files changed, 382 insertions(+), 342 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/ce84eba4/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 787c0b5..ac1c60a 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -112,6 +112,9 @@ Release 0.2.0 - unreleased
 
   BUG FIXES
 
+    TAJO-137: Unreleased resources and wrong allocation requests in
+    TajoWorkerResourceManager. (hyoungjunkim via hyunsik)
+
     TAJO-130: Same queryConf file conflicts. (jinho)
 
     TAJO-82: NullPointerException occurs when Schema is converted as an array 

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/ce84eba4/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 07b05a1..661708e 100644
--- a/pom.xml
+++ b/pom.xml
@@ -310,6 +310,7 @@
             <exclude>**/*.sql</exclude>
             <exclude>**/*.schema</exclude>
             <exclude>**/*.tbl</exclude>
+            <exclude>**/*.jsp</exclude>
             <!-- generated content -->
             <exclude>**/target/**</exclude>
             <exclude>**/*.log</exclude>

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/ce84eba4/tajo-core/tajo-core-backend/pom.xml
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/pom.xml b/tajo-core/tajo-core-backend/pom.xml
index 6c3351f..d1d9f9f 100644
--- a/tajo-core/tajo-core-backend/pom.xml
+++ b/tajo-core/tajo-core-backend/pom.xml
@@ -18,7 +18,7 @@
   -->
 
 <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
-  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
   <modelVersion>4.0.0</modelVersion>
   <parent>
     <artifactId>tajo-project</artifactId>
@@ -41,7 +41,7 @@
     <repository>
       <id>repository.jboss.org</id>
       <url>https://repository.jboss.org/nexus/content/repositories/releases/
-            </url>
+      </url>
       <snapshots>
         <enabled>false</enabled>
       </snapshots>
@@ -340,6 +340,40 @@
       <artifactId>dspace-geoip</artifactId>
       <version>1.2.3</version>
     </dependency>
+
+    <dependency>
+      <groupId>org.eclipse.jdt</groupId>
+      <artifactId>core</artifactId>
+      <version>3.1.1</version>
+    </dependency>
+
+
+    <dependency>
+      <groupId>org.mortbay.jetty</groupId>
+      <artifactId>servlet-api</artifactId>
+      <version>2.5-20081211</version>
+    </dependency>
+    <dependency>
+      <groupId>org.mortbay.jetty</groupId>
+      <artifactId>jetty</artifactId>
+      <version>6.1.14</version>
+    </dependency>
+    <dependency>
+      <groupId>org.mortbay.jetty</groupId>
+      <artifactId>jetty-util</artifactId>
+      <version>6.1.14</version>
+    </dependency>
+    <dependency>
+      <groupId>org.mortbay.jetty</groupId>
+      <artifactId>jsp-2.1</artifactId>
+      <version>6.1.14</version>
+    </dependency>
+    <dependency>
+      <groupId>org.mortbay.jetty</groupId>
+      <artifactId>jsp-api-2.1</artifactId>
+      <version>6.1.14</version>
+    </dependency>
+
   </dependencies>
 
   <profiles>

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/ce84eba4/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TajoAsyncDispatcher.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TajoAsyncDispatcher.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TajoAsyncDispatcher.java
index 8f83557..33a1e53 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TajoAsyncDispatcher.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TajoAsyncDispatcher.java
@@ -72,7 +72,7 @@ public class TajoAsyncDispatcher extends AbstractService  implements Dispatcher
             }
           } catch(InterruptedException ie) {
             if (!stopped) {
-              LOG.warn("AsyncDispatcher thread interrupted", ie);
+              LOG.warn("AsyncDispatcher thread interrupted");
             }
             return;
           }
@@ -112,7 +112,7 @@ public class TajoAsyncDispatcher extends AbstractService  implements Dispatcher
       try {
         eventHandlingThread.join();
       } catch (InterruptedException ie) {
-        LOG.warn("Interrupted Exception while stopping", ie);
+        LOG.warn("Interrupted Exception while stopping");
       }
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/ce84eba4/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TajoContainerProxy.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TajoContainerProxy.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TajoContainerProxy.java
index 5359311..966df72 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TajoContainerProxy.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TajoContainerProxy.java
@@ -47,10 +47,15 @@ public class TajoContainerProxy extends ContainerProxy {
   @Override
   public void launch(ContainerLaunchContext containerLaunchContext) {
     context.getResourceAllocator().addContainer(containerID, this);
+
     this.hostName = container.getNodeId().getHost();
-    this.port = context.getQueryMasterContext().getWorkerContext().getPullService().getPort();
+    //this.port = context.getQueryMasterContext().getWorkerContext().getPullService().getPort();
+    this.port = ((TajoWorkerContainer)container).getWorkerResource().getPullServerPort();
     this.state = ContainerState.RUNNING;
 
+    LOG.info("=======>Launch Container:" + executionBlockId + "," + containerID.getId() + "," +
+        container.getId() + "," + container.getNodeId() + ", pullServer=" + port);
+
     assignExecutionBlock(executionBlockId, container);
 
     context.getEventHandler().handle(new QueryEvent(context.getQueryId(), QueryEventType.INIT_COMPLETED));

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/ce84eba4/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TajoMasterService.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TajoMasterService.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TajoMasterService.java
index e522913..8d1bbe0 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TajoMasterService.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TajoMasterService.java
@@ -117,6 +117,9 @@ public class TajoMasterService extends AbstractService {
       if(command != null) {
         builder.setResponseCommand(command);
       }
+
+      builder.setNumClusterNodes(context.getResourceManager().getWorkers().size());
+      builder.setNumClusterSlots(context.getResourceManager().getNumClusterSlots());
       done.run(builder.build());
     }
 
@@ -126,17 +129,6 @@ public class TajoMasterService extends AbstractService {
         TajoMasterProtocol.WorkerResourceAllocationRequest request,
         RpcCallback<TajoMasterProtocol.WorkerResourceAllocationResponse> done) {
       context.getResourceManager().allocateWorkerResources(request, done);
-
-//      List<String> workerHosts = new ArrayList<String>();
-//      for(WorkerResource eachWorker: workerResources) {
-//        workerHosts.add(eachWorker.getAllocatedHost() + ":" + eachWorker.getPorts()[0]);
-//      }
-//
-//      done.run(TajoMasterProtocol.WorkerResourceAllocationResponse.newBuilder()
-//          .setExecutionBlockId(request.getExecutionBlockId())
-//          .addAllAllocatedWorks(workerHosts)
-//          .build()
-//      );
     }
 
     @Override
@@ -148,11 +140,11 @@ public class TajoMasterService extends AbstractService {
         WorkerResource workerResource = new WorkerResource();
         String[] tokens = eachWorkerResource.getWorkerHostAndPort().split(":");
         workerResource.setAllocatedHost(tokens[0]);
-        workerResource.setPorts(new int[]{Integer.parseInt(tokens[1])});
+        workerResource.setManagerPort(Integer.parseInt(tokens[1]));
         workerResource.setMemoryMBSlots(eachWorkerResource.getMemoryMBSlots());
         workerResource.setDiskSlots(eachWorkerResource.getDiskSlots());
 
-        LOG.info("====> releaseWorkerResource:" + workerResource);
+        LOG.info("releaseWorkerResource:" + workerResource);
         context.getResourceManager().releaseWorkerResource(
             new QueryId(eachWorkerResource.getExecutionBlockId().getQueryId()),
             workerResource);

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/ce84eba4/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TaskSchedulerImpl.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TaskSchedulerImpl.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TaskSchedulerImpl.java
index 651f9c0..17b6168 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TaskSchedulerImpl.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TaskSchedulerImpl.java
@@ -396,7 +396,8 @@ public class TaskSchedulerImpl extends AbstractService
       TaskRequestEvent taskRequest;
       while (it.hasNext() && leafTasks.size() > 0) {
         taskRequest = it.next();
-        LOG.info("====> assignToLeafTasks: " + taskRequest.getExecutionBlockId());
+        LOG.info("assignToLeafTasks: " + taskRequest.getExecutionBlockId() + "," +
+            "containerId=" + taskRequest.getContainerId());
         ContainerProxy container = context.getResourceAllocator().getContainer(taskRequest.getContainerId());
         String host = container.getTaskHostName();
 

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/ce84eba4/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryInProgress.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryInProgress.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryInProgress.java
index 2e2870f..10be0a3 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryInProgress.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryInProgress.java
@@ -23,13 +23,13 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.util.StringUtils;
-import org.apache.hadoop.yarn.event.AsyncDispatcher;
 import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.service.CompositeService;
 import org.apache.tajo.QueryId;
 import org.apache.tajo.TajoProtos;
 import org.apache.tajo.engine.planner.logical.LogicalRootNode;
 import org.apache.tajo.ipc.TajoWorkerProtocol;
+import org.apache.tajo.master.TajoAsyncDispatcher;
 import org.apache.tajo.master.TajoMaster;
 import org.apache.tajo.master.rm.WorkerResource;
 import org.apache.tajo.master.rm.WorkerResourceManager;
@@ -45,7 +45,7 @@ public class QueryInProgress extends CompositeService {
 
   private QueryId queryId;
 
-  private AsyncDispatcher dispatcher;
+  private TajoAsyncDispatcher dispatcher;
 
   private LogicalRootNode plan;
 
@@ -75,7 +75,7 @@ public class QueryInProgress extends CompositeService {
 
   @Override
   public void init(Configuration conf) {
-    dispatcher = new AsyncDispatcher();
+    dispatcher = new TajoAsyncDispatcher("QueryInProgress:" + queryId);
     this.addService(dispatcher);
 
     dispatcher.register(QueryJobEvent.Type.class, new QueryInProgressEventHandler());
@@ -256,30 +256,4 @@ public class QueryInProgress extends CompositeService {
         state == TajoProtos.QueryState.QUERY_KILLED ||
         state == TajoProtos.QueryState.QUERY_SUCCEEDED;
   }
-
-//  private void checkQueryMasterShutdown() {
-//    //run background
-//    Thread t = new Thread() {
-//      public void run() {
-//        while(true) {
-//          try {
-//            if(masterContext.getResourceManager().isQueryMasterStopped(queryId)) {
-//              queryMasterStopped.set(true);
-//              LOG.info("==========> " + queryId + " QueryMaster stopped");
-//              break;
-//            }
-//          } catch (Exception e) {
-//            LOG.error(e.getMessage(), e);
-//          }
-//          try {
-//            Thread.sleep(1000);
-//          } catch (InterruptedException e) {
-//            break;
-//          }
-//        }
-//      }
-//    };
-//
-//    t.start();
-//  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/ce84eba4/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryInfo.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryInfo.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryInfo.java
index e7ceae7..89fc6fe 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryInfo.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryInfo.java
@@ -66,14 +66,14 @@ public class QueryInfo {
     if(queryMasterResource == null) {
       return 0;
     }
-    return queryMasterResource.getPorts()[0];
+    return queryMasterResource.getManagerPort();
   }
 
   public int getQueryMasterClientPort() {
     if(queryMasterResource == null) {
       return 0;
     }
-    return queryMasterResource.getPorts()[1];
+    return queryMasterResource.getClientPort();
   }
 
   public TajoProtos.QueryState getQueryState() {

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/ce84eba4/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryJobManager.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryJobManager.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryJobManager.java
index e4d83af..d5a4021 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryJobManager.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryJobManager.java
@@ -42,9 +42,9 @@ public class QueryJobManager extends CompositeService {
 
   private AsyncDispatcher dispatcher;
 
-  private Map<QueryId, QueryInProgress> runningQueries = new HashMap<QueryId, QueryInProgress>();
+  private final Map<QueryId, QueryInProgress> runningQueries = new HashMap<QueryId, QueryInProgress>();
 
-  private Map<QueryId, QueryInProgress> finishedQueries = new HashMap<QueryId, QueryInProgress>();
+  private final Map<QueryId, QueryInProgress> finishedQueries = new HashMap<QueryId, QueryInProgress>();
 
   public QueryJobManager(final TajoMaster.MasterContext masterContext) {
     super(QueryJobManager.class.getName());
@@ -160,7 +160,9 @@ public class QueryJobManager extends CompositeService {
     if(queryHeartbeat.getTajoWorkerHost() != null) {
       WorkerResource queryMasterResource = new WorkerResource();
       queryMasterResource.setAllocatedHost(queryHeartbeat.getTajoWorkerHost());
-      queryMasterResource.setPorts(new int[]{queryHeartbeat.getTajoWorkerPort(), queryHeartbeat.getTajoWorkerClientPort()});
+      queryMasterResource.setManagerPort(queryHeartbeat.getTajoWorkerPort());
+      queryMasterResource.setClientPort(queryHeartbeat.getTajoWorkerClientPort());
+      queryMasterResource.setPullServerPort(queryHeartbeat.getTajoWorkerPullServerPort());
 
       queryInfo.setQueryMasterResource(queryMasterResource);
     }

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/ce84eba4/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java
index fc60df7..ce4c846 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java
@@ -620,14 +620,16 @@ public class SubQuery implements EventHandler<SubQueryEvent> {
       ExecutionBlock execBlock = subQuery.getBlock();
       QueryUnit [] tasks = subQuery.getQueryUnits();
 
-      //TODO refresh worker's numClusterNodes
-      int numClusterNodes = subQuery.getContext().getResourceAllocator().getNumClusterNode();
-      int numRequest = numClusterNodes == 0 ? tasks.length: Math.min(tasks.length, numClusterNodes * 4);
+      int numRequest = subQuery.getContext().getResourceAllocator().calculateNumRequestContainers(
+          subQuery.getContext().getQueryMasterContext().getWorkerContext(), tasks.length
+      );
 
       final Resource resource = Records.newRecord(Resource.class);
 
       resource.setMemory(512);
 
+      LOG.info("Request Container for " + subQuery.getId() + " containers=" + numRequest);
+
       Priority priority = Records.newRecord(Priority.class);
       priority.setPriority(subQuery.getPriority());
       ContainerAllocationEvent event =

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/ce84eba4/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/rm/TajoWorkerResourceManager.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/rm/TajoWorkerResourceManager.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/rm/TajoWorkerResourceManager.java
index be4b800..93450e1 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/rm/TajoWorkerResourceManager.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/rm/TajoWorkerResourceManager.java
@@ -47,7 +47,7 @@ public class TajoWorkerResourceManager implements WorkerResourceManager {
 
   private Map<QueryId, WorkerResource> queryMasterMap = new HashMap<QueryId, WorkerResource>();
 
-  private Object workerResourceLock = new Object();
+  private final Object workerResourceLock = new Object();
 
   private final String queryIdSeed;
 
@@ -63,10 +63,13 @@ public class TajoWorkerResourceManager implements WorkerResourceManager {
 
   private int queryMasterMemoryMB;
 
+  private int queryMasterDiskSlot;
+
   public TajoWorkerResourceManager(TajoMaster.MasterContext masterContext) {
     this.masterContext = masterContext;
     this.queryIdSeed = String.valueOf(System.currentTimeMillis());
     this.queryMasterMemoryMB = masterContext.getConf().getInt("tajo.querymaster.memoryMB", 512);
+    this.queryMasterDiskSlot = masterContext.getConf().getInt("tajo.querymaster.diskSlot", 1);
 
     requestQueue = new LinkedBlockingDeque<WorkerResourceRequest>();
     reAllocationList = new ArrayList<WorkerResourceRequest>();
@@ -78,6 +81,21 @@ public class TajoWorkerResourceManager implements WorkerResourceManager {
     reAllocator.start();
   }
 
+  public Map<String, WorkerResource> getWorkers() {
+    return Collections.unmodifiableMap(allWorkerResourceMap);
+  }
+
+  public int getNumClusterSlots() {
+    int numSlots = 0;
+    synchronized(workerResourceLock) {
+      for(String eachWorker: liveWorkerResources) {
+        numSlots += allWorkerResourceMap.get(eachWorker).getSlots();
+      }
+    }
+
+    return numSlots;
+  }
+
   @Override
   public void stop() {
     if(stopped.get()) {
@@ -95,7 +113,7 @@ public class TajoWorkerResourceManager implements WorkerResourceManager {
 
   @Override
   public WorkerResource allocateQueryMaster(QueryInProgress queryInProgress) {
-    List<WorkerResource> workerResources = chooseWorkers(true, 1, 1, 1);
+    List<WorkerResource> workerResources = chooseWorkers(true, queryMasterMemoryMB, queryMasterDiskSlot, 1);
     if(workerResources.size() == 0) {
       //TODO if resource available, assign worker.
       LOG.warn("No available resource for querymaster:" + queryInProgress.getQueryId());
@@ -188,16 +206,20 @@ public class TajoWorkerResourceManager implements WorkerResourceManager {
       while(!stopped.get()) {
         try {
           WorkerResourceRequest resourceRequest = requestQueue.take();
-          List<WorkerResource> workerResources = chooseWorkers(false,
-              resourceRequest.request.getMemoryMBSlots(),
-              resourceRequest.request.getDiskSlots(),
-              resourceRequest.request.getNumWorks());
 
           LOG.info("====> allocateWorkerResources:" +
               (new ExecutionBlockId(resourceRequest.request.getExecutionBlockId())) +
-              ", required:" + resourceRequest.request.getNumWorks() + ", allocated:" + workerResources.size() +
+              ", required:" + resourceRequest.request.getNumWorks() +
               ", queryMasterRequest=" + resourceRequest.queryMasterRequest +
               ", liveWorkers=" + liveWorkerResources.size());
+
+          List<WorkerResource> workerResources = chooseWorkers(false,
+              resourceRequest.request.getMemoryMBSlots(),
+              resourceRequest.request.getDiskSlots(),
+              resourceRequest.request.getNumWorks());
+
+          LOG.info("====> allocateWorkerResources: allocated:" + workerResources.size());
+
 //          if(LOG.isDebugEnabled()) {
 //            LOG.debug("====> allocateWorkerResources:" +
 //                (new ExecutionBlockId(resourceRequest.request.getExecutionBlockId())) +
@@ -211,14 +233,18 @@ public class TajoWorkerResourceManager implements WorkerResourceManager {
             if(resourceRequest.queryMasterRequest) {
               startQueryMaster(resourceRequest.queryId, workerResources.get(0));
             } else {
-              List<String> workerHosts = new ArrayList<String>();
+              List<TajoMasterProtocol.WorkerAllocatedResource> workerHosts =
+                  new ArrayList<TajoMasterProtocol.WorkerAllocatedResource>();
 
               for(WorkerResource eachWorker: workerResources) {
-                workerHosts.add(eachWorker.getAllocatedHost() + ":" + eachWorker.getPorts()[0]);
+                workerHosts.add(TajoMasterProtocol.WorkerAllocatedResource.newBuilder()
+                    .setWorkerHostAndPort(eachWorker.getAllocatedHost() + ":" + eachWorker.getManagerPort())
+                    .setWorkerPullServerPort(eachWorker.getPullServerPort())
+                    .build());
               }
               resourceRequest.callBack.run(TajoMasterProtocol.WorkerResourceAllocationResponse.newBuilder()
                   .setExecutionBlockId(resourceRequest.request.getExecutionBlockId())
-                  .addAllAllocatedWorks(workerHosts)
+                  .addAllWorkerAllocatedResource(workerHosts)
                   .build()
               );
             }
@@ -282,27 +308,47 @@ public class TajoWorkerResourceManager implements WorkerResourceManager {
 
   private List<WorkerResource> chooseWorkers(boolean queryMaster,
                                              int requiredMemoryMBSlots, int requiredDiskSlots,
-                                             int numWorkers) {
+                                             int numWorkerSlots) {
     List<WorkerResource> selectedWorkers = new ArrayList<WorkerResource>();
 
     int selectedCount = 0;
 
     synchronized(workerResourceLock) {
-      for(String eachWorker: liveWorkerResources) {
-        if(selectedCount >= numWorkers) {
+      List<String> randomWorkers = new ArrayList<String>(liveWorkerResources);
+      Collections.shuffle(randomWorkers);
+      int liveWorkerSize = randomWorkers.size();
+      Set<String> insufficientWorkers = new HashSet<String>();
+      boolean stop = false;
+      while(!stop) {
+        if(insufficientWorkers.size() >= liveWorkerSize || selectedCount >= numWorkerSlots) {
           break;
         }
-        WorkerResource workerResource = allWorkerResourceMap.get(eachWorker);
-        if(workerResource.getAvailableMemoryMBSlots() >= requiredMemoryMBSlots &&
-            workerResource.getAvailableDiskSlots() >= requiredDiskSlots) {
-          if(queryMaster && workerResource.isQueryMasterAllocated()) {
-            continue;
+        for(String eachWorker: randomWorkers) {
+          if(insufficientWorkers.size() >= liveWorkerSize || selectedCount >= numWorkerSlots) {
+            stop = true;
+          } else {
+            WorkerResource workerResource = allWorkerResourceMap.get(eachWorker);
+            if(workerResource.getAvailableMemoryMBSlots() >= requiredMemoryMBSlots) {
+                //TODO check disk slot
+                // && workerResource.getAvailableDiskSlots() >= requiredDiskSlots) {
+              if(queryMaster && workerResource.isQueryMasterAllocated()) {
+                insufficientWorkers.add(eachWorker);
+                continue;
+              }
+              workerResource.addUsedMemoryMBSlots(requiredMemoryMBSlots);
+              //workerResource.addUsedDiskSlots(requiredDiskSlots);
+              workerResource.setQueryMasterAllocated(queryMaster);
+              selectedWorkers.add(workerResource);
+              selectedCount++;
+            } else {
+              insufficientWorkers.add(eachWorker);
+            }
+          }
+        }
+        if(!stop) {
+          for(String eachWorker: insufficientWorkers) {
+            randomWorkers.remove(eachWorker);
           }
-          workerResource.addUsedMemoryMBSlots(requiredMemoryMBSlots);
-          workerResource.addUsedDiskSlots(requiredDiskSlots);
-          workerResource.setQueryMasterAllocated(queryMaster);
-          selectedWorkers.add(workerResource);
-          selectedCount++;
         }
       }
     }
@@ -310,10 +356,6 @@ public class TajoWorkerResourceManager implements WorkerResourceManager {
     return selectedWorkers;
   }
 
-  public Collection<WorkerResource> getClusterWorkResources() {
-    return Collections.unmodifiableCollection(allWorkerResourceMap.values());
-  }
-
   @Override
   public void releaseWorkerResource(QueryId queryId, WorkerResource workerResource) {
     synchronized(workerResourceLock) {
@@ -341,14 +383,22 @@ public class TajoWorkerResourceManager implements WorkerResourceManager {
 
   @Override
   public void stopQueryMaster(QueryId queryId) {
-    WorkerResource workerResource = null;
+    WorkerResource queryMasterWorkerResource = null;
     synchronized(workerResourceLock) {
-      workerResource = queryMasterMap.remove(queryId);
-    }
-    LOG.info("release QueryMaster resource:" + queryId + "," + workerResource.isQueryMasterAllocated());
-    if(workerResource != null) {
-      releaseWorkerResource(queryId, workerResource);
+      if(!queryMasterMap.containsKey(queryId)) {
+        LOG.warn("No QueryMaster resource info for " + queryId);
+        return;
+      } else {
+        queryMasterWorkerResource = queryMasterMap.remove(queryId);
+      }
     }
+    LOG.info("release QueryMaster resource:" + queryId + "," + queryMasterWorkerResource);
+    WorkerResource workerResource = new WorkerResource();
+    workerResource.copyId(queryMasterWorkerResource);
+    workerResource.setMemoryMBSlots(queryMasterMemoryMB);
+    workerResource.setDiskSlots(queryMasterDiskSlot);
+    workerResource.setCpuCoreSlots(0);
+    releaseWorkerResource(queryId, workerResource);
   }
 
   public void workerHeartbeat(TajoMasterProtocol.TajoHeartbeat request) {
@@ -368,7 +418,9 @@ public class TajoWorkerResourceManager implements WorkerResourceManager {
 
         int[] ports = new int[] { request.getTajoWorkerPort(), request.getTajoWorkerClientPort() };
 
-        workerResource.setPorts(ports);
+        workerResource.setManagerPort(request.getTajoWorkerPort());
+        workerResource.setClientPort(request.getTajoWorkerClientPort());
+        workerResource.setPullServerPort(request.getTajoWorkerPullServerPort());
 
         workerResource.setLastHeartbeat(System.currentTimeMillis());
         workerResource.setWorkerStatus(WorkerStatus.LIVE);

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/ce84eba4/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/rm/WorkerResource.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/rm/WorkerResource.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/rm/WorkerResource.java
index 00bd509..b21570c 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/rm/WorkerResource.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/rm/WorkerResource.java
@@ -25,7 +25,9 @@ public class WorkerResource {
   private static final Log LOG = LogFactory.getLog(WorkerResource.class);
 
   private String allocatedHost;
-  private int[] ports;
+  private int managerPort;
+  private int clientPort;
+  private int pullServerPort;
 
   private int diskSlots;
   private int cpuCoreSlots;
@@ -42,11 +44,12 @@ public class WorkerResource {
   private long lastHeartbeat;
 
   public String getId() {
-    if(ports.length > 0) {
-      return allocatedHost + ":" + ports[0];
-    } else {
-      return allocatedHost;
-    }
+    return allocatedHost + ":" + managerPort;
+  }
+
+  public void copyId(WorkerResource workerResource) { // copies the host and manager port, the two fields getId() is built from
+    managerPort = workerResource.getManagerPort();
+    allocatedHost = workerResource.getAllocatedHost();
   }
 
   public String getAllocatedHost() {
@@ -57,14 +60,6 @@ public class WorkerResource {
     this.allocatedHost = allocatedHost;
   }
 
-  public int[] getPorts() {
-    return ports;
-  }
-
-  public void setPorts(int[] ports) {
-    this.ports = ports;
-  }
-
   public void addUsedDiskSlots(int diskSlots) {
     usedDiskSlots += diskSlots;
   }
@@ -115,18 +110,8 @@ public class WorkerResource {
         ", used=" + usedMemoryMBSlots + ":" + usedCpuCoreSlots + ":" + usedDiskSlots;
   }
 
-  private String portsToStr() {
-    if(ports == null) {
-      return "null";
-    }
-    String result = "";
-    String prefix = "";
-    for(int i = 0; i < ports.length; i++) {
-      result += prefix + ports[i];
-      prefix = ",";
-    }
-
-    return result;
+  public String portsToStr() {
+    return managerPort + "," + clientPort + "," + pullServerPort;
   }
 
   public void setLastHeartbeat(long heartbeatTime) {
@@ -182,13 +167,51 @@ public class WorkerResource {
         queryMasterAllocated = false;
     }
 
-    usedMemoryMBSlots -= workerResource.memoryMBSlots;
-    usedDiskSlots -= workerResource.diskSlots;
+    usedMemoryMBSlots = usedMemoryMBSlots - workerResource.memoryMBSlots;
+    //usedDiskSlots = usedDiskSlots - workerResource.diskSlots;
 
     if(usedMemoryMBSlots < 0 || usedDiskSlots < 0 || usedCpuCoreSlots < 0) {
-//      LOG.warn("Used resources can't be a minus.");
-      LOG.trace("Used resources can't be a minus.");
+      LOG.warn("Used resources can't be a minus.");
       LOG.warn(this + " ==> " + workerResource);
     }
   }
+
+  public int getSlots() {
+    // TODO: "slot" is not formally defined yet; provisionally 512 MB of memory counts as one slot (integer division).
+    return memoryMBSlots/512;
+  }
+
+  public int getAvaliableSlots() {
+    // TODO: slot size is provisional (512 MB = 1 slot). NOTE(review): the method name misspells "Available".
+    return getAvailableMemoryMBSlots()/512;
+  }
+
+  public int getUsedSlots() {
+    // TODO: slot size is provisional; used memory is converted at 512 MB per slot (integer division).
+    return usedMemoryMBSlots/512;
+  }
+
+  public int getManagerPort() {
+    return managerPort;
+  }
+
+  public void setManagerPort(int managerPort) {
+    this.managerPort = managerPort;
+  }
+
+  public int getClientPort() {
+    return clientPort;
+  }
+
+  public void setClientPort(int clientPort) {
+    this.clientPort = clientPort;
+  }
+
+  public int getPullServerPort() {
+    return pullServerPort;
+  }
+
+  public void setPullServerPort(int pullServerPort) {
+    this.pullServerPort = pullServerPort;
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/ce84eba4/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/rm/WorkerResourceManager.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/rm/WorkerResourceManager.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/rm/WorkerResourceManager.java
index a066321..93772ec 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/rm/WorkerResourceManager.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/rm/WorkerResourceManager.java
@@ -25,6 +25,7 @@ import org.apache.tajo.ipc.TajoMasterProtocol;
 import org.apache.tajo.master.querymaster.QueryInProgress;
 
 import java.io.IOException;
+import java.util.Map;
 
 public interface WorkerResourceManager {
 
@@ -56,5 +57,9 @@ public interface WorkerResourceManager {
 
   public void releaseWorkerResource(QueryId queryId, WorkerResource workerResource);
 
+  public Map<String, WorkerResource> getWorkers();
+
   public void stop();
+
+  public int getNumClusterSlots();
 }

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/ce84eba4/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/rm/YarnRMContainerAllocator.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/rm/YarnRMContainerAllocator.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/rm/YarnRMContainerAllocator.java
index d7adad8..b159983 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/rm/YarnRMContainerAllocator.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/rm/YarnRMContainerAllocator.java
@@ -76,8 +76,6 @@ public class YarnRMContainerAllocator extends AMRMClientImpl
     RegisterApplicationMasterResponse response;
     try {
       response = registerApplicationMaster("localhost", 10080, "http://localhost:1234");
-      context.getResourceAllocator().setMaxContainerCapability(response.getMaximumResourceCapability().getMemory());
-      context.getResourceAllocator().setMinContainerCapability(response.getMinimumResourceCapability().getMemory());
 
       // If the number of cluster nodes is ZERO, it waits for available nodes.
       AllocateResponse allocateResponse = allocate(0.0f);
@@ -90,7 +88,7 @@ public class YarnRMContainerAllocator extends AMRMClientImpl
           LOG.error(e);
         }
       }
-      context.getResourceAllocator().setNumClusterNodes(allocateResponse.getNumClusterNodes());
+      context.getQueryMasterContext().getWorkerContext().setNumClusterNodes(allocateResponse.getNumClusterNodes());
     } catch (YarnRemoteException e) {
       LOG.error(e);
     }

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/ce84eba4/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/rm/YarnTajoResourceManager.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/rm/YarnTajoResourceManager.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/rm/YarnTajoResourceManager.java
index 8756eed..9bb153c 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/rm/YarnTajoResourceManager.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/rm/YarnTajoResourceManager.java
@@ -75,6 +75,14 @@ public class YarnTajoResourceManager implements WorkerResourceManager {
   public void stop() {
   }
 
+  public Map<String, WorkerResource> getWorkers() { // declared by WorkerResourceManager; NOTE(review): no @Override here, unlike workerHeartbeat
+    return new HashMap<String, WorkerResource>(); // always a new, empty map in this implementation
+  }
+
+  public int getNumClusterSlots() { // declared by WorkerResourceManager; NOTE(review): no @Override here, unlike workerHeartbeat
+    return 0; // this implementation reports no slots
+  }
+
   @Override
   public void workerHeartbeat(TajoMasterProtocol.TajoHeartbeat request) {
     //nothing to do

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/ce84eba4/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/AbstractResourceAllocator.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/AbstractResourceAllocator.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/AbstractResourceAllocator.java
index ebc3f08..c75fd96 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/AbstractResourceAllocator.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/AbstractResourceAllocator.java
@@ -26,10 +26,6 @@ import java.util.HashMap;
 import java.util.Map;
 
 public abstract class AbstractResourceAllocator extends CompositeService implements ResourceAllocator {
-  private int minCapability;
-  private int maxCapability;
-  private int numCluster;
-
   private Map<ContainerId, ContainerProxy> containers = new HashMap<ContainerId, ContainerProxy>();
 
   public AbstractResourceAllocator() {
@@ -55,25 +51,4 @@ public abstract class AbstractResourceAllocator extends CompositeService impleme
   public Map<ContainerId, ContainerProxy> getContainers() {
     return containers;
   }
-
-  public void setMaxContainerCapability(int capability) {
-    this.maxCapability = capability;
-  }
-
-  public int getMaxContainerCapability() {
-    return this.maxCapability;
-  }
-
-  public void setMinContainerCapability(int capability) {
-    this.minCapability = capability;
-  }
-
-  public int getNumClusterNode() {
-    return numCluster;
-  }
-
-  public void setNumClusterNodes(int num) {
-    numCluster = num;
-  }
-
 }

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/ce84eba4/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/ResourceAllocator.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/ResourceAllocator.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/ResourceAllocator.java
index 108c7b7..f0c70cf 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/ResourceAllocator.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/ResourceAllocator.java
@@ -24,4 +24,5 @@ import org.apache.hadoop.yarn.proto.YarnProtos;
 public interface ResourceAllocator {
   public void allocateTaskWorker();
   public ContainerId makeContainerId(YarnProtos.ContainerIdProto containerId);
+  public int calculateNumRequestContainers(TajoWorker.WorkerContext workerContext, int numTasks);
 }

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/ce84eba4/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoResourceAllocator.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoResourceAllocator.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoResourceAllocator.java
index 3264f55..e6ec9c0 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoResourceAllocator.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoResourceAllocator.java
@@ -92,7 +92,12 @@ public class TajoResourceAllocator extends AbstractResourceAllocator {
 
   @Override
   public void allocateTaskWorker() {
-    //To change body of implemented methods use File | Settings | File Templates.
+  }
+
+  @Override
+  public int calculateNumRequestContainers(TajoWorker.WorkerContext workerContext, int numTasks) {
+    int clusterSlots = workerContext.getNumClusterSlots(); // 0 presumably means no slot count has been reported yet -- TODO confirm
+    return clusterSlots == 0 ? 1: Math.min(numTasks, clusterSlots); // with 0 slots request one container; otherwise at most one per slot
   }
 
   @Override
@@ -283,14 +288,14 @@ public class TajoResourceAllocator extends AbstractResourceAllocator {
           continue;
         }
 
-        List<String> workerHosts = response.getAllocatedWorksList();
+        List<TajoMasterProtocol.WorkerAllocatedResource> workerHosts = response.getWorkerAllocatedResourceList();
         ExecutionBlockId executionBlockId = event.getExecutionBlockId();
 
         List<Container> containers = new ArrayList<Container>();
-        for(String eachWorker: workerHosts) {
+        for(TajoMasterProtocol.WorkerAllocatedResource eachWorker: workerHosts) {
           TajoWorkerContainer container = new TajoWorkerContainer();
           NodeIdPBImpl nodeId = new NodeIdPBImpl();
-          String[] tokens = eachWorker.split(":");
+          String[] tokens = eachWorker.getWorkerHostAndPort().split(":");
 
           nodeId.setHost(tokens[0]);
           nodeId.setPort(Integer.parseInt(tokens[1]));
@@ -306,7 +311,8 @@ public class TajoResourceAllocator extends AbstractResourceAllocator {
 
           WorkerResource workerResource = new WorkerResource();
           workerResource.setAllocatedHost(nodeId.getHost());
-          workerResource.setPorts(new int[]{nodeId.getPort()});
+          workerResource.setManagerPort(nodeId.getPort());
+          workerResource.setPullServerPort(eachWorker.getWorkerPullServerPort());
           workerResource.setMemoryMBSlots(requiredMemoryMBSlot);
           workerResource.setDiskSlots(requiredDiskSlots);
 

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/ce84eba4/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoWorker.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoWorker.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoWorker.java
index 3b116f2..bb27ee4 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoWorker.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoWorker.java
@@ -33,7 +33,7 @@ import org.apache.tajo.ipc.TajoMasterProtocol;
 import org.apache.tajo.master.querymaster.QueryMaster;
 import org.apache.tajo.master.rm.TajoWorkerResourceManager;
 import org.apache.tajo.pullserver.TajoPullServerService;
-import org.apache.tajo.rpc.NullCallback;
+import org.apache.tajo.rpc.CallFuture2;
 import org.apache.tajo.rpc.ProtoAsyncRpcClient;
 import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos;
 import org.apache.tajo.util.TajoIdUtils;
@@ -45,7 +45,10 @@ import java.io.InputStreamReader;
 import java.net.InetSocketAddress;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
 
 public class TajoWorker extends CompositeService {
   public static PrimitiveProtos.BoolProto TRUE_PROTO = PrimitiveProtos.BoolProto.newBuilder().setValue(true).build();
@@ -78,6 +81,10 @@ public class TajoWorker extends CompositeService {
 
   private AtomicBoolean stopped = new AtomicBoolean(false);
 
+  private AtomicInteger numClusterNodes = new AtomicInteger();
+
+  private AtomicInteger numClusterSlots = new AtomicInteger();
+
   public TajoWorker(String daemonMode) throws Exception {
     super(TajoWorker.class.getName());
     this.daemonMode = daemonMode;
@@ -214,6 +221,22 @@ public class TajoWorker extends CompositeService {
     public boolean isStandbyMode() {
       return !"qm".equals(daemonMode) && !"tr".equals(daemonMode);
     }
+
+    public void setNumClusterNodes(int numClusterNodes) {
+      TajoWorker.this.numClusterNodes.set(numClusterNodes);
+    }
+
+    public int getNumClusterNodes() {
+      return TajoWorker.this.numClusterNodes.get();
+    }
+
+    public void setNumClusterSlots(int numClusterSlots) {
+      TajoWorker.this.numClusterSlots.set(numClusterSlots);
+    }
+
+    public int getNumClusterSlots() {
+      return TajoWorker.this.numClusterSlots.get();
+    }
   }
 
   public void stopWorkerForce() {
@@ -225,14 +248,7 @@ public class TajoWorker extends CompositeService {
       //QueryMaster mode
       String tajoMasterAddress = params[2];
 
-      LOG.info("Init TajoMaster connection to:" + tajoMasterAddress);
-      InetSocketAddress addr = NetUtils.createSocketAddr(tajoMasterAddress);
-      try {
-        tajoMasterRpc = new ProtoAsyncRpcClient(TajoMasterProtocol.class, addr);
-        tajoMasterRpcClient = tajoMasterRpc.getStub();
-      } catch (Exception e) {
-        LOG.error("Can't connect to TajoMaster[" + addr + "], " + e.getMessage(), e);
-      }
+      connectToTajoMaster(tajoMasterAddress);
 
       QueryId queryId = TajoIdUtils.parseQueryId(params[1]);
       tajoWorkerManagerService.getQueryMaster().reportQueryStatusToQueryMaster(
@@ -242,17 +258,28 @@ public class TajoWorker extends CompositeService {
       taskRunnerManager.startTask(params);
     } else {
       //Standby mode
-      String tajoMasterAddress = tajoConf.get("tajo.master.manager.addr");
-      LOG.info("Init TajoMaster connection to:" + tajoMasterAddress);
-      InetSocketAddress addr = NetUtils.createSocketAddr(tajoMasterAddress);
+      connectToTajoMaster(tajoConf.get("tajo.master.manager.addr"));
+      workerHeartbeatThread = new WorkerHeartbeatThread();
+      workerHeartbeatThread.start();
+    }
+  }
+
+  private void connectToTajoMaster(String tajoMasterAddress) { // creates the TajoMaster RPC client and stub, retrying until it succeeds
+    LOG.info("Init TajoMaster connection to:" + tajoMasterAddress);
+    InetSocketAddress addr = NetUtils.createSocketAddr(tajoMasterAddress);
+    while(true) { // NOTE(review): no attempt limit; the only exit is a successful client creation
       try {
         tajoMasterRpc = new ProtoAsyncRpcClient(TajoMasterProtocol.class, addr);
         tajoMasterRpcClient = tajoMasterRpc.getStub();
+        break; // client and stub are set; stop retrying
       } catch (Exception e) {
         LOG.error("Can't connect to TajoMaster[" + addr + "], " + e.getMessage(), e);
       }
-      workerHeartbeatThread = new WorkerHeartbeatThread();
-      workerHeartbeatThread.start();
+
+      try {
+        Thread.sleep(3000); // wait 3 seconds before the next attempt
+      } catch (InterruptedException e) { // NOTE(review): interrupt is swallowed and the flag is not restored, so the loop keeps retrying
+      }
     }
   }
 
@@ -302,8 +329,15 @@ public class TajoWorker extends CompositeService {
     }
 
     public void run() {
+      CallFuture2<TajoMasterProtocol.TajoHeartbeatResponse> callBack =
+          new CallFuture2<TajoMasterProtocol.TajoHeartbeatResponse>();
       LOG.info("Worker Resource Heartbeat Thread start.");
       int sendDiskInfoCount = 0;
+      int pullServerPort = 0;
+      if(pullService != null) {
+        pullServerPort = pullService.getPort();
+      }
+
       while(true) {
         if(sendDiskInfoCount == 0 && mountPaths != null) {
           for(File eachFile: mountPaths) {
@@ -323,14 +357,33 @@ public class TajoWorker extends CompositeService {
             .setDiskSlots(workerDiskSlots)
             .build();
 
+
         TajoMasterProtocol.TajoHeartbeat heartbeatProto = TajoMasterProtocol.TajoHeartbeat.newBuilder()
             .setTajoWorkerHost(workerContext.getTajoWorkerManagerService().getBindAddr().getHostName())
             .setTajoWorkerPort(workerContext.getTajoWorkerManagerService().getBindAddr().getPort())
             .setTajoWorkerClientPort(workerContext.getTajoWorkerClientService().getBindAddr().getPort())
+            .setTajoWorkerPullServerPort(pullServerPort)
             .setServerStatus(serverStatus)
             .build();
 
-        workerContext.getTajoMasterRpcClient().heartbeat(null, heartbeatProto, NullCallback.get());
+        workerContext.getTajoMasterRpcClient().heartbeat(null, heartbeatProto, callBack);
+
+        try {
+          TajoMasterProtocol.TajoHeartbeatResponse response = callBack.get(2, TimeUnit.SECONDS);
+          if(response != null) {
+            if(response.getNumClusterNodes() > 0) {
+              workerContext.setNumClusterNodes(response.getNumClusterNodes());
+            }
+
+            if(response.getNumClusterSlots() > 0) {
+              workerContext.setNumClusterSlots(response.getNumClusterSlots());
+            }
+          }
+        } catch (InterruptedException e) {
+          break;
+        } catch (TimeoutException e) {
+        }
+
         try {
           Thread.sleep(10 * 1000);
         } catch (InterruptedException e) {

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/ce84eba4/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TaskRunner.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TaskRunner.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TaskRunner.java
index 47ec7bc..8493be2 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TaskRunner.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TaskRunner.java
@@ -173,6 +173,10 @@ public class TaskRunner extends AbstractService {
     }
   }
 
+  public String getId() { // "<executionBlockId>,<containerId>"; TaskRunnerManager uses it as this runner's map key
+    return executionBlockId + "," + containerId;
+  }
+
   @Override
   public void init(Configuration conf) {
     this.queryConf = (QueryConf)conf;
@@ -313,7 +317,7 @@ public class TaskRunner extends AbstractService {
             try {
               if (callFuture == null) {
                 callFuture = new CallFuture2<QueryUnitRequestProto>();
-                LOG.info("====>Request GetTask:" + executionBlockId + "," + containerId);
+                LOG.info("====>Request GetTask:" + getId());
                 GetTaskRequestProto request = GetTaskRequestProto.newBuilder()
                     .setExecutionBlockId(executionBlockId.getProto())
                     .setContainerId(((ContainerIdPBImpl) containerId).getProto())
@@ -333,7 +337,7 @@ public class TaskRunner extends AbstractService {
                 }
                 // if there has been no assigning task for a given period,
                 // TaskRunner will retry to request an assigning task.
-                LOG.warn("Timeout getResource:" + executionBlockId + ", but retry", te);
+                LOG.warn("Timeout getResource:" + getId() + ", but retry", te);
                 continue;
               }
 
@@ -342,11 +346,11 @@ public class TaskRunner extends AbstractService {
                 // If TaskRunner receives the terminal signal, TaskRunner will be terminated
                 // immediately.
                 if (taskRequest.getShouldDie()) {
-                  LOG.info("Received ShouldDie flag:" + executionBlockId);
+                  LOG.info("Received ShouldDie flag:" + getId());
                   stop();
                   if(taskRunnerManager != null) {
                     //notify to TaskRunnerManager
-                    taskRunnerManager.stopTask(executionBlockId);
+                    taskRunnerManager.stopTask(getId());
                   }
                 } else {
 

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/ce84eba4/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TaskRunnerManager.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TaskRunnerManager.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TaskRunnerManager.java
index dcd44df..5d8fa8d 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TaskRunnerManager.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TaskRunnerManager.java
@@ -22,7 +22,6 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.yarn.service.CompositeService;
-import org.apache.tajo.ExecutionBlockId;
 import org.apache.tajo.QueryConf;
 import org.apache.tajo.conf.TajoConf;
 
@@ -33,7 +32,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
 public class TaskRunnerManager extends CompositeService {
   private static final Log LOG = LogFactory.getLog(TaskRunnerManager.class);
 
-  private Map<ExecutionBlockId, TaskRunner> taskRunnerMap = new HashMap<ExecutionBlockId, TaskRunner>();
+  private final Map<String, TaskRunner> taskRunnerMap = new HashMap<String, TaskRunner>();
   private TajoWorker.WorkerContext workerContext;
   private TajoConf tajoConf;
   private AtomicBoolean stop = new AtomicBoolean(false);
@@ -74,10 +73,10 @@ public class TaskRunnerManager extends CompositeService {
     }
   }
 
-  public void stopTask(ExecutionBlockId executionBlockId) {
-    LOG.info("Stop Task:" + executionBlockId);
+  public void stopTask(String id) {
+    LOG.info("Stop Task:" + id);
     synchronized(taskRunnerMap) {
-      taskRunnerMap.remove(executionBlockId);
+      taskRunnerMap.remove(id);
     }
     if(!workerContext.isStandbyMode()) {
       stop();
@@ -91,8 +90,9 @@ public class TaskRunnerManager extends CompositeService {
         try {
           QueryConf queryConf = new QueryConf(tajoConf);
           TaskRunner taskRunner = new TaskRunner(TaskRunnerManager.this, queryConf, params);
+          LOG.info("Start TaskRunner:" + taskRunner.getId());
           synchronized(taskRunnerMap) {
-            taskRunnerMap.put(taskRunner.getContext().getExecutionBlockId(), taskRunner);
+            taskRunnerMap.put(taskRunner.getId(), taskRunner);
           }
           taskRunner.init(queryConf);
           taskRunner.start();

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/ce84eba4/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/YarnResourceAllocator.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/YarnResourceAllocator.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/YarnResourceAllocator.java
index 9470a88..d47cc81 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/YarnResourceAllocator.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/YarnResourceAllocator.java
@@ -61,7 +61,12 @@ public class YarnResourceAllocator extends AbstractResourceAllocator {
 
   @Override
   public void allocateTaskWorker() {
-    //To change body of implemented methods use File | Settings | File Templates.
+  }
+
+  @Override
+  public int calculateNumRequestContainers(TajoWorker.WorkerContext workerContext, int numTasks) {
+    int numClusterNodes = workerContext.getNumClusterNodes(); // 0 presumably means the node count has not been set yet -- TODO confirm
+    return numClusterNodes == 0 ? numTasks: Math.min(numTasks, numClusterNodes * 4); // NOTE(review): hard-coded cap of 4 requests per node
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/ce84eba4/tajo-core/tajo-core-backend/src/main/proto/TajoMasterProtocol.proto
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/proto/TajoMasterProtocol.proto b/tajo-core/tajo-core-backend/src/main/proto/TajoMasterProtocol.proto
index 0153c8d..04abfa3 100644
--- a/tajo-core/tajo-core-backend/src/main/proto/TajoMasterProtocol.proto
+++ b/tajo-core/tajo-core-backend/src/main/proto/TajoMasterProtocol.proto
@@ -56,6 +56,7 @@ message TajoHeartbeat {
   optional QueryIdProto queryId = 5;
   optional QueryState state = 6;
   optional string statusMessage = 7;
+  optional int32 tajoWorkerPullServerPort = 8;
 }
 
 message TajoHeartbeatResponse {
@@ -64,7 +65,9 @@ message TajoHeartbeatResponse {
       repeated string params = 2;
   }
   required BoolProto heartbeatResult = 1;
-  optional ResponseCommand responseCommand = 3;
+  required int32 numClusterNodes = 2;
+  required int32 numClusterSlots = 3;
+  optional ResponseCommand responseCommand = 4;
 }
 
 message WorkerResourceAllocationRequest {
@@ -85,9 +88,14 @@ message WorkerResourceReleaseRequest {
     repeated WorkerResourceProto workerResources = 1;
 }
 
+message WorkerAllocatedResource { // one allocated worker, as returned in WorkerResourceAllocationResponse
+    required string workerHostAndPort = 1; // "host:port"; TajoResourceAllocator splits this value on ':'
+    required int32 workerPullServerPort = 2;
+}
+
 message WorkerResourceAllocationResponse {
     required ExecutionBlockIdProto executionBlockId = 1;
-    repeated string allocatedWorks = 2;
+    repeated WorkerAllocatedResource workerAllocatedResource = 2;
 }
 
 service TajoMasterProtocolService {

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/ce84eba4/tajo-core/tajo-core-backend/src/main/resources/webapps/admin/index.jsp
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/resources/webapps/admin/index.jsp b/tajo-core/tajo-core-backend/src/main/resources/webapps/admin/index.jsp
index 3e8af74..7cedfd4 100644
--- a/tajo-core/tajo-core-backend/src/main/resources/webapps/admin/index.jsp
+++ b/tajo-core/tajo-core-backend/src/main/resources/webapps/admin/index.jsp
@@ -1,169 +1,57 @@
-<%@ page language="java" contentType="text/html; charset=UTF-8"
-    pageEncoding="UTF-8"%>
-<%--
-  Licensed to the Apache Software Foundation (ASF) under one
-  or more contributor license agreements.  See the NOTICE file
-  distributed with this work for additional information
-  regarding copyright ownership.  The ASF licenses this file
-  to you under the Apache License, Version 2.0 (the
-  "License"); you may not use this file except in compliance
-  with the License.  You may obtain a copy of the License at
-
-      http://www.apache.org/licenses/LICENSE-2.0
-
-  Unless required by applicable law or agreed to in writing, software
-  distributed under the License is distributed on an "AS IS" BASIS,
-  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  See the License for the specific language governing permissions and
-  limitations under the License.
--- %>
+<%@ page language="java" contentType="text/html; charset=UTF-8" pageEncoding="UTF-8"%>
 
 <%@ page import="java.util.*" %>
-<%@ page import="tajo.webapp.StaticHttpServer" %>
-<%@ page import="nta.catalog.*" %>
-<%@ page import="nta.engine.*" %>
-<%@ page import="nta.engine.cluster.ClusterManager" %>
 <%@ page import="java.net.InetSocketAddress" %>
 <%@ page import="java.net.InetAddress"  %>
 <%@ page import="org.apache.hadoop.conf.Configuration" %>
-<%@ page import="nta.engine.NConstants" %>
+<%@ page import="org.apache.tajo.webapp.StaticHttpServer" %>
+<%@ page import="org.apache.tajo.master.*" %>
+<%@ page import="org.apache.tajo.master.rm.*" %>
+<%@ page import="org.apache.tajo.catalog.*" %>
+
 <!DOCTYPE html PUBLIC "-//W3C//DTD HTML 4.01 Transitional//EN" "http://www.w3.org/TR/html4/loose.dtd">
 <html>
-  <head>
-    <link rel="stylesheet" type = "text/css" href = "./style.css" />
-    <meta http-equiv="Content-Type" content="text/html; charset=UTF-8">
-    <title>tajo main</title>
-    <%
-     NtaEngineMaster master = (NtaEngineMaster)application.getAttribute("tajo.master");
-     CatalogService catalog = master.getCatalog(); 
-     HashMap<String,String> map = (HashMap<String,String>)application.getAttribute("tajo.online.worker");
-     List<String> serverList = master.getOnlineServer();
-     HashMap<String,String> curMap = new HashMap<String, String>();
-     if(map == null) {
-       map = new HashMap<String,String>();
-       for(int i = 0 ; i < serverList.size() ; i ++) {
-    	   String tmp = serverList.get(i);
-    	   map.put(tmp,tmp);
-    	   curMap.put(tmp, tmp);
-       }
-       application.setAttribute("tajo.online.worker", map);
-     }else {
-       for(int i = 0 ; i < serverList.size() ; i ++) {
-    	   String tmp = serverList.get(i);
-    	   if(!map.containsKey(tmp)) {
-    		   map.put(tmp, tmp);
-    	   }
-    	   curMap.put(tmp, tmp);
-       }
-     }
-     String masterAddr = (String)application.getAttribute("tajo.master.addr");
-     ClusterManager cm = master.getClusterManager();
-     String[] tableArr = catalog.getAllTableNames().toArray(new String[0]);
-     
-     long totalDisk = 0;
-     long availableDisk = 0;
-     final double MB = Math.pow(1024, 4);
-     for(int i = 0 ; i < serverList.size() ; i ++) {
-      ClusterManager.WorkerInfo info = cm.getWorkerInfo(serverList.get(i));
-      List<ClusterManager.DiskInfo> disks = info.disks;
-      for(int j = 0 ; j < disks.size() ; j ++) {
-    	  totalDisk += disks.get(j).totalSpace;
-    	  availableDisk += disks.get(j).freeSpace;
-      }
-     }
-     
-     long time = (System.currentTimeMillis() - (Long)application.getAttribute("tajo.master.starttime"))/1000;
-     
-    %>
-  </head>
-  <body>
-    <div class = "container">
-      <div>
-      <img src="./img/tajochar_title_small.jpg" />
-      </div>
-    </div>
-    <br />
-    <div class = "headline_2">
-     <div class = "container">
-      <a href="./catalogview.jsp" class="headline">Catalog</a>
-      &nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
-      <a href="./nodeview.jsp" class="headline">Workers</a>
-      &nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
-      <a href="./queryview.jsp" class="headline">Queries</a>
-     </div>
-    </div>
-    
-    <div class ="container">
-     <div class = "outbox">
-      <h2 class = "compactline">System Summary</h2>
-      <table align = "center"class = "noborder">
-       <tr>
-        <th class="rightbottom">Cluster Number</th>
-        <th class="rightbottom">Live workers</th>
-        <th class="rightbottom">Table number</th>
-        <th class="rightbottom">Total Disk Size</th>
-        <th class="rightbottom">Available Disk Size</th>
-        <th class="bottom">Running Time</th>
-       </tr>
-       <tr>
-        <td class="rightborder"><%=map.size()%></td>
-        <td class="rightborder"><%=curMap.size()%></td>
-        <td class="rightborder"><%=tableArr.length%></td>
-        <td class="rightborder"><%=String.format("%.2f", totalDisk/MB)%>TB</td>
-        <td class="rightborder"><%=String.format("%.2f", availableDisk/MB)%>TB</td>
-        <td class="noborder"><%=time/3600/24 + "d" + time/3600 + "h" + time/60%60 + "m" + time%60+"s"%></td>
-       </tr>
-      </table>
-     </div>
-    
-     <div class="outbox_order">
-      <h2 class="compactline">Table List</h2>     
-      <table align = "center" class = "new">
-      <tr>
-       <th style="width:110px">TableName</th>
-       <th>TablePath</th>
-      </tr>
-      <%
-      for(int i = 0 ; i < tableArr.length ; i ++ ) {
-        TableDesc table = catalog.getTableDesc(tableArr[i]);    
-      %>
-      <tr>
-        <td><a href = "./catalogview.jsp?tablename=<%=table.getId()%>" class = "tablelink"><%=table.getId()%></a></td>
-        <td><%=table.getPath()%></td>
-      </tr>	
-      <% 	
-      }
-      %>
-      </table>
-     </div>
-     <div style="float:left; width:6px">&nbsp;</div>
-     <div class="outbox_order">
-      <h2 class="compactline">Worker List</h2>
-      <table align="center" class = "new">
-      <tr>
-        <th style="width:90px">Status</th>
-        <th>Worker Name</th>
-      </tr>
-      <%
-      Set<String> keySet = map.keySet();
-      for(String key : keySet ) {
-    	out.write("<tr>");  
-     	if(!curMap.containsKey(key)) {
-     	  out.write("<td><img src=\"./img/off.jpg\" />&nbsp;&nbsp;  Offline</td>");
-     	  out.write("<td><a href =\"http://" +key.split(":")[0] + ":8080/nodedetail.jsp?workername="+key+"\" class = \"tablelink\">" + key + "</a></td>");
-     	}
-     	else{
-     	  out.write("<td><img src=\"./img/on.jpg\" />&nbsp;&nbsp;  Online</td>");
-       	  out.write("<td><a href =\"http://" +key.split(":")[0] + ":8080/nodedetail.jsp?workername="+key+"\" class = \"tablelink\">" + key + "</a></td>");
-       	}
-     	out.write("</tr>");
-       }
-     %>
-     </table> 
-    </div>
-    
-    <div style="clear:both"></div>
-    
-   
-  </body>
+<head>
+  <link rel="stylesheet" type = "text/css" href = "./style.css" />
+  <meta http-equiv="Content-Type" content="text/html; charset=UTF-8">
+  <title>tajo main</title>
+  <% // Admin landing page: renders one table row per worker known to the master's resource manager.
+    TajoMaster master = (TajoMaster) StaticHttpServer.getInstance().getAttribute("tajo.info.server.object");
+    // Worker resources, keyed by the string rendered in the "Worker" column below.
+    Map<String, WorkerResource> workers = master.getContext().getResourceManager().getWorkers();
+  %>
+</head>
+<body>
+<img src='img/tajo_logo.png'/>
+<hr/>
+<table>
+  <tr><th>Worker</th><th>Ports</th><th>Slot</th><th>Memory(Used/Capacity)</th><th>Disk</th><th>Cpu</th><th>Status</th></tr>
+  <%
+    List<String> workerKeys = new ArrayList<String>(workers.keySet()); // copy the keys so they can be sorted
+    Collections.sort(workerKeys); // natural String order keeps the row order stable across refreshes
+    for(String eachWorker: workerKeys) {
+      WorkerResource worker = workers.get(eachWorker);
+  %>
+  <tr>
+    <td><%=eachWorker%></td>
+    <td><%=worker.portsToStr()%></td>
+    <td><%=worker.getUsedSlots()%>/<%=worker.getSlots()%></td>
+    <td><%=worker.getUsedMemoryMBSlots()%>/<%=worker.getMemoryMBSlots()%> MB</td>
+    <td><%=worker.getUsedDiskSlots()%>/<%=worker.getDiskSlots()%></td>
+    <td><%=worker.getUsedCpuCoreSlots()%>/<%=worker.getCpuCoreSlots()%></td>
+    <td><%=worker.getWorkerStatus()%></td>
+  </tr>
+  <%
+    }
+    // Placeholder row spanning all seven columns when no worker is registered.
+    if(workers.isEmpty()) {
+  %>
+  <tr>
+    <td colspan='7'>No Workers</td>
+  </tr>
+  <%
+    }
+  %>
+</table>
+</body>
 </html>