You are viewing a plain-text version of this content; the canonical hyperlink to the original message was not preserved in this rendering.
Posted to mapreduce-commits@hadoop.apache.org by vi...@apache.org on 2013/08/23 01:17:58 UTC

svn commit: r1516662 - in /hadoop/common/branches/branch-2/hadoop-mapreduce-project: ./ hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/ hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/

Author: vinodkv
Date: Thu Aug 22 23:17:57 2013
New Revision: 1516662

URL: http://svn.apache.org/r1516662
Log:
MAPREDUCE-5476. Changed MR AM recovery code to cleanup staging-directory only after unregistering from the RM. Contributed by Jian He.
svn merge --ignore-ancestry -c 1516660 ../../trunk/

Modified:
    hadoop/common/branches/branch-2/hadoop-mapreduce-project/CHANGES.txt
    hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java
    hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestStagingCleanup.java

Modified: hadoop/common/branches/branch-2/hadoop-mapreduce-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-mapreduce-project/CHANGES.txt?rev=1516662&r1=1516661&r2=1516662&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-mapreduce-project/CHANGES.txt (original)
+++ hadoop/common/branches/branch-2/hadoop-mapreduce-project/CHANGES.txt Thu Aug 22 23:17:57 2013
@@ -100,6 +100,9 @@ Release 2.1.1-beta - UNRELEASED
     MAPREDUCE-5470. LocalJobRunner does not work on Windows. (Sandy Ryza via
     cnauroth)
 
+    MAPREDUCE-5476. Changed MR AM recovery code to cleanup staging-directory
+    only after unregistering from the RM. (Jian He via vinodkv)
+
 Release 2.1.0-beta - 2013-08-22
 
   INCOMPATIBLE CHANGES

Modified: hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java?rev=1516662&r1=1516661&r2=1516662&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java (original)
+++ hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java Thu Aug 22 23:17:57 2013
@@ -325,18 +325,23 @@ public class MRAppMaster extends Composi
         dispatcher.register(org.apache.hadoop.mapreduce.jobhistory.EventType.class,
             eater);
       }
-      
+
+      if (copyHistory) {
+        // Now that there's a FINISHING state for application on RM to give AMs
+        // plenty of time to clean up after unregister it's safe to clean staging
+        // directory after unregistering with RM. So, we start the staging-dir
+        // cleaner BEFORE the ContainerAllocator so that on shut-down,
+        // ContainerAllocator unregisters first and then the staging-dir cleaner
+        // deletes staging directory.
+        addService(createStagingDirCleaningService());
+      }
+
       // service to allocate containers from RM (if non-uber) or to fake it (uber)
       containerAllocator = createContainerAllocator(null, context);
       addIfService(containerAllocator);
       dispatcher.register(ContainerAllocator.EventType.class, containerAllocator);
 
       if (copyHistory) {
-        // Add the staging directory cleaner before the history server but after
-        // the container allocator so the staging directory is cleaned after
-        // the history has been flushed but before unregistering with the RM.
-        addService(createStagingDirCleaningService());
-
         // Add the JobHistoryEventHandler last so that it is properly stopped first.
         // This will guarantee that all history-events are flushed before AM goes
         // ahead with shutdown.
@@ -344,7 +349,6 @@ public class MRAppMaster extends Composi
         // component creates a JobHistoryEvent in the meanwhile, it will be just be
         // queued inside the JobHistoryEventHandler 
         addIfService(historyService);
-        
 
         JobHistoryCopyService cpHist = new JobHistoryCopyService(appAttemptID,
             dispatcher.getEventHandler());
@@ -396,6 +400,14 @@ public class MRAppMaster extends Composi
       dispatcher.register(Speculator.EventType.class,
           speculatorEventDispatcher);
 
+      // Now that there's a FINISHING state for application on RM to give AMs
+      // plenty of time to clean up after unregister it's safe to clean staging
+      // directory after unregistering with RM. So, we start the staging-dir
+      // cleaner BEFORE the ContainerAllocator so that on shut-down,
+      // ContainerAllocator unregisters first and then the staging-dir cleaner
+      // deletes staging directory.
+      addService(createStagingDirCleaningService());
+
       // service to allocate containers from RM (if non-uber) or to fake it (uber)
       addIfService(containerAllocator);
       dispatcher.register(ContainerAllocator.EventType.class, containerAllocator);
@@ -405,11 +417,6 @@ public class MRAppMaster extends Composi
       addIfService(containerLauncher);
       dispatcher.register(ContainerLauncher.EventType.class, containerLauncher);
 
-      // Add the staging directory cleaner before the history server but after
-      // the container allocator so the staging directory is cleaned after
-      // the history has been flushed but before unregistering with the RM.
-      addService(createStagingDirCleaningService());
-
       // Add the JobHistoryEventHandler last so that it is properly stopped first.
       // This will guarantee that all history-events are flushed before AM goes
       // ahead with shutdown.

Modified: hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestStagingCleanup.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestStagingCleanup.java?rev=1516662&r1=1516661&r2=1516662&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestStagingCleanup.java (original)
+++ hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestStagingCleanup.java Thu Aug 22 23:17:57 2013
@@ -36,6 +36,8 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapreduce.JobID;
 import org.apache.hadoop.mapreduce.MRJobConfig;
 import org.apache.hadoop.mapreduce.TypeConverter;
+import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEvent;
+import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEventHandler;
 import org.apache.hadoop.mapreduce.v2.api.records.JobId;
 import org.apache.hadoop.mapreduce.v2.api.records.JobState;
 import org.apache.hadoop.mapreduce.v2.app.client.ClientService;
@@ -54,6 +56,7 @@ import org.apache.hadoop.service.Service
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
 import org.apache.hadoop.yarn.factories.RecordFactory;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
@@ -279,14 +282,17 @@ import org.junit.Test;
    }
 
   private final class MRAppTestCleanup extends MRApp {
-    boolean stoppedContainerAllocator;
-    boolean cleanedBeforeContainerAllocatorStopped;
-
+    int stagingDirCleanedup;
+    int ContainerAllocatorStopped;
+    int JobHistoryEventHandlerStopped;
+    int numStops;
     public MRAppTestCleanup(int maps, int reduces, boolean autoComplete,
         String testName, boolean cleanOnStart) {
       super(maps, reduces, autoComplete, testName, cleanOnStart);
-      stoppedContainerAllocator = false;
-      cleanedBeforeContainerAllocatorStopped = false;
+      stagingDirCleanedup = 0;
+      ContainerAllocatorStopped = 0;
+      JobHistoryEventHandlerStopped = 0;
+      numStops = 0;
     }
 
     @Override
@@ -313,6 +319,26 @@ import org.junit.Test;
     }
 
     @Override
