You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by hi...@apache.org on 2013/11/22 02:04:04 UTC

git commit: TEZ-622. Failures in DelayedContainerManager can cause the AM to hang. (hitesh)

Updated Branches:
  refs/heads/master 7b2a0e6f3 -> 8a8ba651b


TEZ-622. Failures in DelayedContainerManager can cause the AM to hang. (hitesh)


Project: http://git-wip-us.apache.org/repos/asf/incubator-tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tez/commit/8a8ba651
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tez/tree/8a8ba651
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tez/diff/8a8ba651

Branch: refs/heads/master
Commit: 8a8ba651b9cb0d8f3b0f5039fc7a614fabbf0a2a
Parents: 7b2a0e6
Author: Hitesh Shah <hi...@apache.org>
Authored: Thu Nov 21 17:03:45 2013 -0800
Committer: Hitesh Shah <hi...@apache.org>
Committed: Thu Nov 21 17:03:45 2013 -0800

----------------------------------------------------------------------
 .../apache/tez/dag/app/rm/TaskScheduler.java    | 71 ++++++++++++--------
 .../tez/mapreduce/examples/MRRSleepJob.java     |  2 +-
 2 files changed, 44 insertions(+), 29 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/8a8ba651/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskScheduler.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskScheduler.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskScheduler.java
index d8d2de3..67966b1 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskScheduler.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskScheduler.java
@@ -854,12 +854,17 @@ public class TaskScheduler extends AbstractService
         } else {
           // Don't attempt to delay containers if delay is 0.
           HeldContainer heldContainer = heldContainers.get(container.getId());
-          heldContainer.resetLocalityMatchLevel();
-          long currentTime = System.currentTimeMillis();
-          if (sessionDelay > 0) {
-            heldContainer.setContainerExpiryTime(currentTime + sessionDelay);
+          if (heldContainer != null) {
+            heldContainer.resetLocalityMatchLevel();
+            long currentTime = System.currentTimeMillis();
+            if (sessionDelay > 0) {
+              heldContainer.setContainerExpiryTime(currentTime + sessionDelay);
+            }
+            assignedContainers = assignDelayedContainer(heldContainer);
+          } else {
+            LOG.info("Skipping container after task deallocate as container is"
+                + " no longer running, containerId=" + container.getId());
           }
-          assignedContainers = assignDelayedContainer(heldContainer);
         }
       }
     }
@@ -1033,9 +1038,12 @@ public class TaskScheduler extends AbstractService
     }
     HeldContainer delayedContainer = heldContainers.remove(containerId);
     if (delayedContainer != null) {
-      Resources.subtractFrom(allocatedResources, delayedContainer.getContainer().getResource());
+      Resources.subtractFrom(allocatedResources,
+          delayedContainer.getContainer().getResource());
+    }
+    if (delayedContainer != null || !shouldReuseContainers) {
+      amRmClient.releaseAssignedContainer(containerId);
     }
-    amRmClient.releaseAssignedContainer(containerId);
     if (assignedTask != null) {
       // A task was assigned at some point. Add to release list since we are
       // releasing the container.
@@ -1054,7 +1062,7 @@ public class TaskScheduler extends AbstractService
     assert result == null;
     containerAssignments.put(container.getId(), task);
     HeldContainer heldContainer = heldContainers.get(container.getId()); 
-    if (heldContainer == null) {
+    if (!shouldReuseContainers && heldContainer == null) {
       heldContainers.put(container.getId(), new HeldContainer(container,
         -1, -1, assigned));
       Resources.addTo(allocatedResources, container.getResource());
@@ -1087,8 +1095,8 @@ public class TaskScheduler extends AbstractService
               + " is already held.");
         }
         Resources.addTo(allocatedResources, container.getResource());
-          delayedContainerManager.addDelayedContainer(container,
-              delayedContainerManager.maxScheduleTimeSeen + 1);
+        delayedContainerManager.addDelayedContainer(container,
+            delayedContainerManager.maxScheduleTimeSeen + 1);
       }
     }
     delayedContainerManager.triggerScheduling(false);      
