You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by dd...@apache.org on 2008/06/03 16:04:48 UTC

svn commit: r662805 - in /hadoop/core/trunk: ./ src/contrib/streaming/src/java/org/apache/hadoop/streaming/

Author: ddas
Date: Tue Jun  3 07:04:47 2008
New Revision: 662805

URL: http://svn.apache.org/viewvc?rev=662805&view=rev
Log:
HADOOP-3429. Increases the size of the buffers used for communication in Streaming jobs. Contributed by Amareshwari Sriramadasu.

Modified:
    hadoop/core/trunk/CHANGES.txt
    hadoop/core/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeMapRed.java
    hadoop/core/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeMapper.java
    hadoop/core/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeReducer.java

Modified: hadoop/core/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/CHANGES.txt?rev=662805&r1=662804&r2=662805&view=diff
==============================================================================
--- hadoop/core/trunk/CHANGES.txt (original)
+++ hadoop/core/trunk/CHANGES.txt Tue Jun  3 07:04:47 2008
@@ -189,6 +189,9 @@
     HADOOP-3434. Retain the cause of the bind failure in Server::bind.
     (Steve Loughran via cdouglas)
 
+    HADOOP-3429. Increases the size of the buffers used for the communication
+    for Streaming jobs. (Amareshwari Sriramadasu via ddas)
+
   OPTIMIZATIONS
 
     HADOOP-3274. The default constructor of BytesWritable creates empty 

Modified: hadoop/core/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeMapRed.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeMapRed.java?rev=662805&r1=662804&r2=662805&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeMapRed.java (original)
+++ hadoop/core/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeMapRed.java Tue Jun  3 07:04:47 2008
@@ -22,7 +22,6 @@
 import java.nio.charset.CharacterCodingException;
 import java.io.IOException;
 import java.util.Date;
-import java.util.List;
 import java.util.Map;
 import java.util.Iterator;
 import java.util.Arrays;
@@ -35,9 +34,7 @@
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.Reporter;
 import org.apache.hadoop.mapred.OutputCollector;
-import org.apache.hadoop.mapred.TaskLog;
 import org.apache.hadoop.mapred.LineRecordReader.LineReader;
-import org.apache.hadoop.util.Shell;
 import org.apache.hadoop.util.StringUtils;
 
 import org.apache.hadoop.io.Text;
@@ -65,6 +62,8 @@
   final static int OUTSIDE = 1;
   final static int SINGLEQ = 2;
   final static int DOUBLEQ = 3;
+  
+  private final static int BUFFER_SIZE = 128 * 1024;
 
   static String[] splitArgs(String args) {
     ArrayList argList = new ArrayList();
@@ -172,8 +171,12 @@
       builder.environment().putAll(childEnv.toMap());
       sim = builder.start();
 
-      clientOut_ = new DataOutputStream(new BufferedOutputStream(sim.getOutputStream()));
-      clientIn_ = new DataInputStream(new BufferedInputStream(sim.getInputStream()));
+      clientOut_ = new DataOutputStream(new BufferedOutputStream(
+                                              sim.getOutputStream(),
+                                              BUFFER_SIZE));
+      clientIn_ = new DataInputStream(new BufferedInputStream(
+                                              sim.getInputStream(),
+                                              BUFFER_SIZE));
       clientErr_ = new DataInputStream(new BufferedInputStream(sim.getErrorStream()));
       startTime_ = System.currentTimeMillis();
 
@@ -457,6 +460,7 @@
       if (!doPipe_) return;
       try {
         if (clientOut_ != null) {
+          clientOut_.flush();
           clientOut_.close();
         }
       } catch (IOException io) {

Modified: hadoop/core/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeMapper.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeMapper.java?rev=662805&r1=662804&r2=662805&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeMapper.java (original)
+++ hadoop/core/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeMapper.java Tue Jun  3 07:04:47 2008
@@ -89,7 +89,6 @@
         }
         write(value);
         clientOut_.write('\n');
-        clientOut_.flush();
       } else {
         numRecSkipped_++;
       }

Modified: hadoop/core/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeReducer.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeReducer.java?rev=662805&r1=662804&r2=662805&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeReducer.java (original)
+++ hadoop/core/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeReducer.java Tue Jun  3 07:04:47 2008
@@ -78,7 +78,6 @@
           clientOut_.write('\t');
           write(val);
           clientOut_.write('\n');
-          clientOut_.flush();
         } else {
           // "identity reduce"
           output.collect(key, val);