You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by jl...@apache.org on 2015/06/30 23:25:11 UTC
hadoop git commit: MAPREDUCE-6384. Add the last reporting reducer
info for too many fetch failure diagnostics. Contributed by Chang Li (cherry
picked from commit b6ba56457c6b01dae795c11d587c3fe3855ee707)
Repository: hadoop
Updated Branches:
refs/heads/branch-2 6719fc94e -> fdfd5be2b
MAPREDUCE-6384. Add the last reporting reducer info for too many fetch failure diagnostics. Contributed by Chang Li
(cherry picked from commit b6ba56457c6b01dae795c11d587c3fe3855ee707)
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/fdfd5be2
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/fdfd5be2
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/fdfd5be2
Branch: refs/heads/branch-2
Commit: fdfd5be2b3daf54e659ecd6ad1364fc361329ccd
Parents: 6719fc9
Author: Jason Lowe <jl...@apache.org>
Authored: Tue Jun 30 21:22:30 2015 +0000
Committer: Jason Lowe <jl...@apache.org>
Committed: Tue Jun 30 21:24:38 2015 +0000
----------------------------------------------------------------------
hadoop-mapreduce-project/CHANGES.txt | 3 +
.../event/JobTaskAttemptFetchFailureEvent.java | 9 +-
.../TaskAttemptTooManyFetchFailureEvent.java | 50 +++++++
.../mapreduce/v2/app/job/impl/JobImpl.java | 7 +-
.../v2/app/job/impl/TaskAttemptImpl.java | 15 +-
.../mapreduce/v2/app/TestFetchFailure.java | 31 ++--
.../v2/app/job/impl/TestTaskAttempt.java | 143 ++++++++++---------
7 files changed, 168 insertions(+), 90 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/fdfd5be2/hadoop-mapreduce-project/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt
index b4472a3..e627e6b 100644
--- a/hadoop-mapreduce-project/CHANGES.txt
+++ b/hadoop-mapreduce-project/CHANGES.txt
@@ -97,6 +97,9 @@ Release 2.8.0 - UNRELEASED
MAPREDUCE-6408. Queue name and user name should be printed on the job page.
(Siqi Li via gera)
+ MAPREDUCE-6384. Add the last reporting reducer info for too many fetch
+ failure diagnostics (Chang Li via jlowe)
+
OPTIMIZATIONS
BUG FIXES
http://git-wip-us.apache.org/repos/asf/hadoop/blob/fdfd5be2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/JobTaskAttemptFetchFailureEvent.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/JobTaskAttemptFetchFailureEvent.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/JobTaskAttemptFetchFailureEvent.java
index 37e2034..787711c 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/JobTaskAttemptFetchFailureEvent.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/JobTaskAttemptFetchFailureEvent.java
@@ -28,13 +28,15 @@ public class JobTaskAttemptFetchFailureEvent extends JobEvent {
private final TaskAttemptId reduce;
private final List<TaskAttemptId> maps;
+ private final String hostname;
public JobTaskAttemptFetchFailureEvent(TaskAttemptId reduce,
- List<TaskAttemptId> maps) {
- super(reduce.getTaskId().getJobId(),
+ List<TaskAttemptId> maps, String host) {
+ super(reduce.getTaskId().getJobId(),
JobEventType.JOB_TASK_ATTEMPT_FETCH_FAILURE);
this.reduce = reduce;
this.maps = maps;
+ this.hostname = host;
}
public List<TaskAttemptId> getMaps() {
@@ -45,4 +47,7 @@ public class JobTaskAttemptFetchFailureEvent extends JobEvent {
return reduce;
}
+ public String getHost() {
+ return hostname;
+ }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/fdfd5be2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/TaskAttemptTooManyFetchFailureEvent.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/TaskAttemptTooManyFetchFailureEvent.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/TaskAttemptTooManyFetchFailureEvent.java
new file mode 100644
index 0000000..662e712
--- /dev/null
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/TaskAttemptTooManyFetchFailureEvent.java
@@ -0,0 +1,50 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.mapreduce.v2.app.job.event;
+
+import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
+
+/**
+ * A TA_TOO_MANY_FETCH_FAILURE event that also names the reporting reducer.
+ */
+public class TaskAttemptTooManyFetchFailureEvent extends TaskAttemptEvent {
+ private TaskAttemptId reduceID;
+ private String reduceHostname;
+
+ /**
+ * Create a new event of type TA_TOO_MANY_FETCH_FAILURE.
+ * @param attemptId the id of the mapper task attempt.
+ * @param reduceId the id of the reporting reduce task attempt.
+ * @param reduceHost the hostname of the reporting reduce task attempt.
+ */
+ public TaskAttemptTooManyFetchFailureEvent(TaskAttemptId attemptId,
+ TaskAttemptId reduceId, String reduceHost) {
+ super(attemptId, TaskAttemptEventType.TA_TOO_MANY_FETCH_FAILURE);
+ this.reduceID = reduceId;
+ this.reduceHostname = reduceHost;
+ }
+
+ public TaskAttemptId getReduceId() {
+ return reduceID;
+ }
+
+ public String getReduceHost() {
+ return reduceHostname;
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/fdfd5be2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java
index 6e9f13c..2c48019 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java
@@ -103,9 +103,8 @@ import org.apache.hadoop.mapreduce.v2.app.job.event.JobTaskAttemptCompletedEvent
import org.apache.hadoop.mapreduce.v2.app.job.event.JobTaskAttemptFetchFailureEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.JobTaskEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.JobUpdatedNodesEvent;
-import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent;
-import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptKillEvent;
+import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptTooManyFetchFailureEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEventType;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskRecoverEvent;
@@ -1914,8 +1913,8 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job,
&& failureRate >= job.getMaxAllowedFetchFailuresFraction()) {
LOG.info("Too many fetch-failures for output of task attempt: " +
mapId + " ... raising fetch failure to map");
- job.eventHandler.handle(new TaskAttemptEvent(mapId,
- TaskAttemptEventType.TA_TOO_MANY_FETCH_FAILURE));
+ job.eventHandler.handle(new TaskAttemptTooManyFetchFailureEvent(mapId,
+ fetchfailureEvent.getReduce(), fetchfailureEvent.getHost()));
job.fetchFailuresMapping.remove(mapId);
}
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/fdfd5be2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java
index 2dd9086..b7def2d 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java
@@ -95,6 +95,7 @@ import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptKillEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptRecoverEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptStatusUpdateEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptStatusUpdateEvent.TaskAttemptStatus;
+import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptTooManyFetchFailureEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEventType;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskTAttemptEvent;
import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerLauncher;
@@ -1923,12 +1924,17 @@ public abstract class TaskAttemptImpl implements
@SuppressWarnings("unchecked")
@Override
public void transition(TaskAttemptImpl taskAttempt, TaskAttemptEvent event) {
+ TaskAttemptTooManyFetchFailureEvent fetchFailureEvent =
+ (TaskAttemptTooManyFetchFailureEvent) event;
// too many fetch failure can only happen for map tasks
Preconditions
.checkArgument(taskAttempt.getID().getTaskId().getTaskType() == TaskType.MAP);
//add to diagnostic
- taskAttempt.addDiagnosticInfo("Too Many fetch failures.Failing the attempt");
-
+ taskAttempt.addDiagnosticInfo("Too many fetch failures."
+ + " Failing the attempt. Last failure reported by " +
+ fetchFailureEvent.getReduceId() +
+ " from host " + fetchFailureEvent.getReduceHost());
+
if (taskAttempt.getLaunchTime() != 0) {
taskAttempt.eventHandler
.handle(createJobCounterUpdateEventTAFailed(taskAttempt, true));
@@ -2211,8 +2217,11 @@ public abstract class TaskAttemptImpl implements
//this only will happen in reduce attempt type
if (taskAttempt.reportedStatus.fetchFailedMaps != null &&
taskAttempt.reportedStatus.fetchFailedMaps.size() > 0) {
+ String hostname = taskAttempt.container == null ? "UNKNOWN"
+ : taskAttempt.container.getNodeId().getHost();
taskAttempt.eventHandler.handle(new JobTaskAttemptFetchFailureEvent(
- taskAttempt.attemptId, taskAttempt.reportedStatus.fetchFailedMaps));
+ taskAttempt.attemptId, taskAttempt.reportedStatus.fetchFailedMaps,
+ hostname));
}
}
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/fdfd5be2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestFetchFailure.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestFetchFailure.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestFetchFailure.java
index 4e4e2e7..8d25079 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestFetchFailure.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestFetchFailure.java
@@ -94,10 +94,10 @@ public class TestFetchFailure {
app.waitForState(reduceAttempt, TaskAttemptState.RUNNING);
//send 3 fetch failures from reduce to trigger map re execution
- sendFetchFailure(app, reduceAttempt, mapAttempt1);
- sendFetchFailure(app, reduceAttempt, mapAttempt1);
- sendFetchFailure(app, reduceAttempt, mapAttempt1);
-
+ sendFetchFailure(app, reduceAttempt, mapAttempt1, "host");
+ sendFetchFailure(app, reduceAttempt, mapAttempt1, "host");
+ sendFetchFailure(app, reduceAttempt, mapAttempt1, "host");
+
//wait for map Task state move back to RUNNING
app.waitForState(mapTask, TaskState.RUNNING);
@@ -215,9 +215,9 @@ public class TestFetchFailure {
app.waitForState(reduceAttempt, TaskAttemptState.RUNNING);
//send 3 fetch failures from reduce to trigger map re execution
- sendFetchFailure(app, reduceAttempt, mapAttempt1);
- sendFetchFailure(app, reduceAttempt, mapAttempt1);
- sendFetchFailure(app, reduceAttempt, mapAttempt1);
+ sendFetchFailure(app, reduceAttempt, mapAttempt1, "host");
+ sendFetchFailure(app, reduceAttempt, mapAttempt1, "host");
+ sendFetchFailure(app, reduceAttempt, mapAttempt1, "host");
//wait for map Task state move back to RUNNING
app.waitForState(mapTask, TaskState.RUNNING);
@@ -324,8 +324,8 @@ public class TestFetchFailure {
updateStatus(app, reduceAttempt3, Phase.SHUFFLE);
//send 2 fetch failures from reduce to prepare for map re execution
- sendFetchFailure(app, reduceAttempt, mapAttempt1);
- sendFetchFailure(app, reduceAttempt, mapAttempt1);
+ sendFetchFailure(app, reduceAttempt, mapAttempt1, "host1");
+ sendFetchFailure(app, reduceAttempt2, mapAttempt1, "host2");
//We should not re-launch the map task yet
assertEquals(TaskState.SUCCEEDED, mapTask.getState());
@@ -333,7 +333,7 @@ public class TestFetchFailure {
updateStatus(app, reduceAttempt3, Phase.REDUCE);
//send 3rd fetch failures from reduce to trigger map re execution
- sendFetchFailure(app, reduceAttempt, mapAttempt1);
+ sendFetchFailure(app, reduceAttempt3, mapAttempt1, "host3");
//wait for map Task state move back to RUNNING
app.waitForState(mapTask, TaskState.RUNNING);
@@ -342,6 +342,11 @@ public class TestFetchFailure {
Assert.assertEquals("Map TaskAttempt state not correct",
TaskAttemptState.FAILED, mapAttempt1.getState());
+ Assert.assertEquals(mapAttempt1.getDiagnostics().get(0),
+ "Too many fetch failures. Failing the attempt. "
+ + "Last failure reported by "
+ + reduceAttempt3.getID().toString() + " from host host3");
+
Assert.assertEquals("Num attempts in Map Task not correct",
2, mapTask.getAttempts().size());
@@ -410,7 +415,6 @@ public class TestFetchFailure {
Assert.assertEquals("Unexpected map event", convertedEvents[2],
mapEvents[0]);
}
-
private void updateStatus(MRApp app, TaskAttempt attempt, Phase phase) {
TaskAttemptStatusUpdateEvent.TaskAttemptStatus status = new TaskAttemptStatusUpdateEvent.TaskAttemptStatus();
@@ -430,11 +434,12 @@ public class TestFetchFailure {
}
private void sendFetchFailure(MRApp app, TaskAttempt reduceAttempt,
- TaskAttempt mapAttempt) {
+ TaskAttempt mapAttempt, String hostname) {
app.getContext().getEventHandler().handle(
new JobTaskAttemptFetchFailureEvent(
reduceAttempt.getID(),
- Arrays.asList(new TaskAttemptId[] {mapAttempt.getID()})));
+ Arrays.asList(new TaskAttemptId[] {mapAttempt.getID()}),
+ hostname));
}
static class MRAppWithHistory extends MRApp {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/fdfd5be2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskAttempt.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskAttempt.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskAttempt.java
index 79b88d8..a88a935 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskAttempt.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskAttempt.java
@@ -70,6 +70,7 @@ import org.apache.hadoop.mapreduce.v2.app.job.event.JobEventType;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptContainerAssignedEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptContainerLaunchedEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptDiagnosticsUpdateEvent;
+import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptTooManyFetchFailureEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType;
import org.apache.hadoop.mapreduce.v2.app.rm.ContainerRequestEvent;
@@ -507,6 +508,9 @@ public class TestTaskAttempt{
JobId jobId = MRBuilderUtils.newJobId(appId, 1);
TaskId taskId = MRBuilderUtils.newTaskId(jobId, 1, TaskType.MAP);
TaskAttemptId attemptId = MRBuilderUtils.newTaskAttemptId(taskId, 0);
+ TaskId reduceTaskId = MRBuilderUtils.newTaskId(jobId, 1, TaskType.REDUCE);
+ TaskAttemptId reduceTAId =
+ MRBuilderUtils.newTaskAttemptId(reduceTaskId, 0);
Path jobFile = mock(Path.class);
MockEventHandler eventHandler = new MockEventHandler();
@@ -554,8 +558,8 @@ public class TestTaskAttempt{
assertEquals("Task attempt is not in succeeded state", taImpl.getState(),
TaskAttemptState.SUCCEEDED);
- taImpl.handle(new TaskAttemptEvent(attemptId,
- TaskAttemptEventType.TA_TOO_MANY_FETCH_FAILURE));
+ taImpl.handle(new TaskAttemptTooManyFetchFailureEvent(attemptId,
+ reduceTAId, "Host"));
assertEquals("Task attempt is not in FAILED state", taImpl.getState(),
TaskAttemptState.FAILED);
taImpl.handle(new TaskAttemptEvent(attemptId,
@@ -735,72 +739,75 @@ public class TestTaskAttempt{
@Test
public void testFetchFailureAttemptFinishTime() throws Exception{
- ApplicationId appId = ApplicationId.newInstance(1, 2);
- ApplicationAttemptId appAttemptId =
- ApplicationAttemptId.newInstance(appId, 0);
- JobId jobId = MRBuilderUtils.newJobId(appId, 1);
- TaskId taskId = MRBuilderUtils.newTaskId(jobId, 1, TaskType.MAP);
- TaskAttemptId attemptId = MRBuilderUtils.newTaskAttemptId(taskId, 0);
- Path jobFile = mock(Path.class);
-
- MockEventHandler eventHandler = new MockEventHandler();
- TaskAttemptListener taListener = mock(TaskAttemptListener.class);
- when(taListener.getAddress()).thenReturn(
- new InetSocketAddress("localhost", 0));
-
- JobConf jobConf = new JobConf();
- jobConf.setClass("fs.file.impl", StubbedFS.class, FileSystem.class);
- jobConf.setBoolean("fs.file.impl.disable.cache", true);
- jobConf.set(JobConf.MAPRED_MAP_TASK_ENV, "");
- jobConf.set(MRJobConfig.APPLICATION_ATTEMPT_ID, "10");
-
- TaskSplitMetaInfo splits = mock(TaskSplitMetaInfo.class);
- when(splits.getLocations()).thenReturn(new String[] {"127.0.0.1"});
-
- AppContext appCtx = mock(AppContext.class);
- ClusterInfo clusterInfo = mock(ClusterInfo.class);
- when(appCtx.getClusterInfo()).thenReturn(clusterInfo);
- setupTaskAttemptFinishingMonitor(eventHandler, jobConf, appCtx);
-
- TaskAttemptImpl taImpl =
- new MapTaskAttemptImpl(taskId, 1, eventHandler, jobFile, 1,
- splits, jobConf, taListener,mock(Token.class), new Credentials(),
- new SystemClock(), appCtx);
-
- NodeId nid = NodeId.newInstance("127.0.0.1", 0);
- ContainerId contId = ContainerId.newContainerId(appAttemptId, 3);
- Container container = mock(Container.class);
- when(container.getId()).thenReturn(contId);
- when(container.getNodeId()).thenReturn(nid);
- when(container.getNodeHttpAddress()).thenReturn("localhost:0");
-
- taImpl.handle(new TaskAttemptEvent(attemptId,
- TaskAttemptEventType.TA_SCHEDULE));
- taImpl.handle(new TaskAttemptContainerAssignedEvent(attemptId,
- container, mock(Map.class)));
- taImpl.handle(new TaskAttemptContainerLaunchedEvent(attemptId, 0));
- taImpl.handle(new TaskAttemptEvent(attemptId,
- TaskAttemptEventType.TA_DONE));
- taImpl.handle(new TaskAttemptEvent(attemptId,
- TaskAttemptEventType.TA_CONTAINER_COMPLETED));
-
- assertEquals("Task attempt is not in succeeded state", taImpl.getState(),
- TaskAttemptState.SUCCEEDED);
-
- assertTrue("Task Attempt finish time is not greater than 0",
- taImpl.getFinishTime() > 0);
-
- Long finishTime = taImpl.getFinishTime();
- Thread.sleep(5);
- taImpl.handle(new TaskAttemptEvent(attemptId,
- TaskAttemptEventType.TA_TOO_MANY_FETCH_FAILURE));
-
- assertEquals("Task attempt is not in Too Many Fetch Failure state",
- taImpl.getState(), TaskAttemptState.FAILED);
-
- assertEquals("After TA_TOO_MANY_FETCH_FAILURE,"
- + " Task attempt finish time is not the same ",
- finishTime, Long.valueOf(taImpl.getFinishTime()));
+ ApplicationId appId = ApplicationId.newInstance(1, 2);
+ ApplicationAttemptId appAttemptId =
+ ApplicationAttemptId.newInstance(appId, 0);
+ JobId jobId = MRBuilderUtils.newJobId(appId, 1);
+ TaskId taskId = MRBuilderUtils.newTaskId(jobId, 1, TaskType.MAP);
+ TaskAttemptId attemptId = MRBuilderUtils.newTaskAttemptId(taskId, 0);
+ TaskId reducetaskId = MRBuilderUtils.newTaskId(jobId, 1, TaskType.REDUCE);
+ TaskAttemptId reduceTAId =
+ MRBuilderUtils.newTaskAttemptId(reducetaskId, 0);
+ Path jobFile = mock(Path.class);
+
+ MockEventHandler eventHandler = new MockEventHandler();
+ TaskAttemptListener taListener = mock(TaskAttemptListener.class);
+ when(taListener.getAddress()).thenReturn(
+ new InetSocketAddress("localhost", 0));
+
+ JobConf jobConf = new JobConf();
+ jobConf.setClass("fs.file.impl", StubbedFS.class, FileSystem.class);
+ jobConf.setBoolean("fs.file.impl.disable.cache", true);
+ jobConf.set(JobConf.MAPRED_MAP_TASK_ENV, "");
+ jobConf.set(MRJobConfig.APPLICATION_ATTEMPT_ID, "10");
+
+ TaskSplitMetaInfo splits = mock(TaskSplitMetaInfo.class);
+ when(splits.getLocations()).thenReturn(new String[] {"127.0.0.1"});
+
+ AppContext appCtx = mock(AppContext.class);
+ ClusterInfo clusterInfo = mock(ClusterInfo.class);
+ when(appCtx.getClusterInfo()).thenReturn(clusterInfo);
+ setupTaskAttemptFinishingMonitor(eventHandler, jobConf, appCtx);
+
+ TaskAttemptImpl taImpl =
+ new MapTaskAttemptImpl(taskId, 1, eventHandler, jobFile, 1,
+ splits, jobConf, taListener,mock(Token.class), new Credentials(),
+ new SystemClock(), appCtx);
+
+ NodeId nid = NodeId.newInstance("127.0.0.1", 0);
+ ContainerId contId = ContainerId.newContainerId(appAttemptId, 3);
+ Container container = mock(Container.class);
+ when(container.getId()).thenReturn(contId);
+ when(container.getNodeId()).thenReturn(nid);
+ when(container.getNodeHttpAddress()).thenReturn("localhost:0");
+
+ taImpl.handle(new TaskAttemptEvent(attemptId,
+ TaskAttemptEventType.TA_SCHEDULE));
+ taImpl.handle(new TaskAttemptContainerAssignedEvent(attemptId,
+ container, mock(Map.class)));
+ taImpl.handle(new TaskAttemptContainerLaunchedEvent(attemptId, 0));
+ taImpl.handle(new TaskAttemptEvent(attemptId,
+ TaskAttemptEventType.TA_DONE));
+ taImpl.handle(new TaskAttemptEvent(attemptId,
+ TaskAttemptEventType.TA_CONTAINER_COMPLETED));
+
+ assertEquals("Task attempt is not in succeeded state", taImpl.getState(),
+ TaskAttemptState.SUCCEEDED);
+
+ assertTrue("Task Attempt finish time is not greater than 0",
+ taImpl.getFinishTime() > 0);
+
+ Long finishTime = taImpl.getFinishTime();
+ Thread.sleep(5);
+ taImpl.handle(new TaskAttemptTooManyFetchFailureEvent(attemptId,
+ reduceTAId, "Host"));
+
+ assertEquals("Task attempt is not in Too Many Fetch Failure state",
+ taImpl.getState(), TaskAttemptState.FAILED);
+
+ assertEquals("After TA_TOO_MANY_FETCH_FAILURE,"
+ + " Task attempt finish time is not the same ",
+ finishTime, Long.valueOf(taImpl.getFinishTime()));
}
@Test