You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by ss...@apache.org on 2015/08/21 03:36:38 UTC

[30/50] [abbrv] tez git commit: TEZ-2651. Pluggable services should not extend AbstractService. (sseth)

TEZ-2651. Pluggable services should not extend AbstractService. (sseth)


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/fdef9377
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/fdef9377
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/fdef9377

Branch: refs/heads/TEZ-2003
Commit: fdef9377f3a06125a785824c821c38e4f3522c21
Parents: b6582f0
Author: Siddharth Seth <ss...@apache.org>
Authored: Tue Jul 28 14:55:40 2015 -0700
Committer: Siddharth Seth <ss...@apache.org>
Committed: Thu Aug 20 18:22:08 2015 -0700

----------------------------------------------------------------------
 TEZ-2003-CHANGES.txt                            |  1 +
 .../serviceplugins/api/ContainerLauncher.java   | 18 ++++++++++--
 .../apache/tez/dag/api/TaskCommunicator.java    | 30 +++++++++++++++++---
 .../tez/dag/api/TaskCommunicatorContext.java    |  5 ++++
 .../org/apache/tez/dag/app/DAGAppMaster.java    |  4 +--
 .../dag/app/TaskAttemptListenerImpTezDag.java   | 16 ++++++-----
 .../dag/app/TaskCommunicatorContextImpl.java    |  9 ++++++
 .../tez/dag/app/TezTaskCommunicatorImpl.java    | 24 ++++++----------
 .../dag/app/launcher/ContainerLauncherImpl.java |  6 ++--
 .../app/launcher/ContainerLauncherRouter.java   | 12 ++++++--
 .../app/launcher/LocalContainerLauncher.java    |  6 ++--
 .../apache/tez/dag/app/MockDAGAppMaster.java    |  6 ++--
 .../app/TestTaskAttemptListenerImplTezDag.java  |  8 +++---
 .../TezTestServiceContainerLauncher.java        |  6 ++--
 .../TezTestServiceNoOpContainerLauncher.java    |  2 +-
 .../TezTestServiceTaskCommunicatorImpl.java     | 29 +++++++++----------
 16 files changed, 116 insertions(+), 66 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/fdef9377/TEZ-2003-CHANGES.txt
----------------------------------------------------------------------
diff --git a/TEZ-2003-CHANGES.txt b/TEZ-2003-CHANGES.txt
index a51669d..e57f76f 100644
--- a/TEZ-2003-CHANGES.txt
+++ b/TEZ-2003-CHANGES.txt
@@ -36,5 +36,6 @@ ALL CHANGES:
   TEZ-2124. Change Node tracking to work per external container source.
   TEZ-2004. Define basic interface for pluggable ContainerLaunchers.
   TEZ-2005. Define basic interface for pluggable TaskScheduler.
+  TEZ-2651. Pluggable services should not extend AbstractService.
 
 INCOMPATIBLE CHANGES:

