You are viewing a plain text version of this content. The canonical link for it is here.
Posted to mapreduce-commits@hadoop.apache.org by ar...@apache.org on 2013/12/19 03:04:42 UTC

svn commit: r1552205 - in /hadoop/common/branches/HDFS-2832/hadoop-mapreduce-project: ./ conf/ hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/ hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java...

Author: arp
Date: Thu Dec 19 02:03:47 2013
New Revision: 1552205

URL: http://svn.apache.org/r1552205
Log:
Merge forward from trunk to branch HDFS-2832

Added:
    hadoop/common/branches/HDFS-2832/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/preemption/
      - copied from r1552204, hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/preemption/
    hadoop/common/branches/HDFS-2832/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/checkpoint/
      - copied from r1552204, hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/checkpoint/
    hadoop/common/branches/HDFS-2832/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/checkpoint/
      - copied from r1552204, hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/checkpoint/
Modified:
    hadoop/common/branches/HDFS-2832/hadoop-mapreduce-project/   (props changed)
    hadoop/common/branches/HDFS-2832/hadoop-mapreduce-project/CHANGES.txt   (contents, props changed)
    hadoop/common/branches/HDFS-2832/hadoop-mapreduce-project/conf/   (props changed)
    hadoop/common/branches/HDFS-2832/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl.java
    hadoop/common/branches/HDFS-2832/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java
    hadoop/common/branches/HDFS-2832/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java
    hadoop/common/branches/HDFS-2832/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapred/TestTaskAttemptListenerImpl.java
    hadoop/common/branches/HDFS-2832/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java
    hadoop/common/branches/HDFS-2832/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRAppBenchmark.java
    hadoop/common/branches/HDFS-2832/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestFail.java
    hadoop/common/branches/HDFS-2832/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java
    hadoop/common/branches/HDFS-2832/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/TypeConverter.java
    hadoop/common/branches/HDFS-2832/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/test/java/org/apache/hadoop/mapreduce/TestTypeConverter.java
    hadoop/common/branches/HDFS-2832/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/JobCounter.java
    hadoop/common/branches/HDFS-2832/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java
    hadoop/common/branches/HDFS-2832/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml   (props changed)
    hadoop/common/branches/HDFS-2832/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/org/apache/hadoop/mapreduce/JobCounter.properties
    hadoop/common/branches/HDFS-2832/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestJobHistoryParsing.java
    hadoop/common/branches/HDFS-2832/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestClientRedirect.java
    hadoop/common/branches/HDFS-2832/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestJobCleanup.java
    hadoop/common/branches/HDFS-2832/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestYARNRunner.java

Propchange: hadoop/common/branches/HDFS-2832/hadoop-mapreduce-project/
------------------------------------------------------------------------------
  Merged /hadoop/common/trunk/hadoop-mapreduce-project:r1549949-1552204

