You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by ss...@apache.org on 2015/08/22 03:19:15 UTC

[25/50] [abbrv] tez git commit: TEZ-2004. Define basic interface for pluggable ContainerLaunchers. (sseth)

http://git-wip-us.apache.org/repos/asf/tez/blob/eec648d3/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerService.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerService.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerService.java
index 2daa51e..2871691 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerService.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerService.java
@@ -32,7 +32,7 @@ import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.NodeReport;
 import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.Resource;
-import org.apache.tez.dag.api.TaskAttemptEndReason;
+import org.apache.tez.serviceplugins.api.TaskAttemptEndReason;
 
 public abstract class TaskSchedulerService extends AbstractService{
 

http://git-wip-us.apache.org/repos/asf/tez/blob/eec648d3/tez-dag/src/main/java/org/apache/tez/dag/app/rm/YarnTaskSchedulerService.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/YarnTaskSchedulerService.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/YarnTaskSchedulerService.java
index 2363d18..143342c 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/YarnTaskSchedulerService.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/YarnTaskSchedulerService.java
@@ -58,7 +58,7 @@ import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.util.RackResolver;
 import org.apache.hadoop.yarn.util.resource.Resources;
-import org.apache.tez.dag.api.TaskAttemptEndReason;
+import org.apache.tez.serviceplugins.api.TaskAttemptEndReason;
 import org.apache.tez.dag.api.TezConfiguration;
 import org.apache.tez.dag.api.TezUncheckedException;
 import org.apache.tez.dag.app.AppContext;

http://git-wip-us.apache.org/repos/asf/tez/blob/eec648d3/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerEventCompleted.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerEventCompleted.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerEventCompleted.java
index 8ef2a83..cecb019 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerEventCompleted.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerEventCompleted.java
@@ -20,7 +20,7 @@ package org.apache.tez.dag.app.rm.container;
 
 import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
 import org.apache.hadoop.yarn.api.records.ContainerId;
-import org.apache.tez.dag.api.ContainerEndReason;
+import org.apache.tez.serviceplugins.api.ContainerEndReason;
 import org.apache.tez.dag.records.TaskAttemptTerminationCause;
 
 public class AMContainerEventCompleted extends AMContainerEvent {

http://git-wip-us.apache.org/repos/asf/tez/blob/eec648d3/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerImpl.java
index 7446734..5cff766 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerImpl.java
@@ -28,8 +28,8 @@ import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;
 
 import org.apache.tez.common.TezUtilsInternal;
-import org.apache.tez.dag.api.ContainerEndReason;
-import org.apache.tez.dag.api.TaskAttemptEndReason;
+import org.apache.tez.serviceplugins.api.ContainerEndReason;
+import org.apache.tez.serviceplugins.api.TaskAttemptEndReason;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.security.Credentials;
@@ -38,7 +38,6 @@ import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
 import org.apache.hadoop.yarn.api.records.LocalResource;
-import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.event.Event;
 import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.state.InvalidStateTransitonException;
@@ -1079,12 +1078,12 @@ public class AMContainerImpl implements AMContainer {
   }
 
   protected void sendStartRequestToNM(ContainerLaunchContext clc) {
-    sendEvent(new NMCommunicatorLaunchRequestEvent(clc, container, launcherId, taskCommId));
+    sendEvent(new NMCommunicatorLaunchRequestEvent(clc, container, launcherId, schedulerId, taskCommId));
   }
 
   protected void sendStopRequestToNM() {
     sendEvent(new NMCommunicatorStopRequestEvent(containerId,
-        container.getNodeId(), container.getContainerToken(), launcherId));
+        container.getNodeId(), container.getContainerToken(), launcherId, schedulerId, taskCommId));
   }
 
   protected void unregisterAttemptFromListener(TezTaskAttemptID attemptId, TaskAttemptEndReason endReason) {

http://git-wip-us.apache.org/repos/asf/tez/blob/eec648d3/tez-dag/src/test/java/org/apache/tez/dag/app/MockDAGAppMaster.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/MockDAGAppMaster.java b/tez-dag/src/test/java/org/apache/tez/dag/app/MockDAGAppMaster.java
index 0f35bba..3c3c6a7 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/MockDAGAppMaster.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/MockDAGAppMaster.java
@@ -34,11 +34,14 @@ import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 
+import org.apache.tez.serviceplugins.api.ContainerLaunchRequest;
+import org.apache.tez.serviceplugins.api.ContainerLauncher;
+import org.apache.tez.serviceplugins.api.ContainerLauncherContext;
+import org.apache.tez.serviceplugins.api.ContainerStopRequest;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.security.Credentials;
-import org.apache.hadoop.service.AbstractService;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
 import org.apache.hadoop.yarn.api.records.ContainerId;
@@ -53,23 +56,14 @@ import org.apache.tez.dag.api.TaskHeartbeatRequest;
 import org.apache.tez.dag.api.TaskHeartbeatResponse;
 import org.apache.tez.dag.api.TezConfiguration;
 import org.apache.tez.dag.api.TezUncheckedException;
-import org.apache.tez.dag.app.launcher.ContainerLauncher;
 import org.apache.tez.dag.app.launcher.ContainerLauncherRouter;
 import org.apache.tez.dag.app.rm.NMCommunicatorEvent;
-import org.apache.tez.dag.app.rm.NMCommunicatorLaunchRequestEvent;
-import org.apache.tez.dag.app.rm.NMCommunicatorStopRequestEvent;
-import org.apache.tez.dag.app.rm.container.AMContainerEvent;
-import org.apache.tez.dag.app.rm.container.AMContainerEventLaunched;
-import org.apache.tez.dag.app.rm.container.AMContainerEventType;
 import org.apache.tez.dag.history.HistoryEventHandler;
 import org.apache.tez.dag.records.TezTaskAttemptID;
 import org.apache.tez.dag.records.TezTaskID;
-import org.apache.tez.runtime.api.events.CompositeDataMovementEvent;
-import org.apache.tez.runtime.api.events.DataMovementEvent;
 import org.apache.tez.runtime.api.events.TaskAttemptCompletedEvent;
 import org.apache.tez.runtime.api.events.TaskStatusUpdateEvent;
 import org.apache.tez.runtime.api.impl.EventMetaData;
-import org.apache.tez.runtime.api.impl.OutputSpec;
 import org.apache.tez.runtime.api.impl.TaskSpec;
 import org.apache.tez.runtime.api.impl.TaskStatistics;
 import org.apache.tez.runtime.api.impl.TezEvent;
@@ -89,6 +83,7 @@ import com.google.common.util.concurrent.ThreadFactoryBuilder;
 public class MockDAGAppMaster extends DAGAppMaster {
   
   private static final Logger LOG = LoggerFactory.getLogger(MockDAGAppMaster.class);
+  ContainerLauncherContext containerLauncherContext;
   MockContainerLauncher containerLauncher;
   boolean initFailFlag;
   boolean startFailFlag;
@@ -121,7 +116,7 @@ public class MockDAGAppMaster extends DAGAppMaster {
   // Upon, launch of a container is simulates the container asking for tasks
   // Upon receiving a task it simulates completion of the tasks
   // It can be used to preempt the container for a given task
-  public class MockContainerLauncher extends AbstractService implements ContainerLauncher, Runnable {
+  public class MockContainerLauncher extends ContainerLauncher implements Runnable {
 
     BlockingQueue<NMCommunicatorEvent> eventQueue = new LinkedBlockingQueue<NMCommunicatorEvent>();
     Thread eventHandlingThread;
@@ -141,12 +136,14 @@ public class MockDAGAppMaster extends DAGAppMaster {
     Map<TezTaskID, Integer> preemptedTasks = Maps.newConcurrentMap();
     
     Map<TezTaskAttemptID, Integer> tasksWithStatusUpdates = Maps.newConcurrentMap();
-    
-    public MockContainerLauncher(AtomicBoolean goFlag) {
-      super("MockContainerLauncher");
+
+    public MockContainerLauncher(AtomicBoolean goFlag,
+                                 ContainerLauncherContext containerLauncherContext) {
+      super("MockContainerLauncher", containerLauncherContext);
       this.goFlag = goFlag;
     }
 
+
     public class ContainerData {
       ContainerId cId;
       TezTaskAttemptID taId;
@@ -211,20 +208,18 @@ public class MockDAGAppMaster extends DAGAppMaster {
         executorService.shutdownNow();
       }
     }
-    
+
+
     @Override
-    public void handle(NMCommunicatorEvent event) {
-      switch (event.getType()) {
-      case CONTAINER_LAUNCH_REQUEST:
-        launch((NMCommunicatorLaunchRequestEvent) event);
-        break;
-      case CONTAINER_STOP_REQUEST:
-        stop((NMCommunicatorStopRequestEvent)event);
-        break;
-      }
+    public void launchContainer(ContainerLaunchRequest launchRequest) {
+      launch(launchRequest);
     }
-    
-    
+
+    @Override
+    public void stopContainer(ContainerStopRequest stopRequest) {
+      stop(stopRequest);
+    }
+
     void waitToGo() {
       if (goFlag == null) {
         return;
@@ -266,20 +261,19 @@ public class MockDAGAppMaster extends DAGAppMaster {
       tasksWithStatusUpdates.put(tId, numUpdates);
     }
     
-    void stop(NMCommunicatorStopRequestEvent event) {
+    void stop(ContainerStopRequest event) {
       // remove from simulated container list
       containers.remove(event.getContainerId());
-      getContext().getEventHandler().handle(
-          new AMContainerEvent(event.getContainerId(), AMContainerEventType.C_NM_STOP_SENT));
+      getContext().containerStopRequested(event.getContainerId());
     }
 
-    void launch(NMCommunicatorLaunchRequestEvent event) {
+    void launch(ContainerLaunchRequest event) {
       // launch container by putting it in simulated container list
       ContainerData cData = new ContainerData(event.getContainerId(),
           event.getContainerLaunchContext());
       containers.put(event.getContainerId(), cData);
       containersToProcess.add(cData);
-      getContext().getEventHandler().handle(new AMContainerEventLaunched(event.getContainerId()));      
+      getContext().containerLaunched(event.getContainerId());
     }
     
     public void waitTillContainersLaunched() throws InterruptedException {
@@ -289,7 +283,7 @@ public class MockDAGAppMaster extends DAGAppMaster {
     }
     
     void incrementTime(long inc) {
-      Clock clock = getContext().getClock();
+      Clock clock = MockDAGAppMaster.this.getContext().getClock();
       if (clock instanceof MockClock) {
         ((MockClock) clock).incrementTime(inc);
       }
@@ -493,7 +487,8 @@ public class MockDAGAppMaster extends DAGAppMaster {
     super(applicationAttemptId, containerId, nmHost, nmPort, nmHttpPort, clock, appSubmitTime,
         isSession, workingDirectory, localDirs, logDirs,  new TezApiVersionInfo().getVersion(), 1,
         credentials, jobUserName);
-    containerLauncher = new MockContainerLauncher(launcherGoFlag);
+    containerLauncherContext = new ContainerLauncherContextImpl(getContext(), getTaskAttemptListener());
+    containerLauncher = new MockContainerLauncher(launcherGoFlag, containerLauncherContext);
     shutdownHandler = new MockDAGAppMasterShutdownHandler();
     this.initFailFlag = initFailFlag;
     this.startFailFlag = startFailFlag;
@@ -508,7 +503,7 @@ public class MockDAGAppMaster extends DAGAppMaster {
                                                                   String[] containerLaunchers,
                                                                   boolean isLocal)
       throws UnknownHostException {
-    return new ContainerLauncherRouter(containerLauncher);
+    return new ContainerLauncherRouter(containerLauncher, getContext());
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/tez/blob/eec648d3/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskAttemptListenerImplTezDag.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskAttemptListenerImplTezDag.java b/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskAttemptListenerImplTezDag.java
index 7f0362d..df643e4 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskAttemptListenerImplTezDag.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskAttemptListenerImplTezDag.java
@@ -51,8 +51,8 @@ import org.apache.tez.common.security.JobTokenIdentifier;
 import org.apache.tez.common.security.JobTokenSecretManager;
 import org.apache.tez.common.security.TokenCache;
 import org.apache.tez.dag.api.TezConfiguration;
-import org.apache.tez.dag.api.ContainerEndReason;
-import org.apache.tez.dag.api.TaskAttemptEndReason;
+import org.apache.tez.serviceplugins.api.ContainerEndReason;
+import org.apache.tez.serviceplugins.api.TaskAttemptEndReason;
 import org.apache.tez.dag.api.TaskHeartbeatRequest;
 import org.apache.tez.dag.api.TaskHeartbeatResponse;
 import org.apache.tez.dag.api.TezException;

http://git-wip-us.apache.org/repos/asf/tez/blob/eec648d3/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskAttemptListenerImplTezDag2.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskAttemptListenerImplTezDag2.java b/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskAttemptListenerImplTezDag2.java
index 934543f..8d776fb 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskAttemptListenerImplTezDag2.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskAttemptListenerImplTezDag2.java
@@ -34,7 +34,7 @@ import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.event.Event;
 import org.apache.hadoop.yarn.event.EventHandler;
-import org.apache.tez.dag.api.TaskAttemptEndReason;
+import org.apache.tez.serviceplugins.api.TaskAttemptEndReason;
 import org.apache.tez.dag.app.dag.DAG;
 import org.apache.tez.dag.app.dag.event.TaskAttemptEventAttemptFailed;
 import org.apache.tez.dag.app.dag.event.TaskAttemptEventAttemptKilled;

http://git-wip-us.apache.org/repos/asf/tez/blob/eec648d3/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestContainerReuse.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestContainerReuse.java b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestContainerReuse.java
index 62edac9..e37ab4a 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestContainerReuse.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestContainerReuse.java
@@ -53,12 +53,11 @@ import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.client.api.AMRMClient;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.apache.hadoop.yarn.util.RackResolver;
 import org.apache.tez.common.MockDNSToSwitchMapping;
 import org.apache.tez.dag.api.InputDescriptor;
 import org.apache.tez.dag.api.OutputDescriptor;
 import org.apache.tez.dag.api.ProcessorDescriptor;
-import org.apache.tez.dag.api.TaskAttemptEndReason;
+import org.apache.tez.serviceplugins.api.TaskAttemptEndReason;
 import org.apache.tez.dag.api.TezConfiguration;
 import org.apache.tez.dag.api.TaskLocationHint;
 import org.apache.tez.dag.api.oldrecords.TaskAttemptState;
@@ -94,7 +93,6 @@ import org.apache.tez.runtime.api.impl.OutputSpec;
 import org.apache.tez.runtime.api.impl.TaskSpec;
 import org.junit.BeforeClass;
 import org.junit.Test;
-import org.mockito.internal.matchers.Null;
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
 

http://git-wip-us.apache.org/repos/asf/tez/blob/eec648d3/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestLocalTaskSchedulerService.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestLocalTaskSchedulerService.java b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestLocalTaskSchedulerService.java
index 0a642bb..b555c62 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestLocalTaskSchedulerService.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestLocalTaskSchedulerService.java
@@ -27,7 +27,6 @@ import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.Resource;
-import org.apache.tez.dag.api.TaskAttemptEndReason;
 import org.apache.tez.dag.app.AppContext;
 import org.apache.tez.dag.app.dag.Task;
 import org.apache.tez.dag.app.rm.TaskSchedulerService.TaskSchedulerAppCallback;

http://git-wip-us.apache.org/repos/asf/tez/blob/eec648d3/tez-dag/src/test/java/org/apache/tez/dag/app/rm/container/TestAMContainer.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/container/TestAMContainer.java b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/container/TestAMContainer.java
index 322eabc..f9952d8 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/container/TestAMContainer.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/container/TestAMContainer.java
@@ -63,8 +63,8 @@ import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.util.SystemClock;
 import org.apache.tez.common.security.JobTokenIdentifier;
 import org.apache.tez.common.security.TokenCache;
-import org.apache.tez.dag.api.ContainerEndReason;
-import org.apache.tez.dag.api.TaskAttemptEndReason;
+import org.apache.tez.serviceplugins.api.ContainerEndReason;
+import org.apache.tez.serviceplugins.api.TaskAttemptEndReason;
 import org.apache.tez.dag.api.TaskCommunicator;
 import org.apache.tez.dag.app.AppContext;
 import org.apache.tez.dag.app.ContainerHeartbeatHandler;

http://git-wip-us.apache.org/repos/asf/tez/blob/eec648d3/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/launcher/TezTestServiceContainerLauncher.java
----------------------------------------------------------------------
diff --git a/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/launcher/TezTestServiceContainerLauncher.java b/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/launcher/TezTestServiceContainerLauncher.java
index 9d22196..dbf5054 100644
--- a/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/launcher/TezTestServiceContainerLauncher.java
+++ b/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/launcher/TezTestServiceContainerLauncher.java
@@ -14,34 +14,30 @@
 
 package org.apache.tez.dag.app.launcher;
 
+import java.io.IOException;
 import java.net.InetSocketAddress;
 
 import com.google.common.base.Preconditions;
 import com.google.protobuf.ByteString;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.service.AbstractService;
+import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.yarn.api.ApplicationConstants;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ContainerId;
-import org.apache.hadoop.yarn.util.Clock;
-import org.apache.tez.dag.app.AppContext;
-import org.apache.tez.dag.app.TaskAttemptListener;
+import org.apache.tez.serviceplugins.api.ContainerLaunchRequest;
+import org.apache.tez.serviceplugins.api.ContainerLauncher;
+import org.apache.tez.serviceplugins.api.ContainerLauncherContext;
+import org.apache.tez.serviceplugins.api.ContainerStopRequest;
+import org.apache.tez.dag.api.TezConstants;
 import org.apache.tez.dag.app.TezTestServiceCommunicator;
-import org.apache.tez.dag.app.rm.NMCommunicatorEvent;
-import org.apache.tez.dag.app.rm.NMCommunicatorLaunchRequestEvent;
-import org.apache.tez.dag.app.rm.container.AMContainerEvent;
-import org.apache.tez.dag.app.rm.container.AMContainerEventLaunchFailed;
-import org.apache.tez.dag.app.rm.container.AMContainerEventLaunched;
-import org.apache.tez.dag.app.rm.container.AMContainerEventType;
-import org.apache.tez.dag.history.DAGHistoryEvent;
-import org.apache.tez.dag.history.events.ContainerLaunchedEvent;
 import org.apache.tez.service.TezTestServiceConfConstants;
 import org.apache.tez.test.service.rpc.TezTestServiceProtocolProtos;
 import org.apache.tez.test.service.rpc.TezTestServiceProtocolProtos.RunContainerRequestProto;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class TezTestServiceContainerLauncher extends AbstractService implements ContainerLauncher {
+// TODO TEZ-2003 Find and review all LOG.*(DBG and LOG.*(DEBUG messages
+
+public class TezTestServiceContainerLauncher extends ContainerLauncher {
 
   // TODO Support interruptability of tasks which haven't yet been launched.
 
@@ -49,40 +45,32 @@ public class TezTestServiceContainerLauncher extends AbstractService implements
 
   static final Logger LOG = LoggerFactory.getLogger(TezTestServiceContainerLauncher.class);
 
-  private final AppContext context;
   private final String tokenIdentifier;
-  private final TaskAttemptListener tal;
   private final int servicePort;
   private final TezTestServiceCommunicator communicator;
-  private final Clock clock;
   private final ApplicationAttemptId appAttemptId;
+  //  private final TaskAttemptListener tal;
 
 
   // Configuration passed in here to set up final parameters
-  public TezTestServiceContainerLauncher(AppContext appContext, Configuration conf,
-                                         TaskAttemptListener tal) {
-    super(TezTestServiceContainerLauncher.class.getName());
-    this.clock = appContext.getClock();
-    int numThreads = conf.getInt(TezTestServiceConfConstants.TEZ_TEST_SERVICE_AM_COMMUNICATOR_NUM_THREADS,
+  public TezTestServiceContainerLauncher(ContainerLauncherContext containerLauncherContext) {
+    super(TezTestServiceContainerLauncher.class.getName(), containerLauncherContext);
+    int numThreads = getContext().getInitialConfiguration().getInt(
+        TezTestServiceConfConstants.TEZ_TEST_SERVICE_AM_COMMUNICATOR_NUM_THREADS,
         TezTestServiceConfConstants.TEZ_TEST_SERVICE_AM_COMMUNICATOR_NUM_THREADS_DEFAULT);
 
-    this.servicePort = conf.getInt(TezTestServiceConfConstants.TEZ_TEST_SERVICE_RPC_PORT, -1);
+    this.servicePort = getContext().getInitialConfiguration().getInt(
+        TezTestServiceConfConstants.TEZ_TEST_SERVICE_RPC_PORT, -1);
     Preconditions.checkArgument(servicePort > 0,
         TezTestServiceConfConstants.TEZ_TEST_SERVICE_RPC_PORT + " must be set");
     this.communicator = new TezTestServiceCommunicator(numThreads);
-    this.context = appContext;
-    this.tokenIdentifier = context.getApplicationID().toString();
-    this.appAttemptId = appContext.getApplicationAttemptId();
-    this.tal = tal;
-  }
-
-  @Override
-  public void serviceInit(Configuration conf) {
-    communicator.init(conf);
+    this.tokenIdentifier = getContext().getApplicationAttemptId().getApplicationId().toString();
+    this.appAttemptId = getContext().getApplicationAttemptId();
   }
 
   @Override
   public void serviceStart() {
+    communicator.init(getContext().getInitialConfiguration());
     communicator.start();
   }
 
@@ -92,51 +80,56 @@ public class TezTestServiceContainerLauncher extends AbstractService implements
   }
 
   @Override
-  public void handle(NMCommunicatorEvent event) {
-    switch (event.getType()) {
-      case CONTAINER_LAUNCH_REQUEST:
-        final NMCommunicatorLaunchRequestEvent launchEvent = (NMCommunicatorLaunchRequestEvent) event;
-        RunContainerRequestProto runRequest = constructRunContainerRequest(launchEvent);
-        communicator.runContainer(runRequest, launchEvent.getNodeId().getHost(),
-            launchEvent.getNodeId().getPort(),
-            new TezTestServiceCommunicator.ExecuteRequestCallback<TezTestServiceProtocolProtos.RunContainerResponseProto>() {
-              @Override
-              public void setResponse(TezTestServiceProtocolProtos.RunContainerResponseProto response) {
-                LOG.info("Container: " + launchEvent.getContainerId() + " launch succeeded on host: " + launchEvent.getNodeId());
-                context.getEventHandler().handle(new AMContainerEventLaunched(launchEvent.getContainerId()));
-                ContainerLaunchedEvent lEvt = new ContainerLaunchedEvent(
-                    launchEvent.getContainerId(), clock.getTime(), context.getApplicationAttemptId());
-                context.getHistoryHandler().handle(new DAGHistoryEvent(
-                    null, lEvt));
-              }
-
-              @Override
-              public void indicateError(Throwable t) {
-                LOG.error("Failed to launch container: " + launchEvent.getContainer() + " on host: " + launchEvent.getNodeId(), t);
-                sendContainerLaunchFailedMsg(launchEvent.getContainerId(), t);
-              }
-            });
-        break;
-      case CONTAINER_STOP_REQUEST:
-        LOG.info("DEBUG: Ignoring STOP_REQUEST for event: " + event);
-        // that the container is actually done (normally received from RM)
-        // TODO Sending this out for an un-launched container is invalid
-        context.getEventHandler().handle(new AMContainerEvent(event.getContainerId(),
-            AMContainerEventType.C_NM_STOP_SENT));
-        break;
+  public void launchContainer(final ContainerLaunchRequest launchRequest) {
+    RunContainerRequestProto runRequest = null;
+    try {
+      runRequest = constructRunContainerRequest(launchRequest);
+    } catch (IOException e) {
+      getContext().containerLaunchFailed(launchRequest.getContainerId(),
+          "Failed to construct launch request, " + StringUtils.stringifyException(e));
+      return;
     }
+    communicator.runContainer(runRequest, launchRequest.getNodeId().getHost(),
+        launchRequest.getNodeId().getPort(),
+        new TezTestServiceCommunicator.ExecuteRequestCallback<TezTestServiceProtocolProtos.RunContainerResponseProto>() {
+          @Override
+          public void setResponse(TezTestServiceProtocolProtos.RunContainerResponseProto response) {
+            LOG.info(
+                "Container: " + launchRequest.getContainerId() + " launch succeeded on host: " +
+                    launchRequest.getNodeId());
+            getContext().containerLaunched(launchRequest.getContainerId());
+          }
+
+          @Override
+          public void indicateError(Throwable t) {
+            LOG.error(
+                "Failed to launch container: " + launchRequest.getContainerId() + " on host: " +
+                    launchRequest.getNodeId(), t);
+            sendContainerLaunchFailedMsg(launchRequest.getContainerId(), t);
+          }
+        });
   }
 
-  private RunContainerRequestProto constructRunContainerRequest(NMCommunicatorLaunchRequestEvent event) {
+  @Override
+  public void stopContainer(ContainerStopRequest stopRequest) {
+    LOG.info("DEBUG: Ignoring STOP_REQUEST for event: " + stopRequest);
+    // The stop request is ignored; notice that the container is actually done is normally received from the RM
+    // TODO Sending this out for an un-launched container is invalid
+    getContext().containerStopRequested(stopRequest.getContainerId());
+  }
+
+  private RunContainerRequestProto constructRunContainerRequest(ContainerLaunchRequest launchRequest) throws
+      IOException {
     RunContainerRequestProto.Builder builder = RunContainerRequestProto.newBuilder();
-    InetSocketAddress address = tal.getTaskCommunicator(event.getTaskCommId()).getAddress();
+    Preconditions.checkArgument(launchRequest.getTaskCommunicatorName().equals(TezConstants.TEZ_AM_SERVICE_PLUGINS_NAME_DEFAULT));
+    InetSocketAddress address = (InetSocketAddress) getContext().getTaskCommunicatorMetaInfo(launchRequest.getTaskCommunicatorName());
     builder.setAmHost(address.getHostName()).setAmPort(address.getPort());
     builder.setAppAttemptNumber(appAttemptId.getAttemptId());
     builder.setApplicationIdString(appAttemptId.getApplicationId().toString());
     builder.setTokenIdentifier(tokenIdentifier);
-    builder.setContainerIdString(event.getContainer().getId().toString());
+    builder.setContainerIdString(launchRequest.getContainerId().toString());
     builder.setCredentialsBinary(
-        ByteString.copyFrom(event.getContainerLaunchContext().getTokens()));
+        ByteString.copyFrom(launchRequest.getContainerLaunchContext().getTokens()));
     // TODO Avoid reading this from the environment
     builder.setUser(System.getenv(ApplicationConstants.Environment.USER.name()));
     return builder.build();
@@ -144,6 +137,8 @@ public class TezTestServiceContainerLauncher extends AbstractService implements
 
   @SuppressWarnings("unchecked")
   void sendContainerLaunchFailedMsg(ContainerId containerId, Throwable t) {
-    context.getEventHandler().handle(new AMContainerEventLaunchFailed(containerId, t == null ? "" : t.getMessage()));
+    getContext().containerLaunchFailed(containerId, t == null ? "" : t.getMessage());
   }
+
+
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/eec648d3/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/launcher/TezTestServiceNoOpContainerLauncher.java
----------------------------------------------------------------------
diff --git a/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/launcher/TezTestServiceNoOpContainerLauncher.java b/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/launcher/TezTestServiceNoOpContainerLauncher.java
index 977d0d3..d3743e1 100644
--- a/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/launcher/TezTestServiceNoOpContainerLauncher.java
+++ b/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/launcher/TezTestServiceNoOpContainerLauncher.java
@@ -14,53 +14,32 @@
 
 package org.apache.tez.dag.app.launcher;
 
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.service.AbstractService;
-import org.apache.hadoop.yarn.util.Clock;
-import org.apache.tez.dag.app.AppContext;
-import org.apache.tez.dag.app.TaskAttemptListener;
-import org.apache.tez.dag.app.rm.NMCommunicatorEvent;
-import org.apache.tez.dag.app.rm.NMCommunicatorLaunchRequestEvent;
-import org.apache.tez.dag.app.rm.container.AMContainerEvent;
-import org.apache.tez.dag.app.rm.container.AMContainerEventLaunched;
-import org.apache.tez.dag.app.rm.container.AMContainerEventType;
-import org.apache.tez.dag.history.DAGHistoryEvent;
-import org.apache.tez.dag.history.events.ContainerLaunchedEvent;
+import org.apache.tez.serviceplugins.api.ContainerLaunchRequest;
+import org.apache.tez.serviceplugins.api.ContainerLauncher;
+import org.apache.tez.serviceplugins.api.ContainerLauncherContext;
+import org.apache.tez.serviceplugins.api.ContainerStopRequest;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class TezTestServiceNoOpContainerLauncher extends AbstractService implements ContainerLauncher {
+public class TezTestServiceNoOpContainerLauncher extends ContainerLauncher {
 
   static final Logger LOG = LoggerFactory.getLogger(TezTestServiceNoOpContainerLauncher.class);
 
-  private final AppContext context;
-  private final Clock clock;
 
-  public TezTestServiceNoOpContainerLauncher(AppContext appContext, Configuration conf,
-                                         TaskAttemptListener tal) {
-    super(TezTestServiceNoOpContainerLauncher.class.getName());
-    this.context = appContext;
-    this.clock = appContext.getClock();
+  public TezTestServiceNoOpContainerLauncher(ContainerLauncherContext containerLauncherContext) {
+    super(TezTestServiceNoOpContainerLauncher.class.getName(), containerLauncherContext);
   }
 
   @Override
-  public void handle(NMCommunicatorEvent event) {
-    switch(event.getType()) {
-      case CONTAINER_LAUNCH_REQUEST:
-        final NMCommunicatorLaunchRequestEvent launchEvent = (NMCommunicatorLaunchRequestEvent) event;
-        LOG.info("No-op launch for container: " + launchEvent.getContainerId() + " succeeded on host: " + launchEvent.getNodeId());
-        context.getEventHandler().handle(new AMContainerEventLaunched(launchEvent.getContainerId()));
-        ContainerLaunchedEvent lEvt = new ContainerLaunchedEvent(
-            launchEvent.getContainerId(), clock.getTime(), context.getApplicationAttemptId());
-        context.getHistoryHandler().handle(new DAGHistoryEvent(
-            null, lEvt));
-        break;
-      case CONTAINER_STOP_REQUEST:
-        LOG.info("DEBUG: Ignoring STOP_REQUEST for event: " + event);
-        context.getEventHandler().handle(new AMContainerEvent(event.getContainerId(),
-            AMContainerEventType.C_NM_STOP_SENT));
-        break;
-    }
+  public void launchContainer(ContainerLaunchRequest launchRequest) {
+    LOG.info("No-op launch for container {} succeeded on host: {}", launchRequest.getContainerId(),
+        launchRequest.getNodeId());
+    getContext().containerLaunched(launchRequest.getContainerId());
+  }
 
+  @Override
+  public void stopContainer(ContainerStopRequest stopRequest) {
+    LOG.info("DEBUG: Ignoring STOP_REQUEST {}", stopRequest);
+    getContext().containerStopRequested(stopRequest.getContainerId());
   }
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/eec648d3/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/rm/TezTestServiceTaskSchedulerService.java
----------------------------------------------------------------------
diff --git a/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/rm/TezTestServiceTaskSchedulerService.java b/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/rm/TezTestServiceTaskSchedulerService.java
index 073cb50..506e991 100644
--- a/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/rm/TezTestServiceTaskSchedulerService.java
+++ b/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/rm/TezTestServiceTaskSchedulerService.java
@@ -36,7 +36,7 @@ import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.Resource;
-import org.apache.tez.dag.api.TaskAttemptEndReason;
+import org.apache.tez.serviceplugins.api.TaskAttemptEndReason;
 import org.apache.tez.dag.app.AppContext;
 import org.apache.tez.service.TezTestServiceConfConstants;
 import org.slf4j.Logger;

http://git-wip-us.apache.org/repos/asf/tez/blob/eec648d3/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/taskcomm/TezTestServiceTaskCommunicatorImpl.java
----------------------------------------------------------------------
diff --git a/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/taskcomm/TezTestServiceTaskCommunicatorImpl.java b/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/taskcomm/TezTestServiceTaskCommunicatorImpl.java
index 98673a6..444498e 100644
--- a/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/taskcomm/TezTestServiceTaskCommunicatorImpl.java
+++ b/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/taskcomm/TezTestServiceTaskCommunicatorImpl.java
@@ -30,8 +30,8 @@ import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.yarn.api.ApplicationConstants;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.LocalResource;
-import org.apache.tez.dag.api.ContainerEndReason;
-import org.apache.tez.dag.api.TaskAttemptEndReason;
+import org.apache.tez.serviceplugins.api.ContainerEndReason;
+import org.apache.tez.serviceplugins.api.TaskAttemptEndReason;
 import org.apache.tez.dag.api.TaskCommunicatorContext;
 import org.apache.tez.dag.app.TezTaskCommunicatorImpl;
 import org.apache.tez.dag.app.TezTestServiceCommunicator;