You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by ss...@apache.org on 2014/03/06 23:34:07 UTC

git commit: TEZ-910. Allow ShuffledUnorderedKVInput to work for cases other than broadcast. (sseth)

Repository: incubator-tez
Updated Branches:
  refs/heads/master 0df108154 -> 1b170a4ec


TEZ-910. Allow ShuffledUnorderedKVInput to work for cases other than
broadcast. (sseth)


Project: http://git-wip-us.apache.org/repos/asf/incubator-tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tez/commit/1b170a4e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tez/tree/1b170a4e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tez/diff/1b170a4e

Branch: refs/heads/master
Commit: 1b170a4ec0027cdb7dd5d5e28341187623df8658
Parents: 0df1081
Author: Siddharth Seth <ss...@apache.org>
Authored: Thu Mar 6 14:30:58 2014 -0800
Committer: Siddharth Seth <ss...@apache.org>
Committed: Thu Mar 6 14:30:58 2014 -0800

----------------------------------------------------------------------
 .../BroadcastShuffleInputEventHandler.java      | 154 -------------------
 .../library/input/ShuffledUnorderedKVInput.java |   9 +-
 .../library/shuffle/common/InputHost.java       |  40 +++--
 .../impl/ShuffleInputEventHandlerImpl.java      | 151 ++++++++++++++++++
 .../shuffle/common/impl/ShuffleManager.java     |  11 +-
 5 files changed, 194 insertions(+), 171 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/1b170a4e/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/broadcast/input/BroadcastShuffleInputEventHandler.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/broadcast/input/BroadcastShuffleInputEventHandler.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/broadcast/input/BroadcastShuffleInputEventHandler.java
