You are viewing a plain text version of this content. The canonical link for it is here.
Posted to mapreduce-commits@hadoop.apache.org by ss...@apache.org on 2013/02/13 20:23:51 UTC
svn commit: r1445874 - in
/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project: ./
hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/
hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/
Author: sseth
Date: Wed Feb 13 19:23:51 2013
New Revision: 1445874
URL: http://svn.apache.org/r1445874
Log:
MAPREDUCE-5000. Fixes getCounters when speculating by fixing the selection of the best attempt for a task. Contributed by Jason Lowe.
Modified:
hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt
hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java
hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskImpl.java
Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt?rev=1445874&r1=1445873&r2=1445874&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt Wed Feb 13 19:23:51 2013
@@ -65,6 +65,9 @@ Release 0.23.7 - UNRELEASED
MAPREDUCE-4992. AM hangs in RecoveryService when recovering tasks with
speculative attempts (Robert Parker via jlowe)
+ MAPREDUCE-5000. Fixes getCounters when speculating by fixing the selection
+ of the best attempt for a task. (Jason Lowe via sseth)
+
Release 0.23.6 - 2013-02-06
INCOMPATIBLE CHANGES
Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java?rev=1445874&r1=1445873&r2=1445874&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java Wed Feb 13 19:23:51 2013
@@ -532,6 +532,10 @@ public abstract class TaskImpl implement
//select the nextAttemptNumber with best progress
// always called inside the Read Lock
private TaskAttempt selectBestAttempt() {
+ if (successfulAttempt != null) {
+ return attempts.get(successfulAttempt);
+ }
+
float progress = 0f;
TaskAttempt result = null;
for (TaskAttempt at : attempts.values()) {
Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskImpl.java?rev=1445874&r1=1445873&r2=1445874&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskImpl.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskImpl.java Wed Feb 13 19:23:51 2013
@@ -35,6 +35,9 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.Task;
import org.apache.hadoop.mapred.TaskUmbilicalProtocol;
+import org.apache.hadoop.mapreduce.Counter;
+import org.apache.hadoop.mapreduce.Counters;
+import org.apache.hadoop.mapreduce.TaskCounter;
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.TaskInfo;
import org.apache.hadoop.mapreduce.security.token.JobTokenIdentifier;
import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitMetaInfo;
@@ -138,6 +141,7 @@ public class TestTaskImpl {
private float progress = 0;
private TaskAttemptState state = TaskAttemptState.NEW;
+ private Counters attemptCounters = TaskAttemptImpl.EMPTY_COUNTERS;
public MockTaskAttemptImpl(TaskId taskId, int id, EventHandler eventHandler,
TaskAttemptListener taskAttemptListener, Path jobFile, int partition,
@@ -172,7 +176,15 @@ public class TestTaskImpl {
public TaskAttemptState getState() {
return state;
}
-
+
+ @Override
+ public Counters getCounters() {
+ return attemptCounters;
+ }
+
+ public void setCounters(Counters counters) {
+ attemptCounters = counters;
+ }
}
private class MockTask extends Task {
@@ -625,4 +637,49 @@ public class TestTaskImpl {
TaskEventType.T_ATTEMPT_KILLED));
assertEquals(TaskState.FAILED, mockTask.getState());
}
+
+ @Test
+ public void testCountersWithSpeculation() {
+ mockTask = new MockTaskImpl(jobId, partition, dispatcher.getEventHandler(),
+ remoteJobConfFile, conf, taskAttemptListener, jobToken,
+ credentials, clock,
+ completedTasksFromPreviousRun, startCount,
+ metrics, appContext) {
+ @Override
+ protected int getMaxAttempts() {
+ return 1;
+ }
+ };
+ TaskId taskId = getNewTaskID();
+ scheduleTaskAttempt(taskId);
+ launchTaskAttempt(getLastAttempt().getAttemptId());
+ updateLastAttemptState(TaskAttemptState.RUNNING);
+ MockTaskAttemptImpl baseAttempt = getLastAttempt();
+
+ // add a speculative attempt
+ mockTask.handle(new TaskTAttemptEvent(getLastAttempt().getAttemptId(),
+ TaskEventType.T_ADD_SPEC_ATTEMPT));
+ launchTaskAttempt(getLastAttempt().getAttemptId());
+ updateLastAttemptState(TaskAttemptState.RUNNING);
+ MockTaskAttemptImpl specAttempt = getLastAttempt();
+ assertEquals(2, taskAttempts.size());
+
+ Counters specAttemptCounters = new Counters();
+ Counter cpuCounter = specAttemptCounters.findCounter(
+ TaskCounter.CPU_MILLISECONDS);
+ cpuCounter.setValue(1000);
+ specAttempt.setCounters(specAttemptCounters);
+
+ // have the spec attempt succeed but second attempt at 1.0 progress as well
+ commitTaskAttempt(specAttempt.getAttemptId());
+ specAttempt.setProgress(1.0f);
+ specAttempt.setState(TaskAttemptState.SUCCEEDED);
+ mockTask.handle(new TaskTAttemptEvent(specAttempt.getAttemptId(),
+ TaskEventType.T_ATTEMPT_SUCCEEDED));
+ assertEquals(TaskState.SUCCEEDED, mockTask.getState());
+ baseAttempt.setProgress(1.0f);
+
+ Counters taskCounters = mockTask.getCounters();
+ assertEquals("wrong counters for task", specAttemptCounters, taskCounters);
+ }
}