You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by dd...@apache.org on 2008/06/03 16:04:48 UTC
svn commit: r662805 - in /hadoop/core/trunk: ./
src/contrib/streaming/src/java/org/apache/hadoop/streaming/
Author: ddas
Date: Tue Jun 3 07:04:47 2008
New Revision: 662805
URL: http://svn.apache.org/viewvc?rev=662805&view=rev
Log:
HADOOP-3429. Increases the size of the buffers used for communication with the child process in Streaming jobs. Contributed by Amareshwari Sriramadasu.
Modified:
hadoop/core/trunk/CHANGES.txt
hadoop/core/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeMapRed.java
hadoop/core/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeMapper.java
hadoop/core/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeReducer.java
Modified: hadoop/core/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/CHANGES.txt?rev=662805&r1=662804&r2=662805&view=diff
==============================================================================
--- hadoop/core/trunk/CHANGES.txt (original)
+++ hadoop/core/trunk/CHANGES.txt Tue Jun 3 07:04:47 2008
@@ -189,6 +189,9 @@
HADOOP-3434. Retain the cause of the bind failure in Server::bind.
(Steve Loughran via cdouglas)
+ HADOOP-3429. Increases the size of the buffers used for the communication
+ for Streaming jobs. (Amareshwari Sriramadasu via ddas)
+
OPTIMIZATIONS
HADOOP-3274. The default constructor of BytesWritable creates empty
Modified: hadoop/core/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeMapRed.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeMapRed.java?rev=662805&r1=662804&r2=662805&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeMapRed.java (original)
+++ hadoop/core/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeMapRed.java Tue Jun 3 07:04:47 2008
@@ -22,7 +22,6 @@
import java.nio.charset.CharacterCodingException;
import java.io.IOException;
import java.util.Date;
-import java.util.List;
import java.util.Map;
import java.util.Iterator;
import java.util.Arrays;
@@ -35,9 +34,7 @@
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.OutputCollector;
-import org.apache.hadoop.mapred.TaskLog;
import org.apache.hadoop.mapred.LineRecordReader.LineReader;
-import org.apache.hadoop.util.Shell;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.io.Text;
@@ -65,6 +62,8 @@
final static int OUTSIDE = 1;
final static int SINGLEQ = 2;
final static int DOUBLEQ = 3;
+
+ private final static int BUFFER_SIZE = 128 * 1024;
static String[] splitArgs(String args) {
ArrayList argList = new ArrayList();
@@ -172,8 +171,12 @@
builder.environment().putAll(childEnv.toMap());
sim = builder.start();
- clientOut_ = new DataOutputStream(new BufferedOutputStream(sim.getOutputStream()));
- clientIn_ = new DataInputStream(new BufferedInputStream(sim.getInputStream()));
+ clientOut_ = new DataOutputStream(new BufferedOutputStream(
+ sim.getOutputStream(),
+ BUFFER_SIZE));
+ clientIn_ = new DataInputStream(new BufferedInputStream(
+ sim.getInputStream(),
+ BUFFER_SIZE));
clientErr_ = new DataInputStream(new BufferedInputStream(sim.getErrorStream()));
startTime_ = System.currentTimeMillis();
@@ -457,6 +460,7 @@
if (!doPipe_) return;
try {
if (clientOut_ != null) {
+ clientOut_.flush();
clientOut_.close();
}
} catch (IOException io) {
Modified: hadoop/core/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeMapper.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeMapper.java?rev=662805&r1=662804&r2=662805&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeMapper.java (original)
+++ hadoop/core/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeMapper.java Tue Jun 3 07:04:47 2008
@@ -89,7 +89,6 @@
}
write(value);
clientOut_.write('\n');
- clientOut_.flush();
} else {
numRecSkipped_++;
}
Modified: hadoop/core/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeReducer.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeReducer.java?rev=662805&r1=662804&r2=662805&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeReducer.java (original)
+++ hadoop/core/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeReducer.java Tue Jun 3 07:04:47 2008
@@ -78,7 +78,6 @@
clientOut_.write('\t');
write(val);
clientOut_.write('\n');
- clientOut_.flush();
} else {
// "identity reduce"
output.collect(key, val);