You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by sr...@apache.org on 2016/01/20 18:05:41 UTC

[15/50] [abbrv] tez git commit: TEZ-3024. Move TaskCommunicator to correct package. (sseth)

TEZ-3024. Move TaskCommunicator to correct package. (sseth)


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/fad16425
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/fad16425
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/fad16425

Branch: refs/heads/TEZ-2980
Commit: fad16425b7979970f29884343316e1a10f4d0872
Parents: 1d76543
Author: Siddharth Seth <ss...@apache.org>
Authored: Tue Jan 12 17:32:28 2016 -0800
Committer: Siddharth Seth <ss...@apache.org>
Committed: Tue Jan 12 17:32:28 2016 -0800

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../apache/tez/dag/api/TaskCommunicator.java    | 221 -----------------
 .../tez/dag/api/TaskCommunicatorContext.java    | 225 -----------------
 .../tez/dag/api/TaskHeartbeatRequest.java       |  68 ------
 .../tez/dag/api/TaskHeartbeatResponse.java      |  51 ----
 .../dag/app/TaskCommunicatorContextImpl.java    |   7 +-
 .../tez/dag/app/TaskCommunicatorManager.java    |   8 +-
 .../tez/dag/app/TaskCommunicatorWrapper.java    |   2 +-
 .../dag/app/TezLocalTaskCommunicatorImpl.java   |   2 +-
 .../tez/dag/app/TezTaskCommunicatorImpl.java    |   8 +-
 .../serviceplugins/api/TaskCommunicator.java    | 232 ++++++++++++++++++
 .../api/TaskCommunicatorContext.java            | 240 +++++++++++++++++++
 .../api/TaskHeartbeatRequest.java               |  82 +++++++
 .../api/TaskHeartbeatResponse.java              |  65 +++++
 .../app/TestTaskCommunicatorContextImpl.java    |   2 +-
 .../dag/app/TestTaskCommunicatorManager.java    |   4 +-
 .../dag/app/TestTaskCommunicatorManager1.java   |   8 +-
 .../dag/app/TestTaskCommunicatorWrapper.java    |   2 +-
 .../tez/dag/app/dag/impl/TestTaskAttempt.java   |   2 +-
 .../dag/app/rm/TestTaskSchedulerManager.java    |   4 -
 .../dag/app/rm/container/TestAMContainer.java   |   2 +-
 .../app/rm/container/TestAMContainerMap.java    |   2 +-
 .../TezTestServiceTaskCommunicatorImpl.java     |   2 +-
 ...ezTestServiceTaskCommunicatorWithErrors.java |   4 +-
 24 files changed, 647 insertions(+), 597 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/fad16425/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index d39cbb8..0645591 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -4,6 +4,7 @@ Apache Tez Change Log
 Release 0.8.2: Unreleased
 
 INCOMPATIBLE CHANGES
+  TEZ-3024. Move TaskCommunicator to correct package.
   TEZ-2679. Admin forms of launch env settings
   TEZ-2948. Stop using dagName in the dagComplete notification to TaskCommunicators.
   TEZ-2949. Allow duplicate dag names within session for Tez.

http://git-wip-us.apache.org/repos/asf/tez/blob/fad16425/tez-dag/src/main/java/org/apache/tez/dag/api/TaskCommunicator.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/api/TaskCommunicator.java b/tez-dag/src/main/java/org/apache/tez/dag/api/TaskCommunicator.java
deleted file mode 100644
index 1b6ad07..0000000
--- a/tez-dag/src/main/java/org/apache/tez/dag/api/TaskCommunicator.java
+++ /dev/null
@@ -1,221 +0,0 @@
-/*
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tez.dag.api;
-
-import javax.annotation.Nullable;
-import java.net.InetSocketAddress;
-import java.util.Map;
-
-import org.apache.hadoop.security.Credentials;
-import org.apache.hadoop.yarn.api.records.ContainerId;
-import org.apache.hadoop.yarn.api.records.LocalResource;
-import org.apache.tez.common.ServicePluginLifecycle;
-import org.apache.tez.dag.api.event.VertexStateUpdate;
-import org.apache.tez.serviceplugins.api.ContainerEndReason;
-import org.apache.tez.serviceplugins.api.ServicePluginException;
-import org.apache.tez.serviceplugins.api.TaskAttemptEndReason;
-import org.apache.tez.dag.records.TezTaskAttemptID;
-import org.apache.tez.runtime.api.impl.TaskSpec;
-
-// TODO TEZ-2003 (post) TEZ-2665. Move to the tez-api module
-// TODO TEZ-2003 (post) TEZ-2664. Ideally, don't expose YARN containerId; instead expose a Tez specific construct.
-
-/**
- * This class represents the API for a custom TaskCommunicator which can be run within the Tez AM.
- * This is used to communicate with running services, potentially launching tasks, and getting
- * updates from running tasks.
- * <p/>
- * The plugin is initialized with an instance of {@link TaskCommunicatorContext} - which provides
- * a mechanism to notify the system about allocation decisions and resources to the Tez framework.
- *
- * If setting up a heartbeat between the task and the AM, the framework is responsible for error checking
- * of this heartbeat mechanism, handling lost or duplicate responses.
- *
- */
-public abstract class TaskCommunicator implements ServicePluginLifecycle {
-
-  // TODO TEZ-2003 (post) TEZ-2666 Enhancements to interface
-  // - registerContainerEnd should provide the end reason / possible rename
-  // - get rid of getAddress
-  // - Add methods to support task preemption
-  // - Add a dagStarted notification, along with a payload
-  // - taskSpec breakup into a clean interface
-  // - Add methods to report task / container completion
-
-  private final TaskCommunicatorContext taskCommunicatorContext;
-
-  public TaskCommunicator(TaskCommunicatorContext taskCommunicatorContext) {
-    this.taskCommunicatorContext = taskCommunicatorContext;
-  }
-
-  /**
-   * Get the {@link TaskCommunicatorContext} associated with this instance of the scheduler, which
-   * is
-   * used to communicate with the rest of the system
-   *
-   * @return an instance of {@link TaskCommunicatorContext}
-   */
-  public TaskCommunicatorContext getContext() {
-    return taskCommunicatorContext;
-  }
-
-  /**
-   * An entry point for initialization.
-   * Order of service setup. Constructor, initialize(), start() - when starting a service.
-   *
-   * @throws Exception
-   */
-  @Override
-  public void initialize() throws Exception {
-  }
-
-  /**
-   * An entry point for starting the service.
-   * Order of service setup. Constructor, initialize(), start() - when starting a service.
-   *
-   * @throws Exception
-   */
-  @Override
-  public void start() throws Exception {
-  }
-
-  /**
-   * Stop the service. This could be invoked at any point, when the service is no longer required -
-   * including in case of errors.
-   *
-   * @throws Exception
-   */
-  @Override
-  public void shutdown() throws Exception {
-  }
-
-
-  /**
-   * Register a new container.
-   *
-   * @param containerId the associated containerId
-   * @param hostname    the hostname on which the container runs
-   * @param port        the port for the service which is running the container
-   * @throws ServicePluginException when the service runs into a fatal error which it cannot handle.
-   *                               This will cause the app to shutdown.
-   */
-  public abstract void registerRunningContainer(ContainerId containerId, String hostname,
-                                                int port) throws ServicePluginException;
-
-  /**
-   * Register the end of a container. This can be caused by preemption, the container completing
-   * successfully, etc.
-   *
-   * @param containerId the associated containerId
-   * @param endReason   the end reason for the container completing
-   * @param diagnostics diagnostics associated with the container end
-   * @throws ServicePluginException when the service runs into a fatal error which it cannot handle.
-   *                               This will cause the app to shutdown.
-   */
-  public abstract void registerContainerEnd(ContainerId containerId, ContainerEndReason endReason,
-                                            @Nullable String diagnostics) throws
-      ServicePluginException;
-
-  /**
-   * Register a task attempt to execute on a container
-   *
-   * @param containerId         the containerId on which this task needs to run
-   * @param taskSpec            the task specifications for the task to be executed
-   * @param additionalResources additional local resources which may be required to run this task
-   *                            on
-   *                            the container
-   * @param credentials         the credentials required to run this task
-   * @param credentialsChanged  whether the credentials are different from the original credentials
-   *                            associated with this container
-   * @param priority            the priority of the task being executed
-   * @throws ServicePluginException when the service runs into a fatal error which it cannot handle.
-   *                               This will cause the app to shutdown.
-   */
-  public abstract void registerRunningTaskAttempt(ContainerId containerId, TaskSpec taskSpec,
-                                                  Map<String, LocalResource> additionalResources,
-                                                  Credentials credentials,
-                                                  boolean credentialsChanged, int priority) throws
-      ServicePluginException;
-
-  /**
-   * Register the completion of a task. This may be a result of preemption, the container dying,
-   * the node dying, the task completing to success
-   *
-   * @param taskAttemptID the task attempt which has completed / needs to be completed
-   * @param endReason     the endReason for the task attempt.
-   * @param diagnostics   diagnostics associated with the task end
-   * @throws ServicePluginException when the service runs into a fatal error which it cannot handle.
-   *                               This will cause the app to shutdown.
-   */
-  public abstract void unregisterRunningTaskAttempt(TezTaskAttemptID taskAttemptID,
-                                                    TaskAttemptEndReason endReason,
-                                                    @Nullable String diagnostics) throws
-      ServicePluginException;
-
-  /**
-   * Return the address, if any, that the service listens on
-   *
-   * @return the address
-   * @throws ServicePluginException when the service runs into a fatal error which it cannot handle.
-   *                               This will cause the app to shutdown.
-   */
-  public abstract InetSocketAddress getAddress() throws ServicePluginException;
-
-  /**
-   * Receive notifications on vertex state changes.
-   * <p/>
-   * State changes will be received based on the registration via {@link
-   * org.apache.tez.runtime.api.InputInitializerContext#registerForVertexStateUpdates(String,
-   * java.util.Set)}. Notifications will be received for all registered state changes, and not just
-   * for the latest state update. They will be in order in which the state change occurred. </p>
-   * <p/>
-   * Extensive processing should not be performed via this method call. Instead this should just be
-   * used as a notification mechanism.
-   * <br>This method may be invoked concurrently with other invocations into the TaskCommunicator
-   * and
-   * multi-threading/concurrency implications must be considered.
-   *
-   * @param stateUpdate an event indicating the name of the vertex, and it's updated state.
-   *                    Additional information may be available for specific events, Look at the
-   *                    type hierarchy for {@link org.apache.tez.dag.api.event.VertexStateUpdate}
-   * @throws ServicePluginException when the service runs into a fatal error which it cannot handle.
-   *                               This will cause the app to shutdown.
-   */
-  public abstract void onVertexStateUpdated(VertexStateUpdate stateUpdate) throws ServicePluginException;
-
-  /**
-   * Indicates the current running dag is complete. The TaskCommunicatorContext can be used to
-   * query information about the current dag during the duration of the dagComplete invocation.
-   * <p/>
-   * After this, the contents returned from querying the context may change at any point - due to
-   * the next dag being submitted.
-   *
-   * @param dagIdentifier the unique numerical identifier for the DAG in the specified execution context.
-   * @throws ServicePluginException when the service runs into a fatal error which it cannot handle.
-   *                               This will cause the app to shutdown.
-   */
-  public abstract void dagComplete(int dagIdentifier) throws ServicePluginException;
-
-  /**
-   * Share meta-information such as host:port information where the Task Communicator may be
-   * listening.
-   * Primarily for use by compatible launchers to learn this information.
-   *
-   * @return meta info for the task communicator
-   * @throws ServicePluginException when the service runs into a fatal error which it cannot handle.
-   *                               This will cause the app to shutdown.
-   */
-  public abstract Object getMetaInfo() throws ServicePluginException;
-}

