You are viewing a plain-text version of this content. (The hyperlink to the canonical version was lost in the plain-text conversion.)
Posted to common-commits@hadoop.apache.org by dd...@apache.org on 2008/12/12 08:56:37 UTC
svn commit: r725939 - in /hadoop/core/trunk: CHANGES.txt
src/mapred/org/apache/hadoop/mapred/ReduceTask.java
Author: ddas
Date: Thu Dec 11 23:56:37 2008
New Revision: 725939
URL: http://svn.apache.org/viewvc?rev=725939&view=rev
Log:
HADOOP-4683. Fixes Reduce shuffle scheduler to invoke getMapCompletionEvents in a separate thread. Contributed by Jothi Padmanabhan.
Modified:
hadoop/core/trunk/CHANGES.txt
hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/ReduceTask.java
Modified: hadoop/core/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/CHANGES.txt?rev=725939&r1=725938&r2=725939&view=diff
==============================================================================
--- hadoop/core/trunk/CHANGES.txt (original)
+++ hadoop/core/trunk/CHANGES.txt Thu Dec 11 23:56:37 2008
@@ -233,6 +233,9 @@
based on the rack/host that has the most number of bytes.
(Jothi Padmanabhan via ddas)
+ HADOOP-4683. Fixes Reduce shuffle scheduler to invoke getMapCompletionEvents
+ in a separate thread. (Jothi Padmanabhan via ddas)
+
BUG FIXES
HADOOP-4204. Fix findbugs warnings related to unused variables, naive
Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/ReduceTask.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/ReduceTask.java?rev=725939&r1=725938&r2=725939&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/ReduceTask.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/ReduceTask.java Thu Dec 11 23:56:37 2008
@@ -44,6 +44,7 @@
import java.util.Set;
import java.util.SortedSet;
import java.util.TreeSet;
+import java.util.concurrent.ConcurrentHashMap;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -514,11 +515,6 @@
private Set<String> uniqueHosts;
/**
- * the last time we polled the job tracker
- */
- private long lastPollTime;
-
- /**
* A reference to the RamManager for writing the map outputs to.
*/
@@ -544,6 +540,11 @@
* A flag to indicate when to exit localFS merge
*/
private volatile boolean exitLocalFSMerge = false;
+
+ /**
+ * A flag to indicate when to exit getMapEvents thread
+ */
+ private volatile boolean exitGetMapEvents = false;
/**
* When we accumulate maxInMemOutputs number of files in ram, we merge/spill
@@ -604,7 +605,7 @@
/**
* Maximum number of fetch-retries per-map.
*/
- private int maxFetchRetriesPerMap;
+ private volatile int maxFetchRetriesPerMap;
/**
* Combiner class to run during in-memory merge, if defined.
@@ -671,7 +672,13 @@
private final List<MapOutput> mapOutputsFilesInMemory =
Collections.synchronizedList(new LinkedList<MapOutput>());
-
+ /**
+ * The map for (Hosts, List of MapIds from this Host) maintaining
+ * map output locations
+ */
+ private final Map<String, List<MapOutputLocation>> mapLocations =
+ new ConcurrentHashMap<String, List<MapOutputLocation>>();
+
/**
* This class contains the methods that should be used for metrics-reporting
* the specific metrics for shuffle. This class actually reports the
@@ -1611,9 +1618,6 @@
// hostnames
this.uniqueHosts = new HashSet<String>();
- this.lastPollTime = 0;
-
-
// Seed the random number generator with a reasonably globally unique seed
long randomSeed = System.nanoTime() +
(long)Math.pow(this.reduceTask.getPartition(),
@@ -1630,9 +1634,6 @@
long reducerInputBytes = 0;
public boolean fetchOutputs() throws IOException {
- //The map for (Hosts, List of MapIds from this Host)
- HashMap<String, List<MapOutputLocation>> mapLocations =
- new HashMap<String, List<MapOutputLocation>>();
int totalFailures = 0;
int numInFlight = 0, numCopied = 0;
long bytesTransferred = 0;
@@ -1641,6 +1642,7 @@
reduceTask.getProgress().phase();
LocalFSMerger localFSMergerThread = null;
InMemFSMergeThread inMemFSMergeThread = null;
+ GetMapEventsThread getMapEventsThread = null;
for (int i = 0; i < numMaps; i++) {
copyPhase.addPhase(); // add sub-phase per file
@@ -1664,15 +1666,15 @@
localFSMergerThread.start();
inMemFSMergeThread.start();
+ // start the map events thread
+ getMapEventsThread = new GetMapEventsThread();
+ getMapEventsThread.start();
+
// start the clock for bandwidth measurement
long startTime = System.currentTimeMillis();
long currentTime = startTime;
long lastProgressTime = startTime;
long lastOutputTime = 0;
- IntWritable fromEventId = new IntWritable(0);
-
- //List of unique hosts containing map outputs
- List<String> hostList = new ArrayList<String>();
// loop until we get all required outputs
while (copiedMapOutputs.size() < numMaps && mergeThrowable == null) {
@@ -1688,73 +1690,43 @@
+ (numMaps - copiedMapOutputs.size()) + " map output(s) "
+ "where " + numInFlight + " is already in progress");
}
-
- try {
- // Put the hash entries for the failed fetches.
- Iterator<MapOutputLocation> locItr = retryFetches.iterator();
- while (locItr.hasNext()) {
- MapOutputLocation loc = locItr.next();
- List<MapOutputLocation> locList =
- mapLocations.get(loc.getHost());
- if (locList == null) {
- locList = new LinkedList<MapOutputLocation>();
- mapLocations.put(loc.getHost(), locList);
- hostList.add(loc.getHost());
- }
- //Add to the beginning of the list so that this map is
- //tried again before the others and we can hasten the
- //re-execution of this map should there be a problem
- locList.add(0, loc);
- }
-
- // The call getMapCompletionEvents will update fromEventId to
- // used for the next call to getMapCompletionEvents
-
- int currentNumObsoleteMapIds = obsoleteMapIds.size();
-
- int numNewOutputs = getMapCompletionEvents(fromEventId,
- mapLocations,
- hostList);
- if (numNewOutputs > 0 || logNow) {
- LOG.info(reduceTask.getTaskID() + ": " +
- "Got " + numNewOutputs +
- " new map-outputs");
- }
-
- int numNewObsoleteMaps = obsoleteMapIds.size()-currentNumObsoleteMapIds;
+ // Put the hash entries for the failed fetches.
+ Iterator<MapOutputLocation> locItr = retryFetches.iterator();
- if (numNewObsoleteMaps > 0) {
- LOG.info(reduceTask.getTaskID() + ": " +
- "Got " + numNewObsoleteMaps +
- " obsolete map-outputs from tasktracker ");
- }
-
- if (retryFetches.size() > 0) {
- LOG.info(reduceTask.getTaskID() + ": " +
+ while (locItr.hasNext()) {
+ MapOutputLocation loc = locItr.next();
+ List<MapOutputLocation> locList =
+ mapLocations.get(loc.getHost());
+ //Add to the beginning of the list so that this map is
+ //tried again before the others and we can hasten the
+ //re-execution of this map should there be a problem
+ locList.add(0, loc);
+ }
+
+ if (retryFetches.size() > 0) {
+ LOG.info(reduceTask.getTaskID() + ": " +
"Got " + retryFetches.size() +
" map-outputs from previous failures");
- }
- // clear the "failed" fetches hashmap
- retryFetches.clear();
- }
- catch (IOException ie) {
- LOG.warn(reduceTask.getTaskID() +
- " Problem locating map outputs: " +
- StringUtils.stringifyException(ie));
}
-
+ // clear the "failed" fetches hashmap
+ retryFetches.clear();
+
// now walk through the cache and schedule what we can
int numScheduled = 0;
int numDups = 0;
synchronized (scheduledCopies) {
-
+
// Randomize the map output locations to prevent
// all reduce-tasks swamping the same tasktracker
- Collections.shuffle(hostList, this.random);
+ List<String> hostList = new ArrayList<String>();
+ hostList.addAll(mapLocations.keySet());
+ Collections.shuffle(hostList, this.random);
+
Iterator<String> hostsItr = hostList.iterator();
+
while (hostsItr.hasNext()) {
String host = hostsItr.next();
@@ -1762,9 +1734,13 @@
List<MapOutputLocation> knownOutputsByLoc =
mapLocations.get(host);
+ if (knownOutputsByLoc.size() == 0) {
+ continue;
+ }
+
//Identify duplicate hosts here
if (uniqueHosts.contains(host)) {
- numDups += knownOutputsByLoc.size() -1;
+ numDups += knownOutputsByLoc.size();
continue;
}
@@ -1782,34 +1758,32 @@
if (penalized)
continue;
- Iterator<MapOutputLocation> locItr =
- knownOutputsByLoc.iterator();
+ synchronized (knownOutputsByLoc) {
+
+ locItr = knownOutputsByLoc.iterator();
- while (locItr.hasNext()) {
+ while (locItr.hasNext()) {
- MapOutputLocation loc = locItr.next();
+ MapOutputLocation loc = locItr.next();
- // Do not schedule fetches from OBSOLETE maps
- if (obsoleteMapIds.contains(loc.getTaskAttemptId())) {
- locItr.remove();
- continue;
- }
-
- uniqueHosts.add(host);
- scheduledCopies.add(loc);
- locItr.remove(); // remove from knownOutputs
- numInFlight++; numScheduled++;
+ // Do not schedule fetches from OBSOLETE maps
+ if (obsoleteMapIds.contains(loc.getTaskAttemptId())) {
+ locItr.remove();
+ continue;
+ }
+
+ uniqueHosts.add(host);
+ scheduledCopies.add(loc);
+ locItr.remove(); // remove from knownOutputs
+ numInFlight++; numScheduled++;
- break; //we have a map from this host
- }
-
- if (knownOutputsByLoc.size() == 0) {
- mapLocations.remove(host);
- hostsItr.remove();
+ break; //we have a map from this host
+ }
}
}
scheduledCopies.notifyAll();
}
+
if (numScheduled > 0 || logNow) {
LOG.info(reduceTask.getTaskID() + " Scheduled " + numScheduled +
" outputs (" + penaltyBox.size() +
@@ -1823,7 +1797,7 @@
((penaltyBox.get(host) - currentTime)/1000) + " seconds.");
}
}
-
+
// if we have no copies in flight and we can't schedule anything
// new, just wait for a bit
try {
@@ -1869,7 +1843,7 @@
+ " at " +
mbpsFormat.format(transferRate) + " MB/s)");
- // Note successfull fetch for this mapId to invalidate
+ // Note successful fetch for this mapId to invalidate
// (possibly) old fetch-failures
fetchFailedMaps.remove(cr.getLocation().getTaskId());
} else if (cr.isObsolete()) {
@@ -1974,6 +1948,15 @@
}
// all done, inform the copiers to exit
+ exitGetMapEvents= true;
+ try {
+ getMapEventsThread.join();
+ LOG.info("getMapsEventsThread joined.");
+ } catch (Throwable t) {
+ LOG.info("getMapsEventsThread threw an exception: " +
+ StringUtils.stringifyException(t));
+ }
+
synchronized (copiers) {
synchronized (scheduledCopies) {
for (MapOutputCopier copier : copiers) {
@@ -2218,109 +2201,6 @@
}
}
- /**
- * Queries the {@link TaskTracker} for a set of map-completion events from
- * a given event ID.
- *
- * @param fromEventId the first event ID we want to start from, this is
- * modified by the call to this method
- * @param mapLocations the hash map of map locations by host
- * @param hostsList the list that contains unique hosts having
- * map outputs, will be updated on the return
- * of this method
- * @return the number of new map-completion events from the given event ID
- * @throws IOException
- */
- private int getMapCompletionEvents(IntWritable fromEventId,
- HashMap<String,List<MapOutputLocation>> mapLocations,
- List<String> hostsList)
- throws IOException {
-
- long currentTime = System.currentTimeMillis();
- long pollTime = lastPollTime + MIN_POLL_INTERVAL;
- int numNewMaps = 0;
- while (currentTime < pollTime) {
- try {
- Thread.sleep(pollTime-currentTime);
- } catch (InterruptedException ie) { } // IGNORE
- currentTime = System.currentTimeMillis();
- }
-
- MapTaskCompletionEventsUpdate update =
- umbilical.getMapCompletionEvents(reduceTask.getJobID(),
- fromEventId.get(), MAX_EVENTS_TO_FETCH,
- reduceTask.getTaskID());
- TaskCompletionEvent events[] = update.getMapTaskCompletionEvents();
-
- // Check if the reset is required.
- // Since there is no ordering of the task completion events at the
- // reducer, the only option to sync with the new jobtracker is to reset
- // the events index
- if (update.shouldReset()) {
- fromEventId.set(0);
- obsoleteMapIds.clear(); // clear the obsolete map
- }
-
- // Note the last successful poll time-stamp
- lastPollTime = currentTime;
-
- // Update the last seen event ID
- fromEventId.set(fromEventId.get() + events.length);
-
- // Process the TaskCompletionEvents:
- // 1. Save the SUCCEEDED maps in knownOutputs to fetch the outputs.
- // 2. Save the OBSOLETE/FAILED/KILLED maps in obsoleteOutputs to stop fetching
- // from those maps.
- // 3. Remove TIPFAILED maps from neededOutputs since we don't need their
- // outputs at all.
- for (TaskCompletionEvent event : events) {
- switch (event.getTaskStatus()) {
- case SUCCEEDED:
- {
- URI u = URI.create(event.getTaskTrackerHttp());
- String host = u.getHost();
- TaskAttemptID taskId = event.getTaskAttemptId();
- int duration = event.getTaskRunTime();
- if (duration > maxMapRuntime) {
- maxMapRuntime = duration;
- // adjust max-fetch-retries based on max-map-run-time
- maxFetchRetriesPerMap = Math.max(MIN_FETCH_RETRIES_PER_MAP,
- getClosestPowerOf2((maxMapRuntime / BACKOFF_INIT) + 1));
- }
- URL mapOutputLocation = new URL(event.getTaskTrackerHttp() +
- "/mapOutput?job=" + taskId.getJobID() +
- "&map=" + taskId +
- "&reduce=" + getPartition());
- List<MapOutputLocation> loc = mapLocations.get(host);
- if (loc == null) {
- loc = new LinkedList<MapOutputLocation>();
- mapLocations.put(host, loc);
- hostsList.add(host);
- }
- loc.add(new MapOutputLocation(taskId, host, mapOutputLocation));
- numNewMaps ++;
- }
- break;
- case FAILED:
- case KILLED:
- case OBSOLETE:
- {
- obsoleteMapIds.add(event.getTaskAttemptId());
- LOG.info("Ignoring obsolete output of " + event.getTaskStatus() +
- " map-task: '" + event.getTaskAttemptId() + "'");
- }
- break;
- case TIPFAILED:
- {
- copiedMapOutputs.add(event.getTaskAttemptId().getTaskID());
- LOG.info("Ignoring output of failed map TIP: '" +
- event.getTaskAttemptId() + "'");
- }
- break;
- }
- }
- return numNewMaps;
- }
/** Starts merging the local copy (on disk) of the map's output so that
@@ -2552,6 +2432,130 @@
}
}
+ private class GetMapEventsThread extends Thread {
+
+ private IntWritable fromEventId = new IntWritable(0);
+ private static final long SLEEP_TIME = 1000;
+
+ public GetMapEventsThread() {
+ setName("Thread for polling Map Completion Events");
+ setDaemon(true);
+ }
+
+ @Override
+ public void run() {
+
+ LOG.info(reduceTask.getTaskID() + " Thread started: " + getName());
+
+ do {
+ try {
+ int numNewMaps = getMapCompletionEvents();
+ if (numNewMaps > 0) {
+ LOG.info(reduceTask.getTaskID() + ": " +
+ "Got " + numNewMaps + " new map-outputs");
+ }
+ Thread.sleep(SLEEP_TIME);
+ }
+ catch (InterruptedException e) {
+ LOG.warn(reduceTask.getTaskID() +
+ " GetMapEventsThread returning after an " +
+ " interrupted exception");
+ return;
+ }
+ catch (Throwable t) {
+ LOG.warn(reduceTask.getTaskID() +
+ " GetMapEventsThread Ignoring exception : " +
+ StringUtils.stringifyException(t));
+ }
+ } while (!exitGetMapEvents);
+
+ LOG.info("GetMapEventsThread exiting");
+
+ }
+
+ /**
+ * Queries the {@link TaskTracker} for a set of map-completion events
+ * from a given event ID.
+ * @throws IOException
+ */
+ private int getMapCompletionEvents() throws IOException {
+
+ int numNewMaps = 0;
+
+ MapTaskCompletionEventsUpdate update =
+ umbilical.getMapCompletionEvents(reduceTask.getJobID(),
+ fromEventId.get(),
+ MAX_EVENTS_TO_FETCH,
+ reduceTask.getTaskID());
+ TaskCompletionEvent events[] = update.getMapTaskCompletionEvents();
+
+ // Check if the reset is required.
+ // Since there is no ordering of the task completion events at the
+ // reducer, the only option to sync with the new jobtracker is to reset
+ // the events index
+ if (update.shouldReset()) {
+ fromEventId.set(0);
+ obsoleteMapIds.clear(); // clear the obsolete map
+ }
+
+ // Update the last seen event ID
+ fromEventId.set(fromEventId.get() + events.length);
+
+ // Process the TaskCompletionEvents:
+ // 1. Save the SUCCEEDED maps in knownOutputs to fetch the outputs.
+ // 2. Save the OBSOLETE/FAILED/KILLED maps in obsoleteOutputs to stop
+ // fetching from those maps.
+ // 3. Remove TIPFAILED maps from neededOutputs since we don't need their
+ // outputs at all.
+ for (TaskCompletionEvent event : events) {
+ switch (event.getTaskStatus()) {
+ case SUCCEEDED:
+ {
+ URI u = URI.create(event.getTaskTrackerHttp());
+ String host = u.getHost();
+ TaskAttemptID taskId = event.getTaskAttemptId();
+ int duration = event.getTaskRunTime();
+ if (duration > maxMapRuntime) {
+ maxMapRuntime = duration;
+ // adjust max-fetch-retries based on max-map-run-time
+ maxFetchRetriesPerMap = Math.max(MIN_FETCH_RETRIES_PER_MAP,
+ getClosestPowerOf2((maxMapRuntime / BACKOFF_INIT) + 1));
+ }
+ URL mapOutputLocation = new URL(event.getTaskTrackerHttp() +
+ "/mapOutput?job=" + taskId.getJobID() +
+ "&map=" + taskId +
+ "&reduce=" + getPartition());
+ List<MapOutputLocation> loc = mapLocations.get(host);
+ if (loc == null) {
+ loc = Collections.synchronizedList
+ (new LinkedList<MapOutputLocation>());
+ mapLocations.put(host, loc);
+ }
+ loc.add(new MapOutputLocation(taskId, host, mapOutputLocation));
+ numNewMaps ++;
+ }
+ break;
+ case FAILED:
+ case KILLED:
+ case OBSOLETE:
+ {
+ obsoleteMapIds.add(event.getTaskAttemptId());
+ LOG.info("Ignoring obsolete output of " + event.getTaskStatus() +
+ " map-task: '" + event.getTaskAttemptId() + "'");
+ }
+ break;
+ case TIPFAILED:
+ {
+ copiedMapOutputs.add(event.getTaskAttemptId().getTaskID());
+ LOG.info("Ignoring output of failed map TIP: '" +
+ event.getTaskAttemptId() + "'");
+ }
+ break;
+ }
+ }
+ return numNewMaps;
+ }
+ }
}
/**