http://git-wip-us.apache.org/repos/asf/tez/blob/fdef9377/tez-api/src/main/java/org/apache/tez/serviceplugins/api/ContainerLauncher.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/serviceplugins/api/ContainerLauncher.java b/tez-api/src/main/java/org/apache/tez/serviceplugins/api/ContainerLauncher.java
index 218edb6..8337dcb 100644
--- a/tez-api/src/main/java/org/apache/tez/serviceplugins/api/ContainerLauncher.java
+++ b/tez-api/src/main/java/org/apache/tez/serviceplugins/api/ContainerLauncher.java
@@ -17,6 +17,7 @@ package org.apache.tez.serviceplugins.api;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.service.AbstractService;
+import org.apache.tez.common.ServicePluginLifecycle;
 
 /**
  * Plugin to allow custom container launchers to be written to launch containers on different types
@@ -25,18 +26,29 @@ import org.apache.hadoop.service.AbstractService;
 
 @InterfaceAudience.Public
 @InterfaceStability.Unstable
-public abstract class ContainerLauncher extends AbstractService {
+public abstract class ContainerLauncher implements ServicePluginLifecycle {
 
   private final ContainerLauncherContext containerLauncherContext;
 
   // TODO TEZ-2003 Simplify this by moving away from AbstractService. Potentially Guava AbstractService.
   // A serviceInit(Configuration) is not likely to be very useful, and will expose unnecessary internal
   // configuration to the services if populated with the AM Configuration
-  public ContainerLauncher(String name, ContainerLauncherContext containerLauncherContext) {
-    super(name);
+  public ContainerLauncher(ContainerLauncherContext containerLauncherContext) {
     this.containerLauncherContext = containerLauncherContext;
   }
 
+  @Override
+  public void initialize() throws Exception {
+  }
+
+  @Override
+  public void start() throws Exception {
+  }
+
+  @Override
+  public void shutdown() throws Exception {
+  }
+
   public final ContainerLauncherContext getContext() {
     return this.containerLauncherContext;
   }

http://git-wip-us.apache.org/repos/asf/tez/blob/fdef9377/tez-dag/src/main/java/org/apache/tez/dag/api/TaskCommunicator.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/api/TaskCommunicator.java b/tez-dag/src/main/java/org/apache/tez/dag/api/TaskCommunicator.java
index 05e437c..f221414 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/api/TaskCommunicator.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/api/TaskCommunicator.java
@@ -18,9 +18,9 @@ import java.net.InetSocketAddress;
 import java.util.Map;
 
 import org.apache.hadoop.security.Credentials;
-import org.apache.hadoop.service.AbstractService;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.tez.common.ServicePluginLifecycle;
 import org.apache.tez.dag.api.event.VertexStateUpdate;
 import org.apache.tez.serviceplugins.api.ContainerEndReason;
 import org.apache.tez.serviceplugins.api.TaskAttemptEndReason;
@@ -28,11 +28,33 @@ import org.apache.tez.dag.records.TezTaskAttemptID;
 import org.apache.tez.runtime.api.impl.TaskSpec;
 
 // TODO TEZ-2003 Move this into the tez-api module
-public abstract class TaskCommunicator extends AbstractService {
-  public TaskCommunicator(String name) {
-    super(name);
+public abstract class TaskCommunicator implements ServicePluginLifecycle {
+
+  private final TaskCommunicatorContext taskCommunicatorContext;
+
+  public TaskCommunicator(TaskCommunicatorContext taskCommunicatorContext) {
+    this.taskCommunicatorContext = taskCommunicatorContext;
+  }
+
+  public TaskCommunicatorContext getContext() {
+    return taskCommunicatorContext;
+  }
+
+  @Override
+  public void initialize() throws Exception {
   }
 
+  @Override
+  public void start() throws Exception {
+  }
+
+  @Override
+  public void shutdown() throws Exception {
+  }
+
+  // TODO Post TEZ-2003 Move this into the API module. Moving this requires abstractions for
+  // TaskSpec and related classes. (assuming that's efficient for execution)
+
   // TODO TEZ-2003 Ideally, don't expose YARN containerId; instead expose a Tez specific construct.
   // TODO When talking to an external service, this plugin implementer may need access to a host:port
   public abstract void registerRunningContainer(ContainerId containerId, String hostname, int port);

http://git-wip-us.apache.org/repos/asf/tez/blob/fdef9377/tez-dag/src/main/java/org/apache/tez/dag/api/TaskCommunicatorContext.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/api/TaskCommunicatorContext.java b/tez-dag/src/main/java/org/apache/tez/dag/api/TaskCommunicatorContext.java
index b6e63f7..ab32ec1 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/api/TaskCommunicatorContext.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/api/TaskCommunicatorContext.java
@@ -18,6 +18,7 @@ import javax.annotation.Nullable;
 import java.io.IOException;
 import java.util.Set;
 
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ContainerId;
@@ -35,6 +36,9 @@ public interface TaskCommunicatorContext {
 
   // TODO TEZ-2003 Maybe add book-keeping as a helper library, instead of each impl tracking container to task etc.
 
+  // TODO TEZ-2003 To be replaced by getInitialPayload
+  Configuration getInitialConfiguration();
+
   ApplicationAttemptId getApplicationAttemptId();
   Credentials getCredentials();
 
@@ -42,6 +46,7 @@ public interface TaskCommunicatorContext {
   boolean canCommit(TezTaskAttemptID taskAttemptId) throws IOException;
 
   // TODO TEZ-2003 Split the heartbeat API to a liveness check and a status update
+  // KKK Rename this API
   TaskHeartbeatResponse heartbeat(TaskHeartbeatRequest request) throws IOException, TezException;
 
   boolean isKnownContainer(ContainerId containerId);

http://git-wip-us.apache.org/repos/asf/tez/blob/fdef9377/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
index ef27ddf..f3914d8 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
@@ -1047,8 +1047,8 @@ public class DAGAppMaster extends AbstractService {
                                                           String[] taskCommunicatorClasses,
                                                           boolean isLocal) {
     TaskAttemptListener lis =
-        new TaskAttemptListenerImpTezDag(context, thh, chh, jobTokenSecretManager,
-            taskCommunicatorClasses, isLocal);
+        new TaskAttemptListenerImpTezDag(context, thh, chh,
+            taskCommunicatorClasses, amConf, isLocal);
     return lis;
   }
 

http://git-wip-us.apache.org/repos/asf/tez/blob/fdef9377/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java
index 47b63dd..599c208 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java
@@ -28,6 +28,7 @@ import java.util.concurrent.ConcurrentMap;
 
 import com.google.common.annotations.VisibleForTesting;
 import org.apache.commons.collections4.ListUtils;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.tez.serviceplugins.api.ContainerEndReason;
 import org.apache.tez.dag.app.dag.event.TaskAttemptEvent;
 import org.apache.tez.dag.app.dag.event.TaskAttemptEventStatusUpdate;
@@ -61,7 +62,6 @@ import org.apache.tez.dag.app.rm.container.AMContainerTask;
 import org.apache.tez.dag.records.TezTaskAttemptID;
 import org.apache.tez.dag.records.TezVertexID;
 import org.apache.tez.runtime.api.impl.TezEvent;
-import org.apache.tez.common.security.JobTokenSecretManager;
 
 
 @SuppressWarnings("unchecked")
@@ -75,6 +75,7 @@ public class TaskAttemptListenerImpTezDag extends AbstractService implements
   private final AppContext context;
   private final TaskCommunicator[] taskCommunicators;
   private final TaskCommunicatorContext[] taskCommunicatorContexts;
+  protected final ServicePluginLifecycleAbstractService []taskCommunicatorServiceWrappers;
 
   protected final TaskHeartbeatHandler taskHeartbeatHandler;
   protected final ContainerHeartbeatHandler containerHeartbeatHandler;
@@ -99,9 +100,8 @@ public class TaskAttemptListenerImpTezDag extends AbstractService implements
 
   public TaskAttemptListenerImpTezDag(AppContext context,
                                       TaskHeartbeatHandler thh, ContainerHeartbeatHandler chh,
-                                      // TODO TEZ-2003 pre-merge. Remove reference to JobTokenSecretManager.
-                                      JobTokenSecretManager jobTokenSecretManager,
                                       String [] taskCommunicatorClassIdentifiers,
+                                      Configuration conf,
                                       boolean isPureLocalMode) {
     super(TaskAttemptListenerImpTezDag.class.getName());
     this.context = context;
@@ -118,9 +118,11 @@ public class TaskAttemptListenerImpTezDag extends AbstractService implements
     }
     this.taskCommunicators = new TaskCommunicator[taskCommunicatorClassIdentifiers.length];
     this.taskCommunicatorContexts = new TaskCommunicatorContext[taskCommunicatorClassIdentifiers.length];
+    this.taskCommunicatorServiceWrappers = new ServicePluginLifecycleAbstractService[taskCommunicatorClassIdentifiers.length];
     for (int i = 0 ; i < taskCommunicatorClassIdentifiers.length ; i++) {
-      taskCommunicatorContexts[i] = new TaskCommunicatorContextImpl(context, this, i);
+      taskCommunicatorContexts[i] = new TaskCommunicatorContextImpl(context, this, conf, i);
       taskCommunicators[i] = createTaskCommunicator(taskCommunicatorClassIdentifiers[i], i);
+      taskCommunicatorServiceWrappers[i] = new ServicePluginLifecycleAbstractService(taskCommunicators[i]);
     }
     // TODO TEZ-2118 Start using taskCommunicator indices properly
   }
@@ -129,15 +131,15 @@ public class TaskAttemptListenerImpTezDag extends AbstractService implements
   public void serviceStart() {
     // TODO Why is init tied to serviceStart
     for (int i = 0 ; i < taskCommunicators.length ; i++) {
-      taskCommunicators[i].init(getConfig());
-      taskCommunicators[i].start();
+      taskCommunicatorServiceWrappers[i].init(getConfig());
+      taskCommunicatorServiceWrappers[i].start();
     }
   }
 
   @Override
   public void serviceStop() {
     for (int i = 0 ; i < taskCommunicators.length ; i++) {
-      taskCommunicators[i].stop();
+      taskCommunicatorServiceWrappers[i].stop();
     }
   }
 

http://git-wip-us.apache.org/repos/asf/tez/blob/fdef9377/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorContextImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorContextImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorContextImpl.java
index 50e006d..035db93 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorContextImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorContextImpl.java
@@ -23,6 +23,7 @@ import com.google.common.base.Function;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Iterables;
 import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ContainerId;
@@ -48,14 +49,17 @@ public class TaskCommunicatorContextImpl implements TaskCommunicatorContext, Ver
   private final int taskCommunicatorIndex;
   private final ReentrantReadWriteLock.ReadLock dagChangedReadLock;
   private final ReentrantReadWriteLock.WriteLock dagChangedWriteLock;
+  private final Configuration conf;
 
   private DAG dag;
 
   public TaskCommunicatorContextImpl(AppContext appContext,
                                      TaskAttemptListenerImpTezDag taskAttemptListener,
+                                     Configuration conf,
                                      int taskCommunicatorIndex) {
     this.context = appContext;
     this.taskAttemptListener = taskAttemptListener;
+    this.conf = conf;
     this.taskCommunicatorIndex = taskCommunicatorIndex;
 
     ReentrantReadWriteLock dagChangedLock = new ReentrantReadWriteLock();
@@ -64,6 +68,11 @@ public class TaskCommunicatorContextImpl implements TaskCommunicatorContext, Ver
   }
 
   @Override
+  public Configuration getInitialConfiguration() {
+    return conf;
+  }
+
+  @Override
   public ApplicationAttemptId getApplicationAttemptId() {
     return context.getApplicationAttemptId();
   }

http://git-wip-us.apache.org/repos/asf/tez/blob/fdef9377/tez-dag/src/main/java/org/apache/tez/dag/app/TezTaskCommunicatorImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/TezTaskCommunicatorImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/TezTaskCommunicatorImpl.java
index 0374022..93b5b43 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/TezTaskCommunicatorImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/TezTaskCommunicatorImpl.java
@@ -67,7 +67,6 @@ public class TezTaskCommunicatorImpl extends TaskCommunicator {
   private static final ContainerTask TASK_FOR_INVALID_JVM = new ContainerTask(
       null, true, null, null, false);
 
-  private final TaskCommunicatorContext taskCommunicatorContext;
   private final TezTaskUmbilicalProtocol taskUmbilical;
 
   protected final ConcurrentMap<ContainerId, ContainerInfo> registeredContainers =
@@ -116,25 +115,24 @@ public class TezTaskCommunicatorImpl extends TaskCommunicator {
    * Construct the service.
    */
   public TezTaskCommunicatorImpl(TaskCommunicatorContext taskCommunicatorContext) {
-    super(TezTaskCommunicatorImpl.class.getName());
-    this.taskCommunicatorContext = taskCommunicatorContext;
+    super(taskCommunicatorContext);
     this.taskUmbilical = new TezTaskUmbilicalProtocolImpl();
-    this.tokenIdentifier = this.taskCommunicatorContext.getApplicationAttemptId().getApplicationId().toString();
+    this.tokenIdentifier = taskCommunicatorContext.getApplicationAttemptId().getApplicationId().toString();
     this.sessionToken = TokenCache.getSessionToken(taskCommunicatorContext.getCredentials());
   }
 
   @Override
