You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by bi...@apache.org on 2015/09/18 23:55:59 UTC

tez git commit: TEZ-2816. Preemption sometimes does not respect heartbeats between preemptions (bikas)

Repository: tez
Updated Branches:
  refs/heads/master 04571cc53 -> a06cd76d8


TEZ-2816. Preemption sometimes does not respect heartbeats between preemptions (bikas)


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/a06cd76d
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/a06cd76d
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/a06cd76d

Branch: refs/heads/master
Commit: a06cd76d8ddcee5f7f939cf72e3eeb3cc59033d0
Parents: 04571cc
Author: Bikas Saha <bi...@apache.org>
Authored: Fri Sep 18 14:55:27 2015 -0700
Committer: Bikas Saha <bi...@apache.org>
Committed: Fri Sep 18 14:55:27 2015 -0700

----------------------------------------------------------------------
 CHANGES.txt                                     |  4 +++
 .../dag/app/rm/YarnTaskSchedulerService.java    | 38 +++++++++++---------
 .../tez/dag/app/rm/TestTaskScheduler.java       | 34 ++++++++++++++----
 .../org/apache/tez/analyzer/TestAnalyzer.java   |  4 +--
 4 files changed, 54 insertions(+), 26 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/a06cd76d/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 9c34cca..b4a5db4 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -7,6 +7,7 @@ Release 0.8.1: Unreleased
 INCOMPATIBLE CHANGES
 
 ALL CHANGES:
+  TEZ-2816. Preemption sometimes does not respect heartbeats between preemptions
   TEZ-814.  Improve heuristic for determining a task has failed outputs
   TEZ-2832. Support tests for both SimpleHistory logging and ATS logging
   TEZ-2840. MRInputLegacy.init should set splitInfoViaEvents.
@@ -184,6 +185,7 @@ Release 0.7.1: Unreleased
 INCOMPATIBLE CHANGES
 
 ALL CHANGES:
+  TEZ-2816. Preemption sometimes does not respect heartbeats between preemptions
   TEZ-814.  Improve heuristic for determining a task has failed outputs
   TEZ-2840. MRInputLegacy.init should set splitInfoViaEvents.
   TEZ-2829. Tez UI: minor fixes to in-progress update of UI from AM
@@ -441,6 +443,7 @@ Release 0.6.3: Unreleased
 INCOMPATIBLE CHANGES
 
 ALL CHANGES:
+  TEZ-2816. Preemption sometimes does not respect heartbeats between preemptions
   TEZ-814.  Improve heuristic for determining a task has failed outputs
   TEZ-2809. Minimal distribution compiled on 2.6 fails to run on 2.7
   TEZ-2768. Log a useful error message when the summary stream cannot be closed when shutting
@@ -662,6 +665,7 @@ INCOMPATIBLE CHANGES
   TEZ-2552. CRC errors can cause job to run for very long time in large jobs.
 
 ALL CHANGES:
+  TEZ-2816. Preemption sometimes does not respect heartbeats between preemptions
   TEZ-814.  Improve heuristic for determining a task has failed outputs
   TEZ-2768. Log a useful error message when the summary stream cannot be closed when shutting
   down an AM.

http://git-wip-us.apache.org/repos/asf/tez/blob/a06cd76d/tez-dag/src/main/java/org/apache/tez/dag/app/rm/YarnTaskSchedulerService.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/YarnTaskSchedulerService.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/YarnTaskSchedulerService.java
index 3c1cf3f..5a5464f 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/YarnTaskSchedulerService.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/YarnTaskSchedulerService.java
@@ -892,7 +892,9 @@ public class YarnTaskSchedulerService extends TaskScheduler
 
     synchronized (this) {
       numHeartbeats++;
-      preemptIfNeeded();      
+      if (preemptIfNeeded()) {
+        heartbeatAtLastPreemption = numHeartbeats;
+      }
     }
 
     return getContext().getProgress();
@@ -1127,10 +1129,10 @@ public class YarnTaskSchedulerService extends TaskScheduler
       " heartbeats: " + numHeartbeats + " lastPreemptionHeartbeat: " + heartbeatAtLastPreemption;
   }
   
-  void preemptIfNeeded() {
+  boolean preemptIfNeeded() {
     if (preemptionPercentage == 0) {
       // turned off
-      return;
+      return true;
     }
     ContainerId[] preemptedContainers = null;
     int numPendingRequestsToService = 0;
@@ -1162,7 +1164,7 @@ public class YarnTaskSchedulerService extends TaskScheduler
       
       if (highestPriRequest == null) {
         // nothing pending
-        return;
+        return true;
       }
       
       if(fitsIn(highestPriRequest.getCapability(), freeResources)) {
@@ -1173,7 +1175,7 @@ public class YarnTaskSchedulerService extends TaskScheduler
             LOG.info(highestPriRequest + " fits in free resources");
           }
         }
-        return;
+        return true;
       }
       // highest priority request will not fit in existing free resources
       // free up some more
