You are viewing a plain text version of this content. The canonical link for it is here.
Posted to mapreduce-commits@hadoop.apache.org by vi...@apache.org on 2011/09/18 09:16:19 UTC

svn commit: r1172206 - in /hadoop/common/trunk/hadoop-mapreduce-project: ./ hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/ hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/...

Author: vinodkv
Date: Sun Sep 18 07:16:18 2011
New Revision: 1172206

URL: http://svn.apache.org/viewvc?rev=1172206&view=rev
Log:
MAPREDUCE-3006. Fixed MapReduce AM to exit only after properly writing out history file. (vinodkv)

Modified:
    hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler.java
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRAppBenchmark.java
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestFail.java
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestMRApp.java
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestJobHistoryEvents.java

Modified: hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt?rev=1172206&r1=1172205&r2=1172206&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt Sun Sep 18 07:16:18 2011
@@ -1339,6 +1339,9 @@ Release 0.23.0 - Unreleased
     MAPREDUCE-2987. Fixed display of logged user on RM Web-UI. (Thomas Graves
     via acmurthy)
 
+    MAPREDUCE-3006. Fixed MapReduce AM to exit only after properly writing out
+    history file. (vinodkv)
+
 Release 0.22.0 - Unreleased
 
   INCOMPATIBLE CHANGES

Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler.java?rev=1172206&r1=1172205&r2=1172206&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler.java Sun Sep 18 07:16:18 2011
@@ -74,7 +74,7 @@ public class JobHistoryEventHandler exte
 
   private BlockingQueue<JobHistoryEvent> eventQueue =
     new LinkedBlockingQueue<JobHistoryEvent>();
-  private Thread eventHandlingThread;
+  protected Thread eventHandlingThread;
   private volatile boolean stopped;
   private final Object lock = new Object();
 

Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java?rev=1172206&r1=1172205&r2=1172206&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java Sun Sep 18 07:16:18 2011
@@ -56,12 +56,14 @@ import org.apache.hadoop.mapreduce.v2.ap
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEventType;
 import org.apache.hadoop.mapreduce.v2.app.job.impl.JobImpl;
 import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerLauncher;
+import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerLauncherEvent;
 import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerLauncherImpl;
 import org.apache.hadoop.mapreduce.v2.app.local.LocalContainerAllocator;
 import org.apache.hadoop.mapreduce.v2.app.metrics.MRAppMetrics;
 import org.apache.hadoop.mapreduce.v2.app.recover.Recovery;
 import org.apache.hadoop.mapreduce.v2.app.recover.RecoveryService;
 import org.apache.hadoop.mapreduce.v2.app.rm.ContainerAllocator;
+import org.apache.hadoop.mapreduce.v2.app.rm.ContainerAllocatorEvent;
 import org.apache.hadoop.mapreduce.v2.app.rm.RMContainerAllocator;
 import org.apache.hadoop.mapreduce.v2.app.speculate.DefaultSpeculator;
 import org.apache.hadoop.mapreduce.v2.app.speculate.Speculator;
@@ -83,6 +85,7 @@ import org.apache.hadoop.yarn.event.Asyn
 import org.apache.hadoop.yarn.event.Dispatcher;
 import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
+import org.apache.hadoop.yarn.service.AbstractService;
 import org.apache.hadoop.yarn.service.CompositeService;
 import org.apache.hadoop.yarn.service.Service;
 
@@ -126,6 +129,7 @@ public class MRAppMaster extends Composi
   private TaskAttemptListener taskAttemptListener;
   private JobTokenSecretManager jobTokenSecretManager =
       new JobTokenSecretManager();
+  private JobEventDispatcher jobEventDispatcher;
 
   private Job job;
   