deleted file mode 100644
index 68aa49f..0000000
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/broadcast/input/BroadcastShuffleInputEventHandler.java
+++ /dev/null
@@ -1,154 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.tez.runtime.library.broadcast.input;
-
-import java.io.IOException;
-import java.util.List;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.io.compress.CompressionCodec;
-import org.apache.tez.dag.api.TezUncheckedException;
-import org.apache.tez.runtime.api.Event;
-import org.apache.tez.runtime.api.TezInputContext;
-import org.apache.tez.runtime.api.events.DataMovementEvent;
-import org.apache.tez.runtime.api.events.InputFailedEvent;
-import org.apache.tez.runtime.library.common.InputAttemptIdentifier;
-import org.apache.tez.runtime.library.shuffle.common.DiskFetchedInput;
-import org.apache.tez.runtime.library.shuffle.common.FetchedInput;
-import org.apache.tez.runtime.library.shuffle.common.FetchedInputAllocator;
-import org.apache.tez.runtime.library.shuffle.common.MemoryFetchedInput;
-import org.apache.tez.runtime.library.shuffle.common.ShuffleEventHandler;
-import org.apache.tez.runtime.library.shuffle.common.ShuffleUtils;
-import org.apache.tez.runtime.library.shuffle.common.impl.ShuffleManager;
-import org.apache.tez.runtime.library.shuffle.impl.ShuffleUserPayloads.DataMovementEventPayloadProto;
-import org.apache.tez.runtime.library.shuffle.impl.ShuffleUserPayloads.DataProto;
-
-import com.google.common.base.Preconditions;
-import com.google.protobuf.InvalidProtocolBufferException;
-
-public class BroadcastShuffleInputEventHandler implements ShuffleEventHandler {
-
-  private static final Log LOG = LogFactory.getLog(BroadcastShuffleInputEventHandler.class);
-  
-  private final ShuffleManager shuffleManager;
-  private final FetchedInputAllocator inputAllocator;
-  private final CompressionCodec codec;
-  private final boolean ifileReadAhead;
-  private final int ifileReadAheadLength;
-  
-  
-  public BroadcastShuffleInputEventHandler(TezInputContext inputContext,
-      ShuffleManager shuffleManager,
-      FetchedInputAllocator inputAllocator, CompressionCodec codec,
-      boolean ifileReadAhead, int ifileReadAheadLength) {
-    this.shuffleManager = shuffleManager;
-    this.inputAllocator = inputAllocator;
-    this.codec = codec;
-    this.ifileReadAhead = ifileReadAhead;
-    this.ifileReadAheadLength = ifileReadAheadLength;
-  }
-
-  @Override
-  public void handleEvents(List<Event> events) throws IOException {
-    for (Event event : events) {
-      handleEvent(event);
-    }
-  }
-  
-  private void handleEvent(Event event) throws IOException {
-    if (event instanceof DataMovementEvent) {
-      processDataMovementEvent((DataMovementEvent)event);
-    } else if (event instanceof InputFailedEvent) {
-      processInputFailedEvent((InputFailedEvent)event);
-    } else {
-      throw new TezUncheckedException("Unexpected event type: " + event.getClass().getName());
-    }
-  }
-  
-  
-  private void processDataMovementEvent(DataMovementEvent dme) throws IOException {
-    Preconditions.checkArgument(dme.getSourceIndex() == 0,
-        "Unexpected srcIndex: " + dme.getSourceIndex()
-            + " on DataMovementEvent. Can only be 0");
-    DataMovementEventPayloadProto shufflePayload;
-    try {
-      shufflePayload = DataMovementEventPayloadProto.parseFrom(dme.getUserPayload());
-    } catch (InvalidProtocolBufferException e) {
-      throw new TezUncheckedException("Unable to parse DataMovementEvent payload", e);
-    }
-    LOG.info("Processing DataMovementEvent with srcIndex: "
-        + dme.getSourceIndex() + ", targetIndex: " + dme.getTargetIndex()
-        + ", attemptNum: " + dme.getVersion() + ", payload: "
-        + stringify(shufflePayload));
-    if (shufflePayload.getOutputGenerated()) {
-      InputAttemptIdentifier srcAttemptIdentifier = new InputAttemptIdentifier(
-          dme.getTargetIndex(), dme.getVersion(),
-          shufflePayload.getPathComponent());
-      if (shufflePayload.hasData()) {
-        DataProto dataProto = shufflePayload.getData();
-        FetchedInput fetchedInput = inputAllocator.allocate(dataProto.getRawLength(), dataProto.getCompressedLength(), srcAttemptIdentifier);
-        moveDataToFetchedInput(dataProto, fetchedInput);
-        shuffleManager.addCompletedInputWithData(srcAttemptIdentifier, fetchedInput);
-      } else {
-        shuffleManager.addKnownInput(shufflePayload.getHost(), shufflePayload.getPort(), srcAttemptIdentifier, 0);
-      }
-    } else {
-      shuffleManager.addCompletedInputWithNoData(new InputAttemptIdentifier(dme.getTargetIndex(), dme.getVersion()));
-    }
-  }
-  
-  private void moveDataToFetchedInput(DataProto dataProto,
-      FetchedInput fetchedInput) throws IOException {
-    switch (fetchedInput.getType()) {
-    case DISK:
-      ShuffleUtils.shuffleToDisk((DiskFetchedInput) fetchedInput, dataProto
-          .getData().newInput(), dataProto.getCompressedLength(), LOG);
-      break;
-    case MEMORY:
-      ShuffleUtils.shuffleToMemory((MemoryFetchedInput) fetchedInput,
-          dataProto.getData().newInput(), dataProto.getRawLength(),
-          dataProto.getCompressedLength(), codec, ifileReadAhead, ifileReadAheadLength, LOG);
-      break;
-    case WAIT:
-    default:
-      throw new TezUncheckedException("Unexpected type: "
-          + fetchedInput.getType());
-    }
-  }
-  
-  private void processInputFailedEvent(InputFailedEvent ife) {
-    InputAttemptIdentifier srcAttemptIdentifier = new InputAttemptIdentifier(ife.getTargetIndex(), ife.getVersion());
-    shuffleManager.obsoleteKnownInput(srcAttemptIdentifier);
-  }
-  
-  private String stringify(DataMovementEventPayloadProto dmProto) {
-    StringBuilder sb = new StringBuilder();
-    sb.append("[");
-    sb.append("outputGenerated: " + dmProto.getOutputGenerated()).append(", ");
-    sb.append("host: " + dmProto.getHost()).append(", ");
-    sb.append("port: " + dmProto.getPort()).append(", ");
-    sb.append("pathComponent: " + dmProto.getPathComponent()).append(", ");
-    sb.append("runDuration: " + dmProto.getRunDuration()).append(", ");
-    sb.append("hasData: " + dmProto.hasData());
-    return sb.toString();
-  }
-}
-

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/1b170a4e/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ShuffledUnorderedKVInput.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ShuffledUnorderedKVInput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ShuffledUnorderedKVInput.java
index adbeff8..c740748 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ShuffledUnorderedKVInput.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ShuffledUnorderedKVInput.java
@@ -41,9 +41,10 @@ import org.apache.tez.runtime.api.LogicalInput;
 import org.apache.tez.runtime.api.MemoryUpdateCallback;
 import org.apache.tez.runtime.api.TezInputContext;
 import org.apache.tez.runtime.library.api.KeyValueReader;
