You are viewing a plain text version of this content. The canonical link for it is here.
Posted to mapreduce-commits@hadoop.apache.org by vi...@apache.org on 2011/09/18 09:16:19 UTC
svn commit: r1172206 - in /hadoop/common/trunk/hadoop-mapreduce-project: ./
hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/
hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/...
Author: vinodkv
Date: Sun Sep 18 07:16:18 2011
New Revision: 1172206
URL: http://svn.apache.org/viewvc?rev=1172206&view=rev
Log:
MAPREDUCE-3006. Fixed MapReduce AM to exit only after properly writing out history file. (vinodkv)
Modified:
hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler.java
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRAppBenchmark.java
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestFail.java
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestMRApp.java
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestJobHistoryEvents.java
Modified: hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt?rev=1172206&r1=1172205&r2=1172206&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt Sun Sep 18 07:16:18 2011
@@ -1339,6 +1339,9 @@ Release 0.23.0 - Unreleased
MAPREDUCE-2987. Fixed display of logged user on RM Web-UI. (Thomas Graves
via acmurthy)
+ MAPREDUCE-3006. Fixed MapReduce AM to exit only after properly writing out
+ history file. (vinodkv)
+
Release 0.22.0 - Unreleased
INCOMPATIBLE CHANGES
Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler.java?rev=1172206&r1=1172205&r2=1172206&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler.java Sun Sep 18 07:16:18 2011
@@ -74,7 +74,7 @@ public class JobHistoryEventHandler exte
private BlockingQueue<JobHistoryEvent> eventQueue =
new LinkedBlockingQueue<JobHistoryEvent>();
- private Thread eventHandlingThread;
+ protected Thread eventHandlingThread;
private volatile boolean stopped;
private final Object lock = new Object();
Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java?rev=1172206&r1=1172205&r2=1172206&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java Sun Sep 18 07:16:18 2011
@@ -56,12 +56,14 @@ import org.apache.hadoop.mapreduce.v2.ap
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEventType;
import org.apache.hadoop.mapreduce.v2.app.job.impl.JobImpl;
import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerLauncher;
+import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerLauncherEvent;
import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerLauncherImpl;
import org.apache.hadoop.mapreduce.v2.app.local.LocalContainerAllocator;
import org.apache.hadoop.mapreduce.v2.app.metrics.MRAppMetrics;
import org.apache.hadoop.mapreduce.v2.app.recover.Recovery;
import org.apache.hadoop.mapreduce.v2.app.recover.RecoveryService;
import org.apache.hadoop.mapreduce.v2.app.rm.ContainerAllocator;
+import org.apache.hadoop.mapreduce.v2.app.rm.ContainerAllocatorEvent;
import org.apache.hadoop.mapreduce.v2.app.rm.RMContainerAllocator;
import org.apache.hadoop.mapreduce.v2.app.speculate.DefaultSpeculator;
import org.apache.hadoop.mapreduce.v2.app.speculate.Speculator;
@@ -83,6 +85,7 @@ import org.apache.hadoop.yarn.event.Asyn
import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
+import org.apache.hadoop.yarn.service.AbstractService;
import org.apache.hadoop.yarn.service.CompositeService;
import org.apache.hadoop.yarn.service.Service;
@@ -126,6 +129,7 @@ public class MRAppMaster extends Composi
private TaskAttemptListener taskAttemptListener;
private JobTokenSecretManager jobTokenSecretManager =
new JobTokenSecretManager();
+ private JobEventDispatcher jobEventDispatcher;
private Job job;
@@ -148,7 +152,7 @@ public class MRAppMaster extends Composi
@Override
public void init(final Configuration conf) {
- context = new RunningAppContext();
+ context = new RunningAppContext(conf);
// Job name is the same as the app name util we support DAG of jobs
// for an app later
@@ -182,18 +186,17 @@ public class MRAppMaster extends Composi
//service to log job history events
EventHandler<JobHistoryEvent> historyService =
createJobHistoryHandler(context);
- addIfService(historyService);
+ dispatcher.register(org.apache.hadoop.mapreduce.jobhistory.EventType.class,
+ historyService);
- JobEventDispatcher synchronousJobEventDispatcher = new JobEventDispatcher();
+ this.jobEventDispatcher = new JobEventDispatcher();
//register the event dispatchers
- dispatcher.register(JobEventType.class, synchronousJobEventDispatcher);
+ dispatcher.register(JobEventType.class, jobEventDispatcher);
dispatcher.register(TaskEventType.class, new TaskEventDispatcher());
dispatcher.register(TaskAttemptEventType.class,
new TaskAttemptEventDispatcher());
dispatcher.register(TaskCleaner.EventType.class, taskCleaner);
- dispatcher.register(org.apache.hadoop.mapreduce.jobhistory.EventType.class,
- historyService);
if (conf.getBoolean(MRJobConfig.MAP_SPECULATIVE, false)
|| conf.getBoolean(MRJobConfig.REDUCE_SPECULATIVE, false)) {
@@ -203,10 +206,34 @@ public class MRAppMaster extends Composi
}
dispatcher.register(Speculator.EventType.class,
- new SpeculatorEventDispatcher());
+ new SpeculatorEventDispatcher(conf));
- Credentials fsTokens = new Credentials();
+ // service to allocate containers from RM (if non-uber) or to fake it (uber)
+ containerAllocator = createContainerAllocator(clientService, context);
+ addIfService(containerAllocator);
+ dispatcher.register(ContainerAllocator.EventType.class, containerAllocator);
+
+ // corresponding service to launch allocated containers via NodeManager
+ containerLauncher = createContainerLauncher(context);
+ addIfService(containerLauncher);
+ dispatcher.register(ContainerLauncher.EventType.class, containerLauncher);
+
+ // Add the JobHistoryEventHandler last so that it is properly stopped first.
+ // This will guarantee that all history-events are flushed before AM goes
+ // ahead with shutdown.
+ // Note: Even though JobHistoryEventHandler is started last, if any
+ // component creates a JobHistoryEvent in the meanwhile, it will just be
+ // queued inside the JobHistoryEventHandler
+ addIfService(historyService);
+ super.init(conf);
+ } // end of init()
+
+ /** Create and initialize (but don't start) a single job. */
+ protected Job createJob(Configuration conf) {
+
+ // ////////// Obtain the tokens needed by the job. //////////
+ Credentials fsTokens = new Credentials();
UserGroupInformation currentUser = null;
try {
@@ -234,66 +261,12 @@ public class MRAppMaster extends Composi
} catch (IOException e) {
throw new YarnException(e);
}
-
- super.init(conf);
-
- //---- start of what used to be startJobs() code:
-
- Configuration config = getConfig();
-
- job = createJob(config, fsTokens, currentUser.getUserName());
-
- /** create a job event for job intialization */
- JobEvent initJobEvent = new JobEvent(job.getID(), JobEventType.JOB_INIT);
- /** send init to the job (this does NOT trigger job execution) */
- synchronousJobEventDispatcher.handle(initJobEvent);
-
- // send init to speculator. This won't yest start as dispatcher isn't
- // started yet.
- dispatcher.getEventHandler().handle(
- new SpeculatorEvent(job.getID(), clock.getTime()));
-
- // JobImpl's InitTransition is done (call above is synchronous), so the
- // "uber-decision" (MR-1220) has been made. Query job and switch to
- // ubermode if appropriate (by registering different container-allocator
- // and container-launcher services/event-handlers).
-
- if (job.isUber()) {
- LOG.info("MRAppMaster uberizing job " + job.getID()
- + " in local container (\"uber-AM\").");
- } else {
- LOG.info("MRAppMaster launching normal, non-uberized, multi-container "
- + "job " + job.getID() + ".");
- }
-
- // service to allocate containers from RM (if non-uber) or to fake it (uber)
- containerAllocator =
- createContainerAllocator(clientService, context, job.isUber());
- addIfService(containerAllocator);
- dispatcher.register(ContainerAllocator.EventType.class, containerAllocator);
- if (containerAllocator instanceof Service) {
- ((Service) containerAllocator).init(config);
- }
-
- // corresponding service to launch allocated containers via NodeManager
- containerLauncher = createContainerLauncher(context, job.isUber());
- addIfService(containerLauncher);
- dispatcher.register(ContainerLauncher.EventType.class, containerLauncher);
- if (containerLauncher instanceof Service) {
- ((Service) containerLauncher).init(config);
- }
-
- } // end of init()
-
- /** Create and initialize (but don't start) a single job.
- * @param fsTokens */
- protected Job createJob(Configuration conf, Credentials fsTokens,
- String user) {
+ // ////////// End of obtaining the tokens needed by the job. //////////
// create single job
Job newJob = new JobImpl(appID, conf, dispatcher.getEventHandler(),
taskAttemptListener, jobTokenSecretManager, fsTokens, clock, startCount,
- completedTasksFromPreviousRun, metrics, user);
+ completedTasksFromPreviousRun, metrics, currentUser.getUserName());
((RunningAppContext) context).jobs.put(newJob.getID(), newJob);
dispatcher.register(JobFinishEvent.Type.class,
@@ -388,19 +361,13 @@ public class MRAppMaster extends Composi
}
protected ContainerAllocator createContainerAllocator(
- ClientService clientService, AppContext context, boolean isLocal) {
- //return new StaticContainerAllocator(context);
- return isLocal
- ? new LocalContainerAllocator(clientService, context)
- : new RMContainerAllocator(clientService, context);
+ final ClientService clientService, final AppContext context) {
+ return new ContainerAllocatorRouter(clientService, context);
}
- protected ContainerLauncher createContainerLauncher(AppContext context,
- boolean isLocal) {
- return isLocal
- ? new LocalContainerLauncher(context,
- (TaskUmbilicalProtocol) taskAttemptListener)
- : new ContainerLauncherImpl(context);
+ protected ContainerLauncher
+ createContainerLauncher(final AppContext context) {
+ return new ContainerLauncherRouter(context);
}
//TODO:should have an interface for MRClientService
@@ -440,9 +407,96 @@ public class MRAppMaster extends Composi
return taskAttemptListener;
}
- class RunningAppContext implements AppContext {
+ /**
+ * By the time life-cycle of this router starts, job-init would have already
+ * happened.
+ */
+ private final class ContainerAllocatorRouter extends AbstractService
+ implements ContainerAllocator {
+ private final ClientService clientService;
+ private final AppContext context;
+ private ContainerAllocator containerAllocator;
+
+ ContainerAllocatorRouter(ClientService clientService,
+ AppContext context) {
+ super(ContainerAllocatorRouter.class.getName());
+ this.clientService = clientService;
+ this.context = context;
+ }
+
+ @Override
+ public synchronized void start() {
+ if (job.isUber()) {
+ this.containerAllocator = new LocalContainerAllocator(
+ this.clientService, this.context);
+ } else {
+ this.containerAllocator = new RMContainerAllocator(
+ this.clientService, this.context);
+ }
+ ((Service)this.containerAllocator).init(getConfig());
+ ((Service)this.containerAllocator).start();
+ super.start();
+ }
+
+ @Override
+ public synchronized void stop() {
+ ((Service)this.containerAllocator).stop();
+ super.stop();
+ }
+
+ @Override
+ public void handle(ContainerAllocatorEvent event) {
+ this.containerAllocator.handle(event);
+ }
+ }
+
+ /**
+ * By the time life-cycle of this router starts, job-init would have already
+ * happened.
+ */
+ private final class ContainerLauncherRouter extends AbstractService
+ implements ContainerLauncher {
+ private final AppContext context;
+ private ContainerLauncher containerLauncher;
+
+ ContainerLauncherRouter(AppContext context) {
+ super(ContainerLauncherRouter.class.getName());
+ this.context = context;
+ }
+
+ @Override
+ public synchronized void start() {
+ if (job.isUber()) {
+ this.containerLauncher = new LocalContainerLauncher(context,
+ (TaskUmbilicalProtocol) taskAttemptListener);
+ } else {
+ this.containerLauncher = new ContainerLauncherImpl(context);
+ }
+ ((Service)this.containerLauncher).init(getConfig());
+ ((Service)this.containerLauncher).start();
+ super.start();
+ }
+
+ @Override
+ public void handle(ContainerLauncherEvent event) {
+ this.containerLauncher.handle(event);
+ }
+
+ @Override
+ public synchronized void stop() {
+ ((Service)this.containerLauncher).stop();
+ super.stop();
+ }
+ }
+
+ private class RunningAppContext implements AppContext {
- private Map<JobId, Job> jobs = new ConcurrentHashMap<JobId, Job>();
+ private final Map<JobId, Job> jobs = new ConcurrentHashMap<JobId, Job>();
+ private final Configuration conf;
+
+ public RunningAppContext(Configuration config) {
+ this.conf = config;
+ }
@Override
public ApplicationAttemptId getApplicationAttemptId() {
@@ -481,7 +535,7 @@ public class MRAppMaster extends Composi
@Override
public CharSequence getUser() {
- return getConfig().get(MRJobConfig.USER_NAME);
+ return this.conf.get(MRJobConfig.USER_NAME);
}
@Override
@@ -492,13 +546,45 @@ public class MRAppMaster extends Composi
@Override
public void start() {
+
+ ///////////////////// Create the job itself.
+ job = createJob(getConfig());
+ // End of creating the job.
+
// metrics system init is really init & start.
// It's more test friendly to put it here.
DefaultMetricsSystem.initialize("MRAppMaster");
- startJobs();
+ /** create a job event for job initialization */
+ JobEvent initJobEvent = new JobEvent(job.getID(), JobEventType.JOB_INIT);
+ /** send init to the job (this does NOT trigger job execution) */
+ // This is a synchronous call, not an event through dispatcher. We want
+ // job-init to be done completely here.
+ jobEventDispatcher.handle(initJobEvent);
+
+ // send init to speculator. This won't yet start as dispatcher isn't
+ // started yet.
+ dispatcher.getEventHandler().handle(
+ new SpeculatorEvent(job.getID(), clock.getTime()));
+
+ // JobImpl's InitTransition is done (call above is synchronous), so the
+ // "uber-decision" (MR-1220) has been made. Query job and switch to
+ // ubermode if appropriate (by registering different container-allocator
+ // and container-launcher services/event-handlers).
+
+ if (job.isUber()) {
+ LOG.info("MRAppMaster uberizing job " + job.getID()
+ + " in local container (\"uber-AM\").");
+ } else {
+ LOG.info("MRAppMaster launching normal, non-uberized, multi-container "
+ + "job " + job.getID() + ".");
+ }
+
//start all the components
super.start();
+
+ // All components have started, start the job.
+ startJobs();
}
/**
@@ -546,10 +632,14 @@ public class MRAppMaster extends Composi
private class SpeculatorEventDispatcher implements
EventHandler<SpeculatorEvent> {
+ private final Configuration conf;
+ public SpeculatorEventDispatcher(Configuration config) {
+ this.conf = config;
+ }
@Override
public void handle(SpeculatorEvent event) {
- if (getConfig().getBoolean(MRJobConfig.MAP_SPECULATIVE, false)
- || getConfig().getBoolean(MRJobConfig.REDUCE_SPECULATIVE, false)) {
+ if (conf.getBoolean(MRJobConfig.MAP_SPECULATIVE, false)
+ || conf.getBoolean(MRJobConfig.REDUCE_SPECULATIVE, false)) {
// Speculator IS enabled, direct the event to there.
speculator.handle(event);
}
Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java?rev=1172206&r1=1172205&r2=1172206&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java Sun Sep 18 07:16:18 2011
@@ -63,6 +63,7 @@ import org.apache.hadoop.mapreduce.v2.ap
import org.apache.hadoop.mapreduce.v2.app.taskclean.TaskCleanupEvent;
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.Clock;
import org.apache.hadoop.yarn.YarnException;
import org.apache.hadoop.yarn.api.records.ApplicationId;
@@ -234,11 +235,16 @@ public class MRApp extends MRAppMaster {
}
@Override
- protected Job createJob(Configuration conf, Credentials fsTokens,
- String user) {
- Job newJob = new TestJob(getAppID(), getDispatcher().getEventHandler(),
+ protected Job createJob(Configuration conf) {
+ UserGroupInformation currentUser = null;
+ try {
+ currentUser = UserGroupInformation.getCurrentUser();
+ } catch (IOException e) {
+ throw new YarnException(e);
+ }
+ Job newJob = new TestJob(conf, getAppID(), getDispatcher().getEventHandler(),
getTaskAttemptListener(), getContext().getClock(),
- user);
+ currentUser.getUserName());
((AppContext) getContext()).getAllJobs().put(newJob.getID(), newJob);
getDispatcher().register(JobFinishEvent.Type.class,
@@ -279,8 +285,7 @@ public class MRApp extends MRAppMaster {
}
@Override
- protected ContainerLauncher createContainerLauncher(AppContext context,
- boolean isLocal) {
+ protected ContainerLauncher createContainerLauncher(AppContext context) {
return new MockContainerLauncher();
}
@@ -317,7 +322,7 @@ public class MRApp extends MRAppMaster {
@Override
protected ContainerAllocator createContainerAllocator(
- ClientService clientService, AppContext context, boolean isLocal) {
+ ClientService clientService, AppContext context) {
return new ContainerAllocator(){
private int containerCount;
@Override
@@ -369,12 +374,14 @@ public class MRApp extends MRAppMaster {
class TestJob extends JobImpl {
//override the init transition
+ private final TestInitTransition initTransition = new TestInitTransition(
+ maps, reduces);
StateMachineFactory<JobImpl, JobState, JobEventType, JobEvent> localFactory
= stateMachineFactory.addTransition(JobState.NEW,
EnumSet.of(JobState.INITED, JobState.FAILED),
JobEventType.JOB_INIT,
// This is abusive.
- new TestInitTransition(getConfig(), maps, reduces));
+ initTransition);
private final StateMachine<JobState, JobEventType, JobEvent>
localStateMachine;
@@ -384,10 +391,10 @@ public class MRApp extends MRAppMaster {
return localStateMachine;
}
- public TestJob(ApplicationId appID, EventHandler eventHandler,
- TaskAttemptListener taskAttemptListener, Clock clock,
- String user) {
- super(appID, new Configuration(), eventHandler, taskAttemptListener,
+ public TestJob(Configuration conf, ApplicationId appID,
+ EventHandler eventHandler, TaskAttemptListener taskAttemptListener,
+ Clock clock, String user) {
+ super(appID, conf, eventHandler, taskAttemptListener,
new JobTokenSecretManager(), new Credentials(), clock, getStartCount(),
getCompletedTaskFromPreviousRun(), metrics, user);
@@ -399,17 +406,14 @@ public class MRApp extends MRAppMaster {
//Override InitTransition to not look for split files etc
static class TestInitTransition extends JobImpl.InitTransition {
- private Configuration config;
private int maps;
private int reduces;
- TestInitTransition(Configuration config, int maps, int reduces) {
- this.config = config;
+ TestInitTransition(int maps, int reduces) {
this.maps = maps;
this.reduces = reduces;
}
@Override
protected void setup(JobImpl job) throws IOException {
- job.conf = config;
job.conf.setInt(MRJobConfig.NUM_REDUCES, reduces);
job.remoteJobConfFile = new Path("test");
}
Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRAppBenchmark.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRAppBenchmark.java?rev=1172206&r1=1172205&r2=1172206&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRAppBenchmark.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRAppBenchmark.java Sun Sep 18 07:16:18 2011
@@ -94,7 +94,7 @@ public class MRAppBenchmark {
@Override
protected ContainerAllocator createContainerAllocator(
- ClientService clientService, AppContext context, boolean isLocal) {
+ ClientService clientService, AppContext context) {
return new ThrottledContainerAllocator();
}
Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestFail.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestFail.java?rev=1172206&r1=1172205&r2=1172206&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestFail.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestFail.java Sun Sep 18 07:16:18 2011
@@ -169,7 +169,7 @@ public class TestFail {
@Test
public void testTaskFailWithUnusedContainer() throws Exception {
- MRApp app = new FailingTaskWithUnusedContainer();
+ MRApp app = new MRAppWithFailingTaskAndUnusedContainer();
Configuration conf = new Configuration();
int maxAttempts = 1;
conf.setInt(MRJobConfig.MAP_MAX_ATTEMPTS, maxAttempts);
@@ -194,21 +194,21 @@ public class TestFail {
app.waitForState(job, JobState.FAILED);
}
- static class FailingTaskWithUnusedContainer extends MRApp {
+ static class MRAppWithFailingTaskAndUnusedContainer extends MRApp {
- public FailingTaskWithUnusedContainer() {
+ public MRAppWithFailingTaskAndUnusedContainer() {
super(1, 0, false, "TaskFailWithUnsedContainer", true);
}
- protected ContainerLauncher createContainerLauncher(AppContext context,
- boolean isLocal) {
+ @Override
+ protected ContainerLauncher createContainerLauncher(AppContext context) {
return new ContainerLauncherImpl(context) {
@Override
public void handle(ContainerLauncherEvent event) {
switch (event.getType()) {
case CONTAINER_REMOTE_LAUNCH:
- super.handle(event);
+ super.handle(event); // Unused event and container.
break;
case CONTAINER_REMOTE_CLEANUP:
getContext().getEventHandler().handle(
Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestMRApp.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestMRApp.java?rev=1172206&r1=1172205&r2=1172206&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestMRApp.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestMRApp.java Sun Sep 18 07:16:18 2011
@@ -24,10 +24,10 @@ import junit.framework.Assert;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.MRJobConfig;
+import org.apache.hadoop.mapreduce.TypeConverter;
import org.apache.hadoop.mapreduce.v2.api.records.JobState;
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptState;
import org.apache.hadoop.mapreduce.v2.api.records.TaskState;
-import org.apache.hadoop.mapreduce.TypeConverter;
import org.apache.hadoop.mapreduce.v2.app.job.Job;
import org.apache.hadoop.mapreduce.v2.app.job.Task;
import org.apache.hadoop.mapreduce.v2.app.job.TaskAttempt;
@@ -195,6 +195,7 @@ public class TestMRApp {
public static void main(String[] args) throws Exception {
TestMRApp t = new TestMRApp();
t.testMapReduce();
+ t.testZeroMapReduces();
t.testCommitPending();
t.testCompletedMapsForReduceSlowstart();
t.testJobError();
Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestJobHistoryEvents.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestJobHistoryEvents.java?rev=1172206&r1=1172205&r2=1172206&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestJobHistoryEvents.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestJobHistoryEvents.java Sun Sep 18 07:16:18 2011
@@ -92,6 +92,60 @@ public class TestJobHistoryEvents {
parsedJob.getState());
}
+ /**
+ * Verify that all the events are flushed on stopping the HistoryHandler
+ * @throws Exception
+ */
+ @Test
+ public void testEventsFlushOnStop() throws Exception {
+
+ Configuration conf = new Configuration();
+ conf.set(MRJobConfig.USER_NAME, "test");
+ MRApp app = new MRAppWithSpecialHistoryHandler(1, 0, true, this
+ .getClass().getName(), true);
+ app.submit(conf);
+ Job job = app.getContext().getAllJobs().values().iterator().next();
+ JobId jobId = job.getID();
+ LOG.info("JOBID is " + TypeConverter.fromYarn(jobId).toString());
+ app.waitForState(job, JobState.SUCCEEDED);
+
+ // make sure all events are flushed
+ app.waitForState(Service.STATE.STOPPED);
+ /*
+ * Use HistoryContext to read logged events and verify the number of
+ * completed maps
+ */
+ HistoryContext context = new JobHistory();
+ ((JobHistory) context).init(conf);
+ Job parsedJob = context.getJob(jobId);
+ Assert.assertEquals("CompletedMaps not correct", 1, parsedJob
+ .getCompletedMaps());
+
+ Map<TaskId, Task> tasks = parsedJob.getTasks();
+ Assert.assertEquals("No of tasks not correct", 1, tasks.size());
+ verifyTask(tasks.values().iterator().next());
+
+ Map<TaskId, Task> maps = parsedJob.getTasks(TaskType.MAP);
+ Assert.assertEquals("No of maps not correct", 1, maps.size());
+
+ Assert.assertEquals("Job state not currect", JobState.SUCCEEDED,
+ parsedJob.getState());
+ }
+
+ @Test
+ public void testJobHistoryEventHandlerIsFirstServiceToStop() {
+ MRApp app = new MRAppWithSpecialHistoryHandler(1, 0, true, this
+ .getClass().getName(), true);
+ Configuration conf = new Configuration();
+ app.init(conf);
+ Service[] services = app.getServices().toArray(new Service[0]);
+ // Verifying that it is the last to be added is same as verifying that it is
+ // the first to be stopped. CompositeService related tests already validate
+ // this.
+ Assert.assertEquals("JobHistoryEventHandler",
+ services[services.length - 1].getName());
+ }
+
private void verifyTask(Task task) {
Assert.assertEquals("Task state not currect", TaskState.SUCCEEDED,
task.getState());
@@ -116,14 +170,43 @@ public class TestJobHistoryEvents {
@Override
protected EventHandler<JobHistoryEvent> createJobHistoryHandler(
AppContext context) {
- JobHistoryEventHandler eventHandler = new JobHistoryEventHandler(context,
- getStartCount());
+ JobHistoryEventHandler eventHandler = new JobHistoryEventHandler(
+ context, getStartCount());
return eventHandler;
}
}
-
+
+ /**
+ * MRapp with special HistoryEventHandler that writes events only during stop.
+ * This is to simulate events that don't get written by the eventHandling
+ * thread due to say a slow DFS and verify that they are flushed during stop.
+ */
+ private static class MRAppWithSpecialHistoryHandler extends MRApp {
+
+ public MRAppWithSpecialHistoryHandler(int maps, int reduces,
+ boolean autoComplete, String testName, boolean cleanOnStart) {
+ super(maps, reduces, autoComplete, testName, cleanOnStart);
+ }
+
+ @Override
+ protected EventHandler<JobHistoryEvent> createJobHistoryHandler(
+ AppContext context) {
+ return new JobHistoryEventHandler(context, getStartCount()) {
+ @Override
+ public void start() {
+ // Don't start any event draining thread.
+ super.eventHandlingThread = new Thread();
+ super.eventHandlingThread.start();
+ }
+ };
+ }
+
+ }
+
public static void main(String[] args) throws Exception {
TestJobHistoryEvents t = new TestJobHistoryEvents();
t.testHistoryEvents();
+ t.testEventsFlushOnStop();
+ t.testJobHistoryEventHandlerIsFirstServiceToStop();
}
}