You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by om...@apache.org on 2007/10/16 20:59:36 UTC

svn commit: r585220 - in /lucene/hadoop/trunk: CHANGES.txt src/c++/pipes/impl/HadoopPipes.cc src/java/org/apache/hadoop/mapred/pipes/BinaryProtocol.java

Author: omalley
Date: Tue Oct 16 11:59:36 2007
New Revision: 585220

URL: http://svn.apache.org/viewvc?rev=585220&view=rev
Log:
HADOOP-1788. Increase the buffer size on Pipes' command socket from 1k to
128k. Contributed by Amareshwari Sri Ramadasu and Christian Kunz.

Modified:
    lucene/hadoop/trunk/CHANGES.txt
    lucene/hadoop/trunk/src/c++/pipes/impl/HadoopPipes.cc
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/pipes/BinaryProtocol.java

Modified: lucene/hadoop/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/CHANGES.txt?rev=585220&r1=585219&r2=585220&view=diff
==============================================================================
--- lucene/hadoop/trunk/CHANGES.txt (original)
+++ lucene/hadoop/trunk/CHANGES.txt Tue Oct 16 11:59:36 2007
@@ -118,6 +118,9 @@
     HADOOP-1774. Remove use of INode.parent in Block CRC upgrade.
     (Raghu Angadi via dhruba)
 
+    HADOOP-1788.  Increase the buffer size on the Pipes command socket.
+    (Amareshwari Sri Ramadasu and Christian Kunz via omalley)
+
   BUG FIXES
 
     HADOOP-1946.  The Datanode code does not need to invoke du on

Modified: lucene/hadoop/trunk/src/c++/pipes/impl/HadoopPipes.cc
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/c%2B%2B/pipes/impl/HadoopPipes.cc?rev=585220&r1=585219&r2=585220&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/c++/pipes/impl/HadoopPipes.cc (original)
+++ lucene/hadoop/trunk/src/c++/pipes/impl/HadoopPipes.cc Tue Oct 16 11:59:36 2007
@@ -855,6 +855,8 @@
       int sock = -1;
       FILE* stream = NULL;
       FILE* outStream = NULL;
+      char *bufin = NULL;
+      char *bufout = NULL;
       if (portStr) {
         sock = socket(PF_INET, SOCK_STREAM, 0);
         HADOOP_ASSERT(sock != - 1,
@@ -866,8 +868,22 @@
         HADOOP_ASSERT(connect(sock, (sockaddr*) &addr, sizeof(addr)) == 0,
                       string("problem connecting command socket: ") +
                       strerror(errno));
+
         stream = fdopen(sock, "r");
         outStream = fdopen(sock, "w");
+
+        // increase buffer size
+        int bufsize = 128*1024;
+        int setbuf;
+        bufin = new char[bufsize];
+        bufout = new char[bufsize];
+        setbuf = setvbuf(stream, bufin, _IOFBF, bufsize);
+        HADOOP_ASSERT(setbuf == 0, string("problem with setvbuf for inStream: ")
+                                     + strerror(errno));
+        setbuf = setvbuf(outStream, bufout, _IOFBF, bufsize);
+        HADOOP_ASSERT(setbuf == 0, string("problem with setvbuf for outStream: ")
+                                     + strerror(errno));
+
         connection = new BinaryProtocol(stream, context, outStream);
       } else if (getenv("hadoop.pipes.command.file")) {
         char* filename = getenv("hadoop.pipes.command.file");
@@ -907,6 +923,8 @@
       if (outStream != NULL) {
         //fclose(outStream);
       } 
+      delete bufin;
+      delete bufout;
       return true;
     } catch (Error& err) {
       fprintf(stderr, "Hadoop Pipes Exception: %s\n", 

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/pipes/BinaryProtocol.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/pipes/BinaryProtocol.java?rev=585220&r1=585219&r2=585220&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/pipes/BinaryProtocol.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/pipes/BinaryProtocol.java Tue Oct 16 11:59:36 2007
@@ -44,6 +44,11 @@
   implements DownwardProtocol<K1, V1> {
   
   public static final int CURRENT_PROTOCOL_VERSION = 0;
+  /**
+   * The buffer size for the command socket
+   */
+  private static final int BUFFER_SIZE = 128*1024;
+
   private DataOutputStream stream;
   private DataOutputBuffer buffer = new DataOutputBuffer();
   private static final Log LOG = 
@@ -87,7 +92,8 @@
     public UplinkReaderThread(InputStream stream,
                               UpwardProtocol<K2, V2> handler, 
                               K2 key, V2 value) throws IOException{
-      inStream = new DataInputStream(stream);
+      inStream = new DataInputStream(new BufferedInputStream(stream, 
+                                                             BUFFER_SIZE));
       this.handler = handler;
       this.key = key;
       this.value = value;
@@ -207,7 +213,8 @@
     if (Submitter.getKeepCommandFile(config)) {
       raw = new TeeOutputStream("downlink.data", raw);
     }
-    stream = new DataOutputStream(raw);
+    stream = new DataOutputStream(new BufferedOutputStream(raw, 
+                                                           BUFFER_SIZE)) ;
     uplink = new UplinkReaderThread<K2, V2>(sock.getInputStream(),
                                             handler, key, value);
     uplink.setName("pipe-uplink-handler");
@@ -287,6 +294,7 @@
   public void close() throws IOException {
     WritableUtils.writeVInt(stream, MessageType.CLOSE.code);
     LOG.debug("Sent close command");
+    stream.flush();
   }
   
   public void abort() throws IOException {