You are viewing a plain text version of this content. The canonical link for it is here.
Posted to mapreduce-commits@hadoop.apache.org by ac...@apache.org on 2011/03/17 21:21:54 UTC

svn commit: r1082677 [8/38] - in /hadoop/mapreduce/branches/MR-279: ./ assembly/ ivy/ mr-client/ mr-client/hadoop-mapreduce-client-app/ mr-client/hadoop-mapreduce-client-app/src/ mr-client/hadoop-mapreduce-client-app/src/main/ mr-client/hadoop-mapreduc...

Added: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java?rev=1082677&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java (added)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java Thu Mar 17 20:21:13 2011
@@ -0,0 +1,341 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.mapreduce.v2.app;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.EnumSet;
+
+import junit.framework.Assert;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.WrappedJvmID;
+import org.apache.hadoop.mapreduce.MRJobConfig;
+import org.apache.hadoop.mapreduce.security.token.JobTokenSecretManager;
+import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitMetaInfo;
+import org.apache.hadoop.mapreduce.v2.app.AppContext;
+import org.apache.hadoop.mapreduce.v2.app.MRAppMaster;
+import org.apache.hadoop.mapreduce.v2.app.TaskAttemptListener;
+import org.apache.hadoop.mapreduce.v2.app.client.ClientService;
+import org.apache.hadoop.mapreduce.v2.app.job.Job;
+import org.apache.hadoop.mapreduce.v2.app.job.Task;
+import org.apache.hadoop.mapreduce.v2.app.job.TaskAttempt;
+import org.apache.hadoop.mapreduce.v2.app.job.event.JobEvent;
+import org.apache.hadoop.mapreduce.v2.app.job.event.JobEventType;
+import org.apache.hadoop.mapreduce.v2.app.job.event.JobFinishEvent;
+import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptContainerAssignedEvent;
+import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent;
+import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType;
+import org.apache.hadoop.mapreduce.v2.app.job.impl.JobImpl;
+import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerLauncher;
+import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerLauncherEvent;
+import org.apache.hadoop.mapreduce.v2.app.rm.ContainerAllocator;
+import org.apache.hadoop.mapreduce.v2.app.rm.ContainerAllocatorEvent;
+import org.apache.hadoop.mapreduce.v2.app.taskclean.TaskCleaner;
+import org.apache.hadoop.mapreduce.v2.app.taskclean.TaskCleanupEvent;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.state.StateMachine;
+import org.apache.hadoop.yarn.state.StateMachineFactory;
+import org.apache.hadoop.yarn.ApplicationID;
+import org.apache.hadoop.yarn.ContainerID;
+import org.apache.hadoop.mapreduce.v2.api.JobID;
+import org.apache.hadoop.mapreduce.v2.api.JobReport;
+import org.apache.hadoop.mapreduce.v2.api.JobState;
+import org.apache.hadoop.mapreduce.v2.api.TaskAttemptID;
+import org.apache.hadoop.mapreduce.v2.api.TaskAttemptReport;
+import org.apache.hadoop.mapreduce.v2.api.TaskAttemptState;
+import org.apache.hadoop.mapreduce.v2.api.TaskReport;
+import org.apache.hadoop.mapreduce.v2.api.TaskState;
+
+/**
+ * Mock MRAppMaster. Doesn't start RPC servers.
+ * No threads are started except for the event Dispatcher thread.
+ */
+public class MRApp extends MRAppMaster {
+
+  /** Maximum number of polls performed by each waitForState method. */
+  private static final int MAX_STATE_POLLS = 20;
+
+  /** Pause between two consecutive state polls, in milliseconds. */
+  private static final long STATE_POLL_INTERVAL_MS = 500;
+
+  int maps;
+  int reduces;
+
+  //if true tasks complete automatically as soon as they are launched
+  protected boolean autoComplete = false;
+
+  /**
+   * Creates a mock application master.
+   *
+   * @param maps number of map splits the test job is created with
+   * @param reduces number of reduces configured on the test job
+   * @param autoComplete if true, every attempt is sent TA_DONE as soon as
+   *        its container is reported as launched
+   */
+  public MRApp(int maps, int reduces, boolean autoComplete) {
+    super(new ApplicationID());
+    this.maps = maps;
+    this.reduces = reduces;
+    this.autoComplete = autoComplete;
+  }
+
+  /**
+   * Initializes and starts this application with the given configuration.
+   * The user name defaults to "mapred" when it is not set in the
+   * configuration.
+   *
+   * @param conf configuration used to initialize the application
+   * @return the first job found in the application context
+   * @throws Exception declared for callers; raised by init/start failures
+   */
+  public Job submit(Configuration conf) throws Exception {
+    String user = conf.get(MRJobConfig.USER_NAME, "mapred");
+    conf.set(MRJobConfig.USER_NAME, user);
+    init(conf);
+    start();
+    Job job = getContext().getAllJobs().values().iterator().next();
+    return job;
+  }
+
+  /**
+   * Polls the attempt until it reports the expected state or the poll limit
+   * is reached, then asserts that the last observed state is the expected
+   * one.
+   *
+   * @param attempt the task attempt to watch
+   * @param finalState the state to wait for
+   * @throws Exception if the waiting thread is interrupted
+   */
+  public void waitForState(TaskAttempt attempt, 
+      TaskAttemptState finalState) throws Exception {
+    int polls = 0;
+    TaskAttemptReport report = attempt.getReport();
+    while (!finalState.equals(report.state) &&
+        polls++ < MAX_STATE_POLLS) {
+      System.out.println("TaskAttempt State is : " + report.state +
+          " Waiting for state : " + finalState +
+          "   progress : " + report.progress);
+      // Sleep first and refresh afterwards, so that the loop condition and
+      // the final assertion never work on a report older than the sleep.
+      Thread.sleep(STATE_POLL_INTERVAL_MS);
+      report = attempt.getReport();
+    }
+    System.out.println("TaskAttempt State is : " + report.state);
+    Assert.assertEquals("TaskAttempt state is not correct (timedout)",
+        finalState, 
+        report.state);
+  }
+
+  /**
+   * Polls the task until it reports the expected state or the poll limit is
+   * reached, then asserts that the last observed state is the expected one.
+   *
+   * @param task the task to watch
+   * @param finalState the state to wait for
+   * @throws Exception if the waiting thread is interrupted
+   */
+  public void waitForState(Task task, TaskState finalState) throws Exception {
+    int polls = 0;
+    TaskReport report = task.getReport();
+    while (!finalState.equals(report.state) &&
+        polls++ < MAX_STATE_POLLS) {
+      System.out.println("Task State is : " + report.state +
+          " Waiting for state : " + finalState +
+          "   progress : " + report.progress);
+      // Refresh after the sleep so the snapshot that is checked is current.
+      Thread.sleep(STATE_POLL_INTERVAL_MS);
+      report = task.getReport();
+    }
+    System.out.println("Task State is : " + report.state);
+    Assert.assertEquals("Task state is not correct (timedout)", finalState, 
+        report.state);
+  }
+
+  /**
+   * Polls the job until it reports the expected state or the poll limit is
+   * reached, then asserts that the job's current state is the expected one.
+   *
+   * @param job the job to watch
+   * @param finalState the state to wait for
+   * @throws Exception if the waiting thread is interrupted
+   */
+  public void waitForState(Job job, JobState finalState) throws Exception {
+    int polls = 0;
+    JobReport report = job.getReport();
+    while (!finalState.equals(report.state) &&
+        polls++ < MAX_STATE_POLLS) {
+      System.out.println("Job State is : " + report.state +
+          " Waiting for state : " + finalState +
+          "   map progress : " + report.mapProgress + 
+          "   reduce progress : " + report.reduceProgress);
+      // Refresh after the sleep so the snapshot that is checked is current.
+      Thread.sleep(STATE_POLL_INTERVAL_MS);
+      report = job.getReport();
+    }
+    System.out.println("Job State is : " + report.state);
+    Assert.assertEquals("Job state is not correct (timedout)", finalState, 
+        job.getState());
+  }
+
+  /**
+   * Checks the timestamps of every job, task and attempt in the context:
+   * start must not be after finish, and a job must not finish in the future.
+   * Equal timestamps are accepted because the clock has millisecond
+   * resolution and a mock entity can start and finish within the same tick.
+   */
+  public void verifyCompleted() {
+    for (Job job : getContext().getAllJobs().values()) {
+      JobReport jobReport = job.getReport();
+      Assert.assertTrue("Job start time is later than finish time",
+          jobReport.startTime <= jobReport.finishTime);
+      Assert.assertTrue("Job finish time is in future",
+          jobReport.finishTime <= System.currentTimeMillis());
+      for (Task task : job.getTasks().values()) {
+        TaskReport taskReport = task.getReport();
+        Assert.assertTrue("Task start time is later than finish time",
+            taskReport.startTime <= taskReport.finishTime);
+        for (TaskAttempt attempt : task.getAttempts().values()) {
+          TaskAttemptReport attemptReport = attempt.getReport();
+          Assert.assertTrue("Attempt start time is later than finish time",
+              attemptReport.startTime <= attemptReport.finishTime);
+        }
+      }
+    }
+  }
+
+  /**
+   * Creates the single test job, registers it in the context, wires the
+   * job-history and job-finish handlers and kicks the job off with JOB_INIT.
+   * The job-finish handler stops this application.
+   */
+  protected void startJobs() {
+    Job job = new TestJob(getAppID(), getDispatcher().getEventHandler(),
+        getTaskAttemptListener());
+    ((AppContext) getContext()).getAllJobs().put(job.getID(), job);
+
+    getDispatcher().register(org.apache.hadoop.mapreduce.jobhistory.EventType.class,
+        createJobHistoryHandler(getConfig()));
+    getDispatcher().register(JobFinishEvent.Type.class,
+        new EventHandler<JobFinishEvent>() {
+          @Override
+          public void handle(JobFinishEvent event) {
+            stop();
+          }
+        });
+    
+    /** create a job event for job initialization **/
+    JobEvent initJobEvent = new JobEvent(job.getID(), JobEventType.JOB_INIT);
+    /** send init on the job. this triggers the job execution.**/
+    getDispatcher().getEventHandler().handle(initJobEvent);
+  }
+
+  /** Returns a listener that has no address and ignores (un)registration. */
+  @Override
+  protected TaskAttemptListener createTaskAttemptListener(AppContext context) {
+    return new TaskAttemptListener(){
+      @Override
+      public InetSocketAddress getAddress() {
+        return null;
+      }
+      @Override
+      public void register(TaskAttemptID attemptID, 
+          org.apache.hadoop.mapred.Task task, WrappedJvmID jvmID) {}
+      @Override
+      public void unregister(TaskAttemptID attemptID, WrappedJvmID jvmID) {
+      }
+    };
+  }
+
+  /** Returns the in-process {@link MockContainerLauncher}. */
+  @Override
+  protected ContainerLauncher createContainerLauncher(AppContext context) {
+    return new MockContainerLauncher();
+  }
+
+  /**
+   * Launcher that answers launch and cleanup requests immediately by sending
+   * the corresponding task-attempt event; no container is actually started.
+   */
+  class MockContainerLauncher implements ContainerLauncher {
+    @Override
+    public void handle(ContainerLauncherEvent event) {
+      switch (event.getType()) {
+      case CONTAINER_REMOTE_LAUNCH:
+        getContext().getEventHandler().handle(
+            new TaskAttemptEvent(event.getTaskAttemptID(),
+                TaskAttemptEventType.TA_CONTAINER_LAUNCHED));
+        
+        attemptLaunched(event.getTaskAttemptID());
+        break;
+      case CONTAINER_REMOTE_CLEANUP:
+        getContext().getEventHandler().handle(
+            new TaskAttemptEvent(event.getTaskAttemptID(),
+                TaskAttemptEventType.TA_CONTAINER_CLEANED));
+        break;
+      default:
+        // other launcher event types need no reaction from the mock
+        break;
+      }
+    }
+  }
+
+  /**
+   * Hook invoked after an attempt has been reported as launched. When
+   * {@link #autoComplete} is set the attempt is completed right away.
+   *
+   * @param attemptID the attempt that was just launched
+   */
+  protected void attemptLaunched(TaskAttemptID attemptID) {
+    if (autoComplete) {
+      // send the done event
+      getContext().getEventHandler().handle(
+          new TaskAttemptEvent(attemptID,
+              TaskAttemptEventType.TA_DONE));
+    }
+  }
+
+  /**
+   * Returns an allocator that assigns a fresh container id, with host name
+   * "dummy", for every allocator event it receives.
+   */
+  @Override
+  protected ContainerAllocator createContainerAllocator(
+      ClientService clientService, AppContext context) {
+    return new ContainerAllocator(){
+      private int containerCount;
+      @Override
+      public void handle(ContainerAllocatorEvent event) {
+        ContainerID cId = new ContainerID();
+        cId.appID = getContext().getApplicationID();
+        cId.id = containerCount++;
+        getContext().getEventHandler().handle(
+            new TaskAttemptContainerAssignedEvent(event.getAttemptID(), cId,
+                "dummy", null));
+      }
+    };
+  }
+
+  /** Returns a cleaner that reports TA_CLEANUP_DONE for every request. */
+  @Override
+  protected TaskCleaner createTaskCleaner(AppContext context) {
+    return new TaskCleaner() {
+      @Override
+      public void handle(TaskCleanupEvent event) {
+        //send the cleanup done event
+        getContext().getEventHandler().handle(
+            new TaskAttemptEvent(event.getAttemptID(),
+                TaskAttemptEventType.TA_CLEANUP_DONE));
+      }
+    };
+  }
+
+  /** Returns a client service with no bind address and HTTP port -1. */
+  @Override
+  protected ClientService createClientService(AppContext context) {
+    return new ClientService(){
+      @Override
+      public InetSocketAddress getBindAddress() {
+        return null;
+      }
+
+      @Override
+      public int getHttpPort() {
+        return -1;
+      }
+    };
+  }
+
+  /**
+   * Job whose state machine is built from a factory in which the JOB_INIT
+   * transition is replaced by {@link TestInitTransition}.
+   */
+  class TestJob extends JobImpl {
+    //overwrite the init transition
+    StateMachineFactory<JobImpl, JobState, JobEventType, JobEvent> localFactory
+         = stateMachineFactory.addTransition
+                 (JobState.NEW,
+                  EnumSet.of(JobState.RUNNING, JobState.FAILED),
+                  JobEventType.JOB_INIT,
+                  // This is abusive.
+                  new TestInitTransition(getConfig(), maps, reduces));
+
+    private final StateMachine<JobState, JobEventType, JobEvent>
+           localStateMachine;
+    
+    @Override
+    protected StateMachine<JobState, JobEventType, JobEvent> getStateMachine() {
+      return localStateMachine;
+    }
+
+    public TestJob(ApplicationID appID, EventHandler eventHandler,
+        TaskAttemptListener taskAttemptListener) {
+      super(appID, new Configuration(), eventHandler, taskAttemptListener,
+          new JobTokenSecretManager(), new Credentials());
+
+      // This "this leak" is okay because the retained pointer is in an
+      //  instance variable.
+      localStateMachine = localFactory.make(this);
+    }
+    
+  }
+  
+  //Override InitTransition to not look for split files etc
+  static class TestInitTransition extends JobImpl.InitTransition {
+    private Configuration config;
+    private int maps;
+    private int reduces;
+    TestInitTransition(Configuration config, int maps, int reduces) {
+      this.config = config;
+      this.maps = maps;
+      this.reduces = reduces;
+    }
+    /** Installs the test configuration and a placeholder job conf path. */
+    @Override
+    protected void setup(JobImpl job) throws IOException {
+      job.conf = config;
+      job.conf.setInt(MRJobConfig.NUM_REDUCES, reduces);
+      job.remoteJobConfFile = new Path("test");
+    }
+    /** Creates one empty split descriptor per configured map. */
+    @Override
+    protected TaskSplitMetaInfo[] createSplits(JobImpl job, JobID jobId) {
+      TaskSplitMetaInfo[] splits = new TaskSplitMetaInfo[maps];
+      for (int i = 0; i < maps ; i++) {
+        splits[i] = new TaskSplitMetaInfo();
+      }
+      return splits;
+    }
+  }
+
+}
+ 

