You are viewing a plain-text version of this content. (The link to the canonical version was not preserved in this copy.)
Posted to mapreduce-commits@hadoop.apache.org by jl...@apache.org on 2013/04/08 18:12:33 UTC
svn commit: r1465674 - in
/hadoop/common/branches/branch-2/hadoop-mapreduce-project: ./
hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/
hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/j...
Author: jlowe
Date: Mon Apr 8 16:12:32 2013
New Revision: 1465674
URL: http://svn.apache.org/r1465674
Log:
MAPREDUCE-5086. MR app master deletes staging dir when sent a reboot command from the RM. Contributed by Jian He
Modified:
hadoop/common/branches/branch-2/hadoop-mapreduce-project/CHANGES.txt
hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java
hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/JobStateInternal.java
hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/JobEventType.java
hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java
hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/local/LocalContainerAllocator.java
hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java
hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestStagingCleanup.java
hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestJobImpl.java
Modified: hadoop/common/branches/branch-2/hadoop-mapreduce-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-mapreduce-project/CHANGES.txt?rev=1465674&r1=1465673&r2=1465674&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-mapreduce-project/CHANGES.txt (original)
+++ hadoop/common/branches/branch-2/hadoop-mapreduce-project/CHANGES.txt Mon Apr 8 16:12:32 2013
@@ -110,6 +110,9 @@ Release 2.0.5-beta - UNRELEASED
MAPREDUCE-5006. Fix failing streaming tests due to MAPREDUCE-4994.
(Sandy Ryza via tomwhite)
+ MAPREDUCE-5086. MR app master deletes staging dir when sent a reboot
+ command from the RM. (Jian He via jlowe)
+
Release 2.0.4-alpha - UNRELEASED
INCOMPATIBLE CHANGES
Modified: hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java?rev=1465674&r1=1465673&r2=1465674&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java (original)
+++ hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java Mon Apr 8 16:12:32 2013
@@ -549,8 +549,14 @@ public class MRAppMaster extends Composi
}
try {
- //We are finishing cleanly so this is the last retry
- isLastAMRetry = true;
+ //if isLastAMRetry comes as true, should never set it to false
+ if ( !isLastAMRetry){
+ if (((JobImpl)job).getInternalState() != JobStateInternal.REBOOT) {
+ LOG.info("We are finishing cleanly so this is the last retry");
+ isLastAMRetry = true;
+ }
+ }
+ notifyIsLastAMRetry(isLastAMRetry);
// Stop all services
// This will also send the final report to the ResourceManager
LOG.info("Calling stop for all the services");
@@ -1272,19 +1278,25 @@ public class MRAppMaster extends Composi
// that they don't take too long in shutting down
if(appMaster.containerAllocator instanceof ContainerAllocatorRouter) {
((ContainerAllocatorRouter) appMaster.containerAllocator)
- .setSignalled(true);
- ((ContainerAllocatorRouter) appMaster.containerAllocator)
- .setShouldUnregister(appMaster.isLastAMRetry);
- }
-
- if(appMaster.jobHistoryEventHandler != null) {
- appMaster.jobHistoryEventHandler
- .setForcejobCompletion(appMaster.isLastAMRetry);
+ .setSignalled(true);
}
+ appMaster.notifyIsLastAMRetry(appMaster.isLastAMRetry);
appMaster.stop();
}
}
+ public void notifyIsLastAMRetry(boolean isLastAMRetry){
+ if(containerAllocator instanceof ContainerAllocatorRouter) {
+ LOG.info("Notify RMCommunicator isAMLastRetry: " + isLastAMRetry);
+ ((ContainerAllocatorRouter) containerAllocator)
+ .setShouldUnregister(isLastAMRetry);
+ }
+ if(jobHistoryEventHandler != null) {
+ LOG.info("Notify JHEH isAMLastRetry: " + isLastAMRetry);
+ jobHistoryEventHandler.setForcejobCompletion(isLastAMRetry);
+ }
+ }
+
protected static void initAndStartAppMaster(final MRAppMaster appMaster,
final YarnConfiguration conf, String jobUserName) throws IOException,
InterruptedException {
Modified: hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/JobStateInternal.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/JobStateInternal.java?rev=1465674&r1=1465673&r2=1465674&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/JobStateInternal.java (original)
+++ hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/JobStateInternal.java Mon Apr 8 16:12:32 2013
@@ -30,5 +30,6 @@ public enum JobStateInternal {
KILL_WAIT,
KILL_ABORT,
KILLED,
- ERROR
+ ERROR,
+ REBOOT
}
Modified: hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/JobEventType.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/JobEventType.java?rev=1465674&r1=1465673&r2=1465674&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/JobEventType.java (original)
+++ hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/JobEventType.java Mon Apr 8 16:12:32 2013
@@ -54,6 +54,6 @@ public enum JobEventType {
JOB_TASK_ATTEMPT_FETCH_FAILURE,
//Producer:RMContainerAllocator
- JOB_UPDATED_NODES
-
+ JOB_UPDATED_NODES,
+ JOB_AM_REBOOT
}
Modified: hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java?rev=1465674&r1=1465673&r2=1465674&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java (original)
+++ hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java Mon Apr 8 16:12:32 2013
@@ -215,6 +215,8 @@ public class JobImpl implements org.apac
DIAGNOSTIC_UPDATE_TRANSITION = new DiagnosticsUpdateTransition();
private static final InternalErrorTransition
INTERNAL_ERROR_TRANSITION = new InternalErrorTransition();
+ private static final InternalRebootTransition
+ INTERNAL_REBOOT_TRANSITION = new InternalRebootTransition();
private static final TaskAttemptCompletedEventTransition
TASK_ATTEMPT_COMPLETED_EVENT_TRANSITION =
new TaskAttemptCompletedEventTransition();
@@ -246,6 +248,9 @@ public class JobImpl implements org.apac
.addTransition(JobStateInternal.NEW, JobStateInternal.ERROR,
JobEventType.INTERNAL_ERROR,
INTERNAL_ERROR_TRANSITION)
+ .addTransition(JobStateInternal.NEW, JobStateInternal.REBOOT,
+ JobEventType.JOB_AM_REBOOT,
+ INTERNAL_REBOOT_TRANSITION)
// Ignore-able events
.addTransition(JobStateInternal.NEW, JobStateInternal.NEW,
JobEventType.JOB_UPDATED_NODES)
@@ -265,6 +270,9 @@ public class JobImpl implements org.apac
.addTransition(JobStateInternal.INITED, JobStateInternal.ERROR,
JobEventType.INTERNAL_ERROR,
INTERNAL_ERROR_TRANSITION)
+ .addTransition(JobStateInternal.INITED, JobStateInternal.REBOOT,
+ JobEventType.JOB_AM_REBOOT,
+ INTERNAL_REBOOT_TRANSITION)
// Ignore-able events
.addTransition(JobStateInternal.INITED, JobStateInternal.INITED,
JobEventType.JOB_UPDATED_NODES)
@@ -287,6 +295,9 @@ public class JobImpl implements org.apac
.addTransition(JobStateInternal.SETUP, JobStateInternal.ERROR,
JobEventType.INTERNAL_ERROR,
INTERNAL_ERROR_TRANSITION)
+ .addTransition(JobStateInternal.SETUP, JobStateInternal.REBOOT,
+ JobEventType.JOB_AM_REBOOT,
+ INTERNAL_REBOOT_TRANSITION)
// Ignore-able events
.addTransition(JobStateInternal.SETUP, JobStateInternal.SETUP,
JobEventType.JOB_UPDATED_NODES)
@@ -327,6 +338,9 @@ public class JobImpl implements org.apac
JobStateInternal.RUNNING,
JobStateInternal.ERROR, JobEventType.INTERNAL_ERROR,
INTERNAL_ERROR_TRANSITION)
+ .addTransition(JobStateInternal.RUNNING, JobStateInternal.REBOOT,
+ JobEventType.JOB_AM_REBOOT,
+ INTERNAL_REBOOT_TRANSITION)
// Transitions from KILL_WAIT state.
.addTransition
@@ -352,7 +366,8 @@ public class JobImpl implements org.apac
EnumSet.of(JobEventType.JOB_KILL,
JobEventType.JOB_UPDATED_NODES,
JobEventType.JOB_MAP_TASK_RESCHEDULED,
- JobEventType.JOB_TASK_ATTEMPT_FETCH_FAILURE))
+ JobEventType.JOB_TASK_ATTEMPT_FETCH_FAILURE,
+ JobEventType.JOB_AM_REBOOT))
// Transitions from COMMITTING state
.addTransition(JobStateInternal.COMMITTING,
@@ -377,7 +392,10 @@ public class JobImpl implements org.apac
.addTransition(JobStateInternal.COMMITTING,
JobStateInternal.ERROR, JobEventType.INTERNAL_ERROR,
INTERNAL_ERROR_TRANSITION)
- // Ignore-able events
+ .addTransition(JobStateInternal.COMMITTING, JobStateInternal.REBOOT,
+ JobEventType.JOB_AM_REBOOT,
+ INTERNAL_REBOOT_TRANSITION)
+ // Ignore-able events
.addTransition(JobStateInternal.COMMITTING,
JobStateInternal.COMMITTING,
EnumSet.of(JobEventType.JOB_UPDATED_NODES,
@@ -397,7 +415,8 @@ public class JobImpl implements org.apac
.addTransition(JobStateInternal.SUCCEEDED, JobStateInternal.SUCCEEDED,
EnumSet.of(JobEventType.JOB_KILL,
JobEventType.JOB_UPDATED_NODES,
- JobEventType.JOB_TASK_ATTEMPT_FETCH_FAILURE))
+ JobEventType.JOB_TASK_ATTEMPT_FETCH_FAILURE,
+ JobEventType.JOB_AM_REBOOT))
// Transitions from FAIL_ABORT state
.addTransition(JobStateInternal.FAIL_ABORT,
@@ -425,7 +444,8 @@ public class JobImpl implements org.apac
JobEventType.JOB_MAP_TASK_RESCHEDULED,
JobEventType.JOB_TASK_ATTEMPT_FETCH_FAILURE,
JobEventType.JOB_COMMIT_COMPLETED,
- JobEventType.JOB_COMMIT_FAILED))
+ JobEventType.JOB_COMMIT_FAILED,
+ JobEventType.JOB_AM_REBOOT))
// Transitions from KILL_ABORT state
.addTransition(JobStateInternal.KILL_ABORT,
@@ -452,7 +472,8 @@ public class JobImpl implements org.apac
JobEventType.JOB_SETUP_COMPLETED,
JobEventType.JOB_SETUP_FAILED,
JobEventType.JOB_COMMIT_COMPLETED,
- JobEventType.JOB_COMMIT_FAILED))
+ JobEventType.JOB_COMMIT_FAILED,
+ JobEventType.JOB_AM_REBOOT))
// Transitions from FAILED state
.addTransition(JobStateInternal.FAILED, JobStateInternal.FAILED,
@@ -476,7 +497,8 @@ public class JobImpl implements org.apac
JobEventType.JOB_SETUP_FAILED,
JobEventType.JOB_COMMIT_COMPLETED,
JobEventType.JOB_COMMIT_FAILED,
- JobEventType.JOB_ABORT_COMPLETED))
+ JobEventType.JOB_ABORT_COMPLETED,
+ JobEventType.JOB_AM_REBOOT))
// Transitions from KILLED state
.addTransition(JobStateInternal.KILLED, JobStateInternal.KILLED,
@@ -498,7 +520,8 @@ public class JobImpl implements org.apac
JobEventType.JOB_SETUP_FAILED,
JobEventType.JOB_COMMIT_COMPLETED,
JobEventType.JOB_COMMIT_FAILED,
- JobEventType.JOB_ABORT_COMPLETED))
+ JobEventType.JOB_ABORT_COMPLETED,
+ JobEventType.JOB_AM_REBOOT))
// No transitions from INTERNAL_ERROR state. Ignore all.
.addTransition(
@@ -517,9 +540,33 @@ public class JobImpl implements org.apac
JobEventType.JOB_COMMIT_COMPLETED,
JobEventType.JOB_COMMIT_FAILED,
JobEventType.JOB_ABORT_COMPLETED,
- JobEventType.INTERNAL_ERROR))
+ JobEventType.INTERNAL_ERROR,
+ JobEventType.JOB_AM_REBOOT))
.addTransition(JobStateInternal.ERROR, JobStateInternal.ERROR,
JobEventType.JOB_COUNTER_UPDATE, COUNTER_UPDATE_TRANSITION)
+
+ // No transitions from AM_REBOOT state. Ignore all.
+ .addTransition(
+ JobStateInternal.REBOOT,
+ JobStateInternal.REBOOT,
+ EnumSet.of(JobEventType.JOB_INIT,
+ JobEventType.JOB_KILL,
+ JobEventType.JOB_TASK_COMPLETED,
+ JobEventType.JOB_TASK_ATTEMPT_COMPLETED,
+ JobEventType.JOB_MAP_TASK_RESCHEDULED,
+ JobEventType.JOB_DIAGNOSTIC_UPDATE,
+ JobEventType.JOB_UPDATED_NODES,
+ JobEventType.JOB_TASK_ATTEMPT_FETCH_FAILURE,
+ JobEventType.JOB_SETUP_COMPLETED,
+ JobEventType.JOB_SETUP_FAILED,
+ JobEventType.JOB_COMMIT_COMPLETED,
+ JobEventType.JOB_COMMIT_FAILED,
+ JobEventType.JOB_ABORT_COMPLETED,
+ JobEventType.INTERNAL_ERROR,
+ JobEventType.JOB_AM_REBOOT))
+ .addTransition(JobStateInternal.REBOOT, JobStateInternal.REBOOT,
+ JobEventType.JOB_COUNTER_UPDATE, COUNTER_UPDATE_TRANSITION)
+
// create the topology tables
.installTopology();
@@ -904,6 +951,8 @@ public class JobImpl implements org.apac
return JobState.RUNNING;
case FAIL_ABORT:
return JobState.FAILED;
+ case REBOOT:
+ return JobState.ERROR;
default:
return JobState.valueOf(smState.name());
}
@@ -972,6 +1021,7 @@ public class JobImpl implements org.apac
case KILLED:
metrics.killedJob(this);
break;
+ case REBOOT:
case ERROR:
case FAILED:
metrics.failedJob(this);
@@ -1899,8 +1949,17 @@ public class JobImpl implements org.apac
}
}
- private static class InternalErrorTransition implements
+ private static class InternalTerminationTransition implements
SingleArcTransition<JobImpl, JobEvent> {
+ JobStateInternal terminationState = null;
+ String jobHistoryString = null;
+ public InternalTerminationTransition(JobStateInternal stateInternal,
+ String jobHistoryString) {
+ this.terminationState = stateInternal;
+ //mostly a hack for jbhistoryserver
+ this.jobHistoryString = jobHistoryString;
+ }
+
@Override
public void transition(JobImpl job, JobEvent event) {
//TODO Is this JH event required.
@@ -1908,9 +1967,21 @@ public class JobImpl implements org.apac
JobUnsuccessfulCompletionEvent failedEvent =
new JobUnsuccessfulCompletionEvent(job.oldJobId,
job.finishTime, 0, 0,
- JobStateInternal.ERROR.toString());
+ jobHistoryString);
job.eventHandler.handle(new JobHistoryEvent(job.jobId, failedEvent));
- job.finished(JobStateInternal.ERROR);
+ job.finished(terminationState);
+ }
+ }
+
+ private static class InternalErrorTransition extends InternalTerminationTransition {
+ public InternalErrorTransition(){
+ super(JobStateInternal.ERROR, JobStateInternal.ERROR.toString());
+ }
+ }
+
+ private static class InternalRebootTransition extends InternalTerminationTransition {
+ public InternalRebootTransition(){
+ super(JobStateInternal.REBOOT, JobStateInternal.ERROR.toString());
}
}
Modified: hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/local/LocalContainerAllocator.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/local/LocalContainerAllocator.java?rev=1465674&r1=1465673&r2=1465674&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/local/LocalContainerAllocator.java (original)
+++ hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/local/LocalContainerAllocator.java Mon Apr 8 16:12:32 2013
@@ -123,7 +123,7 @@ public class LocalContainerAllocator ext
// This can happen if the RM has been restarted. If it is in that state,
// this application must clean itself up.
eventHandler.handle(new JobEvent(this.getJob().getID(),
- JobEventType.INTERNAL_ERROR));
+ JobEventType.JOB_AM_REBOOT));
throw new YarnException("Resource Manager doesn't recognize AttemptId: " +
this.getContext().getApplicationID());
}
Modified: hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java?rev=1465674&r1=1465673&r2=1465674&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java (original)
+++ hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java Mon Apr 8 16:12:32 2013
@@ -574,7 +574,7 @@ public class RMContainerAllocator extend
// This can happen if the RM has been restarted. If it is in that state,
// this application must clean itself up.
eventHandler.handle(new JobEvent(this.getJob().getID(),
- JobEventType.INTERNAL_ERROR));
+ JobEventType.JOB_AM_REBOOT));
throw new YarnException("Resource Manager doesn't recognize AttemptId: " +
this.getContext().getApplicationID());
}
Modified: hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestStagingCleanup.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestStagingCleanup.java?rev=1465674&r1=1465673&r2=1465674&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestStagingCleanup.java (original)
+++ hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestStagingCleanup.java Mon Apr 8 16:12:32 2013
@@ -33,7 +33,9 @@ import junit.framework.TestCase;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.JobID;
import org.apache.hadoop.mapreduce.MRJobConfig;
+import org.apache.hadoop.mapreduce.TypeConverter;
import org.apache.hadoop.mapreduce.v2.api.records.JobId;
import org.apache.hadoop.mapreduce.v2.api.records.JobState;
import org.apache.hadoop.mapreduce.v2.app.client.ClientService;
@@ -45,6 +47,7 @@ import org.apache.hadoop.mapreduce.v2.ap
import org.apache.hadoop.mapreduce.v2.app.rm.ContainerAllocatorEvent;
import org.apache.hadoop.mapreduce.v2.app.rm.RMHeartbeatHandler;
import org.apache.hadoop.mapreduce.v2.util.MRApps;
+import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.YarnException;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
@@ -86,9 +89,68 @@ import org.junit.Test;
attemptId.setApplicationId(appId);
JobId jobid = recordFactory.newRecordInstance(JobId.class);
jobid.setAppId(appId);
- MRAppMaster appMaster = new TestMRApp(attemptId);
+ ContainerAllocator mockAlloc = mock(ContainerAllocator.class);
+ MRAppMaster appMaster = new TestMRApp(attemptId, mockAlloc,
+ JobStateInternal.RUNNING, MRJobConfig.DEFAULT_MR_AM_MAX_ATTEMPTS);
+ appMaster.init(conf);
+ appMaster.start();
+ appMaster.shutDownJob();
+ //test whether notifyIsLastAMRetry called
+ Assert.assertEquals(true, ((TestMRApp)appMaster).getTestIsLastAMRetry());
+ verify(fs).delete(stagingJobPath, true);
+ }
+
+ @Test (timeout = 30000)
+ public void testNoDeletionofStagingOnReboot() throws IOException {
+ conf.set(MRJobConfig.MAPREDUCE_JOB_DIR, stagingJobDir);
+ fs = mock(FileSystem.class);
+ when(fs.delete(any(Path.class),anyBoolean())).thenReturn(true);
+ String user = UserGroupInformation.getCurrentUser().getShortUserName();
+ Path stagingDir = MRApps.getStagingAreaDir(conf, user);
+ when(fs.exists(stagingDir)).thenReturn(true);
+ ApplicationAttemptId attemptId = recordFactory.newRecordInstance(
+ ApplicationAttemptId.class);
+ attemptId.setAttemptId(0);
+ ApplicationId appId = recordFactory.newRecordInstance(ApplicationId.class);
+ appId.setClusterTimestamp(System.currentTimeMillis());
+ appId.setId(0);
+ attemptId.setApplicationId(appId);
+ ContainerAllocator mockAlloc = mock(ContainerAllocator.class);
+ MRAppMaster appMaster = new TestMRApp(attemptId, mockAlloc,
+ JobStateInternal.REBOOT, 4);
+ appMaster.init(conf);
+ appMaster.start();
+ //shutdown the job, not the lastRetry
+ appMaster.shutDownJob();
+ //test whether notifyIsLastAMRetry called
+ Assert.assertEquals(false, ((TestMRApp)appMaster).getTestIsLastAMRetry());
+ verify(fs, times(0)).delete(stagingJobPath, true);
+ }
+
+ @Test (timeout = 30000)
+ public void testDeletionofStagingOnReboot() throws IOException {
+ conf.set(MRJobConfig.MAPREDUCE_JOB_DIR, stagingJobDir);
+ fs = mock(FileSystem.class);
+ when(fs.delete(any(Path.class),anyBoolean())).thenReturn(true);
+ String user = UserGroupInformation.getCurrentUser().getShortUserName();
+ Path stagingDir = MRApps.getStagingAreaDir(conf, user);
+ when(fs.exists(stagingDir)).thenReturn(true);
+ ApplicationAttemptId attemptId = recordFactory.newRecordInstance(
+ ApplicationAttemptId.class);
+ attemptId.setAttemptId(1);
+ ApplicationId appId = recordFactory.newRecordInstance(ApplicationId.class);
+ appId.setClusterTimestamp(System.currentTimeMillis());
+ appId.setId(0);
+ attemptId.setApplicationId(appId);
+ ContainerAllocator mockAlloc = mock(ContainerAllocator.class);
+ MRAppMaster appMaster = new TestMRApp(attemptId, mockAlloc,
+ JobStateInternal.REBOOT, MRJobConfig.DEFAULT_MR_AM_MAX_ATTEMPTS);
appMaster.init(conf);
+ appMaster.start();
+ //shutdown the job, is lastRetry
appMaster.shutDownJob();
+ //test whether notifyIsLastAMRetry called
+ Assert.assertEquals(true, ((TestMRApp)appMaster).getTestIsLastAMRetry());
verify(fs).delete(stagingJobPath, true);
}
@@ -151,6 +213,8 @@ import org.junit.Test;
private class TestMRApp extends MRAppMaster {
ContainerAllocator allocator;
+ boolean testIsLastAMRetry = false;
+ JobStateInternal jobStateInternal;
public TestMRApp(ApplicationAttemptId applicationAttemptId,
ContainerAllocator allocator, int maxAppAttempts) {
@@ -160,9 +224,11 @@ import org.junit.Test;
this.allocator = allocator;
}
- public TestMRApp(ApplicationAttemptId applicationAttemptId) {
- this(applicationAttemptId, null,
- MRJobConfig.DEFAULT_MR_AM_MAX_ATTEMPTS);
+ public TestMRApp(ApplicationAttemptId applicationAttemptId,
+ ContainerAllocator allocator, JobStateInternal jobStateInternal,
+ int maxAppAttempts) {
+ this(applicationAttemptId, allocator, maxAppAttempts);
+ this.jobStateInternal = jobStateInternal;
}
@Override
@@ -180,6 +246,31 @@ import org.junit.Test;
}
@Override
+ protected Job createJob(Configuration conf, JobStateInternal forcedState,
+ String diagnostic) {
+ JobImpl jobImpl = mock(JobImpl.class);
+ when(jobImpl.getInternalState()).thenReturn(this.jobStateInternal);
+ JobID jobID = JobID.forName("job_1234567890000_0001");
+ JobId jobId = TypeConverter.toYarn(jobID);
+ when(jobImpl.getID()).thenReturn(jobId);
+ ((AppContext) getContext())
+ .getAllJobs().put(jobImpl.getID(), jobImpl);
+ return jobImpl;
+ }
+
+ @Override
+ public void start() {
+ super.start();
+ DefaultMetricsSystem.shutdown();
+ }
+
+ @Override
+ public void notifyIsLastAMRetry(boolean isLastAMRetry){
+ testIsLastAMRetry = isLastAMRetry;
+ super.notifyIsLastAMRetry(isLastAMRetry);
+ }
+
+ @Override
public RMHeartbeatHandler getRMHeartbeatHandler() {
return getStubbedHeartbeatHandler(getContext());
}
@@ -197,6 +288,9 @@ import org.junit.Test;
protected void downloadTokensAndSetupUGI(Configuration conf) {
}
+ public boolean getTestIsLastAMRetry(){
+ return testIsLastAMRetry;
+ }
}
private final class MRAppTestCleanup extends MRApp {
@@ -288,7 +382,7 @@ import org.junit.Test;
};
}
- @Test
+ @Test(timeout=20000)
public void testStagingCleanupOrder() throws Exception {
MRAppTestCleanup app = new MRAppTestCleanup(1, 1, true,
this.getClass().getName(), true);
Modified: hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestJobImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestJobImpl.java?rev=1465674&r1=1465673&r2=1465674&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestJobImpl.java (original)
+++ hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestJobImpl.java Mon Apr 8 16:12:32 2013
@@ -195,6 +195,68 @@ public class TestJobImpl {
}
@Test(timeout=20000)
+ public void testRebootedDuringSetup() throws Exception{
+ Configuration conf = new Configuration();
+ conf.set(MRJobConfig.MR_AM_STAGING_DIR, stagingDir);
+ AsyncDispatcher dispatcher = new AsyncDispatcher();
+ dispatcher.init(conf);
+ dispatcher.start();
+ OutputCommitter committer = new StubbedOutputCommitter() {
+ @Override
+ public synchronized void setupJob(JobContext jobContext)
+ throws IOException {
+ while(!Thread.interrupted()){
+ try{
+ wait();
+ }catch (InterruptedException e) {
+ }
+ }
+ }
+ };
+ CommitterEventHandler commitHandler =
+ createCommitterEventHandler(dispatcher, committer);
+ commitHandler.init(conf);
+ commitHandler.start();
+
+ JobImpl job = createStubbedJob(conf, dispatcher, 2);
+ JobId jobId = job.getID();
+ job.handle(new JobEvent(jobId, JobEventType.JOB_INIT));
+ assertJobState(job, JobStateInternal.INITED);
+ job.handle(new JobEvent(jobId, JobEventType.JOB_START));
+ assertJobState(job, JobStateInternal.SETUP);
+
+ job.handle(new JobEvent(job.getID(), JobEventType.JOB_AM_REBOOT));
+ assertJobState(job, JobStateInternal.REBOOT);
+ dispatcher.stop();
+ commitHandler.stop();
+ }
+
+ @Test(timeout=20000)
+ public void testRebootedDuringCommit() throws Exception {
+ Configuration conf = new Configuration();
+ conf.set(MRJobConfig.MR_AM_STAGING_DIR, stagingDir);
+ AsyncDispatcher dispatcher = new AsyncDispatcher();
+ dispatcher.init(conf);
+ dispatcher.start();
+ CyclicBarrier syncBarrier = new CyclicBarrier(2);
+ OutputCommitter committer = new WaitingOutputCommitter(syncBarrier, true);
+ CommitterEventHandler commitHandler =
+ createCommitterEventHandler(dispatcher, committer);
+ commitHandler.init(conf);
+ commitHandler.start();
+
+ JobImpl job = createRunningStubbedJob(conf, dispatcher, 2);
+ completeJobTasks(job);
+ assertJobState(job, JobStateInternal.COMMITTING);
+
+ syncBarrier.await();
+ job.handle(new JobEvent(job.getID(), JobEventType.JOB_AM_REBOOT));
+ assertJobState(job, JobStateInternal.REBOOT);
+ dispatcher.stop();
+ commitHandler.stop();
+ }
+
+ @Test(timeout=20000)
public void testKilledDuringSetup() throws Exception {
Configuration conf = new Configuration();
conf.set(MRJobConfig.MR_AM_STAGING_DIR, stagingDir);