You are viewing a plain text version of this content. The canonical link for it is here.
Posted to mapreduce-commits@hadoop.apache.org by vi...@apache.org on 2012/10/25 03:26:30 UTC
svn commit: r1401941 - in /hadoop/common/trunk/hadoop-mapreduce-project: ./
hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/
hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/o...
Author: vinodkv
Date: Thu Oct 25 01:26:30 2012
New Revision: 1401941
URL: http://svn.apache.org/viewvc?rev=1401941&view=rev
Log:
MAPREDUCE-4730. Fix Reducer's EventFetcher to scale the map-completion requests slowly to avoid HADOOP-8942. Contributed by Jason Lowe.
Added:
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestEventFetcher.java
Modified:
hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/EventFetcher.java
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Shuffle.java
Modified: hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt?rev=1401941&r1=1401940&r2=1401941&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt Thu Oct 25 01:26:30 2012
@@ -610,6 +610,9 @@ Release 0.23.5 - UNRELEASED
MAPREDUCE-4741. WARN and ERROR messages logged during normal AM shutdown.
(Vinod Kumar Vavilapalli via jlowe)
+ MAPREDUCE-4730. Fix Reducer's EventFetcher to scale the map-completion
+ requests slowly to avoid HADOOP-8942. (Jason Lowe via vinodkv)
+
Release 0.23.4 - UNRELEASED
INCOMPATIBLE CHANGES
Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/EventFetcher.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/EventFetcher.java?rev=1401941&r1=1401940&r2=1401941&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/EventFetcher.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/EventFetcher.java Thu Oct 25 01:26:30 2012
@@ -27,10 +27,8 @@ import org.apache.hadoop.mapred.TaskComp
import org.apache.hadoop.mapred.TaskUmbilicalProtocol;
import org.apache.hadoop.mapreduce.TaskAttemptID;
-@SuppressWarnings("deprecation")
class EventFetcher<K,V> extends Thread {
private static final long SLEEP_TIME = 1000;
- private static final int MAX_EVENTS_TO_FETCH = 10000;
private static final int MAX_RETRIES = 10;
private static final int RETRY_PERIOD = 5000;
private static final Log LOG = LogFactory.getLog(EventFetcher.class);
@@ -38,7 +36,8 @@ class EventFetcher<K,V> extends Thread {
private final TaskAttemptID reduce;
private final TaskUmbilicalProtocol umbilical;
private final ShuffleScheduler<K,V> scheduler;
- private int fromEventId = 0;
+ private int fromEventIdx = 0;
+ private int maxEventsToFetch;
private ExceptionReporter exceptionReporter = null;
private int maxMapRuntime = 0;
@@ -48,13 +47,15 @@ class EventFetcher<K,V> extends Thread {
public EventFetcher(TaskAttemptID reduce,
TaskUmbilicalProtocol umbilical,
ShuffleScheduler<K,V> scheduler,
- ExceptionReporter reporter) {
+ ExceptionReporter reporter,
+ int maxEventsToFetch) {
setName("EventFetcher for fetching Map Completion Events");
setDaemon(true);
this.reduce = reduce;
this.umbilical = umbilical;
this.scheduler = scheduler;
exceptionReporter = reporter;
+ this.maxEventsToFetch = maxEventsToFetch;
}
@Override
@@ -112,46 +113,47 @@ class EventFetcher<K,V> extends Thread {
* from a given event ID.
* @throws IOException
*/
- private int getMapCompletionEvents() throws IOException {
+ protected int getMapCompletionEvents() throws IOException {
int numNewMaps = 0;
-
- MapTaskCompletionEventsUpdate update =
- umbilical.getMapCompletionEvents((org.apache.hadoop.mapred.JobID)
- reduce.getJobID(),
- fromEventId,
- MAX_EVENTS_TO_FETCH,
- (org.apache.hadoop.mapred.TaskAttemptID)
- reduce);
- TaskCompletionEvent events[] = update.getMapTaskCompletionEvents();
- LOG.debug("Got " + events.length + " map completion events from " +
- fromEventId);
-
- // Check if the reset is required.
- // Since there is no ordering of the task completion events at the
- // reducer, the only option to sync with the new jobtracker is to reset
- // the events index
- if (update.shouldReset()) {
- fromEventId = 0;
- scheduler.resetKnownMaps();
- }
-
- // Update the last seen event ID
- fromEventId += events.length;
-
- // Process the TaskCompletionEvents:
- // 1. Save the SUCCEEDED maps in knownOutputs to fetch the outputs.
- // 2. Save the OBSOLETE/FAILED/KILLED maps in obsoleteOutputs to stop
- // fetching from those maps.
- // 3. Remove TIPFAILED maps from neededOutputs since we don't need their
- // outputs at all.
- for (TaskCompletionEvent event : events) {
- switch (event.getTaskStatus()) {
+ TaskCompletionEvent events[] = null;
+
+ do {
+ MapTaskCompletionEventsUpdate update =
+ umbilical.getMapCompletionEvents(
+ (org.apache.hadoop.mapred.JobID)reduce.getJobID(),
+ fromEventIdx,
+ maxEventsToFetch,
+ (org.apache.hadoop.mapred.TaskAttemptID)reduce);
+ events = update.getMapTaskCompletionEvents();
+ LOG.debug("Got " + events.length + " map completion events from " +
+ fromEventIdx);
+
+ // Check if the reset is required.
+ // Since there is no ordering of the task completion events at the
+ // reducer, the only option to sync with the new jobtracker is to reset
+ // the events index
+ if (update.shouldReset()) {
+ fromEventIdx = 0;
+ scheduler.resetKnownMaps();
+ }
+
+ // Update the last seen event ID
+ fromEventIdx += events.length;
+
+ // Process the TaskCompletionEvents:
+ // 1. Save the SUCCEEDED maps in knownOutputs to fetch the outputs.
+ // 2. Save the OBSOLETE/FAILED/KILLED maps in obsoleteOutputs to stop
+ // fetching from those maps.
+ // 3. Remove TIPFAILED maps from neededOutputs since we don't need their
+ // outputs at all.
+ for (TaskCompletionEvent event : events) {
+ switch (event.getTaskStatus()) {
case SUCCEEDED:
URI u = getBaseURI(event.getTaskTrackerHttp());
scheduler.addKnownMapOutput(u.getHost() + ":" + u.getPort(),
- u.toString(),
- event.getTaskAttemptId());
+ u.toString(),
+ event.getTaskAttemptId());
numNewMaps ++;
int duration = event.getTaskRunTime();
if (duration > maxMapRuntime) {
@@ -164,15 +166,17 @@ class EventFetcher<K,V> extends Thread {
case OBSOLETE:
scheduler.obsoleteMapOutput(event.getTaskAttemptId());
LOG.info("Ignoring obsolete output of " + event.getTaskStatus() +
- " map-task: '" + event.getTaskAttemptId() + "'");
+ " map-task: '" + event.getTaskAttemptId() + "'");
break;
case TIPFAILED:
scheduler.tipFailed(event.getTaskAttemptId().getTaskID());
LOG.info("Ignoring output of failed map TIP: '" +
- event.getTaskAttemptId() + "'");
+ event.getTaskAttemptId() + "'");
break;
+ }
}
- }
+ } while (events.length == maxEventsToFetch);
+
return numNewMaps;
}
Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Shuffle.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Shuffle.java?rev=1401941&r1=1401940&r2=1401941&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Shuffle.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Shuffle.java Thu Oct 25 01:26:30 2012
@@ -40,9 +40,12 @@ import org.apache.hadoop.util.Progress;
@InterfaceAudience.Private
@InterfaceStability.Unstable
-@SuppressWarnings({"deprecation", "unchecked", "rawtypes"})
+@SuppressWarnings({"unchecked", "rawtypes"})
public class Shuffle<K, V> implements ExceptionReporter {
private static final int PROGRESS_FREQUENCY = 2000;
+ private static final int MAX_EVENTS_TO_FETCH = 10000;
+ private static final int MIN_EVENTS_TO_FETCH = 100;
+ private static final int MAX_RPC_OUTSTANDING_EVENTS = 3000000;
private final TaskAttemptID reduceId;
private final JobConf jobConf;
@@ -99,9 +102,17 @@ public class Shuffle<K, V> implements Ex
}
public RawKeyValueIterator run() throws IOException, InterruptedException {
+ // Scale the maximum events we fetch per RPC call to mitigate OOM issues
+ // on the ApplicationMaster when a thundering herd of reducers fetch events
+ // TODO: This should not be necessary after HADOOP-8942
+ int eventsPerReducer = Math.max(MIN_EVENTS_TO_FETCH,
+ MAX_RPC_OUTSTANDING_EVENTS / jobConf.getNumReduceTasks());
+ int maxEventsToFetch = Math.min(MAX_EVENTS_TO_FETCH, eventsPerReducer);
+
// Start the map-completion events fetcher thread
final EventFetcher<K,V> eventFetcher =
- new EventFetcher<K,V>(reduceId, umbilical, scheduler, this);
+ new EventFetcher<K,V>(reduceId, umbilical, scheduler, this,
+ maxEventsToFetch);
eventFetcher.start();
// Start the map-output fetcher threads
Added: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestEventFetcher.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestEventFetcher.java?rev=1401941&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestEventFetcher.java (added)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestEventFetcher.java Thu Oct 25 01:26:30 2012
@@ -0,0 +1,116 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapreduce.task.reduce;
+
+import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.anyInt;
+import static org.mockito.Mockito.anyString;
+import static org.mockito.Mockito.eq;
+import static org.mockito.Mockito.inOrder;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.io.IOException;
+import java.util.ArrayList;
+
+import org.apache.hadoop.mapred.JobID;
+import org.apache.hadoop.mapred.MapTaskCompletionEventsUpdate;
+import org.apache.hadoop.mapred.TaskAttemptID;
+import org.apache.hadoop.mapred.TaskCompletionEvent;
+import org.apache.hadoop.mapred.TaskUmbilicalProtocol;
+import org.apache.hadoop.mapreduce.TaskType;
+import org.junit.Test;
+import org.mockito.InOrder;
+
+public class TestEventFetcher {
+
+ @Test
+ public void testConsecutiveFetch() throws IOException {
+ final int MAX_EVENTS_TO_FETCH = 100;
+ TaskAttemptID tid = new TaskAttemptID("12345", 1, TaskType.REDUCE, 1, 1);
+
+ TaskUmbilicalProtocol umbilical = mock(TaskUmbilicalProtocol.class);
+ when(umbilical.getMapCompletionEvents(any(JobID.class),
+ anyInt(), anyInt(), any(TaskAttemptID.class)))
+ .thenReturn(getMockedCompletionEventsUpdate(0, 0));
+ when(umbilical.getMapCompletionEvents(any(JobID.class),
+ eq(0), eq(MAX_EVENTS_TO_FETCH), eq(tid)))
+ .thenReturn(getMockedCompletionEventsUpdate(0, MAX_EVENTS_TO_FETCH));
+ when(umbilical.getMapCompletionEvents(any(JobID.class),
+ eq(MAX_EVENTS_TO_FETCH), eq(MAX_EVENTS_TO_FETCH), eq(tid)))
+ .thenReturn(getMockedCompletionEventsUpdate(MAX_EVENTS_TO_FETCH,
+ MAX_EVENTS_TO_FETCH));
+ when(umbilical.getMapCompletionEvents(any(JobID.class),
+ eq(MAX_EVENTS_TO_FETCH*2), eq(MAX_EVENTS_TO_FETCH), eq(tid)))
+ .thenReturn(getMockedCompletionEventsUpdate(MAX_EVENTS_TO_FETCH*2, 3));
+
+ @SuppressWarnings("unchecked")
+ ShuffleScheduler<String,String> scheduler = mock(ShuffleScheduler.class);
+ ExceptionReporter reporter = mock(ExceptionReporter.class);
+
+ EventFetcherForTest<String,String> ef =
+ new EventFetcherForTest<String,String>(tid, umbilical, scheduler,
+ reporter, MAX_EVENTS_TO_FETCH);
+ ef.getMapCompletionEvents();
+
+ verify(reporter, never()).reportException(any(Throwable.class));
+ InOrder inOrder = inOrder(umbilical);
+ inOrder.verify(umbilical).getMapCompletionEvents(any(JobID.class),
+ eq(0), eq(MAX_EVENTS_TO_FETCH), eq(tid));
+ inOrder.verify(umbilical).getMapCompletionEvents(any(JobID.class),
+ eq(MAX_EVENTS_TO_FETCH), eq(MAX_EVENTS_TO_FETCH), eq(tid));
+ inOrder.verify(umbilical).getMapCompletionEvents(any(JobID.class),
+ eq(MAX_EVENTS_TO_FETCH*2), eq(MAX_EVENTS_TO_FETCH), eq(tid));
+ verify(scheduler, times(MAX_EVENTS_TO_FETCH*2 + 3)).addKnownMapOutput(
+ anyString(), anyString(), any(TaskAttemptID.class));
+ }
+
+ private MapTaskCompletionEventsUpdate getMockedCompletionEventsUpdate(
+ int startIdx, int numEvents) {
+ ArrayList<TaskCompletionEvent> tceList =
+ new ArrayList<TaskCompletionEvent>(numEvents);
+ for (int i = 0; i < numEvents; ++i) {
+ int eventIdx = startIdx + i;
+ TaskCompletionEvent tce = new TaskCompletionEvent(eventIdx,
+ new TaskAttemptID("12345", 1, TaskType.MAP, eventIdx, 0),
+ eventIdx, true, TaskCompletionEvent.Status.SUCCEEDED,
+ "http://somehost:8888");
+ tceList.add(tce);
+ }
+ TaskCompletionEvent[] events = {};
+ return new MapTaskCompletionEventsUpdate(tceList.toArray(events), false);
+ }
+
+ private static class EventFetcherForTest<K,V> extends EventFetcher<K,V> {
+
+ public EventFetcherForTest(TaskAttemptID reduce,
+ TaskUmbilicalProtocol umbilical, ShuffleScheduler<K,V> scheduler,
+ ExceptionReporter reporter, int maxEventsToFetch) {
+ super(reduce, umbilical, scheduler, reporter, maxEventsToFetch);
+ }
+
+ @Override
+ public int getMapCompletionEvents() throws IOException {
+ return super.getMapCompletionEvents();
+ }
+
+ }
+}