You are viewing a plain-text version of this content. (The canonical link to it, originally given here as a hyperlink, was not preserved in this plain-text copy.)
Posted to commits@tez.apache.org by hi...@apache.org on 2015/06/03 02:20:33 UTC

tez git commit: TEZ-2509. YarnTaskSchedulerService should not try to allocate containers if AM is shutting down. (hitesh)

Repository: tez
Updated Branches:
  refs/heads/master cd068538d -> 631c3e9c6


TEZ-2509. YarnTaskSchedulerService should not try to allocate containers if AM is shutting down. (hitesh)


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/631c3e9c
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/631c3e9c
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/631c3e9c

Branch: refs/heads/master
Commit: 631c3e9c627603ac53426d5a0b120b426fb6712c
Parents: cd06853
Author: Hitesh Shah <hi...@apache.org>
Authored: Tue Jun 2 17:20:20 2015 -0700
Committer: Hitesh Shah <hi...@apache.org>
Committed: Tue Jun 2 17:20:20 2015 -0700

----------------------------------------------------------------------
 CHANGES.txt                                     |  3 +
 .../java/org/apache/tez/dag/app/AppContext.java |  4 +
 .../org/apache/tez/dag/app/DAGAppMaster.java    |  6 ++
 .../dag/app/rm/YarnTaskSchedulerService.java    | 34 ++++---
 .../tez/dag/app/rm/TestContainerReuse.java      | 95 ++++++++++++++++++--
 5 files changed, 120 insertions(+), 22 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/631c3e9c/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 1780eac..b4c4967 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -8,6 +8,7 @@ INCOMPATIBLE CHANGES
   TEZ-2468. Change the minimum Java version to Java 7.
 
 ALL CHANGES:
+  TEZ-2509. YarnTaskSchedulerService should not try to allocate containers if AM is shutting down.
   TEZ-2506. TestAysncHttpConnection failing.
   TEZ-2503. findbugs version isn't reported properly in test-patch report.
   TEZ-2198. Fix sorter spill counts.
@@ -31,6 +32,7 @@ Release 0.7.1: Unreleased
 INCOMPATIBLE CHANGES
 
 ALL CHANGES:
+  TEZ-2509. YarnTaskSchedulerService should not try to allocate containers if AM is shutting down.
   TEZ-2527. Tez UI: Application hangs on entering erroneous RegEx in counter table search box
   TEZ-2523. Tez UI: derive applicationId from dag/vertex id instead of relying on json data
   TEZ-2505. PipelinedSorter uses Comparator objects concurrently from multiple threads.
@@ -229,6 +231,7 @@ Release 0.6.2: Unreleased
 INCOMPATIBLE CHANGES
 
 ALL CHANGES:
+  TEZ-2509. YarnTaskSchedulerService should not try to allocate containers if AM is shutting down.
   TEZ-2483. Tez should close task if processor fail
 
 Release 0.6.1: 2015-05-18

