You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by uc...@apache.org on 2014/06/25 13:54:54 UTC

git commit: [FLINK-971] Configure PooledByteBufAllocator in NettyConnectionManager instead of using the default allocator

Repository: incubator-flink
Updated Branches:
  refs/heads/master f7f1ed2fe -> 6c827fb93


[FLINK-971] Configure PooledByteBufAllocator in NettyConnectionManager instead of using the default allocator

Configuration:
    - 0 heap arenas,
    - n direct arenas (where n = num incoming + num outgoing network IO threads), and
    - a page size of (bufferSize << 1) bytes, i.e. twice the buffer size.

Additionally, OutboundEnvelopeEncoder directly extends ChannelOutboundHandlerAdapter instead of the
MessageToByteEncoder<Envelope> wrapper to have tighter control over memory allocations.

This closes #38.


Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/6c827fb9
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/6c827fb9
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/6c827fb9

Branch: refs/heads/master
Commit: 6c827fb932efff77c8abcdbb407385898f9d8d40
Parents: f7f1ed2
Author: uce <u....@fu-berlin.de>
Authored: Tue Jun 17 16:18:02 2014 +0200
Committer: uce <u....@fu-berlin.de>
Committed: Wed Jun 25 13:53:51 2014 +0200

----------------------------------------------------------------------
 .../network/netty/NettyConnectionManager.java   | 22 ++++++++++++--
 .../network/netty/OutboundConnectionQueue.java  |  2 +-
 .../network/netty/OutboundEnvelopeEncoder.java  | 32 +++++++++++++++-----
 3 files changed, 45 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/6c827fb9/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/netty/NettyConnectionManager.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/netty/NettyConnectionManager.java b/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/netty/NettyConnectionManager.java
index 73afcbc..4b54641 100644
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/netty/NettyConnectionManager.java
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/netty/NettyConnectionManager.java
@@ -92,6 +92,22 @@ public class NettyConnectionManager implements NetworkConnectionManager {
 		final BufferProviderBroker bufferProviderBroker = channelManager;
 		final EnvelopeDispatcher envelopeDispatcher = channelManager;
 
+		int numHeapArenas = 0;
+		int numDirectArenas = numInThreads + numOutThreads;
+		int pageSize = bufferSize << 1;
+		int chunkSize = 16 * 1 << 20; // 16 MB
+
+		// shift pageSize maxOrder times to get to chunkSize
+		int maxOrder = (int) (Math.log(chunkSize/pageSize) / Math.log(2));
+
+		PooledByteBufAllocator pooledByteBufAllocator =
+				new PooledByteBufAllocator(true, numHeapArenas, numDirectArenas, pageSize, maxOrder);
+
+		String msg = String.format("Instantiated PooledByteBufAllocator with direct arenas: %d, heap arenas: %d, " +
+				"page size (bytes): %d, chunk size (bytes): %d.",
+				numDirectArenas, numHeapArenas, pageSize, (pageSize << maxOrder));
+		LOG.info(msg);
+
 		// --------------------------------------------------------------------
 		// server bootstrap (incoming connections)
 		// --------------------------------------------------------------------
@@ -107,8 +123,8 @@ public class NettyConnectionManager implements NetworkConnectionManager {
 								.addLast(new InboundEnvelopeDispatcher(envelopeDispatcher));
 					}
 				})
-				.option(ChannelOption.RCVBUF_ALLOCATOR, new FixedRecvByteBufAllocator(bufferSize))
-				.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
+				.option(ChannelOption.RCVBUF_ALLOCATOR, new FixedRecvByteBufAllocator(pageSize))
+				.option(ChannelOption.ALLOCATOR, pooledByteBufAllocator);
 
 		// --------------------------------------------------------------------
 		// client bootstrap (outgoing connections)
@@ -125,7 +141,7 @@ public class NettyConnectionManager implements NetworkConnectionManager {
 				})
 				.option(ChannelOption.WRITE_BUFFER_LOW_WATER_MARK, lowWaterMark)
 				.option(ChannelOption.WRITE_BUFFER_HIGH_WATER_MARK, highWaterMark)
