You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tajo.apache.org by ji...@apache.org on 2013/12/02 09:16:40 UTC

[2/2] git commit: TAJO-317: Improve TajoResourceManager to support more elaborate resource management. (Keuntae Park via jihoon)

TAJO-317: Improve TajoResourceManager to support more elaborate resource management. (Keuntae Park via jihoon)


Project: http://git-wip-us.apache.org/repos/asf/incubator-tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tajo/commit/528c914f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tajo/tree/528c914f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tajo/diff/528c914f

Branch: refs/heads/master
Commit: 528c914f9a133bef79df07017cfa424c9fab4412
Parents: 778c01f
Author: Jihoon Son <ji...@apache.org>
Authored: Mon Dec 2 17:16:01 2013 +0900
Committer: Jihoon Son <ji...@apache.org>
Committed: Mon Dec 2 17:16:01 2013 +0900

----------------------------------------------------------------------
 CHANGES.txt                                     |   2 +
 .../java/org/apache/tajo/conf/TajoConf.java     |  14 +-
 .../apache/tajo/master/TajoContainerProxy.java  |  38 +-
 .../apache/tajo/master/TajoMasterService.java   |  25 +-
 .../tajo/master/querymaster/SubQuery.java       |   9 +-
 .../tajo/master/rm/TajoWorkerContainerId.java   |  41 ++
 .../master/rm/TajoWorkerResourceManager.java    | 358 +++++++++++++----
 .../apache/tajo/master/rm/WorkerResource.java   | 115 +++---
 .../tajo/master/rm/WorkerResourceManager.java   |   6 +-
 .../tajo/master/rm/YarnTajoResourceManager.java |  28 +-
 .../apache/tajo/util/ApplicationIdUtils.java    |   3 +
 .../apache/tajo/worker/ResourceAllocator.java   |   3 +-
 .../tajo/worker/TajoResourceAllocator.java      |  68 ++--
 .../java/org/apache/tajo/worker/TajoWorker.java |  86 ++--
 .../main/java/org/apache/tajo/worker/Task.java  |   6 +-
 .../tajo/worker/YarnResourceAllocator.java      |   4 +-
 .../src/main/proto/TajoMasterProtocol.proto     |  64 ++-
 .../main/resources/webapps/admin/cluster.jsp    |  31 +-
 .../src/main/resources/webapps/admin/index.jsp  |  22 +-
 .../org/apache/tajo/TajoTestingCluster.java     |   3 +
 .../tajo/master/TestTajoResourceManager.java    | 390 +++++++++++++++++++
 21 files changed, 1013 insertions(+), 303 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/528c914f/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index caadc0c..1646d4c 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -34,6 +34,8 @@ Release 0.8.0 - unreleased
 
   IMPROVEMENTS
 
