You are viewing a plain text version of this content. The canonical link for it is here.
Posted to mapreduce-commits@hadoop.apache.org by vi...@apache.org on 2013/08/23 01:17:12 UTC
svn commit: r1516660 - in /hadoop/common/trunk/hadoop-mapreduce-project: ./
hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/
hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apac...
Author: vinodkv
Date: Thu Aug 22 23:17:12 2013
New Revision: 1516660
URL: http://svn.apache.org/r1516660
Log:
MAPREDUCE-5476. Changed MR AM recovery code to cleanup staging-directory only after unregistering from the RM. Contributed by Jian He.
Modified:
hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestStagingCleanup.java
Modified: hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt?rev=1516660&r1=1516659&r2=1516660&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt Thu Aug 22 23:17:12 2013
@@ -237,6 +237,9 @@ Release 2.1.1-beta - UNRELEASED
MAPREDUCE-5470. LocalJobRunner does not work on Windows. (Sandy Ryza via
cnauroth)
+ MAPREDUCE-5476. Changed MR AM recovery code to cleanup staging-directory
+ only after unregistering from the RM. (Jian He via vinodkv)
+
Release 2.1.0-beta - 2013-08-22
INCOMPATIBLE CHANGES
Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java?rev=1516660&r1=1516659&r2=1516660&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java Thu Aug 22 23:17:12 2013
@@ -325,18 +325,23 @@ public class MRAppMaster extends Composi
dispatcher.register(org.apache.hadoop.mapreduce.jobhistory.EventType.class,
eater);
}
-
+
+ if (copyHistory) {
+ // Now that there's a FINISHING state for application on RM to give AMs
+ // plenty of time to clean up after unregister, it's safe to clean staging
+ // directory after unregistering with RM. So, we start the staging-dir
+ // cleaner BEFORE the ContainerAllocator so that on shut-down (services
+ // stop in reverse order), ContainerAllocator unregisters first and then
+ // the staging-dir cleaner deletes the staging directory.
+ addService(createStagingDirCleaningService());
+ }
+
// service to allocate containers from RM (if non-uber) or to fake it (uber)
containerAllocator = createContainerAllocator(null, context);
addIfService(containerAllocator);
dispatcher.register(ContainerAllocator.EventType.class, containerAllocator);
if (copyHistory) {
- // Add the staging directory cleaner before the history server but after
- // the container allocator so the staging directory is cleaned after
- // the history has been flushed but before unregistering with the RM.
- addService(createStagingDirCleaningService());
-
// Add the JobHistoryEventHandler last so that it is properly stopped first.
// This will guarantee that all history-events are flushed before AM goes
// ahead with shutdown.
@@ -344,7 +349,6 @@ public class MRAppMaster extends Composi
// component creates a JobHistoryEvent in the meanwhile, it will just be
// queued inside the JobHistoryEventHandler
addIfService(historyService);
-
JobHistoryCopyService cpHist = new JobHistoryCopyService(appAttemptID,
dispatcher.getEventHandler());
@@ -396,6 +400,14 @@ public class MRAppMaster extends Composi
dispatcher.register(Speculator.EventType.class,
speculatorEventDispatcher);
+ // Now that there's a FINISHING state for application on RM to give AMs
+ // plenty of time to clean up after unregister, it's safe to clean staging
+ // directory after unregistering with RM. So, we start the staging-dir
+ // cleaner BEFORE the ContainerAllocator so that on shut-down (services
+ // stop in reverse order), ContainerAllocator unregisters first and then
+ // the staging-dir cleaner deletes the staging directory.
+ addService(createStagingDirCleaningService());
+
// service to allocate containers from RM (if non-uber) or to fake it (uber)
addIfService(containerAllocator);
dispatcher.register(ContainerAllocator.EventType.class, containerAllocator);
@@ -405,11 +417,6 @@ public class MRAppMaster extends Composi
addIfService(containerLauncher);
dispatcher.register(ContainerLauncher.EventType.class, containerLauncher);
- // Add the staging directory cleaner before the history server but after
- // the container allocator so the staging directory is cleaned after
- // the history has been flushed but before unregistering with the RM.
- addService(createStagingDirCleaningService());
-
// Add the JobHistoryEventHandler last so that it is properly stopped first.
// This will guarantee that all history-events are flushed before AM goes
// ahead with shutdown.
Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestStagingCleanup.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestStagingCleanup.java?rev=1516660&r1=1516659&r2=1516660&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestStagingCleanup.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestStagingCleanup.java Thu Aug 22 23:17:12 2013
@@ -36,6 +36,8 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.JobID;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.TypeConverter;
+import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEvent;
+import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEventHandler;
import org.apache.hadoop.mapreduce.v2.api.records.JobId;
import org.apache.hadoop.mapreduce.v2.api.records.JobState;
import org.apache.hadoop.mapreduce.v2.app.client.ClientService;
@@ -54,6 +56,7 @@ import org.apache.hadoop.service.Service
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
@@ -279,14 +282,17 @@ import org.junit.Test;
}
private final class MRAppTestCleanup extends MRApp {
- boolean stoppedContainerAllocator;
- boolean cleanedBeforeContainerAllocatorStopped;
-
+ int stagingDirCleanedup;
+ int ContainerAllocatorStopped;
+ int JobHistoryEventHandlerStopped;
+ int numStops;
public MRAppTestCleanup(int maps, int reduces, boolean autoComplete,
String testName, boolean cleanOnStart) {
super(maps, reduces, autoComplete, testName, cleanOnStart);
- stoppedContainerAllocator = false;
- cleanedBeforeContainerAllocatorStopped = false;
+ stagingDirCleanedup = 0;
+ ContainerAllocatorStopped = 0;
+ JobHistoryEventHandlerStopped = 0;
+ numStops = 0;
}
@Override
@@ -313,6 +319,26 @@ import org.junit.Test;
}
@Override
+ protected EventHandler<JobHistoryEvent> createJobHistoryHandler(
+ AppContext context) {
+ return new TestJobHistoryEventHandler(context, getStartCount());
+ }
+
+ private class TestJobHistoryEventHandler extends JobHistoryEventHandler {
+
+ public TestJobHistoryEventHandler(AppContext context, int startCount) {
+ super(context, startCount);
+ }
+
+ @Override
+ public void serviceStop() throws Exception {
+ numStops++;
+ JobHistoryEventHandlerStopped = numStops;
+ super.serviceStop();
+ }
+ }
+
+ @Override
protected ContainerAllocator createContainerAllocator(
ClientService clientService, AppContext context) {
return new TestCleanupContainerAllocator();
@@ -334,7 +360,8 @@ import org.junit.Test;
@Override
protected void serviceStop() throws Exception {
- stoppedContainerAllocator = true;
+ numStops++;
+ ContainerAllocatorStopped = numStops;
super.serviceStop();
}
}
@@ -346,7 +373,8 @@ import org.junit.Test;
@Override
public void cleanupStagingDir() throws IOException {
- cleanedBeforeContainerAllocatorStopped = !stoppedContainerAllocator;
+ numStops++;
+ stagingDirCleanedup = numStops;
}
@Override
@@ -377,11 +405,15 @@ import org.junit.Test;
app.verifyCompleted();
int waitTime = 20 * 1000;
- while (waitTime > 0 && !app.cleanedBeforeContainerAllocatorStopped) {
+ while (waitTime > 0 && app.numStops < 3 ) {
Thread.sleep(100);
waitTime -= 100;
}
- Assert.assertTrue("Staging directory not cleaned before notifying RM",
- app.cleanedBeforeContainerAllocatorStopped);
+
+ // assert the shutdown order: JobHistoryEventHandler stops first, then
+ // the ContainerAllocator, and finally the staging dir is cleaned up
+ Assert.assertEquals(1, app.JobHistoryEventHandlerStopped);
+ Assert.assertEquals(2, app.ContainerAllocatorStopped);
+ Assert.assertEquals(3, app.stagingDirCleanedup);
}
}