-				.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
+				.option(ChannelOption.ALLOCATOR, pooledByteBufAllocator)
 				.option(ChannelOption.TCP_NODELAY, false)
 				.option(ChannelOption.SO_KEEPALIVE, true);
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/6c827fb9/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/netty/OutboundConnectionQueue.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/netty/OutboundConnectionQueue.java b/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/netty/OutboundConnectionQueue.java
index 8fef3c1..ff6c694 100644
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/netty/OutboundConnectionQueue.java
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/netty/OutboundConnectionQueue.java
@@ -42,7 +42,7 @@ public class OutboundConnectionQueue extends ChannelInboundHandlerAdapter implem
 	}
 
 	/**
-	 * Enqueues an envelope so be sent later.
+	 * Enqueues an envelope to be sent later.
 	 * <p/>
 	 * This method is always invoked by the task thread that wants the envelope sent.
 	 *

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/6c827fb9/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/netty/OutboundEnvelopeEncoder.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/netty/OutboundEnvelopeEncoder.java b/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/netty/OutboundEnvelopeEncoder.java
index 424f2c0..dad690c 100644
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/netty/OutboundEnvelopeEncoder.java
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/netty/OutboundEnvelopeEncoder.java
@@ -16,25 +16,43 @@ package eu.stratosphere.runtime.io.network.netty;
 import eu.stratosphere.runtime.io.Buffer;
 import eu.stratosphere.runtime.io.network.Envelope;
 import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
 import io.netty.channel.ChannelHandler;
 import io.netty.channel.ChannelHandlerContext;
-import io.netty.handler.codec.MessageToByteEncoder;
+import io.netty.channel.ChannelOutboundHandlerAdapter;
+import io.netty.channel.ChannelPromise;
 
 @ChannelHandler.Sharable
-public class OutboundEnvelopeEncoder extends MessageToByteEncoder<Envelope> {
+public class OutboundEnvelopeEncoder extends ChannelOutboundHandlerAdapter {
 
 	public static final int HEADER_SIZE = 48;
 
 	public static final int MAGIC_NUMBER = 0xBADC0FFE;
 
 	@Override
-	protected void encode(ChannelHandlerContext ctx, Envelope env, ByteBuf out) throws Exception {
+	public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
+		Envelope env = (Envelope) msg;
+
+		ByteBuf buf = ctx.alloc().directBuffer();
+
+		encode(env, buf);
+
+		if (buf.isReadable()) {
+			ctx.write(buf, promise);
+		}
+		else {
+			buf.release();
+			ctx.write(Unpooled.EMPTY_BUFFER, promise);
+		}
+	}
+
+	private void encode(Envelope env, ByteBuf out) {
 		// --------------------------------------------------------------------
 		// (1) header (48 bytes)
 		// --------------------------------------------------------------------
 		out.writeInt(MAGIC_NUMBER); // 4 bytes
 
-		if (out.getInt(out.writerIndex()-4) != MAGIC_NUMBER) {
+		if (out.getInt(out.writerIndex() - 4) != MAGIC_NUMBER) {
 			throw new RuntimeException();
 		}
 
@@ -54,12 +72,12 @@ public class OutboundEnvelopeEncoder extends MessageToByteEncoder<Envelope> {
 		// (3) buffer (var length)
 		// --------------------------------------------------------------------
 		if (env.getBuffer() != null) {
-			Buffer buffer = env.getBuffer();
-			out.writeBytes(buffer.getMemorySegment().wrap(0, buffer.size()));
+			Buffer envBuffer = env.getBuffer();
+			out.writeBytes(envBuffer.getMemorySegment().wrap(0, envBuffer.size()));
 
 			// Recycle the buffer from OUR buffer pool after everything has been
 			// copied to Nettys buffer space.
-			buffer.recycleBuffer();
+			envBuffer.recycleBuffer();
 		}
 	}
 }