http://git-wip-us.apache.org/repos/asf/tez/blob/fad16425/tez-dag/src/main/java/org/apache/tez/dag/api/TaskCommunicatorContext.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/api/TaskCommunicatorContext.java b/tez-dag/src/main/java/org/apache/tez/dag/api/TaskCommunicatorContext.java
deleted file mode 100644
index 7c5a648..0000000
--- a/tez-dag/src/main/java/org/apache/tez/dag/api/TaskCommunicatorContext.java
+++ /dev/null
@@ -1,225 +0,0 @@
-/*
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tez.dag.api;
-
-import javax.annotation.Nullable;
-import java.io.IOException;
-import java.util.Set;
-
-import org.apache.hadoop.security.Credentials;
-import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
-import org.apache.hadoop.yarn.api.records.ContainerId;
-import org.apache.tez.dag.api.event.VertexState;
-import org.apache.tez.serviceplugins.api.TaskAttemptEndReason;
-import org.apache.tez.dag.records.TezTaskAttemptID;
-
-
-// Do not make calls into this from within a held lock.
-
-// TODO TEZ-2003 (post) TEZ-2665. Move to the tez-api module
-public interface TaskCommunicatorContext {
-
-  // TODO TEZ-2003 (post) TEZ-2666 Enhancements to API
-  // - Consolidate usage of IDs
-  // - Split the heartbeat API to a liveness check and a status update
-  // - Rename and consolidate TaskHeartbeatResponse and TaskHeartbeatRequest
-  // - Fix taskStarted needs to be invoked before launching the actual task.
-  // - Potentially add methods to report availability stats to the scheduler
-  // - Report taskSuccess via a method instead of the heartbeat
-  // - Add methods to signal container / task state changes
-  // - Maybe add book-keeping as a helper library, instead of each impl tracking container to task etc.
-  // - Handling of containres / tasks which no longer exist in the system (formalized interface instead of a shouldDie notification)
-
-  /**
-   * Get the UserPayload that was configured while setting up the task communicator
-   *
-   * @return the initially configured user payload
-   */
-  UserPayload getInitialUserPayload();
-
-  /**
-   * Get the application attempt id for the running application. Relevant when running under YARN
-   *
-   * @return the applicationAttemptId for the running app
-   */
-  ApplicationAttemptId getApplicationAttemptId();
-
-  /**
-   * Get credentials associated with the AppMaster
-   *
-   * @return credentials
-   */
-  Credentials getCredentials();
-
-  /**
-   * Check whether a running attempt can commit. This provides a leader election mechanism amongst
-   * multiple running attempts
-   *
-   * @param taskAttemptId the associated task attempt id
-   * @return whether the attempt can commit or not
-   * @throws IOException
-   */
-  boolean canCommit(TezTaskAttemptID taskAttemptId) throws IOException;
-
-  /**
-   * Mechanism for a {@link TaskCommunicator} to provide updates on a running task, as well as
-   * receive new information which may need to be propagated to the task. This includes events
-   * generated by the task and events which need to be sent to the task
-   * This method must be invoked periodically to receive updates for a running task
-   *
-   * @param request the update from the running task.
-   * @return the response that is requried by the task.
-   * @throws IOException
-   * @throws TezException
-   */
-  TaskHeartbeatResponse heartbeat(TaskHeartbeatRequest request) throws IOException, TezException;
-
-  /**
-   * Check whether the container is known by the framework. The state of this container is
-   * irrelevant
-   *
-   * @param containerId the relevant container id
-   * @return true if the container is known, false if it isn't
-   */
-  boolean isKnownContainer(ContainerId containerId);
-
-  /**
-   * Inform the framework that a task is alive. This needs to be invoked periodically to avoid the
-   * task attempt timing out.
-   * Invocations to heartbeat provides the same keep-alive functionality
-   *
-   * @param taskAttemptId the relevant task attempt
-   */
-  void taskAlive(TezTaskAttemptID taskAttemptId);
-
-  /**
-   * Inform the framework that a container is alive. This need to be invoked periodically to avoid
-   * the container attempt timing out.
-   * Invocations to heartbeat provides the same keep-alive functionality
-   *
-   * @param containerId the relevant container id
-   */
-  void containerAlive(ContainerId containerId);
-
-  /**
-   * Inform the framework that the task has started execution
-   *
-   * @param taskAttemptId the relevant task attempt id
-   * @param containerId   the containerId in which the task attempt is running
-   */
-  void taskStartedRemotely(TezTaskAttemptID taskAttemptId, ContainerId containerId);
-
-  /**
-   * Inform the framework that a task has been killed
-   *
-   * @param taskAttemptId        the relevant task attempt id
-   * @param taskAttemptEndReason the reason for the task attempt being killed
-   * @param diagnostics          any diagnostics messages which are relevant to the task attempt
-   *                             kill
-   */
-  void taskKilled(TezTaskAttemptID taskAttemptId, TaskAttemptEndReason taskAttemptEndReason,
-                  @Nullable String diagnostics);
-
-  /**
-   * Inform the framework that a task has failed
-   *
-   * @param taskAttemptId        the relevant task attempt id
-   * @param taskAttemptEndReason the reason for the task failure
-   * @param diagnostics          any diagnostics messages which are relevant to the task attempt
-   *                             failure
-   */
-  void taskFailed(TezTaskAttemptID taskAttemptId, TaskAttemptEndReason taskAttemptEndReason,
-                  @Nullable String diagnostics);
-
-  /**
-   * Register to get notifications on updates to the specified vertex. Notifications will be sent
-   * via {@link org.apache.tez.runtime.api.InputInitializer#onVertexStateUpdated(org.apache.tez.dag.api.event.VertexStateUpdate)}
-   * </p>
-   * <p/>
-   * This method can only be invoked once. Duplicate invocations will result in an error.
-   *
-   * @param vertexName the vertex name for which notifications are required.
-   * @param stateSet   the set of states for which notifications are required. null implies all
-   */
-  void registerForVertexStateUpdates(String vertexName, @Nullable Set<VertexState> stateSet);
-
-  /**
-   * Get the name of the currently executing dag
-   *
-   * @return the name of the currently executing dag
-   */
-  String getCurrentDagName();
-
-  /**
-   * Get an identifier for the executing context of the DAG.
-   * @return a String identifier for the exeucting context.
-   */
-  String getCurrentAppIdentifier();
-
-  /**
-   * Get the identifier for the currently executing dag.
-   * @return a numerical identifier for the currently running DAG. This is unique within the currently running application.
-   */
-  int getCurrentDagIdenitifer();
-
-  /**
-   * Get the name of the Input vertices for the specified vertex.
-   * Root Inputs are not returned.
-   *
-   * @param vertexName the vertex for which source vertex names will be returned
-   * @return an Iterable containing the list of input vertices for the specified vertex
-   */
-  Iterable<String> getInputVertexNames(String vertexName);
-
-  /**
-   * Get the total number of tasks in the given vertex
-   *
-   * @param vertexName the relevant vertex name
-   * @return total number of tasks in this vertex
-   */
-  int getVertexTotalTaskCount(String vertexName);
-
-  /**
-   * Get the number of completed tasks for a given vertex
-   *
-   * @param vertexName the vertex name
-   * @return the number of completed tasks for the vertex
-   */
-  int getVertexCompletedTaskCount(String vertexName);
-
-  /**
-   * Get the number of running tasks for a given vertex
-   *
-   * @param vertexName the vertex name
-   * @return the number of running tasks for the vertex
-   */
-  int getVertexRunningTaskCount(String vertexName);
-
-  /**
-   * Get the start time for the first attempt of the specified task
-   *
-   * @param vertexName the vertex to which the task belongs
-   * @param taskIndex  the index of the task
-   * @return the start time for the first attempt of the task
-   */
-  long getFirstAttemptStartTime(String vertexName, int taskIndex);
-
-  /**
-   * Get the start time for the currently executing DAG
-   *
-   * @return time when the current dag started executing
-   */
-  long getDagStartTime();
-}

