You are viewing a plain text version of this content. The canonical link for it was not preserved in this plain text rendering.
Posted to mapreduce-commits@hadoop.apache.org by vi...@apache.org on 2012/10/19 22:21:33 UTC
svn commit: r1400267 - in
/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project: ./
hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/
hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org...
Author: vinodkv
Date: Fri Oct 19 20:21:32 2012
New Revision: 1400267
URL: http://svn.apache.org/viewvc?rev=1400267&view=rev
Log:
MAPREDUCE-4733. Reducer can fail to make progress during shuffle if too many reducers complete consecutively. Contributed by Jason Lowe.
svn merge --ignore-ancestry -c 1400264 ../../trunk/
Modified:
hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt
hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl.java
hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/Job.java
hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java
hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapred/TestTaskAttemptListenerImpl.java
hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MockJobs.java
hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestFetchFailure.java
hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRuntimeEstimators.java
hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/CompletedJob.java
hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/PartialJob.java
hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/MockHistoryJobs.java
Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt?rev=1400267&r1=1400266&r2=1400267&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt Fri Oct 19 20:21:32 2012
@@ -37,6 +37,9 @@ Release 0.23.5 - UNRELEASED
MAPREDUCE-4479. Fix parameter order in assertEquals() in
TestCombineInputFileFormat.java (Mariappan Asokan via bobby)
+ MAPREDUCE-4733. Reducer can fail to make progress during shuffle if too many
+ reducers complete consecutively. (Jason Lowe via vinodkv)
+
Release 0.23.4 - UNRELEASED
INCOMPATIBLE CHANGES
Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl.java?rev=1400267&r1=1400266&r2=1400267&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl.java Fri Oct 19 20:21:32 2012
@@ -22,7 +22,6 @@ import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Collections;
-import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
@@ -38,7 +37,6 @@ import org.apache.hadoop.mapred.SortedRa
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.TypeConverter;
import org.apache.hadoop.mapreduce.security.token.JobTokenSecretManager;
-import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
import org.apache.hadoop.mapreduce.v2.app.AppContext;
import org.apache.hadoop.mapreduce.v2.app.TaskAttemptListener;
import org.apache.hadoop.mapreduce.v2.app.TaskHeartbeatHandler;
@@ -253,31 +251,23 @@ public class TaskAttemptListenerImpl ext
@Override
public MapTaskCompletionEventsUpdate getMapCompletionEvents(
- JobID jobIdentifier, int fromEventId, int maxEvents,
+ JobID jobIdentifier, int startIndex, int maxEvents,
TaskAttemptID taskAttemptID) throws IOException {
LOG.info("MapCompletionEvents request from " + taskAttemptID.toString()
- + ". fromEventID " + fromEventId + " maxEvents " + maxEvents);
+ + ". startIndex " + startIndex + " maxEvents " + maxEvents);
// TODO: shouldReset is never used. See TT. Ask for Removal.
boolean shouldReset = false;
org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId attemptID =
TypeConverter.toYarn(taskAttemptID);
org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptCompletionEvent[] events =
- context.getJob(attemptID.getTaskId().getJobId()).getTaskAttemptCompletionEvents(
- fromEventId, maxEvents);
+ context.getJob(attemptID.getTaskId().getJobId()).getMapAttemptCompletionEvents(
+ startIndex, maxEvents);
taskHeartbeatHandler.progressing(attemptID);
-
- // filter the events to return only map completion events in old format
- List<TaskCompletionEvent> mapEvents = new ArrayList<TaskCompletionEvent>();
- for (org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptCompletionEvent event : events) {
- if (TaskType.MAP.equals(event.getAttemptId().getTaskId().getTaskType())) {
- mapEvents.add(TypeConverter.fromYarn(event));
- }
- }
return new MapTaskCompletionEventsUpdate(
- mapEvents.toArray(new TaskCompletionEvent[0]), shouldReset);
+ TypeConverter.fromYarn(events), shouldReset);
}
@Override
Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/Job.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/Job.java?rev=1400267&r1=1400266&r2=1400267&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/Job.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/Job.java Fri Oct 19 20:21:32 2012
@@ -88,6 +88,9 @@ public interface Job {
TaskAttemptCompletionEvent[]
getTaskAttemptCompletionEvents(int fromEventId, int maxEvents);
+ TaskAttemptCompletionEvent[]
+ getMapAttemptCompletionEvents(int startIndex, int maxEvents);
+
/**
* @return information for MR AppMasters (previously failed and current)
*/
Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java?rev=1400267&r1=1400266&r2=1400267&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java Fri Oct 19 20:21:32 2012
@@ -178,6 +178,7 @@ public class JobImpl implements org.apac
private int allowedMapFailuresPercent = 0;
private int allowedReduceFailuresPercent = 0;
private List<TaskAttemptCompletionEvent> taskAttemptCompletionEvents;
+ private List<TaskAttemptCompletionEvent> mapAttemptCompletionEvents;
private final List<String> diagnostics = new ArrayList<String>();
//task/attempt related datastructures
@@ -520,14 +521,28 @@ public class JobImpl implements org.apac
@Override
public TaskAttemptCompletionEvent[] getTaskAttemptCompletionEvents(
int fromEventId, int maxEvents) {
+ return getAttemptCompletionEvents(taskAttemptCompletionEvents,
+ fromEventId, maxEvents);
+ }
+
+ @Override
+ public TaskAttemptCompletionEvent[] getMapAttemptCompletionEvents(
+ int startIndex, int maxEvents) {
+ return getAttemptCompletionEvents(mapAttemptCompletionEvents,
+ startIndex, maxEvents);
+ }
+
+ private TaskAttemptCompletionEvent[] getAttemptCompletionEvents(
+ List<TaskAttemptCompletionEvent> eventList,
+ int startIndex, int maxEvents) {
TaskAttemptCompletionEvent[] events = EMPTY_TASK_ATTEMPT_COMPLETION_EVENTS;
readLock.lock();
try {
- if (taskAttemptCompletionEvents.size() > fromEventId) {
+ if (eventList.size() > startIndex) {
int actualMax = Math.min(maxEvents,
- (taskAttemptCompletionEvents.size() - fromEventId));
- events = taskAttemptCompletionEvents.subList(fromEventId,
- actualMax + fromEventId).toArray(events);
+ (eventList.size() - startIndex));
+ events = eventList.subList(startIndex,
+ actualMax + startIndex).toArray(events);
}
return events;
} finally {
@@ -1023,6 +1038,8 @@ public class JobImpl implements org.apac
job.taskAttemptCompletionEvents =
new ArrayList<TaskAttemptCompletionEvent>(
job.numMapTasks + job.numReduceTasks + 10);
+ job.mapAttemptCompletionEvents =
+ new ArrayList<TaskAttemptCompletionEvent>(job.numMapTasks + 10);
job.allowedMapFailuresPercent =
job.conf.getInt(MRJobConfig.MAP_FAILURES_MAX_PERCENT, 0);
@@ -1294,6 +1311,9 @@ public class JobImpl implements org.apac
//eventId is equal to index in the arraylist
tce.setEventId(job.taskAttemptCompletionEvents.size());
job.taskAttemptCompletionEvents.add(tce);
+ if (TaskType.MAP.equals(tce.getAttemptId().getTaskId().getTaskType())) {
+ job.mapAttemptCompletionEvents.add(tce);
+ }
//make the previous completion event as obsolete if it exists
Object successEventNo =
Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapred/TestTaskAttemptListenerImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapred/TestTaskAttemptListenerImpl.java?rev=1400267&r1=1400266&r2=1400267&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapred/TestTaskAttemptListenerImpl.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapred/TestTaskAttemptListenerImpl.java Fri Oct 19 20:21:32 2012
@@ -17,20 +17,33 @@
*/
package org.apache.hadoop.mapred;
+import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
import java.io.IOException;
+import java.util.Arrays;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.TaskType;
import org.apache.hadoop.mapreduce.security.token.JobTokenSecretManager;
+import org.apache.hadoop.mapreduce.v2.api.records.JobId;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptCompletionEvent;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptCompletionEventStatus;
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
import org.apache.hadoop.mapreduce.v2.app.AppContext;
import org.apache.hadoop.mapreduce.v2.app.TaskHeartbeatHandler;
+import org.apache.hadoop.mapreduce.v2.app.job.Job;
+import org.apache.hadoop.mapreduce.v2.util.MRBuilderUtils;
+import org.apache.hadoop.yarn.factories.RecordFactory;
+import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.junit.Test;
public class TestTaskAttemptListenerImpl {
@@ -115,4 +128,67 @@ public class TestTaskAttemptListenerImpl
listener.stop();
}
+
+ @Test
+ public void testGetMapCompletionEvents() throws IOException {
+ TaskAttemptCompletionEvent[] empty = {};
+ TaskAttemptCompletionEvent[] taskEvents = {
+ createTce(0, true, TaskAttemptCompletionEventStatus.OBSOLETE),
+ createTce(1, false, TaskAttemptCompletionEventStatus.FAILED),
+ createTce(2, true, TaskAttemptCompletionEventStatus.SUCCEEDED),
+ createTce(3, false, TaskAttemptCompletionEventStatus.FAILED) };
+ TaskAttemptCompletionEvent[] mapEvents = { taskEvents[0], taskEvents[2] };
+ Job mockJob = mock(Job.class);
+ when(mockJob.getTaskAttemptCompletionEvents(0, 100))
+ .thenReturn(taskEvents);
+ when(mockJob.getTaskAttemptCompletionEvents(0, 2))
+ .thenReturn(Arrays.copyOfRange(taskEvents, 0, 2));
+ when(mockJob.getTaskAttemptCompletionEvents(2, 100))
+ .thenReturn(Arrays.copyOfRange(taskEvents, 2, 4));
+ when(mockJob.getMapAttemptCompletionEvents(0, 100)).thenReturn(mapEvents);
+ when(mockJob.getMapAttemptCompletionEvents(0, 2)).thenReturn(mapEvents);
+ when(mockJob.getMapAttemptCompletionEvents(2, 100)).thenReturn(empty);
+
+ AppContext appCtx = mock(AppContext.class);
+ when(appCtx.getJob(any(JobId.class))).thenReturn(mockJob);
+ JobTokenSecretManager secret = mock(JobTokenSecretManager.class);
+ final TaskHeartbeatHandler hbHandler = mock(TaskHeartbeatHandler.class);
+ TaskAttemptListenerImpl listener =
+ new TaskAttemptListenerImpl(appCtx, secret) {
+ @Override
+ protected void registerHeartbeatHandler(Configuration conf) {
+ taskHeartbeatHandler = hbHandler;
+ }
+ };
+ Configuration conf = new Configuration();
+ listener.init(conf);
+ listener.start();
+
+ JobID jid = new JobID("12345", 1);
+ TaskAttemptID tid = new TaskAttemptID("12345", 1, TaskType.REDUCE, 1, 0);
+ MapTaskCompletionEventsUpdate update =
+ listener.getMapCompletionEvents(jid, 0, 100, tid);
+ assertEquals(2, update.events.length);
+ update = listener.getMapCompletionEvents(jid, 0, 2, tid);
+ assertEquals(2, update.events.length);
+ update = listener.getMapCompletionEvents(jid, 2, 100, tid);
+ assertEquals(0, update.events.length);
+ }
+
+ private static TaskAttemptCompletionEvent createTce(int eventId,
+ boolean isMap, TaskAttemptCompletionEventStatus status) {
+ JobId jid = MRBuilderUtils.newJobId(12345, 1, 1);
+ TaskId tid = MRBuilderUtils.newTaskId(jid, 0,
+ isMap ? org.apache.hadoop.mapreduce.v2.api.records.TaskType.MAP
+ : org.apache.hadoop.mapreduce.v2.api.records.TaskType.REDUCE);
+ TaskAttemptId attemptId = MRBuilderUtils.newTaskAttemptId(tid, 0);
+ RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null);
+ TaskAttemptCompletionEvent tce = recordFactory
+ .newRecordInstance(TaskAttemptCompletionEvent.class);
+ tce.setEventId(eventId);
+ tce.setAttemptId(attemptId);
+ tce.setStatus(status);
+ return tce;
+ }
+
}
Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MockJobs.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MockJobs.java?rev=1400267&r1=1400266&r2=1400267&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MockJobs.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MockJobs.java Fri Oct 19 20:21:32 2012
@@ -550,6 +550,12 @@ public class MockJobs extends MockApps {
}
@Override
+ public TaskAttemptCompletionEvent[] getMapAttemptCompletionEvents(
+ int startIndex, int maxEvents) {
+ return null;
+ }
+
+ @Override
public Map<TaskId, Task> getTasks(TaskType taskType) {
throw new UnsupportedOperationException("Not supported yet.");
}
Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestFetchFailure.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestFetchFailure.java?rev=1400267&r1=1400266&r2=1400267&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestFetchFailure.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestFetchFailure.java Fri Oct 19 20:21:32 2012
@@ -21,8 +21,6 @@ package org.apache.hadoop.mapreduce.v2.a
import java.util.Arrays;
import java.util.Iterator;
-import junit.framework.Assert;
-
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEvent;
@@ -40,6 +38,7 @@ import org.apache.hadoop.mapreduce.v2.ap
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType;
import org.apache.hadoop.yarn.event.EventHandler;
+import org.junit.Assert;
import org.junit.Test;
public class TestFetchFailure {
@@ -144,6 +143,15 @@ public class TestFetchFailure {
TaskAttemptCompletionEventStatus.SUCCEEDED, events[2].getStatus());
Assert.assertEquals("Event status not correct for reduce attempt1",
TaskAttemptCompletionEventStatus.SUCCEEDED, events[3].getStatus());
+
+ TaskAttemptCompletionEvent mapEvents[] =
+ job.getMapAttemptCompletionEvents(0, 2);
+ Assert.assertEquals("Incorrect number of map events", 2, mapEvents.length);
+ Assert.assertArrayEquals("Unexpected map events",
+ Arrays.copyOfRange(events, 0, 2), mapEvents);
+ mapEvents = job.getMapAttemptCompletionEvents(2, 200);
+ Assert.assertEquals("Incorrect number of map events", 1, mapEvents.length);
+ Assert.assertEquals("Unexpected map event", events[2], mapEvents[0]);
}
/**
Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRuntimeEstimators.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRuntimeEstimators.java?rev=1400267&r1=1400266&r2=1400267&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRuntimeEstimators.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRuntimeEstimators.java Fri Oct 19 20:21:32 2012
@@ -440,6 +440,12 @@ public class TestRuntimeEstimators {
}
@Override
+ public TaskAttemptCompletionEvent[]
+ getMapAttemptCompletionEvents(int startIndex, int maxEvents) {
+ throw new UnsupportedOperationException("Not supported yet.");
+ }
+
+ @Override
public String getName() {
throw new UnsupportedOperationException("Not supported yet.");
}
Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/CompletedJob.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/CompletedJob.java?rev=1400267&r1=1400266&r2=1400267&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/CompletedJob.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/CompletedJob.java Fri Oct 19 20:21:32 2012
@@ -20,6 +20,7 @@ package org.apache.hadoop.mapreduce.v2.h
import java.io.IOException;
import java.net.UnknownHostException;
+import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
@@ -81,6 +82,7 @@ public class CompletedJob implements org
private Map<TaskId, Task> mapTasks = new HashMap<TaskId, Task>();
private Map<TaskId, Task> reduceTasks = new HashMap<TaskId, Task>();
private List<TaskAttemptCompletionEvent> completionEvents = null;
+ private List<TaskAttemptCompletionEvent> mapCompletionEvents = null;
private JobACLsManager aclsMgr;
@@ -176,11 +178,28 @@ public class CompletedJob implements org
if (completionEvents == null) {
constructTaskAttemptCompletionEvents();
}
+ return getAttemptCompletionEvents(completionEvents,
+ fromEventId, maxEvents);
+ }
+
+ @Override
+ public synchronized TaskAttemptCompletionEvent[] getMapAttemptCompletionEvents(
+ int startIndex, int maxEvents) {
+ if (mapCompletionEvents == null) {
+ constructTaskAttemptCompletionEvents();
+ }
+ return getAttemptCompletionEvents(mapCompletionEvents,
+ startIndex, maxEvents);
+ }
+
+ private static TaskAttemptCompletionEvent[] getAttemptCompletionEvents(
+ List<TaskAttemptCompletionEvent> eventList,
+ int startIndex, int maxEvents) {
TaskAttemptCompletionEvent[] events = new TaskAttemptCompletionEvent[0];
- if (completionEvents.size() > fromEventId) {
+ if (eventList.size() > startIndex) {
int actualMax = Math.min(maxEvents,
- (completionEvents.size() - fromEventId));
- events = completionEvents.subList(fromEventId, actualMax + fromEventId)
+ (eventList.size() - startIndex));
+ events = eventList.subList(startIndex, actualMax + startIndex)
.toArray(events);
}
return events;
@@ -190,11 +209,15 @@ public class CompletedJob implements org
loadAllTasks();
completionEvents = new LinkedList<TaskAttemptCompletionEvent>();
List<TaskAttempt> allTaskAttempts = new LinkedList<TaskAttempt>();
+ int numMapAttempts = 0;
for (TaskId taskId : tasks.keySet()) {
Task task = tasks.get(taskId);
for (TaskAttemptId taskAttemptId : task.getAttempts().keySet()) {
TaskAttempt taskAttempt = task.getAttempts().get(taskAttemptId);
allTaskAttempts.add(taskAttempt);
+ if (task.getType() == TaskType.MAP) {
+ ++numMapAttempts;
+ }
}
}
Collections.sort(allTaskAttempts, new Comparator<TaskAttempt>() {
@@ -223,6 +246,8 @@ public class CompletedJob implements org
}
});
+ mapCompletionEvents =
+ new ArrayList<TaskAttemptCompletionEvent>(numMapAttempts);
int eventId = 0;
for (TaskAttempt taskAttempt : allTaskAttempts) {
@@ -253,6 +278,9 @@ public class CompletedJob implements org
.getAssignedContainerMgrAddress());
tace.setStatus(taceStatus);
completionEvents.add(tace);
+ if (taskAttempt.getID().getTaskId().getTaskType() == TaskType.MAP) {
+ mapCompletionEvents.add(tace);
+ }
}
}
Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/PartialJob.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/PartialJob.java?rev=1400267&r1=1400266&r2=1400267&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/PartialJob.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/PartialJob.java Fri Oct 19 20:21:32 2012
@@ -154,6 +154,12 @@ public class PartialJob implements org.a
}
@Override
+ public TaskAttemptCompletionEvent[] getMapAttemptCompletionEvents(
+ int startIndex, int maxEvents) {
+ return null;
+ }
+
+ @Override
public boolean checkAccess(UserGroupInformation callerUGI, JobACL jobOperation) {
return true;
}
Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/MockHistoryJobs.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/MockHistoryJobs.java?rev=1400267&r1=1400266&r2=1400267&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/MockHistoryJobs.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/MockHistoryJobs.java Fri Oct 19 20:21:32 2012
@@ -126,6 +126,12 @@ public class MockHistoryJobs extends Moc
}
@Override
+ public TaskAttemptCompletionEvent[] getMapAttemptCompletionEvents(
+ int startIndex, int maxEvents) {
+ return job.getMapAttemptCompletionEvents(startIndex, maxEvents);
+ }
+
+ @Override
public Map<TaskId, Task> getTasks() {
return job.getTasks();
}