You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by ss...@apache.org on 2015/08/14 22:58:50 UTC

[32/50] [abbrv] tez git commit: TEZ-2005. Define basic interface for pluggable TaskScheduler. (sseth)

TEZ-2005. Define basic interface for pluggable TaskScheduler. (sseth)


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/82c24ac0
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/82c24ac0
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/82c24ac0

Branch: refs/heads/TEZ-2003
Commit: 82c24ac0bcfc383fce9e069272fda5aa84e180c2
Parents: af1cc72
Author: Siddharth Seth <ss...@apache.org>
Authored: Wed Jul 22 22:25:01 2015 -0700
Committer: Siddharth Seth <ss...@apache.org>
Committed: Fri Aug 14 13:46:45 2015 -0700

----------------------------------------------------------------------
 TEZ-2003-CHANGES.txt                            |   1 +
 .../tez/common/ContainerSignatureMatcher.java   |  64 ++++
 .../tez/common/ServicePluginLifecycle.java      |  39 ++
 .../tez/serviceplugins/api/TaskScheduler.java   |  85 +++++
 .../api/TaskSchedulerContext.java               | 114 ++++++
 .../org/apache/tez/common/TezUtilsInternal.java |   1 +
 .../tez/dag/api/TaskCommunicatorInterface.java  |  18 -
 .../org/apache/tez/dag/app/DAGAppMaster.java    |   3 +-
 .../ServicePluginLifecycleAbstractService.java  |  52 +++
 .../dag/app/rm/LocalTaskSchedulerService.java   |  77 ++--
 .../app/rm/TaskSchedulerAppCallbackImpl.java    |  89 -----
 .../app/rm/TaskSchedulerAppCallbackWrapper.java | 307 ----------------
 .../dag/app/rm/TaskSchedulerContextImpl.java    | 174 +++++++++
 .../app/rm/TaskSchedulerContextImplWrapper.java | 368 +++++++++++++++++++
 .../dag/app/rm/TaskSchedulerEventHandler.java   |  81 ++--
 .../tez/dag/app/rm/TaskSchedulerService.java    | 111 ------
 .../dag/app/rm/YarnTaskSchedulerService.java    | 121 +++---
 .../dag/app/rm/container/AMContainerImpl.java   |   1 +
 .../dag/app/rm/container/AMContainerMap.java    |   1 +
 .../rm/container/ContainerContextMatcher.java   |   1 +
 .../rm/container/ContainerSignatureMatcher.java |  60 ---
 .../tez/dag/app/rm/TestContainerReuse.java      | 148 ++------
 .../tez/dag/app/rm/TestLocalTaskScheduler.java  |  29 +-
 .../app/rm/TestLocalTaskSchedulerService.java   |  52 ++-
 .../tez/dag/app/rm/TestTaskScheduler.java       | 201 +++++-----
 .../app/rm/TestTaskSchedulerEventHandler.java   |   9 +-
 .../dag/app/rm/TestTaskSchedulerHelpers.java    | 186 +++++++---
 .../rm/TezTestServiceTaskSchedulerService.java  |  66 +---
 28 files changed, 1357 insertions(+), 1102 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/82c24ac0/TEZ-2003-CHANGES.txt
----------------------------------------------------------------------
diff --git a/TEZ-2003-CHANGES.txt b/TEZ-2003-CHANGES.txt
index 88dd0c7..a51669d 100644
--- a/TEZ-2003-CHANGES.txt
+++ b/TEZ-2003-CHANGES.txt
@@ -35,5 +35,6 @@ ALL CHANGES:
   TEZ-2621. rebase 07/14
   TEZ-2124. Change Node tracking to work per external container source.
   TEZ-2004. Define basic interface for pluggable ContainerLaunchers.
+  TEZ-2005. Define basic interface for pluggable TaskScheduler.
 
 INCOMPATIBLE CHANGES:

http://git-wip-us.apache.org/repos/asf/tez/blob/82c24ac0/tez-api/src/main/java/org/apache/tez/common/ContainerSignatureMatcher.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/common/ContainerSignatureMatcher.java b/tez-api/src/main/java/org/apache/tez/common/ContainerSignatureMatcher.java
new file mode 100644
index 0000000..c0a1245
--- /dev/null
+++ b/tez-api/src/main/java/org/apache/tez/common/ContainerSignatureMatcher.java
@@ -0,0 +1,64 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.common;
+
+import java.util.Map;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.yarn.api.records.LocalResource;
+
+@InterfaceAudience.Public
+@InterfaceStability.Unstable
+public interface ContainerSignatureMatcher {
+  /**
+   * Checks the compatibility between the specified container signatures.
+   *
+   * @return true if the first signature is a super set of the second
+   *         signature.
+   */
+  public boolean isSuperSet(Object cs1, Object cs2);
+  
+  /**
+   * Checks if the container signatures match exactly
+   * @return true if exact match
+   */
+  public boolean isExactMatch(Object cs1, Object cs2);
+  
+  /**
+   * Gets additional resources specified in lr2, which are not present for lr1
+   * 
+   * @param lr1
+   * @param lr2
+   * @return additional resources specified in lr2, which are not present for lr1
+   */
+  public Map<String, LocalResource> getAdditionalResources(Map<String, LocalResource> lr1,
+      Map<String, LocalResource> lr2);
+
+
+  /**
+   * Do a union of 2 signatures
+   * Pre-condition. This function should only be invoked iff cs1 is compatible with cs2.
+   * i.e. isSuperSet should not return false.
+   * @param cs1 Signature 1 Original signature
+   * @param cs2 Signature 2 New signature
+   * @return Union of 2 signatures
+   */
+  public Object union(Object cs1, Object cs2);
+
+}

http://git-wip-us.apache.org/repos/asf/tez/blob/82c24ac0/tez-api/src/main/java/org/apache/tez/common/ServicePluginLifecycle.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/common/ServicePluginLifecycle.java b/tez-api/src/main/java/org/apache/tez/common/ServicePluginLifecycle.java
new file mode 100644
index 0000000..2eaa7be
--- /dev/null
+++ b/tez-api/src/main/java/org/apache/tez/common/ServicePluginLifecycle.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ *  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.common;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public interface ServicePluginLifecycle {
+
+  /**
+   * Perform any additional initialization which may be required beyond the constructor.
+   */
+  void initialize() throws Exception;
+
+  /**
+   * Start the service. This will be invoked after initialization.
+   */
+  void start() throws Exception;
+
+  /**
+   * Shutdown the service. This will be invoked when the service is shutting down.
+   */
+  void shutdown() throws Exception;
+
+}

http://git-wip-us.apache.org/repos/asf/tez/blob/82c24ac0/tez-api/src/main/java/org/apache/tez/serviceplugins/api/TaskScheduler.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/serviceplugins/api/TaskScheduler.java b/tez-api/src/main/java/org/apache/tez/serviceplugins/api/TaskScheduler.java
new file mode 100644
index 0000000..a5b054f
--- /dev/null
+++ b/tez-api/src/main/java/org/apache/tez/serviceplugins/api/TaskScheduler.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ *  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.serviceplugins.api;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.service.AbstractService;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.tez.common.ServicePluginLifecycle;
+
+@InterfaceAudience.Public
+@InterfaceStability.Unstable
+public abstract class TaskScheduler implements ServicePluginLifecycle {
+
+  private final TaskSchedulerContext taskSchedulerContext;
+
+  public TaskScheduler(TaskSchedulerContext taskSchedulerContext) {
+    this.taskSchedulerContext = taskSchedulerContext;
+  }
+
+  @Override
+  public void initialize() throws Exception {
+  }
+
+  @Override
+  public void start() throws Exception {
+  }
+
+  @Override
+  public void shutdown() throws Exception {
+  }
+
+
+  public abstract Resource getAvailableResources();
+
+  public abstract int getClusterNodeCount();
+
+  public abstract void dagComplete();
+
+  public abstract Resource getTotalResources();
+
+  public abstract void blacklistNode(NodeId nodeId);
+
+  public abstract void unblacklistNode(NodeId nodeId);
+
+  public abstract void allocateTask(Object task, Resource capability,
+                                    String[] hosts, String[] racks, Priority priority,
+                                    Object containerSignature, Object clientCookie);
+
+  /**
+   * Allocate affinitized to a specific container
+   */
+  public abstract void allocateTask(Object task, Resource capability,
+                                    ContainerId containerId, Priority priority, Object containerSignature,
+                                    Object clientCookie);
+
+  /** Plugin writers must ensure to de-allocate a container once it's done, so that it can be collected. */
+  public abstract boolean deallocateTask(Object task, boolean taskSucceeded, TaskAttemptEndReason endReason);
+
+  public abstract Object deallocateContainer(ContainerId containerId);
+
+  public abstract void setShouldUnregister();
+
+  public abstract boolean hasUnregistered();
+
+
+  public final TaskSchedulerContext getContext() {
+    return taskSchedulerContext;
+  }
+}

