You are viewing a plain-text version of this content. The link to the canonical version was lost in the plain-text conversion.
Posted to commits@tajo.apache.org by jh...@apache.org on 2014/03/13 14:44:40 UTC

git commit: TAJO-647: Work unbalance on disk scheduling of DefaultScheduler. (jinho)

Repository: incubator-tajo
Updated Branches:
  refs/heads/master 44b28e6a1 -> 54bf9a7c8


TAJO-647: Work unbalance on disk scheduling of DefaultScheduler. (jinho)


Project: http://git-wip-us.apache.org/repos/asf/incubator-tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tajo/commit/54bf9a7c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tajo/tree/54bf9a7c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tajo/diff/54bf9a7c

Branch: refs/heads/master
Commit: 54bf9a7c8b7851381c94661500ce329f2c12e9ba
Parents: 44b28e6
Author: jinossy <ji...@gmail.com>
Authored: Thu Mar 13 22:42:51 2014 +0900
Committer: jinossy <ji...@gmail.com>
Committed: Thu Mar 13 22:42:51 2014 +0900

----------------------------------------------------------------------
 CHANGES.txt                                     |   2 +
 .../tajo/master/DefaultTaskScheduler.java       | 493 +++++++++++++++----
 .../tajo/master/querymaster/QueryUnit.java      |   4 +-
 .../tajo/master/querymaster/SubQuery.java       |   7 +
 4 files changed, 408 insertions(+), 98 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/54bf9a7c/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 13fa2df..421c5c0 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -273,6 +273,8 @@ Release 0.8.0 - unreleased
 
   BUG FIXES
 
+    TAJO-647: Work unbalance on disk scheduling of DefaultScheduler. (jinho)
+
     TAJO-612: Missing INET4 type in SQLParser. (jihoon)
 
     TAJO-672: Wrong progress status when overwrites to partition table.

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/54bf9a7c/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/DefaultTaskScheduler.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/DefaultTaskScheduler.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/DefaultTaskScheduler.java
index 3ee93ac..2aefc15 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/DefaultTaskScheduler.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/DefaultTaskScheduler.java
@@ -49,6 +49,7 @@ import java.util.*;
 import java.util.Map.Entry;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import static org.apache.tajo.catalog.proto.CatalogProtos.FragmentProto;
 
