You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by ss...@apache.org on 2015/08/14 22:58:58 UTC
[40/50] [abbrv] tez git commit: TEZ-2678. Fix comments from reviews -
part 1. (sseth)
http://git-wip-us.apache.org/repos/asf/tez/blob/fee059b9/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestContainerReuse.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestContainerReuse.java b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestContainerReuse.java
index 8e8224a..0a02f9e 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestContainerReuse.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestContainerReuse.java
@@ -18,6 +18,7 @@
package org.apache.tez.dag.app.rm;
+import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.assertEquals;
import static org.mockito.Matchers.any;
@@ -38,6 +39,8 @@ import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.tez.common.TezUtils;
+import org.apache.tez.serviceplugins.api.TaskScheduler;
+import org.mockito.ArgumentCaptor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.conf.Configuration;
@@ -213,10 +216,10 @@ public class TestContainerReuse {
taskSchedulerEventHandler.handleEvent(lrTa31);
taskSchedulerEventHandler.handleEvent(
- new AMSchedulerEventTAEnded(
- ta11, containerHost1.getId(), TaskAttemptState.SUCCEEDED, null, 0));
+ new AMSchedulerEventTAEnded(
+ ta11, containerHost1.getId(), TaskAttemptState.SUCCEEDED, null, null, 0));
drainableAppCallback.drain();
- verify(taskScheduler).deallocateTask(eq(ta11), eq(true), eq((TaskAttemptEndReason)null));
+ verifyDeAllocateTask(taskScheduler, ta11, true, null, null);
verify(taskSchedulerEventHandler, times(1)).taskAllocated(
eq(0), eq(ta31), any(Object.class), eq(containerHost1));
verify(rmClient, times(0)).releaseAssignedContainer(
@@ -226,7 +229,7 @@ public class TestContainerReuse {
taskSchedulerEventHandler.handleEvent(
new AMSchedulerEventTAEnded(ta21, containerHost2.getId(),
- TaskAttemptState.SUCCEEDED, null, 0));
+ TaskAttemptState.SUCCEEDED, null, null, 0));
long currentTs = System.currentTimeMillis();
Throwable exception = null;
@@ -332,16 +335,17 @@ public class TestContainerReuse {
TestTaskSchedulerHelpers.waitForDelayedDrainNotify(drainNotifier);
drainableAppCallback.drain();
verify(taskSchedulerEventHandler).taskAllocated(eq(0), eq(ta11), any(Object.class), eq(containerHost1));
- verify(taskSchedulerEventHandler).taskAllocated(eq(0), eq(ta21), any(Object.class), eq(containerHost2));
+ verify(taskSchedulerEventHandler).taskAllocated(eq(0), eq(ta21), any(Object.class),
+ eq(containerHost2));
// Adding the event later so that task1 assigned to containerHost1 is deterministic.
taskSchedulerEventHandler.handleEvent(lrTa31);
taskSchedulerEventHandler.handleEvent(
new AMSchedulerEventTAEnded(ta21, containerHost2.getId(),
- TaskAttemptState.SUCCEEDED, null, 0));
+ TaskAttemptState.SUCCEEDED, null, null, 0));
drainableAppCallback.drain();
- verify(taskScheduler).deallocateTask(eq(ta21), eq(true), eq((TaskAttemptEndReason)null));
+ verifyDeAllocateTask(taskScheduler, ta21, true, null, null);
verify(taskSchedulerEventHandler, times(0)).taskAllocated(
eq(0), eq(ta31), any(Object.class), eq(containerHost2));
verify(rmClient, times(1)).releaseAssignedContainer(
@@ -431,12 +435,15 @@ public class TestContainerReuse {
taskScheduler.onContainersAllocated(Collections.singletonList(container1));
TestTaskSchedulerHelpers.waitForDelayedDrainNotify(drainNotifier);
drainableAppCallback.drain();
- verify(taskSchedulerEventHandler).taskAllocated(eq(0), eq(ta11), any(Object.class), eq(container1));
+ verify(taskSchedulerEventHandler).taskAllocated(eq(0), eq(ta11), any(Object.class),
+ eq(container1));
// Task assigned to container completed successfully. Container should be re-used.
- taskSchedulerEventHandler.handleEvent(new AMSchedulerEventTAEnded(ta11, container1.getId(), TaskAttemptState.SUCCEEDED, null, 0));
+ taskSchedulerEventHandler.handleEvent(
+ new AMSchedulerEventTAEnded(ta11, container1.getId(), TaskAttemptState.SUCCEEDED, null,
+ null, 0));
drainableAppCallback.drain();
- verify(taskScheduler).deallocateTask(eq(ta11), eq(true), eq((TaskAttemptEndReason)null));
+ verifyDeAllocateTask(taskScheduler, ta11, true, null, null);
verify(taskSchedulerEventHandler).taskAllocated(eq(0), eq(ta12), any(Object.class), eq(container1));
verify(rmClient, times(0)).releaseAssignedContainer(eq(container1.getId()));
eventHandler.verifyNoInvocations(AMContainerEventStopRequest.class);
@@ -444,19 +451,25 @@ public class TestContainerReuse {
// Task assigned to container completed successfully.
// Verify reuse across hosts.
- taskSchedulerEventHandler.handleEvent(new AMSchedulerEventTAEnded(ta12, container1.getId(), TaskAttemptState.SUCCEEDED, null, 0));
+ taskSchedulerEventHandler.handleEvent(
+ new AMSchedulerEventTAEnded(ta12, container1.getId(), TaskAttemptState.SUCCEEDED, null,
+ null, 0));
drainableAppCallback.drain();
- verify(taskScheduler).deallocateTask(eq(ta12), eq(true), eq((TaskAttemptEndReason)null));
- verify(taskSchedulerEventHandler).taskAllocated(eq(0), eq(ta13), any(Object.class), eq(container1));
+ verifyDeAllocateTask(taskScheduler, ta12, true, null, null);
+ verify(taskSchedulerEventHandler).taskAllocated(eq(0), eq(ta13), any(Object.class),
+ eq(container1));
verify(rmClient, times(0)).releaseAssignedContainer(eq(container1.getId()));
eventHandler.verifyNoInvocations(AMContainerEventStopRequest.class);
eventHandler.reset();
// Verify no re-use if a previous task fails.
- taskSchedulerEventHandler.handleEvent(new AMSchedulerEventTAEnded(ta13, container1.getId(), TaskAttemptState.FAILED, null, 0));
+ taskSchedulerEventHandler.handleEvent(
+ new AMSchedulerEventTAEnded(ta13, container1.getId(), TaskAttemptState.FAILED, null,
+ "TIMEOUT", 0));
drainableAppCallback.drain();
- verify(taskSchedulerEventHandler, times(0)).taskAllocated(eq(0), eq(ta14), any(Object.class), eq(container1));
- verify(taskScheduler).deallocateTask(eq(ta13), eq(false), eq((TaskAttemptEndReason)null));
+ verify(taskSchedulerEventHandler, times(0)).taskAllocated(eq(0), eq(ta14), any(Object.class),
+ eq(container1));
+ verifyDeAllocateTask(taskScheduler, ta13, false, null, "TIMEOUT");
verify(rmClient).releaseAssignedContainer(eq(container1.getId()));
eventHandler.verifyInvocation(AMContainerEventStopRequest.class);
eventHandler.reset();
@@ -468,12 +481,15 @@ public class TestContainerReuse {
taskScheduler.onContainersAllocated(Collections.singletonList(container2));
TestTaskSchedulerHelpers.waitForDelayedDrainNotify(drainNotifier);
drainableAppCallback.drain();
- verify(taskSchedulerEventHandler).taskAllocated(eq(0), eq(ta14), any(Object.class), eq(container2));
+ verify(taskSchedulerEventHandler).taskAllocated(eq(0), eq(ta14), any(Object.class),
+ eq(container2));
// Task assigned to container completed successfully. No pending requests. Container should be released.
- taskSchedulerEventHandler.handleEvent(new AMSchedulerEventTAEnded(ta14, container2.getId(), TaskAttemptState.SUCCEEDED, null, 0));
+ taskSchedulerEventHandler.handleEvent(
+ new AMSchedulerEventTAEnded(ta14, container2.getId(), TaskAttemptState.SUCCEEDED, null,
+ null, 0));
drainableAppCallback.drain();
- verify(taskScheduler).deallocateTask(eq(ta14), eq(true), eq((TaskAttemptEndReason)null));
+ verifyDeAllocateTask(taskScheduler, ta14, true, null, null);
verify(rmClient).releaseAssignedContainer(eq(container2.getId()));
eventHandler.verifyInvocation(AMContainerEventStopRequest.class);
eventHandler.reset();
@@ -570,13 +586,15 @@ public class TestContainerReuse {
taskScheduler.onContainersAllocated(Collections.singletonList(container1));
TestTaskSchedulerHelpers.waitForDelayedDrainNotify(drainNotifier);
drainableAppCallback.drain();
- verify(taskSchedulerEventHandler).taskAllocated(eq(0), eq(ta11), any(Object.class), eq(container1));
+ verify(taskSchedulerEventHandler).taskAllocated(eq(0), eq(ta11), any(Object.class),
+ eq(container1));
// First task had profiling on. This container can not be reused further.
taskSchedulerEventHandler.handleEvent(
- new AMSchedulerEventTAEnded(ta11, container1.getId(), TaskAttemptState.SUCCEEDED, null, 0));
+ new AMSchedulerEventTAEnded(ta11, container1.getId(), TaskAttemptState.SUCCEEDED, null,
+ null, 0));
drainableAppCallback.drain();
- verify(taskScheduler).deallocateTask(eq(ta11), eq(true), eq((TaskAttemptEndReason)null));
+ verifyDeAllocateTask(taskScheduler, ta11, true, null, null);
verify(taskSchedulerEventHandler, times(0)).taskAllocated(eq(0), eq(ta12), any(Object.class),
eq(container1));
verify(rmClient, times(1)).releaseAssignedContainer(eq(container1.getId()));
@@ -620,9 +638,11 @@ public class TestContainerReuse {
// Verify that the container can not be reused when profiling option is turned on
// Even for 2 tasks having same profiling option can have container reusability.
- taskSchedulerEventHandler.handleEvent(new AMSchedulerEventTAEnded(ta13, container2.getId(), TaskAttemptState.SUCCEEDED, null, 0));
+ taskSchedulerEventHandler.handleEvent(
+ new AMSchedulerEventTAEnded(ta13, container2.getId(), TaskAttemptState.SUCCEEDED, null,
+ null, 0));
drainableAppCallback.drain();
- verify(taskScheduler).deallocateTask(eq(ta13), eq(true), eq((TaskAttemptEndReason)null));
+ verifyDeAllocateTask(taskScheduler, ta13, true, null, null);
verify(taskSchedulerEventHandler, times(0)).taskAllocated(eq(0), eq(ta14), any(Object.class),
eq(container2));
verify(rmClient, times(1)).releaseAssignedContainer(eq(container2.getId()));
@@ -662,12 +682,15 @@ public class TestContainerReuse {
taskScheduler.onContainersAllocated(Collections.singletonList(container3));
TestTaskSchedulerHelpers.waitForDelayedDrainNotify(drainNotifier);
drainableAppCallback.drain();
- verify(taskSchedulerEventHandler).taskAllocated(eq(0), eq(ta15), any(Object.class), eq(container3));
+ verify(taskSchedulerEventHandler).taskAllocated(eq(0), eq(ta15), any(Object.class),
+ eq(container3));
//Ensure task 6 (of vertex 1) is allocated to same container
- taskSchedulerEventHandler.handleEvent(new AMSchedulerEventTAEnded(ta15, container3.getId(), TaskAttemptState.SUCCEEDED, null, 0));
+ taskSchedulerEventHandler.handleEvent(
+ new AMSchedulerEventTAEnded(ta15, container3.getId(), TaskAttemptState.SUCCEEDED, null,
+ null, 0));
drainableAppCallback.drain();
- verify(taskScheduler).deallocateTask(eq(ta15), eq(true), eq((TaskAttemptEndReason)null));
+ verifyDeAllocateTask(taskScheduler, ta15, true, null, null);
verify(taskSchedulerEventHandler).taskAllocated(eq(0), eq(ta16), any(Object.class), eq(container3));
eventHandler.reset();
@@ -769,10 +792,10 @@ public class TestContainerReuse {
// Container should not be immediately assigned to task 2
// until delay expires.
taskSchedulerEventHandler.handleEvent(
- new AMSchedulerEventTAEnded(ta11, container1.getId(),
- TaskAttemptState.SUCCEEDED, null, 0));
+ new AMSchedulerEventTAEnded(ta11, container1.getId(),
+ TaskAttemptState.SUCCEEDED, null, null, 0));
drainableAppCallback.drain();
- verify(taskScheduler).deallocateTask(eq(ta11), eq(true), eq((TaskAttemptEndReason)null));
+ verifyDeAllocateTask(taskScheduler, ta11, true, null, null);
verify(taskSchedulerEventHandler, times(0)).taskAllocated(
eq(0), eq(ta12), any(Object.class), eq(container1));
verify(rmClient, times(0)).releaseAssignedContainer(eq(container1.getId()));
@@ -787,7 +810,7 @@ public class TestContainerReuse {
// TA12 completed.
taskSchedulerEventHandler.handleEvent(
new AMSchedulerEventTAEnded(ta12, container1.getId(),
- TaskAttemptState.SUCCEEDED, null, 0));
+ TaskAttemptState.SUCCEEDED, null, null, 0));
drainableAppCallback.drain();
LOG.info("Sleeping to ensure that the scheduling loop runs");
Thread.sleep(3000l);
@@ -897,9 +920,9 @@ public class TestContainerReuse {
// Container should be assigned to task21.
taskSchedulerEventHandler.handleEvent(
new AMSchedulerEventTAEnded(ta11, container1.getId(),
- TaskAttemptState.SUCCEEDED, null, 0));
+ TaskAttemptState.SUCCEEDED, null, null, 0));
drainableAppCallback.drain();
- verify(taskScheduler).deallocateTask(eq(ta11), eq(true), eq((TaskAttemptEndReason)null));
+ verifyDeAllocateTask(taskScheduler, ta11, true, null, null);
verify(taskSchedulerEventHandler).taskAllocated(
eq(0), eq(ta21), any(Object.class), eq(container1));
verify(rmClient, times(0)).releaseAssignedContainer(eq(container1.getId()));
@@ -907,7 +930,7 @@ public class TestContainerReuse {
// Task 2 completes.
taskSchedulerEventHandler.handleEvent(
new AMSchedulerEventTAEnded(ta21, container1.getId(),
- TaskAttemptState.SUCCEEDED, null, 0));
+ TaskAttemptState.SUCCEEDED, null, null, 0));
verify(rmClient, times(0)).releaseAssignedContainer(eq(container1.getId()));
LOG.info("Sleeping to ensure that the scheduling loop runs");
@@ -1008,9 +1031,11 @@ public class TestContainerReuse {
assertEquals(1, assignEvent.getRemoteTaskLocalResources().size());
// Task assigned to container completed successfully. Container should be re-used.
- taskSchedulerEventHandler.handleEvent(new AMSchedulerEventTAEnded(ta111, container1.getId(), TaskAttemptState.SUCCEEDED, null, 0));
+ taskSchedulerEventHandler.handleEvent(
+ new AMSchedulerEventTAEnded(ta111, container1.getId(), TaskAttemptState.SUCCEEDED, null,
+ null, 0));
drainableAppCallback.drain();
- verify(taskScheduler).deallocateTask(eq(ta111), eq(true), eq((TaskAttemptEndReason)null));
+ verifyDeAllocateTask(taskScheduler, ta111, true, null, null);
verify(taskSchedulerEventHandler).taskAllocated(eq(0), eq(ta112), any(Object.class), eq(container1));
verify(rmClient, times(0)).releaseAssignedContainer(eq(container1.getId()));
eventHandler.verifyNoInvocations(AMContainerEventStopRequest.class);
@@ -1020,9 +1045,11 @@ public class TestContainerReuse {
// Task assigned to container completed successfully.
// Verify reuse across hosts.
- taskSchedulerEventHandler.handleEvent(new AMSchedulerEventTAEnded(ta112, container1.getId(), TaskAttemptState.SUCCEEDED, null, 0));
+ taskSchedulerEventHandler.handleEvent(
+ new AMSchedulerEventTAEnded(ta112, container1.getId(), TaskAttemptState.SUCCEEDED, null,
+ null, 0));
drainableAppCallback.drain();
- verify(taskScheduler).deallocateTask(eq(ta112), eq(true), eq((TaskAttemptEndReason)null));
+ verifyDeAllocateTask(taskScheduler, ta112, true, null, null);
verify(rmClient, times(0)).releaseAssignedContainer(eq(container1.getId()));
eventHandler.verifyNoInvocations(AMContainerEventStopRequest.class);
eventHandler.reset();
@@ -1054,16 +1081,19 @@ public class TestContainerReuse {
// TODO This is terrible, need a better way to ensure the scheduling loop has run
LOG.info("Sleeping to ensure that the scheduling loop runs");
Thread.sleep(6000l);
- verify(taskSchedulerEventHandler).taskAllocated(eq(0), eq(ta211), any(Object.class), eq(container1));
+ verify(taskSchedulerEventHandler).taskAllocated(eq(0), eq(ta211), any(Object.class),
+ eq(container1));
verify(rmClient, times(0)).releaseAssignedContainer(eq(container1.getId()));
eventHandler.verifyNoInvocations(AMContainerEventStopRequest.class);
assignEvent = (AMContainerEventAssignTA) eventHandler.verifyInvocation(AMContainerEventAssignTA.class);
assertEquals(2, assignEvent.getRemoteTaskLocalResources().size());
eventHandler.reset();
- taskSchedulerEventHandler.handleEvent(new AMSchedulerEventTAEnded(ta211, container1.getId(), TaskAttemptState.SUCCEEDED, null, 0));
+ taskSchedulerEventHandler.handleEvent(
+ new AMSchedulerEventTAEnded(ta211, container1.getId(), TaskAttemptState.SUCCEEDED, null,
+ null, 0));
drainableAppCallback.drain();
- verify(taskScheduler).deallocateTask(eq(ta211), eq(true), eq((TaskAttemptEndReason)null));
+ verifyDeAllocateTask(taskScheduler, ta211, true, null, null);
verify(taskSchedulerEventHandler).taskAllocated(eq(0), eq(ta212), any(Object.class), eq(container1));
verify(rmClient, times(0)).releaseAssignedContainer(eq(container1.getId()));
eventHandler.verifyNoInvocations(AMContainerEventStopRequest.class);
@@ -1174,9 +1204,11 @@ public class TestContainerReuse {
assertEquals(1, assignEvent.getRemoteTaskLocalResources().size());
// Task assigned to container completed successfully. Container should be re-used.
- taskSchedulerEventHandler.handleEvent(new AMSchedulerEventTAEnded(ta111, container1.getId(), TaskAttemptState.SUCCEEDED, null, 0));
+ taskSchedulerEventHandler.handleEvent(
+ new AMSchedulerEventTAEnded(ta111, container1.getId(), TaskAttemptState.SUCCEEDED, null,
+ null, 0));
drainableAppCallback.drain();
- verify(taskScheduler).deallocateTask(eq(ta111), eq(true), eq((TaskAttemptEndReason)null));
+ verifyDeAllocateTask(taskScheduler, ta111, true, null, null);
verify(taskSchedulerEventHandler).taskAllocated(eq(0), eq(ta112), any(Object.class), eq(container1));
verify(rmClient, times(0)).releaseAssignedContainer(eq(container1.getId()));
eventHandler.verifyNoInvocations(AMContainerEventStopRequest.class);
@@ -1186,9 +1218,9 @@ public class TestContainerReuse {
// Task assigned to container completed successfully.
// Verify reuse across hosts.
- taskSchedulerEventHandler.handleEvent(new AMSchedulerEventTAEnded(ta112, container1.getId(), TaskAttemptState.SUCCEEDED, null, 0));
+ taskSchedulerEventHandler.handleEvent(new AMSchedulerEventTAEnded(ta112, container1.getId(), TaskAttemptState.SUCCEEDED, null, null, 0));
drainableAppCallback.drain();
- verify(taskScheduler).deallocateTask(eq(ta112), eq(true), eq((TaskAttemptEndReason)null));
+ verifyDeAllocateTask(taskScheduler, ta112, true, null, null);
verify(rmClient, times(0)).releaseAssignedContainer(eq(container1.getId()));
eventHandler.verifyNoInvocations(AMContainerEventStopRequest.class);
eventHandler.reset();
@@ -1301,6 +1333,7 @@ public class TestContainerReuse {
}
private Container createContainer(int id, String host, Resource resource, Priority priority) {
+ @SuppressWarnings("deprecation")
ContainerId containerID = ContainerId.newInstance(
ApplicationAttemptId.newInstance(ApplicationId.newInstance(1, 1), 1),
id);
@@ -1368,4 +1401,17 @@ public class TestContainerReuse {
return this.dagID;
}
}
+
+ private void verifyDeAllocateTask(TaskScheduler taskScheduler, Object ta, boolean taskSucceeded,
+ TaskAttemptEndReason endReason, String diagContains) { // Verifies the deallocateTask call for ta and checks its diagnostics argument.
+ ArgumentCaptor<String> argumentCaptor = ArgumentCaptor.forClass(String.class); // captures the 4th (diagnostics) argument
+ verify(taskScheduler)
+ .deallocateTask(eq(ta), eq(taskSucceeded), eq(endReason), argumentCaptor.capture());
+ assertEquals(1, argumentCaptor.getAllValues().size());
+ if (diagContains == null) { // null diagContains: the captured diagnostics must itself be null
+ assertNull(argumentCaptor.getValue());
+ } else { // otherwise the captured diagnostics must contain diagContains as a substring
+ assertTrue(argumentCaptor.getValue().contains(diagContains));
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/tez/blob/fee059b9/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestLocalTaskSchedulerService.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestLocalTaskSchedulerService.java b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestLocalTaskSchedulerService.java
index c637f5f..3b2de34 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestLocalTaskSchedulerService.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestLocalTaskSchedulerService.java
@@ -93,7 +93,7 @@ public class TestLocalTaskSchedulerService {
Task task = mock(Task.class);
taskSchedulerService.allocateTask(task, Resource.newInstance(1024, 1), null, null, Priority.newInstance(1), null, null);
- taskSchedulerService.deallocateTask(task, false, null);
+ taskSchedulerService.deallocateTask(task, false, null, null);
// start the RequestHandler, DeallocateTaskRequest has higher priority, so will be processed first
taskSchedulerService.startRequestHandlerThread();
@@ -128,7 +128,7 @@ public class TestLocalTaskSchedulerService {
MockAsyncDelegateRequestHandler requestHandler = taskSchedulerService.getRequestHandler();
requestHandler.drainRequest(1);
- taskSchedulerService.deallocateTask(task, false, null);
+ taskSchedulerService.deallocateTask(task, false, null, null);
requestHandler.drainRequest(2);
assertEquals(1, requestHandler.deallocateCount);
assertEquals(1, requestHandler.allocateCount);
http://git-wip-us.apache.org/repos/asf/tez/blob/fee059b9/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskScheduler.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskScheduler.java b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskScheduler.java
index 6af9815..d956ff9 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskScheduler.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskScheduler.java
@@ -193,7 +193,7 @@ public class TestTaskScheduler {
addContainerRequest((CookieContainerRequest) any());
// returned from task requests before allocation happens
- assertFalse(scheduler.deallocateTask(mockTask1, true, null));
+ assertFalse(scheduler.deallocateTask(mockTask1, true, null, null));
verify(mockApp, times(0)).containerBeingReleased(any(ContainerId.class));
verify(mockRMClient, times(1)).
removeContainerRequest((CookieContainerRequest) any());
@@ -201,7 +201,7 @@ public class TestTaskScheduler {
releaseAssignedContainer((ContainerId) any());
// deallocating unknown task
- assertFalse(scheduler.deallocateTask(mockTask1, true, null));
+ assertFalse(scheduler.deallocateTask(mockTask1, true, null, null));
verify(mockApp, times(0)).containerBeingReleased(any(ContainerId.class));
verify(mockRMClient, times(1)).
removeContainerRequest((CookieContainerRequest) any());
@@ -346,7 +346,7 @@ public class TestTaskScheduler {
verify(mockRMClient).releaseAssignedContainer(mockCId4);
// deallocate allocated task
- assertTrue(scheduler.deallocateTask(mockTask1, true, null));
+ assertTrue(scheduler.deallocateTask(mockTask1, true, null, null));
drainableAppCallback.drain();
verify(mockApp).containerBeingReleased(mockCId1);
verify(mockRMClient).releaseAssignedContainer(mockCId1);
@@ -466,7 +466,7 @@ public class TestTaskScheduler {
verify(mockApp, times(4)).taskAllocated(any(), any(), (Container) any());
verify(mockApp).taskAllocated(mockTask4, mockCookie4, mockContainer6);
// deallocate allocated task
- assertTrue(scheduler.deallocateTask(mockTask4, true, null));
+ assertTrue(scheduler.deallocateTask(mockTask4, true, null, null));
drainableAppCallback.drain();
verify(mockApp).containerBeingReleased(mockCId6);
verify(mockRMClient).releaseAssignedContainer(mockCId6);
@@ -496,7 +496,7 @@ public class TestTaskScheduler {
removeContainerRequest((CookieContainerRequest) any());
verify(mockRMClient, times(8)).addContainerRequest(
(CookieContainerRequest) any());
- assertFalse(scheduler.deallocateTask(mockTask1, true, null));
+ assertFalse(scheduler.deallocateTask(mockTask1, true, null, null));
List<NodeReport> mockUpdatedNodes = mock(List.class);
scheduler.onNodesUpdated(mockUpdatedNodes);
@@ -760,7 +760,7 @@ public class TestTaskScheduler {
verify(mockRMClient).releaseAssignedContainer(mockCId4);
// deallocate allocated task
- assertTrue(scheduler.deallocateTask(mockTask1, true, null));
+ assertTrue(scheduler.deallocateTask(mockTask1, true, null, null));
drainableAppCallback.drain();
verify(mockApp).containerBeingReleased(mockCId1);
verify(mockRMClient).releaseAssignedContainer(mockCId1);
@@ -890,7 +890,7 @@ public class TestTaskScheduler {
verify(mockApp, times(4)).taskAllocated(any(), any(), (Container) any());
verify(mockApp).taskAllocated(mockTask4, mockCookie4, mockContainer6);
// deallocate allocated task
- assertTrue(scheduler.deallocateTask(mockTask4, true, null));
+ assertTrue(scheduler.deallocateTask(mockTask4, true, null, null));
drainableAppCallback.drain();
verify(mockApp).containerBeingReleased(mockCId6);
verify(mockRMClient).releaseAssignedContainer(mockCId6);
@@ -979,8 +979,8 @@ public class TestTaskScheduler {
// container7 allocated to the task with affinity for it
verify(mockApp).taskAllocated(mockTask6, mockCookie6, mockContainer7);
// deallocate allocated task
- assertTrue(scheduler.deallocateTask(mockTask5, true, null));
- assertTrue(scheduler.deallocateTask(mockTask6, true, null));
+ assertTrue(scheduler.deallocateTask(mockTask5, true, null, null));
+ assertTrue(scheduler.deallocateTask(mockTask6, true, null, null));
drainableAppCallback.drain();
verify(mockApp).containerBeingReleased(mockCId7);
verify(mockApp).containerBeingReleased(mockCId8);
http://git-wip-us.apache.org/repos/asf/tez/blob/fee059b9/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerEventHandler.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerEventHandler.java b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerEventHandler.java
index 3e68a4c..1550085 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerEventHandler.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerEventHandler.java
@@ -107,7 +107,7 @@ public class TestTaskSchedulerEventHandler {
class MockTaskSchedulerEventHandler extends TaskSchedulerEventHandler {
- AtomicBoolean notify = new AtomicBoolean(false);
+ final AtomicBoolean notify = new AtomicBoolean(false);
public MockTaskSchedulerEventHandler(AppContext appContext,
DAGClientServer clientService, EventHandler eventHandler,
@@ -120,7 +120,7 @@ public class TestTaskSchedulerEventHandler {
protected void instantiateScheduelrs(String host, int port, String trackingUrl,
AppContext appContext) {
taskSchedulers[0] = mockTaskScheduler;
- taskSchedulerServiceWrappers[0] = new ServicePluginLifecycleAbstractService(taskSchedulers[0]);
+ taskSchedulerServiceWrappers[0] = new ServicePluginLifecycleAbstractService<>(taskSchedulers[0]);
}
@Override
@@ -154,7 +154,6 @@ public class TestTaskSchedulerEventHandler {
mockWebUIService = mock(WebUIService.class);
when(mockAppContext.getAllContainers()).thenReturn(mockAMContainerMap);
when(mockClientService.getBindAddress()).thenReturn(new InetSocketAddress(10000));
- Configuration conf = new Configuration(false);
schedulerHandler = new MockTaskSchedulerEventHandler(
mockAppContext, mockClientService, mockEventHandler, mockSigMatcher, mockWebUIService);
}
@@ -412,9 +411,8 @@ public class TestTaskSchedulerEventHandler {
@Test(timeout = 5000)
public void testNoSchedulerSpecified() throws IOException {
try {
- TSEHForMultipleSchedulersTest tseh =
- new TSEHForMultipleSchedulersTest(mockAppContext, mockClientService, mockEventHandler,
- mockSigMatcher, mockWebUIService, null, false);
+ new TSEHForMultipleSchedulersTest(mockAppContext, mockClientService, mockEventHandler,
+ mockSigMatcher, mockWebUIService, null, false);
fail("Expecting an IllegalStateException with no schedulers specified");
} catch (IllegalArgumentException e) {
}
@@ -686,7 +684,8 @@ public class TestTaskSchedulerEventHandler {
@Override
public boolean deallocateTask(Object task, boolean taskSucceeded,
- TaskAttemptEndReason endReason) {
+ TaskAttemptEndReason endReason,
+ String diagnostics) {
return false;
}
http://git-wip-us.apache.org/repos/asf/tez/blob/fee059b9/tez-dag/src/test/java/org/apache/tez/dag/app/rm/container/TestAMContainer.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/container/TestAMContainer.java b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/container/TestAMContainer.java
index f9952d8..13fa4c5 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/container/TestAMContainer.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/container/TestAMContainer.java
@@ -127,7 +127,8 @@ public class TestAMContainer {
// Once for the previous NO_TASKS, one for the actual task.
verify(wc.chh).register(wc.containerID);
ArgumentCaptor<AMContainerTask> argumentCaptor = ArgumentCaptor.forClass(AMContainerTask.class);
- verify(wc.tal, times(1)).registerTaskAttempt(argumentCaptor.capture(), eq(wc.containerID), eq(0));
+ verify(wc.tal, times(1)).registerTaskAttempt(argumentCaptor.capture(), eq(wc.containerID),
+ eq(0));
assertEquals(1, argumentCaptor.getAllValues().size());
assertEquals(wc.taskAttemptID, argumentCaptor.getAllValues().get(0).getTask().getTaskAttemptID());
assertEquals(WrappedContainer.taskPriority, argumentCaptor.getAllValues().get(0).getPriority());
@@ -137,14 +138,14 @@ public class TestAMContainer {
wc.verifyState(AMContainerState.IDLE);
wc.verifyNoOutgoingEvents();
assertNull(wc.amContainer.getCurrentTaskAttempt());
- verify(wc.tal).unregisterTaskAttempt(wc.taskAttemptID, 0, TaskAttemptEndReason.OTHER);
+ verifyUnregisterTaskAttempt(wc.tal, wc.taskAttemptID, 0, TaskAttemptEndReason.OTHER, null);
// Container completed
wc.containerCompleted();
wc.verifyHistoryStopEvent();
wc.verifyState(AMContainerState.COMPLETED);
wc.verifyNoOutgoingEvents();
- verify(wc.tal).unregisterRunningContainer(wc.containerID, 0, ContainerEndReason.COMPLETED);
+ verifyUnregisterRunningContainer(wc.tal, wc.containerID, 0, ContainerEndReason.COMPLETED, null);
verify(wc.chh).unregister(wc.containerID);
assertEquals(1, wc.amContainer.getAllTaskAttempts().size());
@@ -178,21 +179,23 @@ public class TestAMContainer {
wc.verifyNoOutgoingEvents();
assertEquals(wc.taskAttemptID, wc.amContainer.getCurrentTaskAttempt());
ArgumentCaptor<AMContainerTask> argumentCaptor = ArgumentCaptor.forClass(AMContainerTask.class);
- verify(wc.tal, times(1)).registerTaskAttempt(argumentCaptor.capture(), eq(wc.containerID), eq(0));
+ verify(wc.tal, times(1)).registerTaskAttempt(argumentCaptor.capture(), eq(wc.containerID),
+ eq(0));
assertEquals(1, argumentCaptor.getAllValues().size());
- assertEquals(wc.taskAttemptID, argumentCaptor.getAllValues().get(0).getTask().getTaskAttemptID());
+ assertEquals(wc.taskAttemptID,
+ argumentCaptor.getAllValues().get(0).getTask().getTaskAttemptID());
wc.taskAttemptSucceeded(wc.taskAttemptID);
wc.verifyState(AMContainerState.IDLE);
wc.verifyNoOutgoingEvents();
assertNull(wc.amContainer.getCurrentTaskAttempt());
- verify(wc.tal).unregisterTaskAttempt(wc.taskAttemptID, 0, TaskAttemptEndReason.OTHER);
+ verifyUnregisterTaskAttempt(wc.tal, wc.taskAttemptID, 0, TaskAttemptEndReason.OTHER, null);
wc.containerCompleted();
wc.verifyHistoryStopEvent();
wc.verifyState(AMContainerState.COMPLETED);
wc.verifyNoOutgoingEvents();
- verify(wc.tal).unregisterRunningContainer(wc.containerID, 0, ContainerEndReason.COMPLETED);
+ verifyUnregisterRunningContainer(wc.tal, wc.containerID, 0, ContainerEndReason.COMPLETED, null);
verify(wc.chh).unregister(wc.containerID);
assertEquals(1, wc.amContainer.getAllTaskAttempts().size());
@@ -228,22 +231,25 @@ public class TestAMContainer {
// Once for the previous NO_TASKS, one for the actual task.
verify(wc.chh).register(wc.containerID);
ArgumentCaptor<AMContainerTask> argumentCaptor = ArgumentCaptor.forClass(AMContainerTask.class);
- verify(wc.tal, times(1)).registerTaskAttempt(argumentCaptor.capture(), eq(wc.containerID), eq(0));
+ verify(wc.tal, times(1)).registerTaskAttempt(argumentCaptor.capture(), eq(wc.containerID),
+ eq(0));
assertEquals(1, argumentCaptor.getAllValues().size());
- assertEquals(wc.taskAttemptID, argumentCaptor.getAllValues().get(0).getTask().getTaskAttemptID());
+ assertEquals(wc.taskAttemptID,
+ argumentCaptor.getAllValues().get(0).getTask().getTaskAttemptID());
// Attempt succeeded
wc.taskAttemptSucceeded(wc.taskAttemptID);
wc.verifyState(AMContainerState.IDLE);
wc.verifyNoOutgoingEvents();
assertNull(wc.amContainer.getCurrentTaskAttempt());
- verify(wc.tal).unregisterTaskAttempt(wc.taskAttemptID, 0, TaskAttemptEndReason.OTHER);
+ verifyUnregisterTaskAttempt(wc.tal, wc.taskAttemptID, 0, TaskAttemptEndReason.OTHER, null);
TezTaskAttemptID taId2 = TezTaskAttemptID.getInstance(wc.taskID, 2);
wc.assignTaskAttempt(taId2);
wc.verifyState(AMContainerState.RUNNING);
argumentCaptor = ArgumentCaptor.forClass(AMContainerTask.class);
- verify(wc.tal, times(2)).registerTaskAttempt(argumentCaptor.capture(), eq(wc.containerID), eq(0));
+ verify(wc.tal, times(2)).registerTaskAttempt(argumentCaptor.capture(), eq(wc.containerID),
+ eq(0));
assertEquals(2, argumentCaptor.getAllValues().size());
assertEquals(taId2, argumentCaptor.getAllValues().get(1).getTask().getTaskAttemptID());
@@ -252,14 +258,14 @@ public class TestAMContainer {
wc.verifyState(AMContainerState.IDLE);
wc.verifyNoOutgoingEvents();
assertNull(wc.amContainer.getCurrentTaskAttempt());
- verify(wc.tal).unregisterTaskAttempt(taId2, 0, TaskAttemptEndReason.OTHER);
+ verifyUnregisterTaskAttempt(wc.tal, taId2, 0, TaskAttemptEndReason.OTHER, null);
// Container completed
wc.containerCompleted();
wc.verifyHistoryStopEvent();
wc.verifyState(AMContainerState.COMPLETED);
wc.verifyNoOutgoingEvents();
- verify(wc.tal).unregisterRunningContainer(wc.containerID, 0, ContainerEndReason.COMPLETED);
+ verifyUnregisterRunningContainer(wc.tal, wc.containerID, 0, ContainerEndReason.COMPLETED, null);
verify(wc.chh).unregister(wc.containerID);
assertEquals(2, wc.amContainer.getAllTaskAttempts().size());
@@ -292,7 +298,8 @@ public class TestAMContainer {
wc.verifyHistoryStopEvent();
wc.verifyState(AMContainerState.COMPLETED);
wc.verifyNoOutgoingEvents();
- verify(wc.tal).unregisterRunningContainer(wc.containerID, 0, ContainerEndReason.OTHER);
+ verifyUnregisterRunningContainer(wc.tal, wc.containerID, 0, ContainerEndReason.OTHER,
+ "received a STOP_REQUEST");
verify(wc.chh).unregister(wc.containerID);
assertNull(wc.amContainer.getCurrentTaskAttempt());
@@ -329,7 +336,8 @@ public class TestAMContainer {
wc.verifyHistoryStopEvent();
wc.verifyState(AMContainerState.COMPLETED);
wc.verifyNoOutgoingEvents();
- verify(wc.tal).unregisterRunningContainer(wc.containerID, 0, ContainerEndReason.OTHER);
+ verifyUnregisterRunningContainer(wc.tal, wc.containerID, 0, ContainerEndReason.OTHER,
+ "received a STOP_REQUEST");
verify(wc.chh).unregister(wc.containerID);
assertNull(wc.amContainer.getCurrentTaskAttempt());
@@ -352,7 +360,8 @@ public class TestAMContainer {
wc.assignTaskAttempt(taID2);
wc.verifyState(AMContainerState.STOP_REQUESTED);
- verify(wc.tal).unregisterRunningContainer(wc.containerID, 0, ContainerEndReason.FRAMEWORK_ERROR);
+ verifyUnregisterRunningContainer(wc.tal, wc.containerID, 0, ContainerEndReason.FRAMEWORK_ERROR,
+ "Multiple simultaneous taskAttempt");
verify(wc.chh).unregister(wc.containerID);
// 1 for NM stop request. 2 TERMINATING to TaskAttempt.
outgoingEvents = wc.verifyCountAndGetOutgoingEvents(3);
@@ -390,7 +399,8 @@ public class TestAMContainer {
wc.assignTaskAttempt(taID2);
wc.verifyState(AMContainerState.STOP_REQUESTED);
- verify(wc.tal).unregisterRunningContainer(wc.containerID, 0, ContainerEndReason.FRAMEWORK_ERROR);
+ verifyUnregisterRunningContainer(wc.tal, wc.containerID, 0, ContainerEndReason.FRAMEWORK_ERROR,
+ "Multiple simultaneous taskAttempt");
verify(wc.chh).unregister(wc.containerID);
// 1 for NM stop request. 2 TERMINATING to TaskAttempt.
outgoingEvents = wc.verifyCountAndGetOutgoingEvents(3);
@@ -426,7 +436,8 @@ public class TestAMContainer {
wc.containerTimedOut();
wc.verifyState(AMContainerState.STOP_REQUESTED);
- verify(wc.tal).unregisterRunningContainer(wc.containerID, 0, ContainerEndReason.OTHER);
+ verifyUnregisterRunningContainer(wc.tal, wc.containerID, 0, ContainerEndReason.OTHER,
+ "timed out");
verify(wc.chh).unregister(wc.containerID);
// 1 to TA, 1 for RM de-allocate.
outgoingEvents = wc.verifyCountAndGetOutgoingEvents(2);
@@ -460,7 +471,8 @@ public class TestAMContainer {
wc.stopRequest();
wc.verifyState(AMContainerState.STOP_REQUESTED);
- verify(wc.tal).unregisterRunningContainer(wc.containerID, 0, ContainerEndReason.OTHER);
+ verifyUnregisterRunningContainer(wc.tal, wc.containerID, 0, ContainerEndReason.OTHER,
+ "received a STOP_REQUEST");
verify(wc.chh).unregister(wc.containerID);
// 1 to TA, 1 for RM de-allocate.
outgoingEvents = wc.verifyCountAndGetOutgoingEvents(2);
@@ -494,7 +506,8 @@ public class TestAMContainer {
wc.launchFailed();
wc.verifyState(AMContainerState.STOPPING);
verify(wc.tal).registerRunningContainer(wc.containerID, 0);
- verify(wc.tal).unregisterRunningContainer(wc.containerID, 0, ContainerEndReason.LAUNCH_FAILED);
+ verifyUnregisterRunningContainer(wc.tal, wc.containerID, 0, ContainerEndReason.LAUNCH_FAILED,
+ "launchFailed");
outgoingEvents = wc.verifyCountAndGetOutgoingEvents(2);
verifyUnOrderedOutgoingEventTypes(outgoingEvents,
@@ -544,7 +557,7 @@ public class TestAMContainer {
wc.containerCompleted();
wc.verifyState(AMContainerState.COMPLETED);
verify(wc.tal).registerRunningContainer(wc.containerID, 0);
- verify(wc.tal).unregisterRunningContainer(wc.containerID, 0, ContainerEndReason.COMPLETED);
+ verifyUnregisterRunningContainer(wc.tal, wc.containerID, 0, ContainerEndReason.COMPLETED, null);
outgoingEvents = wc.verifyCountAndGetOutgoingEvents(1);
verifyUnOrderedOutgoingEventTypes(outgoingEvents,
@@ -571,10 +584,10 @@ public class TestAMContainer {
wc.assignTaskAttempt(wc.taskAttemptID);
- wc.containerCompleted(ContainerExitStatus.DISKS_FAILED, TaskAttemptTerminationCause.NODE_DISK_ERROR);
+ wc.containerCompleted(ContainerExitStatus.DISKS_FAILED, TaskAttemptTerminationCause.NODE_DISK_ERROR, "DiskFailed");
wc.verifyState(AMContainerState.COMPLETED);
verify(wc.tal).registerRunningContainer(wc.containerID, 0);
- verify(wc.tal).unregisterRunningContainer(wc.containerID, 0, ContainerEndReason.OTHER);
+ verifyUnregisterRunningContainer(wc.tal, wc.containerID, 0, ContainerEndReason.OTHER, "DiskFailed");
outgoingEvents = wc.verifyCountAndGetOutgoingEvents(1);
verifyUnOrderedOutgoingEventTypes(outgoingEvents,
@@ -602,10 +615,11 @@ public class TestAMContainer {
wc.assignTaskAttempt(wc.taskAttemptID);
- wc.containerCompleted(ContainerExitStatus.ABORTED, TaskAttemptTerminationCause.NODE_FAILED);
+ wc.containerCompleted(ContainerExitStatus.ABORTED, TaskAttemptTerminationCause.NODE_FAILED, "NodeFailed");
wc.verifyState(AMContainerState.COMPLETED);
verify(wc.tal).registerRunningContainer(wc.containerID, 0);
- verify(wc.tal).unregisterRunningContainer(wc.containerID, 0, ContainerEndReason.NODE_FAILED);
+ verifyUnregisterRunningContainer(wc.tal, wc.containerID, 0, ContainerEndReason.NODE_FAILED,
+ "NodeFailed");
outgoingEvents = wc.verifyCountAndGetOutgoingEvents(1);
verifyUnOrderedOutgoingEventTypes(outgoingEvents,
@@ -636,7 +650,7 @@ public class TestAMContainer {
wc.containerCompleted();
wc.verifyState(AMContainerState.COMPLETED);
verify(wc.tal).registerRunningContainer(wc.containerID, 0);
- verify(wc.tal).unregisterRunningContainer(wc.containerID, 0, ContainerEndReason.COMPLETED);
+ verifyUnregisterRunningContainer(wc.tal, wc.containerID, 0, ContainerEndReason.COMPLETED, null);
verify(wc.chh).register(wc.containerID);
verify(wc.chh).unregister(wc.containerID);
@@ -665,7 +679,7 @@ public class TestAMContainer {
wc.containerCompleted();
wc.verifyState(AMContainerState.COMPLETED);
verify(wc.tal).registerRunningContainer(wc.containerID, 0);
- verify(wc.tal).unregisterRunningContainer(wc.containerID, 0, ContainerEndReason.COMPLETED);
+ verifyUnregisterRunningContainer(wc.tal, wc.containerID, 0, ContainerEndReason.COMPLETED, null);
verify(wc.chh).register(wc.containerID);
verify(wc.chh).unregister(wc.containerID);
@@ -697,10 +711,12 @@ public class TestAMContainer {
wc.containerLaunched();
wc.verifyState(AMContainerState.RUNNING);
- wc.containerCompleted(ContainerExitStatus.PREEMPTED, TaskAttemptTerminationCause.EXTERNAL_PREEMPTION);
+ wc.containerCompleted(ContainerExitStatus.PREEMPTED,
+ TaskAttemptTerminationCause.EXTERNAL_PREEMPTION, "Container preempted externally");
wc.verifyState(AMContainerState.COMPLETED);
verify(wc.tal).registerRunningContainer(wc.containerID, 0);
- verify(wc.tal).unregisterRunningContainer(wc.containerID, 0, ContainerEndReason.EXTERNAL_PREEMPTION);
+ verifyUnregisterRunningContainer(wc.tal, wc.containerID, 0,
+ ContainerEndReason.EXTERNAL_PREEMPTION, "Container preempted externally");
verify(wc.chh).register(wc.containerID);
verify(wc.chh).unregister(wc.containerID);
@@ -734,11 +750,12 @@ public class TestAMContainer {
wc.containerLaunched();
wc.verifyState(AMContainerState.RUNNING);
- wc.containerCompleted(ContainerExitStatus.INVALID, TaskAttemptTerminationCause.INTERNAL_PREEMPTION);
+ wc.containerCompleted(ContainerExitStatus.INVALID,
+ TaskAttemptTerminationCause.INTERNAL_PREEMPTION, "Container preempted internally");
wc.verifyState(AMContainerState.COMPLETED);
verify(wc.tal).registerRunningContainer(wc.containerID, 0);
- verify(wc.tal).unregisterRunningContainer(wc.containerID, 0,
- ContainerEndReason.INTERNAL_PREEMPTION);
+ verifyUnregisterRunningContainer(wc.tal, wc.containerID, 0,
+ ContainerEndReason.INTERNAL_PREEMPTION, "Container preempted internally");
verify(wc.chh).register(wc.containerID);
verify(wc.chh).unregister(wc.containerID);
@@ -772,10 +789,11 @@ public class TestAMContainer {
wc.containerLaunched();
wc.verifyState(AMContainerState.RUNNING);
- wc.containerCompleted(ContainerExitStatus.DISKS_FAILED, TaskAttemptTerminationCause.NODE_DISK_ERROR);
+ wc.containerCompleted(ContainerExitStatus.DISKS_FAILED,
+ TaskAttemptTerminationCause.NODE_DISK_ERROR, "NodeDiskError");
wc.verifyState(AMContainerState.COMPLETED);
verify(wc.tal).registerRunningContainer(wc.containerID, 0);
- verify(wc.tal).unregisterRunningContainer(wc.containerID, 0, ContainerEndReason.OTHER);
+ verifyUnregisterRunningContainer(wc.tal, wc.containerID, 0, ContainerEndReason.OTHER, "NodeDiskError");
verify(wc.chh).register(wc.containerID);
verify(wc.chh).unregister(wc.containerID);
@@ -1194,6 +1212,7 @@ public class TestAMContainer {
public AMContainerImpl amContainer;
+ @SuppressWarnings("deprecation") // ContainerId
public WrappedContainer(boolean shouldProfile, String profileString) {
applicationID = ApplicationId.newInstance(rmIdentifier, 1);
appAttemptID = ApplicationAttemptId.newInstance(applicationID, 1);
@@ -1286,7 +1305,8 @@ public class TestAMContainer {
Token<JobTokenIdentifier> jobToken = mock(Token.class);
TokenCache.setSessionToken(jobToken, credentials);
amContainer.handle(new AMContainerEventLaunchRequest(containerID, vertexID,
- new ContainerContext(localResources, credentials, new HashMap<String, String>(), ""), 0, 0));
+ new ContainerContext(localResources, credentials, new HashMap<String, String>(), ""), 0,
+ 0));
}
public void assignTaskAttempt(TezTaskAttemptID taID) {
@@ -1333,10 +1353,12 @@ public class TestAMContainer {
amContainer.handle(new AMContainerEventCompleted(containerID, ContainerExitStatus.SUCCESS, null,
TaskAttemptTerminationCause.CONTAINER_EXITED));
}
-
- public void containerCompleted(int exitStatus, TaskAttemptTerminationCause errCause) {
+
+ public void containerCompleted(int exitStatus, TaskAttemptTerminationCause errCause,
+ String diagnostics) {
reset(eventHandler);
- amContainer.handle(new AMContainerEventCompleted(containerID, exitStatus, null, errCause));
+ amContainer.handle(
+ new AMContainerEventCompleted(containerID, exitStatus, diagnostics, errCause));
}
public void containerTimedOut() {
@@ -1417,4 +1439,52 @@ public class TestAMContainer {
LocalResourceType.FILE, LocalResourceVisibility.APPLICATION, 1, 1000000);
return lr;
}
+
+ /**
+ * Verifies that the listener received exactly one unregisterRunningContainer call matching
+ * the given container, taskCommId and end reason, and checks the diagnostics sent with it.
+ *
+ * @param diagContains substring the diagnostics must contain; null means the diagnostics
+ * are expected to be null
+ */
+ private void verifyUnregisterRunningContainer(TaskAttemptListener tal, ContainerId containerId,
+ int taskCommId,
+ ContainerEndReason containerEndReason,
+ String diagContains) {
+ ArgumentCaptor<String> argumentCaptor = ArgumentCaptor.forClass(String.class);
+ verify(tal).unregisterRunningContainer(eq(containerId), eq(taskCommId), eq(containerEndReason),
+ argumentCaptor.capture());
+ assertEquals(1, argumentCaptor.getAllValues().size());
+ assertDiagnostics(argumentCaptor.getValue(), diagContains);
+ }
+
+ /**
+ * Verifies that the listener received exactly one unregisterTaskAttempt call matching the
+ * given attempt, taskCommId and end reason, and checks the diagnostics sent with it.
+ *
+ * @param diagContains substring the diagnostics must contain; null means the diagnostics
+ * are expected to be null
+ */
+ private void verifyUnregisterTaskAttempt(TaskAttemptListener tal, TezTaskAttemptID taId,
+ int taskCommId, TaskAttemptEndReason endReason,
+ String diagContains) {
+ ArgumentCaptor<String> argumentCaptor = ArgumentCaptor.forClass(String.class);
+ verify(tal)
+ .unregisterTaskAttempt(eq(taId), eq(taskCommId), eq(endReason), argumentCaptor.capture());
+ assertEquals(1, argumentCaptor.getAllValues().size());
+ assertDiagnostics(argumentCaptor.getValue(), diagContains);
+ }
+
+ /**
+ * Asserts that captured diagnostics match the expectation, failing with a readable message
+ * (rather than a NullPointerException) when diagnostics are expected but missing.
+ */
+ private static void assertDiagnostics(String actual, String expectedSubstring) {
+ if (expectedSubstring != null) {
+ assertTrue("Expected diagnostics containing [" + expectedSubstring + "] but got: " + actual,
+ actual != null && actual.contains(expectedSubstring));
+ } else {
+ assertNull(actual);
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/tez/blob/fee059b9/tez-examples/src/main/java/org/apache/tez/examples/JoinValidate.java
----------------------------------------------------------------------
diff --git a/tez-examples/src/main/java/org/apache/tez/examples/JoinValidate.java b/tez-examples/src/main/java/org/apache/tez/examples/JoinValidate.java
index 611e8cc..4883351 100644
--- a/tez-examples/src/main/java/org/apache/tez/examples/JoinValidate.java
+++ b/tez-examples/src/main/java/org/apache/tez/examples/JoinValidate.java
@@ -183,18 +183,26 @@ public class JoinValidate extends TezExampleBase {
}
}
+ // This is for internal use only, to use this example for external service testing.
+ // Not meant as documentation for the example.
protected VertexExecutionContext getDefaultExecutionContext() {
return null;
}
+ // This is for internal use only, to use this example for external service testing.
+ // Not meant as documentation for the example.
protected VertexExecutionContext getLhsExecutionContext() {
return null;
}
+ // This is for internal use only, to use this example for external service testing.
+ // Not meant as documentation for the example.
protected VertexExecutionContext getRhsExecutionContext() {
return null;
}
+ // This is for internal use only, to use this example for external service testing.
+ // Not meant as documentation for the example.
protected VertexExecutionContext getValidateExecutionContext() {
return null;
}
http://git-wip-us.apache.org/repos/asf/tez/blob/fee059b9/tez-ext-service-tests/pom.xml
----------------------------------------------------------------------
diff --git a/tez-ext-service-tests/pom.xml b/tez-ext-service-tests/pom.xml
index f95f4ca..5a1907f 100644
--- a/tez-ext-service-tests/pom.xml
+++ b/tez-ext-service-tests/pom.xml
@@ -81,11 +81,6 @@
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
- <scope>test</scope>
- </dependency>
- <dependency>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-common</artifactId>
<type>test-jar</type>
<scope>test</scope>
</dependency>
http://git-wip-us.apache.org/repos/asf/tez/blob/fee059b9/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/rm/TezTestServiceTaskSchedulerService.java
----------------------------------------------------------------------
diff --git a/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/rm/TezTestServiceTaskSchedulerService.java b/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/rm/TezTestServiceTaskSchedulerService.java
index 17f8a87..8b91dde 100644
--- a/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/rm/TezTestServiceTaskSchedulerService.java
+++ b/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/rm/TezTestServiceTaskSchedulerService.java
@@ -57,7 +57,7 @@ public class TezTestServiceTaskSchedulerService extends TaskScheduler {
private final ConcurrentMap<Object, ContainerId> runningTasks =
new ConcurrentHashMap<Object, ContainerId>();
- // AppIdIdentifier to avoid conflicts with other containres in the system.
+ // AppIdIdentifier to avoid conflicts with other containers in the system.
// Per instance
private final int memoryPerInstance;
@@ -181,7 +181,7 @@ public class TezTestServiceTaskSchedulerService extends TaskScheduler {
}
@Override
- public boolean deallocateTask(Object task, boolean taskSucceeded, TaskAttemptEndReason endReason) {
+ public boolean deallocateTask(Object task, boolean taskSucceeded, TaskAttemptEndReason endReason, String diagnostics) {
ContainerId containerId = runningTasks.remove(task);
if (containerId == null) {
LOG.error("Could not determine ContainerId for task: " + task +
@@ -235,6 +235,7 @@ public class TezTestServiceTaskSchedulerService extends TaskScheduler {
.newInstance(appId, appAttemptId.getAttemptId());
}
+ @SuppressWarnings("deprecation")
public Container createContainer(Resource capability, Priority priority, String hostname, int port) {
ContainerId containerId = ContainerId.newInstance(customAppAttemptId, nextId.getAndIncrement());
NodeId nodeId = NodeId.newInstance(hostname, port);
http://git-wip-us.apache.org/repos/asf/tez/blob/fee059b9/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/taskcomm/TezTestServiceTaskCommunicatorImpl.java
----------------------------------------------------------------------
diff --git a/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/taskcomm/TezTestServiceTaskCommunicatorImpl.java b/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/taskcomm/TezTestServiceTaskCommunicatorImpl.java
index ef8f9e4..127967a 100644
--- a/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/taskcomm/TezTestServiceTaskCommunicatorImpl.java
+++ b/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/taskcomm/TezTestServiceTaskCommunicatorImpl.java
@@ -98,8 +98,8 @@ public class TezTestServiceTaskCommunicatorImpl extends TezTaskCommunicatorImpl
}
@Override
- public void registerContainerEnd(ContainerId containerId, ContainerEndReason endReason) {
- super.registerContainerEnd(containerId, endReason);
+ public void registerContainerEnd(ContainerId containerId, ContainerEndReason endReason, String diagnostics) {
+ super.registerContainerEnd(containerId, endReason, diagnostics);
}
@Override
@@ -154,7 +154,7 @@ public class TezTestServiceTaskCommunicatorImpl extends TezTaskCommunicatorImpl
String message = re.toString();
if (message.contains(RejectedExecutionException.class.getName())) {
getContext().taskKilled(taskSpec.getTaskAttemptID(),
- TaskAttemptEndReason.SERVICE_BUSY, "Service Busy");
+ TaskAttemptEndReason.EXECUTOR_BUSY, "Service Busy");
} else {
getContext()
.taskFailed(taskSpec.getTaskAttemptID(), TaskAttemptEndReason.OTHER,
@@ -175,8 +175,8 @@ public class TezTestServiceTaskCommunicatorImpl extends TezTaskCommunicatorImpl
}
@Override
- public void unregisterRunningTaskAttempt(TezTaskAttemptID taskAttemptID, TaskAttemptEndReason endReason) {
- super.unregisterRunningTaskAttempt(taskAttemptID, endReason);
+ public void unregisterRunningTaskAttempt(TezTaskAttemptID taskAttemptID, TaskAttemptEndReason endReason, String diagnostics) {
+ super.unregisterRunningTaskAttempt(taskAttemptID, endReason, diagnostics);
// Nothing else to do for now. The push API in the test does not support termination of a running task
}
http://git-wip-us.apache.org/repos/asf/tez/blob/fee059b9/tez-ext-service-tests/src/test/java/org/apache/tez/service/impl/ContainerRunnerImpl.java
----------------------------------------------------------------------
diff --git a/tez-ext-service-tests/src/test/java/org/apache/tez/service/impl/ContainerRunnerImpl.java b/tez-ext-service-tests/src/test/java/org/apache/tez/service/impl/ContainerRunnerImpl.java
index 472a43c..3b4c768 100644
--- a/tez-ext-service-tests/src/test/java/org/apache/tez/service/impl/ContainerRunnerImpl.java
+++ b/tez-ext-service-tests/src/test/java/org/apache/tez/service/impl/ContainerRunnerImpl.java
@@ -454,7 +454,7 @@ public class ContainerRunnerImpl extends AbstractService implements ContainerRun
try {
shouldDie = !taskRunner.run();
if (shouldDie) {
- LOG.info("Got a shouldDie notification via hearbeats. Shutting down");
+ LOG.info("Got a shouldDie notification via heartbeats. Shutting down");
return new ContainerExecutionResult(ContainerExecutionResult.ExitStatus.SUCCESS, null,
"Asked to die by the AM");
}
http://git-wip-us.apache.org/repos/asf/tez/blob/fee059b9/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezTaskRunner2.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezTaskRunner2.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezTaskRunner2.java
index d8539c5..7fd4c75 100644
--- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezTaskRunner2.java
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezTaskRunner2.java
@@ -273,10 +273,10 @@ public class TezTaskRunner2 {
isFirstError = true;
killTaskRequested.set(true);
} else {
- logErrorIngored("killTask", null);
+ logErrorIgnored("killTask", null);
}
} else {
- logErrorIngored("killTask", null);
+ logErrorIgnored("killTask", null);
}
}
if (isFirstError) {
@@ -331,10 +331,10 @@ public class TezTaskRunner2 {
errorReporterToAm.set(true);
oobSignalErrorInProgress = true;
} else {
- logErrorIngored("signalFatalError", message);
+ logErrorIgnored("signalFatalError", message);
}
} else {
- logErrorIngored("signalFatalError", message);
+ logErrorIgnored("signalFatalError", message);
}
}
@@ -394,14 +394,14 @@ public class TezTaskRunner2 {
registerFirstException(t, null);
isFirstError = true;
} else {
- logErrorIngored("umbilicalFatalError", null);
+ logErrorIgnored("umbilicalFatalError", null);
}
// A race is possible between a task succeeding, and a subsequent timed heartbeat failing.
// These errors can be ignored, since a task can only succeed if the synchronous taskSucceeded
// method does not throw an exception, in which case task success is registered with the AM.
// Leave subsequent heartbeat errors to the next entity to communicate using the TaskReporter
} else {
- logErrorIngored("umbilicalFatalError", null);
+ logErrorIgnored("umbilicalFatalError", null);
}
// Since this error came from the taskReporter - there's no point attempting to report a failure back to it.
// However, the task does need to be cleaned up
@@ -425,7 +425,7 @@ public class TezTaskRunner2 {
logAborting("shutdownRequested");
killTaskInternal();
} else {
- logErrorIngored("shutdownRequested", null);
+ logErrorIgnored("shutdownRequested", null);
}
}
}
@@ -474,7 +474,7 @@ public class TezTaskRunner2 {
(successReportAttempted ? "success" : "failure/killed"), t);
}
- private void logErrorIngored(String ignoredEndReason, String errorMessage) {
+ private void logErrorIgnored(String ignoredEndReason, String errorMessage) {
LOG.info(
"Ignoring {} request since the task with id {} has ended for reason: {}. IgnoredError: {} ",
ignoredEndReason, task.getTaskAttemptID(),
http://git-wip-us.apache.org/repos/asf/tez/blob/fee059b9/tez-runtime-internals/src/test/java/org/apache/tez/runtime/task/TaskExecutionTestHelpers.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/test/java/org/apache/tez/runtime/task/TaskExecutionTestHelpers.java b/tez-runtime-internals/src/test/java/org/apache/tez/runtime/task/TaskExecutionTestHelpers.java
index fc42da3..7502c41 100644
--- a/tez-runtime-internals/src/test/java/org/apache/tez/runtime/task/TaskExecutionTestHelpers.java
+++ b/tez-runtime-internals/src/test/java/org/apache/tez/runtime/task/TaskExecutionTestHelpers.java
@@ -437,6 +437,7 @@ public class TaskExecutionTestHelpers {
}
}
+ @SuppressWarnings("deprecation")
public static ContainerId createContainerId(ApplicationId appId) {
ApplicationAttemptId appAttemptId = ApplicationAttemptId.newInstance(appId, 1);
ContainerId containerId = ContainerId.newInstance(appAttemptId, 1);
http://git-wip-us.apache.org/repos/asf/tez/blob/fee059b9/tez-runtime-internals/src/test/java/org/apache/tez/runtime/task/TestContainerExecution.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/test/java/org/apache/tez/runtime/task/TestContainerExecution.java b/tez-runtime-internals/src/test/java/org/apache/tez/runtime/task/TestContainerExecution.java
index c1616af..c3c4705 100644
--- a/tez-runtime-internals/src/test/java/org/apache/tez/runtime/task/TestContainerExecution.java
+++ b/tez-runtime-internals/src/test/java/org/apache/tez/runtime/task/TestContainerExecution.java
@@ -40,6 +40,7 @@ public class TestContainerExecution {
executor = MoreExecutors.listeningDecorator(rawExecutor);
ApplicationId appId = ApplicationId.newInstance(10000, 1);
ApplicationAttemptId appAttemptId = ApplicationAttemptId.newInstance(appId, 1);
+ @SuppressWarnings("deprecation")
ContainerId containerId = ContainerId.newInstance(appAttemptId, 1);
TaskExecutionTestHelpers.TezTaskUmbilicalForTest