You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ra...@apache.org on 2008/12/17 01:06:11 UTC
svn commit: r727226 - in /hadoop/core/branches/branch-0.18: CHANGES.txt
src/core/org/apache/hadoop/ipc/Server.java
Author: rangadi
Date: Tue Dec 16 16:06:10 2008
New Revision: 727226
URL: http://svn.apache.org/viewvc?rev=727226&view=rev
Log:
HADOOP-4797. Improve how RPC server reads and writes large buffers. Avoids
soft-leak of direct buffers and excess copies in NIO layer. (Raghu Angadi)
Modified:
hadoop/core/branches/branch-0.18/CHANGES.txt
hadoop/core/branches/branch-0.18/src/core/org/apache/hadoop/ipc/Server.java
Modified: hadoop/core/branches/branch-0.18/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.18/CHANGES.txt?rev=727226&r1=727225&r2=727226&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.18/CHANGES.txt (original)
+++ hadoop/core/branches/branch-0.18/CHANGES.txt Tue Dec 16 16:06:10 2008
@@ -107,6 +107,9 @@
HADOOP-4810. Data lost at cluster startup time. (hairong)
+ HADOOP-4797. Improve how RPC server reads and writes large buffers. Avoids
+ soft-leak of direct buffers and excess copies in NIO layer. (Raghu Angadi)
+
Release 0.18.2 - 2008-11-03
BUG FIXES
Modified: hadoop/core/branches/branch-0.18/src/core/org/apache/hadoop/ipc/Server.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.18/src/core/org/apache/hadoop/ipc/Server.java?rev=727226&r1=727225&r2=727226&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.18/src/core/org/apache/hadoop/ipc/Server.java (original)
+++ hadoop/core/branches/branch-0.18/src/core/org/apache/hadoop/ipc/Server.java Tue Dec 16 16:06:10 2008
@@ -27,10 +27,12 @@
import java.nio.ByteBuffer;
import java.nio.channels.CancelledKeyException;
import java.nio.channels.ClosedChannelException;
+import java.nio.channels.ReadableByteChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
+import java.nio.channels.WritableByteChannel;
import java.net.BindException;
import java.net.InetAddress;
@@ -584,7 +586,7 @@
//
// Send as much data as we can in the non-blocking fashion
//
- int numBytes = channel.write(call.response);
+ int numBytes = channelWrite(channel, call.response);
if (numBytes < 0) {
return true;
}
@@ -759,7 +761,7 @@
*/
int count = -1;
if (dataLengthBuffer.remaining() > 0) {
- count = channel.read(dataLengthBuffer);
+ count = channelRead(channel, dataLengthBuffer);
if (count < 0 || dataLengthBuffer.remaining() > 0)
return count;
}
@@ -767,7 +769,7 @@
if (!versionRead) {
//Every connection is expected to send the header.
ByteBuffer versionBuffer = ByteBuffer.allocate(1);
- count = channel.read(versionBuffer);
+ count = channelRead(channel, versionBuffer);
if (count <= 0) {
return count;
}
@@ -799,7 +801,7 @@
incRpcCount(); // Increment the rpc count
}
- count = channel.read(data);
+ count = channelRead(channel, data);
if (data.remaining() == 0) {
dataLengthBuffer.clear();
@@ -1051,4 +1053,80 @@
return callQueue.size();
}
+
+ /**
+ * When the read or write buffer size is larger than this limit, i/o will be
+ * done in chunks of this size. Most RPC requests and responses will
+ * be smaller than this.
+ */
+ private static int NIO_BUFFER_LIMIT = 8*1024; //should not be more than 64KB.
+
+ /**
+ * This is a wrapper around {@link WritableByteChannel#write(ByteBuffer)}.
+ * If the amount of data is large, it writes to channel in smaller chunks.
+ * This is to avoid jdk from creating many direct buffers as the size of
+ * buffer increases. This also minimizes extra copies in the NIO layer
+ * as a result of multiple write operations required to write a large
+ * buffer.
+ *
+ * @see WritableByteChannel#write(ByteBuffer)
+ */
+ private static int channelWrite(WritableByteChannel channel,
+ ByteBuffer buffer) throws IOException {
+
+ return (buffer.remaining() <= NIO_BUFFER_LIMIT) ?
+ channel.write(buffer) : channelIO(null, channel, buffer);
+ }
+
+
+ /**
+ * This is a wrapper around {@link ReadableByteChannel#read(ByteBuffer)}.
+ * If the amount of data is large, it reads from channel in smaller chunks.
+ * This is to avoid jdk from creating many direct buffers as the size of
+ * ByteBuffer increases. There should not be any performance degradation.
+ *
+ * @see ReadableByteChannel#read(ByteBuffer)
+ */
+ private static int channelRead(ReadableByteChannel channel,
+ ByteBuffer buffer) throws IOException {
+
+ return (buffer.remaining() <= NIO_BUFFER_LIMIT) ?
+ channel.read(buffer) : channelIO(channel, null, buffer);
+ }
+
+ /**
+ * Helper for {@link #channelRead(ReadableByteChannel, ByteBuffer)}
+ * and {@link #channelWrite(WritableByteChannel, ByteBuffer)}. Only
+ * one of readCh or writeCh should be non-null.
+ *
+ * @see #channelRead(ReadableByteChannel, ByteBuffer)
+ * @see #channelWrite(WritableByteChannel, ByteBuffer)
+ */
+ private static int channelIO(ReadableByteChannel readCh,
+ WritableByteChannel writeCh,
+ ByteBuffer buf) throws IOException {
+
+ int originalLimit = buf.limit();
+ int initialRemaining = buf.remaining();
+ int ret = 0;
+
+ while (buf.remaining() > 0) {
+ try {
+ int ioSize = Math.min(buf.remaining(), NIO_BUFFER_LIMIT);
+ buf.limit(buf.position() + ioSize);
+
+ ret = (readCh == null) ? writeCh.write(buf) : readCh.read(buf);
+
+ if (ret < ioSize) {
+ break;
+ }
+
+ } finally {
+ buf.limit(originalLimit);
+ }
+ }
+
+ int nBytes = initialRemaining - buf.remaining();
+ return (nBytes > 0) ? nBytes : ret;
+ }
}