Added: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRAppBenchmark.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRAppBenchmark.java?rev=1082677&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRAppBenchmark.java (added)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRAppBenchmark.java Thu Mar 17 20:21:13 2011
@@ -0,0 +1,177 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.mapreduce.v2.app;
+
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.v2.app.AppContext;
+import org.apache.hadoop.mapreduce.v2.app.client.ClientService;
+import org.apache.hadoop.mapreduce.v2.app.job.Job;
+import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptContainerAssignedEvent;
+import org.apache.hadoop.mapreduce.v2.app.rm.ContainerAllocator;
+import org.apache.hadoop.mapreduce.v2.app.rm.ContainerAllocatorEvent;
+import org.apache.hadoop.yarn.YarnException;
+import org.apache.hadoop.yarn.service.AbstractService;
+import org.apache.log4j.Level;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+import org.apache.hadoop.yarn.ContainerID;
+import org.apache.hadoop.mapreduce.v2.api.JobState;
+import org.apache.hadoop.mapreduce.v2.api.TaskAttemptID;
+
+/**
+ * Memory and time benchmark driver for the mock {@link MRApp}.
+ */
+public class MRAppBenchmark {
+
+  /**
+   * Runs memory and time benchmark with Mock MRApp.
+   * Statistics are printed every two seconds until the job succeeds.
+   *
+   * @param app the mock application to submit and observe
+   * @throws IllegalStateException if the job reports the FAILED state, since
+   *         it can then never reach SUCCEEDED and polling would not end
+   * @throws Exception if submission fails or the thread is interrupted
+   */
+  public void run(MRApp app) throws Exception {
+    Logger rootLogger = LogManager.getRootLogger();
+    rootLogger.setLevel(Level.WARN);
+    long startTime = System.currentTimeMillis();
+    Job job = app.submit(new Configuration());
+    JobState state = job.getReport().state;
+    while (!JobState.SUCCEEDED.equals(state)) {
+      printStat(job, startTime);
+      if (JobState.FAILED.equals(state)) {
+        throw new IllegalStateException("Job failed; aborting benchmark");
+      }
+      Thread.sleep(2000);
+      state = job.getReport().state;
+    }
+    printStat(job, startTime);
+  }
+
+  /**
+   * Prints job state, completed task counts, used heap (total minus free,
+   * measured after requesting a GC) and the time elapsed since startTime.
+   */
+  private void printStat(Job job, long startTime) throws Exception {
+    long currentTime = System.currentTimeMillis();
+    Runtime.getRuntime().gc();
+    long mem = Runtime.getRuntime().totalMemory() 
+      - Runtime.getRuntime().freeMemory();
+    System.out.println("JobState:" + job.getState() +
+        " CompletedMaps:" + job.getCompletedMaps() +
+        " CompletedReduces:" + job.getCompletedReduces() +
+        " Memory(total-free)(KB):" + mem/1024 +
+        " ElapsedTime(ms):" + (currentTime - startTime));
+  }
+
+  //Throttles the maximum number of concurrent running tasks.
+  //This affects the memory requirement since 
+  //org.apache.hadoop.mapred.MapTask/ReduceTask is loaded in memory for all
+  //running task and discarded once the task is launched.
+  static class ThrottledMRApp extends MRApp {
+
+    int maxConcurrentRunningTasks;
+    volatile int concurrentRunningTasks;
+
+    // Guards writes to concurrentRunningTasks. The counter is incremented by
+    // the allocator thread and decremented from attemptLaunched(); volatile
+    // alone does not make ++/-- atomic, so updates could otherwise be lost.
+    private final Object runningTasksLock = new Object();
+
+    ThrottledMRApp(int maps, int reduces, int maxConcurrentRunningTasks) {
+      super(maps, reduces, true);
+      this.maxConcurrentRunningTasks = maxConcurrentRunningTasks;
+    }
+
+    /** Atomically adds delta to the number of concurrently running tasks. */
+    private void adjustRunningTasks(int delta) {
+      synchronized (runningTasksLock) {
+        concurrentRunningTasks += delta;
+      }
+    }
+    
+    @Override
+    protected void attemptLaunched(TaskAttemptID attemptID) {
+      super.attemptLaunched(attemptID);
+      //the task is launched and sends done immediately
+      adjustRunningTasks(-1);
+    }
+    
+    @Override
+    protected ContainerAllocator createContainerAllocator(
+        ClientService clientService, AppContext context) {
+      return new ThrottledContainerAllocator();
+    }
+    
+    /**
+     * Allocator that queues requests and serves them from its own thread,
+     * handing out a container only while fewer than
+     * maxConcurrentRunningTasks tasks are counted as running.
+     */
+    class ThrottledContainerAllocator extends AbstractService 
+        implements ContainerAllocator {
+      private int containerCount;
+      private Thread thread;
+      private BlockingQueue<ContainerAllocatorEvent> eventQueue =
+        new LinkedBlockingQueue<ContainerAllocatorEvent>();
+      public ThrottledContainerAllocator() {
+        super("ThrottledContainerAllocator");
+      }
+      @Override
+      public void handle(ContainerAllocatorEvent event) {
+        try {
+          eventQueue.put(event);
+        } catch (InterruptedException e) {
+          // keep the interrupt visible to the caller's thread
+          Thread.currentThread().interrupt();
+          throw new YarnException(e);
+        }
+      }
+      @Override
+      public void start() {
+        thread = new Thread(new Runnable() {
+          @Override
+          public void run() {
+            ContainerAllocatorEvent event = null;
+            while (!Thread.currentThread().isInterrupted()) {
+              try {
+                if (concurrentRunningTasks < maxConcurrentRunningTasks) {
+                  event = eventQueue.take();
+                  ContainerID cId = new ContainerID();
+                  cId.appID = getContext().getApplicationID();
+                  cId.id = containerCount++;
+                  // Count the task before announcing the container so the
+                  // matching decrement can never be applied first.
+                  adjustRunningTasks(1);
+                  getContext().getEventHandler().handle(
+                      new TaskAttemptContainerAssignedEvent(event
+                          .getAttemptID(), cId, "dummy", null));
+                } else {
+                  Thread.sleep(1000);
+                }
+              } catch (InterruptedException e) {
+                System.out.println("Returning, interrupted");
+                return;
+              }
+            }
+          }
+        });
+        thread.start();
+        super.start();
+      }
+
+      @Override
+      public void stop() {
+        // stop() may be called on a service that was never started
+        if (thread != null) {
+          thread.interrupt();
+        }
+        super.stop();
+      }
+    }
+  }
+
+  /** Benchmark with 900 maps and 100 reduces on the plain mock app. */
+  public void benchmark1() throws Exception {
+    int maps = 900;
+    int reduces = 100;
+    System.out.println("Running benchmark with maps:"+maps +
+        " reduces:"+reduces);
+    run(new MRApp(maps, reduces, true));
+  }
+
+  /**
+   * Benchmark with 4000 maps and 1000 reduces, throttled to at most 500
+   * concurrently running tasks.
+   */
+  public void benchmark2() throws Exception {
+    int maps = 4000;
+    int reduces = 1000;
+    int maxConcurrentRunningTasks = 500;
+    
+    System.out.println("Running benchmark with throttled running tasks with " +
+        "maxConcurrentRunningTasks:" + maxConcurrentRunningTasks +
+        " maps:" + maps + " reduces:" + reduces);
+    run(new ThrottledMRApp(maps, reduces, maxConcurrentRunningTasks));
+  }
+
+  /** Runs both benchmarks in sequence. */
+  public static void main(String[] args) throws Exception {
+    MRAppBenchmark benchmark = new MRAppBenchmark();
+    benchmark.benchmark1();
+    benchmark.benchmark2();
+  }
+
+}

