You are viewing a plain-text version of this content. The canonical link to the original message was a hyperlink on the archive page and is not preserved in this copy.
Posted to commits@tez.apache.org by bi...@apache.org on 2013/11/12 06:03:38 UTC

git commit: TEZ-608. Fix 1-1 edge connection when parallelism is determined at runtime (bikas)

Updated Branches:
  refs/heads/master 18540ca92 -> 8aac5ba45


TEZ-608. Fix 1-1 edge connection when parallelism is determined at runtime (bikas)


Project: http://git-wip-us.apache.org/repos/asf/incubator-tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tez/commit/8aac5ba4
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tez/tree/8aac5ba4
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tez/diff/8aac5ba4

Branch: refs/heads/master
Commit: 8aac5ba45c6f1a786659410bbf217328f777f719
Parents: 18540ca
Author: Bikas Saha <bi...@apache.org>
Authored: Mon Nov 11 21:00:20 2013 -0800
Committer: Bikas Saha <bi...@apache.org>
Committed: Mon Nov 11 21:00:20 2013 -0800

----------------------------------------------------------------------
 .../event/VertexEventOneToOneSourceSplit.java   |  50 +++
 .../tez/dag/app/dag/event/VertexEventType.java  |   1 +
 .../apache/tez/dag/app/dag/impl/DAGImpl.java    |  28 +-
 .../apache/tez/dag/app/dag/impl/VertexImpl.java | 366 ++++++++++++-------
 .../tez/dag/app/dag/impl/TestVertexImpl.java    | 255 ++++++++++---
 .../tez/mapreduce/examples/ExampleDriver.java   |   5 +-
 .../examples/FilterLinesByWordOneToOne.java     | 275 ++++++++++++++
 .../mapreduce/examples/OrderedWordCount.java    |   1 -
 .../processor/FilterByWordInputProcessor.java   |   4 +-
 9 files changed, 795 insertions(+), 190 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/8aac5ba4/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/VertexEventOneToOneSourceSplit.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/VertexEventOneToOneSourceSplit.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/VertexEventOneToOneSourceSplit.java
new file mode 100644
index 0000000..a7e580e
--- /dev/null
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/VertexEventOneToOneSourceSplit.java
@@ -0,0 +1,50 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.tez.dag.app.dag.event;
+
+import org.apache.tez.dag.records.TezVertexID;
+
+public class VertexEventOneToOneSourceSplit extends VertexEvent {
+  final int numTasks;
+  final TezVertexID originalSplitVertex;
+  final TezVertexID senderVertex;
+  
+  public VertexEventOneToOneSourceSplit(TezVertexID vertexId,
+      TezVertexID senderVertex,
+      TezVertexID originalSplitVertex,
+      int numTasks) {
+    super(vertexId, VertexEventType.V_ONE_TO_ONE_SOURCE_SPLIT);
+    this.numTasks = numTasks;
+    this.senderVertex = senderVertex;
+    this.originalSplitVertex = originalSplitVertex;
+  }
+  
+  public int getNumTasks() {
+    return numTasks;
+  }
+  
+  public TezVertexID getOriginalSplitSource() {
+    return originalSplitVertex;
+  }
+  
+  public TezVertexID getSenderVertex() {
+    return senderVertex;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/8aac5ba4/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/VertexEventType.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/VertexEventType.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/VertexEventType.java
index 9d62ede..fccfe91 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/VertexEventType.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/VertexEventType.java
@@ -54,6 +54,7 @@ public enum VertexEventType {
   V_COUNTER_UPDATE,
   
   V_ROUTE_EVENT,
+  V_ONE_TO_ONE_SOURCE_SPLIT,
   
   //Producer: VertexInputInitializer
   V_ROOT_INPUT_INITIALIZED,

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/8aac5ba4/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
index d16086b..ee6b832 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
@@ -557,23 +557,24 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
     return vertex.getVertexStatus(statusOptions);
   }
 
-
-  protected void startRootVertices() {
+  protected void initializeVerticesAndStart() {
     for (Vertex v : vertices.values()) {
       if (v.getInputVerticesCount() == 0) {
         if (LOG.isDebugEnabled()) {
-          LOG.debug("Starting root vertex " + v.getName());
+          LOG.debug("Initing root vertex " + v.getName());
         }
         eventHandler.handle(new VertexEvent(v.getVertexId(),
-            VertexEventType.V_START));
+            VertexEventType.V_INIT));
       }
     }
-  }
-
-  protected void initializeVertices() {
     for (Vertex v : vertices.values()) {
-      eventHandler.handle(new VertexEvent(v.getVertexId(),
-          VertexEventType.V_INIT));
+      if (v.getInputVerticesCount() == 0) {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Starting root vertex " + v.getName());
+        }
+        eventHandler.handle(new VertexEvent(v.getVertexId(),
+            VertexEventType.V_START));
+      }
     }
   }
 
@@ -986,15 +987,14 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
      * triggered in MRAppMaster's startJobs() method.
      */
     @Override
-    public void transition(DAGImpl job, DAGEvent event) {
-      job.startTime = job.clock.getTime();
-      job.initializeVertices();
-      job.logJobHistoryInitedEvent();
+    public void transition(DAGImpl dag, DAGEvent event) {
+      dag.startTime = dag.clock.getTime();
+      dag.logJobHistoryInitedEvent();
       // TODO Metrics
       //job.metrics.runningJob(job);
 
       // Start all vertices with no incoming edges when job starts
-      job.startRootVertices();
+      dag.initializeVerticesAndStart();
     }
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/8aac5ba4/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
index e0a4ecf..cce2043 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
@@ -108,6 +108,7 @@ import org.apache.tez.dag.app.dag.event.VertexEventTaskCompleted;
 import org.apache.tez.dag.app.dag.event.VertexEventTaskReschedule;
 import org.apache.tez.dag.app.dag.event.VertexEventTermination;
 import org.apache.tez.dag.app.dag.event.VertexEventType;
+import org.apache.tez.dag.app.dag.event.VertexEventOneToOneSourceSplit;
 import org.apache.tez.dag.history.DAGHistoryEvent;
 import org.apache.tez.dag.history.events.VertexFinishedEvent;
 import org.apache.tez.dag.history.events.VertexStartedEvent;
@@ -173,10 +174,11 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
   //fields initialized in init
 
   private int numStartedSourceVertices = 0;
+  private int numInitedSourceVertices = 0;
   private int distanceFromRoot = 0;
 
   private final List<String> diagnostics = new ArrayList<String>();
-
+  
   //task/attempt related datastructures
   @VisibleForTesting
   int numSuccessSourceAttemptCompletions = 0;
@@ -204,8 +206,8 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
           // Transitions from NEW state
           .addTransition
               (VertexState.NEW,
-              EnumSet.of(VertexState.INITED, VertexState.INITIALIZING,
-                  VertexState.FAILED),
+              EnumSet.of(VertexState.NEW, VertexState.INITED, 
+                  VertexState.INITIALIZING, VertexState.FAILED),
               VertexEventType.V_INIT,
               new InitTransition())
           .addTransition(VertexState.NEW, VertexState.KILLED,
@@ -217,15 +219,20 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
 
           // Transitions from INITIALIZING state
           .addTransition(VertexState.INITIALIZING,
-              EnumSet.of(VertexState.INITIALIZING, VertexState.INITED, VertexState.RUNNING),
+              EnumSet.of(VertexState.INITIALIZING, VertexState.INITED, 
+                  VertexState.RUNNING, VertexState.FAILED),
               VertexEventType.V_ROOT_INPUT_INITIALIZED,
               new RootInputInitializedTransition())
+          .addTransition(VertexState.INITIALIZING, 
+              EnumSet.of(VertexState.FAILED, VertexState.INITED),
+              VertexEventType.V_ONE_TO_ONE_SOURCE_SPLIT,
+              new OneToOneSourceSplitTransition())
           .addTransition(VertexState.INITIALIZING, VertexState.FAILED,
               VertexEventType.V_ROOT_INPUT_FAILED,
               new RootInputInitFailedTransition())
           .addTransition(VertexState.INITIALIZING, VertexState.INITIALIZING,
               VertexEventType.V_START,
-              new StartWhileInitingTransition())
+              new StartWhileInitializingTransition())
           .addTransition(VertexState.INITIALIZING, VertexState.INITIALIZING,
               VertexEventType.V_SOURCE_VERTEX_STARTED,
               new SourceVertexStartedTransition())
@@ -243,10 +250,15 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
               INTERNAL_ERROR_TRANSITION)
 
           // Transitions from INITED state
