You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by ss...@apache.org on 2015/05/09 03:43:03 UTC
[25/43] tez git commit: TEZ-2123. Fix component managers to use pluggable components. Enable hybrid mode. (sseth)
http://git-wip-us.apache.org/repos/asf/tez/blob/c47951ab/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestContainerReuse.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestContainerReuse.java b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestContainerReuse.java
index 54b9adb..c1169ef 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestContainerReuse.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestContainerReuse.java
@@ -223,7 +223,7 @@ public class TestContainerReuse {
taskSchedulerEventHandler.handleEvent(
new AMSchedulerEventTAEnded(
- ta11, containerHost1.getId(), TaskAttemptState.SUCCEEDED));
+ ta11, containerHost1.getId(), TaskAttemptState.SUCCEEDED, 0));
drainableAppCallback.drain();
verify(taskScheduler).deallocateTask(eq(ta11), eq(true));
verify(taskSchedulerEventHandler, times(1)).taskAllocated(
@@ -235,7 +235,7 @@ public class TestContainerReuse {
taskSchedulerEventHandler.handleEvent(
new AMSchedulerEventTAEnded(ta21, containerHost2.getId(),
- TaskAttemptState.SUCCEEDED));
+ TaskAttemptState.SUCCEEDED, 0));
long currentTs = System.currentTimeMillis();
Throwable exception = null;
@@ -356,7 +356,7 @@ public class TestContainerReuse {
taskSchedulerEventHandler.handleEvent(
new AMSchedulerEventTAEnded(ta21, containerHost2.getId(),
- TaskAttemptState.SUCCEEDED));
+ TaskAttemptState.SUCCEEDED, 0));
drainableAppCallback.drain();
verify(taskScheduler).deallocateTask(eq(ta21), eq(true));
verify(taskSchedulerEventHandler, times(0)).taskAllocated(
@@ -459,7 +459,7 @@ public class TestContainerReuse {
verify(taskSchedulerEventHandler).taskAllocated(eq(ta11), any(Object.class), eq(container1));
// Task assigned to container completed successfully. Container should be re-used.
- taskSchedulerEventHandler.handleEvent(new AMSchedulerEventTAEnded(ta11, container1.getId(), TaskAttemptState.SUCCEEDED));
+ taskSchedulerEventHandler.handleEvent(new AMSchedulerEventTAEnded(ta11, container1.getId(), TaskAttemptState.SUCCEEDED, 0));
drainableAppCallback.drain();
verify(taskScheduler).deallocateTask(eq(ta11), eq(true));
verify(taskSchedulerEventHandler).taskAllocated(eq(ta12), any(Object.class), eq(container1));
@@ -469,7 +469,7 @@ public class TestContainerReuse {
// Task assigned to container completed successfully.
// Verify reuse across hosts.
- taskSchedulerEventHandler.handleEvent(new AMSchedulerEventTAEnded(ta12, container1.getId(), TaskAttemptState.SUCCEEDED));
+ taskSchedulerEventHandler.handleEvent(new AMSchedulerEventTAEnded(ta12, container1.getId(), TaskAttemptState.SUCCEEDED, 0));
drainableAppCallback.drain();
verify(taskScheduler).deallocateTask(eq(ta12), eq(true));
verify(taskSchedulerEventHandler).taskAllocated(eq(ta13), any(Object.class), eq(container1));
@@ -478,7 +478,7 @@ public class TestContainerReuse {
eventHandler.reset();
// Verify no re-use if a previous task fails.
- taskSchedulerEventHandler.handleEvent(new AMSchedulerEventTAEnded(ta13, container1.getId(), TaskAttemptState.FAILED));
+ taskSchedulerEventHandler.handleEvent(new AMSchedulerEventTAEnded(ta13, container1.getId(), TaskAttemptState.FAILED, 0));
drainableAppCallback.drain();
verify(taskSchedulerEventHandler, times(0)).taskAllocated(eq(ta14), any(Object.class), eq(container1));
verify(taskScheduler).deallocateTask(eq(ta13), eq(false));
@@ -496,7 +496,7 @@ public class TestContainerReuse {
verify(taskSchedulerEventHandler).taskAllocated(eq(ta14), any(Object.class), eq(container2));
// Task assigned to container completed successfully. No pending requests. Container should be released.
- taskSchedulerEventHandler.handleEvent(new AMSchedulerEventTAEnded(ta14, container2.getId(), TaskAttemptState.SUCCEEDED));
+ taskSchedulerEventHandler.handleEvent(new AMSchedulerEventTAEnded(ta14, container2.getId(), TaskAttemptState.SUCCEEDED, 0));
drainableAppCallback.drain();
verify(taskScheduler).deallocateTask(eq(ta14), eq(true));
verify(rmClient).releaseAssignedContainer(eq(container2.getId()));
@@ -607,7 +607,7 @@ public class TestContainerReuse {
// First task had profiling on. This container can not be reused further.
taskSchedulerEventHandler.handleEvent(
- new AMSchedulerEventTAEnded(ta11, container1.getId(), TaskAttemptState.SUCCEEDED));
+ new AMSchedulerEventTAEnded(ta11, container1.getId(), TaskAttemptState.SUCCEEDED, 0));
drainableAppCallback.drain();
verify(taskScheduler).deallocateTask(eq(ta11), eq(true));
verify(taskSchedulerEventHandler, times(0)).taskAllocated(eq(ta12), any(Object.class),
@@ -653,7 +653,7 @@ public class TestContainerReuse {
// Verify that the container can not be reused when profiling option is turned on
// Even for 2 tasks having same profiling option can have container reusability.
- taskSchedulerEventHandler.handleEvent(new AMSchedulerEventTAEnded(ta13, container2.getId(), TaskAttemptState.SUCCEEDED));
+ taskSchedulerEventHandler.handleEvent(new AMSchedulerEventTAEnded(ta13, container2.getId(), TaskAttemptState.SUCCEEDED, 0));
drainableAppCallback.drain();
verify(taskScheduler).deallocateTask(eq(ta13), eq(true));
verify(taskSchedulerEventHandler, times(0)).taskAllocated(eq(ta14), any(Object.class),
@@ -698,7 +698,7 @@ public class TestContainerReuse {
verify(taskSchedulerEventHandler).taskAllocated(eq(ta15), any(Object.class), eq(container3));
//Ensure task 6 (of vertex 1) is allocated to same container
- taskSchedulerEventHandler.handleEvent(new AMSchedulerEventTAEnded(ta15, container3.getId(), TaskAttemptState.SUCCEEDED));
+ taskSchedulerEventHandler.handleEvent(new AMSchedulerEventTAEnded(ta15, container3.getId(), TaskAttemptState.SUCCEEDED, 0));
drainableAppCallback.drain();
verify(taskScheduler).deallocateTask(eq(ta15), eq(true));
verify(taskSchedulerEventHandler).taskAllocated(eq(ta16), any(Object.class), eq(container3));
@@ -811,7 +811,7 @@ public class TestContainerReuse {
// until delay expires.
taskSchedulerEventHandler.handleEvent(
new AMSchedulerEventTAEnded(ta11, container1.getId(),
- TaskAttemptState.SUCCEEDED));
+ TaskAttemptState.SUCCEEDED, 0));
drainableAppCallback.drain();
verify(taskScheduler).deallocateTask(eq(ta11), eq(true));
verify(taskSchedulerEventHandler, times(0)).taskAllocated(
@@ -828,7 +828,7 @@ public class TestContainerReuse {
// TA12 completed.
taskSchedulerEventHandler.handleEvent(
new AMSchedulerEventTAEnded(ta12, container1.getId(),
- TaskAttemptState.SUCCEEDED));
+ TaskAttemptState.SUCCEEDED, 0));
drainableAppCallback.drain();
LOG.info("Sleeping to ensure that the scheduling loop runs");
Thread.sleep(3000l);
@@ -946,7 +946,7 @@ public class TestContainerReuse {
// Container should be assigned to task21.
taskSchedulerEventHandler.handleEvent(
new AMSchedulerEventTAEnded(ta11, container1.getId(),
- TaskAttemptState.SUCCEEDED));
+ TaskAttemptState.SUCCEEDED, 0));
drainableAppCallback.drain();
verify(taskScheduler).deallocateTask(eq(ta11), eq(true));
verify(taskSchedulerEventHandler).taskAllocated(
@@ -956,7 +956,7 @@ public class TestContainerReuse {
// Task 2 completes.
taskSchedulerEventHandler.handleEvent(
new AMSchedulerEventTAEnded(ta21, container1.getId(),
- TaskAttemptState.SUCCEEDED));
+ TaskAttemptState.SUCCEEDED, 0));
verify(rmClient, times(0)).releaseAssignedContainer(eq(container1.getId()));
LOG.info("Sleeping to ensure that the scheduling loop runs");
@@ -1065,7 +1065,7 @@ public class TestContainerReuse {
assertEquals(1, assignEvent.getRemoteTaskLocalResources().size());
// Task assigned to container completed successfully. Container should be re-used.
- taskSchedulerEventHandler.handleEvent(new AMSchedulerEventTAEnded(ta111, container1.getId(), TaskAttemptState.SUCCEEDED));
+ taskSchedulerEventHandler.handleEvent(new AMSchedulerEventTAEnded(ta111, container1.getId(), TaskAttemptState.SUCCEEDED, 0));
drainableAppCallback.drain();
verify(taskScheduler).deallocateTask(eq(ta111), eq(true));
verify(taskSchedulerEventHandler).taskAllocated(eq(ta112), any(Object.class), eq(container1));
@@ -1077,7 +1077,7 @@ public class TestContainerReuse {
// Task assigned to container completed successfully.
// Verify reuse across hosts.
- taskSchedulerEventHandler.handleEvent(new AMSchedulerEventTAEnded(ta112, container1.getId(), TaskAttemptState.SUCCEEDED));
+ taskSchedulerEventHandler.handleEvent(new AMSchedulerEventTAEnded(ta112, container1.getId(), TaskAttemptState.SUCCEEDED, 0));
drainableAppCallback.drain();
verify(taskScheduler).deallocateTask(eq(ta112), eq(true));
verify(rmClient, times(0)).releaseAssignedContainer(eq(container1.getId()));
@@ -1118,7 +1118,7 @@ public class TestContainerReuse {
assertEquals(2, assignEvent.getRemoteTaskLocalResources().size());
eventHandler.reset();
- taskSchedulerEventHandler.handleEvent(new AMSchedulerEventTAEnded(ta211, container1.getId(), TaskAttemptState.SUCCEEDED));
+ taskSchedulerEventHandler.handleEvent(new AMSchedulerEventTAEnded(ta211, container1.getId(), TaskAttemptState.SUCCEEDED, 0));
drainableAppCallback.drain();
verify(taskScheduler).deallocateTask(eq(ta211), eq(true));
verify(taskSchedulerEventHandler).taskAllocated(eq(ta212), any(Object.class), eq(container1));
http://git-wip-us.apache.org/repos/asf/tez/blob/c47951ab/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestLocalTaskScheduler.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestLocalTaskScheduler.java b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestLocalTaskScheduler.java
index 60782e6..12390b2 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestLocalTaskScheduler.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestLocalTaskScheduler.java
@@ -59,7 +59,7 @@ public class TestLocalTaskScheduler {
TezConfiguration tezConf = new TezConfiguration();
tezConf.setInt(TezConfiguration.TEZ_AM_INLINE_TASK_EXECUTION_MAX_TASKS, MAX_TASKS);
- LocalContainerFactory containerFactory = new LocalContainerFactory(createMockAppContext());
+ LocalContainerFactory containerFactory = new LocalContainerFactory(createMockAppContext(), 1000);
HashMap<Object, Container> taskAllocations = new LinkedHashMap<Object, Container>();
PriorityBlockingQueue<TaskRequest> taskRequestQueue = new PriorityBlockingQueue<TaskRequest>();
TaskSchedulerAppCallback appClientDelegate = mock(TaskSchedulerAppCallback.class);
http://git-wip-us.apache.org/repos/asf/tez/blob/c47951ab/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestLocalTaskSchedulerService.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestLocalTaskSchedulerService.java b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestLocalTaskSchedulerService.java
index 3cf4f6c..25cf4b5 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestLocalTaskSchedulerService.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestLocalTaskSchedulerService.java
@@ -22,6 +22,8 @@ import java.util.HashMap;
import java.util.concurrent.BlockingQueue;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
@@ -81,8 +83,12 @@ public class TestLocalTaskSchedulerService {
*/
@Test(timeout = 5000)
public void testDeallocationBeforeAllocation() {
+ AppContext appContext = mock(AppContext.class);
+ ApplicationAttemptId appAttemptId =
+ ApplicationAttemptId.newInstance(ApplicationId.newInstance(10000l, 1), 1);
+ doReturn(appAttemptId).when(appContext).getApplicationAttemptId();
MockLocalTaskSchedulerSerivce taskSchedulerService = new MockLocalTaskSchedulerSerivce
- (mock(TaskSchedulerAppCallback.class), mock(ContainerSignatureMatcher.class), "", 0, "", mock(AppContext.class));
+ (mock(TaskSchedulerAppCallback.class), mock(ContainerSignatureMatcher.class), "", 0, "", appContext);
taskSchedulerService.init(new Configuration());
taskSchedulerService.start();
@@ -105,8 +111,12 @@ public class TestLocalTaskSchedulerService {
*/
@Test(timeout = 5000)
public void testDeallocationAfterAllocation() {
+ AppContext appContext = mock(AppContext.class);
+ ApplicationAttemptId appAttemptId =
+ ApplicationAttemptId.newInstance(ApplicationId.newInstance(10000l, 1), 1);
+ doReturn(appAttemptId).when(appContext).getApplicationAttemptId();
MockLocalTaskSchedulerSerivce taskSchedulerService = new MockLocalTaskSchedulerSerivce
- (mock(TaskSchedulerAppCallback.class), mock(ContainerSignatureMatcher.class), "", 0, "", mock(AppContext.class));
+ (mock(TaskSchedulerAppCallback.class), mock(ContainerSignatureMatcher.class), "", 0, "", appContext);
taskSchedulerService.init(new Configuration());
taskSchedulerService.start();
@@ -132,13 +142,13 @@ public class TestLocalTaskSchedulerService {
String appHostName, int appHostPort, String appTrackingUrl,
AppContext appContext) {
super(appClient, containerSignatureMatcher, appHostName, appHostPort,
- appTrackingUrl, appContext);
+ appTrackingUrl, 10000l, appContext);
}
@Override
public AsyncDelegateRequestHandler createRequestHandler(Configuration conf) {
requestHandler = new MockAsyncDelegateRequestHandler(taskRequestQueue,
- new LocalContainerFactory(appContext),
+ new LocalContainerFactory(appContext, customContainerAppId),
taskAllocations,
appClientDelegate,
conf);
http://git-wip-us.apache.org/repos/asf/tez/blob/c47951ab/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerEventHandler.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerEventHandler.java b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerEventHandler.java
index 291e786..4ee05cc 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerEventHandler.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerEventHandler.java
@@ -89,7 +89,7 @@ public class TestTaskSchedulerEventHandler {
public MockTaskSchedulerEventHandler(AppContext appContext,
DAGClientServer clientService, EventHandler eventHandler,
ContainerSignatureMatcher containerSignatureMatcher, WebUIService webUI) {
- super(appContext, clientService, eventHandler, containerSignatureMatcher, webUI, new String[] {});
+ super(appContext, clientService, eventHandler, containerSignatureMatcher, webUI, new String[] {}, false);
}
@Override
@@ -162,7 +162,7 @@ public class TestTaskSchedulerEventHandler {
AMSchedulerEventTALaunchRequest lr =
new AMSchedulerEventTALaunchRequest(mockAttemptId, resource, null, mockTaskAttempt, locHint,
- priority, containerContext);
+ priority, containerContext, 0, 0, 0);
schedulerHandler.taskAllocated(mockTaskAttempt, lr, container);
assertEquals(2, mockEventHandler.events.size());
assertTrue(mockEventHandler.events.get(1) instanceof AMContainerEventAssignTA);
@@ -249,9 +249,14 @@ public class TestTaskSchedulerEventHandler {
Configuration conf = new Configuration(false);
schedulerHandler.init(conf);
schedulerHandler.start();
-
+
+ AMContainer mockAmContainer = mock(AMContainer.class);
+ when(mockAmContainer.getTaskSchedulerIdentifier()).thenReturn(0);
+ when(mockAmContainer.getContainerLauncherIdentifier()).thenReturn(0);
+ when(mockAmContainer.getTaskCommunicatorIdentifier()).thenReturn(0);
ContainerId mockCId = mock(ContainerId.class);
verify(mockTaskScheduler, times(0)).deallocateContainer((ContainerId)any());
+ when(mockAMContainerMap.get(mockCId)).thenReturn(mockAmContainer);
schedulerHandler.preemptContainer(mockCId);
verify(mockTaskScheduler, times(1)).deallocateContainer(mockCId);
assertEquals(1, mockEventHandler.events.size());
http://git-wip-us.apache.org/repos/asf/tez/blob/c47951ab/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerHelpers.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerHelpers.java b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerHelpers.java
index d775300..ffab769 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerHelpers.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerHelpers.java
@@ -127,7 +127,7 @@ class TestTaskSchedulerHelpers {
EventHandler eventHandler,
TezAMRMClientAsync<CookieContainerRequest> amrmClientAsync,
ContainerSignatureMatcher containerSignatureMatcher) {
- super(appContext, null, eventHandler, containerSignatureMatcher, null, new String[]{});
+ super(appContext, null, eventHandler, containerSignatureMatcher, null, new String[]{}, false);
this.amrmClientAsync = amrmClientAsync;
this.containerSignatureMatcher = containerSignatureMatcher;
}
http://git-wip-us.apache.org/repos/asf/tez/blob/c47951ab/tez-dag/src/test/java/org/apache/tez/dag/app/rm/container/TestAMContainer.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/container/TestAMContainer.java b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/container/TestAMContainer.java
index fafbba6..bdd0f61 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/container/TestAMContainer.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/container/TestAMContainer.java
@@ -63,6 +63,7 @@ import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.util.SystemClock;
import org.apache.tez.common.security.JobTokenIdentifier;
import org.apache.tez.common.security.TokenCache;
+import org.apache.tez.dag.api.TaskCommunicator;
import org.apache.tez.dag.app.AppContext;
import org.apache.tez.dag.app.ContainerHeartbeatHandler;
import org.apache.tez.dag.app.ContainerContext;
@@ -104,7 +105,7 @@ public class TestAMContainer {
wc.verifyState(AMContainerState.LAUNCHING);
// 1 Launch request.
wc.verifyCountAndGetOutgoingEvents(1);
- verify(wc.tal).registerRunningContainer(wc.containerID);
+ verify(wc.tal).registerRunningContainer(wc.containerID, 0);
assertNull(wc.amContainer.getCurrentTaskAttempt());
// Assign task.
@@ -121,7 +122,7 @@ public class TestAMContainer {
// Once for the previous NO_TASKS, one for the actual task.
verify(wc.chh).register(wc.containerID);
ArgumentCaptor<AMContainerTask> argumentCaptor = ArgumentCaptor.forClass(AMContainerTask.class);
- verify(wc.tal, times(1)).registerTaskAttempt(argumentCaptor.capture(), eq(wc.containerID));
+ verify(wc.tal, times(1)).registerTaskAttempt(argumentCaptor.capture(), eq(wc.containerID), eq(0));
assertEquals(1, argumentCaptor.getAllValues().size());
assertEquals(wc.taskAttemptID, argumentCaptor.getAllValues().get(0).getTask().getTaskAttemptID());
assertEquals(WrappedContainer.taskPriority, argumentCaptor.getAllValues().get(0).getPriority());
@@ -131,14 +132,14 @@ public class TestAMContainer {
wc.verifyState(AMContainerState.IDLE);
wc.verifyNoOutgoingEvents();
assertNull(wc.amContainer.getCurrentTaskAttempt());
- verify(wc.tal).unregisterTaskAttempt(wc.taskAttemptID);
+ verify(wc.tal).unregisterTaskAttempt(wc.taskAttemptID, 0);
// Container completed
wc.containerCompleted();
wc.verifyHistoryStopEvent();
wc.verifyState(AMContainerState.COMPLETED);
wc.verifyNoOutgoingEvents();
- verify(wc.tal).unregisterRunningContainer(wc.containerID);
+ verify(wc.tal).unregisterRunningContainer(wc.containerID, 0);
verify(wc.chh).unregister(wc.containerID);
assertEquals(1, wc.amContainer.getAllTaskAttempts().size());
@@ -157,7 +158,7 @@ public class TestAMContainer {
wc.verifyState(AMContainerState.LAUNCHING);
// 1 Launch request.
wc.verifyCountAndGetOutgoingEvents(1);
- verify(wc.tal).registerRunningContainer(wc.containerID);
+ verify(wc.tal).registerRunningContainer(wc.containerID, 0);
// Container Launched
wc.containerLaunched();
@@ -172,7 +173,7 @@ public class TestAMContainer {
wc.verifyNoOutgoingEvents();
assertEquals(wc.taskAttemptID, wc.amContainer.getCurrentTaskAttempt());
ArgumentCaptor<AMContainerTask> argumentCaptor = ArgumentCaptor.forClass(AMContainerTask.class);
- verify(wc.tal, times(1)).registerTaskAttempt(argumentCaptor.capture(), eq(wc.containerID));
+ verify(wc.tal, times(1)).registerTaskAttempt(argumentCaptor.capture(), eq(wc.containerID), eq(0));
assertEquals(1, argumentCaptor.getAllValues().size());
assertEquals(wc.taskAttemptID, argumentCaptor.getAllValues().get(0).getTask().getTaskAttemptID());
@@ -180,13 +181,13 @@ public class TestAMContainer {
wc.verifyState(AMContainerState.IDLE);
wc.verifyNoOutgoingEvents();
assertNull(wc.amContainer.getCurrentTaskAttempt());
- verify(wc.tal).unregisterTaskAttempt(wc.taskAttemptID);
+ verify(wc.tal).unregisterTaskAttempt(wc.taskAttemptID, 0);
wc.containerCompleted();
wc.verifyHistoryStopEvent();
wc.verifyState(AMContainerState.COMPLETED);
wc.verifyNoOutgoingEvents();
- verify(wc.tal).unregisterRunningContainer(wc.containerID);
+ verify(wc.tal).unregisterRunningContainer(wc.containerID, 0);
verify(wc.chh).unregister(wc.containerID);
assertEquals(1, wc.amContainer.getAllTaskAttempts().size());
@@ -205,7 +206,7 @@ public class TestAMContainer {
wc.verifyState(AMContainerState.LAUNCHING);
// 1 Launch request.
wc.verifyCountAndGetOutgoingEvents(1);
- verify(wc.tal).registerRunningContainer(wc.containerID);
+ verify(wc.tal).registerRunningContainer(wc.containerID, 0);
assertNull(wc.amContainer.getCurrentTaskAttempt());
// Assign task.
@@ -222,7 +223,7 @@ public class TestAMContainer {
// Once for the previous NO_TASKS, one for the actual task.
verify(wc.chh).register(wc.containerID);
ArgumentCaptor<AMContainerTask> argumentCaptor = ArgumentCaptor.forClass(AMContainerTask.class);
- verify(wc.tal, times(1)).registerTaskAttempt(argumentCaptor.capture(), eq(wc.containerID));
+ verify(wc.tal, times(1)).registerTaskAttempt(argumentCaptor.capture(), eq(wc.containerID), eq(0));
assertEquals(1, argumentCaptor.getAllValues().size());
assertEquals(wc.taskAttemptID, argumentCaptor.getAllValues().get(0).getTask().getTaskAttemptID());
@@ -231,13 +232,13 @@ public class TestAMContainer {
wc.verifyState(AMContainerState.IDLE);
wc.verifyNoOutgoingEvents();
assertNull(wc.amContainer.getCurrentTaskAttempt());
- verify(wc.tal).unregisterTaskAttempt(wc.taskAttemptID);
+ verify(wc.tal).unregisterTaskAttempt(wc.taskAttemptID, 0);
TezTaskAttemptID taId2 = TezTaskAttemptID.getInstance(wc.taskID, 2);
wc.assignTaskAttempt(taId2);
wc.verifyState(AMContainerState.RUNNING);
argumentCaptor = ArgumentCaptor.forClass(AMContainerTask.class);
- verify(wc.tal, times(2)).registerTaskAttempt(argumentCaptor.capture(), eq(wc.containerID));
+ verify(wc.tal, times(2)).registerTaskAttempt(argumentCaptor.capture(), eq(wc.containerID), eq(0));
assertEquals(2, argumentCaptor.getAllValues().size());
assertEquals(taId2, argumentCaptor.getAllValues().get(1).getTask().getTaskAttemptID());
@@ -246,14 +247,14 @@ public class TestAMContainer {
wc.verifyState(AMContainerState.IDLE);
wc.verifyNoOutgoingEvents();
assertNull(wc.amContainer.getCurrentTaskAttempt());
- verify(wc.tal).unregisterTaskAttempt(taId2);
+ verify(wc.tal).unregisterTaskAttempt(taId2, 0);
// Container completed
wc.containerCompleted();
wc.verifyHistoryStopEvent();
wc.verifyState(AMContainerState.COMPLETED);
wc.verifyNoOutgoingEvents();
- verify(wc.tal).unregisterRunningContainer(wc.containerID);
+ verify(wc.tal).unregisterRunningContainer(wc.containerID, 0);
verify(wc.chh).unregister(wc.containerID);
assertEquals(2, wc.amContainer.getAllTaskAttempts().size());
@@ -286,7 +287,7 @@ public class TestAMContainer {
wc.verifyHistoryStopEvent();
wc.verifyState(AMContainerState.COMPLETED);
wc.verifyNoOutgoingEvents();
- verify(wc.tal).unregisterRunningContainer(wc.containerID);
+ verify(wc.tal).unregisterRunningContainer(wc.containerID, 0);
verify(wc.chh).unregister(wc.containerID);
assertNull(wc.amContainer.getCurrentTaskAttempt());
@@ -323,7 +324,7 @@ public class TestAMContainer {
wc.verifyHistoryStopEvent();
wc.verifyState(AMContainerState.COMPLETED);
wc.verifyNoOutgoingEvents();
- verify(wc.tal).unregisterRunningContainer(wc.containerID);
+ verify(wc.tal).unregisterRunningContainer(wc.containerID, 0);
verify(wc.chh).unregister(wc.containerID);
assertNull(wc.amContainer.getCurrentTaskAttempt());
@@ -346,7 +347,7 @@ public class TestAMContainer {
wc.assignTaskAttempt(taID2);
wc.verifyState(AMContainerState.STOP_REQUESTED);
- verify(wc.tal).unregisterRunningContainer(wc.containerID);
+ verify(wc.tal).unregisterRunningContainer(wc.containerID, 0);
verify(wc.chh).unregister(wc.containerID);
// 1 for NM stop request. 2 TERMINATING to TaskAttempt.
outgoingEvents = wc.verifyCountAndGetOutgoingEvents(3);
@@ -378,13 +379,13 @@ public class TestAMContainer {
wc.launchContainer();
wc.assignTaskAttempt(wc.taskAttemptID);
wc.verifyState(AMContainerState.LAUNCHING);
- verify(wc.tal).registerRunningContainer(wc.containerID);
+ verify(wc.tal).registerRunningContainer(wc.containerID, 0);
TezTaskAttemptID taID2 = TezTaskAttemptID.getInstance(wc.taskID, 2);
wc.assignTaskAttempt(taID2);
wc.verifyState(AMContainerState.STOP_REQUESTED);
- verify(wc.tal).unregisterRunningContainer(wc.containerID);
+ verify(wc.tal).unregisterRunningContainer(wc.containerID, 0);
verify(wc.chh).unregister(wc.containerID);
// 1 for NM stop request. 2 TERMINATING to TaskAttempt.
outgoingEvents = wc.verifyCountAndGetOutgoingEvents(3);
@@ -420,7 +421,7 @@ public class TestAMContainer {
wc.containerTimedOut();
wc.verifyState(AMContainerState.STOP_REQUESTED);
- verify(wc.tal).unregisterRunningContainer(wc.containerID);
+ verify(wc.tal).unregisterRunningContainer(wc.containerID, 0);
verify(wc.chh).unregister(wc.containerID);
// 1 to TA, 1 for RM de-allocate.
outgoingEvents = wc.verifyCountAndGetOutgoingEvents(2);
@@ -454,7 +455,7 @@ public class TestAMContainer {
wc.stopRequest();
wc.verifyState(AMContainerState.STOP_REQUESTED);
- verify(wc.tal).unregisterRunningContainer(wc.containerID);
+ verify(wc.tal).unregisterRunningContainer(wc.containerID, 0);
verify(wc.chh).unregister(wc.containerID);
// 1 to TA, 1 for RM de-allocate.
outgoingEvents = wc.verifyCountAndGetOutgoingEvents(2);
@@ -484,11 +485,11 @@ public class TestAMContainer {
wc.launchContainer();
wc.assignTaskAttempt(wc.taskAttemptID);
wc.verifyState(AMContainerState.LAUNCHING);
- verify(wc.tal).registerRunningContainer(wc.containerID);
+ verify(wc.tal).registerRunningContainer(wc.containerID, 0);
wc.launchFailed();
wc.verifyState(AMContainerState.STOPPING);
- verify(wc.tal).registerRunningContainer(wc.containerID);
- verify(wc.tal).unregisterRunningContainer(wc.containerID);
+ verify(wc.tal).registerRunningContainer(wc.containerID, 0);
+ verify(wc.tal).unregisterRunningContainer(wc.containerID, 0);
outgoingEvents = wc.verifyCountAndGetOutgoingEvents(2);
verifyUnOrderedOutgoingEventTypes(outgoingEvents,
@@ -537,8 +538,8 @@ public class TestAMContainer {
wc.containerCompleted();
wc.verifyState(AMContainerState.COMPLETED);
- verify(wc.tal).registerRunningContainer(wc.containerID);
- verify(wc.tal).unregisterRunningContainer(wc.containerID);
+ verify(wc.tal).registerRunningContainer(wc.containerID, 0);
+ verify(wc.tal).unregisterRunningContainer(wc.containerID, 0);
outgoingEvents = wc.verifyCountAndGetOutgoingEvents(1);
verifyUnOrderedOutgoingEventTypes(outgoingEvents,
@@ -567,8 +568,8 @@ public class TestAMContainer {
wc.containerCompleted(ContainerExitStatus.DISKS_FAILED, TaskAttemptTerminationCause.NODE_DISK_ERROR);
wc.verifyState(AMContainerState.COMPLETED);
- verify(wc.tal).registerRunningContainer(wc.containerID);
- verify(wc.tal).unregisterRunningContainer(wc.containerID);
+ verify(wc.tal).registerRunningContainer(wc.containerID, 0);
+ verify(wc.tal).unregisterRunningContainer(wc.containerID, 0);
outgoingEvents = wc.verifyCountAndGetOutgoingEvents(1);
verifyUnOrderedOutgoingEventTypes(outgoingEvents,
@@ -598,8 +599,8 @@ public class TestAMContainer {
wc.containerCompleted(ContainerExitStatus.ABORTED, TaskAttemptTerminationCause.NODE_FAILED);
wc.verifyState(AMContainerState.COMPLETED);
- verify(wc.tal).registerRunningContainer(wc.containerID);
- verify(wc.tal).unregisterRunningContainer(wc.containerID);
+ verify(wc.tal).registerRunningContainer(wc.containerID, 0);
+ verify(wc.tal).unregisterRunningContainer(wc.containerID, 0);
outgoingEvents = wc.verifyCountAndGetOutgoingEvents(1);
verifyUnOrderedOutgoingEventTypes(outgoingEvents,
@@ -629,8 +630,8 @@ public class TestAMContainer {
wc.containerCompleted();
wc.verifyState(AMContainerState.COMPLETED);
- verify(wc.tal).registerRunningContainer(wc.containerID);
- verify(wc.tal).unregisterRunningContainer(wc.containerID);
+ verify(wc.tal).registerRunningContainer(wc.containerID, 0);
+ verify(wc.tal).unregisterRunningContainer(wc.containerID, 0);
verify(wc.chh).register(wc.containerID);
verify(wc.chh).unregister(wc.containerID);
@@ -658,8 +659,8 @@ public class TestAMContainer {
wc.containerCompleted();
wc.verifyState(AMContainerState.COMPLETED);
- verify(wc.tal).registerRunningContainer(wc.containerID);
- verify(wc.tal).unregisterRunningContainer(wc.containerID);
+ verify(wc.tal).registerRunningContainer(wc.containerID, 0);
+ verify(wc.tal).unregisterRunningContainer(wc.containerID, 0);
verify(wc.chh).register(wc.containerID);
verify(wc.chh).unregister(wc.containerID);
@@ -693,8 +694,8 @@ public class TestAMContainer {
wc.containerCompleted(ContainerExitStatus.PREEMPTED, TaskAttemptTerminationCause.EXTERNAL_PREEMPTION);
wc.verifyState(AMContainerState.COMPLETED);
- verify(wc.tal).registerRunningContainer(wc.containerID);
- verify(wc.tal).unregisterRunningContainer(wc.containerID);
+ verify(wc.tal).registerRunningContainer(wc.containerID, 0);
+ verify(wc.tal).unregisterRunningContainer(wc.containerID, 0);
verify(wc.chh).register(wc.containerID);
verify(wc.chh).unregister(wc.containerID);
@@ -730,8 +731,8 @@ public class TestAMContainer {
wc.containerCompleted(ContainerExitStatus.INVALID, TaskAttemptTerminationCause.INTERNAL_PREEMPTION);
wc.verifyState(AMContainerState.COMPLETED);
- verify(wc.tal).registerRunningContainer(wc.containerID);
- verify(wc.tal).unregisterRunningContainer(wc.containerID);
+ verify(wc.tal).registerRunningContainer(wc.containerID, 0);
+ verify(wc.tal).unregisterRunningContainer(wc.containerID, 0);
verify(wc.chh).register(wc.containerID);
verify(wc.chh).unregister(wc.containerID);
@@ -767,8 +768,8 @@ public class TestAMContainer {
wc.containerCompleted(ContainerExitStatus.DISKS_FAILED, TaskAttemptTerminationCause.NODE_DISK_ERROR);
wc.verifyState(AMContainerState.COMPLETED);
- verify(wc.tal).registerRunningContainer(wc.containerID);
- verify(wc.tal).unregisterRunningContainer(wc.containerID);
+ verify(wc.tal).registerRunningContainer(wc.containerID, 0);
+ verify(wc.tal).unregisterRunningContainer(wc.containerID, 0);
verify(wc.chh).register(wc.containerID);
verify(wc.chh).unregister(wc.containerID);
@@ -1011,7 +1012,7 @@ public class TestAMContainer {
wc.containerLaunched();
wc.assignTaskAttempt(wc.taskAttemptID);
ArgumentCaptor<AMContainerTask> argumentCaptor = ArgumentCaptor.forClass(AMContainerTask.class);
- verify(wc.tal, times(1)).registerTaskAttempt(argumentCaptor.capture(), eq(wc.containerID));
+ verify(wc.tal, times(1)).registerTaskAttempt(argumentCaptor.capture(), eq(wc.containerID), eq(0));
AMContainerTask task1 = argumentCaptor.getAllValues().get(0);
assertEquals(0, task1.getAdditionalResources().size());
wc.taskAttemptSucceeded(wc.taskAttemptID);
@@ -1024,7 +1025,7 @@ public class TestAMContainer {
TezTaskAttemptID taID2 = TezTaskAttemptID.getInstance(wc.taskID, 2);
wc.assignTaskAttempt(taID2, additionalResources, new Credentials());
argumentCaptor = ArgumentCaptor.forClass(AMContainerTask.class);
- verify(wc.tal, times(2)).registerTaskAttempt(argumentCaptor.capture(), eq(wc.containerID));
+ verify(wc.tal, times(2)).registerTaskAttempt(argumentCaptor.capture(), eq(wc.containerID), eq(0));
AMContainerTask task2 = argumentCaptor.getAllValues().get(1);
Map<String, LocalResource> pullTaskAdditionalResources = task2.getAdditionalResources();
assertEquals(2, pullTaskAdditionalResources.size());
@@ -1047,7 +1048,7 @@ public class TestAMContainer {
TezTaskAttemptID taID3 = TezTaskAttemptID.getInstance(wc.taskID, 3);
wc.assignTaskAttempt(taID3, new HashMap<String, LocalResource>(), new Credentials());
argumentCaptor = ArgumentCaptor.forClass(AMContainerTask.class);
- verify(wc.tal, times(3)).registerTaskAttempt(argumentCaptor.capture(), eq(wc.containerID));
+ verify(wc.tal, times(3)).registerTaskAttempt(argumentCaptor.capture(), eq(wc.containerID), eq(0));
AMContainerTask task3 = argumentCaptor.getAllValues().get(2);
assertEquals(0, task3.getAdditionalResources().size());
wc.taskAttemptSucceeded(taID3);
@@ -1100,7 +1101,7 @@ public class TestAMContainer {
wc.containerLaunched();
wc.assignTaskAttempt(attempt11, LRs, dag1Credentials);
argumentCaptor = ArgumentCaptor.forClass(AMContainerTask.class);
- verify(wc.tal, times(1)).registerTaskAttempt(argumentCaptor.capture(), eq(wc.containerID));
+ verify(wc.tal, times(1)).registerTaskAttempt(argumentCaptor.capture(), eq(wc.containerID), eq(0));
fetchedTask = argumentCaptor.getAllValues().get(0);
assertTrue(fetchedTask.haveCredentialsChanged());
assertNotNull(fetchedTask.getCredentials());
@@ -1109,7 +1110,7 @@ public class TestAMContainer {
wc.assignTaskAttempt(attempt12, LRs, dag1Credentials);
argumentCaptor = ArgumentCaptor.forClass(AMContainerTask.class);
- verify(wc.tal, times(2)).registerTaskAttempt(argumentCaptor.capture(), eq(wc.containerID));
+ verify(wc.tal, times(2)).registerTaskAttempt(argumentCaptor.capture(), eq(wc.containerID), eq(0));
fetchedTask = argumentCaptor.getAllValues().get(1);
assertFalse(fetchedTask.haveCredentialsChanged());
assertNull(fetchedTask.getCredentials());
@@ -1119,7 +1120,7 @@ public class TestAMContainer {
wc.setNewDAGID(dagID2);
wc.assignTaskAttempt(attempt21, LRs, null);
argumentCaptor = ArgumentCaptor.forClass(AMContainerTask.class);
- verify(wc.tal, times(3)).registerTaskAttempt(argumentCaptor.capture(), eq(wc.containerID));
+ verify(wc.tal, times(3)).registerTaskAttempt(argumentCaptor.capture(), eq(wc.containerID), eq(0));
fetchedTask = argumentCaptor.getAllValues().get(2);
assertTrue(fetchedTask.haveCredentialsChanged());
assertNull(fetchedTask.getCredentials());
@@ -1127,7 +1128,7 @@ public class TestAMContainer {
wc.assignTaskAttempt(attempt22, LRs, null);
argumentCaptor = ArgumentCaptor.forClass(AMContainerTask.class);
- verify(wc.tal, times(4)).registerTaskAttempt(argumentCaptor.capture(), eq(wc.containerID));
+ verify(wc.tal, times(4)).registerTaskAttempt(argumentCaptor.capture(), eq(wc.containerID), eq(0));
fetchedTask = argumentCaptor.getAllValues().get(3);
assertFalse(fetchedTask.haveCredentialsChanged());
assertNull(fetchedTask.getCredentials());
@@ -1137,7 +1138,7 @@ public class TestAMContainer {
wc.setNewDAGID(dagID3);
wc.assignTaskAttempt(attempt31, LRs , dag3Credentials);
argumentCaptor = ArgumentCaptor.forClass(AMContainerTask.class);
- verify(wc.tal, times(5)).registerTaskAttempt(argumentCaptor.capture(), eq(wc.containerID));
+ verify(wc.tal, times(5)).registerTaskAttempt(argumentCaptor.capture(), eq(wc.containerID), eq(0));
fetchedTask = argumentCaptor.getAllValues().get(4);
assertTrue(fetchedTask.haveCredentialsChanged());
assertNotNull(fetchedTask.getCredentials());
@@ -1147,7 +1148,7 @@ public class TestAMContainer {
wc.assignTaskAttempt(attempt32, LRs, dag1Credentials);
argumentCaptor = ArgumentCaptor.forClass(AMContainerTask.class);
- verify(wc.tal, times(6)).registerTaskAttempt(argumentCaptor.capture(), eq(wc.containerID));
+ verify(wc.tal, times(6)).registerTaskAttempt(argumentCaptor.capture(), eq(wc.containerID), eq(0));
fetchedTask = argumentCaptor.getAllValues().get(5);
assertFalse(fetchedTask.haveCredentialsChanged());
assertNull(fetchedTask.getCredentials());
@@ -1200,9 +1201,10 @@ public class TestAMContainer {
chh = mock(ContainerHeartbeatHandler.class);
- InetSocketAddress addr = new InetSocketAddress("localhost", 0);
tal = mock(TaskAttemptListener.class);
- doReturn(addr).when(tal).getAddress();
+ TaskCommunicator taskComm = mock(TaskCommunicator.class);
+ doReturn(new InetSocketAddress("localhost", 0)).when(taskComm).getAddress();
+ doReturn(taskComm).when(tal).getTaskCommunicator(0);
dagID = TezDAGID.getInstance(applicationID, 1);
vertexID = TezVertexID.getInstance(dagID, 1);
@@ -1228,7 +1230,7 @@ public class TestAMContainer {
doReturn(taskAttemptID).when(taskSpec).getTaskAttemptID();
amContainer = new AMContainerImpl(container, chh, tal,
- new ContainerContextMatcher(), appContext);
+ new ContainerContextMatcher(), appContext, 0, 0, 0);
}
public WrappedContainer() {
@@ -1278,7 +1280,7 @@ public class TestAMContainer {
Token<JobTokenIdentifier> jobToken = mock(Token.class);
TokenCache.setSessionToken(jobToken, credentials);
amContainer.handle(new AMContainerEventLaunchRequest(containerID, vertexID,
- new ContainerContext(localResources, credentials, new HashMap<String, String>(), "")));
+ new ContainerContext(localResources, credentials, new HashMap<String, String>(), ""), 0, 0));
}
public void assignTaskAttempt(TezTaskAttemptID taID) {
http://git-wip-us.apache.org/repos/asf/tez/blob/c47951ab/tez-dag/src/test/java/org/apache/tez/dag/app/rm/container/TestAMContainerMap.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/container/TestAMContainerMap.java b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/container/TestAMContainerMap.java
index 61371e8..dee4541 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/container/TestAMContainerMap.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/container/TestAMContainerMap.java
@@ -31,6 +31,7 @@ import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.Token;
+import org.apache.tez.dag.api.TaskCommunicator;
import org.apache.tez.dag.app.AppContext;
import org.apache.tez.dag.app.ContainerHeartbeatHandler;
import org.apache.tez.dag.app.TaskAttemptListener;
@@ -43,8 +44,9 @@ public class TestAMContainerMap {
private TaskAttemptListener mockTaskAttemptListener() {
TaskAttemptListener tal = mock(TaskAttemptListener.class);
- InetSocketAddress socketAddr = new InetSocketAddress("localhost", 21000);
- doReturn(socketAddr).when(tal).getAddress();
+ TaskCommunicator taskComm = mock(TaskCommunicator.class);
+ doReturn(new InetSocketAddress("localhost", 21000)).when(taskComm).getAddress();
+ doReturn(taskComm).when(tal).getTaskCommunicator(0);
return tal;
}
http://git-wip-us.apache.org/repos/asf/tez/blob/c47951ab/tez-examples/src/main/java/org/apache/tez/examples/JoinValidate.java
----------------------------------------------------------------------
diff --git a/tez-examples/src/main/java/org/apache/tez/examples/JoinValidate.java b/tez-examples/src/main/java/org/apache/tez/examples/JoinValidate.java
index d7fc5ac..52643c5 100644
--- a/tez-examples/src/main/java/org/apache/tez/examples/JoinValidate.java
+++ b/tez-examples/src/main/java/org/apache/tez/examples/JoinValidate.java
@@ -19,6 +19,7 @@
package org.apache.tez.examples;
import java.io.IOException;
+import java.util.Map;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -130,7 +131,7 @@ public class JoinValidate extends TezExampleBase {
private DAG createDag(TezConfiguration tezConf, Path lhs, Path rhs, int numPartitions)
throws IOException {
- DAG dag = DAG.create("JoinValidate");
+ DAG dag = DAG.create(getDagName());
// Configuration for intermediate output - shared by Vertex1 and Vertex2
// This should only be setting selective keys from the underlying conf. Fix after there's a
@@ -147,15 +148,18 @@ public class JoinValidate extends TezExampleBase {
MRInput
.createConfigBuilder(new Configuration(tezConf), TextInputFormat.class,
lhs.toUri().toString()).groupSplits(!isDisableSplitGrouping()).build());
+ setVertexProperties(lhsVertex, getLhsVertexProperties());
Vertex rhsVertex = Vertex.create(RHS_INPUT_NAME, ProcessorDescriptor.create(
ForwardingProcessor.class.getName())).addDataSource("rhs",
MRInput
.createConfigBuilder(new Configuration(tezConf), TextInputFormat.class,
rhs.toUri().toString()).groupSplits(!isDisableSplitGrouping()).build());
+ setVertexProperties(rhsVertex, getRhsVertexProperties());
Vertex joinValidateVertex = Vertex.create("joinvalidate", ProcessorDescriptor.create(
JoinValidateProcessor.class.getName()), numPartitions);
+ setVertexProperties(joinValidateVertex, getValidateVertexProperties());
Edge e1 = Edge.create(lhsVertex, joinValidateVertex, edgeConf.createDefaultEdgeProperty());
Edge e2 = Edge.create(rhsVertex, joinValidateVertex, edgeConf.createDefaultEdgeProperty());
@@ -165,6 +169,30 @@ public class JoinValidate extends TezExampleBase {
return dag;
}
+ private void setVertexProperties(Vertex vertex, Map<String, String> properties) {
+ if (properties != null) {
+ for (Map.Entry<String, String> entry : properties.entrySet()) {
+ vertex.setConf(entry.getKey(), entry.getValue());
+ }
+ }
+ }
+
+ protected Map<String, String> getLhsVertexProperties() {
+ return null;
+ }
+
+ protected Map<String, String> getRhsVertexProperties() {
+ return null;
+ }
+
+ protected Map<String, String> getValidateVertexProperties() {
+ return null;
+ }
+
+ protected String getDagName() {
+ return "JoinValidate";
+ }
+
public static class JoinValidateProcessor extends SimpleProcessor {
private static final Logger LOG = LoggerFactory.getLogger(JoinValidateProcessor.class);
http://git-wip-us.apache.org/repos/asf/tez/blob/c47951ab/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/launcher/TezTestServiceContainerLauncher.java
----------------------------------------------------------------------
diff --git a/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/launcher/TezTestServiceContainerLauncher.java b/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/launcher/TezTestServiceContainerLauncher.java
index e83165b..27356bc 100644
--- a/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/launcher/TezTestServiceContainerLauncher.java
+++ b/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/launcher/TezTestServiceContainerLauncher.java
@@ -14,6 +14,8 @@
package org.apache.tez.dag.app.launcher;
+import java.net.InetSocketAddress;
+
import com.google.common.base.Preconditions;
import com.google.protobuf.ByteString;
import org.apache.commons.logging.Log;
@@ -124,7 +126,8 @@ public class TezTestServiceContainerLauncher extends AbstractService implements
private RunContainerRequestProto constructRunContainerRequest(NMCommunicatorLaunchRequestEvent event) {
RunContainerRequestProto.Builder builder = RunContainerRequestProto.newBuilder();
- builder.setAmHost(tal.getAddress().getHostName()).setAmPort(tal.getAddress().getPort());
+ InetSocketAddress address = tal.getTaskCommunicator(event.getTaskCommId()).getAddress();
+ builder.setAmHost(address.getHostName()).setAmPort(address.getPort());
builder.setAppAttemptNumber(event.getContainer().getId().getApplicationAttemptId().getAttemptId());
builder.setApplicationIdString(
event.getContainer().getId().getApplicationAttemptId().getApplicationId().toString());
http://git-wip-us.apache.org/repos/asf/tez/blob/c47951ab/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/rm/TezTestServiceTaskSchedulerService.java
----------------------------------------------------------------------
diff --git a/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/rm/TezTestServiceTaskSchedulerService.java b/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/rm/TezTestServiceTaskSchedulerService.java
index e3c18bf..5657f86 100644
--- a/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/rm/TezTestServiceTaskSchedulerService.java
+++ b/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/rm/TezTestServiceTaskSchedulerService.java
@@ -14,7 +14,6 @@
package org.apache.tez.dag.app.rm;
-import java.io.IOException;
import java.util.Arrays;
import java.util.LinkedList;
import java.util.List;
@@ -32,25 +31,17 @@ import com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
-import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.NodeId;
-import org.apache.hadoop.yarn.api.records.NodeReport;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
-import org.apache.hadoop.yarn.client.api.AMRMClient;
-import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync;
-import org.apache.hadoop.yarn.exceptions.YarnException;
-import org.apache.tez.dag.api.TezUncheckedException;
import org.apache.tez.dag.app.AppContext;
import org.apache.tez.service.TezTestServiceConfConstants;
-// TODO Registration with RM - so that the AM is considered dead and restarted in the expiry interval - 10 minutes.
-
public class TezTestServiceTaskSchedulerService extends TaskSchedulerService {
private static final Log LOG = LogFactory.getLog(TezTestServiceTaskSchedulerService.class);
@@ -71,7 +62,7 @@ public class TezTestServiceTaskSchedulerService extends TaskSchedulerService {
private final ConcurrentMap<Object, ContainerId> runningTasks =
new ConcurrentHashMap<Object, ContainerId>();
- private final AMRMClientAsync<AMRMClient.ContainerRequest> amRmClient;
+ // AppIdIdentifier to avoid conflicts with other containers in the system.
// Per instance
private final int memoryPerInstance;
@@ -82,10 +73,13 @@ public class TezTestServiceTaskSchedulerService extends TaskSchedulerService {
private final Resource resourcePerContainer;
+ // Not registering with the RM. Assuming the main TezScheduler will always run (except local mode),
+ // and take care of YARN registration.
public TezTestServiceTaskSchedulerService(TaskSchedulerAppCallback appClient,
AppContext appContext,
String clientHostname, int clientPort,
String trackingUrl,
+ long customAppIdIdentifier,
Configuration conf) {
// Accepting configuration here to allow setting up fields as final
super(TezTestServiceTaskSchedulerService.class.getName());
@@ -93,7 +87,7 @@ public class TezTestServiceTaskSchedulerService extends TaskSchedulerService {
this.appClientDelegate = createAppCallbackDelegate(appClient);
this.appContext = appContext;
this.serviceHosts = new LinkedList<String>();
- this.containerFactory = new ContainerFactory(appContext);
+ this.containerFactory = new ContainerFactory(appContext, customAppIdIdentifier);
this.memoryPerInstance = conf
.getInt(TezTestServiceConfConstants.TEZ_TEST_SERVICE_MEMORY_PER_INSTANCE_MB, -1);
@@ -123,7 +117,6 @@ public class TezTestServiceTaskSchedulerService extends TaskSchedulerService {
int memoryPerContainer = (int) (memoryPerInstance / (float) executorsPerInstance);
int coresPerContainer = (int) (coresPerInstance / (float) executorsPerInstance);
this.resourcePerContainer = Resource.newInstance(memoryPerContainer, coresPerContainer);
- this.amRmClient = TezAMRMClientAsync.createAMRMClientAsync(5000, new FakeAmRmCallbackHandler());
String[] hosts = conf.getTrimmedStrings(TezTestServiceConfConstants.TEZ_TEST_SERVICE_HOSTS);
if (hosts == null || hosts.length == 0) {
@@ -143,36 +136,8 @@ public class TezTestServiceTaskSchedulerService extends TaskSchedulerService {
}
@Override
- public void serviceInit(Configuration conf) {
- amRmClient.init(conf);
- }
-
- @Override
- public void serviceStart() {
- amRmClient.start();
- RegisterApplicationMasterResponse response;
- try {
- amRmClient.registerApplicationMaster(clientHostname, clientPort, trackingUrl);
- } catch (YarnException e) {
- throw new TezUncheckedException(e);
- } catch (IOException e) {
- throw new TezUncheckedException(e);
- }
- }
-
- @Override
public void serviceStop() {
if (!this.isStopped.getAndSet(true)) {
-
- try {
- TaskSchedulerAppCallback.AppFinalStatus status = appClientDelegate.getFinalAppStatus();
- amRmClient.unregisterApplicationMaster(status.exitStatus, status.exitMessage,
- status.postCompletionTrackingUrl);
- } catch (YarnException e) {
- throw new TezUncheckedException(e);
- } catch (IOException e) {
- throw new TezUncheckedException(e);
- }
appCallbackExecutor.shutdownNow();
}
}
@@ -264,7 +229,7 @@ public class TezTestServiceTaskSchedulerService extends TaskSchedulerService {
private ExecutorService createAppCallbackExecutorService() {
return Executors.newSingleThreadExecutor(new ThreadFactoryBuilder()
- .setNameFormat("TaskSchedulerAppCaller #%d").setDaemon(true).build());
+ .setNameFormat("TezTestTaskSchedulerAppCaller").setDaemon(true).build());
}
private TaskSchedulerAppCallback createAppCallbackDelegate(
@@ -274,7 +239,7 @@ public class TezTestServiceTaskSchedulerService extends TaskSchedulerService {
}
private String selectHost(String[] requestedHosts) {
- String host = null;
+ String host;
if (requestedHosts != null && requestedHosts.length > 0) {
Arrays.sort(requestedHosts);
host = requestedHosts[0];
@@ -287,17 +252,19 @@ public class TezTestServiceTaskSchedulerService extends TaskSchedulerService {
}
static class ContainerFactory {
- final AppContext appContext;
AtomicInteger nextId;
-
- public ContainerFactory(AppContext appContext) {
- this.appContext = appContext;
- this.nextId = new AtomicInteger(2);
+ final ApplicationAttemptId customAppAttemptId;
+
+ public ContainerFactory(AppContext appContext, long appIdLong) {
+ this.nextId = new AtomicInteger(1);
+ ApplicationId appId = ApplicationId
+ .newInstance(appIdLong, appContext.getApplicationAttemptId().getApplicationId().getId());
+ this.customAppAttemptId = ApplicationAttemptId
+ .newInstance(appId, appContext.getApplicationAttemptId().getAttemptId());
}
public Container createContainer(Resource capability, Priority priority, String hostname, int port) {
- ApplicationAttemptId appAttemptId = appContext.getApplicationAttemptId();
- ContainerId containerId = ContainerId.newInstance(appAttemptId, nextId.getAndIncrement());
+ ContainerId containerId = ContainerId.newInstance(customAppAttemptId, nextId.getAndIncrement());
NodeId nodeId = NodeId.newInstance(hostname, port);
String nodeHttpAddress = "hostname:0";
@@ -311,37 +278,4 @@ public class TezTestServiceTaskSchedulerService extends TaskSchedulerService {
return container;
}
}
-
- private static class FakeAmRmCallbackHandler implements AMRMClientAsync.CallbackHandler {
-
- @Override
- public void onContainersCompleted(List<ContainerStatus> statuses) {
-
- }
-
- @Override
- public void onContainersAllocated(List<Container> containers) {
-
- }
-
- @Override
- public void onShutdownRequest() {
-
- }
-
- @Override
- public void onNodesUpdated(List<NodeReport> updatedNodes) {
-
- }
-
- @Override
- public float getProgress() {
- return 0;
- }
-
- @Override
- public void onError(Throwable e) {
-
- }
- }
}
http://git-wip-us.apache.org/repos/asf/tez/blob/c47951ab/tez-ext-service-tests/src/test/java/org/apache/tez/examples/JoinValidateConfigured.java
----------------------------------------------------------------------
diff --git a/tez-ext-service-tests/src/test/java/org/apache/tez/examples/JoinValidateConfigured.java b/tez-ext-service-tests/src/test/java/org/apache/tez/examples/JoinValidateConfigured.java
new file mode 100644
index 0000000..e5d2e3b
--- /dev/null
+++ b/tez-ext-service-tests/src/test/java/org/apache/tez/examples/JoinValidateConfigured.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.examples;
+
+import java.util.Map;
+
+public class JoinValidateConfigured extends JoinValidate {
+
+ private final Map<String, String> lhsProps;
+ private final Map<String, String> rhsProps;
+ private final Map<String, String> validateProps;
+ private final String dagNameSuffix;
+
+ public JoinValidateConfigured(Map<String, String> lhsProps, Map<String, String> rhsProps,
+ Map<String, String> validateProps, String dagNameSuffix) {
+ this.lhsProps = lhsProps;
+ this.rhsProps = rhsProps;
+ this.validateProps = validateProps;
+ this.dagNameSuffix = dagNameSuffix;
+ }
+
+ @Override
+ protected Map<String, String> getLhsVertexProperties() {
+ return this.lhsProps;
+ }
+
+ @Override
+ protected Map<String, String> getRhsVertexProperties() {
+ return this.rhsProps;
+ }
+
+ @Override
+ protected Map<String, String> getValidateVertexProperties() {
+ return this.validateProps;
+ }
+
+ @Override
+ protected String getDagName() {
+ return "JoinValidate_" + dagNameSuffix;
+ }
+}
http://git-wip-us.apache.org/repos/asf/tez/blob/c47951ab/tez-ext-service-tests/src/test/java/org/apache/tez/tests/TestExternalTezServices.java
----------------------------------------------------------------------
diff --git a/tez-ext-service-tests/src/test/java/org/apache/tez/tests/TestExternalTezServices.java b/tez-ext-service-tests/src/test/java/org/apache/tez/tests/TestExternalTezServices.java
index ae7e7f8..9c149c6 100644
--- a/tez-ext-service-tests/src/test/java/org/apache/tez/tests/TestExternalTezServices.java
+++ b/tez-ext-service-tests/src/test/java/org/apache/tez/tests/TestExternalTezServices.java
@@ -15,11 +15,11 @@
package org.apache.tez.tests;
import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
import java.io.IOException;
import java.util.Map;
+import com.google.common.collect.Maps;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
@@ -28,13 +28,14 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.tez.client.TezClient;
import org.apache.tez.dag.api.TezConfiguration;
+import org.apache.tez.dag.api.TezConstants;
import org.apache.tez.dag.api.TezException;
import org.apache.tez.dag.app.launcher.TezTestServiceNoOpContainerLauncher;
import org.apache.tez.dag.app.rm.TezTestServiceTaskSchedulerService;
import org.apache.tez.dag.app.taskcomm.TezTestServiceTaskCommunicatorImpl;
import org.apache.tez.examples.HashJoinExample;
import org.apache.tez.examples.JoinDataGen;
-import org.apache.tez.examples.JoinValidate;
+import org.apache.tez.examples.JoinValidateConfigured;
import org.apache.tez.service.MiniTezTestServiceCluster;
import org.apache.tez.test.MiniTezCluster;
import org.junit.AfterClass;
@@ -47,23 +48,31 @@ public class TestExternalTezServices {
private static final String EXT_PUSH_ENTITY_NAME = "ExtServiceTestPush";
- private static MiniTezCluster tezCluster;
- private static MiniDFSCluster dfsCluster;
- private static MiniTezTestServiceCluster tezTestServiceCluster;
+ private static volatile MiniTezCluster tezCluster;
+ private static volatile MiniDFSCluster dfsCluster;
+ private static volatile MiniTezTestServiceCluster tezTestServiceCluster;
- private static Configuration clusterConf = new Configuration();
- private static Configuration confForJobs;
+ private static volatile Configuration clusterConf = new Configuration();
+ private static volatile Configuration confForJobs;
- private static FileSystem remoteFs;
- private static FileSystem localFs;
+ private static volatile FileSystem remoteFs;
+ private static volatile FileSystem localFs;
- private static TezClient sharedTezClient;
+ private static volatile TezClient sharedTezClient;
+
+ private static final Path SRC_DATA_DIR = new Path("/tmp/" + TestExternalTezServices.class.getSimpleName());
+ private static final Path HASH_JOIN_EXPECTED_RESULT_PATH = new Path(SRC_DATA_DIR, "expectedOutputPath");
+ private static final Path HASH_JOIN_OUTPUT_PATH = new Path(SRC_DATA_DIR, "outPath");
+
+ private static final Map<String, String> PROPS_EXT_SERVICE_PUSH = Maps.newHashMap();
+ private static final Map<String, String> PROPS_REGULAR_CONTAINERS = Maps.newHashMap();
+ private static final Map<String, String> PROPS_IN_AM = Maps.newHashMap();
private static String TEST_ROOT_DIR = "target" + Path.SEPARATOR + TestExternalTezServices.class.getName()
+ "-tmpDir";
@BeforeClass
- public static void setup() throws IOException, TezException, InterruptedException {
+ public static void setup() throws Exception {
localFs = FileSystem.getLocal(clusterConf);
@@ -108,27 +117,79 @@ public class TestExternalTezServices {
remoteFs.mkdirs(stagingDirPath);
// This is currently configured to push tasks into the Service, and then use the standard RPC
confForJobs.set(TezConfiguration.TEZ_AM_STAGING_DIR, stagingDirPath.toString());
- confForJobs.set(TezConfiguration.TEZ_AM_TASK_SCHEDULERS,
+
+ confForJobs.setStrings(TezConfiguration.TEZ_AM_TASK_SCHEDULERS,
+ TezConstants.TEZ_AM_SERVICE_PLUGINS_NAME_DEFAULT,
+// TezConstants.TEZ_AM_SERVICE_PLUGINS_LOCAL_MODE_NAME_DEFAULT,
EXT_PUSH_ENTITY_NAME + ":" + TezTestServiceTaskSchedulerService.class.getName());
- confForJobs.set(TezConfiguration.TEZ_AM_CONTAINER_LAUNCHERS,
- EXT_PUSH_ENTITY_NAME + ":" + TezTestServiceNoOpContainerLauncher.class.getName());
- confForJobs.set(TezConfiguration.TEZ_AM_TASK_COMMUNICATORS,
- EXT_PUSH_ENTITY_NAME + ":" + TezTestServiceTaskCommunicatorImpl.class.getName());
- confForJobs.set(TezConfiguration.TEZ_AM_VERTEX_TASK_SCHEDULER_NAME, EXT_PUSH_ENTITY_NAME);
- confForJobs.set(TezConfiguration.TEZ_AM_VERTEX_CONTAINER_LAUNCHER_NAME, EXT_PUSH_ENTITY_NAME);
- confForJobs.set(TezConfiguration.TEZ_AM_VERTEX_TASK_COMMUNICATOR_NAME, EXT_PUSH_ENTITY_NAME);
+ confForJobs.setStrings(TezConfiguration.TEZ_AM_CONTAINER_LAUNCHERS,
+ TezConstants.TEZ_AM_SERVICE_PLUGINS_NAME_DEFAULT,
+// TezConstants.TEZ_AM_SERVICE_PLUGINS_LOCAL_MODE_NAME_DEFAULT,
+ EXT_PUSH_ENTITY_NAME + ":" + TezTestServiceNoOpContainerLauncher.class.getName());
+ confForJobs.setStrings(TezConfiguration.TEZ_AM_TASK_COMMUNICATORS,
+ TezConstants.TEZ_AM_SERVICE_PLUGINS_NAME_DEFAULT,
+// TezConstants.TEZ_AM_SERVICE_PLUGINS_LOCAL_MODE_NAME_DEFAULT,
+ EXT_PUSH_ENTITY_NAME + ":" + TezTestServiceTaskCommunicatorImpl.class.getName());
- TezConfiguration tezConf = new TezConfiguration(confForJobs);
+ // Default all jobs to run with the default plugins (regular containers). Individual tests override this on a per vertex/dag level.
+ confForJobs.set(TezConfiguration.TEZ_AM_VERTEX_TASK_SCHEDULER_NAME,
+ TezConstants.TEZ_AM_SERVICE_PLUGINS_NAME_DEFAULT);
+ confForJobs.set(TezConfiguration.TEZ_AM_VERTEX_CONTAINER_LAUNCHER_NAME,
+ TezConstants.TEZ_AM_SERVICE_PLUGINS_NAME_DEFAULT);
+ confForJobs.set(TezConfiguration.TEZ_AM_VERTEX_TASK_COMMUNICATOR_NAME,
+ TezConstants.TEZ_AM_SERVICE_PLUGINS_NAME_DEFAULT);
+
+ // Setup various executor sets
+ PROPS_REGULAR_CONTAINERS.put(TezConfiguration.TEZ_AM_VERTEX_TASK_SCHEDULER_NAME,
+ TezConstants.TEZ_AM_SERVICE_PLUGINS_NAME_DEFAULT);
+ PROPS_REGULAR_CONTAINERS.put(TezConfiguration.TEZ_AM_VERTEX_CONTAINER_LAUNCHER_NAME,
+ TezConstants.TEZ_AM_SERVICE_PLUGINS_NAME_DEFAULT);
+ PROPS_REGULAR_CONTAINERS.put(TezConfiguration.TEZ_AM_VERTEX_TASK_COMMUNICATOR_NAME,
+ TezConstants.TEZ_AM_SERVICE_PLUGINS_NAME_DEFAULT);
+
+ PROPS_EXT_SERVICE_PUSH.put(TezConfiguration.TEZ_AM_VERTEX_TASK_SCHEDULER_NAME, EXT_PUSH_ENTITY_NAME);
+ PROPS_EXT_SERVICE_PUSH.put(TezConfiguration.TEZ_AM_VERTEX_CONTAINER_LAUNCHER_NAME, EXT_PUSH_ENTITY_NAME);
+ PROPS_EXT_SERVICE_PUSH.put(TezConfiguration.TEZ_AM_VERTEX_TASK_COMMUNICATOR_NAME, EXT_PUSH_ENTITY_NAME);
+
+ PROPS_IN_AM.put(TezConfiguration.TEZ_AM_VERTEX_TASK_SCHEDULER_NAME,
+ TezConstants.TEZ_AM_SERVICE_PLUGINS_LOCAL_MODE_NAME_DEFAULT);
+ PROPS_IN_AM.put(TezConfiguration.TEZ_AM_VERTEX_CONTAINER_LAUNCHER_NAME,
+ TezConstants.TEZ_AM_SERVICE_PLUGINS_LOCAL_MODE_NAME_DEFAULT);
+ PROPS_IN_AM.put(TezConfiguration.TEZ_AM_VERTEX_TASK_COMMUNICATOR_NAME,
+ TezConstants.TEZ_AM_SERVICE_PLUGINS_LOCAL_MODE_NAME_DEFAULT);
+
+
+ // Create a session to use for all tests.
+ TezConfiguration tezClientConf = new TezConfiguration(confForJobs);
sharedTezClient = TezClient.create(TestExternalTezServices.class.getSimpleName() + "_session",
- tezConf, true);
+ tezClientConf, true);
sharedTezClient.start();
LOG.info("Shared TezSession started");
sharedTezClient.waitTillReady();
LOG.info("Shared TezSession ready for submission");
+ // Generate the join data set used for each run.
+ // Can a timeout be enforced here ?
+ remoteFs.mkdirs(SRC_DATA_DIR);
+ Path dataPath1 = new Path(SRC_DATA_DIR, "inPath1");
+ Path dataPath2 = new Path(SRC_DATA_DIR, "inPath2");
+ TezConfiguration tezConf = new TezConfiguration(confForJobs);
+ // Generate join data - with 2 tasks.
+ JoinDataGen dataGen = new JoinDataGen();
+ String[] dataGenArgs = new String[]{
+ dataPath1.toString(), "1048576", dataPath2.toString(), "524288",
+ HASH_JOIN_EXPECTED_RESULT_PATH.toString(), "2"};
+ assertEquals(0, dataGen.run(tezConf, dataGenArgs, sharedTezClient));
+ // Run the actual join - with 2 reducers
+ HashJoinExample joinExample = new HashJoinExample();
+ String[] args = new String[]{
+ dataPath1.toString(), dataPath2.toString(), "2", HASH_JOIN_OUTPUT_PATH.toString()};
+ assertEquals(0, joinExample.run(tezConf, args, sharedTezClient));
+
+ LOG.info("Completed generating Data - Expected Hash Result and Actual Join Result");
}
@AfterClass
@@ -156,35 +217,50 @@ public class TestExternalTezServices {
@Test(timeout = 60000)
- public void test1() throws Exception {
- Path testDir = new Path("/tmp/testHashJoinExample");
+ public void testAllInService() throws Exception {
+ int expectedExternalSubmissions = 4 + 3; //4 for 4 src files, 3 for num reducers.
+ runJoinValidate("AllInService", expectedExternalSubmissions, PROPS_EXT_SERVICE_PUSH,
+ PROPS_EXT_SERVICE_PUSH, PROPS_EXT_SERVICE_PUSH);
+ }
- remoteFs.mkdirs(testDir);
+ @Test(timeout = 60000)
+ public void testAllInContainers() throws Exception {
+ int expectedExternalSubmissions = 0; // All in containers
+ runJoinValidate("AllInContainers", expectedExternalSubmissions, PROPS_REGULAR_CONTAINERS,
+ PROPS_REGULAR_CONTAINERS, PROPS_REGULAR_CONTAINERS);
+ }
- Path dataPath1 = new Path(testDir, "inPath1");
- Path dataPath2 = new Path(testDir, "inPath2");
- Path expectedOutputPath = new Path(testDir, "expectedOutputPath");
- Path outPath = new Path(testDir, "outPath");
+ @Test(timeout = 60000)
+ public void testMixed1() throws Exception { // M-ExtService, R-containers
+ int expectedExternalSubmissions = 4 + 0; //4 for 4 src files, 0 for reducers (they run in containers).
+ runJoinValidate("Mixed1", expectedExternalSubmissions, PROPS_EXT_SERVICE_PUSH,
+ PROPS_EXT_SERVICE_PUSH, PROPS_REGULAR_CONTAINERS);
+ }
- TezConfiguration tezConf = new TezConfiguration(confForJobs);
+ @Test(timeout = 60000)
+ public void testMixed2() throws Exception { // M-Containers, R-ExtService
+ int expectedExternalSubmissions = 0 + 3; //0 for src files (they run in containers), 3 for num reducers.
+ runJoinValidate("Mixed2", expectedExternalSubmissions, PROPS_REGULAR_CONTAINERS,
+ PROPS_REGULAR_CONTAINERS, PROPS_EXT_SERVICE_PUSH);
+ }
- JoinDataGen dataGen = new JoinDataGen();
- String[] dataGenArgs = new String[]{
- dataPath1.toString(), "1048576", dataPath2.toString(), "524288",
- expectedOutputPath.toString(), "2"};
- assertEquals(0, dataGen.run(tezConf, dataGenArgs, sharedTezClient));
- HashJoinExample joinExample = new HashJoinExample();
- String[] args = new String[]{
- dataPath1.toString(), dataPath2.toString(), "2", outPath.toString()};
- assertEquals(0, joinExample.run(tezConf, args, sharedTezClient));
+ private void runJoinValidate(String name, int extExpectedCount, Map<String, String> lhsProps,
+ Map<String, String> rhsProps,
+ Map<String, String> validateProps) throws
+ Exception {
+ int externalSubmissionCount = tezTestServiceCluster.getNumSubmissions();
- JoinValidate joinValidate = new JoinValidate();
- String[] validateArgs = new String[]{
- expectedOutputPath.toString(), outPath.toString(), "3"};
+ TezConfiguration tezConf = new TezConfiguration(confForJobs);
+ JoinValidateConfigured joinValidate =
+ new JoinValidateConfigured(lhsProps, rhsProps,
+ validateProps, name);
+ String[] validateArgs = new String[]{"-disableSplitGrouping",
+ HASH_JOIN_EXPECTED_RESULT_PATH.toString(), HASH_JOIN_OUTPUT_PATH.toString(), "3"};
assertEquals(0, joinValidate.run(tezConf, validateArgs, sharedTezClient));
// Ensure this was actually submitted to the external cluster
- assertTrue(tezTestServiceCluster.getNumSubmissions() > 0);
+ assertEquals(extExpectedCount,
+ (tezTestServiceCluster.getNumSubmissions() - externalSubmissionCount));
}
}