You are viewing a plain text version of this content. The canonical link for it is here.
Posted to mapreduce-commits@hadoop.apache.org by cd...@apache.org on 2013/12/17 23:54:32 UTC

svn commit: r1551748 - in /hadoop/common/trunk/hadoop-mapreduce-project: ./ hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/ hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/...

Author: cdouglas
Date: Tue Dec 17 22:54:31 2013
New Revision: 1551748

URL: http://svn.apache.org/r1551748
Log:
MAPREDUCE-5189. Add policies and wiring to respond to preemption requests
from YARN. Contributed by Carlo Curino.

Added:
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/preemption/
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/preemption/AMPreemptionPolicy.java   (with props)
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/preemption/KillAMPreemptionPolicy.java   (with props)
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/preemption/NoopAMPreemptionPolicy.java   (with props)
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/checkpoint/EnumCounter.java   (with props)
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/checkpoint/TaskCheckpointID.java   (with props)
Modified:
    hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl.java
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapred/TestTaskAttemptListenerImpl.java
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRAppBenchmark.java
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestFail.java
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/JobCounter.java
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/org/apache/hadoop/mapreduce/JobCounter.properties

Modified: hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt?rev=1551748&r1=1551747&r2=1551748&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt Tue Dec 17 22:54:31 2013
@@ -74,6 +74,9 @@ Trunk (Unreleased)
     MAPREDUCE-5197. Add a service for checkpointing task state.
     (Carlo Curino via cdouglas)
 
+    MAPREDUCE-5189. Add policies and wiring to respond to preemption requests
+    from YARN. (Carlo Curino via cdouglas)
+
   BUG FIXES
 
     MAPREDUCE-4272. SortedRanges.Range#compareTo is not spec compliant.

Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl.java?rev=1551748&r1=1551747&r2=1551748&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl.java Tue Dec 17 22:54:31 2013
@@ -48,6 +48,7 @@ import org.apache.hadoop.mapreduce.v2.ap
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptStatusUpdateEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptStatusUpdateEvent.TaskAttemptStatus;
 import org.apache.hadoop.mapreduce.v2.app.rm.RMHeartbeatHandler;
+import org.apache.hadoop.mapreduce.v2.app.rm.preemption.AMPreemptionPolicy;
 import org.apache.hadoop.mapreduce.v2.app.security.authorize.MRAMPolicyProvider;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.security.authorize.PolicyProvider;
@@ -84,14 +85,17 @@ public class TaskAttemptListenerImpl ext
       .newSetFromMap(new ConcurrentHashMap<WrappedJvmID, Boolean>()); 
   
   private JobTokenSecretManager jobTokenSecretManager = null;
+  private AMPreemptionPolicy preemptionPolicy;
   
   public TaskAttemptListenerImpl(AppContext context,
       JobTokenSecretManager jobTokenSecretManager,
-      RMHeartbeatHandler rmHeartbeatHandler) {
+      RMHeartbeatHandler rmHeartbeatHandler,
+      AMPreemptionPolicy preemptionPolicy) {
     super(TaskAttemptListenerImpl.class.getName());
     this.context = context;
     this.jobTokenSecretManager = jobTokenSecretManager;
     this.rmHeartbeatHandler = rmHeartbeatHandler;
+    this.preemptionPolicy = preemptionPolicy;
   }
 
   @Override

Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java?rev=1551748&r1=1551747&r2=1551748&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java Tue Dec 17 22:54:31 2013
@@ -102,6 +102,8 @@ import org.apache.hadoop.mapreduce.v2.ap
 import org.apache.hadoop.mapreduce.v2.app.rm.RMContainerAllocator;
 import org.apache.hadoop.mapreduce.v2.app.rm.RMContainerRequestor;
 import org.apache.hadoop.mapreduce.v2.app.rm.RMHeartbeatHandler;
+import org.apache.hadoop.mapreduce.v2.app.rm.preemption.AMPreemptionPolicy;
+import org.apache.hadoop.mapreduce.v2.app.rm.preemption.NoopAMPreemptionPolicy;
 import org.apache.hadoop.mapreduce.v2.app.speculate.DefaultSpeculator;
 import org.apache.hadoop.mapreduce.v2.app.speculate.Speculator;
 import org.apache.hadoop.mapreduce.v2.app.speculate.SpeculatorEvent;
@@ -188,8 +190,8 @@ public class MRAppMaster extends Composi
   private ContainerLauncher containerLauncher;
   private EventHandler<CommitterEvent> committerEventHandler;
   private Speculator speculator;
-  private TaskAttemptListener taskAttemptListener;
-  private JobTokenSecretManager jobTokenSecretManager =
+  protected TaskAttemptListener taskAttemptListener;
+  protected JobTokenSecretManager jobTokenSecretManager =
       new JobTokenSecretManager();
   private JobId jobId;
   private boolean newApiCommitter;