http://git-wip-us.apache.org/repos/asf/tez/blob/82c24ac0/tez-api/src/main/java/org/apache/tez/serviceplugins/api/TaskSchedulerContext.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/serviceplugins/api/TaskSchedulerContext.java b/tez-api/src/main/java/org/apache/tez/serviceplugins/api/TaskSchedulerContext.java
new file mode 100644
index 0000000..b2c8799
--- /dev/null
+++ b/tez-api/src/main/java/org/apache/tez/serviceplugins/api/TaskSchedulerContext.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ *  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.serviceplugins.api;
+
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
+import org.apache.hadoop.yarn.api.records.NodeReport;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.tez.common.ContainerSignatureMatcher;
+
+@InterfaceAudience.Public
+@InterfaceStability.Unstable
+
+public interface TaskSchedulerContext {
+
+  public class AppFinalStatus {
+    public final FinalApplicationStatus exitStatus;
+    public final String exitMessage;
+    public final String postCompletionTrackingUrl;
+    public AppFinalStatus(FinalApplicationStatus exitStatus,
+                          String exitMessage,
+                          String posCompletionTrackingUrl) {
+      this.exitStatus = exitStatus;
+      this.exitMessage = exitMessage;
+      this.postCompletionTrackingUrl = posCompletionTrackingUrl;
+    }
+  }
+
+  enum AMState {
+    IDLE, RUNNING_APP, COMPLETED
+  }
+
+  // TODO Post TEZ-2003. Remove references to YARN constructs like Container, ContainerStatus, NodeReport
+  // upcall to app must be outside locks
+  public void taskAllocated(Object task,
+                            Object appCookie,
+                            Container container);
+  // this may end up being called for a task+container pair that the app
+  // has not heard about. this can happen because of a race between
+  // taskAllocated() upcall and deallocateTask() downcall
+  public void containerCompleted(Object taskLastAllocated,
+                                 ContainerStatus containerStatus);
+  public void containerBeingReleased(ContainerId containerId);
+  public void nodesUpdated(List<NodeReport> updatedNodes);
+  public void appShutdownRequested();
+
+  // TODO Post TEZ-2003, this method specifically needs some cleaning up.
+  // ClientAMSecretKey is only relevant when running under YARN. As are ApplicationACLs.
+  public void setApplicationRegistrationData(
+      Resource maxContainerCapability,
+      Map<ApplicationAccessType, String> appAcls,
+      ByteBuffer clientAMSecretKey
+  );
+  public void onError(Throwable t);
+  public float getProgress();
+  public void preemptContainer(ContainerId containerId);
+
+  // TODO Post TEZ-2003. Another method which is primarily relevant to YARN clusters for unregistration.
+  public AppFinalStatus getFinalAppStatus();
+
+
+  // Getters
+
+  // TODO TEZ-2003. To be replaced by getInitialPayload
+  public Configuration getInitialConfiguration();
+
+  public String getAppTrackingUrl();
+
+  /**
+   * A custom cluster identifier allocated to schedulers to generate an AppId, if not making
+   * use of YARN
+   * @return
+   */
+  public long getCustomClusterIdentifier();
+
+  public ContainerSignatureMatcher getContainerSignatureMatcher();
+
+  /**
+   * Get the application attempt id for the running application. Relevant when running under YARN
+   * @return
+   */
+  public ApplicationAttemptId getApplicationAttemptId();
+
+  public String getAppHostName();
+
+  public int getAppClientPort();
+
+  public boolean isSession();
+
+  public AMState getAMState();
+}

http://git-wip-us.apache.org/repos/asf/tez/blob/82c24ac0/tez-common/src/main/java/org/apache/tez/common/TezUtilsInternal.java
----------------------------------------------------------------------
diff --git a/tez-common/src/main/java/org/apache/tez/common/TezUtilsInternal.java b/tez-common/src/main/java/org/apache/tez/common/TezUtilsInternal.java
index 4c8c227..532e83c 100644
--- a/tez-common/src/main/java/org/apache/tez/common/TezUtilsInternal.java
+++ b/tez-common/src/main/java/org/apache/tez/common/TezUtilsInternal.java
@@ -45,6 +45,7 @@ import org.apache.tez.dag.api.TezConstants;
 import org.apache.tez.dag.api.records.DAGProtos;
 import org.apache.tez.dag.api.records.DAGProtos.ConfigurationProto;
 import org.apache.tez.dag.api.records.DAGProtos.PlanKeyValuePair;
+import org.apache.tez.serviceplugins.api.TaskSchedulerContext;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 

http://git-wip-us.apache.org/repos/asf/tez/blob/82c24ac0/tez-dag/src/main/java/org/apache/tez/dag/api/TaskCommunicatorInterface.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/api/TaskCommunicatorInterface.java b/tez-dag/src/main/java/org/apache/tez/dag/api/TaskCommunicatorInterface.java
deleted file mode 100644
index 022cd7b..0000000
--- a/tez-dag/src/main/java/org/apache/tez/dag/api/TaskCommunicatorInterface.java
+++ /dev/null
@@ -1,18 +0,0 @@
-/*
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- *  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tez.dag.api;
-
-public interface TaskCommunicatorInterface {
-}

http://git-wip-us.apache.org/repos/asf/tez/blob/82c24ac0/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
index d56fb95..ef27ddf 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
@@ -98,7 +98,6 @@ import org.apache.hadoop.yarn.util.SystemClock;
 import org.apache.tez.common.AsyncDispatcher;
 import org.apache.tez.common.AsyncDispatcherConcurrent;
 import org.apache.tez.common.GcTimeUpdater;
-import org.apache.tez.common.ReflectionUtils;
 import org.apache.tez.common.TezCommonUtils;
 import org.apache.tez.common.TezConverterUtils;
 import org.apache.tez.common.TezUtilsInternal;
@@ -149,7 +148,7 @@ import org.apache.tez.dag.app.rm.TaskSchedulerEventHandler;
 import org.apache.tez.dag.app.rm.container.AMContainerEventType;
 import org.apache.tez.dag.app.rm.container.AMContainerMap;
 import org.apache.tez.dag.app.rm.container.ContainerContextMatcher;
-import org.apache.tez.dag.app.rm.container.ContainerSignatureMatcher;
+import org.apache.tez.common.ContainerSignatureMatcher;
 import org.apache.tez.dag.app.rm.node.AMNodeEventType;
 import org.apache.tez.dag.app.rm.node.AMNodeTracker;
 import org.apache.tez.dag.app.web.WebUIService;

http://git-wip-us.apache.org/repos/asf/tez/blob/82c24ac0/tez-dag/src/main/java/org/apache/tez/dag/app/ServicePluginLifecycleAbstractService.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/ServicePluginLifecycleAbstractService.java b/tez-dag/src/main/java/org/apache/tez/dag/app/ServicePluginLifecycleAbstractService.java
new file mode 100644
index 0000000..dac1b82
--- /dev/null
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/ServicePluginLifecycleAbstractService.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ *  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.dag.app;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.service.AbstractService;
+import org.apache.tez.common.ServicePluginLifecycle;
+
+/**
+ * Provides service lifecycle management over ServicePlugins using {@link AbstractService}
+ * @param <T>
+ */
+public class ServicePluginLifecycleAbstractService<T extends ServicePluginLifecycle> extends AbstractService {
+
+  private final T service;
+
+  public ServicePluginLifecycleAbstractService(T service) {
+    super(service.getClass().getName());
+    this.service = service;
+  }
+
+  @Override
+  public void serviceInit(Configuration unused) throws Exception {
+    service.initialize();
+  }
+
+  @Override
+  public void serviceStart() throws Exception {
+    service.start();
+  }
+
+  @Override
+  public void serviceStop() throws Exception {
+    service.shutdown();
+  }
+
+  public T getService() {
+    return service;
+  }
+}

