You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by jb...@apache.org on 2021/03/16 19:18:11 UTC

[hadoop] branch branch-3.2 updated: MAPREDUCE-7322. revisiting TestMRIntermediateDataEncryption. Contributed by Ahmed Hussein.

This is an automated email from the ASF dual-hosted git repository.

jbrennan pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/hadoop.git


The following commit(s) were added to refs/heads/branch-3.2 by this push:
     new 38f86cc  MAPREDUCE-7322. revisiting TestMRIntermediateDataEncryption. Contributed by Ahmed Hussein.
38f86cc is described below

commit 38f86cc8c72201b081a083cd32bc3a322c6d9398
Author: Jim Brennan <jb...@apache.org>
AuthorDate: Tue Mar 16 19:17:18 2021 +0000

    MAPREDUCE-7322. revisiting TestMRIntermediateDataEncryption. Contributed by Ahmed Hussein.
---
 .../java/org/apache/hadoop/util/JarFinder.java     |   5 +-
 .../hadoop/mapreduce/v2/app/TestRecovery.java      |  23 +-
 .../java/org/apache/hadoop/mapred/BackupStore.java |   5 +-
 .../java/org/apache/hadoop/mapred/MapTask.java     |  34 +-
 .../main/java/org/apache/hadoop/mapred/Merger.java |   6 +-
 .../security/IntermediateEncryptedStream.java      |  89 +++++
 .../mapreduce/security/SpillCallBackInjector.java  |  86 +++++
 .../security/SpillCallBackPathsFinder.java         | 193 ++++++++++
 .../hadoop/mapreduce/security/package-info.java    |  25 ++
 .../hadoop/mapreduce/task/reduce/Fetcher.java      |   5 +-
 .../hadoop/mapreduce/task/reduce/LocalFetcher.java |   6 +-
 .../mapreduce/task/reduce/MergeManagerImpl.java    |  14 +-
 .../mapreduce/task/reduce/OnDiskMapOutput.java     |   5 +-
 .../hadoop/mapreduce/util/MRJobConfUtil.java       |  58 +++
 .../hadoop/mapreduce/task/reduce/TestMerger.java   |  74 ++--
 .../hadoop/mapred/TestLocalJobSubmission.java      | 112 +++---
 .../mapred/TestMRIntermediateDataEncryption.java   | 327 ----------------
 .../hadoop/mapred/TestMROpportunisticMaps.java     |   5 +-
 .../java/org/apache/hadoop/mapred/TestMerge.java   |   6 +-
 .../apache/hadoop/mapreduce/RandomTextWriter.java  |  32 +-
 .../TestMRIntermediateDataEncryption.java          | 411 +++++++++++++++++++++
 21 files changed, 1078 insertions(+), 443 deletions(-)

diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/JarFinder.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/JarFinder.java
index d4b1b92..5a20913 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/JarFinder.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/JarFinder.java
@@ -165,10 +165,11 @@ public class JarFinder {
             if (!testDir.exists()) {
               testDir.mkdirs();
             }
-            File tempJar = File.createTempFile("hadoop-", "", testDir);
-            tempJar = new File(tempJar.getAbsolutePath() + ".jar");
+            File tempFile = File.createTempFile("hadoop-", "", testDir);
+            File tempJar = new File(tempFile.getAbsolutePath() + ".jar");
             createJar(baseDir, tempJar);
             tempJar.deleteOnExit();
+            tempFile.deleteOnExit();
             return tempJar.getAbsolutePath();
           }
         }
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRecovery.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRecovery.java
index 86f0d3c..7ed4996 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRecovery.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRecovery.java
@@ -39,6 +39,7 @@ import java.util.Map;
 
 import java.util.concurrent.TimeoutException;
 
+import org.apache.hadoop.mapreduce.util.MRJobConfUtil;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptFailEvent;
 import org.junit.Assert;
 
@@ -104,6 +105,8 @@ import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.test.GenericTestUtils;
 import org.apache.hadoop.yarn.util.Clock;
 import org.apache.hadoop.yarn.util.SystemClock;
+
+import org.junit.BeforeClass;
 import org.junit.Test;
 import org.mockito.ArgumentCaptor;
 import org.slf4j.Logger;