Added: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MockJobs.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MockJobs.java?rev=1082677&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MockJobs.java (added)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MockJobs.java Thu Mar 17 20:21:13 2011
@@ -0,0 +1,438 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.mapreduce.v2.app;
+
+import com.google.common.collect.Iterators;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import org.apache.hadoop.mapreduce.FileSystemCounter;
+import org.apache.hadoop.mapreduce.JobCounter;
+import org.apache.hadoop.mapreduce.TaskCounter;
+import org.apache.hadoop.mapreduce.v2.app.job.Job;
+import org.apache.hadoop.mapreduce.v2.app.job.Task;
+import org.apache.hadoop.mapreduce.v2.app.job.TaskAttempt;
+import org.apache.hadoop.mapreduce.v2.app.job.impl.JobImpl;
+import org.apache.hadoop.mapreduce.v2.lib.TypeConverter;
+import org.apache.hadoop.yarn.MockApps;
+import org.apache.hadoop.yarn.ApplicationID;
+import org.apache.hadoop.yarn.ContainerID;
+import org.apache.hadoop.mapreduce.v2.api.Counters;
+import org.apache.hadoop.mapreduce.v2.api.JobID;
+import org.apache.hadoop.mapreduce.v2.api.JobReport;
+import org.apache.hadoop.mapreduce.v2.api.JobState;
+import org.apache.hadoop.mapreduce.v2.api.Phase;
+import org.apache.hadoop.mapreduce.v2.api.TaskAttemptReport;
+import org.apache.hadoop.mapreduce.v2.api.TaskAttemptCompletionEvent;
+import org.apache.hadoop.mapreduce.v2.api.TaskID;
+import org.apache.hadoop.mapreduce.v2.api.TaskState;
+import org.apache.hadoop.mapreduce.v2.api.TaskAttemptID;
+import org.apache.hadoop.mapreduce.v2.api.TaskAttemptState;
+import org.apache.hadoop.mapreduce.v2.api.TaskReport;
+import org.apache.hadoop.mapreduce.v2.api.TaskType;
+
+public class MockJobs extends MockApps {
+  // Shared sources of mock values. Each iterator is built with
+  // Iterators.cycle, so next() walks its values round-robin; because the
+  // fields are static, every factory method of this class advances the same
+  // sequences.
+  // NOTE(review): no synchronization is visible here -- presumably these are
+  // meant for single-threaded test setup only; confirm before sharing.
+  static final Iterator<JobState> JOB_STATES = Iterators.cycle(
+      JobState.values());
+  static final Iterator<TaskState> TASK_STATES = Iterators.cycle(
+      TaskState.values());
+  static final Iterator<TaskAttemptState> TASK_ATTEMPT_STATES = Iterators.cycle(
+      TaskAttemptState.values());
+  static final Iterator<TaskType> TASK_TYPES = Iterators.cycle(
+      TaskType.values());
+  static final Iterator<JobCounter> JOB_COUNTERS = Iterators.cycle(
+      JobCounter.values());
+  static final Iterator<FileSystemCounter> FS_COUNTERS = Iterators.cycle(
+      FileSystemCounter.values());
+  static final Iterator<TaskCounter> TASK_COUNTERS = Iterators.cycle(
+      TaskCounter.values());
+  // File system scheme names used as the first argument of findCounter.
+  static final Iterator<String> FS_SCHEMES = Iterators.cycle("FILE", "HDFS",
+      "LAFS", "CEPH");
+  // Group and counter names for the user-defined mock counters.
+  static final Iterator<String> USER_COUNTER_GROUPS = Iterators.cycle(
+      "com.company.project.subproject.component.subcomponent.UserDefinedSpecificSpecialTask$Counters",
+      "PigCounters");
+  static final Iterator<String> USER_COUNTERS = Iterators.cycle(
+      "counter1", "counter2", "counter3");
+  static final Iterator<Phase> PHASES = Iterators.cycle(Phase.values());
+  // Diagnostic messages attached to mock task attempts.
+  static final Iterator<String> DIAGS = Iterators.cycle(
+      "Error: java.lang.OutOfMemoryError: Java heap space",
+      "Lost task tracker: tasktracker.domain/127.0.0.1:40879");
+
+  /**
+   * Produces a name for a mock job by delegating to {@code newAppName()}.
+   *
+   * @return the generated name
+   */
+  public static String newJobName() {
+    final String jobName = newAppName();
+    return jobName;
+  }
+
+  /**
+   * Builds a map of mock jobs keyed by job id.
+   *
+   * @param appID application the jobs belong to
+   * @param numJobsPerApp number of jobs to create; ids run from 0 upwards
+   * @param numTasksPerJob number of tasks created for each job
+   * @param numAttemptsPerTask number of attempts created for each task
+   * @return a new map holding the created jobs
+   */
+  public static Map<JobID, Job> newJobs(ApplicationID appID, int numJobsPerApp,
+                                        int numTasksPerJob,
+                                        int numAttemptsPerTask) {
+    final Map<JobID, Job> jobsById = Maps.newHashMap();
+    int index = 0;
+    while (index < numJobsPerApp) {
+      final Job mockJob =
+          newJob(appID, index, numTasksPerJob, numAttemptsPerTask);
+      jobsById.put(mockJob.getID(), mockJob);
+      index++;
+    }
+    return jobsById;
+  }
+
+  /**
+   * Creates a job id from an application id and a job number.
+   *
+   * @param appID value stored in the id's {@code appID} field
+   * @param i value stored in the id's {@code id} field
+   * @return the populated job id
+   */
+  public static JobID newJobID(ApplicationID appID, int i) {
+    final JobID jobId = new JobID();
+    jobId.id = i;
+    jobId.appID = appID;
+    return jobId;
+  }
+
+  /**
+   * Creates a job report filled with mock values: a start time up to about
+   * 1,000,000 ms before now, a finish time at least 1 ms after now, random
+   * map/reduce progress and the next state from {@code JOB_STATES}.
+   *
+   * @param id id stored in the report
+   * @return the populated report
+   */
+  public static JobReport newJobReport(JobID id) {
+    final JobReport jobReport = new JobReport();
+    jobReport.id = id;
+
+    final long startBase = System.currentTimeMillis();
+    jobReport.startTime = startBase - (int)(Math.random() * 1000000);
+
+    final long finishBase = System.currentTimeMillis();
+    final int finishOffset = (int)(Math.random() * 1000000);
+    jobReport.finishTime = finishBase + finishOffset + 1;
+
+    jobReport.mapProgress = (float)Math.random();
+    jobReport.reduceProgress = (float)Math.random();
+    jobReport.state = JOB_STATES.next();
+    return jobReport;
+  }
+
+  /**
+   * Creates a task report filled with mock values: a start time in the past,
+   * a finish time at least 1 ms after now, random progress, fresh mock
+   * counters and the next state from {@code TASK_STATES}.
+   *
+   * @param id id stored in the report
+   * @return the populated report
+   */
+  public static TaskReport newTaskReport(TaskID id) {
+    final TaskReport taskReport = new TaskReport();
+    taskReport.id = id;
+
+    final long startBase = System.currentTimeMillis();
+    taskReport.startTime = startBase - (int)(Math.random() * 1000000);
+
+    final long finishBase = System.currentTimeMillis();
+    final int finishOffset = (int)(Math.random() * 1000000);
+    taskReport.finishTime = finishBase + finishOffset + 1;
+
+    taskReport.progress = (float)Math.random();
+    taskReport.counters = newCounters();
+    taskReport.state = TASK_STATES.next();
+    return taskReport;
+  }
+
+  /**
+   * Creates a task-attempt report filled with mock values: a start time in
+   * the past, a finish time at least 1 ms after now, the next phase and
+   * state from the shared cycles, random progress and fresh mock counters.
+   *
+   * @param id id stored in the report
+   * @return the populated report
+   */
+  public static TaskAttemptReport newTaskAttemptReport(TaskAttemptID id) {
+    final TaskAttemptReport attemptReport = new TaskAttemptReport();
+    attemptReport.id = id;
+
+    final long startBase = System.currentTimeMillis();
+    attemptReport.startTime = startBase - (int)(Math.random() * 1000000);
+
+    final long finishBase = System.currentTimeMillis();
+    final int finishOffset = (int)(Math.random() * 1000000);
+    attemptReport.finishTime = finishBase + finishOffset + 1;
+
+    attemptReport.phase = PHASES.next();
+    attemptReport.state = TASK_ATTEMPT_STATES.next();
+    attemptReport.progress = (float)Math.random();
+    attemptReport.counters = newCounters();
+    return attemptReport;
+  }
+
+  /**
+   * Builds a counters object with random values for every job counter,
+   * every task counter, file system counters under the cycling scheme names
+   * and six user counters, then converts it with
+   * {@code TypeConverter.toYarn}.
+   *
+   * @return the converted mock counters
+   */
+  @SuppressWarnings("deprecation")
+  public static Counters newCounters() {
+    final org.apache.hadoop.mapred.Counters mockCounters =
+        new org.apache.hadoop.mapred.Counters();
+
+    for (JobCounter jobCounter : JobCounter.values()) {
+      mockCounters.findCounter(jobCounter)
+          .setValue((long)(Math.random() * 1000));
+    }
+
+    for (TaskCounter taskCounter : TaskCounter.values()) {
+      mockCounters.findCounter(taskCounter)
+          .setValue((long)(Math.random() * 1000));
+    }
+
+    // Every pass assigns each file system counter under the next scheme name.
+    final int passes = FileSystemCounter.values().length * 4;
+    int pass = 0;
+    while (pass < passes) {
+      for (FileSystemCounter fsCounter : FileSystemCounter.values()) {
+        mockCounters.findCounter(FS_SCHEMES.next(), fsCounter)
+            .setValue((long)(Math.random() * 1000000));
+      }
+      ++pass;
+    }
+
+    final int userCounterCount = 2 * 3;
+    for (int remaining = userCounterCount; remaining > 0; --remaining) {
+      mockCounters
+          .findCounter(USER_COUNTER_GROUPS.next(), USER_COUNTERS.next())
+          .setValue((long)(Math.random() * 100000));
+    }
+
+    return TypeConverter.toYarn(mockCounters);
+  }
+
+  /**
+   * Builds a map of mock attempts for one task, keyed by attempt id.
+   *
+   * @param tid task the attempts belong to
+   * @param m number of attempts to create; ids run from 0 upwards
+   * @return a new map holding the created attempts
+   */
+  public static Map<TaskAttemptID, TaskAttempt> newTaskAttempts(TaskID tid,
+                                                                int m) {
+    final Map<TaskAttemptID, TaskAttempt> attemptsById = Maps.newHashMap();
+    int index = 0;
+    while (index < m) {
+      final TaskAttempt attempt = newTaskAttempt(tid, index);
+      attemptsById.put(attempt.getID(), attempt);
+      index++;
+    }
+    return attemptsById;
+  }
+
+  /**
+   * Creates a mock task attempt backed by a freshly generated report and a
+   * single diagnostic message taken from {@code DIAGS}.
+   *
+   * @param tid task the attempt belongs to
+   * @param i attempt number stored in the attempt id
+   * @return an attempt whose getters read from the generated report
+   */
+  public static TaskAttempt newTaskAttempt(TaskID tid, int i) {
+    final TaskAttemptID attemptId = new TaskAttemptID();
+    attemptId.taskID = tid;
+    attemptId.id = i;
+
+    final TaskAttemptReport attemptReport = newTaskAttemptReport(attemptId);
+
+    final List<CharSequence> diagnostics = Lists.newArrayList();
+    diagnostics.add(DIAGS.next());
+
+    return new TaskAttempt() {
+      @Override
+      public TaskAttemptID getID() {
+        return attemptId;
+      }
+
+      @Override
+      public TaskAttemptReport getReport() {
+        return attemptReport;
+      }
+
+      // Launch and finish times are not mocked; both are always 0.
+      @Override
+      public long getLaunchTime() {
+        return 0;
+      }
+
+      @Override
+      public long getFinishTime() {
+        return 0;
+      }
+
+      @Override
+      public Counters getCounters() {
+        return attemptReport.counters;
+      }
+
+      @Override
+      public float getProgress() {
+        return attemptReport.progress;
+      }
+
+      @Override
+      public TaskAttemptState getState() {
+        return attemptReport.state;
+      }
+
+      // Finished means the report is in SUCCEEDED, FAILED or KILLED.
+      @Override
+      public boolean isFinished() {
+        switch (attemptReport.state) {
+          case SUCCEEDED:
+          case FAILED:
+          case KILLED:
+            return true;
+          default:
+            return false;
+        }
+      }
+
+      // A new container id is built on every call; only appID is set.
+      @Override
+      public ContainerID getAssignedContainerID() {
+        final ContainerID containerId = new ContainerID();
+        containerId.appID = attemptId.taskID.jobID.appID;
+        return containerId;
+      }
+
+      @Override
+      public String getAssignedContainerMgrAddress() {
+        return "localhost";
+      }
+
+      @Override
+      public List<CharSequence> getDiagnostics() {
+        return diagnostics;
+      }
+    };
+  }
+
+  public static Map<TaskID, Task> newTasks(JobID jid, int n, int m) {
+    Map<TaskID, Task> map = Maps.newHashMap();
+    for (int i = 0; i < n; ++i) {
+      Task task = newTask(jid, i, m);
+      map.put(task.getID(), task);
+    }
+    return map;
+  }
+
+  /**
+   * Creates a mock task with the next type from {@code TASK_TYPES}, a
+   * freshly generated report and {@code m} mock attempts.
+   *
+   * @param jid job the task belongs to
+   * @param i task number stored in the task id
+   * @param m number of attempts to create for the task
+   * @return a task whose getters read from the generated report and attempts
+   */
+  public static Task newTask(JobID jid, int i, int m) {
+    final TaskID taskId = new TaskID();
+    taskId.jobID = jid;
+    taskId.id = i;
+    taskId.taskType = TASK_TYPES.next();
+
+    final TaskReport taskReport = newTaskReport(taskId);
+    final Map<TaskAttemptID, TaskAttempt> attemptsById =
+        newTaskAttempts(taskId, m);
+
+    return new Task() {
+      @Override
+      public TaskID getID() {
+        return taskId;
+      }
+
+      @Override
+      public TaskReport getReport() {
+        return taskReport;
+      }
+
+      @Override
+      public Counters getCounters() {
+        return taskReport.counters;
+      }
+
+      @Override
+      public float getProgress() {
+        return taskReport.progress;
+      }
+
+      @Override
+      public TaskType getType() {
+        return taskId.taskType;
+      }
+
+      @Override
+      public Map<TaskAttemptID, TaskAttempt> getAttempts() {
+        return attemptsById;
+      }
+
+      @Override
+      public TaskAttempt getAttempt(TaskAttemptID attemptID) {
+        return attemptsById.get(attemptID);
+      }
+
+      // Finished means the report is in SUCCEEDED, KILLED or FAILED.
+      @Override
+      public boolean isFinished() {
+        switch (taskReport.state) {
+          case SUCCEEDED:
+          case KILLED:
+          case FAILED:
+            return true;
+          default:
+            return false;
+        }
+      }
+
+      // The mock never grants a commit.
+      @Override
+      public boolean canCommit(TaskAttemptID taskAttemptID) {
+        return false;
+      }
+
+      @Override
+      public TaskState getState() {
+        return taskReport.state;
+      }
+    };
+  }
+
+  /**
+   * Starts from {@code JobImpl.newCounters()} and returns whatever
+   * {@code JobImpl.incrTaskCounters} yields for the given tasks.
+   */
+  public static Counters getCounters(Collection<Task> tasks) {
+    return JobImpl.incrTaskCounters(JobImpl.newCounters(), tasks);
+  }
+
+  /** Running tally of map and reduce tasks, both in total and finished. */
+  static class TaskCount {
+    int maps;
+    int reduces;
+    int completedMaps;
+    int completedReduces;
+
+    /**
+     * Adds one task to the tally. Tasks whose type is neither MAP nor
+     * REDUCE leave all counts untouched.
+     */
+    void incr(Task task) {
+      final TaskType kind = task.getType();
+      final boolean done = task.isFinished();
+      if (kind == TaskType.MAP) {
+        maps++;
+        if (done) {
+          completedMaps++;
+        }
+        return;
+      }
+      if (kind == TaskType.REDUCE) {
+        reduces++;
+        if (done) {
+          completedReduces++;
+        }
+      }
+    }
+  }
+
+  static TaskCount getTaskCount(Collection<Task> tasks) {
+    TaskCount tc = new TaskCount();
+    for (Task task : tasks) {
+      tc.incr(task);
+    }
+    return tc;
+  }
+
+  /**
+   * Creates a {@link Job} stub whose id, name, report, tasks, task counts
+   * and counters are all computed once, up front, and then served as-is.
+   *
+   * @param appID the application the job id is derived from
+   * @param i     passed to {@code newJobID} together with {@code appID}
+   * @param n     number of tasks, passed to {@code newTasks}
+   * @param m     passed through to {@code newTasks}
+   * @return the job stub; {@code getTasks(TaskType)} and
+   *         {@code getDiagnostics()} are not supported and throw
+   *         {@link UnsupportedOperationException}
+   */
+  public static Job newJob(ApplicationID appID, int i, int n, int m) {
+    final JobID id = newJobID(appID, i);
+    final String name = newJobName();
+    final JobReport report = newJobReport(id);
+    final Map<TaskID, Task> tasks = newTasks(id, n, m);
+    final TaskCount taskCount = getTaskCount(tasks.values());
+    final Counters counters = getCounters(tasks.values());
+    return new Job() {
+      @Override
+      public JobID getID() {
+        return id;
+      }
+
+      @Override
+      public CharSequence getName() {
+        return name;
+      }
+
+      @Override
+      public JobState getState() {
+        return report.state;
+      }
+
+      @Override
+      public JobReport getReport() {
+        return report;
+      }
+
+      @Override
+      public Counters getCounters() {
+        return counters;
+      }
+
+      @Override
+      public Map<TaskID, Task> getTasks() {
+        return tasks;
+      }
+
+      @Override
+      public Task getTask(TaskID taskID) {
+        return tasks.get(taskID);
+      }
+
+      @Override
+      public int getTotalMaps() {
+        return taskCount.maps;
+      }
+
+      @Override
+      public int getTotalReduces() {
+        return taskCount.reduces;
+      }
+
+      @Override
+      public int getCompletedMaps() {
+        return taskCount.completedMaps;
+      }
+
+      @Override
+      public int getCompletedReduces() {
+        return taskCount.completedReduces;
+      }
+
+      @Override
+      public TaskAttemptCompletionEvent[] getTaskAttemptCompletionEvents(int fromEventId,
+                                                           int maxEvents) {
+        // This stub records no completion events. Hand back an empty array
+        // rather than null so callers can read .length or iterate safely.
+        return new TaskAttemptCompletionEvent[0];
+      }
+
+      @Override
+      public Map<TaskID, Task> getTasks(TaskType taskType) {
+        throw new UnsupportedOperationException("Not supported yet.");
+      }
+
+      @Override
+      public List<String> getDiagnostics() {
+        throw new UnsupportedOperationException("Not supported yet.");
+      }
+    };
+  }
+}