Modified: hadoop/common/branches/HDFS-2832/hadoop-mapreduce-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-mapreduce-project/CHANGES.txt?rev=1552205&r1=1552204&r2=1552205&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-mapreduce-project/CHANGES.txt (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-mapreduce-project/CHANGES.txt Thu Dec 19 02:03:47 2013
@@ -71,6 +71,12 @@ Trunk (Unreleased)
     MAPREDUCE-5014. Extend Distcp to accept a custom CopyListing.
     (Srikanth Sundarrajan via amareshwari)
 
+    MAPREDUCE-5197. Add a service for checkpointing task state.
+    (Carlo Curino via cdouglas)
+
+    MAPREDUCE-5189. Add policies and wiring to respond to preemption requests
+    from YARN. (Carlo Curino via cdouglas)
+
   BUG FIXES
 
     MAPREDUCE-4272. SortedRanges.Range#compareTo is not spec compliant.
@@ -237,6 +243,15 @@ Release 2.4.0 - UNRELEASED
     MAPREDUCE-5656. bzip2 codec can drop records when reading data in splits
     (jlowe)
 
+    MAPREDUCE-5623. TestJobCleanup fails because of RejectedExecutionException
+    and NPE. (jlowe)
+
+    MAPREDUCE-5679. TestJobHistoryParsing has race condition (Liyin Liang via
+    jlowe)
+
+    MAPREDUCE-5687. Fixed failure in TestYARNRunner caused by YARN-1446. (Jian He
+    via vinodkv)
+
 Release 2.3.0 - UNRELEASED
 
   INCOMPATIBLE CHANGES
@@ -290,6 +305,9 @@ Release 2.3.0 - UNRELEASED
     event: TA_TOO_MANY_FETCH_FAILURE at KILLED for TaskAttemptImpl (Gera
     Shegalov via jlowe)
 
+    MAPREDUCE-5674. Missing start and finish time in mapred.JobStatus.
+    (Chuan Liu via cnauroth)
+
 Release 2.2.0 - 2013-10-13
 
   INCOMPATIBLE CHANGES

Propchange: hadoop/common/branches/HDFS-2832/hadoop-mapreduce-project/CHANGES.txt
------------------------------------------------------------------------------
  Merged /hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt:r1549949-1552204

Propchange: hadoop/common/branches/HDFS-2832/hadoop-mapreduce-project/conf/
------------------------------------------------------------------------------
  Merged /hadoop/common/trunk/hadoop-mapreduce-project/conf:r1535792-1536571,1536573-1552204

Modified: hadoop/common/branches/HDFS-2832/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl.java?rev=1552205&r1=1552204&r2=1552205&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl.java Thu Dec 19 02:03:47 2013
@@ -48,6 +48,7 @@ import org.apache.hadoop.mapreduce.v2.ap
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptStatusUpdateEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptStatusUpdateEvent.TaskAttemptStatus;
 import org.apache.hadoop.mapreduce.v2.app.rm.RMHeartbeatHandler;
+import org.apache.hadoop.mapreduce.v2.app.rm.preemption.AMPreemptionPolicy;
 import org.apache.hadoop.mapreduce.v2.app.security.authorize.MRAMPolicyProvider;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.security.authorize.PolicyProvider;
@@ -84,14 +85,17 @@ public class TaskAttemptListenerImpl ext
       .newSetFromMap(new ConcurrentHashMap<WrappedJvmID, Boolean>()); 
   
   private JobTokenSecretManager jobTokenSecretManager = null;
+  private AMPreemptionPolicy preemptionPolicy;
   
   public TaskAttemptListenerImpl(AppContext context,
       JobTokenSecretManager jobTokenSecretManager,
-      RMHeartbeatHandler rmHeartbeatHandler) {
+      RMHeartbeatHandler rmHeartbeatHandler,
+      AMPreemptionPolicy preemptionPolicy) {
     super(TaskAttemptListenerImpl.class.getName());
     this.context = context;
     this.jobTokenSecretManager = jobTokenSecretManager;
     this.rmHeartbeatHandler = rmHeartbeatHandler;
+    this.preemptionPolicy = preemptionPolicy;
   }
 
   @Override

Modified: hadoop/common/branches/HDFS-2832/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java?rev=1552205&r1=1552204&r2=1552205&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java Thu Dec 19 02:03:47 2013
@@ -102,6 +102,8 @@ import org.apache.hadoop.mapreduce.v2.ap
 import org.apache.hadoop.mapreduce.v2.app.rm.RMContainerAllocator;
 import org.apache.hadoop.mapreduce.v2.app.rm.RMContainerRequestor;
 import org.apache.hadoop.mapreduce.v2.app.rm.RMHeartbeatHandler;
+import org.apache.hadoop.mapreduce.v2.app.rm.preemption.AMPreemptionPolicy;
+import org.apache.hadoop.mapreduce.v2.app.rm.preemption.NoopAMPreemptionPolicy;
 import org.apache.hadoop.mapreduce.v2.app.speculate.DefaultSpeculator;
 import org.apache.hadoop.mapreduce.v2.app.speculate.Speculator;
 import org.apache.hadoop.mapreduce.v2.app.speculate.SpeculatorEvent;
@@ -188,8 +190,8 @@ public class MRAppMaster extends Composi
   private ContainerLauncher containerLauncher;
   private EventHandler<CommitterEvent> committerEventHandler;
   private Speculator speculator;
-  private TaskAttemptListener taskAttemptListener;
-  private JobTokenSecretManager jobTokenSecretManager =
+  protected TaskAttemptListener taskAttemptListener;
+  protected JobTokenSecretManager jobTokenSecretManager =
       new JobTokenSecretManager();
   private JobId jobId;
   private boolean newApiCommitter;
@@ -197,6 +199,7 @@ public class MRAppMaster extends Composi
   private JobEventDispatcher jobEventDispatcher;
   private JobHistoryEventHandler jobHistoryEventHandler;
   private SpeculatorEventDispatcher speculatorEventDispatcher;
+  private AMPreemptionPolicy preemptionPolicy;
 
   private Job job;
   private Credentials jobCredentials = new Credentials(); // Filled during init
@@ -383,8 +386,12 @@ public class MRAppMaster extends Composi
       committerEventHandler = createCommitterEventHandler(context, committer);
       addIfService(committerEventHandler);
 
+      //policy handling preemption requests from RM
+      preemptionPolicy = createPreemptionPolicy(conf);
+      preemptionPolicy.init(context);
+
       //service to handle requests to TaskUmbilicalProtocol
-      taskAttemptListener = createTaskAttemptListener(context);
+      taskAttemptListener = createTaskAttemptListener(context, preemptionPolicy);
       addIfService(taskAttemptListener);
 
       //service to log job history events
@@ -475,6 +482,12 @@ public class MRAppMaster extends Composi
     return committer;
   }
 
+  protected AMPreemptionPolicy createPreemptionPolicy(Configuration conf) {
+    return ReflectionUtils.newInstance(conf.getClass(
+          MRJobConfig.MR_AM_PREEMPTION_POLICY,
+          NoopAMPreemptionPolicy.class, AMPreemptionPolicy.class), conf);
+  }
+
   protected boolean keepJobFiles(JobConf conf) {
     return (conf.getKeepTaskFilesPattern() != null || conf
         .getKeepFailedTaskFiles());
@@ -692,10 +705,11 @@ public class MRAppMaster extends Composi
     }
   }
 
