You are viewing a plain-text version of this content. (The link to the canonical version was not preserved in this copy.)
Posted to common-commits@hadoop.apache.org by dd...@apache.org on 2008/09/29 12:21:18 UTC
svn commit: r700044 - in /hadoop/core/branches/branch-0.19: CHANGES.txt
src/core/org/apache/hadoop/net/ConnTimeoutException.java
src/core/org/apache/hadoop/net/ReadTimeoutException.java
src/mapred/org/apache/hadoop/mapred/ReduceTask.java
Author: ddas
Date: Mon Sep 29 03:21:18 2008
New Revision: 700044
URL: http://svn.apache.org/viewvc?rev=700044&view=rev
Log:
Merge -r 700042:700043 from trunk onto 0.19 branch. Reverts HADOOP-3327.
Removed:
hadoop/core/branches/branch-0.19/src/core/org/apache/hadoop/net/ConnTimeoutException.java
hadoop/core/branches/branch-0.19/src/core/org/apache/hadoop/net/ReadTimeoutException.java
Modified:
hadoop/core/branches/branch-0.19/CHANGES.txt
hadoop/core/branches/branch-0.19/src/mapred/org/apache/hadoop/mapred/ReduceTask.java
Modified: hadoop/core/branches/branch-0.19/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.19/CHANGES.txt?rev=700044&r1=700043&r2=700044&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.19/CHANGES.txt (original)
+++ hadoop/core/branches/branch-0.19/CHANGES.txt Mon Sep 29 03:21:18 2008
@@ -268,10 +268,6 @@
HADOOP-3756. Minor. Remove unused dfs.client.buffer.dir from
hadoop-default.xml. (rangadi)
- HADOOP-3327. Treats connection and read timeouts differently in the
- shuffle and the backoff logic is dependent on the type of timeout.
- (Jothi Padmanabhan via ddas)
-
HADOOP-3747. Adds counter suport for MultipleOutputs.
(Alejandro Abdelnur via ddas)
Modified: hadoop/core/branches/branch-0.19/src/mapred/org/apache/hadoop/mapred/ReduceTask.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.19/src/mapred/org/apache/hadoop/mapred/ReduceTask.java?rev=700044&r1=700043&r2=700044&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.19/src/mapred/org/apache/hadoop/mapred/ReduceTask.java (original)
+++ hadoop/core/branches/branch-0.19/src/mapred/org/apache/hadoop/mapred/ReduceTask.java Mon Sep 29 03:21:18 2008
@@ -74,8 +74,6 @@
import org.apache.hadoop.metrics.MetricsRecord;
import org.apache.hadoop.metrics.MetricsUtil;
import org.apache.hadoop.metrics.Updater;
-import org.apache.hadoop.net.ConnTimeoutException;
-import org.apache.hadoop.net.ReadTimeoutException;
import org.apache.hadoop.util.Progress;
import org.apache.hadoop.util.Progressable;
import org.apache.hadoop.util.ReflectionUtils;
@@ -96,12 +94,6 @@
private int numMaps;
private ReduceCopier reduceCopier;
- private static enum CopyOutputErrorType {
- NO_ERROR,
- CONNECTION_ERROR,
- READ_ERROR
- }
-
private CompressionCodec codec;
@@ -110,7 +102,6 @@
setPhase(TaskStatus.Phase.SHUFFLE); // phase to start with
}
-
private Progress copyPhase;
private Progress sortPhase;
private Progress reducePhase;
@@ -637,15 +628,9 @@
Set<TaskID> fetchFailedMaps = new TreeSet<TaskID>();
/**
- * A map of taskId -> no. of failed fetches in connect
- */
- Map<TaskAttemptID, Integer> mapTaskToConnectFailedFetchesMap =
- new HashMap<TaskAttemptID, Integer>();
-
- /**
- * A map of taskId -> no. of failed fetches in read
+ * A map of taskId -> no. of failed fetches
*/
- Map<TaskAttemptID, Integer> mapTaskToReadFailedFetchesMap =
+ Map<TaskAttemptID, Integer> mapTaskToFailedFetchesMap =
new HashMap<TaskAttemptID, Integer>();
/**
@@ -665,7 +650,6 @@
Collections.synchronizedList(new LinkedList<MapOutput>());
-
/**
* This class contains the methods that should be used for metrics-reporting
* the specific metrics for shuffle. This class actually reports the
@@ -728,8 +712,7 @@
/** Represents the result of an attempt to copy a map output */
private class CopyResult {
-
- // the map output location against which a copy attempt was made
+ // the map output location against which a copy attempt was made
private final MapOutputLocation loc;
// the size of the file copied, -1 if the transfer failed
@@ -737,14 +720,10 @@
//a flag signifying whether a copy result is obsolete
private static final int OBSOLETE = -2;
-
- CopyOutputErrorType errorType;
-
- CopyResult(MapOutputLocation loc, long size,
- CopyOutputErrorType errorType) {
+
+ CopyResult(MapOutputLocation loc, long size) {
this.loc = loc;
this.size = size;
- this.errorType = errorType;
}
public boolean getSuccess() { return size >= 0; }
@@ -753,9 +732,6 @@
}
public long getSize() { return size; }
public String getHost() { return loc.getHost(); }
- public CopyOutputErrorType getErrorType() {
- return ((size < 0) ? errorType: CopyOutputErrorType.NO_ERROR);
- }
public MapOutputLocation getLocation() { return loc; }
}
@@ -1026,6 +1002,19 @@
}
/**
+ * Fail the current file that we are fetching
+ * @return were we currently fetching?
+ */
+ public synchronized boolean fail() {
+ if (currentLocation != null) {
+ finish(-1);
+ return true;
+ } else {
+ return false;
+ }
+ }
+
+ /**
* Get the current map output location.
*/
public synchronized MapOutputLocation getLocation() {
@@ -1036,12 +1025,11 @@
currentLocation = loc;
}
- private synchronized void finish(long size,
- CopyOutputErrorType errorType) {
+ private synchronized void finish(long size) {
if (currentLocation != null) {
LOG.debug(getName() + " finishing " + currentLocation + " =" + size);
synchronized (copyResults) {
- copyResults.add(new CopyResult(currentLocation, size, errorType));
+ copyResults.add(new CopyResult(currentLocation, size));
copyResults.notify();
}
currentLocation = null;
@@ -1057,7 +1045,6 @@
try {
MapOutputLocation loc = null;
long size = -1;
- CopyOutputErrorType errorType = CopyOutputErrorType.NO_ERROR;
synchronized (scheduledCopies) {
while (scheduledCopies.isEmpty()) {
@@ -1079,18 +1066,9 @@
// Reset
size = -1;
-
- // Identify the error type
- if (e.getClass() == ConnTimeoutException.class) {
- errorType = CopyOutputErrorType.CONNECTION_ERROR;
- }
- else if (e.getClass() == ReadTimeoutException.class) {
- errorType = CopyOutputErrorType.READ_ERROR;
- }
-
} finally {
shuffleClientMetrics.threadFree();
- finish(size, errorType);
+ finish(size);
}
} catch (InterruptedException e) {
return; // ALL DONE
@@ -1276,17 +1254,26 @@
connection.setReadTimeout(readTimeout);
// set the connect timeout to the unit-connect-timeout
connection.setConnectTimeout(unit);
+ while (true) {
+ try {
+ return connection.getInputStream();
+ } catch (IOException ioe) {
+ // update the total remaining connect-timeout
+ connectionTimeout -= unit;
- try {
- connection.connect();
- } catch (IOException ioe) {
- throw new ConnTimeoutException("Connection Timed out");
- }
+ // throw an exception if we have waited for timeout amount of time
+ // note that the updated value if timeout is used here
+ if (connectionTimeout == 0) {
+ throw ioe;
+ }
- try {
- return connection.getInputStream();
- } catch (IOException ioe) {
- throw new ReadTimeoutException("Read Timed out");
+ // reset the connect timeout for the last try
+ if (connectionTimeout < unit) {
+ unit = connectionTimeout;
+ // reset the connect time out for the final connect
+ connection.setConnectTimeout(unit);
+ }
+ }
}
}
@@ -1857,61 +1844,33 @@
cr.getHost());
} else {
retryFetches.add(cr.getLocation());
-
- CopyOutputErrorType errorType = cr.getErrorType();
// note the failed-fetch
TaskAttemptID mapTaskId = cr.getLocation().getTaskAttemptId();
TaskID mapId = cr.getLocation().getTaskId();
totalFailures++;
-
- Integer noFailedFetches = 0;
-
- Integer noReadFailedFetches =
- mapTaskToReadFailedFetchesMap.get(mapTaskId);
-
- if (noReadFailedFetches == null) noReadFailedFetches = 0;
-
- Integer noConnectFailedFetches =
- mapTaskToConnectFailedFetchesMap.get(mapTaskId);
-
- if (noConnectFailedFetches == null) noConnectFailedFetches = 0;
-
- if (errorType == CopyOutputErrorType.READ_ERROR) {
- noReadFailedFetches ++;
- mapTaskToReadFailedFetchesMap.put (mapTaskId,
- noReadFailedFetches);
+ Integer noFailedFetches =
+ mapTaskToFailedFetchesMap.get(mapTaskId);
+ noFailedFetches =
+ (noFailedFetches == null) ? 1 : (noFailedFetches + 1);
+ mapTaskToFailedFetchesMap.put(mapTaskId, noFailedFetches);
+ LOG.info("Task " + getTaskID() + ": Failed fetch #" +
+ noFailedFetches + " from " + mapTaskId);
+
+ // did the fetch fail too many times?
+ // using a hybrid technique for notifying the jobtracker.
+ // a. the first notification is sent after max-retries
+ // b. subsequent notifications are sent after 2 retries.
+ if ((noFailedFetches >= maxFetchRetriesPerMap)
+ && ((noFailedFetches - maxFetchRetriesPerMap) % 2) == 0) {
synchronized (ReduceTask.this) {
taskStatus.addFetchFailedMap(mapTaskId);
LOG.info("Failed to fetch map-output from " + mapTaskId +
- " Got a Read Time out," +
- " reporting to the JobTracker");
- }
- } else if (errorType == CopyOutputErrorType.CONNECTION_ERROR) {
- noConnectFailedFetches ++;
- mapTaskToConnectFailedFetchesMap.put (
- mapTaskId, noConnectFailedFetches);
-
- LOG.info("Task " + getTaskID() + ": Failed fetch #"
- + noConnectFailedFetches + " from " + mapTaskId);
-
- if ((noConnectFailedFetches >= maxFetchRetriesPerMap) &&
- ((noConnectFailedFetches - maxFetchRetriesPerMap) % 2)
- == 0) {
- synchronized (ReduceTask.this) {
- taskStatus.addFetchFailedMap(mapTaskId);
- LOG.info("Failed to fetch map-output from " + mapTaskId
- + " even after MAX_FETCH_RETRIES_PER_MAP"
- + " (connect) retries... "
- + " reporting to the JobTracker");
- }
+ " even after MAX_FETCH_RETRIES_PER_MAP retries... "
+ + " reporting to the JobTracker");
}
}
-
- noFailedFetches = noConnectFailedFetches +
- noReadFailedFetches;
-
// note unique failed-fetch maps
if (noFailedFetches == maxFetchRetriesPerMap) {
fetchFailedMaps.add(mapId);
@@ -1960,32 +1919,22 @@
}
}
- if (errorType == CopyOutputErrorType.CONNECTION_ERROR) {
- // back off exponentially until num_retries <= max_retries
- // back off by max_backoff/2 on subsequent failed attempts
- currentTime = System.currentTimeMillis();
- int currentBackOff = noFailedFetches <= maxFetchRetriesPerMap
+ // back off exponentially until num_retries <= max_retries
+ // back off by max_backoff/2 on subsequent failed attempts
+ currentTime = System.currentTimeMillis();
+ int currentBackOff = noFailedFetches <= maxFetchRetriesPerMap
? BACKOFF_INIT
* (1 << (noFailedFetches - 1))
: (this.maxBackoff * 1000 / 2);
- penaltyBox.put(cr.getHost(), currentTime + currentBackOff);
- LOG.warn(reduceTask.getTaskID() + " adding host " +
+ penaltyBox.put(cr.getHost(), currentTime + currentBackOff);
+ LOG.warn(reduceTask.getTaskID() + " adding host " +
cr.getHost() + " to penalty box, next contact in " +
(currentBackOff/1000) + " seconds");
- } else if (errorType == CopyOutputErrorType.READ_ERROR) {
- int backOff = Math.max(maxMapRuntime/2,
- (this.maxBackoff * 1000));
- penaltyBox.put(cr.getHost(), currentTime + backOff);
- LOG.warn(reduceTask.getTaskID() + " adding host " +
- cr.getHost() + " to penalty box, next contact in " +
- (backOff/1000) + " seconds");
- }
-
- } // Fetch Failure
+ }
uniqueHosts.remove(cr.getHost());
numInFlight--;
- } // while (numInFlight > 0)
- } // while (copiedMaps < numMaps)
+ }
+ }
// all done, inform the copiers to exit
synchronized (copiers) {