You are viewing a plain text version of this content. The hyperlink to the canonical version was not preserved in this plain-text copy.
Posted to common-commits@hadoop.apache.org by cn...@apache.org on 2013/07/22 20:31:45 UTC
svn commit: r1505766 - in /hadoop/common/branches/branch-1-win:
CHANGES.branch-1-win.txt src/mapred/org/apache/hadoop/mapred/TaskLog.java
src/test/org/apache/hadoop/mapred/TestRecoveryManager.java
Author: cnauroth
Date: Mon Jul 22 18:31:45 2013
New Revision: 1505766
URL: http://svn.apache.org/r1505766
Log:
MAPREDUCE-5405. Merging change r1505760 from branch-1 to branch-1-win.
Modified:
hadoop/common/branches/branch-1-win/CHANGES.branch-1-win.txt
hadoop/common/branches/branch-1-win/src/mapred/org/apache/hadoop/mapred/TaskLog.java
hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/mapred/TestRecoveryManager.java
Modified: hadoop/common/branches/branch-1-win/CHANGES.branch-1-win.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1-win/CHANGES.branch-1-win.txt?rev=1505766&r1=1505765&r2=1505766&view=diff
==============================================================================
--- hadoop/common/branches/branch-1-win/CHANGES.branch-1-win.txt (original)
+++ hadoop/common/branches/branch-1-win/CHANGES.branch-1-win.txt Mon Jul 22 18:31:45 2013
@@ -463,3 +463,6 @@ Branch-hadoop-1-win (branched from branc
HDFS-3794. WebHDFS Open used with Offset returns the original (and incorrect)
Content Length in the HTTP Header. (Tsz Wo (Nicholas), SZE via cnauroth)
+
+ MAPREDUCE-5405. Job recovery can fail if task log directory symlink from
+ prior run still exists. (cnauroth)
Modified: hadoop/common/branches/branch-1-win/src/mapred/org/apache/hadoop/mapred/TaskLog.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1-win/src/mapred/org/apache/hadoop/mapred/TaskLog.java?rev=1505766&r1=1505765&r2=1505766&view=diff
==============================================================================
--- hadoop/common/branches/branch-1-win/src/mapred/org/apache/hadoop/mapred/TaskLog.java (original)
+++ hadoop/common/branches/branch-1-win/src/mapred/org/apache/hadoop/mapred/TaskLog.java Mon Jul 22 18:31:45 2013
@@ -115,6 +115,16 @@ public class TaskLog {
String strLinkAttemptLogDir =
getJobDir(taskID.getJobID()).getAbsolutePath() + File.separatorChar +
taskID.toString() + cleanupSuffix;
+ // If the job is recovered, then the symlink might still exist from the prior
+ // run. Symlink creation fails if it already exists, so attempt to delete
+ // first.
+ File linkAttemptLogDir = new File(strLinkAttemptLogDir);
+ if (linkAttemptLogDir.exists()) {
+ if (!linkAttemptLogDir.delete()) {
+ LOG.warn("Failed to delete existing file at path " +
+ strLinkAttemptLogDir);
+ }
+ }
if (FileUtil.symLink(strAttemptLogDir, strLinkAttemptLogDir) != 0) {
throw new IOException("Creation of symlink from " +
strLinkAttemptLogDir + " to " + strAttemptLogDir +
Modified: hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/mapred/TestRecoveryManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/mapred/TestRecoveryManager.java?rev=1505766&r1=1505765&r2=1505766&view=diff
==============================================================================
--- hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/mapred/TestRecoveryManager.java (original)
+++ hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/mapred/TestRecoveryManager.java Mon Jul 22 18:31:45 2013
@@ -52,6 +52,9 @@ public class TestRecoveryManager {
private static final Path TEST_DIR =
new Path(System.getProperty("test.build.data", "/tmp"),
"test-recovery-manager");
+ private static final long AWAIT_JOB_CLEANUP_MAX_WAIT_MILLISECONDS = 5000;
+ private static final long AWAIT_JOB_CLEANUP_POLL_PERIOD_MILLISECONDS = 100;
+ private static final long NANOSECONDS_PER_MILLISECOND = 1000000;
private FileSystem fs;
private MiniDFSCluster dfs;
@@ -174,7 +177,7 @@ public class TestRecoveryManager {
while (!jip.isComplete()) {
LOG.info("Waiting for job " + rJob2.getID() + " to be successful");
// Signaling Map task to complete
- fs.create(new Path(TEST_DIR, "signal"));
+ fs.create(new Path(TEST_DIR, "signal")).close();
UtilsForTests.waitFor(100);
}
Assert.assertTrue("Job should be successful", rJob2.isSuccessful());
@@ -253,12 +256,12 @@ public class TestRecoveryManager {
while (!jip.isComplete()) {
LOG.info("Waiting for job " + rJob1.getID() + " to be successful");
// Signaling Map task to complete
- fs.create(new Path(TEST_DIR, "signal"));
+ fs.create(new Path(TEST_DIR, "signal")).close();
UtilsForTests.waitFor(100);
}
Assert.assertTrue("Task should be successful", rJob1.isSuccessful());
- Assert.assertTrue("Job should be cleaned up", !fs.exists(new Path(job1.get("mapreduce.job.dir"))));
- Assert.assertTrue("Job should be cleaned up", !fs.exists(new Path(job2.get("mapreduce.job.dir"))));
+ Assert.assertTrue("Job should be cleaned up", awaitJobCleanup(job1));
+ Assert.assertTrue("Job should be cleaned up", awaitJobCleanup(job2));
}
public static class TestJobTrackerInstrumentation extends JobTrackerInstrumentation {
@@ -356,7 +359,7 @@ public class TestRecoveryManager {
job1.setJobPriority(JobPriority.HIGH);
UtilsForTests.configureWaitingJobConf(job1,
- new Path(TEST_DIR, "input"), new Path(TEST_DIR, "output4"), 30, 0,
+ new Path(TEST_DIR, "input"), new Path(TEST_DIR, "output5"), 30, 0,
"test-recovery-manager", signalFile, signalFile);
// submit the faulty job
@@ -374,7 +377,7 @@ public class TestRecoveryManager {
String signalFile1 = new Path(TEST_DIR, "signal1").toString();
UtilsForTests.configureWaitingJobConf(job2,
- new Path(TEST_DIR, "input"), new Path(TEST_DIR, "output5"), 20, 0,
+ new Path(TEST_DIR, "input"), new Path(TEST_DIR, "output6"), 20, 0,
"test-recovery-manager", signalFile1, signalFile1);
// submit the job
@@ -395,7 +398,7 @@ public class TestRecoveryManager {
UserGroupInformation.createUserForTesting("abc", new String[]{"users"});
UtilsForTests.configureWaitingJobConf(job3,
- new Path(TEST_DIR, "input"), new Path(TEST_DIR, "output6"), 1, 0,
+ new Path(TEST_DIR, "input"), new Path(TEST_DIR, "output7"), 1, 0,
"test-recovery-manager", signalFile, signalFile);
// submit the job
@@ -453,7 +456,7 @@ public class TestRecoveryManager {
while (!jip.isComplete()) {
LOG.info("Waiting for job " + rJob2.getID() + " to be successful");
// Signaling Map task to complete
- fs.create(new Path(TEST_DIR, "signal1"));
+ fs.create(new Path(TEST_DIR, "signal1")).close();
UtilsForTests.waitFor(100);
}
Assert.assertTrue("Job should be successful", rJob2.isSuccessful());
@@ -488,7 +491,7 @@ public class TestRecoveryManager {
job1.setJobPriority(JobPriority.HIGH);
UtilsForTests.configureWaitingJobConf(job1, new Path(TEST_DIR, "input"),
- new Path(TEST_DIR, "output7"), 30, 0, "test-restart", signalFile,
+ new Path(TEST_DIR, "output8"), 30, 0, "test-restart", signalFile,
signalFile);
// submit the faulty job
@@ -528,7 +531,7 @@ public class TestRecoveryManager {
JobConf job2 = mr.createJobConf();
UtilsForTests.configureWaitingJobConf(job2, new Path(TEST_DIR, "input"),
- new Path(TEST_DIR, "output8"), 50, 0, "test-restart-manager",
+ new Path(TEST_DIR, "output9"), 50, 0, "test-restart-manager",
signalFile, signalFile);
// submit a new job
@@ -658,7 +661,7 @@ public class TestRecoveryManager {
final JobConf job1 = mr.createJobConf();
UtilsForTests.configureWaitingJobConf(job1, new Path(HDFS_TEST_DIR, "input"),
- new Path(HDFS_TEST_DIR, "output3"), 2, 0, "test-resubmission", signalFile,
+ new Path(HDFS_TEST_DIR, "output10"), 2, 0, "test-resubmission", signalFile,
signalFile);
UserGroupInformation ugi =
@@ -718,7 +721,7 @@ public class TestRecoveryManager {
JobInProgress jip = jobtracker.getJob(rJob1.getID());
// Signaling Map task to complete
- fs.create(new Path(HDFS_TEST_DIR, "signal"));
+ fs.create(new Path(HDFS_TEST_DIR, "signal")).close();
while (!jip.isComplete()) {
LOG.info("Waiting for job " + rJob1.getID() + " to be successful");
UtilsForTests.waitFor(100);
@@ -764,7 +767,7 @@ public class TestRecoveryManager {
final JobConf job1 = mr.createJobConf();
UtilsForTests.configureWaitingJobConf(job1, new Path(HDFS_TEST_DIR, "input"),
- new Path(HDFS_TEST_DIR, "output3"), 2, 0, "test-resubmission", signalFile,
+ new Path(HDFS_TEST_DIR, "output11"), 2, 0, "test-resubmission", signalFile,
signalFile);
UserGroupInformation ugi =
@@ -805,7 +808,7 @@ public class TestRecoveryManager {
JobInProgress jip = jobtracker.getJob(rJob1.getID());
// Signaling Map task to complete
- fs.create(new Path(HDFS_TEST_DIR, "signal"));
+ fs.create(new Path(HDFS_TEST_DIR, "signal")).close();
while (!jip.isComplete()) {
LOG.info("Waiting for job " + rJob1.getID() + " to be successful");
UtilsForTests.waitFor(100);
@@ -813,4 +816,31 @@ public class TestRecoveryManager {
rJob1 = jc.getJob(rJob1.getID());
Assert.assertTrue("Task should be successful", rJob1.isSuccessful());
}
+
+ /**
+ * Awaits completion of job cleanup, which runs asynchronously on a background
+ * thread. Job cleanup removes the job directory. This method periodically
+ * polls for the existence of that directory, waiting up to a maximum time for
+ * the cleanup to complete.
+ *
+ * @param jobConf JobConf to inspect
+ * @return boolean true if job cleanup detected, or false if job cleanup was
+ * not detected within the maximum wait time
+ * @throws IOException if there is an I/O error checking the directory
+ * @throws InterruptedException if thread interrupted while sleeping
+ */
+ private boolean awaitJobCleanup(JobConf jobConf) throws IOException,
+ InterruptedException {
+ Path jobDir = new Path(jobConf.get("mapreduce.job.dir"));
+ long start = System.nanoTime() / NANOSECONDS_PER_MILLISECOND;
+ long elapsed = 0;
+ do {
+ if (!fs.exists(jobDir)) {
+ return true;
+ }
+ Thread.sleep(AWAIT_JOB_CLEANUP_POLL_PERIOD_MILLISECONDS);
+ elapsed = (System.nanoTime() / NANOSECONDS_PER_MILLISECOND) - start;
+ } while (elapsed < AWAIT_JOB_CLEANUP_MAX_WAIT_MILLISECONDS);
+ return false;
+ }
}