You are viewing a plain text version of this content. The canonical link for it is here.
Posted to mapreduce-commits@hadoop.apache.org by cd...@apache.org on 2013/12/28 22:58:33 UTC

svn commit: r1553939 [1/2] - in /hadoop/common/trunk/hadoop-mapreduce-project: ./ hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/ hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/h...

Author: cdouglas
Date: Sat Dec 28 21:58:33 2013
New Revision: 1553939

URL: http://svn.apache.org/r1553939
Log:
MAPREDUCE-5196. Add bookkeeping for managing checkpoints of task state.
Contributed by Carlo Curino

Added:
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/preemption/CheckpointAMPreemptionPolicy.java   (with props)
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestCheckpointPreemptionPolicy.java   (with props)
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/AMFeedback.java   (with props)
Modified:
    hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl.java
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/TaskAttemptEventType.java
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/preemption/AMPreemptionPolicy.java
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/preemption/KillAMPreemptionPolicy.java
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/preemption/NoopAMPreemptionPolicy.java
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapred/TestTaskAttemptListenerImpl.java
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapred/LocalJobRunner.java
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/Task.java
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/TaskStatus.java
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/TaskUmbilicalProtocol.java
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/checkpoint/TaskCheckpointID.java
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestMapProgress.java
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestTaskCommit.java
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/security/TestUmbilicalProtocolWithJobToken.java

Modified: hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt?rev=1553939&r1=1553938&r2=1553939&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt Sat Dec 28 21:58:33 2013
@@ -77,6 +77,9 @@ Trunk (Unreleased)
     MAPREDUCE-5189. Add policies and wiring to respond to preemption requests
     from YARN. (Carlo Curino via cdouglas)
 
+    MAPREDUCE-5196. Add bookkeeping for managing checkpoints of task state.
+    (Carlo Curino via cdouglas)
+
   BUG FIXES
 
     MAPREDUCE-4272. SortedRanges.Range#compareTo is not spec compliant.

Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl.java?rev=1553939&r1=1553938&r2=1553939&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl.java Sat Dec 28 21:58:33 2013
@@ -36,7 +36,9 @@ import org.apache.hadoop.ipc.Server;
 import org.apache.hadoop.mapred.SortedRanges.Range;
 import org.apache.hadoop.mapreduce.MRJobConfig;
 import org.apache.hadoop.mapreduce.TypeConverter;
+import org.apache.hadoop.mapreduce.checkpoint.TaskCheckpointID;
 import org.apache.hadoop.mapreduce.security.token.JobTokenSecretManager;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
 import org.apache.hadoop.mapreduce.v2.app.AppContext;
 import org.apache.hadoop.mapreduce.v2.app.TaskAttemptListener;
 import org.apache.hadoop.mapreduce.v2.app.TaskHeartbeatHandler;
@@ -45,8 +47,8 @@ import org.apache.hadoop.mapreduce.v2.ap
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptDiagnosticsUpdateEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType;
-import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptStatusUpdateEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptStatusUpdateEvent.TaskAttemptStatus;
+import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptStatusUpdateEvent;
 import org.apache.hadoop.mapreduce.v2.app.rm.RMHeartbeatHandler;
 import org.apache.hadoop.mapreduce.v2.app.rm.preemption.AMPreemptionPolicy;
 import org.apache.hadoop.mapreduce.v2.app.security.authorize.MRAMPolicyProvider;
@@ -229,6 +231,22 @@ public class TaskAttemptListenerImpl ext
   }
 
   @Override
+  public void preempted(TaskAttemptID taskAttemptID, TaskStatus taskStatus)
+          throws IOException, InterruptedException {
+    LOG.info("Preempted state update from " + taskAttemptID.toString());
+    // An attempt is telling us that it got preempted.
+    org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId attemptID =
+        TypeConverter.toYarn(taskAttemptID);
+
+    preemptionPolicy.reportSuccessfulPreemption(attemptID);
+    taskHeartbeatHandler.progressing(attemptID);
+
+    context.getEventHandler().handle(
+        new TaskAttemptEvent(attemptID,
+            TaskAttemptEventType.TA_PREEMPTED));
+  }
+
+  @Override
   public void done(TaskAttemptID taskAttemptID) throws IOException {
     LOG.info("Done acknowledgement from " + taskAttemptID.toString());
 
@@ -250,6 +268,10 @@ public class TaskAttemptListenerImpl ext
 
     org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId attemptID =
         TypeConverter.toYarn(taskAttemptID);
+
+    // handling checkpoints
+    preemptionPolicy.handleFailedContainer(attemptID);
+
     context.getEventHandler().handle(
         new TaskAttemptEvent(attemptID, TaskAttemptEventType.TA_FAILMSG));
   }
@@ -264,6 +286,10 @@ public class TaskAttemptListenerImpl ext
 
     org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId attemptID =
         TypeConverter.toYarn(taskAttemptID);
+
+    // handling checkpoints
+    preemptionPolicy.handleFailedContainer(attemptID);
+
     context.getEventHandler().handle(
         new TaskAttemptEvent(attemptID, TaskAttemptEventType.TA_FAILMSG));
   }
@@ -294,12 +320,6 @@ public class TaskAttemptListenerImpl ext
   }
 
   @Override
-  public boolean ping(TaskAttemptID taskAttemptID) throws IOException {
-    LOG.info("Ping from " + taskAttemptID.toString());
-    return true;
-  }
-
-  @Override
   public void reportDiagnosticInfo(TaskAttemptID taskAttemptID, String diagnosticInfo)
  throws IOException {
     diagnosticInfo = StringInterner.weakIntern(diagnosticInfo);
@@ -321,11 +341,33 @@ public class TaskAttemptListenerImpl ext
   }
 
   @Override
-  public boolean statusUpdate(TaskAttemptID taskAttemptID,
+  public AMFeedback statusUpdate(TaskAttemptID taskAttemptID,
       TaskStatus taskStatus) throws IOException, InterruptedException {
-    LOG.info("Status update from " + taskAttemptID.toString());
+
     org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId yarnAttemptID =
         TypeConverter.toYarn(taskAttemptID);
+
+    AMFeedback feedback = new AMFeedback();
+    feedback.setTaskFound(true);
+
+    // Propagating preemption to the task if TASK_PREEMPTION is enabled
+    if (getConfig().getBoolean(MRJobConfig.TASK_PREEMPTION, false)
+        && preemptionPolicy.isPreempted(yarnAttemptID)) {
+      feedback.setPreemption(true);
+      LOG.info("Setting preemption bit for task: "+ yarnAttemptID
+          + " of type " + yarnAttemptID.getTaskId().getTaskType());
+    }
+
+    if (taskStatus == null) {
+      //We are using statusUpdate only as a simple ping
+      LOG.info("Ping from " + taskAttemptID.toString());
+      taskHeartbeatHandler.progressing(yarnAttemptID);
+      return feedback;
+    }
+
+    // if we are here there is an actual status update to be processed
+    LOG.info("Status update from " + taskAttemptID.toString());
+
     taskHeartbeatHandler.progressing(yarnAttemptID);
     TaskAttemptStatus taskAttemptStatus =
         new TaskAttemptStatus();
@@ -386,7 +428,7 @@ public class TaskAttemptListenerImpl ext
     context.getEventHandler().handle(
         new TaskAttemptStatusUpdateEvent(taskAttemptStatus.id,
             taskAttemptStatus));
-    return true;
+    return feedback;
   }
 
   @Override
@@ -494,4 +536,18 @@ public class TaskAttemptListenerImpl ext
     return ProtocolSignature.getProtocolSignature(this, 
         protocol, clientVersion, clientMethodsHash);
   }
+
+  // task checkpoint bookkeeping
+  @Override
+  public TaskCheckpointID getCheckpointID(TaskID taskId) {
+    TaskId tid = TypeConverter.toYarn(taskId);
+    return preemptionPolicy.getCheckpointID(tid);
+  }
+
+  @Override
+  public void setCheckpointID(TaskID taskId, TaskCheckpointID cid) {
+    TaskId tid = TypeConverter.toYarn(taskId);
+    preemptionPolicy.setCheckpointID(tid, cid);
+  }
+
 }

Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/TaskAttemptEventType.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/TaskAttemptEventType.java?rev=1553939&r1=1553938&r2=1553939&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/TaskAttemptEventType.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/TaskAttemptEventType.java Sat Dec 28 21:58:33 2013
@@ -47,6 +47,7 @@ public enum TaskAttemptEventType {
   TA_FAILMSG,
   TA_UPDATE,
   TA_TIMED_OUT,
+  TA_PREEMPTED,
 
   //Producer:TaskCleaner
   TA_CLEANUP_DONE,

Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java?rev=1553939&r1=1553938&r2=1553939&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java Sat Dec 28 21:58:33 2013
@@ -304,6 +304,9 @@ public abstract class TaskAttemptImpl im
      .addTransition(TaskAttemptStateInternal.RUNNING,
          TaskAttemptStateInternal.KILL_CONTAINER_CLEANUP, TaskAttemptEventType.TA_KILL,
          CLEANUP_CONTAINER_TRANSITION)
+     .addTransition(TaskAttemptStateInternal.RUNNING,
+         TaskAttemptStateInternal.KILLED,
+         TaskAttemptEventType.TA_PREEMPTED, new PreemptedTransition())
 
      // Transitions from COMMIT_PENDING state
      .addTransition(TaskAttemptStateInternal.COMMIT_PENDING,
@@ -437,6 +440,7 @@ public abstract class TaskAttemptImpl im
              TaskAttemptEventType.TA_DONE,
              TaskAttemptEventType.TA_FAILMSG,
              TaskAttemptEventType.TA_CONTAINER_CLEANED,
+             TaskAttemptEventType.TA_PREEMPTED,
              // Container launch events can arrive late
              TaskAttemptEventType.TA_CONTAINER_LAUNCHED,
              TaskAttemptEventType.TA_CONTAINER_LAUNCH_FAILED))