-              // SOURCE_VERTEX_STARTED - for srces which detemrine parallelism, they must complete before this vertex can start.
+          // SOURCE_VERTEX_STARTED - for sources which determine parallelism, 
+          // they must complete before this vertex can start.
           .addTransition(VertexState.INITED, VertexState.INITED,
               VertexEventType.V_SOURCE_VERTEX_STARTED,
               new SourceVertexStartedTransition())
+          .addTransition(VertexState.INITED, 
+              EnumSet.of(VertexState.INITED),
+              VertexEventType.V_ONE_TO_ONE_SOURCE_SPLIT,
+              new OneToOneSourceSplitTransition())
           .addTransition(VertexState.INITED,  VertexState.INITED,
               VertexEventType.V_SOURCE_TASK_ATTEMPT_COMPLETED,
               SOURCE_TASK_ATTEMPT_COMPLETED_EVENT_TRANSITION)
@@ -346,9 +358,11 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
           .addTransition(VertexState.FAILED, VertexState.FAILED,
               EnumSet.of(VertexEventType.V_TERMINATE,
                   VertexEventType.V_TASK_RESCHEDULED,
+                  VertexEventType.V_START,
                   VertexEventType.V_ROUTE_EVENT,
                   VertexEventType.V_TASK_ATTEMPT_COMPLETED,
                   VertexEventType.V_TASK_COMPLETED,
+                  VertexEventType.V_ONE_TO_ONE_SOURCE_SPLIT,
                   VertexEventType.V_ROOT_INPUT_INITIALIZED,
                   VertexEventType.V_SOURCE_TASK_ATTEMPT_COMPLETED,
                   VertexEventType.V_ROOT_INPUT_FAILED))
