You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by jb...@apache.org on 2021/03/22 19:46:12 UTC

[hadoop] branch branch-3.1 updated: MAPREDUCE-7325. Intermediate data encryption is broken in LocalJobRunner. Contributed by Ahmed Hussein

This is an automated email from the ASF dual-hosted git repository.

jbrennan pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/hadoop.git


The following commit(s) were added to refs/heads/branch-3.1 by this push:
     new c1b2542  MAPREDUCE-7325. Intermediate data encryption is broken in LocalJobRunner. Contributed by Ahmed Hussein
c1b2542 is described below

commit c1b25422a52f23db21cd467471bf612073b414e3
Author: Jim Brennan <jb...@apache.org>
AuthorDate: Mon Mar 22 18:41:25 2021 +0000

    MAPREDUCE-7325. Intermediate data encryption is broken in LocalJobRunner. Contributed by Ahmed Hussein
    
    (cherry picked from commit ede490d13182f0fd3831afe6db97e01917add5b4)
---
 .../hadoop/mapreduce/task/reduce/LocalFetcher.java |   2 +-
 .../TestMRIntermediateDataEncryption.java          | 216 ++++++++++++++++-----
 2 files changed, 170 insertions(+), 48 deletions(-)

diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/LocalFetcher.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/LocalFetcher.java
index 3ae1e74..dc563ee 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/LocalFetcher.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/LocalFetcher.java
@@ -153,10 +153,10 @@ class LocalFetcher<K,V> extends Fetcher<K, V> {
     FileSystem localFs = FileSystem.getLocal(job).getRaw();
     FSDataInputStream inStream = localFs.open(mapOutputFileName);
     try {
+      inStream.seek(ir.startOffset);
       inStream =
           IntermediateEncryptedStream.wrapIfNecessary(job, inStream,
               mapOutputFileName);
-      inStream.seek(ir.startOffset + CryptoUtils.cryptoPadding(job));
       mapOutput.shuffle(LOCALHOST, inStream, compressedLength,
           decompressedLength, metrics, reporter);
     } finally {
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/TestMRIntermediateDataEncryption.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/TestMRIntermediateDataEncryption.java
index 79fcd41..fbee7ef 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/TestMRIntermediateDataEncryption.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/TestMRIntermediateDataEncryption.java
@@ -44,6 +44,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileChecksum;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -75,9 +76,20 @@ import org.apache.hadoop.util.ToolRunner;
  * mbs-per-map specifies the amount of data (in MBs) to generate per map.
  * By default, this is twice the value of <code>mapreduce.task.io.sort.mb</code>
  * <code>map-tasks</code> specifies the number of map tasks to run.
+ * Steps of the unit test:
+ * 1- Generating random input text.
+ * 2- Run a job with encryption disabled. Get the checksum of the output file
+ *    <code>checkSumReference</code>.
+ * 3- Run the job with encryption enabled.
+ * 4- Compare <code>checkSumReference</code> to the checksum of the job output.
+ * 5- If the job has multiple reducers, the test launches one final job to
+ *    combine the output files into a single one.
+ * 6- Verify that the maps spilled files.
  */
 @RunWith(Parameterized.class)
 public class TestMRIntermediateDataEncryption {
+  public static final Logger LOG =
+      LoggerFactory.getLogger(TestMRIntermediateDataEncryption.class);
   /**
    * The number of bytes generated by the input generator.
    */
@@ -86,8 +98,6 @@ public class TestMRIntermediateDataEncryption {
   public static final int INPUT_GEN_NUM_THREADS = 16;
   public static final long TASK_SORT_IO_MB_DEFAULT = 128L;
   public static final String JOB_DIR_PATH = "jobs-data-path";
-  private static final Logger LOG =
-      LoggerFactory.getLogger(TestMRIntermediateDataEncryption.class);
   /**
    * Directory of the test data.
    */
@@ -97,6 +107,7 @@ public class TestMRIntermediateDataEncryption {
   private static MiniDFSCluster dfsCluster;
   private static MiniMRClientCluster mrCluster;
   private static FileSystem fs;
+  private static FileChecksum checkSumReference;
   private static Path jobInputDirPath;
   private static long inputFileSize;
   /**
@@ -136,11 +147,7 @@ public class TestMRIntermediateDataEncryption {
         {"testSingleReducer", 3, 1, false},
         {"testUberMode", 3, 1, true},
         {"testMultipleMapsPerNode", 8, 1, false},
-        // TODO: The following configuration is commented out until
-        //       MAPREDUCE-7325 is fixed.
-        //       Setting multiple reducers breaks LocalJobRunner causing the
-        //       unit test to fail.
-        // {"testMultipleReducers", 2, 4, false}
+        {"testMultipleReducers", 2, 4, false}
     });
   }
 
@@ -171,6 +178,8 @@ public class TestMRIntermediateDataEncryption {
     // run the input generator job.
     Assert.assertEquals("Generating input should succeed", 0,
         generateInputTextFile());
+    // run the reference job
+    runReferenceJob();
   }
 
   @AfterClass
@@ -185,7 +194,7 @@ public class TestMRIntermediateDataEncryption {
     // make sure that generated input file is deleted
     final File textInputFile = new File(testRootDir, "input.txt");
     if (textInputFile.exists()) {
-      textInputFile.delete();
+      Assert.assertTrue(textInputFile.delete());
     }
   }
 
@@ -198,7 +207,7 @@ public class TestMRIntermediateDataEncryption {
     // Set the jvm arguments to enable intermediate encryption.
     Configuration conf =
         MRJobConfUtil.initEncryptedIntermediateConfigsForTesting(null);
-    // Set the temp directories a subdir of the test directory.
+    // Set the temp directories a subDir of the test directory.
     conf = MRJobConfUtil.setLocalDirectoriesConfigForTesting(conf, testRootDir);
     conf.setLong("dfs.blocksize", BLOCK_SIZE_DEFAULT);
     return conf;
@@ -207,7 +216,7 @@ public class TestMRIntermediateDataEncryption {
   /**
    * Creates a thread safe BufferedWriter to be used among the task generators.
    * @return A synchronized <code>BufferedWriter</code> to the input file.
-   * @throws IOException
+   * @throws IOException opening a new {@link FileWriter}.
    */
   private static synchronized BufferedWriter getTextInputWriter()
       throws IOException {
@@ -223,7 +232,7 @@ public class TestMRIntermediateDataEncryption {
    * It creates a total <code>INPUT_GEN_NUM_THREADS</code> future tasks.
    *
    * @return the result of the input generation. 0 for success.
-   * @throws Exception
+   * @throws Exception during the I/O of job.
    */
   private static int generateInputTextFile() throws Exception {
     final File textInputFile = new File(testRootDir, "input.txt");
@@ -270,6 +279,118 @@ public class TestMRIntermediateDataEncryption {
     return 0;
   }
 
+  /**
+   * Runs a WordCount job with encryption disabled and stores the checksum of
+   * the output file.
+   * @throws Exception due to I/O errors.
+   */
+  private static void runReferenceJob() throws Exception {
+    final String jobRefLabel = "job-reference";
+    final Path jobRefDirPath = new Path(JOB_DIR_PATH, jobRefLabel);
+    if (fs.exists(jobRefDirPath) && !fs.delete(jobRefDirPath, true)) {
+      throw new IOException("Could not delete " + jobRefDirPath);
+    }
+    Assert.assertTrue(fs.mkdirs(jobRefDirPath));
+    Path jobRefOutputPath = new Path(jobRefDirPath, "out-dir");
+    Configuration referenceConf = new Configuration(commonConfig);
+    referenceConf.setBoolean(MRJobConfig.MR_ENCRYPTED_INTERMEDIATE_DATA, false);
+    Job jobReference = runWordCountJob(jobRefLabel, jobRefOutputPath,
+        referenceConf, 4, 1);
+    Assert.assertTrue(jobReference.isSuccessful());
+    FileStatus[] fileStatusArr =
+        fs.listStatus(jobRefOutputPath,
+            new Utils.OutputFileUtils.OutputFilesFilter());
+    Assert.assertEquals(1, fileStatusArr.length);
+    checkSumReference = fs.getFileChecksum(fileStatusArr[0].getPath());
+    Assert.assertTrue(fs.delete(jobRefDirPath, true));
+  }
+
+  private static Job runWordCountJob(String postfixName, Path jOutputPath,
+      Configuration jConf, int mappers, int reducers) throws Exception {
+    Job job = Job.getInstance(jConf);
+    job.getConfiguration().setInt(MRJobConfig.NUM_MAPS, mappers);
+    job.setJarByClass(TestMRIntermediateDataEncryption.class);
+    job.setJobName("mr-spill-" + postfixName);
+    // Mapper configuration
+    job.setMapperClass(TokenizerMapper.class);
+    job.setInputFormatClass(TextInputFormat.class);
+    job.setCombinerClass(LongSumReducer.class);
+    FileInputFormat.setMinInputSplitSize(job,
+        (inputFileSize + mappers) / mappers);
+    // Reducer configuration
+    job.setReducerClass(LongSumReducer.class);
+    job.setNumReduceTasks(reducers);
+    job.setOutputKeyClass(Text.class);
+    job.setOutputValueClass(LongWritable.class);
+    // Set the IO paths for the job.
+    FileInputFormat.addInputPath(job, jobInputDirPath);
+    FileOutputFormat.setOutputPath(job, jOutputPath);
+    if (job.waitForCompletion(true)) {
+      FileStatus[] fileStatusArr =
+          fs.listStatus(jOutputPath,
+              new Utils.OutputFileUtils.OutputFilesFilter());
+      for (FileStatus fStatus : fileStatusArr) {
+        LOG.info("Job: {} .. Output file {} .. Size = {}",
+            postfixName, fStatus.getPath(), fStatus.getLen());
+      }
+    }
+    return job;
+  }
+
+  /**
+   * Compares the checksum of the output file to the
+   * <code>checkSumReference</code>.
+   * If the job has a multiple reducers, the output files are combined by
+   * launching another job.
+   * @return true if the checksums are equal.
+   * @throws Exception if the output is missing or the combiner job fails.
+   */
+  private boolean validateJobOutput() throws Exception {
+    Assert.assertTrue("Job Output path [" + jobOutputPath + "] should exist",
+        fs.exists(jobOutputPath));
+    Path outputPath = jobOutputPath;
+    if (numReducers != 1) {
+      // combine the result into one file by running a combiner job
+      final String jobRefLabel = testTitleName + "-combine";
+      final Path jobRefDirPath = new Path(JOB_DIR_PATH, jobRefLabel);
+      if (fs.exists(jobRefDirPath) && !fs.delete(jobRefDirPath, true)) {
+        throw new IOException("Could not delete " + jobRefDirPath);
+      }
+      fs.mkdirs(jobRefDirPath);
+      outputPath = new Path(jobRefDirPath, "out-dir");
+      Configuration referenceConf = new Configuration(commonConfig);
+      referenceConf.setBoolean(MRJobConfig.MR_ENCRYPTED_INTERMEDIATE_DATA,
+          false);
+      Job combinerJob = Job.getInstance(referenceConf);
+      combinerJob.setJarByClass(TestMRIntermediateDataEncryption.class);
+      combinerJob.setJobName("mr-spill-" + jobRefLabel);
+      combinerJob.setMapperClass(CombinerJobMapper.class);
+      FileInputFormat.addInputPath(combinerJob, jobOutputPath);
+      // Reducer configuration
+      combinerJob.setReducerClass(LongSumReducer.class);
+      combinerJob.setNumReduceTasks(1);
+      combinerJob.setOutputKeyClass(Text.class);
+      combinerJob.setOutputValueClass(LongWritable.class);
+      // Set the IO paths for the job.
+      FileOutputFormat.setOutputPath(combinerJob, outputPath);
+      if (!combinerJob.waitForCompletion(true)) {
+        return false;
+      }
+      FileStatus[] fileStatusArr =
+          fs.listStatus(outputPath,
+              new Utils.OutputFileUtils.OutputFilesFilter());
+      LOG.info("Job-Combination: {} .. Output file {} .. Size = {}",
+          jobRefDirPath, fileStatusArr[0].getPath(), fileStatusArr[0].getLen());
+    }
+    // Get the output files of the job.
+    FileStatus[] fileStatusArr =
+        fs.listStatus(outputPath,
+            new Utils.OutputFileUtils.OutputFilesFilter());
+    FileChecksum jobFileChecksum =
+        fs.getFileChecksum(fileStatusArr[0].getPath());
+    return checkSumReference.equals(jobFileChecksum);
+  }
+
   @Before
   public void setup() throws Exception {
     LOG.info("Starting TestMRIntermediateDataEncryption#{}.......",
@@ -284,16 +405,16 @@ public class TestMRIntermediateDataEncryption {
     config = new Configuration(commonConfig);
     config.setBoolean(MRJobConfig.JOB_UBERTASK_ENABLE, isUber);
     config.setFloat(MRJobConfig.COMPLETED_MAPS_FOR_REDUCE_SLOWSTART, 1.0F);
-    // set the configuration to make sure that we get spilled files
+    // Set the configuration to make sure that we get spilled files.
     long ioSortMb = TASK_SORT_IO_MB_DEFAULT;
     config.setLong(MRJobConfig.IO_SORT_MB, ioSortMb);
     long mapMb = Math.max(2 * ioSortMb, config.getInt(MRJobConfig.MAP_MEMORY_MB,
         MRJobConfig.DEFAULT_MAP_MEMORY_MB));
-    // make sure the map tasks will spill to disk.
+    // Make sure the map tasks will spill to disk.
     config.setLong(MRJobConfig.MAP_MEMORY_MB, mapMb);
     config.set(MRJobConfig.MAP_JAVA_OPTS, "-Xmx" + (mapMb - 200) + "m");
     config.setInt(MRJobConfig.NUM_MAPS, numMappers);
-    // max attempts have to be set to 1 when intermediate encryption is enabled.
+    // Max attempts have to be set to 1 when intermediate encryption is enabled.
     config.setInt("mapreduce.map.maxattempts", 1);
     config.setInt("mapreduce.reduce.maxattempts", 1);
   }
@@ -302,24 +423,6 @@ public class TestMRIntermediateDataEncryption {
   public void testWordCount() throws Exception {
     LOG.info("........Starting main Job Driver #{} starting at {}.......",
         testTitleName, Time.formatTime(System.currentTimeMillis()));
-    Job job = Job.getInstance(config);
-    job.getConfiguration().setInt(MRJobConfig.NUM_MAPS, numMappers);
-    job.setJarByClass(TestMRIntermediateDataEncryption.class);
-    job.setJobName("mr-spill-" + testTitleName);
-    // Mapper configuration
-    job.setMapperClass(TokenizerMapper.class);
-    job.setInputFormatClass(TextInputFormat.class);
-    job.setCombinerClass(LongSumReducer.class);
-    FileInputFormat.setMinInputSplitSize(job,
-        (inputFileSize + numMappers) / numMappers);
-    // Reducer configuration
-    job.setReducerClass(LongSumReducer.class);
-    job.setNumReduceTasks(numReducers);
-    job.setOutputKeyClass(Text.class);
-    job.setOutputValueClass(LongWritable.class);
-    // Set the IO paths for the job.
-    FileInputFormat.addInputPath(job, jobInputDirPath);
-    FileOutputFormat.setOutputPath(job, jobOutputPath);
     SpillCallBackPathsFinder spillInjector =
         (SpillCallBackPathsFinder) IntermediateEncryptedStream
             .setSpillCBInjector(new SpillCallBackPathsFinder());
@@ -328,34 +431,36 @@ public class TestMRIntermediateDataEncryption {
             testTitleName));
     try {
       long startTime = Time.monotonicNow();
-      testSummary.append(String.format("%nJob %s ended at %s",
+      testSummary.append(String.format("%nJob %s started at %s",
           testTitleName, Time.formatTime(System.currentTimeMillis())));
-      Assert.assertTrue(job.waitForCompletion(true));
+      Job job = runWordCountJob(testTitleName, jobOutputPath, config,
+          numMappers, numReducers);
+      Assert.assertTrue(job.isSuccessful());
       long endTime = Time.monotonicNow();
       testSummary.append(String.format("%nJob %s ended at %s",
               job.getJobName(), Time.formatTime(System.currentTimeMillis())));
       testSummary.append(String.format("%n\tThe job took %.3f seconds",
           (1.0 * (endTime - startTime)) / 1000));
-      long spilledRecords =
-          job.getCounters().findCounter(TaskCounter.SPILLED_RECORDS).getValue();
-      Assert.assertFalse(
-          "The encrypted spilled files should not be empty.",
-          spillInjector.getEncryptedSpilledFiles().isEmpty());
-      Assert.assertTrue("Spill records must be greater than 0",
-          spilledRecords > 0);
-      Assert.assertTrue("Job Output path [" + jobOutputPath + "] should exist",
-          fs.exists(jobOutputPath));
-      Assert.assertTrue("Invalid access to spill file positions",
-          spillInjector.getInvalidSpillEntries().isEmpty());
-      FileStatus[] fileStatus =
+      FileStatus[] fileStatusArr =
           fs.listStatus(jobOutputPath,
               new Utils.OutputFileUtils.OutputFilesFilter());
-      for (FileStatus fStatus : fileStatus) {
+      for (FileStatus fStatus : fileStatusArr) {
         long fileSize = fStatus.getLen();
         testSummary.append(
             String.format("%n\tOutput file %s: %d",
                 fStatus.getPath(), fileSize));
       }
+      // Validate the checksum of the output.
+      Assert.assertTrue(validateJobOutput());
+      // Check intermediate files and spilling.
+      long spilledRecords =
+          job.getCounters().findCounter(TaskCounter.SPILLED_RECORDS).getValue();
+      Assert.assertTrue("Spill records must be greater than 0",
+          spilledRecords > 0);
+      Assert.assertFalse("The encrypted spilled files should not be empty.",
+          spillInjector.getEncryptedSpilledFiles().isEmpty());
+      Assert.assertTrue("Invalid access to spill file positions",
+          spillInjector.getInvalidSpillEntries().isEmpty());
     } finally {
       testSummary.append(spillInjector.getSpilledFileReport());
       LOG.info(testSummary.toString());
@@ -408,4 +513,21 @@ public class TestMRIntermediateDataEncryption {
       }
     }
   }
+
+  /**
+   * A Mapper that reads the output of WordCount passing it to the reducer.
+   * It is used to combine the output of multiple reducer jobs.
+   */
+  public static class CombinerJobMapper
+      extends Mapper<Object, Text, Text, LongWritable> {
+    private final LongWritable sum = new LongWritable(0);
+    private final Text word = new Text();
+    public void map(Object key, Text value,
+        Context context) throws IOException, InterruptedException {
+      String[] line = value.toString().split("\\s+");
+      sum.set(Long.parseLong(line[1]));
+      word.set(line[0]);
+      context.write(word, sum);
+    }
+  }
 }

---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org