-import org.apache.tez.runtime.library.broadcast.input.BroadcastShuffleInputEventHandler;
 import org.apache.tez.runtime.library.common.ConfigUtils;
 import org.apache.tez.runtime.library.common.readers.ShuffledUnorderedKVReader;
+import org.apache.tez.runtime.library.shuffle.common.ShuffleEventHandler;
+import org.apache.tez.runtime.library.shuffle.common.impl.ShuffleInputEventHandlerImpl;
 import org.apache.tez.runtime.library.shuffle.common.impl.ShuffleManager;
 import org.apache.tez.runtime.library.shuffle.common.impl.SimpleFetchedInputAllocator;
 
@@ -65,7 +66,7 @@ public class ShuffledUnorderedKVInput implements LogicalInput, MemoryUpdateCallb
   private TezCounter inputRecordCounter;
   
   private SimpleFetchedInputAllocator inputManager;
-  private BroadcastShuffleInputEventHandler inputEventHandler;
+  private ShuffleEventHandler inputEventHandler;
   
   private volatile long initialMemoryAvailable = -1;
   
@@ -129,7 +130,7 @@ public class ShuffledUnorderedKVInput implements LogicalInput, MemoryUpdateCallb
         inputManager.setInitialMemoryAvailable(initialMemoryAvailable);
         inputManager.configureAndStart();
         
-        this.inputEventHandler = new BroadcastShuffleInputEventHandler(
+        this.inputEventHandler = new ShuffleInputEventHandlerImpl(
             inputContext, shuffleManager, inputManager, codec, ifileReadAhead,
             ifileReadAheadLength);
 
@@ -231,4 +232,4 @@ public class ShuffledUnorderedKVInput implements LogicalInput, MemoryUpdateCallb
         ifileReadAheadLength, ifileBufferSize, inputRecordCounter);
   }
 
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/1b170a4e/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/InputHost.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/InputHost.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/InputHost.java
index 7905e27..59da655 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/InputHost.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/InputHost.java
@@ -26,16 +26,24 @@ import java.util.concurrent.LinkedBlockingQueue;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.tez.runtime.library.common.InputAttemptIdentifier;
 
