You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by je...@apache.org on 2018/11/20 22:50:21 UTC

tez git commit: TEZ-3998. Allow CONCURRENT edge property in DAG construction and introduce ConcurrentSchedulingType (Yingda Chen via jeagles)

Repository: tez
Updated Branches:
  refs/heads/master efc733183 -> e6722a96b


TEZ-3998. Allow CONCURRENT edge property in DAG construction and introduce ConcurrentSchedulingType (Yingda Chen via jeagles)


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/e6722a96
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/e6722a96
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/e6722a96

Branch: refs/heads/master
Commit: e6722a96b4827d19850a5a73bc7024941a9deb54
Parents: efc7331
Author: Jonathan Eagles <je...@yahoo-inc.com>
Authored: Tue Nov 20 16:48:59 2018 -0600
Committer: Jonathan Eagles <je...@yahoo-inc.com>
Committed: Tue Nov 20 16:48:59 2018 -0600

----------------------------------------------------------------------
 .../main/java/org/apache/tez/dag/api/DAG.java   |  88 ++++---
 .../org/apache/tez/dag/api/EdgeProperty.java    |  64 +++--
 .../apache/tez/dag/api/TezConfiguration.java    | 128 +++++-----
 .../org/apache/tez/dag/api/TestDAGVerify.java   |  92 +++----
 .../org/apache/tez/dag/app/DAGAppMaster.java    |  70 ++++--
 .../library/edgemanager/SilentEdgeManager.java  |  89 +++++++
 .../VertexManagerWithConcurrentInput.java       | 245 +++++++++++++++++++
 .../TestVertexManagerWithConcurrentInput.java   | 114 +++++++++
 8 files changed, 694 insertions(+), 196 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/e6722a96/tez-api/src/main/java/org/apache/tez/dag/api/DAG.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/DAG.java b/tez-api/src/main/java/org/apache/tez/dag/api/DAG.java
