You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by hi...@apache.org on 2013/09/25 00:44:21 UTC

[12/20] Rename tez-engine-api to tez-runtime-api and tez-engine is split into 2: - tez-engine-library for user-visible Input/Output/Processor implementations - tez-engine-internals for framework internals

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b212ca1d/tez-engine/src/main/java/org/apache/tez/engine/common/sort/impl/dflt/DefaultSorter.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/common/sort/impl/dflt/DefaultSorter.java b/tez-engine/src/main/java/org/apache/tez/engine/common/sort/impl/dflt/DefaultSorter.java
deleted file mode 100644
index 6b48270..0000000
--- a/tez-engine/src/main/java/org/apache/tez/engine/common/sort/impl/dflt/DefaultSorter.java
+++ /dev/null
@@ -1,1108 +0,0 @@
-/**
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements.  See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership.  The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License.  You may obtain a copy of the License at
-*
-*     http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-
-package org.apache.tez.engine.common.sort.impl.dflt;
-
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.io.OutputStream;
-import java.nio.ByteBuffer;
-import java.nio.ByteOrder;
-import java.nio.IntBuffer;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.locks.Condition;
-import java.util.concurrent.locks.ReentrantLock;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.DataInputBuffer;
-import org.apache.hadoop.io.RawComparator;
-import org.apache.hadoop.util.IndexedSortable;
-import org.apache.hadoop.util.Progress;
-import org.apache.hadoop.util.StringUtils;
-import org.apache.tez.common.TezJobConfig;
-import org.apache.tez.engine.api.TezOutputContext;
-import org.apache.tez.engine.common.ConfigUtils;
-import org.apache.tez.engine.common.sort.impl.ExternalSorter;
-import org.apache.tez.engine.common.sort.impl.IFile;
-import org.apache.tez.engine.common.sort.impl.TezIndexRecord;
-import org.apache.tez.engine.common.sort.impl.TezMerger;
-import org.apache.tez.engine.common.sort.impl.TezRawKeyValueIterator;
-import org.apache.tez.engine.common.sort.impl.TezSpillRecord;
-import org.apache.tez.engine.common.sort.impl.IFile.Writer;
-import org.apache.tez.engine.common.sort.impl.TezMerger.Segment;
-
-@SuppressWarnings({"unchecked", "rawtypes"})
-public class DefaultSorter extends ExternalSorter implements IndexedSortable {
-  
-  private static final Log LOG = LogFactory.getLog(DefaultSorter.class);
-
-  // TODO NEWTEZ Progress reporting to Tez framework. (making progress vs %complete)
-  
-  /**
-   * The size of each record in the index file for the map-outputs.
-   */
-  public static final int MAP_OUTPUT_INDEX_RECORD_LENGTH = 24;
-
-  private final static int APPROX_HEADER_LENGTH = 150;
-
-  // k/v accounting
-  IntBuffer kvmeta; // metadata overlay on backing store
-  int kvstart;            // marks origin of spill metadata
-  int kvend;              // marks end of spill metadata
-  int kvindex;            // marks end of fully serialized records
-
-  int equator;            // marks origin of meta/serialization
-  int bufstart;           // marks beginning of spill
-  int bufend;             // marks beginning of collectable
-  int bufmark;            // marks end of record
-  int bufindex;           // marks end of collected
-  int bufvoid;            // marks the point where we should stop
-                          // reading at the end of the buffer
-
-  byte[] kvbuffer;        // main output buffer
-  private final byte[] b0 = new byte[0];
-
-  protected static final int INDEX = 0;            // index offset in acct
-  protected static final int VALSTART = 1;         // val offset in acct
-  protected static final int KEYSTART = 2;         // key offset in acct
-  protected static final int PARTITION = 3;        // partition offset in acct
-  protected static final int NMETA = 4;            // num meta ints
-  protected static final int METASIZE = NMETA * 4; // size in bytes
-
-  // spill accounting
-  int maxRec;
-  int softLimit;
-  boolean spillInProgress;
-  int bufferRemaining;
-  volatile Throwable sortSpillException = null;
-
-  int numSpills = 0;
-  int minSpillsForCombine;
-  final ReentrantLock spillLock = new ReentrantLock();
-  final Condition spillDone = spillLock.newCondition();
-  final Condition spillReady = spillLock.newCondition();
-  final BlockingBuffer bb = new BlockingBuffer();
-  volatile boolean spillThreadRunning = false;
-  final SpillThread spillThread = new SpillThread();
-
-  final ArrayList<TezSpillRecord> indexCacheList =
-    new ArrayList<TezSpillRecord>();
-  private int totalIndexCacheMemory;
-  private int indexCacheMemoryLimit;
-
-  @Override
-  public void initialize(TezOutputContext outputContext, Configuration conf, int numOutputs) throws IOException { 
-    super.initialize(outputContext, conf, numOutputs);
-
-    // sanity checks
-    final float spillper = this.conf.getFloat(
-        TezJobConfig.TEZ_ENGINE_SORT_SPILL_PERCENT,
-        TezJobConfig.DEFAULT_TEZ_ENGINE_SORT_SPILL_PERCENT);
-    final int sortmb = this.conf.getInt(TezJobConfig.TEZ_ENGINE_IO_SORT_MB,
-        TezJobConfig.DEFAULT_TEZ_ENGINE_IO_SORT_MB);
-    if (spillper > (float) 1.0 || spillper <= (float) 0.0) {
-      throw new IOException("Invalid \""
-          + TezJobConfig.TEZ_ENGINE_SORT_SPILL_PERCENT + "\": " + spillper);
-    }
-    if ((sortmb & 0x7FF) != sortmb) {
-      throw new IOException("Invalid \"" + TezJobConfig.TEZ_ENGINE_IO_SORT_MB
-          + "\": " + sortmb);
-    }
-
-    indexCacheMemoryLimit = this.conf.getInt(TezJobConfig.TEZ_ENGINE_INDEX_CACHE_MEMORY_LIMIT_BYTES,
-                                       TezJobConfig.DEFAULT_TEZ_ENGINE_INDEX_CACHE_MEMORY_LIMIT_BYTES);
-
-    // buffers and accounting
-    int maxMemUsage = sortmb << 20;
-    maxMemUsage -= maxMemUsage % METASIZE;
-    kvbuffer = new byte[maxMemUsage];
-    bufvoid = kvbuffer.length;
-    kvmeta = ByteBuffer.wrap(kvbuffer)
-       .order(ByteOrder.nativeOrder())
-       .asIntBuffer();
-    setEquator(0);
-    bufstart = bufend = bufindex = equator;
-    kvstart = kvend = kvindex;
-
-    maxRec = kvmeta.capacity() / NMETA;
-    softLimit = (int)(kvbuffer.length * spillper);
-    bufferRemaining = softLimit;
-    if (LOG.isInfoEnabled()) {
-      LOG.info(TezJobConfig.TEZ_ENGINE_IO_SORT_MB + ": " + sortmb);
-      LOG.info("soft limit at " + softLimit);
-      LOG.info("bufstart = " + bufstart + "; bufvoid = " + bufvoid);
-      LOG.info("kvstart = " + kvstart + "; length = " + maxRec);
-    }
-
-    // k/v serialization
-    valSerializer.open(bb);
-    keySerializer.open(bb);
-
-    spillInProgress = false;
-    minSpillsForCombine = this.conf.getInt(TezJobConfig.TEZ_ENGINE_COMBINE_MIN_SPILLS, 3);
-    spillThread.setDaemon(true);
-    spillThread.setName("SpillThread");
-    spillLock.lock();
-    try {
-      spillThread.start();
-      while (!spillThreadRunning) {
-        spillDone.await();
-      }
-    } catch (InterruptedException e) {
-      throw new IOException("Spill thread failed to initialize", e);
-    } finally {
-      spillLock.unlock();
-    }
-    if (sortSpillException != null) {
-      throw new IOException("Spill thread failed to initialize",
-          sortSpillException);
-    }
-  }
-
-  @Override
-  public void write(Object key, Object value)
-      throws IOException {
-    collect(
-        key, value, partitioner.getPartition(key, value, partitions));
-  }
-
-  /**
-   * Serialize the key, value to intermediate storage.
-   * When this method returns, kvindex must refer to sufficient unused
-   * storage to store one METADATA.
-   */
-  synchronized void collect(Object key, Object value, final int partition
-                                   ) throws IOException {
-
-    if (key.getClass() != keyClass) {
-      throw new IOException("Type mismatch in key from map: expected "
-                            + keyClass.getName() + ", received "
-                            + key.getClass().getName());
-    }
-    if (value.getClass() != valClass) {
-      throw new IOException("Type mismatch in value from map: expected "
-                            + valClass.getName() + ", received "
-                            + value.getClass().getName());
-    }
-    if (partition < 0 || partition >= partitions) {
-      throw new IOException("Illegal partition for " + key + " (" +
-          partition + ")" + ", TotalPartitions: " + partitions);
-    }
-    checkSpillException();
-    bufferRemaining -= METASIZE;
-    if (bufferRemaining <= 0) {
-      // start spill if the thread is not running and the soft limit has been
-      // reached
-      spillLock.lock();
-      try {
-        do {
-          if (!spillInProgress) {
-            final int kvbidx = 4 * kvindex;
-            final int kvbend = 4 * kvend;
-            // serialized, unspilled bytes always lie between kvindex and
-            // bufindex, crossing the equator. Note that any void space
-            // created by a reset must be included in "used" bytes
-            final int bUsed = distanceTo(kvbidx, bufindex);
-            final boolean bufsoftlimit = bUsed >= softLimit;
-            if ((kvbend + METASIZE) % kvbuffer.length !=
-                equator - (equator % METASIZE)) {
-              // spill finished, reclaim space
-              resetSpill();
-              bufferRemaining = Math.min(
-                  distanceTo(bufindex, kvbidx) - 2 * METASIZE,
-                  softLimit - bUsed) - METASIZE;
-              continue;
-            } else if (bufsoftlimit && kvindex != kvend) {
-              // spill records, if any collected; check latter, as it may
-              // be possible for metadata alignment to hit spill pcnt
-              startSpill();
-              final int avgRec = (int)
-                (mapOutputByteCounter.getValue() /
-                mapOutputRecordCounter.getValue());
-              // leave at least half the split buffer for serialization data
-              // ensure that kvindex >= bufindex
-              final int distkvi = distanceTo(bufindex, kvbidx);
-              final int newPos = (bufindex +
-                Math.max(2 * METASIZE - 1,
-                        Math.min(distkvi / 2,
-                                 distkvi / (METASIZE + avgRec) * METASIZE)))
-                % kvbuffer.length;
-              setEquator(newPos);
-              bufmark = bufindex = newPos;
-              final int serBound = 4 * kvend;
-              // bytes remaining before the lock must be held and limits
-              // checked is the minimum of three arcs: the metadata space, the
-              // serialization space, and the soft limit
-              bufferRemaining = Math.min(
-                  // metadata max
-                  distanceTo(bufend, newPos),
-                  Math.min(
-                    // serialization max
-                    distanceTo(newPos, serBound),
-                    // soft limit
-                    softLimit)) - 2 * METASIZE;
-            }
-          }
-        } while (false);
-      } finally {
-        spillLock.unlock();
-      }
-    }
-
-    try {
-      // serialize key bytes into buffer
-      int keystart = bufindex;
-      keySerializer.serialize(key);
-      if (bufindex < keystart) {
-        // wrapped the key; must make contiguous
-        bb.shiftBufferedKey();
-        keystart = 0;
-      }
-      // serialize value bytes into buffer
-      final int valstart = bufindex;
-      valSerializer.serialize(value);
-      // It's possible for records to have zero length, i.e. the serializer
-      // will perform no writes. To ensure that the boundary conditions are
-      // checked and that the kvindex invariant is maintained, perform a
-      // zero-length write into the buffer. The logic monitoring this could be
-      // moved into collect, but this is cleaner and inexpensive. For now, it
-      // is acceptable.
-      bb.write(b0, 0, 0);
-
-      // the record must be marked after the preceding write, as the metadata
-      // for this record are not yet written
-      int valend = bb.markRecord();
-
-      mapOutputRecordCounter.increment(1);
-      mapOutputByteCounter.increment(
-          distanceTo(keystart, valend, bufvoid));
-
-      // write accounting info
-      kvmeta.put(kvindex + INDEX, kvindex);
-      kvmeta.put(kvindex + PARTITION, partition);
-      kvmeta.put(kvindex + KEYSTART, keystart);
-      kvmeta.put(kvindex + VALSTART, valstart);
-      // advance kvindex
-      kvindex = (kvindex - NMETA + kvmeta.capacity()) % kvmeta.capacity();
-    } catch (MapBufferTooSmallException e) {
-      LOG.info("Record too large for in-memory buffer: " + e.getMessage());
-      spillSingleRecord(key, value, partition);
-      mapOutputRecordCounter.increment(1);
-      return;
-    }
-  }
-
-  /**
-   * Set the point from which meta and serialization data expand. The meta
-   * indices are aligned with the buffer, so metadata never spans the ends of
-   * the circular buffer.
-   */
-  private void setEquator(int pos) {
-    equator = pos;
-    // set index prior to first entry, aligned at meta boundary
-    final int aligned = pos - (pos % METASIZE);
-    kvindex =
-      ((aligned - METASIZE + kvbuffer.length) % kvbuffer.length) / 4;
-    if (LOG.isInfoEnabled()) {
-      LOG.info("(EQUATOR) " + pos + " kvi " + kvindex +
-          "(" + (kvindex * 4) + ")");
-    }
-  }
-
-  /**
-   * The spill is complete, so set the buffer and meta indices to be equal to
-   * the new equator to free space for continuing collection. Note that when
-   * kvindex == kvend == kvstart, the buffer is empty.
-   */
-  private void resetSpill() {
-    final int e = equator;
-    bufstart = bufend = e;
-    final int aligned = e - (e % METASIZE);
-    // set start/end to point to first meta record
-    kvstart = kvend =
-      ((aligned - METASIZE + kvbuffer.length) % kvbuffer.length) / 4;
-    if (LOG.isInfoEnabled()) {
-      LOG.info("(RESET) equator " + e + " kv " + kvstart + "(" +
-        (kvstart * 4) + ")" + " kvi " + kvindex + "(" + (kvindex * 4) + ")");
-    }
-  }
-
-  /**
-   * Compute the distance in bytes between two indices in the serialization
-   * buffer.
-   * @see #distanceTo(int,int,int)
-   */
-  final int distanceTo(final int i, final int j) {
-    return distanceTo(i, j, kvbuffer.length);
-  }
-
-  /**
-   * Compute the distance between two indices in the circular buffer given the
-   * max distance.
-   */
-  int distanceTo(final int i, final int j, final int mod) {
-    return i <= j
-      ? j - i
-      : mod - i + j;
-  }
-
-  /**
-   * For the given meta position, return the dereferenced position in the
-   * integer array. Each meta block contains several integers describing
-   * record data in its serialized form, but the INDEX is not necessarily
-   * related to the proximate metadata. The index value at the referenced int
-   * position is the start offset of the associated metadata block. So the
-   * metadata INDEX at metapos may point to the metadata described by the
-   * metadata block at metapos + k, which contains information about that
-   * serialized record.
-   */
-  int offsetFor(int metapos) {
-    return kvmeta.get((metapos % maxRec) * NMETA + INDEX);
-  }
-
-  /**
-   * Compare logical range, st i, j MOD offset capacity.
-   * Compare by partition, then by key.
-   * @see IndexedSortable#compare
-   */
-  public int compare(final int mi, final int mj) {
-    final int kvi = offsetFor(mi);
-    final int kvj = offsetFor(mj);
-    final int kvip = kvmeta.get(kvi + PARTITION);
-    final int kvjp = kvmeta.get(kvj + PARTITION);
-    // sort by partition
-    if (kvip != kvjp) {
-      return kvip - kvjp;
-    }
-    // sort by key
-    return comparator.compare(kvbuffer,
-        kvmeta.get(kvi + KEYSTART),
-        kvmeta.get(kvi + VALSTART) - kvmeta.get(kvi + KEYSTART),
-        kvbuffer,
-        kvmeta.get(kvj + KEYSTART),
-        kvmeta.get(kvj + VALSTART) - kvmeta.get(kvj + KEYSTART));
-  }
-
-  /**
-   * Swap logical indices st i, j MOD offset capacity.
-   * @see IndexedSortable#swap
-   */
-  public void swap(final int mi, final int mj) {
-    final int kvi = (mi % maxRec) * NMETA + INDEX;
-    final int kvj = (mj % maxRec) * NMETA + INDEX;
-    int tmp = kvmeta.get(kvi);
-    kvmeta.put(kvi, kvmeta.get(kvj));
-    kvmeta.put(kvj, tmp);
-  }
-
-  /**
-   * Inner class managing the spill of serialized records to disk.
-   */
-  protected class BlockingBuffer extends DataOutputStream {
-
-    public BlockingBuffer() {
-      super(new Buffer());
-    }
-
-    /**
-     * Mark end of record. Note that this is required if the buffer is to
-     * cut the spill in the proper place.
-     */
-    public int markRecord() {
-      bufmark = bufindex;
-      return bufindex;
-    }
-
-    /**
-     * Set position from last mark to end of writable buffer, then rewrite
-     * the data between last mark and kvindex.
-     * This handles a special case where the key wraps around the buffer.
-     * If the key is to be passed to a RawComparator, then it must be
-     * contiguous in the buffer. This recopies the data in the buffer back
-     * into itself, but starting at the beginning of the buffer. Note that
-     * this method should <b>only</b> be called immediately after detecting
-     * this condition. To call it at any other time is undefined and would
-     * likely result in data loss or corruption.
-     * @see #markRecord()
-     */
-    protected void shiftBufferedKey() throws IOException {
-      // spillLock unnecessary; both kvend and kvindex are current
-      int headbytelen = bufvoid - bufmark;
-      bufvoid = bufmark;
-      final int kvbidx = 4 * kvindex;
-      final int kvbend = 4 * kvend;
-      final int avail =
-        Math.min(distanceTo(0, kvbidx), distanceTo(0, kvbend));
-      if (bufindex + headbytelen < avail) {
-        System.arraycopy(kvbuffer, 0, kvbuffer, headbytelen, bufindex);
-        System.arraycopy(kvbuffer, bufvoid, kvbuffer, 0, headbytelen);
-        bufindex += headbytelen;
-        bufferRemaining -= kvbuffer.length - bufvoid;
-      } else {
-        byte[] keytmp = new byte[bufindex];
-        System.arraycopy(kvbuffer, 0, keytmp, 0, bufindex);
-        bufindex = 0;
-        out.write(kvbuffer, bufmark, headbytelen);
-        out.write(keytmp);
-      }
-    }
-  }
-
-  public class Buffer extends OutputStream {
-    private final byte[] scratch = new byte[1];
-
-    @Override
-    public void write(int v)
-        throws IOException {
-      scratch[0] = (byte)v;
-      write(scratch, 0, 1);
-    }
-
-    /**
-     * Attempt to write a sequence of bytes to the collection buffer.
-     * This method will block if the spill thread is running and it
-     * cannot write.
-     * @throws MapBufferTooSmallException if record is too large to
-     *    deserialize into the collection buffer.
-     */
-    @Override
-    public void write(byte b[], int off, int len)
-        throws IOException {
-      // must always verify the invariant that at least METASIZE bytes are
-      // available beyond kvindex, even when len == 0
-      bufferRemaining -= len;
-      if (bufferRemaining <= 0) {
-        // writing these bytes could exhaust available buffer space or fill
-        // the buffer to soft limit. check if spill or blocking are necessary
-        boolean blockwrite = false;
-        spillLock.lock();
-        try {
-          do {
-            checkSpillException();
-
-            final int kvbidx = 4 * kvindex;
-            final int kvbend = 4 * kvend;
-            // ser distance to key index
-            final int distkvi = distanceTo(bufindex, kvbidx);
-            // ser distance to spill end index
-            final int distkve = distanceTo(bufindex, kvbend);
-
-            // if kvindex is closer than kvend, then a spill is neither in
-            // progress nor complete and reset since the lock was held. The
-            // write should block only if there is insufficient space to
-            // complete the current write, write the metadata for this record,
-            // and write the metadata for the next record. If kvend is closer,
-            // then the write should block if there is too little space for
-            // either the metadata or the current write. Note that collect
-            // ensures its metadata requirement with a zero-length write
-            blockwrite = distkvi <= distkve
-              ? distkvi <= len + 2 * METASIZE
-              : distkve <= len || distanceTo(bufend, kvbidx) < 2 * METASIZE;
-
-            if (!spillInProgress) {
-              if (blockwrite) {
-                if ((kvbend + METASIZE) % kvbuffer.length !=
-                    equator - (equator % METASIZE)) {
-                  // spill finished, reclaim space
-                  // need to use meta exclusively; zero-len rec & 100% spill
-                  // pcnt would fail
-                  resetSpill(); // resetSpill doesn't move bufindex, kvindex
-                  bufferRemaining = Math.min(
-                      distkvi - 2 * METASIZE,
-                      softLimit - distanceTo(kvbidx, bufindex)) - len;
-                  continue;
-                }
-                // we have records we can spill; only spill if blocked
-                if (kvindex != kvend) {
-                  startSpill();
-                  // Blocked on this write, waiting for the spill just
-                  // initiated to finish. Instead of repositioning the marker
-                  // and copying the partial record, we set the record start
-                  // to be the new equator
-                  setEquator(bufmark);
-                } else {
-                  // We have no buffered records, and this record is too large
-                  // to write into kvbuffer. We must spill it directly from
-                  // collect
-                  final int size = distanceTo(bufstart, bufindex) + len;
-                  setEquator(0);
-                  bufstart = bufend = bufindex = equator;
-                  kvstart = kvend = kvindex;
-                  bufvoid = kvbuffer.length;
-                  throw new MapBufferTooSmallException(size + " bytes");
-                }
-              }
-            }
-
-            if (blockwrite) {
-              // wait for spill
-              try {
-                while (spillInProgress) {
-                  spillDone.await();
-                }
-              } catch (InterruptedException e) {
-                  throw new IOException(
-                      "Buffer interrupted while waiting for the writer", e);
-              }
-            }
-          } while (blockwrite);
-        } finally {
-          spillLock.unlock();
-        }
-      }
-      // here, we know that we have sufficient space to write
-      if (bufindex + len > bufvoid) {
-        final int gaplen = bufvoid - bufindex;
-        System.arraycopy(b, off, kvbuffer, bufindex, gaplen);
-        len -= gaplen;
-        off += gaplen;
-        bufindex = 0;
-      }
-      System.arraycopy(b, off, kvbuffer, bufindex, len);
-      bufindex += len;
-    }
-  }
-
-  @Override
-  public void flush() throws IOException {
-    LOG.info("Starting flush of map output");
-    spillLock.lock();
-    try {
-      while (spillInProgress) {
-        spillDone.await();
-      }
-      checkSpillException();
-
-      final int kvbend = 4 * kvend;
-      if ((kvbend + METASIZE) % kvbuffer.length !=
-          equator - (equator % METASIZE)) {
-        // spill finished
-        resetSpill();
-      }
-      if (kvindex != kvend) {
-        kvend = (kvindex + NMETA) % kvmeta.capacity();
-        bufend = bufmark;
-        if (LOG.isInfoEnabled()) {
-          LOG.info("Sorting & Spilling map output");
-          LOG.info("bufstart = " + bufstart + "; bufend = " + bufmark +
-                   "; bufvoid = " + bufvoid);
-          LOG.info("kvstart = " + kvstart + "(" + (kvstart * 4) +
-                   "); kvend = " + kvend + "(" + (kvend * 4) +
-                   "); length = " + (distanceTo(kvend, kvstart,
-                         kvmeta.capacity()) + 1) + "/" + maxRec);
-        }
-        sortAndSpill();
-      }
-    } catch (InterruptedException e) {
-      throw new IOException("Interrupted while waiting for the writer", e);
-    } finally {
-      spillLock.unlock();
-    }
-    assert !spillLock.isHeldByCurrentThread();
-    // shut down spill thread and wait for it to exit. Since the preceding
-    // ensures that it is finished with its work (and sortAndSpill did not
-    // throw), we elect to use an interrupt instead of setting a flag.
-    // Spilling simultaneously from this thread while the spill thread
-    // finishes its work might be both a useful way to extend this and also
-    // sufficient motivation for the latter approach.
-    try {
-      spillThread.interrupt();
-      spillThread.join();
-    } catch (InterruptedException e) {
-      throw new IOException("Spill failed", e);
-    }
-    // release sort buffer before the merge
-    //FIXME
-    //kvbuffer = null;
-    mergeParts();
-    Path outputPath = mapOutputFile.getOutputFile();
-    fileOutputByteCounter.increment(rfs.getFileStatus(outputPath).getLen());
-  }
-
-  @Override
-  public void close() throws IOException { }
-
-  protected class SpillThread extends Thread {
-
-    @Override
-    public void run() {
-      spillLock.lock();
-      spillThreadRunning = true;
-      try {
-        while (true) {
-          spillDone.signal();
-          while (!spillInProgress) {
-            spillReady.await();
-          }
-          try {
-            spillLock.unlock();
-            sortAndSpill();
-          } catch (Throwable t) {
-            LOG.warn("Got an exception in sortAndSpill", t);
-            sortSpillException = t;
-          } finally {
-            spillLock.lock();
-            if (bufend < bufstart) {
-              bufvoid = kvbuffer.length;
-            }
-            kvstart = kvend;
-            bufstart = bufend;
-            spillInProgress = false;
-          }
-        }
-      } catch (InterruptedException e) {
-        Thread.currentThread().interrupt();
-      } finally {
-        spillLock.unlock();
-        spillThreadRunning = false;
-      }
-    }
-  }
-
-  private void checkSpillException() throws IOException {
-    final Throwable lspillException = sortSpillException;
-    if (lspillException != null) {
-      if (lspillException instanceof Error) {
-        final String logMsg = "Task " + outputContext.getUniqueIdentifier()
-            + " failed : " + StringUtils.stringifyException(lspillException);
-        outputContext.fatalError(lspillException, logMsg);
-      }
-      throw new IOException("Spill failed", lspillException);
-    }
-  }
-
-  private void startSpill() {
-    assert !spillInProgress;
-    kvend = (kvindex + NMETA) % kvmeta.capacity();
-    bufend = bufmark;
-    spillInProgress = true;
-    if (LOG.isInfoEnabled()) {
-      LOG.info("Spilling map output");
-      LOG.info("bufstart = " + bufstart + "; bufend = " + bufmark +
-               "; bufvoid = " + bufvoid);
-      LOG.info("kvstart = " + kvstart + "(" + (kvstart * 4) +
-               "); kvend = " + kvend + "(" + (kvend * 4) +
-               "); length = " + (distanceTo(kvend, kvstart,
-                     kvmeta.capacity()) + 1) + "/" + maxRec);
-    }
-    spillReady.signal();
-  }
-
-  int getMetaStart() {
-    return kvend / NMETA;
-  }
-
-  int getMetaEnd() {
-    return 1 + // kvend is a valid record
-        (kvstart >= kvend
-        ? kvstart
-        : kvmeta.capacity() + kvstart) / NMETA;
-  }
-
-  protected void sortAndSpill()
-      throws IOException, InterruptedException {
-    final int mstart = getMetaStart();
-    final int mend = getMetaEnd();
-    sorter.sort(this, mstart, mend, nullProgressable);
-    spill(mstart, mend);
-  }
-
-  protected void spill(int mstart, int mend)
-      throws IOException, InterruptedException {
-
-    //approximate the length of the output file to be the length of the
-    //buffer + header lengths for the partitions
-    final long size = (bufend >= bufstart
-        ? bufend - bufstart
-        : (bufvoid - bufend) + bufstart) +
-                partitions * APPROX_HEADER_LENGTH;
-    FSDataOutputStream out = null;
-    try {
-      // create spill file
-      final TezSpillRecord spillRec = new TezSpillRecord(partitions);
-      final Path filename =
-          mapOutputFile.getSpillFileForWrite(numSpills, size);
-      out = rfs.create(filename);
-
-      int spindex = mstart;
-      final InMemValBytes value = createInMemValBytes();
-      for (int i = 0; i < partitions; ++i) {
-        IFile.Writer writer = null;
-        try {
-          long segmentStart = out.getPos();
-          writer = new Writer(conf, out, keyClass, valClass, codec,
-                                    spilledRecordsCounter);
-          if (combiner == null) {
-            // spill directly
-            DataInputBuffer key = new DataInputBuffer();
-            while (spindex < mend &&
-                kvmeta.get(offsetFor(spindex) + PARTITION) == i) {
-              final int kvoff = offsetFor(spindex);
-              key.reset(
-                  kvbuffer,
-                  kvmeta.get(kvoff + KEYSTART),
-                  (kvmeta.get(kvoff + VALSTART) - kvmeta.get(kvoff + KEYSTART))
-                  );
-              getVBytesForOffset(kvoff, value);
-              writer.append(key, value);
-              ++spindex;
-            }
-          } else {
-            int spstart = spindex;
-            while (spindex < mend &&
-                kvmeta.get(offsetFor(spindex)
-                          + PARTITION) == i) {
-              ++spindex;
-            }
-            // Note: we would like to avoid the combiner if we've fewer
-            // than some threshold of records for a partition
-            if (spstart != spindex) {
-              TezRawKeyValueIterator kvIter =
-                new MRResultIterator(spstart, spindex);
-              if (LOG.isDebugEnabled()) {
-                LOG.debug("Running combine processor");
-              }
-              runCombineProcessor(kvIter, writer);
-            }
-          }
-
-          // close the writer
-          writer.close();
-
-          // record offsets
-          final TezIndexRecord rec =
-              new TezIndexRecord(
-                  segmentStart,
-                  writer.getRawLength(),
-                  writer.getCompressedLength());
-          spillRec.putIndex(rec, i);
-
-          writer = null;
-        } finally {
-          if (null != writer) writer.close();
-        }
-      }
-
-      if (totalIndexCacheMemory >= indexCacheMemoryLimit) {
-        // create spill index file
-        Path indexFilename =
-            mapOutputFile.getSpillIndexFileForWrite(numSpills, partitions
-                * MAP_OUTPUT_INDEX_RECORD_LENGTH);
-        spillRec.writeToFile(indexFilename, conf);
-      } else {
-        indexCacheList.add(spillRec);
-        totalIndexCacheMemory +=
-          spillRec.size() * MAP_OUTPUT_INDEX_RECORD_LENGTH;
-      }
-      LOG.info("Finished spill " + numSpills);
-      ++numSpills;
-    } finally {
-      if (out != null) out.close();
-    }
-  }
-
-  /**
-   * Handles the degenerate case where serialization fails to fit in
-   * the in-memory buffer, so we must spill the record from collect
-   * directly to a spill file. Consider this "losing".
-   */
-  private void spillSingleRecord(final Object key, final Object value,
-                                 int partition) throws IOException {
-    long size = kvbuffer.length + partitions * APPROX_HEADER_LENGTH;
-    FSDataOutputStream out = null;
-    try {
-      // create spill file
-      final TezSpillRecord spillRec = new TezSpillRecord(partitions);
-      final Path filename =
-          mapOutputFile.getSpillFileForWrite(numSpills, size);
-      out = rfs.create(filename);
-
-      // we don't run the combiner for a single record
-      for (int i = 0; i < partitions; ++i) {
-        IFile.Writer writer = null;
-        try {
-          long segmentStart = out.getPos();
-          // Create a new codec, don't care!
-          writer = new IFile.Writer(conf, out, keyClass, valClass, codec,
-                                          spilledRecordsCounter);
-
-          if (i == partition) {
-            final long recordStart = out.getPos();
-            writer.append(key, value);
-            // Note that our map byte count will not be accurate with
-            // compression
-            mapOutputByteCounter.increment(out.getPos() - recordStart);
-          }
-          writer.close();
-
-          // record offsets
-          TezIndexRecord rec =
-              new TezIndexRecord(
-                  segmentStart,
-                  writer.getRawLength(),
-                  writer.getCompressedLength());
-          spillRec.putIndex(rec, i);
-
-          writer = null;
-        } catch (IOException e) {
-          if (null != writer) writer.close();
-          throw e;
-        }
-      }
-      if (totalIndexCacheMemory >= indexCacheMemoryLimit) {
-        // create spill index file
-        Path indexFilename =
-            mapOutputFile.getSpillIndexFileForWrite(numSpills, partitions
-                * MAP_OUTPUT_INDEX_RECORD_LENGTH);
-        spillRec.writeToFile(indexFilename, conf);
-      } else {
-        indexCacheList.add(spillRec);
-        totalIndexCacheMemory +=
-          spillRec.size() * MAP_OUTPUT_INDEX_RECORD_LENGTH;
-      }
-      ++numSpills;
-    } finally {
-      if (out != null) out.close();
-    }
-  }
-
-  protected int getInMemVBytesLength(int kvoff) {
-    // get the keystart for the next serialized value to be the end
-    // of this value. If this is the last value in the buffer, use bufend
-    final int nextindex = kvoff == kvend
-      ? bufend
-      : kvmeta.get(
-          (kvoff - NMETA + kvmeta.capacity() + KEYSTART) % kvmeta.capacity());
-    // calculate the length of the value
-    int vallen = (nextindex >= kvmeta.get(kvoff + VALSTART))
-      ? nextindex - kvmeta.get(kvoff + VALSTART)
-      : (bufvoid - kvmeta.get(kvoff + VALSTART)) + nextindex;
-      return vallen;
-  }
-
-  /**
-   * Given an offset, populate vbytes with the associated set of
-   * deserialized value bytes. Should only be called during a spill.
-   */
-  int getVBytesForOffset(int kvoff, InMemValBytes vbytes) {
-    int vallen = getInMemVBytesLength(kvoff);
-    vbytes.reset(kvbuffer, kvmeta.get(kvoff + VALSTART), vallen);
-    return vallen;
-  }
-
-  /**
-   * Inner class wrapping valuebytes, used for appendRaw.
-   */
-  static class InMemValBytes extends DataInputBuffer {
-    private byte[] buffer;
-    private int start;
-    private int length;
-    private final int bufvoid;
-
-    public InMemValBytes(int bufvoid) {
-      this.bufvoid = bufvoid;
-    }
-
-    public void reset(byte[] buffer, int start, int length) {
-      this.buffer = buffer;
-      this.start = start;
-      this.length = length;
-
-      if (start + length > bufvoid) {
-        this.buffer = new byte[this.length];
-        final int taillen = bufvoid - start;
-        System.arraycopy(buffer, start, this.buffer, 0, taillen);
-        System.arraycopy(buffer, 0, this.buffer, taillen, length-taillen);
-        this.start = 0;
-      }
-
-      super.reset(this.buffer, this.start, this.length);
-    }
-  }
-
-  InMemValBytes createInMemValBytes() {
-    return new InMemValBytes(bufvoid);
-  }
-
-  protected class MRResultIterator implements TezRawKeyValueIterator {
-    private final DataInputBuffer keybuf = new DataInputBuffer();
-    private final InMemValBytes vbytes = createInMemValBytes();
-    private final int end;
-    private int current;
-    public MRResultIterator(int start, int end) {
-      this.end = end;
-      current = start - 1;
-    }
-    public boolean next() throws IOException {
-      return ++current < end;
-    }
-    public DataInputBuffer getKey() throws IOException {
-      final int kvoff = offsetFor(current);
-      keybuf.reset(kvbuffer, kvmeta.get(kvoff + KEYSTART),
-          kvmeta.get(kvoff + VALSTART) - kvmeta.get(kvoff + KEYSTART));
-      return keybuf;
-    }
-    public DataInputBuffer getValue() throws IOException {
-      getVBytesForOffset(offsetFor(current), vbytes);
-      return vbytes;
-    }
-    public Progress getProgress() {
-      return null;
-    }
-    public void close() { }
-  }
-
-  private void mergeParts() throws IOException {
-    // get the approximate size of the final output/index files
-    long finalOutFileSize = 0;
-    long finalIndexFileSize = 0;
-    final Path[] filename = new Path[numSpills];
-    final String taskIdentifier = outputContext.getUniqueIdentifier();
-
-    for(int i = 0; i < numSpills; i++) {
-      filename[i] = mapOutputFile.getSpillFile(i);
-      finalOutFileSize += rfs.getFileStatus(filename[i]).getLen();
-    }
-    if (numSpills == 1) { //the spill is the final output
-      sameVolRename(filename[0],
-          mapOutputFile.getOutputFileForWriteInVolume(filename[0]));
-      if (indexCacheList.size() == 0) {
-        sameVolRename(mapOutputFile.getSpillIndexFile(0),
-          mapOutputFile.getOutputIndexFileForWriteInVolume(filename[0]));
-      } else {
-        indexCacheList.get(0).writeToFile(
-          mapOutputFile.getOutputIndexFileForWriteInVolume(filename[0]), conf);
-      }
-      return;
-    }
-
-    // read in paged indices
-    for (int i = indexCacheList.size(); i < numSpills; ++i) {
-      Path indexFileName = mapOutputFile.getSpillIndexFile(i);
-      indexCacheList.add(new TezSpillRecord(indexFileName, conf));
-    }
-
-    //make correction in the length to include the sequence file header
-    //lengths for each partition
-    finalOutFileSize += partitions * APPROX_HEADER_LENGTH;
-    finalIndexFileSize = partitions * MAP_OUTPUT_INDEX_RECORD_LENGTH;
-    Path finalOutputFile =
-        mapOutputFile.getOutputFileForWrite(finalOutFileSize);
-    Path finalIndexFile =
-        mapOutputFile.getOutputIndexFileForWrite(finalIndexFileSize);
-
-    //The output stream for the final single output file
-    FSDataOutputStream finalOut = rfs.create(finalOutputFile, true, 4096);
-
-    if (numSpills == 0) {
-      //create dummy files
-
-      TezSpillRecord sr = new TezSpillRecord(partitions);
-      try {
-        for (int i = 0; i < partitions; i++) {
-          long segmentStart = finalOut.getPos();
-          Writer writer =
-            new Writer(conf, finalOut, keyClass, valClass, codec, null);
-          writer.close();
-
-          TezIndexRecord rec =
-              new TezIndexRecord(
-                  segmentStart,
-                  writer.getRawLength(),
-                  writer.getCompressedLength());
-          sr.putIndex(rec, i);
-        }
-        sr.writeToFile(finalIndexFile, conf);
-      } finally {
-        finalOut.close();
-      }
-      return;
-    }
-    else {
-      TezMerger.considerFinalMergeForProgress();
-
-      final TezSpillRecord spillRec = new TezSpillRecord(partitions);
-      for (int parts = 0; parts < partitions; parts++) {
-        //create the segments to be merged
-        List<Segment> segmentList =
-          new ArrayList<Segment>(numSpills);
-        for(int i = 0; i < numSpills; i++) {
-          TezIndexRecord indexRecord = indexCacheList.get(i).getIndex(parts);
-
-          Segment s =
-            new Segment(conf, rfs, filename[i], indexRecord.getStartOffset(),
-                             indexRecord.getPartLength(), codec, true);
-          segmentList.add(i, s);
-
-          if (LOG.isDebugEnabled()) {
-            LOG.debug("TaskIdentifier=" + taskIdentifier + " Partition=" + parts +
-                "Spill =" + i + "(" + indexRecord.getStartOffset() + "," +
-                indexRecord.getRawLength() + ", " +
-                indexRecord.getPartLength() + ")");
-          }
-        }
-
-        int mergeFactor =
-            this.conf.getInt(TezJobConfig.TEZ_ENGINE_IO_SORT_FACTOR,
-                TezJobConfig.DEFAULT_TEZ_ENGINE_IO_SORT_FACTOR);
-        // sort the segments only if there are intermediate merges
-        boolean sortSegments = segmentList.size() > mergeFactor;
-        //merge
-        TezRawKeyValueIterator kvIter = TezMerger.merge(conf, rfs,
-                       keyClass, valClass, codec,
-                       segmentList, mergeFactor,
-                       new Path(taskIdentifier),
-                       (RawComparator)ConfigUtils.getIntermediateOutputKeyComparator(conf),
-                       nullProgressable, sortSegments,
-                       null, spilledRecordsCounter,
-                       null); // Not using any Progress in TezMerger. Should just work.
-
-        //write merged output to disk
-        long segmentStart = finalOut.getPos();
-        Writer writer =
-            new Writer(conf, finalOut, keyClass, valClass, codec,
-                spilledRecordsCounter);
-        if (combiner == null || numSpills < minSpillsForCombine) {
-          TezMerger.writeFile(kvIter, writer,
-              nullProgressable, conf);
-        } else {
-          runCombineProcessor(kvIter, writer);
-        }
-        writer.close();
-
-        // record offsets
-        final TezIndexRecord rec =
-            new TezIndexRecord(
-                segmentStart,
-                writer.getRawLength(),
-                writer.getCompressedLength());
-        spillRec.putIndex(rec, parts);
-      }
-      spillRec.writeToFile(finalIndexFile, conf);
-      finalOut.close();
-      for(int i = 0; i < numSpills; i++) {
-        rfs.delete(filename[i],true);
-      }
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b212ca1d/tez-engine/src/main/java/org/apache/tez/engine/common/sort/impl/dflt/InMemoryShuffleSorter.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/common/sort/impl/dflt/InMemoryShuffleSorter.java b/tez-engine/src/main/java/org/apache/tez/engine/common/sort/impl/dflt/InMemoryShuffleSorter.java
deleted file mode 100644
index e2b3315..0000000
--- a/tez-engine/src/main/java/org/apache/tez/engine/common/sort/impl/dflt/InMemoryShuffleSorter.java
+++ /dev/null
@@ -1,126 +0,0 @@
-/**
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements.  See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership.  The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License.  You may obtain a copy of the License at
-*
-*     http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-
-package org.apache.tez.engine.common.sort.impl.dflt;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.nio.IntBuffer;
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.io.WritableUtils;
-import org.apache.hadoop.util.DataChecksum;
-import org.apache.tez.engine.api.TezOutputContext;
-import org.apache.tez.engine.common.shuffle.impl.ShuffleHeader;
-import org.apache.tez.engine.common.shuffle.server.ShuffleHandler;
-import org.apache.tez.engine.common.sort.impl.IFile;
-
-public class InMemoryShuffleSorter extends DefaultSorter {
-
-  private static final Log LOG = LogFactory.getLog(InMemoryShuffleSorter.class);
-  
-  static final int IFILE_EOF_LENGTH = 
-      2 * WritableUtils.getVIntSize(IFile.EOF_MARKER);
-  static final int IFILE_CHECKSUM_LENGTH = DataChecksum.Type.CRC32.size;
-  
-  private List<Integer> spillIndices = new ArrayList<Integer>();
-  private List<ShuffleHeader> shuffleHeaders = new ArrayList<ShuffleHeader>();
-
-  ShuffleHandler shuffleHandler = new ShuffleHandler(this);
-  
-  byte[] kvbuffer;
-  IntBuffer kvmeta;
-
-  @Override
-  public void initialize(TezOutputContext outputContext, Configuration conf, int numOutputs) throws IOException {
-    super.initialize(outputContext, conf, numOutputs);
-    shuffleHandler.initialize(outputContext, conf);
-  }
-
-  @Override
-  protected void spill(int mstart, int mend) 
-      throws IOException, InterruptedException {
-    // Start the shuffleHandler
-    shuffleHandler.start();
-
-    // Don't spill!
-    
-    // Make a copy
-    this.kvbuffer = super.kvbuffer;
-    this.kvmeta = super.kvmeta;
-
-    // Just save spill-indices for serving later
-    int spindex = mstart;
-    for (int i = 0; i < partitions; ++i) {
-      spillIndices.add(spindex);
-      
-      int length = 0;
-      while (spindex < mend &&
-          kvmeta.get(offsetFor(spindex) + PARTITION) == i) {
-
-        final int kvoff = offsetFor(spindex);
-        int keyLen = 
-            kvmeta.get(kvoff + VALSTART) - kvmeta.get(kvoff + KEYSTART);
-        int valLen = getInMemVBytesLength(kvoff);
-        length += 
-            (keyLen + WritableUtils.getVIntSize(keyLen)) + 
-            (valLen + WritableUtils.getVIntSize(valLen));
-
-        ++spindex;
-      }
-      length += IFILE_EOF_LENGTH;
-      
-      shuffleHeaders.add( 
-          new ShuffleHeader(
-              outputContext.getUniqueIdentifier(), // TODO Verify that this is correct. 
-              length + IFILE_CHECKSUM_LENGTH, length, i)
-          );
-      LOG.info("shuffleHeader[" + i + "]:" +
-      		" rawLen=" + length + " partLen=" + (length + IFILE_CHECKSUM_LENGTH) + 
-          " spillIndex=" + spillIndices.get(i));
-    }
-    
-    LOG.info("Saved " + spillIndices.size() + " spill-indices and " + 
-        shuffleHeaders.size() + " shuffle headers");
-  }
-
-  @Override
-  public InputStream getSortedStream(int partition) {
-    return new SortBufferInputStream(this, partition);
-  }
-
-  @Override
-  public void close() throws IOException {
-    // FIXME
-    //shuffleHandler.stop();
-  }
-
-  @Override
-  public ShuffleHeader getShuffleHeader(int reduce) {
-    return shuffleHeaders.get(reduce);
-  }
-
-  public int getSpillIndex(int partition) {
-    return spillIndices.get(partition);
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b212ca1d/tez-engine/src/main/java/org/apache/tez/engine/common/sort/impl/dflt/SortBufferInputStream.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/common/sort/impl/dflt/SortBufferInputStream.java b/tez-engine/src/main/java/org/apache/tez/engine/common/sort/impl/dflt/SortBufferInputStream.java
deleted file mode 100644
index d74e159..0000000
--- a/tez-engine/src/main/java/org/apache/tez/engine/common/sort/impl/dflt/SortBufferInputStream.java
+++ /dev/null
@@ -1,271 +0,0 @@
-/**
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements.  See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership.  The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License.  You may obtain a copy of the License at
-*
-*     http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-
-package org.apache.tez.engine.common.sort.impl.dflt;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.nio.IntBuffer;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.io.BoundedByteArrayOutputStream;
-import org.apache.hadoop.io.DataInputBuffer;
-import org.apache.hadoop.io.WritableUtils;
-import org.apache.tez.engine.common.shuffle.impl.InMemoryWriter;
-import org.apache.tez.engine.common.sort.impl.dflt.DefaultSorter.InMemValBytes;
-
-  public class SortBufferInputStream extends InputStream {
-
-  private static final Log LOG = LogFactory.getLog(SortBufferInputStream.class);
-  
-  private final InMemoryShuffleSorter sorter;
-  private InMemoryWriter sortOutput;
-  
-  private int mend;
-  private int recIndex;
-  private final byte[] kvbuffer;       
-  private final IntBuffer kvmeta;
-  private final int partitionBytes;
-  private final int partition;
-  
-  byte[] dualBuf = new byte[8192];
-  DualBufferOutputStream out;
-  private int readBytes = 0;
-  
-  public SortBufferInputStream(
-      InMemoryShuffleSorter sorter, int partition) {
-    this.sorter = sorter;
-    this.partitionBytes = 
-        (int)sorter.getShuffleHeader(partition).getCompressedLength();
-    this.partition = partition;
-    this.mend = sorter.getMetaEnd();
-    this.recIndex = sorter.getSpillIndex(partition);
-    this.kvbuffer = sorter.kvbuffer;
-    this.kvmeta = sorter.kvmeta;
-    out = new DualBufferOutputStream(null, 0, 0, dualBuf);
-    sortOutput = new InMemoryWriter(out);
-  }
-  
-  byte[] one = new byte[1];
-  
-  @Override
-  public int read() throws IOException {
-    int b = read(one, 0, 1);
-    return (b == -1) ? b : one[0]; 
-  }
-
-  @Override
-  public int read(byte[] b) throws IOException {
-    return read(b, 0, b.length);
-  }
-
-  @Override
-  public int read(byte[] b, int off, int len) throws IOException {
-    if (available() == 0) {
-      return -1;
-    }
-    
-    int currentOffset = off;
-    int currentLength = len;
-    int currentReadBytes = 0;
-    
-    // Check if there is residual data in the dualBuf
-    int residualLen = out.getCurrent();
-    if (residualLen > 0) {
-      int readable = Math.min(currentLength, residualLen);
-      System.arraycopy(dualBuf, 0, b, currentOffset, readable);
-      currentOffset += readable;
-      currentReadBytes += readable;
-      out.setCurrentPointer(-readable);
-      
-      // buffer has less capacity
-      currentLength -= readable;
-      
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("XXX read_residual:" +
-            " readable=" + readable +
-            " readBytes=" + readBytes);
-      }
-    }
-    
-    // Now, use the provided buffer
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("XXX read: out.reset" +
-          " b=" + b + 
-          " currentOffset=" + currentOffset + 
-          " currentLength=" + currentLength +
-          " recIndex=" + recIndex);
-    }
-    out.reset(b, currentOffset, currentLength);
-    
-    // Read from sort-buffer into the provided buffer, space permitting
-    DataInputBuffer key = new DataInputBuffer();
-    final InMemValBytes value = sorter.createInMemValBytes();
-    
-    int kvPartition = 0;
-    int numRec = 0;
-    for (;
-         currentLength > 0 && recIndex < mend && 
-             (kvPartition = getKVPartition(recIndex)) == partition;
-        ++recIndex) {
-      
-      final int kvoff = sorter.offsetFor(recIndex);
-      
-      int keyLen = 
-          (kvmeta.get(kvoff + InMemoryShuffleSorter.VALSTART) - 
-              kvmeta.get(kvoff + InMemoryShuffleSorter.KEYSTART));
-      key.reset(
-          kvbuffer, 
-          kvmeta.get(kvoff + InMemoryShuffleSorter.KEYSTART),
-          keyLen
-          );
-      
-      int valLen = sorter.getVBytesForOffset(kvoff, value);
-
-      int recLen = 
-          (keyLen + WritableUtils.getVIntSize(keyLen)) + 
-          (valLen + WritableUtils.getVIntSize(valLen));
-      
-      currentReadBytes += recLen;
-      currentOffset += recLen;
-      currentLength -= recLen;
-
-      // Write out key/value into the in-mem ifile
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("XXX read: sortOutput.append" +
-            " #rec=" + ++numRec +
-            " recIndex=" + recIndex + " kvoff=" + kvoff + 
-            " keyLen=" + keyLen + " valLen=" + valLen + " recLen=" + recLen +
-            " readBytes=" + readBytes +
-            " currentReadBytes="  + currentReadBytes +
-            " currentLength=" + currentLength);
-      }
-      sortOutput.append(key, value);
-    }
-
-    // If we are at the end of the segment, close the ifile
-    if (currentLength > 0 && 
-        (recIndex == mend || kvPartition != partition)) {
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("XXX About to call close:" +
-            " currentLength=" + currentLength + 
-            " recIndex=" + recIndex + " mend=" + mend + 
-            " kvPartition=" + kvPartition + " partitino=" + partition);
-      }
-      sortOutput.close();
-      currentReadBytes += 
-          (InMemoryShuffleSorter.IFILE_EOF_LENGTH + 
-              InMemoryShuffleSorter.IFILE_CHECKSUM_LENGTH);
-    } else {
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("XXX Hmm..." +
-            " currentLength=" + currentLength + 
-            " recIndex=" + recIndex + " mend=" + mend + 
-            " kvPartition=" + kvPartition + " partitino=" + partition);
-      }
-    }
-    
-    int retVal = Math.min(currentReadBytes, len);
-    readBytes += retVal;
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("XXX read: done" +
-          " retVal=" + retVal + 
-          " currentReadBytes=" + currentReadBytes +
-          " len=" + len + 
-          " readBytes=" + readBytes +
-          " partitionBytes=" + partitionBytes +
-          " residualBytes=" + out.getCurrent());
-    }
-    return retVal;
-  }
-
-  private int getKVPartition(int recIndex) {
-    return kvmeta.get(
-        sorter.offsetFor(recIndex) + InMemoryShuffleSorter.PARTITION);
-  }
-  
-  @Override
-  public int available() throws IOException {
-    return (partitionBytes - readBytes);
-  }
-
-  @Override
-  public void close() throws IOException {
-    super.close();
-  }
-
-  @Override
-  public boolean markSupported() {
-    return false;
-  }
-  
-  static class DualBufferOutputStream extends BoundedByteArrayOutputStream {
-
-    byte[] dualBuf;
-    int currentPointer = 0;
-    byte[] one = new byte[1];
-    
-    public DualBufferOutputStream(
-        byte[] buf, int offset, int length, 
-        byte[] altBuf) {
-      super(buf, offset, length);
-      this.dualBuf = altBuf;
-    }
-    
-    public void reset(byte[] b, int off, int len) {
-      super.resetBuffer(b, off, len);
-    }
-
-    @Override
-    public void write(int b) throws IOException {
-      one[0] = (byte)b;
-      write(one, 0, 1);
-    }
-
-    @Override
-    public void write(byte[] b) throws IOException {
-      write(b, 0, b.length);
-    }
-
-    @Override
-    public void write(byte[] b, int off, int len) throws IOException {
-      int available = super.available();
-      if (available >= len) {
-        super.write(b, off, len);
-      } else {
-        super.write(b, off, available);
-        System.arraycopy(b, off+available, dualBuf, currentPointer, len-available);
-        currentPointer += (len - available);
-      }
-    }
-    
-    int getCurrent() {
-      return currentPointer;
-    }
-    
-    void setCurrentPointer(int delta) {
-      if ((currentPointer + delta) > dualBuf.length) {
-        throw new IndexOutOfBoundsException("Trying to set dualBuf 'current'" +
-        		" marker to " + (currentPointer+delta) + " when " +
-        		" dualBuf.length is " + dualBuf.length);
-      }
-      currentPointer = (currentPointer + delta) % dualBuf.length;
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b212ca1d/tez-engine/src/main/java/org/apache/tez/engine/common/task/impl/ValuesIterator.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/common/task/impl/ValuesIterator.java b/tez-engine/src/main/java/org/apache/tez/engine/common/task/impl/ValuesIterator.java
deleted file mode 100644
index 841e54d..0000000
--- a/tez-engine/src/main/java/org/apache/tez/engine/common/task/impl/ValuesIterator.java
+++ /dev/null
@@ -1,149 +0,0 @@
-/**
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements.  See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership.  The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License.  You may obtain a copy of the License at
-*
-*     http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-
-package org.apache.tez.engine.common.task.impl;
-
-import java.io.IOException;
-import java.util.Iterator;
-import java.util.NoSuchElementException;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.io.DataInputBuffer;
-import org.apache.hadoop.io.RawComparator;
-import org.apache.hadoop.io.serializer.Deserializer;
-import org.apache.hadoop.io.serializer.SerializationFactory;
-import org.apache.hadoop.util.Progressable;
-import org.apache.tez.engine.common.sort.impl.TezRawKeyValueIterator;
-
-
-/**
- * Iterates values while keys match in sorted input.
- *
- * Usage: Call moveToNext to move to the next k, v pair. This returns true if another exists,
- * followed by getKey() and getValues() to get the current key and list of values.
- * 
- */
-public class ValuesIterator<KEY,VALUE> implements Iterator<VALUE> {
-  protected TezRawKeyValueIterator in; //input iterator
-  private KEY key;               // current key
-  private KEY nextKey;
-  private VALUE value;             // current value
-  private boolean hasNext;                      // more w/ this key
-  private boolean more;                         // more in file
-  private RawComparator<KEY> comparator;
-  protected Progressable reporter;
-  private Deserializer<KEY> keyDeserializer;
-  private Deserializer<VALUE> valDeserializer;
-  private DataInputBuffer keyIn = new DataInputBuffer();
-  private DataInputBuffer valueIn = new DataInputBuffer();
-  
-  public ValuesIterator (TezRawKeyValueIterator in, 
-                         RawComparator<KEY> comparator, 
-                         Class<KEY> keyClass,
-                         Class<VALUE> valClass, Configuration conf, 
-                         Progressable reporter)
-    throws IOException {
-    this.in = in;
-    this.comparator = comparator;
-    this.reporter = reporter;
-    SerializationFactory serializationFactory = new SerializationFactory(conf);
-    this.keyDeserializer = serializationFactory.getDeserializer(keyClass);
-    this.keyDeserializer.open(keyIn);
-    this.valDeserializer = serializationFactory.getDeserializer(valClass);
-    this.valDeserializer.open(this.valueIn);
-    readNextKey();
-    key = nextKey;
-    nextKey = null; // force new instance creation
-    hasNext = more;
-  }
-
-  TezRawKeyValueIterator getRawIterator() { return in; }
-  
-  /// Iterator methods
-
-  public boolean hasNext() { return hasNext; }
-
-  private int ctr = 0;
-  public VALUE next() {
-    if (!hasNext) {
-      throw new NoSuchElementException("iterate past last value");
-    }
-    try {
-      readNextValue();
-      readNextKey();
-    } catch (IOException ie) {
-      throw new RuntimeException("problem advancing post rec#"+ctr, ie);
-    }
-    reporter.progress();
-    return value;
-  }
-
-  public void remove() { throw new RuntimeException("not implemented"); }
-
-  /// Auxiliary methods
-
-  /** Start processing next unique key. */
-  public void nextKey() throws IOException {
-    // read until we find a new key
-    while (hasNext) { 
-      readNextKey();
-    }
-    ++ctr;
-    
-    // move the next key to the current one
-    KEY tmpKey = key;
-    key = nextKey;
-    nextKey = tmpKey;
-    hasNext = more;
-  }
-
-  /** True iff more keys remain. */
-  public boolean more() { 
-    return more; 
-  }
-
-  /** The current key. */
-  public KEY getKey() { 
-    return key; 
-  }
-
-  /** 
-   * read the next key 
-   */
-  private void readNextKey() throws IOException {
-    more = in.next();
-    if (more) {
-      DataInputBuffer nextKeyBytes = in.getKey();
-      keyIn.reset(nextKeyBytes.getData(), nextKeyBytes.getPosition(), nextKeyBytes.getLength());
-      nextKey = keyDeserializer.deserialize(nextKey);
-      hasNext = key != null && (comparator.compare(key, nextKey) == 0);
-    } else {
-      hasNext = false;
-    }
-  }
-
-  /**
-   * Read the next value
-   * @throws IOException
-   */
-  private void readNextValue() throws IOException {
-    DataInputBuffer nextValueBytes = in.getValue();
-    valueIn.reset(nextValueBytes.getData(), nextValueBytes.getPosition(), nextValueBytes.getLength());
-    value = valDeserializer.deserialize(value);
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b212ca1d/tez-engine/src/main/java/org/apache/tez/engine/common/task/local/output/TezLocalTaskOutputFiles.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/common/task/local/output/TezLocalTaskOutputFiles.java b/tez-engine/src/main/java/org/apache/tez/engine/common/task/local/output/TezLocalTaskOutputFiles.java
deleted file mode 100644
index 40e6b1a..0000000
--- a/tez-engine/src/main/java/org/apache/tez/engine/common/task/local/output/TezLocalTaskOutputFiles.java
+++ /dev/null
@@ -1,249 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tez.engine.common.task.local.output;
-
-import java.io.IOException;
-
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.classification.InterfaceStability;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.LocalDirAllocator;
-import org.apache.hadoop.fs.Path;
-import org.apache.tez.common.Constants;
-import org.apache.tez.common.TezJobConfig;
-import org.apache.tez.engine.common.InputAttemptIdentifier;
-
-/**
- * Manipulate the working area for the transient store for maps and reduces.
- *
- * This class is used by map and reduce tasks to identify the directories that
- * they need to write to/read from for intermediate files. The callers of
- * these methods are from the Child running the Task.
- */
-@InterfaceAudience.Private
-@InterfaceStability.Unstable
-public class TezLocalTaskOutputFiles extends TezTaskOutput {
-
-  public TezLocalTaskOutputFiles(Configuration conf, String uniqueId) {
-    super(conf, uniqueId);
-  }
-
-  private LocalDirAllocator lDirAlloc =
-    new LocalDirAllocator(TezJobConfig.LOCAL_DIRS);
-
-
-  /**
-   * Return the path to local map output file created earlier
-   *
-   * @return path
-   * @throws IOException
-   */
-  @Override
-  public Path getOutputFile()
-      throws IOException {
-    return lDirAlloc.getLocalPathToRead(Constants.TASK_OUTPUT_DIR + Path.SEPARATOR
-        + Constants.TEZ_ENGINE_TASK_OUTPUT_FILENAME_STRING, conf);
-  }
-
-  /**
-   * Create a local map output file name.
-   *
-   * @param size the size of the file
-   * @return path
-   * @throws IOException
-   */
-  @Override
-  public Path getOutputFileForWrite(long size)
-      throws IOException {
-    return lDirAlloc.getLocalPathForWrite(Constants.TASK_OUTPUT_DIR + Path.SEPARATOR
-        + Constants.TEZ_ENGINE_TASK_OUTPUT_FILENAME_STRING, size, conf);
-  }
-  
-  /**
-   * Create a local map output file name. This should *only* be used if the size
-   * of the file is not known. Otherwise use the equivalent which accepts a size
-   * parameter.
-   * 
-   * @return path
-   * @throws IOException
-   */
-  @Override
-  public Path getOutputFileForWrite() throws IOException {
-    return lDirAlloc.getLocalPathForWrite(Constants.TASK_OUTPUT_DIR
-        + Path.SEPARATOR + Constants.TEZ_ENGINE_TASK_OUTPUT_FILENAME_STRING,
-        conf);
-  }
-
-  /**
-   * Create a local map output file name on the same volume.
-   */
-  @Override
-  public Path getOutputFileForWriteInVolume(Path existing) {
-    return new Path(existing.getParent(), Constants.TEZ_ENGINE_TASK_OUTPUT_FILENAME_STRING);
-  }
-
-  /**
-   * Return the path to a local map output index file created earlier
-   *
-   * @return path
-   * @throws IOException
-   */
-  @Override
-  public Path getOutputIndexFile()
-      throws IOException {
-    return lDirAlloc.getLocalPathToRead(Constants.TASK_OUTPUT_DIR + Path.SEPARATOR
-        + Constants.TEZ_ENGINE_TASK_OUTPUT_FILENAME_STRING + Constants.TEZ_ENGINE_TASK_OUTPUT_INDEX_SUFFIX_STRING,
-        conf);
-  }
-
-  /**
-   * Create a local map output index file name.
-   *
-   * @param size the size of the file
-   * @return path
-   * @throws IOException
-   */
-  @Override
-  public Path getOutputIndexFileForWrite(long size)
-      throws IOException {
-    return lDirAlloc.getLocalPathForWrite(Constants.TASK_OUTPUT_DIR + Path.SEPARATOR
-        + Constants.TEZ_ENGINE_TASK_OUTPUT_FILENAME_STRING + Constants.TEZ_ENGINE_TASK_OUTPUT_INDEX_SUFFIX_STRING,
-        size, conf);
-  }
-
-  /**
-   * Create a local map output index file name on the same volume.
-   */
-  @Override
-  public Path getOutputIndexFileForWriteInVolume(Path existing) {
-    return new Path(existing.getParent(),
-        Constants.TEZ_ENGINE_TASK_OUTPUT_FILENAME_STRING + Constants.TEZ_ENGINE_TASK_OUTPUT_INDEX_SUFFIX_STRING);
-  }
-
-  /**
-   * Return a local map spill file created earlier.
-   *
-   * @param spillNumber the number
-   * @return path
-   * @throws IOException
-   */
-  @Override
-  public Path getSpillFile(int spillNumber)
-      throws IOException {
-    return lDirAlloc.getLocalPathToRead(Constants.TASK_OUTPUT_DIR + "/spill"
-        + spillNumber + ".out", conf);
-  }
-
-  /**
-   * Create a local map spill file name.
-   *
-   * @param spillNumber the number
-   * @param size the size of the file
-   * @return path
-   * @throws IOException
-   */
-  @Override
-  public Path getSpillFileForWrite(int spillNumber, long size)
-      throws IOException {
-    return lDirAlloc.getLocalPathForWrite(Constants.TASK_OUTPUT_DIR + "/spill"
-        + spillNumber + ".out", size, conf);
-  }
-
-  /**
-   * Return a local map spill index file created earlier
-   *
-   * @param spillNumber the number
-   * @return path
-   * @throws IOException
-   */
-  @Override
-  public Path getSpillIndexFile(int spillNumber)
-      throws IOException {
-    return lDirAlloc.getLocalPathToRead(Constants.TASK_OUTPUT_DIR + "/spill"
-        + spillNumber + ".out.index", conf);
-  }
-
-  /**
-   * Create a local map spill index file name.
-   *
-   * @param spillNumber the number
-   * @param size the size of the file
-   * @return path
-   * @throws IOException
-   */
-  @Override
-  public Path getSpillIndexFileForWrite(int spillNumber, long size)
-      throws IOException {
-    return lDirAlloc.getLocalPathForWrite(Constants.TASK_OUTPUT_DIR + "/spill"
-        + spillNumber + ".out.index", size, conf);
-  }
-
-  /**
-   * Return a local reduce input file created earlier
-   *
-   * @param mapId a map task id
-   * @return path
-   * @throws IOException
-   */
-  @Override
-  public Path getInputFile(InputAttemptIdentifier mapId)
-      throws IOException {
-    return lDirAlloc.getLocalPathToRead(String.format(
-        Constants.TEZ_ENGINE_TASK_INPUT_FILE_FORMAT_STRING, 
-        Constants.TASK_OUTPUT_DIR, Integer.valueOf(mapId.getInputIdentifier().getSrcTaskIndex())), conf);
-  }
-
-  /**
-   * Create a local reduce input file name.
-   *
-   * @param mapId a map task id
-   * @param size the size of the file
-   * @return path
-   * @throws IOException
-   */
-  @Override
-  public Path getInputFileForWrite(int taskId,
-                                   long size)
-      throws IOException {
-    return lDirAlloc.getLocalPathForWrite(String.format(
-        Constants.TEZ_ENGINE_TASK_INPUT_FILE_FORMAT_STRING, Constants.TASK_OUTPUT_DIR, taskId),
-        size, conf);
-  }
-
-  /** Removes all of the files related to a task. */
-  @Override
-  public void removeAll()
-      throws IOException {
-    deleteLocalFiles(Constants.TASK_OUTPUT_DIR);
-  }
-
-  private String[] getLocalDirs() throws IOException {
-    return conf.getStrings(TezJobConfig.LOCAL_DIRS);
-  }
-
-  @SuppressWarnings("deprecation")
-  private void deleteLocalFiles(String subdir) throws IOException {
-    String[] localDirs = getLocalDirs();
-    for (int i = 0; i < localDirs.length; i++) {
-      FileSystem.getLocal(conf).delete(new Path(localDirs[i], subdir));
-    }
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b212ca1d/tez-engine/src/main/java/org/apache/tez/engine/common/task/local/output/TezTaskOutput.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/common/task/local/output/TezTaskOutput.java b/tez-engine/src/main/java/org/apache/tez/engine/common/task/local/output/TezTaskOutput.java
deleted file mode 100644
index e1d83ad..0000000
--- a/tez-engine/src/main/java/org/apache/tez/engine/common/task/local/output/TezTaskOutput.java
+++ /dev/null
@@ -1,165 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tez.engine.common.task.local.output;
-
-import java.io.IOException;
-
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.classification.InterfaceStability;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-import org.apache.tez.engine.common.InputAttemptIdentifier;
-
-/**
- * Manipulate the working area for the transient store for maps and reduces.
- *
- * This class is used by map and reduce tasks to identify the directories that
- * they need to write to/read from for intermediate files. The callers of
- * these methods are from child space and see mapreduce.cluster.local.dir as
- * taskTracker/jobCache/jobId/attemptId
- * This class should not be used from TaskTracker space.
- */
-@InterfaceAudience.Private
-@InterfaceStability.Unstable
-public abstract class TezTaskOutput {
-
-  protected Configuration conf;
-  protected String uniqueId;
-
-  public TezTaskOutput(Configuration conf, String uniqueId) {
-    this.conf = conf;
-    this.uniqueId = uniqueId;
-  }
-
-  /**
-   * Return the path to local map output file created earlier
-   *
-   * @return path
-   * @throws IOException
-   */
-  public abstract Path getOutputFile() throws IOException;
-
-  /**
-   * Create a local map output file name.
-   *
-   * @param size the size of the file
-   * @return path
-   * @throws IOException
-   */
-  public abstract Path getOutputFileForWrite(long size) throws IOException;
-
-  /**
-   * Create a local output file name. This method is meant to be used *only* if
-   * the size of the file is not know up front.
-   * 
-   * @return path
-   * @throws IOException
-   */
-  public abstract Path getOutputFileForWrite() throws IOException;
-  
-  /**
-   * Create a local map output file name on the same volume.
-   */
-  public abstract Path getOutputFileForWriteInVolume(Path existing);
-
-  /**
-   * Return the path to a local map output index file created earlier
-   *
-   * @return path
-   * @throws IOException
-   */
-  public abstract Path getOutputIndexFile() throws IOException;
-
-  /**
-   * Create a local map output index file name.
-   *
-   * @param size the size of the file
-   * @return path
-   * @throws IOException
-   */
-  public abstract Path getOutputIndexFileForWrite(long size) throws IOException;
-
-  /**
-   * Create a local map output index file name on the same volume.
-   */
-  public abstract Path getOutputIndexFileForWriteInVolume(Path existing);
-
-  /**
-   * Return a local map spill file created earlier.
-   *
-   * @param spillNumber the number
-   * @return path
-   * @throws IOException
-   */
-  public abstract Path getSpillFile(int spillNumber) throws IOException;
-
-  /**
-   * Create a local map spill file name.
-   *
-   * @param spillNumber the number
-   * @param size the size of the file
-   * @return path
-   * @throws IOException
-   */
-  public abstract Path getSpillFileForWrite(int spillNumber, long size)
-      throws IOException;
-
-  /**
-   * Return a local map spill index file created earlier
-   *
-   * @param spillNumber the number
-   * @return path
-   * @throws IOException
-   */
-  public abstract Path getSpillIndexFile(int spillNumber) throws IOException;
-
-  /**
-   * Create a local map spill index file name.
-   *
-   * @param spillNumber the number
-   * @param size the size of the file
-   * @return path
-   * @throws IOException
-   */
-  public abstract Path getSpillIndexFileForWrite(int spillNumber, long size)
-      throws IOException;
-
-  /**
-   * Return a local reduce input file created earlier
-   *
-   * @param attemptIdentifier The identifier for the source task
-   * @return path
-   * @throws IOException
-   */
-  public abstract Path getInputFile(InputAttemptIdentifier attemptIdentifier) throws IOException;
-
-  /**
-   * Create a local reduce input file name.
-   *
-   * @param taskIdentifier The identifier for the source task
-   * @param size the size of the file
-   * @return path
-   * @throws IOException
-   */
-  public abstract Path getInputFileForWrite(
-      int taskIdentifier, long size) throws IOException;
-
-  /** Removes all of the files related to a task. */
-  public abstract void removeAll() throws IOException;
-}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b212ca1d/tez-engine/src/main/java/org/apache/tez/engine/common/task/local/output/TezTaskOutputFiles.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/common/task/local/output/TezTaskOutputFiles.java b/tez-engine/src/main/java/org/apache/tez/engine/common/task/local/output/TezTaskOutputFiles.java
deleted file mode 100644
index b8f051b..0000000
--- a/tez-engine/src/main/java/org/apache/tez/engine/common/task/local/output/TezTaskOutputFiles.java
+++ /dev/null
@@ -1,246 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tez.engine.common.task.local.output;
-
-import java.io.IOException;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.classification.InterfaceStability;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.LocalDirAllocator;
-import org.apache.hadoop.fs.Path;
-import org.apache.tez.common.Constants;
-import org.apache.tez.common.TezJobConfig;
-import org.apache.tez.engine.common.InputAttemptIdentifier;
-
-/**
- * Manipulate the working area for the transient store for maps and reduces.
- *
- * This class is used by map and reduce tasks to identify the directories that
- * they need to write to/read from for intermediate files. The callers of
- * these methods are from child space and see mapreduce.cluster.local.dir as
- * taskTracker/jobCache/jobId/attemptId
- * This class should not be used from TaskTracker space.
- */
-
-@InterfaceAudience.Private
-@InterfaceStability.Unstable
-public class TezTaskOutputFiles extends TezTaskOutput {
-  
-  public TezTaskOutputFiles(Configuration conf, String uniqueId) {
-    super(conf, uniqueId);
-  }
-
-  private static final Log LOG = LogFactory.getLog(TezTaskOutputFiles.class);
-
-  private static final String SPILL_FILE_PATTERN = "%s_spill_%d.out";
-  private static final String SPILL_INDEX_FILE_PATTERN = SPILL_FILE_PATTERN
-      + ".index";
-
-  
-
-  // assume configured to $localdir/usercache/$user/appcache/$appId
-  private LocalDirAllocator lDirAlloc =
-    new LocalDirAllocator(TezJobConfig.LOCAL_DIRS);
-  
-
-  private Path getAttemptOutputDir() {
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("getAttemptOutputDir: "
-          + Constants.TASK_OUTPUT_DIR + "/"
-          + uniqueId);
-    }
-    return new Path(Constants.TASK_OUTPUT_DIR, uniqueId);
-  }
-
-  /**
-   * Return the path to local map output file created earlier
-   *
-   * @return path
-   * @throws IOException
-   */
-  public Path getOutputFile() throws IOException {
-    Path attemptOutput =
-      new Path(getAttemptOutputDir(), Constants.TEZ_ENGINE_TASK_OUTPUT_FILENAME_STRING);
-    return lDirAlloc.getLocalPathToRead(attemptOutput.toString(), conf);
-  }
-
-  /**
-   * Create a local map output file name.
-   *
-   * @param size the size of the file
-   * @return path
-   * @throws IOException
-   */
-  public Path getOutputFileForWrite(long size) throws IOException {
-    Path attemptOutput =
-      new Path(getAttemptOutputDir(), Constants.TEZ_ENGINE_TASK_OUTPUT_FILENAME_STRING);
-    return lDirAlloc.getLocalPathForWrite(attemptOutput.toString(), size, conf);
-  }
-
-  /**
-   * Create a local map output file name. This should *only* be used if the size
-   * of the file is not known. Otherwise use the equivalent which accepts a size
-   * parameter.
-   * 
-   * @return path
-   * @throws IOException
-   */
-  public Path getOutputFileForWrite() throws IOException {
-    Path attemptOutput =
-      new Path(getAttemptOutputDir(), Constants.TEZ_ENGINE_TASK_OUTPUT_FILENAME_STRING);
-    return lDirAlloc.getLocalPathForWrite(attemptOutput.toString(), conf);
-  }
-
-  /**
-   * Create a local map output file name on the same volume.
-   */
-  public Path getOutputFileForWriteInVolume(Path existing) {
-    Path outputDir = new Path(existing.getParent(), Constants.TASK_OUTPUT_DIR);
-    Path attemptOutputDir = new Path(outputDir, uniqueId);
-    return new Path(attemptOutputDir, Constants.TEZ_ENGINE_TASK_OUTPUT_FILENAME_STRING);
-  }
-
-  /**
-   * Return the path to a local map output index file created earlier
-   *
-   * @return path
-   * @throws IOException
-   */
-  public Path getOutputIndexFile() throws IOException {
-    Path attemptIndexOutput =
-      new Path(getAttemptOutputDir(), Constants.TEZ_ENGINE_TASK_OUTPUT_FILENAME_STRING +
-                                      Constants.TEZ_ENGINE_TASK_OUTPUT_INDEX_SUFFIX_STRING);
-    return lDirAlloc.getLocalPathToRead(attemptIndexOutput.toString(), conf);
-  }
-
-  /**
-   * Create a local map output index file name.
-   *
-   * @param size the size of the file
-   * @return path
-   * @throws IOException
-   */
-  public Path getOutputIndexFileForWrite(long size) throws IOException {
-    Path attemptIndexOutput =
-      new Path(getAttemptOutputDir(), Constants.TEZ_ENGINE_TASK_OUTPUT_FILENAME_STRING +
-                                      Constants.TEZ_ENGINE_TASK_OUTPUT_INDEX_SUFFIX_STRING);
-    return lDirAlloc.getLocalPathForWrite(attemptIndexOutput.toString(),
-        size, conf);
-  }
-
-  /**
-   * Create a local map output index file name on the same volume.
-   */
-  public Path getOutputIndexFileForWriteInVolume(Path existing) {
-    Path outputDir = new Path(existing.getParent(), Constants.TASK_OUTPUT_DIR);
-    Path attemptOutputDir = new Path(outputDir, uniqueId);
-    return new Path(attemptOutputDir, Constants.TEZ_ENGINE_TASK_OUTPUT_FILENAME_STRING +
-                                      Constants.TEZ_ENGINE_TASK_OUTPUT_INDEX_SUFFIX_STRING);
-  }
-
-  /**
-   * Return a local map spill file created earlier.
-   *
-   * @param spillNumber the number
-   * @return path
-   * @throws IOException
-   */
-  public Path getSpillFile(int spillNumber) throws IOException {
-    return lDirAlloc.getLocalPathToRead(
-        String.format(SPILL_FILE_PATTERN,
-            uniqueId, spillNumber), conf);
-  }
-
-  /**
-   * Create a local map spill file name.
-   *
-   * @param spillNumber the number
-   * @param size the size of the file
-   * @return path
-   * @throws IOException
-   */
-  public Path getSpillFileForWrite(int spillNumber, long size)
-      throws IOException {
-    return lDirAlloc.getLocalPathForWrite(
-        String.format(String.format(SPILL_FILE_PATTERN,
-            uniqueId, spillNumber)), size, conf);
-  }
-
-  /**
-   * Return a local map spill index file created earlier
-   *
-   * @param spillNumber the number
-   * @return path
-   * @throws IOException
-   */
-  public Path getSpillIndexFile(int spillNumber) throws IOException {
-    return lDirAlloc.getLocalPathToRead(
-        String.format(SPILL_INDEX_FILE_PATTERN,
-            uniqueId, spillNumber), conf);
-  }
-
-  /**
-   * Create a local map spill index file name.
-   *
-   * @param spillNumber the number
-   * @param size the size of the file
-   * @return path
-   * @throws IOException
-   */
-  public Path getSpillIndexFileForWrite(int spillNumber, long size)
-      throws IOException {
-    return lDirAlloc.getLocalPathForWrite(
-        String.format(SPILL_INDEX_FILE_PATTERN,
-            uniqueId, spillNumber), size, conf);
-  }
-
-  /**
-   * Return a local reduce input file created earlier
-   *
-   * @param attemptIdentifier an identifier for a task. The attempt information is ignored.
-   * @return path
-   * @throws IOException
-   */
-  public Path getInputFile(InputAttemptIdentifier attemptIdentifier) throws IOException {
-    throw new UnsupportedOperationException("Incompatible with LocalRunner");
-  }
-
-  /**
-   * Create a local reduce input file name.
-   *
-   * @param attemptIdentifier an identifier for a task. The attempt information is ignored.
-   * @param size the size of the file
-   * @return path
-   * @throws IOException
-   */
-  public Path getInputFileForWrite(int srcTaskId,
-      long size) throws IOException {
-    return lDirAlloc.getLocalPathForWrite(String.format(
-        uniqueId, getAttemptOutputDir().toString(), srcTaskId),
-        size, conf);
-  }
-
-  /** Removes all of the files related to a task. */
-  public void removeAll() throws IOException {
-    throw new UnsupportedOperationException("Incompatible with LocalRunner");
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b212ca1d/tez-engine/src/main/java/org/apache/tez/engine/hadoop/compat/NullProgressable.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/hadoop/compat/NullProgressable.java b/tez-engine/src/main/java/org/apache/tez/engine/hadoop/compat/NullProgressable.java
deleted file mode 100644
index 5071dd2..0000000
--- a/tez-engine/src/main/java/org/apache/tez/engine/hadoop/compat/NullProgressable.java
+++ /dev/null
@@ -1,33 +0,0 @@
-/**
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements.  See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership.  The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License.  You may obtain a copy of the License at
-*
-*     http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-
-package org.apache.tez.engine.hadoop.compat;
-
-import org.apache.hadoop.util.Progressable;
-
-public class NullProgressable implements Progressable {
-
-  public NullProgressable() {
-    // TODO Auto-generated constructor stub
-  }
-
-  @Override
-  public void progress() {
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b212ca1d/tez-engine/src/main/java/org/apache/tez/engine/lib/input/LocalMergedInput.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/lib/input/LocalMergedInput.java b/tez-engine/src/main/java/org/apache/tez/engine/lib/input/LocalMergedInput.java
deleted file mode 100644
index 6371787..0000000
--- a/tez-engine/src/main/java/org/apache/tez/engine/lib/input/LocalMergedInput.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.tez.engine.lib.input;
-
-import java.io.IOException;
-import java.util.Collections;
-import java.util.List;
-
-import org.apache.tez.common.TezUtils;
-import org.apache.tez.engine.api.Event;
-import org.apache.tez.engine.api.LogicalInput;
-import org.apache.tez.engine.api.TezInputContext;
-import org.apache.tez.engine.common.localshuffle.LocalShuffle;
-
-/**
- * <code>LocalMergedInput</code> in an {@link LogicalInput} which shuffles intermediate
- * sorted data, merges them and provides key/<values> to the consumer. 
- */
-public class LocalMergedInput extends ShuffledMergedInputLegacy {
-
-  @Override
-  public List<Event> initialize(TezInputContext inputContext) throws IOException {
-    this.inputContext = inputContext;
-    this.conf = TezUtils.createConfFromUserPayload(inputContext.getUserPayload());
-
-    LocalShuffle localShuffle = new LocalShuffle(inputContext, conf, numInputs);
-    rawIter = localShuffle.run();
-    createValuesIterator();
-    return Collections.emptyList();
-  }
-
-  @Override
-  public List<Event> close() throws IOException {
-    rawIter.close();
-    return Collections.emptyList();
-  }
-}