@@ -197,6 +199,7 @@ public class MRAppMaster extends Composi
   private JobEventDispatcher jobEventDispatcher;
   private JobHistoryEventHandler jobHistoryEventHandler;
   private SpeculatorEventDispatcher speculatorEventDispatcher;
+  private AMPreemptionPolicy preemptionPolicy;
 
   private Job job;
   private Credentials jobCredentials = new Credentials(); // Filled during init
@@ -383,8 +386,12 @@ public class MRAppMaster extends Composi
       committerEventHandler = createCommitterEventHandler(context, committer);
       addIfService(committerEventHandler);
 
+      //policy handling preemption requests from RM
+      preemptionPolicy = createPreemptionPolicy(conf);
+      preemptionPolicy.init(context);
+
       //service to handle requests to TaskUmbilicalProtocol
-      taskAttemptListener = createTaskAttemptListener(context);
+      taskAttemptListener = createTaskAttemptListener(context, preemptionPolicy);
       addIfService(taskAttemptListener);
 
       //service to log job history events
@@ -475,6 +482,12 @@ public class MRAppMaster extends Composi
     return committer;
   }
 
+  protected AMPreemptionPolicy createPreemptionPolicy(Configuration conf) {
+    return ReflectionUtils.newInstance(conf.getClass(
+          MRJobConfig.MR_AM_PREEMPTION_POLICY,
+          NoopAMPreemptionPolicy.class, AMPreemptionPolicy.class), conf);
+  }
+
   protected boolean keepJobFiles(JobConf conf) {
     return (conf.getKeepTaskFilesPattern() != null || conf
         .getKeepFailedTaskFiles());
@@ -692,10 +705,11 @@ public class MRAppMaster extends Composi
     }
   }
 
-  protected TaskAttemptListener createTaskAttemptListener(AppContext context) {
+  protected TaskAttemptListener createTaskAttemptListener(AppContext context,
+      AMPreemptionPolicy preemptionPolicy) {
     TaskAttemptListener lis =
         new TaskAttemptListenerImpl(context, jobTokenSecretManager,
-            getRMHeartbeatHandler());
+            getRMHeartbeatHandler(), preemptionPolicy);
     return lis;
   }
 
@@ -805,7 +819,7 @@ public class MRAppMaster extends Composi
             , containerID);
       } else {
         this.containerAllocator = new RMContainerAllocator(
-            this.clientService, this.context);
+            this.clientService, this.context, preemptionPolicy);
       }
       ((Service)this.containerAllocator).init(getConfig());
       ((Service)this.containerAllocator).start();

Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java?rev=1551748&r1=1551747&r2=1551748&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java Tue Dec 17 22:54:31 2013
@@ -57,6 +57,7 @@ import org.apache.hadoop.mapreduce.v2.ap
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptKillEvent;
+import org.apache.hadoop.mapreduce.v2.app.rm.preemption.AMPreemptionPolicy;
 import org.apache.hadoop.util.StringInterner;
 import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
 import org.apache.hadoop.yarn.api.records.Container;
@@ -67,6 +68,7 @@ import org.apache.hadoop.yarn.api.record
 import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.NodeReport;
 import org.apache.hadoop.yarn.api.records.NodeState;
+import org.apache.hadoop.yarn.api.records.PreemptionMessage;
 import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.client.api.NMTokenCache;
 import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
@@ -147,13 +149,17 @@ public class RMContainerAllocator extend
   private long retryInterval;
   private long retrystartTime;
 
+  private final AMPreemptionPolicy preemptionPolicy;
+
   BlockingQueue<ContainerAllocatorEvent> eventQueue
     = new LinkedBlockingQueue<ContainerAllocatorEvent>();
 
   private ScheduleStats scheduleStats = new ScheduleStats();
 
-  public RMContainerAllocator(ClientService clientService, AppContext context) {
+  public RMContainerAllocator(ClientService clientService, AppContext context,
+      AMPreemptionPolicy preemptionPolicy) {
     super(clientService, context);
+    this.preemptionPolicy = preemptionPolicy;
     this.stopped = new AtomicBoolean(false);
   }
 
@@ -361,11 +367,15 @@ public class RMContainerAllocator extend
         LOG.error("Could not deallocate container for task attemptId " + 
             aId);
       }
+      preemptionPolicy.handleCompletedContainer(event.getAttemptID());
     } else if (
         event.getType() == ContainerAllocator.EventType.CONTAINER_FAILED) {
       ContainerFailedEvent fEv = (ContainerFailedEvent) event;
       String host = getHost(fEv.getContMgrAddress());
       containerFailedOnHost(host);
+      // propagate failures to preemption policy to discard checkpoints for
+      // failed tasks
+      preemptionPolicy.handleFailedContainer(event.getAttemptID());
     }
   }
 
