Posted to mapreduce-commits@hadoop.apache.org by sz...@apache.org on 2013/01/24 03:45:59 UTC

svn commit: r1437843 [2/3] - in /hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project: ./ conf/ dev-support/ hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/ hadoop-mapreduce-client/hadoop-mapreduce-clien...

Modified: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/MapOutput.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/MapOutput.java?rev=1437843&r1=1437842&r2=1437843&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/MapOutput.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/MapOutput.java Thu Jan 24 02:45:45 2013
@@ -17,119 +17,36 @@
  */
 package org.apache.hadoop.mapreduce.task.reduce;
 
+import java.io.InputStream;
 import java.io.IOException;
-import java.io.OutputStream;
+
 import java.util.Comparator;
 import java.util.concurrent.atomic.AtomicInteger;
 
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.LocalDirAllocator;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.BoundedByteArrayOutputStream;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.MapOutputFile;
+
+import org.apache.hadoop.mapred.Reporter;
+
 import org.apache.hadoop.mapreduce.TaskAttemptID;
 
 @InterfaceAudience.LimitedPrivate({"MapReduce"})
 @InterfaceStability.Unstable
-public class MapOutput<K,V> {
-  private static final Log LOG = LogFactory.getLog(MapOutput.class);
+public abstract class MapOutput<K, V> {
   private static AtomicInteger ID = new AtomicInteger(0);
   
-  public static enum Type {
-    WAIT,
-    MEMORY,
-    DISK
-  }
-  
   private final int id;
-  
-  private final MergeManager<K,V> merger;
   private final TaskAttemptID mapId;
-  
   private final long size;
-  
-  private final byte[] memory;
-  private BoundedByteArrayOutputStream byteStream;
-  
-  private final FileSystem localFS;
-  private final Path tmpOutputPath;
-  private final Path outputPath;
-  private final OutputStream disk; 
-  
-  private final Type type;
-  
   private final boolean primaryMapOutput;
   
-  public MapOutput(TaskAttemptID mapId, MergeManager<K,V> merger, long size, 
-            JobConf conf, LocalDirAllocator localDirAllocator,
-            int fetcher, boolean primaryMapOutput, MapOutputFile mapOutputFile)
-         throws IOException {
+  public MapOutput(TaskAttemptID mapId, long size, boolean primaryMapOutput) {
     this.id = ID.incrementAndGet();
     this.mapId = mapId;
-    this.merger = merger;
-
-    type = Type.DISK;
-
-    memory = null;
-    byteStream = null;
-
     this.size = size;
-    
-    this.localFS = FileSystem.getLocal(conf);
-    outputPath =
-      mapOutputFile.getInputFileForWrite(mapId.getTaskID(),size);
-    tmpOutputPath = outputPath.suffix(String.valueOf(fetcher));
-
-    disk = localFS.create(tmpOutputPath);
-    
     this.primaryMapOutput = primaryMapOutput;
   }
   
-  public MapOutput(TaskAttemptID mapId, MergeManager<K,V> merger, int size, 
-            boolean primaryMapOutput) {
-    this.id = ID.incrementAndGet();
-    this.mapId = mapId;
-    this.merger = merger;
-
-    type = Type.MEMORY;
-    byteStream = new BoundedByteArrayOutputStream(size);
-    memory = byteStream.getBuffer();
-
-    this.size = size;
-    
-    localFS = null;
-    disk = null;
-    outputPath = null;
-    tmpOutputPath = null;
-    
-    this.primaryMapOutput = primaryMapOutput;
-  }
-
-  public MapOutput(TaskAttemptID mapId) {
-    this.id = ID.incrementAndGet();
-    this.mapId = mapId;
-    
-    type = Type.WAIT;
-    merger = null;
-    memory = null;
-    byteStream = null;
-    
-    size = -1;
-    
-    localFS = null;
-    disk = null;
-    outputPath = null;
-    tmpOutputPath = null;
-
-    this.primaryMapOutput = false;
-}
-  
   public boolean isPrimaryMapOutput() {
     return primaryMapOutput;
   }
@@ -147,62 +64,28 @@ public class MapOutput<K,V> {
     return id;
   }
 
-  public Path getOutputPath() {
-    return outputPath;
-  }
-
-  public byte[] getMemory() {
-    return memory;
-  }
-
-  public BoundedByteArrayOutputStream getArrayStream() {
-    return byteStream;
-  }
-  
-  public OutputStream getDisk() {
-    return disk;
-  }
-
   public TaskAttemptID getMapId() {
     return mapId;
   }
 
-  public Type getType() {
-    return type;
-  }
-
   public long getSize() {
     return size;
   }
 
-  public void commit() throws IOException {
-    if (type == Type.MEMORY) {
-      merger.closeInMemoryFile(this);
-    } else if (type == Type.DISK) {
-      localFS.rename(tmpOutputPath, outputPath);
-      merger.closeOnDiskFile(outputPath);
-    } else {
-      throw new IOException("Cannot commit MapOutput of type WAIT!");
-    }
-  }
-  
-  public void abort() {
-    if (type == Type.MEMORY) {
-      merger.unreserve(memory.length);
-    } else if (type == Type.DISK) {
-      try {
-        localFS.delete(tmpOutputPath, false);
-      } catch (IOException ie) {
-        LOG.info("failure to clean up " + tmpOutputPath, ie);
-      }
-    } else {
-      throw new IllegalArgumentException
-                   ("Cannot commit MapOutput with of type WAIT!");
-    }
-  }
+  public abstract void shuffle(MapHost host, InputStream input,
+                               long compressedLength,
+                               long decompressedLength,
+                               ShuffleClientMetrics metrics,
+                               Reporter reporter) throws IOException;
+
+  public abstract void commit() throws IOException;
   
+  public abstract void abort();
+
+  public abstract String getDescription();
+
   public String toString() {
-    return "MapOutput(" + mapId + ", " + type + ")";
+    return "MapOutput(" + mapId + ", " + getDescription() + ")";
   }
   
   public static class MapOutputComparator<K, V> 
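
For context on the new contract: the sketch below shows what a concrete subclass of the now-abstract MapOutput might look like. It is not part of this commit; the DiscardingMapOutput class and its stream-draining behaviour are hypothetical, and only the constructor and abstract method signatures are taken from the diff above.

    package org.apache.hadoop.mapreduce.task.reduce;

    import java.io.IOException;
    import java.io.InputStream;

    import org.apache.hadoop.mapred.Reporter;
    import org.apache.hadoop.mapreduce.TaskAttemptID;

    // Hypothetical subclass: consumes and discards the fetched map output,
    // shown only to illustrate the contract of the new abstract class.
    class DiscardingMapOutput<K, V> extends MapOutput<K, V> {

      public DiscardingMapOutput(TaskAttemptID mapId, long size,
                                 boolean primaryMapOutput) {
        super(mapId, size, primaryMapOutput);
      }

      @Override
      public void shuffle(MapHost host, InputStream input,
                          long compressedLength, long decompressedLength,
                          ShuffleClientMetrics metrics,
                          Reporter reporter) throws IOException {
        // Drain the map output from the stream without retaining it.
        byte[] buf = new byte[64 * 1024];
        long remaining = compressedLength;
        while (remaining > 0) {
          int read = input.read(buf, 0, (int) Math.min(buf.length, remaining));
          if (read < 0) {
            throw new IOException("Premature EOF from " + host);
          }
          remaining -= read;
        }
      }

      @Override
      public void commit() throws IOException {
        // Nothing was retained, so there is nothing to hand to the merger.
      }

      @Override
      public void abort() {
        // No resources to release.
      }

      @Override
      public String getDescription() {
        return "DISCARD";
      }
    }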

Modified: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/MergeManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/MergeManager.java?rev=1437843&r1=1437842&r2=1437843&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/MergeManager.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/MergeManager.java Thu Jan 24 02:45:45 2013
@@ -15,783 +15,56 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.hadoop.mapreduce.task.reduce;
 
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.List;
-import java.util.Set;
-import java.util.TreeSet;
+package org.apache.hadoop.mapreduce.task.reduce;
 
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
-import org.apache.hadoop.fs.ChecksumFileSystem;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.LocalDirAllocator;
-import org.apache.hadoop.fs.LocalFileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.DataInputBuffer;
-import org.apache.hadoop.io.RawComparator;
 import org.apache.hadoop.io.compress.CompressionCodec;
 import org.apache.hadoop.mapred.Counters;
-import org.apache.hadoop.mapred.IFile;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.MapOutputFile;
-import org.apache.hadoop.mapred.Merger;
 import org.apache.hadoop.mapred.RawKeyValueIterator;
 import org.apache.hadoop.mapred.Reducer;
 import org.apache.hadoop.mapred.Reporter;
-import org.apache.hadoop.mapred.Task;
-import org.apache.hadoop.mapred.IFile.Reader;
-import org.apache.hadoop.mapred.IFile.Writer;
-import org.apache.hadoop.mapred.Merger.Segment;
 import org.apache.hadoop.mapred.Task.CombineOutputCollector;
-import org.apache.hadoop.mapred.Task.CombineValuesIterator;
-import org.apache.hadoop.mapreduce.MRJobConfig;
 import org.apache.hadoop.mapreduce.TaskAttemptID;
-import org.apache.hadoop.mapreduce.TaskID;
-import org.apache.hadoop.mapreduce.task.reduce.MapOutput.MapOutputComparator;
 import org.apache.hadoop.util.Progress;
-import org.apache.hadoop.util.ReflectionUtils;
 
-import com.google.common.annotations.VisibleForTesting;
+import java.io.IOException;
 
-@SuppressWarnings(value={"unchecked"})
-@InterfaceAudience.LimitedPrivate({"MapReduce"})
+/**
+ * An interface for a reduce side merge that works with the default Shuffle
+ * implementation.
+ */
+@InterfaceAudience.Private
 @InterfaceStability.Unstable
-public class MergeManager<K, V> {
-  
-  private static final Log LOG = LogFactory.getLog(MergeManager.class);
-  
-  /* Maximum percentage of the in-memory limit that a single shuffle can 
-   * consume*/ 
-  private static final float DEFAULT_SHUFFLE_MEMORY_LIMIT_PERCENT
-    = 0.25f;
-
-  private final TaskAttemptID reduceId;
-  
-  private final JobConf jobConf;
-  private final FileSystem localFS;
-  private final FileSystem rfs;
-  private final LocalDirAllocator localDirAllocator;
-  
-  protected MapOutputFile mapOutputFile;
-  
-  Set<MapOutput<K, V>> inMemoryMergedMapOutputs = 
-    new TreeSet<MapOutput<K,V>>(new MapOutputComparator<K, V>());
-  private final IntermediateMemoryToMemoryMerger memToMemMerger;
-
-  Set<MapOutput<K, V>> inMemoryMapOutputs = 
-    new TreeSet<MapOutput<K,V>>(new MapOutputComparator<K, V>());
-  private final MergeThread<MapOutput<K,V>, K,V> inMemoryMerger;
-  
-  Set<Path> onDiskMapOutputs = new TreeSet<Path>();
-  private final OnDiskMerger onDiskMerger;
-  
-  private final long memoryLimit;
-  private long usedMemory;
-  private long commitMemory;
-  private final long maxSingleShuffleLimit;
-  
-  private final int memToMemMergeOutputsThreshold; 
-  private final long mergeThreshold;
-  
-  private final int ioSortFactor;
-
-  private final Reporter reporter;
-  private final ExceptionReporter exceptionReporter;
-  
+public interface MergeManager<K, V> {
   /**
-   * Combiner class to run during in-memory merge, if defined.
+   * To wait until merge has some freed resources available so that it can
+   * accept shuffled data.  This will be called before a network connection is
+   * established to get the map output.
    */
-  private final Class<? extends Reducer> combinerClass;
+  public void waitForResource() throws InterruptedException;
 
   /**
-   * Resettable collector used for combine.
+   * To reserve resources for data to be shuffled.  This will be called after
+   * a network connection is made to shuffle the data.
+   * @param mapId mapper from which data will be shuffled.
+   * @param requestedSize size in bytes of data that will be shuffled.
+   * @param fetcher id of the map output fetcher that will shuffle the data.
+   * @return a MapOutput object that can be used by shuffle to shuffle data.  If
+   * required resources cannot be reserved immediately, a null can be returned.
    */
-  private final CombineOutputCollector<K,V> combineCollector;
-
-  private final Counters.Counter spilledRecordsCounter;
-
-  private final Counters.Counter reduceCombineInputCounter;
-
-  private final Counters.Counter mergedMapOutputsCounter;
-  
-  private final CompressionCodec codec;
-  
-  private final Progress mergePhase;
-
-  public MergeManager(TaskAttemptID reduceId, JobConf jobConf, 
-                      FileSystem localFS,
-                      LocalDirAllocator localDirAllocator,  
-                      Reporter reporter,
-                      CompressionCodec codec,
-                      Class<? extends Reducer> combinerClass,
-                      CombineOutputCollector<K,V> combineCollector,
-                      Counters.Counter spilledRecordsCounter,
-                      Counters.Counter reduceCombineInputCounter,
-                      Counters.Counter mergedMapOutputsCounter,
-                      ExceptionReporter exceptionReporter,
-                      Progress mergePhase, MapOutputFile mapOutputFile) {
-    this.reduceId = reduceId;
-    this.jobConf = jobConf;
-    this.localDirAllocator = localDirAllocator;
-    this.exceptionReporter = exceptionReporter;
-    
-    this.reporter = reporter;
-    this.codec = codec;
-    this.combinerClass = combinerClass;
-    this.combineCollector = combineCollector;
-    this.reduceCombineInputCounter = reduceCombineInputCounter;
-    this.spilledRecordsCounter = spilledRecordsCounter;
-    this.mergedMapOutputsCounter = mergedMapOutputsCounter;
-    this.mapOutputFile = mapOutputFile;
-    this.mapOutputFile.setConf(jobConf);
-    
-    this.localFS = localFS;
-    this.rfs = ((LocalFileSystem)localFS).getRaw();
-    
-    final float maxInMemCopyUse =
-      jobConf.getFloat(MRJobConfig.SHUFFLE_INPUT_BUFFER_PERCENT, 0.90f);
-    if (maxInMemCopyUse > 1.0 || maxInMemCopyUse < 0.0) {
-      throw new IllegalArgumentException("Invalid value for " +
-          MRJobConfig.SHUFFLE_INPUT_BUFFER_PERCENT + ": " +
-          maxInMemCopyUse);
-    }
-
-    // Allow unit tests to fix Runtime memory
-    this.memoryLimit = 
-      (long)(jobConf.getLong(MRJobConfig.REDUCE_MEMORY_TOTAL_BYTES,
-          Math.min(Runtime.getRuntime().maxMemory(), Integer.MAX_VALUE))
-        * maxInMemCopyUse);
- 
-    this.ioSortFactor = jobConf.getInt(MRJobConfig.IO_SORT_FACTOR, 100);
+  public MapOutput<K, V> reserve(TaskAttemptID mapId, long requestedSize,
+                                 int fetcher) throws IOException;
 
-    final float singleShuffleMemoryLimitPercent =
-        jobConf.getFloat(MRJobConfig.SHUFFLE_MEMORY_LIMIT_PERCENT,
-            DEFAULT_SHUFFLE_MEMORY_LIMIT_PERCENT);
-    if (singleShuffleMemoryLimitPercent <= 0.0f
-        || singleShuffleMemoryLimitPercent > 1.0f) {
-      throw new IllegalArgumentException("Invalid value for "
-          + MRJobConfig.SHUFFLE_MEMORY_LIMIT_PERCENT + ": "
-          + singleShuffleMemoryLimitPercent);
-    }
-
-    usedMemory = 0L;
-    commitMemory = 0L;
-    this.maxSingleShuffleLimit = 
-      (long)(memoryLimit * singleShuffleMemoryLimitPercent);
-    this.memToMemMergeOutputsThreshold = 
-            jobConf.getInt(MRJobConfig.REDUCE_MEMTOMEM_THRESHOLD, ioSortFactor);
-    this.mergeThreshold = (long)(this.memoryLimit * 
-                          jobConf.getFloat(MRJobConfig.SHUFFLE_MERGE_PERCENT, 
-                                           0.90f));
-    LOG.info("MergerManager: memoryLimit=" + memoryLimit + ", " +
-             "maxSingleShuffleLimit=" + maxSingleShuffleLimit + ", " +
-             "mergeThreshold=" + mergeThreshold + ", " + 
-             "ioSortFactor=" + ioSortFactor + ", " +
-             "memToMemMergeOutputsThreshold=" + memToMemMergeOutputsThreshold);
-
-    if (this.maxSingleShuffleLimit >= this.mergeThreshold) {
-      throw new RuntimeException("Invlaid configuration: "
-          + "maxSingleShuffleLimit should be less than mergeThreshold"
-          + "maxSingleShuffleLimit: " + this.maxSingleShuffleLimit
-          + "mergeThreshold: " + this.mergeThreshold);
-    }
-
-    boolean allowMemToMemMerge = 
-      jobConf.getBoolean(MRJobConfig.REDUCE_MEMTOMEM_ENABLED, false);
-    if (allowMemToMemMerge) {
-      this.memToMemMerger = 
-        new IntermediateMemoryToMemoryMerger(this,
-                                             memToMemMergeOutputsThreshold);
-      this.memToMemMerger.start();
-    } else {
-      this.memToMemMerger = null;
-    }
-    
-    this.inMemoryMerger = createInMemoryMerger();
-    this.inMemoryMerger.start();
-    
-    this.onDiskMerger = new OnDiskMerger(this);
-    this.onDiskMerger.start();
-    
-    this.mergePhase = mergePhase;
-  }
-  
-  protected MergeThread<MapOutput<K,V>, K,V> createInMemoryMerger() {
-    return new InMemoryMerger(this);
-  }
-
-  TaskAttemptID getReduceId() {
-    return reduceId;
-  }
-
-  @VisibleForTesting
-  ExceptionReporter getExceptionReporter() {
-    return exceptionReporter;
-  }
-
-  public void waitForInMemoryMerge() throws InterruptedException {
-    inMemoryMerger.waitForMerge();
-  }
-  
-  private boolean canShuffleToMemory(long requestedSize) {
-    return (requestedSize < maxSingleShuffleLimit); 
-  }
-  
-  final private MapOutput<K,V> stallShuffle = new MapOutput<K,V>(null);
-
-  public synchronized MapOutput<K,V> reserve(TaskAttemptID mapId, 
-                                             long requestedSize,
-                                             int fetcher
-                                             ) throws IOException {
-    if (!canShuffleToMemory(requestedSize)) {
-      LOG.info(mapId + ": Shuffling to disk since " + requestedSize + 
-               " is greater than maxSingleShuffleLimit (" + 
-               maxSingleShuffleLimit + ")");
-      return new MapOutput<K,V>(mapId, this, requestedSize, jobConf, 
-                                localDirAllocator, fetcher, true,
-                                mapOutputFile);
-    }
-    
-    // Stall shuffle if we are above the memory limit
-
-    // It is possible that all threads could just be stalling and not make
-    // progress at all. This could happen when:
-    //
-    // requested size is causing the used memory to go above limit &&
-    // requested size < singleShuffleLimit &&
-    // current used size < mergeThreshold (merge will not get triggered)
-    //
-    // To avoid this from happening, we allow exactly one thread to go past
-    // the memory limit. We check (usedMemory > memoryLimit) and not
-    // (usedMemory + requestedSize > memoryLimit). When this thread is done
-    // fetching, this will automatically trigger a merge thereby unlocking
-    // all the stalled threads
-    
-    if (usedMemory > memoryLimit) {
-      LOG.debug(mapId + ": Stalling shuffle since usedMemory (" + usedMemory
-          + ") is greater than memoryLimit (" + memoryLimit + ")." + 
-          " CommitMemory is (" + commitMemory + ")"); 
-      return stallShuffle;
-    }
-    
-    // Allow the in-memory shuffle to progress
-    LOG.debug(mapId + ": Proceeding with shuffle since usedMemory ("
-        + usedMemory + ") is lesser than memoryLimit (" + memoryLimit + ")."
-        + "CommitMemory is (" + commitMemory + ")"); 
-    return unconditionalReserve(mapId, requestedSize, true);
-  }
-  
   /**
-   * Unconditional Reserve is used by the Memory-to-Memory thread
-   * @return
+   * Called at the end of shuffle.
+   * @return a key value iterator object.
    */
-  private synchronized MapOutput<K, V> unconditionalReserve(
-      TaskAttemptID mapId, long requestedSize, boolean primaryMapOutput) {
-    usedMemory += requestedSize;
-    return new MapOutput<K,V>(mapId, this, (int)requestedSize, 
-        primaryMapOutput);
-  }
-  
-  synchronized void unreserve(long size) {
-    usedMemory -= size;
-  }
-
-  public synchronized void closeInMemoryFile(MapOutput<K,V> mapOutput) { 
-    inMemoryMapOutputs.add(mapOutput);
-    LOG.info("closeInMemoryFile -> map-output of size: " + mapOutput.getSize()
-        + ", inMemoryMapOutputs.size() -> " + inMemoryMapOutputs.size()
-        + ", commitMemory -> " + commitMemory + ", usedMemory ->" + usedMemory);
-
-    commitMemory+= mapOutput.getSize();
-
-    // Can hang if mergeThreshold is really low.
-    if (commitMemory >= mergeThreshold) {
-      LOG.info("Starting inMemoryMerger's merge since commitMemory=" +
-          commitMemory + " > mergeThreshold=" + mergeThreshold + 
-          ". Current usedMemory=" + usedMemory);
-      inMemoryMapOutputs.addAll(inMemoryMergedMapOutputs);
-      inMemoryMergedMapOutputs.clear();
-      inMemoryMerger.startMerge(inMemoryMapOutputs);
-      commitMemory = 0L;  // Reset commitMemory.
-    }
-    
-    if (memToMemMerger != null) {
-      if (inMemoryMapOutputs.size() >= memToMemMergeOutputsThreshold) { 
-        memToMemMerger.startMerge(inMemoryMapOutputs);
-      }
-    }
-  }
-  
-  
-  public synchronized void closeInMemoryMergedFile(MapOutput<K,V> mapOutput) {
-    inMemoryMergedMapOutputs.add(mapOutput);
-    LOG.info("closeInMemoryMergedFile -> size: " + mapOutput.getSize() + 
-             ", inMemoryMergedMapOutputs.size() -> " + 
-             inMemoryMergedMapOutputs.size());
-  }
-  
-  public synchronized void closeOnDiskFile(Path file) {
-    onDiskMapOutputs.add(file);
-    
-    if (onDiskMapOutputs.size() >= (2 * ioSortFactor - 1)) {
-      onDiskMerger.startMerge(onDiskMapOutputs);
-    }
-  }
-  
-  public RawKeyValueIterator close() throws Throwable {
-    // Wait for on-going merges to complete
-    if (memToMemMerger != null) { 
-      memToMemMerger.close();
-    }
-    inMemoryMerger.close();
-    onDiskMerger.close();
-    
-    List<MapOutput<K, V>> memory = 
-      new ArrayList<MapOutput<K, V>>(inMemoryMergedMapOutputs);
-    memory.addAll(inMemoryMapOutputs);
-    List<Path> disk = new ArrayList<Path>(onDiskMapOutputs);
-    return finalMerge(jobConf, rfs, memory, disk);
-  }
-   
-  private class IntermediateMemoryToMemoryMerger 
-  extends MergeThread<MapOutput<K, V>, K, V> {
-    
-    public IntermediateMemoryToMemoryMerger(MergeManager<K, V> manager, 
-                                            int mergeFactor) {
-      super(manager, mergeFactor, exceptionReporter);
-      setName("InMemoryMerger - Thread to do in-memory merge of in-memory " +
-      		    "shuffled map-outputs");
-      setDaemon(true);
-    }
-
-    @Override
-    public void merge(List<MapOutput<K, V>> inputs) throws IOException {
-      if (inputs == null || inputs.size() == 0) {
-        return;
-      }
-
-      TaskAttemptID dummyMapId = inputs.get(0).getMapId(); 
-      List<Segment<K, V>> inMemorySegments = new ArrayList<Segment<K, V>>();
-      long mergeOutputSize = 
-        createInMemorySegments(inputs, inMemorySegments, 0);
-      int noInMemorySegments = inMemorySegments.size();
-      
-      MapOutput<K, V> mergedMapOutputs = 
-        unconditionalReserve(dummyMapId, mergeOutputSize, false);
-      
-      Writer<K, V> writer = 
-        new InMemoryWriter<K, V>(mergedMapOutputs.getArrayStream());
-      
-      LOG.info("Initiating Memory-to-Memory merge with " + noInMemorySegments +
-               " segments of total-size: " + mergeOutputSize);
-
-      RawKeyValueIterator rIter = 
-        Merger.merge(jobConf, rfs,
-                     (Class<K>)jobConf.getMapOutputKeyClass(),
-                     (Class<V>)jobConf.getMapOutputValueClass(),
-                     inMemorySegments, inMemorySegments.size(),
-                     new Path(reduceId.toString()),
-                     (RawComparator<K>)jobConf.getOutputKeyComparator(),
-                     reporter, null, null, null);
-      Merger.writeFile(rIter, writer, reporter, jobConf);
-      writer.close();
-
-      LOG.info(reduceId +  
-               " Memory-to-Memory merge of the " + noInMemorySegments +
-               " files in-memory complete.");
-
-      // Note the output of the merge
-      closeInMemoryMergedFile(mergedMapOutputs);
-    }
-  }
-  
-  private class InMemoryMerger extends MergeThread<MapOutput<K,V>, K,V> {
-    
-    public InMemoryMerger(MergeManager<K, V> manager) {
-      super(manager, Integer.MAX_VALUE, exceptionReporter);
-      setName
-      ("InMemoryMerger - Thread to merge in-memory shuffled map-outputs");
-      setDaemon(true);
-    }
-    
-    @Override
-    public void merge(List<MapOutput<K,V>> inputs) throws IOException {
-      if (inputs == null || inputs.size() == 0) {
-        return;
-      }
-      
-      //name this output file same as the name of the first file that is 
-      //there in the current list of inmem files (this is guaranteed to
-      //be absent on the disk currently. So we don't overwrite a prev. 
-      //created spill). Also we need to create the output file now since
-      //it is not guaranteed that this file will be present after merge
-      //is called (we delete empty files as soon as we see them
-      //in the merge method)
-
-      //figure out the mapId 
-      TaskAttemptID mapId = inputs.get(0).getMapId();
-      TaskID mapTaskId = mapId.getTaskID();
-
-      List<Segment<K, V>> inMemorySegments = new ArrayList<Segment<K, V>>();
-      long mergeOutputSize = 
-        createInMemorySegments(inputs, inMemorySegments,0);
-      int noInMemorySegments = inMemorySegments.size();
-
-      Path outputPath = 
-        mapOutputFile.getInputFileForWrite(mapTaskId,
-                                           mergeOutputSize).suffix(
-                                               Task.MERGED_OUTPUT_PREFIX);
-
-      Writer<K,V> writer = 
-        new Writer<K,V>(jobConf, rfs, outputPath,
-                        (Class<K>) jobConf.getMapOutputKeyClass(),
-                        (Class<V>) jobConf.getMapOutputValueClass(),
-                        codec, null);
-
-      RawKeyValueIterator rIter = null;
-      try {
-        LOG.info("Initiating in-memory merge with " + noInMemorySegments + 
-                 " segments...");
-        
-        rIter = Merger.merge(jobConf, rfs,
-                             (Class<K>)jobConf.getMapOutputKeyClass(),
-                             (Class<V>)jobConf.getMapOutputValueClass(),
-                             inMemorySegments, inMemorySegments.size(),
-                             new Path(reduceId.toString()),
-                             (RawComparator<K>)jobConf.getOutputKeyComparator(),
-                             reporter, spilledRecordsCounter, null, null);
-        
-        if (null == combinerClass) {
-          Merger.writeFile(rIter, writer, reporter, jobConf);
-        } else {
-          combineCollector.setWriter(writer);
-          combineAndSpill(rIter, reduceCombineInputCounter);
-        }
-        writer.close();
-
-        LOG.info(reduceId +  
-            " Merge of the " + noInMemorySegments +
-            " files in-memory complete." +
-            " Local file is " + outputPath + " of size " + 
-            localFS.getFileStatus(outputPath).getLen());
-      } catch (IOException e) { 
-        //make sure that we delete the ondisk file that we created 
-        //earlier when we invoked cloneFileAttributes
-        localFS.delete(outputPath, true);
-        throw e;
-      }
-
-      // Note the output of the merge
-      closeOnDiskFile(outputPath);
-    }
-
-  }
-  
-  private class OnDiskMerger extends MergeThread<Path,K,V> {
-    
-    public OnDiskMerger(MergeManager<K, V> manager) {
-      super(manager, Integer.MAX_VALUE, exceptionReporter);
-      setName("OnDiskMerger - Thread to merge on-disk map-outputs");
-      setDaemon(true);
-    }
-    
-    @Override
-    public void merge(List<Path> inputs) throws IOException {
-      // sanity check
-      if (inputs == null || inputs.isEmpty()) {
-        LOG.info("No ondisk files to merge...");
-        return;
-      }
-      
-      long approxOutputSize = 0;
-      int bytesPerSum = 
-        jobConf.getInt("io.bytes.per.checksum", 512);
-      
-      LOG.info("OnDiskMerger: We have  " + inputs.size() + 
-               " map outputs on disk. Triggering merge...");
-      
-      // 1. Prepare the list of files to be merged. 
-      for (Path file : inputs) {
-        approxOutputSize += localFS.getFileStatus(file).getLen();
-      }
-
-      // add the checksum length
-      approxOutputSize += 
-        ChecksumFileSystem.getChecksumLength(approxOutputSize, bytesPerSum);
-
-      // 2. Start the on-disk merge process
-      Path outputPath = 
-        localDirAllocator.getLocalPathForWrite(inputs.get(0).toString(), 
-            approxOutputSize, jobConf).suffix(Task.MERGED_OUTPUT_PREFIX);
-      Writer<K,V> writer = 
-        new Writer<K,V>(jobConf, rfs, outputPath, 
-                        (Class<K>) jobConf.getMapOutputKeyClass(), 
-                        (Class<V>) jobConf.getMapOutputValueClass(),
-                        codec, null);
-      RawKeyValueIterator iter  = null;
-      Path tmpDir = new Path(reduceId.toString());
-      try {
-        iter = Merger.merge(jobConf, rfs,
-                            (Class<K>) jobConf.getMapOutputKeyClass(),
-                            (Class<V>) jobConf.getMapOutputValueClass(),
-                            codec, inputs.toArray(new Path[inputs.size()]), 
-                            true, ioSortFactor, tmpDir, 
-                            (RawComparator<K>) jobConf.getOutputKeyComparator(), 
-                            reporter, spilledRecordsCounter, null, 
-                            mergedMapOutputsCounter, null);
-
-        Merger.writeFile(iter, writer, reporter, jobConf);
-        writer.close();
-      } catch (IOException e) {
-        localFS.delete(outputPath, true);
-        throw e;
-      }
-
-      closeOnDiskFile(outputPath);
-
-      LOG.info(reduceId +
-          " Finished merging " + inputs.size() + 
-          " map output files on disk of total-size " + 
-          approxOutputSize + "." + 
-          " Local output file is " + outputPath + " of size " +
-          localFS.getFileStatus(outputPath).getLen());
-    }
-  }
-  
-  private void combineAndSpill(
-      RawKeyValueIterator kvIter,
-      Counters.Counter inCounter) throws IOException {
-    JobConf job = jobConf;
-    Reducer combiner = ReflectionUtils.newInstance(combinerClass, job);
-    Class<K> keyClass = (Class<K>) job.getMapOutputKeyClass();
-    Class<V> valClass = (Class<V>) job.getMapOutputValueClass();
-    RawComparator<K> comparator = 
-      (RawComparator<K>)job.getOutputKeyComparator();
-    try {
-      CombineValuesIterator values = new CombineValuesIterator(
-          kvIter, comparator, keyClass, valClass, job, Reporter.NULL,
-          inCounter);
-      while (values.more()) {
-        combiner.reduce(values.getKey(), values, combineCollector,
-                        Reporter.NULL);
-        values.nextKey();
-      }
-    } finally {
-      combiner.close();
-    }
-  }
-
-  private long createInMemorySegments(List<MapOutput<K,V>> inMemoryMapOutputs,
-                                      List<Segment<K, V>> inMemorySegments, 
-                                      long leaveBytes
-                                      ) throws IOException {
-    long totalSize = 0L;
-    // We could use fullSize could come from the RamManager, but files can be
-    // closed but not yet present in inMemoryMapOutputs
-    long fullSize = 0L;
-    for (MapOutput<K,V> mo : inMemoryMapOutputs) {
-      fullSize += mo.getMemory().length;
-    }
-    while(fullSize > leaveBytes) {
-      MapOutput<K,V> mo = inMemoryMapOutputs.remove(0);
-      byte[] data = mo.getMemory();
-      long size = data.length;
-      totalSize += size;
-      fullSize -= size;
-      Reader<K,V> reader = new InMemoryReader<K,V>(MergeManager.this, 
-                                                   mo.getMapId(),
-                                                   data, 0, (int)size);
-      inMemorySegments.add(new Segment<K,V>(reader, true, 
-                                            (mo.isPrimaryMapOutput() ? 
-                                            mergedMapOutputsCounter : null)));
-    }
-    return totalSize;
-  }
-
-  class RawKVIteratorReader extends IFile.Reader<K,V> {
-
-    private final RawKeyValueIterator kvIter;
-
-    public RawKVIteratorReader(RawKeyValueIterator kvIter, long size)
-        throws IOException {
-      super(null, null, size, null, spilledRecordsCounter);
-      this.kvIter = kvIter;
-    }
-    public boolean nextRawKey(DataInputBuffer key) throws IOException {
-      if (kvIter.next()) {
-        final DataInputBuffer kb = kvIter.getKey();
-        final int kp = kb.getPosition();
-        final int klen = kb.getLength() - kp;
-        key.reset(kb.getData(), kp, klen);
-        bytesRead += klen;
-        return true;
-      }
-      return false;
-    }
-    public void nextRawValue(DataInputBuffer value) throws IOException {
-      final DataInputBuffer vb = kvIter.getValue();
-      final int vp = vb.getPosition();
-      final int vlen = vb.getLength() - vp;
-      value.reset(vb.getData(), vp, vlen);
-      bytesRead += vlen;
-    }
-    public long getPosition() throws IOException {
-      return bytesRead;
-    }
-
-    public void close() throws IOException {
-      kvIter.close();
-    }
-  }
-
-  private RawKeyValueIterator finalMerge(JobConf job, FileSystem fs,
-                                       List<MapOutput<K,V>> inMemoryMapOutputs,
-                                       List<Path> onDiskMapOutputs
-                                       ) throws IOException {
-    LOG.info("finalMerge called with " + 
-             inMemoryMapOutputs.size() + " in-memory map-outputs and " + 
-             onDiskMapOutputs.size() + " on-disk map-outputs");
-    
-    final float maxRedPer =
-      job.getFloat(MRJobConfig.REDUCE_INPUT_BUFFER_PERCENT, 0f);
-    if (maxRedPer > 1.0 || maxRedPer < 0.0) {
-      throw new IOException(MRJobConfig.REDUCE_INPUT_BUFFER_PERCENT +
-                            maxRedPer);
-    }
-    int maxInMemReduce = (int)Math.min(
-        Runtime.getRuntime().maxMemory() * maxRedPer, Integer.MAX_VALUE);
-    
-
-    // merge config params
-    Class<K> keyClass = (Class<K>)job.getMapOutputKeyClass();
-    Class<V> valueClass = (Class<V>)job.getMapOutputValueClass();
-    boolean keepInputs = job.getKeepFailedTaskFiles();
-    final Path tmpDir = new Path(reduceId.toString());
-    final RawComparator<K> comparator =
-      (RawComparator<K>)job.getOutputKeyComparator();
-
-    // segments required to vacate memory
-    List<Segment<K,V>> memDiskSegments = new ArrayList<Segment<K,V>>();
-    long inMemToDiskBytes = 0;
-    boolean mergePhaseFinished = false;
-    if (inMemoryMapOutputs.size() > 0) {
-      TaskID mapId = inMemoryMapOutputs.get(0).getMapId().getTaskID();
-      inMemToDiskBytes = createInMemorySegments(inMemoryMapOutputs, 
-                                                memDiskSegments,
-                                                maxInMemReduce);
-      final int numMemDiskSegments = memDiskSegments.size();
-      if (numMemDiskSegments > 0 &&
-            ioSortFactor > onDiskMapOutputs.size()) {
-        
-        // If we reach here, it implies that we have less than io.sort.factor
-        // disk segments and this will be incremented by 1 (result of the 
-        // memory segments merge). Since this total would still be 
-        // <= io.sort.factor, we will not do any more intermediate merges,
-        // the merge of all these disk segments would be directly fed to the
-        // reduce method
-        
-        mergePhaseFinished = true;
-        // must spill to disk, but can't retain in-mem for intermediate merge
-        final Path outputPath = 
-          mapOutputFile.getInputFileForWrite(mapId,
-                                             inMemToDiskBytes).suffix(
-                                                 Task.MERGED_OUTPUT_PREFIX);
-        final RawKeyValueIterator rIter = Merger.merge(job, fs,
-            keyClass, valueClass, memDiskSegments, numMemDiskSegments,
-            tmpDir, comparator, reporter, spilledRecordsCounter, null, 
-            mergePhase);
-        final Writer<K,V> writer = new Writer<K,V>(job, fs, outputPath,
-            keyClass, valueClass, codec, null);
-        try {
-          Merger.writeFile(rIter, writer, reporter, job);
-          // add to list of final disk outputs.
-          onDiskMapOutputs.add(outputPath);
-        } catch (IOException e) {
-          if (null != outputPath) {
-            try {
-              fs.delete(outputPath, true);
-            } catch (IOException ie) {
-              // NOTHING
-            }
-          }
-          throw e;
-        } finally {
-          if (null != writer) {
-            writer.close();
-          }
-        }
-        LOG.info("Merged " + numMemDiskSegments + " segments, " +
-                 inMemToDiskBytes + " bytes to disk to satisfy " +
-                 "reduce memory limit");
-        inMemToDiskBytes = 0;
-        memDiskSegments.clear();
-      } else if (inMemToDiskBytes != 0) {
-        LOG.info("Keeping " + numMemDiskSegments + " segments, " +
-                 inMemToDiskBytes + " bytes in memory for " +
-                 "intermediate, on-disk merge");
-      }
-    }
-
-    // segments on disk
-    List<Segment<K,V>> diskSegments = new ArrayList<Segment<K,V>>();
-    long onDiskBytes = inMemToDiskBytes;
-    Path[] onDisk = onDiskMapOutputs.toArray(new Path[onDiskMapOutputs.size()]);
-    for (Path file : onDisk) {
-      onDiskBytes += fs.getFileStatus(file).getLen();
-      LOG.debug("Disk file: " + file + " Length is " + 
-          fs.getFileStatus(file).getLen());
-      diskSegments.add(new Segment<K, V>(job, fs, file, codec, keepInputs,
-                                         (file.toString().endsWith(
-                                             Task.MERGED_OUTPUT_PREFIX) ?
-                                          null : mergedMapOutputsCounter)
-                                        ));
-    }
-    LOG.info("Merging " + onDisk.length + " files, " +
-             onDiskBytes + " bytes from disk");
-    Collections.sort(diskSegments, new Comparator<Segment<K,V>>() {
-      public int compare(Segment<K, V> o1, Segment<K, V> o2) {
-        if (o1.getLength() == o2.getLength()) {
-          return 0;
-        }
-        return o1.getLength() < o2.getLength() ? -1 : 1;
-      }
-    });
-
-    // build final list of segments from merged backed by disk + in-mem
-    List<Segment<K,V>> finalSegments = new ArrayList<Segment<K,V>>();
-    long inMemBytes = createInMemorySegments(inMemoryMapOutputs, 
-                                             finalSegments, 0);
-    LOG.info("Merging " + finalSegments.size() + " segments, " +
-             inMemBytes + " bytes from memory into reduce");
-    if (0 != onDiskBytes) {
-      final int numInMemSegments = memDiskSegments.size();
-      diskSegments.addAll(0, memDiskSegments);
-      memDiskSegments.clear();
-      // Pass mergePhase only if there is a going to be intermediate
-      // merges. See comment where mergePhaseFinished is being set
-      Progress thisPhase = (mergePhaseFinished) ? null : mergePhase; 
-      RawKeyValueIterator diskMerge = Merger.merge(
-          job, fs, keyClass, valueClass, diskSegments,
-          ioSortFactor, numInMemSegments, tmpDir, comparator,
-          reporter, false, spilledRecordsCounter, null, thisPhase);
-      diskSegments.clear();
-      if (0 == finalSegments.size()) {
-        return diskMerge;
-      }
-      finalSegments.add(new Segment<K,V>(
-            new RawKVIteratorReader(diskMerge, onDiskBytes), true));
-    }
-    return Merger.merge(job, fs, keyClass, valueClass,
-                 finalSegments, finalSegments.size(), tmpDir,
-                 comparator, reporter, spilledRecordsCounter, null,
-                 null);
-  
-  }
+  public RawKeyValueIterator close() throws Throwable;
 }
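
As a usage note, the following sketch shows the call sequence the Javadoc above implies for a caller of the new MergeManager interface: wait for resources, reserve, then commit or abort the returned MapOutput. It is not part of the commit; the FetchSketch class and its fetch() flow are illustrative assumptions, and only the three interface methods come from the diff.

    package org.apache.hadoop.mapreduce.task.reduce;

    import java.io.IOException;

    import org.apache.hadoop.mapreduce.TaskAttemptID;

    // Hypothetical helper showing the expected call sequence against the new
    // MergeManager interface; everything except the interface methods is
    // illustrative.
    class FetchSketch<K, V> {
      private final MergeManager<K, V> merger;

      FetchSketch(MergeManager<K, V> merger) {
        this.merger = merger;
      }

      void fetch(TaskAttemptID mapId, long mapOutputSize, int fetcherId)
          throws IOException, InterruptedException {
        // Block until the merge side has capacity before opening a connection.
        merger.waitForResource();

        // Ask the merger where this map output should go; null means "try later".
        MapOutput<K, V> mapOutput = merger.reserve(mapId, mapOutputSize, fetcherId);
        if (mapOutput == null) {
          return; // could not reserve resources right now; reschedule this map
        }

        try {
          // ... connect to the map host and call mapOutput.shuffle(...) here ...
          mapOutput.commit(); // hand the completed output back to the merger
        } catch (IOException ioe) {
          mapOutput.abort();  // release whatever was reserved
          throw ioe;
        }
      }
    }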

Modified: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/MergeThread.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/MergeThread.java?rev=1437843&r1=1437842&r2=1437843&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/MergeThread.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/MergeThread.java Thu Jan 24 02:45:45 2013
@@ -34,12 +34,12 @@ abstract class MergeThread<T,K,V> extend
 
   private AtomicInteger numPending = new AtomicInteger(0);
   private LinkedList<List<T>> pendingToBeMerged;
-  protected final MergeManager<K,V> manager;
+  protected final MergeManagerImpl<K,V> manager;
   private final ExceptionReporter reporter;
   private boolean closed = false;
   private final int mergeFactor;
   
-  public MergeThread(MergeManager<K,V> manager, int mergeFactor,
+  public MergeThread(MergeManagerImpl<K,V> manager, int mergeFactor,
                      ExceptionReporter reporter) {
     this.pendingToBeMerged = new LinkedList<List<T>>();
     this.manager = manager;

Modified: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Shuffle.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Shuffle.java?rev=1437843&r1=1437842&r2=1437843&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Shuffle.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Shuffle.java Thu Jan 24 02:45:45 2013
@@ -21,17 +21,10 @@ import java.io.IOException;
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.LocalDirAllocator;
-import org.apache.hadoop.io.compress.CompressionCodec;
-import org.apache.hadoop.mapred.Counters;
 import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.MapOutputFile;
 import org.apache.hadoop.mapred.RawKeyValueIterator;
-import org.apache.hadoop.mapred.Reducer;
 import org.apache.hadoop.mapred.Reporter;
 import org.apache.hadoop.mapred.Task;
-import org.apache.hadoop.mapred.Task.CombineOutputCollector;
 import org.apache.hadoop.mapred.TaskStatus;
 import org.apache.hadoop.mapred.TaskUmbilicalProtocol;
 import org.apache.hadoop.mapred.ShuffleConsumerPlugin;
@@ -77,17 +70,21 @@ public class Shuffle<K, V> implements Sh
     this.taskStatus = context.getStatus();
     this.reduceTask = context.getReduceTask();
     
-    scheduler = 
-      new ShuffleScheduler<K,V>(jobConf, taskStatus, this, copyPhase, 
-                                context.getShuffledMapsCounter(), 
-                                context.getReduceShuffleBytes(), context.getFailedShuffleCounter());
-    merger = new MergeManager<K, V>(reduceId, jobConf, context.getLocalFS(),
-                                    context.getLocalDirAllocator(), reporter, context.getCodec(),
-                                    context.getCombinerClass(), context.getCombineCollector(), 
-                                    context.getSpilledRecordsCounter(), 
-                                    context.getReduceCombineInputCounter(), 
-                                    context.getMergedMapOutputsCounter(), 
-                                    this, context.getMergePhase(), context.getMapOutputFile());
+    scheduler = new ShuffleScheduler<K,V>(jobConf, taskStatus, this,
+        copyPhase, context.getShuffledMapsCounter(),
+        context.getReduceShuffleBytes(), context.getFailedShuffleCounter());
+    merger = createMergeManager(context);
+  }
+
+  protected MergeManager<K, V> createMergeManager(
+      ShuffleConsumerPlugin.Context context) {
+    return new MergeManagerImpl<K, V>(reduceId, jobConf, context.getLocalFS(),
+        context.getLocalDirAllocator(), reporter, context.getCodec(),
+        context.getCombinerClass(), context.getCombineCollector(), 
+        context.getSpilledRecordsCounter(),
+        context.getReduceCombineInputCounter(),
+        context.getMergedMapOutputsCounter(), this, context.getMergePhase(),
+        context.getMapOutputFile());
   }
 
   @Override
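
The new protected createMergeManager() hook makes the reduce-side merge implementation pluggable from a Shuffle subclass. A minimal sketch follows, assuming Shuffle can be subclassed via its default constructor; the ThrottledShuffle class and its delegating wrapper are hypothetical, and only the hook's signature and the MergeManager interface methods come from the diffs above.

    package org.apache.hadoop.mapreduce.task.reduce;

    import java.io.IOException;

    import org.apache.hadoop.mapred.RawKeyValueIterator;
    import org.apache.hadoop.mapred.ShuffleConsumerPlugin;
    import org.apache.hadoop.mapreduce.TaskAttemptID;

    // Hypothetical Shuffle subclass: wraps the default MergeManager so that a
    // custom policy could be applied around reserve().
    public class ThrottledShuffle<K, V> extends Shuffle<K, V> {

      @Override
      protected MergeManager<K, V> createMergeManager(
          ShuffleConsumerPlugin.Context context) {
        final MergeManager<K, V> delegate = super.createMergeManager(context);
        return new MergeManager<K, V>() {
          @Override
          public void waitForResource() throws InterruptedException {
            delegate.waitForResource();
          }

          @Override
          public MapOutput<K, V> reserve(TaskAttemptID mapId, long requestedSize,
                                         int fetcher) throws IOException {
            // A custom reservation policy could be applied here before delegating.
            return delegate.reserve(mapId, requestedSize, fetcher);
          }

          @Override
          public RawKeyValueIterator close() throws Throwable {
            return delegate.close();
          }
        };
      }
    }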

Modified: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/META-INF/services/org.apache.hadoop.security.token.TokenIdentifier
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/META-INF/services/org.apache.hadoop.security.token.TokenIdentifier?rev=1437843&r1=1437842&r2=1437843&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/META-INF/services/org.apache.hadoop.security.token.TokenIdentifier (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/META-INF/services/org.apache.hadoop.security.token.TokenIdentifier Thu Jan 24 02:45:45 2013
@@ -1,2 +1,15 @@
+#
+#   Licensed under the Apache License, Version 2.0 (the "License");
+#   you may not use this file except in compliance with the License.
+#   You may obtain a copy of the License at
+#
+#       http://www.apache.org/licenses/LICENSE-2.0
+#
+#   Unless required by applicable law or agreed to in writing, software
+#   distributed under the License is distributed on an "AS IS" BASIS,
+#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#   See the License for the specific language governing permissions and
+#   limitations under the License.
+#
 org.apache.hadoop.mapreduce.security.token.delegation.DelegationTokenIdentifier
 org.apache.hadoop.mapreduce.security.token.JobTokenIdentifier

Modified: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/META-INF/services/org.apache.hadoop.security.token.TokenRenewer
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/META-INF/services/org.apache.hadoop.security.token.TokenRenewer?rev=1437843&r1=1437842&r2=1437843&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/META-INF/services/org.apache.hadoop.security.token.TokenRenewer (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/META-INF/services/org.apache.hadoop.security.token.TokenRenewer Thu Jan 24 02:45:45 2013
@@ -1 +1,14 @@
+#
+#   Licensed under the Apache License, Version 2.0 (the "License");
+#   you may not use this file except in compliance with the License.
+#   You may obtain a copy of the License at
+#
+#       http://www.apache.org/licenses/LICENSE-2.0
+#
+#   Unless required by applicable law or agreed to in writing, software
+#   distributed under the License is distributed on an "AS IS" BASIS,
+#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#   See the License for the specific language governing permissions and
+#   limitations under the License.
+#
 org.apache.hadoop.mapreduce.security.token.JobTokenIdentifier$Renewer

Modified: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml?rev=1437843&r1=1437842&r2=1437843&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml Thu Jan 24 02:45:45 2013
@@ -858,6 +858,17 @@
 </property>
 
 <property>
+  <name>yarn.app.mapreduce.am.admin.user.env</name>
+  <value></value>
+  <description> Environment variables for the MR App Master 
+  processes for admin purposes. These values are set first and can be 
+  overridden by the user env (yarn.app.mapreduce.am.env). Example:
+  1) A=foo  This will set the env variable A to foo
+  2) B=$B:c This will inherit the app master's B env variable.  
+  </description>
+</property>
+
+<property>
   <name>yarn.app.mapreduce.am.command-opts</name>
   <value>-Xmx1024m</value>
   <description>Java opts for the MR App Master processes.  

Propchange: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml
------------------------------------------------------------------------------
  Merged /hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml:r1432789-1437840
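
As a usage note (not part of the commit): the new admin-level property is typically set by cluster administrators in mapred-site.xml, but the key and value format can also be illustrated programmatically. The variable name and path below are made up for the example.

    import org.apache.hadoop.mapred.JobConf;

    // Illustrative only: sets the new admin-level AM environment property on a
    // job configuration to show the NAME=value form from the description above.
    public class AdminEnvExample {
      public static void main(String[] args) {
        JobConf conf = new JobConf();
        conf.set("yarn.app.mapreduce.am.admin.user.env",
                 "LD_LIBRARY_PATH=$LD_LIBRARY_PATH:/opt/hadoop/native");
      }
    }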

Modified: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestFetcher.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestFetcher.java?rev=1437843&r1=1437842&r2=1437843&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestFetcher.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestFetcher.java Thu Jan 24 02:45:45 2013
@@ -53,7 +53,7 @@ public class TestFetcher {
     private HttpURLConnection connection;
 
     public FakeFetcher(JobConf job, TaskAttemptID reduceId,
-        ShuffleScheduler<K,V> scheduler, MergeManager<K,V> merger, Reporter reporter,
+        ShuffleScheduler<K,V> scheduler, MergeManagerImpl<K,V> merger, Reporter reporter,
         ShuffleClientMetrics metrics, ExceptionReporter exceptionReporter,
         SecretKey jobTokenSecret, HttpURLConnection connection) {
       super(job, reduceId, scheduler, merger, reporter, metrics, exceptionReporter,
@@ -77,7 +77,7 @@ public class TestFetcher {
     JobConf job = new JobConf();
     TaskAttemptID id = TaskAttemptID.forName("attempt_0_1_r_1_1");
     ShuffleScheduler<Text, Text> ss = mock(ShuffleScheduler.class);
-    MergeManager<Text, Text> mm = mock(MergeManager.class);
+    MergeManagerImpl<Text, Text> mm = mock(MergeManagerImpl.class);
     Reporter r = mock(Reporter.class);
     ShuffleClientMetrics metrics = mock(ShuffleClientMetrics.class);
     ExceptionReporter except = mock(ExceptionReporter.class);
@@ -132,7 +132,7 @@ public class TestFetcher {
     JobConf job = new JobConf();
     TaskAttemptID id = TaskAttemptID.forName("attempt_0_1_r_1_1");
     ShuffleScheduler<Text, Text> ss = mock(ShuffleScheduler.class);
-    MergeManager<Text, Text> mm = mock(MergeManager.class);
+    MergeManagerImpl<Text, Text> mm = mock(MergeManagerImpl.class);
     Reporter r = mock(Reporter.class);
     ShuffleClientMetrics metrics = mock(ShuffleClientMetrics.class);
     ExceptionReporter except = mock(ExceptionReporter.class);
@@ -167,10 +167,9 @@ public class TestFetcher {
     header.write(new DataOutputStream(bout));
     ByteArrayInputStream in = new ByteArrayInputStream(bout.toByteArray());
     when(connection.getInputStream()).thenReturn(in);
-    //Defaults to WAIT, which is what we want to test
-    MapOutput<Text,Text> mapOut = new MapOutput<Text, Text>(map1ID);
+    //Defaults to null, which is what we want to test
     when(mm.reserve(any(TaskAttemptID.class), anyLong(), anyInt()))
-      .thenReturn(mapOut);
+      .thenReturn(null);
     
     underTest.copyFromHost(host);
     

Modified: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestMergeManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestMergeManager.java?rev=1437843&r1=1437842&r2=1437843&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestMergeManager.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestMergeManager.java Thu Jan 24 02:45:45 2013
@@ -32,13 +32,13 @@ import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.MapOutputFile;
 import org.apache.hadoop.mapreduce.MRJobConfig;
-import org.apache.hadoop.mapreduce.task.reduce.MapOutput.Type;
 import org.junit.Assert;
 import org.junit.Test;
 
 public class TestMergeManager {
 
   @Test(timeout=10000)
+  @SuppressWarnings("unchecked")
   public void testMemoryMerge() throws Exception {
     final int TOTAL_MEM_BYTES = 10000;
     final int OUTPUT_SIZE = 7950;
@@ -55,45 +55,47 @@ public class TestMergeManager {
 
     // reserve enough map output to cause a merge when it is committed
     MapOutput<Text, Text> out1 = mgr.reserve(null, OUTPUT_SIZE, 0);
-    Assert.assertEquals("Should be a memory merge",
-        Type.MEMORY, out1.getType());
-    fillOutput(out1);
+    Assert.assertTrue("Should be a memory merge",
+                      (out1 instanceof InMemoryMapOutput));
+    InMemoryMapOutput<Text, Text> mout1 = (InMemoryMapOutput<Text, Text>)out1;
+    fillOutput(mout1);
     MapOutput<Text, Text> out2 = mgr.reserve(null, OUTPUT_SIZE, 0);
-    Assert.assertEquals("Should be a memory merge",
-        Type.MEMORY, out2.getType());
-    fillOutput(out2);
+    Assert.assertTrue("Should be a memory merge",
+                      (out2 instanceof InMemoryMapOutput));
+    InMemoryMapOutput<Text, Text> mout2 = (InMemoryMapOutput<Text, Text>)out2;
+    fillOutput(mout2);
 
     // next reservation should be a WAIT
     MapOutput<Text, Text> out3 = mgr.reserve(null, OUTPUT_SIZE, 0);
-    Assert.assertEquals("Should be told to wait",
-        Type.WAIT, out3.getType());
+    Assert.assertEquals("Should be told to wait", null, out3);
 
     // trigger the first merge and wait for merge thread to start merging
     // and free enough output to reserve more
-    out1.commit();
-    out2.commit();
+    mout1.commit();
+    mout2.commit();
     mergeStart.await();
 
     Assert.assertEquals(1, mgr.getNumMerges());
 
     // reserve enough map output to cause another merge when committed
     out1 = mgr.reserve(null, OUTPUT_SIZE, 0);
-    Assert.assertEquals("Should be a memory merge",
-        Type.MEMORY, out1.getType());
-    fillOutput(out1);
+    Assert.assertTrue("Should be a memory merge",
+                       (out1 instanceof InMemoryMapOutput));
+    mout1 = (InMemoryMapOutput<Text, Text>)out1;
+    fillOutput(mout1);
     out2 = mgr.reserve(null, OUTPUT_SIZE, 0);
-    Assert.assertEquals("Should be a memory merge",
-        Type.MEMORY, out2.getType());
-    fillOutput(out2);
+    Assert.assertTrue("Should be a memory merge",
+                       (out2 instanceof InMemoryMapOutput));
+    mout2 = (InMemoryMapOutput<Text, Text>)out2;
+    fillOutput(mout2);
 
-    // next reservation should be a WAIT
+    // next reservation should be null
     out3 = mgr.reserve(null, OUTPUT_SIZE, 0);
-    Assert.assertEquals("Should be told to wait",
-        Type.WAIT, out3.getType());
+    Assert.assertEquals("Should be told to wait", null, out3);
 
     // commit output *before* merge thread completes
-    out1.commit();
-    out2.commit();
+    mout1.commit();
+    mout2.commit();
 
     // allow the first merge to complete
     mergeComplete.await();
@@ -110,7 +112,7 @@ public class TestMergeManager {
         0, reporter.getNumExceptions());
   }
 
-  private void fillOutput(MapOutput<Text, Text> output) throws IOException {
+  private void fillOutput(InMemoryMapOutput<Text, Text> output) throws IOException {
     BoundedByteArrayOutputStream stream = output.getArrayStream();
     int count = stream.getLimit();
     for (int i=0; i < count; ++i) {
@@ -118,7 +120,7 @@ public class TestMergeManager {
     }
   }
 
-  private static class StubbedMergeManager extends MergeManager<Text, Text> {
+  private static class StubbedMergeManager extends MergeManagerImpl<Text, Text> {
     private TestMergeThread mergeThread;
 
     public StubbedMergeManager(JobConf conf, ExceptionReporter reporter,
@@ -129,7 +131,7 @@ public class TestMergeManager {
     }
 
     @Override
-    protected MergeThread<MapOutput<Text, Text>, Text, Text> createInMemoryMerger() {
+    protected MergeThread<InMemoryMapOutput<Text, Text>, Text, Text> createInMemoryMerger() {
       mergeThread = new TestMergeThread(this, getExceptionReporter());
       return mergeThread;
     }
@@ -140,12 +142,12 @@ public class TestMergeManager {
   }
 
   private static class TestMergeThread
-  extends MergeThread<MapOutput<Text,Text>, Text, Text> {
+  extends MergeThread<InMemoryMapOutput<Text,Text>, Text, Text> {
     private AtomicInteger numMerges;
     private CyclicBarrier mergeStart;
     private CyclicBarrier mergeComplete;
 
-    public TestMergeThread(MergeManager<Text, Text> mergeManager,
+    public TestMergeThread(MergeManagerImpl<Text, Text> mergeManager,
         ExceptionReporter reporter) {
       super(mergeManager, Integer.MAX_VALUE, reporter);
       numMerges = new AtomicInteger(0);
@@ -162,11 +164,11 @@ public class TestMergeManager {
     }
 
     @Override
-    public void merge(List<MapOutput<Text, Text>> inputs)
+    public void merge(List<InMemoryMapOutput<Text, Text>> inputs)
         throws IOException {
       synchronized (this) {
         numMerges.incrementAndGet();
-        for (MapOutput<Text, Text> input : inputs) {
+        for (InMemoryMapOutput<Text, Text> input : inputs) {
           manager.unreserve(input.getSize());
         }
       }
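
[Reviewer note, not part of the patch] The assertions above track the MapOutput refactoring: the Type enum is gone, in-memory outputs are now the InMemoryMapOutput subclass, and a null return from reserve() replaces Type.WAIT. A minimal sketch of how a caller is expected to dispatch on the new hierarchy — the fetcher-side variable names and the on-disk branch are assumptions for illustration only:

    MapOutput<Text, Text> reserved = merger.reserve(mapId, requestedSize, fetcherId);
    if (reserved == null) {
      // MergeManagerImpl is applying back-pressure; the fetcher should stall and retry later
      return;
    } else if (reserved instanceof InMemoryMapOutput) {
      InMemoryMapOutput<Text, Text> inMem = (InMemoryMapOutput<Text, Text>) reserved;
      // fill inMem.getArrayStream() with the map output bytes, then inMem.commit()
    } else {
      // otherwise the reservation is an on-disk subclass (assumed: OnDiskMapOutput) and is
      // written straight to local disk before being committed
    }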

Modified: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/pom.xml
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/pom.xml?rev=1437843&r1=1437842&r2=1437843&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/pom.xml (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/pom.xml Thu Jan 24 02:45:45 2013
@@ -54,4 +54,20 @@
       <scope>test</scope>
     </dependency>
   </dependencies>
+
+  <build>
+    <plugins>
+     <plugin>
+        <groupId>org.apache.rat</groupId>
+        <artifactId>apache-rat-plugin</artifactId>
+        <configuration>
+          <excludes>
+            <exclude>src/test/resources/job_1329348432655_0001_conf.xml</exclude>
+            <exclude>src/test/resources/job_1329348432655_0001-1329348443227-user-Sleep+job-1329348468601-10-1-SUCCEEDED-default.jhist</exclude>
+          </excludes>
+        </configuration>
+      </plugin>
+    </plugins>
+  </build>
+
 </project>

Modified: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/CompletedJob.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/CompletedJob.java?rev=1437843&r1=1437842&r2=1437843&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/CompletedJob.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/CompletedJob.java Thu Jan 24 02:45:45 2013
@@ -36,6 +36,7 @@ import org.apache.commons.logging.LogFac
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapred.JobACLsManager;
+import org.apache.hadoop.mapred.TaskCompletionEvent;
 import org.apache.hadoop.mapreduce.Counters;
 import org.apache.hadoop.mapreduce.JobACL;
 import org.apache.hadoop.mapreduce.TaskID;
@@ -183,13 +184,13 @@ public class CompletedJob implements org
   }
 
   @Override
-  public synchronized TaskAttemptCompletionEvent[] getMapAttemptCompletionEvents(
+  public synchronized TaskCompletionEvent[] getMapAttemptCompletionEvents(
       int startIndex, int maxEvents) {
     if (mapCompletionEvents == null) {
       constructTaskAttemptCompletionEvents();
     }
-    return getAttemptCompletionEvents(mapCompletionEvents,
-        startIndex, maxEvents);
+    return TypeConverter.fromYarn(getAttemptCompletionEvents(
+        mapCompletionEvents, startIndex, maxEvents));
   }
 
   private static TaskAttemptCompletionEvent[] getAttemptCompletionEvents(
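
[Reviewer note, not part of the patch] With this change the history server hands back org.apache.hadoop.mapred.TaskCompletionEvent objects directly, converted once via TypeConverter.fromYarn, instead of the YARN-side TaskAttemptCompletionEvent records. A hedged caller sketch; the accessors are the standard ones on the mapred TaskCompletionEvent and the index arguments are arbitrary:

    TaskCompletionEvent[] mapEvents = completedJob.getMapAttemptCompletionEvents(0, 100);
    for (TaskCompletionEvent event : mapEvents) {
      // mapred-level accessors, no per-caller conversion from YARN records needed
      System.out.println(event.getTaskAttemptId() + " -> " + event.getTaskStatus());
    }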

Modified: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/PartialJob.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/PartialJob.java?rev=1437843&r1=1437842&r2=1437843&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/PartialJob.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/PartialJob.java Thu Jan 24 02:45:45 2013
@@ -25,6 +25,7 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.TaskCompletionEvent;
 import org.apache.hadoop.mapreduce.Counters;
 import org.apache.hadoop.mapreduce.JobACL;
 import org.apache.hadoop.mapreduce.v2.api.records.AMInfo;
@@ -154,7 +155,7 @@ public class PartialJob implements org.a
   }
 
   @Override
-  public TaskAttemptCompletionEvent[] getMapAttemptCompletionEvents(
+  public TaskCompletionEvent[] getMapAttemptCompletionEvents(
       int startIndex, int maxEvents) {
     return null;
   }

Modified: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/MockHistoryJobs.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/MockHistoryJobs.java?rev=1437843&r1=1437842&r2=1437843&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/MockHistoryJobs.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/MockHistoryJobs.java Thu Jan 24 02:45:45 2013
@@ -1,3 +1,20 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
 package org.apache.hadoop.mapreduce.v2.hs;
 
 import java.io.IOException;
@@ -6,6 +23,7 @@ import java.util.Map;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.TaskCompletionEvent;
 import org.apache.hadoop.mapreduce.JobACL;
 import org.apache.hadoop.mapreduce.v2.api.records.AMInfo;
 import org.apache.hadoop.mapreduce.v2.api.records.JobId;
@@ -126,7 +144,7 @@ public class MockHistoryJobs extends Moc
     }
 
     @Override
-    public TaskAttemptCompletionEvent[] getMapAttemptCompletionEvents(
+    public TaskCompletionEvent[] getMapAttemptCompletionEvents(
         int startIndex, int maxEvents) {
       return job.getMapAttemptCompletionEvents(startIndex, maxEvents);
     }

Modified: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestJobHistoryEntities.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestJobHistoryEntities.java?rev=1437843&r1=1437842&r2=1437843&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestJobHistoryEntities.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestJobHistoryEntities.java Thu Jan 24 02:45:45 2013
@@ -1,3 +1,20 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
 package org.apache.hadoop.mapreduce.v2.hs;
 
 import static junit.framework.Assert.assertEquals;

Modified: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/pom.xml
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/pom.xml?rev=1437843&r1=1437842&r2=1437843&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/pom.xml (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/pom.xml Thu Jan 24 02:45:45 2013
@@ -148,6 +148,15 @@
           </additionalClasspathElements>
         </configuration>
       </plugin>
+      <plugin>
+        <groupId>org.apache.rat</groupId>
+        <artifactId>apache-rat-plugin</artifactId>
+        <configuration>
+          <excludes>
+            <exclude>src/test/java/org/apache/hadoop/cli/data60bytes</exclude>
+          </excludes>
+        </configuration>
+      </plugin>
     </plugins>
   </build>
 </project>

Modified: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ClientServiceDelegate.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ClientServiceDelegate.java?rev=1437843&r1=1437842&r2=1437843&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ClientServiceDelegate.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ClientServiceDelegate.java Thu Jan 24 02:45:45 2013
@@ -33,6 +33,7 @@ import org.apache.commons.logging.LogFac
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
 import org.apache.hadoop.http.HttpConfig;
+import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapreduce.JobID;
 import org.apache.hadoop.mapreduce.JobStatus;
 import org.apache.hadoop.mapreduce.MRJobConfig;
@@ -69,14 +70,16 @@ import org.apache.hadoop.security.token.
 import org.apache.hadoop.yarn.YarnException;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ApplicationReport;
+import org.apache.hadoop.yarn.api.records.ClientToken;
 import org.apache.hadoop.yarn.api.records.YarnApplicationState;
 import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
 import org.apache.hadoop.yarn.factories.RecordFactory;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
 import org.apache.hadoop.yarn.ipc.RPCUtil;
 import org.apache.hadoop.yarn.ipc.YarnRPC;
-import org.apache.hadoop.yarn.security.ApplicationTokenIdentifier;
+import org.apache.hadoop.yarn.security.client.ClientTokenIdentifier;
 import org.apache.hadoop.yarn.util.BuilderUtils;
+import org.apache.hadoop.yarn.util.ProtoUtils;
 
 public class ClientServiceDelegate {
   private static final Log LOG = LogFactory.getLog(ClientServiceDelegate.class);
@@ -176,13 +179,10 @@ public class ClientServiceDelegate {
           serviceAddr = NetUtils.createSocketAddrForHost(
               application.getHost(), application.getRpcPort());
           if (UserGroupInformation.isSecurityEnabled()) {
-            String clientTokenEncoded = application.getClientToken();
-            Token<ApplicationTokenIdentifier> clientToken =
-              new Token<ApplicationTokenIdentifier>();
-            clientToken.decodeFromUrlString(clientTokenEncoded);
-            // RPC layer client expects ip:port as service for tokens
-            SecurityUtil.setTokenService(clientToken, serviceAddr);
-            newUgi.addToken(clientToken);
+            ClientToken clientToken = application.getClientToken();
+            Token<ClientTokenIdentifier> token =
+                ProtoUtils.convertFromProtoFormat(clientToken, serviceAddr);
+            newUgi.addToken(token);
           }
           LOG.debug("Connecting to " + serviceAddr);
           final InetSocketAddress finalServiceAddr = serviceAddr;

Modified: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/NotRunningJob.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/NotRunningJob.java?rev=1437843&r1=1437842&r2=1437843&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/NotRunningJob.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/NotRunningJob.java Thu Jan 24 02:45:45 2013
@@ -62,8 +62,8 @@ import org.apache.hadoop.mapreduce.v2.ap
 import org.apache.hadoop.mapreduce.v2.api.records.TaskState;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
-import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
 import org.apache.hadoop.yarn.api.records.ApplicationReport;
+import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
 import org.apache.hadoop.yarn.api.records.YarnApplicationState;
 import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
 import org.apache.hadoop.yarn.factories.RecordFactory;
@@ -86,10 +86,9 @@ public class NotRunningJob implements MR
         .newRecordInstance(ApplicationAttemptId.class);
 
     // Setting AppState to NEW and finalStatus to UNDEFINED as they are never
-    // used
-    // for a non running job
+    // used for a non running job
     return BuilderUtils.newApplicationReport(unknownAppId, unknownAttemptId,
-        "N/A", "N/A", "N/A", "N/A", 0, "", YarnApplicationState.NEW, "N/A",
+        "N/A", "N/A", "N/A", "N/A", 0, null, YarnApplicationState.NEW, "N/A",
         "N/A", 0, 0, FinalApplicationStatus.UNDEFINED, null, "N/A");
   }
 

Modified: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/YARNRunner.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/YARNRunner.java?rev=1437843&r1=1437842&r2=1437843&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/YARNRunner.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/YARNRunner.java Thu Jan 24 02:45:45 2013
@@ -101,7 +101,7 @@ public class YARNRunner implements Clien
   private Configuration conf;
   private final FileContext defaultFileContext;
   
-  /* usually is false unless the jobclient getdelegation token is 
+  /* usually is false unless the jobclient get delegation token is 
    *  called. This is a hack wherein we do return a token from RM 
    *  on getDelegationtoken but due to the restricted api on jobclient
    *  we just add a job history DT token when submitting a job.
@@ -165,12 +165,12 @@ public class YARNRunner implements Clien
   @Override
   public TaskTrackerInfo[] getActiveTrackers() throws IOException,
       InterruptedException {
-	  return resMgrDelegate.getActiveTrackers();
+    return resMgrDelegate.getActiveTrackers();
   }
 
   @Override
   public JobStatus[] getAllJobs() throws IOException, InterruptedException {
-   return resMgrDelegate.getAllJobs();
+    return resMgrDelegate.getAllJobs();
   }
 
   @Override
@@ -394,14 +394,31 @@ public class YARNRunner implements Clien
         MRJobConfig.MR_AM_LOG_LEVEL, MRJobConfig.DEFAULT_MR_AM_LOG_LEVEL);
     MRApps.addLog4jSystemProperties(logLevel, logSize, vargs);
 
+    // Check for Java Lib Path usage in MAP and REDUCE configs
+    warnForJavaLibPath(conf.get(MRJobConfig.MAP_JAVA_OPTS,""), "map", 
+        MRJobConfig.MAP_JAVA_OPTS, MRJobConfig.MAP_ENV);
+    warnForJavaLibPath(conf.get(MRJobConfig.MAPRED_MAP_ADMIN_JAVA_OPTS,""), "map", 
+        MRJobConfig.MAPRED_MAP_ADMIN_JAVA_OPTS, MRJobConfig.MAPRED_ADMIN_USER_ENV);
+    warnForJavaLibPath(conf.get(MRJobConfig.REDUCE_JAVA_OPTS,""), "reduce", 
+        MRJobConfig.REDUCE_JAVA_OPTS, MRJobConfig.REDUCE_ENV);
+    warnForJavaLibPath(conf.get(MRJobConfig.MAPRED_REDUCE_ADMIN_JAVA_OPTS,""), "reduce", 
+        MRJobConfig.MAPRED_REDUCE_ADMIN_JAVA_OPTS, MRJobConfig.MAPRED_ADMIN_USER_ENV);   
+
     // Add AM admin command opts before user command opts
     // so that it can be overridden by user
-    vargs.add(conf.get(MRJobConfig.MR_AM_ADMIN_COMMAND_OPTS,
-        MRJobConfig.DEFAULT_MR_AM_ADMIN_COMMAND_OPTS));
+    String mrAppMasterAdminOptions = conf.get(MRJobConfig.MR_AM_ADMIN_COMMAND_OPTS,
+        MRJobConfig.DEFAULT_MR_AM_ADMIN_COMMAND_OPTS);
+    warnForJavaLibPath(mrAppMasterAdminOptions, "app master", 
+        MRJobConfig.MR_AM_ADMIN_COMMAND_OPTS, MRJobConfig.MR_AM_ADMIN_USER_ENV);
+    vargs.add(mrAppMasterAdminOptions);
+    
+    // Add AM user command opts
+    String mrAppMasterUserOptions = conf.get(MRJobConfig.MR_AM_COMMAND_OPTS,
+        MRJobConfig.DEFAULT_MR_AM_COMMAND_OPTS);
+    warnForJavaLibPath(mrAppMasterUserOptions, "app master", 
+        MRJobConfig.MR_AM_COMMAND_OPTS, MRJobConfig.MR_AM_ENV);
+    vargs.add(mrAppMasterUserOptions);
     
-    vargs.add(conf.get(MRJobConfig.MR_AM_COMMAND_OPTS,
-        MRJobConfig.DEFAULT_MR_AM_COMMAND_OPTS));
-
     vargs.add(MRJobConfig.APPLICATION_MASTER_CLASS);
     vargs.add("1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR +
         Path.SEPARATOR + ApplicationConstants.STDOUT);
@@ -425,6 +442,9 @@ public class YARNRunner implements Clien
     Map<String, String> environment = new HashMap<String, String>();
     MRApps.setClasspath(environment, conf);
     
+    // Setup the environment variables for Admin first
+    MRApps.setEnvFromInputString(environment, 
+        conf.get(MRJobConfig.MR_AM_ADMIN_USER_ENV));
     // Setup the environment variables (LD_LIBRARY_PATH, etc)
     MRApps.setEnvFromInputString(environment, 
         conf.get(MRJobConfig.MR_AM_ENV));
@@ -582,4 +602,15 @@ public class YARNRunner implements Clien
       throws IOException {
     return clientCache.getClient(jobID).getLogFilePath(jobID, taskAttemptID);
   }
+
+  private static void warnForJavaLibPath(String opts, String component, 
+      String javaConf, String envConf) {
+    if (opts != null && opts.contains("-Djava.library.path")) {
+      LOG.warn("Usage of -Djava.library.path in " + javaConf + " can cause " +
+               "programs to no longer function if hadoop native libraries " +
+               "are used. These values should be set as part of the " +
+               "LD_LIBRARY_PATH in the " + component + " JVM env using " +
+               envConf + " config settings.");
+    }
+  }
 }
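
[Reviewer note, not part of the patch] The new warnForJavaLibPath() check pushes users toward carrying native library directories on LD_LIBRARY_PATH through the *_ENV settings rather than through -Djava.library.path in the JVM opts. A minimal sketch, assuming the usual MRJobConfig property names ("mapreduce.map.env", "mapreduce.reduce.env", "mapreduce.map.java.opts") and an illustrative library path:

    JobConf conf = new JobConf();
    // preferred: put native libs on LD_LIBRARY_PATH via the task environment
    conf.set("mapreduce.map.env", "LD_LIBRARY_PATH=/opt/native/lib");
    conf.set("mapreduce.reduce.env", "LD_LIBRARY_PATH=/opt/native/lib");
    // this form now triggers the warning added above and should be avoided:
    // conf.set("mapreduce.map.java.opts", "-Djava.library.path=/opt/native/lib");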

Modified: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/resources/META-INF/services/org.apache.hadoop.security.SecurityInfo
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/resources/META-INF/services/org.apache.hadoop.security.SecurityInfo?rev=1437843&r1=1437842&r2=1437843&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/resources/META-INF/services/org.apache.hadoop.security.SecurityInfo (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/resources/META-INF/services/org.apache.hadoop.security.SecurityInfo Thu Jan 24 02:45:45 2013
@@ -1 +1,14 @@
+#
+#   Licensed under the Apache License, Version 2.0 (the "License");
+#   you may not use this file except in compliance with the License.
+#   You may obtain a copy of the License at
+#
+#       http://www.apache.org/licenses/LICENSE-2.0
+#
+#   Unless required by applicable law or agreed to in writing, software
+#   distributed under the License is distributed on an "AS IS" BASIS,
+#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#   See the License for the specific language governing permissions and
+#   limitations under the License.
+#
 org.apache.hadoop.mapreduce.v2.security.client.ClientHSSecurityInfo

Modified: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/NotificationTestCase.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/NotificationTestCase.java?rev=1437843&r1=1437842&r2=1437843&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/NotificationTestCase.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/NotificationTestCase.java Thu Jan 24 02:45:45 2013
@@ -90,8 +90,8 @@ public abstract class NotificationTestCa
   }
 
   public static class NotificationServlet extends HttpServlet {
-    public static int counter = 0;
-    public static int failureCounter = 0;
+    public static volatile int counter = 0;
+    public static volatile int failureCounter = 0;
     private static final long serialVersionUID = 1L;
     
     protected void doGet(HttpServletRequest req, HttpServletResponse res)
@@ -155,7 +155,11 @@ public abstract class NotificationTestCa
 
     System.out.println(launchWordCount(this.createJobConf(),
                                        "a b c d e f g h", 1, 1));
-    Thread.sleep(2000);
+    boolean keepTrying = true;
+    for (int tries = 0; tries < 30 && keepTrying; tries++) {
+      Thread.sleep(50);
+      keepTrying = !(NotificationServlet.counter == 2);
+    }
     assertEquals(2, NotificationServlet.counter);
     assertEquals(0, NotificationServlet.failureCounter);
     
@@ -173,14 +177,22 @@ public abstract class NotificationTestCa
     // run a job with KILLED status
     System.out.println(UtilsForTests.runJobKill(this.createJobConf(), inDir,
                                                 outDir).getID());
-    Thread.sleep(2000);
+    keepTrying = true;
+    for (int tries = 0; tries < 30 && keepTrying; tries++) {
+      Thread.sleep(50);
+      keepTrying = !(NotificationServlet.counter == 4);
+    }
     assertEquals(4, NotificationServlet.counter);
     assertEquals(0, NotificationServlet.failureCounter);
     
     // run a job with FAILED status
     System.out.println(UtilsForTests.runJobFail(this.createJobConf(), inDir,
                                                 outDir).getID());
-    Thread.sleep(2000);
+    keepTrying = true;
+    for (int tries = 0; tries < 30 && keepTrying; tries++) {
+      Thread.sleep(50);
+      keepTrying = !(NotificationServlet.counter == 6);
+    }
     assertEquals(6, NotificationServlet.counter);
     assertEquals(0, NotificationServlet.failureCounter);
   }
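
[Reviewer note, not part of the patch] The fixed Thread.sleep(2000) calls are replaced by a bounded poll on the (now volatile) counter, so the test returns as soon as the notifications arrive and still terminates if they never do. The three inline loops could also be factored into a helper along these lines — waitForCounter is a hypothetical name, not part of this patch:

    private static void waitForCounter(int expected, int maxTries, long sleepMs)
        throws InterruptedException {
      // poll until the servlet has seen the expected number of notifications,
      // or until the retry budget is exhausted
      for (int tries = 0; tries < maxTries && NotificationServlet.counter != expected; tries++) {
        Thread.sleep(sleepMs);
      }
      // no assertion here; the caller's assertEquals() reports any mismatch
    }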