http://git-wip-us.apache.org/repos/asf/tez/blob/fad16425/tez-dag/src/main/java/org/apache/tez/dag/api/TaskHeartbeatRequest.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/api/TaskHeartbeatRequest.java b/tez-dag/src/main/java/org/apache/tez/dag/api/TaskHeartbeatRequest.java
deleted file mode 100644
index d0c22d3..0000000
--- a/tez-dag/src/main/java/org/apache/tez/dag/api/TaskHeartbeatRequest.java
+++ /dev/null
@@ -1,68 +0,0 @@
-/*
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tez.dag.api;
-
-import java.util.List;
-
-import org.apache.tez.dag.records.TezTaskAttemptID;
-import org.apache.tez.runtime.api.impl.TezEvent;
-
-// TODO TEZ-2003 (post) TEZ-2665. Move to the tez-api module
-public class TaskHeartbeatRequest {
-
-  // TODO TEZ-2003 (post) TEZ-2666 Ideally containerIdentifier should not be part of the request.
-  private final String containerIdentifier;
-  private final TezTaskAttemptID taskAttemptId;
-  private final List<TezEvent> events;
-  private final int startIndex;
-  private final int preRoutedStartIndex;
-  private final int maxEvents;
-
-
-  public TaskHeartbeatRequest(String containerIdentifier, TezTaskAttemptID taskAttemptId, List<TezEvent> events, int startIndex,
-                              int preRoutedStartIndex,
-                              int maxEvents) {
-    this.containerIdentifier = containerIdentifier;
-    this.taskAttemptId = taskAttemptId;
-    this.events = events;
-    this.startIndex = startIndex;
-    this.preRoutedStartIndex = preRoutedStartIndex;
-    this.maxEvents = maxEvents;
-  }
-
-  public String getContainerIdentifier() {
-    return containerIdentifier;
-  }
-
-  public TezTaskAttemptID getTaskAttemptId() {
-    return taskAttemptId;
-  }
-
-  public List<TezEvent> getEvents() {
-    return events;
-  }
-
-  public int getStartIndex() {
-    return startIndex;
-  }
-
-  public int getPreRoutedStartIndex() {
-    return preRoutedStartIndex;
-  }
-
-  public int getMaxEvents() {
-    return maxEvents;
-  }
-}

http://git-wip-us.apache.org/repos/asf/tez/blob/fad16425/tez-dag/src/main/java/org/apache/tez/dag/api/TaskHeartbeatResponse.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/api/TaskHeartbeatResponse.java b/tez-dag/src/main/java/org/apache/tez/dag/api/TaskHeartbeatResponse.java
deleted file mode 100644
index dcf89ff..0000000
--- a/tez-dag/src/main/java/org/apache/tez/dag/api/TaskHeartbeatResponse.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tez.dag.api;
-
-import java.util.List;
-
-import org.apache.tez.runtime.api.impl.TezEvent;
-
-// TODO TEZ-2003 (post) TEZ-2665. Move to the tez-api module
-public class TaskHeartbeatResponse {
-
-  private final boolean shouldDie;
-  private final int nextFromEventId;
-  private final int nextPreRoutedEventId;
-  private final List<TezEvent> events;
-
-  public TaskHeartbeatResponse(boolean shouldDie, List<TezEvent> events, int nextFromEventId, int nextPreRoutedEventId) {
-    this.shouldDie = shouldDie;
-    this.events = events;
-    this.nextFromEventId = nextFromEventId;
-    this.nextPreRoutedEventId = nextPreRoutedEventId;
-  }
-
-  public boolean isShouldDie() {
-    return shouldDie;
-  }
-
-  public List<TezEvent> getEvents() {
-    return events;
-  }
-
-  public int getNextFromEventId() {
-    return nextFromEventId;
-  }
-
-  public int getNextPreRoutedEventId() {
-    return nextPreRoutedEventId;
-  }
-}

http://git-wip-us.apache.org/repos/asf/tez/blob/fad16425/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorContextImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorContextImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorContextImpl.java
index 2b7234c..7f88be2 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorContextImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorContextImpl.java
@@ -29,11 +29,10 @@ import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.tez.dag.api.UserPayload;
 import org.apache.tez.dag.app.rm.container.AMContainer;
 import org.apache.tez.serviceplugins.api.TaskAttemptEndReason;
-import org.apache.tez.dag.api.TaskCommunicatorContext;
-import org.apache.tez.dag.api.TaskHeartbeatRequest;
-import org.apache.tez.dag.api.TaskHeartbeatResponse;
+import org.apache.tez.serviceplugins.api.TaskCommunicatorContext;
+import org.apache.tez.serviceplugins.api.TaskHeartbeatRequest;
+import org.apache.tez.serviceplugins.api.TaskHeartbeatResponse;
 import org.apache.tez.dag.api.TezException;
-import org.apache.tez.dag.api.TezUncheckedException;
 import org.apache.tez.dag.api.event.VertexState;
 import org.apache.tez.dag.api.event.VertexStateUpdate;
 import org.apache.tez.dag.app.dag.DAG;

http://git-wip-us.apache.org/repos/asf/tez/blob/fad16425/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorManager.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorManager.java b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorManager.java
index 64a964b..a196114 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorManager.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorManager.java
@@ -33,7 +33,7 @@ import org.apache.commons.collections4.ListUtils;
 import org.apache.hadoop.yarn.event.Event;
 import org.apache.tez.Utils;
 import org.apache.tez.dag.api.NamedEntityDescriptor;
-import org.apache.tez.dag.api.TaskCommunicator;
+import org.apache.tez.serviceplugins.api.TaskCommunicator;
 import org.apache.tez.dag.api.TezConstants;
 import org.apache.tez.dag.api.UserPayload;
 import org.apache.tez.dag.app.dag.event.DAGAppMasterEventType;
@@ -53,14 +53,14 @@ import org.apache.hadoop.service.AbstractService;
 import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.tez.common.ReflectionUtils;
 import org.apache.tez.common.TezUtilsInternal;
-import org.apache.tez.dag.api.TaskCommunicatorContext;
+import org.apache.tez.serviceplugins.api.TaskCommunicatorContext;
 import org.apache.tez.serviceplugins.api.TaskAttemptEndReason;
-import org.apache.tez.dag.api.TaskHeartbeatResponse;
+import org.apache.tez.serviceplugins.api.TaskHeartbeatResponse;
 import org.apache.tez.dag.api.TezException;
 import org.apache.tez.dag.api.TezUncheckedException;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.util.ConverterUtils;
-import org.apache.tez.dag.api.TaskHeartbeatRequest;
+import org.apache.tez.serviceplugins.api.TaskHeartbeatRequest;
 import org.apache.tez.dag.app.dag.DAG;
 import org.apache.tez.dag.app.dag.Task;
 import org.apache.tez.dag.app.dag.event.TaskAttemptEvent;

http://git-wip-us.apache.org/repos/asf/tez/blob/fad16425/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorWrapper.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorWrapper.java b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorWrapper.java
index 4f9780e..4a75875 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorWrapper.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorWrapper.java
@@ -21,7 +21,7 @@ import java.util.Map;
 import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.LocalResource;
-import org.apache.tez.dag.api.TaskCommunicator;
+import org.apache.tez.serviceplugins.api.TaskCommunicator;
 import org.apache.tez.dag.api.event.VertexStateUpdate;
 import org.apache.tez.dag.records.TezTaskAttemptID;
 import org.apache.tez.runtime.api.impl.TaskSpec;

http://git-wip-us.apache.org/repos/asf/tez/blob/fad16425/tez-dag/src/main/java/org/apache/tez/dag/app/TezLocalTaskCommunicatorImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/TezLocalTaskCommunicatorImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/TezLocalTaskCommunicatorImpl.java
index 47688d1..15d90d3 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/TezLocalTaskCommunicatorImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/TezLocalTaskCommunicatorImpl.java
@@ -18,7 +18,7 @@ import java.net.InetAddress;
 import java.net.InetSocketAddress;
 import java.net.UnknownHostException;
 
-import org.apache.tez.dag.api.TaskCommunicatorContext;
+import org.apache.tez.serviceplugins.api.TaskCommunicatorContext;
 import org.apache.tez.dag.api.TezUncheckedException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

http://git-wip-us.apache.org/repos/asf/tez/blob/fad16425/tez-dag/src/main/java/org/apache/tez/dag/app/TezTaskCommunicatorImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/TezTaskCommunicatorImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/TezTaskCommunicatorImpl.java
index d071e0d..0bbe97a 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/TezTaskCommunicatorImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/TezTaskCommunicatorImpl.java
@@ -47,10 +47,10 @@ import org.apache.tez.common.security.JobTokenSecretManager;
 import org.apache.tez.common.security.TokenCache;
 import org.apache.tez.serviceplugins.api.ContainerEndReason;
 import org.apache.tez.serviceplugins.api.TaskAttemptEndReason;
-import org.apache.tez.dag.api.TaskCommunicator;
-import org.apache.tez.dag.api.TaskCommunicatorContext;
-import org.apache.tez.dag.api.TaskHeartbeatRequest;
-import org.apache.tez.dag.api.TaskHeartbeatResponse;
+import org.apache.tez.serviceplugins.api.TaskCommunicator;
+import org.apache.tez.serviceplugins.api.TaskCommunicatorContext;
+import org.apache.tez.serviceplugins.api.TaskHeartbeatRequest;
+import org.apache.tez.serviceplugins.api.TaskHeartbeatResponse;
 import org.apache.tez.dag.api.TezConfiguration;
 import org.apache.tez.dag.api.TezException;
 import org.apache.tez.dag.api.TezUncheckedException;

http://git-wip-us.apache.org/repos/asf/tez/blob/fad16425/tez-dag/src/main/java/org/apache/tez/serviceplugins/api/TaskCommunicator.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/serviceplugins/api/TaskCommunicator.java b/tez-dag/src/main/java/org/apache/tez/serviceplugins/api/TaskCommunicator.java
new file mode 100644
index 0000000..8f919d1
--- /dev/null
+++ b/tez-dag/src/main/java/org/apache/tez/serviceplugins/api/TaskCommunicator.java
@@ -0,0 +1,232 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.serviceplugins.api;
+
+import javax.annotation.Nullable;
+import java.net.InetSocketAddress;
+import java.util.Map;
+
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.tez.common.ServicePluginLifecycle;
+import org.apache.tez.dag.api.event.VertexStateUpdate;
+import org.apache.tez.dag.records.TezTaskAttemptID;
+import org.apache.tez.runtime.api.impl.TaskSpec;
+
+// TODO TEZ-2003 (post) TEZ-2665. Move to the tez-api module
+// TODO TEZ-2003 (post) TEZ-2664. Ideally, don't expose YARN containerId; instead expose a Tez specific construct.
+
+/**
+ * This class represents the API for a custom TaskCommunicator which can be run within the Tez AM.
+ * This is used to communicate with running services, potentially launching tasks, and getting
+ * updates from running tasks.
+ * <p/>
+ * The plugin is initialized with an instance of {@link TaskCommunicatorContext} - which provides
+ * a mechanism to notify the system about allocation decisions and resources to the Tez framework.
+ *
+ * If setting up a heartbeat between the task and the AM, the framework is responsible for error checking
+ * of this heartbeat mechanism, handling lost or duplicate responses.
+ *
+ */
+public abstract class TaskCommunicator implements ServicePluginLifecycle {
+
+  // TODO TEZ-2003 (post) TEZ-2666 Enhancements to interface
+  // - registerContainerEnd should provide the end reason / possible rename
+  // - get rid of getAddress
+  // - Add methods to support task preemption
+  // - Add a dagStarted notification, along with a payload
+  // - taskSpec breakup into a clean interface
+  // - Add methods to report task / container completion
+
+  private final TaskCommunicatorContext taskCommunicatorContext;
+
+  public TaskCommunicator(TaskCommunicatorContext taskCommunicatorContext) {
+    this.taskCommunicatorContext = taskCommunicatorContext;
+  }
+
+  /**
+   * Get the {@link TaskCommunicatorContext} associated with this instance of the scheduler, which
+   * is
+   * used to communicate with the rest of the system
+   *
+   * @return an instance of {@link TaskCommunicatorContext}
+   */
+  public TaskCommunicatorContext getContext() {
+    return taskCommunicatorContext;
+  }
+
+  /**
+   * An entry point for initialization.
+   * Order of service setup. Constructor, initialize(), start() - when starting a service.
+   *
+   * @throws Exception
+   */
+  @Override
+  public void initialize() throws Exception {
+  }
+
+  /**
+   * An entry point for starting the service.
+   * Order of service setup. Constructor, initialize(), start() - when starting a service.
+   *
+   * @throws Exception
+   */
+  @Override
+  public void start() throws Exception {
+  }
+
+  /**
+   * Stop the service. This could be invoked at any point, when the service is no longer required -
+   * including in case of errors.
+   *
+   * @throws Exception
+   */
+  @Override
+  public void shutdown() throws Exception {
+  }
+
+
+  /**
+   * Register a new container.
+   *
+   * @param containerId the associated containerId
+   * @param hostname    the hostname on which the container runs
+   * @param port        the port for the service which is running the container
+   * @throws ServicePluginException when the service runs into a fatal error which it cannot handle.
+   *                               This will cause the app to shutdown.
+   */
+  public abstract void registerRunningContainer(ContainerId containerId, String hostname,
+                                                int port) throws ServicePluginException;
+
+  /**
+   * Register the end of a container. This can be caused by preemption, the container completing
+   * successfully, etc.
+   *
+   * @param containerId the associated containerId
+   * @param endReason   the end reason for the container completing
+   * @param diagnostics diagnostics associated with the container end
+   * @throws ServicePluginException when the service runs into a fatal error which it cannot handle.
+   *                               This will cause the app to shutdown.
+   */
+  public abstract void registerContainerEnd(ContainerId containerId, ContainerEndReason endReason,
+                                            @Nullable String diagnostics) throws
+      ServicePluginException;
+
+  /**
+   * Register a task attempt to execute on a container
+   *
+   * @param containerId         the containerId on which this task needs to run
+   * @param taskSpec            the task specifications for the task to be executed
+   * @param additionalResources additional local resources which may be required to run this task
+   *                            on
+   *                            the container
+   * @param credentials         the credentials required to run this task
+   * @param credentialsChanged  whether the credentials are different from the original credentials
+   *                            associated with this container
+   * @param priority            the priority of the task being executed
+   * @throws ServicePluginException when the service runs into a fatal error which it cannot handle.
+   *                               This will cause the app to shutdown.
+   */
+  public abstract void registerRunningTaskAttempt(ContainerId containerId, TaskSpec taskSpec,
+                                                  Map<String, LocalResource> additionalResources,
+                                                  Credentials credentials,
+                                                  boolean credentialsChanged, int priority) throws
+      ServicePluginException;
+
+  /**
+   * Register the completion of a task. This may be a result of preemption, the container dying,
+   * the node dying, the task completing to success
+   *
+   * @param taskAttemptID the task attempt which has completed / needs to be completed
+   * @param endReason     the endReason for the task attempt.
+   * @param diagnostics   diagnostics associated with the task end
+   * @throws ServicePluginException when the service runs into a fatal error which it cannot handle.
+   *                               This will cause the app to shutdown.
+   */
+  public abstract void unregisterRunningTaskAttempt(TezTaskAttemptID taskAttemptID,
+                                                    TaskAttemptEndReason endReason,
+                                                    @Nullable String diagnostics) throws
+      ServicePluginException;
+
+  /**
+   * Return the address, if any, that the service listens on
+   *
+   * @return the address
+   * @throws ServicePluginException when the service runs into a fatal error which it cannot handle.
+   *                               This will cause the app to shutdown.
+   */
+  public abstract InetSocketAddress getAddress() throws ServicePluginException;
+
+  /**
+   * Receive notifications on vertex state changes.
+   * <p/>
+   * State changes will be received based on the registration via {@link
+   * org.apache.tez.runtime.api.InputInitializerContext#registerForVertexStateUpdates(String,
+   * java.util.Set)}. Notifications will be received for all registered state changes, and not just
+   * for the latest state update. They will be in order in which the state change occurred. </p>
+   * <p/>
+   * Extensive processing should not be performed via this method call. Instead this should just be
+   * used as a notification mechanism.
+   * <br>This method may be invoked concurrently with other invocations into the TaskCommunicator
+   * and
+   * multi-threading/concurrency implications must be considered.
+   *
+   * @param stateUpdate an event indicating the name of the vertex, and it's updated state.
+   *                    Additional information may be available for specific events, Look at the
+   *                    type hierarchy for {@link org.apache.tez.dag.api.event.VertexStateUpdate}
+   * @throws ServicePluginException when the service runs into a fatal error which it cannot handle.
+   *                               This will cause the app to shutdown.
+   */
+  public abstract void onVertexStateUpdated(VertexStateUpdate stateUpdate) throws ServicePluginException;
+
+  /**
+   * Indicates the current running dag is complete. The TaskCommunicatorContext can be used to
+   * query information about the current dag during the duration of the dagComplete invocation.
+   * <p/>
+   * After this, the contents returned from querying the context may change at any point - due to
+   * the next dag being submitted.
+   *
+   * @param dagIdentifier the unique numerical identifier for the DAG in the specified execution context.
+   * @throws ServicePluginException when the service runs into a fatal error which it cannot handle.
+   *                               This will cause the app to shutdown.
+   */
+  public abstract void dagComplete(int dagIdentifier) throws ServicePluginException;
+
+  /**
+   * Share meta-information such as host:port information where the Task Communicator may be
+   * listening.
+   * Primarily for use by compatible launchers to learn this information.
+   *
+   * @return meta info for the task communicator
+   * @throws ServicePluginException when the service runs into a fatal error which it cannot handle.
+   *                               This will cause the app to shutdown.
+   */
+  public abstract Object getMetaInfo() throws ServicePluginException;
+}

