You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by dd...@apache.org on 2008/01/03 13:15:55 UTC
svn commit: r608462 - in /lucene/hadoop/trunk: CHANGES.txt
src/java/org/apache/hadoop/mapred/ReduceTask.java
Author: ddas
Date: Thu Jan 3 04:15:53 2008
New Revision: 608462
URL: http://svn.apache.org/viewvc?rev=608462&view=rev
Log:
HADOOP-1719. Improves the utilization of shuffle copier threads. Contributed by Amar Kamat.
Modified:
lucene/hadoop/trunk/CHANGES.txt
lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java
Modified: lucene/hadoop/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/CHANGES.txt?rev=608462&r1=608461&r2=608462&view=diff
==============================================================================
--- lucene/hadoop/trunk/CHANGES.txt (original)
+++ lucene/hadoop/trunk/CHANGES.txt Thu Jan 3 04:15:53 2008
@@ -144,6 +144,9 @@
performance. Make NullWritable implement Comparable. Make TextOutputFormat
treat NullWritable like null. (omalley)
+ HADOOP-1719. Improves the utilization of shuffle copier threads.
+ (Amar Kamat via ddas)
+
OPTIMIZATIONS
HADOOP-1898. Release the lock protecting the last time of the last stack
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java?rev=608462&r1=608461&r2=608462&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java Thu Jan 3 04:15:53 2008
@@ -959,21 +959,30 @@
// MapOutputLocations as values
knownOutputs.addAll(retryFetches);
- // The call getMapCompletionEvents will update fromEventId to
- // used for the next call to getMapCompletionEvents
- int currentNumKnownMaps = knownOutputs.size();
- int currentNumObsoleteMapIds = obsoleteMapIds.size();
- getMapCompletionEvents(fromEventId, knownOutputs);
+ // ensure we have enough to keep us busy
+ boolean busy = isBusy(numInFlight, numCopiers, lowThreshold,
+ uniqueHosts.size(), probe_sample_size,
+ numOutputs - numCopied);
+ if (!busy) {
+ // The call getMapCompletionEvents will update fromEventId to
+ // used for the next call to getMapCompletionEvents
+ int currentNumKnownMaps = knownOutputs.size();
+ int currentNumObsoleteMapIds = obsoleteMapIds.size();
+ getMapCompletionEvents(fromEventId, knownOutputs);
- LOG.info(reduceTask.getTaskId() + ": " +
+ LOG.info(reduceTask.getTaskId() + ": " +
"Got " + (knownOutputs.size()-currentNumKnownMaps) +
" new map-outputs & " +
(obsoleteMapIds.size()-currentNumObsoleteMapIds) +
" obsolete map-outputs from tasktracker and " +
retryFetches.size() + " map-outputs from previous failures"
);
-
+ } else {
+ LOG.info(" Busy enough - did not query the tasktracker for "
+ + "new map outputs. Have "+ retryFetches.size()
+ + " map outputs from previous failures");
+ }
// clear the "failed" fetches hashmap
retryFetches.clear();
}
@@ -1181,16 +1190,10 @@
numInFlight--;
}
- boolean busy = true;
- // ensure we have enough to keep us busy
- if (numInFlight < lowThreshold && (numOutputs-numCopied) >
- probe_sample_size) {
- busy = false;
- }
//Check whether we have more CopyResult to check. If there is none,
- //and we are not busy enough, break
+ //break
synchronized (copyResults) {
- if (copyResults.size() == 0 && !busy) {
+ if (copyResults.size() == 0) {
break;
}
}
@@ -1276,12 +1279,30 @@
}
}
+ /**
+ * Decides whether the copier threads already have enough work, so that the
+ * reduce task can skip querying the tasktracker for new map outputs.
+ *
+ * Returns false ("not busy enough") if either of these holds:
+ * - fewer than lowThreshold copies are in flight AND more than
+ * probeSampleSize map outputs still remain to be copied, or
+ * - uniqueHostsSize is smaller than numCopiers.
+ * Otherwise returns true.
+ *
+ * Rationale for the uniqueHosts check: only one map output is fetched at a
+ * time from any given host, and uniqueHosts tracks those hosts. A copy
+ * from a host is scheduled as soon as the host is added to uniqueHosts.
+ * Under that premise, uniqueHosts.size() >= numCopiers means that all
+ * copiers are busy. The converse does not hold. For example, when there
+ * are more copiers than hosts in the cluster, this check can never conclude
+ * "busy". It is nevertheless a generally useful heuristic.
+ *
+ * @param numInFlight     the caller's count of copies currently in flight
+ * @param numCopiers      number of copier threads
+ * @param lowThreshold    in-flight count below which the copiers may be
+ *                        under-utilized
+ * @param uniqueHostsSize current size of the uniqueHosts set
+ * @param probeSampleSize number of remaining outputs that must be exceeded
+ *                        before a low in-flight count counts as "not busy"
+ * @param remainCopy      map outputs not yet copied (the caller in this
+ *                        file passes numOutputs - numCopied)
+ * @return true if busy enough to skip querying for new map outputs,
+ *         false otherwise
+ */
+ private boolean isBusy(int numInFlight, int numCopiers, int lowThreshold,
+ int uniqueHostsSize, int probeSampleSize,
+ int remainCopy) {
+ if ((numInFlight < lowThreshold && remainCopy > probeSampleSize) ||
+ uniqueHostsSize < numCopiers) {
+ return false;
+ }
+ return true;
+ }
private CopyResult getCopyResult() {
synchronized (copyResults) {
- while (copyResults.isEmpty()) {
+ if (copyResults.isEmpty()) {
try {
- copyResults.wait();
+ copyResults.wait(2000); // wait for 2 sec
} catch (InterruptedException e) { }
}
if (copyResults.isEmpty()) {