@@ -113,15 +116,24 @@ import org.slf4j.LoggerFactory;
 public class TestRecovery {
 
   private static final Logger LOG = LoggerFactory.getLogger(TestRecovery.class);
-  private static Path outputDir = new Path(new File("target", 
-      TestRecovery.class.getName()).getAbsolutePath() + 
-      Path.SEPARATOR + "out");
+
+  private static File testRootDir;
+  private static Path outputDir;
   private static String partFile = "part-r-00000";
   private Text key1 = new Text("key1");
   private Text key2 = new Text("key2");
   private Text val1 = new Text("val1");
   private Text val2 = new Text("val2");
 
+  @BeforeClass
+  public static void setupClass() throws Exception {
+    // setup the test root directory
+    testRootDir =
+        GenericTestUtils.setupTestRootDir(
+            TestRecovery.class);
+    outputDir = new Path(testRootDir.getAbsolutePath(), "out");
+  }
+
   /**
    * AM with 2 maps and 1 reduce. For 1st map, one attempt fails, one attempt
    * completely disappears because of failed launch, one attempt gets killed and
@@ -599,14 +611,13 @@ public class TestRecovery {
     MRApp app = new MRAppWithHistory(1, 1, false, this.getClass().getName(),
         true, ++runCount) {
     };
-    Configuration conf = new Configuration();
+    Configuration conf =
+        MRJobConfUtil.initEncryptedIntermediateConfigsForTesting(null);
     conf.setBoolean(MRJobConfig.MR_AM_JOB_RECOVERY_ENABLE, true);
     conf.setBoolean("mapred.mapper.new-api", true);
     conf.setBoolean("mapred.reducer.new-api", true);
     conf.setBoolean(MRJobConfig.JOB_UBERTASK_ENABLE, false);
     conf.set(FileOutputFormat.OUTDIR, outputDir.toString());
-    conf.setBoolean(MRJobConfig.MR_ENCRYPTED_INTERMEDIATE_DATA, true);
-
     // run the MR job at the first attempt
     Job jobAttempt1 = app.submit(conf);
     app.waitForState(jobAttempt1, JobState.RUNNING);
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/BackupStore.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/BackupStore.java
index 94ad9e0..5bd2688 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/BackupStore.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/BackupStore.java
@@ -42,7 +42,8 @@ import org.apache.hadoop.mapred.Merger.Segment;
 import org.apache.hadoop.mapreduce.MRConfig;
 import org.apache.hadoop.mapreduce.MRJobConfig;
 import org.apache.hadoop.mapreduce.TaskAttemptID;
-import org.apache.hadoop.mapreduce.CryptoUtils;
+import org.apache.hadoop.mapreduce.security.IntermediateEncryptedStream;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -576,7 +577,7 @@ public class BackupStore<K,V> {
       file = lDirAlloc.getLocalPathForWrite(tmp.toUri().getPath(), 
           -1, conf);
       FSDataOutputStream out = fs.create(file);
-      out = CryptoUtils.wrapIfNecessary(conf, out);
+      out = IntermediateEncryptedStream.wrapIfNecessary(conf, out, tmp);
       return new Writer<K, V>(conf, out, null, null, null, null, true);
     }
   }
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/MapTask.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/MapTask.java
index 17461b1..fa4396d 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/MapTask.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/MapTask.java
@@ -63,6 +63,7 @@ import org.apache.hadoop.mapreduce.TaskType;
 import org.apache.hadoop.mapreduce.lib.input.FileInputFormatCounter;
 import org.apache.hadoop.mapreduce.lib.map.WrappedMapper;
 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormatCounter;
+import org.apache.hadoop.mapreduce.security.IntermediateEncryptedStream;
 import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitIndex;
 import org.apache.hadoop.mapreduce.task.MapContextImpl;
 import org.apache.hadoop.mapreduce.CryptoUtils;
@@ -1630,7 +1631,9 @@ public class MapTask extends Task {
           IFile.Writer<K, V> writer = null;
           try {
             long segmentStart = out.getPos();
-            partitionOut = CryptoUtils.wrapIfNecessary(job, out, false);
+            partitionOut =
+                IntermediateEncryptedStream.wrapIfNecessary(job, out, false,
+                    filename);
             writer = new Writer<K, V>(job, partitionOut, keyClass, valClass, codec,
                                       spilledRecordsCounter);
             if (combinerRunner == null) {
@@ -1687,6 +1690,7 @@ public class MapTask extends Task {
           Path indexFilename =
               mapOutputFile.getSpillIndexFileForWrite(numSpills, partitions
                   * MAP_OUTPUT_INDEX_RECORD_LENGTH);
+          IntermediateEncryptedStream.addSpillIndexFile(indexFilename, job);
           spillRec.writeToFile(indexFilename, job);
         } else {
           indexCacheList.add(spillRec);
@@ -1727,7 +1731,9 @@ public class MapTask extends Task {
           try {
             long segmentStart = out.getPos();
             // Create a new codec, don't care!
-            partitionOut = CryptoUtils.wrapIfNecessary(job, out, false);
+            partitionOut =
+                IntermediateEncryptedStream.wrapIfNecessary(job, out, false,
+                    filename);
             writer = new IFile.Writer<K,V>(job, partitionOut, keyClass, valClass, codec,
                                             spilledRecordsCounter);
 
@@ -1761,6 +1767,7 @@ public class MapTask extends Task {
           Path indexFilename =
               mapOutputFile.getSpillIndexFileForWrite(numSpills, partitions
                   * MAP_OUTPUT_INDEX_RECORD_LENGTH);
+          IntermediateEncryptedStream.addSpillIndexFile(indexFilename, job);
           spillRec.writeToFile(indexFilename, job);
         } else {
           indexCacheList.add(spillRec);
@@ -1854,15 +1861,19 @@ public class MapTask extends Task {
         finalOutFileSize += rfs.getFileStatus(filename[i]).getLen();
       }
       if (numSpills == 1) { //the spill is the final output
+        Path indexFileOutput =
+            mapOutputFile.getOutputIndexFileForWriteInVolume(filename[0]);
         sameVolRename(filename[0],
             mapOutputFile.getOutputFileForWriteInVolume(filename[0]));
         if (indexCacheList.size() == 0) {
-          sameVolRename(mapOutputFile.getSpillIndexFile(0),
-            mapOutputFile.getOutputIndexFileForWriteInVolume(filename[0]));
+          Path indexFilePath = mapOutputFile.getSpillIndexFile(0);
+          IntermediateEncryptedStream.validateSpillIndexFile(
+              indexFilePath, job);
+          sameVolRename(indexFilePath, indexFileOutput);
         } else {
-          indexCacheList.get(0).writeToFile(
-            mapOutputFile.getOutputIndexFileForWriteInVolume(filename[0]), job);
+          indexCacheList.get(0).writeToFile(indexFileOutput, job);
         }
+        IntermediateEncryptedStream.addSpillIndexFile(indexFileOutput, job);
         sortPhase.complete();
         return;
       }
@@ -1870,6 +1881,7 @@ public class MapTask extends Task {
       // read in paged indices
       for (int i = indexCacheList.size(); i < numSpills; ++i) {
         Path indexFileName = mapOutputFile.getSpillIndexFile(i);
+        IntermediateEncryptedStream.validateSpillIndexFile(indexFileName, job);
         indexCacheList.add(new SpillRecord(indexFileName, job));
       }
 
@@ -1881,7 +1893,7 @@ public class MapTask extends Task {
           mapOutputFile.getOutputFileForWrite(finalOutFileSize);
       Path finalIndexFile =
           mapOutputFile.getOutputIndexFileForWrite(finalIndexFileSize);
-
+      IntermediateEncryptedStream.addSpillIndexFile(finalIndexFile, job);
       //The output stream for the final single output file
       FSDataOutputStream finalOut = rfs.create(finalOutputFile, true, 4096);
       FSDataOutputStream finalPartitionOut = null;
@@ -1893,8 +1905,9 @@ public class MapTask extends Task {
         try {
           for (int i = 0; i < partitions; i++) {
             long segmentStart = finalOut.getPos();
-            finalPartitionOut = CryptoUtils.wrapIfNecessary(job, finalOut,
-                false);
+            finalPartitionOut =
+                IntermediateEncryptedStream.wrapIfNecessary(job, finalOut,
+                    false, finalOutputFile);
             Writer<K, V> writer =
               new Writer<K, V>(job, finalPartitionOut, keyClass, valClass, codec, null);
             writer.close();
@@ -1957,7 +1970,8 @@ public class MapTask extends Task {
 
           //write merged output to disk
           long segmentStart = finalOut.getPos();
-          finalPartitionOut = CryptoUtils.wrapIfNecessary(job, finalOut, false);
+          finalPartitionOut = IntermediateEncryptedStream.wrapIfNecessary(job,
+              finalOut, false, finalOutputFile);
           Writer<K, V> writer =
               new Writer<K, V>(job, finalPartitionOut, keyClass, valClass, codec,
                                spilledRecordsCounter);
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/Merger.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/Merger.java
index 16f8837..d783752 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/Merger.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/Merger.java
@@ -40,6 +40,7 @@ import org.apache.hadoop.mapred.IFile.Writer;
 import org.apache.hadoop.mapreduce.MRConfig;
 import org.apache.hadoop.mapreduce.TaskType;
 import org.apache.hadoop.mapreduce.CryptoUtils;
+import org.apache.hadoop.mapreduce.security.IntermediateEncryptedStream;
 import org.apache.hadoop.util.PriorityQueue;
 import org.apache.hadoop.util.Progress;
 import org.apache.hadoop.util.Progressable;
@@ -302,7 +303,7 @@ public class Merger {
         FSDataInputStream in = fs.open(file);
 
         in.seek(segmentOffset);
-        in = CryptoUtils.wrapIfNecessary(conf, in);
+        in = IntermediateEncryptedStream.wrapIfNecessary(conf, in, file);
         reader = new Reader<K, V>(conf, in,
             segmentLength - CryptoUtils.cryptoPadding(conf),
             codec, readsCounter);
@@ -730,7 +731,8 @@ public class Merger {
                                               approxOutputSize, conf);
 
           FSDataOutputStream out = fs.create(outputFile);
-          out = CryptoUtils.wrapIfNecessary(conf, out);
+          out = IntermediateEncryptedStream.wrapIfNecessary(conf, out,
+              outputFile);
           Writer<K, V> writer = new Writer<K, V>(conf, out, keyClass, valueClass,
               codec, writesCounter, true);
           writeFile(this, writer, reporter, conf);
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/security/IntermediateEncryptedStream.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/security/IntermediateEncryptedStream.java
new file mode 100644
index 0000000..eb14a20
--- /dev/null
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/security/IntermediateEncryptedStream.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapreduce.security;
+
+import java.io.IOException;
+import java.io.InputStream;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.CryptoUtils;
+
+/**
+ * Used to wrap helpers while spilling intermediate files.
+ * Setting the {@link SpillCallBackInjector} helps in:
+ *   1- Adding callbacks to capture the path of the spilled files.
+ *   2- Verifying the encryption when intermediate encryption is enabled.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public final class IntermediateEncryptedStream {
+
+  private static SpillCallBackInjector prevSpillCBInjector = null;
+
+  public static FSDataOutputStream wrapIfNecessary(Configuration conf,
+      FSDataOutputStream out, Path outPath) throws IOException {
+    SpillCallBackInjector.get().writeSpillFileCB(outPath, out, conf);
+    return CryptoUtils.wrapIfNecessary(conf, out, true);
+  }
+
+  public static FSDataOutputStream wrapIfNecessary(Configuration conf,
+      FSDataOutputStream out, boolean closeOutputStream,
+      Path outPath) throws IOException {
+    SpillCallBackInjector.get().writeSpillFileCB(outPath, out, conf);
+    return CryptoUtils.wrapIfNecessary(conf, out, closeOutputStream);
+  }
+
+  public static FSDataInputStream wrapIfNecessary(Configuration conf,
+      FSDataInputStream in, Path inputPath) throws IOException {
+    SpillCallBackInjector.get().getSpillFileCB(inputPath, in, conf);
+    return CryptoUtils.wrapIfNecessary(conf, in);
+  }
+
+  public static InputStream wrapIfNecessary(Configuration conf,
+      InputStream in, long length, Path inputPath) throws IOException {
+    SpillCallBackInjector.get().getSpillFileCB(inputPath, in, conf);
+    return CryptoUtils.wrapIfNecessary(conf, in, length);
+  }
+
+  public static void addSpillIndexFile(Path indexFilename, Configuration conf) {
+    SpillCallBackInjector.get().addSpillIndexFileCB(indexFilename, conf);
+  }
+
+  public static void validateSpillIndexFile(Path indexFilename,
+      Configuration conf) {
+    SpillCallBackInjector.get().validateSpillIndexFileCB(indexFilename, conf);
+  }
+
+  public static SpillCallBackInjector resetSpillCBInjector() {
+    return setSpillCBInjector(prevSpillCBInjector);
+  }
+
+  public synchronized static SpillCallBackInjector setSpillCBInjector(
+      SpillCallBackInjector spillInjector) {
+    prevSpillCBInjector =
+        SpillCallBackInjector.getAndSet(spillInjector);
+    return spillInjector;
+  }
+
+  private IntermediateEncryptedStream() {}
+}
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/security/SpillCallBackInjector.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/security/SpillCallBackInjector.java
new file mode 100644
index 0000000..4677fe4
--- /dev/null
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/security/SpillCallBackInjector.java
@@ -0,0 +1,86 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapreduce.security;
+
+import java.io.IOException;
+import java.io.InputStream;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.Path;
+import com.google.common.annotations.VisibleForTesting;
+
+/**
+ * Used for injecting callbacks while spilling files.
+ * Calls into this are a no-op in production code.
+ */
+@VisibleForTesting
+@InterfaceAudience.Private
+public class SpillCallBackInjector {
+  private static SpillCallBackInjector instance = new SpillCallBackInjector();
+  public static SpillCallBackInjector get() {
+    return instance;
+  }
+  /**
+   * Sets the global SpillCallBackInjector to the new value, returning the old
+   * value.
+   *
+   * @param spillInjector the new implementation for the spill injector.
+   * @return the previous implementation.
+   */
+  public static SpillCallBackInjector getAndSet(
+      SpillCallBackInjector spillInjector) {
+    SpillCallBackInjector prev = instance;
+    instance = spillInjector;
+    return prev;
+  }
+
+  public void writeSpillIndexFileCB(Path path) {
+    // do nothing
+  }
+
+  public void writeSpillFileCB(Path path, FSDataOutputStream out,
+      Configuration conf) {
+    // do nothing
+  }
+
+  public void getSpillFileCB(Path path, InputStream is, Configuration conf) {
+    // do nothing
+  }
+
+  public String getSpilledFileReport() {
+    return null;
+  }
+
+  public void handleErrorInSpillFill(Path path, Exception e) {
+    // do nothing
+  }
+
+  public void corruptSpilledFile(Path fileName) throws IOException {
+    // do nothing
+  }
+
+  public void addSpillIndexFileCB(Path path, Configuration conf) {
+    // do nothing
+  }
+
+  public void validateSpillIndexFileCB(Path path, Configuration conf) {
+    // do nothing
+  }
+}
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/security/SpillCallBackPathsFinder.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/security/SpillCallBackPathsFinder.java
new file mode 100644
index 0000000..7be99e5
--- /dev/null
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/security/SpillCallBackPathsFinder.java
@@ -0,0 +1,193 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapreduce.security;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Collections;
+import java.util.LinkedHashSet;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.crypto.CryptoStreamUtils;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.CryptoUtils;
+
+/**
+ * An implementation class that keeps track of the spilled files.
+ */
+public class SpillCallBackPathsFinder extends SpillCallBackInjector {
+  private static final Logger LOG =
+      LoggerFactory.getLogger(SpillCallBackPathsFinder.class);
+  /**
+   * Encrypted spilled files.
+   */
+  private final Map<Path, Set<Long>> encryptedSpillFiles =
+      Collections.synchronizedMap(new ConcurrentHashMap<>());
+  /**
+   * Non-Encrypted spilled files.
+   */
+  private final Map<Path, Set<Long>> spillFiles =
+      Collections.synchronizedMap(new ConcurrentHashMap<>());
+  /**
+   * Invalid position access.
+   */
+  private final Map<Path, Set<Long>> invalidAccessMap =
+      Collections.synchronizedMap(new ConcurrentHashMap<>());
+  /**
+   * Index spill files.
+   */
+  private final Set<Path> indexSpillFiles = ConcurrentHashMap.newKeySet();
+  /**
+   * Paths that were not found in the maps.
+   */
+  private final Set<Path> negativeCache = ConcurrentHashMap.newKeySet();
+
+  protected Map<Path, Set<Long>> getFilesMap(Configuration config) {
+    if (CryptoUtils.isEncryptedSpillEnabled(config)) {
+      return encryptedSpillFiles;
+    }
+    return spillFiles;
+  }
+
+  @Override
+  public void writeSpillFileCB(Path path, FSDataOutputStream out,
+      Configuration conf) {
+    long outPos = out.getPos();
+    getFilesMap(conf)
+        .computeIfAbsent(path, p -> ConcurrentHashMap.newKeySet())
+        .add(outPos);
+    LOG.debug("writeSpillFileCB.. path:{}; pos:{}", path, outPos);
+  }
+
+  @Override
+  public void getSpillFileCB(Path path, InputStream is, Configuration conf) {
+    if (path == null) {
+      return;
+    }
+    Set<Long> pathEntries = getFilesMap(conf).get(path);
+    if (pathEntries != null) {
+      try {
+        long isPos = CryptoStreamUtils.getInputStreamOffset(is);
+        if (pathEntries.contains(isPos)) {
+          LOG.debug("getSpillFileCB... Path {}; Pos: {}", path, isPos);
+          return;
+        }
+        invalidAccessMap
+            .computeIfAbsent(path, p -> ConcurrentHashMap.newKeySet())
+            .add(isPos);
+        LOG.debug("getSpillFileCB... access incorrect position.. "
+            + "Path {}; Pos: {}", path, isPos);
+      } catch (IOException e) {
+        LOG.error("Could not get inputStream position.. Path {}", path, e);
+        // do nothing
+      }
+      return;
+    }
+    negativeCache.add(path);
+    LOG.warn("getSpillFileCB.. Could not find spilled file .. Path: {}", path);
+  }
+
+  @Override
+  public String getSpilledFileReport() {
+    StringBuilder strBuilder =
+        new StringBuilder("\n++++++++ Spill Report ++++++++")
+            .append(dumpMapEntries("Encrypted Spilled Files",
+                encryptedSpillFiles))
+            .append(dumpMapEntries("Non-Encrypted Spilled Files",
+                spillFiles))
+            .append(dumpMapEntries("Invalid Spill Access",
+                invalidAccessMap))
+            .append("\n ----- Spilled Index Files ----- ")
+            .append(indexSpillFiles.size());
+    for (Path p : indexSpillFiles) {
+      strBuilder.append("\n\t index-path: ").append(p.toString());
+    }
+    strBuilder.append("\n ----- Negative Cache files ----- ")
+        .append(negativeCache.size());
+    for (Path p : negativeCache) {
+      strBuilder.append("\n\t path: ").append(p.toString());
+    }
+    return strBuilder.toString();
+  }
+
+  @Override
+  public void addSpillIndexFileCB(Path path, Configuration conf) {
+    if (path == null) {
+      return;
+    }
+    indexSpillFiles.add(path);
+    LOG.debug("addSpillIndexFileCB... Path: {}", path);
+  }
+
+  @Override
+  public void validateSpillIndexFileCB(Path path, Configuration conf) {
+    if (path == null) {
+      return;
+    }
+    if (indexSpillFiles.contains(path)) {
+      LOG.debug("validateSpillIndexFileCB.. Path: {}", path);
+      return;
+    }
+    LOG.warn("validateSpillIndexFileCB.. could not retrieve indexFile.. "
+        + "Path: {}", path);
+    negativeCache.add(path);
+  }
+
+  public Set<Path> getEncryptedSpilledFiles() {
+    return Collections.unmodifiableSet(encryptedSpillFiles.keySet());
+  }
+
+  /**
+   * Gets the set of path:pos of the entries that were accessed incorrectly.
+   * @return a set of strings in the format of {@literal Path[Pos]}
+   */
+  public Set<String> getInvalidSpillEntries() {
+    Set<String> result = new LinkedHashSet<>();
+    for (Entry<Path, Set<Long>> spillMapEntry: invalidAccessMap.entrySet()) {
+      for (Long singleEntry : spillMapEntry.getValue()) {
+        result.add(String.format("%s[%d]",
+            spillMapEntry.getKey(), singleEntry));
+      }
+    }
+    return result;
+  }
+
+  private String dumpMapEntries(String label,
+      Map<Path, Set<Long>> entriesMap) {
+    StringBuilder strBuilder =
+        new StringBuilder(String.format("%n ----- %s ----- %d", label,
+            entriesMap.size()));
+    for (Entry<Path, Set<Long>> encryptedSpillEntry
+        : entriesMap.entrySet()) {
+      strBuilder.append(String.format("%n\t\tpath: %s",
+          encryptedSpillEntry.getKey()));
+      for (Long singlePos : encryptedSpillEntry.getValue()) {
+        strBuilder.append(String.format("%n\t\t\tentry: %d", singlePos));
+      }
+    }
+    return strBuilder.toString();
+  }
+}
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/security/package-info.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/security/package-info.java
new file mode 100644
index 0000000..451e6f6
--- /dev/null
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/security/package-info.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+/**
+ * Helper classes for the shuffle/spill encryption.
+ */
+package org.apache.hadoop.mapreduce.security;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java
index 5b316c1..48396e1 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java
@@ -42,6 +42,7 @@ import org.apache.hadoop.mapred.Reporter;
 import org.apache.hadoop.mapreduce.MRConfig;
 import org.apache.hadoop.mapreduce.MRJobConfig;
 import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hadoop.mapreduce.security.IntermediateEncryptedStream;
 import org.apache.hadoop.mapreduce.security.SecureShuffleUtils;
 import org.apache.hadoop.mapreduce.CryptoUtils;
 import org.apache.hadoop.security.ssl.SSLFactory;
@@ -512,7 +513,9 @@ class Fetcher<K,V> extends Thread {
       }
 
       InputStream is = input;
-      is = CryptoUtils.wrapIfNecessary(jobConf, is, compressedLength);
+      is =
+          IntermediateEncryptedStream.wrapIfNecessary(jobConf, is,
+              compressedLength, null);
       compressedLength -= CryptoUtils.cryptoPadding(jobConf);
       decompressedLength -= CryptoUtils.cryptoPadding(jobConf);
       
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/LocalFetcher.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/LocalFetcher.java
index 90160cf..3ae1e74 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/LocalFetcher.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/LocalFetcher.java
@@ -36,6 +36,8 @@ import org.apache.hadoop.mapred.Reporter;
 import org.apache.hadoop.mapred.SpillRecord;
 import org.apache.hadoop.mapreduce.TaskAttemptID;
 import org.apache.hadoop.mapreduce.CryptoUtils;
+import org.apache.hadoop.mapreduce.security.IntermediateEncryptedStream;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -151,7 +153,9 @@ class LocalFetcher<K,V> extends Fetcher<K, V> {
     FileSystem localFs = FileSystem.getLocal(job).getRaw();
     FSDataInputStream inStream = localFs.open(mapOutputFileName);
     try {
-      inStream = CryptoUtils.wrapIfNecessary(job, inStream);
+      inStream =
+          IntermediateEncryptedStream.wrapIfNecessary(job, inStream,
+              mapOutputFileName);
       inStream.seek(ir.startOffset + CryptoUtils.cryptoPadding(job));
       mapOutput.shuffle(LOCALHOST, inStream, compressedLength,
           decompressedLength, metrics, reporter);
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/MergeManagerImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/MergeManagerImpl.java
index 58c2686..0aab49a 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/MergeManagerImpl.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/MergeManagerImpl.java
@@ -53,7 +53,7 @@ import org.apache.hadoop.mapred.Task.CombineValuesIterator;
 import org.apache.hadoop.mapreduce.MRJobConfig;
 import org.apache.hadoop.mapreduce.TaskAttemptID;
 import org.apache.hadoop.mapreduce.TaskID;
-import org.apache.hadoop.mapreduce.CryptoUtils;
+import org.apache.hadoop.mapreduce.security.IntermediateEncryptedStream;
 import org.apache.hadoop.mapreduce.task.reduce.MapOutput.MapOutputComparator;
 import org.apache.hadoop.util.Progress;
 import org.apache.hadoop.util.ReflectionUtils;
@@ -468,7 +468,9 @@ public class MergeManagerImpl<K, V> implements MergeManager<K, V> {
                                            mergeOutputSize).suffix(
                                                Task.MERGED_OUTPUT_PREFIX);
 
-      FSDataOutputStream out = CryptoUtils.wrapIfNecessary(jobConf, rfs.create(outputPath));
+      FSDataOutputStream out =
+          IntermediateEncryptedStream.wrapIfNecessary(jobConf,
+              rfs.create(outputPath), outputPath);
       Writer<K, V> writer = new Writer<K, V>(jobConf, out,
           (Class<K>) jobConf.getMapOutputKeyClass(),
           (Class<V>) jobConf.getMapOutputValueClass(), codec, null, true);
@@ -552,7 +554,9 @@ public class MergeManagerImpl<K, V> implements MergeManager<K, V> {
         localDirAllocator.getLocalPathForWrite(inputs.get(0).toString(), 
             approxOutputSize, jobConf).suffix(Task.MERGED_OUTPUT_PREFIX);
 
-      FSDataOutputStream out = CryptoUtils.wrapIfNecessary(jobConf, rfs.create(outputPath));
+      FSDataOutputStream out =
+          IntermediateEncryptedStream.wrapIfNecessary(jobConf,
+              rfs.create(outputPath), outputPath);
       Writer<K, V> writer = new Writer<K, V>(jobConf, out,
           (Class<K>) jobConf.getMapOutputKeyClass(),
           (Class<V>) jobConf.getMapOutputValueClass(), codec, null, true);
@@ -735,7 +739,9 @@ public class MergeManagerImpl<K, V> implements MergeManager<K, V> {
             tmpDir, comparator, reporter, spilledRecordsCounter, null, 
             mergePhase);
 
-        FSDataOutputStream out = CryptoUtils.wrapIfNecessary(job, fs.create(outputPath));
+        FSDataOutputStream out =
+            IntermediateEncryptedStream.wrapIfNecessary(job,
+                fs.create(outputPath), outputPath);
         Writer<K, V> writer = new Writer<K, V>(job, out, keyClass, valueClass,
             codec, null, true);
         try {
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/OnDiskMapOutput.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/OnDiskMapOutput.java
index 98d7a57..d90ec37 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/OnDiskMapOutput.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/OnDiskMapOutput.java
@@ -37,7 +37,7 @@ import org.apache.hadoop.mapred.Reporter;
 import org.apache.hadoop.mapred.MapOutputFile;
 
 import org.apache.hadoop.mapreduce.TaskAttemptID;
-import org.apache.hadoop.mapreduce.CryptoUtils;
+import org.apache.hadoop.mapreduce.security.IntermediateEncryptedStream;
 import org.apache.hadoop.mapreduce.task.reduce.MergeManagerImpl.CompressAwarePath;
 
 import com.google.common.annotations.VisibleForTesting;
@@ -84,7 +84,8 @@ class OnDiskMapOutput<K, V> extends IFileWrappedMapOutput<K, V> {
     this.fs = fs;
     this.outputPath = outputPath;
     tmpOutputPath = getTempPath(outputPath, fetcher);
-    disk = CryptoUtils.wrapIfNecessary(conf, fs.create(tmpOutputPath));
+    disk = IntermediateEncryptedStream.wrapIfNecessary(conf,
+        fs.create(tmpOutputPath), tmpOutputPath);
   }
 
   @VisibleForTesting
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/util/MRJobConfUtil.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/util/MRJobConfUtil.java
index 4e4e78e..4319e17 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/util/MRJobConfUtil.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/util/MRJobConfUtil.java
@@ -17,14 +17,22 @@
  */
 package org.apache.hadoop.mapreduce.util;
 
+import java.io.File;
 import java.util.concurrent.TimeUnit;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapreduce.MRJobConfig;
 
 /**
  * A class that contains utility methods for MR Job configuration.
  */
 public final class MRJobConfUtil {
+  private static final Logger LOG =
+      LoggerFactory.getLogger(MRJobConfUtil.class);
   public static final String REDACTION_REPLACEMENT_VAL = "*********(redacted)";
 
   /**
@@ -130,4 +138,54 @@ public final class MRJobConfUtil {
   public static double convertTaskProgressToFactor(final float progress) {
     return Math.floor(progress * MRJobConfUtil.PROGRESS_MIN_DELTA_FACTOR);
   }
+
+  /**
+   * For unit tests, use urandom to prevent the YarnChild process from
+   * hanging on low entropy systems.
+   */
+  private static final String TEST_JVM_SECURITY_EGD_OPT =
+      "-Djava.security.egd=file:/dev/./urandom";
+
+  public static Configuration initEncryptedIntermediateConfigsForTesting(
+      Configuration conf) {
+    Configuration config =
+        (conf == null) ? new Configuration(): conf;
+    final String childJVMOpts =
+        TEST_JVM_SECURITY_EGD_OPT.concat(" ")
+            .concat(config.get("mapred.child.java.opts", " "));
+    // Use urandom in the AM and child JVMs; enable intermediate encryption.
+    config.set("yarn.app.mapreduce.am.admin-command-opts",
+        TEST_JVM_SECURITY_EGD_OPT);
+    config.set("mapred.child.java.opts", childJVMOpts);
+    config.setBoolean("mapreduce.job.encrypted-intermediate-data", true);
+    return config;
+  }
+
+  /**
+   * Set the MR staging, system, temp and local directories so that they all
+   * live under the "hadoop-dir" folder of the test root directory.
+   * @param conf configuration to update; a new one is created when null.
+   * @param testRootDir directory under which "hadoop-dir" is created.
+   * @return the updated configuration.
+   */
+  public static Configuration setLocalDirectoriesConfigForTesting(
+      Configuration conf, File testRootDir) {
+    Configuration config =
+        (conf == null) ? new Configuration(): conf;
+    final File hadoopLocalDir = new File(testRootDir, "hadoop-dir");
+    // Create it; mkdirs() is also false on failure, not only if it exists.
+    if (!hadoopLocalDir.getAbsoluteFile().mkdirs()) {
+      LOG.info("{} directory already exists", hadoopLocalDir.getPath());
+    }
+    Path mapredHadoopTempDir = new Path(hadoopLocalDir.getPath());
+    Path mapredSystemDir = new Path(mapredHadoopTempDir, "system");
+    Path stagingDir = new Path(mapredHadoopTempDir, "tmp/staging");
+    // Keep every MR working directory under the test directory.
+    config.set("mapreduce.jobtracker.staging.root.dir", stagingDir.toString());
+    config.set("mapreduce.jobtracker.system.dir", mapredSystemDir.toString());
+    config.set("mapreduce.cluster.temp.dir", mapredHadoopTempDir.toString());
+    config.set("mapreduce.cluster.local.dir",
+        new Path(mapredHadoopTempDir, "local").toString());
+    return config;
+  }
 }
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestMerger.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestMerger.java
index a6b1964..732e478 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestMerger.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestMerger.java
@@ -23,6 +23,7 @@ import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
 import java.io.ByteArrayOutputStream;
+import java.io.File;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -49,43 +50,62 @@ import org.apache.hadoop.mapred.Merger;
 import org.apache.hadoop.mapred.Merger.Segment;
 import org.apache.hadoop.mapred.RawKeyValueIterator;
 import org.apache.hadoop.mapred.Reporter;
-import org.apache.hadoop.mapreduce.CryptoUtils;
 import org.apache.hadoop.mapreduce.JobID;
 import org.apache.hadoop.mapreduce.MRConfig;
-import org.apache.hadoop.mapreduce.MRJobConfig;
 import org.apache.hadoop.mapreduce.TaskAttemptID;
 import org.apache.hadoop.mapreduce.TaskID;
 import org.apache.hadoop.mapreduce.TaskType;
+import org.apache.hadoop.mapreduce.security.IntermediateEncryptedStream;
 import org.apache.hadoop.mapreduce.security.TokenCache;
 import org.apache.hadoop.mapreduce.task.reduce.MergeManagerImpl.CompressAwarePath;
+import org.apache.hadoop.mapreduce.util.MRJobConfUtil;
 import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.test.GenericTestUtils;
 import org.apache.hadoop.util.Progress;
 import org.apache.hadoop.util.Progressable;
 import org.junit.Assert;
 import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Rule;
 import org.junit.Test;
+import org.junit.rules.TestName;
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
 
 public class TestMerger {
 
-  private Configuration conf;
+  private static File testRootDir;
+  @Rule
+  public TestName unitTestName = new TestName();
+  private File unitTestDir;
   private JobConf jobConf;
   private FileSystem fs;
 
+  @BeforeClass
+  public static void setupClass() throws Exception {
+    // setup the test root directory
+    testRootDir =
+        GenericTestUtils.setupTestRootDir(
+            TestMerger.class);
+  }
+
   @Before
   public void setup() throws IOException {
-    conf = new Configuration();
+    unitTestDir = new File(testRootDir, unitTestName.getMethodName());
+    unitTestDir.mkdirs();
     jobConf = new JobConf();
-    fs = FileSystem.getLocal(conf);
+    // Set the temp directories a subdir of the test directory.
+    MRJobConfUtil.setLocalDirectoriesConfigForTesting(jobConf, unitTestDir);
+    jobConf.set(MRConfig.FRAMEWORK_NAME, "local");
+    fs = FileSystem.getLocal(jobConf);
   }
 
 
   @Test
   public void testEncryptedMerger() throws Throwable {
-    jobConf.setBoolean(MRJobConfig.MR_ENCRYPTED_INTERMEDIATE_DATA, true);
-    conf.setBoolean(MRJobConfig.MR_ENCRYPTED_INTERMEDIATE_DATA, true);
+    // Enable intermediate encryption.
+    MRJobConfUtil.initEncryptedIntermediateConfigsForTesting(jobConf);
     Credentials credentials = UserGroupInformation.getCurrentUser().getCredentials();
     TokenCache.setEncryptedSpillKey(new byte[16], credentials);
     UserGroupInformation.getCurrentUser().addCredentials(credentials);
@@ -105,8 +125,8 @@ public class TestMerger {
     LocalDirAllocator lda = new LocalDirAllocator(MRConfig.LOCAL_DIR);
 
     MergeManagerImpl<Text, Text> mergeManager = new MergeManagerImpl<Text, Text>(
-        reduceId1, jobConf, fs, lda, Reporter.NULL, null, null, null, null, null,
-        null, null, new Progress(), new MROutputFiles());
+        reduceId1, jobConf, fs, lda, Reporter.NULL, null, null, null, null,
+        null, null, null, new Progress(), new MROutputFiles());
 
     // write map outputs
     Map<String, String> map1 = new TreeMap<String, String>();
@@ -114,12 +134,12 @@ public class TestMerger {
     map1.put("carrot", "delicious");
     Map<String, String> map2 = new TreeMap<String, String>();
     map1.put("banana", "pretty good");
-    byte[] mapOutputBytes1 = writeMapOutput(conf, map1);
-    byte[] mapOutputBytes2 = writeMapOutput(conf, map2);
+    byte[] mapOutputBytes1 = writeMapOutput(jobConf, map1);
+    byte[] mapOutputBytes2 = writeMapOutput(jobConf, map2);
     InMemoryMapOutput<Text, Text> mapOutput1 = new InMemoryMapOutput<Text, Text>(
-        conf, mapId1, mergeManager, mapOutputBytes1.length, null, true);
+        jobConf, mapId1, mergeManager, mapOutputBytes1.length, null, true);
     InMemoryMapOutput<Text, Text> mapOutput2 = new InMemoryMapOutput<Text, Text>(
-        conf, mapId2, mergeManager, mapOutputBytes2.length, null, true);
+        jobConf, mapId2, mergeManager, mapOutputBytes2.length, null, true);
     System.arraycopy(mapOutputBytes1, 0, mapOutput1.getMemory(), 0,
         mapOutputBytes1.length);
     System.arraycopy(mapOutputBytes2, 0, mapOutput2.getMemory(), 0,
@@ -149,12 +169,12 @@ public class TestMerger {
     map3.put("carrot", "amazing");
     Map<String, String> map4 = new TreeMap<String, String>();
     map4.put("banana", "bla");
-    byte[] mapOutputBytes3 = writeMapOutput(conf, map3);
-    byte[] mapOutputBytes4 = writeMapOutput(conf, map4);
+    byte[] mapOutputBytes3 = writeMapOutput(jobConf, map3);
+    byte[] mapOutputBytes4 = writeMapOutput(jobConf, map4);
     InMemoryMapOutput<Text, Text> mapOutput3 = new InMemoryMapOutput<Text, Text>(
-        conf, mapId3, mergeManager, mapOutputBytes3.length, null, true);
+        jobConf, mapId3, mergeManager, mapOutputBytes3.length, null, true);
     InMemoryMapOutput<Text, Text> mapOutput4 = new InMemoryMapOutput<Text, Text>(
-        conf, mapId4, mergeManager, mapOutputBytes4.length, null, true);
+        jobConf, mapId4, mergeManager, mapOutputBytes4.length, null, true);
     System.arraycopy(mapOutputBytes3, 0, mapOutput3.getMemory(), 0,
         mapOutputBytes3.length);
     System.arraycopy(mapOutputBytes4, 0, mapOutput4.getMemory(), 0,
@@ -173,12 +193,13 @@ public class TestMerger {
     Assert.assertEquals(2, mergeManager.onDiskMapOutputs.size());
 
     List<CompressAwarePath> paths = new ArrayList<CompressAwarePath>();
-    Iterator<CompressAwarePath> iterator = mergeManager.onDiskMapOutputs.iterator();
+    Iterator<CompressAwarePath> iterator =
+        mergeManager.onDiskMapOutputs.iterator();
     List<String> keys = new ArrayList<String>();
     List<String> values = new ArrayList<String>();
     while (iterator.hasNext()) {
       CompressAwarePath next = iterator.next();
-      readOnDiskMapOutput(conf, fs, next, keys, values);
+      readOnDiskMapOutput(jobConf, fs, next, keys, values);
       paths.add(next);
     }
     Assert.assertEquals(keys, Arrays.asList("apple", "banana", "carrot", "apple", "banana", "carrot"));
@@ -186,8 +207,8 @@ public class TestMerger {
     mergeManager.close();
 
     mergeManager = new MergeManagerImpl<Text, Text>(
-        reduceId2, jobConf, fs, lda, Reporter.NULL, null, null, null, null, null,
-        null, null, new Progress(), new MROutputFiles());
+        reduceId2, jobConf, fs, lda, Reporter.NULL, null, null, null, null,
+        null, null, null, new Progress(), new MROutputFiles());
 
     MergeThread<CompressAwarePath,Text,Text> onDiskMerger = mergeManager.createOnDiskMerger();
     onDiskMerger.merge(paths);
@@ -196,7 +217,8 @@ public class TestMerger {
 
     keys = new ArrayList<String>();
     values = new ArrayList<String>();
-    readOnDiskMapOutput(conf, fs, mergeManager.onDiskMapOutputs.iterator().next(), keys, values);
+    readOnDiskMapOutput(jobConf, fs,
+        mergeManager.onDiskMapOutputs.iterator().next(), keys, values);
     Assert.assertEquals(keys, Arrays.asList("apple", "apple", "banana", "banana", "carrot", "carrot"));
     Assert.assertEquals(values, Arrays.asList("awesome", "disgusting", "pretty good", "bla", "amazing", "delicious"));
 
@@ -222,7 +244,8 @@ public class TestMerger {
 
   private void readOnDiskMapOutput(Configuration conf, FileSystem fs, Path path,
       List<String> keys, List<String> values) throws IOException {
-    FSDataInputStream in = CryptoUtils.wrapIfNecessary(conf, fs.open(path));
+    FSDataInputStream in =
+        IntermediateEncryptedStream.wrapIfNecessary(conf, fs.open(path), path);
 
     IFile.Reader<Text, Text> reader = new IFile.Reader<Text, Text>(conf, in,
         fs.getFileStatus(path).getLen(), null, null);
@@ -252,14 +275,15 @@ public class TestMerger {
   @SuppressWarnings( { "unchecked" })
   public void testMergeShouldReturnProperProgress(
       List<Segment<Text, Text>> segments) throws IOException {
-    Path tmpDir = new Path("localpath");
+    Path tmpDir = new Path(jobConf.get("mapreduce.cluster.temp.dir"),
+        "localpath");
     Class<Text> keyClass = (Class<Text>) jobConf.getMapOutputKeyClass();
     Class<Text> valueClass = (Class<Text>) jobConf.getMapOutputValueClass();
     RawComparator<Text> comparator = jobConf.getOutputKeyComparator();
     Counter readsCounter = new Counter();
     Counter writesCounter = new Counter();
     Progress mergePhase = new Progress();
-    RawKeyValueIterator mergeQueue = Merger.merge(conf, fs, keyClass,
+    RawKeyValueIterator mergeQueue = Merger.merge(jobConf, fs, keyClass,
         valueClass, segments, 2, tmpDir, comparator, getReporter(),
         readsCounter, writesCounter, mergePhase);
     final float epsilon = 0.00001f;
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestLocalJobSubmission.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestLocalJobSubmission.java
index a3ea26e..c8b6c89 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestLocalJobSubmission.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestLocalJobSubmission.java
@@ -31,8 +31,20 @@ import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.MRConfig;
 import org.apache.hadoop.mapreduce.MRJobConfig;
 import org.apache.hadoop.mapreduce.SleepJob;
+import org.apache.hadoop.mapreduce.security.IntermediateEncryptedStream;
+import org.apache.hadoop.mapreduce.security.SpillCallBackPathsFinder;
+import org.apache.hadoop.mapreduce.util.MRJobConfUtil;
+import org.apache.hadoop.test.GenericTestUtils;
 import org.apache.hadoop.util.ToolRunner;
+
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Rule;
 import org.junit.Test;
+import org.junit.rules.TestName;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import static org.junit.Assert.*;
 
@@ -41,8 +53,39 @@ import static org.junit.Assert.*;
  * -jt local -libjars
  */
 public class TestLocalJobSubmission {
-  private static Path TEST_ROOT_DIR =
-      new Path(System.getProperty("test.build.data","/tmp"));
+  private static final Logger LOG =
+      LoggerFactory.getLogger(TestLocalJobSubmission.class);
+
+  private static File testRootDir;
+
+  @Rule
+  public TestName unitTestName = new TestName();
+  private File unitTestDir;
+  private Path jarPath;
+  private Configuration config;
+
+  @BeforeClass
+  public static void setupClass() throws Exception {
+    // setup the test root directory
+    testRootDir =
+        GenericTestUtils.setupTestRootDir(TestLocalJobSubmission.class);
+  }
+
+  @Before
+  public void setup() throws IOException {
+    unitTestDir = new File(testRootDir, unitTestName.getMethodName());
+    unitTestDir.mkdirs();
+    config = createConfig();
+    jarPath = makeJar(new Path(unitTestDir.getAbsolutePath(), "test.jar"));
+  }
+
+  private Configuration createConfig() {
+    // Set the temp directories a subdir of the test directory.
+    Configuration conf =
+        MRJobConfUtil.setLocalDirectoriesConfigForTesting(null, unitTestDir);
+    conf.set(MRConfig.FRAMEWORK_NAME, "local");
+    return conf;
+  }
 
   /**
    * Test the local job submission options of -jt local -libjars.
@@ -51,12 +94,9 @@ public class TestLocalJobSubmission {
    */
   @Test
   public void testLocalJobLibjarsOption() throws IOException {
-    Configuration conf = new Configuration();
-
-    testLocalJobLibjarsOption(conf);
-
-    conf.setBoolean(Job.USE_WILDCARD_FOR_LIBJARS, false);
-    testLocalJobLibjarsOption(conf);
+    testLocalJobLibjarsOption(config);
+    config.setBoolean(Job.USE_WILDCARD_FOR_LIBJARS, false);
+    testLocalJobLibjarsOption(config);
   }
 
   /**
@@ -67,8 +107,6 @@ public class TestLocalJobSubmission {
    */
   private void testLocalJobLibjarsOption(Configuration conf)
       throws IOException {
-    Path jarPath = makeJar(new Path(TEST_ROOT_DIR, "test.jar"));
-
     conf.set(FileSystem.FS_DEFAULT_NAME_KEY, "hdfs://localhost:9000");
     conf.set(MRConfig.FRAMEWORK_NAME, "local");
     final String[] args = {
@@ -79,8 +117,7 @@ public class TestLocalJobSubmission {
     try {
       res = ToolRunner.run(conf, new SleepJob(), args);
     } catch (Exception e) {
-      System.out.println("Job failed with " + e.getLocalizedMessage());
-      e.printStackTrace(System.out);
+      LOG.error("Job failed with {}", e.getLocalizedMessage(), e);
       fail("Job failed");
     }
     assertEquals("dist job res is not 0:", 0, res);
@@ -93,18 +130,20 @@ public class TestLocalJobSubmission {
    */
   @Test
   public void testLocalJobEncryptedIntermediateData() throws IOException {
-    Configuration conf = new Configuration();
-    conf.set(MRConfig.FRAMEWORK_NAME, "local");
-    conf.setBoolean(MRJobConfig.MR_ENCRYPTED_INTERMEDIATE_DATA, true);
+    config = MRJobConfUtil.initEncryptedIntermediateConfigsForTesting(config);
     final String[] args = {
         "-m", "1", "-r", "1", "-mt", "1", "-rt", "1"
     };
     int res = -1;
     try {
-      res = ToolRunner.run(conf, new SleepJob(), args);
+      SpillCallBackPathsFinder spillInjector =
+          (SpillCallBackPathsFinder) IntermediateEncryptedStream
+              .setSpillCBInjector(new SpillCallBackPathsFinder());
+      res = ToolRunner.run(config, new SleepJob(), args);
+      Assert.assertTrue("No spill occurred",
+          spillInjector.getEncryptedSpilledFiles().size() > 0);
     } catch (Exception e) {
-      System.out.println("Job failed with " + e.getLocalizedMessage());
-      e.printStackTrace(System.out);
+      LOG.error("Job failed with {}", e.getLocalizedMessage(), e);
       fail("Job failed");
     }
     assertEquals("dist job res is not 0:", 0, res);
@@ -116,15 +155,13 @@ public class TestLocalJobSubmission {
    */
   @Test
   public void testJobMaxMapConfig() throws Exception {
-    Configuration conf = new Configuration();
-    conf.set(MRConfig.FRAMEWORK_NAME, "local");
-    conf.setInt(MRJobConfig.JOB_MAX_MAP, 0);
+    config.setInt(MRJobConfig.JOB_MAX_MAP, 0);
     final String[] args = {
         "-m", "1", "-r", "1", "-mt", "1", "-rt", "1"
     };
     int res = -1;
     try {
-      res = ToolRunner.run(conf, new SleepJob(), args);
+      res = ToolRunner.run(config, new SleepJob(), args);
       fail("Job should fail");
     } catch (IllegalArgumentException e) {
       assertTrue(e.getLocalizedMessage().contains(
@@ -139,20 +176,16 @@ public class TestLocalJobSubmission {
    */
   @Test
   public void testLocalJobFilesOption() throws IOException {
-    Path jarPath = makeJar(new Path(TEST_ROOT_DIR, "test.jar"));
-
-    Configuration conf = new Configuration();
-    conf.set(FileSystem.FS_DEFAULT_NAME_KEY, "hdfs://localhost:9000");
-    conf.set(MRConfig.FRAMEWORK_NAME, "local");
-    final String[] args =
-        {"-jt", "local", "-files", jarPath.toString(), "-m", "1", "-r", "1",
-            "-mt", "1", "-rt", "1"};
+    config.set(FileSystem.FS_DEFAULT_NAME_KEY, "hdfs://localhost:9000");
+    final String[] args = {
+        "-jt", "local", "-files", jarPath.toString(),
+        "-m", "1", "-r", "1", "-mt", "1", "-rt", "1"
+    };
     int res = -1;
     try {
-      res = ToolRunner.run(conf, new SleepJob(), args);
+      res = ToolRunner.run(config, new SleepJob(), args);
     } catch (Exception e) {
-      System.out.println("Job failed with " + e.getLocalizedMessage());
-      e.printStackTrace(System.out);
+      LOG.error("Job failed with {}", e.getLocalizedMessage(), e);
       fail("Job failed");
     }
     assertEquals("dist job res is not 0:", 0, res);
@@ -165,27 +198,22 @@ public class TestLocalJobSubmission {
    */
   @Test
   public void testLocalJobArchivesOption() throws IOException {
-    Path jarPath = makeJar(new Path(TEST_ROOT_DIR, "test.jar"));
-
-    Configuration conf = new Configuration();
-    conf.set(FileSystem.FS_DEFAULT_NAME_KEY, "hdfs://localhost:9000");
-    conf.set(MRConfig.FRAMEWORK_NAME, "local");
+    config.set(FileSystem.FS_DEFAULT_NAME_KEY, "hdfs://localhost:9000");
     final String[] args =
         {"-jt", "local", "-archives", jarPath.toString(), "-m", "1", "-r",
             "1", "-mt", "1", "-rt", "1"};
     int res = -1;
     try {
-      res = ToolRunner.run(conf, new SleepJob(), args);
+      res = ToolRunner.run(config, new SleepJob(), args);
     } catch (Exception e) {
-      System.out.println("Job failed with " + e.getLocalizedMessage());
-      e.printStackTrace(System.out);
+      LOG.error("Job failed with {}", e.getLocalizedMessage(), e);
       fail("Job failed");
     }
     assertEquals("dist job res is not 0:", 0, res);
   }
 
   private Path makeJar(Path p) throws IOException {
-    FileOutputStream fos = new FileOutputStream(new File(p.toString()));
+    FileOutputStream fos = new FileOutputStream(p.toString());
     JarOutputStream jos = new JarOutputStream(fos);
     ZipEntry ze = new ZipEntry("test.jar.inside");
     jos.putNextEntry(ze);
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestMRIntermediateDataEncryption.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestMRIntermediateDataEncryption.java
deleted file mode 100644
index fa8dacf..0000000
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestMRIntermediateDataEncryption.java
+++ /dev/null
@@ -1,327 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.mapred;
-
-import java.util.Arrays;
-import java.util.Collection;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.FileUtil;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hdfs.MiniDFSCluster;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapreduce.MRJobConfig;
-import org.junit.After;
-import org.junit.AfterClass;
-import org.junit.Before;
-import org.junit.BeforeClass;
-import org.junit.Test;
-
-import java.io.IOException;
-import java.io.OutputStream;
-import java.io.OutputStreamWriter;
-import java.io.Writer;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import static org.junit.Assert.*;
-
-@SuppressWarnings(value={"unchecked", "deprecation"})
-/**
- * This test tests the support for a merge operation in Hadoop.  The input files
- * are already sorted on the key.  This test implements an external
- * MapOutputCollector implementation that just copies the records to different
- * partitions while maintaining the sort order in each partition.  The Hadoop
- * framework's merge on the reduce side will merge the partitions created to
- * generate the final output which is sorted on the key.
- */
-@RunWith(Parameterized.class)
-public class TestMRIntermediateDataEncryption {
-  private static final Logger LOG =
-      LoggerFactory.getLogger(TestMRIntermediateDataEncryption.class);
-  /**
-   * Use urandom to avoid the YarnChild  process from hanging on low entropy
-   * systems.
-   */
-  private static final String JVM_SECURITY_EGD_OPT =
-      "-Djava.security.egd=file:/dev/./urandom";
-  // Where MR job's input will reside.
-  private static final Path INPUT_DIR = new Path("/test/input");
-  // Where output goes.
-  private static final Path OUTPUT = new Path("/test/output");
-  private static final int NUM_LINES = 1000;
-  private static MiniMRClientCluster mrCluster = null;
-  private static MiniDFSCluster dfsCluster = null;
-  private static FileSystem fs = null;
-  private static final int NUM_NODES = 2;
-
-  private final String testTitle;
-  private final int numMappers;
-  private final int numReducers;
-  private final boolean isUber;
-
-  /**
-   * List of arguments to run the JunitTest.
-   * @return
-   */
-  @Parameterized.Parameters(
-      name = "{index}: TestMRIntermediateDataEncryption.{0} .. "
-          + "mappers:{1}, reducers:{2}, isUber:{3})")
-  public static Collection<Object[]> getTestParameters() {
-    return Arrays.asList(new Object[][]{
-        {"testSingleReducer", 3, 1, false},
-        {"testUberMode", 3, 1, true},
-        {"testMultipleMapsPerNode", 8, 1, false},
-        {"testMultipleReducers", 2, 4, false}
-    });
-  }
-
-  /**
-   * Initialized the parametrized JUnit test.
-   * @param testName the name of the unit test to be executed.
-   * @param mappers number of mappers in the tests.
-   * @param reducers number of the reducers.
-   * @param uberEnabled boolean flag for isUber
-   */
-  public TestMRIntermediateDataEncryption(String testName, int mappers,
-      int reducers, boolean uberEnabled) {
-    this.testTitle = testName;
-    this.numMappers = mappers;
-    this.numReducers = reducers;
-    this.isUber = uberEnabled;
-  }
-
-  @BeforeClass
-  public static void setupClass() throws Exception {
-    Configuration conf = new Configuration();
-    conf.setBoolean(MRJobConfig.MR_ENCRYPTED_INTERMEDIATE_DATA, true);
-
-    // Set the jvm arguments.
-    conf.set(MRJobConfig.MR_AM_ADMIN_COMMAND_OPTS,
-        JVM_SECURITY_EGD_OPT);
-    final String childJVMOpts = JVM_SECURITY_EGD_OPT
-        + " " + conf.get("mapred.child.java.opts", " ");
-    conf.set("mapred.child.java.opts", childJVMOpts);
-
-
-    // Start the mini-MR and mini-DFS clusters.
-    dfsCluster = new MiniDFSCluster.Builder(conf)
-        .numDataNodes(NUM_NODES).build();
-    mrCluster =
-        MiniMRClientClusterFactory.create(
-            TestMRIntermediateDataEncryption.class, NUM_NODES, conf);
-    mrCluster.start();
-  }
-
-  @AfterClass
-  public static void tearDown() throws IOException {
-    if (fs != null) {
-      fs.close();
-    }
-    if (mrCluster != null) {
-      mrCluster.stop();
-    }
-    if (dfsCluster != null) {
-      dfsCluster.shutdown();
-    }
-  }
-
-  @Before
-  public void setup() throws Exception {
-    LOG.info("Starting TestMRIntermediateDataEncryption#{}.......", testTitle);
-    fs = dfsCluster.getFileSystem();
-    if (fs.exists(INPUT_DIR) && !fs.delete(INPUT_DIR, true)) {
-      throw new IOException("Could not delete " + INPUT_DIR);
-    }
-    if (fs.exists(OUTPUT) && !fs.delete(OUTPUT, true)) {
-      throw new IOException("Could not delete " + OUTPUT);
-    }
-    // Generate input.
-    createInput(fs, numMappers, NUM_LINES);
-  }
-
-  @After
-  public void cleanup() throws IOException {
-    if (fs != null) {
-      if (fs.exists(OUTPUT)) {
-        fs.delete(OUTPUT, true);
-      }
-      if (fs.exists(INPUT_DIR)) {
-        fs.delete(INPUT_DIR, true);
-      }
-    }
-  }
-
-  @Test(timeout=600000)
-  public void testMerge() throws Exception {
-    JobConf job = new JobConf(mrCluster.getConfig());
-    job.setJobName("Test");
-    JobClient client = new JobClient(job);
-    RunningJob submittedJob = null;
-    FileInputFormat.setInputPaths(job, INPUT_DIR);
-    FileOutputFormat.setOutputPath(job, OUTPUT);
-    job.set("mapreduce.output.textoutputformat.separator", " ");
-    job.setInputFormat(TextInputFormat.class);
-    job.setMapOutputKeyClass(Text.class);
-    job.setMapOutputValueClass(Text.class);
-    job.setOutputKeyClass(Text.class);
-    job.setOutputValueClass(Text.class);
-    job.setMapperClass(TestMRIntermediateDataEncryption.MyMapper.class);
-    job.setPartitionerClass(
-        TestMRIntermediateDataEncryption.MyPartitioner.class);
-    job.setOutputFormat(TextOutputFormat.class);
-    job.setNumReduceTasks(numReducers);
-    job.setInt("mapreduce.map.maxattempts", 1);
-    job.setInt("mapreduce.reduce.maxattempts", 1);
-    job.setInt("mapred.test.num_lines", NUM_LINES);
-    job.setBoolean(MRJobConfig.JOB_UBERTASK_ENABLE, isUber);
-    job.setBoolean(MRJobConfig.MR_ENCRYPTED_INTERMEDIATE_DATA, true);
-    submittedJob = client.submitJob(job);
-    submittedJob.waitForCompletion();
-    assertTrue("The submitted job is completed", submittedJob.isComplete());
-    assertTrue("The submitted job is successful", submittedJob.isSuccessful());
-    verifyOutput(fs, numMappers, NUM_LINES);
-    client.close();
-    // wait for short period to cool down.
-    Thread.sleep(1000);
-  }
-
-  private void createInput(FileSystem filesystem, int mappers, int numLines)
-      throws Exception {
-    for (int i = 0; i < mappers; i++) {
-      OutputStream os =
-          filesystem.create(new Path(INPUT_DIR, "input_" + i + ".txt"));
-      Writer writer = new OutputStreamWriter(os);
-      for (int j = 0; j < numLines; j++) {
-        // Create sorted key, value pairs.
-        int k = j + 1;
-        String formattedNumber = String.format("%09d", k);
-        writer.write(formattedNumber + " " + formattedNumber + "\n");
-      }
-      writer.close();
-      os.close();
-    }
-  }
-
-  private void verifyOutput(FileSystem fileSystem,
-      int mappers, int numLines)
-      throws Exception {
-    FSDataInputStream dis = null;
-    long numValidRecords = 0;
-    long numInvalidRecords = 0;
-    String prevKeyValue = "000000000";
-    Path[] fileList =
-        FileUtil.stat2Paths(fileSystem.listStatus(OUTPUT,
-            new Utils.OutputFileUtils.OutputFilesFilter()));
-    for (Path outFile : fileList) {
-      try {
-        dis = fileSystem.open(outFile);
-        String record;
-        while((record = dis.readLine()) != null) {
-          // Split the line into key and value.
-          int blankPos = record.indexOf(" ");
-          String keyString = record.substring(0, blankPos);
-          String valueString = record.substring(blankPos+1);
-          // Check for sorted output and correctness of record.
-          if (keyString.compareTo(prevKeyValue) >= 0
-              && keyString.equals(valueString)) {
-            prevKeyValue = keyString;
-            numValidRecords++;
-          } else {
-            numInvalidRecords++;
-          }
-        }
-      } finally {
-        if (dis != null) {
-          dis.close();
-          dis = null;
-        }
-      }
-    }
-    // Make sure we got all input records in the output in sorted order.
-    assertEquals((long)(mappers * numLines), numValidRecords);
-    // Make sure there is no extraneous invalid record.
-    assertEquals(0, numInvalidRecords);
-  }
-
-  /**
-   * A mapper implementation that assumes that key text contains valid integers
-   * in displayable form.
-   */
-  public static class MyMapper extends MapReduceBase
-      implements Mapper<LongWritable, Text, Text, Text> {
-    private Text keyText;
-    private Text valueText;
-
-    public MyMapper() {
-      keyText = new Text();
-      valueText = new Text();
-    }
-
-    @Override
-    public void map(LongWritable key, Text value,
-        OutputCollector<Text, Text> output,
-        Reporter reporter) throws IOException {
-      String record = value.toString();
-      int blankPos = record.indexOf(" ");
-      keyText.set(record.substring(0, blankPos));
-      valueText.set(record.substring(blankPos + 1));
-      output.collect(keyText, valueText);
-    }
-
-    public void close() throws IOException {
-    }
-  }
-
-  /**
-   * Partitioner implementation to make sure that output is in total sorted
-   * order.  We basically route key ranges to different reducers such that
-   * key values monotonically increase with the partition number.  For example,
-   * in this test, the keys are numbers from 1 to 1000 in the form "000000001"
-   * to "000001000" in each input file.  The keys "000000001" to "000000250" are
-   * routed to partition 0, "000000251" to "000000500" are routed to partition 1
-   * and so on since we have 4 reducers.
-   */
-  static class MyPartitioner implements Partitioner<Text, Text> {
-
-    private JobConf job;
-
-    public MyPartitioner() {
-    }
-
-    public void configure(JobConf job) {
-      this.job = job;
-    }
-
-    public int getPartition(Text key, Text value, int numPartitions) {
-      int keyValue = 0;
-      try {
-        keyValue = Integer.parseInt(key.toString());
-      } catch (NumberFormatException nfe) {
-        keyValue = 0;
-      }
-      int partitionNumber = (numPartitions * (Math.max(0, keyValue - 1))) / job
-          .getInt("mapred.test.num_lines", 10000);
-      return partitionNumber;
-    }
-  }
-}
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestMROpportunisticMaps.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestMROpportunisticMaps.java
index eed731f..c2a9663 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestMROpportunisticMaps.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestMROpportunisticMaps.java
@@ -26,6 +26,7 @@ import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapreduce.MRJobConfig;
+import org.apache.hadoop.mapreduce.util.MRJobConfUtil;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.junit.Test;
 
@@ -79,7 +80,8 @@ public class TestMROpportunisticMaps {
     MiniMRClientCluster mrCluster = null;
     FileSystem fileSystem = null;
     try {
-      Configuration conf = new Configuration();
+      Configuration conf =
+          MRJobConfUtil.initEncryptedIntermediateConfigsForTesting(null);
       // Start the mini-MR and mini-DFS clusters
       conf.setBoolean(YarnConfiguration.AMRM_PROXY_ENABLED, true);
       conf.setBoolean(YarnConfiguration.
@@ -149,7 +151,6 @@ public class TestMROpportunisticMaps {
     job.setInt("mapreduce.map.maxattempts", 1);
     job.setInt("mapreduce.reduce.maxattempts", 1);
     job.setInt("mapred.test.num_lines", numLines);
-    job.setBoolean(MRJobConfig.MR_ENCRYPTED_INTERMEDIATE_DATA, true);
     try {
       submittedJob = client.submitJob(job);
       try {
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestMerge.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestMerge.java
index a9e7f64..b8a16e1 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestMerge.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestMerge.java
@@ -87,12 +87,12 @@ public class TestMerge {
       // Run the test.
       runMergeTest(new JobConf(mrCluster.getConfig()), fileSystem);
     } finally {
-      if (dfsCluster != null) {
-        dfsCluster.shutdown();
-      }
       if (mrCluster != null) {
         mrCluster.stop();
       }
+      if (dfsCluster != null) {
+        dfsCluster.shutdown();
+      }
     }
   }
 
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/RandomTextWriter.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/RandomTextWriter.java
index 0bf30c8..dca39df 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/RandomTextWriter.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/RandomTextWriter.java
@@ -22,7 +22,7 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Date;
 import java.util.List;
-import java.util.Random;
+import java.util.concurrent.ThreadLocalRandom;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.conf.Configured;
@@ -30,7 +30,6 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapred.ClusterStatus;
 import org.apache.hadoop.mapred.JobClient;
-import org.apache.hadoop.mapreduce.*;
 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
 import org.apache.hadoop.util.Tool;
@@ -99,6 +98,15 @@ public class RandomTextWriter extends Configured implements Tool {
    */
   enum Counters { RECORDS_WRITTEN, BYTES_WRITTEN }
 
+  public static String generateSentenceWithRand(ThreadLocalRandom rand,
+      int noWords) {
+    StringBuilder buf = new StringBuilder(); // stays empty if noWords <= 0
+    for (int i = 0; i < noWords; i++) {
+      buf.append(i == 0 ? "" : " ").append(words[rand.nextInt(words.length)]);
+    }
+    return buf.toString();
+  }
+
   static class RandomTextMapper extends Mapper<Text, Text, Text, Text> {
     
     private long numBytesToWrite;
@@ -106,7 +114,6 @@ public class RandomTextWriter extends Configured implements Tool {
     private int wordsInKeyRange;
     private int minWordsInValue;
     private int wordsInValueRange;
-    private Random random = new Random();
     
     /**
      * Save the configuration value that we need to write the data.
@@ -127,12 +134,13 @@ public class RandomTextWriter extends Configured implements Tool {
     public void map(Text key, Text value,
                     Context context) throws IOException,InterruptedException {
       int itemCount = 0;
+      ThreadLocalRandom rand = ThreadLocalRandom.current();
       while (numBytesToWrite > 0) {
         // Generate the key/value 
-        int noWordsKey = minWordsInKey + 
-          (wordsInKeyRange != 0 ? random.nextInt(wordsInKeyRange) : 0);
-        int noWordsValue = minWordsInValue + 
-          (wordsInValueRange != 0 ? random.nextInt(wordsInValueRange) : 0);
+        int noWordsKey = minWordsInKey +
+            (wordsInKeyRange != 0 ? rand.nextInt(wordsInKeyRange) : 0);
+        int noWordsValue = minWordsInValue +
+            (wordsInValueRange != 0 ? rand.nextInt(wordsInValueRange) : 0);
         Text keyWords = generateSentence(noWordsKey);
         Text valueWords = generateSentence(noWordsValue);
         
@@ -154,13 +162,9 @@ public class RandomTextWriter extends Configured implements Tool {
     }
     
     private Text generateSentence(int noWords) {
-      StringBuffer sentence = new StringBuffer();
-      String space = " ";
-      for (int i=0; i < noWords; ++i) {
-        sentence.append(words[random.nextInt(words.length)]);
-        sentence.append(space);
-      }
-      return new Text(sentence.toString());
+      String sentence =
+          generateSentenceWithRand(ThreadLocalRandom.current(), noWords);
+      return new Text(sentence);
     }
   }
   
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/TestMRIntermediateDataEncryption.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/TestMRIntermediateDataEncryption.java
new file mode 100644
index 0000000..79fcd41
--- /dev/null
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/TestMRIntermediateDataEncryption.java
@@ -0,0 +1,411 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapreduce;
+
+import java.io.BufferedWriter;
+import java.io.File;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+import java.util.StringTokenizer;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.MiniMRClientCluster;
+import org.apache.hadoop.mapred.MiniMRClientClusterFactory;
+import org.apache.hadoop.mapred.Utils;
+
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.mapreduce.lib.reduce.LongSumReducer;
+import org.apache.hadoop.mapreduce.security.IntermediateEncryptedStream;
+import org.apache.hadoop.mapreduce.security.SpillCallBackPathsFinder;
+import org.apache.hadoop.mapreduce.util.MRJobConfUtil;
+import org.apache.hadoop.test.GenericTestUtils;
+import org.apache.hadoop.util.Time;
+import org.apache.hadoop.util.ToolRunner;
+
+/**
+ * This class tests the support of Intermediate data encryption
+ * (Spill data encryption).
+ * It starts by generating random input text file ({@link RandomTextWriter})
+ * using the {@link ToolRunner}.
+ * A wordCount job consumes the generated input. The final job is configured in
+ * a way to guarantee that data is spilled.
+ * mbs-per-map specifies the amount of data (in MBs) to generate per map.
+ * By default, this is twice the value of <code>mapreduce.task.io.sort.mb</code>
+ * <code>map-tasks</code> specifies the number of map tasks to run.
+ */
+@RunWith(Parameterized.class)
+public class TestMRIntermediateDataEncryption {
+  /**
+   * The number of bytes generated by the input generator.
+   */
+  public static final long TOTAL_MBS_DEFAULT = 128L;
+  public static final long BLOCK_SIZE_DEFAULT = 32 * 1024 * 1024L;
+  public static final int INPUT_GEN_NUM_THREADS = 16;
+  public static final long TASK_SORT_IO_MB_DEFAULT = 128L;
+  public static final String JOB_DIR_PATH = "jobs-data-path";
+  private static final Logger LOG =
+      LoggerFactory.getLogger(TestMRIntermediateDataEncryption.class);
+  /**
+   * Directory of the test data.
+   */
+  private static File testRootDir;
+  private static volatile BufferedWriter inputBufferedWriter;
+  private static Configuration commonConfig;
+  private static MiniDFSCluster dfsCluster;
+  private static MiniMRClientCluster mrCluster;
+  private static FileSystem fs;
+  private static Path jobInputDirPath;
+  private static long inputFileSize;
+  /**
+   * Test parameters.
+   */
+  private final String testTitleName;
+  private final int numMappers;
+  private final int numReducers;
+  private final boolean isUber;
+  private Configuration config;
+  private Path jobOutputPath;
+
+  /**
+   * Initializes the parameterized JUnit test.
+   * @param testName the name of the unit test to be executed.
+   * @param mappers number of mappers in the tests.
+   * @param reducers number of the reducers.
+   * @param uberEnabled whether uber-task mode is enabled for the job.
+   */
+  public TestMRIntermediateDataEncryption(String testName, int mappers,
+      int reducers, boolean uberEnabled) {
+    this.testTitleName = testName;
+    this.numMappers = mappers;
+    this.numReducers = reducers;
+    this.isUber = uberEnabled;
+  }
+
+  /**
+   * List of arguments to run the JunitTest.
+   * @return rows of (title, mappers, reducers, isUber), one per test run.
+   */
+  @Parameterized.Parameters(
+      name = "{index}: TestMRIntermediateDataEncryption.{0} .. "
+          + "mappers:{1}, reducers:{2}, isUber:{3})")
+  public static Collection<Object[]> getTestParameters() {
+    return Arrays.asList(new Object[][]{
+        {"testSingleReducer", 3, 1, false},
+        {"testUberMode", 3, 1, true},
+        {"testMultipleMapsPerNode", 8, 1, false},
+        // TODO: The following configuration is commented out until
+        //       MAPREDUCE-7325 is fixed.
+        //       Setting multiple reducers breaks LocalJobRunner causing the
+        //       unit test to fail.
+        // {"testMultipleReducers", 2, 4, false}
+    });
+  }
+
+  @BeforeClass
+  public static void setupClass() throws Exception {
+    // setup the test root directory
+    testRootDir =
+        GenericTestUtils.setupTestRootDir(
+            TestMRIntermediateDataEncryption.class);
+    // setup the base configurations and the clusters
+    final File dfsFolder = new File(testRootDir, "dfs");
+    final Path jobsDirPath = new Path(JOB_DIR_PATH);
+
+    commonConfig = createBaseConfiguration();
+    dfsCluster =
+        new MiniDFSCluster.Builder(commonConfig, dfsFolder)
+            .numDataNodes(2).build();
+    dfsCluster.waitActive();
+    mrCluster = MiniMRClientClusterFactory.create(
+        TestMRIntermediateDataEncryption.class, 2, commonConfig);
+    mrCluster.start();
+    fs = dfsCluster.getFileSystem();
+    if (fs.exists(jobsDirPath) && !fs.delete(jobsDirPath, true)) {
+      throw new IOException("Could not delete JobsDirPath " + jobsDirPath);
+    }
+    fs.mkdirs(jobsDirPath);
+    jobInputDirPath = new Path(jobsDirPath, "in-dir");
+    // run the input generator job.
+    Assert.assertEquals("Generating input should succeed", 0,
+        generateInputTextFile());
+  }
+
+  @AfterClass
+  public static void tearDown() throws IOException {
+    // shutdown clusters
+    if (mrCluster != null) {
+      mrCluster.stop();
+    }
+    if (dfsCluster != null) {
+      dfsCluster.shutdown();
+    }
+    // make sure that generated input file is deleted
+    final File textInputFile = new File(testRootDir, "input.txt");
+    if (textInputFile.exists()) {
+      textInputFile.delete();
+    }
+  }
+
+  /**
+   * Creates a configuration object setting the common properties before
+   * initializing the clusters.
+   * @return configuration to be used as a base for the unit tests.
+   */
+  private static Configuration createBaseConfiguration() {
+    // Set the jvm arguments to enable intermediate encryption.
+    Configuration conf =
+        MRJobConfUtil.initEncryptedIntermediateConfigsForTesting(null);
+    // Set the temp directories to a subdir of the test directory.
+    conf = MRJobConfUtil.setLocalDirectoriesConfigForTesting(conf, testRootDir);
+    conf.setLong("dfs.blocksize", BLOCK_SIZE_DEFAULT);
+    return conf;
+  }
+
+  /**
+   * Returns the BufferedWriter shared by the task generators (made on demand).
+   * @return the shared <code>BufferedWriter</code> to the input file.
+   * @throws IOException if the input file cannot be opened for writing.
+   */
+  private static synchronized BufferedWriter getTextInputWriter()
+      throws IOException {
+    if (inputBufferedWriter == null) {
+      final File textInputFile = new File(testRootDir, "input.txt");
+      inputBufferedWriter = new BufferedWriter(new FileWriter(textInputFile));
+    }
+    return inputBufferedWriter;
+  }
+
+  /**
+   * Generates input text file of size <code>TOTAL_MBS_DEFAULT</code>.
+   * It creates a total <code>INPUT_GEN_NUM_THREADS</code> future tasks.
+   *
+   * @return the result of the input generation. 0 for success.
+   * @throws Exception if a generator task fails or the copy to HDFS fails.
+   */
+  private static int generateInputTextFile() throws Exception {
+    final File textInputFile = new File(testRootDir, "input.txt");
+    final AtomicLong actualWrittenBytes = new AtomicLong(0);
+    // create INPUT_GEN_NUM_THREADS callables
+    final ExecutorService executor =
+        Executors.newFixedThreadPool(INPUT_GEN_NUM_THREADS);
+    //create a list to hold the Future object associated with Callable
+    final List<Future<Long>> inputGenerators = new ArrayList<>();
+    final Callable<Long> callableGen = new InputGeneratorTask();
+    final long startTime = Time.monotonicNow();
+    try {
+      for (int i = 0; i < INPUT_GEN_NUM_THREADS; i++) {
+        inputGenerators.add(executor.submit(callableGen));
+      }
+      for (Future<Long> genFutureTask : inputGenerators) {
+        LOG.info("Received one task. Current total bytes: {}",
+            actualWrittenBytes.addAndGet(genFutureTask.get()));
+      }
+    } finally {
+      // release the pool threads even when a generator task fails.
+      executor.shutdown();
+    }
+    getTextInputWriter().close();
+    final long endTime = Time.monotonicNow();
+    LOG.info("Finished generating input. Wrote {} bytes in {} seconds",
+        actualWrittenBytes.get(), ((endTime - startTime) * 1.0) / 1000);
+    // copy text file to HDFS deleting the source.
+    fs.mkdirs(jobInputDirPath);
+    Path textInputPath =
+        fs.makeQualified(new Path(jobInputDirPath, "input.txt"));
+    fs.copyFromLocalFile(true, new Path(textInputFile.getAbsolutePath()),
+        textInputPath);
+    if (!fs.exists(textInputPath)) {
+      // the file was not generated. Fail.
+      return 1;
+    }
+    // update the input size.
+    FileStatus[] fileStatus =
+        fs.listStatus(textInputPath);
+    inputFileSize = fileStatus[0].getLen();
+    LOG.info("Text input file; path: {}, size: {}",
+        textInputPath, inputFileSize);
+    return 0;
+  }
+
+  @Before
+  public void setup() throws Exception {
+    LOG.info("Starting TestMRIntermediateDataEncryption#{}.......",
+        testTitleName);
+    final Path jobDirPath = new Path(JOB_DIR_PATH, testTitleName);
+    if (fs.exists(jobDirPath) && !fs.delete(jobDirPath, true)) {
+      throw new IOException("Could not delete " + jobDirPath);
+    }
+    fs.mkdirs(jobDirPath);
+    jobOutputPath = new Path(jobDirPath, "out-dir");
+    // Set the configuration for the job.
+    config = new Configuration(commonConfig);
+    config.setBoolean(MRJobConfig.JOB_UBERTASK_ENABLE, isUber);
+    config.setFloat(MRJobConfig.COMPLETED_MAPS_FOR_REDUCE_SLOWSTART, 1.0F);
+    // set the configuration to make sure that we get spilled files
+    long ioSortMb = TASK_SORT_IO_MB_DEFAULT;
+    config.setLong(MRJobConfig.IO_SORT_MB, ioSortMb);
+    long mapMb = Math.max(2 * ioSortMb, config.getInt(MRJobConfig.MAP_MEMORY_MB,
+        MRJobConfig.DEFAULT_MAP_MEMORY_MB));
+    // make sure the map tasks will spill to disk.
+    config.setLong(MRJobConfig.MAP_MEMORY_MB, mapMb);
+    config.set(MRJobConfig.MAP_JAVA_OPTS, "-Xmx" + (mapMb - 200) + "m");
+    config.setInt(MRJobConfig.NUM_MAPS, numMappers);
+    // max attempts have to be set to 1 when intermediate encryption is enabled.
+    config.setInt("mapreduce.map.maxattempts", 1);
+    config.setInt("mapreduce.reduce.maxattempts", 1);
+  }
+
+  /**
+   * Runs a word-count job over {@code jobInputDirPath} and checks the spill
+   * files recorded by an injected {@link SpillCallBackPathsFinder}.
+   * <p>
+   * The test asserts that the job completes successfully, that the injector
+   * recorded at least one encrypted spill file, that the
+   * {@code SPILLED_RECORDS} counter is positive, that the job output path
+   * exists, and that the injector reported no invalid spill entries.
+   * The collected summary is logged and the injector is reset in the
+   * {@code finally} block, so both happen even when an assertion fails.
+   *
+   * @throws Exception if creating or running the job, or accessing the file
+   *                   system, fails.
+   */
+  @Test
+  public void testWordCount() throws Exception {
+    LOG.info("........Starting main Job Driver #{} starting at {}.......",
+        testTitleName, Time.formatTime(System.currentTimeMillis()));
+    Job job = Job.getInstance(config);
+    job.getConfiguration().setInt(MRJobConfig.NUM_MAPS, numMappers);
+    job.setJarByClass(TestMRIntermediateDataEncryption.class);
+    job.setJobName("mr-spill-" + testTitleName);
+    // Mapper configuration
+    job.setMapperClass(TokenizerMapper.class);
+    job.setInputFormatClass(TextInputFormat.class);
+    job.setCombinerClass(LongSumReducer.class);
+    FileInputFormat.setMinInputSplitSize(job,
+        (inputFileSize + numMappers) / numMappers);
+    // Reducer configuration
+    job.setReducerClass(LongSumReducer.class);
+    job.setNumReduceTasks(numReducers);
+    job.setOutputKeyClass(Text.class);
+    job.setOutputValueClass(LongWritable.class);
+    // Set the IO paths for the job.
+    FileInputFormat.addInputPath(job, jobInputDirPath);
+    FileOutputFormat.setOutputPath(job, jobOutputPath);
+    // Install the callback that records the spill paths seen during the job.
+    SpillCallBackPathsFinder spillInjector =
+        (SpillCallBackPathsFinder) IntermediateEncryptedStream
+            .setSpillCBInjector(new SpillCallBackPathsFinder());
+    StringBuilder testSummary =
+        new StringBuilder(String.format("%n ===== test %s summary ======",
+            testTitleName));
+    try {
+      long startTime = Time.monotonicNow();
+      // Recorded before the job is submitted, so this is the start time.
+      testSummary.append(String.format("%nJob %s started at %s",
+          testTitleName, Time.formatTime(System.currentTimeMillis())));
+      Assert.assertTrue(job.waitForCompletion(true));
+      long endTime = Time.monotonicNow();
+      testSummary.append(String.format("%nJob %s ended at %s",
+              job.getJobName(), Time.formatTime(System.currentTimeMillis())));
+      testSummary.append(String.format("%n\tThe job took %.3f seconds",
+          (1.0 * (endTime - startTime)) / 1000));
+      long spilledRecords =
+          job.getCounters().findCounter(TaskCounter.SPILLED_RECORDS).getValue();
+      Assert.assertFalse(
+          "The encrypted spilled files should not be empty.",
+          spillInjector.getEncryptedSpilledFiles().isEmpty());
+      Assert.assertTrue("Spill records must be greater than 0",
+          spilledRecords > 0);
+      Assert.assertTrue("Job Output path [" + jobOutputPath + "] should exist",
+          fs.exists(jobOutputPath));
+      Assert.assertTrue("Invalid access to spill file positions",
+          spillInjector.getInvalidSpillEntries().isEmpty());
+      // List the job's output files and add their sizes to the summary.
+      FileStatus[] fileStatus =
+          fs.listStatus(jobOutputPath,
+              new Utils.OutputFileUtils.OutputFilesFilter());
+      for (FileStatus fStatus : fileStatus) {
+        long fileSize = fStatus.getLen();
+        testSummary.append(
+            String.format("%n\tOutput file %s: %d",
+                fStatus.getPath(), fileSize));
+      }
+    } finally {
+      testSummary.append(spillInjector.getSpilledFileReport());
+      LOG.info(testSummary.toString());
+      IntermediateEncryptedStream.resetSpillCBInjector();
+    }
+  }
+
+  /**
+   * A callable implementation that generates a portion of the
+   * <code>TOTAL_MBS_DEFAULT</code> into {@link #inputBufferedWriter}.
+   * <p>
+   * Each task writes random sentences to the writer returned by
+   * {@code getTextInputWriter()} until it has written its share, which is
+   * the total size divided by {@code INPUT_GEN_NUM_THREADS}.
+   */
+  static class InputGeneratorTask implements Callable<Long> {
+    /**
+     * Writes random sentences until this task's share has been reached.
+     *
+     * @return the number of characters written by this task.
+     * @throws Exception if obtaining, writing to or flushing the writer
+     *                   fails.
+     */
+    @Override
+    public Long call() throws Exception {
+      long bytesWritten = 0;
+      final ThreadLocalRandom rand = ThreadLocalRandom.current();
+      // Use long literals so the product is computed in 64-bit arithmetic
+      // and cannot overflow if TOTAL_MBS_DEFAULT is declared as an int.
+      final long totalBytes = 1024L * 1024L * TOTAL_MBS_DEFAULT;
+      final long bytesPerTask = totalBytes / INPUT_GEN_NUM_THREADS;
+      final String newLine = System.lineSeparator();
+      final BufferedWriter writer = getTextInputWriter();
+      while (bytesWritten < bytesPerTask) {
+        String sentence =
+            RandomTextWriter.generateSentenceWithRand(rand, rand.nextInt(5, 20))
+                .concat(newLine);
+        writer.write(sentence);
+        // Counts characters; NOTE(review): this equals the byte count only
+        // if the generated text is encoded one byte per char - confirm.
+        bytesWritten += sentence.length();
+      }
+      writer.flush();
+      LOG.info("Task {} finished. Wrote {} bytes.",
+          Thread.currentThread().getName(), bytesWritten);
+      return bytesWritten;
+    }
+  }
+
+  /**
+   * A Test tokenizer Mapper that emits the pair {@code (token, 1)} for
+   * every token of each input value.
+   */
+  public static class TokenizerMapper
+      extends Mapper<Object, Text, Text, LongWritable> {
+
+    /** The count written with every token. */
+    private static final LongWritable ONE = new LongWritable(1);
+    /** Reused output key; overwritten for each token. */
+    private final Text word = new Text();
+
+    /**
+     * Splits the value with a default {@link StringTokenizer} and writes
+     * each token with a count of one.
+     *
+     * @param key the input key; not used.
+     * @param value the text to tokenize.
+     * @param context the context the {@code (token, 1)} pairs are written to.
+     * @throws IOException if writing to the context fails.
+     * @throws InterruptedException if writing to the context is interrupted.
+     */
+    @Override
+    public void map(Object key, Text value,
+        Context context) throws IOException, InterruptedException {
+      StringTokenizer itr = new StringTokenizer(value.toString());
+      while (itr.hasMoreTokens()) {
+        word.set(itr.nextToken());
+        context.write(word, ONE);
+      }
+    }
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org