+    TAJO-317: Improve TajoResourceManager to support more elaborate resource management. (Keuntae Park via jihoon)
+
     TAJO-314: Make TaskScheduler be pluggable. (jihoon)
 
     TAJO-325: QueryState.NEW and QueryState.INIT should be combined into one

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/528c914f/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java b/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
index eba6eaf..f89838f 100644
--- a/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
+++ b/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
@@ -81,6 +81,10 @@ public class TajoConf extends YarnConfiguration {
     TAJO_MASTER_CLIENT_RPC_ADDRESS("tajo.master.client-rpc.address", "localhost:26002"),
     TAJO_MASTER_INFO_ADDRESS("tajo.master.info-http.address", "0.0.0.0:26080"),
 
+    // QueryMaster resource
+    TAJO_QUERYMASTER_DISK_SLOT("tajo.qm.resource.disk.slots", 0.0f),
+    TAJO_QUERYMASTER_MEMORY_MB("tajo.qm.resource.memory-mb", 512),
+
     // Tajo Worker Service Addresses
     WORKER_INFO_ADDRESS("tajo.worker.info-http.address", "0.0.0.0:28080"),
     WORKER_QM_INFO_ADDRESS("tajo.worker.qm-info-http.address", "0.0.0.0:28081"),
@@ -93,8 +97,8 @@ public class TajoConf extends YarnConfiguration {
 
     // Tajo Worker Resources
     WORKER_RESOURCE_AVAILABLE_CPU_CORES("tajo.worker.resource.cpu-cores", 1),
-    WORKER_RESOURCE_AVAILABLE_MEMORY_MB("tajo.worker.resource.memory-mb", 1024),
-    WORKER_RESOURCE_AVAILABLE_DISKS("tajo.worker.resource.disks", 1),
+    WORKER_RESOURCE_AVAILABLE_MEMORY_MB("tajo.worker.resource.memory-mb", 512),
+    WORKER_RESOURCE_AVAILABLE_DISKS("tajo.worker.resource.disks", 1.0f),
     WORKER_EXECUTION_MAX_SLOTS("tajo.worker.parallel-execution.max-num", 2),
 
     // Tajo Worker Dedicated Resources
@@ -186,6 +190,12 @@ public class TajoConf extends YarnConfiguration {
     // Hive Configuration
     //////////////////////////////////
     HIVE_QUERY_MODE("tajo.hive.query.mode", false),
+
+    //////////////////////////////////
+    // Task Configuration
+    TASK_DEFAULT_MEMORY("tajo.task.memory-slot-mb.default", 512),
+    TASK_DEFAULT_DISK("tajo.task.disk-slot.default", 1.0f)
+    //////////////////////////////////
     ;
 
     public final String varname;

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/528c914f/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TajoContainerProxy.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TajoContainerProxy.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TajoContainerProxy.java
index d542390..ce5f401 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TajoContainerProxy.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TajoContainerProxy.java
@@ -21,7 +21,9 @@ package org.apache.tajo.master;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
+import org.apache.hadoop.yarn.proto.YarnProtos;
 import org.apache.tajo.ExecutionBlockId;
 import org.apache.tajo.ipc.TajoMasterProtocol;
 import org.apache.tajo.ipc.TajoWorkerProtocol;
@@ -29,7 +31,7 @@ import org.apache.tajo.master.event.QueryEvent;
 import org.apache.tajo.master.event.QueryEventType;
 import org.apache.tajo.master.querymaster.QueryMasterTask;
 import org.apache.tajo.master.rm.TajoWorkerContainer;
-import org.apache.tajo.master.rm.WorkerResource;
+import org.apache.tajo.master.rm.TajoWorkerContainerId;
 import org.apache.tajo.rpc.AsyncRpcClient;
 import org.apache.tajo.rpc.NettyClientBase;
 import org.apache.tajo.rpc.NullCallback;
@@ -121,7 +123,8 @@ public class TajoContainerProxy extends ContainerProxy {
       this.state = ContainerState.KILLED_BEFORE_LAUNCH;
     } else {
       try {
-        releaseWorkerResource(context, executionBlockId, ((TajoWorkerContainer)container).getWorkerResource());
+        TajoWorkerContainer tajoWorkerContainer = ((TajoWorkerContainer)container);
+        releaseWorkerResource(context, executionBlockId, tajoWorkerContainer.getId());
         context.getResourceAllocator().removeContainer(containerID);
         this.state = ContainerState.DONE;
       } catch (Throwable t) {
@@ -138,29 +141,21 @@ public class TajoContainerProxy extends ContainerProxy {
 
   public static void releaseWorkerResource(QueryMasterTask.QueryMasterTaskContext context,
                                            ExecutionBlockId executionBlockId,
-                                           WorkerResource workerResource) throws Exception {
-    List<WorkerResource> workerResources = new ArrayList<WorkerResource>();
-    workerResources.add(workerResource);
+                                           ContainerId containerId) throws Exception {
+    List<ContainerId> containerIds = new ArrayList<ContainerId>();
+    containerIds.add(containerId);
 
-    releaseWorkerResource(context, executionBlockId, workerResources);
+    releaseWorkerResource(context, executionBlockId, containerIds);
   }
 
   public static void releaseWorkerResource(QueryMasterTask.QueryMasterTaskContext context,
                                            ExecutionBlockId executionBlockId,
-                                           List<WorkerResource> workerResources) throws Exception {
-    List<TajoMasterProtocol.WorkerResourceProto> workerResourceProtos =
-        new ArrayList<TajoMasterProtocol.WorkerResourceProto>();
-
-    for(WorkerResource eahWorkerResource: workerResources) {
-      workerResourceProtos.add(TajoMasterProtocol.WorkerResourceProto.newBuilder()
-          .setHost(eahWorkerResource.getAllocatedHost())
-          .setQueryMasterPort(eahWorkerResource.getQueryMasterPort())
-          .setPeerRpcPort(eahWorkerResource.getPeerRpcPort())
-          .setExecutionBlockId(executionBlockId.getProto())
-          .setMemoryMBSlots(eahWorkerResource.getMemoryMBSlots())
-          .setDiskSlots(eahWorkerResource.getDiskSlots())
-          .build()
-      );
+                                           List<ContainerId> containerIds) throws Exception {
+    List<YarnProtos.ContainerIdProto> containerIdProtos =
+        new ArrayList<YarnProtos.ContainerIdProto>();
+
+    for(ContainerId eachContainerId: containerIds) {
+      containerIdProtos.add(TajoWorkerContainerId.getContainerIdProto(eachContainerId));
     }
 
     RpcConnectionPool connPool = RpcConnectionPool.getPool(context.getConf());
@@ -171,7 +166,8 @@ public class TajoContainerProxy extends ContainerProxy {
         TajoMasterProtocol.TajoMasterProtocolService masterClientService = tmClient.getStub();
         masterClientService.releaseWorkerResource(null,
           TajoMasterProtocol.WorkerResourceReleaseRequest.newBuilder()
-              .addAllWorkerResources(workerResourceProtos)
+              .setExecutionBlockId(executionBlockId.getProto())
+              .addAllContainerIds(containerIdProtos)
               .build(),
           NullCallback.get());
     } catch (Exception e) {

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/528c914f/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TajoMasterService.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TajoMasterService.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TajoMasterService.java
index cf193de..c213dd5 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TajoMasterService.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TajoMasterService.java
@@ -22,13 +22,14 @@ import com.google.protobuf.RpcCallback;
 import com.google.protobuf.RpcController;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.yarn.proto.YarnProtos;
 import org.apache.hadoop.yarn.service.AbstractService;
+import org.apache.tajo.ExecutionBlockId;
 import org.apache.tajo.QueryId;
 import org.apache.tajo.TajoIdProtos;
 import org.apache.tajo.conf.TajoConf;
 import org.apache.tajo.ipc.TajoMasterProtocol;
 import org.apache.tajo.master.querymaster.QueryJobManager;
-import org.apache.tajo.master.rm.WorkerResource;
 import org.apache.tajo.rpc.AsyncRpcServer;
 import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos;
 import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos.BoolProto;
@@ -118,8 +119,7 @@ public class TajoMasterService extends AbstractService {
         builder.setResponseCommand(command);
       }
 
-      builder.setNumClusterNodes(context.getResourceManager().getWorkers().size());
-      builder.setNumClusterSlots(context.getResourceManager().getNumClusterSlots());
+      builder.setClusterResourceSummary(context.getResourceManager().getClusterResourceSummary());
       done.run(builder.build());
     }
 
@@ -135,20 +135,11 @@ public class TajoMasterService extends AbstractService {
     public void releaseWorkerResource(RpcController controller,
                                            TajoMasterProtocol.WorkerResourceReleaseRequest request,
                                            RpcCallback<PrimitiveProtos.BoolProto> done) {
-      List<TajoMasterProtocol.WorkerResourceProto> workerResources = request.getWorkerResourcesList();
-      for(TajoMasterProtocol.WorkerResourceProto eachWorkerResource: workerResources) {
-        WorkerResource workerResource = new WorkerResource();
-        workerResource.setAllocatedHost(eachWorkerResource.getHost());
-
-        workerResource.setPeerRpcPort(eachWorkerResource.getPeerRpcPort());
-        workerResource.setQueryMasterPort(eachWorkerResource.getQueryMasterPort());
-        workerResource.setMemoryMBSlots(eachWorkerResource.getMemoryMBSlots());
-        workerResource.setDiskSlots(eachWorkerResource.getDiskSlots());
-
-        LOG.info("releaseWorkerResource:" + workerResource);
-        context.getResourceManager().releaseWorkerResource(
-            new QueryId(eachWorkerResource.getExecutionBlockId().getQueryId()),
-            workerResource);
+      List<YarnProtos.ContainerIdProto> containerIds = request.getContainerIdsList();
+      ExecutionBlockId ebId = new ExecutionBlockId(request.getExecutionBlockId());
+
+      for(YarnProtos.ContainerIdProto eachContainer: containerIds) {
+        context.getResourceManager().releaseWorkerResource(ebId, eachContainer);
       }
       done.run(BOOL_TRUE);
     }

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/528c914f/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java
index 4aa3866..70bde5c 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java
@@ -611,13 +611,18 @@ public class SubQuery implements EventHandler<SubQueryEvent> {
       ExecutionBlock execBlock = subQuery.getBlock();
       QueryUnit [] tasks = subQuery.getQueryUnits();
 
+      //TODO consider disk slot
+      int requiredMemoryMBPerTask = 512;
+
       int numRequest = subQuery.getContext().getResourceAllocator().calculateNumRequestContainers(
-          subQuery.getContext().getQueryMasterContext().getWorkerContext(), tasks.length
+          subQuery.getContext().getQueryMasterContext().getWorkerContext(),
+          tasks.length,
+          requiredMemoryMBPerTask
       );
 
       final Resource resource = Records.newRecord(Resource.class);
 
-      resource.setMemory(2000);
+      resource.setMemory(requiredMemoryMBPerTask);
 
       LOG.info("Request Container for " + subQuery.getId() + " containers=" + numRequest);
 

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/528c914f/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/rm/TajoWorkerContainerId.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/rm/TajoWorkerContainerId.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/rm/TajoWorkerContainerId.java
index f104637..2d7c1c3 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/rm/TajoWorkerContainerId.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/rm/TajoWorkerContainerId.java
@@ -20,6 +20,7 @@ package org.apache.tajo.master.rm;
 
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.proto.YarnProtos;
 
 public class TajoWorkerContainerId extends ContainerId {
   ApplicationAttemptId applicationAttemptId;
@@ -44,4 +45,44 @@ public class TajoWorkerContainerId extends ContainerId {
   public void setId(int id) {
     this.id = id;
   }
+
+  public YarnProtos.ContainerIdProto getProto() {
+    YarnProtos.ApplicationIdProto appIdProto = YarnProtos.ApplicationIdProto.newBuilder()
+        .setClusterTimestamp(applicationAttemptId.getApplicationId().getClusterTimestamp())
+        .setId(applicationAttemptId.getApplicationId().getId())
+        .build();
+
+    YarnProtos.ApplicationAttemptIdProto attemptIdProto = YarnProtos.ApplicationAttemptIdProto.newBuilder()
+        .setAttemptId(applicationAttemptId.getAttemptId())
+        .setApplicationId(appIdProto)
+        .build();
+
+    return YarnProtos.ContainerIdProto.newBuilder()
+        .setAppAttemptId(attemptIdProto)
+        .setAppId(appIdProto)
+        .setId(id)
+        .build();
+  }
+
+  public static YarnProtos.ContainerIdProto getContainerIdProto(ContainerId containerId) {
+    if(containerId instanceof TajoWorkerContainerId) {
+      return ((TajoWorkerContainerId)containerId).getProto();
+    } else {
+      YarnProtos.ApplicationIdProto appIdProto = YarnProtos.ApplicationIdProto.newBuilder()
+          .setClusterTimestamp(containerId.getApplicationAttemptId().getApplicationId().getClusterTimestamp())
+          .setId(containerId.getApplicationAttemptId().getApplicationId().getId())
+          .build();
+
+      YarnProtos.ApplicationAttemptIdProto attemptIdProto = YarnProtos.ApplicationAttemptIdProto.newBuilder()
+          .setAttemptId(containerId.getApplicationAttemptId().getAttemptId())
+          .setApplicationId(appIdProto)
+          .build();
+
+      return YarnProtos.ContainerIdProto.newBuilder()
+          .setAppAttemptId(attemptIdProto)
+          .setAppId(appIdProto)
+          .setId(containerId.getId())
+          .build();
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/528c914f/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/rm/TajoWorkerResourceManager.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/rm/TajoWorkerResourceManager.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/rm/TajoWorkerResourceManager.java
index 7ffc563..1485ffe 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/rm/TajoWorkerResourceManager.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/rm/TajoWorkerResourceManager.java
@@ -22,6 +22,8 @@ import com.google.protobuf.RpcCallback;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.records.impl.pb.NodeIdPBImpl;
+import org.apache.hadoop.yarn.proto.YarnProtos;
 import org.apache.tajo.ExecutionBlockId;
 import org.apache.tajo.QueryId;
 import org.apache.tajo.QueryIdFactory;
@@ -30,17 +32,20 @@ import org.apache.tajo.ipc.TajoMasterProtocol;
 import org.apache.tajo.master.TajoMaster;
 import org.apache.tajo.master.querymaster.QueryInProgress;
 import org.apache.tajo.master.querymaster.QueryJobEvent;
-import org.apache.tajo.worker.TajoWorker;
+import org.apache.tajo.util.ApplicationIdUtils;
 
 import java.io.IOException;
 import java.util.*;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.LinkedBlockingDeque;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
 
 public class TajoWorkerResourceManager implements WorkerResourceManager {
   private static final Log LOG = LogFactory.getLog(TajoWorkerResourceManager.class);
 
+  static AtomicInteger containerIdSeq = new AtomicInteger(0);
+
   private TajoMaster.MasterContext masterContext;
 
   //all workers(include querymaster)
@@ -59,22 +64,46 @@ public class TajoWorkerResourceManager implements WorkerResourceManager {
 
   private final Object workerResourceLock = new Object();
 
-  private final String queryIdSeed;
+  private String queryIdSeed;
 
   private WorkerResourceAllocationThread workerResourceAllocator;
 
   private WorkerMonitorThread workerMonitor;
 
-  private final BlockingQueue<WorkerResourceRequest> requestQueue;
+  private BlockingQueue<WorkerResourceRequest> requestQueue;
 
-  private final List<WorkerResourceRequest> reAllocationList;
+  private List<WorkerResourceRequest> reAllocationList;
 
   private AtomicBoolean stopped = new AtomicBoolean(false);
 
+  private float queryMasterDefaultDiskSlot;
+
+  private int queryMasterDefaultMemoryMB;
+
+  private TajoConf tajoConf;
+
+  private Map<YarnProtos.ContainerIdProto, AllocatedWorkerResource> allocatedResourceMap =
+      new HashMap<YarnProtos.ContainerIdProto, AllocatedWorkerResource>();
+
   public TajoWorkerResourceManager(TajoMaster.MasterContext masterContext) {
     this.masterContext = masterContext;
+    init(masterContext.getConf());
+  }
+
+  public TajoWorkerResourceManager(TajoConf tajoConf) {
+    init(tajoConf);
+  }
+
+  private void init(TajoConf tajoConf) {
+    this.tajoConf = tajoConf;
     this.queryIdSeed = String.valueOf(System.currentTimeMillis());
 
+    this.queryMasterDefaultDiskSlot =
+        tajoConf.getFloatVar(TajoConf.ConfVars.TAJO_QUERYMASTER_DISK_SLOT);
+
+    this.queryMasterDefaultMemoryMB =
+        tajoConf.getIntVar(TajoConf.ConfVars.TAJO_QUERYMASTER_MEMORY_MB);
+
     requestQueue = new LinkedBlockingDeque<WorkerResourceRequest>();
     reAllocationList = new ArrayList<WorkerResourceRequest>();
 
@@ -93,15 +122,41 @@ public class TajoWorkerResourceManager implements WorkerResourceManager {
     return Collections.unmodifiableSet(liveQueryMasterWorkerResources);
   }
 
-  public int getNumClusterSlots() {
-    int numSlots = 0;
+  @Override
+  public TajoMasterProtocol.ClusterResourceSummary getClusterResourceSummary() {
+    int totalDiskSlots = 0;
+    int totalCpuCoreSlots = 0;
+    int totalMemoryMB = 0;
+
+    int totalAvailableDiskSlots = 0;
+    int totalAvailableCpuCoreSlots = 0;
+    int totalAvailableMemoryMB = 0;
+
     synchronized(workerResourceLock) {
       for(String eachWorker: liveWorkerResources) {
-        numSlots += allWorkerResourceMap.get(eachWorker).getSlots();
+        WorkerResource worker = allWorkerResourceMap.get(eachWorker);
+        if(worker != null) {
+          totalMemoryMB += worker.getMemoryMB();
+          totalAvailableMemoryMB += worker.getAvailableMemoryMB();
+
+          totalDiskSlots += worker.getDiskSlots();
+          totalAvailableDiskSlots += worker.getAvailableDiskSlots();
+
+          totalCpuCoreSlots += worker.getCpuCoreSlots();
+          totalAvailableCpuCoreSlots += worker.getAvailableCpuCoreSlots();
+        }
       }
     }
 
-    return numSlots;
+    return TajoMasterProtocol.ClusterResourceSummary.newBuilder()
+            .setNumWorkers(liveWorkerResources.size())
+            .setTotalCpuCoreSlots(totalCpuCoreSlots)
+            .setTotalDiskSlots(totalDiskSlots)
+            .setTotalMemoryMB(totalMemoryMB)
+            .setTotalAvailableCpuCoreSlots(totalAvailableCpuCoreSlots)
+            .setTotalAvailableDiskSlots(totalAvailableDiskSlots)
+            .setTotalAvailableMemoryMB(totalAvailableMemoryMB)
+            .build();
   }
 
   @Override
@@ -120,9 +175,13 @@ public class TajoWorkerResourceManager implements WorkerResourceManager {
 
   @Override
   public WorkerResource allocateQueryMaster(QueryInProgress queryInProgress) {
+    return allocateQueryMaster(queryInProgress.getQueryId());
+  }
+
+  public WorkerResource allocateQueryMaster(QueryId queryId) {
     synchronized(workerResourceLock) {
       if(liveQueryMasterWorkerResources.size() == 0) {
-        LOG.warn("No available resource for querymaster:" + queryInProgress.getQueryId());
+        LOG.warn("No available resource for querymaster:" + queryId);
         return null;
       }
       WorkerResource queryMasterWorker = null;
@@ -137,9 +196,9 @@ public class TajoWorkerResourceManager implements WorkerResourceManager {
       if(queryMasterWorker == null) {
         return null;
       }
-      queryMasterWorker.addNumQueryMasterTask();
-      queryMasterMap.put(queryInProgress.getQueryId(), queryMasterWorker);
-      LOG.info(queryInProgress.getQueryId() + "'s QueryMaster is " + queryMasterWorker);
+      queryMasterWorker.addNumQueryMasterTask(queryMasterDefaultDiskSlot, queryMasterDefaultMemoryMB);
+      queryMasterMap.put(queryId, queryMasterWorker);
+      LOG.info(queryId + "'s QueryMaster is " + queryMasterWorker);
       return queryMasterWorker;
     }
   }
@@ -152,15 +211,23 @@ public class TajoWorkerResourceManager implements WorkerResourceManager {
     }
 
     if(queryMasterWorkerResource != null) {
-      startQueryMaster(queryInProgress.getQueryId(), queryMasterWorkerResource);
+      AllocatedWorkerResource allocatedWorkerResource = new AllocatedWorkerResource();
+      allocatedWorkerResource.workerResource = queryMasterWorkerResource;
+      allocatedWorkerResource.allocatedMemoryMB = queryMasterDefaultMemoryMB;
+      allocatedWorkerResource.allocatedDiskSlots = queryMasterDefaultDiskSlot;
+
+      startQueryMaster(queryInProgress.getQueryId(), allocatedWorkerResource);
     } else {
       //add queue
       TajoMasterProtocol.WorkerResourceAllocationRequest request =
           TajoMasterProtocol.WorkerResourceAllocationRequest.newBuilder()
-            .setMemoryMBSlots(1)
-            .setDiskSlots(1)
             .setExecutionBlockId(QueryIdFactory.newExecutionBlockId(QueryIdFactory.NULL_QUERY_ID, 0).getProto())
-            .setNumWorks(1)
+            .setNumContainers(1)
+            .setMinMemoryMBPerContainer(queryMasterDefaultMemoryMB)
+            .setMaxMemoryMBPerContainer(queryMasterDefaultMemoryMB)
+            .setMinDiskSlotPerContainer(queryMasterDefaultDiskSlot)
+            .setMaxDiskSlotPerContainer(queryMasterDefaultDiskSlot)
+            .setResourceRequestPriority(TajoMasterProtocol.ResourceRequestPriority.MEMORY)
             .build();
       try {
         requestQueue.put(new WorkerResourceRequest(queryInProgress.getQueryId(), true, request, null));
@@ -169,13 +236,13 @@ public class TajoWorkerResourceManager implements WorkerResourceManager {
     }
   }
 
-  private void startQueryMaster(QueryId queryId, WorkerResource workResource) {
+  private void startQueryMaster(QueryId queryId, AllocatedWorkerResource workResource) {
     QueryInProgress queryInProgress = masterContext.getQueryJobManager().getQueryInProgress(queryId);
     if(queryInProgress == null) {
       LOG.warn("No QueryInProgress while starting  QueryMaster:" + queryId);
       return;
     }
-    queryInProgress.getQueryInfo().setQueryMasterResource(workResource);
+    queryInProgress.getQueryInfo().setQueryMasterResource(workResource.workerResource);
 
     //fire QueryJobStart event
     queryInProgress.getEventHandler().handle(
@@ -205,7 +272,7 @@ public class TajoWorkerResourceManager implements WorkerResourceManager {
 
     @Override
     public void run() {
-      heartbeatTimeout = masterContext.getConf().getIntVar(TajoConf.ConfVars.WORKER_HEARTBEAT_TIMEOUT);
+      heartbeatTimeout = tajoConf.getIntVar(TajoConf.ConfVars.WORKER_HEARTBEAT_TIMEOUT);
       LOG.info("WorkerMonitor start");
       while(!stopped.get()) {
         try {
@@ -272,6 +339,12 @@ public class TajoWorkerResourceManager implements WorkerResourceManager {
     }
   }
 
+  class AllocatedWorkerResource {
+    WorkerResource workerResource;
+    int allocatedMemoryMB;
+    float allocatedDiskSlots;
+  }
+
   class WorkerResourceAllocationThread extends Thread {
     @Override
     public void run() {
@@ -283,36 +356,56 @@ public class TajoWorkerResourceManager implements WorkerResourceManager {
           if (LOG.isDebugEnabled()) {
             LOG.debug("allocateWorkerResources:" +
                 (new ExecutionBlockId(resourceRequest.request.getExecutionBlockId())) +
-                ", required:" + resourceRequest.request.getNumWorks() +
+                ", requiredMemory:" + resourceRequest.request.getMinMemoryMBPerContainer() +
+                "~" + resourceRequest.request.getMaxMemoryMBPerContainer() +
+                ", requiredContainers:" + resourceRequest.request.getNumContainers() +
+                ", requiredDiskSlots:" + resourceRequest.request.getMinDiskSlotPerContainer() +
+                "~" + resourceRequest.request.getMaxDiskSlotPerContainer() +
                 ", queryMasterRequest=" + resourceRequest.queryMasterRequest +
                 ", liveWorkers=" + liveWorkerResources.size());
           }
 
-          List<WorkerResource> workerResources = chooseWorkers(
-              resourceRequest.request.getMemoryMBSlots(),
-              resourceRequest.request.getDiskSlots(),
-              resourceRequest.request.getNumWorks());
-
-          LOG.debug("allocateWorkerResources: allocated:" + workerResources.size());
+          List<AllocatedWorkerResource> allocatedWorkerResources = chooseWorkers(resourceRequest);
 
-          if(workerResources.size() > 0) {
+          if(allocatedWorkerResources.size() > 0) {
             if(resourceRequest.queryMasterRequest) {
-              startQueryMaster(resourceRequest.queryId, workerResources.get(0));
+              startQueryMaster(resourceRequest.queryId, allocatedWorkerResources.get(0));
             } else {
-              List<TajoMasterProtocol.WorkerAllocatedResource> workerHosts =
+              List<TajoMasterProtocol.WorkerAllocatedResource> allocatedResources =
                   new ArrayList<TajoMasterProtocol.WorkerAllocatedResource>();
 
-              for(WorkerResource eachWorker: workerResources) {
-                workerHosts.add(TajoMasterProtocol.WorkerAllocatedResource.newBuilder()
-                    .setWorkerHost(eachWorker.getAllocatedHost())
-                    .setQueryMasterPort(eachWorker.getQueryMasterPort())
-                    .setPeerRpcPort(eachWorker.getPeerRpcPort())
-                    .setWorkerPullServerPort(eachWorker.getPullServerPort())
+              for(AllocatedWorkerResource eachWorker: allocatedWorkerResources) {
+                NodeIdPBImpl nodeId = new NodeIdPBImpl();
+
+                nodeId.setHost(eachWorker.workerResource.getAllocatedHost());
+                nodeId.setPort(eachWorker.workerResource.getPeerRpcPort());
+
+                TajoWorkerContainerId containerId = new TajoWorkerContainerId();
+
+                containerId.setApplicationAttemptId(
+                    ApplicationIdUtils.createApplicationAttemptId(resourceRequest.queryId));
+                containerId.setId(containerIdSeq.incrementAndGet());
+
+                YarnProtos.ContainerIdProto containerIdProto = containerId.getProto();
+                allocatedResources.add(TajoMasterProtocol.WorkerAllocatedResource.newBuilder()
+                    .setContainerId(containerIdProto)
+                    .setNodeId(nodeId.toString())
+                    .setWorkerHost(eachWorker.workerResource.getAllocatedHost())
+                    .setQueryMasterPort(eachWorker.workerResource.getQueryMasterPort())
+                    .setPeerRpcPort(eachWorker.workerResource.getPeerRpcPort())
+                    .setWorkerPullServerPort(eachWorker.workerResource.getPullServerPort())
+                    .setAllocatedMemoryMB(eachWorker.allocatedMemoryMB)
+                    .setAllocatedDiskSlots(eachWorker.allocatedDiskSlots)
                     .build());
+
+                synchronized(workerResourceLock) {
+                  allocatedResourceMap.put(containerIdProto, eachWorker);
+                }
               }
+
               resourceRequest.callBack.run(TajoMasterProtocol.WorkerResourceAllocationResponse.newBuilder()
                   .setExecutionBlockId(resourceRequest.request.getExecutionBlockId())
-                  .addAllWorkerAllocatedResource(workerHosts)
+                  .addAllWorkerAllocatedResource(allocatedResources)
                   .build()
               );
             }
@@ -335,54 +428,179 @@ public class TajoWorkerResourceManager implements WorkerResourceManager {
     }
   }
 
-  private List<WorkerResource> chooseWorkers(int requiredMemoryMBSlots, int requiredDiskSlots,
-                                             int numWorkerSlots) {
-    List<WorkerResource> selectedWorkers = new ArrayList<WorkerResource>();
+  private List<AllocatedWorkerResource> chooseWorkers(WorkerResourceRequest resourceRequest) {
+    List<AllocatedWorkerResource> selectedWorkers = new ArrayList<AllocatedWorkerResource>();
 
-    int selectedCount = 0;
+    int allocatedResources = 0;
+
+    if(resourceRequest.queryMasterRequest) {
+      WorkerResource worker = allocateQueryMaster(resourceRequest.queryId);
+      if(worker != null) {
+        AllocatedWorkerResource allocatedWorkerResource = new AllocatedWorkerResource();
+        allocatedWorkerResource.workerResource = worker;
+        allocatedWorkerResource.allocatedDiskSlots = queryMasterDefaultDiskSlot;
+        allocatedWorkerResource.allocatedMemoryMB = queryMasterDefaultMemoryMB;
+        selectedWorkers.add(allocatedWorkerResource);
+
+        return selectedWorkers;
+      }
+    }
+
+    TajoMasterProtocol.ResourceRequestPriority resourceRequestPriority
+        = resourceRequest.request.getResourceRequestPriority();
+
+    if(resourceRequestPriority == TajoMasterProtocol.ResourceRequestPriority.MEMORY) {
+      synchronized(workerResourceLock) {
+        List<String> randomWorkers = new ArrayList<String>(liveWorkerResources);
+        Collections.shuffle(randomWorkers);
+
+        int numContainers = resourceRequest.request.getNumContainers();
+        int minMemoryMB = resourceRequest.request.getMinMemoryMBPerContainer();
+        int maxMemoryMB = resourceRequest.request.getMaxMemoryMBPerContainer();
+        float diskSlot = Math.max(resourceRequest.request.getMaxDiskSlotPerContainer(),
+            resourceRequest.request.getMinDiskSlotPerContainer());
+
+        int liveWorkerSize = randomWorkers.size();
+        Set<String> insufficientWorkers = new HashSet<String>();
+        boolean stop = false;
+        boolean checkMax = true;
+        while(!stop) {
+          if(allocatedResources >= numContainers) {
+            break;
+          }
+
+          if(insufficientWorkers.size() >= liveWorkerSize) {
+            if(!checkMax) {
+              break;
+            }
+            insufficientWorkers.clear();
+            checkMax = false;
+          }
+          int compareAvailableMemory = checkMax ? maxMemoryMB : minMemoryMB;
+
+          for(String eachWorker: randomWorkers) {
+            if(allocatedResources >= numContainers) {
+              stop = true;
+              break;
+            }
+
+            if(insufficientWorkers.size() >= liveWorkerSize) {
+              break;
+            }
 
-    synchronized(workerResourceLock) {
-      List<String> randomWorkers = new ArrayList<String>(liveWorkerResources);
-      Collections.shuffle(randomWorkers);
-      int liveWorkerSize = randomWorkers.size();
-      Set<String> insufficientWorkers = new HashSet<String>();
-      boolean stop = false;
-      while(!stop) {
-        if(insufficientWorkers.size() >= liveWorkerSize || selectedCount >= numWorkerSlots) {
-          break;
-        }
-        for(String eachWorker: randomWorkers) {
-          if(insufficientWorkers.size() >= liveWorkerSize || selectedCount >= numWorkerSlots) {
-            stop = true;
-          } else {
             WorkerResource workerResource = allWorkerResourceMap.get(eachWorker);
-            if(workerResource.getAvailableMemoryMBSlots() >= requiredMemoryMBSlots) {
-              workerResource.addUsedMemoryMBSlots(requiredMemoryMBSlots);
-              //workerResource.addUsedDiskSlots(requiredDiskSlots);
-              selectedWorkers.add(workerResource);
-              selectedCount++;
+            if(workerResource.getAvailableMemoryMB() >= compareAvailableMemory) {
+              int workerMemory;
+              if(workerResource.getAvailableMemoryMB() >= maxMemoryMB) {
+                workerMemory = maxMemoryMB;
+              } else {
+                workerMemory = workerResource.getAvailableMemoryMB();
+              }
+              AllocatedWorkerResource allocatedWorkerResource = new AllocatedWorkerResource();
+              allocatedWorkerResource.workerResource = workerResource;
+              allocatedWorkerResource.allocatedMemoryMB = workerMemory;
+              if(workerResource.getAvailableDiskSlots() >= diskSlot) {
+                allocatedWorkerResource.allocatedDiskSlots = diskSlot;
+              } else {
+                allocatedWorkerResource.allocatedDiskSlots = workerResource.getAvailableDiskSlots();
+              }
+
+              workerResource.allocateResource(allocatedWorkerResource.allocatedDiskSlots,
+                  allocatedWorkerResource.allocatedMemoryMB);
+
+              selectedWorkers.add(allocatedWorkerResource);
+
+              allocatedResources++;
             } else {
               insufficientWorkers.add(eachWorker);
             }
           }
         }
-        if(!stop) {
-          for(String eachWorker: insufficientWorkers) {
-            randomWorkers.remove(eachWorker);
+      }
+    } else {
+      synchronized(workerResourceLock) {
+        List<String> randomWorkers = new ArrayList<String>(liveWorkerResources);
+        Collections.shuffle(randomWorkers);
+
+        int numContainers = resourceRequest.request.getNumContainers();
+        float minDiskSlots = resourceRequest.request.getMinDiskSlotPerContainer();
+        float maxDiskSlots = resourceRequest.request.getMaxDiskSlotPerContainer();
+        int memoryMB = Math.max(resourceRequest.request.getMaxMemoryMBPerContainer(),
+            resourceRequest.request.getMinMemoryMBPerContainer());
+
+        int liveWorkerSize = randomWorkers.size();
+        Set<String> insufficientWorkers = new HashSet<String>();
+        boolean stop = false;
+        boolean checkMax = true;
+        while(!stop) {
+          if(allocatedResources >= numContainers) {
+            break;
+          }
+
+          if(insufficientWorkers.size() >= liveWorkerSize) {
+            if(!checkMax) {
+              break;
+            }
+            insufficientWorkers.clear();
+            checkMax = false;
+          }
+          float compareAvailableDisk = checkMax ? maxDiskSlots : minDiskSlots;
+
+          for(String eachWorker: randomWorkers) {
+            if(allocatedResources >= numContainers) {
+              stop = true;
+              break;
+            }
+
+            if(insufficientWorkers.size() >= liveWorkerSize) {
+              break;
+            }
+
+            WorkerResource workerResource = allWorkerResourceMap.get(eachWorker);
+            if(workerResource.getAvailableDiskSlots() >= compareAvailableDisk) {
+              float workerDiskSlots;
+              if(workerResource.getAvailableDiskSlots() >= maxDiskSlots) {
+                workerDiskSlots = maxDiskSlots;
+              } else {
+                workerDiskSlots = workerResource.getAvailableDiskSlots();
+              }
+              AllocatedWorkerResource allocatedWorkerResource = new AllocatedWorkerResource();
+              allocatedWorkerResource.workerResource = workerResource;
+              allocatedWorkerResource.allocatedDiskSlots = workerDiskSlots;
+
+              if(workerResource.getAvailableMemoryMB() >= memoryMB) {
+                allocatedWorkerResource.allocatedMemoryMB = memoryMB;
+              } else {
+                allocatedWorkerResource.allocatedMemoryMB = workerResource.getAvailableMemoryMB();
+              }
+              workerResource.allocateResource(allocatedWorkerResource.allocatedDiskSlots,
+                  allocatedWorkerResource.allocatedMemoryMB);
+
+              selectedWorkers.add(allocatedWorkerResource);
+
+              allocatedResources++;
+            } else {
+              insufficientWorkers.add(eachWorker);
+            }
           }
         }
       }
     }
-
     return selectedWorkers;
   }
 
   @Override
-  public void releaseWorkerResource(QueryId queryId, WorkerResource workerResource) {
+  public void releaseWorkerResource(ExecutionBlockId ebId, YarnProtos.ContainerIdProto containerId) {
     synchronized(workerResourceLock) {
-      WorkerResource managedWorkerResource = allWorkerResourceMap.get(workerResource.getId());
-      if(managedWorkerResource != null) {
-        managedWorkerResource.releaseResource(workerResource);
+      AllocatedWorkerResource allocatedWorkerResource = allocatedResourceMap.get(containerId);
+      if(allocatedWorkerResource != null) {
+        LOG.info("Release Resource:" + ebId + "," +
+            allocatedWorkerResource.allocatedDiskSlots + "," + allocatedWorkerResource.allocatedMemoryMB);
+        allocatedWorkerResource.workerResource.releaseResource(
+            allocatedWorkerResource.allocatedDiskSlots, allocatedWorkerResource.allocatedMemoryMB);
+      } else {
+        LOG.warn("No AllocatedWorkerResource data for [" + ebId + "," + containerId + "]");
+        return;
       }
     }
 
@@ -411,7 +629,7 @@ public class TajoWorkerResourceManager implements WorkerResourceManager {
         return;
       } else {
         queryMasterWorkerResource = queryMasterMap.remove(queryId);
-        queryMasterWorkerResource.releaseQueryMasterTask();
+        queryMasterWorkerResource.releaseQueryMasterTask(queryMasterDefaultDiskSlot, queryMasterDefaultMemoryMB);
       }
     }
 
@@ -462,7 +680,7 @@ public class TajoWorkerResourceManager implements WorkerResourceManager {
         workerResource.setLastHeartbeat(System.currentTimeMillis());
         workerResource.setWorkerStatus(WorkerStatus.LIVE);
         if(request.getServerStatus() != null) {
-          workerResource.setMemoryMBSlots(request.getServerStatus().getSystem().getTotalMemoryMB());
+          workerResource.setMemoryMB(request.getServerStatus().getMemoryResourceMB());
           workerResource.setCpuCoreSlots(request.getServerStatus().getSystem().getAvailableProcessors());
           workerResource.setDiskSlots(request.getServerStatus().getDiskSlots());
           workerResource.setNumRunningTasks(request.getServerStatus().getRunningTaskNum());
@@ -470,7 +688,7 @@ public class TajoWorkerResourceManager implements WorkerResourceManager {
           workerResource.setFreeHeap(request.getServerStatus().getJvmHeap().getFreeHeap());
           workerResource.setTotalHeap(request.getServerStatus().getJvmHeap().getTotalHeap());
         } else {
-          workerResource.setMemoryMBSlots(4096);
+          workerResource.setMemoryMB(4096);
           workerResource.setDiskSlots(4);
           workerResource.setCpuCoreSlots(4);
         }

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/528c914f/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/rm/WorkerResource.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/rm/WorkerResource.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/rm/WorkerResource.java
index b702063..e8c9a9e 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/rm/WorkerResource.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/rm/WorkerResource.java
@@ -35,12 +35,12 @@ public class WorkerResource implements Comparable<WorkerResource> {
   private int pullServerPort;
   private int httpPort;
 
-  private int diskSlots;
+  private float diskSlots;
   private int cpuCoreSlots;
-  private int memoryMBSlots;
+  private int memoryMB;
 
-  private int usedDiskSlots;
-  private int usedMemoryMBSlots;
+  private float usedDiskSlots;
+  private int usedMemoryMB;
   private int usedCpuCoreSlots;
 
   private long maxHeap;
@@ -75,28 +75,11 @@ public class WorkerResource implements Comparable<WorkerResource> {
     this.allocatedHost = allocatedHost;
   }
 
-  public void addUsedDiskSlots(int diskSlots) {
-    usedDiskSlots += diskSlots;
-  }
-
-  public void addUsedMemoryMBSlots(int memoryMBSlots) {
-    try {
-      wlock.lock();
-      usedMemoryMBSlots += memoryMBSlots;
-    } finally {
-      wlock.unlock();
-    }
-  }
-
-  public void addUsedCpuCoreSlots(int cpuCoreSlots) {
-    usedCpuCoreSlots += cpuCoreSlots;
-  }
-
-  public int getDiskSlots() {
+  public float getDiskSlots() {
     return diskSlots;
   }
 
-  public void setDiskSlots(int diskSlots) {
+  public void setDiskSlots(float diskSlots) {
     this.diskSlots = diskSlots;
   }
 
@@ -108,36 +91,40 @@ public class WorkerResource implements Comparable<WorkerResource> {
     this.cpuCoreSlots = cpuCoreSlots;
   }
 
-  public int getMemoryMBSlots() {
+  public int getMemoryMB() {
     try {
       rlock.lock();
-      return memoryMBSlots;
+      return memoryMB;
     } finally {
       rlock.unlock();
     }
   }
 
-  public void setMemoryMBSlots(int memoryMBSlots) {
+  public void setMemoryMB(int memoryMB) {
     try {
       wlock.lock();
-      this.memoryMBSlots = memoryMBSlots;
+      this.memoryMB = memoryMB;
     } finally {
       wlock.unlock();
     }
   }
 
-  public int getAvailableDiskSlots() {
+  public float getAvailableDiskSlots() {
     return diskSlots - usedDiskSlots;
   }
 
-  public int getAvailableMemoryMBSlots() {
-    return getMemoryMBSlots() - getUsedMemoryMBSlots();
+  public int getAvailableMemoryMB() {
+    return memoryMB - usedMemoryMB;
+  }
+
+  public int getAvailableCpuCoreSlots() {
+    return cpuCoreSlots - usedCpuCoreSlots;
   }
 
   @Override
   public String toString() {
-    return "host:" + allocatedHost + ", port=" + portsToStr() + ", slots=" + memoryMBSlots + ":" + cpuCoreSlots + ":" + diskSlots +
-        ", used=" + getUsedMemoryMBSlots() + ":" + usedCpuCoreSlots + ":" + usedDiskSlots;
+    return "host:" + allocatedHost + ", port=" + portsToStr() + ", slots=m:" + memoryMB + ",d:" + diskSlots +
+        ",c:" + cpuCoreSlots + ", used=m:" + usedMemoryMB + ",d:" + usedDiskSlots + ",c:" + usedCpuCoreSlots;
   }
 
   public String portsToStr() {
@@ -148,23 +135,22 @@ public class WorkerResource implements Comparable<WorkerResource> {
     this.lastHeartbeat = heartbeatTime;
   }
 
-  public int getUsedMemoryMBSlots() {
+  public int getUsedMemoryMB() {
     try {
       rlock.lock();
-      return usedMemoryMBSlots;
+      return usedMemoryMB;
     } finally {
       rlock.unlock();
     }
   }
 
-  public void setUsedMemoryMBSlots(int usedMemoryMBSlots) {
+  public void setUsedMemoryMB(int usedMemoryMB) {
     try {
       wlock.lock();
-      this.usedMemoryMBSlots = usedMemoryMBSlots;
+      this.usedMemoryMB = usedMemoryMB;
     } finally {
       wlock.unlock();
     }
-
   }
 
   public int getUsedCpuCoreSlots() {
@@ -175,7 +161,7 @@ public class WorkerResource implements Comparable<WorkerResource> {
     this.usedCpuCoreSlots = usedCpuCoreSlots;
   }
 
-  public int getUsedDiskSlots() {
+  public float getUsedDiskSlots() {
     return usedDiskSlots;
   }
 
@@ -211,33 +197,40 @@ public class WorkerResource implements Comparable<WorkerResource> {
     this.taskRunnerMode = taskRunnerMode;
   }
 
-  public void releaseResource(WorkerResource workerResource) {
+  public void releaseResource(float diskSlots, int memoryMB) {
     try {
       wlock.lock();
-      usedMemoryMBSlots = usedMemoryMBSlots - workerResource.getMemoryMBSlots();
+      usedMemoryMB = usedMemoryMB - memoryMB;
+      usedDiskSlots -= diskSlots;
+      if(usedMemoryMB < 0) {
+        LOG.warn("Used memory can't be a minus: " + usedMemoryMB);
+        usedMemoryMB = 0;
+      }
+      if(usedDiskSlots < 0) {
+        LOG.warn("Used disk slot can't be a minus: " + usedDiskSlots);
+        usedDiskSlots = 0;
+      }
     } finally {
       wlock.unlock();
     }
-
-    if(getUsedMemoryMBSlots() < 0 || usedDiskSlots < 0 || usedCpuCoreSlots < 0) {
-      LOG.warn("Used resources can't be a minus.");
-      LOG.warn(this + " ==> " + workerResource);
-    }
   }
 
-  public int getSlots() {
-    //TODO what is slot? 512MB = 1slot?
-    return getMemoryMBSlots()/512;
-  }
+  public void allocateResource(float diskSlots, int memoryMB) {
+    try {
+      wlock.lock();
+      usedMemoryMB += memoryMB;
+      usedDiskSlots += diskSlots;
 
-  public int getAvaliableSlots() {
-    //TODO what is slot? 512MB = 1slot?
-    return getAvailableMemoryMBSlots()/512;
-  }
+      if(usedMemoryMB > this.memoryMB) {
+        usedMemoryMB = this.memoryMB;
+      }
 
-  public int getUsedSlots() {
-    //TODO what is slot? 512MB = 1slot?
-    return getUsedMemoryMBSlots()/512;
+      if(usedDiskSlots > this.diskSlots) {
+        usedDiskSlots = this.diskSlots;
+      }
+    } finally {
+      wlock.unlock();
+    }
   }
 
   public int getPeerRpcPort() {
@@ -316,16 +309,14 @@ public class WorkerResource implements Comparable<WorkerResource> {
     return numQueryMasterTasks.get();
   }
 
-  public void setNumQueryMasterTasks(int numQueryMasterTasks) {
-    this.numQueryMasterTasks.set(numQueryMasterTasks);
-  }
-
-  public void addNumQueryMasterTask() {
+  public void addNumQueryMasterTask(float diskSlots, int memoryMB) {
     numQueryMasterTasks.getAndIncrement();
+    allocateResource(diskSlots, memoryMB);
   }
 
-  public void releaseQueryMasterTask() {
+  public void releaseQueryMasterTask(float diskSlots, int memoryMB) {
     numQueryMasterTasks.getAndDecrement();
+    releaseResource(diskSlots, memoryMB);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/528c914f/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/rm/WorkerResourceManager.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/rm/WorkerResourceManager.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/rm/WorkerResourceManager.java
index 2e66f98..1ce2c9f 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/rm/WorkerResourceManager.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/rm/WorkerResourceManager.java
@@ -20,6 +20,8 @@ package org.apache.tajo.master.rm;
 
 import com.google.protobuf.RpcCallback;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.proto.YarnProtos.*;
+import org.apache.tajo.ExecutionBlockId;
 import org.apache.tajo.QueryId;
 import org.apache.tajo.ipc.TajoMasterProtocol;
 import org.apache.tajo.master.querymaster.QueryInProgress;
@@ -56,13 +58,13 @@ public interface WorkerResourceManager {
 
   public void workerHeartbeat(TajoMasterProtocol.TajoHeartbeat request);
 
-  public void releaseWorkerResource(QueryId queryId, WorkerResource workerResource);
+  public void releaseWorkerResource(ExecutionBlockId ebId, ContainerIdProto containerId);
 
   public Map<String, WorkerResource> getWorkers();
 
   public void stop();
 
-  public int getNumClusterSlots();
+  public TajoMasterProtocol.ClusterResourceSummary getClusterResourceSummary();
 
   Collection<String> getQueryMasters();
 }

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/528c914f/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/rm/YarnTajoResourceManager.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/rm/YarnTajoResourceManager.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/rm/YarnTajoResourceManager.java
index 802e5ed..8b72cf9 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/rm/YarnTajoResourceManager.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/rm/YarnTajoResourceManager.java
@@ -36,10 +36,13 @@ import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
 import org.apache.hadoop.yarn.factories.RecordFactory;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
 import org.apache.hadoop.yarn.ipc.YarnRPC;
+import org.apache.hadoop.yarn.proto.YarnProtos;
 import org.apache.hadoop.yarn.util.BuilderUtils;
 import org.apache.hadoop.yarn.util.Records;
+import org.apache.tajo.ExecutionBlockId;
 import org.apache.tajo.QueryId;
 import org.apache.tajo.TajoProtos;
+import org.apache.tajo.exception.UnimplementedException;
 import org.apache.tajo.ipc.TajoMasterProtocol;
 import org.apache.tajo.master.TajoMaster;
 import org.apache.tajo.master.YarnContainerProxy;
@@ -83,33 +86,38 @@ public class YarnTajoResourceManager implements WorkerResourceManager {
     return new ArrayList<String>();
   }
 
-  public int getNumClusterSlots() {
-    return 0;
+  public TajoMasterProtocol.ClusterResourceSummary getClusterResourceSummary() {
+    return TajoMasterProtocol.ClusterResourceSummary.newBuilder()
+        .setNumWorkers(0)
+        .setTotalCpuCoreSlots(0)
+        .setTotalDiskSlots(0)
+        .setTotalMemoryMB(0)
+        .setTotalAvailableCpuCoreSlots(0)
+        .setTotalAvailableDiskSlots(0)
+        .setTotalAvailableMemoryMB(0)
+        .build();
   }
 
   @Override
   public void workerHeartbeat(TajoMasterProtocol.TajoHeartbeat request) {
-    //nothing to do
-    //yarn manages worker membership.
+    throw new UnimplementedException("workerHeartbeat");
   }
 
   @Override
-  public void releaseWorkerResource(QueryId queryId, WorkerResource workerResource) {
-    //nothing to do
+  public void releaseWorkerResource(ExecutionBlockId ebId, YarnProtos.ContainerIdProto containerId) {
+    throw new UnimplementedException("releaseWorkerResource");
   }
 
   @Override
   public WorkerResource allocateQueryMaster(QueryInProgress queryInProgress) {
-    //nothing to do
-    //allocateAndLaunchQueryMaster in startQueryMaster()
-    return null;
+    throw new UnimplementedException("allocateQueryMaster");
   }
 
   @Override
   public void allocateWorkerResources(
       TajoMasterProtocol.WorkerResourceAllocationRequest request,
       RpcCallback<TajoMasterProtocol.WorkerResourceAllocationResponse> rpcCallBack) {
-    //nothing to do
+    throw new UnimplementedException("allocateWorkerResources");
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/528c914f/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/util/ApplicationIdUtils.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/util/ApplicationIdUtils.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/util/ApplicationIdUtils.java
index 5fe1b74..cf68145 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/util/ApplicationIdUtils.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/util/ApplicationIdUtils.java
@@ -26,6 +26,9 @@ import org.apache.tajo.QueryId;
 import org.apache.tajo.QueryIdFactory;
 
 public class ApplicationIdUtils {
+  public static ApplicationAttemptId createApplicationAttemptId(QueryId queryId, int attemptId) {
+    return BuilderUtils.newApplicationAttemptId(queryIdToAppId(queryId), attemptId);
+  }
 
   public static ApplicationAttemptId createApplicationAttemptId(QueryId queryId) {
     return BuilderUtils.newApplicationAttemptId(queryIdToAppId(queryId), 1);

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/528c914f/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/ResourceAllocator.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/ResourceAllocator.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/ResourceAllocator.java
index f0c70cf..8b9219c 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/ResourceAllocator.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/ResourceAllocator.java
@@ -24,5 +24,6 @@ import org.apache.hadoop.yarn.proto.YarnProtos;
 public interface ResourceAllocator {
   public void allocateTaskWorker();
   public ContainerId makeContainerId(YarnProtos.ContainerIdProto containerId);
-  public int calculateNumRequestContainers(TajoWorker.WorkerContext workerContext, int numTasks);
+  public int calculateNumRequestContainers(TajoWorker.WorkerContext workerContext,
+                                           int numTasks, int memoryMBPerTask);
 }

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/528c914f/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoResourceAllocator.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoResourceAllocator.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoResourceAllocator.java
index 275660a..ca6ea41 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoResourceAllocator.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoResourceAllocator.java
@@ -25,7 +25,9 @@ import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.FsPermission;
-import org.apache.hadoop.yarn.api.records.*;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationAttemptIdPBImpl;
 import org.apache.hadoop.yarn.api.records.impl.pb.NodeIdPBImpl;
 import org.apache.hadoop.yarn.event.EventHandler;
@@ -93,8 +95,12 @@ public class TajoResourceAllocator extends AbstractResourceAllocator {
   }
 
   @Override
-  public int calculateNumRequestContainers(TajoWorker.WorkerContext workerContext, int numTasks) {
-    int clusterSlots = workerContext.getNumClusterSlots();
+  public int calculateNumRequestContainers(TajoWorker.WorkerContext workerContext,
+                                           int numTasks,
+                                           int memoryMBPerTask) {
+    //TODO consider disk slot
+    TajoMasterProtocol.ClusterResourceSummary clusterResource = workerContext.getClusterResource();
+    int clusterSlots = clusterResource == null ? 0 : clusterResource.getTotalMemoryMB()/memoryMBPerTask;
     return clusterSlots == 0 ? 1: Math.min(numTasks, clusterSlots);
   }
 
@@ -226,13 +232,18 @@ public class TajoResourceAllocator extends AbstractResourceAllocator {
       CallFuture<TajoMasterProtocol.WorkerResourceAllocationResponse> callBack =
           new CallFuture<TajoMasterProtocol.WorkerResourceAllocationResponse>();
 
-      int requiredMemoryMBSlot = 512;  //TODO
-      int requiredDiskSlots = 1;  //TODO
+      //TODO consider task's resource usage pattern
+      int requiredMemoryMB = tajoConf.getIntVar(TajoConf.ConfVars.TASK_DEFAULT_MEMORY);
+      float requiredDiskSlots = tajoConf.getFloatVar(TajoConf.ConfVars.TASK_DEFAULT_DISK);
+
       TajoMasterProtocol.WorkerResourceAllocationRequest request =
           TajoMasterProtocol.WorkerResourceAllocationRequest.newBuilder()
-              .setMemoryMBSlots(requiredMemoryMBSlot)
-              .setDiskSlots(requiredDiskSlots)
-              .setNumWorks(event.getRequiredNum())
+              .setMinMemoryMBPerContainer(requiredMemoryMB)
+              .setMaxMemoryMBPerContainer(requiredMemoryMB)
+              .setNumContainers(event.getRequiredNum())
+              .setResourceRequestPriority(TajoMasterProtocol.ResourceRequestPriority.MEMORY)
+              .setMinDiskSlotPerContainer(requiredDiskSlots)
+              .setMaxDiskSlotPerContainer(requiredDiskSlots)
               .setExecutionBlockId(event.getExecutionBlockId().getProto())
               .build();
 
@@ -266,25 +277,26 @@ public class TajoResourceAllocator extends AbstractResourceAllocator {
           continue;
         }
       }
-      int numAllocatedWorkers = 0;
+      int numAllocatedContainers = 0;
 
       if(response != null) {
-        List<TajoMasterProtocol.WorkerAllocatedResource> workerHosts = response.getWorkerAllocatedResourceList();
+        List<TajoMasterProtocol.WorkerAllocatedResource> allocatedResources = response.getWorkerAllocatedResourceList();
         ExecutionBlockId executionBlockId = event.getExecutionBlockId();
 
         List<Container> containers = new ArrayList<Container>();
-        for(TajoMasterProtocol.WorkerAllocatedResource eachWorker: workerHosts) {
+        for(TajoMasterProtocol.WorkerAllocatedResource eachAllocatedResource: allocatedResources) {
           TajoWorkerContainer container = new TajoWorkerContainer();
           NodeIdPBImpl nodeId = new NodeIdPBImpl();
 
-          nodeId.setHost(eachWorker.getWorkerHost());
-          nodeId.setPort(eachWorker.getPeerRpcPort());
+          nodeId.setHost(eachAllocatedResource.getWorkerHost());
+          nodeId.setPort(eachAllocatedResource.getPeerRpcPort());
 
           TajoWorkerContainerId containerId = new TajoWorkerContainerId();
 
           containerId.setApplicationAttemptId(
-              ApplicationIdUtils.createApplicationAttemptId(executionBlockId.getQueryId()));
-          containerId.setId(containerIdSeq.incrementAndGet());
+              ApplicationIdUtils.createApplicationAttemptId(executionBlockId.getQueryId(),
+                  eachAllocatedResource.getContainerId().getAppAttemptId().getAttemptId()));
+          containerId.setId(eachAllocatedResource.getContainerId().getId());
 
           container.setId(containerId);
           container.setNodeId(nodeId);
@@ -292,10 +304,10 @@ public class TajoResourceAllocator extends AbstractResourceAllocator {
           WorkerResource workerResource = new WorkerResource();
           workerResource.setAllocatedHost(nodeId.getHost());
           workerResource.setPeerRpcPort(nodeId.getPort());
-          workerResource.setQueryMasterPort(eachWorker.getQueryMasterPort());
-          workerResource.setPullServerPort(eachWorker.getWorkerPullServerPort());
-          workerResource.setMemoryMBSlots(requiredMemoryMBSlot);
-          workerResource.setDiskSlots(requiredDiskSlots);
+          workerResource.setQueryMasterPort(eachAllocatedResource.getQueryMasterPort());
+          workerResource.setPullServerPort(eachAllocatedResource.getWorkerPullServerPort());
+          workerResource.setMemoryMB(eachAllocatedResource.getAllocatedMemoryMB());
+          workerResource.setDiskSlots(eachAllocatedResource.getAllocatedDiskSlots());
 
           container.setWorkerResource(workerResource);
 
@@ -304,32 +316,32 @@ public class TajoResourceAllocator extends AbstractResourceAllocator {
 
         SubQueryState state = queryTaskContext.getSubQuery(executionBlockId).getState();
         if (!SubQuery.isRunningState(state)) {
-          List<WorkerResource> workerResources = new ArrayList<WorkerResource>();
-          for(Container eachContainer: containers) {
-            workerResources.add(((TajoWorkerContainer)eachContainer).getWorkerResource());
-          }
           try {
-            TajoContainerProxy.releaseWorkerResource(queryTaskContext, executionBlockId, workerResources);
+            List<ContainerId> containerIds = new ArrayList<ContainerId>();
+            for(Container eachContainer: containers) {
+              containerIds.add(eachContainer.getId());
+            }
+            TajoContainerProxy.releaseWorkerResource(queryTaskContext, executionBlockId, containerIds);
           } catch (Exception e) {
             LOG.error(e.getMessage(), e);
           }
           return;
         }
 
-        if (workerHosts.size() > 0) {
+        if (allocatedResources.size() > 0) {
           if(LOG.isDebugEnabled()) {
             LOG.debug("SubQueryContainerAllocationEvent fire:" + executionBlockId);
           }
           queryTaskContext.getEventHandler().handle(new SubQueryContainerAllocationEvent(executionBlockId, containers));
         }
-        numAllocatedWorkers += workerHosts.size();
+        numAllocatedContainers += allocatedResources.size();
 
       }
-      if(event.getRequiredNum() > numAllocatedWorkers) {
+      if(event.getRequiredNum() > numAllocatedContainers) {
         ContainerAllocationEvent shortRequestEvent = new ContainerAllocationEvent(
             event.getType(), event.getExecutionBlockId(), event.getPriority(),
             event.getResource(),
-            event.getRequiredNum() - numAllocatedWorkers,
+            event.getRequiredNum() - numAllocatedContainers,
             event.isLeafQuery(), event.getProgress()
         );
         queryTaskContext.getEventHandler().handle(shortRequestEvent);

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/528c914f/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoWorker.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoWorker.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoWorker.java
index 4d46a45..57d99c4 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoWorker.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoWorker.java
@@ -41,6 +41,9 @@ import org.apache.tajo.rpc.CallFuture;
 import org.apache.tajo.rpc.NettyClientBase;
 import org.apache.tajo.rpc.RpcConnectionPool;
 import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos;
+import org.apache.tajo.storage.v2.DiskDeviceInfo;
+import org.apache.tajo.storage.v2.DiskMountInfo;
+import org.apache.tajo.storage.v2.DiskUtil;
 import org.apache.tajo.util.CommonTestingUtil;
 import org.apache.tajo.util.NetUtils;
 import org.apache.tajo.util.TajoIdUtils;
@@ -103,7 +106,7 @@ public class TajoWorker extends CompositeService {
 
   private AtomicInteger numClusterNodes = new AtomicInteger();
 
-  private AtomicInteger numClusterSlots = new AtomicInteger();
+  private TajoMasterProtocol.ClusterResourceSummary clusterResource;
 
   private int httpPort;
 
@@ -350,12 +353,16 @@ public class TajoWorker extends CompositeService {
       return TajoWorker.this.numClusterNodes.get();
     }
 
-    public void setNumClusterSlots(int numClusterSlots) {
-      TajoWorker.this.numClusterSlots.set(numClusterSlots);
+    public void setClusterResource(TajoMasterProtocol.ClusterResourceSummary clusterResource) { // stores the summary the heartbeat thread read from the master's response
+      synchronized(numClusterNodes) { // NOTE(review): the AtomicInteger numClusterNodes is used here only as the lock object guarding clusterResource
+        TajoWorker.this.clusterResource = clusterResource;
+      }
     }
 
-    public int getNumClusterSlots() {
-      return TajoWorker.this.numClusterSlots.get();
+    public TajoMasterProtocol.ClusterResourceSummary getClusterResource() { // NOTE(review): the field is declared without an initializer, so this presumably returns null until a summary has been stored - confirm
+      synchronized(numClusterNodes) { // same lock object as setClusterResource
+        return TajoWorker.this.clusterResource;
+      }
     }
 
     public InetSocketAddress getTajoMasterAddress() {
@@ -391,17 +398,18 @@ public class TajoWorker extends CompositeService {
     TajoMasterProtocol.ServerStatusProto.System systemInfo;
     List<TajoMasterProtocol.ServerStatusProto.Disk> diskInfos =
         new ArrayList<TajoMasterProtocol.ServerStatusProto.Disk>();
-    int workerDisksNum;
-    List<File> mountPaths;
+    float workerDiskSlots;
+    int workerMemoryMB;
+    List<DiskDeviceInfo> diskDeviceInfos;
 
     public WorkerHeartbeatThread() {
-      int workerMemoryMB;
       int workerCpuCoreNum;
 
       boolean dedicatedResource = systemConf.getBoolVar(ConfVars.WORKER_RESOURCE_DEDICATED);
-      
+      int workerCpuCoreSlots = Runtime.getRuntime().availableProcessors();
+
       try {
-        mountPaths = getMountPath();
+        diskDeviceInfos = DiskUtil.getDiskDeviceInfos();
       } catch (Exception e) {
         LOG.error(e.getMessage(), e);
       }
@@ -411,24 +419,23 @@ public class TajoWorker extends CompositeService {
         int totalMemory = getTotalMemoryMB();
         workerMemoryMB = (int) ((float) (totalMemory) * dedicatedMemoryRatio);
         workerCpuCoreNum = Runtime.getRuntime().availableProcessors();
-        if(mountPaths == null) {
-          workerDisksNum = ConfVars.WORKER_RESOURCE_AVAILABLE_DISKS.defaultIntVal;
+
+        if(diskDeviceInfos == null) {
+          workerDiskSlots = ConfVars.WORKER_RESOURCE_AVAILABLE_DISKS.defaultIntVal;
         } else {
-          workerDisksNum = mountPaths.size();
+          workerDiskSlots = diskDeviceInfos.size();
         }
       } else {
-        // TODO - it's a hack and it must be fixed
-        //workerMemoryMB = systemConf.getIntVar(ConfVars.WORKER_RESOURCE_AVAILABLE_MEMORY_MB);
-        workerMemoryMB = 512 * systemConf.getIntVar(ConfVars.WORKER_EXECUTION_MAX_SLOTS);
-        workerDisksNum = systemConf.getIntVar(ConfVars.WORKER_RESOURCE_AVAILABLE_DISKS);
+        workerMemoryMB = systemConf.getIntVar(ConfVars.WORKER_RESOURCE_AVAILABLE_MEMORY_MB);
         workerCpuCoreNum = systemConf.getIntVar(ConfVars.WORKER_RESOURCE_AVAILABLE_CPU_CORES);
+        workerDiskSlots = systemConf.getFloatVar(ConfVars.WORKER_RESOURCE_AVAILABLE_DISKS);
       }
 
       systemInfo = TajoMasterProtocol.ServerStatusProto.System.newBuilder()
           .setAvailableProcessors(workerCpuCoreNum)
           .setFreeMemoryMB(0)
           .setMaxMemoryMB(0)
-          .setTotalMemoryMB(workerMemoryMB)
+          .setTotalMemoryMB(getTotalMemoryMB())
           .build();
     }
 
@@ -472,16 +479,8 @@ public class TajoWorker extends CompositeService {
       }
 
       while(true) {
-        if(sendDiskInfoCount == 0 && mountPaths != null) {
-          for(File eachFile: mountPaths) {
-            diskInfos.clear();
-            diskInfos.add(TajoMasterProtocol.ServerStatusProto.Disk.newBuilder()
-                .setAbsolutePath(eachFile.getAbsolutePath())
-                .setTotalSpace(eachFile.getTotalSpace())
-                .setFreeSpace(eachFile.getFreeSpace())
-                .setUsableSpace(eachFile.getUsableSpace())
-                .build());
-          }
+        if(sendDiskInfoCount == 0 && diskDeviceInfos != null) {
+          getDiskUsageInfos();
         }
         TajoMasterProtocol.ServerStatusProto.JvmHeap jvmHeap =
           TajoMasterProtocol.ServerStatusProto.JvmHeap.newBuilder()
@@ -494,7 +493,8 @@ public class TajoWorker extends CompositeService {
             .addAllDisk(diskInfos)
             .setRunningTaskNum(taskRunnerManager == null ? 1 : taskRunnerManager.getNumTasks())   //TODO
             .setSystem(systemInfo)
-            .setDiskSlots(workerDisksNum)
+            .setDiskSlots(workerDiskSlots)
+            .setMemoryResourceMB(workerMemoryMB)
             .setJvmHeap(jvmHeap)
             .setQueryMasterMode(PrimitiveProtos.BoolProto.newBuilder().setValue(queryMasterMode))
             .setTaskRunnerMode(PrimitiveProtos.BoolProto.newBuilder().setValue(taskRunnerMode))
@@ -521,13 +521,11 @@ public class TajoWorker extends CompositeService {
 
           TajoMasterProtocol.TajoHeartbeatResponse response = callBack.get(2, TimeUnit.SECONDS);
           if(response != null) {
-            if(response.getNumClusterNodes() > 0) {
-              workerContext.setNumClusterNodes(response.getNumClusterNodes());
-            }
-
-            if(response.getNumClusterSlots() > 0) {
-              workerContext.setNumClusterSlots(response.getNumClusterSlots());
+            TajoMasterProtocol.ClusterResourceSummary clusterResourceSummary = response.getClusterResourceSummary();
+            if(clusterResourceSummary.getNumWorkers() > 0) {
+              workerContext.setNumClusterNodes(clusterResourceSummary.getNumWorkers());
             }
+            workerContext.setClusterResource(clusterResourceSummary);
           } else {
             if(callBack.getController().failed()) {
               throw new ServiceException(callBack.getController().errorText());
@@ -557,6 +555,24 @@ public class TajoWorker extends CompositeService {
 
       LOG.info("Worker Resource Heartbeat Thread stopped.");
     }
+
+    private void getDiskUsageInfos() { // refreshes diskInfos in place; returns nothing despite the "get" name
+      diskInfos.clear(); // discard the previous snapshot before collecting a new one
+      for(DiskDeviceInfo eachDevice: diskDeviceInfos) { // the heartbeat loop checks diskDeviceInfos != null before calling
+        List<DiskMountInfo> mountInfos = eachDevice.getMountInfos();
+        if(mountInfos != null) { // a device with a null mount list adds no entries
+          for(DiskMountInfo eachMount: mountInfos) {
+            File eachFile = new File(eachMount.getMountPath()); // File is used only to query space figures for the mount path
+            diskInfos.add(TajoMasterProtocol.ServerStatusProto.Disk.newBuilder() // one Disk entry per mount; the list is sent with addAllDisk(diskInfos)
+                .setAbsolutePath(eachFile.getAbsolutePath())
+                .setTotalSpace(eachFile.getTotalSpace())
+                .setFreeSpace(eachFile.getFreeSpace())
+                .setUsableSpace(eachFile.getUsableSpace())
+                .build());
+          }
+        }
+      }
+    }
   }
 
   private class ShutdownHook implements Runnable {

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/528c914f/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/Task.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/Task.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/Task.java
index f931615..70a998b 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/Task.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/Task.java
@@ -18,7 +18,6 @@
 
 package org.apache.tajo.worker;
 
-import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import org.apache.commons.lang.exception.ExceptionUtils;
@@ -39,7 +38,10 @@ import org.apache.tajo.catalog.statistics.TableStats;
 import org.apache.tajo.conf.TajoConf;
 import org.apache.tajo.engine.json.CoreGsonHelper;
 import org.apache.tajo.engine.planner.PlannerUtil;
-import org.apache.tajo.engine.planner.logical.*;
+import org.apache.tajo.engine.planner.logical.LogicalNode;
+import org.apache.tajo.engine.planner.logical.NodeType;
+import org.apache.tajo.engine.planner.logical.ScanNode;
+import org.apache.tajo.engine.planner.logical.SortNode;
 import org.apache.tajo.engine.planner.physical.PhysicalExec;
 import org.apache.tajo.engine.query.QueryContext;
 import org.apache.tajo.engine.query.QueryUnitRequest;

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/528c914f/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/YarnResourceAllocator.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/YarnResourceAllocator.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/YarnResourceAllocator.java
index d694b7c..e74a09d 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/YarnResourceAllocator.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/YarnResourceAllocator.java
@@ -64,7 +64,9 @@ public class YarnResourceAllocator extends AbstractResourceAllocator {
   }
 
   @Override
-  public int calculateNumRequestContainers(TajoWorker.WorkerContext workerContext, int numTasks) {
+  public int calculateNumRequestContainers(TajoWorker.WorkerContext workerContext,
+                                           int numTasks,
+                                           int memoryMBPerTask) {
     int numClusterNodes = workerContext.getNumClusterNodes();
 
     TajoConf conf =  (TajoConf)workerContext.getQueryMaster().getConfig();

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/528c914f/tajo-core/tajo-core-backend/src/main/proto/TajoMasterProtocol.proto
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/proto/TajoMasterProtocol.proto b/tajo-core/tajo-core-backend/src/main/proto/TajoMasterProtocol.proto
index f9b15a7..dca200e 100644
--- a/tajo-core/tajo-core-backend/src/main/proto/TajoMasterProtocol.proto
+++ b/tajo-core/tajo-core-backend/src/main/proto/TajoMasterProtocol.proto
@@ -50,12 +50,13 @@ message ServerStatusProto {
     }
 
     required System system = 1;
-    required int32 diskSlots = 2;
-    repeated Disk disk = 3;
-    required int32 runningTaskNum = 4;
-    required JvmHeap jvmHeap = 5;
-    required BoolProto queryMasterMode = 6;
-    required BoolProto taskRunnerMode = 7;
+    required float diskSlots = 2;
+    required int32 memoryResourceMB = 3;
+    repeated Disk disk = 4;
+    required int32 runningTaskNum = 5;
+    required JvmHeap jvmHeap = 6;
+    required BoolProto queryMasterMode = 7;
+    required BoolProto taskRunnerMode = 8;
 }
 
 message TajoHeartbeat {
@@ -79,16 +80,37 @@ message TajoHeartbeatResponse {
       repeated string params = 2;
   }
   required BoolProto heartbeatResult = 1;
-  required int32 numClusterNodes = 2;
-  required int32 numClusterSlots = 3;
-  optional ResponseCommand responseCommand = 4;
+  required ClusterResourceSummary clusterResourceSummary = 2;
+  optional ResponseCommand responseCommand = 3;
+}
+
+message ClusterResourceSummary { // cluster-wide totals carried in TajoHeartbeatResponse.clusterResourceSummary
+  required int32 numWorkers = 1; // TajoWorker copies this into numClusterNodes when it is > 0
+  required int32 totalDiskSlots = 2; // NOTE(review): int32 here, while ServerStatusProto.diskSlots is float - confirm how fractional slots are rounded
+  required int32 totalCpuCoreSlots = 3;
+  required int32 totalMemoryMB = 4;
+
+  required int32 totalAvailableDiskSlots = 5; // admin index.jsp displays used disk as totalDiskSlots - totalAvailableDiskSlots
+  required int32 totalAvailableCpuCoreSlots = 6;
+  required int32 totalAvailableMemoryMB = 7; // admin index.jsp displays used memory as totalMemoryMB - totalAvailableMemoryMB
+}
+
+enum ResourceRequestPriority { // value type of WorkerResourceAllocationRequest.resourceRequestPriority
+    MEMORY = 1; // NOTE(review): presumably asks the allocator to favour the memory bound of the request - confirm against the resource manager
+    DISK = 2; // NOTE(review): presumably asks the allocator to favour the disk-slot bound of the request - confirm
 }
 
 message WorkerResourceAllocationRequest {
     required ExecutionBlockIdProto executionBlockId = 1;
-    required int32 numWorks = 2;
-    required int32 memoryMBSlots = 3 ;
-    required int32 diskSlots = 4;
+    required ResourceRequestPriority resourceRequestPriority = 2;
+
+    required int32 numContainers = 3;
+
+    required int32 maxMemoryMBPerContainer = 4;
+    required int32 minMemoryMBPerContainer = 5;
+
+    required float maxDiskSlotPerContainer = 6;
+    required float minDiskSlotPerContainer = 7;
 }
 
 message WorkerResourceProto {
@@ -96,19 +118,25 @@ message WorkerResourceProto {
     required int32 peerRpcPort = 2;
     required int32 queryMasterPort = 3;
     required ExecutionBlockIdProto executionBlockId = 4;
-    required int32 memoryMBSlots = 5 ;
+    required int32 memoryMB = 5 ;
     required int32 diskSlots = 6;
 }
 
 message WorkerResourceReleaseRequest {
-    repeated WorkerResourceProto workerResources = 1;
+    required ExecutionBlockIdProto executionBlockId = 1;
+    repeated ContainerIdProto containerIds = 2;
 }
 
 message WorkerAllocatedResource {
-    required string workerHost = 1;
-    required int32 peerRpcPort = 2;
-    required int32 queryMasterPort = 3;
-    required int32 workerPullServerPort = 4;
+    required ContainerIdProto containerId = 1;
+    required string nodeId = 2;
+    required string workerHost = 3;
+    required int32 peerRpcPort = 4;
+    required int32 queryMasterPort = 5;
+    required int32 workerPullServerPort = 6;
+
+    required int32 allocatedMemoryMB = 7;
+    required float allocatedDiskSlots = 8;
 }
 
 message WorkerResourceAllocationResponse {

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/528c914f/tajo-core/tajo-core-backend/src/main/resources/webapps/admin/cluster.jsp
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/resources/webapps/admin/cluster.jsp b/tajo-core/tajo-core-backend/src/main/resources/webapps/admin/cluster.jsp
index d0bf887..0600145 100644
--- a/tajo-core/tajo-core-backend/src/main/resources/webapps/admin/cluster.jsp
+++ b/tajo-core/tajo-core-backend/src/main/resources/webapps/admin/cluster.jsp
@@ -31,10 +31,6 @@
   List<String> wokerKeys = new ArrayList<String>(workers.keySet());
   Collections.sort(wokerKeys);
 
-  int totalSlot = 0;
-  int runningSlot = 0;
-  int idleSlot = 0;
-
   int runningQueryMasterTasks = 0;
 
   Set<WorkerResource> liveWorkers = new TreeSet<WorkerResource>();
@@ -58,9 +54,6 @@
     if(eachWorker.isTaskRunnerMode()) {
       if(eachWorker.getWorkerStatus() == WorkerStatus.LIVE) {
         liveWorkers.add(eachWorker);
-        idleSlot += eachWorker.getAvaliableSlots();
-        totalSlot += eachWorker.getSlots();
-        runningSlot += eachWorker.getUsedSlots();
       } else if(eachWorker.getWorkerStatus() == WorkerStatus.DEAD) {
         deadWorkers.add(eachWorker);
       } else if(eachWorker.getWorkerStatus() == WorkerStatus.DECOMMISSION) {
@@ -94,7 +87,7 @@
   } else {
 %>
   <table width="100%" class="border_table" border="1">
-    <tr><th>No</th><th>QueryMaster</th><th>Client Port</th><th>Running QueryMaster Tasks</th><th>Heap(free/max)</th><th>Heartbeat</th><th>Status</th></tr>
+    <tr><th>No</th><th>QueryMaster</th><th>Client Port</th><th>Running Query</th><th>Heap(free/max)</th><th>Heartbeat</th><th>Status</th></tr>
 
 <%
     int no = 1;
@@ -106,7 +99,7 @@
       <td><a href='<%=queryMasterHttp%>'><%=queryMaster.getAllocatedHost() + ":" + queryMaster.getQueryMasterPort()%></a></td>
       <td width='100' align='center'><%=queryMaster.getClientPort()%></td>
       <td width='200' align='right'><%=queryMaster.getNumQueryMasterTasks()%></td>
-      <td width='200' align='left'><%=queryMaster.getFreeHeap()/1024/1024%>/<%=queryMaster.getMaxHeap()/1024/1024%> MB</td>
+      <td width='200' align='center'><%=queryMaster.getFreeHeap()/1024/1024%>/<%=queryMaster.getMaxHeap()/1024/1024%> MB</td>
       <td width='100' align='right'><%=JSPUtil.getElapsedTime(queryMaster.getLastHeartbeat(), System.currentTimeMillis())%></td>
       <td width='100' align='center'><%=queryMaster.getWorkerStatus()%></td>
     </tr>
@@ -148,7 +141,7 @@
 
   <hr/>
   <h2>Worker</h2>
-  <div>Live:<%=liveWorkers.size()%>, Dead: <%=deadWorkersHtml%>, Slot(running/idle/total): <%=runningSlot%> / <%=idleSlot%> / <%=totalSlot%></div>
+  <div>Live:<%=liveWorkers.size()%>, Dead: <%=deadWorkersHtml%></div>
   <hr/>
   <h3>Live Workers</h3>
 <%
@@ -157,7 +150,7 @@
   } else {
 %>
   <table width="100%" class="border_table" border="1">
-    <tr><th>No</th><th>Worker</th><th>PullServer Port</th><th>Running Tasks</th><th>Slot</th></th><th>Heap(free/max)</th><th>Disk</th><th>Cpu</th><th>Heartbeat</th><th>Status</th></tr>
+    <tr><th>No</th><th>Worker</th><th>PullServer<br/>Port</th><th>Running Tasks</th><th>Memory Resource<br/>(used/total)</th><th>Disk Resource<br/>(used/total)</th><th>Heap(free/max)</th><th>Heartbeat</th><th>Status</th></tr>
 <%
     int no = 1;
     for(WorkerResource worker: liveWorkers) {
@@ -166,12 +159,11 @@
     <tr>
       <td width='30' align='right'><%=no++%></td>
       <td><a href='<%=workerHttp%>'><%=worker.getAllocatedHost() + ":" + worker.getPeerRpcPort()%></a></td>
-      <td width='150' align='center'><%=worker.getPullServerPort()%></td>
+      <td width='80' align='center'><%=worker.getPullServerPort()%></td>
       <td width='100' align='right'><%=worker.getNumRunningTasks()%></td>
-      <td width='100' align='right'><%=worker.getUsedSlots()%>/<%=worker.getSlots()%></td>
-      <td width='100' align='left'><%=worker.getFreeHeap()/1024/1024%>/<%=worker.getMaxHeap()/1024/1024%> MB</td>
-      <td width='100' align='right'><%=worker.getUsedDiskSlots()%>/<%=worker.getDiskSlots()%></td>
-      <td width='100' align='right'><%=worker.getUsedCpuCoreSlots()%>/<%=worker.getCpuCoreSlots()%></td>
+      <td width='150' align='center'><%=worker.getUsedMemoryMB()%>/<%=worker.getMemoryMB()%></td>
+      <td width='100' align='center'><%=worker.getUsedDiskSlots()%>/<%=worker.getDiskSlots()%></td>
+      <td width='100' align='center'><%=worker.getFreeHeap()/1024/1024%>/<%=worker.getMaxHeap()/1024/1024%> MB</td>
       <td width='100' align='right'><%=JSPUtil.getElapsedTime(worker.getLastHeartbeat(), System.currentTimeMillis())%></td>
       <td width='100' align='center'><%=worker.getWorkerStatus()%></td>
     </tr>
@@ -196,7 +188,7 @@
   } else {
 %>
   <table width="100%" class="border_table" border="1">
-    <tr><th>No</th><th>Worker</th><th>PullServer Port</th><th>Slot</th></th><th>Heap(free/max)</th><th>Disk</th><th>Cpu</th><th>Status</th></tr>
+    <tr><th>No</th><th>Worker</th><th>PullServer Port</th><th>Memory Resource</th><th>Disk Resource</th><th>Heap(free/max)</th><th>Status</th></tr><%-- one header per cell of the dead-worker row: no, host:port, pull server port, memory, disk, heap, status --%>
 <%
       int no = 1;
       for(WorkerResource worker: deadWorkers) {
@@ -205,10 +197,9 @@
       <td width='30' align='right'><%=no++%></td>
       <td><%=worker.getAllocatedHost() + ":" + worker.getPeerRpcPort()%></td>
       <td width='150' align='center'><%=worker.getPullServerPort()%></td>
-      <td width='100' align='right'><%=worker.getUsedSlots()%>/<%=worker.getSlots()%></td>
-      <td width='100' align='left'><%=worker.getFreeHeap()/1024/1024%>/<%=worker.getMaxHeap()/1024/1024%> MB</td>
+      <td width='100' align='right'><%=worker.getUsedMemoryMB()%>/<%=worker.getMemoryMB()%></td>
       <td width='100' align='right'><%=worker.getUsedDiskSlots()%>/<%=worker.getDiskSlots()%></td>
-      <td width='100' align='right'><%=worker.getUsedCpuCoreSlots()%>/<%=worker.getCpuCoreSlots()%></td>
+      <td width='100' align='left'><%=worker.getFreeHeap()/1024/1024%>/<%=worker.getMaxHeap()/1024/1024%> MB</td>
       <td width='100' align='center'><%=worker.getWorkerStatus()%></td>
     </tr>
 <%

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/528c914f/tajo-core/tajo-core-backend/src/main/resources/webapps/admin/index.jsp
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/resources/webapps/admin/index.jsp b/tajo-core/tajo-core-backend/src/main/resources/webapps/admin/index.jsp
index fd5fa47..f652ea5 100644
--- a/tajo-core/tajo-core-backend/src/main/resources/webapps/admin/index.jsp
+++ b/tajo-core/tajo-core-backend/src/main/resources/webapps/admin/index.jsp
@@ -29,6 +29,7 @@
 <%@ page import="org.apache.hadoop.util.StringUtils" %>
 <%@ page import="org.apache.hadoop.fs.FileSystem" %>
 <%@ page import="org.apache.tajo.conf.TajoConf" %>
+<%@ page import="org.apache.tajo.ipc.TajoMasterProtocol" %>
 
 <%
   TajoMaster master = (TajoMaster) StaticHttpServer.getInstance().getAttribute("tajo.info.server.object");
@@ -44,9 +45,9 @@
   int numDeadQueryMasters = 0;
   int runningQueryMasterTask = 0;
 
-  int totalSlot = 0;
-  int runningSlot = 0;
-  int idleSlot = 0;
+
+  TajoMasterProtocol.ClusterResourceSummary clusterResourceSummary =
+          master.getContext().getResourceManager().getClusterResourceSummary();
 
   for(WorkerResource eachWorker: workers.values()) {
     if(eachWorker.getWorkerStatus() == WorkerStatus.LIVE) {
@@ -58,9 +59,6 @@
       if(eachWorker.isTaskRunnerMode()) {
         numWorkers++;
         numLiveWorkers++;
-        idleSlot += eachWorker.getAvaliableSlots();
-        totalSlot += eachWorker.getSlots();
-        runningSlot += eachWorker.getUsedSlots();
       }
     } else if(eachWorker.getWorkerStatus() == WorkerStatus.DEAD) {
       if(eachWorker.isQueryMasterMode()) {
@@ -143,24 +141,24 @@
 
   <h3>Cluster Summary</h3>
   <table width="100%" class="border_table" border="1">
-    <tr><th>Type</th><th>Total</th><th>Live</th><th>Dead</th><th>Total Slots</th><th>Running Slots</th><th>Idle Slots</th></tr>
+    <tr><th>Type</th><th>Total</th><th>Live</th><th>Dead</th><th>Running Master</th><th>Memory Resource<br/>(used/total)</th><th>Disk Resource<br/>(used/total)</th></tr>
     <tr>
       <td><a href='cluster.jsp'>Query Master</a></td>
       <td align='right'><%=numQueryMasters%></td>
       <td align='right'><%=numLiveQueryMasters%></td>
       <td align='right'><%=numDeadQueryMastersHtml%></td>
-      <td align='right'>-</td>
       <td align='right'><%=runningQueryMasterTask%></td>
-      <td align='right'>-</td>
+      <td align='center'>-</td>
+      <td align='center'>-</td>
     </tr>
     <tr>
       <td><a href='cluster.jsp'>Worker</a></td>
       <td align='right'><%=numWorkers%></td>
       <td align='right'><%=numLiveWorkers%></td>
       <td align='right'><%=numDeadWorkersHtml%></td>
-      <td align='right'><%=totalSlot%></td>
-      <td align='right'><%=runningSlot%></td>
-      <td align='right'><%=idleSlot%></td>
+      <td align='right'>-</td>
+      <td align='center'><%=clusterResourceSummary.getTotalMemoryMB() - clusterResourceSummary.getTotalAvailableMemoryMB()%>/<%=clusterResourceSummary.getTotalMemoryMB()%></td>
+      <td align='center'><%=clusterResourceSummary.getTotalDiskSlots() - clusterResourceSummary.getTotalAvailableDiskSlots()%>/<%=clusterResourceSummary.getTotalDiskSlots()%></td>
     </tr>
   </table>
   <p/>

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/528c914f/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/TajoTestingCluster.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/TajoTestingCluster.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/TajoTestingCluster.java
index 9ea07de..b7c8512 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/TajoTestingCluster.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/TajoTestingCluster.java
@@ -96,6 +96,9 @@ public class TajoTestingCluster {
       );
       conf.set(ConfVars.RESOURCE_MANAGER_CLASS.varname, System.getProperty(ConfVars.RESOURCE_MANAGER_CLASS.varname));
     }
+    conf.setInt(ConfVars.WORKER_RESOURCE_AVAILABLE_MEMORY_MB.varname, 1024);
+    conf.setFloat(ConfVars.WORKER_RESOURCE_AVAILABLE_DISKS.varname, 2.0f);
+
     this.standbyWorkerMode = conf.getVar(ConfVars.RESOURCE_MANAGER_CLASS)
         .indexOf(TajoWorkerResourceManager.class.getName()) >= 0;
     conf.set(CommonTestingUtil.TAJO_TEST, "TRUE");