@@ -399,7 +409,7 @@ public class RMContainerAllocator extend
         }
         scheduledRequests.reduces.clear();
         
-        //preempt for making space for atleast one map
+        //preempt for making space for at least one map
         int premeptionLimit = Math.max(mapResourceReqt, 
             (int) (maxReducePreemptionLimit * memLimit));
         
@@ -409,7 +419,7 @@ public class RMContainerAllocator extend
         int toPreempt = (int) Math.ceil((float) preemptMem/reduceResourceReqt);
         toPreempt = Math.min(toPreempt, assignedRequests.reduces.size());
         
-        LOG.info("Going to preempt " + toPreempt);
+        LOG.info("Going to preempt " + toPreempt + " due to lack of space for maps");
         assignedRequests.preemptReduce(toPreempt);
       }
     }
@@ -595,6 +605,14 @@ public class RMContainerAllocator extend
     }
     
     List<ContainerStatus> finishedContainers = response.getCompletedContainersStatuses();
+
+    // propagate preemption requests
+    final PreemptionMessage preemptReq = response.getPreemptionMessage();
+    if (preemptReq != null) {
+      preemptionPolicy.preempt(
+          new PreemptionContext(assignedRequests), preemptReq);
+    }
+
     if (newContainers.size() + finishedContainers.size() > 0 || headRoom != newHeadRoom) {
       //something changed
       recalculateReduceSchedule = true;
@@ -630,7 +648,9 @@ public class RMContainerAllocator extend
         String diagnostics = StringInterner.weakIntern(cont.getDiagnostics());
         eventHandler.handle(new TaskAttemptDiagnosticsUpdateEvent(attemptID,
             diagnostics));
-      }      
+
+        preemptionPolicy.handleCompletedContainer(attemptID);
+      }
     }
     return newContainers;
   }
@@ -1232,4 +1252,27 @@ public class RMContainerAllocator extend
         " RackLocal:" + rackLocalAssigned);
     }
   }
+
+  static class PreemptionContext extends AMPreemptionPolicy.Context {
+    final AssignedRequests reqs;
+
+    PreemptionContext(AssignedRequests reqs) {
+      this.reqs = reqs;
+    }
+    @Override
+    public TaskAttemptId getTaskAttempt(ContainerId container) {
+      return reqs.get(container);
+    }
+
+    @Override
+    public List<Container> getContainers(TaskType t){
+      if(TaskType.REDUCE.equals(t))
+        return new ArrayList<Container>(reqs.reduces.values());
+      if(TaskType.MAP.equals(t))
+        return new ArrayList<Container>(reqs.maps.values());
+      return null;
+    }
+
+  }
+
 }

