You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by om...@apache.org on 2007/10/16 20:59:36 UTC
svn commit: r585220 - in /lucene/hadoop/trunk: CHANGES.txt
src/c++/pipes/impl/HadoopPipes.cc
src/java/org/apache/hadoop/mapred/pipes/BinaryProtocol.java
Author: omalley
Date: Tue Oct 16 11:59:36 2007
New Revision: 585220
URL: http://svn.apache.org/viewvc?rev=585220&view=rev
Log:
HADOOP-1788. Increase the buffer size on Pipes' command socket from 1k to
128k. Contributed by Amareshwari Sri Ramadasu and Christian Kunz.
Modified:
lucene/hadoop/trunk/CHANGES.txt
lucene/hadoop/trunk/src/c++/pipes/impl/HadoopPipes.cc
lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/pipes/BinaryProtocol.java
Modified: lucene/hadoop/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/CHANGES.txt?rev=585220&r1=585219&r2=585220&view=diff
==============================================================================
--- lucene/hadoop/trunk/CHANGES.txt (original)
+++ lucene/hadoop/trunk/CHANGES.txt Tue Oct 16 11:59:36 2007
@@ -118,6 +118,9 @@
HADOOP-1774. Remove use of INode.parent in Block CRC upgrade.
(Raghu Angadi via dhruba)
+ HADOOP-1788. Increase the buffer size on the Pipes command socket.
+ (Amareshwari Sri Ramadasu and Christian Kunz via omalley)
+
BUG FIXES
HADOOP-1946. The Datanode code does not need to invoke du on
Modified: lucene/hadoop/trunk/src/c++/pipes/impl/HadoopPipes.cc
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/c%2B%2B/pipes/impl/HadoopPipes.cc?rev=585220&r1=585219&r2=585220&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/c++/pipes/impl/HadoopPipes.cc (original)
+++ lucene/hadoop/trunk/src/c++/pipes/impl/HadoopPipes.cc Tue Oct 16 11:59:36 2007
@@ -855,6 +855,8 @@
int sock = -1;
FILE* stream = NULL;
FILE* outStream = NULL;
+ char *bufin = NULL;
+ char *bufout = NULL;
if (portStr) {
sock = socket(PF_INET, SOCK_STREAM, 0);
HADOOP_ASSERT(sock != - 1,
@@ -866,8 +868,22 @@
HADOOP_ASSERT(connect(sock, (sockaddr*) &addr, sizeof(addr)) == 0,
string("problem connecting command socket: ") +
strerror(errno));
+
stream = fdopen(sock, "r");
outStream = fdopen(sock, "w");
+
+ // increase buffer size
+ int bufsize = 128*1024;
+ int setbuf;
+ bufin = new char[bufsize];
+ bufout = new char[bufsize];
+ setbuf = setvbuf(stream, bufin, _IOFBF, bufsize);
+ HADOOP_ASSERT(setbuf == 0, string("problem with setvbuf for inStream: ")
+ + strerror(errno));
+ setbuf = setvbuf(outStream, bufout, _IOFBF, bufsize);
+ HADOOP_ASSERT(setbuf == 0, string("problem with setvbuf for outStream: ")
+ + strerror(errno));
+
connection = new BinaryProtocol(stream, context, outStream);
} else if (getenv("hadoop.pipes.command.file")) {
char* filename = getenv("hadoop.pipes.command.file");
@@ -907,6 +923,8 @@
if (outStream != NULL) {
//fclose(outStream);
}
+ delete bufin;
+ delete bufout;
return true;
} catch (Error& err) {
fprintf(stderr, "Hadoop Pipes Exception: %s\n",
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/pipes/BinaryProtocol.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/pipes/BinaryProtocol.java?rev=585220&r1=585219&r2=585220&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/pipes/BinaryProtocol.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/pipes/BinaryProtocol.java Tue Oct 16 11:59:36 2007
@@ -44,6 +44,11 @@
implements DownwardProtocol<K1, V1> {
public static final int CURRENT_PROTOCOL_VERSION = 0;
+ /**
+ * The buffer size for the command socket
+ */
+ private static final int BUFFER_SIZE = 128*1024;
+
private DataOutputStream stream;
private DataOutputBuffer buffer = new DataOutputBuffer();
private static final Log LOG =
@@ -87,7 +92,8 @@
public UplinkReaderThread(InputStream stream,
UpwardProtocol<K2, V2> handler,
K2 key, V2 value) throws IOException{
- inStream = new DataInputStream(stream);
+ inStream = new DataInputStream(new BufferedInputStream(stream,
+ BUFFER_SIZE));
this.handler = handler;
this.key = key;
this.value = value;
@@ -207,7 +213,8 @@
if (Submitter.getKeepCommandFile(config)) {
raw = new TeeOutputStream("downlink.data", raw);
}
- stream = new DataOutputStream(raw);
+ stream = new DataOutputStream(new BufferedOutputStream(raw,
+ BUFFER_SIZE)) ;
uplink = new UplinkReaderThread<K2, V2>(sock.getInputStream(),
handler, key, value);
uplink.setName("pipe-uplink-handler");
@@ -287,6 +294,7 @@
public void close() throws IOException {
WritableUtils.writeVInt(stream, MessageType.CLOSE.code);
LOG.debug("Sent close command");
+ stream.flush();
}
public void abort() throws IOException {