You are viewing a plain text version of this content. The canonical link for it is here.
Posted to mapreduce-commits@hadoop.apache.org by su...@apache.org on 2013/01/22 20:33:25 UTC

svn commit: r1437113 [2/5] - in /hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project: ./ conf/ dev-support/ hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/ hadoop-mapreduce-client/hadoop-mapreduc...

Modified: hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java?rev=1437113&r1=1437112&r2=1437113&view=diff
==============================================================================
--- hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java (original)
+++ hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java Tue Jan 22 19:33:02 2013
@@ -165,6 +165,7 @@ public abstract class TaskAttemptImpl im
   private Token<JobTokenIdentifier> jobToken;
   private static AtomicBoolean initialClasspathFlag = new AtomicBoolean();
   private static String initialClasspath = null;
+  private static String initialAppClasspath = null;
   private static Object commonContainerSpecLock = new Object();
   private static ContainerLaunchContext commonContainerSpec = null;
   private static final Object classpathLock = new Object();
@@ -527,7 +528,10 @@ public abstract class TaskAttemptImpl im
 
     //TODO:create the resource reqt for this Task attempt
     this.resourceCapability = recordFactory.newRecordInstance(Resource.class);
-    this.resourceCapability.setMemory(getMemoryRequired(conf, taskId.getTaskType()));
+    this.resourceCapability.setMemory(
+        getMemoryRequired(conf, taskId.getTaskType()));
+    this.resourceCapability.setVirtualCores(
+        getCpuRequired(conf, taskId.getTaskType()));
     this.dataLocalHosts = dataLocalHosts;
     RackResolver.init(conf);
 
@@ -551,6 +555,21 @@ public abstract class TaskAttemptImpl im
     return memory;
   }
 
+  private int getCpuRequired(Configuration conf, TaskType taskType) {
+    int vcores = 1;
+    if (taskType == TaskType.MAP)  {
+      vcores =
+          conf.getInt(MRJobConfig.MAP_CPU_VCORES,
+              MRJobConfig.DEFAULT_MAP_CPU_VCORES);
+    } else if (taskType == TaskType.REDUCE) {
+      vcores =
+          conf.getInt(MRJobConfig.REDUCE_CPU_VCORES,
+              MRJobConfig.DEFAULT_REDUCE_CPU_VCORES);
+    }
+    
+    return vcores;
+  }
+
   /**
    * Create a {@link LocalResource} record with all the given parameters.
    */
@@ -581,6 +600,7 @@ public abstract class TaskAttemptImpl im
       Map<String, String> env = new HashMap<String, String>();
       MRApps.setClasspath(env, conf);
       initialClasspath = env.get(Environment.CLASSPATH.name());
+      initialAppClasspath = env.get(Environment.APP_CLASSPATH.name());
       initialClasspathFlag.set(true);
       return initialClasspath;
     }
@@ -679,6 +699,13 @@ public abstract class TaskAttemptImpl im
           environment,  
           Environment.CLASSPATH.name(), 
           getInitialClasspath(conf));
+
+      if (initialAppClasspath != null) {
+        Apps.addToEnvironment(
+            environment,  
+            Environment.APP_CLASSPATH.name(), 
+            initialAppClasspath);
+      }
     } catch (IOException e) {
       throw new YarnException(e);
     }

Modified: hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/recover/RecoveryService.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/recover/RecoveryService.java?rev=1437113&r1=1437112&r2=1437113&view=diff
==============================================================================
--- hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/recover/RecoveryService.java (original)
+++ hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/recover/RecoveryService.java Tue Jan 22 19:33:02 2013
@@ -31,6 +31,7 @@ import org.apache.hadoop.conf.Configurat
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileContext;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapreduce.MRJobConfig;
 import org.apache.hadoop.mapreduce.OutputCommitter;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
@@ -100,6 +101,7 @@ public class RecoveryService extends Com
 
   private final ApplicationAttemptId applicationAttemptId;
   private final OutputCommitter committer;
+  private final boolean newApiCommitter;
   private final Dispatcher dispatcher;
   private final ControlledClock clock;
 
@@ -113,10 +115,11 @@ public class RecoveryService extends Com
   private volatile boolean recoveryMode = false;
 
   public RecoveryService(ApplicationAttemptId applicationAttemptId, 
-      Clock clock, OutputCommitter committer) {
+      Clock clock, OutputCommitter committer, boolean newApiCommitter) {
     super("RecoveringDispatcher");
     this.applicationAttemptId = applicationAttemptId;
     this.committer = committer;
+    this.newApiCommitter = newApiCommitter;
     this.dispatcher = createRecoveryDispatcher();
     this.clock = new ControlledClock(clock);
       addService((Service) dispatcher);
@@ -205,18 +208,18 @@ public class RecoveryService extends Com
       throws IOException {
     FSDataInputStream in = null;
     Path historyFile = null;
-    String jobName =
+    String jobId =
         TypeConverter.fromYarn(applicationAttemptId.getApplicationId())
           .toString();
     String jobhistoryDir =
-        JobHistoryUtils.getConfiguredHistoryStagingDirPrefix(conf);
+        JobHistoryUtils.getConfiguredHistoryStagingDirPrefix(conf, jobId);
     Path histDirPath =
         FileContext.getFileContext(conf).makeQualified(new Path(jobhistoryDir));
     FileContext fc = FileContext.getFileContext(histDirPath.toUri(), conf);
     // read the previous history file
     historyFile =
         fc.makeQualified(JobHistoryUtils.getStagingJobHistoryFile(histDirPath,
-          jobName, (applicationAttemptId.getAttemptId() - 1)));
+          jobId, (applicationAttemptId.getAttemptId() - 1)));
     LOG.info("History file is at " + historyFile);
     in = fc.open(historyFile);
     return in;