@@ -1874,6 +1878,27 @@ public abstract class TaskAttemptImpl im
     }
   }
 
+  private static class PreemptedTransition implements
+      SingleArcTransition<TaskAttemptImpl,TaskAttemptEvent> {
+    @SuppressWarnings("unchecked")
+    @Override
+    public void transition(TaskAttemptImpl taskAttempt,
+        TaskAttemptEvent event) {
+      taskAttempt.setFinishTime();
+      taskAttempt.taskAttemptListener.unregister(
+          taskAttempt.attemptId, taskAttempt.jvmID);
+      taskAttempt.eventHandler.handle(new ContainerLauncherEvent(
+          taskAttempt.attemptId,
+          taskAttempt.getAssignedContainerID(), taskAttempt.getAssignedContainerMgrAddress(),
+          taskAttempt.container.getContainerToken(),
+          ContainerLauncher.EventType.CONTAINER_REMOTE_CLEANUP));
+      taskAttempt.eventHandler.handle(new TaskTAttemptEvent(
+          taskAttempt.attemptId,
+          TaskEventType.T_ATTEMPT_KILLED));
+
+    }
+  }
+
   private static class CleanupContainerTransition implements
        SingleArcTransition<TaskAttemptImpl, TaskAttemptEvent> {
     @SuppressWarnings("unchecked")

Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java?rev=1553939&r1=1553938&r2=1553939&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java Sat Dec 28 21:58:33 2013
@@ -347,7 +347,7 @@ public class RMContainerAllocator extend
       }
       
     } else if (
-        event.getType() == ContainerAllocator.EventType.CONTAINER_DEALLOCATE) {
+      event.getType() == ContainerAllocator.EventType.CONTAINER_DEALLOCATE) {
   
       LOG.info("Processing the event " + event.toString());
 

Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/preemption/AMPreemptionPolicy.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/preemption/AMPreemptionPolicy.java?rev=1553939&r1=1553938&r2=1553939&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/preemption/AMPreemptionPolicy.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/preemption/AMPreemptionPolicy.java Sat Dec 28 21:58:33 2013
@@ -19,10 +19,9 @@ package org.apache.hadoop.mapreduce.v2.a
 
 import java.util.List;
 
-import org.apache.hadoop.mapred.TaskAttemptID;
-import org.apache.hadoop.mapred.TaskID;
 import org.apache.hadoop.mapreduce.checkpoint.TaskCheckpointID;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
 import org.apache.hadoop.mapreduce.v2.app.AppContext;
 import org.apache.hadoop.yarn.api.records.Container;
@@ -81,7 +80,7 @@ public interface AMPreemptionPolicy {
    * successfully preempted (for bookeeping, counters, etc..)
    * @param attemptID Task attempt that preempted
    */
-  public void reportSuccessfulPreemption(TaskAttemptID attemptID);
+  public void reportSuccessfulPreemption(TaskAttemptId attemptID);
 
   /**
    * Callback informing the policy of containers exiting with a failure. This
@@ -98,20 +97,20 @@ public interface AMPreemptionPolicy {
   public void handleCompletedContainer(TaskAttemptId attemptID);
 
   /**
-   * Method to retrieve the latest checkpoint for a given {@link TaskID}
+   * Method to retrieve the latest checkpoint for a given {@link TaskId}
    * @param taskId TaskID
    * @return CheckpointID associated with this task or null
    */
-  public TaskCheckpointID getCheckpointID(TaskID taskId);
+  public TaskCheckpointID getCheckpointID(TaskId taskId);
 
   /**
    * Method to store the latest {@link
    * org.apache.hadoop.mapreduce.checkpoint.CheckpointID} for a given {@link
-   * TaskID}. Assigning a null is akin to remove all previous checkpoints for
+   * TaskId}. Assigning a null is akin to removing all previous checkpoints for
    * this task.
    * @param taskId TaskID
    * @param cid Checkpoint to assign or <tt>null</tt> to remove it.
    */
-  public void setCheckpointID(TaskID taskId, TaskCheckpointID cid);
+  public void setCheckpointID(TaskId taskId, TaskCheckpointID cid);
 
 }

Added: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/preemption/CheckpointAMPreemptionPolicy.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/preemption/CheckpointAMPreemptionPolicy.java?rev=1553939&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/preemption/CheckpointAMPreemptionPolicy.java (added)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/preemption/CheckpointAMPreemptionPolicy.java Sat Dec 28 21:58:33 2013
@@ -0,0 +1,290 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapreduce.v2.app.rm.preemption;
+
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.mapreduce.JobCounter;
+import org.apache.hadoop.mapreduce.checkpoint.TaskCheckpointID;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
+import org.apache.hadoop.mapreduce.v2.app.AppContext;
+import org.apache.hadoop.mapreduce.v2.app.job.event.JobCounterUpdateEvent;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.PreemptionContainer;
+import org.apache.hadoop.yarn.api.records.PreemptionContract;
+import org.apache.hadoop.yarn.api.records.PreemptionMessage;
+import org.apache.hadoop.yarn.api.records.PreemptionResourceRequest;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.api.records.StrictPreemptionContract;
+import org.apache.hadoop.yarn.event.EventHandler;
+
+/**
+ * This policy works in combination with an implementation of task
+ * checkpointing. It computes the tasks to be preempted in response to the RM
+ * request for preemption. For strict requests, it maps containers to
+ * corresponding tasks; for fungible requests, it attempts to pick the best
+ * containers to preempt (reducers in reverse allocation order). The
+ * TaskAttemptListener will interrogate this policy when handling a task
+ * heartbeat to check whether the task should be preempted or not. When handling
+ * fungible requests, the policy discounts the RM ask by the amount of currently
+ * in-flight preemptions (i.e., tasks that are checkpointing).
+ *
+ * This class is also used to maintain the list of checkpoints for existing
+ * tasks. Centralizing this functionality here allows us to have visibility on
+ * preemption and checkpoints in a single location, thus coordinating preemption
+ * and checkpoint management decisions in a single policy.
+ */
+public class CheckpointAMPreemptionPolicy implements AMPreemptionPolicy {
+
+  // task attempts flagged for preemption
+  private final Set<TaskAttemptId> toBePreempted;
+
+  private final Set<TaskAttemptId> countedPreemptions;
+
+  private final Map<TaskId,TaskCheckpointID> checkpoints;
+
+  private final Map<TaskAttemptId,Resource> pendingFlexiblePreemptions;
+
+  @SuppressWarnings("rawtypes")
+  private EventHandler eventHandler;
+
+  static final Log LOG = LogFactory
+      .getLog(CheckpointAMPreemptionPolicy.class);
+
+  public CheckpointAMPreemptionPolicy() {
+    this(Collections.synchronizedSet(new HashSet<TaskAttemptId>()),
+         Collections.synchronizedSet(new HashSet<TaskAttemptId>()),
+         Collections.synchronizedMap(new HashMap<TaskId,TaskCheckpointID>()),
+         Collections.synchronizedMap(new HashMap<TaskAttemptId,Resource>()));
+  }
+
+  CheckpointAMPreemptionPolicy(Set<TaskAttemptId> toBePreempted,
+      Set<TaskAttemptId> countedPreemptions,
+      Map<TaskId,TaskCheckpointID> checkpoints,
+      Map<TaskAttemptId,Resource> pendingFlexiblePreemptions) {
+    this.toBePreempted = toBePreempted;
+    this.countedPreemptions = countedPreemptions;
+    this.checkpoints = checkpoints;
+    this.pendingFlexiblePreemptions = pendingFlexiblePreemptions;
+  }
+
+  @Override
+  public void init(AppContext context) {
+    this.eventHandler = context.getEventHandler();
+  }
+
+  @Override
+  public void preempt(Context ctxt, PreemptionMessage preemptionRequests) {
+
+    if (preemptionRequests != null) {
+
+      // handling non-negotiable preemption
+
+      StrictPreemptionContract cStrict = preemptionRequests.getStrictContract();
+      if (cStrict != null
+          && cStrict.getContainers() != null
+          && cStrict.getContainers().size() > 0) {
+        LOG.info("strict preemption :" +
+            preemptionRequests.getStrictContract().getContainers().size() +
+            " containers to kill");
+
+        // handle strict preemptions. These containers are non-negotiable
+        for (PreemptionContainer c :
+            preemptionRequests.getStrictContract().getContainers()) {
+          ContainerId reqCont = c.getId();
+          TaskAttemptId reqTask = ctxt.getTaskAttempt(reqCont);
+          if (reqTask != null) {
+            // ignore requests for preempting containers running maps
+            if (org.apache.hadoop.mapreduce.v2.api.records.TaskType.REDUCE
+                .equals(reqTask.getTaskId().getTaskType())) {
+              toBePreempted.add(reqTask);
+              LOG.info("preempting " + reqCont + " running task:" + reqTask);
+            } else {
+              LOG.info("NOT preempting " + reqCont + " running task:" + reqTask);
+            }
+          }
+        }
+      }
+
+      // handling negotiable preemption
+      PreemptionContract cNegot = preemptionRequests.getContract();
+      if (cNegot != null
+          && cNegot.getResourceRequest() != null
+          && cNegot.getResourceRequest().size() > 0
+          && cNegot.getContainers() != null
+          && cNegot.getContainers().size() > 0) {
+
+        LOG.info("negotiable preemption :" +
+            preemptionRequests.getContract().getResourceRequest().size() +
+            " resourceReq, " +
+            preemptionRequests.getContract().getContainers().size() +
+            " containers");
+        // handle fungible preemption. Here we only look at the total amount of
+        // resources to be preempted and pick enough of our containers to
+        // satisfy that. We only support checkpointing for reducers for now.
+        List<PreemptionResourceRequest> reqResources =
+          preemptionRequests.getContract().getResourceRequest();
+
+        // compute the total amount of pending preemptions (to be discounted
+        // from current request)
+        int pendingPreemptionRam = 0;
+        int pendingPreemptionCores = 0;
+        for (Resource r : pendingFlexiblePreemptions.values()) {
+          pendingPreemptionRam += r.getMemory();
+          pendingPreemptionCores += r.getVirtualCores();
+        }
+
+        // discount preemption request based on currently pending preemption
+        for (PreemptionResourceRequest rr : reqResources) {
+          ResourceRequest reqRsrc = rr.getResourceRequest();
+          if (!ResourceRequest.ANY.equals(reqRsrc.getResourceName())) {
+            // For now, only respond to aggregate requests and ignore locality
+            continue;
+          }
+
+          LOG.info("ResourceRequest:" + reqRsrc);
+          int reqCont = reqRsrc.getNumContainers();
+          int reqMem = reqRsrc.getCapability().getMemory();
+          int totalMemoryToRelease = reqCont * reqMem;
+          int reqCores = reqRsrc.getCapability().getVirtualCores();
+          int totalCoresToRelease = reqCont * reqCores;
+
+          // remove
+          if (pendingPreemptionRam > 0) {
+            // if goes negative we simply exit
+            totalMemoryToRelease -= pendingPreemptionRam;
+            // decrement pending resources; if zero or negative we will
+            // ignore it while processing next PreemptionResourceRequest
+            pendingPreemptionRam -= totalMemoryToRelease;
+          }
+          if (pendingPreemptionCores > 0) {
+            totalCoresToRelease -= pendingPreemptionCores;
+            pendingPreemptionCores -= totalCoresToRelease;
+          }
+
+          // reverse order of allocation (for now)
+          List<Container> listOfCont = ctxt.getContainers(TaskType.REDUCE);
+          Collections.sort(listOfCont, new Comparator<Container>() {
+            @Override
+            public int compare(final Container o1, final Container o2) {
+              return o2.getId().getId() - o1.getId().getId();
+            }
+          });
+
+          // preempt reducers first
+          for (Container cont : listOfCont) {
+            if (totalMemoryToRelease <= 0 && totalCoresToRelease<=0) {
+              break;
+            }
+            TaskAttemptId reduceId = ctxt.getTaskAttempt(cont.getId());
+            int cMem = cont.getResource().getMemory();
+            int cCores = cont.getResource().getVirtualCores();
+
+            if (!toBePreempted.contains(reduceId)) {
+              totalMemoryToRelease -= cMem;
+              totalCoresToRelease -= cCores;
+                toBePreempted.add(reduceId);
+                pendingFlexiblePreemptions.put(reduceId, cont.getResource());
+            }
+            LOG.info("ResourceRequest:" + reqRsrc + " satisfied preempting "
+                + reduceId);
+          }
+          // if maps were preemptable we would add them to toBePreempted here
+        }
+      }
+    }
+  }
+
+  @Override
+  public void handleFailedContainer(TaskAttemptId attemptID) {
+    toBePreempted.remove(attemptID);
+    checkpoints.remove(attemptID.getTaskId());
+  }
+
+  @Override
+  public void handleCompletedContainer(TaskAttemptId attemptID){
+    LOG.info(" task completed:" + attemptID);
+    toBePreempted.remove(attemptID);
+    pendingFlexiblePreemptions.remove(attemptID);
+  }
+
+  @Override
+  public boolean isPreempted(TaskAttemptId yarnAttemptID) {
+    if (toBePreempted.contains(yarnAttemptID)) {
+      updatePreemptionCounters(yarnAttemptID);
+      return true;
+    }
+    return false;
+  }
+
+  @Override
+  public void reportSuccessfulPreemption(TaskAttemptId taskAttemptID) {
+    // ignore
+  }
+
+  @Override
+  public TaskCheckpointID getCheckpointID(TaskId taskId) {
+    return checkpoints.get(taskId);
+  }
+
+  @Override
+  public void setCheckpointID(TaskId taskId, TaskCheckpointID cid) {
+    checkpoints.put(taskId, cid);
+    if (cid != null) {
+      updateCheckpointCounters(taskId, cid);
+    }
+  }
+
+  @SuppressWarnings({ "unchecked" })
+  private void updateCheckpointCounters(TaskId taskId, TaskCheckpointID cid) {
+    JobCounterUpdateEvent jce = new JobCounterUpdateEvent(taskId.getJobId());
+    jce.addCounterUpdate(JobCounter.CHECKPOINTS, 1);
+    eventHandler.handle(jce);
+    jce = new JobCounterUpdateEvent(taskId.getJobId());
+    jce.addCounterUpdate(JobCounter.CHECKPOINT_BYTES, cid.getCheckpointBytes());
+    eventHandler.handle(jce);
+    jce = new JobCounterUpdateEvent(taskId.getJobId());
+    jce.addCounterUpdate(JobCounter.CHECKPOINT_TIME, cid.getCheckpointTime());
+    eventHandler.handle(jce);
+
+  }
+
+  @SuppressWarnings({ "unchecked" })
+  private void updatePreemptionCounters(TaskAttemptId yarnAttemptID) {
+    if (!countedPreemptions.contains(yarnAttemptID)) {
+      countedPreemptions.add(yarnAttemptID);
+      JobCounterUpdateEvent jce = new JobCounterUpdateEvent(yarnAttemptID
+          .getTaskId().getJobId());
+      jce.addCounterUpdate(JobCounter.TASKS_REQ_PREEMPT, 1);
+      eventHandler.handle(jce);
+    }
+  }
+
+}

Propchange: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/preemption/CheckpointAMPreemptionPolicy.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/preemption/KillAMPreemptionPolicy.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/preemption/KillAMPreemptionPolicy.java?rev=1553939&r1=1553938&r2=1553939&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/preemption/KillAMPreemptionPolicy.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/preemption/KillAMPreemptionPolicy.java Sat Dec 28 21:58:33 2013
@@ -19,11 +19,10 @@ package org.apache.hadoop.mapreduce.v2.a
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.mapred.TaskAttemptID;
-import org.apache.hadoop.mapred.TaskID;
 import org.apache.hadoop.mapreduce.JobCounter;
 import org.apache.hadoop.mapreduce.checkpoint.TaskCheckpointID;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
 import org.apache.hadoop.mapreduce.v2.app.AppContext;
 import org.apache.hadoop.mapreduce.v2.app.job.event.JobCounterUpdateEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent;
@@ -89,17 +88,17 @@ public class KillAMPreemptionPolicy impl
   }
 
   @Override
