You are viewing a plain text version of this content. The canonical link for it is here.
Posted to mapreduce-commits@hadoop.apache.org by vi...@apache.org on 2012/10/19 22:18:46 UTC

svn commit: r1400264 - in /hadoop/common/trunk/hadoop-mapreduce-project: ./ hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/ hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/...

Author: vinodkv
Date: Fri Oct 19 20:18:46 2012
New Revision: 1400264

URL: http://svn.apache.org/viewvc?rev=1400264&view=rev
Log:
MAPREDUCE-4733. Reducer can fail to make progress during shuffle if too many reducers complete consecutively. Contributed by Jason Lowe via vinodkv.

Modified:
    hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl.java
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/Job.java
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapred/TestTaskAttemptListenerImpl.java
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MockJobs.java
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestFetchFailure.java
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRuntimeEstimators.java
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/CompletedJob.java
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/PartialJob.java
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/MockHistoryJobs.java

Modified: hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt?rev=1400264&r1=1400263&r2=1400264&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt Fri Oct 19 20:18:46 2012
@@ -593,6 +593,9 @@ Release 0.23.5 - UNRELEASED
     MAPREDUCE-4479. Fix parameter order in assertEquals() in 
     TestCombineInputFileFormat.java (Mariappan Asokan via bobby)
 
+    MAPREDUCE-4733. Reducer can fail to make progress during shuffle if too many
+    reducers complete consecutively. (Jason Lowe via vinodkv)
+
 Release 0.23.4 - UNRELEASED
 
   INCOMPATIBLE CHANGES

Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl.java?rev=1400264&r1=1400263&r2=1400264&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl.java Fri Oct 19 20:18:46 2012
@@ -22,7 +22,6 @@ import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.util.ArrayList;
 import java.util.Collections;
-import java.util.List;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
@@ -38,7 +37,6 @@ import org.apache.hadoop.mapred.SortedRa
 import org.apache.hadoop.mapreduce.MRJobConfig;
 import org.apache.hadoop.mapreduce.TypeConverter;
 import org.apache.hadoop.mapreduce.security.token.JobTokenSecretManager;
-import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
 import org.apache.hadoop.mapreduce.v2.app.AppContext;
 import org.apache.hadoop.mapreduce.v2.app.TaskAttemptListener;
 import org.apache.hadoop.mapreduce.v2.app.TaskHeartbeatHandler;
@@ -253,31 +251,23 @@ public class TaskAttemptListenerImpl ext
 
   @Override
   public MapTaskCompletionEventsUpdate getMapCompletionEvents(
-      JobID jobIdentifier, int fromEventId, int maxEvents,
+      JobID jobIdentifier, int startIndex, int maxEvents,
       TaskAttemptID taskAttemptID) throws IOException {
     LOG.info("MapCompletionEvents request from " + taskAttemptID.toString()
-        + ". fromEventID " + fromEventId + " maxEvents " + maxEvents);
+        + ". startIndex " + startIndex + " maxEvents " + maxEvents);
 
     // TODO: shouldReset is never used. See TT. Ask for Removal.
     boolean shouldReset = false;
     org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId attemptID =
       TypeConverter.toYarn(taskAttemptID);
     org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptCompletionEvent[] events =
-        context.getJob(attemptID.getTaskId().getJobId()).getTaskAttemptCompletionEvents(
-            fromEventId, maxEvents);
+        context.getJob(attemptID.getTaskId().getJobId()).getMapAttemptCompletionEvents(
+            startIndex, maxEvents);
 
     taskHeartbeatHandler.progressing(attemptID);
