You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by ss...@apache.org on 2015/08/22 03:19:31 UTC
[41/50] [abbrv] tez git commit: TEZ-2675. Add javadocs for new
pluggable components, fix problems reported by jenkins. (sseth)
TEZ-2675. Add javadocs for new pluggable components, fix problems
reported by jenkins. (sseth)
Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/fda06553
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/fda06553
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/fda06553
Branch: refs/heads/TEZ-2003
Commit: fda065536bfb9c7435fe88300ca617ed642a55cf
Parents: 5ce54b8
Author: Siddharth Seth <ss...@apache.org>
Authored: Fri Aug 7 14:49:58 2015 -0700
Committer: Siddharth Seth <ss...@apache.org>
Committed: Fri Aug 21 18:15:23 2015 -0700
----------------------------------------------------------------------
TEZ-2003-CHANGES.txt | 1 +
pom.xml | 1 +
.../java/org/apache/tez/client/TezClient.java | 52 ++++-
.../tez/common/ServicePluginLifecycle.java | 9 +
.../main/java/org/apache/tez/dag/api/DAG.java | 2 +-
.../tez/dag/api/NamedEntityDescriptor.java | 17 ++
.../java/org/apache/tez/dag/api/Vertex.java | 38 +++-
.../api/ContainerLaunchRequest.java | 11 +-
.../serviceplugins/api/ContainerLauncher.java | 35 ++++
.../api/ContainerLauncherContext.java | 63 +++++-
.../api/ContainerLauncherOperationBase.java | 17 ++
.../api/ContainerStopRequest.java | 3 +
.../api/ServicePluginsDescriptor.java | 19 +-
.../tez/serviceplugins/api/TaskScheduler.java | 161 ++++++++++++++--
.../api/TaskSchedulerContext.java | 190 +++++++++++++++----
.../apache/tez/dag/api/TaskCommunicator.java | 95 +++++++++-
.../tez/dag/api/TaskCommunicatorContext.java | 100 +++++++++-
.../dag/app/ContainerLauncherContextImpl.java | 1 +
.../dag/app/TaskAttemptListenerImpTezDag.java | 15 +-
.../app/launcher/ContainerLauncherRouter.java | 14 +-
.../dag/app/rm/TaskSchedulerContextImpl.java | 3 +
.../dag/app/rm/TaskSchedulerEventHandler.java | 16 +-
.../apache/tez/runtime/task/TezTaskRunner2.java | 2 +-
23 files changed, 754 insertions(+), 111 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tez/blob/fda06553/TEZ-2003-CHANGES.txt
----------------------------------------------------------------------
diff --git a/TEZ-2003-CHANGES.txt b/TEZ-2003-CHANGES.txt
index b133ea3..75fac88 100644
--- a/TEZ-2003-CHANGES.txt
+++ b/TEZ-2003-CHANGES.txt
@@ -44,5 +44,6 @@ ALL CHANGES:
TEZ-2626. Fix log lines with DEBUG in messages, consolidate TEZ-2003 TODOs.
TEZ-2126. Add unit tests for verifying multiple schedulers, launchers, communicators.
TEZ-2698. rebase 08/05
+ TEZ-2675. Add javadocs for new pluggable components, fix problems reported by jenkins
INCOMPATIBLE CHANGES:
http://git-wip-us.apache.org/repos/asf/tez/blob/fda06553/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 7ae5f31..bf2a6cf 100644
--- a/pom.xml
+++ b/pom.xml
@@ -780,6 +780,7 @@
<configuration>
<excludes>
<exclude>CHANGES.txt</exclude>
+ <exclude>TEZ-2003-CHANGES.txt</exclude>
</excludes>
</configuration>
</plugin>
http://git-wip-us.apache.org/repos/asf/tez/blob/fda06553/tez-api/src/main/java/org/apache/tez/client/TezClient.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/client/TezClient.java b/tez-api/src/main/java/org/apache/tez/client/TezClient.java
index 27f0a81..9e7fe51 100644
--- a/tez-api/src/main/java/org/apache/tez/client/TezClient.java
+++ b/tez-api/src/main/java/org/apache/tez/client/TezClient.java
@@ -284,7 +284,7 @@ public class TezClient {
* Only LocalResourceType.FILE is supported. All files will be treated as
* private.
*
- * @param localFiles
+ * @param localFiles the files to be made available in the AM
*/
public synchronized void addAppMasterLocalFiles(Map<String, LocalResource> localFiles) {
Preconditions.checkNotNull(localFiles);
@@ -314,7 +314,7 @@ public class TezClient {
* Master for the next DAG. <br>In session mode, credentials, if needed, must be
* set before calling start()
*
- * @param credentials
+ * @param credentials credentials
*/
public synchronized void setAppMasterCredentials(Credentials credentials) {
Preconditions
@@ -883,6 +883,9 @@ public class TezClient {
append(tezDagIdFormat.get().format(1)).toString();
}
+ /**
+ * A builder for setting up an instance of {@link org.apache.tez.client.TezClient}
+ */
@Public
public static class TezClientBuilder {
final String name;
@@ -892,6 +895,15 @@ public class TezClient {
private Credentials credentials;
ServicePluginsDescriptor servicePluginsDescriptor;
+ /**
+ * Create an instance of a TezClientBuilder
+ *
+ * @param name
+ * Name of the client. Used for logging etc. This will also be used
+ * as app master name in session mode
+ * @param tezConf
+ * Configuration for the framework
+ */
private TezClientBuilder(String name, TezConfiguration tezConf) {
this.name = name;
this.tezConf = tezConf;
@@ -899,26 +911,62 @@ public class TezClient {
TezConfiguration.TEZ_AM_SESSION_MODE, TezConfiguration.TEZ_AM_SESSION_MODE_DEFAULT);
}
+ /**
+ * Specify whether this client is a session or not
+ * @param isSession whether the client is a session
+ * @return the current builder
+ */
public TezClientBuilder setIsSession(boolean isSession) {
this.isSession = isSession;
return this;
}
+ /**
+ * Set local resources to be used by the AppMaster
+ *
+ * @param localResources local files for the App Master
+ * @return the current builder
+ */
public TezClientBuilder setLocalResources(Map<String, LocalResource> localResources) {
this.localResourceMap = localResources;
return this;
}
+ /**
+ * Setup security credentials
+ *
+ * @param credentials
+ * Set security credentials to be used inside the app master, if
+ * needed. Tez App Master needs credentials to access the staging
+ * directory and for most HDFS cases these are automatically obtained
+ * by Tez client. If the staging directory is on a file system for
+ * which credentials cannot be obtained or for any credentials needed
+ * by user code running inside the App Master, credentials must be
+ * supplied by the user. These will be used by the App Master for the
+ * next DAG. <br>
+ * In session mode, credentials, if needed, must be set before
+ * calling start()
+ * @return the current builder
+ */
public TezClientBuilder setCredentials(Credentials credentials) {
this.credentials = credentials;
return this;
}
+ /**
+ * Specify the service plugins that will be running in the AM
+ * @param servicePluginsDescriptor the service plugin descriptor with details about the plugins running in the AM
+ * @return the current builder
+ */
public TezClientBuilder setServicePluginDescriptor(ServicePluginsDescriptor servicePluginsDescriptor) {
this.servicePluginsDescriptor = servicePluginsDescriptor;
return this;
}
+ /**
+ * Build the actual instance of the {@link TezClient}
+ * @return an instance of {@link TezClient}
+ */
public TezClient build() {
return new TezClient(name, tezConf, isSession, localResourceMap, credentials,
servicePluginsDescriptor);
http://git-wip-us.apache.org/repos/asf/tez/blob/fda06553/tez-api/src/main/java/org/apache/tez/common/ServicePluginLifecycle.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/common/ServicePluginLifecycle.java b/tez-api/src/main/java/org/apache/tez/common/ServicePluginLifecycle.java
index 2eaa7be..b52b08c 100644
--- a/tez-api/src/main/java/org/apache/tez/common/ServicePluginLifecycle.java
+++ b/tez-api/src/main/java/org/apache/tez/common/ServicePluginLifecycle.java
@@ -17,6 +17,15 @@ package org.apache.tez.common;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
+/**
+ * Defines a lifecycle for a Service. The typical implementation for services when used within the
+ * Tez framework would be
+ * 1. Construct the object.
+ * 2. initialize()
+ * 3. start()
+ * shutdown() - is invoked when the service is no longer required, and could be invoked while in any
+ * state, in case of failures
+ */
@InterfaceAudience.Private
@InterfaceStability.Unstable
public interface ServicePluginLifecycle {
http://git-wip-us.apache.org/repos/asf/tez/blob/fda06553/tez-api/src/main/java/org/apache/tez/dag/api/DAG.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/DAG.java b/tez-api/src/main/java/org/apache/tez/dag/api/DAG.java
index fce9522..927039a 100644
--- a/tez-api/src/main/java/org/apache/tez/dag/api/DAG.java
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/DAG.java
@@ -343,7 +343,7 @@ public class DAG {
*
* @param vertexExecutionContext the default execution context for the DAG
*
- * @return
+ * @return this DAG
*/
@Public
@InterfaceStability.Unstable
http://git-wip-us.apache.org/repos/asf/tez/blob/fda06553/tez-api/src/main/java/org/apache/tez/dag/api/NamedEntityDescriptor.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/NamedEntityDescriptor.java b/tez-api/src/main/java/org/apache/tez/dag/api/NamedEntityDescriptor.java
index 17c8c6c..426d4eb 100644
--- a/tez-api/src/main/java/org/apache/tez/dag/api/NamedEntityDescriptor.java
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/NamedEntityDescriptor.java
@@ -14,9 +14,14 @@
package org.apache.tez.dag.api;
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
import com.google.common.base.Preconditions;
import org.apache.hadoop.classification.InterfaceAudience;
+@SuppressWarnings("unchecked")
public class NamedEntityDescriptor<T extends NamedEntityDescriptor<T>> extends EntityDescriptor<NamedEntityDescriptor<T>> {
private final String entityName;
@@ -37,6 +42,18 @@ public class NamedEntityDescriptor<T extends NamedEntityDescriptor<T>> extends E
}
@Override
+ public void write(DataOutput out) throws IOException {
+ throw new UnsupportedOperationException(
+ "write is not expected to be used for a NamedEntityDescriptor");
+ }
+
+ @Override
+ public void readFields(DataInput in) throws IOException {
+ throw new UnsupportedOperationException(
+ "readFields is not expected to be used for a NamedEntityDescriptor");
+ }
+
+ @Override
public String toString() {
boolean hasPayload =
getUserPayload() == null ? false : getUserPayload().getPayload() == null ? false : true;
http://git-wip-us.apache.org/repos/asf/tez/blob/fda06553/tez-api/src/main/java/org/apache/tez/dag/api/Vertex.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/Vertex.java b/tez-api/src/main/java/org/apache/tez/dag/api/Vertex.java
index 8953ae1..3f52a3d 100644
--- a/tez-api/src/main/java/org/apache/tez/dag/api/Vertex.java
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/Vertex.java
@@ -419,13 +419,16 @@ public class Vertex {
*
* @param vertexExecutionContext the execution context for the vertex.
*
- * @return
+ * @return this Vertex
*/
public Vertex setExecutionContext(VertexExecutionContext vertexExecutionContext) {
this.vertexExecutionContext = vertexExecutionContext;
return this;
}
+ /**
+ * The execution context for a running vertex.
+ */
@Public
@InterfaceStability.Unstable
public static class VertexExecutionContext {
@@ -435,15 +438,39 @@ public class Vertex {
final String containerLauncherName;
final String taskCommName;
+ /**
+ * Create an execution context which specifies whether the vertex needs to be executed in the
+ * AM
+ *
+ * @param executeInAm whether to execute the vertex in the AM
+ * @return the relevant execution context
+ */
public static VertexExecutionContext createExecuteInAm(boolean executeInAm) {
return new VertexExecutionContext(executeInAm, false);
}
+ /**
+ * Create an execution context which specifies whether the vertex needs to be executed in
+ * regular containers
+ *
+ * @param executeInContainers whether to execute the vertex in regular containers
+ * @return the relevant execution context
+ */
public static VertexExecutionContext createExecuteInContainers(boolean executeInContainers) {
return new VertexExecutionContext(false, executeInContainers);
}
- public static VertexExecutionContext create(String taskSchedulerName, String containerLauncherName,
+ /**
+ * @param taskSchedulerName the task scheduler name which was setup while creating the
+ * {@link org.apache.tez.client.TezClient}
+ * @param containerLauncherName the container launcher name which was setup while creating the
+ * {@link org.apache.tez.client.TezClient}
+ * @param taskCommName the task communicator name which was setup while creating the
+ * {@link org.apache.tez.client.TezClient}
+ * @return the relevant execution context
+ */
+ public static VertexExecutionContext create(String taskSchedulerName,
+ String containerLauncherName,
String taskCommName) {
return new VertexExecutionContext(taskSchedulerName, containerLauncherName, taskCommName);
}
@@ -453,12 +480,13 @@ public class Vertex {
}
private VertexExecutionContext(String taskSchedulerName, String containerLauncherName,
- String taskCommName) {
+ String taskCommName) {
this(false, false, taskSchedulerName, containerLauncherName, taskCommName);
}
- private VertexExecutionContext(boolean executeInAm, boolean executeInContainers, String taskSchedulerName, String containerLauncherName,
- String taskCommName) {
+ private VertexExecutionContext(boolean executeInAm, boolean executeInContainers,
+ String taskSchedulerName, String containerLauncherName,
+ String taskCommName) {
if (executeInAm || executeInContainers) {
Preconditions.checkState(!(executeInAm && executeInContainers),
"executeInContainers and executeInAM are mutually exclusive");
http://git-wip-us.apache.org/repos/asf/tez/blob/fda06553/tez-api/src/main/java/org/apache/tez/serviceplugins/api/ContainerLaunchRequest.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/serviceplugins/api/ContainerLaunchRequest.java b/tez-api/src/main/java/org/apache/tez/serviceplugins/api/ContainerLaunchRequest.java
index cfd7ca7..f998fa2 100644
--- a/tez-api/src/main/java/org/apache/tez/serviceplugins/api/ContainerLaunchRequest.java
+++ b/tez-api/src/main/java/org/apache/tez/serviceplugins/api/ContainerLaunchRequest.java
@@ -22,6 +22,9 @@ import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Token;
+/**
+ * Contains specifications for a container which needs to be launched
+ */
@InterfaceAudience.Public
@InterfaceStability.Unstable
public class ContainerLaunchRequest extends ContainerLauncherOperationBase {
@@ -46,6 +49,10 @@ public class ContainerLaunchRequest extends ContainerLauncherOperationBase {
// TODO Post TEZ-2003. TEZ-2625. ContainerLaunchContext needs to be built here instead of being passed in.
// Basic specifications need to be provided here
+ /**
+ * The {@link ContainerLaunchContext} for the container being launched
+ * @return the container launch context for the launch request
+ */
public ContainerLaunchContext getContainerLaunchContext() {
return clc;
}
@@ -53,7 +60,7 @@ public class ContainerLaunchRequest extends ContainerLauncherOperationBase {
/**
* Get the name of the task communicator which will be used to communicate
* with the task that will run in this container.
- * @return
+ * @return the task communicator to be used for this request
*/
public String getTaskCommunicatorName() {
return taskCommName;
@@ -61,7 +68,7 @@ public class ContainerLaunchRequest extends ContainerLauncherOperationBase {
/**
* Get the name of the scheduler which allocated this container.
- * @return
+ * @return the scheduler name which provided the container
*/
public String getSchedulerName() {
return schedulerName;
http://git-wip-us.apache.org/repos/asf/tez/blob/fda06553/tez-api/src/main/java/org/apache/tez/serviceplugins/api/ContainerLauncher.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/serviceplugins/api/ContainerLauncher.java b/tez-api/src/main/java/org/apache/tez/serviceplugins/api/ContainerLauncher.java
index 7f58f77..5a77b69 100644
--- a/tez-api/src/main/java/org/apache/tez/serviceplugins/api/ContainerLauncher.java
+++ b/tez-api/src/main/java/org/apache/tez/serviceplugins/api/ContainerLauncher.java
@@ -33,22 +33,57 @@ public abstract class ContainerLauncher implements ServicePluginLifecycle {
this.containerLauncherContext = containerLauncherContext;
}
+ /**
+ * An entry point for initialization.
+ * Order of service setup. Constructor, initialize(), start() - when starting a service.
+ *
+ * @throws Exception
+ */
@Override
public void initialize() throws Exception {
}
+ /**
+ * An entry point for starting the service.
+ * Order of service setup. Constructor, initialize(), start() - when starting a service.
+ *
+ * @throws Exception
+ */
@Override
public void start() throws Exception {
}
+ /**
+ * Stop the service. This could be invoked at any point, when the service is no longer required -
+ * including in case of errors.
+ *
+ * @throws Exception
+ */
@Override
public void shutdown() throws Exception {
}
+ /**
+ * Get the {@link ContainerLauncherContext} associated with this instance of the container
+ * launcher, which is used to communicate with the rest of the system
+ *
+ * @return an instance of {@link ContainerLauncherContext}
+ */
public final ContainerLauncherContext getContext() {
return this.containerLauncherContext;
}
+ /**
+ * A request to launch the specified container
+ *
+ * @param launchRequest the actual launch request
+ */
public abstract void launchContainer(ContainerLaunchRequest launchRequest);
+
+ /**
+ * A request to stop a specific container
+ *
+ * @param stopRequest the actual stop request
+ */
public abstract void stopContainer(ContainerStopRequest stopRequest);
}
http://git-wip-us.apache.org/repos/asf/tez/blob/fda06553/tez-api/src/main/java/org/apache/tez/serviceplugins/api/ContainerLauncherContext.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/serviceplugins/api/ContainerLauncherContext.java b/tez-api/src/main/java/org/apache/tez/serviceplugins/api/ContainerLauncherContext.java
index 5da38b8..dcd9e80 100644
--- a/tez-api/src/main/java/org/apache/tez/serviceplugins/api/ContainerLauncherContext.java
+++ b/tez-api/src/main/java/org/apache/tez/serviceplugins/api/ContainerLauncherContext.java
@@ -24,30 +24,87 @@ import org.apache.tez.dag.api.UserPayload;
@InterfaceStability.Unstable
public interface ContainerLauncherContext {
- // TODO Post TEZ-2003. Tez abstraction for ContainerId, NodeId, other YARN constructs
+ // TODO TEZ-2003 (post) TEZ-2664 Tez abstraction for ContainerId, NodeId, other YARN constructs
// Reporting APIs
+
+ /**
+ * Inform the framework that a container has been launched
+ *
+ * @param containerId the id of the container that has been launched
+ */
void containerLaunched(ContainerId containerId);
+ /**
+ * Inform the framework of an issue while trying to launch a container.
+ *
+ * @param containerId the id of the container which failed to launch
+ * @param diagnostics diagnostics for the failure
+ */
void containerLaunchFailed(ContainerId containerId, String diagnostics);
+ /**
+ * Inform the framework that a request has been made to stop a container
+ *
+ * @param containerId the id of the associated container
+ */
void containerStopRequested(ContainerId containerId);
+ /**
+ * Inform the framework that the attempt to stop a container failed
+ *
+ * @param containerId the id of the associated container
+ * @param diagnostics diagnostics for the failure
+ */
void containerStopFailed(ContainerId containerId, String diagnostics);
- // TODO Post TEZ-2003. TaskAttemptEndReason does not belong here, and is an unnecessary leak.
+ // TODO TEZ-2003 (post). TEZ-2676 TaskAttemptEndReason does not belong here, and is an unnecessary leak.
// ContainerCompleted is normally generated by the scheduler in case of YARN since the RM informs about completion.
// For other sources, there may not be a central entity making this information available. The ContainerLauncher
// on the stop request will likely be the best place to generate it.
- void containerCompleted(ContainerId containerId, int exitStatus, String diagnostics, TaskAttemptEndReason endReason);
+
+ /**
+ * Inform the scheduler that a container was successfully stopped
+ *
+ * @param containerId the id of the associated container
+ * @param exitStatus the exit status of the container
+ * @param diagnostics diagnostics associated with the container end
+ * @param endReason the end reason for the task running in the container
+ */
+ void containerCompleted(ContainerId containerId, int exitStatus, String diagnostics,
+ TaskAttemptEndReason endReason);
// Lookup APIs
+ /**
+ * Get the UserPayload that was configured while setting up the launcher
+ *
+ * @return the initially configured user payload
+ */
UserPayload getInitialUserPayload();
+ /**
+ * Get the number of nodes being handled by the specified source
+ *
+ * @param sourceName the relevant source name
+ * @return the number of nodes handled by the specified source
+ */
int getNumNodes(String sourceName);
+ /**
+ * Get the application attempt id for the running application. Relevant when running under YARN
+ *
+ * @return the applicationAttemptId for the running app
+ */
ApplicationAttemptId getApplicationAttemptId();
+ /**
+ * Get meta info from the specified TaskCommunicator. This assumes that the launcher has been
+ * set up
+ * along with a compatible TaskCommunicator, and the launcher knows how to read this meta-info
+ *
+ * @param taskCommName the name of the task communicator
+ * @return meta info for the requested task communicator
+ */
Object getTaskCommunicatorMetaInfo(String taskCommName);
}
http://git-wip-us.apache.org/repos/asf/tez/blob/fda06553/tez-api/src/main/java/org/apache/tez/serviceplugins/api/ContainerLauncherOperationBase.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/serviceplugins/api/ContainerLauncherOperationBase.java b/tez-api/src/main/java/org/apache/tez/serviceplugins/api/ContainerLauncherOperationBase.java
index 29e0420..260b681 100644
--- a/tez-api/src/main/java/org/apache/tez/serviceplugins/api/ContainerLauncherOperationBase.java
+++ b/tez-api/src/main/java/org/apache/tez/serviceplugins/api/ContainerLauncherOperationBase.java
@@ -24,6 +24,11 @@ import org.apache.hadoop.yarn.api.records.Token;
@InterfaceStability.Unstable
public class ContainerLauncherOperationBase {
+ // TODO TEZ-2702 (TEZ-2003 post)
+ // - Get rid of YARN constructs.
+ // - ContainerToken may not always be required
+
+
private final NodeId nodeId;
private final ContainerId containerId;
private final Token containerToken;
@@ -36,14 +41,26 @@ public class ContainerLauncherOperationBase {
this.containerToken = containerToken;
}
+ /**
+ * Get the node on which this container is to be launched
+ * @return the id of the node for this container operation
+ */
public NodeId getNodeId() {
return nodeId;
}
+ /**
+ * Get the containerId for the container
+ * @return the id of the container
+ */
public ContainerId getContainerId() {
return containerId;
}
+ /**
+ * Get the security token for the container. Primarily for YARN
+ * @return the container token
+ */
public Token getContainerToken() {
return containerToken;
}
http://git-wip-us.apache.org/repos/asf/tez/blob/fda06553/tez-api/src/main/java/org/apache/tez/serviceplugins/api/ContainerStopRequest.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/serviceplugins/api/ContainerStopRequest.java b/tez-api/src/main/java/org/apache/tez/serviceplugins/api/ContainerStopRequest.java
index cb0af31..be7d00a 100644
--- a/tez-api/src/main/java/org/apache/tez/serviceplugins/api/ContainerStopRequest.java
+++ b/tez-api/src/main/java/org/apache/tez/serviceplugins/api/ContainerStopRequest.java
@@ -20,6 +20,9 @@ import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Token;
+/**
+ * Contains specifications for a container which needs to be stopped
+ */
@InterfaceAudience.Public
@InterfaceStability.Unstable
public class ContainerStopRequest extends ContainerLauncherOperationBase {
http://git-wip-us.apache.org/repos/asf/tez/blob/fda06553/tez-api/src/main/java/org/apache/tez/serviceplugins/api/ServicePluginsDescriptor.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/serviceplugins/api/ServicePluginsDescriptor.java b/tez-api/src/main/java/org/apache/tez/serviceplugins/api/ServicePluginsDescriptor.java
index 2e4fc46..ce35350 100644
--- a/tez-api/src/main/java/org/apache/tez/serviceplugins/api/ServicePluginsDescriptor.java
+++ b/tez-api/src/main/java/org/apache/tez/serviceplugins/api/ServicePluginsDescriptor.java
@@ -18,6 +18,10 @@ import com.google.common.base.Preconditions;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
+/**
+ * A {@link ServicePluginsDescriptor} describes the list of plugins running within the AM for
+ * sourcing resources, launching and executing work.
+ */
@InterfaceAudience.Public
@InterfaceStability.Unstable
public class ServicePluginsDescriptor {
@@ -53,7 +57,7 @@ public class ServicePluginsDescriptor {
* @param taskSchedulerDescriptor the task scheduler plugin descriptors
* @param containerLauncherDescriptors the container launcher plugin descriptors
* @param taskCommunicatorDescriptors the task communicator plugin descriptors
- * @return
+ * @return a {@link ServicePluginsDescriptor} instance
*/
public static ServicePluginsDescriptor create(TaskSchedulerDescriptor[] taskSchedulerDescriptor,
ContainerLauncherDescriptor[] containerLauncherDescriptors,
@@ -69,7 +73,7 @@ public class ServicePluginsDescriptor {
* @param taskSchedulerDescriptor the task scheduler plugin descriptors
* @param containerLauncherDescriptors the container launcher plugin descriptors
* @param taskCommunicatorDescriptors the task communicator plugin descriptors
- * @return
+ * @return a {@link ServicePluginsDescriptor} instance
*/
public static ServicePluginsDescriptor create(boolean enableUber,
TaskSchedulerDescriptor[] taskSchedulerDescriptor,
@@ -88,7 +92,7 @@ public class ServicePluginsDescriptor {
* @param taskSchedulerDescriptor the task scheduler plugin descriptors
* @param containerLauncherDescriptors the container launcher plugin descriptors
* @param taskCommunicatorDescriptors the task communicator plugin descriptors
- * @return
+ * @return a {@link ServicePluginsDescriptor} instance
*/
public static ServicePluginsDescriptor create(boolean enableContainers, boolean enableUber,
TaskSchedulerDescriptor[] taskSchedulerDescriptor,
@@ -103,30 +107,35 @@ public class ServicePluginsDescriptor {
* execution is enabled by default
*
* @param enableUber whether to enable execution in the AM or not
- * @return
+ * @return a {@link ServicePluginsDescriptor} instance
*/
public static ServicePluginsDescriptor create(boolean enableUber) {
return new ServicePluginsDescriptor(true, enableUber, null, null, null);
}
+ @InterfaceAudience.Private
public boolean areContainersEnabled() {
return enableContainers;
}
+ @InterfaceAudience.Private
public boolean isUberEnabled() {
return enableUber;
}
+ @InterfaceAudience.Private
public TaskSchedulerDescriptor[] getTaskSchedulerDescriptors() {
return taskSchedulerDescriptors;
}
+ @InterfaceAudience.Private
public ContainerLauncherDescriptor[] getContainerLauncherDescriptors() {
return containerLauncherDescriptors;
}
+ @InterfaceAudience.Private
public TaskCommunicatorDescriptor[] getTaskCommunicatorDescriptors() {
return taskCommunicatorDescriptors;
}
-}
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/tez/blob/fda06553/tez-api/src/main/java/org/apache/tez/serviceplugins/api/TaskScheduler.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/serviceplugins/api/TaskScheduler.java b/tez-api/src/main/java/org/apache/tez/serviceplugins/api/TaskScheduler.java
index b1fb349..de76029 100644
--- a/tez-api/src/main/java/org/apache/tez/serviceplugins/api/TaskScheduler.java
+++ b/tez-api/src/main/java/org/apache/tez/serviceplugins/api/TaskScheduler.java
@@ -14,23 +14,34 @@
package org.apache.tez.serviceplugins.api;
+import javax.annotation.Nullable;
+
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
-import org.apache.hadoop.service.AbstractService;
+import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.tez.common.ServicePluginLifecycle;
+/**
+ * This class represents the API for a custom TaskScheduler which can be run within the Tez AM.
+ * This can be used to source resources from different sources, as well as control the logic of
+ * how these resources get allocated to the different tasks within a DAG which needs resources.
+ * <p>
+ * The plugin is initialized with an instance of {@link TaskSchedulerContext} - which provides
+ * a mechanism to notify the Tez framework about allocation decisions and resources.
+ */
@InterfaceAudience.Public
@InterfaceStability.Unstable
public abstract class TaskScheduler implements ServicePluginLifecycle {
// TODO TEZ-2003 (post) TEZ-2668
// - Should setRegister / unregister be part of APIs when not YARN specific ?
- // - Include vertex / task information in therequest so that the scheduler can make decisions
+ // - Include vertex / task information in the request so that the scheduler can make decisions
// around prioritizing tasks in the same vertex when others exist at the same priority.
+ // There should be an interface around Object task - if it's meant to be used for equals / hashCode.
private final TaskSchedulerContext taskSchedulerContext;
@@ -38,55 +49,179 @@ public abstract class TaskScheduler implements ServicePluginLifecycle {
this.taskSchedulerContext = taskSchedulerContext;
}
+ /**
+ * An entry point for initialization.
+ * Order of service setup. Constructor, initialize(), start() - when starting a service.
+ *
+ * @throws Exception
+ */
@Override
public void initialize() throws Exception {
}
+ /**
+ * An entry point for starting the service.
+ * Order of service setup. Constructor, initialize(), start() - when starting a service.
+ *
+ * @throws Exception
+ */
@Override
public void start() throws Exception {
}
+ /**
+ * Stop the service. This could be invoked at any point, when the service is no longer required -
+ * including in case of errors.
+ *
+ * @throws Exception
+ */
@Override
public void shutdown() throws Exception {
}
+ /**
+ * The first step of stopping the task scheduler service. This would typically be used to stop
+ * allocating new resources. shutdown() will typically be used to unregister from external
+ * services - especially YARN for instance, so that the app is not killed
+ */
public void initiateStop() {
}
- public abstract Resource getAvailableResources();
-
- public abstract int getClusterNodeCount();
+ /**
+ * Get the {@link TaskSchedulerContext} associated with this instance of the scheduler, which is
+ * used to communicate with the rest of the system
+ *
+ * @return an instance of {@link TaskSchedulerContext}
+ */
+ public final TaskSchedulerContext getContext() {
+ return taskSchedulerContext;
+ }
- public abstract void dagComplete();
+ /**
+ * Get the currently available resources from this source
+ *
+ * @return the resources available at the time of invocation
+ */
+ public abstract Resource getAvailableResources();
+ /**
+ * Get the total available resources from this source
+ *
+ * @return the total available resources from the source
+ */
public abstract Resource getTotalResources();
+ /**
+ * Get the number of nodes available from the source
+ *
+ * @return the number of nodes
+ */
+ public abstract int getClusterNodeCount();
+
+ /**
+ * Indication to a source that a node has been blacklisted, and should not be used for subsequent
+ * allocations.
+ *
+ * @param nodeId the nodeId to be blacklisted
+ */
public abstract void blacklistNode(NodeId nodeId);
+ /**
+ * Indication to a source that a node has been un-blacklisted, and can be used for subsequent
+ * allocations
+ *
+ * @param nodeId the nodeId to be unblacklisted
+ */
public abstract void unblacklistNode(NodeId nodeId);
+ /**
+ * A request to the source to allocate resources for a requesting task, with location information
+ * optionally specified
+ *
+ * @param task the task for which resources are being requested.
+ * @param capability the required resources to run this task
+ * @param hosts the preferred host locations for the task
+ * @param racks the preferred rack locations for the task
+ * @param priority the priority of the request for this allocation. A lower value
+ * implies a higher priority
+ * @param containerSignature the specifications for the container (environment, etc) which will
+ * be
+ * used for this task - if applicable
+ * @param clientCookie a cookie associated with this request. This should be returned back
+ * via the {@link TaskSchedulerContext#taskAllocated(Object, Object,
+ * Container)} method when a task is assigned to a resource
+ */
public abstract void allocateTask(Object task, Resource capability,
String[] hosts, String[] racks, Priority priority,
Object containerSignature, Object clientCookie);
/**
- * Allocate affinitized to a specific container
+ * A request to the source to allocate resources for a requesting task, based on a previously used
+ * container
+ *
+ * @param task the task for which resources are being requested.
+ * @param capability the required resources to run this task
+ * @param containerId a previous container which is used as an indication as to where this
+ * task should be placed
+ * @param priority the priority of the request for this allocation. A lower value
+ * implies a higher priority
+ * @param containerSignature the specifications for the container (environment, etc) which will
+ * be
+ * used for this task - if applicable
+ * @param clientCookie a cookie associated with this request. This should be returned back
+ * via the {@link TaskSchedulerContext#taskAllocated(Object, Object,
+ * Container)} method when a task is assigned to a resource
*/
public abstract void allocateTask(Object task, Resource capability,
- ContainerId containerId, Priority priority, Object containerSignature,
+ ContainerId containerId, Priority priority,
+ Object containerSignature,
Object clientCookie);
- /** Plugin writers must ensure to de-allocate a container once it's done, so that it can be collected. */
- public abstract boolean deallocateTask(Object task, boolean taskSucceeded, TaskAttemptEndReason endReason);
+ /**
+ * A request to deallocate a task. This is typically a result of a task completing - with success
+ * or failure. It could also be the result of a decision to not run the task, before it is
+ * allocated or started.
+ * <p/>
+ * Plugin writers need to de-allocate containers via the context once it's no longer required, for
+ * correct book-keeping
+ *
+ * @param task the task being de-allocated.
+ * @param taskSucceeded whether the task succeeded or not
+ * @param endReason the reason for the task failure
+ * @param diagnostics additional diagnostics information which may be relevant
+ * @return true if the task was associated with a container, false if the task was not associated
+ * with a container
+ */
+ public abstract boolean deallocateTask(Object task, boolean taskSucceeded,
+ TaskAttemptEndReason endReason,
+ @Nullable String diagnostics);
+ /**
+ * A request to de-allocate a previously allocated container.
+ *
+ * @param containerId the containerId to de-allocate
+ * @return the task which was previously associated with this container, null otherwise
+ */
public abstract Object deallocateContainer(ContainerId containerId);
+ /**
+ * Inform the scheduler that it should unregister. This is primarily valid for schedulers which
+ * require registration (YARN a.t.m)
+ */
public abstract void setShouldUnregister();
+ /**
+ * Checks with the scheduler whether it has unregistered.
+ *
+ * @return true if the scheduler has unregistered. False otherwise.
+ */
public abstract boolean hasUnregistered();
+ /**
+ * Indicates to the scheduler that the currently running dag has completed.
+ * This can be used to reset dag specific statistics, potentially release resources and prepare
+ * for a new DAG.
+ */
+ public abstract void dagComplete();
- public final TaskSchedulerContext getContext() {
- return taskSchedulerContext;
- }
}
http://git-wip-us.apache.org/repos/asf/tez/blob/fda06553/tez-api/src/main/java/org/apache/tez/serviceplugins/api/TaskSchedulerContext.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/serviceplugins/api/TaskSchedulerContext.java b/tez-api/src/main/java/org/apache/tez/serviceplugins/api/TaskSchedulerContext.java
index dbbf75c..a24061f 100644
--- a/tez-api/src/main/java/org/apache/tez/serviceplugins/api/TaskSchedulerContext.java
+++ b/tez-api/src/main/java/org/apache/tez/serviceplugins/api/TaskSchedulerContext.java
@@ -31,15 +31,24 @@ import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.tez.common.ContainerSignatureMatcher;
import org.apache.tez.dag.api.UserPayload;
+/**
+ * Context for a {@link TaskScheduler}
+ * <p/>
+ * This provides methods for a scheduler to interact with the Tez framework.
+ * <p/>
+ * Calls into this should be outside of locks, which may also be obtained by methods in the
+ * scheduler
+ * which implement the {@link TaskScheduler} interface
+ */
@InterfaceAudience.Public
@InterfaceStability.Unstable
-
public interface TaskSchedulerContext {
- public class AppFinalStatus {
+ class AppFinalStatus {
public final FinalApplicationStatus exitStatus;
public final String exitMessage;
public final String postCompletionTrackingUrl;
+
public AppFinalStatus(FinalApplicationStatus exitStatus,
String exitMessage,
String posCompletionTrackingUrl) {
@@ -49,67 +58,180 @@ public interface TaskSchedulerContext {
}
}
+ /**
+ * Indicates the state the AM is in.
+ */
enum AMState {
- IDLE, RUNNING_APP, COMPLETED
+ IDLE,
+ RUNNING_APP,
+ COMPLETED
}
// TODO TEZ-2003 (post) TEZ-2664. Remove references to YARN constructs like Container, ContainerStatus, NodeReport
// TODO TEZ-2003 (post) TEZ-2668 Enhancements to TaskScheduler interfaces
// - setApplicationRegistrationData may not be relevant to non YARN clusters
// - getAppFinalStatus may not be relevant to non YARN clusters
- // upcall to app must be outside locks
- public void taskAllocated(Object task,
- Object appCookie,
- Container container);
- // this may end up being called for a task+container pair that the app
- // has not heard about. this can happen because of a race between
- // taskAllocated() upcall and deallocateTask() downcall
- public void containerCompleted(Object taskLastAllocated,
- ContainerStatus containerStatus);
- public void containerBeingReleased(ContainerId containerId);
- public void nodesUpdated(List<NodeReport> updatedNodes);
- public void appShutdownRequested();
-
- // TODO Post TEZ-2003, this method specifically needs some cleaning up.
- // ClientAMSecretKey is only relevant when running under YARN. As are ApplicationACLs.
- public void setApplicationRegistrationData(
+
+
+ /**
+ * Indicate to the framework that a container is being assigned to a task.
+ *
+ * @param task the task for which a container is being assigned. This should be the same
+ * instance that was provided when requesting for an allocation
+ * @param appCookie the cookie which was provided while requesting allocation for this task
+ * @param container the actual container assigned to the task
+ */
+ void taskAllocated(Object task,
+ Object appCookie,
+ Container container);
+
+
+ /**
+ * Indicate to the framework that a container has completed. This is typically used by sources
+ * which have
+ * a means to indicate a container failure to the scheduler (typically centrally managed
+ * schedulers - YARN)
+ *
+ * @param taskLastAllocated the task that was allocated to this container, if any. This is the
+ * same instance that was passed in while requesting an allocation
+ * @param containerStatus the status with which the container ended
+ */
+ void containerCompleted(Object taskLastAllocated,
+ ContainerStatus containerStatus);
+
+ /**
+ * Indicates to the framework that a container is being released.
+ *
+ * @param containerId the id of the container being released
+ */
+ void containerBeingReleased(ContainerId containerId);
+
+
+ /**
+ * Provide an update to the framework about the status of nodes available to this scheduler
+ *
+ * @param updatedNodes a list of updated node reports
+ */
+ void nodesUpdated(List<NodeReport> updatedNodes);
+
+ /**
+ * Inform the framework that an app shutdown is required. This should typically not be used, other
+ * than
+ * by the YARN scheduler.
+ */
+ void appShutdownRequested();
+
+ /**
+ * Provide an update to the framework about specific information about the source managed by this
+ * scheduler.
+ *
+ * @param maxContainerCapability the total resource capability of the source
+ * @param appAcls ACLs for the source
+ * @param clientAMSecretKey a secret key provided by the source
+ */
+ void setApplicationRegistrationData(
Resource maxContainerCapability,
Map<ApplicationAccessType, String> appAcls,
ByteBuffer clientAMSecretKey
);
- public void onError(Throwable t);
- public float getProgress();
- public void preemptContainer(ContainerId containerId);
- public AppFinalStatus getFinalAppStatus();
+ /**
+ * Indicate to the framework that the scheduler has run into an error. This will cause
+ * the DAG and application to be killed.
+ *
+ * @param t the relevant error
+ */
+ void onError(Throwable t);
+
+ /**
+ * Inform the framework that the scheduler has determined that a previously allocated container
+ * needs to be preempted
+ *
+ * @param containerId the containerId to be preempted
+ */
+ void preemptContainer(ContainerId containerId);
+
+ /**
+ * Get the final status for the application, which could be provided to the coordinator of the
+ * source.
+ * Primarily relevant to YARN
+ *
+ * @return the final Application status
+ */
+ AppFinalStatus getFinalAppStatus();
// Getters
- public UserPayload getInitialUserPayload();
+ /**
+ * Get the UserPayload that was configured while setting up the scheduler
+ *
+ * @return the initially configured user payload
+ */
+ UserPayload getInitialUserPayload();
- public String getAppTrackingUrl();
+ /**
+ * Get the tracking URL for the application. Primarily relevant to YARN
+ *
+ * @return the trackingUrl for the app
+ */
+ String getAppTrackingUrl();
+
+ /**
+ * Request the framework for progress of the running DAG. This value must be between 0 and 1
+ *
+ * @return progress
+ */
+ float getProgress();
/**
* A custom cluster identifier allocated to schedulers to generate an AppId, if not making
* use of YARN
- * @return
+ *
+ * @return the custom cluster identifier
*/
- public long getCustomClusterIdentifier();
+ long getCustomClusterIdentifier();
- public ContainerSignatureMatcher getContainerSignatureMatcher();
+ /**
+ * Get an instance of {@link ContainerSignatureMatcher} which can be used to check whether the
+ * specifications of a container match what is required by a task.
+ *
+ * @return an instance of {@link ContainerSignatureMatcher}
+ */
+ ContainerSignatureMatcher getContainerSignatureMatcher();
/**
* Get the application attempt id for the running application. Relevant when running under YARN
- * @return
+ *
+ * @return the applicationAttemptId for the running app
*/
- public ApplicationAttemptId getApplicationAttemptId();
+ ApplicationAttemptId getApplicationAttemptId();
- public String getAppHostName();
+ /**
+ * Get the hostname on which the app is running
+ *
+ * @return the hostname
+ */
+ String getAppHostName();
- public int getAppClientPort();
+ /**
+ * Get the port on which the DAG client is listening
+ *
+ * @return the client port
+ */
+ int getAppClientPort();
- public boolean isSession();
+ /**
+ * Check whether the AM is running in session mode.
+ *
+ * @return true if session mode, false otherwise
+ */
+ boolean isSession();
- public AMState getAMState();
+ /**
+ * Get the state of the AppMaster
+ *
+ * @return the app master state
+ */
+ AMState getAMState();
}
http://git-wip-us.apache.org/repos/asf/tez/blob/fda06553/tez-dag/src/main/java/org/apache/tez/dag/api/TaskCommunicator.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/api/TaskCommunicator.java b/tez-dag/src/main/java/org/apache/tez/dag/api/TaskCommunicator.java
index 794d390..4fc541c 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/api/TaskCommunicator.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/api/TaskCommunicator.java
@@ -29,6 +29,19 @@ import org.apache.tez.runtime.api.impl.TaskSpec;
// TODO TEZ-2003 (post) TEZ-2665. Move to the tez-api module
// TODO TEZ-2003 (post) TEZ-2664. Ideally, don't expose YARN containerId; instead expose a Tez specific construct.
+
+/**
+ * This class represents the API for a custom TaskCommunicator which can be run within the Tez AM.
+ * This is used to communicate with running services, potentially launching tasks, and getting
+ * updates from running tasks.
+ * <p/>
+ * The plugin is initialized with an instance of {@link TaskCommunicatorContext} - which provides
+ * a mechanism to communicate task and container updates to the Tez framework.
+ *
+ * If setting up a heartbeat between the task and the AM, the framework is responsible for error checking
+ * of this heartbeat mechanism, handling lost or duplicate responses.
+ *
+ */
public abstract class TaskCommunicator implements ServicePluginLifecycle {
// TODO TEZ-2003 (post) TEZ-2666 Enhancements to interface
@@ -45,34 +58,100 @@ public abstract class TaskCommunicator implements ServicePluginLifecycle {
this.taskCommunicatorContext = taskCommunicatorContext;
}
+ /**
+ * Get the {@link TaskCommunicatorContext} associated with this instance of the task
+ * communicator, which is
+ * used to communicate with the rest of the system
+ *
+ * @return an instance of {@link TaskCommunicatorContext}
+ */
public TaskCommunicatorContext getContext() {
return taskCommunicatorContext;
}
+ /**
+ * An entry point for initialization.
+ * Order of service setup. Constructor, initialize(), start() - when starting a service.
+ *
+ * @throws Exception
+ */
@Override
public void initialize() throws Exception {
}
+ /**
+ * An entry point for starting the service.
+ * Order of service setup. Constructor, initialize(), start() - when starting a service.
+ *
+ * @throws Exception
+ */
@Override
public void start() throws Exception {
}
+ /**
+ * Stop the service. This could be invoked at any point, when the service is no longer required -
+ * including in case of errors.
+ *
+ * @throws Exception
+ */
@Override
public void shutdown() throws Exception {
}
+ /**
+ * Register a new container.
+ *
+ * @param containerId the associated containerId
+ * @param hostname the hostname on which the container runs
+ * @param port the port for the service which is running the container
+ */
public abstract void registerRunningContainer(ContainerId containerId, String hostname, int port);
+ /**
+ * Register the end of a container. This can be caused by preemption, the container completing
+ * successfully, etc.
+ *
+ * @param containerId the associated containerId
+ * @param endReason the end reason for the container completing
+ */
public abstract void registerContainerEnd(ContainerId containerId, ContainerEndReason endReason);
+ /**
+ * Register a task attempt to execute on a container
+ *
+ * @param containerId the containerId on which this task needs to run
+ * @param taskSpec the task specifications for the task to be executed
+ * @param additionalResources additional local resources which may be required to run this task
+ * on
+ * the container
+ * @param credentials the credentials required to run this task
+ * @param credentialsChanged whether the credentials are different from the original credentials
+ * associated with this container
+ * @param priority the priority of the task being executed
+ */
public abstract void registerRunningTaskAttempt(ContainerId containerId, TaskSpec taskSpec,
Map<String, LocalResource> additionalResources,
Credentials credentials,
boolean credentialsChanged, int priority);
- public abstract void unregisterRunningTaskAttempt(TezTaskAttemptID taskAttemptID, TaskAttemptEndReason endReason);
+ /**
+ * Register the completion of a task. This may be a result of preemption, the container dying,
+ * the
+ * node dying, or the task completing successfully
+ *
+ * @param taskAttemptID the task attempt which has completed / needs to be completed
+ * @param endReason the endReason for the task attempt.
+ */
+ public abstract void unregisterRunningTaskAttempt(TezTaskAttemptID taskAttemptID,
+ TaskAttemptEndReason endReason);
+ /**
+ * Return the address, if any, that the service listens on
+ *
+ * @return the address
+ */
public abstract InetSocketAddress getAddress();
/**
@@ -82,11 +161,13 @@ public abstract class TaskCommunicator implements ServicePluginLifecycle {
* org.apache.tez.runtime.api.InputInitializerContext#registerForVertexStateUpdates(String,
* java.util.Set)}. Notifications will be received for all registered state changes, and not just
* for the latest state update. They will be in order in which the state change occurred. </p>
- *
+ * <p/>
* Extensive processing should not be performed via this method call. Instead this should just be
* used as a notification mechanism.
- * <br>This method may be invoked concurrently with other invocations into the TaskCommunicator and
+ * <br>This method may be invoked concurrently with other invocations into the TaskCommunicator
+ * and
* multi-threading/concurrency implications must be considered.
+ *
* @param stateUpdate an event indicating the name of the vertex, and it's updated state.
* Additional information may be available for specific events, Look at the
* type hierarchy for {@link org.apache.tez.dag.api.event.VertexStateUpdate}
@@ -97,16 +178,18 @@ public abstract class TaskCommunicator implements ServicePluginLifecycle {
/**
* Indicates the current running dag is complete. The TaskCommunicatorContext can be used to
* query information about the current dag during the duration of the dagComplete invocation.
- *
+ * <p/>
* After this, the contents returned from querying the context may change at any point - due to
* the next dag being submitted.
*/
public abstract void dagComplete(String dagName);
/**
- * Share meta-information such as host:port information where the Task Communicator may be listening.
+ * Share meta-information such as host:port information where the Task Communicator may be
+ * listening.
* Primarily for use by compatible launchers to learn this information.
- * @return
+ *
+ * @return meta info for the task communicator
*/
public abstract Object getMetaInfo();
}
http://git-wip-us.apache.org/repos/asf/tez/blob/fda06553/tez-dag/src/main/java/org/apache/tez/dag/api/TaskCommunicatorContext.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/api/TaskCommunicatorContext.java b/tez-dag/src/main/java/org/apache/tez/dag/api/TaskCommunicatorContext.java
index 8073f6a..0a684e7 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/api/TaskCommunicatorContext.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/api/TaskCommunicatorContext.java
@@ -42,31 +42,112 @@ public interface TaskCommunicatorContext {
// - Maybe add book-keeping as a helper library, instead of each impl tracking container to task etc.
// - Handling of containres / tasks which no longer exist in the system (formalized interface instead of a shouldDie notification)
+ /**
+ * Get the UserPayload that was configured while setting up the task communicator
+ *
+ * @return the initially configured user payload
+ */
UserPayload getInitialUserPayload();
+ /**
+ * Get the application attempt id for the running application. Relevant when running under YARN
+ *
+ * @return the applicationAttemptId for the running app
+ */
ApplicationAttemptId getApplicationAttemptId();
+
+ /**
+ * Get credentials associated with the AppMaster
+ *
+ * @return credentials
+ */
Credentials getCredentials();
+ /**
+ * Check whether a running attempt can commit. This provides a leader election mechanism amongst
+ * multiple running attempts
+ *
+ * @param taskAttemptId the associated task attempt id
+ * @return whether the attempt can commit or not
+ * @throws IOException
+ */
boolean canCommit(TezTaskAttemptID taskAttemptId) throws IOException;
+ /**
+ * Mechanism for a {@link TaskCommunicator} to provide updates on a running task, as well as
+ * receive new information which may need to be propagated to the task. This includes events
+ * generated by the task and events which need to be sent to the task
+ * This method must be invoked periodically to receive updates for a running task
+ *
+ * @param request the update from the running task.
+ * @return the response that is required by the task.
+ * @throws IOException
+ * @throws TezException
+ */
TaskHeartbeatResponse heartbeat(TaskHeartbeatRequest request) throws IOException, TezException;
+ /**
+ * Check whether the container is known by the framework. The state of this container is
+ * irrelevant
+ *
+ * @param containerId the relevant container id
+ * @return true if the container is known, false if it isn't
+ */
boolean isKnownContainer(ContainerId containerId);
+ /**
+ * Inform the framework that a task is alive. This needs to be invoked periodically to avoid the
+ * task attempt timing out.
+ * Invocations to heartbeat provide the same keep-alive functionality
+ *
+ * @param taskAttemptId the relevant task attempt
+ */
void taskAlive(TezTaskAttemptID taskAttemptId);
+ /**
+ * Inform the framework that a container is alive. This needs to be invoked periodically to avoid
+ * the container attempt timing out.
+ * Invocations to heartbeat provide the same keep-alive functionality
+ *
+ * @param containerId the relevant container id
+ */
void containerAlive(ContainerId containerId);
+ /**
+ * Inform the framework that the task has started execution
+ *
+ * @param taskAttemptId the relevant task attempt id
+ * @param containerId the containerId in which the task attempt is running
+ */
void taskStartedRemotely(TezTaskAttemptID taskAttemptId, ContainerId containerId);
- void taskKilled(TezTaskAttemptID taskAttemptId, TaskAttemptEndReason taskAttemptEndReason, @Nullable String diagnostics);
+ /**
+ * Inform the framework that a task has been killed
+ *
+ * @param taskAttemptId the relevant task attempt id
+ * @param taskAttemptEndReason the reason for the task attempt being killed
+ * @param diagnostics any diagnostics messages which are relevant to the task attempt
+ * kill
+ */
+ void taskKilled(TezTaskAttemptID taskAttemptId, TaskAttemptEndReason taskAttemptEndReason,
+ @Nullable String diagnostics);
- void taskFailed(TezTaskAttemptID taskAttemptId, TaskAttemptEndReason taskAttemptEndReason, @Nullable String diagnostics);
+ /**
+ * Inform the framework that a task has failed
+ *
+ * @param taskAttemptId the relevant task attempt id
+ * @param taskAttemptEndReason the reason for the task failure
+ * @param diagnostics any diagnostics messages which are relevant to the task attempt
+ * failure
+ */
+ void taskFailed(TezTaskAttemptID taskAttemptId, TaskAttemptEndReason taskAttemptEndReason,
+ @Nullable String diagnostics);
/**
* Register to get notifications on updates to the specified vertex. Notifications will be sent
- * via {@link org.apache.tez.runtime.api.InputInitializer#onVertexStateUpdated(org.apache.tez.dag.api.event.VertexStateUpdate)} </p>
- *
+ * via {@link org.apache.tez.runtime.api.InputInitializer#onVertexStateUpdated(org.apache.tez.dag.api.event.VertexStateUpdate)}
+ * </p>
+ * <p/>
* This method can only be invoked once. Duplicate invocations will result in an error.
*
* @param vertexName the vertex name for which notifications are required.
@@ -76,6 +157,7 @@ public interface TaskCommunicatorContext {
/**
* Get the name of the currently executing dag
+ *
* @return the name of the currently executing dag
*/
String getCurretnDagName();
@@ -83,6 +165,7 @@ public interface TaskCommunicatorContext {
/**
* Get the name of the Input vertices for the specified vertex.
* Root Inputs are not returned.
+ *
* @param vertexName the vertex for which source vertex names will be returned
* @return an Iterable containing the list of input vertices for the specified vertex
*/
@@ -90,13 +173,15 @@ public interface TaskCommunicatorContext {
/**
* Get the total number of tasks in the given vertex
- * @param vertexName
+ *
+ * @param vertexName the relevant vertex name
* @return total number of tasks in this vertex
*/
int getVertexTotalTaskCount(String vertexName);
/**
* Get the number of completed tasks for a given vertex
+ *
* @param vertexName the vertex name
* @return the number of completed tasks for the vertex
*/
@@ -104,6 +189,7 @@ public interface TaskCommunicatorContext {
/**
* Get the number of running tasks for a given vertex
+ *
* @param vertexName the vertex name
* @return the number of running tasks for the vertex
*/
@@ -111,14 +197,16 @@ public interface TaskCommunicatorContext {
/**
* Get the start time for the first attempt of the specified task
+ *
* @param vertexName the vertex to which the task belongs
- * @param taskIndex the index of the task
+ * @param taskIndex the index of the task
* @return the start time for the first attempt of the task
*/
long getFirstAttemptStartTime(String vertexName, int taskIndex);
/**
* Get the start time for the currently executing DAG
+ *
* @return time when the current dag started executing
*/
long getDagStartTime();
http://git-wip-us.apache.org/repos/asf/tez/blob/fda06553/tez-dag/src/main/java/org/apache/tez/dag/app/ContainerLauncherContextImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/ContainerLauncherContextImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/ContainerLauncherContextImpl.java
index 92bbbdc..3a2efc5 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/ContainerLauncherContextImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/ContainerLauncherContextImpl.java
@@ -29,6 +29,7 @@ import org.apache.tez.dag.app.rm.container.AMContainerEventType;
import org.apache.tez.dag.history.DAGHistoryEvent;
import org.apache.tez.dag.history.events.ContainerLaunchedEvent;
+@SuppressWarnings("unchecked")
public class ContainerLauncherContextImpl implements ContainerLauncherContext {
private final AppContext context;
http://git-wip-us.apache.org/repos/asf/tez/blob/fda06553/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java
index 6c1dad9..ad6f2c4 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java
@@ -178,13 +178,7 @@ public class TaskAttemptListenerImpTezDag extends AbstractService implements
taskCommClazz.getConstructor(TaskCommunicatorContext.class);
ctor.setAccessible(true);
return ctor.newInstance(taskCommunicatorContext);
- } catch (NoSuchMethodException e) {
- throw new TezUncheckedException(e);
- } catch (InvocationTargetException e) {
- throw new TezUncheckedException(e);
- } catch (InstantiationException e) {
- throw new TezUncheckedException(e);
- } catch (IllegalAccessException e) {
+ } catch (NoSuchMethodException | InvocationTargetException | InstantiationException | IllegalAccessException e) {
throw new TezUncheckedException(e);
}
}
@@ -398,13 +392,6 @@ public class TaskAttemptListenerImpTezDag extends AbstractService implements
containerInfo.taskAttemptId);
}
- if (containerInfo.taskAttemptId != null) {
- throw new TezUncheckedException("Registering task attempt: "
- + amContainerTask.getTask().getTaskAttemptID() + " to container: " + containerId
- + " with existing assignment to: " +
- containerInfo.taskAttemptId);
- }
-
// Explicitly putting in a new entry so that synchronization is not required on the existing element in the map.
registeredContainers.put(containerId, new ContainerInfo(amContainerTask.getTask().getTaskAttemptID()));
http://git-wip-us.apache.org/repos/asf/tez/blob/fda06553/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherRouter.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherRouter.java b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherRouter.java
index 57b4aee..d0cee21 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherRouter.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherRouter.java
@@ -62,7 +62,7 @@ public class ContainerLauncherRouter extends AbstractService
containerLaunchers = new ContainerLauncher[] {containerLauncher};
containerLauncherContexts = new ContainerLauncherContext[] {containerLauncher.getContext()};
containerLauncherServiceWrappers = new ServicePluginLifecycleAbstractService[]{
- new ServicePluginLifecycleAbstractService(containerLauncher)};
+ new ServicePluginLifecycleAbstractService<>(containerLauncher)};
}
// Accepting conf to setup final parameters, if required.
@@ -89,7 +89,7 @@ public class ContainerLauncherRouter extends AbstractService
containerLauncherContexts[i] = containerLauncherContext;
containerLaunchers[i] = createContainerLauncher(containerLauncherDescriptors.get(i), context,
containerLauncherContext, taskAttemptListener, workingDirectory, i, isPureLocalMode);
- containerLauncherServiceWrappers[i] = new ServicePluginLifecycleAbstractService(containerLaunchers[i]);
+ containerLauncherServiceWrappers[i] = new ServicePluginLifecycleAbstractService<>(containerLaunchers[i]);
}
}
@@ -138,6 +138,7 @@ public class ContainerLauncherRouter extends AbstractService
}
@VisibleForTesting
+ @SuppressWarnings("unchecked")
ContainerLauncher createCustomContainerLauncher(ContainerLauncherContext containerLauncherContext,
NamedEntityDescriptor containerLauncherDescriptor) {
LOG.info("Creating container launcher {}:{} ", containerLauncherDescriptor.getEntityName(),
@@ -150,15 +151,10 @@ public class ContainerLauncherRouter extends AbstractService
.getConstructor(ContainerLauncherContext.class);
ctor.setAccessible(true);
return ctor.newInstance(containerLauncherContext);
- } catch (NoSuchMethodException e) {
- throw new TezUncheckedException(e);
- } catch (InvocationTargetException e) {
- throw new TezUncheckedException(e);
- } catch (InstantiationException e) {
- throw new TezUncheckedException(e);
- } catch (IllegalAccessException e) {
+ } catch (NoSuchMethodException | InvocationTargetException | InstantiationException | IllegalAccessException e) {
throw new TezUncheckedException(e);
}
+
}
@Override
http://git-wip-us.apache.org/repos/asf/tez/blob/fda06553/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerContextImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerContextImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerContextImpl.java
index 7f1d5a3..2a9797f 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerContextImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerContextImpl.java
@@ -57,6 +57,9 @@ public class TaskSchedulerContextImpl implements TaskSchedulerContext {
}
+ // This may end up being called for a task+container pair that the app
+ // has not heard about. This can happen because of a race between the
+ // taskAllocated() upcall and the deallocateTask() downcall.
@Override
public void taskAllocated(Object task, Object appCookie, Container container) {
tseh.taskAllocated(schedulerId, task, appCookie, container);
http://git-wip-us.apache.org/repos/asf/tez/blob/fda06553/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java
index 4d710fa..0f19379 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java
@@ -144,6 +144,7 @@ public class TaskSchedulerEventHandler extends AbstractService implements
* @param webUI
* @param schedulerDescriptors the list of scheduler descriptors. Tez internal classes will not have the class names populated.
* An empty list defaults to using the YarnTaskScheduler as the only source.
+ * @param isPureLocalMode whether the AM is running in local mode
*/
@SuppressWarnings("rawtypes")
public TaskSchedulerEventHandler(AppContext appContext,
@@ -423,6 +424,7 @@ public class TaskSchedulerEventHandler extends AbstractService implements
return new LocalTaskSchedulerService(taskSchedulerContext);
}
+ @SuppressWarnings("unchecked")
TaskScheduler createCustomTaskScheduler(TaskSchedulerContext taskSchedulerContext,
NamedEntityDescriptor taskSchedulerDescriptor,
int schedulerId) {
@@ -436,13 +438,7 @@ public class TaskSchedulerEventHandler extends AbstractService implements
.getConstructor(TaskSchedulerContext.class);
ctor.setAccessible(true);
return ctor.newInstance(taskSchedulerContext);
- } catch (NoSuchMethodException e) {
- throw new TezUncheckedException(e);
- } catch (InvocationTargetException e) {
- throw new TezUncheckedException(e);
- } catch (InstantiationException e) {
- throw new TezUncheckedException(e);
- } catch (IllegalAccessException e) {
+ } catch (NoSuchMethodException | InvocationTargetException | InstantiationException | IllegalAccessException e) {
throw new TezUncheckedException(e);
}
}
@@ -453,7 +449,7 @@ public class TaskSchedulerEventHandler extends AbstractService implements
int j = 0;
for (int i = 0; i < taskSchedulerDescriptors.length; i++) {
long customAppIdIdentifier;
- if (isPureLocalMode || taskSchedulerDescriptors[i].equals(
+ if (isPureLocalMode || taskSchedulerDescriptors[i].getEntityName().equals(
TezConstants.getTezYarnServicePluginName())) { // Use the app identifier from the appId.
customAppIdIdentifier = appContext.getApplicationID().getClusterTimestamp();
} else {
@@ -463,7 +459,7 @@ public class TaskSchedulerEventHandler extends AbstractService implements
customAppIdIdentifier);
taskSchedulers[i] = createTaskScheduler(host, port,
trackingUrl, appContext, taskSchedulerDescriptors[i], customAppIdIdentifier, i);
- taskSchedulerServiceWrappers[i] = new ServicePluginLifecycleAbstractService(taskSchedulers[i]);
+ taskSchedulerServiceWrappers[i] = new ServicePluginLifecycleAbstractService<>(taskSchedulers[i]);
}
}
@@ -745,7 +741,7 @@ public class TaskSchedulerEventHandler extends AbstractService implements
public boolean hasUnregistered() {
boolean result = true;
for (int i = 0 ; i < taskSchedulers.length ; i++) {
- result |= this.taskSchedulers[i].hasUnregistered();
+ result = result & this.taskSchedulers[i].hasUnregistered();
if (result == false) {
return result;
}
http://git-wip-us.apache.org/repos/asf/tez/blob/fda06553/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezTaskRunner2.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezTaskRunner2.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezTaskRunner2.java
index 1a8828d..d8539c5 100644
--- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezTaskRunner2.java
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezTaskRunner2.java
@@ -122,7 +122,7 @@ public class TezTaskRunner2 {
* the AM - since a task KILL is an external event, and whoever invoked it should
* be able to track it.
*
- * @return
+ * @return the taskRunner result
*/
public TaskRunner2Result run() {
try {