-  protected TaskAttemptListener createTaskAttemptListener(AppContext context) {
+  protected TaskAttemptListener createTaskAttemptListener(AppContext context,
+      AMPreemptionPolicy preemptionPolicy) {
     TaskAttemptListener lis =
         new TaskAttemptListenerImpl(context, jobTokenSecretManager,
-            getRMHeartbeatHandler());
+            getRMHeartbeatHandler(), preemptionPolicy);
     return lis;
   }
 
@@ -805,7 +819,7 @@ public class MRAppMaster extends Composi
             , containerID);
       } else {
         this.containerAllocator = new RMContainerAllocator(
-            this.clientService, this.context);
+            this.clientService, this.context, preemptionPolicy);
       }
       ((Service)this.containerAllocator).init(getConfig());
       ((Service)this.containerAllocator).start();

Modified: hadoop/common/branches/HDFS-2832/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java?rev=1552205&r1=1552204&r2=1552205&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java Thu Dec 19 02:03:47 2013
@@ -57,6 +57,7 @@ import org.apache.hadoop.mapreduce.v2.ap
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptKillEvent;
+import org.apache.hadoop.mapreduce.v2.app.rm.preemption.AMPreemptionPolicy;
 import org.apache.hadoop.util.StringInterner;
 import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
 import org.apache.hadoop.yarn.api.records.Container;
@@ -67,6 +68,7 @@ import org.apache.hadoop.yarn.api.record
 import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.NodeReport;
 import org.apache.hadoop.yarn.api.records.NodeState;
+import org.apache.hadoop.yarn.api.records.PreemptionMessage;
 import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.client.api.NMTokenCache;
 import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
@@ -147,13 +149,17 @@ public class RMContainerAllocator extend
   private long retryInterval;
   private long retrystartTime;
 
+  private final AMPreemptionPolicy preemptionPolicy;
+
   BlockingQueue<ContainerAllocatorEvent> eventQueue
     = new LinkedBlockingQueue<ContainerAllocatorEvent>();
 
   private ScheduleStats scheduleStats = new ScheduleStats();
 
-  public RMContainerAllocator(ClientService clientService, AppContext context) {
+  public RMContainerAllocator(ClientService clientService, AppContext context,
+      AMPreemptionPolicy preemptionPolicy) {
     super(clientService, context);
+    this.preemptionPolicy = preemptionPolicy;
     this.stopped = new AtomicBoolean(false);
   }
 
@@ -361,11 +367,15 @@ public class RMContainerAllocator extend
         LOG.error("Could not deallocate container for task attemptId " + 
             aId);
       }
+      preemptionPolicy.handleCompletedContainer(event.getAttemptID());
     } else if (
         event.getType() == ContainerAllocator.EventType.CONTAINER_FAILED) {
       ContainerFailedEvent fEv = (ContainerFailedEvent) event;
       String host = getHost(fEv.getContMgrAddress());
       containerFailedOnHost(host);
+      // propagate failures to preemption policy to discard checkpoints for
+      // failed tasks
+      preemptionPolicy.handleFailedContainer(event.getAttemptID());
     }
   }
 
@@ -399,7 +409,7 @@ public class RMContainerAllocator extend
         }
         scheduledRequests.reduces.clear();
         
-        //preempt for making space for atleast one map
+        //preempt for making space for at least one map
         int premeptionLimit = Math.max(mapResourceReqt, 
             (int) (maxReducePreemptionLimit * memLimit));
         
@@ -409,7 +419,7 @@ public class RMContainerAllocator extend
         int toPreempt = (int) Math.ceil((float) preemptMem/reduceResourceReqt);
         toPreempt = Math.min(toPreempt, assignedRequests.reduces.size());
         
-        LOG.info("Going to preempt " + toPreempt);
+        LOG.info("Going to preempt " + toPreempt + " due to lack of space for maps");
         assignedRequests.preemptReduce(toPreempt);
       }
     }
@@ -595,6 +605,14 @@ public class RMContainerAllocator extend
     }
     
     List<ContainerStatus> finishedContainers = response.getCompletedContainersStatuses();
+
+    // propagate preemption requests
+    final PreemptionMessage preemptReq = response.getPreemptionMessage();
+    if (preemptReq != null) {
+      preemptionPolicy.preempt(
+          new PreemptionContext(assignedRequests), preemptReq);
+    }
+
     if (newContainers.size() + finishedContainers.size() > 0 || headRoom != newHeadRoom) {
       //something changed
       recalculateReduceSchedule = true;
@@ -630,7 +648,9 @@ public class RMContainerAllocator extend
         String diagnostics = StringInterner.weakIntern(cont.getDiagnostics());
         eventHandler.handle(new TaskAttemptDiagnosticsUpdateEvent(attemptID,
             diagnostics));
-      }      
+
+        preemptionPolicy.handleCompletedContainer(attemptID);
+      }
     }
     return newContainers;
   }
@@ -1232,4 +1252,27 @@ public class RMContainerAllocator extend
         " RackLocal:" + rackLocalAssigned);
     }
   }