http://git-wip-us.apache.org/repos/asf/tez/blob/631c3e9c/tez-dag/src/main/java/org/apache/tez/dag/app/AppContext.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/AppContext.java b/tez-dag/src/main/java/org/apache/tez/dag/app/AppContext.java
index c005447..e909d80 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/AppContext.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/AppContext.java
@@ -108,4 +108,8 @@ public interface AppContext {
   String[] getLocalDirs();
 
   String getAMUser();
+
+  /** Whether the AM is in the process of shutting down/completing */
+  boolean isAMInCompletionState();
+
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/631c3e9c/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
index 3805b6c..b2c77dc 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
@@ -1468,6 +1468,12 @@ public class DAGAppMaster extends AbstractService {
     }
 
     @Override
+    public boolean isAMInCompletionState() {
+      return EnumSet.of(DAGAppMasterState.SUCCEEDED, DAGAppMasterState.KILLED, DAGAppMasterState.FAILED,
+          DAGAppMasterState.ERROR).contains(state);
+    }
+
+    @Override
     public Map<ApplicationAccessType, String> getApplicationACLs() {
       if (getServiceState() != STATE.STARTED) {
         throw new TezUncheckedException(

http://git-wip-us.apache.org/repos/asf/tez/blob/631c3e9c/tez-dag/src/main/java/org/apache/tez/dag/app/rm/YarnTaskSchedulerService.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/YarnTaskSchedulerService.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/YarnTaskSchedulerService.java
index 44f5484..19902b3 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/YarnTaskSchedulerService.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/YarnTaskSchedulerService.java
@@ -527,14 +527,18 @@ public class YarnTaskSchedulerService extends TaskSchedulerService
   private synchronized Map<CookieContainerRequest, Container>
       assignNewlyAllocatedContainers(Iterable<Container> containers) {
 
+    boolean amInCompletionState = appContext.isAMInCompletionState();
     Map<CookieContainerRequest, Container> assignedContainers =
         new HashMap<CookieContainerRequest, Container>();
-    assignNewContainersWithLocation(containers,
-      NODE_LOCAL_ASSIGNER, assignedContainers);
-    assignNewContainersWithLocation(containers,
-      RACK_LOCAL_ASSIGNER, assignedContainers);
-    assignNewContainersWithLocation(containers,
-      NON_LOCAL_ASSIGNER, assignedContainers);
+
+    if (!amInCompletionState) {
+      assignNewContainersWithLocation(containers,
+          NODE_LOCAL_ASSIGNER, assignedContainers);
+      assignNewContainersWithLocation(containers,
+          RACK_LOCAL_ASSIGNER, assignedContainers);
+      assignNewContainersWithLocation(containers,
+          NON_LOCAL_ASSIGNER, assignedContainers);
+    }
 
     // Release any unassigned containers given by the RM
     releaseUnassignedContainers(containers);
@@ -545,16 +549,19 @@ public class YarnTaskSchedulerService extends TaskSchedulerService
   private synchronized Map<CookieContainerRequest, Container>
       tryAssignReUsedContainers(Iterable<Container> containers) {
 
+    boolean amInCompletionState = appContext.isAMInCompletionState();
     Map<CookieContainerRequest, Container> assignedContainers =
       new HashMap<CookieContainerRequest, Container>();
 
     // Honor locality and match as many as possible
-    assignReUsedContainersWithLocation(containers,
-      NODE_LOCAL_ASSIGNER, assignedContainers, true);
-    assignReUsedContainersWithLocation(containers,
-      RACK_LOCAL_ASSIGNER, assignedContainers, true);
-    assignReUsedContainersWithLocation(containers,
-      NON_LOCAL_ASSIGNER, assignedContainers, true);
+    if (!amInCompletionState) {
+      assignReUsedContainersWithLocation(containers,
+          NODE_LOCAL_ASSIGNER, assignedContainers, true);
+      assignReUsedContainersWithLocation(containers,
+          RACK_LOCAL_ASSIGNER, assignedContainers, true);
+      assignReUsedContainersWithLocation(containers,
+          NON_LOCAL_ASSIGNER, assignedContainers, true);
+    }
 
     return assignedContainers;
   }
@@ -583,6 +590,7 @@ public class YarnTaskSchedulerService extends TaskSchedulerService
       assignDelayedContainer(HeldContainer heldContainer) {
 
     DAGAppMasterState state = appContext.getAMState();
+
     boolean isNew = heldContainer.isNew();
     if (LOG.isDebugEnabled()) {
       LOG.debug("Trying to assign a delayed container"
@@ -658,7 +666,7 @@ public class YarnTaskSchedulerService extends TaskSchedulerService
             heldContainer.getContainer(), currentTime
                 + localitySchedulingDelay);        
       }
-   } else if (state.equals(DAGAppMasterState.RUNNING)) {
+    } else if (state.equals(DAGAppMasterState.RUNNING)) {
       // clear min held containers since we need to allocate to tasks
       if (!sessionMinHeldContainers.isEmpty()) {
         // update the expire time of min held containers so that they are

http://git-wip-us.apache.org/repos/asf/tez/blob/631c3e9c/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestContainerReuse.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestContainerReuse.java b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestContainerReuse.java
index 89b77a7..79450a9 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestContainerReuse.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestContainerReuse.java
@@ -355,8 +355,8 @@ public class TestContainerReuse {
     taskSchedulerEventHandler.handleEvent(lrTa31);
 
     taskSchedulerEventHandler.handleEvent(
-      new AMSchedulerEventTAEnded(ta21, containerHost2.getId(),
-        TaskAttemptState.SUCCEEDED));
+        new AMSchedulerEventTAEnded(ta21, containerHost2.getId(),
+            TaskAttemptState.SUCCEEDED));
     drainableAppCallback.drain();
     verify(taskScheduler).deallocateTask(eq(ta21), eq(true));
     verify(taskSchedulerEventHandler, times(0)).taskAllocated(
@@ -846,9 +846,9 @@ public class TestContainerReuse {
     Configuration tezConf = new Configuration(new YarnConfiguration());
     tezConf.setBoolean(TezConfiguration.TEZ_AM_CONTAINER_REUSE_ENABLED, true);
     tezConf.setLong(
-      TezConfiguration.TEZ_AM_CONTAINER_REUSE_LOCALITY_DELAY_ALLOCATION_MILLIS, 1l);
+        TezConfiguration.TEZ_AM_CONTAINER_REUSE_LOCALITY_DELAY_ALLOCATION_MILLIS, 1l);
     tezConf.setLong(
-      TezConfiguration.TEZ_AM_CONTAINER_IDLE_RELEASE_TIMEOUT_MIN_MILLIS, 2000l);
+        TezConfiguration.TEZ_AM_CONTAINER_IDLE_RELEASE_TIMEOUT_MIN_MILLIS, 2000l);
     tezConf.setInt(
         TezConfiguration.TEZ_AM_SESSION_MIN_HELD_CONTAINERS, 1);
 
@@ -914,7 +914,7 @@ public class TestContainerReuse {
     TaskAttempt ta11 = mock(TaskAttempt.class);
     doReturn(vertexID1).when(ta11).getVertexID();
     AMSchedulerEventTALaunchRequest lrEvent11 = createLaunchRequestEvent(
-      taID11, ta11, resource1, host1, racks, priority1);
+        taID11, ta11, resource1, host1, racks, priority1);
 
     //Vertex2, Task1, Attempt 1, host1
     TezTaskAttemptID taID21 = TezTaskAttemptID.getInstance(
@@ -945,8 +945,8 @@ public class TestContainerReuse {
     // Task assigned to container completed successfully.
     // Container should  be assigned to task21.
     taskSchedulerEventHandler.handleEvent(
-      new AMSchedulerEventTAEnded(ta11, container1.getId(),
-        TaskAttemptState.SUCCEEDED));
+        new AMSchedulerEventTAEnded(ta11, container1.getId(),
+            TaskAttemptState.SUCCEEDED));
     drainableAppCallback.drain();
     verify(taskScheduler).deallocateTask(eq(ta11), eq(true));
     verify(taskSchedulerEventHandler).taskAllocated(
@@ -955,8 +955,8 @@ public class TestContainerReuse {
 
     // Task 2 completes.
     taskSchedulerEventHandler.handleEvent(
-      new AMSchedulerEventTAEnded(ta21, container1.getId(),
-        TaskAttemptState.SUCCEEDED));
+        new AMSchedulerEventTAEnded(ta21, container1.getId(),
+            TaskAttemptState.SUCCEEDED));
     verify(rmClient, times(0)).releaseAssignedContainer(eq(container1.getId()));
 
     LOG.info("Sleeping to ensure that the scheduling loop runs");
@@ -1294,6 +1294,83 @@ public class TestContainerReuse {
     taskSchedulerEventHandler.close();
   }
 
+  @Test(timeout = 10000l)
+  public void testAssignmentOnShutdown()
+      throws IOException, InterruptedException, ExecutionException {
+    LOG.info("Test testAssignmentOnShutdown");
+    Configuration tezConf = new Configuration(new YarnConfiguration());
+    tezConf.setBoolean(TezConfiguration.TEZ_AM_CONTAINER_REUSE_ENABLED, false);
+    tezConf.setBoolean(TezConfiguration.TEZ_AM_CONTAINER_REUSE_RACK_FALLBACK_ENABLED, true);
+    tezConf.setLong(TezConfiguration.TEZ_AM_CONTAINER_REUSE_LOCALITY_DELAY_ALLOCATION_MILLIS, 0);
+    tezConf.setLong(TezConfiguration.TEZ_AM_CONTAINER_IDLE_RELEASE_TIMEOUT_MIN_MILLIS, 0);
+    tezConf.setLong(TezConfiguration.TEZ_AM_CONTAINER_IDLE_RELEASE_TIMEOUT_MAX_MILLIS, 0);
+    TaskSchedulerAppCallback mockApp = mock(TaskSchedulerAppCallback.class);
+
+    CapturingEventHandler eventHandler = new CapturingEventHandler();
+    TezDAGID dagID = TezDAGID.getInstance("0", 0, 0);
+
+    AMRMClient<CookieContainerRequest> rmClientCore = new AMRMClientForTest();
+    TezAMRMClientAsync<CookieContainerRequest> rmClient = spy(new AMRMClientAsyncForTest(rmClientCore, 100));
+    String appUrl = "url";
+    String appMsg = "success";
+    AppFinalStatus finalStatus =
+        new AppFinalStatus(FinalApplicationStatus.SUCCEEDED, appMsg, appUrl);
+
+    doReturn(finalStatus).when(mockApp).getFinalAppStatus();
+
+    AppContext appContext = mock(AppContext.class);
+    doReturn(new Configuration(false)).when(appContext).getAMConf();
+    AMContainerMap amContainerMap = new AMContainerMap(mock(ContainerHeartbeatHandler.class),
+        mock(TaskAttemptListener.class), new ContainerContextMatcher(), appContext);
+    AMNodeTracker amNodeTracker = new AMNodeTracker(eventHandler, appContext);
+    doReturn(amContainerMap).when(appContext).getAllContainers();
+    doReturn(amNodeTracker).when(appContext).getNodeTracker();
+    doReturn(DAGAppMasterState.RUNNING).when(appContext).getAMState();
+    doReturn(true).when(appContext).isAMInCompletionState();
+    doReturn(dagID).when(appContext).getCurrentDAGID();
+    doReturn(mock(ClusterInfo.class)).when(appContext).getClusterInfo();
+
+    TaskSchedulerEventHandler taskSchedulerEventHandlerReal =
+        new TaskSchedulerEventHandlerForTest(appContext, eventHandler, rmClient,
+            new AlwaysMatchesContainerMatcher());
+    TaskSchedulerEventHandler taskSchedulerEventHandler = spy(taskSchedulerEventHandlerReal);
+    taskSchedulerEventHandler.init(tezConf);
+    taskSchedulerEventHandler.start();
+
+    TaskSchedulerWithDrainableAppCallback taskScheduler = (TaskSchedulerWithDrainableAppCallback)
+        ((TaskSchedulerEventHandlerForTest) taskSchedulerEventHandler)
+        .getSpyTaskScheduler();
+    TaskSchedulerAppCallbackDrainable drainableAppCallback = taskScheduler.getDrainableAppCallback();
+    AtomicBoolean drainNotifier = new AtomicBoolean(false);
+    taskScheduler.delayedContainerManager.drainedDelayedContainersForTest = drainNotifier;
+
+    Resource resource1 = Resource.newInstance(1024, 1);
+    String[] host1 = {"host1"};
+    String[] host2 = {"host2"};
+
+    String []racks = {"/default-rack"};
+    Priority priority1 = Priority.newInstance(1);
+
+    TezVertexID vertexID1 = TezVertexID.getInstance(dagID, 1);
+
+    //Vertex 1, Task 1, Attempt 1, host1
+    TezTaskAttemptID taID11 = TezTaskAttemptID.getInstance(TezTaskID.getInstance(vertexID1, 1), 1);
+    TaskAttempt ta11 = mock(TaskAttempt.class);
+    AMSchedulerEventTALaunchRequest lrEvent1 = createLaunchRequestEvent(taID11, ta11, resource1,
+        host1, racks, priority1);
+    taskSchedulerEventHandler.handleEvent(lrEvent1);
+
+    Container container1 = createContainer(1, "host1", resource1, priority1);
+
+    // One container allocated.
+    drainNotifier.set(false);
+    taskScheduler.onContainersAllocated(Collections.singletonList(container1));
+    drainableAppCallback.drain();
+    verify(taskSchedulerEventHandler, times(0)).taskAllocated(eq(ta11),
+        any(Object.class), eq(container1));
+    taskScheduler.close();
+    taskSchedulerEventHandler.close();
+  }
 
   private Container createContainer(int id, String host, Resource resource, Priority priority) {
     ContainerId containerID = ContainerId.newInstance(