You are viewing a plain text version of this content. The canonical link for it is here.
Posted to mapreduce-commits@hadoop.apache.org by ss...@apache.org on 2012/04/11 19:11:42 UTC

svn commit: r1324869 - in /hadoop/common/branches/branch-0.23/hadoop-mapreduce-project: ./ hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/ hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/tes...

Author: sseth
Date: Wed Apr 11 17:11:42 2012
New Revision: 1324869

URL: http://svn.apache.org/viewvc?rev=1324869&view=rev
Log:
merge MAPREDUCE-4099 amendment from trunk. ApplicationMaster will remove staging directory after the history service is stopped. (Contributed by Jason Lowe)

Modified:
    hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt
    hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java
    hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java
    hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestMRApp.java
    hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestStagingCleanup.java

Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt?rev=1324869&r1=1324868&r2=1324869&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt Wed Apr 11 17:11:42 2012
@@ -77,6 +77,9 @@ Release 0.23.3 - UNRELEASED
     MAPREDUCE-4040. History links should use hostname rather than IP address.
     (Bhallamudi Venkata Siva Kamesh via sseth)
 
+    MAPREDUCE-4099 amendment. ApplicationMaster will remove staging directory
+    after the history service is stopped. (Jason Lowe via sseth)
+
 Release 0.23.2 - UNRELEASED
 
   INCOMPATIBLE CHANGES

Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java?rev=1324869&r1=1324868&r2=1324869&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java Wed Apr 11 17:11:42 2012
@@ -285,6 +285,11 @@ public class MRAppMaster extends Composi
     addIfService(containerLauncher);
     dispatcher.register(ContainerLauncher.EventType.class, containerLauncher);
 
+    // Add the staging directory cleaner before the JobHistoryEventHandler but
+    // after the container allocator so the staging directory is cleaned after
+    // the history has been flushed but before unregistering with the RM.
+    addService(createStagingDirCleaningService());
+
     // Add the JobHistoryEventHandler last so that it is properly stopped first.
     // This will guarantee that all history-events are flushed before AM goes
     // ahead with shutdown.
@@ -406,13 +411,6 @@ public class MRAppMaster extends Composi
         e.printStackTrace();
       }
 
-      // Cleanup staging directory
-      try {
-        cleanupStagingDir();
-      } catch(IOException io) {
-        LOG.warn("Failed to delete staging dir", io);
-      }
-
       try {
         // Stop all services
         // This will also send the final report to the ResourceManager
@@ -512,6 +510,10 @@ public class MRAppMaster extends Composi
     return this.jobHistoryEventHandler;
   }
 
+  protected AbstractService createStagingDirCleaningService() {
+    return new StagingDirCleaningService();
+  }
+
   protected Speculator createSpeculator(Configuration conf, AppContext context) {
     Class<? extends Speculator> speculatorClass;
 
@@ -710,6 +712,22 @@ public class MRAppMaster extends Composi
     }
   }
 
