You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by ss...@apache.org on 2015/08/14 22:58:58 UTC

[40/50] [abbrv] tez git commit: TEZ-2678. Fix comments from reviews - part 1. (sseth)

http://git-wip-us.apache.org/repos/asf/tez/blob/fee059b9/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestContainerReuse.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestContainerReuse.java b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestContainerReuse.java
index 8e8224a..0a02f9e 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestContainerReuse.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestContainerReuse.java
@@ -18,6 +18,7 @@
 
 package org.apache.tez.dag.app.rm;
 
+import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.assertEquals;
 import static org.mockito.Matchers.any;
@@ -38,6 +39,8 @@ import java.util.concurrent.ExecutionException;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.tez.common.TezUtils;
+import org.apache.tez.serviceplugins.api.TaskScheduler;
+import org.mockito.ArgumentCaptor;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -213,10 +216,10 @@ public class TestContainerReuse {
     taskSchedulerEventHandler.handleEvent(lrTa31);
 
     taskSchedulerEventHandler.handleEvent(
-      new AMSchedulerEventTAEnded(
-        ta11, containerHost1.getId(), TaskAttemptState.SUCCEEDED, null, 0));
+        new AMSchedulerEventTAEnded(
+            ta11, containerHost1.getId(), TaskAttemptState.SUCCEEDED, null, null, 0));
     drainableAppCallback.drain();
-    verify(taskScheduler).deallocateTask(eq(ta11), eq(true), eq((TaskAttemptEndReason)null));
+    verifyDeAllocateTask(taskScheduler, ta11, true, null, null);
     verify(taskSchedulerEventHandler, times(1)).taskAllocated(
       eq(0), eq(ta31), any(Object.class), eq(containerHost1));
     verify(rmClient, times(0)).releaseAssignedContainer(
@@ -226,7 +229,7 @@ public class TestContainerReuse {
 
     taskSchedulerEventHandler.handleEvent(
       new AMSchedulerEventTAEnded(ta21, containerHost2.getId(),
-        TaskAttemptState.SUCCEEDED, null, 0));
+        TaskAttemptState.SUCCEEDED, null, null, 0));
 
     long currentTs = System.currentTimeMillis();
     Throwable exception = null;
@@ -332,16 +335,17 @@ public class TestContainerReuse {
     TestTaskSchedulerHelpers.waitForDelayedDrainNotify(drainNotifier);
     drainableAppCallback.drain();
     verify(taskSchedulerEventHandler).taskAllocated(eq(0), eq(ta11), any(Object.class), eq(containerHost1));
-    verify(taskSchedulerEventHandler).taskAllocated(eq(0), eq(ta21), any(Object.class), eq(containerHost2));
+    verify(taskSchedulerEventHandler).taskAllocated(eq(0), eq(ta21), any(Object.class),
+        eq(containerHost2));
 
     // Adding the event later so that task1 assigned to containerHost1 is deterministic.
     taskSchedulerEventHandler.handleEvent(lrTa31);
 
     taskSchedulerEventHandler.handleEvent(
         new AMSchedulerEventTAEnded(ta21, containerHost2.getId(),
-            TaskAttemptState.SUCCEEDED, null, 0));
+            TaskAttemptState.SUCCEEDED, null, null, 0));
     drainableAppCallback.drain();
