You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by ss...@apache.org on 2013/09/20 19:19:30 UTC

[2/2] git commit: TEZ-414. Implement a non sorted, non aggregated K-V input/output pair. (sseth)

TEZ-414. Implement a non sorted, non aggregated K-V input/output pair.
(sseth)


Project: http://git-wip-us.apache.org/repos/asf/incubator-tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tez/commit/bd76ffcf
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tez/tree/bd76ffcf
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tez/diff/bd76ffcf

Branch: refs/heads/TEZ-398
Commit: bd76ffcf2b44eff1c2d8a3abfd24a978edccdc58
Parents: bda095c
Author: Siddharth Seth <ss...@apache.org>
Authored: Fri Sep 20 10:18:36 2013 -0700
Committer: Siddharth Seth <ss...@apache.org>
Committed: Fri Sep 20 10:18:36 2013 -0700

----------------------------------------------------------------------
 .../broadcast/input/BroadcastInputManager.java  | 138 +++++
 .../broadcast/input/BroadcastKVReader.java      | 225 +++++++
 .../BroadcastShuffleInputEventHandler.java      |  88 +++
 .../input/BroadcastShuffleManager.java          | 489 +++++++++++++++
 .../broadcast/output/FileBasedKVWriter.java     | 125 ++++
 .../engine/common/InputAttemptIdentifier.java   |  95 +++
 .../tez/engine/common/InputIdentifier.java      |  56 ++
 .../tez/engine/common/TezEngineUtils.java       |  21 +
 .../tez/engine/common/shuffle/impl/Fetcher.java |  42 +-
 .../common/shuffle/impl/InMemoryReader.java     |   5 +-
 .../tez/engine/common/shuffle/impl/MapHost.java |  14 +-
 .../engine/common/shuffle/impl/MapOutput.java   |  13 +-
 .../common/shuffle/impl/MergeManager.java       |  18 +-
 .../tez/engine/common/shuffle/impl/Shuffle.java |   1 +
 .../common/shuffle/impl/ShuffleHeader.java      |   8 +
 .../shuffle/impl/ShuffleInputEventHandler.java  |   5 +-
 .../common/shuffle/impl/ShuffleScheduler.java   |  60 +-
 .../shuffle/impl/TaskAttemptIdentifier.java     |  95 ---
 .../engine/common/sort/impl/ExternalSorter.java |  26 +-
 .../tez/engine/common/sort/impl/IFile.java      |   2 +-
 .../newoutput/TezLocalTaskOutputFiles.java      |  21 +-
 .../task/local/newoutput/TezTaskOutput.java     |  13 +-
 .../local/newoutput/TezTaskOutputFiles.java     |  18 +-
 .../lib/input/ShuffledUnorderedKVInput.java     |  76 +++
 .../engine/lib/output/OnFileSortedOutput.java   |   4 +-
 .../lib/output/OnFileUnorderedKVOutput.java     |  98 +++
 .../org/apache/tez/engine/newapi/KVReader.java  |   5 +-
 .../engine/shuffle/common/DiskFetchedInput.java | 111 ++++
 .../tez/engine/shuffle/common/FetchResult.java  |  70 +++
 .../tez/engine/shuffle/common/FetchedInput.java | 144 +++++
 .../shuffle/common/FetchedInputAllocator.java   |  31 +
 .../shuffle/common/FetchedInputCallback.java    |  29 +
 .../tez/engine/shuffle/common/Fetcher.java      | 608 +++++++++++++++++++
 .../engine/shuffle/common/FetcherCallback.java  |  31 +
 .../tez/engine/shuffle/common/InputHost.java    |  90 +++
 .../shuffle/common/MemoryFetchedInput.java      |  89 +++
 .../tez/engine/shuffle/common/ShuffleUtils.java |  16 +
 tez-engine/src/main/proto/ShufflePayloads.proto |   9 +-
 38 files changed, 2781 insertions(+), 208 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/bd76ffcf/tez-engine/src/main/java/org/apache/tez/engine/broadcast/input/BroadcastInputManager.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/broadcast/input/BroadcastInputManager.java b/tez-engine/src/main/java/org/apache/tez/engine/broadcast/input/BroadcastInputManager.java