http://git-wip-us.apache.org/repos/asf/tez/blob/fad16425/tez-dag/src/main/java/org/apache/tez/serviceplugins/api/TaskCommunicatorContext.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/serviceplugins/api/TaskCommunicatorContext.java b/tez-dag/src/main/java/org/apache/tez/serviceplugins/api/TaskCommunicatorContext.java
new file mode 100644
index 0000000..c55bdbd
--- /dev/null
+++ b/tez-dag/src/main/java/org/apache/tez/serviceplugins/api/TaskCommunicatorContext.java
@@ -0,0 +1,240 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.serviceplugins.api;
+
+import javax.annotation.Nullable;
+import java.io.IOException;
+import java.util.Set;
+
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.tez.dag.api.TezException;
+import org.apache.tez.dag.api.UserPayload;
+import org.apache.tez.dag.api.event.VertexState;
+import org.apache.tez.dag.records.TezTaskAttemptID;
+
+
+// Do not make calls into this from within a held lock.
+
+// TODO TEZ-2003 (post) TEZ-2665. Move to the tez-api module
+public interface TaskCommunicatorContext {
+
+  // TODO TEZ-2003 (post) TEZ-2666 Enhancements to API
+  // - Consolidate usage of IDs
+  // - Split the heartbeat API to a liveness check and a status update
+  // - Rename and consolidate TaskHeartbeatResponse and TaskHeartbeatRequest
+  // - Fix taskStarted needs to be invoked before launching the actual task.
+  // - Potentially add methods to report availability stats to the scheduler
+  // - Report taskSuccess via a method instead of the heartbeat
+  // - Add methods to signal container / task state changes
+  // - Maybe add book-keeping as a helper library, instead of each impl tracking container to task etc.
+  // - Handling of containres / tasks which no longer exist in the system (formalized interface instead of a shouldDie notification)
+
+  /**
+   * Get the UserPayload that was configured while setting up the task communicator
+   *
+   * @return the initially configured user payload
+   */
+  UserPayload getInitialUserPayload();
+
+  /**
+   * Get the application attempt id for the running application. Relevant when running under YARN
+   *
+   * @return the applicationAttemptId for the running app
+   */
+  ApplicationAttemptId getApplicationAttemptId();
+
+  /**
+   * Get credentials associated with the AppMaster
+   *
+   * @return credentials
+   */
+  Credentials getCredentials();
+
+  /**
+   * Check whether a running attempt can commit. This provides a leader election mechanism amongst
+   * multiple running attempts
+   *
+   * @param taskAttemptId the associated task attempt id
+   * @return whether the attempt can commit or not
+   * @throws IOException
+   */
+  boolean canCommit(TezTaskAttemptID taskAttemptId) throws IOException;
+
+  /**
+   * Mechanism for a {@link TaskCommunicator} to provide updates on a running task, as well as
+   * receive new information which may need to be propagated to the task. This includes events
+   * generated by the task and events which need to be sent to the task
+   * This method must be invoked periodically to receive updates for a running task
+   *
+   * @param request the update from the running task.
+   * @return the response that is requried by the task.
+   * @throws IOException
+   * @throws TezException
+   */
+  TaskHeartbeatResponse heartbeat(TaskHeartbeatRequest request) throws IOException, TezException;
+
+  /**
+   * Check whether the container is known by the framework. The state of this container is
+   * irrelevant
+   *
+   * @param containerId the relevant container id
+   * @return true if the container is known, false if it isn't
+   */
+  boolean isKnownContainer(ContainerId containerId);
+
+  /**
+   * Inform the framework that a task is alive. This needs to be invoked periodically to avoid the
+   * task attempt timing out.
+   * Invocations to heartbeat provides the same keep-alive functionality
+   *
+   * @param taskAttemptId the relevant task attempt
+   */
+  void taskAlive(TezTaskAttemptID taskAttemptId);
+
+  /**
+   * Inform the framework that a container is alive. This need to be invoked periodically to avoid
+   * the container attempt timing out.
+   * Invocations to heartbeat provides the same keep-alive functionality
+   *
+   * @param containerId the relevant container id
+   */
+  void containerAlive(ContainerId containerId);
+
+  /**
+   * Inform the framework that the task has started execution
+   *
+   * @param taskAttemptId the relevant task attempt id
+   * @param containerId   the containerId in which the task attempt is running
+   */
+  void taskStartedRemotely(TezTaskAttemptID taskAttemptId, ContainerId containerId);
+
+  /**
+   * Inform the framework that a task has been killed
+   *
+   * @param taskAttemptId        the relevant task attempt id
+   * @param taskAttemptEndReason the reason for the task attempt being killed
+   * @param diagnostics          any diagnostics messages which are relevant to the task attempt
+   *                             kill
+   */
+  void taskKilled(TezTaskAttemptID taskAttemptId, TaskAttemptEndReason taskAttemptEndReason,
+                  @Nullable String diagnostics);
+
+  /**
+   * Inform the framework that a task has failed
+   *
+   * @param taskAttemptId        the relevant task attempt id
+   * @param taskAttemptEndReason the reason for the task failure
+   * @param diagnostics          any diagnostics messages which are relevant to the task attempt
+   *                             failure
+   */
+  void taskFailed(TezTaskAttemptID taskAttemptId, TaskAttemptEndReason taskAttemptEndReason,
+                  @Nullable String diagnostics);
+
+  /**
+   * Register to get notifications on updates to the specified vertex. Notifications will be sent
+   * via {@link org.apache.tez.runtime.api.InputInitializer#onVertexStateUpdated(org.apache.tez.dag.api.event.VertexStateUpdate)}
+   * </p>
+   * <p/>
+   * This method can only be invoked once. Duplicate invocations will result in an error.
+   *
+   * @param vertexName the vertex name for which notifications are required.
+   * @param stateSet   the set of states for which notifications are required. null implies all
+   */
+  void registerForVertexStateUpdates(String vertexName, @Nullable Set<VertexState> stateSet);
+
+  /**
+   * Get the name of the currently executing dag
+   *
+   * @return the name of the currently executing dag
+   */
+  String getCurrentDagName();
+
+  /**
+   * Get an identifier for the executing context of the DAG.
+   * @return a String identifier for the exeucting context.
+   */
+  String getCurrentAppIdentifier();
+
+  /**
+   * Get the identifier for the currently executing dag.
+   * @return a numerical identifier for the currently running DAG. This is unique within the currently running application.
+   */
+  int getCurrentDagIdenitifer();
+
+  /**
+   * Get the name of the Input vertices for the specified vertex.
+   * Root Inputs are not returned.
+   *
+   * @param vertexName the vertex for which source vertex names will be returned
+   * @return an Iterable containing the list of input vertices for the specified vertex
+   */
+  Iterable<String> getInputVertexNames(String vertexName);
+
+  /**
+   * Get the total number of tasks in the given vertex
+   *
+   * @param vertexName the relevant vertex name
+   * @return total number of tasks in this vertex
+   */
+  int getVertexTotalTaskCount(String vertexName);
+
+  /**
+   * Get the number of completed tasks for a given vertex
+   *
+   * @param vertexName the vertex name
+   * @return the number of completed tasks for the vertex
+   */
+  int getVertexCompletedTaskCount(String vertexName);
+
+  /**
+   * Get the number of running tasks for a given vertex
+   *
+   * @param vertexName the vertex name
+   * @return the number of running tasks for the vertex
+   */
+  int getVertexRunningTaskCount(String vertexName);
+
+  /**
+   * Get the start time for the first attempt of the specified task
+   *
+   * @param vertexName the vertex to which the task belongs
+   * @param taskIndex  the index of the task
+   * @return the start time for the first attempt of the task
+   */
+  long getFirstAttemptStartTime(String vertexName, int taskIndex);
+
+  /**
+   * Get the start time for the currently executing DAG
+   *
+   * @return time when the current dag started executing
+   */
+  long getDagStartTime();
+}