@@ -90,7 +91,6 @@ public class DefaultTaskScheduler extends AbstractTaskScheduler {
       public void run() {
 
         while(!stopEventHandling.get() && !Thread.currentThread().isInterrupted()) {
-          schedule();
           try {
             synchronized (schedulingThread){
               schedulingThread.wait(100);
@@ -98,6 +98,7 @@ public class DefaultTaskScheduler extends AbstractTaskScheduler {
           } catch (InterruptedException e) {
             break;
           }
+          schedule();
         }
         LOG.info("TaskScheduler schedulingThread stopped");
       }
@@ -238,7 +239,7 @@ public class DefaultTaskScheduler extends AbstractTaskScheduler {
 
     // if available cluster resource are large then tasks, the scheduler thread are working immediately.
     if(remainingScheduledObjectNum() > 0 &&
-        (remainingScheduledObjectNum() <= hosts || hosts / 2 < taskRequests.size())){
+        (remainingScheduledObjectNum() <= hosts || hosts <= taskRequests.size())){
       synchronized (schedulingThread){
         schedulingThread.notifyAll();
       }
@@ -256,7 +257,10 @@ public class DefaultTaskScheduler extends AbstractTaskScheduler {
 
     @Override
     public void handle(TaskRequestEvent event) {
-      LOG.info("TaskRequest: " + event.getContainerId() + "," + event.getExecutionBlockId());
+      if(LOG.isDebugEnabled()){
+        LOG.debug("TaskRequest: " + event.getContainerId() + "," + event.getExecutionBlockId());
+      }
+
       if(stopEventHandling.get()) {
         event.getCallback().run(stopTaskRunnerReq);
         return;
@@ -284,64 +288,238 @@ public class DefaultTaskScheduler extends AbstractTaskScheduler {
     }
   }
 
-  public static class TaskBlockLocation {
-    private HashMap<Integer, LinkedList<QueryUnitAttemptId>> unAssignedTaskMap =
-        new HashMap<Integer, LinkedList<QueryUnitAttemptId>>();
-    private HashMap<ContainerId, Integer> assignedContainerMap = new HashMap<ContainerId, Integer>();
-    private TreeMap<Integer, Integer> volumeUsageMap = new TreeMap<Integer, Integer>();
-    private String host;
-
-    public TaskBlockLocation(String host){
+  /**
+   * One worker can have multiple running task runners. <code>HostVolumeMapping</code>
+   * describes various information for one worker, including :
+   * <ul>
+   *  <li>host name</li>
+   *  <li>rack name</li>
+   *  <li>unassigned tasks for each disk volume</li>
+   *  <li>last assigned volume id - it can be used for assigning task in a round-robin manner</li>
+   *  <li>the number of running tasks for each volume</li>
+   * </ul> It also tracks the volume last assigned to each task runner and the number of concurrently running tasks per volume.
+   *
+   * Here, we identify a task runner by {@link ContainerId}, and we use volume ids to identify
+   * all disks in this node. Each volume id is only used to distinguish disks; we do not
+   * know which physical disk a given volume id refers to. For details on volume ids, see the section below.
+   *
+   * <h3>Volume id</h3>
+   * Volume id is an integer. Each volume id identifies each disk volume.
+   *
+   * This volume id can be obtained from {@link org.apache.hadoop.fs.BlockStorageLocation#getVolumeIds()}.
+   * HDFS may not provide a volume id, e.g. for an unknown reason or because 'dfs.client.file-block-locations.enabled' is disabled.
+   * In this case, the volume id will be -1 or another negative integer.
+   *
+   * <h3>See Also</h3>
+   * <ul>
+   *   <li>HDFS-3672 (https://issues.apache.org/jira/browse/HDFS-3672).</li>
+   * </ul>
+   */
+  public class HostVolumeMapping {
+    private final String host;
+    private final String rack;
+    /** A key is disk volume, and a value is a list of tasks to be scheduled. */
+    private Map<Integer, LinkedHashSet<QueryUnitAttempt>> unassignedTaskForEachVolume =
+        Collections.synchronizedMap(new HashMap<Integer, LinkedHashSet<QueryUnitAttempt>>());
+    /** A value is the last assigned volume id for each task runner */
+    private HashMap<ContainerId, Integer> lastAssignedVolumeId = new HashMap<ContainerId, Integer>();
+    /**
+     * A key is disk volume id, and a value is the load of this volume.
+     * This load is measured by counting how many tasks are running on it.
+     *
+     * These disk volumes are kept in ascending order of volume id.
+     * In other words, the head volume ids are likely to be negative (e.g. -1), meaning no volume id was given.
+     */
+    private SortedMap<Integer, Integer> diskVolumeLoads = new TreeMap<Integer, Integer>();
+    /** The total number of remaining tasks on this host */
+    private AtomicInteger remainTasksNum = new AtomicInteger(0);
+    public static final int REMOTE = -2;
+
+
+    public HostVolumeMapping(String host, String rack){
       this.host = host;
+      this.rack = rack;
     }
 
-    public void addQueryUnitAttemptId(Integer volumeId, QueryUnitAttemptId attemptId){
-      LinkedList<QueryUnitAttemptId> list = unAssignedTaskMap.get(volumeId);
-      if (list == null) {
-        list = new LinkedList<QueryUnitAttemptId>();
-        unAssignedTaskMap.put(volumeId, list);
+    public synchronized void addQueryUnitAttempt(int volumeId, QueryUnitAttempt attemptId){
+      synchronized (unassignedTaskForEachVolume){
+        LinkedHashSet<QueryUnitAttempt> list = unassignedTaskForEachVolume.get(volumeId);
+        if (list == null) {
+          list = new LinkedHashSet<QueryUnitAttempt>();
+          unassignedTaskForEachVolume.put(volumeId, list);
+        }
+        list.add(attemptId);
       }
-      list.add(attemptId);
 
-      if(!volumeUsageMap.containsKey(volumeId)) volumeUsageMap.put(volumeId, 0);
+      remainTasksNum.incrementAndGet();
+
+      if(!diskVolumeLoads.containsKey(volumeId)) diskVolumeLoads.put(volumeId, 0);
     }
 
-    public LinkedList<QueryUnitAttemptId> getQueryUnitAttemptIdList(ContainerId containerId){
-      Integer volumeId;
+    /**
+     *  Priorities:
+     *  1. a task in a volume of this host
+     *  2. an unknown-volume block or non-splittable task on this host
+     *  3. remote tasks. unassignedTaskForEachVolume contains only local tasks, so the result will be null.
+     */
+    public synchronized QueryUnitAttemptId getLocalTask(ContainerId containerId) {
+      int volumeId;
+      QueryUnitAttemptId queryUnitAttemptId = null;
+
+      if (!lastAssignedVolumeId.containsKey(containerId)) {
+        volumeId = getLowestVolumeId();
+        increaseConcurrency(containerId, volumeId);
+      } else {
+        volumeId = lastAssignedVolumeId.get(containerId);
+      }
+
+      if (unassignedTaskForEachVolume.size() >  0) {
+        int retry = unassignedTaskForEachVolume.size();
+        do {
+          //clean and get a remaining local task
+          queryUnitAttemptId = getAndRemove(volumeId);
+          if(!unassignedTaskForEachVolume.containsKey(volumeId)) {
+            decreaseConcurrency(containerId);
+            if (volumeId > REMOTE) {
+              diskVolumeLoads.remove(volumeId);
+            }
+          }
 
-      if (!assignedContainerMap.containsKey(containerId)) {
-        volumeId = assignVolumeId();
-        assignedContainerMap.put(containerId, volumeId);
+          if (queryUnitAttemptId == null) {
+            //reassign next volume
+            volumeId = getLowestVolumeId();
+            increaseConcurrency(containerId, volumeId);
+            retry--;
+          } else {
+            break;
+          }
+        } while (retry > 0);
       } else {
-        volumeId = assignedContainerMap.get(containerId);
+        this.remainTasksNum.set(0);
       }
+      return queryUnitAttemptId;
+    }
+
+    public synchronized QueryUnitAttemptId getQueryUnitAttemptIdByRack(String rack) {
+      QueryUnitAttemptId queryUnitAttemptId = null;
 
-      LinkedList<QueryUnitAttemptId> list = null;
-      if (unAssignedTaskMap.size() >  0) {
-        int retry = unAssignedTaskMap.size();
+      if (unassignedTaskForEachVolume.size() > 0 && this.rack.equals(rack)) {
+        int retry = unassignedTaskForEachVolume.size();
         do {
-          list = unAssignedTaskMap.get(volumeId);
-          if (list == null || list.size() == 0) {
-            //clean and reassign remaining volume
-            unAssignedTaskMap.remove(volumeId);
-            volumeUsageMap.remove(volumeId);
-            if (volumeId < 0) break; //  processed all block on disk
-
-            volumeId = assignVolumeId();
-            assignedContainerMap.put(containerId, volumeId);
+          //clean and get a remaining task
+          int volumeId = getLowestVolumeId();
+          queryUnitAttemptId = getAndRemove(volumeId);
+          if (queryUnitAttemptId == null) {
+            if (volumeId > REMOTE) {
+              diskVolumeLoads.remove(volumeId);
+            }
             retry--;
           } else {
             break;
           }
         } while (retry > 0);
       }
-      return list;
+      return queryUnitAttemptId;
+    }
+
+    private synchronized QueryUnitAttemptId getAndRemove(int volumeId){
+      QueryUnitAttemptId queryUnitAttemptId = null;
+      if(!unassignedTaskForEachVolume.containsKey(volumeId)) return queryUnitAttemptId;
+
+      LinkedHashSet<QueryUnitAttempt> list = unassignedTaskForEachVolume.get(volumeId);
+      if(list != null && list.size() > 0){
+        QueryUnitAttempt queryUnitAttempt;
+        synchronized (unassignedTaskForEachVolume) {
+          Iterator<QueryUnitAttempt> iterator = list.iterator();
+          queryUnitAttempt = iterator.next();
+          iterator.remove();
+        }
+
+        this.remainTasksNum.getAndDecrement();
+        queryUnitAttemptId = queryUnitAttempt.getId();
+        for (DataLocation location : queryUnitAttempt.getQueryUnit().getDataLocations()) {
+          if (!this.getHost().equals(location.getHost())) {
+            HostVolumeMapping volumeMapping = scheduledRequests.leafTaskHostMapping.get(location.getHost());
+            volumeMapping.removeQueryUnitAttempt(location.getVolumeId(), queryUnitAttempt);
+          }
+        }
+      }
+
+      if(list == null || list.isEmpty()) {
+        unassignedTaskForEachVolume.remove(volumeId);
+      }
+      return queryUnitAttemptId;
     }
 
-    public Integer assignVolumeId(){
+    private synchronized void removeQueryUnitAttempt(int volumeId, QueryUnitAttempt queryUnitAttempt){
+      if(!unassignedTaskForEachVolume.containsKey(volumeId)) return;
+
+      LinkedHashSet<QueryUnitAttempt> tasks  = unassignedTaskForEachVolume.get(volumeId);
+
+      if(tasks != null && tasks.size() > 0){
+        tasks.remove(queryUnitAttempt);
+        remainTasksNum.getAndDecrement();
+      } else {
+        unassignedTaskForEachVolume.remove(volumeId);
+      }
+    }
+
+    /**
+     * Increase the count of running tasks and disk loads for a certain task runner.
+     *
+     * @param containerId The task runner identifier
+     * @param volumeId Volume identifier
+     * @return the volume load (i.e., how many running tasks use this volume)
+     */
+    private synchronized int increaseConcurrency(ContainerId containerId, int volumeId) {
+
+      int concurrency = 1;
+      if (diskVolumeLoads.containsKey(volumeId)) {
+        concurrency = diskVolumeLoads.get(volumeId) + 1;
+      }
+
+      if (volumeId > -1) {
+        LOG.info("Assigned host : " + host + ", Volume : " + volumeId + ", Concurrency : " + concurrency);
+      } else if (volumeId == -1) {
+        // this case is disabled namenode block meta or compressed text file or amazon s3
+        LOG.info("Assigned host : " + host + ", Unknown Volume : " + volumeId + ", Concurrency : " + concurrency);
+      } else if (volumeId == REMOTE) {
+        // this case has processed all block on host and it will be assigned to remote
+        LOG.info("Assigned host : " + host + ", Remaining local tasks : " + getRemainingLocalTaskSize()
+            + ", Remote Concurrency : " + concurrency);
+      }
+      diskVolumeLoads.put(volumeId, concurrency);
+      lastAssignedVolumeId.put(containerId, volumeId);
+      return concurrency;
+    }
+
+    /**
+     * Decrease the count of running tasks of a certain task runner
+     */
+    private synchronized void decreaseConcurrency(ContainerId containerId){
+      Integer volumeId = lastAssignedVolumeId.get(containerId);
+      if(volumeId != null && diskVolumeLoads.containsKey(volumeId)){
+        Integer concurrency = diskVolumeLoads.get(volumeId);
+        if(concurrency > 0){
+          diskVolumeLoads.put(volumeId, concurrency - 1);
+        } else {
+          if (volumeId > REMOTE) {
+            diskVolumeLoads.remove(volumeId);
+          }
+        }
+      }
+      lastAssignedVolumeId.remove(containerId);
+    }
+
+    /**
+    *  volume of a host : 0 ~ n
+    *  compressed task, amazon s3, unknown volume : -1
+    *  remote task : -2
+    */
+    public int getLowestVolumeId(){
       Map.Entry<Integer, Integer> volumeEntry = null;
 
-      for (Map.Entry<Integer, Integer> entry : volumeUsageMap.entrySet()) {
+      for (Map.Entry<Integer, Integer> entry : diskVolumeLoads.entrySet()) {
         if(volumeEntry == null) volumeEntry = entry;
 
         if (volumeEntry.getValue() >= entry.getValue()) {
@@ -350,18 +528,47 @@ public class DefaultTaskScheduler extends AbstractTaskScheduler {
       }
 
       if(volumeEntry != null){
-        volumeUsageMap.put(volumeEntry.getKey(), volumeEntry.getValue() + 1);
-        LOG.info("Assigned host : " + host + " Volume : " + volumeEntry.getKey() + ", Concurrency : "
-            + volumeUsageMap.get(volumeEntry.getKey()));
         return volumeEntry.getKey();
       } else {
-         return -1;  // processed all block on disk
+        return REMOTE;
       }
     }
 
+    public boolean isAssigned(ContainerId containerId){
+      return lastAssignedVolumeId.containsKey(containerId);
+    }
+
+    public boolean isRemote(ContainerId containerId){
+      Integer volumeId = lastAssignedVolumeId.get(containerId);
+      if(volumeId == null || volumeId > REMOTE){
+        return false;
+      } else {
+        return true;
+      }
+    }
+
+    public int getRemoteConcurrency(){
+      return getVolumeConcurrency(REMOTE);
+    }
+
+    public int getVolumeConcurrency(int volumeId){
+      Integer size = diskVolumeLoads.get(volumeId);
+      if(size == null) return 0;
+      else return size;
+    }
+
+    public int getRemainingLocalTaskSize(){
+       return remainTasksNum.get();
+    }
+
     public String getHost() {
+
       return host;
     }
+
+    public String getRack() {
+      return rack;
+    }
   }
 
   private class ScheduledRequests {
@@ -370,37 +577,39 @@ public class DefaultTaskScheduler extends AbstractTaskScheduler {
     // if the task is not included in leafTasks and nonLeafTasks.
     private final Set<QueryUnitAttemptId> leafTasks = Collections.synchronizedSet(new HashSet<QueryUnitAttemptId>());
     private final Set<QueryUnitAttemptId> nonLeafTasks = Collections.synchronizedSet(new HashSet<QueryUnitAttemptId>());
-    private Map<String, TaskBlockLocation> leafTaskHostMapping = new HashMap<String, TaskBlockLocation>();
+    private Map<String, HostVolumeMapping> leafTaskHostMapping = new HashMap<String, HostVolumeMapping>();
     private final Map<String, LinkedList<QueryUnitAttemptId>> leafTasksRackMapping =
         new HashMap<String, LinkedList<QueryUnitAttemptId>>();
 
     private void addLeafTask(QueryUnitAttemptScheduleEvent event) {
       QueryUnitAttempt queryUnitAttempt = event.getQueryUnitAttempt();
-      DataLocation[] locations = queryUnitAttempt.getQueryUnit().getDataLocations();
+      List<DataLocation> locations = queryUnitAttempt.getQueryUnit().getDataLocations();
 
       for (DataLocation location : locations) {
         String host = location.getHost();
 
-        TaskBlockLocation taskBlockLocation = leafTaskHostMapping.get(host);
-        if (taskBlockLocation == null) {
-          taskBlockLocation = new TaskBlockLocation(host);
-          leafTaskHostMapping.put(host, taskBlockLocation);
+        HostVolumeMapping hostVolumeMapping = leafTaskHostMapping.get(host);
+        if (hostVolumeMapping == null) {
+          String rack = RackResolver.resolve(host).getNetworkLocation();
+          hostVolumeMapping = new HostVolumeMapping(host, rack);
+          leafTaskHostMapping.put(host, hostVolumeMapping);
         }
-        taskBlockLocation.addQueryUnitAttemptId(location.getVolumeId(), queryUnitAttempt.getId());
+        hostVolumeMapping.addQueryUnitAttempt(location.getVolumeId(), queryUnitAttempt);
 
         if (LOG.isDebugEnabled()) {
           LOG.debug("Added attempt req to host " + host);
         }
 
-        String rack = RackResolver.resolve(host).getNetworkLocation();
-        LinkedList<QueryUnitAttemptId> list = leafTasksRackMapping.get(rack);
+        LinkedList<QueryUnitAttemptId> list = leafTasksRackMapping.get(hostVolumeMapping.getRack());
         if (list == null) {
           list = new LinkedList<QueryUnitAttemptId>();
-          leafTasksRackMapping.put(rack, list);
+          leafTasksRackMapping.put(hostVolumeMapping.getRack(), list);
         }
-        list.add(queryUnitAttempt.getId());
+
+        if(!list.contains(queryUnitAttempt.getId())) list.add(queryUnitAttempt.getId());
+
         if (LOG.isDebugEnabled()) {
-          LOG.debug("Added attempt req to rack " + rack);
+          LOG.debug("Added attempt req to rack " + hostVolumeMapping.getRack());
         }
       }
 
@@ -421,77 +630,168 @@ public class DefaultTaskScheduler extends AbstractTaskScheduler {
 
     public Set<QueryUnitAttemptId> assignedRequest = new HashSet<QueryUnitAttemptId>();
 
+    private QueryUnitAttemptId allocateLocalTask(String host, ContainerId containerId){
+      HostVolumeMapping hostVolumeMapping = leafTaskHostMapping.get(host);
+
+      if (hostVolumeMapping != null) { //tajo host is located in hadoop datanode
+        while (hostVolumeMapping.getRemainingLocalTaskSize() > 0) {
+          QueryUnitAttemptId attemptId = hostVolumeMapping.getLocalTask(containerId);
+          //find remaining local task
+          if (leafTasks.contains(attemptId)) {
+            leafTasks.remove(attemptId);
+            leafTasksRackMapping.get(hostVolumeMapping.getRack()).remove(attemptId);
+            //LOG.info(attemptId + " Assigned based on host match " + hostName);
+            hostLocalAssigned++;
+            totalAssigned++;
+            return attemptId;
+          }
+        }
+      }
+      return null;
+    }
+
+    private QueryUnitAttemptId allocateRackTask(String host) {
+
+      List<HostVolumeMapping> remainingTasks = new ArrayList<HostVolumeMapping>(leafTaskHostMapping.values());
+      String rack = RackResolver.resolve(host).getNetworkLocation();
+      QueryUnitAttemptId attemptId = null;
+
+      if (remainingTasks.size() > 0) {
+        //find largest remaining task of other host in rack
+        Collections.sort(remainingTasks, new Comparator<HostVolumeMapping>() {
+          @Override
+          public int compare(HostVolumeMapping v1, HostVolumeMapping v2) {
+            // descending remaining tasks
+            return Integer.valueOf(v2.remainTasksNum.get()).compareTo(Integer.valueOf(v1.remainTasksNum.get()));
+          }
+        });
+
+        for (HostVolumeMapping tasks : remainingTasks) {
+          while (tasks.getRemainingLocalTaskSize() > 0){
+            QueryUnitAttemptId tId = tasks.getQueryUnitAttemptIdByRack(rack);
+            if (leafTasks.contains(tId)) {
+              leafTasks.remove(tId);
+              leafTasksRackMapping.get(rack).remove(tId);
+              attemptId = tId;
+              break;
+            }
+          }
+          if(attemptId != null) break;
+        }
+      }
+
+      //find task in rack
+      if (attemptId == null) {
+        LinkedList<QueryUnitAttemptId> list = leafTasksRackMapping.get(rack);
+        while (list != null && list.size() > 0) {
+          QueryUnitAttemptId tId = list.removeFirst();
+
+          if (leafTasks.contains(tId)) {
+            leafTasks.remove(tId);
+            attemptId = tId;
+            break;
+          }
+        }
+      }
+
+      if (attemptId != null) {
+        rackLocalAssigned++;
+        totalAssigned++;
+
+        LOG.info(String.format("Assigned Local/Rack/Total: (%d/%d/%d), Locality: %.2f%%, Rack host: %s",
+            hostLocalAssigned, rackLocalAssigned, totalAssigned,
+            ((double) hostLocalAssigned / (double) totalAssigned) * 100, host));
+
+      }
+      return attemptId;
+    }
+
     public void assignToLeafTasks(List<TaskRequestEvent> taskRequests) {
       Collections.shuffle(taskRequests);
+      LinkedList<TaskRequestEvent> remoteTaskRequests = new LinkedList<TaskRequestEvent>();
       Iterator<TaskRequestEvent> it = taskRequests.iterator();
 
       TaskRequestEvent taskRequest;
-      while (it.hasNext() && leafTasks.size() > 0) {
+      while (leafTasks.size() > 0 && (it.hasNext() || !remoteTaskRequests.isEmpty())) {
         taskRequest = it.next();
-        LOG.debug("assignToLeafTasks: " + taskRequest.getExecutionBlockId() + "," +
-            "containerId=" + taskRequest.getContainerId());
+        if(taskRequest == null) { // if there are only remote task requests
+          taskRequest = remoteTaskRequests.removeFirst();
+        }
+
+        // checking if this container is still alive.
+        // If not, ignore the task request and stop the task runner
         ContainerProxy container = context.getMasterContext().getResourceAllocator()
             .getContainer(taskRequest.getContainerId());
         if(container == null) {
+          taskRequest.getCallback().run(stopTaskRunnerReq);
           continue;
         }
-        String host = container.getTaskHostName();
 
-        QueryUnitAttemptId attemptId = null;
-        LinkedList<QueryUnitAttemptId> list = null;
+        // getting the hostname of requested node
+        String host = container.getTaskHostName();
 
-        // local disk allocation
+        // if there are no worker matched to the hostname a task request
         if(!leafTaskHostMapping.containsKey(host)){
           host = NetUtils.normalizeHost(host);
-        }
-
-        TaskBlockLocation taskBlockLocation = leafTaskHostMapping.get(host);
-        if (taskBlockLocation != null) {
-          list = taskBlockLocation.getQueryUnitAttemptIdList(taskRequest.getContainerId());
-        }
 
-        while (list != null && list.size() > 0) {
-          QueryUnitAttemptId tId = list.removeFirst();
-
-          if (leafTasks.contains(tId)) {
-            leafTasks.remove(tId);
-            attemptId = tId;
-            //LOG.info(attemptId + " Assigned based on host match " + hostName);
-            hostLocalAssigned++;
-            totalAssigned++;
-            break;
+          if(!leafTaskHostMapping.containsKey(host) && it.hasNext()){
+            // this case means one of either cases:
+            // * there are no blocks which reside in this node.
+            // * all blocks which reside in this node are consumed, and this task runner requests a remote task.
+            // In this case, we transfer the task request to the remote task request list, and skip the followings.
+            remoteTaskRequests.add(taskRequest);
+            continue;
           }
         }
 
-        // rack allocation
-        if (attemptId == null) {
-          String rack = RackResolver.resolve(host).getNetworkLocation();
-          list = leafTasksRackMapping.get(rack);
-          while(list != null && list.size() > 0) {
+        ContainerId containerId = taskRequest.getContainerId();
+        LOG.debug("assignToLeafTasks: " + taskRequest.getExecutionBlockId() + "," +
+            "containerId=" + containerId);
 
-            QueryUnitAttemptId tId = list.removeFirst();
+        //////////////////////////////////////////////////////////////////////
+        // disk or host-local allocation
+        //////////////////////////////////////////////////////////////////////
+        QueryUnitAttemptId attemptId = allocateLocalTask(host, containerId);
 
-            if (leafTasks.contains(tId)) {
-              leafTasks.remove(tId);
-              attemptId = tId;
-
-              rackLocalAssigned++;
-              totalAssigned++;
+        if (attemptId == null) { // if a local task cannot be found
+          HostVolumeMapping hostVolumeMapping = leafTaskHostMapping.get(host);
 
-              LOG.info(String.format("Assigned Local/Rack/Total: (%d/%d/%d), Locality: %.2f%%, Rack host: %s",
-                  hostLocalAssigned, rackLocalAssigned, totalAssigned,
-                  ((double) hostLocalAssigned / (double) totalAssigned) * 100, host));
-              break;
+          if(hostVolumeMapping != null) {
+            if(!hostVolumeMapping.isRemote(containerId)){
+              // assign to remote volume
+              hostVolumeMapping.decreaseConcurrency(containerId);
+              hostVolumeMapping.increaseConcurrency(containerId, HostVolumeMapping.REMOTE);
+            }
+            // this part is remote concurrency management of a tail tasks
+            int tailLimit = Math.max(remainingScheduledObjectNum() / (leafTaskHostMapping.size() * 2), 1);
+
+            if(hostVolumeMapping.getRemoteConcurrency() > tailLimit){
+              //release container
+              hostVolumeMapping.decreaseConcurrency(containerId);
+              taskRequest.getCallback().run(stopTaskRunnerReq);
+              subQuery.releaseContainer(containerId);
+              continue;
             }
           }
 
-          // random allocation
+          //////////////////////////////////////////////////////////////////////
+          // rack-local allocation
+          //////////////////////////////////////////////////////////////////////
+          attemptId = allocateRackTask(host);
+
+          //////////////////////////////////////////////////////////////////////
+          // random node allocation
+          //////////////////////////////////////////////////////////////////////
           if (attemptId == null && leafTaskNum() > 0) {
             synchronized (leafTasks){
               attemptId = leafTasks.iterator().next();
               leafTasks.remove(attemptId);
+              rackLocalAssigned++;
+              totalAssigned++;
+              LOG.info(String.format("Assigned Local/Remote/Total: (%d/%d/%d), Locality: %.2f%%,",
+                  hostLocalAssigned, rackLocalAssigned, totalAssigned,
+                  ((double) hostLocalAssigned / (double) totalAssigned) * 100));
             }
-            //LOG.info(attemptId + " Assigned based on * match");
           }
         }
 
@@ -536,6 +836,7 @@ public class DefaultTaskScheduler extends AbstractTaskScheduler {
     }
 
     public void assignToNonLeafTasks(List<TaskRequestEvent> taskRequests) {
+      Collections.shuffle(taskRequests);
       Iterator<TaskRequestEvent> it = taskRequests.iterator();
 
       TaskRequestEvent taskRequest;

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/54bf9a7c/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryUnit.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryUnit.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryUnit.java
index 57b3db4..c4f86fb 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryUnit.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryUnit.java
@@ -252,8 +252,8 @@ public class QueryUnit implements EventHandler<TaskEvent> {
     }
   }
 
-  public DataLocation[] getDataLocations() {
-    return dataLocations.toArray(new DataLocation[dataLocations.size()]);
+  public List<DataLocation> getDataLocations() {
+    return dataLocations;
   }
 
   public String getSucceededHost() {

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/54bf9a7c/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java
index 790d30b..b3aefbe 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java
@@ -548,6 +548,13 @@ public class SubQuery implements EventHandler<SubQueryEvent> {
     eventHandler.handle(new TaskRunnerGroupEvent(EventType.CONTAINER_REMOTE_CLEANUP, getId(), containers.values()));
   }
 
+  public void releaseContainer(ContainerId containerId) {
+    // try to kill the container.
+    ArrayList<Container> list = new ArrayList<Container>();
+    list.add(containers.get(containerId));
+    eventHandler.handle(new TaskRunnerGroupEvent(EventType.CONTAINER_REMOTE_CLEANUP, getId(), list));
+  }
+
   /**
    * It computes all stats and sets the intermediate result.
    */