You are viewing a plain text version of this content. The canonical link for it is here.
Posted to mapreduce-commits@hadoop.apache.org by cd...@apache.org on 2013/12/28 22:58:33 UTC
svn commit: r1553939 [1/2] - in
/hadoop/common/trunk/hadoop-mapreduce-project: ./
hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/
hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/h...
Author: cdouglas
Date: Sat Dec 28 21:58:33 2013
New Revision: 1553939
URL: http://svn.apache.org/r1553939
Log:
MAPREDUCE-5196. Add bookkeeping for managing checkpoints of task state.
Contributed by Carlo Curino
Added:
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/preemption/CheckpointAMPreemptionPolicy.java (with props)
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestCheckpointPreemptionPolicy.java (with props)
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/AMFeedback.java (with props)
Modified:
hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl.java
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/TaskAttemptEventType.java
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/preemption/AMPreemptionPolicy.java
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/preemption/KillAMPreemptionPolicy.java
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/preemption/NoopAMPreemptionPolicy.java
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapred/TestTaskAttemptListenerImpl.java
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapred/LocalJobRunner.java
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/Task.java
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/TaskStatus.java
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/TaskUmbilicalProtocol.java
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/checkpoint/TaskCheckpointID.java
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestMapProgress.java
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestTaskCommit.java
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/security/TestUmbilicalProtocolWithJobToken.java
Modified: hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt?rev=1553939&r1=1553938&r2=1553939&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt Sat Dec 28 21:58:33 2013
@@ -77,6 +77,9 @@ Trunk (Unreleased)
MAPREDUCE-5189. Add policies and wiring to respond to preemption requests
from YARN. (Carlo Curino via cdouglas)
+ MAPREDUCE-5196. Add bookkeeping for managing checkpoints of task state.
+ (Carlo Curino via cdouglas)
+
BUG FIXES
MAPREDUCE-4272. SortedRanges.Range#compareTo is not spec compliant.
Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl.java?rev=1553939&r1=1553938&r2=1553939&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl.java Sat Dec 28 21:58:33 2013
@@ -36,7 +36,9 @@ import org.apache.hadoop.ipc.Server;
import org.apache.hadoop.mapred.SortedRanges.Range;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.TypeConverter;
+import org.apache.hadoop.mapreduce.checkpoint.TaskCheckpointID;
import org.apache.hadoop.mapreduce.security.token.JobTokenSecretManager;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
import org.apache.hadoop.mapreduce.v2.app.AppContext;
import org.apache.hadoop.mapreduce.v2.app.TaskAttemptListener;
import org.apache.hadoop.mapreduce.v2.app.TaskHeartbeatHandler;
@@ -45,8 +47,8 @@ import org.apache.hadoop.mapreduce.v2.ap
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptDiagnosticsUpdateEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType;
-import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptStatusUpdateEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptStatusUpdateEvent.TaskAttemptStatus;
+import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptStatusUpdateEvent;
import org.apache.hadoop.mapreduce.v2.app.rm.RMHeartbeatHandler;
import org.apache.hadoop.mapreduce.v2.app.rm.preemption.AMPreemptionPolicy;
import org.apache.hadoop.mapreduce.v2.app.security.authorize.MRAMPolicyProvider;
@@ -229,6 +231,22 @@ public class TaskAttemptListenerImpl ext
}
@Override
+ public void preempted(TaskAttemptID taskAttemptID, TaskStatus taskStatus)
+     throws IOException, InterruptedException {
+   // Umbilical callback: the attempt itself reports that it was preempted.
+   // The supplied taskStatus is not consulted by this method.
+   LOG.info("Preempted state update from " + taskAttemptID.toString());
+   final org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId yarnAttemptId =
+       TypeConverter.toYarn(taskAttemptID);
+
+   // Notify the preemption policy, then record progress for the attempt
+   // with the heartbeat handler.
+   preemptionPolicy.reportSuccessfulPreemption(yarnAttemptId);
+   taskHeartbeatHandler.progressing(yarnAttemptId);
+
+   // Finally drive the attempt state machine with TA_PREEMPTED.
+   context.getEventHandler().handle(
+       new TaskAttemptEvent(yarnAttemptId, TaskAttemptEventType.TA_PREEMPTED));
+ }
+
+ @Override
public void done(TaskAttemptID taskAttemptID) throws IOException {
LOG.info("Done acknowledgement from " + taskAttemptID.toString());
@@ -250,6 +268,10 @@ public class TaskAttemptListenerImpl ext
org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId attemptID =
TypeConverter.toYarn(taskAttemptID);
+
+ // handling checkpoints
+ preemptionPolicy.handleFailedContainer(attemptID);
+
context.getEventHandler().handle(
new TaskAttemptEvent(attemptID, TaskAttemptEventType.TA_FAILMSG));
}
@@ -264,6 +286,10 @@ public class TaskAttemptListenerImpl ext
org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId attemptID =
TypeConverter.toYarn(taskAttemptID);
+
+ // handling checkpoints
+ preemptionPolicy.handleFailedContainer(attemptID);
+
context.getEventHandler().handle(
new TaskAttemptEvent(attemptID, TaskAttemptEventType.TA_FAILMSG));
}
@@ -294,12 +320,6 @@ public class TaskAttemptListenerImpl ext
}
@Override
- public boolean ping(TaskAttemptID taskAttemptID) throws IOException {
- LOG.info("Ping from " + taskAttemptID.toString());
- return true;
- }
-
- @Override
public void reportDiagnosticInfo(TaskAttemptID taskAttemptID, String diagnosticInfo)
throws IOException {
diagnosticInfo = StringInterner.weakIntern(diagnosticInfo);
@@ -321,11 +341,33 @@ public class TaskAttemptListenerImpl ext
}
@Override
- public boolean statusUpdate(TaskAttemptID taskAttemptID,
+ public AMFeedback statusUpdate(TaskAttemptID taskAttemptID,
TaskStatus taskStatus) throws IOException, InterruptedException {
- LOG.info("Status update from " + taskAttemptID.toString());
+
org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId yarnAttemptID =
TypeConverter.toYarn(taskAttemptID);
+
+ AMFeedback feedback = new AMFeedback();
+ feedback.setTaskFound(true);
+
+ // Propagating preemption to the task if TASK_PREEMPTION is enabled
+ if (getConfig().getBoolean(MRJobConfig.TASK_PREEMPTION, false)
+ && preemptionPolicy.isPreempted(yarnAttemptID)) {
+ feedback.setPreemption(true);
+ LOG.info("Setting preemption bit for task: "+ yarnAttemptID
+ + " of type " + yarnAttemptID.getTaskId().getTaskType());
+ }
+
+ if (taskStatus == null) {
+ //We are using statusUpdate only as a simple ping
+ LOG.info("Ping from " + taskAttemptID.toString());
+ taskHeartbeatHandler.progressing(yarnAttemptID);
+ return feedback;
+ }
+
+ // if we are here there is an actual status update to be processed
+ LOG.info("Status update from " + taskAttemptID.toString());
+
taskHeartbeatHandler.progressing(yarnAttemptID);
TaskAttemptStatus taskAttemptStatus =
new TaskAttemptStatus();
@@ -386,7 +428,7 @@ public class TaskAttemptListenerImpl ext
context.getEventHandler().handle(
new TaskAttemptStatusUpdateEvent(taskAttemptStatus.id,
taskAttemptStatus));
- return true;
+ return feedback;
}
@Override
@@ -494,4 +536,18 @@ public class TaskAttemptListenerImpl ext
return ProtocolSignature.getProtocolSignature(this,
protocol, clientVersion, clientMethodsHash);
}
+
+ // task checkpoint bookkeeping
+ @Override
+ public TaskCheckpointID getCheckpointID(TaskID taskId) {
+   // Convert the mapred TaskID into its YARN record form and let the
+   // preemption policy answer the lookup.
+   return preemptionPolicy.getCheckpointID(TypeConverter.toYarn(taskId));
+ }
+
+ @Override
+ public void setCheckpointID(TaskID taskId, TaskCheckpointID cid) {
+   // Convert the mapred TaskID into its YARN record form and hand the
+   // checkpoint to the preemption policy for storage.
+   preemptionPolicy.setCheckpointID(TypeConverter.toYarn(taskId), cid);
+ }
+
}
Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/TaskAttemptEventType.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/TaskAttemptEventType.java?rev=1553939&r1=1553938&r2=1553939&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/TaskAttemptEventType.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/TaskAttemptEventType.java Sat Dec 28 21:58:33 2013
@@ -47,6 +47,7 @@ public enum TaskAttemptEventType {
TA_FAILMSG,
TA_UPDATE,
TA_TIMED_OUT,
+ TA_PREEMPTED,
//Producer:TaskCleaner
TA_CLEANUP_DONE,
Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java?rev=1553939&r1=1553938&r2=1553939&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java Sat Dec 28 21:58:33 2013
@@ -304,6 +304,9 @@ public abstract class TaskAttemptImpl im
.addTransition(TaskAttemptStateInternal.RUNNING,
TaskAttemptStateInternal.KILL_CONTAINER_CLEANUP, TaskAttemptEventType.TA_KILL,
CLEANUP_CONTAINER_TRANSITION)
+ .addTransition(TaskAttemptStateInternal.RUNNING,
+ TaskAttemptStateInternal.KILLED,
+ TaskAttemptEventType.TA_PREEMPTED, new PreemptedTransition())
// Transitions from COMMIT_PENDING state
.addTransition(TaskAttemptStateInternal.COMMIT_PENDING,
@@ -437,6 +440,7 @@ public abstract class TaskAttemptImpl im
TaskAttemptEventType.TA_DONE,
TaskAttemptEventType.TA_FAILMSG,
TaskAttemptEventType.TA_CONTAINER_CLEANED,
+ TaskAttemptEventType.TA_PREEMPTED,
// Container launch events can arrive late
TaskAttemptEventType.TA_CONTAINER_LAUNCHED,
TaskAttemptEventType.TA_CONTAINER_LAUNCH_FAILED))
@@ -1874,6 +1878,27 @@ public abstract class TaskAttemptImpl im
}
}
+ /**
+ * Transition run when a task attempt receives TA_PREEMPTED; it is
+ * registered for the RUNNING to KILLED arc of the attempt state machine.
+ * It stamps the finish time, unregisters the attempt from the listener,
+ * requests remote cleanup of the container and reports the attempt as
+ * killed to its task.
+ */
+ private static class PreemptedTransition implements
+ SingleArcTransition<TaskAttemptImpl,TaskAttemptEvent> {
+ @SuppressWarnings("unchecked")
+ @Override
+ public void transition(TaskAttemptImpl taskAttempt,
+ TaskAttemptEvent event) {
+ taskAttempt.setFinishTime();
+ // stop tracking this attempt/JVM pair in the task attempt listener
+ taskAttempt.taskAttemptListener.unregister(
+ taskAttempt.attemptId, taskAttempt.jvmID);
+ // ask the container launcher to clean up the attempt's container
+ taskAttempt.eventHandler.handle(new ContainerLauncherEvent(
+ taskAttempt.attemptId,
+ taskAttempt.getAssignedContainerID(), taskAttempt.getAssignedContainerMgrAddress(),
+ taskAttempt.container.getContainerToken(),
+ ContainerLauncher.EventType.CONTAINER_REMOTE_CLEANUP));
+ // tell the owning task that this attempt ended as killed
+ taskAttempt.eventHandler.handle(new TaskTAttemptEvent(
+ taskAttempt.attemptId,
+ TaskEventType.T_ATTEMPT_KILLED));
+
+ }
+ }
+
private static class CleanupContainerTransition implements
SingleArcTransition<TaskAttemptImpl, TaskAttemptEvent> {
@SuppressWarnings("unchecked")
Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java?rev=1553939&r1=1553938&r2=1553939&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java Sat Dec 28 21:58:33 2013
@@ -347,7 +347,7 @@ public class RMContainerAllocator extend
}
} else if (
- event.getType() == ContainerAllocator.EventType.CONTAINER_DEALLOCATE) {
+ event.getType() == ContainerAllocator.EventType.CONTAINER_DEALLOCATE) {
LOG.info("Processing the event " + event.toString());
Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/preemption/AMPreemptionPolicy.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/preemption/AMPreemptionPolicy.java?rev=1553939&r1=1553938&r2=1553939&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/preemption/AMPreemptionPolicy.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/preemption/AMPreemptionPolicy.java Sat Dec 28 21:58:33 2013
@@ -19,10 +19,9 @@ package org.apache.hadoop.mapreduce.v2.a
import java.util.List;
-import org.apache.hadoop.mapred.TaskAttemptID;
-import org.apache.hadoop.mapred.TaskID;
import org.apache.hadoop.mapreduce.checkpoint.TaskCheckpointID;
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
import org.apache.hadoop.mapreduce.v2.app.AppContext;
import org.apache.hadoop.yarn.api.records.Container;
@@ -81,7 +80,7 @@ public interface AMPreemptionPolicy {
* successfully preempted (for bookeeping, counters, etc..)
* @param attemptID Task attempt that preempted
*/
- public void reportSuccessfulPreemption(TaskAttemptID attemptID);
+ public void reportSuccessfulPreemption(TaskAttemptId attemptID);
/**
* Callback informing the policy of containers exiting with a failure. This
@@ -98,20 +97,20 @@ public interface AMPreemptionPolicy {
public void handleCompletedContainer(TaskAttemptId attemptID);
/**
- * Method to retrieve the latest checkpoint for a given {@link TaskID}
+ * Method to retrieve the latest checkpoint for a given {@link TaskId}
* @param taskId TaskID
* @return CheckpointID associated with this task or null
*/
- public TaskCheckpointID getCheckpointID(TaskID taskId);
+ public TaskCheckpointID getCheckpointID(TaskId taskId);
/**
* Method to store the latest {@link
* org.apache.hadoop.mapreduce.checkpoint.CheckpointID} for a given {@link
- * TaskID}. Assigning a null is akin to remove all previous checkpoints for
+ * TaskId}. Assigning a null is akin to remove all previous checkpoints for
* this task.
* @param taskId TaskID
* @param cid Checkpoint to assign or <tt>null</tt> to remove it.
*/
- public void setCheckpointID(TaskID taskId, TaskCheckpointID cid);
+ public void setCheckpointID(TaskId taskId, TaskCheckpointID cid);
}
Added: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/preemption/CheckpointAMPreemptionPolicy.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/preemption/CheckpointAMPreemptionPolicy.java?rev=1553939&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/preemption/CheckpointAMPreemptionPolicy.java (added)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/preemption/CheckpointAMPreemptionPolicy.java Sat Dec 28 21:58:33 2013
@@ -0,0 +1,290 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapreduce.v2.app.rm.preemption;
+
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.mapreduce.JobCounter;
+import org.apache.hadoop.mapreduce.checkpoint.TaskCheckpointID;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
+import org.apache.hadoop.mapreduce.v2.app.AppContext;
+import org.apache.hadoop.mapreduce.v2.app.job.event.JobCounterUpdateEvent;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.PreemptionContainer;
+import org.apache.hadoop.yarn.api.records.PreemptionContract;
+import org.apache.hadoop.yarn.api.records.PreemptionMessage;
+import org.apache.hadoop.yarn.api.records.PreemptionResourceRequest;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.api.records.StrictPreemptionContract;
+import org.apache.hadoop.yarn.event.EventHandler;
+
+/**
+ * This policy works in combination with an implementation of task
+ * checkpointing. It computes the tasks to be preempted in response to the RM
+ * request for preemption. For strict requests, it maps containers to
+ * corresponding tasks; for fungible requests, it attempts to pick the best
+ * containers to preempt (reducers in reverse allocation order). The
+ * TaskAttemptListener will interrogate this policy when handling a task
+ * heartbeat to check whether the task should be preempted or not. When handling
+ * fungible requests, the policy discount the RM ask by the amount of currently
+ * in-flight preemptions (i.e., tasks that are checkpointing).
+ *
+ * This class it is also used to maintain the list of checkpoints for existing
+ * tasks. Centralizing this functionality here, allows us to have visibility on
+ * preemption and checkpoints in a single location, thus coordinating preemption
+ * and checkpoint management decisions in a single policy.
+ */
+public class CheckpointAMPreemptionPolicy implements AMPreemptionPolicy {
+
+ // task attempts flagged for preemption
+ private final Set<TaskAttemptId> toBePreempted;
+
+ private final Set<TaskAttemptId> countedPreemptions;
+
+ private final Map<TaskId,TaskCheckpointID> checkpoints;
+
+ private final Map<TaskAttemptId,Resource> pendingFlexiblePreemptions;
+
+ @SuppressWarnings("rawtypes")
+ private EventHandler eventHandler;
+
+ static final Log LOG = LogFactory
+ .getLog(CheckpointAMPreemptionPolicy.class);
+
+ public CheckpointAMPreemptionPolicy() {
+ this(Collections.synchronizedSet(new HashSet<TaskAttemptId>()),
+ Collections.synchronizedSet(new HashSet<TaskAttemptId>()),
+ Collections.synchronizedMap(new HashMap<TaskId,TaskCheckpointID>()),
+ Collections.synchronizedMap(new HashMap<TaskAttemptId,Resource>()));
+ }
+
+ CheckpointAMPreemptionPolicy(Set<TaskAttemptId> toBePreempted,
+ Set<TaskAttemptId> countedPreemptions,
+ Map<TaskId,TaskCheckpointID> checkpoints,
+ Map<TaskAttemptId,Resource> pendingFlexiblePreemptions) {
+ this.toBePreempted = toBePreempted;
+ this.countedPreemptions = countedPreemptions;
+ this.checkpoints = checkpoints;
+ this.pendingFlexiblePreemptions = pendingFlexiblePreemptions;
+ }
+
+ @Override
+ public void init(AppContext context) {
+ this.eventHandler = context.getEventHandler();
+ }
+
+ @Override
+ public void preempt(Context ctxt, PreemptionMessage preemptionRequests) {
+
+ if (preemptionRequests != null) {
+
+ // handling non-negotiable preemption
+
+ StrictPreemptionContract cStrict = preemptionRequests.getStrictContract();
+ if (cStrict != null
+ && cStrict.getContainers() != null
+ && cStrict.getContainers().size() > 0) {
+ LOG.info("strict preemption :" +
+ preemptionRequests.getStrictContract().getContainers().size() +
+ " containers to kill");
+
+ // handle strict preemptions. These containers are non-negotiable
+ for (PreemptionContainer c :
+ preemptionRequests.getStrictContract().getContainers()) {
+ ContainerId reqCont = c.getId();
+ TaskAttemptId reqTask = ctxt.getTaskAttempt(reqCont);
+ if (reqTask != null) {
+ // ignore requests for preempting containers running maps
+ if (org.apache.hadoop.mapreduce.v2.api.records.TaskType.REDUCE
+ .equals(reqTask.getTaskId().getTaskType())) {
+ toBePreempted.add(reqTask);
+ LOG.info("preempting " + reqCont + " running task:" + reqTask);
+ } else {
+ LOG.info("NOT preempting " + reqCont + " running task:" + reqTask);
+ }
+ }
+ }
+ }
+
+ // handling negotiable preemption
+ PreemptionContract cNegot = preemptionRequests.getContract();
+ if (cNegot != null
+ && cNegot.getResourceRequest() != null
+ && cNegot.getResourceRequest().size() > 0
+ && cNegot.getContainers() != null
+ && cNegot.getContainers().size() > 0) {
+
+ LOG.info("negotiable preemption :" +
+ preemptionRequests.getContract().getResourceRequest().size() +
+ " resourceReq, " +
+ preemptionRequests.getContract().getContainers().size() +
+ " containers");
+ // handle fungible preemption. Here we only look at the total amount of
+ // resources to be preempted and pick enough of our containers to
+ // satisfy that. We only support checkpointing for reducers for now.
+ List<PreemptionResourceRequest> reqResources =
+ preemptionRequests.getContract().getResourceRequest();
+
+ // compute the total amount of pending preemptions (to be discounted
+ // from current request)
+ int pendingPreemptionRam = 0;
+ int pendingPreemptionCores = 0;
+ for (Resource r : pendingFlexiblePreemptions.values()) {
+ pendingPreemptionRam += r.getMemory();
+ pendingPreemptionCores += r.getVirtualCores();
+ }
+
+ // discount preemption request based on currently pending preemption
+ for (PreemptionResourceRequest rr : reqResources) {
+ ResourceRequest reqRsrc = rr.getResourceRequest();
+ if (!ResourceRequest.ANY.equals(reqRsrc.getResourceName())) {
+ // For now, only respond to aggregate requests and ignore locality
+ continue;
+ }
+
+ LOG.info("ResourceRequest:" + reqRsrc);
+ int reqCont = reqRsrc.getNumContainers();
+ int reqMem = reqRsrc.getCapability().getMemory();
+ int totalMemoryToRelease = reqCont * reqMem;
+ int reqCores = reqRsrc.getCapability().getVirtualCores();
+ int totalCoresToRelease = reqCont * reqCores;
+
+ // remove
+ if (pendingPreemptionRam > 0) {
+ // if goes negative we simply exit
+ totalMemoryToRelease -= pendingPreemptionRam;
+ // decrement pending resources if zero or negatve we will
+ // ignore it while processing next PreemptionResourceRequest
+ pendingPreemptionRam -= totalMemoryToRelease;
+ }
+ if (pendingPreemptionCores > 0) {
+ totalCoresToRelease -= pendingPreemptionCores;
+ pendingPreemptionCores -= totalCoresToRelease;
+ }
+
+ // reverse order of allocation (for now)
+ List<Container> listOfCont = ctxt.getContainers(TaskType.REDUCE);
+ Collections.sort(listOfCont, new Comparator<Container>() {
+ @Override
+ public int compare(final Container o1, final Container o2) {
+ return o2.getId().getId() - o1.getId().getId();
+ }
+ });
+
+ // preempt reducers first
+ for (Container cont : listOfCont) {
+ if (totalMemoryToRelease <= 0 && totalCoresToRelease<=0) {
+ break;
+ }
+ TaskAttemptId reduceId = ctxt.getTaskAttempt(cont.getId());
+ int cMem = cont.getResource().getMemory();
+ int cCores = cont.getResource().getVirtualCores();
+
+ if (!toBePreempted.contains(reduceId)) {
+ totalMemoryToRelease -= cMem;
+ totalCoresToRelease -= cCores;
+ toBePreempted.add(reduceId);
+ pendingFlexiblePreemptions.put(reduceId, cont.getResource());
+ }
+ LOG.info("ResourceRequest:" + reqRsrc + " satisfied preempting "
+ + reduceId);
+ }
+ // if map was preemptable we would do add them to toBePreempted here
+ }
+ }
+ }
+ }
+
+ @Override
+ public void handleFailedContainer(TaskAttemptId attemptID) {
+ toBePreempted.remove(attemptID);
+ checkpoints.remove(attemptID.getTaskId());
+ }
+
+ @Override
+ public void handleCompletedContainer(TaskAttemptId attemptID){
+ LOG.info(" task completed:" + attemptID);
+ toBePreempted.remove(attemptID);
+ pendingFlexiblePreemptions.remove(attemptID);
+ }
+
+ @Override
+ public boolean isPreempted(TaskAttemptId yarnAttemptID) {
+ if (toBePreempted.contains(yarnAttemptID)) {
+ updatePreemptionCounters(yarnAttemptID);
+ return true;
+ }
+ return false;
+ }
+
+ @Override
+ public void reportSuccessfulPreemption(TaskAttemptId taskAttemptID) {
+ // ignore
+ }
+
+ @Override
+ public TaskCheckpointID getCheckpointID(TaskId taskId) {
+ return checkpoints.get(taskId);
+ }
+
+ @Override
+ public void setCheckpointID(TaskId taskId, TaskCheckpointID cid) {
+ checkpoints.put(taskId, cid);
+ if (cid != null) {
+ updateCheckpointCounters(taskId, cid);
+ }
+ }
+
+ @SuppressWarnings({ "unchecked" })
+ private void updateCheckpointCounters(TaskId taskId, TaskCheckpointID cid) {
+ JobCounterUpdateEvent jce = new JobCounterUpdateEvent(taskId.getJobId());
+ jce.addCounterUpdate(JobCounter.CHECKPOINTS, 1);
+ eventHandler.handle(jce);
+ jce = new JobCounterUpdateEvent(taskId.getJobId());
+ jce.addCounterUpdate(JobCounter.CHECKPOINT_BYTES, cid.getCheckpointBytes());
+ eventHandler.handle(jce);
+ jce = new JobCounterUpdateEvent(taskId.getJobId());
+ jce.addCounterUpdate(JobCounter.CHECKPOINT_TIME, cid.getCheckpointTime());
+ eventHandler.handle(jce);
+
+ }
+
+ @SuppressWarnings({ "unchecked" })
+ private void updatePreemptionCounters(TaskAttemptId yarnAttemptID) {
+ if (!countedPreemptions.contains(yarnAttemptID)) {
+ countedPreemptions.add(yarnAttemptID);
+ JobCounterUpdateEvent jce = new JobCounterUpdateEvent(yarnAttemptID
+ .getTaskId().getJobId());
+ jce.addCounterUpdate(JobCounter.TASKS_REQ_PREEMPT, 1);
+ eventHandler.handle(jce);
+ }
+ }
+
+}
Propchange: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/preemption/CheckpointAMPreemptionPolicy.java
------------------------------------------------------------------------------
svn:eol-style = native
Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/preemption/KillAMPreemptionPolicy.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/preemption/KillAMPreemptionPolicy.java?rev=1553939&r1=1553938&r2=1553939&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/preemption/KillAMPreemptionPolicy.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/preemption/KillAMPreemptionPolicy.java Sat Dec 28 21:58:33 2013
@@ -19,11 +19,10 @@ package org.apache.hadoop.mapreduce.v2.a
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.mapred.TaskAttemptID;
-import org.apache.hadoop.mapred.TaskID;
import org.apache.hadoop.mapreduce.JobCounter;
import org.apache.hadoop.mapreduce.checkpoint.TaskCheckpointID;
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
import org.apache.hadoop.mapreduce.v2.app.AppContext;
import org.apache.hadoop.mapreduce.v2.app.job.event.JobCounterUpdateEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent;
@@ -89,17 +88,17 @@ public class KillAMPreemptionPolicy impl
}
@Override
- public void reportSuccessfulPreemption(TaskAttemptID taskAttemptID) {
+ public void reportSuccessfulPreemption(TaskAttemptId taskAttemptID) {
// ignore
}
@Override
- public TaskCheckpointID getCheckpointID(TaskID taskId) {
+ public TaskCheckpointID getCheckpointID(TaskId taskId) {
return null;
}
@Override
- public void setCheckpointID(TaskID taskId, TaskCheckpointID cid) {
+ public void setCheckpointID(TaskId taskId, TaskCheckpointID cid) {
// ignore
}
Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/preemption/NoopAMPreemptionPolicy.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/preemption/NoopAMPreemptionPolicy.java?rev=1553939&r1=1553938&r2=1553939&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/preemption/NoopAMPreemptionPolicy.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/preemption/NoopAMPreemptionPolicy.java Sat Dec 28 21:58:33 2013
@@ -17,10 +17,9 @@
*/
package org.apache.hadoop.mapreduce.v2.app.rm.preemption;
-import org.apache.hadoop.mapred.TaskAttemptID;
-import org.apache.hadoop.mapred.TaskID;
import org.apache.hadoop.mapreduce.checkpoint.TaskCheckpointID;
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
import org.apache.hadoop.mapreduce.v2.app.AppContext;
import org.apache.hadoop.yarn.api.records.PreemptionMessage;
@@ -50,17 +49,17 @@ public class NoopAMPreemptionPolicy impl
}
@Override
- public void reportSuccessfulPreemption(TaskAttemptID taskAttemptID) {
+ public void reportSuccessfulPreemption(TaskAttemptId taskAttemptID) {
// ignore
}
@Override
- public TaskCheckpointID getCheckpointID(TaskID taskId) {
+ public TaskCheckpointID getCheckpointID(TaskId taskId) {
return null;
}
@Override
- public void setCheckpointID(TaskID taskId, TaskCheckpointID cid) {
+ public void setCheckpointID(TaskId taskId, TaskCheckpointID cid) {
// ignore
}
Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapred/TestTaskAttemptListenerImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapred/TestTaskAttemptListenerImpl.java?rev=1553939&r1=1553938&r2=1553939&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapred/TestTaskAttemptListenerImpl.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapred/TestTaskAttemptListenerImpl.java Sat Dec 28 21:58:33 2013
@@ -17,26 +17,23 @@
*/
package org.apache.hadoop.mapred;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
-import static org.mockito.Matchers.any;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.never;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
+import org.apache.hadoop.mapred.Counters;
+import org.apache.hadoop.mapred.Counters.Counter;
+import org.apache.hadoop.mapreduce.checkpoint.EnumCounter;
import java.io.IOException;
+import java.util.ArrayList;
import java.util.Arrays;
-
-import junit.framework.Assert;
+import java.util.List;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.TaskType;
import org.apache.hadoop.mapreduce.TypeConverter;
+import org.apache.hadoop.mapreduce.checkpoint.CheckpointID;
+import org.apache.hadoop.mapreduce.checkpoint.FSCheckpointID;
+import org.apache.hadoop.mapreduce.checkpoint.TaskCheckpointID;
import org.apache.hadoop.mapreduce.security.token.JobTokenSecretManager;
import org.apache.hadoop.mapreduce.v2.api.records.JobId;
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptCompletionEvent;
@@ -46,21 +43,31 @@ import org.apache.hadoop.mapreduce.v2.ap
import org.apache.hadoop.mapreduce.v2.app.AppContext;
import org.apache.hadoop.mapreduce.v2.app.TaskHeartbeatHandler;
import org.apache.hadoop.mapreduce.v2.app.job.Job;
+import org.apache.hadoop.mapreduce.v2.app.rm.preemption.AMPreemptionPolicy;
+import org.apache.hadoop.mapreduce.v2.app.rm.preemption.CheckpointAMPreemptionPolicy;
import org.apache.hadoop.mapreduce.v2.app.rm.RMHeartbeatHandler;
import org.apache.hadoop.mapreduce.v2.util.MRBuilderUtils;
+import org.apache.hadoop.yarn.event.Dispatcher;
+import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.util.SystemClock;
+
import org.junit.Test;
+import static org.junit.Assert.*;
+import static org.mockito.Mockito.*;
public class TestTaskAttemptListenerImpl {
- public static class MockTaskAttemptListenerImpl extends TaskAttemptListenerImpl {
+ public static class MockTaskAttemptListenerImpl
+ extends TaskAttemptListenerImpl {
public MockTaskAttemptListenerImpl(AppContext context,
JobTokenSecretManager jobTokenSecretManager,
RMHeartbeatHandler rmHeartbeatHandler,
- TaskHeartbeatHandler hbHandler) {
- super(context, jobTokenSecretManager, rmHeartbeatHandler, null);
+ TaskHeartbeatHandler hbHandler,
+ AMPreemptionPolicy policy) {
+
+ super(context, jobTokenSecretManager, rmHeartbeatHandler, policy);
this.taskHeartbeatHandler = hbHandler;
}
@@ -87,9 +94,16 @@ public class TestTaskAttemptListenerImpl
RMHeartbeatHandler rmHeartbeatHandler =
mock(RMHeartbeatHandler.class);
TaskHeartbeatHandler hbHandler = mock(TaskHeartbeatHandler.class);
+ Dispatcher dispatcher = mock(Dispatcher.class);
+ EventHandler ea = mock(EventHandler.class);
+ when(dispatcher.getEventHandler()).thenReturn(ea);
+
+ when(appCtx.getEventHandler()).thenReturn(ea);
+ CheckpointAMPreemptionPolicy policy = new CheckpointAMPreemptionPolicy();
+ policy.init(appCtx);
MockTaskAttemptListenerImpl listener =
new MockTaskAttemptListenerImpl(appCtx, secret,
- rmHeartbeatHandler, hbHandler);
+ rmHeartbeatHandler, hbHandler, policy);
Configuration conf = new Configuration();
listener.init(conf);
listener.start();
@@ -144,7 +158,7 @@ public class TestTaskAttemptListenerImpl
assertNotNull(jvmid);
try {
JVMId.forName("jvm_001_002_m_004_006");
- Assert.fail();
+ fail();
} catch (IllegalArgumentException e) {
assertEquals(e.getMessage(),
"TaskId string : jvm_001_002_m_004_006 is not properly formed");
@@ -190,8 +204,14 @@ public class TestTaskAttemptListenerImpl
RMHeartbeatHandler rmHeartbeatHandler =
mock(RMHeartbeatHandler.class);
final TaskHeartbeatHandler hbHandler = mock(TaskHeartbeatHandler.class);
- TaskAttemptListenerImpl listener =
- new TaskAttemptListenerImpl(appCtx, secret, rmHeartbeatHandler, null) {
+ Dispatcher dispatcher = mock(Dispatcher.class);
+ EventHandler ea = mock(EventHandler.class);
+ when(dispatcher.getEventHandler()).thenReturn(ea);
+ when(appCtx.getEventHandler()).thenReturn(ea);
+ CheckpointAMPreemptionPolicy policy = new CheckpointAMPreemptionPolicy();
+ policy.init(appCtx);
+ TaskAttemptListenerImpl listener = new TaskAttemptListenerImpl(
+ appCtx, secret, rmHeartbeatHandler, policy) {
@Override
protected void registerHeartbeatHandler(Configuration conf) {
taskHeartbeatHandler = hbHandler;
@@ -219,7 +239,8 @@ public class TestTaskAttemptListenerImpl
isMap ? org.apache.hadoop.mapreduce.v2.api.records.TaskType.MAP
: org.apache.hadoop.mapreduce.v2.api.records.TaskType.REDUCE);
TaskAttemptId attemptId = MRBuilderUtils.newTaskAttemptId(tid, 0);
- RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null);
+ RecordFactory recordFactory =
+ RecordFactoryProvider.getRecordFactory(null);
TaskAttemptCompletionEvent tce = recordFactory
.newRecordInstance(TaskAttemptCompletionEvent.class);
tce.setEventId(eventId);
@@ -244,8 +265,14 @@ public class TestTaskAttemptListenerImpl
RMHeartbeatHandler rmHeartbeatHandler =
mock(RMHeartbeatHandler.class);
final TaskHeartbeatHandler hbHandler = mock(TaskHeartbeatHandler.class);
- TaskAttemptListenerImpl listener =
- new TaskAttemptListenerImpl(appCtx, secret, rmHeartbeatHandler, null) {
+ Dispatcher dispatcher = mock(Dispatcher.class);
+ EventHandler ea = mock(EventHandler.class);
+ when(dispatcher.getEventHandler()).thenReturn(ea);
+ when(appCtx.getEventHandler()).thenReturn(ea);
+ CheckpointAMPreemptionPolicy policy = new CheckpointAMPreemptionPolicy();
+ policy.init(appCtx);
+ TaskAttemptListenerImpl listener = new TaskAttemptListenerImpl(
+ appCtx, secret, rmHeartbeatHandler, policy) {
@Override
protected void registerHeartbeatHandler(Configuration conf) {
taskHeartbeatHandler = hbHandler;
@@ -270,4 +297,88 @@ public class TestTaskAttemptListenerImpl
listener.stop();
}
+
+  /**
+   * Checks that a checkpoint id stored through the listener for a task is
+   * handed back unchanged (same instance, same counters, same partial
+   * output) by a later lookup for that task.
+   */
+  @Test
+  public void testCheckpointIDTracking()
+      throws IOException, InterruptedException {
+
+    SystemClock clock = new SystemClock();
+
+    org.apache.hadoop.mapreduce.v2.app.job.Task mockTask =
+        mock(org.apache.hadoop.mapreduce.v2.app.job.Task.class);
+    when(mockTask.canCommit(any(TaskAttemptId.class))).thenReturn(true);
+    Job mockJob = mock(Job.class);
+    when(mockJob.getTask(any(TaskId.class))).thenReturn(mockTask);
+
+    EventHandler ea = mock(EventHandler.class);
+
+    RMHeartbeatHandler rmHeartbeatHandler =
+        mock(RMHeartbeatHandler.class);
+
+    AppContext appCtx = mock(AppContext.class);
+    when(appCtx.getJob(any(JobId.class))).thenReturn(mockJob);
+    when(appCtx.getClock()).thenReturn(clock);
+    when(appCtx.getEventHandler()).thenReturn(ea);
+    JobTokenSecretManager secret = mock(JobTokenSecretManager.class);
+    final TaskHeartbeatHandler hbHandler = mock(TaskHeartbeatHandler.class);
+    CheckpointAMPreemptionPolicy policy = new CheckpointAMPreemptionPolicy();
+    policy.init(appCtx);
+    TaskAttemptListenerImpl listener = new TaskAttemptListenerImpl(
+        appCtx, secret, rmHeartbeatHandler, policy) {
+      @Override
+      protected void registerHeartbeatHandler(Configuration conf) {
+        taskHeartbeatHandler = hbHandler;
+      }
+    };
+
+    Configuration conf = new Configuration();
+    conf.setBoolean(MRJobConfig.TASK_PREEMPTION, true);
+
+    listener.init(conf);
+    listener.start();
+
+    // stop the listener even when one of the assertions below fails
+    try {
+      TaskAttemptID tid = new TaskAttemptID("12345", 1, TaskType.REDUCE, 1, 0);
+
+      List<Path> partialOut = new ArrayList<Path>();
+      partialOut.add(new Path("/prev1"));
+      partialOut.add(new Path("/prev2"));
+
+      Counters counters = mock(Counters.class);
+      final long CBYTES = 64L * 1024 * 1024;
+      final long CTIME = 4344L;
+      final Path CLOC = new Path("/test/1");
+      Counter cbytes = mock(Counter.class);
+      when(cbytes.getValue()).thenReturn(CBYTES);
+      Counter ctime = mock(Counter.class);
+      when(ctime.getValue()).thenReturn(CTIME);
+      when(counters.findCounter(eq(EnumCounter.CHECKPOINT_BYTES)))
+          .thenReturn(cbytes);
+      when(counters.findCounter(eq(EnumCounter.CHECKPOINT_MS)))
+          .thenReturn(ctime);
+
+      // propagating a taskstatus that contains a checkpoint id
+      TaskCheckpointID incid = new TaskCheckpointID(new FSCheckpointID(
+          CLOC), partialOut, counters);
+      listener.setCheckpointID(
+          org.apache.hadoop.mapred.TaskID.downgrade(tid.getTaskID()), incid);
+
+      // and try to get it back
+      CheckpointID outcid = listener.getCheckpointID(tid.getTaskID());
+      TaskCheckpointID tcid = (TaskCheckpointID) outcid;
+      assertEquals(CBYTES, tcid.getCheckpointBytes());
+      assertEquals(CTIME, tcid.getCheckpointTime());
+      assertTrue(partialOut.containsAll(tcid.getPartialCommittedOutput()));
+      assertTrue(tcid.getPartialCommittedOutput().containsAll(partialOut));
+
+      // the very same instance must come back; a JUnit assertion is used
+      // because the Java "assert" keyword is skipped when -ea is not set
+      assertSame(incid, outcid);
+    } finally {
+      listener.stop();
+    }
+
+  }
+
}
Added: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestCheckpointPreemptionPolicy.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestCheckpointPreemptionPolicy.java?rev=1553939&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestCheckpointPreemptionPolicy.java (added)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestCheckpointPreemptionPolicy.java Sat Dec 28 21:58:33 2013
@@ -0,0 +1,329 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapreduce.v2.app;
+
+import org.apache.hadoop.yarn.api.records.PreemptionContract;
+import org.apache.hadoop.yarn.api.records.PreemptionMessage;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.util.resource.Resources;
+
+import static org.junit.Assert.*;
+import static org.mockito.Mockito.*;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.hadoop.mapred.TaskAttemptListenerImpl;
+import org.apache.hadoop.mapreduce.v2.api.records.JobId;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
+import org.apache.hadoop.mapreduce.v2.app.MRAppMaster.RunningAppContext;
+import org.apache.hadoop.mapreduce.v2.app.rm.RMContainerAllocator;
+import org.apache.hadoop.mapreduce.v2.app.rm.preemption.AMPreemptionPolicy;
+import org.apache.hadoop.mapreduce.v2.app.rm.preemption.CheckpointAMPreemptionPolicy;
+import org.apache.hadoop.mapreduce.v2.util.MRBuilderUtils;
+import org.apache.hadoop.yarn.api.records.PreemptionContainer;
+import org.apache.hadoop.yarn.api.records.PreemptionResourceRequest;
+import org.apache.hadoop.yarn.api.records.StrictPreemptionContract;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.factories.RecordFactory;
+import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
+import org.junit.Before;
+import org.junit.Test;
+
+public class TestCheckpointPreemptionPolicy {
+
+ // Declared but not referenced by any method of this test class.
+ TaskAttemptListenerImpl pel= null;
+ // Declared but not referenced by any method of this test class.
+ RMContainerAllocator r;
+ // Job id built in setup(); the task ids of the fixture are derived from it.
+ JobId jid;
+ // Mocked application context handed to the policy under test via init().
+ RunningAppContext mActxt;
+ // Containers that the generated preemption message will ask for;
+ // filled in setup() with every 7th container.
+ Set<ContainerId> preemptedContainers = new HashSet<ContainerId>();
+ // Container -> task attempt assumed to be running in it.
+ Map<ContainerId,TaskAttemptId> assignedContainers =
+ new HashMap<ContainerId, TaskAttemptId>();
+ // Factory used to instantiate the preemption message records.
+ private final RecordFactory recordFactory =
+ RecordFactoryProvider.getRecordFactory(null);
+ // Container -> resource assigned to it in setup().
+ HashMap<ContainerId,Resource> contToResourceMap =
+ new HashMap<ContainerId, Resource>();
+
+ // Memory of one minimum allocation (presumably MB, as printed by setup());
+ // every fixture container is given twice this amount.
+ private int minAlloc = 1024;
+
+  /**
+   * Builds the fixture shared by all tests: a mocked application context and
+   * 40 containers, each given 2 * minAlloc memory and 2 cores. Containers
+   * with an even index run a MAP attempt, the others a REDUCE attempt, and
+   * every 7th container is recorded as a preemption candidate. The resulting
+   * assignment is printed to stdout.
+   */
+  @Before
+  @SuppressWarnings("rawtypes") // mocked generics
+  public void setup() {
+    final ApplicationId applicationId = ApplicationId.newInstance(200, 1);
+    final ApplicationAttemptId attemptId =
+        ApplicationAttemptId.newInstance(applicationId, 1);
+    jid = MRBuilderUtils.newJobId(applicationId, 1);
+
+    mActxt = mock(RunningAppContext.class);
+    EventHandler handler = mock(EventHandler.class);
+    when(mActxt.getEventHandler()).thenReturn(handler);
+
+    for (int idx = 0; idx < 40; ++idx) {
+      final ContainerId containerId = ContainerId.newInstance(attemptId, idx);
+      if (idx % 7 == 0) {
+        preemptedContainers.add(containerId);
+      }
+
+      // even slots host maps, odd slots host reducers
+      final TaskId taskId;
+      if (idx % 2 == 0) {
+        taskId = MRBuilderUtils.newTaskId(jid, idx / 2, TaskType.MAP);
+      } else {
+        taskId = MRBuilderUtils.newTaskId(jid, idx / 2 + 1, TaskType.REDUCE);
+      }
+      assignedContainers.put(containerId,
+          MRBuilderUtils.newTaskAttemptId(taskId, 0));
+      contToResourceMap.put(containerId,
+          Resource.newInstance(2 * minAlloc, 2));
+    }
+
+    for (Map.Entry<ContainerId,TaskAttemptId> assignment :
+         assignedContainers.entrySet()) {
+      final ContainerId containerId = assignment.getKey();
+      final TaskAttemptId attempt = assignment.getValue();
+      System.out.println("cont:" + containerId.getId()
+          + " type:" + attempt.getTaskId().getTaskType()
+          + " res:" + contToResourceMap.get(containerId).getMemory() + "MB");
+    }
+  }
+
+  /**
+   * Sends a strict preemption contract to the policy and checks, for every
+   * container named in it, that attempts running reducers are flagged as
+   * preempted while attempts running maps are not.
+   */
+  @Test
+  public void testStrictPreemptionContract() {
+
+    final Map<ContainerId,TaskAttemptId> containers = assignedContainers;
+    AMPreemptionPolicy.Context mPctxt = new AMPreemptionPolicy.Context() {
+      @Override
+      public TaskAttemptId getTaskAttempt(ContainerId cId) {
+        return containers.get(cId);
+      }
+      @Override
+      public List<Container> getContainers(TaskType t) {
+        List<Container> p = new ArrayList<Container>();
+        for (Map.Entry<ContainerId,TaskAttemptId> ent :
+            assignedContainers.entrySet()) {
+          if (ent.getValue().getTaskId().getTaskType().equals(t)) {
+            p.add(Container.newInstance(ent.getKey(), null, null,
+                contToResourceMap.get(ent.getKey()),
+                Priority.newInstance(0), null));
+          }
+        }
+        return p;
+      }
+    };
+
+    // generatePreemptionMessage() empties the set it is handed, so keep a
+    // copy of the requested containers; iterating preemptedContainers after
+    // the call would verify nothing at all
+    final Set<ContainerId> requested =
+        new HashSet<ContainerId>(preemptedContainers);
+    assertFalse("fixture requests no containers", requested.isEmpty());
+
+    PreemptionMessage pM = generatePreemptionMessage(preemptedContainers,
+        contToResourceMap, Resource.newInstance(1024, 1), true);
+
+    CheckpointAMPreemptionPolicy policy = new CheckpointAMPreemptionPolicy();
+    policy.init(mActxt);
+    policy.preempt(mPctxt, pM);
+
+    // JUnit assertions are used instead of the Java "assert" keyword, which
+    // is skipped when the JVM runs without -ea
+    for (ContainerId c : requested) {
+      TaskAttemptId t = assignedContainers.get(c);
+      if (TaskType.MAP.equals(t.getTaskId().getTaskType())) {
+        assertFalse("map attempt was preempted: " + t,
+            policy.isPreempted(t));
+      } else {
+        assertTrue("reduce attempt was not preempted: " + t,
+            policy.isPreempted(t));
+      }
+    }
+  }
+
+
+  /**
+   * Sends a negotiable preemption contract to the policy three times:
+   * the second (redundant) message must leave the set of preempted attempts
+   * unchanged, while the third one, sent after two of the preempted attempts
+   * completed, must change it.
+   */
+  @Test
+  public void testPreemptionContract() {
+    final Map<ContainerId,TaskAttemptId> containers = assignedContainers;
+    AMPreemptionPolicy.Context mPctxt = new AMPreemptionPolicy.Context() {
+      @Override
+      public TaskAttemptId getTaskAttempt(ContainerId cId) {
+        return containers.get(cId);
+      }
+
+      @Override
+      public List<Container> getContainers(TaskType t) {
+        List<Container> p = new ArrayList<Container>();
+        for (Map.Entry<ContainerId,TaskAttemptId> ent :
+            assignedContainers.entrySet()) {
+          if (ent.getValue().getTaskId().getTaskType().equals(t)) {
+            p.add(Container.newInstance(ent.getKey(), null, null,
+                contToResourceMap.get(ent.getKey()),
+                Priority.newInstance(0), null));
+          }
+        }
+        return p;
+      }
+    };
+
+    PreemptionMessage pM = generatePreemptionMessage(preemptedContainers,
+        contToResourceMap, Resource.newInstance(minAlloc, 1), false);
+
+    CheckpointAMPreemptionPolicy policy = new CheckpointAMPreemptionPolicy();
+    policy.init(mActxt);
+
+    // memory asked for by the first resource request of the contract
+    ResourceRequest firstRequest = pM.getContract().getResourceRequest()
+        .get(0).getResourceRequest();
+    int supposedMemPreemption = firstRequest.getCapability().getMemory()
+        * firstRequest.getNumContainers();
+
+    // first round of preemption
+    policy.preempt(mPctxt, pM);
+    List<TaskAttemptId> preempting =
+        validatePreemption(pM, policy, supposedMemPreemption);
+
+    // redundant message
+    policy.preempt(mPctxt, pM);
+    List<TaskAttemptId> preempting2 =
+        validatePreemption(pM, policy, supposedMemPreemption);
+
+    // check that nothing got added
+    assertEquals(preempting, preempting2);
+
+    // simulate 2 task completions/successful preemption
+    assertTrue("expected at least two preempted attempts, got "
+        + preempting.size(), preempting.size() >= 2);
+    policy.handleCompletedContainer(preempting.get(0));
+    policy.handleCompletedContainer(preempting.get(1));
+
+    // remove from assignedContainers
+    Iterator<Map.Entry<ContainerId,TaskAttemptId>> it =
+        assignedContainers.entrySet().iterator();
+    while (it.hasNext()) {
+      Map.Entry<ContainerId,TaskAttemptId> ent = it.next();
+      if (ent.getValue().equals(preempting.get(0)) ||
+          ent.getValue().equals(preempting.get(1))) {
+        it.remove();
+      }
+    }
+
+    // one more message asking for preemption
+    policy.preempt(mPctxt, pM);
+
+    // triggers preemption of 2 more containers (i.e., the preemption set changes)
+    List<TaskAttemptId> preempting3 =
+        validatePreemption(pM, policy, supposedMemPreemption);
+    assertFalse("preemption set did not change after two completions",
+        preempting3.equals(preempting2));
+  }
+
+  /**
+   * Collects the attempts of the fixture that the policy currently reports
+   * as preempted and checks that all of them are reducers and that the
+   * memory of their containers is at least the requested amount and at most
+   * one minimum allocation above it.
+   *
+   * @param pM the preemption message that was sent (not inspected here)
+   * @param policy the policy under test
+   * @param supposedMemPreemption amount of memory the message asked for
+   * @return the preempted attempts, in iteration order of assignedContainers
+   */
+  private List<TaskAttemptId> validatePreemption(PreemptionMessage pM,
+      CheckpointAMPreemptionPolicy policy, int supposedMemPreemption) {
+    Resource effectivelyPreempted = Resource.newInstance(0, 0);
+
+    List<TaskAttemptId> preempting = new ArrayList<TaskAttemptId>();
+
+    for (Map.Entry<ContainerId, TaskAttemptId> ent :
+        assignedContainers.entrySet()) {
+      if (policy.isPreempted(ent.getValue())) {
+        Resources.addTo(effectivelyPreempted,
+            contToResourceMap.get(ent.getKey()));
+        // preempt only reducers
+        assertEquals(TaskType.REDUCE,
+            ent.getValue().getTaskId().getTaskType());
+        preempting.add(ent.getValue());
+      }
+    }
+
+    // JUnit assertions are used instead of the Java "assert" keyword, which
+    // is skipped when the JVM runs without -ea
+
+    // preempt enough
+    assertTrue(" preempted: " + effectivelyPreempted.getMemory(),
+        effectivelyPreempted.getMemory() >= supposedMemPreemption);
+
+    // but do not overshoot by more than one minimum allocation
+    assertTrue(" preempted: " + effectivelyPreempted.getMemory(),
+        effectivelyPreempted.getMemory() <= supposedMemPreemption + minAlloc);
+    return preempting;
+  }
+
+  /**
+   * Builds a preemption message for the given containers. The containers are
+   * copied and the caller's set is emptied. With {@code strict} set the copy
+   * becomes the strict container list of the allocation; otherwise it becomes
+   * the negotiable container list, accompanied by a single resource request
+   * for as many {@code minimumAllocation}-sized containers as are needed to
+   * cover the total memory of the given containers (rounded up).
+   *
+   * @param containerToPreempt containers to ask for; cleared by this call
+   * @param resPerCont resource held by each container
+   * @param minimumAllocation size of one unit of the resource request
+   * @param strict whether to build a strict or a negotiable request
+   * @return the message produced by generatePreemptionMessage(Allocation)
+   */
+  private PreemptionMessage generatePreemptionMessage(
+      Set<ContainerId> containerToPreempt,
+      HashMap<ContainerId, Resource> resPerCont,
+      Resource minimumAllocation, boolean strict) {
+
+    // freeze a private copy first, then drain the caller's set
+    final Set<ContainerId> snapshot = Collections.unmodifiableSet(
+        new HashSet<ContainerId>(containerToPreempt));
+    containerToPreempt.clear();
+
+    final Resource total = Resource.newInstance(0, 0);
+    for (ContainerId id : snapshot) {
+      Resources.addTo(total, resPerCont.get(id));
+    }
+
+    final int totalMemory = total.getMemory();
+    final double unitMemory = minimumAllocation.getMemory();
+    final int unitsNeeded = (int) Math.ceil(totalMemory / unitMemory);
+    final ResourceRequest request = ResourceRequest.newInstance(
+        Priority.newInstance(0), ResourceRequest.ANY,
+        minimumAllocation, unitsNeeded);
+
+    final Allocation allocation;
+    if (strict) {
+      allocation = new Allocation(null, null, snapshot, null, null);
+    } else {
+      allocation = new Allocation(null, null, null, snapshot,
+          Collections.singletonList(request));
+    }
+    return generatePreemptionMessage(allocation);
+  }
+
+
+  /**
+   * Translates an allocation into a preemption message. A strict contract is
+   * attached when the allocation carries a strict container list; a
+   * negotiable contract is attached when both the resource requests and the
+   * negotiable container list are present and non-empty.
+   *
+   * @param allocation source of the preemption requests
+   * @return the assembled message, or null when neither part applies
+   */
+  private PreemptionMessage generatePreemptionMessage(Allocation allocation) {
+    PreemptionMessage message = null;
+
+    // strict (non-negotiable) part
+    if (allocation.getStrictContainerPreemptions() != null) {
+      message = recordFactory.newRecordInstance(PreemptionMessage.class);
+      StrictPreemptionContract strictContract =
+          recordFactory.newRecordInstance(StrictPreemptionContract.class);
+      strictContract.setContainers(
+          wrapContainers(allocation.getStrictContainerPreemptions()));
+      message.setStrictContract(strictContract);
+    }
+
+    // negotiable part: needs both resource requests and containers
+    final boolean negotiable =
+        allocation.getResourcePreemptions() != null
+        && allocation.getResourcePreemptions().size() > 0
+        && allocation.getContainerPreemptions() != null
+        && allocation.getContainerPreemptions().size() > 0;
+    if (!negotiable) {
+      return message;
+    }
+
+    if (message == null) {
+      message = recordFactory.newRecordInstance(PreemptionMessage.class);
+    }
+    PreemptionContract negotiableContract =
+        recordFactory.newRecordInstance(PreemptionContract.class);
+    Set<PreemptionContainer> wrappedContainers =
+        wrapContainers(allocation.getContainerPreemptions());
+    List<PreemptionResourceRequest> wrappedRequests =
+        new ArrayList<PreemptionResourceRequest>();
+    for (ResourceRequest request : allocation.getResourcePreemptions()) {
+      PreemptionResourceRequest wrapped =
+          recordFactory.newRecordInstance(PreemptionResourceRequest.class);
+      wrapped.setResourceRequest(request);
+      wrappedRequests.add(wrapped);
+    }
+    negotiableContract.setContainers(wrappedContainers);
+    negotiableContract.setResourceRequest(wrappedRequests);
+    message.setContract(negotiableContract);
+    return message;
+  }
+
+  /** Wraps each container id into a freshly created PreemptionContainer. */
+  private Set<PreemptionContainer> wrapContainers(
+      Iterable<? extends ContainerId> ids) {
+    Set<PreemptionContainer> wrappedContainers =
+        new HashSet<PreemptionContainer>();
+    for (ContainerId id : ids) {
+      PreemptionContainer wrapped =
+          recordFactory.newRecordInstance(PreemptionContainer.class);
+      wrapped.setId(id);
+      wrappedContainers.add(wrapped);
+    }
+    return wrappedContainers;
+  }
+
+}
Propchange: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestCheckpointPreemptionPolicy.java
------------------------------------------------------------------------------
svn:eol-style = native
Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapred/LocalJobRunner.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapred/LocalJobRunner.java?rev=1553939&r1=1553938&r2=1553939&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapred/LocalJobRunner.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapred/LocalJobRunner.java Sat Dec 28 21:58:33 2013
@@ -53,6 +53,7 @@ import org.apache.hadoop.mapreduce.Queue
import org.apache.hadoop.mapreduce.TaskCompletionEvent;
import org.apache.hadoop.mapreduce.TaskTrackerInfo;
import org.apache.hadoop.mapreduce.TaskType;
+import org.apache.hadoop.mapreduce.checkpoint.TaskCheckpointID;
import org.apache.hadoop.mapreduce.protocol.ClientProtocol;
import org.apache.hadoop.mapreduce.security.token.delegation.DelegationTokenIdentifier;
import org.apache.hadoop.mapreduce.server.jobtracker.JTConfig;
@@ -575,10 +576,17 @@ public class LocalJobRunner implements C
// TaskUmbilicalProtocol methods
+ @Override
public JvmTask getTask(JvmContext context) { return null; }
- public synchronized boolean statusUpdate(TaskAttemptID taskId,
+ @Override
+ public synchronized AMFeedback statusUpdate(TaskAttemptID taskId,
TaskStatus taskStatus) throws IOException, InterruptedException {
+ AMFeedback feedback = new AMFeedback();
+ feedback.setTaskFound(true);
+ if (null == taskStatus) {
+ return feedback;
+ }
// Serialize as we would if distributed in order to make deep copy
ByteArrayOutputStream baos = new ByteArrayOutputStream();
DataOutputStream dos = new DataOutputStream(baos);
@@ -618,7 +626,7 @@ public class LocalJobRunner implements C
}
// ignore phase
- return true;
+ return feedback;
}
/** Return the current values of the counters for this job,
@@ -654,24 +662,24 @@ public class LocalJobRunner implements C
statusUpdate(taskid, taskStatus);
}
+ @Override
public void reportDiagnosticInfo(TaskAttemptID taskid, String trace) {
// Ignore for now
}
+ @Override
public void reportNextRecordRange(TaskAttemptID taskid,
SortedRanges.Range range) throws IOException {
LOG.info("Task " + taskid + " reportedNextRecordRange " + range);
}
- public boolean ping(TaskAttemptID taskid) throws IOException {
- return true;
- }
-
+ @Override
public boolean canCommit(TaskAttemptID taskid)
throws IOException {
return true;
}
+ @Override
public void done(TaskAttemptID taskId) throws IOException {
int taskIndex = mapIds.indexOf(taskId);
if (taskIndex >= 0) { // mapping
@@ -681,11 +689,13 @@ public class LocalJobRunner implements C
}
}
+ @Override
public synchronized void fsError(TaskAttemptID taskId, String message)
throws IOException {
LOG.fatal("FSError: "+ message + "from task: " + taskId);
}
+ @Override
public void shuffleError(TaskAttemptID taskId, String message) throws IOException {
LOG.fatal("shuffleError: "+ message + "from task: " + taskId);
}
@@ -695,12 +705,30 @@ public class LocalJobRunner implements C
LOG.fatal("Fatal: "+ msg + "from task: " + taskId);
}
+ @Override
public MapTaskCompletionEventsUpdate getMapCompletionEvents(JobID jobId,
int fromEventId, int maxLocs, TaskAttemptID id) throws IOException {
return new MapTaskCompletionEventsUpdate(
org.apache.hadoop.mapred.TaskCompletionEvent.EMPTY_ARRAY, false);
}
-
+
+ /**
+ * No-op in this implementation: the notification and the supplied task
+ * status are ignored.
+ */
+ @Override
+ public void preempted(TaskAttemptID taskId, TaskStatus taskStatus)
+ throws IOException, InterruptedException {
+ // ignore
+ }
+
+ /**
+ * Always returns null in this implementation; the task id is not looked at.
+ */
+ @Override
+ public TaskCheckpointID getCheckpointID(TaskID taskId) {
+ // ignore
+ return null;
+ }
+
+    /**
+     * No-op in this implementation: the checkpoint id is not recorded.
+     *
+     * @param taskId task the checkpoint belongs to (ignored)
+     * @param cid checkpoint id reported for the task (ignored)
+     */
+    @Override
+    public void setCheckpointID(TaskID taskId, TaskCheckpointID cid) {
+      // ignore
+    }
+
}
public LocalJobRunner(Configuration conf) throws IOException {
Added: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/AMFeedback.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/AMFeedback.java?rev=1553939&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/AMFeedback.java (added)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/AMFeedback.java Sat Dec 28 21:58:33 2013
@@ -0,0 +1,63 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+package org.apache.hadoop.mapred;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.io.Writable;
+
+/**
+ * This class is a simple struct to include both the taskFound information and
+ * a possible preemption request coming from the AM.
+ */
+public class AMFeedback implements Writable {
+
+ boolean taskFound;
+ boolean preemption;
+
+ public void setTaskFound(boolean t){
+ taskFound=t;
+ }
+
+ public boolean getTaskFound(){
+ return taskFound;
+ }
+
+ public void setPreemption(boolean preemption) {
+ this.preemption=preemption;
+ }
+
+ public boolean getPreemption() {
+ return preemption;
+ }
+
+ @Override
+ public void write(DataOutput out) throws IOException {
+ out.writeBoolean(taskFound);
+ out.writeBoolean(preemption);
+ }
+
+ @Override
+ public void readFields(DataInput in) throws IOException {
+ taskFound = in.readBoolean();
+ preemption = in.readBoolean();
+ }
+
+}
Propchange: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/AMFeedback.java
------------------------------------------------------------------------------
svn:eol-style = native
Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/Task.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/Task.java?rev=1553939&r1=1553938&r2=1553939&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/Task.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/Task.java Sat Dec 28 21:58:33 2013
@@ -187,6 +187,7 @@ abstract public class Task implements Wr
protected SecretKey tokenSecret;
protected SecretKey shuffleSecret;
protected GcTimeUpdater gcUpdater;
+ final AtomicBoolean mustPreempt = new AtomicBoolean(false);
////////////////////////////////////////////
// Constructors
@@ -711,6 +712,7 @@ abstract public class Task implements Wr
}
try {
boolean taskFound = true; // whether TT knows about this task
+ AMFeedback amFeedback = null;
// sleep for a bit
synchronized(lock) {
if (taskDone.get()) {
@@ -728,12 +730,14 @@ abstract public class Task implements Wr
taskStatus.statusUpdate(taskProgress.get(),
taskProgress.toString(),
counters);
- taskFound = umbilical.statusUpdate(taskId, taskStatus);
+ amFeedback = umbilical.statusUpdate(taskId, taskStatus);
+ taskFound = amFeedback.getTaskFound();
taskStatus.clearStatus();
}
else {
// send ping
- taskFound = umbilical.ping(taskId);
+ amFeedback = umbilical.statusUpdate(taskId, null);
+ taskFound = amFeedback.getTaskFound();
}
// if Task Tracker is not aware of our task ID (probably because it died and
@@ -744,6 +748,17 @@ abstract public class Task implements Wr
System.exit(66);
}
+ // Set a flag that says we should preempt; this flag is read by
+ // ReduceTasks in places of the execution where it is
+ // safe/easy to preempt
+ boolean lastPreempt = mustPreempt.get();
+ mustPreempt.set(mustPreempt.get() || amFeedback.getPreemption());
+
+ if (lastPreempt ^ mustPreempt.get()) {
+ LOG.info("PREEMPTION TASK: setting mustPreempt to " +
+ mustPreempt.get() + " given " + amFeedback.getPreemption() +
+ " for "+ taskId + " task status: " +taskStatus.getPhase());
+ }
sendProgress = resetProgressFlag();
remainingRetries = MAX_RETRIES;
}
@@ -992,10 +1007,17 @@ abstract public class Task implements Wr
public void done(TaskUmbilicalProtocol umbilical,
TaskReporter reporter
) throws IOException, InterruptedException {
- LOG.info("Task:" + taskId + " is done."
- + " And is in the process of committing");
updateCounters();
-
+ if (taskStatus.getRunState() == TaskStatus.State.PREEMPTED ) {
+ // If we are preempted, do no output promotion; signal done and exit
+ committer.commitTask(taskContext);
+ umbilical.preempted(taskId, taskStatus);
+ taskDone.set(true);
+ reporter.stopCommunicationThread();
+ return;
+ }
+ LOG.info("Task:" + taskId + " is done."
+ + " And is in the process of committing");
boolean commitRequired = isCommitRequired();
if (commitRequired) {
int retries = MAX_RETRIES;
@@ -1054,7 +1076,7 @@ abstract public class Task implements Wr
int retries = MAX_RETRIES;
while (true) {
try {
- if (!umbilical.statusUpdate(getTaskID(), taskStatus)) {
+ if (!umbilical.statusUpdate(getTaskID(), taskStatus).getTaskFound()) {
LOG.warn("Parent died. Exiting "+taskId);
System.exit(66);
}
@@ -1098,8 +1120,8 @@ abstract public class Task implements Wr
if (isMapTask() && conf.getNumReduceTasks() > 0) {
try {
Path mapOutput = mapOutputFile.getOutputFile();
- FileSystem localFS = FileSystem.getLocal(conf);
- return localFS.getFileStatus(mapOutput).getLen();
+ FileSystem fs = mapOutput.getFileSystem(conf);
+ return fs.getFileStatus(mapOutput).getLen();
} catch (IOException e) {
LOG.warn ("Could not find output size " , e);
}
Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/TaskStatus.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/TaskStatus.java?rev=1553939&r1=1553938&r2=1553939&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/TaskStatus.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/TaskStatus.java Sat Dec 28 21:58:33 2013
@@ -51,7 +51,7 @@ public abstract class TaskStatus impleme
@InterfaceAudience.Private
@InterfaceStability.Unstable
public static enum State {RUNNING, SUCCEEDED, FAILED, UNASSIGNED, KILLED,
- COMMIT_PENDING, FAILED_UNCLEAN, KILLED_UNCLEAN}
+ COMMIT_PENDING, FAILED_UNCLEAN, KILLED_UNCLEAN, PREEMPTED}
private final TaskAttemptID taskid;
private float progress;
Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/TaskUmbilicalProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/TaskUmbilicalProtocol.java?rev=1553939&r1=1553938&r2=1553939&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/TaskUmbilicalProtocol.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/TaskUmbilicalProtocol.java Sat Dec 28 21:58:33 2013
@@ -24,6 +24,9 @@ import org.apache.hadoop.classification.
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.ipc.VersionedProtocol;
import org.apache.hadoop.mapred.JvmTask;
+import org.apache.hadoop.mapreduce.checkpoint.CheckpointID;
+import org.apache.hadoop.mapreduce.checkpoint.FSCheckpointID;
+import org.apache.hadoop.mapreduce.checkpoint.TaskCheckpointID;
import org.apache.hadoop.mapreduce.security.token.JobTokenSelector;
import org.apache.hadoop.security.token.TokenInfo;
@@ -64,9 +67,10 @@ public interface TaskUmbilicalProtocol e
* Version 17 Modified TaskID to be aware of the new TaskTypes
* Version 18 Added numRequiredSlots to TaskStatus for MAPREDUCE-516
* Version 19 Added fatalError for child to communicate fatal errors to TT
+ * Version 20 Added methods to manage checkpoints
* */
- public static final long versionID = 19L;
+ public static final long versionID = 20L;
/**
* Called when a child task process starts, to get its task.
@@ -78,7 +82,8 @@ public interface TaskUmbilicalProtocol e
JvmTask getTask(JvmContext context) throws IOException;
/**
- * Report child's progress to parent.
+ * Report child's progress to parent. Also invoked to report still alive (used
+ * to be in ping). It returns an AMFeedback used to propagate preemption requests.
*
* @param taskId task-id of the child
* @param taskStatus status of the child
@@ -86,7 +91,7 @@ public interface TaskUmbilicalProtocol e
* @throws InterruptedException
* @return True if the task is known
*/
- boolean statusUpdate(TaskAttemptID taskId, TaskStatus taskStatus)
+ AMFeedback statusUpdate(TaskAttemptID taskId, TaskStatus taskStatus)
throws IOException, InterruptedException;
/** Report error messages back to parent. Calls should be sparing, since all
@@ -105,11 +110,6 @@ public interface TaskUmbilicalProtocol e
void reportNextRecordRange(TaskAttemptID taskid, SortedRanges.Range range)
throws IOException;
- /** Periodically called by child to check if parent is still alive.
- * @return True if the task is known
- */
- boolean ping(TaskAttemptID taskid) throws IOException;
-
/** Report that the task is successfully completed. Failure is assumed if
* the task process exits without calling this.
* @param taskid task's id
@@ -161,4 +161,33 @@ public interface TaskUmbilicalProtocol e
TaskAttemptID id)
throws IOException;
+ /**
+ * Report to the AM that the task has been successfully preempted.
+ *
+ * @param taskId task's id
+ * @param taskStatus status of the child
+ * @throws IOException
+ */
+ void preempted(TaskAttemptID taskId, TaskStatus taskStatus)
+ throws IOException, InterruptedException;
+
+ /**
+ * Return the latest CheckpointID for the given TaskID. This provides
+ * the task with a way to locate the checkpointed data and restart from
+ * that point in the computation.
+ *
+ * @param taskID task's id
+ * @return the most recent checkpoint (if any) for this task
+ * @throws IOException
+ */
+ TaskCheckpointID getCheckpointID(TaskID taskID);
+
+ /**
+ * Send a CheckpointID for a given TaskID to be stored in the AM,
+ * to later restart a task from this checkpoint.
+ * @param tid
+ * @param cid
+ */
+ void setCheckpointID(TaskID tid, TaskCheckpointID cid);
+
}
Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/checkpoint/TaskCheckpointID.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/checkpoint/TaskCheckpointID.java?rev=1553939&r1=1553938&r2=1553939&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/checkpoint/TaskCheckpointID.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/checkpoint/TaskCheckpointID.java Sat Dec 28 21:58:33 2013
@@ -34,37 +34,31 @@ import org.apache.hadoop.mapred.Counters
* cost of checkpoints and other counters. This is sent by the task to the AM
* to be stored and provided to the next execution of the same task.
*/
-public class TaskCheckpointID implements CheckpointID{
+public class TaskCheckpointID implements CheckpointID {
- FSCheckpointID rawId;
- private List<Path> partialOutput;
- private Counters counters;
+ final FSCheckpointID rawId;
+ private final List<Path> partialOutput;
+ private final Counters counters;
public TaskCheckpointID() {
- this.rawId = new FSCheckpointID();
- this.partialOutput = new ArrayList<Path>();
+ this(new FSCheckpointID(), new ArrayList<Path>(), new Counters());
}
public TaskCheckpointID(FSCheckpointID rawId, List<Path> partialOutput,
Counters counters) {
this.rawId = rawId;
this.counters = counters;
- if(partialOutput == null)
- this.partialOutput = new ArrayList<Path>();
- else
- this.partialOutput = partialOutput;
+ this.partialOutput = null == partialOutput
+ ? new ArrayList<Path>()
+ : partialOutput;
}
@Override
public void write(DataOutput out) throws IOException {
counters.write(out);
- if (partialOutput == null) {
- WritableUtils.writeVLong(out, 0L);
- } else {
- WritableUtils.writeVLong(out, partialOutput.size());
- for(Path p:partialOutput){
- Text.writeString(out, p.toString());
- }
+ WritableUtils.writeVLong(out, partialOutput.size());
+ for (Path p : partialOutput) {
+ Text.writeString(out, p.toString());
}
rawId.write(out);
}
@@ -74,21 +68,22 @@ public class TaskCheckpointID implements
partialOutput.clear();
counters.readFields(in);
long numPout = WritableUtils.readVLong(in);
- for(int i=0;i<numPout;i++)
+ for (int i = 0; i < numPout; i++) {
partialOutput.add(new Path(Text.readString(in)));
+ }
rawId.readFields(in);
}
@Override
- public boolean equals(Object other){
+ public boolean equals(Object other) {
if (other instanceof TaskCheckpointID){
- return this.rawId.equals(((TaskCheckpointID)other).rawId) &&
- this.counters.equals(((TaskCheckpointID) other).counters) &&
- this.partialOutput.containsAll(((TaskCheckpointID) other).partialOutput) &&
- ((TaskCheckpointID) other).partialOutput.containsAll(this.partialOutput);
- } else {
- return false;
+ TaskCheckpointID o = (TaskCheckpointID) other;
+ return rawId.equals(o.rawId) &&
+ counters.equals(o.counters) &&
+ partialOutput.containsAll(o.partialOutput) &&
+ o.partialOutput.containsAll(partialOutput);
}
+ return false;
}
@Override
@@ -110,7 +105,7 @@ public class TaskCheckpointID implements
return counters.findCounter(EnumCounter.CHECKPOINT_MS).getValue();
}
- public String toString(){
+ public String toString() {
return rawId.toString() + " counters:" + counters;
}
Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestMapProgress.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestMapProgress.java?rev=1553939&r1=1553938&r2=1553939&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestMapProgress.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestMapProgress.java Sat Dec 28 21:58:33 2013
@@ -20,7 +20,6 @@ package org.apache.hadoop.mapred;
import java.io.DataOutputStream;
import java.io.File;
import java.io.IOException;
-import java.util.Arrays;
import java.util.List;
import junit.framework.TestCase;
@@ -29,20 +28,17 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.ipc.ProtocolSignature;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.InputSplit;
-import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.OutputFormat;
-import org.apache.hadoop.mapreduce.jobhistory.JobSubmittedEvent;
+import org.apache.hadoop.mapreduce.checkpoint.TaskCheckpointID;
import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
import org.apache.hadoop.mapreduce.server.jobtracker.JTConfig;
-import org.apache.hadoop.mapreduce.split.JobSplitWriter;
-import org.apache.hadoop.mapreduce.split.SplitMetaInfoReader;
-import org.apache.hadoop.mapreduce.split.JobSplit.SplitMetaInfo;
import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitIndex;
import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitMetaInfo;
+import org.apache.hadoop.mapreduce.split.JobSplitWriter;
+import org.apache.hadoop.mapreduce.split.SplitMetaInfoReader;
import org.apache.hadoop.util.ReflectionUtils;
/**
@@ -110,11 +106,16 @@ public class TestMapProgress extends Tes
statusUpdate(taskId, taskStatus);
}
+ public void preempted(TaskAttemptID taskId, TaskStatus taskStatus)
+ throws IOException, InterruptedException {
+ statusUpdate(taskId, taskStatus);
+ }
+
public boolean canCommit(TaskAttemptID taskid) throws IOException {
return true;
}
- public boolean statusUpdate(TaskAttemptID taskId, TaskStatus taskStatus)
+ public AMFeedback statusUpdate(TaskAttemptID taskId, TaskStatus taskStatus)
throws IOException, InterruptedException {
StringBuffer buf = new StringBuffer("Task ");
buf.append(taskId);
@@ -128,7 +129,9 @@ public class TestMapProgress extends Tes
LOG.info(buf.toString());
// ignore phase
// ignore counters
- return true;
+ AMFeedback a = new AMFeedback();
+ a.setTaskFound(true);
+ return a;
}
public void reportDiagnosticInfo(TaskAttemptID taskid, String trace) throws IOException {
@@ -145,6 +148,17 @@ public class TestMapProgress extends Tes
SortedRanges.Range range) throws IOException {
LOG.info("Task " + taskid + " reportedNextRecordRange " + range);
}
+
+ @Override
+ public TaskCheckpointID getCheckpointID(TaskID taskId) {
+ // do nothing
+ return null;
+ }
+
+ @Override
+ public void setCheckpointID(TaskID downgrade, TaskCheckpointID cid) {
+ // do nothing
+ }
}
private FileSystem fs = null;