Posted to commits@tez.apache.org by ac...@apache.org on 2013/03/15 22:26:48 UTC

svn commit: r1457129 [31/38] - in /incubator/tez: ./ tez-ampool/ tez-ampool/src/ tez-ampool/src/main/ tez-ampool/src/main/bin/ tez-ampool/src/main/conf/ tez-ampool/src/main/java/ tez-ampool/src/main/java/org/ tez-ampool/src/main/java/org/apache/ tez-am...

Added: incubator/tez/tez-yarn-application/src/test/java/org/apache/hadoop/mapreduce/v2/app2/MRApp.java
URL: http://svn.apache.org/viewvc/incubator/tez/tez-yarn-application/src/test/java/org/apache/hadoop/mapreduce/v2/app2/MRApp.java?rev=1457129&view=auto
==============================================================================
--- incubator/tez/tez-yarn-application/src/test/java/org/apache/hadoop/mapreduce/v2/app2/MRApp.java (added)
+++ incubator/tez/tez-yarn-application/src/test/java/org/apache/hadoop/mapreduce/v2/app2/MRApp.java Fri Mar 15 21:26:36 2013
@@ -0,0 +1,895 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.mapreduce.v2.app2;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.EnumSet;
+import java.util.HashMap;
+import java.util.Map;
+
+import junit.framework.Assert;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileContext;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapreduce.Counters;
+import org.apache.hadoop.mapreduce.JobID;
+import org.apache.hadoop.mapreduce.MRJobConfig;
+import org.apache.hadoop.mapreduce.OutputCommitter;
+import org.apache.hadoop.mapreduce.TaskType;
+import org.apache.hadoop.mapreduce.TypeConverter;
+import org.apache.hadoop.mapreduce.jobhistory.ContainerHeartbeatHandler;
+import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEvent;
+import org.apache.hadoop.mapreduce.jobhistory.NormalizedResourceEvent;
+import org.apache.hadoop.mapreduce.security.token.JobTokenSecretManager;
+import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitMetaInfo;
+import org.apache.hadoop.mapreduce.v2.api.records.JobId;
+import org.apache.hadoop.mapreduce.v2.api.records.JobReport;
+import org.apache.hadoop.mapreduce.v2.api.records.JobState;
+import org.apache.hadoop.mapreduce.v2.api.records.Phase;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptReport;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptState;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskReport;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskState;
+import org.apache.hadoop.mapreduce.v2.app2.client.ClientService;
+import org.apache.hadoop.mapreduce.v2.app2.job.Job;
+import org.apache.hadoop.mapreduce.v2.app2.job.JobStateInternal;
+import org.apache.hadoop.mapreduce.v2.app2.job.Task;
+import org.apache.hadoop.mapreduce.v2.app2.job.TaskAttempt;
+import org.apache.hadoop.mapreduce.v2.app2.job.TaskAttemptStateInternal;
+import org.apache.hadoop.mapreduce.v2.app2.job.event.JobEvent;
+import org.apache.hadoop.mapreduce.v2.app2.job.event.JobEventType;
+import org.apache.hadoop.mapreduce.v2.app2.job.event.JobFinishEvent;
+import org.apache.hadoop.mapreduce.v2.app2.job.event.TaskAttemptEvent;
+import org.apache.hadoop.mapreduce.v2.app2.job.event.TaskAttemptEventFailRequest;
+import org.apache.hadoop.mapreduce.v2.app2.job.event.TaskAttemptEventKillRequest;
+import org.apache.hadoop.mapreduce.v2.app2.job.event.TaskAttemptEventType;
+import org.apache.hadoop.mapreduce.v2.app2.job.event.TaskAttemptEventStartedRemotely;
+import org.apache.hadoop.mapreduce.v2.app2.job.event.TaskAttemptEventStatusUpdate;
+import org.apache.hadoop.mapreduce.v2.app2.job.event.TaskAttemptEventStatusUpdate.TaskAttemptStatus;
+import org.apache.hadoop.mapreduce.v2.app2.job.impl.JobImpl;
+import org.apache.hadoop.mapreduce.v2.app2.job.impl.TaskAttemptImpl;
+import org.apache.hadoop.mapreduce.v2.app2.launcher.ContainerLauncher;
+import org.apache.hadoop.mapreduce.v2.app2.rm.AMSchedulerEvent;
+import org.apache.hadoop.mapreduce.v2.app2.rm.AMSchedulerEventTAEnded;
+import org.apache.hadoop.mapreduce.v2.app2.rm.AMSchedulerTALaunchRequestEvent;
+import org.apache.hadoop.mapreduce.v2.app2.rm.ContainerAllocator;
+import org.apache.hadoop.mapreduce.v2.app2.rm.ContainerRequestor;
+import org.apache.hadoop.mapreduce.v2.app2.rm.NMCommunicatorEvent;
+import org.apache.hadoop.mapreduce.v2.app2.rm.RMCommunicatorContainerDeAllocateRequestEvent;
+import org.apache.hadoop.mapreduce.v2.app2.rm.RMCommunicatorEvent;
+import org.apache.hadoop.mapreduce.v2.app2.rm.RMContainerRequestor;
+import org.apache.hadoop.mapreduce.v2.app2.rm.container.AMContainer;
+import org.apache.hadoop.mapreduce.v2.app2.rm.container.AMContainerEventAssignTA;
+import org.apache.hadoop.mapreduce.v2.app2.rm.container.AMContainerEvent;
+import org.apache.hadoop.mapreduce.v2.app2.rm.container.AMContainerEventCompleted;
+import org.apache.hadoop.mapreduce.v2.app2.rm.container.AMContainerEventLaunched;
+import org.apache.hadoop.mapreduce.v2.app2.rm.container.AMContainerEventType;
+import org.apache.hadoop.mapreduce.v2.app2.rm.container.AMContainerEventLaunchRequest;
+import org.apache.hadoop.mapreduce.v2.app2.rm.container.AMContainerState;
+import org.apache.hadoop.mapreduce.v2.app2.rm.node.AMNodeEventContainerAllocated;
+import org.apache.hadoop.mapreduce.v2.app2.rm.node.AMNodeState;
+import org.apache.hadoop.mapreduce.v2.app2.taskclean.TaskCleaner;
+import org.apache.hadoop.mapreduce.v2.app2.taskclean.TaskCleanupEvent;
+import org.apache.hadoop.mapreduce.v2.util.MRApps;
+import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.yarn.Clock;
+import org.apache.hadoop.yarn.ClusterInfo;
+import org.apache.hadoop.yarn.SystemClock;
+import org.apache.hadoop.yarn.YarnException;
+import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.factories.RecordFactory;
+import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
+import org.apache.hadoop.yarn.service.AbstractService;
+import org.apache.hadoop.yarn.service.Service;
+import org.apache.hadoop.yarn.state.StateMachine;
+import org.apache.hadoop.yarn.state.StateMachineFactory;
+import org.apache.hadoop.yarn.util.BuilderUtils;
+import org.apache.hadoop.yarn.util.Records;
+
+
+/**
+ * Mock MRAppMaster. Doesn't start RPC servers.
+ * No threads are started except for the event Dispatcher thread.
+ */
+@SuppressWarnings("unchecked")
+public class MRApp extends MRAppMaster {
+  private static final Log LOG = LogFactory.getLog(MRApp.class);
+
+  int maps;
+  int reduces;
+
+  private File testWorkDir;
+  private Path testAbsPath;
+  private ClusterInfo clusterInfo;
+  private volatile boolean exited = false;
+
+  // TODO Default values. These will no longer be used if a test decides to mark a 
+  // node as bad, in which case it should use the getters.
+  // Leaving these as public to prevent changes in multiple places. 
+  public static String NM_HOST = "localhost";
+  public static int NM_PORT = 1234;
+  public static int NM_HTTP_PORT = 8042;
+  
+  private int currentNmPort;
+
+  private static final RecordFactory recordFactory =
+      RecordFactoryProvider.getRecordFactory(null);
+
+  //if true, tasks complete automatically as soon as they are launched
+  protected boolean autoComplete = false;
+
+  static ApplicationId applicationId;
+
+  // TODO: Look at getting rid of this. Each test should generate its own id,
+  // or have it provided. Using a custom id without updating this causes problems.
+  static {
+    applicationId = recordFactory.newRecordInstance(ApplicationId.class);
+    applicationId.setClusterTimestamp(0);
+    applicationId.setId(0);
+  }
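+
+  // Typical usage in a test (a minimal sketch using only methods defined in
+  // this class):
+  //   MRApp app = new MRApp(2, 1, true, "testName", true);
+  //   Job job = app.submit(new Configuration());
+  //   app.waitForState(job, JobState.SUCCEEDED);
+  //   app.verifyCompleted();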
+
+  public MRApp(int maps, int reduces, boolean autoComplete, String testName,
+      boolean cleanOnStart, Clock clock) {
+    this(maps, reduces, autoComplete, testName, cleanOnStart, 1, clock);
+  }
+
+  public MRApp(int maps, int reduces, boolean autoComplete, String testName,
+      boolean cleanOnStart) {
+    this(maps, reduces, autoComplete, testName, cleanOnStart, 1);
+  }
+  
+  public MRApp(int maps, int reduces, boolean autoComplete, String testName,
+      boolean cleanOnStart, int startCount) {
+    this(maps, reduces, autoComplete, testName, cleanOnStart, startCount,
+        new SystemClock());
+  }
+
+  public MRApp(int maps, int reduces, boolean autoComplete, String testName,
+      boolean cleanOnStart, int startCount, Clock clock) {
+    this(getApplicationAttemptId(applicationId, startCount), getContainerId(
+      applicationId, startCount), maps, reduces, autoComplete, testName,
+      cleanOnStart, startCount, clock);
+  }
+
+  public MRApp(ApplicationAttemptId appAttemptId, ContainerId amContainerId,
+      int maps, int reduces, boolean autoComplete, String testName,
+      boolean cleanOnStart, int startCount) {
+    this(appAttemptId, amContainerId, maps, reduces, autoComplete, testName,
+        cleanOnStart, startCount, new SystemClock());
+  }
+
+  public MRApp(ApplicationAttemptId appAttemptId, ContainerId amContainerId,
+      int maps, int reduces, boolean autoComplete, String testName,
+      boolean cleanOnStart, int startCount, Clock clock) {
+    super(appAttemptId, amContainerId, NM_HOST, NM_PORT, NM_HTTP_PORT, clock, System
+        .currentTimeMillis());
+    this.currentNmPort = NM_PORT;
+    this.testWorkDir = new File("target", testName);
+    testAbsPath = new Path(testWorkDir.getAbsolutePath());
+    LOG.info("PathUsed: " + testAbsPath);
+    if (cleanOnStart) {
+      try {
+        FileContext.getLocalFSFileContext().delete(testAbsPath, true);
+      } catch (Exception e) {
+        LOG.warn("COULD NOT CLEANUP: " + testAbsPath, e);
+        throw new YarnException("could not cleanup test dir", e);
+      }
+    }
+
+    applicationId = appAttemptId.getApplicationId();
+    this.maps = maps;
+    this.reduces = reduces;
+    this.autoComplete = autoComplete;
+  }
+
+  @Override
+  public void init(Configuration conf) {
+    super.init(conf);
+    if (this.clusterInfo != null) {
+      getContext().getClusterInfo().setMinContainerCapability(
+          this.clusterInfo.getMinContainerCapability());
+      getContext().getClusterInfo().setMaxContainerCapability(
+          this.clusterInfo.getMaxContainerCapability());
+    } else {
+      getContext().getClusterInfo().setMinContainerCapability(
+          BuilderUtils.newResource(1024, 1));
+      getContext().getClusterInfo().setMaxContainerCapability(
+          BuilderUtils.newResource(10240, 1));
+    }
+    // TODO Is there any point doing this here? Otherwise move to an overridden createDispatcher().
+//    conf.setBoolean(Dispatcher.DISPATCHER_EXIT_ON_ERROR_KEY, false);
+  }
+
+  public Job submit(Configuration conf) throws Exception {
+    String user = conf.get(MRJobConfig.USER_NAME, UserGroupInformation
+      .getCurrentUser().getShortUserName());
+    conf.set(MRJobConfig.USER_NAME, user);
+    conf.set(MRJobConfig.MR_AM_STAGING_DIR, testAbsPath.toString());
+    conf.setBoolean(MRJobConfig.MR_AM_CREATE_JH_INTERMEDIATE_BASE_DIR, true);
+    //TODO: fix the bug where the speculator gets events with 
+    //not-fully-constructed objects. For now, disable speculative exec
+    LOG.info("****DISABLING SPECULATIVE EXECUTION*****");
+    conf.setBoolean(MRJobConfig.MAP_SPECULATIVE, false);
+    conf.setBoolean(MRJobConfig.REDUCE_SPECULATIVE, false);
+
+    init(conf);
+    start();
+    DefaultMetricsSystem.shutdown();
+    Job job = getContext().getAllJobs().values().iterator().next();
+
+    // Write job.xml
+    String jobFile = MRApps.getJobFile(conf, user,
+      TypeConverter.fromYarn(job.getID()));
+    LOG.info("Writing job conf to " + jobFile);
+    new File(jobFile).getParentFile().mkdirs();
+    conf.writeXml(new FileOutputStream(jobFile));
+
+    return job;
+  }
+
+  public void waitForInternalState(TaskAttemptImpl attempt,
+      TaskAttemptStateInternal finalState) throws Exception {
+    int timeoutSecs = 0;
+    TaskAttemptReport report = attempt.getReport();
+    TaskAttemptStateInternal iState = attempt.getInternalState();
+    while (!finalState.equals(iState) && timeoutSecs++ < 20) {
+      System.out.println("TaskAttempt Internal State is : " + iState
+          + " Waiting for Internal state : " + finalState + "   progress : "
+          + report.getProgress());
+      Thread.sleep(500);
+      report = attempt.getReport();
+      iState = attempt.getInternalState();
+    }   
+    System.out.println("TaskAttempt Internal State is : " + iState);
+    Assert.assertEquals("TaskAttempt Internal state is not correct (timedout)",
+        finalState, iState);
+  }
+
+  public void waitForState(TaskAttempt attempt, 
+      TaskAttemptState finalState) throws Exception {
+    int timeoutSecs = 0;
+    TaskAttemptReport report = attempt.getReport();
+    while (!finalState.equals(report.getTaskAttemptState()) &&
+        timeoutSecs++ < 20) {
+      System.out.println("TaskAttempt State for " + attempt.getID() + " is : " + 
+          report.getTaskAttemptState() +
+          " Waiting for state : " + finalState +
+          "   progress : " + report.getProgress());
+      report = attempt.getReport();
+      Thread.sleep(500);
+    }
+    System.out.println("TaskAttempt State is : " + report.getTaskAttemptState());
+    Assert.assertEquals("TaskAttempt state is not correct (timedout)",
+        finalState, 
+        report.getTaskAttemptState());
+  }
+
+  public void waitForState(Task task, TaskState finalState) throws Exception {
+    int timeoutSecs = 0;
+    TaskReport report = task.getReport();
+    while (!finalState.equals(report.getTaskState()) &&
+        timeoutSecs++ < 20) {
+      System.out.println("Task State for " + task.getID() + " is : "
+          + report.getTaskState() + " Waiting for state : " + finalState
+          + "   progress : " + report.getProgress());
+      report = task.getReport();
+      Thread.sleep(500);
+    }
+    System.out.println("Task State is : " + report.getTaskState());
+    Assert.assertEquals("Task state is not correct (timedout)", finalState, 
+        report.getTaskState());
+  }
+
+  public void waitForAMExit() throws Exception {
+    int timeoutSecs = 0;
+    while (!exited && timeoutSecs ++ < 20) {
+      System.out.println("Waiting for AM exit");
+      Thread.sleep(500);
+    }
+    System.out.println("AM Exit State is: " + exited);
+    Assert.assertEquals("AM did not exit (timed out)", true, exited);
+  }
+  
+  public void waitForState(Job job, JobState finalState) throws Exception {
+    int timeoutSecs = 0;
+    JobReport report = job.getReport();
+    while (!finalState.equals(report.getJobState()) &&
+        timeoutSecs++ < 20) {
+      System.out.println("Job State is : " + report.getJobState() +
+          " Waiting for state : " + finalState +
+          "   map progress : " + report.getMapProgress() + 
+          "   reduce progress : " + report.getReduceProgress());
+      report = job.getReport();
+      Thread.sleep(500);
+    }
+    System.out.println("Job State is : " + report.getJobState());
+    Assert.assertEquals("Job state is not correct (timedout)", finalState, 
+        job.getState());
+  }
+  
+  // Getters for the node currently being used in simulations. A node will be
+  // forgotten if it turns unhealthy.
+  public String getCurrentNmHost() {
+    return NM_HOST;
+  }
+  
+  public int getCurrentNmPort() {
+    return this.currentNmPort;
+  }
+  
+  public int getCurrentNmHttpPort() {
+    return NM_HTTP_PORT;
+  }
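+
+  // Once a test has caused a node to turn unhealthy, the simulated node can
+  // change, so derive the current NodeId from these getters rather than the
+  // static NM_* defaults, e.g. (sketch, assuming a test-local MRApp reference
+  // named app):
+  //   NodeId nodeId = BuilderUtils.newNodeId(app.getCurrentNmHost(),
+  //       app.getCurrentNmPort());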
+
+  public void waitForState(Service.STATE finalState) throws Exception {
+    int timeoutSecs = 0;
+    while (!finalState.equals(getServiceState()) && timeoutSecs++ < 20) {
+      System.out.println("MRApp State is : " + getServiceState()
+          + " Waiting for state : " + finalState);
+      Thread.sleep(500);
+    }
+    System.out.println("MRApp State is : " + getServiceState());
+    Assert.assertEquals("MRApp state is not correct (timedout)", finalState,
+        getServiceState());
+  }
+
+  public void verifyCompleted() {
+    for (Job job : getContext().getAllJobs().values()) {
+      JobReport jobReport = job.getReport();
+      System.out.println("Job start time :" + jobReport.getStartTime());
+      System.out.println("Job finish time :" + jobReport.getFinishTime());
+      Assert.assertTrue("Job start time is not less than finish time",
+          jobReport.getStartTime() <= jobReport.getFinishTime());
+      Assert.assertTrue("Job finish time is in future",
+          jobReport.getFinishTime() <= System.currentTimeMillis());
+      for (Task task : job.getTasks().values()) {
+        TaskReport taskReport = task.getReport();
+        System.out.println("Task start time : " + taskReport.getStartTime());
+        System.out.println("Task finish time : " + taskReport.getFinishTime());
+        Assert.assertTrue("Task start time is not less than finish time",
+            taskReport.getStartTime() <= taskReport.getFinishTime());
+        for (TaskAttempt attempt : task.getAttempts().values()) {
+          TaskAttemptReport attemptReport = attempt.getReport();
+          Assert.assertTrue("Attempt start time is not less than finish time",
+              attemptReport.getStartTime() <= attemptReport.getFinishTime());
+        }
+      }
+    }
+  }
+
+  @Override
+  protected void downloadTokensAndSetupUGI(Configuration conf) {
+  }
+
+  private static ApplicationAttemptId getApplicationAttemptId(
+      ApplicationId applicationId, int startCount) {
+    ApplicationAttemptId applicationAttemptId =
+        recordFactory.newRecordInstance(ApplicationAttemptId.class);
+    applicationAttemptId.setApplicationId(applicationId);
+    applicationAttemptId.setAttemptId(startCount);
+    return applicationAttemptId;
+  }
+  
+  private static ContainerId getContainerId(ApplicationId applicationId,
+      int startCount) {
+    ApplicationAttemptId appAttemptId =
+        getApplicationAttemptId(applicationId, startCount);
+    ContainerId containerId =
+        BuilderUtils.newContainerId(appAttemptId, startCount);
+    return containerId;
+  }
+  
+  @Override
+  protected Job createJob(Configuration conf) {
+    UserGroupInformation currentUser = null;
+    try {
+      currentUser = UserGroupInformation.getCurrentUser();
+    } catch (IOException e) {
+      throw new YarnException(e);
+    }
+    Job newJob = new TestJob(getJobId(), getAttemptID(), conf,
+        getDispatcher().getEventHandler(),
+        getTaskAttemptListener(), getContext().getClock(), getCommitter(),
+        isNewApiCommitter(), currentUser.getUserName(),
+        getTaskHeartbeatHandler(), getContext());
+    ((AppContext) getContext()).getAllJobs().put(newJob.getID(), newJob);
+
+    getDispatcher().register(JobFinishEvent.Type.class, new MRAppJobFinishHandler());
+
+    return newJob;
+  }
+  
+  protected class MRAppJobFinishHandler extends JobFinishEventHandlerCR {
+    
+    @Override
+    protected void exit() {
+      exited = true;
+    }
+    
+    @Override
+    protected void maybeSendJobEndNotification() {
+    }
+  }
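+
+  // Note: exit() above only flips the 'exited' flag polled by waitForAMExit(),
+  // and job-end notification is suppressed, so finishing a job never tears
+  // down the test JVM.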
+
+  @Override
+  protected TaskAttemptListener createTaskAttemptListener(AppContext context,
+      TaskHeartbeatHandler thh, ContainerHeartbeatHandler chh) {
+    return new TaskAttemptListener(){
+
+      @Override
+      public InetSocketAddress getAddress() {
+        return NetUtils.createSocketAddr("localhost:54321");
+      }
+      
+      @Override
+      public void unregisterTaskAttempt(TaskAttemptId attemptID) {
+        // TODO Auto-generated method stub
+      }
+
+      @Override
+      public void registerRunningContainer(ContainerId containerId) {
+        // TODO Auto-generated method stub
+        
+      }
+
+      @Override
+      public void registerTaskAttempt(TaskAttemptId attemptId,
+          ContainerId containerId) {
+        // TODO Auto-generated method stub
+        
+      }
+
+      @Override
+      public void unregisterRunningContainer(ContainerId containerId) {
+        // TODO Auto-generated method stub
+        
+      }
+    };
+  }
+  
+  @Override
+  protected TaskHeartbeatHandler createTaskHeartbeatHandler(AppContext context,
+      Configuration conf) {
+    return new TaskHeartbeatHandler(context, maps) {
+
+      @Override
+      public void init(Configuration conf) {
+      }
+
+      @Override
+      public void start() {
+      }
+
+      @Override
+      public void stop() {
+      }
+    };
+  }
+
+  @Override
+  protected ContainerHeartbeatHandler createContainerHeartbeatHandler(
+      AppContext context, Configuration conf) {
+    return new ContainerHeartbeatHandler(context, 1) {
+      @Override
+      public void init(Configuration conf) {
+      }
+
+      @Override
+      public void start() {
+      }
+
+      @Override
+      public void stop() {
+      }
+    };
+  }
+
+  @Override
+  protected EventHandler<JobHistoryEvent> createJobHistoryHandler(
+      AppContext context) {//disable history
+    return new EventHandler<JobHistoryEvent>() {
+      @Override
+      public void handle(JobHistoryEvent event) {
+      }
+    };
+  }
+
+  @Override
+  protected ContainerLauncher createContainerLauncher(AppContext context) {
+    return new MockContainerLauncher();
+  }
+
+  // appAcls and attemptToContainerIdMap shared between various mocks.
+  protected Map<ApplicationAccessType, String> appAcls = new HashMap<ApplicationAccessType, String>();
+  protected Map<TaskAttemptId, ContainerId> attemptToContainerIdMap = new HashMap<TaskAttemptId, ContainerId>();
+  
+  protected class MockContainerLauncher implements ContainerLauncher {
+
+    //We are running locally so set the shuffle port to -1 
+    int shufflePort = -1;
+
+    public MockContainerLauncher() {
+    }
+
+    @Override
+    public void handle(NMCommunicatorEvent event) {
+      switch (event.getType()) {
+      case CONTAINER_LAUNCH_REQUEST:
+        LOG.info("DEBUG: Handling CONTAINER_LAUNCH_REQUEST for: " + event.getContainerId());
+        
+        AMContainer amContainer = getContext().getAllContainers().get(event.getContainerId());
+        TaskAttemptId attemptIdForContainer = amContainer.getQueuedTaskAttempts().iterator().next();
+        // Container Launched.
+        getContext().getEventHandler().handle(
+            new AMContainerEventLaunched(event.getContainerId(), shufflePort));
+        
+        // Simulate a TaskPull from the remote task.
+        getContext().getEventHandler().handle(
+            new AMContainerEvent(event.getContainerId(),
+                AMContainerEventType.C_PULL_TA));
+         
+        // Simulate a TaskAttemptStartedEvent to the TaskAttempt.
+        // Maybe simulate a completed task.
+        getContext().getEventHandler().handle(
+            new TaskAttemptEventStartedRemotely(attemptIdForContainer, event.getContainerId(), appAcls,
+                shufflePort));
+        attemptLaunched(attemptIdForContainer);
+
+        break;
+      case CONTAINER_STOP_REQUEST:
+        ContainerStatus cs = Records.newRecord(ContainerStatus.class);
+        cs.setContainerId(event.getContainerId());
+        getContext().getEventHandler().handle(new AMContainerEventCompleted(cs));
+        break;
+      }
+    }
+  }
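+
+  // The mock launcher above compresses the normal container lifecycle into a
+  // single handle() call: CONTAINER_LAUNCH_REQUEST immediately produces an
+  // AMContainerEventLaunched, a simulated C_PULL_TA from the task, and a
+  // TaskAttemptEventStartedRemotely, while CONTAINER_STOP_REQUEST is answered
+  // with a completed ContainerStatus.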
+
+  protected void attemptLaunched(TaskAttemptId attemptId) {
+    if (autoComplete) {
+      // send the done event
+      getContext().getEventHandler().handle(
+          new TaskAttemptEvent(attemptId, TaskAttemptEventType.TA_DONE));
+    }
+  }
+
+  @Override
+  protected ContainerRequestor createContainerRequestor(
+      ClientService clientService, AppContext appContext) {
+    return new MRAppContainerRequestor(clientService, appContext);
+  }
+  
+  protected class MRAppContainerRequestor extends RMContainerRequestor {
+
+    int numReleaseRequests;
+    
+    public MRAppContainerRequestor(ClientService clientService,
+        AppContext context) {
+      super(clientService, context);
+    }
+    
+    @Override public void init(Configuration conf) {}
+    @Override public void start() {}
+    @Override public void stop() {}
+    //TODO TODO: getApplicationAcls, getJob
+    
+    @Override public void addContainerReq(ContainerRequest req) {}
+    @Override public void decContainerReq(ContainerRequest req) {}
+
+    public void handle(RMCommunicatorEvent rawEvent) {
+      LOG.info("DEBUG: MRAppContainerRequestor handling event of type:" + rawEvent.getType() + ", event: " + rawEvent + ", for containerId: ");
+      switch (rawEvent.getType()) {
+      case CONTAINER_DEALLOCATE:
+        numReleaseRequests++;
+        ContainerStatus cs = Records.newRecord(ContainerStatus.class);
+        cs.setContainerId(((RMCommunicatorContainerDeAllocateRequestEvent)rawEvent).getContainerId());
+        getContext().getEventHandler().handle(new AMContainerEventCompleted(cs));
+        LOG.info("DEBUG: Sending out C_COMPLETE for containerId: " + cs.getContainerId());
+        break;
+      default:
+        LOG.warn("Invalid event of type: " + rawEvent.getType() + ", Event: "
+            + rawEvent);
+        break;
+      }
+    }
+
+    public int getNumReleaseRequests() {
+      return numReleaseRequests;
+    }
+  }
+ 
+  @Override
+  protected ContainerAllocator createAMScheduler(ContainerRequestor requestor,
+      AppContext appContext) {
+    return new MRAppAMScheduler();
+  }
+
+  protected class MRAppAMScheduler extends AbstractService implements
+      ContainerAllocator {
+    private int containerCount;
+    
+    MRAppAMScheduler() {
+      super(MRAppAMScheduler.class.getSimpleName());
+    }
+    
+    public void start() {}
+    public void init(Configuration conf) {}
+    public void stop() {}
+    
+    private NodeId getUsableNode() {
+      NodeId nodeId = BuilderUtils.newNodeId(NM_HOST, currentNmPort);
+      getContext().getAllNodes().nodeSeen(nodeId);
+      if (getContext().getAllNodes().get(nodeId).getState() != AMNodeState.ACTIVE) {
+        LOG.info("Current node is not ACTIVE. Creating a new one");
+        currentNmPort++;
+        nodeId = BuilderUtils.newNodeId(NM_HOST, currentNmPort);
+        getContext().getAllNodes().nodeSeen(nodeId);
+        if (getContext().getAllNodes().get(nodeId).getState() != AMNodeState.ACTIVE) {
+          throw new YarnException("Failed to find a usable nodeId");
+        }
+        LOG.info("Created new nodeId: " + nodeId);
+      }
+      return nodeId;
+    }
+
+    @Override
+    public void handle(AMSchedulerEvent rawEvent) {
+      LOG.info("DEBUG: MRAppAMScheduler handling event of type:" + rawEvent.getType() + ", event: " + rawEvent);
+      switch (rawEvent.getType()) {
+      case S_TA_LAUNCH_REQUEST:
+        AMSchedulerTALaunchRequestEvent lEvent = (AMSchedulerTALaunchRequestEvent)rawEvent;
+        
+        // Wait for the node to be healthy before simulating a container allocation.
+        NodeId nodeId = getUsableNode();
+        
+        ContainerId cId = Records.newRecord(ContainerId.class);
+        cId.setApplicationAttemptId(getContext().getApplicationAttemptId());
+        cId.setId(containerCount++);
+        
+        Container container = BuilderUtils.newContainer(cId, nodeId,
+            NM_HOST + ":" + NM_HTTP_PORT, null, null, null);
+        getContext().getAllContainers().addContainerIfNew(container);
+        
+        getContext().getEventHandler().handle(
+            new AMNodeEventContainerAllocated(container.getNodeId(),
+                container.getId()));
+        
+        JobID id = TypeConverter.fromYarn(applicationId);
+        JobId jobId = TypeConverter.toYarn(id);
+        getContext().getEventHandler().handle(new JobHistoryEvent(jobId, 
+            new NormalizedResourceEvent(TaskType.REDUCE, 100)));
+        getContext().getEventHandler().handle(new JobHistoryEvent(jobId, 
+            new NormalizedResourceEvent(TaskType.MAP, 100)));
+        
+        attemptToContainerIdMap.put(lEvent.getAttemptID(), cId);
+        if (getContext().getAllContainers().get(cId).getState() == AMContainerState.ALLOCATED) {
+          LOG.info("DEBUG: Sending launch request for container: " + cId
+              + " for taskAttemptId: " + lEvent.getAttemptID());
+          AMContainerEventLaunchRequest lrEvent = new AMContainerEventLaunchRequest(
+              cId, jobId, lEvent.getAttemptID().getTaskId().getTaskType(),
+              lEvent.getJobToken(), lEvent.getCredentials(), false,
+              new JobConf(getContext().getJob(jobId).getConf()));
+          getContext().getEventHandler().handle(lrEvent);
+        }
+        LOG.info("DEBUG: Assigning attempt [" + lEvent.getAttemptID()
+            + "] to Container [" + cId + "]");
+        getContext().getEventHandler().handle(
+            new AMContainerEventAssignTA(cId, lEvent.getAttemptID(), lEvent
+                .getRemoteTaskContext()));
+
+        break;
+      case S_TA_ENDED:
+        // Send out a Container_stop_request.
+        AMSchedulerEventTAEnded sEvent = (AMSchedulerEventTAEnded) rawEvent;
+        LOG.info("DEBUG: Handling S_TA_ENDED for attemptId:"
+            + sEvent.getAttemptID() + " with state: " + sEvent.getState());
+        switch (sEvent.getState()) {
+        case FAILED:
+        case KILLED:
+          getContext().getEventHandler().handle(
+              new AMContainerEvent(attemptToContainerIdMap.get(sEvent
+                  .getAttemptID()), AMContainerEventType.C_STOP_REQUEST));
+          break;
+        case SUCCEEDED:
+          // No re-use in MRApp. Stop the container.
+          getContext().getEventHandler().handle(
+              new AMContainerEvent(attemptToContainerIdMap.get(sEvent
+                  .getAttemptID()), AMContainerEventType.C_STOP_REQUEST));
+          break;
+        default:
+          throw new YarnException("Unexpected state: " + sEvent.getState());
+        }
+        break;
+      case S_CONTAINERS_ALLOCATED:
+        break;
+      case S_CONTAINER_COMPLETED:
+        break;
+      default:
+          break;
+      }
+    }
+  }
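+
+  // In summary, MRAppAMScheduler fabricates one container per
+  // S_TA_LAUNCH_REQUEST on the current simulated node, registers it, issues a
+  // launch request if the container is still ALLOCATED, and assigns the
+  // attempt; on S_TA_ENDED the container is always stopped, since MRApp does
+  // not reuse containers.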
+
+  @Override
+  protected TaskCleaner createTaskCleaner(AppContext context) {
+    return new TaskCleaner() {
+      @Override
+      public void handle(TaskCleanupEvent event) {
+      }
+    };
+  }
+
+  @Override
+  protected ClientService createClientService(AppContext context) {
+    return new ClientService(){
+      @Override
+      public InetSocketAddress getBindAddress() {
+        return NetUtils.createSocketAddr("localhost:9876");
+      }
+
+      @Override
+      public int getHttpPort() {
+        return -1;
+      }
+    };
+  }
+
+  public void setClusterInfo(ClusterInfo clusterInfo) {
+    // Only useful if set before a job is started.
+    if (getServiceState() == Service.STATE.NOTINITED
+        || getServiceState() == Service.STATE.INITED) {
+      this.clusterInfo = clusterInfo;
+    } else {
+      throw new IllegalStateException(
+          "ClusterInfo can only be set before the App is STARTED");
+    }
+  }
+
+  class TestJob extends JobImpl {
+    //override the init transition
+    private final TestInitTransition initTransition = new TestInitTransition(
+        maps, reduces);
+    StateMachineFactory<JobImpl, JobStateInternal, JobEventType, JobEvent> localFactory
+        = stateMachineFactory.addTransition(JobStateInternal.NEW,
+            EnumSet.of(JobStateInternal.INITED, JobStateInternal.FAILED),
+            JobEventType.JOB_INIT,
+            // This is abusive.
+            initTransition);
+
+    private final StateMachine<JobStateInternal, JobEventType, JobEvent>
+        localStateMachine;
+
+    @Override
+    protected StateMachine<JobStateInternal, JobEventType, JobEvent> getStateMachine() {
+      return localStateMachine;
+    }
+
+    @SuppressWarnings("rawtypes")
+    public TestJob(JobId jobId, ApplicationAttemptId applicationAttemptId,
+        Configuration conf, EventHandler eventHandler,
+        TaskAttemptListener taskAttemptListener,  Clock clock,
+        OutputCommitter committer, boolean newApiCommitter, String user,
+        TaskHeartbeatHandler thh, AppContext appContext) {
+      super(jobId, getApplicationAttemptId(applicationId, getStartCount()),
+          conf, eventHandler, taskAttemptListener,
+          new JobTokenSecretManager(), new Credentials(), clock,
+          getCompletedTaskFromPreviousRun(), metrics, committer,
+          newApiCommitter, user, System.currentTimeMillis(), getAllAMInfos(),
+          thh, appContext);
+
+      // This "this leak" is okay because the retained pointer is in an
+      //  instance variable.
+      localStateMachine = localFactory.make(this);
+    }
+  }
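+
+  // TestJob only swaps the JOB_INIT transition: localFactory installs the
+  // TestInitTransition below so that initialization succeeds without real
+  // split metadata or a job conf file on disk.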
+
+  //Override InitTransition to not look for split files etc
+  static class TestInitTransition extends JobImpl.InitTransition {
+    private int maps;
+    private int reduces;
+    TestInitTransition(int maps, int reduces) {
+      this.maps = maps;
+      this.reduces = reduces;
+    }
+    @Override
+    protected void setup(JobImpl job) throws IOException {
+      super.setup(job);
+      job.conf.setInt(MRJobConfig.NUM_REDUCES, reduces);
+      job.remoteJobConfFile = new Path("test");
+    }
+    @Override
+    protected TaskSplitMetaInfo[] createSplits(JobImpl job, JobId jobId) {
+      TaskSplitMetaInfo[] splits = new TaskSplitMetaInfo[maps];
+      for (int i = 0; i < maps ; i++) {
+        splits[i] = new TaskSplitMetaInfo();
+      }
+      return splits;
+    }
+  }
+  
+
+  private TaskAttemptStatus createTaskAttemptStatus(TaskAttemptId taskAttemptId,
+      TaskAttemptState finalState) {
+    TaskAttemptStatus tas = new TaskAttemptStatus();
+    tas.id = taskAttemptId;
+    tas.progress = 1.0f;
+    tas.phase = Phase.CLEANUP;
+    tas.stateString = finalState.name();
+    tas.taskState = finalState;
+    Counters counters = new Counters();
+    tas.counters = counters;
+    return tas;
+  }
+
+  private void sendStatusUpdate(TaskAttemptId taskAttemptId,
+      TaskAttemptState finalState) {
+    TaskAttemptStatus tas = createTaskAttemptStatus(taskAttemptId, finalState);
+    getContext().getEventHandler().handle(
+        new TaskAttemptEventStatusUpdate(taskAttemptId, tas));
+  }
+
+  /*
+   * Helper method to move a task attempt into a final state.
+   */
+  // TODO maybe rename to something like succeedTaskAttempt
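+  // Illustrative call (sketch): sendFinishToTaskAttempt(attemptId,
+  // TaskAttemptState.SUCCEEDED, true) pushes a final status update and then
+  // TA_DONE to drive the attempt to completion.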
+  public void sendFinishToTaskAttempt(TaskAttemptId taskAttemptId,
+      TaskAttemptState finalState, boolean sendStatusUpdate) throws Exception {
+    if (sendStatusUpdate) {
+      sendStatusUpdate(taskAttemptId, finalState);
+    }
+    if (finalState == TaskAttemptState.SUCCEEDED) {
+      getContext().getEventHandler().handle(
+          new TaskAttemptEvent(taskAttemptId,
+              TaskAttemptEventType.TA_DONE));
+    } else if (finalState == TaskAttemptState.KILLED) {
+      getContext().getEventHandler()
+          .handle(new TaskAttemptEventKillRequest(taskAttemptId,
+                  "Kill requested"));
+    } else if (finalState == TaskAttemptState.FAILED) {
+      getContext().getEventHandler().handle(
+          new TaskAttemptEventFailRequest(taskAttemptId, null));
+    }
+  }
+}
+ 

Added: incubator/tez/tez-yarn-application/src/test/java/org/apache/hadoop/mapreduce/v2/app2/MRAppBenchmark.java
URL: http://svn.apache.org/viewvc/incubator/tez/tez-yarn-application/src/test/java/org/apache/hadoop/mapreduce/v2/app2/MRAppBenchmark.java?rev=1457129&view=auto
==============================================================================
--- incubator/tez/tez-yarn-application/src/test/java/org/apache/hadoop/mapreduce/v2/app2/MRAppBenchmark.java (added)
+++ incubator/tez/tez-yarn-application/src/test/java/org/apache/hadoop/mapreduce/v2/app2/MRAppBenchmark.java Fri Mar 15 21:26:36 2013
@@ -0,0 +1,336 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.mapreduce.v2.app2;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapreduce.JobID;
+import org.apache.hadoop.mapreduce.TypeConverter;
+import org.apache.hadoop.mapreduce.v2.api.records.JobId;
+import org.apache.hadoop.mapreduce.v2.api.records.JobState;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
+import org.apache.hadoop.mapreduce.v2.app2.client.ClientService;
+import org.apache.hadoop.mapreduce.v2.app2.job.Job;
+import org.apache.hadoop.mapreduce.v2.app2.rm.AMSchedulerEvent;
+import org.apache.hadoop.mapreduce.v2.app2.rm.AMSchedulerEventTAEnded;
+import org.apache.hadoop.mapreduce.v2.app2.rm.AMSchedulerTALaunchRequestEvent;
+import org.apache.hadoop.mapreduce.v2.app2.rm.ContainerAllocator;
+import org.apache.hadoop.mapreduce.v2.app2.rm.ContainerRequestor;
+import org.apache.hadoop.mapreduce.v2.app2.rm.RMContainerAllocator;
+import org.apache.hadoop.mapreduce.v2.app2.rm.RMContainerRequestor;
+import org.apache.hadoop.mapreduce.v2.app2.rm.container.AMContainerEventAssignTA;
+import org.apache.hadoop.mapreduce.v2.app2.rm.container.AMContainerEvent;
+import org.apache.hadoop.mapreduce.v2.app2.rm.container.AMContainerEventType;
+import org.apache.hadoop.mapreduce.v2.app2.rm.container.AMContainerEventLaunchRequest;
+import org.apache.hadoop.mapreduce.v2.app2.rm.container.AMContainerState;
+import org.apache.hadoop.yarn.YarnException;
+import org.apache.hadoop.yarn.api.AMRMProtocol;
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
+import org.apache.hadoop.yarn.api.records.AMResponse;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
+import org.apache.hadoop.yarn.service.AbstractService;
+import org.apache.hadoop.yarn.util.BuilderUtils;
+import org.apache.hadoop.yarn.util.Records;
+import org.apache.log4j.Level;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+import org.junit.Test;
+
+public class MRAppBenchmark {
+
+  /**
+   * Runs a memory and time benchmark with the mock MRApp.
+   */
+  public void run(MRApp app) throws Exception {
+    Logger rootLogger = LogManager.getRootLogger();
+    rootLogger.setLevel(Level.WARN);
+    long startTime = System.currentTimeMillis();
+    Job job = app.submit(new Configuration());
+    while (!job.getReport().getJobState().equals(JobState.SUCCEEDED)) {
+      printStat(job, startTime);
+      Thread.sleep(2000);
+    }
+    printStat(job, startTime);
+  }
+
+  private void printStat(Job job, long startTime) throws Exception {
+    long currentTime = System.currentTimeMillis();
+    Runtime.getRuntime().gc();
+    long mem = Runtime.getRuntime().totalMemory() 
+      - Runtime.getRuntime().freeMemory();
+    System.out.println("JobState:" + job.getState() +
+        " CompletedMaps:" + job.getCompletedMaps() +
+        " CompletedReduces:" + job.getCompletedReduces() +
+        " Memory(total-free)(KB):" + mem/1024 +
+        " ElapsedTime(ms):" + (currentTime - startTime));
+  }
+
+  //Throttles the maximum number of concurrent running tasks.
+  //This affects the memory requirement since 
+  //org.apache.hadoop.mapred.MapTask/ReduceTask is loaded in memory for all
+//running tasks and discarded once the task is launched.
+  static class ThrottledMRApp extends MRApp {
+
+    int maxConcurrentRunningTasks;
+    volatile int concurrentRunningTasks;
+    ThrottledMRApp(int maps, int reduces, int maxConcurrentRunningTasks) {
+      super(maps, reduces, true, "ThrottledMRApp", true);
+      this.maxConcurrentRunningTasks = maxConcurrentRunningTasks;
+    }
+    
+    @Override
+    protected void attemptLaunched(TaskAttemptId attemptID) {
+      super.attemptLaunched(attemptID);
+      //the task is launched and sends done immediately
+      concurrentRunningTasks--;
+    }
+    
+    @Override
+    protected ContainerAllocator createAMScheduler(ContainerRequestor requestor,
+        AppContext appContext) {
+      return new ThrottledContainerAllocator();
+    }
+    
+    class ThrottledContainerAllocator extends AbstractService 
+        implements ContainerAllocator {
+      private int containerCount;
+      private Thread thread;
+      private BlockingQueue<AMSchedulerEvent> eventQueue =
+        new LinkedBlockingQueue<AMSchedulerEvent>();
+      public ThrottledContainerAllocator() {
+        super("ThrottledContainerAllocator");
+      }
+      @Override
+      public void handle(AMSchedulerEvent event) {
+        try {
+          eventQueue.put(event);
+        } catch (InterruptedException e) {
+          throw new YarnException(e);
+        }
+      }
+      @Override
+      public void start() {
+        thread = new Thread(new Runnable() {
+          @SuppressWarnings("unchecked")
+          @Override
+          public void run() {
+            AMSchedulerEvent event = null;
+            while (!Thread.currentThread().isInterrupted()) {
+              try {
+                if (concurrentRunningTasks < maxConcurrentRunningTasks) {
+                  event = eventQueue.take();
+                  switch(event.getType()) {
+                  case S_TA_LAUNCH_REQUEST:
+                    AMSchedulerTALaunchRequestEvent lEvent = (AMSchedulerTALaunchRequestEvent)event;
+                    ContainerId cId = Records.newRecord(ContainerId.class);
+                    cId.setApplicationAttemptId(getContext().getApplicationAttemptId());
+                    cId.setId(containerCount++);
+                    NodeId nodeId = BuilderUtils.newNodeId(NM_HOST, NM_PORT);
+                    Container container = BuilderUtils.newContainer(cId, nodeId,
+                        NM_HOST + ":" + NM_HTTP_PORT, null, null, null);
+                    
+                    getContext().getAllContainers().addContainerIfNew(container);
+                    getContext().getAllNodes().nodeSeen(nodeId);
+                    
+                    JobID id = TypeConverter.fromYarn(getContext().getApplicationID());
+                    JobId jobId = TypeConverter.toYarn(id);
+                    
+                    attemptToContainerIdMap.put(lEvent.getAttemptID(), cId);
+                    if (getContext().getAllContainers().get(cId).getState() == AMContainerState.ALLOCATED) {
+                    
+                      AMContainerEventLaunchRequest lrEvent = new AMContainerEventLaunchRequest(
+                          cId, jobId, lEvent.getAttemptID().getTaskId().getTaskType(),
+                          lEvent.getJobToken(), lEvent.getCredentials(), false,
+                          new JobConf(getContext().getJob(jobId).getConf()));
+                      getContext().getEventHandler().handle(lrEvent);
+                    }
+                    
+                    getContext().getEventHandler().handle(
+                        new AMContainerEventAssignTA(cId, lEvent.getAttemptID(), lEvent
+                            .getRemoteTaskContext()));
+                    concurrentRunningTasks++;
+                    break;
+                    
+                  case S_TA_ENDED:
+                    // Send out a Container_stop_request.
+                    AMSchedulerEventTAEnded sEvent = (AMSchedulerEventTAEnded) event;
+                    switch (sEvent.getState()) {
+                    case FAILED:
+                    case KILLED:
+                      getContext().getEventHandler().handle(
+                          new AMContainerEvent(attemptToContainerIdMap.remove(sEvent
+                              .getAttemptID()), AMContainerEventType.C_STOP_REQUEST));
+                      break;
+                    case SUCCEEDED:
+                      // No re-use in MRApp. Stop the container.
+                      getContext().getEventHandler().handle(
+                          new AMContainerEvent(attemptToContainerIdMap.remove(sEvent
+                              .getAttemptID()), AMContainerEventType.C_STOP_REQUEST));
+                      break;
+                    default:
+                      throw new YarnException("Unexpected state: " + sEvent.getState());
+                    }
+                    break;
+                  case S_CONTAINERS_ALLOCATED:
+                    break;
+                  case S_CONTAINER_COMPLETED:
+                    break;
+                  default:
+                      break;
+                  }
+                } else {
+                  Thread.sleep(1000);
+                }
+              } catch (InterruptedException e) {
+                System.out.println("Returning, interrupted");
+                return;
+              }
+            }
+          }
+        });
+        thread.start();
+        super.start();
+      }
+
+      @Override
+      public void stop() {
+        thread.interrupt();
+        super.stop();
+      }
+    }
+  }
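+
+  // Throttling bookkeeping: concurrentRunningTasks is incremented when an
+  // attempt is assigned to a container and decremented in attemptLaunched()
+  // (tasks auto-complete), so the allocator thread stalls in Thread.sleep()
+  // whenever the limit is reached.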
+
+  @Test
+  public void benchmark1() throws Exception {
+    int maps = 100; // Adjust for benchmarking. Start with thousands.
+    int reduces = 0;
+    System.out.println("Running benchmark with maps:"+maps +
+        " reduces:"+reduces);
+    run(new MRApp(maps, reduces, true, this.getClass().getName(), true) {
+
+      @Override
+      protected ContainerAllocator createAMScheduler(
+          ContainerRequestor requestor, AppContext appContext) {
+        return new RMContainerAllocator((RMContainerRequestor) requestor,
+            appContext);
+      }
+
+      @Override
+      protected ContainerRequestor createContainerRequestor(
+          ClientService clientService, AppContext appContext) {
+        return new RMContainerRequestor(clientService, appContext) {
+          @Override
+          protected AMRMProtocol createSchedulerProxy() {
+            return new AMRMProtocol() {
+
+              @Override
+              public RegisterApplicationMasterResponse
+                  registerApplicationMaster(
+                      RegisterApplicationMasterRequest request)
+                      throws YarnRemoteException {
+                RegisterApplicationMasterResponse response =
+                    Records.newRecord(RegisterApplicationMasterResponse.class);
+                response.setMinimumResourceCapability(BuilderUtils
+                  .newResource(1024, 1));
+                response.setMaximumResourceCapability(BuilderUtils
+                  .newResource(10240, 1));
+                return response;
+              }
+
+              @Override
+              public FinishApplicationMasterResponse finishApplicationMaster(
+                  FinishApplicationMasterRequest request)
+                  throws YarnRemoteException {
+                FinishApplicationMasterResponse response =
+                    Records.newRecord(FinishApplicationMasterResponse.class);
+                return response;
+              }
+
+              @Override
+              public AllocateResponse allocate(AllocateRequest request)
+                  throws YarnRemoteException {
+
+                AllocateResponse response =
+                    Records.newRecord(AllocateResponse.class);
+                List<ResourceRequest> askList = request.getAskList();
+                List<Container> containers = new ArrayList<Container>();
+                for (ResourceRequest req : askList) {
+                  if (!"*".equals(req.getHostName())) {
+                    continue;
+                  }
+                  int numContainers = req.getNumContainers();
+                  for (int i = 0; i < numContainers; i++) {
+                    ContainerId containerId =
+                        BuilderUtils.newContainerId(
+                          request.getApplicationAttemptId(),
+                          request.getResponseId() + i);
+                    containers.add(BuilderUtils
+                      .newContainer(containerId, BuilderUtils.newNodeId("host"
+                          + containerId.getId(), 2345),
+                        "host" + containerId.getId() + ":5678", req
+                          .getCapability(), req.getPriority(), null));
+                  }
+                }
+
+                AMResponse amResponse = Records.newRecord(AMResponse.class);
+                amResponse.setAllocatedContainers(containers);
+                amResponse.setResponseId(request.getResponseId() + 1);
+                response.setAMResponse(amResponse);
+                response.setNumClusterNodes(350);
+                return response;
+              }
+            };
+          }
+        };
+      }
+    });
+  }
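+
+  // The mocked AMRMProtocol above grants every any-host ("*") ask with
+  // fabricated containers on synthetic hosts, so benchmark1 exercises only
+  // the AM-side request/allocate/assign machinery, never a real
+  // ResourceManager.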
+
+  @Test
+  public void benchmark2() throws Exception {
+    int maps = 100; // Adjust for benchmarking; start with a couple of thousand.
+    int reduces = 50;
+    int maxConcurrentRunningTasks = 500;
+    
+    System.out.println("Running benchmark with throttled running tasks with " +
+        "maxConcurrentRunningTasks:" + maxConcurrentRunningTasks +
+        " maps:" + maps + " reduces:" + reduces);
+    run(new ThrottledMRApp(maps, reduces, maxConcurrentRunningTasks));
+  }
+
+  public static void main(String[] args) throws Exception {
+    MRAppBenchmark benchmark = new MRAppBenchmark();
+    benchmark.benchmark1();
+    benchmark.benchmark2();
+  }
+
+}

Added: incubator/tez/tez-yarn-application/src/test/java/org/apache/hadoop/mapreduce/v2/app2/MockJobs.java
URL: http://svn.apache.org/viewvc/incubator/tez/tez-yarn-application/src/test/java/org/apache/hadoop/mapreduce/v2/app2/MockJobs.java?rev=1457129&view=auto
==============================================================================
--- incubator/tez/tez-yarn-application/src/test/java/org/apache/hadoop/mapreduce/v2/app2/MockJobs.java (added)
+++ incubator/tez/tez-yarn-application/src/test/java/org/apache/hadoop/mapreduce/v2/app2/MockJobs.java Fri Mar 15 21:26:36 2013
@@ -0,0 +1,636 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.v2.app2;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileContext;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.JobACLsManager;
+import org.apache.hadoop.mapred.ShuffleHandler;
+import org.apache.hadoop.mapreduce.Counters;
+import org.apache.hadoop.mapreduce.FileSystemCounter;
+import org.apache.hadoop.mapreduce.JobACL;
+import org.apache.hadoop.mapreduce.JobCounter;
+import org.apache.hadoop.mapreduce.MRConfig;
+import org.apache.hadoop.mapreduce.TaskCounter;
+import org.apache.hadoop.mapreduce.TypeConverter;
+import org.apache.hadoop.mapreduce.v2.api.records.AMInfo;
+import org.apache.hadoop.mapreduce.v2.api.records.JobId;
+import org.apache.hadoop.mapreduce.v2.api.records.JobReport;
+import org.apache.hadoop.mapreduce.v2.api.records.JobState;
+import org.apache.hadoop.mapreduce.v2.api.records.Phase;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptCompletionEvent;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptReport;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptState;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskReport;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskState;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
+import org.apache.hadoop.mapreduce.v2.app2.job.Job;
+import org.apache.hadoop.mapreduce.v2.app2.job.Task;
+import org.apache.hadoop.mapreduce.v2.app2.job.TaskAttempt;
+import org.apache.hadoop.mapreduce.v2.app2.job.impl.JobImpl;
+import org.apache.hadoop.mapreduce.v2.util.MRBuilderUtils;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.authorize.AccessControlList;
+import org.apache.hadoop.yarn.MockApps;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.util.BuilderUtils;
+import org.apache.hadoop.yarn.util.Records;
+
+import com.google.common.collect.Iterators;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+
+public class MockJobs extends MockApps {
+  static final Iterator<JobState> JOB_STATES = Iterators.cycle(JobState
+      .values());
+  static final Iterator<TaskState> TASK_STATES = Iterators.cycle(TaskState
+      .values());
+  static final Iterator<TaskAttemptState> TASK_ATTEMPT_STATES = Iterators
+      .cycle(TaskAttemptState.values());
+  static final Iterator<TaskType> TASK_TYPES = Iterators.cycle(TaskType
+      .values());
+  static final Iterator<JobCounter> JOB_COUNTERS = Iterators.cycle(JobCounter
+      .values());
+  static final Iterator<FileSystemCounter> FS_COUNTERS = Iterators
+      .cycle(FileSystemCounter.values());
+  static final Iterator<TaskCounter> TASK_COUNTERS = Iterators
+      .cycle(TaskCounter.values());
+  static final Iterator<String> FS_SCHEMES = Iterators.cycle("FILE", "HDFS",
+      "LAFS", "CEPH");
+  static final Iterator<String> USER_COUNTER_GROUPS = Iterators
+      .cycle(
+          "com.company.project.subproject.component.subcomponent.UserDefinedSpecificSpecialTask$Counters",
+          "PigCounters");
+  static final Iterator<String> USER_COUNTERS = Iterators.cycle("counter1",
+      "counter2", "counter3");
+  static final Iterator<Phase> PHASES = Iterators.cycle(Phase.values());
+  static final Iterator<String> DIAGS = Iterators.cycle(
+      "Error: java.lang.OutOfMemoryError: Java heap space",
+      "Lost task tracker: tasktracker.domain/127.0.0.1:40879");
+
+  public static final String NM_HOST = "localhost";
+  public static final int NM_PORT = 1234;
+  public static final int NM_HTTP_PORT = 8042;
+
+  static final int DT = 1000000; // ms
+
+  public static String newJobName() {
+    return newAppName();
+  }
+
+  /**
+   * Creates numJobs jobs in a map, with each job's ApplicationId derived from
+   * the same index as its JobId.
+   */
+  public static Map<JobId, Job> newJobs(int numJobs, int numTasksPerJob,
+      int numAttemptsPerTask) {
+    Map<JobId, Job> map = Maps.newHashMap();
+    for (int j = 0; j < numJobs; ++j) {
+      ApplicationId appID = MockJobs.newAppID(j);
+      Job job = newJob(appID, j, numTasksPerJob, numAttemptsPerTask);
+      map.put(job.getID(), job);
+    }
+    return map;
+  }
+  
+  public static Map<JobId, Job> newJobs(ApplicationId appID, int numJobsPerApp,
+      int numTasksPerJob, int numAttemptsPerTask) {
+    Map<JobId, Job> map = Maps.newHashMap();
+    for (int j = 0; j < numJobsPerApp; ++j) {
+      Job job = newJob(appID, j, numTasksPerJob, numAttemptsPerTask);
+      map.put(job.getID(), job);
+    }
+    return map;
+  }
+  
+  public static Map<JobId, Job> newJobs(ApplicationId appID, int numJobsPerApp,
+      int numTasksPerJob, int numAttemptsPerTask, boolean hasFailedTasks) {
+    Map<JobId, Job> map = Maps.newHashMap();
+    for (int j = 0; j < numJobsPerApp; ++j) {
+      Job job = newJob(appID, j, numTasksPerJob, numAttemptsPerTask, null,
+          hasFailedTasks);
+      map.put(job.getID(), job);
+    }
+    return map;
+  }
+
+  public static JobId newJobID(ApplicationId appID, int i) {
+    JobId id = Records.newRecord(JobId.class);
+    id.setAppId(appID);
+    id.setId(i);
+    return id;
+  }
+
+  public static JobReport newJobReport(JobId id) {
+    JobReport report = Records.newRecord(JobReport.class);
+    report.setJobId(id);
+    report
+        .setStartTime(System.currentTimeMillis() - (int) (Math.random() * DT));
+    report.setFinishTime(System.currentTimeMillis()
+        + (int) (Math.random() * DT) + 1);
+    report.setMapProgress((float) Math.random());
+    report.setReduceProgress((float) Math.random());
+    report.setJobState(JOB_STATES.next());
+    return report;
+  }
+
+  public static TaskReport newTaskReport(TaskId id) {
+    TaskReport report = Records.newRecord(TaskReport.class);
+    report.setTaskId(id);
+    report
+        .setStartTime(System.currentTimeMillis() - (int) (Math.random() * DT));
+    report.setFinishTime(System.currentTimeMillis()
+        + (int) (Math.random() * DT) + 1);
+    report.setProgress((float) Math.random());
+    report.setCounters(TypeConverter.toYarn(newCounters()));
+    report.setTaskState(TASK_STATES.next());
+    return report;
+  }
+
+  public static TaskAttemptReport newTaskAttemptReport(TaskAttemptId id) {
+    TaskAttemptReport report = Records.newRecord(TaskAttemptReport.class);
+    report.setTaskAttemptId(id);
+    report
+        .setStartTime(System.currentTimeMillis() - (int) (Math.random() * DT));
+    report.setFinishTime(System.currentTimeMillis()
+        + (int) (Math.random() * DT) + 1);
+    report.setPhase(PHASES.next());
+    report.setTaskAttemptState(TASK_ATTEMPT_STATES.next());
+    report.setProgress((float) Math.random());
+    report.setCounters(TypeConverter.toYarn(newCounters()));
+    return report;
+  }
+
+  public static Counters newCounters() {
+    Counters hc = new Counters();
+    for (JobCounter c : JobCounter.values()) {
+      hc.findCounter(c).setValue((long) (Math.random() * 1000));
+    }
+    for (TaskCounter c : TaskCounter.values()) {
+      hc.findCounter(c).setValue((long) (Math.random() * 1000));
+    }
+    int nc = FileSystemCounter.values().length * 4;
+    for (int i = 0; i < nc; ++i) {
+      for (FileSystemCounter c : FileSystemCounter.values()) {
+        hc.findCounter(FS_SCHEMES.next(), c).setValue(
+            (long) (Math.random() * DT));
+      }
+    }
+    for (int i = 0; i < 2 * 3; ++i) {
+      hc.findCounter(USER_COUNTER_GROUPS.next(), USER_COUNTERS.next())
+          .setValue((long) (Math.random() * 100000));
+    }
+    return hc;
+  }
+
+  public static Map<TaskAttemptId, TaskAttempt> newTaskAttempts(TaskId tid,
+      int m) {
+    Map<TaskAttemptId, TaskAttempt> map = Maps.newHashMap();
+    for (int i = 0; i < m; ++i) {
+      TaskAttempt ta = newTaskAttempt(tid, i);
+      map.put(ta.getID(), ta);
+    }
+    return map;
+  }
+
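+  /**
+   * Creates a mock TaskAttempt whose report is randomized via
+   * newTaskAttemptReport; the timing getters return fixed placeholder values.
+   */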
+  public static TaskAttempt newTaskAttempt(TaskId tid, int i) {
+    final TaskAttemptId taid = Records.newRecord(TaskAttemptId.class);
+    taid.setTaskId(tid);
+    taid.setId(i);
+    final TaskAttemptReport report = newTaskAttemptReport(taid);
+    final List<String> diags = Lists.newArrayList();
+    diags.add(DIAGS.next());
+    return new TaskAttempt() {
+      @Override
+      public NodeId getNodeId() throws UnsupportedOperationException {
+        throw new UnsupportedOperationException();
+      }
+      
+      @Override
+      public TaskAttemptId getID() {
+        return taid;
+      }
+
+      @Override
+      public TaskAttemptReport getReport() {
+        return report;
+      }
+
+      @Override
+      public long getLaunchTime() {
+        return 0;
+      }
+
+      @Override
+      public long getFinishTime() {
+        return 0;
+      }
+
+      @Override
+      public int getShufflePort() {
+        return ShuffleHandler.DEFAULT_SHUFFLE_PORT;
+      }
+
+      @Override
+      public Counters getCounters() {
+        if (report != null && report.getCounters() != null) {
+          return new Counters(TypeConverter.fromYarn(report.getCounters()));
+        }
+        return null;
+      }
+
+      @Override
+      public float getProgress() {
+        return report.getProgress();
+      }
+
+      @Override
+      public TaskAttemptState getState() {
+        return report.getTaskAttemptState();
+      }
+
+      @Override
+      public boolean isFinished() {
+        switch (report.getTaskAttemptState()) {
+        case SUCCEEDED:
+        case FAILED:
+        case KILLED:
+          return true;
+        }
+        return false;
+      }
+
+      @Override
+      public ContainerId getAssignedContainerID() {
+        ContainerId id = Records.newRecord(ContainerId.class);
+        ApplicationAttemptId appAttemptId = Records
+            .newRecord(ApplicationAttemptId.class);
+        appAttemptId.setApplicationId(taid.getTaskId().getJobId().getAppId());
+        appAttemptId.setAttemptId(0);
+        id.setApplicationAttemptId(appAttemptId);
+        return id;
+      }
+
+      @Override
+      public String getNodeHttpAddress() {
+        return "localhost:8042";
+      }
+
+      @Override
+      public List<String> getDiagnostics() {
+        return diags;
+      }
+
+      @Override
+      public String getAssignedContainerMgrAddress() {
+        return "localhost:9998";
+      }
+
+      @Override
+      public long getShuffleFinishTime() {
+        return 0;
+      }
+
+      @Override
+      public long getSortFinishTime() {
+        return 0;
+      }
+
+      @Override
+      public String getNodeRackName() {
+        return "/default-rack";
+      }
+    };
+  }
+
+  public static Map<TaskId, Task> newTasks(JobId jid, int n, int m, boolean hasFailedTasks) {
+    Map<TaskId, Task> map = Maps.newHashMap();
+    for (int i = 0; i < n; ++i) {
+      Task task = newTask(jid, i, m, hasFailedTasks);
+      map.put(task.getID(), task);
+    }
+    return map;
+  }
+
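+  /**
+   * Creates a mock Task with m attempts. When hasFailedTasks is true the task
+   * reports null counters, so it is skipped by the aggregation in
+   * getCounters(Collection).
+   */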
+  public static Task newTask(JobId jid, int i, int m, final boolean hasFailedTasks) {
+    final TaskId tid = Records.newRecord(TaskId.class);
+    tid.setJobId(jid);
+    tid.setId(i);
+    tid.setTaskType(TASK_TYPES.next());
+    final TaskReport report = newTaskReport(tid);
+    final Map<TaskAttemptId, TaskAttempt> attempts = newTaskAttempts(tid, m);
+    return new Task() {
+      @Override
+      public TaskId getID() {
+        return tid;
+      }
+
+      @Override
+      public TaskReport getReport() {
+        return report;
+      }
+
+      @Override
+      public Counters getCounters() {
+        if (hasFailedTasks) {
+          return null;
+        }
+        return new Counters(
+          TypeConverter.fromYarn(report.getCounters()));
+      }
+
+      @Override
+      public float getProgress() {
+        return report.getProgress();
+      }
+
+      @Override
+      public TaskType getType() {
+        return tid.getTaskType();
+      }
+
+      @Override
+      public Map<TaskAttemptId, TaskAttempt> getAttempts() {
+        return attempts;
+      }
+
+      @Override
+      public TaskAttempt getAttempt(TaskAttemptId attemptID) {
+        return attempts.get(attemptID);
+      }
+
+      @Override
+      public boolean isFinished() {
+        switch (report.getTaskState()) {
+        case SUCCEEDED:
+        case KILLED:
+        case FAILED:
+          return true;
+        }
+        return false;
+      }
+
+      @Override
+      public boolean canCommit(TaskAttemptId taskAttemptID) {
+        return false;
+      }
+
+      @Override
+      public TaskState getState() {
+        return report.getTaskState();
+      }
+
+      @Override
+      public boolean needsWaitAfterOutputConsumable() {
+        return false;
+      }
+
+      @Override
+      public TaskAttemptId getOutputConsumableAttempt() {
+        // TODO Auto-generated method stub
+        return null;
+      }
+    };
+  }
+
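+  /**
+   * Aggregates counters across all tasks that report non-null counters.
+   */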
+  public static Counters getCounters(Collection<Task> tasks) {
+    List<Task> completedTasks = new ArrayList<Task>();
+    for (Task task : tasks) {
+      if (task.getCounters() != null) {
+        completedTasks.add(task);
+      }
+    }
+    Counters counters = new Counters();
+    return JobImpl.incrTaskCounters(counters, completedTasks);
+  }
+
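+  /** Tallies total and completed map/reduce tasks. */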
+  static class TaskCount {
+    int maps;
+    int reduces;
+    int completedMaps;
+    int completedReduces;
+
+    void incr(Task task) {
+      TaskType type = task.getType();
+      boolean finished = task.isFinished();
+      if (type == TaskType.MAP) {
+        if (finished) {
+          ++completedMaps;
+        }
+        ++maps;
+      } else if (type == TaskType.REDUCE) {
+        if (finished) {
+          ++completedReduces;
+        }
+        ++reduces;
+      }
+    }
+  }
+
+  static TaskCount getTaskCount(Collection<Task> tasks) {
+    TaskCount tc = new TaskCount();
+    for (Task task : tasks) {
+      tc.incr(task);
+    }
+    return tc;
+  }
+
+  public static Job newJob(ApplicationId appID, int i, int n, int m) {
+    return newJob(appID, i, n, m, null);
+  }
+
+  public static Job newJob(ApplicationId appID, int i, int n, int m, Path confFile) {
+    return newJob(appID, i, n, m, confFile, false);
+  }
+  
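+  /**
+   * Creates a mock Job with n tasks of m attempts each, pre-aggregated
+   * counters, and job ACLs granting view access to "testuser".
+   */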
+  public static Job newJob(ApplicationId appID, int i, int n, int m,
+      Path confFile, boolean hasFailedTasks) {
+    final JobId id = newJobID(appID, i);
+    final String name = newJobName();
+    final JobReport report = newJobReport(id);
+    final Map<TaskId, Task> tasks = newTasks(id, n, m, hasFailedTasks);
+    final TaskCount taskCount = getTaskCount(tasks.values());
+    final Counters counters = getCounters(tasks.values());
+    final Path configFile = confFile;
+
+    final Configuration conf = new Configuration();
+    conf.set(JobACL.VIEW_JOB.getAclName(), "testuser");
+    conf.setBoolean(MRConfig.MR_ACLS_ENABLED, true);
+
+    JobACLsManager aclsManager = new JobACLsManager(conf);
+    final Map<JobACL, AccessControlList> jobACLs = aclsManager
+        .constructJobACLs(conf);
+    return new Job() {
+      @Override
+      public JobId getID() {
+        return id;
+      }
+
+      @Override
+      public String getName() {
+        return name;
+      }
+
+      @Override
+      public JobState getState() {
+        return report.getJobState();
+      }
+
+      @Override
+      public JobReport getReport() {
+        return report;
+      }
+
+      @Override
+      public float getProgress() {
+        return 0;
+      }
+
+      @Override
+      public Counters getAllCounters() {
+        return counters;
+      }
+
+      @Override
+      public Map<TaskId, Task> getTasks() {
+        return tasks;
+      }
+
+      @Override
+      public Task getTask(TaskId taskID) {
+        return tasks.get(taskID);
+      }
+
+      @Override
+      public int getTotalMaps() {
+        return taskCount.maps;
+      }
+
+      @Override
+      public int getTotalReduces() {
+        return taskCount.reduces;
+      }
+
+      @Override
+      public int getCompletedMaps() {
+        return taskCount.completedMaps;
+      }
+
+      @Override
+      public int getCompletedReduces() {
+        return taskCount.completedReduces;
+      }
+
+      @Override
+      public boolean isUber() {
+        return false;
+      }
+
+      @Override
+      public TaskAttemptCompletionEvent[] getTaskAttemptCompletionEvents(
+          int fromEventId, int maxEvents) {
+        return null;
+      }
+
+      @Override
+      public Map<TaskId, Task> getTasks(TaskType taskType) {
+        throw new UnsupportedOperationException("Not supported yet.");
+      }
+
+      @Override
+      public List<String> getDiagnostics() {
+        return Collections.<String> emptyList();
+      }
+
+      @Override
+      public boolean checkAccess(UserGroupInformation callerUGI,
+          JobACL jobOperation) {
+        return true;
+      }
+
+      @Override
+      public String getUserName() {
+        return "mock";
+      }
+
+      @Override
+      public String getQueueName() {
+        return "mockqueue";
+      }
+
+      @Override
+      public Path getConfFile() {
+        return configFile;
+      }
+
+      @Override
+      public Map<JobACL, AccessControlList> getJobACLs() {
+        return jobACLs;
+      }
+
+      @Override
+      public List<AMInfo> getAMInfos() {
+        List<AMInfo> amInfoList = new LinkedList<AMInfo>();
+        amInfoList.add(createAMInfo(1));
+        amInfoList.add(createAMInfo(2));
+        return amInfoList;
+      }
+
+      @Override
+      public Configuration loadConfFile() throws IOException {
+        FileContext fc = FileContext.getFileContext(configFile.toUri(), conf);
+        Configuration jobConf = new Configuration(false);
+        jobConf.addResource(fc.open(configFile), configFile.toString());
+        return jobConf;
+      }
+
+      @Override
+      public Configuration getConf() {
+        // TODO Auto-generated method stub
+        return null;
+      }
+    };
+  }
+
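+  /** Builds an AMInfo for the given attempt number against a fixed app id. */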
+  private static AMInfo createAMInfo(int attempt) {
+    ApplicationAttemptId appAttemptId = BuilderUtils.newApplicationAttemptId(
+        BuilderUtils.newApplicationId(100, 1), attempt);
+    ContainerId containerId = BuilderUtils.newContainerId(appAttemptId, 1);
+    return MRBuilderUtils.newAMInfo(appAttemptId, System.currentTimeMillis(),
+        containerId, NM_HOST, NM_PORT, NM_HTTP_PORT);
+  }
+}

Added: incubator/tez/tez-yarn-application/src/test/java/org/apache/hadoop/mapreduce/v2/app2/TestFail.java
URL: http://svn.apache.org/viewvc/incubator/tez/tez-yarn-application/src/test/java/org/apache/hadoop/mapreduce/v2/app2/TestFail.java?rev=1457129&view=auto
==============================================================================
--- incubator/tez/tez-yarn-application/src/test/java/org/apache/hadoop/mapreduce/v2/app2/TestFail.java (added)
+++ incubator/tez/tez-yarn-application/src/test/java/org/apache/hadoop/mapreduce/v2/app2/TestFail.java Fri Mar 15 21:26:36 2013
@@ -0,0 +1,333 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.mapreduce.v2.app2;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.Iterator;
+import java.util.Map;
+
+import junit.framework.Assert;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.MRJobConfig;
+import org.apache.hadoop.mapreduce.jobhistory.ContainerHeartbeatHandler;
+import org.apache.hadoop.mapreduce.v2.api.records.JobState;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptState;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskState;
+import org.apache.hadoop.mapreduce.v2.app2.job.Job;
+import org.apache.hadoop.mapreduce.v2.app2.job.Task;
+import org.apache.hadoop.mapreduce.v2.app2.job.TaskAttempt;
+import org.apache.hadoop.mapreduce.v2.app2.job.TaskAttemptStateInternal;
+import org.apache.hadoop.mapreduce.v2.app2.job.event.TaskAttemptEvent;
+import org.apache.hadoop.mapreduce.v2.app2.job.event.TaskAttemptEventType;
+import org.apache.hadoop.mapreduce.v2.app2.job.impl.TaskAttemptImpl;
+import org.apache.hadoop.mapreduce.v2.app2.launcher.ContainerLauncher;
+import org.apache.hadoop.mapreduce.v2.app2.launcher.ContainerLauncherImpl;
+import org.apache.hadoop.mapreduce.v2.app2.rm.NMCommunicatorEvent;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.yarn.api.ContainerManager;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerToken;
+import org.apache.tez.mapreduce.hadoop.TaskAttemptListenerImplTez;
+import org.junit.Test;
+
+/**
+ * Tests the state machine with respect to Job/Task/TaskAttempt failure 
+ * scenarios.
+ */
+@SuppressWarnings("unchecked")
+public class TestFail {
+
+  @Test
+  // The first attempt fails and the second attempt succeeds,
+  // so the job succeeds.
+  public void testFailTask() throws Exception {
+    MRApp app = new MockFirstFailingAttemptMRApp(1, 0);
+    Configuration conf = new Configuration();
+    // this test requires two task attempts, but uberization overrides max to 1
+    conf.setBoolean(MRJobConfig.JOB_UBERTASK_ENABLE, false);
+    Job job = app.submit(conf);
+    app.waitForState(job, JobState.SUCCEEDED);
+    Map<TaskId,Task> tasks = job.getTasks();
+    Assert.assertEquals("Num tasks is not correct", 1, tasks.size());
+    Task task = tasks.values().iterator().next();
+    Assert.assertEquals("Task state not correct", TaskState.SUCCEEDED,
+        task.getReport().getTaskState());
+    Map<TaskAttemptId, TaskAttempt> attempts =
+        tasks.values().iterator().next().getAttempts();
+    Assert.assertEquals("Num attempts is not correct", 2, attempts.size());
+    // One attempt must have failed
+    // and the other must have succeeded.
+    Iterator<TaskAttempt> it = attempts.values().iterator();
+    Assert.assertEquals("Attempt state not correct", TaskAttemptState.FAILED,
+        it.next().getReport().getTaskAttemptState());
+    Assert.assertEquals("Attempt state not correct", TaskAttemptState.SUCCEEDED,
+        it.next().getReport().getTaskAttemptState());
+  }
+
+  @Test
+  public void testMapFailureMaxPercent() throws Exception {
+    MRApp app = new MockFirstFailingTaskMRApp(4, 0);
+    Configuration conf = new Configuration();
+    
+    // Reduce the number of attempts so the test runs faster.
+    conf.setInt(MRJobConfig.MAP_MAX_ATTEMPTS, 1);
+    conf.setInt(MRJobConfig.REDUCE_MAX_ATTEMPTS, 1);
+
+    conf.setInt(MRJobConfig.MAP_FAILURES_MAX_PERCENT, 20);
+    Job job = app.submit(conf);
+    app.waitForState(job, JobState.FAILED);
+    
+    // Raising the allowed map failure percentage to 25% (1 failed map out of
+    // 4 is 25%) makes the job succeed.
+    app = new MockFirstFailingTaskMRApp(4, 0);
+    conf = new Configuration();
+    
+    // Reduce the number of attempts so the test runs faster.
+    conf.setInt(MRJobConfig.MAP_MAX_ATTEMPTS, 1);
+    conf.setInt(MRJobConfig.REDUCE_MAX_ATTEMPTS, 1);
+
+    conf.setInt(MRJobConfig.MAP_FAILURES_MAX_PERCENT, 25);
+    job = app.submit(conf);
+    app.waitForState(job, JobState.SUCCEEDED);
+  }
+
+  @Test
+  public void testReduceFailureMaxPercent() throws Exception {
+    MRApp app = new MockFirstFailingTaskMRApp(2, 4);
+    Configuration conf = new Configuration();
+    
+    // Reduce the number of attempts so the test runs faster.
+    conf.setInt(MRJobConfig.MAP_MAX_ATTEMPTS, 1);
+    conf.setInt(MRJobConfig.REDUCE_MAX_ATTEMPTS, 1);
+
+    conf.setInt(MRJobConfig.MAP_FAILURES_MAX_PERCENT, 50); // no job failure from maps
+    conf.setInt(MRJobConfig.REDUCE_FAILURES_MAXPERCENT, 20);
+    Job job = app.submit(conf);
+    app.waitForState(job, JobState.FAILED);
+    
+    // Raising the allowed reduce failure percentage to 25% (1 failed reduce
+    // out of 4 is 25%) makes the job succeed.
+    app = new MockFirstFailingTaskMRApp(2, 4);
+    conf = new Configuration();
+    
+    // Reduce the number of attempts so the test runs faster.
+    conf.setInt(MRJobConfig.MAP_MAX_ATTEMPTS, 1);
+    conf.setInt(MRJobConfig.REDUCE_MAX_ATTEMPTS, 1);
+
+    conf.setInt(MRJobConfig.MAP_FAILURES_MAX_PERCENT, 50); // no job failure from maps
+    conf.setInt(MRJobConfig.REDUCE_FAILURES_MAXPERCENT, 25);
+    job = app.submit(conf);
+    app.waitForState(job, JobState.SUCCEEDED);
+  }
+
+  @Test
+  // All task attempts time out, leading to job failure.
+  public void testTimedOutTask() throws Exception {
+    MRApp app = new TimeOutTaskMRApp(1, 0);
+    Configuration conf = new Configuration();
+    int maxAttempts = 2;
+    conf.setInt(MRJobConfig.MAP_MAX_ATTEMPTS, maxAttempts);
+    // disable uberization (requires entire job to be reattempted, so max for
+    // subtask attempts is overridden to 1)
+    conf.setBoolean(MRJobConfig.JOB_UBERTASK_ENABLE, false);
+    Job job = app.submit(conf);
+    app.waitForState(job, JobState.FAILED);
+    Map<TaskId,Task> tasks = job.getTasks();
+    Assert.assertEquals("Num tasks is not correct", 1, tasks.size());
+    Task task = tasks.values().iterator().next();
+    Assert.assertEquals("Task state not correct", TaskState.FAILED,
+        task.getReport().getTaskState());
+    Map<TaskAttemptId, TaskAttempt> attempts =
+        tasks.values().iterator().next().getAttempts();
+    Assert.assertEquals("Num attempts is not correct", maxAttempts,
+        attempts.size());
+    for (TaskAttempt attempt : attempts.values()) {
+      Assert.assertEquals("Attempt state not correct", TaskAttemptState.FAILED,
+          attempt.getReport().getTaskAttemptState());
+    }
+  }
+
+  @Test
+  public void testTaskFailWithUnusedContainer() throws Exception {
+    MRApp app = new MRAppWithFailingTaskAndUnusedContainer();
+    Configuration conf = new Configuration();
+    int maxAttempts = 1;
+    conf.setInt(MRJobConfig.MAP_MAX_ATTEMPTS, maxAttempts);
+    // disable uberization (requires entire job to be reattempted, so max for
+    // subtask attempts is overridden to 1)
+    conf.setBoolean(MRJobConfig.JOB_UBERTASK_ENABLE, false);
+    Job job = app.submit(conf);
+    app.waitForState(job, JobState.RUNNING);
+    Map<TaskId, Task> tasks = job.getTasks();
+    Assert.assertEquals("Num tasks is not correct", 1, tasks.size());
+    Task task = tasks.values().iterator().next();
+    app.waitForState(task, TaskState.SCHEDULED);
+    Map<TaskAttemptId, TaskAttempt> attempts = tasks.values().iterator()
+        .next().getAttempts();
+    Assert.assertEquals("Num attempts is not correct", maxAttempts, attempts
+        .size());
+    TaskAttempt attempt = attempts.values().iterator().next();
+    app.waitForInternalState((TaskAttemptImpl) attempt,
+        TaskAttemptStateInternal.START_WAIT);
+    // TODO XXX: This may not be a valid test.
+    app.getDispatcher().getEventHandler().handle(
+        new TaskAttemptEvent(attempt.getID(),
+            TaskAttemptEventType.TA_CONTAINER_TERMINATED));
+    app.waitForState(job, JobState.FAILED);
+  }
+
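+  // MRApp whose ContainerLauncher forwards launch requests but whose CM proxy
+  // blocks indefinitely, simulating a NodeManager that never responds.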
+  static class MRAppWithFailingTaskAndUnusedContainer extends MRApp {
+
+    public MRAppWithFailingTaskAndUnusedContainer() {
+      super(1, 0, false, "TaskFailWithUnsedContainer", true);
+    }
+
+    @Override
+    protected ContainerLauncher createContainerLauncher(AppContext context) {
+      return new ContainerLauncherImpl(context) {
+        @Override
+        public void handle(NMCommunicatorEvent event) {
+
+          switch (event.getType()) {
+          case CONTAINER_LAUNCH_REQUEST:
+            super.handle(event); // Unused event and container.
+            break;
+          case CONTAINER_STOP_REQUEST:
+//            getContext().getEventHandler().handle(
+//                new TaskAttemptEvent(event.getTaskAttemptID(),
+//                    TaskAttemptEventType.TA_CONTAINER_CLEANED));
+            // TODO XXX: May need a CONTAINER_COMPLETED event to go out.
+            break;
+          }
+        }
+
+        @Override
+        protected ContainerManager getCMProxy(ContainerId containerID,
+            String containerManagerBindAddr, ContainerToken containerToken)
+            throws IOException {
+          try {
+            synchronized (this) {
+              wait(); // Just hang the thread simulating a very slow NM.
+            }
+          } catch (InterruptedException e) {
+            e.printStackTrace();
+          }
+          return null;
+        }
+      };
+    }
+  }
+
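+  // MRApp with a no-op TaskAttemptListener RPC server and a 1s task timeout,
+  // so running attempts are reported lost and fail.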
+  static class TimeOutTaskMRApp extends MRApp {
+    TimeOutTaskMRApp(int maps, int reduces) {
+      super(maps, reduces, false, "TimeOutTaskMRApp", true);
+    }
+    @Override
+    protected TaskAttemptListener createTaskAttemptListener(AppContext context,
+        TaskHeartbeatHandler thh, ContainerHeartbeatHandler chh) {
+      return new TaskAttemptListenerImplTez(getContext(), thh, chh, null) {
+        @Override
+        public void startRpcServer() {}
+        @Override
+        public void stopRpcServer() {}
+        @Override
+        public InetSocketAddress getAddress() {
+          return NetUtils.createSocketAddr("localhost", 1234);
+        }
+      };
+    }
+    
+    protected TaskHeartbeatHandler createTaskHeartbeatHandler(AppContext context,
+        Configuration conf) {
+      // Creates a TaskHeartbeatHandler with a low timeout value. THH will
+      // send out a lost event leading to attempt failure.
+      return new TaskHeartbeatHandler(getContext(), 1) {
+        @Override
+        public void init(Configuration conf) {
+          conf.setInt(MRJobConfig.TASK_TIMEOUT, 1*1000);//reduce timeout
+          conf.setInt(MRJobConfig.TASK_TIMEOUT_CHECK_INTERVAL_MS, 1*1000);
+          super.init(conf);
+        }
+      };
+    }
+  }
+
+  // All attempts of the first task fail.
+  static class MockFirstFailingTaskMRApp extends MRApp {
+
+    MockFirstFailingTaskMRApp(int maps, int reduces) {
+      super(maps, reduces, true, "MockFirstFailingTaskMRApp", true);
+    }
+
+    @Override
+    protected void attemptLaunched(TaskAttemptId attemptID) {
+      if (attemptID.getTaskId().getId() == 0) { // check if this is the first task
+        // send the Fail event
+        getContext().getEventHandler().handle(
+            new TaskAttemptEvent(attemptID, TaskAttemptEventType.TA_FAILED));
+        // TODO XXX: Was FAIL_MSG. Remove comment.
+      } else {
+        getContext().getEventHandler().handle(
+            new TaskAttemptEvent(attemptID,
+                TaskAttemptEventType.TA_DONE));
+      }
+    }
+  }
+
+  // The first attempt of the first task fails.
+  static class MockFirstFailingAttemptMRApp extends MRApp {
+    MockFirstFailingAttemptMRApp(int maps, int reduces) {
+      super(maps, reduces, true, "MockFirstFailingAttemptMRApp", true);
+    }
+
+    @Override
+    protected void attemptLaunched(TaskAttemptId attemptID) {
+      if (attemptID.getTaskId().getId() == 0 && attemptID.getId() == 0) {
+        // check if this is the first task's first attempt
+        // send the Fail event
+        getContext().getEventHandler().handle(
+            new TaskAttemptEvent(attemptID, TaskAttemptEventType.TA_FAILED));
+        // TODO XXX: Was FAIL_MSG. Remove comment.
+      } else {
+        getContext().getEventHandler().handle(
+            new TaskAttemptEvent(attemptID,
+                TaskAttemptEventType.TA_DONE));
+      }
+    }
+  }
+
+  public static void main(String[] args) throws Exception {
+    TestFail t = new TestFail();
+    t.testFailTask();
+    t.testTimedOutTask();
+    t.testMapFailureMaxPercent();
+    t.testReduceFailureMaxPercent();
+    t.testTaskFailWithUnusedContainer();
+  }
+}