You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by om...@apache.org on 2011/03/04 04:37:37 UTC
svn commit: r1077072 - in
/hadoop/common/branches/branch-0.20-security-patches/src/mapred:
mapred-default.xml org/apache/hadoop/mapred/ReduceTask.java
Author: omalley
Date: Fri Mar 4 03:37:37 2011
New Revision: 1077072
URL: http://svn.apache.org/viewvc?rev=1077072&view=rev
Log:
commit 0d5da097f9483c784e5c88d13f13b91aa9eba625
Author: Arun C Murthy <ac...@apache.org>
Date: Fri Dec 11 09:13:53 2009 +0530
MAPREDUCE-353. Allow shuffle read and connection timeouts to be configurable. Contributed by Amareshwari Sriramadasu.
From https://issues.apache.org/jira/secure/attachment/12427566/patch-353-ydist.txt
+++ b/YAHOO-CHANGES.txt
+
+ MAPREDUCE-353. Allow shuffle read and connection timeouts to be
+ configurable. (Amareshwari Sriramadasu via acmurthy)
+
Modified:
hadoop/common/branches/branch-0.20-security-patches/src/mapred/mapred-default.xml
hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/ReduceTask.java
Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/mapred-default.xml
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/mapred-default.xml?rev=1077072&r1=1077071&r2=1077072&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/mapred-default.xml (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/mapred-default.xml Fri Mar 4 03:37:37 2011
@@ -297,6 +297,23 @@
</property>
<property>
+ <name>mapreduce.reduce.shuffle.connect.timeout</name>
+ <value>180000</value>
+ <description>Expert: The maximum amount of time (in milliseconds) a reduce
+ task spends trying to connect to a tasktracker for getting map output.
+ </description>
+</property>
+
+<property>
+ <name>mapreduce.reduce.shuffle.read.timeout</name>
+ <value>180000</value>
+ <description>Expert: The maximum amount of time (in milliseconds) a reduce
+ task waits for map output data to be available for reading after obtaining
+ a connection.
+ </description>
+</property>
+
+<property>
<name>mapred.task.timeout</name>
<value>600000</value>
<description>The number of milliseconds before a task will be
Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/ReduceTask.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/ReduceTask.java?rev=1077072&r1=1077071&r2=1077072&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/ReduceTask.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/ReduceTask.java Fri Mar 4 03:37:37 2011
@@ -1134,6 +1134,8 @@ class ReduceTask extends Task {
private final static int UNIT_CONNECT_TIMEOUT = 30 * 1000;
// default read timeout (in milliseconds)
private final static int DEFAULT_READ_TIMEOUT = 3 * 60 * 1000;
+ private final int shuffleConnectionTimeout;
+ private final int shuffleReadTimeout;
private MapOutputLocation currentLocation = null;
private int id = nextMapOutputCopierId++;
@@ -1149,6 +1151,11 @@ class ReduceTask extends Task {
LOG.debug(getName() + " created");
this.reporter = reporter;
+ shuffleConnectionTimeout =
+ job.getInt("mapreduce.reduce.shuffle.connect.timeout", STALLED_COPY_TIMEOUT);
+ shuffleReadTimeout =
+ job.getInt("mapreduce.reduce.shuffle.read.timeout", DEFAULT_READ_TIMEOUT);
+
if (job.getCompressMapOutput()) {
Class<? extends CompressionCodec> codecClass =
job.getMapOutputCompressorClass(DefaultCodec.class);
@@ -1370,8 +1377,8 @@ class ReduceTask extends Task {
// Connect
URLConnection connection =
mapOutputLoc.getOutputLocation().openConnection();
- InputStream input = getInputStream(connection, STALLED_COPY_TIMEOUT,
- DEFAULT_READ_TIMEOUT);
+ InputStream input = getInputStream(connection, shuffleConnectionTimeout,
+ shuffleReadTimeout);
// Validate header from map output
TaskAttemptID mapId = null;
@@ -1510,8 +1517,8 @@ class ReduceTask extends Task {
// Reconnect
try {
connection = mapOutputLoc.getOutputLocation().openConnection();
- input = getInputStream(connection, STALLED_COPY_TIMEOUT,
- DEFAULT_READ_TIMEOUT);
+ input = getInputStream(connection, shuffleConnectionTimeout,
+ shuffleReadTimeout);
} catch (IOException ioe) {
LOG.info("Failed reopen connection to fetch map-output from " +
mapOutputLoc.getHost());