+
+  static class PreemptionContext extends AMPreemptionPolicy.Context {
+    final AssignedRequests reqs;
+
+    PreemptionContext(AssignedRequests reqs) {
+      this.reqs = reqs;
+    }
+    @Override
+    public TaskAttemptId getTaskAttempt(ContainerId container) {
+      return reqs.get(container);
+    }
+
+    @Override
+    public List<Container> getContainers(TaskType t){
+      if(TaskType.REDUCE.equals(t))
+        return new ArrayList<Container>(reqs.reduces.values());
+      if(TaskType.MAP.equals(t))
+        return new ArrayList<Container>(reqs.maps.values());
+      return null;
+    }
+
+  }
+
 }

Modified: hadoop/common/branches/HDFS-2832/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapred/TestTaskAttemptListenerImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapred/TestTaskAttemptListenerImpl.java?rev=1552205&r1=1552204&r2=1552205&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapred/TestTaskAttemptListenerImpl.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapred/TestTaskAttemptListenerImpl.java Thu Dec 19 02:03:47 2013
@@ -60,7 +60,7 @@ public class TestTaskAttemptListenerImpl
         JobTokenSecretManager jobTokenSecretManager,
         RMHeartbeatHandler rmHeartbeatHandler,
         TaskHeartbeatHandler hbHandler) {
-      super(context, jobTokenSecretManager, rmHeartbeatHandler);
+      super(context, jobTokenSecretManager, rmHeartbeatHandler, null);
       this.taskHeartbeatHandler = hbHandler;
     }
     
@@ -191,7 +191,7 @@ public class TestTaskAttemptListenerImpl
         mock(RMHeartbeatHandler.class);
     final TaskHeartbeatHandler hbHandler = mock(TaskHeartbeatHandler.class);
     TaskAttemptListenerImpl listener =
