You are viewing a plain text version of this content. The canonical link for it is here.
Posted to mapreduce-commits@hadoop.apache.org by su...@apache.org on 2013/01/03 22:24:08 UTC

svn commit: r1428601 [2/2] - in /hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project: ./ conf/ hadoop-mapreduce-client/ hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/ hadoop-mapreduce-client/had...

Modified: hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestStagingCleanup.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestStagingCleanup.java?rev=1428601&r1=1428600&r2=1428601&view=diff
==============================================================================
--- hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestStagingCleanup.java (original)
+++ hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestStagingCleanup.java Thu Jan  3 21:23:58 2013
@@ -201,8 +201,7 @@ import org.junit.Test;
       Job newJob = new TestJob(getJobId(), getAttemptID(), conf,
           getDispatcher().getEventHandler(),
           getTaskAttemptListener(), getContext().getClock(),
-          getCommitter(), isNewApiCommitter(),
-          currentUser.getUserName(), getContext());
+          isNewApiCommitter(), currentUser.getUserName(), getContext());
       ((AppContext) getContext()).getAllJobs().put(newJob.getID(), newJob);
 
       getDispatcher().register(JobFinishEvent.Type.class,

Modified: hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestJobImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestJobImpl.java?rev=1428601&r1=1428600&r2=1428601&view=diff
==============================================================================
--- hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestJobImpl.java (original)
+++ hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestJobImpl.java Thu Jan  3 21:23:58 2013
@@ -19,46 +19,51 @@
 package org.apache.hadoop.mapreduce.v2.app.job.impl;
 
 import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.eq;
-import static org.mockito.Mockito.doNothing;
 import static org.mockito.Mockito.doThrow;
 import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
 import java.io.IOException;
 import java.util.EnumSet;
-import java.util.HashMap;
-import java.util.Map;
+import java.util.concurrent.BrokenBarrierException;
+import java.util.concurrent.CyclicBarrier;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.mapreduce.JobACL;
 import org.apache.hadoop.mapreduce.JobContext;
 import org.apache.hadoop.mapreduce.JobID;
+import org.apache.hadoop.mapreduce.JobStatus.State;
 import org.apache.hadoop.mapreduce.MRConfig;
 import org.apache.hadoop.mapreduce.MRJobConfig;
 import org.apache.hadoop.mapreduce.OutputCommitter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.hadoop.mapreduce.TypeConverter;
-import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEvent;
 import org.apache.hadoop.mapreduce.security.token.JobTokenSecretManager;
 import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitMetaInfo;
 import org.apache.hadoop.mapreduce.v2.api.records.JobId;
 import org.apache.hadoop.mapreduce.v2.api.records.JobState;
-import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskState;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
+import org.apache.hadoop.mapreduce.v2.app.AppContext;
+import org.apache.hadoop.mapreduce.v2.app.commit.CommitterEventHandler;
+import org.apache.hadoop.mapreduce.v2.app.commit.CommitterEventType;
 import org.apache.hadoop.mapreduce.v2.app.job.JobStateInternal;
-import org.apache.hadoop.mapreduce.v2.app.job.Task;
 import org.apache.hadoop.mapreduce.v2.app.job.event.JobDiagnosticsUpdateEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.JobEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.JobEventType;
+import org.apache.hadoop.mapreduce.v2.app.job.event.JobFinishEvent;
+import org.apache.hadoop.mapreduce.v2.app.job.event.JobTaskEvent;
+import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEventType;
 import org.apache.hadoop.mapreduce.v2.app.job.impl.JobImpl.InitTransition;
-import org.apache.hadoop.mapreduce.v2.app.job.impl.JobImpl.JobNoTasksCompletedTransition;
 import org.apache.hadoop.mapreduce.v2.app.metrics.MRAppMetrics;
+import org.apache.hadoop.mapreduce.v2.util.MRBuilderUtils;
 import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.yarn.SystemClock;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.event.AsyncDispatcher;
+import org.apache.hadoop.yarn.event.Dispatcher;
 import org.apache.hadoop.yarn.event.EventHandler;
-import org.apache.hadoop.yarn.server.resourcemanager.resourcetracker.InlineDispatcher;
 import org.apache.hadoop.yarn.state.StateMachine;
 import org.apache.hadoop.yarn.state.StateMachineFactory;
 import org.apache.hadoop.yarn.util.Records;
@@ -69,121 +74,223 @@ import org.junit.Test;
 /**
  * Tests various functions of the JobImpl class
  */
-@SuppressWarnings({"unchecked", "rawtypes"})
+@SuppressWarnings({"rawtypes"})
 public class TestJobImpl {
   
   @Test
-  public void testJobNoTasksTransition() { 
-    JobNoTasksCompletedTransition trans = new JobNoTasksCompletedTransition();
-    JobImpl mockJob = mock(JobImpl.class);
-
-    // Force checkJobCompleteSuccess to return null
-    Task mockTask = mock(Task.class);
-    Map<TaskId, Task> tasks = new HashMap<TaskId, Task>();
-    tasks.put(mockTask.getID(), mockTask);
-    mockJob.tasks = tasks;
+  public void testJobNoTasks() {
+    Configuration conf = new Configuration();
+    conf.setInt(MRJobConfig.NUM_REDUCES, 0);
+    AsyncDispatcher dispatcher = new AsyncDispatcher();
+    dispatcher.init(conf);
+    dispatcher.start();
+    OutputCommitter committer = mock(OutputCommitter.class);
+    CommitterEventHandler commitHandler =
+        createCommitterEventHandler(dispatcher, committer);
+    commitHandler.init(conf);
+    commitHandler.start();
+
+    JobImpl job = createStubbedJob(conf, dispatcher, 0);
+    job.handle(new JobEvent(job.getID(), JobEventType.JOB_INIT));
+    assertJobState(job, JobStateInternal.INITED);
+    job.handle(new JobEvent(job.getID(), JobEventType.JOB_START));
+    assertJobState(job, JobStateInternal.SUCCEEDED);
+    dispatcher.stop();
+    commitHandler.stop();
+  }
 
-    when(mockJob.getInternalState()).thenReturn(JobStateInternal.ERROR);
-    JobEvent mockJobEvent = mock(JobEvent.class);
-    JobStateInternal state = trans.transition(mockJob, mockJobEvent);
-    Assert.assertEquals("Incorrect state returned from JobNoTasksCompletedTransition",
-        JobStateInternal.ERROR, state);
+  @Test(timeout=20000)
+  public void testCommitJobFailsJob() throws Exception {
+    Configuration conf = new Configuration();
+    AsyncDispatcher dispatcher = new AsyncDispatcher();
+    dispatcher.init(conf);
+    dispatcher.start();
+    CyclicBarrier syncBarrier = new CyclicBarrier(2);
+    OutputCommitter committer = new TestingOutputCommitter(syncBarrier, false);
+    CommitterEventHandler commitHandler =
+        createCommitterEventHandler(dispatcher, committer);
+    commitHandler.init(conf);
+    commitHandler.start();
+
+    JobImpl job = createRunningStubbedJob(conf, dispatcher, 2);
+    completeJobTasks(job);
+    assertJobState(job, JobStateInternal.COMMITTING);
+
+    // let the committer fail and verify the job fails
+    syncBarrier.await();
+    assertJobState(job, JobStateInternal.FAILED);
+    dispatcher.stop();
+    commitHandler.stop();
   }
 
-  @Test
-  public void testCommitJobFailsJob() {
+  @Test(timeout=20000)
+  public void testCheckJobCompleteSuccess() throws Exception {
+    Configuration conf = new Configuration();
+    AsyncDispatcher dispatcher = new AsyncDispatcher();
+    dispatcher.init(conf);
+    dispatcher.start();
+    CyclicBarrier syncBarrier = new CyclicBarrier(2);
+    OutputCommitter committer = new TestingOutputCommitter(syncBarrier, true);
+    CommitterEventHandler commitHandler =
+        createCommitterEventHandler(dispatcher, committer);
+    commitHandler.init(conf);
+    commitHandler.start();
+
+    JobImpl job = createRunningStubbedJob(conf, dispatcher, 2);
+    completeJobTasks(job);
+    assertJobState(job, JobStateInternal.COMMITTING);
+
+    // let the committer complete and verify the job succeeds
+    syncBarrier.await();
+    assertJobState(job, JobStateInternal.SUCCEEDED);
+    dispatcher.stop();
+    commitHandler.stop();
+  }
+
+  @Test(timeout=20000)
+  public void testKilledDuringSetup() throws Exception {
+    Configuration conf = new Configuration();
+    AsyncDispatcher dispatcher = new AsyncDispatcher();
+    dispatcher.init(conf);
+    dispatcher.start();
+    OutputCommitter committer = new StubbedOutputCommitter() {
+      @Override
+      public synchronized void setupJob(JobContext jobContext)
+          throws IOException {
+        while (!Thread.interrupted()) {
+          try {
+            wait();
+          } catch (InterruptedException e) {
+          }
+        }
+      }
+    };
+    CommitterEventHandler commitHandler =
+        createCommitterEventHandler(dispatcher, committer);
+    commitHandler.init(conf);
+    commitHandler.start();
 
-    JobImpl mockJob = mock(JobImpl.class);
-    mockJob.tasks = new HashMap<TaskId, Task>();
-    OutputCommitter mockCommitter = mock(OutputCommitter.class);
-    EventHandler mockEventHandler = mock(EventHandler.class);
-    JobContext mockJobContext = mock(JobContext.class);
-
-    when(mockJob.getCommitter()).thenReturn(mockCommitter);
-    when(mockJob.getEventHandler()).thenReturn(mockEventHandler);
-    when(mockJob.getJobContext()).thenReturn(mockJobContext);
-    when(mockJob.finished(JobStateInternal.KILLED)).thenReturn(
-        JobStateInternal.KILLED);
-    when(mockJob.finished(JobStateInternal.FAILED)).thenReturn(
-        JobStateInternal.FAILED);
-    when(mockJob.finished(JobStateInternal.SUCCEEDED)).thenReturn(
-        JobStateInternal.SUCCEEDED);
-
-    try {
-      doThrow(new IOException()).when(mockCommitter).commitJob(any(JobContext.class));
-    } catch (IOException e) {
-      // commitJob stubbed out, so this can't happen
-    }
-    doNothing().when(mockEventHandler).handle(any(JobHistoryEvent.class));
-    JobStateInternal jobState = JobImpl.checkJobCompleteSuccess(mockJob);
-    Assert.assertNotNull("checkJobCompleteSuccess incorrectly returns null " +
-      "for successful job", jobState);
-    Assert.assertEquals("checkJobCompleteSuccess returns incorrect state",
-        JobStateInternal.FAILED, jobState);
-    verify(mockJob).abortJob(
-        eq(org.apache.hadoop.mapreduce.JobStatus.State.FAILED));
+    JobImpl job = createStubbedJob(conf, dispatcher, 2);
+    JobId jobId = job.getID();
+    job.handle(new JobEvent(jobId, JobEventType.JOB_INIT));
+    assertJobState(job, JobStateInternal.INITED);
+    job.handle(new JobEvent(jobId, JobEventType.JOB_START));
+    assertJobState(job, JobStateInternal.SETUP);
+
+    job.handle(new JobEvent(job.getID(), JobEventType.JOB_KILL));
+    assertJobState(job, JobStateInternal.KILLED);
+    dispatcher.stop();
+    commitHandler.stop();
   }
 
-  @Test
-  public void testCheckJobCompleteSuccess() {
-    
-    JobImpl mockJob = mock(JobImpl.class);
-    mockJob.tasks = new HashMap<TaskId, Task>();
-    OutputCommitter mockCommitter = mock(OutputCommitter.class);
-    EventHandler mockEventHandler = mock(EventHandler.class);
-    JobContext mockJobContext = mock(JobContext.class);
-    
-    when(mockJob.getCommitter()).thenReturn(mockCommitter);
-    when(mockJob.getEventHandler()).thenReturn(mockEventHandler);
-    when(mockJob.getJobContext()).thenReturn(mockJobContext);
-    doNothing().when(mockJob).setFinishTime();
-    doNothing().when(mockJob).logJobHistoryFinishedEvent();
-    when(mockJob.finished(any(JobStateInternal.class))).thenReturn(
-        JobStateInternal.SUCCEEDED);
-
-    try {
-      doNothing().when(mockCommitter).commitJob(any(JobContext.class));
-    } catch (IOException e) {
-      // commitJob stubbed out, so this can't happen
-    }
-    doNothing().when(mockEventHandler).handle(any(JobHistoryEvent.class));
-    Assert.assertNotNull("checkJobCompleteSuccess incorrectly returns null " +
-      "for successful job",
-      JobImpl.checkJobCompleteSuccess(mockJob));
-    Assert.assertEquals("checkJobCompleteSuccess returns incorrect state",
-        JobStateInternal.SUCCEEDED, JobImpl.checkJobCompleteSuccess(mockJob));
+  @Test(timeout=20000)
+  public void testKilledDuringCommit() throws Exception {
+    Configuration conf = new Configuration();
+    AsyncDispatcher dispatcher = new AsyncDispatcher();
+    dispatcher.init(conf);
+    dispatcher.start();
+    CyclicBarrier syncBarrier = new CyclicBarrier(2);
+    OutputCommitter committer = new WaitingOutputCommitter(syncBarrier, true);
+    CommitterEventHandler commitHandler =
+        createCommitterEventHandler(dispatcher, committer);
+    commitHandler.init(conf);
+    commitHandler.start();
+
+    JobImpl job = createRunningStubbedJob(conf, dispatcher, 2);
+    completeJobTasks(job);
+    assertJobState(job, JobStateInternal.COMMITTING);
+
+    syncBarrier.await();
+    job.handle(new JobEvent(job.getID(), JobEventType.JOB_KILL));
+    assertJobState(job, JobStateInternal.KILLED);
+    dispatcher.stop();
+    commitHandler.stop();
   }
 
-  @Test
-  public void testCheckJobCompleteSuccessFailed() {
-    JobImpl mockJob = mock(JobImpl.class);
+  @Test(timeout=20000)
+  public void testKilledDuringFailAbort() throws Exception {
+    Configuration conf = new Configuration();
+    AsyncDispatcher dispatcher = new AsyncDispatcher();
+    dispatcher.init(conf);
+    dispatcher.start();
+    OutputCommitter committer = new StubbedOutputCommitter() {
+      @Override
+      public void setupJob(JobContext jobContext) throws IOException {
+        throw new IOException("forced failure");
+      }
+
+      @Override
+      public synchronized void abortJob(JobContext jobContext, State state)
+          throws IOException {
+        while (!Thread.interrupted()) {
+          try {
+            wait();
+          } catch (InterruptedException e) {
+          }
+        }
+      }
+    };
+    CommitterEventHandler commitHandler =
+        createCommitterEventHandler(dispatcher, committer);
+    commitHandler.init(conf);
+    commitHandler.start();
+
+    JobImpl job = createStubbedJob(conf, dispatcher, 2);
+    JobId jobId = job.getID();
+    job.handle(new JobEvent(jobId, JobEventType.JOB_INIT));
+    assertJobState(job, JobStateInternal.INITED);
+    job.handle(new JobEvent(jobId, JobEventType.JOB_START));
+    assertJobState(job, JobStateInternal.FAIL_ABORT);
 
-    // Make the completedTasks not equal the getTasks()
-    Task mockTask = mock(Task.class);
-    Map<TaskId, Task> tasks = new HashMap<TaskId, Task>();
-    tasks.put(mockTask.getID(), mockTask);
-    mockJob.tasks = tasks;
-    
-    try {
-      // Just in case the code breaks and reaches these calls
-      OutputCommitter mockCommitter = mock(OutputCommitter.class);
-      EventHandler mockEventHandler = mock(EventHandler.class);
-      doNothing().when(mockCommitter).commitJob(any(JobContext.class));
-      doNothing().when(mockEventHandler).handle(any(JobHistoryEvent.class));
-    } catch (IOException e) {
-      e.printStackTrace();    
-    }
-    Assert.assertNull("checkJobCompleteSuccess incorrectly returns not-null " +
-      "for unsuccessful job",
-      JobImpl.checkJobCompleteSuccess(mockJob));
+    job.handle(new JobEvent(jobId, JobEventType.JOB_KILL));
+    assertJobState(job, JobStateInternal.KILLED);
+    dispatcher.stop();
+    commitHandler.stop();
   }
 
+  @Test(timeout=20000)
+  public void testKilledDuringKillAbort() throws Exception {
+    Configuration conf = new Configuration();
+    AsyncDispatcher dispatcher = new AsyncDispatcher();
+    dispatcher.init(conf);
+    dispatcher.start();
+    OutputCommitter committer = new StubbedOutputCommitter() {
+      @Override
+      public synchronized void abortJob(JobContext jobContext, State state)
+          throws IOException {
+        while (!Thread.interrupted()) {
+          try {
+            wait();
+          } catch (InterruptedException e) {
+          }
+        }
+      }
+    };
+    CommitterEventHandler commitHandler =
+        createCommitterEventHandler(dispatcher, committer);
+    commitHandler.init(conf);
+    commitHandler.start();
+
+    JobImpl job = createStubbedJob(conf, dispatcher, 2);
+    JobId jobId = job.getID();
+    job.handle(new JobEvent(jobId, JobEventType.JOB_INIT));
+    assertJobState(job, JobStateInternal.INITED);
+    job.handle(new JobEvent(jobId, JobEventType.JOB_START));
+    assertJobState(job, JobStateInternal.SETUP);
+
+    job.handle(new JobEvent(jobId, JobEventType.JOB_KILL));
+    assertJobState(job, JobStateInternal.KILL_ABORT);
+
+    job.handle(new JobEvent(jobId, JobEventType.JOB_KILL));
+    assertJobState(job, JobStateInternal.KILLED);
+    dispatcher.stop();
+    commitHandler.stop();
+  }
 
   public static void main(String[] args) throws Exception {
     TestJobImpl t = new TestJobImpl();
-    t.testJobNoTasksTransition();
+    t.testJobNoTasks();
     t.testCheckJobCompleteSuccess();
-    t.testCheckJobCompleteSuccessFailed();
     t.testCheckAccess();
     t.testReportDiagnostics();
     t.testUberDecision();
@@ -208,7 +315,7 @@ public class TestJobImpl {
 
     // Verify access
     JobImpl job1 = new JobImpl(jobId, null, conf1, null, null, null, null, null,
-        null, null, null, true, null, 0, null, null);
+        null, null, true, null, 0, null, null);
     Assert.assertTrue(job1.checkAccess(ugi1, JobACL.VIEW_JOB));
     Assert.assertFalse(job1.checkAccess(ugi2, JobACL.VIEW_JOB));
 
@@ -219,7 +326,7 @@ public class TestJobImpl {
 
     // Verify access
     JobImpl job2 = new JobImpl(jobId, null, conf2, null, null, null, null, null,
-        null, null, null, true, null, 0, null, null);
+        null, null, true, null, 0, null, null);
     Assert.assertTrue(job2.checkAccess(ugi1, JobACL.VIEW_JOB));
     Assert.assertTrue(job2.checkAccess(ugi2, JobACL.VIEW_JOB));
 
@@ -230,7 +337,7 @@ public class TestJobImpl {
 
     // Verify access
     JobImpl job3 = new JobImpl(jobId, null, conf3, null, null, null, null, null,
-        null, null, null, true, null, 0, null, null);
+        null, null, true, null, 0, null, null);
     Assert.assertTrue(job3.checkAccess(ugi1, JobACL.VIEW_JOB));
     Assert.assertTrue(job3.checkAccess(ugi2, JobACL.VIEW_JOB));
 
@@ -241,7 +348,7 @@ public class TestJobImpl {
 
     // Verify access
     JobImpl job4 = new JobImpl(jobId, null, conf4, null, null, null, null, null,
-        null, null, null, true, null, 0, null, null);
+        null, null, true, null, 0, null, null);
     Assert.assertTrue(job4.checkAccess(ugi1, JobACL.VIEW_JOB));
     Assert.assertTrue(job4.checkAccess(ugi2, JobACL.VIEW_JOB));
 
@@ -252,7 +359,7 @@ public class TestJobImpl {
 
     // Verify access
     JobImpl job5 = new JobImpl(jobId, null, conf5, null, null, null, null, null,
-        null, null, null, true, null, 0, null, null);
+        null, null, true, null, 0, null, null);
     Assert.assertTrue(job5.checkAccess(ugi1, null));
     Assert.assertTrue(job5.checkAccess(ugi2, null));
   }
@@ -270,8 +377,7 @@ public class TestJobImpl {
         mock(EventHandler.class),
         null, mock(JobTokenSecretManager.class), null,
         new SystemClock(), null,
-        mrAppMetrics, mock(OutputCommitter.class),
-        true, null, 0, null, null);
+        mrAppMetrics, true, null, 0, null, null);
     job.handle(diagUpdateEvent);
     String diagnostics = job.getReport().getDiagnostics();
     Assert.assertNotNull(diagnostics);
@@ -282,8 +388,7 @@ public class TestJobImpl {
         mock(EventHandler.class),
         null, mock(JobTokenSecretManager.class), null,
         new SystemClock(), null,
-        mrAppMetrics, mock(OutputCommitter.class),
-        true, null, 0, null, null);
+        mrAppMetrics, true, null, 0, null, null);
     job.handle(new JobEvent(jobId, JobEventType.JOB_KILL));
     job.handle(diagUpdateEvent);
     diagnostics = job.getReport().getDiagnostics();
@@ -338,20 +443,23 @@ public class TestJobImpl {
     JobImpl job = new JobImpl(jobId, Records
         .newRecord(ApplicationAttemptId.class), conf, mock(EventHandler.class),
         null, mock(JobTokenSecretManager.class), null, null, null,
-        mrAppMetrics, mock(OutputCommitter.class), true, null, 0, null, null);
-    InitTransition initTransition = getInitTransition();
+        mrAppMetrics, true, null, 0, null, null);
+    InitTransition initTransition = getInitTransition(2);
     JobEvent mockJobEvent = mock(JobEvent.class);
     initTransition.transition(job, mockJobEvent);
     boolean isUber = job.isUber();
     return isUber;
   }
 
-  private static InitTransition getInitTransition() {
+  private static InitTransition getInitTransition(final int numSplits) {
     InitTransition initTransition = new InitTransition() {
       @Override
       protected TaskSplitMetaInfo[] createSplits(JobImpl job, JobId jobId) {
-        return new TaskSplitMetaInfo[] { new TaskSplitMetaInfo(),
-            new TaskSplitMetaInfo() };
+        TaskSplitMetaInfo[] splits = new TaskSplitMetaInfo[numSplits];
+        for (int i = 0; i < numSplits; ++i) {
+          splits[i] = new TaskSplitMetaInfo();
+        }
+        return splits;
       }
     };
     return initTransition;
@@ -360,19 +468,24 @@ public class TestJobImpl {
   @Test
   public void testTransitionsAtFailed() throws IOException {
     Configuration conf = new Configuration();
-    JobID jobID = JobID.forName("job_1234567890000_0001");
-    JobId jobId = TypeConverter.toYarn(jobID);
+    AsyncDispatcher dispatcher = new AsyncDispatcher();
+    dispatcher.init(conf);
+    dispatcher.start();
+
     OutputCommitter committer = mock(OutputCommitter.class);
     doThrow(new IOException("forcefail"))
       .when(committer).setupJob(any(JobContext.class));
-    InlineDispatcher dispatcher = new InlineDispatcher();
-    JobImpl job = new StubbedJob(jobId, Records
-        .newRecord(ApplicationAttemptId.class), conf,
-        dispatcher.getEventHandler(), committer, true, null);
+    CommitterEventHandler commitHandler =
+        createCommitterEventHandler(dispatcher, committer);
+    commitHandler.init(conf);
+    commitHandler.start();
 
-    dispatcher.register(JobEventType.class, job);
+    JobImpl job = createStubbedJob(conf, dispatcher, 2);
+    JobId jobId = job.getID();
     job.handle(new JobEvent(jobId, JobEventType.JOB_INIT));
-    Assert.assertEquals(JobState.FAILED, job.getState());
+    assertJobState(job, JobStateInternal.INITED);
+    job.handle(new JobEvent(jobId, JobEventType.JOB_START));
+    assertJobState(job, JobStateInternal.FAILED);
 
     job.handle(new JobEvent(jobId, JobEventType.JOB_TASK_COMPLETED));
     Assert.assertEquals(JobState.FAILED, job.getState());
@@ -382,17 +495,86 @@ public class TestJobImpl {
     Assert.assertEquals(JobState.FAILED, job.getState());
     job.handle(new JobEvent(jobId, JobEventType.JOB_TASK_ATTEMPT_FETCH_FAILURE));
     Assert.assertEquals(JobState.FAILED, job.getState());
+
+    dispatcher.stop();
+    commitHandler.stop();
+  }
+
+  private static CommitterEventHandler createCommitterEventHandler(
+      Dispatcher dispatcher, OutputCommitter committer) {
+    SystemClock clock = new SystemClock();
+    AppContext appContext = mock(AppContext.class);
+    when(appContext.getEventHandler()).thenReturn(
+        dispatcher.getEventHandler());
+    when(appContext.getClock()).thenReturn(clock);
+    CommitterEventHandler handler =
+        new CommitterEventHandler(appContext, committer);
+    dispatcher.register(CommitterEventType.class, handler);
+    return handler;
+  }
+
+  private static StubbedJob createStubbedJob(Configuration conf,
+      Dispatcher dispatcher, int numSplits) {
+    JobID jobID = JobID.forName("job_1234567890000_0001");
+    JobId jobId = TypeConverter.toYarn(jobID);
+    StubbedJob job = new StubbedJob(jobId,
+        Records.newRecord(ApplicationAttemptId.class), conf,
+        dispatcher.getEventHandler(), true, "somebody", numSplits);
+    dispatcher.register(JobEventType.class, job);
+    EventHandler mockHandler = mock(EventHandler.class);
+    dispatcher.register(TaskEventType.class, mockHandler);
+    dispatcher.register(org.apache.hadoop.mapreduce.jobhistory.EventType.class,
+        mockHandler);
+    dispatcher.register(JobFinishEvent.Type.class, mockHandler);
+    return job;
+  }
+
+  private static StubbedJob createRunningStubbedJob(Configuration conf,
+      Dispatcher dispatcher, int numSplits) {
+    StubbedJob job = createStubbedJob(conf, dispatcher, numSplits);
+    job.handle(new JobEvent(job.getID(), JobEventType.JOB_INIT));
+    assertJobState(job, JobStateInternal.INITED);
+    job.handle(new JobEvent(job.getID(), JobEventType.JOB_START));
+    assertJobState(job, JobStateInternal.RUNNING);
+    return job;
+  }
+
+  private static void completeJobTasks(JobImpl job) {
+    // complete the map tasks and the reduce tasks so we start committing
+    int numMaps = job.getTotalMaps();
+    for (int i = 0; i < numMaps; ++i) {
+      job.handle(new JobTaskEvent(
+          MRBuilderUtils.newTaskId(job.getID(), 1, TaskType.MAP),
+          TaskState.SUCCEEDED));
+      Assert.assertEquals(JobState.RUNNING, job.getState());
+    }
+    int numReduces = job.getTotalReduces();
+    for (int i = 0; i < numReduces; ++i) {
+      job.handle(new JobTaskEvent(
+          MRBuilderUtils.newTaskId(job.getID(), 1, TaskType.MAP),
+          TaskState.SUCCEEDED));
+      Assert.assertEquals(JobState.RUNNING, job.getState());
+    }
+  }
+
+  private static void assertJobState(JobImpl job, JobStateInternal state) {
+    int timeToWaitMsec = 5 * 1000;
+    while (timeToWaitMsec > 0 && job.getInternalState() != state) {
+      try {
+        Thread.sleep(10);
+        timeToWaitMsec -= 10;
+      } catch (InterruptedException e) {
+        break;
+      }
+    }
+    Assert.assertEquals(state, job.getInternalState());
   }
 
   private static class StubbedJob extends JobImpl {
     //override the init transition
-    private final InitTransition initTransition = getInitTransition();
-    StateMachineFactory<JobImpl, JobStateInternal, JobEventType, JobEvent> localFactory
-        = stateMachineFactory.addTransition(JobStateInternal.NEW,
-            EnumSet.of(JobStateInternal.INITED, JobStateInternal.FAILED),
-            JobEventType.JOB_INIT,
-            // This is abusive.
-            initTransition);
+    private final InitTransition initTransition;
+    StateMachineFactory<JobImpl, JobStateInternal, JobEventType, JobEvent>
+        localFactory;
 
     private final StateMachine<JobStateInternal, JobEventType, JobEvent>
         localStateMachine;
@@ -404,15 +586,102 @@ public class TestJobImpl {
 
     public StubbedJob(JobId jobId, ApplicationAttemptId applicationAttemptId,
         Configuration conf, EventHandler eventHandler,
-        OutputCommitter committer, boolean newApiCommitter, String user) {
+        boolean newApiCommitter, String user, int numSplits) {
       super(jobId, applicationAttemptId, conf, eventHandler,
           null, new JobTokenSecretManager(), new Credentials(),
-          new SystemClock(), null, MRAppMetrics.create(), committer,
+          new SystemClock(), null, MRAppMetrics.create(),
           newApiCommitter, user, System.currentTimeMillis(), null, null);
 
+      initTransition = getInitTransition(numSplits);
+      localFactory = stateMachineFactory.addTransition(JobStateInternal.NEW,
+            EnumSet.of(JobStateInternal.INITED, JobStateInternal.FAILED),
+            JobEventType.JOB_INIT,
+            // This is abusive.
+            initTransition);
+
       // This "this leak" is okay because the retained pointer is in an
       //  instance variable.
       localStateMachine = localFactory.make(this);
     }
   }
+
+  private static class StubbedOutputCommitter extends OutputCommitter {
+
+    public StubbedOutputCommitter() {
+      super();
+    }
+
+    @Override
+    public void setupJob(JobContext jobContext) throws IOException {
+    }
+
+    @Override
+    public void setupTask(TaskAttemptContext taskContext) throws IOException {
+    }
+
+    @Override
+    public boolean needsTaskCommit(TaskAttemptContext taskContext)
+        throws IOException {
+      return false;
+    }
+
+    @Override
+    public void commitTask(TaskAttemptContext taskContext) throws IOException {
+    }
+
+    @Override
+    public void abortTask(TaskAttemptContext taskContext) throws IOException {
+    }
+  }
+
+  private static class TestingOutputCommitter extends StubbedOutputCommitter {
+    CyclicBarrier syncBarrier;
+    boolean shouldSucceed;
+
+    public TestingOutputCommitter(CyclicBarrier syncBarrier,
+        boolean shouldSucceed) {
+      super();
+      this.syncBarrier = syncBarrier;
+      this.shouldSucceed = shouldSucceed;
+    }
+
+    @Override
+    public void commitJob(JobContext jobContext) throws IOException {
+      try {
+        syncBarrier.await();
+      } catch (BrokenBarrierException e) {
+      } catch (InterruptedException e) {
+      }
+
+      if (!shouldSucceed) {
+        throw new IOException("forced failure");
+      }
+    }
+  }
+
+  private static class WaitingOutputCommitter extends TestingOutputCommitter {
+    public WaitingOutputCommitter(CyclicBarrier syncBarrier,
+        boolean shouldSucceed) {
+      super(syncBarrier, shouldSucceed);
+    }
+
+    @Override
+    public void commitJob(JobContext jobContext) throws IOException {
+      try {
+        syncBarrier.await();
+      } catch (BrokenBarrierException e) {
+      } catch (InterruptedException e) {
+      }
+
+      while (!Thread.interrupted()) {
+        try {
+          synchronized (this) {
+            wait();
+          }
+        } catch (InterruptedException e) {
+          break;
+        }
+      }
+    }
+  }
 }

Modified: hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskAttempt.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskAttempt.java?rev=1428601&r1=1428600&r2=1428601&view=diff
==============================================================================
--- hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskAttempt.java (original)
+++ hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskAttempt.java Thu Jan  3 21:23:58 2013
@@ -43,7 +43,6 @@ import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.MapTaskAttemptImpl;
 import org.apache.hadoop.mapreduce.JobCounter;
 import org.apache.hadoop.mapreduce.MRJobConfig;
-import org.apache.hadoop.mapreduce.OutputCommitter;
 import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEvent;
 import org.apache.hadoop.mapreduce.jobhistory.TaskAttemptUnsuccessfulCompletion;
 import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitMetaInfo;
@@ -253,10 +252,9 @@ public class TestTaskAttempt{
     TaskAttemptListener taListener = mock(TaskAttemptListener.class);
     Path jobFile = mock(Path.class);
     JobConf jobConf = new JobConf();
-    OutputCommitter outputCommitter = mock(OutputCommitter.class);
     TaskAttemptImpl taImpl =
         new MapTaskAttemptImpl(taskId, 1, eventHandler, jobFile, 1,
-            taskSplitMetaInfo, jobConf, taListener, outputCommitter, null,
+            taskSplitMetaInfo, jobConf, taListener, null,
             null, clock, null);
     return taImpl;
   }
@@ -342,7 +340,7 @@ public class TestTaskAttempt{
     TaskAttemptImpl taImpl =
       new MapTaskAttemptImpl(taskId, 1, eventHandler, jobFile, 1,
           splits, jobConf, taListener,
-          mock(OutputCommitter.class), mock(Token.class), new Credentials(),
+          mock(Token.class), new Credentials(),
           new SystemClock(), null);
 
     NodeId nid = BuilderUtils.newNodeId("127.0.0.1", 0);
@@ -397,7 +395,7 @@ public class TestTaskAttempt{
     TaskAttemptImpl taImpl =
       new MapTaskAttemptImpl(taskId, 1, eventHandler, jobFile, 1,
           splits, jobConf, taListener,
-          mock(OutputCommitter.class), mock(Token.class), new Credentials(),
+          mock(Token.class), new Credentials(),
           new SystemClock(), appCtx);
 
     NodeId nid = BuilderUtils.newNodeId("127.0.0.1", 0);
@@ -453,7 +451,7 @@ public class TestTaskAttempt{
     TaskAttemptImpl taImpl =
       new MapTaskAttemptImpl(taskId, 1, eventHandler, jobFile, 1,
           splits, jobConf, taListener,
-          mock(OutputCommitter.class), mock(Token.class), new Credentials(),
+          mock(Token.class), new Credentials(),
           new SystemClock(), appCtx);
 
     NodeId nid = BuilderUtils.newNodeId("127.0.0.1", 0);
@@ -512,7 +510,7 @@ public class TestTaskAttempt{
     TaskAttemptImpl taImpl =
       new MapTaskAttemptImpl(taskId, 1, eventHandler, jobFile, 1,
           splits, jobConf, taListener,
-          mock(OutputCommitter.class), mock(Token.class), new Credentials(),
+          mock(Token.class), new Credentials(),
           new SystemClock(), appCtx);
 
     NodeId nid = BuilderUtils.newNodeId("127.0.0.1", 0);
@@ -578,7 +576,7 @@ public class TestTaskAttempt{
     when(resource.getMemory()).thenReturn(1024);
 
     TaskAttemptImpl taImpl = new MapTaskAttemptImpl(taskId, 1, eventHandler,
-        jobFile, 1, splits, jobConf, taListener, mock(OutputCommitter.class),
+        jobFile, 1, splits, jobConf, taListener,
         mock(Token.class), new Credentials(), new SystemClock(), appCtx);
 
     NodeId nid = BuilderUtils.newNodeId("127.0.0.1", 0);
@@ -628,7 +626,7 @@ public class TestTaskAttempt{
     when(resource.getMemory()).thenReturn(1024);
 
     TaskAttemptImpl taImpl = new MapTaskAttemptImpl(taskId, 1, eventHandler,
-        jobFile, 1, splits, jobConf, taListener, mock(OutputCommitter.class),
+        jobFile, 1, splits, jobConf, taListener,
         mock(Token.class), new Credentials(), new SystemClock(), appCtx);
 
     NodeId nid = BuilderUtils.newNodeId("127.0.0.1", 0);

Modified: hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskAttemptContainerRequest.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskAttemptContainerRequest.java?rev=1428601&r1=1428600&r2=1428601&view=diff
==============================================================================
--- hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskAttemptContainerRequest.java (original)
+++ hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskAttemptContainerRequest.java Thu Jan  3 21:23:58 2013
@@ -40,7 +40,6 @@ import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.MapTaskAttemptImpl;
 import org.apache.hadoop.mapred.WrappedJvmID;
 import org.apache.hadoop.mapreduce.MRJobConfig;
-import org.apache.hadoop.mapreduce.OutputCommitter;
 import org.apache.hadoop.mapreduce.TypeConverter;
 import org.apache.hadoop.mapreduce.security.token.JobTokenIdentifier;
 import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitMetaInfo;
@@ -107,7 +106,7 @@ public class TestTaskAttemptContainerReq
     TaskAttemptImpl taImpl =
         new MapTaskAttemptImpl(taskId, 1, eventHandler, jobFile, 1,
             mock(TaskSplitMetaInfo.class), jobConf, taListener,
-            mock(OutputCommitter.class), jobToken, credentials,
+            jobToken, credentials,
             new SystemClock(), null);
 
     jobConf.set(MRJobConfig.APPLICATION_ATTEMPT_ID, taImpl.getID().toString());

Modified: hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskImpl.java?rev=1428601&r1=1428600&r2=1428601&view=diff
==============================================================================
--- hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskImpl.java (original)
+++ hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskImpl.java Thu Jan  3 21:23:58 2013
@@ -35,7 +35,6 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.Task;
 import org.apache.hadoop.mapred.TaskUmbilicalProtocol;
-import org.apache.hadoop.mapreduce.OutputCommitter;
 import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.TaskInfo;
 import org.apache.hadoop.mapreduce.security.token.JobTokenIdentifier;
 import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitMetaInfo;
@@ -71,7 +70,6 @@ public class TestTaskImpl {
   
   private JobConf conf;
   private TaskAttemptListener taskAttemptListener;
-  private OutputCommitter committer;
   private Token<JobTokenIdentifier> jobToken;
   private JobId jobId;
   private Path remoteJobConfFile;
@@ -99,13 +97,13 @@ public class TestTaskImpl {
 
     public MockTaskImpl(JobId jobId, int partition,
         EventHandler eventHandler, Path remoteJobConfFile, JobConf conf,
-        TaskAttemptListener taskAttemptListener, OutputCommitter committer,
+        TaskAttemptListener taskAttemptListener,
         Token<JobTokenIdentifier> jobToken,
         Credentials credentials, Clock clock,
         Map<TaskId, TaskInfo> completedTasksFromPreviousRun, int startCount,
         MRAppMetrics metrics, AppContext appContext, TaskType taskType) {
       super(jobId, taskType , partition, eventHandler,
-          remoteJobConfFile, conf, taskAttemptListener, committer, 
+          remoteJobConfFile, conf, taskAttemptListener,
           jobToken, credentials, clock,
           completedTasksFromPreviousRun, startCount, metrics, appContext);
       this.taskType = taskType;
@@ -120,7 +118,7 @@ public class TestTaskImpl {
     protected TaskAttemptImpl createAttempt() {
       MockTaskAttemptImpl attempt = new MockTaskAttemptImpl(getID(), ++taskAttemptCounter, 
           eventHandler, taskAttemptListener, remoteJobConfFile, partition,
-          conf, committer, jobToken, credentials, clock, appContext, taskType);
+          conf, jobToken, credentials, clock, appContext, taskType);
       taskAttempts.add(attempt);
       return attempt;
     }
@@ -145,12 +143,11 @@ public class TestTaskImpl {
 
     public MockTaskAttemptImpl(TaskId taskId, int id, EventHandler eventHandler,
         TaskAttemptListener taskAttemptListener, Path jobFile, int partition,
-        JobConf conf, OutputCommitter committer,
-        Token<JobTokenIdentifier> jobToken,
+        JobConf conf, Token<JobTokenIdentifier> jobToken,
         Credentials credentials, Clock clock,
         AppContext appContext, TaskType taskType) {
       super(taskId, id, eventHandler, taskAttemptListener, jobFile, partition, conf,
-          dataLocations, committer, jobToken, credentials, clock, appContext);
+          dataLocations, jobToken, credentials, clock, appContext);
       this.taskType = taskType;
     }
 
@@ -210,7 +207,6 @@ public class TestTaskImpl {
     
     conf = new JobConf();
     taskAttemptListener = mock(TaskAttemptListener.class);
-    committer = mock(OutputCommitter.class);
     jobToken = (Token<JobTokenIdentifier>) mock(Token.class);
     remoteJobConfFile = mock(Path.class);
     credentials = null;
@@ -235,7 +231,7 @@ public class TestTaskImpl {
   
   private MockTaskImpl createMockTask(TaskType taskType) {
     return new MockTaskImpl(jobId, partition, dispatcher.getEventHandler(),
-        remoteJobConfFile, conf, taskAttemptListener, committer, jobToken,
+        remoteJobConfFile, conf, taskAttemptListener, jobToken,
         credentials, clock,
         completedTasksFromPreviousRun, startCount,
         metrics, appContext, taskType);
@@ -602,4 +598,73 @@ public class TestTaskImpl {
     assertTaskScheduledState();
     assertEquals(3, taskAttempts.size());
   }
+
+  @Test
+  public void testFailedTransitions() {
+    mockTask = new MockTaskImpl(jobId, partition, dispatcher.getEventHandler(),
+        remoteJobConfFile, conf, taskAttemptListener, jobToken,
+        credentials, clock,
+        completedTasksFromPreviousRun, startCount,
+        metrics, appContext, TaskType.MAP) {
+          @Override
+          protected int getMaxAttempts() {
+            return 1;
+          }
+    };
+    TaskId taskId = getNewTaskID();
+    scheduleTaskAttempt(taskId);
+    launchTaskAttempt(getLastAttempt().getAttemptId());
+
+    // add three more speculative attempts
+    mockTask.handle(new TaskTAttemptEvent(getLastAttempt().getAttemptId(),
+        TaskEventType.T_ADD_SPEC_ATTEMPT));
+    launchTaskAttempt(getLastAttempt().getAttemptId());
+    mockTask.handle(new TaskTAttemptEvent(getLastAttempt().getAttemptId(),
+        TaskEventType.T_ADD_SPEC_ATTEMPT));
+    launchTaskAttempt(getLastAttempt().getAttemptId());
+    mockTask.handle(new TaskTAttemptEvent(getLastAttempt().getAttemptId(),
+        TaskEventType.T_ADD_SPEC_ATTEMPT));
+    launchTaskAttempt(getLastAttempt().getAttemptId());
+    assertEquals(4, taskAttempts.size());
+
+    // have the first attempt fail, verify task failed due to no retries
+    MockTaskAttemptImpl taskAttempt = taskAttempts.get(0);
+    taskAttempt.setState(TaskAttemptState.FAILED);
+    mockTask.handle(new TaskTAttemptEvent(taskAttempt.getAttemptId(),
+        TaskEventType.T_ATTEMPT_FAILED));
+    assertEquals(TaskState.FAILED, mockTask.getState());
+
+    // verify task can no longer be killed
+    mockTask.handle(new TaskEvent(taskId, TaskEventType.T_KILL));
+    assertEquals(TaskState.FAILED, mockTask.getState());
+
+    // verify speculative doesn't launch new tasks
+    mockTask.handle(new TaskTAttemptEvent(getLastAttempt().getAttemptId(),
+        TaskEventType.T_ADD_SPEC_ATTEMPT));
+    mockTask.handle(new TaskTAttemptEvent(getLastAttempt().getAttemptId(),
+        TaskEventType.T_ATTEMPT_LAUNCHED));
+    assertEquals(TaskState.FAILED, mockTask.getState());
+    assertEquals(4, taskAttempts.size());
+
+    // verify attempt events from active tasks don't knock task out of FAILED
+    taskAttempt = taskAttempts.get(1);
+    taskAttempt.setState(TaskAttemptState.COMMIT_PENDING);
+    mockTask.handle(new TaskTAttemptEvent(taskAttempt.getAttemptId(),
+        TaskEventType.T_ATTEMPT_COMMIT_PENDING));
+    assertEquals(TaskState.FAILED, mockTask.getState());
+    taskAttempt.setState(TaskAttemptState.FAILED);
+    mockTask.handle(new TaskTAttemptEvent(taskAttempt.getAttemptId(),
+        TaskEventType.T_ATTEMPT_FAILED));
+    assertEquals(TaskState.FAILED, mockTask.getState());
+    taskAttempt = taskAttempts.get(2);
+    taskAttempt.setState(TaskAttemptState.SUCCEEDED);
+    mockTask.handle(new TaskTAttemptEvent(taskAttempt.getAttemptId(),
+        TaskEventType.T_ATTEMPT_SUCCEEDED));
+    assertEquals(TaskState.FAILED, mockTask.getState());
+    taskAttempt = taskAttempts.get(3);
+    taskAttempt.setState(TaskAttemptState.KILLED);
+    mockTask.handle(new TaskTAttemptEvent(taskAttempt.getAttemptId(),
+        TaskEventType.T_ATTEMPT_KILLED));
+    assertEquals(TaskState.FAILED, mockTask.getState());
+  }
 }

Modified: hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/launcher/TestContainerLauncherImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/launcher/TestContainerLauncherImpl.java?rev=1428601&r1=1428600&r2=1428601&view=diff
==============================================================================
--- hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/launcher/TestContainerLauncherImpl.java (original)
+++ hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/launcher/TestContainerLauncherImpl.java Thu Jan  3 21:23:58 2013
@@ -6,8 +6,12 @@ import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.never;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
+import static org.mockito.Mockito.atLeast;
+import org.mockito.ArgumentCaptor;
 
 import java.net.InetSocketAddress;
+import java.util.concurrent.BrokenBarrierException;
+import java.util.concurrent.CyclicBarrier;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -18,15 +22,21 @@ import org.apache.hadoop.mapreduce.v2.ap
 import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
 import org.apache.hadoop.mapreduce.v2.app.AppContext;
+import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType;
 import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerLauncher.EventType;
 import org.apache.hadoop.mapreduce.v2.util.MRBuilderUtils;
 import org.apache.hadoop.yarn.api.ContainerManager;
+import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusResponse;
 import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.StartContainerResponse;
 import org.apache.hadoop.yarn.api.protocolrecords.StopContainerRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.StopContainerResponse;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.event.Event;
 import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
 import org.apache.hadoop.yarn.factories.RecordFactory;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
 import org.apache.hadoop.yarn.ipc.YarnRPC;
@@ -272,6 +282,150 @@ public class TestContainerLauncherImpl {
     } finally {
       ut.stop();
       verify(mockCM).stopContainer(any(StopContainerRequest.class));
-}
+    }
+  }
+  
+  @SuppressWarnings({ "rawtypes", "unchecked" })
+  @Test
+  public void testContainerCleaned() throws Exception {
+    LOG.info("STARTING testContainerCleaned");
+    
+    CyclicBarrier startLaunchBarrier = new CyclicBarrier(2);
+    CyclicBarrier completeLaunchBarrier = new CyclicBarrier(2);
+
+    YarnRPC mockRpc = mock(YarnRPC.class);
+    AppContext mockContext = mock(AppContext.class);
+    
+    EventHandler mockEventHandler = mock(EventHandler.class);
+    when(mockContext.getEventHandler()).thenReturn(mockEventHandler);
+
+    ContainerManager mockCM = new ContainerManagerForTest(startLaunchBarrier, completeLaunchBarrier);
+    when(mockRpc.getProxy(eq(ContainerManager.class), 
+        any(InetSocketAddress.class), any(Configuration.class)))
+        .thenReturn(mockCM);
+    
+    ContainerLauncherImplUnderTest ut = 
+      new ContainerLauncherImplUnderTest(mockContext, mockRpc);
+    
+    Configuration conf = new Configuration();
+    ut.init(conf);
+    ut.start();
+    try {
+      ContainerId contId = makeContainerId(0l, 0, 0, 1);
+      TaskAttemptId taskAttemptId = makeTaskAttemptId(0l, 0, 0, TaskType.MAP, 0);
+      String cmAddress = "127.0.0.1:8000";
+      StartContainerResponse startResp = 
+        recordFactory.newRecordInstance(StartContainerResponse.class);
+      startResp.setServiceResponse(ShuffleHandler.MAPREDUCE_SHUFFLE_SERVICEID, 
+          ShuffleHandler.serializeMetaData(80));
+      
+     
+      LOG.info("inserting launch event");
+      ContainerRemoteLaunchEvent mockLaunchEvent = 
+        mock(ContainerRemoteLaunchEvent.class);
+      when(mockLaunchEvent.getType())
+        .thenReturn(EventType.CONTAINER_REMOTE_LAUNCH);
+      when(mockLaunchEvent.getContainerID())
+        .thenReturn(contId);
+      when(mockLaunchEvent.getTaskAttemptID()).thenReturn(taskAttemptId);
+      when(mockLaunchEvent.getContainerMgrAddress()).thenReturn(cmAddress);
+      ut.handle(mockLaunchEvent);
+      
+      startLaunchBarrier.await();
+      
+           
+      LOG.info("inserting cleanup event");
+      ContainerLauncherEvent mockCleanupEvent = 
+        mock(ContainerLauncherEvent.class);
+      when(mockCleanupEvent.getType())
+        .thenReturn(EventType.CONTAINER_REMOTE_CLEANUP);
+      when(mockCleanupEvent.getContainerID())
+        .thenReturn(contId);
+      when(mockCleanupEvent.getTaskAttemptID()).thenReturn(taskAttemptId);
+      when(mockCleanupEvent.getContainerMgrAddress()).thenReturn(cmAddress);
+      ut.handle(mockCleanupEvent);
+
+      completeLaunchBarrier.await();
+     
+      ut.waitForPoolToIdle();
+      
+      ArgumentCaptor<Event> arg = ArgumentCaptor.forClass(Event.class);
+      verify(mockEventHandler, atLeast(2)).handle(arg.capture());
+      boolean containerCleaned = false;
+      
+      for (int i =0; i < arg.getAllValues().size(); i++) {
+        LOG.info(arg.getAllValues().get(i).toString());
+        Event currentEvent = arg.getAllValues().get(i);
+        if (currentEvent.getType() == TaskAttemptEventType.TA_CONTAINER_CLEANED) {
+          containerCleaned = true;
+        }
+      }
+      assert(containerCleaned);
+      
+    } finally {
+      ut.stop();
+    }
+  }
+  
+  private static class ContainerManagerForTest implements ContainerManager {
+
+    private CyclicBarrier startLaunchBarrier;
+    private CyclicBarrier completeLaunchBarrier;
+
+    ContainerManagerForTest (CyclicBarrier startLaunchBarrier, CyclicBarrier completeLaunchBarrier) {
+      this.startLaunchBarrier = startLaunchBarrier;
+      this.completeLaunchBarrier = completeLaunchBarrier;
+    }
+    @Override
+    public StartContainerResponse startContainer(StartContainerRequest request)
+        throws YarnRemoteException {
+      try {
+        startLaunchBarrier.await();
+        completeLaunchBarrier.await();
+        //To ensure the kill is started before the launch
+        Thread.sleep(100);
+      } catch (InterruptedException e) {
+        e.printStackTrace();
+      } catch (BrokenBarrierException e) {
+        e.printStackTrace();
+      } 
+      
+      throw new ContainerException("Force fail CM");
+      
+    }
+
+    @Override
+    public StopContainerResponse stopContainer(StopContainerRequest request)
+        throws YarnRemoteException {
+    
+      return null;
+    }
+
+    @Override
+    public GetContainerStatusResponse getContainerStatus(
+        GetContainerStatusRequest request) throws YarnRemoteException {
+    
+      return null;
+    }
   }
+  
+  @SuppressWarnings("serial")
+  private static class ContainerException extends YarnRemoteException {
+
+    public ContainerException(String message) {
+      super(message);
+    }
+
+    @Override
+    public String getRemoteTrace() {
+      return null;
+    }
+
+    @Override
+    public YarnRemoteException getCause() {
+      return null;
+    }
+    
+  }
+  
 }

Modified: hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/ClusterStatus.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/ClusterStatus.java?rev=1428601&r1=1428600&r2=1428601&view=diff
==============================================================================
--- hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/ClusterStatus.java (original)
+++ hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/ClusterStatus.java Thu Jan  3 21:23:58 2013
@@ -175,6 +175,8 @@ public class ClusterStatus implements Wr
     
   }
   
+  public static final int UNINITIALIZED_MEMORY_VALUE = -1;
+  
   private int numActiveTrackers;
   private Collection<String> activeTrackers = new ArrayList<String>();
   private int numBlacklistedTrackers;
@@ -384,6 +386,22 @@ public class ClusterStatus implements Wr
   public JobTrackerStatus getJobTrackerStatus() {
     return status;
   }
+  
+  /**
+   * Returns UNINITIALIZED_MEMORY_VALUE (-1)
+   */
+  @Deprecated
+  public long getMaxMemory() {
+    return UNINITIALIZED_MEMORY_VALUE;
+  }
+  
+  /**
+   * Returns UNINITIALIZED_MEMORY_VALUE (-1)
+   */
+  @Deprecated
+  public long getUsedMemory() {
+    return UNINITIALIZED_MEMORY_VALUE;
+  }
 
   /**
    * Gets the list of blacklisted trackers along with reasons for blacklisting.

Modified: hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/SortedRanges.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/SortedRanges.java?rev=1428601&r1=1428600&r2=1428601&view=diff
==============================================================================
--- hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/SortedRanges.java (original)
+++ hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/SortedRanges.java Thu Jan  3 21:23:58 2013
@@ -271,7 +271,7 @@ class SortedRanges implements Writable{
     }
     
     public boolean equals(Object o) {
-      if(o!=null && o instanceof Range) {
+      if (o instanceof Range) {
         Range range = (Range)o;
         return startIndex==range.startIndex &&
         length==range.length;
@@ -285,10 +285,11 @@ class SortedRanges implements Writable{
     }
     
     public int compareTo(Range o) {
-      if(this.equals(o)) {
-        return 0;
-      }
-      return (this.startIndex > o.startIndex) ? 1:-1;
+      // Ensure sgn(x.compareTo(y) == -sgn(y.compareTo(x))
+      return this.startIndex < o.startIndex ? -1 :
+          (this.startIndex > o.startIndex ? 1 :
+          (this.length < o.length ? -1 :
+          (this.length > o.length ? 1 : 0)));
     }
 
     public void readFields(DataInput in) throws IOException {

Modified: hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java?rev=1428601&r1=1428600&r2=1428601&view=diff
==============================================================================
--- hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java (original)
+++ hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java Thu Jan  3 21:23:58 2013
@@ -465,6 +465,15 @@ public interface MRJobConfig {
   public static final int DEFAULT_MR_AM_TO_RM_WAIT_INTERVAL_MS = 360000;
 
   /**
+   * How long to wait in milliseconds for the output committer to cancel
+   * an operation when the job is being killed
+   */
+  public static final String MR_AM_COMMITTER_CANCEL_TIMEOUT_MS =
+      MR_AM_PREFIX + "job.committer.cancel-timeout";
+  public static final int DEFAULT_MR_AM_COMMITTER_CANCEL_TIMEOUT_MS =
+      60 * 1000;
+
+  /**
    * Boolean. Create the base dirs in the JobHistoryEventHandler
    * Set to false for multi-user clusters.  This is an internal config that
    * is set by the MR framework and read by it too.

Modified: hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java?rev=1428601&r1=1428600&r2=1428601&view=diff
==============================================================================
--- hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java (original)
+++ hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java Thu Jan  3 21:23:58 2013
@@ -282,7 +282,7 @@ class Fetcher<K,V> extends Thread {
       LOG.debug("url="+msgToEncode+";encHash="+encHash+";replyHash="+replyHash);
       // verify that replyHash is HMac of encHash
       SecureShuffleUtils.verifyReply(replyHash, encHash, jobTokenSecret);
-      LOG.info("for url="+msgToEncode+" sent hash and receievd reply");
+      LOG.info("for url="+msgToEncode+" sent hash and received reply");
     } catch (IOException ie) {
       boolean connectExcpt = ie instanceof ConnectException;
       ioErrs.increment(1);

Modified: hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/MergeManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/MergeManager.java?rev=1428601&r1=1428600&r2=1428601&view=diff
==============================================================================
--- hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/MergeManager.java (original)
+++ hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/MergeManager.java Thu Jan  3 21:23:58 2013
@@ -58,7 +58,9 @@ import org.apache.hadoop.mapreduce.task.
 import org.apache.hadoop.util.Progress;
 import org.apache.hadoop.util.ReflectionUtils;
 
-@SuppressWarnings(value={"unchecked", "deprecation"})
+import com.google.common.annotations.VisibleForTesting;
+
+@SuppressWarnings(value={"unchecked"})
 @InterfaceAudience.LimitedPrivate({"MapReduce"})
 @InterfaceStability.Unstable
 public class MergeManager<K, V> {
@@ -85,7 +87,7 @@ public class MergeManager<K, V> {
 
   Set<MapOutput<K, V>> inMemoryMapOutputs = 
     new TreeSet<MapOutput<K,V>>(new MapOutputComparator<K, V>());
-  private final InMemoryMerger inMemoryMerger;
+  private final MergeThread<MapOutput<K,V>, K,V> inMemoryMerger;
   
   Set<Path> onDiskMapOutputs = new TreeSet<Path>();
   private final OnDiskMerger onDiskMerger;
@@ -179,6 +181,8 @@ public class MergeManager<K, V> {
           + singleShuffleMemoryLimitPercent);
     }
 
+    usedMemory = 0L;
+    commitMemory = 0L;
     this.maxSingleShuffleLimit = 
       (long)(memoryLimit * singleShuffleMemoryLimitPercent);
     this.memToMemMergeOutputsThreshold = 
@@ -210,7 +214,7 @@ public class MergeManager<K, V> {
       this.memToMemMerger = null;
     }
     
-    this.inMemoryMerger = new InMemoryMerger(this);
+    this.inMemoryMerger = createInMemoryMerger();
     this.inMemoryMerger.start();
     
     this.onDiskMerger = new OnDiskMerger(this);
@@ -219,11 +223,19 @@ public class MergeManager<K, V> {
     this.mergePhase = mergePhase;
   }
   
+  protected MergeThread<MapOutput<K,V>, K,V> createInMemoryMerger() {
+    return new InMemoryMerger(this);
+  }
 
   TaskAttemptID getReduceId() {
     return reduceId;
   }
 
+  @VisibleForTesting
+  ExceptionReporter getExceptionReporter() {
+    return exceptionReporter;
+  }
+
   public void waitForInMemoryMerge() throws InterruptedException {
     inMemoryMerger.waitForMerge();
   }
@@ -288,7 +300,6 @@ public class MergeManager<K, V> {
   }
   
   synchronized void unreserve(long size) {
-    commitMemory -= size;
     usedMemory -= size;
   }
 
@@ -300,24 +311,20 @@ public class MergeManager<K, V> {
 
     commitMemory+= mapOutput.getSize();
 
-    synchronized (inMemoryMerger) {
-      // Can hang if mergeThreshold is really low.
-      if (!inMemoryMerger.isInProgress() && commitMemory >= mergeThreshold) {
-        LOG.info("Starting inMemoryMerger's merge since commitMemory=" +
-            commitMemory + " > mergeThreshold=" + mergeThreshold + 
-            ". Current usedMemory=" + usedMemory);
-        inMemoryMapOutputs.addAll(inMemoryMergedMapOutputs);
-        inMemoryMergedMapOutputs.clear();
-        inMemoryMerger.startMerge(inMemoryMapOutputs);
-      } 
+    // Can hang if mergeThreshold is really low.
+    if (commitMemory >= mergeThreshold) {
+      LOG.info("Starting inMemoryMerger's merge since commitMemory=" +
+          commitMemory + " > mergeThreshold=" + mergeThreshold + 
+          ". Current usedMemory=" + usedMemory);
+      inMemoryMapOutputs.addAll(inMemoryMergedMapOutputs);
+      inMemoryMergedMapOutputs.clear();
+      inMemoryMerger.startMerge(inMemoryMapOutputs);
+      commitMemory = 0L;  // Reset commitMemory.
     }
     
     if (memToMemMerger != null) {
-      synchronized (memToMemMerger) {
-        if (!memToMemMerger.isInProgress() && 
-            inMemoryMapOutputs.size() >= memToMemMergeOutputsThreshold) {
-          memToMemMerger.startMerge(inMemoryMapOutputs);
-        }
+      if (inMemoryMapOutputs.size() >= memToMemMergeOutputsThreshold) { 
+        memToMemMerger.startMerge(inMemoryMapOutputs);
       }
     }
   }
@@ -333,11 +340,8 @@ public class MergeManager<K, V> {
   public synchronized void closeOnDiskFile(Path file) {
     onDiskMapOutputs.add(file);
     
-    synchronized (onDiskMerger) {
-      if (!onDiskMerger.isInProgress() && 
-          onDiskMapOutputs.size() >= (2 * ioSortFactor - 1)) {
-        onDiskMerger.startMerge(onDiskMapOutputs);
-      }
+    if (onDiskMapOutputs.size() >= (2 * ioSortFactor - 1)) {
+      onDiskMerger.startMerge(onDiskMapOutputs);
     }
   }
   

Modified: hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/MergeThread.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/MergeThread.java?rev=1428601&r1=1428600&r2=1428601&view=diff
==============================================================================
--- hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/MergeThread.java (original)
+++ hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/MergeThread.java Thu Jan  3 21:23:58 2013
@@ -20,8 +20,10 @@ package org.apache.hadoop.mapreduce.task
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Iterator;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -30,8 +32,8 @@ abstract class MergeThread<T,K,V> extend
   
   private static final Log LOG = LogFactory.getLog(MergeThread.class);
 
-  private volatile boolean inProgress = false;
-  private List<T> inputs = new ArrayList<T>();
+  private AtomicInteger numPending = new AtomicInteger(0);
+  private LinkedList<List<T>> pendingToBeMerged;
   protected final MergeManager<K,V> manager;
   private final ExceptionReporter reporter;
   private boolean closed = false;
@@ -39,6 +41,7 @@ abstract class MergeThread<T,K,V> extend
   
   public MergeThread(MergeManager<K,V> manager, int mergeFactor,
                      ExceptionReporter reporter) {
+    this.pendingToBeMerged = new LinkedList<List<T>>();
     this.manager = manager;
     this.mergeFactor = mergeFactor;
     this.reporter = reporter;
@@ -50,53 +53,55 @@ abstract class MergeThread<T,K,V> extend
     interrupt();
   }
 
-  public synchronized boolean isInProgress() {
-    return inProgress;
-  }
-  
-  public synchronized void startMerge(Set<T> inputs) {
+  public void startMerge(Set<T> inputs) {
     if (!closed) {
-      inProgress = true;
-      this.inputs = new ArrayList<T>();
+      numPending.incrementAndGet();
+      List<T> toMergeInputs = new ArrayList<T>();
       Iterator<T> iter=inputs.iterator();
       for (int ctr = 0; iter.hasNext() && ctr < mergeFactor; ++ctr) {
-        this.inputs.add(iter.next());
+        toMergeInputs.add(iter.next());
         iter.remove();
       }
-      LOG.info(getName() + ": Starting merge with " + this.inputs.size() + 
+      LOG.info(getName() + ": Starting merge with " + toMergeInputs.size() + 
                " segments, while ignoring " + inputs.size() + " segments");
-      notifyAll();
+      synchronized(pendingToBeMerged) {
+        pendingToBeMerged.addLast(toMergeInputs);
+        pendingToBeMerged.notifyAll();
+      }
     }
   }
 
   public synchronized void waitForMerge() throws InterruptedException {
-    while (inProgress) {
+    while (numPending.get() > 0) {
       wait();
     }
   }
 
   public void run() {
     while (true) {
+      List<T> inputs = null;
       try {
         // Wait for notification to start the merge...
-        synchronized (this) {
-          while (!inProgress) {
-            wait();
+        synchronized (pendingToBeMerged) {
+          while(pendingToBeMerged.size() <= 0) {
+            pendingToBeMerged.wait();
           }
+          // Pickup the inputs to merge.
+          inputs = pendingToBeMerged.removeFirst();
         }
 
         // Merge
         merge(inputs);
       } catch (InterruptedException ie) {
+        numPending.set(0);
         return;
       } catch(Throwable t) {
+        numPending.set(0);
         reporter.reportException(t);
         return;
       } finally {
         synchronized (this) {
-          // Clear inputs
-          inputs = null;
-          inProgress = false;        
+          numPending.decrementAndGet();
           notifyAll();
         }
       }

Modified: hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml?rev=1428601&r1=1428600&r2=1428601&view=diff
==============================================================================
--- hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml (original)
+++ hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml Thu Jan  3 21:23:58 2013
@@ -874,6 +874,13 @@
 </property>
 
 <property>
+  <name>yarn.app.mapreduce.am.job.committer.cancel-timeout</name>
+  <value>60000</value>
+  <description>The amount of time in milliseconds to wait for the output
+    committer to cancel an operation if the job is killed</description>
+</property>
+
+<property>
   <name>yarn.app.mapreduce.am.scheduler.heartbeat.interval-ms</name>
   <value>1000</value>
   <description>The interval in ms at which the MR AppMaster should send

Propchange: hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml
------------------------------------------------------------------------------
  Merged /hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml:r1423068-1428155

Modified: hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/pom.xml
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/pom.xml?rev=1428601&r1=1428600&r2=1428601&view=diff
==============================================================================
--- hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/pom.xml (original)
+++ hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/pom.xml Thu Jan  3 21:23:58 2013
@@ -198,5 +198,6 @@
     <module>hadoop-mapreduce-client-app</module>
     <module>hadoop-mapreduce-client-jobclient</module>
     <module>hadoop-mapreduce-client-hs</module>
+    <module>hadoop-mapreduce-client-hs-plugins</module>
   </modules>
 </project>