You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ar...@apache.org on 2015/07/01 23:10:14 UTC

[28/46] hadoop git commit: MAPREDUCE-6384. Add the last reporting reducer info for too many fetch failure diagnostics. Contributed by Chang Li

MAPREDUCE-6384. Add the last reporting reducer info for too many fetch failure diagnostics. Contributed by Chang Li


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/b6ba5645
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/b6ba5645
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/b6ba5645

Branch: refs/heads/HDFS-7240
Commit: b6ba56457c6b01dae795c11d587c3fe3855ee707
Parents: 147e020
Author: Jason Lowe <jl...@apache.org>
Authored: Tue Jun 30 21:22:30 2015 +0000
Committer: Jason Lowe <jl...@apache.org>
Committed: Tue Jun 30 21:22:30 2015 +0000

----------------------------------------------------------------------
 hadoop-mapreduce-project/CHANGES.txt            |   3 +
 .../event/JobTaskAttemptFetchFailureEvent.java  |   9 +-
 .../TaskAttemptTooManyFetchFailureEvent.java    |  50 +++++++
 .../mapreduce/v2/app/job/impl/JobImpl.java      |   7 +-
 .../v2/app/job/impl/TaskAttemptImpl.java        |  15 +-
 .../mapreduce/v2/app/TestFetchFailure.java      |  31 ++--
 .../v2/app/job/impl/TestTaskAttempt.java        | 143 ++++++++++---------
 7 files changed, 168 insertions(+), 90 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/b6ba5645/hadoop-mapreduce-project/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt
index 785fce8..5a4d826 100644
--- a/hadoop-mapreduce-project/CHANGES.txt
+++ b/hadoop-mapreduce-project/CHANGES.txt
@@ -364,6 +364,9 @@ Release 2.8.0 - UNRELEASED
     MAPREDUCE-6408. Queue name and user name should be printed on the job page.
     (Siqi Li via gera)
 
