You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by ss...@apache.org on 2015/05/06 09:41:38 UTC

[36/50] [abbrv] tez git commit: TEZ-2123. Fix component managers to use pluggable components. Enable hybrid mode. (sseth)

http://git-wip-us.apache.org/repos/asf/tez/blob/25980c1a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestContainerReuse.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestContainerReuse.java b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestContainerReuse.java
index 54b9adb..c1169ef 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestContainerReuse.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestContainerReuse.java
@@ -223,7 +223,7 @@ public class TestContainerReuse {
 
     taskSchedulerEventHandler.handleEvent(
       new AMSchedulerEventTAEnded(
-        ta11, containerHost1.getId(), TaskAttemptState.SUCCEEDED));
+        ta11, containerHost1.getId(), TaskAttemptState.SUCCEEDED, 0));
     drainableAppCallback.drain();
     verify(taskScheduler).deallocateTask(eq(ta11), eq(true));
     verify(taskSchedulerEventHandler, times(1)).taskAllocated(
@@ -235,7 +235,7 @@ public class TestContainerReuse {
 
     taskSchedulerEventHandler.handleEvent(
       new AMSchedulerEventTAEnded(ta21, containerHost2.getId(),
-        TaskAttemptState.SUCCEEDED));
+        TaskAttemptState.SUCCEEDED, 0));
 
     long currentTs = System.currentTimeMillis();
     Throwable exception = null;
