You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ra...@apache.org on 2008/12/17 01:11:05 UTC

svn commit: r727231 - in /hadoop/core/branches/branch-0.20: ./ CHANGES.txt src/core/org/apache/hadoop/ipc/Server.java

Author: rangadi
Date: Tue Dec 16 16:11:05 2008
New Revision: 727231

URL: http://svn.apache.org/viewvc?rev=727231&view=rev
Log:
HADOOP-4797. Improve how RPC server reads and writes large buffers. Avoids
soft-leak of direct buffers and excess copies in NIO layer. (Raghu Angadi)

Modified:
    hadoop/core/branches/branch-0.20/   (props changed)
    hadoop/core/branches/branch-0.20/CHANGES.txt   (contents, props changed)
    hadoop/core/branches/branch-0.20/src/core/org/apache/hadoop/ipc/Server.java

Propchange: hadoop/core/branches/branch-0.20/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Tue Dec 16 16:11:05 2008
@@ -1,2 +1,2 @@
 /hadoop/core/branches/branch-0.19:713112
-/hadoop/core/trunk:727001,727191
+/hadoop/core/trunk:727001,727191,727228

Modified: hadoop/core/branches/branch-0.20/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.20/CHANGES.txt?rev=727231&r1=727230&r2=727231&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.20/CHANGES.txt (original)
+++ hadoop/core/branches/branch-0.20/CHANGES.txt Tue Dec 16 16:11:05 2008
@@ -1542,6 +1542,9 @@
     HADOOP-4857. Fixes TestUlimit to have exactly 1 map in the jobs spawned.
     (Ravi Gummadi via ddas)
 
+    HADOOP-4797. Improve how RPC server reads and writes large buffers. Avoids
+    soft-leak of direct buffers and excess copies in NIO layer. (Raghu Angadi)
+
 Release 0.18.2 - 2008-11-03
 
   BUG FIXES

Propchange: hadoop/core/branches/branch-0.20/CHANGES.txt
------------------------------------------------------------------------------
--- svn:mergeinfo (added)
+++ svn:mergeinfo Tue Dec 16 16:11:05 2008
@@ -0,0 +1,3 @@
+/hadoop/core/branches/branch-0.18/CHANGES.txt:727226
+/hadoop/core/branches/branch-0.19/CHANGES.txt:713112
+/hadoop/core/trunk/CHANGES.txt:727001,727191,727228

Modified: hadoop/core/branches/branch-0.20/src/core/org/apache/hadoop/ipc/Server.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.20/src/core/org/apache/hadoop/ipc/Server.java?rev=727231&r1=727230&r2=727231&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.20/src/core/org/apache/hadoop/ipc/Server.java (original)
+++ hadoop/core/branches/branch-0.20/src/core/org/apache/hadoop/ipc/Server.java Tue Dec 16 16:11:05 2008
@@ -27,10 +27,12 @@
 import java.nio.ByteBuffer;
 import java.nio.channels.CancelledKeyException;
 import java.nio.channels.ClosedChannelException;
+import java.nio.channels.ReadableByteChannel;
 import java.nio.channels.SelectionKey;
 import java.nio.channels.Selector;
 import java.nio.channels.ServerSocketChannel;
 import java.nio.channels.SocketChannel;
+import java.nio.channels.WritableByteChannel;
 
 import java.net.BindException;
 import java.net.InetAddress;
@@ -608,7 +610,7 @@
           //
           // Send as much data as we can in the non-blocking fashion
           //
-          int numBytes = channel.write(call.response);
+          int numBytes = channelWrite(channel, call.response);
           if (numBytes < 0) {
             return true;
           }
@@ -794,7 +796,7 @@
          */    
         int count = -1;
         if (dataLengthBuffer.remaining() > 0) {
-          count = channel.read(dataLengthBuffer);       
+          count = channelRead(channel, dataLengthBuffer);       
           if (count < 0 || dataLengthBuffer.remaining() > 0) 
             return count;
         }
