Posted to common-commits@hadoop.apache.org by ep...@apache.org on 2020/01/29 17:20:32 UTC

[hadoop] branch branch-3.2 updated: MAPREDUCE-7079: JobHistory#ServiceStop implementation is incorrect. Contributed by Ahmed Hussein (ahussein)

This is an automated email from the ASF dual-hosted git repository.

epayne pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/hadoop.git


The following commit(s) were added to refs/heads/branch-3.2 by this push:
     new f62c472  MAPREDUCE-7079: JobHistory#ServiceStop implementation is incorrect. Contributed by Ahmed Hussein (ahussein)
f62c472 is described below

commit f62c472944b2e6488c0c99f5c049acf726f01469
Author: Eric E Payne <er...@verizonmedia.com>
AuthorDate: Wed Jan 29 16:54:45 2020 +0000

    MAPREDUCE-7079: JobHistory#ServiceStop implementation is incorrect. Contributed by Ahmed Hussein (ahussein)
    
    (cherry picked from commit b897f6834ba69d443c3acd1fab52261c00d675a1)
---
 .../apache/hadoop/mapreduce/v2/hs/JobHistory.java  |  29 ++-
 .../mapred/TestMRIntermediateDataEncryption.java   | 267 +++++++++++++--------
 2 files changed, 180 insertions(+), 116 deletions(-)

diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/JobHistory.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/JobHistory.java
index a7d1370..7bb4b52 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/JobHistory.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/JobHistory.java
@@ -143,29 +143,32 @@ public class JobHistory extends AbstractService implements HistoryContext {
   protected int getInitDelaySecs() {
     return 30;
   }
-  
+
   @Override
   protected void serviceStop() throws Exception {
     LOG.info("Stopping JobHistory");
     if (scheduledExecutor != null) {
       LOG.info("Stopping History Cleaner/Move To Done");
       scheduledExecutor.shutdown();
-      boolean interrupted = false;
-      long currentTime = System.currentTimeMillis();
-      while (!scheduledExecutor.isShutdown()
-          && System.currentTimeMillis() > currentTime + 1000l && !interrupted) {
-        try {
-          Thread.sleep(20);
-        } catch (InterruptedException e) {
-          interrupted = true;
+      int retryCnt = 50;
+      try {
+        while (!scheduledExecutor.awaitTermination(20,
+            TimeUnit.MILLISECONDS)) {
+          if (--retryCnt == 0) {
+            scheduledExecutor.shutdownNow();
+            break;
+          }
         }
-      }
-      if (!scheduledExecutor.isShutdown()) {
+      } catch (InterruptedException iex) {
         LOG.warn("HistoryCleanerService/move to done shutdown may not have " +
-        		"succeeded, Forcing a shutdown");
-        scheduledExecutor.shutdownNow();
+            "succeeded, Forcing a shutdown", iex);
+        if (!scheduledExecutor.isShutdown()) {
+          scheduledExecutor.shutdownNow();
+        }
       }
+      scheduledExecutor = null;
     }
+    // Stop the other services.
     if (storage != null && storage instanceof Service) {
       ((Service) storage).stop();
     }
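
The bug the hunk above fixes: the old loop took a timestamp and then required
System.currentTimeMillis() > currentTime + 1000l before it would sleep, a
condition that is false the instant it is first evaluated; and since
ExecutorService#isShutdown() returns true as soon as shutdown() has been
called, the trailing !isShutdown() guard meant shutdownNow() never ran either.
So serviceStop() neither waited nor forced a shutdown. The replacement polls
ScheduledExecutorService#awaitTermination instead. A minimal standalone sketch
of that pattern, assuming an illustrative executor (the 50 x 20 ms budget
mirrors the patch; the class and everything else here are hypothetical):

    import java.util.concurrent.Executors;
    import java.util.concurrent.ScheduledExecutorService;
    import java.util.concurrent.TimeUnit;

    public class GracefulShutdownSketch {
      public static void main(String[] args) {
        ScheduledExecutorService executor = Executors.newScheduledThreadPool(1);
        executor.shutdown();        // stop accepting new tasks
        int retryCnt = 50;          // ~1 second total: 50 polls of 20 ms each
        try {
          // awaitTermination blocks up to the timeout and returns false
          // if the executor has not yet finished terminating.
          while (!executor.awaitTermination(20, TimeUnit.MILLISECONDS)) {
            if (--retryCnt == 0) {
              executor.shutdownNow();   // budget exhausted: interrupt tasks
              break;
            }
          }
        } catch (InterruptedException e) {
          // Re-assert the interrupt and force the shutdown, as the patch does.
          Thread.currentThread().interrupt();
          if (!executor.isShutdown()) {
            executor.shutdownNow();
          }
        }
      }
    }
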
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestMRIntermediateDataEncryption.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestMRIntermediateDataEncryption.java
index 28b2295..fa8dacf 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestMRIntermediateDataEncryption.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestMRIntermediateDataEncryption.java
@@ -17,6 +17,8 @@
  */
 package org.apache.hadoop.mapred;
 
+import java.util.Arrays;
+import java.util.Collection;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileSystem;
@@ -26,12 +28,20 @@ import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapreduce.MRJobConfig;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
 import org.junit.Test;
 
 import java.io.IOException;
 import java.io.OutputStream;
 import java.io.OutputStreamWriter;
 import java.io.Writer;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import static org.junit.Assert.*;
 
@@ -44,85 +54,126 @@ import static org.junit.Assert.*;
  * framework's merge on the reduce side will merge the partitions created to
  * generate the final output which is sorted on the key.
  */
+@RunWith(Parameterized.class)
 public class TestMRIntermediateDataEncryption {
+  private static final Logger LOG =
+      LoggerFactory.getLogger(TestMRIntermediateDataEncryption.class);
+  /**
+   * Use urandom to avoid the YarnChild  process from hanging on low entropy
+   * systems.
+   */
+  private static final String JVM_SECURITY_EGD_OPT =
+      "-Djava.security.egd=file:/dev/./urandom";
   // Where MR job's input will reside.
   private static final Path INPUT_DIR = new Path("/test/input");
   // Where output goes.
   private static final Path OUTPUT = new Path("/test/output");
+  private static final int NUM_LINES = 1000;
+  private static MiniMRClientCluster mrCluster = null;
+  private static MiniDFSCluster dfsCluster = null;
+  private static FileSystem fs = null;
+  private static final int NUM_NODES = 2;
 
-  @Test
-  public void testSingleReducer() throws Exception {
-    doEncryptionTest(3, 1, 2, false);
-  }
+  private final String testTitle;
+  private final int numMappers;
+  private final int numReducers;
+  private final boolean isUber;
 
-  @Test
-  public void testUberMode() throws Exception {
-    doEncryptionTest(3, 1, 2, true);
+  /**
+   * List of arguments to run the JunitTest.
+   * @return
+   */
+  @Parameterized.Parameters(
+      name = "{index}: TestMRIntermediateDataEncryption.{0} .. "
+          + "mappers:{1}, reducers:{2}, isUber:{3})")
+  public static Collection<Object[]> getTestParameters() {
+    return Arrays.asList(new Object[][]{
+        {"testSingleReducer", 3, 1, false},
+        {"testUberMode", 3, 1, true},
+        {"testMultipleMapsPerNode", 8, 1, false},
+        {"testMultipleReducers", 2, 4, false}
+    });
   }
 
-  @Test
-  public void testMultipleMapsPerNode() throws Exception {
-    doEncryptionTest(8, 1, 2, false);
+  /**
+   * Initialized the parametrized JUnit test.
+   * @param testName the name of the unit test to be executed.
+   * @param mappers number of mappers in the tests.
+   * @param reducers number of the reducers.
+   * @param uberEnabled boolean flag for isUber
+   */
+  public TestMRIntermediateDataEncryption(String testName, int mappers,
+      int reducers, boolean uberEnabled) {
+    this.testTitle = testName;
+    this.numMappers = mappers;
+    this.numReducers = reducers;
+    this.isUber = uberEnabled;
   }
 
-  @Test
-  public void testMultipleReducers() throws Exception {
-    doEncryptionTest(2, 4, 2, false);
-  }
+  @BeforeClass
+  public static void setupClass() throws Exception {
+    Configuration conf = new Configuration();
+    conf.setBoolean(MRJobConfig.MR_ENCRYPTED_INTERMEDIATE_DATA, true);
+
+    // Set the jvm arguments.
+    conf.set(MRJobConfig.MR_AM_ADMIN_COMMAND_OPTS,
+        JVM_SECURITY_EGD_OPT);
+    final String childJVMOpts = JVM_SECURITY_EGD_OPT
+        + " " + conf.get("mapred.child.java.opts", " ");
+    conf.set("mapred.child.java.opts", childJVMOpts);
 
-  public void doEncryptionTest(int numMappers, int numReducers, int numNodes,
-                               boolean isUber) throws Exception {
-    doEncryptionTest(numMappers, numReducers, numNodes, 1000, isUber);
+
+    // Start the mini-MR and mini-DFS clusters.
+    dfsCluster = new MiniDFSCluster.Builder(conf)
+        .numDataNodes(NUM_NODES).build();
+    mrCluster =
+        MiniMRClientClusterFactory.create(
+            TestMRIntermediateDataEncryption.class, NUM_NODES, conf);
+    mrCluster.start();
   }
 
-  public void doEncryptionTest(int numMappers, int numReducers, int numNodes,
-                               int numLines, boolean isUber) throws Exception {
-    MiniDFSCluster dfsCluster = null;
-    MiniMRClientCluster mrCluster = null;
-    FileSystem fileSystem = null;
-    try {
-      Configuration conf = new Configuration();
-      // Start the mini-MR and mini-DFS clusters
+  @AfterClass
+  public static void tearDown() throws IOException {
+    if (fs != null) {
+      fs.close();
+    }
+    if (mrCluster != null) {
+      mrCluster.stop();
+    }
+    if (dfsCluster != null) {
+      dfsCluster.shutdown();
+    }
+  }
 
-      dfsCluster = new MiniDFSCluster.Builder(conf)
-          .numDataNodes(numNodes).build();
-      fileSystem = dfsCluster.getFileSystem();
-      mrCluster = MiniMRClientClusterFactory.create(this.getClass(),
-                                                 numNodes, conf);
-      // Generate input.
-      createInput(fileSystem, numMappers, numLines);
-      // Run the test.
-      runMergeTest(new JobConf(mrCluster.getConfig()), fileSystem,
-              numMappers, numReducers, numLines, isUber);
-    } finally {
-      if (dfsCluster != null) {
-        dfsCluster.shutdown();
-      }
-      if (mrCluster != null) {
-        mrCluster.stop();
-      }
+  @Before
+  public void setup() throws Exception {
+    LOG.info("Starting TestMRIntermediateDataEncryption#{}.......", testTitle);
+    fs = dfsCluster.getFileSystem();
+    if (fs.exists(INPUT_DIR) && !fs.delete(INPUT_DIR, true)) {
+      throw new IOException("Could not delete " + INPUT_DIR);
+    }
+    if (fs.exists(OUTPUT) && !fs.delete(OUTPUT, true)) {
+      throw new IOException("Could not delete " + OUTPUT);
     }
+    // Generate input.
+    createInput(fs, numMappers, NUM_LINES);
   }
 
-  private void createInput(FileSystem fs, int numMappers, int numLines) throws Exception {
-    fs.delete(INPUT_DIR, true);
-    for (int i = 0; i < numMappers; i++) {
-      OutputStream os = fs.create(new Path(INPUT_DIR, "input_" + i + ".txt"));
-      Writer writer = new OutputStreamWriter(os);
-      for (int j = 0; j < numLines; j++) {
-        // Create sorted key, value pairs.
-        int k = j + 1;
-        String formattedNumber = String.format("%09d", k);
-        writer.write(formattedNumber + " " + formattedNumber + "\n");
+  @After
+  public void cleanup() throws IOException {
+    if (fs != null) {
+      if (fs.exists(OUTPUT)) {
+        fs.delete(OUTPUT, true);
+      }
+      if (fs.exists(INPUT_DIR)) {
+        fs.delete(INPUT_DIR, true);
       }
-      writer.close();
     }
   }
 
-  private void runMergeTest(JobConf job, FileSystem fileSystem, int
-          numMappers, int numReducers, int numLines, boolean isUber)
-          throws Exception {
-    fileSystem.delete(OUTPUT, true);
+  @Test(timeout=600000)
+  public void testMerge() throws Exception {
+    JobConf job = new JobConf(mrCluster.getConfig());
     job.setJobName("Test");
     JobClient client = new JobClient(job);
     RunningJob submittedJob = null;
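
The hunk above converts the suite to JUnit 4's Parameterized runner: the four
former @Test methods become rows returned by getTestParameters(), and a single
parameterized testMerge() (continued in the next hunk) runs once per row
against clusters that are now started once per class. A minimal, self-contained
sketch of the runner pattern, with hypothetical class and parameter names:

    import java.util.Arrays;
    import java.util.Collection;
    import org.junit.Test;
    import org.junit.runner.RunWith;
    import org.junit.runners.Parameterized;
    import static org.junit.Assert.assertTrue;

    @RunWith(Parameterized.class)
    public class ParameterizedSketch {
      // One row per test case; {0}, {1} in the name refer to row elements.
      @Parameterized.Parameters(name = "{index}: mappers:{0}, uber:{1}")
      public static Collection<Object[]> params() {
        return Arrays.asList(new Object[][]{
            {3, false},
            {3, true},
        });
      }

      private final int mappers;
      private final boolean uber;

      // JUnit instantiates the class once per row, passing the row's values.
      public ParameterizedSketch(int mappers, boolean uber) {
        this.mappers = mappers;
        this.uber = uber;
      }

      @Test
      public void runsOncePerRow() {
        assertTrue(mappers > 0);
      }
    }
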
@@ -134,43 +185,53 @@ public class TestMRIntermediateDataEncryption {
     job.setMapOutputValueClass(Text.class);
     job.setOutputKeyClass(Text.class);
     job.setOutputValueClass(Text.class);
-    job.setMapperClass(MyMapper.class);
-    job.setPartitionerClass(MyPartitioner.class);
+    job.setMapperClass(TestMRIntermediateDataEncryption.MyMapper.class);
+    job.setPartitionerClass(
+        TestMRIntermediateDataEncryption.MyPartitioner.class);
     job.setOutputFormat(TextOutputFormat.class);
     job.setNumReduceTasks(numReducers);
-
     job.setInt("mapreduce.map.maxattempts", 1);
     job.setInt("mapreduce.reduce.maxattempts", 1);
-    job.setInt("mapred.test.num_lines", numLines);
-    if (isUber) {
-      job.setBoolean("mapreduce.job.ubertask.enable", true);
-    }
+    job.setInt("mapred.test.num_lines", NUM_LINES);
+    job.setBoolean(MRJobConfig.JOB_UBERTASK_ENABLE, isUber);
     job.setBoolean(MRJobConfig.MR_ENCRYPTED_INTERMEDIATE_DATA, true);
-    try {
-      submittedJob = client.submitJob(job);
-      try {
-        if (! client.monitorAndPrintJob(job, submittedJob)) {
-          throw new IOException("Job failed!");
-        }
-      } catch(InterruptedException ie) {
-        Thread.currentThread().interrupt();
+    submittedJob = client.submitJob(job);
+    submittedJob.waitForCompletion();
+    assertTrue("The submitted job is completed", submittedJob.isComplete());
+    assertTrue("The submitted job is successful", submittedJob.isSuccessful());
+    verifyOutput(fs, numMappers, NUM_LINES);
+    client.close();
+    // wait for short period to cool down.
+    Thread.sleep(1000);
+  }
+
+  private void createInput(FileSystem filesystem, int mappers, int numLines)
+      throws Exception {
+    for (int i = 0; i < mappers; i++) {
+      OutputStream os =
+          filesystem.create(new Path(INPUT_DIR, "input_" + i + ".txt"));
+      Writer writer = new OutputStreamWriter(os);
+      for (int j = 0; j < numLines; j++) {
+        // Create sorted key, value pairs.
+        int k = j + 1;
+        String formattedNumber = String.format("%09d", k);
+        writer.write(formattedNumber + " " + formattedNumber + "\n");
       }
-    } catch(IOException ioe) {
-      System.err.println("Job failed with: " + ioe);
-    } finally {
-      verifyOutput(submittedJob, fileSystem, numMappers, numLines);
+      writer.close();
+      os.close();
     }
   }
 
-  private void verifyOutput(RunningJob submittedJob, FileSystem fileSystem, int numMappers, int numLines)
-    throws Exception {
+  private void verifyOutput(FileSystem fileSystem,
+      int mappers, int numLines)
+      throws Exception {
     FSDataInputStream dis = null;
     long numValidRecords = 0;
     long numInvalidRecords = 0;
     String prevKeyValue = "000000000";
     Path[] fileList =
-      FileUtil.stat2Paths(fileSystem.listStatus(OUTPUT,
-          new Utils.OutputFileUtils.OutputFilesFilter()));
+        FileUtil.stat2Paths(fileSystem.listStatus(OUTPUT,
+            new Utils.OutputFileUtils.OutputFilesFilter()));
     for (Path outFile : fileList) {
       try {
         dis = fileSystem.open(outFile);
@@ -197,7 +258,7 @@ public class TestMRIntermediateDataEncryption {
       }
     }
     // Make sure we got all input records in the output in sorted order.
-    assertEquals((long)(numMappers * numLines), numValidRecords);
+    assertEquals((long)(mappers * numLines), numValidRecords);
     // Make sure there is no extraneous invalid record.
     assertEquals(0, numInvalidRecords);
   }
@@ -207,29 +268,29 @@ public class TestMRIntermediateDataEncryption {
    * in displayable form.
    */
   public static class MyMapper extends MapReduceBase
-    implements Mapper<LongWritable, Text, Text, Text> {
-      private Text keyText;
-      private Text valueText;
+      implements Mapper<LongWritable, Text, Text, Text> {
+    private Text keyText;
+    private Text valueText;
 
-      public MyMapper() {
-        keyText = new Text();
-        valueText = new Text();
-      }
+    public MyMapper() {
+      keyText = new Text();
+      valueText = new Text();
+    }
 
-      @Override
-      public void map(LongWritable key, Text value,
-                      OutputCollector<Text, Text> output,
-                      Reporter reporter) throws IOException {
-        String record = value.toString();
-        int blankPos = record.indexOf(" ");
-        keyText.set(record.substring(0, blankPos));
-        valueText.set(record.substring(blankPos+1));
-        output.collect(keyText, valueText);
-      }
+    @Override
+    public void map(LongWritable key, Text value,
+        OutputCollector<Text, Text> output,
+        Reporter reporter) throws IOException {
+      String record = value.toString();
+      int blankPos = record.indexOf(" ");
+      keyText.set(record.substring(0, blankPos));
+      valueText.set(record.substring(blankPos + 1));
+      output.collect(keyText, valueText);
+    }
 
-      public void close() throws IOException {
-      }
+    public void close() throws IOException {
     }
+  }
 
   /**
    * Partitioner implementation to make sure that output is in total sorted
@@ -255,12 +316,12 @@ public class TestMRIntermediateDataEncryption {
       int keyValue = 0;
       try {
         keyValue = Integer.parseInt(key.toString());
-      } catch(NumberFormatException nfe) {
+      } catch (NumberFormatException nfe) {
         keyValue = 0;
       }
-      int partitionNumber = (numPartitions*(Math.max(0, keyValue-1)))/job.getInt("mapred.test.num_lines", 10000);
+      int partitionNumber = (numPartitions * (Math.max(0, keyValue - 1))) / job
+          .getInt("mapred.test.num_lines", 10000);
       return partitionNumber;
     }
   }
-
 }
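
The partitioner in the last hunk is what makes the final output globally
sorted: it maps contiguous key ranges to increasing partition numbers, so
concatenating part-00000, part-00001, ... preserves order. A worked example of
the arithmetic, assuming 4 reducers and mapred.test.num_lines of 1000: key 250
lands in partition (4 * 249) / 1000 = 0, while key 251 lands in
(4 * 250) / 1000 = 1. A standalone sketch (class and method names are
illustrative):

    public class RangePartitionSketch {
      // Mirrors MyPartitioner's formula: contiguous key ranges map to
      // increasing partition numbers.
      static int partition(int keyValue, int numPartitions, int numLines) {
        return (numPartitions * Math.max(0, keyValue - 1)) / numLines;
      }

      public static void main(String[] args) {
        for (int key : new int[]{1, 250, 251, 500, 1000}) {
          System.out.println("key " + key + " -> partition "
              + partition(key, 4, 1000));
        }
        // Prints partitions 0, 0, 1, 1, 3 respectively.
      }
    }
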

