You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by ss...@apache.org on 2013/09/25 09:31:49 UTC

[42/50] [abbrv] Rename tez-engine-api to tez-runtime-api and tez-engine is split into 2: - tez-engine-library for user-visible Input/Output/Processor implementations - tez-engine-internals for framework internals

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b212ca1d/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/MapOutput.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/MapOutput.java b/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/MapOutput.java
deleted file mode 100644
index b2a0b54..0000000
--- a/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/MapOutput.java
+++ /dev/null
@@ -1,227 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.tez.engine.common.shuffle.impl;
-
-import java.io.IOException;
-import java.io.OutputStream;
-import java.util.Comparator;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.LocalDirAllocator;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.BoundedByteArrayOutputStream;
-import org.apache.tez.engine.common.InputAttemptIdentifier;
-import org.apache.tez.engine.common.task.local.output.TezTaskOutputFiles;
-
-
-/**
- * Destination of one shuffled input: an in-memory buffer, a file on the local
- * file system, or a {@link Type#WAIT} placeholder that carries no storage.
- *
- * <p>Every instance draws a unique id from a shared counter. Equality, hashing
- * and the tie-break in {@link MapOutputComparator} are all based on that id.
- */
-class MapOutput {
-  private static final Log LOG = LogFactory.getLog(MapOutput.class);
-  // Source of per-instance ids. It is never reassigned, hence final.
-  private static final AtomicInteger ID = new AtomicInteger(0);
-  
-  /** Kind of storage backing a {@link MapOutput}. */
-  public static enum Type {
-    WAIT,
-    MEMORY,
-    DISK
-  }
-  
-  private InputAttemptIdentifier attemptIdentifier;
-  private final int id;
-  
-  private final MergeManager merger;
-  
-  private final long size;
-  
-  // Set only for MEMORY outputs; null otherwise.
-  private final byte[] memory;
-  private BoundedByteArrayOutputStream byteStream;
-  
-  // Set only for DISK outputs; null otherwise.
-  private final FileSystem localFS;
-  private final Path tmpOutputPath;
-  private final Path outputPath;
-  private final OutputStream disk; 
-  
-  private final Type type;
-  
-  private final boolean primaryMapOutput;
-  
-  /**
-   * Creates a {@link Type#DISK} output. Data is written to a temporary path
-   * (the final output path suffixed with the fetcher number) and moved to the
-   * final path by {@link #commit()}.
-   *
-   * @throws IOException if the local file system or the temporary file cannot
-   *         be obtained/created
-   */
-  MapOutput(InputAttemptIdentifier attemptIdentifier, MergeManager merger, long size, 
-            Configuration conf, LocalDirAllocator localDirAllocator,
-            int fetcher, boolean primaryMapOutput, 
-            TezTaskOutputFiles mapOutputFile)
-         throws IOException {
-    this.id = ID.incrementAndGet();
-    this.attemptIdentifier = attemptIdentifier;
-    this.merger = merger;
-
-    type = Type.DISK;
-
-    memory = null;
-    byteStream = null;
-
-    this.size = size;
-    
-    this.localFS = FileSystem.getLocal(conf);
-    outputPath =
-      mapOutputFile.getInputFileForWrite(this.attemptIdentifier.getInputIdentifier().getSrcTaskIndex(), size);
-    tmpOutputPath = outputPath.suffix(String.valueOf(fetcher));
-
-    disk = localFS.create(tmpOutputPath);
-    
-    this.primaryMapOutput = primaryMapOutput;
-  }
-  
-  /**
-   * Creates a {@link Type#MEMORY} output backed by a bounded byte array of
-   * {@code size} bytes.
-   */
-  MapOutput(InputAttemptIdentifier attemptIdentifier, MergeManager merger, int size, 
-            boolean primaryMapOutput) {
-    this.id = ID.incrementAndGet();
-    this.attemptIdentifier = attemptIdentifier;
-    this.merger = merger;
-
-    type = Type.MEMORY;
-    byteStream = new BoundedByteArrayOutputStream(size);
-    memory = byteStream.getBuffer();
-
-    this.size = size;
-    
-    localFS = null;
-    disk = null;
-    outputPath = null;
-    tmpOutputPath = null;
-    
-    this.primaryMapOutput = primaryMapOutput;
-  }
-
-  /**
-   * Creates a {@link Type#WAIT} placeholder: no merger, no storage, size -1.
-   * Such an instance can be neither committed nor aborted.
-   */
-  public MapOutput(InputAttemptIdentifier attemptIdentifier) {
-    this.id = ID.incrementAndGet();
-    this.attemptIdentifier = attemptIdentifier;
-
-    type = Type.WAIT;
-    merger = null;
-    memory = null;
-    byteStream = null;
-    
-    size = -1;
-    
-    localFS = null;
-    disk = null;
-    outputPath = null;
-    tmpOutputPath = null;
-
-    this.primaryMapOutput = false;
-  }
-  
-  public boolean isPrimaryMapOutput() {
-    return primaryMapOutput;
-  }
-
-  /** Two outputs are equal only if they carry the same instance id. */
-  @Override
-  public boolean equals(Object obj) {
-    if (obj instanceof MapOutput) {
-      return id == ((MapOutput)obj).id;
-    }
-    return false;
-  }
-
-  @Override
-  public int hashCode() {
-    return id;
-  }
-
-  /** @return the final on-disk path, or null unless this is a DISK output */
-  public Path getOutputPath() {
-    return outputPath;
-  }
-
-  /** @return the backing byte array, or null unless this is a MEMORY output */
-  public byte[] getMemory() {
-    return memory;
-  }
-
-  /** @return the bounded stream over the buffer, or null unless MEMORY */
-  public BoundedByteArrayOutputStream getArrayStream() {
-    return byteStream;
-  }
-  
-  /** @return the stream to the temporary file, or null unless DISK */
-  public OutputStream getDisk() {
-    return disk;
-  }
-
-  public InputAttemptIdentifier getAttemptIdentifier() {
-    return this.attemptIdentifier;
-  }
-
-  public Type getType() {
-    return type;
-  }
-
-  public long getSize() {
-    return size;
-  }
-
-  /**
-   * Hands the finished output to the merger. A DISK output is first renamed
-   * from its temporary path to its final path.
-   *
-   * @throws IOException if this is a WAIT output, or if the file system
-   *         rename fails with an exception
-   */
-  public void commit() throws IOException {
-    if (type == Type.MEMORY) {
-      merger.closeInMemoryFile(this);
-    } else if (type == Type.DISK) {
-      localFS.rename(tmpOutputPath, outputPath);
-      merger.closeOnDiskFile(outputPath);
-    } else {
-      throw new IOException("Cannot commit MapOutput of type WAIT!");
-    }
-  }
-  
-  /**
-   * Discards the output: a MEMORY output returns its buffer size to the
-   * merger, a DISK output deletes its temporary file (best effort; a failure
-   * is only logged).
-   *
-   * @throws IllegalArgumentException if this is a WAIT output
-   */
-  public void abort() {
-    if (type == Type.MEMORY) {
-      merger.unreserve(memory.length);
-    } else if (type == Type.DISK) {
-      try {
-        localFS.delete(tmpOutputPath, false);
-      } catch (IOException ie) {
-        LOG.info("failure to clean up " + tmpOutputPath, ie);
-      }
-    } else {
-      // The message previously said "commit", copied from commit().
-      throw new IllegalArgumentException
-                   ("Cannot abort MapOutput of type WAIT!");
-    }
-  }
-  
-  @Override
-  public String toString() {
-    return "MapOutput( AttemptIdentifier: " + attemptIdentifier + ", Type: " + type + ")";
-  }
-  
-  /**
-   * Orders outputs by ascending size, using the instance id to break ties so
-   * that only the very same output compares as 0.
-   */
-  public static class MapOutputComparator 
-  implements Comparator<MapOutput> {
-    @Override
-    public int compare(MapOutput o1, MapOutput o2) {
-      if (o1.id == o2.id) { 
-        return 0;
-      }
-      
-      if (o1.size < o2.size) {
-        return -1;
-      } else if (o1.size > o2.size) {
-        return 1;
-      }
-      
-      // Same size, different instance: fall back to creation order.
-      if (o1.id < o2.id) {
-        return -1;
-      } else {
-        return 1;
-      }
-    }
-  }
-  
-}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b212ca1d/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/MergeManager.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/MergeManager.java b/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/MergeManager.java
deleted file mode 100644
index b8792fb..0000000
--- a/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/MergeManager.java
+++ /dev/null
@@ -1,782 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.tez.engine.common.shuffle.impl;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.List;
-import java.util.Set;
-import java.util.TreeSet;
-
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.classification.InterfaceStability;
-import org.apache.hadoop.classification.InterfaceAudience.Private;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.ChecksumFileSystem;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.LocalDirAllocator;
-import org.apache.hadoop.fs.LocalFileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.DataInputBuffer;
-import org.apache.hadoop.io.RawComparator;
-import org.apache.hadoop.io.compress.CompressionCodec;
-import org.apache.hadoop.io.compress.DefaultCodec;
-import org.apache.hadoop.util.Progressable;
-import org.apache.hadoop.util.ReflectionUtils;
-import org.apache.tez.common.Constants;
-import org.apache.tez.common.TezJobConfig;
-import org.apache.tez.common.counters.TezCounter;
-import org.apache.tez.engine.api.TezInputContext;
-import org.apache.tez.engine.common.ConfigUtils;
-import org.apache.tez.engine.common.InputAttemptIdentifier;
-import org.apache.tez.engine.common.combine.Combiner;
-import org.apache.tez.engine.common.sort.impl.IFile;
-import org.apache.tez.engine.common.sort.impl.TezMerger;
-import org.apache.tez.engine.common.sort.impl.TezRawKeyValueIterator;
-import org.apache.tez.engine.common.sort.impl.IFile.Writer;
-import org.apache.tez.engine.common.sort.impl.TezMerger.Segment;
-import org.apache.tez.engine.common.task.local.output.TezTaskOutputFiles;
-import org.apache.tez.engine.hadoop.compat.NullProgressable;
-
-@InterfaceAudience.Private
-@InterfaceStability.Unstable
-@SuppressWarnings(value={"rawtypes"})
-public class MergeManager {
-  
-  private static final Log LOG = LogFactory.getLog(MergeManager.class);
-
-  private final Configuration conf;
-  private final FileSystem localFS;
-  // Raw file system unwrapped from localFS (cast to LocalFileSystem) in the constructor.
-  private final FileSystem rfs;
-  private final LocalDirAllocator localDirAllocator;
-  
-  private final  TezTaskOutputFiles mapOutputFile;
-  private final Progressable nullProgressable = new NullProgressable();
-  // May be null; the in-memory merger then writes the merged stream directly.
-  private final Combiner combiner;  
-  
-  // Outputs produced by the memory-to-memory merger (see closeInMemoryMergedFile).
-  Set<MapOutput> inMemoryMergedMapOutputs = 
-    new TreeSet<MapOutput>(new MapOutput.MapOutputComparator());
-  // Null when memory-to-memory merging is disabled in the configuration.
-  private final IntermediateMemoryToMemoryMerger memToMemMerger;
-
-  // Committed in-memory outputs awaiting a merge (see closeInMemoryFile).
-  Set<MapOutput> inMemoryMapOutputs = 
-    new TreeSet<MapOutput>(new MapOutput.MapOutputComparator());
-  private final InMemoryMerger inMemoryMerger;
-  
-  // Files registered through closeOnDiskFile.
-  Set<Path> onDiskMapOutputs = new TreeSet<Path>();
-  private final OnDiskMerger onDiskMerger;
-  
-  // Task memory (or JVM max memory capped at Integer.MAX_VALUE) times the
-  // shuffle input buffer percentage.
-  private final long memoryLimit;
-  // Bytes handed out by unconditionalReserve and not yet given back by unreserve.
-  private long usedMemory;
-  // Bytes of in-memory outputs added by closeInMemoryFile and not yet unreserved.
-  private long commitMemory;
-  // Requests of this size or larger are shuffled to disk instead of memory.
-  private final long maxSingleShuffleLimit;
-  
-  // Number of in-memory outputs that triggers a memory-to-memory merge.
-  private final int memToMemMergeOutputsThreshold; 
-  // commitMemory level at which closeInMemoryFile starts an in-memory merge.
-  private final long mergeThreshold;
-  
-  private final int ioSortFactor;
-
-  private final ExceptionReporter exceptionReporter;
-  
-  private final TezInputContext inputContext;
-
-  private final TezCounter spilledRecordsCounter;
-
-  private final TezCounter reduceCombineInputCounter;
-
-  private final TezCounter mergedMapOutputsCounter;
-  
-  // Null when intermediate input is not compressed.
-  private final CompressionCodec codec;
-  
-  // Set to true at the end of close(), after finalMerge has returned.
-  private volatile boolean finalMergeComplete = false;
-
-  /**
-   * Reads the shuffle/merge settings from {@code conf}, derives the memory
-   * limits and thresholds, and starts the merger threads (the
-   * memory-to-memory merger only if enabled in the configuration).
-   *
-   * @param localFS must be a {@code LocalFileSystem}; its raw file system is
-   *        used for the merge files
-   * @param combiner may be null, in which case in-memory merges are written
-   *        without combining
-   * @throws IllegalArgumentException if a configured percentage is out of range
-   * @throws RuntimeException if maxSingleShuffleLimit is not less than
-   *         mergeThreshold
-   */
-  public MergeManager(Configuration conf, 
-                      FileSystem localFS,
-                      LocalDirAllocator localDirAllocator,  
-                      TezInputContext inputContext,
-                      Combiner combiner,
-                      TezCounter spilledRecordsCounter,
-                      TezCounter reduceCombineInputCounter,
-                      TezCounter mergedMapOutputsCounter,
-                      ExceptionReporter exceptionReporter) {
-    this.inputContext = inputContext;
-    this.conf = conf;
-    this.localDirAllocator = localDirAllocator;
-    this.exceptionReporter = exceptionReporter;
-    
-    this.combiner = combiner;
-
-    this.reduceCombineInputCounter = reduceCombineInputCounter;
-    this.spilledRecordsCounter = spilledRecordsCounter;
-    this.mergedMapOutputsCounter = mergedMapOutputsCounter;
-    this.mapOutputFile = new TezTaskOutputFiles(conf, inputContext.getUniqueIdentifier());
-    
-    this.localFS = localFS;
-    this.rfs = ((LocalFileSystem)localFS).getRaw();
-
-    if (ConfigUtils.isIntermediateInputCompressed(conf)) {
-      Class<? extends CompressionCodec> codecClass =
-          ConfigUtils.getIntermediateInputCompressorClass(conf, DefaultCodec.class);
-      codec = ReflectionUtils.newInstance(codecClass, conf);
-    } else {
-      codec = null;
-    }
-
-    final float maxInMemCopyUse =
-      conf.getFloat(
-          TezJobConfig.TEZ_ENGINE_SHUFFLE_INPUT_BUFFER_PERCENT, 
-          TezJobConfig.DEFAULT_TEZ_ENGINE_SHUFFLE_INPUT_BUFFER_PERCENT);
-    if (maxInMemCopyUse > 1.0 || maxInMemCopyUse < 0.0) {
-      throw new IllegalArgumentException("Invalid value for " +
-          TezJobConfig.TEZ_ENGINE_SHUFFLE_INPUT_BUFFER_PERCENT + ": " +
-          maxInMemCopyUse);
-    }
-
-    // Allow unit tests to fix Runtime memory
-    this.memoryLimit = 
-      (long)(conf.getLong(Constants.TEZ_ENGINE_TASK_MEMORY,
-          Math.min(Runtime.getRuntime().maxMemory(), Integer.MAX_VALUE))
-        * maxInMemCopyUse);
- 
-    this.ioSortFactor = 
-        conf.getInt(
-            TezJobConfig.TEZ_ENGINE_IO_SORT_FACTOR, 
-            TezJobConfig.DEFAULT_TEZ_ENGINE_IO_SORT_FACTOR);
-
-    final float singleShuffleMemoryLimitPercent =
-        conf.getFloat(
-            TezJobConfig.TEZ_ENGINE_SHUFFLE_MEMORY_LIMIT_PERCENT,
-            TezJobConfig.DEFAULT_TEZ_ENGINE_SHUFFLE_MEMORY_LIMIT_PERCENT);
-    if (singleShuffleMemoryLimitPercent <= 0.0f
-        || singleShuffleMemoryLimitPercent > 1.0f) {
-      throw new IllegalArgumentException("Invalid value for "
-          + TezJobConfig.TEZ_ENGINE_SHUFFLE_MEMORY_LIMIT_PERCENT + ": "
-          + singleShuffleMemoryLimitPercent);
-    }
-
-    this.maxSingleShuffleLimit = 
-      (long)(memoryLimit * singleShuffleMemoryLimitPercent);
-    // The memory-to-memory segment threshold defaults to the sort factor.
-    this.memToMemMergeOutputsThreshold = 
-            conf.getInt(
-                TezJobConfig.TEZ_ENGINE_SHUFFLE_MEMTOMEM_SEGMENTS, 
-                ioSortFactor);
-    this.mergeThreshold = 
-        (long)(this.memoryLimit * 
-               conf.getFloat(
-                   TezJobConfig.TEZ_ENGINE_SHUFFLE_MERGE_PERCENT, 
-                   TezJobConfig.DEFAULT_TEZ_ENGINE_SHUFFLE_MERGE_PERCENT));
-    LOG.info("MergerManager: memoryLimit=" + memoryLimit + ", " +
-             "maxSingleShuffleLimit=" + maxSingleShuffleLimit + ", " +
-             "mergeThreshold=" + mergeThreshold + ", " + 
-             "ioSortFactor=" + ioSortFactor + ", " +
-             "memToMemMergeOutputsThreshold=" + memToMemMergeOutputsThreshold);
-
-    if (this.maxSingleShuffleLimit >= this.mergeThreshold) {
-      // Message fixed: typo removed and separators added so that the two
-      // values no longer run into the surrounding text.
-      throw new RuntimeException("Invalid configuration: "
-          + "maxSingleShuffleLimit should be less than mergeThreshold. "
-          + "maxSingleShuffleLimit: " + this.maxSingleShuffleLimit
-          + ", mergeThreshold: " + this.mergeThreshold);
-    }
-
-    boolean allowMemToMemMerge = 
-      conf.getBoolean(
-          TezJobConfig.TEZ_ENGINE_SHUFFLE_ENABLE_MEMTOMEM, 
-          TezJobConfig.DEFAULT_TEZ_ENGINE_SHUFFLE_ENABLE_MEMTOMEM);
-    if (allowMemToMemMerge) {
-      this.memToMemMerger = 
-        new IntermediateMemoryToMemoryMerger(this,
-                                             memToMemMergeOutputsThreshold);
-      this.memToMemMerger.start();
-    } else {
-      this.memToMemMerger = null;
-    }
-    
-    this.inMemoryMerger = new InMemoryMerger(this);
-    this.inMemoryMerger.start();
-    
-    this.onDiskMerger = new OnDiskMerger(this);
-    this.onDiskMerger.start();
-  }
-
-  /**
-   * Delegates to the in-memory merger's {@code waitForMerge()}.
-   *
-   * @throws InterruptedException if the delegate call is interrupted
-   */
-  public void waitForInMemoryMerge() throws InterruptedException {
-    inMemoryMerger.waitForMerge();
-  }
-  
-  /**
-   * @return true if {@code requestedSize} is strictly below
-   *         maxSingleShuffleLimit, i.e. the output may be kept in memory
-   */
-  private boolean canShuffleToMemory(long requestedSize) {
-    return (requestedSize < maxSingleShuffleLimit); 
-  }
-
-  // Shared WAIT-type placeholder (no attempt identifier) that reserve()
-  // returns when the caller has to stall because memory is over the limit.
-  final private MapOutput stallShuffle = new MapOutput(null);
-
-  /**
-   * Picks the destination for a map output of {@code requestedSize} bytes.
-   *
-   * @param srcAttemptIdentifier the input attempt being fetched
-   * @param requestedSize size of the output to be fetched
-   * @param fetcher fetcher number, used to suffix the temporary disk file
-   * @return a DISK output if the request is not below maxSingleShuffleLimit;
-   *         the shared WAIT placeholder if used memory already exceeds the
-   *         memory limit; otherwise a freshly reserved MEMORY output
-   * @throws IOException if the on-disk output cannot be created
-   */
-  public synchronized MapOutput reserve(InputAttemptIdentifier srcAttemptIdentifier, 
-                                             long requestedSize,
-                                             int fetcher
-                                             ) throws IOException {
-    if (!canShuffleToMemory(requestedSize)) {
-      LOG.info(srcAttemptIdentifier + ": Shuffling to disk since " + requestedSize + 
-               " is greater than maxSingleShuffleLimit (" + 
-               maxSingleShuffleLimit + ")");
-      return new MapOutput(srcAttemptIdentifier, this, requestedSize, conf, 
-                                localDirAllocator, fetcher, true,
-                                mapOutputFile);
-    }
-    
-    // Stall shuffle if we are above the memory limit
-
-    // It is possible that all threads could just be stalling and not make
-    // progress at all. This could happen when:
-    //
-    // requested size is causing the used memory to go above limit &&
-    // requested size < singleShuffleLimit &&
-    // current used size < mergeThreshold (merge will not get triggered)
-    //
-    // To avoid this from happening, we allow exactly one thread to go past
-    // the memory limit. We check (usedMemory > memoryLimit) and not
-    // (usedMemory + requestedSize > memoryLimit). When this thread is done
-    // fetching, this will automatically trigger a merge thereby unlocking
-    // all the stalled threads
-    
-    if (usedMemory > memoryLimit) {
-      // Guarded so the message is only built when debug logging is enabled.
-      if (LOG.isDebugEnabled()) {
-        LOG.debug(srcAttemptIdentifier + ": Stalling shuffle since usedMemory (" + usedMemory
-            + ") is greater than memoryLimit (" + memoryLimit + ")." + 
-            " CommitMemory is (" + commitMemory + ")"); 
-      }
-      return stallShuffle;
-    }
-    
-    // Allow the in-memory shuffle to progress
-    if (LOG.isDebugEnabled()) {
-      LOG.debug(srcAttemptIdentifier + ": Proceeding with shuffle since usedMemory ("
-          + usedMemory + ") is lesser than memoryLimit (" + memoryLimit + ")."
-          + "CommitMemory is (" + commitMemory + ")"); 
-    }
-    return unconditionalReserve(srcAttemptIdentifier, requestedSize, true);
-  }
-  
-  /**
-   * Unconditional Reserve is used by the Memory-to-Memory thread
-   * @return
-   */
-  private synchronized MapOutput unconditionalReserve(
-      InputAttemptIdentifier srcAttemptIdentifier, long requestedSize, boolean primaryMapOutput) {
-    usedMemory += requestedSize;
-    return new MapOutput(srcAttemptIdentifier, this, (int)requestedSize, 
-        primaryMapOutput);
-  }
-  
-  /**
-   * Gives {@code size} bytes back: both the used-memory and the
-   * committed-memory totals are reduced by that amount.
-   */
-  synchronized void unreserve(long size) {
-    usedMemory = usedMemory - size;
-    commitMemory = commitMemory - size;
-  }
-
-  /**
-   * Registers a completed in-memory output, adds its size to commitMemory,
-   * and starts an in-memory merge and/or a memory-to-memory merge when the
-   * respective threshold is reached and that merger is not already running.
-   */
-  public synchronized void closeInMemoryFile(MapOutput mapOutput) { 
-    inMemoryMapOutputs.add(mapOutput);
-    // Note: commitMemory is logged here before this output's size is added.
-    LOG.info("closeInMemoryFile -> map-output of size: " + mapOutput.getSize()
-        + ", inMemoryMapOutputs.size() -> " + inMemoryMapOutputs.size()
-        + ", commitMemory -> " + commitMemory + ", usedMemory ->" + usedMemory);
-
-    commitMemory+= mapOutput.getSize();
-
-    synchronized (inMemoryMerger) {
-      // Can hang if mergeThreshold is really low.
-      if (!inMemoryMerger.isInProgress() && commitMemory >= mergeThreshold) {
-        LOG.info("Starting inMemoryMerger's merge since commitMemory=" +
-            commitMemory + " > mergeThreshold=" + mergeThreshold + 
-            ". Current usedMemory=" + usedMemory);
-        // Outputs of earlier memory-to-memory merges join this merge too.
-        inMemoryMapOutputs.addAll(inMemoryMergedMapOutputs);
-        inMemoryMergedMapOutputs.clear();
-        inMemoryMerger.startMerge(inMemoryMapOutputs);
-      } 
-    }
-    
-    // memToMemMerger is null when memory-to-memory merging is disabled.
-    if (memToMemMerger != null) {
-      synchronized (memToMemMerger) {
-        if (!memToMemMerger.isInProgress() && 
-            inMemoryMapOutputs.size() >= memToMemMergeOutputsThreshold) {
-          memToMemMerger.startMerge(inMemoryMapOutputs);
-        }
-      }
-    }
-  }
-  
-  
-  /**
-   * Records the result of a memory-to-memory merge in the set of merged
-   * in-memory outputs and logs the new set size.
-   */
-  public synchronized void closeInMemoryMergedFile(MapOutput mapOutput) {
-    inMemoryMergedMapOutputs.add(mapOutput);
-    final String message = "closeInMemoryMergedFile -> size: " + mapOutput.getSize()
-        + ", inMemoryMergedMapOutputs.size() -> " + inMemoryMergedMapOutputs.size();
-    LOG.info(message);
-  }
-  
-  /**
-   * Registers an on-disk output and starts an on-disk merge once at least
-   * {@code 2 * ioSortFactor - 1} files are registered, unless a merge is
-   * already in progress.
-   */
-  public synchronized void closeOnDiskFile(Path file) {
-    onDiskMapOutputs.add(file);
-    
-    synchronized (onDiskMerger) {
-      if (onDiskMerger.isInProgress()) {
-        return;
-      }
-      final int mergeTrigger = 2 * ioSortFactor - 1;
-      if (onDiskMapOutputs.size() >= mergeTrigger) {
-        onDiskMerger.startMerge(onDiskMapOutputs);
-      }
-    }
-  }
-
-  /**
-   * Should <b>only</b> be used after the shuffle phase is complete; otherwise
-   * it can return an invalid state, since a merge may not be in progress due
-   * to inadequate inputs.
-   * 
-   * @return true if the merge process is complete, otherwise false
-   */
-  @Private
-  public boolean isMergeComplete() {
-    return finalMergeComplete;
-  }
-  
-  /**
-   * Closes the merger threads, then runs the final merge over every remaining
-   * in-memory output (merged and unmerged) and every on-disk output, and
-   * marks the final merge as complete.
-   *
-   * @return the iterator produced by the final merge
-   * @throws Throwable whatever the mergers' {@code close()} or the final
-   *         merge throws
-   */
-  public TezRawKeyValueIterator close() throws Throwable {
-    // Wait for on-going merges to complete
-    if (memToMemMerger != null) { 
-      memToMemMerger.close();
-    }
-    inMemoryMerger.close();
-    onDiskMerger.close();
-    
-    // Snapshot the sets into lists for the final merge.
-    List<MapOutput> memory = 
-      new ArrayList<MapOutput>(inMemoryMergedMapOutputs);
-    memory.addAll(inMemoryMapOutputs);
-    List<Path> disk = new ArrayList<Path>(onDiskMapOutputs);
-    TezRawKeyValueIterator kvIter = finalMerge(conf, rfs, memory, disk);
-    this.finalMergeComplete = true;
-    return kvIter;
-  }
-   
-  /**
-   * Feeds {@code kvIter} through the configured combiner into {@code writer}.
-   * The combiner is dereferenced unconditionally; the visible caller checks
-   * it for null first.
-   */
-  void runCombineProcessor(TezRawKeyValueIterator kvIter, Writer writer)
-      throws IOException, InterruptedException {
-    combiner.combine(kvIter, writer);
-  }
-
-  /**
-   * Merge thread that combines several in-memory outputs into one larger
-   * in-memory output, which is then registered through
-   * {@code closeInMemoryMergedFile}.
-   */
-  private class IntermediateMemoryToMemoryMerger 
-  extends MergeThread<MapOutput> {
-    
-    public IntermediateMemoryToMemoryMerger(MergeManager manager, 
-                                            int mergeFactor) {
-      super(manager, mergeFactor, exceptionReporter);
-      // NOTE(review): the name starts with "InMemoryMerger", the same prefix
-      // the InMemoryMerger thread uses - confirm whether that is intended.
-      setName("InMemoryMerger - Thread to do in-memory merge of in-memory " +
-      		    "shuffled map-outputs");
-      setDaemon(true);
-    }
-
-    /**
-     * Merges all given in-memory outputs into a newly reserved in-memory
-     * output. Does nothing for a null or empty list.
-     */
-    @Override
-    public void merge(List<MapOutput> inputs) throws IOException {
-      if (inputs == null || inputs.size() == 0) {
-        return;
-      }
-
-      // The merged output is labelled with the first input's identifier.
-      InputAttemptIdentifier dummyMapId = inputs.get(0).getAttemptIdentifier(); 
-      List<Segment> inMemorySegments = new ArrayList<Segment>();
-      // leaveBytes == 0: every input is drained into a segment.
-      long mergeOutputSize = 
-        createInMemorySegments(inputs, inMemorySegments, 0);
-      int noInMemorySegments = inMemorySegments.size();
-      
-      // Reserved without a limit check; flagged as not a primary map output.
-      MapOutput mergedMapOutputs = 
-        unconditionalReserve(dummyMapId, mergeOutputSize, false);
-      
-      Writer writer = 
-        new InMemoryWriter(mergedMapOutputs.getArrayStream());
-      
-      LOG.info("Initiating Memory-to-Memory merge with " + noInMemorySegments +
-               " segments of total-size: " + mergeOutputSize);
-
-      TezRawKeyValueIterator rIter = 
-        TezMerger.merge(conf, rfs,
-                       ConfigUtils.getIntermediateInputKeyClass(conf),
-                       ConfigUtils.getIntermediateInputValueClass(conf),
-                       inMemorySegments, inMemorySegments.size(),
-                       new Path(inputContext.getUniqueIdentifier()),
-                       (RawComparator)ConfigUtils.getIntermediateInputKeyComparator(conf),
-                       nullProgressable, null, null, null);
-      TezMerger.writeFile(rIter, writer, nullProgressable, conf);
-      writer.close();
-
-      LOG.info(inputContext.getUniqueIdentifier() +  
-               " Memory-to-Memory merge of the " + noInMemorySegments +
-               " files in-memory complete.");
-
-      // Note the output of the merge
-      closeInMemoryMergedFile(mergedMapOutputs);
-    }
-  }
-  
-  /**
-   * Merge thread that merges in-memory outputs into a single file on local
-   * disk (running the combiner if one is configured) and registers that file
-   * through {@code closeOnDiskFile}.
-   */
-  private class InMemoryMerger extends MergeThread<MapOutput> {
-    
-    public InMemoryMerger(MergeManager manager) {
-      super(manager, Integer.MAX_VALUE, exceptionReporter);
-      setName
-      ("InMemoryMerger - Thread to merge in-memory shuffled map-outputs");
-      setDaemon(true);
-    }
-    
-    /**
-     * Merges all given in-memory outputs to one on-disk file. Does nothing
-     * for a null or empty list. On an IOException the partially written
-     * file is deleted before the exception is rethrown.
-     */
-    @Override
-    public void merge(List<MapOutput> inputs) throws IOException, InterruptedException {
-      if (inputs == null || inputs.size() == 0) {
-        return;
-      }
-      
-      //name this output file same as the name of the first file that is 
-      //there in the current list of inmem files (this is guaranteed to
-      //be absent on the disk currently. So we don't overwrite a prev. 
-      //created spill). Also we need to create the output file now since
-      //it is not guaranteed that this file will be present after merge
-      //is called (we delete empty files as soon as we see them
-      //in the merge method)
-
-      //figure out the mapId 
-      InputAttemptIdentifier srcTaskIdentifier = inputs.get(0).getAttemptIdentifier();
-
-      List<Segment> inMemorySegments = new ArrayList<Segment>();
-      // leaveBytes == 0: every input is drained into a segment.
-      long mergeOutputSize = 
-        createInMemorySegments(inputs, inMemorySegments,0);
-      int noInMemorySegments = inMemorySegments.size();
-
-      Path outputPath = mapOutputFile.getInputFileForWrite(
-          srcTaskIdentifier.getInputIdentifier().getSrcTaskIndex(),
-          mergeOutputSize).suffix(Constants.MERGED_OUTPUT_PREFIX);
-
-      Writer writer = null;
-      try {
-        writer =
-            new Writer(conf, rfs, outputPath,
-                (Class)ConfigUtils.getIntermediateInputKeyClass(conf),
-                (Class)ConfigUtils.getIntermediateInputValueClass(conf),
-                codec, null);
-
-        TezRawKeyValueIterator rIter = null;
-        LOG.info("Initiating in-memory merge with " + noInMemorySegments + 
-            " segments...");
-
-        rIter = TezMerger.merge(conf, rfs,
-            (Class)ConfigUtils.getIntermediateInputKeyClass(conf),
-            (Class)ConfigUtils.getIntermediateInputValueClass(conf),
-            inMemorySegments, inMemorySegments.size(),
-            new Path(inputContext.getUniqueIdentifier()),
-            (RawComparator)ConfigUtils.getIntermediateInputKeyComparator(conf),
-            nullProgressable, spilledRecordsCounter, null, null);
-
-        if (null == combiner) {
-          TezMerger.writeFile(rIter, writer, nullProgressable, conf);
-        } else {
-          runCombineProcessor(rIter, writer);
-        }
-        writer.close();
-        // Cleared so the finally block does not close the writer twice.
-        writer = null;
-
-        LOG.info(inputContext.getUniqueIdentifier() +  
-            " Merge of the " + noInMemorySegments +
-            " files in-memory complete." +
-            " Local file is " + outputPath + " of size " + 
-            localFS.getFileStatus(outputPath).getLen());
-      } catch (IOException e) { 
-        //make sure that we delete the ondisk file that we created 
-        //earlier when we invoked cloneFileAttributes
-        // NOTE(review): no cloneFileAttributes call is visible in this
-        // method; the file is created by the Writer above.
-        localFS.delete(outputPath, true);
-        throw e;
-      } finally {
-        if (writer != null) {
-          writer.close();
-        }
-      }
-
-      // Note the output of the merge
-      closeOnDiskFile(outputPath);
-    }
-
-  }
-  
-  /**
-   * Merge thread that merges several on-disk outputs into a single on-disk
-   * file and registers the result through {@code closeOnDiskFile}.
-   */
-  private class OnDiskMerger extends MergeThread<Path> {
-    
-    public OnDiskMerger(MergeManager manager) {
-      super(manager, Integer.MAX_VALUE, exceptionReporter);
-      setName("OnDiskMerger - Thread to merge on-disk map-outputs");
-      setDaemon(true);
-    }
-    
-    /**
-     * Merges the given files into one new local file. Does nothing for a
-     * null or empty list. On an IOException the partially written output
-     * file is deleted before the exception is rethrown; the writer is closed
-     * on every exit path.
-     */
-    @Override
-    public void merge(List<Path> inputs) throws IOException {
-      // sanity check
-      if (inputs == null || inputs.isEmpty()) {
-        LOG.info("No ondisk files to merge...");
-        return;
-      }
-      
-      long approxOutputSize = 0;
-      int bytesPerSum = 
-        conf.getInt("io.bytes.per.checksum", 512);
-      
-      LOG.info("OnDiskMerger: We have  " + inputs.size() + 
-               " map outputs on disk. Triggering merge...");
-      
-      // 1. Prepare the list of files to be merged. 
-      for (Path file : inputs) {
-        approxOutputSize += localFS.getFileStatus(file).getLen();
-      }
-
-      // add the checksum length
-      approxOutputSize += 
-        ChecksumFileSystem.getChecksumLength(approxOutputSize, bytesPerSum);
-
-      // 2. Start the on-disk merge process
-      Path outputPath = 
-        localDirAllocator.getLocalPathForWrite(inputs.get(0).toString(), 
-            approxOutputSize, conf).suffix(Constants.MERGED_OUTPUT_PREFIX);
-      Writer writer = 
-        new Writer(conf, rfs, outputPath, 
-                        (Class)ConfigUtils.getIntermediateInputKeyClass(conf), 
-                        (Class)ConfigUtils.getIntermediateInputValueClass(conf),
-                        codec, null);
-      TezRawKeyValueIterator iter  = null;
-      Path tmpDir = new Path(inputContext.getUniqueIdentifier());
-      try {
-        iter = TezMerger.merge(conf, rfs,
-                            (Class)ConfigUtils.getIntermediateInputKeyClass(conf), 
-                            (Class)ConfigUtils.getIntermediateInputValueClass(conf),
-                            codec, inputs.toArray(new Path[inputs.size()]), 
-                            true, ioSortFactor, tmpDir, 
-                            (RawComparator)ConfigUtils.getIntermediateInputKeyComparator(conf), 
-                            nullProgressable, spilledRecordsCounter, null, 
-                            mergedMapOutputsCounter, null);
-
-        TezMerger.writeFile(iter, writer, nullProgressable, conf);
-        writer.close();
-        // Cleared so the finally block does not close the writer twice.
-        writer = null;
-      } catch (IOException e) {
-        localFS.delete(outputPath, true);
-        throw e;
-      } finally {
-        // Previously the writer leaked whenever the merge or the write
-        // failed. A failure of this cleanup close is only logged so that it
-        // cannot replace the exception that caused it.
-        if (writer != null) {
-          try {
-            writer.close();
-          } catch (IOException closeFailure) {
-            LOG.warn("Failed to close writer for " + outputPath, closeFailure);
-          }
-        }
-      }
-
-      closeOnDiskFile(outputPath);
-
-      LOG.info(inputContext.getUniqueIdentifier() +
-          " Finished merging " + inputs.size() + 
-          " map output files on disk of total-size " + 
-          approxOutputSize + "." + 
-          " Local output file is " + outputPath + " of size " +
-          localFS.getFileStatus(outputPath).getLen());
-    }
-  }
-  
-  private long createInMemorySegments(List<MapOutput> inMemoryMapOutputs,
-                                      List<Segment> inMemorySegments, 
-                                      long leaveBytes
-                                      ) throws IOException {
-    long totalSize = 0L;
-    // We could use fullSize could come from the RamManager, but files can be
-    // closed but not yet present in inMemoryMapOutputs
-    long fullSize = 0L;
-    for (MapOutput mo : inMemoryMapOutputs) {
-      fullSize += mo.getMemory().length;
-    }
-    while(fullSize > leaveBytes) {
-      MapOutput mo = inMemoryMapOutputs.remove(0);
-      byte[] data = mo.getMemory();
-      long size = data.length;
-      totalSize += size;
-      fullSize -= size;
-      IFile.Reader reader = new InMemoryReader(MergeManager.this, 
-                                                   mo.getAttemptIdentifier(),
-                                                   data, 0, (int)size);
-      inMemorySegments.add(new Segment(reader, true, 
-                                            (mo.isPrimaryMapOutput() ? 
-                                            mergedMapOutputsCounter : null)));
-    }
-    return totalSize;
-  }
-
-  /**
-   * Presents a {@link TezRawKeyValueIterator} as an {@code IFile.Reader}.
-   * Keys and values are exposed by pointing the caller's buffer at the
-   * iterator's current data, and {@code bytesRead} is advanced by each
-   * key/value length.
-   */
-  class RawKVIteratorReader extends IFile.Reader {
-
-    private final TezRawKeyValueIterator kvIter;
-
-    public RawKVIteratorReader(TezRawKeyValueIterator kvIter, long size)
-        throws IOException {
-      super(null, null, size, null, spilledRecordsCounter);
-      this.kvIter = kvIter;
-    }
-
-    /** Advances the iterator; returns false once it is exhausted. */
-    public boolean nextRawKey(DataInputBuffer key) throws IOException {
-      if (!kvIter.next()) {
-        return false;
-      }
-      final DataInputBuffer source = kvIter.getKey();
-      final int offset = source.getPosition();
-      final int length = source.getLength() - offset;
-      key.reset(source.getData(), offset, length);
-      bytesRead += length;
-      return true;
-    }
-
-    /** Exposes the value belonging to the key most recently returned. */
-    public void nextRawValue(DataInputBuffer value) throws IOException {
-      final DataInputBuffer source = kvIter.getValue();
-      final int offset = source.getPosition();
-      final int length = source.getLength() - offset;
-      value.reset(source.getData(), offset, length);
-      bytesRead += length;
-    }
-
-    /** The position is the number of key and value bytes handed out so far. */
-    public long getPosition() throws IOException {
-      return bytesRead;
-    }
-
-    public void close() throws IOException {
-      kvIter.close();
-    }
-  }
-
-  /**
-   * Builds the iterator that covers every remaining map output, both in
-   * memory and on disk.
-   * <p>
-   * In-memory outputs beyond the configured input-buffer budget are turned
-   * into segments first. When the number of on-disk files is below
-   * {@code ioSortFactor}, those segments are merged into one extra on-disk
-   * file; otherwise they are kept and fed into the disk merge. All on-disk
-   * files are then wrapped as segments ordered by length, the remaining
-   * in-memory outputs become the final segments, and the two groups are
-   * merged.
-   *
-   * @param job                configuration supplying the buffer percent and
-   *                           the key/value classes and comparator
-   * @param fs                 file system holding the on-disk map outputs
-   * @param inMemoryMapOutputs in-memory outputs; drained by this call
-   * @param onDiskMapOutputs   on-disk outputs; a merged spill file may be
-   *                           appended
-   * @return iterator over the merged input
-   * @throws IOException if the buffer percent is outside [0, 1] or any merge
-   *                     or file operation fails
-   */
-  private TezRawKeyValueIterator finalMerge(Configuration job, FileSystem fs,
-                                       List<MapOutput> inMemoryMapOutputs,
-                                       List<Path> onDiskMapOutputs
-                                       ) throws IOException {
-    LOG.info("finalMerge called with " + 
-             inMemoryMapOutputs.size() + " in-memory map-outputs and " + 
-             onDiskMapOutputs.size() + " on-disk map-outputs");
-    
-    // Fraction of Runtime.maxMemory() that map outputs may keep occupying;
-    // the resulting byte count is passed below as the leaveBytes limit.
-    final float maxRedPer =
-      job.getFloat(
-          TezJobConfig.TEZ_ENGINE_INPUT_BUFFER_PERCENT,
-          TezJobConfig.DEFAULT_TEZ_ENGINE_INPUT_BUFFER_PERCENT);
-    if (maxRedPer > 1.0 || maxRedPer < 0.0) {
-      // NOTE(review): the message is the config key immediately followed by
-      // the value, with no separator between them.
-      throw new IOException(TezJobConfig.TEZ_ENGINE_INPUT_BUFFER_PERCENT +
-                            maxRedPer);
-    }
-    int maxInMemReduce = (int)Math.min(
-        Runtime.getRuntime().maxMemory() * maxRedPer, Integer.MAX_VALUE);
-    
-
-    // merge config params
-    Class keyClass = (Class)ConfigUtils.getIntermediateInputKeyClass(job);
-    Class valueClass = (Class)ConfigUtils.getIntermediateInputValueClass(job);
-    final Path tmpDir = new Path(inputContext.getUniqueIdentifier());
-    final RawComparator comparator =
-      (RawComparator)ConfigUtils.getIntermediateInputKeyComparator(job);
-
-    // segments required to vacate memory
-    List<Segment> memDiskSegments = new ArrayList<Segment>();
-    long inMemToDiskBytes = 0;
-    boolean mergePhaseFinished = false;
-    if (inMemoryMapOutputs.size() > 0) {
-      // The source task index of the first in-memory output is used when
-      // asking for the spill file path below.
-      int srcTaskId = inMemoryMapOutputs.get(0).getAttemptIdentifier().getInputIdentifier().getSrcTaskIndex();
-      inMemToDiskBytes = createInMemorySegments(inMemoryMapOutputs, 
-                                                memDiskSegments,
-                                                maxInMemReduce);
-      final int numMemDiskSegments = memDiskSegments.size();
-      if (numMemDiskSegments > 0 &&
-            ioSortFactor > onDiskMapOutputs.size()) {
-        
-        // If we reach here, it implies that we have less than io.sort.factor
-        // disk segments and this will be incremented by 1 (result of the 
-        // memory segments merge). Since this total would still be 
-        // <= io.sort.factor, we will not do any more intermediate merges,
-        // the merge of all these disk segments would be directly fed to the
-        // reduce method
-        
-        mergePhaseFinished = true;
-        // must spill to disk, but can't retain in-mem for intermediate merge
-        final Path outputPath = 
-          mapOutputFile.getInputFileForWrite(srcTaskId,
-                                             inMemToDiskBytes).suffix(
-                                                 Constants.MERGED_OUTPUT_PREFIX);
-        final TezRawKeyValueIterator rIter = TezMerger.merge(job, fs,
-            keyClass, valueClass, memDiskSegments, numMemDiskSegments,
-            tmpDir, comparator, nullProgressable, spilledRecordsCounter, null, null);
-        final Writer writer = new Writer(job, fs, outputPath,
-            keyClass, valueClass, codec, null);
-        try {
-          TezMerger.writeFile(rIter, writer, nullProgressable, job);
-          // add to list of final disk outputs.
-          onDiskMapOutputs.add(outputPath);
-        } catch (IOException e) {
-          // Best-effort removal of the partial spill file; a failure to
-          // delete is ignored so the original exception is the one rethrown.
-          if (null != outputPath) {
-            try {
-              fs.delete(outputPath, true);
-            } catch (IOException ie) {
-              // NOTHING
-            }
-          }
-          throw e;
-        } finally {
-          if (null != writer) {
-            writer.close();
-          }
-        }
-        LOG.info("Merged " + numMemDiskSegments + " segments, " +
-                 inMemToDiskBytes + " bytes to disk to satisfy " +
-                 "reduce memory limit");
-        // The spilled bytes are now part of an on-disk file, so nothing is
-        // carried over as retained in-memory segments.
-        inMemToDiskBytes = 0;
-        memDiskSegments.clear();
-      } else if (inMemToDiskBytes != 0) {
-        LOG.info("Keeping " + numMemDiskSegments + " segments, " +
-                 inMemToDiskBytes + " bytes in memory for " +
-                 "intermediate, on-disk merge");
-      }
-    }
-
-    // segments on disk
-    List<Segment> diskSegments = new ArrayList<Segment>();
-    long onDiskBytes = inMemToDiskBytes;
-    Path[] onDisk = onDiskMapOutputs.toArray(new Path[onDiskMapOutputs.size()]);
-    for (Path file : onDisk) {
-      onDiskBytes += fs.getFileStatus(file).getLen();
-      LOG.debug("Disk file: " + file + " Length is " + 
-          fs.getFileStatus(file).getLen());
-      // Files whose name ends with MERGED_OUTPUT_PREFIX get no
-      // mergedMapOutputsCounter; all other files do.
-      diskSegments.add(new Segment(job, fs, file, codec, false,
-                                         (file.toString().endsWith(
-                                             Constants.MERGED_OUTPUT_PREFIX) ?
-                                          null : mergedMapOutputsCounter)
-                                        ));
-    }
-    LOG.info("Merging " + onDisk.length + " files, " +
-             onDiskBytes + " bytes from disk");
-    // Order the disk segments by ascending length.
-    Collections.sort(diskSegments, new Comparator<Segment>() {
-      public int compare(Segment o1, Segment o2) {
-        if (o1.getLength() == o2.getLength()) {
-          return 0;
-        }
-        return o1.getLength() < o2.getLength() ? -1 : 1;
-      }
-    });
-
-    // build final list of segments from merged backed by disk + in-mem
-    List<Segment> finalSegments = new ArrayList<Segment>();
-    // leaveBytes of 0 drains every remaining in-memory output.
-    long inMemBytes = createInMemorySegments(inMemoryMapOutputs, 
-                                             finalSegments, 0);
-    LOG.info("Merging " + finalSegments.size() + " segments, " +
-             inMemBytes + " bytes from memory into reduce");
-    // onDiskBytes counts the on-disk file lengths plus any in-memory bytes
-    // that were kept for the intermediate merge.
-    if (0 != onDiskBytes) {
-      final int numInMemSegments = memDiskSegments.size();
-      // Retained in-memory segments go to the front of the disk segment list.
-      diskSegments.addAll(0, memDiskSegments);
-      memDiskSegments.clear();
-      TezRawKeyValueIterator diskMerge = TezMerger.merge(
-          job, fs, keyClass, valueClass, diskSegments,
-          ioSortFactor, numInMemSegments, tmpDir, comparator,
-          nullProgressable, false, spilledRecordsCounter, null, null);
-      diskSegments.clear();
-      if (0 == finalSegments.size()) {
-        return diskMerge;
-      }
-      // Wrap the disk merge as one more segment next to the in-memory ones.
-      finalSegments.add(new Segment(
-            new RawKVIteratorReader(diskMerge, onDiskBytes), true));
-    }
-    return TezMerger.merge(job, fs, keyClass, valueClass,
-                 finalSegments, finalSegments.size(), tmpDir,
-                 comparator, nullProgressable, spilledRecordsCounter, null,
-                 null);
-  
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b212ca1d/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/MergeThread.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/MergeThread.java b/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/MergeThread.java
deleted file mode 100644
index bab882e..0000000
--- a/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/MergeThread.java
+++ /dev/null
@@ -1,108 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.tez.engine.common.shuffle.impl;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Set;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-
-abstract class MergeThread<T> extends Thread {
-  
-  private static final Log LOG = LogFactory.getLog(MergeThread.class);
-
-  private volatile boolean inProgress = false;
-  private List<T> inputs = new ArrayList<T>();
-  protected final MergeManager manager;
-  private final ExceptionReporter reporter;
-  private boolean closed = false;
-  private final int mergeFactor;
-  
-  public MergeThread(MergeManager manager, int mergeFactor,
-                     ExceptionReporter reporter) {
-    this.manager = manager;
-    this.mergeFactor = mergeFactor;
-    this.reporter = reporter;
-  }
-  
-  public synchronized void close() throws InterruptedException {
-    closed = true;
-    waitForMerge();
-    interrupt();
-  }
-
-  public synchronized boolean isInProgress() {
-    return inProgress;
-  }
-  
-  public synchronized void startMerge(Set<T> inputs) {
-    if (!closed) {
-      inProgress = true;
-      this.inputs = new ArrayList<T>();
-      Iterator<T> iter=inputs.iterator();
-      for (int ctr = 0; iter.hasNext() && ctr < mergeFactor; ++ctr) {
-        this.inputs.add(iter.next());
-        iter.remove();
-      }
-      LOG.info(getName() + ": Starting merge with " + this.inputs.size() + 
-               " segments, while ignoring " + inputs.size() + " segments");
-      notifyAll();
-    }
-  }
-
-  public synchronized void waitForMerge() throws InterruptedException {
-    while (inProgress) {
-      wait();
-    }
-  }
-
-  public void run() {
-    while (true) {
-      try {
-        // Wait for notification to start the merge...
-        synchronized (this) {
-          while (!inProgress) {
-            wait();
-          }
-        }
-
-        // Merge
-        merge(inputs);
-      } catch (InterruptedException ie) {
-        return;
-      } catch(Throwable t) {
-        reporter.reportException(t);
-        return;
-      } finally {
-        synchronized (this) {
-          // Clear inputs
-          inputs = null;
-          inProgress = false;        
-          notifyAll();
-        }
-      }
-    }
-  }
-
-  public abstract void merge(List<T> inputs) 
-      throws IOException, InterruptedException;
-}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b212ca1d/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/Shuffle.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/Shuffle.java b/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/Shuffle.java
deleted file mode 100644
index 15332a1..0000000
--- a/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/Shuffle.java
+++ /dev/null
@@ -1,278 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.tez.engine.common.shuffle.impl;
-
-import java.io.IOException;
-import java.util.List;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.FutureTask;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import javax.crypto.SecretKey;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.classification.InterfaceStability;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.LocalDirAllocator;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.tez.common.TezJobConfig;
-import org.apache.tez.common.counters.TaskCounter;
-import org.apache.tez.common.counters.TezCounter;
-import org.apache.tez.dag.api.TezUncheckedException;
-import org.apache.tez.engine.api.Event;
-import org.apache.tez.engine.api.TezInputContext;
-import org.apache.tez.engine.common.TezEngineUtils;
-import org.apache.tez.engine.common.combine.Combiner;
-import org.apache.tez.engine.common.shuffle.server.ShuffleHandler;
-import org.apache.tez.engine.common.sort.impl.TezRawKeyValueIterator;
-import org.apache.tez.engine.shuffle.common.ShuffleUtils;
-
-import com.google.common.base.Preconditions;
-
-/**
- * Coordinates the shuffle and merge for one input: it wires together the
- * scheduler, the event handler and the merge manager, runs the fetchers on a
- * background thread, and hands the merged key/value iterator to the caller.
- * <p>
- * The first failure reported through {@link #reportException(Throwable)} is
- * recorded under this object's monitor and rethrown to the caller as a
- * {@link ShuffleError}.
- */
-@InterfaceAudience.Private
-@InterfaceStability.Unstable
-public class Shuffle implements ExceptionReporter {
-  
-  private static final Log LOG = LogFactory.getLog(Shuffle.class);
-  /** Timeout, in the scheduler's units, of each wait in the polling loop. */
-  private static final int PROGRESS_FREQUENCY = 2000;
-  
-  private final Configuration conf;
-  private final TezInputContext inputContext;
-  private final ShuffleClientMetrics metrics;
-
-  private final ShuffleInputEventHandler eventHandler;
-  private final ShuffleScheduler scheduler;
-  private final MergeManager merger;
-  // First reported failure and the thread that reported it; both are read and
-  // written only while holding this Shuffle's monitor.
-  private Throwable throwable = null;
-  private String throwingThreadName = null;
-  private final int numInputs;
-  private final AtomicInteger reduceStartId;
-  private final SecretKey jobTokenSecret;
-  private AtomicInteger reduceRange = new AtomicInteger(
-      TezJobConfig.TEZ_ENGINE_SHUFFLE_PARTITION_RANGE_DEFAULT);
-
-  private FutureTask<TezRawKeyValueIterator> runShuffleFuture;
-
-  /**
-   * Creates the scheduler, event handler and merge manager for one input.
-   *
-   * @param inputContext context of the input being shuffled
-   * @param conf         configuration for the shuffle and merge
-   * @param numInputs    number of inputs handed to the scheduler
-   * @throws IOException if the current user or the local file system cannot
-   *                     be obtained, or a collaborator fails to initialize
-   */
-  public Shuffle(TezInputContext inputContext, Configuration conf, int numInputs) throws IOException {
-    this.inputContext = inputContext;
-    this.conf = conf;
-    this.metrics = new ShuffleClientMetrics(inputContext.getDAGName(),
-        inputContext.getTaskVertexName(), inputContext.getTaskIndex(),
-        this.conf, UserGroupInformation.getCurrentUser().getShortUserName());
-            
-    this.numInputs = numInputs;
-    
-    this.jobTokenSecret = ShuffleUtils
-        .getJobTokenSecretFromTokenBytes(inputContext
-            .getServiceConsumerMetaData(ShuffleHandler.MAPREDUCE_SHUFFLE_SERVICEID));
-    
-    Combiner combiner = TezEngineUtils.instantiateCombiner(conf, inputContext);
-    
-    FileSystem localFS = FileSystem.getLocal(this.conf);
-    LocalDirAllocator localDirAllocator = 
-        new LocalDirAllocator(TezJobConfig.LOCAL_DIRS);
-
-    // TODO TEZ Get rid of Map / Reduce references.
-    TezCounter shuffledMapsCounter = 
-        inputContext.getCounters().findCounter(TaskCounter.SHUFFLED_MAPS);
-    TezCounter reduceShuffleBytes =
-        inputContext.getCounters().findCounter(TaskCounter.REDUCE_SHUFFLE_BYTES);
-    TezCounter failedShuffleCounter =
-        inputContext.getCounters().findCounter(TaskCounter.FAILED_SHUFFLE);
-    TezCounter spilledRecordsCounter = 
-        inputContext.getCounters().findCounter(TaskCounter.SPILLED_RECORDS);
-    TezCounter reduceCombineInputCounter =
-        inputContext.getCounters().findCounter(TaskCounter.COMBINE_INPUT_RECORDS);
-    TezCounter mergedMapOutputsCounter =
-        inputContext.getCounters().findCounter(TaskCounter.MERGED_MAP_OUTPUTS);
-    
-    reduceStartId = new AtomicInteger(inputContext.getTaskIndex());
-    LOG.info("Shuffle assigned reduce start id: " + reduceStartId.get()
-        + " with default reduce range: " + reduceRange.get());
-
-    scheduler = new ShuffleScheduler(
-          this.inputContext,
-          this.conf,
-          this.numInputs,
-          this,
-          shuffledMapsCounter,
-          reduceShuffleBytes,
-          failedShuffleCounter);
-    eventHandler= new ShuffleInputEventHandler(
-          inputContext,
-          this,
-          scheduler);
-    merger = new MergeManager(
-          this.conf,
-          localFS,
-          localDirAllocator,
-          inputContext,
-          combiner,
-          spilledRecordsCounter,
-          reduceCombineInputCounter,
-          mergedMapOutputsCounter,
-          this);
-  }
-
-  /**
-   * Forwards the given events to the shuffle input event handler.
-   *
-   * @param events events to process
-   */
-  public void handleEvents(List<Event> events) {
-    eventHandler.handleEvents(events);
-  }
-  
-  /**
-   * Indicates whether the Shuffle and Merge processing is complete.
-   * @return false if not complete, true if complete or if an error occurred.
-   */
-  public boolean isInputReady() {
-    if (runShuffleFuture == null) {
-      return false;
-    }
-    return runShuffleFuture.isDone();
-    //return scheduler.isDone() && merger.isMergeComplete();
-  }
-
-  /**
-   * Waits for the Shuffle and Merge to complete, and returns an iterator over the input.
-   * @return an iterator over the fetched input.
-   * @throws IOException if the shuffle or merge failed with an IOException
-   * @throws InterruptedException if the wait, or the shuffle itself, was
-   *         interrupted
-   * @throws IllegalStateException if called before {@link #run()}
-   * @throws TezUncheckedException if the shuffle failed with any other
-   *         exception type
-   */
-  public TezRawKeyValueIterator waitForInput() throws IOException, InterruptedException {
-    Preconditions.checkState(runShuffleFuture != null,
-        "waitForInput can only be called after run");
-    TezRawKeyValueIterator kvIter;
-    try {
-      kvIter = runShuffleFuture.get();
-    } catch (ExecutionException e) {
-      // Unwrap so callers see the failure type raised by the shuffle itself.
-      Throwable cause = e.getCause();
-      if (cause instanceof IOException) {
-        throw (IOException) cause;
-      } else if (cause instanceof InterruptedException) {
-        throw (InterruptedException) cause;
-      } else {
-        throw new TezUncheckedException(
-            "Unexpected exception type while running Shuffle and Merge", cause);
-      }
-    }
-    return kvIter;
-  }
-
-  /**
-   * Starts the shuffle and merge on a new thread named "ShuffleMergeRunner".
-   * The result is collected through {@link #waitForInput()}.
-   */
-  public void run() {
-    RunShuffleCallable runShuffle = new RunShuffleCallable();
-    runShuffleFuture = new FutureTask<TezRawKeyValueIterator>(runShuffle);
-    new Thread(runShuffleFuture, "ShuffleMergeRunner").start();
-  }
-  
-  /**
-   * Runs the fetchers until the scheduler reports completion, then closes the
-   * scheduler and the merge manager and returns the merged iterator.
-   */
-  private class RunShuffleCallable implements Callable<TezRawKeyValueIterator> {
-    @Override
-    public TezRawKeyValueIterator call() throws IOException, InterruptedException {
-      // TODO NEWTEZ Limit # fetchers to number of inputs
-      final int numFetchers = 
-          conf.getInt(
-              TezJobConfig.TEZ_ENGINE_SHUFFLE_PARALLEL_COPIES, 
-              TezJobConfig.DEFAULT_TEZ_ENGINE_SHUFFLE_PARALLEL_COPIES);
-      Fetcher[] fetchers = new Fetcher[numFetchers];
-      for (int i = 0; i < numFetchers; ++i) {
-        fetchers[i] = new Fetcher(conf, scheduler, merger, metrics, Shuffle.this, jobTokenSecret, inputContext);
-        fetchers[i].start();
-      }
-      
-      while (!scheduler.waitUntilDone(PROGRESS_FREQUENCY)) {
-        // The failure fields are written in reportException() under the
-        // Shuffle monitor, so the same monitor must be held here. Locking
-        // "this" would lock the callable and give no visibility guarantee.
-        synchronized (Shuffle.this) {
-          if (throwable != null) {
-            throw new ShuffleError("error in shuffle in " + throwingThreadName,
-                                   throwable);
-          }
-        }
-      }
-      
-      // Stop the map-output fetcher threads
-      for (Fetcher fetcher : fetchers) {
-        fetcher.shutDown();
-      }
-      fetchers = null;
-      
-      // stop the scheduler
-      scheduler.close();
-
-
-      // Finish the on-going merges...
-      TezRawKeyValueIterator kvIter = null;
-      try {
-        kvIter = merger.close();
-      } catch (Throwable e) {
-        throw new ShuffleError("Error while doing final merge " , e);
-      }
-      
-      // Sanity check
-      synchronized (Shuffle.this) {
-        if (throwable != null) {
-          throw new ShuffleError("error in shuffle in " + throwingThreadName,
-                                 throwable);
-        }
-      }
-      return kvIter;
-    }
-  }
-  
-  /** @return the reduce start id, initialized from the task index */
-  public int getReduceStartId() {
-    return reduceStartId.get();
-  }
-  
-  /** @return the current reduce (partition) range */
-  public int getReduceRange() {
-    return reduceRange.get();
-  }
-  
-  /**
-   * Records the first reported failure together with the name of the calling
-   * thread. Later reports are ignored.
-   *
-   * @param t the failure to record
-   */
-  public synchronized void reportException(Throwable t) {
-    if (throwable == null) {
-      throwable = t;
-      throwingThreadName = Thread.currentThread().getName();
-      // Notify the scheduler so that the reporting thread finds the 
-      // exception immediately.
-      synchronized (scheduler) {
-        scheduler.notifyAll();
-      }
-    }
-  }
-  
-  /** Failure raised to the caller when the shuffle or final merge fails. */
-  public static class ShuffleError extends IOException {
-    private static final long serialVersionUID = 5753909320586607881L;
-
-    ShuffleError(String msg, Throwable t) {
-      super(msg, t);
-    }
-  }
-
-  /**
-   * Sets the partition range. The range can be changed away from its default
-   * only once; setting the value it already has is a no-op.
-   *
-   * @param range the new range
-   * @throws TezUncheckedException if the range was already changed from its
-   *         default; the exception is also passed to
-   *         {@link #reportException(Throwable)}
-   */
-  public void setPartitionRange(int range) {
-    if (range == reduceRange.get()) {
-      return;
-    }
-    if (reduceRange.compareAndSet(
-        TezJobConfig.TEZ_ENGINE_SHUFFLE_PARTITION_RANGE_DEFAULT, range)) {
-      LOG.info("Reduce range set to: " + range);
-    } else {
-      TezUncheckedException e = 
-          new TezUncheckedException("Reduce range can be set only once.");
-      reportException(e);
-      throw e; 
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b212ca1d/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/ShuffleClientMetrics.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/ShuffleClientMetrics.java b/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/ShuffleClientMetrics.java
deleted file mode 100644
index 850dbeb..0000000
--- a/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/ShuffleClientMetrics.java
+++ /dev/null
@@ -1,91 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.tez.engine.common.shuffle.impl;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.metrics.MetricsContext;
-import org.apache.hadoop.metrics.MetricsRecord;
-import org.apache.hadoop.metrics.MetricsUtil;
-import org.apache.hadoop.metrics.Updater;
-import org.apache.tez.common.Constants;
-import org.apache.tez.common.TezJobConfig;
-import org.apache.tez.engine.common.TezEngineUtils;
-
-/**
- * Accumulates fetch statistics for the shuffle client and pushes them to the
- * "shuffleInput" metrics record each time {@link #doUpdates(MetricsContext)}
- * is invoked.
- */
-class ShuffleClientMetrics implements Updater {
-
-  private MetricsRecord shuffleMetrics = null;
-  private int numFailedFetches = 0;
-  private int numSuccessFetches = 0;
-  private long numBytes = 0;
-  private int numThreadsBusy = 0;
-  private final int numCopiers;
-
-  /**
-   * Creates the tagged metrics record and registers this object as an updater
-   * of the Tez metrics context.
-   *
-   * @param dagName    value of the "dagName" tag
-   * @param vertexName combined with {@code taskIndex} for the "taskId" tag
-   * @param taskIndex  combined with {@code vertexName} for the "taskId" tag
-   * @param conf       supplies the parallel-copies count and the session id
-   * @param user       value of the "user" tag
-   */
-  ShuffleClientMetrics(String dagName, String vertexName, int taskIndex, Configuration conf,
-      String user) {
-    this.numCopiers = conf.getInt(
-        TezJobConfig.TEZ_ENGINE_SHUFFLE_PARALLEL_COPIES,
-        TezJobConfig.DEFAULT_TEZ_ENGINE_SHUFFLE_PARALLEL_COPIES);
-
-    final MetricsContext context = MetricsUtil.getContext(Constants.TEZ);
-    final MetricsRecord record = MetricsUtil.createRecord(context, "shuffleInput");
-    this.shuffleMetrics = record;
-    record.setTag("user", user);
-    record.setTag("dagName", dagName);
-    record.setTag("taskId", TezEngineUtils.getTaskIdentifier(vertexName, taskIndex));
-    record.setTag("sessionId",
-        conf.get(
-            TezJobConfig.TEZ_ENGINE_METRICS_SESSION_ID,
-            TezJobConfig.DEFAULT_TEZ_ENGINE_METRICS_SESSION_ID));
-    context.registerUpdater(this);
-  }
-
-  /** Adds {@code numBytes} to the bytes counted since the last update. */
-  public synchronized void inputBytes(long numBytes) {
-    this.numBytes += numBytes;
-  }
-
-  /** Counts one failed fetch. */
-  public synchronized void failedFetch() {
-    numFailedFetches++;
-  }
-
-  /** Counts one successful fetch. */
-  public synchronized void successFetch() {
-    numSuccessFetches++;
-  }
-
-  /** Marks one more fetcher thread as busy. */
-  public synchronized void threadBusy() {
-    numThreadsBusy++;
-  }
-
-  /** Marks one fetcher thread as no longer busy. */
-  public synchronized void threadFree() {
-    numThreadsBusy--;
-  }
-
-  /**
-   * Publishes the counters gathered since the previous call, resets the
-   * byte and fetch counters, and then updates the record. The busy-thread
-   * count is not reset.
-   *
-   * @param unused not consulted
-   */
-  public void doUpdates(MetricsContext unused) {
-    synchronized (this) {
-      shuffleMetrics.incrMetric("shuffle_input_bytes", numBytes);
-      shuffleMetrics.incrMetric("shuffle_failed_fetches", numFailedFetches);
-      shuffleMetrics.incrMetric("shuffle_success_fetches", numSuccessFetches);
-      if (numCopiers == 0) {
-        // No copiers configured: report 0 rather than dividing by zero.
-        shuffleMetrics.setMetric("shuffle_fetchers_busy_percent", 0);
-      } else {
-        final float busyFraction = (float) numThreadsBusy / numCopiers;
-        shuffleMetrics.setMetric("shuffle_fetchers_busy_percent",
-            100 * busyFraction);
-      }
-      numBytes = 0;
-      numSuccessFetches = 0;
-      numFailedFetches = 0;
-    }
-    shuffleMetrics.update();
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b212ca1d/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/ShuffleHeader.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/ShuffleHeader.java b/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/ShuffleHeader.java
deleted file mode 100644
index a918ef1..0000000
--- a/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/ShuffleHeader.java
+++ /dev/null
@@ -1,94 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.tez.engine.common.shuffle.impl;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.classification.InterfaceStability;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableUtils;
-
-/**
- * Header describing one map output in a shuffle response. According to the
- * original description it is sent by the TaskTracker and deciphered by the
- * Fetcher thread of the Reduce task.
- * <p>
- * Wire order: map id, compressed length, uncompressed length, partition.
- */
-@InterfaceAudience.Private
-@InterfaceStability.Stable
-public class ShuffleHeader implements Writable {
-
-  /** Header info of the shuffle http request/response */
-  public static final String HTTP_HEADER_NAME = "name";
-  public static final String DEFAULT_HTTP_HEADER_NAME = "mapreduce";
-  public static final String HTTP_HEADER_VERSION = "version";
-  public static final String DEFAULT_HTTP_HEADER_VERSION = "1.0.0";
-
-  /**
-   * Upper bound on the length of a task attempt id accepted by
-   * {@link #readFields(DataInput)}.
-   */
-  private static final int MAX_ID_LENGTH = 1000;
-
-  String mapId;
-  long uncompressedLength;
-  long compressedLength;
-  int forReduce;
-
-  /** Creates an empty header, to be populated by {@link #readFields(DataInput)}. */
-  public ShuffleHeader() { }
-
-  /**
-   * @param mapId              id of the map output
-   * @param compressedLength   compressed length of the output
-   * @param uncompressedLength uncompressed length of the output
-   * @param forReduce          partition the output is destined for
-   */
-  public ShuffleHeader(String mapId, long compressedLength,
-      long uncompressedLength, int forReduce) {
-    this.forReduce = forReduce;
-    this.uncompressedLength = uncompressedLength;
-    this.compressedLength = compressedLength;
-    this.mapId = mapId;
-  }
-
-  /** @return the id of the map output */
-  public String getMapId() {
-    return mapId;
-  }
-
-  /** @return the partition stored in {@code forReduce} */
-  public int getPartition() {
-    return forReduce;
-  }
-
-  /** @return the uncompressed length */
-  public long getUncompressedLength() {
-    return this.uncompressedLength;
-  }
-
-  /** @return the compressed length */
-  public long getCompressedLength() {
-    return this.compressedLength;
-  }
-
-  /**
-   * Reads the header fields in wire order; the map id is read with a length
-   * limit of {@code MAX_ID_LENGTH}.
-   */
-  public void readFields(DataInput in) throws IOException {
-    this.mapId = WritableUtils.readStringSafely(in, MAX_ID_LENGTH);
-    this.compressedLength = WritableUtils.readVLong(in);
-    this.uncompressedLength = WritableUtils.readVLong(in);
-    this.forReduce = WritableUtils.readVInt(in);
-  }
-
-  /** Writes the header fields in the same order that {@code readFields} reads them. */
-  public void write(DataOutput out) throws IOException {
-    Text.writeString(out, this.mapId);
-    WritableUtils.writeVLong(out, this.compressedLength);
-    WritableUtils.writeVLong(out, this.uncompressedLength);
-    WritableUtils.writeVInt(out, this.forReduce);
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b212ca1d/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/ShuffleInputEventHandler.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/ShuffleInputEventHandler.java b/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/ShuffleInputEventHandler.java
deleted file mode 100644
index a8e5fe4..0000000
--- a/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/ShuffleInputEventHandler.java
+++ /dev/null
@@ -1,134 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tez.engine.common.shuffle.impl;
-
-import java.net.URI;
-import java.util.List;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.tez.dag.api.TezUncheckedException;
-import org.apache.tez.engine.api.Event;
-import org.apache.tez.engine.api.TezInputContext;
-import org.apache.tez.engine.api.events.DataMovementEvent;
-import org.apache.tez.engine.api.events.InputFailedEvent;
-import org.apache.tez.engine.api.events.InputInformationEvent;
-import org.apache.tez.engine.common.InputAttemptIdentifier;
-import org.apache.tez.engine.common.shuffle.newimpl.ShuffleUserPayloads.DataMovementEventPayloadProto;
-import org.apache.tez.engine.common.shuffle.newimpl.ShuffleUserPayloads.InputInformationEventPayloadProto;
-
-import com.google.common.base.Preconditions;
-import com.google.protobuf.InvalidProtocolBufferException;
-
-/**
- * Dispatches incoming {@link Event}s to the shuffle: {@link InputInformationEvent}s
- * set the partition range on the {@link Shuffle}, {@link DataMovementEvent}s register
- * a known map output with the {@link ShuffleScheduler}, and {@link InputFailedEvent}s
- * mark a source attempt as obsolete. Other event types are ignored.
- */
-public class ShuffleInputEventHandler {
-
-  private static final Log LOG = LogFactory.getLog(ShuffleInputEventHandler.class);
-
-  private final TezInputContext inputContext;
-  private final Shuffle shuffle;
-  private final ShuffleScheduler scheduler;
-
-  // Largest run duration seen so far in any DataMovementEvent payload.
-  private int maxMapRuntime = 0;
-  // Set once an InputInformationEvent has been processed; currently only written.
-  private boolean shuffleRangeSet = false;
-
-  /**
-   * @param inputContext context used to obtain the application id for fetch URLs
-   * @param shuffle shuffle instance that receives the partition range
-   * @param scheduler scheduler that is told about known and obsolete map outputs
-   */
-  public ShuffleInputEventHandler(TezInputContext inputContext,
-      Shuffle shuffle, ShuffleScheduler scheduler) {
-    this.scheduler = scheduler;
-    this.shuffle = shuffle;
-    this.inputContext = inputContext;
-  }
-
-  /**
-   * Handles every event in {@code events}, in iteration order.
-   *
-   * @param events events to process
-   */
-  public void handleEvents(List<Event> events) {
-    for (Event current : events) {
-      handleEvent(current);
-    }
-  }
-
-  /** Routes a single event to the handler for its concrete type. */
-  private void handleEvent(Event event) {
-    if (event instanceof InputInformationEvent) {
-      processInputInformationEvent((InputInformationEvent) event);
-      return;
-    }
-    if (event instanceof DataMovementEvent) {
-      processDataMovementEvent((DataMovementEvent) event);
-      return;
-    }
-    if (event instanceof InputFailedEvent) {
-      processTaskFailedEvent((InputFailedEvent) event);
-    }
-  }
-
-  /** Parses the event payload and forwards its partition range to the shuffle. */
-  private void processInputInformationEvent(InputInformationEvent iiEvent) {
-    final InputInformationEventPayloadProto payload;
-    try {
-      payload = InputInformationEventPayloadProto.parseFrom(iiEvent.getUserPayload());
-    } catch (InvalidProtocolBufferException e) {
-      throw new TezUncheckedException("Unable to parse InputInformationEvent payload", e);
-    }
-    shuffle.setPartitionRange(payload.getPartitionRange());
-    shuffleRangeSet = true;
-  }
-
-  /**
-   * Parses the event payload, registers the described map output with the
-   * scheduler, and reports a new maximum run duration when one is seen.
-   */
-  private void processDataMovementEvent(DataMovementEvent dmEvent) {
-    // FIXME TODO NEWTEZ: the following precondition is disabled.
-    // Preconditions.checkState(shuffleRangeSet == true, "Shuffle Range must be set before a DataMovementEvent is processed");
-    final DataMovementEventPayloadProto payload;
-    try {
-      payload = DataMovementEventPayloadProto.parseFrom(dmEvent.getUserPayload());
-    } catch (InvalidProtocolBufferException e) {
-      throw new TezUncheckedException("Unable to parse DataMovementEvent payload", e);
-    }
-
-    final int partitionId = dmEvent.getSourceIndex();
-    final String host = payload.getHost();
-    final URI baseUri = getBaseURI(host, payload.getPort(), partitionId);
-    final InputAttemptIdentifier srcAttemptIdentifier = new InputAttemptIdentifier(
-        dmEvent.getTargetIndex(), dmEvent.getVersion(), payload.getPathComponent());
-    scheduler.addKnownMapOutput(host, partitionId, baseUri.toString(), srcAttemptIdentifier);
-
-    // TODO NEWTEZ See if this duration hack can be removed.
-    final int duration = payload.getRunDuration();
-    if (duration > maxMapRuntime) {
-      maxMapRuntime = duration;
-      scheduler.informMaxMapRunTime(maxMapRuntime);
-    }
-  }
-
-  /** Tells the scheduler that the output of the failed source attempt is obsolete. */
-  private void processTaskFailedEvent(InputFailedEvent ifEvent) {
-    InputAttemptIdentifier failedAttempt =
-        new InputAttemptIdentifier(ifEvent.getSourceIndex(), ifEvent.getVersion());
-    scheduler.obsoleteMapOutput(failedAttempt);
-    LOG.info("Obsoleting output of src-task: " + failedAttempt);
-  }
-
-  /**
-   * Builds the fetch URL prefix for a host; the URL ends with {@code &map=} and
-   * carries no map ids itself.
-   */
-  // TODO NEWTEZ Handle encrypted shuffle
-  private URI getBaseURI(String host, int port, int partitionId) {
-    // Required to use the existing ShuffleHandler
-    String jobId = inputContext.getApplicationId().toString().replace("application", "job");
-    StringBuilder url = new StringBuilder("http://")
-        .append(host)
-        .append(":")
-        .append(port)
-        .append("/")
-        .append("mapOutput?job=")
-        .append(jobId)
-        .append("&reduce=")
-        .append(partitionId)
-        .append("&map=");
-    return URI.create(url.toString());
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b212ca1d/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/ShuffleScheduler.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/ShuffleScheduler.java b/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/ShuffleScheduler.java
deleted file mode 100644
index be75668..0000000
--- a/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/ShuffleScheduler.java
+++ /dev/null
@@ -1,521 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.tez.engine.common.shuffle.impl;
-
-import java.io.IOException;
-import java.text.DecimalFormat;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Random;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.DelayQueue;
-import java.util.concurrent.Delayed;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.commons.lang.mutable.MutableInt;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.io.IntWritable;
-import org.apache.tez.common.TezJobConfig;
-import org.apache.tez.common.counters.TezCounter;
-import org.apache.tez.engine.api.Event;
-import org.apache.tez.engine.api.TezInputContext;
-import org.apache.tez.engine.api.events.InputReadErrorEvent;
-import org.apache.tez.engine.common.InputAttemptIdentifier;
-import org.apache.tez.engine.common.TezEngineUtils;
-
-import com.google.common.collect.Lists;
-
-/**
- * Bookkeeping for a shuffle: records which hosts have known map outputs, hands
- * pending hosts out to callers of {@link #getHost()}, counts completed and failed
- * copies, and puts hosts that failed a copy on a timed penalty list that is drained
- * by the {@link Referee} thread.
- */
-class ShuffleScheduler {
-  // Per-thread timestamp of the most recent getHost() call; read by freeHost().
-  static ThreadLocal<Long> shuffleStart = new ThreadLocal<Long>() {
-    @Override
-    protected Long initialValue() {
-      return 0L;
-    }
-  };
-
-  private static final Log LOG = LogFactory.getLog(ShuffleScheduler.class);
-  private static final int MAX_MAPS_AT_ONCE = 20;
-  private static final long INITIAL_PENALTY = 10000;
-  private static final float PENALTY_GROWTH_RATE = 1.3f;
-
-  // TODO NEWTEZ May need to be a string if attempting to fetch from multiple inputs.
-  private final Map<Integer, MutableInt> finishedMaps;
-  private final int numInputs;
-  private int remainingMaps;
-  // Keyed by the host identifier built by MapHost.createIdentifier(hostName, partitionId),
-  // so that all map outputs for the same host/partition share one MapHost.
-  private Map<String, MapHost> mapLocations = new HashMap<String, MapHost>();
-  //TODO NEWTEZ Clean this and other maps at some point
-  private ConcurrentMap<String, InputAttemptIdentifier> pathToIdentifierMap = new ConcurrentHashMap<String, InputAttemptIdentifier>();
-  private Set<MapHost> pendingHosts = new HashSet<MapHost>();
-  private Set<InputAttemptIdentifier> obsoleteMaps = new HashSet<InputAttemptIdentifier>();
-
-  private final Random random = new Random(System.currentTimeMillis());
-  private final DelayQueue<Penalty> penalties = new DelayQueue<Penalty>();
-  private final Referee referee = new Referee();
-  private final Map<InputAttemptIdentifier, IntWritable> failureCounts =
-    new HashMap<InputAttemptIdentifier,IntWritable>();
-  private final Map<String,IntWritable> hostFailures =
-    new HashMap<String,IntWritable>();
-  private final TezInputContext inputContext;
-  private final Shuffle shuffle;
-  private final int abortFailureLimit;
-  private final TezCounter shuffledMapsCounter;
-  private final TezCounter reduceShuffleBytes;
-  private final TezCounter failedShuffleCounter;
-
-  private final long startTime;
-  private long lastProgressTime;
-
-  private int maxMapRuntime = 0;
-  private int maxFailedUniqueFetches = 5;
-  private int maxFetchFailuresBeforeReporting;
-
-  private long totalBytesShuffledTillNow = 0;
-  private DecimalFormat  mbpsFormat = new DecimalFormat("0.00");
-
-  private boolean reportReadErrorImmediately = true;
-
-  /**
-   * Creates the scheduler and starts its penalty {@link Referee} thread.
-   *
-   * @param inputContext context used for the source vertex name and for sending events
-   * @param conf configuration supplying the fetch-failure reporting settings
-   * @param tasksInDegree number of inputs that must finish before the shuffle is done
-   * @param shuffle owner that receives reported exceptions and supplies the reduce range
-   * @param shuffledMapsCounter incremented once per completed source task
-   * @param reduceShuffleBytes incremented by the byte count of each committed copy
-   * @param failedShuffleCounter incremented once per failed copy
-   */
-  public ShuffleScheduler(TezInputContext inputContext,
-                          Configuration conf,
-                          int tasksInDegree,
-                          Shuffle shuffle,
-                          TezCounter shuffledMapsCounter,
-                          TezCounter reduceShuffleBytes,
-                          TezCounter failedShuffleCounter) {
-    this.inputContext = inputContext;
-    this.numInputs = tasksInDegree;
-    abortFailureLimit = Math.max(30, tasksInDegree / 10);
-    remainingMaps = tasksInDegree;
-  //TODO NEWTEZ May need to be a string or a more usable construct if attempting to fetch from multiple inputs. Define a taskId / taskAttemptId pair
-    finishedMaps = new HashMap<Integer, MutableInt>(remainingMaps);
-    this.shuffle = shuffle;
-    this.shuffledMapsCounter = shuffledMapsCounter;
-    this.reduceShuffleBytes = reduceShuffleBytes;
-    this.failedShuffleCounter = failedShuffleCounter;
-    this.startTime = System.currentTimeMillis();
-    this.lastProgressTime = startTime;
-    referee.start();
-    this.maxFailedUniqueFetches = Math.min(tasksInDegree,
-        this.maxFailedUniqueFetches);
-    this.maxFetchFailuresBeforeReporting =
-        conf.getInt(
-            TezJobConfig.TEZ_ENGINE_SHUFFLE_FETCH_FAILURES,
-            TezJobConfig.DEFAULT_TEZ_ENGINE_SHUFFLE_FETCH_FAILURES_LIMIT);
-    this.reportReadErrorImmediately =
-        conf.getBoolean(
-            TezJobConfig.TEZ_ENGINE_SHUFFLE_NOTIFY_READERROR,
-            TezJobConfig.DEFAULT_TEZ_ENGINE_SHUFFLE_NOTIFY_READERROR);
-  }
-
-  /**
-   * Records a successful copy: clears the failure counts for the attempt and host,
-   * commits the output if the source task is not already finished, and updates
-   * progress counters. Waiters are notified when the last remaining map completes.
-   *
-   * @param srcAttemptIdentifier the source attempt whose output was copied
-   * @param host the host the output was fetched from
-   * @param bytes number of bytes copied
-   * @param milis unused by this method
-   * @param output the fetched output, committed if still needed
-   * @throws IOException if committing the output fails
-   */
-  public synchronized void copySucceeded(InputAttemptIdentifier srcAttemptIdentifier,
-                                         MapHost host,
-                                         long bytes,
-                                         long milis,
-                                         MapOutput output
-                                         ) throws IOException {
-    // failureCounts is keyed by InputAttemptIdentifier; removing with a String key
-    // (as was done previously) never matched an entry.
-    failureCounts.remove(srcAttemptIdentifier);
-    hostFailures.remove(host.getHostName());
-
-    int srcTaskIndex = srcAttemptIdentifier.getInputIdentifier().getSrcTaskIndex();
-    if (!isFinishedTaskTrue(srcTaskIndex)) {
-      output.commit();
-      if(incrementTaskCopyAndCheckCompletion(srcTaskIndex)) {
-        shuffledMapsCounter.increment(1);
-        if (--remainingMaps == 0) {
-          notifyAll();
-        }
-      }
-
-      // update the status
-      lastProgressTime = System.currentTimeMillis();
-      totalBytesShuffledTillNow += bytes;
-      logProgress();
-      reduceShuffleBytes.increment(bytes);
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("src task: "
-            + TezEngineUtils.getTaskAttemptIdentifier(
-                inputContext.getSourceVertexName(), srcTaskIndex,
-                srcAttemptIdentifier.getAttemptNumber()) + " done");
-      }
-    }
-    // TODO NEWTEZ Should this be releasing the output, if not committed ? Possible memory leak in case of speculation.
-  }
-
-  /** Logs the number of finished inputs and the average transfer rate since start. */
-  private void logProgress() {
-    float mbs = (float) totalBytesShuffledTillNow / (1024 * 1024);
-    int mapsDone = numInputs - remainingMaps;
-    // +1 keeps the divisor non-zero during the first second.
-    long secsSinceStart = (System.currentTimeMillis() - startTime) / 1000 + 1;
-
-    float transferRate = mbs / secsSinceStart;
-    LOG.info("copy(" + mapsDone + " of " + numInputs + " at "
-        + mbpsFormat.format(transferRate) + " MB/s)");
-  }
-
-  /**
-   * Records a failed copy: penalizes the host, bumps the per-attempt and per-host
-   * failure counts, reports an exception to the shuffle once the attempt reaches
-   * {@code abortFailureLimit} failures, possibly sends a read-error event, checks
-   * overall health, and schedules the host's penalty with a delay that grows
-   * with the number of failures.
-   *
-   * @param srcAttempt the source attempt whose copy failed
-   * @param host the host the copy was attempted from
-   * @param readError whether the failure was a read error
-   */
-  public synchronized void copyFailed(InputAttemptIdentifier srcAttempt,
-                                      MapHost host,
-                                      boolean readError) {
-    host.penalize();
-    int failures = 1;
-    if (failureCounts.containsKey(srcAttempt)) {
-      IntWritable x = failureCounts.get(srcAttempt);
-      x.set(x.get() + 1);
-      failures = x.get();
-    } else {
-      failureCounts.put(srcAttempt, new IntWritable(1));
-    }
-    String hostname = host.getHostName();
-    if (hostFailures.containsKey(hostname)) {
-      IntWritable x = hostFailures.get(hostname);
-      x.set(x.get() + 1);
-    } else {
-      hostFailures.put(hostname, new IntWritable(1));
-    }
-    if (failures >= abortFailureLimit) {
-      shuffle.reportException(new IOException(failures
-          + " failures downloading "
-          + TezEngineUtils.getTaskAttemptIdentifier(
-              inputContext.getSourceVertexName(), srcAttempt.getInputIdentifier().getSrcTaskIndex(),
-              srcAttempt.getAttemptNumber())));
-    }
-
-    checkAndInformJobTracker(failures, srcAttempt, readError);
-
-    checkReducerHealth();
-
-    long delay = (long) (INITIAL_PENALTY *
-        Math.pow(PENALTY_GROWTH_RATE, failures));
-
-    penalties.add(new Penalty(host, delay));
-
-    failedShuffleCounter.increment(1);
-  }
-
-  // Notify the JobTracker
-  // after every read error, if 'reportReadErrorImmediately' is true or
-  // after every 'maxFetchFailuresBeforeReporting' failures
-  private void checkAndInformJobTracker(
-      int failures, InputAttemptIdentifier srcAttempt, boolean readError) {
-    if ((reportReadErrorImmediately && readError)
-        || ((failures % maxFetchFailuresBeforeReporting) == 0)) {
-      // The same identifier string is used for the log line and the event message.
-      String attemptName = TezEngineUtils.getTaskAttemptIdentifier(
-          inputContext.getSourceVertexName(), srcAttempt.getInputIdentifier().getSrcTaskIndex(),
-          srcAttempt.getAttemptNumber());
-      LOG.info("Reporting fetch failure for " + attemptName + " to jobtracker.");
-
-      List<Event> failedEvents = Lists.newArrayListWithCapacity(1);
-      failedEvents.add(new InputReadErrorEvent("Fetch failure for "
-          + attemptName + " to jobtracker.", srcAttempt.getInputIdentifier()
-          .getSrcTaskIndex(), srcAttempt.getAttemptNumber()));
-
-      inputContext.sendEvents(failedEvents);
-      //status.addFailedDependency(mapId);
-    }
-  }
-
-  /**
-   * Reports a fatal exception to the shuffle when enough distinct attempts have
-   * failed, the failure ratio is too high, and the shuffle has either made too
-   * little progress or has been stalled for too long.
-   */
-  private void checkReducerHealth() {
-    final float MAX_ALLOWED_FAILED_FETCH_ATTEMPT_PERCENT = 0.5f;
-    final float MIN_REQUIRED_PROGRESS_PERCENT = 0.5f;
-    final float MAX_ALLOWED_STALL_TIME_PERCENT = 0.5f;
-
-    long totalFailures = failedShuffleCounter.getValue();
-    int doneMaps = numInputs - remainingMaps;
-
-    boolean reducerHealthy =
-      (((float)totalFailures / (totalFailures + doneMaps))
-          < MAX_ALLOWED_FAILED_FETCH_ATTEMPT_PERCENT);
-
-    // check if the reducer has progressed enough
-    boolean reducerProgressedEnough =
-      (((float)doneMaps / numInputs)
-          >= MIN_REQUIRED_PROGRESS_PERCENT);
-
-    // check if the reducer is stalled for a long time
-    // duration for which the reducer is stalled
-    int stallDuration =
-      (int)(System.currentTimeMillis() - lastProgressTime);
-
-    // duration for which the reducer ran with progress
-    int shuffleProgressDuration =
-      (int)(lastProgressTime - startTime);
-
-    // min time the reducer should run without getting killed
-    int minShuffleRunDuration =
-      (shuffleProgressDuration > maxMapRuntime)
-      ? shuffleProgressDuration
-          : maxMapRuntime;
-
-    boolean reducerStalled =
-      (((float)stallDuration / minShuffleRunDuration)
-          >= MAX_ALLOWED_STALL_TIME_PERCENT);
-
-    // kill if not healthy and has insufficient progress
-    if ((failureCounts.size() >= maxFailedUniqueFetches ||
-        failureCounts.size() == (numInputs - doneMaps))
-        && !reducerHealthy
-        && (!reducerProgressedEnough || reducerStalled)) {
-      LOG.fatal("Shuffle failed with too many fetch failures " +
-      "and insufficient progress!");
-      String errorMsg = "Exceeded MAX_FAILED_UNIQUE_FETCHES; bailing-out.";
-      shuffle.reportException(new IOException(errorMsg));
-    }
-
-  }
-
-  /**
-   * Marks a source task as finished without a copy and notifies waiters if it was
-   * the last remaining one. Does nothing if the task is already finished.
-   *
-   * @param srcTaskIndex index of the source task
-   */
-  public synchronized void tipFailed(int srcTaskIndex) {
-    if (!isFinishedTaskTrue(srcTaskIndex)) {
-      setFinishedTaskTrue(srcTaskIndex);
-      if (--remainingMaps == 0) {
-        notifyAll();
-      }
-      logProgress();
-    }
-  }
-
-  /**
-   * Registers a map output as available on a host. The {@link MapHost} for the
-   * host/partition pair is created on first use and reused afterwards; if the host
-   * is in the PENDING state it is added to the pending set and waiters are notified.
-   *
-   * @param hostName name of the host serving the output
-   * @param partitionId partition the output belongs to
-   * @param hostUrl base URL used to fetch from the host
-   * @param srcAttempt the source attempt that produced the output
-   */
-  public synchronized void addKnownMapOutput(String hostName,
-                                             int partitionId,
-                                             String hostUrl,
-                                             InputAttemptIdentifier srcAttempt) {
-    String identifier = MapHost.createIdentifier(hostName, partitionId);
-    MapHost host = mapLocations.get(identifier);
-    if (host == null) {
-      host = new MapHost(partitionId, hostName, hostUrl);
-      assert identifier.equals(host.getIdentifier());
-      // Store under the same key that is used for the lookup above.
-      mapLocations.put(identifier, host);
-    }
-    host.addKnownMap(srcAttempt);
-    pathToIdentifierMap.put(srcAttempt.getPathComponent(), srcAttempt);
-
-    // Mark the host as pending
-    if (host.getState() == MapHost.State.PENDING) {
-      pendingHosts.add(host);
-      notifyAll();
-    }
-  }
-
-  /**
-   * Marks the output of a source attempt as obsolete so that
-   * {@link #getMapsForHost(MapHost)} no longer hands it out.
-   *
-   * @param srcAttempt the attempt whose output is obsolete
-   */
-  public synchronized void obsoleteMapOutput(InputAttemptIdentifier srcAttempt) {
-    // The incoming srcAttempt does not contain a path component.
-    obsoleteMaps.add(srcAttempt);
-  }
-
-  /**
-   * Returns a map output to the host's list of known maps.
-   *
-   * @param host the host to return the output to
-   * @param srcAttempt the attempt to put back
-   */
-  public synchronized void putBackKnownMapOutput(MapHost host,
-                                                 InputAttemptIdentifier srcAttempt) {
-    host.addKnownMap(srcAttempt);
-  }
-
-  /**
-   * Blocks until a host is pending, then removes a randomly chosen pending host,
-   * marks it busy, and records the current time in {@link #shuffleStart} for the
-   * calling thread.
-   *
-   * @return the chosen host
-   * @throws InterruptedException if interrupted while waiting for a pending host
-   */
-  public synchronized MapHost getHost() throws InterruptedException {
-    while(pendingHosts.isEmpty()) {
-      wait();
-    }
-
-    MapHost host = null;
-    Iterator<MapHost> iter = pendingHosts.iterator();
-    int numToPick = random.nextInt(pendingHosts.size());
-    for (int i=0; i <= numToPick; ++i) {
-      host = iter.next();
-    }
-
-    pendingHosts.remove(host);
-    host.markBusy();
-
-    LOG.info("Assigning " + host + " with " + host.getNumKnownMapOutputs() +
-             " to " + Thread.currentThread().getName());
-    shuffleStart.set(System.currentTimeMillis());
-
-    return host;
-  }
-
-  /**
-   * @param pathComponent path component registered via {@code addKnownMapOutput}
-   * @return the attempt identifier registered for the path component, or {@code null} if none
-   */
-  public InputAttemptIdentifier getIdentifierForPathComponent(String pathComponent) {
-    return pathToIdentifierMap.get(pathComponent);
-  }
-
-  /**
-   * Takes the host's known maps and returns up to {@code MAX_MAPS_AT_ONCE} of them
-   * that are neither obsolete nor already finished. Still-needed maps beyond the
-   * limit are put back on the host.
-   *
-   * @param host the host whose known maps are drained
-   * @return the attempts to fetch from the host in this round
-   */
-  public synchronized List<InputAttemptIdentifier> getMapsForHost(MapHost host) {
-    List<InputAttemptIdentifier> list = host.getAndClearKnownMaps();
-    Iterator<InputAttemptIdentifier> itr = list.iterator();
-    List<InputAttemptIdentifier> result = new ArrayList<InputAttemptIdentifier>();
-    int includedMaps = 0;
-    int totalSize = list.size();
-    // find the maps that we still need, up to the limit
-    while (itr.hasNext()) {
-      InputAttemptIdentifier id = itr.next();
-      if (!obsoleteMaps.contains(id) && !isFinishedTaskTrue(id.getInputIdentifier().getSrcTaskIndex())) {
-        result.add(id);
-        if (++includedMaps >= MAX_MAPS_AT_ONCE) {
-          break;
-        }
-      }
-    }
-    // put back the maps left after the limit
-    while (itr.hasNext()) {
-      InputAttemptIdentifier id = itr.next();
-      if (!obsoleteMaps.contains(id) && !isFinishedTaskTrue(id.getInputIdentifier().getSrcTaskIndex())) {
-        host.addKnownMap(id);
-      }
-    }
-    LOG.info("assigned " + includedMaps + " of " + totalSize + " to " +
-             host + " to " + Thread.currentThread().getName());
-    return result;
-  }
-
-  /**
-   * Releases a host obtained from {@link #getHost()}. A host that is not penalized
-   * is marked available and, if that leaves it PENDING, re-queued. Logs the time
-   * elapsed since the calling thread's last {@code getHost()} call.
-   *
-   * @param host the host to release
-   */
-  public synchronized void freeHost(MapHost host) {
-    if (host.getState() != MapHost.State.PENALIZED) {
-      if (host.markAvailable() == MapHost.State.PENDING) {
-        pendingHosts.add(host);
-        notifyAll();
-      }
-    }
-    // The elapsed value is a difference of currentTimeMillis() readings, i.e. milliseconds.
-    LOG.info(host + " freed by " + Thread.currentThread().getName() + " in " +
-             (System.currentTimeMillis()-shuffleStart.get()) + "ms");
-  }
-
-  /** Clears the known host locations, obsolete maps, pending hosts and path lookups. */
-  public synchronized void resetKnownMaps() {
-    mapLocations.clear();
-    obsoleteMaps.clear();
-    pendingHosts.clear();
-    pathToIdentifierMap.clear();
-  }
-
-  /**
-   * Utility method to check if the Shuffle data fetch is complete.
-   * @return true if no maps remain to be fetched
-   */
-  public synchronized boolean isDone() {
-    return remainingMaps == 0;
-  }
-
-  /**
-   * Wait until the shuffle finishes or until the timeout.
-   * @param millis maximum wait time
-   * @return true if the shuffle is done
-   * @throws InterruptedException
-   */
-  public synchronized boolean waitUntilDone(int millis
-                                            ) throws InterruptedException {
-    if (remainingMaps > 0) {
-      // Single wait: any wake-up (notify, timeout or spurious) returns the current state.
-      wait(millis);
-      return remainingMaps == 0;
-    }
-    return true;
-  }
-
-  /**
-   * A structure that records the penalty for a host.
-   */
-  private static class Penalty implements Delayed {
-    final MapHost host;
-    private final long endTime;
-
-    Penalty(MapHost host, long delay) {
-      this.host = host;
-      this.endTime = System.currentTimeMillis() + delay;
-    }
-
-    @Override
-    public long getDelay(TimeUnit unit) {
-      long remainingTime = endTime - System.currentTimeMillis();
-      return unit.convert(remainingTime, TimeUnit.MILLISECONDS);
-    }
-
-    @Override
-    public int compareTo(Delayed o) {
-      long other = ((Penalty) o).endTime;
-      return endTime == other ? 0 : (endTime < other ? -1 : 1);
-    }
-
-  }
-
-  /**
-   * A thread that takes hosts off of the penalty list when the timer expires.
-   */
-  private class Referee extends Thread {
-    public Referee() {
-      setName("ShufflePenaltyReferee");
-      setDaemon(true);
-    }
-
-    @Override
-    public void run() {
-      try {
-        while (true) {
-          // take the first host that has an expired penalty
-          MapHost host = penalties.take().host;
-          synchronized (ShuffleScheduler.this) {
-            if (host.markAvailable() == MapHost.State.PENDING) {
-              pendingHosts.add(host);
-              ShuffleScheduler.this.notifyAll();
-            }
-          }
-        }
-      } catch (InterruptedException ie) {
-        // Interrupted by close(); the thread simply ends.
-        return;
-      } catch (Throwable t) {
-        shuffle.reportException(t);
-      }
-    }
-  }
-
-  /**
-   * Interrupts the referee thread and waits for it to exit.
-   *
-   * @throws InterruptedException if interrupted while waiting for the referee
-   */
-  public void close() throws InterruptedException {
-    referee.interrupt();
-    referee.join();
-  }
-
-  /**
-   * Raises the recorded maximum map run time if {@code duration} exceeds it.
-   *
-   * @param duration a map run duration
-   */
-  public synchronized void informMaxMapRunTime(int duration) {
-    if (duration > maxMapRuntime) {
-      maxMapRuntime = duration;
-    }
-  }
-
-  /** Forces the copy count of a source task to the reduce range, i.e. marks it finished. */
-  void setFinishedTaskTrue(int srcTaskIndex) {
-    synchronized(finishedMaps) {
-      finishedMaps.put(srcTaskIndex, new MutableInt(shuffle.getReduceRange()));
-    }
-  }
-
-  /**
-   * Increments the copy count of a source task, creating the counter on first use.
-   *
-   * @return true if the task's copy count now equals the reduce range
-   */
-  boolean incrementTaskCopyAndCheckCompletion(int srcTaskIndex) {
-    synchronized(finishedMaps) {
-      MutableInt result = finishedMaps.get(srcTaskIndex);
-      if(result == null) {
-        result = new MutableInt(0);
-        finishedMaps.put(srcTaskIndex, result);
-      }
-      result.increment();
-      return isFinishedTaskTrue(srcTaskIndex);
-    }
-  }
-
-  /**
-   * @return true if a copy count exists for the source task and equals the reduce range
-   */
-  boolean isFinishedTaskTrue(int srcTaskIndex) {
-    synchronized (finishedMaps) {
-      MutableInt result = finishedMaps.get(srcTaskIndex);
-      return result != null && result.intValue() == shuffle.getReduceRange();
-    }
-  }
-}