Added: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/SpeculationSleepJob.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/SpeculationSleepJob.java?rev=1082677&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/SpeculationSleepJob.java (added)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/SpeculationSleepJob.java Thu Mar 17 20:21:13 2011
@@ -0,0 +1,90 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.mapreduce.v2.app;
+
+import java.io.IOException;
+
+import org.apache.hadoop.SleepJob;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Mapper;
+
+/**
+ * A {@link SleepJob} whose map phase uses {@link SpeculationSleepMapper},
+ * a mapper that sleeps twenty times longer for any attempt whose id ends in
+ * {@code "0000_0"}.
+ */
+public class SpeculationSleepJob extends SleepJob {
+  /**
+   * Creates the job exactly as the superclass does and then swaps in
+   * {@link SpeculationSleepMapper} as the mapper class.
+   */
+  public Job createJob(int numMapper, int numReducer,
+                       long mapSleepTime, int mapSleepCount,
+                       long reduceSleepTime, int reduceSleepCount)
+      throws IOException {
+    Job result = super.createJob
+        (numMapper, numReducer, mapSleepTime,
+         mapSleepCount, reduceSleepTime, reduceSleepCount);
+
+    result.setMapperClass(SpeculationSleepMapper.class);
+
+    return result;
+  }
+
+  // This is a new class rather than a subclass of SleepJob.SleepMapper
+  //  because SleepMapper has private [rather than protected] fields.
+  // It is a static nested class so that it has a no-argument constructor:
+  //  a non-static inner class would need an enclosing SpeculationSleepJob
+  //  instance and could not be created from the class object alone.
+  public static class SpeculationSleepMapper
+      extends Mapper<IntWritable, IntWritable, IntWritable, NullWritable> {
+    /** Configuration key this mapper reads its own attempt id from. */
+    private static final String TASK_ATTEMPT_ID = "mapreduce.task.attempt.id";
+
+    private long mapSleepDuration = 100;
+    private int mapSleepCount = 1;
+    private int count = 0;
+
+    /**
+     * Reads the sleep count and total sleep time from the configuration and
+     * derives the per-record sleep duration. The duration is multiplied by
+     * twenty when the attempt id ends in {@code "0000_0"}.
+     */
+    @Override
+    protected void setup(Context context)
+      throws IOException, InterruptedException {
+      Configuration conf = context.getConfiguration();
+      this.mapSleepCount =
+        conf.getInt(MAP_SLEEP_COUNT, mapSleepCount);
+      this.mapSleepDuration =
+        conf.getLong(MAP_SLEEP_TIME , 100) / mapSleepCount;
+      final String attemptID = conf.get(TASK_ATTEMPT_ID);
+      // the property may be absent; in that case keep the normal duration
+      if (attemptID != null && attemptID.endsWith("0000_0")) {
+        mapSleepDuration *= 20L;
+      }
+    }
+
+    /**
+     * Sleeps for the per-record duration, then writes {@code value.get()}
+     * keys starting at {@code key.get()}, each paired with a NullWritable.
+     */
+    @Override
+    public void map(IntWritable key, IntWritable value, Context context
+               ) throws IOException, InterruptedException {
+      //it is expected that every map processes mapSleepCount number of records.
+      try {
+        context.setStatus("Sleeping... (" +
+          (mapSleepDuration * (mapSleepCount - count)) + ") ms left");
+        Thread.sleep(mapSleepDuration);
+      }
+      catch (InterruptedException ex) {
+        throw new IOException("Interrupted while sleeping", ex);
+      }
+      ++count;
+      // output reduceSleepCount * numReduce number of random values, so that
+      // each reducer will get reduceSleepCount number of keys.
+      int k = key.get();
+      for (int i = 0; i < value.get(); ++i) {
+        context.write(new IntWritable(k + i), NullWritable.get());
+      }
+    }
+  }
+}

