You are viewing a plain text version of this content. The canonical link for it is here (the hyperlink is not preserved in this plain-text version).
Posted to commits@tez.apache.org by ss...@apache.org on 2016/01/13 02:33:19 UTC
tez git commit: TEZ-3024. Move TaskCommunicator to correct package. (sseth)
Repository: tez
Updated Branches:
refs/heads/master 1d7654316 -> fad16425b
TEZ-3024. Move TaskCommunicator to correct package. (sseth)
Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/fad16425
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/fad16425
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/fad16425
Branch: refs/heads/master
Commit: fad16425b7979970f29884343316e1a10f4d0872
Parents: 1d76543
Author: Siddharth Seth <ss...@apache.org>
Authored: Tue Jan 12 17:32:28 2016 -0800
Committer: Siddharth Seth <ss...@apache.org>
Committed: Tue Jan 12 17:32:28 2016 -0800
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../apache/tez/dag/api/TaskCommunicator.java | 221 -----------------
.../tez/dag/api/TaskCommunicatorContext.java | 225 -----------------
.../tez/dag/api/TaskHeartbeatRequest.java | 68 ------
.../tez/dag/api/TaskHeartbeatResponse.java | 51 ----
.../dag/app/TaskCommunicatorContextImpl.java | 7 +-
.../tez/dag/app/TaskCommunicatorManager.java | 8 +-
.../tez/dag/app/TaskCommunicatorWrapper.java | 2 +-
.../dag/app/TezLocalTaskCommunicatorImpl.java | 2 +-
.../tez/dag/app/TezTaskCommunicatorImpl.java | 8 +-
.../serviceplugins/api/TaskCommunicator.java | 232 ++++++++++++++++++
.../api/TaskCommunicatorContext.java | 240 +++++++++++++++++++
.../api/TaskHeartbeatRequest.java | 82 +++++++
.../api/TaskHeartbeatResponse.java | 65 +++++
.../app/TestTaskCommunicatorContextImpl.java | 2 +-
.../dag/app/TestTaskCommunicatorManager.java | 4 +-
.../dag/app/TestTaskCommunicatorManager1.java | 8 +-
.../dag/app/TestTaskCommunicatorWrapper.java | 2 +-
.../tez/dag/app/dag/impl/TestTaskAttempt.java | 2 +-
.../dag/app/rm/TestTaskSchedulerManager.java | 4 -
.../dag/app/rm/container/TestAMContainer.java | 2 +-
.../app/rm/container/TestAMContainerMap.java | 2 +-
.../TezTestServiceTaskCommunicatorImpl.java | 2 +-
...ezTestServiceTaskCommunicatorWithErrors.java | 4 +-
24 files changed, 647 insertions(+), 597 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tez/blob/fad16425/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index d39cbb8..0645591 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -4,6 +4,7 @@ Apache Tez Change Log
Release 0.8.2: Unreleased
INCOMPATIBLE CHANGES
+ TEZ-3024. Move TaskCommunicator to correct package.
TEZ-2679. Admin forms of launch env settings
TEZ-2948. Stop using dagName in the dagComplete notification to TaskCommunicators.
TEZ-2949. Allow duplicate dag names within session for Tez.
http://git-wip-us.apache.org/repos/asf/tez/blob/fad16425/tez-dag/src/main/java/org/apache/tez/dag/api/TaskCommunicator.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/api/TaskCommunicator.java b/tez-dag/src/main/java/org/apache/tez/dag/api/TaskCommunicator.java
deleted file mode 100644
index 1b6ad07..0000000
--- a/tez-dag/src/main/java/org/apache/tez/dag/api/TaskCommunicator.java
+++ /dev/null
@@ -1,221 +0,0 @@
-/*
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tez.dag.api;
-
-import javax.annotation.Nullable;
-import java.net.InetSocketAddress;
-import java.util.Map;
-
-import org.apache.hadoop.security.Credentials;
-import org.apache.hadoop.yarn.api.records.ContainerId;
-import org.apache.hadoop.yarn.api.records.LocalResource;
-import org.apache.tez.common.ServicePluginLifecycle;
-import org.apache.tez.dag.api.event.VertexStateUpdate;
-import org.apache.tez.serviceplugins.api.ContainerEndReason;
-import org.apache.tez.serviceplugins.api.ServicePluginException;
-import org.apache.tez.serviceplugins.api.TaskAttemptEndReason;
-import org.apache.tez.dag.records.TezTaskAttemptID;
-import org.apache.tez.runtime.api.impl.TaskSpec;
-
-// TODO TEZ-2003 (post) TEZ-2665. Move to the tez-api module
-// TODO TEZ-2003 (post) TEZ-2664. Ideally, don't expose YARN containerId; instead expose a Tez specific construct.
-
-/**
- * This class represents the API for a custom TaskCommunicator which can be run within the Tez AM.
- * This is used to communicate with running services, potentially launching tasks, and getting
- * updates from running tasks.
- * <p/>
- * The plugin is initialized with an instance of {@link TaskCommunicatorContext} - which provides
- * a mechanism to notify the system about allocation decisions and resources to the Tez framework.
- *
- * If setting up a heartbeat between the task and the AM, the framework is responsible for error checking
- * of this heartbeat mechanism, handling lost or duplicate responses.
- *
- */
-public abstract class TaskCommunicator implements ServicePluginLifecycle {
-
- // TODO TEZ-2003 (post) TEZ-2666 Enhancements to interface
- // - registerContainerEnd should provide the end reason / possible rename
- // - get rid of getAddress
- // - Add methods to support task preemption
- // - Add a dagStarted notification, along with a payload
- // - taskSpec breakup into a clean interface
- // - Add methods to report task / container completion
-
- private final TaskCommunicatorContext taskCommunicatorContext;
-
- public TaskCommunicator(TaskCommunicatorContext taskCommunicatorContext) {
- this.taskCommunicatorContext = taskCommunicatorContext;
- }
-
- /**
- * Get the {@link TaskCommunicatorContext} associated with this instance of the scheduler, which
- * is
- * used to communicate with the rest of the system
- *
- * @return an instance of {@link TaskCommunicatorContext}
- */
- public TaskCommunicatorContext getContext() {
- return taskCommunicatorContext;
- }
-
- /**
- * An entry point for initialization.
- * Order of service setup. Constructor, initialize(), start() - when starting a service.
- *
- * @throws Exception
- */
- @Override
- public void initialize() throws Exception {
- }
-
- /**
- * An entry point for starting the service.
- * Order of service setup. Constructor, initialize(), start() - when starting a service.
- *
- * @throws Exception
- */
- @Override
- public void start() throws Exception {
- }
-
- /**
- * Stop the service. This could be invoked at any point, when the service is no longer required -
- * including in case of errors.
- *
- * @throws Exception
- */
- @Override
- public void shutdown() throws Exception {
- }
-
-
- /**
- * Register a new container.
- *
- * @param containerId the associated containerId
- * @param hostname the hostname on which the container runs
- * @param port the port for the service which is running the container
- * @throws ServicePluginException when the service runs into a fatal error which it cannot handle.
- * This will cause the app to shutdown.
- */
- public abstract void registerRunningContainer(ContainerId containerId, String hostname,
- int port) throws ServicePluginException;
-
- /**
- * Register the end of a container. This can be caused by preemption, the container completing
- * successfully, etc.
- *
- * @param containerId the associated containerId
- * @param endReason the end reason for the container completing
- * @param diagnostics diagnostics associated with the container end
- * @throws ServicePluginException when the service runs into a fatal error which it cannot handle.
- * This will cause the app to shutdown.
- */
- public abstract void registerContainerEnd(ContainerId containerId, ContainerEndReason endReason,
- @Nullable String diagnostics) throws
- ServicePluginException;
-
- /**
- * Register a task attempt to execute on a container
- *
- * @param containerId the containerId on which this task needs to run
- * @param taskSpec the task specifications for the task to be executed
- * @param additionalResources additional local resources which may be required to run this task
- * on
- * the container
- * @param credentials the credentials required to run this task
- * @param credentialsChanged whether the credentials are different from the original credentials
- * associated with this container
- * @param priority the priority of the task being executed
- * @throws ServicePluginException when the service runs into a fatal error which it cannot handle.
- * This will cause the app to shutdown.
- */
- public abstract void registerRunningTaskAttempt(ContainerId containerId, TaskSpec taskSpec,
- Map<String, LocalResource> additionalResources,
- Credentials credentials,
- boolean credentialsChanged, int priority) throws
- ServicePluginException;
-
- /**
- * Register the completion of a task. This may be a result of preemption, the container dying,
- * the node dying, the task completing to success
- *
- * @param taskAttemptID the task attempt which has completed / needs to be completed
- * @param endReason the endReason for the task attempt.
- * @param diagnostics diagnostics associated with the task end
- * @throws ServicePluginException when the service runs into a fatal error which it cannot handle.
- * This will cause the app to shutdown.
- */
- public abstract void unregisterRunningTaskAttempt(TezTaskAttemptID taskAttemptID,
- TaskAttemptEndReason endReason,
- @Nullable String diagnostics) throws
- ServicePluginException;
-
- /**
- * Return the address, if any, that the service listens on
- *
- * @return the address
- * @throws ServicePluginException when the service runs into a fatal error which it cannot handle.
- * This will cause the app to shutdown.
- */
- public abstract InetSocketAddress getAddress() throws ServicePluginException;
-
- /**
- * Receive notifications on vertex state changes.
- * <p/>
- * State changes will be received based on the registration via {@link
- * org.apache.tez.runtime.api.InputInitializerContext#registerForVertexStateUpdates(String,
- * java.util.Set)}. Notifications will be received for all registered state changes, and not just
- * for the latest state update. They will be in order in which the state change occurred. </p>
- * <p/>
- * Extensive processing should not be performed via this method call. Instead this should just be
- * used as a notification mechanism.
- * <br>This method may be invoked concurrently with other invocations into the TaskCommunicator
- * and
- * multi-threading/concurrency implications must be considered.
- *
- * @param stateUpdate an event indicating the name of the vertex, and it's updated state.
- * Additional information may be available for specific events, Look at the
- * type hierarchy for {@link org.apache.tez.dag.api.event.VertexStateUpdate}
- * @throws ServicePluginException when the service runs into a fatal error which it cannot handle.
- * This will cause the app to shutdown.
- */
- public abstract void onVertexStateUpdated(VertexStateUpdate stateUpdate) throws ServicePluginException;
-
- /**
- * Indicates the current running dag is complete. The TaskCommunicatorContext can be used to
- * query information about the current dag during the duration of the dagComplete invocation.
- * <p/>
- * After this, the contents returned from querying the context may change at any point - due to
- * the next dag being submitted.
- *
- * @param dagIdentifier the unique numerical identifier for the DAG in the specified execution context.
- * @throws ServicePluginException when the service runs into a fatal error which it cannot handle.
- * This will cause the app to shutdown.
- */
- public abstract void dagComplete(int dagIdentifier) throws ServicePluginException;
-
- /**
- * Share meta-information such as host:port information where the Task Communicator may be
- * listening.
- * Primarily for use by compatible launchers to learn this information.
- *
- * @return meta info for the task communicator
- * @throws ServicePluginException when the service runs into a fatal error which it cannot handle.
- * This will cause the app to shutdown.
- */
- public abstract Object getMetaInfo() throws ServicePluginException;
-}
http://git-wip-us.apache.org/repos/asf/tez/blob/fad16425/tez-dag/src/main/java/org/apache/tez/dag/api/TaskCommunicatorContext.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/api/TaskCommunicatorContext.java b/tez-dag/src/main/java/org/apache/tez/dag/api/TaskCommunicatorContext.java
deleted file mode 100644
index 7c5a648..0000000
--- a/tez-dag/src/main/java/org/apache/tez/dag/api/TaskCommunicatorContext.java
+++ /dev/null
@@ -1,225 +0,0 @@
-/*
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tez.dag.api;
-
-import javax.annotation.Nullable;
-import java.io.IOException;
-import java.util.Set;
-
-import org.apache.hadoop.security.Credentials;
-import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
-import org.apache.hadoop.yarn.api.records.ContainerId;
-import org.apache.tez.dag.api.event.VertexState;
-import org.apache.tez.serviceplugins.api.TaskAttemptEndReason;
-import org.apache.tez.dag.records.TezTaskAttemptID;
-
-
-// Do not make calls into this from within a held lock.
-
-// TODO TEZ-2003 (post) TEZ-2665. Move to the tez-api module
-public interface TaskCommunicatorContext {
-
- // TODO TEZ-2003 (post) TEZ-2666 Enhancements to API
- // - Consolidate usage of IDs
- // - Split the heartbeat API to a liveness check and a status update
- // - Rename and consolidate TaskHeartbeatResponse and TaskHeartbeatRequest
- // - Fix taskStarted needs to be invoked before launching the actual task.
- // - Potentially add methods to report availability stats to the scheduler
- // - Report taskSuccess via a method instead of the heartbeat
- // - Add methods to signal container / task state changes
- // - Maybe add book-keeping as a helper library, instead of each impl tracking container to task etc.
- // - Handling of containres / tasks which no longer exist in the system (formalized interface instead of a shouldDie notification)
-
- /**
- * Get the UserPayload that was configured while setting up the task communicator
- *
- * @return the initially configured user payload
- */
- UserPayload getInitialUserPayload();
-
- /**
- * Get the application attempt id for the running application. Relevant when running under YARN
- *
- * @return the applicationAttemptId for the running app
- */
- ApplicationAttemptId getApplicationAttemptId();
-
- /**
- * Get credentials associated with the AppMaster
- *
- * @return credentials
- */
- Credentials getCredentials();
-
- /**
- * Check whether a running attempt can commit. This provides a leader election mechanism amongst
- * multiple running attempts
- *
- * @param taskAttemptId the associated task attempt id
- * @return whether the attempt can commit or not
- * @throws IOException
- */
- boolean canCommit(TezTaskAttemptID taskAttemptId) throws IOException;
-
- /**
- * Mechanism for a {@link TaskCommunicator} to provide updates on a running task, as well as
- * receive new information which may need to be propagated to the task. This includes events
- * generated by the task and events which need to be sent to the task
- * This method must be invoked periodically to receive updates for a running task
- *
- * @param request the update from the running task.
- * @return the response that is requried by the task.
- * @throws IOException
- * @throws TezException
- */
- TaskHeartbeatResponse heartbeat(TaskHeartbeatRequest request) throws IOException, TezException;
-
- /**
- * Check whether the container is known by the framework. The state of this container is
- * irrelevant
- *
- * @param containerId the relevant container id
- * @return true if the container is known, false if it isn't
- */
- boolean isKnownContainer(ContainerId containerId);
-
- /**
- * Inform the framework that a task is alive. This needs to be invoked periodically to avoid the
- * task attempt timing out.
- * Invocations to heartbeat provides the same keep-alive functionality
- *
- * @param taskAttemptId the relevant task attempt
- */
- void taskAlive(TezTaskAttemptID taskAttemptId);
-
- /**
- * Inform the framework that a container is alive. This need to be invoked periodically to avoid
- * the container attempt timing out.
- * Invocations to heartbeat provides the same keep-alive functionality
- *
- * @param containerId the relevant container id
- */
- void containerAlive(ContainerId containerId);
-
- /**
- * Inform the framework that the task has started execution
- *
- * @param taskAttemptId the relevant task attempt id
- * @param containerId the containerId in which the task attempt is running
- */
- void taskStartedRemotely(TezTaskAttemptID taskAttemptId, ContainerId containerId);
-
- /**
- * Inform the framework that a task has been killed
- *
- * @param taskAttemptId the relevant task attempt id
- * @param taskAttemptEndReason the reason for the task attempt being killed
- * @param diagnostics any diagnostics messages which are relevant to the task attempt
- * kill
- */
- void taskKilled(TezTaskAttemptID taskAttemptId, TaskAttemptEndReason taskAttemptEndReason,
- @Nullable String diagnostics);
-
- /**
- * Inform the framework that a task has failed
- *
- * @param taskAttemptId the relevant task attempt id
- * @param taskAttemptEndReason the reason for the task failure
- * @param diagnostics any diagnostics messages which are relevant to the task attempt
- * failure
- */
- void taskFailed(TezTaskAttemptID taskAttemptId, TaskAttemptEndReason taskAttemptEndReason,
- @Nullable String diagnostics);
-
- /**
- * Register to get notifications on updates to the specified vertex. Notifications will be sent
- * via {@link org.apache.tez.runtime.api.InputInitializer#onVertexStateUpdated(org.apache.tez.dag.api.event.VertexStateUpdate)}
- * </p>
- * <p/>
- * This method can only be invoked once. Duplicate invocations will result in an error.
- *
- * @param vertexName the vertex name for which notifications are required.
- * @param stateSet the set of states for which notifications are required. null implies all
- */
- void registerForVertexStateUpdates(String vertexName, @Nullable Set<VertexState> stateSet);
-
- /**
- * Get the name of the currently executing dag
- *
- * @return the name of the currently executing dag
- */
- String getCurrentDagName();
-
- /**
- * Get an identifier for the executing context of the DAG.
- * @return a String identifier for the exeucting context.
- */
- String getCurrentAppIdentifier();
-
- /**
- * Get the identifier for the currently executing dag.
- * @return a numerical identifier for the currently running DAG. This is unique within the currently running application.
- */
- int getCurrentDagIdenitifer();
-
- /**
- * Get the name of the Input vertices for the specified vertex.
- * Root Inputs are not returned.
- *
- * @param vertexName the vertex for which source vertex names will be returned
- * @return an Iterable containing the list of input vertices for the specified vertex
- */
- Iterable<String> getInputVertexNames(String vertexName);
-
- /**
- * Get the total number of tasks in the given vertex
- *
- * @param vertexName the relevant vertex name
- * @return total number of tasks in this vertex
- */
- int getVertexTotalTaskCount(String vertexName);
-
- /**
- * Get the number of completed tasks for a given vertex
- *
- * @param vertexName the vertex name
- * @return the number of completed tasks for the vertex
- */
- int getVertexCompletedTaskCount(String vertexName);
-
- /**
- * Get the number of running tasks for a given vertex
- *
- * @param vertexName the vertex name
- * @return the number of running tasks for the vertex
- */
- int getVertexRunningTaskCount(String vertexName);
-
- /**
- * Get the start time for the first attempt of the specified task
- *
- * @param vertexName the vertex to which the task belongs
- * @param taskIndex the index of the task
- * @return the start time for the first attempt of the task
- */
- long getFirstAttemptStartTime(String vertexName, int taskIndex);
-
- /**
- * Get the start time for the currently executing DAG
- *
- * @return time when the current dag started executing
- */
- long getDagStartTime();
-}
http://git-wip-us.apache.org/repos/asf/tez/blob/fad16425/tez-dag/src/main/java/org/apache/tez/dag/api/TaskHeartbeatRequest.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/api/TaskHeartbeatRequest.java b/tez-dag/src/main/java/org/apache/tez/dag/api/TaskHeartbeatRequest.java
deleted file mode 100644
index d0c22d3..0000000
--- a/tez-dag/src/main/java/org/apache/tez/dag/api/TaskHeartbeatRequest.java
+++ /dev/null
@@ -1,68 +0,0 @@
-/*
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tez.dag.api;
-
-import java.util.List;
-
-import org.apache.tez.dag.records.TezTaskAttemptID;
-import org.apache.tez.runtime.api.impl.TezEvent;
-
-// TODO TEZ-2003 (post) TEZ-2665. Move to the tez-api module
-public class TaskHeartbeatRequest {
-
- // TODO TEZ-2003 (post) TEZ-2666 Ideally containerIdentifier should not be part of the request.
- private final String containerIdentifier;
- private final TezTaskAttemptID taskAttemptId;
- private final List<TezEvent> events;
- private final int startIndex;
- private final int preRoutedStartIndex;
- private final int maxEvents;
-
-
- public TaskHeartbeatRequest(String containerIdentifier, TezTaskAttemptID taskAttemptId, List<TezEvent> events, int startIndex,
- int preRoutedStartIndex,
- int maxEvents) {
- this.containerIdentifier = containerIdentifier;
- this.taskAttemptId = taskAttemptId;
- this.events = events;
- this.startIndex = startIndex;
- this.preRoutedStartIndex = preRoutedStartIndex;
- this.maxEvents = maxEvents;
- }
-
- public String getContainerIdentifier() {
- return containerIdentifier;
- }
-
- public TezTaskAttemptID getTaskAttemptId() {
- return taskAttemptId;
- }
-
- public List<TezEvent> getEvents() {
- return events;
- }
-
- public int getStartIndex() {
- return startIndex;
- }
-
- public int getPreRoutedStartIndex() {
- return preRoutedStartIndex;
- }
-
- public int getMaxEvents() {
- return maxEvents;
- }
-}
http://git-wip-us.apache.org/repos/asf/tez/blob/fad16425/tez-dag/src/main/java/org/apache/tez/dag/api/TaskHeartbeatResponse.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/api/TaskHeartbeatResponse.java b/tez-dag/src/main/java/org/apache/tez/dag/api/TaskHeartbeatResponse.java
deleted file mode 100644
index dcf89ff..0000000
--- a/tez-dag/src/main/java/org/apache/tez/dag/api/TaskHeartbeatResponse.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tez.dag.api;
-
-import java.util.List;
-
-import org.apache.tez.runtime.api.impl.TezEvent;
-
-// TODO TEZ-2003 (post) TEZ-2665. Move to the tez-api module
-public class TaskHeartbeatResponse {
-
- private final boolean shouldDie;
- private final int nextFromEventId;
- private final int nextPreRoutedEventId;
- private final List<TezEvent> events;
-
- public TaskHeartbeatResponse(boolean shouldDie, List<TezEvent> events, int nextFromEventId, int nextPreRoutedEventId) {
- this.shouldDie = shouldDie;
- this.events = events;
- this.nextFromEventId = nextFromEventId;
- this.nextPreRoutedEventId = nextPreRoutedEventId;
- }
-
- public boolean isShouldDie() {
- return shouldDie;
- }
-
- public List<TezEvent> getEvents() {
- return events;
- }
-
- public int getNextFromEventId() {
- return nextFromEventId;
- }
-
- public int getNextPreRoutedEventId() {
- return nextPreRoutedEventId;
- }
-}
http://git-wip-us.apache.org/repos/asf/tez/blob/fad16425/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorContextImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorContextImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorContextImpl.java
index 2b7234c..7f88be2 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorContextImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorContextImpl.java
@@ -29,11 +29,10 @@ import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.tez.dag.api.UserPayload;
import org.apache.tez.dag.app.rm.container.AMContainer;
import org.apache.tez.serviceplugins.api.TaskAttemptEndReason;
-import org.apache.tez.dag.api.TaskCommunicatorContext;
-import org.apache.tez.dag.api.TaskHeartbeatRequest;
-import org.apache.tez.dag.api.TaskHeartbeatResponse;
+import org.apache.tez.serviceplugins.api.TaskCommunicatorContext;
+import org.apache.tez.serviceplugins.api.TaskHeartbeatRequest;
+import org.apache.tez.serviceplugins.api.TaskHeartbeatResponse;
import org.apache.tez.dag.api.TezException;
-import org.apache.tez.dag.api.TezUncheckedException;
import org.apache.tez.dag.api.event.VertexState;
import org.apache.tez.dag.api.event.VertexStateUpdate;
import org.apache.tez.dag.app.dag.DAG;
http://git-wip-us.apache.org/repos/asf/tez/blob/fad16425/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorManager.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorManager.java b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorManager.java
index 64a964b..a196114 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorManager.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorManager.java
@@ -33,7 +33,7 @@ import org.apache.commons.collections4.ListUtils;
import org.apache.hadoop.yarn.event.Event;
import org.apache.tez.Utils;
import org.apache.tez.dag.api.NamedEntityDescriptor;
-import org.apache.tez.dag.api.TaskCommunicator;
+import org.apache.tez.serviceplugins.api.TaskCommunicator;
import org.apache.tez.dag.api.TezConstants;
import org.apache.tez.dag.api.UserPayload;
import org.apache.tez.dag.app.dag.event.DAGAppMasterEventType;
@@ -53,14 +53,14 @@ import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.tez.common.ReflectionUtils;
import org.apache.tez.common.TezUtilsInternal;
-import org.apache.tez.dag.api.TaskCommunicatorContext;
+import org.apache.tez.serviceplugins.api.TaskCommunicatorContext;
import org.apache.tez.serviceplugins.api.TaskAttemptEndReason;
-import org.apache.tez.dag.api.TaskHeartbeatResponse;
+import org.apache.tez.serviceplugins.api.TaskHeartbeatResponse;
import org.apache.tez.dag.api.TezException;
import org.apache.tez.dag.api.TezUncheckedException;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.util.ConverterUtils;
-import org.apache.tez.dag.api.TaskHeartbeatRequest;
+import org.apache.tez.serviceplugins.api.TaskHeartbeatRequest;
import org.apache.tez.dag.app.dag.DAG;
import org.apache.tez.dag.app.dag.Task;
import org.apache.tez.dag.app.dag.event.TaskAttemptEvent;
http://git-wip-us.apache.org/repos/asf/tez/blob/fad16425/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorWrapper.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorWrapper.java b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorWrapper.java
index 4f9780e..4a75875 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorWrapper.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorWrapper.java
@@ -21,7 +21,7 @@ import java.util.Map;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.LocalResource;
-import org.apache.tez.dag.api.TaskCommunicator;
+import org.apache.tez.serviceplugins.api.TaskCommunicator;
import org.apache.tez.dag.api.event.VertexStateUpdate;
import org.apache.tez.dag.records.TezTaskAttemptID;
import org.apache.tez.runtime.api.impl.TaskSpec;
http://git-wip-us.apache.org/repos/asf/tez/blob/fad16425/tez-dag/src/main/java/org/apache/tez/dag/app/TezLocalTaskCommunicatorImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/TezLocalTaskCommunicatorImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/TezLocalTaskCommunicatorImpl.java
index 47688d1..15d90d3 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/TezLocalTaskCommunicatorImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/TezLocalTaskCommunicatorImpl.java
@@ -18,7 +18,7 @@ import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.UnknownHostException;
-import org.apache.tez.dag.api.TaskCommunicatorContext;
+import org.apache.tez.serviceplugins.api.TaskCommunicatorContext;
import org.apache.tez.dag.api.TezUncheckedException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
http://git-wip-us.apache.org/repos/asf/tez/blob/fad16425/tez-dag/src/main/java/org/apache/tez/dag/app/TezTaskCommunicatorImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/TezTaskCommunicatorImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/TezTaskCommunicatorImpl.java
index d071e0d..0bbe97a 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/TezTaskCommunicatorImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/TezTaskCommunicatorImpl.java
@@ -47,10 +47,10 @@ import org.apache.tez.common.security.JobTokenSecretManager;
import org.apache.tez.common.security.TokenCache;
import org.apache.tez.serviceplugins.api.ContainerEndReason;
import org.apache.tez.serviceplugins.api.TaskAttemptEndReason;
-import org.apache.tez.dag.api.TaskCommunicator;
-import org.apache.tez.dag.api.TaskCommunicatorContext;
-import org.apache.tez.dag.api.TaskHeartbeatRequest;
-import org.apache.tez.dag.api.TaskHeartbeatResponse;
+import org.apache.tez.serviceplugins.api.TaskCommunicator;
+import org.apache.tez.serviceplugins.api.TaskCommunicatorContext;
+import org.apache.tez.serviceplugins.api.TaskHeartbeatRequest;
+import org.apache.tez.serviceplugins.api.TaskHeartbeatResponse;
import org.apache.tez.dag.api.TezConfiguration;
import org.apache.tez.dag.api.TezException;
import org.apache.tez.dag.api.TezUncheckedException;
http://git-wip-us.apache.org/repos/asf/tez/blob/fad16425/tez-dag/src/main/java/org/apache/tez/serviceplugins/api/TaskCommunicator.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/serviceplugins/api/TaskCommunicator.java b/tez-dag/src/main/java/org/apache/tez/serviceplugins/api/TaskCommunicator.java
new file mode 100644
index 0000000..8f919d1
--- /dev/null
+++ b/tez-dag/src/main/java/org/apache/tez/serviceplugins/api/TaskCommunicator.java
@@ -0,0 +1,232 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.serviceplugins.api;
+
+import javax.annotation.Nullable;
+import java.net.InetSocketAddress;
+import java.util.Map;
+
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.tez.common.ServicePluginLifecycle;
+import org.apache.tez.dag.api.event.VertexStateUpdate;
+import org.apache.tez.dag.records.TezTaskAttemptID;
+import org.apache.tez.runtime.api.impl.TaskSpec;
+
+// TODO TEZ-2003 (post) TEZ-2665. Move to the tez-api module
+// TODO TEZ-2003 (post) TEZ-2664. Ideally, don't expose YARN containerId; instead expose a Tez specific construct.
+
+/**
+ * This class represents the API for a custom TaskCommunicator which can be run within the Tez AM.
+ * This is used to communicate with running services, potentially launching tasks, and getting
+ * updates from running tasks.
+ * <p>
+ * The plugin is initialized with an instance of {@link TaskCommunicatorContext} - which provides
+ * a mechanism to report task / container status to the Tez framework and to query DAG information.
+ * <p>
+ * If setting up a heartbeat between the task and the AM, the framework is responsible for error checking
+ * of this heartbeat mechanism, handling lost or duplicate responses.
+ *
+ */
+public abstract class TaskCommunicator implements ServicePluginLifecycle {
+
+ // TODO TEZ-2003 (post) TEZ-2666 Enhancements to interface
+ // - registerContainerEnd should provide the end reason / possible rename
+ // - get rid of getAddress
+ // - Add methods to support task preemption
+ // - Add a dagStarted notification, along with a payload
+ // - taskSpec breakup into a clean interface
+ // - Add methods to report task / container completion
+
+ private final TaskCommunicatorContext taskCommunicatorContext;
+
+ public TaskCommunicator(TaskCommunicatorContext taskCommunicatorContext) {
+ this.taskCommunicatorContext = taskCommunicatorContext;
+ }
+
+ /**
+ * Get the {@link TaskCommunicatorContext} associated with this instance of the task
+ * communicator, which is used to communicate with the rest of the system (for example, to
+ * send heartbeats and to report task kills / failures).
+ *
+ * @return an instance of {@link TaskCommunicatorContext}
+ */
+ public TaskCommunicatorContext getContext() {
+ return taskCommunicatorContext;
+ }
+
+ /**
+ * An entry point for initialization. The default implementation is a no-op.
+ * Order of service setup: constructor, initialize(), start() - when starting a service.
+ *
+ * @throws Exception if an implementation fails to initialize
+ */
+ @Override
+ public void initialize() throws Exception {
+ }
+
+ /**
+ * An entry point for starting the service. The default implementation is a no-op.
+ * Order of service setup: constructor, initialize(), start() - when starting a service.
+ *
+ * @throws Exception if an implementation fails to start
+ */
+ @Override
+ public void start() throws Exception {
+ }
+
+ /**
+ * Stop the service. This could be invoked at any point, when the service is no longer required -
+ * including in case of errors. The default implementation is a no-op.
+ *
+ * @throws Exception if an implementation fails to shut down
+ */
+ @Override
+ public void shutdown() throws Exception {
+ }
+
+
+ /**
+ * Register a new container.
+ *
+ * @param containerId the associated containerId
+ * @param hostname the hostname on which the container runs
+ * @param port the port for the service which is running the container
+ * @throws ServicePluginException when the service runs into a fatal error which it cannot handle.
+ * This will cause the app to shut down.
+ */
+ public abstract void registerRunningContainer(ContainerId containerId, String hostname,
+ int port) throws ServicePluginException;
+
+ /**
+ * Register the end of a container. This can be caused by preemption, the container completing
+ * successfully, etc.
+ *
+ * @param containerId the associated containerId
+ * @param endReason the end reason for the container completing
+ * @param diagnostics diagnostics associated with the container end; may be null
+ * @throws ServicePluginException when the service runs into a fatal error which it cannot handle.
+ * This will cause the app to shut down.
+ */
+ public abstract void registerContainerEnd(ContainerId containerId, ContainerEndReason endReason,
+ @Nullable String diagnostics) throws
+ ServicePluginException;
+
+ /**
+ * Register a task attempt to execute on a container.
+ *
+ * @param containerId the containerId on which this task needs to run
+ * @param taskSpec the task specifications for the task to be executed
+ * @param additionalResources additional local resources which may be required to run this task
+ * on
+ * the container
+ * @param credentials the credentials required to run this task
+ * @param credentialsChanged whether the credentials are different from the original credentials
+ * associated with this container
+ * @param priority the priority of the task being executed
+ * @throws ServicePluginException when the service runs into a fatal error which it cannot handle.
+ * This will cause the app to shut down.
+ */
+ public abstract void registerRunningTaskAttempt(ContainerId containerId, TaskSpec taskSpec,
+ Map<String, LocalResource> additionalResources,
+ Credentials credentials,
+ boolean credentialsChanged, int priority) throws
+ ServicePluginException;
+
+ /**
+ * Register the completion of a task. This may be a result of preemption, the container dying,
+ * the node dying, the task completing successfully, etc.
+ *
+ * @param taskAttemptID the task attempt which has completed / needs to be completed
+ * @param endReason the endReason for the task attempt.
+ * @param diagnostics diagnostics associated with the task end; may be null
+ * @throws ServicePluginException when the service runs into a fatal error which it cannot handle.
+ * This will cause the app to shut down.
+ */
+ public abstract void unregisterRunningTaskAttempt(TezTaskAttemptID taskAttemptID,
+ TaskAttemptEndReason endReason,
+ @Nullable String diagnostics) throws
+ ServicePluginException;
+
+ /**
+ * Return the address, if any, that the service listens on.
+ *
+ * @return the address
+ * @throws ServicePluginException when the service runs into a fatal error which it cannot handle.
+ * This will cause the app to shut down.
+ */
+ public abstract InetSocketAddress getAddress() throws ServicePluginException;
+
+ /**
+ * Receive notifications on vertex state changes.
+ * <p>
+ * State changes will be received based on the registration via {@link
+ * TaskCommunicatorContext#registerForVertexStateUpdates(String,
+ * java.util.Set)}. Notifications will be received for all registered state changes, and not just
+ * for the latest state update. They will be in the order in which the state changes occurred.
+ * <p>
+ * Extensive processing should not be performed via this method call. Instead this should just be
+ * used as a notification mechanism.
+ * <br>This method may be invoked concurrently with other invocations into the TaskCommunicator
+ * and
+ * multi-threading/concurrency implications must be considered.
+ *
+ * @param stateUpdate an event indicating the name of the vertex, and its updated state.
+ * Additional information may be available for specific events, look at the
+ * type hierarchy for {@link org.apache.tez.dag.api.event.VertexStateUpdate}
+ * @throws ServicePluginException when the service runs into a fatal error which it cannot handle.
+ * This will cause the app to shut down.
+ */
+ public abstract void onVertexStateUpdated(VertexStateUpdate stateUpdate) throws ServicePluginException;
+
+ /**
+ * Indicates the current running dag is complete. The TaskCommunicatorContext can be used to
+ * query information about the current dag during the duration of the dagComplete invocation.
+ * <p>
+ * After this, the contents returned from querying the context may change at any point - due to
+ * the next dag being submitted.
+ *
+ * @param dagIdentifier the unique numerical identifier for the DAG in the specified execution context.
+ * @throws ServicePluginException when the service runs into a fatal error which it cannot handle.
+ * This will cause the app to shut down.
+ */
+ public abstract void dagComplete(int dagIdentifier) throws ServicePluginException;
+
+ /**
+ * Share meta-information such as host:port information where the Task Communicator may be
+ * listening.
+ * Primarily for use by compatible launchers to learn this information.
+ *
+ * @return meta info for the task communicator
+ * @throws ServicePluginException when the service runs into a fatal error which it cannot handle.
+ * This will cause the app to shut down.
+ */
+ public abstract Object getMetaInfo() throws ServicePluginException;
+}
http://git-wip-us.apache.org/repos/asf/tez/blob/fad16425/tez-dag/src/main/java/org/apache/tez/serviceplugins/api/TaskCommunicatorContext.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/serviceplugins/api/TaskCommunicatorContext.java b/tez-dag/src/main/java/org/apache/tez/serviceplugins/api/TaskCommunicatorContext.java
new file mode 100644
index 0000000..c55bdbd
--- /dev/null
+++ b/tez-dag/src/main/java/org/apache/tez/serviceplugins/api/TaskCommunicatorContext.java
@@ -0,0 +1,240 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.serviceplugins.api;
+
+import javax.annotation.Nullable;
+import java.io.IOException;
+import java.util.Set;
+
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.tez.dag.api.TezException;
+import org.apache.tez.dag.api.UserPayload;
+import org.apache.tez.dag.api.event.VertexState;
+import org.apache.tez.dag.records.TezTaskAttemptID;
+
+
+// Context handed to a TaskCommunicator by the framework. Do not make calls into this from within a held lock.
+
+// TODO TEZ-2003 (post) TEZ-2665. Move to the tez-api module
+public interface TaskCommunicatorContext {
+
+ // TODO TEZ-2003 (post) TEZ-2666 Enhancements to API
+ // - Consolidate usage of IDs
+ // - Split the heartbeat API to a liveness check and a status update
+ // - Rename and consolidate TaskHeartbeatResponse and TaskHeartbeatRequest
+ // - Fix taskStarted needs to be invoked before launching the actual task.
+ // - Potentially add methods to report availability stats to the scheduler
+ // - Report taskSuccess via a method instead of the heartbeat
+ // - Add methods to signal container / task state changes
+ // - Maybe add book-keeping as a helper library, instead of each impl tracking container to task etc.
+ // - Handling of containers / tasks which no longer exist in the system (formalized interface instead of a shouldDie notification)
+
+ /**
+ * Get the UserPayload that was configured while setting up the task communicator.
+ *
+ * @return the initially configured user payload
+ */
+ UserPayload getInitialUserPayload();
+
+ /**
+ * Get the application attempt id for the running application. Relevant when running under YARN.
+ *
+ * @return the applicationAttemptId for the running app
+ */
+ ApplicationAttemptId getApplicationAttemptId();
+
+ /**
+ * Get credentials associated with the AppMaster.
+ *
+ * @return credentials
+ */
+ Credentials getCredentials();
+
+ /**
+ * Check whether a running attempt can commit. This provides a leader election mechanism amongst
+ * multiple running attempts.
+ *
+ * @param taskAttemptId the associated task attempt id
+ * @return whether the attempt can commit or not
+ * @throws IOException
+ */
+ boolean canCommit(TezTaskAttemptID taskAttemptId) throws IOException;
+
+ /**
+ * Mechanism for a {@link TaskCommunicator} to provide updates on a running task, as well as
+ * receive new information which may need to be propagated to the task. This includes events
+ * generated by the task and events which need to be sent to the task.
+ * This method must be invoked periodically to receive updates for a running task.
+ *
+ * @param request the update from the running task.
+ * @return the response that is required by the task.
+ * @throws IOException
+ * @throws TezException
+ */
+ TaskHeartbeatResponse heartbeat(TaskHeartbeatRequest request) throws IOException, TezException;
+
+ /**
+ * Check whether the container is known by the framework. The state of this container is
+ * irrelevant.
+ *
+ * @param containerId the relevant container id
+ * @return true if the container is known, false if it isn't
+ */
+ boolean isKnownContainer(ContainerId containerId);
+
+ /**
+ * Inform the framework that a task is alive. This needs to be invoked periodically to avoid the
+ * task attempt timing out.
+ * Invocations of {@link #heartbeat(TaskHeartbeatRequest)} provide the same keep-alive functionality.
+ *
+ * @param taskAttemptId the relevant task attempt
+ */
+ void taskAlive(TezTaskAttemptID taskAttemptId);
+
+ /**
+ * Inform the framework that a container is alive. This needs to be invoked periodically to avoid
+ * the container timing out.
+ * Invocations of {@link #heartbeat(TaskHeartbeatRequest)} provide the same keep-alive functionality.
+ *
+ * @param containerId the relevant container id
+ */
+ void containerAlive(ContainerId containerId);
+
+ /**
+ * Inform the framework that the task has started execution.
+ *
+ * @param taskAttemptId the relevant task attempt id
+ * @param containerId the containerId in which the task attempt is running
+ */
+ void taskStartedRemotely(TezTaskAttemptID taskAttemptId, ContainerId containerId);
+
+ /**
+ * Inform the framework that a task has been killed.
+ *
+ * @param taskAttemptId the relevant task attempt id
+ * @param taskAttemptEndReason the reason for the task attempt being killed
+ * @param diagnostics any diagnostics messages which are relevant to the task attempt
+ * kill; may be null
+ */
+ void taskKilled(TezTaskAttemptID taskAttemptId, TaskAttemptEndReason taskAttemptEndReason,
+ @Nullable String diagnostics);
+
+ /**
+ * Inform the framework that a task has failed.
+ *
+ * @param taskAttemptId the relevant task attempt id
+ * @param taskAttemptEndReason the reason for the task failure
+ * @param diagnostics any diagnostics messages which are relevant to the task attempt
+ * failure; may be null
+ */
+ void taskFailed(TezTaskAttemptID taskAttemptId, TaskAttemptEndReason taskAttemptEndReason,
+ @Nullable String diagnostics);
+
+ /**
+ * Register to get notifications on updates to the specified vertex. Notifications will be sent
+ * via {@link TaskCommunicator#onVertexStateUpdated(org.apache.tez.dag.api.event.VertexStateUpdate)}.
+ *
+ * <p>
+ * This method can only be invoked once. Duplicate invocations will result in an error.
+ *
+ * @param vertexName the vertex name for which notifications are required.
+ * @param stateSet the set of states for which notifications are required. null implies all states
+ */
+ void registerForVertexStateUpdates(String vertexName, @Nullable Set<VertexState> stateSet);
+
+ /**
+ * Get the name of the currently executing dag.
+ *
+ * @return the name of the currently executing dag
+ */
+ String getCurrentDagName();
+
+ /**
+ * Get an identifier for the executing context of the DAG.
+ * @return a String identifier for the executing context.
+ */
+ String getCurrentAppIdentifier();
+
+ /**
+ * Get the identifier for the currently executing dag.
+ * @return a numerical identifier for the currently running DAG. This is unique within the currently running application.
+ */
+ int getCurrentDagIdenitifer(); // NOTE(review): "Idenitifer" is misspelled; renaming would break implementers and callers.
+
+ /**
+ * Get the names of the input vertices for the specified vertex.
+ * Root Inputs are not returned.
+ *
+ * @param vertexName the vertex for which source vertex names will be returned
+ * @return an Iterable containing the list of input vertices for the specified vertex
+ */
+ Iterable<String> getInputVertexNames(String vertexName);
+
+ /**
+ * Get the total number of tasks in the given vertex.
+ *
+ * @param vertexName the relevant vertex name
+ * @return total number of tasks in this vertex
+ */
+ int getVertexTotalTaskCount(String vertexName);
+
+ /**
+ * Get the number of completed tasks for a given vertex.
+ *
+ * @param vertexName the vertex name
+ * @return the number of completed tasks for the vertex
+ */
+ int getVertexCompletedTaskCount(String vertexName);
+
+ /**
+ * Get the number of running tasks for a given vertex.
+ *
+ * @param vertexName the vertex name
+ * @return the number of running tasks for the vertex
+ */
+ int getVertexRunningTaskCount(String vertexName);
+
+ /**
+ * Get the start time for the first attempt of the specified task.
+ *
+ * @param vertexName the vertex to which the task belongs
+ * @param taskIndex the index of the task
+ * @return the start time for the first attempt of the task
+ */
+ long getFirstAttemptStartTime(String vertexName, int taskIndex);
+
+ /**
+ * Get the start time for the currently executing DAG.
+ *
+ * @return time when the current dag started executing
+ */
+ long getDagStartTime();
+}
http://git-wip-us.apache.org/repos/asf/tez/blob/fad16425/tez-dag/src/main/java/org/apache/tez/serviceplugins/api/TaskHeartbeatRequest.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/serviceplugins/api/TaskHeartbeatRequest.java b/tez-dag/src/main/java/org/apache/tez/serviceplugins/api/TaskHeartbeatRequest.java
new file mode 100644
index 0000000..40b006f
--- /dev/null
+++ b/tez-dag/src/main/java/org/apache/tez/serviceplugins/api/TaskHeartbeatRequest.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.serviceplugins.api;
+
+import java.util.List;
+
+import org.apache.tez.dag.records.TezTaskAttemptID;
+import org.apache.tez.runtime.api.impl.TezEvent;
+
+// TODO TEZ-2003 (post) TEZ-2665. Move to the tez-api module
+public class TaskHeartbeatRequest {
+ // Request passed to TaskCommunicatorContext#heartbeat(TaskHeartbeatRequest).
+ // TODO TEZ-2003 (post) TEZ-2666 Ideally containerIdentifier should not be part of the request.
+ private final String containerIdentifier;
+ private final TezTaskAttemptID taskAttemptId;
+ private final List<TezEvent> events;
+ private final int startIndex;
+ private final int preRoutedStartIndex;
+ private final int maxEvents;
+
+ // All values are stored exactly as passed in; the events list is neither copied nor null-checked.
+ public TaskHeartbeatRequest(String containerIdentifier, TezTaskAttemptID taskAttemptId, List<TezEvent> events, int startIndex,
+ int preRoutedStartIndex,
+ int maxEvents) {
+ this.containerIdentifier = containerIdentifier;
+ this.taskAttemptId = taskAttemptId;
+ this.events = events;
+ this.startIndex = startIndex;
+ this.preRoutedStartIndex = preRoutedStartIndex;
+ this.maxEvents = maxEvents;
+ }
+
+ public String getContainerIdentifier() {
+ return containerIdentifier;
+ }
+
+ public TezTaskAttemptID getTaskAttemptId() {
+ return taskAttemptId;
+ }
+
+ public List<TezEvent> getEvents() {
+ return events;
+ }
+
+ public int getStartIndex() {
+ return startIndex;
+ }
+
+ public int getPreRoutedStartIndex() {
+ return preRoutedStartIndex;
+ }
+
+ public int getMaxEvents() {
+ return maxEvents;
+ }
+}
http://git-wip-us.apache.org/repos/asf/tez/blob/fad16425/tez-dag/src/main/java/org/apache/tez/serviceplugins/api/TaskHeartbeatResponse.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/serviceplugins/api/TaskHeartbeatResponse.java b/tez-dag/src/main/java/org/apache/tez/serviceplugins/api/TaskHeartbeatResponse.java
new file mode 100644
index 0000000..9145004
--- /dev/null
+++ b/tez-dag/src/main/java/org/apache/tez/serviceplugins/api/TaskHeartbeatResponse.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.serviceplugins.api;
+
+import java.util.List;
+
+import org.apache.tez.runtime.api.impl.TezEvent;
+
+// TODO TEZ-2003 (post) TEZ-2665. Move to the tez-api module
+public class TaskHeartbeatResponse {
+ // Result of TaskCommunicatorContext#heartbeat(TaskHeartbeatRequest); values are stored as passed in (events is not copied).
+ private final boolean shouldDie;
+ private final int nextFromEventId;
+ private final int nextPreRoutedEventId;
+ private final List<TezEvent> events;
+
+ public TaskHeartbeatResponse(boolean shouldDie, List<TezEvent> events, int nextFromEventId, int nextPreRoutedEventId) {
+ this.shouldDie = shouldDie;
+ this.events = events;
+ this.nextFromEventId = nextFromEventId;
+ this.nextPreRoutedEventId = nextPreRoutedEventId;
+ }
+
+ public boolean isShouldDie() {
+ return shouldDie;
+ }
+
+ public List<TezEvent> getEvents() {
+ return events;
+ }
+
+ public int getNextFromEventId() {
+ return nextFromEventId;
+ }
+
+ public int getNextPreRoutedEventId() {
+ return nextPreRoutedEventId;
+ }
+}
http://git-wip-us.apache.org/repos/asf/tez/blob/fad16425/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskCommunicatorContextImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskCommunicatorContextImpl.java b/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskCommunicatorContextImpl.java
index 5222a2d..869bfd5 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskCommunicatorContextImpl.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskCommunicatorContextImpl.java
@@ -25,7 +25,7 @@ import static org.mockito.Mockito.verify;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.tez.common.ContainerSignatureMatcher;
-import org.apache.tez.dag.api.TaskCommunicatorContext;
+import org.apache.tez.serviceplugins.api.TaskCommunicatorContext;
import org.apache.tez.dag.app.rm.container.AMContainerMap;
import org.junit.Test;
http://git-wip-us.apache.org/repos/asf/tez/blob/fad16425/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskCommunicatorManager.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskCommunicatorManager.java b/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskCommunicatorManager.java
index d76a5b3..5323928 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskCommunicatorManager.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskCommunicatorManager.java
@@ -49,8 +49,8 @@ import org.apache.hadoop.yarn.event.Event;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.tez.common.TezUtils;
import org.apache.tez.dag.api.NamedEntityDescriptor;
-import org.apache.tez.dag.api.TaskCommunicator;
-import org.apache.tez.dag.api.TaskCommunicatorContext;
+import org.apache.tez.serviceplugins.api.TaskCommunicator;
+import org.apache.tez.serviceplugins.api.TaskCommunicatorContext;
import org.apache.tez.dag.api.TezConstants;
import org.apache.tez.dag.api.TezException;
import org.apache.tez.dag.api.UserPayload;
http://git-wip-us.apache.org/repos/asf/tez/blob/fad16425/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskCommunicatorManager1.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskCommunicatorManager1.java b/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskCommunicatorManager1.java
index 2921a22..0f8afaa 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskCommunicatorManager1.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskCommunicatorManager1.java
@@ -54,19 +54,19 @@ import org.apache.tez.common.security.JobTokenIdentifier;
import org.apache.tez.common.security.JobTokenSecretManager;
import org.apache.tez.common.security.TokenCache;
import org.apache.tez.dag.api.NamedEntityDescriptor;
-import org.apache.tez.dag.api.TaskCommunicator;
+import org.apache.tez.serviceplugins.api.TaskCommunicator;
import org.apache.tez.dag.api.TezConfiguration;
import org.apache.tez.dag.api.TezConstants;
import org.apache.tez.dag.api.TezUncheckedException;
import org.apache.tez.dag.api.UserPayload;
import org.apache.tez.serviceplugins.api.ContainerEndReason;
import org.apache.tez.serviceplugins.api.TaskAttemptEndReason;
-import org.apache.tez.dag.api.TaskHeartbeatRequest;
-import org.apache.tez.dag.api.TaskHeartbeatResponse;
+import org.apache.tez.serviceplugins.api.TaskHeartbeatRequest;
+import org.apache.tez.serviceplugins.api.TaskHeartbeatResponse;
import org.apache.tez.dag.api.TezException;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.tez.common.TezTaskUmbilicalProtocol;
-import org.apache.tez.dag.api.TaskCommunicatorContext;
+import org.apache.tez.serviceplugins.api.TaskCommunicatorContext;
import org.apache.tez.dag.app.dag.DAG;
import org.apache.tez.dag.app.dag.Vertex;
import org.apache.tez.dag.app.dag.event.TaskAttemptEvent;
http://git-wip-us.apache.org/repos/asf/tez/blob/fad16425/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskCommunicatorWrapper.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskCommunicatorWrapper.java b/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskCommunicatorWrapper.java
index 212bca4..e89cc99 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskCommunicatorWrapper.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskCommunicatorWrapper.java
@@ -29,7 +29,7 @@
package org.apache.tez.dag.app;
import com.google.common.collect.Sets;
-import org.apache.tez.dag.api.TaskCommunicator;
+import org.apache.tez.serviceplugins.api.TaskCommunicator;
import org.junit.Test;
public class TestTaskCommunicatorWrapper {
http://git-wip-us.apache.org/repos/asf/tez/blob/fad16425/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java
index 4772492..3bb688e 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java
@@ -60,7 +60,7 @@ import org.apache.hadoop.yarn.util.SystemClock;
import org.apache.log4j.Level;
import org.apache.log4j.LogManager;
import org.apache.tez.common.MockDNSToSwitchMapping;
-import org.apache.tez.dag.api.TaskCommunicator;
+import org.apache.tez.serviceplugins.api.TaskCommunicator;
import org.apache.tez.dag.api.TaskLocationHint;
import org.apache.tez.dag.api.TezConfiguration;
import org.apache.tez.dag.api.oldrecords.TaskAttemptState;
http://git-wip-us.apache.org/repos/asf/tez/blob/fad16425/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerManager.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerManager.java b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerManager.java
index c649870..c62ff21 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerManager.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerManager.java
@@ -62,7 +62,6 @@ import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.tez.common.ContainerSignatureMatcher;
import org.apache.tez.common.TezUtils;
import org.apache.tez.dag.api.NamedEntityDescriptor;
-import org.apache.tez.dag.api.TaskCommunicator;
import org.apache.tez.dag.api.TaskLocationHint;
import org.apache.tez.dag.api.TezConfiguration;
import org.apache.tez.dag.api.TezConstants;
@@ -71,10 +70,7 @@ import org.apache.tez.dag.api.UserPayload;
import org.apache.tez.dag.api.client.DAGClientServer;
import org.apache.tez.dag.app.AppContext;
import org.apache.tez.dag.app.ContainerContext;
-import org.apache.tez.dag.app.ContainerHeartbeatHandler;
import org.apache.tez.dag.app.ServicePluginLifecycleAbstractService;
-import org.apache.tez.dag.app.TaskCommunicatorManager;
-import org.apache.tez.dag.app.TaskHeartbeatHandler;
import org.apache.tez.dag.app.dag.TaskAttempt;
import org.apache.tez.dag.app.dag.event.DAGAppMasterEventType;
import org.apache.tez.dag.app.dag.event.DAGAppMasterEventUserServiceFatalError;
http://git-wip-us.apache.org/repos/asf/tez/blob/fad16425/tez-dag/src/test/java/org/apache/tez/dag/app/rm/container/TestAMContainer.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/container/TestAMContainer.java b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/container/TestAMContainer.java
index 8b8b6d7..ed14871 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/container/TestAMContainer.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/container/TestAMContainer.java
@@ -67,7 +67,7 @@ import org.apache.tez.dag.app.TaskCommunicatorWrapper;
import org.apache.tez.serviceplugins.api.ContainerEndReason;
import org.apache.tez.serviceplugins.api.ServicePluginException;
import org.apache.tez.serviceplugins.api.TaskAttemptEndReason;
-import org.apache.tez.dag.api.TaskCommunicator;
+import org.apache.tez.serviceplugins.api.TaskCommunicator;
import org.apache.tez.dag.app.AppContext;
import org.apache.tez.dag.app.ContainerHeartbeatHandler;
import org.apache.tez.dag.app.ContainerContext;
http://git-wip-us.apache.org/repos/asf/tez/blob/fad16425/tez-dag/src/test/java/org/apache/tez/dag/app/rm/container/TestAMContainerMap.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/container/TestAMContainerMap.java b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/container/TestAMContainerMap.java
index e21dda1..2fcd0c8 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/container/TestAMContainerMap.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/container/TestAMContainerMap.java
@@ -31,7 +31,7 @@ import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.Token;
-import org.apache.tez.dag.api.TaskCommunicator;
+import org.apache.tez.serviceplugins.api.TaskCommunicator;
import org.apache.tez.dag.app.AppContext;
import org.apache.tez.dag.app.ContainerHeartbeatHandler;
import org.apache.tez.dag.app.TaskCommunicatorManagerInterface;
http://git-wip-us.apache.org/repos/asf/tez/blob/fad16425/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/taskcomm/TezTestServiceTaskCommunicatorImpl.java
----------------------------------------------------------------------
diff --git a/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/taskcomm/TezTestServiceTaskCommunicatorImpl.java b/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/taskcomm/TezTestServiceTaskCommunicatorImpl.java
index 127967a..f199dcf 100644
--- a/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/taskcomm/TezTestServiceTaskCommunicatorImpl.java
+++ b/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/taskcomm/TezTestServiceTaskCommunicatorImpl.java
@@ -31,7 +31,7 @@ import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.tez.serviceplugins.api.ContainerEndReason;
import org.apache.tez.serviceplugins.api.TaskAttemptEndReason;
-import org.apache.tez.dag.api.TaskCommunicatorContext;
+import org.apache.tez.serviceplugins.api.TaskCommunicatorContext;
import org.apache.tez.dag.app.TezTaskCommunicatorImpl;
import org.apache.tez.dag.app.TezTestServiceCommunicator;
import org.apache.tez.dag.records.TezTaskAttemptID;
http://git-wip-us.apache.org/repos/asf/tez/blob/fad16425/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/taskcomm/TezTestServiceTaskCommunicatorWithErrors.java
----------------------------------------------------------------------
diff --git a/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/taskcomm/TezTestServiceTaskCommunicatorWithErrors.java b/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/taskcomm/TezTestServiceTaskCommunicatorWithErrors.java
index 0a3d8d4..90313d4 100644
--- a/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/taskcomm/TezTestServiceTaskCommunicatorWithErrors.java
+++ b/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/taskcomm/TezTestServiceTaskCommunicatorWithErrors.java
@@ -22,8 +22,8 @@ import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.LocalResource;
-import org.apache.tez.dag.api.TaskCommunicator;
-import org.apache.tez.dag.api.TaskCommunicatorContext;
+import org.apache.tez.serviceplugins.api.TaskCommunicator;
+import org.apache.tez.serviceplugins.api.TaskCommunicatorContext;
import org.apache.tez.dag.api.event.VertexStateUpdate;
import org.apache.tez.dag.records.TezTaskAttemptID;
import org.apache.tez.runtime.api.impl.TaskSpec;