You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by hi...@apache.org on 2016/02/19 23:05:46 UTC

tez git commit: TEZ-3123. Containers can get re-used even with conflicting local resources. (hitesh)

Repository: tez
Updated Branches:
  refs/heads/master de3a0748f -> 941d1990f


TEZ-3123. Containers can get re-used even with conflicting local resources. (hitesh)


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/941d1990
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/941d1990
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/941d1990

Branch: refs/heads/master
Commit: 941d1990f68e8a98b4ab2cd25688a117e29dd697
Parents: de3a074
Author: Hitesh Shah <hi...@apache.org>
Authored: Fri Feb 19 14:05:33 2016 -0800
Committer: Hitesh Shah <hi...@apache.org>
Committed: Fri Feb 19 14:05:33 2016 -0800

----------------------------------------------------------------------
 CHANGES.txt                                     |  2 +
 .../dag/app/rm/YarnTaskSchedulerService.java    |  7 ++-
 .../tez/dag/app/rm/TestContainerReuse.java      | 64 ++++++++++++++++++--
 3 files changed, 66 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/941d1990/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index d10b47a..48eca4d 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -7,6 +7,7 @@ INCOMPATIBLE CHANGES
   TEZ-3029. Add an onError method to service plugin contexts.
 
 ALL CHANGES:
+  TEZ-3123. Containers can get re-used even with conflicting local resources.
   TEZ-3117. Deadlock in Edge and Vertex code
   TEZ-3103. Shuffle can hang when memory to memory merging enabled
   TEZ-3107. tez-tools: Log warn msgs in case ATS has wrong values (e.g startTime > finishTime).
@@ -335,6 +336,7 @@ INCOMPATIBLE CHANGES
   TEZ-2949. Allow duplicate dag names within session for Tez.
 
 ALL CHANGES
+  TEZ-3123. Containers can get re-used even with conflicting local resources.
   TEZ-3117. Deadlock in Edge and Vertex code
   TEZ-3103. Shuffle can hang when memory to memory merging enabled
   TEZ-3107. tez-tools: Log warn msgs in case ATS has wrong values (e.g startTime > finishTime).

http://git-wip-us.apache.org/repos/asf/tez/blob/941d1990/tez-dag/src/main/java/org/apache/tez/dag/app/rm/YarnTaskSchedulerService.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/YarnTaskSchedulerService.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/YarnTaskSchedulerService.java
index c1c363b..bd4ac2f 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/YarnTaskSchedulerService.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/YarnTaskSchedulerService.java
@@ -2337,11 +2337,12 @@ public class YarnTaskSchedulerService extends TaskScheduler
       // Merge the container signatures to account for any changes to the container
       // footprint. For example, re-localization of additional resources will
       // cause the held container's signature to change.
-      lastAssignedContainerSignature = taskInfo.getCookie().getContainerSignature();
-      if (lastTaskInfo != null && lastTaskInfo.getCookie().getContainerSignature() != null) {
+      if (lastAssignedContainerSignature != null) {
         lastAssignedContainerSignature = signatureMatcher.union(
-            lastTaskInfo.getCookie().getContainerSignature(),
+            lastAssignedContainerSignature,
             taskInfo.getCookie().getContainerSignature());
+      } else {
+        lastAssignedContainerSignature = taskInfo.getCookie().getContainerSignature();
       }
       lastTaskInfo = taskInfo;
     }