Added: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestFail.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestFail.java?rev=1082677&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestFail.java (added)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestFail.java Thu Mar 17 20:21:13 2011
@@ -0,0 +1,235 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.mapreduce.v2.app;
+
+import java.util.Iterator;
+import java.util.Map;
+
+import junit.framework.Assert;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapred.TaskAttemptListenerImpl;
+import org.apache.hadoop.mapreduce.MRJobConfig;
+import org.apache.hadoop.mapreduce.v2.app.AppContext;
+import org.apache.hadoop.mapreduce.v2.app.TaskAttemptListener;
+import org.apache.hadoop.mapreduce.v2.app.job.Job;
+import org.apache.hadoop.mapreduce.v2.app.job.Task;
+import org.apache.hadoop.mapreduce.v2.app.job.TaskAttempt;
+import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent;
+import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType;
+import org.apache.hadoop.mapreduce.v2.api.JobState;
+import org.apache.hadoop.mapreduce.v2.api.TaskAttemptID;
+import org.apache.hadoop.mapreduce.v2.api.TaskAttemptState;
+import org.apache.hadoop.mapreduce.v2.api.TaskID;
+import org.apache.hadoop.mapreduce.v2.api.TaskState;
+import org.junit.Test;
+
+/**
+ * Tests the state machine with respect to Job/Task/TaskAttempt failure 
+ * scenarios.
+ */
+public class TestFail {
+
+  /**
+   * The first attempt of the only task is failed and the second attempt
+   * succeeds, so the job as a whole succeeds.
+   */
+  @Test
+  public void testFailTask() throws Exception {
+    MRApp app = new MockFirstFailingAttemptMRApp(1, 0);
+    Job job = app.submit(new Configuration());
+    app.waitForState(job, JobState.SUCCEEDED);
+    Map<TaskID,Task> tasks = job.getTasks();
+    Assert.assertEquals("No of tasks is not correct", 1,
+        tasks.size());
+    Task task = tasks.values().iterator().next();
+    Assert.assertEquals("Task state not correct", TaskState.SUCCEEDED,
+        task.getReport().state);
+    Map<TaskAttemptID, TaskAttempt> attempts = task.getAttempts();
+    Assert.assertEquals("No of attempts is not correct", 2,
+        attempts.size());
+    //one attempt must be failed
+    //and another must have succeeded
+    //NOTE(review): assumes getAttempts() iterates in launch order - confirm
+    Iterator<TaskAttempt> it = attempts.values().iterator();
+    Assert.assertEquals("Attempt state not correct", TaskAttemptState.FAILED,
+          it.next().getReport().state);
+    Assert.assertEquals("Attempt state not correct", TaskAttemptState.SUCCEEDED,
+        it.next().getReport().state);
+  }
+
+  /**
+   * With four maps of which the first always fails, a 20% map failure
+   * allowance fails the job while a 25% allowance lets it succeed.
+   */
+  @Test
+  public void testMapFailureMaxPercent() throws Exception {
+    MRApp app = new MockFirstFailingTaskMRApp(4, 0);
+    Configuration conf = newSingleAttemptConf();
+    conf.setInt(MRJobConfig.MAP_FAILURES_MAX_PERCENT, 20);
+    Job job = app.submit(conf);
+    app.waitForState(job, JobState.FAILED);
+
+    //setting the failure percentage to 25% (1/4 is 25) will
+    //make the Job successful
+    app = new MockFirstFailingTaskMRApp(4, 0);
+    conf = newSingleAttemptConf();
+    conf.setInt(MRJobConfig.MAP_FAILURES_MAX_PERCENT, 25);
+    job = app.submit(conf);
+    app.waitForState(job, JobState.SUCCEEDED);
+  }
+
+  /**
+   * With two maps and four reduces of which the first of each always fails,
+   * a 20% reduce failure allowance fails the job while a 25% allowance lets
+   * it succeed. The map allowance is 50% in both runs.
+   */
+  @Test
+  public void testReduceFailureMaxPercent() throws Exception {
+    MRApp app = new MockFirstFailingTaskMRApp(2, 4);
+    Configuration conf = newSingleAttemptConf();
+    conf.setInt(MRJobConfig.MAP_FAILURES_MAX_PERCENT, 50);//no failure due to Map
+    conf.setInt(MRJobConfig.REDUCE_FAILURES_MAXPERCENT, 20);
+    Job job = app.submit(conf);
+    app.waitForState(job, JobState.FAILED);
+
+    //setting the failure percentage to 25% (1/4 is 25) will
+    //make the Job successful
+    app = new MockFirstFailingTaskMRApp(2, 4);
+    conf = newSingleAttemptConf();
+    conf.setInt(MRJobConfig.MAP_FAILURES_MAX_PERCENT, 50);//no failure due to Map
+    conf.setInt(MRJobConfig.REDUCE_FAILURES_MAXPERCENT, 25);
+    job = app.submit(conf);
+    app.waitForState(job, JobState.SUCCEEDED);
+  }
+
+  /**
+   * Builds a fresh configuration that allows a single attempt per map and
+   * per reduce task, so that a failing task fails quickly.
+   */
+  private static Configuration newSingleAttemptConf() {
+    Configuration conf = new Configuration();
+    conf.setInt(MRJobConfig.MAP_MAX_ATTEMPTS, 1);
+    conf.setInt(MRJobConfig.REDUCE_MAX_ATTEMPTS, 1);
+    return conf;
+  }
+
+  /** All task attempts are timed out, leading to job failure. */
+  @Test
+  public void testTimedOutTask() throws Exception {
+    MRApp app = new TimeOutTaskMRApp(1, 0);
+    Configuration conf = new Configuration();
+    int maxAttempts = 2;
+    conf.setInt(MRJobConfig.MAP_MAX_ATTEMPTS, maxAttempts);
+    Job job = app.submit(conf);
+    app.waitForState(job, JobState.FAILED);
+    Map<TaskID,Task> tasks = job.getTasks();
+    Assert.assertEquals("No of tasks is not correct", 1,
+        tasks.size());
+    Task task = tasks.values().iterator().next();
+    Assert.assertEquals("Task state not correct", TaskState.FAILED,
+        task.getReport().state);
+    Map<TaskAttemptID, TaskAttempt> attempts = task.getAttempts();
+    Assert.assertEquals("No of attempts is not correct", maxAttempts,
+        attempts.size());
+    for (TaskAttempt attempt : attempts.values()) {
+      Assert.assertEquals("Attempt state not correct", TaskAttemptState.FAILED,
+          attempt.getReport().state);
+    }
+  }
+
+  /** MRApp whose task attempt listener uses a one second task timeout. */
+  static class TimeOutTaskMRApp extends MRApp {
+    TimeOutTaskMRApp(int maps, int reduces) {
+      super(maps, reduces, false);
+    }
+    @Override
+    protected TaskAttemptListener createTaskAttemptListener(AppContext context) {
+      //This will create the TaskAttemptListener with TaskHeartbeatHandler
+      //RPC servers are not started
+      //task time out is reduced
+      //when attempt times out, heartbeat handler will send the lost event
+      //leading to Attempt failure
+      return new TaskAttemptListenerImpl(getContext(), null) {
+        public void startRpcServer(){}
+        public void stopRpcServer(){}
+        public void init(Configuration conf) {
+          conf.setInt("mapreduce.task.timeout", 1*1000);//reduce timeout
+          super.init(conf);
+        }
+      };
+    }
+  }
+
+  /** MRApp in which every attempt of a task with id 0 is failed. */
+  static class MockFirstFailingTaskMRApp extends MRApp {
+
+    MockFirstFailingTaskMRApp(int maps, int reduces) {
+      super(maps, reduces, true);
+    }
+
+    @Override
+    protected void attemptLaunched(TaskAttemptID attemptID) {
+      if (attemptID.taskID.id == 0) {//check if it is first task
+        // send the Fail event
+        getContext().getEventHandler().handle(
+            new TaskAttemptEvent(attemptID,
+                TaskAttemptEventType.TA_FAILMSG));
+      } else {
+        getContext().getEventHandler().handle(
+            new TaskAttemptEvent(attemptID,
+                TaskAttemptEventType.TA_DONE));
+      }
+    }
+  }
+
+  /** MRApp in which only attempt 0 of a task with id 0 is failed. */
+  static class MockFirstFailingAttemptMRApp extends MRApp {
+    MockFirstFailingAttemptMRApp(int maps, int reduces) {
+      super(maps, reduces, true);
+    }
+
+    @Override
+    protected void attemptLaunched(TaskAttemptID attemptID) {
+      if (attemptID.taskID.id == 0 && attemptID.id == 0) {
+        //check if it is first task's first attempt
+        // send the Fail event
+        getContext().getEventHandler().handle(
+            new TaskAttemptEvent(attemptID,
+                TaskAttemptEventType.TA_FAILMSG));
+      } else {
+        getContext().getEventHandler().handle(
+            new TaskAttemptEvent(attemptID,
+                TaskAttemptEventType.TA_DONE));
+      }
+    }
+  }
+
+  /** Runs every test in this class in sequence, without a test runner. */
+  public static void main(String[] args) throws Exception {
+    TestFail t = new TestFail();
+    t.testFailTask();
+    t.testTimedOutTask();
+    t.testMapFailureMaxPercent();
+    t.testReduceFailureMaxPercent();
+  }
+}

