You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ra...@apache.org on 2008/12/17 01:06:11 UTC

svn commit: r727226 - in /hadoop/core/branches/branch-0.18: CHANGES.txt src/core/org/apache/hadoop/ipc/Server.java

Author: rangadi
Date: Tue Dec 16 16:06:10 2008
New Revision: 727226

URL: http://svn.apache.org/viewvc?rev=727226&view=rev
Log:
HADOOP-4797. Improve how RPC server reads and writes large buffers. Avoids
soft-leak of direct buffers and excess copies in NIO layer. (Raghu Angadi)

Modified:
    hadoop/core/branches/branch-0.18/CHANGES.txt
    hadoop/core/branches/branch-0.18/src/core/org/apache/hadoop/ipc/Server.java

Modified: hadoop/core/branches/branch-0.18/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.18/CHANGES.txt?rev=727226&r1=727225&r2=727226&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.18/CHANGES.txt (original)
+++ hadoop/core/branches/branch-0.18/CHANGES.txt Tue Dec 16 16:06:10 2008
@@ -107,6 +107,9 @@
 
     HADOOP-4810. Data lost at cluster startup time. (hairong)
 
+    HADOOP-4797. Improve how RPC server reads and writes large buffers. Avoids
+    soft-leak of direct buffers and excess copies in NIO layer. (Raghu Angadi)
+
 Release 0.18.2 - 2008-11-03
 
   BUG FIXES

Modified: hadoop/core/branches/branch-0.18/src/core/org/apache/hadoop/ipc/Server.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.18/src/core/org/apache/hadoop/ipc/Server.java?rev=727226&r1=727225&r2=727226&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.18/src/core/org/apache/hadoop/ipc/Server.java (original)
+++ hadoop/core/branches/branch-0.18/src/core/org/apache/hadoop/ipc/Server.java Tue Dec 16 16:06:10 2008
@@ -27,10 +27,12 @@
 import java.nio.ByteBuffer;
 import java.nio.channels.CancelledKeyException;
 import java.nio.channels.ClosedChannelException;
+import java.nio.channels.ReadableByteChannel;
 import java.nio.channels.SelectionKey;
 import java.nio.channels.Selector;
 import java.nio.channels.ServerSocketChannel;
 import java.nio.channels.SocketChannel;
+import java.nio.channels.WritableByteChannel;
 
 import java.net.BindException;
 import java.net.InetAddress;
@@ -584,7 +586,7 @@
           //
           // Send as much data as we can in the non-blocking fashion
           //
-          int numBytes = channel.write(call.response);
+          int numBytes = channelWrite(channel, call.response);
           if (numBytes < 0) {
             return true;
           }
@@ -759,7 +761,7 @@
          */    
         int count = -1;
         if (dataLengthBuffer.remaining() > 0) {
-          count = channel.read(dataLengthBuffer);       
+          count = channelRead(channel, dataLengthBuffer);       
           if (count < 0 || dataLengthBuffer.remaining() > 0) 
             return count;
         }
@@ -767,7 +769,7 @@
         if (!versionRead) {
           //Every connection is expected to send the header.
           ByteBuffer versionBuffer = ByteBuffer.allocate(1);
-          count = channel.read(versionBuffer);
+          count = channelRead(channel, versionBuffer);
           if (count <= 0) {
             return count;
           }
@@ -799,7 +801,7 @@
           incRpcCount();  // Increment the rpc count
         }
         
-        count = channel.read(data);
+        count = channelRead(channel, data);
         
         if (data.remaining() == 0) {
           dataLengthBuffer.clear();
@@ -1051,4 +1053,80 @@
     return callQueue.size();
   }
   
+  
+  /**
+   * When the read or write buffer size is larger than this limit, i/o will be 
+   * done in chunks of this size. Most RPC requests and responses will
+   * be smaller than this.
+   */
+  private static int NIO_BUFFER_LIMIT = 8*1024; //should not be more than 64KB.
+  
+  /**
+   * This is a wrapper around {@link WritableByteChannel#write(ByteBuffer)}.
+   * If the amount of data is large, it writes to channel in smaller chunks. 
+   * This is to avoid jdk from creating many direct buffers as the size of 
+   * buffer increases. This also minimizes extra copies in NIO layer
+   * as a result of multiple write operations required to write a large 
+   * buffer.  
+   *
+   * @see WritableByteChannel#write(ByteBuffer)
+   */
+  private static int channelWrite(WritableByteChannel channel, 
+                                  ByteBuffer buffer) throws IOException {
+    
+    return (buffer.remaining() <= NIO_BUFFER_LIMIT) ?
+           channel.write(buffer) : channelIO(null, channel, buffer);
+  }
+  
+  
+  /**
+   * This is a wrapper around {@link ReadableByteChannel#read(ByteBuffer)}.
+   * If the amount of data is large, it reads from channel in smaller chunks. 
+   * This is to avoid jdk from creating many direct buffers as the size of 
+   * ByteBuffer increases. There should not be any performance degradation.
+   * 
+   * @see ReadableByteChannel#read(ByteBuffer)
+   */
+  private static int channelRead(ReadableByteChannel channel, 
+                                 ByteBuffer buffer) throws IOException {
+    
+    return (buffer.remaining() <= NIO_BUFFER_LIMIT) ?
+           channel.read(buffer) : channelIO(channel, null, buffer);
+  }
+  
+  /**
+   * Helper for {@link #channelRead(ReadableByteChannel, ByteBuffer)}
+   * and {@link #channelWrite(WritableByteChannel, ByteBuffer)}. Only
+   * one of readCh or writeCh should be non-null.
+   * 
+   * @see #channelRead(ReadableByteChannel, ByteBuffer)
+   * @see #channelWrite(WritableByteChannel, ByteBuffer)
+   */
+  private static int channelIO(ReadableByteChannel readCh, 
+                               WritableByteChannel writeCh,
+                               ByteBuffer buf) throws IOException {
+    
+    int originalLimit = buf.limit();
+    int initialRemaining = buf.remaining();
+    int ret = 0;
+    
+    while (buf.remaining() > 0) {
+      try {
+        int ioSize = Math.min(buf.remaining(), NIO_BUFFER_LIMIT);
+        buf.limit(buf.position() + ioSize);
+        
+        ret = (readCh == null) ? writeCh.write(buf) : readCh.read(buf); 
+        
+        if (ret < ioSize) {
+          break;
+        }
+
+      } finally {
+        buf.limit(originalLimit);        
+      }
+    }
+
+    int nBytes = initialRemaining - buf.remaining(); 
+    return (nBytes > 0) ? nBytes : ret;
+  }      
 }