@@ -360,8 +363,17 @@ public class RecoveryService extends Com
         switch (state) {
         case SUCCEEDED:
           //recover the task output
-          TaskAttemptContext taskContext = new TaskAttemptContextImpl(getConfig(),
-              attInfo.getAttemptId());
+          
+          // check the committer type and construct corresponding context
+          TaskAttemptContext taskContext = null;
+          if(newApiCommitter) {
+            taskContext = new TaskAttemptContextImpl(getConfig(),
+                attInfo.getAttemptId());
+          } else {
+            taskContext = new org.apache.hadoop.mapred.TaskAttemptContextImpl(new JobConf(getConfig()),
+                TypeConverter.fromYarn(aId));
+          }
+          
           try { 
             TaskType type = taskContext.getTaskAttemptID().getTaskID().getTaskType();
             int numReducers = taskContext.getConfiguration().getInt(MRJobConfig.NUM_REDUCES, 1); 

Modified: hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMCommunicator.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMCommunicator.java?rev=1437113&r1=1437112&r2=1437113&view=diff
==============================================================================
--- hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMCommunicator.java (original)
+++ hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMCommunicator.java Tue Jan 22 19:33:02 2013
@@ -22,6 +22,7 @@ import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.security.PrivilegedAction;
 import java.util.Map;
+import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.commons.logging.Log;
@@ -62,7 +63,8 @@ import org.apache.hadoop.yarn.service.Ab
 /**
  * Registers/unregisters to RM and sends heartbeats to RM.
  */
-public abstract class RMCommunicator extends AbstractService  {
+public abstract class RMCommunicator extends AbstractService
+    implements RMHeartbeatHandler {
   private static final Log LOG = LogFactory.getLog(RMContainerAllocator.class);
   private int rmPollInterval;//millis
   protected ApplicationId applicationId;
@@ -77,6 +79,8 @@ public abstract class RMCommunicator ext
   private Resource minContainerCapability;
   private Resource maxContainerCapability;
   protected Map<ApplicationAccessType, String> applicationACLs;
+  private volatile long lastHeartbeatTime;
+  private ConcurrentLinkedQueue<Runnable> heartbeatCallbacks;
 
   private final RecordFactory recordFactory =
       RecordFactoryProvider.getRecordFactory(null);
@@ -95,6 +99,7 @@ public abstract class RMCommunicator ext
     this.applicationId = context.getApplicationID();
     this.applicationAttemptId = context.getApplicationAttemptId();
     this.stopped = new AtomicBoolean(false);
+    this.heartbeatCallbacks = new ConcurrentLinkedQueue<Runnable>();
   }
 
   @Override
@@ -136,14 +141,19 @@ public abstract class RMCommunicator ext
 
   protected void register() {
     //Register
-    InetSocketAddress serviceAddr = clientService.getBindAddress();
+    InetSocketAddress serviceAddr = null;
+    if (clientService != null ) {
+      serviceAddr = clientService.getBindAddress();
+    }
     try {
       RegisterApplicationMasterRequest request =
         recordFactory.newRecordInstance(RegisterApplicationMasterRequest.class);
       request.setApplicationAttemptId(applicationAttemptId);
-      request.setHost(serviceAddr.getHostName());
-      request.setRpcPort(serviceAddr.getPort());
-      request.setTrackingUrl(serviceAddr.getHostName() + ":" + clientService.getHttpPort());
+      if (serviceAddr != null) {
+        request.setHost(serviceAddr.getHostName());
+        request.setRpcPort(serviceAddr.getPort());
+        request.setTrackingUrl(serviceAddr.getHostName() + ":" + clientService.getHttpPort());
+      }
       RegisterApplicationMasterResponse response =
         scheduler.registerApplicationMaster(request);
       minContainerCapability = response.getMinimumResourceCapability();
@@ -236,8 +246,12 @@ public abstract class RMCommunicator ext
               return;
             } catch (Exception e) {
               LOG.error("ERROR IN CONTACTING RM. ", e);
+              continue;
               // TODO: for other exceptions
             }
+
+            lastHeartbeatTime = context.getClock().getTime();
+            executeHeartbeatCallbacks();
           } catch (InterruptedException e) {
             if (!stopped.get()) {
               LOG.warn("Allocated thread interrupted. Returning.");
@@ -295,6 +309,23 @@ public abstract class RMCommunicator ext
 
   protected abstract void heartbeat() throws Exception;
 
+  private void executeHeartbeatCallbacks() {
+    Runnable callback = null;
+    while ((callback = heartbeatCallbacks.poll()) != null) {
+      callback.run();
+    }
+  }
+
+  @Override
+  public long getLastHeartbeatTime() {
+    return lastHeartbeatTime;
+  }
+
+  @Override
+  public void runOnNextHeartbeat(Runnable callback) {
+    heartbeatCallbacks.add(callback);
+  }
+
   public void setShouldUnregister(boolean shouldUnregister) {
     this.shouldUnregister = shouldUnregister;
     LOG.info("RMCommunicator notified that shouldUnregistered is: " 

Modified: hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/resources/META-INF/services/org.apache.hadoop.security.SecurityInfo
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/resources/META-INF/services/org.apache.hadoop.security.SecurityInfo?rev=1437113&r1=1437112&r2=1437113&view=diff
==============================================================================
--- hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/resources/META-INF/services/org.apache.hadoop.security.SecurityInfo (original)
+++ hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/resources/META-INF/services/org.apache.hadoop.security.SecurityInfo Tue Jan 22 19:33:02 2013
@@ -1 +1,14 @@
+#
+#   Licensed under the Apache License, Version 2.0 (the "License");
+#   you may not use this file except in compliance with the License.
+#   You may obtain a copy of the License at
+#
+#       http://www.apache.org/licenses/LICENSE-2.0
+#
+#   Unless required by applicable law or agreed to in writing, software
+#   distributed under the License is distributed on an "AS IS" BASIS,
+#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#   See the License for the specific language governing permissions and
+#   limitations under the License.
+#
 org.apache.hadoop.mapreduce.v2.app.MRClientSecurityInfo

Modified: hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapred/TestTaskAttemptListenerImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapred/TestTaskAttemptListenerImpl.java?rev=1437113&r1=1437112&r2=1437113&view=diff
==============================================================================
--- hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapred/TestTaskAttemptListenerImpl.java (original)
+++ hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapred/TestTaskAttemptListenerImpl.java Tue Jan 22 19:33:02 2013
@@ -24,6 +24,8 @@ import static org.junit.Assert.assertNul
 import static org.junit.Assert.assertTrue;
 import static org.mockito.Matchers.any;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
@@ -32,6 +34,7 @@ import java.util.Arrays;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.mapreduce.TaskType;
+import org.apache.hadoop.mapreduce.TypeConverter;
 import org.apache.hadoop.mapreduce.security.token.JobTokenSecretManager;
 import org.apache.hadoop.mapreduce.v2.api.records.JobId;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptCompletionEvent;
@@ -41,7 +44,9 @@ import org.apache.hadoop.mapreduce.v2.ap
 import org.apache.hadoop.mapreduce.v2.app.AppContext;
 import org.apache.hadoop.mapreduce.v2.app.TaskHeartbeatHandler;
 import org.apache.hadoop.mapreduce.v2.app.job.Job;
+import org.apache.hadoop.mapreduce.v2.app.rm.RMHeartbeatHandler;
 import org.apache.hadoop.mapreduce.v2.util.MRBuilderUtils;
+import org.apache.hadoop.yarn.SystemClock;
 import org.apache.hadoop.yarn.factories.RecordFactory;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
 import org.junit.Test;
@@ -51,8 +56,9 @@ public class TestTaskAttemptListenerImpl
 
     public MockTaskAttemptListenerImpl(AppContext context,
         JobTokenSecretManager jobTokenSecretManager,
+        RMHeartbeatHandler rmHeartbeatHandler,
         TaskHeartbeatHandler hbHandler) {
-      super(context, jobTokenSecretManager);
+      super(context, jobTokenSecretManager, rmHeartbeatHandler);
       this.taskHeartbeatHandler = hbHandler;
     }
     
@@ -76,9 +82,12 @@ public class TestTaskAttemptListenerImpl
   public void testGetTask() throws IOException {
     AppContext appCtx = mock(AppContext.class);
     JobTokenSecretManager secret = mock(JobTokenSecretManager.class); 
+    RMHeartbeatHandler rmHeartbeatHandler =
+        mock(RMHeartbeatHandler.class);
     TaskHeartbeatHandler hbHandler = mock(TaskHeartbeatHandler.class);
     MockTaskAttemptListenerImpl listener = 
-      new MockTaskAttemptListenerImpl(appCtx, secret, hbHandler);
+      new MockTaskAttemptListenerImpl(appCtx, secret,
+          rmHeartbeatHandler, hbHandler);
     Configuration conf = new Configuration();
     listener.init(conf);
     listener.start();
@@ -145,16 +154,21 @@ public class TestTaskAttemptListenerImpl
       .thenReturn(Arrays.copyOfRange(taskEvents, 0, 2));
     when(mockJob.getTaskAttemptCompletionEvents(2, 100))
       .thenReturn(Arrays.copyOfRange(taskEvents, 2, 4));
-    when(mockJob.getMapAttemptCompletionEvents(0, 100)).thenReturn(mapEvents);
-    when(mockJob.getMapAttemptCompletionEvents(0, 2)).thenReturn(mapEvents);
-    when(mockJob.getMapAttemptCompletionEvents(2, 100)).thenReturn(empty);
+    when(mockJob.getMapAttemptCompletionEvents(0, 100)).thenReturn(
+        TypeConverter.fromYarn(mapEvents));
+    when(mockJob.getMapAttemptCompletionEvents(0, 2)).thenReturn(
+        TypeConverter.fromYarn(mapEvents));
+    when(mockJob.getMapAttemptCompletionEvents(2, 100)).thenReturn(
+        TypeConverter.fromYarn(empty));
 
     AppContext appCtx = mock(AppContext.class);
     when(appCtx.getJob(any(JobId.class))).thenReturn(mockJob);
     JobTokenSecretManager secret = mock(JobTokenSecretManager.class);
+    RMHeartbeatHandler rmHeartbeatHandler =
+        mock(RMHeartbeatHandler.class);
     final TaskHeartbeatHandler hbHandler = mock(TaskHeartbeatHandler.class);
     TaskAttemptListenerImpl listener =
-        new TaskAttemptListenerImpl(appCtx, secret) {
+        new TaskAttemptListenerImpl(appCtx, secret, rmHeartbeatHandler) {
       @Override
       protected void registerHeartbeatHandler(Configuration conf) {
         taskHeartbeatHandler = hbHandler;
@@ -191,4 +205,46 @@ public class TestTaskAttemptListenerImpl
     return tce;
   }
 
+  @Test
+  public void testCommitWindow() throws IOException {
+    SystemClock clock = new SystemClock();
+
+    org.apache.hadoop.mapreduce.v2.app.job.Task mockTask =
+        mock(org.apache.hadoop.mapreduce.v2.app.job.Task.class);
+    when(mockTask.canCommit(any(TaskAttemptId.class))).thenReturn(true);
+    Job mockJob = mock(Job.class);
+    when(mockJob.getTask(any(TaskId.class))).thenReturn(mockTask);
+    AppContext appCtx = mock(AppContext.class);
+    when(appCtx.getJob(any(JobId.class))).thenReturn(mockJob);
+    when(appCtx.getClock()).thenReturn(clock);
+    JobTokenSecretManager secret = mock(JobTokenSecretManager.class);
+    RMHeartbeatHandler rmHeartbeatHandler =
+        mock(RMHeartbeatHandler.class);
+    final TaskHeartbeatHandler hbHandler = mock(TaskHeartbeatHandler.class);
+    TaskAttemptListenerImpl listener =
+        new TaskAttemptListenerImpl(appCtx, secret, rmHeartbeatHandler) {
+      @Override
+      protected void registerHeartbeatHandler(Configuration conf) {
+        taskHeartbeatHandler = hbHandler;
+      }
+    };
+
+    Configuration conf = new Configuration();
+    listener.init(conf);
+    listener.start();
+
+    // verify commit not allowed when RM heartbeat has not occurred recently
+    TaskAttemptID tid = new TaskAttemptID("12345", 1, TaskType.REDUCE, 1, 0);
+    boolean canCommit = listener.canCommit(tid);
+    assertFalse(canCommit);
+    verify(mockTask, never()).canCommit(any(TaskAttemptId.class));
+
+    // verify commit allowed when RM heartbeat is recent
+    when(rmHeartbeatHandler.getLastHeartbeatTime()).thenReturn(clock.getTime());
+    canCommit = listener.canCommit(tid);
+    assertTrue(canCommit);
+    verify(mockTask, times(1)).canCommit(any(TaskAttemptId.class));
+
+    listener.stop();
+  }
 }

Modified: hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/jobhistory/TestJobHistoryEventHandler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/jobhistory/TestJobHistoryEventHandler.java?rev=1437113&r1=1437112&r2=1437113&view=diff
==============================================================================
--- hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/jobhistory/TestJobHistoryEventHandler.java (original)
+++ hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/jobhistory/TestJobHistoryEventHandler.java Tue Jan 22 19:33:02 2013
@@ -18,14 +18,9 @@
 
 package org.apache.hadoop.mapreduce.jobhistory;
 
-import static junit.framework.Assert.assertFalse;
-import static junit.framework.Assert.assertTrue;
-import static org.mockito.Matchers.any;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.spy;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
+import static junit.framework.Assert.*;
+import static org.mockito.Matchers.*;
+import static org.mockito.Mockito.*;
 
 import java.io.File;
 import java.io.IOException;
@@ -51,7 +46,6 @@ import org.apache.hadoop.yarn.api.record
 import org.apache.hadoop.yarn.util.BuilderUtils;
 import org.junit.Test;
 import org.mockito.Mockito;
-import org.mockito.verification.VerificationMode;
 
 public class TestJobHistoryEventHandler {
 
@@ -260,13 +254,15 @@ public class TestJobHistoryEventHandler 
     }
   }
 
-  private AppContext mockAppContext(JobId jobId) {
+  private AppContext mockAppContext(ApplicationId appId) {
+    JobId jobId = TypeConverter.toYarn(TypeConverter.fromYarn(appId));
     AppContext mockContext = mock(AppContext.class);
     Job mockJob = mock(Job.class);
     when(mockJob.getTotalMaps()).thenReturn(10);
     when(mockJob.getTotalReduces()).thenReturn(10);
     when(mockJob.getName()).thenReturn("mockjob");
     when(mockContext.getJob(jobId)).thenReturn(mockJob);
+    when(mockContext.getApplicationID()).thenReturn(appId);
     return mockContext;
   }
   
@@ -279,7 +275,7 @@ public class TestJobHistoryEventHandler 
     ContainerId containerId = BuilderUtils.newContainerId(appAttemptId, 1);
     TaskID taskID = TaskID.forName("task_200707121733_0003_m_000005");
     JobId jobId = MRBuilderUtils.newJobId(appId, 1);
-    AppContext mockAppContext = mockAppContext(jobId);
+    AppContext mockAppContext = mockAppContext(appId);
   }
 
   private JobHistoryEvent getEventToEnqueue(JobId jobId) {

Modified: hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java?rev=1437113&r1=1437112&r2=1437113&view=diff
==============================================================================
--- hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java (original)
+++ hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java Tue Jan 22 19:33:02 2013
@@ -30,6 +30,7 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileContext;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapred.WrappedJvmID;
 import org.apache.hadoop.mapreduce.JobContext;
@@ -74,6 +75,7 @@ import org.apache.hadoop.mapreduce.v2.ap
 import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerLauncherEvent;
 import org.apache.hadoop.mapreduce.v2.app.rm.ContainerAllocator;
 import org.apache.hadoop.mapreduce.v2.app.rm.ContainerAllocatorEvent;
+import org.apache.hadoop.mapreduce.v2.app.rm.RMHeartbeatHandler;
 import org.apache.hadoop.mapreduce.v2.util.MRApps;
 import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
 import org.apache.hadoop.net.NetUtils;
@@ -207,6 +209,16 @@ public class MRApp extends MRAppMaster {
 
   @Override
   public void init(Configuration conf) {
+    try {
+      //Create the staging directory if it does not exist
+      String user = UserGroupInformation.getCurrentUser().getShortUserName();
+      Path stagingDir = MRApps.getStagingAreaDir(conf, user);
+      FileSystem fs = getFileSystem(conf);
+      fs.mkdirs(stagingDir);
+    } catch (Exception e) {
+      throw new YarnException("Error creating staging dir", e);
+    }
+    
     super.init(conf);
     if (this.clusterInfo != null) {
       getContext().getClusterInfo().setMinContainerCapability(
@@ -215,9 +227,9 @@ public class MRApp extends MRAppMaster {
           this.clusterInfo.getMaxContainerCapability());
     } else {
       getContext().getClusterInfo().setMinContainerCapability(
-          BuilderUtils.newResource(1024));
+          BuilderUtils.newResource(1024, 1));
       getContext().getClusterInfo().setMaxContainerCapability(
-          BuilderUtils.newResource(10240));
+          BuilderUtils.newResource(10240, 1));
     }
   }
 
@@ -387,7 +399,8 @@ public class MRApp extends MRAppMaster {
   }
 
   @Override
-  protected Job createJob(Configuration conf) {
+  protected Job createJob(Configuration conf, JobStateInternal forcedState, 
+      String diagnostic) {
     UserGroupInformation currentUser = null;
     try {
       currentUser = UserGroupInformation.getCurrentUser();
@@ -397,7 +410,8 @@ public class MRApp extends MRAppMaster {
     Job newJob = new TestJob(getJobId(), getAttemptID(), conf, 
     		getDispatcher().getEventHandler(),
             getTaskAttemptListener(), getContext().getClock(),
-            isNewApiCommitter(), currentUser.getUserName(), getContext());
+            isNewApiCommitter(), currentUser.getUserName(), getContext(),
+            forcedState, diagnostic);
     ((AppContext) getContext()).getAllJobs().put(newJob.getID(), newJob);
 
     getDispatcher().register(JobFinishEvent.Type.class,
@@ -489,7 +503,8 @@ public class MRApp extends MRAppMaster {
     return new MRAppContainerAllocator();
   }
 
-  protected class MRAppContainerAllocator implements ContainerAllocator {
+  protected class MRAppContainerAllocator
+      implements ContainerAllocator, RMHeartbeatHandler {
     private int containerCount;
 
      @Override
@@ -514,6 +529,16 @@ public class MRApp extends MRAppMaster {
             new TaskAttemptContainerAssignedEvent(event.getAttemptID(),
                 container, null));
       }
+
+    @Override
+    public long getLastHeartbeatTime() {
+      return getContext().getClock().getTime();
+    }
+
+    @Override
+    public void runOnNextHeartbeat(Runnable callback) {
+      callback.run();
+    }
   }
 
   @Override
@@ -566,7 +591,8 @@ public class MRApp extends MRAppMaster {
       }
     };
 
-    return new CommitterEventHandler(context, stubbedCommitter);
+    return new CommitterEventHandler(context, stubbedCommitter,
+        getRMHeartbeatHandler());
   }
 
   @Override
@@ -618,13 +644,14 @@ public class MRApp extends MRAppMaster {
     public TestJob(JobId jobId, ApplicationAttemptId applicationAttemptId,
         Configuration conf, EventHandler eventHandler,
         TaskAttemptListener taskAttemptListener, Clock clock,
-        boolean newApiCommitter, String user, AppContext appContext) {
+        boolean newApiCommitter, String user, AppContext appContext, 
+        JobStateInternal forcedState, String diagnostic) {
       super(jobId, getApplicationAttemptId(applicationId, getStartCount()),
           conf, eventHandler, taskAttemptListener,
           new JobTokenSecretManager(), new Credentials(), clock,
           getCompletedTaskFromPreviousRun(), metrics,
           newApiCommitter, user, System.currentTimeMillis(), getAllAMInfos(),
-          appContext);
+          appContext, forcedState, diagnostic);
 
       // This "this leak" is okay because the retained pointer is in an
       //  instance variable.

Modified: hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRAppBenchmark.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRAppBenchmark.java?rev=1437113&r1=1437112&r2=1437113&view=diff
==============================================================================
--- hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRAppBenchmark.java (original)
+++ hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRAppBenchmark.java Tue Jan 22 19:33:02 2013
@@ -207,9 +207,9 @@ public class MRAppBenchmark {
                 RegisterApplicationMasterResponse response =
                     Records.newRecord(RegisterApplicationMasterResponse.class);
                 response.setMinimumResourceCapability(BuilderUtils
-                  .newResource(1024));
+                  .newResource(1024, 1));
                 response.setMaximumResourceCapability(BuilderUtils
-                  .newResource(10240));
+                  .newResource(10240, 1));
                 return response;
               }
 

Modified: hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MockJobs.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MockJobs.java?rev=1437113&r1=1437112&r2=1437113&view=diff
==============================================================================
--- hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MockJobs.java (original)
+++ hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MockJobs.java Tue Jan 22 19:33:02 2013
@@ -33,6 +33,7 @@ import org.apache.hadoop.fs.FileContext;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapred.JobACLsManager;
 import org.apache.hadoop.mapred.ShuffleHandler;
+import org.apache.hadoop.mapred.TaskCompletionEvent;
 import org.apache.hadoop.mapreduce.Counters;
 import org.apache.hadoop.mapreduce.FileSystemCounter;
 import org.apache.hadoop.mapreduce.JobACL;
@@ -556,7 +557,7 @@ public class MockJobs extends MockApps {
       }
 
       @Override
-      public TaskAttemptCompletionEvent[] getMapAttemptCompletionEvents(
+      public TaskCompletionEvent[] getMapAttemptCompletionEvents(
           int startIndex, int maxEvents) {
         return null;
       }

Modified: hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestFail.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestFail.java?rev=1437113&r1=1437112&r2=1437113&view=diff
==============================================================================
--- hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestFail.java (original)
+++ hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestFail.java Tue Jan 22 19:33:02 2013
@@ -252,7 +252,7 @@ public class TestFail {
       //task time out is reduced
       //when attempt times out, heartbeat handler will send the lost event
       //leading to Attempt failure
-      return new TaskAttemptListenerImpl(getContext(), null) {
+      return new TaskAttemptListenerImpl(getContext(), null, null) {
         @Override
         public void startRpcServer(){};
         @Override

Modified: hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestFetchFailure.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestFetchFailure.java?rev=1437113&r1=1437112&r2=1437113&view=diff
==============================================================================
--- hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestFetchFailure.java (original)
+++ hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestFetchFailure.java Tue Jan 22 19:33:02 2013
@@ -25,8 +25,10 @@ import java.util.Arrays;
 import java.util.Iterator;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapred.TaskCompletionEvent;
 import org.apache.hadoop.mapreduce.Counters;
 import org.apache.hadoop.mapreduce.MRJobConfig;
+import org.apache.hadoop.mapreduce.TypeConverter;
 import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEvent;
 import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEventHandler;
 import org.apache.hadoop.mapreduce.v2.api.records.JobState;
@@ -150,14 +152,16 @@ public class TestFetchFailure {
     Assert.assertEquals("Event status not correct for reduce attempt1",
         TaskAttemptCompletionEventStatus.SUCCEEDED, events[3].getStatus());
 
-    TaskAttemptCompletionEvent mapEvents[] =
+    TaskCompletionEvent mapEvents[] =
         job.getMapAttemptCompletionEvents(0, 2);
+    TaskCompletionEvent convertedEvents[] = TypeConverter.fromYarn(events);
     Assert.assertEquals("Incorrect number of map events", 2, mapEvents.length);
     Assert.assertArrayEquals("Unexpected map events",
-        Arrays.copyOfRange(events, 0, 2), mapEvents);
+        Arrays.copyOfRange(convertedEvents, 0, 2), mapEvents);
     mapEvents = job.getMapAttemptCompletionEvents(2, 200);
     Assert.assertEquals("Incorrect number of map events", 1, mapEvents.length);
-    Assert.assertEquals("Unexpected map event", events[2], mapEvents[0]);
+    Assert.assertEquals("Unexpected map event", convertedEvents[2],
+        mapEvents[0]);
   }
   
   /**
@@ -395,14 +399,16 @@ public class TestFetchFailure {
     Assert.assertEquals("Event status not correct for reduce attempt1",
         TaskAttemptCompletionEventStatus.SUCCEEDED, events[3].getStatus());
 
-    TaskAttemptCompletionEvent mapEvents[] =
+    TaskCompletionEvent mapEvents[] =
         job.getMapAttemptCompletionEvents(0, 2);
+    TaskCompletionEvent convertedEvents[] = TypeConverter.fromYarn(events);
     Assert.assertEquals("Incorrect number of map events", 2, mapEvents.length);
     Assert.assertArrayEquals("Unexpected map events",
-        Arrays.copyOfRange(events, 0, 2), mapEvents);
+        Arrays.copyOfRange(convertedEvents, 0, 2), mapEvents);
     mapEvents = job.getMapAttemptCompletionEvents(2, 200);
     Assert.assertEquals("Incorrect number of map events", 1, mapEvents.length);
-    Assert.assertEquals("Unexpected map event", events[2], mapEvents[0]);
+    Assert.assertEquals("Unexpected map event", convertedEvents[2],
+        mapEvents[0]);
   }
   
 

Modified: hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestMRApp.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestMRApp.java?rev=1437113&r1=1437112&r2=1437113&view=diff
==============================================================================
--- hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestMRApp.java (original)
+++ hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestMRApp.java Tue Jan 22 19:33:02 2013
@@ -37,6 +37,7 @@ import org.apache.hadoop.mapreduce.v2.ap
 import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptState;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskState;
 import org.apache.hadoop.mapreduce.v2.app.job.Job;
+import org.apache.hadoop.mapreduce.v2.app.job.JobStateInternal;
 import org.apache.hadoop.mapreduce.v2.app.job.Task;
 import org.apache.hadoop.mapreduce.v2.app.job.TaskAttempt;
 import org.apache.hadoop.mapreduce.v2.app.job.event.JobUpdatedNodesEvent;
@@ -370,8 +371,9 @@ public class TestMRApp {
     }
 
     @Override
-    protected Job createJob(Configuration conf) {
-      spiedJob = spy((JobImpl) super.createJob(conf));
+    protected Job createJob(Configuration conf, JobStateInternal forcedState, 
+        String diagnostic) {
+      spiedJob = spy((JobImpl) super.createJob(conf, forcedState, diagnostic));
       ((AppContext) getContext()).getAllJobs().put(spiedJob.getID(), spiedJob);
       return spiedJob;
     }

Modified: hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestMRAppMaster.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestMRAppMaster.java?rev=1437113&r1=1437112&r2=1437113&view=diff
==============================================================================
--- hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestMRAppMaster.java (original)
+++ hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestMRAppMaster.java Tue Jan 22 19:33:02 2013
@@ -17,28 +17,63 @@
  */
 package org.apache.hadoop.mapreduce.v2.app;
 
-import java.io.IOException;
+import static org.junit.Assert.*;
+import static org.mockito.Mockito.*;
 
-import junit.framework.Assert;
+import java.io.File;
+import java.io.IOException;
 
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapreduce.MRJobConfig;
+import org.apache.hadoop.mapreduce.TypeConverter;
+import org.apache.hadoop.mapreduce.v2.api.records.JobId;
+import org.apache.hadoop.mapreduce.v2.app.client.ClientService;
+import org.apache.hadoop.mapreduce.v2.app.job.JobStateInternal;
+import org.apache.hadoop.mapreduce.v2.app.rm.ContainerAllocator;
 import org.apache.hadoop.mapreduce.v2.util.MRApps;
+import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
 import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.yarn.YarnException;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.util.ConverterUtils;
+import org.junit.Before;
+import org.junit.BeforeClass;
 import org.junit.Test;
 
 public class TestMRAppMaster {
+  private static final Log LOG = LogFactory.getLog(TestMRAppMaster.class);
+  static String stagingDir = "staging/";
+  
+  @BeforeClass
+  public static void setup() {
+    //Do not error out if metrics are initialized multiple times
+    DefaultMetricsSystem.setMiniClusterMode(true);
+    File dir = new File(stagingDir);
+    stagingDir = dir.getAbsolutePath();
+  }
+  
+  @Before
+  public void cleanup() throws IOException {
+    File dir = new File(stagingDir);
+    if(dir.exists()) {
+      FileUtils.deleteDirectory(dir);
+    }
+    dir.mkdirs();
+  }
+  
   @Test
   public void testMRAppMasterForDifferentUser() throws IOException,
       InterruptedException {
     String applicationAttemptIdStr = "appattempt_1317529182569_0004_000001";
     String containerIdStr = "container_1317529182569_0004_000001_1";
-    String stagingDir = "/tmp/staging";
+    
     String userName = "TestAppMasterUser";
     ApplicationAttemptId applicationAttemptId = ConverterUtils
         .toApplicationAttemptId(applicationAttemptIdStr);
@@ -49,34 +84,208 @@ public class TestMRAppMaster {
     YarnConfiguration conf = new YarnConfiguration();
     conf.set(MRJobConfig.MR_AM_STAGING_DIR, stagingDir);
     MRAppMaster.initAndStartAppMaster(appMaster, conf, userName);
-    Assert.assertEquals(stagingDir + Path.SEPARATOR + userName + Path.SEPARATOR
+    assertEquals(stagingDir + Path.SEPARATOR + userName + Path.SEPARATOR
         + ".staging", appMaster.stagingDirPath.toString());
   }
+  
+  @Test
+  public void testMRAppMasterMidLock() throws IOException,
+      InterruptedException {
+    String applicationAttemptIdStr = "appattempt_1317529182569_0004_000002";
+    String containerIdStr = "container_1317529182569_0004_000002_1";
+    String userName = "TestAppMasterUser";
+    YarnConfiguration conf = new YarnConfiguration();
+    conf.set(MRJobConfig.MR_AM_STAGING_DIR, stagingDir);
+    ApplicationAttemptId applicationAttemptId = ConverterUtils
+        .toApplicationAttemptId(applicationAttemptIdStr);
+    JobId jobId =  TypeConverter.toYarn(
+        TypeConverter.fromYarn(applicationAttemptId.getApplicationId()));
+    Path start = MRApps.getStartJobCommitFile(conf, userName, jobId);
+    FileSystem fs = FileSystem.get(conf);
+    //Create the file, but no end file so we should unregister with an error.
+    fs.create(start).close();
+    ContainerId containerId = ConverterUtils.toContainerId(containerIdStr);
+    MRAppMaster appMaster =
+        new MRAppMasterTest(applicationAttemptId, containerId, "host", -1, -1,
+            System.currentTimeMillis(), false);
+    boolean caught = false;
+    try {
+      MRAppMaster.initAndStartAppMaster(appMaster, conf, userName);
+    } catch (IOException e) {
+      //The IO Exception is expected
+      LOG.info("Caught expected Exception", e);
+      caught = true;
+    }
+    assertTrue(caught);
+    assertTrue(appMaster.errorHappenedShutDown);
+    assertEquals(JobStateInternal.ERROR, appMaster.forcedState);
+    appMaster.stop();
+  }
+  
+  @Test
+  public void testMRAppMasterSuccessLock() throws IOException,
+      InterruptedException {
+    String applicationAttemptIdStr = "appattempt_1317529182569_0004_000002";
+    String containerIdStr = "container_1317529182569_0004_000002_1";
+    String userName = "TestAppMasterUser";
+    YarnConfiguration conf = new YarnConfiguration();
+    conf.set(MRJobConfig.MR_AM_STAGING_DIR, stagingDir);
+    ApplicationAttemptId applicationAttemptId = ConverterUtils
+        .toApplicationAttemptId(applicationAttemptIdStr);
+    JobId jobId =  TypeConverter.toYarn(
+        TypeConverter.fromYarn(applicationAttemptId.getApplicationId()));
+    Path start = MRApps.getStartJobCommitFile(conf, userName, jobId);
+    Path end = MRApps.getEndJobCommitSuccessFile(conf, userName, jobId);
+    FileSystem fs = FileSystem.get(conf);
+    fs.create(start).close();
+    fs.create(end).close();
+    ContainerId containerId = ConverterUtils.toContainerId(containerIdStr);
+    MRAppMaster appMaster =
+        new MRAppMasterTest(applicationAttemptId, containerId, "host", -1, -1,
+            System.currentTimeMillis(), false);
+    boolean caught = false;
+    try {
+      MRAppMaster.initAndStartAppMaster(appMaster, conf, userName);
+    } catch (IOException e) {
+      //The IO Exception is expected
+      LOG.info("Caught expected Exception", e);
+      caught = true;
+    }
+    assertTrue(caught);
+    assertTrue(appMaster.errorHappenedShutDown);
+    assertEquals(JobStateInternal.SUCCEEDED, appMaster.forcedState);
+    appMaster.stop();
+  }
+  
+  @Test
+  public void testMRAppMasterFailLock() throws IOException,
+      InterruptedException {
+    String applicationAttemptIdStr = "appattempt_1317529182569_0004_000002";
+    String containerIdStr = "container_1317529182569_0004_000002_1";
+    String userName = "TestAppMasterUser";
+    YarnConfiguration conf = new YarnConfiguration();
+    conf.set(MRJobConfig.MR_AM_STAGING_DIR, stagingDir);
+    ApplicationAttemptId applicationAttemptId = ConverterUtils
+        .toApplicationAttemptId(applicationAttemptIdStr);
+    JobId jobId =  TypeConverter.toYarn(
+        TypeConverter.fromYarn(applicationAttemptId.getApplicationId()));
+    Path start = MRApps.getStartJobCommitFile(conf, userName, jobId);
+    Path end = MRApps.getEndJobCommitFailureFile(conf, userName, jobId);
+    FileSystem fs = FileSystem.get(conf);
+    fs.create(start).close();
+    fs.create(end).close();
+    ContainerId containerId = ConverterUtils.toContainerId(containerIdStr);
+    MRAppMaster appMaster =
+        new MRAppMasterTest(applicationAttemptId, containerId, "host", -1, -1,
+            System.currentTimeMillis(), false);
+    boolean caught = false;
+    try {
+      MRAppMaster.initAndStartAppMaster(appMaster, conf, userName);
+    } catch (IOException e) {
+      //The IO Exception is expected
+      LOG.info("Caught expected Exception", e);
+      caught = true;
+    }
+    assertTrue(caught);
+    assertTrue(appMaster.errorHappenedShutDown);
+    assertEquals(JobStateInternal.FAILED, appMaster.forcedState);
+    appMaster.stop();
+  }
+  
+  @Test
+  public void testMRAppMasterMissingStaging() throws IOException,
+      InterruptedException {
+    String applicationAttemptIdStr = "appattempt_1317529182569_0004_000002";
+    String containerIdStr = "container_1317529182569_0004_000002_1";
+    String userName = "TestAppMasterUser";
+    YarnConfiguration conf = new YarnConfiguration();
+    conf.set(MRJobConfig.MR_AM_STAGING_DIR, stagingDir);
+    ApplicationAttemptId applicationAttemptId = ConverterUtils
+        .toApplicationAttemptId(applicationAttemptIdStr);
+
+    //Delete the staging directory
+    File dir = new File(stagingDir);
+    if(dir.exists()) {
+      FileUtils.deleteDirectory(dir);
+    }
+    
+    ContainerId containerId = ConverterUtils.toContainerId(containerIdStr);
+    MRAppMaster appMaster =
+        new MRAppMasterTest(applicationAttemptId, containerId, "host", -1, -1,
+            System.currentTimeMillis(), false);
+    boolean caught = false;
+    try {
+      MRAppMaster.initAndStartAppMaster(appMaster, conf, userName);
+    } catch (IOException e) {
+      //The IO Exception is expected
+      LOG.info("Caught expected Exception", e);
+      caught = true;
+    }
+    assertTrue(caught);
+    assertTrue(appMaster.errorHappenedShutDown);
+    //Copying the history file is disabled, but it is not really visible from 
+    //here
+    assertEquals(JobStateInternal.ERROR, appMaster.forcedState);
+    appMaster.stop();
+  }
 }
 
 class MRAppMasterTest extends MRAppMaster {
 
   Path stagingDirPath;
   private Configuration conf;
+  private boolean overrideInitAndStart;
+  ContainerAllocator mockContainerAllocator;
 
   public MRAppMasterTest(ApplicationAttemptId applicationAttemptId,
       ContainerId containerId, String host, int port, int httpPort,
       long submitTime) {
+    this(applicationAttemptId, containerId, host, port, httpPort, submitTime, 
+        true);
+  }
+  public MRAppMasterTest(ApplicationAttemptId applicationAttemptId,
+      ContainerId containerId, String host, int port, int httpPort,
+      long submitTime, boolean overrideInitAndStart) {
     super(applicationAttemptId, containerId, host, port, httpPort, submitTime);
+    this.overrideInitAndStart = overrideInitAndStart;
+    mockContainerAllocator = mock(ContainerAllocator.class);
   }
 
   @Override
   public void init(Configuration conf) {
-    this.conf = conf;
+    if (overrideInitAndStart) {
+      this.conf = conf; 
+    } else {
+      super.init(conf);
+    }
+  }
+  
+  @Override 
+  protected void downloadTokensAndSetupUGI(Configuration conf) {
+    try {
+      this.currentUser = UserGroupInformation.getCurrentUser();
+    } catch (IOException e) {
+      throw new YarnException(e);
+    }
+  }
+  
+  @Override
+  protected ContainerAllocator createContainerAllocator(
+      final ClientService clientService, final AppContext context) {
+    return mockContainerAllocator;
   }
 
   @Override
   public void start() {
-    try {
-      String user = UserGroupInformation.getCurrentUser().getShortUserName();
-      stagingDirPath = MRApps.getStagingAreaDir(conf, user);
-    } catch (Exception e) {
-      Assert.fail(e.getMessage());
+    if (overrideInitAndStart) {
+      try {
+        String user = UserGroupInformation.getCurrentUser().getShortUserName();
+        stagingDirPath = MRApps.getStagingAreaDir(conf, user);
+      } catch (Exception e) {
+        fail(e.getMessage());
+      }
+    } else {
+      super.start();
     }
   }
 

Modified: hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java?rev=1437113&r1=1437112&r2=1437113&view=diff
==============================================================================
--- hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java (original)
+++ hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java Tue Jan 22 19:33:02 2013
@@ -38,6 +38,7 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import junit.framework.Assert;
 
@@ -70,7 +71,9 @@ import org.apache.hadoop.mapreduce.v2.ut
 import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.net.NetworkTopology;
+import org.apache.hadoop.yarn.Clock;
 import org.apache.hadoop.yarn.ClusterInfo;
+import org.apache.hadoop.yarn.SystemClock;
 import org.apache.hadoop.yarn.YarnException;
 import org.apache.hadoop.yarn.api.AMRMProtocol;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
@@ -1139,7 +1142,7 @@ public class TestRMContainerAllocator {
     }
     TaskAttemptId attemptId = MRBuilderUtils.newTaskAttemptId(taskId,
         taskAttemptId);
-    Resource containerNeed = BuilderUtils.newResource(memory);
+    Resource containerNeed = BuilderUtils.newResource(memory, 1);
     if (earlierFailedAttempt) {
       return ContainerRequestEvent
           .createContainerRequestEventForFailedContainer(attemptId,
@@ -1222,8 +1225,8 @@ public class TestRMContainerAllocator {
       when(context.getApplicationAttemptId()).thenReturn(appAttemptId);
       when(context.getJob(isA(JobId.class))).thenReturn(job);
       when(context.getClusterInfo()).thenReturn(
-          new ClusterInfo(BuilderUtils.newResource(1024), BuilderUtils
-              .newResource(10240)));
+          new ClusterInfo(BuilderUtils.newResource(1024, 1), BuilderUtils
+              .newResource(10240, 1)));
       when(context.getEventHandler()).thenReturn(new EventHandler() {
         @Override
         public void handle(Event event) {
@@ -1240,6 +1243,13 @@ public class TestRMContainerAllocator {
       return context;
     }
 
+    private static AppContext createAppContext(
+        ApplicationAttemptId appAttemptId, Job job, Clock clock) {
+      AppContext context = createAppContext(appAttemptId, job);
+      when(context.getClock()).thenReturn(clock);
+      return context;
+    }
+
     private static ClientService createMockClientService() {
       ClientService service = mock(ClientService.class);
       when(service.getBindAddress()).thenReturn(
@@ -1264,6 +1274,15 @@ public class TestRMContainerAllocator {
       super.start();
     }
 
+    public MyContainerAllocator(MyResourceManager rm, Configuration conf,
+        ApplicationAttemptId appAttemptId, Job job, Clock clock) {
+      super(createMockClientService(),
+          createAppContext(appAttemptId, job, clock));
+      this.rm = rm;
+      super.init(conf);
+      super.start();
+    }
+
     @Override
     protected AMRMProtocol createSchedulerProxy() {
       return this.rm.getApplicationMasterService();
@@ -1280,12 +1299,12 @@ public class TestRMContainerAllocator {
 
     @Override
     protected Resource getMinContainerCapability() {
-      return BuilderUtils.newResource(1024);
+      return BuilderUtils.newResource(1024, 1);
     }
 
     @Override
     protected Resource getMaxContainerCapability() {
-      return BuilderUtils.newResource(10240);
+      return BuilderUtils.newResource(10240, 1);
     }
 
     public void sendRequest(ContainerRequestEvent req) {
@@ -1465,6 +1484,66 @@ public class TestRMContainerAllocator {
         allocator.recalculatedReduceSchedule);
   }
 
+  @Test
+  public void testHeartbeatHandler() throws Exception {
+    LOG.info("Running testHeartbeatHandler");
+
+    Configuration conf = new Configuration();
+    conf.setInt(MRJobConfig.MR_AM_TO_RM_HEARTBEAT_INTERVAL_MS, 1);
+    ControlledClock clock = new ControlledClock(new SystemClock());
+    AppContext appContext = mock(AppContext.class);
+    when(appContext.getClock()).thenReturn(clock);
+    when(appContext.getApplicationID()).thenReturn(
+        BuilderUtils.newApplicationId(1, 1));
+
+    RMContainerAllocator allocator = new RMContainerAllocator(
+        mock(ClientService.class), appContext) {
+          @Override
+          protected void register() {
+          }
+          @Override
+          protected AMRMProtocol createSchedulerProxy() {
+            return mock(AMRMProtocol.class);
+          }
+          @Override
+          protected synchronized void heartbeat() throws Exception {
+          }
+    };
+    allocator.init(conf);
+    allocator.start();
+
+    clock.setTime(5);
+    int timeToWaitMs = 5000;
+    while (allocator.getLastHeartbeatTime() != 5 && timeToWaitMs > 0) {
+      Thread.sleep(10);
+      timeToWaitMs -= 10;
+    }
+    Assert.assertEquals(5, allocator.getLastHeartbeatTime());
+    clock.setTime(7);
+    timeToWaitMs = 5000;
+    while (allocator.getLastHeartbeatTime() != 7 && timeToWaitMs > 0) {
+      Thread.sleep(10);
+      timeToWaitMs -= 10;
+    }
+    Assert.assertEquals(7, allocator.getLastHeartbeatTime());
+
+    final AtomicBoolean callbackCalled = new AtomicBoolean(false);
+    allocator.runOnNextHeartbeat(new Runnable() {
+      @Override
+      public void run() {
+        callbackCalled.set(true);
+      }
+    });
+    clock.setTime(8);
+    timeToWaitMs = 5000;
+    while (allocator.getLastHeartbeatTime() != 8 && timeToWaitMs > 0) {
+      Thread.sleep(10);
+      timeToWaitMs -= 10;
+    }
+    Assert.assertEquals(8, allocator.getLastHeartbeatTime());
+    Assert.assertTrue(callbackCalled.get());
+  }
+
   public static void main(String[] args) throws Exception {
     TestRMContainerAllocator t = new TestRMContainerAllocator();
     t.testSimple();

Modified: hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRecovery.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRecovery.java?rev=1437113&r1=1437112&r2=1437113&view=diff
==============================================================================
--- hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRecovery.java (original)
+++ hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRecovery.java Tue Jan 22 19:33:02 2013
@@ -626,6 +626,115 @@ public class TestRecovery {
     validateOutput();
   }
   
+  @Test
+  public void testRecoveryWithOldCommiter() throws Exception {
+    int runCount = 0;
+    MRApp app = new MRAppWithHistory(1, 2, false, this.getClass().getName(),
+        true, ++runCount);
+    Configuration conf = new Configuration();
+    conf.setBoolean("mapred.mapper.new-api", false);
+    conf.setBoolean("mapred.reducer.new-api", false);
+    conf.setBoolean(MRJobConfig.JOB_UBERTASK_ENABLE, false);
+    conf.set(FileOutputFormat.OUTDIR, outputDir.toString());
+    Job job = app.submit(conf);
+    app.waitForState(job, JobState.RUNNING);
+    Assert.assertEquals("No of tasks not correct",
+       3, job.getTasks().size());
+    Iterator<Task> it = job.getTasks().values().iterator();
+    Task mapTask1 = it.next();
+    Task reduceTask1 = it.next();
+    
+    // all maps must be running
+    app.waitForState(mapTask1, TaskState.RUNNING);
+    
+    TaskAttempt task1Attempt1 = mapTask1.getAttempts().values().iterator()
+        .next();
+    
+    //before sending the TA_DONE event, make sure attempt has come to
+    //RUNNING state
+    app.waitForState(task1Attempt1, TaskAttemptState.RUNNING);
+  
+    //send the done signal to the map
+    app.getContext().getEventHandler().handle(
+        new TaskAttemptEvent(
+            task1Attempt1.getID(),
+            TaskAttemptEventType.TA_DONE));
+    
+    //wait for map task to complete
+    app.waitForState(mapTask1, TaskState.SUCCEEDED);
+
+    // Verify the shuffle-port
+    Assert.assertEquals(5467, task1Attempt1.getShufflePort());
+    
+    app.waitForState(reduceTask1, TaskState.RUNNING);
+    TaskAttempt reduce1Attempt1 = reduceTask1.getAttempts().values().iterator().next();
+    
+    // write output corresponding to reduce1
+    writeOutput(reduce1Attempt1, conf);
+    
+    //send the done signal to the 1st reduce
+    app.getContext().getEventHandler().handle(
+        new TaskAttemptEvent(
+            reduce1Attempt1.getID(),
+            TaskAttemptEventType.TA_DONE));
+
+    //wait for first reduce task to complete
+    app.waitForState(reduceTask1, TaskState.SUCCEEDED);
+    
+    //stop the app before the job completes.
+    app.stop();
+
+    //rerun
+    //in rerun the map will be recovered from previous run
+    app = new MRAppWithHistory(1, 2, false, this.getClass().getName(), false,
+        ++runCount);
+    conf = new Configuration();
+    conf.setBoolean(MRJobConfig.MR_AM_JOB_RECOVERY_ENABLE, true);
+    conf.setBoolean("mapred.mapper.new-api", false);
+    conf.setBoolean("mapred.reducer.new-api", false);
+    conf.set(FileOutputFormat.OUTDIR, outputDir.toString());
+    conf.setBoolean(MRJobConfig.JOB_UBERTASK_ENABLE, false);
+    job = app.submit(conf);
+    app.waitForState(job, JobState.RUNNING);
+    Assert.assertEquals("No of tasks not correct",
+       3, job.getTasks().size());
+    it = job.getTasks().values().iterator();
+    mapTask1 = it.next();
+    reduceTask1 = it.next();
+    Task reduceTask2 = it.next();
+    
+    // map will be recovered, no need to send done
+    app.waitForState(mapTask1, TaskState.SUCCEEDED);
+
+    // Verify the shuffle-port after recovery
+    task1Attempt1 = mapTask1.getAttempts().values().iterator().next();
+    Assert.assertEquals(5467, task1Attempt1.getShufflePort());
+    
+    // first reduce will be recovered, no need to send done
+    app.waitForState(reduceTask1, TaskState.SUCCEEDED); 
+    
+    app.waitForState(reduceTask2, TaskState.RUNNING);
+    
+    TaskAttempt reduce2Attempt = reduceTask2.getAttempts().values()
+        .iterator().next();
+    //before sending the TA_DONE event, make sure attempt has come to
+    //RUNNING state
+    app.waitForState(reduce2Attempt, TaskAttemptState.RUNNING);
+    
+    //send the done signal to the 2nd reduce task
+    app.getContext().getEventHandler().handle(
+        new TaskAttemptEvent(
+            reduce2Attempt.getID(),
+            TaskAttemptEventType.TA_DONE));
+    
+    //wait to get it completed
+    app.waitForState(reduceTask2, TaskState.SUCCEEDED);
+    
+    app.waitForState(job, JobState.SUCCEEDED);
+    app.verifyCompleted();
+    validateOutput();
+  }
+  
   private void writeBadOutput(TaskAttempt attempt, Configuration conf)
   throws Exception {
   TaskAttemptContext tContext = new TaskAttemptContextImpl(conf, 

Modified: hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRuntimeEstimators.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRuntimeEstimators.java?rev=1437113&r1=1437112&r2=1437113&view=diff
==============================================================================
--- hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRuntimeEstimators.java (original)
+++ hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRuntimeEstimators.java Tue Jan 22 19:33:02 2013
@@ -32,6 +32,7 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.TaskCompletionEvent;
 import org.apache.hadoop.mapreduce.Counters;
 import org.apache.hadoop.mapreduce.JobACL;
 import org.apache.hadoop.mapreduce.v2.api.records.AMInfo;
@@ -441,7 +442,7 @@ public class TestRuntimeEstimators {
     }
 
     @Override
-    public TaskAttemptCompletionEvent[]
+    public TaskCompletionEvent[]
             getMapAttemptCompletionEvents(int startIndex, int maxEvents) {
       throw new UnsupportedOperationException("Not supported yet.");
     }

Modified: hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestStagingCleanup.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestStagingCleanup.java?rev=1437113&r1=1437112&r2=1437113&view=diff
==============================================================================
--- hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestStagingCleanup.java (original)
+++ hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestStagingCleanup.java Tue Jan 22 19:33:02 2013
@@ -38,10 +38,13 @@ import org.apache.hadoop.mapreduce.v2.ap
 import org.apache.hadoop.mapreduce.v2.api.records.JobState;
 import org.apache.hadoop.mapreduce.v2.app.client.ClientService;
 import org.apache.hadoop.mapreduce.v2.app.job.Job;
+import org.apache.hadoop.mapreduce.v2.app.job.JobStateInternal;
 import org.apache.hadoop.mapreduce.v2.app.job.event.JobFinishEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.impl.JobImpl;
 import org.apache.hadoop.mapreduce.v2.app.rm.ContainerAllocator;
 import org.apache.hadoop.mapreduce.v2.app.rm.ContainerAllocatorEvent;
+import org.apache.hadoop.mapreduce.v2.app.rm.RMHeartbeatHandler;
+import org.apache.hadoop.mapreduce.v2.util.MRApps;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.yarn.YarnException;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
@@ -71,6 +74,10 @@ import org.junit.Test;
      conf.set(MRJobConfig.MAPREDUCE_JOB_DIR, stagingJobDir);
      fs = mock(FileSystem.class);
      when(fs.delete(any(Path.class), anyBoolean())).thenReturn(true);
+     //Staging Dir exists
+     String user = UserGroupInformation.getCurrentUser().getShortUserName();
+     Path stagingDir = MRApps.getStagingAreaDir(conf, user);
+     when(fs.exists(stagingDir)).thenReturn(true);
      ApplicationAttemptId attemptId = recordFactory.newRecordInstance(
          ApplicationAttemptId.class);
      attemptId.setAttemptId(0);
@@ -92,6 +99,10 @@ import org.junit.Test;
      conf.setInt(YarnConfiguration.RM_AM_MAX_RETRIES, 4);
      fs = mock(FileSystem.class);
      when(fs.delete(any(Path.class), anyBoolean())).thenReturn(true);
+     //Staging Dir exists
+     String user = UserGroupInformation.getCurrentUser().getShortUserName();
+     Path stagingDir = MRApps.getStagingAreaDir(conf, user);
+     when(fs.exists(stagingDir)).thenReturn(true);
      ApplicationAttemptId attemptId = recordFactory.newRecordInstance(
          ApplicationAttemptId.class);
      attemptId.setAttemptId(0);
@@ -117,6 +128,10 @@ import org.junit.Test;
      conf.setInt(YarnConfiguration.RM_AM_MAX_RETRIES, 1);
      fs = mock(FileSystem.class);
      when(fs.delete(any(Path.class), anyBoolean())).thenReturn(true);
+     //Staging Dir exists
+     String user = UserGroupInformation.getCurrentUser().getShortUserName();
+     Path stagingDir = MRApps.getStagingAreaDir(conf, user);
+     when(fs.exists(stagingDir)).thenReturn(true);
      ApplicationAttemptId attemptId = recordFactory.newRecordInstance(
          ApplicationAttemptId.class);
      attemptId.setAttemptId(1);
@@ -166,6 +181,11 @@ import org.junit.Test;
      }
 
      @Override
+     public RMHeartbeatHandler getRMHeartbeatHandler() {
+       return getStubbedHeartbeatHandler(getContext());
+     }
+
+     @Override
      protected void sysexit() {      
      }
 
@@ -177,6 +197,7 @@ import org.junit.Test;
      @Override
      protected void downloadTokensAndSetupUGI(Configuration conf) {
      }
+
    }
 
   private final class MRAppTestCleanup extends MRApp {
@@ -191,7 +212,8 @@ import org.junit.Test;
     }
 
     @Override
-    protected Job createJob(Configuration conf) {
+    protected Job createJob(Configuration conf, JobStateInternal forcedState, 
+        String diagnostic) {
       UserGroupInformation currentUser = null;
       try {
         currentUser = UserGroupInformation.getCurrentUser();
@@ -201,7 +223,8 @@ import org.junit.Test;
       Job newJob = new TestJob(getJobId(), getAttemptID(), conf,
           getDispatcher().getEventHandler(),
           getTaskAttemptListener(), getContext().getClock(),
-          isNewApiCommitter(), currentUser.getUserName(), getContext());
+          isNewApiCommitter(), currentUser.getUserName(), getContext(),
+          forcedState, diagnostic);
       ((AppContext) getContext()).getAllJobs().put(newJob.getID(), newJob);
 
       getDispatcher().register(JobFinishEvent.Type.class,
@@ -238,6 +261,11 @@ import org.junit.Test;
     }
 
     @Override
+    public RMHeartbeatHandler getRMHeartbeatHandler() {
+      return getStubbedHeartbeatHandler(getContext());
+    }
+
+    @Override
     public void cleanupStagingDir() throws IOException {
       cleanedBeforeContainerAllocatorStopped = !stoppedContainerAllocator;
     }
@@ -247,6 +275,20 @@ import org.junit.Test;
     }
   }
 
+  private static RMHeartbeatHandler getStubbedHeartbeatHandler(
+      final AppContext appContext) {
+    return new RMHeartbeatHandler() {
+      @Override
+      public long getLastHeartbeatTime() {
+        return appContext.getClock().getTime();
+      }
+      @Override
+      public void runOnNextHeartbeat(Runnable callback) {
+        callback.run();
+      }
+    };
+  }
+
   @Test
   public void testStagingCleanupOrder() throws Exception {
     MRAppTestCleanup app = new MRAppTestCleanup(1, 1, true,

Modified: hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestJobImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestJobImpl.java?rev=1437113&r1=1437112&r2=1437113&view=diff
==============================================================================
--- hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestJobImpl.java (original)
+++ hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestJobImpl.java Tue Jan 22 19:33:02 2013
@@ -23,11 +23,13 @@ import static org.mockito.Mockito.doThro
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
+import java.io.File;
 import java.io.IOException;
 import java.util.EnumSet;
 import java.util.concurrent.BrokenBarrierException;
 import java.util.concurrent.CyclicBarrier;
 
+import org.apache.commons.io.FileUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.mapreduce.JobACL;
 import org.apache.hadoop.mapreduce.JobContext;
@@ -56,6 +58,7 @@ import org.apache.hadoop.mapreduce.v2.ap
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEventType;
 import org.apache.hadoop.mapreduce.v2.app.job.impl.JobImpl.InitTransition;
 import org.apache.hadoop.mapreduce.v2.app.metrics.MRAppMetrics;
+import org.apache.hadoop.mapreduce.v2.app.rm.RMHeartbeatHandler;
 import org.apache.hadoop.mapreduce.v2.util.MRBuilderUtils;
 import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.security.UserGroupInformation;
@@ -66,8 +69,11 @@ import org.apache.hadoop.yarn.event.Disp
 import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.state.StateMachine;
 import org.apache.hadoop.yarn.state.StateMachineFactory;
+import org.apache.hadoop.yarn.util.ConverterUtils;
 import org.apache.hadoop.yarn.util.Records;
 import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
 import org.junit.Test;
 
 
@@ -77,10 +83,28 @@ import org.junit.Test;
 @SuppressWarnings({"rawtypes"})
 public class TestJobImpl {
   
+  static String stagingDir = "target/test-staging/";
+
+  @BeforeClass
+  public static void setup() {    
+    File dir = new File(stagingDir);
+    stagingDir = dir.getAbsolutePath();
+  }
+
+  @Before
+  public void cleanup() throws IOException {
+    File dir = new File(stagingDir);
+    if(dir.exists()) {
+      FileUtils.deleteDirectory(dir);
+    }
+    dir.mkdirs();
+  }
+  
   @Test
   public void testJobNoTasks() {
     Configuration conf = new Configuration();
     conf.setInt(MRJobConfig.NUM_REDUCES, 0);
+    conf.set(MRJobConfig.MR_AM_STAGING_DIR, stagingDir);
     AsyncDispatcher dispatcher = new AsyncDispatcher();
     dispatcher.init(conf);
     dispatcher.start();
@@ -102,6 +126,7 @@ public class TestJobImpl {
   @Test(timeout=20000)
   public void testCommitJobFailsJob() throws Exception {
     Configuration conf = new Configuration();
+    conf.set(MRJobConfig.MR_AM_STAGING_DIR, stagingDir);
     AsyncDispatcher dispatcher = new AsyncDispatcher();
     dispatcher.init(conf);
     dispatcher.start();
@@ -126,6 +151,7 @@ public class TestJobImpl {
   @Test(timeout=20000)
   public void testCheckJobCompleteSuccess() throws Exception {
     Configuration conf = new Configuration();
+    conf.set(MRJobConfig.MR_AM_STAGING_DIR, stagingDir);
     AsyncDispatcher dispatcher = new AsyncDispatcher();
     dispatcher.init(conf);
     dispatcher.start();
@@ -150,6 +176,7 @@ public class TestJobImpl {
   @Test(timeout=20000)
   public void testKilledDuringSetup() throws Exception {
     Configuration conf = new Configuration();
+    conf.set(MRJobConfig.MR_AM_STAGING_DIR, stagingDir);
     AsyncDispatcher dispatcher = new AsyncDispatcher();
     dispatcher.init(conf);
     dispatcher.start();
@@ -186,6 +213,7 @@ public class TestJobImpl {
   @Test(timeout=20000)
   public void testKilledDuringCommit() throws Exception {
     Configuration conf = new Configuration();
+    conf.set(MRJobConfig.MR_AM_STAGING_DIR, stagingDir);
     AsyncDispatcher dispatcher = new AsyncDispatcher();
     dispatcher.init(conf);
     dispatcher.start();
@@ -210,6 +238,7 @@ public class TestJobImpl {
   @Test(timeout=20000)
   public void testKilledDuringFailAbort() throws Exception {
     Configuration conf = new Configuration();
+    conf.set(MRJobConfig.MR_AM_STAGING_DIR, stagingDir);
     AsyncDispatcher dispatcher = new AsyncDispatcher();
     dispatcher.init(conf);
     dispatcher.start();
@@ -251,6 +280,7 @@ public class TestJobImpl {
   @Test(timeout=20000)
   public void testKilledDuringKillAbort() throws Exception {
     Configuration conf = new Configuration();
+    conf.set(MRJobConfig.MR_AM_STAGING_DIR, stagingDir);
     AsyncDispatcher dispatcher = new AsyncDispatcher();
     dispatcher.init(conf);
     dispatcher.start();
@@ -315,7 +345,7 @@ public class TestJobImpl {
 
     // Verify access
     JobImpl job1 = new JobImpl(jobId, null, conf1, null, null, null, null, null,
-        null, null, true, null, 0, null, null);
+        null, null, true, null, 0, null, null, null, null);
     Assert.assertTrue(job1.checkAccess(ugi1, JobACL.VIEW_JOB));
     Assert.assertFalse(job1.checkAccess(ugi2, JobACL.VIEW_JOB));
 
@@ -326,7 +356,7 @@ public class TestJobImpl {
 
     // Verify access
     JobImpl job2 = new JobImpl(jobId, null, conf2, null, null, null, null, null,
-        null, null, true, null, 0, null, null);
+        null, null, true, null, 0, null, null, null, null);
     Assert.assertTrue(job2.checkAccess(ugi1, JobACL.VIEW_JOB));
     Assert.assertTrue(job2.checkAccess(ugi2, JobACL.VIEW_JOB));
 
@@ -337,7 +367,7 @@ public class TestJobImpl {
 
     // Verify access
     JobImpl job3 = new JobImpl(jobId, null, conf3, null, null, null, null, null,
-        null, null, true, null, 0, null, null);
+        null, null, true, null, 0, null, null, null, null);
     Assert.assertTrue(job3.checkAccess(ugi1, JobACL.VIEW_JOB));
     Assert.assertTrue(job3.checkAccess(ugi2, JobACL.VIEW_JOB));
 
@@ -348,7 +378,7 @@ public class TestJobImpl {
 
     // Verify access
     JobImpl job4 = new JobImpl(jobId, null, conf4, null, null, null, null, null,
-        null, null, true, null, 0, null, null);
+        null, null, true, null, 0, null, null, null, null);
     Assert.assertTrue(job4.checkAccess(ugi1, JobACL.VIEW_JOB));
     Assert.assertTrue(job4.checkAccess(ugi2, JobACL.VIEW_JOB));
 
@@ -359,7 +389,7 @@ public class TestJobImpl {
 
     // Verify access
     JobImpl job5 = new JobImpl(jobId, null, conf5, null, null, null, null, null,
-        null, null, true, null, 0, null, null);
+        null, null, true, null, 0, null, null, null, null);
     Assert.assertTrue(job5.checkAccess(ugi1, null));
     Assert.assertTrue(job5.checkAccess(ugi2, null));
   }
@@ -377,7 +407,7 @@ public class TestJobImpl {
         mock(EventHandler.class),
         null, mock(JobTokenSecretManager.class), null,
         new SystemClock(), null,
-        mrAppMetrics, true, null, 0, null, null);
+        mrAppMetrics, true, null, 0, null, null, null, null);
     job.handle(diagUpdateEvent);
     String diagnostics = job.getReport().getDiagnostics();
     Assert.assertNotNull(diagnostics);
@@ -388,7 +418,7 @@ public class TestJobImpl {
         mock(EventHandler.class),
         null, mock(JobTokenSecretManager.class), null,
         new SystemClock(), null,
-        mrAppMetrics, true, null, 0, null, null);
+        mrAppMetrics, true, null, 0, null, null, null, null);
     job.handle(new JobEvent(jobId, JobEventType.JOB_KILL));
     job.handle(diagUpdateEvent);
     diagnostics = job.getReport().getDiagnostics();
@@ -443,7 +473,7 @@ public class TestJobImpl {
     JobImpl job = new JobImpl(jobId, Records
         .newRecord(ApplicationAttemptId.class), conf, mock(EventHandler.class),
         null, mock(JobTokenSecretManager.class), null, null, null,
-        mrAppMetrics, true, null, 0, null, null);
+        mrAppMetrics, true, null, 0, null, null, null, null);
     InitTransition initTransition = getInitTransition(2);
     JobEvent mockJobEvent = mock(JobEvent.class);
     initTransition.transition(job, mockJobEvent);
@@ -502,13 +532,27 @@ public class TestJobImpl {
 
   private static CommitterEventHandler createCommitterEventHandler(
       Dispatcher dispatcher, OutputCommitter committer) {
-    SystemClock clock = new SystemClock();
+    final SystemClock clock = new SystemClock();
     AppContext appContext = mock(AppContext.class);
     when(appContext.getEventHandler()).thenReturn(
         dispatcher.getEventHandler());
     when(appContext.getClock()).thenReturn(clock);
+    RMHeartbeatHandler heartbeatHandler = new RMHeartbeatHandler() {
+      @Override
+      public long getLastHeartbeatTime() {
+        return clock.getTime();
+      }
+      @Override
+      public void runOnNextHeartbeat(Runnable callback) {
+        callback.run();
+      }
+    };
+    ApplicationAttemptId id = 
+      ConverterUtils.toApplicationAttemptId("appattempt_1234567890000_0001_0");
+    when(appContext.getApplicationID()).thenReturn(id.getApplicationId());
+    when(appContext.getApplicationAttemptId()).thenReturn(id);
     CommitterEventHandler handler =
-        new CommitterEventHandler(appContext, committer);
+        new CommitterEventHandler(appContext, committer, heartbeatHandler);
     dispatcher.register(CommitterEventType.class, handler);
     return handler;
   }
@@ -590,7 +634,8 @@ public class TestJobImpl {
       super(jobId, applicationAttemptId, conf, eventHandler,
           null, new JobTokenSecretManager(), new Credentials(),
           new SystemClock(), null, MRAppMetrics.create(),
-          newApiCommitter, user, System.currentTimeMillis(), null, null);
+          newApiCommitter, user, System.currentTimeMillis(), null, null, null,
+          null);
 
       initTransition = getInitTransition(numSplits);
       localFactory = stateMachineFactory.addTransition(JobStateInternal.NEW,

Modified: hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskAttempt.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskAttempt.java?rev=1437113&r1=1437112&r2=1437113&view=diff
==============================================================================
--- hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskAttempt.java (original)
+++ hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskAttempt.java Tue Jan 22 19:33:02 2013
@@ -197,7 +197,7 @@ public class TestTaskAttempt{
     conf.setInt(MRJobConfig.MAP_MEMORY_MB, mapMemMb);
     conf.setInt(MRJobConfig.REDUCE_MEMORY_MB, reduceMemMb);
     app.setClusterInfo(new ClusterInfo(BuilderUtils
-        .newResource(minContainerSize), BuilderUtils.newResource(10240)));
+        .newResource(minContainerSize, 1), BuilderUtils.newResource(10240, 1)));
 
     Job job = app.submit(conf);
     app.waitForState(job, JobState.RUNNING);

Modified: hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/launcher/TestContainerLauncherImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/launcher/TestContainerLauncherImpl.java?rev=1437113&r1=1437112&r2=1437113&view=diff
==============================================================================
--- hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/launcher/TestContainerLauncherImpl.java (original)
+++ hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/launcher/TestContainerLauncherImpl.java Tue Jan 22 19:33:02 2013
@@ -1,3 +1,20 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
 package org.apache.hadoop.mapreduce.v2.app.launcher;
 
 import static org.mockito.Matchers.any;

Modified: hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/local/TestLocalContainerAllocator.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/local/TestLocalContainerAllocator.java?rev=1437113&r1=1437112&r2=1437113&view=diff
==============================================================================
--- hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/local/TestLocalContainerAllocator.java (original)
+++ hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/local/TestLocalContainerAllocator.java Tue Jan 22 19:33:02 2013
@@ -1,3 +1,20 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
 package org.apache.hadoop.mapreduce.v2.app.local;
 
 import static org.mockito.Matchers.isA;
@@ -99,8 +116,8 @@ public class TestLocalContainerAllocator
       when(ctx.getApplicationAttemptId()).thenReturn(attemptId);
       when(ctx.getJob(isA(JobId.class))).thenReturn(job);
       when(ctx.getClusterInfo()).thenReturn(
-          new ClusterInfo(BuilderUtils.newResource(1024), BuilderUtils
-              .newResource(10240)));
+          new ClusterInfo(BuilderUtils.newResource(1024, 1), BuilderUtils
+              .newResource(10240, 1)));
       when(ctx.getEventHandler()).thenReturn(eventHandler);
       return ctx;
     }

Modified: hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/pom.xml
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/pom.xml?rev=1437113&r1=1437112&r2=1437113&view=diff
==============================================================================
--- hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/pom.xml (original)
+++ hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/pom.xml Tue Jan 22 19:33:02 2013
@@ -81,6 +81,7 @@
             <configuration>
               <executable>protoc</executable>
               <arguments>
+                <argument>-I../../../hadoop-common-project/hadoop-common/src/main/proto/</argument>
                 <argument>-I../../../hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/</argument>
                 <argument>-Isrc/main/proto/</argument>
                 <argument>--java_out=target/generated-sources/proto</argument>