http://git-wip-us.apache.org/repos/asf/tez/blob/fad16425/tez-dag/src/main/java/org/apache/tez/serviceplugins/api/TaskHeartbeatRequest.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/serviceplugins/api/TaskHeartbeatRequest.java b/tez-dag/src/main/java/org/apache/tez/serviceplugins/api/TaskHeartbeatRequest.java
new file mode 100644
index 0000000..40b006f
--- /dev/null
+++ b/tez-dag/src/main/java/org/apache/tez/serviceplugins/api/TaskHeartbeatRequest.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.serviceplugins.api;
+
+import java.util.List;
+
+import org.apache.tez.dag.records.TezTaskAttemptID;
+import org.apache.tez.runtime.api.impl.TezEvent;
+
+// TODO TEZ-2003 (post) TEZ-2665. Move to the tez-api module
+public class TaskHeartbeatRequest {
+
+  // TODO TEZ-2003 (post) TEZ-2666 Ideally containerIdentifier should not be part of the request.
+  private final String containerIdentifier;
+  private final TezTaskAttemptID taskAttemptId;
+  private final List<TezEvent> events;
+  private final int startIndex;
+  private final int preRoutedStartIndex;
+  private final int maxEvents;
+
+
+  public TaskHeartbeatRequest(String containerIdentifier, TezTaskAttemptID taskAttemptId, List<TezEvent> events, int startIndex,
+                              int preRoutedStartIndex,
+                              int maxEvents) {
+    this.containerIdentifier = containerIdentifier;
+    this.taskAttemptId = taskAttemptId;
+    this.events = events;
+    this.startIndex = startIndex;
+    this.preRoutedStartIndex = preRoutedStartIndex;
+    this.maxEvents = maxEvents;
+  }
+
+  public String getContainerIdentifier() {
+    return containerIdentifier;
+  }
+
+  public TezTaskAttemptID getTaskAttemptId() {
+    return taskAttemptId;
+  }
+
+  public List<TezEvent> getEvents() {
+    return events;
+  }
+
+  public int getStartIndex() {
+    return startIndex;
+  }
+
+  public int getPreRoutedStartIndex() {
+    return preRoutedStartIndex;
+  }
+
+  public int getMaxEvents() {
+    return maxEvents;
+  }
+}

