You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tajo.apache.org by jh...@apache.org on 2014/03/13 14:44:40 UTC
git commit: TAJO-647: Work unbalance on disk scheduling of
DefaultScheduler. (jinho)
Repository: incubator-tajo
Updated Branches:
refs/heads/master 44b28e6a1 -> 54bf9a7c8
TAJO-647: Work unbalance on disk scheduling of DefaultScheduler. (jinho)
Project: http://git-wip-us.apache.org/repos/asf/incubator-tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tajo/commit/54bf9a7c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tajo/tree/54bf9a7c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tajo/diff/54bf9a7c
Branch: refs/heads/master
Commit: 54bf9a7c8b7851381c94661500ce329f2c12e9ba
Parents: 44b28e6
Author: jinossy <ji...@gmail.com>
Authored: Thu Mar 13 22:42:51 2014 +0900
Committer: jinossy <ji...@gmail.com>
Committed: Thu Mar 13 22:42:51 2014 +0900
----------------------------------------------------------------------
CHANGES.txt | 2 +
.../tajo/master/DefaultTaskScheduler.java | 493 +++++++++++++++----
.../tajo/master/querymaster/QueryUnit.java | 4 +-
.../tajo/master/querymaster/SubQuery.java | 7 +
4 files changed, 408 insertions(+), 98 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/54bf9a7c/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 13fa2df..421c5c0 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -273,6 +273,8 @@ Release 0.8.0 - unreleased
BUG FIXES
+ TAJO-647: Work unbalance on disk scheduling of DefaultScheduler. (jinho)
+
TAJO-612: Missing INET4 type in SQLParser. (jihoon)
TAJO-672: Wrong progress status when overwrites to partition table.
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/54bf9a7c/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/DefaultTaskScheduler.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/DefaultTaskScheduler.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/DefaultTaskScheduler.java
index 3ee93ac..2aefc15 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/DefaultTaskScheduler.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/DefaultTaskScheduler.java
@@ -49,6 +49,7 @@ import java.util.*;
import java.util.Map.Entry;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
import static org.apache.tajo.catalog.proto.CatalogProtos.FragmentProto;
@@ -90,7 +91,6 @@ public class DefaultTaskScheduler extends AbstractTaskScheduler {
public void run() {
while(!stopEventHandling.get() && !Thread.currentThread().isInterrupted()) {
- schedule();
try {
synchronized (schedulingThread){
schedulingThread.wait(100);
@@ -98,6 +98,7 @@ public class DefaultTaskScheduler extends AbstractTaskScheduler {
} catch (InterruptedException e) {
break;
}
+ schedule();
}
LOG.info("TaskScheduler schedulingThread stopped");
}
@@ -238,7 +239,7 @@ public class DefaultTaskScheduler extends AbstractTaskScheduler {
// if available cluster resource are large then tasks, the scheduler thread are working immediately.
if(remainingScheduledObjectNum() > 0 &&
- (remainingScheduledObjectNum() <= hosts || hosts / 2 < taskRequests.size())){
+ (remainingScheduledObjectNum() <= hosts || hosts <= taskRequests.size())){
synchronized (schedulingThread){
schedulingThread.notifyAll();
}
@@ -256,7 +257,10 @@ public class DefaultTaskScheduler extends AbstractTaskScheduler {
@Override
public void handle(TaskRequestEvent event) {
- LOG.info("TaskRequest: " + event.getContainerId() + "," + event.getExecutionBlockId());
+ if(LOG.isDebugEnabled()){
+ LOG.debug("TaskRequest: " + event.getContainerId() + "," + event.getExecutionBlockId());
+ }
+
if(stopEventHandling.get()) {
event.getCallback().run(stopTaskRunnerReq);
return;
@@ -284,64 +288,238 @@ public class DefaultTaskScheduler extends AbstractTaskScheduler {
}
}
- public static class TaskBlockLocation {
- private HashMap<Integer, LinkedList<QueryUnitAttemptId>> unAssignedTaskMap =
- new HashMap<Integer, LinkedList<QueryUnitAttemptId>>();
- private HashMap<ContainerId, Integer> assignedContainerMap = new HashMap<ContainerId, Integer>();
- private TreeMap<Integer, Integer> volumeUsageMap = new TreeMap<Integer, Integer>();
- private String host;
-
- public TaskBlockLocation(String host){
+ /**
+ * One worker can have multiple running task runners. <code>HostVolumeMapping</code>
+ * describes various information for one worker, including:
+ * <ul>
+ * <li>host name</li>
+ * <li>rack name</li>
+ * <li>unassigned tasks for each disk volume</li>
+ * <li>last assigned volume id - it can be used for assigning tasks in a round-robin manner</li>
+ * <li>the number of running tasks for each volume</li>
+ * </ul> It also tracks each task runner and the number of concurrently running tasks per volume.
+ *
+ * Here, we identify a task runner by {@link ContainerId}, and we use volume ids to identify
+ * all disks in this node. Volume ids only distinguish disks from one another; we cannot tell which
+ * physical disk a given volume id refers to. For details on volume ids, see the section below.
+ *
+ * <h3>Volume id</h3>
+ * Volume id is an integer. Each volume id identifies each disk volume.
+ *
+ * This volume id can be obtained from {@link org.apache.hadoop.fs.BlockStorageLocation#getVolumeIds()}.
+ * HDFS may not provide any volume id, e.g. when 'dfs.client.file-block-locations.enabled' is disabled or for other reasons.
+ * In this case, the volume id will be -1 or another negative integer.
+ *
+ * <h3>See Also</h3>
+ * <ul>
+ * <li>HDFS-3672 (https://issues.apache.org/jira/browse/HDFS-3672).</li>
+ * </ul>
+ */
+ public class HostVolumeMapping {
+ private final String host;
+ private final String rack;
+ /** A key is a disk volume id, and a value is the set of tasks to be scheduled on that volume. */
+ private Map<Integer, LinkedHashSet<QueryUnitAttempt>> unassignedTaskForEachVolume =
+ Collections.synchronizedMap(new HashMap<Integer, LinkedHashSet<QueryUnitAttempt>>());
+ /** A key is a task runner, and a value is the volume id last assigned to it. */
+ private HashMap<ContainerId, Integer> lastAssignedVolumeId = new HashMap<ContainerId, Integer>();
+ /**
+ * A key is a disk volume id, and a value is the load of this volume.
+ * This load is measured by counting how many tasks are running on it.
+ *
+ * These disk volumes are kept in ascending order of volume id.
+ * In other words, negative ids such as -1 (meaning no volume id was given) come first.
+ */
+ private SortedMap<Integer, Integer> diskVolumeLoads = new TreeMap<Integer, Integer>();
+ /** The total number of remaining tasks on this host */
+ private AtomicInteger remainTasksNum = new AtomicInteger(0);
+ public static final int REMOTE = -2;
+
+
+ public HostVolumeMapping(String host, String rack){
this.host = host;
+ this.rack = rack;
}
- public void addQueryUnitAttemptId(Integer volumeId, QueryUnitAttemptId attemptId){
- LinkedList<QueryUnitAttemptId> list = unAssignedTaskMap.get(volumeId);
- if (list == null) {
- list = new LinkedList<QueryUnitAttemptId>();
- unAssignedTaskMap.put(volumeId, list);
+ public synchronized void addQueryUnitAttempt(int volumeId, QueryUnitAttempt attemptId){
+ synchronized (unassignedTaskForEachVolume){
+ LinkedHashSet<QueryUnitAttempt> list = unassignedTaskForEachVolume.get(volumeId);
+ if (list == null) {
+ list = new LinkedHashSet<QueryUnitAttempt>();
+ unassignedTaskForEachVolume.put(volumeId, list);
+ }
+ list.add(attemptId);
}
- list.add(attemptId);
- if(!volumeUsageMap.containsKey(volumeId)) volumeUsageMap.put(volumeId, 0);
+ remainTasksNum.incrementAndGet();
+
+ if(!diskVolumeLoads.containsKey(volumeId)) diskVolumeLoads.put(volumeId, 0);
}
- public LinkedList<QueryUnitAttemptId> getQueryUnitAttemptIdList(ContainerId containerId){
- Integer volumeId;
+ /**
+ * Priorities:
+ * 1. a task in a disk volume of this host
+ * 2. a task with an unknown block, or a non-splittable task, on this host
+ * 3. remote tasks. unassignedTaskForEachVolume contains only local tasks, so null is returned in this case
+ */
+ public synchronized QueryUnitAttemptId getLocalTask(ContainerId containerId) {
+ int volumeId;
+ QueryUnitAttemptId queryUnitAttemptId = null;
+
+ if (!lastAssignedVolumeId.containsKey(containerId)) {
+ volumeId = getLowestVolumeId();
+ increaseConcurrency(containerId, volumeId);
+ } else {
+ volumeId = lastAssignedVolumeId.get(containerId);
+ }
+
+ if (unassignedTaskForEachVolume.size() > 0) {
+ int retry = unassignedTaskForEachVolume.size();
+ do {
+ //clean and get a remaining local task
+ queryUnitAttemptId = getAndRemove(volumeId);
+ if(!unassignedTaskForEachVolume.containsKey(volumeId)) {
+ decreaseConcurrency(containerId);
+ if (volumeId > REMOTE) {
+ diskVolumeLoads.remove(volumeId);
+ }
+ }
- if (!assignedContainerMap.containsKey(containerId)) {
- volumeId = assignVolumeId();
- assignedContainerMap.put(containerId, volumeId);
+ if (queryUnitAttemptId == null) {
+ //reassign next volume
+ volumeId = getLowestVolumeId();
+ increaseConcurrency(containerId, volumeId);
+ retry--;
+ } else {
+ break;
+ }
+ } while (retry > 0);
} else {
- volumeId = assignedContainerMap.get(containerId);
+ this.remainTasksNum.set(0);
}
+ return queryUnitAttemptId;
+ }
+
+ public synchronized QueryUnitAttemptId getQueryUnitAttemptIdByRack(String rack) {
+ QueryUnitAttemptId queryUnitAttemptId = null;
- LinkedList<QueryUnitAttemptId> list = null;
- if (unAssignedTaskMap.size() > 0) {
- int retry = unAssignedTaskMap.size();
+ if (unassignedTaskForEachVolume.size() > 0 && this.rack.equals(rack)) {
+ int retry = unassignedTaskForEachVolume.size();
do {
- list = unAssignedTaskMap.get(volumeId);
- if (list == null || list.size() == 0) {
- //clean and reassign remaining volume
- unAssignedTaskMap.remove(volumeId);
- volumeUsageMap.remove(volumeId);
- if (volumeId < 0) break; // processed all block on disk
-
- volumeId = assignVolumeId();
- assignedContainerMap.put(containerId, volumeId);
+ //clean and get a remaining task
+ int volumeId = getLowestVolumeId();
+ queryUnitAttemptId = getAndRemove(volumeId);
+ if (queryUnitAttemptId == null) {
+ if (volumeId > REMOTE) {
+ diskVolumeLoads.remove(volumeId);
+ }
retry--;
} else {
break;
}
} while (retry > 0);
}
- return list;
+ return queryUnitAttemptId;
+ }
+
+ private synchronized QueryUnitAttemptId getAndRemove(int volumeId){
+ QueryUnitAttemptId queryUnitAttemptId = null;
+ if(!unassignedTaskForEachVolume.containsKey(volumeId)) return queryUnitAttemptId;
+
+ LinkedHashSet<QueryUnitAttempt> list = unassignedTaskForEachVolume.get(volumeId);
+ if(list != null && list.size() > 0){
+ QueryUnitAttempt queryUnitAttempt;
+ synchronized (unassignedTaskForEachVolume) {
+ Iterator<QueryUnitAttempt> iterator = list.iterator();
+ queryUnitAttempt = iterator.next();
+ iterator.remove();
+ }
+
+ this.remainTasksNum.getAndDecrement();
+ queryUnitAttemptId = queryUnitAttempt.getId();
+ for (DataLocation location : queryUnitAttempt.getQueryUnit().getDataLocations()) {
+ if (!this.getHost().equals(location.getHost())) {
+ HostVolumeMapping volumeMapping = scheduledRequests.leafTaskHostMapping.get(location.getHost());
+ volumeMapping.removeQueryUnitAttempt(location.getVolumeId(), queryUnitAttempt);
+ }
+ }
+ }
+
+ if(list == null || list.isEmpty()) {
+ unassignedTaskForEachVolume.remove(volumeId);
+ }
+ return queryUnitAttemptId;
}
- public Integer assignVolumeId(){
+ private synchronized void removeQueryUnitAttempt(int volumeId, QueryUnitAttempt queryUnitAttempt){
+ if(!unassignedTaskForEachVolume.containsKey(volumeId)) return;
+
+ LinkedHashSet<QueryUnitAttempt> tasks = unassignedTaskForEachVolume.get(volumeId);
+
+ if(tasks != null && tasks.size() > 0){
+ tasks.remove(queryUnitAttempt);
+ remainTasksNum.getAndDecrement();
+ } else {
+ unassignedTaskForEachVolume.remove(volumeId);
+ }
+ }
+
+ /**
+ * Increase the count of running tasks and disk loads for a certain task runner.
+ *
+ * @param containerId The task runner identifier
+ * @param volumeId Volume identifier
+ * @return the volume load (i.e., how many running tasks use this volume)
+ */
+ private synchronized int increaseConcurrency(ContainerId containerId, int volumeId) {
+
+ int concurrency = 1;
+ if (diskVolumeLoads.containsKey(volumeId)) {
+ concurrency = diskVolumeLoads.get(volumeId) + 1;
+ }
+
+ if (volumeId > -1) {
+ LOG.info("Assigned host : " + host + ", Volume : " + volumeId + ", Concurrency : " + concurrency);
+ } else if (volumeId == -1) {
+ // this case occurs when namenode block metadata is disabled, or for compressed text files or Amazon S3
+ LOG.info("Assigned host : " + host + ", Unknown Volume : " + volumeId + ", Concurrency : " + concurrency);
+ } else if (volumeId == REMOTE) {
+ // all blocks on this host have been processed, so the task will be assigned remotely
+ LOG.info("Assigned host : " + host + ", Remaining local tasks : " + getRemainingLocalTaskSize()
+ + ", Remote Concurrency : " + concurrency);
+ }
+ diskVolumeLoads.put(volumeId, concurrency);
+ lastAssignedVolumeId.put(containerId, volumeId);
+ return concurrency;
+ }
+
+ /**
+ * Decrease the count of running tasks of a certain task runner
+ */
+ private synchronized void decreaseConcurrency(ContainerId containerId){
+ Integer volumeId = lastAssignedVolumeId.get(containerId);
+ if(volumeId != null && diskVolumeLoads.containsKey(volumeId)){
+ Integer concurrency = diskVolumeLoads.get(volumeId);
+ if(concurrency > 0){
+ diskVolumeLoads.put(volumeId, concurrency - 1);
+ } else {
+ if (volumeId > REMOTE) {
+ diskVolumeLoads.remove(volumeId);
+ }
+ }
+ }
+ lastAssignedVolumeId.remove(containerId);
+ }
+
+ /**
+ * volume of a host : 0 ~ n
+ * compressed task, Amazon S3, unknown volume : -1
+ * remote task : -2
+ */
+ public int getLowestVolumeId(){
Map.Entry<Integer, Integer> volumeEntry = null;
- for (Map.Entry<Integer, Integer> entry : volumeUsageMap.entrySet()) {
+ for (Map.Entry<Integer, Integer> entry : diskVolumeLoads.entrySet()) {
if(volumeEntry == null) volumeEntry = entry;
if (volumeEntry.getValue() >= entry.getValue()) {
@@ -350,18 +528,47 @@ public class DefaultTaskScheduler extends AbstractTaskScheduler {
}
if(volumeEntry != null){
- volumeUsageMap.put(volumeEntry.getKey(), volumeEntry.getValue() + 1);
- LOG.info("Assigned host : " + host + " Volume : " + volumeEntry.getKey() + ", Concurrency : "
- + volumeUsageMap.get(volumeEntry.getKey()));
return volumeEntry.getKey();
} else {
- return -1; // processed all block on disk
+ return REMOTE;
}
}
+ public boolean isAssigned(ContainerId containerId){
+ return lastAssignedVolumeId.containsKey(containerId);
+ }
+
+ public boolean isRemote(ContainerId containerId){
+ Integer volumeId = lastAssignedVolumeId.get(containerId);
+ if(volumeId == null || volumeId > REMOTE){
+ return false;
+ } else {
+ return true;
+ }
+ }
+
+ public int getRemoteConcurrency(){
+ return getVolumeConcurrency(REMOTE);
+ }
+
+ public int getVolumeConcurrency(int volumeId){
+ Integer size = diskVolumeLoads.get(volumeId);
+ if(size == null) return 0;
+ else return size;
+ }
+
+ public int getRemainingLocalTaskSize(){
+ return remainTasksNum.get();
+ }
+
public String getHost() {
+
return host;
}
+
+ public String getRack() {
+ return rack;
+ }
}
private class ScheduledRequests {
@@ -370,37 +577,39 @@ public class DefaultTaskScheduler extends AbstractTaskScheduler {
// if the task is not included in leafTasks and nonLeafTasks.
private final Set<QueryUnitAttemptId> leafTasks = Collections.synchronizedSet(new HashSet<QueryUnitAttemptId>());
private final Set<QueryUnitAttemptId> nonLeafTasks = Collections.synchronizedSet(new HashSet<QueryUnitAttemptId>());
- private Map<String, TaskBlockLocation> leafTaskHostMapping = new HashMap<String, TaskBlockLocation>();
+ private Map<String, HostVolumeMapping> leafTaskHostMapping = new HashMap<String, HostVolumeMapping>();
private final Map<String, LinkedList<QueryUnitAttemptId>> leafTasksRackMapping =
new HashMap<String, LinkedList<QueryUnitAttemptId>>();
private void addLeafTask(QueryUnitAttemptScheduleEvent event) {
QueryUnitAttempt queryUnitAttempt = event.getQueryUnitAttempt();
- DataLocation[] locations = queryUnitAttempt.getQueryUnit().getDataLocations();
+ List<DataLocation> locations = queryUnitAttempt.getQueryUnit().getDataLocations();
for (DataLocation location : locations) {
String host = location.getHost();
- TaskBlockLocation taskBlockLocation = leafTaskHostMapping.get(host);
- if (taskBlockLocation == null) {
- taskBlockLocation = new TaskBlockLocation(host);
- leafTaskHostMapping.put(host, taskBlockLocation);
+ HostVolumeMapping hostVolumeMapping = leafTaskHostMapping.get(host);
+ if (hostVolumeMapping == null) {
+ String rack = RackResolver.resolve(host).getNetworkLocation();
+ hostVolumeMapping = new HostVolumeMapping(host, rack);
+ leafTaskHostMapping.put(host, hostVolumeMapping);
}
- taskBlockLocation.addQueryUnitAttemptId(location.getVolumeId(), queryUnitAttempt.getId());
+ hostVolumeMapping.addQueryUnitAttempt(location.getVolumeId(), queryUnitAttempt);
if (LOG.isDebugEnabled()) {
LOG.debug("Added attempt req to host " + host);
}
- String rack = RackResolver.resolve(host).getNetworkLocation();
- LinkedList<QueryUnitAttemptId> list = leafTasksRackMapping.get(rack);
+ LinkedList<QueryUnitAttemptId> list = leafTasksRackMapping.get(hostVolumeMapping.getRack());
if (list == null) {
list = new LinkedList<QueryUnitAttemptId>();
- leafTasksRackMapping.put(rack, list);
+ leafTasksRackMapping.put(hostVolumeMapping.getRack(), list);
}
- list.add(queryUnitAttempt.getId());
+
+ if(!list.contains(queryUnitAttempt.getId())) list.add(queryUnitAttempt.getId());
+
if (LOG.isDebugEnabled()) {
- LOG.debug("Added attempt req to rack " + rack);
+ LOG.debug("Added attempt req to rack " + hostVolumeMapping.getRack());
}
}
@@ -421,77 +630,168 @@ public class DefaultTaskScheduler extends AbstractTaskScheduler {
public Set<QueryUnitAttemptId> assignedRequest = new HashSet<QueryUnitAttemptId>();
+ private QueryUnitAttemptId allocateLocalTask(String host, ContainerId containerId){
+ HostVolumeMapping hostVolumeMapping = leafTaskHostMapping.get(host);
+
+ if (hostVolumeMapping != null) { //tajo host is located in hadoop datanode
+ while (hostVolumeMapping.getRemainingLocalTaskSize() > 0) {
+ QueryUnitAttemptId attemptId = hostVolumeMapping.getLocalTask(containerId);
+ //find remaining local task
+ if (leafTasks.contains(attemptId)) {
+ leafTasks.remove(attemptId);
+ leafTasksRackMapping.get(hostVolumeMapping.getRack()).remove(attemptId);
+ //LOG.info(attemptId + " Assigned based on host match " + hostName);
+ hostLocalAssigned++;
+ totalAssigned++;
+ return attemptId;
+ }
+ }
+ }
+ return null;
+ }
+
+ private QueryUnitAttemptId allocateRackTask(String host) {
+
+ List<HostVolumeMapping> remainingTasks = new ArrayList<HostVolumeMapping>(leafTaskHostMapping.values());
+ String rack = RackResolver.resolve(host).getNetworkLocation();
+ QueryUnitAttemptId attemptId = null;
+
+ if (remainingTasks.size() > 0) {
+ //look for a task on the host in this rack that has the most remaining tasks
+ Collections.sort(remainingTasks, new Comparator<HostVolumeMapping>() {
+ @Override
+ public int compare(HostVolumeMapping v1, HostVolumeMapping v2) {
+ // sort in descending order of remaining tasks
+ return Integer.valueOf(v2.remainTasksNum.get()).compareTo(Integer.valueOf(v1.remainTasksNum.get()));
+ }
+ });
+
+ for (HostVolumeMapping tasks : remainingTasks) {
+ while (tasks.getRemainingLocalTaskSize() > 0){
+ QueryUnitAttemptId tId = tasks.getQueryUnitAttemptIdByRack(rack);
+ if (leafTasks.contains(tId)) {
+ leafTasks.remove(tId);
+ leafTasksRackMapping.get(rack).remove(tId);
+ attemptId = tId;
+ break;
+ }
+ }
+ if(attemptId != null) break;
+ }
+ }
+
+ //find task in rack
+ if (attemptId == null) {
+ LinkedList<QueryUnitAttemptId> list = leafTasksRackMapping.get(rack);
+ while (list != null && list.size() > 0) {
+ QueryUnitAttemptId tId = list.removeFirst();
+
+ if (leafTasks.contains(tId)) {
+ leafTasks.remove(tId);
+ attemptId = tId;
+ break;
+ }
+ }
+ }
+
+ if (attemptId != null) {
+ rackLocalAssigned++;
+ totalAssigned++;
+
+ LOG.info(String.format("Assigned Local/Rack/Total: (%d/%d/%d), Locality: %.2f%%, Rack host: %s",
+ hostLocalAssigned, rackLocalAssigned, totalAssigned,
+ ((double) hostLocalAssigned / (double) totalAssigned) * 100, host));
+
+ }
+ return attemptId;
+ }
+
public void assignToLeafTasks(List<TaskRequestEvent> taskRequests) {
Collections.shuffle(taskRequests);
+ LinkedList<TaskRequestEvent> remoteTaskRequests = new LinkedList<TaskRequestEvent>();
Iterator<TaskRequestEvent> it = taskRequests.iterator();
TaskRequestEvent taskRequest;
- while (it.hasNext() && leafTasks.size() > 0) {
+ while (leafTasks.size() > 0 && (it.hasNext() || !remoteTaskRequests.isEmpty())) {
taskRequest = it.next();
- LOG.debug("assignToLeafTasks: " + taskRequest.getExecutionBlockId() + "," +
- "containerId=" + taskRequest.getContainerId());
+ if(taskRequest == null) { // if there are only remote task requests
+ taskRequest = remoteTaskRequests.removeFirst();
+ }
+
+ // checking if this container is still alive.
+ // If not, ignore the task request and stop the task runner
ContainerProxy container = context.getMasterContext().getResourceAllocator()
.getContainer(taskRequest.getContainerId());
if(container == null) {
+ taskRequest.getCallback().run(stopTaskRunnerReq);
continue;
}
- String host = container.getTaskHostName();
- QueryUnitAttemptId attemptId = null;
- LinkedList<QueryUnitAttemptId> list = null;
+ // getting the hostname of requested node
+ String host = container.getTaskHostName();
- // local disk allocation
+ // if no worker matches the hostname of the task request
if(!leafTaskHostMapping.containsKey(host)){
host = NetUtils.normalizeHost(host);
- }
-
- TaskBlockLocation taskBlockLocation = leafTaskHostMapping.get(host);
- if (taskBlockLocation != null) {
- list = taskBlockLocation.getQueryUnitAttemptIdList(taskRequest.getContainerId());
- }
- while (list != null && list.size() > 0) {
- QueryUnitAttemptId tId = list.removeFirst();
-
- if (leafTasks.contains(tId)) {
- leafTasks.remove(tId);
- attemptId = tId;
- //LOG.info(attemptId + " Assigned based on host match " + hostName);
- hostLocalAssigned++;
- totalAssigned++;
- break;
+ if(!leafTaskHostMapping.containsKey(host) && it.hasNext()){
+ // this case means one of the following:
+ // * there are no blocks which reside in this node.
+ // * all blocks which reside in this node are consumed, and this task runner requests a remote task.
+ // In this case, we move the task request to the remote task request list and skip the rest of this iteration.
+ remoteTaskRequests.add(taskRequest);
+ continue;
}
}
- // rack allocation
- if (attemptId == null) {
- String rack = RackResolver.resolve(host).getNetworkLocation();
- list = leafTasksRackMapping.get(rack);
- while(list != null && list.size() > 0) {
+ ContainerId containerId = taskRequest.getContainerId();
+ LOG.debug("assignToLeafTasks: " + taskRequest.getExecutionBlockId() + "," +
+ "containerId=" + containerId);
- QueryUnitAttemptId tId = list.removeFirst();
+ //////////////////////////////////////////////////////////////////////
+ // disk or host-local allocation
+ //////////////////////////////////////////////////////////////////////
+ QueryUnitAttemptId attemptId = allocateLocalTask(host, containerId);
- if (leafTasks.contains(tId)) {
- leafTasks.remove(tId);
- attemptId = tId;
-
- rackLocalAssigned++;
- totalAssigned++;
+ if (attemptId == null) { // if a local task cannot be found
+ HostVolumeMapping hostVolumeMapping = leafTaskHostMapping.get(host);
- LOG.info(String.format("Assigned Local/Rack/Total: (%d/%d/%d), Locality: %.2f%%, Rack host: %s",
- hostLocalAssigned, rackLocalAssigned, totalAssigned,
- ((double) hostLocalAssigned / (double) totalAssigned) * 100, host));
- break;
+ if(hostVolumeMapping != null) {
+ if(!hostVolumeMapping.isRemote(containerId)){
+ // assign to remote volume
+ hostVolumeMapping.decreaseConcurrency(containerId);
+ hostVolumeMapping.increaseConcurrency(containerId, HostVolumeMapping.REMOTE);
+ }
+ // this part limits remote concurrency for the tail tasks
+ int tailLimit = Math.max(remainingScheduledObjectNum() / (leafTaskHostMapping.size() * 2), 1);
+
+ if(hostVolumeMapping.getRemoteConcurrency() > tailLimit){
+ //release container
+ hostVolumeMapping.decreaseConcurrency(containerId);
+ taskRequest.getCallback().run(stopTaskRunnerReq);
+ subQuery.releaseContainer(containerId);
+ continue;
}
}
- // random allocation
+ //////////////////////////////////////////////////////////////////////
+ // rack-local allocation
+ //////////////////////////////////////////////////////////////////////
+ attemptId = allocateRackTask(host);
+
+ //////////////////////////////////////////////////////////////////////
+ // random node allocation
+ //////////////////////////////////////////////////////////////////////
if (attemptId == null && leafTaskNum() > 0) {
synchronized (leafTasks){
attemptId = leafTasks.iterator().next();
leafTasks.remove(attemptId);
+ rackLocalAssigned++;
+ totalAssigned++;
+ LOG.info(String.format("Assigned Local/Remote/Total: (%d/%d/%d), Locality: %.2f%%,",
+ hostLocalAssigned, rackLocalAssigned, totalAssigned,
+ ((double) hostLocalAssigned / (double) totalAssigned) * 100));
}
- //LOG.info(attemptId + " Assigned based on * match");
}
}
@@ -536,6 +836,7 @@ public class DefaultTaskScheduler extends AbstractTaskScheduler {
}
public void assignToNonLeafTasks(List<TaskRequestEvent> taskRequests) {
+ Collections.shuffle(taskRequests);
Iterator<TaskRequestEvent> it = taskRequests.iterator();
TaskRequestEvent taskRequest;
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/54bf9a7c/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryUnit.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryUnit.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryUnit.java
index 57b3db4..c4f86fb 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryUnit.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryUnit.java
@@ -252,8 +252,8 @@ public class QueryUnit implements EventHandler<TaskEvent> {
}
}
- public DataLocation[] getDataLocations() {
- return dataLocations.toArray(new DataLocation[dataLocations.size()]);
+ public List<DataLocation> getDataLocations() {
+ return dataLocations;
}
public String getSucceededHost() {
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/54bf9a7c/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java
index 790d30b..b3aefbe 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java
@@ -548,6 +548,13 @@ public class SubQuery implements EventHandler<SubQueryEvent> {
eventHandler.handle(new TaskRunnerGroupEvent(EventType.CONTAINER_REMOTE_CLEANUP, getId(), containers.values()));
}
+ public void releaseContainer(ContainerId containerId) {
+ // try to kill the container.
+ ArrayList<Container> list = new ArrayList<Container>();
+ list.add(containers.get(containerId));
+ eventHandler.handle(new TaskRunnerGroupEvent(EventType.CONTAINER_REMOTE_CLEANUP, getId(), list));
+ }
+
/**
* It computes all stats and sets the intermediate result.
*/