Posted to common-commits@hadoop.apache.org by ra...@apache.org on 2008/12/17 01:10:32 UTC

svn commit: r727230 - in /hadoop/core/branches/branch-0.19: ./ CHANGES.txt src/core/org/apache/hadoop/ipc/Server.java

Author: rangadi
Date: Tue Dec 16 16:10:32 2008
New Revision: 727230

URL: http://svn.apache.org/viewvc?rev=727230&view=rev
Log:
HADOOP-4797. Improve how RPC server reads and writes large buffers. Avoids
soft-leak of direct buffers and excess copies in NIO layer. (Raghu Angadi)
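
For context, the sketch below (not part of the commit) shows the chunking idea in
isolation: a large heap ByteBuffer is handed to the channel in slices of at most
8 KB by temporarily shrinking its limit, so the temporary direct buffer the JDK
uses for a heap buffer never needs to grow beyond that cap. The class name
ChunkedWriteSketch, the temporary file name, and the 4 MB test buffer are
illustrative assumptions only; the helpers actually committed are channelWrite,
channelRead and channelIO in the Server.java diff further down.

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.channels.WritableByteChannel;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;

public class ChunkedWriteSketch {

  // Mirrors NIO_BUFFER_LIMIT in the patch: cap on bytes handed to the channel per call.
  private static final int CHUNK = 8 * 1024;

  /**
   * Writes buf to ch in slices of at most CHUNK bytes by temporarily
   * shrinking the buffer's limit, then restoring it after each write.
   * Returns the number of bytes written.
   */
  static int chunkedWrite(WritableByteChannel ch, ByteBuffer buf) throws IOException {
    int initialRemaining = buf.remaining();
    while (buf.hasRemaining()) {
      int originalLimit = buf.limit();
      int ioSize = Math.min(buf.remaining(), CHUNK);
      buf.limit(buf.position() + ioSize);
      int n = ch.write(buf);
      buf.limit(originalLimit);        // always restore the caller's limit
      if (n < ioSize) {
        break;                         // channel took a partial chunk; caller retries later
      }
    }
    return initialRemaining - buf.remaining();
  }

  public static void main(String[] args) throws IOException {
    ByteBuffer big = ByteBuffer.allocate(4 * 1024 * 1024);  // 4 MB heap buffer
    try (FileChannel out = FileChannel.open(Paths.get("chunked-write-sketch.tmp"),
        StandardOpenOption.CREATE, StandardOpenOption.WRITE,
        StandardOpenOption.TRUNCATE_EXISTING)) {
      System.out.println("wrote " + chunkedWrite(out, big) + " bytes");
    }
  }
}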

Modified:
    hadoop/core/branches/branch-0.19/   (props changed)
    hadoop/core/branches/branch-0.19/CHANGES.txt   (contents, props changed)
    hadoop/core/branches/branch-0.19/src/core/org/apache/hadoop/ipc/Server.java

Propchange: hadoop/core/branches/branch-0.19/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Tue Dec 16 16:10:32 2008
@@ -1 +1 @@
-/hadoop/core/trunk:697306,698176,699056,699098,699415,699424,699444,699490,699517,700163,700628,700923,701273,701398,703923,704203,704261,704701,704703,704707,704712,704732,704748,704989,705391,705420,705430,705762,706350,706707,706719,706796,706802,707258,707262,708623,708641,708710,709040,709303,712881,713888,720602,723013,723460,723831,723918,724883,727117,727212,727217
+/hadoop/core/trunk:697306,698176,699056,699098,699415,699424,699444,699490,699517,700163,700628,700923,701273,701398,703923,704203,704261,704701,704703,704707,704712,704732,704748,704989,705391,705420,705430,705762,706350,706707,706719,706796,706802,707258,707262,708623,708641,708710,709040,709303,712881,713888,720602,723013,723460,723831,723918,724883,727117,727212,727217,727228

Modified: hadoop/core/branches/branch-0.19/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.19/CHANGES.txt?rev=727230&r1=727229&r2=727230&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.19/CHANGES.txt (original)
+++ hadoop/core/branches/branch-0.19/CHANGES.txt Tue Dec 16 16:10:32 2008
@@ -1098,6 +1098,9 @@
 
     HADOOP-4810. Data lost at cluster startup time. (hairong)
 
+    HADOOP-4797. Improve how RPC server reads and writes large buffers. Avoids
+    soft-leak of direct buffers and excess copies in NIO layer. (Raghu Angadi)
+
 Release 0.18.2 - 2008-11-03
 
   BUG FIXES

Propchange: hadoop/core/branches/branch-0.19/CHANGES.txt
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Tue Dec 16 16:10:32 2008
@@ -1 +1,2 @@
-/hadoop/core/trunk/CHANGES.txt:697306,698176,699056,699098,699415,699424,699444,699490,699517,700163,700628,700923,701273,701398,703923,704203,704261,704701,704703,704707,704712,704732,704748,704989,705391,705420,705430,705762,706350,706707,706719,706796,706802,707258,707262,708623,708641,708710,708723,709040,709303,711717,712881,713888,720602,723013,723460,723831,723918,724883,727117,727212,727217
+/hadoop/core/branches/branch-0.18/CHANGES.txt:727226
+/hadoop/core/trunk/CHANGES.txt:697306,698176,699056,699098,699415,699424,699444,699490,699517,700163,700628,700923,701273,701398,703923,704203,704261,704701,704703,704707,704712,704732,704748,704989,705391,705420,705430,705762,706350,706707,706719,706796,706802,707258,707262,708623,708641,708710,708723,709040,709303,711717,712881,713888,720602,723013,723460,723831,723918,724883,727117,727212,727217,727228

Modified: hadoop/core/branches/branch-0.19/src/core/org/apache/hadoop/ipc/Server.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.19/src/core/org/apache/hadoop/ipc/Server.java?rev=727230&r1=727229&r2=727230&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.19/src/core/org/apache/hadoop/ipc/Server.java (original)
+++ hadoop/core/branches/branch-0.19/src/core/org/apache/hadoop/ipc/Server.java Tue Dec 16 16:10:32 2008
@@ -27,10 +27,12 @@
 import java.nio.ByteBuffer;
 import java.nio.channels.CancelledKeyException;
 import java.nio.channels.ClosedChannelException;
+import java.nio.channels.ReadableByteChannel;
 import java.nio.channels.SelectionKey;
 import java.nio.channels.Selector;
 import java.nio.channels.ServerSocketChannel;
 import java.nio.channels.SocketChannel;
+import java.nio.channels.WritableByteChannel;
 
 import java.net.BindException;
 import java.net.InetAddress;
@@ -588,7 +590,7 @@
           //
           // Send as much data as we can in the non-blocking fashion
           //
-          int numBytes = channel.write(call.response);
+          int numBytes = channelWrite(channel, call.response);
           if (numBytes < 0) {
             return true;
           }
@@ -763,7 +765,7 @@
          */    
         int count = -1;
         if (dataLengthBuffer.remaining() > 0) {
-          count = channel.read(dataLengthBuffer);       
+          count = channelRead(channel, dataLengthBuffer);       
           if (count < 0 || dataLengthBuffer.remaining() > 0) 
             return count;
         }
@@ -771,7 +773,7 @@
         if (!versionRead) {
           //Every connection is expected to send the header.
           ByteBuffer versionBuffer = ByteBuffer.allocate(1);
-          count = channel.read(versionBuffer);
+          count = channelRead(channel, versionBuffer);
           if (count <= 0) {
             return count;
           }
@@ -803,7 +805,7 @@
           incRpcCount();  // Increment the rpc count
         }
         
-        count = channel.read(data);
+        count = channelRead(channel, data);
         
         if (data.remaining() == 0) {
           dataLengthBuffer.clear();
@@ -1051,4 +1053,80 @@
     return callQueue.size();
   }
   
+  
+  /**
+   * When the read or write buffer size is larger than this limit, i/o will be 
+   * done in chunks of this size. Most RPC requests and responses would
+   * be smaller.
+   */
+  private static int NIO_BUFFER_LIMIT = 8*1024; //should not be more than 64KB.
+  
+  /**
+   * This is a wrapper around {@link WritableByteChannel#write(ByteBuffer)}.
+   * If the amount of data is large, it writes to the channel in smaller chunks. 
+   * This keeps the JDK from creating many direct buffers as the size of 
+   * the buffer increases. It also minimizes the extra copies in the NIO layer
+   * that result from the multiple write operations required to write a 
+   * large buffer.  
+   *
+   * @see WritableByteChannel#write(ByteBuffer)
+   */
+  private static int channelWrite(WritableByteChannel channel, 
+                                  ByteBuffer buffer) throws IOException {
+    
+    return (buffer.remaining() <= NIO_BUFFER_LIMIT) ?
+           channel.write(buffer) : channelIO(null, channel, buffer);
+  }
+  
+  
+  /**
+   * This is a wrapper around {@link ReadableByteChannel#read(ByteBuffer)}.
+   * If the amount of data is large, it reads from the channel in smaller chunks. 
+   * This keeps the JDK from creating many direct buffers as the size of the 
+   * ByteBuffer increases. There should not be any performance degradation.
+   * 
+   * @see ReadableByteChannel#read(ByteBuffer)
+   */
+  private static int channelRead(ReadableByteChannel channel, 
+                                 ByteBuffer buffer) throws IOException {
+    
+    return (buffer.remaining() <= NIO_BUFFER_LIMIT) ?
+           channel.read(buffer) : channelIO(channel, null, buffer);
+  }
+  
+  /**
+   * Helper for {@link #channelRead(ReadableByteChannel, ByteBuffer)}
+   * and {@link #channelWrite(WritableByteChannel, ByteBuffer)}. Only
+   * one of readCh or writeCh should be non-null.
+   * 
+   * @see #channelRead(ReadableByteChannel, ByteBuffer)
+   * @see #channelWrite(WritableByteChannel, ByteBuffer)
+   */
+  private static int channelIO(ReadableByteChannel readCh, 
+                               WritableByteChannel writeCh,
+                               ByteBuffer buf) throws IOException {
+    
+    int originalLimit = buf.limit();
+    int initialRemaining = buf.remaining();
+    int ret = 0;
+    
+    while (buf.remaining() > 0) {
+      try {
+        int ioSize = Math.min(buf.remaining(), NIO_BUFFER_LIMIT);
+        buf.limit(buf.position() + ioSize);
+        
+        ret = (readCh == null) ? writeCh.write(buf) : readCh.read(buf); 
+        
+        if (ret < ioSize) {
+          break;
+        }
+
+      } finally {
+        buf.limit(originalLimit);        
+      }
+    }
+
+    int nBytes = initialRemaining - buf.remaining(); 
+    return (nBytes > 0) ? nBytes : ret;
+  }      
 }
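
Similarly, a minimal read-side sketch (again, not part of the commit) that follows
the same return convention as channelIO above: it returns the number of bytes
transferred when progress was made, otherwise the last value returned by the
channel (possibly -1 on EOF). The class name ChunkedReadSketch and the 1 MB test
input are illustrative assumptions; wrapping a ByteArrayInputStream with
Channels.newChannel only exercises the chunking mechanics, since the direct-buffer
caching the patch avoids shows up with real socket and file channels.

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.Channels;
import java.nio.channels.ReadableByteChannel;

public class ChunkedReadSketch {

  private static final int CHUNK = 8 * 1024;   // same cap as NIO_BUFFER_LIMIT above

  /** Fills buf from ch in slices of at most CHUNK bytes; returns bytes read, or -1 on EOF. */
  static int chunkedRead(ReadableByteChannel ch, ByteBuffer buf) throws IOException {
    int initialRemaining = buf.remaining();
    int ret = 0;
    while (buf.hasRemaining()) {
      int originalLimit = buf.limit();
      int ioSize = Math.min(buf.remaining(), CHUNK);
      buf.limit(buf.position() + ioSize);
      ret = ch.read(buf);
      buf.limit(originalLimit);          // restore the caller's limit after each slice
      if (ret < ioSize) {
        break;                           // EOF, or less than a full chunk was available
      }
    }
    int nBytes = initialRemaining - buf.remaining();
    return (nBytes > 0) ? nBytes : ret;  // same return convention as channelIO above
  }

  public static void main(String[] args) throws IOException {
    byte[] oneMb = new byte[1024 * 1024];
    ReadableByteChannel ch = Channels.newChannel(new ByteArrayInputStream(oneMb));
    ByteBuffer dst = ByteBuffer.allocate(oneMb.length);
    System.out.println("read " + chunkedRead(ch, dst) + " bytes in chunks of at most 8K");
    ch.close();
  }
}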