@@ -361,11 +375,13 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
           // Ignore-able events
           .addTransition(VertexState.KILLED, VertexState.KILLED,
               EnumSet.of(VertexEventType.V_TERMINATE,
+                  VertexEventType.V_INIT,
                   VertexEventType.V_SOURCE_VERTEX_STARTED,
                   VertexEventType.V_START,
                   VertexEventType.V_ROUTE_EVENT,
                   VertexEventType.V_TASK_RESCHEDULED,
                   VertexEventType.V_TASK_ATTEMPT_COMPLETED,
+                  VertexEventType.V_ONE_TO_ONE_SOURCE_SPLIT,
                   VertexEventType.V_SOURCE_TASK_ATTEMPT_COMPLETED,
                   VertexEventType.V_TASK_COMPLETED,
                   VertexEventType.V_ROOT_INPUT_INITIALIZED,
@@ -382,6 +398,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
                   VertexEventType.V_TERMINATE,
                   VertexEventType.V_TASK_COMPLETED,
                   VertexEventType.V_TASK_ATTEMPT_COMPLETED,
+                  VertexEventType.V_ONE_TO_ONE_SOURCE_SPLIT,
                   VertexEventType.V_SOURCE_TASK_ATTEMPT_COMPLETED,
                   VertexEventType.V_TASK_RESCHEDULED,
                   VertexEventType.V_INTERNAL_ERROR,
@@ -437,6 +454,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
   private VertexScheduler vertexScheduler;
 
   private boolean parallelismSet = false;
+  private TezVertexID originalOneToOneSplitSource = null;
 
   private VertexOutputCommitter committer;
   private AtomicBoolean committed = new AtomicBoolean(false);
@@ -788,89 +806,105 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
                 "SourceEdge managers cannot be set when determining initial parallelism");
         this.numTasks = parallelism;
         this.createTasks();
-        LOG.info("Parallelism set to : " + this.numTasks);
+        LOG.info("Vertex " + getVertexId() + 
+            " parallelism set to " + parallelism);
         // Pending task event management, which follows, is not required.
         // Vertex event buffering is happening elsewhere - while in the Vertex
         // INITIALIZING state.
-        return;
-      }
-
-      if (parallelism >= numTasks) {
-        // not that hard to support perhaps. but checking right now since there
-        // is no use case for it and checking may catch other bugs.
-        throw new TezUncheckedException(
-            "Increasing parallelism is not supported");
-      }
-      if (parallelism == numTasks) {
-        LOG.info("Ingoring setParallelism to current value: " + parallelism);
-        return;
-      }
-
-      // start buffering incoming events so that we can re-route existing events
-      for (Edge edge : sourceVertices.values()) {
-        edge.startEventBuffering();
-      }
-
-      // Use a set since the same event may have been sent to multiple tasks
-      // and we want to avoid duplicates
-      Set<TezEvent> pendingEvents = new HashSet<TezEvent>();
-
-      LOG.info("Vertex " + getVertexId() + " parallelism set to " + parallelism);
-      // assign to local variable of LinkedHashMap to make sure that changing
-      // type of task causes compile error. We depend on LinkedHashMap for order
-      LinkedHashMap<TezTaskID, Task> currentTasks = this.tasks;
-      Iterator<Map.Entry<TezTaskID, Task>> iter = currentTasks.entrySet()
-          .iterator();
-      int i = 0;
-      while (iter.hasNext()) {
-        i++;
-        Map.Entry<TezTaskID, Task> entry = iter.next();
-        Task task = entry.getValue();
-        if (task.getState() != TaskState.NEW) {
+      } else {
+        if (parallelism >= numTasks) {
+          // not that hard to support perhaps. but checking right now since there
+          // is no use case for it and checking may catch other bugs.
           throw new TezUncheckedException(
-              "All tasks must be in initial state when changing parallelism"
-                  + " for vertex: " + getVertexId() + " name: " + getName());
+              "Increasing parallelism is not supported");
         }
-        pendingEvents.addAll(task.getAndClearTaskTezEvents());
-        if (i <= parallelism) {
-          continue;
+        if (parallelism == numTasks) {
+          LOG.info("Ingoring setParallelism to current value: " + parallelism);
+          return;
         }
-        LOG.info("Removing task: " + entry.getKey());
-        iter.remove();
-      }
-      this.numTasks = parallelism;
-      assert tasks.size() == numTasks;
-
-      // set new edge managers
-      if(sourceEdgeManagers != null) {
-        for(Map.Entry<Vertex, EdgeManager> entry : sourceEdgeManagers.entrySet()) {
-          Vertex sourceVertex = entry.getKey();
-          EdgeManager edgeManager = entry.getValue();
-          Edge edge = sourceVertices.get(sourceVertex);
-          LOG.info("Replacing edge manager for source:"
-              + sourceVertex.getVertexId() + " destination: " + getVertexId());
-          edge.setEdgeManager(edgeManager);
+  
+        // start buffering incoming events so that we can re-route existing events
+        for (Edge edge : sourceVertices.values()) {
+          edge.startEventBuffering();
+        }
+  
+        // Use a set since the same event may have been sent to multiple tasks
+        // and we want to avoid duplicates
+        Set<TezEvent> pendingEvents = new HashSet<TezEvent>();
+  
+        LOG.info("Vertex " + getVertexId() + 
+            " parallelism set to " + parallelism + " from " + numTasks);
+        // assign to local variable of LinkedHashMap to make sure that changing
+        // type of task causes compile error. We depend on LinkedHashMap for order
+        LinkedHashMap<TezTaskID, Task> currentTasks = this.tasks;
+        Iterator<Map.Entry<TezTaskID, Task>> iter = currentTasks.entrySet()
+            .iterator();
+        int i = 0;
+        while (iter.hasNext()) {
+          i++;
+          Map.Entry<TezTaskID, Task> entry = iter.next();
+          Task task = entry.getValue();
+          if (task.getState() != TaskState.NEW) {
+            throw new TezUncheckedException(
+                "All tasks must be in initial state when changing parallelism"
+                    + " for vertex: " + getVertexId() + " name: " + getName());
+          }
+          pendingEvents.addAll(task.getAndClearTaskTezEvents());
+          if (i <= parallelism) {
+            continue;
+          }
+          LOG.info("Removing task: " + entry.getKey());
+          iter.remove();
+        }
+        this.numTasks = parallelism;
+        assert tasks.size() == numTasks;
+  
+        // set new edge managers
+        if(sourceEdgeManagers != null) {
+          for(Map.Entry<Vertex, EdgeManager> entry : sourceEdgeManagers.entrySet()) {
+            Vertex sourceVertex = entry.getKey();
+            EdgeManager edgeManager = entry.getValue();
+            Edge edge = sourceVertices.get(sourceVertex);
+            LOG.info("Replacing edge manager for source:"
+                + sourceVertex.getVertexId() + " destination: " + getVertexId());
+            edge.setEdgeManager(edgeManager);
+          }
+        }
+  
+        // Re-route all existing TezEvents according to new routing table
+        // At this point only events attributed to source task attempts can be
+        // re-routed. e.g. DataMovement or InputFailed events.
+        // This assumption is fine for now since these tasks haven't been started.
+        // So they can only get events generated from source task attempts that
+        // have already been started.
+        DAG dag = getDAG();
+        for(TezEvent event : pendingEvents) {
+          TezVertexID sourceVertexId = event.getSourceInfo().getTaskAttemptID()
+              .getTaskID().getVertexID();
+          Vertex sourceVertex = dag.getVertex(sourceVertexId);
+          Edge sourceEdge = sourceVertices.get(sourceVertex);
+          sourceEdge.sendTezEventToDestinationTasks(event);
+        }
+  
+        // stop buffering events
+        for (Edge edge : sourceVertices.values()) {
+          edge.stopEventBuffering();
         }
       }
-
-      // Re-route all existing TezEvents according to new routing table
-      // At this point only events attributed to source task attempts can be
-      // re-routed. e.g. DataMovement or InputFailed events.
-      // This assumption is fine for now since these tasks haven't been started.
-      // So they can only get events generated from source task attempts that
-      // have already been started.
-      DAG dag = getDAG();
-      for(TezEvent event : pendingEvents) {
-        TezVertexID sourceVertexId = event.getSourceInfo().getTaskAttemptID()
-            .getTaskID().getVertexID();
-        Vertex sourceVertex = dag.getVertex(sourceVertexId);
-        Edge sourceEdge = sourceVertices.get(sourceVertex);
-        sourceEdge.sendTezEventToDestinationTasks(event);
-      }
-
-      // stop buffering events
-      for (Edge edge : sourceVertices.values()) {
-        edge.stopEventBuffering();
+      
+      for (Map.Entry<Vertex, Edge> entry : targetVertices.entrySet()) {
+        Edge edge = entry.getValue();
+        if (edge.getEdgeProperty().getDataMovementType() 
+            == DataMovementType.ONE_TO_ONE) {
+          // inform these target vertices that we have changed parallelism
+          VertexEventOneToOneSourceSplit event = 
+              new VertexEventOneToOneSourceSplit(entry.getKey().getVertexId(),
+                  getVertexId(),
+                  ((originalOneToOneSplitSource!=null) ? 
+                      originalOneToOneSplitSource : getVertexId()), 
+                  numTasks);
+          getEventHandler().handle(event);
+        }
       }
 
     } finally {
@@ -1192,7 +1226,24 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
 
     @Override
     public VertexState transition(VertexImpl vertex, VertexEvent event) {
+      VertexState vertexState = VertexState.NEW;
+      vertex.numInitedSourceVertices++;
+      if (vertex.sourceVertices == null || vertex.sourceVertices.isEmpty() ||
+          vertex.numInitedSourceVertices == vertex.sourceVertices.size()) {
+        vertexState = handleInitEvent(vertex, event);
+        if (vertexState != VertexState.FAILED) {
+          if (vertex.targetVertices != null && !vertex.targetVertices.isEmpty()) {
+            for (Vertex target : vertex.targetVertices.keySet()) {
+              vertex.getEventHandler().handle(new VertexEvent(target.getVertexId(),
+                VertexEventType.V_INIT));
+            }
+          }
+        }
+      }
+      return vertexState;
+    }
 
+    private VertexState handleInitEvent(VertexImpl vertex, VertexEvent event) {
       vertex.initTimeRequested = vertex.clock.getTime();
 
       // VertexManager needs to be setup before attempting to Initialize any
@@ -1269,40 +1320,59 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
       if (vertex.numTasks == -1) {
         LOG.info("Num tasks is -1. Expecting VertexManager/InputInitializers"
             + " to set #tasks for the vertex " + vertex.getVertexId());
-      } else {
-        vertex.createTasks();
-      }
 
-      if (vertex.inputsWithInitializers != null) {
-        // Use DAGScheduler to arbitrate resources among vertices later
-        // Ask for 1.5 the number of tasks we can fit in one wave
-        int totalResource = vertex.appContext.getTaskScheduler()
-            .getTotalResources().getMemory();
-        int taskResource = vertex.getTaskResource().getMemory();
-        float waves = vertex.conf.getFloat(
-            TezConfiguration.TEZ_AM_GROUPING_SPLIT_WAVES,
-            TezConfiguration.TEZ_AM_GROUPING_SPLIT_WAVES_DEFAULT);
-
-        int numTasks = (int)((totalResource*waves)/taskResource);
-
-        LOG.info("Vertex " + vertex.getVertexId() + " asking for " + numTasks
-            + " tasks. Headroom: " + totalResource + " Task Resource: "
-            + taskResource + " waves: " + waves);
-
-        vertex.rootInputInitializer = vertex.createRootInputInitializerRunner(
-            vertex.getDAG().getName(), vertex.getName(), vertex.getVertexId(),
-            vertex.eventHandler, numTasks);
-        List<RootInputLeafOutputDescriptor<InputDescriptor>> inputList = Lists
-            .newArrayListWithCapacity(vertex.inputsWithInitializers.size());
-        for (String inputName : vertex.inputsWithInitializers) {
-          inputList.add(vertex.additionalInputs.get(inputName));
+        if (vertex.inputsWithInitializers != null) {
+          // Use DAGScheduler to arbitrate resources among vertices later
+          // Ask for 1.5 the number of tasks we can fit in one wave
+          int totalResource = vertex.appContext.getTaskScheduler()
+              .getTotalResources().getMemory();
+          int taskResource = vertex.getTaskResource().getMemory();
+          float waves = vertex.conf.getFloat(
+              TezConfiguration.TEZ_AM_GROUPING_SPLIT_WAVES,
+              TezConfiguration.TEZ_AM_GROUPING_SPLIT_WAVES_DEFAULT);
+
+          int numTasks = (int)((totalResource*waves)/taskResource);
+
+          LOG.info("Vertex " + vertex.getVertexId() + " asking for " + numTasks
+              + " tasks. Headroom: " + totalResource + " Task Resource: "
+              + taskResource + " waves: " + waves);
+
+          vertex.rootInputInitializer = vertex.createRootInputInitializerRunner(
+              vertex.getDAG().getName(), vertex.getName(), vertex.getVertexId(),
+              vertex.eventHandler, numTasks);
+          List<RootInputLeafOutputDescriptor<InputDescriptor>> inputList = Lists
+              .newArrayListWithCapacity(vertex.inputsWithInitializers.size());
+          for (String inputName : vertex.inputsWithInitializers) {
+            inputList.add(vertex.additionalInputs.get(inputName));
+          }
+          LOG.info("Starting root input initializers: "
+              + vertex.inputsWithInitializers.size());
+          vertex.rootInputInitializer.runInputInitializers(inputList);
+        } else {
+          // no input initializers. At this moment, only other case is 1-1 edge
+          // with uninitialized sources
+          boolean hasOneToOneUninitedSource = false;
+          for (Map.Entry<Vertex, Edge> entry : vertex.sourceVertices.entrySet()) {
+            if (entry.getValue().getEdgeProperty().getDataMovementType() == 
+                DataMovementType.ONE_TO_ONE) {
+              if (entry.getKey().getTotalTasks() == -1) {
+                hasOneToOneUninitedSource = true;
+                break;
+              }
+            }
+          }
+          if (!hasOneToOneUninitedSource) {
+            throw new TezUncheckedException(vertex.getVertexId() + 
+            " has -1 tasks but neither input initializers nor 1-1 uninited sources");
+          }
         }
-        LOG.info("Starting root input initializers: "
-            + vertex.inputsWithInitializers.size());
-        vertex.rootInputInitializer.runInputInitializers(inputList);
+                
         return VertexState.INITIALIZING;
+      } else {
+        vertex.createTasks();
       }
 
+
       return vertex.initializeVertex();
     }
   } // end of InitTransition
@@ -1314,6 +1384,22 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
     return new RootInputInitializerRunner(dagName, vertexName, vertexID,
         eventHandler, numTasks);
   }
+  
+  private VertexState initializeVertexInInitializingState() {
+    VertexState vertexState = initializeVertex();
+    if (vertexState == VertexState.FAILED) {
+      // Don't bother starting if the vertex state is failed.
+      return vertexState;
+    }
+
+    // Vertex will be moving to INITED state, safe to process pending route events.
+    if (pendingRouteEvents != null) {
+      VertexImpl.ROUTE_EVENT_TRANSITION.transition(this,
+          new VertexEventRouteEvent(getVertexId(), pendingRouteEvents));
+      pendingRouteEvents = null;
+    }
+    return vertexState;
+  }
 
   public static class RootInputInitializedTransition implements
       MultipleArcTransition<VertexImpl, VertexEvent, VertexState> {
@@ -1334,18 +1420,9 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
 
         // If RootInputs are determining parallelism, it should have been set by
         // this point, so it's safe to checkTaskLimits and createTasks
-        VertexState vertexState = vertex.initializeVertex();
+        VertexState vertexState = vertex.initializeVertexInInitializingState();
         if (vertexState == VertexState.FAILED) {
-          // Don't bother starting if the vertex state is failed.
-          return vertexState;
-        }
-
-        // Vertex will be moving to INITED state, safe to process pending route events.
-        if (vertex.pendingRouteEvents != null) {
-          VertexImpl.ROUTE_EVENT_TRANSITION.transition(vertex,
-              new VertexEventRouteEvent(vertex.getVertexId(),
-                  vertex.pendingRouteEvents));
-          vertex.pendingRouteEvents = null;
+          return VertexState.FAILED;
         }
 
         if (vertex.startSignalPending) {
@@ -1360,6 +1437,50 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
     }
   }
 
+  public static class OneToOneSourceSplitTransition implements
+    MultipleArcTransition<VertexImpl, VertexEvent, VertexState> {
+
+    @Override
+    public VertexState transition(VertexImpl vertex, VertexEvent event) {
+      VertexEventOneToOneSourceSplit splitEvent = 
+          (VertexEventOneToOneSourceSplit)event;
+      TezVertexID originalSplitSource = splitEvent.getOriginalSplitSource();
+      if (vertex.originalOneToOneSplitSource != null) {
+        Preconditions.checkState(vertex.getState() == VertexState.INITED, 
+            " Unexpected 1-1 split for vertex " + vertex.getVertexId() + 
+            " in state " + vertex.getState() + 
+            " . Split in vertex " + originalSplitSource + 
+            " sent by vertex " + splitEvent.getSenderVertex() +
+            " numTasks " + splitEvent.getNumTasks());
+        if (vertex.originalOneToOneSplitSource.equals(originalSplitSource)) {
+          // ignore another split event that may have come from a different
+          // path in the DAG. We have already split because of that source
+          LOG.info("Ignoring split of vertex " + vertex.getVertexId() + 
+              " because of split in vertex " + originalSplitSource + 
+              " sent by vertex " + splitEvent.getSenderVertex() +
+              " numTasks " + splitEvent.getNumTasks());
+          return VertexState.INITED;
+        }
+        // cannot split from multiple sources
+        throw new TezUncheckedException("Vertex: " + vertex.getVertexId() + 
+            " asked to split by: " + originalSplitSource + 
+            " but was already split by:" + vertex.originalOneToOneSplitSource);
+      }
+      Preconditions.checkState(vertex.getState() == VertexState.INITIALIZING, 
+          " Unexpected 1-1 split for vertex " + vertex.getVertexId() + 
+          " in state " + vertex.getState() + 
+          " . Split in vertex " + originalSplitSource + 
+          " sent by vertex " + splitEvent.getSenderVertex() +
+          " numTasks " + splitEvent.getNumTasks());
+      LOG.info("Splitting vertex " + vertex.getVertexId() + 
+          " because of split in vertex " + originalSplitSource + 
+          " sent by vertex " + splitEvent.getSenderVertex() +
+          " numTasks " + splitEvent.getNumTasks());
+      vertex.originalOneToOneSplitSource = originalSplitSource;
+      vertex.setParallelism(splitEvent.getNumTasks(), null);
+      return vertex.initializeVertexInInitializingState();
+    }
+  }
 
   // Temporary to maintain topological order while starting vertices. Not useful
   // since there's not much difference between the INIT and RUNNING states.
@@ -1386,7 +1507,8 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
     }
   }
 