index 735c749..f8a2ddc 100644
--- a/tez-api/src/main/java/org/apache/tez/dag/api/DAG.java
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/DAG.java
@@ -79,16 +79,16 @@ import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
 
 /**
- * Top level entity that defines the DAG (Directed Acyclic Graph) representing 
- * the data flow graph. Consists of a set of Vertices and Edges connecting the 
- * vertices. Vertices represent transformations of data and edges represent 
+ * Top level entity that defines the DAG (Directed Acyclic Graph) representing
+ * the data flow graph. Consists of a set of Vertices and Edges connecting the
+ * vertices. Vertices represent transformations of data and edges represent
  * movement of data between vertices.
  */
 @Public
 public class DAG {
-  
+
   private static final Logger LOG = LoggerFactory.getLogger(DAG.class);
-  
+
   final BidiMap<String, Vertex> vertices =
       new DualLinkedHashBidiMap<String, Vertex>();
   final Set<Edge> edges = Sets.newHashSet();
@@ -132,7 +132,7 @@ public class DAG {
     TezCommonUtils.addAdditionalLocalResources(localFiles, commonTaskLocalFiles, "DAG " + getName());
     return this;
   }
-  
+
   public synchronized DAG addVertex(Vertex vertex) {
     if (vertices.containsKey(vertex.getName())) {
       throw new IllegalStateException(
@@ -145,18 +145,18 @@ public class DAG {
   public synchronized Vertex getVertex(String vertexName) {
     return vertices.get(vertexName);
   }
-  
+
   /**
    * One of the methods that can be used to provide information about required
    * Credentials when running on a secure cluster. A combination of this and
    * addURIsForCredentials should be used to specify information about all
    * credentials required by a DAG. AM specific credentials are not used when
    * executing a DAG.
-   * 
+   *
    * Set credentials which will be required to run this dag. This method can be
    * used if the client has already obtained some or all of the required
    * credentials.
-   * 
+   *
    * @param credentials Credentials for the DAG
    * @return {@link DAG}
    */
@@ -196,7 +196,7 @@ public class DAG {
   }
 
   /**
-   * Create a group of vertices that share a common output. This can be used to implement 
+   * Create a group of vertices that share a common output. This can be used to implement
    * unions efficiently.
    * @param name Name of the group.
    * @param members {@link Vertex} members of the group
@@ -243,15 +243,15 @@ public class DAG {
    * setCredentials should be used to specify information about all credentials
    * required by a DAG. AM specific credentials are not used when executing a
    * DAG.
-   * 
+   *
    * This method can be used to specify a list of URIs for which Credentials
    * need to be obtained so that the job can run. An incremental list of URIs
    * can be provided by making multiple calls to the method.
-   * 
+   *
    * Currently, @{link credentials} can only be fetched for HDFS and other
    * {@link org.apache.hadoop.fs.FileSystem} implementations that support
    * credentials.
-   * 
+   *
    * @param uris
    *          a list of {@link URI}s
    * @return {@link DAG}
@@ -263,7 +263,7 @@ public class DAG {
   }
 
   /**
-   * 
+   *
    * @return an unmodifiable list representing the URIs for which credentials
    *         are required.
    */
@@ -271,7 +271,7 @@ public class DAG {
   public synchronized Collection<URI> getURIsForCredentials() {
     return Collections.unmodifiableCollection(urisForCredentials);
   }
-  
+
   @Private
   public synchronized Set<Vertex> getVertices() {
     return Collections.unmodifiableSet(this.vertices.values());
@@ -304,7 +304,7 @@ public class DAG {
     edges.add(edge);
     return this;
   }
-  
+
   /**
    * Add a {@link GroupInputEdge} to the DAG.
    * @param edge {@link GroupInputEdge}
@@ -328,7 +328,7 @@ public class DAG {
     VertexGroup av = edge.getInputVertexGroup();
     av.addOutputVertex(edge.getOutputVertex(), edge);
     groupInputEdges.add(edge);
-    
+
     // add new edge between members of VertexGroup and destVertex of the GroupInputEdge
     List<Edge> newEdges = Lists.newLinkedList();
     Vertex dstVertex = edge.getOutputVertex();
@@ -337,14 +337,14 @@ public class DAG {
       newEdges.add(Edge.create(member, dstVertex, edge.getEdgeProperty()));
     }
     dstVertex.addGroupInput(uv.getGroupName(), uv.getGroupInfo());
-    
+
     for (Edge e : newEdges) {
       addEdge(e);
     }
-    
+
     return this;
   }
-  
+
   /**
    * Get the DAG name
    * @return DAG name
@@ -433,7 +433,7 @@ public class DAG {
         newKnownTasksVertices.add(vertex);
       }
     }
-    
+
     // walk through all known source 1-1 edges and infer parallelism
     // add newly inferred vertices for consideration as known sources
     // the outer loop will run for every new level of inferring the parallelism
@@ -456,19 +456,19 @@ public class DAG {
         }
       }
     }
-    
+
     // check for inconsistency and errors
     for (Edge e : edges) {
       Vertex inputVertex = e.getInputVertex();
       Vertex outputVertex = e.getOutputVertex();
-      
+
       if (e.getEdgeProperty().getDataMovementType() == DataMovementType.ONE_TO_ONE) {
         if (inputVertex.getParallelism() != outputVertex.getParallelism()) {
           // both should be equal or equal to -1.
           if (outputVertex.getParallelism() != -1) {
             throw new TezUncheckedException(
                 "1-1 Edge. Destination vertex parallelism must match source vertex. "
-                + "Vertex: " + inputVertex.getName() + " does not match vertex: " 
+                + "Vertex: " + inputVertex.getName() + " does not match vertex: "
                 + outputVertex.getName());
           }
         }
@@ -527,7 +527,7 @@ public class DAG {
       }
     }
   }
-  
+
   // AnnotatedVertex is used by verify()
   private static class AnnotatedVertex {
     Vertex v;
@@ -573,7 +573,7 @@ public class DAG {
     if (vertices.isEmpty()) {
       throw new IllegalStateException("Invalid dag containing 0 vertices");
     }
-    
+
     // check for valid vertices, duplicate vertex names,
     // and prepare for cycle detection
     Map<String, AnnotatedVertex> vertexMap = new HashMap<String, AnnotatedVertex>();
@@ -591,14 +591,14 @@ public class DAG {
     for (Edge e : edges) {
       // Construct structure for cycle detection
       Vertex inputVertex = e.getInputVertex();
-      Vertex outputVertex = e.getOutputVertex();      
+      Vertex outputVertex = e.getOutputVertex();
       List<Edge> edgeList = edgeMap.get(inputVertex);
       if (edgeList == null) {
         edgeList = new ArrayList<Edge>();
         edgeMap.put(inputVertex, edgeList);
       }
       edgeList.add(e);
-      
+
       // Construct map for Input name verification
       Set<String> inboundSet = inboundVertexMap.get(outputVertex);
       if (inboundSet == null) {
@@ -606,7 +606,7 @@ public class DAG {
         inboundVertexMap.put(outputVertex, inboundSet);
       }
       inboundSet.add(inputVertex.getName());
-      
+
       // Construct map for Output name verification
       Set<String> outboundSet = outboundVertexMap.get(inputVertex);
       if (outboundSet == null) {
@@ -618,7 +618,7 @@ public class DAG {
 
     // check input and output names don't collide with vertex names
     for (Vertex vertex : vertices.values()) {
-      for (RootInputLeafOutput<InputDescriptor, InputInitializerDescriptor> 
+      for (RootInputLeafOutput<InputDescriptor, InputInitializerDescriptor>
            input : vertex.getInputs()) {
         if (vertexMap.containsKey(input.getName())) {
           throw new IllegalStateException("Vertex: "
@@ -627,7 +627,7 @@ public class DAG {
               + input.getName());
         }
       }
-      for (RootInputLeafOutput<OutputDescriptor, OutputCommitterDescriptor> 
+      for (RootInputLeafOutput<OutputDescriptor, OutputCommitterDescriptor>
             output : vertex.getOutputs()) {
         if (vertexMap.containsKey(output.getName())) {
           throw new IllegalStateException("Vertex: "
@@ -641,7 +641,7 @@ public class DAG {
     // Check for valid InputNames
     for (Entry<Vertex, Set<String>> entry : inboundVertexMap.entrySet()) {
       Vertex vertex = entry.getKey();
-      for (RootInputLeafOutput<InputDescriptor, InputInitializerDescriptor> 
+      for (RootInputLeafOutput<InputDescriptor, InputInitializerDescriptor>
            input : vertex.getInputs()) {
         if (entry.getValue().contains(input.getName())) {
           throw new IllegalStateException("Vertex: "
@@ -655,7 +655,7 @@ public class DAG {
     // Check for valid OutputNames
     for (Entry<Vertex, Set<String>> entry : outboundVertexMap.entrySet()) {
       Vertex vertex = entry.getKey();
-      for (RootInputLeafOutput<OutputDescriptor, OutputCommitterDescriptor> 
+      for (RootInputLeafOutput<OutputDescriptor, OutputCommitterDescriptor>
             output : vertex.getOutputs()) {
         if (entry.getValue().contains(output.getName())) {
           throw new IllegalStateException("Vertex: "
@@ -665,8 +665,8 @@ public class DAG {
         }
       }
     }
-    
-    
+
+
     // Not checking for repeated input names / output names vertex names on the same vertex,
     // since we only allow 1 at the moment.
     // When additional inputs are supported, this can be chceked easily (and early)
@@ -678,16 +678,12 @@ public class DAG {
 
     if (restricted) {
       for (Edge e : edges) {
-        if (e.getEdgeProperty().getDataSourceType() !=
-          DataSourceType.PERSISTED) {
+        DataSourceType dataSourceType = e.getEdgeProperty().getDataSourceType();
+        if (dataSourceType != DataSourceType.PERSISTED &&
+            dataSourceType != DataSourceType.EPHEMERAL) {
           throw new IllegalStateException(
             "Unsupported source type on edge. " + e);
         }
-        if (e.getEdgeProperty().getSchedulingType() !=
-          SchedulingType.SEQUENTIAL) {
-          throw new IllegalStateException(
-            "Unsupported scheduling type on edge. " + e);
-        }
       }
     }
 
@@ -878,13 +874,13 @@ public class DAG {
           groupBuilder.addGroupMembers(v.getName());
         }
         groupBuilder.addAllOutputs(groupInfo.outputs);
-        for (Map.Entry<String, InputDescriptor> entry : 
+        for (Map.Entry<String, InputDescriptor> entry :
              groupInfo.edgeMergedInputs.entrySet()) {
           groupBuilder.addEdgeMergedInputs(
               PlanGroupInputEdgeInfo.newBuilder().setDestVertexName(entry.getKey()).
               setMergedInput(DagTypeConverters.convertToDAGPlan(entry.getValue())));
         }
-        dagBuilder.addVertexGroups(groupBuilder); 
+        dagBuilder.addVertexGroups(groupBuilder);
       }
     }
 
@@ -956,7 +952,7 @@ public class DAG {
           dagCredentials.addAll(dataSink.getCredentials());
         }
       }
-      
+
       VertexPlan.Builder vertexBuilder = VertexPlan.newBuilder();
       vertexBuilder.setName(vertex.getName());
       vertexBuilder.setType(PlanVertexType.NORMAL); // vertex type is implicitly NORMAL until  TEZ-46.
@@ -1045,7 +1041,7 @@ public class DAG {
           }
         }
       }
-      
+
       if (vertex.getVertexManagerPlugin() != null) {
         vertexBuilder.setVertexManagerPlugin(DagTypeConverters
             .convertToDAGPlan(vertex.getVertexManagerPlugin()));

http://git-wip-us.apache.org/repos/asf/tez/blob/e6722a96/tez-api/src/main/java/org/apache/tez/dag/api/EdgeProperty.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/EdgeProperty.java b/tez-api/src/main/java/org/apache/tez/dag/api/EdgeProperty.java
index 07fb2c1..c203f8c 100644
--- a/tez-api/src/main/java/org/apache/tez/dag/api/EdgeProperty.java
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/EdgeProperty.java
@@ -43,7 +43,7 @@ public class EdgeProperty {
    */
   public enum DataMovementType {
     /**
-     * Output on this edge produced by the i-th source task is available to the 
+     * Output on this edge produced by the i-th source task is available to the
      * i-th destination task.
      */
     ONE_TO_ONE,
@@ -58,20 +58,20 @@ public class EdgeProperty {
      * are gathered by designated destination tasks.
      */
     SCATTER_GATHER,
-    
+
     /**
      * Custom routing defined by the user.
      */
     CUSTOM
   }
-  
+
   /**
    * Determines the lifetime of the data produced on this edge by a source task.
    */
   public enum DataSourceType {
     /**
      * Data produced by the source is persisted and available even when the
-     * task is not running. The data may become unavailable and may cause the 
+     * task is not running. The data may become unavailable and may cause the
      * source task to be re-executed.
      */
     PERSISTED,
@@ -82,31 +82,51 @@ public class EdgeProperty {
     PERSISTED_RELIABLE,
     /**
      * Data produced by the source task is available only while the source task
-     * is running. This requires the destination task to run concurrently with 
-     * the source task. This is not supported yet.
+     * is running. This requires the destination task to run concurrently with
+     * the source task. Development in progress.
      */
     @Unstable
     EPHEMERAL
   }
-  
+
   /**
-   * Determines when the destination task is eligible to run, once the source  
+   * Determines when the destination task is eligible to run, once the source
    * task is eligible to run.
    */
   public enum SchedulingType {
     /**
-     * Destination task is eligible to run after one or more of its source tasks 
+     * Destination task is eligible to run after one or more of its source tasks
      * have started or completed.
      */
     SEQUENTIAL,
     /**
      * Destination task must run concurrently with the source task.
-     *  This is not supported yet.
+     * Development in progress.
      */
     @Unstable
     CONCURRENT
   }
-  
+
+  /**
+   * Determines the relevant event(s) that will assist in scheduling downstream vertex
+   * connected via a edge with CONCURRENT {@link SchedulingType}.
+   */
+  public enum ConcurrentEdgeTriggerType {
+    /**
+     * trigger tasks scheduling for downstream vertex(es) upon upstream being configured
+     * this effectively simultaneously schedules downstream and upstream vertices
+     * connected on both ends of a concurrent edge.
+     */
+    SOURCE_VERTEX_CONFIGURED,
+
+    /**
+     * trigger tasks scheduling for downstream vertex(es) by "running" event(s) of upstream tasks
+     * this will be fully supported with TEZ-3999
+     */
+    SOURCE_TASK_STARTED
+  }
+
+
   final DataMovementType dataMovementType;
   final DataSourceType dataSourceType;
   final SchedulingType schedulingType;
@@ -172,7 +192,7 @@ public class EdgeProperty {
     Preconditions.checkArgument(dataMovementType != DataMovementType.CUSTOM,
         DataMovementType.CUSTOM + " cannot be used with this constructor");
   }
-  
+
 
   private EdgeProperty(EdgeManagerPluginDescriptor edgeManagerDescriptor,
                        DataSourceType dataSourceType,
@@ -182,7 +202,7 @@ public class EdgeProperty {
     this(edgeManagerDescriptor, DataMovementType.CUSTOM, dataSourceType, schedulingType,
         edgeSource, edgeDestination);
   }
-  
+
   private EdgeProperty(EdgeManagerPluginDescriptor edgeManagerDescriptor,
       DataMovementType dataMovementType, DataSourceType dataSourceType,
       SchedulingType schedulingType, OutputDescriptor edgeSource, InputDescriptor edgeDestination) {
@@ -193,7 +213,7 @@ public class EdgeProperty {
     this.inputDescriptor = edgeDestination;
     this.outputDescriptor = edgeSource;
   }
-  
+
   /**
    * Get the {@link DataMovementType}
    * @return {@link DataMovementType}
@@ -201,7 +221,7 @@ public class EdgeProperty {
   public DataMovementType getDataMovementType() {
     return dataMovementType;
   }
-  
+
   /**
    * Get the {@link DataSourceType}
    * @return {@link DataSourceType}
@@ -209,7 +229,7 @@ public class EdgeProperty {
   public DataSourceType getDataSourceType() {
     return dataSourceType;
   }
-  
+
   /**
    * Get the {@link SchedulingType}
    * @return {@link SchedulingType}
@@ -217,30 +237,30 @@ public class EdgeProperty {
   public SchedulingType getSchedulingType() {
     return schedulingType;
   }
-  
+
   /**
    * @return the {@link InputDescriptor} which will consume data from the edge.
    */
   public InputDescriptor getEdgeDestination() {
     return inputDescriptor;
   }
-  
+
   /**
    * @return the {@link OutputDescriptor} which produces data on the edge.
    */
   public OutputDescriptor getEdgeSource() {
     return outputDescriptor;
   }
-  
+
   /**
-   * Returns the Edge Manager specifications for this edge.  
+   * Returns the Edge Manager specifications for this edge.
    * @return @link {@link EdgeManagerPluginDescriptor} if a custom edge was setup, null otherwise.
    */
   @Private
   public EdgeManagerPluginDescriptor getEdgeManagerDescriptor() {
     return edgeManagerDescriptor;
   }
-  
+
   @Override
   public String toString() {
     return "{ " + dataMovementType + " : " + inputDescriptor.getClassName()
@@ -248,5 +268,5 @@ public class EdgeProperty {
         + " >> " + (edgeManagerDescriptor == null ? "NullEdgeManager" : edgeManagerDescriptor.getClassName())
         + " }";
   }
-  
+
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/e6722a96/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java b/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
index 43014a4..791e634 100644
--- a/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
@@ -28,6 +28,7 @@ import java.util.Set;
 
 import org.apache.tez.common.annotation.ConfigurationClass;
 import org.apache.tez.common.annotation.ConfigurationProperty;
+import org.apache.tez.dag.api.EdgeProperty.ConcurrentEdgeTriggerType;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.classification.InterfaceAudience.Private;
@@ -41,7 +42,7 @@ import com.google.common.annotations.VisibleForTesting;
 
 
 /**
- * Defines the configurations for Tez. These configurations are typically specified in 
+ * Defines the configurations for Tez. These configurations are typically specified in
  * tez-site.xml on the client machine where TezClient is used to launch the Tez application.
  * tez-site.xml is expected to be picked up from the classpath of the client process.
  * @see <a href="../../../../../configs/TezConfiguration.html">Detailed Configuration Information</a>
@@ -131,7 +132,7 @@ public class TezConfiguration extends Configuration {
   public static final String TEZ_TASK_PREFIX = TEZ_PREFIX + "task.";
 
   /**
-   * Boolean value. If true then Tez will try to automatically delete temporary job 
+   * Boolean value. If true then Tez will try to automatically delete temporary job
    * artifacts that it creates within the specified staging dir. Does not affect any user data.
    */
   @ConfigurationScope(Scope.AM)
@@ -183,7 +184,7 @@ public class TezConfiguration extends Configuration {
       + "use.concurrent-dispatcher";
   @Private
   public static boolean TEZ_AM_USE_CONCURRENT_DISPATCHER_DEFAULT = false;
-  
+
   @Private
   @ConfigurationScope(Scope.AM)
   public static final String TEZ_AM_CONCURRENT_DISPATCHER_CONCURRENCY = TEZ_AM_PREFIX
@@ -196,7 +197,7 @@ public class TezConfiguration extends Configuration {
    * code is written according to best practices then the same code can execute in either mode based
    * on this configuration. Session mode is more aggressive in reserving execution resources and is
    * typically used for interactive applications where multiple DAGs are submitted in quick succession
-   * by the same user. For long running applications, one-off executions, batch jobs etc non-session 
+   * by the same user. For long running applications, one-off executions, batch jobs etc non-session
    * mode is recommended. If session mode is enabled then container reuse is recommended.
    */
   @ConfigurationScope(Scope.AM)
@@ -271,12 +272,12 @@ public class TezConfiguration extends Configuration {
   public static final int TEZ_TASK_MAX_ALLOWED_OUTPUT_FAILURES_DEFAULT = 10;
 
   /**
-   * int value. Represents the maximum time in seconds for which a consumer attempt can report 
-   * a read error against its producer attempt, after which the producer attempt will be re-run 
-   * to re-generate the output. There are other heuristics which determine the retry and mainly 
-   * try to guard against a flurry of re-runs due to intermittent read errors 
+   * int value. Represents the maximum time in seconds for which a consumer attempt can report
+   * a read error against its producer attempt, after which the producer attempt will be re-run
+   * to re-generate the output. There are other heuristics which determine the retry and mainly
+   * try to guard against a flurry of re-runs due to intermittent read errors
    * (due to network issues). This configuration puts a time limit on those heuristics to ensure
-   * jobs dont hang indefinitely due to lack of closure in those heuristics 
+   * jobs dont hang indefinitely due to lack of closure in those heuristics
    *
    * Expert level setting.
    */
@@ -288,9 +289,9 @@ public class TezConfiguration extends Configuration {
 
   /**
    * Boolean value. Determines when the final outputs to data sinks are committed. Commit is an
-   * output specific operation and typically involves making the output visible for consumption. 
-   * If the config is true, then the outputs are committed at the end of DAG completion after all 
-   * constituent vertices have completed. If false, outputs for each vertex are committed after that 
+   * output specific operation and typically involves making the output visible for consumption.
+   * If the config is true, then the outputs are committed at the end of DAG completion after all
+   * constituent vertices have completed. If false, outputs for each vertex are committed after that
    * vertex succeeds. Depending on the desired output visibility and downstream consumer dependencies
    * this value must be appropriately chosen. Defaults to the safe choice of true.
    */
@@ -330,7 +331,7 @@ public class TezConfiguration extends Configuration {
   @ConfigurationScope(Scope.AM)
   @ConfigurationProperty
   public static final String TEZ_AM_LAUNCH_CMD_OPTS = TEZ_AM_PREFIX +  "launch.cmd-opts";
-  public static final String TEZ_AM_LAUNCH_CMD_OPTS_DEFAULT = 
+  public static final String TEZ_AM_LAUNCH_CMD_OPTS_DEFAULT =
       "-XX:+PrintGCDetails -verbose:gc -XX:+PrintGCTimeStamps -XX:+UseNUMA -XX:+UseParallelGC";
 
   /**
@@ -409,6 +410,19 @@ public class TezConfiguration extends Configuration {
   public static final String TEZ_AM_LAUNCH_ENV_DEFAULT = "";
 
   /**
+   * String value. In the presence of concurrent input edge to a vertex, this describes
+   * the timing of scheduling downstream vertex tasks. It may be closely related to the
+   * type of event that will contribute to a scheduling decision.
+   */
+  @ConfigurationScope(Scope.VERTEX)
+  @ConfigurationProperty
+  public static final String TEZ_CONCURRENT_EDGE_TRIGGER_TYPE =
+      TEZ_TASK_PREFIX + "concurrent.edge.trigger.type";
+  public static final String TEZ_CONCURRENT_EDGE_TRIGGER_TYPE_DEFAULT =
+      ConcurrentEdgeTriggerType.SOURCE_VERTEX_CONFIGURED.name();
+
+
+  /**
    * String value. Env settings will be merged with {@link #TEZ_TASK_LAUNCH_ENV}
    * during the launch of the task process. This property will typically be configured to
    * include default system env meant to be used by all jobs in a cluster. If required, the values can
@@ -508,16 +522,16 @@ public class TezConfiguration extends Configuration {
 
   @Unstable
   /**
-   * Boolean value. Enable speculative execution of slower tasks. This can help reduce job latency 
+   * Boolean value. Enable speculative execution of slower tasks. This can help reduce job latency
    * when some tasks are running slower due bad/slow machines
    */
   @ConfigurationScope(Scope.VERTEX)  // TODO Verify the vertex speculation, TEZ-1788
   @ConfigurationProperty(type="boolean")
   public static final String TEZ_AM_SPECULATION_ENABLED = TEZ_AM_PREFIX + "speculation.enabled";
   public static final boolean TEZ_AM_SPECULATION_ENABLED_DEFAULT = false;
-  
+
   /**
-   * Float value. Specifies how many standard deviations away from the mean task execution time 
+   * Float value. Specifies how many standard deviations away from the mean task execution time
    * should be considered as an outlier/slow task.
    */
   @Unstable
@@ -539,14 +553,14 @@ public class TezConfiguration extends Configuration {
 
   /**
    * Int value. Upper limit on the number of threads user to launch containers in the app
-   * master. Expert level setting. 
+   * master. Expert level setting.
    */
   @ConfigurationScope(Scope.AM)
   @ConfigurationProperty(type="integer")
   public static final String TEZ_AM_CONTAINERLAUNCHER_THREAD_COUNT_LIMIT =
     TEZ_AM_PREFIX + "containerlauncher.thread-count-limit";
 
-  public static final int TEZ_AM_CONTAINERLAUNCHER_THREAD_COUNT_LIMIT_DEFAULT = 
+  public static final int TEZ_AM_CONTAINERLAUNCHER_THREAD_COUNT_LIMIT_DEFAULT =
     500;
 
 
@@ -560,8 +574,8 @@ public class TezConfiguration extends Configuration {
   public static final int TEZ_AM_MAX_TASK_FAILURES_PER_NODE_DEFAULT = 10;
 
   /**
-   * Int value. Specifies the number of times the app master can be launched in order to recover 
-   * from app master failure. Typically app master failures are non-recoverable. This parameter 
+   * Int value. Specifies the number of times the app master can be launched in order to recover
+   * from app master failure. Typically app master failures are non-recoverable. This parameter
    * is for cases where the app master is not at fault but is lost due to system errors.
    * Expert level setting.
    */
@@ -582,7 +596,7 @@ public class TezConfiguration extends Configuration {
   public static final int TEZ_AM_VERTEX_MAX_TASK_CONCURRENCY_DEFAULT = -1;
 
   /**
-   * Int value. The maximum number of attempts that can fail for a particular task before the task is failed. 
+   * Int value. The maximum number of attempts that can fail for a particular task before the task is failed.
    * This does not count killed attempts. Task failure results in DAG failure.
    */
   @ConfigurationScope(Scope.VERTEX)
@@ -612,7 +626,7 @@ public class TezConfiguration extends Configuration {
   public static final boolean TEZ_AM_TASK_RESCHEDULE_RELAXED_LOCALITY_DEFAULT=true;
 
   /**
-   * Boolean value. Enabled blacklisting of nodes of nodes that are considered faulty. These nodes 
+   * Boolean value. Enabled blacklisting of nodes of nodes that are considered faulty. These nodes
    * will not be used to execute tasks.
    */
   @ConfigurationScope(Scope.AM)
@@ -620,11 +634,11 @@ public class TezConfiguration extends Configuration {
   public static final String TEZ_AM_NODE_BLACKLISTING_ENABLED = TEZ_AM_PREFIX
       + "node-blacklisting.enabled";
   public static final boolean TEZ_AM_NODE_BLACKLISTING_ENABLED_DEFAULT = true;
-  
+
   /**
    * Int value. Specifies the percentage of nodes in the cluster that may be considered faulty.
-   * This limits the number of nodes that are blacklisted in an effort to minimize the effects of 
-   * temporary surges in failures (e.g. due to network outages). 
+   * This limits the number of nodes that are blacklisted in an effort to minimize the effects of
+   * temporary surges in failures (e.g. due to network outages).
    */
   @ConfigurationScope(Scope.AM)
   @ConfigurationProperty(type="integer")
@@ -651,7 +665,7 @@ public class TezConfiguration extends Configuration {
   public static final String TEZ_AM_CLIENT_THREAD_COUNT =
       TEZ_AM_PREFIX + "client.am.thread-count";
   public static final int TEZ_AM_CLIENT_THREAD_COUNT_DEFAULT = 2;
-  
+
   /**
    * String value. Range of ports that the AM can use when binding for client connections. Leave blank
    * to use all possible ports. Expert level setting. It's hadoop standard range configuration.
@@ -721,7 +735,7 @@ public class TezConfiguration extends Configuration {
   public static final int TEZ_AM_DAG_CLEANUP_THREAD_COUNT_LIMIT_DEFAULT = 10;
 
   /** Int value. The amount of memory in MB to be used by tasks. This applies to all tasks across
-   * all vertices. Setting it to the same value for all tasks is helpful for container reuse and 
+   * all vertices. Setting it to the same value for all tasks is helpful for container reuse and
    * thus good for performance typically. */
   @ConfigurationScope(Scope.DAG)  // TODO vertex level
   @ConfigurationProperty(type="integer")
@@ -736,7 +750,7 @@ public class TezConfiguration extends Configuration {
   @ConfigurationProperty(type="integer")
   public static final String TEZ_TASK_RESOURCE_CPU_VCORES = TEZ_TASK_PREFIX
       + "resource.cpu.vcores";
-  public static final int TEZ_TASK_RESOURCE_CPU_VCORES_DEFAULT = 1; 
+  public static final int TEZ_TASK_RESOURCE_CPU_VCORES_DEFAULT = 1;
 
   /**
    * Int value. The maximum heartbeat interval between the AM and RM in milliseconds
@@ -751,7 +765,7 @@ public class TezConfiguration extends Configuration {
 
   /**
    * Int value. The maximum amount of time, in milliseconds, to wait before a task asks an
-   * AM for another task. Increasing this can help improve app master scalability for a large 
+   * AM for another task. Increasing this can help improve app master scalability for a large
    * number of concurrent tasks. Expert level setting.
    */
   @ConfigurationScope(Scope.AM)
@@ -761,7 +775,7 @@ public class TezConfiguration extends Configuration {
   public static final int TEZ_TASK_GET_TASK_SLEEP_INTERVAL_MS_MAX_DEFAULT = 200;
 
   /**
-   * Int value. The maximum heartbeat interval, in milliseconds, between the app master and tasks. 
+   * Int value. The maximum heartbeat interval, in milliseconds, between the app master and tasks.
    * Increasing this can help improve app master scalability for a large number of concurrent tasks.
    * Expert level setting.
    */
@@ -772,8 +786,8 @@ public class TezConfiguration extends Configuration {
   public static final int TEZ_TASK_AM_HEARTBEAT_INTERVAL_MS_DEFAULT = 100;
 
   /**
-   * Int value. Interval, in milliseconds, after which counters are sent to AM in heartbeat from 
-   * tasks. This reduces the amount of network traffice between AM and tasks to send high-volume 
+   * Int value. Interval, in milliseconds, after which counters are sent to AM in heartbeat from
+   * tasks. This reduces the amount of network traffice between AM and tasks to send high-volume
    * counters. Improves AM scalability. Expert level setting.
    */
   @ConfigurationScope(Scope.AM)
@@ -792,7 +806,7 @@ public class TezConfiguration extends Configuration {
   public static final String TEZ_TASK_MAX_EVENTS_PER_HEARTBEAT = TEZ_TASK_PREFIX
       + "max-events-per-heartbeat";
   public static final int TEZ_TASK_MAX_EVENTS_PER_HEARTBEAT_DEFAULT = 500;
-  
+
   /**
    * Int value. Maximum number of pending task events before a task will stop
    * asking for more events in the task heartbeat.
@@ -827,16 +841,16 @@ public class TezConfiguration extends Configuration {
   public static final boolean TEZ_TASK_INITIALIZE_PROCESSOR_IO_SERIALLY_DEFAULT = false;
 
   /**
-   * Long value. Interval, in milliseconds, within which any of the tasks Input/Processor/Output 
-   * components need to make successive progress notifications. If the progress is not notified 
+   * Long value. Interval, in milliseconds, within which any of the tasks Input/Processor/Output
+   * components need to make successive progress notifications. If the progress is not notified
    * for this interval then the task will be considered hung and terminated.
-   * The value for this config should be larger than {@link TezConfiguration#TASK_HEARTBEAT_TIMEOUT_MS} 
+   * The value for this config should be larger than {@link TezConfiguration#TASK_HEARTBEAT_TIMEOUT_MS}
    * and larger than 2 times the value of {@link TezConfiguration#TEZ_TASK_AM_HEARTBEAT_INTERVAL_MS}.
    * A config value <=0 disables this.
    */
   @ConfigurationScope(Scope.VERTEX)
   @ConfigurationProperty
-  public static final String TEZ_TASK_PROGRESS_STUCK_INTERVAL_MS = TEZ_TASK_PREFIX + 
+  public static final String TEZ_TASK_PROGRESS_STUCK_INTERVAL_MS = TEZ_TASK_PREFIX +
     "progress.stuck.interval-ms";
   public static final long TEZ_TASK_PROGRESS_STUCK_INTERVAL_MS_DEFAULT = -1;
 
@@ -1010,7 +1024,7 @@ public class TezConfiguration extends Configuration {
 
   /**
    * Boolean value. Whether to reuse containers for non-local tasks. Active only if reuse is
-   * enabled. Turning this on can severely affect locality and can be bad for jobs with high data 
+   * enabled. Turning this on can severely affect locality and can be bad for jobs with high data
    * volume being read from the primary data sources.
    */
   @ConfigurationScope(Scope.AM)
@@ -1047,15 +1061,15 @@ public class TezConfiguration extends Configuration {
     TEZ_AM_CONTAINER_REUSE_LOCALITY_DELAY_ALLOCATION_MILLIS_DEFAULT = 250l;
 
   /**
-   * Int value. The minimum amount of time to hold on to a container that is idle. Only active when 
-   * reuse is enabled. Set to -1 to never release idle containers (not recommended). 
+   * Int value. The minimum amount of time to hold on to a container that is idle. Only active when
+   * reuse is enabled. Set to -1 to never release idle containers (not recommended).
    */
   @ConfigurationScope(Scope.AM)
   @ConfigurationProperty(type="integer")
   public static final String TEZ_AM_CONTAINER_IDLE_RELEASE_TIMEOUT_MIN_MILLIS =
     TEZ_AM_PREFIX + "container.idle.release-timeout-min.millis";
   public static final long
-    TEZ_AM_CONTAINER_IDLE_RELEASE_TIMEOUT_MIN_MILLIS_DEFAULT = 5000l;  
+    TEZ_AM_CONTAINER_IDLE_RELEASE_TIMEOUT_MIN_MILLIS_DEFAULT = 5000l;
 
   /**
    * Int value. The maximum amount of time to hold on to a container if no task can be
@@ -1064,7 +1078,7 @@ public class TezConfiguration extends Configuration {
    * TezConfiguration#TEZ_AM_CONTAINER_IDLE_RELEASE_TIMEOUT_MIN_MILLIS.
    * Containers will have an expire time set to a random value between
    * TezConfiguration#TEZ_AM_CONTAINER_IDLE_RELEASE_TIMEOUT_MIN_MILLIS &&
-   * TezConfiguration#TEZ_AM_CONTAINER_IDLE_RELEASE_TIMEOUT_MAX_MILLIS. This 
+   * TezConfiguration#TEZ_AM_CONTAINER_IDLE_RELEASE_TIMEOUT_MAX_MILLIS. This
    * creates a graceful reduction in the amount of idle resources held
    */
   @ConfigurationScope(Scope.AM)
@@ -1073,9 +1087,9 @@ public class TezConfiguration extends Configuration {
       TEZ_AM_PREFIX + "container.idle.release-timeout-max.millis";
   public static final long
     TEZ_AM_CONTAINER_IDLE_RELEASE_TIMEOUT_MAX_MILLIS_DEFAULT = 10000l;
-  
+
   /**
-   * Int value. The minimum number of containers that will be held in session mode. Not active in 
+   * Int value. The minimum number of containers that will be held in session mode. Not active in
    * non-session mode. Enables an idle session (not running any DAG) to hold on to a minimum number
    * of containers to provide fast response times for the next DAG.
    */
@@ -1086,7 +1100,7 @@ public class TezConfiguration extends Configuration {
   public static final int TEZ_AM_SESSION_MIN_HELD_CONTAINERS_DEFAULT = 0;
 
   /**
-   * Boolean value. Allow/disable logging for all dags in a session   
+   * Boolean value. Allow/disable logging for all dags in a session
    */
   @Private
   @ConfigurationScope(Scope.AM)
@@ -1119,7 +1133,7 @@ public class TezConfiguration extends Configuration {
   public static final float TEZ_VERTEX_FAILURES_MAXPERCENT_DEFAULT = 0.0f;
   /**
    * Int value. The number of RM heartbeats to wait after preempting running tasks before preempting
-   * more running tasks. After preempting a task, we need to wait at least 1 heartbeat so that the 
+   * more running tasks. After preempting a task, we need to wait at least 1 heartbeat so that the
    * RM can act on the released resources and assign new ones to us. Expert level setting.
    */
   @ConfigurationScope(Scope.AM)
@@ -1130,8 +1144,8 @@ public class TezConfiguration extends Configuration {
 
   /**
    * Int value. Time (in millisecs) that an unsatisfied request will wait before preempting other
-   * resources. In rare cases, the cluster says there are enough free resources but does not end 
-   * up getting enough on a node to actually assign it to the job. This configuration tries to put 
+   * resources. In rare cases, the cluster says there are enough free resources but does not end
+   * up getting enough on a node to actually assign it to the job. This configuration tries to put
    * a deadline on such wait to prevent indefinite job hangs.
    */
   @ConfigurationScope(Scope.AM)
@@ -1169,7 +1183,7 @@ public class TezConfiguration extends Configuration {
    *
    * Specify additional user classpath information to be used for Tez AM and all containers.
    * This will be appended to the classpath after PWD
-   * 
+   *
    * 'tez.lib.uris.classpath' defines the relative classpath into the archives
    * that are set in 'tez.lib.uris'
    *
@@ -1195,7 +1209,7 @@ public class TezConfiguration extends Configuration {
   public static final String TEZ_AUX_URIS = TEZ_PREFIX + "aux.uris";
 
   /**
-   * Boolean value. Allows to ignore 'tez.lib.uris'. Useful during development as well as 
+   * Boolean value. Allows to ignore 'tez.lib.uris'. Useful during development as well as
    * raw Tez application where classpath is propagated with application
    * via {@link LocalResource}s. This is mainly useful for developer/debugger scenarios.
    */
@@ -1261,8 +1275,8 @@ public class TezConfiguration extends Configuration {
 
   /**
    * Int value. Time (in seconds) to wait for AM to come up when trying to submit a DAG
-   * from the client. Only relevant in session mode. If the cluster is busy and cannot launch the 
-   * AM then this timeout may be hit. In those case, using non-session mode is recommended if 
+   * from the client. Only relevant in session mode. If the cluster is busy and cannot launch the
+   * AM then this timeout may be hit. In those case, using non-session mode is recommended if
    * applicable. Otherwise increase the timeout (set to -1 for infinity. Not recommended)
    */
   @ConfigurationScope(Scope.AM)
@@ -1433,7 +1447,7 @@ public class TezConfiguration extends Configuration {
   public static final int TEZ_HISTORY_LOGGING_TIMELINE_NUM_DAGS_PER_GROUP_DEFAULT = 1;
 
   /**
-   * String value. The directory into which history data will be written. This defaults to the 
+   * String value. The directory into which history data will be written. This defaults to the
    * container logging directory. This is relevant only when SimpleHistoryLoggingService is being
    * used for {@link TezConfiguration#TEZ_HISTORY_LOGGING_SERVICE_CLASS}
    */
@@ -1542,7 +1556,7 @@ public class TezConfiguration extends Configuration {
       + "yarn.ats.acl.dag.domain.id";
 
   /**
-   * Boolean value. Enable recovery of DAGs. This allows a restarted app master to recover the 
+   * Boolean value. Enable recovery of DAGs. This allows a restarted app master to recover the
    * incomplete DAGs from the previous instance of the app master.
    */
   @ConfigurationScope(Scope.AM)
@@ -1641,10 +1655,10 @@ public class TezConfiguration extends Configuration {
   public static final boolean TEZ_AM_ACLS_ENABLED_DEFAULT = true;
 
   /**
-   * String value. 
+   * String value.
    * AM view ACLs. This allows the specified users/groups to view the status of the AM and all DAGs
    * that run within this AM.
-   * Comma separated list of users, followed by whitespace, followed by a comma separated list of 
+   * Comma separated list of users, followed by whitespace, followed by a comma separated list of
    * groups
    */
   @ConfigurationScope(Scope.AM)
@@ -1655,7 +1669,7 @@ public class TezConfiguration extends Configuration {
    * String value.
    * AM modify ACLs. This allows the specified users/groups to run modify operations on the AM
    * such as submitting DAGs, pre-warming the session, killing DAGs or shutting down the session.
-   * Comma separated list of users, followed by whitespace, followed by a comma separated list of 
+   * Comma separated list of users, followed by whitespace, followed by a comma separated list of
    * groups
    */
   @ConfigurationScope(Scope.AM)

http://git-wip-us.apache.org/repos/asf/tez/blob/e6722a96/tez-api/src/test/java/org/apache/tez/dag/api/TestDAGVerify.java
----------------------------------------------------------------------
diff --git a/tez-api/src/test/java/org/apache/tez/dag/api/TestDAGVerify.java b/tez-api/src/test/java/org/apache/tez/dag/api/TestDAGVerify.java
index e3c40aa..bde4622 100644
--- a/tez-api/src/test/java/org/apache/tez/dag/api/TestDAGVerify.java
+++ b/tez-api/src/test/java/org/apache/tez/dag/api/TestDAGVerify.java
@@ -120,7 +120,7 @@ public class TestDAGVerify {
     dag.addEdge(e1);
     dag.verify();
   }
-  
+
   @Test(timeout = 5000)
   // v1 (known) -> v2 (-1) -> v3 (-1)
   public void testVerifyOneToOneInferParallelism() {
@@ -153,7 +153,7 @@ public class TestDAGVerify {
     Assert.assertEquals(dummyTaskCount, v2.getParallelism());
     Assert.assertEquals(dummyTaskCount, v3.getParallelism());
   }
-  
+
   @Test(timeout = 5000)
   // v1 (known) -> v2 (-1) -> v3 (-1)
   // The test checks resiliency to ordering of the vertices/edges
@@ -187,7 +187,7 @@ public class TestDAGVerify {
     Assert.assertEquals(dummyTaskCount, v2.getParallelism());
     Assert.assertEquals(dummyTaskCount, v3.getParallelism());
   }
-  
+
   @Test(timeout = 5000)
   public void testVerifyOneToOneNoInferParallelism() {
     Vertex v1 = Vertex.create("v1",
@@ -211,7 +211,7 @@ public class TestDAGVerify {
     dag.verify();
     Assert.assertEquals(-1, v2.getParallelism());
   }
-  
+
   @Test(timeout = 5000)
   // v1 (-1) -> v2 (known) -> v3 (-1)
   public void testVerifyOneToOneIncorrectParallelism1() {
@@ -296,7 +296,7 @@ public class TestDAGVerify {
           "1-1 Edge. Destination vertex parallelism must match source vertex"));
     }
   }
-  
+
   @Test(timeout = 5000)
   public void testVerifyBroadcast() {
     Vertex v1 = Vertex.create("v1",
@@ -317,7 +317,7 @@ public class TestDAGVerify {
     dag.verify();
   }
 
-  @Test(expected = IllegalStateException.class, timeout = 5000)  
+  @Test(timeout = 5000)
   public void testVerify3() {
     Vertex v1 = Vertex.create("v1",
         ProcessorDescriptor.create(dummyProcessorClassName),
@@ -337,7 +337,7 @@ public class TestDAGVerify {
     dag.verify();
   }
 
-  @Test(expected = IllegalStateException.class, timeout = 5000)  
+  @Test(timeout = 5000)
   public void testVerify4() {
     Vertex v1 = Vertex.create("v1",
         ProcessorDescriptor.create(dummyProcessorClassName),
@@ -525,7 +525,7 @@ public class TestDAGVerify {
     System.out.println(ex.getMessage());
     Assert.assertTrue(ex.getMessage().startsWith("Vertex v1 already defined"));
   }
-  
+
   @Test(expected = IllegalStateException.class, timeout = 5000)
   public void testInputAndInputVertexNameCollision() {
     Vertex v1 = Vertex.create("v1",
@@ -534,22 +534,22 @@ public class TestDAGVerify {
     Vertex v2 = Vertex.create("v2",
         ProcessorDescriptor.create("MapProcessor"),
         dummyTaskCount, dummyTaskResource);
-    
+
     v2.addDataSource("v1", DataSourceDescriptor.create(null, null, null));
-    
+
     Edge e1 = Edge.create(v1, v2,
         EdgeProperty.create(DataMovementType.SCATTER_GATHER,
             DataSourceType.PERSISTED, SchedulingType.SEQUENTIAL,
             OutputDescriptor.create("dummy output class"),
             InputDescriptor.create("dummy input class")));
-    
+
     DAG dag = DAG.create("testDag");
     dag.addVertex(v1);
     dag.addVertex(v2);
     dag.addEdge(e1);
     dag.verify();
   }
-  
+
   @Test(expected = IllegalStateException.class, timeout = 5000)
   public void testOutputAndOutputVertexNameCollision() {
     Vertex v1 = Vertex.create("v1",
@@ -558,22 +558,22 @@ public class TestDAGVerify {
     Vertex v2 = Vertex.create("v2",
         ProcessorDescriptor.create("MapProcessor"),
         dummyTaskCount, dummyTaskResource);
-    
+
     v1.addDataSink("v2", DataSinkDescriptor.create(null, null, null));
-    
+
     Edge e1 = Edge.create(v1, v2,
         EdgeProperty.create(DataMovementType.SCATTER_GATHER,
             DataSourceType.PERSISTED, SchedulingType.SEQUENTIAL,
             OutputDescriptor.create("dummy output class"),
             InputDescriptor.create("dummy input class")));
-    
+
     DAG dag = DAG.create("testDag");
     dag.addVertex(v1);
     dag.addVertex(v2);
     dag.addEdge(e1);
     dag.verify();
   }
-  
+
   @Test(expected = IllegalStateException.class, timeout = 5000)
   public void testOutputAndVertexNameCollision() {
     Vertex v1 = Vertex.create("v1",
@@ -582,15 +582,15 @@ public class TestDAGVerify {
     Vertex v2 = Vertex.create("v2",
         ProcessorDescriptor.create("MapProcessor"),
         dummyTaskCount, dummyTaskResource);
-    
+
     v1.addDataSink("v2", DataSinkDescriptor.create(null, null, null));
-    
+
     DAG dag = DAG.create("testDag");
     dag.addVertex(v1);
     dag.addVertex(v2);
     dag.verify();
   }
-  
+
   @Test(expected = IllegalStateException.class, timeout = 5000)
   public void testInputAndVertexNameCollision() {
     Vertex v1 = Vertex.create("v1",
@@ -599,9 +599,9 @@ public class TestDAGVerify {
     Vertex v2 = Vertex.create("v2",
         ProcessorDescriptor.create("MapProcessor"),
         dummyTaskCount, dummyTaskResource);
-    
+
     v1.addDataSource("v2", DataSourceDescriptor.create(null, null, null));
-    
+
     DAG dag = DAG.create("testDag");
     dag.addVertex(v1);
     dag.addVertex(v2);
@@ -640,7 +640,7 @@ public class TestDAGVerify {
     dag.addEdge(e2);
     dag.verify();
   }
-  
+
   @Test(timeout = 5000)
   public void testVertexGroupWithMultipleOutputEdges() {
     Vertex v1 = Vertex.create("v1",
@@ -655,19 +655,19 @@ public class TestDAGVerify {
     Vertex v4 = Vertex.create("v4",
         ProcessorDescriptor.create("Processor"),
         dummyTaskCount, dummyTaskResource);
-    
+
     DAG dag = DAG.create("testDag");
     VertexGroup uv12 = dag.createVertexGroup("uv12", v1, v2);
     OutputDescriptor outDesc = new OutputDescriptor();
     uv12.addDataSink("uvOut", DataSinkDescriptor.create(outDesc, null, null));
-    
+
     GroupInputEdge e1 = GroupInputEdge.create(uv12, v3,
         EdgeProperty.create(DataMovementType.SCATTER_GATHER,
             DataSourceType.PERSISTED, SchedulingType.SEQUENTIAL,
             OutputDescriptor.create("dummy output class"),
             InputDescriptor.create("dummy input class")),
         InputDescriptor.create("dummy input class"));
-    
+
     GroupInputEdge e2 = GroupInputEdge.create(uv12, v4,
         EdgeProperty.create(DataMovementType.SCATTER_GATHER,
             DataSourceType.PERSISTED, SchedulingType.SEQUENTIAL,
@@ -685,7 +685,7 @@ public class TestDAGVerify {
     for (int i = 0; i< 10;++i){
       dag.verify();  // should be OK when called multiple times
     }
-    
+
     Assert.assertEquals(2, v1.getOutputVertices().size());
     Assert.assertEquals(2, v2.getOutputVertices().size());
     Assert.assertTrue(v1.getOutputVertices().contains(v3));
@@ -693,7 +693,7 @@ public class TestDAGVerify {
     Assert.assertTrue(v2.getOutputVertices().contains(v3));
     Assert.assertTrue(v2.getOutputVertices().contains(v4));
   }
-  
+
   @Test(timeout = 5000)
   public void testVertexGroup() {
     Vertex v1 = Vertex.create("v1",
@@ -711,16 +711,16 @@ public class TestDAGVerify {
     Vertex v5 = Vertex.create("v5",
         ProcessorDescriptor.create("Processor"),
         dummyTaskCount, dummyTaskResource);
-    
+
     DAG dag = DAG.create("testDag");
     String groupName1 = "uv12";
     VertexGroup uv12 = dag.createVertexGroup(groupName1, v1, v2);
     OutputDescriptor outDesc = new OutputDescriptor();
     uv12.addDataSink("uvOut", DataSinkDescriptor.create(outDesc, null, null));
-    
+
     String groupName2 = "uv23";
     VertexGroup uv23 = dag.createVertexGroup(groupName2, v2, v3);
-    
+
     GroupInputEdge e1 = GroupInputEdge.create(uv12, v4,
         EdgeProperty.create(DataMovementType.SCATTER_GATHER,
             DataSourceType.PERSISTED, SchedulingType.SEQUENTIAL,
@@ -733,7 +733,7 @@ public class TestDAGVerify {
             OutputDescriptor.create("dummy output class"),
             InputDescriptor.create("dummy input class")),
         InputDescriptor.create("dummy input class"));
-    
+
     dag.addVertex(v1);
     dag.addVertex(v2);
     dag.addVertex(v3);
@@ -744,7 +744,7 @@ public class TestDAGVerify {
     for (int i = 0; i< 10;++i){
       dag.verify(); // should be OK when called multiple times
     }
-    
+
     // for the first Group v1 and v2 should get connected to v4 and also have 1 output
     // for the second Group v2 and v3 should get connected to v5
     // the Group place holders should disappear
@@ -775,7 +775,7 @@ public class TestDAGVerify {
     Assert.assertTrue(v5.getGroupInputs().containsKey(groupName2));
     Assert.assertEquals(2, dag.vertexGroups.size());
   }
-  
+
   @Test(timeout = 5000)
   public void testVertexGroupOneToOne() {
     Vertex v1 = Vertex.create("v1",
@@ -793,16 +793,16 @@ public class TestDAGVerify {
     Vertex v5 = Vertex.create("v5",
         ProcessorDescriptor.create("Processor"),
         -1, dummyTaskResource);
-    
+
     DAG dag = DAG.create("testDag");
     String groupName1 = "uv12";
     VertexGroup uv12 = dag.createVertexGroup(groupName1, v1, v2);
     OutputDescriptor outDesc = new OutputDescriptor();
     uv12.addDataSink("uvOut", DataSinkDescriptor.create(outDesc, null, null));
-    
+
     String groupName2 = "uv23";
     VertexGroup uv23 = dag.createVertexGroup(groupName2, v2, v3);
-    
+
     GroupInputEdge e1 = GroupInputEdge.create(uv12, v4,
         EdgeProperty.create(DataMovementType.ONE_TO_ONE,
             DataSourceType.PERSISTED, SchedulingType.SEQUENTIAL,
@@ -815,7 +815,7 @@ public class TestDAGVerify {
             OutputDescriptor.create("dummy output class"),
             InputDescriptor.create("dummy input class")),
         InputDescriptor.create("dummy input class"));
-    
+
     dag.addVertex(v1);
     dag.addVertex(v2);
     dag.addVertex(v3);
@@ -826,7 +826,7 @@ public class TestDAGVerify {
     for (int i = 0; i< 10;++i){
       dag.verify();  // should be OK when called multiple times
     }
-    
+
     Assert.assertEquals(dummyTaskCount, v5.getParallelism());
   }
 
@@ -941,8 +941,8 @@ public class TestDAGVerify {
 
     dag.createDag(new TezConfiguration(), null, null, null, true);
   }
-  
-  
+
+
   @Test(timeout = 5000)
   public void testDAGCreateDataInference() {
     Vertex v1 = Vertex.create("v1", ProcessorDescriptor.create(dummyProcessorClassName));
@@ -954,7 +954,7 @@ public class TestDAGVerify {
     String lrName2 = "LR2";
     lrs2.put(lrName2, LocalResource.newInstance(URL.newInstance("file", "localhost", 0, "/test1"),
         LocalResourceType.FILE, LocalResourceVisibility.PUBLIC, 1, 1));
-    
+
     Set<String> hosts = Sets.newHashSet();
     hosts.add("h1");
     hosts.add("h2");
@@ -962,10 +962,10 @@ public class TestDAGVerify {
     taskLocationHints.add(TaskLocationHint.createTaskLocationHint(hosts, null));
     taskLocationHints.add(TaskLocationHint.createTaskLocationHint(hosts, null));
     VertexLocationHint vLoc = VertexLocationHint.create(taskLocationHints);
-    DataSourceDescriptor ds = DataSourceDescriptor.create(InputDescriptor.create("I.class"), 
+    DataSourceDescriptor ds = DataSourceDescriptor.create(InputDescriptor.create("I.class"),
         InputInitializerDescriptor.create(dummyInputInitClassName), dummyTaskCount, null, vLoc, lrs2);
     v1.addDataSource("i1", ds);
-        
+
     DAG dag = DAG.create("testDag");
     dag.addVertex(v1);
     dag.addTaskLocalFiles(lrs1);
@@ -1003,10 +1003,10 @@ public class TestDAGVerify {
       Assert.assertTrue(e.getMessage().contains("Duplicate Resources found with different size"));
     }
 
-    DataSourceDescriptor ds = DataSourceDescriptor.create(InputDescriptor.create("I.class"), 
+    DataSourceDescriptor ds = DataSourceDescriptor.create(InputDescriptor.create("I.class"),
         null, -1, null, null, lrs2);
     v1.addDataSource("i1", ds);
-    
+
     DAG dag = DAG.create("testDag");
     dag.addVertex(v1);
     dag.addTaskLocalFiles(lrs);
@@ -1024,7 +1024,7 @@ public class TestDAGVerify {
       Assert.assertTrue(e.getMessage().contains("Duplicate Resources found with different size"));
     }
   }
-  
+
   @Test(timeout = 5000)
   public void testDAGAccessControls() {
     DAG dag = DAG.create("testDag");

http://git-wip-us.apache.org/repos/asf/tez/blob/e6722a96/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
index 177ba56..2d2f23d 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
@@ -71,6 +71,7 @@ import org.apache.tez.common.TezUtils;
 import org.apache.tez.dag.api.NamedEntityDescriptor;
 import org.apache.tez.dag.api.SessionNotRunning;
 import org.apache.tez.dag.api.UserPayload;
+import org.apache.tez.dag.api.records.DAGProtos;
 import org.apache.tez.dag.api.records.DAGProtos.AMPluginDescriptorProto;
 import org.apache.tez.dag.api.records.DAGProtos.ConfigurationProto;
 import org.apache.tez.dag.api.records.DAGProtos.TezNamedEntityDescriptorProto;
@@ -298,7 +299,7 @@ public class DAGAppMaster extends AbstractService {
   private Path currentRecoveryDataDir;
   private Path tezSystemStagingDir;
   private FileSystem recoveryFS;
-  
+
   private ExecutorService rawExecutor;
   private ListeningExecutorService execService;
 
@@ -330,7 +331,7 @@ public class DAGAppMaster extends AbstractService {
   private String clientVersion;
   private boolean versionMismatch = false;
   private String versionMismatchDiagnostics;
-  
+
   private ResourceCalculatorProcessTree cpuPlugin;
   private GcTimeUpdater gcPlugin;
 
@@ -385,7 +386,7 @@ public class DAGAppMaster extends AbstractService {
     return PATH_JOINER.join(nodeHttpAddress, "node", "containerlogs",
         containerId, user);
   }
-  
+
   private void initResourceCalculatorPlugins() {
     Class<? extends ResourceCalculatorProcessTree> clazz = amConf.getClass(
         TezConfiguration.TEZ_TASK_RESOURCE_CALCULATOR_PROCESS_TREE_CLASS,
@@ -400,10 +401,10 @@ public class DAGAppMaster extends AbstractService {
       pid = processName.split("@")[0];
     }
     cpuPlugin = ResourceCalculatorProcessTree.getResourceCalculatorProcessTree(pid, clazz, amConf);
-    
+
     gcPlugin = new GcTimeUpdater(null);
   }
-  
+
   private long getAMCPUTime() {
     if (cpuPlugin != null) {
       cpuPlugin.updateProcessTree();
@@ -557,14 +558,14 @@ public class DAGAppMaster extends AbstractService {
       dispatcher.register(TaskEventType.class, new TaskEventDispatcher());
       dispatcher.register(TaskAttemptEventType.class, new TaskAttemptEventDispatcher());
     } else {
-      int concurrency = conf.getInt(TezConfiguration.TEZ_AM_CONCURRENT_DISPATCHER_CONCURRENCY, 
+      int concurrency = conf.getInt(TezConfiguration.TEZ_AM_CONCURRENT_DISPATCHER_CONCURRENCY,
           TezConfiguration.TEZ_AM_CONCURRENT_DISPATCHER_CONCURRENCY_DEFAULT);
       AsyncDispatcherConcurrent sharedDispatcher = dispatcher.registerAndCreateDispatcher(
           TaskEventType.class, new TaskEventDispatcher(), "TaskAndAttemptEventThread", concurrency);
       dispatcher.registerWithExistingDispatcher(TaskAttemptEventType.class,
           new TaskAttemptEventDispatcher(), sharedDispatcher);
     }
-    
+
     // register other delegating dispatchers
     dispatcher.registerAndCreateDispatcher(SpeculatorEventType.class, new SpeculatorEventHandler(),
         "Speculator");
@@ -661,7 +662,7 @@ public class DAGAppMaster extends AbstractService {
   protected ContainerSignatureMatcher createContainerSignatureMatcher() {
     return new ContainerContextMatcher();
   }
-  
+
   @VisibleForTesting
   protected AsyncDispatcher createDispatcher() {
     return new AsyncDispatcher("Central");
@@ -680,7 +681,7 @@ public class DAGAppMaster extends AbstractService {
       System.exit(0);
     }
   }
-  
+
   @VisibleForTesting
   protected TaskSchedulerManager getTaskSchedulerManager() {
     return taskSchedulerManager;
@@ -1342,7 +1343,7 @@ public class DAGAppMaster extends AbstractService {
     }
     dispatcher.getEventHandler().handle(new DAGEventTerminateDag(dag.getID(), DAGTerminationCause.DAG_KILL, message));
   }
-  
+
   private Map<String, LocalResource> getAdditionalLocalResourceDiff(
       DAG dag, Map<String, LocalResource> additionalResources) throws TezException {
     if (additionalResources == null) {
@@ -1494,7 +1495,7 @@ public class DAGAppMaster extends AbstractService {
     public DAG getCurrentDAG() {
       return dag;
     }
-    
+
     @Override
     public ListeningExecutorService getExecService() {
       return execService;
@@ -1695,7 +1696,7 @@ public class DAGAppMaster extends AbstractService {
     public long getCumulativeCPUTime() {
       return getAMCPUTime();
     }
-    
+
     @Override
     public long getCumulativeGCTime() {
       return getAMGCTime();
@@ -1928,7 +1929,7 @@ public class DAGAppMaster extends AbstractService {
     }
     return null;
   }
-  
+
   @Override
   public synchronized void serviceStart() throws Exception {
 
@@ -1971,8 +1972,19 @@ public class DAGAppMaster extends AbstractService {
       return;
     }
 
+    DAGPlan dagPlan = null;
     if (!isSession) {
       LOG.info("In Non-Session mode.");
+      dagPlan = readDAGPlanFile();
+      if (hasConcurrentEdge(dagPlan)) {
+        // Currently a DAG with concurrent edge is deemed unrecoverable
+        // (run from scratch) on AM failover. Proper AM failover for DAG with
+        // concurrent edge is pending TEZ-4017
+        if (recoveredDAGData != null) {
+          LOG.warn("Ignoring recoveredDAGData for a recovered DAG with concurrent edge.");
+          recoveredDAGData = null;
+        }
+      }
     } else {
       LOG.info("In Session mode. Waiting for DAG over RPC");
       this.state = DAGAppMasterState.IDLE;
@@ -2053,7 +2065,8 @@ public class DAGAppMaster extends AbstractService {
       if (!isSession) {
         // No dag recovered - in non-session, just restart the original DAG
         dagCounter.set(0);
-        startDAG();
+        assert(dagPlan != null);
+        startDAG(dagPlan, null);
       }
     }
 
@@ -2181,7 +2194,7 @@ public class DAGAppMaster extends AbstractService {
     @Override
     public void handle(TaskEvent event) {
       DAG dag = context.getCurrentDAG();
-      int eventDagIndex = 
+      int eventDagIndex =
           event.getTaskID().getVertexID().getDAGId().getId();
       if (dag == null || eventDagIndex != dag.getID().getId()) {
         return; // event not relevant any more
@@ -2192,7 +2205,7 @@ public class DAGAppMaster extends AbstractService {
       ((EventHandler<TaskEvent>)task).handle(event);
     }
   }
-  
+
   private class SpeculatorEventHandler implements EventHandler<SpeculatorEvent> {
     @Override
     public void handle(SpeculatorEvent event) {
@@ -2211,7 +2224,7 @@ public class DAGAppMaster extends AbstractService {
     @Override
     public void handle(TaskAttemptEvent event) {
       DAG dag = context.getCurrentDAG();
-      int eventDagIndex = 
+      int eventDagIndex =
           event.getTaskAttemptID().getTaskID().getVertexID().getDAGId().getId();
       if (dag == null || eventDagIndex != dag.getID().getId()) {
         return; // event not relevant any more
@@ -2230,12 +2243,12 @@ public class DAGAppMaster extends AbstractService {
     @Override
     public void handle(VertexEvent event) {
       DAG dag = context.getCurrentDAG();
-      int eventDagIndex = 
+      int eventDagIndex =
           event.getVertexId().getDAGId().getId();
       if (dag == null || eventDagIndex != dag.getID().getId()) {
         return; // event not relevant any more
       }
-      
+
       Vertex vertex =
           dag.getVertex(event.getVertexId());
       ((EventHandler<VertexEvent>) vertex).handle(event);
@@ -2440,23 +2453,30 @@ public class DAGAppMaster extends AbstractService {
     }
   }
 
-  private void startDAG() throws IOException, TezException {
+  private boolean hasConcurrentEdge(DAGPlan dagPlan) {
+    boolean hasConcurrentEdge = false;
+    for (DAGProtos.EdgePlan edge : dagPlan.getEdgeList()) {
+      if (DAGProtos.PlanEdgeSchedulingType.CONCURRENT.equals(edge.getSchedulingType())) {
+        return true;
+      }
+    }
+    return hasConcurrentEdge;
+  }
+
+  private DAGPlan readDAGPlanFile() throws IOException, TezException {
     FileInputStream dagPBBinaryStream = null;
+    DAGPlan dagPlan = null;
     try {
-      DAGPlan dagPlan = null;
-
       // Read the protobuf DAG
       dagPBBinaryStream = new FileInputStream(new File(workingDirectory,
           TezConstants.TEZ_PB_PLAN_BINARY_NAME));
       dagPlan = DAGPlan.parseFrom(dagPBBinaryStream);
-
-      startDAG(dagPlan, null);
-
     } finally {
       if (dagPBBinaryStream != null) {
         dagPBBinaryStream.close();
       }
     }
+    return dagPlan;
   }
 
   private void startDAG(DAGPlan dagPlan, Map<String, LocalResource> additionalAMResources)

http://git-wip-us.apache.org/repos/asf/tez/blob/e6722a96/tez-runtime-library/src/main/java/org/apache/tez/dag/library/edgemanager/SilentEdgeManager.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/dag/library/edgemanager/SilentEdgeManager.java b/tez-runtime-library/src/main/java/org/apache/tez/dag/library/edgemanager/SilentEdgeManager.java
new file mode 100644
index 0000000..db6bb5a
--- /dev/null
+++ b/tez-runtime-library/src/main/java/org/apache/tez/dag/library/edgemanager/SilentEdgeManager.java
@@ -0,0 +1,89 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.dag.library.edgemanager;
+
+import org.apache.tez.dag.api.EdgeManagerPlugin;
+import org.apache.tez.dag.api.EdgeManagerPluginContext;
+import org.apache.tez.runtime.api.events.DataMovementEvent;
+import org.apache.tez.runtime.api.events.InputReadErrorEvent;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * A dummy edge manager used in scenarios where application will depend on
+ * the direct connection between containers/tasks to handle all data communications,
+ * including both routing and actual data transfers.
+ */
+
+public class SilentEdgeManager extends EdgeManagerPlugin {
+
+  /**
+   * Create an instance of the EdgeManagerPlugin. Classes extending this to
+   * create a EdgeManagerPlugin, must provide the same constructor so that Tez
+   * can create an instance of the class at runtime.
+   *
+   * @param context the context within which this EdgeManagerPlugin will run. Includes
+   *                information like configuration which the user may have specified
+   *                while setting up the edge.
+   */
+  public SilentEdgeManager(EdgeManagerPluginContext context) {
+    super(context);
+  }
+
+  @Override
+  public void initialize() throws Exception {
+
+  }
+
+  @Override
+  public int getNumDestinationTaskPhysicalInputs(int destinationTaskIndex) throws Exception {
+    return 0;
+  }
+
+  @Override
+  public int getNumSourceTaskPhysicalOutputs(int sourceTaskIndex) throws Exception {
+    return 0;
+  }
+
+  @Override
+  public void routeDataMovementEventToDestination(
+      DataMovementEvent event, int sourceTaskIndex, int sourceOutputIndex,
+      Map<Integer, List<Integer>> destinationTaskAndInputIndices) throws Exception {
+    throw new UnsupportedOperationException(
+        "routeDataMovementEventToDestination not supported for SilentEdgeManager");
+  }
+
+  @Override
+  public void routeInputSourceTaskFailedEventToDestination(
+      int sourceTaskIndex, Map<Integer, List<Integer>> destinationTaskAndInputIndices) throws Exception {
+    throw new UnsupportedOperationException(
+        "routeInputSourceTaskFailedEventToDestination not supported for SilentEdgeManager");
+  }
+
+  @Override
+  public int getNumDestinationConsumerTasks(int sourceTaskIndex) throws Exception {
+    return 0;
+  }
+
+  @Override
+  public int routeInputErrorEventToSource(InputReadErrorEvent event, int destinationTaskIndex, int destinationFailedInputIndex) throws Exception {
+    return 0;
+  }
+}

http://git-wip-us.apache.org/repos/asf/tez/blob/e6722a96/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/VertexManagerWithConcurrentInput.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/VertexManagerWithConcurrentInput.java b/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/VertexManagerWithConcurrentInput.java
new file mode 100644
index 0000000..caf5acd
--- /dev/null
+++ b/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/VertexManagerWithConcurrentInput.java
@@ -0,0 +1,245 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.dag.library.vertexmanager;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.tez.common.TezUtils;
+import org.apache.tez.dag.api.EdgeProperty;
+import org.apache.tez.dag.api.EdgeProperty.ConcurrentEdgeTriggerType;
+import org.apache.tez.dag.api.InputDescriptor;
+import org.apache.tez.dag.api.TezConfiguration;
+import org.apache.tez.dag.api.TezUncheckedException;
+import org.apache.tez.dag.api.UserPayload;
+import org.apache.tez.dag.api.VertexManagerPlugin;
+import org.apache.tez.dag.api.VertexManagerPluginContext;
+import org.apache.tez.dag.api.VertexManagerPluginDescriptor;
+import org.apache.tez.dag.api.event.VertexState;
+import org.apache.tez.dag.api.event.VertexStateUpdate;
+import org.apache.tez.runtime.api.Event;
+import org.apache.tez.runtime.api.TaskAttemptIdentifier;
+import org.apache.tez.runtime.api.events.VertexManagerEvent;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+import java.io.IOException;
+import java.util.EnumSet;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import static org.apache.tez.dag.api.EdgeProperty.SchedulingType.CONCURRENT;
+
+public class VertexManagerWithConcurrentInput extends VertexManagerPlugin {
+
+  private static final Logger LOG = LoggerFactory.getLogger(VertexManagerWithConcurrentInput.class);
+
+  private final Map<String, Boolean> srcVerticesConfigured = Maps.newConcurrentMap();
+  private int managedTasks;
+  private AtomicBoolean tasksScheduled = new AtomicBoolean(false);
+  private AtomicBoolean onVertexStartedDone = new AtomicBoolean(false);
+  private Configuration vertexConfig;
+  private String vertexName;
+  private ConcurrentEdgeTriggerType edgeTriggerType;
+  private volatile boolean allSrcVerticesConfigured;
+
+  int completedUpstreamTasks;
+
+  public VertexManagerWithConcurrentInput(VertexManagerPluginContext context) {
+    super(context);
+  }
+
+  @Override
+  public void initialize() {
+    UserPayload userPayload = getContext().getUserPayload();
+    if (userPayload == null || userPayload.getPayload() == null ||
+        userPayload.getPayload().limit() == 0) {
+      throw new TezUncheckedException("Could not initialize VertexManagerWithConcurrentInput"
+          + " from provided user payload");
+    }
+    managedTasks = getContext().getVertexNumTasks(getContext().getVertexName());
+    Map<String, EdgeProperty> edges = getContext().getInputVertexEdgeProperties();
+    for (Map.Entry<String, EdgeProperty> entry : edges.entrySet()) {
+      if (!CONCURRENT.equals(entry.getValue().getSchedulingType())) {
+        throw new TezUncheckedException("All input edges to vertex " + vertexName +
+            "  must be CONCURRENT.");
+      }
+      String srcVertex = entry.getKey();
+      srcVerticesConfigured.put(srcVertex, false);
+      getContext().registerForVertexStateUpdates(srcVertex, EnumSet.of(VertexState.CONFIGURED));
+    }
+
+    try {
+      vertexConfig = TezUtils.createConfFromUserPayload(getContext().getUserPayload());
+    } catch (IOException e) {
+      throw new TezUncheckedException(e);
+    }
+    edgeTriggerType = ConcurrentEdgeTriggerType.valueOf(
+        vertexConfig.get(TezConfiguration.TEZ_CONCURRENT_EDGE_TRIGGER_TYPE,
+            TezConfiguration.TEZ_CONCURRENT_EDGE_TRIGGER_TYPE_DEFAULT));
+    if (!ConcurrentEdgeTriggerType.SOURCE_VERTEX_CONFIGURED.equals(edgeTriggerType)) {
+      // pending TEZ-3999
+      throw new TezUncheckedException("Only support SOURCE_VERTEX_CONFIGURED triggering type for now.");
+    }
+    LOG.info("VertexManagerWithConcurrentInput initialized with edgeTriggerType {}.", edgeTriggerType);
+
+    vertexName = getContext().getVertexName();
+    completedUpstreamTasks = 0;
+  }
+
+  @Override
+  public synchronized void onVertexStarted(List<TaskAttemptIdentifier> completions) {
+    onVertexStartedDone.set(true);
+    scheduleTasks();
+  }
+
+  @Override
+  public synchronized void onVertexStateUpdated(VertexStateUpdate stateUpdate) {
+    VertexState state = stateUpdate.getVertexState();
+    String fromVertex = stateUpdate.getVertexName();
+    if (!srcVerticesConfigured.containsKey(fromVertex)) {
+      throw new IllegalArgumentException("Not expecting state update from vertex:" +
+          fromVertex + " in vertex: " + this.vertexName);
+    }
+
+    if (!VertexState.CONFIGURED.equals(state)) {
+      throw new IllegalArgumentException("Received incorrect state notification : " +
+          state + " from vertex: " + fromVertex + " in vertex: " + this.vertexName);
+    }
+
+    LOG.info("Received configured notification: " + state + " for vertex: "
+        + fromVertex + " in vertex: " + this.vertexName);
+    srcVerticesConfigured.put(fromVertex, true);
+
+    // check for source vertices completely configured
+    boolean checkAllSrcVerticesConfigured = true;
+    for (Map.Entry<String, Boolean> entry : srcVerticesConfigured.entrySet()) {
+      if (!entry.getValue()) {
+        // vertex not configured
+        LOG.info("Waiting for vertex {} in vertex {} ", entry.getKey(), this.vertexName);
+        checkAllSrcVerticesConfigured = false;
+        break;
+      }
+    }
+    allSrcVerticesConfigured = checkAllSrcVerticesConfigured;
+
+    scheduleTasks();
+  }
+
+  @Override
+  public synchronized void onSourceTaskCompleted(TaskAttemptIdentifier attempt) {
+    completedUpstreamTasks ++;
+    LOG.info("Source task attempt {} completion received at vertex {}", attempt, this.vertexName);
+  }
+
+  @Override
+  public void onVertexManagerEventReceived(VertexManagerEvent vmEvent) {
+  }
+
+  @Override
+  public void onRootVertexInitialized(String inputName,
+                                      InputDescriptor inputDescriptor, List<Event> events) {
+  }
+
+  private void scheduleTasks() {
+    if (!onVertexStartedDone.get()) {
+      // vertex not started yet
+      return;
+    }
+    if (tasksScheduled.get()) {
+      // already scheduled
+      return;
+    }
+
+    if (!canScheduleTasks()) {
+      return;
+    }
+
+    tasksScheduled.compareAndSet(false, true);
+    List<VertexManagerPluginContext.ScheduleTaskRequest> tasksToStart = Lists.newArrayListWithCapacity(managedTasks);
+    for (int i = 0; i < managedTasks; ++i) {
+      tasksToStart.add(VertexManagerPluginContext.ScheduleTaskRequest.create(i, null));
+    }
+
+    if (!tasksToStart.isEmpty()) {
+      LOG.info("Starting {} tasks in {}.", tasksToStart.size(), this.vertexName);
+      getContext().scheduleTasks(tasksToStart);
+    }
+    // all tasks scheduled. Can call vertexManagerDone().
+  }
+
+  private boolean canScheduleTasks() {
+    if (edgeTriggerType.equals(ConcurrentEdgeTriggerType.SOURCE_VERTEX_CONFIGURED)) {
+      return allSrcVerticesConfigured;
+    } else {
+      // pending TEZ-3999
+      throw new TezUncheckedException("Only support SOURCE_VERTEX_CONFIGURED triggering type for now.");
+    }
+  }
+
+
+  /**
+   * Create a {@link VertexManagerPluginDescriptor} builder that can be used to
+   * configure the plugin.
+   *
+   * @param conf
+   *          {@link Configuration} May be modified in place. May be null if the
+   *          configuration parameters are to be set only via code. If
+   *          configuration values may be changed at runtime via a config file
+   *          then pass in a {@link Configuration} that is initialized from a
+   *          config file. The parameters that are not overridden in code will
+   *          be derived from the Configuration object.
+   * @return {@link ConcurrentInputVertexManagerConfigBuilder}
+   */
+  public static ConcurrentInputVertexManagerConfigBuilder createConfigBuilder(
+      @Nullable Configuration conf) {
+    return new ConcurrentInputVertexManagerConfigBuilder(conf);
+  }
+
+  /**
+   * Helper class to configure VertexManagerWithConcurrentInput
+   */
+  public static final class ConcurrentInputVertexManagerConfigBuilder {
+    private final Configuration conf;
+
+    private ConcurrentInputVertexManagerConfigBuilder(@Nullable Configuration conf) {
+      if (conf == null) {
+        this.conf = new Configuration(false);
+      } else {
+        this.conf = conf;
+      }
+    }
+
+    public VertexManagerPluginDescriptor build() {
+      VertexManagerPluginDescriptor desc =
+          VertexManagerPluginDescriptor.create(
+              VertexManagerWithConcurrentInput.class.getName());
+
+      try {
+        return desc.setUserPayload(TezUtils
+            .createUserPayloadFromConf(this.conf));
+      } catch (IOException e) {
+        throw new TezUncheckedException(e);
+      }
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/tez/blob/e6722a96/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestVertexManagerWithConcurrentInput.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestVertexManagerWithConcurrentInput.java b/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestVertexManagerWithConcurrentInput.java
new file mode 100644
index 0000000..619a4cd
--- /dev/null
+++ b/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestVertexManagerWithConcurrentInput.java
@@ -0,0 +1,114 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.dag.library.vertexmanager;
+
+import org.apache.tez.dag.api.EdgeManagerPluginDescriptor;
+import org.apache.tez.dag.api.EdgeProperty;
+import org.apache.tez.dag.api.InputDescriptor;
+import org.apache.tez.dag.api.OutputDescriptor;
+import org.apache.tez.dag.api.VertexManagerPluginContext;
+import org.apache.tez.dag.api.VertexManagerPluginDescriptor;
+import org.apache.tez.dag.api.event.VertexState;
+import org.apache.tez.dag.api.event.VertexStateUpdate;
+import org.apache.tez.dag.library.edgemanager.SilentEdgeManager;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Captor;
+import org.mockito.MockitoAnnotations;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+public class TestVertexManagerWithConcurrentInput {
+
+  @Captor
+  ArgumentCaptor<List<VertexManagerPluginContext.ScheduleTaskRequest>> requestCaptor;
+
+  @Before
+  public void init() {
+    MockitoAnnotations.initMocks(this);
+  }
+
+  @Test(timeout = 5000)
+  public void testBasicVertexWithConcurrentInput() throws Exception {
+    HashMap<String, EdgeProperty> mockInputVertices =
+        new HashMap<String, EdgeProperty>();
+    String mockSrcVertexId1 = "Vertex1";
+    int srcVertex1Parallelism = 2;
+    EdgeProperty eProp1 = EdgeProperty.create(
+        EdgeManagerPluginDescriptor.create(SilentEdgeManager.class.getName()),
+        EdgeProperty.DataSourceType.EPHEMERAL,
+        EdgeProperty.SchedulingType.CONCURRENT,
+        OutputDescriptor.create("out"),
+        InputDescriptor.create("in"));
+
+    String mockSrcVertexId2 = "Vertex2";
+    int srcVertex2Parallelism = 3;
+    EdgeProperty eProp2 = EdgeProperty.create(
+        EdgeManagerPluginDescriptor.create(SilentEdgeManager.class.getName()),
+        EdgeProperty.DataSourceType.EPHEMERAL,
+        EdgeProperty.SchedulingType.CONCURRENT,
+        OutputDescriptor.create("out"),
+        InputDescriptor.create("in"));
+
+    String mockManagedVertexId = "Vertex";
+    int vertexParallelism = 2;
+
+    VertexManagerWithConcurrentInput.ConcurrentInputVertexManagerConfigBuilder configurer =
+        VertexManagerWithConcurrentInput.createConfigBuilder(null);
+    VertexManagerPluginDescriptor pluginDesc = configurer.build();
+
+    VertexManagerPluginContext mockContext = mock(VertexManagerPluginContext.class);
+    when(mockContext.getUserPayload()).thenReturn(pluginDesc.getUserPayload());
+    when(mockContext.getInputVertexEdgeProperties()).thenReturn(mockInputVertices);
+    when(mockContext.getVertexName()).thenReturn(mockManagedVertexId);
+    when(mockContext.getVertexNumTasks(mockManagedVertexId)).thenReturn(vertexParallelism);
+    when(mockContext.getVertexNumTasks(mockSrcVertexId1)).thenReturn(srcVertex1Parallelism);
+    when(mockContext.getVertexNumTasks(mockSrcVertexId2)).thenReturn(srcVertex2Parallelism);
+    mockInputVertices.put(mockSrcVertexId1, eProp1);
+    mockInputVertices.put(mockSrcVertexId2, eProp2);
+
+    VertexManagerWithConcurrentInput manager = new VertexManagerWithConcurrentInput(mockContext);
+    when(mockContext.getUserPayload()).thenReturn(pluginDesc.getUserPayload());
+    manager.initialize();
+    when(mockContext.getUserPayload()).thenReturn(pluginDesc.getUserPayload());
+
+    // source vertex 1 configured
+    manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId1, VertexState.CONFIGURED));
+    verify(mockContext, times(0)).scheduleTasks(requestCaptor.capture());
+
+    // source vertex 2 configured
+    manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId2, VertexState.CONFIGURED));
+    verify(mockContext, times(0)).scheduleTasks(requestCaptor.capture());
+
+    // then own vertex started
+    manager.onVertexStarted(Collections.singletonList(
+        TestShuffleVertexManager.createTaskAttemptIdentifier(mockSrcVertexId1, 0)));
+    verify(mockContext, times(1)).scheduleTasks(requestCaptor.capture());
+    Assert.assertEquals(0, manager.completedUpstreamTasks);
+  }
+}