Added: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestFetchFailure.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestFetchFailure.java?rev=1082677&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestFetchFailure.java (added)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestFetchFailure.java Thu Mar 17 20:21:13 2011
@@ -0,0 +1,149 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.mapreduce.v2.app;
+
+import java.util.Arrays;
+import java.util.Iterator;
+
+import junit.framework.Assert;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.v2.app.job.Job;
+import org.apache.hadoop.mapreduce.v2.app.job.Task;
+import org.apache.hadoop.mapreduce.v2.app.job.TaskAttempt;
+import org.apache.hadoop.mapreduce.v2.app.job.event.JobTaskAttemptFetchFailureEvent;
+import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent;
+import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType;
+import org.apache.hadoop.mapreduce.v2.api.JobState;
+import org.apache.hadoop.mapreduce.v2.api.TaskAttemptCompletionEvent;
+import org.apache.hadoop.mapreduce.v2.api.TaskAttemptCompletionEventStatus;
+import org.apache.hadoop.mapreduce.v2.api.TaskAttemptID;
+import org.apache.hadoop.mapreduce.v2.api.TaskAttemptState;
+import org.apache.hadoop.mapreduce.v2.api.TaskState;
+import org.junit.Test;
+
+/**
+ * Checks that repeated fetch failures reported by a reduce attempt cause the
+ * already succeeded map attempt to be failed and the map task to be re-run.
+ */
+public class TestFetchFailure {
+
+  /**
+   * Runs one map and one reduce, completes the map, reports three fetch
+   * failures against it from the reduce, then completes the re-launched map
+   * attempt and the reduce and verifies the resulting completion events.
+   */
+  @Test
+  public void testFetchFailure() throws Exception {
+    MRApp app = new MRApp(1, 1, false);
+    Configuration conf = new Configuration();
+    Job job = app.submit(conf);
+    app.waitForState(job, JobState.RUNNING);
+    //one map task and one reduce task are expected
+    Assert.assertEquals("No of tasks not correct",
+       2, job.getTasks().size());
+    //NOTE(review): assumes getTasks() iterates the map task first - confirm
+    Iterator<Task> it = job.getTasks().values().iterator();
+    Task mapTask = it.next();
+    Task reduceTask = it.next();
+
+    //wait for Task state move to RUNNING
+    app.waitForState(mapTask, TaskState.RUNNING);
+    TaskAttempt mapAttempt1 = mapTask.getAttempts().values().iterator().next();
+    app.waitForState(mapAttempt1, TaskAttemptState.RUNNING);
+
+    //send the done signal to the map attempt
+    app.getContext().getEventHandler().handle(
+        new TaskAttemptEvent(mapAttempt1.getID(),
+            TaskAttemptEventType.TA_DONE));
+
+    // wait for map success
+    app.waitForState(mapTask, TaskState.SUCCEEDED);
+
+    TaskAttemptCompletionEvent[] events =
+      job.getTaskAttemptCompletionEvents(0, 100);
+    Assert.assertEquals("No of completion events not correct",
+        1, events.length);
+    Assert.assertEquals("Event status not correct",
+        TaskAttemptCompletionEventStatus.SUCCEEDED, events[0].status);
+
+    // wait for reduce to start running
+    app.waitForState(reduceTask, TaskState.RUNNING);
+    TaskAttempt reduceAttempt =
+      reduceTask.getAttempts().values().iterator().next();
+    app.waitForState(reduceAttempt, TaskAttemptState.RUNNING);
+
+    //send 3 fetch failures from reduce to trigger map re execution
+    for (int i = 0; i < 3; ++i) {
+      sendFetchFailure(app, reduceAttempt, mapAttempt1);
+    }
+
+    //wait for map Task state move back to RUNNING
+    app.waitForState(mapTask, TaskState.RUNNING);
+
+    //map attempt must have become FAILED
+    Assert.assertEquals("Map TaskAttempt state not correct",
+        TaskAttemptState.FAILED, mapAttempt1.getState());
+
+    Assert.assertEquals("No of attempts in Map Task not correct",
+        2, mapTask.getAttempts().size());
+
+    //skip the first attempt; the second one is the re-launched attempt
+    Iterator<TaskAttempt> atIt = mapTask.getAttempts().values().iterator();
+    atIt.next();
+    TaskAttempt mapAttempt2 = atIt.next();
+
+    app.waitForState(mapAttempt2, TaskAttemptState.RUNNING);
+    //send the done signal to the second map attempt
+    app.getContext().getEventHandler().handle(
+        new TaskAttemptEvent(mapAttempt2.getID(),
+            TaskAttemptEventType.TA_DONE));
+
+    // wait for map success
+    app.waitForState(mapTask, TaskState.SUCCEEDED);
+
+    //send done to reduce
+    app.getContext().getEventHandler().handle(
+        new TaskAttemptEvent(reduceAttempt.getID(),
+            TaskAttemptEventType.TA_DONE));
+
+    app.waitForState(job, JobState.SUCCEEDED);
+
+    //previous completion event now becomes obsolete
+    Assert.assertEquals("Event status not correct",
+        TaskAttemptCompletionEventStatus.OBSOLETE, events[0].status);
+
+    events = job.getTaskAttemptCompletionEvents(0, 100);
+    Assert.assertEquals("No of completion events not correct",
+        4, events.length);
+    Assert.assertEquals("Event map attempt id not correct",
+        mapAttempt1.getID(), events[0].attemptId);
+    Assert.assertEquals("Event map attempt id not correct",
+        mapAttempt1.getID(), events[1].attemptId);
+    Assert.assertEquals("Event map attempt id not correct",
+        mapAttempt2.getID(), events[2].attemptId);
+    Assert.assertEquals("Event reduce attempt id not correct",
+        reduceAttempt.getID(), events[3].attemptId);
+    Assert.assertEquals("Event status not correct for map attempt1",
+        TaskAttemptCompletionEventStatus.OBSOLETE, events[0].status);
+    Assert.assertEquals("Event status not correct for map attempt1",
+        TaskAttemptCompletionEventStatus.FAILED, events[1].status);
+    Assert.assertEquals("Event status not correct for map attempt2",
+        TaskAttemptCompletionEventStatus.SUCCEEDED, events[2].status);
+    Assert.assertEquals("Event status not correct for reduce attempt1",
+        TaskAttemptCompletionEventStatus.SUCCEEDED, events[3].status);
+  }
+
+  /**
+   * Dispatches one fetch-failure event in which {@code reduceAttempt}
+   * reports that it could not fetch the output of {@code mapAttempt}.
+   */
+  private void sendFetchFailure(MRApp app, TaskAttempt reduceAttempt,
+      TaskAttempt mapAttempt) {
+    app.getContext().getEventHandler().handle(
+        new JobTaskAttemptFetchFailureEvent(
+            reduceAttempt.getID(),
+            Arrays.asList(mapAttempt.getID())));
+  }
+}

Added: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestJobHistoryParsing.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestJobHistoryParsing.java?rev=1082677&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestJobHistoryParsing.java (added)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestJobHistoryParsing.java Thu Mar 17 20:21:13 2011
@@ -0,0 +1,134 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.mapreduce.v2.app;
+
+import java.io.IOException;
+
+import junit.framework.Assert;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileContext;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.MRJobConfig;
+import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEvent;
+import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEventHandler;
+import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser;
+import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.JobInfo;
+import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.TaskInfo;
+import org.apache.hadoop.mapreduce.v2.api.JobID;
+import org.apache.hadoop.mapreduce.v2.api.JobState;
+import org.apache.hadoop.mapreduce.v2.app.job.Job;
+import org.apache.hadoop.mapreduce.v2.lib.TypeConverter;
+import org.apache.hadoop.yarn.event.EventHandler;
+import org.junit.Test;
+
+public class TestJobHistoryParsing {
+  private static final Log LOG = LogFactory.getLog(TestJobHistoryParsing.class);
+
+  @Test
+  public void testHistoryParsing() throws Exception {
+    Configuration conf = new Configuration();
+    MRApp app = new HistoryEnabledApp(2, 1, true);
+    app.submit(conf);
+    Job job = app.getContext().getAllJobs().values().iterator().next();
+    JobID jobId = job.getID();
+    LOG.info("JOBID is " + TypeConverter.fromYarn(jobId).toString());
+    app.waitForState(job, JobState.SUCCEEDED);
+    
+    String jobhistoryFileName = TypeConverter.fromYarn(jobId).toString();
+    String user =
+      conf.get(MRJobConfig.USER_NAME, System.getProperty("user.name"));
+    String jobhistoryDir = conf.get("yarn.server.nodemanager.jobhistory",
+        "file:///tmp/yarn/done/") + user; 
+    String jobstatusDir = conf.get("yarn.server.nodemanager.jobhistory",
+        "file:///tmp/yarn/done/status/") + user + "/" +
+        jobhistoryFileName;
+    FSDataInputStream in = null;
+    Path historyFilePath = new Path(jobhistoryDir, jobhistoryFileName);
+    LOG.info("JOBHISTORYDIRE IS " + historyFilePath);
+    try {
+      FileContext fc = FileContext.getFileContext(historyFilePath.toUri());
+      in = fc.open(historyFilePath);
+    } catch (IOException ioe) {
+      LOG.info("Can not open history file "+ ioe);
+      throw (new Exception("Can not open History File"));
+    }
+    
+    JobHistoryParser parser = new JobHistoryParser(in);
+    JobInfo jobInfo = parser.parse();
+
+    Assert.assertTrue ("Incorrect username ",
+        jobInfo.getUsername().equals("mapred"));
+    Assert.assertTrue("Incorrect jobName ",
+        jobInfo.getJobname().equals("test"));
+    Assert.assertTrue("Incorrect queuename ",
+        jobInfo.getJobQueueName().equals("default"));
+    Assert.assertTrue("incorrect conf path",
+        jobInfo.getJobConfPath().equals("test"));
+    Assert.assertTrue("incorrect finishedMap ",
+        jobInfo.getFinishedMaps() == 2);
+    Assert.assertTrue("incorrect finishedReduces ",
+        jobInfo.getFinishedReduces() == 1);
+    int totalTasks = jobInfo.getAllTasks().size();
+    Assert.assertTrue("total number of tasks is incorrect  ", totalTasks == 3);
+
+    //Assert at taskAttempt level
+    for (TaskInfo taskInfo :  jobInfo.getAllTasks().values()) {
+      int taskAttemptCount = taskInfo.getAllTaskAttempts().size();
+      Assert.assertTrue("total number of task attempts ", 
+          taskAttemptCount == 1);
+    }
+
+   // Test for checking jobstats for job status store
+    Path statusFilePath = new Path(jobstatusDir, "jobstats");
+    try {
+      FileContext fc = FileContext.getFileContext(statusFilePath.toUri());
+      in = fc.open(statusFilePath);
+    } catch (IOException ioe) {
+      LOG.info("Can not open status file "+ ioe);
+      throw (new Exception("Can not open status File"));
+    }
+    parser = new JobHistoryParser(in);
+    jobInfo = parser.parse();
+    Assert.assertTrue("incorrect finishedMap in job stats file ",
+        jobInfo.getFinishedMaps() == 2);
+    Assert.assertTrue("incorrect finishedReduces in job stats file ",
+        jobInfo.getFinishedReduces() == 1);
+  }
+
+  static class HistoryEnabledApp extends MRApp {
+    public HistoryEnabledApp(int maps, int reduces, boolean autoComplete) {
+      super(maps, reduces, autoComplete);
+    }
+
+    @Override
+    protected EventHandler<JobHistoryEvent> createJobHistoryHandler(
+        Configuration conf) {
+      return new JobHistoryEventHandler(conf);
+    }
+  }
+
+  public static void main(String[] args) throws Exception {
+    TestJobHistoryParsing t = new TestJobHistoryParsing();
+    t.testHistoryParsing();
+  }
+}