http://git-wip-us.apache.org/repos/asf/tez/blob/82c24ac0/tez-dag/src/main/java/org/apache/tez/dag/app/rm/LocalTaskSchedulerService.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/LocalTaskSchedulerService.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/LocalTaskSchedulerService.java
index ef789c5..476d00c 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/LocalTaskSchedulerService.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/LocalTaskSchedulerService.java
@@ -20,16 +20,15 @@ package org.apache.tez.dag.app.rm;
 
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
 import java.util.concurrent.PriorityBlockingQueue;
-import java.util.concurrent.TimeUnit;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.LinkedHashMap;
 
 import com.google.common.primitives.Ints;
 
+import org.apache.tez.serviceplugins.api.TaskScheduler;
+import org.apache.tez.serviceplugins.api.TaskSchedulerContext;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -43,56 +42,30 @@ import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.tez.serviceplugins.api.TaskAttemptEndReason;
 import org.apache.tez.dag.api.TezConfiguration;
 import org.apache.tez.dag.api.TezUncheckedException;
-import org.apache.tez.dag.app.AppContext;
-import org.apache.tez.dag.app.rm.container.ContainerSignatureMatcher;
+import org.apache.tez.common.ContainerSignatureMatcher;
 
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
-
-public class LocalTaskSchedulerService extends TaskSchedulerService {
+public class LocalTaskSchedulerService extends TaskScheduler {
 
   private static final Logger LOG = LoggerFactory.getLogger(LocalTaskSchedulerService.class);
 
-  final TaskSchedulerAppCallback realAppClient;
-  final TaskSchedulerAppCallback appClientDelegate;
   final ContainerSignatureMatcher containerSignatureMatcher;
   final PriorityBlockingQueue<TaskRequest> taskRequestQueue;
+  final Configuration conf;
   AsyncDelegateRequestHandler taskRequestHandler;
   Thread asyncDelegateRequestThread;
-  final ExecutorService appCallbackExecutor;
 
   final HashMap<Object, Container> taskAllocations;
-  final String appHostName;
-  final int appHostPort;
   final String appTrackingUrl;
-  final AppContext appContext;
   final long customContainerAppId;
 
-  public LocalTaskSchedulerService(TaskSchedulerAppCallback appClient,
-      ContainerSignatureMatcher containerSignatureMatcher, String appHostName,
-      int appHostPort, String appTrackingUrl, long customContainerAppId, AppContext appContext) {
-    super(LocalTaskSchedulerService.class.getName());
-    this.realAppClient = appClient;
-    this.appCallbackExecutor = createAppCallbackExecutorService();
-    this.containerSignatureMatcher = containerSignatureMatcher;
-    this.appClientDelegate = createAppCallbackDelegate(appClient);
-    this.appHostName = appHostName;
-    this.appHostPort = appHostPort;
-    this.appTrackingUrl = appTrackingUrl;
-    this.appContext = appContext;
+  public LocalTaskSchedulerService(TaskSchedulerContext taskSchedulerContext) {
+    super(taskSchedulerContext);
     taskRequestQueue = new PriorityBlockingQueue<TaskRequest>();
     taskAllocations = new LinkedHashMap<Object, Container>();
-    this.customContainerAppId = customContainerAppId;
-  }
-
-  private ExecutorService createAppCallbackExecutorService() {
-    return Executors.newSingleThreadExecutor(new ThreadFactoryBuilder()
-        .setNameFormat("TaskSchedulerAppCaller #%d").setDaemon(true).build());
-  }
-
-  private TaskSchedulerAppCallback createAppCallbackDelegate(
-      TaskSchedulerAppCallback realAppClient) {
-    return new TaskSchedulerAppCallbackWrapper(realAppClient,
-        appCallbackExecutor);
+    this.appTrackingUrl = taskSchedulerContext.getAppTrackingUrl();
+    this.containerSignatureMatcher = taskSchedulerContext.getContainerSignatureMatcher();
+    this.customContainerAppId = taskSchedulerContext.getCustomClusterIdentifier();
+    this.conf = taskSchedulerContext.getInitialConfiguration();
   }
 
   @Override
@@ -160,7 +133,7 @@ public class LocalTaskSchedulerService extends TaskSchedulerService {
   }
 
   @Override
-  public void serviceInit(Configuration conf) {
+  public void initialize() {
     taskRequestHandler = createRequestHandler(conf);
     asyncDelegateRequestThread = new Thread(taskRequestHandler);
     asyncDelegateRequestThread.setDaemon(true);
@@ -168,24 +141,22 @@ public class LocalTaskSchedulerService extends TaskSchedulerService {
 
   protected AsyncDelegateRequestHandler createRequestHandler(Configuration conf) {
     return new AsyncDelegateRequestHandler(taskRequestQueue,
-        new LocalContainerFactory(appContext, customContainerAppId),
+        new LocalContainerFactory(getContext().getApplicationAttemptId(), customContainerAppId),
         taskAllocations,
-        appClientDelegate,
+        getContext(),
         conf);
   }
 
   @Override
-  public void serviceStart() {
+  public void start() {
     asyncDelegateRequestThread.start();
   }
 
   @Override
-  public void serviceStop() throws InterruptedException {
+  public void shutdown() throws InterruptedException {
     if (asyncDelegateRequestThread != null) {
       asyncDelegateRequestThread.interrupt();
     }
-    appCallbackExecutor.shutdownNow();
-    appCallbackExecutor.awaitTermination(1000l, TimeUnit.MILLISECONDS);
   }
 
   @Override
@@ -202,12 +173,12 @@ public class LocalTaskSchedulerService extends TaskSchedulerService {
     AtomicInteger nextId;
     final ApplicationAttemptId customAppAttemptId;
 
-    public LocalContainerFactory(AppContext appContext, long appIdLong) {
+    public LocalContainerFactory(ApplicationAttemptId appAttemptId, long customAppId) {
       this.nextId = new AtomicInteger(1);
       ApplicationId appId = ApplicationId
-          .newInstance(appIdLong, appContext.getApplicationAttemptId().getApplicationId().getId());
+          .newInstance(customAppId, appAttemptId.getApplicationId().getId());
       this.customAppAttemptId = ApplicationAttemptId
-          .newInstance(appId, appContext.getApplicationAttemptId().getAttemptId());
+          .newInstance(appId, appAttemptId.getAttemptId());
     }
 
     public Container createContainer(Resource capability, Priority priority) {
@@ -330,18 +301,18 @@ public class LocalTaskSchedulerService extends TaskSchedulerService {
     final BlockingQueue<TaskRequest> taskRequestQueue;
     final LocalContainerFactory localContainerFactory;
     final HashMap<Object, Container> taskAllocations;
-    final TaskSchedulerAppCallback appClientDelegate;
+    final TaskSchedulerContext taskSchedulerContext;
     final int MAX_TASKS;
 
     AsyncDelegateRequestHandler(BlockingQueue<TaskRequest> taskRequestQueue,
         LocalContainerFactory localContainerFactory,
         HashMap<Object, Container> taskAllocations,
-        TaskSchedulerAppCallback appClientDelegate,
+        TaskSchedulerContext taskSchedulerContext,
         Configuration conf) {
       this.taskRequestQueue = taskRequestQueue;
       this.localContainerFactory = localContainerFactory;
       this.taskAllocations = taskAllocations;
-      this.appClientDelegate = appClientDelegate;
+      this.taskSchedulerContext = taskSchedulerContext;
       this.MAX_TASKS = conf.getInt(TezConfiguration.TEZ_AM_INLINE_TASK_EXECUTION_MAX_TASKS,
           TezConfiguration.TEZ_AM_INLINE_TASK_EXECUTION_MAX_TASKS_DEFAULT);
     }
@@ -407,13 +378,13 @@ public class LocalTaskSchedulerService extends TaskSchedulerService {
       Container container = localContainerFactory.createContainer(request.capability,
           request.priority);
       taskAllocations.put(request.task, container);
-      appClientDelegate.taskAllocated(request.task, request.clientCookie, container);
+      taskSchedulerContext.taskAllocated(request.task, request.clientCookie, container);
     }
 
     void deallocateTask(DeallocateTaskRequest request) {
       Container container = taskAllocations.remove(request.task);
       if (container != null) {
-        appClientDelegate.containerBeingReleased(container.getId());
+        taskSchedulerContext.containerBeingReleased(container.getId());
       }
       else {
         boolean deallocationBeforeAllocation = false;

http://git-wip-us.apache.org/repos/asf/tez/blob/82c24ac0/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerAppCallbackImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerAppCallbackImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerAppCallbackImpl.java
deleted file mode 100644
index ea37e94..0000000
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerAppCallbackImpl.java
+++ /dev/null
@@ -1,89 +0,0 @@
-/*
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- *  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tez.dag.app.rm;
-
-import java.nio.ByteBuffer;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
-import org.apache.hadoop.yarn.api.records.Container;
-import org.apache.hadoop.yarn.api.records.ContainerId;
-import org.apache.hadoop.yarn.api.records.ContainerStatus;
-import org.apache.hadoop.yarn.api.records.NodeReport;
-import org.apache.hadoop.yarn.api.records.Resource;
-
-public class TaskSchedulerAppCallbackImpl implements TaskSchedulerService.TaskSchedulerAppCallback{
-
-  private final TaskSchedulerEventHandler tseh;
-  private final int schedulerId;
-
-  public TaskSchedulerAppCallbackImpl(TaskSchedulerEventHandler tseh, int schedulerId) {
-    this.tseh = tseh;
-    this.schedulerId = schedulerId;
-  }
-
-  @Override
-  public void taskAllocated(Object task, Object appCookie, Container container) {
-    tseh.taskAllocated(schedulerId, task, appCookie, container);
-  }
-
-  @Override
-  public void containerCompleted(Object taskLastAllocated, ContainerStatus containerStatus) {
-    tseh.containerCompleted(schedulerId, taskLastAllocated, containerStatus);
-  }
-
-  @Override
-  public void containerBeingReleased(ContainerId containerId) {
-    tseh.containerBeingReleased(schedulerId, containerId);
-  }
-
-  @Override
-  public void nodesUpdated(List<NodeReport> updatedNodes) {
-    tseh.nodesUpdated(schedulerId, updatedNodes);
-  }
-
-  @Override
-  public void appShutdownRequested() {
-    tseh.appShutdownRequested(schedulerId);
-  }
-
-  @Override
-  public void setApplicationRegistrationData(Resource maxContainerCapability,
-                                             Map<ApplicationAccessType, String> appAcls,
-                                             ByteBuffer clientAMSecretKey) {
-    tseh.setApplicationRegistrationData(schedulerId, maxContainerCapability, appAcls, clientAMSecretKey);
-  }
-
-  @Override
-  public void onError(Throwable t) {
-    tseh.onError(schedulerId, t);
-  }
-
-  @Override
-  public float getProgress() {
-    return tseh.getProgress(schedulerId);
-  }
-
-  @Override
-  public void preemptContainer(ContainerId containerId) {
-    tseh.preemptContainer(schedulerId, containerId);
-  }
-
-  @Override
-  public AppFinalStatus getFinalAppStatus() {
-    return tseh.getFinalAppStatus();
-  }
-}

http://git-wip-us.apache.org/repos/asf/tez/blob/82c24ac0/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerAppCallbackWrapper.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerAppCallbackWrapper.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerAppCallbackWrapper.java
deleted file mode 100644
index 5de8032..0000000
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerAppCallbackWrapper.java
+++ /dev/null
@@ -1,307 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tez.dag.app.rm;
-
-import java.nio.ByteBuffer;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Future;
-
-import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
-import org.apache.hadoop.yarn.api.records.Container;
-import org.apache.hadoop.yarn.api.records.ContainerId;
-import org.apache.hadoop.yarn.api.records.ContainerStatus;
-import org.apache.hadoop.yarn.api.records.NodeReport;
-import org.apache.hadoop.yarn.api.records.Resource;
-import org.apache.tez.dag.api.TezUncheckedException;
-import org.apache.tez.dag.app.rm.TaskSchedulerService.TaskSchedulerAppCallback;
-
-/**
- * Makes use of an ExecutionService to invoke application callbacks. Methods
- * which return values wait for execution to complete - effectively waiting for
- * all previous events in the queue to complete.
- */
-class TaskSchedulerAppCallbackWrapper implements TaskSchedulerAppCallback {
-
-  private TaskSchedulerAppCallback real;
-
-  ExecutorService executorService;
-  
-  /**
-   * @param real the actual TaskSchedulerAppCallback
-   * @param executorService the ExecutorService to be used to send these events.
-   */
-  public TaskSchedulerAppCallbackWrapper(TaskSchedulerAppCallback real,
-      ExecutorService executorService) {
-    this.real = real;
-    this.executorService = executorService;
-  }
-
-  @Override
-  public void taskAllocated(Object task, Object appCookie, Container container) {
-    executorService.submit(new TaskAllocatedCallable(real, task, appCookie,
-        container));
-  }
-
-  @Override
-  public void containerCompleted(Object taskLastAllocated,
-      ContainerStatus containerStatus) {
-    executorService.submit(new ContainerCompletedCallable(real,
-        taskLastAllocated, containerStatus));
-  }
-
-  @Override
-  public void containerBeingReleased(ContainerId containerId) {
-    executorService
-        .submit(new ContainerBeingReleasedCallable(real, containerId));
-  }
-
-  @Override
-  public void nodesUpdated(List<NodeReport> updatedNodes) {
-    executorService.submit(new NodesUpdatedCallable(real, updatedNodes));
-  }
-
-  @Override
-  public void appShutdownRequested() {
-    executorService.submit(new AppShudownRequestedCallable(real));
-  }
-
-  @Override
-  public void setApplicationRegistrationData(Resource maxContainerCapability,
-      Map<ApplicationAccessType, String> appAcls, ByteBuffer key) {
-    executorService.submit(new SetApplicationRegistrationDataCallable(real,
-        maxContainerCapability, appAcls, key));
-  }
-
-  @Override
-  public void onError(Throwable t) {
-    executorService.submit(new OnErrorCallable(real, t));
-  }
-
-  @Override
-  public float getProgress() {
-    Future<Float> progressFuture = executorService
-        .submit(new GetProgressCallable(real));
-    try {
-      return progressFuture.get();
-    } catch (Exception e) {
-      throw new TezUncheckedException(e);
-    }
-  }
-  
-  @Override
-  public void preemptContainer(ContainerId containerId) {
-    executorService.submit(new PreemptContainerCallable(real, containerId));
-  }
-
-  @Override
-  public AppFinalStatus getFinalAppStatus() {
-    Future<AppFinalStatus> appFinalStatusFuture = executorService
-        .submit(new GetFinalAppStatusCallable(real));
-    try {
-      return appFinalStatusFuture.get();
-    } catch (Exception e) {
-      throw new TezUncheckedException(e);
-    }
-  }
-  
-  
-  static abstract class TaskSchedulerAppCallbackBase {
-
-    protected TaskSchedulerAppCallback app;
-
-    public TaskSchedulerAppCallbackBase(TaskSchedulerAppCallback app) {
-      this.app = app;
-    }
-  }
-
-  static class TaskAllocatedCallable extends TaskSchedulerAppCallbackBase
-      implements Callable<Void> {
-    private final Object task;
-    private final Object appCookie;
-    private final Container container;
-
-    public TaskAllocatedCallable(TaskSchedulerAppCallback app, Object task,
-        Object appCookie, Container container) {
-      super(app);
-      this.task = task;
-      this.appCookie = appCookie;
-      this.container = container;
-    }
-
-    @Override
-    public Void call() throws Exception {
-      app.taskAllocated(task, appCookie, container);
-      return null;
-    }
-  }
-
-  static class ContainerCompletedCallable extends TaskSchedulerAppCallbackBase
-      implements Callable<Void> {
-
-    private final Object taskLastAllocated;
-    private final ContainerStatus containerStatus;
-
-    public ContainerCompletedCallable(TaskSchedulerAppCallback app,
-        Object taskLastAllocated, ContainerStatus containerStatus) {
-      super(app);
-      this.taskLastAllocated = taskLastAllocated;
-      this.containerStatus = containerStatus;
-    }
-
-    @Override
-    public Void call() throws Exception {
-      app.containerCompleted(taskLastAllocated, containerStatus);
-      return null;
-    }
-  }
-
-  static class ContainerBeingReleasedCallable extends
-      TaskSchedulerAppCallbackBase implements Callable<Void> {
-    private final ContainerId containerId;
-
-    public ContainerBeingReleasedCallable(TaskSchedulerAppCallback app,
-        ContainerId containerId) {
-      super(app);
-      this.containerId = containerId;
-    }
-
-    @Override
-    public Void call() throws Exception {
-      app.containerBeingReleased(containerId);
-      return null;
-    }
-  }
-
-  static class NodesUpdatedCallable extends TaskSchedulerAppCallbackBase
-      implements Callable<Void> {
-    private final List<NodeReport> updatedNodes;
-
-    public NodesUpdatedCallable(TaskSchedulerAppCallback app,
-        List<NodeReport> updatedNodes) {
-      super(app);
-      this.updatedNodes = updatedNodes;
-    }
-
-    @Override
-    public Void call() throws Exception {
-      app.nodesUpdated(updatedNodes);
-      return null;
-    }
-  }
-
-  static class AppShudownRequestedCallable extends TaskSchedulerAppCallbackBase
-      implements Callable<Void> {
-
-    public AppShudownRequestedCallable(TaskSchedulerAppCallback app) {
-      super(app);
-    }
-
-    @Override
-    public Void call() throws Exception {
-      app.appShutdownRequested();
-      return null;
-    }
-  }
-
-  static class SetApplicationRegistrationDataCallable extends
-      TaskSchedulerAppCallbackBase implements Callable<Void> {
-
-    private final Resource maxContainerCapability;
-    private final Map<ApplicationAccessType, String> appAcls;
-    private final ByteBuffer key;
-
-    public SetApplicationRegistrationDataCallable(TaskSchedulerAppCallback app,
-        Resource maxContainerCapability,
-        Map<ApplicationAccessType, String> appAcls,
-        ByteBuffer key) {
-      super(app);
-      this.maxContainerCapability = maxContainerCapability;
-      this.appAcls = appAcls;
-      this.key = key;
-    }
-
-    @Override
-    public Void call() throws Exception {
-      app.setApplicationRegistrationData(maxContainerCapability, appAcls, key);
-      return null;
-    }
-  }
-
-  static class OnErrorCallable extends TaskSchedulerAppCallbackBase implements
-      Callable<Void> {
-
-    private final Throwable throwable;
-
-    public OnErrorCallable(TaskSchedulerAppCallback app, Throwable throwable) {
-      super(app);
-      this.throwable = throwable;
-    }
-
-    @Override
-    public Void call() throws Exception {
-      app.onError(throwable);
-      return null;
-    }
-  }
-
-  static class PreemptContainerCallable extends TaskSchedulerAppCallbackBase 
-      implements Callable<Void> {
-    private final ContainerId containerId;
-    
-    public PreemptContainerCallable(TaskSchedulerAppCallback app, ContainerId id) {
-      super(app);
-      this.containerId = id;
-    }
-    
-    @Override
-    public Void call() throws Exception {
-      app.preemptContainer(containerId);
-      return null;
-    }
-  }
-  
-  static class GetProgressCallable extends TaskSchedulerAppCallbackBase
-      implements Callable<Float> {
-
-    public GetProgressCallable(TaskSchedulerAppCallback app) {
-      super(app);
-    }
-
-    @Override
-    public Float call() throws Exception {
-      return app.getProgress();
-    }
-  }
-
-  static class GetFinalAppStatusCallable extends TaskSchedulerAppCallbackBase
-      implements Callable<AppFinalStatus> {
-
-    public GetFinalAppStatusCallable(TaskSchedulerAppCallback app) {
-      super(app);
-    }
-
-    @Override
-    public AppFinalStatus call() throws Exception {
-      return app.getFinalAppStatus();
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/tez/blob/82c24ac0/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerContextImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerContextImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerContextImpl.java
new file mode 100644
index 0000000..890870e
--- /dev/null
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerContextImpl.java
@@ -0,0 +1,174 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ *  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.dag.app.rm;
+
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.api.records.NodeReport;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.tez.common.ContainerSignatureMatcher;
+import org.apache.tez.dag.api.TezUncheckedException;
+import org.apache.tez.dag.app.AppContext;
+import org.apache.tez.serviceplugins.api.TaskSchedulerContext;
+
+public class TaskSchedulerContextImpl implements TaskSchedulerContext {
+
+  private final TaskSchedulerEventHandler tseh;
+  private final AppContext appContext;
+  private final int schedulerId;
+  private final String trackingUrl;
+  private final long customClusterIdentifier;
+  private final String appHostName;
+  private final int clientPort;
+  private final Configuration conf;
+
+  public TaskSchedulerContextImpl(TaskSchedulerEventHandler tseh, AppContext appContext,
+                                  int schedulerId, String trackingUrl, long customClusterIdentifier,
+                                  String appHostname, int clientPort,
+                                  Configuration conf) {
+    this.tseh = tseh;
+    this.appContext = appContext;
+    this.schedulerId = schedulerId;
+    this.trackingUrl = trackingUrl;
+    this.customClusterIdentifier = customClusterIdentifier;
+    this.appHostName = appHostname;
+    this.clientPort = clientPort;
+    this.conf = conf;
+
+  }
+
+  @Override
+  public void taskAllocated(Object task, Object appCookie, Container container) {
+    tseh.taskAllocated(schedulerId, task, appCookie, container);
+  }
+
+  @Override
+  public void containerCompleted(Object taskLastAllocated, ContainerStatus containerStatus) {
+    tseh.containerCompleted(schedulerId, taskLastAllocated, containerStatus);
+  }
+
+  @Override
+  public void containerBeingReleased(ContainerId containerId) {
+    tseh.containerBeingReleased(schedulerId, containerId);
+  }
+
+  @Override
+  public void nodesUpdated(List<NodeReport> updatedNodes) {
+    tseh.nodesUpdated(schedulerId, updatedNodes);
+  }
+
+  @Override
+  public void appShutdownRequested() {
+    tseh.appShutdownRequested(schedulerId);
+  }
+
+  @Override
+  public void setApplicationRegistrationData(Resource maxContainerCapability,
+                                             Map<ApplicationAccessType, String> appAcls,
+                                             ByteBuffer clientAMSecretKey) {
+    tseh.setApplicationRegistrationData(schedulerId, maxContainerCapability, appAcls, clientAMSecretKey);
+  }
+
+  @Override
+  public void onError(Throwable t) {
+    tseh.onError(schedulerId, t);
+  }
+
+  @Override
+  public float getProgress() {
+    return tseh.getProgress(schedulerId);
+  }
+
+  @Override
+  public void preemptContainer(ContainerId containerId) {
+    tseh.preemptContainer(schedulerId, containerId);
+  }
+
+  @Override
+  public AppFinalStatus getFinalAppStatus() {
+    return tseh.getFinalAppStatus();
+  }
+
+  @Override
+  public Configuration getInitialConfiguration() {
+    return conf;
+  }
+
+
+  @Override
+  public String getAppTrackingUrl() {
+    return trackingUrl;
+  }
+
+  @Override
+  public long getCustomClusterIdentifier() {
+    return customClusterIdentifier;
+  }
+
+  @Override
+  public ContainerSignatureMatcher getContainerSignatureMatcher() {
+    return tseh.getContainerSignatureMatcher();
+  }
+
+  @Override
+  public ApplicationAttemptId getApplicationAttemptId() {
+    return appContext.getApplicationAttemptId();
+  }
+
+  @Override
+  public String getAppHostName() {
+    return appHostName;
+  }
+
+  @Override
+  public int getAppClientPort() {
+    return clientPort;
+  }
+
+  @Override
+  public boolean isSession() {
+    return appContext.isSession();
+  }
+
+  @Override
+  public AMState getAMState() {
+    switch (appContext.getAMState()) {
+
+      case NEW:
+      case INITED:
+      case IDLE:
+        return AMState.IDLE;
+      case RECOVERING:
+        // TODO Is this correct for recovery ?
+      case RUNNING:
+        return AMState.RUNNING_APP;
+      case SUCCEEDED:
+      case FAILED:
+      case KILLED:
+      case ERROR:
+        return AMState.COMPLETED;
+      default:
+        throw new TezUncheckedException("Unexpected state " + appContext.getAMState());
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/tez/blob/82c24ac0/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerContextImplWrapper.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerContextImplWrapper.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerContextImplWrapper.java
new file mode 100644
index 0000000..e64ef43
--- /dev/null
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerContextImplWrapper.java
@@ -0,0 +1,368 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.dag.app.rm;
+
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.api.records.NodeReport;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.tez.common.ContainerSignatureMatcher;
+import org.apache.tez.dag.api.TezUncheckedException;
+import org.apache.tez.serviceplugins.api.TaskSchedulerContext;
+
+/**
+ * Makes use of an ExecutionService to invoke application callbacks. Methods
+ * which return values wait for execution to complete - effectively waiting for
+ * all previous events in the queue to complete.
+ */
+class TaskSchedulerContextImplWrapper implements TaskSchedulerContext {
+
+  private TaskSchedulerContext real;
+
+  private ExecutorService executorService;
+  
+  /**
+   * @param real the actual TaskSchedulerAppCallback
+   * @param executorService the ExecutorService to be used to send these events.
+   */
+  public TaskSchedulerContextImplWrapper(TaskSchedulerContext real,
+                                         ExecutorService executorService) {
+    this.real = real;
+    this.executorService = executorService;
+  }
+
+  @Override
+  public void taskAllocated(Object task, Object appCookie, Container container) {
+    executorService.submit(new TaskAllocatedCallable(real, task, appCookie,
+        container));
+  }
+
+  @Override
+  public void containerCompleted(Object taskLastAllocated,
+      ContainerStatus containerStatus) {
+    executorService.submit(new ContainerCompletedCallable(real,
+        taskLastAllocated, containerStatus));
+  }
+
+  @Override
+  public void containerBeingReleased(ContainerId containerId) {
+    executorService
+        .submit(new ContainerBeingReleasedCallable(real, containerId));
+  }
+
+  @Override
+  public void nodesUpdated(List<NodeReport> updatedNodes) {
+    executorService.submit(new NodesUpdatedCallable(real, updatedNodes));
+  }
+
+  @Override
+  public void appShutdownRequested() {
+    executorService.submit(new AppShudownRequestedCallable(real));
+  }
+
+  @Override
+  public void setApplicationRegistrationData(Resource maxContainerCapability,
+      Map<ApplicationAccessType, String> appAcls, ByteBuffer key) {
+    executorService.submit(new SetApplicationRegistrationDataCallable(real,
+        maxContainerCapability, appAcls, key));
+  }
+
+  @Override
+  public void onError(Throwable t) {
+    executorService.submit(new OnErrorCallable(real, t));
+  }
+
+  @Override
+  public float getProgress() {
+    Future<Float> progressFuture = executorService
+        .submit(new GetProgressCallable(real));
+    try {
+      return progressFuture.get();
+    } catch (Exception e) {
+      throw new TezUncheckedException(e);
+    }
+  }
+  
+  @Override
+  public void preemptContainer(ContainerId containerId) {
+    executorService.submit(new PreemptContainerCallable(real, containerId));
+  }
+
+  @Override
+  public AppFinalStatus getFinalAppStatus() {
+    Future<AppFinalStatus> appFinalStatusFuture = executorService
+        .submit(new GetFinalAppStatusCallable(real));
+    try {
+      return appFinalStatusFuture.get();
+    } catch (Exception e) {
+      throw new TezUncheckedException(e);
+    }
+  }
+
+  // Getters which do not need to go through a thread. Underlying implementation
+  // does not use locks.
+
+  @Override
+  public Configuration getInitialConfiguration() {
+    return real.getInitialConfiguration();
+  }
+
+  @Override
+  public String getAppTrackingUrl() {
+    return real.getAppTrackingUrl();
+  }
+
+  @Override
+  public long getCustomClusterIdentifier() {
+    return real.getCustomClusterIdentifier();
+  }
+
+  @Override
+  public ContainerSignatureMatcher getContainerSignatureMatcher() {
+    return real.getContainerSignatureMatcher();
+  }
+
+  @Override
+  public ApplicationAttemptId getApplicationAttemptId() {
+    return real.getApplicationAttemptId();
+  }
+
+  @Override
+  public String getAppHostName() {
+    return real.getAppHostName();
+  }
+
+  @Override
+  public int getAppClientPort() {
+    return real.getAppClientPort();
+  }
+
+  @Override
+  public boolean isSession() {
+    return real.isSession();
+  }
+
+  @Override
+  public AMState getAMState() {
+    return real.getAMState();
+  }
+  // End of getters which do not need to go through a thread. Underlying implementation
+  // does not use locks.
+
+
+  static abstract class TaskSchedulerContextCallbackBase {
+
+    protected TaskSchedulerContext app;
+
+    public TaskSchedulerContextCallbackBase(TaskSchedulerContext app) {
+      this.app = app;
+    }
+  }
+
+  static class TaskAllocatedCallable extends TaskSchedulerContextCallbackBase
+      implements Callable<Void> {
+    private final Object task;
+    private final Object appCookie;
+    private final Container container;
+
+    public TaskAllocatedCallable(TaskSchedulerContext app, Object task,
+        Object appCookie, Container container) {
+      super(app);
+      this.task = task;
+      this.appCookie = appCookie;
+      this.container = container;
+    }
+
+    @Override
+    public Void call() throws Exception {
+      app.taskAllocated(task, appCookie, container);
+      return null;
+    }
+  }
+
+  static class ContainerCompletedCallable extends TaskSchedulerContextCallbackBase
+      implements Callable<Void> {
+
+    private final Object taskLastAllocated;
+    private final ContainerStatus containerStatus;
+
+    public ContainerCompletedCallable(TaskSchedulerContext app,
+        Object taskLastAllocated, ContainerStatus containerStatus) {
+      super(app);
+      this.taskLastAllocated = taskLastAllocated;
+      this.containerStatus = containerStatus;
+    }
+
+    @Override
+    public Void call() throws Exception {
+      app.containerCompleted(taskLastAllocated, containerStatus);
+      return null;
+    }
+  }
+
+  static class ContainerBeingReleasedCallable extends
+      TaskSchedulerContextCallbackBase implements Callable<Void> {
+    private final ContainerId containerId;
+
+    public ContainerBeingReleasedCallable(TaskSchedulerContext app,
+        ContainerId containerId) {
+      super(app);
+      this.containerId = containerId;
+    }
+
+    @Override
+    public Void call() throws Exception {
+      app.containerBeingReleased(containerId);
+      return null;
+    }
+  }
+
+  static class NodesUpdatedCallable extends TaskSchedulerContextCallbackBase
+      implements Callable<Void> {
+    private final List<NodeReport> updatedNodes;
+
+    public NodesUpdatedCallable(TaskSchedulerContext app,
+        List<NodeReport> updatedNodes) {
+      super(app);
+      this.updatedNodes = updatedNodes;
+    }
+
+    @Override
+    public Void call() throws Exception {
+      app.nodesUpdated(updatedNodes);
+      return null;
+    }
+  }
+
+  static class AppShudownRequestedCallable extends TaskSchedulerContextCallbackBase
+      implements Callable<Void> {
+
+    public AppShudownRequestedCallable(TaskSchedulerContext app) {
+      super(app);
+    }
+
+    @Override
+    public Void call() throws Exception {
+      app.appShutdownRequested();
+      return null;
+    }
+  }
+
+  static class SetApplicationRegistrationDataCallable extends
+      TaskSchedulerContextCallbackBase implements Callable<Void> {
+
+    private final Resource maxContainerCapability;
+    private final Map<ApplicationAccessType, String> appAcls;
+    private final ByteBuffer key;
+
+    public SetApplicationRegistrationDataCallable(TaskSchedulerContext app,
+        Resource maxContainerCapability,
+        Map<ApplicationAccessType, String> appAcls,
+        ByteBuffer key) {
+      super(app);
+      this.maxContainerCapability = maxContainerCapability;
+      this.appAcls = appAcls;
+      this.key = key;
+    }
+
+    @Override
+    public Void call() throws Exception {
+      app.setApplicationRegistrationData(maxContainerCapability, appAcls, key);
+      return null;
+    }
+  }
+
+  static class OnErrorCallable extends TaskSchedulerContextCallbackBase implements
+      Callable<Void> {
+
+    private final Throwable throwable;
+
+    public OnErrorCallable(TaskSchedulerContext app, Throwable throwable) {
+      super(app);
+      this.throwable = throwable;
+    }
+
+    @Override
+    public Void call() throws Exception {
+      app.onError(throwable);
+      return null;
+    }
+  }
+
+  static class PreemptContainerCallable extends TaskSchedulerContextCallbackBase
+      implements Callable<Void> {
+    private final ContainerId containerId;
+    
+    public PreemptContainerCallable(TaskSchedulerContext app, ContainerId id) {
+      super(app);
+      this.containerId = id;
+    }
+    
+    @Override
+    public Void call() throws Exception {
+      app.preemptContainer(containerId);
+      return null;
+    }
+  }
+  
+  static class GetProgressCallable extends TaskSchedulerContextCallbackBase
+      implements Callable<Float> {
+
+    public GetProgressCallable(TaskSchedulerContext app) {
+      super(app);
+    }
+
+    @Override
+    public Float call() throws Exception {
+      return app.getProgress();
+    }
+  }
+
+  static class GetFinalAppStatusCallable extends TaskSchedulerContextCallbackBase
+      implements Callable<AppFinalStatus> {
+
+    public GetFinalAppStatusCallable(TaskSchedulerContext app) {
+      super(app);
+    }
+
+    @Override
+    public AppFinalStatus call() throws Exception {
+      return app.getFinalAppStatus();
+    }
+  }
+
+  @VisibleForTesting
+  @InterfaceAudience.Private
+  ExecutorService getExecutorService() {
+    return executorService;
+  }
+}

http://git-wip-us.apache.org/repos/asf/tez/blob/82c24ac0/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java
index 1ad0059..d8cf080 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java
@@ -25,11 +25,19 @@ import java.nio.ByteBuffer;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import com.google.common.annotations.VisibleForTesting;
-import org.apache.tez.dag.app.rm.TaskSchedulerService.TaskSchedulerAppCallback.AppFinalStatus;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.tez.dag.app.ServicePluginLifecycleAbstractService;
+import org.apache.tez.serviceplugins.api.TaskScheduler;
+import org.apache.tez.serviceplugins.api.TaskSchedulerContext;
+import org.apache.tez.serviceplugins.api.TaskSchedulerContext.AppFinalStatus;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -62,7 +70,6 @@ import org.apache.tez.dag.app.dag.event.DAGAppMasterEvent;
 import org.apache.tez.dag.app.dag.event.DAGAppMasterEventSchedulingServiceError;
 import org.apache.tez.dag.app.dag.event.DAGAppMasterEventType;
 import org.apache.tez.dag.app.dag.event.DAGEventSchedulerUpdateTAAssigned;
-import org.apache.tez.dag.app.rm.TaskSchedulerService.TaskSchedulerAppCallback;
 import org.apache.tez.dag.app.rm.container.AMContainer;
 import org.apache.tez.dag.app.rm.container.AMContainerEventAssignTA;
 import org.apache.tez.dag.app.rm.container.AMContainerEventCompleted;
@@ -70,7 +77,7 @@ import org.apache.tez.dag.app.rm.container.AMContainerEventLaunchRequest;
 import org.apache.tez.dag.app.rm.container.AMContainerEventStopRequest;
 import org.apache.tez.dag.app.rm.container.AMContainerEventTASucceeded;
 import org.apache.tez.dag.app.rm.container.AMContainerState;
-import org.apache.tez.dag.app.rm.container.ContainerSignatureMatcher;
+import org.apache.tez.common.ContainerSignatureMatcher;
 import org.apache.tez.dag.app.rm.node.AMNodeEventContainerAllocated;
 import org.apache.tez.dag.app.rm.node.AMNodeEventNodeCountUpdated;
 import org.apache.tez.dag.app.rm.node.AMNodeEventStateChanged;
@@ -106,7 +113,12 @@ public class TaskSchedulerEventHandler extends AbstractService implements
       new AtomicBoolean(false);
   private final WebUIService webUI;
   private final String[] taskSchedulerClasses;
-  protected final TaskSchedulerService []taskSchedulers;
+  protected final TaskScheduler[]taskSchedulers;
+  protected final ServicePluginLifecycleAbstractService []taskSchedulerServiceWrappers;
+
+  // Single executor service shared by all Schedulers for context callbacks
+  @VisibleForTesting
+  final ExecutorService appCallbackExecutor;
 
   private final boolean isPureLocalMode;
   // If running in non local-only mode, the YARN task scheduler will always run to take care of
@@ -147,6 +159,7 @@ public class TaskSchedulerEventHandler extends AbstractService implements
     this.webUI = webUI;
     this.historyUrl = getHistoryUrl();
     this.isPureLocalMode = isPureLocalMode;
+    this.appCallbackExecutor = createAppCallbackExecutorService();
     if (this.webUI != null) {
       this.webUI.setHistoryUrl(this.historyUrl);
     }
@@ -181,7 +194,8 @@ public class TaskSchedulerEventHandler extends AbstractService implements
         this.yarnTaskSchedulerIndex = foundYarnTaskSchedulerIndex;
       }
     }
-    taskSchedulers = new TaskSchedulerService[this.taskSchedulerClasses.length];
+    taskSchedulers = new TaskScheduler[this.taskSchedulerClasses.length];
+    taskSchedulerServiceWrappers = new ServicePluginLifecycleAbstractService[this.taskSchedulerClasses.length];
   }
 
   public Map<ApplicationAccessType, String> getApplicationAcls() {
@@ -205,6 +219,12 @@ public class TaskSchedulerEventHandler extends AbstractService implements
     return taskSchedulers[schedulerId].getTotalResources();
   }
 
+  private ExecutorService createAppCallbackExecutorService() {
+    return Executors.newSingleThreadExecutor(
+        new ThreadFactoryBuilder().setNameFormat("TaskSchedulerAppCallbackExecutor #%d").setDaemon(true)
+            .build());
+  }
+
   public synchronized void handleEvent(AMSchedulerEvent sEvent) {
     LOG.info("Processing the event " + sEvent.toString());
     switch (sEvent.getType()) {
@@ -315,7 +335,8 @@ public class TaskSchedulerEventHandler extends AbstractService implements
       // stopped.
       // AMNodeImpl blacklisting logic does not account for KILLED attempts.
       sendEvent(new AMNodeEventTaskAttemptEnded(appContext.getAllContainers().
-          get(attemptContainerId).getContainer().getNodeId(), event.getSchedulerId(), attemptContainerId,
+          get(attemptContainerId).getContainer().getNodeId(), event.getSchedulerId(),
+          attemptContainerId,
           attempt.getID(), event.getState() == TaskAttemptState.FAILED));
     }
   }
@@ -389,32 +410,30 @@ public class TaskSchedulerEventHandler extends AbstractService implements
         event);
   }
 
-  private TaskSchedulerService createTaskScheduler(String host, int port, String trackingUrl,
+  private TaskScheduler createTaskScheduler(String host, int port, String trackingUrl,
                                                    AppContext appContext,
                                                    String schedulerClassName,
                                                    long customAppIdIdentifier,
                                                    int schedulerId) {
-    TaskSchedulerAppCallback appCallback = new TaskSchedulerAppCallbackImpl(this, schedulerId);
+    TaskSchedulerContext rawContext =
+        new TaskSchedulerContextImpl(this, appContext, schedulerId, trackingUrl,
+            customAppIdIdentifier, host, port, getConfig());
+    TaskSchedulerContext wrappedContext = new TaskSchedulerContextImplWrapper(rawContext, appCallbackExecutor);
     if (schedulerClassName.equals(TezConstants.TEZ_AM_SERVICE_PLUGINS_NAME_DEFAULT)) {
       LOG.info("Creating TaskScheduler: YarnTaskSchedulerService");
-      return new YarnTaskSchedulerService(appCallback, this.containerSignatureMatcher,
-          host, port, trackingUrl, appContext);
+      return new YarnTaskSchedulerService(wrappedContext);
     } else if (schedulerClassName.equals(TezConstants.TEZ_AM_SERVICE_PLUGINS_LOCAL_MODE_NAME_DEFAULT)) {
       LOG.info("Creating TaskScheduler: Local TaskScheduler");
-      return new LocalTaskSchedulerService(appCallback, this.containerSignatureMatcher,
-          host, port, trackingUrl, customAppIdIdentifier, appContext);
+      return new LocalTaskSchedulerService(wrappedContext);
     } else {
       LOG.info("Creating custom TaskScheduler: " + schedulerClassName);
-      // TODO TEZ-2003 Temporary reflection with specific parameters. Remove once there is a clean interface.
-      Class<? extends TaskSchedulerService> taskSchedulerClazz =
-          (Class<? extends TaskSchedulerService>) ReflectionUtils.getClazz(schedulerClassName);
+      Class<? extends TaskScheduler> taskSchedulerClazz =
+          (Class<? extends TaskScheduler>) ReflectionUtils.getClazz(schedulerClassName);
       try {
-        Constructor<? extends TaskSchedulerService> ctor = taskSchedulerClazz
-            .getConstructor(TaskSchedulerAppCallback.class, AppContext.class, String.class,
-                int.class, String.class, long.class, Configuration.class);
+        Constructor<? extends TaskScheduler> ctor = taskSchedulerClazz
+            .getConstructor(TaskSchedulerContext.class);
         ctor.setAccessible(true);
-        return ctor.newInstance(appCallback, appContext, host, port, trackingUrl, customAppIdIdentifier,
-            getConfig());
+        return ctor.newInstance(wrappedContext);
       } catch (NoSuchMethodException e) {
         throw new TezUncheckedException(e);
       } catch (InvocationTargetException e) {
@@ -444,6 +463,7 @@ public class TaskSchedulerEventHandler extends AbstractService implements
           customAppIdIdentifier);
       taskSchedulers[i] = createTaskScheduler(host, port,
           trackingUrl, appContext, taskSchedulerClasses[i], customAppIdIdentifier, i);
+      taskSchedulerServiceWrappers[i] = new ServicePluginLifecycleAbstractService(taskSchedulers[i]);
     }
   }
 
@@ -460,8 +480,8 @@ public class TaskSchedulerEventHandler extends AbstractService implements
     instantiateScheduelrs(serviceAddr.getHostName(), serviceAddr.getPort(), trackingUrl, appContext);
 
     for (int i = 0 ; i < taskSchedulers.length ; i++) {
-      taskSchedulers[i].init(getConfig());
-      taskSchedulers[i].start();
+      taskSchedulerServiceWrappers[i].init(getConfig());
+      taskSchedulerServiceWrappers[i].start();
       if (shouldUnregisterFlag.get()) {
         // Flag may have been set earlier when task scheduler was not initialized
         // TODO TEZ-2003 Should setRegister / unregister be part of APIs when not YARN specific ?
@@ -510,7 +530,7 @@ public class TaskSchedulerEventHandler extends AbstractService implements
   }
   
   @Override
-  public void serviceStop() {
+  public void serviceStop() throws InterruptedException {
     synchronized(this) {
       this.stopEventHandling = true;
       if (eventHandlingThread != null)
@@ -518,9 +538,12 @@ public class TaskSchedulerEventHandler extends AbstractService implements
     }
     for (int i = 0 ; i < taskSchedulers.length ; i++) {
       if (taskSchedulers[i] != null) {
-        taskSchedulers[i].stop();
+        taskSchedulerServiceWrappers[i].stop();
       }
     }
+    LOG.info("Shutting down AppCallbackExecutor");
+    appCallbackExecutor.shutdownNow();
+    appCallbackExecutor.awaitTermination(1000l, TimeUnit.MILLISECONDS);
   }
 
   // TODO TEZ-2003 Consolidate TaskSchedulerAppCallback methods once these methods are moved into context
@@ -716,6 +739,10 @@ public class TaskSchedulerEventHandler extends AbstractService implements
     }
   }
 
+  public ContainerSignatureMatcher getContainerSignatureMatcher() {
+    return containerSignatureMatcher;
+  }
+
   public boolean hasUnregistered() {
     boolean result = true;
     for (int i = 0 ; i < taskSchedulers.length ; i++) {
@@ -757,4 +784,10 @@ public class TaskSchedulerEventHandler extends AbstractService implements
 
     return historyUrl;
   }
+
+  @VisibleForTesting
+  @InterfaceAudience.Private
+  ExecutorService getContextExecutorService() {
+    return appCallbackExecutor;
+  }
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/82c24ac0/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerService.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerService.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerService.java
deleted file mode 100644
index 25fd13e..0000000
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerService.java
+++ /dev/null
@@ -1,111 +0,0 @@
-/**
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements.  See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership.  The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License.  You may obtain a copy of the License at
-*
-*     http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-
-package org.apache.tez.dag.app.rm;
-
-import java.nio.ByteBuffer;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.hadoop.service.AbstractService;
-import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
-import org.apache.hadoop.yarn.api.records.Container;
-import org.apache.hadoop.yarn.api.records.ContainerId;
-import org.apache.hadoop.yarn.api.records.ContainerStatus;
-import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
-import org.apache.hadoop.yarn.api.records.NodeId;
-import org.apache.hadoop.yarn.api.records.NodeReport;
-import org.apache.hadoop.yarn.api.records.Priority;
-import org.apache.hadoop.yarn.api.records.Resource;
-import org.apache.tez.serviceplugins.api.TaskAttemptEndReason;
-
-public abstract class TaskSchedulerService extends AbstractService{
-
-  public TaskSchedulerService(String name) {
-    super(name);
-  }
-
-  public abstract Resource getAvailableResources();
-
-  public abstract int getClusterNodeCount();
-
-  public abstract void dagComplete();
-
-  public abstract Resource getTotalResources();
-
-  public abstract void blacklistNode(NodeId nodeId);
-
-  public abstract void unblacklistNode(NodeId nodeId);
-
-  public abstract void allocateTask(Object task, Resource capability,
-      String[] hosts, String[] racks, Priority priority,
-      Object containerSignature, Object clientCookie);
-  
-  /**
-   * Allocate affinitized to a specific container
-   */
-  public abstract void allocateTask(Object task, Resource capability,
-      ContainerId containerId, Priority priority, Object containerSignature,
-      Object clientCookie);
-
-  /** Plugin writers must ensure to de-allocate a container once it's done, so that it can be collected. */
-  public abstract boolean deallocateTask(Object task, boolean taskSucceeded, TaskAttemptEndReason endReason);
-
-  public abstract Object deallocateContainer(ContainerId containerId);
-
-  public abstract void setShouldUnregister();
-
-  public abstract boolean hasUnregistered();
-
-  public interface TaskSchedulerAppCallback {
-    public class AppFinalStatus {
-      public final FinalApplicationStatus exitStatus;
-      public final String exitMessage;
-      public final String postCompletionTrackingUrl;
-      public AppFinalStatus(FinalApplicationStatus exitStatus,
-                             String exitMessage,
-                             String posCompletionTrackingUrl) {
-        this.exitStatus = exitStatus;
-        this.exitMessage = exitMessage;
-        this.postCompletionTrackingUrl = posCompletionTrackingUrl;
-      }
-    }
-    // upcall to app must be outside locks
-    public void taskAllocated(Object task,
-                               Object appCookie,
-                               Container container);
-    // this may end up being called for a task+container pair that the app
-    // has not heard about. this can happen because of a race between
-    // taskAllocated() upcall and deallocateTask() downcall
-    public void containerCompleted(Object taskLastAllocated,
-                                    ContainerStatus containerStatus);
-    public void containerBeingReleased(ContainerId containerId);
-    public void nodesUpdated(List<NodeReport> updatedNodes);
-    public void appShutdownRequested();
-    public void setApplicationRegistrationData(
-                                Resource maxContainerCapability,
-                                Map<ApplicationAccessType, String> appAcls,
-                                ByteBuffer clientAMSecretKey
-                                );
-    public void onError(Throwable t);
-    public float getProgress();
-    public void preemptContainer(ContainerId containerId);
-    public AppFinalStatus getFinalAppStatus();
-
-  }
-}