@@ -356,7 +356,7 @@ public class TestContainerReuse {
 
     taskSchedulerEventHandler.handleEvent(
       new AMSchedulerEventTAEnded(ta21, containerHost2.getId(),
-        TaskAttemptState.SUCCEEDED));
+        TaskAttemptState.SUCCEEDED, 0));
     drainableAppCallback.drain();
     verify(taskScheduler).deallocateTask(eq(ta21), eq(true));
     verify(taskSchedulerEventHandler, times(0)).taskAllocated(
@@ -459,7 +459,7 @@ public class TestContainerReuse {
     verify(taskSchedulerEventHandler).taskAllocated(eq(ta11), any(Object.class), eq(container1));
 
     // Task assigned to container completed successfully. Container should be re-used.
-    taskSchedulerEventHandler.handleEvent(new AMSchedulerEventTAEnded(ta11, container1.getId(), TaskAttemptState.SUCCEEDED));
+    taskSchedulerEventHandler.handleEvent(new AMSchedulerEventTAEnded(ta11, container1.getId(), TaskAttemptState.SUCCEEDED, 0));
     drainableAppCallback.drain();
     verify(taskScheduler).deallocateTask(eq(ta11), eq(true));
     verify(taskSchedulerEventHandler).taskAllocated(eq(ta12), any(Object.class), eq(container1));
@@ -469,7 +469,7 @@ public class TestContainerReuse {
 
     // Task assigned to container completed successfully.
     // Verify reuse across hosts.
-    taskSchedulerEventHandler.handleEvent(new AMSchedulerEventTAEnded(ta12, container1.getId(), TaskAttemptState.SUCCEEDED));
+    taskSchedulerEventHandler.handleEvent(new AMSchedulerEventTAEnded(ta12, container1.getId(), TaskAttemptState.SUCCEEDED, 0));
     drainableAppCallback.drain();
     verify(taskScheduler).deallocateTask(eq(ta12), eq(true));
     verify(taskSchedulerEventHandler).taskAllocated(eq(ta13), any(Object.class), eq(container1));
@@ -478,7 +478,7 @@ public class TestContainerReuse {
     eventHandler.reset();
 
     // Verify no re-use if a previous task fails.
-    taskSchedulerEventHandler.handleEvent(new AMSchedulerEventTAEnded(ta13, container1.getId(), TaskAttemptState.FAILED));
+    taskSchedulerEventHandler.handleEvent(new AMSchedulerEventTAEnded(ta13, container1.getId(), TaskAttemptState.FAILED, 0));
     drainableAppCallback.drain();
     verify(taskSchedulerEventHandler, times(0)).taskAllocated(eq(ta14), any(Object.class), eq(container1));
     verify(taskScheduler).deallocateTask(eq(ta13), eq(false));
@@ -496,7 +496,7 @@ public class TestContainerReuse {
     verify(taskSchedulerEventHandler).taskAllocated(eq(ta14), any(Object.class), eq(container2));
 
     // Task assigned to container completed successfully. No pending requests. Container should be released.
-    taskSchedulerEventHandler.handleEvent(new AMSchedulerEventTAEnded(ta14, container2.getId(), TaskAttemptState.SUCCEEDED));
+    taskSchedulerEventHandler.handleEvent(new AMSchedulerEventTAEnded(ta14, container2.getId(), TaskAttemptState.SUCCEEDED, 0));
     drainableAppCallback.drain();
     verify(taskScheduler).deallocateTask(eq(ta14), eq(true));
     verify(rmClient).releaseAssignedContainer(eq(container2.getId()));
@@ -607,7 +607,7 @@ public class TestContainerReuse {
 
     // First task had profiling on. This container can not be reused further.
     taskSchedulerEventHandler.handleEvent(
-        new AMSchedulerEventTAEnded(ta11, container1.getId(), TaskAttemptState.SUCCEEDED));
+        new AMSchedulerEventTAEnded(ta11, container1.getId(), TaskAttemptState.SUCCEEDED, 0));
     drainableAppCallback.drain();
     verify(taskScheduler).deallocateTask(eq(ta11), eq(true));
     verify(taskSchedulerEventHandler, times(0)).taskAllocated(eq(ta12), any(Object.class),
@@ -653,7 +653,7 @@ public class TestContainerReuse {
 
     // Verify that the container can not be reused when profiling option is turned on
     // Even for 2 tasks having same profiling option can have container reusability.
-    taskSchedulerEventHandler.handleEvent(new AMSchedulerEventTAEnded(ta13, container2.getId(), TaskAttemptState.SUCCEEDED));
+    taskSchedulerEventHandler.handleEvent(new AMSchedulerEventTAEnded(ta13, container2.getId(), TaskAttemptState.SUCCEEDED, 0));
     drainableAppCallback.drain();
     verify(taskScheduler).deallocateTask(eq(ta13), eq(true));
     verify(taskSchedulerEventHandler, times(0)).taskAllocated(eq(ta14), any(Object.class),
@@ -698,7 +698,7 @@ public class TestContainerReuse {
     verify(taskSchedulerEventHandler).taskAllocated(eq(ta15), any(Object.class), eq(container3));
 
     //Ensure task 6 (of vertex 1) is allocated to same container
-    taskSchedulerEventHandler.handleEvent(new AMSchedulerEventTAEnded(ta15, container3.getId(), TaskAttemptState.SUCCEEDED));
+    taskSchedulerEventHandler.handleEvent(new AMSchedulerEventTAEnded(ta15, container3.getId(), TaskAttemptState.SUCCEEDED, 0));
     drainableAppCallback.drain();
     verify(taskScheduler).deallocateTask(eq(ta15), eq(true));
     verify(taskSchedulerEventHandler).taskAllocated(eq(ta16), any(Object.class), eq(container3));
@@ -811,7 +811,7 @@ public class TestContainerReuse {
     // until delay expires.
     taskSchedulerEventHandler.handleEvent(
       new AMSchedulerEventTAEnded(ta11, container1.getId(),
-        TaskAttemptState.SUCCEEDED));
+        TaskAttemptState.SUCCEEDED, 0));
     drainableAppCallback.drain();
     verify(taskScheduler).deallocateTask(eq(ta11), eq(true));
     verify(taskSchedulerEventHandler, times(0)).taskAllocated(
@@ -828,7 +828,7 @@ public class TestContainerReuse {
     // TA12 completed.
     taskSchedulerEventHandler.handleEvent(
       new AMSchedulerEventTAEnded(ta12, container1.getId(),
-        TaskAttemptState.SUCCEEDED));
+        TaskAttemptState.SUCCEEDED, 0));
     drainableAppCallback.drain();
     LOG.info("Sleeping to ensure that the scheduling loop runs");
     Thread.sleep(3000l);
@@ -946,7 +946,7 @@ public class TestContainerReuse {
     // Container should  be assigned to task21.
     taskSchedulerEventHandler.handleEvent(
       new AMSchedulerEventTAEnded(ta11, container1.getId(),
-        TaskAttemptState.SUCCEEDED));
+        TaskAttemptState.SUCCEEDED, 0));
     drainableAppCallback.drain();
     verify(taskScheduler).deallocateTask(eq(ta11), eq(true));
     verify(taskSchedulerEventHandler).taskAllocated(
@@ -956,7 +956,7 @@ public class TestContainerReuse {
     // Task 2 completes.
     taskSchedulerEventHandler.handleEvent(
       new AMSchedulerEventTAEnded(ta21, container1.getId(),
-        TaskAttemptState.SUCCEEDED));
+        TaskAttemptState.SUCCEEDED, 0));
     verify(rmClient, times(0)).releaseAssignedContainer(eq(container1.getId()));
 
     LOG.info("Sleeping to ensure that the scheduling loop runs");
@@ -1065,7 +1065,7 @@ public class TestContainerReuse {
     assertEquals(1, assignEvent.getRemoteTaskLocalResources().size());
     
     // Task assigned to container completed successfully. Container should be re-used.
-    taskSchedulerEventHandler.handleEvent(new AMSchedulerEventTAEnded(ta111, container1.getId(), TaskAttemptState.SUCCEEDED));
+    taskSchedulerEventHandler.handleEvent(new AMSchedulerEventTAEnded(ta111, container1.getId(), TaskAttemptState.SUCCEEDED, 0));
     drainableAppCallback.drain();
     verify(taskScheduler).deallocateTask(eq(ta111), eq(true));
     verify(taskSchedulerEventHandler).taskAllocated(eq(ta112), any(Object.class), eq(container1));
@@ -1077,7 +1077,7 @@ public class TestContainerReuse {
 
     // Task assigned to container completed successfully.
     // Verify reuse across hosts.
-    taskSchedulerEventHandler.handleEvent(new AMSchedulerEventTAEnded(ta112, container1.getId(), TaskAttemptState.SUCCEEDED));
+    taskSchedulerEventHandler.handleEvent(new AMSchedulerEventTAEnded(ta112, container1.getId(), TaskAttemptState.SUCCEEDED, 0));
     drainableAppCallback.drain();
     verify(taskScheduler).deallocateTask(eq(ta112), eq(true));
     verify(rmClient, times(0)).releaseAssignedContainer(eq(container1.getId()));
@@ -1118,7 +1118,7 @@ public class TestContainerReuse {
     assertEquals(2, assignEvent.getRemoteTaskLocalResources().size());
     eventHandler.reset();
 
-    taskSchedulerEventHandler.handleEvent(new AMSchedulerEventTAEnded(ta211, container1.getId(), TaskAttemptState.SUCCEEDED));
+    taskSchedulerEventHandler.handleEvent(new AMSchedulerEventTAEnded(ta211, container1.getId(), TaskAttemptState.SUCCEEDED, 0));
     drainableAppCallback.drain();
     verify(taskScheduler).deallocateTask(eq(ta211), eq(true));
     verify(taskSchedulerEventHandler).taskAllocated(eq(ta212), any(Object.class), eq(container1));

http://git-wip-us.apache.org/repos/asf/tez/blob/25980c1a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestLocalTaskScheduler.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestLocalTaskScheduler.java b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestLocalTaskScheduler.java
index 60782e6..12390b2 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestLocalTaskScheduler.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestLocalTaskScheduler.java
@@ -59,7 +59,7 @@ public class TestLocalTaskScheduler {
     TezConfiguration tezConf = new TezConfiguration();
     tezConf.setInt(TezConfiguration.TEZ_AM_INLINE_TASK_EXECUTION_MAX_TASKS, MAX_TASKS);
 
-    LocalContainerFactory containerFactory = new LocalContainerFactory(createMockAppContext());
+    LocalContainerFactory containerFactory = new LocalContainerFactory(createMockAppContext(), 1000);
     HashMap<Object, Container> taskAllocations = new LinkedHashMap<Object, Container>();
     PriorityBlockingQueue<TaskRequest> taskRequestQueue = new PriorityBlockingQueue<TaskRequest>();
     TaskSchedulerAppCallback appClientDelegate = mock(TaskSchedulerAppCallback.class);

http://git-wip-us.apache.org/repos/asf/tez/blob/25980c1a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestLocalTaskSchedulerService.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestLocalTaskSchedulerService.java b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestLocalTaskSchedulerService.java
index 3cf4f6c..25cf4b5 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestLocalTaskSchedulerService.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestLocalTaskSchedulerService.java
@@ -22,6 +22,8 @@ import java.util.HashMap;
 import java.util.concurrent.BlockingQueue;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.Resource;
@@ -81,8 +83,12 @@ public class TestLocalTaskSchedulerService {
    */
   @Test(timeout = 5000)
   public void testDeallocationBeforeAllocation() {
+    AppContext appContext = mock(AppContext.class);
+    ApplicationAttemptId appAttemptId =
+        ApplicationAttemptId.newInstance(ApplicationId.newInstance(10000l, 1), 1);
+    doReturn(appAttemptId).when(appContext).getApplicationAttemptId();
     MockLocalTaskSchedulerSerivce taskSchedulerService = new MockLocalTaskSchedulerSerivce
-        (mock(TaskSchedulerAppCallback.class), mock(ContainerSignatureMatcher.class), "", 0, "", mock(AppContext.class));
+        (mock(TaskSchedulerAppCallback.class), mock(ContainerSignatureMatcher.class), "", 0, "", appContext);
     taskSchedulerService.init(new Configuration());
     taskSchedulerService.start();
 
@@ -105,8 +111,12 @@ public class TestLocalTaskSchedulerService {
    */
   @Test(timeout = 5000)
   public void testDeallocationAfterAllocation() {
+    AppContext appContext = mock(AppContext.class);
+    ApplicationAttemptId appAttemptId =
+        ApplicationAttemptId.newInstance(ApplicationId.newInstance(10000l, 1), 1);
+    doReturn(appAttemptId).when(appContext).getApplicationAttemptId();
     MockLocalTaskSchedulerSerivce taskSchedulerService = new MockLocalTaskSchedulerSerivce
-        (mock(TaskSchedulerAppCallback.class), mock(ContainerSignatureMatcher.class), "", 0, "", mock(AppContext.class));
+        (mock(TaskSchedulerAppCallback.class), mock(ContainerSignatureMatcher.class), "", 0, "", appContext);
     taskSchedulerService.init(new Configuration());
     taskSchedulerService.start();
 
@@ -132,13 +142,13 @@ public class TestLocalTaskSchedulerService {
         String appHostName, int appHostPort, String appTrackingUrl,
         AppContext appContext) {
       super(appClient, containerSignatureMatcher, appHostName, appHostPort,
-          appTrackingUrl, appContext);
+          appTrackingUrl, 10000l, appContext);
     }
 
     @Override
     public AsyncDelegateRequestHandler createRequestHandler(Configuration conf) {
       requestHandler = new MockAsyncDelegateRequestHandler(taskRequestQueue,
-          new LocalContainerFactory(appContext),
+          new LocalContainerFactory(appContext, customContainerAppId),
           taskAllocations,
           appClientDelegate,
           conf);

http://git-wip-us.apache.org/repos/asf/tez/blob/25980c1a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerEventHandler.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerEventHandler.java b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerEventHandler.java
index 291e786..4ee05cc 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerEventHandler.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerEventHandler.java
@@ -89,7 +89,7 @@ public class TestTaskSchedulerEventHandler {
     public MockTaskSchedulerEventHandler(AppContext appContext,
         DAGClientServer clientService, EventHandler eventHandler,
         ContainerSignatureMatcher containerSignatureMatcher, WebUIService webUI) {
-      super(appContext, clientService, eventHandler, containerSignatureMatcher, webUI, new String[] {});
+      super(appContext, clientService, eventHandler, containerSignatureMatcher, webUI, new String[] {}, false);
     }
 
     @Override
@@ -162,7 +162,7 @@ public class TestTaskSchedulerEventHandler {
 
     AMSchedulerEventTALaunchRequest lr =
         new AMSchedulerEventTALaunchRequest(mockAttemptId, resource, null, mockTaskAttempt, locHint,
-            priority, containerContext);
+            priority, containerContext, 0, 0, 0);
     schedulerHandler.taskAllocated(mockTaskAttempt, lr, container);
     assertEquals(2, mockEventHandler.events.size());
     assertTrue(mockEventHandler.events.get(1) instanceof AMContainerEventAssignTA);
@@ -249,9 +249,14 @@ public class TestTaskSchedulerEventHandler {
     Configuration conf = new Configuration(false);
     schedulerHandler.init(conf);
     schedulerHandler.start();
-    
+
+    AMContainer mockAmContainer = mock(AMContainer.class);
+    when(mockAmContainer.getTaskSchedulerIdentifier()).thenReturn(0);
+    when(mockAmContainer.getContainerLauncherIdentifier()).thenReturn(0);
+    when(mockAmContainer.getTaskCommunicatorIdentifier()).thenReturn(0);
     ContainerId mockCId = mock(ContainerId.class);
     verify(mockTaskScheduler, times(0)).deallocateContainer((ContainerId)any());
+    when(mockAMContainerMap.get(mockCId)).thenReturn(mockAmContainer);
     schedulerHandler.preemptContainer(mockCId);
     verify(mockTaskScheduler, times(1)).deallocateContainer(mockCId);
     assertEquals(1, mockEventHandler.events.size());

http://git-wip-us.apache.org/repos/asf/tez/blob/25980c1a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerHelpers.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerHelpers.java b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerHelpers.java
index d775300..ffab769 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerHelpers.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerHelpers.java
@@ -127,7 +127,7 @@ class TestTaskSchedulerHelpers {
         EventHandler eventHandler,
         TezAMRMClientAsync<CookieContainerRequest> amrmClientAsync,
         ContainerSignatureMatcher containerSignatureMatcher) {
-      super(appContext, null, eventHandler, containerSignatureMatcher, null, new String[]{});
+      super(appContext, null, eventHandler, containerSignatureMatcher, null, new String[]{}, false);
       this.amrmClientAsync = amrmClientAsync;
       this.containerSignatureMatcher = containerSignatureMatcher;
     }

http://git-wip-us.apache.org/repos/asf/tez/blob/25980c1a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/container/TestAMContainer.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/container/TestAMContainer.java b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/container/TestAMContainer.java
index fafbba6..bdd0f61 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/container/TestAMContainer.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/container/TestAMContainer.java
@@ -63,6 +63,7 @@ import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.util.SystemClock;
 import org.apache.tez.common.security.JobTokenIdentifier;
 import org.apache.tez.common.security.TokenCache;
+import org.apache.tez.dag.api.TaskCommunicator;
 import org.apache.tez.dag.app.AppContext;
 import org.apache.tez.dag.app.ContainerHeartbeatHandler;
 import org.apache.tez.dag.app.ContainerContext;
@@ -104,7 +105,7 @@ public class TestAMContainer {
     wc.verifyState(AMContainerState.LAUNCHING);
     // 1 Launch request.
     wc.verifyCountAndGetOutgoingEvents(1);
-    verify(wc.tal).registerRunningContainer(wc.containerID);
+    verify(wc.tal).registerRunningContainer(wc.containerID, 0);
     assertNull(wc.amContainer.getCurrentTaskAttempt());
 
     // Assign task.
@@ -121,7 +122,7 @@ public class TestAMContainer {
     // Once for the previous NO_TASKS, one for the actual task.
     verify(wc.chh).register(wc.containerID);
     ArgumentCaptor<AMContainerTask> argumentCaptor = ArgumentCaptor.forClass(AMContainerTask.class);
-    verify(wc.tal, times(1)).registerTaskAttempt(argumentCaptor.capture(), eq(wc.containerID));
+    verify(wc.tal, times(1)).registerTaskAttempt(argumentCaptor.capture(), eq(wc.containerID), eq(0));
     assertEquals(1, argumentCaptor.getAllValues().size());
     assertEquals(wc.taskAttemptID, argumentCaptor.getAllValues().get(0).getTask().getTaskAttemptID());
     assertEquals(WrappedContainer.taskPriority, argumentCaptor.getAllValues().get(0).getPriority());
@@ -131,14 +132,14 @@ public class TestAMContainer {
     wc.verifyState(AMContainerState.IDLE);
     wc.verifyNoOutgoingEvents();
     assertNull(wc.amContainer.getCurrentTaskAttempt());
-    verify(wc.tal).unregisterTaskAttempt(wc.taskAttemptID);
+    verify(wc.tal).unregisterTaskAttempt(wc.taskAttemptID, 0);
 
     // Container completed
     wc.containerCompleted();
     wc.verifyHistoryStopEvent();
     wc.verifyState(AMContainerState.COMPLETED);
     wc.verifyNoOutgoingEvents();
-    verify(wc.tal).unregisterRunningContainer(wc.containerID);
+    verify(wc.tal).unregisterRunningContainer(wc.containerID, 0);
     verify(wc.chh).unregister(wc.containerID);
 
     assertEquals(1, wc.amContainer.getAllTaskAttempts().size());
@@ -157,7 +158,7 @@ public class TestAMContainer {
     wc.verifyState(AMContainerState.LAUNCHING);
     // 1 Launch request.
     wc.verifyCountAndGetOutgoingEvents(1);
-    verify(wc.tal).registerRunningContainer(wc.containerID);
+    verify(wc.tal).registerRunningContainer(wc.containerID, 0);
 
     // Container Launched
     wc.containerLaunched();
@@ -172,7 +173,7 @@ public class TestAMContainer {
     wc.verifyNoOutgoingEvents();
     assertEquals(wc.taskAttemptID, wc.amContainer.getCurrentTaskAttempt());
     ArgumentCaptor<AMContainerTask> argumentCaptor = ArgumentCaptor.forClass(AMContainerTask.class);
-    verify(wc.tal, times(1)).registerTaskAttempt(argumentCaptor.capture(), eq(wc.containerID));
+    verify(wc.tal, times(1)).registerTaskAttempt(argumentCaptor.capture(), eq(wc.containerID), eq(0));
     assertEquals(1, argumentCaptor.getAllValues().size());
     assertEquals(wc.taskAttemptID, argumentCaptor.getAllValues().get(0).getTask().getTaskAttemptID());
 
@@ -180,13 +181,13 @@ public class TestAMContainer {
     wc.verifyState(AMContainerState.IDLE);
     wc.verifyNoOutgoingEvents();
     assertNull(wc.amContainer.getCurrentTaskAttempt());
-    verify(wc.tal).unregisterTaskAttempt(wc.taskAttemptID);
+    verify(wc.tal).unregisterTaskAttempt(wc.taskAttemptID, 0);
 
     wc.containerCompleted();
     wc.verifyHistoryStopEvent();
     wc.verifyState(AMContainerState.COMPLETED);
     wc.verifyNoOutgoingEvents();
-    verify(wc.tal).unregisterRunningContainer(wc.containerID);
+    verify(wc.tal).unregisterRunningContainer(wc.containerID, 0);
     verify(wc.chh).unregister(wc.containerID);
 
     assertEquals(1, wc.amContainer.getAllTaskAttempts().size());
@@ -205,7 +206,7 @@ public class TestAMContainer {
     wc.verifyState(AMContainerState.LAUNCHING);
     // 1 Launch request.
     wc.verifyCountAndGetOutgoingEvents(1);
-    verify(wc.tal).registerRunningContainer(wc.containerID);
+    verify(wc.tal).registerRunningContainer(wc.containerID, 0);
     assertNull(wc.amContainer.getCurrentTaskAttempt());
 
     // Assign task.
@@ -222,7 +223,7 @@ public class TestAMContainer {
     // Once for the previous NO_TASKS, one for the actual task.
     verify(wc.chh).register(wc.containerID);
     ArgumentCaptor<AMContainerTask> argumentCaptor = ArgumentCaptor.forClass(AMContainerTask.class);
-    verify(wc.tal, times(1)).registerTaskAttempt(argumentCaptor.capture(), eq(wc.containerID));
+    verify(wc.tal, times(1)).registerTaskAttempt(argumentCaptor.capture(), eq(wc.containerID), eq(0));
     assertEquals(1, argumentCaptor.getAllValues().size());
     assertEquals(wc.taskAttemptID, argumentCaptor.getAllValues().get(0).getTask().getTaskAttemptID());
 
@@ -231,13 +232,13 @@ public class TestAMContainer {
     wc.verifyState(AMContainerState.IDLE);
     wc.verifyNoOutgoingEvents();
     assertNull(wc.amContainer.getCurrentTaskAttempt());
-    verify(wc.tal).unregisterTaskAttempt(wc.taskAttemptID);
+    verify(wc.tal).unregisterTaskAttempt(wc.taskAttemptID, 0);
 
     TezTaskAttemptID taId2 = TezTaskAttemptID.getInstance(wc.taskID, 2);
     wc.assignTaskAttempt(taId2);
     wc.verifyState(AMContainerState.RUNNING);
     argumentCaptor = ArgumentCaptor.forClass(AMContainerTask.class);
-    verify(wc.tal, times(2)).registerTaskAttempt(argumentCaptor.capture(), eq(wc.containerID));
+    verify(wc.tal, times(2)).registerTaskAttempt(argumentCaptor.capture(), eq(wc.containerID), eq(0));
     assertEquals(2, argumentCaptor.getAllValues().size());
     assertEquals(taId2, argumentCaptor.getAllValues().get(1).getTask().getTaskAttemptID());
 
@@ -246,14 +247,14 @@ public class TestAMContainer {
     wc.verifyState(AMContainerState.IDLE);
     wc.verifyNoOutgoingEvents();
     assertNull(wc.amContainer.getCurrentTaskAttempt());
-    verify(wc.tal).unregisterTaskAttempt(taId2);
+    verify(wc.tal).unregisterTaskAttempt(taId2, 0);
 
     // Container completed
     wc.containerCompleted();
     wc.verifyHistoryStopEvent();
     wc.verifyState(AMContainerState.COMPLETED);
     wc.verifyNoOutgoingEvents();
-    verify(wc.tal).unregisterRunningContainer(wc.containerID);
+    verify(wc.tal).unregisterRunningContainer(wc.containerID, 0);
     verify(wc.chh).unregister(wc.containerID);
 
     assertEquals(2, wc.amContainer.getAllTaskAttempts().size());
@@ -286,7 +287,7 @@ public class TestAMContainer {
     wc.verifyHistoryStopEvent();
     wc.verifyState(AMContainerState.COMPLETED);
     wc.verifyNoOutgoingEvents();
-    verify(wc.tal).unregisterRunningContainer(wc.containerID);
+    verify(wc.tal).unregisterRunningContainer(wc.containerID, 0);
     verify(wc.chh).unregister(wc.containerID);
 
     assertNull(wc.amContainer.getCurrentTaskAttempt());
@@ -323,7 +324,7 @@ public class TestAMContainer {
     wc.verifyHistoryStopEvent();
     wc.verifyState(AMContainerState.COMPLETED);
     wc.verifyNoOutgoingEvents();
-    verify(wc.tal).unregisterRunningContainer(wc.containerID);
+    verify(wc.tal).unregisterRunningContainer(wc.containerID, 0);
     verify(wc.chh).unregister(wc.containerID);
 
     assertNull(wc.amContainer.getCurrentTaskAttempt());
@@ -346,7 +347,7 @@ public class TestAMContainer {
     wc.assignTaskAttempt(taID2);
 
     wc.verifyState(AMContainerState.STOP_REQUESTED);
-    verify(wc.tal).unregisterRunningContainer(wc.containerID);
+    verify(wc.tal).unregisterRunningContainer(wc.containerID, 0);
     verify(wc.chh).unregister(wc.containerID);
     // 1 for NM stop request. 2 TERMINATING to TaskAttempt.
     outgoingEvents = wc.verifyCountAndGetOutgoingEvents(3);
@@ -378,13 +379,13 @@ public class TestAMContainer {
     wc.launchContainer();
     wc.assignTaskAttempt(wc.taskAttemptID);
     wc.verifyState(AMContainerState.LAUNCHING);
-    verify(wc.tal).registerRunningContainer(wc.containerID);
+    verify(wc.tal).registerRunningContainer(wc.containerID, 0);
 
     TezTaskAttemptID taID2 = TezTaskAttemptID.getInstance(wc.taskID, 2);
     wc.assignTaskAttempt(taID2);
 
     wc.verifyState(AMContainerState.STOP_REQUESTED);
-    verify(wc.tal).unregisterRunningContainer(wc.containerID);
+    verify(wc.tal).unregisterRunningContainer(wc.containerID, 0);
     verify(wc.chh).unregister(wc.containerID);
     // 1 for NM stop request. 2 TERMINATING to TaskAttempt.
     outgoingEvents = wc.verifyCountAndGetOutgoingEvents(3);
@@ -420,7 +421,7 @@ public class TestAMContainer {
 
     wc.containerTimedOut();
     wc.verifyState(AMContainerState.STOP_REQUESTED);
-    verify(wc.tal).unregisterRunningContainer(wc.containerID);
+    verify(wc.tal).unregisterRunningContainer(wc.containerID, 0);
     verify(wc.chh).unregister(wc.containerID);
     // 1 to TA, 1 for RM de-allocate.
     outgoingEvents = wc.verifyCountAndGetOutgoingEvents(2);
@@ -454,7 +455,7 @@ public class TestAMContainer {
 
     wc.stopRequest();
     wc.verifyState(AMContainerState.STOP_REQUESTED);
-    verify(wc.tal).unregisterRunningContainer(wc.containerID);
+    verify(wc.tal).unregisterRunningContainer(wc.containerID, 0);
     verify(wc.chh).unregister(wc.containerID);
     // 1 to TA, 1 for RM de-allocate.
     outgoingEvents = wc.verifyCountAndGetOutgoingEvents(2);
@@ -484,11 +485,11 @@ public class TestAMContainer {
     wc.launchContainer();
     wc.assignTaskAttempt(wc.taskAttemptID);
     wc.verifyState(AMContainerState.LAUNCHING);
-    verify(wc.tal).registerRunningContainer(wc.containerID);
+    verify(wc.tal).registerRunningContainer(wc.containerID, 0);
     wc.launchFailed();
     wc.verifyState(AMContainerState.STOPPING);
-    verify(wc.tal).registerRunningContainer(wc.containerID);
-    verify(wc.tal).unregisterRunningContainer(wc.containerID);
+    verify(wc.tal).registerRunningContainer(wc.containerID, 0);
+    verify(wc.tal).unregisterRunningContainer(wc.containerID, 0);
 
     outgoingEvents = wc.verifyCountAndGetOutgoingEvents(2);
     verifyUnOrderedOutgoingEventTypes(outgoingEvents,
@@ -537,8 +538,8 @@ public class TestAMContainer {
 
     wc.containerCompleted();
     wc.verifyState(AMContainerState.COMPLETED);
-    verify(wc.tal).registerRunningContainer(wc.containerID);
-    verify(wc.tal).unregisterRunningContainer(wc.containerID);
+    verify(wc.tal).registerRunningContainer(wc.containerID, 0);
+    verify(wc.tal).unregisterRunningContainer(wc.containerID, 0);
 
     outgoingEvents = wc.verifyCountAndGetOutgoingEvents(1);
     verifyUnOrderedOutgoingEventTypes(outgoingEvents,
@@ -567,8 +568,8 @@ public class TestAMContainer {
 
     wc.containerCompleted(ContainerExitStatus.DISKS_FAILED, TaskAttemptTerminationCause.NODE_DISK_ERROR);
     wc.verifyState(AMContainerState.COMPLETED);
-    verify(wc.tal).registerRunningContainer(wc.containerID);
-    verify(wc.tal).unregisterRunningContainer(wc.containerID);
+    verify(wc.tal).registerRunningContainer(wc.containerID, 0);
+    verify(wc.tal).unregisterRunningContainer(wc.containerID, 0);
 
     outgoingEvents = wc.verifyCountAndGetOutgoingEvents(1);
     verifyUnOrderedOutgoingEventTypes(outgoingEvents,
@@ -598,8 +599,8 @@ public class TestAMContainer {
 
     wc.containerCompleted(ContainerExitStatus.ABORTED, TaskAttemptTerminationCause.NODE_FAILED);
     wc.verifyState(AMContainerState.COMPLETED);
-    verify(wc.tal).registerRunningContainer(wc.containerID);
-    verify(wc.tal).unregisterRunningContainer(wc.containerID);
+    verify(wc.tal).registerRunningContainer(wc.containerID, 0);
+    verify(wc.tal).unregisterRunningContainer(wc.containerID, 0);
 
     outgoingEvents = wc.verifyCountAndGetOutgoingEvents(1);
     verifyUnOrderedOutgoingEventTypes(outgoingEvents,
@@ -629,8 +630,8 @@ public class TestAMContainer {
 
     wc.containerCompleted();
     wc.verifyState(AMContainerState.COMPLETED);
-    verify(wc.tal).registerRunningContainer(wc.containerID);
-    verify(wc.tal).unregisterRunningContainer(wc.containerID);
+    verify(wc.tal).registerRunningContainer(wc.containerID, 0);
+    verify(wc.tal).unregisterRunningContainer(wc.containerID, 0);
     verify(wc.chh).register(wc.containerID);
     verify(wc.chh).unregister(wc.containerID);
 
@@ -658,8 +659,8 @@ public class TestAMContainer {
 
     wc.containerCompleted();
     wc.verifyState(AMContainerState.COMPLETED);
-    verify(wc.tal).registerRunningContainer(wc.containerID);
-    verify(wc.tal).unregisterRunningContainer(wc.containerID);
+    verify(wc.tal).registerRunningContainer(wc.containerID, 0);
+    verify(wc.tal).unregisterRunningContainer(wc.containerID, 0);
     verify(wc.chh).register(wc.containerID);
     verify(wc.chh).unregister(wc.containerID);
 
@@ -693,8 +694,8 @@ public class TestAMContainer {
 
     wc.containerCompleted(ContainerExitStatus.PREEMPTED, TaskAttemptTerminationCause.EXTERNAL_PREEMPTION);
     wc.verifyState(AMContainerState.COMPLETED);
-    verify(wc.tal).registerRunningContainer(wc.containerID);
-    verify(wc.tal).unregisterRunningContainer(wc.containerID);
+    verify(wc.tal).registerRunningContainer(wc.containerID, 0);
+    verify(wc.tal).unregisterRunningContainer(wc.containerID, 0);
     verify(wc.chh).register(wc.containerID);
     verify(wc.chh).unregister(wc.containerID);
 
@@ -730,8 +731,8 @@ public class TestAMContainer {
 
     wc.containerCompleted(ContainerExitStatus.INVALID, TaskAttemptTerminationCause.INTERNAL_PREEMPTION);
     wc.verifyState(AMContainerState.COMPLETED);
-    verify(wc.tal).registerRunningContainer(wc.containerID);
-    verify(wc.tal).unregisterRunningContainer(wc.containerID);
+    verify(wc.tal).registerRunningContainer(wc.containerID, 0);
+    verify(wc.tal).unregisterRunningContainer(wc.containerID, 0);
     verify(wc.chh).register(wc.containerID);
     verify(wc.chh).unregister(wc.containerID);
 
@@ -767,8 +768,8 @@ public class TestAMContainer {
 
     wc.containerCompleted(ContainerExitStatus.DISKS_FAILED, TaskAttemptTerminationCause.NODE_DISK_ERROR);
     wc.verifyState(AMContainerState.COMPLETED);
-    verify(wc.tal).registerRunningContainer(wc.containerID);
-    verify(wc.tal).unregisterRunningContainer(wc.containerID);
+    verify(wc.tal).registerRunningContainer(wc.containerID, 0);
+    verify(wc.tal).unregisterRunningContainer(wc.containerID, 0);
     verify(wc.chh).register(wc.containerID);
     verify(wc.chh).unregister(wc.containerID);
 
@@ -1011,7 +1012,7 @@ public class TestAMContainer {
     wc.containerLaunched();
     wc.assignTaskAttempt(wc.taskAttemptID);
     ArgumentCaptor<AMContainerTask> argumentCaptor = ArgumentCaptor.forClass(AMContainerTask.class);
-    verify(wc.tal, times(1)).registerTaskAttempt(argumentCaptor.capture(), eq(wc.containerID));
+    verify(wc.tal, times(1)).registerTaskAttempt(argumentCaptor.capture(), eq(wc.containerID), eq(0));
     AMContainerTask task1 = argumentCaptor.getAllValues().get(0);
     assertEquals(0, task1.getAdditionalResources().size());
     wc.taskAttemptSucceeded(wc.taskAttemptID);
@@ -1024,7 +1025,7 @@ public class TestAMContainer {
     TezTaskAttemptID taID2 = TezTaskAttemptID.getInstance(wc.taskID, 2);
     wc.assignTaskAttempt(taID2, additionalResources, new Credentials());
     argumentCaptor = ArgumentCaptor.forClass(AMContainerTask.class);
-    verify(wc.tal, times(2)).registerTaskAttempt(argumentCaptor.capture(), eq(wc.containerID));
+    verify(wc.tal, times(2)).registerTaskAttempt(argumentCaptor.capture(), eq(wc.containerID), eq(0));
     AMContainerTask task2 = argumentCaptor.getAllValues().get(1);
     Map<String, LocalResource> pullTaskAdditionalResources = task2.getAdditionalResources();
     assertEquals(2, pullTaskAdditionalResources.size());
@@ -1047,7 +1048,7 @@ public class TestAMContainer {
     TezTaskAttemptID taID3 = TezTaskAttemptID.getInstance(wc.taskID, 3);
     wc.assignTaskAttempt(taID3, new HashMap<String, LocalResource>(), new Credentials());
     argumentCaptor = ArgumentCaptor.forClass(AMContainerTask.class);
-    verify(wc.tal, times(3)).registerTaskAttempt(argumentCaptor.capture(), eq(wc.containerID));
+    verify(wc.tal, times(3)).registerTaskAttempt(argumentCaptor.capture(), eq(wc.containerID), eq(0));
     AMContainerTask task3 = argumentCaptor.getAllValues().get(2);
     assertEquals(0, task3.getAdditionalResources().size());
     wc.taskAttemptSucceeded(taID3);
@@ -1100,7 +1101,7 @@ public class TestAMContainer {
     wc.containerLaunched();
     wc.assignTaskAttempt(attempt11, LRs, dag1Credentials);
     argumentCaptor = ArgumentCaptor.forClass(AMContainerTask.class);
-    verify(wc.tal, times(1)).registerTaskAttempt(argumentCaptor.capture(), eq(wc.containerID));
+    verify(wc.tal, times(1)).registerTaskAttempt(argumentCaptor.capture(), eq(wc.containerID), eq(0));
     fetchedTask = argumentCaptor.getAllValues().get(0);
     assertTrue(fetchedTask.haveCredentialsChanged());
     assertNotNull(fetchedTask.getCredentials());
@@ -1109,7 +1110,7 @@ public class TestAMContainer {
 
     wc.assignTaskAttempt(attempt12, LRs, dag1Credentials);
     argumentCaptor = ArgumentCaptor.forClass(AMContainerTask.class);
-    verify(wc.tal, times(2)).registerTaskAttempt(argumentCaptor.capture(), eq(wc.containerID));
+    verify(wc.tal, times(2)).registerTaskAttempt(argumentCaptor.capture(), eq(wc.containerID), eq(0));
     fetchedTask = argumentCaptor.getAllValues().get(1);
     assertFalse(fetchedTask.haveCredentialsChanged());
     assertNull(fetchedTask.getCredentials());
@@ -1119,7 +1120,7 @@ public class TestAMContainer {
     wc.setNewDAGID(dagID2);
     wc.assignTaskAttempt(attempt21, LRs, null);
     argumentCaptor = ArgumentCaptor.forClass(AMContainerTask.class);
-    verify(wc.tal, times(3)).registerTaskAttempt(argumentCaptor.capture(), eq(wc.containerID));
+    verify(wc.tal, times(3)).registerTaskAttempt(argumentCaptor.capture(), eq(wc.containerID), eq(0));
     fetchedTask = argumentCaptor.getAllValues().get(2);
     assertTrue(fetchedTask.haveCredentialsChanged());
     assertNull(fetchedTask.getCredentials());
@@ -1127,7 +1128,7 @@ public class TestAMContainer {
 
     wc.assignTaskAttempt(attempt22, LRs, null);
     argumentCaptor = ArgumentCaptor.forClass(AMContainerTask.class);
-    verify(wc.tal, times(4)).registerTaskAttempt(argumentCaptor.capture(), eq(wc.containerID));
+    verify(wc.tal, times(4)).registerTaskAttempt(argumentCaptor.capture(), eq(wc.containerID), eq(0));
     fetchedTask = argumentCaptor.getAllValues().get(3);
     assertFalse(fetchedTask.haveCredentialsChanged());
     assertNull(fetchedTask.getCredentials());
@@ -1137,7 +1138,7 @@ public class TestAMContainer {
     wc.setNewDAGID(dagID3);
     wc.assignTaskAttempt(attempt31, LRs , dag3Credentials);
     argumentCaptor = ArgumentCaptor.forClass(AMContainerTask.class);
-    verify(wc.tal, times(5)).registerTaskAttempt(argumentCaptor.capture(), eq(wc.containerID));
+    verify(wc.tal, times(5)).registerTaskAttempt(argumentCaptor.capture(), eq(wc.containerID), eq(0));
     fetchedTask = argumentCaptor.getAllValues().get(4);
     assertTrue(fetchedTask.haveCredentialsChanged());
     assertNotNull(fetchedTask.getCredentials());
@@ -1147,7 +1148,7 @@ public class TestAMContainer {
 
     wc.assignTaskAttempt(attempt32, LRs, dag1Credentials);
     argumentCaptor = ArgumentCaptor.forClass(AMContainerTask.class);
-    verify(wc.tal, times(6)).registerTaskAttempt(argumentCaptor.capture(), eq(wc.containerID));
+    verify(wc.tal, times(6)).registerTaskAttempt(argumentCaptor.capture(), eq(wc.containerID), eq(0));
     fetchedTask = argumentCaptor.getAllValues().get(5);
     assertFalse(fetchedTask.haveCredentialsChanged());
     assertNull(fetchedTask.getCredentials());
@@ -1200,9 +1201,10 @@ public class TestAMContainer {
 
       chh = mock(ContainerHeartbeatHandler.class);
 
-      InetSocketAddress addr = new InetSocketAddress("localhost", 0);
       tal = mock(TaskAttemptListener.class);
-      doReturn(addr).when(tal).getAddress();
+      TaskCommunicator taskComm = mock(TaskCommunicator.class);
+      doReturn(new InetSocketAddress("localhost", 0)).when(taskComm).getAddress();
+      doReturn(taskComm).when(tal).getTaskCommunicator(0);
 
       dagID = TezDAGID.getInstance(applicationID, 1);
       vertexID = TezVertexID.getInstance(dagID, 1);
@@ -1228,7 +1230,7 @@ public class TestAMContainer {
       doReturn(taskAttemptID).when(taskSpec).getTaskAttemptID();
 
       amContainer = new AMContainerImpl(container, chh, tal,
-          new ContainerContextMatcher(), appContext);
+          new ContainerContextMatcher(), appContext, 0, 0, 0);
     }
 
     public WrappedContainer() {
@@ -1278,7 +1280,7 @@ public class TestAMContainer {
       Token<JobTokenIdentifier> jobToken = mock(Token.class);
       TokenCache.setSessionToken(jobToken, credentials);
       amContainer.handle(new AMContainerEventLaunchRequest(containerID, vertexID,
-          new ContainerContext(localResources, credentials, new HashMap<String, String>(), "")));
+          new ContainerContext(localResources, credentials, new HashMap<String, String>(), ""), 0, 0));
     }
 
     public void assignTaskAttempt(TezTaskAttemptID taID) {

http://git-wip-us.apache.org/repos/asf/tez/blob/25980c1a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/container/TestAMContainerMap.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/container/TestAMContainerMap.java b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/container/TestAMContainerMap.java
index 61371e8..dee4541 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/container/TestAMContainerMap.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/container/TestAMContainerMap.java
@@ -31,6 +31,7 @@ import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.Token;
+import org.apache.tez.dag.api.TaskCommunicator;
 import org.apache.tez.dag.app.AppContext;
 import org.apache.tez.dag.app.ContainerHeartbeatHandler;
 import org.apache.tez.dag.app.TaskAttemptListener;
@@ -43,8 +44,9 @@ public class TestAMContainerMap {
 
   private TaskAttemptListener mockTaskAttemptListener() {
     TaskAttemptListener tal = mock(TaskAttemptListener.class);
-    InetSocketAddress socketAddr = new InetSocketAddress("localhost", 21000);
-    doReturn(socketAddr).when(tal).getAddress();
+    TaskCommunicator taskComm = mock(TaskCommunicator.class);
+    doReturn(new InetSocketAddress("localhost", 21000)).when(taskComm).getAddress();
+    doReturn(taskComm).when(tal).getTaskCommunicator(0);
     return tal;
   }
 

http://git-wip-us.apache.org/repos/asf/tez/blob/25980c1a/tez-examples/src/main/java/org/apache/tez/examples/JoinValidate.java
----------------------------------------------------------------------
diff --git a/tez-examples/src/main/java/org/apache/tez/examples/JoinValidate.java b/tez-examples/src/main/java/org/apache/tez/examples/JoinValidate.java
index d7fc5ac..52643c5 100644
--- a/tez-examples/src/main/java/org/apache/tez/examples/JoinValidate.java
+++ b/tez-examples/src/main/java/org/apache/tez/examples/JoinValidate.java
@@ -19,6 +19,7 @@
 package org.apache.tez.examples;
 
 import java.io.IOException;
+import java.util.Map;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -130,7 +131,7 @@ public class JoinValidate extends TezExampleBase {
 
   private DAG createDag(TezConfiguration tezConf, Path lhs, Path rhs, int numPartitions)
       throws IOException {
-    DAG dag = DAG.create("JoinValidate");
+    DAG dag = DAG.create(getDagName());
 
     // Configuration for intermediate output - shared by Vertex1 and Vertex2
     // This should only be setting selective keys from the underlying conf. Fix after there's a
@@ -147,15 +148,18 @@ public class JoinValidate extends TezExampleBase {
         MRInput
             .createConfigBuilder(new Configuration(tezConf), TextInputFormat.class,
                 lhs.toUri().toString()).groupSplits(!isDisableSplitGrouping()).build());
+    setVertexProperties(lhsVertex, getLhsVertexProperties());
 
     Vertex rhsVertex = Vertex.create(RHS_INPUT_NAME, ProcessorDescriptor.create(
         ForwardingProcessor.class.getName())).addDataSource("rhs",
         MRInput
             .createConfigBuilder(new Configuration(tezConf), TextInputFormat.class,
                 rhs.toUri().toString()).groupSplits(!isDisableSplitGrouping()).build());
+    setVertexProperties(rhsVertex, getRhsVertexProperties());
 
     Vertex joinValidateVertex = Vertex.create("joinvalidate", ProcessorDescriptor.create(
         JoinValidateProcessor.class.getName()), numPartitions);
+    setVertexProperties(joinValidateVertex, getValidateVertexProperties());
 
     Edge e1 = Edge.create(lhsVertex, joinValidateVertex, edgeConf.createDefaultEdgeProperty());
     Edge e2 = Edge.create(rhsVertex, joinValidateVertex, edgeConf.createDefaultEdgeProperty());
@@ -165,6 +169,30 @@ public class JoinValidate extends TezExampleBase {
     return dag;
   }
 
+  private void setVertexProperties(Vertex vertex, Map<String, String> properties) {
+    if (properties != null) {
+      for (Map.Entry<String, String> entry : properties.entrySet()) {
+        vertex.setConf(entry.getKey(), entry.getValue());
+      }
+    }
+  }
+
+  protected Map<String, String> getLhsVertexProperties() {
+    return null;
+  }
+
+  protected Map<String, String> getRhsVertexProperties() {
+    return null;
+  }
+
+  protected Map<String, String> getValidateVertexProperties() {
+    return null;
+  }
+
+  protected String getDagName() {
+    return "JoinValidate";
+  }
+
   public static class JoinValidateProcessor extends SimpleProcessor {
 
     private static final Logger LOG = LoggerFactory.getLogger(JoinValidateProcessor.class);

http://git-wip-us.apache.org/repos/asf/tez/blob/25980c1a/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/launcher/TezTestServiceContainerLauncher.java
----------------------------------------------------------------------
diff --git a/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/launcher/TezTestServiceContainerLauncher.java b/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/launcher/TezTestServiceContainerLauncher.java
index e83165b..27356bc 100644
--- a/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/launcher/TezTestServiceContainerLauncher.java
+++ b/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/launcher/TezTestServiceContainerLauncher.java
@@ -14,6 +14,8 @@
 
 package org.apache.tez.dag.app.launcher;
 
+import java.net.InetSocketAddress;
+
 import com.google.common.base.Preconditions;
 import com.google.protobuf.ByteString;
 import org.apache.commons.logging.Log;
@@ -124,7 +126,8 @@ public class TezTestServiceContainerLauncher extends AbstractService implements
 
   private RunContainerRequestProto constructRunContainerRequest(NMCommunicatorLaunchRequestEvent event) {
     RunContainerRequestProto.Builder builder = RunContainerRequestProto.newBuilder();
-    builder.setAmHost(tal.getAddress().getHostName()).setAmPort(tal.getAddress().getPort());
+    InetSocketAddress address = tal.getTaskCommunicator(event.getTaskCommId()).getAddress();
+    builder.setAmHost(address.getHostName()).setAmPort(address.getPort());
     builder.setAppAttemptNumber(event.getContainer().getId().getApplicationAttemptId().getAttemptId());
     builder.setApplicationIdString(
         event.getContainer().getId().getApplicationAttemptId().getApplicationId().toString());

http://git-wip-us.apache.org/repos/asf/tez/blob/25980c1a/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/rm/TezTestServiceTaskSchedulerService.java
----------------------------------------------------------------------
diff --git a/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/rm/TezTestServiceTaskSchedulerService.java b/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/rm/TezTestServiceTaskSchedulerService.java
index e3c18bf..5657f86 100644
--- a/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/rm/TezTestServiceTaskSchedulerService.java
+++ b/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/rm/TezTestServiceTaskSchedulerService.java
@@ -14,7 +14,6 @@
 
 package org.apache.tez.dag.app.rm;
 
-import java.io.IOException;
 import java.util.Arrays;
 import java.util.LinkedList;
 import java.util.List;
@@ -32,25 +31,17 @@ import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.ContainerId;
-import org.apache.hadoop.yarn.api.records.ContainerStatus;
 import org.apache.hadoop.yarn.api.records.NodeId;
-import org.apache.hadoop.yarn.api.records.NodeReport;
 import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.Resource;
-import org.apache.hadoop.yarn.client.api.AMRMClient;
-import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync;
-import org.apache.hadoop.yarn.exceptions.YarnException;
-import org.apache.tez.dag.api.TezUncheckedException;
 import org.apache.tez.dag.app.AppContext;
 import org.apache.tez.service.TezTestServiceConfConstants;
 
 
-// TODO Registration with RM - so that the AM is considered dead and restarted in the expiry interval - 10 minutes.
-
 public class TezTestServiceTaskSchedulerService extends TaskSchedulerService {
 
   private static final Log LOG = LogFactory.getLog(TezTestServiceTaskSchedulerService.class);
@@ -71,7 +62,7 @@ public class TezTestServiceTaskSchedulerService extends TaskSchedulerService {
   private final ConcurrentMap<Object, ContainerId> runningTasks =
       new ConcurrentHashMap<Object, ContainerId>();
 
-  private final AMRMClientAsync<AMRMClient.ContainerRequest> amRmClient;
+  // AppIdIdentifier to avoid conflicts with other containres in the system.
 
   // Per instance
   private final int memoryPerInstance;
@@ -82,10 +73,13 @@ public class TezTestServiceTaskSchedulerService extends TaskSchedulerService {
   private final Resource resourcePerContainer;
 
 
+  // Not registering with the RM. Assuming the main TezScheduler will always run (except local mode),
+  // and take care of YARN registration.
   public TezTestServiceTaskSchedulerService(TaskSchedulerAppCallback appClient,
                                             AppContext appContext,
                                             String clientHostname, int clientPort,
                                             String trackingUrl,
+                                            long customAppIdIdentifier,
                                             Configuration conf) {
     // Accepting configuration here to allow setting up fields as final
     super(TezTestServiceTaskSchedulerService.class.getName());
@@ -93,7 +87,7 @@ public class TezTestServiceTaskSchedulerService extends TaskSchedulerService {
     this.appClientDelegate = createAppCallbackDelegate(appClient);
     this.appContext = appContext;
     this.serviceHosts = new LinkedList<String>();
-    this.containerFactory = new ContainerFactory(appContext);
+    this.containerFactory = new ContainerFactory(appContext, customAppIdIdentifier);
 
     this.memoryPerInstance = conf
         .getInt(TezTestServiceConfConstants.TEZ_TEST_SERVICE_MEMORY_PER_INSTANCE_MB, -1);
@@ -123,7 +117,6 @@ public class TezTestServiceTaskSchedulerService extends TaskSchedulerService {
     int memoryPerContainer = (int) (memoryPerInstance / (float) executorsPerInstance);
     int coresPerContainer = (int) (coresPerInstance / (float) executorsPerInstance);
     this.resourcePerContainer = Resource.newInstance(memoryPerContainer, coresPerContainer);
-    this.amRmClient = TezAMRMClientAsync.createAMRMClientAsync(5000, new FakeAmRmCallbackHandler());
 
     String[] hosts = conf.getTrimmedStrings(TezTestServiceConfConstants.TEZ_TEST_SERVICE_HOSTS);
     if (hosts == null || hosts.length == 0) {
@@ -143,36 +136,8 @@ public class TezTestServiceTaskSchedulerService extends TaskSchedulerService {
   }
 
   @Override
-  public void serviceInit(Configuration conf) {
-    amRmClient.init(conf);
-  }
-
-  @Override
-  public void serviceStart() {
-    amRmClient.start();
-    RegisterApplicationMasterResponse response;
-    try {
-      amRmClient.registerApplicationMaster(clientHostname, clientPort, trackingUrl);
-    } catch (YarnException e) {
-      throw new TezUncheckedException(e);
-    } catch (IOException e) {
-      throw new TezUncheckedException(e);
-    }
-  }
-
-  @Override
   public void serviceStop() {
     if (!this.isStopped.getAndSet(true)) {
-
-      try {
-        TaskSchedulerAppCallback.AppFinalStatus status = appClientDelegate.getFinalAppStatus();
-        amRmClient.unregisterApplicationMaster(status.exitStatus, status.exitMessage,
-            status.postCompletionTrackingUrl);
-      } catch (YarnException e) {
-        throw new TezUncheckedException(e);
-      } catch (IOException e) {
-        throw new TezUncheckedException(e);
-      }
       appCallbackExecutor.shutdownNow();
     }
   }
@@ -264,7 +229,7 @@ public class TezTestServiceTaskSchedulerService extends TaskSchedulerService {
 
   private ExecutorService createAppCallbackExecutorService() {
     return Executors.newSingleThreadExecutor(new ThreadFactoryBuilder()
-        .setNameFormat("TaskSchedulerAppCaller #%d").setDaemon(true).build());
+        .setNameFormat("TezTestTaskSchedulerAppCaller").setDaemon(true).build());
   }
 
   private TaskSchedulerAppCallback createAppCallbackDelegate(
@@ -274,7 +239,7 @@ public class TezTestServiceTaskSchedulerService extends TaskSchedulerService {
   }
 
   private String selectHost(String[] requestedHosts) {
-    String host = null;
+    String host;
     if (requestedHosts != null && requestedHosts.length > 0) {
       Arrays.sort(requestedHosts);
       host = requestedHosts[0];
@@ -287,17 +252,19 @@ public class TezTestServiceTaskSchedulerService extends TaskSchedulerService {
   }
 
   static class ContainerFactory {
-    final AppContext appContext;
     AtomicInteger nextId;
-
-    public ContainerFactory(AppContext appContext) {
-      this.appContext = appContext;
-      this.nextId = new AtomicInteger(2);
+    final ApplicationAttemptId customAppAttemptId;
+
+    public ContainerFactory(AppContext appContext, long appIdLong) {
+      this.nextId = new AtomicInteger(1);
+      ApplicationId appId = ApplicationId
+          .newInstance(appIdLong, appContext.getApplicationAttemptId().getApplicationId().getId());
+      this.customAppAttemptId = ApplicationAttemptId
+          .newInstance(appId, appContext.getApplicationAttemptId().getAttemptId());
     }
 
     public Container createContainer(Resource capability, Priority priority, String hostname, int port) {
-      ApplicationAttemptId appAttemptId = appContext.getApplicationAttemptId();
-      ContainerId containerId = ContainerId.newInstance(appAttemptId, nextId.getAndIncrement());
+      ContainerId containerId = ContainerId.newInstance(customAppAttemptId, nextId.getAndIncrement());
       NodeId nodeId = NodeId.newInstance(hostname, port);
       String nodeHttpAddress = "hostname:0";
 
@@ -311,37 +278,4 @@ public class TezTestServiceTaskSchedulerService extends TaskSchedulerService {
       return container;
     }
   }
-
-  private static class FakeAmRmCallbackHandler implements AMRMClientAsync.CallbackHandler {
-
-    @Override
-    public void onContainersCompleted(List<ContainerStatus> statuses) {
-
-    }
-
-    @Override
-    public void onContainersAllocated(List<Container> containers) {
-
-    }
-
-    @Override
-    public void onShutdownRequest() {
-
-    }
-
-    @Override
-    public void onNodesUpdated(List<NodeReport> updatedNodes) {
-
-    }
-
-    @Override
-    public float getProgress() {
-      return 0;
-    }
-
-    @Override
-    public void onError(Throwable e) {
-
-    }
-  }
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/25980c1a/tez-ext-service-tests/src/test/java/org/apache/tez/examples/JoinValidateConfigured.java
----------------------------------------------------------------------
diff --git a/tez-ext-service-tests/src/test/java/org/apache/tez/examples/JoinValidateConfigured.java b/tez-ext-service-tests/src/test/java/org/apache/tez/examples/JoinValidateConfigured.java
new file mode 100644
index 0000000..e5d2e3b
--- /dev/null
+++ b/tez-ext-service-tests/src/test/java/org/apache/tez/examples/JoinValidateConfigured.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.examples;
+
+import java.util.Map;
+
+public class JoinValidateConfigured extends JoinValidate {
+
+  private final Map<String, String> lhsProps;
+  private final Map<String, String> rhsProps;
+  private final Map<String, String> validateProps;
+  private final String dagNameSuffix;
+
+  public JoinValidateConfigured(Map<String, String> lhsProps, Map<String, String> rhsProps,
+                                Map<String, String> validateProps, String dagNameSuffix) {
+    this.lhsProps = lhsProps;
+    this.rhsProps = rhsProps;
+    this.validateProps = validateProps;
+    this.dagNameSuffix = dagNameSuffix;
+  }
+
+  @Override
+  protected Map<String, String> getLhsVertexProperties() {
+    return this.lhsProps;
+  }
+
+  @Override
+  protected Map<String, String> getRhsVertexProperties() {
+    return this.rhsProps;
+  }
+
+  @Override
+  protected Map<String, String> getValidateVertexProperties() {
+    return this.validateProps;
+  }
+
+  @Override
+  protected String getDagName() {
+    return "JoinValidate_" + dagNameSuffix;
+  }
+}

http://git-wip-us.apache.org/repos/asf/tez/blob/25980c1a/tez-ext-service-tests/src/test/java/org/apache/tez/tests/TestExternalTezServices.java
----------------------------------------------------------------------
diff --git a/tez-ext-service-tests/src/test/java/org/apache/tez/tests/TestExternalTezServices.java b/tez-ext-service-tests/src/test/java/org/apache/tez/tests/TestExternalTezServices.java
index ae7e7f8..9c149c6 100644
--- a/tez-ext-service-tests/src/test/java/org/apache/tez/tests/TestExternalTezServices.java
+++ b/tez-ext-service-tests/src/test/java/org/apache/tez/tests/TestExternalTezServices.java
@@ -15,11 +15,11 @@
 package org.apache.tez.tests;
 
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
 
 import java.io.IOException;
 import java.util.Map;
 
+import com.google.common.collect.Maps;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -28,13 +28,14 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.tez.client.TezClient;
 import org.apache.tez.dag.api.TezConfiguration;
+import org.apache.tez.dag.api.TezConstants;
 import org.apache.tez.dag.api.TezException;
 import org.apache.tez.dag.app.launcher.TezTestServiceNoOpContainerLauncher;
 import org.apache.tez.dag.app.rm.TezTestServiceTaskSchedulerService;
 import org.apache.tez.dag.app.taskcomm.TezTestServiceTaskCommunicatorImpl;
 import org.apache.tez.examples.HashJoinExample;
 import org.apache.tez.examples.JoinDataGen;
-import org.apache.tez.examples.JoinValidate;
+import org.apache.tez.examples.JoinValidateConfigured;
 import org.apache.tez.service.MiniTezTestServiceCluster;
 import org.apache.tez.test.MiniTezCluster;
 import org.junit.AfterClass;
@@ -47,23 +48,31 @@ public class TestExternalTezServices {
 
   private static final String EXT_PUSH_ENTITY_NAME = "ExtServiceTestPush";
 
-  private static MiniTezCluster tezCluster;
-  private static MiniDFSCluster dfsCluster;
-  private static MiniTezTestServiceCluster tezTestServiceCluster;
+  private static volatile MiniTezCluster tezCluster;
+  private static volatile MiniDFSCluster dfsCluster;
+  private static volatile MiniTezTestServiceCluster tezTestServiceCluster;
 
-  private static Configuration clusterConf = new Configuration();
-  private static Configuration confForJobs;
+  private static volatile Configuration clusterConf = new Configuration();
+  private static volatile Configuration confForJobs;
 
-  private static FileSystem remoteFs;
-  private static FileSystem localFs;
+  private static volatile FileSystem remoteFs;
+  private static volatile FileSystem localFs;
 
-  private static TezClient sharedTezClient;
+  private static volatile TezClient sharedTezClient;
+
+  private static final Path SRC_DATA_DIR = new Path("/tmp/" + TestExternalTezServices.class.getSimpleName());
+  private static final Path HASH_JOIN_EXPECTED_RESULT_PATH = new Path(SRC_DATA_DIR, "expectedOutputPath");
+  private static final Path HASH_JOIN_OUTPUT_PATH = new Path(SRC_DATA_DIR, "outPath");
+
+  private static final Map<String, String> PROPS_EXT_SERVICE_PUSH = Maps.newHashMap();
+  private static final Map<String, String> PROPS_REGULAR_CONTAINERS = Maps.newHashMap();
+  private static final Map<String, String> PROPS_IN_AM = Maps.newHashMap();
 
   private static String TEST_ROOT_DIR = "target" + Path.SEPARATOR + TestExternalTezServices.class.getName()
       + "-tmpDir";
 
   @BeforeClass
-  public static void setup() throws IOException, TezException, InterruptedException {
+  public static void setup() throws Exception {
 
     localFs = FileSystem.getLocal(clusterConf);
 
@@ -108,27 +117,79 @@ public class TestExternalTezServices {
     remoteFs.mkdirs(stagingDirPath);
     // This is currently configured to push tasks into the Service, and then use the standard RPC
     confForJobs.set(TezConfiguration.TEZ_AM_STAGING_DIR, stagingDirPath.toString());
-    confForJobs.set(TezConfiguration.TEZ_AM_TASK_SCHEDULERS,
+
+    confForJobs.setStrings(TezConfiguration.TEZ_AM_TASK_SCHEDULERS,
+        TezConstants.TEZ_AM_SERVICE_PLUGINS_NAME_DEFAULT,
+//        TezConstants.TEZ_AM_SERVICE_PLUGINS_LOCAL_MODE_NAME_DEFAULT,
         EXT_PUSH_ENTITY_NAME + ":" + TezTestServiceTaskSchedulerService.class.getName());
-    confForJobs.set(TezConfiguration.TEZ_AM_CONTAINER_LAUNCHERS,
-        EXT_PUSH_ENTITY_NAME + ":" + TezTestServiceNoOpContainerLauncher.class.getName());
-    confForJobs.set(TezConfiguration.TEZ_AM_TASK_COMMUNICATORS,
-        EXT_PUSH_ENTITY_NAME + ":" + TezTestServiceTaskCommunicatorImpl.class.getName());
 
-    confForJobs.set(TezConfiguration.TEZ_AM_VERTEX_TASK_SCHEDULER_NAME, EXT_PUSH_ENTITY_NAME);
-    confForJobs.set(TezConfiguration.TEZ_AM_VERTEX_CONTAINER_LAUNCHER_NAME, EXT_PUSH_ENTITY_NAME);
-    confForJobs.set(TezConfiguration.TEZ_AM_VERTEX_TASK_COMMUNICATOR_NAME, EXT_PUSH_ENTITY_NAME);
+    confForJobs.setStrings(TezConfiguration.TEZ_AM_CONTAINER_LAUNCHERS,
+        TezConstants.TEZ_AM_SERVICE_PLUGINS_NAME_DEFAULT,
+//        TezConstants.TEZ_AM_SERVICE_PLUGINS_LOCAL_MODE_NAME_DEFAULT,
+        EXT_PUSH_ENTITY_NAME + ":" + TezTestServiceNoOpContainerLauncher.class.getName());
 
+    confForJobs.setStrings(TezConfiguration.TEZ_AM_TASK_COMMUNICATORS,
+        TezConstants.TEZ_AM_SERVICE_PLUGINS_NAME_DEFAULT,
+//        TezConstants.TEZ_AM_SERVICE_PLUGINS_LOCAL_MODE_NAME_DEFAULT,
+        EXT_PUSH_ENTITY_NAME + ":" + TezTestServiceTaskCommunicatorImpl.class.getName());
 
-    TezConfiguration tezConf = new TezConfiguration(confForJobs);
+    // Default all jobs to run via the service. Individual tests override this on a per vertex/dag level.
+    confForJobs.set(TezConfiguration.TEZ_AM_VERTEX_TASK_SCHEDULER_NAME,
+        TezConstants.TEZ_AM_SERVICE_PLUGINS_NAME_DEFAULT);
+    confForJobs.set(TezConfiguration.TEZ_AM_VERTEX_CONTAINER_LAUNCHER_NAME,
+        TezConstants.TEZ_AM_SERVICE_PLUGINS_NAME_DEFAULT);
+    confForJobs.set(TezConfiguration.TEZ_AM_VERTEX_TASK_COMMUNICATOR_NAME,
+        TezConstants.TEZ_AM_SERVICE_PLUGINS_NAME_DEFAULT);
+
+    // Setup various executor sets
+    PROPS_REGULAR_CONTAINERS.put(TezConfiguration.TEZ_AM_VERTEX_TASK_SCHEDULER_NAME,
+        TezConstants.TEZ_AM_SERVICE_PLUGINS_NAME_DEFAULT);
+    PROPS_REGULAR_CONTAINERS.put(TezConfiguration.TEZ_AM_VERTEX_CONTAINER_LAUNCHER_NAME,
+        TezConstants.TEZ_AM_SERVICE_PLUGINS_NAME_DEFAULT);
+    PROPS_REGULAR_CONTAINERS.put(TezConfiguration.TEZ_AM_VERTEX_TASK_COMMUNICATOR_NAME,
+        TezConstants.TEZ_AM_SERVICE_PLUGINS_NAME_DEFAULT);
+
+    PROPS_EXT_SERVICE_PUSH.put(TezConfiguration.TEZ_AM_VERTEX_TASK_SCHEDULER_NAME, EXT_PUSH_ENTITY_NAME);
+    PROPS_EXT_SERVICE_PUSH.put(TezConfiguration.TEZ_AM_VERTEX_CONTAINER_LAUNCHER_NAME, EXT_PUSH_ENTITY_NAME);
+    PROPS_EXT_SERVICE_PUSH.put(TezConfiguration.TEZ_AM_VERTEX_TASK_COMMUNICATOR_NAME, EXT_PUSH_ENTITY_NAME);
+
+    PROPS_IN_AM.put(TezConfiguration.TEZ_AM_VERTEX_TASK_SCHEDULER_NAME,
+        TezConstants.TEZ_AM_SERVICE_PLUGINS_LOCAL_MODE_NAME_DEFAULT);
+    PROPS_IN_AM.put(TezConfiguration.TEZ_AM_VERTEX_CONTAINER_LAUNCHER_NAME,
+        TezConstants.TEZ_AM_SERVICE_PLUGINS_LOCAL_MODE_NAME_DEFAULT);
+    PROPS_IN_AM.put(TezConfiguration.TEZ_AM_VERTEX_TASK_COMMUNICATOR_NAME,
+        TezConstants.TEZ_AM_SERVICE_PLUGINS_LOCAL_MODE_NAME_DEFAULT);
+
+
+    // Create a session to use for all tests.
+    TezConfiguration tezClientConf = new TezConfiguration(confForJobs);
 
     sharedTezClient = TezClient.create(TestExternalTezServices.class.getSimpleName() + "_session",
-        tezConf, true);
+        tezClientConf, true);
     sharedTezClient.start();
     LOG.info("Shared TezSession started");
     sharedTezClient.waitTillReady();
     LOG.info("Shared TezSession ready for submission");
 
+    // Generate the join data set used for each run.
+    // Can a timeout be enforced here ?
+    remoteFs.mkdirs(SRC_DATA_DIR);
+    Path dataPath1 = new Path(SRC_DATA_DIR, "inPath1");
+    Path dataPath2 = new Path(SRC_DATA_DIR, "inPath2");
+    TezConfiguration tezConf = new TezConfiguration(confForJobs);
+    //   Generate join data - with 2 tasks.
+    JoinDataGen dataGen = new JoinDataGen();
+    String[] dataGenArgs = new String[]{
+        dataPath1.toString(), "1048576", dataPath2.toString(), "524288",
+        HASH_JOIN_EXPECTED_RESULT_PATH.toString(), "2"};
+    assertEquals(0, dataGen.run(tezConf, dataGenArgs, sharedTezClient));
+    //    Run the actual join - with 2 reducers
+    HashJoinExample joinExample = new HashJoinExample();
+    String[] args = new String[]{
+        dataPath1.toString(), dataPath2.toString(), "2", HASH_JOIN_OUTPUT_PATH.toString()};
+    assertEquals(0, joinExample.run(tezConf, args, sharedTezClient));
+
+    LOG.info("Completed generating Data - Expected Hash Result and Actual Join Result");
   }
 
   @AfterClass
@@ -156,35 +217,50 @@ public class TestExternalTezServices {
 
 
   @Test(timeout = 60000)
-  public void test1() throws Exception {
-    Path testDir = new Path("/tmp/testHashJoinExample");
+  public void testAllInService() throws Exception {
+    int expectedExternalSubmissions = 4 + 3; //4 for 4 src files, 3 for num reducers.
+    runJoinValidate("AllInService", expectedExternalSubmissions, PROPS_EXT_SERVICE_PUSH,
+        PROPS_EXT_SERVICE_PUSH, PROPS_EXT_SERVICE_PUSH);
+  }
 
-    remoteFs.mkdirs(testDir);
+  @Test(timeout = 60000)
+  public void testAllInContainers() throws Exception {
+    int expectedExternalSubmissions = 0; // All in containers
+    runJoinValidate("AllInContainers", expectedExternalSubmissions, PROPS_REGULAR_CONTAINERS,
+        PROPS_REGULAR_CONTAINERS, PROPS_REGULAR_CONTAINERS);
+  }
 
-    Path dataPath1 = new Path(testDir, "inPath1");
-    Path dataPath2 = new Path(testDir, "inPath2");
-    Path expectedOutputPath = new Path(testDir, "expectedOutputPath");
-    Path outPath = new Path(testDir, "outPath");
+  @Test(timeout = 60000)
+  public void testMixed1() throws Exception { // M-ExtService, R-containers
+    int expectedExternalSubmissions = 4 + 0; //4 for 4 src files, 3 for num reducers.
+    runJoinValidate("Mixed1", expectedExternalSubmissions, PROPS_EXT_SERVICE_PUSH,
+        PROPS_EXT_SERVICE_PUSH, PROPS_REGULAR_CONTAINERS);
+  }
 
-    TezConfiguration tezConf = new TezConfiguration(confForJobs);
+  @Test(timeout = 60000)
+  public void testMixed2() throws Exception { // M-Containers, R-ExtService
+    int expectedExternalSubmissions = 0 + 3; //4 for 4 src files, 3 for num reducers.
+    runJoinValidate("Mixed2", expectedExternalSubmissions, PROPS_REGULAR_CONTAINERS,
+        PROPS_REGULAR_CONTAINERS, PROPS_EXT_SERVICE_PUSH);
+  }
 
-    JoinDataGen dataGen = new JoinDataGen();
-    String[] dataGenArgs = new String[]{
-        dataPath1.toString(), "1048576", dataPath2.toString(), "524288",
-        expectedOutputPath.toString(), "2"};
-    assertEquals(0, dataGen.run(tezConf, dataGenArgs, sharedTezClient));
 
-    HashJoinExample joinExample = new HashJoinExample();
-    String[] args = new String[]{
-        dataPath1.toString(), dataPath2.toString(), "2", outPath.toString()};
-    assertEquals(0, joinExample.run(tezConf, args, sharedTezClient));
+  private void runJoinValidate(String name, int extExpectedCount, Map<String, String> lhsProps,
+                               Map<String, String> rhsProps,
+                               Map<String, String> validateProps) throws
+      Exception {
+    int externalSubmissionCount = tezTestServiceCluster.getNumSubmissions();
 
-    JoinValidate joinValidate = new JoinValidate();
-    String[] validateArgs = new String[]{
-        expectedOutputPath.toString(), outPath.toString(), "3"};
+    TezConfiguration tezConf = new TezConfiguration(confForJobs);
+    JoinValidateConfigured joinValidate =
+        new JoinValidateConfigured(lhsProps, rhsProps,
+            validateProps, name);
+    String[] validateArgs = new String[]{"-disableSplitGrouping",
+        HASH_JOIN_EXPECTED_RESULT_PATH.toString(), HASH_JOIN_OUTPUT_PATH.toString(), "3"};
     assertEquals(0, joinValidate.run(tezConf, validateArgs, sharedTezClient));
 
     // Ensure this was actually submitted to the external cluster
-    assertTrue(tezTestServiceCluster.getNumSubmissions() > 0);
+    assertEquals(extExpectedCount,
+        (tezTestServiceCluster.getNumSubmissions() - externalSubmissionCount));
   }
 }