You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by hi...@apache.org on 2013/09/25 00:44:11 UTC

[02/20] Rename tez-engine-api to tez-runtime-api and tez-engine is split into 2: - tez-engine-library for user-visible Input/Output/Processor implementations - tez-engine-internals for framework internals

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b212ca1d/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/SortBufferInputStream.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/SortBufferInputStream.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/SortBufferInputStream.java
new file mode 100644
index 0000000..b7b1e82
--- /dev/null
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/SortBufferInputStream.java
@@ -0,0 +1,271 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.tez.runtime.library.common.sort.impl.dflt;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.IntBuffer;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.io.BoundedByteArrayOutputStream;
+import org.apache.hadoop.io.DataInputBuffer;
+import org.apache.hadoop.io.WritableUtils;
+import org.apache.tez.runtime.library.common.shuffle.impl.InMemoryWriter;
+import org.apache.tez.runtime.library.common.sort.impl.dflt.DefaultSorter.InMemValBytes;
+
+  public class SortBufferInputStream extends InputStream {
+
+  private static final Log LOG = LogFactory.getLog(SortBufferInputStream.class);
+  
+  private final InMemoryShuffleSorter sorter;
+  private InMemoryWriter sortOutput;
+  
+  private int mend;
+  private int recIndex;
+  private final byte[] kvbuffer;       
+  private final IntBuffer kvmeta;
+  private final int partitionBytes;
+  private final int partition;
+  
+  byte[] dualBuf = new byte[8192];
+  DualBufferOutputStream out;
+  private int readBytes = 0;
+  
+  public SortBufferInputStream(
+      InMemoryShuffleSorter sorter, int partition) {
+    this.sorter = sorter;
+    this.partitionBytes = 
+        (int)sorter.getShuffleHeader(partition).getCompressedLength();
+    this.partition = partition;
+    this.mend = sorter.getMetaEnd();
+    this.recIndex = sorter.getSpillIndex(partition);
+    this.kvbuffer = sorter.kvbuffer;
+    this.kvmeta = sorter.kvmeta;
+    out = new DualBufferOutputStream(null, 0, 0, dualBuf);
+    sortOutput = new InMemoryWriter(out);
+  }
+  
+  byte[] one = new byte[1];
+  
+  @Override
+  public int read() throws IOException {
+    int b = read(one, 0, 1);
+    return (b == -1) ? b : one[0]; 
+  }
+
+  @Override
+  public int read(byte[] b) throws IOException {
+    return read(b, 0, b.length);
+  }
+
+  @Override
+  public int read(byte[] b, int off, int len) throws IOException {
+    if (available() == 0) {
+      return -1;
+    }
+    
+    int currentOffset = off;
+    int currentLength = len;
+    int currentReadBytes = 0;
+    
+    // Check if there is residual data in the dualBuf
+    int residualLen = out.getCurrent();
+    if (residualLen > 0) {
+      int readable = Math.min(currentLength, residualLen);
+      System.arraycopy(dualBuf, 0, b, currentOffset, readable);
+      currentOffset += readable;
+      currentReadBytes += readable;
+      out.setCurrentPointer(-readable);
+      
+      // buffer has less capacity
+      currentLength -= readable;
+      
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("XXX read_residual:" +
+            " readable=" + readable +
+            " readBytes=" + readBytes);
+      }
+    }
+    
+    // Now, use the provided buffer
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("XXX read: out.reset" +
+          " b=" + b + 
+          " currentOffset=" + currentOffset + 
+          " currentLength=" + currentLength +
+          " recIndex=" + recIndex);
+    }
+    out.reset(b, currentOffset, currentLength);
+    
+    // Read from sort-buffer into the provided buffer, space permitting
+    DataInputBuffer key = new DataInputBuffer();
+    final InMemValBytes value = sorter.createInMemValBytes();
+    
+    int kvPartition = 0;
+    int numRec = 0;
+    for (;
+         currentLength > 0 && recIndex < mend && 
+             (kvPartition = getKVPartition(recIndex)) == partition;
+        ++recIndex) {
+      
+      final int kvoff = sorter.offsetFor(recIndex);
+      
+      int keyLen = 
+          (kvmeta.get(kvoff + InMemoryShuffleSorter.VALSTART) - 
+              kvmeta.get(kvoff + InMemoryShuffleSorter.KEYSTART));
+      key.reset(
+          kvbuffer, 
+          kvmeta.get(kvoff + InMemoryShuffleSorter.KEYSTART),
+          keyLen
+          );
+      
+      int valLen = sorter.getVBytesForOffset(kvoff, value);
+
+      int recLen = 
+          (keyLen + WritableUtils.getVIntSize(keyLen)) + 
+          (valLen + WritableUtils.getVIntSize(valLen));
+      
+      currentReadBytes += recLen;
+      currentOffset += recLen;
+      currentLength -= recLen;
+
+      // Write out key/value into the in-mem ifile
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("XXX read: sortOutput.append" +
+            " #rec=" + ++numRec +
+            " recIndex=" + recIndex + " kvoff=" + kvoff + 
+            " keyLen=" + keyLen + " valLen=" + valLen + " recLen=" + recLen +
+            " readBytes=" + readBytes +
+            " currentReadBytes="  + currentReadBytes +
+            " currentLength=" + currentLength);
+      }
+      sortOutput.append(key, value);
+    }
+
+    // If we are at the end of the segment, close the ifile
+    if (currentLength > 0 && 
+        (recIndex == mend || kvPartition != partition)) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("XXX About to call close:" +
+            " currentLength=" + currentLength + 
+            " recIndex=" + recIndex + " mend=" + mend + 
+            " kvPartition=" + kvPartition + " partitino=" + partition);
+      }
+      sortOutput.close();
+      currentReadBytes += 
+          (InMemoryShuffleSorter.IFILE_EOF_LENGTH + 
+              InMemoryShuffleSorter.IFILE_CHECKSUM_LENGTH);
+    } else {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("XXX Hmm..." +
+            " currentLength=" + currentLength + 
+            " recIndex=" + recIndex + " mend=" + mend + 
+            " kvPartition=" + kvPartition + " partitino=" + partition);
+      }
+    }
+    
+    int retVal = Math.min(currentReadBytes, len);
+    readBytes += retVal;
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("XXX read: done" +
+          " retVal=" + retVal + 
+          " currentReadBytes=" + currentReadBytes +
+          " len=" + len + 
+          " readBytes=" + readBytes +
+          " partitionBytes=" + partitionBytes +
+          " residualBytes=" + out.getCurrent());
+    }
+    return retVal;
+  }
+
+  private int getKVPartition(int recIndex) {
+    return kvmeta.get(
+        sorter.offsetFor(recIndex) + InMemoryShuffleSorter.PARTITION);
+  }
+  
+  @Override
+  public int available() throws IOException {
+    return (partitionBytes - readBytes);
+  }
+
+  @Override
+  public void close() throws IOException {
+    super.close();
+  }
+
+  @Override
+  public boolean markSupported() {
+    return false;
+  }
+  
+  static class DualBufferOutputStream extends BoundedByteArrayOutputStream {
+
+    byte[] dualBuf;
+    int currentPointer = 0;
+    byte[] one = new byte[1];
+    
+    public DualBufferOutputStream(
+        byte[] buf, int offset, int length, 
+        byte[] altBuf) {
+      super(buf, offset, length);
+      this.dualBuf = altBuf;
+    }
+    
+    public void reset(byte[] b, int off, int len) {
+      super.resetBuffer(b, off, len);
+    }
+
+    @Override
+    public void write(int b) throws IOException {
+      one[0] = (byte)b;
+      write(one, 0, 1);
+    }
+
+    @Override
+    public void write(byte[] b) throws IOException {
+      write(b, 0, b.length);
+    }
+
+    @Override
+    public void write(byte[] b, int off, int len) throws IOException {
+      int available = super.available();
+      if (available >= len) {
+        super.write(b, off, len);
+      } else {
+        super.write(b, off, available);
+        System.arraycopy(b, off+available, dualBuf, currentPointer, len-available);
+        currentPointer += (len - available);
+      }
+    }
+    
+    int getCurrent() {
+      return currentPointer;
+    }
+    
+    void setCurrentPointer(int delta) {
+      if ((currentPointer + delta) > dualBuf.length) {
+        throw new IndexOutOfBoundsException("Trying to set dualBuf 'current'" +
+        		" marker to " + (currentPointer+delta) + " when " +
+        		" dualBuf.length is " + dualBuf.length);
+      }
+      currentPointer = (currentPointer + delta) % dualBuf.length;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b212ca1d/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/task/impl/ValuesIterator.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/task/impl/ValuesIterator.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/task/impl/ValuesIterator.java
new file mode 100644
index 0000000..88cb750
--- /dev/null
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/task/impl/ValuesIterator.java
@@ -0,0 +1,149 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.tez.runtime.library.common.task.impl;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.DataInputBuffer;
+import org.apache.hadoop.io.RawComparator;
+import org.apache.hadoop.io.serializer.Deserializer;
+import org.apache.hadoop.io.serializer.SerializationFactory;
+import org.apache.hadoop.util.Progressable;
+import org.apache.tez.runtime.library.common.sort.impl.TezRawKeyValueIterator;
+
+
+/**
+ * Iterates values while keys match in sorted input.
+ *
+ * Usage: Call moveToNext to move to the next k, v pair. This returns true if another exists,
+ * followed by getKey() and getValues() to get the current key and list of values.
+ * 
+ */
+public class ValuesIterator<KEY,VALUE> implements Iterator<VALUE> {
+  protected TezRawKeyValueIterator in; //input iterator
+  private KEY key;               // current key
+  private KEY nextKey;
+  private VALUE value;             // current value
+  private boolean hasNext;                      // more w/ this key
+  private boolean more;                         // more in file
+  private RawComparator<KEY> comparator;
+  protected Progressable reporter;
+  private Deserializer<KEY> keyDeserializer;
+  private Deserializer<VALUE> valDeserializer;
+  private DataInputBuffer keyIn = new DataInputBuffer();
+  private DataInputBuffer valueIn = new DataInputBuffer();
+  
+  public ValuesIterator (TezRawKeyValueIterator in, 
+                         RawComparator<KEY> comparator, 
+                         Class<KEY> keyClass,
+                         Class<VALUE> valClass, Configuration conf, 
+                         Progressable reporter)
+    throws IOException {
+    this.in = in;
+    this.comparator = comparator;
+    this.reporter = reporter;
+    SerializationFactory serializationFactory = new SerializationFactory(conf);
+    this.keyDeserializer = serializationFactory.getDeserializer(keyClass);
+    this.keyDeserializer.open(keyIn);
+    this.valDeserializer = serializationFactory.getDeserializer(valClass);
+    this.valDeserializer.open(this.valueIn);
+    readNextKey();
+    key = nextKey;
+    nextKey = null; // force new instance creation
+    hasNext = more;
+  }
+
+  TezRawKeyValueIterator getRawIterator() { return in; }
+  
+  /// Iterator methods
+
+  public boolean hasNext() { return hasNext; }
+
+  private int ctr = 0;
+  public VALUE next() {
+    if (!hasNext) {
+      throw new NoSuchElementException("iterate past last value");
+    }
+    try {
+      readNextValue();
+      readNextKey();
+    } catch (IOException ie) {
+      throw new RuntimeException("problem advancing post rec#"+ctr, ie);
+    }
+    reporter.progress();
+    return value;
+  }
+
+  public void remove() { throw new RuntimeException("not implemented"); }
+
+  /// Auxiliary methods
+
+  /** Start processing next unique key. */
+  public void nextKey() throws IOException {
+    // read until we find a new key
+    while (hasNext) { 
+      readNextKey();
+    }
+    ++ctr;
+    
+    // move the next key to the current one
+    KEY tmpKey = key;
+    key = nextKey;
+    nextKey = tmpKey;
+    hasNext = more;
+  }
+
+  /** True iff more keys remain. */
+  public boolean more() { 
+    return more; 
+  }
+
+  /** The current key. */
+  public KEY getKey() { 
+    return key; 
+  }
+
+  /** 
+   * read the next key 
+   */
+  private void readNextKey() throws IOException {
+    more = in.next();
+    if (more) {
+      DataInputBuffer nextKeyBytes = in.getKey();
+      keyIn.reset(nextKeyBytes.getData(), nextKeyBytes.getPosition(), nextKeyBytes.getLength());
+      nextKey = keyDeserializer.deserialize(nextKey);
+      hasNext = key != null && (comparator.compare(key, nextKey) == 0);
+    } else {
+      hasNext = false;
+    }
+  }
+
+  /**
+   * Read the next value
+   * @throws IOException
+   */
+  private void readNextValue() throws IOException {
+    DataInputBuffer nextValueBytes = in.getValue();
+    valueIn.reset(nextValueBytes.getData(), nextValueBytes.getPosition(), nextValueBytes.getLength());
+    value = valDeserializer.deserialize(value);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b212ca1d/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/task/local/output/TezLocalTaskOutputFiles.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/task/local/output/TezLocalTaskOutputFiles.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/task/local/output/TezLocalTaskOutputFiles.java
new file mode 100644
index 0000000..30d28f0
--- /dev/null
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/task/local/output/TezLocalTaskOutputFiles.java
@@ -0,0 +1,249 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.runtime.library.common.task.local.output;
+
+import java.io.IOException;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocalDirAllocator;
+import org.apache.hadoop.fs.Path;
+import org.apache.tez.common.TezJobConfig;
+import org.apache.tez.runtime.library.common.Constants;
+import org.apache.tez.runtime.library.common.InputAttemptIdentifier;
+
+/**
+ * Manipulate the working area for the transient store for maps and reduces.
+ *
+ * This class is used by map and reduce tasks to identify the directories that
+ * they need to write to/read from for intermediate files. The callers of
+ * these methods are from the Child running the Task.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public class TezLocalTaskOutputFiles extends TezTaskOutput {
+
+  public TezLocalTaskOutputFiles(Configuration conf, String uniqueId) {
+    super(conf, uniqueId);
+  }
+
+  private LocalDirAllocator lDirAlloc =
+    new LocalDirAllocator(TezJobConfig.LOCAL_DIRS);
+
+
+  /**
+   * Return the path to local map output file created earlier
+   *
+   * @return path
+   * @throws IOException
+   */
+  @Override
+  public Path getOutputFile()
+      throws IOException {
+    return lDirAlloc.getLocalPathToRead(Constants.TEZ_RUNTIME_TASK_OUTPUT_DIR + Path.SEPARATOR
+        + Constants.TEZ_RUNTIME_TASK_OUTPUT_FILENAME_STRING, conf);
+  }
+
+  /**
+   * Create a local map output file name.
+   *
+   * @param size the size of the file
+   * @return path
+   * @throws IOException
+   */
+  @Override
+  public Path getOutputFileForWrite(long size)
+      throws IOException {
+    return lDirAlloc.getLocalPathForWrite(Constants.TEZ_RUNTIME_TASK_OUTPUT_DIR + Path.SEPARATOR
+        + Constants.TEZ_RUNTIME_TASK_OUTPUT_FILENAME_STRING, size, conf);
+  }
+  
+  /**
+   * Create a local map output file name. This should *only* be used if the size
+   * of the file is not known. Otherwise use the equivalent which accepts a size
+   * parameter.
+   * 
+   * @return path
+   * @throws IOException
+   */
+  @Override
+  public Path getOutputFileForWrite() throws IOException {
+    return lDirAlloc.getLocalPathForWrite(Constants.TEZ_RUNTIME_TASK_OUTPUT_DIR
+        + Path.SEPARATOR + Constants.TEZ_RUNTIME_TASK_OUTPUT_FILENAME_STRING,
+        conf);
+  }
+
+  /**
+   * Create a local map output file name on the same volume.
+   */
+  @Override
+  public Path getOutputFileForWriteInVolume(Path existing) {
+    return new Path(existing.getParent(), Constants.TEZ_RUNTIME_TASK_OUTPUT_FILENAME_STRING);
+  }
+
+  /**
+   * Return the path to a local map output index file created earlier
+   *
+   * @return path
+   * @throws IOException
+   */
+  @Override
+  public Path getOutputIndexFile()
+      throws IOException {
+    return lDirAlloc.getLocalPathToRead(Constants.TEZ_RUNTIME_TASK_OUTPUT_DIR + Path.SEPARATOR
+        + Constants.TEZ_RUNTIME_TASK_OUTPUT_FILENAME_STRING + Constants.TEZ_RUNTIME_TASK_OUTPUT_INDEX_SUFFIX_STRING,
+        conf);
+  }
+
+  /**
+   * Create a local map output index file name.
+   *
+   * @param size the size of the file
+   * @return path
+   * @throws IOException
+   */
+  @Override
+  public Path getOutputIndexFileForWrite(long size)
+      throws IOException {
+    return lDirAlloc.getLocalPathForWrite(Constants.TEZ_RUNTIME_TASK_OUTPUT_DIR + Path.SEPARATOR
+        + Constants.TEZ_RUNTIME_TASK_OUTPUT_FILENAME_STRING + Constants.TEZ_RUNTIME_TASK_OUTPUT_INDEX_SUFFIX_STRING,
+        size, conf);
+  }
+
+  /**
+   * Create a local map output index file name on the same volume.
+   */
+  @Override
+  public Path getOutputIndexFileForWriteInVolume(Path existing) {
+    return new Path(existing.getParent(),
+        Constants.TEZ_RUNTIME_TASK_OUTPUT_FILENAME_STRING + Constants.TEZ_RUNTIME_TASK_OUTPUT_INDEX_SUFFIX_STRING);
+  }
+
+  /**
+   * Return a local map spill file created earlier.
+   *
+   * @param spillNumber the number
+   * @return path
+   * @throws IOException
+   */
+  @Override
+  public Path getSpillFile(int spillNumber)
+      throws IOException {
+    return lDirAlloc.getLocalPathToRead(Constants.TEZ_RUNTIME_TASK_OUTPUT_DIR + "/spill"
+        + spillNumber + ".out", conf);
+  }
+
+  /**
+   * Create a local map spill file name.
+   *
+   * @param spillNumber the number
+   * @param size the size of the file
+   * @return path
+   * @throws IOException
+   */
+  @Override
+  public Path getSpillFileForWrite(int spillNumber, long size)
+      throws IOException {
+    return lDirAlloc.getLocalPathForWrite(Constants.TEZ_RUNTIME_TASK_OUTPUT_DIR + "/spill"
+        + spillNumber + ".out", size, conf);
+  }
+
+  /**
+   * Return a local map spill index file created earlier
+   *
+   * @param spillNumber the number
+   * @return path
+   * @throws IOException
+   */
+  @Override
+  public Path getSpillIndexFile(int spillNumber)
+      throws IOException {
+    return lDirAlloc.getLocalPathToRead(Constants.TEZ_RUNTIME_TASK_OUTPUT_DIR + "/spill"
+        + spillNumber + ".out.index", conf);
+  }
+
+  /**
+   * Create a local map spill index file name.
+   *
+   * @param spillNumber the number
+   * @param size the size of the file
+   * @return path
+   * @throws IOException
+   */
+  @Override
+  public Path getSpillIndexFileForWrite(int spillNumber, long size)
+      throws IOException {
+    return lDirAlloc.getLocalPathForWrite(Constants.TEZ_RUNTIME_TASK_OUTPUT_DIR + "/spill"
+        + spillNumber + ".out.index", size, conf);
+  }
+
+  /**
+   * Return a local reduce input file created earlier
+   *
+   * @param mapId a map task id
+   * @return path
+   * @throws IOException
+   */
+  @Override
+  public Path getInputFile(InputAttemptIdentifier mapId)
+      throws IOException {
+    return lDirAlloc.getLocalPathToRead(String.format(
+        Constants.TEZ_RUNTIME_TASK_INPUT_FILE_FORMAT_STRING, 
+        Constants.TEZ_RUNTIME_TASK_OUTPUT_DIR, Integer.valueOf(mapId.getInputIdentifier().getSrcTaskIndex())), conf);
+  }
+
+  /**
+   * Create a local reduce input file name.
+   *
+   * @param mapId a map task id
+   * @param size the size of the file
+   * @return path
+   * @throws IOException
+   */
+  @Override
+  public Path getInputFileForWrite(int taskId,
+                                   long size)
+      throws IOException {
+    return lDirAlloc.getLocalPathForWrite(String.format(
+        Constants.TEZ_RUNTIME_TASK_INPUT_FILE_FORMAT_STRING, Constants.TEZ_RUNTIME_TASK_OUTPUT_DIR, taskId),
+        size, conf);
+  }
+
+  /** Removes all of the files related to a task. */
+  @Override
+  public void removeAll()
+      throws IOException {
+    deleteLocalFiles(Constants.TEZ_RUNTIME_TASK_OUTPUT_DIR);
+  }
+
+  private String[] getLocalDirs() throws IOException {
+    return conf.getStrings(TezJobConfig.LOCAL_DIRS);
+  }
+
+  @SuppressWarnings("deprecation")
+  private void deleteLocalFiles(String subdir) throws IOException {
+    String[] localDirs = getLocalDirs();
+    for (int i = 0; i < localDirs.length; i++) {
+      FileSystem.getLocal(conf).delete(new Path(localDirs[i], subdir));
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b212ca1d/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/task/local/output/TezTaskOutput.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/task/local/output/TezTaskOutput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/task/local/output/TezTaskOutput.java
new file mode 100644
index 0000000..d3e7d27
--- /dev/null
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/task/local/output/TezTaskOutput.java
@@ -0,0 +1,165 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.runtime.library.common.task.local.output;
+
+import java.io.IOException;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.tez.runtime.library.common.InputAttemptIdentifier;
+
+/**
+ * Manipulate the working area for the transient store for maps and reduces.
+ *
+ * This class is used by map and reduce tasks to identify the directories that
+ * they need to write to/read from for intermediate files. The callers of
+ * these methods are from child space and see mapreduce.cluster.local.dir as
+ * taskTracker/jobCache/jobId/attemptId
+ * This class should not be used from TaskTracker space.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public abstract class TezTaskOutput {
+
+  protected Configuration conf;
+  protected String uniqueId;
+
+  public TezTaskOutput(Configuration conf, String uniqueId) {
+    this.conf = conf;
+    this.uniqueId = uniqueId;
+  }
+
+  /**
+   * Return the path to local map output file created earlier
+   *
+   * @return path
+   * @throws IOException
+   */
+  public abstract Path getOutputFile() throws IOException;
+
+  /**
+   * Create a local map output file name.
+   *
+   * @param size the size of the file
+   * @return path
+   * @throws IOException
+   */
+  public abstract Path getOutputFileForWrite(long size) throws IOException;
+
+  /**
+   * Create a local output file name. This method is meant to be used *only* if
+   * the size of the file is not know up front.
+   * 
+   * @return path
+   * @throws IOException
+   */
+  public abstract Path getOutputFileForWrite() throws IOException;
+  
+  /**
+   * Create a local map output file name on the same volume.
+   */
+  public abstract Path getOutputFileForWriteInVolume(Path existing);
+
+  /**
+   * Return the path to a local map output index file created earlier
+   *
+   * @return path
+   * @throws IOException
+   */
+  public abstract Path getOutputIndexFile() throws IOException;
+
+  /**
+   * Create a local map output index file name.
+   *
+   * @param size the size of the file
+   * @return path
+   * @throws IOException
+   */
+  public abstract Path getOutputIndexFileForWrite(long size) throws IOException;
+
+  /**
+   * Create a local map output index file name on the same volume.
+   */
+  public abstract Path getOutputIndexFileForWriteInVolume(Path existing);
+
+  /**
+   * Return a local map spill file created earlier.
+   *
+   * @param spillNumber the number
+   * @return path
+   * @throws IOException
+   */
+  public abstract Path getSpillFile(int spillNumber) throws IOException;
+
+  /**
+   * Create a local map spill file name.
+   *
+   * @param spillNumber the number
+   * @param size the size of the file
+   * @return path
+   * @throws IOException
+   */
+  public abstract Path getSpillFileForWrite(int spillNumber, long size)
+      throws IOException;
+
+  /**
+   * Return a local map spill index file created earlier
+   *
+   * @param spillNumber the number
+   * @return path
+   * @throws IOException
+   */
+  public abstract Path getSpillIndexFile(int spillNumber) throws IOException;
+
+  /**
+   * Create a local map spill index file name.
+   *
+   * @param spillNumber the number
+   * @param size the size of the file
+   * @return path
+   * @throws IOException
+   */
+  public abstract Path getSpillIndexFileForWrite(int spillNumber, long size)
+      throws IOException;
+
+  /**
+   * Return a local reduce input file created earlier
+   *
+   * @param attemptIdentifier The identifier for the source task
+   * @return path
+   * @throws IOException
+   */
+  public abstract Path getInputFile(InputAttemptIdentifier attemptIdentifier) throws IOException;
+
+  /**
+   * Create a local reduce input file name.
+   *
+   * @param taskIdentifier The identifier for the source task
+   * @param size the size of the file
+   * @return path
+   * @throws IOException
+   */
+  public abstract Path getInputFileForWrite(
+      int taskIdentifier, long size) throws IOException;
+
+  /** Removes all of the files related to a task. */
+  public abstract void removeAll() throws IOException;
+}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b212ca1d/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/task/local/output/TezTaskOutputFiles.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/task/local/output/TezTaskOutputFiles.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/task/local/output/TezTaskOutputFiles.java
new file mode 100644
index 0000000..2c18b4e
--- /dev/null
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/task/local/output/TezTaskOutputFiles.java
@@ -0,0 +1,246 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.runtime.library.common.task.local.output;
+
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.LocalDirAllocator;
+import org.apache.hadoop.fs.Path;
+import org.apache.tez.common.TezJobConfig;
+import org.apache.tez.runtime.library.common.Constants;
+import org.apache.tez.runtime.library.common.InputAttemptIdentifier;
+
+/**
+ * Manipulate the working area for the transient store for maps and reduces.
+ *
+ * This class is used by map and reduce tasks to identify the directories that
+ * they need to write to/read from for intermediate files. The callers of
+ * these methods are from child space and see mapreduce.cluster.local.dir as
+ * taskTracker/jobCache/jobId/attemptId
+ * This class should not be used from TaskTracker space.
+ */
+
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public class TezTaskOutputFiles extends TezTaskOutput {
+  
+  public TezTaskOutputFiles(Configuration conf, String uniqueId) {
+    super(conf, uniqueId);
+  }
+
+  private static final Log LOG = LogFactory.getLog(TezTaskOutputFiles.class);
+
+  private static final String SPILL_FILE_PATTERN = "%s_spill_%d.out";
+  private static final String SPILL_INDEX_FILE_PATTERN = SPILL_FILE_PATTERN
+      + ".index";
+
+  
+
+  // assume configured to $localdir/usercache/$user/appcache/$appId
+  private LocalDirAllocator lDirAlloc =
+    new LocalDirAllocator(TezJobConfig.LOCAL_DIRS);
+  
+
+  private Path getAttemptOutputDir() {
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("getAttemptOutputDir: "
+          + Constants.TEZ_RUNTIME_TASK_OUTPUT_DIR + "/"
+          + uniqueId);
+    }
+    return new Path(Constants.TEZ_RUNTIME_TASK_OUTPUT_DIR, uniqueId);
+  }
+
+  /**
+   * Return the path to local map output file created earlier
+   *
+   * @return path
+   * @throws IOException
+   */
+  public Path getOutputFile() throws IOException {
+    Path attemptOutput =
+      new Path(getAttemptOutputDir(), Constants.TEZ_RUNTIME_TASK_OUTPUT_FILENAME_STRING);
+    return lDirAlloc.getLocalPathToRead(attemptOutput.toString(), conf);
+  }
+
+  /**
+   * Create a local map output file name.
+   *
+   * @param size the size of the file
+   * @return path
+   * @throws IOException
+   */
+  public Path getOutputFileForWrite(long size) throws IOException {
+    Path attemptOutput =
+      new Path(getAttemptOutputDir(), Constants.TEZ_RUNTIME_TASK_OUTPUT_FILENAME_STRING);
+    return lDirAlloc.getLocalPathForWrite(attemptOutput.toString(), size, conf);
+  }
+
+  /**
+   * Create a local map output file name. This should *only* be used if the size
+   * of the file is not known. Otherwise use the equivalent which accepts a size
+   * parameter.
+   * 
+   * @return path
+   * @throws IOException
+   */
+  public Path getOutputFileForWrite() throws IOException {
+    Path attemptOutput =
+      new Path(getAttemptOutputDir(), Constants.TEZ_RUNTIME_TASK_OUTPUT_FILENAME_STRING);
+    return lDirAlloc.getLocalPathForWrite(attemptOutput.toString(), conf);
+  }
+
+  /**
+   * Create a local map output file name on the same volume.
+   */
+  public Path getOutputFileForWriteInVolume(Path existing) {
+    Path outputDir = new Path(existing.getParent(), Constants.TEZ_RUNTIME_TASK_OUTPUT_DIR);
+    Path attemptOutputDir = new Path(outputDir, uniqueId);
+    return new Path(attemptOutputDir, Constants.TEZ_RUNTIME_TASK_OUTPUT_FILENAME_STRING);
+  }
+
+  /**
+   * Return the path to a local map output index file created earlier
+   *
+   * @return path
+   * @throws IOException
+   */
+  public Path getOutputIndexFile() throws IOException {
+    Path attemptIndexOutput =
+      new Path(getAttemptOutputDir(), Constants.TEZ_RUNTIME_TASK_OUTPUT_FILENAME_STRING +
+                                      Constants.TEZ_RUNTIME_TASK_OUTPUT_INDEX_SUFFIX_STRING);
+    return lDirAlloc.getLocalPathToRead(attemptIndexOutput.toString(), conf);
+  }
+
+  /**
+   * Create a local map output index file name.
+   *
+   * @param size the size of the file
+   * @return path
+   * @throws IOException
+   */
+  public Path getOutputIndexFileForWrite(long size) throws IOException {
+    Path attemptIndexOutput =
+      new Path(getAttemptOutputDir(), Constants.TEZ_RUNTIME_TASK_OUTPUT_FILENAME_STRING +
+                                      Constants.TEZ_RUNTIME_TASK_OUTPUT_INDEX_SUFFIX_STRING);
+    return lDirAlloc.getLocalPathForWrite(attemptIndexOutput.toString(),
+        size, conf);
+  }
+
+  /**
+   * Create a local map output index file name on the same volume.
+   */
+  public Path getOutputIndexFileForWriteInVolume(Path existing) {
+    Path outputDir = new Path(existing.getParent(), Constants.TEZ_RUNTIME_TASK_OUTPUT_DIR);
+    Path attemptOutputDir = new Path(outputDir, uniqueId);
+    return new Path(attemptOutputDir, Constants.TEZ_RUNTIME_TASK_OUTPUT_FILENAME_STRING +
+                                      Constants.TEZ_RUNTIME_TASK_OUTPUT_INDEX_SUFFIX_STRING);
+  }
+
+  /**
+   * Return a local map spill file created earlier.
+   *
+   * @param spillNumber the number
+   * @return path
+   * @throws IOException
+   */
+  public Path getSpillFile(int spillNumber) throws IOException {
+    return lDirAlloc.getLocalPathToRead(
+        String.format(SPILL_FILE_PATTERN,
+            uniqueId, spillNumber), conf);
+  }
+
+  /**
+   * Create a local map spill file name.
+   *
+   * @param spillNumber the number
+   * @param size the size of the file
+   * @return path
+   * @throws IOException
+   */
+  public Path getSpillFileForWrite(int spillNumber, long size)
+      throws IOException {
+    return lDirAlloc.getLocalPathForWrite(
+        String.format(String.format(SPILL_FILE_PATTERN,
+            uniqueId, spillNumber)), size, conf);
+  }
+
+  /**
+   * Return a local map spill index file created earlier
+   *
+   * @param spillNumber the number
+   * @return path
+   * @throws IOException
+   */
+  public Path getSpillIndexFile(int spillNumber) throws IOException {
+    return lDirAlloc.getLocalPathToRead(
+        String.format(SPILL_INDEX_FILE_PATTERN,
+            uniqueId, spillNumber), conf);
+  }
+
+  /**
+   * Create a local map spill index file name.
+   *
+   * @param spillNumber the number
+   * @param size the size of the file
+   * @return path
+   * @throws IOException
+   */
+  public Path getSpillIndexFileForWrite(int spillNumber, long size)
+      throws IOException {
+    return lDirAlloc.getLocalPathForWrite(
+        String.format(SPILL_INDEX_FILE_PATTERN,
+            uniqueId, spillNumber), size, conf);
+  }
+
+  /**
+   * Return a local reduce input file created earlier
+   *
+   * @param attemptIdentifier an identifier for a task. The attempt information is ignored.
+   * @return path
+   * @throws IOException
+   */
+  public Path getInputFile(InputAttemptIdentifier attemptIdentifier) throws IOException {
+    throw new UnsupportedOperationException("Incompatible with LocalRunner");
+  }
+
+  /**
+   * Create a local reduce input file name.
+   *
+   * @param attemptIdentifier an identifier for a task. The attempt information is ignored.
+   * @param size the size of the file
+   * @return path
+   * @throws IOException
+   */
+  public Path getInputFileForWrite(int srcTaskId,
+      long size) throws IOException {
+    return lDirAlloc.getLocalPathForWrite(String.format(
+        uniqueId, getAttemptOutputDir().toString(), srcTaskId),
+        size, conf);
+  }
+
+  /** Removes all of the files related to a task. */
+  public void removeAll() throws IOException {
+    throw new UnsupportedOperationException("Incompatible with LocalRunner");
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b212ca1d/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/hadoop/compat/NullProgressable.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/hadoop/compat/NullProgressable.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/hadoop/compat/NullProgressable.java
new file mode 100644
index 0000000..eb8d176
--- /dev/null
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/hadoop/compat/NullProgressable.java
@@ -0,0 +1,33 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.tez.runtime.library.hadoop.compat;
+
+import org.apache.hadoop.util.Progressable;
+
+public class NullProgressable implements Progressable {
+
+  public NullProgressable() {
+    // TODO Auto-generated constructor stub
+  }
+
+  @Override
+  public void progress() {
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b212ca1d/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/LocalMergedInput.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/LocalMergedInput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/LocalMergedInput.java
new file mode 100644
index 0000000..3aec247
--- /dev/null
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/LocalMergedInput.java
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tez.runtime.library.input;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.tez.common.TezUtils;
+import org.apache.tez.runtime.api.Event;
+import org.apache.tez.runtime.api.LogicalInput;
+import org.apache.tez.runtime.api.TezInputContext;
+import org.apache.tez.runtime.library.common.localshuffle.LocalShuffle;
+
+/**
+ * <code>LocalMergedInput</code> in an {@link LogicalInput} which shuffles intermediate
+ * sorted data, merges them and provides key/<values> to the consumer. 
+ */
+public class LocalMergedInput extends ShuffledMergedInputLegacy {
+
+  @Override
+  public List<Event> initialize(TezInputContext inputContext) throws IOException {
+    this.inputContext = inputContext;
+    this.conf = TezUtils.createConfFromUserPayload(inputContext.getUserPayload());
+
+    LocalShuffle localShuffle = new LocalShuffle(inputContext, conf, numInputs);
+    rawIter = localShuffle.run();
+    createValuesIterator();
+    return Collections.emptyList();
+  }
+
+  @Override
+  public List<Event> close() throws IOException {
+    rawIter.close();
+    return Collections.emptyList();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b212ca1d/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ShuffledMergedInput.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ShuffledMergedInput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ShuffledMergedInput.java
new file mode 100644
index 0000000..771ac1b
--- /dev/null
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ShuffledMergedInput.java
@@ -0,0 +1,179 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tez.runtime.library.input;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.RawComparator;
+import org.apache.tez.common.TezJobConfig;
+import org.apache.tez.common.TezUtils;
+import org.apache.tez.common.counters.TaskCounter;
+import org.apache.tez.common.counters.TezCounter;
+import org.apache.tez.runtime.api.Event;
+import org.apache.tez.runtime.api.LogicalInput;
+import org.apache.tez.runtime.api.TezInputContext;
+import org.apache.tez.runtime.library.api.KVReader;
+import org.apache.tez.runtime.library.common.ConfigUtils;
+import org.apache.tez.runtime.library.common.ValuesIterator;
+import org.apache.tez.runtime.library.common.shuffle.impl.Shuffle;
+import org.apache.tez.runtime.library.common.sort.impl.TezRawKeyValueIterator;
+
+/**
+ * <code>ShuffleMergedInput</code> in a {@link LogicalInput} which shuffles
+ * intermediate sorted data, merges them and provides key/<values> to the
+ * consumer.
+ *
+ * The Copy and Merge will be triggered by the initialization - which is handled
+ * by the Tez framework. Input is not consumable until the Copy and Merge are
+ * complete. Methods are provided to check for this, as well as to wait for
+ * completion. Attempting to get a reader on a non-complete input will block.
+ *
+ */
+public class ShuffledMergedInput implements LogicalInput {
+
+  static final Log LOG = LogFactory.getLog(ShuffledMergedInput.class);
+
+  protected TezInputContext inputContext;
+  protected TezRawKeyValueIterator rawIter = null;
+  protected Configuration conf;
+  protected int numInputs = 0;
+  protected Shuffle shuffle;
+  @SuppressWarnings("rawtypes")
+  protected ValuesIterator vIter;
+
+  private TezCounter inputKeyCounter;
+  private TezCounter inputValueCounter;
+
+  @Override
+  public List<Event> initialize(TezInputContext inputContext) throws IOException {
+    this.inputContext = inputContext;
+    this.conf = TezUtils.createConfFromUserPayload(inputContext.getUserPayload());
+
+    this.inputKeyCounter = inputContext.getCounters().findCounter(TaskCounter.REDUCE_INPUT_GROUPS);
+    this.inputValueCounter = inputContext.getCounters().findCounter(TaskCounter.REDUCE_INPUT_RECORDS);
+    this.conf.setStrings(TezJobConfig.LOCAL_DIRS,
+        inputContext.getWorkDirs());
+
+    // Start the shuffle - copy and merge.
+    shuffle = new Shuffle(inputContext, this.conf, numInputs);
+    shuffle.run();
+
+    return Collections.emptyList();
+  }
+
+  /**
+   * Check if the input is ready for consumption
+   *
+   * @return true if the input is ready for consumption, or if an error occurred
+   *         processing fetching the input. false if the shuffle and merge are
+   *         still in progress
+   */
+  public boolean isInputReady() {
+    return shuffle.isInputReady();
+  }
+
+  /**
+   * Waits for the input to become ready for consumption
+   * @throws IOException
+   * @throws InterruptedException
+   */
+  public void waitForInputReady() throws IOException, InterruptedException {
+    rawIter = shuffle.waitForInput();
+    createValuesIterator();
+  }
+
+  @Override
+  public List<Event> close() throws IOException {
+    rawIter.close();
+    return Collections.emptyList();
+  }
+
+  /**
+   * Get a KVReader for the Input.</p> This method will block until the input is
+   * ready - i.e. the copy and merge stages are complete. Users can use the
+   * isInputReady method to check if the input is ready, which gives an
+   * indication of whether this method will block or not.
+   *
+   * NOTE: All values for the current K-V pair must be read prior to invoking
+   * moveToNext. Once moveToNext() is called, the valueIterator from the
+   * previous K-V pair will throw an Exception
+   *
+   * @return a KVReader over the sorted input.
+   */
+  @Override
+  public KVReader getReader() throws IOException {
+    if (rawIter == null) {
+      try {
+        waitForInputReady();
+      } catch (InterruptedException e) {
+        Thread.currentThread().interrupt();
+        throw new IOException("Interrupted while waiting for input ready", e);
+      }
+    }
+    return new KVReader() {
+
+      @Override
+      public boolean next() throws IOException {
+        return vIter.moveToNext();
+      }
+
+      @SuppressWarnings("unchecked")
+      @Override
+      public KVRecord getCurrentKV() {
+        return new KVRecord(vIter.getKey(), vIter.getValues());
+      }
+    };
+  }
+
+  @Override
+  public void handleEvents(List<Event> inputEvents) {
+    shuffle.handleEvents(inputEvents);
+  }
+
+  @Override
+  public void setNumPhysicalInputs(int numInputs) {
+    this.numInputs = numInputs;
+  }
+
+  @SuppressWarnings({ "rawtypes", "unchecked" })
+  protected void createValuesIterator()
+      throws IOException {
+    vIter = new ValuesIterator(rawIter,
+        (RawComparator) ConfigUtils.getIntermediateInputKeyComparator(conf),
+        ConfigUtils.getIntermediateInputKeyClass(conf),
+        ConfigUtils.getIntermediateInputValueClass(conf), conf, inputKeyCounter, inputValueCounter);
+
+  }
+
+  // This functionality is currently broken. If there's inputs which need to be
+  // written to disk, there's a possibility that inputs from the different
+  // sources could clobber each others' output. Also the current structures do
+  // not have adequate information to de-dupe these (vertex name)
+//  public void mergeWith(ShuffledMergedInput other) {
+//    this.numInputs += other.getNumPhysicalInputs();
+//  }
+//
+//  public int getNumPhysicalInputs() {
+//    return this.numInputs;
+//  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b212ca1d/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ShuffledMergedInputLegacy.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ShuffledMergedInputLegacy.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ShuffledMergedInputLegacy.java
new file mode 100644
index 0000000..97e19d8
--- /dev/null
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ShuffledMergedInputLegacy.java
@@ -0,0 +1,30 @@
+/**
+ * <code>ShuffleMergedInput</code> in a {@link LogicalInput} which shuffles
+ * intermediate sorted data, merges them and provides key/<values> to the
+ * consumer.
+ * 
+ * The Copy and Merge will be triggered by the initialization - which is handled
+ * by the Tez framework. Input is not consumable until the Copy and Merge are
+ * complete. Methods are provided to check for this, as well as to wait for
+ * completion. Attempting to get a reader on a non-complete input will block.
+ * 
+ */
+
+package org.apache.tez.runtime.library.input;
+
+import java.io.IOException;
+
+import org.apache.hadoop.classification.InterfaceAudience.LimitedPrivate;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.tez.runtime.library.common.sort.impl.TezRawKeyValueIterator;
+
+@LimitedPrivate("mapreduce")
+public class ShuffledMergedInputLegacy extends ShuffledMergedInput {
+
+  @Private
+  public TezRawKeyValueIterator getIterator() throws IOException, InterruptedException {
+    // wait for input so that iterator is available
+    waitForInputReady();
+    return rawIter;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b212ca1d/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ShuffledUnorderedKVInput.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ShuffledUnorderedKVInput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ShuffledUnorderedKVInput.java
new file mode 100644
index 0000000..42b2e00
--- /dev/null
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ShuffledUnorderedKVInput.java
@@ -0,0 +1,76 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.tez.runtime.library.input;
+
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.tez.common.TezJobConfig;
+import org.apache.tez.common.TezUtils;
+import org.apache.tez.runtime.api.Event;
+import org.apache.tez.runtime.api.LogicalInput;
+import org.apache.tez.runtime.api.Reader;
+import org.apache.tez.runtime.api.TezInputContext;
+import org.apache.tez.runtime.library.broadcast.input.BroadcastShuffleManager;
+
+import com.google.common.base.Preconditions;
+
+public class ShuffledUnorderedKVInput implements LogicalInput {
+
+  private Configuration conf;
+  private int numInputs = -1;
+  private BroadcastShuffleManager shuffleManager;
+  
+  
+  
+  public ShuffledUnorderedKVInput() {
+  }
+
+  @Override
+  public List<Event> initialize(TezInputContext inputContext) throws Exception {
+    Preconditions.checkArgument(numInputs != -1, "Number of Inputs has not been set");
+    this.conf = TezUtils.createConfFromUserPayload(inputContext.getUserPayload());
+    this.conf.setStrings(TezJobConfig.LOCAL_DIRS, inputContext.getWorkDirs());
+    
+    this.shuffleManager = new BroadcastShuffleManager(inputContext, conf, numInputs);
+    return null;
+  }
+
+  @Override
+  public Reader getReader() throws Exception {
+    // TODO Auto-generated method stub
+    return null;
+  }
+
+  @Override
+  public void handleEvents(List<Event> inputEvents) {
+    shuffleManager.handleEvents(inputEvents);
+  }
+
+  @Override
+  public List<Event> close() throws Exception {
+    this.shuffleManager.shutdown();
+    return null;
+  }
+
+  @Override
+  public void setNumPhysicalInputs(int numInputs) {
+    this.numInputs = numInputs;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b212ca1d/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/InMemorySortedOutput.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/InMemorySortedOutput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/InMemorySortedOutput.java
new file mode 100644
index 0000000..2ec6b2a
--- /dev/null
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/InMemorySortedOutput.java
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tez.runtime.library.output;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.tez.common.TezUtils;
+import org.apache.tez.runtime.api.Event;
+import org.apache.tez.runtime.api.LogicalOutput;
+import org.apache.tez.runtime.api.Output;
+import org.apache.tez.runtime.api.TezOutputContext;
+import org.apache.tez.runtime.api.Writer;
+import org.apache.tez.runtime.library.api.KVWriter;
+import org.apache.tez.runtime.library.common.sort.impl.dflt.InMemoryShuffleSorter;
+
+/**
+ * {@link InMemorySortedOutput} is an {@link Output} which sorts key/value pairs 
+ * written to it and persists it to a file.
+ */
+public class InMemorySortedOutput implements LogicalOutput {
+  
+  protected InMemoryShuffleSorter sorter;
+  protected int numTasks;
+  protected TezOutputContext outputContext;
+  
+
+  @Override
+  public List<Event> initialize(TezOutputContext outputContext)
+      throws IOException {
+    this.outputContext = outputContext;
+    this.sorter = new InMemoryShuffleSorter();
+    sorter.initialize(outputContext, TezUtils.createConfFromUserPayload(outputContext.getUserPayload()), numTasks);
+    return Collections.emptyList();
+  }
+
+  @Override
+  public Writer getWriter() throws IOException {
+    return new KVWriter() {
+      
+      @Override
+      public void write(Object key, Object value) throws IOException {
+        sorter.write(key, value);
+      }
+    };
+  }
+
+  @Override
+  public void handleEvents(List<Event> outputEvents) {
+    // No events expected.
+  }
+
+  @Override
+  public void setNumPhysicalOutputs(int numOutputs) {
+    this.numTasks = numOutputs;
+  }
+  
+  @Override
+  public List<Event> close() throws IOException {
+    sorter.flush();
+    sorter.close();
+    // TODO NEWTEZ Event generation
+    return null;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b212ca1d/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/LocalOnFileSorterOutput.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/LocalOnFileSorterOutput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/LocalOnFileSorterOutput.java
new file mode 100644
index 0000000..a19d5e1
--- /dev/null
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/LocalOnFileSorterOutput.java
@@ -0,0 +1,63 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.tez.runtime.library.output;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.tez.runtime.api.Event;
+import org.apache.tez.runtime.library.common.task.local.output.TezTaskOutput;
+
+public class LocalOnFileSorterOutput extends OnFileSortedOutput {
+
+  private static final Log LOG = LogFactory.getLog(LocalOnFileSorterOutput.class);
+
+  
+
+  @Override
+  public List<Event> close() throws IOException {
+    LOG.debug("Closing LocalOnFileSorterOutput");
+    super.close();
+
+    TezTaskOutput mapOutputFile = sorter.getMapOutput();
+    FileSystem localFs = FileSystem.getLocal(conf);
+
+    Path src = mapOutputFile.getOutputFile();
+    Path dst =
+        mapOutputFile.getInputFileForWrite(
+            outputContext.getTaskIndex(),
+            localFs.getFileStatus(src).getLen());
+
+    LOG.info("Renaming src = " + src + ", dst = " + dst);
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Renaming src = " + src + ", dst = " + dst);
+    }
+    localFs.rename(src, dst);
+    return null;
+  }
+  
+  @Override
+  protected List<Event> generateDataMovementEventsOnClose() throws IOException {
+    return null;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b212ca1d/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OnFileSortedOutput.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OnFileSortedOutput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OnFileSortedOutput.java
new file mode 100644
index 0000000..42e1eeb
--- /dev/null
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OnFileSortedOutput.java
@@ -0,0 +1,123 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tez.runtime.library.output;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.ApplicationConstants;
+import org.apache.tez.common.TezJobConfig;
+import org.apache.tez.common.TezUtils;
+import org.apache.tez.runtime.api.Event;
+import org.apache.tez.runtime.api.LogicalOutput;
+import org.apache.tez.runtime.api.TezOutputContext;
+import org.apache.tez.runtime.api.events.DataMovementEvent;
+import org.apache.tez.runtime.library.api.KVWriter;
+import org.apache.tez.runtime.library.common.sort.impl.ExternalSorter;
+import org.apache.tez.runtime.library.common.sort.impl.dflt.DefaultSorter;
+import org.apache.tez.runtime.library.shuffle.common.ShuffleUtils;
+import org.apache.tez.runtime.library.shuffle.impl.ShuffleUserPayloads.DataMovementEventPayloadProto;
+
+import com.google.common.collect.Lists;
+
+/**
+ * <code>OnFileSortedOutput</code> is an {@link LogicalOutput} which sorts key/value pairs 
+ * written to it and persists it to a file.
+ */
+public class OnFileSortedOutput implements LogicalOutput {
+  
+  protected ExternalSorter sorter;
+  protected Configuration conf;
+  protected int numOutputs;
+  protected TezOutputContext outputContext;
+  private long startTime;
+  private long endTime;
+  
+  
+  @Override
+  public List<Event> initialize(TezOutputContext outputContext)
+      throws IOException {
+    this.startTime = System.nanoTime();
+    this.outputContext = outputContext;
+    sorter = new DefaultSorter();
+    this.conf = TezUtils.createConfFromUserPayload(outputContext.getUserPayload());
+    // Initializing this parametr in this conf since it is used in multiple
+    // places (wherever LocalDirAllocator is used) - TezTaskOutputFiles,
+    // TezMerger, etc.
+    this.conf.setStrings(TezJobConfig.LOCAL_DIRS, outputContext.getWorkDirs());
+    sorter.initialize(outputContext, conf, numOutputs);
+    return Collections.emptyList();
+  }
+
+  @Override
+  public KVWriter getWriter() throws IOException {
+    return new KVWriter() {
+      @Override
+      public void write(Object key, Object value) throws IOException {
+        sorter.write(key, value);
+      }
+    };
+  }
+
+  @Override
+  public void handleEvents(List<Event> outputEvents) {
+    // Not expecting any events.
+  }
+
+  @Override
+  public void setNumPhysicalOutputs(int numOutputs) {
+    this.numOutputs = numOutputs;
+  }
+
+  @Override
+  public List<Event> close() throws IOException {
+    sorter.flush();
+    sorter.close();
+    this.endTime = System.nanoTime();
+
+   return generateDataMovementEventsOnClose();
+  }
+  
+  protected List<Event> generateDataMovementEventsOnClose() throws IOException {
+    String host = System.getenv(ApplicationConstants.Environment.NM_HOST
+        .toString());
+    ByteBuffer shuffleMetadata = outputContext
+        .getServiceProviderMetaData(ShuffleUtils.SHUFFLE_HANDLER_SERVICE_ID);
+    int shufflePort = ShuffleUtils.deserializeShuffleProviderMetaData(shuffleMetadata);
+
+    DataMovementEventPayloadProto.Builder payloadBuilder = DataMovementEventPayloadProto
+        .newBuilder();
+    payloadBuilder.setHost(host);
+    payloadBuilder.setPort(shufflePort);
+    payloadBuilder.setPathComponent(outputContext.getUniqueIdentifier());
+    payloadBuilder.setRunDuration((int) ((endTime - startTime) / 1000));
+    DataMovementEventPayloadProto payloadProto = payloadBuilder.build();
+    byte[] payloadBytes = payloadProto.toByteArray();
+
+    List<Event> events = Lists.newArrayListWithCapacity(numOutputs);
+
+    for (int i = 0; i < numOutputs; i++) {
+      DataMovementEvent event = new DataMovementEvent(i, payloadBytes);
+      events.add(event);
+    }
+    return events;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b212ca1d/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OnFileUnorderedKVOutput.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OnFileUnorderedKVOutput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OnFileUnorderedKVOutput.java
new file mode 100644
index 0000000..dd18149
--- /dev/null
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OnFileUnorderedKVOutput.java
@@ -0,0 +1,98 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.tez.runtime.library.output;
+
+import java.nio.ByteBuffer;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.hadoop.yarn.api.ApplicationConstants;
+import org.apache.tez.dag.api.TezUncheckedException;
+import org.apache.tez.runtime.api.Event;
+import org.apache.tez.runtime.api.LogicalOutput;
+import org.apache.tez.runtime.api.TezOutputContext;
+import org.apache.tez.runtime.api.events.DataMovementEvent;
+import org.apache.tez.runtime.library.api.KVWriter;
+import org.apache.tez.runtime.library.broadcast.output.FileBasedKVWriter;
+import org.apache.tez.runtime.library.shuffle.common.ShuffleUtils;
+import org.apache.tez.runtime.library.shuffle.impl.ShuffleUserPayloads.DataMovementEventPayloadProto;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+
+public class OnFileUnorderedKVOutput implements LogicalOutput {
+
+  private TezOutputContext outputContext;
+  private FileBasedKVWriter kvWriter;
+
+  public OnFileUnorderedKVOutput() {
+  }
+
+  @Override
+  public List<Event> initialize(TezOutputContext outputContext)
+      throws Exception {
+    this.outputContext = outputContext;
+    this.kvWriter = new FileBasedKVWriter(outputContext);
+    return Collections.emptyList();
+  }
+
+  @Override
+  public KVWriter getWriter() throws Exception {
+    return kvWriter;
+  }
+
+  @Override
+  public void handleEvents(List<Event> outputEvents) {
+    throw new TezUncheckedException("Not expecting any events");
+  }
+
+  @Override
+  public List<Event> close() throws Exception {
+    boolean outputGenerated = this.kvWriter.close();
+    DataMovementEventPayloadProto.Builder payloadBuilder = DataMovementEventPayloadProto
+        .newBuilder();
+
+    String host = System.getenv(ApplicationConstants.Environment.NM_HOST
+        .toString());
+    ByteBuffer shuffleMetadata = outputContext
+        .getServiceProviderMetaData(ShuffleUtils.SHUFFLE_HANDLER_SERVICE_ID);
+    int shufflePort = ShuffleUtils
+        .deserializeShuffleProviderMetaData(shuffleMetadata);
+    payloadBuilder.setOutputGenerated(outputGenerated);
+    if (outputGenerated) {
+      payloadBuilder.setHost(host);
+      payloadBuilder.setPort(shufflePort);
+      payloadBuilder.setPathComponent(outputContext.getUniqueIdentifier());
+    }
+    DataMovementEventPayloadProto payloadProto = payloadBuilder.build();
+
+    DataMovementEvent dmEvent = new DataMovementEvent(0,
+        payloadProto.toByteArray());
+    List<Event> events = Lists.newArrayListWithCapacity(1);
+    events.add(dmEvent);
+    return events;
+  }
+
+  @Override
+  public void setNumPhysicalOutputs(int numOutputs) {
+    Preconditions.checkArgument(numOutputs == 1,
+        "Number of outputs can only be 1 for " + this.getClass().getName());
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b212ca1d/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/DiskFetchedInput.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/DiskFetchedInput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/DiskFetchedInput.java
new file mode 100644
index 0000000..a98ce63
--- /dev/null
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/DiskFetchedInput.java
@@ -0,0 +1,111 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.runtime.library.shuffle.common;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocalDirAllocator;
+import org.apache.hadoop.fs.Path;
+import org.apache.tez.runtime.library.common.InputAttemptIdentifier;
+import org.apache.tez.runtime.library.common.task.local.output.TezTaskOutputFiles;
+
+import com.google.common.base.Preconditions;
+
+public class DiskFetchedInput extends FetchedInput {
+
+  private static final Log LOG = LogFactory.getLog(DiskFetchedInput.class);
+  
+  private final FileSystem localFS;
+  private final Path tmpOutputPath;
+  private final Path outputPath;
+
+  public DiskFetchedInput(long size,
+      InputAttemptIdentifier inputAttemptIdentifier,
+      FetchedInputCallback callbackHandler, Configuration conf,
+      LocalDirAllocator localDirAllocator, TezTaskOutputFiles filenameAllocator)
+      throws IOException {
+    super(Type.DISK, size, inputAttemptIdentifier, callbackHandler);
+
+    this.localFS = FileSystem.getLocal(conf);
+    this.outputPath = filenameAllocator.getInputFileForWrite(
+        this.inputAttemptIdentifier.getInputIdentifier().getSrcTaskIndex(), size);
+    this.tmpOutputPath = outputPath.suffix(String.valueOf(id));
+  }
+
+  @Override
+  public OutputStream getOutputStream() throws IOException {
+    return localFS.create(tmpOutputPath);
+  }
+
+  @Override
+  public InputStream getInputStream() throws IOException {
+    return localFS.open(outputPath);
+  }
+
+  @Override
+  public void commit() throws IOException {
+    if (state == State.PENDING) {
+      state = State.COMMITTED;
+      localFS.rename(tmpOutputPath, outputPath);
+      notifyFetchComplete();
+    }
+  }
+
+  @Override
+  public void abort() throws IOException {
+    if (state == State.PENDING) {
+      state = State.ABORTED;
+      // TODO NEWTEZ Maybe defer this to container cleanup
+      localFS.delete(tmpOutputPath, false);
+      notifyFetchFailure();
+    }
+  }
+  
+  @Override
+  public void free() {
+    Preconditions.checkState(
+        state == State.COMMITTED || state == State.ABORTED,
+        "FetchedInput can only be freed after it is committed or aborted");
+    if (state == State.COMMITTED) {
+      state = State.FREED;
+      try {
+        // TODO NEWTEZ Maybe defer this to container cleanup
+        localFS.delete(outputPath, false);
+      } catch (IOException e) {
+        // Ignoring the exception, will eventually be cleaned by container
+        // cleanup.
+        LOG.warn("Failed to remvoe file : " + outputPath.toString());
+      }
+      notifyFreedResource();
+    }
+  }
+
+  @Override
+  public String toString() {
+    return "DiskFetchedInput [outputPath=" + outputPath
+        + ", inputAttemptIdentifier=" + inputAttemptIdentifier + ", size="
+        + size + ", type=" + type + ", id=" + id + ", state=" + state + "]";
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b212ca1d/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/FetchResult.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/FetchResult.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/FetchResult.java
new file mode 100644
index 0000000..df38b07
--- /dev/null
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/FetchResult.java
@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.runtime.library.shuffle.common;
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import org.apache.tez.runtime.library.common.InputAttemptIdentifier;
+
+public class FetchResult {
+
+  private final String host;
+  private final int port;
+  private final int partition;
+  private final Iterable<InputAttemptIdentifier> pendingInputs;
+
+  public FetchResult(String host, int port, int partition,
+      Iterable<InputAttemptIdentifier> pendingInputs) {
+    this.host = host;
+    this.port = port;
+    this.partition = partition;
+    this.pendingInputs = pendingInputs;
+  }
+
+  public String getHost() {
+    return host;
+  }
+
+  public int getPort() {
+    return port;
+  }
+
+  public int getPartition() {
+    return partition;
+  }
+
+  public Iterable<InputAttemptIdentifier> getPendingInputs() {
+    return pendingInputs;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b212ca1d/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/FetchedInput.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/FetchedInput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/FetchedInput.java
new file mode 100644
index 0000000..8f3c407
--- /dev/null
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/FetchedInput.java
@@ -0,0 +1,144 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.runtime.library.shuffle.common;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.tez.runtime.library.common.InputAttemptIdentifier;
+
+@Private
+public abstract class FetchedInput {
+  
+  public static enum Type {
+    WAIT, // TODO NEWTEZ Implement this, only if required.
+    MEMORY,
+    DISK,
+  }
+  
+  protected static enum State {
+    PENDING, COMMITTED, ABORTED, FREED
+  }
+
+  private static AtomicInteger ID_GEN = new AtomicInteger(0);
+
+  protected InputAttemptIdentifier inputAttemptIdentifier;
+  protected final long size;
+  protected final Type type;
+  protected final FetchedInputCallback callback;
+  protected final int id;
+  protected State state;
+
+  public FetchedInput(Type type, long size,
+      InputAttemptIdentifier inputAttemptIdentifier,
+      FetchedInputCallback callbackHandler) {
+    this.type = type;
+    this.size = size;
+    this.inputAttemptIdentifier = inputAttemptIdentifier;
+    this.callback = callbackHandler;
+    this.id = ID_GEN.getAndIncrement();
+    this.state = State.PENDING;
+  }
+
+  public Type getType() {
+    return this.type;
+  }
+
+  public long getSize() {
+    return this.size;
+  }
+
+  public InputAttemptIdentifier getInputAttemptIdentifier() {
+    return this.inputAttemptIdentifier;
+  }
+
+  /**
+   * Inform the Allocator about a committed resource.
+   * This should be called by commit
+   */
+  public void notifyFetchComplete() {
+    this.callback.fetchComplete(this);
+  }
+  
+  /**
+   * Inform the Allocator about a failed resource.
+   * This should be called by abort
+   */
+  public void notifyFetchFailure() {
+    this.callback.fetchFailed(this);
+  }
+  
+  /**
+   * Inform the Allocator about a completed resource being released.
+   * This should be called by free
+   */
+  public void notifyFreedResource() {
+    this.callback.freeResources(this);
+  }
+  
+  /**
+   * Returns the output stream to be used to write fetched data. Users are
+   * expected to close the OutputStream when they're done
+   */
+  public abstract OutputStream getOutputStream() throws IOException;
+
+  /**
+   * Return an input stream to be used to read the previously fetched data.
+   * Users are expected to close the InputStream when they're done
+   */
+  public abstract InputStream getInputStream() throws IOException;
+
+  /**
+   * Commit the output. Should be idempotent
+   */
+  public abstract void commit() throws IOException;
+
+  /**
+   * Abort the output. Should be idempotent
+   */
+  public abstract void abort() throws IOException;
+
+  /**
+   * Called when this input has been consumed, so that resources can be
+   * reclaimed.
+   */
+  public abstract void free();
+  
+  @Override
+  public int hashCode() {
+    return id;
+  }
+
+  @Override
+  public boolean equals(Object obj) {
+    if (this == obj)
+      return true;
+    if (obj == null)
+      return false;
+    if (getClass() != obj.getClass())
+      return false;
+    FetchedInput other = (FetchedInput) obj;
+    if (id != other.id)
+      return false;
+    return true;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b212ca1d/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/FetchedInputAllocator.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/FetchedInputAllocator.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/FetchedInputAllocator.java
new file mode 100644
index 0000000..1d60b68
--- /dev/null
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/FetchedInputAllocator.java
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.runtime.library.shuffle.common;
+
+import java.io.IOException;
+
+import org.apache.tez.runtime.library.common.InputAttemptIdentifier;
+
+public interface FetchedInputAllocator {
+
+  public FetchedInput allocate(long size,
+      InputAttemptIdentifier inputAttemptIdentifier) throws IOException;
+  
+  
+}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b212ca1d/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/FetchedInputCallback.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/FetchedInputCallback.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/FetchedInputCallback.java
new file mode 100644
index 0000000..a22ce45
--- /dev/null
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/FetchedInputCallback.java
@@ -0,0 +1,29 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.runtime.library.shuffle.common;
+
+public interface FetchedInputCallback {
+  
+  public void fetchComplete(FetchedInput fetchedInput);
+  
+  public void fetchFailed(FetchedInput fetchedInput);
+  
+  public void freeResources(FetchedInput fetchedInput);
+  
+}