+    protected EventHandler<JobHistoryEvent> createJobHistoryHandler(
+        AppContext context) {
+      return new TestJobHistoryEventHandler(context, getStartCount());
+    }
+
+    private class TestJobHistoryEventHandler extends JobHistoryEventHandler {
+
+      public TestJobHistoryEventHandler(AppContext context, int startCount) {
+        super(context, startCount);
+      }
+
+      @Override
+      public void serviceStop() throws Exception {
+        numStops++;
+        JobHistoryEventHandlerStopped = numStops;
+        super.serviceStop();
+      }
+    }
+
+    @Override
     protected ContainerAllocator createContainerAllocator(
         ClientService clientService, AppContext context) {
       return new TestCleanupContainerAllocator();
@@ -334,7 +360,8 @@ import org.junit.Test;
 
       @Override
       protected void serviceStop() throws Exception {
-        stoppedContainerAllocator = true;
+        numStops++;
+        ContainerAllocatorStopped = numStops;
         super.serviceStop();
       }
     }
@@ -346,7 +373,8 @@ import org.junit.Test;
 
     @Override
     public void cleanupStagingDir() throws IOException {
-      cleanedBeforeContainerAllocatorStopped = !stoppedContainerAllocator;
+      numStops++;
+      stagingDirCleanedup = numStops;
     }
 
     @Override
@@ -377,11 +405,15 @@ import org.junit.Test;
     app.verifyCompleted();
 
     int waitTime = 20 * 1000;
-    while (waitTime > 0 && !app.cleanedBeforeContainerAllocatorStopped) {
+    while (waitTime > 0 && app.numStops < 3 ) {
       Thread.sleep(100);
       waitTime -= 100;
     }
-    Assert.assertTrue("Staging directory not cleaned before notifying RM",
-        app.cleanedBeforeContainerAllocatorStopped);
+
+    // assert JobHistoryEventHandlerStopped first, then
+    // ContainerAllocatorStopped, and then stagingDirCleanedup
+    Assert.assertEquals(1, app.JobHistoryEventHandlerStopped);
+    Assert.assertEquals(2, app.ContainerAllocatorStopped);
+    Assert.assertEquals(3, app.stagingDirCleanedup);
   }
  }