+    MAPREDUCE-6384. Add the last reporting reducer info for too many fetch
+    failure diagnostics (Chang Li via jlowe)
+
   OPTIMIZATIONS
 
   BUG FIXES

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b6ba5645/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/JobTaskAttemptFetchFailureEvent.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/JobTaskAttemptFetchFailureEvent.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/JobTaskAttemptFetchFailureEvent.java
index 37e2034..787711c 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/JobTaskAttemptFetchFailureEvent.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/JobTaskAttemptFetchFailureEvent.java
@@ -28,13 +28,15 @@ public class JobTaskAttemptFetchFailureEvent extends JobEvent {
 
   private final TaskAttemptId reduce;
   private final List<TaskAttemptId> maps;
+  private final String hostname;
 
   public JobTaskAttemptFetchFailureEvent(TaskAttemptId reduce, 
-      List<TaskAttemptId> maps) {
-    super(reduce.getTaskId().getJobId(), 
+      List<TaskAttemptId> maps, String host) {
+    super(reduce.getTaskId().getJobId(),
         JobEventType.JOB_TASK_ATTEMPT_FETCH_FAILURE);
     this.reduce = reduce;
     this.maps = maps;
+    this.hostname = host;
   }
 
   public List<TaskAttemptId> getMaps() {
@@ -45,4 +47,7 @@ public class JobTaskAttemptFetchFailureEvent extends JobEvent {
     return reduce;
   }
 
+  public String getHost() {
+    return hostname;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b6ba5645/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/TaskAttemptTooManyFetchFailureEvent.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/TaskAttemptTooManyFetchFailureEvent.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/TaskAttemptTooManyFetchFailureEvent.java
new file mode 100644
index 0000000..662e712
--- /dev/null
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/TaskAttemptTooManyFetchFailureEvent.java
@@ -0,0 +1,50 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.mapreduce.v2.app.job.event;
+
+import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
+
+/**
+ * TaskAttemptTooManyFetchFailureEvent is used for TA_TOO_MANY_FETCH_FAILURE.
+ */
+public class TaskAttemptTooManyFetchFailureEvent extends TaskAttemptEvent {
+  private final TaskAttemptId reduceId;
+  private final String reduceHostname;
+
+  /**
+   * Create a new TaskAttemptTooManyFetchFailureEvent.
+   * @param attemptId the id of the mapper task attempt.
+   * @param reduceId the id of the reporting reduce task attempt.
+   * @param reduceHost the hostname of the reporting reduce task attempt.
+   */
+  public TaskAttemptTooManyFetchFailureEvent(TaskAttemptId attemptId,
+      TaskAttemptId reduceId, String reduceHost) {
+    super(attemptId, TaskAttemptEventType.TA_TOO_MANY_FETCH_FAILURE);
+    this.reduceId = reduceId;
+    this.reduceHostname = reduceHost;
+  }
+  /** @return the id of the reporting reduce task attempt. */
+  public TaskAttemptId getReduceId() {
+    return reduceId;
+  }
+  /** @return the hostname of the reporting reduce task attempt. */
+  public String getReduceHost() {
+    return reduceHostname;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b6ba5645/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java
index 6e9f13c..2c48019 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java
@@ -103,9 +103,8 @@ import org.apache.hadoop.mapreduce.v2.app.job.event.JobTaskAttemptCompletedEvent
 import org.apache.hadoop.mapreduce.v2.app.job.event.JobTaskAttemptFetchFailureEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.JobTaskEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.JobUpdatedNodesEvent;
-import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent;
-import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptKillEvent;
+import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptTooManyFetchFailureEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEventType;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskRecoverEvent;
@@ -1914,8 +1913,8 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job,
             && failureRate >= job.getMaxAllowedFetchFailuresFraction()) {
           LOG.info("Too many fetch-failures for output of task attempt: " + 
               mapId + " ... raising fetch failure to map");
-          job.eventHandler.handle(new TaskAttemptEvent(mapId, 
-              TaskAttemptEventType.TA_TOO_MANY_FETCH_FAILURE));
+          job.eventHandler.handle(new TaskAttemptTooManyFetchFailureEvent(mapId,
+              fetchfailureEvent.getReduce(), fetchfailureEvent.getHost()));
           job.fetchFailuresMapping.remove(mapId);
         }
       }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b6ba5645/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java
index 3055a25..3fa42fe 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java
@@ -95,6 +95,7 @@ import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptKillEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptRecoverEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptStatusUpdateEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptStatusUpdateEvent.TaskAttemptStatus;
+import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptTooManyFetchFailureEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEventType;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskTAttemptEvent;
 import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerLauncher;
@@ -1916,12 +1917,17 @@ public abstract class TaskAttemptImpl implements
     @SuppressWarnings("unchecked")
     @Override
     public void transition(TaskAttemptImpl taskAttempt, TaskAttemptEvent event) {
+      TaskAttemptTooManyFetchFailureEvent fetchFailureEvent =
+          (TaskAttemptTooManyFetchFailureEvent) event;
       // too many fetch failure can only happen for map tasks
       Preconditions
           .checkArgument(taskAttempt.getID().getTaskId().getTaskType() == TaskType.MAP);
       //add to diagnostic
-      taskAttempt.addDiagnosticInfo("Too Many fetch failures.Failing the attempt");
-      
+      taskAttempt.addDiagnosticInfo("Too many fetch failures."
+          + " Failing the attempt. Last failure reported by " +
+          fetchFailureEvent.getReduceId() +
+          " from host " + fetchFailureEvent.getReduceHost());
+
       if (taskAttempt.getLaunchTime() != 0) {
         taskAttempt.eventHandler
             .handle(createJobCounterUpdateEventTAFailed(taskAttempt, true));
@@ -2225,8 +2231,11 @@ public abstract class TaskAttemptImpl implements
       //this only will happen in reduce attempt type
       if (taskAttempt.reportedStatus.fetchFailedMaps != null && 
           taskAttempt.reportedStatus.fetchFailedMaps.size() > 0) {
+        String hostname = taskAttempt.container == null ? "UNKNOWN"
+            : taskAttempt.container.getNodeId().getHost();
         taskAttempt.eventHandler.handle(new JobTaskAttemptFetchFailureEvent(
-            taskAttempt.attemptId, taskAttempt.reportedStatus.fetchFailedMaps));
+            taskAttempt.attemptId, taskAttempt.reportedStatus.fetchFailedMaps,
+                hostname));
       }
     }
   }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b6ba5645/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestFetchFailure.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestFetchFailure.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestFetchFailure.java
index 4e4e2e7..8d25079 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestFetchFailure.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestFetchFailure.java
@@ -94,10 +94,10 @@ public class TestFetchFailure {
     app.waitForState(reduceAttempt, TaskAttemptState.RUNNING);
     
     //send 3 fetch failures from reduce to trigger map re execution
-    sendFetchFailure(app, reduceAttempt, mapAttempt1);
-    sendFetchFailure(app, reduceAttempt, mapAttempt1);
-    sendFetchFailure(app, reduceAttempt, mapAttempt1);
-    
+    sendFetchFailure(app, reduceAttempt, mapAttempt1, "host");
+    sendFetchFailure(app, reduceAttempt, mapAttempt1, "host");
+    sendFetchFailure(app, reduceAttempt, mapAttempt1, "host");
+
     //wait for map Task state move back to RUNNING
     app.waitForState(mapTask, TaskState.RUNNING);
     
@@ -215,9 +215,9 @@ public class TestFetchFailure {
     app.waitForState(reduceAttempt, TaskAttemptState.RUNNING);
 
     //send 3 fetch failures from reduce to trigger map re execution
-    sendFetchFailure(app, reduceAttempt, mapAttempt1);
-    sendFetchFailure(app, reduceAttempt, mapAttempt1);
-    sendFetchFailure(app, reduceAttempt, mapAttempt1);
+    sendFetchFailure(app, reduceAttempt, mapAttempt1, "host");
+    sendFetchFailure(app, reduceAttempt, mapAttempt1, "host");
+    sendFetchFailure(app, reduceAttempt, mapAttempt1, "host");
 
     //wait for map Task state move back to RUNNING
     app.waitForState(mapTask, TaskState.RUNNING);
@@ -324,8 +324,8 @@ public class TestFetchFailure {
     updateStatus(app, reduceAttempt3, Phase.SHUFFLE);
 
     //send 2 fetch failures from reduce to prepare for map re execution
-    sendFetchFailure(app, reduceAttempt, mapAttempt1);
-    sendFetchFailure(app, reduceAttempt, mapAttempt1);
+    sendFetchFailure(app, reduceAttempt, mapAttempt1, "host1");
+    sendFetchFailure(app, reduceAttempt2, mapAttempt1, "host2");
 
     //We should not re-launch the map task yet
     assertEquals(TaskState.SUCCEEDED, mapTask.getState());
@@ -333,7 +333,7 @@ public class TestFetchFailure {
     updateStatus(app, reduceAttempt3, Phase.REDUCE);
 
     //send 3rd fetch failures from reduce to trigger map re execution
-    sendFetchFailure(app, reduceAttempt, mapAttempt1);
+    sendFetchFailure(app, reduceAttempt3, mapAttempt1, "host3");
 
     //wait for map Task state move back to RUNNING
     app.waitForState(mapTask, TaskState.RUNNING);
@@ -342,6 +342,11 @@ public class TestFetchFailure {
     Assert.assertEquals("Map TaskAttempt state not correct",
         TaskAttemptState.FAILED, mapAttempt1.getState());
 
+    Assert.assertEquals(mapAttempt1.getDiagnostics().get(0),
+            "Too many fetch failures. Failing the attempt. "
+            + "Last failure reported by "
+            + reduceAttempt3.getID().toString() + " from host host3");
+
     Assert.assertEquals("Num attempts in Map Task not correct",
         2, mapTask.getAttempts().size());
     
@@ -410,7 +415,6 @@ public class TestFetchFailure {
     Assert.assertEquals("Unexpected map event", convertedEvents[2],
         mapEvents[0]);
   }
-  
 
   private void updateStatus(MRApp app, TaskAttempt attempt, Phase phase) {
     TaskAttemptStatusUpdateEvent.TaskAttemptStatus status = new TaskAttemptStatusUpdateEvent.TaskAttemptStatus();
@@ -430,11 +434,12 @@ public class TestFetchFailure {
   }
 
   private void sendFetchFailure(MRApp app, TaskAttempt reduceAttempt, 
-      TaskAttempt mapAttempt) {
+      TaskAttempt mapAttempt, String hostname) {
     app.getContext().getEventHandler().handle(
         new JobTaskAttemptFetchFailureEvent(
             reduceAttempt.getID(), 
-            Arrays.asList(new TaskAttemptId[] {mapAttempt.getID()})));
+            Arrays.asList(new TaskAttemptId[] {mapAttempt.getID()}),
+                hostname));
   }
   
   static class MRAppWithHistory extends MRApp {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b6ba5645/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskAttempt.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskAttempt.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskAttempt.java
index 79b88d8..a88a935 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskAttempt.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskAttempt.java
@@ -70,6 +70,7 @@ import org.apache.hadoop.mapreduce.v2.app.job.event.JobEventType;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptContainerAssignedEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptContainerLaunchedEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptDiagnosticsUpdateEvent;
+import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptTooManyFetchFailureEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType;
 import org.apache.hadoop.mapreduce.v2.app.rm.ContainerRequestEvent;
@@ -507,6 +508,9 @@ public class TestTaskAttempt{
     JobId jobId = MRBuilderUtils.newJobId(appId, 1);
     TaskId taskId = MRBuilderUtils.newTaskId(jobId, 1, TaskType.MAP);
     TaskAttemptId attemptId = MRBuilderUtils.newTaskAttemptId(taskId, 0);
+    TaskId reduceTaskId = MRBuilderUtils.newTaskId(jobId, 1, TaskType.REDUCE);
+    TaskAttemptId reduceTAId =
+        MRBuilderUtils.newTaskAttemptId(reduceTaskId, 0);
     Path jobFile = mock(Path.class);
 
     MockEventHandler eventHandler = new MockEventHandler();
@@ -554,8 +558,8 @@ public class TestTaskAttempt{
 
     assertEquals("Task attempt is not in succeeded state", taImpl.getState(),
         TaskAttemptState.SUCCEEDED);
-    taImpl.handle(new TaskAttemptEvent(attemptId,
-        TaskAttemptEventType.TA_TOO_MANY_FETCH_FAILURE));
+    taImpl.handle(new TaskAttemptTooManyFetchFailureEvent(attemptId,
+        reduceTAId, "Host"));
     assertEquals("Task attempt is not in FAILED state", taImpl.getState(),
         TaskAttemptState.FAILED);
     taImpl.handle(new TaskAttemptEvent(attemptId,
@@ -735,72 +739,75 @@ public class TestTaskAttempt{
     
   @Test
   public void testFetchFailureAttemptFinishTime() throws Exception{
-	ApplicationId appId = ApplicationId.newInstance(1, 2);
-	ApplicationAttemptId appAttemptId =
-	ApplicationAttemptId.newInstance(appId, 0);
-	JobId jobId = MRBuilderUtils.newJobId(appId, 1);
-	TaskId taskId = MRBuilderUtils.newTaskId(jobId, 1, TaskType.MAP);
-	TaskAttemptId attemptId = MRBuilderUtils.newTaskAttemptId(taskId, 0);
-	Path jobFile = mock(Path.class);
-
-	MockEventHandler eventHandler = new MockEventHandler();
-	TaskAttemptListener taListener = mock(TaskAttemptListener.class);
-	when(taListener.getAddress()).thenReturn(
-		new InetSocketAddress("localhost", 0));
-
-	JobConf jobConf = new JobConf();
-	jobConf.setClass("fs.file.impl", StubbedFS.class, FileSystem.class);
-	jobConf.setBoolean("fs.file.impl.disable.cache", true);
-	jobConf.set(JobConf.MAPRED_MAP_TASK_ENV, "");
-	jobConf.set(MRJobConfig.APPLICATION_ATTEMPT_ID, "10");
-
-	TaskSplitMetaInfo splits = mock(TaskSplitMetaInfo.class);
-	when(splits.getLocations()).thenReturn(new String[] {"127.0.0.1"});
-
-	AppContext appCtx = mock(AppContext.class);
-	ClusterInfo clusterInfo = mock(ClusterInfo.class);
-	when(appCtx.getClusterInfo()).thenReturn(clusterInfo);
-  setupTaskAttemptFinishingMonitor(eventHandler, jobConf, appCtx);
-
-	TaskAttemptImpl taImpl =
-	  new MapTaskAttemptImpl(taskId, 1, eventHandler, jobFile, 1,
-	  splits, jobConf, taListener,mock(Token.class), new Credentials(),
-	  new SystemClock(), appCtx);
-
-	NodeId nid = NodeId.newInstance("127.0.0.1", 0);
-	ContainerId contId = ContainerId.newContainerId(appAttemptId, 3);
-	Container container = mock(Container.class);
-	when(container.getId()).thenReturn(contId);
-	when(container.getNodeId()).thenReturn(nid);
-	when(container.getNodeHttpAddress()).thenReturn("localhost:0"); 
-	    
-	taImpl.handle(new TaskAttemptEvent(attemptId,
-	 	TaskAttemptEventType.TA_SCHEDULE));
-	taImpl.handle(new TaskAttemptContainerAssignedEvent(attemptId,
-	    container, mock(Map.class)));
-	taImpl.handle(new TaskAttemptContainerLaunchedEvent(attemptId, 0));
-	taImpl.handle(new TaskAttemptEvent(attemptId,
-	    TaskAttemptEventType.TA_DONE));
-	taImpl.handle(new TaskAttemptEvent(attemptId,
-	    TaskAttemptEventType.TA_CONTAINER_COMPLETED));
-	    
-	assertEquals("Task attempt is not in succeeded state", taImpl.getState(),
-		      TaskAttemptState.SUCCEEDED);
-	
-	assertTrue("Task Attempt finish time is not greater than 0", 
-			taImpl.getFinishTime() > 0);
-	
-	Long finishTime = taImpl.getFinishTime();
-	Thread.sleep(5);   
-	taImpl.handle(new TaskAttemptEvent(attemptId,
-	   TaskAttemptEventType.TA_TOO_MANY_FETCH_FAILURE));
-	
-	assertEquals("Task attempt is not in Too Many Fetch Failure state", 
-			taImpl.getState(), TaskAttemptState.FAILED);
-	
-	assertEquals("After TA_TOO_MANY_FETCH_FAILURE,"
-		+ " Task attempt finish time is not the same ",
-		finishTime, Long.valueOf(taImpl.getFinishTime()));  
+    ApplicationId appId = ApplicationId.newInstance(1, 2);
+    ApplicationAttemptId appAttemptId =
+    ApplicationAttemptId.newInstance(appId, 0);
+    JobId jobId = MRBuilderUtils.newJobId(appId, 1);
+    TaskId taskId = MRBuilderUtils.newTaskId(jobId, 1, TaskType.MAP);
+    TaskAttemptId attemptId = MRBuilderUtils.newTaskAttemptId(taskId, 0);
+    TaskId reducetaskId = MRBuilderUtils.newTaskId(jobId, 1, TaskType.REDUCE);
+    TaskAttemptId reduceTAId =
+        MRBuilderUtils.newTaskAttemptId(reducetaskId, 0);
+    Path jobFile = mock(Path.class);
+
+    MockEventHandler eventHandler = new MockEventHandler();
+    TaskAttemptListener taListener = mock(TaskAttemptListener.class);
+    when(taListener.getAddress()).thenReturn(
+        new InetSocketAddress("localhost", 0));
+
+    JobConf jobConf = new JobConf();
+    jobConf.setClass("fs.file.impl", StubbedFS.class, FileSystem.class);
+    jobConf.setBoolean("fs.file.impl.disable.cache", true);
+    jobConf.set(JobConf.MAPRED_MAP_TASK_ENV, "");
+    jobConf.set(MRJobConfig.APPLICATION_ATTEMPT_ID, "10");
+
+    TaskSplitMetaInfo splits = mock(TaskSplitMetaInfo.class);
+    when(splits.getLocations()).thenReturn(new String[] {"127.0.0.1"});
+
+    AppContext appCtx = mock(AppContext.class);
+    ClusterInfo clusterInfo = mock(ClusterInfo.class);
+    when(appCtx.getClusterInfo()).thenReturn(clusterInfo);
+    setupTaskAttemptFinishingMonitor(eventHandler, jobConf, appCtx);
+
+    TaskAttemptImpl taImpl =
+      new MapTaskAttemptImpl(taskId, 1, eventHandler, jobFile, 1,
+      splits, jobConf, taListener,mock(Token.class), new Credentials(),
+      new SystemClock(), appCtx);
+
+    NodeId nid = NodeId.newInstance("127.0.0.1", 0);
+    ContainerId contId = ContainerId.newContainerId(appAttemptId, 3);
+    Container container = mock(Container.class);
+    when(container.getId()).thenReturn(contId);
+    when(container.getNodeId()).thenReturn(nid);
+    when(container.getNodeHttpAddress()).thenReturn("localhost:0");
+
+    taImpl.handle(new TaskAttemptEvent(attemptId,
+        TaskAttemptEventType.TA_SCHEDULE));
+    taImpl.handle(new TaskAttemptContainerAssignedEvent(attemptId,
+        container, mock(Map.class)));
+    taImpl.handle(new TaskAttemptContainerLaunchedEvent(attemptId, 0));
+    taImpl.handle(new TaskAttemptEvent(attemptId,
+        TaskAttemptEventType.TA_DONE));
+    taImpl.handle(new TaskAttemptEvent(attemptId,
+        TaskAttemptEventType.TA_CONTAINER_COMPLETED));
+
+    assertEquals("Task attempt is not in succeeded state", taImpl.getState(),
+        TaskAttemptState.SUCCEEDED);
+
+    assertTrue("Task Attempt finish time is not greater than 0",
+        taImpl.getFinishTime() > 0);
+
+    Long finishTime = taImpl.getFinishTime();
+    Thread.sleep(5);
+    taImpl.handle(new TaskAttemptTooManyFetchFailureEvent(attemptId,
+        reduceTAId, "Host"));
+
+    assertEquals("Task attempt is not in Too Many Fetch Failure state",
+        taImpl.getState(), TaskAttemptState.FAILED);
+
+    assertEquals("After TA_TOO_MANY_FETCH_FAILURE,"
+        + " Task attempt finish time is not the same ",
+        finishTime, Long.valueOf(taImpl.getFinishTime()));
   }
   
   @Test