Added: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/preemption/AMPreemptionPolicy.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/preemption/AMPreemptionPolicy.java?rev=1551748&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/preemption/AMPreemptionPolicy.java (added)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/preemption/AMPreemptionPolicy.java Tue Dec 17 22:54:31 2013
@@ -0,0 +1,117 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+package org.apache.hadoop.mapreduce.v2.app.rm.preemption;
+
+import java.util.List;
+
+import org.apache.hadoop.mapred.TaskAttemptID;
+import org.apache.hadoop.mapred.TaskID;
+import org.apache.hadoop.mapreduce.checkpoint.TaskCheckpointID;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
+import org.apache.hadoop.mapreduce.v2.app.AppContext;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.PreemptionMessage;
+
+/**
+ * Policy encoding the {@link org.apache.hadoop.mapreduce.v2.app.MRAppMaster}
+ * response to preemption requests from the ResourceManager.
+ * @see org.apache.hadoop.mapreduce.v2.app.rm.RMContainerAllocator
+ */
+public interface AMPreemptionPolicy {
+
+  public abstract class Context {
+
+    /**
+     * @param container ID of container to preempt
+     * @return Task associated with the running container or <code>null</code>
+     * if no task is bound to that container.
+     */
+    public abstract TaskAttemptId getTaskAttempt(ContainerId container);
+
+    /**
+     * Method provides the complete list of containers running task of type t
+     * for this AM.
+     * @param t the type of containers
+     * @return a map containing
+     */
+    public abstract List<Container> getContainers(TaskType t);
+
+  }
+
+  public void init(AppContext context);
+
+  /**
+   * Callback informing the policy of ResourceManager. requests for resources
+   * to return to the cluster. The policy may take arbitrary action to satisfy
+   * requests by checkpointing task state, returning containers, or ignoring
+   * requests. The RM may elect to enforce these requests by forcibly killing
+   * containers not returned after some duration.
+   * @param context Handle to the current state of running containers
+   * @param preemptionRequests Request from RM for resources to return.
+   */
+  public void preempt(Context context, PreemptionMessage preemptionRequests);
+
+  /**
+   * This method is invoked by components interested to learn whether a certain
+   * task is being preempted.
+   * @param attemptID Task attempt to query
+   * @return true if this attempt is being preempted
+   */
+  public boolean isPreempted(TaskAttemptId attemptID);
+
+  /**
+   * This method is used to report to the policy that a certain task has been
+   * successfully preempted (for bookeeping, counters, etc..)
+   * @param attemptID Task attempt that preempted
+   */
+  public void reportSuccessfulPreemption(TaskAttemptID attemptID);
+
+  /**
+   * Callback informing the policy of containers exiting with a failure. This
+   * allows the policy to implemnt cleanup/compensating actions.
+   * @param attemptID Task attempt that failed
+   */
+  public void handleFailedContainer(TaskAttemptId attemptID);
+
+  /**
+   * Callback informing the policy of containers exiting cleanly. This is
+   * reported to the policy for bookeeping purposes.
+   * @param attemptID Task attempt that completed
+   */
+  public void handleCompletedContainer(TaskAttemptId attemptID);
+
+  /**
+   * Method to retrieve the latest checkpoint for a given {@link TaskID}
+   * @param taskId TaskID
+   * @return CheckpointID associated with this task or null
+   */
+  public TaskCheckpointID getCheckpointID(TaskID taskId);
+
+  /**
+   * Method to store the latest {@link
+   * org.apache.hadoop.mapreduce.checkpoint.CheckpointID} for a given {@link
+   * TaskID}. Assigning a null is akin to remove all previous checkpoints for
+   * this task.
+   * @param taskId TaskID
+   * @param cid Checkpoint to assign or <tt>null</tt> to remove it.
+   */
+  public void setCheckpointID(TaskID taskId, TaskCheckpointID cid);
+
+}

Propchange: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/preemption/AMPreemptionPolicy.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/preemption/KillAMPreemptionPolicy.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/preemption/KillAMPreemptionPolicy.java?rev=1551748&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/preemption/KillAMPreemptionPolicy.java (added)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/preemption/KillAMPreemptionPolicy.java Tue Dec 17 22:54:31 2013
@@ -0,0 +1,111 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+package org.apache.hadoop.mapreduce.v2.app.rm.preemption;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.mapred.TaskAttemptID;
+import org.apache.hadoop.mapred.TaskID;
+import org.apache.hadoop.mapreduce.JobCounter;
+import org.apache.hadoop.mapreduce.checkpoint.TaskCheckpointID;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
+import org.apache.hadoop.mapreduce.v2.app.AppContext;
+import org.apache.hadoop.mapreduce.v2.app.job.event.JobCounterUpdateEvent;
+import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent;
+import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.PreemptionContainer;
+import org.apache.hadoop.yarn.api.records.PreemptionMessage;
+import org.apache.hadoop.yarn.event.EventHandler;
+
+/**
+ * Sample policy that aggressively kills tasks when requested.
+ */
+public class KillAMPreemptionPolicy implements AMPreemptionPolicy {
+
+  private static final Log LOG =
+      LogFactory.getLog(KillAMPreemptionPolicy.class);
+
+  @SuppressWarnings("rawtypes")
+  private EventHandler dispatcher = null;
+
+  @Override
+  public void init(AppContext context) {
+    dispatcher = context.getEventHandler();
+  }
+
+  @Override
+  public void preempt(Context ctxt, PreemptionMessage preemptionRequests) {
+    // for both strict and negotiable preemption requests kill the
+    // container
+    for (PreemptionContainer c :
+        preemptionRequests.getStrictContract().getContainers()) {
+      killContainer(ctxt, c);
+    }
+    for (PreemptionContainer c :
+         preemptionRequests.getContract().getContainers()) {
+       killContainer(ctxt, c);
+    }
+  }
+
+  @SuppressWarnings("unchecked")
+  private void killContainer(Context ctxt, PreemptionContainer c){
+    ContainerId reqCont = c.getId();
+    TaskAttemptId reqTask = ctxt.getTaskAttempt(reqCont);
+    LOG.info("Evicting " + reqTask);
+    dispatcher.handle(new TaskAttemptEvent(reqTask,
+        TaskAttemptEventType.TA_KILL));
+
+    // add preemption to counters
+    JobCounterUpdateEvent jce = new JobCounterUpdateEvent(reqTask
+            .getTaskId().getJobId());
+        jce.addCounterUpdate(JobCounter.TASKS_REQ_PREEMPT, 1);
+        dispatcher.handle(jce);
+  }
+
+  @Override
+  public void handleFailedContainer(TaskAttemptId attemptID) {
+    // ignore
+  }
+
+  @Override
+  public boolean isPreempted(TaskAttemptId yarnAttemptID) {
+    return false;
+  }
+
+  @Override
+  public void reportSuccessfulPreemption(TaskAttemptID taskAttemptID) {
+    // ignore
+  }
+
+  @Override
+  public TaskCheckpointID getCheckpointID(TaskID taskId) {
+    return null;
+  }
+
+  @Override
+  public void setCheckpointID(TaskID taskId, TaskCheckpointID cid) {
+    // ignore
+  }
+
+  @Override
+  public void handleCompletedContainer(TaskAttemptId attemptID) {
+    // ignore
+  }
+
+}

