You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by bi...@apache.org on 2014/11/08 21:07:13 UTC

[3/3] tez git commit: TEZ-1742 addendum patch for follow up comments (cherry picked from commit c6c08c18b77870e71298adb2dc51908473434768)

TEZ-1742 addendum patch for follow up comments
(cherry picked from commit c6c08c18b77870e71298adb2dc51908473434768)


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/266b6eba
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/266b6eba
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/266b6eba

Branch: refs/heads/branch-0.5
Commit: 266b6ebaf869ba7dc717e45d411f5891aa2b5924
Parents: 0866922
Author: Bikas Saha <bi...@apache.org>
Authored: Sat Nov 8 11:55:05 2014 -0800
Committer: Bikas Saha <bi...@apache.org>
Committed: Sat Nov 8 12:04:55 2014 -0800

----------------------------------------------------------------------
 .../dag/app/rm/YarnTaskSchedulerService.java    | 16 +++++++++------
 .../tez/dag/app/rm/TestTaskScheduler.java       | 21 +++++++++++++++++++-
 2 files changed, 30 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/266b6eba/tez-dag/src/main/java/org/apache/tez/dag/app/rm/YarnTaskSchedulerService.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/YarnTaskSchedulerService.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/YarnTaskSchedulerService.java
index 5941a45..f8fbd53 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/YarnTaskSchedulerService.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/YarnTaskSchedulerService.java
@@ -1051,8 +1051,8 @@ public class YarnTaskSchedulerService extends TaskSchedulerService
     return false; 
   }
 
-  private int scaleDownByPreemptionPercentage(int original) {
-    return (original + (preemptionPercentage - 1)) / preemptionPercentage;
+  static int scaleDownByPreemptionPercentage(int original, int percent) {
+    return (int) Math.ceil((original * percent)/100.f);
   }
   
   void preemptIfNeeded() {
@@ -1103,8 +1103,9 @@ public class YarnTaskSchedulerService extends TaskSchedulerService
       // free up some more
       // TODO this is subject to error wrt RM resource normalization
       
-      numPendingRequestsToService = scaleDownByPreemptionPercentage(numHighestPriRequests);
-      
+      numPendingRequestsToService = scaleDownByPreemptionPercentage(numHighestPriRequests,
+          preemptionPercentage);
+
       if (numPendingRequestsToService < 1) {
         return;
       }
@@ -1223,7 +1224,7 @@ public class YarnTaskSchedulerService extends TaskSchedulerService
       }
       if(preemptedTaskPriority != null) {
         int newNumPendingRequestsToService = scaleDownByPreemptionPercentage(Math.min(
-            numEntriesAtPreemptedPriority, numHighestPriRequests));
+            numEntriesAtPreemptedPriority, numHighestPriRequests), preemptionPercentage);
         numPendingRequestsToService = Math.min(newNumPendingRequestsToService,
             numPendingRequestsToService);
         if (numPendingRequestsToService < 1) {
@@ -1237,8 +1238,11 @@ public class YarnTaskSchedulerService extends TaskSchedulerService
         preemptedContainers = new ContainerId[numPendingRequestsToService];
         int currIndex = 0;
         for (Map.Entry<Object, Container> entry : taskAllocations.entrySet()) {
+          HeldContainer heldContainer = heldContainers.get(entry.getValue().getId());
+          CookieContainerRequest lastTaskInfo = heldContainer.getLastTaskInfo();
+          Priority taskPriority = lastTaskInfo.getPriority();
           Container container = entry.getValue();
-          if (preemptedTaskPriority.equals(container.getPriority())) {
+          if (preemptedTaskPriority.equals(taskPriority)) {
             // taskAllocations map will iterate from oldest to newest assigned containers
             // keep the N newest containersIds with the matching priority
             preemptedContainers[currIndex++ % numPendingRequestsToService] = container.getId();

http://git-wip-us.apache.org/repos/asf/tez/blob/266b6eba/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskScheduler.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskScheduler.java b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskScheduler.java
index 3afff7c..5782d01 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskScheduler.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskScheduler.java
@@ -1328,7 +1328,7 @@ public class TestTaskScheduler {
     Container mockContainer3B = mock(Container.class, RETURNS_DEEP_STUBS);
     when(mockContainer3B.getNodeId().getHost()).thenReturn("host1");
     when(mockContainer3B.getResource()).thenReturn(taskAsk);
-    when(mockContainer3B.getPriority()).thenReturn(pri6);
+    when(mockContainer3B.getPriority()).thenReturn(pri2); // high priority container 
     ContainerId mockCId3B = mock(ContainerId.class);
     when(mockContainer3B.getId()).thenReturn(mockCId3B);
     containers.add(mockContainer3B);
@@ -1391,6 +1391,8 @@ public class TestTaskScheduler {
         scheduler.taskAllocations.get(mockTask3).getId());
     Assert.assertEquals(mockCId3A,
         scheduler.taskAllocations.get(mockTask3KillA).getId());
+    // High-priority container assigned to a lower-priority task. This task should still be
+    // preempted because the task priority, not the container priority, decides preemption
     Assert.assertEquals(mockCId3B,
         scheduler.taskAllocations.get(mockTask3KillB).getId());
 
@@ -1460,6 +1462,9 @@ public class TestTaskScheduler {
     drainableAppCallback.drain();
 
     // mockTaskPri3KillB gets preempted to clear 10% of outstanding running preemptable tasks
+    // This container also has a higher priority than the pending task, but it was running a
+    // lower-priority task. Task priority, not container priority, is what matters for preemption
+    // because a container can run tasks of different priorities
     scheduler.getProgress();
     drainableAppCallback.drain();
     verify(mockRMClient, times(2)).releaseAssignedContainer((ContainerId)any());
@@ -1633,6 +1638,20 @@ public class TestTaskScheduler {
     when(appClient.getFinalAppStatus()).thenReturn(finalStatus);
     taskScheduler.close();
   }
+  
+  @Test (timeout=5000)
+  public void testScaleDownPercentage() {
+    Assert.assertEquals(100, YarnTaskSchedulerService.scaleDownByPreemptionPercentage(100, 100));
+    Assert.assertEquals(70, YarnTaskSchedulerService.scaleDownByPreemptionPercentage(100, 70));
+    Assert.assertEquals(50, YarnTaskSchedulerService.scaleDownByPreemptionPercentage(100, 50));
+    Assert.assertEquals(10, YarnTaskSchedulerService.scaleDownByPreemptionPercentage(100, 10));
+    Assert.assertEquals(5, YarnTaskSchedulerService.scaleDownByPreemptionPercentage(100, 5));
+    Assert.assertEquals(1, YarnTaskSchedulerService.scaleDownByPreemptionPercentage(100, 1));
+    Assert.assertEquals(1, YarnTaskSchedulerService.scaleDownByPreemptionPercentage(5, 5));
+    Assert.assertEquals(1, YarnTaskSchedulerService.scaleDownByPreemptionPercentage(1, 10));
+    Assert.assertEquals(1, YarnTaskSchedulerService.scaleDownByPreemptionPercentage(1, 70));
+    Assert.assertEquals(1, YarnTaskSchedulerService.scaleDownByPreemptionPercentage(1, 1));
+  }
 
   private Container createContainer(int id, String host, Resource resource,
       Priority priority) {