-
-    // filter the events to return only map completion events in old format
-    List<TaskCompletionEvent> mapEvents = new ArrayList<TaskCompletionEvent>();
-    for (org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptCompletionEvent event : events) {
-      if (TaskType.MAP.equals(event.getAttemptId().getTaskId().getTaskType())) {
-        mapEvents.add(TypeConverter.fromYarn(event));
-      }
-    }
     
     return new MapTaskCompletionEventsUpdate(
-        mapEvents.toArray(new TaskCompletionEvent[0]), shouldReset);
+        TypeConverter.fromYarn(events), shouldReset);
   }
 
   @Override

Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/Job.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/Job.java?rev=1400264&r1=1400263&r2=1400264&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/Job.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/Job.java Fri Oct 19 20:18:46 2012
@@ -88,6 +88,9 @@ public interface Job {
   TaskAttemptCompletionEvent[]
       getTaskAttemptCompletionEvents(int fromEventId, int maxEvents);
 
+  TaskAttemptCompletionEvent[]
+      getMapAttemptCompletionEvents(int startIndex, int maxEvents);
+
   /**
    * @return information for MR AppMasters (previously failed and current)
    */

Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java?rev=1400264&r1=1400263&r2=1400264&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java Fri Oct 19 20:18:46 2012
@@ -190,6 +190,7 @@ public class JobImpl implements org.apac
   private int allowedMapFailuresPercent = 0;
   private int allowedReduceFailuresPercent = 0;
   private List<TaskAttemptCompletionEvent> taskAttemptCompletionEvents;
+  private List<TaskAttemptCompletionEvent> mapAttemptCompletionEvents;
   private final List<String> diagnostics = new ArrayList<String>();
   
   //task/attempt related datastructures
@@ -548,14 +549,28 @@ public class JobImpl implements org.apac
   @Override
   public TaskAttemptCompletionEvent[] getTaskAttemptCompletionEvents(
       int fromEventId, int maxEvents) {
+    return getAttemptCompletionEvents(taskAttemptCompletionEvents,
+        fromEventId, maxEvents);
+  }
+
+  @Override
+  public TaskAttemptCompletionEvent[] getMapAttemptCompletionEvents(
+      int startIndex, int maxEvents) {
+    return getAttemptCompletionEvents(mapAttemptCompletionEvents,
+        startIndex, maxEvents);
+  }
+
+  private TaskAttemptCompletionEvent[] getAttemptCompletionEvents(
+      List<TaskAttemptCompletionEvent> eventList,
+      int startIndex, int maxEvents) {
     TaskAttemptCompletionEvent[] events = EMPTY_TASK_ATTEMPT_COMPLETION_EVENTS;
     readLock.lock();
     try {
-      if (taskAttemptCompletionEvents.size() > fromEventId) {
+      if (eventList.size() > startIndex) {
         int actualMax = Math.min(maxEvents,
-            (taskAttemptCompletionEvents.size() - fromEventId));
-        events = taskAttemptCompletionEvents.subList(fromEventId,
-            actualMax + fromEventId).toArray(events);
+            (eventList.size() - startIndex));
+        events = eventList.subList(startIndex,
+            actualMax + startIndex).toArray(events);
       }
       return events;
     } finally {
@@ -1070,6 +1085,8 @@ public class JobImpl implements org.apac
         job.taskAttemptCompletionEvents =
             new ArrayList<TaskAttemptCompletionEvent>(
                 job.numMapTasks + job.numReduceTasks + 10);
+        job.mapAttemptCompletionEvents =
+            new ArrayList<TaskAttemptCompletionEvent>(job.numMapTasks + 10);
 
         job.allowedMapFailuresPercent =
             job.conf.getInt(MRJobConfig.MAP_FAILURES_MAX_PERCENT, 0);
@@ -1341,6 +1358,9 @@ public class JobImpl implements org.apac
       //eventId is equal to index in the arraylist
       tce.setEventId(job.taskAttemptCompletionEvents.size());
       job.taskAttemptCompletionEvents.add(tce);
+      if (TaskType.MAP.equals(tce.getAttemptId().getTaskId().getTaskType())) {
+        job.mapAttemptCompletionEvents.add(tce);
+      }
       
       TaskAttemptId attemptId = tce.getAttemptId();
       TaskId taskId = attemptId.getTaskId();

Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapred/TestTaskAttemptListenerImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapred/TestTaskAttemptListenerImpl.java?rev=1400264&r1=1400263&r2=1400264&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapred/TestTaskAttemptListenerImpl.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapred/TestTaskAttemptListenerImpl.java Fri Oct 19 20:18:46 2012
@@ -17,20 +17,33 @@
 */
 package org.apache.hadoop.mapred;
 
+import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.any;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
 
 import java.io.IOException;
+import java.util.Arrays;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.TaskType;
 import org.apache.hadoop.mapreduce.security.token.JobTokenSecretManager;
+import org.apache.hadoop.mapreduce.v2.api.records.JobId;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptCompletionEvent;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptCompletionEventStatus;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
 import org.apache.hadoop.mapreduce.v2.app.AppContext;
 import org.apache.hadoop.mapreduce.v2.app.TaskHeartbeatHandler;
+import org.apache.hadoop.mapreduce.v2.app.job.Job;
+import org.apache.hadoop.mapreduce.v2.util.MRBuilderUtils;
+import org.apache.hadoop.yarn.factories.RecordFactory;
+import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
 import org.junit.Test;
 
 public class TestTaskAttemptListenerImpl {
@@ -115,4 +128,67 @@ public class TestTaskAttemptListenerImpl
 
     listener.stop();
   }
+
+  @Test
+  public void testGetMapCompletionEvents() throws IOException {
+    TaskAttemptCompletionEvent[] empty = {};
+    TaskAttemptCompletionEvent[] taskEvents = {
+        createTce(0, true, TaskAttemptCompletionEventStatus.OBSOLETE),
+        createTce(1, false, TaskAttemptCompletionEventStatus.FAILED),
+        createTce(2, true, TaskAttemptCompletionEventStatus.SUCCEEDED),
+        createTce(3, false, TaskAttemptCompletionEventStatus.FAILED) };
+    TaskAttemptCompletionEvent[] mapEvents = { taskEvents[0], taskEvents[2] };
+    Job mockJob = mock(Job.class);
+    when(mockJob.getTaskAttemptCompletionEvents(0, 100))
+      .thenReturn(taskEvents);
+    when(mockJob.getTaskAttemptCompletionEvents(0, 2))
+      .thenReturn(Arrays.copyOfRange(taskEvents, 0, 2));
+    when(mockJob.getTaskAttemptCompletionEvents(2, 100))
+      .thenReturn(Arrays.copyOfRange(taskEvents, 2, 4));
+    when(mockJob.getMapAttemptCompletionEvents(0, 100)).thenReturn(mapEvents);
+    when(mockJob.getMapAttemptCompletionEvents(0, 2)).thenReturn(mapEvents);
+    when(mockJob.getMapAttemptCompletionEvents(2, 100)).thenReturn(empty);
+
+    AppContext appCtx = mock(AppContext.class);
+    when(appCtx.getJob(any(JobId.class))).thenReturn(mockJob);
+    JobTokenSecretManager secret = mock(JobTokenSecretManager.class);
+    final TaskHeartbeatHandler hbHandler = mock(TaskHeartbeatHandler.class);
+    TaskAttemptListenerImpl listener =
+        new TaskAttemptListenerImpl(appCtx, secret) {
+      @Override
+      protected void registerHeartbeatHandler(Configuration conf) {
+        taskHeartbeatHandler = hbHandler;
+      }
+    };
+    Configuration conf = new Configuration();
+    listener.init(conf);
+    listener.start();
+
+    JobID jid = new JobID("12345", 1);
+    TaskAttemptID tid = new TaskAttemptID("12345", 1, TaskType.REDUCE, 1, 0);
+    MapTaskCompletionEventsUpdate update =
+        listener.getMapCompletionEvents(jid, 0, 100, tid);
+    assertEquals(2, update.events.length);
+    update = listener.getMapCompletionEvents(jid, 0, 2, tid);
+    assertEquals(2, update.events.length);
+    update = listener.getMapCompletionEvents(jid, 2, 100, tid);
+    assertEquals(0, update.events.length);
+  }
+
+  private static TaskAttemptCompletionEvent createTce(int eventId,
+      boolean isMap, TaskAttemptCompletionEventStatus status) {
+    JobId jid = MRBuilderUtils.newJobId(12345, 1, 1);
+    TaskId tid = MRBuilderUtils.newTaskId(jid, 0,
+        isMap ? org.apache.hadoop.mapreduce.v2.api.records.TaskType.MAP
+            : org.apache.hadoop.mapreduce.v2.api.records.TaskType.REDUCE);
+    TaskAttemptId attemptId = MRBuilderUtils.newTaskAttemptId(tid, 0);
+    RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null);
+    TaskAttemptCompletionEvent tce = recordFactory
+        .newRecordInstance(TaskAttemptCompletionEvent.class);
+    tce.setEventId(eventId);
+    tce.setAttemptId(attemptId);
+    tce.setStatus(status);
+    return tce;
+  }
+
 }

Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MockJobs.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MockJobs.java?rev=1400264&r1=1400263&r2=1400264&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MockJobs.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MockJobs.java Fri Oct 19 20:18:46 2012
@@ -556,6 +556,12 @@ public class MockJobs extends MockApps {
       }
 
       @Override
+      public TaskAttemptCompletionEvent[] getMapAttemptCompletionEvents(
+          int startIndex, int maxEvents) {
+        return null;
+      }
+
+      @Override
       public Map<TaskId, Task> getTasks(TaskType taskType) {
         throw new UnsupportedOperationException("Not supported yet.");
       }

Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestFetchFailure.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestFetchFailure.java?rev=1400264&r1=1400263&r2=1400264&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestFetchFailure.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestFetchFailure.java Fri Oct 19 20:18:46 2012
@@ -21,8 +21,6 @@ package org.apache.hadoop.mapreduce.v2.a
 import java.util.Arrays;
 import java.util.Iterator;
 
-import junit.framework.Assert;
-
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.mapreduce.MRJobConfig;
 import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEvent;
@@ -40,6 +38,7 @@ import org.apache.hadoop.mapreduce.v2.ap
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType;
 import org.apache.hadoop.yarn.event.EventHandler;
+import org.junit.Assert;
 import org.junit.Test;
 
 public class TestFetchFailure {
@@ -144,6 +143,15 @@ public class TestFetchFailure {
         TaskAttemptCompletionEventStatus.SUCCEEDED, events[2].getStatus());
     Assert.assertEquals("Event status not correct for reduce attempt1",
         TaskAttemptCompletionEventStatus.SUCCEEDED, events[3].getStatus());
+
+    TaskAttemptCompletionEvent mapEvents[] =
+        job.getMapAttemptCompletionEvents(0, 2);
+    Assert.assertEquals("Incorrect number of map events", 2, mapEvents.length);
+    Assert.assertArrayEquals("Unexpected map events",
+        Arrays.copyOfRange(events, 0, 2), mapEvents);
+    mapEvents = job.getMapAttemptCompletionEvents(2, 200);
+    Assert.assertEquals("Incorrect number of map events", 1, mapEvents.length);
+    Assert.assertEquals("Unexpected map event", events[2], mapEvents[0]);
   }
   
   /**

Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRuntimeEstimators.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRuntimeEstimators.java?rev=1400264&r1=1400263&r2=1400264&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRuntimeEstimators.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRuntimeEstimators.java Fri Oct 19 20:18:46 2012
@@ -441,6 +441,12 @@ public class TestRuntimeEstimators {
     }
 
     @Override
+    public TaskAttemptCompletionEvent[]
+            getMapAttemptCompletionEvents(int startIndex, int maxEvents) {
+      throw new UnsupportedOperationException("Not supported yet.");
+    }
+
+    @Override
     public String getName() {
       throw new UnsupportedOperationException("Not supported yet.");
     }

Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/CompletedJob.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/CompletedJob.java?rev=1400264&r1=1400263&r2=1400264&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/CompletedJob.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/CompletedJob.java Fri Oct 19 20:18:46 2012
@@ -20,6 +20,7 @@ package org.apache.hadoop.mapreduce.v2.h
 
 import java.io.IOException;
 import java.net.UnknownHostException;
+import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Comparator;
 import java.util.HashMap;
@@ -81,6 +82,7 @@ public class CompletedJob implements org
   private Map<TaskId, Task> mapTasks = new HashMap<TaskId, Task>();
   private Map<TaskId, Task> reduceTasks = new HashMap<TaskId, Task>();
   private List<TaskAttemptCompletionEvent> completionEvents = null;
+  private List<TaskAttemptCompletionEvent> mapCompletionEvents = null;
   private JobACLsManager aclsMgr;
   
   
@@ -176,11 +178,28 @@ public class CompletedJob implements org
     if (completionEvents == null) {
       constructTaskAttemptCompletionEvents();
     }
+    return getAttemptCompletionEvents(completionEvents,
+        fromEventId, maxEvents);
+  }
+
+  @Override
+  public synchronized TaskAttemptCompletionEvent[] getMapAttemptCompletionEvents(
+      int startIndex, int maxEvents) {
+    if (mapCompletionEvents == null) {
+      constructTaskAttemptCompletionEvents();
+    }
+    return getAttemptCompletionEvents(mapCompletionEvents,
+        startIndex, maxEvents);
+  }
+
+  private static TaskAttemptCompletionEvent[] getAttemptCompletionEvents(
+      List<TaskAttemptCompletionEvent> eventList,
+      int startIndex, int maxEvents) {
     TaskAttemptCompletionEvent[] events = new TaskAttemptCompletionEvent[0];
-    if (completionEvents.size() > fromEventId) {
+    if (eventList.size() > startIndex) {
       int actualMax = Math.min(maxEvents,
-          (completionEvents.size() - fromEventId));
-      events = completionEvents.subList(fromEventId, actualMax + fromEventId)
+          (eventList.size() - startIndex));
+      events = eventList.subList(startIndex, actualMax + startIndex)
           .toArray(events);
     }
     return events;
@@ -190,11 +209,15 @@ public class CompletedJob implements org
     loadAllTasks();
     completionEvents = new LinkedList<TaskAttemptCompletionEvent>();
     List<TaskAttempt> allTaskAttempts = new LinkedList<TaskAttempt>();
+    int numMapAttempts = 0;
     for (TaskId taskId : tasks.keySet()) {
       Task task = tasks.get(taskId);
       for (TaskAttemptId taskAttemptId : task.getAttempts().keySet()) {
         TaskAttempt taskAttempt = task.getAttempts().get(taskAttemptId);
         allTaskAttempts.add(taskAttempt);
+        if (task.getType() == TaskType.MAP) {
+          ++numMapAttempts;
+        }
       }
     }
     Collections.sort(allTaskAttempts, new Comparator<TaskAttempt>() {
@@ -223,6 +246,8 @@ public class CompletedJob implements org
       }
     });
 
+    mapCompletionEvents =
+        new ArrayList<TaskAttemptCompletionEvent>(numMapAttempts);
     int eventId = 0;
     for (TaskAttempt taskAttempt : allTaskAttempts) {
 
@@ -253,6 +278,9 @@ public class CompletedJob implements org
           .getAssignedContainerMgrAddress());
       tace.setStatus(taceStatus);
       completionEvents.add(tace);
+      if (taskAttempt.getID().getTaskId().getTaskType() == TaskType.MAP) {
+        mapCompletionEvents.add(tace);
+      }
     }
   }
 

Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/PartialJob.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/PartialJob.java?rev=1400264&r1=1400263&r2=1400264&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/PartialJob.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/PartialJob.java Fri Oct 19 20:18:46 2012
@@ -154,6 +154,12 @@ public class PartialJob implements org.a
   }
 
   @Override
+  public TaskAttemptCompletionEvent[] getMapAttemptCompletionEvents(
+      int startIndex, int maxEvents) {
+    return null;
+  }
+
+  @Override
   public boolean checkAccess(UserGroupInformation callerUGI, JobACL jobOperation) {
     return true;
   }

Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/MockHistoryJobs.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/MockHistoryJobs.java?rev=1400264&r1=1400263&r2=1400264&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/MockHistoryJobs.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/MockHistoryJobs.java Fri Oct 19 20:18:46 2012
@@ -126,6 +126,12 @@ public class MockHistoryJobs extends Moc
     }
 
     @Override
+    public TaskAttemptCompletionEvent[] getMapAttemptCompletionEvents(
+        int startIndex, int maxEvents) {
+      return job.getMapAttemptCompletionEvents(startIndex, maxEvents);
+    }
+
+    @Override
     public Map<TaskId, Task> getTasks() {
       return job.getTasks();
     }