-    verify(taskScheduler).deallocateTask(eq(ta21), eq(true), eq((TaskAttemptEndReason)null));
+    verifyDeAllocateTask(taskScheduler, ta21, true, null, null);
     verify(taskSchedulerEventHandler, times(0)).taskAllocated(
         eq(0), eq(ta31), any(Object.class), eq(containerHost2));
     verify(rmClient, times(1)).releaseAssignedContainer(
@@ -431,12 +435,15 @@ public class TestContainerReuse {
     taskScheduler.onContainersAllocated(Collections.singletonList(container1));
     TestTaskSchedulerHelpers.waitForDelayedDrainNotify(drainNotifier);
     drainableAppCallback.drain();
-    verify(taskSchedulerEventHandler).taskAllocated(eq(0), eq(ta11), any(Object.class), eq(container1));
+    verify(taskSchedulerEventHandler).taskAllocated(eq(0), eq(ta11), any(Object.class),
+        eq(container1));
 
     // Task assigned to container completed successfully. Container should be re-used.
-    taskSchedulerEventHandler.handleEvent(new AMSchedulerEventTAEnded(ta11, container1.getId(), TaskAttemptState.SUCCEEDED, null, 0));
+    taskSchedulerEventHandler.handleEvent(
+        new AMSchedulerEventTAEnded(ta11, container1.getId(), TaskAttemptState.SUCCEEDED, null,
+            null, 0));
     drainableAppCallback.drain();
-    verify(taskScheduler).deallocateTask(eq(ta11), eq(true), eq((TaskAttemptEndReason)null));
+    verifyDeAllocateTask(taskScheduler, ta11, true, null, null);
     verify(taskSchedulerEventHandler).taskAllocated(eq(0), eq(ta12), any(Object.class), eq(container1));
     verify(rmClient, times(0)).releaseAssignedContainer(eq(container1.getId()));
     eventHandler.verifyNoInvocations(AMContainerEventStopRequest.class);
@@ -444,19 +451,25 @@ public class TestContainerReuse {
 
     // Task assigned to container completed successfully.
     // Verify reuse across hosts.
-    taskSchedulerEventHandler.handleEvent(new AMSchedulerEventTAEnded(ta12, container1.getId(), TaskAttemptState.SUCCEEDED, null, 0));
+    taskSchedulerEventHandler.handleEvent(
+        new AMSchedulerEventTAEnded(ta12, container1.getId(), TaskAttemptState.SUCCEEDED, null,
+            null, 0));
     drainableAppCallback.drain();
-    verify(taskScheduler).deallocateTask(eq(ta12), eq(true), eq((TaskAttemptEndReason)null));
-    verify(taskSchedulerEventHandler).taskAllocated(eq(0), eq(ta13), any(Object.class), eq(container1));
+    verifyDeAllocateTask(taskScheduler, ta12, true, null, null);
+    verify(taskSchedulerEventHandler).taskAllocated(eq(0), eq(ta13), any(Object.class),
+        eq(container1));
     verify(rmClient, times(0)).releaseAssignedContainer(eq(container1.getId()));
     eventHandler.verifyNoInvocations(AMContainerEventStopRequest.class);
     eventHandler.reset();
 
     // Verify no re-use if a previous task fails.
-    taskSchedulerEventHandler.handleEvent(new AMSchedulerEventTAEnded(ta13, container1.getId(), TaskAttemptState.FAILED, null, 0));
+    taskSchedulerEventHandler.handleEvent(
+        new AMSchedulerEventTAEnded(ta13, container1.getId(), TaskAttemptState.FAILED, null,
+            "TIMEOUT", 0));
     drainableAppCallback.drain();
-    verify(taskSchedulerEventHandler, times(0)).taskAllocated(eq(0), eq(ta14), any(Object.class), eq(container1));
-    verify(taskScheduler).deallocateTask(eq(ta13), eq(false), eq((TaskAttemptEndReason)null));
+    verify(taskSchedulerEventHandler, times(0)).taskAllocated(eq(0), eq(ta14), any(Object.class),
+        eq(container1));
+    verifyDeAllocateTask(taskScheduler, ta13, false, null, "TIMEOUT");
     verify(rmClient).releaseAssignedContainer(eq(container1.getId()));
     eventHandler.verifyInvocation(AMContainerEventStopRequest.class);
     eventHandler.reset();
@@ -468,12 +481,15 @@ public class TestContainerReuse {
     taskScheduler.onContainersAllocated(Collections.singletonList(container2));
     TestTaskSchedulerHelpers.waitForDelayedDrainNotify(drainNotifier);
     drainableAppCallback.drain();
-    verify(taskSchedulerEventHandler).taskAllocated(eq(0), eq(ta14), any(Object.class), eq(container2));
+    verify(taskSchedulerEventHandler).taskAllocated(eq(0), eq(ta14), any(Object.class),
+        eq(container2));
 
     // Task assigned to container completed successfully. No pending requests. Container should be released.
-    taskSchedulerEventHandler.handleEvent(new AMSchedulerEventTAEnded(ta14, container2.getId(), TaskAttemptState.SUCCEEDED, null, 0));
+    taskSchedulerEventHandler.handleEvent(
+        new AMSchedulerEventTAEnded(ta14, container2.getId(), TaskAttemptState.SUCCEEDED, null,
+            null, 0));
     drainableAppCallback.drain();
-    verify(taskScheduler).deallocateTask(eq(ta14), eq(true), eq((TaskAttemptEndReason)null));
+    verifyDeAllocateTask(taskScheduler, ta14, true, null, null);
     verify(rmClient).releaseAssignedContainer(eq(container2.getId()));
     eventHandler.verifyInvocation(AMContainerEventStopRequest.class);
     eventHandler.reset();
@@ -570,13 +586,15 @@ public class TestContainerReuse {
     taskScheduler.onContainersAllocated(Collections.singletonList(container1));
     TestTaskSchedulerHelpers.waitForDelayedDrainNotify(drainNotifier);
     drainableAppCallback.drain();
-    verify(taskSchedulerEventHandler).taskAllocated(eq(0), eq(ta11), any(Object.class), eq(container1));
+    verify(taskSchedulerEventHandler).taskAllocated(eq(0), eq(ta11), any(Object.class),
+        eq(container1));
 
     // First task had profiling on. This container can not be reused further.
     taskSchedulerEventHandler.handleEvent(
-        new AMSchedulerEventTAEnded(ta11, container1.getId(), TaskAttemptState.SUCCEEDED, null, 0));
+        new AMSchedulerEventTAEnded(ta11, container1.getId(), TaskAttemptState.SUCCEEDED, null,
+            null, 0));
     drainableAppCallback.drain();
-    verify(taskScheduler).deallocateTask(eq(ta11), eq(true), eq((TaskAttemptEndReason)null));
+    verifyDeAllocateTask(taskScheduler, ta11, true, null, null);
     verify(taskSchedulerEventHandler, times(0)).taskAllocated(eq(0), eq(ta12), any(Object.class),
         eq(container1));
     verify(rmClient, times(1)).releaseAssignedContainer(eq(container1.getId()));
@@ -620,9 +638,11 @@ public class TestContainerReuse {
 
     // Verify that the container can not be reused when profiling option is turned on
     // Even for 2 tasks having same profiling option can have container reusability.
-    taskSchedulerEventHandler.handleEvent(new AMSchedulerEventTAEnded(ta13, container2.getId(), TaskAttemptState.SUCCEEDED, null, 0));
+    taskSchedulerEventHandler.handleEvent(
+        new AMSchedulerEventTAEnded(ta13, container2.getId(), TaskAttemptState.SUCCEEDED, null,
+            null, 0));
     drainableAppCallback.drain();
-    verify(taskScheduler).deallocateTask(eq(ta13), eq(true), eq((TaskAttemptEndReason)null));
+    verifyDeAllocateTask(taskScheduler, ta13, true, null, null);
     verify(taskSchedulerEventHandler, times(0)).taskAllocated(eq(0), eq(ta14), any(Object.class),
         eq(container2));
     verify(rmClient, times(1)).releaseAssignedContainer(eq(container2.getId()));
@@ -662,12 +682,15 @@ public class TestContainerReuse {
     taskScheduler.onContainersAllocated(Collections.singletonList(container3));
     TestTaskSchedulerHelpers.waitForDelayedDrainNotify(drainNotifier);
     drainableAppCallback.drain();
-    verify(taskSchedulerEventHandler).taskAllocated(eq(0), eq(ta15), any(Object.class), eq(container3));
+    verify(taskSchedulerEventHandler).taskAllocated(eq(0), eq(ta15), any(Object.class),
+        eq(container3));
 
     //Ensure task 6 (of vertex 1) is allocated to same container
-    taskSchedulerEventHandler.handleEvent(new AMSchedulerEventTAEnded(ta15, container3.getId(), TaskAttemptState.SUCCEEDED, null, 0));
+    taskSchedulerEventHandler.handleEvent(
+        new AMSchedulerEventTAEnded(ta15, container3.getId(), TaskAttemptState.SUCCEEDED, null,
+            null, 0));
     drainableAppCallback.drain();
-    verify(taskScheduler).deallocateTask(eq(ta15), eq(true), eq((TaskAttemptEndReason)null));
+    verifyDeAllocateTask(taskScheduler, ta15, true, null, null);
     verify(taskSchedulerEventHandler).taskAllocated(eq(0), eq(ta16), any(Object.class), eq(container3));
     eventHandler.reset();
 
@@ -769,10 +792,10 @@ public class TestContainerReuse {
     // Container should not be immediately assigned to task 2
     // until delay expires.
     taskSchedulerEventHandler.handleEvent(
-      new AMSchedulerEventTAEnded(ta11, container1.getId(),
-        TaskAttemptState.SUCCEEDED, null, 0));
+        new AMSchedulerEventTAEnded(ta11, container1.getId(),
+            TaskAttemptState.SUCCEEDED, null, null, 0));
     drainableAppCallback.drain();
-    verify(taskScheduler).deallocateTask(eq(ta11), eq(true), eq((TaskAttemptEndReason)null));
+    verifyDeAllocateTask(taskScheduler, ta11, true, null, null);
     verify(taskSchedulerEventHandler, times(0)).taskAllocated(
         eq(0), eq(ta12), any(Object.class), eq(container1));
     verify(rmClient, times(0)).releaseAssignedContainer(eq(container1.getId()));
@@ -787,7 +810,7 @@ public class TestContainerReuse {
     // TA12 completed.
     taskSchedulerEventHandler.handleEvent(
       new AMSchedulerEventTAEnded(ta12, container1.getId(),
-        TaskAttemptState.SUCCEEDED, null, 0));
+        TaskAttemptState.SUCCEEDED, null, null, 0));
     drainableAppCallback.drain();
     LOG.info("Sleeping to ensure that the scheduling loop runs");
     Thread.sleep(3000l);
@@ -897,9 +920,9 @@ public class TestContainerReuse {
     // Container should  be assigned to task21.
     taskSchedulerEventHandler.handleEvent(
         new AMSchedulerEventTAEnded(ta11, container1.getId(),
-            TaskAttemptState.SUCCEEDED, null, 0));
+            TaskAttemptState.SUCCEEDED, null, null, 0));
     drainableAppCallback.drain();
-    verify(taskScheduler).deallocateTask(eq(ta11), eq(true), eq((TaskAttemptEndReason)null));
+    verifyDeAllocateTask(taskScheduler, ta11, true, null, null);
     verify(taskSchedulerEventHandler).taskAllocated(
         eq(0), eq(ta21), any(Object.class), eq(container1));
     verify(rmClient, times(0)).releaseAssignedContainer(eq(container1.getId()));
@@ -907,7 +930,7 @@ public class TestContainerReuse {
     // Task 2 completes.
     taskSchedulerEventHandler.handleEvent(
         new AMSchedulerEventTAEnded(ta21, container1.getId(),
-            TaskAttemptState.SUCCEEDED, null, 0));
+            TaskAttemptState.SUCCEEDED, null, null, 0));
     verify(rmClient, times(0)).releaseAssignedContainer(eq(container1.getId()));
 
     LOG.info("Sleeping to ensure that the scheduling loop runs");
@@ -1008,9 +1031,11 @@ public class TestContainerReuse {
     assertEquals(1, assignEvent.getRemoteTaskLocalResources().size());
     
     // Task assigned to container completed successfully. Container should be re-used.
-    taskSchedulerEventHandler.handleEvent(new AMSchedulerEventTAEnded(ta111, container1.getId(), TaskAttemptState.SUCCEEDED, null, 0));
+    taskSchedulerEventHandler.handleEvent(
+        new AMSchedulerEventTAEnded(ta111, container1.getId(), TaskAttemptState.SUCCEEDED, null,
+            null, 0));
     drainableAppCallback.drain();
-    verify(taskScheduler).deallocateTask(eq(ta111), eq(true), eq((TaskAttemptEndReason)null));
+    verifyDeAllocateTask(taskScheduler, ta111, true, null, null);
     verify(taskSchedulerEventHandler).taskAllocated(eq(0), eq(ta112), any(Object.class), eq(container1));
     verify(rmClient, times(0)).releaseAssignedContainer(eq(container1.getId()));
     eventHandler.verifyNoInvocations(AMContainerEventStopRequest.class);
@@ -1020,9 +1045,11 @@ public class TestContainerReuse {
 
     // Task assigned to container completed successfully.
     // Verify reuse across hosts.
-    taskSchedulerEventHandler.handleEvent(new AMSchedulerEventTAEnded(ta112, container1.getId(), TaskAttemptState.SUCCEEDED, null, 0));
+    taskSchedulerEventHandler.handleEvent(
+        new AMSchedulerEventTAEnded(ta112, container1.getId(), TaskAttemptState.SUCCEEDED, null,
+            null, 0));
     drainableAppCallback.drain();
-    verify(taskScheduler).deallocateTask(eq(ta112), eq(true), eq((TaskAttemptEndReason)null));
+    verifyDeAllocateTask(taskScheduler, ta112, true, null, null);
     verify(rmClient, times(0)).releaseAssignedContainer(eq(container1.getId()));
     eventHandler.verifyNoInvocations(AMContainerEventStopRequest.class);
     eventHandler.reset();
@@ -1054,16 +1081,19 @@ public class TestContainerReuse {
     // TODO This is terrible, need a better way to ensure the scheduling loop has run
     LOG.info("Sleeping to ensure that the scheduling loop runs");
     Thread.sleep(6000l);
-    verify(taskSchedulerEventHandler).taskAllocated(eq(0), eq(ta211), any(Object.class), eq(container1));
+    verify(taskSchedulerEventHandler).taskAllocated(eq(0), eq(ta211), any(Object.class),
+        eq(container1));
     verify(rmClient, times(0)).releaseAssignedContainer(eq(container1.getId()));
     eventHandler.verifyNoInvocations(AMContainerEventStopRequest.class);
     assignEvent = (AMContainerEventAssignTA) eventHandler.verifyInvocation(AMContainerEventAssignTA.class);
     assertEquals(2, assignEvent.getRemoteTaskLocalResources().size());
     eventHandler.reset();
 
-    taskSchedulerEventHandler.handleEvent(new AMSchedulerEventTAEnded(ta211, container1.getId(), TaskAttemptState.SUCCEEDED, null, 0));
+    taskSchedulerEventHandler.handleEvent(
+        new AMSchedulerEventTAEnded(ta211, container1.getId(), TaskAttemptState.SUCCEEDED, null,
+            null, 0));
     drainableAppCallback.drain();
-    verify(taskScheduler).deallocateTask(eq(ta211), eq(true), eq((TaskAttemptEndReason)null));
+    verifyDeAllocateTask(taskScheduler, ta211, true, null, null);
     verify(taskSchedulerEventHandler).taskAllocated(eq(0), eq(ta212), any(Object.class), eq(container1));
     verify(rmClient, times(0)).releaseAssignedContainer(eq(container1.getId()));
     eventHandler.verifyNoInvocations(AMContainerEventStopRequest.class);
@@ -1174,9 +1204,11 @@ public class TestContainerReuse {
     assertEquals(1, assignEvent.getRemoteTaskLocalResources().size());
 
     // Task assigned to container completed successfully. Container should be re-used.
-    taskSchedulerEventHandler.handleEvent(new AMSchedulerEventTAEnded(ta111, container1.getId(), TaskAttemptState.SUCCEEDED, null, 0));
+    taskSchedulerEventHandler.handleEvent(
+        new AMSchedulerEventTAEnded(ta111, container1.getId(), TaskAttemptState.SUCCEEDED, null,
+            null, 0));
     drainableAppCallback.drain();
-    verify(taskScheduler).deallocateTask(eq(ta111), eq(true), eq((TaskAttemptEndReason)null));
+    verifyDeAllocateTask(taskScheduler, ta111, true, null, null);
     verify(taskSchedulerEventHandler).taskAllocated(eq(0), eq(ta112), any(Object.class), eq(container1));
     verify(rmClient, times(0)).releaseAssignedContainer(eq(container1.getId()));
     eventHandler.verifyNoInvocations(AMContainerEventStopRequest.class);
@@ -1186,9 +1218,9 @@ public class TestContainerReuse {
 
     // Task assigned to container completed successfully.
     // Verify reuse across hosts.
-    taskSchedulerEventHandler.handleEvent(new AMSchedulerEventTAEnded(ta112, container1.getId(), TaskAttemptState.SUCCEEDED, null, 0));
+    taskSchedulerEventHandler.handleEvent(new AMSchedulerEventTAEnded(ta112, container1.getId(), TaskAttemptState.SUCCEEDED, null, null, 0));
     drainableAppCallback.drain();
-    verify(taskScheduler).deallocateTask(eq(ta112), eq(true), eq((TaskAttemptEndReason)null));
+    verifyDeAllocateTask(taskScheduler, ta112, true, null, null);
     verify(rmClient, times(0)).releaseAssignedContainer(eq(container1.getId()));
     eventHandler.verifyNoInvocations(AMContainerEventStopRequest.class);
     eventHandler.reset();
@@ -1301,6 +1333,7 @@ public class TestContainerReuse {
   }
 
   private Container createContainer(int id, String host, Resource resource, Priority priority) {
+    @SuppressWarnings("deprecation")
     ContainerId containerID = ContainerId.newInstance(
         ApplicationAttemptId.newInstance(ApplicationId.newInstance(1, 1), 1),
         id);
@@ -1368,4 +1401,17 @@ public class TestContainerReuse {
       return this.dagID;
     }
   }
+
+  private void verifyDeAllocateTask(TaskScheduler taskScheduler, Object ta, boolean taskSucceeded,
+                                    TaskAttemptEndReason endReason, String diagContains) {
+    ArgumentCaptor<String> argumentCaptor = ArgumentCaptor.forClass(String.class);
+    verify(taskScheduler)
+        .deallocateTask(eq(ta), eq(taskSucceeded), eq(endReason), argumentCaptor.capture());
+    assertEquals(1, argumentCaptor.getAllValues().size());
+    if (diagContains == null) {
+      assertNull(argumentCaptor.getValue());
+    } else {
+      assertTrue(argumentCaptor.getValue().contains(diagContains));
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/fee059b9/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestLocalTaskSchedulerService.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestLocalTaskSchedulerService.java b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestLocalTaskSchedulerService.java
index c637f5f..3b2de34 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestLocalTaskSchedulerService.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestLocalTaskSchedulerService.java
@@ -93,7 +93,7 @@ public class TestLocalTaskSchedulerService {
 
     Task task = mock(Task.class);
     taskSchedulerService.allocateTask(task, Resource.newInstance(1024, 1), null, null, Priority.newInstance(1), null, null);
-    taskSchedulerService.deallocateTask(task, false, null);
+    taskSchedulerService.deallocateTask(task, false, null, null);
     // start the RequestHandler, DeallocateTaskRequest has higher priority, so will be processed first
     taskSchedulerService.startRequestHandlerThread();
 
@@ -128,7 +128,7 @@ public class TestLocalTaskSchedulerService {
 
     MockAsyncDelegateRequestHandler requestHandler = taskSchedulerService.getRequestHandler();
     requestHandler.drainRequest(1);
-    taskSchedulerService.deallocateTask(task, false, null);
+    taskSchedulerService.deallocateTask(task, false, null, null);
     requestHandler.drainRequest(2);
     assertEquals(1, requestHandler.deallocateCount);
     assertEquals(1, requestHandler.allocateCount);

http://git-wip-us.apache.org/repos/asf/tez/blob/fee059b9/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskScheduler.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskScheduler.java b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskScheduler.java
index 6af9815..d956ff9 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskScheduler.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskScheduler.java
@@ -193,7 +193,7 @@ public class TestTaskScheduler {
                            addContainerRequest((CookieContainerRequest) any());
 
     // returned from task requests before allocation happens
-    assertFalse(scheduler.deallocateTask(mockTask1, true, null));
+    assertFalse(scheduler.deallocateTask(mockTask1, true, null, null));
     verify(mockApp, times(0)).containerBeingReleased(any(ContainerId.class));
     verify(mockRMClient, times(1)).
                         removeContainerRequest((CookieContainerRequest) any());
@@ -201,7 +201,7 @@ public class TestTaskScheduler {
                                  releaseAssignedContainer((ContainerId) any());
 
     // deallocating unknown task
-    assertFalse(scheduler.deallocateTask(mockTask1, true, null));
+    assertFalse(scheduler.deallocateTask(mockTask1, true, null, null));
     verify(mockApp, times(0)).containerBeingReleased(any(ContainerId.class));
     verify(mockRMClient, times(1)).
                         removeContainerRequest((CookieContainerRequest) any());
@@ -346,7 +346,7 @@ public class TestTaskScheduler {
     verify(mockRMClient).releaseAssignedContainer(mockCId4);
 
     // deallocate allocated task
-    assertTrue(scheduler.deallocateTask(mockTask1, true, null));
+    assertTrue(scheduler.deallocateTask(mockTask1, true, null, null));
     drainableAppCallback.drain();
     verify(mockApp).containerBeingReleased(mockCId1);
     verify(mockRMClient).releaseAssignedContainer(mockCId1);
@@ -466,7 +466,7 @@ public class TestTaskScheduler {
     verify(mockApp, times(4)).taskAllocated(any(), any(), (Container) any());
     verify(mockApp).taskAllocated(mockTask4, mockCookie4, mockContainer6);
     // deallocate allocated task
-    assertTrue(scheduler.deallocateTask(mockTask4, true, null));
+    assertTrue(scheduler.deallocateTask(mockTask4, true, null, null));
     drainableAppCallback.drain();
     verify(mockApp).containerBeingReleased(mockCId6);
     verify(mockRMClient).releaseAssignedContainer(mockCId6);
@@ -496,7 +496,7 @@ public class TestTaskScheduler {
         removeContainerRequest((CookieContainerRequest) any());
     verify(mockRMClient, times(8)).addContainerRequest(
         (CookieContainerRequest) any());
-    assertFalse(scheduler.deallocateTask(mockTask1, true, null));
+    assertFalse(scheduler.deallocateTask(mockTask1, true, null, null));
 
     List<NodeReport> mockUpdatedNodes = mock(List.class);
     scheduler.onNodesUpdated(mockUpdatedNodes);
@@ -760,7 +760,7 @@ public class TestTaskScheduler {
     verify(mockRMClient).releaseAssignedContainer(mockCId4);
 
     // deallocate allocated task
-    assertTrue(scheduler.deallocateTask(mockTask1, true, null));
+    assertTrue(scheduler.deallocateTask(mockTask1, true, null, null));
     drainableAppCallback.drain();
     verify(mockApp).containerBeingReleased(mockCId1);
     verify(mockRMClient).releaseAssignedContainer(mockCId1);
@@ -890,7 +890,7 @@ public class TestTaskScheduler {
     verify(mockApp, times(4)).taskAllocated(any(), any(), (Container) any());
     verify(mockApp).taskAllocated(mockTask4, mockCookie4, mockContainer6);
     // deallocate allocated task
-    assertTrue(scheduler.deallocateTask(mockTask4, true, null));
+    assertTrue(scheduler.deallocateTask(mockTask4, true, null, null));
     drainableAppCallback.drain();
     verify(mockApp).containerBeingReleased(mockCId6);
     verify(mockRMClient).releaseAssignedContainer(mockCId6);
@@ -979,8 +979,8 @@ public class TestTaskScheduler {
     // container7 allocated to the task with affinity for it
     verify(mockApp).taskAllocated(mockTask6, mockCookie6, mockContainer7);
     // deallocate allocated task
-    assertTrue(scheduler.deallocateTask(mockTask5, true, null));
-    assertTrue(scheduler.deallocateTask(mockTask6, true, null));
+    assertTrue(scheduler.deallocateTask(mockTask5, true, null, null));
+    assertTrue(scheduler.deallocateTask(mockTask6, true, null, null));
     drainableAppCallback.drain();
     verify(mockApp).containerBeingReleased(mockCId7);
     verify(mockApp).containerBeingReleased(mockCId8);

http://git-wip-us.apache.org/repos/asf/tez/blob/fee059b9/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerEventHandler.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerEventHandler.java b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerEventHandler.java
index 3e68a4c..1550085 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerEventHandler.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerEventHandler.java
@@ -107,7 +107,7 @@ public class TestTaskSchedulerEventHandler {
   
   class MockTaskSchedulerEventHandler extends TaskSchedulerEventHandler {
 
-    AtomicBoolean notify = new AtomicBoolean(false);
+    final AtomicBoolean notify = new AtomicBoolean(false);
     
     public MockTaskSchedulerEventHandler(AppContext appContext,
         DAGClientServer clientService, EventHandler eventHandler,
@@ -120,7 +120,7 @@ public class TestTaskSchedulerEventHandler {
     protected void instantiateScheduelrs(String host, int port, String trackingUrl,
                                          AppContext appContext) {
       taskSchedulers[0] = mockTaskScheduler;
-      taskSchedulerServiceWrappers[0] = new ServicePluginLifecycleAbstractService(taskSchedulers[0]);
+      taskSchedulerServiceWrappers[0] = new ServicePluginLifecycleAbstractService<>(taskSchedulers[0]);
     }
     
     @Override
@@ -154,7 +154,6 @@ public class TestTaskSchedulerEventHandler {
     mockWebUIService = mock(WebUIService.class);
     when(mockAppContext.getAllContainers()).thenReturn(mockAMContainerMap);
     when(mockClientService.getBindAddress()).thenReturn(new InetSocketAddress(10000));
-    Configuration conf = new Configuration(false);
     schedulerHandler = new MockTaskSchedulerEventHandler(
         mockAppContext, mockClientService, mockEventHandler, mockSigMatcher, mockWebUIService);
   }
@@ -412,9 +411,8 @@ public class TestTaskSchedulerEventHandler {
   @Test(timeout = 5000)
   public void testNoSchedulerSpecified() throws IOException {
     try {
-      TSEHForMultipleSchedulersTest tseh =
-          new TSEHForMultipleSchedulersTest(mockAppContext, mockClientService, mockEventHandler,
-              mockSigMatcher, mockWebUIService, null, false);
+      new TSEHForMultipleSchedulersTest(mockAppContext, mockClientService, mockEventHandler,
+          mockSigMatcher, mockWebUIService, null, false);
       fail("Expecting an IllegalStateException with no schedulers specified");
     } catch (IllegalArgumentException e) {
     }
@@ -686,7 +684,8 @@ public class TestTaskSchedulerEventHandler {
 
     @Override
     public boolean deallocateTask(Object task, boolean taskSucceeded,
-                                  TaskAttemptEndReason endReason) {
+                                  TaskAttemptEndReason endReason,
+                                  String diagnostics) {
       return false;
     }
 

http://git-wip-us.apache.org/repos/asf/tez/blob/fee059b9/tez-dag/src/test/java/org/apache/tez/dag/app/rm/container/TestAMContainer.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/container/TestAMContainer.java b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/container/TestAMContainer.java
index f9952d8..13fa4c5 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/container/TestAMContainer.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/container/TestAMContainer.java
@@ -127,7 +127,8 @@ public class TestAMContainer {
     // Once for the previous NO_TASKS, one for the actual task.
     verify(wc.chh).register(wc.containerID);
     ArgumentCaptor<AMContainerTask> argumentCaptor = ArgumentCaptor.forClass(AMContainerTask.class);
-    verify(wc.tal, times(1)).registerTaskAttempt(argumentCaptor.capture(), eq(wc.containerID), eq(0));
+    verify(wc.tal, times(1)).registerTaskAttempt(argumentCaptor.capture(), eq(wc.containerID),
+        eq(0));
     assertEquals(1, argumentCaptor.getAllValues().size());
     assertEquals(wc.taskAttemptID, argumentCaptor.getAllValues().get(0).getTask().getTaskAttemptID());
     assertEquals(WrappedContainer.taskPriority, argumentCaptor.getAllValues().get(0).getPriority());
@@ -137,14 +138,14 @@ public class TestAMContainer {
     wc.verifyState(AMContainerState.IDLE);
     wc.verifyNoOutgoingEvents();
     assertNull(wc.amContainer.getCurrentTaskAttempt());
-    verify(wc.tal).unregisterTaskAttempt(wc.taskAttemptID, 0, TaskAttemptEndReason.OTHER);
+    verifyUnregisterTaskAttempt(wc.tal, wc.taskAttemptID, 0, TaskAttemptEndReason.OTHER, null);
 
     // Container completed
     wc.containerCompleted();
     wc.verifyHistoryStopEvent();
     wc.verifyState(AMContainerState.COMPLETED);
     wc.verifyNoOutgoingEvents();
-    verify(wc.tal).unregisterRunningContainer(wc.containerID, 0, ContainerEndReason.COMPLETED);
+    verifyUnregisterRunningContainer(wc.tal, wc.containerID, 0, ContainerEndReason.COMPLETED, null);
     verify(wc.chh).unregister(wc.containerID);
 
     assertEquals(1, wc.amContainer.getAllTaskAttempts().size());
@@ -178,21 +179,23 @@ public class TestAMContainer {
     wc.verifyNoOutgoingEvents();
     assertEquals(wc.taskAttemptID, wc.amContainer.getCurrentTaskAttempt());
     ArgumentCaptor<AMContainerTask> argumentCaptor = ArgumentCaptor.forClass(AMContainerTask.class);
-    verify(wc.tal, times(1)).registerTaskAttempt(argumentCaptor.capture(), eq(wc.containerID), eq(0));
+    verify(wc.tal, times(1)).registerTaskAttempt(argumentCaptor.capture(), eq(wc.containerID),
+        eq(0));
     assertEquals(1, argumentCaptor.getAllValues().size());
-    assertEquals(wc.taskAttemptID, argumentCaptor.getAllValues().get(0).getTask().getTaskAttemptID());
+    assertEquals(wc.taskAttemptID,
+        argumentCaptor.getAllValues().get(0).getTask().getTaskAttemptID());
 
     wc.taskAttemptSucceeded(wc.taskAttemptID);
     wc.verifyState(AMContainerState.IDLE);
     wc.verifyNoOutgoingEvents();
     assertNull(wc.amContainer.getCurrentTaskAttempt());
-    verify(wc.tal).unregisterTaskAttempt(wc.taskAttemptID, 0, TaskAttemptEndReason.OTHER);
+    verifyUnregisterTaskAttempt(wc.tal, wc.taskAttemptID, 0, TaskAttemptEndReason.OTHER, null);
 
     wc.containerCompleted();
     wc.verifyHistoryStopEvent();
     wc.verifyState(AMContainerState.COMPLETED);
     wc.verifyNoOutgoingEvents();
-    verify(wc.tal).unregisterRunningContainer(wc.containerID, 0, ContainerEndReason.COMPLETED);
+    verifyUnregisterRunningContainer(wc.tal, wc.containerID, 0, ContainerEndReason.COMPLETED, null);
     verify(wc.chh).unregister(wc.containerID);
 
     assertEquals(1, wc.amContainer.getAllTaskAttempts().size());
@@ -228,22 +231,25 @@ public class TestAMContainer {
     // Once for the previous NO_TASKS, one for the actual task.
     verify(wc.chh).register(wc.containerID);
     ArgumentCaptor<AMContainerTask> argumentCaptor = ArgumentCaptor.forClass(AMContainerTask.class);
-    verify(wc.tal, times(1)).registerTaskAttempt(argumentCaptor.capture(), eq(wc.containerID), eq(0));
+    verify(wc.tal, times(1)).registerTaskAttempt(argumentCaptor.capture(), eq(wc.containerID),
+        eq(0));
     assertEquals(1, argumentCaptor.getAllValues().size());
-    assertEquals(wc.taskAttemptID, argumentCaptor.getAllValues().get(0).getTask().getTaskAttemptID());
+    assertEquals(wc.taskAttemptID,
+        argumentCaptor.getAllValues().get(0).getTask().getTaskAttemptID());
 
     // Attempt succeeded
     wc.taskAttemptSucceeded(wc.taskAttemptID);
     wc.verifyState(AMContainerState.IDLE);
     wc.verifyNoOutgoingEvents();
     assertNull(wc.amContainer.getCurrentTaskAttempt());
-    verify(wc.tal).unregisterTaskAttempt(wc.taskAttemptID, 0, TaskAttemptEndReason.OTHER);
+    verifyUnregisterTaskAttempt(wc.tal, wc.taskAttemptID, 0, TaskAttemptEndReason.OTHER, null);
 
     TezTaskAttemptID taId2 = TezTaskAttemptID.getInstance(wc.taskID, 2);
     wc.assignTaskAttempt(taId2);
     wc.verifyState(AMContainerState.RUNNING);
     argumentCaptor = ArgumentCaptor.forClass(AMContainerTask.class);
-    verify(wc.tal, times(2)).registerTaskAttempt(argumentCaptor.capture(), eq(wc.containerID), eq(0));
+    verify(wc.tal, times(2)).registerTaskAttempt(argumentCaptor.capture(), eq(wc.containerID),
+        eq(0));
     assertEquals(2, argumentCaptor.getAllValues().size());
     assertEquals(taId2, argumentCaptor.getAllValues().get(1).getTask().getTaskAttemptID());
 
@@ -252,14 +258,14 @@ public class TestAMContainer {
     wc.verifyState(AMContainerState.IDLE);
     wc.verifyNoOutgoingEvents();
     assertNull(wc.amContainer.getCurrentTaskAttempt());
-    verify(wc.tal).unregisterTaskAttempt(taId2, 0, TaskAttemptEndReason.OTHER);
+    verifyUnregisterTaskAttempt(wc.tal, wc.taskAttemptID, 0, TaskAttemptEndReason.OTHER, null);
 
     // Container completed
     wc.containerCompleted();
     wc.verifyHistoryStopEvent();
     wc.verifyState(AMContainerState.COMPLETED);
     wc.verifyNoOutgoingEvents();
-    verify(wc.tal).unregisterRunningContainer(wc.containerID, 0, ContainerEndReason.COMPLETED);
+    verifyUnregisterRunningContainer(wc.tal, wc.containerID, 0, ContainerEndReason.COMPLETED, null);
     verify(wc.chh).unregister(wc.containerID);
 
     assertEquals(2, wc.amContainer.getAllTaskAttempts().size());
@@ -292,7 +298,8 @@ public class TestAMContainer {
     wc.verifyHistoryStopEvent();
     wc.verifyState(AMContainerState.COMPLETED);
     wc.verifyNoOutgoingEvents();
-    verify(wc.tal).unregisterRunningContainer(wc.containerID, 0, ContainerEndReason.OTHER);
+    verifyUnregisterRunningContainer(wc.tal, wc.containerID, 0, ContainerEndReason.OTHER,
+        "received a STOP_REQUEST");
     verify(wc.chh).unregister(wc.containerID);
 
     assertNull(wc.amContainer.getCurrentTaskAttempt());
@@ -329,7 +336,8 @@ public class TestAMContainer {
     wc.verifyHistoryStopEvent();
     wc.verifyState(AMContainerState.COMPLETED);
     wc.verifyNoOutgoingEvents();
-    verify(wc.tal).unregisterRunningContainer(wc.containerID, 0, ContainerEndReason.OTHER);
+    verifyUnregisterRunningContainer(wc.tal, wc.containerID, 0, ContainerEndReason.OTHER,
+        "received a STOP_REQUEST");
     verify(wc.chh).unregister(wc.containerID);
 
     assertNull(wc.amContainer.getCurrentTaskAttempt());
@@ -352,7 +360,8 @@ public class TestAMContainer {
     wc.assignTaskAttempt(taID2);
 
     wc.verifyState(AMContainerState.STOP_REQUESTED);
-    verify(wc.tal).unregisterRunningContainer(wc.containerID, 0, ContainerEndReason.FRAMEWORK_ERROR);
+    verifyUnregisterRunningContainer(wc.tal, wc.containerID, 0, ContainerEndReason.FRAMEWORK_ERROR,
+        "Multiple simultaneous taskAttempt");
     verify(wc.chh).unregister(wc.containerID);
     // 1 for NM stop request. 2 TERMINATING to TaskAttempt.
     outgoingEvents = wc.verifyCountAndGetOutgoingEvents(3);
@@ -390,7 +399,8 @@ public class TestAMContainer {
     wc.assignTaskAttempt(taID2);
 
     wc.verifyState(AMContainerState.STOP_REQUESTED);
-    verify(wc.tal).unregisterRunningContainer(wc.containerID, 0, ContainerEndReason.FRAMEWORK_ERROR);
+    verifyUnregisterRunningContainer(wc.tal, wc.containerID, 0, ContainerEndReason.FRAMEWORK_ERROR,
+        "Multiple simultaneous taskAttempt");
     verify(wc.chh).unregister(wc.containerID);
     // 1 for NM stop request. 2 TERMINATING to TaskAttempt.
     outgoingEvents = wc.verifyCountAndGetOutgoingEvents(3);
@@ -426,7 +436,8 @@ public class TestAMContainer {
 
     wc.containerTimedOut();
     wc.verifyState(AMContainerState.STOP_REQUESTED);
-    verify(wc.tal).unregisterRunningContainer(wc.containerID, 0, ContainerEndReason.OTHER);
+    verifyUnregisterRunningContainer(wc.tal, wc.containerID, 0, ContainerEndReason.OTHER,
+        "timed out");
     verify(wc.chh).unregister(wc.containerID);
     // 1 to TA, 1 for RM de-allocate.
     outgoingEvents = wc.verifyCountAndGetOutgoingEvents(2);
@@ -460,7 +471,8 @@ public class TestAMContainer {
 
     wc.stopRequest();
     wc.verifyState(AMContainerState.STOP_REQUESTED);
-    verify(wc.tal).unregisterRunningContainer(wc.containerID, 0, ContainerEndReason.OTHER);
+    verifyUnregisterRunningContainer(wc.tal, wc.containerID, 0, ContainerEndReason.OTHER,
+        "received a STOP_REQUEST");
     verify(wc.chh).unregister(wc.containerID);
     // 1 to TA, 1 for RM de-allocate.
     outgoingEvents = wc.verifyCountAndGetOutgoingEvents(2);
@@ -494,7 +506,8 @@ public class TestAMContainer {
     wc.launchFailed();
     wc.verifyState(AMContainerState.STOPPING);
     verify(wc.tal).registerRunningContainer(wc.containerID, 0);
-    verify(wc.tal).unregisterRunningContainer(wc.containerID, 0, ContainerEndReason.LAUNCH_FAILED);
+    verifyUnregisterRunningContainer(wc.tal, wc.containerID, 0, ContainerEndReason.LAUNCH_FAILED,
+        "launchFailed");
 
     outgoingEvents = wc.verifyCountAndGetOutgoingEvents(2);
     verifyUnOrderedOutgoingEventTypes(outgoingEvents,
@@ -544,7 +557,7 @@ public class TestAMContainer {
     wc.containerCompleted();
     wc.verifyState(AMContainerState.COMPLETED);
     verify(wc.tal).registerRunningContainer(wc.containerID, 0);
-    verify(wc.tal).unregisterRunningContainer(wc.containerID, 0, ContainerEndReason.COMPLETED);
+    verifyUnregisterRunningContainer(wc.tal, wc.containerID, 0, ContainerEndReason.COMPLETED, null);
 
     outgoingEvents = wc.verifyCountAndGetOutgoingEvents(1);
     verifyUnOrderedOutgoingEventTypes(outgoingEvents,
@@ -571,10 +584,10 @@ public class TestAMContainer {
 
     wc.assignTaskAttempt(wc.taskAttemptID);
 
-    wc.containerCompleted(ContainerExitStatus.DISKS_FAILED, TaskAttemptTerminationCause.NODE_DISK_ERROR);
+    wc.containerCompleted(ContainerExitStatus.DISKS_FAILED, TaskAttemptTerminationCause.NODE_DISK_ERROR, "DiskFailed");
     wc.verifyState(AMContainerState.COMPLETED);
     verify(wc.tal).registerRunningContainer(wc.containerID, 0);
-    verify(wc.tal).unregisterRunningContainer(wc.containerID, 0, ContainerEndReason.OTHER);
+    verifyUnregisterRunningContainer(wc.tal, wc.containerID, 0, ContainerEndReason.OTHER, "DiskFailed");
 
     outgoingEvents = wc.verifyCountAndGetOutgoingEvents(1);
     verifyUnOrderedOutgoingEventTypes(outgoingEvents,
@@ -602,10 +615,11 @@ public class TestAMContainer {
 
     wc.assignTaskAttempt(wc.taskAttemptID);
 
-    wc.containerCompleted(ContainerExitStatus.ABORTED, TaskAttemptTerminationCause.NODE_FAILED);
+    wc.containerCompleted(ContainerExitStatus.ABORTED, TaskAttemptTerminationCause.NODE_FAILED, "NodeFailed");
     wc.verifyState(AMContainerState.COMPLETED);
     verify(wc.tal).registerRunningContainer(wc.containerID, 0);
-    verify(wc.tal).unregisterRunningContainer(wc.containerID, 0, ContainerEndReason.NODE_FAILED);
+    verifyUnregisterRunningContainer(wc.tal, wc.containerID, 0, ContainerEndReason.NODE_FAILED,
+        "NodeFailed");
 
     outgoingEvents = wc.verifyCountAndGetOutgoingEvents(1);
     verifyUnOrderedOutgoingEventTypes(outgoingEvents,
@@ -636,7 +650,7 @@ public class TestAMContainer {
     wc.containerCompleted();
     wc.verifyState(AMContainerState.COMPLETED);
     verify(wc.tal).registerRunningContainer(wc.containerID, 0);
-    verify(wc.tal).unregisterRunningContainer(wc.containerID, 0, ContainerEndReason.COMPLETED);
+    verifyUnregisterRunningContainer(wc.tal, wc.containerID, 0, ContainerEndReason.COMPLETED, null);
     verify(wc.chh).register(wc.containerID);
     verify(wc.chh).unregister(wc.containerID);
 
@@ -665,7 +679,7 @@ public class TestAMContainer {
     wc.containerCompleted();
     wc.verifyState(AMContainerState.COMPLETED);
     verify(wc.tal).registerRunningContainer(wc.containerID, 0);
-    verify(wc.tal).unregisterRunningContainer(wc.containerID, 0, ContainerEndReason.COMPLETED);
+    verifyUnregisterRunningContainer(wc.tal, wc.containerID, 0, ContainerEndReason.COMPLETED, null);
     verify(wc.chh).register(wc.containerID);
     verify(wc.chh).unregister(wc.containerID);
 
@@ -697,10 +711,12 @@ public class TestAMContainer {
     wc.containerLaunched();
     wc.verifyState(AMContainerState.RUNNING);
 
-    wc.containerCompleted(ContainerExitStatus.PREEMPTED, TaskAttemptTerminationCause.EXTERNAL_PREEMPTION);
+    wc.containerCompleted(ContainerExitStatus.PREEMPTED,
+        TaskAttemptTerminationCause.EXTERNAL_PREEMPTION, "Container preempted externally");
     wc.verifyState(AMContainerState.COMPLETED);
     verify(wc.tal).registerRunningContainer(wc.containerID, 0);
-    verify(wc.tal).unregisterRunningContainer(wc.containerID, 0, ContainerEndReason.EXTERNAL_PREEMPTION);
+    verifyUnregisterRunningContainer(wc.tal, wc.containerID, 0,
+        ContainerEndReason.EXTERNAL_PREEMPTION, "Container preempted externally");
     verify(wc.chh).register(wc.containerID);
     verify(wc.chh).unregister(wc.containerID);
 
@@ -734,11 +750,12 @@ public class TestAMContainer {
     wc.containerLaunched();
     wc.verifyState(AMContainerState.RUNNING);
 
-    wc.containerCompleted(ContainerExitStatus.INVALID, TaskAttemptTerminationCause.INTERNAL_PREEMPTION);
+    wc.containerCompleted(ContainerExitStatus.INVALID,
+        TaskAttemptTerminationCause.INTERNAL_PREEMPTION, "Container preempted internally");
     wc.verifyState(AMContainerState.COMPLETED);
     verify(wc.tal).registerRunningContainer(wc.containerID, 0);
-    verify(wc.tal).unregisterRunningContainer(wc.containerID, 0,
-        ContainerEndReason.INTERNAL_PREEMPTION);
+    verifyUnregisterRunningContainer(wc.tal, wc.containerID, 0,
+        ContainerEndReason.INTERNAL_PREEMPTION, "Container preempted internally");
     verify(wc.chh).register(wc.containerID);
     verify(wc.chh).unregister(wc.containerID);
 
@@ -772,10 +789,11 @@ public class TestAMContainer {
     wc.containerLaunched();
     wc.verifyState(AMContainerState.RUNNING);
 
-    wc.containerCompleted(ContainerExitStatus.DISKS_FAILED, TaskAttemptTerminationCause.NODE_DISK_ERROR);
+    wc.containerCompleted(ContainerExitStatus.DISKS_FAILED,
+        TaskAttemptTerminationCause.NODE_DISK_ERROR, "NodeDiskError");
     wc.verifyState(AMContainerState.COMPLETED);
     verify(wc.tal).registerRunningContainer(wc.containerID, 0);
-    verify(wc.tal).unregisterRunningContainer(wc.containerID, 0, ContainerEndReason.OTHER);
+    verifyUnregisterRunningContainer(wc.tal, wc.containerID, 0, ContainerEndReason.OTHER, "NodeDiskError");
     verify(wc.chh).register(wc.containerID);
     verify(wc.chh).unregister(wc.containerID);
 
@@ -1194,6 +1212,7 @@ public class TestAMContainer {
 
     public AMContainerImpl amContainer;
 
+    @SuppressWarnings("deprecation") // ContainerId
     public WrappedContainer(boolean shouldProfile, String profileString) {
       applicationID = ApplicationId.newInstance(rmIdentifier, 1);
       appAttemptID = ApplicationAttemptId.newInstance(applicationID, 1);
@@ -1286,7 +1305,8 @@ public class TestAMContainer {
       Token<JobTokenIdentifier> jobToken = mock(Token.class);
       TokenCache.setSessionToken(jobToken, credentials);
       amContainer.handle(new AMContainerEventLaunchRequest(containerID, vertexID,
-          new ContainerContext(localResources, credentials, new HashMap<String, String>(), ""), 0, 0));
+          new ContainerContext(localResources, credentials, new HashMap<String, String>(), ""), 0,
+          0));
     }
 
     public void assignTaskAttempt(TezTaskAttemptID taID) {
@@ -1333,10 +1353,12 @@ public class TestAMContainer {
       amContainer.handle(new AMContainerEventCompleted(containerID, ContainerExitStatus.SUCCESS, null,
           TaskAttemptTerminationCause.CONTAINER_EXITED));
     }
-    
-    public void containerCompleted(int exitStatus, TaskAttemptTerminationCause errCause) {
+
+    public void containerCompleted(int exitStatus, TaskAttemptTerminationCause errCause,
+                                   String diagnostics) {
       reset(eventHandler);
-      amContainer.handle(new AMContainerEventCompleted(containerID, exitStatus, null, errCause));
+      amContainer.handle(
+          new AMContainerEventCompleted(containerID, exitStatus, diagnostics, errCause));
     }
 
     public void containerTimedOut() {
@@ -1417,4 +1439,33 @@ public class TestAMContainer {
         LocalResourceType.FILE, LocalResourceVisibility.APPLICATION, 1, 1000000);
     return lr;
   }
+
+  private void verifyUnregisterRunningContainer(TaskAttemptListener tal, ContainerId containerId,
+                                                int taskCommId,
+                                                ContainerEndReason containerEndReason,
+                                                String diagContains) {
+    ArgumentCaptor<String> argumentCaptor = ArgumentCaptor.forClass(String.class);
+    verify(tal).unregisterRunningContainer(eq(containerId), eq(taskCommId), eq(containerEndReason),
+        argumentCaptor.capture());
+    assertEquals(1, argumentCaptor.getAllValues().size());
+    if (diagContains != null) {
+      assertTrue(argumentCaptor.getValue().contains(diagContains));
+    } else {
+      assertNull(argumentCaptor.getValue());
+    }
+  }
+
+  private void verifyUnregisterTaskAttempt(TaskAttemptListener tal, TezTaskAttemptID taId,
+                                           int taskCommId, TaskAttemptEndReason endReason,
+                                           String diagContains) {
+    ArgumentCaptor<String> argumentCaptor = ArgumentCaptor.forClass(String.class);
+    verify(tal)
+        .unregisterTaskAttempt(eq(taId), eq(taskCommId), eq(endReason), argumentCaptor.capture());
+    assertEquals(1, argumentCaptor.getAllValues().size());
+    if (diagContains != null) {
+      assertTrue(argumentCaptor.getValue().contains(diagContains));
+    } else {
+      assertNull(argumentCaptor.getValue());
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/fee059b9/tez-examples/src/main/java/org/apache/tez/examples/JoinValidate.java
----------------------------------------------------------------------
diff --git a/tez-examples/src/main/java/org/apache/tez/examples/JoinValidate.java b/tez-examples/src/main/java/org/apache/tez/examples/JoinValidate.java
index 611e8cc..4883351 100644
--- a/tez-examples/src/main/java/org/apache/tez/examples/JoinValidate.java
+++ b/tez-examples/src/main/java/org/apache/tez/examples/JoinValidate.java
@@ -183,18 +183,26 @@ public class JoinValidate extends TezExampleBase {
     }
   }
 
+  // This is for internal use only, to use this example for external service testing.
+  // Not meant as documentation for the example.
   protected VertexExecutionContext getDefaultExecutionContext() {
     return null;
   }
 
+  // This is for internal use only, to use this example for external service testing.
+  // Not meant as documentation for the example.
   protected VertexExecutionContext getLhsExecutionContext() {
     return null;
   }
 
+  // This is for internal use only, to use this example for external service testing.
+  // Not meant as documentation for the example.
   protected VertexExecutionContext getRhsExecutionContext() {
     return null;
   }
 
+  // This is for internal use only, to use this example for external service testing.
+  // Not meant as documentation for the example.
   protected VertexExecutionContext getValidateExecutionContext() {
     return null;
   }

http://git-wip-us.apache.org/repos/asf/tez/blob/fee059b9/tez-ext-service-tests/pom.xml
----------------------------------------------------------------------
diff --git a/tez-ext-service-tests/pom.xml b/tez-ext-service-tests/pom.xml
index f95f4ca..5a1907f 100644
--- a/tez-ext-service-tests/pom.xml
+++ b/tez-ext-service-tests/pom.xml
@@ -81,11 +81,6 @@
     <dependency>
       <groupId>org.apache.hadoop</groupId>
       <artifactId>hadoop-common</artifactId>
-      <scope>test</scope>
-    </dependency>
-    <dependency>
-      <groupId>org.apache.hadoop</groupId>
-      <artifactId>hadoop-common</artifactId>
       <type>test-jar</type>
       <scope>test</scope>
     </dependency>

http://git-wip-us.apache.org/repos/asf/tez/blob/fee059b9/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/rm/TezTestServiceTaskSchedulerService.java
----------------------------------------------------------------------
diff --git a/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/rm/TezTestServiceTaskSchedulerService.java b/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/rm/TezTestServiceTaskSchedulerService.java
index 17f8a87..8b91dde 100644
--- a/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/rm/TezTestServiceTaskSchedulerService.java
+++ b/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/rm/TezTestServiceTaskSchedulerService.java
@@ -57,7 +57,7 @@ public class TezTestServiceTaskSchedulerService extends TaskScheduler {
   private final ConcurrentMap<Object, ContainerId> runningTasks =
       new ConcurrentHashMap<Object, ContainerId>();
 
-  // AppIdIdentifier to avoid conflicts with other containres in the system.
+  // AppIdIdentifier to avoid conflicts with other containers in the system.
 
   // Per instance
   private final int memoryPerInstance;
@@ -181,7 +181,7 @@ public class TezTestServiceTaskSchedulerService extends TaskScheduler {
   }
 
   @Override
-  public boolean deallocateTask(Object task, boolean taskSucceeded, TaskAttemptEndReason endReason) {
+  public boolean deallocateTask(Object task, boolean taskSucceeded, TaskAttemptEndReason endReason, String diagnostics) {
     ContainerId containerId = runningTasks.remove(task);
     if (containerId == null) {
       LOG.error("Could not determine ContainerId for task: " + task +
@@ -235,6 +235,7 @@ public class TezTestServiceTaskSchedulerService extends TaskScheduler {
           .newInstance(appId, appAttemptId.getAttemptId());
     }
 
+    @SuppressWarnings("deprecation")
     public Container createContainer(Resource capability, Priority priority, String hostname, int port) {
       ContainerId containerId = ContainerId.newInstance(customAppAttemptId, nextId.getAndIncrement());
       NodeId nodeId = NodeId.newInstance(hostname, port);

http://git-wip-us.apache.org/repos/asf/tez/blob/fee059b9/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/taskcomm/TezTestServiceTaskCommunicatorImpl.java
----------------------------------------------------------------------
diff --git a/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/taskcomm/TezTestServiceTaskCommunicatorImpl.java b/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/taskcomm/TezTestServiceTaskCommunicatorImpl.java
index ef8f9e4..127967a 100644
--- a/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/taskcomm/TezTestServiceTaskCommunicatorImpl.java
+++ b/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/taskcomm/TezTestServiceTaskCommunicatorImpl.java
@@ -98,8 +98,8 @@ public class TezTestServiceTaskCommunicatorImpl extends TezTaskCommunicatorImpl
   }
 
   @Override
-  public void registerContainerEnd(ContainerId containerId, ContainerEndReason endReason) {
-    super.registerContainerEnd(containerId, endReason);
+  public void registerContainerEnd(ContainerId containerId, ContainerEndReason endReason, String diagnostics) {
+    super.registerContainerEnd(containerId, endReason, diagnostics);
   }
 
   @Override
@@ -154,7 +154,7 @@ public class TezTestServiceTaskCommunicatorImpl extends TezTaskCommunicatorImpl
               String message = re.toString();
               if (message.contains(RejectedExecutionException.class.getName())) {
                 getContext().taskKilled(taskSpec.getTaskAttemptID(),
-                    TaskAttemptEndReason.SERVICE_BUSY, "Service Busy");
+                    TaskAttemptEndReason.EXECUTOR_BUSY, "Service Busy");
               } else {
                 getContext()
                     .taskFailed(taskSpec.getTaskAttemptID(), TaskAttemptEndReason.OTHER,
@@ -175,8 +175,8 @@ public class TezTestServiceTaskCommunicatorImpl extends TezTaskCommunicatorImpl
   }
 
   @Override
-  public void unregisterRunningTaskAttempt(TezTaskAttemptID taskAttemptID, TaskAttemptEndReason endReason) {
-    super.unregisterRunningTaskAttempt(taskAttemptID, endReason);
+  public void unregisterRunningTaskAttempt(TezTaskAttemptID taskAttemptID, TaskAttemptEndReason endReason, String diagnostics) {
+    super.unregisterRunningTaskAttempt(taskAttemptID, endReason, diagnostics);
     // Nothing else to do for now. The push API in the test does not support termination of a running task
   }
 

http://git-wip-us.apache.org/repos/asf/tez/blob/fee059b9/tez-ext-service-tests/src/test/java/org/apache/tez/service/impl/ContainerRunnerImpl.java
----------------------------------------------------------------------
diff --git a/tez-ext-service-tests/src/test/java/org/apache/tez/service/impl/ContainerRunnerImpl.java b/tez-ext-service-tests/src/test/java/org/apache/tez/service/impl/ContainerRunnerImpl.java
index 472a43c..3b4c768 100644
--- a/tez-ext-service-tests/src/test/java/org/apache/tez/service/impl/ContainerRunnerImpl.java
+++ b/tez-ext-service-tests/src/test/java/org/apache/tez/service/impl/ContainerRunnerImpl.java
@@ -454,7 +454,7 @@ public class ContainerRunnerImpl extends AbstractService implements ContainerRun
       try {
         shouldDie = !taskRunner.run();
         if (shouldDie) {
-          LOG.info("Got a shouldDie notification via hearbeats. Shutting down");
+          LOG.info("Got a shouldDie notification via heartbeats. Shutting down");
           return new ContainerExecutionResult(ContainerExecutionResult.ExitStatus.SUCCESS, null,
               "Asked to die by the AM");
         }

http://git-wip-us.apache.org/repos/asf/tez/blob/fee059b9/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezTaskRunner2.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezTaskRunner2.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezTaskRunner2.java
index d8539c5..7fd4c75 100644
--- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezTaskRunner2.java
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezTaskRunner2.java
@@ -273,10 +273,10 @@ public class TezTaskRunner2 {
           isFirstError = true;
           killTaskRequested.set(true);
         } else {
-          logErrorIngored("killTask", null);
+          logErrorIgnored("killTask", null);
         }
       } else {
-        logErrorIngored("killTask", null);
+        logErrorIgnored("killTask", null);
       }
     }
     if (isFirstError) {
@@ -331,10 +331,10 @@ public class TezTaskRunner2 {
             errorReporterToAm.set(true);
             oobSignalErrorInProgress = true;
           } else {
-            logErrorIngored("signalFatalError", message);
+            logErrorIgnored("signalFatalError", message);
           }
         } else {
-          logErrorIngored("signalFatalError", message);
+          logErrorIgnored("signalFatalError", message);
         }
       }
 
@@ -394,14 +394,14 @@ public class TezTaskRunner2 {
             registerFirstException(t, null);
             isFirstError = true;
           } else {
-            logErrorIngored("umbilicalFatalError", null);
+            logErrorIgnored("umbilicalFatalError", null);
           }
           // A race is possible between a task succeeding, and a subsequent timed heartbeat failing.
           // These errors can be ignored, since a task can only succeed if the synchronous taskSucceeded
           // method does not throw an exception, in which case task success is registered with the AM.
           // Leave subsequent heartbeat errors to the next entity to communicate using the TaskReporter
         } else {
-          logErrorIngored("umbilicalFatalError", null);
+          logErrorIgnored("umbilicalFatalError", null);
         }
         // Since this error came from the taskReporter - there's no point attempting to report a failure back to it.
         // However, the task does need to be cleaned up
@@ -425,7 +425,7 @@ public class TezTaskRunner2 {
         logAborting("shutdownRequested");
         killTaskInternal();
       } else {
-        logErrorIngored("shutdownRequested", null);
+        logErrorIgnored("shutdownRequested", null);
       }
     }
   }
@@ -474,7 +474,7 @@ public class TezTaskRunner2 {
         (successReportAttempted ? "success" : "failure/killed"), t);
   }
 
-  private void logErrorIngored(String ignoredEndReason, String errorMessage) {
+  private void logErrorIgnored(String ignoredEndReason, String errorMessage) {
     LOG.info(
         "Ignoring {} request since the task with id {} has ended for reason: {}. IgnoredError: {} ",
         ignoredEndReason, task.getTaskAttemptID(),

http://git-wip-us.apache.org/repos/asf/tez/blob/fee059b9/tez-runtime-internals/src/test/java/org/apache/tez/runtime/task/TaskExecutionTestHelpers.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/test/java/org/apache/tez/runtime/task/TaskExecutionTestHelpers.java b/tez-runtime-internals/src/test/java/org/apache/tez/runtime/task/TaskExecutionTestHelpers.java
index fc42da3..7502c41 100644
--- a/tez-runtime-internals/src/test/java/org/apache/tez/runtime/task/TaskExecutionTestHelpers.java
+++ b/tez-runtime-internals/src/test/java/org/apache/tez/runtime/task/TaskExecutionTestHelpers.java
@@ -437,6 +437,7 @@ public class TaskExecutionTestHelpers {
     }
   }
 
+  @SuppressWarnings("deprecation")
   public static ContainerId createContainerId(ApplicationId appId) {
     ApplicationAttemptId appAttemptId = ApplicationAttemptId.newInstance(appId, 1);
     ContainerId containerId = ContainerId.newInstance(appAttemptId, 1);

http://git-wip-us.apache.org/repos/asf/tez/blob/fee059b9/tez-runtime-internals/src/test/java/org/apache/tez/runtime/task/TestContainerExecution.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/test/java/org/apache/tez/runtime/task/TestContainerExecution.java b/tez-runtime-internals/src/test/java/org/apache/tez/runtime/task/TestContainerExecution.java
index c1616af..c3c4705 100644
--- a/tez-runtime-internals/src/test/java/org/apache/tez/runtime/task/TestContainerExecution.java
+++ b/tez-runtime-internals/src/test/java/org/apache/tez/runtime/task/TestContainerExecution.java
@@ -40,6 +40,7 @@ public class TestContainerExecution {
       executor = MoreExecutors.listeningDecorator(rawExecutor);
       ApplicationId appId = ApplicationId.newInstance(10000, 1);
       ApplicationAttemptId appAttemptId = ApplicationAttemptId.newInstance(appId, 1);
+      @SuppressWarnings("deprecation")
       ContainerId containerId = ContainerId.newInstance(appAttemptId, 1);
 
       TaskExecutionTestHelpers.TezTaskUmbilicalForTest