http://git-wip-us.apache.org/repos/asf/tez/blob/fad16425/tez-dag/src/main/java/org/apache/tez/serviceplugins/api/TaskHeartbeatResponse.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/serviceplugins/api/TaskHeartbeatResponse.java b/tez-dag/src/main/java/org/apache/tez/serviceplugins/api/TaskHeartbeatResponse.java
new file mode 100644
index 0000000..9145004
--- /dev/null
+++ b/tez-dag/src/main/java/org/apache/tez/serviceplugins/api/TaskHeartbeatResponse.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.serviceplugins.api;
+
+import java.util.List;
+
+import org.apache.tez.runtime.api.impl.TezEvent;
+
+// TODO TEZ-2003 (post) TEZ-2665. Move to the tez-api module
+public class TaskHeartbeatResponse {
+
+  private final boolean shouldDie;
+  private final int nextFromEventId;
+  private final int nextPreRoutedEventId;
+  private final List<TezEvent> events;
+
+  public TaskHeartbeatResponse(boolean shouldDie, List<TezEvent> events, int nextFromEventId, int nextPreRoutedEventId) {
+    this.shouldDie = shouldDie;
+    this.events = events;
+    this.nextFromEventId = nextFromEventId;
+    this.nextPreRoutedEventId = nextPreRoutedEventId;
+  }
+
+  public boolean isShouldDie() {
+    return shouldDie;
+  }
+
+  public List<TezEvent> getEvents() {
+    return events;
+  }
+
+  public int getNextFromEventId() {
+    return nextFromEventId;
+  }
+
+  public int getNextPreRoutedEventId() {
+    return nextPreRoutedEventId;
+  }
+}