new file mode 100644
index 0000000..78d2e0c
--- /dev/null
+++ b/tez-engine/src/main/java/org/apache/tez/engine/broadcast/input/BroadcastInputManager.java
@@ -0,0 +1,138 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.engine.broadcast.input;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.LocalDirAllocator;
+import org.apache.tez.common.Constants;
+import org.apache.tez.common.TezJobConfig;
+import org.apache.tez.dag.api.TezUncheckedException;
+import org.apache.tez.engine.common.InputAttemptIdentifier;
+import org.apache.tez.engine.common.task.local.newoutput.TezTaskOutputFiles;
+import org.apache.tez.engine.newapi.TezInputContext;
+import org.apache.tez.engine.shuffle.common.DiskFetchedInput;
+import org.apache.tez.engine.shuffle.common.FetchedInput;
+import org.apache.tez.engine.shuffle.common.FetchedInputAllocator;
+import org.apache.tez.engine.shuffle.common.FetchedInputCallback;
+import org.apache.tez.engine.shuffle.common.MemoryFetchedInput;
+
+public class BroadcastInputManager implements FetchedInputAllocator,
+    FetchedInputCallback {
+
+  private final Configuration conf;
+
+  private final TezTaskOutputFiles fileNameAllocator;
+  private final LocalDirAllocator localDirAllocator;
+
+  // Configuration parameters
+  private final long memoryLimit;
+  private final long maxSingleShuffleLimit;
+
+  private long usedMemory = 0;
+
+  /**
+   * Builds the manager and derives its memory budgets from configuration.
+   *
+   * @param inputContext context of the owning input; its unique identifier is
+   *          handed to the output-file name allocator
+   * @param conf configuration from which the buffer percentages and the task
+   *          memory are read
+   * @throws IllegalArgumentException if either configured percentage is
+   *           outside its accepted range
+   */
+  public BroadcastInputManager(TezInputContext inputContext, Configuration conf) {
+    this.conf = conf;
+    this.fileNameAllocator = new TezTaskOutputFiles(conf,
+        inputContext.getUniqueIdentifier());
+    this.localDirAllocator = new LocalDirAllocator(TezJobConfig.LOCAL_DIRS);
+
+    // Fraction of the task memory that in-memory fetches may occupy in total.
+    final float bufferFraction = conf.getFloat(
+        TezJobConfig.TEZ_ENGINE_SHUFFLE_INPUT_BUFFER_PERCENT,
+        TezJobConfig.DEFAULT_TEZ_ENGINE_SHUFFLE_INPUT_BUFFER_PERCENT);
+    if (bufferFraction < 0.0 || bufferFraction > 1.0) {
+      throw new IllegalArgumentException("Invalid value for "
+          + TezJobConfig.TEZ_ENGINE_SHUFFLE_INPUT_BUFFER_PERCENT + ": "
+          + bufferFraction);
+    }
+
+    // The task memory can be fixed through configuration (useful for unit
+    // tests); otherwise the JVM maximum, capped at Integer.MAX_VALUE, is used.
+    final long taskMemory = conf.getLong(Constants.TEZ_ENGINE_TASK_MEMORY,
+        Math.min(Runtime.getRuntime().maxMemory(), Integer.MAX_VALUE));
+    this.memoryLimit = (long) (taskMemory * bufferFraction);
+
+    // Fraction of the total budget that one single fetch may occupy.
+    final float singleFetchFraction = conf.getFloat(
+        TezJobConfig.TEZ_ENGINE_SHUFFLE_MEMORY_LIMIT_PERCENT,
+        TezJobConfig.DEFAULT_TEZ_ENGINE_SHUFFLE_MEMORY_LIMIT_PERCENT);
+    if (singleFetchFraction > 1.0f || singleFetchFraction <= 0.0f) {
+      throw new IllegalArgumentException("Invalid value for "
+          + TezJobConfig.TEZ_ENGINE_SHUFFLE_MEMORY_LIMIT_PERCENT + ": "
+          + singleFetchFraction);
+    }
+    this.maxSingleShuffleLimit = (long) (memoryLimit * singleFetchFraction);
+  }
+
+  /**
+   * Picks the storage for one fetched input. The input is kept in memory when
+   * it is no larger than the single-fetch limit and still fits into the
+   * remaining memory budget; in every other case it is directed to disk.
+   * Memory is reserved here and handed back through {@code unreserve}.
+   *
+   * @param size size of the input being fetched
+   * @param inputAttemptIdentifier identifies the source attempt being fetched
+   * @return a memory- or disk-backed {@link FetchedInput}
+   */
+  @Override
+  public synchronized FetchedInput allocate(long size,
+      InputAttemptIdentifier inputAttemptIdentifier) throws IOException {
+    final boolean fitsInMemory = size <= maxSingleShuffleLimit
+        && this.usedMemory + size <= this.memoryLimit;
+    if (fitsInMemory) {
+      this.usedMemory += size;
+      return new MemoryFetchedInput(size, inputAttemptIdentifier, this);
+    }
+    return new DiskFetchedInput(size, inputAttemptIdentifier, this, conf,
+        localDirAllocator, fileNameAllocator);
+  }
+
+  /**
+   * Completion callback. Nothing is tracked for either supported storage
+   * type; any other type is rejected.
+   *
+   * @throws TezUncheckedException if the input is neither DISK nor MEMORY
+   */
+  @Override
+  public void fetchComplete(FetchedInput fetchedInput) {
+    switch (fetchedInput.getType()) {
+    case MEMORY:
+    case DISK:
+      // No bookkeeping is needed when a fetch completes.
+      return;
+    default:
+      throw new TezUncheckedException("InputType: " + fetchedInput.getType()
+          + " not expected for Broadcast fetch");
+    }
+  }
+
+  /**
+   * Failure callback; delegates to {@code cleanup}, which returns the memory
+   * reservation of a MEMORY input.
+   */
+  @Override
+  public void fetchFailed(FetchedInput fetchedInput) {
+    cleanup(fetchedInput);
+  }
+
+  /**
+   * Releases whatever this manager tracks for the given input; delegates to
+   * {@code cleanup}, exactly like {@code fetchFailed}.
+   */
+  @Override
+  public void freeResources(FetchedInput fetchedInput) {
+    cleanup(fetchedInput);
+  }
+
+  /**
+   * Shared release path for failed and freed inputs. Only MEMORY inputs hold
+   * a reservation in this manager, so only they give anything back.
+   *
+   * @throws TezUncheckedException if the input is neither DISK nor MEMORY
+   */
+  private void cleanup(FetchedInput fetchedInput) {
+    switch (fetchedInput.getType()) {
+    case MEMORY:
+      unreserve(fetchedInput.getSize());
+      return;
+    case DISK:
+      // Nothing is reserved in this manager for disk-backed inputs.
+      return;
+    default:
+      throw new TezUncheckedException("InputType: " + fetchedInput.getType()
+          + " not expected for Broadcast fetch");
+    }
+  }
+
+  /**
+   * Returns {@code size} bytes to the in-memory budget. Synchronized on this
+   * instance, the same monitor {@code allocate} uses when reserving.
+   */
+  private synchronized void unreserve(long size) {
+    this.usedMemory -= size;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/bd76ffcf/tez-engine/src/main/java/org/apache/tez/engine/broadcast/input/BroadcastKVReader.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/broadcast/input/BroadcastKVReader.java b/tez-engine/src/main/java/org/apache/tez/engine/broadcast/input/BroadcastKVReader.java
new file mode 100644
index 0000000..b36c240
--- /dev/null
+++ b/tez-engine/src/main/java/org/apache/tez/engine/broadcast/input/BroadcastKVReader.java
@@ -0,0 +1,225 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.engine.broadcast.input;
+
+import java.io.IOException;
+import java.util.Iterator;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.DataInputBuffer;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.io.compress.DefaultCodec;
+import org.apache.hadoop.io.serializer.Deserializer;
+import org.apache.hadoop.io.serializer.SerializationFactory;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.tez.engine.common.ConfigUtils;
+import org.apache.tez.engine.common.shuffle.impl.InMemoryReader;
+import org.apache.tez.engine.common.sort.impl.IFile;
+import org.apache.tez.engine.newapi.KVReader;
+import org.apache.tez.engine.shuffle.common.FetchedInput;
+import org.apache.tez.engine.shuffle.common.FetchedInput.Type;
+import org.apache.tez.engine.shuffle.common.MemoryFetchedInput;
+
+public class BroadcastKVReader<K, V> implements KVReader {
+
+  private static final Log LOG = LogFactory.getLog(BroadcastKVReader.class);
+  
+  private final BroadcastShuffleManager shuffleManager;
+  private final Configuration conf;
+  private final CompressionCodec codec;
+  
+  private final Class<K> keyClass;
+  private final Class<V> valClass;
+  private final Deserializer<K> keyDeserializer;
+  private final Deserializer<V> valDeserializer;
+  private final DataInputBuffer keyIn;
+  private final DataInputBuffer valIn;
+
+  private final SimpleValueIterator valueIterator;
+  private final SimpleIterable valueIterable;
+  
+  private K key;
+  private V value;
+  
+  private FetchedInput currentFetchedInput;
+  private IFile.Reader currentReader;
+  
+  
+  /**
+   * Creates a reader that pulls fetched inputs from the given shuffle manager
+   * and deserializes their key/value records.
+   *
+   * @param shuffleManager source of fetched inputs
+   * @param conf configuration used to resolve the compression codec, the
+   *          key/value classes and the serialization framework
+   * @throws IllegalStateException if a deserializer cannot be opened on its
+   *           input buffer
+   */
+  public BroadcastKVReader(BroadcastShuffleManager shuffleManager,
+      Configuration conf) {
+    this.shuffleManager = shuffleManager;
+    this.conf = conf;
+
+    if (ConfigUtils.isIntermediateInputCompressed(this.conf)) {
+      Class<? extends CompressionCodec> codecClass = ConfigUtils
+          .getIntermediateInputCompressorClass(conf, DefaultCodec.class);
+      codec = ReflectionUtils.newInstance(codecClass, conf);
+    } else {
+      codec = null;
+    }
+
+    this.keyClass = ConfigUtils.getIntermediateInputKeyClass(conf);
+    // The value class has its own lookup; reading the key class here would
+    // build a value deserializer for the wrong type.
+    this.valClass = ConfigUtils.getIntermediateInputValueClass(conf);
+
+    this.keyIn = new DataInputBuffer();
+    this.valIn = new DataInputBuffer();
+
+    SerializationFactory serializationFactory = new SerializationFactory(conf);
+
+    this.keyDeserializer = serializationFactory.getDeserializer(keyClass);
+    this.valDeserializer = serializationFactory.getDeserializer(valClass);
+    // A deserializer must be opened on its stream before deserialize() is
+    // called. The raw key/value bytes are placed into keyIn/valIn by the
+    // IFile reader, so those buffers are the streams to bind to.
+    try {
+      this.keyDeserializer.open(keyIn);
+      this.valDeserializer.open(valIn);
+    } catch (IOException e) {
+      // Kept unchecked so that the constructor signature stays unchanged.
+      throw new IllegalStateException(
+          "Unable to open key/value deserializers", e);
+    }
+
+    this.valueIterator = new SimpleValueIterator();
+    this.valueIterable = new SimpleIterable(this.valueIterator);
+  }
+
+  // TODO NEWTEZ Maybe add an interface to check whether next will block.
+  
+  /**
+   * Advances to the next key/value record, switching over to further fetched
+   * inputs once the current one is exhausted.
+   *
+   * @return true if a record was read, false once no input has any more
+   *         records
+   * @throws IOException
+   *           if an error occurs
+   */
+  @Override
+  public boolean next() throws IOException {
+    if (readNextFromCurrentReader()) {
+      return true;
+    }
+    // The current input is exhausted (or none is open yet): keep moving to
+    // the next input until one of them yields a record or none are left.
+    while (moveToNextInput()) {
+      if (readNextFromCurrentReader()) {
+        return true;
+      }
+    }
+    return false;
+  }
+
+  /**
+   * Wraps the most recently read key and value into a record. The value is
+   * exposed through the single shared iterable/iterator pair, which is
+   * re-armed with the current value on every call.
+   */
+  @SuppressWarnings("unchecked")
+  @Override
+  public KVRecord getCurrentKV() throws IOException {
+    valueIterator.setValue(value);
+    final Iterable<Object> currentValues = (Iterable<Object>) valueIterable;
+    return new KVRecord((Object) key, currentValues);
+  }
+
+  /**
+   * Attempts to read one more key and value from the reader that is currently
+   * open, storing the deserialized objects in {@code key} and {@code value}.
+   *
+   * @return true if a record was read; false if no reader is open yet or the
+   *         current reader has no more records
+   * @throws IOException
+   */
+  private boolean readNextFromCurrentReader() throws IOException {
+    if (this.currentReader == null) {
+      // Nothing has been opened yet.
+      return false;
+    }
+    if (!this.currentReader.nextRawKey(keyIn)) {
+      return false;
+    }
+    this.currentReader.nextRawValue(valIn);
+    this.key = keyDeserializer.deserialize(this.key);
+    this.value = valDeserializer.deserialize(this.value);
+    return true;
+  }
+  
+  /**
+   * Moves to the next available input. This method may block if the input is
+   * not ready yet. Also closes the previous reader and frees the previous
+   * input.
+   *
+   * @return true if the next input exists, false otherwise
+   * @throws IOException
+   *           if closing or opening a reader fails, or if the thread is
+   *           interrupted while waiting for the next input (the interrupt
+   *           status is restored before the exception is thrown)
+   */
+  private boolean moveToNextInput() throws IOException {
+    if (currentReader != null) {
+      // Clear the fields before releasing, so that a later call (for example
+      // next() being invoked again after it returned false) can never close
+      // the same reader or free the same input a second time.
+      IFile.Reader finishedReader = currentReader;
+      FetchedInput finishedInput = currentFetchedInput;
+      currentReader = null;
+      currentFetchedInput = null;
+      try {
+        finishedReader.close();
+      } finally {
+        // Release the input even when closing the reader fails.
+        finishedInput.free();
+      }
+    }
+    try {
+      currentFetchedInput = shuffleManager.getNextInput();
+    } catch (InterruptedException e) {
+      LOG.warn("Interrupted while waiting for next available input", e);
+      // Preserve the interrupt for callers further up the stack.
+      Thread.currentThread().interrupt();
+      throw new IOException(e);
+    }
+    if (currentFetchedInput == null) {
+      return false; // No more inputs
+    }
+    currentReader = openIFileReader(currentFetchedInput);
+    return true;
+  }
+
+  /**
+   * Opens a reader over a fetched input: an in-memory reader over the byte
+   * array of a MEMORY input, a stream-based IFile reader (using the
+   * configured codec) for every other type.
+   *
+   * @param fetchedInput the input to read
+   * @return a reader positioned at the start of the input
+   * @throws IOException if the reader cannot be created
+   */
+  public IFile.Reader openIFileReader(FetchedInput fetchedInput)
+      throws IOException {
+    if (fetchedInput.getType() != Type.MEMORY) {
+      return new IFile.Reader(conf, fetchedInput.getInputStream(),
+          fetchedInput.getSize(), codec, null);
+    }
+    MemoryFetchedInput inMemoryInput = (MemoryFetchedInput) fetchedInput;
+    return new InMemoryReader(null, inMemoryInput.getInputAttemptIdentifier(),
+        inMemoryInput.getBytes(), 0, (int) inMemoryInput.getSize());
+  }
+
+  
+  
+  // TODO NEWTEZ Move this into a common class. Also used in SimpleInput
+  /**
+   * Iterator over at most one value. The value is supplied through
+   * {@link #setValue(Object)} and consumed by the first call to
+   * {@link #next()}; a null value means "nothing left".
+   */
+  private class SimpleValueIterator implements Iterator<V> {
+
+    private V value;
+
+    /** Arms the iterator with the value the next call to next() returns. */
+    public void setValue(V value) {
+      this.value = value;
+    }
+
+    @Override
+    public boolean hasNext() {
+      return this.value != null;
+    }
+
+    /** Hands out the pending value (possibly null) and clears it. */
+    @Override
+    public V next() {
+      final V pending = this.value;
+      this.value = null;
+      return pending;
+    }
+
+    @Override
+    public void remove() {
+      throw new UnsupportedOperationException();
+    }
+  }
+
+  /**
+   * Iterable that always hands out the same iterator instance it was built
+   * with, rather than a fresh iterator per call.
+   */
+  private class SimpleIterable implements Iterable<V> {
+    private final Iterator<V> sharedIterator;
+
+    public SimpleIterable(Iterator<V> iterator) {
+      this.sharedIterator = iterator;
+    }
+
+    @Override
+    public Iterator<V> iterator() {
+      return this.sharedIterator;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/bd76ffcf/tez-engine/src/main/java/org/apache/tez/engine/broadcast/input/BroadcastShuffleInputEventHandler.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/broadcast/input/BroadcastShuffleInputEventHandler.java b/tez-engine/src/main/java/org/apache/tez/engine/broadcast/input/BroadcastShuffleInputEventHandler.java
new file mode 100644
index 0000000..703e1d9
--- /dev/null
+++ b/tez-engine/src/main/java/org/apache/tez/engine/broadcast/input/BroadcastShuffleInputEventHandler.java
@@ -0,0 +1,88 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.tez.engine.broadcast.input;
+
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.tez.dag.api.TezUncheckedException;
+import org.apache.tez.engine.common.InputAttemptIdentifier;
+import org.apache.tez.engine.common.shuffle.impl.ShuffleInputEventHandler;
+import org.apache.tez.engine.common.shuffle.newimpl.ShuffleUserPayloads.DataMovementEventPayloadProto;
+import org.apache.tez.engine.newapi.Event;
+import org.apache.tez.engine.newapi.TezInputContext;
+import org.apache.tez.engine.newapi.events.DataMovementEvent;
+import org.apache.tez.engine.newapi.events.InputFailedEvent;
+
+import com.google.common.base.Preconditions;
+import com.google.protobuf.InvalidProtocolBufferException;
+
+/**
+ * Translates the events delivered to a broadcast input into calls on the
+ * {@link BroadcastShuffleManager}: data movement events register a known (or
+ * empty) input, input failed events mark an input attempt as obsolete.
+ */
+public class BroadcastShuffleInputEventHandler {
+
+  // Log under this class, not under the sorted-shuffle event handler.
+  private static final Log LOG = LogFactory.getLog(BroadcastShuffleInputEventHandler.class);
+
+  private final BroadcastShuffleManager shuffleManager;
+
+  /**
+   * @param inputContext context of the owning input (currently not used here)
+   * @param shuffleManager manager that is informed about inputs
+   */
+  public BroadcastShuffleInputEventHandler(TezInputContext inputContext, BroadcastShuffleManager shuffleManager) {
+    this.shuffleManager = shuffleManager;
+  }
+
+  /**
+   * Handles the given events one by one, in list order.
+   *
+   * @throws TezUncheckedException if an event has an unexpected type or an
+   *           unparsable payload
+   */
+  public void handleEvents(List<Event> events) {
+    for (Event event : events) {
+      handleEvent(event);
+    }
+  }
+
+  /** Dispatches a single event based on its concrete type. */
+  private void handleEvent(Event event) {
+    if (event instanceof DataMovementEvent) {
+      processDataMovementEvent((DataMovementEvent)event);
+    } else if (event instanceof InputFailedEvent) {
+      processInputFailedEvent((InputFailedEvent)event);
+    } else {
+      throw new TezUncheckedException("Unexpected event type: " + event.getClass().getName());
+    }
+  }
+
+  /**
+   * Registers the source attempt described by the event. When the payload
+   * says output was generated, the attempt is added as a known input on the
+   * advertised host and port; otherwise it is recorded as completed without
+   * data.
+   */
+  private void processDataMovementEvent(DataMovementEvent dme) {
+    Preconditions.checkArgument(dme.getSourceIndex() == 0,
+        "Unexpected srcIndex: " + dme.getSourceIndex()
+            + " on DataMovementEvent. Can only be 0");
+    DataMovementEventPayloadProto shufflePayload;
+    try {
+      shufflePayload = DataMovementEventPayloadProto.parseFrom(dme.getUserPayload());
+    } catch (InvalidProtocolBufferException e) {
+      throw new TezUncheckedException("Unable to parse DataMovementEvent payload", e);
+    }
+    if (shufflePayload.getOutputGenerated()) {
+      InputAttemptIdentifier srcAttemptIdentifier = new InputAttemptIdentifier(dme.getTargetIndex(), dme.getVersion(), shufflePayload.getPathComponent());
+      shuffleManager.addKnownInput(shufflePayload.getHost(), shufflePayload.getPort(), srcAttemptIdentifier, 0);
+    } else {
+      shuffleManager.addCompletedInputWithNoData(new InputAttemptIdentifier(dme.getTargetIndex(), dme.getVersion()));
+    }
+  }
+
+  /** Marks the failed source attempt as obsolete in the shuffle manager. */
+  private void processInputFailedEvent(InputFailedEvent ife) {
+    InputAttemptIdentifier srcAttemptIdentifier = new InputAttemptIdentifier(ife.getTargetIndex(), ife.getVersion());
+    shuffleManager.obsoleteKnownInput(srcAttemptIdentifier);
+  }
+}
+

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/bd76ffcf/tez-engine/src/main/java/org/apache/tez/engine/broadcast/input/BroadcastShuffleManager.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/broadcast/input/BroadcastShuffleManager.java b/tez-engine/src/main/java/org/apache/tez/engine/broadcast/input/BroadcastShuffleManager.java
new file mode 100644
index 0000000..dace07c
--- /dev/null
+++ b/tez-engine/src/main/java/org/apache/tez/engine/broadcast/input/BroadcastShuffleManager.java
@@ -0,0 +1,489 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.engine.broadcast.input;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.FutureTask;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.ReentrantLock;
+
+import javax.crypto.SecretKey;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.compress.CodecPool;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.io.compress.Decompressor;
+import org.apache.hadoop.io.compress.DefaultCodec;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.tez.common.TezJobConfig;
+import org.apache.tez.engine.common.ConfigUtils;
+import org.apache.tez.engine.common.InputAttemptIdentifier;
+import org.apache.tez.engine.common.InputIdentifier;
+import org.apache.tez.engine.common.TezEngineUtils;
+import org.apache.tez.engine.common.shuffle.server.ShuffleHandler;
+import org.apache.tez.engine.newapi.Event;
+import org.apache.tez.engine.newapi.TezInputContext;
+import org.apache.tez.engine.newapi.events.InputReadErrorEvent;
+import org.apache.tez.engine.shuffle.common.FetchResult;
+import org.apache.tez.engine.shuffle.common.FetchedInput;
+import org.apache.tez.engine.shuffle.common.Fetcher;
+import org.apache.tez.engine.shuffle.common.FetcherCallback;
+import org.apache.tez.engine.shuffle.common.InputHost;
+import org.apache.tez.engine.shuffle.common.ShuffleUtils;
+import org.apache.tez.engine.shuffle.common.Fetcher.FetcherBuilder;
+import org.apache.tez.engine.shuffle.common.FetchedInputAllocator;
+
+import com.google.common.collect.Lists;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
+public class BroadcastShuffleManager implements FetcherCallback {
+
+  private static final Log LOG = LogFactory.getLog(BroadcastShuffleManager.class);
+  
+  private TezInputContext inputContext;
+  private int numInputs;
+  private Configuration conf;
+  
+  private final BroadcastShuffleInputEventHandler inputEventHandler;
+  private final FetchedInputAllocator inputManager;
+  
+  private final ExecutorService fetcherRawExecutor;
+  private final ListeningExecutorService fetcherExecutor;
+
+  private final BlockingQueue<FetchedInput> completedInputs;
+  private final Set<InputIdentifier> completedInputSet;
+  private final Set<InputIdentifier> pendingInputs;
+  private final ConcurrentMap<String, InputHost> knownSrcHosts;
+  private final Set<InputHost> pendingHosts;
+  private final Set<InputAttemptIdentifier> obsoletedInputs;
+  
+  private final AtomicInteger numCompletedInputs = new AtomicInteger(0);
+  
+  private final long startTime;
+  private long lastProgressTime;
+  
+  private FutureTask<Void> runShuffleFuture;
+  
+  // Required to be held when manipulating pendingHosts
+  private ReentrantLock lock = new ReentrantLock();
+  private Condition wakeLoop = lock.newCondition();
+  
+  private final int numFetchers;
+  private final AtomicInteger numRunningFetchers = new AtomicInteger(0);
+  
+  // Parameters required by Fetchers
+  private final SecretKey shuffleSecret;
+  private final int connectionTimeout;
+  private final int readTimeout;
+  private final CompressionCodec codec;
+  private final Decompressor decompressor;
+  
+  private final FetchFutureCallback fetchFutureCallback = new FetchFutureCallback();
+  
+  private volatile Throwable shuffleError;
+  
+  // TODO NEWTEZ Add counters.
+  
+  /**
+   * Wires up everything needed to fetch {@code numInputs} broadcast inputs:
+   * the event handler, the input allocator, the tracking collections, the
+   * fetcher thread pool, and the connection and compression parameters that
+   * are later handed to each fetcher.
+   *
+   * @param inputContext context of the owning input
+   * @param conf configuration to read shuffle settings from
+   * @param numInputs number of inputs expected; also bounds the fetcher count
+   *          and the capacity of the completed-input queue
+   * @throws IOException declared by the signature; NOTE(review): presumably
+   *           raised while decoding the job token secret - confirm
+   */
+  public BroadcastShuffleManager(TezInputContext inputContext, Configuration conf, int numInputs) throws IOException {
+    this.inputContext = inputContext;
+    this.conf = conf;
+    this.numInputs = numInputs;
+    
+    this.inputEventHandler = new BroadcastShuffleInputEventHandler(inputContext, this);
+    this.inputManager = new BroadcastInputManager(inputContext, conf);
+
+    // NOTE(review): LinkedBlockingQueue(capacity) and newFixedThreadPool(n)
+    // below both reject non-positive arguments, so numInputs (and the
+    // configured parallel-copies value) must be > 0 - confirm callers
+    // guarantee this.
+    pendingInputs = Collections.newSetFromMap(new ConcurrentHashMap<InputIdentifier, Boolean>(numInputs));
+    completedInputSet = Collections.newSetFromMap(new ConcurrentHashMap<InputIdentifier, Boolean>(numInputs));
+    completedInputs = new LinkedBlockingQueue<FetchedInput>(numInputs);
+    knownSrcHosts = new ConcurrentHashMap<String, InputHost>();
+    pendingHosts = Collections.newSetFromMap(new ConcurrentHashMap<InputHost, Boolean>());
+    obsoletedInputs = Collections.newSetFromMap(new ConcurrentHashMap<InputAttemptIdentifier, Boolean>());
+    
+    int maxConfiguredFetchers = 
+        conf.getInt(
+            TezJobConfig.TEZ_ENGINE_SHUFFLE_PARALLEL_COPIES, 
+            TezJobConfig.DEFAULT_TEZ_ENGINE_SHUFFLE_PARALLEL_COPIES);
+    
+    // Never create more fetcher threads than there are inputs to fetch.
+    this.numFetchers = Math.min(maxConfiguredFetchers, numInputs);
+    
+    // Daemon threads named "Fetcher #n"; the listening decorator allows
+    // completion callbacks to be attached to submitted fetchers.
+    this.fetcherRawExecutor = Executors.newFixedThreadPool(numFetchers,
+        new ThreadFactoryBuilder().setDaemon(true).setNameFormat("Fetcher #%d")
+            .build());
+    this.fetcherExecutor = MoreExecutors.listeningDecorator(fetcherRawExecutor);
+    
+    this.startTime = System.currentTimeMillis();
+    this.lastProgressTime = startTime;
+    
+    this.shuffleSecret = ShuffleUtils
+        .getJobTokenSecretFromTokenBytes(inputContext
+            .getServiceConsumerMetaData(ShuffleHandler.MAPREDUCE_SHUFFLE_SERVICEID));
+    
+    // NOTE(review): the connect timeout falls back to the STALLED_COPY
+    // default rather than a connect-specific one - confirm this is intended.
+    this.connectionTimeout = conf.getInt(
+        TezJobConfig.TEZ_ENGINE_SHUFFLE_CONNECT_TIMEOUT,
+        TezJobConfig.DEFAULT_TEZ_ENGINE_SHUFFLE_STALLED_COPY_TIMEOUT);
+    this.readTimeout = conf.getInt(
+        TezJobConfig.TEZ_ENGINE_SHUFFLE_READ_TIMEOUT,
+        TezJobConfig.DEFAULT_TEZ_ENGINE_SHUFFLE_READ_TIMEOUT);
+    
+    // A codec and a pooled decompressor are only set up when intermediate
+    // input is compressed; both stay null otherwise.
+    if (ConfigUtils.isIntermediateInputCompressed(conf)) {
+      Class<? extends CompressionCodec> codecClass = ConfigUtils
+          .getIntermediateInputCompressorClass(conf, DefaultCodec.class);
+      codec = ReflectionUtils.newInstance(codecClass, conf);
+      decompressor = CodecPool.getDecompressor(codec);
+    } else {
+      codec = null;
+      decompressor = null;
+    }
+  }
+  
+  /**
+   * Starts the shuffle scheduling loop on a dedicated thread named
+   * "ShuffleRunner". The loop itself lives in
+   * {@link RunBroadcastShuffleCallable}; its outcome is available through
+   * {@code runShuffleFuture}.
+   */
+  public void run() {
+    RunBroadcastShuffleCallable callable = new RunBroadcastShuffleCallable();
+    runShuffleFuture = new FutureTask<Void>(callable);
+    // Constructing the Thread is not enough; without start() the scheduling
+    // loop never executes and no fetcher is ever launched.
+    new Thread(runShuffleFuture, "ShuffleRunner").start();
+  }
+  
+  /**
+   * Scheduling loop of the broadcast shuffle. Until all inputs are complete
+   * it waits while no fetcher slot is free or no host has pending work, and
+   * otherwise launches fetchers for pending hosts. The loop ends early when a
+   * shuffle error has been recorded. On exit the fetcher executor is shut
+   * down.
+   */
+  private class RunBroadcastShuffleCallable implements Callable<Void> {
+
+    @Override
+    public Void call() throws Exception {
+      while (numCompletedInputs.get() < numInputs) {
+        // wakeLoop belongs to the ReentrantLock, so the lock has to be taken
+        // with lock(); a synchronized block on the lock object does not count
+        // as holding it and await() would fail. Checking the condition under
+        // the lock also avoids missing a signal sent between check and wait.
+        lock.lock();
+        try {
+          if (numRunningFetchers.get() >= numFetchers || pendingHosts.isEmpty()) {
+            wakeLoop.await();
+          }
+        } finally {
+          lock.unlock();
+        }
+
+        if (shuffleError != null) {
+          // InputContext has already been informed of a fatal error.
+          // Initiate shutdown.
+          break;
+        }
+
+        // Scheduling happens on every pass, not only after a wait; otherwise
+        // the loop would spin without launching anything whenever work and
+        // free fetcher slots are both available.
+        if (numCompletedInputs.get() < numInputs) {
+          scheduleFetchers();
+        }
+      }
+      // TODO NEWTEZ Maybe clean up inputs.
+      if (!fetcherExecutor.isShutdown()) {
+        fetcherExecutor.shutdownNow();
+      }
+      return null;
+    }
+
+    /**
+     * Launches fetchers for pending hosts, limited by the number of free
+     * fetcher slots. Every visited host is removed from the pending set;
+     * hosts without pending inputs are dropped without using a slot.
+     */
+    private void scheduleFetchers() {
+      lock.lock();
+      try {
+        int numFetchersToRun = Math.min(pendingHosts.size(),
+            numFetchers - numRunningFetchers.get());
+        int launched = 0;
+        Iterator<InputHost> inputHostIter = pendingHosts.iterator();
+        // Checking the limit first means nothing is launched when no slot
+        // is free.
+        while (launched < numFetchersToRun && inputHostIter.hasNext()) {
+          InputHost inputHost = inputHostIter.next();
+          inputHostIter.remove();
+          if (inputHost.getNumPendingInputs() > 0) {
+            Fetcher fetcher = constructFetcherForHost(inputHost);
+            numRunningFetchers.incrementAndGet();
+            ListenableFuture<FetchResult> future = fetcherExecutor
+                .submit(fetcher);
+            Futures.addCallback(future, fetchFutureCallback);
+            launched++;
+          }
+        }
+      } finally {
+        lock.unlock();
+      }
+    }
+  }
+  
+  /**
+   * Builds a fetcher for all inputs currently pending on the given host.
+   * Inputs that have already completed, or whose attempt has been marked
+   * obsolete, are filtered out before the work is assigned.
+   *
+   * @param inputHost host whose pending inputs are taken over by the fetcher
+   * @return a fetcher configured with connection and compression parameters
+   */
+  private Fetcher constructFetcherForHost(InputHost inputHost) {
+    FetcherBuilder fetcherBuilder = new FetcherBuilder(
+        BroadcastShuffleManager.this, inputManager,
+        inputContext.getApplicationId(), shuffleSecret, conf);
+    fetcherBuilder.setConnectionParameters(connectionTimeout, readTimeout);
+    fetcherBuilder.setCompressionParameters(codec, decompressor);
+
+    // Remove obsolete inputs from the list being given to the fetcher. Also
+    // remove from the obsolete list.
+    List<InputAttemptIdentifier> pendingInputsForHost = inputHost
+        .clearAndGetPendingInputs();
+    for (Iterator<InputAttemptIdentifier> inputIter = pendingInputsForHost
+        .iterator(); inputIter.hasNext();) {
+      InputAttemptIdentifier input = inputIter.next();
+      // Attempts which have already completed must not be fetched again.
+      boolean alreadyCompleted = completedInputSet.contains(input
+          .getInputIdentifier());
+      // Set.remove reports whether the attempt was marked OBSOLETE and
+      // clears the marker in the same step.
+      boolean obsolete = obsoletedInputs.remove(input);
+      // A single remove() per element: an attempt that is both completed and
+      // obsolete must not trigger a second Iterator.remove().
+      if (alreadyCompleted || obsolete) {
+        inputIter.remove();
+      }
+    }
+    // TODO NEWTEZ Maybe limit the number of inputs being given to a single
+    // fetcher, especially in the case where #hosts < #fetchers
+    // Hand over the filtered list. Fetching the pending inputs from the host
+    // a second time would throw away the filtering done above.
+    fetcherBuilder.assignWork(inputHost.getHost(), inputHost.getPort(), 0,
+        pendingInputsForHost);
+    return fetcherBuilder.build();
+  }
+  
+  /////////////////// Methods for InputEventHandler
+  
+  /**
+   * Registers an input attempt as available on the given host and wakes up the
+   * scheduling loop.
+   *
+   * @param hostName host serving the input; used as the key for known hosts
+   * @param port port used when a new host entry has to be created
+   * @param srcAttemptIdentifier the input attempt available on the host
+   * @param partition not used by this method
+   */
+  public void addKnownInput(String hostName, int port,
+      InputAttemptIdentifier srcAttemptIdentifier, int partition) {
+    InputHost knownHost = knownSrcHosts.get(hostName);
+    if (knownHost == null) {
+      InputHost created = new InputHost(hostName, port,
+          inputContext.getApplicationId());
+      // Another thread may have registered the host in the meantime; if so,
+      // use the entry which is already in the map.
+      InputHost existing = knownSrcHosts.putIfAbsent(hostName, created);
+      knownHost = (existing == null) ? created : existing;
+    }
+    knownHost.addKnownInput(srcAttemptIdentifier);
+    synchronized (lock) {
+      pendingHosts.add(knownHost);
+      wakeLoop.signal();
+    }
+  }
+
+  public void addCompletedInputWithNoData(
+      InputAttemptIdentifier srcAttemptIdentifier) {
+    InputIdentifier inputIdentifier = srcAttemptIdentifier.getInputIdentifier();
+    LOG.info("No input data exists for SrcTask: " + inputIdentifier + ". Marking as complete.");
+    if (pendingInputs.remove(inputIdentifier)) {
+      completedInputSet.add(inputIdentifier);
+      completedInputs.add(new NullFetchedInput(srcAttemptIdentifier));
+      numCompletedInputs.incrementAndGet();
+    }
+
+    // Awake the loop to check for termination.
+    synchronized (lock) {
+      wakeLoop.signal();
+    } 
+  }
+
+  public synchronized void obsoleteKnownInput(InputAttemptIdentifier srcAttemptIdentifier) {
+    obsoletedInputs.add(srcAttemptIdentifier);
+    // TODO NEWTEZ Maybe inform the fetcher about this. For now, this is used during the initial fetch list construction.
+  }
+  
+  
+  /**
+   * Forwards the given events to the input event handler.
+   *
+   * @param events the events to be processed
+   */
+  public void handleEvents(List<Event> events) {
+    this.inputEventHandler.handleEvents(events);
+  }
+
+  /////////////////// End of Methods for InputEventHandler
+  /////////////////// Methods from FetcherCallbackHandler
+  
+  /**
+   * Callback for a successfully fetched input. The first successful fetch for
+   * an input is committed and queued for consumption; a fetch for an input
+   * which has already completed is aborted.
+   *
+   * @param host not used by this method
+   * @param srcAttemptIdentifier the attempt which was fetched
+   * @param fetchedInput the fetched data, committed or aborted by this method
+   * @param fetchedBytes not used by this method
+   * @param copyDuration not used by this method
+   * @throws IOException declared for the commit / abort calls on fetchedInput
+   */
+  @Override
+  public void fetchSucceeded(String host,
+      InputAttemptIdentifier srcAttemptIdentifier, FetchedInput fetchedInput, long fetchedBytes,
+      long copyDuration) throws IOException {
+    InputIdentifier inputIdentifier = srcAttemptIdentifier.getInputIdentifier();
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Complete fetch for attempt: " + srcAttemptIdentifier + " to " + fetchedInput.getType());
+    }
+    
+    // Count irrespective of whether this is a copy of an already fetched input
+    synchronized(lock) {
+      lastProgressTime = System.currentTimeMillis();
+    }
+    
+    boolean committed = false;
+    // Check once without the lock, then again while holding it, so that only
+    // one fetch per input is committed.
+    if (!completedInputSet.contains(inputIdentifier)) {
+      synchronized (completedInputSet) {
+        if (!completedInputSet.contains(inputIdentifier)) {
+          fetchedInput.commit();
+          committed = true;
+          pendingInputs.remove(inputIdentifier);
+          completedInputSet.add(inputIdentifier);
+          completedInputs.add(fetchedInput);
+          numCompletedInputs.incrementAndGet();
+        }
+      }
+    }
+    if (!committed) {
+      // The input had already been completed by another fetch; discard this copy.
+      fetchedInput.abort(); // If this fails, the fetcher may attempt another abort.
+    } else {
+      synchronized(lock) {
+        // Signal the wakeLoop to check for termination.
+        wakeLoop.signal();
+      }
+    }
+    // TODO NEWTEZ Maybe inform fetchers, in case they have an alternate attempt of the same task in their queue.
+  }
+
+  /**
+   * Callback for a failed fetch. An InputReadErrorEvent identifying the source
+   * task index and attempt number is sent through the input context.
+   *
+   * @param host not used by this method
+   * @param srcAttemptIdentifier the attempt which could not be fetched
+   * @param connectFailed not used by this method
+   */
+  @Override
+  public void fetchFailed(String host,
+      InputAttemptIdentifier srcAttemptIdentifier, boolean connectFailed) {
+    // TODO NEWTEZ. Implement logic to report fetch failures after a threshold.
+    // For now, reporting immediately.
+    String failedAttempt = TezEngineUtils.getTaskAttemptIdentifier(
+        inputContext.getSourceVertexName(),
+        srcAttemptIdentifier.getInputIdentifier().getSrcTaskIndex(),
+        srcAttemptIdentifier.getAttemptNumber());
+    String diagnostics = "Fetch failure while fetching from " + failedAttempt;
+
+    int srcTaskIndex = srcAttemptIdentifier.getInputIdentifier().getSrcTaskIndex();
+    int attemptNumber = srcAttemptIdentifier.getAttemptNumber();
+    InputReadErrorEvent readError = new InputReadErrorEvent(diagnostics,
+        srcTaskIndex, attemptNumber);
+
+    List<Event> failedEvents = Lists.newArrayListWithCapacity(1);
+    failedEvents.add(readError);
+    inputContext.sendEvents(failedEvents);
+  }
+  /////////////////// End of Methods from FetcherCallbackHandler
+
+  public void shutdown() throws InterruptedException {
+    if (this.fetcherExecutor != null && !this.fetcherExecutor.isShutdown()) {
+      this.fetcherExecutor.shutdown();
+      this.fetcherExecutor.awaitTermination(2000l, TimeUnit.MILLISECONDS);
+      if (!this.fetcherExecutor.isShutdown()) {
+        this.fetcherExecutor.shutdownNow();
+      }
+    }
+  }
+  
+  /////////////////// Methods for walking the available inputs
+  
+  /**
+   * Checks the head of the completed inputs queue without removing it.
+   *
+   * @return true if the head of the queue is an input with data; false if the
+   *         queue is empty or its head is a placeholder for an empty input.
+   */
+  public boolean newInputAvailable() {
+    FetchedInput next = completedInputs.peek();
+    return next != null && !(next instanceof NullFetchedInput);
+  }
+
+  /**
+   * @return true if all of the required inputs have been fetched.
+   */
+  public boolean allInputsFetched() {
+    return numCompletedInputs.get() == numInputs;
+  }
+
+  /**
+   * Removes and returns the next completed input which carries data.
+   * Placeholder entries for inputs without data are skipped.
+   *
+   * @return the next available input, or null if the queue is empty and all
+   *         inputs have been fetched. This method will block if there are
+   *         currently no available inputs, but more may become available.
+   * @throws InterruptedException if interrupted while blocked waiting for an
+   *         input
+   */
+  public FetchedInput getNextInput() throws InterruptedException {
+    FetchedInput input = null;
+    do {
+      // Peek first, so that an empty queue can be told apart from "all done".
+      input = completedInputs.peek();
+      if (input == null) {
+        if (allInputsFetched()) {
+          // Nothing queued and nothing more expected; null is returned.
+          break;
+        } else {
+          input = completedInputs.take(); // block
+        }
+      } else {
+        input = completedInputs.poll();
+      }
+      // Placeholders for empty inputs are consumed and skipped.
+    } while (input instanceof NullFetchedInput);
+    return input;
+  }
+
+  /////////////////// End of methods for walking the available inputs
+  
+  
+  /**
+   * Placeholder which is added to the completed input list when an input does
+   * not have any data. It carries no content: every operation on it is
+   * rejected with an UnsupportedOperationException.
+   */
+  private class NullFetchedInput extends FetchedInput {
+
+    private static final String UNSUPPORTED_MESSAGE = "Not supported for NullFetchedInput";
+
+    public NullFetchedInput(InputAttemptIdentifier inputAttemptIdentifier) {
+      super(Type.MEMORY, -1, inputAttemptIdentifier, null);
+    }
+
+    @Override
+    public OutputStream getOutputStream() throws IOException {
+      throw new UnsupportedOperationException(UNSUPPORTED_MESSAGE);
+    }
+
+    @Override
+    public InputStream getInputStream() throws IOException {
+      throw new UnsupportedOperationException(UNSUPPORTED_MESSAGE);
+    }
+
+    @Override
+    public void commit() throws IOException {
+      throw new UnsupportedOperationException(UNSUPPORTED_MESSAGE);
+    }
+
+    @Override
+    public void abort() throws IOException {
+      throw new UnsupportedOperationException(UNSUPPORTED_MESSAGE);
+    }
+
+    @Override
+    public void free() {
+      throw new UnsupportedOperationException(UNSUPPORTED_MESSAGE);
+    }
+  }
+  
+  
+  /**
+   * Callback invoked when a fetcher finishes. Inputs which the fetcher did not
+   * complete are handed back to their host for another attempt, a failure is
+   * reported as a fatal error, and in both cases the scheduling loop is woken
+   * up.
+   */
+  private class FetchFutureCallback implements FutureCallback<FetchResult> {
+
+    /** Releases the fetcher slot and wakes up the scheduling loop. */
+    private void doBookKeepingForFetcherComplete() {
+      numRunningFetchers.decrementAndGet();
+      synchronized(lock) {
+        wakeLoop.signal();
+      }
+    }
+
+    @Override
+    public void onSuccess(FetchResult result) {
+      Iterable<InputAttemptIdentifier> remainingInputs = result.getPendingInputs();
+      if (remainingInputs != null && remainingInputs.iterator().hasNext()) {
+        InputHost inputHost = knownSrcHosts.get(result.getHost());
+        assert inputHost != null;
+        for (InputAttemptIdentifier input : remainingInputs) {
+          inputHost.addKnownInput(input);
+        }
+        // pendingHosts is modified under the lock elsewhere (addKnownInput and
+        // the scheduling loop); do the same here so that the loop never
+        // iterates over the collection while it is being changed.
+        synchronized (lock) {
+          pendingHosts.add(inputHost);
+        }
+      }
+      doBookKeepingForFetcherComplete();
+    }
+
+    @Override
+    public void onFailure(Throwable t) {
+      // Pass the throwable to the logger so that the stack trace is retained.
+      LOG.error("Fetcher failed with error: " + t, t);
+      shuffleError = t;
+      inputContext.fatalError(t, "Fetcher failed");
+      doBookKeepingForFetcherComplete();
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/bd76ffcf/tez-engine/src/main/java/org/apache/tez/engine/broadcast/output/FileBasedKVWriter.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/broadcast/output/FileBasedKVWriter.java b/tez-engine/src/main/java/org/apache/tez/engine/broadcast/output/FileBasedKVWriter.java
new file mode 100644
index 0000000..9f3dbbe
--- /dev/null
+++ b/tez-engine/src/main/java/org/apache/tez/engine/broadcast/output/FileBasedKVWriter.java
@@ -0,0 +1,125 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.tez.engine.broadcast.output;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocalFileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.io.compress.DefaultCodec;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.tez.common.TezJobConfig;
+import org.apache.tez.common.TezUtils;
+import org.apache.tez.engine.common.ConfigUtils;
+import org.apache.tez.engine.common.TezEngineUtils;
+import org.apache.tez.engine.common.sort.impl.IFile;
+import org.apache.tez.engine.common.sort.impl.TezIndexRecord;
+import org.apache.tez.engine.common.sort.impl.TezSpillRecord;
+import org.apache.tez.engine.common.task.local.newoutput.TezTaskOutput;
+import org.apache.tez.engine.newapi.KVWriter;
+import org.apache.tez.engine.newapi.TezOutputContext;
+
+/**
+ * A KVWriter which appends key-value pairs to a single IFile on the local file
+ * system, and writes a one-entry index file when closed.
+ */
+public class FileBasedKVWriter implements KVWriter {
+
+  public static final int INDEX_RECORD_LENGTH = 24;
+
+  private final Configuration conf;
+  private final FileSystem rfs;
+  private final CompressionCodec codec;
+  private final TezTaskOutput outputFileManager;
+
+  @SuppressWarnings("rawtypes")
+  private final Class keyClass;
+  @SuppressWarnings("rawtypes")
+  private final Class valClass;
+
+  private IFile.Writer writer;
+  private int numRecords = 0;
+
+  // TODO NEWTEZ Define Counters
+  // Number of records
+  // Time waiting for a write to complete, if that's possible.
+  // Size of key-value pairs written.
+
+  /**
+   * Creates the writer from the configuration carried in the user payload of
+   * the output context, and opens the output file.
+   *
+   * @param outputContext context providing the user payload, the work
+   *          directories and the unique identifier of this output
+   * @throws IOException if the local file system or the output file cannot be
+   *           set up
+   */
+  public FileBasedKVWriter(TezOutputContext outputContext) throws IOException {
+    Configuration payloadConf = TezUtils.createConfFromUserPayload(
+        outputContext.getUserPayload());
+    payloadConf.setStrings(TezJobConfig.LOCAL_DIRS,
+        outputContext.getWorkDirs());
+    this.conf = payloadConf;
+
+    LocalFileSystem localFs = (LocalFileSystem) FileSystem.getLocal(this.conf);
+    this.rfs = localFs.getRaw();
+
+    // Setup serialization
+    this.keyClass = ConfigUtils.getIntermediateOutputKeyClass(this.conf);
+    this.valClass = ConfigUtils.getIntermediateOutputValueClass(this.conf);
+
+    // Setup compression
+    if (!ConfigUtils.shouldCompressIntermediateOutput(this.conf)) {
+      this.codec = null;
+    } else {
+      Class<? extends CompressionCodec> codecClass = ConfigUtils
+          .getIntermediateOutputCompressorClass(this.conf, DefaultCodec.class);
+      this.codec = ReflectionUtils.newInstance(codecClass, this.conf);
+    }
+
+    this.outputFileManager = TezEngineUtils.instantiateTaskOutputManager(
+        this.conf, outputContext);
+
+    initWriter();
+  }
+
+  /**
+   * Closes the underlying writer and writes the index file describing the
+   * single output partition.
+   *
+   * @return true if any output was generated. false otherwise
+   * @throws IOException
+   */
+  public boolean close() throws IOException {
+    this.writer.close();
+
+    // One spill record with a single index entry which starts at offset 0.
+    TezSpillRecord spillRecord = new TezSpillRecord(1);
+    TezIndexRecord indexRecord = new TezIndexRecord(0, writer.getRawLength(),
+        writer.getCompressedLength());
+    spillRecord.putIndex(indexRecord, 0);
+
+    Path indexFile = outputFileManager
+        .getOutputIndexFileForWrite(INDEX_RECORD_LENGTH);
+    spillRecord.writeToFile(indexFile, conf);
+    return numRecords > 0;
+  }
+
+  /**
+   * Appends a key-value pair to the output file and counts the record.
+   */
+  @Override
+  public void write(Object key, Object value) throws IOException {
+    this.writer.append(key, value);
+    this.numRecords++;
+  }
+
+  /**
+   * Opens the IFile writer on the output file provided by the output file
+   * manager.
+   *
+   * @throws IOException
+   */
+  public void initWriter() throws IOException {
+    // TODO NEWTEZ Consider making the buffer size configurable. Also consider
+    // setting up an in-memory buffer which is occasionally flushed to disk so
+    // that the output does not block.
+
+    // TODO NEWTEZ maybe use appropriate counter
+    Path outputFile = outputFileManager.getOutputFileForWrite();
+    this.writer = new IFile.Writer(conf, rfs, outputFile, keyClass, valClass,
+        codec, null);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/bd76ffcf/tez-engine/src/main/java/org/apache/tez/engine/common/InputAttemptIdentifier.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/common/InputAttemptIdentifier.java b/tez-engine/src/main/java/org/apache/tez/engine/common/InputAttemptIdentifier.java
new file mode 100644
index 0000000..076807e
--- /dev/null
+++ b/tez-engine/src/main/java/org/apache/tez/engine/common/InputAttemptIdentifier.java
@@ -0,0 +1,95 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.engine.common;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+
+/**
+ * Identifies one attempt of a source task: the input (task index), the
+ * attempt number and an optional path component. Equality and hashing are
+ * based on the input and the attempt number only.
+ */
+@Private
+public class InputAttemptIdentifier {
+
+  private final InputIdentifier inputIdentifier;
+  private final int attemptNumber;
+  private String pathComponent;
+
+  public InputAttemptIdentifier(int taskIndex, int attemptNumber) {
+    this(new InputIdentifier(taskIndex), attemptNumber, null);
+  }
+
+  public InputAttemptIdentifier(int taskIndex, int attemptNumber, String pathComponent) {
+    this(new InputIdentifier(taskIndex), attemptNumber, pathComponent);
+  }
+
+  public InputAttemptIdentifier(InputIdentifier inputIdentifier, int attemptNumber, String pathComponent) {
+    this.inputIdentifier = inputIdentifier;
+    this.attemptNumber = attemptNumber;
+    this.pathComponent = pathComponent;
+  }
+
+  public InputIdentifier getInputIdentifier() {
+    return inputIdentifier;
+  }
+
+  public int getAttemptNumber() {
+    return this.attemptNumber;
+  }
+
+  public String getPathComponent() {
+    return this.pathComponent;
+  }
+
+  // PathComponent does not need to be part of the hashCode and equals computation.
+  @Override
+  public int hashCode() {
+    int hash = 31 + attemptNumber;
+    int identifierHash = (inputIdentifier == null) ? 0 : inputIdentifier.hashCode();
+    return 31 * hash + identifierHash;
+  }
+
+  @Override
+  public boolean equals(Object obj) {
+    if (this == obj) {
+      return true;
+    }
+    if (obj == null || getClass() != obj.getClass()) {
+      return false;
+    }
+    InputAttemptIdentifier that = (InputAttemptIdentifier) obj;
+    if (attemptNumber != that.attemptNumber) {
+      return false;
+    }
+    if (inputIdentifier == null) {
+      return that.inputIdentifier == null;
+    }
+    return inputIdentifier.equals(that.inputIdentifier);
+  }
+
+  @Override
+  public String toString() {
+    StringBuilder sb = new StringBuilder("InputAttemptIdentifier [inputIdentifier=");
+    sb.append(inputIdentifier);
+    sb.append(", attemptNumber=").append(attemptNumber);
+    sb.append(", pathComponent=").append(pathComponent);
+    sb.append("]");
+    return sb.toString();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/bd76ffcf/tez-engine/src/main/java/org/apache/tez/engine/common/InputIdentifier.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/common/InputIdentifier.java b/tez-engine/src/main/java/org/apache/tez/engine/common/InputIdentifier.java
new file mode 100644
index 0000000..b694530
--- /dev/null
+++ b/tez-engine/src/main/java/org/apache/tez/engine/common/InputIdentifier.java
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.engine.common;
+
+/**
+ * Identifies an input by the index of the source task which produces it.
+ */
+public class InputIdentifier {
+
+  private final int srcTaskIndex;
+
+  public InputIdentifier(int srcTaskIndex) {
+    this.srcTaskIndex = srcTaskIndex;
+  }
+
+  public int getSrcTaskIndex() {
+    return srcTaskIndex;
+  }
+
+  /** The source task index doubles as the hash code. */
+  @Override
+  public int hashCode() {
+    return this.srcTaskIndex;
+  }
+
+  /** Two identifiers are equal when they are of the same class and index. */
+  @Override
+  public boolean equals(Object obj) {
+    if (this == obj) {
+      return true;
+    }
+    if (obj == null || getClass() != obj.getClass()) {
+      return false;
+    }
+    InputIdentifier that = (InputIdentifier) obj;
+    return srcTaskIndex == that.srcTaskIndex;
+  }
+
+  @Override
+  public String toString() {
+    StringBuilder sb = new StringBuilder("InputIdentifier [srcTaskIndex=");
+    sb.append(srcTaskIndex).append("]");
+    return sb.toString();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/bd76ffcf/tez-engine/src/main/java/org/apache/tez/engine/common/TezEngineUtils.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/common/TezEngineUtils.java b/tez-engine/src/main/java/org/apache/tez/engine/common/TezEngineUtils.java
index da333a2..f352e08 100644
--- a/tez-engine/src/main/java/org/apache/tez/engine/common/TezEngineUtils.java
+++ b/tez-engine/src/main/java/org/apache/tez/engine/common/TezEngineUtils.java
@@ -25,8 +25,13 @@ import java.lang.reflect.InvocationTargetException;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.tez.common.Constants;
 import org.apache.tez.common.TezJobConfig;
+import org.apache.tez.dag.api.TezUncheckedException;
 import org.apache.tez.engine.api.Partitioner;
+import org.apache.tez.engine.common.task.local.newoutput.TezTaskOutput;
+import org.apache.tez.engine.common.task.local.newoutput.TezTaskOutputFiles;
+import org.apache.tez.engine.newapi.TezOutputContext;
 
 public class TezEngineUtils {
 
@@ -90,4 +95,20 @@ public class TezEngineUtils {
     }
     return partitioner;
   }
+  
+  /**
+   * Instantiates the task output manager class named in the configuration
+   * (TezTaskOutputFiles if none is configured), using its
+   * (Configuration, String) constructor.
+   *
+   * @param conf configuration naming the output manager class; also passed to
+   *          the constructor
+   * @param outputContext supplies the unique identifier passed to the
+   *          constructor
+   * @return the newly created output manager
+   * @throws TezUncheckedException if the class cannot be instantiated
+   */
+  public static TezTaskOutput instantiateTaskOutputManager(Configuration conf, TezOutputContext outputContext) {
+    Class<?> outputManagerClass = conf.getClass(
+        Constants.TEZ_ENGINE_TASK_OUTPUT_MANAGER, TezTaskOutputFiles.class);
+    try {
+      Constructor<?> constructor = outputManagerClass.getConstructor(
+          Configuration.class, String.class);
+      constructor.setAccessible(true);
+      return (TezTaskOutput) constructor.newInstance(conf,
+          outputContext.getUniqueIdentifier());
+    } catch (Exception e) {
+      String configuredClassName = conf.get(
+          Constants.TEZ_ENGINE_TASK_OUTPUT_MANAGER,
+          TezTaskOutputFiles.class.getName());
+      throw new TezUncheckedException(
+          "Unable to instantiate configured TezOutputFileManager: "
+              + configuredClassName, e);
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/bd76ffcf/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/Fetcher.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/Fetcher.java b/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/Fetcher.java
index 0440236..5c71644 100644
--- a/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/Fetcher.java
+++ b/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/Fetcher.java
@@ -47,6 +47,7 @@ import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.tez.common.TezJobConfig;
 import org.apache.tez.common.counters.TezCounter;
 import org.apache.tez.engine.common.ConfigUtils;
+import org.apache.tez.engine.common.InputAttemptIdentifier;
 import org.apache.tez.engine.common.security.SecureShuffleUtils;
 import org.apache.tez.engine.common.shuffle.impl.MapOutput.Type;
 import org.apache.tez.engine.common.sort.impl.IFileInputStream;
@@ -217,7 +218,7 @@ class Fetcher extends Thread {
   @VisibleForTesting
   protected void copyFromHost(MapHost host) throws IOException {
     // Get completed maps on 'host'
-    List<TaskAttemptIdentifier> srcAttempts = scheduler.getMapsForHost(host);
+    List<InputAttemptIdentifier> srcAttempts = scheduler.getMapsForHost(host);
     
     // Sanity check to catch hosts with only 'OBSOLETE' maps, 
     // especially at the tail of large jobs
@@ -231,7 +232,7 @@ class Fetcher extends Thread {
     }
     
     // List of maps to be fetched yet
-    Set<TaskAttemptIdentifier> remaining = new HashSet<TaskAttemptIdentifier>(srcAttempts);
+    Set<InputAttemptIdentifier> remaining = new HashSet<InputAttemptIdentifier>(srcAttempts);
     
     // Construct the url and connect
     DataInputStream input;
@@ -290,19 +291,20 @@ class Fetcher extends Thread {
       // If connect did not succeed, just mark all the maps as failed,
       // indirectly penalizing the host
       if (!connectSucceeded) {
-        for(TaskAttemptIdentifier left: remaining) {
+        for(InputAttemptIdentifier left: remaining) {
           scheduler.copyFailed(left, host, connectSucceeded);
         }
       } else {
         // If we got a read error at this stage, it implies there was a problem
         // with the first map, typically lost map. So, penalize only that map
         // and add the rest
-        TaskAttemptIdentifier firstMap = srcAttempts.get(0);
+        InputAttemptIdentifier firstMap = srcAttempts.get(0);
         scheduler.copyFailed(firstMap, host, connectSucceeded);
       }
       
       // Add back all the remaining maps, WITHOUT marking them as failed
-      for(TaskAttemptIdentifier left: remaining) {
+      for(InputAttemptIdentifier left: remaining) {
+        // TODO Should the first one be skipped ?
         scheduler.putBackKnownMapOutput(host, left);
       }
       
@@ -314,14 +316,14 @@ class Fetcher extends Thread {
       // On any error, faildTasks is not null and we exit
       // after putting back the remaining maps to the 
       // yet_to_be_fetched list and marking the failed tasks.
-      TaskAttemptIdentifier[] failedTasks = null;
+      InputAttemptIdentifier[] failedTasks = null;
       while (!remaining.isEmpty() && failedTasks == null) {
         failedTasks = copyMapOutput(host, input, remaining);
       }
       
       if(failedTasks != null && failedTasks.length > 0) {
         LOG.warn("copyMapOutput failed for tasks "+Arrays.toString(failedTasks));
-        for(TaskAttemptIdentifier left: failedTasks) {
+        for(InputAttemptIdentifier left: failedTasks) {
           scheduler.copyFailed(left, host, true);
         }
       }
@@ -334,19 +336,19 @@ class Fetcher extends Thread {
             + remaining.size() + " left.");
       }
     } finally {
-      for (TaskAttemptIdentifier left : remaining) {
+      for (InputAttemptIdentifier left : remaining) {
         scheduler.putBackKnownMapOutput(host, left);
       }
     }
   }
   
-  private static TaskAttemptIdentifier[] EMPTY_ATTEMPT_ID_ARRAY = new TaskAttemptIdentifier[0];
+  private static InputAttemptIdentifier[] EMPTY_ATTEMPT_ID_ARRAY = new InputAttemptIdentifier[0];
   
-  private TaskAttemptIdentifier[] copyMapOutput(MapHost host,
+  private InputAttemptIdentifier[] copyMapOutput(MapHost host,
                                 DataInputStream input,
-                                Set<TaskAttemptIdentifier> remaining) {
+                                Set<InputAttemptIdentifier> remaining) {
     MapOutput mapOutput = null;
-    TaskAttemptIdentifier srcAttemptId = null;
+    InputAttemptIdentifier srcAttemptId = null;
     long decompressedLength = -1;
     long compressedLength = -1;
     
@@ -366,14 +368,14 @@ class Fetcher extends Thread {
         badIdErrs.increment(1);
         LOG.warn("Invalid map id ", e);
         //Don't know which one was bad, so consider all of them as bad
-        return remaining.toArray(new TaskAttemptIdentifier[remaining.size()]);
+        return remaining.toArray(new InputAttemptIdentifier[remaining.size()]);
       }
 
  
       // Do some basic sanity verification
       if (!verifySanity(compressedLength, decompressedLength, forReduce,
           remaining, srcAttemptId)) {
-        return new TaskAttemptIdentifier[] {srcAttemptId};
+        return new InputAttemptIdentifier[] {srcAttemptId};
       }
       
       if(LOG.isDebugEnabled()) {
@@ -418,9 +420,9 @@ class Fetcher extends Thread {
                  srcAttemptId + " decomp: " + 
                  decompressedLength + ", " + compressedLength, ioe);
         if(srcAttemptId == null) {
-          return remaining.toArray(new TaskAttemptIdentifier[remaining.size()]);
+          return remaining.toArray(new InputAttemptIdentifier[remaining.size()]);
         } else {
-          return new TaskAttemptIdentifier[] {srcAttemptId};
+          return new InputAttemptIdentifier[] {srcAttemptId};
         }
       }
       
@@ -430,7 +432,7 @@ class Fetcher extends Thread {
       // Inform the shuffle-scheduler
       mapOutput.abort();
       metrics.failedFetch();
-      return new TaskAttemptIdentifier[] {srcAttemptId};
+      return new InputAttemptIdentifier[] {srcAttemptId};
     }
 
   }
@@ -445,7 +447,7 @@ class Fetcher extends Thread {
    * @return true/false, based on if the verification succeeded or not
    */
   private boolean verifySanity(long compressedLength, long decompressedLength,
-      int forReduce, Set<TaskAttemptIdentifier> remaining, TaskAttemptIdentifier srcAttemptId) {
+      int forReduce, Set<InputAttemptIdentifier> remaining, InputAttemptIdentifier srcAttemptId) {
     if (compressedLength < 0 || decompressedLength < 0) {
       wrongLengthErrs.increment(1);
       LOG.warn(getName() + " invalid lengths in map output header: id: " +
@@ -482,13 +484,13 @@ class Fetcher extends Thread {
    * @return
    * @throws MalformedURLException
    */
-  private URL getMapOutputURL(MapHost host, List<TaskAttemptIdentifier> srcAttempts
+  private URL getMapOutputURL(MapHost host, List<InputAttemptIdentifier> srcAttempts
                               )  throws MalformedURLException {
     // Get the base url
     StringBuffer url = new StringBuffer(host.getBaseUrl());
     
     boolean first = true;
-    for (TaskAttemptIdentifier mapId : srcAttempts) {
+    for (InputAttemptIdentifier mapId : srcAttempts) {
       if (!first) {
         url.append(",");
       }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/bd76ffcf/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/InMemoryReader.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/InMemoryReader.java b/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/InMemoryReader.java
index d10ebaa..1beed44 100644
--- a/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/InMemoryReader.java
+++ b/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/InMemoryReader.java
@@ -25,6 +25,7 @@ import java.io.IOException;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.io.DataInputBuffer;
+import org.apache.tez.engine.common.InputAttemptIdentifier;
 import org.apache.tez.engine.common.sort.impl.IFile;
 import org.apache.tez.engine.common.sort.impl.IFile.Reader;
 
@@ -34,14 +35,14 @@ import org.apache.tez.engine.common.sort.impl.IFile.Reader;
 @InterfaceAudience.Private
 @InterfaceStability.Unstable
 public class InMemoryReader extends Reader {
-  private final TaskAttemptIdentifier taskAttemptId;
+  private final InputAttemptIdentifier taskAttemptId;
   private final MergeManager merger;
   DataInputBuffer memDataIn = new DataInputBuffer();
   private int start;
   private int length;
   private int prevKeyPos;
 
-  public InMemoryReader(MergeManager merger, TaskAttemptIdentifier taskAttemptId,
+  public InMemoryReader(MergeManager merger, InputAttemptIdentifier taskAttemptId,
                         byte[] data, int start, int length)
   throws IOException {
     super(null, null, length - start, null, null);

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/bd76ffcf/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/MapHost.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/MapHost.java b/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/MapHost.java
index cd644de..20ec472 100644
--- a/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/MapHost.java
+++ b/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/MapHost.java
@@ -20,6 +20,10 @@ package org.apache.tez.engine.common.shuffle.impl;
 import java.util.ArrayList;
 import java.util.List;
 
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.tez.engine.common.InputAttemptIdentifier;
+
+@Private
 class MapHost {
   
   public static enum State {
@@ -35,7 +39,7 @@ class MapHost {
   private final String baseUrl;
   private final String identifier;
   // Tracks attempt IDs
-  private List<TaskAttemptIdentifier> maps = new ArrayList<TaskAttemptIdentifier>();
+  private List<InputAttemptIdentifier> maps = new ArrayList<InputAttemptIdentifier>();
   
   public MapHost(int partitionId, String hostName, String baseUrl) {
     this.partitionId = partitionId;
@@ -68,16 +72,16 @@ class MapHost {
     return baseUrl;
   }
 
-  public synchronized void addKnownMap(TaskAttemptIdentifier srcAttempt) {
+  public synchronized void addKnownMap(InputAttemptIdentifier srcAttempt) {
     maps.add(srcAttempt);
     if (state == State.IDLE) {
       state = State.PENDING;
     }
   }
 
-  public synchronized List<TaskAttemptIdentifier> getAndClearKnownMaps() {
-    List<TaskAttemptIdentifier> currentKnownMaps = maps;
-    maps = new ArrayList<TaskAttemptIdentifier>();
+  public synchronized List<InputAttemptIdentifier> getAndClearKnownMaps() {
+    List<InputAttemptIdentifier> currentKnownMaps = maps;
+    maps = new ArrayList<InputAttemptIdentifier>();
     return currentKnownMaps;
   }
   

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/bd76ffcf/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/MapOutput.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/MapOutput.java b/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/MapOutput.java
index f0b48fd..46851c7 100644
--- a/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/MapOutput.java
+++ b/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/MapOutput.java
@@ -29,6 +29,7 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.LocalDirAllocator;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.BoundedByteArrayOutputStream;
+import org.apache.tez.engine.common.InputAttemptIdentifier;
 import org.apache.tez.engine.common.task.local.newoutput.TezTaskOutputFiles;
 
 
@@ -42,7 +43,7 @@ class MapOutput {
     DISK
   }
   
-  private TaskAttemptIdentifier attemptIdentifier;
+  private InputAttemptIdentifier attemptIdentifier;
   private final int id;
   
   private final MergeManager merger;
@@ -61,7 +62,7 @@ class MapOutput {
   
   private final boolean primaryMapOutput;
   
-  MapOutput(TaskAttemptIdentifier attemptIdentifier, MergeManager merger, long size, 
+  MapOutput(InputAttemptIdentifier attemptIdentifier, MergeManager merger, long size, 
             Configuration conf, LocalDirAllocator localDirAllocator,
             int fetcher, boolean primaryMapOutput, 
             TezTaskOutputFiles mapOutputFile)
@@ -79,7 +80,7 @@ class MapOutput {
     
     this.localFS = FileSystem.getLocal(conf);
     outputPath =
-      mapOutputFile.getInputFileForWrite(this.attemptIdentifier.getTaskIndex(), size);
+      mapOutputFile.getInputFileForWrite(this.attemptIdentifier.getInputIdentifier().getSrcTaskIndex(), size);
     tmpOutputPath = outputPath.suffix(String.valueOf(fetcher));
 
     disk = localFS.create(tmpOutputPath);
@@ -87,7 +88,7 @@ class MapOutput {
     this.primaryMapOutput = primaryMapOutput;
   }
   
-  MapOutput(TaskAttemptIdentifier attemptIdentifier, MergeManager merger, int size, 
+  MapOutput(InputAttemptIdentifier attemptIdentifier, MergeManager merger, int size, 
             boolean primaryMapOutput) {
     this.id = ID.incrementAndGet();
     this.attemptIdentifier = attemptIdentifier;
@@ -107,7 +108,7 @@ class MapOutput {
     this.primaryMapOutput = primaryMapOutput;
   }
 
-  public MapOutput(TaskAttemptIdentifier attemptIdentifier) {
+  public MapOutput(InputAttemptIdentifier attemptIdentifier) {
     this.id = ID.incrementAndGet();
     this.attemptIdentifier = attemptIdentifier;
 
@@ -159,7 +160,7 @@ class MapOutput {
     return disk;
   }
 
-  public TaskAttemptIdentifier getAttemptIdentifier() {
+  public InputAttemptIdentifier getAttemptIdentifier() {
     return this.attemptIdentifier;
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/bd76ffcf/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/MergeManager.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/MergeManager.java b/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/MergeManager.java
index a5401fa..093a293 100644
--- a/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/MergeManager.java
+++ b/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/MergeManager.java
@@ -47,6 +47,7 @@ import org.apache.tez.common.Constants;
 import org.apache.tez.common.TezJobConfig;
 import org.apache.tez.common.counters.TezCounter;
 import org.apache.tez.engine.common.ConfigUtils;
+import org.apache.tez.engine.common.InputAttemptIdentifier;
 import org.apache.tez.engine.common.sort.impl.IFile;
 import org.apache.tez.engine.common.sort.impl.TezMerger;
 import org.apache.tez.engine.common.sort.impl.TezRawKeyValueIterator;
@@ -232,7 +233,7 @@ public class MergeManager {
 
   final private MapOutput stallShuffle = new MapOutput(null);
 
-  public synchronized MapOutput reserve(TaskAttemptIdentifier srcAttemptIdentifier, 
+  public synchronized MapOutput reserve(InputAttemptIdentifier srcAttemptIdentifier, 
                                              long requestedSize,
                                              int fetcher
                                              ) throws IOException {
@@ -279,7 +280,7 @@ public class MergeManager {
    * @return
    */
   private synchronized MapOutput unconditionalReserve(
-      TaskAttemptIdentifier srcAttemptIdentifier, long requestedSize, boolean primaryMapOutput) {
+      InputAttemptIdentifier srcAttemptIdentifier, long requestedSize, boolean primaryMapOutput) {
     usedMemory += requestedSize;
     return new MapOutput(srcAttemptIdentifier, this, (int)requestedSize, 
         primaryMapOutput);
@@ -409,7 +410,7 @@ public class MergeManager {
         return;
       }
 
-      TaskAttemptIdentifier dummyMapId = inputs.get(0).getAttemptIdentifier(); 
+      InputAttemptIdentifier dummyMapId = inputs.get(0).getAttemptIdentifier(); 
       List<Segment> inMemorySegments = new ArrayList<Segment>();
       long mergeOutputSize = 
         createInMemorySegments(inputs, inMemorySegments, 0);
@@ -468,17 +469,16 @@ public class MergeManager {
       //in the merge method)
 
       //figure out the mapId 
-      TaskAttemptIdentifier srcTaskIdentifier = inputs.get(0).getAttemptIdentifier();
+      InputAttemptIdentifier srcTaskIdentifier = inputs.get(0).getAttemptIdentifier();
 
       List<Segment> inMemorySegments = new ArrayList<Segment>();
       long mergeOutputSize = 
         createInMemorySegments(inputs, inMemorySegments,0);
       int noInMemorySegments = inMemorySegments.size();
 
-      Path outputPath = 
-        mapOutputFile.getInputFileForWrite(srcTaskIdentifier.getTaskIndex(),
-                                           mergeOutputSize).suffix(
-                                               Constants.MERGED_OUTPUT_PREFIX);
+      Path outputPath = mapOutputFile.getInputFileForWrite(
+          srcTaskIdentifier.getInputIdentifier().getSrcTaskIndex(),
+          mergeOutputSize).suffix(Constants.MERGED_OUTPUT_PREFIX);
 
       Writer writer = null;
       try {
@@ -696,7 +696,7 @@ public class MergeManager {
     long inMemToDiskBytes = 0;
     boolean mergePhaseFinished = false;
     if (inMemoryMapOutputs.size() > 0) {
-      int srcTaskId = inMemoryMapOutputs.get(0).getAttemptIdentifier().getTaskIndex();
+      int srcTaskId = inMemoryMapOutputs.get(0).getAttemptIdentifier().getInputIdentifier().getSrcTaskIndex();
       inMemToDiskBytes = createInMemorySegments(inMemoryMapOutputs, 
                                                 memDiskSegments,
                                                 maxInMemReduce);

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/bd76ffcf/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/Shuffle.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/Shuffle.java b/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/Shuffle.java
index 7c41d3d..620c620 100644
--- a/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/Shuffle.java
+++ b/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/Shuffle.java
@@ -181,6 +181,7 @@ public class Shuffle implements ExceptionReporter {
   private class RunShuffleCallable implements Callable<TezRawKeyValueIterator> {
     @Override
     public TezRawKeyValueIterator call() throws IOException, InterruptedException {
+      // TODO NEWTEZ Limit # fetchers to number of inputs
       final int numFetchers = 
           conf.getInt(
               TezJobConfig.TEZ_ENGINE_SHUFFLE_PARALLEL_COPIES, 

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/bd76ffcf/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/ShuffleHeader.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/ShuffleHeader.java b/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/ShuffleHeader.java
index ec5db5b..a918ef1 100644
--- a/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/ShuffleHeader.java
+++ b/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/ShuffleHeader.java
@@ -62,6 +62,14 @@ public class ShuffleHeader implements Writable {
     this.forReduce = forReduce;
   }
   
+  public String getMapId() {
+    return this.mapId;
+  }
+  
+  public int getPartition() {
+    return this.forReduce;
+  }
+  
   public long getUncompressedLength() {
     return uncompressedLength;
   }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/bd76ffcf/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/ShuffleInputEventHandler.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/ShuffleInputEventHandler.java b/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/ShuffleInputEventHandler.java
index b31d36c..98c7ab1 100644
--- a/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/ShuffleInputEventHandler.java
+++ b/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/ShuffleInputEventHandler.java
@@ -24,6 +24,7 @@ import java.util.List;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.tez.dag.api.TezUncheckedException;
+import org.apache.tez.engine.common.InputAttemptIdentifier;
 import org.apache.tez.engine.common.shuffle.newimpl.ShuffleUserPayloads.DataMovementEventPayloadProto;
 import org.apache.tez.engine.common.shuffle.newimpl.ShuffleUserPayloads.InputInformationEventPayloadProto;
 import org.apache.tez.engine.newapi.Event;
@@ -95,7 +96,7 @@ public class ShuffleInputEventHandler {
     int partitionId = dmEvent.getSourceIndex();
     URI baseUri = getBaseURI(shufflePayload.getHost(), shufflePayload.getPort(), partitionId);
 
-    TaskAttemptIdentifier srcAttemptIdentifier = new TaskAttemptIdentifier(dmEvent.getTargetIndex(), dmEvent.getVersion(), shufflePayload.getPathComponent());
+    InputAttemptIdentifier srcAttemptIdentifier = new InputAttemptIdentifier(dmEvent.getTargetIndex(), dmEvent.getVersion(), shufflePayload.getPathComponent());
     scheduler.addKnownMapOutput(shufflePayload.getHost(), partitionId, baseUri.toString(), srcAttemptIdentifier);
     
     // TODO NEWTEZ See if this duration hack can be removed.
@@ -107,7 +108,7 @@ public class ShuffleInputEventHandler {
   }
   
   private void processTaskFailedEvent(InputFailedEvent ifEvent) {
-    TaskAttemptIdentifier taIdentifier = new TaskAttemptIdentifier(ifEvent.getSourceIndex(), ifEvent.getVersion());
+    InputAttemptIdentifier taIdentifier = new InputAttemptIdentifier(ifEvent.getSourceIndex(), ifEvent.getVersion());
     scheduler.obsoleteMapOutput(taIdentifier);
     LOG.info("Obsoleting output of src-task: " + taIdentifier);
   }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/bd76ffcf/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/ShuffleScheduler.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/ShuffleScheduler.java b/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/ShuffleScheduler.java
index 964533d..acbd054 100644
--- a/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/ShuffleScheduler.java
+++ b/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/ShuffleScheduler.java
@@ -40,6 +40,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.IntWritable;
 import org.apache.tez.common.TezJobConfig;
 import org.apache.tez.common.counters.TezCounter;
+import org.apache.tez.engine.common.InputAttemptIdentifier;
 import org.apache.tez.engine.common.TezEngineUtils;
 import org.apache.tez.engine.newapi.Event;
 import org.apache.tez.engine.newapi.TezInputContext;
@@ -63,17 +64,17 @@ class ShuffleScheduler {
   private final Map<Integer, MutableInt> finishedMaps;
   private final int numInputs;
   private int remainingMaps;
-  private Map<TaskAttemptIdentifier, MapHost> mapLocations = new HashMap<TaskAttemptIdentifier, MapHost>();
+  private Map<InputAttemptIdentifier, MapHost> mapLocations = new HashMap<InputAttemptIdentifier, MapHost>();
   //TODO NEWTEZ Clean this and other maps at some point
-  private ConcurrentMap<String, TaskAttemptIdentifier> pathToIdentifierMap = new ConcurrentHashMap<String, TaskAttemptIdentifier>(); 
+  private ConcurrentMap<String, InputAttemptIdentifier> pathToIdentifierMap = new ConcurrentHashMap<String, InputAttemptIdentifier>(); 
   private Set<MapHost> pendingHosts = new HashSet<MapHost>();
-  private Set<TaskAttemptIdentifier> obsoleteMaps = new HashSet<TaskAttemptIdentifier>();
+  private Set<InputAttemptIdentifier> obsoleteMaps = new HashSet<InputAttemptIdentifier>();
   
   private final Random random = new Random(System.currentTimeMillis());
   private final DelayQueue<Penalty> penalties = new DelayQueue<Penalty>();
   private final Referee referee = new Referee();
-  private final Map<TaskAttemptIdentifier, IntWritable> failureCounts =
-    new HashMap<TaskAttemptIdentifier,IntWritable>(); 
+  private final Map<InputAttemptIdentifier, IntWritable> failureCounts =
+    new HashMap<InputAttemptIdentifier,IntWritable>(); 
   private final Map<String,IntWritable> hostFailures = 
     new HashMap<String,IntWritable>();
   private final TezInputContext inputContext;
@@ -127,19 +128,19 @@ class ShuffleScheduler {
             TezJobConfig.DEFAULT_TEZ_ENGINE_SHUFFLE_NOTIFY_READERROR);
   }
 
-  public synchronized void copySucceeded(TaskAttemptIdentifier srcAttemptIdentifier, 
+  public synchronized void copySucceeded(InputAttemptIdentifier srcAttemptIdentifier, 
                                          MapHost host,
                                          long bytes,
                                          long milis,
                                          MapOutput output
                                          ) throws IOException {
-    String taskIdentifier = TezEngineUtils.getTaskAttemptIdentifier(srcAttemptIdentifier.getTaskIndex(), srcAttemptIdentifier.getAttemptNumber());
+    String taskIdentifier = TezEngineUtils.getTaskAttemptIdentifier(srcAttemptIdentifier.getInputIdentifier().getSrcTaskIndex(), srcAttemptIdentifier.getAttemptNumber());
     failureCounts.remove(taskIdentifier);
     hostFailures.remove(host.getHostName());
     
-    if (!isFinishedTaskTrue(srcAttemptIdentifier.getTaskIndex())) {
+    if (!isFinishedTaskTrue(srcAttemptIdentifier.getInputIdentifier().getSrcTaskIndex())) {
       output.commit();
-      if(incrementTaskCopyAndCheckCompletion(srcAttemptIdentifier.getTaskIndex())) {
+      if(incrementTaskCopyAndCheckCompletion(srcAttemptIdentifier.getInputIdentifier().getSrcTaskIndex())) {
         shuffledMapsCounter.increment(1);
         if (--remainingMaps == 0) {
           notifyAll();
@@ -154,10 +155,11 @@ class ShuffleScheduler {
       if (LOG.isDebugEnabled()) {
         LOG.debug("src task: "
             + TezEngineUtils.getTaskAttemptIdentifier(
-                inputContext.getSourceVertexName(), srcAttemptIdentifier.getTaskIndex(),
+                inputContext.getSourceVertexName(), srcAttemptIdentifier.getInputIdentifier().getSrcTaskIndex(),
                 srcAttemptIdentifier.getAttemptNumber()) + " done");
       }
     }
+    // TODO NEWTEZ Should this release the output if it is not committed? Possible memory leak in case of speculation.
   }
 
   private void logProgress() {
@@ -170,7 +172,7 @@ class ShuffleScheduler {
         + mbpsFormat.format(transferRate) + " MB/s)");
   }
 
-  public synchronized void copyFailed(TaskAttemptIdentifier srcAttempt,
+  public synchronized void copyFailed(InputAttemptIdentifier srcAttempt,
                                       MapHost host,
                                       boolean readError) {
     host.penalize();
@@ -194,7 +196,7 @@ class ShuffleScheduler {
         throw new IOException(failures
             + " failures downloading "
             + TezEngineUtils.getTaskAttemptIdentifier(
-                inputContext.getSourceVertexName(), srcAttempt.getTaskIndex(),
+                inputContext.getSourceVertexName(), srcAttempt.getInputIdentifier().getSrcTaskIndex(),
                 srcAttempt.getAttemptNumber()));
       } catch (IOException ie) {
         shuffle.reportException(ie);
@@ -217,20 +219,20 @@ class ShuffleScheduler {
   // after every read error, if 'reportReadErrorImmediately' is true or
   // after every 'maxFetchFailuresBeforeReporting' failures
   private void checkAndInformJobTracker(
-      int failures, TaskAttemptIdentifier srcAttempt, boolean readError) {
+      int failures, InputAttemptIdentifier srcAttempt, boolean readError) {
     if ((reportReadErrorImmediately && readError)
         || ((failures % maxFetchFailuresBeforeReporting) == 0)) {
       LOG.info("Reporting fetch failure for "
           + TezEngineUtils.getTaskAttemptIdentifier(
-              inputContext.getSourceVertexName(), srcAttempt.getTaskIndex(),
+              inputContext.getSourceVertexName(), srcAttempt.getInputIdentifier().getSrcTaskIndex(),
               srcAttempt.getAttemptNumber()) + " to jobtracker.");
 
       List<Event> failedEvents = Lists.newArrayListWithCapacity(1);
       failedEvents.add(new InputReadErrorEvent("Fetch failure for "
           + TezEngineUtils.getTaskAttemptIdentifier(
-              inputContext.getSourceVertexName(), srcAttempt.getTaskIndex(),
-              srcAttempt.getAttemptNumber()) + " to jobtracker.", srcAttempt
-          .getTaskIndex(), srcAttempt.getAttemptNumber()));
+              inputContext.getSourceVertexName(), srcAttempt.getInputIdentifier().getSrcTaskIndex(),
+              srcAttempt.getAttemptNumber()) + " to jobtracker.", srcAttempt.getInputIdentifier()
+          .getSrcTaskIndex(), srcAttempt.getAttemptNumber()));
 
       inputContext.sendEvents(failedEvents);      
       //status.addFailedDependency(mapId);
@@ -299,7 +301,7 @@ class ShuffleScheduler {
   public synchronized void addKnownMapOutput(String hostName,
                                              int partitionId,
                                              String hostUrl,
-                                             TaskAttemptIdentifier srcAttempt) {
+                                             InputAttemptIdentifier srcAttempt) {
     String identifier = MapHost.createIdentifier(hostName, partitionId);
     MapHost host = mapLocations.get(identifier);
     if (host == null) {
@@ -317,13 +319,13 @@ class ShuffleScheduler {
     }
   }
   
-  public synchronized void obsoleteMapOutput(TaskAttemptIdentifier srcAttempt) {
+  public synchronized void obsoleteMapOutput(InputAttemptIdentifier srcAttempt) {
     // The incoming srcAttempt does not contain a path component.
     obsoleteMaps.add(srcAttempt);
   }
   
   public synchronized void putBackKnownMapOutput(MapHost host,
-                                                 TaskAttemptIdentifier srcAttempt) {
+                                                 InputAttemptIdentifier srcAttempt) {
     host.addKnownMap(srcAttempt);
   }
 
@@ -349,20 +351,20 @@ class ShuffleScheduler {
       return host;
   }
   
-  public TaskAttemptIdentifier getIdentifierForPathComponent(String pathComponent) {
+  public InputAttemptIdentifier getIdentifierForPathComponent(String pathComponent) {
     return pathToIdentifierMap.get(pathComponent);
   }
   
-  public synchronized List<TaskAttemptIdentifier> getMapsForHost(MapHost host) {
-    List<TaskAttemptIdentifier> list = host.getAndClearKnownMaps();
-    Iterator<TaskAttemptIdentifier> itr = list.iterator();
-    List<TaskAttemptIdentifier> result = new ArrayList<TaskAttemptIdentifier>();
+  public synchronized List<InputAttemptIdentifier> getMapsForHost(MapHost host) {
+    List<InputAttemptIdentifier> list = host.getAndClearKnownMaps();
+    Iterator<InputAttemptIdentifier> itr = list.iterator();
+    List<InputAttemptIdentifier> result = new ArrayList<InputAttemptIdentifier>();
     int includedMaps = 0;
     int totalSize = list.size();
     // find the maps that we still need, up to the limit
     while (itr.hasNext()) {
-      TaskAttemptIdentifier id = itr.next();
-      if (!obsoleteMaps.contains(id) && !isFinishedTaskTrue(id.getTaskIndex())) {
+      InputAttemptIdentifier id = itr.next();
+      if (!obsoleteMaps.contains(id) && !isFinishedTaskTrue(id.getInputIdentifier().getSrcTaskIndex())) {
         result.add(id);
         if (++includedMaps >= MAX_MAPS_AT_ONCE) {
           break;
@@ -371,8 +373,8 @@ class ShuffleScheduler {
     }
     // put back the maps left after the limit
     while (itr.hasNext()) {
-      TaskAttemptIdentifier id = itr.next();
-      if (!obsoleteMaps.contains(id) && !isFinishedTaskTrue(id.getTaskIndex())) {
+      InputAttemptIdentifier id = itr.next();
+      if (!obsoleteMaps.contains(id) && !isFinishedTaskTrue(id.getInputIdentifier().getSrcTaskIndex())) {
         host.addKnownMap(id);
       }
     }