@@ -1183,7 +1185,8 @@ public class YarnTaskSchedulerService extends TaskScheduler
           preemptionPercentage);
 
       if (numPendingRequestsToService < 1) {
-        return;
+        // nothing to preempt - reset preemption last heartbeat
+        return true;
       }
 
       if (LOG.isDebugEnabled()) {
@@ -1191,7 +1194,7 @@ public class YarnTaskSchedulerService extends TaskScheduler
             + numHighestPriRequests + " pending requests at pri: "
             + highestPriRequest.getPriority());
       }
-      
+
       for (int i=0; i<numPendingRequestsToService; ++i) {
         // This request must have been considered for matching with all existing 
         // containers when request was made.
@@ -1204,7 +1207,7 @@ public class YarnTaskSchedulerService extends TaskScheduler
               LOG.debug("Reused container exists. Wait for assignment loop to release it. "
                   + heldContainer.getContainer().getId());
             }
-            return;
+            return true;
           }
           if (heldContainer.geNumAssignmentAttempts() < 3) {
             // we havent tried to assign this container at node/rack/ANY
@@ -1212,7 +1215,7 @@ public class YarnTaskSchedulerService extends TaskScheduler
               LOG.debug("Brand new container. Wait for assignment loop to match it. "
                   + heldContainer.getContainer().getId());
             }
-            return;
+            return true;
           }
           Container container = heldContainer.getContainer();
           if (lowestPriNewContainer == null ||
@@ -1257,18 +1260,14 @@ public class YarnTaskSchedulerService extends TaskScheduler
       }
       
       if (numPendingRequestsToService < 1) {
-        return;
+        return true;
       }
 
       // there are no reused or new containers to release. try to preempt running containers
       // this assert will be a no-op in production but can help identify 
       // invalid assumptions during testing
       assert delayedContainerManager.delayedContainers.isEmpty();