http://git-wip-us.apache.org/repos/asf/tez/blob/fad16425/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskCommunicatorContextImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskCommunicatorContextImpl.java b/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskCommunicatorContextImpl.java
index 5222a2d..869bfd5 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskCommunicatorContextImpl.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskCommunicatorContextImpl.java
@@ -25,7 +25,7 @@ import static org.mockito.Mockito.verify;
 import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.tez.common.ContainerSignatureMatcher;
-import org.apache.tez.dag.api.TaskCommunicatorContext;
+import org.apache.tez.serviceplugins.api.TaskCommunicatorContext;
 import org.apache.tez.dag.app.rm.container.AMContainerMap;
 import org.junit.Test;
 

http://git-wip-us.apache.org/repos/asf/tez/blob/fad16425/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskCommunicatorManager.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskCommunicatorManager.java b/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskCommunicatorManager.java
index d76a5b3..5323928 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskCommunicatorManager.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskCommunicatorManager.java
@@ -49,8 +49,8 @@ import org.apache.hadoop.yarn.event.Event;
 import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.tez.common.TezUtils;
 import org.apache.tez.dag.api.NamedEntityDescriptor;
-import org.apache.tez.dag.api.TaskCommunicator;
-import org.apache.tez.dag.api.TaskCommunicatorContext;
+import org.apache.tez.serviceplugins.api.TaskCommunicator;
+import org.apache.tez.serviceplugins.api.TaskCommunicatorContext;
 import org.apache.tez.dag.api.TezConstants;
 import org.apache.tez.dag.api.TezException;
 import org.apache.tez.dag.api.UserPayload;

http://git-wip-us.apache.org/repos/asf/tez/blob/fad16425/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskCommunicatorManager1.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskCommunicatorManager1.java b/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskCommunicatorManager1.java
index 2921a22..0f8afaa 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskCommunicatorManager1.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskCommunicatorManager1.java
@@ -54,19 +54,19 @@ import org.apache.tez.common.security.JobTokenIdentifier;
 import org.apache.tez.common.security.JobTokenSecretManager;
 import org.apache.tez.common.security.TokenCache;
 import org.apache.tez.dag.api.NamedEntityDescriptor;
-import org.apache.tez.dag.api.TaskCommunicator;
+import org.apache.tez.serviceplugins.api.TaskCommunicator;
 import org.apache.tez.dag.api.TezConfiguration;
 import org.apache.tez.dag.api.TezConstants;
 import org.apache.tez.dag.api.TezUncheckedException;
 import org.apache.tez.dag.api.UserPayload;
 import org.apache.tez.serviceplugins.api.ContainerEndReason;
 import org.apache.tez.serviceplugins.api.TaskAttemptEndReason;