Added: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestKill.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestKill.java?rev=1082677&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestKill.java (added)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestKill.java Thu Mar 17 20:21:13 2011
@@ -0,0 +1,221 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.mapreduce.v2.app;
+
+import java.util.Iterator;
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+
+import junit.framework.Assert;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.v2.app.job.Job;
+import org.apache.hadoop.mapreduce.v2.app.job.Task;
+import org.apache.hadoop.mapreduce.v2.app.job.TaskAttempt;
+import org.apache.hadoop.mapreduce.v2.app.job.event.JobEvent;
+import org.apache.hadoop.mapreduce.v2.app.job.event.JobEventType;
+import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent;
+import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType;
+import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEvent;
+import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEventType;
+import org.apache.hadoop.mapreduce.v2.api.JobState;
+import org.apache.hadoop.mapreduce.v2.api.TaskAttemptID;
+import org.apache.hadoop.mapreduce.v2.api.TaskAttemptState;
+import org.apache.hadoop.mapreduce.v2.api.TaskID;
+import org.apache.hadoop.mapreduce.v2.api.TaskState;
+import org.junit.Test;
+
+/**
+ * Tests the state machine with respect to Job/Task/TaskAttempt kill scenarios.
+ * Each test runs a {@link BlockingMRApp}, sends the kill event, and only then
+ * releases the latch on which the launch callback for task 0's first attempt
+ * waits.
+ */
+public class TestKill {
+
+  /** Kills a one-map job; job, task and attempt must all end up KILLED. */
+  @Test
+  public void testKillJob() throws Exception {
+    final CountDownLatch latch = new CountDownLatch(1);
+    MRApp app = new BlockingMRApp(1, 0, latch);
+    //this will start the job but job won't complete as task is blocked
+    Job job = app.submit(new Configuration());
+
+    //wait and validate for Job to become RUNNING
+    app.waitForState(job, JobState.RUNNING);
+
+    //send the kill signal to Job
+    app.getContext().getEventHandler().handle(
+        new JobEvent(job.getID(), JobEventType.JOB_KILL));
+
+    //unblock Task
+    latch.countDown();
+
+    //wait and validate for Job to be KILLED
+    app.waitForState(job, JobState.KILLED);
+    Map<TaskID,Task> tasks = job.getTasks();
+    Assert.assertEquals("No of tasks is not correct", 1, tasks.size());
+    Task task = tasks.values().iterator().next();
+    Assert.assertEquals("Task state not correct", TaskState.KILLED,
+        task.getReport().state);
+    assertSingleAttempt(task, TaskAttemptState.KILLED);
+  }
+
+  /**
+   * Kills the first of two map tasks. The job must still reach SUCCEEDED, with
+   * the first task and its attempt KILLED and the second SUCCEEDED.
+   */
+  @Test
+  public void testKillTask() throws Exception {
+    final CountDownLatch latch = new CountDownLatch(1);
+    MRApp app = new BlockingMRApp(2, 0, latch);
+    //this will start the job but job won't complete as Task is blocked
+    Job job = app.submit(new Configuration());
+
+    //wait and validate for Job to become RUNNING
+    app.waitForState(job, JobState.RUNNING);
+    Map<TaskID,Task> tasks = job.getTasks();
+    Assert.assertEquals("No of tasks is not correct", 2, tasks.size());
+    Iterator<Task> it = tasks.values().iterator();
+    Task task1 = it.next();
+    Task task2 = it.next();
+
+    //send the kill signal to the first Task
+    app.getContext().getEventHandler().handle(
+        new TaskEvent(task1.getID(), TaskEventType.T_KILL));
+
+    //unblock Task
+    latch.countDown();
+
+    //wait and validate for Job to become SUCCEEDED
+    app.waitForState(job, JobState.SUCCEEDED);
+
+    //first Task is killed and second is Succeeded; Job is succeeded
+    Assert.assertEquals("Task state not correct", TaskState.KILLED,
+        task1.getReport().state);
+    Assert.assertEquals("Task state not correct", TaskState.SUCCEEDED,
+        task2.getReport().state);
+    assertSingleAttempt(task1, TaskAttemptState.KILLED);
+    assertSingleAttempt(task2, TaskAttemptState.SUCCEEDED);
+  }
+
+  /**
+   * Kills the first attempt of the first of two map tasks. Both tasks and the
+   * job must still succeed; the first task ends up with two attempts, the
+   * first KILLED and the second SUCCEEDED.
+   */
+  @Test
+  public void testKillTaskAttempt() throws Exception {
+    final CountDownLatch latch = new CountDownLatch(1);
+    MRApp app = new BlockingMRApp(2, 0, latch);
+    //this will start the job but job won't complete as Task is blocked
+    Job job = app.submit(new Configuration());
+
+    //wait and validate for Job to become RUNNING
+    app.waitForState(job, JobState.RUNNING);
+    Map<TaskID,Task> tasks = job.getTasks();
+    Assert.assertEquals("No of tasks is not correct", 2, tasks.size());
+    Iterator<Task> it = tasks.values().iterator();
+    Task task1 = it.next();
+    Task task2 = it.next();
+
+    //wait for both tasks to be SCHEDULED before looking up the attempt
+    app.waitForState(task1, TaskState.SCHEDULED);
+    app.waitForState(task2, TaskState.SCHEDULED);
+
+    //send the kill signal to the first Task's attempt
+    TaskAttempt attempt = task1.getAttempts().values().iterator().next();
+    app.getContext().getEventHandler().handle(
+        new TaskAttemptEvent(attempt.getID(), TaskAttemptEventType.TA_KILL));
+
+    //unblock
+    latch.countDown();
+
+    //wait and validate for Job to become SUCCEEDED; job will still succeed
+    app.waitForState(job, JobState.SUCCEEDED);
+
+    //first Task will have two attempts 1st is killed, 2nd Succeeds
+    //both Tasks and Job succeeds
+    Assert.assertEquals("Task state not correct", TaskState.SUCCEEDED,
+        task1.getReport().state);
+    Assert.assertEquals("Task state not correct", TaskState.SUCCEEDED,
+        task2.getReport().state);
+
+    Map<TaskAttemptID, TaskAttempt> attempts = task1.getAttempts();
+    Assert.assertEquals("No of attempts is not correct", 2, attempts.size());
+    Iterator<TaskAttempt> iter = attempts.values().iterator();
+    Assert.assertEquals("Attempt state not correct", TaskAttemptState.KILLED,
+        iter.next().getReport().state);
+    Assert.assertEquals("Attempt state not correct", TaskAttemptState.SUCCEEDED,
+        iter.next().getReport().state);
+
+    assertSingleAttempt(task2, TaskAttemptState.SUCCEEDED);
+  }
+
+  /**
+   * Asserts that the task has exactly one attempt and that the state in that
+   * attempt's report equals the expected one.
+   */
+  private static void assertSingleAttempt(Task task,
+      TaskAttemptState expected) {
+    Map<TaskAttemptID, TaskAttempt> attempts = task.getAttempts();
+    Assert.assertEquals("No of attempts is not correct", 1, attempts.size());
+    Assert.assertEquals("Attempt state not correct", expected,
+        attempts.values().iterator().next().getReport().state);
+  }
+
+  /**
+   * MRApp that blocks attemptLaunched() on a latch for attempt 0 of task 0 and
+   * sends TA_DONE straight away for every other attempt.
+   */
+  static class BlockingMRApp extends MRApp {
+    private final CountDownLatch latch;
+
+    BlockingMRApp(int maps, int reduces, CountDownLatch latch) {
+      super(maps, reduces, true);
+      this.latch = latch;
+    }
+
+    @Override
+    protected void attemptLaunched(TaskAttemptID attemptID) {
+      if (attemptID.taskID.id == 0 && attemptID.id == 0) {
+        //this blocks the first task's first attempt
+        //the subsequent ones are completed
+        try {
+          latch.await();
+        } catch (InterruptedException e) {
+          e.printStackTrace();
+          //restore the interrupt status that await() cleared when throwing
+          Thread.currentThread().interrupt();
+        }
+      } else {
+        getContext().getEventHandler().handle(
+            new TaskAttemptEvent(attemptID, TaskAttemptEventType.TA_DONE));
+      }
+    }
+  }
+
+  /** Runs the three kill tests in sequence without a JUnit runner. */
+  public static void main(String[] args) throws Exception {
+    TestKill t = new TestKill();
+    t.testKillJob();
+    t.testKillTask();
+    t.testKillTaskAttempt();
+  }
+}