+  private final class StagingDirCleaningService extends AbstractService {
+    StagingDirCleaningService() {
+      super(StagingDirCleaningService.class.getName());
+    }
+
+    @Override
+    public synchronized void stop() {
+      try {
+        cleanupStagingDir();
+      } catch (IOException io) {
+        LOG.error("Failed to cleanup staging dir: ", io);
+      }
+      super.stop();
+    }
+  }
+
   private class RunningAppContext implements AppContext {
 
     private final Map<JobId, Job> jobs = new ConcurrentHashMap<JobId, Job>();

Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java?rev=1324869&r1=1324868&r2=1324869&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java Wed Apr 11 17:11:42 2012
@@ -428,9 +428,13 @@ public class MRApp extends MRAppMaster {
   @Override
   protected ContainerAllocator createContainerAllocator(
       ClientService clientService, final AppContext context) {
-    return new ContainerAllocator(){
-      private int containerCount;
-      @Override
+    return new MRAppContainerAllocator();
+  }
+
+  protected class MRAppContainerAllocator implements ContainerAllocator {
+    private int containerCount;
+
+     @Override
       public void handle(ContainerAllocatorEvent event) {
         ContainerId cId = recordFactory.newRecordInstance(ContainerId.class);
         cId.setApplicationAttemptId(getContext().getApplicationAttemptId());
@@ -452,7 +456,6 @@ public class MRApp extends MRAppMaster {
             new TaskAttemptContainerAssignedEvent(event.getAttemptID(),
                 container, null));
       }
-    };
   }
 
   @Override

Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestMRApp.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestMRApp.java?rev=1324869&r1=1324868&r2=1324869&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestMRApp.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestMRApp.java Wed Apr 11 17:11:42 2012
@@ -18,11 +18,10 @@
 
 package org.apache.hadoop.mapreduce.v2.app;
 
+import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.spy;
 
-import java.io.IOException;
 import java.util.Iterator;
 
 import junit.framework.Assert;
@@ -36,14 +35,11 @@ import org.apache.hadoop.mapreduce.v2.ap
 import org.apache.hadoop.mapreduce.v2.app.job.Job;
 import org.apache.hadoop.mapreduce.v2.app.job.Task;
 import org.apache.hadoop.mapreduce.v2.app.job.TaskAttempt;
-import org.apache.hadoop.mapreduce.v2.app.job.event.JobFinishEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEventType;
 import org.apache.hadoop.mapreduce.v2.app.job.impl.JobImpl;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.yarn.YarnException;
 import org.junit.Test;
 
 /**
@@ -237,71 +233,6 @@ public class TestMRApp {
     }
   }
 
-  private final class MRAppTestCleanup extends MRApp {
-    boolean hasStopped;
-    boolean cleanedBeforeStopped;
-
-    public MRAppTestCleanup(int maps, int reduces, boolean autoComplete,
-        String testName, boolean cleanOnStart) {
-      super(maps, reduces, autoComplete, testName, cleanOnStart);
-      hasStopped = false;
-      cleanedBeforeStopped = false;
-    }
-
-    @Override
-    protected Job createJob(Configuration conf) {
-      UserGroupInformation currentUser = null;
-      try {
-        currentUser = UserGroupInformation.getCurrentUser();
-      } catch (IOException e) {
-        throw new YarnException(e);
-      }
-      Job newJob = new TestJob(getJobId(), getAttemptID(), conf,
-          getDispatcher().getEventHandler(),
-          getTaskAttemptListener(), getContext().getClock(),
-          getCommitter(), isNewApiCommitter(),
-          currentUser.getUserName(), getContext());
-      ((AppContext) getContext()).getAllJobs().put(newJob.getID(), newJob);
-
-      getDispatcher().register(JobFinishEvent.Type.class,
-          createJobFinishEventHandler());
-
-      return newJob;
-    }
-
-    @Override
-    public void cleanupStagingDir() throws IOException {
-      cleanedBeforeStopped = !hasStopped;
-    }
-
-    @Override
-    public synchronized void stop() {
-      hasStopped = true;
-      super.stop();
-    }
-
-    @Override
-    protected void sysexit() {
-    }
-  }
-
-  @Test
-  public void testStagingCleanupOrder() throws Exception {
-    MRAppTestCleanup app = new MRAppTestCleanup(1, 1, true,
-        this.getClass().getName(), true);
-    JobImpl job = (JobImpl)app.submit(new Configuration());
-    app.waitForState(job, JobState.SUCCEEDED);
-    app.verifyCompleted();
-
-    int waitTime = 20 * 1000;
-    while (waitTime > 0 && !app.cleanedBeforeStopped) {
-      Thread.sleep(100);
-      waitTime -= 100;
-    }
-    Assert.assertTrue("Staging directory not cleaned before notifying RM",
-        app.cleanedBeforeStopped);
-  }
-
   public static void main(String[] args) throws Exception {
     TestMRApp t = new TestMRApp();
     t.testMapReduce();
@@ -310,6 +241,5 @@ public class TestMRApp {
     t.testCompletedMapsForReduceSlowstart();
     t.testJobError();
     t.testCountersOnJobFinish();
-    t.testStagingCleanupOrder();
   }
 }

Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestStagingCleanup.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestStagingCleanup.java?rev=1324869&r1=1324868&r2=1324869&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestStagingCleanup.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestStagingCleanup.java Wed Apr 11 17:11:42 2012
@@ -26,6 +26,7 @@ import static org.mockito.Mockito.when;
 
 import java.io.IOException;
 
+import junit.framework.Assert;
 import junit.framework.TestCase;
 
 import org.apache.commons.logging.Log;
@@ -35,12 +36,21 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapreduce.MRJobConfig;
 import org.apache.hadoop.mapreduce.v2.api.records.JobId;
+import org.apache.hadoop.mapreduce.v2.api.records.JobState;
+import org.apache.hadoop.mapreduce.v2.app.client.ClientService;
+import org.apache.hadoop.mapreduce.v2.app.job.Job;
 import org.apache.hadoop.mapreduce.v2.app.job.event.JobFinishEvent;
+import org.apache.hadoop.mapreduce.v2.app.job.impl.JobImpl;
+import org.apache.hadoop.mapreduce.v2.app.rm.ContainerAllocator;
+import org.apache.hadoop.mapreduce.v2.app.rm.ContainerAllocatorEvent;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.yarn.YarnException;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.factories.RecordFactory;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
+import org.apache.hadoop.yarn.service.AbstractService;
 import org.apache.hadoop.yarn.util.BuilderUtils;
 import org.junit.Test;
 
@@ -103,4 +113,89 @@ import org.junit.Test;
     }
    }
 
+  private final class MRAppTestCleanup extends MRApp {
+    boolean stoppedContainerAllocator;
+    boolean cleanedBeforeContainerAllocatorStopped;
+
+    public MRAppTestCleanup(int maps, int reduces, boolean autoComplete,
+        String testName, boolean cleanOnStart) {
+      super(maps, reduces, autoComplete, testName, cleanOnStart);
+      stoppedContainerAllocator = false;
+      cleanedBeforeContainerAllocatorStopped = false;
+    }
+
+    @Override
+    protected Job createJob(Configuration conf) {
+      UserGroupInformation currentUser = null;
+      try {
+        currentUser = UserGroupInformation.getCurrentUser();
+      } catch (IOException e) {
+        throw new YarnException(e);
+      }
+      Job newJob = new TestJob(getJobId(), getAttemptID(), conf,
+          getDispatcher().getEventHandler(),
+          getTaskAttemptListener(), getContext().getClock(),
+          getCommitter(), isNewApiCommitter(),
+          currentUser.getUserName(), getContext());
+      ((AppContext) getContext()).getAllJobs().put(newJob.getID(), newJob);
+
+      getDispatcher().register(JobFinishEvent.Type.class,
+          createJobFinishEventHandler());
+
+      return newJob;
+    }
+
+    @Override
+    protected ContainerAllocator createContainerAllocator(
+        ClientService clientService, AppContext context) {
+      return new TestCleanupContainerAllocator();
+    }
+
+    private class TestCleanupContainerAllocator extends AbstractService
+        implements ContainerAllocator {
+      private MRAppContainerAllocator allocator;
+
+      TestCleanupContainerAllocator() {
+        super(TestCleanupContainerAllocator.class.getName());
+        allocator = new MRAppContainerAllocator();
+      }
+
+      @Override
+      public void handle(ContainerAllocatorEvent event) {
+        allocator.handle(event);
+      }
+
+      @Override
+      public synchronized void stop() {
+        stoppedContainerAllocator = true;
+        super.stop();
+      }
+    }
+
+    @Override
+    public void cleanupStagingDir() throws IOException {
+      cleanedBeforeContainerAllocatorStopped = !stoppedContainerAllocator;
+    }
+
+    @Override
+    protected void sysexit() {
+    }
+  }
+
+  @Test
+  public void testStagingCleanupOrder() throws Exception {
+    MRAppTestCleanup app = new MRAppTestCleanup(1, 1, true,
+        this.getClass().getName(), true);
+    JobImpl job = (JobImpl)app.submit(new Configuration());
+    app.waitForState(job, JobState.SUCCEEDED);
+    app.verifyCompleted();
+
+    int waitTime = 20 * 1000;
+    while (waitTime > 0 && !app.cleanedBeforeContainerAllocatorStopped) {
+      Thread.sleep(100);
+      waitTime -= 100;
+    }
+    Assert.assertTrue("Staging directory not cleaned before notifying RM",
+        app.cleanedBeforeContainerAllocatorStopped);
+  }
  }
\ No newline at end of file