@@ -148,7 +152,7 @@ public class MRAppMaster extends Composi
 
   @Override
   public void init(final Configuration conf) {
-    context = new RunningAppContext();
+    context = new RunningAppContext(conf);
 
     // Job name is the same as the app name util we support DAG of jobs
     // for an app later
@@ -182,18 +186,17 @@ public class MRAppMaster extends Composi
     //service to log job history events
     EventHandler<JobHistoryEvent> historyService = 
         createJobHistoryHandler(context);
-    addIfService(historyService);
+    dispatcher.register(org.apache.hadoop.mapreduce.jobhistory.EventType.class,
+        historyService);
 
-    JobEventDispatcher synchronousJobEventDispatcher = new JobEventDispatcher();
+    this.jobEventDispatcher = new JobEventDispatcher();
 
     //register the event dispatchers
-    dispatcher.register(JobEventType.class, synchronousJobEventDispatcher);
+    dispatcher.register(JobEventType.class, jobEventDispatcher);
     dispatcher.register(TaskEventType.class, new TaskEventDispatcher());
     dispatcher.register(TaskAttemptEventType.class, 
         new TaskAttemptEventDispatcher());
     dispatcher.register(TaskCleaner.EventType.class, taskCleaner);
-    dispatcher.register(org.apache.hadoop.mapreduce.jobhistory.EventType.class,
-        historyService);
     
     if (conf.getBoolean(MRJobConfig.MAP_SPECULATIVE, false)
         || conf.getBoolean(MRJobConfig.REDUCE_SPECULATIVE, false)) {
@@ -203,10 +206,34 @@ public class MRAppMaster extends Composi
     }
 
     dispatcher.register(Speculator.EventType.class,
-        new SpeculatorEventDispatcher());
+        new SpeculatorEventDispatcher(conf));
 
-    Credentials fsTokens = new Credentials();
+    // service to allocate containers from RM (if non-uber) or to fake it (uber)
+    containerAllocator = createContainerAllocator(clientService, context);
+    addIfService(containerAllocator);
+    dispatcher.register(ContainerAllocator.EventType.class, containerAllocator);
+
+    // corresponding service to launch allocated containers via NodeManager
+    containerLauncher = createContainerLauncher(context);
+    addIfService(containerLauncher);
+    dispatcher.register(ContainerLauncher.EventType.class, containerLauncher);
+
+    // Add the JobHistoryEventHandler last so that it is properly stopped first.
+    // This will guarantee that all history-events are flushed before AM goes
+    // ahead with shutdown.
+    // Note: Even though JobHistoryEventHandler is started last, if any
+    // component creates a JobHistoryEvent in the meanwhile, it will just be
+    // queued inside the JobHistoryEventHandler 
+    addIfService(historyService);
 