+/**
+ * Represents a Host with respect to the MapReduce ShuffleHandler.
+ * 
+ * srcPhysicalIndex / partition is part of this since the ShuffleHandler only
+ * knows how to serve one partition at a time.
+ */
 public class InputHost {
 
   private final String host;
   private final int port;
+  private final int srcPhysicalIndex;
 
   private final BlockingQueue<InputAttemptIdentifier> inputs = new LinkedBlockingQueue<InputAttemptIdentifier>();
 
-  public InputHost(String hostName, int port, ApplicationId appId) {
+  public InputHost(String hostName, int port, ApplicationId appId, int srcPhysicalIndex) {
     this.host = hostName;
     this.port = port;
+    this.srcPhysicalIndex = srcPhysicalIndex;
   }
 
   public String getHost() {
@@ -46,6 +54,10 @@ public class InputHost {
     return this.port;
   }
 
+  public int getSrcPhysicalIndex() {
+    return this.srcPhysicalIndex;
+  }
+
   public int getNumPendingInputs() {
     return inputs.size();
   }
@@ -67,35 +79,45 @@ public class InputHost {
     int result = 1;
     result = prime * result + ((host == null) ? 0 : host.hashCode());
     result = prime * result + port;
+    result = prime * result + srcPhysicalIndex;
     return result;
   }
 
   @Override
   public boolean equals(Object obj) {
-    if (this == obj)
+    if (this == obj) {
       return true;
-    if (obj == null)
+    }
+    if (obj == null) {
       return false;
-    if (getClass() != obj.getClass())
+    }
+    if (getClass() != obj.getClass()) {
       return false;
+    }
     InputHost other = (InputHost) obj;
     if (host == null) {
-      if (other.host != null)
+      if (other.host != null) {
         return false;
+      }
     } else if (!host.equals(other.host))
       return false;
-    if (port != other.port)
+    if (port != other.port) {
       return false;
+    }
+    if (srcPhysicalIndex != other.srcPhysicalIndex) {
+      return false;
+    }
     return true;
   }
 
   public String toDetailedString() {
-    return "InputHost [host=" + host + ", port=" + port + ", inputs=" + inputs
-        + "]";
+    return "InputHost [host=" + host + ", port=" + port + ",srcPhysicalIndex=" + srcPhysicalIndex
+        + ", inputs=" + inputs + "]";
   }
   
   @Override
   public String toString() {
-    return "InputHost [host=" + host + ", port=" + port + "]";
+    return "InputHost [host=" + host + ", port=" + port + ", srcPhysicalIndex=" + srcPhysicalIndex
+        + "]";
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/1b170a4e/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/impl/ShuffleInputEventHandlerImpl.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/impl/ShuffleInputEventHandlerImpl.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/impl/ShuffleInputEventHandlerImpl.java
new file mode 100644
index 0000000..1ae6791
--- /dev/null
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/impl/ShuffleInputEventHandlerImpl.java
@@ -0,0 +1,151 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.tez.runtime.library.shuffle.common.impl;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.tez.dag.api.TezUncheckedException;
+import org.apache.tez.runtime.api.Event;
+import org.apache.tez.runtime.api.TezInputContext;
+import org.apache.tez.runtime.api.events.DataMovementEvent;
+import org.apache.tez.runtime.api.events.InputFailedEvent;
+import org.apache.tez.runtime.library.common.InputAttemptIdentifier;
+import org.apache.tez.runtime.library.shuffle.common.DiskFetchedInput;
+import org.apache.tez.runtime.library.shuffle.common.FetchedInput;
+import org.apache.tez.runtime.library.shuffle.common.FetchedInputAllocator;
+import org.apache.tez.runtime.library.shuffle.common.MemoryFetchedInput;
+import org.apache.tez.runtime.library.shuffle.common.ShuffleEventHandler;
+import org.apache.tez.runtime.library.shuffle.common.ShuffleUtils;
+import org.apache.tez.runtime.library.shuffle.common.impl.ShuffleManager;
+import org.apache.tez.runtime.library.shuffle.impl.ShuffleUserPayloads.DataMovementEventPayloadProto;
+import org.apache.tez.runtime.library.shuffle.impl.ShuffleUserPayloads.DataProto;
+
+import com.google.protobuf.InvalidProtocolBufferException;
+
+/**
+ * A base class for generic Event handling for Inputs which need to Shuffle data.
+ */
+public class ShuffleInputEventHandlerImpl implements ShuffleEventHandler {
+
+  private static final Log LOG = LogFactory.getLog(ShuffleInputEventHandlerImpl.class);
+  
+  private final ShuffleManager shuffleManager;
+  private final FetchedInputAllocator inputAllocator;
+  private final CompressionCodec codec;
+  private final boolean ifileReadAhead;
+  private final int ifileReadAheadLength;
+  
+  
+  public ShuffleInputEventHandlerImpl(TezInputContext inputContext,
+      ShuffleManager shuffleManager,
+      FetchedInputAllocator inputAllocator, CompressionCodec codec,
+      boolean ifileReadAhead, int ifileReadAheadLength) {
+    this.shuffleManager = shuffleManager;
+    this.inputAllocator = inputAllocator;
+    this.codec = codec;
+    this.ifileReadAhead = ifileReadAhead;
+    this.ifileReadAheadLength = ifileReadAheadLength;
+  }
+
+  @Override
+  public void handleEvents(List<Event> events) throws IOException {
+    for (Event event : events) {
+      handleEvent(event);
+    }
+  }
+  
+  private void handleEvent(Event event) throws IOException {
+    if (event instanceof DataMovementEvent) {
+      processDataMovementEvent((DataMovementEvent)event);
+    } else if (event instanceof InputFailedEvent) {
+      processInputFailedEvent((InputFailedEvent)event);
+    } else {
+      throw new TezUncheckedException("Unexpected event type: " + event.getClass().getName());
+    }
+  }
+
+  private void processDataMovementEvent(DataMovementEvent dme) throws IOException {
+    DataMovementEventPayloadProto shufflePayload;
+    try {
+      shufflePayload = DataMovementEventPayloadProto.parseFrom(dme.getUserPayload());
+    } catch (InvalidProtocolBufferException e) {
+      throw new TezUncheckedException("Unable to parse DataMovementEvent payload", e);
+    }
+    LOG.info("Processing DataMovementEvent with srcIndex: "
+        + dme.getSourceIndex() + ", targetIndex: " + dme.getTargetIndex()
+        + ", attemptNum: " + dme.getVersion() + ", payload: "
+        + stringify(shufflePayload));
+    if (shufflePayload.getOutputGenerated()) {
+      InputAttemptIdentifier srcAttemptIdentifier = new InputAttemptIdentifier(
+          dme.getTargetIndex(), dme.getVersion(),
+          shufflePayload.getPathComponent());
+      if (shufflePayload.hasData()) {
+        DataProto dataProto = shufflePayload.getData();
+        FetchedInput fetchedInput = inputAllocator.allocate(dataProto.getRawLength(), dataProto.getCompressedLength(), srcAttemptIdentifier);
+        moveDataToFetchedInput(dataProto, fetchedInput);
+        shuffleManager.addCompletedInputWithData(srcAttemptIdentifier, fetchedInput);
+      } else {
+        shuffleManager.addKnownInput(shufflePayload.getHost(), shufflePayload.getPort(), srcAttemptIdentifier, dme.getSourceIndex());
+      }
+    } else {
+      shuffleManager.addCompletedInputWithNoData(new InputAttemptIdentifier(dme.getTargetIndex(), dme.getVersion()));
+    }
+  }
+  
+  private void moveDataToFetchedInput(DataProto dataProto,
+      FetchedInput fetchedInput) throws IOException {
+    switch (fetchedInput.getType()) {
+    case DISK:
+      ShuffleUtils.shuffleToDisk((DiskFetchedInput) fetchedInput, dataProto
+          .getData().newInput(), dataProto.getCompressedLength(), LOG);
+      break;
+    case MEMORY:
+      ShuffleUtils.shuffleToMemory((MemoryFetchedInput) fetchedInput,
+          dataProto.getData().newInput(), dataProto.getRawLength(),
+          dataProto.getCompressedLength(), codec, ifileReadAhead, ifileReadAheadLength, LOG);
+      break;
+    case WAIT:
+    default:
+      throw new TezUncheckedException("Unexpected type: "
+          + fetchedInput.getType());
+    }
+  }
+  
+  private void processInputFailedEvent(InputFailedEvent ife) {
+    InputAttemptIdentifier srcAttemptIdentifier = new InputAttemptIdentifier(ife.getTargetIndex(), ife.getVersion());
+    shuffleManager.obsoleteKnownInput(srcAttemptIdentifier);
+  }
+  
+  private String stringify(DataMovementEventPayloadProto dmProto) {
+    StringBuilder sb = new StringBuilder();
+    sb.append("[");
+    sb.append("outputGenerated: " + dmProto.getOutputGenerated()).append(", ");
+    sb.append("host: " + dmProto.getHost()).append(", ");
+    sb.append("port: " + dmProto.getPort()).append(", ");
+    sb.append("pathComponent: " + dmProto.getPathComponent()).append(", ");
+    sb.append("runDuration: " + dmProto.getRunDuration()).append(", ");
+    sb.append("hasDataInEvent: " + dmProto.hasData());
+    return sb.toString();
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/1b170a4e/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/impl/ShuffleManager.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/impl/ShuffleManager.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/impl/ShuffleManager.java
index 6b91558..c04c134 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/impl/ShuffleManager.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/impl/ShuffleManager.java
@@ -74,6 +74,9 @@ import com.google.common.util.concurrent.ListeningExecutorService;
 import com.google.common.util.concurrent.MoreExecutors;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
 
+// This only knows how to deal with a single srcIndex for a given targetIndex.
+// In case the src task generates multiple outputs for the same target Index
+// (multiple src-indices), modifications will be required.
 public class ShuffleManager implements FetcherCallback {
 
   private static final Log LOG = LogFactory.getLog(ShuffleManager.class);
@@ -339,8 +342,8 @@ public class ShuffleManager implements FetcherCallback {
     }
     // TODO NEWTEZ Maybe limit the number of inputs being given to a single
     // fetcher, especially in the case where #hosts < #fetchers
-    fetcherBuilder.assignWork(inputHost.getHost(), inputHost.getPort(), 0,
-        pendingInputsForHost);
+    fetcherBuilder.assignWork(inputHost.getHost(), inputHost.getPort(),
+        inputHost.getSrcPhysicalIndex(), pendingInputsForHost);
     LOG.info("Created Fetcher for host: " + inputHost.getHost()
         + ", with inputs: " + pendingInputsForHost);
     return fetcherBuilder.build();
@@ -349,10 +352,10 @@ public class ShuffleManager implements FetcherCallback {
   /////////////////// Methods for InputEventHandler
   
   public void addKnownInput(String hostName, int port,
-      InputAttemptIdentifier srcAttemptIdentifier, int partition) {
+      InputAttemptIdentifier srcAttemptIdentifier, int srcPhysicalIndex) {
     InputHost host = knownSrcHosts.get(hostName);
     if (host == null) {
-      host = new InputHost(hostName, port, inputContext.getApplicationId());
+      host = new InputHost(hostName, port, inputContext.getApplicationId(), srcPhysicalIndex);
       InputHost old = knownSrcHosts.putIfAbsent(hostName, host);
       if (old != null) {
         host = old;