@@ -1213,9 +1221,9 @@ public class TaskScheduler extends AbstractService
 
   private void releaseUnassignedContainers(Iterable<Container> containers) {
     for (Container container : containers) {
-      releaseContainer(container.getId());
       LOG.info("Releasing unused container: "
-          + container);
+          + container.getId());
+      releaseContainer(container.getId());
     }
   }
 
@@ -1427,36 +1435,45 @@ public class TaskScheduler extends AbstractService
             LOG.info("AllocatedContainerManager Thread interrupted");
           }
         } else {
-          HeldContainer heldContainer = delayedContainers.peek();
-          if (heldContainer == null) {
+          HeldContainer delayedContainer = delayedContainers.peek();
+          if (delayedContainer == null) {
             continue;
           }
           if (LOG.isDebugEnabled()) {
-            LOG.debug("Considering HeldContainer: " + heldContainer + " for assignment");
+            LOG.debug("Considering HeldContainer: "
+              + delayedContainer + " for assignment");
           }
           long currentTs = System.currentTimeMillis();
-          long nextScheduleTs = heldContainer.getNextScheduleTime();
+          long nextScheduleTs = delayedContainer.getNextScheduleTime();
           if (currentTs >= nextScheduleTs) {
             // Remove the container and try scheduling it.
             // TEZ-587 what if container is released by RM after this
             // in onContainerCompleted()
-            heldContainer = delayedContainers.poll();
-            if (heldContainer == null) {
+            delayedContainer = delayedContainers.poll();
+            if (delayedContainer == null) {
               continue;
             }
-            Map<CookieContainerRequest, Container> assignedContainers;
+            Map<CookieContainerRequest, Container> assignedContainers = null;
             synchronized(TaskScheduler.this) {
-              assignedContainers = assignDelayedContainer(heldContainer);
+              if (null !=
+                  heldContainers.get(delayedContainer.getContainer().getId())) {
+                assignedContainers = assignDelayedContainer(delayedContainer);
+              } else {
+                LOG.info("Skipping delayed container as container is no longer"
+                  + " running, containerId="
+                  + delayedContainer.getContainer().getId());
+              }
             }
+            // Inform App should be done outside of the lock
             informAppAboutAssignments(assignedContainers);
           } else {
             synchronized(this) {
               try {
                 // Wait for the next container to be assignable
-                heldContainer = delayedContainers.peek();
+                delayedContainer = delayedContainers.peek();
                 long diff = localitySchedulingDelay;
-                if (heldContainer != null) {
-                  diff = heldContainer.getNextScheduleTime() - currentTs;
+                if (delayedContainer != null) {
+                  diff = delayedContainer.getNextScheduleTime() - currentTs;
                 }
                 if (diff > 0) {
                   this.wait(diff);
@@ -1526,11 +1543,9 @@ public class TaskScheduler extends AbstractService
         long nextScheduleTime) {
       HeldContainer delayedContainer = heldContainers.get(container.getId());
       if (delayedContainer == null) {
-        // Currently only already running containers are added to this. Hence
-        // this condition should never occur.
-        // Change this if and when all containers are handled by this construct.
-        throw new TezUncheckedException("Attempting to add a non-running"
-            + " container to the delayed container list");
+        LOG.warn("Attempting to add a non-running container to the"
+            + " delayed container list, containerId=" + container.getId());
+        return;
       } else {
         delayedContainer.setNextScheduleTime(nextScheduleTime);
       }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/8a8ba651/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/MRRSleepJob.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/MRRSleepJob.java b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/MRRSleepJob.java
index edea15b..b02e87e 100644
--- a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/MRRSleepJob.java
+++ b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/MRRSleepJob.java
@@ -733,7 +733,7 @@ public class MRRSleepJob extends Configured implements Tool {
           " [-mt mapSleepTime (msec)] [-rt reduceSleepTime (msec)]" +
           " [-irt intermediateReduceSleepTime]" +
           " [-recordt recordSleepTime (msec)]" +
-          " [-generateSplitsInAM (fale)/true]" +
+          " [-generateSplitsInAM (false)/true]" +
           " [-writeSplitsToDfs (false)/true]");
       ToolRunner.printGenericCommandUsage(System.err);
       return 2;