You are viewing a plain text version of this content. The canonical link for it is here.
Posted to mapreduce-commits@hadoop.apache.org by ac...@apache.org on 2011/03/17 21:21:54 UTC
svn commit: r1082677 [8/38] - in /hadoop/mapreduce/branches/MR-279: ./
assembly/ ivy/ mr-client/ mr-client/hadoop-mapreduce-client-app/
mr-client/hadoop-mapreduce-client-app/src/
mr-client/hadoop-mapreduce-client-app/src/main/ mr-client/hadoop-mapreduc...
Added: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java?rev=1082677&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java (added)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java Thu Mar 17 20:21:13 2011
@@ -0,0 +1,341 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.mapreduce.v2.app;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.EnumSet;
+
+import junit.framework.Assert;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.WrappedJvmID;
+import org.apache.hadoop.mapreduce.MRJobConfig;
+import org.apache.hadoop.mapreduce.security.token.JobTokenSecretManager;
+import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitMetaInfo;
+import org.apache.hadoop.mapreduce.v2.app.AppContext;
+import org.apache.hadoop.mapreduce.v2.app.MRAppMaster;
+import org.apache.hadoop.mapreduce.v2.app.TaskAttemptListener;
+import org.apache.hadoop.mapreduce.v2.app.client.ClientService;
+import org.apache.hadoop.mapreduce.v2.app.job.Job;
+import org.apache.hadoop.mapreduce.v2.app.job.Task;
+import org.apache.hadoop.mapreduce.v2.app.job.TaskAttempt;
+import org.apache.hadoop.mapreduce.v2.app.job.event.JobEvent;
+import org.apache.hadoop.mapreduce.v2.app.job.event.JobEventType;
+import org.apache.hadoop.mapreduce.v2.app.job.event.JobFinishEvent;
+import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptContainerAssignedEvent;
+import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent;
+import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType;
+import org.apache.hadoop.mapreduce.v2.app.job.impl.JobImpl;
+import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerLauncher;
+import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerLauncherEvent;
+import org.apache.hadoop.mapreduce.v2.app.rm.ContainerAllocator;
+import org.apache.hadoop.mapreduce.v2.app.rm.ContainerAllocatorEvent;
+import org.apache.hadoop.mapreduce.v2.app.taskclean.TaskCleaner;
+import org.apache.hadoop.mapreduce.v2.app.taskclean.TaskCleanupEvent;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.state.StateMachine;
+import org.apache.hadoop.yarn.state.StateMachineFactory;
+import org.apache.hadoop.yarn.ApplicationID;
+import org.apache.hadoop.yarn.ContainerID;
+import org.apache.hadoop.mapreduce.v2.api.JobID;
+import org.apache.hadoop.mapreduce.v2.api.JobReport;
+import org.apache.hadoop.mapreduce.v2.api.JobState;
+import org.apache.hadoop.mapreduce.v2.api.TaskAttemptID;
+import org.apache.hadoop.mapreduce.v2.api.TaskAttemptReport;
+import org.apache.hadoop.mapreduce.v2.api.TaskAttemptState;
+import org.apache.hadoop.mapreduce.v2.api.TaskReport;
+import org.apache.hadoop.mapreduce.v2.api.TaskState;
+
+/**
+ * Mock MRAppMaster. Doesn't start RPC servers.
+ * No threads are started except the event Dispatcher thread.
+ *
+ * <p>Container allocation, container launch/cleanup and task cleanup are
+ * replaced by in-process stubs that immediately send the corresponding
+ * TaskAttempt events, and the job's init transition is replaced so that no
+ * split files are read.
+ */
+public class MRApp extends MRAppMaster {
+
+  /** Interval between successive state polls in the waitForState methods. */
+  private static final long WAIT_INTERVAL_MS = 500;
+  /** Maximum number of polls before waitForState gives up (~10s total). */
+  private static final int MAX_WAIT_POLLS = 20;
+
+  int maps;
+  int reduces;
+
+  //if true tasks complete automatically as soon as they are launched
+  protected boolean autoComplete = false;
+
+  /**
+   * @param maps number of map tasks the mock job is initialized with
+   * @param reduces number of reduce tasks the mock job is initialized with
+   * @param autoComplete if true, every attempt is sent TA_DONE as soon as its
+   *        container is launched
+   */
+  public MRApp(int maps, int reduces, boolean autoComplete) {
+    super(new ApplicationID());
+    this.maps = maps;
+    this.reduces = reduces;
+    this.autoComplete = autoComplete;
+  }
+
+  /**
+   * Initializes and starts this app with {@code conf} and returns the first
+   * job registered in the app context. The job user defaults to "mapred" when
+   * {@link MRJobConfig#USER_NAME} is not set.
+   */
+  public Job submit(Configuration conf) throws Exception {
+    String user = conf.get(MRJobConfig.USER_NAME, "mapred");
+    conf.set(MRJobConfig.USER_NAME, user);
+    init(conf);
+    start();
+    Job job = getContext().getAllJobs().values().iterator().next();
+    return job;
+  }
+
+  /**
+   * Polls {@code attempt} until it reports {@code finalState}, failing the
+   * test if that does not happen within MAX_WAIT_POLLS polls.
+   */
+  public void waitForState(TaskAttempt attempt,
+      TaskAttemptState finalState) throws Exception {
+    int polls = 0;
+    TaskAttemptReport report = attempt.getReport();
+    while (!finalState.equals(report.state) && polls++ < MAX_WAIT_POLLS) {
+      System.out.println("TaskAttempt State is : " + report.state +
+          " Waiting for state : " + finalState +
+          " progress : " + report.progress);
+      Thread.sleep(WAIT_INTERVAL_MS);
+      // Refresh after sleeping so the loop test and the assertion below see
+      // the current state rather than one fetched before the sleep.
+      report = attempt.getReport();
+    }
+    System.out.println("TaskAttempt State is : " + report.state);
+    Assert.assertEquals("TaskAttempt state is not correct (timedout)",
+        finalState,
+        report.state);
+  }
+
+  /**
+   * Polls {@code task} until it reports {@code finalState}, failing the test
+   * if that does not happen within MAX_WAIT_POLLS polls.
+   */
+  public void waitForState(Task task, TaskState finalState) throws Exception {
+    int polls = 0;
+    TaskReport report = task.getReport();
+    while (!finalState.equals(report.state) && polls++ < MAX_WAIT_POLLS) {
+      System.out.println("Task State is : " + report.state +
+          " Waiting for state : " + finalState +
+          " progress : " + report.progress);
+      Thread.sleep(WAIT_INTERVAL_MS);
+      report = task.getReport();
+    }
+    System.out.println("Task State is : " + report.state);
+    Assert.assertEquals("Task state is not correct (timedout)", finalState,
+        report.state);
+  }
+
+  /**
+   * Polls {@code job} until it reports {@code finalState}, failing the test
+   * if that does not happen within MAX_WAIT_POLLS polls.
+   */
+  public void waitForState(Job job, JobState finalState) throws Exception {
+    int polls = 0;
+    JobReport report = job.getReport();
+    while (!finalState.equals(report.state) && polls++ < MAX_WAIT_POLLS) {
+      System.out.println("Job State is : " + report.state +
+          " Waiting for state : " + finalState +
+          " map progress : " + report.mapProgress +
+          " reduce progress : " + report.reduceProgress);
+      Thread.sleep(WAIT_INTERVAL_MS);
+      report = job.getReport();
+    }
+    System.out.println("Job State is : " + report.state);
+    // Assert on the same report the loop tested, so a loop exit on success
+    // cannot be contradicted by a later state read.
+    Assert.assertEquals("Job state is not correct (timedout)", finalState,
+        report.state);
+  }
+
+  /**
+   * Asserts that every job, task and attempt in the context has a start time
+   * strictly before its finish time, and that no job finished in the future.
+   */
+  public void verifyCompleted() {
+    for (Job job : getContext().getAllJobs().values()) {
+      JobReport jobReport = job.getReport();
+      Assert.assertTrue("Job start time is not less than finish time",
+          jobReport.startTime < jobReport.finishTime);
+      // '<=' because a job may legitimately finish in the current millisecond.
+      Assert.assertTrue("Job finish time is in future",
+          jobReport.finishTime <= System.currentTimeMillis());
+      for (Task task : job.getTasks().values()) {
+        TaskReport taskReport = task.getReport();
+        Assert.assertTrue("Task start time is not less than finish time",
+            taskReport.startTime < taskReport.finishTime);
+        for (TaskAttempt attempt : task.getAttempts().values()) {
+          TaskAttemptReport attemptReport = attempt.getReport();
+          Assert.assertTrue("Attempt start time is not less than finish time",
+              attemptReport.startTime < attemptReport.finishTime);
+        }
+      }
+    }
+  }
+
+  /**
+   * Creates a single TestJob, registers it in the context, registers the job
+   * history handler and a JobFinishEvent handler that stops this app, then
+   * sends JOB_INIT to start the job.
+   */
+  protected void startJobs() {
+    Job job = new TestJob(getAppID(), getDispatcher().getEventHandler(),
+        getTaskAttemptListener());
+    ((AppContext) getContext()).getAllJobs().put(job.getID(), job);
+
+    getDispatcher().register(org.apache.hadoop.mapreduce.jobhistory.EventType.class,
+        createJobHistoryHandler(getConfig()));
+    getDispatcher().register(JobFinishEvent.Type.class,
+        new EventHandler<JobFinishEvent>() {
+          @Override
+          public void handle(JobFinishEvent event) {
+            stop();
+          }
+        });
+
+    /** create a job event for job initialization **/
+    JobEvent initJobEvent = new JobEvent(job.getID(), JobEventType.JOB_INIT);
+    /** send init on the job. this triggers the job execution.**/
+    getDispatcher().getEventHandler().handle(initJobEvent);
+  }
+
+  /** Returns a listener that has no address and ignores (un)registration. */
+  @Override
+  protected TaskAttemptListener createTaskAttemptListener(AppContext context) {
+    return new TaskAttemptListener(){
+      @Override
+      public InetSocketAddress getAddress() {
+        return null;
+      }
+      @Override
+      public void register(TaskAttemptID attemptID,
+          org.apache.hadoop.mapred.Task task, WrappedJvmID jvmID) {}
+      @Override
+      public void unregister(TaskAttemptID attemptID, WrappedJvmID jvmID) {
+      }
+    };
+  }
+
+  @Override
+  protected ContainerLauncher createContainerLauncher(AppContext context) {
+    return new MockContainerLauncher();
+  }
+
+  /**
+   * Launcher that "launches" and "cleans" containers instantly by sending
+   * TA_CONTAINER_LAUNCHED / TA_CONTAINER_CLEANED for the event's attempt.
+   */
+  class MockContainerLauncher implements ContainerLauncher {
+    @Override
+    public void handle(ContainerLauncherEvent event) {
+      switch (event.getType()) {
+      case CONTAINER_REMOTE_LAUNCH:
+        getContext().getEventHandler().handle(
+            new TaskAttemptEvent(event.getTaskAttemptID(),
+                TaskAttemptEventType.TA_CONTAINER_LAUNCHED));
+
+        attemptLaunched(event.getTaskAttemptID());
+        break;
+      case CONTAINER_REMOTE_CLEANUP:
+        getContext().getEventHandler().handle(
+            new TaskAttemptEvent(event.getTaskAttemptID(),
+                TaskAttemptEventType.TA_CONTAINER_CLEANED));
+        break;
+      }
+    }
+  }
+
+  /**
+   * Hook called by the mock launcher after an attempt's container launch.
+   * Sends TA_DONE when {@code autoComplete} is set; subclasses may override
+   * to observe launches.
+   */
+  protected void attemptLaunched(TaskAttemptID attemptID) {
+    if (autoComplete) {
+      // send the done event
+      getContext().getEventHandler().handle(
+          new TaskAttemptEvent(attemptID,
+              TaskAttemptEventType.TA_DONE));
+    }
+  }
+
+  /**
+   * Returns an allocator that immediately assigns each requesting attempt a
+   * container with a sequentially numbered id on host "dummy".
+   */
+  @Override
+  protected ContainerAllocator createContainerAllocator(
+      ClientService clientService, AppContext context) {
+    return new ContainerAllocator(){
+      private int containerCount;
+      @Override
+      public void handle(ContainerAllocatorEvent event) {
+        ContainerID cId = new ContainerID();
+        cId.appID = getContext().getApplicationID();
+        cId.id = containerCount++;
+        getContext().getEventHandler().handle(
+            new TaskAttemptContainerAssignedEvent(event.getAttemptID(), cId,
+                "dummy", null));
+      }
+    };
+  }
+
+  /** Returns a cleaner that immediately reports TA_CLEANUP_DONE. */
+  @Override
+  protected TaskCleaner createTaskCleaner(AppContext context) {
+    return new TaskCleaner() {
+      @Override
+      public void handle(TaskCleanupEvent event) {
+        //send the cleanup done event
+        getContext().getEventHandler().handle(
+            new TaskAttemptEvent(event.getAttemptID(),
+                TaskAttemptEventType.TA_CLEANUP_DONE));
+      }
+    };
+  }
+
+  /** Returns a client service with no bind address and HTTP port -1. */
+  @Override
+  protected ClientService createClientService(AppContext context) {
+    return new ClientService(){
+      @Override
+      public InetSocketAddress getBindAddress() {
+        return null;
+      }
+
+      @Override
+      public int getHttpPort() {
+        return -1;
+      }
+    };
+  }
+
+  /** JobImpl whose NEW --JOB_INIT--> transition uses TestInitTransition. */
+  class TestJob extends JobImpl {
+    // Factory derived from JobImpl's stateMachineFactory with the init
+    // transition overridden.
+    StateMachineFactory<JobImpl, JobState, JobEventType, JobEvent> localFactory
+        = stateMachineFactory.addTransition
+            (JobState.NEW,
+             EnumSet.of(JobState.RUNNING, JobState.FAILED),
+             JobEventType.JOB_INIT,
+             // This is abusive.
+             new TestInitTransition(getConfig(), maps, reduces));
+
+    private final StateMachine<JobState, JobEventType, JobEvent>
+        localStateMachine;
+
+    @Override
+    protected StateMachine<JobState, JobEventType, JobEvent> getStateMachine() {
+      return localStateMachine;
+    }
+
+    public TestJob(ApplicationID appID, EventHandler eventHandler,
+        TaskAttemptListener taskAttemptListener) {
+      super(appID, new Configuration(), eventHandler, taskAttemptListener,
+          new JobTokenSecretManager(), new Credentials());
+
+      // This "this leak" is okay because the retained pointer is in an
+      // instance variable.
+      localStateMachine = localFactory.make(this);
+    }
+
+  }
+
+  //Override InitTransition to not look for split files etc
+  static class TestInitTransition extends JobImpl.InitTransition {
+    private final Configuration config;
+    private final int maps;
+    private final int reduces;
+    TestInitTransition(Configuration config, int maps, int reduces) {
+      this.config = config;
+      this.maps = maps;
+      this.reduces = reduces;
+    }
+
+    /**
+     * Points the job at the supplied configuration (setting the reduce count
+     * on it) and a placeholder remote conf path "test".
+     * NOTE(review): this mutates the shared config object passed in.
+     */
+    @Override
+    protected void setup(JobImpl job) throws IOException {
+      job.conf = config;
+      job.conf.setInt(MRJobConfig.NUM_REDUCES, reduces);
+      job.remoteJobConfFile = new Path("test");
+    }
+
+    /** Returns {@code maps} empty split descriptors, one per map task. */
+    @Override
+    protected TaskSplitMetaInfo[] createSplits(JobImpl job, JobID jobId) {
+      TaskSplitMetaInfo[] splits = new TaskSplitMetaInfo[maps];
+      for (int i = 0; i < maps ; i++) {
+        splits[i] = new TaskSplitMetaInfo();
+      }
+      return splits;
+    }
+  }
+
+}
+
Added: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRAppBenchmark.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRAppBenchmark.java?rev=1082677&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRAppBenchmark.java (added)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRAppBenchmark.java Thu Mar 17 20:21:13 2011
@@ -0,0 +1,177 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.mapreduce.v2.app;
+
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.v2.api.JobState;
+import org.apache.hadoop.mapreduce.v2.api.TaskAttemptID;
+import org.apache.hadoop.mapreduce.v2.app.AppContext;
+import org.apache.hadoop.mapreduce.v2.app.client.ClientService;
+import org.apache.hadoop.mapreduce.v2.app.job.Job;
+import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptContainerAssignedEvent;
+import org.apache.hadoop.mapreduce.v2.app.rm.ContainerAllocator;
+import org.apache.hadoop.mapreduce.v2.app.rm.ContainerAllocatorEvent;
+import org.apache.hadoop.yarn.ContainerID;
+import org.apache.hadoop.yarn.YarnException;
+import org.apache.hadoop.yarn.service.AbstractService;
+import org.apache.log4j.Level;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+public class MRAppBenchmark {
+
+  /**
+   * Runs memory and time benchmark with Mock MRApp. Submits a job and prints
+   * statistics every two seconds until the job succeeds or fails.
+   */
+  public void run(MRApp app) throws Exception {
+    Logger rootLogger = LogManager.getRootLogger();
+    rootLogger.setLevel(Level.WARN);
+    long startTime = System.currentTimeMillis();
+    Job job = app.submit(new Configuration());
+    while (!isTerminal(job.getReport().state)) {
+      printStat(job, startTime);
+      Thread.sleep(2000);
+    }
+    printStat(job, startTime);
+  }
+
+  // A failed job never reaches SUCCEEDED, so waiting only for SUCCEEDED
+  // would spin forever.
+  private static boolean isTerminal(JobState state) {
+    return state == JobState.SUCCEEDED || state == JobState.FAILED;
+  }
+
+  /** Forces a GC, then prints job progress, used heap and elapsed time. */
+  private void printStat(Job job, long startTime) throws Exception {
+    long currentTime = System.currentTimeMillis();
+    Runtime.getRuntime().gc();
+    long mem = Runtime.getRuntime().totalMemory()
+        - Runtime.getRuntime().freeMemory();
+    System.out.println("JobState:" + job.getState() +
+        " CompletedMaps:" + job.getCompletedMaps() +
+        " CompletedReduces:" + job.getCompletedReduces() +
+        " Memory(total-free)(KB):" + mem/1024 +
+        " ElapsedTime(ms):" + (currentTime - startTime));
+  }
+
+  //Throttles the maximum number of concurrent running tasks.
+  //This affects the memory requirement since
+  //org.apache.hadoop.mapred.MapTask/ReduceTask is loaded in memory for all
+  //running task and discarded once the task is launched.
+  static class ThrottledMRApp extends MRApp {
+
+    final int maxConcurrentRunningTasks;
+    // Incremented on the allocator thread below and decremented from
+    // attemptLaunched, which is invoked on a different thread (presumably the
+    // event dispatcher). ++/-- on a volatile int would lose updates, so the
+    // counter must be atomic.
+    final AtomicInteger concurrentRunningTasks = new AtomicInteger();
+
+    ThrottledMRApp(int maps, int reduces, int maxConcurrentRunningTasks) {
+      super(maps, reduces, true);
+      this.maxConcurrentRunningTasks = maxConcurrentRunningTasks;
+    }
+
+    @Override
+    protected void attemptLaunched(TaskAttemptID attemptID) {
+      super.attemptLaunched(attemptID);
+      //the task is launched and sends done immediately
+      concurrentRunningTasks.decrementAndGet();
+    }
+
+    @Override
+    protected ContainerAllocator createContainerAllocator(
+        ClientService clientService, AppContext context) {
+      return new ThrottledContainerAllocator();
+    }
+
+    /**
+     * Queues allocation requests and serves them from a background thread,
+     * assigning a container only while fewer than maxConcurrentRunningTasks
+     * are running; otherwise it re-checks once per second.
+     */
+    class ThrottledContainerAllocator extends AbstractService
+        implements ContainerAllocator {
+      private int containerCount;
+      private Thread thread;
+      private final BlockingQueue<ContainerAllocatorEvent> eventQueue =
+          new LinkedBlockingQueue<ContainerAllocatorEvent>();
+
+      public ThrottledContainerAllocator() {
+        super("ThrottledContainerAllocator");
+      }
+
+      @Override
+      public void handle(ContainerAllocatorEvent event) {
+        try {
+          eventQueue.put(event);
+        } catch (InterruptedException e) {
+          // Restore the interrupt status before surfacing the failure.
+          Thread.currentThread().interrupt();
+          throw new YarnException(e);
+        }
+      }
+
+      @Override
+      public void start() {
+        thread = new Thread(new Runnable() {
+          @Override
+          public void run() {
+            while (!Thread.currentThread().isInterrupted()) {
+              try {
+                if (concurrentRunningTasks.get() < maxConcurrentRunningTasks) {
+                  ContainerAllocatorEvent event = eventQueue.take();
+                  ContainerID cId = new ContainerID();
+                  cId.appID = getContext().getApplicationID();
+                  cId.id = containerCount++;
+                  // Reserve the slot before handing off, so a fast launch
+                  // cannot decrement before this increment is visible.
+                  concurrentRunningTasks.incrementAndGet();
+                  getContext().getEventHandler().handle(
+                      new TaskAttemptContainerAssignedEvent(event
+                          .getAttemptID(), cId, "dummy", null));
+                } else {
+                  Thread.sleep(1000);
+                }
+              } catch (InterruptedException e) {
+                System.out.println("Returning, interrupted");
+                return;
+              }
+            }
+          }
+        });
+        thread.start();
+        super.start();
+      }
+
+      @Override
+      public void stop() {
+        // start() may never have run (e.g. if initialization failed).
+        if (thread != null) {
+          thread.interrupt();
+        }
+        super.stop();
+      }
+    }
+  }
+
+  /** Benchmark: 900 maps, 100 reduces, no throttling. */
+  public void benchmark1() throws Exception {
+    int maps = 900;
+    int reduces = 100;
+    System.out.println("Running benchmark with maps:"+maps +
+        " reduces:"+reduces);
+    run(new MRApp(maps, reduces, true));
+  }
+
+  /** Benchmark: 4000 maps, 1000 reduces, at most 500 concurrent tasks. */
+  public void benchmark2() throws Exception {
+    int maps = 4000;
+    int reduces = 1000;
+    int maxConcurrentRunningTasks = 500;
+
+    System.out.println("Running benchmark with throttled running tasks with " +
+        "maxConcurrentRunningTasks:" + maxConcurrentRunningTasks +
+        " maps:" + maps + " reduces:" + reduces);
+    run(new ThrottledMRApp(maps, reduces, maxConcurrentRunningTasks));
+  }
+
+  public static void main(String[] args) throws Exception {
+    MRAppBenchmark benchmark = new MRAppBenchmark();
+    benchmark.benchmark1();
+    benchmark.benchmark2();
+  }
+
+}
Added: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MockJobs.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MockJobs.java?rev=1082677&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MockJobs.java (added)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MockJobs.java Thu Mar 17 20:21:13 2011
@@ -0,0 +1,438 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.mapreduce.v2.app;
+
+import com.google.common.collect.Iterators;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import org.apache.hadoop.mapreduce.FileSystemCounter;
+import org.apache.hadoop.mapreduce.JobCounter;
+import org.apache.hadoop.mapreduce.TaskCounter;
+import org.apache.hadoop.mapreduce.v2.app.job.Job;
+import org.apache.hadoop.mapreduce.v2.app.job.Task;
+import org.apache.hadoop.mapreduce.v2.app.job.TaskAttempt;
+import org.apache.hadoop.mapreduce.v2.app.job.impl.JobImpl;
+import org.apache.hadoop.mapreduce.v2.lib.TypeConverter;
+import org.apache.hadoop.yarn.MockApps;
+import org.apache.hadoop.yarn.ApplicationID;
+import org.apache.hadoop.yarn.ContainerID;
+import org.apache.hadoop.mapreduce.v2.api.Counters;
+import org.apache.hadoop.mapreduce.v2.api.JobID;
+import org.apache.hadoop.mapreduce.v2.api.JobReport;
+import org.apache.hadoop.mapreduce.v2.api.JobState;
+import org.apache.hadoop.mapreduce.v2.api.Phase;
+import org.apache.hadoop.mapreduce.v2.api.TaskAttemptReport;
+import org.apache.hadoop.mapreduce.v2.api.TaskAttemptCompletionEvent;
+import org.apache.hadoop.mapreduce.v2.api.TaskID;
+import org.apache.hadoop.mapreduce.v2.api.TaskState;
+import org.apache.hadoop.mapreduce.v2.api.TaskAttemptID;
+import org.apache.hadoop.mapreduce.v2.api.TaskAttemptState;
+import org.apache.hadoop.mapreduce.v2.api.TaskReport;
+import org.apache.hadoop.mapreduce.v2.api.TaskType;
+
+/**
+ * Factory for randomly populated mock {@link Job}, {@link Task} and
+ * {@link TaskAttempt} instances. Enum-valued fields (states, task types,
+ * phases) and diagnostics are drawn round-robin from the static cycling
+ * iterators below, so successive mocks get different values.
+ * NOTE(review): those iterators are shared and unsynchronized; use this class
+ * from a single thread.
+ */
+public class MockJobs extends MockApps {
+  static final Iterator<JobState> JOB_STATES = Iterators.cycle(
+      JobState.values());
+  static final Iterator<TaskState> TASK_STATES = Iterators.cycle(
+      TaskState.values());
+  static final Iterator<TaskAttemptState> TASK_ATTEMPT_STATES =
+      Iterators.cycle(TaskAttemptState.values());
+  static final Iterator<TaskType> TASK_TYPES = Iterators.cycle(
+      TaskType.values());
+  static final Iterator<JobCounter> JOB_COUNTERS = Iterators.cycle(
+      JobCounter.values());
+  static final Iterator<FileSystemCounter> FS_COUNTERS = Iterators.cycle(
+      FileSystemCounter.values());
+  static final Iterator<TaskCounter> TASK_COUNTERS = Iterators.cycle(
+      TaskCounter.values());
+  /** File system schemes for which per-scheme counters are generated. */
+  private static final String[] FS_SCHEME_NAMES =
+      {"FILE", "HDFS", "LAFS", "CEPH"};
+  static final Iterator<String> FS_SCHEMES = Iterators.cycle(FS_SCHEME_NAMES);
+  static final Iterator<String> USER_COUNTER_GROUPS = Iterators.cycle(
+      "com.company.project.subproject.component.subcomponent.UserDefinedSpecificSpecialTask$Counters",
+      "PigCounters");
+  static final Iterator<String> USER_COUNTERS = Iterators.cycle(
+      "counter1", "counter2", "counter3");
+  static final Iterator<Phase> PHASES = Iterators.cycle(Phase.values());
+  static final Iterator<String> DIAGS = Iterators.cycle(
+      "Error: java.lang.OutOfMemoryError: Java heap space",
+      "Lost task tracker: tasktracker.domain/127.0.0.1:40879");
+
+  /** Returns a new job name (delegates to MockApps.newAppName()). */
+  public static String newJobName() {
+    return newAppName();
+  }
+
+  /**
+   * Creates {@code numJobsPerApp} mock jobs for {@code appID}, keyed by job
+   * id, each with {@code numTasksPerJob} tasks of
+   * {@code numAttemptsPerTask} attempts.
+   */
+  public static Map<JobID, Job> newJobs(ApplicationID appID, int numJobsPerApp,
+                                        int numTasksPerJob,
+                                        int numAttemptsPerTask) {
+    Map<JobID, Job> map = Maps.newHashMap();
+    for (int j = 0; j < numJobsPerApp; ++j) {
+      Job job = newJob(appID, j, numTasksPerJob, numAttemptsPerTask);
+      map.put(job.getID(), job);
+    }
+    return map;
+  }
+
+  /** Returns a JobID with the given application id and sequence number. */
+  public static JobID newJobID(ApplicationID appID, int i) {
+    JobID id = new JobID();
+    id.appID = appID;
+    id.id = i;
+    return id;
+  }
+
+  /**
+   * Returns a report with a start time up to ~1000s in the past, a finish
+   * time 1ms to ~1000s in the future, random progress and the next job state.
+   */
+  public static JobReport newJobReport(JobID id) {
+    JobReport report = new JobReport();
+    report.id = id;
+    report.startTime = System.currentTimeMillis()
+        - (int)(Math.random() * 1000000);
+    report.finishTime = System.currentTimeMillis()
+        + (int)(Math.random() * 1000000) + 1;
+    report.mapProgress = (float)Math.random();
+    report.reduceProgress = (float)Math.random();
+    report.state = JOB_STATES.next();
+    return report;
+  }
+
+  /** Like newJobReport, for a task: random times, progress and counters. */
+  public static TaskReport newTaskReport(TaskID id) {
+    TaskReport report = new TaskReport();
+    report.id = id;
+    report.startTime = System.currentTimeMillis()
+        - (int)(Math.random() * 1000000);
+    report.finishTime = System.currentTimeMillis()
+        + (int)(Math.random() * 1000000) + 1;
+    report.progress = (float)Math.random();
+    report.counters = newCounters();
+    report.state = TASK_STATES.next();
+    return report;
+  }
+
+  /** Like newTaskReport, for an attempt, additionally assigning a phase. */
+  public static TaskAttemptReport newTaskAttemptReport(TaskAttemptID id) {
+    TaskAttemptReport report = new TaskAttemptReport();
+    report.id = id;
+    report.startTime = System.currentTimeMillis()
+        - (int)(Math.random() * 1000000);
+    report.finishTime = System.currentTimeMillis()
+        + (int)(Math.random() * 1000000) + 1;
+    report.phase = PHASES.next();
+    report.state = TASK_ATTEMPT_STATES.next();
+    report.progress = (float)Math.random();
+    report.counters = newCounters();
+    return report;
+  }
+
+  /**
+   * Builds a random mapred Counters populated with every job and task
+   * counter, every (file system scheme, FileSystemCounter) pair and six user
+   * counters, and converts it with TypeConverter.toYarn.
+   */
+  @SuppressWarnings("deprecation")
+  public static Counters newCounters() {
+    org.apache.hadoop.mapred.Counters hc =
+        new org.apache.hadoop.mapred.Counters();
+    for (JobCounter c : JobCounter.values()) {
+      hc.findCounter(c).setValue((long)(Math.random() * 1000));
+    }
+    for (TaskCounter c : TaskCounter.values()) {
+      hc.findCounter(c).setValue((long)(Math.random() * 1000));
+    }
+    // Iterate schemes explicitly: drawing a scheme round-robin per counter
+    // pairs each counter with only a subset of schemes whenever the number
+    // of counters shares a factor with the number of schemes.
+    for (String scheme : FS_SCHEME_NAMES) {
+      for (FileSystemCounter c : FileSystemCounter.values()) {
+        hc.findCounter(scheme, c).setValue((long)(Math.random() * 1000000));
+      }
+    }
+    // 2 groups x 3 names; 2 and 3 are coprime, so 6 round-robin draws cover
+    // every (group, name) pair.
+    for (int i = 0; i < 2 * 3; ++i) {
+      hc.findCounter(USER_COUNTER_GROUPS.next(), USER_COUNTERS.next()).
+          setValue((long)(Math.random() * 100000));
+    }
+    return TypeConverter.toYarn(hc);
+  }
+
+  /** Creates {@code m} mock attempts of task {@code tid}, keyed by id. */
+  public static Map<TaskAttemptID, TaskAttempt> newTaskAttempts(TaskID tid,
+                                                                int m) {
+    Map<TaskAttemptID, TaskAttempt> map = Maps.newHashMap();
+    for (int i = 0; i < m; ++i) {
+      TaskAttempt ta = newTaskAttempt(tid, i);
+      map.put(ta.getID(), ta);
+    }
+    return map;
+  }
+
+  /**
+   * Creates mock attempt {@code i} of task {@code tid}, backed by a random
+   * report and a single diagnostic message.
+   */
+  public static TaskAttempt newTaskAttempt(TaskID tid, int i) {
+    final TaskAttemptID taid = new TaskAttemptID();
+    taid.taskID = tid;
+    taid.id = i;
+    final TaskAttemptReport report = newTaskAttemptReport(taid);
+    final List<CharSequence> diags = Lists.newArrayList();
+    diags.add(DIAGS.next());
+    return new TaskAttempt() {
+      @Override
+      public TaskAttemptID getID() {
+        return taid;
+      }
+
+      @Override
+      public TaskAttemptReport getReport() {
+        return report;
+      }
+
+      // Launch/finish times are not tracked by this mock.
+      @Override
+      public long getLaunchTime() {
+        return 0;
+      }
+
+      @Override
+      public long getFinishTime() {
+        return 0;
+      }
+
+      @Override
+      public Counters getCounters() {
+        return report.counters;
+      }
+
+      @Override
+      public float getProgress() {
+        return report.progress;
+      }
+
+      @Override
+      public TaskAttemptState getState() {
+        return report.state;
+      }
+
+      @Override
+      public boolean isFinished() {
+        switch (report.state) {
+          case SUCCEEDED:
+          case FAILED:
+          case KILLED: return true;
+        }
+        return false;
+      }
+
+      @Override
+      public ContainerID getAssignedContainerID() {
+        ContainerID id = new ContainerID();
+        id.appID = taid.taskID.jobID.appID;
+        return id;
+      }
+
+      @Override
+      public String getAssignedContainerMgrAddress() {
+        return "localhost";
+      }
+
+      @Override
+      public List<CharSequence> getDiagnostics() {
+        return diags;
+      }
+    };
+  }
+
+  /** Creates {@code n} mock tasks of job {@code jid}, each with m attempts. */
+  public static Map<TaskID, Task> newTasks(JobID jid, int n, int m) {
+    Map<TaskID, Task> map = Maps.newHashMap();
+    for (int i = 0; i < n; ++i) {
+      Task task = newTask(jid, i, m);
+      map.put(task.getID(), task);
+    }
+    return map;
+  }
+
+  /**
+   * Creates mock task {@code i} of job {@code jid} with the next task type,
+   * a random report and {@code m} attempts. canCommit always returns false.
+   */
+  public static Task newTask(JobID jid, int i, int m) {
+    final TaskID tid = new TaskID();
+    tid.jobID = jid;
+    tid.id = i;
+    tid.taskType = TASK_TYPES.next();
+    final TaskReport report = newTaskReport(tid);
+    final Map<TaskAttemptID, TaskAttempt> attempts = newTaskAttempts(tid, m);
+    return new Task() {
+      @Override
+      public TaskID getID() {
+        return tid;
+      }
+
+      @Override
+      public TaskReport getReport() {
+        return report;
+      }
+
+      @Override
+      public Counters getCounters() {
+        return report.counters;
+      }
+
+      @Override
+      public float getProgress() {
+        return report.progress;
+      }
+
+      @Override
+      public TaskType getType() {
+        return tid.taskType;
+      }
+
+      @Override
+      public Map<TaskAttemptID, TaskAttempt> getAttempts() {
+        return attempts;
+      }
+
+      @Override
+      public TaskAttempt getAttempt(TaskAttemptID attemptID) {
+        return attempts.get(attemptID);
+      }
+
+      @Override
+      public boolean isFinished() {
+        switch (report.state) {
+          case SUCCEEDED:
+          case KILLED:
+          case FAILED: return true;
+        }
+        return false;
+      }
+
+      @Override
+      public boolean canCommit(TaskAttemptID taskAttemptID) {
+        return false;
+      }
+
+      @Override
+      public TaskState getState() {
+        return report.state;
+      }
+    };
+  }
+
+  /** Aggregates the tasks' counters into fresh JobImpl counters. */
+  public static Counters getCounters(Collection<Task> tasks) {
+    Counters counters = JobImpl.newCounters();
+    return JobImpl.incrTaskCounters(counters, tasks);
+  }
+
+  /** Per-type totals and completed counts of a task collection. */
+  static class TaskCount {
+    int maps;
+    int reduces;
+    int completedMaps;
+    int completedReduces;
+
+    void incr(Task task) {
+      TaskType type = task.getType();
+      boolean finished = task.isFinished();
+      if (type == TaskType.MAP) {
+        if (finished) {
+          ++completedMaps;
+        }
+        ++maps;
+      } else if (type == TaskType.REDUCE) {
+        if (finished) {
+          ++completedReduces;
+        }
+        ++reduces;
+      }
+    }
+  }
+
+  static TaskCount getTaskCount(Collection<Task> tasks) {
+    TaskCount tc = new TaskCount();
+    for (Task task : tasks) {
+      tc.incr(task);
+    }
+    return tc;
+  }
+
+  /**
+   * Creates mock job {@code i} of {@code appID} with {@code n} tasks of
+   * {@code m} attempts each. Task counts and counters are computed once at
+   * creation time.
+   */
+  public static Job newJob(ApplicationID appID, int i, int n, int m) {
+    final JobID id = newJobID(appID, i);
+    final String name = newJobName();
+    final JobReport report = newJobReport(id);
+    final Map<TaskID, Task> tasks = newTasks(id, n, m);
+    final TaskCount taskCount = getTaskCount(tasks.values());
+    final Counters counters = getCounters(tasks.values());
+    return new Job() {
+      @Override
+      public JobID getID() {
+        return id;
+      }
+
+      @Override
+      public CharSequence getName() {
+        return name;
+      }
+
+      @Override
+      public JobState getState() {
+        return report.state;
+      }
+
+      @Override
+      public JobReport getReport() {
+        return report;
+      }
+
+      @Override
+      public Counters getCounters() {
+        return counters;
+      }
+
+      @Override
+      public Map<TaskID, Task> getTasks() {
+        return tasks;
+      }
+
+      @Override
+      public Task getTask(TaskID taskID) {
+        return tasks.get(taskID);
+      }
+
+      @Override
+      public int getTotalMaps() {
+        return taskCount.maps;
+      }
+
+      @Override
+      public int getTotalReduces() {
+        return taskCount.reduces;
+      }
+
+      @Override
+      public int getCompletedMaps() {
+        return taskCount.completedMaps;
+      }
+
+      @Override
+      public int getCompletedReduces() {
+        return taskCount.completedReduces;
+      }
+
+      @Override
+      public TaskAttemptCompletionEvent[] getTaskAttemptCompletionEvents(
+          int fromEventId, int maxEvents) {
+        // Mock jobs record no completion events; return an empty array rather
+        // than null so callers can iterate the result safely.
+        return new TaskAttemptCompletionEvent[0];
+      }
+
+      @Override
+      public Map<TaskID, Task> getTasks(TaskType taskType) {
+        Map<TaskID, Task> result = Maps.newHashMap();
+        for (Map.Entry<TaskID, Task> entry : tasks.entrySet()) {
+          if (entry.getValue().getType() == taskType) {
+            result.put(entry.getKey(), entry.getValue());
+          }
+        }
+        return result;
+      }
+
+      @Override
+      public List<String> getDiagnostics() {
+        // Mock jobs carry no job-level diagnostics.
+        return Lists.<String>newArrayList();
+      }
+    };
+  }
+}
Added: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/SpeculationSleepJob.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/SpeculationSleepJob.java?rev=1082677&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/SpeculationSleepJob.java (added)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/SpeculationSleepJob.java Thu Mar 17 20:21:13 2011
@@ -0,0 +1,90 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.mapreduce.v2.app;
+
+import java.io.IOException;
+
+import org.apache.hadoop.SleepJob;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Mapper;
+
+public class SpeculationSleepJob extends SleepJob {
+ public Job createJob(int numMapper, int numReducer,
+ long mapSleepTime, int mapSleepCount,
+ long reduceSleepTime, int reduceSleepCount)
+ throws IOException {
+ Job result = super.createJob
+ (numMapper, numReducer, mapSleepTime,
+ mapSleepCount, reduceSleepTime, reduceSleepCount);
+
+ result.setMapperClass(SpeculationSleepMapper.class);
+
+ return result;
+ }
+
+ // This is a new class rather than a subclass of SleepJob.SleepMapper
+ // because SleepMapper has private [rather than protected] fields
+ class SpeculationSleepMapper
+ extends Mapper<IntWritable, IntWritable, IntWritable, NullWritable> {
+ private long mapSleepDuration = 100;
+ private int mapSleepCount = 1;
+ private int count = 0;
+
+ private final String TASK_ATTEMPT_ID = "mapreduce.task.attempt.id";
+
+ @Override
+ protected void setup(Context context)
+ throws IOException, InterruptedException {
+ Configuration conf = context.getConfiguration();
+ this.mapSleepCount =
+ conf.getInt(MAP_SLEEP_COUNT, mapSleepCount);
+ this.mapSleepDuration =
+ conf.getLong(MAP_SLEEP_TIME , 100) / mapSleepCount;
+ final String attemptID = conf.get(TASK_ATTEMPT_ID);
+ if (attemptID.endsWith("0000_0")) {
+ mapSleepDuration *= 20L;
+ }
+ }
+
+ // we need to do this instead of
+ @Override
+ public void map(IntWritable key, IntWritable value, Context context
+ ) throws IOException, InterruptedException {
+ //it is expected that every map processes mapSleepCount number of records.
+ try {
+ context.setStatus("Sleeping... (" +
+ (mapSleepDuration * (mapSleepCount - count)) + ") ms left");
+ Thread.sleep(mapSleepDuration);
+ }
+ catch (InterruptedException ex) {
+ throw (IOException)new IOException("Interrupted while sleeping", ex);
+ }
+ ++count;
+ // output reduceSleepCount * numReduce number of random values, so that
+ // each reducer will get reduceSleepCount number of keys.
+ int k = key.get();
+ for (int i = 0; i < value.get(); ++i) {
+ context.write(new IntWritable(k + i), NullWritable.get());
+ }
+ }
+ }
+}
Added: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestFail.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestFail.java?rev=1082677&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestFail.java (added)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestFail.java Thu Mar 17 20:21:13 2011
@@ -0,0 +1,235 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.mapreduce.v2.app;
+
+import java.util.Iterator;
+import java.util.Map;
+
+import junit.framework.Assert;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapred.TaskAttemptListenerImpl;
+import org.apache.hadoop.mapreduce.MRJobConfig;
+import org.apache.hadoop.mapreduce.v2.app.AppContext;
+import org.apache.hadoop.mapreduce.v2.app.TaskAttemptListener;
+import org.apache.hadoop.mapreduce.v2.app.job.Job;
+import org.apache.hadoop.mapreduce.v2.app.job.Task;
+import org.apache.hadoop.mapreduce.v2.app.job.TaskAttempt;
+import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent;
+import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType;
+import org.apache.hadoop.mapreduce.v2.api.JobState;
+import org.apache.hadoop.mapreduce.v2.api.TaskAttemptID;
+import org.apache.hadoop.mapreduce.v2.api.TaskAttemptState;
+import org.apache.hadoop.mapreduce.v2.api.TaskID;
+import org.apache.hadoop.mapreduce.v2.api.TaskState;
+import org.junit.Test;
+
+/**
+ * Tests the state machine with respect to Job/Task/TaskAttempt failure
+ * scenarios.
+ */
+public class TestFail {
+
+ @Test
+ //First attempt is failed and second attempt is passed
+ //The job succeeds.
+ public void testFailTask() throws Exception {
+ MRApp app = new MockFirstFailingAttemptMRApp(1, 0);
+ Job job = app.submit(new Configuration());
+ app.waitForState(job, JobState.SUCCEEDED);
+ Map<TaskID,Task> tasks = job.getTasks();
+ Assert.assertEquals("No of tasks is not correct", 1,
+ tasks.size());
+ Task task = tasks.values().iterator().next();
+ Assert.assertEquals("Task state not correct", TaskState.SUCCEEDED,
+ task.getReport().state);
+ Map<TaskAttemptID, TaskAttempt> attempts =
+ tasks.values().iterator().next().getAttempts();
+ Assert.assertEquals("No of attempts is not correct", 2,
+ attempts.size());
+ //one attempt must be failed
+ //and another must have succeeded
+ Iterator<TaskAttempt> it = attempts.values().iterator();
+ Assert.assertEquals("Attempt state not correct", TaskAttemptState.FAILED,
+ it.next().getReport().state);
+ Assert.assertEquals("Attempt state not correct", TaskAttemptState.SUCCEEDED,
+ it.next().getReport().state);
+ }
+
+ @Test
+ public void testMapFailureMaxPercent() throws Exception {
+ MRApp app = new MockFirstFailingTaskMRApp(4, 0);
+ Configuration conf = new Configuration();
+
+ //reduce the no of attempts so test run faster
+ conf.setInt(MRJobConfig.MAP_MAX_ATTEMPTS, 2);
+ conf.setInt(MRJobConfig.REDUCE_MAX_ATTEMPTS, 1);
+
+ conf.setInt(MRJobConfig.MAP_FAILURES_MAX_PERCENT, 20);
+ conf.setInt(MRJobConfig.MAP_MAX_ATTEMPTS, 1);
+ Job job = app.submit(conf);
+ app.waitForState(job, JobState.FAILED);
+
+ //setting the failure percentage to 25% (1/4 is 25) will
+ //make the Job successful
+ app = new MockFirstFailingTaskMRApp(4, 0);
+ conf = new Configuration();
+
+ //reduce the no of attempts so test run faster
+ conf.setInt(MRJobConfig.MAP_MAX_ATTEMPTS, 2);
+ conf.setInt(MRJobConfig.REDUCE_MAX_ATTEMPTS, 1);
+
+ conf.setInt(MRJobConfig.MAP_FAILURES_MAX_PERCENT, 25);
+ conf.setInt(MRJobConfig.MAP_MAX_ATTEMPTS, 1);
+ job = app.submit(conf);
+ app.waitForState(job, JobState.SUCCEEDED);
+ }
+
+ @Test
+ public void testReduceFailureMaxPercent() throws Exception {
+ MRApp app = new MockFirstFailingTaskMRApp(2, 4);
+ Configuration conf = new Configuration();
+
+ //reduce the no of attempts so test run faster
+ conf.setInt(MRJobConfig.MAP_MAX_ATTEMPTS, 1);
+ conf.setInt(MRJobConfig.REDUCE_MAX_ATTEMPTS, 2);
+
+ conf.setInt(MRJobConfig.MAP_FAILURES_MAX_PERCENT, 50);//no failure due to Map
+ conf.setInt(MRJobConfig.MAP_MAX_ATTEMPTS, 1);
+ conf.setInt(MRJobConfig.REDUCE_FAILURES_MAXPERCENT, 20);
+ conf.setInt(MRJobConfig.REDUCE_MAX_ATTEMPTS, 1);
+ Job job = app.submit(conf);
+ app.waitForState(job, JobState.FAILED);
+
+ //setting the failure percentage to 25% (1/4 is 25) will
+ //make the Job successful
+ app = new MockFirstFailingTaskMRApp(2, 4);
+ conf = new Configuration();
+
+ //reduce the no of attempts so test run faster
+ conf.setInt(MRJobConfig.MAP_MAX_ATTEMPTS, 1);
+ conf.setInt(MRJobConfig.REDUCE_MAX_ATTEMPTS, 2);
+
+ conf.setInt(MRJobConfig.MAP_FAILURES_MAX_PERCENT, 50);//no failure due to Map
+ conf.setInt(MRJobConfig.MAP_MAX_ATTEMPTS, 1);
+ conf.setInt(MRJobConfig.REDUCE_FAILURES_MAXPERCENT, 25);
+ conf.setInt(MRJobConfig.REDUCE_MAX_ATTEMPTS, 1);
+ job = app.submit(conf);
+ app.waitForState(job, JobState.SUCCEEDED);
+ }
+
+ @Test
+ //All Task attempts are timed out, leading to Job failure
+ public void testTimedOutTask() throws Exception {
+ MRApp app = new TimeOutTaskMRApp(1, 0);
+ Configuration conf = new Configuration();
+ int maxAttempts = 2;
+ conf.setInt(MRJobConfig.MAP_MAX_ATTEMPTS, maxAttempts);
+ Job job = app.submit(conf);
+ app.waitForState(job, JobState.FAILED);
+ Map<TaskID,Task> tasks = job.getTasks();
+ Assert.assertEquals("No of tasks is not correct", 1,
+ tasks.size());
+ Task task = tasks.values().iterator().next();
+ Assert.assertEquals("Task state not correct", TaskState.FAILED,
+ task.getReport().state);
+ Map<TaskAttemptID, TaskAttempt> attempts =
+ tasks.values().iterator().next().getAttempts();
+ Assert.assertEquals("No of attempts is not correct", maxAttempts,
+ attempts.size());
+ for (TaskAttempt attempt : attempts.values()) {
+ Assert.assertEquals("Attempt state not correct", TaskAttemptState.FAILED,
+ attempt.getReport().state);
+ }
+ }
+
+ static class TimeOutTaskMRApp extends MRApp {
+ TimeOutTaskMRApp(int maps, int reduces) {
+ super(maps, reduces, false);
+ }
+ @Override
+ protected TaskAttemptListener createTaskAttemptListener(AppContext context) {
+ //This will create the TaskAttemptListener with TaskHeartbeatHandler
+ //RPC servers are not started
+ //task time out is reduced
+ //when attempt times out, heartbeat handler will send the lost event
+ //leading to Attempt failure
+ return new TaskAttemptListenerImpl(getContext(), null) {
+ public void startRpcServer(){};
+ public void stopRpcServer(){};
+ public void init(Configuration conf) {
+ conf.setInt("mapreduce.task.timeout", 1*1000);//reduce timeout
+ super.init(conf);
+ }
+ };
+ }
+ }
+
+ //Attempts of first Task are failed
+ static class MockFirstFailingTaskMRApp extends MRApp {
+
+ MockFirstFailingTaskMRApp(int maps, int reduces) {
+ super(maps, reduces, true);
+ }
+
+ @Override
+ protected void attemptLaunched(TaskAttemptID attemptID) {
+ if (attemptID.taskID.id == 0) {//check if it is first task
+ // send the Fail event
+ getContext().getEventHandler().handle(
+ new TaskAttemptEvent(attemptID,
+ TaskAttemptEventType.TA_FAILMSG));
+ } else {
+ getContext().getEventHandler().handle(
+ new TaskAttemptEvent(attemptID,
+ TaskAttemptEventType.TA_DONE));
+ }
+ }
+ }
+
+ //First attempt is failed
+ static class MockFirstFailingAttemptMRApp extends MRApp {
+ MockFirstFailingAttemptMRApp(int maps, int reduces) {
+ super(maps, reduces, true);
+ }
+
+ @Override
+ protected void attemptLaunched(TaskAttemptID attemptID) {
+ if (attemptID.taskID.id == 0 && attemptID.id == 0) {
+ //check if it is first task's first attempt
+ // send the Fail event
+ getContext().getEventHandler().handle(
+ new TaskAttemptEvent(attemptID,
+ TaskAttemptEventType.TA_FAILMSG));
+ } else {
+ getContext().getEventHandler().handle(
+ new TaskAttemptEvent(attemptID,
+ TaskAttemptEventType.TA_DONE));
+ }
+ }
+ }
+
+ public static void main(String[] args) throws Exception {
+ TestFail t = new TestFail();
+ t.testFailTask();
+ t.testTimedOutTask();
+ t.testMapFailureMaxPercent();
+ t.testReduceFailureMaxPercent();
+ }
+}
Added: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestFetchFailure.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestFetchFailure.java?rev=1082677&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestFetchFailure.java (added)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestFetchFailure.java Thu Mar 17 20:21:13 2011
@@ -0,0 +1,149 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.mapreduce.v2.app;
+
+import java.util.Arrays;
+import java.util.Iterator;
+
+import junit.framework.Assert;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.v2.app.job.Job;
+import org.apache.hadoop.mapreduce.v2.app.job.Task;
+import org.apache.hadoop.mapreduce.v2.app.job.TaskAttempt;
+import org.apache.hadoop.mapreduce.v2.app.job.event.JobTaskAttemptFetchFailureEvent;
+import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent;
+import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType;
+import org.apache.hadoop.mapreduce.v2.api.JobState;
+import org.apache.hadoop.mapreduce.v2.api.TaskAttemptCompletionEvent;
+import org.apache.hadoop.mapreduce.v2.api.TaskAttemptCompletionEventStatus;
+import org.apache.hadoop.mapreduce.v2.api.TaskAttemptID;
+import org.apache.hadoop.mapreduce.v2.api.TaskAttemptState;
+import org.apache.hadoop.mapreduce.v2.api.TaskState;
+import org.junit.Test;
+
+public class TestFetchFailure {
+
+ @Test
+ public void testFetchFailure() throws Exception {
+ MRApp app = new MRApp(1, 1, false);
+ Configuration conf = new Configuration();
+ Job job = app.submit(conf);
+ app.waitForState(job, JobState.RUNNING);
+ //all maps would be running
+ Assert.assertEquals("No of tasks not correct",
+ 2, job.getTasks().size());
+ Iterator<Task> it = job.getTasks().values().iterator();
+ Task mapTask = it.next();
+ Task reduceTask = it.next();
+
+ //wait for Task state move to RUNNING
+ app.waitForState(mapTask, TaskState.RUNNING);
+ TaskAttempt mapAttempt1 = mapTask.getAttempts().values().iterator().next();
+ app.waitForState(mapAttempt1, TaskAttemptState.RUNNING);
+
+ //send the done signal to the map attempt
+ app.getContext().getEventHandler().handle(
+ new TaskAttemptEvent(mapAttempt1.getID(),
+ TaskAttemptEventType.TA_DONE));
+
+ // wait for map success
+ app.waitForState(mapTask, TaskState.SUCCEEDED);
+
+ TaskAttemptCompletionEvent[] events =
+ job.getTaskAttemptCompletionEvents(0, 100);
+ Assert.assertEquals("No of completion events not correct",
+ 1, events.length);
+ Assert.assertEquals("Event status not correct",
+ TaskAttemptCompletionEventStatus.SUCCEEDED, events[0].status);
+
+ // wait for reduce to start running
+ app.waitForState(reduceTask, TaskState.RUNNING);
+ TaskAttempt reduceAttempt =
+ reduceTask.getAttempts().values().iterator().next();
+ app.waitForState(reduceAttempt, TaskAttemptState.RUNNING);
+
+ //send 3 fetch failures from reduce to trigger map re execution
+ sendFetchFailure(app, reduceAttempt, mapAttempt1);
+ sendFetchFailure(app, reduceAttempt, mapAttempt1);
+ sendFetchFailure(app, reduceAttempt, mapAttempt1);
+
+ //wait for map Task state move back to RUNNING
+ app.waitForState(mapTask, TaskState.RUNNING);
+
+ //map attempt must have become FAILED
+ Assert.assertEquals("Map TaskAttempt state not correct",
+ TaskAttemptState.FAILED, mapAttempt1.getState());
+
+ Assert.assertEquals("No of attempts in Map Task not correct",
+ 2, mapTask.getAttempts().size());
+
+ Iterator<TaskAttempt> atIt = mapTask.getAttempts().values().iterator();
+ atIt.next();
+ TaskAttempt mapAttempt2 = atIt.next();
+
+ app.waitForState(mapAttempt2, TaskAttemptState.RUNNING);
+ //send the done signal to the second map attempt
+ app.getContext().getEventHandler().handle(
+ new TaskAttemptEvent(mapAttempt2.getID(),
+ TaskAttemptEventType.TA_DONE));
+
+ // wait for map success
+ app.waitForState(mapTask, TaskState.SUCCEEDED);
+
+ //send done to reduce
+ app.getContext().getEventHandler().handle(
+ new TaskAttemptEvent(reduceAttempt.getID(),
+ TaskAttemptEventType.TA_DONE));
+
+ app.waitForState(job, JobState.SUCCEEDED);
+
+ //previous completion event now becomes obsolete
+ Assert.assertEquals("Event status not correct",
+ TaskAttemptCompletionEventStatus.OBSOLETE, events[0].status);
+
+ events = job.getTaskAttemptCompletionEvents(0, 100);
+ Assert.assertEquals("No of completion events not correct",
+ 4, events.length);
+ Assert.assertEquals("Event map attempt id not correct",
+ mapAttempt1.getID(), events[0].attemptId);
+ Assert.assertEquals("Event map attempt id not correct",
+ mapAttempt1.getID(), events[1].attemptId);
+ Assert.assertEquals("Event map attempt id not correct",
+ mapAttempt2.getID(), events[2].attemptId);
+ Assert.assertEquals("Event redude attempt id not correct",
+ reduceAttempt.getID(), events[3].attemptId);
+ Assert.assertEquals("Event status not correct for map attempt1",
+ TaskAttemptCompletionEventStatus.OBSOLETE, events[0].status);
+ Assert.assertEquals("Event status not correct for map attempt1",
+ TaskAttemptCompletionEventStatus.FAILED, events[1].status);
+ Assert.assertEquals("Event status not correct for map attempt2",
+ TaskAttemptCompletionEventStatus.SUCCEEDED, events[2].status);
+ Assert.assertEquals("Event status not correct for reduce attempt1",
+ TaskAttemptCompletionEventStatus.SUCCEEDED, events[3].status);
+ }
+
+ private void sendFetchFailure(MRApp app, TaskAttempt reduceAttempt,
+ TaskAttempt mapAttempt) {
+ app.getContext().getEventHandler().handle(
+ new JobTaskAttemptFetchFailureEvent(
+ reduceAttempt.getID(),
+ Arrays.asList(new TaskAttemptID[] {mapAttempt.getID()})));
+ }
+}
Added: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestJobHistoryParsing.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestJobHistoryParsing.java?rev=1082677&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestJobHistoryParsing.java (added)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestJobHistoryParsing.java Thu Mar 17 20:21:13 2011
@@ -0,0 +1,134 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.mapreduce.v2.app;
+
+import java.io.IOException;
+
+import junit.framework.Assert;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileContext;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.MRJobConfig;
+import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEvent;
+import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEventHandler;
+import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser;
+import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.JobInfo;
+import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.TaskInfo;
+import org.apache.hadoop.mapreduce.v2.api.JobID;
+import org.apache.hadoop.mapreduce.v2.api.JobState;
+import org.apache.hadoop.mapreduce.v2.app.job.Job;
+import org.apache.hadoop.mapreduce.v2.lib.TypeConverter;
+import org.apache.hadoop.yarn.event.EventHandler;
+import org.junit.Test;
+
+public class TestJobHistoryParsing {
+ private static final Log LOG = LogFactory.getLog(TestJobHistoryParsing.class);
+
+ @Test
+ public void testHistoryParsing() throws Exception {
+ Configuration conf = new Configuration();
+ MRApp app = new HistoryEnabledApp(2, 1, true);
+ app.submit(conf);
+ Job job = app.getContext().getAllJobs().values().iterator().next();
+ JobID jobId = job.getID();
+ LOG.info("JOBID is " + TypeConverter.fromYarn(jobId).toString());
+ app.waitForState(job, JobState.SUCCEEDED);
+
+ String jobhistoryFileName = TypeConverter.fromYarn(jobId).toString();
+ String user =
+ conf.get(MRJobConfig.USER_NAME, System.getProperty("user.name"));
+ String jobhistoryDir = conf.get("yarn.server.nodemanager.jobhistory",
+ "file:///tmp/yarn/done/") + user;
+ String jobstatusDir = conf.get("yarn.server.nodemanager.jobhistory",
+ "file:///tmp/yarn/done/status/") + user + "/" +
+ jobhistoryFileName;
+ FSDataInputStream in = null;
+ Path historyFilePath = new Path(jobhistoryDir, jobhistoryFileName);
+ LOG.info("JOBHISTORYDIRE IS " + historyFilePath);
+ try {
+ FileContext fc = FileContext.getFileContext(historyFilePath.toUri());
+ in = fc.open(historyFilePath);
+ } catch (IOException ioe) {
+ LOG.info("Can not open history file "+ ioe);
+ throw (new Exception("Can not open History File"));
+ }
+
+ JobHistoryParser parser = new JobHistoryParser(in);
+ JobInfo jobInfo = parser.parse();
+
+ Assert.assertTrue ("Incorrect username ",
+ jobInfo.getUsername().equals("mapred"));
+ Assert.assertTrue("Incorrect jobName ",
+ jobInfo.getJobname().equals("test"));
+ Assert.assertTrue("Incorrect queuename ",
+ jobInfo.getJobQueueName().equals("default"));
+ Assert.assertTrue("incorrect conf path",
+ jobInfo.getJobConfPath().equals("test"));
+ Assert.assertTrue("incorrect finishedMap ",
+ jobInfo.getFinishedMaps() == 2);
+ Assert.assertTrue("incorrect finishedReduces ",
+ jobInfo.getFinishedReduces() == 1);
+ int totalTasks = jobInfo.getAllTasks().size();
+ Assert.assertTrue("total number of tasks is incorrect ", totalTasks == 3);
+
+ //Assert at taskAttempt level
+ for (TaskInfo taskInfo : jobInfo.getAllTasks().values()) {
+ int taskAttemptCount = taskInfo.getAllTaskAttempts().size();
+ Assert.assertTrue("total number of task attempts ",
+ taskAttemptCount == 1);
+ }
+
+ // Test for checking jobstats for job status store
+ Path statusFilePath = new Path(jobstatusDir, "jobstats");
+ try {
+ FileContext fc = FileContext.getFileContext(statusFilePath.toUri());
+ in = fc.open(statusFilePath);
+ } catch (IOException ioe) {
+ LOG.info("Can not open status file "+ ioe);
+ throw (new Exception("Can not open status File"));
+ }
+ parser = new JobHistoryParser(in);
+ jobInfo = parser.parse();
+ Assert.assertTrue("incorrect finishedMap in job stats file ",
+ jobInfo.getFinishedMaps() == 2);
+ Assert.assertTrue("incorrect finishedReduces in job stats file ",
+ jobInfo.getFinishedReduces() == 1);
+ }
+
+ static class HistoryEnabledApp extends MRApp {
+ public HistoryEnabledApp(int maps, int reduces, boolean autoComplete) {
+ super(maps, reduces, autoComplete);
+ }
+
+ @Override
+ protected EventHandler<JobHistoryEvent> createJobHistoryHandler(
+ Configuration conf) {
+ return new JobHistoryEventHandler(conf);
+ }
+ }
+
+ public static void main(String[] args) throws Exception {
+ TestJobHistoryParsing t = new TestJobHistoryParsing();
+ t.testHistoryParsing();
+ }
+}
Added: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestKill.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestKill.java?rev=1082677&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestKill.java (added)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestKill.java Thu Mar 17 20:21:13 2011
@@ -0,0 +1,221 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.mapreduce.v2.app;
+
+import java.util.Iterator;
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+
+import junit.framework.Assert;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.v2.app.job.Job;
+import org.apache.hadoop.mapreduce.v2.app.job.Task;
+import org.apache.hadoop.mapreduce.v2.app.job.TaskAttempt;
+import org.apache.hadoop.mapreduce.v2.app.job.event.JobEvent;
+import org.apache.hadoop.mapreduce.v2.app.job.event.JobEventType;
+import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent;
+import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType;
+import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEvent;
+import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEventType;
+import org.apache.hadoop.mapreduce.v2.api.JobState;
+import org.apache.hadoop.mapreduce.v2.api.TaskAttemptID;
+import org.apache.hadoop.mapreduce.v2.api.TaskAttemptState;
+import org.apache.hadoop.mapreduce.v2.api.TaskID;
+import org.apache.hadoop.mapreduce.v2.api.TaskState;
+import org.junit.Test;
+
+/**
+ * Tests the state machine with respect to Job/Task/TaskAttempt kill scenarios.
+ *
+ */
+public class TestKill {
+
+ @Test
+ public void testKillJob() throws Exception {
+ final CountDownLatch latch = new CountDownLatch(1);
+
+ MRApp app = new BlockingMRApp(1, 0, latch);
+ //this will start the job but job won't complete as task is
+ //blocked
+ Job job = app.submit(new Configuration());
+
+ //wait and vailidate for Job to become RUNNING
+ app.waitForState(job, JobState.RUNNING);
+
+ //send the kill signal to Job
+ app.getContext().getEventHandler().handle(
+ new JobEvent(job.getID(), JobEventType.JOB_KILL));
+
+ //unblock Task
+ latch.countDown();
+
+ //wait and validate for Job to be KILLED
+ app.waitForState(job, JobState.KILLED);
+ Map<TaskID,Task> tasks = job.getTasks();
+ Assert.assertEquals("No of tasks is not correct", 1,
+ tasks.size());
+ Task task = tasks.values().iterator().next();
+ Assert.assertEquals("Task state not correct", TaskState.KILLED,
+ task.getReport().state);
+ Map<TaskAttemptID, TaskAttempt> attempts =
+ tasks.values().iterator().next().getAttempts();
+ Assert.assertEquals("No of attempts is not correct", 1,
+ attempts.size());
+ Iterator<TaskAttempt> it = attempts.values().iterator();
+ Assert.assertEquals("Attempt state not correct", TaskAttemptState.KILLED,
+ it.next().getReport().state);
+ }
+
+ @Test
+ public void testKillTask() throws Exception {
+ final CountDownLatch latch = new CountDownLatch(1);
+ MRApp app = new BlockingMRApp(2, 0, latch);
+ //this will start the job but job won't complete as Task is blocked
+ Job job = app.submit(new Configuration());
+
+ //wait and vailidate for Job to become RUNNING
+ app.waitForState(job, JobState.RUNNING);
+ Map<TaskID,Task> tasks = job.getTasks();
+ Assert.assertEquals("No of tasks is not correct", 2,
+ tasks.size());
+ Iterator<Task> it = tasks.values().iterator();
+ Task task1 = it.next();
+ Task task2 = it.next();
+
+ //send the kill signal to the first Task
+ app.getContext().getEventHandler().handle(
+ new TaskEvent(task1.getID(), TaskEventType.T_KILL));
+
+ //unblock Task
+ latch.countDown();
+
+ //wait and validate for Job to become SUCCEEDED
+ app.waitForState(job, JobState.SUCCEEDED);
+
+ //first Task is killed and second is Succeeded
+ //Job is succeeded
+
+ Assert.assertEquals("Task state not correct", TaskState.KILLED,
+ task1.getReport().state);
+ Assert.assertEquals("Task state not correct", TaskState.SUCCEEDED,
+ task2.getReport().state);
+ Map<TaskAttemptID, TaskAttempt> attempts = task1.getAttempts();
+ Assert.assertEquals("No of attempts is not correct", 1,
+ attempts.size());
+ Iterator<TaskAttempt> iter = attempts.values().iterator();
+ Assert.assertEquals("Attempt state not correct", TaskAttemptState.KILLED,
+ iter.next().getReport().state);
+
+ attempts = task2.getAttempts();
+ Assert.assertEquals("No of attempts is not correct", 1,
+ attempts.size());
+ iter = attempts.values().iterator();
+ Assert.assertEquals("Attempt state not correct", TaskAttemptState.SUCCEEDED,
+ iter.next().getReport().state);
+ }
+
+ @Test
+ public void testKillTaskAttempt() throws Exception {
+ final CountDownLatch latch = new CountDownLatch(1);
+ MRApp app = new BlockingMRApp(2, 0, latch);
+ //this will start the job but job won't complete as Task is blocked
+ Job job = app.submit(new Configuration());
+
+ //wait and vailidate for Job to become RUNNING
+ app.waitForState(job, JobState.RUNNING);
+ Map<TaskID,Task> tasks = job.getTasks();
+ Assert.assertEquals("No of tasks is not correct", 2,
+ tasks.size());
+ Iterator<Task> it = tasks.values().iterator();
+ Task task1 = it.next();
+ Task task2 = it.next();
+
+ //wait for tasks to become running
+ app.waitForState(task1, TaskState.SCHEDULED);
+ app.waitForState(task2, TaskState.SCHEDULED);
+
+ //send the kill signal to the first Task's attempt
+ TaskAttempt attempt = task1.getAttempts().values().iterator().next();
+ app.getContext().getEventHandler().handle(
+ new TaskAttemptEvent(attempt.getID(), TaskAttemptEventType.TA_KILL));
+
+ //unblock
+ latch.countDown();
+
+ //wait and validate for Job to become SUCCEEDED
+ //job will still succeed
+ app.waitForState(job, JobState.SUCCEEDED);
+
+ //first Task will have two attempts 1st is killed, 2nd Succeeds
+ //both Tasks and Job succeeds
+ Assert.assertEquals("Task state not correct", TaskState.SUCCEEDED,
+ task1.getReport().state);
+ Assert.assertEquals("Task state not correct", TaskState.SUCCEEDED,
+ task2.getReport().state);
+
+ Map<TaskAttemptID, TaskAttempt> attempts = task1.getAttempts();
+ Assert.assertEquals("No of attempts is not correct", 2,
+ attempts.size());
+ Iterator<TaskAttempt> iter = attempts.values().iterator();
+ Assert.assertEquals("Attempt state not correct", TaskAttemptState.KILLED,
+ iter.next().getReport().state);
+ Assert.assertEquals("Attempt state not correct", TaskAttemptState.SUCCEEDED,
+ iter.next().getReport().state);
+
+ attempts = task2.getAttempts();
+ Assert.assertEquals("No of attempts is not correct", 1,
+ attempts.size());
+ iter = attempts.values().iterator();
+ Assert.assertEquals("Attempt state not correct", TaskAttemptState.SUCCEEDED,
+ iter.next().getReport().state);
+ }
+
+ static class BlockingMRApp extends MRApp {
+ private CountDownLatch latch;
+ BlockingMRApp(int maps, int reduces, CountDownLatch latch) {
+ super(maps, reduces, true);
+ this.latch = latch;
+ }
+
+ @Override
+ protected void attemptLaunched(TaskAttemptID attemptID) {
+ if (attemptID.taskID.id == 0 && attemptID.id == 0) {
+ //this blocks the first task's first attempt
+ //the subsequent ones are completed
+ try {
+ latch.await();
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+ } else {
+ getContext().getEventHandler().handle(
+ new TaskAttemptEvent(attemptID,
+ TaskAttemptEventType.TA_DONE));
+ }
+ }
+ }
+
+ public static void main(String[] args) throws Exception {
+ TestKill t = new TestKill();
+ t.testKillJob();
+ t.testKillTask();
+ t.testKillTaskAttempt();
+ }
+}
Added: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestMRApp.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestMRApp.java?rev=1082677&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestMRApp.java (added)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestMRApp.java Thu Mar 17 20:21:13 2011
@@ -0,0 +1,187 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.mapreduce.v2.app;
+
+import java.util.Iterator;
+
+import junit.framework.Assert;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.MRJobConfig;
+import org.apache.hadoop.mapreduce.v2.app.job.Job;
+import org.apache.hadoop.mapreduce.v2.app.job.Task;
+import org.apache.hadoop.mapreduce.v2.app.job.TaskAttempt;
+import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent;
+import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType;
+import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEvent;
+import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEventType;
+import org.apache.hadoop.mapreduce.v2.lib.TypeConverter;
+import org.apache.hadoop.mapreduce.v2.api.JobState;
+import org.apache.hadoop.mapreduce.v2.api.TaskAttemptState;
+import org.apache.hadoop.mapreduce.v2.api.TaskState;
+import org.junit.Test;
+
+/**
+ * Tests the state machine of MR App.
+ *
+ * <p>Each test submits a job to an {@link MRApp}, injects task / task-attempt
+ * events through the application context's event handler, and waits for the
+ * job, task or attempt to reach the expected state.
+ */
+public class TestMRApp {
+
+  /**
+   * Runs a 2-map / 2-reduce job with {@code autoComplete} enabled and checks
+   * that it succeeds and that {@link MRApp#verifyCompleted()} passes.
+   */
+  @Test
+  public void testMapReduce() throws Exception {
+    MRApp app = new MRApp(2, 2, true);
+    Job job = app.submit(new Configuration());
+    app.waitForState(job, JobState.SUCCEEDED);
+    app.verifyCompleted();
+  }
+
+  /**
+   * Drives a single map attempt RUNNING -> COMMIT_PENDING, then sends
+   * TA_DONE and expects the job to succeed.
+   */
+  @Test
+  public void testCommitPending() throws Exception {
+    MRApp app = new MRApp(1, 0, false);
+    Job job = app.submit(new Configuration());
+    app.waitForState(job, JobState.RUNNING);
+    Assert.assertEquals("No of tasks not correct",
+        1, job.getTasks().size());
+    Iterator<Task> it = job.getTasks().values().iterator();
+    Task task = it.next();
+    app.waitForState(task, TaskState.RUNNING);
+    TaskAttempt attempt = task.getAttempts().values().iterator().next();
+    app.waitForState(attempt, TaskAttemptState.RUNNING);
+
+    //send the commit pending signal to the task
+    app.getContext().getEventHandler().handle(
+        new TaskAttemptEvent(
+            attempt.getID(),
+            TaskAttemptEventType.TA_COMMIT_PENDING));
+
+    //wait for first attempt to commit pending
+    app.waitForState(attempt, TaskAttemptState.COMMIT_PENDING);
+
+    //send the done signal to the same (only) attempt
+    app.getContext().getEventHandler().handle(
+        new TaskAttemptEvent(
+            attempt.getID(),
+            TaskAttemptEventType.TA_DONE));
+
+    app.waitForState(job, JobState.SUCCEEDED);
+  }
+
+  /**
+   * With COMPLETED_MAPS_FOR_REDUCE_SLOWSTART set to 0.5 and two maps, checks
+   * that the reduce stays NEW while both maps run, reaches RUNNING once the
+   * first map succeeds, and that the job then completes.
+   */
+  @Test
+  public void testCompletedMapsForReduceSlowstart() throws Exception {
+    MRApp app = new MRApp(2, 1, false);
+    Configuration conf = new Configuration();
+    //after half of the map completion, reduce will start
+    conf.setFloat(MRJobConfig.COMPLETED_MAPS_FOR_REDUCE_SLOWSTART, 0.5f);
+    Job job = app.submit(conf);
+    app.waitForState(job, JobState.RUNNING);
+    //the job consists of 2 maps and 1 reduce
+    Assert.assertEquals("No of tasks not correct",
+        3, job.getTasks().size());
+    // NOTE(review): assumes getTasks() iterates both maps before the
+    // reduce -- confirm against the Job implementation's task map.
+    Iterator<Task> it = job.getTasks().values().iterator();
+    Task mapTask1 = it.next();
+    Task mapTask2 = it.next();
+    Task reduceTask = it.next();
+
+    // all maps must be running
+    app.waitForState(mapTask1, TaskState.RUNNING);
+    app.waitForState(mapTask2, TaskState.RUNNING);
+
+    TaskAttempt task1Attempt = mapTask1.getAttempts().values().iterator().next();
+    TaskAttempt task2Attempt = mapTask2.getAttempts().values().iterator().next();
+
+    //before sending the TA_DONE event, make sure the attempts have come to
+    //RUNNING state
+    app.waitForState(task1Attempt, TaskAttemptState.RUNNING);
+    app.waitForState(task2Attempt, TaskAttemptState.RUNNING);
+
+    // reduces must be in NEW state
+    Assert.assertEquals("Reduce Task state not correct",
+        TaskState.NEW, reduceTask.getReport().state);
+
+    //send the done signal to the 1st map task
+    app.getContext().getEventHandler().handle(
+        new TaskAttemptEvent(
+            task1Attempt.getID(),
+            TaskAttemptEventType.TA_DONE));
+
+    //wait for first map task to complete
+    app.waitForState(mapTask1, TaskState.SUCCEEDED);
+
+    //Once the first map completes, it will schedule the reduces
+    //now reduce must be running
+    app.waitForState(reduceTask, TaskState.RUNNING);
+
+    //send the done signal to 2nd map and the reduce to complete the job
+    app.getContext().getEventHandler().handle(
+        new TaskAttemptEvent(
+            task2Attempt.getID(),
+            TaskAttemptEventType.TA_DONE));
+    // the reduce attempt is looked up only now, after the reduce task has
+    // reached RUNNING
+    app.getContext().getEventHandler().handle(
+        new TaskAttemptEvent(
+            reduceTask.getAttempts().values().iterator().next().getID(),
+            TaskAttemptEventType.TA_DONE));
+
+    app.waitForState(job, JobState.SUCCEEDED);
+  }
+
+  /**
+   * Sends T_SCHEDULE to a task that is already RUNNING and expects the job
+   * to end up in the ERROR state.
+   */
+  @Test
+  public void testJobError() throws Exception {
+    MRApp app = new MRApp(1, 0, false);
+    Job job = app.submit(new Configuration());
+    app.waitForState(job, JobState.RUNNING);
+    Assert.assertEquals("No of tasks not correct",
+        1, job.getTasks().size());
+    Iterator<Task> it = job.getTasks().values().iterator();
+    Task task = it.next();
+    app.waitForState(task, TaskState.RUNNING);
+
+    //send an invalid event on task at current state
+    app.getContext().getEventHandler().handle(
+        new TaskEvent(
+            task.getID(), TaskEventType.T_SCHEDULE));
+
+    //this must lead to job error
+    app.waitForState(job, JobState.ERROR);
+  }
+
+  /** Checks that every {@link JobState} can be passed to TypeConverter.fromYarn. */
+  @Test
+  public void checkJobStateTypeConversion() {
+    //verify that all states can be converted without
+    // throwing an exception
+    for (JobState state : JobState.values()) {
+      TypeConverter.fromYarn(state);
+    }
+  }
+
+  /** Checks that every {@link TaskState} can be passed to TypeConverter.fromYarn. */
+  @Test
+  public void checkTaskStateTypeConversion() {
+    //verify that all states can be converted without
+    // throwing an exception
+    for (TaskState state : TaskState.values()) {
+      TypeConverter.fromYarn(state);
+    }
+  }
+
+  /**
+   * Runs every test in this class sequentially without a JUnit runner,
+   * including the type-conversion checks.
+   */
+  public static void main(String[] args) throws Exception {
+    TestMRApp t = new TestMRApp();
+    t.testMapReduce();
+    t.testCommitPending();
+    t.testCompletedMapsForReduceSlowstart();
+    t.testJobError();
+    t.checkJobStateTypeConversion();
+    t.checkTaskStateTypeConversion();
+  }
+}
Added: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestMRClientService.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestMRClientService.java?rev=1082677&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestMRClientService.java (added)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestMRClientService.java Thu Mar 17 20:21:13 2011
@@ -0,0 +1,142 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.mapreduce.v2.app;
+
+import java.util.Iterator;
+import java.util.List;
+
+import junit.framework.Assert;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.v2.app.AppContext;
+import org.apache.hadoop.mapreduce.v2.app.client.ClientService;
+import org.apache.hadoop.mapreduce.v2.app.client.MRClientService;
+import org.apache.hadoop.mapreduce.v2.app.job.Job;
+import org.apache.hadoop.mapreduce.v2.app.job.Task;
+import org.apache.hadoop.mapreduce.v2.app.job.TaskAttempt;
+import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptDiagnosticsUpdateEvent;
+import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent;
+import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType;
+import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptStatusUpdateEvent;
+import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptStatusUpdateEvent.TaskAttemptStatus;
+import org.apache.hadoop.yarn.ipc.YarnRPC;
+import org.apache.hadoop.mapreduce.v2.api.JobState;
+import org.apache.hadoop.mapreduce.v2.api.MRClientProtocol;
+import org.apache.hadoop.mapreduce.v2.api.Phase;
+import org.apache.hadoop.mapreduce.v2.api.TaskAttemptState;
+import org.apache.hadoop.mapreduce.v2.api.TaskReport;
+import org.apache.hadoop.mapreduce.v2.api.TaskState;
+import org.apache.hadoop.mapreduce.v2.api.TaskType;
+import org.junit.Test;
+
+/**
+ * Tests that {@link MRClientService} exposes job, task and task-attempt
+ * information through {@link MRClientProtocol} RPCs for a running
+ * {@link MRApp}.
+ */
+public class TestMRClientService {
+
+  /**
+   * Runs a single-map job, pushes one diagnostic event and one status update
+   * (which carries a second diagnostic) into the running attempt, then checks
+   * the RPC responses through a real proxy before completing the job.
+   */
+  @Test
+  public void test() throws Exception {
+    MRAppWithClientService app = new MRAppWithClientService(1, 0, false);
+    Configuration conf = new Configuration();
+    Job job = app.submit(conf);
+    app.waitForState(job, JobState.RUNNING);
+    Assert.assertEquals("No of tasks not correct", 1, job.getTasks().size());
+    Iterator<Task> it = job.getTasks().values().iterator();
+    Task task = it.next();
+    app.waitForState(task, TaskState.RUNNING);
+    TaskAttempt attempt = task.getAttempts().values().iterator().next();
+    app.waitForState(attempt, TaskAttemptState.RUNNING);
+
+    // send the diagnostic
+    String diagnostic1 = "Diagnostic1";
+    String diagnostic2 = "Diagnostic2";
+    app.getContext().getEventHandler().handle(
+        new TaskAttemptDiagnosticsUpdateEvent(attempt.getID(), diagnostic1));
+
+    // build and send the status update; it carries the second diagnostic
+    TaskAttemptStatus taskAttemptStatus = new TaskAttemptStatus();
+    taskAttemptStatus.id = attempt.getID();
+    taskAttemptStatus.progress = 0.5f;
+    taskAttemptStatus.diagnosticInfo = diagnostic2;
+    taskAttemptStatus.stateString = "RUNNING";
+    taskAttemptStatus.phase = Phase.MAP;
+    taskAttemptStatus.outputSize = 3;
+    app.getContext().getEventHandler().handle(
+        new TaskAttemptStatusUpdateEvent(attempt.getID(), taskAttemptStatus));
+
+    // NOTE(review): neither event above is awaited before the RPC queries
+    // below; if the app's event handler dispatches asynchronously, the
+    // diagnostics-count assertions can race -- confirm.
+
+    //verify that all object are fully populated by invoking RPCs.
+    YarnRPC rpc = YarnRPC.create(conf);
+    MRClientProtocol proxy =
+        (MRClientProtocol) rpc.getProxy(MRClientProtocol.class,
+            app.clientService.getBindAddress(), conf);
+    Assert.assertNotNull("Counters is null", proxy.getCounters(job.getID()));
+    Assert.assertNotNull("JobReport is null", proxy.getJobReport(job.getID()));
+    Assert.assertNotNull("TaskCompletionEvents is null",
+        proxy.getTaskAttemptCompletionEvents(job.getID(), 0, 10));
+    Assert.assertNotNull("Diagnostics is null",
+        proxy.getDiagnostics(attempt.getID()));
+    Assert.assertNotNull("TaskAttemptReport is null",
+        proxy.getTaskAttemptReport(attempt.getID()));
+    Assert.assertNotNull("TaskReport is null",
+        proxy.getTaskReport(task.getID()));
+
+    Assert.assertNotNull("TaskReports for map is null",
+        proxy.getTaskReports(job.getID(),
+            TaskType.MAP));
+    Assert.assertNotNull("TaskReports for reduce is null",
+        proxy.getTaskReports(job.getID(),
+            TaskType.REDUCE));
+
+    List<CharSequence> diag = proxy.getDiagnostics(attempt.getID());
+    Assert.assertEquals("No of diagnostic not correct" , 2 , diag.size());
+    Assert.assertEquals("Diag 1 not correct" ,
+        diagnostic1, diag.get(0).toString());
+    Assert.assertEquals("Diag 2 not correct" ,
+        diagnostic2, diag.get(1).toString());
+
+    TaskReport taskReport = proxy.getTaskReport(task.getID());
+    Assert.assertEquals("No of diagnostic not correct", 2,
+        taskReport.diagnostics.size());
+
+    //send the done signal to the (only) attempt
+    app.getContext().getEventHandler().handle(
+        new TaskAttemptEvent(
+            attempt.getID(),
+            TaskAttemptEventType.TA_DONE));
+
+    app.waitForState(job, JobState.SUCCEEDED);
+  }
+
+  /**
+   * {@link MRApp} that keeps a reference to the {@link MRClientService} it
+   * creates so the test can read the service's bind address. Declared
+   * static because it never uses the enclosing test instance.
+   */
+  static class MRAppWithClientService extends MRApp {
+    /** Set when the app creates its client service; null before that. */
+    MRClientService clientService;
+
+    MRAppWithClientService(int maps, int reduces, boolean autoComplete) {
+      super(maps, reduces, autoComplete);
+    }
+
+    @Override
+    protected ClientService createClientService(AppContext context) {
+      clientService = new MRClientService(context);
+      return clientService;
+    }
+  }
+
+  /** Runs the test without a JUnit runner. */
+  public static void main(String[] args) throws Exception {
+    TestMRClientService t = new TestMRClientService();
+    t.test();
+  }
+}