-  public void serviceStart() {
+  public void start() {
     startRpcServer();
   }
 
   @Override
-  public void serviceStop() {
+  public void shutdown() {
     stopRpcServer();
   }
 
   protected void startRpcServer() {
-    Configuration conf = getConfig();
+    Configuration conf = getContext().getInitialConfiguration();
     try {
       JobTokenSecretManager jobTokenSecretManager =
           new JobTokenSecretManager();
@@ -281,10 +279,6 @@ public class TezTaskCommunicatorImpl extends TaskCommunicator {
     return sessionToken;
   }
 
-  protected TaskCommunicatorContext getTaskCommunicatorContext() {
-    return taskCommunicatorContext;
-  }
-
   public TezTaskUmbilicalProtocol getUmbilical() {
     return this.taskUmbilical;
   }
@@ -305,7 +299,7 @@ public class TezTaskCommunicatorImpl extends TaskCommunicator {
         }
         task = getContainerTask(containerId);
         if (task != null && !task.shouldDie()) {
-          taskCommunicatorContext
+          getContext()
               .taskStartedRemotely(task.getTaskSpec().getTaskAttemptID(), containerId);
         }
       }
@@ -317,7 +311,7 @@ public class TezTaskCommunicatorImpl extends TaskCommunicator {
 
     @Override
     public boolean canCommit(TezTaskAttemptID taskAttemptId) throws IOException {
-      return taskCommunicatorContext.canCommit(taskAttemptId);
+      return getContext().canCommit(taskAttemptId);
     }
 
     @Override
@@ -370,7 +364,7 @@ public class TezTaskCommunicatorImpl extends TaskCommunicator {
         TaskHeartbeatRequest tRequest = new TaskHeartbeatRequest(request.getContainerIdentifier(),
             request.getCurrentTaskAttemptID(), request.getEvents(), request.getStartIndex(),
             request.getPreRoutedStartIndex(), request.getMaxEvents());
-        tResponse = taskCommunicatorContext.heartbeat(tRequest);
+        tResponse = getContext().heartbeat(tRequest);
       }
       TezHeartbeatResponse response = new TezHeartbeatResponse();
       response.setLastRequestId(requestId);
@@ -402,7 +396,7 @@ public class TezTaskCommunicatorImpl extends TaskCommunicator {
     ContainerInfo containerInfo = registeredContainers.get(containerId);
     ContainerTask task = null;
     if (containerInfo == null) {
-      if (taskCommunicatorContext.isKnownContainer(containerId)) {
+      if (getContext().isKnownContainer(containerId)) {
         LOG.info("Container with id: " + containerId
             + " is valid, but no longer registered, and will be killed");
       } else {

http://git-wip-us.apache.org/repos/asf/tez/blob/fdef9377/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherImpl.java
index fe0178c..34c7bc0 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherImpl.java
@@ -223,7 +223,7 @@ public class ContainerLauncherImpl extends ContainerLauncher {
   }
 
   public ContainerLauncherImpl(ContainerLauncherContext containerLauncherContext) {
-    super(ContainerLauncherImpl.class.getName(), containerLauncherContext);
+    super(containerLauncherContext);
     this.conf = new Configuration(containerLauncherContext.getInitialConfiguration());
     conf.setInt(
         CommonConfigurationKeysPublic.IPC_CLIENT_CONNECTION_MAXIDLETIME_KEY,
@@ -235,7 +235,7 @@ public class ContainerLauncherImpl extends ContainerLauncher {
   }
 
   @Override
-  public void serviceStart() {
+  public void start() {
     // pass a copy of config to ContainerManagementProtocolProxy until YARN-3497 is fixed
     cmProxy =
         new ContainerManagementProtocolProxy(conf);
@@ -307,7 +307,7 @@ public class ContainerLauncherImpl extends ContainerLauncher {
   }
 
   @Override
-  public void serviceStop() {
+  public void shutdown() {
     if(!serviceStopped.compareAndSet(false, true)) {
       LOG.info("Ignoring multiple stops");
       return;

http://git-wip-us.apache.org/repos/asf/tez/blob/fdef9377/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherRouter.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherRouter.java b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherRouter.java
index 9f741cf..7c6a6a4 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherRouter.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherRouter.java
@@ -23,6 +23,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.service.AbstractService;
 import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.tez.common.ReflectionUtils;
+import org.apache.tez.dag.app.ServicePluginLifecycleAbstractService;
 import org.apache.tez.serviceplugins.api.ContainerLaunchRequest;
 import org.apache.tez.serviceplugins.api.ContainerLauncher;
 import org.apache.tez.serviceplugins.api.ContainerLauncherContext;
@@ -45,6 +46,7 @@ public class ContainerLauncherRouter extends AbstractService
 
   private final ContainerLauncher containerLaunchers[];
   private final ContainerLauncherContext containerLauncherContexts[];
+  protected final ServicePluginLifecycleAbstractService[] containerLauncherServiceWrappers;
   private final AppContext appContext;
 
   @VisibleForTesting
@@ -53,6 +55,8 @@ public class ContainerLauncherRouter extends AbstractService
     this.appContext = context;
     containerLaunchers = new ContainerLauncher[] {containerLauncher};
     containerLauncherContexts = new ContainerLauncherContext[] {containerLauncher.getContext()};
+    containerLauncherServiceWrappers = new ServicePluginLifecycleAbstractService[]{
+        new ServicePluginLifecycleAbstractService(containerLauncher)};
   }
 
   // Accepting conf to setup final parameters, if required.
@@ -75,6 +79,7 @@ public class ContainerLauncherRouter extends AbstractService
     }
     containerLauncherContexts = new ContainerLauncherContext[containerLauncherClassIdentifiers.length];
     containerLaunchers = new ContainerLauncher[containerLauncherClassIdentifiers.length];
+    containerLauncherServiceWrappers = new ServicePluginLifecycleAbstractService[containerLauncherClassIdentifiers.length];
 
 
     for (int i = 0; i < containerLauncherClassIdentifiers.length; i++) {
@@ -82,6 +87,7 @@ public class ContainerLauncherRouter extends AbstractService
       containerLauncherContexts[i] = containerLauncherContext;
       containerLaunchers[i] = createContainerLauncher(containerLauncherClassIdentifiers[i], context,
           containerLauncherContext, taskAttemptListener, workingDirectory, isPureLocalMode, conf);
+      containerLauncherServiceWrappers[i] = new ServicePluginLifecycleAbstractService(containerLaunchers[i]);
     }
   }
 
@@ -130,21 +136,21 @@ public class ContainerLauncherRouter extends AbstractService
   @Override
   public void serviceInit(Configuration conf) {
     for (int i = 0 ; i < containerLaunchers.length ; i++) {
-      ((AbstractService) containerLaunchers[i]).init(conf);
+      containerLauncherServiceWrappers[i].init(conf);
     }
   }
 
   @Override
   public void serviceStart() {
     for (int i = 0 ; i < containerLaunchers.length ; i++) {
-      ((AbstractService) containerLaunchers[i]).start();
+      containerLauncherServiceWrappers[i].start();
     }
   }
 
   @Override
   public void serviceStop() {
     for (int i = 0 ; i < containerLaunchers.length ; i++) {
-      ((AbstractService) containerLaunchers[i]).stop();
+      containerLauncherServiceWrappers[i].stop();
     }
   }
 

http://git-wip-us.apache.org/repos/asf/tez/blob/fdef9377/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/LocalContainerLauncher.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/LocalContainerLauncher.java b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/LocalContainerLauncher.java
index a1b8e29..3975111 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/LocalContainerLauncher.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/LocalContainerLauncher.java
@@ -111,7 +111,7 @@ public class LocalContainerLauncher extends ContainerLauncher {
     // starts up. It's not possible to set these up via a static payload.
     // Will need some kind of mechanism to dynamically crate payloads / bind to parameters
     // after the AM starts up.
-    super(LocalContainerLauncher.class.getName(), containerLauncherContext);
+    super(containerLauncherContext);
     this.context = context;
     this.tal = taskAttemptListener;
     this.workingDirectory = workingDirectory;
@@ -139,14 +139,14 @@ public class LocalContainerLauncher extends ContainerLauncher {
   }
 
   @Override
-  public void serviceStart() throws Exception {
+  public void start() throws Exception {
     eventHandlingThread =
         new Thread(new TezSubTaskRunner(), "LocalContainerLauncher-SubTaskRunner");
     eventHandlingThread.start();
   }
 
   @Override
-  public void serviceStop() throws Exception {
+  public void shutdown() throws Exception {
     if (!serviceStopped.compareAndSet(false, true)) {
       LOG.info("Service Already stopped. Ignoring additional stop");
       return;

http://git-wip-us.apache.org/repos/asf/tez/blob/fdef9377/tez-dag/src/test/java/org/apache/tez/dag/app/MockDAGAppMaster.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/MockDAGAppMaster.java b/tez-dag/src/test/java/org/apache/tez/dag/app/MockDAGAppMaster.java
index 3c3c6a7..21ae5f7 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/MockDAGAppMaster.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/MockDAGAppMaster.java
@@ -139,7 +139,7 @@ public class MockDAGAppMaster extends DAGAppMaster {
 
     public MockContainerLauncher(AtomicBoolean goFlag,
                                  ContainerLauncherContext containerLauncherContext) {
-      super("MockContainerLauncher", containerLauncherContext);
+      super(containerLauncherContext);
       this.goFlag = goFlag;
     }
 
@@ -182,7 +182,7 @@ public class MockDAGAppMaster extends DAGAppMaster {
     }
     
     @Override
-    public void serviceStart() throws Exception {
+    public void start() throws Exception {
       taListener = (TaskAttemptListenerImpTezDag) getTaskAttemptListener();
       taskCommunicator = (TezTaskCommunicatorImpl) taListener.getTaskCommunicator(0);
       eventHandlingThread = new Thread(this);
@@ -199,7 +199,7 @@ public class MockDAGAppMaster extends DAGAppMaster {
     }
 
     @Override
-    public void serviceStop() throws Exception {
+    public void shutdown() throws Exception {
       if (eventHandlingThread != null) {
         eventHandlingThread.interrupt();
         eventHandlingThread.join(2000l);

http://git-wip-us.apache.org/repos/asf/tez/blob/fdef9377/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskAttemptListenerImplTezDag.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskAttemptListenerImplTezDag.java b/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskAttemptListenerImplTezDag.java
index df643e4..41a7373 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskAttemptListenerImplTezDag.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskAttemptListenerImplTezDag.java
@@ -299,7 +299,7 @@ public class TestTaskAttemptListenerImplTezDag {
     sessionToken.setService(identifier.getJobId());
     TokenCache.setSessionToken(sessionToken, credentials);
     taskAttemptListener = new TaskAttemptListenerImpTezDag(appContext,
-        mock(TaskHeartbeatHandler.class), mock(ContainerHeartbeatHandler.class), null, null, false);
+        mock(TaskHeartbeatHandler.class), mock(ContainerHeartbeatHandler.class), null, conf, false);
     // no exception happen, should started properly
     taskAttemptListener.init(conf);
     taskAttemptListener.start();
@@ -319,7 +319,7 @@ public class TestTaskAttemptListenerImplTezDag {
 
       conf.set(TezConfiguration.TEZ_AM_TASK_AM_PORT_RANGE, port + "-" + port);
       taskAttemptListener = new TaskAttemptListenerImpTezDag(appContext,
-          mock(TaskHeartbeatHandler.class), mock(ContainerHeartbeatHandler.class), null, null, false);
+          mock(TaskHeartbeatHandler.class), mock(ContainerHeartbeatHandler.class), null, conf, false);
       taskAttemptListener.init(conf);
       taskAttemptListener.start();
       int resultedPort = taskAttemptListener.getTaskCommunicator(0).getAddress().getPort();
@@ -375,10 +375,10 @@ public class TestTaskAttemptListenerImplTezDag {
     public TaskAttemptListenerImplForTest(AppContext context,
                                           TaskHeartbeatHandler thh,
                                           ContainerHeartbeatHandler chh,
-                                          JobTokenSecretManager jobTokenSecretManager,
                                           String[] taskCommunicatorClassIdentifiers,
+                                          Configuration conf,
                                           boolean isPureLocalMode) {
-      super(context, thh, chh, jobTokenSecretManager, taskCommunicatorClassIdentifiers,
+      super(context, thh, chh, taskCommunicatorClassIdentifiers, conf,
           isPureLocalMode);
     }
 

http://git-wip-us.apache.org/repos/asf/tez/blob/fdef9377/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/launcher/TezTestServiceContainerLauncher.java
----------------------------------------------------------------------
diff --git a/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/launcher/TezTestServiceContainerLauncher.java b/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/launcher/TezTestServiceContainerLauncher.java
index dbf5054..85f9415 100644
--- a/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/launcher/TezTestServiceContainerLauncher.java
+++ b/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/launcher/TezTestServiceContainerLauncher.java
@@ -54,7 +54,7 @@ public class TezTestServiceContainerLauncher extends ContainerLauncher {
 
   // Configuration passed in here to set up final parameters
   public TezTestServiceContainerLauncher(ContainerLauncherContext containerLauncherContext) {
-    super(TezTestServiceContainerLauncher.class.getName(), containerLauncherContext);
+    super(containerLauncherContext);
     int numThreads = getContext().getInitialConfiguration().getInt(
         TezTestServiceConfConstants.TEZ_TEST_SERVICE_AM_COMMUNICATOR_NUM_THREADS,
         TezTestServiceConfConstants.TEZ_TEST_SERVICE_AM_COMMUNICATOR_NUM_THREADS_DEFAULT);
@@ -69,13 +69,13 @@ public class TezTestServiceContainerLauncher extends ContainerLauncher {
   }
 
   @Override
-  public void serviceStart() {
+  public void start() {
     communicator.init(getContext().getInitialConfiguration());
     communicator.start();
   }
 
   @Override
-  public void serviceStop() {
+  public void shutdown() {
     communicator.stop();
   }
 

http://git-wip-us.apache.org/repos/asf/tez/blob/fdef9377/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/launcher/TezTestServiceNoOpContainerLauncher.java
----------------------------------------------------------------------
diff --git a/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/launcher/TezTestServiceNoOpContainerLauncher.java b/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/launcher/TezTestServiceNoOpContainerLauncher.java
index d3743e1..7b42296 100644
--- a/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/launcher/TezTestServiceNoOpContainerLauncher.java
+++ b/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/launcher/TezTestServiceNoOpContainerLauncher.java
@@ -27,7 +27,7 @@ public class TezTestServiceNoOpContainerLauncher extends ContainerLauncher {
 
 
   public TezTestServiceNoOpContainerLauncher(ContainerLauncherContext containerLauncherContext) {
-    super(TezTestServiceNoOpContainerLauncher.class.getName(), containerLauncherContext);
+    super(containerLauncherContext);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/tez/blob/fdef9377/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/taskcomm/TezTestServiceTaskCommunicatorImpl.java
----------------------------------------------------------------------
diff --git a/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/taskcomm/TezTestServiceTaskCommunicatorImpl.java b/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/taskcomm/TezTestServiceTaskCommunicatorImpl.java
index 444498e..078ea79 100644
--- a/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/taskcomm/TezTestServiceTaskCommunicatorImpl.java
+++ b/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/taskcomm/TezTestServiceTaskCommunicatorImpl.java
@@ -23,7 +23,6 @@ import java.util.concurrent.RejectedExecutionException;
 
 import com.google.protobuf.ByteString;
 import com.google.protobuf.ServiceException;
-import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.DataOutputBuffer;
 import org.apache.hadoop.ipc.RemoteException;
 import org.apache.hadoop.security.Credentials;
@@ -75,20 +74,20 @@ public class TezTestServiceTaskCommunicatorImpl extends TezTaskCommunicatorImpl
   }
 
   @Override
-  public void serviceInit(Configuration conf) throws Exception {
-    super.serviceInit(conf);
-    this.communicator.init(conf);
+  public void initialize() throws Exception {
+    super.initialize();
+    this.communicator.init(getContext().getInitialConfiguration());
   }
 
   @Override
-  public void serviceStart() {
-    super.serviceStart();
+  public void start() {
+    super.start();
     this.communicator.start();
   }
 
   @Override
-  public void serviceStop() {
-    super.serviceStop();
+  public void shutdown() {
+    super.shutdown();
     this.communicator.stop();
   }
 
@@ -132,7 +131,7 @@ public class TezTestServiceTaskCommunicatorImpl extends TezTaskCommunicatorImpl
     }
     // Have to register this up front right now. Otherwise, it's possible for the task to start
     // sending out status/DONE/KILLED/FAILED messages before TAImpl knows how to handle them.
-    getTaskCommunicatorContext()
+    getContext()
         .taskStartedRemotely(taskSpec.getTaskAttemptID(), containerId);
     communicator.submitWork(requestProto, host, port,
         new TezTestServiceCommunicator.ExecuteRequestCallback<SubmitWorkResponseProto>() {
@@ -154,19 +153,19 @@ public class TezTestServiceTaskCommunicatorImpl extends TezTaskCommunicatorImpl
               RemoteException re = (RemoteException) t;
               String message = re.toString();
               if (message.contains(RejectedExecutionException.class.getName())) {
-                getTaskCommunicatorContext().taskKilled(taskSpec.getTaskAttemptID(),
+                getContext().taskKilled(taskSpec.getTaskAttemptID(),
                     TaskAttemptEndReason.SERVICE_BUSY, "Service Busy");
               } else {
-                getTaskCommunicatorContext()
+                getContext()
                     .taskFailed(taskSpec.getTaskAttemptID(), TaskAttemptEndReason.OTHER,
                         t.toString());
               }
             } else {
               if (t instanceof IOException) {
-                getTaskCommunicatorContext().taskKilled(taskSpec.getTaskAttemptID(),
+                getContext().taskKilled(taskSpec.getTaskAttemptID(),
                     TaskAttemptEndReason.COMMUNICATION_ERROR, "Communication Error");
               } else {
-                getTaskCommunicatorContext()
+                getContext()
                     .taskFailed(taskSpec.getTaskAttemptID(), TaskAttemptEndReason.OTHER,
                         t.getMessage());
               }
@@ -191,11 +190,11 @@ public class TezTestServiceTaskCommunicatorImpl extends TezTaskCommunicatorImpl
     builder.setAmPort(getAddress().getPort());
     Credentials taskCredentials = new Credentials();
     // Credentials can change across DAGs. Ideally construct only once per DAG.
-    taskCredentials.addAll(getTaskCommunicatorContext().getCredentials());
+    taskCredentials.addAll(getContext().getCredentials());
 
     ByteBuffer credentialsBinary = credentialMap.get(taskSpec.getDAGName());
     if (credentialsBinary == null) {
-      credentialsBinary = serializeCredentials(getTaskCommunicatorContext().getCredentials());
+      credentialsBinary = serializeCredentials(getContext().getCredentials());
       credentialMap.putIfAbsent(taskSpec.getDAGName(), credentialsBinary.duplicate());
     } else {
       credentialsBinary = credentialsBinary.duplicate();