http://git-wip-us.apache.org/repos/asf/tez/blob/941d1990/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestContainerReuse.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestContainerReuse.java b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestContainerReuse.java
index 78dc8fd..99c85ab 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestContainerReuse.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestContainerReuse.java
@@ -1173,7 +1173,8 @@ public class TestContainerReuse {
     TaskAttempt ta111 = mock(TaskAttempt.class);
     doReturn(taID111).when(ta111).getID();
     doReturn("Mock for TA " + taID111.toString()).when(ta111).toString();
-    AMSchedulerEventTALaunchRequest lrEvent11 = createLaunchRequestEvent(taID111, ta111, resource1, host1, racks, priority1, v11LR);
+    AMSchedulerEventTALaunchRequest lrEvent11 = createLaunchRequestEvent(
+        taID111, ta111, resource1, host1, racks, priority1, v11LR);
 
     Map<String, LocalResource> v12LR = Maps.newHashMap();
     v12LR.put(rsrc1, lr1);
@@ -1184,7 +1185,24 @@ public class TestContainerReuse {
     TaskAttempt ta112 = mock(TaskAttempt.class);
     doReturn(taID112).when(ta112).getID();
     doReturn("Mock for TA " + taID112.toString()).when(ta112).toString();
-    AMSchedulerEventTALaunchRequest lrEvent12 = createLaunchRequestEvent(taID112, ta112, resource1, host1, racks, priority1, v12LR);
+    AMSchedulerEventTALaunchRequest lrEvent12 = createLaunchRequestEvent(
+        taID112, ta112, resource1, host1, racks, priority1, v12LR);
+
+    //Vertex 1, Task 3, Attempt 1, host1
+    TezTaskAttemptID taID113 = TezTaskAttemptID.getInstance(TezTaskID.getInstance(vertexID11, 3), 1);
+    TaskAttempt ta113 = mock(TaskAttempt.class);
+    doReturn(taID113).when(ta113).getID();
+    doReturn("Mock for TA " + taID113.toString()).when(ta113).toString();
+    AMSchedulerEventTALaunchRequest lrEvent13 = createLaunchRequestEvent(
+        taID113, ta113, resource1, host1, racks, priority1, new HashMap<String, LocalResource>());
+    //Vertex 1, Task 4, Attempt 1, host1
+    TezTaskAttemptID taID114 = TezTaskAttemptID.getInstance(TezTaskID.getInstance(vertexID11, 4), 1);
+    TaskAttempt ta114 = mock(TaskAttempt.class);
+    doReturn(taID114).when(ta114).getID();
+    doReturn("Mock for TA " + taID114.toString()).when(ta114).toString();
+    AMSchedulerEventTALaunchRequest lrEvent14 = createLaunchRequestEvent(
+        taID114, ta114, resource1, host1, racks, priority1, new HashMap<String, LocalResource>());
+
 
     drainNotifier.set(false);
     taskSchedulerManager.handleEvent(lrEvent11);
@@ -1220,14 +1238,52 @@ public class TestContainerReuse {
 
     // Task assigned to container completed successfully.
     // Verify reuse across hosts.
-    taskSchedulerManager.handleEvent(new AMSchedulerEventTAEnded(ta112, container1.getId(), TaskAttemptState.SUCCEEDED, null, null, 0));
+    taskSchedulerManager.handleEvent(new AMSchedulerEventTAEnded(ta112, container1.getId(),
+        TaskAttemptState.SUCCEEDED, null, null, 0));
     drainableAppCallback.drain();
     verifyDeAllocateTask(taskScheduler, ta112, true, null, null);
     verify(rmClient, times(0)).releaseAssignedContainer(eq(container1.getId()));
     eventHandler.verifyNoInvocations(AMContainerEventStopRequest.class);
     eventHandler.reset();
 
-    // Setup DAG2 with additional resources. Make sure the container, even without all resources, is reused.
+    // Task 3: requests no local resources; expected to be assigned to the held container1.
+    drainNotifier.set(false);
+    taskSchedulerManager.handleEvent(lrEvent13);
+    TestTaskSchedulerHelpers.waitForDelayedDrainNotify(drainNotifier);
+
+    verify(taskSchedulerManager).taskAllocated(eq(0), eq(ta113), any(Object.class), eq(container1));
+    verify(rmClient, times(0)).releaseAssignedContainer(eq(container1.getId()));
+    eventHandler.verifyNoInvocations(AMContainerEventStopRequest.class);
+    eventHandler.reset();
+
+    taskSchedulerManager.handleEvent(new AMSchedulerEventTAEnded(ta113, container1.getId(),
+        TaskAttemptState.SUCCEEDED, null, null, 0));
+    drainableAppCallback.drain();
+    verifyDeAllocateTask(taskScheduler, ta113, true, null, null);
+    verify(rmClient, times(0)).releaseAssignedContainer(eq(container1.getId()));
+    eventHandler.verifyNoInvocations(AMContainerEventStopRequest.class);
+    eventHandler.reset();
+
+    // Task 4: again requests no local resources; expected to be assigned to container1.
+    drainNotifier.set(false);
+    taskSchedulerManager.handleEvent(lrEvent14);
+    TestTaskSchedulerHelpers.waitForDelayedDrainNotify(drainNotifier);
+
+    verify(taskSchedulerManager).taskAllocated(eq(0), eq(ta114), any(Object.class), eq(container1));
+    verify(rmClient, times(0)).releaseAssignedContainer(eq(container1.getId()));
+    eventHandler.verifyNoInvocations(AMContainerEventStopRequest.class);
+    eventHandler.reset();
+
+    taskSchedulerManager.handleEvent(new AMSchedulerEventTAEnded(ta114, container1.getId(),
+        TaskAttemptState.SUCCEEDED, null, null, 0));
+    drainableAppCallback.drain();
+    verifyDeAllocateTask(taskScheduler, ta114, true, null, null);
+    verify(rmClient, times(0)).releaseAssignedContainer(eq(container1.getId()));
+    eventHandler.verifyNoInvocations(AMContainerEventStopRequest.class);
+    eventHandler.reset();
+
+
+    // Setup DAG2 with different resources.
     TezDAGID dagID2 = TezDAGID.getInstance("0", 2, 0);
     dagIDAnswer.setDAGID(dagID2);