You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by bi...@apache.org on 2013/07/02 05:31:25 UTC

git commit: TEZ-221. Deadlock in DAGSchedulerMRR when setting slow start to 0.5f on a small cluster. Enable preemption (bikas)

Updated Branches:
  refs/heads/master 51d5c6f7f -> 44c2e50d5


TEZ-221. Deadlock in DAGSchedulerMRR when setting slow start to 0.5f on a small cluster. Enable preemption (bikas)


Project: http://git-wip-us.apache.org/repos/asf/incubator-tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tez/commit/44c2e50d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tez/tree/44c2e50d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tez/diff/44c2e50d

Branch: refs/heads/master
Commit: 44c2e50d576039d3793c22f04f056b89a2b2bfab
Parents: 51d5c6f
Author: Bikas Saha <bi...@apache.org>
Authored: Mon Jul 1 20:27:41 2013 -0700
Committer: Bikas Saha <bi...@apache.org>
Committed: Mon Jul 1 20:27:41 2013 -0700

----------------------------------------------------------------------
 .../tez/dag/app/dag/impl/DAGSchedulerMRR.java   | 33 ++++++++++++++------
 .../apache/tez/dag/app/rm/TaskScheduler.java    | 26 ++++++++-------
 2 files changed, 38 insertions(+), 21 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/44c2e50d/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGSchedulerMRR.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGSchedulerMRR.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGSchedulerMRR.java
index d4a347e..914a562 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGSchedulerMRR.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGSchedulerMRR.java
@@ -180,6 +180,7 @@ public class DAGSchedulerMRR implements DAGScheduler {
         realShufflerResource.setMemory(resource.getMemory());
       }
     }
+    schedulePendingShuffles(getNumShufflesToSchedule());
   }
   
   @Override