-  public static class StartWhileInitingTransition implements SingleArcTransition<VertexImpl, VertexEvent> {
+  public static class StartWhileInitializingTransition implements 
+    SingleArcTransition<VertexImpl, VertexEvent> {
 
     @Override
     public void transition(VertexImpl vertex, VertexEvent event) {

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/8aac5ba4/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
index 4a479e3..abb0d53 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
@@ -319,6 +319,140 @@ public class TestVertexImpl {
     .build();
     return dag;
   }
+  
+  /**
+   * Builds a diamond-shaped plan, vertex1 -> {vertex2, vertex3} -> vertex4,
+   * in which every edge is ONE_TO_ONE and every vertex declares -1 tasks.
+   * Only vertex1 has a root input ("input1"), whose initializer class is
+   * {@code initializerClassName}.
+   */
+  private DAGPlan createDAGPlanForOneToOneSplit(String initializerClassName) {
+    LOG.info("Setting up one to one dag plan");
+
+    // Root input of vertex1, driven by the caller-supplied initializer.
+    RootInputLeafOutputProto rootInput = RootInputLeafOutputProto.newBuilder()
+        .setInitializerClassName(initializerClassName)
+        .setName("input1")
+        .setEntityDescriptor(
+            TezEntityDescriptorProto.newBuilder().setClassName("InputClazz").build())
+        .build();
+
+    // Identical resources for every vertex; only the task module differs.
+    PlanTaskConfiguration v1TaskConf = PlanTaskConfiguration.newBuilder()
+        .setNumTasks(-1).setVirtualCores(4).setMemoryMb(1024)
+        .setJavaOpts("").setTaskModule("x1.y1").build();
+    PlanTaskConfiguration v2TaskConf = PlanTaskConfiguration.newBuilder()
+        .setNumTasks(-1).setVirtualCores(4).setMemoryMb(1024)
+        .setJavaOpts("").setTaskModule("x2.y2").build();
+    PlanTaskConfiguration v3TaskConf = PlanTaskConfiguration.newBuilder()
+        .setNumTasks(-1).setVirtualCores(4).setMemoryMb(1024)
+        .setJavaOpts("").setTaskModule("x3.y3").build();
+    PlanTaskConfiguration v4TaskConf = PlanTaskConfiguration.newBuilder()
+        .setNumTasks(-1).setVirtualCores(4).setMemoryMb(1024)
+        .setJavaOpts("").setTaskModule("x4.y4").build();
+
+    VertexPlan planVertex1 = VertexPlan.newBuilder()
+        .setName("vertex1")
+        .setType(PlanVertexType.NORMAL)
+        .addInputs(rootInput)
+        .setTaskConfig(v1TaskConf)
+        .addOutEdgeId("e1")
+        .addOutEdgeId("e2")
+        .build();
+    VertexPlan planVertex2 = VertexPlan.newBuilder()
+        .setName("vertex2")
+        .setType(PlanVertexType.NORMAL)
+        .setTaskConfig(v2TaskConf)
+        .addInEdgeId("e1")
+        .addOutEdgeId("e3")
+        .build();
+    VertexPlan planVertex3 = VertexPlan.newBuilder()
+        .setName("vertex3")
+        .setType(PlanVertexType.NORMAL)
+        .setTaskConfig(v3TaskConf)
+        .addInEdgeId("e2")
+        .addOutEdgeId("e4")
+        .build();
+    // vertex4 joins both branches of the diamond.
+    VertexPlan planVertex4 = VertexPlan.newBuilder()
+        .setName("vertex4")
+        .setType(PlanVertexType.NORMAL)
+        .setTaskConfig(v4TaskConf)
+        .addInEdgeId("e3")
+        .addInEdgeId("e4")
+        .build();
+
+    EdgePlan edgeV1V2 = EdgePlan.newBuilder()
+        .setId("e1")
+        .setInputVertexName("vertex1")
+        .setOutputVertexName("vertex2")
+        .setEdgeSource(TezEntityDescriptorProto.newBuilder().setClassName("o2"))
+        .setEdgeDestination(TezEntityDescriptorProto.newBuilder().setClassName("v1_v2"))
+        .setDataMovementType(PlanEdgeDataMovementType.ONE_TO_ONE)
+        .setDataSourceType(PlanEdgeDataSourceType.PERSISTED)
+        .setSchedulingType(PlanEdgeSchedulingType.SEQUENTIAL)
+        .build();
+    EdgePlan edgeV1V3 = EdgePlan.newBuilder()
+        .setId("e2")
+        .setInputVertexName("vertex1")
+        .setOutputVertexName("vertex3")
+        .setEdgeSource(TezEntityDescriptorProto.newBuilder().setClassName("o2"))
+        .setEdgeDestination(TezEntityDescriptorProto.newBuilder().setClassName("v1_v3"))
+        .setDataMovementType(PlanEdgeDataMovementType.ONE_TO_ONE)
+        .setDataSourceType(PlanEdgeDataSourceType.PERSISTED)
+        .setSchedulingType(PlanEdgeSchedulingType.SEQUENTIAL)
+        .build();
+    EdgePlan edgeV2V4 = EdgePlan.newBuilder()
+        .setId("e3")
+        .setInputVertexName("vertex2")
+        .setOutputVertexName("vertex4")
+        .setEdgeSource(TezEntityDescriptorProto.newBuilder().setClassName("o2"))
+        .setEdgeDestination(TezEntityDescriptorProto.newBuilder().setClassName("v2_v4"))
+        .setDataMovementType(PlanEdgeDataMovementType.ONE_TO_ONE)
+        .setDataSourceType(PlanEdgeDataSourceType.PERSISTED)
+        .setSchedulingType(PlanEdgeSchedulingType.SEQUENTIAL)
+        .build();
+    EdgePlan edgeV3V4 = EdgePlan.newBuilder()
+        .setId("e4")
+        .setInputVertexName("vertex3")
+        .setOutputVertexName("vertex4")
+        .setEdgeSource(TezEntityDescriptorProto.newBuilder().setClassName("o2"))
+        .setEdgeDestination(TezEntityDescriptorProto.newBuilder().setClassName("v3_v4"))
+        .setDataMovementType(PlanEdgeDataMovementType.ONE_TO_ONE)
+        .setDataSourceType(PlanEdgeDataSourceType.PERSISTED)
+        .setSchedulingType(PlanEdgeSchedulingType.SEQUENTIAL)
+        .build();
+
+    return DAGPlan.newBuilder()
+        .setName("testVertexOneToOneSplit")
+        .addVertex(planVertex1)
+        .addVertex(planVertex2)
+        .addVertex(planVertex3)
+        .addVertex(planVertex4)
+        .addEdge(edgeV1V2)
+        .addEdge(edgeV1V3)
+        .addEdge(edgeV2V4)
+        .addEdge(edgeV3V4)
+        .build();
+  }
 
   private DAGPlan createTestDAGPlan() {
     LOG.info("Setting up dag plan");
@@ -680,10 +814,16 @@ public class TestVertexImpl {
     this.vertexIdMap = null;
   }
 
-  private void initAllVertices() {
-    for (int i = 1; i <= 6; ++i) {
+  private void initAllVertices(VertexState expectedState) {
+    for (int i = 1; i <= vertices.size(); ++i) {
+      VertexImpl v = vertices.get("vertex" + i);
+      if (v.sourceVertices == null || v.sourceVertices.isEmpty()) {
+        initVertex(v);
+      }
+    }
+    for (int i = 1; i <= vertices.size(); ++i) {
       VertexImpl v = vertices.get("vertex" + i);
-      initVertex(v);
+      Assert.assertEquals(expectedState, v.getState());
     }
   }
 
@@ -694,7 +834,6 @@ public class TestVertexImpl {
     dispatcher.getEventHandler().handle(new VertexEvent(v.getVertexId(),
           VertexEventType.V_INIT));
     dispatcher.await();
-    Assert.assertEquals(VertexState.INITED, v.getState());
   }
 
   private void startVertex(VertexImpl v) {
@@ -724,11 +863,9 @@ public class TestVertexImpl {
 
   @Test(timeout = 5000)
   public void testVertexInit() {
-    VertexImpl v = vertices.get("vertex2");
-    initVertex(v);
+    initAllVertices(VertexState.INITED);
 
     VertexImpl v3 = vertices.get("vertex3");
-    initVertex(v3);
 
     Assert.assertEquals("x3.y3", v3.getProcessorName());
     Assert.assertEquals("foo", v3.getJavaOpts());
@@ -775,7 +912,7 @@ public class TestVertexImpl {
 
   @Test(timeout = 5000)
   public void testVertexStart() {
-    initAllVertices();
+    initAllVertices(VertexState.INITED);
 
     VertexImpl v = vertices.get("vertex2");
     startVertex(v);
@@ -783,8 +920,8 @@ public class TestVertexImpl {
 
   @Test(timeout = 5000)
   public void testVertexSetParallelism() {
+    initAllVertices(VertexState.INITED);
     VertexImpl v3 = vertices.get("vertex3");
-    initVertex(v3);
     Assert.assertEquals(2, v3.getTotalTasks());
     Map<TezTaskID, Task> tasks = v3.getTasks();
     Assert.assertEquals(2, tasks.size());
@@ -808,7 +945,7 @@ public class TestVertexImpl {
   @SuppressWarnings("unchecked")
   @Test(timeout = 5000)
   public void testBasicVertexCompletion() {
-    initAllVertices();
+    initAllVertices(VertexState.INITED);
 
     VertexImpl v = vertices.get("vertex2");
     startVertex(v);
@@ -833,7 +970,7 @@ public class TestVertexImpl {
   @Test(timeout = 5000)
   @Ignore // FIXME fix verteximpl for this test to work
   public void testDuplicateTaskCompletion() {
-    initAllVertices();
+    initAllVertices(VertexState.INITED);
 
     VertexImpl v = vertices.get("vertex2");
     startVertex(v);
@@ -861,7 +998,7 @@ public class TestVertexImpl {
   @SuppressWarnings("unchecked")
   @Test(timeout = 5000)
   public void testVertexFailure() {
-    initAllVertices();
+    initAllVertices(VertexState.INITED);
 
     VertexImpl v = vertices.get("vertex2");
     startVertex(v);
@@ -881,30 +1018,16 @@ public class TestVertexImpl {
 
   @Test(timeout = 5000)
   public void testVertexKillDiagnostics() {
-    VertexImpl v1 = vertices.get("vertex1");
-    killVertex(v1);
-    String diagnostics =
-        StringUtils.join(",", v1.getDiagnostics()).toLowerCase();
-    Assert.assertTrue(diagnostics.contains(
-        "vertex received kill in new state"));
-
+    initAllVertices(VertexState.INITED);
     VertexImpl v2 = vertices.get("vertex2");
-    initVertex(v2);
     killVertex(v2);
-    diagnostics =
+    String diagnostics =
         StringUtils.join(",", v2.getDiagnostics()).toLowerCase();
     LOG.info("diagnostics v2: " + diagnostics);
     Assert.assertTrue(diagnostics.contains(
         "vertex received kill in inited state"));
 
     VertexImpl v3 = vertices.get("vertex3");
-    VertexImpl v4 = vertices.get("vertex4");
-    VertexImpl v5 = vertices.get("vertex5");
-    VertexImpl v6 = vertices.get("vertex6");
-    initVertex(v3);
-    initVertex(v4);
-    initVertex(v5);
-    initVertex(v6);
 
     startVertex(v3);
     killVertex(v3);
@@ -917,7 +1040,7 @@ public class TestVertexImpl {
   @SuppressWarnings("unchecked")
   @Test(timeout = 5000)
   public void testVertexKillPending() {
-    initAllVertices();
+    initAllVertices(VertexState.INITED);
 
     VertexImpl v = vertices.get("vertex2");
     startVertex(v);
@@ -943,7 +1066,7 @@ public class TestVertexImpl {
   @SuppressWarnings("unchecked")
   @Test
   public void testVertexKill() {
-    initAllVertices();
+    initAllVertices(VertexState.INITED);
 
     VertexImpl v = vertices.get("vertex2");
     startVertex(v);
@@ -969,7 +1092,7 @@ public class TestVertexImpl {
   @SuppressWarnings("unchecked")
   @Test(timeout = 5000)
   public void testKilledTasksHandling() {
-    initAllVertices();
+    initAllVertices(VertexState.INITED);
 
     VertexImpl v = vertices.get("vertex2");
     startVertex(v);
@@ -987,26 +1110,24 @@ public class TestVertexImpl {
 
   @Test(timeout = 5000)
   public void testVertexCommitterInit() {
+    initAllVertices(VertexState.INITED);
     VertexImpl v2 = vertices.get("vertex2");
-    initVertex(v2);
     Assert.assertTrue(v2.getVertexOutputCommitter()
         instanceof NullVertexOutputCommitter);
 
     VertexImpl v6 = vertices.get("vertex6");
-    initVertex(v6);
     Assert.assertTrue(v6.getVertexOutputCommitter()
         instanceof MRVertexOutputCommitter);
   }
 
   @Test(timeout = 5000)
   public void testVertexSchedulerInit() {
+    initAllVertices(VertexState.INITED);
     VertexImpl v2 = vertices.get("vertex2");
-    initVertex(v2);
     Assert.assertTrue(v2.getVertexScheduler()
         instanceof ImmediateStartVertexScheduler);
 
     VertexImpl v6 = vertices.get("vertex6");
-    initVertex(v6);
     Assert.assertTrue(v6.getVertexScheduler()
         instanceof ShuffleVertexManager);
   }
@@ -1014,7 +1135,7 @@ public class TestVertexImpl {
   @SuppressWarnings("unchecked")
   @Test(timeout = 5000)
   public void testVertexTaskFailure() {
-    initAllVertices();
+    initAllVertices(VertexState.INITED);
 
     VertexImpl v = vertices.get("vertex2");
 
@@ -1045,7 +1166,7 @@ public class TestVertexImpl {
   @Test(timeout = 5000)
   public void testSourceVertexStartHandling() {
     LOG.info("Testing testSourceVertexStartHandling");
-    initAllVertices();
+    initAllVertices(VertexState.INITED);
 
     VertexImpl v4 = vertices.get("vertex4");
     VertexImpl v5 = vertices.get("vertex5");
@@ -1077,7 +1198,7 @@ public class TestVertexImpl {
   @Test(timeout = 5000)
   public void testSourceTaskAttemptCompletionEvents() {
     LOG.info("Testing testSourceTaskAttemptCompletionEvents");
-    initAllVertices();
+    initAllVertices(VertexState.INITED);
 
     VertexImpl v4 = vertices.get("vertex4");
     VertexImpl v5 = vertices.get("vertex5");
@@ -1125,7 +1246,7 @@ public class TestVertexImpl {
   @SuppressWarnings("unchecked")
   @Test(timeout = 5000)
   public void testDAGEventGeneration() {
-    initAllVertices();
+    initAllVertices(VertexState.INITED);
 
     VertexImpl v = vertices.get("vertex2");
     startVertex(v);
@@ -1148,7 +1269,7 @@ public class TestVertexImpl {
   @Test(timeout = 5000)
   public void testTaskReschedule() {
     // For downstream failures
-    initAllVertices();
+    initAllVertices(VertexState.INITED);
 
     VertexImpl v = vertices.get("vertex2");
 
@@ -1185,7 +1306,7 @@ public class TestVertexImpl {
   @Test(timeout = 5000)
   public void testVertexSuccessToRunningAfterTaskScheduler() {
     // For downstream failures
-    initAllVertices();
+    initAllVertices(VertexState.INITED);
 
     VertexImpl v = vertices.get("vertex2");
 
@@ -1228,7 +1349,7 @@ public class TestVertexImpl {
   @Test(timeout = 5000)
   public void testVertexSuccessToFailedAfterTaskScheduler() {
     // For downstream failures
-    initAllVertices();
+    initAllVertices(VertexState.INITED);
 
     VertexImpl v = vertices.get("vertex2");
 
@@ -1262,7 +1383,7 @@ public class TestVertexImpl {
   @SuppressWarnings("unchecked")
   @Test(timeout = 5000)
   public void testVertexCommit() {
-    initAllVertices();
+    initAllVertices(VertexState.INITED);
 
     VertexImpl v = vertices.get("vertex2");
 
@@ -1303,7 +1424,7 @@ public class TestVertexImpl {
   @SuppressWarnings("unchecked")
   @Test(timeout = 5000)
   public void testBadCommitter() {
-    initAllVertices();
+    initAllVertices(VertexState.INITED);
 
     VertexImpl v = vertices.get("vertex2");
 
@@ -1330,6 +1451,46 @@ public class TestVertexImpl {
     Assert.assertEquals(0, committer.initCounter); // already done in init
     Assert.assertEquals(0, committer.setupCounter); // already done in init
   }
+  
+  @Test(timeout = 5000)
+  public void testVertexWithOneToOneSplit() {
+    // Diamond-shaped DAG with 1-1 edges: vertex1 -> {vertex2, vertex3} -> vertex4.
+    // When the source (vertex1) splits, the remaining vertices should split
+    // equally. vertex4 gets two split notifications originating from the same
+    // source (via vertex2 and vertex3) and must split only once.
+    useCustomInitializer = true;
+    setupPreDagCreation();
+    dagPlan = createDAGPlanForOneToOneSplit("TestInputInitializer");
+    setupPostDagCreation();
+    // Nothing can finish initializing before vertex1's input initializer runs.
+    initAllVertices(VertexState.INITIALIZING);
+
+    int numTasks = 5;
+    VertexImplWithCustomInitializer v1 = (VertexImplWithCustomInitializer) vertices
+        .get("vertex1");
+    Assert.assertEquals(VertexState.INITIALIZING, v1.getState());
+    RootInputInitializerRunnerControlled runner1 = v1.getRootInputInitializerRunner();
+    List<TaskLocationHint> v1Hints = createTaskLocationHints(numTasks);
+    runner1.completeInputInitialization(numTasks, v1Hints);
+
+    Assert.assertEquals(VertexState.INITED, v1.getState());
+    Assert.assertEquals(numTasks, v1.getTotalTasks());
+    Assert.assertEquals(RootInputVertexManager.class.getName(), v1
+        .getVertexScheduler().getClass().getName());
+    Assert.assertEquals(v1Hints, v1.getVertexLocationHint().getTaskLocationHints());
+    Assert.assertTrue(runner1.hasShutDown);
+
+    // The split must have propagated across every 1-1 edge.
+    for (String name : new String[] { "vertex2", "vertex3", "vertex4" }) {
+      VertexImpl v = vertices.get(name);
+      Assert.assertEquals(name, numTasks, v.getTotalTasks());
+      Assert.assertEquals(name, VertexState.INITED, v.getState());
+    }
+
+    // Starting the root vertex is expected to start the whole diamond.
+    startVertex(v1);
+    for (String name : new String[] { "vertex1", "vertex2", "vertex3", "vertex4" }) {
+      Assert.assertEquals(name, VertexState.RUNNING, vertices.get(name).getState());
+    }
+  }
 
   @Test(timeout = 5000)
   public void testHistoryEventGeneration() {
@@ -1371,9 +1532,6 @@ public class TestVertexImpl {
     Assert.assertEquals(true, runner1.hasShutDown);
     
     VertexImplWithCustomInitializer v2 = (VertexImplWithCustomInitializer) vertices.get("vertex2");
-    dispatcher.getEventHandler().handle(
-        new VertexEvent(v2.getVertexId(), VertexEventType.V_INIT));
-    dispatcher.await();
     Assert.assertEquals(VertexState.INITIALIZING, v2.getState());
     RootInputInitializerRunnerControlled runner2 = v2.getRootInputInitializerRunner();
     runner2.failInputInitialization();
@@ -1410,9 +1568,6 @@ public class TestVertexImpl {
     Assert.assertEquals(true, runner1.hasShutDown);
     
     VertexImplWithCustomInitializer v2 = (VertexImplWithCustomInitializer) vertices.get("vertex2");
-    dispatcher.getEventHandler().handle(
-        new VertexEvent(v2.getVertexId(), VertexEventType.V_INIT));
-    dispatcher.await();
     Assert.assertEquals(VertexState.INITIALIZING, v2.getState());
     RootInputInitializerRunnerControlled runner2 = v2.getRootInputInitializerRunner();
     List<TaskLocationHint> v2Hints = createTaskLocationHints(10);

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/8aac5ba4/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/ExampleDriver.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/ExampleDriver.java b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/ExampleDriver.java
index 23f5c72..ff2a804 100644
--- a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/ExampleDriver.java
+++ b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/ExampleDriver.java
@@ -20,7 +20,6 @@ package org.apache.tez.mapreduce.examples;
 
 import java.io.IOException;
 import java.text.DecimalFormat;
-import java.util.Collections;
 import java.util.EnumSet;
 import java.util.Set;
 
@@ -84,7 +83,9 @@ public class ExampleDriver {
       pgd.addClass("orderedwordcount", OrderedWordCount.class,
           "Word Count with words sorted on frequency");
       pgd.addClass("filterLinesByWord", FilterLinesByWord.class,
-          "Filters lines by the specified word");
+          "Filters lines by the specified word using broadcast edge");
+      pgd.addClass("filterLinesByWordOneToOne", FilterLinesByWordOneToOne.class,
+          "Filters lines by the specified word using OneToOne edge");
       exitCode = pgd.run(argv);
     }
     catch(Throwable e){

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/8aac5ba4/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/FilterLinesByWordOneToOne.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/FilterLinesByWordOneToOne.java b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/FilterLinesByWordOneToOne.java
new file mode 100644
index 0000000..9c05599
--- /dev/null
+++ b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/FilterLinesByWordOneToOne.java
@@ -0,0 +1,275 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.mapreduce.examples;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.TreeMap;
+import java.util.UUID;
+
+import org.apache.commons.cli.ParseException;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.util.ClassUtil;
+import org.apache.hadoop.util.GenericOptionsParser;
+import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.hadoop.yarn.api.records.LocalResourceType;
+import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
+import org.apache.hadoop.yarn.util.ConverterUtils;
+import org.apache.tez.client.AMConfiguration;
+import org.apache.tez.client.TezClientUtils;
+import org.apache.tez.client.TezSession;
+import org.apache.tez.client.TezSessionConfiguration;
+import org.apache.tez.common.TezJobConfig;
+import org.apache.tez.dag.api.DAG;
+import org.apache.tez.dag.api.Edge;
+import org.apache.tez.dag.api.EdgeProperty;
+import org.apache.tez.dag.api.EdgeProperty.DataMovementType;
+import org.apache.tez.dag.api.EdgeProperty.DataSourceType;
+import org.apache.tez.dag.api.EdgeProperty.SchedulingType;
+import org.apache.tez.dag.api.InputDescriptor;
+import org.apache.tez.dag.api.OutputDescriptor;
+import org.apache.tez.dag.api.ProcessorDescriptor;
+import org.apache.tez.dag.api.TezConfiguration;
+import org.apache.tez.dag.api.TezException;
+import org.apache.tez.dag.api.TezUncheckedException;
+import org.apache.tez.dag.api.Vertex;
+import org.apache.tez.dag.api.client.DAGClient;
+import org.apache.tez.dag.api.client.DAGStatus;
+import org.apache.tez.mapreduce.common.MRInputAMSplitGenerator;
+import org.apache.tez.mapreduce.examples.FilterLinesByWord.TextLongPair;
+import org.apache.tez.mapreduce.examples.helpers.SplitsInClientOptionParser;
+import org.apache.tez.mapreduce.hadoop.InputSplitInfo;
+import org.apache.tez.mapreduce.hadoop.MRHelpers;
+import org.apache.tez.mapreduce.hadoop.MultiStageMRConfToTezTranslator;
+import org.apache.tez.mapreduce.input.MRInputLegacy;
+import org.apache.tez.mapreduce.output.MROutput;
+import org.apache.tez.processor.FilterByWordInputProcessor;
+import org.apache.tez.processor.FilterByWordOutputProcessor;
+import org.apache.tez.runtime.api.TezRootInputInitializer;
+import org.apache.tez.runtime.library.input.ShuffledUnorderedKVInput;
+import org.apache.tez.runtime.library.output.OnFileUnorderedKVOutput;
+
+/**
+ * Example that filters the lines of a text input by the specified word.
+ * stage1 ({@link FilterByWordInputProcessor}) reads the input through
+ * {@link MRInputLegacy}; stage2 ({@link FilterByWordOutputProcessor}) writes
+ * through {@link MROutput}. The two stages are connected by a ONE_TO_ONE edge,
+ * whereas {@link FilterLinesByWord} uses a broadcast edge.
+ */
+public class FilterLinesByWordOneToOne {
+
+  private static final Log LOG = LogFactory.getLog(FilterLinesByWordOneToOne.class);
+
+  /** Configuration key under which the filter word is handed to stage1. */
+  public static final String FILTER_PARAM_NAME = "tez.runtime.examples.filterbyword.word";
+
+  private static void printUsage() {
+    System.err.println("Usage filterLinesByWordOneToOne <in> <out> <filter_word>" 
+        + " [-generateSplitsInClient true/<false>]");
+  }
+
+  /**
+   * Parses arguments, runs the DAG in a Tez session and exits the JVM.
+   *
+   * Exit codes: 0 if the DAG succeeded, 1 if it ended in any other state,
+   * 2 for invalid arguments or an already existing output directory, and -1
+   * if DAG progress could not be retrieved while it was running. The staging
+   * directory is deleted and the session stopped on every path that gets as
+   * far as creating them.
+   */
+  public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException, TezException {
+    Configuration conf = new Configuration();
+    String [] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
+
+    boolean generateSplitsInClient = false;
+    SplitsInClientOptionParser splitCmdLineParser = new SplitsInClientOptionParser();
+    try {
+      generateSplitsInClient = splitCmdLineParser.parse(otherArgs, false);
+      otherArgs = splitCmdLineParser.getRemainingArgs();
+    } catch (ParseException e1) {
+      System.err.println("Invalid options");
+      printUsage();
+      System.exit(2);
+    }
+
+    if (otherArgs.length != 3) {
+      printUsage();
+      System.exit(2);
+    }
+
+    String inputPath = otherArgs[0];
+    String outputPath = otherArgs[1];
+    String filterWord = otherArgs[2];
+
+    FileSystem fs = FileSystem.get(conf);
+    if (fs.exists(new Path(outputPath))) {
+      System.err.println("Output directory : " + outputPath + " already exists");
+      System.exit(2);
+    }
+
+    // Look up the job jar before creating anything remote, so that a missing
+    // jar does not leave a staging directory behind.
+    String jarPath = ClassUtil.findContainingJar(FilterLinesByWordOneToOne.class);
+    if (jarPath == null) {
+      throw new TezUncheckedException("Could not find any jar containing "
+          + FilterLinesByWordOneToOne.class.getName() + " in the classpath");
+    }
+
+    TezConfiguration tezConf = new TezConfiguration(conf);
+
+    // Unique per-run staging directory; always removed in the finally below.
+    Path stagingDir = new Path(fs.getWorkingDirectory(), UUID.randomUUID().toString());
+    tezConf.set(TezConfiguration.TEZ_AM_STAGING_DIR, stagingDir.toString());
+    TezClientUtils.ensureStagingDirExists(tezConf, stagingDir);
+
+    tezConf.set(TezConfiguration.TEZ_AM_JAVA_OPTS, MRHelpers.getMRAMJavaOpts(tezConf));
+
+    String[] vNames = { "stage1", "stage2" };
+    TezSession tezSession = null;
+    DAGClient dagClient = null;
+    DAGStatus dagStatus = null;
+    try {
+      Path remoteJarPath = fs.makeQualified(new Path(stagingDir, "dag_job.jar"));
+      fs.copyFromLocalFile(new Path(jarPath), remoteJarPath);
+      FileStatus remoteJarStatus = fs.getFileStatus(remoteJarPath);
+
+      Map<String, LocalResource> commonLocalResources = new TreeMap<String, LocalResource>();
+      LocalResource dagJarLocalRsrc = LocalResource.newInstance(
+          ConverterUtils.getYarnUrlFromPath(remoteJarPath),
+          LocalResourceType.FILE, LocalResourceVisibility.APPLICATION,
+          remoteJarStatus.getLen(), remoteJarStatus.getModificationTime());
+      commonLocalResources.put("dag_job.jar", dagJarLocalRsrc);
+
+      AMConfiguration amConf = new AMConfiguration(null, commonLocalResources, tezConf, null);
+      TezSessionConfiguration sessionConf = new TezSessionConfiguration(amConf, tezConf);
+      TezSession session = new TezSession("FilterLinesByWordSession", sessionConf);
+      // NOTE(review): the session is started before submitDAG(); presumably
+      // start() launches the session AM -- confirm against TezSession docs.
+      session.start();
+      // Only a successfully started session is stopped in the finally block.
+      tezSession = session;
+
+      Configuration stage1Conf = new JobConf(conf);
+      stage1Conf.set(FileInputFormat.INPUT_DIR, inputPath);
+      stage1Conf.setBoolean("mapred.mapper.new-api", false);
+      stage1Conf.set(TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_OUTPUT_KEY_CLASS, Text.class.getName());
+      stage1Conf.set(TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_OUTPUT_VALUE_CLASS,
+          TextLongPair.class.getName());
+      stage1Conf.set(FILTER_PARAM_NAME, filterWord);
+
+      InputSplitInfo inputSplitInfo = null;
+      if (generateSplitsInClient) {
+        inputSplitInfo = MRHelpers.generateInputSplits(stage1Conf, stagingDir);
+      }
+      MultiStageMRConfToTezTranslator.translateVertexConfToTez(stage1Conf, null);
+
+      Configuration stage2Conf = new JobConf(conf);
+      stage2Conf.set(TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_INPUT_KEY_CLASS, Text.class.getName());
+      stage2Conf.set(TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_INPUT_VALUE_CLASS,
+          TextLongPair.class.getName());
+      stage2Conf.set(FileOutputFormat.OUTDIR, outputPath);
+      stage2Conf.setBoolean("mapred.mapper.new-api", false);
+      MultiStageMRConfToTezTranslator.translateVertexConfToTez(stage2Conf, stage1Conf);
+
+      MRHelpers.doJobClientMagic(stage1Conf);
+      MRHelpers.doJobClientMagic(stage2Conf);
+
+      byte[] stage1Payload = MRHelpers.createUserPayloadFromConf(stage1Conf);
+      // Setup stage1 Vertex. -1 tasks means the parallelism is decided at
+      // runtime by the AM-side split generator configured below; stage2 uses
+      // the same value and follows stage1 across the 1-1 edge.
+      int stage1NumTasks = generateSplitsInClient ? inputSplitInfo.getNumTasks() : -1;
+      Vertex stage1Vertex = new Vertex("stage1", new ProcessorDescriptor(
+          FilterByWordInputProcessor.class.getName()).setUserPayload(stage1Payload),
+          stage1NumTasks, MRHelpers.getMapResource(stage1Conf));
+      stage1Vertex.setJavaOpts(MRHelpers.getMapJavaOpts(stage1Conf));
+      if (generateSplitsInClient) {
+        stage1Vertex.setTaskLocationsHint(inputSplitInfo.getTaskLocationHints());
+        Map<String, LocalResource> stage1LocalResources = new HashMap<String, LocalResource>();
+        stage1LocalResources.putAll(commonLocalResources);
+        MRHelpers.updateLocalResourcesForInputSplits(fs, inputSplitInfo, stage1LocalResources);
+        stage1Vertex.setTaskLocalResources(stage1LocalResources);
+      } else {
+        stage1Vertex.setTaskLocalResources(commonLocalResources);
+      }
+      Map<String, String> stage1Env = new HashMap<String, String>();
+      MRHelpers.updateEnvironmentForMRTasks(stage1Conf, stage1Env, true);
+      stage1Vertex.setTaskEnvironment(stage1Env);
+
+      // Configure the Input for stage1; no AM-side initializer when the splits
+      // were already generated in the client.
+      Class<? extends TezRootInputInitializer> initializerClazz = generateSplitsInClient ? null
+          : MRInputAMSplitGenerator.class;
+      stage1Vertex.addInput("MRInput",
+          new InputDescriptor(MRInputLegacy.class.getName())
+              .setUserPayload(MRHelpers.createMRInputPayload(stage1Payload, null)),
+          initializerClazz);
+
+      // Setup stage2 Vertex
+      Vertex stage2Vertex = new Vertex("stage2", new ProcessorDescriptor(
+          FilterByWordOutputProcessor.class.getName()).setUserPayload(MRHelpers
+          .createUserPayloadFromConf(stage2Conf)), stage1NumTasks,
+          MRHelpers.getMapResource(stage2Conf));
+      stage2Vertex.setJavaOpts(MRHelpers.getReduceJavaOpts(stage2Conf))
+          .setTaskLocalResources(commonLocalResources);
+      Map<String, String> stage2Env = new HashMap<String, String>();
+      MRHelpers.updateEnvironmentForMRTasks(stage2Conf, stage2Env, false);
+      stage2Vertex.setTaskEnvironment(stage2Env);
+
+      // Configure the Output for stage2
+      stage2Vertex.addOutput("MROutput",
+          new OutputDescriptor(MROutput.class.getName()).setUserPayload(MRHelpers
+              .createUserPayloadFromConf(stage2Conf)));
+
+      DAG dag = new DAG("FilterLinesByWord");
+      Edge edge = new Edge(stage1Vertex, stage2Vertex, new EdgeProperty(
+          DataMovementType.ONE_TO_ONE, DataSourceType.PERSISTED,
+          SchedulingType.SEQUENTIAL, new OutputDescriptor(
+              OnFileUnorderedKVOutput.class.getName()), new InputDescriptor(
+              ShuffledUnorderedKVInput.class.getName())));
+      dag.addVertex(stage1Vertex).addVertex(stage2Vertex).addEdge(edge);
+
+      LOG.info("Submitting DAG to Tez Session");
+      dagClient = tezSession.submitDAG(dag);
+      LOG.info("Submitted DAG to Tez Session");
+
+      dagStatus = waitForDagCompletion(dagClient, vNames);
+    } finally {
+      // Runs on success and on every exception. The System.exit calls below
+      // are deliberately placed after this block, because System.exit does
+      // not run pending finally blocks.
+      fs.delete(stagingDir, true);
+      if (tezSession != null) {
+        tezSession.stop();
+      }
+    }
+
+    if (dagStatus == null) {
+      // Progress could not be read while running; the cause was logged.
+      System.exit(-1);
+      return;
+    }
+    ExampleDriver.printDAGStatus(dagClient, vNames);
+    LOG.info("Application completed. " + "FinalState=" + dagStatus.getState());
+    System.exit(dagStatus.getState() == DAGStatus.State.SUCCEEDED ? 0 : 1);
+  }
+
+  /**
+   * Polls until the DAG is RUNNING or has finished, then prints progress for
+   * {@code vNames} once a second until it leaves RUNNING.
+   *
+   * @return the last status observed, or {@code null} if a {@link TezException}
+   *         occurred while reporting progress of the running DAG
+   * @throws TezException if the initial polling (before RUNNING) fails
+   */
+  private static DAGStatus waitForDagCompletion(DAGClient dagClient, String[] vNames)
+      throws IOException, TezException {
+    DAGStatus dagStatus;
+    while (true) {
+      dagStatus = dagClient.getDAGStatus(null);
+      DAGStatus.State state = dagStatus.getState();
+      if (state == DAGStatus.State.RUNNING
+          || state == DAGStatus.State.SUCCEEDED
+          || state == DAGStatus.State.FAILED
+          || state == DAGStatus.State.KILLED
+          || state == DAGStatus.State.ERROR) {
+        break;
+      }
+      sleepQuietly(500);
+    }
+
+    while (dagStatus.getState() == DAGStatus.State.RUNNING) {
+      try {
+        ExampleDriver.printDAGStatus(dagClient, vNames);
+        sleepQuietly(1000);
+        dagStatus = dagClient.getDAGStatus(null);
+      } catch (TezException e) {
+        LOG.fatal("Failed to get application progress. Exiting", e);
+        return null;
+      }
+    }
+    return dagStatus;
+  }
+
+  /**
+   * Sleeps for {@code millis} milliseconds. An interrupt only shortens the
+   * wait: the callers are polling loops that simply poll again. The interrupt
+   * flag is not restored, since a set flag would make every later sleep return
+   * at once and turn the poll into a busy loop.
+   */
+  private static void sleepQuietly(long millis) {
+    try {
+      Thread.sleep(millis);
+    } catch (InterruptedException ignored) {
+      // best effort; keep polling
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/8aac5ba4/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/OrderedWordCount.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/OrderedWordCount.java b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/OrderedWordCount.java
index 65df726..3ffd076 100644
--- a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/OrderedWordCount.java
+++ b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/OrderedWordCount.java
@@ -20,7 +20,6 @@ package org.apache.tez.mapreduce.examples;
 
 import java.io.IOException;
 import java.util.ArrayList;
-import java.util.Collections;
 import java.util.EnumSet;
 import java.util.HashMap;
 import java.util.List;

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/8aac5ba4/tez-mapreduce-examples/src/main/java/org/apache/tez/processor/FilterByWordInputProcessor.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce-examples/src/main/java/org/apache/tez/processor/FilterByWordInputProcessor.java b/tez-mapreduce-examples/src/main/java/org/apache/tez/processor/FilterByWordInputProcessor.java
index 2a2395b..423cd1e 100644
--- a/tez-mapreduce-examples/src/main/java/org/apache/tez/processor/FilterByWordInputProcessor.java
+++ b/tez-mapreduce-examples/src/main/java/org/apache/tez/processor/FilterByWordInputProcessor.java
@@ -31,6 +31,7 @@ import org.apache.tez.mapreduce.examples.FilterLinesByWord;
 import org.apache.tez.mapreduce.examples.FilterLinesByWord.TextLongPair;
 import org.apache.tez.mapreduce.hadoop.MRJobConfig;
 import org.apache.tez.mapreduce.input.MRInput;
+import org.apache.tez.mapreduce.input.MRInputLegacy;
 import org.apache.tez.runtime.api.Event;
 import org.apache.tez.runtime.api.LogicalIOProcessor;
 import org.apache.tez.runtime.api.LogicalInput;
@@ -94,7 +95,8 @@ public class FilterByWordInputProcessor implements LogicalIOProcessor {
     
     
     
-    MRInput mrInput = (MRInput) li;
+    MRInputLegacy mrInput = (MRInputLegacy) li;
+    mrInput.init();
     OnFileUnorderedKVOutput kvOutput = (OnFileUnorderedKVOutput) lo;
 
     Configuration updatedConf = mrInput.getConfigUpdates();