Added: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestMRApp.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestMRApp.java?rev=1082677&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestMRApp.java (added)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestMRApp.java Thu Mar 17 20:21:13 2011
@@ -0,0 +1,187 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.mapreduce.v2.app;
+
+import java.util.Iterator;
+
+import junit.framework.Assert;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.MRJobConfig;
+import org.apache.hadoop.mapreduce.v2.app.job.Job;
+import org.apache.hadoop.mapreduce.v2.app.job.Task;
+import org.apache.hadoop.mapreduce.v2.app.job.TaskAttempt;
+import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent;
+import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType;
+import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEvent;
+import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEventType;
+import org.apache.hadoop.mapreduce.v2.lib.TypeConverter;
+import org.apache.hadoop.mapreduce.v2.api.JobState;
+import org.apache.hadoop.mapreduce.v2.api.TaskAttemptState;
+import org.apache.hadoop.mapreduce.v2.api.TaskState;
+import org.junit.Test;
+
+/**
+ * Tests the state machine of MR App.
+ */
+public class TestMRApp {
+
+  /** Runs 2 maps and 2 reduces with autoComplete on; job must succeed. */
+  @Test
+  public void testMapReduce() throws Exception {
+    MRApp app = new MRApp(2, 2, true);
+    Job job = app.submit(new Configuration());
+    app.waitForState(job, JobState.SUCCEEDED);
+    app.verifyCompleted();
+  }
+
+  /** Sends TA_COMMIT_PENDING, then TA_DONE, to a single map attempt. */
+  @Test
+  public void testCommitPending() throws Exception {
+    MRApp app = new MRApp(1, 0, false);
+    Job job = app.submit(new Configuration());
+    app.waitForState(job, JobState.RUNNING);
+    Assert.assertEquals("No of tasks not correct", 1, job.getTasks().size());
+    Task task = job.getTasks().values().iterator().next();
+    app.waitForState(task, TaskState.RUNNING);
+    TaskAttempt attempt = firstAttempt(task);
+    app.waitForState(attempt, TaskAttemptState.RUNNING);
+
+    //send the commit pending signal to the task
+    app.getContext().getEventHandler().handle(
+        new TaskAttemptEvent(attempt.getID(),
+            TaskAttemptEventType.TA_COMMIT_PENDING));
+
+    //wait for first attempt to commit pending
+    app.waitForState(attempt, TaskAttemptState.COMMIT_PENDING);
+
+    //send the done signal to the task
+    sendDone(app, firstAttempt(task));
+
+    app.waitForState(job, JobState.SUCCEEDED);
+  }
+
+  /**
+   * With slow-start at 0.5 and two maps, the reduce must be NEW while both
+   * maps run and reach RUNNING once the first map has succeeded.
+   */
+  @Test
+  public void testCompletedMapsForReduceSlowstart() throws Exception {
+    MRApp app = new MRApp(2, 1, false);
+    Configuration conf = new Configuration();
+    //after half of the map completion, reduce will start
+    conf.setFloat(MRJobConfig.COMPLETED_MAPS_FOR_REDUCE_SLOWSTART, 0.5f);
+    Job job = app.submit(conf);
+    app.waitForState(job, JobState.RUNNING);
+    //two maps plus one reduce
+    Assert.assertEquals("No of tasks not correct", 3, job.getTasks().size());
+    Iterator<Task> it = job.getTasks().values().iterator();
+    Task mapTask1 = it.next();
+    Task mapTask2 = it.next();
+    Task reduceTask = it.next();
+
+    // all maps must be running
+    app.waitForState(mapTask1, TaskState.RUNNING);
+    app.waitForState(mapTask2, TaskState.RUNNING);
+
+    TaskAttempt task1Attempt = firstAttempt(mapTask1);
+    TaskAttempt task2Attempt = firstAttempt(mapTask2);
+
+    //make sure each attempt has come to RUNNING before it is sent TA_DONE
+    app.waitForState(task1Attempt, TaskAttemptState.RUNNING);
+    app.waitForState(task2Attempt, TaskAttemptState.RUNNING);
+
+    // reduces must be in NEW state
+    Assert.assertEquals("Reduce Task state not correct",
+        TaskState.NEW, reduceTask.getReport().state);
+
+    //send the done signal to the 1st map task
+    sendDone(app, firstAttempt(mapTask1));
+
+    //wait for first map task to complete
+    app.waitForState(mapTask1, TaskState.SUCCEEDED);
+
+    //the first map's completion schedules the reduce, so it must be running
+    app.waitForState(reduceTask, TaskState.RUNNING);
+
+    //send the done signal to 2nd map and the reduce to complete the job
+    sendDone(app, firstAttempt(mapTask2));
+    sendDone(app, firstAttempt(reduceTask));
+
+    app.waitForState(job, JobState.SUCCEEDED);
+  }
+
+  /** Sends T_SCHEDULE to a task that is already RUNNING; job must go ERROR. */
+  @Test
+  public void testJobError() throws Exception {
+    MRApp app = new MRApp(1, 0, false);
+    Job job = app.submit(new Configuration());
+    app.waitForState(job, JobState.RUNNING);
+    Assert.assertEquals("No of tasks not correct", 1, job.getTasks().size());
+    Task task = job.getTasks().values().iterator().next();
+    app.waitForState(task, TaskState.RUNNING);
+
+    //send an invalid event on task at current state
+    app.getContext().getEventHandler().handle(
+        new TaskEvent(task.getID(), TaskEventType.T_SCHEDULE));
+
+    //this must lead to job error
+    app.waitForState(job, JobState.ERROR);
+  }
+
+  /** TypeConverter.fromYarn must accept every JobState value. */
+  @Test
+  public void checkJobStateTypeConversion() {
+    //verify that all states can be converted without throwing an exception
+    for (JobState state : JobState.values()) {
+      TypeConverter.fromYarn(state);
+    }
+  }
+
+  /** TypeConverter.fromYarn must accept every TaskState value. */
+  @Test
+  public void checkTaskStateTypeConversion() {
+    //verify that all states can be converted without throwing an exception
+    for (TaskState state : TaskState.values()) {
+      TypeConverter.fromYarn(state);
+    }
+  }
+
+  /** Returns the first attempt in iteration order of the task's attempts. */
+  private static TaskAttempt firstAttempt(Task task) {
+    return task.getAttempts().values().iterator().next();
+  }
+
+  /** Sends TA_DONE for the given attempt through the app's event handler. */
+  private static void sendDone(MRApp app, TaskAttempt attempt) {
+    app.getContext().getEventHandler().handle(
+        new TaskAttemptEvent(attempt.getID(), TaskAttemptEventType.TA_DONE));
+  }
+
+  /** Runs every test of this class in sequence without a JUnit runner. */
+  public static void main(String[] args) throws Exception {
+    TestMRApp t = new TestMRApp();
+    t.testMapReduce();
+    t.testCommitPending();
+    t.testCompletedMapsForReduceSlowstart();
+    t.testJobError();
+    t.checkJobStateTypeConversion();
+    t.checkTaskStateTypeConversion();
+  }
+}

Added: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestMRClientService.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestMRClientService.java?rev=1082677&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestMRClientService.java (added)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestMRClientService.java Thu Mar 17 20:21:13 2011
@@ -0,0 +1,142 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.mapreduce.v2.app;
+
+import java.util.Iterator;
+import java.util.List;
+
+import junit.framework.Assert;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.v2.app.AppContext;
+import org.apache.hadoop.mapreduce.v2.app.client.ClientService;
+import org.apache.hadoop.mapreduce.v2.app.client.MRClientService;
+import org.apache.hadoop.mapreduce.v2.app.job.Job;
+import org.apache.hadoop.mapreduce.v2.app.job.Task;
+import org.apache.hadoop.mapreduce.v2.app.job.TaskAttempt;
+import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptDiagnosticsUpdateEvent;
+import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent;
+import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType;
+import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptStatusUpdateEvent;
+import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptStatusUpdateEvent.TaskAttemptStatus;
+import org.apache.hadoop.yarn.ipc.YarnRPC;
+import org.apache.hadoop.mapreduce.v2.api.JobState;
+import org.apache.hadoop.mapreduce.v2.api.MRClientProtocol;
+import org.apache.hadoop.mapreduce.v2.api.Phase;
+import org.apache.hadoop.mapreduce.v2.api.TaskAttemptState;
+import org.apache.hadoop.mapreduce.v2.api.TaskReport;
+import org.apache.hadoop.mapreduce.v2.api.TaskState;
+import org.apache.hadoop.mapreduce.v2.api.TaskType;
+import org.junit.Test;
+
+/** Exercises the MRClientProtocol RPCs at MRClientService's bind address. */
+public class TestMRClientService {
+
+  /**
+   * Starts a one-map job, posts two diagnostics for its attempt, then calls
+   * the MRClientProtocol getters through an RPC proxy and checks the results.
+   */
+  @Test
+  public void test() throws Exception {
+    MRAppWithClientService app = new MRAppWithClientService(1, 0, false);
+    Configuration conf = new Configuration();
+    Job job = app.submit(conf);
+    app.waitForState(job, JobState.RUNNING);
+    Assert.assertEquals("No of tasks not correct", 1, job.getTasks().size());
+    Iterator<Task> it = job.getTasks().values().iterator();
+    Task task = it.next();
+    app.waitForState(task, TaskState.RUNNING);
+    TaskAttempt attempt = task.getAttempts().values().iterator().next();
+    app.waitForState(attempt, TaskAttemptState.RUNNING);
+
+    // send the diagnostic
+    String diagnostic1 = "Diagnostic1";
+    String diagnostic2 = "Diagnostic2";
+    app.getContext().getEventHandler().handle(
+        new TaskAttemptDiagnosticsUpdateEvent(attempt.getID(), diagnostic1));
+
+    // build a status update that carries the second diagnostic
+    TaskAttemptStatus taskAttemptStatus = new TaskAttemptStatus();
+    taskAttemptStatus.id = attempt.getID();
+    taskAttemptStatus.progress = 0.5f;
+    taskAttemptStatus.diagnosticInfo = diagnostic2;
+    taskAttemptStatus.stateString = "RUNNING";
+    taskAttemptStatus.phase = Phase.MAP;
+    taskAttemptStatus.outputSize = 3;
+    // send the status update
+    app.getContext().getEventHandler().handle(
+        new TaskAttemptStatusUpdateEvent(attempt.getID(), taskAttemptStatus));
+
+    //verify that all object are fully populated by invoking RPCs.
+    YarnRPC rpc = YarnRPC.create(conf);
+    MRClientProtocol proxy = (MRClientProtocol) rpc.getProxy(
+        MRClientProtocol.class, app.clientService.getBindAddress(), conf);
+    Assert.assertNotNull("Counters is null", proxy.getCounters(job.getID()));
+    Assert.assertNotNull("JobReport is null", proxy.getJobReport(job.getID()));
+    Assert.assertNotNull("TaskCompletionEvents is null",
+        proxy.getTaskAttemptCompletionEvents(job.getID(), 0, 10));
+    Assert.assertNotNull("Diagnostics is null",
+        proxy.getDiagnostics(attempt.getID()));
+    Assert.assertNotNull("TaskAttemptReport is null",
+        proxy.getTaskAttemptReport(attempt.getID()));
+    Assert.assertNotNull("TaskReport is null",
+        proxy.getTaskReport(task.getID()));
+    Assert.assertNotNull("TaskReports for map is null",
+        proxy.getTaskReports(job.getID(), TaskType.MAP));
+    Assert.assertNotNull("TaskReports for reduce is null",
+        proxy.getTaskReports(job.getID(), TaskType.REDUCE));
+
+    // both diagnostics must come back, in the order they were sent
+    List<CharSequence> diag = proxy.getDiagnostics(attempt.getID());
+    Assert.assertEquals("No of diagnostic not correct", 2, diag.size());
+    Assert.assertEquals("Diag 1 not correct",
+        diagnostic1, diag.get(0).toString());
+    Assert.assertEquals("Diag 2 not correct",
+        diagnostic2, diag.get(1).toString());
+
+    TaskReport taskReport = proxy.getTaskReport(task.getID());
+    Assert.assertEquals("No of diagnostic not correct", 2,
+        taskReport.diagnostics.size());
+
+    //send the done signal to the task
+    app.getContext().getEventHandler().handle(
+        new TaskAttemptEvent(attempt.getID(), TaskAttemptEventType.TA_DONE));
+
+    app.waitForState(job, JobState.SUCCEEDED);
+  }
+
+  /** MRApp that keeps the MRClientService it creates in a field. */
+  static class MRAppWithClientService extends MRApp {
+    MRClientService clientService = null;
+    MRAppWithClientService(int maps, int reduces, boolean autoComplete) {
+      super(maps, reduces, autoComplete);
+    }
+    @Override
+    protected ClientService createClientService(AppContext context) {
+      clientService = new MRClientService(context);
+      return clientService;
+    }
+  }
+
+  /** Runs test() without a JUnit runner. */
+  public static void main(String[] args) throws Exception {
+    TestMRClientService t = new TestMRClientService();
+    t.test();
+  }
+}