@@ -189,10 +190,8 @@ public class DAGSchedulerMRR implements DAGScheduler {
     LOG.info("Task succeeded: " + attempt.getID() + " Vertex: Total:" + vertex.getTotalTasks() + 
         " succeeded: " + vertex.getSucceededTasks());
 
-    if (currentPartitioner == vertex) {
-      // resources now available. try to schedule pending shuffles
-      schedulePendingShuffles(getNumShufflesToSchedule());
-    }
+    // resources now available. try to schedule pending shuffles
+    schedulePendingShuffles(getNumShufflesToSchedule());
   }
   
   int getNumShufflesToSchedule() {
@@ -203,6 +202,7 @@ public class DAGSchedulerMRR implements DAGScheduler {
     }
     
     if(unassignedPartitionTasks.isEmpty()) {
+      LOG.info("All partitioners assigned. Scheduling all shufflers.");
       return pendingShuffleTasks.size();
     }
     
@@ -218,17 +218,20 @@ public class DAGSchedulerMRR implements DAGScheduler {
     int shufflerMemAssigned = shufflerTaskMem * numShuffleTasksScheduled;
     
     // get resources needed by partitioner
-    int numPartionersLeft = currentPartitioner.getTotalTasks()
-        - currentPartitioner.getSucceededTasks();
+    int numPartitioners = currentPartitioner.getTotalTasks();
+    int numPartionersSucceeded = currentPartitioner.getSucceededTasks();
+    int numPartionersLeft = numPartitioners - numPartionersSucceeded;
     int partitionerMemNeeded = numPartionersLeft * partitionerTaskMem;
     
     // find leftover resources for shuffler
     int shufflerMemLeft = totalMem - partitionerMemNeeded;
 
-    int defaultShufflerMem = (int)(minReservedShuffleResource * totalMem);
+    int maxShufflerMem = (int) (totalMem *
+        (Math.min(minReservedShuffleResource, 
+                  numPartionersSucceeded/(float)numPartitioners)));
     
-    if(shufflerMemLeft < defaultShufflerMem) {
-      shufflerMemLeft = defaultShufflerMem;
+    if(shufflerMemLeft < maxShufflerMem) {
+      shufflerMemLeft = maxShufflerMem;
     }
     
     shufflerMemLeft -= shufflerMemAssigned;
@@ -237,6 +240,7 @@ public class DAGSchedulerMRR implements DAGScheduler {
              " Headroom: " + freeMem +
              " PartitionerTaskMem: " + partitionerTaskMem +
              " ShufflerTaskMem: " + shufflerTaskMem + 
+             " MaxShuffleMem: " + maxShufflerMem +
              " PartitionerMemNeeded:" + partitionerMemNeeded +
              " ShufflerMemAssigned: " + shufflerMemAssigned + 
              " ShufflerMemLeft: " + shufflerMemLeft +
@@ -251,7 +255,16 @@ public class DAGSchedulerMRR implements DAGScheduler {
       return pendingShuffleTasks.size();
     }
     
-    return shufflerMemLeft / shufflerTaskMem;
+    int shufflersToSchedule = shufflerMemLeft / shufflerTaskMem;
+    shufflerMemAssigned += shufflerTaskMem * shufflersToSchedule;
+    
+    if(totalMem - shufflerMemAssigned < partitionerTaskMem) {
+      // safety check when reduce ramp up limit is aggressively high
+      LOG.info("Not scheduling more shufflers as it starves partitioners");
+      return 0;
+    }
+    
+    return shufflersToSchedule;
   }
   
   void schedulePendingShuffles(int scheduleCount) {

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/44c2e50d/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskScheduler.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskScheduler.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskScheduler.java
index 1cb0eea..88c4330 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskScheduler.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskScheduler.java
@@ -313,7 +313,9 @@ public class TaskScheduler extends AbstractService
               
         LOG.info("Assigning container: " + container + 
             " for task: " + task + 
-            " at locality: " + location);
+            " at locality: " + location + 
+            " resource memory: " + container.getResource().getMemory() +
+            " cpu: " + container.getResource().getVirtualCores());
         
       }
     }
@@ -351,16 +353,14 @@ public class TaskScheduler extends AbstractService
     if(isStopped) {
       return 1;
     }
-    LOG.info("TEMP dagschedulermrr: sync with RM finished. Headroom: "
-        + getAvailableResources().getMemory() + " Allocations: "
-        + taskAllocations.size());
     
     if(totalResources.getMemory() == 0) {
       // assume this is the first allocate callback. nothing is allocated.
       // available resource = totalResource
       // TODO this will not handle dynamic changes in resources
       totalResources = Resources.clone(getAvailableResources());
-      LOG.info("App total resource: " + totalResources.getMemory() + 
+      LOG.info("App total resource memory: " + totalResources.getMemory() + 
+               " cpu: " + totalResources.getVirtualCores() +
                " taskAllocations: " + taskAllocations.size());
     }
     
@@ -437,13 +437,11 @@ public class TaskScheduler extends AbstractService
     return null;
   }
   
-  synchronized void preemptIfNeeded(/*Resource totalResources, 
-      Resource allocatedResources,
-      Map<Object, CookieContainerRequest> taskRequests,
-      LinkedHashMap<Object, Container> taskAllocations*/      
-      ) {
+  synchronized void preemptIfNeeded() {
     Resource freeResources = Resources.subtract(totalResources,
         allocatedResources);
+    LOG.info("Allocated resource memory: " + allocatedResources.getMemory() + 
+             " cpu:" + allocatedResources.getVirtualCores());
     assert freeResources.getMemory() >= 0;
     
     CookieContainerRequest highestPriRequest = null;
@@ -456,7 +454,7 @@ public class TaskScheduler extends AbstractService
       }
     }
     if(highestPriRequest != null && 
-       !Resources.fitsIn(highestPriRequest.getCapability(), freeResources)) {
+       !fitsIn(highestPriRequest.getCapability(), freeResources)) {
       // highest priority request will not fit in existing free resources
       // free up some more
       // TODO this is subject to error wrt RM resource normalization
@@ -486,6 +484,12 @@ public class TaskScheduler extends AbstractService
     }
   }
   
+  private boolean fitsIn(Resource toFit, Resource resource) {
+    // YARN-893 prevents using correct library code (Resources.fitsIn(toFit, resource)),
+    // so only memory is compared here; virtual cores are not considered.
+    return resource.getMemory() >= toFit.getMemory();
+  }
+  
   private CookieContainerRequest getMatchingRequest(
                                       Container container, String location) {
     Priority priority = container.getPriority();