You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ra...@apache.org on 2008/12/17 01:10:32 UTC
svn commit: r727230 - in /hadoop/core/branches/branch-0.19: ./ CHANGES.txt
src/core/org/apache/hadoop/ipc/Server.java
Author: rangadi
Date: Tue Dec 16 16:10:32 2008
New Revision: 727230
URL: http://svn.apache.org/viewvc?rev=727230&view=rev
Log:
HADOOP-4797. Improve how RPC server reads and writes large buffers. Avoids
soft-leak of direct buffers and excess copies in NIO layer. (Raghu Angadi)
Modified:
hadoop/core/branches/branch-0.19/ (props changed)
hadoop/core/branches/branch-0.19/CHANGES.txt (contents, props changed)
hadoop/core/branches/branch-0.19/src/core/org/apache/hadoop/ipc/Server.java
Propchange: hadoop/core/branches/branch-0.19/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Tue Dec 16 16:10:32 2008
@@ -1 +1 @@
-/hadoop/core/trunk:697306,698176,699056,699098,699415,699424,699444,699490,699517,700163,700628,700923,701273,701398,703923,704203,704261,704701,704703,704707,704712,704732,704748,704989,705391,705420,705430,705762,706350,706707,706719,706796,706802,707258,707262,708623,708641,708710,709040,709303,712881,713888,720602,723013,723460,723831,723918,724883,727117,727212,727217
+/hadoop/core/trunk:697306,698176,699056,699098,699415,699424,699444,699490,699517,700163,700628,700923,701273,701398,703923,704203,704261,704701,704703,704707,704712,704732,704748,704989,705391,705420,705430,705762,706350,706707,706719,706796,706802,707258,707262,708623,708641,708710,709040,709303,712881,713888,720602,723013,723460,723831,723918,724883,727117,727212,727217,727228
Modified: hadoop/core/branches/branch-0.19/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.19/CHANGES.txt?rev=727230&r1=727229&r2=727230&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.19/CHANGES.txt (original)
+++ hadoop/core/branches/branch-0.19/CHANGES.txt Tue Dec 16 16:10:32 2008
@@ -1098,6 +1098,9 @@
HADOOP-4810. Data lost at cluster startup time. (hairong)
+ HADOOP-4797. Improve how RPC server reads and writes large buffers. Avoids
+ soft-leak of direct buffers and excess copies in NIO layer. (Raghu Angadi)
+
Release 0.18.2 - 2008-11-03
BUG FIXES
Propchange: hadoop/core/branches/branch-0.19/CHANGES.txt
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Tue Dec 16 16:10:32 2008
@@ -1 +1,2 @@
-/hadoop/core/trunk/CHANGES.txt:697306,698176,699056,699098,699415,699424,699444,699490,699517,700163,700628,700923,701273,701398,703923,704203,704261,704701,704703,704707,704712,704732,704748,704989,705391,705420,705430,705762,706350,706707,706719,706796,706802,707258,707262,708623,708641,708710,708723,709040,709303,711717,712881,713888,720602,723013,723460,723831,723918,724883,727117,727212,727217
+/hadoop/core/branches/branch-0.18/CHANGES.txt:727226
+/hadoop/core/trunk/CHANGES.txt:697306,698176,699056,699098,699415,699424,699444,699490,699517,700163,700628,700923,701273,701398,703923,704203,704261,704701,704703,704707,704712,704732,704748,704989,705391,705420,705430,705762,706350,706707,706719,706796,706802,707258,707262,708623,708641,708710,708723,709040,709303,711717,712881,713888,720602,723013,723460,723831,723918,724883,727117,727212,727217,727228
Modified: hadoop/core/branches/branch-0.19/src/core/org/apache/hadoop/ipc/Server.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.19/src/core/org/apache/hadoop/ipc/Server.java?rev=727230&r1=727229&r2=727230&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.19/src/core/org/apache/hadoop/ipc/Server.java (original)
+++ hadoop/core/branches/branch-0.19/src/core/org/apache/hadoop/ipc/Server.java Tue Dec 16 16:10:32 2008
@@ -27,10 +27,12 @@
import java.nio.ByteBuffer;
import java.nio.channels.CancelledKeyException;
import java.nio.channels.ClosedChannelException;
+import java.nio.channels.ReadableByteChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
+import java.nio.channels.WritableByteChannel;
import java.net.BindException;
import java.net.InetAddress;
@@ -588,7 +590,7 @@
//
// Send as much data as we can in the non-blocking fashion
//
- int numBytes = channel.write(call.response);
+ int numBytes = channelWrite(channel, call.response);
if (numBytes < 0) {
return true;
}
@@ -763,7 +765,7 @@
*/
int count = -1;
if (dataLengthBuffer.remaining() > 0) {
- count = channel.read(dataLengthBuffer);
+ count = channelRead(channel, dataLengthBuffer);
if (count < 0 || dataLengthBuffer.remaining() > 0)
return count;
}
@@ -771,7 +773,7 @@
if (!versionRead) {
//Every connection is expected to send the header.
ByteBuffer versionBuffer = ByteBuffer.allocate(1);
- count = channel.read(versionBuffer);
+ count = channelRead(channel, versionBuffer);
if (count <= 0) {
return count;
}
@@ -803,7 +805,7 @@
incRpcCount(); // Increment the rpc count
}
- count = channel.read(data);
+ count = channelRead(channel, data);
if (data.remaining() == 0) {
dataLengthBuffer.clear();
@@ -1051,4 +1053,80 @@
return callQueue.size();
}
+
+ /**
+ * When the read or write buffer size is larger than this limit, i/o will be
+ * done in chunks of this size. Most RPC requests and responses would be
+ * smaller.
+ */
+ private static int NIO_BUFFER_LIMIT = 8*1024; //should not be more than 64KB.
+
+ /**
+ * This is a wrapper around {@link WritableByteChannel#write(ByteBuffer)}.
+ * If the amount of data is large, it writes to channel in smaller chunks.
+ * This is to prevent the JDK from creating many direct buffers as the size of
+ * buffer increases. This also minimizes extra copies in NIO layer
+ * as a result of multiple write operations required to write a large
+ * buffer.
+ *
+ * @see WritableByteChannel#write(ByteBuffer)
+ */
+ private static int channelWrite(WritableByteChannel channel,
+ ByteBuffer buffer) throws IOException {
+
+ return (buffer.remaining() <= NIO_BUFFER_LIMIT) ?
+ channel.write(buffer) : channelIO(null, channel, buffer);
+ }
+
+
+ /**
+ * This is a wrapper around {@link ReadableByteChannel#read(ByteBuffer)}.
+ * If the amount of data is large, it reads from the channel in smaller chunks.
+ * This is to prevent the JDK from creating many direct buffers as the size of
+ * ByteBuffer increases. There should not be any performance degradation.
+ *
+ * @see ReadableByteChannel#read(ByteBuffer)
+ */
+ private static int channelRead(ReadableByteChannel channel,
+ ByteBuffer buffer) throws IOException {
+
+ return (buffer.remaining() <= NIO_BUFFER_LIMIT) ?
+ channel.read(buffer) : channelIO(channel, null, buffer);
+ }
+
+ /**
+ * Helper for {@link #channelRead(ReadableByteChannel, ByteBuffer)}
+ * and {@link #channelWrite(WritableByteChannel, ByteBuffer)}. Only
+ * one of readCh or writeCh should be non-null.
+ *
+ * @see #channelRead(ReadableByteChannel, ByteBuffer)
+ * @see #channelWrite(WritableByteChannel, ByteBuffer)
+ */
+ private static int channelIO(ReadableByteChannel readCh,
+ WritableByteChannel writeCh,
+ ByteBuffer buf) throws IOException {
+
+ int originalLimit = buf.limit();
+ int initialRemaining = buf.remaining();
+ int ret = 0;
+
+ while (buf.remaining() > 0) {
+ try {
+ int ioSize = Math.min(buf.remaining(), NIO_BUFFER_LIMIT);
+ buf.limit(buf.position() + ioSize);
+
+ ret = (readCh == null) ? writeCh.write(buf) : readCh.read(buf);
+
+ if (ret < ioSize) {
+ break;
+ }
+
+ } finally {
+ buf.limit(originalLimit);
+ }
+ }
+
+ int nBytes = initialRemaining - buf.remaining();
+ return (nBytes > 0) ? nBytes : ret;
+ }
}