You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by uc...@apache.org on 2014/06/25 13:54:54 UTC
git commit: [FLINK-971] Configure PooledByteBufAllocator in
NettyConnectionManager instead of using the default allocator
Repository: incubator-flink
Updated Branches:
refs/heads/master f7f1ed2fe -> 6c827fb93
[FLINK-971] Configure PooledByteBufAllocator in NettyConnectionManager instead of using the default allocator
Configuration:
- 0 heap arenas,
- n direct arenas (where n = num incoming + num outgoing network IO threads), and
- bufferSize << 1 bytes page size.
Additionally, OutboundEnvelopeEncoder directly implements ChannelOutboundHandlerAdapter instead of the
MessageToByteEncoder<Envelope> wrapper to have tighter control of memory allocations.
This closes #38.
Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/6c827fb9
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/6c827fb9
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/6c827fb9
Branch: refs/heads/master
Commit: 6c827fb932efff77c8abcdbb407385898f9d8d40
Parents: f7f1ed2
Author: uce <u....@fu-berlin.de>
Authored: Tue Jun 17 16:18:02 2014 +0200
Committer: uce <u....@fu-berlin.de>
Committed: Wed Jun 25 13:53:51 2014 +0200
----------------------------------------------------------------------
.../network/netty/NettyConnectionManager.java | 22 ++++++++++++--
.../network/netty/OutboundConnectionQueue.java | 2 +-
.../network/netty/OutboundEnvelopeEncoder.java | 32 +++++++++++++++-----
3 files changed, 45 insertions(+), 11 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/6c827fb9/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/netty/NettyConnectionManager.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/netty/NettyConnectionManager.java b/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/netty/NettyConnectionManager.java
index 73afcbc..4b54641 100644
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/netty/NettyConnectionManager.java
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/netty/NettyConnectionManager.java
@@ -92,6 +92,22 @@ public class NettyConnectionManager implements NetworkConnectionManager {
final BufferProviderBroker bufferProviderBroker = channelManager;
final EnvelopeDispatcher envelopeDispatcher = channelManager;
+ int numHeapArenas = 0;
+ int numDirectArenas = numInThreads + numOutThreads;
+ int pageSize = bufferSize << 1;
+ int chunkSize = 16 * 1 << 20; // 16 MB
+
+ // shift pageSize maxOrder times to get to chunkSize
+ int maxOrder = (int) (Math.log(chunkSize/pageSize) / Math.log(2));
+
+ PooledByteBufAllocator pooledByteBufAllocator =
+ new PooledByteBufAllocator(true, numHeapArenas, numDirectArenas, pageSize, maxOrder);
+
+ String msg = String.format("Instantiated PooledByteBufAllocator with direct arenas: %d, heap arenas: %d, " +
+ "page size (bytes): %d, chunk size (bytes): %d.",
+ numDirectArenas, numHeapArenas, pageSize, (pageSize << maxOrder));
+ LOG.info(msg);
+
// --------------------------------------------------------------------
// server bootstrap (incoming connections)
// --------------------------------------------------------------------
@@ -107,8 +123,8 @@ public class NettyConnectionManager implements NetworkConnectionManager {
.addLast(new InboundEnvelopeDispatcher(envelopeDispatcher));
}
})
- .option(ChannelOption.RCVBUF_ALLOCATOR, new FixedRecvByteBufAllocator(bufferSize))
- .option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
+ .option(ChannelOption.RCVBUF_ALLOCATOR, new FixedRecvByteBufAllocator(pageSize))
+ .option(ChannelOption.ALLOCATOR, pooledByteBufAllocator);
// --------------------------------------------------------------------
// client bootstrap (outgoing connections)
@@ -125,7 +141,7 @@ public class NettyConnectionManager implements NetworkConnectionManager {
})
.option(ChannelOption.WRITE_BUFFER_LOW_WATER_MARK, lowWaterMark)
.option(ChannelOption.WRITE_BUFFER_HIGH_WATER_MARK, highWaterMark)
- .option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
+ .option(ChannelOption.ALLOCATOR, pooledByteBufAllocator)
.option(ChannelOption.TCP_NODELAY, false)
.option(ChannelOption.SO_KEEPALIVE, true);
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/6c827fb9/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/netty/OutboundConnectionQueue.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/netty/OutboundConnectionQueue.java b/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/netty/OutboundConnectionQueue.java
index 8fef3c1..ff6c694 100644
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/netty/OutboundConnectionQueue.java
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/netty/OutboundConnectionQueue.java
@@ -42,7 +42,7 @@ public class OutboundConnectionQueue extends ChannelInboundHandlerAdapter implem
}
/**
- * Enqueues an envelope so be sent later.
+ * Enqueues an envelope to be sent later.
* <p/>
* This method is always invoked by the task thread that wants the envelope sent.
*
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/6c827fb9/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/netty/OutboundEnvelopeEncoder.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/netty/OutboundEnvelopeEncoder.java b/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/netty/OutboundEnvelopeEncoder.java
index 424f2c0..dad690c 100644
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/netty/OutboundEnvelopeEncoder.java
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/netty/OutboundEnvelopeEncoder.java
@@ -16,25 +16,43 @@ package eu.stratosphere.runtime.io.network.netty;
import eu.stratosphere.runtime.io.Buffer;
import eu.stratosphere.runtime.io.network.Envelope;
import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
-import io.netty.handler.codec.MessageToByteEncoder;
+import io.netty.channel.ChannelOutboundHandlerAdapter;
+import io.netty.channel.ChannelPromise;
@ChannelHandler.Sharable
-public class OutboundEnvelopeEncoder extends MessageToByteEncoder<Envelope> {
+public class OutboundEnvelopeEncoder extends ChannelOutboundHandlerAdapter {
public static final int HEADER_SIZE = 48;
public static final int MAGIC_NUMBER = 0xBADC0FFE;
@Override
- protected void encode(ChannelHandlerContext ctx, Envelope env, ByteBuf out) throws Exception {
+ public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
+ Envelope env = (Envelope) msg;
+
+ ByteBuf buf = ctx.alloc().directBuffer();
+
+ encode(env, buf);
+
+ if (buf.isReadable()) {
+ ctx.write(buf, promise);
+ }
+ else {
+ buf.release();
+ ctx.write(Unpooled.EMPTY_BUFFER, promise);
+ }
+ }
+
+ private void encode(Envelope env, ByteBuf out) {
// --------------------------------------------------------------------
// (1) header (48 bytes)
// --------------------------------------------------------------------
out.writeInt(MAGIC_NUMBER); // 4 bytes
- if (out.getInt(out.writerIndex()-4) != MAGIC_NUMBER) {
+ if (out.getInt(out.writerIndex() - 4) != MAGIC_NUMBER) {
throw new RuntimeException();
}
@@ -54,12 +72,12 @@ public class OutboundEnvelopeEncoder extends MessageToByteEncoder<Envelope> {
// (3) buffer (var length)
// --------------------------------------------------------------------
if (env.getBuffer() != null) {
- Buffer buffer = env.getBuffer();
- out.writeBytes(buffer.getMemorySegment().wrap(0, buffer.size()));
+ Buffer envBuffer = env.getBuffer();
+ out.writeBytes(envBuffer.getMemorySegment().wrap(0, envBuffer.size()));
// Recycle the buffer from OUR buffer pool after everything has been
// copied to Nettys buffer space.
- buffer.recycleBuffer();
+ envBuffer.recycleBuffer();
}
}
}