Propchange: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/preemption/KillAMPreemptionPolicy.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/preemption/NoopAMPreemptionPolicy.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/preemption/NoopAMPreemptionPolicy.java?rev=1551748&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/preemption/NoopAMPreemptionPolicy.java (added)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/preemption/NoopAMPreemptionPolicy.java Tue Dec 17 22:54:31 2013
@@ -0,0 +1,72 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+package org.apache.hadoop.mapreduce.v2.app.rm.preemption;
+
+import org.apache.hadoop.mapred.TaskAttemptID;
+import org.apache.hadoop.mapred.TaskID;
+import org.apache.hadoop.mapreduce.checkpoint.TaskCheckpointID;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
+import org.apache.hadoop.mapreduce.v2.app.AppContext;
+import org.apache.hadoop.yarn.api.records.PreemptionMessage;
+
+/**
+ * NoOp policy that ignores all the requests for preemption.
+ */
+public class NoopAMPreemptionPolicy implements AMPreemptionPolicy {
+
+  @Override
+  public void init(AppContext context){
+   // do nothing
+  }
+
+  @Override
+  public void preempt(Context ctxt, PreemptionMessage preemptionRequests) {
+    // do nothing, ignore all requeusts
+  }
+
+  @Override
+  public void handleFailedContainer(TaskAttemptId attemptID) {
+    // do nothing
+  }
+
+  @Override
+  public boolean isPreempted(TaskAttemptId yarnAttemptID) {
+    return false;
+  }
+
+  @Override
+  public void reportSuccessfulPreemption(TaskAttemptID taskAttemptID) {
+    // ignore
+  }
+
+  @Override
+  public TaskCheckpointID getCheckpointID(TaskID taskId) {
+    return null;
+  }
+
+  @Override
+  public void setCheckpointID(TaskID taskId, TaskCheckpointID cid) {
+    // ignore
+  }
+
+  @Override
+  public void handleCompletedContainer(TaskAttemptId attemptID) {
+    // ignore
+  }
+
+}

Propchange: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/preemption/NoopAMPreemptionPolicy.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapred/TestTaskAttemptListenerImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapred/TestTaskAttemptListenerImpl.java?rev=1551748&r1=1551747&r2=1551748&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapred/TestTaskAttemptListenerImpl.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapred/TestTaskAttemptListenerImpl.java Tue Dec 17 22:54:31 2013
@@ -60,7 +60,7 @@ public class TestTaskAttemptListenerImpl
         JobTokenSecretManager jobTokenSecretManager,
         RMHeartbeatHandler rmHeartbeatHandler,
         TaskHeartbeatHandler hbHandler) {
-      super(context, jobTokenSecretManager, rmHeartbeatHandler);
+      super(context, jobTokenSecretManager, rmHeartbeatHandler, null);
       this.taskHeartbeatHandler = hbHandler;
     }
     
@@ -191,7 +191,7 @@ public class TestTaskAttemptListenerImpl
         mock(RMHeartbeatHandler.class);
     final TaskHeartbeatHandler hbHandler = mock(TaskHeartbeatHandler.class);
     TaskAttemptListenerImpl listener =