-import org.apache.tez.dag.api.TaskHeartbeatRequest;
-import org.apache.tez.dag.api.TaskHeartbeatResponse;
+import org.apache.tez.serviceplugins.api.TaskHeartbeatRequest;
+import org.apache.tez.serviceplugins.api.TaskHeartbeatResponse;
 import org.apache.tez.dag.api.TezException;
 import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.tez.common.TezTaskUmbilicalProtocol;
-import org.apache.tez.dag.api.TaskCommunicatorContext;
+import org.apache.tez.serviceplugins.api.TaskCommunicatorContext;
 import org.apache.tez.dag.app.dag.DAG;
 import org.apache.tez.dag.app.dag.Vertex;
 import org.apache.tez.dag.app.dag.event.TaskAttemptEvent;

http://git-wip-us.apache.org/repos/asf/tez/blob/fad16425/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskCommunicatorWrapper.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskCommunicatorWrapper.java b/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskCommunicatorWrapper.java
index 212bca4..e89cc99 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskCommunicatorWrapper.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskCommunicatorWrapper.java
@@ -29,7 +29,7 @@
 package org.apache.tez.dag.app;
 
 import com.google.common.collect.Sets;
-import org.apache.tez.dag.api.TaskCommunicator;
+import org.apache.tez.serviceplugins.api.TaskCommunicator;
 import org.junit.Test;
 
 public class TestTaskCommunicatorWrapper {

http://git-wip-us.apache.org/repos/asf/tez/blob/fad16425/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java
index 4772492..3bb688e 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java
@@ -60,7 +60,7 @@ import org.apache.hadoop.yarn.util.SystemClock;
 import org.apache.log4j.Level;
 import org.apache.log4j.LogManager;
 import org.apache.tez.common.MockDNSToSwitchMapping;
-import org.apache.tez.dag.api.TaskCommunicator;
+import org.apache.tez.serviceplugins.api.TaskCommunicator;
 import org.apache.tez.dag.api.TaskLocationHint;
 import org.apache.tez.dag.api.TezConfiguration;
 import org.apache.tez.dag.api.oldrecords.TaskAttemptState;

http://git-wip-us.apache.org/repos/asf/tez/blob/fad16425/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerManager.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerManager.java b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerManager.java
index c649870..c62ff21 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerManager.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerManager.java
@@ -62,7 +62,6 @@ import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.tez.common.ContainerSignatureMatcher;
 import org.apache.tez.common.TezUtils;
 import org.apache.tez.dag.api.NamedEntityDescriptor;
-import org.apache.tez.dag.api.TaskCommunicator;
 import org.apache.tez.dag.api.TaskLocationHint;
 import org.apache.tez.dag.api.TezConfiguration;
 import org.apache.tez.dag.api.TezConstants;
@@ -71,10 +70,7 @@ import org.apache.tez.dag.api.UserPayload;
 import org.apache.tez.dag.api.client.DAGClientServer;
 import org.apache.tez.dag.app.AppContext;
 import org.apache.tez.dag.app.ContainerContext;
-import org.apache.tez.dag.app.ContainerHeartbeatHandler;
 import org.apache.tez.dag.app.ServicePluginLifecycleAbstractService;
-import org.apache.tez.dag.app.TaskCommunicatorManager;
-import org.apache.tez.dag.app.TaskHeartbeatHandler;
 import org.apache.tez.dag.app.dag.TaskAttempt;
 import org.apache.tez.dag.app.dag.event.DAGAppMasterEventType;
 import org.apache.tez.dag.app.dag.event.DAGAppMasterEventUserServiceFatalError;

http://git-wip-us.apache.org/repos/asf/tez/blob/fad16425/tez-dag/src/test/java/org/apache/tez/dag/app/rm/container/TestAMContainer.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/container/TestAMContainer.java b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/container/TestAMContainer.java
index 8b8b6d7..ed14871 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/container/TestAMContainer.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/container/TestAMContainer.java
@@ -67,7 +67,7 @@ import org.apache.tez.dag.app.TaskCommunicatorWrapper;
 import org.apache.tez.serviceplugins.api.ContainerEndReason;
 import org.apache.tez.serviceplugins.api.ServicePluginException;
 import org.apache.tez.serviceplugins.api.TaskAttemptEndReason;
-import org.apache.tez.dag.api.TaskCommunicator;
+import org.apache.tez.serviceplugins.api.TaskCommunicator;
 import org.apache.tez.dag.app.AppContext;
 import org.apache.tez.dag.app.ContainerHeartbeatHandler;
 import org.apache.tez.dag.app.ContainerContext;

http://git-wip-us.apache.org/repos/asf/tez/blob/fad16425/tez-dag/src/test/java/org/apache/tez/dag/app/rm/container/TestAMContainerMap.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/container/TestAMContainerMap.java b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/container/TestAMContainerMap.java
index e21dda1..2fcd0c8 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/container/TestAMContainerMap.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/container/TestAMContainerMap.java
@@ -31,7 +31,7 @@ import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.Token;
-import org.apache.tez.dag.api.TaskCommunicator;
+import org.apache.tez.serviceplugins.api.TaskCommunicator;
 import org.apache.tez.dag.app.AppContext;
 import org.apache.tez.dag.app.ContainerHeartbeatHandler;
 import org.apache.tez.dag.app.TaskCommunicatorManagerInterface;

http://git-wip-us.apache.org/repos/asf/tez/blob/fad16425/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/taskcomm/TezTestServiceTaskCommunicatorImpl.java
----------------------------------------------------------------------
diff --git a/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/taskcomm/TezTestServiceTaskCommunicatorImpl.java b/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/taskcomm/TezTestServiceTaskCommunicatorImpl.java
index 127967a..f199dcf 100644
--- a/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/taskcomm/TezTestServiceTaskCommunicatorImpl.java
+++ b/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/taskcomm/TezTestServiceTaskCommunicatorImpl.java
@@ -31,7 +31,7 @@ import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.LocalResource;
 import org.apache.tez.serviceplugins.api.ContainerEndReason;
 import org.apache.tez.serviceplugins.api.TaskAttemptEndReason;
-import org.apache.tez.dag.api.TaskCommunicatorContext;
+import org.apache.tez.serviceplugins.api.TaskCommunicatorContext;
 import org.apache.tez.dag.app.TezTaskCommunicatorImpl;
 import org.apache.tez.dag.app.TezTestServiceCommunicator;
 import org.apache.tez.dag.records.TezTaskAttemptID;

http://git-wip-us.apache.org/repos/asf/tez/blob/fad16425/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/taskcomm/TezTestServiceTaskCommunicatorWithErrors.java
----------------------------------------------------------------------
diff --git a/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/taskcomm/TezTestServiceTaskCommunicatorWithErrors.java b/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/taskcomm/TezTestServiceTaskCommunicatorWithErrors.java
index 0a3d8d4..90313d4 100644
--- a/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/taskcomm/TezTestServiceTaskCommunicatorWithErrors.java
+++ b/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/taskcomm/TezTestServiceTaskCommunicatorWithErrors.java
@@ -22,8 +22,8 @@ import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.LocalResource;
-import org.apache.tez.dag.api.TaskCommunicator;
-import org.apache.tez.dag.api.TaskCommunicatorContext;
+import org.apache.tez.serviceplugins.api.TaskCommunicator;
+import org.apache.tez.serviceplugins.api.TaskCommunicatorContext;
 import org.apache.tez.dag.api.event.VertexStateUpdate;
 import org.apache.tez.dag.records.TezTaskAttemptID;
 import org.apache.tez.runtime.api.impl.TaskSpec;