You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ep...@apache.org on 2020/01/29 17:20:32 UTC
[hadoop] branch branch-3.2 updated: MAPREDUCE-7079:
JobHistory#ServiceStop implementation is incorrect. Contributed by Ahmed
Hussein (ahussein)
This is an automated email from the ASF dual-hosted git repository.
epayne pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/hadoop.git
The following commit(s) were added to refs/heads/branch-3.2 by this push:
new f62c472 MAPREDUCE-7079: JobHistory#ServiceStop implementation is incorrect. Contributed by Ahmed Hussein (ahussein)
f62c472 is described below
commit f62c472944b2e6488c0c99f5c049acf726f01469
Author: Eric E Payne <er...@verizonmedia.com>
AuthorDate: Wed Jan 29 16:54:45 2020 +0000
MAPREDUCE-7079: JobHistory#ServiceStop implementation is incorrect. Contributed by Ahmed Hussein (ahussein)
(cherry picked from commit b897f6834ba69d443c3acd1fab52261c00d675a1)
---
.../apache/hadoop/mapreduce/v2/hs/JobHistory.java | 29 ++-
.../mapred/TestMRIntermediateDataEncryption.java | 267 +++++++++++++--------
2 files changed, 180 insertions(+), 116 deletions(-)
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/JobHistory.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/JobHistory.java
index a7d1370..7bb4b52 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/JobHistory.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/JobHistory.java
@@ -143,29 +143,32 @@ public class JobHistory extends AbstractService implements HistoryContext {
protected int getInitDelaySecs() {
return 30;
}
-
+
@Override
protected void serviceStop() throws Exception {
LOG.info("Stopping JobHistory");
if (scheduledExecutor != null) {
LOG.info("Stopping History Cleaner/Move To Done");
scheduledExecutor.shutdown();
- boolean interrupted = false;
- long currentTime = System.currentTimeMillis();
- while (!scheduledExecutor.isShutdown()
- && System.currentTimeMillis() > currentTime + 1000l && !interrupted) {
- try {
- Thread.sleep(20);
- } catch (InterruptedException e) {
- interrupted = true;
+ int retryCnt = 50;
+ try {
+ while (!scheduledExecutor.awaitTermination(20,
+ TimeUnit.MILLISECONDS)) {
+ if (--retryCnt == 0) {
+ scheduledExecutor.shutdownNow();
+ break;
+ }
}
- }
- if (!scheduledExecutor.isShutdown()) {
+ } catch (InterruptedException iex) {
LOG.warn("HistoryCleanerService/move to done shutdown may not have " +
- "succeeded, Forcing a shutdown");
- scheduledExecutor.shutdownNow();
+ "succeeded, Forcing a shutdown", iex);
+ if (!scheduledExecutor.isShutdown()) {
+ scheduledExecutor.shutdownNow();
+ }
}
+ scheduledExecutor = null;
}
+ // Stop the other services.
if (storage != null && storage instanceof Service) {
((Service) storage).stop();
}
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestMRIntermediateDataEncryption.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestMRIntermediateDataEncryption.java
index 28b2295..fa8dacf 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestMRIntermediateDataEncryption.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestMRIntermediateDataEncryption.java
@@ -17,6 +17,8 @@
*/
package org.apache.hadoop.mapred;
+import java.util.Arrays;
+import java.util.Collection;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
@@ -26,12 +28,20 @@ import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.MRJobConfig;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
import org.junit.Test;
import java.io.IOException;
import java.io.OutputStream;
import java.io.OutputStreamWriter;
import java.io.Writer;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import static org.junit.Assert.*;
@@ -44,85 +54,126 @@ import static org.junit.Assert.*;
* framework's merge on the reduce side will merge the partitions created to
* generate the final output which is sorted on the key.
*/
+@RunWith(Parameterized.class)
public class TestMRIntermediateDataEncryption {
+ private static final Logger LOG =
+ LoggerFactory.getLogger(TestMRIntermediateDataEncryption.class);
+ /**
+ * Use urandom to avoid the YarnChild process from hanging on low entropy
+ * systems.
+ */
+ private static final String JVM_SECURITY_EGD_OPT =
+ "-Djava.security.egd=file:/dev/./urandom";
// Where MR job's input will reside.
private static final Path INPUT_DIR = new Path("/test/input");
// Where output goes.
private static final Path OUTPUT = new Path("/test/output");
+ private static final int NUM_LINES = 1000;
+ private static MiniMRClientCluster mrCluster = null;
+ private static MiniDFSCluster dfsCluster = null;
+ private static FileSystem fs = null;
+ private static final int NUM_NODES = 2;
- @Test
- public void testSingleReducer() throws Exception {
- doEncryptionTest(3, 1, 2, false);
- }
+ private final String testTitle;
+ private final int numMappers;
+ private final int numReducers;
+ private final boolean isUber;
- @Test
- public void testUberMode() throws Exception {
- doEncryptionTest(3, 1, 2, true);
+ /**
+ * List of arguments to run the JunitTest.
+ * @return
+ */
+ @Parameterized.Parameters(
+ name = "{index}: TestMRIntermediateDataEncryption.{0} .. "
+ + "mappers:{1}, reducers:{2}, isUber:{3})")
+ public static Collection<Object[]> getTestParameters() {
+ return Arrays.asList(new Object[][]{
+ {"testSingleReducer", 3, 1, false},
+ {"testUberMode", 3, 1, true},
+ {"testMultipleMapsPerNode", 8, 1, false},
+ {"testMultipleReducers", 2, 4, false}
+ });
}
- @Test
- public void testMultipleMapsPerNode() throws Exception {
- doEncryptionTest(8, 1, 2, false);
+ /**
+ * Initialized the parametrized JUnit test.
+ * @param testName the name of the unit test to be executed.
+ * @param mappers number of mappers in the tests.
+ * @param reducers number of the reducers.
+ * @param uberEnabled boolean flag for isUber
+ */
+ public TestMRIntermediateDataEncryption(String testName, int mappers,
+ int reducers, boolean uberEnabled) {
+ this.testTitle = testName;
+ this.numMappers = mappers;
+ this.numReducers = reducers;
+ this.isUber = uberEnabled;
}
- @Test
- public void testMultipleReducers() throws Exception {
- doEncryptionTest(2, 4, 2, false);
- }
+ @BeforeClass
+ public static void setupClass() throws Exception {
+ Configuration conf = new Configuration();
+ conf.setBoolean(MRJobConfig.MR_ENCRYPTED_INTERMEDIATE_DATA, true);
+
+ // Set the jvm arguments.
+ conf.set(MRJobConfig.MR_AM_ADMIN_COMMAND_OPTS,
+ JVM_SECURITY_EGD_OPT);
+ final String childJVMOpts = JVM_SECURITY_EGD_OPT
+ + " " + conf.get("mapred.child.java.opts", " ");
+ conf.set("mapred.child.java.opts", childJVMOpts);
- public void doEncryptionTest(int numMappers, int numReducers, int numNodes,
- boolean isUber) throws Exception {
- doEncryptionTest(numMappers, numReducers, numNodes, 1000, isUber);
+
+ // Start the mini-MR and mini-DFS clusters.
+ dfsCluster = new MiniDFSCluster.Builder(conf)
+ .numDataNodes(NUM_NODES).build();
+ mrCluster =
+ MiniMRClientClusterFactory.create(
+ TestMRIntermediateDataEncryption.class, NUM_NODES, conf);
+ mrCluster.start();
}
- public void doEncryptionTest(int numMappers, int numReducers, int numNodes,
- int numLines, boolean isUber) throws Exception {
- MiniDFSCluster dfsCluster = null;
- MiniMRClientCluster mrCluster = null;
- FileSystem fileSystem = null;
- try {
- Configuration conf = new Configuration();
- // Start the mini-MR and mini-DFS clusters
+ @AfterClass
+ public static void tearDown() throws IOException {
+ if (fs != null) {
+ fs.close();
+ }
+ if (mrCluster != null) {
+ mrCluster.stop();
+ }
+ if (dfsCluster != null) {
+ dfsCluster.shutdown();
+ }
+ }
- dfsCluster = new MiniDFSCluster.Builder(conf)
- .numDataNodes(numNodes).build();
- fileSystem = dfsCluster.getFileSystem();
- mrCluster = MiniMRClientClusterFactory.create(this.getClass(),
- numNodes, conf);
- // Generate input.
- createInput(fileSystem, numMappers, numLines);
- // Run the test.
- runMergeTest(new JobConf(mrCluster.getConfig()), fileSystem,
- numMappers, numReducers, numLines, isUber);
- } finally {
- if (dfsCluster != null) {
- dfsCluster.shutdown();
- }
- if (mrCluster != null) {
- mrCluster.stop();
- }
+ @Before
+ public void setup() throws Exception {
+ LOG.info("Starting TestMRIntermediateDataEncryption#{}.......", testTitle);
+ fs = dfsCluster.getFileSystem();
+ if (fs.exists(INPUT_DIR) && !fs.delete(INPUT_DIR, true)) {
+ throw new IOException("Could not delete " + INPUT_DIR);
+ }
+ if (fs.exists(OUTPUT) && !fs.delete(OUTPUT, true)) {
+ throw new IOException("Could not delete " + OUTPUT);
}
+ // Generate input.
+ createInput(fs, numMappers, NUM_LINES);
}
- private void createInput(FileSystem fs, int numMappers, int numLines) throws Exception {
- fs.delete(INPUT_DIR, true);
- for (int i = 0; i < numMappers; i++) {
- OutputStream os = fs.create(new Path(INPUT_DIR, "input_" + i + ".txt"));
- Writer writer = new OutputStreamWriter(os);
- for (int j = 0; j < numLines; j++) {
- // Create sorted key, value pairs.
- int k = j + 1;
- String formattedNumber = String.format("%09d", k);
- writer.write(formattedNumber + " " + formattedNumber + "\n");
+ @After
+ public void cleanup() throws IOException {
+ if (fs != null) {
+ if (fs.exists(OUTPUT)) {
+ fs.delete(OUTPUT, true);
+ }
+ if (fs.exists(INPUT_DIR)) {
+ fs.delete(INPUT_DIR, true);
}
- writer.close();
}
}
- private void runMergeTest(JobConf job, FileSystem fileSystem, int
- numMappers, int numReducers, int numLines, boolean isUber)
- throws Exception {
- fileSystem.delete(OUTPUT, true);
+ @Test(timeout=600000)
+ public void testMerge() throws Exception {
+ JobConf job = new JobConf(mrCluster.getConfig());
job.setJobName("Test");
JobClient client = new JobClient(job);
RunningJob submittedJob = null;
@@ -134,43 +185,53 @@ public class TestMRIntermediateDataEncryption {
job.setMapOutputValueClass(Text.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
- job.setMapperClass(MyMapper.class);
- job.setPartitionerClass(MyPartitioner.class);
+ job.setMapperClass(TestMRIntermediateDataEncryption.MyMapper.class);
+ job.setPartitionerClass(
+ TestMRIntermediateDataEncryption.MyPartitioner.class);
job.setOutputFormat(TextOutputFormat.class);
job.setNumReduceTasks(numReducers);
-
job.setInt("mapreduce.map.maxattempts", 1);
job.setInt("mapreduce.reduce.maxattempts", 1);
- job.setInt("mapred.test.num_lines", numLines);
- if (isUber) {
- job.setBoolean("mapreduce.job.ubertask.enable", true);
- }
+ job.setInt("mapred.test.num_lines", NUM_LINES);
+ job.setBoolean(MRJobConfig.JOB_UBERTASK_ENABLE, isUber);
job.setBoolean(MRJobConfig.MR_ENCRYPTED_INTERMEDIATE_DATA, true);
- try {
- submittedJob = client.submitJob(job);
- try {
- if (! client.monitorAndPrintJob(job, submittedJob)) {
- throw new IOException("Job failed!");
- }
- } catch(InterruptedException ie) {
- Thread.currentThread().interrupt();
+ submittedJob = client.submitJob(job);
+ submittedJob.waitForCompletion();
+ assertTrue("The submitted job is completed", submittedJob.isComplete());
+ assertTrue("The submitted job is successful", submittedJob.isSuccessful());
+ verifyOutput(fs, numMappers, NUM_LINES);
+ client.close();
+ // wait for short period to cool down.
+ Thread.sleep(1000);
+ }
+
+ private void createInput(FileSystem filesystem, int mappers, int numLines)
+ throws Exception {
+ for (int i = 0; i < mappers; i++) {
+ OutputStream os =
+ filesystem.create(new Path(INPUT_DIR, "input_" + i + ".txt"));
+ Writer writer = new OutputStreamWriter(os);
+ for (int j = 0; j < numLines; j++) {
+ // Create sorted key, value pairs.
+ int k = j + 1;
+ String formattedNumber = String.format("%09d", k);
+ writer.write(formattedNumber + " " + formattedNumber + "\n");
}
- } catch(IOException ioe) {
- System.err.println("Job failed with: " + ioe);
- } finally {
- verifyOutput(submittedJob, fileSystem, numMappers, numLines);
+ writer.close();
+ os.close();
}
}
- private void verifyOutput(RunningJob submittedJob, FileSystem fileSystem, int numMappers, int numLines)
- throws Exception {
+ private void verifyOutput(FileSystem fileSystem,
+ int mappers, int numLines)
+ throws Exception {
FSDataInputStream dis = null;
long numValidRecords = 0;
long numInvalidRecords = 0;
String prevKeyValue = "000000000";
Path[] fileList =
- FileUtil.stat2Paths(fileSystem.listStatus(OUTPUT,
- new Utils.OutputFileUtils.OutputFilesFilter()));
+ FileUtil.stat2Paths(fileSystem.listStatus(OUTPUT,
+ new Utils.OutputFileUtils.OutputFilesFilter()));
for (Path outFile : fileList) {
try {
dis = fileSystem.open(outFile);
@@ -197,7 +258,7 @@ public class TestMRIntermediateDataEncryption {
}
}
// Make sure we got all input records in the output in sorted order.
- assertEquals((long)(numMappers * numLines), numValidRecords);
+ assertEquals((long)(mappers * numLines), numValidRecords);
// Make sure there is no extraneous invalid record.
assertEquals(0, numInvalidRecords);
}
@@ -207,29 +268,29 @@ public class TestMRIntermediateDataEncryption {
* in displayable form.
*/
public static class MyMapper extends MapReduceBase
- implements Mapper<LongWritable, Text, Text, Text> {
- private Text keyText;
- private Text valueText;
+ implements Mapper<LongWritable, Text, Text, Text> {
+ private Text keyText;
+ private Text valueText;
- public MyMapper() {
- keyText = new Text();
- valueText = new Text();
- }
+ public MyMapper() {
+ keyText = new Text();
+ valueText = new Text();
+ }
- @Override
- public void map(LongWritable key, Text value,
- OutputCollector<Text, Text> output,
- Reporter reporter) throws IOException {
- String record = value.toString();
- int blankPos = record.indexOf(" ");
- keyText.set(record.substring(0, blankPos));
- valueText.set(record.substring(blankPos+1));
- output.collect(keyText, valueText);
- }
+ @Override
+ public void map(LongWritable key, Text value,
+ OutputCollector<Text, Text> output,
+ Reporter reporter) throws IOException {
+ String record = value.toString();
+ int blankPos = record.indexOf(" ");
+ keyText.set(record.substring(0, blankPos));
+ valueText.set(record.substring(blankPos + 1));
+ output.collect(keyText, valueText);
+ }
- public void close() throws IOException {
- }
+ public void close() throws IOException {
}
+ }
/**
* Partitioner implementation to make sure that output is in total sorted
@@ -255,12 +316,12 @@ public class TestMRIntermediateDataEncryption {
int keyValue = 0;
try {
keyValue = Integer.parseInt(key.toString());
- } catch(NumberFormatException nfe) {
+ } catch (NumberFormatException nfe) {
keyValue = 0;
}
- int partitionNumber = (numPartitions*(Math.max(0, keyValue-1)))/job.getInt("mapred.test.num_lines", 10000);
+ int partitionNumber = (numPartitions * (Math.max(0, keyValue - 1))) / job
+ .getInt("mapred.test.num_lines", 10000);
return partitionNumber;
}
}
-
}
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org