+    super.init(conf);
+  } // end of init()
+
+  /** Create and initialize (but don't start) a single job. */
+  protected Job createJob(Configuration conf) {
+
+    // ////////// Obtain the tokens needed by the job. //////////
+    Credentials fsTokens = new Credentials();
     UserGroupInformation currentUser = null;
 
     try {
@@ -234,66 +261,12 @@ public class MRAppMaster extends Composi
     } catch (IOException e) {
       throw new YarnException(e);
     }
-
-    super.init(conf);
-
-    //---- start of what used to be startJobs() code:
-
-    Configuration config = getConfig();
-
-    job = createJob(config, fsTokens, currentUser.getUserName());
-
-    /** create a job event for job intialization */
-    JobEvent initJobEvent = new JobEvent(job.getID(), JobEventType.JOB_INIT);
-    /** send init to the job (this does NOT trigger job execution) */
-    synchronousJobEventDispatcher.handle(initJobEvent);
-
-    // send init to speculator. This won't yest start as dispatcher isn't
-    // started yet.
-    dispatcher.getEventHandler().handle(
-        new SpeculatorEvent(job.getID(), clock.getTime()));
-
-    // JobImpl's InitTransition is done (call above is synchronous), so the
-    // "uber-decision" (MR-1220) has been made.  Query job and switch to
-    // ubermode if appropriate (by registering different container-allocator
-    // and container-launcher services/event-handlers).
-
-    if (job.isUber()) {
-      LOG.info("MRAppMaster uberizing job " + job.getID()
-               + " in local container (\"uber-AM\").");
-    } else {
-      LOG.info("MRAppMaster launching normal, non-uberized, multi-container "
-               + "job " + job.getID() + ".");
-    }
-
-    // service to allocate containers from RM (if non-uber) or to fake it (uber)
-    containerAllocator =
-        createContainerAllocator(clientService, context, job.isUber());
-    addIfService(containerAllocator);
-    dispatcher.register(ContainerAllocator.EventType.class, containerAllocator);
-    if (containerAllocator instanceof Service) {
-      ((Service) containerAllocator).init(config);
-    }
-
-    // corresponding service to launch allocated containers via NodeManager
-    containerLauncher = createContainerLauncher(context, job.isUber());
-    addIfService(containerLauncher);
-    dispatcher.register(ContainerLauncher.EventType.class, containerLauncher);
-    if (containerLauncher instanceof Service) {
-      ((Service) containerLauncher).init(config);
-    }
-
-  } // end of init()
-
-  /** Create and initialize (but don't start) a single job. 
-   * @param fsTokens */
-  protected Job createJob(Configuration conf, Credentials fsTokens, 
-      String user) {
+    // ////////// End of obtaining the tokens needed by the job. //////////
 
     // create single job
     Job newJob = new JobImpl(appID, conf, dispatcher.getEventHandler(),
         taskAttemptListener, jobTokenSecretManager, fsTokens, clock, startCount,
-        completedTasksFromPreviousRun, metrics, user);
+        completedTasksFromPreviousRun, metrics, currentUser.getUserName());
     ((RunningAppContext) context).jobs.put(newJob.getID(), newJob);
 
     dispatcher.register(JobFinishEvent.Type.class,
@@ -388,19 +361,13 @@ public class MRAppMaster extends Composi
   }
 
   protected ContainerAllocator createContainerAllocator(
-      ClientService clientService, AppContext context, boolean isLocal) {
-    //return new StaticContainerAllocator(context);
-    return isLocal
-        ? new LocalContainerAllocator(clientService, context)
-        : new RMContainerAllocator(clientService, context);
+      final ClientService clientService, final AppContext context) {
+    return new ContainerAllocatorRouter(clientService, context);
   }
 
-  protected ContainerLauncher createContainerLauncher(AppContext context,
-                                                      boolean isLocal) {
-    return isLocal
-        ? new LocalContainerLauncher(context,
-            (TaskUmbilicalProtocol) taskAttemptListener)
-        : new ContainerLauncherImpl(context);
+  protected ContainerLauncher
+      createContainerLauncher(final AppContext context) {
+    return new ContainerLauncherRouter(context);
   }
 
   //TODO:should have an interface for MRClientService
@@ -440,9 +407,96 @@ public class MRAppMaster extends Composi
     return taskAttemptListener;
   }
 
