Posted to common-commits@hadoop.apache.org by cn...@apache.org on 2013/07/22 20:21:23 UTC

svn commit: r1505760 - in /hadoop/common/branches/branch-1: CHANGES.txt src/mapred/org/apache/hadoop/mapred/TaskLog.java src/test/org/apache/hadoop/mapred/TestRecoveryManager.java

Author: cnauroth
Date: Mon Jul 22 18:21:22 2013
New Revision: 1505760

URL: http://svn.apache.org/r1505760
Log:
MAPREDUCE-5405. Job recovery can fail if task log directory symlink from prior run still exists. Contributed by Chris Nauroth.
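
The failure mode is easy to reproduce outside Hadoop: a symlink cannot be created at a path that already exists, which is exactly what a recovered job finds when the link from its prior run was never removed. A minimal sketch of that behavior, illustrative only; it uses java.nio.file, whereas branch-1's TaskLog shells out through FileUtil.symLink and sees the same failure as a non-zero exit code:

    // Illustrative sketch, not part of the patch.
    import java.io.IOException;
    import java.nio.file.FileAlreadyExistsException;
    import java.nio.file.Files;
    import java.nio.file.Path;

    public class SymlinkCollisionSketch {
      public static void main(String[] args) throws IOException {
        Path target = Files.createTempDirectory("attempt-log-dir");
        Path link = target.getParent().resolve("attempt-link");
        Files.createSymbolicLink(link, target);     // first run: link created
        try {
          Files.createSymbolicLink(link, target);   // "recovered" run: link still there
        } catch (FileAlreadyExistsException e) {
          System.out.println("Second creation fails: " + e);
        }
      }
    }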

Modified:
    hadoop/common/branches/branch-1/CHANGES.txt
    hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapred/TaskLog.java
    hadoop/common/branches/branch-1/src/test/org/apache/hadoop/mapred/TestRecoveryManager.java

Modified: hadoop/common/branches/branch-1/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/CHANGES.txt?rev=1505760&r1=1505759&r2=1505760&view=diff
==============================================================================
--- hadoop/common/branches/branch-1/CHANGES.txt (original)
+++ hadoop/common/branches/branch-1/CHANGES.txt Mon Jul 22 18:21:22 2013
@@ -92,6 +92,9 @@ Release 1.3.0 - unreleased
     HDFS-3794. WebHDFS Open used with Offset returns the original (and incorrect)
     Content Length in the HTTP Header. (Tsz Wo (Nicholas), SZE via cnauroth)
 
+    MAPREDUCE-5405. Job recovery can fail if task log directory symlink from
+    prior run still exists. (cnauroth)
+
 Release 1.2.1 - 2013.07.06
 
   INCOMPATIBLE CHANGES

Modified: hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapred/TaskLog.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapred/TaskLog.java?rev=1505760&r1=1505759&r2=1505760&view=diff
==============================================================================
--- hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapred/TaskLog.java (original)
+++ hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapred/TaskLog.java Mon Jul 22 18:21:22 2013
@@ -106,6 +106,16 @@ public class TaskLog {
     String strLinkAttemptLogDir = 
         getJobDir(taskID.getJobID()).getAbsolutePath() + File.separatorChar + 
         taskID.toString() + cleanupSuffix;
+    // If the job is recovered, then the symlink might still exist from the prior
+    // run.  Symlink creation fails if it already exists, so attempt to delete
+    // first.
+    File linkAttemptLogDir = new File(strLinkAttemptLogDir);
+    if (linkAttemptLogDir.exists()) {
+      if (!linkAttemptLogDir.delete()) {
+        LOG.warn("Failed to delete existing file at path " +
+          strLinkAttemptLogDir);
+      }
+    }
     if (FileUtil.symLink(strAttemptLogDir, strLinkAttemptLogDir) != 0) {
       throw new IOException("Creation of symlink from " + 
                             strLinkAttemptLogDir + " to " + strAttemptLogDir +
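
The hunk above makes the link creation idempotent: delete any leftover link first (best effort, logging a warning rather than failing), then create it as before. A self-contained sketch of that pattern with hypothetical names; the real code lives in TaskLog and derives both paths from the task attempt ID:

    // Sketch of the delete-then-create pattern from the patch; names are hypothetical.
    import java.io.File;
    import java.io.IOException;
    import org.apache.hadoop.fs.FileUtil;

    public class IdempotentSymlinkSketch {
      static void relink(String attemptLogDir, String linkPath) throws IOException {
        File link = new File(linkPath);
        // A link left behind by the job's prior run would make creation fail,
        // so remove it first if present; a failed delete is only logged.
        if (link.exists() && !link.delete()) {
          System.err.println("Failed to delete existing file at path " + linkPath);
        }
        // Non-zero return means the link could not be created (same check as the patch).
        if (FileUtil.symLink(attemptLogDir, linkPath) != 0) {
          throw new IOException("Creation of symlink from " + linkPath + " to " +
              attemptLogDir + " failed.");
        }
      }
    }

Treating the delete as best effort keeps behavior unchanged when no stale link exists, while the subsequent symLink call still surfaces a hard failure.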

Modified: hadoop/common/branches/branch-1/src/test/org/apache/hadoop/mapred/TestRecoveryManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/test/org/apache/hadoop/mapred/TestRecoveryManager.java?rev=1505760&r1=1505759&r2=1505760&view=diff
==============================================================================
--- hadoop/common/branches/branch-1/src/test/org/apache/hadoop/mapred/TestRecoveryManager.java (original)
+++ hadoop/common/branches/branch-1/src/test/org/apache/hadoop/mapred/TestRecoveryManager.java Mon Jul 22 18:21:22 2013
@@ -52,6 +52,9 @@ public class TestRecoveryManager {
   private static final Path TEST_DIR = 
     new Path(System.getProperty("test.build.data", "/tmp"), 
              "test-recovery-manager");
+  private static final long AWAIT_JOB_CLEANUP_MAX_WAIT_MILLISECONDS = 5000;
+  private static final long AWAIT_JOB_CLEANUP_POLL_PERIOD_MILLISECONDS = 100;
+  private static final long NANOSECONDS_PER_MILLISECOND = 1000000;
   
   private FileSystem fs;
   private MiniDFSCluster dfs;
@@ -174,7 +177,7 @@ public class TestRecoveryManager {
     while (!jip.isComplete()) {
       LOG.info("Waiting for job " + rJob2.getID() + " to be successful");
       // Signaling Map task to complete
-      fs.create(new Path(TEST_DIR, "signal"));
+      fs.create(new Path(TEST_DIR, "signal")).close();
       UtilsForTests.waitFor(100);
     }
     Assert.assertTrue("Job should be successful", rJob2.isSuccessful());
@@ -253,12 +256,12 @@ public class TestRecoveryManager {
     while (!jip.isComplete()) {
       LOG.info("Waiting for job " + rJob1.getID() + " to be successful");
       // Signaling Map task to complete
-      fs.create(new Path(TEST_DIR, "signal"));
+      fs.create(new Path(TEST_DIR, "signal")).close();
       UtilsForTests.waitFor(100);
     }
     Assert.assertTrue("Task should be successful", rJob1.isSuccessful());
-    Assert.assertTrue("Job should be cleaned up", !fs.exists(new Path(job1.get("mapreduce.job.dir"))));
-    Assert.assertTrue("Job should be cleaned up", !fs.exists(new Path(job2.get("mapreduce.job.dir"))));
+    Assert.assertTrue("Job should be cleaned up", awaitJobCleanup(job1));
+    Assert.assertTrue("Job should be cleaned up", awaitJobCleanup(job2));
   }
 
   public static class TestJobTrackerInstrumentation extends JobTrackerInstrumentation {
@@ -356,7 +359,7 @@ public class TestRecoveryManager {
     job1.setJobPriority(JobPriority.HIGH);
     
     UtilsForTests.configureWaitingJobConf(job1, 
-        new Path(TEST_DIR, "input"), new Path(TEST_DIR, "output4"), 30, 0,
+        new Path(TEST_DIR, "input"), new Path(TEST_DIR, "output5"), 30, 0,
         "test-recovery-manager", signalFile, signalFile);
     
     // submit the faulty job
@@ -374,7 +377,7 @@ public class TestRecoveryManager {
 
     String signalFile1 = new Path(TEST_DIR, "signal1").toString();
     UtilsForTests.configureWaitingJobConf(job2, 
-        new Path(TEST_DIR, "input"), new Path(TEST_DIR, "output5"), 20, 0,
+        new Path(TEST_DIR, "input"), new Path(TEST_DIR, "output6"), 20, 0,
         "test-recovery-manager", signalFile1, signalFile1);
     
     // submit the job
@@ -395,7 +398,7 @@ public class TestRecoveryManager {
       UserGroupInformation.createUserForTesting("abc", new String[]{"users"});
     
     UtilsForTests.configureWaitingJobConf(job3, 
-        new Path(TEST_DIR, "input"), new Path(TEST_DIR, "output6"), 1, 0,
+        new Path(TEST_DIR, "input"), new Path(TEST_DIR, "output7"), 1, 0,
         "test-recovery-manager", signalFile, signalFile);
     
     // submit the job
@@ -453,7 +456,7 @@ public class TestRecoveryManager {
     while (!jip.isComplete()) {
       LOG.info("Waiting for job " + rJob2.getID() + " to be successful");
       // Signaling Map task to complete
-      fs.create(new Path(TEST_DIR, "signal1"));
+      fs.create(new Path(TEST_DIR, "signal1")).close();
       UtilsForTests.waitFor(100);
     }
     Assert.assertTrue("Job should be successful", rJob2.isSuccessful());
@@ -488,7 +491,7 @@ public class TestRecoveryManager {
     job1.setJobPriority(JobPriority.HIGH);
 
     UtilsForTests.configureWaitingJobConf(job1, new Path(TEST_DIR, "input"),
-        new Path(TEST_DIR, "output7"), 30, 0, "test-restart", signalFile,
+        new Path(TEST_DIR, "output8"), 30, 0, "test-restart", signalFile,
         signalFile);
 
     // submit the faulty job
@@ -528,7 +531,7 @@ public class TestRecoveryManager {
     JobConf job2 = mr.createJobConf();
 
     UtilsForTests.configureWaitingJobConf(job2, new Path(TEST_DIR, "input"),
-        new Path(TEST_DIR, "output8"), 50, 0, "test-restart-manager",
+        new Path(TEST_DIR, "output9"), 50, 0, "test-restart-manager",
         signalFile, signalFile);
 
     // submit a new job
@@ -658,7 +661,7 @@ public class TestRecoveryManager {
 
     final JobConf job1 = mr.createJobConf();
     UtilsForTests.configureWaitingJobConf(job1, new Path(HDFS_TEST_DIR, "input"),
-        new Path(HDFS_TEST_DIR, "output3"), 2, 0, "test-resubmission", signalFile,
+        new Path(HDFS_TEST_DIR, "output10"), 2, 0, "test-resubmission", signalFile,
         signalFile);
 
     UserGroupInformation ugi =
@@ -718,7 +721,7 @@ public class TestRecoveryManager {
     JobInProgress jip = jobtracker.getJob(rJob1.getID());
 
     // Signaling Map task to complete
-    fs.create(new Path(HDFS_TEST_DIR, "signal"));
+    fs.create(new Path(HDFS_TEST_DIR, "signal")).close();
     while (!jip.isComplete()) {
       LOG.info("Waiting for job " + rJob1.getID() + " to be successful");
       UtilsForTests.waitFor(100);
@@ -764,7 +767,7 @@ public class TestRecoveryManager {
 
     final JobConf job1 = mr.createJobConf();
     UtilsForTests.configureWaitingJobConf(job1, new Path(HDFS_TEST_DIR, "input"),
-        new Path(HDFS_TEST_DIR, "output3"), 2, 0, "test-resubmission", signalFile,
+        new Path(HDFS_TEST_DIR, "output11"), 2, 0, "test-resubmission", signalFile,
         signalFile);
 
     UserGroupInformation ugi =
@@ -805,7 +808,7 @@ public class TestRecoveryManager {
     JobInProgress jip = jobtracker.getJob(rJob1.getID());
 
     // Signaling Map task to complete
-    fs.create(new Path(HDFS_TEST_DIR, "signal"));
+    fs.create(new Path(HDFS_TEST_DIR, "signal")).close();
     while (!jip.isComplete()) {
       LOG.info("Waiting for job " + rJob1.getID() + " to be successful");
       UtilsForTests.waitFor(100);
@@ -813,4 +816,31 @@ public class TestRecoveryManager {
     rJob1 = jc.getJob(rJob1.getID());
     Assert.assertTrue("Task should be successful", rJob1.isSuccessful());
   }
+
+  /**
+   * Awaits completion of job cleanup, which runs asynchronously on a background
+   * thread.  Job cleanup removes the job directory.  This method periodically
+   * polls for the existence of that directory, waiting up to a maximum time for
+   * the cleanup to complete.
+   * 
+   * @param jobConf JobConf to inspect
+   * @return boolean true if job cleanup detected, or false if job cleanup was
+   *   not detected within the maximum wait time
+   * @throws IOException if there is an I/O error checking the directory
+   * @throws InterruptedException if thread interrupted while sleeping
+   */
+  private boolean awaitJobCleanup(JobConf jobConf) throws IOException,
+      InterruptedException {
+    Path jobDir = new Path(jobConf.get("mapreduce.job.dir"));
+    long start = System.nanoTime() / NANOSECONDS_PER_MILLISECOND;
+    long elapsed = 0;
+    do {
+      if (!fs.exists(jobDir)) {
+        return true;
+      }
+      Thread.sleep(AWAIT_JOB_CLEANUP_POLL_PERIOD_MILLISECONDS);
+      elapsed = (System.nanoTime() / NANOSECONDS_PER_MILLISECOND) - start;
+    } while (elapsed < AWAIT_JOB_CLEANUP_MAX_WAIT_MILLISECONDS);
+    return false;
+  }
 }
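
The test changes above swap a point-in-time fs.exists assertion for a bounded poll, because, as the javadoc notes, job cleanup runs asynchronously and can lag slightly behind job completion. The same wait-for-condition shape, sketched generically; this helper is hypothetical and not part of the patch:

    // Generic form of the awaitJobCleanup loop: re-check a condition until it holds
    // or a deadline passes. Hypothetical helper, for illustration only.
    import java.util.concurrent.Callable;
    import java.util.concurrent.TimeUnit;

    public final class AwaitSketch {
      static boolean await(Callable<Boolean> condition, long maxWaitMillis,
          long pollMillis) throws Exception {
        long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(maxWaitMillis);
        do {
          if (condition.call()) {
            return true;              // e.g. the job directory is gone
          }
          Thread.sleep(pollMillis);   // pause before the next check
        } while (System.nanoTime() < deadline);
        return false;                 // timed out without seeing the condition
      }
    }

In TestRecoveryManager this is specialized to !fs.exists(jobDir) with a 5000 ms cap and a 100 ms poll period, per the constants added at the top of the class.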