You are viewing a plain text version of this content. The canonical link for it is here.
Posted to mapreduce-commits@hadoop.apache.org by vi...@apache.org on 2011/10/17 17:17:06 UTC
svn commit: r1185247 - in /hadoop/common/trunk/hadoop-mapreduce-project: ./
hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/
hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/...
Author: vinodkv
Date: Mon Oct 17 15:17:06 2011
New Revision: 1185247
URL: http://svn.apache.org/viewvc?rev=1185247&view=rev
Log:
MAPREDUCE-3032. Fixed TaskAttemptImpl so that JobHistory can have error information about failed tasks. Contributed by Devaraj K.
Added:
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskAttempt.java
Modified:
hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl.java
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/TaskAttemptStatusUpdateEvent.java
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/recover/RecoveryService.java
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestMRClientService.java
Modified: hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt?rev=1185247&r1=1185246&r2=1185247&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt Mon Oct 17 15:17:06 2011
@@ -1645,6 +1645,9 @@ Release 0.23.0 - Unreleased
MAPREDUCE-3127. Changed default value of yarn.resourcemanager.acl.enable
to true and added some more documentation. (acmurthy)
+ MAPREDUCE-3032. Fixed TaskAttemptImpl so that JobHistory can have error
+ information about failed tasks. (Devaraj K via vinodkv)
+
Release 0.22.0 - Unreleased
INCOMPATIBLE CHANGES
Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl.java?rev=1185247&r1=1185246&r2=1185247&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl.java Mon Oct 17 15:17:06 2011
@@ -302,8 +302,6 @@ public class TaskAttemptListenerImpl ext
taskAttemptStatus.progress = taskStatus.getProgress();
LOG.info("Progress of TaskAttempt " + taskAttemptID + " is : "
+ taskStatus.getProgress());
- // Task sends the diagnostic information to the TT
- taskAttemptStatus.diagnosticInfo = taskStatus.getDiagnosticInfo();
// Task sends the updated state-string to the TT.
taskAttemptStatus.stateString = taskStatus.getStateString();
// Set the output-size when map-task finishes. Set by the task itself.
Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/TaskAttemptStatusUpdateEvent.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/TaskAttemptStatusUpdateEvent.java?rev=1185247&r1=1185246&r2=1185247&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/TaskAttemptStatusUpdateEvent.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/TaskAttemptStatusUpdateEvent.java Mon Oct 17 15:17:06 2011
@@ -48,7 +48,6 @@ public class TaskAttemptStatusUpdateEven
public TaskAttemptId id;
public float progress;
public Counters counters;
- public String diagnosticInfo;
public String stateString;
public Phase phase;
public long outputSize;
Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java?rev=1185247&r1=1185246&r2=1185247&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java Mon Oct 17 15:17:06 2011
@@ -118,6 +118,8 @@ import org.apache.hadoop.yarn.state.Stat
import org.apache.hadoop.yarn.state.StateMachineFactory;
import org.apache.hadoop.yarn.util.ConverterUtils;
import org.apache.hadoop.yarn.util.RackResolver;
+import org.apache.hadoop.util.StringUtils;
+
/**
* Implementation of TaskAttempt interface.
@@ -434,6 +436,9 @@ public abstract class TaskAttemptImpl im
//this is the last status reported by the REMOTE running attempt
private TaskAttemptStatus reportedStatus;
+
+ private static final String LINE_SEPARATOR = System
+ .getProperty("line.separator");
public TaskAttemptImpl(TaskId taskId, int i,
@SuppressWarnings("rawtypes") EventHandler eventHandler,
@@ -758,7 +763,7 @@ public abstract class TaskAttemptImpl im
result.setStartTime(launchTime);
result.setFinishTime(finishTime);
result.setShuffleFinishTime(this.reportedStatus.shuffleFinishTime);
- result.setDiagnosticInfo(reportedStatus.diagnosticInfo);
+ result.setDiagnosticInfo(StringUtils.join(LINE_SEPARATOR, getDiagnostics()));
result.setPhase(reportedStatus.phase);
result.setStateString(reportedStatus.stateString);
result.setCounters(getCounters());
@@ -895,7 +900,7 @@ public abstract class TaskAttemptImpl im
TypeConverter.fromYarn(taskAttempt.attemptId.getTaskId().getTaskType()),
attemptState.toString(), taskAttempt.finishTime,
taskAttempt.nodeHostName == null ? "UNKNOWN" : taskAttempt.nodeHostName,
- taskAttempt.reportedStatus.diagnosticInfo.toString(),
+ StringUtils.join(LINE_SEPARATOR, taskAttempt.getDiagnostics()),
taskAttempt.getProgressSplitBlock().burst());
return tauce;
}
@@ -1353,8 +1358,6 @@ public abstract class TaskAttemptImpl im
(new SpeculatorEvent
(taskAttempt.reportedStatus, taskAttempt.clock.getTime()));
- //add to diagnostic
- taskAttempt.addDiagnosticInfo(newReportedStatus.diagnosticInfo);
taskAttempt.updateProgressSplits();
//if fetch failures are present, send the fetch failure event to job
@@ -1382,7 +1385,6 @@ public abstract class TaskAttemptImpl im
private void initTaskAttemptStatus(TaskAttemptStatus result) {
result.progress = 0.0f;
- result.diagnosticInfo = "";
result.phase = Phase.STARTING;
result.stateString = "NEW";
result.taskState = TaskAttemptState.NEW;
Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/recover/RecoveryService.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/recover/RecoveryService.java?rev=1185247&r1=1185246&r2=1185247&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/recover/RecoveryService.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/recover/RecoveryService.java Mon Oct 17 15:17:06 2011
@@ -334,7 +334,6 @@ public class RecoveryService extends Com
TaskAttemptStatus taskAttemptStatus = new TaskAttemptStatus();
taskAttemptStatus.id = yarnAttemptID;
taskAttemptStatus.progress = 1.0f;
- taskAttemptStatus.diagnosticInfo = "";
taskAttemptStatus.stateString = attemptInfo.getTaskStatus();
// taskAttemptStatus.outputSize = attemptInfo.getOutputSize();
taskAttemptStatus.phase = Phase.CLEANUP;
Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestMRClientService.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestMRClientService.java?rev=1185247&r1=1185246&r2=1185247&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestMRClientService.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestMRClientService.java Mon Oct 17 15:17:06 2011
@@ -83,7 +83,6 @@ public class TestMRClientService {
TaskAttemptStatus taskAttemptStatus = new TaskAttemptStatus();
taskAttemptStatus.id = attempt.getID();
taskAttemptStatus.progress = 0.5f;
- taskAttemptStatus.diagnosticInfo = diagnostic2;
taskAttemptStatus.stateString = "RUNNING";
taskAttemptStatus.taskState = TaskAttemptState.RUNNING;
taskAttemptStatus.phase = Phase.MAP;
Added: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskAttempt.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskAttempt.java?rev=1185247&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskAttempt.java (added)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskAttempt.java Mon Oct 17 15:17:06 2011
@@ -0,0 +1,114 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.v2.app.job.impl;
+
+import java.util.Iterator;
+import java.util.Map;
+
+import junit.framework.Assert;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEvent;
+import org.apache.hadoop.mapreduce.jobhistory.TaskAttemptUnsuccessfulCompletion;
+import org.apache.hadoop.mapreduce.v2.api.records.JobState;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptReport;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptState;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskState;
+import org.apache.hadoop.mapreduce.v2.app.AppContext;
+import org.apache.hadoop.mapreduce.v2.app.MRApp;
+import org.apache.hadoop.mapreduce.v2.app.job.Job;
+import org.apache.hadoop.mapreduce.v2.app.job.Task;
+import org.apache.hadoop.mapreduce.v2.app.job.TaskAttempt;
+import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptDiagnosticsUpdateEvent;
+import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent;
+import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType;
+import org.apache.hadoop.yarn.event.EventHandler;
+import org.junit.Test;
+
+public class TestTaskAttempt{
+
+ @Test
+ public void testMRAppHistoryForMap() throws Exception {
+ MRApp app = new FailingAttemptsMRApp(1, 0);
+ testMRAppHistory(app);
+ }
+
+ @Test
+ public void testMRAppHistoryForReduce() throws Exception {
+ MRApp app = new FailingAttemptsMRApp(0, 1);
+ testMRAppHistory(app);
+ }
+
+ private void testMRAppHistory(MRApp app) throws Exception {
+ Configuration conf = new Configuration();
+ Job job = app.submit(conf);
+ app.waitForState(job, JobState.FAILED);
+ Map<TaskId, Task> tasks = job.getTasks();
+
+ Assert.assertEquals("Num tasks is not correct", 1, tasks.size());
+ Task task = tasks.values().iterator().next();
+ Assert.assertEquals("Task state not correct", TaskState.FAILED, task
+ .getReport().getTaskState());
+ Map<TaskAttemptId, TaskAttempt> attempts = tasks.values().iterator().next()
+ .getAttempts();
+ Assert.assertEquals("Num attempts is not correct", 4, attempts.size());
+
+ Iterator<TaskAttempt> it = attempts.values().iterator();
+ TaskAttemptReport report = it.next().getReport();
+ Assert.assertEquals("Attempt state not correct", TaskAttemptState.FAILED,
+ report.getTaskAttemptState());
+ Assert.assertEquals("Diagnostic Information is not Correct",
+ "Test Diagnostic Event", report.getDiagnosticInfo());
+ report = it.next().getReport();
+ Assert.assertEquals("Attempt state not correct", TaskAttemptState.FAILED,
+ report.getTaskAttemptState());
+ }
+
+ static class FailingAttemptsMRApp extends MRApp {
+ FailingAttemptsMRApp(int maps, int reduces) {
+ super(maps, reduces, true, "FailingAttemptsMRApp", true);
+ }
+
+ @Override
+ protected void attemptLaunched(TaskAttemptId attemptID) {
+ getContext().getEventHandler().handle(
+ new TaskAttemptDiagnosticsUpdateEvent(attemptID,
+ "Test Diagnostic Event"));
+ getContext().getEventHandler().handle(
+ new TaskAttemptEvent(attemptID, TaskAttemptEventType.TA_FAILMSG));
+ }
+
+ protected EventHandler<JobHistoryEvent> createJobHistoryHandler(
+ AppContext context) {
+ return new EventHandler<JobHistoryEvent>() {
+ @Override
+ public void handle(JobHistoryEvent event) {
+ if (event.getType() == org.apache.hadoop.mapreduce.jobhistory.EventType.MAP_ATTEMPT_FAILED) {
+ TaskAttemptUnsuccessfulCompletion datum = (TaskAttemptUnsuccessfulCompletion) event
+ .getHistoryEvent().getDatum();
+ Assert.assertEquals("Diagnostic Information is not Correct",
+ "Test Diagnostic Event", datum.get(6).toString());
+ }
+ }
+ };
+ }
+ }
+}