You are viewing a plain text version of this content. The canonical link for it is here.
Posted to mapreduce-commits@hadoop.apache.org by su...@apache.org on 2013/01/03 22:24:08 UTC
svn commit: r1428601 [2/2] - in
/hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project: ./ conf/
hadoop-mapreduce-client/
hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/
hadoop-mapreduce-client/had...
Modified: hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestStagingCleanup.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestStagingCleanup.java?rev=1428601&r1=1428600&r2=1428601&view=diff
==============================================================================
--- hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestStagingCleanup.java (original)
+++ hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestStagingCleanup.java Thu Jan 3 21:23:58 2013
@@ -201,8 +201,7 @@ import org.junit.Test;
Job newJob = new TestJob(getJobId(), getAttemptID(), conf,
getDispatcher().getEventHandler(),
getTaskAttemptListener(), getContext().getClock(),
- getCommitter(), isNewApiCommitter(),
- currentUser.getUserName(), getContext());
+ isNewApiCommitter(), currentUser.getUserName(), getContext());
((AppContext) getContext()).getAllJobs().put(newJob.getID(), newJob);
getDispatcher().register(JobFinishEvent.Type.class,
Modified: hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestJobImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestJobImpl.java?rev=1428601&r1=1428600&r2=1428601&view=diff
==============================================================================
--- hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestJobImpl.java (original)
+++ hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestJobImpl.java Thu Jan 3 21:23:58 2013
@@ -19,46 +19,51 @@
package org.apache.hadoop.mapreduce.v2.app.job.impl;
import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.eq;
-import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import java.io.IOException;
import java.util.EnumSet;
-import java.util.HashMap;
-import java.util.Map;
+import java.util.concurrent.BrokenBarrierException;
+import java.util.concurrent.CyclicBarrier;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.JobACL;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.JobID;
+import org.apache.hadoop.mapreduce.JobStatus.State;
import org.apache.hadoop.mapreduce.MRConfig;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.OutputCommitter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.TypeConverter;
-import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEvent;
import org.apache.hadoop.mapreduce.security.token.JobTokenSecretManager;
import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitMetaInfo;
import org.apache.hadoop.mapreduce.v2.api.records.JobId;
import org.apache.hadoop.mapreduce.v2.api.records.JobState;
-import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskState;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
+import org.apache.hadoop.mapreduce.v2.app.AppContext;
+import org.apache.hadoop.mapreduce.v2.app.commit.CommitterEventHandler;
+import org.apache.hadoop.mapreduce.v2.app.commit.CommitterEventType;
import org.apache.hadoop.mapreduce.v2.app.job.JobStateInternal;
-import org.apache.hadoop.mapreduce.v2.app.job.Task;
import org.apache.hadoop.mapreduce.v2.app.job.event.JobDiagnosticsUpdateEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.JobEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.JobEventType;
+import org.apache.hadoop.mapreduce.v2.app.job.event.JobFinishEvent;
+import org.apache.hadoop.mapreduce.v2.app.job.event.JobTaskEvent;
+import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEventType;
import org.apache.hadoop.mapreduce.v2.app.job.impl.JobImpl.InitTransition;
-import org.apache.hadoop.mapreduce.v2.app.job.impl.JobImpl.JobNoTasksCompletedTransition;
import org.apache.hadoop.mapreduce.v2.app.metrics.MRAppMetrics;
+import org.apache.hadoop.mapreduce.v2.util.MRBuilderUtils;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.SystemClock;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.event.AsyncDispatcher;
+import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.event.EventHandler;
-import org.apache.hadoop.yarn.server.resourcemanager.resourcetracker.InlineDispatcher;
import org.apache.hadoop.yarn.state.StateMachine;
import org.apache.hadoop.yarn.state.StateMachineFactory;
import org.apache.hadoop.yarn.util.Records;
@@ -69,121 +74,223 @@ import org.junit.Test;
/**
* Tests various functions of the JobImpl class
*/
-@SuppressWarnings({"unchecked", "rawtypes"})
+@SuppressWarnings({"rawtypes"})
public class TestJobImpl {
@Test
- public void testJobNoTasksTransition() {
- JobNoTasksCompletedTransition trans = new JobNoTasksCompletedTransition();
- JobImpl mockJob = mock(JobImpl.class);
-
- // Force checkJobCompleteSuccess to return null
- Task mockTask = mock(Task.class);
- Map<TaskId, Task> tasks = new HashMap<TaskId, Task>();
- tasks.put(mockTask.getID(), mockTask);
- mockJob.tasks = tasks;
+ public void testJobNoTasks() {
+ Configuration conf = new Configuration();
+ conf.setInt(MRJobConfig.NUM_REDUCES, 0);
+ AsyncDispatcher dispatcher = new AsyncDispatcher();
+ dispatcher.init(conf);
+ dispatcher.start();
+ OutputCommitter committer = mock(OutputCommitter.class);
+ CommitterEventHandler commitHandler =
+ createCommitterEventHandler(dispatcher, committer);
+ commitHandler.init(conf);
+ commitHandler.start();
+
+ JobImpl job = createStubbedJob(conf, dispatcher, 0);
+ job.handle(new JobEvent(job.getID(), JobEventType.JOB_INIT));
+ assertJobState(job, JobStateInternal.INITED);
+ job.handle(new JobEvent(job.getID(), JobEventType.JOB_START));
+ assertJobState(job, JobStateInternal.SUCCEEDED);
+ dispatcher.stop();
+ commitHandler.stop();
+ }
- when(mockJob.getInternalState()).thenReturn(JobStateInternal.ERROR);
- JobEvent mockJobEvent = mock(JobEvent.class);
- JobStateInternal state = trans.transition(mockJob, mockJobEvent);
- Assert.assertEquals("Incorrect state returned from JobNoTasksCompletedTransition",
- JobStateInternal.ERROR, state);
+ @Test(timeout=20000)
+ public void testCommitJobFailsJob() throws Exception {
+ Configuration conf = new Configuration();
+ AsyncDispatcher dispatcher = new AsyncDispatcher();
+ dispatcher.init(conf);
+ dispatcher.start();
+ CyclicBarrier syncBarrier = new CyclicBarrier(2);
+ OutputCommitter committer = new TestingOutputCommitter(syncBarrier, false);
+ CommitterEventHandler commitHandler =
+ createCommitterEventHandler(dispatcher, committer);
+ commitHandler.init(conf);
+ commitHandler.start();
+
+ JobImpl job = createRunningStubbedJob(conf, dispatcher, 2);
+ completeJobTasks(job);
+ assertJobState(job, JobStateInternal.COMMITTING);
+
+ // let the committer fail and verify the job fails
+ syncBarrier.await();
+ assertJobState(job, JobStateInternal.FAILED);
+ dispatcher.stop();
+ commitHandler.stop();
}
- @Test
- public void testCommitJobFailsJob() {
+ @Test(timeout=20000)
+ public void testCheckJobCompleteSuccess() throws Exception {
+ Configuration conf = new Configuration();
+ AsyncDispatcher dispatcher = new AsyncDispatcher();
+ dispatcher.init(conf);
+ dispatcher.start();
+ CyclicBarrier syncBarrier = new CyclicBarrier(2);
+ OutputCommitter committer = new TestingOutputCommitter(syncBarrier, true);
+ CommitterEventHandler commitHandler =
+ createCommitterEventHandler(dispatcher, committer);
+ commitHandler.init(conf);
+ commitHandler.start();
+
+ JobImpl job = createRunningStubbedJob(conf, dispatcher, 2);
+ completeJobTasks(job);
+ assertJobState(job, JobStateInternal.COMMITTING);
+
+ // let the committer complete and verify the job succeeds
+ syncBarrier.await();
+ assertJobState(job, JobStateInternal.SUCCEEDED);
+ dispatcher.stop();
+ commitHandler.stop();
+ }
+
+ @Test(timeout=20000)
+ public void testKilledDuringSetup() throws Exception {
+ Configuration conf = new Configuration();
+ AsyncDispatcher dispatcher = new AsyncDispatcher();
+ dispatcher.init(conf);
+ dispatcher.start();
+ OutputCommitter committer = new StubbedOutputCommitter() {
+ @Override
+ public synchronized void setupJob(JobContext jobContext)
+ throws IOException {
+ while (!Thread.interrupted()) {
+ try {
+ wait();
+ } catch (InterruptedException e) {
+ }
+ }
+ }
+ };
+ CommitterEventHandler commitHandler =
+ createCommitterEventHandler(dispatcher, committer);
+ commitHandler.init(conf);
+ commitHandler.start();
- JobImpl mockJob = mock(JobImpl.class);
- mockJob.tasks = new HashMap<TaskId, Task>();
- OutputCommitter mockCommitter = mock(OutputCommitter.class);
- EventHandler mockEventHandler = mock(EventHandler.class);
- JobContext mockJobContext = mock(JobContext.class);
-
- when(mockJob.getCommitter()).thenReturn(mockCommitter);
- when(mockJob.getEventHandler()).thenReturn(mockEventHandler);
- when(mockJob.getJobContext()).thenReturn(mockJobContext);
- when(mockJob.finished(JobStateInternal.KILLED)).thenReturn(
- JobStateInternal.KILLED);
- when(mockJob.finished(JobStateInternal.FAILED)).thenReturn(
- JobStateInternal.FAILED);
- when(mockJob.finished(JobStateInternal.SUCCEEDED)).thenReturn(
- JobStateInternal.SUCCEEDED);
-
- try {
- doThrow(new IOException()).when(mockCommitter).commitJob(any(JobContext.class));
- } catch (IOException e) {
- // commitJob stubbed out, so this can't happen
- }
- doNothing().when(mockEventHandler).handle(any(JobHistoryEvent.class));
- JobStateInternal jobState = JobImpl.checkJobCompleteSuccess(mockJob);
- Assert.assertNotNull("checkJobCompleteSuccess incorrectly returns null " +
- "for successful job", jobState);
- Assert.assertEquals("checkJobCompleteSuccess returns incorrect state",
- JobStateInternal.FAILED, jobState);
- verify(mockJob).abortJob(
- eq(org.apache.hadoop.mapreduce.JobStatus.State.FAILED));
+ JobImpl job = createStubbedJob(conf, dispatcher, 2);
+ JobId jobId = job.getID();
+ job.handle(new JobEvent(jobId, JobEventType.JOB_INIT));
+ assertJobState(job, JobStateInternal.INITED);
+ job.handle(new JobEvent(jobId, JobEventType.JOB_START));
+ assertJobState(job, JobStateInternal.SETUP);
+
+ job.handle(new JobEvent(job.getID(), JobEventType.JOB_KILL));
+ assertJobState(job, JobStateInternal.KILLED);
+ dispatcher.stop();
+ commitHandler.stop();
}
- @Test
- public void testCheckJobCompleteSuccess() {
-
- JobImpl mockJob = mock(JobImpl.class);
- mockJob.tasks = new HashMap<TaskId, Task>();
- OutputCommitter mockCommitter = mock(OutputCommitter.class);
- EventHandler mockEventHandler = mock(EventHandler.class);
- JobContext mockJobContext = mock(JobContext.class);
-
- when(mockJob.getCommitter()).thenReturn(mockCommitter);
- when(mockJob.getEventHandler()).thenReturn(mockEventHandler);
- when(mockJob.getJobContext()).thenReturn(mockJobContext);
- doNothing().when(mockJob).setFinishTime();
- doNothing().when(mockJob).logJobHistoryFinishedEvent();
- when(mockJob.finished(any(JobStateInternal.class))).thenReturn(
- JobStateInternal.SUCCEEDED);
-
- try {
- doNothing().when(mockCommitter).commitJob(any(JobContext.class));
- } catch (IOException e) {
- // commitJob stubbed out, so this can't happen
- }
- doNothing().when(mockEventHandler).handle(any(JobHistoryEvent.class));
- Assert.assertNotNull("checkJobCompleteSuccess incorrectly returns null " +
- "for successful job",
- JobImpl.checkJobCompleteSuccess(mockJob));
- Assert.assertEquals("checkJobCompleteSuccess returns incorrect state",
- JobStateInternal.SUCCEEDED, JobImpl.checkJobCompleteSuccess(mockJob));
+ @Test(timeout=20000)
+ public void testKilledDuringCommit() throws Exception {
+ Configuration conf = new Configuration();
+ AsyncDispatcher dispatcher = new AsyncDispatcher();
+ dispatcher.init(conf);
+ dispatcher.start();
+ CyclicBarrier syncBarrier = new CyclicBarrier(2);
+ OutputCommitter committer = new WaitingOutputCommitter(syncBarrier, true);
+ CommitterEventHandler commitHandler =
+ createCommitterEventHandler(dispatcher, committer);
+ commitHandler.init(conf);
+ commitHandler.start();
+
+ JobImpl job = createRunningStubbedJob(conf, dispatcher, 2);
+ completeJobTasks(job);
+ assertJobState(job, JobStateInternal.COMMITTING);
+
+ syncBarrier.await();
+ job.handle(new JobEvent(job.getID(), JobEventType.JOB_KILL));
+ assertJobState(job, JobStateInternal.KILLED);
+ dispatcher.stop();
+ commitHandler.stop();
}
- @Test
- public void testCheckJobCompleteSuccessFailed() {
- JobImpl mockJob = mock(JobImpl.class);
+ @Test(timeout=20000)
+ public void testKilledDuringFailAbort() throws Exception {
+ Configuration conf = new Configuration();
+ AsyncDispatcher dispatcher = new AsyncDispatcher();
+ dispatcher.init(conf);
+ dispatcher.start();
+ OutputCommitter committer = new StubbedOutputCommitter() {
+ @Override
+ public void setupJob(JobContext jobContext) throws IOException {
+ throw new IOException("forced failure");
+ }
+
+ @Override
+ public synchronized void abortJob(JobContext jobContext, State state)
+ throws IOException {
+ while (!Thread.interrupted()) {
+ try {
+ wait();
+ } catch (InterruptedException e) {
+ }
+ }
+ }
+ };
+ CommitterEventHandler commitHandler =
+ createCommitterEventHandler(dispatcher, committer);
+ commitHandler.init(conf);
+ commitHandler.start();
+
+ JobImpl job = createStubbedJob(conf, dispatcher, 2);
+ JobId jobId = job.getID();
+ job.handle(new JobEvent(jobId, JobEventType.JOB_INIT));
+ assertJobState(job, JobStateInternal.INITED);
+ job.handle(new JobEvent(jobId, JobEventType.JOB_START));
+ assertJobState(job, JobStateInternal.FAIL_ABORT);
- // Make the completedTasks not equal the getTasks()
- Task mockTask = mock(Task.class);
- Map<TaskId, Task> tasks = new HashMap<TaskId, Task>();
- tasks.put(mockTask.getID(), mockTask);
- mockJob.tasks = tasks;
-
- try {
- // Just in case the code breaks and reaches these calls
- OutputCommitter mockCommitter = mock(OutputCommitter.class);
- EventHandler mockEventHandler = mock(EventHandler.class);
- doNothing().when(mockCommitter).commitJob(any(JobContext.class));
- doNothing().when(mockEventHandler).handle(any(JobHistoryEvent.class));
- } catch (IOException e) {
- e.printStackTrace();
- }
- Assert.assertNull("checkJobCompleteSuccess incorrectly returns not-null " +
- "for unsuccessful job",
- JobImpl.checkJobCompleteSuccess(mockJob));
+ job.handle(new JobEvent(jobId, JobEventType.JOB_KILL));
+ assertJobState(job, JobStateInternal.KILLED);
+ dispatcher.stop();
+ commitHandler.stop();
}
+ @Test(timeout=20000)
+ public void testKilledDuringKillAbort() throws Exception {
+ Configuration conf = new Configuration();
+ AsyncDispatcher dispatcher = new AsyncDispatcher();
+ dispatcher.init(conf);
+ dispatcher.start();
+ OutputCommitter committer = new StubbedOutputCommitter() {
+ @Override
+ public synchronized void abortJob(JobContext jobContext, State state)
+ throws IOException {
+ while (!Thread.interrupted()) {
+ try {
+ wait();
+ } catch (InterruptedException e) {
+ }
+ }
+ }
+ };
+ CommitterEventHandler commitHandler =
+ createCommitterEventHandler(dispatcher, committer);
+ commitHandler.init(conf);
+ commitHandler.start();
+
+ JobImpl job = createStubbedJob(conf, dispatcher, 2);
+ JobId jobId = job.getID();
+ job.handle(new JobEvent(jobId, JobEventType.JOB_INIT));
+ assertJobState(job, JobStateInternal.INITED);
+ job.handle(new JobEvent(jobId, JobEventType.JOB_START));
+ assertJobState(job, JobStateInternal.SETUP);
+
+ job.handle(new JobEvent(jobId, JobEventType.JOB_KILL));
+ assertJobState(job, JobStateInternal.KILL_ABORT);
+
+ job.handle(new JobEvent(jobId, JobEventType.JOB_KILL));
+ assertJobState(job, JobStateInternal.KILLED);
+ dispatcher.stop();
+ commitHandler.stop();
+ }
public static void main(String[] args) throws Exception {
TestJobImpl t = new TestJobImpl();
- t.testJobNoTasksTransition();
+ t.testJobNoTasks();
t.testCheckJobCompleteSuccess();
- t.testCheckJobCompleteSuccessFailed();
t.testCheckAccess();
t.testReportDiagnostics();
t.testUberDecision();
@@ -208,7 +315,7 @@ public class TestJobImpl {
// Verify access
JobImpl job1 = new JobImpl(jobId, null, conf1, null, null, null, null, null,
- null, null, null, true, null, 0, null, null);
+ null, null, true, null, 0, null, null);
Assert.assertTrue(job1.checkAccess(ugi1, JobACL.VIEW_JOB));
Assert.assertFalse(job1.checkAccess(ugi2, JobACL.VIEW_JOB));
@@ -219,7 +326,7 @@ public class TestJobImpl {
// Verify access
JobImpl job2 = new JobImpl(jobId, null, conf2, null, null, null, null, null,
- null, null, null, true, null, 0, null, null);
+ null, null, true, null, 0, null, null);
Assert.assertTrue(job2.checkAccess(ugi1, JobACL.VIEW_JOB));
Assert.assertTrue(job2.checkAccess(ugi2, JobACL.VIEW_JOB));
@@ -230,7 +337,7 @@ public class TestJobImpl {
// Verify access
JobImpl job3 = new JobImpl(jobId, null, conf3, null, null, null, null, null,
- null, null, null, true, null, 0, null, null);
+ null, null, true, null, 0, null, null);
Assert.assertTrue(job3.checkAccess(ugi1, JobACL.VIEW_JOB));
Assert.assertTrue(job3.checkAccess(ugi2, JobACL.VIEW_JOB));
@@ -241,7 +348,7 @@ public class TestJobImpl {
// Verify access
JobImpl job4 = new JobImpl(jobId, null, conf4, null, null, null, null, null,
- null, null, null, true, null, 0, null, null);
+ null, null, true, null, 0, null, null);
Assert.assertTrue(job4.checkAccess(ugi1, JobACL.VIEW_JOB));
Assert.assertTrue(job4.checkAccess(ugi2, JobACL.VIEW_JOB));
@@ -252,7 +359,7 @@ public class TestJobImpl {
// Verify access
JobImpl job5 = new JobImpl(jobId, null, conf5, null, null, null, null, null,
- null, null, null, true, null, 0, null, null);
+ null, null, true, null, 0, null, null);
Assert.assertTrue(job5.checkAccess(ugi1, null));
Assert.assertTrue(job5.checkAccess(ugi2, null));
}
@@ -270,8 +377,7 @@ public class TestJobImpl {
mock(EventHandler.class),
null, mock(JobTokenSecretManager.class), null,
new SystemClock(), null,
- mrAppMetrics, mock(OutputCommitter.class),
- true, null, 0, null, null);
+ mrAppMetrics, true, null, 0, null, null);
job.handle(diagUpdateEvent);
String diagnostics = job.getReport().getDiagnostics();
Assert.assertNotNull(diagnostics);
@@ -282,8 +388,7 @@ public class TestJobImpl {
mock(EventHandler.class),
null, mock(JobTokenSecretManager.class), null,
new SystemClock(), null,
- mrAppMetrics, mock(OutputCommitter.class),
- true, null, 0, null, null);
+ mrAppMetrics, true, null, 0, null, null);
job.handle(new JobEvent(jobId, JobEventType.JOB_KILL));
job.handle(diagUpdateEvent);
diagnostics = job.getReport().getDiagnostics();
@@ -338,20 +443,23 @@ public class TestJobImpl {
JobImpl job = new JobImpl(jobId, Records
.newRecord(ApplicationAttemptId.class), conf, mock(EventHandler.class),
null, mock(JobTokenSecretManager.class), null, null, null,
- mrAppMetrics, mock(OutputCommitter.class), true, null, 0, null, null);
- InitTransition initTransition = getInitTransition();
+ mrAppMetrics, true, null, 0, null, null);
+ InitTransition initTransition = getInitTransition(2);
JobEvent mockJobEvent = mock(JobEvent.class);
initTransition.transition(job, mockJobEvent);
boolean isUber = job.isUber();
return isUber;
}
- private static InitTransition getInitTransition() {
+ private static InitTransition getInitTransition(final int numSplits) {
InitTransition initTransition = new InitTransition() {
@Override
protected TaskSplitMetaInfo[] createSplits(JobImpl job, JobId jobId) {
- return new TaskSplitMetaInfo[] { new TaskSplitMetaInfo(),
- new TaskSplitMetaInfo() };
+ TaskSplitMetaInfo[] splits = new TaskSplitMetaInfo[numSplits];
+ for (int i = 0; i < numSplits; ++i) {
+ splits[i] = new TaskSplitMetaInfo();
+ }
+ return splits;
}
};
return initTransition;
@@ -360,19 +468,24 @@ public class TestJobImpl {
@Test
public void testTransitionsAtFailed() throws IOException {
Configuration conf = new Configuration();
- JobID jobID = JobID.forName("job_1234567890000_0001");
- JobId jobId = TypeConverter.toYarn(jobID);
+ AsyncDispatcher dispatcher = new AsyncDispatcher();
+ dispatcher.init(conf);
+ dispatcher.start();
+
OutputCommitter committer = mock(OutputCommitter.class);
doThrow(new IOException("forcefail"))
.when(committer).setupJob(any(JobContext.class));
- InlineDispatcher dispatcher = new InlineDispatcher();
- JobImpl job = new StubbedJob(jobId, Records
- .newRecord(ApplicationAttemptId.class), conf,
- dispatcher.getEventHandler(), committer, true, null);
+ CommitterEventHandler commitHandler =
+ createCommitterEventHandler(dispatcher, committer);
+ commitHandler.init(conf);
+ commitHandler.start();
- dispatcher.register(JobEventType.class, job);
+ JobImpl job = createStubbedJob(conf, dispatcher, 2);
+ JobId jobId = job.getID();
job.handle(new JobEvent(jobId, JobEventType.JOB_INIT));
- Assert.assertEquals(JobState.FAILED, job.getState());
+ assertJobState(job, JobStateInternal.INITED);
+ job.handle(new JobEvent(jobId, JobEventType.JOB_START));
+ assertJobState(job, JobStateInternal.FAILED);
job.handle(new JobEvent(jobId, JobEventType.JOB_TASK_COMPLETED));
Assert.assertEquals(JobState.FAILED, job.getState());
@@ -382,17 +495,86 @@ public class TestJobImpl {
Assert.assertEquals(JobState.FAILED, job.getState());
job.handle(new JobEvent(jobId, JobEventType.JOB_TASK_ATTEMPT_FETCH_FAILURE));
Assert.assertEquals(JobState.FAILED, job.getState());
+
+ dispatcher.stop();
+ commitHandler.stop();
+ }
+
+ private static CommitterEventHandler createCommitterEventHandler(
+ Dispatcher dispatcher, OutputCommitter committer) {
+ SystemClock clock = new SystemClock();
+ AppContext appContext = mock(AppContext.class);
+ when(appContext.getEventHandler()).thenReturn(
+ dispatcher.getEventHandler());
+ when(appContext.getClock()).thenReturn(clock);
+ CommitterEventHandler handler =
+ new CommitterEventHandler(appContext, committer);
+ dispatcher.register(CommitterEventType.class, handler);
+ return handler;
+ }
+
+ private static StubbedJob createStubbedJob(Configuration conf,
+ Dispatcher dispatcher, int numSplits) {
+ JobID jobID = JobID.forName("job_1234567890000_0001");
+ JobId jobId = TypeConverter.toYarn(jobID);
+ StubbedJob job = new StubbedJob(jobId,
+ Records.newRecord(ApplicationAttemptId.class), conf,
+ dispatcher.getEventHandler(), true, "somebody", numSplits);
+ dispatcher.register(JobEventType.class, job);
+ EventHandler mockHandler = mock(EventHandler.class);
+ dispatcher.register(TaskEventType.class, mockHandler);
+ dispatcher.register(org.apache.hadoop.mapreduce.jobhistory.EventType.class,
+ mockHandler);
+ dispatcher.register(JobFinishEvent.Type.class, mockHandler);
+ return job;
+ }
+
+ private static StubbedJob createRunningStubbedJob(Configuration conf,
+ Dispatcher dispatcher, int numSplits) {
+ StubbedJob job = createStubbedJob(conf, dispatcher, numSplits);
+ job.handle(new JobEvent(job.getID(), JobEventType.JOB_INIT));
+ assertJobState(job, JobStateInternal.INITED);
+ job.handle(new JobEvent(job.getID(), JobEventType.JOB_START));
+ assertJobState(job, JobStateInternal.RUNNING);
+ return job;
+ }
+
+ private static void completeJobTasks(JobImpl job) {
+ // complete the map tasks and the reduce tasks so we start committing
+ int numMaps = job.getTotalMaps();
+ for (int i = 0; i < numMaps; ++i) {
+ job.handle(new JobTaskEvent(
+ MRBuilderUtils.newTaskId(job.getID(), 1, TaskType.MAP),
+ TaskState.SUCCEEDED));
+ Assert.assertEquals(JobState.RUNNING, job.getState());
+ }
+ int numReduces = job.getTotalReduces();
+ for (int i = 0; i < numReduces; ++i) {
+ job.handle(new JobTaskEvent(
+ MRBuilderUtils.newTaskId(job.getID(), 1, TaskType.MAP),
+ TaskState.SUCCEEDED));
+ Assert.assertEquals(JobState.RUNNING, job.getState());
+ }
+ }
+
+ private static void assertJobState(JobImpl job, JobStateInternal state) {
+ int timeToWaitMsec = 5 * 1000;
+ while (timeToWaitMsec > 0 && job.getInternalState() != state) {
+ try {
+ Thread.sleep(10);
+ timeToWaitMsec -= 10;
+ } catch (InterruptedException e) {
+ break;
+ }
+ }
+ Assert.assertEquals(state, job.getInternalState());
}
private static class StubbedJob extends JobImpl {
//override the init transition
- private final InitTransition initTransition = getInitTransition();
- StateMachineFactory<JobImpl, JobStateInternal, JobEventType, JobEvent> localFactory
- = stateMachineFactory.addTransition(JobStateInternal.NEW,
- EnumSet.of(JobStateInternal.INITED, JobStateInternal.FAILED),
- JobEventType.JOB_INIT,
- // This is abusive.
- initTransition);
+ private final InitTransition initTransition;
+ StateMachineFactory<JobImpl, JobStateInternal, JobEventType, JobEvent>
+ localFactory;
private final StateMachine<JobStateInternal, JobEventType, JobEvent>
localStateMachine;
@@ -404,15 +586,102 @@ public class TestJobImpl {
public StubbedJob(JobId jobId, ApplicationAttemptId applicationAttemptId,
Configuration conf, EventHandler eventHandler,
- OutputCommitter committer, boolean newApiCommitter, String user) {
+ boolean newApiCommitter, String user, int numSplits) {
super(jobId, applicationAttemptId, conf, eventHandler,
null, new JobTokenSecretManager(), new Credentials(),
- new SystemClock(), null, MRAppMetrics.create(), committer,
+ new SystemClock(), null, MRAppMetrics.create(),
newApiCommitter, user, System.currentTimeMillis(), null, null);
+ initTransition = getInitTransition(numSplits);
+ localFactory = stateMachineFactory.addTransition(JobStateInternal.NEW,
+ EnumSet.of(JobStateInternal.INITED, JobStateInternal.FAILED),
+ JobEventType.JOB_INIT,
+ // This is abusive.
+ initTransition);
+
// This "this leak" is okay because the retained pointer is in an
// instance variable.
localStateMachine = localFactory.make(this);
}
}
+
+ private static class StubbedOutputCommitter extends OutputCommitter {
+
+ public StubbedOutputCommitter() {
+ super();
+ }
+
+ @Override
+ public void setupJob(JobContext jobContext) throws IOException {
+ }
+
+ @Override
+ public void setupTask(TaskAttemptContext taskContext) throws IOException {
+ }
+
+ @Override
+ public boolean needsTaskCommit(TaskAttemptContext taskContext)
+ throws IOException {
+ return false;
+ }
+
+ @Override
+ public void commitTask(TaskAttemptContext taskContext) throws IOException {
+ }
+
+ @Override
+ public void abortTask(TaskAttemptContext taskContext) throws IOException {
+ }
+ }
+
+ private static class TestingOutputCommitter extends StubbedOutputCommitter {
+ CyclicBarrier syncBarrier;
+ boolean shouldSucceed;
+
+ public TestingOutputCommitter(CyclicBarrier syncBarrier,
+ boolean shouldSucceed) {
+ super();
+ this.syncBarrier = syncBarrier;
+ this.shouldSucceed = shouldSucceed;
+ }
+
+ @Override
+ public void commitJob(JobContext jobContext) throws IOException {
+ try {
+ syncBarrier.await();
+ } catch (BrokenBarrierException e) {
+ } catch (InterruptedException e) {
+ }
+
+ if (!shouldSucceed) {
+ throw new IOException("forced failure");
+ }
+ }
+ }
+
+ private static class WaitingOutputCommitter extends TestingOutputCommitter {
+ public WaitingOutputCommitter(CyclicBarrier syncBarrier,
+ boolean shouldSucceed) {
+ super(syncBarrier, shouldSucceed);
+ }
+
+ @Override
+ public void commitJob(JobContext jobContext) throws IOException {
+ try {
+ syncBarrier.await();
+ } catch (BrokenBarrierException e) {
+ } catch (InterruptedException e) {
+ }
+
+ while (!Thread.interrupted()) {
+ try {
+ synchronized (this) {
+ wait();
+ }
+ } catch (InterruptedException e) {
+ break;
+ }
+ }
+ }
+ }
}
Modified: hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskAttempt.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskAttempt.java?rev=1428601&r1=1428600&r2=1428601&view=diff
==============================================================================
--- hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskAttempt.java (original)
+++ hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskAttempt.java Thu Jan 3 21:23:58 2013
@@ -43,7 +43,6 @@ import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapTaskAttemptImpl;
import org.apache.hadoop.mapreduce.JobCounter;
import org.apache.hadoop.mapreduce.MRJobConfig;
-import org.apache.hadoop.mapreduce.OutputCommitter;
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEvent;
import org.apache.hadoop.mapreduce.jobhistory.TaskAttemptUnsuccessfulCompletion;
import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitMetaInfo;
@@ -253,10 +252,9 @@ public class TestTaskAttempt{
TaskAttemptListener taListener = mock(TaskAttemptListener.class);
Path jobFile = mock(Path.class);
JobConf jobConf = new JobConf();
- OutputCommitter outputCommitter = mock(OutputCommitter.class);
TaskAttemptImpl taImpl =
new MapTaskAttemptImpl(taskId, 1, eventHandler, jobFile, 1,
- taskSplitMetaInfo, jobConf, taListener, outputCommitter, null,
+ taskSplitMetaInfo, jobConf, taListener, null,
null, clock, null);
return taImpl;
}
@@ -342,7 +340,7 @@ public class TestTaskAttempt{
TaskAttemptImpl taImpl =
new MapTaskAttemptImpl(taskId, 1, eventHandler, jobFile, 1,
splits, jobConf, taListener,
- mock(OutputCommitter.class), mock(Token.class), new Credentials(),
+ mock(Token.class), new Credentials(),
new SystemClock(), null);
NodeId nid = BuilderUtils.newNodeId("127.0.0.1", 0);
@@ -397,7 +395,7 @@ public class TestTaskAttempt{
TaskAttemptImpl taImpl =
new MapTaskAttemptImpl(taskId, 1, eventHandler, jobFile, 1,
splits, jobConf, taListener,
- mock(OutputCommitter.class), mock(Token.class), new Credentials(),
+ mock(Token.class), new Credentials(),
new SystemClock(), appCtx);
NodeId nid = BuilderUtils.newNodeId("127.0.0.1", 0);
@@ -453,7 +451,7 @@ public class TestTaskAttempt{
TaskAttemptImpl taImpl =
new MapTaskAttemptImpl(taskId, 1, eventHandler, jobFile, 1,
splits, jobConf, taListener,
- mock(OutputCommitter.class), mock(Token.class), new Credentials(),
+ mock(Token.class), new Credentials(),
new SystemClock(), appCtx);
NodeId nid = BuilderUtils.newNodeId("127.0.0.1", 0);
@@ -512,7 +510,7 @@ public class TestTaskAttempt{
TaskAttemptImpl taImpl =
new MapTaskAttemptImpl(taskId, 1, eventHandler, jobFile, 1,
splits, jobConf, taListener,
- mock(OutputCommitter.class), mock(Token.class), new Credentials(),
+ mock(Token.class), new Credentials(),
new SystemClock(), appCtx);
NodeId nid = BuilderUtils.newNodeId("127.0.0.1", 0);
@@ -578,7 +576,7 @@ public class TestTaskAttempt{
when(resource.getMemory()).thenReturn(1024);
TaskAttemptImpl taImpl = new MapTaskAttemptImpl(taskId, 1, eventHandler,
- jobFile, 1, splits, jobConf, taListener, mock(OutputCommitter.class),
+ jobFile, 1, splits, jobConf, taListener,
mock(Token.class), new Credentials(), new SystemClock(), appCtx);
NodeId nid = BuilderUtils.newNodeId("127.0.0.1", 0);
@@ -628,7 +626,7 @@ public class TestTaskAttempt{
when(resource.getMemory()).thenReturn(1024);
TaskAttemptImpl taImpl = new MapTaskAttemptImpl(taskId, 1, eventHandler,
- jobFile, 1, splits, jobConf, taListener, mock(OutputCommitter.class),
+ jobFile, 1, splits, jobConf, taListener,
mock(Token.class), new Credentials(), new SystemClock(), appCtx);
NodeId nid = BuilderUtils.newNodeId("127.0.0.1", 0);
Modified: hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskAttemptContainerRequest.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskAttemptContainerRequest.java?rev=1428601&r1=1428600&r2=1428601&view=diff
==============================================================================
--- hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskAttemptContainerRequest.java (original)
+++ hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskAttemptContainerRequest.java Thu Jan 3 21:23:58 2013
@@ -40,7 +40,6 @@ import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapTaskAttemptImpl;
import org.apache.hadoop.mapred.WrappedJvmID;
import org.apache.hadoop.mapreduce.MRJobConfig;
-import org.apache.hadoop.mapreduce.OutputCommitter;
import org.apache.hadoop.mapreduce.TypeConverter;
import org.apache.hadoop.mapreduce.security.token.JobTokenIdentifier;
import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitMetaInfo;
@@ -107,7 +106,7 @@ public class TestTaskAttemptContainerReq
TaskAttemptImpl taImpl =
new MapTaskAttemptImpl(taskId, 1, eventHandler, jobFile, 1,
mock(TaskSplitMetaInfo.class), jobConf, taListener,
- mock(OutputCommitter.class), jobToken, credentials,
+ jobToken, credentials,
new SystemClock(), null);
jobConf.set(MRJobConfig.APPLICATION_ATTEMPT_ID, taImpl.getID().toString());
Modified: hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskImpl.java?rev=1428601&r1=1428600&r2=1428601&view=diff
==============================================================================
--- hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskImpl.java (original)
+++ hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskImpl.java Thu Jan 3 21:23:58 2013
@@ -35,7 +35,6 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.Task;
import org.apache.hadoop.mapred.TaskUmbilicalProtocol;
-import org.apache.hadoop.mapreduce.OutputCommitter;
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.TaskInfo;
import org.apache.hadoop.mapreduce.security.token.JobTokenIdentifier;
import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitMetaInfo;
@@ -71,7 +70,6 @@ public class TestTaskImpl {
private JobConf conf;
private TaskAttemptListener taskAttemptListener;
- private OutputCommitter committer;
private Token<JobTokenIdentifier> jobToken;
private JobId jobId;
private Path remoteJobConfFile;
@@ -99,13 +97,13 @@ public class TestTaskImpl {
public MockTaskImpl(JobId jobId, int partition,
EventHandler eventHandler, Path remoteJobConfFile, JobConf conf,
- TaskAttemptListener taskAttemptListener, OutputCommitter committer,
+ TaskAttemptListener taskAttemptListener,
Token<JobTokenIdentifier> jobToken,
Credentials credentials, Clock clock,
Map<TaskId, TaskInfo> completedTasksFromPreviousRun, int startCount,
MRAppMetrics metrics, AppContext appContext, TaskType taskType) {
super(jobId, taskType , partition, eventHandler,
- remoteJobConfFile, conf, taskAttemptListener, committer,
+ remoteJobConfFile, conf, taskAttemptListener,
jobToken, credentials, clock,
completedTasksFromPreviousRun, startCount, metrics, appContext);
this.taskType = taskType;
@@ -120,7 +118,7 @@ public class TestTaskImpl {
protected TaskAttemptImpl createAttempt() {
MockTaskAttemptImpl attempt = new MockTaskAttemptImpl(getID(), ++taskAttemptCounter,
eventHandler, taskAttemptListener, remoteJobConfFile, partition,
- conf, committer, jobToken, credentials, clock, appContext, taskType);
+ conf, jobToken, credentials, clock, appContext, taskType);
taskAttempts.add(attempt);
return attempt;
}
@@ -145,12 +143,11 @@ public class TestTaskImpl {
public MockTaskAttemptImpl(TaskId taskId, int id, EventHandler eventHandler,
TaskAttemptListener taskAttemptListener, Path jobFile, int partition,
- JobConf conf, OutputCommitter committer,
- Token<JobTokenIdentifier> jobToken,
+ JobConf conf, Token<JobTokenIdentifier> jobToken,
Credentials credentials, Clock clock,
AppContext appContext, TaskType taskType) {
super(taskId, id, eventHandler, taskAttemptListener, jobFile, partition, conf,
- dataLocations, committer, jobToken, credentials, clock, appContext);
+ dataLocations, jobToken, credentials, clock, appContext);
this.taskType = taskType;
}
@@ -210,7 +207,6 @@ public class TestTaskImpl {
conf = new JobConf();
taskAttemptListener = mock(TaskAttemptListener.class);
- committer = mock(OutputCommitter.class);
jobToken = (Token<JobTokenIdentifier>) mock(Token.class);
remoteJobConfFile = mock(Path.class);
credentials = null;
@@ -235,7 +231,7 @@ public class TestTaskImpl {
private MockTaskImpl createMockTask(TaskType taskType) {
return new MockTaskImpl(jobId, partition, dispatcher.getEventHandler(),
- remoteJobConfFile, conf, taskAttemptListener, committer, jobToken,
+ remoteJobConfFile, conf, taskAttemptListener, jobToken,
credentials, clock,
completedTasksFromPreviousRun, startCount,
metrics, appContext, taskType);
@@ -602,4 +598,73 @@ public class TestTaskImpl {
assertTaskScheduledState();
assertEquals(3, taskAttempts.size());
}
+
+ @Test
+ public void testFailedTransitions() {
+ mockTask = new MockTaskImpl(jobId, partition, dispatcher.getEventHandler(),
+ remoteJobConfFile, conf, taskAttemptListener, jobToken,
+ credentials, clock,
+ completedTasksFromPreviousRun, startCount,
+ metrics, appContext, TaskType.MAP) {
+ @Override
+ protected int getMaxAttempts() {
+ return 1;
+ }
+ };
+ TaskId taskId = getNewTaskID();
+ scheduleTaskAttempt(taskId);
+ launchTaskAttempt(getLastAttempt().getAttemptId());
+
+ // add three more speculative attempts
+ mockTask.handle(new TaskTAttemptEvent(getLastAttempt().getAttemptId(),
+ TaskEventType.T_ADD_SPEC_ATTEMPT));
+ launchTaskAttempt(getLastAttempt().getAttemptId());
+ mockTask.handle(new TaskTAttemptEvent(getLastAttempt().getAttemptId(),
+ TaskEventType.T_ADD_SPEC_ATTEMPT));
+ launchTaskAttempt(getLastAttempt().getAttemptId());
+ mockTask.handle(new TaskTAttemptEvent(getLastAttempt().getAttemptId(),
+ TaskEventType.T_ADD_SPEC_ATTEMPT));
+ launchTaskAttempt(getLastAttempt().getAttemptId());
+ assertEquals(4, taskAttempts.size());
+
+ // have the first attempt fail, verify task failed due to no retries
+ MockTaskAttemptImpl taskAttempt = taskAttempts.get(0);
+ taskAttempt.setState(TaskAttemptState.FAILED);
+ mockTask.handle(new TaskTAttemptEvent(taskAttempt.getAttemptId(),
+ TaskEventType.T_ATTEMPT_FAILED));
+ assertEquals(TaskState.FAILED, mockTask.getState());
+
+ // verify task can no longer be killed
+ mockTask.handle(new TaskEvent(taskId, TaskEventType.T_KILL));
+ assertEquals(TaskState.FAILED, mockTask.getState());
+
+ // verify speculative doesn't launch new tasks
+ mockTask.handle(new TaskTAttemptEvent(getLastAttempt().getAttemptId(),
+ TaskEventType.T_ADD_SPEC_ATTEMPT));
+ mockTask.handle(new TaskTAttemptEvent(getLastAttempt().getAttemptId(),
+ TaskEventType.T_ATTEMPT_LAUNCHED));
+ assertEquals(TaskState.FAILED, mockTask.getState());
+ assertEquals(4, taskAttempts.size());
+
+ // verify attempt events from active tasks don't knock task out of FAILED
+ taskAttempt = taskAttempts.get(1);
+ taskAttempt.setState(TaskAttemptState.COMMIT_PENDING);
+ mockTask.handle(new TaskTAttemptEvent(taskAttempt.getAttemptId(),
+ TaskEventType.T_ATTEMPT_COMMIT_PENDING));
+ assertEquals(TaskState.FAILED, mockTask.getState());
+ taskAttempt.setState(TaskAttemptState.FAILED);
+ mockTask.handle(new TaskTAttemptEvent(taskAttempt.getAttemptId(),
+ TaskEventType.T_ATTEMPT_FAILED));
+ assertEquals(TaskState.FAILED, mockTask.getState());
+ taskAttempt = taskAttempts.get(2);
+ taskAttempt.setState(TaskAttemptState.SUCCEEDED);
+ mockTask.handle(new TaskTAttemptEvent(taskAttempt.getAttemptId(),
+ TaskEventType.T_ATTEMPT_SUCCEEDED));
+ assertEquals(TaskState.FAILED, mockTask.getState());
+ taskAttempt = taskAttempts.get(3);
+ taskAttempt.setState(TaskAttemptState.KILLED);
+ mockTask.handle(new TaskTAttemptEvent(taskAttempt.getAttemptId(),
+ TaskEventType.T_ATTEMPT_KILLED));
+ assertEquals(TaskState.FAILED, mockTask.getState());
+ }
}
Modified: hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/launcher/TestContainerLauncherImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/launcher/TestContainerLauncherImpl.java?rev=1428601&r1=1428600&r2=1428601&view=diff
==============================================================================
--- hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/launcher/TestContainerLauncherImpl.java (original)
+++ hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/launcher/TestContainerLauncherImpl.java Thu Jan 3 21:23:58 2013
@@ -6,8 +6,12 @@ import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
+import static org.mockito.Mockito.atLeast;
+import org.mockito.ArgumentCaptor;
import java.net.InetSocketAddress;
+import java.util.concurrent.BrokenBarrierException;
+import java.util.concurrent.CyclicBarrier;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -18,15 +22,21 @@ import org.apache.hadoop.mapreduce.v2.ap
import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
import org.apache.hadoop.mapreduce.v2.app.AppContext;
+import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType;
import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerLauncher.EventType;
import org.apache.hadoop.mapreduce.v2.util.MRBuilderUtils;
import org.apache.hadoop.yarn.api.ContainerManager;
+import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusResponse;
import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest;
import org.apache.hadoop.yarn.api.protocolrecords.StartContainerResponse;
import org.apache.hadoop.yarn.api.protocolrecords.StopContainerRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.StopContainerResponse;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.event.Event;
import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.ipc.YarnRPC;
@@ -272,6 +282,150 @@ public class TestContainerLauncherImpl {
} finally {
ut.stop();
verify(mockCM).stopContainer(any(StopContainerRequest.class));
-}
+ }
+ }
+
+ @SuppressWarnings({ "rawtypes", "unchecked" })
+ @Test
+ public void testContainerCleaned() throws Exception {
+ LOG.info("STARTING testContainerCleaned");
+
+ CyclicBarrier startLaunchBarrier = new CyclicBarrier(2);
+ CyclicBarrier completeLaunchBarrier = new CyclicBarrier(2);
+
+ YarnRPC mockRpc = mock(YarnRPC.class);
+ AppContext mockContext = mock(AppContext.class);
+
+ EventHandler mockEventHandler = mock(EventHandler.class);
+ when(mockContext.getEventHandler()).thenReturn(mockEventHandler);
+
+ ContainerManager mockCM = new ContainerManagerForTest(startLaunchBarrier, completeLaunchBarrier);
+ when(mockRpc.getProxy(eq(ContainerManager.class),
+ any(InetSocketAddress.class), any(Configuration.class)))
+ .thenReturn(mockCM);
+
+ ContainerLauncherImplUnderTest ut =
+ new ContainerLauncherImplUnderTest(mockContext, mockRpc);
+
+ Configuration conf = new Configuration();
+ ut.init(conf);
+ ut.start();
+ try {
+ ContainerId contId = makeContainerId(0l, 0, 0, 1);
+ TaskAttemptId taskAttemptId = makeTaskAttemptId(0l, 0, 0, TaskType.MAP, 0);
+ String cmAddress = "127.0.0.1:8000";
+ StartContainerResponse startResp =
+ recordFactory.newRecordInstance(StartContainerResponse.class);
+ startResp.setServiceResponse(ShuffleHandler.MAPREDUCE_SHUFFLE_SERVICEID,
+ ShuffleHandler.serializeMetaData(80));
+
+
+ LOG.info("inserting launch event");
+ ContainerRemoteLaunchEvent mockLaunchEvent =
+ mock(ContainerRemoteLaunchEvent.class);
+ when(mockLaunchEvent.getType())
+ .thenReturn(EventType.CONTAINER_REMOTE_LAUNCH);
+ when(mockLaunchEvent.getContainerID())
+ .thenReturn(contId);
+ when(mockLaunchEvent.getTaskAttemptID()).thenReturn(taskAttemptId);
+ when(mockLaunchEvent.getContainerMgrAddress()).thenReturn(cmAddress);
+ ut.handle(mockLaunchEvent);
+
+ startLaunchBarrier.await();
+
+
+ LOG.info("inserting cleanup event");
+ ContainerLauncherEvent mockCleanupEvent =
+ mock(ContainerLauncherEvent.class);
+ when(mockCleanupEvent.getType())
+ .thenReturn(EventType.CONTAINER_REMOTE_CLEANUP);
+ when(mockCleanupEvent.getContainerID())
+ .thenReturn(contId);
+ when(mockCleanupEvent.getTaskAttemptID()).thenReturn(taskAttemptId);
+ when(mockCleanupEvent.getContainerMgrAddress()).thenReturn(cmAddress);
+ ut.handle(mockCleanupEvent);
+
+ completeLaunchBarrier.await();
+
+ ut.waitForPoolToIdle();
+
+ ArgumentCaptor<Event> arg = ArgumentCaptor.forClass(Event.class);
+ verify(mockEventHandler, atLeast(2)).handle(arg.capture());
+ boolean containerCleaned = false;
+
+ for (int i =0; i < arg.getAllValues().size(); i++) {
+ LOG.info(arg.getAllValues().get(i).toString());
+ Event currentEvent = arg.getAllValues().get(i);
+ if (currentEvent.getType() == TaskAttemptEventType.TA_CONTAINER_CLEANED) {
+ containerCleaned = true;
+ }
+ }
+ assert(containerCleaned);
+
+ } finally {
+ ut.stop();
+ }
+ }
+
+ private static class ContainerManagerForTest implements ContainerManager {
+
+ private CyclicBarrier startLaunchBarrier;
+ private CyclicBarrier completeLaunchBarrier;
+
+ ContainerManagerForTest (CyclicBarrier startLaunchBarrier, CyclicBarrier completeLaunchBarrier) {
+ this.startLaunchBarrier = startLaunchBarrier;
+ this.completeLaunchBarrier = completeLaunchBarrier;
+ }
+ @Override
+ public StartContainerResponse startContainer(StartContainerRequest request)
+ throws YarnRemoteException {
+ try {
+ startLaunchBarrier.await();
+ completeLaunchBarrier.await();
+ //To ensure the kill is started before the launch
+ Thread.sleep(100);
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ } catch (BrokenBarrierException e) {
+ e.printStackTrace();
+ }
+
+ throw new ContainerException("Force fail CM");
+
+ }
+
+ @Override
+ public StopContainerResponse stopContainer(StopContainerRequest request)
+ throws YarnRemoteException {
+
+ return null;
+ }
+
+ @Override
+ public GetContainerStatusResponse getContainerStatus(
+ GetContainerStatusRequest request) throws YarnRemoteException {
+
+ return null;
+ }
}
+
+ @SuppressWarnings("serial")
+ private static class ContainerException extends YarnRemoteException {
+
+ public ContainerException(String message) {
+ super(message);
+ }
+
+ @Override
+ public String getRemoteTrace() {
+ return null;
+ }
+
+ @Override
+ public YarnRemoteException getCause() {
+ return null;
+ }
+
+ }
+
}
Modified: hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/ClusterStatus.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/ClusterStatus.java?rev=1428601&r1=1428600&r2=1428601&view=diff
==============================================================================
--- hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/ClusterStatus.java (original)
+++ hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/ClusterStatus.java Thu Jan 3 21:23:58 2013
@@ -175,6 +175,8 @@ public class ClusterStatus implements Wr
}
+ public static final int UNINITIALIZED_MEMORY_VALUE = -1;
+
private int numActiveTrackers;
private Collection<String> activeTrackers = new ArrayList<String>();
private int numBlacklistedTrackers;
@@ -384,6 +386,22 @@ public class ClusterStatus implements Wr
public JobTrackerStatus getJobTrackerStatus() {
return status;
}
+
+ /**
+ * Returns UNINITIALIZED_MEMORY_VALUE (-1)
+ */
+ @Deprecated
+ public long getMaxMemory() {
+ return UNINITIALIZED_MEMORY_VALUE;
+ }
+
+ /**
+ * Returns UNINITIALIZED_MEMORY_VALUE (-1)
+ */
+ @Deprecated
+ public long getUsedMemory() {
+ return UNINITIALIZED_MEMORY_VALUE;
+ }
/**
* Gets the list of blacklisted trackers along with reasons for blacklisting.
Modified: hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/SortedRanges.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/SortedRanges.java?rev=1428601&r1=1428600&r2=1428601&view=diff
==============================================================================
--- hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/SortedRanges.java (original)
+++ hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/SortedRanges.java Thu Jan 3 21:23:58 2013
@@ -271,7 +271,7 @@ class SortedRanges implements Writable{
}
public boolean equals(Object o) {
- if(o!=null && o instanceof Range) {
+ if (o instanceof Range) {
Range range = (Range)o;
return startIndex==range.startIndex &&
length==range.length;
@@ -285,10 +285,11 @@ class SortedRanges implements Writable{
}
public int compareTo(Range o) {
- if(this.equals(o)) {
- return 0;
- }
- return (this.startIndex > o.startIndex) ? 1:-1;
+ // Ensure sgn(x.compareTo(y)) == -sgn(y.compareTo(x))
+ return this.startIndex < o.startIndex ? -1 :
+ (this.startIndex > o.startIndex ? 1 :
+ (this.length < o.length ? -1 :
+ (this.length > o.length ? 1 : 0)));
}
public void readFields(DataInput in) throws IOException {
Modified: hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java?rev=1428601&r1=1428600&r2=1428601&view=diff
==============================================================================
--- hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java (original)
+++ hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java Thu Jan 3 21:23:58 2013
@@ -465,6 +465,15 @@ public interface MRJobConfig {
public static final int DEFAULT_MR_AM_TO_RM_WAIT_INTERVAL_MS = 360000;
/**
+ * How long to wait in milliseconds for the output committer to cancel
+ * an operation when the job is being killed
+ */
+ public static final String MR_AM_COMMITTER_CANCEL_TIMEOUT_MS =
+ MR_AM_PREFIX + "job.committer.cancel-timeout";
+ public static final int DEFAULT_MR_AM_COMMITTER_CANCEL_TIMEOUT_MS =
+ 60 * 1000;
+
+ /**
* Boolean. Create the base dirs in the JobHistoryEventHandler
* Set to false for multi-user clusters. This is an internal config that
* is set by the MR framework and read by it too.
Modified: hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java?rev=1428601&r1=1428600&r2=1428601&view=diff
==============================================================================
--- hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java (original)
+++ hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java Thu Jan 3 21:23:58 2013
@@ -282,7 +282,7 @@ class Fetcher<K,V> extends Thread {
LOG.debug("url="+msgToEncode+";encHash="+encHash+";replyHash="+replyHash);
// verify that replyHash is HMac of encHash
SecureShuffleUtils.verifyReply(replyHash, encHash, jobTokenSecret);
- LOG.info("for url="+msgToEncode+" sent hash and receievd reply");
+ LOG.info("for url="+msgToEncode+" sent hash and received reply");
} catch (IOException ie) {
boolean connectExcpt = ie instanceof ConnectException;
ioErrs.increment(1);
Modified: hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/MergeManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/MergeManager.java?rev=1428601&r1=1428600&r2=1428601&view=diff
==============================================================================
--- hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/MergeManager.java (original)
+++ hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/MergeManager.java Thu Jan 3 21:23:58 2013
@@ -58,7 +58,9 @@ import org.apache.hadoop.mapreduce.task.
import org.apache.hadoop.util.Progress;
import org.apache.hadoop.util.ReflectionUtils;
-@SuppressWarnings(value={"unchecked", "deprecation"})
+import com.google.common.annotations.VisibleForTesting;
+
+@SuppressWarnings(value={"unchecked"})
@InterfaceAudience.LimitedPrivate({"MapReduce"})
@InterfaceStability.Unstable
public class MergeManager<K, V> {
@@ -85,7 +87,7 @@ public class MergeManager<K, V> {
Set<MapOutput<K, V>> inMemoryMapOutputs =
new TreeSet<MapOutput<K,V>>(new MapOutputComparator<K, V>());
- private final InMemoryMerger inMemoryMerger;
+ private final MergeThread<MapOutput<K,V>, K,V> inMemoryMerger;
Set<Path> onDiskMapOutputs = new TreeSet<Path>();
private final OnDiskMerger onDiskMerger;
@@ -179,6 +181,8 @@ public class MergeManager<K, V> {
+ singleShuffleMemoryLimitPercent);
}
+ usedMemory = 0L;
+ commitMemory = 0L;
this.maxSingleShuffleLimit =
(long)(memoryLimit * singleShuffleMemoryLimitPercent);
this.memToMemMergeOutputsThreshold =
@@ -210,7 +214,7 @@ public class MergeManager<K, V> {
this.memToMemMerger = null;
}
- this.inMemoryMerger = new InMemoryMerger(this);
+ this.inMemoryMerger = createInMemoryMerger();
this.inMemoryMerger.start();
this.onDiskMerger = new OnDiskMerger(this);
@@ -219,11 +223,19 @@ public class MergeManager<K, V> {
this.mergePhase = mergePhase;
}
+ protected MergeThread<MapOutput<K,V>, K,V> createInMemoryMerger() {
+ return new InMemoryMerger(this);
+ }
TaskAttemptID getReduceId() {
return reduceId;
}
+ @VisibleForTesting
+ ExceptionReporter getExceptionReporter() {
+ return exceptionReporter;
+ }
+
public void waitForInMemoryMerge() throws InterruptedException {
inMemoryMerger.waitForMerge();
}
@@ -288,7 +300,6 @@ public class MergeManager<K, V> {
}
synchronized void unreserve(long size) {
- commitMemory -= size;
usedMemory -= size;
}
@@ -300,24 +311,20 @@ public class MergeManager<K, V> {
commitMemory+= mapOutput.getSize();
- synchronized (inMemoryMerger) {
- // Can hang if mergeThreshold is really low.
- if (!inMemoryMerger.isInProgress() && commitMemory >= mergeThreshold) {
- LOG.info("Starting inMemoryMerger's merge since commitMemory=" +
- commitMemory + " > mergeThreshold=" + mergeThreshold +
- ". Current usedMemory=" + usedMemory);
- inMemoryMapOutputs.addAll(inMemoryMergedMapOutputs);
- inMemoryMergedMapOutputs.clear();
- inMemoryMerger.startMerge(inMemoryMapOutputs);
- }
+ // Can hang if mergeThreshold is really low.
+ if (commitMemory >= mergeThreshold) {
+ LOG.info("Starting inMemoryMerger's merge since commitMemory=" +
+ commitMemory + " > mergeThreshold=" + mergeThreshold +
+ ". Current usedMemory=" + usedMemory);
+ inMemoryMapOutputs.addAll(inMemoryMergedMapOutputs);
+ inMemoryMergedMapOutputs.clear();
+ inMemoryMerger.startMerge(inMemoryMapOutputs);
+ commitMemory = 0L; // Reset commitMemory.
}
if (memToMemMerger != null) {
- synchronized (memToMemMerger) {
- if (!memToMemMerger.isInProgress() &&
- inMemoryMapOutputs.size() >= memToMemMergeOutputsThreshold) {
- memToMemMerger.startMerge(inMemoryMapOutputs);
- }
+ if (inMemoryMapOutputs.size() >= memToMemMergeOutputsThreshold) {
+ memToMemMerger.startMerge(inMemoryMapOutputs);
}
}
}
@@ -333,11 +340,8 @@ public class MergeManager<K, V> {
public synchronized void closeOnDiskFile(Path file) {
onDiskMapOutputs.add(file);
- synchronized (onDiskMerger) {
- if (!onDiskMerger.isInProgress() &&
- onDiskMapOutputs.size() >= (2 * ioSortFactor - 1)) {
- onDiskMerger.startMerge(onDiskMapOutputs);
- }
+ if (onDiskMapOutputs.size() >= (2 * ioSortFactor - 1)) {
+ onDiskMerger.startMerge(onDiskMapOutputs);
}
}
Modified: hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/MergeThread.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/MergeThread.java?rev=1428601&r1=1428600&r2=1428601&view=diff
==============================================================================
--- hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/MergeThread.java (original)
+++ hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/MergeThread.java Thu Jan 3 21:23:58 2013
@@ -20,8 +20,10 @@ package org.apache.hadoop.mapreduce.task
import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
+import java.util.LinkedList;
import java.util.List;
import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -30,8 +32,8 @@ abstract class MergeThread<T,K,V> extend
private static final Log LOG = LogFactory.getLog(MergeThread.class);
- private volatile boolean inProgress = false;
- private List<T> inputs = new ArrayList<T>();
+ private AtomicInteger numPending = new AtomicInteger(0);
+ private LinkedList<List<T>> pendingToBeMerged;
protected final MergeManager<K,V> manager;
private final ExceptionReporter reporter;
private boolean closed = false;
@@ -39,6 +41,7 @@ abstract class MergeThread<T,K,V> extend
public MergeThread(MergeManager<K,V> manager, int mergeFactor,
ExceptionReporter reporter) {
+ this.pendingToBeMerged = new LinkedList<List<T>>();
this.manager = manager;
this.mergeFactor = mergeFactor;
this.reporter = reporter;
@@ -50,53 +53,55 @@ abstract class MergeThread<T,K,V> extend
interrupt();
}
- public synchronized boolean isInProgress() {
- return inProgress;
- }
-
- public synchronized void startMerge(Set<T> inputs) {
+ public void startMerge(Set<T> inputs) {
if (!closed) {
- inProgress = true;
- this.inputs = new ArrayList<T>();
+ numPending.incrementAndGet();
+ List<T> toMergeInputs = new ArrayList<T>();
Iterator<T> iter=inputs.iterator();
for (int ctr = 0; iter.hasNext() && ctr < mergeFactor; ++ctr) {
- this.inputs.add(iter.next());
+ toMergeInputs.add(iter.next());
iter.remove();
}
- LOG.info(getName() + ": Starting merge with " + this.inputs.size() +
+ LOG.info(getName() + ": Starting merge with " + toMergeInputs.size() +
" segments, while ignoring " + inputs.size() + " segments");
- notifyAll();
+ synchronized(pendingToBeMerged) {
+ pendingToBeMerged.addLast(toMergeInputs);
+ pendingToBeMerged.notifyAll();
+ }
}
}
public synchronized void waitForMerge() throws InterruptedException {
- while (inProgress) {
+ while (numPending.get() > 0) {
wait();
}
}
public void run() {
while (true) {
+ List<T> inputs = null;
try {
// Wait for notification to start the merge...
- synchronized (this) {
- while (!inProgress) {
- wait();
+ synchronized (pendingToBeMerged) {
+ while(pendingToBeMerged.size() <= 0) {
+ pendingToBeMerged.wait();
}
+ // Pickup the inputs to merge.
+ inputs = pendingToBeMerged.removeFirst();
}
// Merge
merge(inputs);
} catch (InterruptedException ie) {
+ numPending.set(0);
return;
} catch(Throwable t) {
+ numPending.set(0);
reporter.reportException(t);
return;
} finally {
synchronized (this) {
- // Clear inputs
- inputs = null;
- inProgress = false;
+ numPending.decrementAndGet();
notifyAll();
}
}
Modified: hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml?rev=1428601&r1=1428600&r2=1428601&view=diff
==============================================================================
--- hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml (original)
+++ hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml Thu Jan 3 21:23:58 2013
@@ -874,6 +874,13 @@
</property>
<property>
+ <name>yarn.app.mapreduce.am.job.committer.cancel-timeout</name>
+ <value>60000</value>
+ <description>The amount of time in milliseconds to wait for the output
+ committer to cancel an operation if the job is killed</description>
+</property>
+
+<property>
<name>yarn.app.mapreduce.am.scheduler.heartbeat.interval-ms</name>
<value>1000</value>
<description>The interval in ms at which the MR AppMaster should send
Propchange: hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml
------------------------------------------------------------------------------
Merged /hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml:r1423068-1428155
Modified: hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/pom.xml
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/pom.xml?rev=1428601&r1=1428600&r2=1428601&view=diff
==============================================================================
--- hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/pom.xml (original)
+++ hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/pom.xml Thu Jan 3 21:23:58 2013
@@ -198,5 +198,6 @@
<module>hadoop-mapreduce-client-app</module>
<module>hadoop-mapreduce-client-jobclient</module>
<module>hadoop-mapreduce-client-hs</module>
+ <module>hadoop-mapreduce-client-hs-plugins</module>
</modules>
</project>