You are viewing a plain text version of this content. The canonical link for it is here.
Posted to mapreduce-commits@hadoop.apache.org by cu...@apache.org on 2014/08/21 23:56:07 UTC

svn commit: r1619608 - in /hadoop/common/branches/YARN-1051/hadoop-mapreduce-project: ./ bin/ conf/ hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/util/ hadoop-mapreduce-client/hadoop-mapreduce-clien...

Author: curino
Date: Thu Aug 21 21:55:57 2014
New Revision: 1619608

URL: http://svn.apache.org/r1619608
Log:
Merge with trunk to pick up YARN-2436

Added:
    hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/CryptoUtils.java
      - copied unchanged from r1619607, hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/CryptoUtils.java
    hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestMRIntermediateDataEncryption.java
      - copied unchanged from r1619607, hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestMRIntermediateDataEncryption.java
Modified:
    hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/   (props changed)
    hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/CHANGES.txt   (contents, props changed)
    hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/bin/mapred
    hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/conf/   (props changed)
    hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/util/MRApps.java
    hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/test/java/org/apache/hadoop/mapreduce/v2/util/TestMRApps.java
    hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/BackupStore.java
    hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/IFile.java
    hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/MapTask.java
    hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/Merger.java
    hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/JobSubmitter.java
    hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java
    hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java
    hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/LocalFetcher.java
    hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/MergeManagerImpl.java
    hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/OnDiskMapOutput.java
    hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml   (contents, props changed)
    hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/site/apt/MapredCommands.apt.vm
    hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/site/apt/PluggableShuffleAndPluggableSort.apt.vm
    hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/site/markdown/DistCp.md.vm
    hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestMerger.java
    hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestIFile.java
    hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestReduceTask.java
    hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/pipes/TestPipeApplication.java
    hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestMRJobs.java

Propchange: hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/
------------------------------------------------------------------------------
  Merged /hadoop/common/branches/fs-encryption/hadoop-mapreduce-project:r1594376-1619194
  Merged /hadoop/common/trunk/hadoop-mapreduce-project:r1619018-1619607

Modified: hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/CHANGES.txt?rev=1619608&r1=1619607&r2=1619608&view=diff
==============================================================================
--- hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/CHANGES.txt (original)
+++ hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/CHANGES.txt Thu Aug 21 21:55:57 2014
@@ -86,6 +86,9 @@ Trunk (Unreleased)
     MAPREDUCE-6019. MapReduce changes for exposing YARN/MR endpoints on multiple
     interfaces. (Craig Welch, Milan Potocnik, Arpit Agarwal via xgong)
 
+    MAPREDUCE-6013. [post-HADOOP-9902] mapred version is missing (Akira AJISAKA
+    via aw)
+
   BUG FIXES
 
     MAPREDUCE-5714. Removed forceful JVM exit in shutDownJob.  
@@ -151,6 +154,16 @@ Trunk (Unreleased)
     MAPREDUCE-5867. Fix NPE in KillAMPreemptionPolicy related to 
     ProportionalCapacityPreemptionPolicy (Sunil G via devaraj)
 
+  BREAKDOWN OF HDFS-6134 AND HADOOP-10150 SUBTASKS AND RELATED JIRAS
+
+    MAPREDUCE-5890. Support for encrypting Intermediate 
+    data and spills in local filesystem. (asuresh via tucu)
+
+    MAPREDUCE-6007. Add support to distcp to preserve raw.* namespace
+    extended attributes. (clamb)
+
+    MAPREDUCE-6041. Fix TestOptionsParser. (clamb)
+
 Release 2.6.0 - UNRELEASED
 
   INCOMPATIBLE CHANGES
@@ -174,6 +187,9 @@ Release 2.6.0 - UNRELEASED
     MAPREDUCE-5906. Inconsistent configuration in property
       "mapreduce.reduce.shuffle.input.buffer.percent" (Akira AJISAKA via aw)
 
+    MAPREDUCE-5974. Allow specifying multiple MapOutputCollectors with 
+    fallback. (Todd Lipcon via kasha)
+
   OPTIMIZATIONS
 
   BUG FIXES
@@ -236,7 +252,7 @@ Release 2.6.0 - UNRELEASED
     MAPREDUCE-6012. DBInputSplit creates invalid ranges on Oracle. 
     (Wei Yan via kasha)
 
-Release 2.5.0 - UNRELEASED
+Release 2.5.0 - 2014-08-11
 
   INCOMPATIBLE CHANGES
 

Propchange: hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/CHANGES.txt
------------------------------------------------------------------------------
  Merged /hadoop/common/branches/fs-encryption/hadoop-mapreduce-project/CHANGES.txt:r1594376-1619194
  Merged /hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt:r1619018-1619607

Modified: hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/bin/mapred
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/bin/mapred?rev=1619608&r1=1619607&r2=1619608&view=diff
==============================================================================
--- hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/bin/mapred (original)
+++ hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/bin/mapred Thu Aug 21 21:55:57 2014
@@ -29,6 +29,7 @@ function hadoop_usage
   echo "  pipes                run a Pipes job"
   echo "  queue                get information regarding JobQueues"
   echo "  sampler              sampler"
+  echo "  version              print the version"
   echo ""
   echo "Most commands print help when invoked w/o parameters."
 }
@@ -105,6 +106,10 @@ case ${COMMAND} in
     CLASS=org.apache.hadoop.mapred.lib.InputSampler
     HADOOP_OPTS="${HADOOP_OPTS} ${HADOOP_CLIENT_OPTS}"
   ;;
+  version)
+    CLASS=org.apache.hadoop.util.VersionInfo
+    HADOOP_OPTS="${HADOOP_OPTS} ${HADOOP_CLIENT_OPTS}"
+  ;;
   -*|*)
     hadoop_exit_with_usage 1
   ;;

Propchange: hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/conf/
------------------------------------------------------------------------------
  Merged /hadoop/common/branches/fs-encryption/hadoop-mapreduce-project/conf:r1594376-1619194
  Merged /hadoop/common/trunk/hadoop-mapreduce-project/conf:r1619018-1619607

Modified: hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/util/MRApps.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/util/MRApps.java?rev=1619608&r1=1619607&r2=1619608&view=diff
==============================================================================
--- hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/util/MRApps.java (original)
+++ hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/util/MRApps.java Thu Aug 21 21:55:57 2014
@@ -34,6 +34,7 @@ import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
 import com.google.common.annotations.VisibleForTesting;
+
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience.Private;
@@ -56,6 +57,7 @@ import org.apache.hadoop.mapreduce.v2.ap
 import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskState;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
+import org.apache.hadoop.util.ApplicationClassLoader;
 import org.apache.hadoop.util.Shell;
 import org.apache.hadoop.util.StringInterner;
 import org.apache.hadoop.util.StringUtils;
@@ -67,7 +69,6 @@ import org.apache.hadoop.yarn.api.record
 import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
-import org.apache.hadoop.yarn.util.ApplicationClassLoader;
 import org.apache.hadoop.yarn.util.Apps;
 import org.apache.hadoop.yarn.util.ConverterUtils;
 import org.apache.log4j.RollingFileAppender;