-        new TaskAttemptListenerImpl(appCtx, secret, rmHeartbeatHandler) {
+        new TaskAttemptListenerImpl(appCtx, secret, rmHeartbeatHandler, null) {
       @Override
       protected void registerHeartbeatHandler(Configuration conf) {
         taskHeartbeatHandler = hbHandler;
@@ -245,7 +245,7 @@ public class TestTaskAttemptListenerImpl
         mock(RMHeartbeatHandler.class);
     final TaskHeartbeatHandler hbHandler = mock(TaskHeartbeatHandler.class);
     TaskAttemptListenerImpl listener =
-        new TaskAttemptListenerImpl(appCtx, secret, rmHeartbeatHandler) {
+        new TaskAttemptListenerImpl(appCtx, secret, rmHeartbeatHandler, null) {
       @Override
       protected void registerHeartbeatHandler(Configuration conf) {
         taskHeartbeatHandler = hbHandler;

Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java?rev=1551748&r1=1551747&r2=1551748&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java Tue Dec 17 22:54:31 2013
@@ -79,6 +79,7 @@ import org.apache.hadoop.mapreduce.v2.ap
 import org.apache.hadoop.mapreduce.v2.app.rm.ContainerAllocator;
 import org.apache.hadoop.mapreduce.v2.app.rm.ContainerAllocatorEvent;
 import org.apache.hadoop.mapreduce.v2.app.rm.RMHeartbeatHandler;
+import org.apache.hadoop.mapreduce.v2.app.rm.preemption.AMPreemptionPolicy;
 import org.apache.hadoop.mapreduce.v2.util.MRApps;
 import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
 import org.apache.hadoop.net.NetUtils;
@@ -467,7 +468,8 @@ public class MRApp extends MRAppMaster {
   }
 
   @Override
-  protected TaskAttemptListener createTaskAttemptListener(AppContext context) {
+  protected TaskAttemptListener createTaskAttemptListener(
+      AppContext context, AMPreemptionPolicy policy) {
     return new TaskAttemptListener(){
       @Override
       public InetSocketAddress getAddress() {

Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRAppBenchmark.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRAppBenchmark.java?rev=1551748&r1=1551747&r2=1551748&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRAppBenchmark.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRAppBenchmark.java Tue Dec 17 22:54:31 2013
@@ -33,6 +33,8 @@ import org.apache.hadoop.mapreduce.v2.ap
 import org.apache.hadoop.mapreduce.v2.app.rm.ContainerAllocator;
 import org.apache.hadoop.mapreduce.v2.app.rm.ContainerAllocatorEvent;
 import org.apache.hadoop.mapreduce.v2.app.rm.RMContainerAllocator;
+import org.apache.hadoop.mapreduce.v2.app.rm.preemption.AMPreemptionPolicy;
+import org.apache.hadoop.mapreduce.v2.app.rm.preemption.NoopAMPreemptionPolicy;
 import org.apache.hadoop.service.AbstractService;
 import org.apache.hadoop.yarn.api.ApplicationMasterProtocol;
 import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
@@ -61,6 +63,8 @@ public class MRAppBenchmark {
 
   /**
    * Runs memory and time benchmark with Mock MRApp.
+   * @param app Application to submit
+   * @throws Exception On application failure
    */
   public void run(MRApp app) throws Exception {
     Logger rootLogger = LogManager.getRootLogger();
@@ -133,6 +137,7 @@ public class MRAppBenchmark {
       protected void serviceStart() throws Exception {
         thread = new Thread(new Runnable() {
           @Override
+          @SuppressWarnings("unchecked")
           public void run() {
             ContainerAllocatorEvent event = null;
             while (!Thread.currentThread().isInterrupted()) {
@@ -192,7 +197,9 @@ public class MRAppBenchmark {
       @Override
       protected ContainerAllocator createContainerAllocator(
           ClientService clientService, AppContext context) {
-        return new RMContainerAllocator(clientService, context) {
+
+        AMPreemptionPolicy policy = new NoopAMPreemptionPolicy();
+        return new RMContainerAllocator(clientService, context, policy) {
           @Override
           protected ApplicationMasterProtocol createSchedulerProxy() {
             return new ApplicationMasterProtocol() {

Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestFail.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestFail.java?rev=1551748&r1=1551747&r2=1551748&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestFail.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestFail.java Tue Dec 17 22:54:31 2013
@@ -43,6 +43,7 @@ import org.apache.hadoop.mapreduce.v2.ap
 import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerLauncher;
 import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerLauncherEvent;
 import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerLauncherImpl;
+import org.apache.hadoop.mapreduce.v2.app.rm.preemption.AMPreemptionPolicy;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.yarn.api.ContainerManagementProtocol;
 import org.apache.hadoop.yarn.api.records.ContainerId;
@@ -247,13 +248,14 @@ public class TestFail {
       super(maps, reduces, false, "TimeOutTaskMRApp", true);
     }
     @Override
-    protected TaskAttemptListener createTaskAttemptListener(AppContext context) {
+    protected TaskAttemptListener createTaskAttemptListener(
+        AppContext context, AMPreemptionPolicy policy) {
       //This will create the TaskAttemptListener with TaskHeartbeatHandler
       //RPC servers are not started
       //task time out is reduced
       //when attempt times out, heartbeat handler will send the lost event
       //leading to Attempt failure
-      return new TaskAttemptListenerImpl(getContext(), null, null) {
+      return new TaskAttemptListenerImpl(getContext(), null, null, policy) {
         @Override
         public void startRpcServer(){};
         @Override

Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java?rev=1551748&r1=1551747&r2=1551748&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java Tue Dec 17 22:54:31 2013
@@ -18,6 +18,8 @@
 
 package org.apache.hadoop.mapreduce.v2.app;
 
+import org.apache.hadoop.mapreduce.v2.app.rm.preemption.NoopAMPreemptionPolicy;
+
 import static org.mockito.Matchers.anyFloat;
 import static org.mockito.Matchers.anyInt;
 import static org.mockito.Matchers.isA;
@@ -1428,14 +1430,15 @@ public class TestRMContainerAllocator {
     // Use this constructor when using a real job.
     MyContainerAllocator(MyResourceManager rm,
         ApplicationAttemptId appAttemptId, AppContext context) {
-      super(createMockClientService(), context);
+      super(createMockClientService(), context, new NoopAMPreemptionPolicy());
       this.rm = rm;
     }
 
     // Use this constructor when you are using a mocked job.
     public MyContainerAllocator(MyResourceManager rm, Configuration conf,
         ApplicationAttemptId appAttemptId, Job job) {
-      super(createMockClientService(), createAppContext(appAttemptId, job));
+      super(createMockClientService(), createAppContext(appAttemptId, job),
+          new NoopAMPreemptionPolicy());
       this.rm = rm;
       super.init(conf);
       super.start();
@@ -1444,7 +1447,8 @@ public class TestRMContainerAllocator {
     public MyContainerAllocator(MyResourceManager rm, Configuration conf,
         ApplicationAttemptId appAttemptId, Job job, Clock clock) {
       super(createMockClientService(),
-          createAppContext(appAttemptId, job, clock));
+          createAppContext(appAttemptId, job, clock),
+          new NoopAMPreemptionPolicy());
       this.rm = rm;
       super.init(conf);
       super.start();
@@ -1671,7 +1675,8 @@ public class TestRMContainerAllocator {
         ApplicationId.newInstance(1, 1));
 
     RMContainerAllocator allocator = new RMContainerAllocator(
-        mock(ClientService.class), appContext) {
+        mock(ClientService.class), appContext,
+        new NoopAMPreemptionPolicy()) {
           @Override
           protected void register() {
           }
@@ -1721,7 +1726,8 @@ public class TestRMContainerAllocator {
   @Test
   public void testCompletedContainerEvent() {
     RMContainerAllocator allocator = new RMContainerAllocator(
-        mock(ClientService.class), mock(AppContext.class));
+        mock(ClientService.class), mock(AppContext.class),
+        new NoopAMPreemptionPolicy());
     
     TaskAttemptId attemptId = MRBuilderUtils.newTaskAttemptId(
         MRBuilderUtils.newTaskId(

Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/JobCounter.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/JobCounter.java?rev=1551748&r1=1551747&r2=1551748&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/JobCounter.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/JobCounter.java Tue Dec 17 22:54:31 2013
@@ -45,5 +45,9 @@ public enum JobCounter {
   TOTAL_LAUNCHED_UBERTASKS,
   NUM_UBER_SUBMAPS,
   NUM_UBER_SUBREDUCES,
-  NUM_FAILED_UBERTASKS
+  NUM_FAILED_UBERTASKS,
+  TASKS_REQ_PREEMPT,
+  CHECKPOINTS,
+  CHECKPOINT_BYTES,
+  CHECKPOINT_TIME
 }

Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java?rev=1551748&r1=1551747&r2=1551748&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java Tue Dec 17 22:54:31 2013
@@ -459,7 +459,13 @@ public interface MRJobConfig {
   public static final String MR_AM_JOB_REDUCE_PREEMPTION_LIMIT = 
     MR_AM_PREFIX  + "job.reduce.preemption.limit";
   public static final float DEFAULT_MR_AM_JOB_REDUCE_PREEMPTION_LIMIT = 0.5f;
-  
+
+  /**
+   * Policy class encoding responses to preemption requests.
+   */
+  public static final String MR_AM_PREEMPTION_POLICY =
+    MR_AM_PREFIX + "preemption.policy";
+
   /** AM ACL disabled. **/
   public static final String JOB_AM_ACCESS_DISABLED = 
     "mapreduce.job.am-access-disabled";
@@ -708,4 +714,7 @@ public interface MRJobConfig {
   
   public static final String MR_APPLICATION_TYPE = "MAPREDUCE";
   
+  public static final String TASK_PREEMPTION =
+      "mapreduce.job.preemption";
+
 }

Added: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/checkpoint/EnumCounter.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/checkpoint/EnumCounter.java?rev=1551748&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/checkpoint/EnumCounter.java (added)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/checkpoint/EnumCounter.java Tue Dec 17 22:54:31 2013
@@ -0,0 +1,26 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapreduce.checkpoint;
+
+public enum EnumCounter {
+  INPUTKEY,
+  INPUTVALUE,
+  OUTPUTRECORDS,
+  CHECKPOINT_BYTES,
+  CHECKPOINT_MS
+}

Propchange: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/checkpoint/EnumCounter.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/checkpoint/TaskCheckpointID.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/checkpoint/TaskCheckpointID.java?rev=1551748&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/checkpoint/TaskCheckpointID.java (added)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/checkpoint/TaskCheckpointID.java Tue Dec 17 22:54:31 2013
@@ -0,0 +1,126 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapreduce.checkpoint;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.WritableUtils;
+import org.apache.hadoop.mapred.Counters;
+
+/**
+ * Implementation of CheckpointID used in MR. It contains a reference to an
+ * underlying FileSsytem based checkpoint, and various metadata about the
+ * cost of checkpoints and other counters. This is sent by the task to the AM
+ * to be stored and provided to the next execution of the same task.
+ */
+public class TaskCheckpointID implements CheckpointID{
+
+  FSCheckpointID rawId;
+  private List<Path> partialOutput;
+  private Counters counters;
+
+  public TaskCheckpointID() {
+    this.rawId = new FSCheckpointID();
+    this.partialOutput = new ArrayList<Path>();
+  }
+
+  public TaskCheckpointID(FSCheckpointID rawId, List<Path> partialOutput,
+          Counters counters) {
+    this.rawId = rawId;
+    this.counters = counters;
+    if(partialOutput == null)
+      this.partialOutput = new ArrayList<Path>();
+    else
+      this.partialOutput = partialOutput;
+  }
+
+  @Override
+  public void write(DataOutput out) throws IOException {
+    counters.write(out);
+    if (partialOutput == null) {
+      WritableUtils.writeVLong(out, 0L);
+    } else {
+      WritableUtils.writeVLong(out, partialOutput.size());
+      for(Path p:partialOutput){
+        Text.writeString(out, p.toString());
+      }
+    }
+    rawId.write(out);
+  }
+
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    partialOutput.clear();
+    counters.readFields(in);
+    long numPout = WritableUtils.readVLong(in);
+    for(int i=0;i<numPout;i++)
+      partialOutput.add(new Path(Text.readString(in)));
+    rawId.readFields(in);
+  }
+
+  @Override
+  public boolean equals(Object other){
+    if (other instanceof TaskCheckpointID){
+      return this.rawId.equals(((TaskCheckpointID)other).rawId) &&
+             this.counters.equals(((TaskCheckpointID) other).counters) &&
+             this.partialOutput.containsAll(((TaskCheckpointID) other).partialOutput) &&
+             ((TaskCheckpointID) other).partialOutput.containsAll(this.partialOutput);
+    } else {
+      return false;
+    }
+  }
+
+  @Override
+  public int hashCode() {
+    return rawId.hashCode();
+  }
+
+  /**
+   * @return the size of the checkpoint in bytes
+   */
+  public long getCheckpointBytes() {
+    return counters.findCounter(EnumCounter.CHECKPOINT_BYTES).getValue();
+  }
+
+  /**
+   * @return how long it took to take this checkpoint
+   */
+  public long getCheckpointTime() {
+    return counters.findCounter(EnumCounter.CHECKPOINT_MS).getValue();
+  }
+
+  public String toString(){
+    return rawId.toString() + " counters:" + counters;
+
+  }
+
+  public List<Path> getPartialCommittedOutput() {
+    return partialOutput;
+  }
+
+  public Counters getCounters() {
+    return counters;
+  }
+
+}

Propchange: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/checkpoint/TaskCheckpointID.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/org/apache/hadoop/mapreduce/JobCounter.properties
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/org/apache/hadoop/mapreduce/JobCounter.properties?rev=1551748&r1=1551747&r2=1551748&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/org/apache/hadoop/mapreduce/JobCounter.properties (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/org/apache/hadoop/mapreduce/JobCounter.properties Tue Dec 17 22:54:31 2013
@@ -27,3 +27,7 @@ SLOTS_MILLIS_MAPS.name=            Total
 SLOTS_MILLIS_REDUCES.name=         Total time spent by all reduces in occupied slots (ms)
 FALLOW_SLOTS_MILLIS_MAPS.name=     Total time spent by all maps waiting after reserving slots (ms)
 FALLOW_SLOTS_MILLIS_REDUCES.name=  Total time spent by all reduces waiting after reserving slots (ms)
+TASKS_REQ_PREEMPT.name=            Tasks that have been asked to preempt
+CHECKPOINTS.name=                  Number of checkpoints reported
+CHECKPOINT_BYTES.name=             Total amount of bytes in checkpoints
+CHECKPOINT_TIME.name=              Total time spent checkpointing (ms)
\ No newline at end of file