You are viewing a plain text version of this content. The canonical link for it is here.
Posted to mapreduce-commits@hadoop.apache.org by ss...@apache.org on 2012/04/11 19:11:34 UTC
svn commit: r1324868 - in
/hadoop/common/branches/branch-2/hadoop-mapreduce-project: ./
hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/
hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/
Author: sseth
Date: Wed Apr 11 17:11:34 2012
New Revision: 1324868
URL: http://svn.apache.org/viewvc?rev=1324868&view=rev
Log:
Merge MAPREDUCE-4099 amendment from trunk. ApplicationMaster will remove the staging directory after the history service is stopped. (Contributed by Jason Lowe)
Modified:
hadoop/common/branches/branch-2/hadoop-mapreduce-project/CHANGES.txt
hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java
hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java
hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestMRApp.java
hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestStagingCleanup.java
Modified: hadoop/common/branches/branch-2/hadoop-mapreduce-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-mapreduce-project/CHANGES.txt?rev=1324868&r1=1324867&r2=1324868&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-mapreduce-project/CHANGES.txt (original)
+++ hadoop/common/branches/branch-2/hadoop-mapreduce-project/CHANGES.txt Wed Apr 11 17:11:34 2012
@@ -211,6 +211,9 @@ Release 0.23.3 - UNRELEASED
MAPREDUCE-4040. History links should use hostname rather than IP address.
(Bhallamudi Venkata Siva Kamesh via sseth)
+ MAPREDUCE-4099 amendment. ApplicationMaster will remove staging directory
+ after the history service is stopped. (Jason Lowe via sseth)
+
Release 0.23.2 - UNRELEASED
INCOMPATIBLE CHANGES
Modified: hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java?rev=1324868&r1=1324867&r2=1324868&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java (original)
+++ hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java Wed Apr 11 17:11:34 2012
@@ -285,6 +285,11 @@ public class MRAppMaster extends Composi
addIfService(containerLauncher);
dispatcher.register(ContainerLauncher.EventType.class, containerLauncher);
+ // Add the staging directory cleaner before the history server but after
+ // the container allocator so the staging directory is cleaned after
+ // the history has been flushed but before unregistering with the RM.
+ addService(createStagingDirCleaningService());
+
// Add the JobHistoryEventHandler last so that it is properly stopped first.
// This will guarantee that all history-events are flushed before AM goes
// ahead with shutdown.
@@ -406,13 +411,6 @@ public class MRAppMaster extends Composi
e.printStackTrace();
}
- // Cleanup staging directory
- try {
- cleanupStagingDir();
- } catch(IOException io) {
- LOG.warn("Failed to delete staging dir", io);
- }
-
try {
// Stop all services
// This will also send the final report to the ResourceManager
@@ -512,6 +510,10 @@ public class MRAppMaster extends Composi
return this.jobHistoryEventHandler;
}
+ protected AbstractService createStagingDirCleaningService() {
+ return new StagingDirCleaningService();
+ }
+
protected Speculator createSpeculator(Configuration conf, AppContext context) {
Class<? extends Speculator> speculatorClass;
@@ -710,6 +712,22 @@ public class MRAppMaster extends Composi
}
}
+ private final class StagingDirCleaningService extends AbstractService {
+ StagingDirCleaningService() {
+ super(StagingDirCleaningService.class.getName());
+ }
+
+ @Override
+ public synchronized void stop() {
+ try {
+ cleanupStagingDir();
+ } catch (IOException io) {
+ LOG.error("Failed to cleanup staging dir: ", io);
+ }
+ super.stop();
+ }
+ }
+
private class RunningAppContext implements AppContext {
private final Map<JobId, Job> jobs = new ConcurrentHashMap<JobId, Job>();
Modified: hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java?rev=1324868&r1=1324867&r2=1324868&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java (original)
+++ hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java Wed Apr 11 17:11:34 2012
@@ -428,9 +428,13 @@ public class MRApp extends MRAppMaster {
@Override
protected ContainerAllocator createContainerAllocator(
ClientService clientService, final AppContext context) {
- return new ContainerAllocator(){
- private int containerCount;
- @Override
+ return new MRAppContainerAllocator();
+ }
+
+ protected class MRAppContainerAllocator implements ContainerAllocator {
+ private int containerCount;
+
+ @Override
public void handle(ContainerAllocatorEvent event) {
ContainerId cId = recordFactory.newRecordInstance(ContainerId.class);
cId.setApplicationAttemptId(getContext().getApplicationAttemptId());
@@ -452,7 +456,6 @@ public class MRApp extends MRAppMaster {
new TaskAttemptContainerAssignedEvent(event.getAttemptID(),
container, null));
}
- };
}
@Override
Modified: hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestMRApp.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestMRApp.java?rev=1324868&r1=1324867&r2=1324868&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestMRApp.java (original)
+++ hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestMRApp.java Wed Apr 11 17:11:34 2012
@@ -18,11 +18,10 @@
package org.apache.hadoop.mapreduce.v2.app;
+import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.spy;
-import java.io.IOException;
import java.util.Iterator;
import junit.framework.Assert;
@@ -36,14 +35,11 @@ import org.apache.hadoop.mapreduce.v2.ap
import org.apache.hadoop.mapreduce.v2.app.job.Job;
import org.apache.hadoop.mapreduce.v2.app.job.Task;
import org.apache.hadoop.mapreduce.v2.app.job.TaskAttempt;
-import org.apache.hadoop.mapreduce.v2.app.job.event.JobFinishEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEventType;
import org.apache.hadoop.mapreduce.v2.app.job.impl.JobImpl;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.yarn.YarnException;
import org.junit.Test;
/**
@@ -237,71 +233,6 @@ public class TestMRApp {
}
}
- private final class MRAppTestCleanup extends MRApp {
- boolean hasStopped;
- boolean cleanedBeforeStopped;
-
- public MRAppTestCleanup(int maps, int reduces, boolean autoComplete,
- String testName, boolean cleanOnStart) {
- super(maps, reduces, autoComplete, testName, cleanOnStart);
- hasStopped = false;
- cleanedBeforeStopped = false;
- }
-
- @Override
- protected Job createJob(Configuration conf) {
- UserGroupInformation currentUser = null;
- try {
- currentUser = UserGroupInformation.getCurrentUser();
- } catch (IOException e) {
- throw new YarnException(e);
- }
- Job newJob = new TestJob(getJobId(), getAttemptID(), conf,
- getDispatcher().getEventHandler(),
- getTaskAttemptListener(), getContext().getClock(),
- getCommitter(), isNewApiCommitter(),
- currentUser.getUserName(), getContext());
- ((AppContext) getContext()).getAllJobs().put(newJob.getID(), newJob);
-
- getDispatcher().register(JobFinishEvent.Type.class,
- createJobFinishEventHandler());
-
- return newJob;
- }
-
- @Override
- public void cleanupStagingDir() throws IOException {
- cleanedBeforeStopped = !hasStopped;
- }
-
- @Override
- public synchronized void stop() {
- hasStopped = true;
- super.stop();
- }
-
- @Override
- protected void sysexit() {
- }
- }
-
- @Test
- public void testStagingCleanupOrder() throws Exception {
- MRAppTestCleanup app = new MRAppTestCleanup(1, 1, true,
- this.getClass().getName(), true);
- JobImpl job = (JobImpl)app.submit(new Configuration());
- app.waitForState(job, JobState.SUCCEEDED);
- app.verifyCompleted();
-
- int waitTime = 20 * 1000;
- while (waitTime > 0 && !app.cleanedBeforeStopped) {
- Thread.sleep(100);
- waitTime -= 100;
- }
- Assert.assertTrue("Staging directory not cleaned before notifying RM",
- app.cleanedBeforeStopped);
- }
-
public static void main(String[] args) throws Exception {
TestMRApp t = new TestMRApp();
t.testMapReduce();
@@ -310,6 +241,5 @@ public class TestMRApp {
t.testCompletedMapsForReduceSlowstart();
t.testJobError();
t.testCountersOnJobFinish();
- t.testStagingCleanupOrder();
}
}
Modified: hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestStagingCleanup.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestStagingCleanup.java?rev=1324868&r1=1324867&r2=1324868&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestStagingCleanup.java (original)
+++ hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestStagingCleanup.java Wed Apr 11 17:11:34 2012
@@ -26,6 +26,7 @@ import static org.mockito.Mockito.when;
import java.io.IOException;
+import junit.framework.Assert;
import junit.framework.TestCase;
import org.apache.commons.logging.Log;
@@ -35,12 +36,21 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.v2.api.records.JobId;
+import org.apache.hadoop.mapreduce.v2.api.records.JobState;
+import org.apache.hadoop.mapreduce.v2.app.client.ClientService;
+import org.apache.hadoop.mapreduce.v2.app.job.Job;
import org.apache.hadoop.mapreduce.v2.app.job.event.JobFinishEvent;
+import org.apache.hadoop.mapreduce.v2.app.job.impl.JobImpl;
+import org.apache.hadoop.mapreduce.v2.app.rm.ContainerAllocator;
+import org.apache.hadoop.mapreduce.v2.app.rm.ContainerAllocatorEvent;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.yarn.YarnException;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
+import org.apache.hadoop.yarn.service.AbstractService;
import org.apache.hadoop.yarn.util.BuilderUtils;
import org.junit.Test;
@@ -103,4 +113,89 @@ import org.junit.Test;
}
}
+ private final class MRAppTestCleanup extends MRApp {
+ boolean stoppedContainerAllocator;
+ boolean cleanedBeforeContainerAllocatorStopped;
+
+ public MRAppTestCleanup(int maps, int reduces, boolean autoComplete,
+ String testName, boolean cleanOnStart) {
+ super(maps, reduces, autoComplete, testName, cleanOnStart);
+ stoppedContainerAllocator = false;
+ cleanedBeforeContainerAllocatorStopped = false;
+ }
+
+ @Override
+ protected Job createJob(Configuration conf) {
+ UserGroupInformation currentUser = null;
+ try {
+ currentUser = UserGroupInformation.getCurrentUser();
+ } catch (IOException e) {
+ throw new YarnException(e);
+ }
+ Job newJob = new TestJob(getJobId(), getAttemptID(), conf,
+ getDispatcher().getEventHandler(),
+ getTaskAttemptListener(), getContext().getClock(),
+ getCommitter(), isNewApiCommitter(),
+ currentUser.getUserName(), getContext());
+ ((AppContext) getContext()).getAllJobs().put(newJob.getID(), newJob);
+
+ getDispatcher().register(JobFinishEvent.Type.class,
+ createJobFinishEventHandler());
+
+ return newJob;
+ }
+
+ @Override
+ protected ContainerAllocator createContainerAllocator(
+ ClientService clientService, AppContext context) {
+ return new TestCleanupContainerAllocator();
+ }
+
+ private class TestCleanupContainerAllocator extends AbstractService
+ implements ContainerAllocator {
+ private MRAppContainerAllocator allocator;
+
+ TestCleanupContainerAllocator() {
+ super(TestCleanupContainerAllocator.class.getName());
+ allocator = new MRAppContainerAllocator();
+ }
+
+ @Override
+ public void handle(ContainerAllocatorEvent event) {
+ allocator.handle(event);
+ }
+
+ @Override
+ public synchronized void stop() {
+ stoppedContainerAllocator = true;
+ super.stop();
+ }
+ }
+
+ @Override
+ public void cleanupStagingDir() throws IOException {
+ cleanedBeforeContainerAllocatorStopped = !stoppedContainerAllocator;
+ }
+
+ @Override
+ protected void sysexit() {
+ }
+ }
+
+ @Test
+ public void testStagingCleanupOrder() throws Exception {
+ MRAppTestCleanup app = new MRAppTestCleanup(1, 1, true,
+ this.getClass().getName(), true);
+ JobImpl job = (JobImpl)app.submit(new Configuration());
+ app.waitForState(job, JobState.SUCCEEDED);
+ app.verifyCompleted();
+
+ int waitTime = 20 * 1000;
+ while (waitTime > 0 && !app.cleanedBeforeContainerAllocatorStopped) {
+ Thread.sleep(100);
+ waitTime -= 100;
+ }
+ Assert.assertTrue("Staging directory not cleaned before notifying RM",
+ app.cleanedBeforeContainerAllocatorStopped);
+ }
}
\ No newline at end of file