Modified: hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/test/java/org/apache/hadoop/mapreduce/v2/util/TestMRApps.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/test/java/org/apache/hadoop/mapreduce/v2/util/TestMRApps.java?rev=1619608&r1=1619607&r2=1619608&view=diff
==============================================================================
--- hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/test/java/org/apache/hadoop/mapreduce/v2/util/TestMRApps.java (original)
+++ hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/test/java/org/apache/hadoop/mapreduce/v2/util/TestMRApps.java Thu Aug 21 21:55:57 2014
@@ -51,6 +51,7 @@ import org.apache.hadoop.mapreduce.v2.ap
 import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskState;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
+import org.apache.hadoop.util.ApplicationClassLoader;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.yarn.api.ApplicationConstants;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
@@ -58,7 +59,6 @@ import org.apache.hadoop.yarn.api.record
 import org.apache.hadoop.yarn.api.records.LocalResourceType;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
-import org.apache.hadoop.yarn.util.ApplicationClassLoader;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Test;
@@ -514,7 +514,8 @@ public class TestMRApps {
   @Test
   public void testSystemClasses() {
     final List<String> systemClasses =
-        Arrays.asList(MRApps.getSystemClasses(new Configuration()));
+        Arrays.asList(StringUtils.getTrimmedStrings(
+        ApplicationClassLoader.DEFAULT_SYSTEM_CLASSES));
     for (String defaultXml : DEFAULT_XMLS) {
       assertTrue(defaultXml + " must be system resource",
           ApplicationClassLoader.isSystemClass(defaultXml, systemClasses));

Modified: hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/BackupStore.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/BackupStore.java?rev=1619608&r1=1619607&r2=1619608&view=diff
==============================================================================
--- hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/BackupStore.java (original)
+++ hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/BackupStore.java Thu Aug 21 21:55:57 2014
@@ -31,6 +31,7 @@ import org.apache.commons.logging.LogFac
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.LocalDirAllocator;
 import org.apache.hadoop.fs.Path;
@@ -43,6 +44,7 @@ import org.apache.hadoop.mapred.Merger.S
 import org.apache.hadoop.mapreduce.MRConfig;
 import org.apache.hadoop.mapreduce.MRJobConfig;
 import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hadoop.mapreduce.CryptoUtils;
 
 /**
  * <code>BackupStore</code> is an utility class that is used to support
@@ -572,7 +574,9 @@ public class BackupStore<K,V> {
 
       file = lDirAlloc.getLocalPathForWrite(tmp.toUri().getPath(), 
           -1, conf);
-      return new Writer<K, V>(conf, fs, file);
+      FSDataOutputStream out = fs.create(file);
+      out = CryptoUtils.wrapIfNecessary(conf, out);
+      return new Writer<K, V>(conf, out, null, null, null, null, true);
     }
   }
 

Modified: hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/IFile.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/IFile.java?rev=1619608&r1=1619607&r2=1619608&view=diff
==============================================================================
--- hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/IFile.java (original)
+++ hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/IFile.java Thu Aug 21 21:55:57 2014
@@ -90,13 +90,11 @@ public class IFile {
     
     DataOutputBuffer buffer = new DataOutputBuffer();
 
-    public Writer(Configuration conf, FileSystem fs, Path file, 
-                  Class<K> keyClass, Class<V> valueClass,
-                  CompressionCodec codec,
-                  Counters.Counter writesCounter) throws IOException {
-      this(conf, fs.create(file), keyClass, valueClass, codec,
-           writesCounter);
-      ownOutputStream = true;
+    public Writer(Configuration conf, FSDataOutputStream out,
+        Class<K> keyClass, Class<V> valueClass,
+        CompressionCodec codec, Counters.Counter writesCounter)
+        throws IOException {
+      this(conf, out, keyClass, valueClass, codec, writesCounter, false);
     }
     
     protected Writer(Counters.Counter writesCounter) {
@@ -105,7 +103,8 @@ public class IFile {
 
     public Writer(Configuration conf, FSDataOutputStream out, 
         Class<K> keyClass, Class<V> valueClass,
-        CompressionCodec codec, Counters.Counter writesCounter)
+        CompressionCodec codec, Counters.Counter writesCounter,
+        boolean ownOutputStream)
         throws IOException {
       this.writtenRecordsCounter = writesCounter;
       this.checksumOut = new IFileOutputStream(out);
@@ -137,11 +136,7 @@ public class IFile {
         this.valueSerializer = serializationFactory.getSerializer(valueClass);
         this.valueSerializer.open(buffer);
       }
-    }
-
-    public Writer(Configuration conf, FileSystem fs, Path file) 
-    throws IOException {
-      this(conf, fs, file, null, null, null, null);
+      this.ownOutputStream = ownOutputStream;
     }
 
     public void close() throws IOException {

Modified: hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/MapTask.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/MapTask.java?rev=1619608&r1=1619607&r2=1619608&view=diff
==============================================================================
--- hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/MapTask.java (original)
+++ hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/MapTask.java Thu Aug 21 21:55:57 2014
@@ -66,6 +66,7 @@ import org.apache.hadoop.mapreduce.lib.m
 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormatCounter;
 import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitIndex;
 import org.apache.hadoop.mapreduce.task.MapContextImpl;
+import org.apache.hadoop.mapreduce.CryptoUtils;
 import org.apache.hadoop.util.IndexedSortable;
 import org.apache.hadoop.util.IndexedSorter;
 import org.apache.hadoop.util.Progress;
@@ -380,16 +381,35 @@ public class MapTask extends Task {
   private <KEY, VALUE> MapOutputCollector<KEY, VALUE>
           createSortingCollector(JobConf job, TaskReporter reporter)
     throws IOException, ClassNotFoundException {
-    MapOutputCollector<KEY, VALUE> collector
-      = (MapOutputCollector<KEY, VALUE>)
-       ReflectionUtils.newInstance(
-                        job.getClass(JobContext.MAP_OUTPUT_COLLECTOR_CLASS_ATTR,
-                        MapOutputBuffer.class, MapOutputCollector.class), job);
-    LOG.info("Map output collector class = " + collector.getClass().getName());
     MapOutputCollector.Context context =
-                           new MapOutputCollector.Context(this, job, reporter);
-    collector.init(context);
-    return collector;
+      new MapOutputCollector.Context(this, job, reporter);
+
+    Class<?>[] collectorClasses = job.getClasses(
+      JobContext.MAP_OUTPUT_COLLECTOR_CLASS_ATTR, MapOutputBuffer.class);
+    int remainingCollectors = collectorClasses.length;
+    for (Class clazz : collectorClasses) {
+      try {
+        if (!MapOutputCollector.class.isAssignableFrom(clazz)) {
+          throw new IOException("Invalid output collector class: " + clazz.getName() +
+            " (does not implement MapOutputCollector)");
+        }
+        Class<? extends MapOutputCollector> subclazz =
+          clazz.asSubclass(MapOutputCollector.class);
+        LOG.debug("Trying map output collector class: " + subclazz.getName());
+        MapOutputCollector<KEY, VALUE> collector =
+          ReflectionUtils.newInstance(subclazz, job);
+        collector.init(context);
+        LOG.info("Map output collector class = " + collector.getClass().getName());
+        return collector;
+      } catch (Exception e) {
+        String msg = "Unable to initialize MapOutputCollector " + clazz.getName();
+        if (--remainingCollectors > 0) {
+          msg += " (" + remainingCollectors + " more collector(s) to try)";
+        }
+        LOG.warn(msg, e);
+      }
+    }
+    throw new IOException("Unable to initialize any output collector");
   }
 
   @SuppressWarnings("unchecked")
@@ -1580,7 +1600,8 @@ public class MapTask extends Task {
           IFile.Writer<K, V> writer = null;
           try {
             long segmentStart = out.getPos();
-            writer = new Writer<K, V>(job, out, keyClass, valClass, codec,
+            FSDataOutputStream partitionOut = CryptoUtils.wrapIfNecessary(job, out);
+            writer = new Writer<K, V>(job, partitionOut, keyClass, valClass, codec,
                                       spilledRecordsCounter);
             if (combinerRunner == null) {
               // spill directly
@@ -1617,8 +1638,8 @@ public class MapTask extends Task {
 
             // record offsets
             rec.startOffset = segmentStart;
-            rec.rawLength = writer.getRawLength();
-            rec.partLength = writer.getCompressedLength();
+            rec.rawLength = writer.getRawLength() + CryptoUtils.cryptoPadding(job);
+            rec.partLength = writer.getCompressedLength() + CryptoUtils.cryptoPadding(job);
             spillRec.putIndex(rec, i);
 
             writer = null;
@@ -1668,7 +1689,8 @@ public class MapTask extends Task {
           try {
             long segmentStart = out.getPos();
             // Create a new codec, don't care!
-            writer = new IFile.Writer<K,V>(job, out, keyClass, valClass, codec,
+            FSDataOutputStream partitionOut = CryptoUtils.wrapIfNecessary(job, out);
+            writer = new IFile.Writer<K,V>(job, partitionOut, keyClass, valClass, codec,
                                             spilledRecordsCounter);
 
             if (i == partition) {
@@ -1682,8 +1704,8 @@ public class MapTask extends Task {
 
             // record offsets
             rec.startOffset = segmentStart;
-            rec.rawLength = writer.getRawLength();
-            rec.partLength = writer.getCompressedLength();
+            rec.rawLength = writer.getRawLength() + CryptoUtils.cryptoPadding(job);
+            rec.partLength = writer.getCompressedLength() + CryptoUtils.cryptoPadding(job);
             spillRec.putIndex(rec, i);
 
             writer = null;
@@ -1825,12 +1847,13 @@ public class MapTask extends Task {
         try {
           for (int i = 0; i < partitions; i++) {
             long segmentStart = finalOut.getPos();
+            FSDataOutputStream finalPartitionOut = CryptoUtils.wrapIfNecessary(job, finalOut);
             Writer<K, V> writer =
-              new Writer<K, V>(job, finalOut, keyClass, valClass, codec, null);
+              new Writer<K, V>(job, finalPartitionOut, keyClass, valClass, codec, null);
             writer.close();
             rec.startOffset = segmentStart;
-            rec.rawLength = writer.getRawLength();
-            rec.partLength = writer.getCompressedLength();
+            rec.rawLength = writer.getRawLength() + CryptoUtils.cryptoPadding(job);
+            rec.partLength = writer.getCompressedLength() + CryptoUtils.cryptoPadding(job);
             sr.putIndex(rec, i);
           }
           sr.writeToFile(finalIndexFile, job);
@@ -1879,8 +1902,9 @@ public class MapTask extends Task {
 
           //write merged output to disk
           long segmentStart = finalOut.getPos();
+          FSDataOutputStream finalPartitionOut = CryptoUtils.wrapIfNecessary(job, finalOut);
           Writer<K, V> writer =
-              new Writer<K, V>(job, finalOut, keyClass, valClass, codec,
+              new Writer<K, V>(job, finalPartitionOut, keyClass, valClass, codec,
                                spilledRecordsCounter);
           if (combinerRunner == null || numSpills < minSpillsForCombine) {
             Merger.writeFile(kvIter, writer, reporter, job);
@@ -1896,8 +1920,8 @@ public class MapTask extends Task {
           
           // record offsets
           rec.startOffset = segmentStart;
-          rec.rawLength = writer.getRawLength();
-          rec.partLength = writer.getCompressedLength();
+          rec.rawLength = writer.getRawLength() + CryptoUtils.cryptoPadding(job);
+          rec.partLength = writer.getCompressedLength() + CryptoUtils.cryptoPadding(job);
           spillRec.putIndex(rec, parts);
         }
         spillRec.writeToFile(finalIndexFile, job);

Modified: hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/Merger.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/Merger.java?rev=1619608&r1=1619607&r2=1619608&view=diff
==============================================================================
--- hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/Merger.java (original)
+++ hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/Merger.java Thu Aug 21 21:55:57 2014
@@ -30,6 +30,7 @@ import org.apache.hadoop.classification.
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.ChecksumFileSystem;
+import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.LocalDirAllocator;
 import org.apache.hadoop.fs.Path;
@@ -40,6 +41,7 @@ import org.apache.hadoop.mapred.IFile.Re
 import org.apache.hadoop.mapred.IFile.Writer;
 import org.apache.hadoop.mapreduce.MRConfig;
 import org.apache.hadoop.mapreduce.TaskType;
+import org.apache.hadoop.mapreduce.CryptoUtils;
 import org.apache.hadoop.util.PriorityQueue;
 import org.apache.hadoop.util.Progress;
 import org.apache.hadoop.util.Progressable;
@@ -298,8 +300,12 @@ public class Merger {  
     void init(Counters.Counter readsCounter) throws IOException {
       if (reader == null) {
         FSDataInputStream in = fs.open(file);
+
         in.seek(segmentOffset);
-        reader = new Reader<K, V>(conf, in, segmentLength, codec, readsCounter);
+        in = CryptoUtils.wrapIfNecessary(conf, in);
+        reader = new Reader<K, V>(conf, in,
+            segmentLength - CryptoUtils.cryptoPadding(conf),
+            codec, readsCounter);
       }
       
       if (mapOutputsCounter != null) {
@@ -714,9 +720,10 @@ public class Merger {  
                                               tmpFilename.toString(),
                                               approxOutputSize, conf);
 
-          Writer<K, V> writer = 
-            new Writer<K, V>(conf, fs, outputFile, keyClass, valueClass, codec,
-                             writesCounter);
+          FSDataOutputStream out = fs.create(outputFile);
+          out = CryptoUtils.wrapIfNecessary(conf, out);
+          Writer<K, V> writer = new Writer<K, V>(conf, out, keyClass, valueClass,
+              codec, writesCounter, true);
           writeFile(this, writer, reporter, conf);
           writer.close();
           

Modified: hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/JobSubmitter.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/JobSubmitter.java?rev=1619608&r1=1619607&r2=1619608&view=diff
==============================================================================
--- hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/JobSubmitter.java (original)
+++ hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/JobSubmitter.java Thu Aug 21 21:55:57 2014
@@ -291,7 +291,7 @@ class JobSubmitter {
   /**
    * configure the jobconf of the user with the command line options of 
    * -libjars, -files, -archives.
-   * @param conf
+   * @param job
    * @throws IOException
    */
   private void copyAndConfigureFiles(Job job, Path jobSubmitDir) 
@@ -376,8 +376,13 @@ class JobSubmitter {
       if (TokenCache.getShuffleSecretKey(job.getCredentials()) == null) {
         KeyGenerator keyGen;
         try {
+         
+          int keyLen = CryptoUtils.isShuffleEncrypted(conf) 
+              ? conf.getInt(MRJobConfig.MR_ENCRYPTED_INTERMEDIATE_DATA_KEY_SIZE_BITS, 
+                  MRJobConfig.DEFAULT_MR_ENCRYPTED_INTERMEDIATE_DATA_KEY_SIZE_BITS)
+              : SHUFFLE_KEY_LENGTH;
           keyGen = KeyGenerator.getInstance(SHUFFLE_KEYGEN_ALGORITHM);
-          keyGen.init(SHUFFLE_KEY_LENGTH);
+          keyGen.init(keyLen);
         } catch (NoSuchAlgorithmException e) {
           throw new IOException("Error generating shuffle secret key", e);
         }

Modified: hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java?rev=1619608&r1=1619607&r2=1619608&view=diff
==============================================================================
--- hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java (original)
+++ hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java Thu Aug 21 21:55:57 2014
@@ -771,4 +771,18 @@ public interface MRJobConfig {
   
   public static final String TASK_PREEMPTION =
       "mapreduce.job.preemption";
+
+  public static final String MR_ENCRYPTED_INTERMEDIATE_DATA =
+      "mapreduce.job.encrypted-intermediate-data";
+  public static final boolean DEFAULT_MR_ENCRYPTED_INTERMEDIATE_DATA = false;
+
+  public static final String MR_ENCRYPTED_INTERMEDIATE_DATA_KEY_SIZE_BITS =
+      "mapreduce.job.encrypted-intermediate-data-key-size-bits";
+  public static final int DEFAULT_MR_ENCRYPTED_INTERMEDIATE_DATA_KEY_SIZE_BITS =
+      128;
+
+  public static final String MR_ENCRYPTED_INTERMEDIATE_DATA_BUFFER_KB =
+      "mapreduce.job.encrypted-intermediate-data.buffer.kb";
+  public static final int DEFAULT_MR_ENCRYPTED_INTERMEDIATE_DATA_BUFFER_KB =
+          128;
 }

Modified: hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java?rev=1619608&r1=1619607&r2=1619608&view=diff
==============================================================================
--- hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java (original)
+++ hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java Thu Aug 21 21:55:57 2014
@@ -19,6 +19,7 @@ package org.apache.hadoop.mapreduce.task
 
 import java.io.DataInputStream;
 import java.io.IOException;
+import java.io.InputStream;
 import java.net.ConnectException;
 import java.net.HttpURLConnection;
 import java.net.MalformedURLException;
@@ -43,6 +44,7 @@ import org.apache.hadoop.mapreduce.MRCon
 import org.apache.hadoop.mapreduce.MRJobConfig;
 import org.apache.hadoop.mapreduce.TaskAttemptID;
 import org.apache.hadoop.mapreduce.security.SecureShuffleUtils;
+import org.apache.hadoop.mapreduce.CryptoUtils;
 import org.apache.hadoop.security.ssl.SSLFactory;
 
 import com.google.common.annotations.VisibleForTesting;
@@ -65,6 +67,7 @@ class Fetcher<K,V> extends Thread {
                                     CONNECTION, WRONG_REDUCE}
   
   private final static String SHUFFLE_ERR_GRP_NAME = "Shuffle Errors";
+  private final JobConf jobConf;
   private final Counters.Counter connectionErrs;
   private final Counters.Counter ioErrs;
   private final Counters.Counter wrongLengthErrs;
@@ -104,6 +107,7 @@ class Fetcher<K,V> extends Thread {
                  Reporter reporter, ShuffleClientMetrics metrics,
                  ExceptionReporter exceptionReporter, SecretKey shuffleKey,
                  int id) {
+    this.jobConf = job;
     this.reporter = reporter;
     this.scheduler = scheduler;
     this.merger = merger;
@@ -396,7 +400,11 @@ class Fetcher<K,V> extends Thread {
         return remaining.toArray(new TaskAttemptID[remaining.size()]);
       }
 
- 
+      InputStream is = input;
+      is = CryptoUtils.wrapIfNecessary(jobConf, is, compressedLength);
+      compressedLength -= CryptoUtils.cryptoPadding(jobConf);
+      decompressedLength -= CryptoUtils.cryptoPadding(jobConf);
+      
       // Do some basic sanity verification
       if (!verifySanity(compressedLength, decompressedLength, forReduce,
           remaining, mapId)) {
@@ -433,7 +441,7 @@ class Fetcher<K,V> extends Thread {
         LOG.info("fetcher#" + id + " about to shuffle output of map "
             + mapOutput.getMapId() + " decomp: " + decompressedLength
             + " len: " + compressedLength + " to " + mapOutput.getDescription());
-        mapOutput.shuffle(host, input, compressedLength, decompressedLength,
+        mapOutput.shuffle(host, is, compressedLength, decompressedLength,
             metrics, reporter);
       } catch (java.lang.InternalError e) {
         LOG.warn("Failed to shuffle for fetcher#"+id, e);

Modified: hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/LocalFetcher.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/LocalFetcher.java?rev=1619608&r1=1619607&r2=1619608&view=diff
==============================================================================
--- hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/LocalFetcher.java (original)
+++ hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/LocalFetcher.java Thu Aug 21 21:55:57 2014
@@ -36,6 +36,7 @@ import org.apache.hadoop.mapred.MapOutpu
 import org.apache.hadoop.mapred.Reporter;
 import org.apache.hadoop.mapred.SpillRecord;
 import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hadoop.mapreduce.CryptoUtils;
 
 /**
  * LocalFetcher is used by LocalJobRunner to perform a local filesystem
@@ -145,6 +146,9 @@ class LocalFetcher<K,V> extends Fetcher<
     // now read the file, seek to the appropriate section, and send it.
     FileSystem localFs = FileSystem.getLocal(job).getRaw();
     FSDataInputStream inStream = localFs.open(mapOutputFileName);
+
+    inStream = CryptoUtils.wrapIfNecessary(job, inStream);
+
     try {
       inStream.seek(ir.startOffset);
 

Modified: hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/MergeManagerImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/MergeManagerImpl.java?rev=1619608&r1=1619607&r2=1619608&view=diff
==============================================================================
--- hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/MergeManagerImpl.java (original)
+++ hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/MergeManagerImpl.java Thu Aug 21 21:55:57 2014
@@ -30,6 +30,7 @@ import org.apache.commons.logging.LogFac
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.fs.ChecksumFileSystem;
+import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.LocalDirAllocator;
 import org.apache.hadoop.fs.LocalFileSystem;
@@ -54,6 +55,7 @@ import org.apache.hadoop.mapred.Task.Com
 import org.apache.hadoop.mapreduce.MRJobConfig;
 import org.apache.hadoop.mapreduce.TaskAttemptID;
 import org.apache.hadoop.mapreduce.TaskID;
+import org.apache.hadoop.mapreduce.CryptoUtils;
 import org.apache.hadoop.mapreduce.task.reduce.MapOutput.MapOutputComparator;
 import org.apache.hadoop.util.Progress;
 import org.apache.hadoop.util.ReflectionUtils;
@@ -228,6 +230,10 @@ public class MergeManagerImpl<K, V> impl
     return new InMemoryMerger(this);
   }
 
+  protected MergeThread<CompressAwarePath,K,V> createOnDiskMerger() {
+    return new OnDiskMerger(this);
+  }
+
   TaskAttemptID getReduceId() {
     return reduceId;
   }
@@ -453,11 +459,10 @@ public class MergeManagerImpl<K, V> impl
                                            mergeOutputSize).suffix(
                                                Task.MERGED_OUTPUT_PREFIX);
 
-      Writer<K,V> writer = 
-        new Writer<K,V>(jobConf, rfs, outputPath,
-                        (Class<K>) jobConf.getMapOutputKeyClass(),
-                        (Class<V>) jobConf.getMapOutputValueClass(),
-                        codec, null);
+      FSDataOutputStream out = CryptoUtils.wrapIfNecessary(jobConf, rfs.create(outputPath));
+      Writer<K, V> writer = new Writer<K, V>(jobConf, out,
+          (Class<K>) jobConf.getMapOutputKeyClass(),
+          (Class<V>) jobConf.getMapOutputValueClass(), codec, null, true);
 
       RawKeyValueIterator rIter = null;
       CompressAwarePath compressAwarePath;
@@ -537,11 +542,12 @@ public class MergeManagerImpl<K, V> impl
       Path outputPath = 
         localDirAllocator.getLocalPathForWrite(inputs.get(0).toString(), 
             approxOutputSize, jobConf).suffix(Task.MERGED_OUTPUT_PREFIX);
-      Writer<K,V> writer = 
-        new Writer<K,V>(jobConf, rfs, outputPath, 
-                        (Class<K>) jobConf.getMapOutputKeyClass(), 
-                        (Class<V>) jobConf.getMapOutputValueClass(),
-                        codec, null);
+
+      FSDataOutputStream out = CryptoUtils.wrapIfNecessary(jobConf, rfs.create(outputPath));
+      Writer<K, V> writer = new Writer<K, V>(jobConf, out,
+          (Class<K>) jobConf.getMapOutputKeyClass(),
+          (Class<V>) jobConf.getMapOutputValueClass(), codec, null, true);
+
       RawKeyValueIterator iter  = null;
       CompressAwarePath compressAwarePath;
       Path tmpDir = new Path(reduceId.toString());
@@ -717,8 +723,10 @@ public class MergeManagerImpl<K, V> impl
             keyClass, valueClass, memDiskSegments, numMemDiskSegments,
             tmpDir, comparator, reporter, spilledRecordsCounter, null, 
             mergePhase);
-        Writer<K,V> writer = new Writer<K,V>(job, fs, outputPath,
-            keyClass, valueClass, codec, null);
+
+        FSDataOutputStream out = CryptoUtils.wrapIfNecessary(job, fs.create(outputPath));
+        Writer<K, V> writer = new Writer<K, V>(job, out, keyClass, valueClass,
+            codec, null, true);
         try {
           Merger.writeFile(rIter, writer, reporter, job);
           writer.close();

Modified: hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/OnDiskMapOutput.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/OnDiskMapOutput.java?rev=1619608&r1=1619607&r2=1619608&view=diff
==============================================================================
--- hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/OnDiskMapOutput.java (original)
+++ hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/OnDiskMapOutput.java Thu Aug 21 21:55:57 2014
@@ -37,6 +37,7 @@ import org.apache.hadoop.mapred.Reporter
 import org.apache.hadoop.mapred.MapOutputFile;
 
 import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hadoop.mapreduce.CryptoUtils;
 import org.apache.hadoop.mapreduce.task.reduce.MergeManagerImpl.CompressAwarePath;
 
 import com.google.common.annotations.VisibleForTesting;
@@ -75,7 +76,7 @@ class OnDiskMapOutput<K, V> extends MapO
     this.merger = merger;
     this.outputPath = outputPath;
     tmpOutputPath = getTempPath(outputPath, fetcher);
-    disk = fs.create(tmpOutputPath);
+    disk = CryptoUtils.wrapIfNecessary(conf, fs.create(tmpOutputPath));
   }
 
   @VisibleForTesting

Modified: hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml?rev=1619608&r1=1619607&r2=1619608&view=diff
==============================================================================
--- hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml (original)
+++ hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml Thu Aug 21 21:55:57 2014
@@ -408,7 +408,9 @@
   <name>mapreduce.job.map.output.collector.class</name>
   <value>org.apache.hadoop.mapred.MapTask$MapOutputBuffer</value>
   <description>
-    It defines the MapOutputCollector implementation to use.
+    The MapOutputCollector implementation(s) to use. This may be a comma-separated
+    list of class names, in which case the map task will try to initialize each
+    of the collectors in turn. The first to successfully initialize will be used.
   </description>
 </property>
  
@@ -1225,13 +1227,13 @@
 
 <property>
    <name>mapreduce.job.classloader.system.classes</name>
-   <value>java.,javax.,org.w3c.dom.,org.xml.sax.,org.apache.commons.logging.,
-          org.apache.log4j.,org.apache.hadoop.,core-default.xml,
-          hdfs-default.xml,mapred-default.xml,yarn-default.xml</value>
-  <description>A comma-separated list of classes that should be loaded from the
-    system classpath, not the user-supplied JARs, when mapreduce.job.classloader
-    is enabled. Names ending in '.' (period) are treated as package names,
-    and names starting with a '-' are treated as negative matches.
+   <value></value>
+  <description>Used to override the default definition of the system classes for
+    the job classloader. The system classes are a comma-separated list of
+    classes that should be loaded from the system classpath, not the
+    user-supplied JARs, when mapreduce.job.classloader is enabled. Names ending
+    in '.' (period) are treated as package names, and names starting with a '-'
+    are treated as negative matches.
   </description>
 </property>
 

Propchange: hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml
------------------------------------------------------------------------------
  Merged /hadoop/common/branches/fs-encryption/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml:r1594376-1619194
  Merged /hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml:r1619018-1619607

Modified: hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/site/apt/MapredCommands.apt.vm
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/site/apt/MapredCommands.apt.vm?rev=1619608&r1=1619607&r2=1619608&view=diff
==============================================================================
--- hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/site/apt/MapredCommands.apt.vm (original)
+++ hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/site/apt/MapredCommands.apt.vm Thu Aug 21 21:55:57 2014
@@ -177,6 +177,12 @@ MapReduce Commands Guide
    Creates a hadoop archive. More information can be found at
    {{{./HadoopArchives.html}Hadoop Archives Guide}}.
 
+** <<<version>>>
+
+   Prints the version.
+
+   Usage: <<<mapred version>>>
+
 * Administration Commands
 
    Commands useful for administrators of a hadoop cluster.

Modified: hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/site/apt/PluggableShuffleAndPluggableSort.apt.vm
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/site/apt/PluggableShuffleAndPluggableSort.apt.vm?rev=1619608&r1=1619607&r2=1619608&view=diff
==============================================================================
--- hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/site/apt/PluggableShuffleAndPluggableSort.apt.vm (original)
+++ hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/site/apt/PluggableShuffleAndPluggableSort.apt.vm Thu Aug 21 21:55:57 2014
@@ -71,11 +71,16 @@ Hadoop MapReduce Next Generation - Plugg
 *--------------------------------------+---------------------+-----------------+
 | <<<mapreduce.job.reduce.shuffle.consumer.plugin.class>>> | <<<org.apache.hadoop.mapreduce.task.reduce.Shuffle>>>         | The <<<ShuffleConsumerPlugin>>> implementation to use |
 *--------------------------------------+---------------------+-----------------+
-| <<<mapreduce.job.map.output.collector.class>>>   | <<<org.apache.hadoop.mapred.MapTask$MapOutputBuffer>>> | The <<<MapOutputCollector>>> implementation to use |
+| <<<mapreduce.job.map.output.collector.class>>>   | <<<org.apache.hadoop.mapred.MapTask$MapOutputBuffer>>> | The <<<MapOutputCollector>>> implementation(s) to use |
 *--------------------------------------+---------------------+-----------------+
 
   These properties can also be set in the <<<mapred-site.xml>>> to change the default values for all jobs.
 
+  The collector class configuration may specify a comma-separated list of collector implementations.
+  In this case, the map task will attempt to instantiate each in turn until one of the
+  implementations successfully initializes. This can be useful if a given collector
+  implementation is only compatible with certain types of keys or values, for example.
+
 ** NodeManager Configuration properties, <<<yarn-site.xml>>> in all nodes:
 
 *--------------------------------------+---------------------+-----------------+
@@ -91,4 +96,3 @@ Hadoop MapReduce Next Generation - Plugg
   <<<yarn.nodemanager.aux-services>>> property, for example <<<mapred.shufflex>>>.
   Then the property defining the corresponding class must be
   <<<yarn.nodemanager.aux-services.mapreduce_shufflex.class>>>.
-  
\ No newline at end of file

Modified: hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/site/markdown/DistCp.md.vm
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/site/markdown/DistCp.md.vm?rev=1619608&r1=1619607&r2=1619608&view=diff
==============================================================================
--- hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/site/markdown/DistCp.md.vm (original)
+++ hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/site/markdown/DistCp.md.vm Thu Aug 21 21:55:57 2014
@@ -191,6 +191,26 @@ $H3 Update and Overwrite
 
   If `-update` is used, `1` is overwritten as well.
 
+$H3 raw Namespace Extended Attribute Preservation
+
+  This section only applies to HDFS.
+
+  If the target and all of the source pathnames are in the /.reserved/raw
+  hierarchy, then 'raw' namespace extended attributes will be preserved.
+  'raw' xattrs are used by the system for internal functions such as encryption
+  meta data. They are only visible to users when accessed through the
+  /.reserved/raw hierarchy.
+
+  raw xattrs are preserved based solely on whether /.reserved/raw prefixes are
+  supplied. The -p (preserve, see below) flag does not impact preservation of
+  raw xattrs.
+
+  To prevent raw xattrs from being preserved, simply do not use the
+  /.reserved/raw prefix on any of the source and target paths.
+
+  If the /.reserved/raw prefix is specified on only a subset of the source and
+  target paths, an error will be displayed and a non-0 exit code returned.
+
 Command Line Options
 --------------------
 

Modified: hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestMerger.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestMerger.java?rev=1619608&r1=1619607&r2=1619608&view=diff
==============================================================================
--- hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestMerger.java (original)
+++ hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestMerger.java Thu Aug 21 21:55:57 2014
@@ -24,14 +24,16 @@ import static org.mockito.Mockito.doAnsw
 
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
+import java.security.PrivilegedAction;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.TreeMap;
 
+import org.apache.hadoop.fs.FSDataInputStream;
 import org.junit.Assert;
-
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
@@ -51,10 +53,16 @@ import org.apache.hadoop.mapred.RawKeyVa
 import org.apache.hadoop.mapred.Reporter;
 import org.apache.hadoop.mapreduce.JobID;
 import org.apache.hadoop.mapreduce.MRConfig;
+import org.apache.hadoop.mapreduce.MRJobConfig;
 import org.apache.hadoop.mapreduce.TaskAttemptID;
 import org.apache.hadoop.mapreduce.TaskID;
 import org.apache.hadoop.mapreduce.TaskType;
+import org.apache.hadoop.mapreduce.security.TokenCache;
+import org.apache.hadoop.mapreduce.CryptoUtils;
 import org.apache.hadoop.mapreduce.task.reduce.MergeManagerImpl;
+import org.apache.hadoop.mapreduce.task.reduce.MergeManagerImpl.CompressAwarePath;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.util.Progress;
 import org.apache.hadoop.util.Progressable;
 import org.junit.After;
@@ -63,40 +71,48 @@ import org.junit.Test;
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
 
+import com.google.common.collect.Lists;
+
 public class TestMerger {
 
   private Configuration conf;
   private JobConf jobConf;
   private FileSystem fs;
-  
+
   @Before
   public void setup() throws IOException {
     conf = new Configuration();
     jobConf = new JobConf();
     fs = FileSystem.getLocal(conf);
   }
-  
-  @After
-  public void cleanup() throws IOException {    
-    fs.delete(new Path(jobConf.getLocalDirs()[0]), true);
+
+
+  @Test
+  public void testEncryptedMerger() throws Throwable {
+    jobConf.setBoolean(MRJobConfig.MR_ENCRYPTED_INTERMEDIATE_DATA, true);
+    conf.setBoolean(MRJobConfig.MR_ENCRYPTED_INTERMEDIATE_DATA, true);
+    Credentials credentials = UserGroupInformation.getCurrentUser().getCredentials();
+    TokenCache.setShuffleSecretKey(new byte[16], credentials);
+    UserGroupInformation.getCurrentUser().addCredentials(credentials);
+    testInMemoryAndOnDiskMerger();
   }
-  
+
   @Test
-  public void testInMemoryMerger() throws Throwable {
+  public void testInMemoryAndOnDiskMerger() throws Throwable {
     JobID jobId = new JobID("a", 0);
-    TaskAttemptID reduceId = new TaskAttemptID(
+    TaskAttemptID reduceId1 = new TaskAttemptID(
         new TaskID(jobId, TaskType.REDUCE, 0), 0);
     TaskAttemptID mapId1 = new TaskAttemptID(
         new TaskID(jobId, TaskType.MAP, 1), 0);
     TaskAttemptID mapId2 = new TaskAttemptID(
         new TaskID(jobId, TaskType.MAP, 2), 0);
-    
+
     LocalDirAllocator lda = new LocalDirAllocator(MRConfig.LOCAL_DIR);
-    
+
     MergeManagerImpl<Text, Text> mergeManager = new MergeManagerImpl<Text, Text>(
-        reduceId, jobConf, fs, lda, Reporter.NULL, null, null, null, null, null,
+        reduceId1, jobConf, fs, lda, Reporter.NULL, null, null, null, null, null,
         null, null, new Progress(), new MROutputFiles());
-    
+
     // write map outputs
     Map<String, String> map1 = new TreeMap<String, String>();
     map1.put("apple", "disgusting");
@@ -113,32 +129,88 @@ public class TestMerger {
         mapOutputBytes1.length);
     System.arraycopy(mapOutputBytes2, 0, mapOutput2.getMemory(), 0,
         mapOutputBytes2.length);
-    
+
     // create merger and run merge
     MergeThread<InMemoryMapOutput<Text, Text>, Text, Text> inMemoryMerger =
         mergeManager.createInMemoryMerger();
-    List<InMemoryMapOutput<Text, Text>> mapOutputs =
+    List<InMemoryMapOutput<Text, Text>> mapOutputs1 =
         new ArrayList<InMemoryMapOutput<Text, Text>>();
-    mapOutputs.add(mapOutput1);
-    mapOutputs.add(mapOutput2);
-    
-    inMemoryMerger.merge(mapOutputs);
-    
+    mapOutputs1.add(mapOutput1);
+    mapOutputs1.add(mapOutput2);
+
+    inMemoryMerger.merge(mapOutputs1);
+
     Assert.assertEquals(1, mergeManager.onDiskMapOutputs.size());
-    Path outPath = mergeManager.onDiskMapOutputs.iterator().next();
-    
+
+    TaskAttemptID reduceId2 = new TaskAttemptID(
+        new TaskID(jobId, TaskType.REDUCE, 3), 0);
+    TaskAttemptID mapId3 = new TaskAttemptID(
+        new TaskID(jobId, TaskType.MAP, 4), 0);
+    TaskAttemptID mapId4 = new TaskAttemptID(
+        new TaskID(jobId, TaskType.MAP, 5), 0);
+    // write map outputs
+    Map<String, String> map3 = new TreeMap<String, String>();
+    map3.put("apple", "awesome");
+    map3.put("carrot", "amazing");
+    Map<String, String> map4 = new TreeMap<String, String>();
+    map4.put("banana", "bla");
+    byte[] mapOutputBytes3 = writeMapOutput(conf, map3);
+    byte[] mapOutputBytes4 = writeMapOutput(conf, map4);
+    InMemoryMapOutput<Text, Text> mapOutput3 = new InMemoryMapOutput<Text, Text>(
+        conf, mapId3, mergeManager, mapOutputBytes3.length, null, true);
+    InMemoryMapOutput<Text, Text> mapOutput4 = new InMemoryMapOutput<Text, Text>(
+        conf, mapId4, mergeManager, mapOutputBytes4.length, null, true);
+    System.arraycopy(mapOutputBytes3, 0, mapOutput3.getMemory(), 0,
+        mapOutputBytes3.length);
+    System.arraycopy(mapOutputBytes4, 0, mapOutput4.getMemory(), 0,
+        mapOutputBytes4.length);
+
+//    // create merger and run merge
+    MergeThread<InMemoryMapOutput<Text, Text>, Text, Text> inMemoryMerger2 =
+        mergeManager.createInMemoryMerger();
+    List<InMemoryMapOutput<Text, Text>> mapOutputs2 =
+        new ArrayList<InMemoryMapOutput<Text, Text>>();
+    mapOutputs2.add(mapOutput3);
+    mapOutputs2.add(mapOutput4);
+
+    inMemoryMerger2.merge(mapOutputs2);
+
+    Assert.assertEquals(2, mergeManager.onDiskMapOutputs.size());
+
+    List<CompressAwarePath> paths = new ArrayList<CompressAwarePath>();
+    Iterator<CompressAwarePath> iterator = mergeManager.onDiskMapOutputs.iterator();
     List<String> keys = new ArrayList<String>();
     List<String> values = new ArrayList<String>();
-    readOnDiskMapOutput(conf, fs, outPath, keys, values);
-    Assert.assertEquals(keys, Arrays.asList("apple", "banana", "carrot"));
-    Assert.assertEquals(values, Arrays.asList("disgusting", "pretty good", "delicious"));
+    while (iterator.hasNext()) {
+      CompressAwarePath next = iterator.next();
+      readOnDiskMapOutput(conf, fs, next, keys, values);
+      paths.add(next);
+    }
+    Assert.assertEquals(keys, Arrays.asList("apple", "banana", "carrot", "apple", "banana", "carrot"));
+    Assert.assertEquals(values, Arrays.asList("awesome", "bla", "amazing", "disgusting", "pretty good", "delicious"));
+    mergeManager.close();
+
+    mergeManager = new MergeManagerImpl<Text, Text>(
+        reduceId2, jobConf, fs, lda, Reporter.NULL, null, null, null, null, null,
+        null, null, new Progress(), new MROutputFiles());
+
+    MergeThread<CompressAwarePath,Text,Text> onDiskMerger = mergeManager.createOnDiskMerger();
+    onDiskMerger.merge(paths);
+
+    Assert.assertEquals(1, mergeManager.onDiskMapOutputs.size());
+
+    keys = new ArrayList<String>();
+    values = new ArrayList<String>();
+    readOnDiskMapOutput(conf, fs, mergeManager.onDiskMapOutputs.iterator().next(), keys, values);
+    Assert.assertEquals(keys, Arrays.asList("apple", "apple", "banana", "banana", "carrot", "carrot"));
+    Assert.assertEquals(values, Arrays.asList("awesome", "disgusting", "pretty good", "bla", "amazing", "delicious"));
 
     mergeManager.close();
     Assert.assertEquals(0, mergeManager.inMemoryMapOutputs.size());
     Assert.assertEquals(0, mergeManager.inMemoryMergedMapOutputs.size());
     Assert.assertEquals(0, mergeManager.onDiskMapOutputs.size());
   }
-  
+
   private byte[] writeMapOutput(Configuration conf, Map<String, String> keysToValues)
       throws IOException {
     ByteArrayOutputStream baos = new ByteArrayOutputStream();
@@ -152,11 +224,13 @@ public class TestMerger {
     writer.close();
     return baos.toByteArray();
   }
-  
+
   private void readOnDiskMapOutput(Configuration conf, FileSystem fs, Path path,
       List<String> keys, List<String> values) throws IOException {
-    IFile.Reader<Text, Text> reader = new IFile.Reader<Text, Text>(conf, fs,
-        path, null, null);
+    FSDataInputStream in = CryptoUtils.wrapIfNecessary(conf, fs.open(path));
+
+    IFile.Reader<Text, Text> reader = new IFile.Reader<Text, Text>(conf, in,
+        fs.getFileStatus(path).getLen(), null, null);
     DataInputBuffer keyBuff = new DataInputBuffer();
     DataInputBuffer valueBuff = new DataInputBuffer();
     Text key = new Text();
@@ -169,17 +243,17 @@ public class TestMerger {
       values.add(value.toString());
     }
   }
-  
+
   @Test
   public void testCompressed() throws IOException {
     testMergeShouldReturnProperProgress(getCompressedSegments());
-  }
-  
+}
+
   @Test
   public void testUncompressed() throws IOException {
     testMergeShouldReturnProperProgress(getUncompressedSegments());
   }
-  
+
   @SuppressWarnings( { "deprecation", "unchecked" })
   public void testMergeShouldReturnProperProgress(
       List<Segment<Text, Text>> segments) throws IOException {
@@ -212,7 +286,7 @@ public class TestMerger {
     }
     return segments;
   }
-  
+
   private List<Segment<Text, Text>> getCompressedSegments() throws IOException {
     List<Segment<Text, Text>> segments = new ArrayList<Segment<Text, Text>>();
     for (int i = 1; i < 1; i++) {
@@ -220,7 +294,7 @@ public class TestMerger {
     }
     return segments;
   }
-  
+
   private Segment<Text, Text> getUncompressedSegment(int i) throws IOException {
     return new Segment<Text, Text>(getReader(i), false);
   }
@@ -258,7 +332,7 @@ public class TestMerger {
       }
     };
   }
-  
+
   private Answer<?> getValueAnswer(final String segmentName) {
     return new Answer<Void>() {
       int i = 0;

Modified: hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestIFile.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestIFile.java?rev=1619608&r1=1619607&r2=1619608&view=diff
==============================================================================
--- hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestIFile.java (original)
+++ hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestIFile.java Thu Aug 21 21:55:57 2014
@@ -18,6 +18,8 @@
 package org.apache.hadoop.mapred;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.LocalFileSystem;
 import org.apache.hadoop.fs.Path;
@@ -42,7 +44,7 @@ public class TestIFile {
     DefaultCodec codec = new GzipCodec();
     codec.setConf(conf);
     IFile.Writer<Text, Text> writer =
-      new IFile.Writer<Text, Text>(conf, rfs, path, Text.class, Text.class,
+      new IFile.Writer<Text, Text>(conf, rfs.create(path), Text.class, Text.class,
                                    codec, null);
     writer.close();
   }
@@ -56,12 +58,15 @@ public class TestIFile {
     Path path = new Path(new Path("build/test.ifile"), "data");
     DefaultCodec codec = new GzipCodec();
     codec.setConf(conf);
+    FSDataOutputStream out = rfs.create(path);
     IFile.Writer<Text, Text> writer =
-        new IFile.Writer<Text, Text>(conf, rfs, path, Text.class, Text.class,
+        new IFile.Writer<Text, Text>(conf, out, Text.class, Text.class,
                                      codec, null);
     writer.close();
+    FSDataInputStream in = rfs.open(path);
     IFile.Reader<Text, Text> reader =
-      new IFile.Reader<Text, Text>(conf, rfs, path, codec, null);
+      new IFile.Reader<Text, Text>(conf, in, rfs.getFileStatus(path).getLen(),
+          codec, null);
     reader.close();
     
     // test check sum 

Modified: hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestReduceTask.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestReduceTask.java?rev=1619608&r1=1619607&r2=1619608&view=diff
==============================================================================
--- hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestReduceTask.java (original)
+++ hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestReduceTask.java Thu Aug 21 21:55:57 2014
@@ -80,7 +80,7 @@ public class TestReduceTask extends Test
     FileSystem rfs = ((LocalFileSystem)localFs).getRaw();
     Path path = new Path(tmpDir, "data.in");
     IFile.Writer<Text, Text> writer = 
-      new IFile.Writer<Text, Text>(conf, rfs, path, Text.class, Text.class,
+      new IFile.Writer<Text, Text>(conf, rfs.create(path), Text.class, Text.class,
                                    codec, null);
     for(Pair p: vals) {
       writer.append(new Text(p.key), new Text(p.value));

Modified: hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/pipes/TestPipeApplication.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/pipes/TestPipeApplication.java?rev=1619608&r1=1619607&r2=1619608&view=diff
==============================================================================
--- hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/pipes/TestPipeApplication.java (original)
+++ hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/pipes/TestPipeApplication.java Thu Aug 21 21:55:57 2014
@@ -95,9 +95,9 @@ public class TestPipeApplication {
               new Counters.Counter(), new Progress());
       FileSystem fs = new RawLocalFileSystem();
       fs.setConf(conf);
-      Writer<IntWritable, Text> wr = new Writer<IntWritable, Text>(conf, fs,
-              new Path(workSpace + File.separator + "outfile"), IntWritable.class,
-              Text.class, null, null);
+      Writer<IntWritable, Text> wr = new Writer<IntWritable, Text>(conf, fs.create(
+              new Path(workSpace + File.separator + "outfile")), IntWritable.class,
+              Text.class, null, null, true);
       output.setWriter(wr);
       // stub for client
       File fCommand = getFileCommand("org.apache.hadoop.mapred.pipes.PipeApplicationRunnableStub");
@@ -177,9 +177,9 @@ public class TestPipeApplication {
               new Progress());
       FileSystem fs = new RawLocalFileSystem();
       fs.setConf(conf);
-      Writer<IntWritable, Text> wr = new Writer<IntWritable, Text>(conf, fs,
-              new Path(workSpace.getAbsolutePath() + File.separator + "outfile"),
-              IntWritable.class, Text.class, null, null);
+      Writer<IntWritable, Text> wr = new Writer<IntWritable, Text>(conf, fs.create(
+              new Path(workSpace.getAbsolutePath() + File.separator + "outfile")),
+              IntWritable.class, Text.class, null, null, true);
       output.setWriter(wr);
       conf.set(Submitter.PRESERVE_COMMANDFILE, "true");
 

Modified: hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestMRJobs.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestMRJobs.java?rev=1619608&r1=1619607&r2=1619608&view=diff
==============================================================================
--- hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestMRJobs.java (original)
+++ hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestMRJobs.java Thu Aug 21 21:55:57 2014
@@ -84,13 +84,13 @@ import org.apache.hadoop.mapreduce.v2.ap
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.security.token.TokenIdentifier;
+import org.apache.hadoop.util.ApplicationClassLoader;
 import org.apache.hadoop.util.JarFinder;
 import org.apache.hadoop.util.Shell;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
-import org.apache.hadoop.yarn.util.ApplicationClassLoader;
 import org.apache.hadoop.yarn.util.ConverterUtils;
 import org.apache.log4j.Level;
 import org.junit.AfterClass;
@@ -242,8 +242,7 @@ public class TestMRJobs {
       // to test AM loading user classes such as output format class, we want
       // to blacklist them from the system classes (they need to be prepended
       // as the first match wins)
-      String systemClasses =
-          sleepConf.get(MRJobConfig.MAPREDUCE_JOB_CLASSLOADER_SYSTEM_CLASSES);
+      String systemClasses = ApplicationClassLoader.DEFAULT_SYSTEM_CLASSES;
       // exclude the custom classes from system classes
       systemClasses = "-" + CustomOutputFormat.class.getName() + ",-" +
           CustomSpeculator.class.getName() + "," +