You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by om...@apache.org on 2011/03/04 04:37:37 UTC

svn commit: r1077072 - in /hadoop/common/branches/branch-0.20-security-patches/src/mapred: mapred-default.xml org/apache/hadoop/mapred/ReduceTask.java

Author: omalley
Date: Fri Mar  4 03:37:37 2011
New Revision: 1077072

URL: http://svn.apache.org/viewvc?rev=1077072&view=rev
Log:
commit 0d5da097f9483c784e5c88d13f13b91aa9eba625
Author: Arun C Murthy <ac...@apache.org>
Date:   Fri Dec 11 09:13:53 2009 +0530

    MAPREDUCE-353. Allow shuffle read and connection timeouts to be configurable. Contributed by Amareshwari Sriramadasu.
    
    From https://issues.apache.org/jira/secure/attachment/12427566/patch-353-ydist.txt
    
    +++ b/YAHOO-CHANGES.txt
    +
    +    MAPREDUCE-353. Allow shuffle read and connection timeouts to be
    +    configurable. (Amareshwari Sriramadasu via acmurthy)
    +

Modified:
    hadoop/common/branches/branch-0.20-security-patches/src/mapred/mapred-default.xml
    hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/ReduceTask.java

Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/mapred-default.xml
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/mapred-default.xml?rev=1077072&r1=1077071&r2=1077072&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/mapred-default.xml (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/mapred-default.xml Fri Mar  4 03:37:37 2011
@@ -297,6 +297,23 @@
 </property>
 
 <property>
+  <name>mapreduce.reduce.shuffle.connect.timeout</name>
+  <value>180000</value>
+  <description>Expert: The maximum amount of time (in milliseconds) a reduce
+  task spends trying to connect to a tasktracker for getting map output.
+  </description>
+</property>
+
+<property>
+  <name>mapreduce.reduce.shuffle.read.timeout</name>
+  <value>180000</value>
+  <description>Expert: The maximum amount of time (in milliseconds) a reduce
+  task waits for map output data to be available for reading after obtaining
+  a connection.
+  </description>
+</property>
+
+<property>
   <name>mapred.task.timeout</name>
   <value>600000</value>
   <description>The number of milliseconds before a task will be

Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/ReduceTask.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/ReduceTask.java?rev=1077072&r1=1077071&r2=1077072&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/ReduceTask.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/ReduceTask.java Fri Mar  4 03:37:37 2011
@@ -1134,6 +1134,8 @@ class ReduceTask extends Task {
       private final static int UNIT_CONNECT_TIMEOUT = 30 * 1000;
       // default read timeout (in milliseconds)
       private final static int DEFAULT_READ_TIMEOUT = 3 * 60 * 1000;
+      private final int shuffleConnectionTimeout;
+      private final int shuffleReadTimeout;
 
       private MapOutputLocation currentLocation = null;
       private int id = nextMapOutputCopierId++;
@@ -1149,6 +1151,11 @@ class ReduceTask extends Task {
         LOG.debug(getName() + " created");
         this.reporter = reporter;
         
+        shuffleConnectionTimeout =
+          job.getInt("mapreduce.reduce.shuffle.connect.timeout", STALLED_COPY_TIMEOUT);
+        shuffleReadTimeout =
+          job.getInt("mapreduce.reduce.shuffle.read.timeout", DEFAULT_READ_TIMEOUT);
+        
         if (job.getCompressMapOutput()) {
           Class<? extends CompressionCodec> codecClass =
             job.getMapOutputCompressorClass(DefaultCodec.class);
@@ -1370,8 +1377,8 @@ class ReduceTask extends Task {
         // Connect
         URLConnection connection = 
           mapOutputLoc.getOutputLocation().openConnection();
-        InputStream input = getInputStream(connection, STALLED_COPY_TIMEOUT,
-                                           DEFAULT_READ_TIMEOUT); 
+        InputStream input = getInputStream(connection, shuffleConnectionTimeout,
+                                           shuffleReadTimeout); 
         
         // Validate header from map output
         TaskAttemptID mapId = null;
@@ -1510,8 +1517,8 @@ class ReduceTask extends Task {
           // Reconnect
           try {
             connection = mapOutputLoc.getOutputLocation().openConnection();
-            input = getInputStream(connection, STALLED_COPY_TIMEOUT, 
-                                   DEFAULT_READ_TIMEOUT);
+            input = getInputStream(connection, shuffleConnectionTimeout, 
+                                   shuffleReadTimeout);
           } catch (IOException ioe) {
             LOG.info("Failed reopen connection to fetch map-output from " + 
                      mapOutputLoc.getHost());