-  class RunningAppContext implements AppContext {
+  /**
+   * By the time life-cycle of this router starts, job-init would have already
+   * happened.
+   */
+  private final class ContainerAllocatorRouter extends AbstractService
+      implements ContainerAllocator {
+    private final ClientService clientService;
+    private final AppContext context;
+    private ContainerAllocator containerAllocator;
+
+    ContainerAllocatorRouter(ClientService clientService,
+        AppContext context) {
+      super(ContainerAllocatorRouter.class.getName());
+      this.clientService = clientService;
+      this.context = context;
+    }
+
+    @Override
+    public synchronized void start() {
+      if (job.isUber()) {
+        this.containerAllocator = new LocalContainerAllocator(
+            this.clientService, this.context);
+      } else {
+        this.containerAllocator = new RMContainerAllocator(
+            this.clientService, this.context);
+      }
+      ((Service)this.containerAllocator).init(getConfig());
+      ((Service)this.containerAllocator).start();
+      super.start();
+    }
+
+    @Override
+    public synchronized void stop() {
+      ((Service)this.containerAllocator).stop();
+      super.stop();
+    }
+
+    @Override
+    public void handle(ContainerAllocatorEvent event) {
+      this.containerAllocator.handle(event);
+    }
+  }
+
+  /**
+   * By the time life-cycle of this router starts, job-init would have already
+   * happened.
+   */
+  private final class ContainerLauncherRouter extends AbstractService
+      implements ContainerLauncher {
+    private final AppContext context;
+    private ContainerLauncher containerLauncher;
+
+    ContainerLauncherRouter(AppContext context) {
+      super(ContainerLauncherRouter.class.getName());
+      this.context = context;
+    }
+
+    @Override
+    public synchronized void start() {
+      if (job.isUber()) {
+        this.containerLauncher = new LocalContainerLauncher(context,
+            (TaskUmbilicalProtocol) taskAttemptListener);
+      } else {
+        this.containerLauncher = new ContainerLauncherImpl(context);
+      }
+      ((Service)this.containerLauncher).init(getConfig());
+      ((Service)this.containerLauncher).start();
+      super.start();
+    }
+
+    @Override
+    public void handle(ContainerLauncherEvent event) {
+        this.containerLauncher.handle(event);
+    }
+
+    @Override
+    public synchronized void stop() {
+      ((Service)this.containerLauncher).stop();
+      super.stop();
+    }
+  }
+
+  private class RunningAppContext implements AppContext {
 
-    private Map<JobId, Job> jobs = new ConcurrentHashMap<JobId, Job>();
+    private final Map<JobId, Job> jobs = new ConcurrentHashMap<JobId, Job>();
+    private final Configuration conf;
+
+    public RunningAppContext(Configuration config) {
+      this.conf = config;
+    }
 
     @Override
     public ApplicationAttemptId getApplicationAttemptId() {
@@ -481,7 +535,7 @@ public class MRAppMaster extends Composi
 
     @Override
     public CharSequence getUser() {
-      return getConfig().get(MRJobConfig.USER_NAME);
+      return this.conf.get(MRJobConfig.USER_NAME);
     }
 
     @Override
@@ -492,13 +546,45 @@ public class MRAppMaster extends Composi
 
   @Override
   public void start() {
+
+    ///////////////////// Create the job itself.
+    job = createJob(getConfig());
+    // End of creating the job.
+
     // metrics system init is really init & start.
     // It's more test friendly to put it here.
     DefaultMetricsSystem.initialize("MRAppMaster");
 
-    startJobs();
+    /** create a job event for job initialization */
+    JobEvent initJobEvent = new JobEvent(job.getID(), JobEventType.JOB_INIT);
+    /** send init to the job (this does NOT trigger job execution) */
+    // This is a synchronous call, not an event through dispatcher. We want
+    // job-init to be done completely here.
+    jobEventDispatcher.handle(initJobEvent);
+
+    // send init to speculator. This won't yet start as dispatcher isn't
+    // started yet.
+    dispatcher.getEventHandler().handle(
+        new SpeculatorEvent(job.getID(), clock.getTime()));
+
+    // JobImpl's InitTransition is done (call above is synchronous), so the
+    // "uber-decision" (MR-1220) has been made.  Query job and switch to
+    // ubermode if appropriate (by registering different container-allocator
+    // and container-launcher services/event-handlers).
+
+    if (job.isUber()) {
+      LOG.info("MRAppMaster uberizing job " + job.getID()
+               + " in local container (\"uber-AM\").");
+    } else {
+      LOG.info("MRAppMaster launching normal, non-uberized, multi-container "
+               + "job " + job.getID() + ".");
+    }
+
     //start all the components
     super.start();
+
+    // All components have started, start the job.
+    startJobs();
   }
 
   /**
@@ -546,10 +632,14 @@ public class MRAppMaster extends Composi
 
   private class SpeculatorEventDispatcher implements
       EventHandler<SpeculatorEvent> {
+    private final Configuration conf;
+    public SpeculatorEventDispatcher(Configuration config) {
+      this.conf = config;
+    }
     @Override
     public void handle(SpeculatorEvent event) {
-      if (getConfig().getBoolean(MRJobConfig.MAP_SPECULATIVE, false)
-          || getConfig().getBoolean(MRJobConfig.REDUCE_SPECULATIVE, false)) {
+      if (conf.getBoolean(MRJobConfig.MAP_SPECULATIVE, false)
+          || conf.getBoolean(MRJobConfig.REDUCE_SPECULATIVE, false)) {
         // Speculator IS enabled, direct the event to there.
         speculator.handle(event);
       }

Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java?rev=1172206&r1=1172205&r2=1172206&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java Sun Sep 18 07:16:18 2011
@@ -63,6 +63,7 @@ import org.apache.hadoop.mapreduce.v2.ap
 import org.apache.hadoop.mapreduce.v2.app.taskclean.TaskCleanupEvent;
 import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
 import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.yarn.Clock;
 import org.apache.hadoop.yarn.YarnException;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
@@ -234,11 +235,16 @@ public class MRApp extends MRAppMaster {
   }
 
   @Override
-  protected Job createJob(Configuration conf, Credentials fsTokens,
-      String user) {
-    Job newJob = new TestJob(getAppID(), getDispatcher().getEventHandler(),
+  protected Job createJob(Configuration conf) {
+    UserGroupInformation currentUser = null;
+    try {
+      currentUser = UserGroupInformation.getCurrentUser();
+    } catch (IOException e) {
+      throw new YarnException(e);
+    }
+    Job newJob = new TestJob(conf, getAppID(), getDispatcher().getEventHandler(),
                              getTaskAttemptListener(), getContext().getClock(),
-                             user);
+                             currentUser.getUserName());
     ((AppContext) getContext()).getAllJobs().put(newJob.getID(), newJob);
 
     getDispatcher().register(JobFinishEvent.Type.class,
@@ -279,8 +285,7 @@ public class MRApp extends MRAppMaster {
   }
   
   @Override
-  protected ContainerLauncher createContainerLauncher(AppContext context,
-                                                      boolean isLocal) {
+  protected ContainerLauncher createContainerLauncher(AppContext context) {
     return new MockContainerLauncher();
   }
 
@@ -317,7 +322,7 @@ public class MRApp extends MRAppMaster {
 
   @Override
   protected ContainerAllocator createContainerAllocator(
-      ClientService clientService, AppContext context, boolean isLocal) {
+      ClientService clientService, AppContext context) {
     return new ContainerAllocator(){
       private int containerCount;
       @Override
@@ -369,12 +374,14 @@ public class MRApp extends MRAppMaster {
 
   class TestJob extends JobImpl {
     //override the init transition
+    private final TestInitTransition initTransition = new TestInitTransition(
+        maps, reduces);
     StateMachineFactory<JobImpl, JobState, JobEventType, JobEvent> localFactory
         = stateMachineFactory.addTransition(JobState.NEW,
             EnumSet.of(JobState.INITED, JobState.FAILED),
             JobEventType.JOB_INIT,
             // This is abusive.
-            new TestInitTransition(getConfig(), maps, reduces));
+            initTransition);
 
     private final StateMachine<JobState, JobEventType, JobEvent>
         localStateMachine;
@@ -384,10 +391,10 @@ public class MRApp extends MRAppMaster {
       return localStateMachine;
     }
 
-    public TestJob(ApplicationId appID, EventHandler eventHandler,
-        TaskAttemptListener taskAttemptListener, Clock clock, 
-        String user) {
-      super(appID, new Configuration(), eventHandler, taskAttemptListener,
+    public TestJob(Configuration conf, ApplicationId appID,
+        EventHandler eventHandler, TaskAttemptListener taskAttemptListener,
+        Clock clock, String user) {
+      super(appID, conf, eventHandler, taskAttemptListener,
           new JobTokenSecretManager(), new Credentials(), clock, getStartCount(), 
           getCompletedTaskFromPreviousRun(), metrics, user);
 
@@ -399,17 +406,14 @@ public class MRApp extends MRAppMaster {
 
   //Override InitTransition to not look for split files etc
   static class TestInitTransition extends JobImpl.InitTransition {
-    private Configuration config;
     private int maps;
     private int reduces;
-    TestInitTransition(Configuration config, int maps, int reduces) {
-      this.config = config;
+    TestInitTransition(int maps, int reduces) {
       this.maps = maps;
       this.reduces = reduces;
     }
     @Override
     protected void setup(JobImpl job) throws IOException {
-      job.conf = config;
       job.conf.setInt(MRJobConfig.NUM_REDUCES, reduces);
       job.remoteJobConfFile = new Path("test");
     }

Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRAppBenchmark.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRAppBenchmark.java?rev=1172206&r1=1172205&r2=1172206&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRAppBenchmark.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRAppBenchmark.java Sun Sep 18 07:16:18 2011
@@ -94,7 +94,7 @@ public class MRAppBenchmark {
     
     @Override
     protected ContainerAllocator createContainerAllocator(
-        ClientService clientService, AppContext context, boolean isLocal) {
+        ClientService clientService, AppContext context) {
       return new ThrottledContainerAllocator();
     }
     

Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestFail.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestFail.java?rev=1172206&r1=1172205&r2=1172206&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestFail.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestFail.java Sun Sep 18 07:16:18 2011
@@ -169,7 +169,7 @@ public class TestFail {
 
   @Test
   public void testTaskFailWithUnusedContainer() throws Exception {
-    MRApp app = new FailingTaskWithUnusedContainer();
+    MRApp app = new MRAppWithFailingTaskAndUnusedContainer();
     Configuration conf = new Configuration();
     int maxAttempts = 1;
     conf.setInt(MRJobConfig.MAP_MAX_ATTEMPTS, maxAttempts);
@@ -194,21 +194,21 @@ public class TestFail {
     app.waitForState(job, JobState.FAILED);
   }
 
-  static class FailingTaskWithUnusedContainer extends MRApp {
+  static class MRAppWithFailingTaskAndUnusedContainer extends MRApp {
 
-    public FailingTaskWithUnusedContainer() {
+    public MRAppWithFailingTaskAndUnusedContainer() {
       super(1, 0, false, "TaskFailWithUnsedContainer", true);
     }
 
-    protected ContainerLauncher createContainerLauncher(AppContext context,
-        boolean isLocal) {
+    @Override
+    protected ContainerLauncher createContainerLauncher(AppContext context) {
       return new ContainerLauncherImpl(context) {
         @Override
         public void handle(ContainerLauncherEvent event) {
 
           switch (event.getType()) {
           case CONTAINER_REMOTE_LAUNCH:
-            super.handle(event);
+            super.handle(event); // Unused event and container.
             break;
           case CONTAINER_REMOTE_CLEANUP:
             getContext().getEventHandler().handle(

Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestMRApp.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestMRApp.java?rev=1172206&r1=1172205&r2=1172206&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestMRApp.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestMRApp.java Sun Sep 18 07:16:18 2011
@@ -24,10 +24,10 @@ import junit.framework.Assert;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.mapreduce.MRJobConfig;
+import org.apache.hadoop.mapreduce.TypeConverter;
 import org.apache.hadoop.mapreduce.v2.api.records.JobState;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptState;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskState;
-import org.apache.hadoop.mapreduce.TypeConverter;
 import org.apache.hadoop.mapreduce.v2.app.job.Job;
 import org.apache.hadoop.mapreduce.v2.app.job.Task;
 import org.apache.hadoop.mapreduce.v2.app.job.TaskAttempt;
@@ -195,6 +195,7 @@ public class TestMRApp {
   public static void main(String[] args) throws Exception {
     TestMRApp t = new TestMRApp();
     t.testMapReduce();
+    t.testZeroMapReduces();
     t.testCommitPending();
     t.testCompletedMapsForReduceSlowstart();
     t.testJobError();

Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestJobHistoryEvents.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestJobHistoryEvents.java?rev=1172206&r1=1172205&r2=1172206&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestJobHistoryEvents.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestJobHistoryEvents.java Sun Sep 18 07:16:18 2011
@@ -92,6 +92,60 @@ public class TestJobHistoryEvents {
         parsedJob.getState());
   }
 
+  /**
+   * Verify that all the events are flushed on stopping the HistoryHandler
+   * @throws Exception
+   */
+  @Test
+  public void testEventsFlushOnStop() throws Exception {
+
+    Configuration conf = new Configuration();
+    conf.set(MRJobConfig.USER_NAME, "test");
+    MRApp app = new MRAppWithSpecialHistoryHandler(1, 0, true, this
+        .getClass().getName(), true);
+    app.submit(conf);
+    Job job = app.getContext().getAllJobs().values().iterator().next();
+    JobId jobId = job.getID();
+    LOG.info("JOBID is " + TypeConverter.fromYarn(jobId).toString());
+    app.waitForState(job, JobState.SUCCEEDED);
+
+    // make sure all events are flushed
+    app.waitForState(Service.STATE.STOPPED);
+    /*
+     * Use HistoryContext to read logged events and verify the number of
+     * completed maps
+     */
+    HistoryContext context = new JobHistory();
+    ((JobHistory) context).init(conf);
+    Job parsedJob = context.getJob(jobId);
+    Assert.assertEquals("CompletedMaps not correct", 1, parsedJob
+        .getCompletedMaps());
+
+    Map<TaskId, Task> tasks = parsedJob.getTasks();
+    Assert.assertEquals("No of tasks not correct", 1, tasks.size());
+    verifyTask(tasks.values().iterator().next());
+
+    Map<TaskId, Task> maps = parsedJob.getTasks(TaskType.MAP);
+    Assert.assertEquals("No of maps not correct", 1, maps.size());
+
+    Assert.assertEquals("Job state not currect", JobState.SUCCEEDED,
+        parsedJob.getState());
+  }
+
+  @Test
+  public void testJobHistoryEventHandlerIsFirstServiceToStop() {
+    MRApp app = new MRAppWithSpecialHistoryHandler(1, 0, true, this
+        .getClass().getName(), true);
+    Configuration conf = new Configuration();
+    app.init(conf);
+    Service[] services = app.getServices().toArray(new Service[0]);
+    // Verifying that it is the last to be added is the same as verifying that
+    // it is the first to be stopped. CompositeService related tests already
+    // validate this.
+    Assert.assertEquals("JobHistoryEventHandler",
+        services[services.length - 1].getName());
+  }
+
   private void verifyTask(Task task) {
     Assert.assertEquals("Task state not currect", TaskState.SUCCEEDED,
         task.getState());
@@ -116,14 +170,43 @@ public class TestJobHistoryEvents {
     @Override
     protected EventHandler<JobHistoryEvent> createJobHistoryHandler(
         AppContext context) {
-      JobHistoryEventHandler eventHandler = new JobHistoryEventHandler(context, 
-          getStartCount());
+      JobHistoryEventHandler eventHandler = new JobHistoryEventHandler(
+          context, getStartCount());
       return eventHandler;
     }
   }
-  
+
+  /**
+   * MRApp with special HistoryEventHandler that writes events only during stop.
+   * This is to simulate events that don't get written by the eventHandling
+   * thread due to say a slow DFS and verify that they are flushed during stop.
+   */
+  private static class MRAppWithSpecialHistoryHandler extends MRApp {
+
+    public MRAppWithSpecialHistoryHandler(int maps, int reduces,
+        boolean autoComplete, String testName, boolean cleanOnStart) {
+      super(maps, reduces, autoComplete, testName, cleanOnStart);
+    }
+
+    @Override
+    protected EventHandler<JobHistoryEvent> createJobHistoryHandler(
+        AppContext context) {
+      return new JobHistoryEventHandler(context, getStartCount()) {
+        @Override
+        public void start() {
+          // Don't start any event draining thread.
+          super.eventHandlingThread = new Thread();
+          super.eventHandlingThread.start();
+        }
+      };
+    }
+
+  }
+
   public static void main(String[] args) throws Exception {
     TestJobHistoryEvents t = new TestJobHistoryEvents();
     t.testHistoryEvents();
+    t.testEventsFlushOnStop();
+    t.testJobHistoryEventHandlerIsFirstServiceToStop();
   }
 }