-        new TaskAttemptListenerImpl(appCtx, secret, rmHeartbeatHandler) {
+        new TaskAttemptListenerImpl(appCtx, secret, rmHeartbeatHandler, null) {
       @Override
       protected void registerHeartbeatHandler(Configuration conf) {
         taskHeartbeatHandler = hbHandler;
@@ -245,7 +245,7 @@ public class TestTaskAttemptListenerImpl
         mock(RMHeartbeatHandler.class);
     final TaskHeartbeatHandler hbHandler = mock(TaskHeartbeatHandler.class);
     TaskAttemptListenerImpl listener =
-        new TaskAttemptListenerImpl(appCtx, secret, rmHeartbeatHandler) {
+        new TaskAttemptListenerImpl(appCtx, secret, rmHeartbeatHandler, null) {
       @Override
       protected void registerHeartbeatHandler(Configuration conf) {
         taskHeartbeatHandler = hbHandler;

Modified: hadoop/common/branches/HDFS-2832/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java?rev=1552205&r1=1552204&r2=1552205&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java Thu Dec 19 02:03:47 2013
@@ -79,6 +79,7 @@ import org.apache.hadoop.mapreduce.v2.ap
 import org.apache.hadoop.mapreduce.v2.app.rm.ContainerAllocator;
 import org.apache.hadoop.mapreduce.v2.app.rm.ContainerAllocatorEvent;
 import org.apache.hadoop.mapreduce.v2.app.rm.RMHeartbeatHandler;
+import org.apache.hadoop.mapreduce.v2.app.rm.preemption.AMPreemptionPolicy;
 import org.apache.hadoop.mapreduce.v2.util.MRApps;
 import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
 import org.apache.hadoop.net.NetUtils;
@@ -467,7 +468,8 @@ public class MRApp extends MRAppMaster {
   }
 
   @Override
-  protected TaskAttemptListener createTaskAttemptListener(AppContext context) {
+  protected TaskAttemptListener createTaskAttemptListener(
+      AppContext context, AMPreemptionPolicy policy) {
     return new TaskAttemptListener(){
       @Override
       public InetSocketAddress getAddress() {

Modified: hadoop/common/branches/HDFS-2832/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRAppBenchmark.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRAppBenchmark.java?rev=1552205&r1=1552204&r2=1552205&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRAppBenchmark.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRAppBenchmark.java Thu Dec 19 02:03:47 2013
@@ -33,6 +33,8 @@ import org.apache.hadoop.mapreduce.v2.ap
 import org.apache.hadoop.mapreduce.v2.app.rm.ContainerAllocator;
 import org.apache.hadoop.mapreduce.v2.app.rm.ContainerAllocatorEvent;
 import org.apache.hadoop.mapreduce.v2.app.rm.RMContainerAllocator;
+import org.apache.hadoop.mapreduce.v2.app.rm.preemption.AMPreemptionPolicy;
+import org.apache.hadoop.mapreduce.v2.app.rm.preemption.NoopAMPreemptionPolicy;
 import org.apache.hadoop.service.AbstractService;
 import org.apache.hadoop.yarn.api.ApplicationMasterProtocol;
 import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
@@ -61,6 +63,8 @@ public class MRAppBenchmark {
 
   /**
    * Runs memory and time benchmark with Mock MRApp.
+   * @param app Application to submit
+   * @throws Exception On application failure
    */
   public void run(MRApp app) throws Exception {
     Logger rootLogger = LogManager.getRootLogger();
@@ -133,6 +137,7 @@ public class MRAppBenchmark {
       protected void serviceStart() throws Exception {
         thread = new Thread(new Runnable() {
           @Override
+          @SuppressWarnings("unchecked")
           public void run() {
             ContainerAllocatorEvent event = null;
             while (!Thread.currentThread().isInterrupted()) {
@@ -192,7 +197,9 @@ public class MRAppBenchmark {
       @Override
       protected ContainerAllocator createContainerAllocator(
           ClientService clientService, AppContext context) {
-        return new RMContainerAllocator(clientService, context) {
+
+        AMPreemptionPolicy policy = new NoopAMPreemptionPolicy();
+        return new RMContainerAllocator(clientService, context, policy) {
           @Override
           protected ApplicationMasterProtocol createSchedulerProxy() {
             return new ApplicationMasterProtocol() {

Modified: hadoop/common/branches/HDFS-2832/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestFail.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestFail.java?rev=1552205&r1=1552204&r2=1552205&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestFail.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestFail.java Thu Dec 19 02:03:47 2013
@@ -43,6 +43,7 @@ import org.apache.hadoop.mapreduce.v2.ap
 import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerLauncher;
 import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerLauncherEvent;
 import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerLauncherImpl;
+import org.apache.hadoop.mapreduce.v2.app.rm.preemption.AMPreemptionPolicy;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.yarn.api.ContainerManagementProtocol;
 import org.apache.hadoop.yarn.api.records.ContainerId;
@@ -247,13 +248,14 @@ public class TestFail {
       super(maps, reduces, false, "TimeOutTaskMRApp", true);
     }
     @Override
-    protected TaskAttemptListener createTaskAttemptListener(AppContext context) {
+    protected TaskAttemptListener createTaskAttemptListener(
+        AppContext context, AMPreemptionPolicy policy) {
       //This will create the TaskAttemptListener with TaskHeartbeatHandler
       //RPC servers are not started
       //task time out is reduced
       //when attempt times out, heartbeat handler will send the lost event
       //leading to Attempt failure
-      return new TaskAttemptListenerImpl(getContext(), null, null) {
+      return new TaskAttemptListenerImpl(getContext(), null, null, policy) {
         @Override
         public void startRpcServer(){};
         @Override

Modified: hadoop/common/branches/HDFS-2832/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java?rev=1552205&r1=1552204&r2=1552205&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java Thu Dec 19 02:03:47 2013
@@ -18,6 +18,8 @@
 
 package org.apache.hadoop.mapreduce.v2.app;
 
+import org.apache.hadoop.mapreduce.v2.app.rm.preemption.NoopAMPreemptionPolicy;
+
 import static org.mockito.Matchers.anyFloat;
 import static org.mockito.Matchers.anyInt;
 import static org.mockito.Matchers.isA;
@@ -1428,14 +1430,15 @@ public class TestRMContainerAllocator {
     // Use this constructor when using a real job.
     MyContainerAllocator(MyResourceManager rm,
         ApplicationAttemptId appAttemptId, AppContext context) {
-      super(createMockClientService(), context);
+      super(createMockClientService(), context, new NoopAMPreemptionPolicy());
       this.rm = rm;
     }
 
     // Use this constructor when you are using a mocked job.
     public MyContainerAllocator(MyResourceManager rm, Configuration conf,
         ApplicationAttemptId appAttemptId, Job job) {
-      super(createMockClientService(), createAppContext(appAttemptId, job));
+      super(createMockClientService(), createAppContext(appAttemptId, job),
+          new NoopAMPreemptionPolicy());
       this.rm = rm;
       super.init(conf);
       super.start();
@@ -1444,7 +1447,8 @@ public class TestRMContainerAllocator {
     public MyContainerAllocator(MyResourceManager rm, Configuration conf,
         ApplicationAttemptId appAttemptId, Job job, Clock clock) {
       super(createMockClientService(),
-          createAppContext(appAttemptId, job, clock));
+          createAppContext(appAttemptId, job, clock),
+          new NoopAMPreemptionPolicy());
       this.rm = rm;
       super.init(conf);
       super.start();
@@ -1671,7 +1675,8 @@ public class TestRMContainerAllocator {
         ApplicationId.newInstance(1, 1));
 
     RMContainerAllocator allocator = new RMContainerAllocator(
-        mock(ClientService.class), appContext) {
+        mock(ClientService.class), appContext,
+        new NoopAMPreemptionPolicy()) {
           @Override
           protected void register() {
           }
@@ -1721,7 +1726,8 @@ public class TestRMContainerAllocator {
   @Test
   public void testCompletedContainerEvent() {
     RMContainerAllocator allocator = new RMContainerAllocator(
-        mock(ClientService.class), mock(AppContext.class));
+        mock(ClientService.class), mock(AppContext.class),
+        new NoopAMPreemptionPolicy());
     
     TaskAttemptId attemptId = MRBuilderUtils.newTaskAttemptId(
         MRBuilderUtils.newTaskId(

Modified: hadoop/common/branches/HDFS-2832/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/TypeConverter.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/TypeConverter.java?rev=1552205&r1=1552204&r2=1552205&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/TypeConverter.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/TypeConverter.java Thu Dec 19 02:03:47 2013
@@ -300,6 +300,8 @@ public class TypeConverter {
             .getCleanupProgress(), fromYarn(jobreport.getJobState()),
         jobPriority, jobreport.getUser(), jobreport.getJobName(), jobreport
             .getJobFile(), trackingUrl, jobreport.isUber());
+    jobStatus.setStartTime(jobreport.getStartTime());
+    jobStatus.setFinishTime(jobreport.getFinishTime());
     jobStatus.setFailureInfo(jobreport.getDiagnostics());
     return jobStatus;
   }
@@ -441,6 +443,7 @@ public class TypeConverter {
       );
     jobStatus.setSchedulingInfo(trackingUrl); // Set AM tracking url
     jobStatus.setStartTime(application.getStartTime());
+    jobStatus.setFinishTime(application.getFinishTime());
     jobStatus.setFailureInfo(application.getDiagnostics());
     jobStatus.setNeededMem(application.getApplicationResourceUsageReport().getNeededResources().getMemory());
     jobStatus.setNumReservedSlots(application.getApplicationResourceUsageReport().getNumReservedContainers());

Modified: hadoop/common/branches/HDFS-2832/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/test/java/org/apache/hadoop/mapreduce/TestTypeConverter.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/test/java/org/apache/hadoop/mapreduce/TestTypeConverter.java?rev=1552205&r1=1552204&r2=1552205&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/test/java/org/apache/hadoop/mapreduce/TestTypeConverter.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/test/java/org/apache/hadoop/mapreduce/TestTypeConverter.java Thu Dec 19 02:03:47 2013
@@ -27,6 +27,8 @@ import junit.framework.Assert;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.mapreduce.JobStatus.State;
+import org.apache.hadoop.mapreduce.v2.api.records.JobId;
+import org.apache.hadoop.mapreduce.v2.api.records.JobReport;
 import org.apache.hadoop.mapreduce.v2.api.records.JobState;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskState;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
@@ -71,6 +73,7 @@ public class TestTypeConverter {
   @Test
   public void testFromYarn() throws Exception {
     int appStartTime = 612354;
+    int appFinishTime = 612355;
     YarnApplicationState state = YarnApplicationState.RUNNING;
     ApplicationId applicationId = ApplicationId.newInstance(0, 0);
     ApplicationReport applicationReport = Records
@@ -78,6 +81,7 @@ public class TestTypeConverter {
     applicationReport.setApplicationId(applicationId);
     applicationReport.setYarnApplicationState(state);
     applicationReport.setStartTime(appStartTime);
+    applicationReport.setFinishTime(appFinishTime);
     applicationReport.setUser("TestTypeConverter-user");
     ApplicationResourceUsageReport appUsageRpt = Records
         .newRecord(ApplicationResourceUsageReport.class);
@@ -91,6 +95,7 @@ public class TestTypeConverter {
     applicationReport.setApplicationResourceUsageReport(appUsageRpt);
     JobStatus jobStatus = TypeConverter.fromYarn(applicationReport, "dummy-jobfile");
     Assert.assertEquals(appStartTime, jobStatus.getStartTime());
+    Assert.assertEquals(appFinishTime, jobStatus.getFinishTime());    
     Assert.assertEquals(state.toString(), jobStatus.getState().toString());
   }
 
@@ -172,4 +177,25 @@ public class TestTypeConverter {
     Assert.assertEquals("QueueInfo children weren't properly converted",
       returned.getQueueChildren().size(), 1);
   }
+
+  @Test
+  public void testFromYarnJobReport() throws Exception {
+    int jobStartTime = 612354;
+    int jobFinishTime = 612355;
+    JobState state = JobState.RUNNING;
+    JobId jobId = Records.newRecord(JobId.class);
+    JobReport jobReport = Records.newRecord(JobReport.class);
+    ApplicationId applicationId = ApplicationId.newInstance(0, 0);
+    jobId.setAppId(applicationId);
+    jobId.setId(0);    
+    jobReport.setJobId(jobId);
+    jobReport.setJobState(state);
+    jobReport.setStartTime(jobStartTime);
+    jobReport.setFinishTime(jobFinishTime);
+    jobReport.setUser("TestTypeConverter-user");    
+    JobStatus jobStatus = TypeConverter.fromYarn(jobReport, "dummy-jobfile");
+    Assert.assertEquals(jobStartTime, jobStatus.getStartTime());
+    Assert.assertEquals(jobFinishTime, jobStatus.getFinishTime());    
+    Assert.assertEquals(state.toString(), jobStatus.getState().toString());
+  }  
 }

Modified: hadoop/common/branches/HDFS-2832/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/JobCounter.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/JobCounter.java?rev=1552205&r1=1552204&r2=1552205&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/JobCounter.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/JobCounter.java Thu Dec 19 02:03:47 2013
@@ -45,5 +45,9 @@ public enum JobCounter {
   TOTAL_LAUNCHED_UBERTASKS,
   NUM_UBER_SUBMAPS,
   NUM_UBER_SUBREDUCES,
-  NUM_FAILED_UBERTASKS
+  NUM_FAILED_UBERTASKS,
+  TASKS_REQ_PREEMPT,
+  CHECKPOINTS,
+  CHECKPOINT_BYTES,
+  CHECKPOINT_TIME
 }

Modified: hadoop/common/branches/HDFS-2832/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java?rev=1552205&r1=1552204&r2=1552205&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java Thu Dec 19 02:03:47 2013
@@ -459,7 +459,13 @@ public interface MRJobConfig {
   public static final String MR_AM_JOB_REDUCE_PREEMPTION_LIMIT = 
     MR_AM_PREFIX  + "job.reduce.preemption.limit";
   public static final float DEFAULT_MR_AM_JOB_REDUCE_PREEMPTION_LIMIT = 0.5f;
-  
+
+  /**
+   * Policy class encoding responses to preemption requests.
+   */
+  public static final String MR_AM_PREEMPTION_POLICY =
+    MR_AM_PREFIX + "preemption.policy";
+
   /** AM ACL disabled. **/
   public static final String JOB_AM_ACCESS_DISABLED = 
     "mapreduce.job.am-access-disabled";
@@ -708,4 +714,7 @@ public interface MRJobConfig {
   
   public static final String MR_APPLICATION_TYPE = "MAPREDUCE";
   
+  public static final String TASK_PREEMPTION =
+      "mapreduce.job.preemption";
+
 }

Propchange: hadoop/common/branches/HDFS-2832/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml
------------------------------------------------------------------------------
  Merged /hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml:r1542123-1552204

Modified: hadoop/common/branches/HDFS-2832/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/org/apache/hadoop/mapreduce/JobCounter.properties
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/org/apache/hadoop/mapreduce/JobCounter.properties?rev=1552205&r1=1552204&r2=1552205&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/org/apache/hadoop/mapreduce/JobCounter.properties (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/org/apache/hadoop/mapreduce/JobCounter.properties Thu Dec 19 02:03:47 2013
@@ -27,3 +27,7 @@ SLOTS_MILLIS_MAPS.name=            Total
 SLOTS_MILLIS_REDUCES.name=         Total time spent by all reduces in occupied slots (ms)
 FALLOW_SLOTS_MILLIS_MAPS.name=     Total time spent by all maps waiting after reserving slots (ms)
 FALLOW_SLOTS_MILLIS_REDUCES.name=  Total time spent by all reduces waiting after reserving slots (ms)
+TASKS_REQ_PREEMPT.name=            Tasks that have been asked to preempt
+CHECKPOINTS.name=                  Number of checkpoints reported
+CHECKPOINT_BYTES.name=             Total amount of bytes in checkpoints
+CHECKPOINT_TIME.name=              Total time spent checkpointing (ms)
\ No newline at end of file

Modified: hadoop/common/branches/HDFS-2832/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestJobHistoryParsing.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestJobHistoryParsing.java?rev=1552205&r1=1552204&r2=1552205&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestJobHistoryParsing.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestJobHistoryParsing.java Thu Dec 19 02:03:47 2013
@@ -339,8 +339,11 @@ public class TestJobHistoryParsing {
     PrintStream stdps = System.out;
     try {
       System.setOut(new PrintStream(outContent));
-      HistoryViewer viewer = new HistoryViewer(fc.makeQualified(
-          fileInfo.getHistoryFile()).toString(), conf, true);
+      HistoryViewer viewer;
+      synchronized (fileInfo) {
+        viewer = new HistoryViewer(fc.makeQualified(
+            fileInfo.getHistoryFile()).toString(), conf, true);
+      }
       viewer.print();
 
       for (TaskInfo taskInfo : allTasks.values()) {
@@ -397,29 +400,27 @@ public class TestJobHistoryParsing {
       // make sure all events are flushed
       app.waitForState(Service.STATE.STOPPED);
 
-      String jobhistoryDir = JobHistoryUtils
-          .getHistoryIntermediateDoneDirForUser(conf);
       JobHistory jobHistory = new JobHistory();
       jobHistory.init(conf);
+      HistoryFileInfo fileInfo = jobHistory.getJobFileInfo(jobId);
+      
+      JobHistoryParser parser;
+      JobInfo jobInfo;
+      synchronized (fileInfo) {
+        Path historyFilePath = fileInfo.getHistoryFile();
+        FSDataInputStream in = null;
+        FileContext fc = null;
+        try {
+          fc = FileContext.getFileContext(conf);
+          in = fc.open(fc.makeQualified(historyFilePath));
+        } catch (IOException ioe) {
+          LOG.info("Can not open history file: " + historyFilePath, ioe);
+          throw (new Exception("Can not open History File"));
+        }
 
-      JobIndexInfo jobIndexInfo = jobHistory.getJobFileInfo(jobId)
-          .getJobIndexInfo();
-      String jobhistoryFileName = FileNameIndexUtils
-          .getDoneFileName(jobIndexInfo);
-
-      Path historyFilePath = new Path(jobhistoryDir, jobhistoryFileName);
-      FSDataInputStream in = null;
-      FileContext fc = null;
-      try {
-        fc = FileContext.getFileContext(conf);
-        in = fc.open(fc.makeQualified(historyFilePath));
-      } catch (IOException ioe) {
-        LOG.info("Can not open history file: " + historyFilePath, ioe);
-        throw (new Exception("Can not open History File"));
+        parser = new JobHistoryParser(in);
+        jobInfo = parser.parse();
       }
-
-      JobHistoryParser parser = new JobHistoryParser(in);
-      JobInfo jobInfo = parser.parse();
       Exception parseException = parser.getParseException();
       Assert.assertNull("Caught an expected exception " + parseException,
           parseException);
@@ -464,29 +465,28 @@ public class TestJobHistoryParsing {
       // make sure all events are flushed
       app.waitForState(Service.STATE.STOPPED);
 
-      String jobhistoryDir = JobHistoryUtils
-          .getHistoryIntermediateDoneDirForUser(conf);
       JobHistory jobHistory = new JobHistory();
       jobHistory.init(conf);
 
-      JobIndexInfo jobIndexInfo = jobHistory.getJobFileInfo(jobId)
-          .getJobIndexInfo();
-      String jobhistoryFileName = FileNameIndexUtils
-          .getDoneFileName(jobIndexInfo);
-
-      Path historyFilePath = new Path(jobhistoryDir, jobhistoryFileName);
-      FSDataInputStream in = null;
-      FileContext fc = null;
-      try {
-        fc = FileContext.getFileContext(conf);
-        in = fc.open(fc.makeQualified(historyFilePath));
-      } catch (IOException ioe) {
-        LOG.info("Can not open history file: " + historyFilePath, ioe);
-        throw (new Exception("Can not open History File"));
-      }
+      HistoryFileInfo fileInfo = jobHistory.getJobFileInfo(jobId);
+      
+      JobHistoryParser parser;
+      JobInfo jobInfo;
+      synchronized (fileInfo) {
+        Path historyFilePath = fileInfo.getHistoryFile();
+        FSDataInputStream in = null;
+        FileContext fc = null;
+        try {
+          fc = FileContext.getFileContext(conf);
+          in = fc.open(fc.makeQualified(historyFilePath));
+        } catch (IOException ioe) {
+          LOG.info("Can not open history file: " + historyFilePath, ioe);
+          throw (new Exception("Can not open History File"));
+        }
 
-      JobHistoryParser parser = new JobHistoryParser(in);
-      JobInfo jobInfo = parser.parse();
+        parser = new JobHistoryParser(in);
+        jobInfo = parser.parse();
+      }
       Exception parseException = parser.getParseException();
       Assert.assertNull("Caught an expected exception " + parseException,
           parseException);

Modified: hadoop/common/branches/HDFS-2832/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestClientRedirect.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestClientRedirect.java?rev=1552205&r1=1552204&r2=1552205&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestClientRedirect.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestClientRedirect.java Thu Dec 19 02:03:47 2013
@@ -304,7 +304,7 @@ public class TestClientRedirect {
     @Override
     public KillApplicationResponse forceKillApplication(
         KillApplicationRequest request) throws IOException {
-      return recordFactory.newRecordInstance(KillApplicationResponse.class);
+      return KillApplicationResponse.newInstance(true);
     }
 
     @Override

Modified: hadoop/common/branches/HDFS-2832/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestJobCleanup.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestJobCleanup.java?rev=1552205&r1=1552204&r2=1552205&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestJobCleanup.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestJobCleanup.java Thu Dec 19 02:03:47 2013
@@ -195,8 +195,7 @@ public class TestJobCleanup {
     RunningJob job = jobClient.submitJob(jc);
     JobID id = job.getID();
     job.waitForCompletion();
-    Counters counters = job.getCounters();
-    assertTrue("No. of failed maps should be 1",counters.getCounter(JobCounter.NUM_FAILED_MAPS) == 1);
+    assertEquals("Job did not fail", JobStatus.FAILED, job.getJobState());
 
     if (fileName != null) {
       Path testFile = new Path(outDir, fileName);
@@ -242,9 +241,7 @@ public class TestJobCleanup {
     job.killJob(); // kill the job
 
     job.waitForCompletion(); // wait for the job to complete
-    
-    counters = job.getCounters();
-    assertTrue("No. of killed maps should be 1", counters.getCounter(JobCounter.NUM_KILLED_MAPS) == 1);
+    assertEquals("Job was not killed", JobStatus.KILLED, job.getJobState());
 
     if (fileName != null) {
       Path testFile = new Path(outDir, fileName);

Modified: hadoop/common/branches/HDFS-2832/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestYARNRunner.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestYARNRunner.java?rev=1552205&r1=1552204&r2=1552205&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestYARNRunner.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestYARNRunner.java Thu Dec 19 02:03:47 2013
@@ -75,6 +75,7 @@ import org.apache.hadoop.yarn.api.protoc
 import org.apache.hadoop.yarn.api.protocolrecords.GetQueueUserAclsInfoRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.GetQueueUserAclsInfoResponse;
 import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationResponse;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ApplicationReport;
 import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
@@ -208,7 +209,7 @@ public class TestYARNRunner extends Test
     };
     /* make sure kill calls finish application master */
     when(clientRMProtocol.forceKillApplication(any(KillApplicationRequest.class)))
-    .thenReturn(null);
+    .thenReturn(KillApplicationResponse.newInstance(true));
     delegate.killApplication(appId);
     verify(clientRMProtocol).forceKillApplication(any(KillApplicationRequest.class));