-  public void reportSuccessfulPreemption(TaskAttemptID taskAttemptID) {
+  public void reportSuccessfulPreemption(TaskAttemptId taskAttemptID) {
     // ignore
   }
 
   @Override
-  public TaskCheckpointID getCheckpointID(TaskID taskId) {
+  public TaskCheckpointID getCheckpointID(TaskId taskId) {
     return null;
   }
 
   @Override
-  public void setCheckpointID(TaskID taskId, TaskCheckpointID cid) {
+  public void setCheckpointID(TaskId taskId, TaskCheckpointID cid) {
     // ignore
   }
 

Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/preemption/NoopAMPreemptionPolicy.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/preemption/NoopAMPreemptionPolicy.java?rev=1553939&r1=1553938&r2=1553939&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/preemption/NoopAMPreemptionPolicy.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/preemption/NoopAMPreemptionPolicy.java Sat Dec 28 21:58:33 2013
@@ -17,10 +17,9 @@
 */
 package org.apache.hadoop.mapreduce.v2.app.rm.preemption;
 
-import org.apache.hadoop.mapred.TaskAttemptID;
-import org.apache.hadoop.mapred.TaskID;
 import org.apache.hadoop.mapreduce.checkpoint.TaskCheckpointID;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
 import org.apache.hadoop.mapreduce.v2.app.AppContext;
 import org.apache.hadoop.yarn.api.records.PreemptionMessage;
 
@@ -50,17 +49,17 @@ public class NoopAMPreemptionPolicy impl
   }
 
   @Override
-  public void reportSuccessfulPreemption(TaskAttemptID taskAttemptID) {
+  public void reportSuccessfulPreemption(TaskAttemptId taskAttemptID) {
     // ignore
   }
 
   @Override
-  public TaskCheckpointID getCheckpointID(TaskID taskId) {
+  public TaskCheckpointID getCheckpointID(TaskId taskId) {
     return null;
   }
 
   @Override
-  public void setCheckpointID(TaskID taskId, TaskCheckpointID cid) {
+  public void setCheckpointID(TaskId taskId, TaskCheckpointID cid) {
     // ignore
   }
 

Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapred/TestTaskAttemptListenerImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapred/TestTaskAttemptListenerImpl.java?rev=1553939&r1=1553938&r2=1553939&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapred/TestTaskAttemptListenerImpl.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapred/TestTaskAttemptListenerImpl.java Sat Dec 28 21:58:33 2013
@@ -17,26 +17,23 @@
 */
 package org.apache.hadoop.mapred;
 
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
-import static org.mockito.Matchers.any;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.never;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
+import org.apache.hadoop.mapred.Counters;
+import org.apache.hadoop.mapred.Counters.Counter;
+import org.apache.hadoop.mapreduce.checkpoint.EnumCounter;
 
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.Arrays;
-
-import junit.framework.Assert;
+import java.util.List;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.MRJobConfig;
 import org.apache.hadoop.mapreduce.TaskType;
 import org.apache.hadoop.mapreduce.TypeConverter;
+import org.apache.hadoop.mapreduce.checkpoint.CheckpointID;
+import org.apache.hadoop.mapreduce.checkpoint.FSCheckpointID;
+import org.apache.hadoop.mapreduce.checkpoint.TaskCheckpointID;
 import org.apache.hadoop.mapreduce.security.token.JobTokenSecretManager;
 import org.apache.hadoop.mapreduce.v2.api.records.JobId;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptCompletionEvent;
@@ -46,21 +43,31 @@ import org.apache.hadoop.mapreduce.v2.ap
 import org.apache.hadoop.mapreduce.v2.app.AppContext;
 import org.apache.hadoop.mapreduce.v2.app.TaskHeartbeatHandler;
 import org.apache.hadoop.mapreduce.v2.app.job.Job;
+import org.apache.hadoop.mapreduce.v2.app.rm.preemption.AMPreemptionPolicy;
+import org.apache.hadoop.mapreduce.v2.app.rm.preemption.CheckpointAMPreemptionPolicy;
 import org.apache.hadoop.mapreduce.v2.app.rm.RMHeartbeatHandler;
 import org.apache.hadoop.mapreduce.v2.util.MRBuilderUtils;
+import org.apache.hadoop.yarn.event.Dispatcher;
+import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.factories.RecordFactory;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
 import org.apache.hadoop.yarn.util.SystemClock;
+
 import org.junit.Test;
+import static org.junit.Assert.*;
+import static org.mockito.Mockito.*;
 
 public class TestTaskAttemptListenerImpl {
-  public static class MockTaskAttemptListenerImpl extends TaskAttemptListenerImpl {
+  public static class MockTaskAttemptListenerImpl
+      extends TaskAttemptListenerImpl {
 
     public MockTaskAttemptListenerImpl(AppContext context,
         JobTokenSecretManager jobTokenSecretManager,
         RMHeartbeatHandler rmHeartbeatHandler,
-        TaskHeartbeatHandler hbHandler) {
-      super(context, jobTokenSecretManager, rmHeartbeatHandler, null);
+        TaskHeartbeatHandler hbHandler,
+        AMPreemptionPolicy policy) {
+
+      super(context, jobTokenSecretManager, rmHeartbeatHandler, policy);
       this.taskHeartbeatHandler = hbHandler;
     }
     
@@ -87,9 +94,16 @@ public class TestTaskAttemptListenerImpl
     RMHeartbeatHandler rmHeartbeatHandler =
         mock(RMHeartbeatHandler.class);
     TaskHeartbeatHandler hbHandler = mock(TaskHeartbeatHandler.class);
+    Dispatcher dispatcher = mock(Dispatcher.class);
+    EventHandler ea = mock(EventHandler.class);
+    when(dispatcher.getEventHandler()).thenReturn(ea);
+
+    when(appCtx.getEventHandler()).thenReturn(ea);
+    CheckpointAMPreemptionPolicy policy = new CheckpointAMPreemptionPolicy();
+    policy.init(appCtx);
     MockTaskAttemptListenerImpl listener = 
       new MockTaskAttemptListenerImpl(appCtx, secret,
-          rmHeartbeatHandler, hbHandler);
+          rmHeartbeatHandler, hbHandler, policy);
     Configuration conf = new Configuration();
     listener.init(conf);
     listener.start();
@@ -144,7 +158,7 @@ public class TestTaskAttemptListenerImpl
     assertNotNull(jvmid);
     try {
       JVMId.forName("jvm_001_002_m_004_006");
-      Assert.fail();
+      fail();
     } catch (IllegalArgumentException e) {
       assertEquals(e.getMessage(),
           "TaskId string : jvm_001_002_m_004_006 is not properly formed");
@@ -190,8 +204,14 @@ public class TestTaskAttemptListenerImpl
     RMHeartbeatHandler rmHeartbeatHandler =
         mock(RMHeartbeatHandler.class);
     final TaskHeartbeatHandler hbHandler = mock(TaskHeartbeatHandler.class);
-    TaskAttemptListenerImpl listener =
-        new TaskAttemptListenerImpl(appCtx, secret, rmHeartbeatHandler, null) {
+    Dispatcher dispatcher = mock(Dispatcher.class);
+    EventHandler ea = mock(EventHandler.class);
+    when(dispatcher.getEventHandler()).thenReturn(ea);
+    when(appCtx.getEventHandler()).thenReturn(ea);
+    CheckpointAMPreemptionPolicy policy = new CheckpointAMPreemptionPolicy();
+    policy.init(appCtx);
+    TaskAttemptListenerImpl listener = new TaskAttemptListenerImpl(
+        appCtx, secret, rmHeartbeatHandler, policy) {
       @Override
       protected void registerHeartbeatHandler(Configuration conf) {
         taskHeartbeatHandler = hbHandler;
@@ -219,7 +239,8 @@ public class TestTaskAttemptListenerImpl
         isMap ? org.apache.hadoop.mapreduce.v2.api.records.TaskType.MAP
             : org.apache.hadoop.mapreduce.v2.api.records.TaskType.REDUCE);
     TaskAttemptId attemptId = MRBuilderUtils.newTaskAttemptId(tid, 0);
-    RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null);
+    RecordFactory recordFactory =
+      RecordFactoryProvider.getRecordFactory(null);
     TaskAttemptCompletionEvent tce = recordFactory
         .newRecordInstance(TaskAttemptCompletionEvent.class);
     tce.setEventId(eventId);
@@ -244,8 +265,14 @@ public class TestTaskAttemptListenerImpl
     RMHeartbeatHandler rmHeartbeatHandler =
         mock(RMHeartbeatHandler.class);
     final TaskHeartbeatHandler hbHandler = mock(TaskHeartbeatHandler.class);
-    TaskAttemptListenerImpl listener =
-        new TaskAttemptListenerImpl(appCtx, secret, rmHeartbeatHandler, null) {
+    Dispatcher dispatcher = mock(Dispatcher.class);
+    EventHandler ea = mock(EventHandler.class);
+    when(dispatcher.getEventHandler()).thenReturn(ea);
+    when(appCtx.getEventHandler()).thenReturn(ea);
+    CheckpointAMPreemptionPolicy policy = new CheckpointAMPreemptionPolicy();
+    policy.init(appCtx);
+    TaskAttemptListenerImpl listener = new TaskAttemptListenerImpl(
+        appCtx, secret, rmHeartbeatHandler, policy) {
       @Override
       protected void registerHeartbeatHandler(Configuration conf) {
         taskHeartbeatHandler = hbHandler;
@@ -270,4 +297,88 @@ public class TestTaskAttemptListenerImpl
 
     listener.stop();
   }
+
+  @Test
+  public void testCheckpointIDTracking()
+    throws IOException, InterruptedException{
+
+    SystemClock clock = new SystemClock();
+
+    org.apache.hadoop.mapreduce.v2.app.job.Task mockTask =
+        mock(org.apache.hadoop.mapreduce.v2.app.job.Task.class);
+    when(mockTask.canCommit(any(TaskAttemptId.class))).thenReturn(true);
+    Job mockJob = mock(Job.class);
+    when(mockJob.getTask(any(TaskId.class))).thenReturn(mockTask);
+
+    Dispatcher dispatcher = mock(Dispatcher.class);
+    EventHandler ea = mock(EventHandler.class);
+    when(dispatcher.getEventHandler()).thenReturn(ea);
+
+    RMHeartbeatHandler rmHeartbeatHandler =
+        mock(RMHeartbeatHandler.class);
+
+    AppContext appCtx = mock(AppContext.class);
+    when(appCtx.getJob(any(JobId.class))).thenReturn(mockJob);
+    when(appCtx.getClock()).thenReturn(clock);
+    when(appCtx.getEventHandler()).thenReturn(ea);
+    JobTokenSecretManager secret = mock(JobTokenSecretManager.class);
+    final TaskHeartbeatHandler hbHandler = mock(TaskHeartbeatHandler.class);
+    when(appCtx.getEventHandler()).thenReturn(ea);
+    CheckpointAMPreemptionPolicy policy = new CheckpointAMPreemptionPolicy();
+    policy.init(appCtx);
+    TaskAttemptListenerImpl listener = new TaskAttemptListenerImpl(
+        appCtx, secret, rmHeartbeatHandler, policy) {
+      @Override
+      protected void registerHeartbeatHandler(Configuration conf) {
+        taskHeartbeatHandler = hbHandler;
+      }
+    };
+
+    Configuration conf = new Configuration();
+    conf.setBoolean(MRJobConfig.TASK_PREEMPTION, true);
+    //conf.setBoolean("preemption.reduce", true);
+
+    listener.init(conf);
+    listener.start();
+
+    TaskAttemptID tid = new TaskAttemptID("12345", 1, TaskType.REDUCE, 1, 0);
+
+    List<Path> partialOut = new ArrayList<Path>();
+    partialOut.add(new Path("/prev1"));
+    partialOut.add(new Path("/prev2"));
+
+    Counters counters = mock(Counters.class);
+    final long CBYTES = 64L * 1024 * 1024;
+    final long CTIME = 4344L;
+    final Path CLOC = new Path("/test/1");
+    Counter cbytes = mock(Counter.class);
+    when(cbytes.getValue()).thenReturn(CBYTES);
+    Counter ctime = mock(Counter.class);
+    when(ctime.getValue()).thenReturn(CTIME);
+    when(counters.findCounter(eq(EnumCounter.CHECKPOINT_BYTES)))
+        .thenReturn(cbytes);
+    when(counters.findCounter(eq(EnumCounter.CHECKPOINT_MS)))
+        .thenReturn(ctime);
+
+    // propagating a taskstatus that contains a checkpoint id
+    TaskCheckpointID incid = new TaskCheckpointID(new FSCheckpointID(
+          CLOC), partialOut, counters);
+    listener.setCheckpointID(
+        org.apache.hadoop.mapred.TaskID.downgrade(tid.getTaskID()), incid);
+
+    // and try to get it back
+    CheckpointID outcid = listener.getCheckpointID(tid.getTaskID());
+    TaskCheckpointID tcid = (TaskCheckpointID) outcid;
+    assertEquals(CBYTES, tcid.getCheckpointBytes());
+    assertEquals(CTIME, tcid.getCheckpointTime());
+    assertTrue(partialOut.containsAll(tcid.getPartialCommittedOutput()));
+    assertTrue(tcid.getPartialCommittedOutput().containsAll(partialOut));
+
+    //assert it worked
+    assert outcid == incid;
+
+    listener.stop();
+
+  }
+
 }

Added: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestCheckpointPreemptionPolicy.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestCheckpointPreemptionPolicy.java?rev=1553939&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestCheckpointPreemptionPolicy.java (added)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestCheckpointPreemptionPolicy.java Sat Dec 28 21:58:33 2013
@@ -0,0 +1,329 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapreduce.v2.app;
+
+import org.apache.hadoop.yarn.api.records.PreemptionContract;
+import org.apache.hadoop.yarn.api.records.PreemptionMessage;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.util.resource.Resources;
+
+import static org.junit.Assert.*;
+import static org.mockito.Mockito.*;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.hadoop.mapred.TaskAttemptListenerImpl;
+import org.apache.hadoop.mapreduce.v2.api.records.JobId;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
+import org.apache.hadoop.mapreduce.v2.app.MRAppMaster.RunningAppContext;
+import org.apache.hadoop.mapreduce.v2.app.rm.RMContainerAllocator;
+import org.apache.hadoop.mapreduce.v2.app.rm.preemption.AMPreemptionPolicy;
+import org.apache.hadoop.mapreduce.v2.app.rm.preemption.CheckpointAMPreemptionPolicy;
+import org.apache.hadoop.mapreduce.v2.util.MRBuilderUtils;
+import org.apache.hadoop.yarn.api.records.PreemptionContainer;
+import org.apache.hadoop.yarn.api.records.PreemptionResourceRequest;
+import org.apache.hadoop.yarn.api.records.StrictPreemptionContract;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.factories.RecordFactory;
+import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
+import org.junit.Before;
+import org.junit.Test;
+
+public class TestCheckpointPreemptionPolicy {
+
+  TaskAttemptListenerImpl pel= null;
+  RMContainerAllocator r;
+  JobId jid;
+  RunningAppContext mActxt;
+  Set<ContainerId> preemptedContainers = new HashSet<ContainerId>();
+  Map<ContainerId,TaskAttemptId> assignedContainers =
+      new HashMap<ContainerId, TaskAttemptId>();
+  private final RecordFactory recordFactory =
+        RecordFactoryProvider.getRecordFactory(null);
+  HashMap<ContainerId,Resource> contToResourceMap =
+    new HashMap<ContainerId, Resource>();
+
+  private int minAlloc = 1024;
+
+  @Before
+  @SuppressWarnings("rawtypes") // mocked generics
+  public void setup() {
+    ApplicationId appId = ApplicationId.newInstance(200, 1);
+    ApplicationAttemptId appAttemptId =
+        ApplicationAttemptId.newInstance(appId, 1);
+    jid = MRBuilderUtils.newJobId(appId, 1);
+
+    mActxt = mock(RunningAppContext.class);
+    EventHandler ea = mock(EventHandler.class);
+    when(mActxt.getEventHandler()).thenReturn(ea);
+    for (int i = 0; i < 40; ++i) {
+      ContainerId cId = ContainerId.newInstance(appAttemptId, i);
+      if (0 == i % 7) {
+        preemptedContainers.add(cId);
+      }
+      TaskId tId = 0 == i % 2
+          ? MRBuilderUtils.newTaskId(jid, i / 2, TaskType.MAP)
+          : MRBuilderUtils.newTaskId(jid, i / 2 + 1, TaskType.REDUCE);
+      assignedContainers.put(cId, MRBuilderUtils.newTaskAttemptId(tId, 0));
+      contToResourceMap.put(cId, Resource.newInstance(2 * minAlloc, 2));
+    }
+
+    for (Map.Entry<ContainerId,TaskAttemptId> ent :
+         assignedContainers.entrySet()) {
+      System.out.println("cont:" + ent.getKey().getId() +
+          " type:" + ent.getValue().getTaskId().getTaskType() +
+          " res:" + contToResourceMap.get(ent.getKey()).getMemory() + "MB" );
+    }
+  }
+
+  @Test
+  public void testStrictPreemptionContract() {
+
+    final Map<ContainerId,TaskAttemptId> containers = assignedContainers;
+    AMPreemptionPolicy.Context mPctxt = new AMPreemptionPolicy.Context() {
+      @Override
+      public TaskAttemptId getTaskAttempt(ContainerId cId) {
+        return containers.get(cId);
+      }
+      @Override
+      public List<Container> getContainers(TaskType t) {
+        List<Container> p = new ArrayList<Container>();
+        for (Map.Entry<ContainerId,TaskAttemptId> ent :
+            assignedContainers.entrySet()) {
+          if (ent.getValue().getTaskId().getTaskType().equals(t)) {
+            p.add(Container.newInstance(ent.getKey(), null, null,
+                contToResourceMap.get(ent.getKey()),
+                Priority.newInstance(0), null));
+          }
+        }
+        return p;
+      }
+    };
+
+    PreemptionMessage pM = generatePreemptionMessage(preemptedContainers,
+        contToResourceMap, Resource.newInstance(1024, 1), true);
+
+    CheckpointAMPreemptionPolicy policy = new CheckpointAMPreemptionPolicy();
+    policy.init(mActxt);
+    policy.preempt(mPctxt, pM);
+
+
+    for (ContainerId c : preemptedContainers) {
+      TaskAttemptId t = assignedContainers.get(c);
+      if (TaskType.MAP.equals(t.getTaskId().getTaskType())) {
+        assert policy.isPreempted(t) == false;
+      } else {
+        assert policy.isPreempted(t);
+      }
+    }
+  }
+
+
+  @Test
+  public void testPreemptionContract() {
+    final Map<ContainerId,TaskAttemptId> containers = assignedContainers;
+    AMPreemptionPolicy.Context mPctxt = new AMPreemptionPolicy.Context() {
+      @Override
+      public TaskAttemptId getTaskAttempt(ContainerId cId) {
+        return containers.get(cId);
+      }
+
+      @Override
+      public List<Container> getContainers(TaskType t) {
+        List<Container> p = new ArrayList<Container>();
+        for (Map.Entry<ContainerId,TaskAttemptId> ent :
+            assignedContainers.entrySet()){
+          if(ent.getValue().getTaskId().getTaskType().equals(t)){
+            p.add(Container.newInstance(ent.getKey(), null, null,
+                contToResourceMap.get(ent.getKey()),
+                Priority.newInstance(0), null));
+          }
+        }
+        return p;
+      }
+    };
+
+    PreemptionMessage pM = generatePreemptionMessage(preemptedContainers,
+        contToResourceMap, Resource.newInstance(minAlloc, 1), false);
+
+    CheckpointAMPreemptionPolicy policy = new CheckpointAMPreemptionPolicy();
+    policy.init(mActxt);
+
+    int supposedMemPreemption = pM.getContract().getResourceRequest()
+        .get(0).getResourceRequest().getCapability().getMemory()
+        * pM.getContract().getResourceRequest().get(0).getResourceRequest()
+        .getNumContainers();
+
+    // first round of preemption
+    policy.preempt(mPctxt, pM);
+    List<TaskAttemptId> preempting =
+      validatePreemption(pM, policy, supposedMemPreemption);
+
+    // redundant message
+    policy.preempt(mPctxt, pM);
+    List<TaskAttemptId> preempting2 =
+      validatePreemption(pM, policy, supposedMemPreemption);
+
+    // check that nothing got added
+    assert preempting2.equals(preempting);
+
+    // simulate 2 task completions/successful preemption
+    policy.handleCompletedContainer(preempting.get(0));
+    policy.handleCompletedContainer(preempting.get(1));
+
+    // remove from assignedContainers
+    Iterator<Map.Entry<ContainerId,TaskAttemptId>> it =
+      assignedContainers.entrySet().iterator();
+    while (it.hasNext()) {
+      Map.Entry<ContainerId,TaskAttemptId> ent = it.next();
+      if (ent.getValue().equals(preempting.get(0)) ||
+        ent.getValue().equals(preempting.get(1)))
+        it.remove();
+    }
+
+    // one more message asking for preemption
+    policy.preempt(mPctxt, pM);
+
+    // triggers preemption of 2 more containers (i.e., the preemption set changes)
+    List<TaskAttemptId> preempting3 =
+      validatePreemption(pM, policy, supposedMemPreemption);
+    assert preempting3.equals(preempting2) == false;
+  }
+
+  private List<TaskAttemptId> validatePreemption(PreemptionMessage pM,
+    CheckpointAMPreemptionPolicy policy, int supposedMemPreemption) {
+    Resource effectivelyPreempted = Resource.newInstance(0, 0);
+
+    List<TaskAttemptId> preempting = new ArrayList<TaskAttemptId>();
+
+    for (Map.Entry<ContainerId, TaskAttemptId> ent :
+        assignedContainers.entrySet()) {
+      if (policy.isPreempted(ent.getValue())) {
+        Resources.addTo(effectivelyPreempted,contToResourceMap.get(ent.getKey()));
+        // preempt only reducers
+        if (policy.isPreempted(ent.getValue())){
+          assertEquals(TaskType.REDUCE, ent.getValue().getTaskId().getTaskType());
+          preempting.add(ent.getValue());
+        }
+      }
+    }
+
+    // preempt enough
+    assert (effectivelyPreempted.getMemory() >= supposedMemPreemption)
+      : " preempted: " + effectivelyPreempted.getMemory();
+
+    // preempt not too much enough
+    assert effectivelyPreempted.getMemory() <= supposedMemPreemption + minAlloc;
+    return preempting;
+  }
+
+  private PreemptionMessage generatePreemptionMessage(
+      Set<ContainerId> containerToPreempt,
+      HashMap<ContainerId, Resource> resPerCont,
+      Resource minimumAllocation, boolean strict) {
+
+    Set<ContainerId> currentContPreemption = Collections.unmodifiableSet(
+        new HashSet<ContainerId>(containerToPreempt));
+    containerToPreempt.clear();
+    Resource tot = Resource.newInstance(0, 0);
+    for(ContainerId c : currentContPreemption){
+      Resources.addTo(tot,
+          resPerCont.get(c));
+    }
+    int numCont = (int) Math.ceil(tot.getMemory() /
+              (double) minimumAllocation.getMemory());
+    ResourceRequest rr = ResourceRequest.newInstance(
+        Priority.newInstance(0), ResourceRequest.ANY,
+        minimumAllocation, numCont);
+    if (strict) {
+      return generatePreemptionMessage(new Allocation(null, null,
+                  currentContPreemption, null, null));
+    }
+    return generatePreemptionMessage(new Allocation(null, null,
+                          null, currentContPreemption,
+                          Collections.singletonList(rr)));
+  }
+
+
+  private PreemptionMessage generatePreemptionMessage(Allocation allocation) {
+    PreemptionMessage pMsg = null;
+    // assemble strict preemption request
+    if (allocation.getStrictContainerPreemptions() != null) {
+       pMsg = recordFactory.newRecordInstance(PreemptionMessage.class);
+      StrictPreemptionContract pStrict =
+          recordFactory.newRecordInstance(StrictPreemptionContract.class);
+      Set<PreemptionContainer> pCont = new HashSet<PreemptionContainer>();
+      for (ContainerId cId : allocation.getStrictContainerPreemptions()) {
+        PreemptionContainer pc =
+            recordFactory.newRecordInstance(PreemptionContainer.class);
+        pc.setId(cId);
+        pCont.add(pc);
+      }
+      pStrict.setContainers(pCont);
+      pMsg.setStrictContract(pStrict);
+    }
+
+    // assemble negotiable preemption request
+    if (allocation.getResourcePreemptions() != null &&
+        allocation.getResourcePreemptions().size() > 0 &&
+        allocation.getContainerPreemptions() != null &&
+        allocation.getContainerPreemptions().size() > 0) {
+      if (pMsg == null) {
+        pMsg = recordFactory.newRecordInstance(PreemptionMessage.class);
+      }
+      PreemptionContract contract =
+          recordFactory.newRecordInstance(PreemptionContract.class);
+      Set<PreemptionContainer> pCont = new HashSet<PreemptionContainer>();
+      for (ContainerId cId : allocation.getContainerPreemptions()) {
+        PreemptionContainer pc =
+            recordFactory.newRecordInstance(PreemptionContainer.class);
+        pc.setId(cId);
+        pCont.add(pc);
+      }
+      List<PreemptionResourceRequest> pRes =
+        new ArrayList<PreemptionResourceRequest>();
+      for (ResourceRequest crr : allocation.getResourcePreemptions()) {
+        PreemptionResourceRequest prr =
+            recordFactory.newRecordInstance(PreemptionResourceRequest.class);
+        prr.setResourceRequest(crr);
+        pRes.add(prr);
+      }
+      contract.setContainers(pCont);
+      contract.setResourceRequest(pRes);
+      pMsg.setContract(contract);
+    }
+    return pMsg;
+  }
+
+}

Propchange: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestCheckpointPreemptionPolicy.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapred/LocalJobRunner.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapred/LocalJobRunner.java?rev=1553939&r1=1553938&r2=1553939&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapred/LocalJobRunner.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapred/LocalJobRunner.java Sat Dec 28 21:58:33 2013
@@ -53,6 +53,7 @@ import org.apache.hadoop.mapreduce.Queue
 import org.apache.hadoop.mapreduce.TaskCompletionEvent;
 import org.apache.hadoop.mapreduce.TaskTrackerInfo;
 import org.apache.hadoop.mapreduce.TaskType;
+import org.apache.hadoop.mapreduce.checkpoint.TaskCheckpointID;
 import org.apache.hadoop.mapreduce.protocol.ClientProtocol;
 import org.apache.hadoop.mapreduce.security.token.delegation.DelegationTokenIdentifier;
 import org.apache.hadoop.mapreduce.server.jobtracker.JTConfig;
@@ -575,10 +576,17 @@ public class LocalJobRunner implements C
 
     // TaskUmbilicalProtocol methods
 
+    @Override
     public JvmTask getTask(JvmContext context) { return null; }
     
-    public synchronized boolean statusUpdate(TaskAttemptID taskId,
+    @Override
+    public synchronized AMFeedback statusUpdate(TaskAttemptID taskId,
         TaskStatus taskStatus) throws IOException, InterruptedException {
+      AMFeedback feedback = new AMFeedback();
+      feedback.setTaskFound(true);
+      if (null == taskStatus) {
+        return feedback;
+      }
       // Serialize as we would if distributed in order to make deep copy
       ByteArrayOutputStream baos = new ByteArrayOutputStream();
       DataOutputStream dos = new DataOutputStream(baos);
@@ -618,7 +626,7 @@ public class LocalJobRunner implements C
       }
 
       // ignore phase
-      return true;
+      return feedback;
     }
 
     /** Return the current values of the counters for this job,
@@ -654,24 +662,24 @@ public class LocalJobRunner implements C
       statusUpdate(taskid, taskStatus);
     }
 
+    @Override
     public void reportDiagnosticInfo(TaskAttemptID taskid, String trace) {
       // Ignore for now
     }
     
+    @Override
     public void reportNextRecordRange(TaskAttemptID taskid, 
         SortedRanges.Range range) throws IOException {
       LOG.info("Task " + taskid + " reportedNextRecordRange " + range);
     }
 
-    public boolean ping(TaskAttemptID taskid) throws IOException {
-      return true;
-    }
-    
+    @Override
     public boolean canCommit(TaskAttemptID taskid) 
     throws IOException {
       return true;
     }
     
+    @Override
     public void done(TaskAttemptID taskId) throws IOException {
       int taskIndex = mapIds.indexOf(taskId);
       if (taskIndex >= 0) {                       // mapping
@@ -681,11 +689,13 @@ public class LocalJobRunner implements C
       }
     }
 
+    @Override
     public synchronized void fsError(TaskAttemptID taskId, String message) 
     throws IOException {
       LOG.fatal("FSError: "+ message + "from task: " + taskId);
     }
 
+    @Override
     public void shuffleError(TaskAttemptID taskId, String message) throws IOException {
       LOG.fatal("shuffleError: "+ message + "from task: " + taskId);
     }
@@ -695,12 +705,30 @@ public class LocalJobRunner implements C
       LOG.fatal("Fatal: "+ msg + "from task: " + taskId);
     }
     
+    @Override
     public MapTaskCompletionEventsUpdate getMapCompletionEvents(JobID jobId, 
         int fromEventId, int maxLocs, TaskAttemptID id) throws IOException {
       return new MapTaskCompletionEventsUpdate(
         org.apache.hadoop.mapred.TaskCompletionEvent.EMPTY_ARRAY, false);
     }
-    
+
+    @Override
+    public void preempted(TaskAttemptID taskId, TaskStatus taskStatus)
+        throws IOException, InterruptedException {
+      // ignore
+    }
+
+    @Override
+    public TaskCheckpointID getCheckpointID(TaskID taskId) {
+      // ignore
+      return null;
+    }
+
+    @Override
+    public void setCheckpointID(TaskID downgrade, TaskCheckpointID cid) {
+      // ignore
+    }
+
   }
 
   public LocalJobRunner(Configuration conf) throws IOException {

Added: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/AMFeedback.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/AMFeedback.java?rev=1553939&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/AMFeedback.java (added)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/AMFeedback.java Sat Dec 28 21:58:33 2013
@@ -0,0 +1,63 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+package org.apache.hadoop.mapred;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.io.Writable;
+
+/**
+ * This class is a simple struct to include both the taskFound information and
+ * a possible preemption request coming from the AM.
+ */
+public class AMFeedback implements Writable {
+
+  boolean taskFound;
+  boolean preemption;
+
+  public void setTaskFound(boolean t){
+    taskFound=t;
+  }
+
+  public boolean getTaskFound(){
+    return taskFound;
+  }
+
+  public void setPreemption(boolean preemption) {
+    this.preemption=preemption;
+  }
+
+  public boolean getPreemption() {
+    return preemption;
+  }
+
+  @Override
+  public void write(DataOutput out) throws IOException {
+    out.writeBoolean(taskFound);
+    out.writeBoolean(preemption);
+  }
+
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    taskFound = in.readBoolean();
+    preemption = in.readBoolean();
+  }
+
+}

Propchange: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/AMFeedback.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/Task.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/Task.java?rev=1553939&r1=1553938&r2=1553939&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/Task.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/Task.java Sat Dec 28 21:58:33 2013
@@ -187,6 +187,7 @@ abstract public class Task implements Wr
   protected SecretKey tokenSecret;
   protected SecretKey shuffleSecret;
   protected GcTimeUpdater gcUpdater;
+  final AtomicBoolean mustPreempt = new AtomicBoolean(false);
 
   ////////////////////////////////////////////
   // Constructors
@@ -711,6 +712,7 @@ abstract public class Task implements Wr
         }
         try {
           boolean taskFound = true; // whether TT knows about this task
+          AMFeedback amFeedback = null;
           // sleep for a bit
           synchronized(lock) {
             if (taskDone.get()) {
@@ -728,12 +730,14 @@ abstract public class Task implements Wr
             taskStatus.statusUpdate(taskProgress.get(),
                                     taskProgress.toString(), 
                                     counters);
-            taskFound = umbilical.statusUpdate(taskId, taskStatus);
+            amFeedback = umbilical.statusUpdate(taskId, taskStatus);
+            taskFound = amFeedback.getTaskFound();
             taskStatus.clearStatus();
           }
           else {
             // send ping 
-            taskFound = umbilical.ping(taskId);
+            amFeedback = umbilical.statusUpdate(taskId, null);
+            taskFound = amFeedback.getTaskFound();
           }
 
           // if Task Tracker is not aware of our task ID (probably because it died and 
@@ -744,6 +748,17 @@ abstract public class Task implements Wr
             System.exit(66);
           }
 
+          // Set a flag that says we should preempt. This is read by
+          // ReduceTasks at points in the execution where it is
+          // safe/easy to preempt
+          boolean lastPreempt = mustPreempt.get();
+          mustPreempt.set(mustPreempt.get() || amFeedback.getPreemption());
+
+          if (lastPreempt ^ mustPreempt.get()) {
+            LOG.info("PREEMPTION TASK: setting mustPreempt to " +
+                mustPreempt.get() + " given " + amFeedback.getPreemption() +
+                " for "+ taskId + " task status: " +taskStatus.getPhase());
+          }
           sendProgress = resetProgressFlag(); 
           remainingRetries = MAX_RETRIES;
         } 
@@ -992,10 +1007,17 @@ abstract public class Task implements Wr
   public void done(TaskUmbilicalProtocol umbilical,
                    TaskReporter reporter
                    ) throws IOException, InterruptedException {
-    LOG.info("Task:" + taskId + " is done."
-             + " And is in the process of committing");
     updateCounters();
-
+    if (taskStatus.getRunState() == TaskStatus.State.PREEMPTED ) {
+      // If we are preempted, do no output promotion; signal done and exit
+      committer.commitTask(taskContext);
+      umbilical.preempted(taskId, taskStatus);
+      taskDone.set(true);
+      reporter.stopCommunicationThread();
+      return;
+    }
+    LOG.info("Task:" + taskId + " is done."
+        + " And is in the process of committing");
     boolean commitRequired = isCommitRequired();
     if (commitRequired) {
       int retries = MAX_RETRIES;
@@ -1054,7 +1076,7 @@ abstract public class Task implements Wr
     int retries = MAX_RETRIES;
     while (true) {
       try {
-        if (!umbilical.statusUpdate(getTaskID(), taskStatus)) {
+        if (!umbilical.statusUpdate(getTaskID(), taskStatus).getTaskFound()) {
           LOG.warn("Parent died.  Exiting "+taskId);
           System.exit(66);
         }
@@ -1098,8 +1120,8 @@ abstract public class Task implements Wr
     if (isMapTask() && conf.getNumReduceTasks() > 0) {
       try {
         Path mapOutput =  mapOutputFile.getOutputFile();
-        FileSystem localFS = FileSystem.getLocal(conf);
-        return localFS.getFileStatus(mapOutput).getLen();
+        FileSystem fs = mapOutput.getFileSystem(conf);
+        return fs.getFileStatus(mapOutput).getLen();
       } catch (IOException e) {
         LOG.warn ("Could not find output size " , e);
       }

Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/TaskStatus.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/TaskStatus.java?rev=1553939&r1=1553938&r2=1553939&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/TaskStatus.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/TaskStatus.java Sat Dec 28 21:58:33 2013
@@ -51,7 +51,7 @@ public abstract class TaskStatus impleme
   @InterfaceAudience.Private
   @InterfaceStability.Unstable
   public static enum State {RUNNING, SUCCEEDED, FAILED, UNASSIGNED, KILLED, 
-                            COMMIT_PENDING, FAILED_UNCLEAN, KILLED_UNCLEAN}
+                            COMMIT_PENDING, FAILED_UNCLEAN, KILLED_UNCLEAN, PREEMPTED}
     
   private final TaskAttemptID taskid;
   private float progress;

Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/TaskUmbilicalProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/TaskUmbilicalProtocol.java?rev=1553939&r1=1553938&r2=1553939&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/TaskUmbilicalProtocol.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/TaskUmbilicalProtocol.java Sat Dec 28 21:58:33 2013
@@ -24,6 +24,9 @@ import org.apache.hadoop.classification.
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.ipc.VersionedProtocol;
 import org.apache.hadoop.mapred.JvmTask;
+import org.apache.hadoop.mapreduce.checkpoint.CheckpointID;
+import org.apache.hadoop.mapreduce.checkpoint.FSCheckpointID;
+import org.apache.hadoop.mapreduce.checkpoint.TaskCheckpointID;
 import org.apache.hadoop.mapreduce.security.token.JobTokenSelector;
 import org.apache.hadoop.security.token.TokenInfo;
 
@@ -64,9 +67,10 @@ public interface TaskUmbilicalProtocol e
    * Version 17 Modified TaskID to be aware of the new TaskTypes
    * Version 18 Added numRequiredSlots to TaskStatus for MAPREDUCE-516
    * Version 19 Added fatalError for child to communicate fatal errors to TT
+   * Version 20 Added methods to manage checkpoints
    * */
 
-  public static final long versionID = 19L;
+  public static final long versionID = 20L;
   
   /**
    * Called when a child task process starts, to get its task.
@@ -78,7 +82,8 @@ public interface TaskUmbilicalProtocol e
   JvmTask getTask(JvmContext context) throws IOException;
   
   /**
-   * Report child's progress to parent.
+   * Report child's progress to parent. Also invoked to report that the task is
+   * still alive (replacing ping). Returns an AMFeedback used to propagate preemption requests.
    * 
    * @param taskId task-id of the child
    * @param taskStatus status of the child
@@ -86,7 +91,7 @@ public interface TaskUmbilicalProtocol e
    * @throws InterruptedException
    * @return True if the task is known
    */
-  boolean statusUpdate(TaskAttemptID taskId, TaskStatus taskStatus) 
+  AMFeedback statusUpdate(TaskAttemptID taskId, TaskStatus taskStatus) 
   throws IOException, InterruptedException;
   
   /** Report error messages back to parent.  Calls should be sparing, since all
@@ -105,11 +110,6 @@ public interface TaskUmbilicalProtocol e
   void reportNextRecordRange(TaskAttemptID taskid, SortedRanges.Range range) 
     throws IOException;
 
-  /** Periodically called by child to check if parent is still alive. 
-   * @return True if the task is known
-   */
-  boolean ping(TaskAttemptID taskid) throws IOException;
-
   /** Report that the task is successfully completed.  Failure is assumed if
    * the task process exits without calling this.
    * @param taskid task's id
@@ -161,4 +161,33 @@ public interface TaskUmbilicalProtocol e
                                                        TaskAttemptID id) 
   throws IOException;
 
+  /**
+   * Report to the AM that the task has been successfully preempted.
+   *
+   * @param taskId task's id
+   * @param taskStatus status of the child
+   * @throws IOException
+   */
+  void preempted(TaskAttemptID taskId, TaskStatus taskStatus)
+      throws IOException, InterruptedException;
+
+  /**
+   * Return the latest CheckpointID for the given TaskID. This provides
+   * the task with a way to locate the checkpointed data and restart from
+   * that point in the computation.
+   *
+   * @param taskID task's id
+   * @return the most recent checkpoint (if any) for this task
+   * @throws IOException
+   */
+  TaskCheckpointID getCheckpointID(TaskID taskID);
+
+  /**
+   * Send a CheckpointID for a given TaskID to be stored in the AM,
+   * to later restart a task from this checkpoint.
+   * @param tid
+   * @param cid
+   */
+  void setCheckpointID(TaskID tid, TaskCheckpointID cid);
+
 }

Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/checkpoint/TaskCheckpointID.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/checkpoint/TaskCheckpointID.java?rev=1553939&r1=1553938&r2=1553939&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/checkpoint/TaskCheckpointID.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/checkpoint/TaskCheckpointID.java Sat Dec 28 21:58:33 2013
@@ -34,37 +34,31 @@ import org.apache.hadoop.mapred.Counters
  * cost of checkpoints and other counters. This is sent by the task to the AM
  * to be stored and provided to the next execution of the same task.
  */
-public class TaskCheckpointID implements CheckpointID{
+public class TaskCheckpointID implements CheckpointID {
 
-  FSCheckpointID rawId;
-  private List<Path> partialOutput;
-  private Counters counters;
+  final FSCheckpointID rawId;
+  private final List<Path> partialOutput;
+  private final Counters counters;
 
   public TaskCheckpointID() {
-    this.rawId = new FSCheckpointID();
-    this.partialOutput = new ArrayList<Path>();
+    this(new FSCheckpointID(), new ArrayList<Path>(), new Counters());
   }
 
   public TaskCheckpointID(FSCheckpointID rawId, List<Path> partialOutput,
           Counters counters) {
     this.rawId = rawId;
     this.counters = counters;
-    if(partialOutput == null)
-      this.partialOutput = new ArrayList<Path>();
-    else
-      this.partialOutput = partialOutput;
+    this.partialOutput = null == partialOutput
+      ? new ArrayList<Path>()
+      : partialOutput;
   }
 
   @Override
   public void write(DataOutput out) throws IOException {
     counters.write(out);
-    if (partialOutput == null) {
-      WritableUtils.writeVLong(out, 0L);
-    } else {
-      WritableUtils.writeVLong(out, partialOutput.size());
-      for(Path p:partialOutput){
-        Text.writeString(out, p.toString());
-      }
+    WritableUtils.writeVLong(out, partialOutput.size());
+    for (Path p : partialOutput) {
+      Text.writeString(out, p.toString());
     }
     rawId.write(out);
   }
@@ -74,21 +68,22 @@ public class TaskCheckpointID implements
     partialOutput.clear();
     counters.readFields(in);
     long numPout = WritableUtils.readVLong(in);
-    for(int i=0;i<numPout;i++)
+    for (int i = 0; i < numPout; i++) {
       partialOutput.add(new Path(Text.readString(in)));
+    }
     rawId.readFields(in);
   }
 
   @Override
-  public boolean equals(Object other){
+  public boolean equals(Object other) {
     if (other instanceof TaskCheckpointID){
-      return this.rawId.equals(((TaskCheckpointID)other).rawId) &&
-             this.counters.equals(((TaskCheckpointID) other).counters) &&
-             this.partialOutput.containsAll(((TaskCheckpointID) other).partialOutput) &&
-             ((TaskCheckpointID) other).partialOutput.containsAll(this.partialOutput);
-    } else {
-      return false;
+      TaskCheckpointID o = (TaskCheckpointID) other;
+      return rawId.equals(o.rawId) &&
+             counters.equals(o.counters) &&
+             partialOutput.containsAll(o.partialOutput) &&
+             o.partialOutput.containsAll(partialOutput);
     }
+    return false;
   }
 
   @Override
@@ -110,7 +105,7 @@ public class TaskCheckpointID implements
     return counters.findCounter(EnumCounter.CHECKPOINT_MS).getValue();
   }
 
-  public String toString(){
+  public String toString() {
     return rawId.toString() + " counters:" + counters;
 
   }

Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestMapProgress.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestMapProgress.java?rev=1553939&r1=1553938&r2=1553939&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestMapProgress.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestMapProgress.java Sat Dec 28 21:58:33 2013
@@ -20,7 +20,6 @@ package org.apache.hadoop.mapred;
 import java.io.DataOutputStream;
 import java.io.File;
 import java.io.IOException;
-import java.util.Arrays;
 import java.util.List;
 
 import junit.framework.TestCase;
@@ -29,20 +28,17 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.ipc.ProtocolSignature;
 import org.apache.hadoop.mapreduce.InputFormat;
 import org.apache.hadoop.mapreduce.InputSplit;
-import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.OutputFormat;
-import org.apache.hadoop.mapreduce.jobhistory.JobSubmittedEvent;
+import org.apache.hadoop.mapreduce.checkpoint.TaskCheckpointID;
 import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
 import org.apache.hadoop.mapreduce.server.jobtracker.JTConfig;
-import org.apache.hadoop.mapreduce.split.JobSplitWriter;
-import org.apache.hadoop.mapreduce.split.SplitMetaInfoReader;
-import org.apache.hadoop.mapreduce.split.JobSplit.SplitMetaInfo;
 import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitIndex;
 import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitMetaInfo;
+import org.apache.hadoop.mapreduce.split.JobSplitWriter;
+import org.apache.hadoop.mapreduce.split.SplitMetaInfoReader;
 import org.apache.hadoop.util.ReflectionUtils;
 
 /**
@@ -110,11 +106,16 @@ public class TestMapProgress extends Tes
       statusUpdate(taskId, taskStatus);
     }
     
+    public void preempted(TaskAttemptID taskId, TaskStatus taskStatus)
+        throws IOException, InterruptedException {
+      statusUpdate(taskId, taskStatus);
+    }
+
     public boolean canCommit(TaskAttemptID taskid) throws IOException {
       return true;
     }
     
-    public boolean statusUpdate(TaskAttemptID taskId, TaskStatus taskStatus) 
+    public AMFeedback statusUpdate(TaskAttemptID taskId, TaskStatus taskStatus) 
     throws IOException, InterruptedException {
       StringBuffer buf = new StringBuffer("Task ");
       buf.append(taskId);
@@ -128,7 +129,9 @@ public class TestMapProgress extends Tes
       LOG.info(buf.toString());
       // ignore phase
       // ignore counters
-      return true;
+      AMFeedback a = new AMFeedback();
+      a.setTaskFound(true);
+      return a;
     }
 
     public void reportDiagnosticInfo(TaskAttemptID taskid, String trace) throws IOException {
@@ -145,6 +148,17 @@ public class TestMapProgress extends Tes
         SortedRanges.Range range) throws IOException {
       LOG.info("Task " + taskid + " reportedNextRecordRange " + range);
     }
+
+    @Override
+    public TaskCheckpointID getCheckpointID(TaskID taskId) {
+      // do nothing
+      return null;
+    }
+
+    @Override
+    public void setCheckpointID(TaskID downgrade, TaskCheckpointID cid) {
+      // do nothing
+    }
   }
   
   private FileSystem fs = null;