-      
-      if ((numHeartbeats - heartbeatAtLastPreemption) <= numHeartbeatsBetweenPreemptions) {
-        return;
-      }
-        
+              
       Priority preemptedTaskPriority = null;
       int numEntriesAtPreemptedPriority = 0;
       for(Map.Entry<Object, Container> entry : taskAllocations.entrySet()) {
@@ -1304,7 +1303,12 @@ public class YarnTaskSchedulerService extends TaskScheduler
         numPendingRequestsToService = Math.min(newNumPendingRequestsToService,
             numPendingRequestsToService);
         if (numPendingRequestsToService < 1) {
-          return;
+          return true;
+        }
+        // wait for enough heartbeats since this request became active for preemption
+        if ((numHeartbeats - heartbeatAtLastPreemption) < numHeartbeatsBetweenPreemptions) {
+          // stop incrementing lastpreemption heartbeat count
+          return false;
         }
         LOG.info("Trying to service " + numPendingRequestsToService + " out of total "
             + numHighestPriRequests + " pending requests at pri: "
@@ -1331,7 +1335,6 @@ public class YarnTaskSchedulerService extends TaskScheduler
     
     // upcall outside locks
     if (preemptedContainers != null) {
-      heartbeatAtLastPreemption = numHeartbeats;
       for(int i=0; i<numPendingRequestsToService; ++i) {
         ContainerId cId = preemptedContainers[i];
         if (cId != null) {
@@ -1340,6 +1343,7 @@ public class YarnTaskSchedulerService extends TaskScheduler
         }
       }
     }
+    return true;
   }
 
   private boolean fitsIn(Resource toFit, Resource resource) {

http://git-wip-us.apache.org/repos/asf/tez/blob/a06cd76d/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskScheduler.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskScheduler.java b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskScheduler.java
index 5efea48..1fc5092 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskScheduler.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskScheduler.java
@@ -1508,6 +1508,7 @@ public class TestTaskScheduler {
 
     Configuration conf = new Configuration();
     conf.setBoolean(TezConfiguration.TEZ_AM_CONTAINER_REUSE_ENABLED, false);
+    conf.setInt(TezConfiguration.TEZ_AM_PREEMPTION_HEARTBEATS_BETWEEN_PREEMPTIONS, 3);
 
     TaskSchedulerContext mockApp = setupMockTaskSchedulerContext(appHost, appPort, appUrl, false,
         null, null, new PreemptionMatcher(), conf);
@@ -1692,7 +1693,10 @@ public class TestTaskScheduler {
     scheduler.getProgress();
     drainableAppCallback.drain();
     verify(mockRMClient, times(0)).releaseAssignedContainer((ContainerId)any());
+    // no need for task preemption until now - so they should match
+    Assert.assertEquals(scheduler.numHeartbeats, scheduler.heartbeatAtLastPreemption);
 
+    // add a pending request that cannot be allocated until resources free up
     Object mockTask3WaitCookie = new Object();
     scheduler.allocateTask(mockTask3Wait, taskAsk, null,
                            null, pri6, obj3, mockTask3WaitCookie);
@@ -1711,6 +1715,7 @@ public class TestTaskScheduler {
     containers.clear();
     containers.add(mockContainer4);
     
+    // new lower pri container added that won't be matched and will eventually be preempted
     // Fudge new container being present in delayed allocation list due to race
     HeldContainer heldContainer = new HeldContainer(mockContainer4, -1, -1, null,
         containerSignatureMatcher);
@@ -1719,6 +1724,9 @@ public class TestTaskScheduler {
     scheduler.getProgress();
     drainableAppCallback.drain();
     verify(mockRMClient, times(0)).releaseAssignedContainer((ContainerId)any());
+    // no need for task preemption until now - so they should match
+    Assert.assertEquals(scheduler.numHeartbeats, scheduler.heartbeatAtLastPreemption);
+
     heldContainer.incrementAssignmentAttempts();
     // no preemption - container assignment attempts < 3
     scheduler.getProgress();
@@ -1740,12 +1748,18 @@ public class TestTaskScheduler {
     
     // remove fudging.
     scheduler.delayedContainerManager.delayedContainers.clear();
-    
+
+    // no need for task preemption until now - so they should match
+    Assert.assertEquals(scheduler.numHeartbeats, scheduler.heartbeatAtLastPreemption);
+
     scheduler.allocateTask(mockTask3Retry, taskAsk, null,
                            null, pri5, obj3, null);
     // no preemption - higher pri. exact match
     scheduler.getProgress();
+    // no need for task preemption until now - so they should match
     drainableAppCallback.drain();
+    // no need for task preemption until now - so they should match
+    Assert.assertEquals(scheduler.numHeartbeats, scheduler.heartbeatAtLastPreemption);
     verify(mockRMClient, times(1)).releaseAssignedContainer((ContainerId)any());
 
     for (int i=0; i<11; ++i) {
@@ -1758,16 +1772,22 @@ public class TestTaskScheduler {
     // this is also a higher priority container than the pending task priority but was running a 
     // lower priority task. Task priority is relevant for preemption and not container priority as
     // containers can run tasks of different priorities
-    scheduler.getProgress();
+    scheduler.getProgress(); // first heartbeat
+    Assert.assertTrue(scheduler.numHeartbeats > scheduler.heartbeatAtLastPreemption);
+    drainableAppCallback.drain();
+    scheduler.getProgress(); // second heartbeat
+    drainableAppCallback.drain();
+    verify(mockRMClient, times(1)).releaseAssignedContainer((ContainerId)any());
+    scheduler.getProgress(); // third heartbeat
     drainableAppCallback.drain();
     verify(mockRMClient, times(2)).releaseAssignedContainer((ContainerId)any());
     verify(mockRMClient, times(1)).releaseAssignedContainer(mockCId3B);
-    // next 3 heartbeats do nothing, waiting for the RM to act on the last released resources
-    scheduler.getProgress();
-    scheduler.getProgress();
-    scheduler.getProgress();
+    Assert.assertEquals(scheduler.numHeartbeats, scheduler.heartbeatAtLastPreemption);
+    // there are pending preemptions.
+    scheduler.getProgress(); // first heartbeat
+    scheduler.getProgress(); // second heartbeat
     verify(mockRMClient, times(2)).releaseAssignedContainer((ContainerId) any());
-    scheduler.getProgress();
+    scheduler.getProgress(); // third heartbeat
     drainableAppCallback.drain();
     // Next oldest mockTaskPri3KillA gets preempted to clear 10% of outstanding running preemptable tasks
     verify(mockRMClient, times(3)).releaseAssignedContainer((ContainerId)any());

http://git-wip-us.apache.org/repos/asf/tez/blob/a06cd76d/tez-tools/analyzers/job-analyzer/src/test/java/org/apache/tez/analyzer/TestAnalyzer.java
----------------------------------------------------------------------
diff --git a/tez-tools/analyzers/job-analyzer/src/test/java/org/apache/tez/analyzer/TestAnalyzer.java b/tez-tools/analyzers/job-analyzer/src/test/java/org/apache/tez/analyzer/TestAnalyzer.java
index ca9250d..7ea5a39 100644
--- a/tez-tools/analyzers/job-analyzer/src/test/java/org/apache/tez/analyzer/TestAnalyzer.java
+++ b/tez-tools/analyzers/job-analyzer/src/test/java/org/apache/tez/analyzer/TestAnalyzer.java
@@ -168,7 +168,7 @@ public class TestAnalyzer {
         remoteStagingDir.toString());
     tezConf.setBoolean(TezConfiguration.TEZ_AM_NODE_BLACKLISTING_ENABLED, false);
 
-    tezSession = TezClient.create("TestFaultTolerance", tezConf, true);
+    tezSession = TezClient.create("TestAnalyzer", tezConf, true);
     tezSession.start();
   }
   
@@ -821,4 +821,4 @@ public class TestAnalyzer {
     return Collections.singletonList(check);
   }
   
-}
\ No newline at end of file
+}