You are viewing a plain text version of this content. The canonical link for it is here.
Posted to mapreduce-commits@hadoop.apache.org by vi...@apache.org on 2011/10/12 08:51:32 UTC
svn commit: r1182230 - in /hadoop/common/trunk/hadoop-mapreduce-project: ./
hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/
hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java...
Author: vinodkv
Date: Wed Oct 12 06:51:32 2011
New Revision: 1182230
URL: http://svn.apache.org/viewvc?rev=1182230&view=rev
Log:
MAPREDUCE-3125. Modified TaskImpl to consider only non-failed, non-killed task-attempts for obtaining task's progress. Contributed by Hitesh Shah.
Added:
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskImpl.java
Modified:
hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java
Modified: hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt?rev=1182230&r1=1182229&r2=1182230&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt Wed Oct 12 06:51:32 2011
@@ -1585,6 +1585,9 @@ Release 0.23.0 - Unreleased
MAPREDUCE-3140. Fixed the invalid JobHistory URL for failed
applications. (Subroto Sanyal via vinodkv)
+ MAPREDUCE-3125. Modified TaskImpl to consider only non-failed, non-killed
+ task-attempts for obtaining task's progress. (Hitesh Shah via vinodkv)
+
Release 0.22.0 - Unreleased
INCOMPATIBLE CHANGES
Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java?rev=1182230&r1=1182229&r2=1182230&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java Wed Oct 12 06:51:32 2011
@@ -441,10 +441,20 @@ public abstract class TaskImpl implement
float progress = 0f;
TaskAttempt result = null;
for (TaskAttempt at : attempts.values()) {
+ switch (at.getState()) {
+
+ // ignore all failed and killed task attempts
+ case FAIL_CONTAINER_CLEANUP:
+ case FAIL_TASK_CLEANUP:
+ case FAILED:
+ case KILL_CONTAINER_CLEANUP:
+ case KILL_TASK_CLEANUP:
+ case KILLED:
+ continue;
+ }
if (result == null) {
result = at; //The first time around
}
- //TODO: consider the nextAttemptNumber only if it is not failed/killed ?
// calculate the best progress
if (at.getProgress() > progress) {
result = at;
@@ -496,7 +506,7 @@ public abstract class TaskImpl implement
break;
case 1:
- Map newAttempts
+ Map<TaskAttemptId, TaskAttempt> newAttempts
= new LinkedHashMap<TaskAttemptId, TaskAttempt>(maxAttempts);
newAttempts.putAll(attempts);
attempts = newAttempts;
Added: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskImpl.java?rev=1182230&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskImpl.java (added)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskImpl.java Wed Oct 12 06:51:32 2011
@@ -0,0 +1,398 @@
+package org.apache.hadoop.mapreduce.v2.app.job.impl;
+
+import static org.junit.Assert.*;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.Task;
+import org.apache.hadoop.mapred.TaskUmbilicalProtocol;
+import org.apache.hadoop.mapreduce.OutputCommitter;
+import org.apache.hadoop.mapreduce.security.token.JobTokenIdentifier;
+import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitMetaInfo;
+import org.apache.hadoop.mapreduce.v2.api.records.JobId;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptState;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskState;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
+import org.apache.hadoop.mapreduce.v2.app.TaskAttemptListener;
+import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEvent;
+import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEventType;
+import org.apache.hadoop.mapreduce.v2.app.job.event.TaskTAttemptEvent;
+import org.apache.hadoop.mapreduce.v2.app.metrics.MRAppMetrics;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.TokenIdentifier;
+import org.apache.hadoop.yarn.Clock;
+import org.apache.hadoop.yarn.SystemClock;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.server.resourcemanager.resourcetracker.InlineDispatcher;
+import org.apache.hadoop.yarn.util.Records;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+public class TestTaskImpl {
+
+ private static final Log LOG = LogFactory.getLog(TestTaskImpl.class);
+
+ private Configuration conf;
+ private TaskAttemptListener taskAttemptListener;
+ private OutputCommitter committer;
+ private Token<JobTokenIdentifier> jobToken;
+ private JobId jobId;
+ private Path remoteJobConfFile;
+ private Collection<Token<? extends TokenIdentifier>> fsTokens;
+ private Clock clock;
+ private Set<TaskId> completedTasksFromPreviousRun;
+ private MRAppMetrics metrics;
+ private TaskImpl mockTask;
+ private ApplicationId appId;
+ private TaskSplitMetaInfo taskSplitMetaInfo;
+ private String[] dataLocations = new String[0];
+ private final TaskType taskType = TaskType.MAP;
+
+ private int startCount = 0;
+ private int taskCounter = 0;
+ private final int partition = 1;
+
+ private InlineDispatcher dispatcher;
+ private List<MockTaskAttemptImpl> taskAttempts;
+
+ private class MockTaskImpl extends TaskImpl {
+
+ private int taskAttemptCounter = 0;
+
+ @SuppressWarnings("rawtypes")
+ public MockTaskImpl(JobId jobId, int partition,
+ EventHandler eventHandler, Path remoteJobConfFile, Configuration conf,
+ TaskAttemptListener taskAttemptListener, OutputCommitter committer,
+ Token<JobTokenIdentifier> jobToken,
+ Collection<Token<? extends TokenIdentifier>> fsTokens, Clock clock,
+ Set<TaskId> completedTasksFromPreviousRun, int startCount,
+ MRAppMetrics metrics) {
+ super(jobId, taskType , partition, eventHandler,
+ remoteJobConfFile, conf, taskAttemptListener, committer,
+ jobToken, fsTokens, clock,
+ completedTasksFromPreviousRun, startCount, metrics);
+ }
+
+ @Override
+ public TaskType getType() {
+ return taskType;
+ }
+
+ @Override
+ protected TaskAttemptImpl createAttempt() {
+ MockTaskAttemptImpl attempt = new MockTaskAttemptImpl(getID(), ++taskAttemptCounter,
+ eventHandler, taskAttemptListener, remoteJobConfFile, partition,
+ conf, committer, jobToken, fsTokens, clock);
+ taskAttempts.add(attempt);
+ return attempt;
+ }
+
+ @Override
+ protected int getMaxAttempts() {
+ return 100;
+ }
+
+ }
+
+ private class MockTaskAttemptImpl extends TaskAttemptImpl {
+
+ private float progress = 0;
+ private TaskAttemptState state = TaskAttemptState.NEW;
+ private TaskAttemptId attemptId;
+
+ @SuppressWarnings("rawtypes")
+ public MockTaskAttemptImpl(TaskId taskId, int id, EventHandler eventHandler,
+ TaskAttemptListener taskAttemptListener, Path jobFile, int partition,
+ Configuration conf, OutputCommitter committer,
+ Token<JobTokenIdentifier> jobToken,
+ Collection<Token<? extends TokenIdentifier>> fsTokens, Clock clock) {
+ super(taskId, id, eventHandler, taskAttemptListener, jobFile, partition, conf,
+ dataLocations, committer, jobToken, fsTokens, clock);
+ attemptId = Records.newRecord(TaskAttemptId.class);
+ attemptId.setId(id);
+ attemptId.setTaskId(taskId);
+ }
+
+ public TaskAttemptId getAttemptId() {
+ return attemptId;
+ }
+
+ @Override
+ protected Task createRemoteTask() {
+ return new MockTask();
+ }
+
+ public float getProgress() {
+ return progress ;
+ }
+
+ public void setProgress(float progress) {
+ this.progress = progress;
+ }
+
+ public void setState(TaskAttemptState state) {
+ this.state = state;
+ }
+
+ public TaskAttemptState getState() {
+ return state;
+ }
+
+ }
+
+ private class MockTask extends Task {
+
+ @Override
+ @SuppressWarnings("deprecation")
+ public void run(JobConf job, TaskUmbilicalProtocol umbilical)
+ throws IOException, ClassNotFoundException, InterruptedException {
+ return;
+ }
+
+ @Override
+ public boolean isMapTask() {
+ return true;
+ }
+
+ }
+
+ @Before
+ @SuppressWarnings("unchecked")
+ public void setup() {
+ dispatcher = new InlineDispatcher();
+
+ ++startCount;
+
+ conf = new Configuration();
+ taskAttemptListener = mock(TaskAttemptListener.class);
+ committer = mock(OutputCommitter.class);
+ jobToken = (Token<JobTokenIdentifier>) mock(Token.class);
+ remoteJobConfFile = mock(Path.class);
+ fsTokens = null;
+ clock = new SystemClock();
+ metrics = mock(MRAppMetrics.class);
+ dataLocations = new String[1];
+
+ appId = Records.newRecord(ApplicationId.class);
+ appId.setClusterTimestamp(System.currentTimeMillis());
+ appId.setId(1);
+
+ jobId = Records.newRecord(JobId.class);
+ jobId.setId(1);
+ jobId.setAppId(appId);
+
+ taskSplitMetaInfo = mock(TaskSplitMetaInfo.class);
+ when(taskSplitMetaInfo.getLocations()).thenReturn(dataLocations);
+
+ taskAttempts = new ArrayList<MockTaskAttemptImpl>();
+
+ mockTask = new MockTaskImpl(jobId, partition, dispatcher.getEventHandler(),
+ remoteJobConfFile, conf, taskAttemptListener, committer, jobToken,
+ fsTokens, clock,
+ completedTasksFromPreviousRun, startCount,
+ metrics);
+
+ }
+
+ @After
+ public void teardown() {
+ taskAttempts.clear();
+ }
+
+ private TaskId getNewTaskID() {
+ TaskId taskId = Records.newRecord(TaskId.class);
+ taskId.setId(++taskCounter);
+ taskId.setJobId(jobId);
+ taskId.setTaskType(mockTask.getType());
+ return taskId;
+ }
+
+ private void scheduleTaskAttempt(TaskId taskId) {
+ mockTask.handle(new TaskEvent(taskId,
+ TaskEventType.T_SCHEDULE));
+ assertTaskScheduledState();
+ }
+
+ private void killTask(TaskId taskId) {
+ mockTask.handle(new TaskEvent(taskId,
+ TaskEventType.T_KILL));
+ assertTaskKillWaitState();
+ }
+
+ private void killScheduledTaskAttempt(TaskAttemptId attemptId) {
+ mockTask.handle(new TaskTAttemptEvent(attemptId,
+ TaskEventType.T_ATTEMPT_KILLED));
+ assertTaskScheduledState();
+ }
+
+ private void launchTaskAttempt(TaskAttemptId attemptId) {
+ mockTask.handle(new TaskTAttemptEvent(attemptId,
+ TaskEventType.T_ATTEMPT_LAUNCHED));
+ assertTaskRunningState();
+ }
+
+ private MockTaskAttemptImpl getLastAttempt() {
+ return taskAttempts.get(taskAttempts.size()-1);
+ }
+
+ private void updateLastAttemptProgress(float p) {
+ getLastAttempt().setProgress(p);
+ }
+
+ private void updateLastAttemptState(TaskAttemptState s) {
+ getLastAttempt().setState(s);
+ }
+
+ private void killRunningTaskAttempt(TaskAttemptId attemptId) {
+ mockTask.handle(new TaskTAttemptEvent(attemptId,
+ TaskEventType.T_ATTEMPT_KILLED));
+ assertTaskRunningState();
+ }
+
+ /**
+ * {@link TaskState#NEW}
+ */
+ private void assertTaskNewState() {
+ assertEquals(mockTask.getState(), TaskState.NEW);
+ }
+
+ /**
+ * {@link TaskState#SCHEDULED}
+ */
+ private void assertTaskScheduledState() {
+ assertEquals(mockTask.getState(), TaskState.SCHEDULED);
+ }
+
+ /**
+ * {@link TaskState#RUNNING}
+ */
+ private void assertTaskRunningState() {
+ assertEquals(mockTask.getState(), TaskState.RUNNING);
+ }
+
+ /**
+ * {@link TaskState#KILL_WAIT}
+ */
+ private void assertTaskKillWaitState() {
+ assertEquals(mockTask.getState(), TaskState.KILL_WAIT);
+ }
+
+ @Test
+ public void testInit() {
+ LOG.info("--- START: testInit ---");
+ assertTaskNewState();
+ assert(taskAttempts.size() == 0);
+ }
+
+ @Test
+ /**
+ * {@link TaskState#NEW}->{@link TaskState#SCHEDULED}
+ */
+ public void testScheduleTask() {
+ LOG.info("--- START: testScheduleTask ---");
+ TaskId taskId = getNewTaskID();
+ scheduleTaskAttempt(taskId);
+ }
+
+ @Test
+ /**
+ * {@link TaskState#SCHEDULED}->{@link TaskState#KILL_WAIT}
+ */
+ public void testKillScheduledTask() {
+ LOG.info("--- START: testKillScheduledTask ---");
+ TaskId taskId = getNewTaskID();
+ scheduleTaskAttempt(taskId);
+ killTask(taskId);
+ }
+
+ @Test
+ /**
+ * Kill attempt
+ * {@link TaskState#SCHEDULED}->{@link TaskState#SCHEDULED}
+ */
+ public void testKillScheduledTaskAttempt() {
+ LOG.info("--- START: testKillScheduledTaskAttempt ---");
+ TaskId taskId = getNewTaskID();
+ scheduleTaskAttempt(taskId);
+ killScheduledTaskAttempt(getLastAttempt().getAttemptId());
+ }
+
+ @Test
+ /**
+ * Launch attempt
+ * {@link TaskState#SCHEDULED}->{@link TaskState#RUNNING}
+ */
+ public void testLaunchTaskAttempt() {
+ LOG.info("--- START: testLaunchTaskAttempt ---");
+ TaskId taskId = getNewTaskID();
+ scheduleTaskAttempt(taskId);
+ launchTaskAttempt(getLastAttempt().getAttemptId());
+ }
+
+ @Test
+ /**
+ * Kill running attempt
+ * {@link TaskState#RUNNING}->{@link TaskState#RUNNING}
+ */
+ public void testKillRunningTaskAttempt() {
+ LOG.info("--- START: testKillRunningTaskAttempt ---");
+ TaskId taskId = getNewTaskID();
+ scheduleTaskAttempt(taskId);
+ launchTaskAttempt(getLastAttempt().getAttemptId());
+ killRunningTaskAttempt(getLastAttempt().getAttemptId());
+ }
+
+ @Test
+ public void testTaskProgress() {
+ LOG.info("--- START: testTaskProgress ---");
+
+ // launch task
+ TaskId taskId = getNewTaskID();
+ scheduleTaskAttempt(taskId);
+ float progress = 0f;
+ assert(mockTask.getProgress() == progress);
+ launchTaskAttempt(getLastAttempt().getAttemptId());
+
+ // update attempt1
+ progress = 50f;
+ updateLastAttemptProgress(progress);
+ assert(mockTask.getProgress() == progress);
+ progress = 100f;
+ updateLastAttemptProgress(progress);
+ assert(mockTask.getProgress() == progress);
+
+ progress = 0f;
+ // mark first attempt as killed
+ updateLastAttemptState(TaskAttemptState.KILLED);
+ assert(mockTask.getProgress() == progress);
+
+ // kill first attempt
+ // should trigger a new attempt
+ // as no successful attempts
+ killRunningTaskAttempt(getLastAttempt().getAttemptId());
+ assert(taskAttempts.size() == 2);
+
+ assert(mockTask.getProgress() == 0f);
+ launchTaskAttempt(getLastAttempt().getAttemptId());
+ progress = 50f;
+ updateLastAttemptProgress(progress);
+ assert(mockTask.getProgress() == progress);
+
+ }
+
+}