You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by je...@apache.org on 2018/11/20 22:50:21 UTC
tez git commit: TEZ-3998. Allow CONCURRENT edge property in DAG
construction and introduce ConcurrentSchedulingType (Yingda Chen via jeagles)
Repository: tez
Updated Branches:
refs/heads/master efc733183 -> e6722a96b
TEZ-3998. Allow CONCURRENT edge property in DAG construction and introduce ConcurrentSchedulingType (Yingda Chen via jeagles)
Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/e6722a96
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/e6722a96
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/e6722a96
Branch: refs/heads/master
Commit: e6722a96b4827d19850a5a73bc7024941a9deb54
Parents: efc7331
Author: Jonathan Eagles <je...@yahoo-inc.com>
Authored: Tue Nov 20 16:48:59 2018 -0600
Committer: Jonathan Eagles <je...@yahoo-inc.com>
Committed: Tue Nov 20 16:48:59 2018 -0600
----------------------------------------------------------------------
.../main/java/org/apache/tez/dag/api/DAG.java | 88 ++++---
.../org/apache/tez/dag/api/EdgeProperty.java | 64 +++--
.../apache/tez/dag/api/TezConfiguration.java | 128 +++++-----
.../org/apache/tez/dag/api/TestDAGVerify.java | 92 +++----
.../org/apache/tez/dag/app/DAGAppMaster.java | 70 ++++--
.../library/edgemanager/SilentEdgeManager.java | 89 +++++++
.../VertexManagerWithConcurrentInput.java | 245 +++++++++++++++++++
.../TestVertexManagerWithConcurrentInput.java | 114 +++++++++
8 files changed, 694 insertions(+), 196 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tez/blob/e6722a96/tez-api/src/main/java/org/apache/tez/dag/api/DAG.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/DAG.java b/tez-api/src/main/java/org/apache/tez/dag/api/DAG.java
index 735c749..f8a2ddc 100644
--- a/tez-api/src/main/java/org/apache/tez/dag/api/DAG.java
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/DAG.java
@@ -79,16 +79,16 @@ import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
/**
- * Top level entity that defines the DAG (Directed Acyclic Graph) representing
- * the data flow graph. Consists of a set of Vertices and Edges connecting the
- * vertices. Vertices represent transformations of data and edges represent
+ * Top level entity that defines the DAG (Directed Acyclic Graph) representing
+ * the data flow graph. Consists of a set of Vertices and Edges connecting the
+ * vertices. Vertices represent transformations of data and edges represent
* movement of data between vertices.
*/
@Public
public class DAG {
-
+
private static final Logger LOG = LoggerFactory.getLogger(DAG.class);
-
+
final BidiMap<String, Vertex> vertices =
new DualLinkedHashBidiMap<String, Vertex>();
final Set<Edge> edges = Sets.newHashSet();
@@ -132,7 +132,7 @@ public class DAG {
TezCommonUtils.addAdditionalLocalResources(localFiles, commonTaskLocalFiles, "DAG " + getName());
return this;
}
-
+
public synchronized DAG addVertex(Vertex vertex) {
if (vertices.containsKey(vertex.getName())) {
throw new IllegalStateException(
@@ -145,18 +145,18 @@ public class DAG {
public synchronized Vertex getVertex(String vertexName) {
return vertices.get(vertexName);
}
-
+
/**
* One of the methods that can be used to provide information about required
* Credentials when running on a secure cluster. A combination of this and
* addURIsForCredentials should be used to specify information about all
* credentials required by a DAG. AM specific credentials are not used when
* executing a DAG.
- *
+ *
* Set credentials which will be required to run this dag. This method can be
* used if the client has already obtained some or all of the required
* credentials.
- *
+ *
* @param credentials Credentials for the DAG
* @return {@link DAG}
*/
@@ -196,7 +196,7 @@ public class DAG {
}
/**
- * Create a group of vertices that share a common output. This can be used to implement
+ * Create a group of vertices that share a common output. This can be used to implement
* unions efficiently.
* @param name Name of the group.
* @param members {@link Vertex} members of the group
@@ -243,15 +243,15 @@ public class DAG {
* setCredentials should be used to specify information about all credentials
* required by a DAG. AM specific credentials are not used when executing a
* DAG.
- *
+ *
* This method can be used to specify a list of URIs for which Credentials
* need to be obtained so that the job can run. An incremental list of URIs
* can be provided by making multiple calls to the method.
- *
+ *
* Currently, @{link credentials} can only be fetched for HDFS and other
* {@link org.apache.hadoop.fs.FileSystem} implementations that support
* credentials.
- *
+ *
* @param uris
* a list of {@link URI}s
* @return {@link DAG}
@@ -263,7 +263,7 @@ public class DAG {
}
/**
- *
+ *
* @return an unmodifiable list representing the URIs for which credentials
* are required.
*/
@@ -271,7 +271,7 @@ public class DAG {
public synchronized Collection<URI> getURIsForCredentials() {
return Collections.unmodifiableCollection(urisForCredentials);
}
-
+
@Private
public synchronized Set<Vertex> getVertices() {
return Collections.unmodifiableSet(this.vertices.values());
@@ -304,7 +304,7 @@ public class DAG {
edges.add(edge);
return this;
}
-
+
/**
* Add a {@link GroupInputEdge} to the DAG.
* @param edge {@link GroupInputEdge}
@@ -328,7 +328,7 @@ public class DAG {
VertexGroup av = edge.getInputVertexGroup();
av.addOutputVertex(edge.getOutputVertex(), edge);
groupInputEdges.add(edge);
-
+
// add new edge between members of VertexGroup and destVertex of the GroupInputEdge
List<Edge> newEdges = Lists.newLinkedList();
Vertex dstVertex = edge.getOutputVertex();
@@ -337,14 +337,14 @@ public class DAG {
newEdges.add(Edge.create(member, dstVertex, edge.getEdgeProperty()));
}
dstVertex.addGroupInput(uv.getGroupName(), uv.getGroupInfo());
-
+
for (Edge e : newEdges) {
addEdge(e);
}
-
+
return this;
}
-
+
/**
* Get the DAG name
* @return DAG name
@@ -433,7 +433,7 @@ public class DAG {
newKnownTasksVertices.add(vertex);
}
}
-
+
// walk through all known source 1-1 edges and infer parallelism
// add newly inferred vertices for consideration as known sources
// the outer loop will run for every new level of inferring the parallelism
@@ -456,19 +456,19 @@ public class DAG {
}
}
}
-
+
// check for inconsistency and errors
for (Edge e : edges) {
Vertex inputVertex = e.getInputVertex();
Vertex outputVertex = e.getOutputVertex();
-
+
if (e.getEdgeProperty().getDataMovementType() == DataMovementType.ONE_TO_ONE) {
if (inputVertex.getParallelism() != outputVertex.getParallelism()) {
// both should be equal or equal to -1.
if (outputVertex.getParallelism() != -1) {
throw new TezUncheckedException(
"1-1 Edge. Destination vertex parallelism must match source vertex. "
- + "Vertex: " + inputVertex.getName() + " does not match vertex: "
+ + "Vertex: " + inputVertex.getName() + " does not match vertex: "
+ outputVertex.getName());
}
}
@@ -527,7 +527,7 @@ public class DAG {
}
}
}
-
+
// AnnotatedVertex is used by verify()
private static class AnnotatedVertex {
Vertex v;
@@ -573,7 +573,7 @@ public class DAG {
if (vertices.isEmpty()) {
throw new IllegalStateException("Invalid dag containing 0 vertices");
}
-
+
// check for valid vertices, duplicate vertex names,
// and prepare for cycle detection
Map<String, AnnotatedVertex> vertexMap = new HashMap<String, AnnotatedVertex>();
@@ -591,14 +591,14 @@ public class DAG {
for (Edge e : edges) {
// Construct structure for cycle detection
Vertex inputVertex = e.getInputVertex();
- Vertex outputVertex = e.getOutputVertex();
+ Vertex outputVertex = e.getOutputVertex();
List<Edge> edgeList = edgeMap.get(inputVertex);
if (edgeList == null) {
edgeList = new ArrayList<Edge>();
edgeMap.put(inputVertex, edgeList);
}
edgeList.add(e);
-
+
// Construct map for Input name verification
Set<String> inboundSet = inboundVertexMap.get(outputVertex);
if (inboundSet == null) {
@@ -606,7 +606,7 @@ public class DAG {
inboundVertexMap.put(outputVertex, inboundSet);
}
inboundSet.add(inputVertex.getName());
-
+
// Construct map for Output name verification
Set<String> outboundSet = outboundVertexMap.get(inputVertex);
if (outboundSet == null) {
@@ -618,7 +618,7 @@ public class DAG {
// check input and output names don't collide with vertex names
for (Vertex vertex : vertices.values()) {
- for (RootInputLeafOutput<InputDescriptor, InputInitializerDescriptor>
+ for (RootInputLeafOutput<InputDescriptor, InputInitializerDescriptor>
input : vertex.getInputs()) {
if (vertexMap.containsKey(input.getName())) {
throw new IllegalStateException("Vertex: "
@@ -627,7 +627,7 @@ public class DAG {
+ input.getName());
}
}
- for (RootInputLeafOutput<OutputDescriptor, OutputCommitterDescriptor>
+ for (RootInputLeafOutput<OutputDescriptor, OutputCommitterDescriptor>
output : vertex.getOutputs()) {
if (vertexMap.containsKey(output.getName())) {
throw new IllegalStateException("Vertex: "
@@ -641,7 +641,7 @@ public class DAG {
// Check for valid InputNames
for (Entry<Vertex, Set<String>> entry : inboundVertexMap.entrySet()) {
Vertex vertex = entry.getKey();
- for (RootInputLeafOutput<InputDescriptor, InputInitializerDescriptor>
+ for (RootInputLeafOutput<InputDescriptor, InputInitializerDescriptor>
input : vertex.getInputs()) {
if (entry.getValue().contains(input.getName())) {
throw new IllegalStateException("Vertex: "
@@ -655,7 +655,7 @@ public class DAG {
// Check for valid OutputNames
for (Entry<Vertex, Set<String>> entry : outboundVertexMap.entrySet()) {
Vertex vertex = entry.getKey();
- for (RootInputLeafOutput<OutputDescriptor, OutputCommitterDescriptor>
+ for (RootInputLeafOutput<OutputDescriptor, OutputCommitterDescriptor>
output : vertex.getOutputs()) {
if (entry.getValue().contains(output.getName())) {
throw new IllegalStateException("Vertex: "
@@ -665,8 +665,8 @@ public class DAG {
}
}
}
-
-
+
+
// Not checking for repeated input names / output names vertex names on the same vertex,
// since we only allow 1 at the moment.
// When additional inputs are supported, this can be chceked easily (and early)
@@ -678,16 +678,12 @@ public class DAG {
if (restricted) {
for (Edge e : edges) {
- if (e.getEdgeProperty().getDataSourceType() !=
- DataSourceType.PERSISTED) {
+ DataSourceType dataSourceType = e.getEdgeProperty().getDataSourceType();
+ if (dataSourceType != DataSourceType.PERSISTED &&
+ dataSourceType != DataSourceType.EPHEMERAL) {
throw new IllegalStateException(
"Unsupported source type on edge. " + e);
}
- if (e.getEdgeProperty().getSchedulingType() !=
- SchedulingType.SEQUENTIAL) {
- throw new IllegalStateException(
- "Unsupported scheduling type on edge. " + e);
- }
}
}
@@ -878,13 +874,13 @@ public class DAG {
groupBuilder.addGroupMembers(v.getName());
}
groupBuilder.addAllOutputs(groupInfo.outputs);
- for (Map.Entry<String, InputDescriptor> entry :
+ for (Map.Entry<String, InputDescriptor> entry :
groupInfo.edgeMergedInputs.entrySet()) {
groupBuilder.addEdgeMergedInputs(
PlanGroupInputEdgeInfo.newBuilder().setDestVertexName(entry.getKey()).
setMergedInput(DagTypeConverters.convertToDAGPlan(entry.getValue())));
}
- dagBuilder.addVertexGroups(groupBuilder);
+ dagBuilder.addVertexGroups(groupBuilder);
}
}
@@ -956,7 +952,7 @@ public class DAG {
dagCredentials.addAll(dataSink.getCredentials());
}
}
-
+
VertexPlan.Builder vertexBuilder = VertexPlan.newBuilder();
vertexBuilder.setName(vertex.getName());
vertexBuilder.setType(PlanVertexType.NORMAL); // vertex type is implicitly NORMAL until TEZ-46.
@@ -1045,7 +1041,7 @@ public class DAG {
}
}
}
-
+
if (vertex.getVertexManagerPlugin() != null) {
vertexBuilder.setVertexManagerPlugin(DagTypeConverters
.convertToDAGPlan(vertex.getVertexManagerPlugin()));
http://git-wip-us.apache.org/repos/asf/tez/blob/e6722a96/tez-api/src/main/java/org/apache/tez/dag/api/EdgeProperty.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/EdgeProperty.java b/tez-api/src/main/java/org/apache/tez/dag/api/EdgeProperty.java
index 07fb2c1..c203f8c 100644
--- a/tez-api/src/main/java/org/apache/tez/dag/api/EdgeProperty.java
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/EdgeProperty.java
@@ -43,7 +43,7 @@ public class EdgeProperty {
*/
public enum DataMovementType {
/**
- * Output on this edge produced by the i-th source task is available to the
+ * Output on this edge produced by the i-th source task is available to the
* i-th destination task.
*/
ONE_TO_ONE,
@@ -58,20 +58,20 @@ public class EdgeProperty {
* are gathered by designated destination tasks.
*/
SCATTER_GATHER,
-
+
/**
* Custom routing defined by the user.
*/
CUSTOM
}
-
+
/**
* Determines the lifetime of the data produced on this edge by a source task.
*/
public enum DataSourceType {
/**
* Data produced by the source is persisted and available even when the
- * task is not running. The data may become unavailable and may cause the
+ * task is not running. The data may become unavailable and may cause the
* source task to be re-executed.
*/
PERSISTED,
@@ -82,31 +82,51 @@ public class EdgeProperty {
PERSISTED_RELIABLE,
/**
* Data produced by the source task is available only while the source task
- * is running. This requires the destination task to run concurrently with
- * the source task. This is not supported yet.
+ * is running. This requires the destination task to run concurrently with
+ * the source task. Development in progress.
*/
@Unstable
EPHEMERAL
}
-
+
/**
- * Determines when the destination task is eligible to run, once the source
+ * Determines when the destination task is eligible to run, once the source
* task is eligible to run.
*/
public enum SchedulingType {
/**
- * Destination task is eligible to run after one or more of its source tasks
+ * Destination task is eligible to run after one or more of its source tasks
* have started or completed.
*/
SEQUENTIAL,
/**
* Destination task must run concurrently with the source task.
- * This is not supported yet.
+ * Development in progress.
*/
@Unstable
CONCURRENT
}
-
+
+ /**
+ * Determines the event(s) used to schedule the tasks of a downstream vertex connected via an
+ * edge with {@link SchedulingType#CONCURRENT} scheduling.
+ */
+ public enum ConcurrentEdgeTriggerType {
+ /**
+ * Trigger task scheduling for the downstream vertex(es) once the upstream vertex is configured.
+ * This effectively schedules the upstream and downstream vertices on both ends of a concurrent
+ * edge at the same time. This is the default trigger type.
+ */
+ SOURCE_VERTEX_CONFIGURED,
+
+ /**
+ * Trigger task scheduling for the downstream vertex(es) on "running" event(s) of upstream tasks.
+ * Not fully supported yet; full support is tracked by TEZ-3999.
+ */
+ SOURCE_TASK_STARTED
+ }
+
+
final DataMovementType dataMovementType;
final DataSourceType dataSourceType;
final SchedulingType schedulingType;
@@ -172,7 +192,7 @@ public class EdgeProperty {
Preconditions.checkArgument(dataMovementType != DataMovementType.CUSTOM,
DataMovementType.CUSTOM + " cannot be used with this constructor");
}
-
+
private EdgeProperty(EdgeManagerPluginDescriptor edgeManagerDescriptor,
DataSourceType dataSourceType,
@@ -182,7 +202,7 @@ public class EdgeProperty {
this(edgeManagerDescriptor, DataMovementType.CUSTOM, dataSourceType, schedulingType,
edgeSource, edgeDestination);
}
-
+
private EdgeProperty(EdgeManagerPluginDescriptor edgeManagerDescriptor,
DataMovementType dataMovementType, DataSourceType dataSourceType,
SchedulingType schedulingType, OutputDescriptor edgeSource, InputDescriptor edgeDestination) {
@@ -193,7 +213,7 @@ public class EdgeProperty {
this.inputDescriptor = edgeDestination;
this.outputDescriptor = edgeSource;
}
-
+
/**
* Get the {@link DataMovementType}
* @return {@link DataMovementType}
@@ -201,7 +221,7 @@ public class EdgeProperty {
public DataMovementType getDataMovementType() {
return dataMovementType;
}
-
+
/**
* Get the {@link DataSourceType}
* @return {@link DataSourceType}
@@ -209,7 +229,7 @@ public class EdgeProperty {
public DataSourceType getDataSourceType() {
return dataSourceType;
}
-
+
/**
* Get the {@link SchedulingType}
* @return {@link SchedulingType}
@@ -217,30 +237,30 @@ public class EdgeProperty {
public SchedulingType getSchedulingType() {
return schedulingType;
}
-
+
/**
* @return the {@link InputDescriptor} which will consume data from the edge.
*/
public InputDescriptor getEdgeDestination() {
return inputDescriptor;
}
-
+
/**
* @return the {@link OutputDescriptor} which produces data on the edge.
*/
public OutputDescriptor getEdgeSource() {
return outputDescriptor;
}
-
+
/**
- * Returns the Edge Manager specifications for this edge.
+ * Returns the Edge Manager specifications for this edge.
* @return @link {@link EdgeManagerPluginDescriptor} if a custom edge was setup, null otherwise.
*/
@Private
public EdgeManagerPluginDescriptor getEdgeManagerDescriptor() {
return edgeManagerDescriptor;
}
-
+
@Override
public String toString() {
return "{ " + dataMovementType + " : " + inputDescriptor.getClassName()
@@ -248,5 +268,5 @@ public class EdgeProperty {
+ " >> " + (edgeManagerDescriptor == null ? "NullEdgeManager" : edgeManagerDescriptor.getClassName())
+ " }";
}
-
+
}
http://git-wip-us.apache.org/repos/asf/tez/blob/e6722a96/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java b/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
index 43014a4..791e634 100644
--- a/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
@@ -28,6 +28,7 @@ import java.util.Set;
import org.apache.tez.common.annotation.ConfigurationClass;
import org.apache.tez.common.annotation.ConfigurationProperty;
+import org.apache.tez.dag.api.EdgeProperty.ConcurrentEdgeTriggerType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.classification.InterfaceAudience.Private;
@@ -41,7 +42,7 @@ import com.google.common.annotations.VisibleForTesting;
/**
- * Defines the configurations for Tez. These configurations are typically specified in
+ * Defines the configurations for Tez. These configurations are typically specified in
* tez-site.xml on the client machine where TezClient is used to launch the Tez application.
* tez-site.xml is expected to be picked up from the classpath of the client process.
* @see <a href="../../../../../configs/TezConfiguration.html">Detailed Configuration Information</a>
@@ -131,7 +132,7 @@ public class TezConfiguration extends Configuration {
public static final String TEZ_TASK_PREFIX = TEZ_PREFIX + "task.";
/**
- * Boolean value. If true then Tez will try to automatically delete temporary job
+ * Boolean value. If true then Tez will try to automatically delete temporary job
* artifacts that it creates within the specified staging dir. Does not affect any user data.
*/
@ConfigurationScope(Scope.AM)
@@ -183,7 +184,7 @@ public class TezConfiguration extends Configuration {
+ "use.concurrent-dispatcher";
@Private
public static boolean TEZ_AM_USE_CONCURRENT_DISPATCHER_DEFAULT = false;
-
+
@Private
@ConfigurationScope(Scope.AM)
public static final String TEZ_AM_CONCURRENT_DISPATCHER_CONCURRENCY = TEZ_AM_PREFIX
@@ -196,7 +197,7 @@ public class TezConfiguration extends Configuration {
* code is written according to best practices then the same code can execute in either mode based
* on this configuration. Session mode is more aggressive in reserving execution resources and is
* typically used for interactive applications where multiple DAGs are submitted in quick succession
- * by the same user. For long running applications, one-off executions, batch jobs etc non-session
+ * by the same user. For long-running applications, one-off executions, batch jobs, etc., non-session
* mode is recommended. If session mode is enabled then container reuse is recommended.
*/
@ConfigurationScope(Scope.AM)
@@ -271,12 +272,12 @@ public class TezConfiguration extends Configuration {
public static final int TEZ_TASK_MAX_ALLOWED_OUTPUT_FAILURES_DEFAULT = 10;
/**
- * int value. Represents the maximum time in seconds for which a consumer attempt can report
- * a read error against its producer attempt, after which the producer attempt will be re-run
- * to re-generate the output. There are other heuristics which determine the retry and mainly
- * try to guard against a flurry of re-runs due to intermittent read errors
+ * int value. Represents the maximum time in seconds for which a consumer attempt can report
+ * a read error against its producer attempt, after which the producer attempt will be re-run
+ * to re-generate the output. There are other heuristics which determine the retry and mainly
+ * try to guard against a flurry of re-runs due to intermittent read errors
* (due to network issues). This configuration puts a time limit on those heuristics to ensure
- * jobs dont hang indefinitely due to lack of closure in those heuristics
+ * jobs don't hang indefinitely due to lack of closure in those heuristics.
*
* Expert level setting.
*/
@@ -288,9 +289,9 @@ public class TezConfiguration extends Configuration {
/**
* Boolean value. Determines when the final outputs to data sinks are committed. Commit is an
- * output specific operation and typically involves making the output visible for consumption.
- * If the config is true, then the outputs are committed at the end of DAG completion after all
- * constituent vertices have completed. If false, outputs for each vertex are committed after that
+ * output specific operation and typically involves making the output visible for consumption.
+ * If the config is true, then the outputs are committed at the end of DAG completion after all
+ * constituent vertices have completed. If false, outputs for each vertex are committed after that
* vertex succeeds. Depending on the desired output visibility and downstream consumer dependencies
* this value must be appropriately chosen. Defaults to the safe choice of true.
*/
@@ -330,7 +331,7 @@ public class TezConfiguration extends Configuration {
@ConfigurationScope(Scope.AM)
@ConfigurationProperty
public static final String TEZ_AM_LAUNCH_CMD_OPTS = TEZ_AM_PREFIX + "launch.cmd-opts";
- public static final String TEZ_AM_LAUNCH_CMD_OPTS_DEFAULT =
+ public static final String TEZ_AM_LAUNCH_CMD_OPTS_DEFAULT =
"-XX:+PrintGCDetails -verbose:gc -XX:+PrintGCTimeStamps -XX:+UseNUMA -XX:+UseParallelGC";
/**
@@ -409,6 +410,19 @@ public class TezConfiguration extends Configuration {
public static final String TEZ_AM_LAUNCH_ENV_DEFAULT = "";
/**
+ * String value. When a vertex has a concurrent input edge, this determines when the tasks
+ * of that downstream vertex are scheduled, i.e. which upstream event triggers the scheduling
+ * decision. The value is expected to be the name of an
+ * EdgeProperty.ConcurrentEdgeTriggerType constant; defaults to SOURCE_VERTEX_CONFIGURED.
+ */
+ @ConfigurationScope(Scope.VERTEX)
+ @ConfigurationProperty
+ public static final String TEZ_CONCURRENT_EDGE_TRIGGER_TYPE =
+ TEZ_TASK_PREFIX + "concurrent.edge.trigger.type";
+ public static final String TEZ_CONCURRENT_EDGE_TRIGGER_TYPE_DEFAULT =
+ ConcurrentEdgeTriggerType.SOURCE_VERTEX_CONFIGURED.name();
+
+
+ /**
* String value. Env settings will be merged with {@link #TEZ_TASK_LAUNCH_ENV}
* during the launch of the task process. This property will typically be configured to
* include default system env meant to be used by all jobs in a cluster. If required, the values can
@@ -508,16 +522,16 @@ public class TezConfiguration extends Configuration {
@Unstable
/**
- * Boolean value. Enable speculative execution of slower tasks. This can help reduce job latency
+ * Boolean value. Enable speculative execution of slower tasks. This can help reduce job latency
* when some tasks are running slower due bad/slow machines
*/
@ConfigurationScope(Scope.VERTEX) // TODO Verify the vertex speculation, TEZ-1788
@ConfigurationProperty(type="boolean")
public static final String TEZ_AM_SPECULATION_ENABLED = TEZ_AM_PREFIX + "speculation.enabled";
public static final boolean TEZ_AM_SPECULATION_ENABLED_DEFAULT = false;
-
+
/**
- * Float value. Specifies how many standard deviations away from the mean task execution time
+ * Float value. Specifies how many standard deviations away from the mean task execution time
* should be considered as an outlier/slow task.
*/
@Unstable
@@ -539,14 +553,14 @@ public class TezConfiguration extends Configuration {
/**
* Int value. Upper limit on the number of threads user to launch containers in the app
- * master. Expert level setting.
+ * master. Expert level setting.
*/
@ConfigurationScope(Scope.AM)
@ConfigurationProperty(type="integer")
public static final String TEZ_AM_CONTAINERLAUNCHER_THREAD_COUNT_LIMIT =
TEZ_AM_PREFIX + "containerlauncher.thread-count-limit";
- public static final int TEZ_AM_CONTAINERLAUNCHER_THREAD_COUNT_LIMIT_DEFAULT =
+ public static final int TEZ_AM_CONTAINERLAUNCHER_THREAD_COUNT_LIMIT_DEFAULT =
500;
@@ -560,8 +574,8 @@ public class TezConfiguration extends Configuration {
public static final int TEZ_AM_MAX_TASK_FAILURES_PER_NODE_DEFAULT = 10;
/**
- * Int value. Specifies the number of times the app master can be launched in order to recover
- * from app master failure. Typically app master failures are non-recoverable. This parameter
+ * Int value. Specifies the number of times the app master can be launched in order to recover
+ * from app master failure. Typically app master failures are non-recoverable. This parameter
* is for cases where the app master is not at fault but is lost due to system errors.
* Expert level setting.
*/
@@ -582,7 +596,7 @@ public class TezConfiguration extends Configuration {
public static final int TEZ_AM_VERTEX_MAX_TASK_CONCURRENCY_DEFAULT = -1;
/**
- * Int value. The maximum number of attempts that can fail for a particular task before the task is failed.
+ * Int value. The maximum number of attempts that can fail for a particular task before the task is failed.
* This does not count killed attempts. Task failure results in DAG failure.
*/
@ConfigurationScope(Scope.VERTEX)
@@ -612,7 +626,7 @@ public class TezConfiguration extends Configuration {
public static final boolean TEZ_AM_TASK_RESCHEDULE_RELAXED_LOCALITY_DEFAULT=true;
/**
- * Boolean value. Enabled blacklisting of nodes of nodes that are considered faulty. These nodes
+ * Boolean value. Enables blacklisting of nodes that are considered faulty. These nodes
* will not be used to execute tasks.
*/
@ConfigurationScope(Scope.AM)
@@ -620,11 +634,11 @@ public class TezConfiguration extends Configuration {
public static final String TEZ_AM_NODE_BLACKLISTING_ENABLED = TEZ_AM_PREFIX
+ "node-blacklisting.enabled";
public static final boolean TEZ_AM_NODE_BLACKLISTING_ENABLED_DEFAULT = true;
-
+
/**
* Int value. Specifies the percentage of nodes in the cluster that may be considered faulty.
- * This limits the number of nodes that are blacklisted in an effort to minimize the effects of
- * temporary surges in failures (e.g. due to network outages).
+ * This limits the number of nodes that are blacklisted in an effort to minimize the effects of
+ * temporary surges in failures (e.g. due to network outages).
*/
@ConfigurationScope(Scope.AM)
@ConfigurationProperty(type="integer")
@@ -651,7 +665,7 @@ public class TezConfiguration extends Configuration {
public static final String TEZ_AM_CLIENT_THREAD_COUNT =
TEZ_AM_PREFIX + "client.am.thread-count";
public static final int TEZ_AM_CLIENT_THREAD_COUNT_DEFAULT = 2;
-
+
/**
* String value. Range of ports that the AM can use when binding for client connections. Leave blank
* to use all possible ports. Expert level setting. It's hadoop standard range configuration.
@@ -721,7 +735,7 @@ public class TezConfiguration extends Configuration {
public static final int TEZ_AM_DAG_CLEANUP_THREAD_COUNT_LIMIT_DEFAULT = 10;
/** Int value. The amount of memory in MB to be used by tasks. This applies to all tasks across
- * all vertices. Setting it to the same value for all tasks is helpful for container reuse and
+ * all vertices. Setting it to the same value for all tasks is helpful for container reuse and
* thus good for performance typically. */
@ConfigurationScope(Scope.DAG) // TODO vertex level
@ConfigurationProperty(type="integer")
@@ -736,7 +750,7 @@ public class TezConfiguration extends Configuration {
@ConfigurationProperty(type="integer")
public static final String TEZ_TASK_RESOURCE_CPU_VCORES = TEZ_TASK_PREFIX
+ "resource.cpu.vcores";
- public static final int TEZ_TASK_RESOURCE_CPU_VCORES_DEFAULT = 1;
+ public static final int TEZ_TASK_RESOURCE_CPU_VCORES_DEFAULT = 1;
/**
* Int value. The maximum heartbeat interval between the AM and RM in milliseconds
@@ -751,7 +765,7 @@ public class TezConfiguration extends Configuration {
/**
* Int value. The maximum amount of time, in milliseconds, to wait before a task asks an
- * AM for another task. Increasing this can help improve app master scalability for a large
+ * AM for another task. Increasing this can help improve app master scalability for a large
* number of concurrent tasks. Expert level setting.
*/
@ConfigurationScope(Scope.AM)
@@ -761,7 +775,7 @@ public class TezConfiguration extends Configuration {
public static final int TEZ_TASK_GET_TASK_SLEEP_INTERVAL_MS_MAX_DEFAULT = 200;
/**
- * Int value. The maximum heartbeat interval, in milliseconds, between the app master and tasks.
+ * Int value. The maximum heartbeat interval, in milliseconds, between the app master and tasks.
* Increasing this can help improve app master scalability for a large number of concurrent tasks.
* Expert level setting.
*/
@@ -772,8 +786,8 @@ public class TezConfiguration extends Configuration {
public static final int TEZ_TASK_AM_HEARTBEAT_INTERVAL_MS_DEFAULT = 100;
/**
- * Int value. Interval, in milliseconds, after which counters are sent to AM in heartbeat from
- * tasks. This reduces the amount of network traffice between AM and tasks to send high-volume
+ * Int value. Interval, in milliseconds, after which counters are sent to AM in heartbeat from
+ * tasks. This reduces the amount of network traffic between AM and tasks to send high-volume
* counters. Improves AM scalability. Expert level setting.
*/
@ConfigurationScope(Scope.AM)
@@ -792,7 +806,7 @@ public class TezConfiguration extends Configuration {
public static final String TEZ_TASK_MAX_EVENTS_PER_HEARTBEAT = TEZ_TASK_PREFIX
+ "max-events-per-heartbeat";
public static final int TEZ_TASK_MAX_EVENTS_PER_HEARTBEAT_DEFAULT = 500;
-
+
/**
* Int value. Maximum number of pending task events before a task will stop
* asking for more events in the task heartbeat.
@@ -827,16 +841,16 @@ public class TezConfiguration extends Configuration {
public static final boolean TEZ_TASK_INITIALIZE_PROCESSOR_IO_SERIALLY_DEFAULT = false;
/**
- * Long value. Interval, in milliseconds, within which any of the tasks Input/Processor/Output
- * components need to make successive progress notifications. If the progress is not notified
+ * Long value. Interval, in milliseconds, within which any of the task's Input/Processor/Output
+ * components must make successive progress notifications. If progress is not notified
* for this interval then the task will be considered hung and terminated.
- * The value for this config should be larger than {@link TezConfiguration#TASK_HEARTBEAT_TIMEOUT_MS}
+ * The value for this config should be larger than {@link TezConfiguration#TASK_HEARTBEAT_TIMEOUT_MS}
* and larger than 2 times the value of {@link TezConfiguration#TEZ_TASK_AM_HEARTBEAT_INTERVAL_MS}.
* A config value <=0 disables this.
*/
@ConfigurationScope(Scope.VERTEX)
@ConfigurationProperty
- public static final String TEZ_TASK_PROGRESS_STUCK_INTERVAL_MS = TEZ_TASK_PREFIX +
+ public static final String TEZ_TASK_PROGRESS_STUCK_INTERVAL_MS = TEZ_TASK_PREFIX +
"progress.stuck.interval-ms";
public static final long TEZ_TASK_PROGRESS_STUCK_INTERVAL_MS_DEFAULT = -1;
@@ -1010,7 +1024,7 @@ public class TezConfiguration extends Configuration {
/**
* Boolean value. Whether to reuse containers for non-local tasks. Active only if reuse is
- * enabled. Turning this on can severely affect locality and can be bad for jobs with high data
+ * enabled. Turning this on can severely affect locality and can be bad for jobs with high data
* volume being read from the primary data sources.
*/
@ConfigurationScope(Scope.AM)
@@ -1047,15 +1061,15 @@ public class TezConfiguration extends Configuration {
TEZ_AM_CONTAINER_REUSE_LOCALITY_DELAY_ALLOCATION_MILLIS_DEFAULT = 250l;
/**
- * Int value. The minimum amount of time to hold on to a container that is idle. Only active when
- * reuse is enabled. Set to -1 to never release idle containers (not recommended).
+ * Int value. The minimum amount of time to hold on to a container that is idle. Only active when
+ * reuse is enabled. Set to -1 to never release idle containers (not recommended).
*/
@ConfigurationScope(Scope.AM)
@ConfigurationProperty(type="integer")
public static final String TEZ_AM_CONTAINER_IDLE_RELEASE_TIMEOUT_MIN_MILLIS =
TEZ_AM_PREFIX + "container.idle.release-timeout-min.millis";
public static final long
- TEZ_AM_CONTAINER_IDLE_RELEASE_TIMEOUT_MIN_MILLIS_DEFAULT = 5000l;
+ TEZ_AM_CONTAINER_IDLE_RELEASE_TIMEOUT_MIN_MILLIS_DEFAULT = 5000l;
/**
* Int value. The maximum amount of time to hold on to a container if no task can be
@@ -1064,7 +1078,7 @@ public class TezConfiguration extends Configuration {
* TezConfiguration#TEZ_AM_CONTAINER_IDLE_RELEASE_TIMEOUT_MIN_MILLIS.
* Containers will have an expire time set to a random value between
* TezConfiguration#TEZ_AM_CONTAINER_IDLE_RELEASE_TIMEOUT_MIN_MILLIS &&
- * TezConfiguration#TEZ_AM_CONTAINER_IDLE_RELEASE_TIMEOUT_MAX_MILLIS. This
+ * TezConfiguration#TEZ_AM_CONTAINER_IDLE_RELEASE_TIMEOUT_MAX_MILLIS. This
* creates a graceful reduction in the amount of idle resources held
*/
@ConfigurationScope(Scope.AM)
@@ -1073,9 +1087,9 @@ public class TezConfiguration extends Configuration {
TEZ_AM_PREFIX + "container.idle.release-timeout-max.millis";
public static final long
TEZ_AM_CONTAINER_IDLE_RELEASE_TIMEOUT_MAX_MILLIS_DEFAULT = 10000l;
-
+
/**
- * Int value. The minimum number of containers that will be held in session mode. Not active in
+ * Int value. The minimum number of containers that will be held in session mode. Not active in
* non-session mode. Enables an idle session (not running any DAG) to hold on to a minimum number
* of containers to provide fast response times for the next DAG.
*/
@@ -1086,7 +1100,7 @@ public class TezConfiguration extends Configuration {
public static final int TEZ_AM_SESSION_MIN_HELD_CONTAINERS_DEFAULT = 0;
/**
- * Boolean value. Allow/disable logging for all dags in a session
+ * Boolean value. Enable/disable logging for all DAGs in a session.
*/
@Private
@ConfigurationScope(Scope.AM)
@@ -1119,7 +1133,7 @@ public class TezConfiguration extends Configuration {
public static final float TEZ_VERTEX_FAILURES_MAXPERCENT_DEFAULT = 0.0f;
/**
* Int value. The number of RM heartbeats to wait after preempting running tasks before preempting
- * more running tasks. After preempting a task, we need to wait at least 1 heartbeat so that the
+ * more running tasks. After preempting a task, we need to wait at least 1 heartbeat so that the
* RM can act on the released resources and assign new ones to us. Expert level setting.
*/
@ConfigurationScope(Scope.AM)
@@ -1130,8 +1144,8 @@ public class TezConfiguration extends Configuration {
/**
* Int value. Time (in millisecs) that an unsatisfied request will wait before preempting other
- * resources. In rare cases, the cluster says there are enough free resources but does not end
- * up getting enough on a node to actually assign it to the job. This configuration tries to put
+ * resources. In rare cases, the cluster says there are enough free resources but does not end
+ * up getting enough on a node to actually assign it to the job. This configuration tries to put
* a deadline on such wait to prevent indefinite job hangs.
*/
@ConfigurationScope(Scope.AM)
@@ -1169,7 +1183,7 @@ public class TezConfiguration extends Configuration {
*
* Specify additional user classpath information to be used for Tez AM and all containers.
* This will be appended to the classpath after PWD
- *
+ *
* 'tez.lib.uris.classpath' defines the relative classpath into the archives
* that are set in 'tez.lib.uris'
*
@@ -1195,7 +1209,7 @@ public class TezConfiguration extends Configuration {
public static final String TEZ_AUX_URIS = TEZ_PREFIX + "aux.uris";
/**
- * Boolean value. Allows to ignore 'tez.lib.uris'. Useful during development as well as
+ * Boolean value. Allows ignoring 'tez.lib.uris'. Useful during development as well as
* raw Tez application where classpath is propagated with application
* via {@link LocalResource}s. This is mainly useful for developer/debugger scenarios.
*/
@@ -1261,8 +1275,8 @@ public class TezConfiguration extends Configuration {
/**
* Int value. Time (in seconds) to wait for AM to come up when trying to submit a DAG
- * from the client. Only relevant in session mode. If the cluster is busy and cannot launch the
- * AM then this timeout may be hit. In those case, using non-session mode is recommended if
+ * from the client. Only relevant in session mode. If the cluster is busy and cannot launch the
+ * AM then this timeout may be hit. In those cases, using non-session mode is recommended if
* applicable. Otherwise increase the timeout (set to -1 for infinity. Not recommended)
*/
@ConfigurationScope(Scope.AM)
@@ -1433,7 +1447,7 @@ public class TezConfiguration extends Configuration {
public static final int TEZ_HISTORY_LOGGING_TIMELINE_NUM_DAGS_PER_GROUP_DEFAULT = 1;
/**
- * String value. The directory into which history data will be written. This defaults to the
+ * String value. The directory into which history data will be written. This defaults to the
* container logging directory. This is relevant only when SimpleHistoryLoggingService is being
* used for {@link TezConfiguration#TEZ_HISTORY_LOGGING_SERVICE_CLASS}
*/
@@ -1542,7 +1556,7 @@ public class TezConfiguration extends Configuration {
+ "yarn.ats.acl.dag.domain.id";
/**
- * Boolean value. Enable recovery of DAGs. This allows a restarted app master to recover the
+ * Boolean value. Enable recovery of DAGs. This allows a restarted app master to recover the
* incomplete DAGs from the previous instance of the app master.
*/
@ConfigurationScope(Scope.AM)
@@ -1641,10 +1655,10 @@ public class TezConfiguration extends Configuration {
public static final boolean TEZ_AM_ACLS_ENABLED_DEFAULT = true;
/**
- * String value.
+ * String value.
* AM view ACLs. This allows the specified users/groups to view the status of the AM and all DAGs
* that run within this AM.
- * Comma separated list of users, followed by whitespace, followed by a comma separated list of
+ * Comma separated list of users, followed by whitespace, followed by a comma separated list of
* groups
*/
@ConfigurationScope(Scope.AM)
@@ -1655,7 +1669,7 @@ public class TezConfiguration extends Configuration {
* String value.
* AM modify ACLs. This allows the specified users/groups to run modify operations on the AM
* such as submitting DAGs, pre-warming the session, killing DAGs or shutting down the session.
- * Comma separated list of users, followed by whitespace, followed by a comma separated list of
+ * Comma separated list of users, followed by whitespace, followed by a comma separated list of
* groups
*/
@ConfigurationScope(Scope.AM)
http://git-wip-us.apache.org/repos/asf/tez/blob/e6722a96/tez-api/src/test/java/org/apache/tez/dag/api/TestDAGVerify.java
----------------------------------------------------------------------
diff --git a/tez-api/src/test/java/org/apache/tez/dag/api/TestDAGVerify.java b/tez-api/src/test/java/org/apache/tez/dag/api/TestDAGVerify.java
index e3c40aa..bde4622 100644
--- a/tez-api/src/test/java/org/apache/tez/dag/api/TestDAGVerify.java
+++ b/tez-api/src/test/java/org/apache/tez/dag/api/TestDAGVerify.java
@@ -120,7 +120,7 @@ public class TestDAGVerify {
dag.addEdge(e1);
dag.verify();
}
-
+
@Test(timeout = 5000)
// v1 (known) -> v2 (-1) -> v3 (-1)
public void testVerifyOneToOneInferParallelism() {
@@ -153,7 +153,7 @@ public class TestDAGVerify {
Assert.assertEquals(dummyTaskCount, v2.getParallelism());
Assert.assertEquals(dummyTaskCount, v3.getParallelism());
}
-
+
@Test(timeout = 5000)
// v1 (known) -> v2 (-1) -> v3 (-1)
// The test checks resiliency to ordering of the vertices/edges
@@ -187,7 +187,7 @@ public class TestDAGVerify {
Assert.assertEquals(dummyTaskCount, v2.getParallelism());
Assert.assertEquals(dummyTaskCount, v3.getParallelism());
}
-
+
@Test(timeout = 5000)
public void testVerifyOneToOneNoInferParallelism() {
Vertex v1 = Vertex.create("v1",
@@ -211,7 +211,7 @@ public class TestDAGVerify {
dag.verify();
Assert.assertEquals(-1, v2.getParallelism());
}
-
+
@Test(timeout = 5000)
// v1 (-1) -> v2 (known) -> v3 (-1)
public void testVerifyOneToOneIncorrectParallelism1() {
@@ -296,7 +296,7 @@ public class TestDAGVerify {
"1-1 Edge. Destination vertex parallelism must match source vertex"));
}
}
-
+
@Test(timeout = 5000)
public void testVerifyBroadcast() {
Vertex v1 = Vertex.create("v1",
@@ -317,7 +317,7 @@ public class TestDAGVerify {
dag.verify();
}
- @Test(expected = IllegalStateException.class, timeout = 5000)
+ @Test(timeout = 5000)
public void testVerify3() {
Vertex v1 = Vertex.create("v1",
ProcessorDescriptor.create(dummyProcessorClassName),
@@ -337,7 +337,7 @@ public class TestDAGVerify {
dag.verify();
}
- @Test(expected = IllegalStateException.class, timeout = 5000)
+ @Test(timeout = 5000)
public void testVerify4() {
Vertex v1 = Vertex.create("v1",
ProcessorDescriptor.create(dummyProcessorClassName),
@@ -525,7 +525,7 @@ public class TestDAGVerify {
System.out.println(ex.getMessage());
Assert.assertTrue(ex.getMessage().startsWith("Vertex v1 already defined"));
}
-
+
@Test(expected = IllegalStateException.class, timeout = 5000)
public void testInputAndInputVertexNameCollision() {
Vertex v1 = Vertex.create("v1",
@@ -534,22 +534,22 @@ public class TestDAGVerify {
Vertex v2 = Vertex.create("v2",
ProcessorDescriptor.create("MapProcessor"),
dummyTaskCount, dummyTaskResource);
-
+
v2.addDataSource("v1", DataSourceDescriptor.create(null, null, null));
-
+
Edge e1 = Edge.create(v1, v2,
EdgeProperty.create(DataMovementType.SCATTER_GATHER,
DataSourceType.PERSISTED, SchedulingType.SEQUENTIAL,
OutputDescriptor.create("dummy output class"),
InputDescriptor.create("dummy input class")));
-
+
DAG dag = DAG.create("testDag");
dag.addVertex(v1);
dag.addVertex(v2);
dag.addEdge(e1);
dag.verify();
}
-
+
@Test(expected = IllegalStateException.class, timeout = 5000)
public void testOutputAndOutputVertexNameCollision() {
Vertex v1 = Vertex.create("v1",
@@ -558,22 +558,22 @@ public class TestDAGVerify {
Vertex v2 = Vertex.create("v2",
ProcessorDescriptor.create("MapProcessor"),
dummyTaskCount, dummyTaskResource);
-
+
v1.addDataSink("v2", DataSinkDescriptor.create(null, null, null));
-
+
Edge e1 = Edge.create(v1, v2,
EdgeProperty.create(DataMovementType.SCATTER_GATHER,
DataSourceType.PERSISTED, SchedulingType.SEQUENTIAL,
OutputDescriptor.create("dummy output class"),
InputDescriptor.create("dummy input class")));
-
+
DAG dag = DAG.create("testDag");
dag.addVertex(v1);
dag.addVertex(v2);
dag.addEdge(e1);
dag.verify();
}
-
+
@Test(expected = IllegalStateException.class, timeout = 5000)
public void testOutputAndVertexNameCollision() {
Vertex v1 = Vertex.create("v1",
@@ -582,15 +582,15 @@ public class TestDAGVerify {
Vertex v2 = Vertex.create("v2",
ProcessorDescriptor.create("MapProcessor"),
dummyTaskCount, dummyTaskResource);
-
+
v1.addDataSink("v2", DataSinkDescriptor.create(null, null, null));
-
+
DAG dag = DAG.create("testDag");
dag.addVertex(v1);
dag.addVertex(v2);
dag.verify();
}
-
+
@Test(expected = IllegalStateException.class, timeout = 5000)
public void testInputAndVertexNameCollision() {
Vertex v1 = Vertex.create("v1",
@@ -599,9 +599,9 @@ public class TestDAGVerify {
Vertex v2 = Vertex.create("v2",
ProcessorDescriptor.create("MapProcessor"),
dummyTaskCount, dummyTaskResource);
-
+
v1.addDataSource("v2", DataSourceDescriptor.create(null, null, null));
-
+
DAG dag = DAG.create("testDag");
dag.addVertex(v1);
dag.addVertex(v2);
@@ -640,7 +640,7 @@ public class TestDAGVerify {
dag.addEdge(e2);
dag.verify();
}
-
+
@Test(timeout = 5000)
public void testVertexGroupWithMultipleOutputEdges() {
Vertex v1 = Vertex.create("v1",
@@ -655,19 +655,19 @@ public class TestDAGVerify {
Vertex v4 = Vertex.create("v4",
ProcessorDescriptor.create("Processor"),
dummyTaskCount, dummyTaskResource);
-
+
DAG dag = DAG.create("testDag");
VertexGroup uv12 = dag.createVertexGroup("uv12", v1, v2);
OutputDescriptor outDesc = new OutputDescriptor();
uv12.addDataSink("uvOut", DataSinkDescriptor.create(outDesc, null, null));
-
+
GroupInputEdge e1 = GroupInputEdge.create(uv12, v3,
EdgeProperty.create(DataMovementType.SCATTER_GATHER,
DataSourceType.PERSISTED, SchedulingType.SEQUENTIAL,
OutputDescriptor.create("dummy output class"),
InputDescriptor.create("dummy input class")),
InputDescriptor.create("dummy input class"));
-
+
GroupInputEdge e2 = GroupInputEdge.create(uv12, v4,
EdgeProperty.create(DataMovementType.SCATTER_GATHER,
DataSourceType.PERSISTED, SchedulingType.SEQUENTIAL,
@@ -685,7 +685,7 @@ public class TestDAGVerify {
for (int i = 0; i< 10;++i){
dag.verify(); // should be OK when called multiple times
}
-
+
Assert.assertEquals(2, v1.getOutputVertices().size());
Assert.assertEquals(2, v2.getOutputVertices().size());
Assert.assertTrue(v1.getOutputVertices().contains(v3));
@@ -693,7 +693,7 @@ public class TestDAGVerify {
Assert.assertTrue(v2.getOutputVertices().contains(v3));
Assert.assertTrue(v2.getOutputVertices().contains(v4));
}
-
+
@Test(timeout = 5000)
public void testVertexGroup() {
Vertex v1 = Vertex.create("v1",
@@ -711,16 +711,16 @@ public class TestDAGVerify {
Vertex v5 = Vertex.create("v5",
ProcessorDescriptor.create("Processor"),
dummyTaskCount, dummyTaskResource);
-
+
DAG dag = DAG.create("testDag");
String groupName1 = "uv12";
VertexGroup uv12 = dag.createVertexGroup(groupName1, v1, v2);
OutputDescriptor outDesc = new OutputDescriptor();
uv12.addDataSink("uvOut", DataSinkDescriptor.create(outDesc, null, null));
-
+
String groupName2 = "uv23";
VertexGroup uv23 = dag.createVertexGroup(groupName2, v2, v3);
-
+
GroupInputEdge e1 = GroupInputEdge.create(uv12, v4,
EdgeProperty.create(DataMovementType.SCATTER_GATHER,
DataSourceType.PERSISTED, SchedulingType.SEQUENTIAL,
@@ -733,7 +733,7 @@ public class TestDAGVerify {
OutputDescriptor.create("dummy output class"),
InputDescriptor.create("dummy input class")),
InputDescriptor.create("dummy input class"));
-
+
dag.addVertex(v1);
dag.addVertex(v2);
dag.addVertex(v3);
@@ -744,7 +744,7 @@ public class TestDAGVerify {
for (int i = 0; i< 10;++i){
dag.verify(); // should be OK when called multiple times
}
-
+
// for the first Group v1 and v2 should get connected to v4 and also have 1 output
// for the second Group v2 and v3 should get connected to v5
// the Group place holders should disappear
@@ -775,7 +775,7 @@ public class TestDAGVerify {
Assert.assertTrue(v5.getGroupInputs().containsKey(groupName2));
Assert.assertEquals(2, dag.vertexGroups.size());
}
-
+
@Test(timeout = 5000)
public void testVertexGroupOneToOne() {
Vertex v1 = Vertex.create("v1",
@@ -793,16 +793,16 @@ public class TestDAGVerify {
Vertex v5 = Vertex.create("v5",
ProcessorDescriptor.create("Processor"),
-1, dummyTaskResource);
-
+
DAG dag = DAG.create("testDag");
String groupName1 = "uv12";
VertexGroup uv12 = dag.createVertexGroup(groupName1, v1, v2);
OutputDescriptor outDesc = new OutputDescriptor();
uv12.addDataSink("uvOut", DataSinkDescriptor.create(outDesc, null, null));
-
+
String groupName2 = "uv23";
VertexGroup uv23 = dag.createVertexGroup(groupName2, v2, v3);
-
+
GroupInputEdge e1 = GroupInputEdge.create(uv12, v4,
EdgeProperty.create(DataMovementType.ONE_TO_ONE,
DataSourceType.PERSISTED, SchedulingType.SEQUENTIAL,
@@ -815,7 +815,7 @@ public class TestDAGVerify {
OutputDescriptor.create("dummy output class"),
InputDescriptor.create("dummy input class")),
InputDescriptor.create("dummy input class"));
-
+
dag.addVertex(v1);
dag.addVertex(v2);
dag.addVertex(v3);
@@ -826,7 +826,7 @@ public class TestDAGVerify {
for (int i = 0; i< 10;++i){
dag.verify(); // should be OK when called multiple times
}
-
+
Assert.assertEquals(dummyTaskCount, v5.getParallelism());
}
@@ -941,8 +941,8 @@ public class TestDAGVerify {
dag.createDag(new TezConfiguration(), null, null, null, true);
}
-
-
+
+
@Test(timeout = 5000)
public void testDAGCreateDataInference() {
Vertex v1 = Vertex.create("v1", ProcessorDescriptor.create(dummyProcessorClassName));
@@ -954,7 +954,7 @@ public class TestDAGVerify {
String lrName2 = "LR2";
lrs2.put(lrName2, LocalResource.newInstance(URL.newInstance("file", "localhost", 0, "/test1"),
LocalResourceType.FILE, LocalResourceVisibility.PUBLIC, 1, 1));
-
+
Set<String> hosts = Sets.newHashSet();
hosts.add("h1");
hosts.add("h2");
@@ -962,10 +962,10 @@ public class TestDAGVerify {
taskLocationHints.add(TaskLocationHint.createTaskLocationHint(hosts, null));
taskLocationHints.add(TaskLocationHint.createTaskLocationHint(hosts, null));
VertexLocationHint vLoc = VertexLocationHint.create(taskLocationHints);
- DataSourceDescriptor ds = DataSourceDescriptor.create(InputDescriptor.create("I.class"),
+ DataSourceDescriptor ds = DataSourceDescriptor.create(InputDescriptor.create("I.class"),
InputInitializerDescriptor.create(dummyInputInitClassName), dummyTaskCount, null, vLoc, lrs2);
v1.addDataSource("i1", ds);
-
+
DAG dag = DAG.create("testDag");
dag.addVertex(v1);
dag.addTaskLocalFiles(lrs1);
@@ -1003,10 +1003,10 @@ public class TestDAGVerify {
Assert.assertTrue(e.getMessage().contains("Duplicate Resources found with different size"));
}
- DataSourceDescriptor ds = DataSourceDescriptor.create(InputDescriptor.create("I.class"),
+ DataSourceDescriptor ds = DataSourceDescriptor.create(InputDescriptor.create("I.class"),
null, -1, null, null, lrs2);
v1.addDataSource("i1", ds);
-
+
DAG dag = DAG.create("testDag");
dag.addVertex(v1);
dag.addTaskLocalFiles(lrs);
@@ -1024,7 +1024,7 @@ public class TestDAGVerify {
Assert.assertTrue(e.getMessage().contains("Duplicate Resources found with different size"));
}
}
-
+
@Test(timeout = 5000)
public void testDAGAccessControls() {
DAG dag = DAG.create("testDag");
http://git-wip-us.apache.org/repos/asf/tez/blob/e6722a96/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
index 177ba56..2d2f23d 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
@@ -71,6 +71,7 @@ import org.apache.tez.common.TezUtils;
import org.apache.tez.dag.api.NamedEntityDescriptor;
import org.apache.tez.dag.api.SessionNotRunning;
import org.apache.tez.dag.api.UserPayload;
+import org.apache.tez.dag.api.records.DAGProtos;
import org.apache.tez.dag.api.records.DAGProtos.AMPluginDescriptorProto;
import org.apache.tez.dag.api.records.DAGProtos.ConfigurationProto;
import org.apache.tez.dag.api.records.DAGProtos.TezNamedEntityDescriptorProto;
@@ -298,7 +299,7 @@ public class DAGAppMaster extends AbstractService {
private Path currentRecoveryDataDir;
private Path tezSystemStagingDir;
private FileSystem recoveryFS;
-
+
private ExecutorService rawExecutor;
private ListeningExecutorService execService;
@@ -330,7 +331,7 @@ public class DAGAppMaster extends AbstractService {
private String clientVersion;
private boolean versionMismatch = false;
private String versionMismatchDiagnostics;
-
+
private ResourceCalculatorProcessTree cpuPlugin;
private GcTimeUpdater gcPlugin;
@@ -385,7 +386,7 @@ public class DAGAppMaster extends AbstractService {
return PATH_JOINER.join(nodeHttpAddress, "node", "containerlogs",
containerId, user);
}
-
+
private void initResourceCalculatorPlugins() {
Class<? extends ResourceCalculatorProcessTree> clazz = amConf.getClass(
TezConfiguration.TEZ_TASK_RESOURCE_CALCULATOR_PROCESS_TREE_CLASS,
@@ -400,10 +401,10 @@ public class DAGAppMaster extends AbstractService {
pid = processName.split("@")[0];
}
cpuPlugin = ResourceCalculatorProcessTree.getResourceCalculatorProcessTree(pid, clazz, amConf);
-
+
gcPlugin = new GcTimeUpdater(null);
}
-
+
private long getAMCPUTime() {
if (cpuPlugin != null) {
cpuPlugin.updateProcessTree();
@@ -557,14 +558,14 @@ public class DAGAppMaster extends AbstractService {
dispatcher.register(TaskEventType.class, new TaskEventDispatcher());
dispatcher.register(TaskAttemptEventType.class, new TaskAttemptEventDispatcher());
} else {
- int concurrency = conf.getInt(TezConfiguration.TEZ_AM_CONCURRENT_DISPATCHER_CONCURRENCY,
+ int concurrency = conf.getInt(TezConfiguration.TEZ_AM_CONCURRENT_DISPATCHER_CONCURRENCY,
TezConfiguration.TEZ_AM_CONCURRENT_DISPATCHER_CONCURRENCY_DEFAULT);
AsyncDispatcherConcurrent sharedDispatcher = dispatcher.registerAndCreateDispatcher(
TaskEventType.class, new TaskEventDispatcher(), "TaskAndAttemptEventThread", concurrency);
dispatcher.registerWithExistingDispatcher(TaskAttemptEventType.class,
new TaskAttemptEventDispatcher(), sharedDispatcher);
}
-
+
// register other delegating dispatchers
dispatcher.registerAndCreateDispatcher(SpeculatorEventType.class, new SpeculatorEventHandler(),
"Speculator");
@@ -661,7 +662,7 @@ public class DAGAppMaster extends AbstractService {
protected ContainerSignatureMatcher createContainerSignatureMatcher() {
return new ContainerContextMatcher();
}
-
+
@VisibleForTesting
protected AsyncDispatcher createDispatcher() {
return new AsyncDispatcher("Central");
@@ -680,7 +681,7 @@ public class DAGAppMaster extends AbstractService {
System.exit(0);
}
}
-
+
@VisibleForTesting
protected TaskSchedulerManager getTaskSchedulerManager() {
return taskSchedulerManager;
@@ -1342,7 +1343,7 @@ public class DAGAppMaster extends AbstractService {
}
dispatcher.getEventHandler().handle(new DAGEventTerminateDag(dag.getID(), DAGTerminationCause.DAG_KILL, message));
}
-
+
private Map<String, LocalResource> getAdditionalLocalResourceDiff(
DAG dag, Map<String, LocalResource> additionalResources) throws TezException {
if (additionalResources == null) {
@@ -1494,7 +1495,7 @@ public class DAGAppMaster extends AbstractService {
public DAG getCurrentDAG() {
return dag;
}
-
+
@Override
public ListeningExecutorService getExecService() {
return execService;
@@ -1695,7 +1696,7 @@ public class DAGAppMaster extends AbstractService {
public long getCumulativeCPUTime() {
return getAMCPUTime();
}
-
+
@Override
public long getCumulativeGCTime() {
return getAMGCTime();
@@ -1928,7 +1929,7 @@ public class DAGAppMaster extends AbstractService {
}
return null;
}
-
+
@Override
public synchronized void serviceStart() throws Exception {
@@ -1971,8 +1972,19 @@ public class DAGAppMaster extends AbstractService {
return;
}
+ DAGPlan dagPlan = null;
if (!isSession) {
LOG.info("In Non-Session mode.");
+ dagPlan = readDAGPlanFile();
+ if (hasConcurrentEdge(dagPlan)) {
+ // Currently a DAG with concurrent edge is deemed unrecoverable
+ // (run from scratch) on AM failover. Proper AM failover for DAG with
+ // concurrent edge is pending TEZ-4017
+ if (recoveredDAGData != null) {
+ LOG.warn("Ignoring recoveredDAGData for a recovered DAG with concurrent edge.");
+ recoveredDAGData = null;
+ }
+ }
} else {
LOG.info("In Session mode. Waiting for DAG over RPC");
this.state = DAGAppMasterState.IDLE;
@@ -2053,7 +2065,8 @@ public class DAGAppMaster extends AbstractService {
if (!isSession) {
// No dag recovered - in non-session, just restart the original DAG
dagCounter.set(0);
- startDAG();
+ assert(dagPlan != null);
+ startDAG(dagPlan, null);
}
}
@@ -2181,7 +2194,7 @@ public class DAGAppMaster extends AbstractService {
@Override
public void handle(TaskEvent event) {
DAG dag = context.getCurrentDAG();
- int eventDagIndex =
+ int eventDagIndex =
event.getTaskID().getVertexID().getDAGId().getId();
if (dag == null || eventDagIndex != dag.getID().getId()) {
return; // event not relevant any more
@@ -2192,7 +2205,7 @@ public class DAGAppMaster extends AbstractService {
((EventHandler<TaskEvent>)task).handle(event);
}
}
-
+
private class SpeculatorEventHandler implements EventHandler<SpeculatorEvent> {
@Override
public void handle(SpeculatorEvent event) {
@@ -2211,7 +2224,7 @@ public class DAGAppMaster extends AbstractService {
@Override
public void handle(TaskAttemptEvent event) {
DAG dag = context.getCurrentDAG();
- int eventDagIndex =
+ int eventDagIndex =
event.getTaskAttemptID().getTaskID().getVertexID().getDAGId().getId();
if (dag == null || eventDagIndex != dag.getID().getId()) {
return; // event not relevant any more
@@ -2230,12 +2243,12 @@ public class DAGAppMaster extends AbstractService {
@Override
public void handle(VertexEvent event) {
DAG dag = context.getCurrentDAG();
- int eventDagIndex =
+ int eventDagIndex =
event.getVertexId().getDAGId().getId();
if (dag == null || eventDagIndex != dag.getID().getId()) {
return; // event not relevant any more
}
-
+
Vertex vertex =
dag.getVertex(event.getVertexId());
((EventHandler<VertexEvent>) vertex).handle(event);
@@ -2440,23 +2453,30 @@ public class DAGAppMaster extends AbstractService {
}
}
- private void startDAG() throws IOException, TezException {
+ private boolean hasConcurrentEdge(DAGPlan dagPlan) {
+ for (DAGProtos.EdgePlan edge : dagPlan.getEdgeList()) {
+ if (DAGProtos.PlanEdgeSchedulingType.CONCURRENT.equals(edge.getSchedulingType())) {
+ return true;
+ }
+ }
+ return false;
+ }
+
+ /** Parses the DAGPlan from the TEZ_PB_PLAN_BINARY_NAME file in the working directory. */
+ private DAGPlan readDAGPlanFile() throws IOException, TezException {
FileInputStream dagPBBinaryStream = null;
+ DAGPlan dagPlan = null;
try {
- DAGPlan dagPlan = null;
-
// Read the protobuf DAG
dagPBBinaryStream = new FileInputStream(new File(workingDirectory,
TezConstants.TEZ_PB_PLAN_BINARY_NAME));
dagPlan = DAGPlan.parseFrom(dagPBBinaryStream);
-
- startDAG(dagPlan, null);
-
} finally {
if (dagPBBinaryStream != null) {
dagPBBinaryStream.close();
}
}
+ return dagPlan;
}
private void startDAG(DAGPlan dagPlan, Map<String, LocalResource> additionalAMResources)
http://git-wip-us.apache.org/repos/asf/tez/blob/e6722a96/tez-runtime-library/src/main/java/org/apache/tez/dag/library/edgemanager/SilentEdgeManager.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/dag/library/edgemanager/SilentEdgeManager.java b/tez-runtime-library/src/main/java/org/apache/tez/dag/library/edgemanager/SilentEdgeManager.java
new file mode 100644
index 0000000..db6bb5a
--- /dev/null
+++ b/tez-runtime-library/src/main/java/org/apache/tez/dag/library/edgemanager/SilentEdgeManager.java
@@ -0,0 +1,89 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.dag.library.edgemanager;
+
+import org.apache.tez.dag.api.EdgeManagerPlugin;
+import org.apache.tez.dag.api.EdgeManagerPluginContext;
+import org.apache.tez.runtime.api.events.DataMovementEvent;
+import org.apache.tez.runtime.api.events.InputReadErrorEvent;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * A no-op edge manager for edges where the application itself handles all data
+ * communication (routing and transfer) over direct connections between containers/tasks.
+ * Reports zero physical inputs/outputs/consumers; data-movement/source-failure routing throws.
+ */
+
+public class SilentEdgeManager extends EdgeManagerPlugin {
+
+ /**
+ * Create an instance of this edge manager. EdgeManagerPlugin subclasses must
+ * provide a constructor with this signature so that Tez can create an instance
+ * of the class at runtime.
+ *
+ * @param context the context within which this EdgeManagerPlugin will run. Includes
+ * information like configuration which the user may have specified
+ * while setting up the edge.
+ */
+ public SilentEdgeManager(EdgeManagerPluginContext context) {
+ super(context);
+ }
+
+ @Override
+ public void initialize() throws Exception {
+ // Stateless: this class has no fields, so there is nothing to initialize.
+ }
+
+ @Override
+ public int getNumDestinationTaskPhysicalInputs(int destinationTaskIndex) throws Exception {
+ return 0;
+ }
+
+ @Override
+ public int getNumSourceTaskPhysicalOutputs(int sourceTaskIndex) throws Exception {
+ return 0;
+ }
+
+ @Override
+ public void routeDataMovementEventToDestination(
+ DataMovementEvent event, int sourceTaskIndex, int sourceOutputIndex,
+ Map<Integer, List<Integer>> destinationTaskAndInputIndices) throws Exception {
+ throw new UnsupportedOperationException(
+ "routeDataMovementEventToDestination not supported for SilentEdgeManager");
+ }
+
+ @Override
+ public void routeInputSourceTaskFailedEventToDestination(
+ int sourceTaskIndex, Map<Integer, List<Integer>> destinationTaskAndInputIndices) throws Exception {
+ throw new UnsupportedOperationException(
+ "routeInputSourceTaskFailedEventToDestination not supported for SilentEdgeManager");
+ }
+
+ @Override
+ public int getNumDestinationConsumerTasks(int sourceTaskIndex) throws Exception {
+ return 0;
+ }
+ // NOTE(review): returns 0 rather than throwing like the routing methods above; confirm intended.
+ @Override
+ public int routeInputErrorEventToSource(InputReadErrorEvent event, int destinationTaskIndex, int destinationFailedInputIndex) throws Exception {
+ return 0;
+ }
+}
http://git-wip-us.apache.org/repos/asf/tez/blob/e6722a96/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/VertexManagerWithConcurrentInput.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/VertexManagerWithConcurrentInput.java b/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/VertexManagerWithConcurrentInput.java
new file mode 100644
index 0000000..caf5acd
--- /dev/null
+++ b/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/VertexManagerWithConcurrentInput.java
@@ -0,0 +1,245 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.dag.library.vertexmanager;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.tez.common.TezUtils;
+import org.apache.tez.dag.api.EdgeProperty;
+import org.apache.tez.dag.api.EdgeProperty.ConcurrentEdgeTriggerType;
+import org.apache.tez.dag.api.InputDescriptor;
+import org.apache.tez.dag.api.TezConfiguration;
+import org.apache.tez.dag.api.TezUncheckedException;
+import org.apache.tez.dag.api.UserPayload;
+import org.apache.tez.dag.api.VertexManagerPlugin;
+import org.apache.tez.dag.api.VertexManagerPluginContext;
+import org.apache.tez.dag.api.VertexManagerPluginDescriptor;
+import org.apache.tez.dag.api.event.VertexState;
+import org.apache.tez.dag.api.event.VertexStateUpdate;
+import org.apache.tez.runtime.api.Event;
+import org.apache.tez.runtime.api.TaskAttemptIdentifier;
+import org.apache.tez.runtime.api.events.VertexManagerEvent;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+import java.io.IOException;
+import java.util.EnumSet;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import static org.apache.tez.dag.api.EdgeProperty.SchedulingType.CONCURRENT;
+
+/**
+ * A {@link VertexManagerPlugin} for a vertex whose input edges all use
+ * {@code CONCURRENT} scheduling. Every task of the managed vertex is scheduled
+ * in a single batch once the vertex has started and every source vertex has
+ * reported {@link VertexState#CONFIGURED}. Only the
+ * {@link ConcurrentEdgeTriggerType#SOURCE_VERTEX_CONFIGURED} trigger type is
+ * supported for now (see TEZ-3999).
+ */
+public class VertexManagerWithConcurrentInput extends VertexManagerPlugin {
+
+  private static final Logger LOG = LoggerFactory.getLogger(VertexManagerWithConcurrentInput.class);
+
+  // Source vertex name -> whether its CONFIGURED notification has arrived.
+  private final Map<String, Boolean> srcVerticesConfigured = Maps.newConcurrentMap();
+  // Number of tasks in the managed vertex; all of them are scheduled together.
+  private int managedTasks;
+  // Ensures the managed vertex's tasks are scheduled at most once.
+  private final AtomicBoolean tasksScheduled = new AtomicBoolean(false);
+  // Set once onVertexStarted() has been called for the managed vertex.
+  private final AtomicBoolean onVertexStartedDone = new AtomicBoolean(false);
+  private Configuration vertexConfig;
+  private String vertexName;
+  private ConcurrentEdgeTriggerType edgeTriggerType;
+  // True once every source vertex has reported CONFIGURED.
+  private volatile boolean allSrcVerticesConfigured;
+
+  // Count of source task completions observed; package-private so tests can read it.
+  int completedUpstreamTasks;
+
+  public VertexManagerWithConcurrentInput(VertexManagerPluginContext context) {
+    super(context);
+  }
+
+  /**
+   * Validates the plugin payload and the input edges, registers for CONFIGURED
+   * notifications from every source vertex, and reads the concurrent edge
+   * trigger type from the payload configuration.
+   *
+   * @throws TezUncheckedException if the user payload is missing or empty, an
+   *     input edge is not {@code CONCURRENT}, the payload cannot be converted
+   *     to a {@link Configuration}, or the trigger type is not
+   *     {@code SOURCE_VERTEX_CONFIGURED}
+   * @throws IllegalArgumentException if the configured trigger type does not
+   *     name a {@link ConcurrentEdgeTriggerType} constant
+   */
+  @Override
+  public void initialize() {
+    // Resolve the vertex name first so that validation errors below can report it.
+    vertexName = getContext().getVertexName();
+
+    UserPayload userPayload = getContext().getUserPayload();
+    if (userPayload == null || userPayload.getPayload() == null ||
+        userPayload.getPayload().limit() == 0) {
+      throw new TezUncheckedException("Could not initialize VertexManagerWithConcurrentInput"
+          + " from provided user payload");
+    }
+    managedTasks = getContext().getVertexNumTasks(vertexName);
+
+    Map<String, EdgeProperty> edges = getContext().getInputVertexEdgeProperties();
+    for (Map.Entry<String, EdgeProperty> entry : edges.entrySet()) {
+      if (!CONCURRENT.equals(entry.getValue().getSchedulingType())) {
+        throw new TezUncheckedException("All input edges to vertex " + vertexName +
+            " must be CONCURRENT.");
+      }
+      String srcVertex = entry.getKey();
+      srcVerticesConfigured.put(srcVertex, false);
+      getContext().registerForVertexStateUpdates(srcVertex, EnumSet.of(VertexState.CONFIGURED));
+    }
+
+    try {
+      // Reuse the payload validated above instead of fetching it a second time.
+      vertexConfig = TezUtils.createConfFromUserPayload(userPayload);
+    } catch (IOException e) {
+      throw new TezUncheckedException(e);
+    }
+    edgeTriggerType = ConcurrentEdgeTriggerType.valueOf(
+        vertexConfig.get(TezConfiguration.TEZ_CONCURRENT_EDGE_TRIGGER_TYPE,
+            TezConfiguration.TEZ_CONCURRENT_EDGE_TRIGGER_TYPE_DEFAULT));
+    if (!ConcurrentEdgeTriggerType.SOURCE_VERTEX_CONFIGURED.equals(edgeTriggerType)) {
+      // pending TEZ-3999
+      throw new TezUncheckedException("Only support SOURCE_VERTEX_CONFIGURED triggering type for now.");
+    }
+    LOG.info("VertexManagerWithConcurrentInput initialized with edgeTriggerType {}.", edgeTriggerType);
+
+    completedUpstreamTasks = 0;
+  }
+
+  /**
+   * Records that the managed vertex has started and schedules its tasks if
+   * every source vertex is already configured.
+   *
+   * @param completions not consulted by this manager
+   */
+  @Override
+  public synchronized void onVertexStarted(List<TaskAttemptIdentifier> completions) {
+    onVertexStartedDone.set(true);
+    scheduleTasks();
+  }
+
+  /**
+   * Records a CONFIGURED notification from a source vertex. Once every source
+   * vertex is configured and the managed vertex has started, all of its tasks
+   * are scheduled.
+   *
+   * @throws IllegalArgumentException if the update comes from a vertex that is
+   *     not a source of this vertex, or reports a state other than CONFIGURED
+   */
+  @Override
+  public synchronized void onVertexStateUpdated(VertexStateUpdate stateUpdate) {
+    VertexState state = stateUpdate.getVertexState();
+    String fromVertex = stateUpdate.getVertexName();
+    if (!srcVerticesConfigured.containsKey(fromVertex)) {
+      throw new IllegalArgumentException("Not expecting state update from vertex:" +
+          fromVertex + " in vertex: " + this.vertexName);
+    }
+
+    if (!VertexState.CONFIGURED.equals(state)) {
+      throw new IllegalArgumentException("Received incorrect state notification : " +
+          state + " from vertex: " + fromVertex + " in vertex: " + this.vertexName);
+    }
+
+    LOG.info("Received configured notification: {} for vertex: {} in vertex: {}",
+        state, fromVertex, this.vertexName);
+    srcVerticesConfigured.put(fromVertex, true);
+
+    // Check whether every source vertex is now configured; log the first one still pending.
+    boolean checkAllSrcVerticesConfigured = true;
+    for (Map.Entry<String, Boolean> entry : srcVerticesConfigured.entrySet()) {
+      if (!entry.getValue()) {
+        LOG.info("Waiting for vertex {} in vertex {} ", entry.getKey(), this.vertexName);
+        checkAllSrcVerticesConfigured = false;
+        break;
+      }
+    }
+    allSrcVerticesConfigured = checkAllSrcVerticesConfigured;
+
+    scheduleTasks();
+  }
+
+  /**
+   * Counts a completed source task attempt. Completions do not affect
+   * scheduling under the SOURCE_VERTEX_CONFIGURED trigger type.
+   */
+  @Override
+  public synchronized void onSourceTaskCompleted(TaskAttemptIdentifier attempt) {
+    completedUpstreamTasks++;
+    LOG.info("Source task attempt {} completion received at vertex {}", attempt, this.vertexName);
+  }
+
+  /** Vertex manager events are not used by this manager. */
+  @Override
+  public void onVertexManagerEventReceived(VertexManagerEvent vmEvent) {
+  }
+
+  /** Root input initialization is not used by this manager. */
+  @Override
+  public void onRootVertexInitialized(String inputName,
+      InputDescriptor inputDescriptor, List<Event> events) {
+  }
+
+  /**
+   * Schedules every task of the managed vertex in one batch. This happens only
+   * if the vertex has started, the tasks have not been scheduled yet, and the
+   * trigger condition holds. Otherwise the call does nothing.
+   */
+  private void scheduleTasks() {
+    if (!onVertexStartedDone.get()) {
+      // vertex not started yet
+      return;
+    }
+    if (tasksScheduled.get()) {
+      // already scheduled
+      return;
+    }
+    if (!canScheduleTasks()) {
+      return;
+    }
+    // Atomically claim the one-time scheduling; false means it was already claimed.
+    if (!tasksScheduled.compareAndSet(false, true)) {
+      return;
+    }
+
+    List<VertexManagerPluginContext.ScheduleTaskRequest> tasksToStart =
+        Lists.newArrayListWithCapacity(managedTasks);
+    for (int i = 0; i < managedTasks; ++i) {
+      tasksToStart.add(VertexManagerPluginContext.ScheduleTaskRequest.create(i, null));
+    }
+
+    if (!tasksToStart.isEmpty()) {
+      LOG.info("Starting {} tasks in {}.", tasksToStart.size(), this.vertexName);
+      getContext().scheduleTasks(tasksToStart);
+    }
+    // All tasks are scheduled at this point; vertexManagerDone() could be called here.
+  }
+
+  /**
+   * Evaluates the trigger condition for the configured edge trigger type.
+   *
+   * @return whether every source vertex has reported CONFIGURED
+   * @throws TezUncheckedException for any trigger type other than SOURCE_VERTEX_CONFIGURED
+   */
+  private boolean canScheduleTasks() {
+    if (edgeTriggerType.equals(ConcurrentEdgeTriggerType.SOURCE_VERTEX_CONFIGURED)) {
+      return allSrcVerticesConfigured;
+    } else {
+      // pending TEZ-3999
+      throw new TezUncheckedException("Only support SOURCE_VERTEX_CONFIGURED triggering type for now.");
+    }
+  }
+
+
+  /**
+   * Create a {@link VertexManagerPluginDescriptor} builder that can be used to
+   * configure the plugin.
+   *
+   * @param conf
+   *          {@link Configuration} May be modified in place. May be null if the
+   *          configuration parameters are to be set only via code. If
+   *          configuration values may be changed at runtime via a config file
+   *          then pass in a {@link Configuration} that is initialized from a
+   *          config file. The parameters that are not overridden in code will
+   *          be derived from the Configuration object.
+   * @return {@link ConcurrentInputVertexManagerConfigBuilder}
+   */
+  public static ConcurrentInputVertexManagerConfigBuilder createConfigBuilder(
+      @Nullable Configuration conf) {
+    return new ConcurrentInputVertexManagerConfigBuilder(conf);
+  }
+
+  /**
+   * Helper class to configure VertexManagerWithConcurrentInput. The
+   * configuration is serialized into the descriptor's user payload by
+   * {@link #build()}.
+   */
+  public static final class ConcurrentInputVertexManagerConfigBuilder {
+    private final Configuration conf;
+
+    private ConcurrentInputVertexManagerConfigBuilder(@Nullable Configuration conf) {
+      if (conf == null) {
+        this.conf = new Configuration(false);
+      } else {
+        this.conf = conf;
+      }
+    }
+
+    /**
+     * Builds a descriptor for {@link VertexManagerWithConcurrentInput} carrying
+     * this builder's configuration as its user payload.
+     *
+     * @throws TezUncheckedException if the configuration cannot be serialized
+     */
+    public VertexManagerPluginDescriptor build() {
+      VertexManagerPluginDescriptor desc =
+          VertexManagerPluginDescriptor.create(
+              VertexManagerWithConcurrentInput.class.getName());
+
+      try {
+        return desc.setUserPayload(TezUtils
+            .createUserPayloadFromConf(this.conf));
+      } catch (IOException e) {
+        throw new TezUncheckedException(e);
+      }
+    }
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/tez/blob/e6722a96/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestVertexManagerWithConcurrentInput.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestVertexManagerWithConcurrentInput.java b/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestVertexManagerWithConcurrentInput.java
new file mode 100644
index 0000000..619a4cd
--- /dev/null
+++ b/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestVertexManagerWithConcurrentInput.java
@@ -0,0 +1,114 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.dag.library.vertexmanager;
+
+import org.apache.tez.dag.api.EdgeManagerPluginDescriptor;
+import org.apache.tez.dag.api.EdgeProperty;
+import org.apache.tez.dag.api.InputDescriptor;
+import org.apache.tez.dag.api.OutputDescriptor;
+import org.apache.tez.dag.api.VertexManagerPluginContext;
+import org.apache.tez.dag.api.VertexManagerPluginDescriptor;
+import org.apache.tez.dag.api.event.VertexState;
+import org.apache.tez.dag.api.event.VertexStateUpdate;
+import org.apache.tez.dag.library.edgemanager.SilentEdgeManager;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Captor;
+import org.mockito.MockitoAnnotations;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+public class TestVertexManagerWithConcurrentInput {
+
+  @Captor
+  ArgumentCaptor<List<VertexManagerPluginContext.ScheduleTaskRequest>> requestCaptor;
+
+  @Before
+  public void init() {
+    MockitoAnnotations.initMocks(this);
+  }
+
+  /**
+   * Tasks of the managed vertex must be scheduled exactly once. This happens
+   * only after both source vertices report CONFIGURED and the vertex itself
+   * has started, and every task index is requested.
+   */
+  @Test(timeout = 5000)
+  public void testBasicVertexWithConcurrentInput() throws Exception {
+    HashMap<String, EdgeProperty> mockInputVertices =
+        new HashMap<String, EdgeProperty>();
+    String mockSrcVertexId1 = "Vertex1";
+    int srcVertex1Parallelism = 2;
+    EdgeProperty eProp1 = EdgeProperty.create(
+        EdgeManagerPluginDescriptor.create(SilentEdgeManager.class.getName()),
+        EdgeProperty.DataSourceType.EPHEMERAL,
+        EdgeProperty.SchedulingType.CONCURRENT,
+        OutputDescriptor.create("out"),
+        InputDescriptor.create("in"));
+
+    String mockSrcVertexId2 = "Vertex2";
+    int srcVertex2Parallelism = 3;
+    EdgeProperty eProp2 = EdgeProperty.create(
+        EdgeManagerPluginDescriptor.create(SilentEdgeManager.class.getName()),
+        EdgeProperty.DataSourceType.EPHEMERAL,
+        EdgeProperty.SchedulingType.CONCURRENT,
+        OutputDescriptor.create("out"),
+        InputDescriptor.create("in"));
+
+    String mockManagedVertexId = "Vertex";
+    int vertexParallelism = 2;
+
+    VertexManagerWithConcurrentInput.ConcurrentInputVertexManagerConfigBuilder configurer =
+        VertexManagerWithConcurrentInput.createConfigBuilder(null);
+    VertexManagerPluginDescriptor pluginDesc = configurer.build();
+
+    VertexManagerPluginContext mockContext = mock(VertexManagerPluginContext.class);
+    when(mockContext.getUserPayload()).thenReturn(pluginDesc.getUserPayload());
+    when(mockContext.getInputVertexEdgeProperties()).thenReturn(mockInputVertices);
+    when(mockContext.getVertexName()).thenReturn(mockManagedVertexId);
+    when(mockContext.getVertexNumTasks(mockManagedVertexId)).thenReturn(vertexParallelism);
+    when(mockContext.getVertexNumTasks(mockSrcVertexId1)).thenReturn(srcVertex1Parallelism);
+    when(mockContext.getVertexNumTasks(mockSrcVertexId2)).thenReturn(srcVertex2Parallelism);
+    mockInputVertices.put(mockSrcVertexId1, eProp1);
+    mockInputVertices.put(mockSrcVertexId2, eProp2);
+
+    VertexManagerWithConcurrentInput manager = new VertexManagerWithConcurrentInput(mockContext);
+    manager.initialize();
+
+    // source vertex 1 configured: still waiting for vertex 2
+    manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId1, VertexState.CONFIGURED));
+    verify(mockContext, times(0)).scheduleTasks(requestCaptor.capture());
+
+    // source vertex 2 configured: still waiting for the managed vertex to start
+    manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId2, VertexState.CONFIGURED));
+    verify(mockContext, times(0)).scheduleTasks(requestCaptor.capture());
+
+    // then own vertex started: all tasks are scheduled in one batch
+    manager.onVertexStarted(Collections.singletonList(
+        TestShuffleVertexManager.createTaskAttemptIdentifier(mockSrcVertexId1, 0)));
+    verify(mockContext, times(1)).scheduleTasks(requestCaptor.capture());
+
+    List<VertexManagerPluginContext.ScheduleTaskRequest> scheduled = requestCaptor.getValue();
+    Assert.assertEquals(vertexParallelism, scheduled.size());
+    for (int i = 0; i < vertexParallelism; ++i) {
+      Assert.assertEquals(i, scheduled.get(i).getTaskIndex());
+    }
+    Assert.assertEquals(0, manager.completedUpstreamTasks);
+
+    // a repeated CONFIGURED notification must not schedule the tasks a second time
+    manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId1, VertexState.CONFIGURED));
+    verify(mockContext, times(1)).scheduleTasks(requestCaptor.capture());
+  }
+}