You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by bi...@apache.org on 2013/09/27 22:00:12 UTC

git commit: TEZ-481. Fix Reduce auto-parallelism after TEZ-398 (bikas)

Updated Branches:
  refs/heads/master 034ca0a95 -> 04d5e3771


TEZ-481. Fix Reduce auto-parallelism after TEZ-398 (bikas)


Project: http://git-wip-us.apache.org/repos/asf/incubator-tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tez/commit/04d5e377
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tez/tree/04d5e377
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tez/diff/04d5e377

Branch: refs/heads/master
Commit: 04d5e3771587009e9a6f0e22cd1582e9eda78ef4
Parents: 034ca0a
Author: Bikas Saha <bi...@apache.org>
Authored: Fri Sep 27 12:58:14 2013 -0700
Committer: Bikas Saha <bi...@apache.org>
Committed: Fri Sep 27 12:58:14 2013 -0700

----------------------------------------------------------------------
 .../org/apache/tez/common/TezJobConfig.java     |  6 --
 .../api/events/InputInformationEvent.java       | 41 ----------
 .../runtime/api/events/VertexManagerEvent.java  | 61 +++++++++++++++
 tez-api/src/main/proto/Events.proto             |  7 +-
 .../apache/tez/dag/app/dag/VertexScheduler.java |  2 +
 .../app/dag/event/DAGEventVertexCompleted.java  |  4 +-
 .../apache/tez/dag/app/dag/impl/DAGImpl.java    | 11 ++-
 .../dag/impl/ImmediateStartVertexScheduler.java |  5 ++
 .../dag/app/dag/impl/ShuffleVertexManager.java  | 77 ++++++++++---------
 .../apache/tez/dag/app/dag/impl/VertexImpl.java | 29 ++++++-
 .../tez/dag/app/dag/impl/TestDAGImpl.java       | 11 ++-
 .../dag/app/dag/impl/TestVertexScheduler.java   | 27 ++++---
 .../apache/tez/runtime/api/impl/EventType.java  |  4 +-
 .../apache/tez/runtime/api/impl/TezEvent.java   | 33 ++++----
 .../library/common/shuffle/impl/Fetcher.java    | 20 +++--
 .../library/common/shuffle/impl/Shuffle.java    | 31 +-------
 .../shuffle/impl/ShuffleInputEventHandler.java  | 26 +------
 .../common/shuffle/impl/ShuffleScheduler.java   | 80 +++++++-------------
 .../library/output/LocalOnFileSorterOutput.java |  2 +-
 .../library/output/OnFileSortedOutput.java      | 18 ++++-
 .../src/main/proto/ShufflePayloads.proto        |  4 +
 21 files changed, 255 insertions(+), 244 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/04d5e377/tez-api/src/main/java/org/apache/tez/common/TezJobConfig.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/common/TezJobConfig.java b/tez-api/src/main/java/org/apache/tez/common/TezJobConfig.java