@@ -802,7 +804,7 @@
         if (!versionRead) {
           //Every connection is expected to send the header.
           ByteBuffer versionBuffer = ByteBuffer.allocate(1);
-          count = channel.read(versionBuffer);
+          count = channelRead(channel, versionBuffer);
           if (count <= 0) {
             return count;
           }
@@ -834,7 +836,7 @@
           incRpcCount();  // Increment the rpc count
         }
         
-        count = channel.read(data);
+        count = channelRead(channel, data);
         
         if (data.remaining() == 0) {
           dataLengthBuffer.clear();
@@ -1169,4 +1171,80 @@
     return callQueue.size();
   }
   
+  
+  /**
+   * When the read or write buffer size is larger than this limit, I/O will be
+   * done in chunks of this size. Most RPC requests and responses are expected
+   * to be smaller.
+   */
+  private static int NIO_BUFFER_LIMIT = 8*1024; //should not be more than 64KB.
+  
+  /**
+   * This is a wrapper around {@link WritableByteChannel#write(ByteBuffer)}.
+   * If the amount of data is large, it writes to the channel in smaller chunks.
+   * This keeps the JDK from creating many direct buffers as the size of the
+   * buffer increases. It also minimizes extra copies in the NIO layer caused
+   * by the multiple write operations required to write a large
+   * buffer.
+   *
+   * @see WritableByteChannel#write(ByteBuffer)
+   */
+  private static int channelWrite(WritableByteChannel channel, 
+                                  ByteBuffer buffer) throws IOException {
+    // Small buffers go straight to the channel; larger ones are chunked.
+    return (buffer.remaining() <= NIO_BUFFER_LIMIT) ?
+           channel.write(buffer) : channelIO(null, channel, buffer);
+  }
+  
+  
+  /**
+   * This is a wrapper around {@link ReadableByteChannel#read(ByteBuffer)}.
+   * If the amount of data is large, it reads from the channel in smaller chunks.
+   * This keeps the JDK from creating many direct buffers as the size of the
+   * ByteBuffer increases. There should not be any performance degradation.
+   * 
+   * @see ReadableByteChannel#read(ByteBuffer)
+   */
+  private static int channelRead(ReadableByteChannel channel, 
+                                 ByteBuffer buffer) throws IOException {
+    // Small buffers are read directly; larger ones are filled in chunks.
+    return (buffer.remaining() <= NIO_BUFFER_LIMIT) ?
+           channel.read(buffer) : channelIO(channel, null, buffer);
+  }
+  
+  /**
+   * Helper for channelRead and channelWrite: transfers buf in chunks of at most
+   * NIO_BUFFER_LIMIT bytes, stopping at the first short transfer. Only one of
+   * readCh or writeCh should be non-null; readCh is used when it is non-null.
+   * @return bytes transferred, or the last read/write result if none were.
+   * @see #channelRead(ReadableByteChannel, ByteBuffer)
+   * @see #channelWrite(WritableByteChannel, ByteBuffer)
+   */
+  private static int channelIO(ReadableByteChannel readCh, 
+                               WritableByteChannel writeCh,
+                               ByteBuffer buf) throws IOException {
+    // Saved so the caller's limit can be restored after each chunk.
+    int originalLimit = buf.limit();
+    int initialRemaining = buf.remaining();
+    int ret = 0;
+    
+    while (buf.remaining() > 0) {
+      try {
+        int ioSize = Math.min(buf.remaining(), NIO_BUFFER_LIMIT);
+        buf.limit(buf.position() + ioSize); // expose at most ioSize bytes
+        
+        ret = (readCh == null) ? writeCh.write(buf) : readCh.read(buf); 
+        // Short, zero or negative (e.g. end-of-stream) result: stop here.
+        if (ret < ioSize) {
+          break;
+        }
+
+      } finally {
+        buf.limit(originalLimit); // restore the caller's limit even on error
+      }
+    }
+
+    int nBytes = initialRemaining - buf.remaining(); // total bytes moved
+    return (nBytes > 0) ? nBytes : ret;
+  }      
 }