You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by bi...@apache.org on 2013/07/02 05:31:25 UTC
git commit: TEZ-221. Deadlock in DAGSchedulerMRR when setting slow start to 0.5f on a small cluster. Enable preemption (bikas)
Updated Branches:
refs/heads/master 51d5c6f7f -> 44c2e50d5
TEZ-221. Deadlock in DAGSchedulerMRR when setting slow start to 0.5f on a small cluster. Enable preemption (bikas)
Project: http://git-wip-us.apache.org/repos/asf/incubator-tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tez/commit/44c2e50d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tez/tree/44c2e50d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tez/diff/44c2e50d
Branch: refs/heads/master
Commit: 44c2e50d576039d3793c22f04f056b89a2b2bfab
Parents: 51d5c6f
Author: Bikas Saha <bi...@apache.org>
Authored: Mon Jul 1 20:27:41 2013 -0700
Committer: Bikas Saha <bi...@apache.org>
Committed: Mon Jul 1 20:27:41 2013 -0700
----------------------------------------------------------------------
.../tez/dag/app/dag/impl/DAGSchedulerMRR.java | 33 ++++++++++++++------
.../apache/tez/dag/app/rm/TaskScheduler.java | 26 ++++++++-------
2 files changed, 38 insertions(+), 21 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/44c2e50d/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGSchedulerMRR.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGSchedulerMRR.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGSchedulerMRR.java
index d4a347e..914a562 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGSchedulerMRR.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGSchedulerMRR.java
@@ -180,6 +180,7 @@ public class DAGSchedulerMRR implements DAGScheduler {
realShufflerResource.setMemory(resource.getMemory());
}
}
+ schedulePendingShuffles(getNumShufflesToSchedule());
}
@Override
@@ -189,10 +190,8 @@ public class DAGSchedulerMRR implements DAGScheduler {
LOG.info("Task succeeded: " + attempt.getID() + " Vertex: Total:" + vertex.getTotalTasks() +
" succeeded: " + vertex.getSucceededTasks());
- if (currentPartitioner == vertex) {
- // resources now available. try to schedule pending shuffles
- schedulePendingShuffles(getNumShufflesToSchedule());
- }
+ // resources now available. try to schedule pending shuffles
+ schedulePendingShuffles(getNumShufflesToSchedule());
}
int getNumShufflesToSchedule() {
@@ -203,6 +202,7 @@ public class DAGSchedulerMRR implements DAGScheduler {
}
if(unassignedPartitionTasks.isEmpty()) {
+ LOG.info("All partitioners assigned. Scheduling all shufflers.");
return pendingShuffleTasks.size();
}
@@ -218,17 +218,20 @@ public class DAGSchedulerMRR implements DAGScheduler {
int shufflerMemAssigned = shufflerTaskMem * numShuffleTasksScheduled;
// get resources needed by partitioner
- int numPartionersLeft = currentPartitioner.getTotalTasks()
- - currentPartitioner.getSucceededTasks();
+ int numPartitioners = currentPartitioner.getTotalTasks();
+ int numPartionersSucceeded = currentPartitioner.getSucceededTasks();
+ int numPartionersLeft = numPartitioners - numPartionersSucceeded;
int partitionerMemNeeded = numPartionersLeft * partitionerTaskMem;
// find leftover resources for shuffler
int shufflerMemLeft = totalMem - partitionerMemNeeded;
- int defaultShufflerMem = (int)(minReservedShuffleResource * totalMem);
+ int maxShufflerMem = (int) (totalMem *
+ (Math.min(minReservedShuffleResource,
+ numPartionersSucceeded/(float)numPartitioners)));
- if(shufflerMemLeft < defaultShufflerMem) {
- shufflerMemLeft = defaultShufflerMem;
+ if(shufflerMemLeft < maxShufflerMem) {
+ shufflerMemLeft = maxShufflerMem;
}
shufflerMemLeft -= shufflerMemAssigned;
@@ -237,6 +240,7 @@ public class DAGSchedulerMRR implements DAGScheduler {
" Headroom: " + freeMem +
" PartitionerTaskMem: " + partitionerTaskMem +
" ShufflerTaskMem: " + shufflerTaskMem +
+ " MaxShuffleMem: " + maxShufflerMem +
" PartitionerMemNeeded:" + partitionerMemNeeded +
" ShufflerMemAssigned: " + shufflerMemAssigned +
" ShufflerMemLeft: " + shufflerMemLeft +
@@ -251,7 +255,16 @@ public class DAGSchedulerMRR implements DAGScheduler {
return pendingShuffleTasks.size();
}
- return shufflerMemLeft / shufflerTaskMem;
+ int shufflersToSchedule = shufflerMemLeft / shufflerTaskMem;
+ shufflerMemAssigned += shufflerTaskMem * shufflersToSchedule;
+
+ if(totalMem - shufflerMemAssigned < partitionerTaskMem) {
+ // safety check when reduce ramp up limit is aggressively high
+ LOG.info("Not scheduling more shufflers as it starves partitioners");
+ return 0;
+ }
+
+ return shufflersToSchedule;
}
void schedulePendingShuffles(int scheduleCount) {
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/44c2e50d/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskScheduler.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskScheduler.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskScheduler.java
index 1cb0eea..88c4330 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskScheduler.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskScheduler.java
@@ -313,7 +313,9 @@ public class TaskScheduler extends AbstractService
LOG.info("Assigning container: " + container +
" for task: " + task +
- " at locality: " + location);
+ " at locality: " + location +
+ " resource memory: " + container.getResource().getMemory() +
+ " cpu: " + container.getResource().getVirtualCores());
}
}
@@ -351,16 +353,14 @@ public class TaskScheduler extends AbstractService
if(isStopped) {
return 1;
}
- LOG.info("TEMP dagschedulermrr: sync with RM finished. Headroom: "
- + getAvailableResources().getMemory() + " Allocations: "
- + taskAllocations.size());
if(totalResources.getMemory() == 0) {
// assume this is the first allocate callback. nothing is allocated.
// available resource = totalResource
// TODO this will not handle dynamic changes in resources
totalResources = Resources.clone(getAvailableResources());
- LOG.info("App total resource: " + totalResources.getMemory() +
+ LOG.info("App total resource memory: " + totalResources.getMemory() +
+ " cpu: " + totalResources.getVirtualCores() +
" taskAllocations: " + taskAllocations.size());
}
@@ -437,13 +437,11 @@ public class TaskScheduler extends AbstractService
return null;
}
- synchronized void preemptIfNeeded(/*Resource totalResources,
- Resource allocatedResources,
- Map<Object, CookieContainerRequest> taskRequests,
- LinkedHashMap<Object, Container> taskAllocations*/
- ) {
+ synchronized void preemptIfNeeded() {
Resource freeResources = Resources.subtract(totalResources,
allocatedResources);
+ LOG.info("Allocated resource memory: " + allocatedResources.getMemory() +
+ " cpu:" + allocatedResources.getVirtualCores());
assert freeResources.getMemory() >= 0;
CookieContainerRequest highestPriRequest = null;
@@ -456,7 +454,7 @@ public class TaskScheduler extends AbstractService
}
}
if(highestPriRequest != null &&
- !Resources.fitsIn(highestPriRequest.getCapability(), freeResources)) {
+ !fitsIn(highestPriRequest.getCapability(), freeResources)) {
// highest priority request will not fit in existing free resources
// free up some more
// TODO this is subject to error wrt RM resource normalization
@@ -486,6 +484,12 @@ public class TaskScheduler extends AbstractService
}
}
+ private boolean fitsIn(Resource toFit, Resource resource) {
+ // YARN-893 prevents using correct library code
+ //return Resources.fitsIn(toFit, resource);
+ return resource.getMemory() >= toFit.getMemory();
+ }
+
private CookieContainerRequest getMatchingRequest(
Container container, String location) {
Priority priority = container.getPriority();