index ace87ca..07602ea 100644
--- a/tez-api/src/main/java/org/apache/tez/common/TezJobConfig.java
+++ b/tez-api/src/main/java/org/apache/tez/common/TezJobConfig.java
@@ -171,12 +171,6 @@ public class TezJobConfig {
       "tez.runtime.shuffle.use.in-memory";
   public static final boolean DEFAULT_TEZ_RUNTIME_SHUFFLE_USE_IN_MEMORY = false;
 
-  // TODO NEWTEZ Remove these config parameters. Will be part of an event.
-  @Private
-  public static final String TEZ_RUNTIME_SHUFFLE_PARTITION_RANGE = 
-      "tez.runtime.shuffle.partition-range";
-  public static int TEZ_RUNTIME_SHUFFLE_PARTITION_RANGE_DEFAULT = 1;
-
   /**
    * 
    */

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/04d5e377/tez-api/src/main/java/org/apache/tez/runtime/api/events/InputInformationEvent.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/runtime/api/events/InputInformationEvent.java b/tez-api/src/main/java/org/apache/tez/runtime/api/events/InputInformationEvent.java
deleted file mode 100644
index 0322b75..0000000
--- a/tez-api/src/main/java/org/apache/tez/runtime/api/events/InputInformationEvent.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tez.runtime.api.events;
-
-import org.apache.tez.runtime.api.Event;
-
-/**
- * Event used to send user specific data from the user 
- * code in the AM to the task input
- */
-public class InputInformationEvent extends Event {
-
-  /**
-   * User Payload for this Event
-   */
-  private final byte[] userPayload;
-  public InputInformationEvent(byte[] userPayload) {
-    this.userPayload = userPayload;
-  }
-
-  public byte[] getUserPayload() {
-    return userPayload;
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/04d5e377/tez-api/src/main/java/org/apache/tez/runtime/api/events/VertexManagerEvent.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/runtime/api/events/VertexManagerEvent.java b/tez-api/src/main/java/org/apache/tez/runtime/api/events/VertexManagerEvent.java
new file mode 100644
index 0000000..84431bc
--- /dev/null
+++ b/tez-api/src/main/java/org/apache/tez/runtime/api/events/VertexManagerEvent.java
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.runtime.api.events;
+
+import org.apache.tez.runtime.api.Event;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * Event used to send information from a Task to the VertexManager for a vertex.
+ * This may be used to send statistics, such as samples, to the VertexManager for
+ * automatic plan reconfigurations based on observed statistics.
+ */
+public class VertexManagerEvent extends Event {
+
+  /**
+   * Vertex to which the event should be sent 
+   */
+  private final String targetVertexName;
+  
+  /**
+   * User payload to be sent
+   */
+  private final byte[] userPayload;
+  
+  /**
+   * Create a new VertexManagerEvent
+   * @param vertexName Name of the vertex to which the event should be sent
+   * @param userPayload This should not be modified since a reference is kept
+   */
+  public VertexManagerEvent(String vertexName, byte[] userPayload) {
+    Preconditions.checkArgument(vertexName != null);
+    Preconditions.checkArgument(userPayload != null);
+    this.targetVertexName = vertexName;
+    this.userPayload = userPayload;
+  }
+  
+  public String getTargetVertexName() {
+    return targetVertexName;
+  }
+  
+  public byte[] getUserPayload() {
+    return userPayload;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/04d5e377/tez-api/src/main/proto/Events.proto
----------------------------------------------------------------------
diff --git a/tez-api/src/main/proto/Events.proto b/tez-api/src/main/proto/Events.proto
index b91125d..d76708a 100644
--- a/tez-api/src/main/proto/Events.proto
+++ b/tez-api/src/main/proto/Events.proto
@@ -39,6 +39,7 @@ message InputFailedEventProto {
   optional int32 version = 4;
 }
 
-message InputInformationEventProto {
-  optional bytes user_payload = 1;
-}
+message VertexManagerEventProto {
+  optional string target_vertex_name = 1;
+  optional bytes user_payload = 2;
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/04d5e377/tez-dag/src/main/java/org/apache/tez/dag/app/dag/VertexScheduler.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/VertexScheduler.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/VertexScheduler.java
index 3789702..4656efd 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/VertexScheduler.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/VertexScheduler.java
@@ -20,10 +20,12 @@ package org.apache.tez.dag.app.dag;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.tez.dag.records.TezTaskAttemptID;
+import org.apache.tez.runtime.api.events.VertexManagerEvent;
 
 // Rename to VertexManager TEZ-364 and move to DAG API. Make abstract class.
 public interface VertexScheduler {
   void initialize(Configuration conf);
   void onVertexStarted();
   void onSourceTaskCompleted(TezTaskAttemptID attemptId);
+  void onVertexManagerEventReceived(VertexManagerEvent vmEvent);
 }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/04d5e377/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/DAGEventVertexCompleted.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/DAGEventVertexCompleted.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/DAGEventVertexCompleted.java
index 80c7f42..3d4367d 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/DAGEventVertexCompleted.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/DAGEventVertexCompleted.java
@@ -23,8 +23,8 @@ import org.apache.tez.dag.records.TezVertexID;
 
 public class DAGEventVertexCompleted extends DAGEvent {
 
-  private TezVertexID vertexId;
-  private VertexState vertexState;
+  private final TezVertexID vertexId;
+  private final VertexState vertexState;
 
   public DAGEventVertexCompleted(TezVertexID vertexId, VertexState vertexState) {
     super(vertexId.getDAGId(), DAGEventType.DAG_VERTEX_COMPLETED);

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/04d5e377/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
index b4c9981..c81b835 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
@@ -24,8 +24,10 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.EnumSet;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
@@ -136,6 +138,7 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
   private TezCounters dagCounters = new TezCounters();
   private Object fullCountersLock = new Object();
   private TezCounters fullCounters = null;
+  private Set<TezVertexID> reRunningVertices = new HashSet<TezVertexID>();
 
   public final Configuration conf;
   private final DAGPlan jobPlan;
@@ -1132,8 +1135,11 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
       Vertex vertex = job.vertices.get(vertexEvent.getVertexId());
       job.numCompletedVertices++;
       if (vertexEvent.getVertexState() == VertexState.SUCCEEDED) {
+        if (!job.reRunningVertices.contains(vertex.getVertexId())) {
+          // vertex succeeded for the first time 
+          job.dagScheduler.vertexCompleted(vertex);
+        }
         job.vertexSucceeded(vertex);
-        job.dagScheduler.vertexCompleted(vertex);
       }
       else if (vertexEvent.getVertexState() == VertexState.FAILED) {
         job.enactKill(DAGTerminationCause.VERTEX_FAILURE, VertexTerminationCause.OTHER_VERTEX_FAILURE);
@@ -1144,6 +1150,8 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
         job.vertexKilled(vertex);
         forceTransitionToKillWait = true;
       }
+      
+      job.reRunningVertices.remove(vertex.getVertexId());
 
       LOG.info("Vertex " + vertex.getVertexId() + " completed."
           + ", numCompletedVertices=" + job.numCompletedVertices
@@ -1190,6 +1198,7 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
   }
 
   private void vertexReRunning(Vertex vertex) {
+    reRunningVertices.add(vertex.getVertexId());
     numSuccessfulVertices--;
     addDiagnostic("Vertex re-running " + vertex.getVertexId());
     // TODO: Metrics

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/04d5e377/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/ImmediateStartVertexScheduler.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/ImmediateStartVertexScheduler.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/ImmediateStartVertexScheduler.java
index b79a426..2d94006 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/ImmediateStartVertexScheduler.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/ImmediateStartVertexScheduler.java
@@ -22,6 +22,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.tez.dag.app.dag.Vertex;
 import org.apache.tez.dag.app.dag.VertexScheduler;
 import org.apache.tez.dag.records.TezTaskAttemptID;
+import org.apache.tez.runtime.api.events.VertexManagerEvent;
 
 /**
  * Starts all tasks immediately on vertex start
@@ -46,4 +47,8 @@ public class ImmediateStartVertexScheduler implements VertexScheduler {
   public void initialize(Configuration conf) {    
   }
 
+  @Override
+  public void onVertexManagerEventReceived(VertexManagerEvent vmEvent) {
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/04d5e377/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/ShuffleVertexManager.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/ShuffleVertexManager.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/ShuffleVertexManager.java
index e039c72..d633d0c 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/ShuffleVertexManager.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/ShuffleVertexManager.java
@@ -18,7 +18,6 @@
 
 package org.apache.tez.dag.app.dag.impl;
 
-import java.io.IOException;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -29,7 +28,6 @@ import java.util.Set;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.tez.common.TezJobConfig;
 import org.apache.tez.dag.api.TezConfiguration;
 import org.apache.tez.dag.api.TezUncheckedException;
 import org.apache.tez.dag.api.EdgeProperty.DataMovementType;
@@ -39,10 +37,13 @@ import org.apache.tez.dag.app.dag.VertexScheduler;
 import org.apache.tez.dag.records.TezTaskAttemptID;
 import org.apache.tez.dag.records.TezTaskID;
 import org.apache.tez.dag.records.TezVertexID;
-import org.apache.tez.mapreduce.hadoop.MRHelpers;
 import org.apache.tez.runtime.api.events.DataMovementEvent;
 import org.apache.tez.runtime.api.events.InputFailedEvent;
 import org.apache.tez.runtime.api.events.InputReadErrorEvent;
+import org.apache.tez.runtime.api.events.VertexManagerEvent;
+import org.apache.tez.runtime.library.shuffle.impl.ShuffleUserPayloads.VertexManagerEventPayloadProto;
+
+import com.google.protobuf.InvalidProtocolBufferException;
 
 /**
  * Starts scheduling tasks when number of completed source tasks crosses 
@@ -64,6 +65,7 @@ public class ShuffleVertexManager implements VertexScheduler {
   
   int numSourceTasks = 0;
   int numSourceTasksCompleted = 0;
+  int numVertexManagerEventsReceived = 0;
   ArrayList<TezTaskID> pendingTasks;
   int totalTasksToSchedule = 0;
   HashMap<TezVertexID, Vertex> bipartiteSources = 
@@ -196,21 +198,34 @@ public class ShuffleVertexManager implements VertexScheduler {
       if (completedSourceTasks.add(srcTaskId)) {
         // source task has completed
         ++numSourceTasksCompleted;
-        if (enableAutoParallelism) {
-          // save output size
-          // TODO TEZ-481
-          long sourceTaskOutputSize = 100000000l;//sourceTaskAttempt.getDataSize();
-          if (LOG.isDebugEnabled()) {
-            LOG.debug("Source task: " + srcAttemptId
-                + " finished with output size: " + sourceTaskOutputSize);
-          }
-          completedSourceTasksOutputSize += sourceTaskOutputSize;
-        }
       }
       schedulePendingTasks();
     }
   }
   
+  @Override
+  public void onVertexManagerEventReceived(VertexManagerEvent vmEvent) {
+    // TODO handle duplicates from retries
+    if (enableAutoParallelism) {
+      // save output size
+      VertexManagerEventPayloadProto proto;
+      try {
+        proto = VertexManagerEventPayloadProto.parseFrom(vmEvent.getUserPayload());
+      } catch (InvalidProtocolBufferException e) {
+        throw new TezUncheckedException(e);
+      }
+      long sourceTaskOutputSize = proto.getOutputSize();
+      numVertexManagerEventsReceived++;
+      completedSourceTasksOutputSize += sourceTaskOutputSize;
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Received info of output size: " + sourceTaskOutputSize 
+            + " numInfoReceived: " + numVertexManagerEventsReceived
+            + " total output size: " + completedSourceTasksOutputSize);
+      }
+    }
+    
+  }
+  
   void updatePendingTasks() {
     pendingTasks.clear();
     pendingTasks.addAll(managedVertex.getTasks().keySet());
@@ -230,9 +245,14 @@ public class ShuffleVertexManager implements VertexScheduler {
     if(numSourceTasksCompleted == 0) {
       return;
     }
+    
+    if(numVertexManagerEventsReceived == 0) {
+      return;
+    }
+    
     int currentParallelism = pendingTasks.size();
     long expectedTotalSourceTasksOutputSize = 
-        (numSourceTasks*completedSourceTasksOutputSize)/numSourceTasksCompleted;
+        (numSourceTasks*completedSourceTasksOutputSize)/numVertexManagerEventsReceived;
     int desiredTaskParallelism = 
         (int)(expectedTotalSourceTasksOutputSize/desiredTaskInputDataSize);
     if(desiredTaskParallelism < minTaskParallelism) {
@@ -256,42 +276,25 @@ public class ShuffleVertexManager implements VertexScheduler {
     
     int finalTaskParallelism = (remainderRangeForLastShuffler > 0) ?
           (numShufflersWithBaseRange + 1) : (numShufflersWithBaseRange);
-    
+
     if(finalTaskParallelism < currentParallelism) {
       // final parallelism is less than actual parallelism
       LOG.info("Reducing parallelism for vertex: " + managedVertex.getVertexId() 
           + " to " + finalTaskParallelism + " from " + pendingTasks.size() 
           + " . Expected output: " + expectedTotalSourceTasksOutputSize 
           + " based on actual output: " + completedSourceTasksOutputSize
-          + " from " + numSourceTasksCompleted + " completed source tasks. "
+          + " from " + numVertexManagerEventsReceived + " vertex manager events. "
           + " desiredTaskInputSize: " + desiredTaskInputDataSize);
-
-      List<byte[]> taskConfs = new ArrayList<byte[]>(finalTaskParallelism);
-      try {
-        Configuration taskConf = new Configuration(false);
-        taskConf.setInt(TezJobConfig.TEZ_RUNTIME_SHUFFLE_PARTITION_RANGE,
-            basePartitionRange);
-        // create event user payload to inform the task
-        for (int i = 0; i < numShufflersWithBaseRange; ++i) {
-          taskConfs.add(MRHelpers.createUserPayloadFromConf(taskConf));
-        }
-        if(finalTaskParallelism > numShufflersWithBaseRange) {
-          taskConf.setInt(TezJobConfig.TEZ_RUNTIME_SHUFFLE_PARTITION_RANGE,
-              remainderRangeForLastShuffler);
-          taskConfs.add(MRHelpers.createUserPayloadFromConf(taskConf));
-        }
-      } catch (IOException e) {
-        throw new TezUncheckedException(e);
-      }
       
       Map<Vertex, EdgeManager> edgeManagers = new HashMap<Vertex, EdgeManager>(
           bipartiteSources.size());
       for(Vertex vertex : bipartiteSources.values()) {
         // use currentParallelism for numSourceTasks to maintain original state
         // for the source tasks
-        edgeManagers.put(vertex, new CustomShuffleEdgeManager(currentParallelism,
-            finalTaskParallelism, basePartitionRange,
-            remainderRangeForLastShuffler));
+        edgeManagers.put(vertex, new CustomShuffleEdgeManager(
+            currentParallelism, finalTaskParallelism, basePartitionRange,
+            ((remainderRangeForLastShuffler > 0) ?
+                remainderRangeForLastShuffler : basePartitionRange)));
       }
       
       managedVertex.setParallelism(finalTaskParallelism, edgeManagers);

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/04d5e377/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
index bf10322..96f0785 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
@@ -114,6 +114,7 @@ import org.apache.tez.runtime.api.events.DataMovementEvent;
 import org.apache.tez.runtime.api.events.InputFailedEvent;
 import org.apache.tez.runtime.api.events.TaskAttemptFailedEvent;
 import org.apache.tez.runtime.api.events.TaskStatusUpdateEvent;
+import org.apache.tez.runtime.api.events.VertexManagerEvent;
 import org.apache.tez.runtime.api.impl.EventMetaData;
 import org.apache.tez.runtime.api.impl.InputSpec;
 import org.apache.tez.runtime.api.impl.OutputSpec;
@@ -211,6 +212,10 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
           .addTransition(VertexState.INITED, VertexState.RUNNING,
               VertexEventType.V_START,
               new StartTransition())
+          .addTransition(VertexState.INITED,
+              VertexState.INITED, VertexEventType.V_ROUTE_EVENT,
+              ROUTE_EVENT_TRANSITION)
+
 
           .addTransition(VertexState.INITED, VertexState.KILLED,
               VertexEventType.V_TERMINATE,
@@ -260,6 +265,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
           // Ignore-able events
           .addTransition(VertexState.TERMINATING, VertexState.TERMINATING,
               EnumSet.of(VertexEventType.V_TERMINATE,
+                  VertexEventType.V_ROUTE_EVENT,
                   VertexEventType.V_SOURCE_TASK_ATTEMPT_COMPLETED,
                   VertexEventType.V_TASK_ATTEMPT_COMPLETED,
                   VertexEventType.V_TASK_RESCHEDULED))
@@ -278,6 +284,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
           .addTransition(VertexState.SUCCEEDED, VertexState.SUCCEEDED,
               EnumSet.of(VertexEventType.V_TERMINATE,
                   VertexEventType.V_TASK_ATTEMPT_COMPLETED,
+                  VertexEventType.V_ROUTE_EVENT,
                   VertexEventType.V_TASK_COMPLETED))
 
           // Transitions from FAILED state
@@ -289,6 +296,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
           .addTransition(VertexState.FAILED, VertexState.FAILED,
               EnumSet.of(VertexEventType.V_TERMINATE,
                   VertexEventType.V_TASK_RESCHEDULED,
+                  VertexEventType.V_ROUTE_EVENT,
                   VertexEventType.V_TASK_ATTEMPT_COMPLETED,
                   VertexEventType.V_TASK_COMPLETED))
 
@@ -302,6 +310,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
               EnumSet.of(VertexEventType.V_TERMINATE,
                   VertexEventType.V_SOURCE_VERTEX_STARTED,
                   VertexEventType.V_START,
+                  VertexEventType.V_ROUTE_EVENT,
                   VertexEventType.V_TASK_RESCHEDULED,
                   VertexEventType.V_TASK_ATTEMPT_COMPLETED,
                   VertexEventType.V_TASK_COMPLETED))
@@ -313,6 +322,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
               EnumSet.of(VertexEventType.V_INIT,
                   VertexEventType.V_SOURCE_VERTEX_STARTED,
                   VertexEventType.V_START,
+                  VertexEventType.V_ROUTE_EVENT,
                   VertexEventType.V_TERMINATE,
                   VertexEventType.V_TASK_COMPLETED,
                   VertexEventType.V_TASK_ATTEMPT_COMPLETED,
@@ -1391,10 +1401,10 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
         LOG.info("Vertex: " + vertex.getName() + " routing event: "
             + tezEvent.getEventType());
         EventMetaData sourceMeta = tezEvent.getSourceInfo();
-        checkEventSourceMetadata(vertex, sourceMeta);
         switch(tezEvent.getEventType()) {
         case DATA_MOVEMENT_EVENT:
           {
+            checkEventSourceMetadata(vertex, sourceMeta);
             TezTaskAttemptID srcTaId = sourceMeta.getTaskAttemptID();
             DataMovementEvent dmEvent = (DataMovementEvent) tezEvent.getEvent();
             dmEvent.setVersion(srcTaId.getId());
@@ -1403,8 +1413,21 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
             destEdge.sendTezEventToDestinationTasks(tezEvent);
           }
           break;
+        case VERTEX_MANAGER_EVENT:
+        {
+          VertexManagerEvent vmEvent = (VertexManagerEvent) tezEvent.getEvent();
+          Vertex target = vertex.getDAG().getVertex(vmEvent.getTargetVertexName());
+          if (target == vertex) {
+            vertex.vertexScheduler.onVertexManagerEventReceived(vmEvent);
+          } else {
+            vertex.eventHandler.handle(new VertexEventRouteEvent(target
+                .getVertexId(), Collections.singletonList(tezEvent)));
+          }
+        }
+          break;
         case INPUT_FAILED_EVENT:
         {
+          checkEventSourceMetadata(vertex, sourceMeta);
           TezTaskAttemptID srcTaId = sourceMeta.getTaskAttemptID();
           InputFailedEvent ifEvent = (InputFailedEvent) tezEvent.getEvent();
           ifEvent.setVersion(srcTaId.getId());
@@ -1415,6 +1438,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
         break;
         case INPUT_READ_ERROR_EVENT:
           {
+            checkEventSourceMetadata(vertex, sourceMeta);
             Edge srcEdge = vertex.sourceVertices.get(vertex.getDAG().getVertex(
                 sourceMeta.getEdgeVertexName()));
             srcEdge.sendTezEventToSourceTasks(tezEvent);
@@ -1422,6 +1446,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
           break;
         case TASK_STATUS_UPDATE_EVENT:
           {
+            checkEventSourceMetadata(vertex, sourceMeta);
             TaskStatusUpdateEvent sEvent =
                 (TaskStatusUpdateEvent) tezEvent.getEvent();
             vertex.getEventHandler().handle(
@@ -1431,6 +1456,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
           break;
         case TASK_ATTEMPT_COMPLETED_EVENT:
           {
+            checkEventSourceMetadata(vertex, sourceMeta);
             vertex.getEventHandler().handle(
                 new TaskAttemptEvent(sourceMeta.getTaskAttemptID(),
                     TaskAttemptEventType.TA_DONE));
@@ -1438,6 +1464,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
           break;
         case TASK_ATTEMPT_FAILED_EVENT:
           {
+            checkEventSourceMetadata(vertex, sourceMeta);
             TaskAttemptFailedEvent taskFailedEvent =
                 (TaskAttemptFailedEvent) tezEvent.getEvent();
             vertex.getEventHandler().handle(

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/04d5e377/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java
index 1a36c26..55b846d 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java
@@ -18,8 +18,7 @@
 
 package org.apache.tez.dag.app.dag.impl;
 
-import static org.mockito.Mockito.doReturn;
-import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.*;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -46,6 +45,7 @@ import org.apache.tez.dag.api.records.DAGProtos.VertexPlan;
 import org.apache.tez.dag.app.AppContext;
 import org.apache.tez.dag.app.TaskAttemptListener;
 import org.apache.tez.dag.app.TaskHeartbeatHandler;
+import org.apache.tez.dag.app.dag.DAGScheduler;
 import org.apache.tez.dag.app.dag.DAGTerminationCause;
 import org.apache.tez.dag.app.dag.DAGState;
 import org.apache.tez.dag.app.dag.Task;
@@ -526,7 +526,7 @@ public class TestDAGImpl {
     dispatcher.init(conf);
     dispatcher.start();
   }
-
+  
   @After
   public void teardown() {
     dispatcher.await();
@@ -606,6 +606,7 @@ public class TestDAGImpl {
   @Test
   public void testVertexReRunning() {
     initDAG(dag);
+    dag.dagScheduler = mock(DAGScheduler.class);
     startDAG(dag);
     dispatcher.await();
 
@@ -620,6 +621,7 @@ public class TestDAGImpl {
     Assert.assertEquals(VertexState.SUCCEEDED, v.getState());
     Assert.assertEquals(1, dag.getSuccessfulVertices());
     Assert.assertEquals(1, dag.numCompletedVertices);
+    verify(dag.dagScheduler, times(1)).vertexCompleted(v);
     
     dispatcher.getEventHandler().handle(
         new VertexEventTaskReschedule(new TezTaskID(vId, 0)));
@@ -635,6 +637,9 @@ public class TestDAGImpl {
     Assert.assertEquals(VertexState.SUCCEEDED, v.getState());
     Assert.assertEquals(1, dag.getSuccessfulVertices());
     Assert.assertEquals(1, dag.numCompletedVertices);
+    
+    // re-completion is not notified again
+    verify(dag.dagScheduler, times(1)).vertexCompleted(v);
 
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/04d5e377/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexScheduler.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexScheduler.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexScheduler.java
index b2e13e2..528d03b 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexScheduler.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexScheduler.java
@@ -29,7 +29,6 @@ import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.tez.dag.api.EdgeProperty;
 import org.apache.tez.dag.api.InputDescriptor;
 import org.apache.tez.dag.api.OutputDescriptor;
-import org.apache.tez.dag.api.ProcessorDescriptor;
 import org.apache.tez.dag.api.TezConfiguration;
 import org.apache.tez.dag.api.TezUncheckedException;
 import org.apache.tez.dag.api.EdgeProperty.SchedulingType;
@@ -40,9 +39,9 @@ import org.apache.tez.dag.records.TezDAGID;
 import org.apache.tez.dag.records.TezTaskAttemptID;
 import org.apache.tez.dag.records.TezTaskID;
 import org.apache.tez.dag.records.TezVertexID;
-import org.apache.tez.mapreduce.hadoop.MRHelpers;
+import org.apache.tez.runtime.api.events.VertexManagerEvent;
+import org.apache.tez.runtime.library.shuffle.impl.ShuffleUserPayloads.VertexManagerEventPayloadProto;
 import org.junit.Assert;
-import org.junit.Ignore;
 import org.junit.Test;
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
@@ -53,7 +52,6 @@ public class TestVertexScheduler {
 
   @SuppressWarnings({ "unchecked", "rawtypes" })
   @Test(timeout = 5000)
-  @Ignore // TODO TEZ-481
   public void testShuffleVertexManagerAutoParallelism() throws IOException {
     Configuration conf = new Configuration();
     conf.setBoolean(
@@ -98,7 +96,6 @@ public class TestVertexScheduler {
     when(mockManagedVertex.getVertexId()).thenReturn(mockManagedVertexId);
     when(mockManagedVertex.getInputVertices()).thenReturn(mockInputVertices);
     
-    
     mockInputVertices.put(mockSrcVertex1, new Edge(eProp1, mockEventHandler));
     mockInputVertices.put(mockSrcVertex2, new Edge(eProp2, mockEventHandler));
     mockInputVertices.put(mockSrcVertex3, new Edge(eProp3, mockEventHandler));
@@ -162,27 +159,29 @@ public class TestVertexScheduler {
     TezTaskAttemptID mockSrcAttemptId31 = 
         new TezTaskAttemptID(new TezTaskID(mockSrcVertexId3, 0), 0);
 
+    byte[] payload =
+        VertexManagerEventPayloadProto.newBuilder().setOutputSize(5000L).build().toByteArray();
+    VertexManagerEvent vmEvent = new VertexManagerEvent("Vertex", payload);
     // parallelism not change due to large data size
-    //when(mockEvent.getDataSize()).thenReturn(5000L);
     scheduler = createScheduler(conf, mockManagedVertex, 0.1f, 0.1f);
     scheduler.onVertexStarted();
     Assert.assertTrue(scheduler.pendingTasks.size() == 4); // no tasks scheduled
     Assert.assertTrue(scheduler.numSourceTasks == 4);
+    scheduler.onVertexManagerEventReceived(vmEvent);
     scheduler.onSourceTaskCompleted(mockSrcAttemptId11);
     // managedVertex tasks reduced
     verify(mockManagedVertex, times(0)).setParallelism(anyInt(), anyMap());
     Assert.assertEquals(0, scheduler.pendingTasks.size()); // all tasks scheduled
     Assert.assertEquals(4, scheduledTasks.size());
-    Assert.assertEquals(1, scheduler.numSourceTasksCompleted);
+    Assert.assertEquals(1, scheduler.numSourceTasksCompleted); // TODO
     Assert.assertEquals(5000L, scheduler.completedSourceTasksOutputSize);
     
+    
     // parallelism changed due to small data size
-    //when(mockEvent.getDataSize()).thenReturn(500L);
     scheduledTasks.clear();
-    Configuration procConf = new Configuration();
-    ProcessorDescriptor procDesc = new ProcessorDescriptor("REDUCE");
-    procDesc.setUserPayload(MRHelpers.createUserPayloadFromConf(procConf));
-    when(mockManagedVertex.getProcessorDescriptor()).thenReturn(procDesc);
+    payload =
+        VertexManagerEventPayloadProto.newBuilder().setOutputSize(500L).build().toByteArray();
+    vmEvent = new VertexManagerEvent("Vertex", payload);
     
     scheduler = createScheduler(conf, mockManagedVertex, 0.5f, 0.5f);
     scheduler.onVertexStarted();
@@ -193,10 +192,12 @@ public class TestVertexScheduler {
     Assert.assertEquals(4, scheduler.pendingTasks.size()); // no tasks scheduled
     Assert.assertEquals(4, scheduler.numSourceTasks);
     Assert.assertEquals(0, scheduler.numSourceTasksCompleted);
+    scheduler.onVertexManagerEventReceived(vmEvent);
     scheduler.onSourceTaskCompleted(mockSrcAttemptId11);
     Assert.assertEquals(4, scheduler.pendingTasks.size());
     Assert.assertEquals(0, scheduledTasks.size()); // no tasks scheduled
     Assert.assertEquals(1, scheduler.numSourceTasksCompleted);
+    Assert.assertEquals(1, scheduler.numVertexManagerEventsReceived);
     Assert.assertEquals(500L, scheduler.completedSourceTasksOutputSize);
     // ignore duplicate completion
     scheduler.onSourceTaskCompleted(mockSrcAttemptId11);
@@ -205,6 +206,7 @@ public class TestVertexScheduler {
     Assert.assertEquals(1, scheduler.numSourceTasksCompleted);
     Assert.assertEquals(500L, scheduler.completedSourceTasksOutputSize);
     
+    scheduler.onVertexManagerEventReceived(vmEvent);
     scheduler.onSourceTaskCompleted(mockSrcAttemptId12);
     // managedVertex tasks reduced
     verify(mockManagedVertex).setParallelism(eq(2), anyMap());
@@ -215,6 +217,7 @@ public class TestVertexScheduler {
     Assert.assertTrue(scheduledTasks.contains(mockTaskId1));
     Assert.assertTrue(scheduledTasks.contains(mockTaskId2));
     Assert.assertEquals(2, scheduler.numSourceTasksCompleted);
+    Assert.assertEquals(2, scheduler.numVertexManagerEventsReceived);
     Assert.assertEquals(1000L, scheduler.completedSourceTasksOutputSize);
     
     // more completions dont cause recalculation of parallelism

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/04d5e377/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/EventType.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/EventType.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/EventType.java
index 81ff5fc..b42096c 100644
--- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/EventType.java
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/EventType.java
@@ -24,6 +24,6 @@ public enum EventType {
   DATA_MOVEMENT_EVENT,
   INPUT_READ_ERROR_EVENT,
   INPUT_FAILED_EVENT,
-  INTPUT_INFORMATION_EVENT,
-  TASK_STATUS_UPDATE_EVENT
+  TASK_STATUS_UPDATE_EVENT,
+  VERTEX_MANAGER_EVENT
 }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/04d5e377/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezEvent.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezEvent.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezEvent.java
index e195cf9..03e41f4 100644
--- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezEvent.java
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezEvent.java
@@ -28,14 +28,14 @@ import org.apache.tez.runtime.api.Event;
 import org.apache.tez.runtime.api.events.DataMovementEvent;
 import org.apache.tez.runtime.api.events.EventProtos.DataMovementEventProto;
 import org.apache.tez.runtime.api.events.EventProtos.InputFailedEventProto;
-import org.apache.tez.runtime.api.events.EventProtos.InputInformationEventProto;
 import org.apache.tez.runtime.api.events.EventProtos.InputReadErrorEventProto;
+import org.apache.tez.runtime.api.events.EventProtos.VertexManagerEventProto;
 import org.apache.tez.runtime.api.events.InputFailedEvent;
-import org.apache.tez.runtime.api.events.InputInformationEvent;
 import org.apache.tez.runtime.api.events.InputReadErrorEvent;
 import org.apache.tez.runtime.api.events.TaskAttemptCompletedEvent;
 import org.apache.tez.runtime.api.events.TaskAttemptFailedEvent;
 import org.apache.tez.runtime.api.events.TaskStatusUpdateEvent;
+import org.apache.tez.runtime.api.events.VertexManagerEvent;
 import org.apache.tez.runtime.internals.api.events.SystemEventProtos.TaskAttemptCompletedEventProto;
 import org.apache.tez.runtime.internals.api.events.SystemEventProtos.TaskAttemptFailedEventProto;
 
@@ -59,14 +59,14 @@ public class TezEvent implements Writable {
     this.setSourceInfo(sourceInfo);
     if (event instanceof DataMovementEvent) {
       eventType = EventType.DATA_MOVEMENT_EVENT;
+    } else if (event instanceof VertexManagerEvent) {
+      eventType = EventType.VERTEX_MANAGER_EVENT;
     } else if (event instanceof InputReadErrorEvent) {
       eventType = EventType.INPUT_READ_ERROR_EVENT;
     } else if (event instanceof TaskAttemptFailedEvent) {
       eventType = EventType.TASK_ATTEMPT_FAILED_EVENT;
     } else if (event instanceof TaskAttemptCompletedEvent) {
       eventType = EventType.TASK_ATTEMPT_COMPLETED_EVENT;
-    } else if (event instanceof InputInformationEvent) {
-      eventType = EventType.INTPUT_INFORMATION_EVENT;
     } else if (event instanceof InputFailedEvent) {
       eventType = EventType.INPUT_FAILED_EVENT;
     } else if (event instanceof TaskStatusUpdateEvent) {
@@ -123,6 +123,13 @@ public class TezEvent implements Writable {
           .setUserPayload(ByteString.copyFrom(dmEvt.getUserPayload()))
           .build().toByteArray();
         break;
+      case VERTEX_MANAGER_EVENT:
+        VertexManagerEvent vmEvt = (VertexManagerEvent) event;
+        eventBytes = VertexManagerEventProto.newBuilder()
+          .setTargetVertexName(vmEvt.getTargetVertexName())
+          .setUserPayload(ByteString.copyFrom(vmEvt.getUserPayload()))
+          .build().toByteArray();
+        break;
       case INPUT_READ_ERROR_EVENT:
         InputReadErrorEvent ideEvt = (InputReadErrorEvent) event;
         eventBytes = InputReadErrorEventProto.newBuilder()
@@ -146,11 +153,7 @@ public class TezEvent implements Writable {
             .setSourceIndex(ifEvt.getSourceIndex())
             .setTargetIndex(ifEvt.getTargetIndex())
             .setVersion(ifEvt.getVersion()).build().toByteArray();
-      case INTPUT_INFORMATION_EVENT:
-        InputInformationEvent iEvt = (InputInformationEvent) event;
-        eventBytes = InputInformationEventProto.newBuilder()
-            .setUserPayload(ByteString.copyFrom(iEvt.getUserPayload()))
-            .build().toByteArray();
+        break;
       default:
         throw new TezUncheckedException("Unknown TezEvent"
            + ", type=" + eventType);
@@ -182,6 +185,12 @@ public class TezEvent implements Writable {
             dmProto.getTargetIndex(),
             dmProto.getUserPayload().toByteArray());
         break;
+      case VERTEX_MANAGER_EVENT:
+        VertexManagerEventProto vmProto =
+            VertexManagerEventProto.parseFrom(eventBytes);
+        event = new VertexManagerEvent(vmProto.getTargetVertexName(),
+            vmProto.getUserPayload().toByteArray());
+        break;
       case INPUT_READ_ERROR_EVENT:
         InputReadErrorEventProto ideProto =
             InputReadErrorEventProto.parseFrom(eventBytes);
@@ -202,12 +211,6 @@ public class TezEvent implements Writable {
         event = new InputFailedEvent(ifProto.getSourceIndex(),
             ifProto.getTargetIndex(), ifProto.getVersion());
         break;
-      case INTPUT_INFORMATION_EVENT:
-        InputInformationEventProto infoProto =
-            InputInformationEventProto.parseFrom(eventBytes);
-        event = new InputInformationEvent(
-            infoProto.getUserPayload().toByteArray());
-        break;
       default:
         throw new TezUncheckedException("Unknown TezEvent"
            + ", type=" + eventType);

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/04d5e377/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/Fetcher.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/Fetcher.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/Fetcher.java
index f5d1802..7741122 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/Fetcher.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/Fetcher.java
@@ -359,8 +359,8 @@ class Fetcher extends Thread {
       try {
         ShuffleHeader header = new ShuffleHeader();
         header.readFields(input);
-        String pathComponent = header.mapId;
-        srcAttemptId = scheduler.getIdentifierForPathComponent(pathComponent);
+        srcAttemptId = 
+            scheduler.getIdentifierForFetchedOutput(header.mapId, header.forReduce);
         compressedLength = header.compressedLength;
         decompressedLength = header.uncompressedLength;
         forReduce = header.forReduce;
@@ -456,15 +456,13 @@ class Fetcher extends Thread {
       return false;
     }
     
-    int reduceStartId = shuffle.getReduceStartId();
-    int reduceRange = shuffle.getReduceRange();
-    if (forReduce < reduceStartId || forReduce >= reduceStartId+reduceRange) {
-      wrongReduceErrs.increment(1);
-      LOG.warn(getName() + " data for the wrong reduce map: " +
-               srcAttemptId + " len: " + compressedLength + " decomp len: " +
-               decompressedLength + " for reduce " + forReduce);
-      return false;
-    }
+//    if (forReduce < reduceStartId || forReduce >= reduceStartId+reduceRange) {
+//      wrongReduceErrs.increment(1);
+//      LOG.warn(getName() + " data for the wrong reduce map: " +
+//               srcAttemptId + " len: " + compressedLength + " decomp len: " +
+//               decompressedLength + " for reduce " + forReduce);
+//      return false;
+//    }
 
     // Sanity check
     if (!remaining.contains(srcAttemptId)) {

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/04d5e377/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/Shuffle.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/Shuffle.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/Shuffle.java
index 8689d11..acf987a 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/Shuffle.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/Shuffle.java
@@ -22,7 +22,6 @@ import java.util.List;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.FutureTask;
-import java.util.concurrent.atomic.AtomicInteger;
 
 import javax.crypto.SecretKey;
 
@@ -65,10 +64,7 @@ public class Shuffle implements ExceptionReporter {
   private Throwable throwable = null;
   private String throwingThreadName = null;
   private final int numInputs;
-  private final AtomicInteger reduceStartId;
   private final SecretKey jobTokenSecret;
-  private AtomicInteger reduceRange = new AtomicInteger(
-      TezJobConfig.TEZ_RUNTIME_SHUFFLE_PARTITION_RANGE_DEFAULT);
 
   private FutureTask<TezRawKeyValueIterator> runShuffleFuture;
 
@@ -105,9 +101,7 @@ public class Shuffle implements ExceptionReporter {
     TezCounter mergedMapOutputsCounter =
         inputContext.getCounters().findCounter(TaskCounter.MERGED_MAP_OUTPUTS);
     
-    reduceStartId = new AtomicInteger(inputContext.getTaskIndex());
-    LOG.info("Shuffle assigned reduce start id: " + reduceStartId.get()
-        + " with default reduce range: " + reduceRange.get());
+    LOG.info("Shuffle assigned with " + numInputs + " inputs");
 
     scheduler = new ShuffleScheduler(
           this.inputContext,
@@ -119,7 +113,6 @@ public class Shuffle implements ExceptionReporter {
           failedShuffleCounter);
     eventHandler= new ShuffleInputEventHandler(
           inputContext,
-          this,
           scheduler);
     merger = new MergeManager(
           this.conf,
@@ -233,14 +226,6 @@ public class Shuffle implements ExceptionReporter {
     }
   }
   
-  public int getReduceStartId() {
-    return reduceStartId.get();
-  }
-  
-  public int getReduceRange() {
-    return reduceRange.get();
-  }
-  
   public synchronized void reportException(Throwable t) {
     if (throwable == null) {
       throwable = t;
@@ -261,18 +246,4 @@ public class Shuffle implements ExceptionReporter {
     }
   }
 
-  public void setPartitionRange(int range) {
-    if (range == reduceRange.get()) {
-      return;
-    }
-    if (reduceRange.compareAndSet(
-        TezJobConfig.TEZ_RUNTIME_SHUFFLE_PARTITION_RANGE_DEFAULT, range)) {
-      LOG.info("Reduce range set to: " + range);
-    } else {
-      TezUncheckedException e = 
-          new TezUncheckedException("Reduce range can be set only once.");
-      reportException(e);
-      throw e; 
-    }
-  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/04d5e377/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleInputEventHandler.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleInputEventHandler.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleInputEventHandler.java
index 8b323b5..d731a46 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleInputEventHandler.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleInputEventHandler.java
@@ -28,12 +28,9 @@ import org.apache.tez.runtime.api.Event;
 import org.apache.tez.runtime.api.TezInputContext;
 import org.apache.tez.runtime.api.events.DataMovementEvent;
 import org.apache.tez.runtime.api.events.InputFailedEvent;
-import org.apache.tez.runtime.api.events.InputInformationEvent;
 import org.apache.tez.runtime.library.common.InputAttemptIdentifier;
 import org.apache.tez.runtime.library.shuffle.impl.ShuffleUserPayloads.DataMovementEventPayloadProto;
-import org.apache.tez.runtime.library.shuffle.impl.ShuffleUserPayloads.InputInformationEventPayloadProto;
 
-import com.google.common.base.Preconditions;
 import com.google.protobuf.InvalidProtocolBufferException;
 
 public class ShuffleInputEventHandler {
@@ -42,15 +39,12 @@ public class ShuffleInputEventHandler {
 
   private final ShuffleScheduler scheduler;
   private final TezInputContext inputContext;
-  private final Shuffle shuffle;
 
   private int maxMapRuntime = 0;
-  private boolean shuffleRangeSet = false;
   
   public ShuffleInputEventHandler(TezInputContext inputContext,
-      Shuffle shuffle, ShuffleScheduler scheduler) {
+      ShuffleScheduler scheduler) {
     this.inputContext = inputContext;
-    this.shuffle = shuffle;
     this.scheduler = scheduler;
   }
 
@@ -62,28 +56,13 @@ public class ShuffleInputEventHandler {
   
   
   private void handleEvent(Event event) {
-    if (event instanceof InputInformationEvent) {
-      processInputInformationEvent((InputInformationEvent) event);
-    }
-    else if (event instanceof DataMovementEvent) {
+    if (event instanceof DataMovementEvent) {
       processDataMovementEvent((DataMovementEvent) event);      
     } else if (event instanceof InputFailedEvent) {
       processTaskFailedEvent((InputFailedEvent) event);
     }
   }
 
-  private void processInputInformationEvent(InputInformationEvent iiEvent) {
-    InputInformationEventPayloadProto inputInfoPayload;
-    try {
-      inputInfoPayload = InputInformationEventPayloadProto.parseFrom(iiEvent.getUserPayload());
-    } catch (InvalidProtocolBufferException e) {
-      throw new TezUncheckedException("Unable to parse InputInformationEvent payload", e);
-    }
-    int partitionRange = inputInfoPayload.getPartitionRange();
-    shuffle.setPartitionRange(partitionRange);
-    this.shuffleRangeSet = true;
-  }
-
   private void processDataMovementEvent(DataMovementEvent dmEvent) {
     // FIXME TODO NEWTEZ
     // Preconditions.checkState(shuffleRangeSet == true, "Shuffle Range must be set before a DataMovementEvent is processed");
@@ -95,6 +74,7 @@ public class ShuffleInputEventHandler {
     } 
     int partitionId = dmEvent.getSourceIndex();
     URI baseUri = getBaseURI(shufflePayload.getHost(), shufflePayload.getPort(), partitionId);
+    LOG.info("Data movement event baseUri:" + baseUri);
 
     InputAttemptIdentifier srcAttemptIdentifier = new InputAttemptIdentifier(dmEvent.getTargetIndex(), dmEvent.getVersion(), shufflePayload.getPathComponent());
     scheduler.addKnownMapOutput(shufflePayload.getHost(), partitionId, baseUri.toString(), srcAttemptIdentifier);

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/04d5e377/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleScheduler.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleScheduler.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleScheduler.java
index dcf8b6d..33da660 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleScheduler.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleScheduler.java
@@ -33,7 +33,6 @@ import java.util.concurrent.DelayQueue;
 import java.util.concurrent.Delayed;
 import java.util.concurrent.TimeUnit;
 
-import org.apache.commons.lang.mutable.MutableInt;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -61,7 +60,7 @@ class ShuffleScheduler {
   private static final float PENALTY_GROWTH_RATE = 1.3f;
   
   // TODO NEWTEZ May need to be a string if attempting to fetch from multiple inputs.
-  private final Map<Integer, MutableInt> finishedMaps;
+  private boolean[] finishedMaps;
   private final int numInputs;
   private int remainingMaps;
   private Map<String, MapHost> mapLocations = new HashMap<String, MapHost>();
@@ -98,17 +97,16 @@ class ShuffleScheduler {
   
   public ShuffleScheduler(TezInputContext inputContext,
                           Configuration conf,
-                          int tasksInDegree,
+                          int numberOfInputs,
                           Shuffle shuffle,
                           TezCounter shuffledMapsCounter,
                           TezCounter reduceShuffleBytes,
                           TezCounter failedShuffleCounter) {
     this.inputContext = inputContext;
-    this.numInputs = tasksInDegree;
-    abortFailureLimit = Math.max(30, tasksInDegree / 10);
-    remainingMaps = tasksInDegree;
-  //TODO NEWTEZ May need to be a string or a more usable construct if attempting to fetch from multiple inputs. Define a taskId / taskAttemptId pair
-    finishedMaps = new HashMap<Integer, MutableInt>(remainingMaps);
+    this.numInputs = numberOfInputs;
+    abortFailureLimit = Math.max(30, numberOfInputs / 10);
+    remainingMaps = numberOfInputs;
+    finishedMaps = new boolean[remainingMaps]; // default init to false
     this.shuffle = shuffle;
     this.shuffledMapsCounter = shuffledMapsCounter;
     this.reduceShuffleBytes = reduceShuffleBytes;
@@ -116,7 +114,7 @@ class ShuffleScheduler {
     this.startTime = System.currentTimeMillis();
     this.lastProgressTime = startTime;
     referee.start();
-    this.maxFailedUniqueFetches = Math.min(tasksInDegree,
+    this.maxFailedUniqueFetches = Math.min(numberOfInputs,
         this.maxFailedUniqueFetches);
     this.maxFetchFailuresBeforeReporting = 
         conf.getInt(
@@ -138,13 +136,12 @@ class ShuffleScheduler {
     failureCounts.remove(taskIdentifier);
     hostFailures.remove(host.getHostName());
     
-    if (!isFinishedTaskTrue(srcAttemptIdentifier.getInputIdentifier().getSrcTaskIndex())) {
+    if (!isInputFinished(srcAttemptIdentifier.getInputIdentifier().getSrcTaskIndex())) {
       output.commit();
-      if(incrementTaskCopyAndCheckCompletion(srcAttemptIdentifier.getInputIdentifier().getSrcTaskIndex())) {
-        shuffledMapsCounter.increment(1);
-        if (--remainingMaps == 0) {
-          notifyAll();
-        }
+      setInputFinished(srcAttemptIdentifier.getInputIdentifier().getSrcTaskIndex());
+      shuffledMapsCounter.increment(1);
+      if (--remainingMaps == 0) {
+        notifyAll();
       }
 
       // update the status
@@ -288,16 +285,6 @@ class ShuffleScheduler {
 
   }
   
-  public synchronized void tipFailed(int srcTaskIndex) {
-    if (!isFinishedTaskTrue(srcTaskIndex)) {
-      setFinishedTaskTrue(srcTaskIndex);
-      if (--remainingMaps == 0) {
-        notifyAll();
-      }
-      logProgress();
-    }
-  }
-  
   public synchronized void addKnownMapOutput(String hostName,
                                              int partitionId,
                                              String hostUrl,
@@ -310,7 +297,8 @@ class ShuffleScheduler {
       mapLocations.put(identifier, host);
     }
     host.addKnownMap(srcAttempt);
-    pathToIdentifierMap.put(srcAttempt.getPathComponent(), srcAttempt);
+    pathToIdentifierMap.put(
+        getIdentifierFromPathAndReduceId(srcAttempt.getPathComponent(), partitionId), srcAttempt);
 
     // Mark the host as pending
     if (host.getState() == MapHost.State.PENDING) {
@@ -351,8 +339,9 @@ class ShuffleScheduler {
       return host;
   }
   
-  public InputAttemptIdentifier getIdentifierForPathComponent(String pathComponent) {
-    return pathToIdentifierMap.get(pathComponent);
+  public InputAttemptIdentifier getIdentifierForFetchedOutput(
+      String path, int reduceId) {
+    return pathToIdentifierMap.get(getIdentifierFromPathAndReduceId(path, reduceId));
   }
   
   public synchronized List<InputAttemptIdentifier> getMapsForHost(MapHost host) {
@@ -364,7 +353,7 @@ class ShuffleScheduler {
     // find the maps that we still need, up to the limit
     while (itr.hasNext()) {
       InputAttemptIdentifier id = itr.next();
-      if (!obsoleteMaps.contains(id) && !isFinishedTaskTrue(id.getInputIdentifier().getSrcTaskIndex())) {
+      if (!obsoleteMaps.contains(id) && !isInputFinished(id.getInputIdentifier().getSrcTaskIndex())) {
         result.add(id);
         if (++includedMaps >= MAX_MAPS_AT_ONCE) {
           break;
@@ -374,7 +363,7 @@ class ShuffleScheduler {
     // put back the maps left after the limit
     while (itr.hasNext()) {
       InputAttemptIdentifier id = itr.next();
-      if (!obsoleteMaps.contains(id) && !isFinishedTaskTrue(id.getInputIdentifier().getSrcTaskIndex())) {
+      if (!obsoleteMaps.contains(id) && !isInputFinished(id.getInputIdentifier().getSrcTaskIndex())) {
         host.addKnownMap(id);
       }
     }
@@ -448,6 +437,10 @@ class ShuffleScheduler {
     
   }
   
+  private String getIdentifierFromPathAndReduceId(String path, int reduceId) {
+    return path + "_" + reduceId;
+  }
+  
   /**
    * A thread that takes hosts off of the penalty list when the timer expires.
    */
@@ -488,34 +481,15 @@ class ShuffleScheduler {
     }
   }
   
-  void setFinishedTaskTrue(int srcTaskIndex) {
+  void setInputFinished(int inputIndex) {
     synchronized(finishedMaps) {
-      finishedMaps.put(srcTaskIndex, new MutableInt(shuffle.getReduceRange()));
+      finishedMaps[inputIndex] = true;
     }
   }
   
-  boolean incrementTaskCopyAndCheckCompletion(int srcTaskIndex) {
-    synchronized(finishedMaps) {
-      MutableInt result = finishedMaps.get(srcTaskIndex);
-      if(result == null) {
-        result = new MutableInt(0);
-        finishedMaps.put(srcTaskIndex, result);
-      }
-      result.increment();
-      return isFinishedTaskTrue(srcTaskIndex);
-    }
-  }
-  
-  boolean isFinishedTaskTrue(int srcTaskIndex) {
+  boolean isInputFinished(int inputIndex) {
     synchronized (finishedMaps) {
-      MutableInt result = finishedMaps.get(srcTaskIndex);
-      if(result == null) {
-        return false;
-      }
-      if (result.intValue() == shuffle.getReduceRange()) {
-        return true;
-      }
-      return false;      
+      return finishedMaps[inputIndex];      
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/04d5e377/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/LocalOnFileSorterOutput.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/LocalOnFileSorterOutput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/LocalOnFileSorterOutput.java
index a19d5e1..c8a8233 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/LocalOnFileSorterOutput.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/LocalOnFileSorterOutput.java
@@ -57,7 +57,7 @@ public class LocalOnFileSorterOutput extends OnFileSortedOutput {
   }
   
   @Override
-  protected List<Event> generateDataMovementEventsOnClose() throws IOException {
+  protected List<Event> generateEventsOnClose() throws IOException {
     return null;
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/04d5e377/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OnFileSortedOutput.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OnFileSortedOutput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OnFileSortedOutput.java
index 5415053..8acc258 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OnFileSortedOutput.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OnFileSortedOutput.java
@@ -26,15 +26,18 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.yarn.api.ApplicationConstants;
 import org.apache.tez.common.TezJobConfig;
 import org.apache.tez.common.TezUtils;
+import org.apache.tez.common.counters.TaskCounter;
 import org.apache.tez.runtime.api.Event;
 import org.apache.tez.runtime.api.LogicalOutput;
 import org.apache.tez.runtime.api.TezOutputContext;
 import org.apache.tez.runtime.api.events.DataMovementEvent;
+import org.apache.tez.runtime.api.events.VertexManagerEvent;
 import org.apache.tez.runtime.library.api.KeyValueWriter;
 import org.apache.tez.runtime.library.common.sort.impl.ExternalSorter;
 import org.apache.tez.runtime.library.common.sort.impl.dflt.DefaultSorter;
 import org.apache.tez.runtime.library.shuffle.common.ShuffleUtils;
 import org.apache.tez.runtime.library.shuffle.impl.ShuffleUserPayloads.DataMovementEventPayloadProto;
+import org.apache.tez.runtime.library.shuffle.impl.ShuffleUserPayloads.VertexManagerEventPayloadProto;
 
 import com.google.common.collect.Lists;
 
@@ -93,10 +96,10 @@ public class OnFileSortedOutput implements LogicalOutput {
     sorter.close();
     this.endTime = System.nanoTime();
 
-   return generateDataMovementEventsOnClose();
+   return generateEventsOnClose();
   }
   
-  protected List<Event> generateDataMovementEventsOnClose() throws IOException {
+  protected List<Event> generateEventsOnClose() throws IOException {
     String host = System.getenv(ApplicationConstants.Environment.NM_HOST
         .toString());
     ByteBuffer shuffleMetadata = outputContext
@@ -112,8 +115,17 @@ public class OnFileSortedOutput implements LogicalOutput {
     DataMovementEventPayloadProto payloadProto = payloadBuilder.build();
     byte[] payloadBytes = payloadProto.toByteArray();
 
-    List<Event> events = Lists.newArrayListWithCapacity(numOutputs);
+    long outputSize = outputContext.getCounters()
+        .findCounter(TaskCounter.MAP_OUTPUT_BYTES).getValue();
+    VertexManagerEventPayloadProto.Builder vmBuilder = VertexManagerEventPayloadProto
+        .newBuilder();
+    vmBuilder.setOutputSize(outputSize);
+    VertexManagerEvent vmEvent = new VertexManagerEvent(
+        outputContext.getDestinationVertexName(), vmBuilder.build().toByteArray());    
 
+    List<Event> events = Lists.newArrayListWithCapacity(numOutputs+1);
+    events.add(vmEvent);
+    
     for (int i = 0; i < numOutputs; i++) {
       DataMovementEvent event = new DataMovementEvent(i, payloadBytes);
       events.add(event);

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/04d5e377/tez-runtime-library/src/main/proto/ShufflePayloads.proto
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/proto/ShufflePayloads.proto b/tez-runtime-library/src/main/proto/ShufflePayloads.proto
index 1fb000f..0396eec 100644
--- a/tez-runtime-library/src/main/proto/ShufflePayloads.proto
+++ b/tez-runtime-library/src/main/proto/ShufflePayloads.proto
@@ -31,3 +31,7 @@ message DataMovementEventPayloadProto {
 message InputInformationEventPayloadProto {
   optional int32 partition_range = 1;
 }
+
+message VertexManagerEventPayloadProto {
+  optional int64 output_size = 1;
+}
\ No newline at end of file