Posted to mapreduce-commits@hadoop.apache.org by su...@apache.org on 2014/07/12 04:25:03 UTC

svn commit: r1609878 [1/2] - in /hadoop/common/branches/YARN-1051/hadoop-mapreduce-project: ./ dev-support/ hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/ hadoop-mapreduce-client/hadoop-mapreduce-c...

Author: subru
Date: Sat Jul 12 02:24:40 2014
New Revision: 1609878

URL: http://svn.apache.org/r1609878
Log:
syncing YARN-1051 branch with trunk

Added:
    hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/
      - copied from r1605891, hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/
    hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/InputSplitWithLocationInfo.java
      - copied unchanged from r1605891, hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/InputSplitWithLocationInfo.java
    hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/SplitLocationInfo.java
      - copied unchanged from r1605891, hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/SplitLocationInfo.java
Removed:
    hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java
Modified:
    hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/   (props changed)
    hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/CHANGES.txt   (contents, props changed)
    hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/dev-support/findbugs-exclude.xml
    hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java
    hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java
    hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMCommunicator.java
    hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java
    hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerRequestor.java
    hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java
    hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestJobEndNotifier.java
    hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestMRAppMaster.java
    hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestStagingCleanup.java
    hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestJobImpl.java
    hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskAttempt.java
    hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/launcher/TestContainerLauncherImpl.java
    hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/TestRMContainerAllocator.java
    hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/jobhistory/FileNameIndexUtils.java
    hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/test/java/org/apache/hadoop/mapreduce/v2/jobhistory/TestFileNameIndexUtils.java
    hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/FileInputFormat.java
    hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/FileSplit.java
    hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/InputSplit.java
    hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java
    hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/FileInputFormat.java
    hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/FileSplit.java
    hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/util/HostUtil.java
    hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml   (contents, props changed)
    hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestFileInputFormat.java
    hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/input/TestFileInputFormat.java
    hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestFixedLengthInputFormat.java
    hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/pipes/TestPipeApplication.java
    hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/lib/input/TestFixedLengthInputFormat.java

Propchange: hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/
------------------------------------------------------------------------------
  Merged /hadoop/common/trunk/hadoop-mapreduce-project:r1603348-1609877

Modified: hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/CHANGES.txt?rev=1609878&r1=1609877&r2=1609878&view=diff
==============================================================================
--- hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/CHANGES.txt (original)
+++ hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/CHANGES.txt Sat Jul 12 02:24:40 2014
@@ -145,6 +145,24 @@ Trunk (Unreleased)
     MAPREDUCE-5867. Fix NPE in KillAMPreemptionPolicy related to 
     ProportionalCapacityPreemptionPolicy (Sunil G via devaraj)
 
+Release 2.6.0 - UNRELEASED
+
+  INCOMPATIBLE CHANGES
+
+  NEW FEATURES
+
+  IMPROVEMENTS
+
+  OPTIMIZATIONS
+
+  BUG FIXES
+
+    MAPREDUCE-5866. TestFixedLengthInputFormat fails in windows.
+    (Varun Vasudev via cnauroth)
+
+    MAPREDUCE-5956. Made MR AM not use maxAttempts to determine if the current
+    attempt is the last retry. (Wangda Tan via zjshen)
+
 Release 2.5.0 - UNRELEASED
 
   INCOMPATIBLE CHANGES
@@ -213,6 +231,12 @@ Release 2.5.0 - UNRELEASED
     MAPREDUCE-5834. Increased test-timeouts in TestGridMixClasses to avoid
     occasional failures. (Mit Desai via vinodkv)
 
+    MAPREDUCE-5896. InputSplits should indicate which locations have the block 
+    cached in memory. (Sandy Ryza via kasha)
+
+    MAPREDUCE-5844. Add a configurable delay to reducer-preemption. 
+    (Maysam Yabandeh via kasha)
+
   OPTIMIZATIONS
 
   BUG FIXES 
@@ -267,6 +291,19 @@ Release 2.5.0 - UNRELEASED
     MAPREDUCE-5924. Changed TaskAttemptImpl to ignore TA_COMMIT_PENDING event
     at COMMIT_PENDING state. (Zhijie Shen via jianhe)
 
+    MAPREDUCE-5939. StartTime showing up as the epoch time in JHS UI after
+    upgrade (Chen He via jlowe)
+
+    MAPREDUCE-5900. Changed to interpret the container preemption exit code
+    as a task attempt killing event. (Mayank Bansal via zjshen)
+
+    MAPREDUCE-5868. Fixed an issue with TestPipeApplication that was causing the
+    nightly builds to fail. (Akira Ajisaka via vinodkv)
+
+    MAPREDUCE-5517. Fixed MapReduce ApplicationMaster to not validate reduce side
+    resource configuration for deciding uber-mode on map-only jobs. (Siqi Li via
+    vinodkv)
+
 Release 2.4.1 - 2014-06-23 
 
   INCOMPATIBLE CHANGES
@@ -275,6 +312,10 @@ Release 2.4.1 - 2014-06-23 
 
   IMPROVEMENTS
 
+    MAPREDUCE-5830. Added back the private API HostUtil.getTaskLogUrl(..) for
+    binary compatibility with older clients like Hive 0.13. (Akira Ajisaka via
+    vinodkv)
+
   OPTIMIZATIONS
 
   BUG FIXES
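
Two of the 2.5.0 entries above correspond directly to code in this merge:
MAPREDUCE-5896 introduces the SplitLocationInfo API (the new files listed
under Added), and MAPREDUCE-5844 adds the reducer-preemption delay wired into
RMContainerAllocator.java below. As a hedged sketch, a client could use the
new API to find hosts that hold a split's block in memory; the class and
method names come from the added files, but the exact signatures are assumed
here:

    import java.io.IOException;
    import org.apache.hadoop.mapred.SplitLocationInfo;
    import org.apache.hadoop.mapreduce.InputSplit;

    public class SplitLocality {
      // Print hosts that report an in-memory replica of the split's block.
      // getLocationInfo() is assumed to return null for splits that do not
      // expose per-host storage information.
      static void printMemoryLocalHosts(InputSplit split) throws IOException {
        SplitLocationInfo[] infos = split.getLocationInfo();
        if (infos == null) {
          return;
        }
        for (SplitLocationInfo info : infos) {
          if (info.isInMemory()) {
            System.out.println("in-memory replica on " + info.getLocation());
          }
        }
      }
    }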

Propchange: hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/CHANGES.txt
------------------------------------------------------------------------------
  Merged /hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt:r1603348-1609877

Modified: hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/dev-support/findbugs-exclude.xml
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/dev-support/findbugs-exclude.xml?rev=1609878&r1=1609877&r2=1609878&view=diff
==============================================================================
--- hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/dev-support/findbugs-exclude.xml (original)
+++ hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/dev-support/findbugs-exclude.xml Sat Jul 12 02:24:40 2014
@@ -475,8 +475,8 @@
    <Match>
      <Class name="org.apache.hadoop.mapreduce.v2.app.rm.RMContainerAllocator" />
      <Or>
-      <Field name="mapResourceReqt" />
-      <Field name="reduceResourceReqt" />
+      <Field name="mapResourceRequest" />
+      <Field name="reduceResourceRequest" />
       <Field name="maxReduceRampupLimit" />
       <Field name="reduceSlowStart" />
      </Or>

Modified: hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java?rev=1609878&r1=1609877&r2=1609878&view=diff
==============================================================================
--- hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java (original)
+++ hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java Sat Jul 12 02:24:40 2014
@@ -186,7 +186,6 @@ public class MRAppMaster extends Composi
   private final int nmPort;
   private final int nmHttpPort;
   protected final MRAppMetrics metrics;
-  private final int maxAppAttempts;
   private Map<TaskId, TaskInfo> completedTasksFromPreviousRun;
   private List<AMInfo> amInfos;
   private AppContext context;
@@ -227,14 +226,14 @@ public class MRAppMaster extends Composi
 
   public MRAppMaster(ApplicationAttemptId applicationAttemptId,
       ContainerId containerId, String nmHost, int nmPort, int nmHttpPort,
-      long appSubmitTime, int maxAppAttempts) {
+      long appSubmitTime) {
     this(applicationAttemptId, containerId, nmHost, nmPort, nmHttpPort,
-        new SystemClock(), appSubmitTime, maxAppAttempts);
+        new SystemClock(), appSubmitTime);
   }
 
   public MRAppMaster(ApplicationAttemptId applicationAttemptId,
       ContainerId containerId, String nmHost, int nmPort, int nmHttpPort,
-      Clock clock, long appSubmitTime, int maxAppAttempts) {
+      Clock clock, long appSubmitTime) {
     super(MRAppMaster.class.getName());
     this.clock = clock;
     this.startTime = clock.getTime();
@@ -245,7 +244,6 @@ public class MRAppMaster extends Composi
     this.nmPort = nmPort;
     this.nmHttpPort = nmHttpPort;
     this.metrics = MRAppMetrics.create();
-    this.maxAppAttempts = maxAppAttempts;
     logSyncer = TaskLog.createLogSyncer();
     LOG.info("Created MRAppMaster for application " + applicationAttemptId);
   }
@@ -258,12 +256,6 @@ public class MRAppMaster extends Composi
 
     context = new RunningAppContext(conf);
 
-    ((RunningAppContext)context).computeIsLastAMRetry();
-    LOG.info("The specific max attempts: " + maxAppAttempts +
-        " for application: " + appAttemptID.getApplicationId().getId() +
-        ". Attempt num: " + appAttemptID.getAttemptId() +
-        " is last retry: " + isLastAMRetry);
-
     // Job name is the same as the app name until we support DAG of jobs
     // for an app later
     appName = conf.get(MRJobConfig.JOB_NAME, "<missing app name>");
@@ -1007,8 +999,8 @@ public class MRAppMaster extends Composi
       successfullyUnregistered.set(true);
     }
 
-    public void computeIsLastAMRetry() {
-      isLastAMRetry = appAttemptID.getAttemptId() >= maxAppAttempts;
+    public void resetIsLastAMRetry() {
+      isLastAMRetry = false;
     }
   }
 
@@ -1388,8 +1380,6 @@ public class MRAppMaster extends Composi
           System.getenv(Environment.NM_HTTP_PORT.name());
       String appSubmitTimeStr =
           System.getenv(ApplicationConstants.APP_SUBMIT_TIME_ENV);
-      String maxAppAttempts =
-          System.getenv(ApplicationConstants.MAX_APP_ATTEMPTS_ENV);
       
       validateInputParam(containerIdStr,
           Environment.CONTAINER_ID.name());
@@ -1399,8 +1389,6 @@ public class MRAppMaster extends Composi
           Environment.NM_HTTP_PORT.name());
       validateInputParam(appSubmitTimeStr,
           ApplicationConstants.APP_SUBMIT_TIME_ENV);
-      validateInputParam(maxAppAttempts,
-          ApplicationConstants.MAX_APP_ATTEMPTS_ENV);
 
       ContainerId containerId = ConverterUtils.toContainerId(containerIdStr);
       ApplicationAttemptId applicationAttemptId =
@@ -1411,8 +1399,7 @@ public class MRAppMaster extends Composi
       MRAppMaster appMaster =
           new MRAppMaster(applicationAttemptId, containerId, nodeHostString,
               Integer.parseInt(nodePortString),
-              Integer.parseInt(nodeHttpPortString), appSubmitTime,
-              Integer.parseInt(maxAppAttempts));
+              Integer.parseInt(nodeHttpPortString), appSubmitTime);
       ShutdownHookManager.get().addShutdownHook(
         new MRAppMasterShutdownHook(appMaster), SHUTDOWN_HOOK_PRIORITY);
       JobConf conf = new JobConf(new YarnConfiguration());
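
The MRAppMaster changes above implement MAPREDUCE-5956: the AM no longer reads
MAX_APP_ATTEMPTS_ENV and no longer infers up front whether it is on its last
retry. A minimal self-contained sketch of the old versus new semantics
(AttemptTracker is a hypothetical stand-in for the AM, not the real class):

    // Sketch of how the "last AM retry" flag is derived before and after
    // MAPREDUCE-5956.
    class AttemptTracker {
      private volatile boolean isLastAMRetry;

      // Old behavior: computed eagerly from the attempt id and the
      // max-attempts value the RM passed in via the environment.
      void computeIsLastAMRetry(int attemptId, int maxAppAttempts) {
        isLastAMRetry = attemptId >= maxAppAttempts;
      }

      // New behavior: assume this is not the last retry; a failed
      // unregistration resets the flag so the RM can still grant
      // another attempt (see RMCommunicator below).
      void resetIsLastAMRetry() {
        isLastAMRetry = false;
      }
    }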

Modified: hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java?rev=1609878&r1=1609877&r2=1609878&view=diff
==============================================================================
--- hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java (original)
+++ hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java Sat Jul 12 02:24:40 2014
@@ -1218,22 +1218,25 @@ public class JobImpl implements org.apac
     boolean smallNumReduceTasks = (numReduceTasks <= sysMaxReduces);
     boolean smallInput = (dataInputLength <= sysMaxBytes);
     // ignoring overhead due to UberAM and statics as negligible here:
+    long requiredMapMB = conf.getLong(MRJobConfig.MAP_MEMORY_MB, 0);
+    long requiredReduceMB = conf.getLong(MRJobConfig.REDUCE_MEMORY_MB, 0);
+    long requiredMB = Math.max(requiredMapMB, requiredReduceMB);
+    int requiredMapCores = conf.getInt(
+            MRJobConfig.MAP_CPU_VCORES, 
+            MRJobConfig.DEFAULT_MAP_CPU_VCORES);
+    int requiredReduceCores = conf.getInt(
+            MRJobConfig.REDUCE_CPU_VCORES, 
+            MRJobConfig.DEFAULT_REDUCE_CPU_VCORES);
+    int requiredCores = Math.max(requiredMapCores, requiredReduceCores);    
+    if (numReduceTasks == 0) {
+      requiredMB = requiredMapMB;
+      requiredCores = requiredMapCores;
+    }
     boolean smallMemory =
-        ( (Math.max(conf.getLong(MRJobConfig.MAP_MEMORY_MB, 0),
-            conf.getLong(MRJobConfig.REDUCE_MEMORY_MB, 0))
-            <= sysMemSizeForUberSlot)
-            || (sysMemSizeForUberSlot == JobConf.DISABLED_MEMORY_LIMIT));
-    boolean smallCpu =
-        (
-            Math.max(
-                conf.getInt(
-                    MRJobConfig.MAP_CPU_VCORES, 
-                    MRJobConfig.DEFAULT_MAP_CPU_VCORES), 
-                conf.getInt(
-                    MRJobConfig.REDUCE_CPU_VCORES, 
-                    MRJobConfig.DEFAULT_REDUCE_CPU_VCORES)) 
-             <= sysCPUSizeForUberSlot
-        );
+        (requiredMB <= sysMemSizeForUberSlot)
+        || (sysMemSizeForUberSlot == JobConf.DISABLED_MEMORY_LIMIT);
+    
+    boolean smallCpu = requiredCores <= sysCPUSizeForUberSlot;
     boolean notChainJob = !isChainJob(conf);
 
     // User has overall veto power over uberization, or user can modify
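
The JobImpl change (MAPREDUCE-5517) factors the uber-mode resource check into
explicit requiredMB/requiredCores values so that map-only jobs ignore the
reducer settings entirely. A standalone sketch of the resulting decision,
with the configuration lookups inlined as parameters and the
DISABLED_MEMORY_LIMIT escape hatch elided:

    class UberDecisionSketch {
      // For a map-only job (numReduceTasks == 0) only the map resources are
      // compared against the uber-slot limits, so an oversized reducer
      // setting no longer vetoes uber mode.
      static boolean smallEnoughForUber(int numReduceTasks,
          long mapMB, long reduceMB, int mapCores, int reduceCores,
          long sysMemSizeForUberSlot, int sysCPUSizeForUberSlot) {
        long requiredMB = Math.max(mapMB, reduceMB);
        int requiredCores = Math.max(mapCores, reduceCores);
        if (numReduceTasks == 0) {
          requiredMB = mapMB;
          requiredCores = mapCores;
        }
        return requiredMB <= sysMemSizeForUberSlot
            && requiredCores <= sysCPUSizeForUberSlot;
      }
    }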

Modified: hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMCommunicator.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMCommunicator.java?rev=1609878&r1=1609877&r2=1609878&view=diff
==============================================================================
--- hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMCommunicator.java (original)
+++ hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMCommunicator.java Sat Jul 12 02:24:40 2014
@@ -185,7 +185,7 @@ public abstract class RMCommunicator ext
       // if unregistration failed, isLastAMRetry needs to be recalculated
       // to see whether AM really has the chance to retry
       RunningAppContext raContext = (RunningAppContext) context;
-      raContext.computeIsLastAMRetry();
+      raContext.resetIsLastAMRetry();
     }
   }
 

Modified: hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java?rev=1609878&r1=1609877&r2=1609878&view=diff
==============================================================================
--- hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java (original)
+++ hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java Sat Jul 12 02:24:40 2014
@@ -73,6 +73,7 @@ import org.apache.hadoop.yarn.api.record
 import org.apache.hadoop.yarn.client.api.NMTokenCache;
 import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
+import org.apache.hadoop.yarn.util.Clock;
 import org.apache.hadoop.yarn.util.RackResolver;
 
 import com.google.common.annotations.VisibleForTesting;
@@ -143,15 +144,21 @@ public class RMContainerAllocator extend
   private int lastCompletedTasks = 0;
   
   private boolean recalculateReduceSchedule = false;
-  private int mapResourceReqt;//memory
-  private int reduceResourceReqt;//memory
+  private int mapResourceRequest;//memory
+  private int reduceResourceRequest;//memory
   
   private boolean reduceStarted = false;
   private float maxReduceRampupLimit = 0;
   private float maxReducePreemptionLimit = 0;
+  /**
+   * If a container request is not allocated within this threshold, it is
+   * considered delayed.
+   */
+  private long allocationDelayThresholdMs = 0;
   private float reduceSlowStart = 0;
   private long retryInterval;
   private long retrystartTime;
+  private Clock clock;
 
   private final AMPreemptionPolicy preemptionPolicy;
 
@@ -166,6 +173,7 @@ public class RMContainerAllocator extend
     super(clientService, context);
     this.preemptionPolicy = preemptionPolicy;
     this.stopped = new AtomicBoolean(false);
+    this.clock = context.getClock();
   }
 
   @Override
@@ -180,6 +188,9 @@ public class RMContainerAllocator extend
     maxReducePreemptionLimit = conf.getFloat(
         MRJobConfig.MR_AM_JOB_REDUCE_PREEMPTION_LIMIT,
         MRJobConfig.DEFAULT_MR_AM_JOB_REDUCE_PREEMPTION_LIMIT);
+    allocationDelayThresholdMs = conf.getInt(
+        MRJobConfig.MR_JOB_REDUCER_PREEMPT_DELAY_SEC,
+        MRJobConfig.DEFAULT_MR_JOB_REDUCER_PREEMPT_DELAY_SEC) * 1000;//sec -> ms
     RackResolver.init(conf);
     retryInterval = getConfig().getLong(MRJobConfig.MR_AM_TO_RM_WAIT_INTERVAL_MS,
                                 MRJobConfig.DEFAULT_MR_AM_TO_RM_WAIT_INTERVAL_MS);
@@ -246,7 +257,7 @@ public class RMContainerAllocator extend
           getJob().getTotalMaps(), completedMaps,
           scheduledRequests.maps.size(), scheduledRequests.reduces.size(), 
           assignedRequests.maps.size(), assignedRequests.reduces.size(),
-          mapResourceReqt, reduceResourceReqt,
+          mapResourceRequest, reduceResourceRequest,
           pendingReduces.size(), 
           maxReduceRampupLimit, reduceSlowStart);
       recalculateReduceSchedule = false;
@@ -268,6 +279,18 @@ public class RMContainerAllocator extend
     scheduleStats.log("Final Stats: ");
   }
 
+  @Private
+  @VisibleForTesting
+  AssignedRequests getAssignedRequests() {
+    return assignedRequests;
+  }
+
+  @Private
+  @VisibleForTesting
+  ScheduledRequests getScheduledRequests() {
+    return scheduledRequests;
+  }
+
   public boolean getIsReduceStarted() {
     return reduceStarted;
   }
@@ -303,16 +326,16 @@ public class RMContainerAllocator extend
       int supportedMaxContainerCapability =
           getMaxContainerCapability().getMemory();
       if (reqEvent.getAttemptID().getTaskId().getTaskType().equals(TaskType.MAP)) {
-        if (mapResourceReqt == 0) {
-          mapResourceReqt = reqEvent.getCapability().getMemory();
+        if (mapResourceRequest == 0) {
+          mapResourceRequest = reqEvent.getCapability().getMemory();
           eventHandler.handle(new JobHistoryEvent(jobId, 
               new NormalizedResourceEvent(org.apache.hadoop.mapreduce.TaskType.MAP,
-              mapResourceReqt)));
-          LOG.info("mapResourceReqt:"+mapResourceReqt);
-          if (mapResourceReqt > supportedMaxContainerCapability) {
+                  mapResourceRequest)));
+          LOG.info("mapResourceRequest:"+ mapResourceRequest);
+          if (mapResourceRequest > supportedMaxContainerCapability) {
             String diagMsg = "MAP capability required is more than the supported " +
-            "max container capability in the cluster. Killing the Job. mapResourceReqt: " + 
-            mapResourceReqt + " maxContainerCapability:" + supportedMaxContainerCapability;
+            "max container capability in the cluster. Killing the Job. mapResourceRequest: " +
+                mapResourceRequest + " maxContainerCapability:" + supportedMaxContainerCapability;
             LOG.info(diagMsg);
             eventHandler.handle(new JobDiagnosticsUpdateEvent(
                 jobId, diagMsg));
@@ -320,20 +343,20 @@ public class RMContainerAllocator extend
           }
         }
         //set the rounded off memory
-        reqEvent.getCapability().setMemory(mapResourceReqt);
+        reqEvent.getCapability().setMemory(mapResourceRequest);
         scheduledRequests.addMap(reqEvent);//maps are immediately scheduled
       } else {
-        if (reduceResourceReqt == 0) {
-          reduceResourceReqt = reqEvent.getCapability().getMemory();
+        if (reduceResourceRequest == 0) {
+          reduceResourceRequest = reqEvent.getCapability().getMemory();
           eventHandler.handle(new JobHistoryEvent(jobId, 
               new NormalizedResourceEvent(
                   org.apache.hadoop.mapreduce.TaskType.REDUCE,
-              reduceResourceReqt)));
-          LOG.info("reduceResourceReqt:"+reduceResourceReqt);
-          if (reduceResourceReqt > supportedMaxContainerCapability) {
+                  reduceResourceRequest)));
+          LOG.info("reduceResourceRequest:"+ reduceResourceRequest);
+          if (reduceResourceRequest > supportedMaxContainerCapability) {
             String diagMsg = "REDUCE capability required is more than the " +
             		"supported max container capability in the cluster. Killing the " +
-            		"Job. reduceResourceReqt: " + reduceResourceReqt +
+            		"Job. reduceResourceRequest: " + reduceResourceRequest +
             		" maxContainerCapability:" + supportedMaxContainerCapability;
             LOG.info(diagMsg);
             eventHandler.handle(new JobDiagnosticsUpdateEvent(
@@ -342,7 +365,7 @@ public class RMContainerAllocator extend
           }
         }
         //set the rounded off memory
-        reqEvent.getCapability().setMemory(reduceResourceReqt);
+        reqEvent.getCapability().setMemory(reduceResourceRequest);
         if (reqEvent.getEarlierAttemptFailed()) {
           //add to the front of queue for fail fast
           pendingReduces.addFirst(new ContainerRequest(reqEvent, PRIORITY_REDUCE));
@@ -394,8 +417,22 @@ public class RMContainerAllocator extend
     return host;
   }
 
-  private void preemptReducesIfNeeded() {
-    if (reduceResourceReqt == 0) {
+  @Private
+  @VisibleForTesting
+  synchronized void setReduceResourceRequest(int mem) {
+    this.reduceResourceRequest = mem;
+  }
+
+  @Private
+  @VisibleForTesting
+  synchronized void setMapResourceRequest(int mem) {
+    this.mapResourceRequest = mem;
+  }
+
+  @Private
+  @VisibleForTesting
+  void preemptReducesIfNeeded() {
+    if (reduceResourceRequest == 0) {
       return; //no reduces
     }
     //check if reduces have taken over the whole cluster and there are 
@@ -403,9 +440,9 @@ public class RMContainerAllocator extend
     if (scheduledRequests.maps.size() > 0) {
       int memLimit = getMemLimit();
       int availableMemForMap = memLimit - ((assignedRequests.reduces.size() -
-          assignedRequests.preemptionWaitingReduces.size()) * reduceResourceReqt);
+          assignedRequests.preemptionWaitingReduces.size()) * reduceResourceRequest);
      //availableMemForMap must be sufficient to run at least 1 map
-      if (availableMemForMap < mapResourceReqt) {
+      if (availableMemForMap < mapResourceRequest) {
         //to make sure new containers are given to maps and not reduces
         //ramp down all scheduled reduces if any
         //(since reduces are scheduled at higher priority than maps)
@@ -414,22 +451,40 @@ public class RMContainerAllocator extend
           pendingReduces.add(req);
         }
         scheduledRequests.reduces.clear();
-        
-        //preempt for making space for at least one map
-        int premeptionLimit = Math.max(mapResourceReqt, 
-            (int) (maxReducePreemptionLimit * memLimit));
-        
-        int preemptMem = Math.min(scheduledRequests.maps.size() * mapResourceReqt, 
-            premeptionLimit);
-        
-        int toPreempt = (int) Math.ceil((float) preemptMem/reduceResourceReqt);
-        toPreempt = Math.min(toPreempt, assignedRequests.reduces.size());
-        
-        LOG.info("Going to preempt " + toPreempt + " due to lack of space for maps");
-        assignedRequests.preemptReduce(toPreempt);
+
+        //do further checking to find the number of map requests that were
+        //hanging around for a while
+        int hangingMapRequests = getNumOfHangingRequests(scheduledRequests.maps);
+        if (hangingMapRequests > 0) {
+          //preempt for making space for at least one map
+          int premeptionLimit = Math.max(mapResourceRequest,
+              (int) (maxReducePreemptionLimit * memLimit));
+
+          int preemptMem = Math.min(hangingMapRequests * mapResourceRequest,
+              premeptionLimit);
+
+          int toPreempt = (int) Math.ceil((float) preemptMem / reduceResourceRequest);
+          toPreempt = Math.min(toPreempt, assignedRequests.reduces.size());
+
+          LOG.info("Going to preempt " + toPreempt + " due to lack of space for maps");
+          assignedRequests.preemptReduce(toPreempt);
+        }
       }
     }
   }
+
+  private int getNumOfHangingRequests(Map<TaskAttemptId, ContainerRequest> requestMap) {
+    if (allocationDelayThresholdMs <= 0)
+      return requestMap.size();
+    int hangingRequests = 0;
+    long currTime = clock.getTime();
+    for (ContainerRequest request: requestMap.values()) {
+      long delay = currTime - request.requestTimeMs;
+      if (delay > allocationDelayThresholdMs)
+        hangingRequests++;
+    }
+    return hangingRequests;
+  }
   
   @Private
   public void scheduleReduces(
@@ -664,7 +719,8 @@ public class RMContainerAllocator extend
   @VisibleForTesting
   public TaskAttemptEvent createContainerFinishedEvent(ContainerStatus cont,
       TaskAttemptId attemptID) {
-    if (cont.getExitStatus() == ContainerExitStatus.ABORTED) {
+    if (cont.getExitStatus() == ContainerExitStatus.ABORTED
+        || cont.getExitStatus() == ContainerExitStatus.PREEMPTED) {
       // killed by framework
       return new TaskAttemptEvent(attemptID,
           TaskAttemptEventType.TA_KILL);
@@ -715,11 +771,13 @@ public class RMContainerAllocator extend
   @Private
   public int getMemLimit() {
     int headRoom = getAvailableResources() != null ? getAvailableResources().getMemory() : 0;
-    return headRoom + assignedRequests.maps.size() * mapResourceReqt + 
-       assignedRequests.reduces.size() * reduceResourceReqt;
+    return headRoom + assignedRequests.maps.size() * mapResourceRequest +
+       assignedRequests.reduces.size() * reduceResourceRequest;
   }
-  
-  private class ScheduledRequests {
+
+  @Private
+  @VisibleForTesting
+  class ScheduledRequests {
     
     private final LinkedList<TaskAttemptId> earlierFailedMaps = 
       new LinkedList<TaskAttemptId>();
@@ -729,7 +787,8 @@ public class RMContainerAllocator extend
       new HashMap<String, LinkedList<TaskAttemptId>>();
     private final Map<String, LinkedList<TaskAttemptId>> mapsRackMapping = 
       new HashMap<String, LinkedList<TaskAttemptId>>();
-    private final Map<TaskAttemptId, ContainerRequest> maps = 
+    @VisibleForTesting
+    final Map<TaskAttemptId, ContainerRequest> maps =
       new LinkedHashMap<TaskAttemptId, ContainerRequest>();
     
     private final LinkedHashMap<TaskAttemptId, ContainerRequest> reduces = 
@@ -825,22 +884,22 @@ public class RMContainerAllocator extend
         int allocatedMemory = allocated.getResource().getMemory();
         if (PRIORITY_FAST_FAIL_MAP.equals(priority) 
             || PRIORITY_MAP.equals(priority)) {
-          if (allocatedMemory < mapResourceReqt
+          if (allocatedMemory < mapResourceRequest
               || maps.isEmpty()) {
             LOG.info("Cannot assign container " + allocated 
                 + " for a map as either "
-                + " container memory less than required " + mapResourceReqt
+                + " container memory less than required " + mapResourceRequest
                 + " or no pending map tasks - maps.isEmpty=" 
                 + maps.isEmpty()); 
             isAssignable = false; 
           }
         } 
         else if (PRIORITY_REDUCE.equals(priority)) {
-          if (allocatedMemory < reduceResourceReqt
+          if (allocatedMemory < reduceResourceRequest
               || reduces.isEmpty()) {
             LOG.info("Cannot assign container " + allocated 
                 + " for a reduce as either "
-                + " container memory less than required " + reduceResourceReqt
+                + " container memory less than required " + reduceResourceRequest
                 + " or no pending reduce tasks - reduces.isEmpty=" 
                 + reduces.isEmpty()); 
             isAssignable = false;
@@ -1119,14 +1178,18 @@ public class RMContainerAllocator extend
     }
   }
 
-  private class AssignedRequests {
+  @Private
+  @VisibleForTesting
+  class AssignedRequests {
     private final Map<ContainerId, TaskAttemptId> containerToAttemptMap =
       new HashMap<ContainerId, TaskAttemptId>();
     private final LinkedHashMap<TaskAttemptId, Container> maps = 
       new LinkedHashMap<TaskAttemptId, Container>();
-    private final LinkedHashMap<TaskAttemptId, Container> reduces = 
+    @VisibleForTesting
+    final LinkedHashMap<TaskAttemptId, Container> reduces =
       new LinkedHashMap<TaskAttemptId, Container>();
-    private final Set<TaskAttemptId> preemptionWaitingReduces = 
+    @VisibleForTesting
+    final Set<TaskAttemptId> preemptionWaitingReduces =
       new HashSet<TaskAttemptId>();
     
     void add(Container container, TaskAttemptId tId) {
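
With MAPREDUCE-5844, the allocator above only ramps down reducers once map
requests have been pending longer than allocationDelayThresholdMs. The
threshold comes from MRJobConfig.MR_JOB_REDUCER_PREEMPT_DELAY_SEC; a job
could opt into a 30-second grace period roughly as follows (a usage sketch,
assuming the default of 0 preserves the old eager-preemption behavior):

    import org.apache.hadoop.mapred.JobConf;
    import org.apache.hadoop.mapreduce.MRJobConfig;

    public class PreemptDelayExample {
      public static void main(String[] args) {
        JobConf conf = new JobConf();
        // Let map requests wait 30 seconds before reducers are preempted
        // to make room for them.
        conf.setInt(MRJobConfig.MR_JOB_REDUCER_PREEMPT_DELAY_SEC, 30);
      }
    }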

Modified: hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerRequestor.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerRequestor.java?rev=1609878&r1=1609877&r2=1609878&view=diff
==============================================================================
--- hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerRequestor.java (original)
+++ hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerRequestor.java Sat Jul 12 02:24:40 2014
@@ -29,8 +29,10 @@ import java.util.TreeSet;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicBoolean;
 
+import com.google.common.annotations.VisibleForTesting;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.mapreduce.MRJobConfig;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
@@ -96,6 +98,8 @@ public abstract class RMContainerRequest
     super(clientService, context);
   }
 
+  @Private
+  @VisibleForTesting
   static class ContainerRequest {
     final TaskAttemptId attemptID;
     final Resource capability;
@@ -103,20 +107,39 @@ public abstract class RMContainerRequest
     final String[] racks;
     //final boolean earlierAttemptFailed;
     final Priority priority;
-    
+    /**
+     * the time when this request object was formed; can be used to avoid
+     * aggressive preemption for recently placed requests
+     */
+    final long requestTimeMs;
+
     public ContainerRequest(ContainerRequestEvent event, Priority priority) {
       this(event.getAttemptID(), event.getCapability(), event.getHosts(),
           event.getRacks(), priority);
     }
-    
+
+    public ContainerRequest(ContainerRequestEvent event, Priority priority,
+                            long requestTimeMs) {
+      this(event.getAttemptID(), event.getCapability(), event.getHosts(),
+          event.getRacks(), priority, requestTimeMs);
+    }
+
+    public ContainerRequest(TaskAttemptId attemptID,
+                            Resource capability, String[] hosts, String[] racks,
+                            Priority priority) {
+      this(attemptID, capability, hosts, racks, priority,
+          System.currentTimeMillis());
+    }
+
     public ContainerRequest(TaskAttemptId attemptID,
-        Resource capability, String[] hosts, String[] racks, 
-        Priority priority) {
+        Resource capability, String[] hosts, String[] racks,
+        Priority priority, long requestTimeMs) {
       this.attemptID = attemptID;
       this.capability = capability;
       this.hosts = hosts;
       this.racks = racks;
       this.priority = priority;
+      this.requestTimeMs = requestTimeMs;
     }
     
     public String toString() {
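
The new requestTimeMs field is what getNumOfHangingRequests() in the
allocator compares against the AM clock. A toy sketch of that age check
(Request is a hypothetical stand-in for ContainerRequest):

    class Request {
      final long requestTimeMs = System.currentTimeMillis();
    }

    class HangingCheck {
      static boolean isHanging(Request r, long nowMs, long thresholdMs) {
        // A non-positive threshold disables the grace period: every pending
        // request counts as hanging, matching the pre-MAPREDUCE-5844
        // behavior of preempting as soon as any map request is unserved.
        if (thresholdMs <= 0) {
          return true;
        }
        return nowMs - r.requestTimeMs > thresholdMs;
      }
    }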

Modified: hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java?rev=1609878&r1=1609877&r2=1609878&view=diff
==============================================================================
--- hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java (original)
+++ hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java Sat Jul 12 02:24:40 2014
@@ -24,8 +24,6 @@ import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.util.EnumSet;
 
-import org.junit.Assert;
-
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -92,6 +90,7 @@ import org.apache.hadoop.yarn.api.record
 import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.Token;
 import org.apache.hadoop.yarn.event.EventHandler;
@@ -101,6 +100,7 @@ import org.apache.hadoop.yarn.state.Stat
 import org.apache.hadoop.yarn.state.StateMachineFactory;
 import org.apache.hadoop.yarn.util.Clock;
 import org.apache.hadoop.yarn.util.SystemClock;
+import org.junit.Assert;
 
 
 /**
@@ -228,8 +228,8 @@ public class MRApp extends MRAppMaster {
       int maps, int reduces, boolean autoComplete, String testName,
       boolean cleanOnStart, int startCount, Clock clock, boolean unregistered,
       String assignedQueue) {
-    super(appAttemptId, amContainerId, NM_HOST, NM_PORT, NM_HTTP_PORT, clock, System
-        .currentTimeMillis(), MRJobConfig.DEFAULT_MR_AM_MAX_ATTEMPTS);
+    super(appAttemptId, amContainerId, NM_HOST, NM_PORT, NM_HTTP_PORT, clock,
+        System.currentTimeMillis());
     this.testWorkDir = new File("target", testName);
     testAbsPath = new Path(testWorkDir.getAbsolutePath());
     LOG.info("PathUsed: " + testAbsPath);
@@ -573,7 +573,8 @@ public class MRApp extends MRAppMaster {
         Resource resource = Resource.newInstance(1234, 2);
         ContainerTokenIdentifier containerTokenIdentifier =
             new ContainerTokenIdentifier(cId, nodeId.toString(), "user",
-              resource, System.currentTimeMillis() + 10000, 42, 42);
+            resource, System.currentTimeMillis() + 10000, 42, 42,
+            Priority.newInstance(0), 0);
         Token containerToken = newContainerToken(nodeId, "password".getBytes(),
               containerTokenIdentifier);
         Container container = Container.newInstance(cId, nodeId,

Modified: hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestJobEndNotifier.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestJobEndNotifier.java?rev=1609878&r1=1609877&r2=1609878&view=diff
==============================================================================
--- hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestJobEndNotifier.java (original)
+++ hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestJobEndNotifier.java Sat Jul 12 02:24:40 2014
@@ -253,6 +253,12 @@ public class TestJobEndNotifier extends 
     HttpServer2 server = startHttpServer();
     MRApp app = spy(new MRAppWithCustomContainerAllocator(2, 2, false,
         this.getClass().getName(), true, 2, false));
+    // Currently, isLastAMRetry is always false at the beginning of
+    // MRAppMaster, unless the staging area already exists or a commit has
+    // already started.
+    // Manually set isLastAMRetry to true here; it should be reset to false
+    // when unregistration fails.
+    app.isLastAMRetry = true;
     doNothing().when(app).sysexit();
     JobConf conf = new JobConf();
     conf.set(JobContext.MR_JOB_END_NOTIFICATION_URL,
@@ -265,12 +271,11 @@ public class TestJobEndNotifier extends 
     // Now shutdown. User should see FAILED state.
     // Unregistration fails: isLastAMRetry is recalculated, this is
     app.shutDownJob();
-    Assert.assertTrue(app.isLastAMRetry());
-    Assert.assertEquals(1, JobEndServlet.calledTimes);
-    Assert.assertEquals("jobid=" + job.getID() + "&status=FAILED",
-        JobEndServlet.requestUri.getQuery());
-    Assert.assertEquals(JobState.FAILED.toString(),
-      JobEndServlet.foundJobState);
+    Assert.assertFalse(app.isLastAMRetry());
+    // Since this is not the last retry, JobEndServlet should not have been called
+    Assert.assertEquals(0, JobEndServlet.calledTimes);
+    Assert.assertNull(JobEndServlet.requestUri);
+    Assert.assertNull(JobEndServlet.foundJobState);
     server.stop();
   }
 

Modified: hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestMRAppMaster.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestMRAppMaster.java?rev=1609878&r1=1609877&r2=1609878&view=diff
==============================================================================
--- hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestMRAppMaster.java (original)
+++ hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestMRAppMaster.java Sat Jul 12 02:24:40 2014
@@ -118,7 +118,7 @@ public class TestMRAppMaster {
     ContainerId containerId = ConverterUtils.toContainerId(containerIdStr);
     MRAppMasterTest appMaster =
         new MRAppMasterTest(applicationAttemptId, containerId, "host", -1, -1,
-            System.currentTimeMillis(), MRJobConfig.DEFAULT_MR_AM_MAX_ATTEMPTS);
+            System.currentTimeMillis());
     JobConf conf = new JobConf();
     conf.set(MRJobConfig.MR_AM_STAGING_DIR, stagingDir);
     MRAppMaster.initAndStartAppMaster(appMaster, conf, userName);
@@ -147,8 +147,7 @@ public class TestMRAppMaster {
     ContainerId containerId = ConverterUtils.toContainerId(containerIdStr);
     MRAppMaster appMaster =
         new MRAppMasterTest(applicationAttemptId, containerId, "host", -1, -1,
-            System.currentTimeMillis(), MRJobConfig.DEFAULT_MR_AM_MAX_ATTEMPTS,
-            false, false);
+            System.currentTimeMillis(), false, false);
     boolean caught = false;
     try {
       MRAppMaster.initAndStartAppMaster(appMaster, conf, userName);
@@ -186,8 +185,7 @@ public class TestMRAppMaster {
     ContainerId containerId = ConverterUtils.toContainerId(containerIdStr);
     MRAppMaster appMaster =
         new MRAppMasterTest(applicationAttemptId, containerId, "host", -1, -1,
-            System.currentTimeMillis(), MRJobConfig.DEFAULT_MR_AM_MAX_ATTEMPTS,
-            false, false);
+            System.currentTimeMillis(), false, false);
     boolean caught = false;
     try {
       MRAppMaster.initAndStartAppMaster(appMaster, conf, userName);
@@ -225,8 +223,7 @@ public class TestMRAppMaster {
     ContainerId containerId = ConverterUtils.toContainerId(containerIdStr);
     MRAppMaster appMaster =
         new MRAppMasterTest(applicationAttemptId, containerId, "host", -1, -1,
-            System.currentTimeMillis(), MRJobConfig.DEFAULT_MR_AM_MAX_ATTEMPTS,
-            false, false);
+            System.currentTimeMillis(), false, false);
     boolean caught = false;
     try {
       MRAppMaster.initAndStartAppMaster(appMaster, conf, userName);
@@ -264,8 +261,7 @@ public class TestMRAppMaster {
     ContainerId containerId = ConverterUtils.toContainerId(containerIdStr);
     MRAppMaster appMaster =
         new MRAppMasterTest(applicationAttemptId, containerId, "host", -1, -1,
-            System.currentTimeMillis(), MRJobConfig.DEFAULT_MR_AM_MAX_ATTEMPTS,
-            false, false);
+            System.currentTimeMillis(), false, false);
     boolean caught = false;
     try {
       MRAppMaster.initAndStartAppMaster(appMaster, conf, userName);
@@ -285,8 +281,9 @@ public class TestMRAppMaster {
   @Test (timeout = 30000)
   public void testMRAppMasterMaxAppAttempts() throws IOException,
       InterruptedException {
-    int[] maxAppAttemtps = new int[] { 1, 2, 3 };
-    Boolean[] expectedBools = new Boolean[]{ true, true, false };
+    // Regardless of maxAppAttempts or the attempt id, isLastAMRetry always
+    // equals false
+    Boolean[] expectedBools = new Boolean[]{ false, false, false };
 
     String applicationAttemptIdStr = "appattempt_1317529182569_0004_000002";
     String containerIdStr = "container_1317529182569_0004_000002_1";
@@ -301,10 +298,10 @@ public class TestMRAppMaster {
     File stagingDir =
         new File(MRApps.getStagingAreaDir(conf, userName).toString());
     stagingDir.mkdirs();
-    for (int i = 0; i < maxAppAttemtps.length; ++i) {
+    for (int i = 0; i < expectedBools.length; ++i) {
       MRAppMasterTest appMaster =
           new MRAppMasterTest(applicationAttemptId, containerId, "host", -1, -1,
-              System.currentTimeMillis(), maxAppAttemtps[i], false, true);
+              System.currentTimeMillis(), false, true);
       MRAppMaster.initAndStartAppMaster(appMaster, conf, userName);
       assertEquals("isLastAMRetry is correctly computed.", expectedBools[i],
           appMaster.isLastAMRetry());
@@ -399,7 +396,7 @@ public class TestMRAppMaster {
 
     MRAppMasterTest appMaster =
         new MRAppMasterTest(applicationAttemptId, containerId, "host", -1, -1,
-          System.currentTimeMillis(), 1, false, true);
+          System.currentTimeMillis(), false, true);
     MRAppMaster.initAndStartAppMaster(appMaster, conf, userName);
 
     // Now validate the task credentials
@@ -466,16 +463,15 @@ class MRAppMasterTest extends MRAppMaste
 
   public MRAppMasterTest(ApplicationAttemptId applicationAttemptId,
       ContainerId containerId, String host, int port, int httpPort,
-      long submitTime, int maxAppAttempts) {
+      long submitTime) {
     this(applicationAttemptId, containerId, host, port, httpPort,
-        submitTime, maxAppAttempts, true, true);
+        submitTime, true, true);
   }
   public MRAppMasterTest(ApplicationAttemptId applicationAttemptId,
       ContainerId containerId, String host, int port, int httpPort,
-      long submitTime, int maxAppAttempts, boolean overrideInit,
+      long submitTime, boolean overrideInit,
       boolean overrideStart) {
-    super(applicationAttemptId, containerId, host, port, httpPort, submitTime,
-        maxAppAttempts);
+    super(applicationAttemptId, containerId, host, port, httpPort, submitTime);
     this.overrideInit = overrideInit;
     this.overrideStart = overrideStart;
     mockContainerAllocator = mock(ContainerAllocator.class);

Modified: hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestStagingCleanup.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestStagingCleanup.java?rev=1609878&r1=1609877&r2=1609878&view=diff
==============================================================================
--- hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestStagingCleanup.java (original)
+++ hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestStagingCleanup.java Sat Jul 12 02:24:40 2014
@@ -18,6 +18,7 @@
 
 package org.apache.hadoop.mapreduce.v2.app;
 
+import static org.junit.Assert.assertTrue;
 import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.anyBoolean;
 import static org.mockito.Mockito.mock;
@@ -28,9 +29,6 @@ import static org.mockito.Mockito.when;
 
 import java.io.IOException;
 
-import org.junit.Assert;
-import junit.framework.TestCase;
-
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -62,13 +60,14 @@ import org.apache.hadoop.yarn.exceptions
 import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
 import org.apache.hadoop.yarn.factories.RecordFactory;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
+import org.junit.Assert;
 import org.junit.Test;
 
 
 /**
  * Make sure that the job staging directory clean up happens.
  */
- public class TestStagingCleanup extends TestCase {
+ public class TestStagingCleanup {
    
    private Configuration conf = new Configuration();
    private FileSystem fs;
@@ -81,7 +80,7 @@ import org.junit.Test;
    public void testDeletionofStagingOnUnregistrationFailure()
        throws IOException {
      testDeletionofStagingOnUnregistrationFailure(2, false);
-     testDeletionofStagingOnUnregistrationFailure(1, true);
+     testDeletionofStagingOnUnregistrationFailure(1, false);
    }
 
    @SuppressWarnings("resource")
@@ -104,7 +103,7 @@ import org.junit.Test;
      appMaster.init(conf);
      appMaster.start();
      appMaster.shutDownJob();
-     ((RunningAppContext) appMaster.getContext()).computeIsLastAMRetry();
+     ((RunningAppContext) appMaster.getContext()).resetIsLastAMRetry();
      if (shouldHaveDeleted) {
        Assert.assertEquals(new Boolean(true), appMaster.isLastAMRetry());
        verify(fs).delete(stagingJobPath, true);
@@ -164,7 +163,11 @@ import org.junit.Test;
      verify(fs, times(0)).delete(stagingJobPath, true);
    }
 
-   @Test (timeout = 30000)
+   // FIXME:
+   // Disabled this test because currently, when the job state is REBOOT at
+   // shutdown and lastRetry = true from the RM's point of view, cleanup is
+   // not performed. This will be supported once YARN-2261 is completed.
+//   @Test (timeout = 30000)
    public void testDeletionofStagingOnReboot() throws IOException {
      conf.set(MRJobConfig.MAPREDUCE_JOB_DIR, stagingJobDir);
      fs = mock(FileSystem.class);
@@ -202,7 +205,7 @@ import org.junit.Test;
      JobId jobid = recordFactory.newRecordInstance(JobId.class);
      jobid.setAppId(appId);
      ContainerAllocator mockAlloc = mock(ContainerAllocator.class);
-     MRAppMaster appMaster = new TestMRApp(attemptId, mockAlloc, 4);
+     MRAppMaster appMaster = new TestMRApp(attemptId, mockAlloc);
      appMaster.init(conf);
      //simulate the process being killed
      MRAppMaster.MRAppMasterShutdownHook hook = 
@@ -210,8 +213,12 @@ import org.junit.Test;
      hook.run();
      verify(fs, times(0)).delete(stagingJobPath, true);
    }
-   
-   @Test (timeout = 30000)
+
+  // FIXME:
+  // Disabled this test because currently, when the shutdown hook is triggered
+  // on the last retry from the RM's point of view, cleanup is not performed.
+  // This should be supported once YARN-2261 is completed.
+//   @Test (timeout = 30000)
    public void testDeletionofStagingOnKillLastTry() throws IOException {
      conf.set(MRJobConfig.MAPREDUCE_JOB_DIR, stagingJobDir);
      fs = mock(FileSystem.class);
@@ -226,7 +233,7 @@ import org.junit.Test;
      JobId jobid = recordFactory.newRecordInstance(JobId.class);
      jobid.setAppId(appId);
      ContainerAllocator mockAlloc = mock(ContainerAllocator.class);
-     MRAppMaster appMaster = new TestMRApp(attemptId, mockAlloc, 1); //no retry
+     MRAppMaster appMaster = new TestMRApp(attemptId, mockAlloc); //no retry
      appMaster.init(conf);
      assertTrue("appMaster.isLastAMRetry() is false", appMaster.isLastAMRetry());
      //simulate the process being killed
@@ -245,10 +252,10 @@ import org.junit.Test;
      boolean crushUnregistration = false;
 
      public TestMRApp(ApplicationAttemptId applicationAttemptId, 
-         ContainerAllocator allocator, int maxAppAttempts) {
+         ContainerAllocator allocator) {
        super(applicationAttemptId, ContainerId.newInstance(
            applicationAttemptId, 1), "testhost", 2222, 3333,
-           System.currentTimeMillis(), maxAppAttempts);
+           System.currentTimeMillis());
        this.allocator = allocator;
        this.successfullyUnregistered.set(true);
      }
@@ -256,7 +263,7 @@ import org.junit.Test;
      public TestMRApp(ApplicationAttemptId applicationAttemptId,
          ContainerAllocator allocator, JobStateInternal jobStateInternal,
              int maxAppAttempts) {
-       this(applicationAttemptId, allocator, maxAppAttempts);
+       this(applicationAttemptId, allocator);
        this.jobStateInternal = jobStateInternal;
      }
 

Modified: hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestJobImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestJobImpl.java?rev=1609878&r1=1609877&r2=1609878&view=diff
==============================================================================
--- hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestJobImpl.java (original)
+++ hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestJobImpl.java Sat Jul 12 02:24:40 2014
@@ -657,6 +657,15 @@ public class TestJobImpl {
     conf.setInt(MRJobConfig.JOB_UBERTASK_MAXMAPS, 1);
     isUber = testUberDecision(conf);
     Assert.assertFalse(isUber);
+    
+    // with 0 reducers, uber mode should be enabled regardless of the memory
+    // or vcores assigned to the reducer
+    conf = new Configuration();
+    conf.setBoolean(MRJobConfig.JOB_UBERTASK_ENABLE, true);
+    conf.setInt(MRJobConfig.NUM_REDUCES, 0);
+    conf.setInt(MRJobConfig.REDUCE_MEMORY_MB, 2048);
+    conf.setInt(MRJobConfig.REDUCE_CPU_VCORES, 10);
+    isUber = testUberDecision(conf);
+    Assert.assertTrue(isUber);
   }
 
   private boolean testUberDecision(Configuration conf) {

Modified: hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskAttempt.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskAttempt.java?rev=1609878&r1=1609877&r2=1609878&view=diff
==============================================================================
--- hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskAttempt.java (original)
+++ hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskAttempt.java Sat Jul 12 02:24:40 2014
@@ -65,6 +65,7 @@ import org.apache.hadoop.mapreduce.v2.ap
 import org.apache.hadoop.mapreduce.v2.app.job.Job;
 import org.apache.hadoop.mapreduce.v2.app.job.Task;
 import org.apache.hadoop.mapreduce.v2.app.job.TaskAttempt;
+import org.apache.hadoop.mapreduce.v2.app.job.TaskAttemptStateInternal;
 import org.apache.hadoop.mapreduce.v2.app.job.event.JobEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.JobEventType;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptContainerAssignedEvent;
@@ -795,6 +796,178 @@ public class TestTaskAttempt{
 		finishTime, Long.valueOf(taImpl.getFinishTime()));  
   }
   
+  @Test
+  public void testContainerKillAfterAssigned() throws Exception {
+    ApplicationId appId = ApplicationId.newInstance(1, 2);
+    ApplicationAttemptId appAttemptId = ApplicationAttemptId.newInstance(appId,
+        0);
+    JobId jobId = MRBuilderUtils.newJobId(appId, 1);
+    TaskId taskId = MRBuilderUtils.newTaskId(jobId, 1, TaskType.MAP);
+    TaskAttemptId attemptId = MRBuilderUtils.newTaskAttemptId(taskId, 0);
+    Path jobFile = mock(Path.class);
+
+    MockEventHandler eventHandler = new MockEventHandler();
+    TaskAttemptListener taListener = mock(TaskAttemptListener.class);
+    when(taListener.getAddress()).thenReturn(
+        new InetSocketAddress("localhost", 0));
+
+    JobConf jobConf = new JobConf();
+    jobConf.setClass("fs.file.impl", StubbedFS.class, FileSystem.class);
+    jobConf.setBoolean("fs.file.impl.disable.cache", true);
+    jobConf.set(JobConf.MAPRED_MAP_TASK_ENV, "");
+    jobConf.set(MRJobConfig.APPLICATION_ATTEMPT_ID, "10");
+
+    TaskSplitMetaInfo splits = mock(TaskSplitMetaInfo.class);
+    when(splits.getLocations()).thenReturn(new String[] { "127.0.0.1" });
+
+    AppContext appCtx = mock(AppContext.class);
+    ClusterInfo clusterInfo = mock(ClusterInfo.class);
+    Resource resource = mock(Resource.class);
+    when(appCtx.getClusterInfo()).thenReturn(clusterInfo);
+    when(resource.getMemory()).thenReturn(1024);
+
+    TaskAttemptImpl taImpl = new MapTaskAttemptImpl(taskId, 1, eventHandler,
+        jobFile, 1, splits, jobConf, taListener, new Token(),
+        new Credentials(), new SystemClock(), appCtx);
+
+    NodeId nid = NodeId.newInstance("127.0.0.2", 0);
+    ContainerId contId = ContainerId.newInstance(appAttemptId, 3);
+    Container container = mock(Container.class);
+    when(container.getId()).thenReturn(contId);
+    when(container.getNodeId()).thenReturn(nid);
+    when(container.getNodeHttpAddress()).thenReturn("localhost:0");
+
+    taImpl.handle(new TaskAttemptEvent(attemptId,
+        TaskAttemptEventType.TA_SCHEDULE));
+    taImpl.handle(new TaskAttemptContainerAssignedEvent(attemptId, container,
+        mock(Map.class)));
+    assertEquals("Task attempt is not in assinged state",
+        taImpl.getInternalState(), TaskAttemptStateInternal.ASSIGNED);
+    taImpl.handle(new TaskAttemptEvent(attemptId,
+        TaskAttemptEventType.TA_KILL));
+    assertEquals("Task should be in KILLED state",
+        TaskAttemptStateInternal.KILL_CONTAINER_CLEANUP,
+        taImpl.getInternalState());
+  }
+
+  @Test
+  public void testContainerKillWhileRunning() throws Exception {
+    ApplicationId appId = ApplicationId.newInstance(1, 2);
+    ApplicationAttemptId appAttemptId = ApplicationAttemptId.newInstance(appId,
+        0);
+    JobId jobId = MRBuilderUtils.newJobId(appId, 1);
+    TaskId taskId = MRBuilderUtils.newTaskId(jobId, 1, TaskType.MAP);
+    TaskAttemptId attemptId = MRBuilderUtils.newTaskAttemptId(taskId, 0);
+    Path jobFile = mock(Path.class);
+
+    MockEventHandler eventHandler = new MockEventHandler();
+    TaskAttemptListener taListener = mock(TaskAttemptListener.class);
+    when(taListener.getAddress()).thenReturn(
+        new InetSocketAddress("localhost", 0));
+
+    JobConf jobConf = new JobConf();
+    jobConf.setClass("fs.file.impl", StubbedFS.class, FileSystem.class);
+    jobConf.setBoolean("fs.file.impl.disable.cache", true);
+    jobConf.set(JobConf.MAPRED_MAP_TASK_ENV, "");
+    jobConf.set(MRJobConfig.APPLICATION_ATTEMPT_ID, "10");
+
+    TaskSplitMetaInfo splits = mock(TaskSplitMetaInfo.class);
+    when(splits.getLocations()).thenReturn(new String[] { "127.0.0.1" });
+
+    AppContext appCtx = mock(AppContext.class);
+    ClusterInfo clusterInfo = mock(ClusterInfo.class);
+    Resource resource = mock(Resource.class);
+    when(appCtx.getClusterInfo()).thenReturn(clusterInfo);
+    when(resource.getMemory()).thenReturn(1024);
+
+    TaskAttemptImpl taImpl = new MapTaskAttemptImpl(taskId, 1, eventHandler,
+        jobFile, 1, splits, jobConf, taListener, new Token(),
+        new Credentials(), new SystemClock(), appCtx);
+
+    NodeId nid = NodeId.newInstance("127.0.0.2", 0);
+    ContainerId contId = ContainerId.newInstance(appAttemptId, 3);
+    Container container = mock(Container.class);
+    when(container.getId()).thenReturn(contId);
+    when(container.getNodeId()).thenReturn(nid);
+    when(container.getNodeHttpAddress()).thenReturn("localhost:0");
+
+    taImpl.handle(new TaskAttemptEvent(attemptId,
+        TaskAttemptEventType.TA_SCHEDULE));
+    taImpl.handle(new TaskAttemptContainerAssignedEvent(attemptId, container,
+        mock(Map.class)));
+    taImpl.handle(new TaskAttemptContainerLaunchedEvent(attemptId, 0));
+    assertEquals("Task attempt is not in running state", taImpl.getState(),
+        TaskAttemptState.RUNNING);
+    taImpl.handle(new TaskAttemptEvent(attemptId,
+        TaskAttemptEventType.TA_KILL));
+    assertFalse("InternalError occurred trying to handle TA_KILL",
+        eventHandler.internalError);
+    assertEquals("Task should be in KILLED state",
+        TaskAttemptStateInternal.KILL_CONTAINER_CLEANUP,
+        taImpl.getInternalState());
+  }
+
+  @Test
+  public void testContainerKillWhileCommitPending() throws Exception {
+    ApplicationId appId = ApplicationId.newInstance(1, 2);
+    ApplicationAttemptId appAttemptId = ApplicationAttemptId.newInstance(appId,
+        0);
+    JobId jobId = MRBuilderUtils.newJobId(appId, 1);
+    TaskId taskId = MRBuilderUtils.newTaskId(jobId, 1, TaskType.MAP);
+    TaskAttemptId attemptId = MRBuilderUtils.newTaskAttemptId(taskId, 0);
+    Path jobFile = mock(Path.class);
+
+    MockEventHandler eventHandler = new MockEventHandler();
+    TaskAttemptListener taListener = mock(TaskAttemptListener.class);
+    when(taListener.getAddress()).thenReturn(
+        new InetSocketAddress("localhost", 0));
+
+    JobConf jobConf = new JobConf();
+    jobConf.setClass("fs.file.impl", StubbedFS.class, FileSystem.class);
+    jobConf.setBoolean("fs.file.impl.disable.cache", true);
+    jobConf.set(JobConf.MAPRED_MAP_TASK_ENV, "");
+    jobConf.set(MRJobConfig.APPLICATION_ATTEMPT_ID, "10");
+
+    TaskSplitMetaInfo splits = mock(TaskSplitMetaInfo.class);
+    when(splits.getLocations()).thenReturn(new String[] { "127.0.0.1" });
+
+    AppContext appCtx = mock(AppContext.class);
+    ClusterInfo clusterInfo = mock(ClusterInfo.class);
+    Resource resource = mock(Resource.class);
+    when(appCtx.getClusterInfo()).thenReturn(clusterInfo);
+    when(resource.getMemory()).thenReturn(1024);
+
+    TaskAttemptImpl taImpl = new MapTaskAttemptImpl(taskId, 1, eventHandler,
+        jobFile, 1, splits, jobConf, taListener, new Token(),
+        new Credentials(), new SystemClock(), appCtx);
+
+    NodeId nid = NodeId.newInstance("127.0.0.2", 0);
+    ContainerId contId = ContainerId.newInstance(appAttemptId, 3);
+    Container container = mock(Container.class);
+    when(container.getId()).thenReturn(contId);
+    when(container.getNodeId()).thenReturn(nid);
+    when(container.getNodeHttpAddress()).thenReturn("localhost:0");
+
+    taImpl.handle(new TaskAttemptEvent(attemptId,
+        TaskAttemptEventType.TA_SCHEDULE));
+    taImpl.handle(new TaskAttemptContainerAssignedEvent(attemptId, container,
+        mock(Map.class)));
+    taImpl.handle(new TaskAttemptContainerLaunchedEvent(attemptId, 0));
+    assertEquals("Task attempt is not in running state", taImpl.getState(),
+        TaskAttemptState.RUNNING);
+    taImpl.handle(new TaskAttemptEvent(attemptId,
+        TaskAttemptEventType.TA_COMMIT_PENDING));
+    assertEquals("Task should be in COMMIT_PENDING state",
+        TaskAttemptStateInternal.COMMIT_PENDING, taImpl.getInternalState());
+    taImpl.handle(new TaskAttemptEvent(attemptId,
+        TaskAttemptEventType.TA_KILL));
+    assertFalse("InternalError occurred trying to handle TA_KILL",
+        eventHandler.internalError);
+    assertEquals("Task should be in KILLED state",
+        TaskAttemptStateInternal.KILL_CONTAINER_CLEANUP,
+        taImpl.getInternalState());
+  }
+
   public static class MockEventHandler implements EventHandler {
     public boolean internalError;
 

Modified: hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/launcher/TestContainerLauncherImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/launcher/TestContainerLauncherImpl.java?rev=1609878&r1=1609877&r2=1609878&view=diff
==============================================================================
--- hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/launcher/TestContainerLauncherImpl.java (original)
+++ hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/launcher/TestContainerLauncherImpl.java Sat Jul 12 02:24:40 2014
@@ -55,6 +55,7 @@ import org.apache.hadoop.yarn.api.record
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.Token;
 import org.apache.hadoop.yarn.client.api.impl.ContainerManagementProtocolProxy.ContainerManagementProtocolProxyData;
@@ -402,7 +403,7 @@ public class TestContainerLauncherImpl {
         1234), "password".getBytes(), new ContainerTokenIdentifier(
         contId, containerManagerAddr, "user",
         Resource.newInstance(1024, 1),
-        currentTime + 10000L, 123, currentTime));
+        currentTime + 10000L, 123, currentTime, Priority.newInstance(0), 0));
   }
 
   private static class ContainerManagerForTest implements ContainerManagementProtocol {

Modified: hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/TestRMContainerAllocator.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/TestRMContainerAllocator.java?rev=1609878&r1=1605891&r2=1609878&view=diff
==============================================================================
--- hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/TestRMContainerAllocator.java (original)
+++ hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/TestRMContainerAllocator.java Sat Jul 12 02:24:40 2014
@@ -1959,6 +1959,22 @@ public class TestRMContainerAllocator {
     TaskAttemptEvent abortedEvent = allocator.createContainerFinishedEvent(
         abortedStatus, attemptId);
     Assert.assertEquals(TaskAttemptEventType.TA_KILL, abortedEvent.getType());
+    
+    ContainerId containerId2 = ContainerId.newInstance(applicationAttemptId, 2);
+    ContainerStatus status2 = ContainerStatus.newInstance(containerId2,
+        ContainerState.RUNNING, "", 0);
+
+    ContainerStatus preemptedStatus = ContainerStatus.newInstance(containerId2,
+        ContainerState.RUNNING, "", ContainerExitStatus.PREEMPTED);
+
+    TaskAttemptEvent event2 = allocator.createContainerFinishedEvent(status2,
+        attemptId);
+    Assert.assertEquals(TaskAttemptEventType.TA_CONTAINER_COMPLETED,
+        event2.getType());
+
+    TaskAttemptEvent abortedEvent2 = allocator.createContainerFinishedEvent(
+        preemptedStatus, attemptId);
+    Assert.assertEquals(TaskAttemptEventType.TA_KILL, abortedEvent2.getType());
   }
   
   @Test

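The new assertions exercise ContainerExitStatus.PREEMPTED alongside the
existing ABORTED case: a preempted container must surface as TA_KILL rather
than a normal completion, so the attempt is rescheduled instead of being
counted as a failure. A minimal sketch of the mapping the test expects (the
real implementation lives in RMContainerAllocator.createContainerFinishedEvent
and may differ in detail):

    import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
    import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent;
    import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType;
    import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
    import org.apache.hadoop.yarn.api.records.ContainerStatus;

    TaskAttemptEvent createContainerFinishedEvent(ContainerStatus status,
        TaskAttemptId attemptId) {
      int exitStatus = status.getExitStatus();
      if (exitStatus == ContainerExitStatus.ABORTED
          || exitStatus == ContainerExitStatus.PREEMPTED) {
        // Framework-initiated loss of the container: kill the attempt so it
        // is retried without charging a task failure.
        return new TaskAttemptEvent(attemptId, TaskAttemptEventType.TA_KILL);
      }
      return new TaskAttemptEvent(attemptId,
          TaskAttemptEventType.TA_CONTAINER_COMPLETED);
    }
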
Modified: hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/jobhistory/FileNameIndexUtils.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/jobhistory/FileNameIndexUtils.java?rev=1609878&r1=1609877&r2=1609878&view=diff
==============================================================================
--- hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/jobhistory/FileNameIndexUtils.java (original)
+++ hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/jobhistory/FileNameIndexUtils.java Sat Jul 12 02:24:40 2014
@@ -168,10 +168,14 @@ public class FileNameIndexUtils {
           decodeJobHistoryFileName(jobDetails[QUEUE_NAME_INDEX]));
 
       try{
-        indexInfo.setJobStartTime(
-          Long.parseLong(decodeJobHistoryFileName(jobDetails[JOB_START_TIME_INDEX])));
+        if (jobDetails.length <= JOB_START_TIME_INDEX) {
+          indexInfo.setJobStartTime(indexInfo.getSubmitTime());
+        } else {
+          indexInfo.setJobStartTime(
+              Long.parseLong(decodeJobHistoryFileName(jobDetails[JOB_START_TIME_INDEX])));
+        }
       } catch (NumberFormatException e){
-        LOG.warn("Unable to parse launch time from job history file "
+        LOG.warn("Unable to parse start time from job history file "
             + jhFileName + " : " + e);
       }
     } catch (IndexOutOfBoundsException e) {

Modified: hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/test/java/org/apache/hadoop/mapreduce/v2/jobhistory/TestFileNameIndexUtils.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/test/java/org/apache/hadoop/mapreduce/v2/jobhistory/TestFileNameIndexUtils.java?rev=1609878&r1=1609877&r2=1609878&view=diff
==============================================================================
--- hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/test/java/org/apache/hadoop/mapreduce/v2/jobhistory/TestFileNameIndexUtils.java (original)
+++ hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/test/java/org/apache/hadoop/mapreduce/v2/jobhistory/TestFileNameIndexUtils.java Sat Jul 12 02:24:40 2014
@@ -39,6 +39,17 @@ public class TestFileNameIndexUtils {
     + FileNameIndexUtils.DELIMITER + "%s"
     + JobHistoryUtils.JOB_HISTORY_FILE_EXTENSION;
 
+  private static final String OLD_FORMAT_BEFORE_ADD_START_TIME = "%s"
+      + FileNameIndexUtils.DELIMITER + "%s"
+      + FileNameIndexUtils.DELIMITER + "%s"
+      + FileNameIndexUtils.DELIMITER + "%s"
+      + FileNameIndexUtils.DELIMITER + "%s"
+      + FileNameIndexUtils.DELIMITER + "%s"
+      + FileNameIndexUtils.DELIMITER + "%s"
+      + FileNameIndexUtils.DELIMITER + "%s"
+      + FileNameIndexUtils.DELIMITER + "%s"
+      + JobHistoryUtils.JOB_HISTORY_FILE_EXTENSION;
+
   private static final String JOB_HISTORY_FILE_FORMATTER = "%s"
     + FileNameIndexUtils.DELIMITER + "%s"
     + FileNameIndexUtils.DELIMITER + "%s"
@@ -236,6 +247,22 @@ public class TestFileNameIndexUtils {
   }
 
   @Test
+  public void testJobStartTimeBackwardsCompatible() throws IOException {
+    String jobHistoryFile = String.format(OLD_FORMAT_BEFORE_ADD_START_TIME,
+        JOB_ID,
+        SUBMIT_TIME,
+        USER_NAME,
+        JOB_NAME_WITH_DELIMITER_ESCAPE,
+        FINISH_TIME,
+        NUM_MAPS,
+        NUM_REDUCES,
+        JOB_STATUS,
+        QUEUE_NAME);
+    JobIndexInfo info = FileNameIndexUtils.getIndexInfo(jobHistoryFile);
+    Assert.assertEquals(info.getSubmitTime(), info.getJobStartTime());
+  }
+
+  @Test
   public void testJobHistoryFileNameBackwardsCompatible() throws IOException {
     JobID oldJobId = JobID.forName(JOB_ID);
     JobId jobId = TypeConverter.toYarn(oldJobId);

Modified: hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/FileInputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/FileInputFormat.java?rev=1609878&r1=1609877&r2=1609878&view=diff
==============================================================================
--- hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/FileInputFormat.java (original)
+++ hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/FileInputFormat.java Sat Jul 12 02:24:40 2014
@@ -295,6 +295,15 @@ public abstract class FileInputFormat<K,
                                 String[] hosts) {
     return new FileSplit(file, start, length, hosts);
   }
+  
+  /**
+   * A factory that makes the split for this class. It can be overridden
+   * by subclasses to create sub-types of FileSplit.
+   */
+  protected FileSplit makeSplit(Path file, long start, long length, 
+                                String[] hosts, String[] inMemoryHosts) {
+    return new FileSplit(file, start, length, hosts, inMemoryHosts);
+  }
 
   /** Splits files returned by {@link #listStatus(JobConf)} when
    * they're too big.*/ 
@@ -337,22 +346,22 @@ public abstract class FileInputFormat<K,
 
           long bytesRemaining = length;
           while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) {
-            String[] splitHosts = getSplitHosts(blkLocations,
+            String[][] splitHosts = getSplitHostsAndCachedHosts(blkLocations,
                 length-bytesRemaining, splitSize, clusterMap);
             splits.add(makeSplit(path, length-bytesRemaining, splitSize,
-                splitHosts));
+                splitHosts[0], splitHosts[1]));
             bytesRemaining -= splitSize;
           }
 
           if (bytesRemaining != 0) {
-            String[] splitHosts = getSplitHosts(blkLocations, length
+            String[][] splitHosts = getSplitHostsAndCachedHosts(blkLocations, length
                 - bytesRemaining, bytesRemaining, clusterMap);
             splits.add(makeSplit(path, length - bytesRemaining, bytesRemaining,
-                splitHosts));
+                splitHosts[0], splitHosts[1]));
           }
         } else {
-          String[] splitHosts = getSplitHosts(blkLocations,0,length,clusterMap);
-          splits.add(makeSplit(path, 0, length, splitHosts));
+          String[][] splitHosts = getSplitHostsAndCachedHosts(blkLocations,0,length,clusterMap);
+          splits.add(makeSplit(path, 0, length, splitHosts[0], splitHosts[1]));
         }
       } else { 
         //Create empty hosts array for zero length files
@@ -538,10 +547,30 @@ public abstract class FileInputFormat<K,
    * @param blkLocations The list of block locations
    * @param offset 
    * @param splitSize 
-   * @return array of hosts that contribute most to this split
+   * @return an array of hosts that contribute most to this split
    * @throws IOException
    */
   protected String[] getSplitHosts(BlockLocation[] blkLocations, 
+      long offset, long splitSize, NetworkTopology clusterMap) throws IOException {
+    return getSplitHostsAndCachedHosts(blkLocations, offset, splitSize,
+        clusterMap)[0];
+  }
+  
+  /**
+   * This function identifies and returns the hosts that contribute
+   * most to a given split. For calculating the contribution, rack
+   * locality is treated on par with host locality, so hosts from racks
+   * that contribute the most are preferred over hosts on racks that
+   * contribute less.
+   * @param blkLocations The list of block locations
+   * @param offset
+   * @param splitSize
+   * @return two arrays - one of hosts that contribute most to this split, and
+   *    one of hosts that contribute most to this split that have the data
+   *    cached on them
+   * @throws IOException
+   */
+  private String[][] getSplitHostsAndCachedHosts(BlockLocation[] blkLocations, 
       long offset, long splitSize, NetworkTopology clusterMap)
   throws IOException {
 
@@ -552,7 +581,8 @@ public abstract class FileInputFormat<K,
 
     //If this is the only block, just return
     if (bytesInThisBlock >= splitSize) {
-      return blkLocations[startIndex].getHosts();
+      return new String[][] { blkLocations[startIndex].getHosts(),
+          blkLocations[startIndex].getCachedHosts() };
     }
 
     long bytesInFirstBlock = bytesInThisBlock;
@@ -639,7 +669,9 @@ public abstract class FileInputFormat<K,
     
     } // for all indices
 
-    return identifyHosts(allTopos.length, racksMap);
+    // We don't yet support cached hosts when bytesInThisBlock > splitSize
+    return new String[][] { identifyHosts(allTopos.length, racksMap),
+        new String[0]};
   }
   
   private String[] identifyHosts(int replicationFactor, 

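The new five-argument makeSplit above is the factory hook through which
subclasses can keep producing their own split types while still carrying the
in-memory host list. A minimal sketch of such an override (the subclass name
below is hypothetical, not part of this change):

    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.mapred.FileSplit;
    import org.apache.hadoop.mapred.TextInputFormat;

    public class CachedHostAwareTextInputFormat extends TextInputFormat {
      @Override
      protected FileSplit makeSplit(Path file, long start, long length,
                                    String[] hosts, String[] inMemoryHosts) {
        // Delegate to the five-argument FileSplit constructor so that
        // getLocationInfo() can report which hosts cache the block in memory.
        return new FileSplit(file, start, length, hosts, inMemoryHosts);
      }
    }
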
Modified: hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/FileSplit.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/FileSplit.java?rev=1609878&r1=1609877&r2=1609878&view=diff
==============================================================================
--- hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/FileSplit.java (original)
+++ hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/FileSplit.java Sat Jul 12 02:24:40 2014
@@ -24,6 +24,7 @@ import java.io.DataOutput;
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.classification.InterfaceStability.Evolving;
 import org.apache.hadoop.fs.Path;
 
 /** A section of an input file.  Returned by {@link
@@ -33,7 +34,7 @@ import org.apache.hadoop.fs.Path;
 @InterfaceAudience.Public
 @InterfaceStability.Stable
 public class FileSplit extends org.apache.hadoop.mapreduce.InputSplit 
-                       implements InputSplit {
+                       implements InputSplitWithLocationInfo {
   org.apache.hadoop.mapreduce.lib.input.FileSplit fs; 
   protected FileSplit() {
     fs = new org.apache.hadoop.mapreduce.lib.input.FileSplit();
@@ -62,6 +63,20 @@ public class FileSplit extends org.apach
            length, hosts);
   }
   
+  /** Constructs a split with host information
+   *
+   * @param file the file name
+   * @param start the position of the first byte in the file to process
+   * @param length the number of bytes in the file to process
+   * @param hosts the list of hosts containing the block, possibly null
+   * @param inMemoryHosts the list of hosts containing the block in memory
+   */
+  public FileSplit(Path file, long start, long length, String[] hosts,
+      String[] inMemoryHosts) {
+    fs = new org.apache.hadoop.mapreduce.lib.input.FileSplit(file, start,
+        length, hosts, inMemoryHosts);
+  }
+  
   public FileSplit(org.apache.hadoop.mapreduce.lib.input.FileSplit fs) {
     this.fs = fs;
   }
@@ -92,4 +107,9 @@ public class FileSplit extends org.apach
     return fs.getLocations();
   }
   
+  @Override
+  @Evolving
+  public SplitLocationInfo[] getLocationInfo() throws IOException {
+    return fs.getLocationInfo();
+  }
 }

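Assuming SplitLocationInfo exposes getLocation() and isInMemory() accessors
(the class is added elsewhere in this commit), the new constructor and
getLocationInfo() could be exercised roughly like this:

    import java.io.IOException;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.mapred.FileSplit;
    import org.apache.hadoop.mapred.SplitLocationInfo;

    public class FileSplitLocationDemo {
      public static void main(String[] args) throws IOException {
        // Block on disk on two hosts, additionally cached in memory on host1.
        FileSplit split = new FileSplit(new Path("/data/part-00000"), 0L,
            128L * 1024 * 1024,
            new String[] { "host1", "host2" },
            new String[] { "host1" });
        for (SplitLocationInfo info : split.getLocationInfo()) {
          System.out.println(info.getLocation()
              + " inMemory=" + info.isInMemory());
        }
      }
    }
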
Modified: hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/InputSplit.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/InputSplit.java?rev=1609878&r1=1609877&r2=1609878&view=diff
==============================================================================
--- hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/InputSplit.java (original)
+++ hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/InputSplit.java Sat Jul 12 02:24:40 2014
@@ -22,6 +22,8 @@ import java.io.IOException;
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.classification.InterfaceStability.Evolving;
+import org.apache.hadoop.mapred.SplitLocationInfo;
 import org.apache.hadoop.mapreduce.InputFormat;
 import org.apache.hadoop.mapreduce.Mapper;
 import org.apache.hadoop.mapreduce.RecordReader;
@@ -51,10 +53,25 @@ public abstract class InputSplit {
   /**
    * Get the list of nodes by name where the data for the split would be local.
    * The locations do not need to be serialized.
+   * 
   * @return a new array of the node names.
    * @throws IOException
    * @throws InterruptedException
    */
   public abstract 
     String[] getLocations() throws IOException, InterruptedException;
+  
+  /**
+   * Gets info about which nodes the input split is stored on and how it is
+   * stored at each location.
+   * 
+   * @return list of <code>SplitLocationInfo</code>s describing how the split
+   *    data is stored at each location. A null value indicates that all the
+   *    locations have the data stored on disk.
+   * @throws IOException
+   */
+  @Evolving
+  public SplitLocationInfo[] getLocationInfo() throws IOException {
+    return null;
+  }
 }
\ No newline at end of file

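Because the base getLocationInfo() returns null, callers have to treat null
as "disk locality only" and fall back to getLocations(). A minimal sketch of
a caller that prefers memory-local hosts (the helper name pickHost is
illustrative only):

    import java.io.IOException;
    import org.apache.hadoop.mapred.SplitLocationInfo;
    import org.apache.hadoop.mapreduce.InputSplit;

    public final class SplitHostPicker {
      static String pickHost(InputSplit split)
          throws IOException, InterruptedException {
        SplitLocationInfo[] infos = split.getLocationInfo();
        if (infos != null) {
          for (SplitLocationInfo info : infos) {
            if (info.isInMemory()) {
              return info.getLocation();  // memory-local read is cheapest
            }
          }
        }
        String[] hosts = split.getLocations();  // null info: disk-only locality
        return hosts.length > 0 ? hosts[0] : null;
      }
    }
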
Modified: hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java?rev=1609878&r1=1609877&r2=1609878&view=diff
==============================================================================
--- hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java (original)
+++ hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java Sat Jul 12 02:24:40 2014
@@ -579,7 +579,17 @@ public interface MRJobConfig {
       MR_AM_PREFIX + "history.use-batched-flush.queue-size.threshold";
   public static final int DEFAULT_MR_AM_HISTORY_USE_BATCHED_FLUSH_QUEUE_SIZE_THRESHOLD =
       50;
-  
+
+  /**
+   * The threshold, in seconds, after which an unsatisfied mapper request
+   * triggers reducer preemption to free space. The default of 0 means that
+   * reducers are preempted immediately after allocation if there is
+   * currently no room for newly allocated mappers.
+   */
+  public static final String MR_JOB_REDUCER_PREEMPT_DELAY_SEC =
+      "mapreduce.job.reducer.preempt.delay.sec";
+  public static final int DEFAULT_MR_JOB_REDUCER_PREEMPT_DELAY_SEC = 0;
+
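
Jobs opt into a grace period through the new key in their submission code; a
minimal sketch (the 60-second value is an arbitrary example):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.mapreduce.MRJobConfig;

    Configuration conf = new Configuration();
    // Give unsatisfied mapper requests 60 seconds before running reducers
    // are preempted; the default of 0 preempts immediately.
    conf.setInt(MRJobConfig.MR_JOB_REDUCER_PREEMPT_DELAY_SEC, 60);
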
   public static final String MR_AM_ENV =
       MR_AM_PREFIX + "env";