You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2018/06/12 09:45:34 UTC

[1/2] flink git commit: [FLINK-8759][network] preparations for the update of netty to version 4.1

Repository: flink
Updated Branches:
  refs/heads/master 2b33615b6 -> 7cad1d33a


[FLINK-8759][network] preparations for the update of netty to version 4.1

This closes #5571.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/7cad1d33
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/7cad1d33
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/7cad1d33

Branch: refs/heads/master
Commit: 7cad1d33a6e82dd1b7255dcb3671c719da43097d
Parents: fd5b680
Author: Nico Kruber <ni...@data-artisans.com>
Authored: Fri Feb 23 14:06:00 2018 +0100
Committer: Till Rohrmann <tr...@apache.org>
Committed: Tue Jun 12 11:45:21 2018 +0200

----------------------------------------------------------------------
 .../runtime/io/network/netty/NettyMessage.java  | 31 +++++++++++++++++++-
 .../runtime/io/network/netty/NettyProtocol.java |  4 +--
 .../netty/NettyMessageSerializationTest.java    |  4 ++-
 3 files changed, 35 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/7cad1d33/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyMessage.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyMessage.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyMessage.java
index 2da4099..ca0de6b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyMessage.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyMessage.java
@@ -202,12 +202,18 @@ public abstract class NettyMessage {
 	 * </pre>
 	 */
 	static class NettyMessageDecoder extends LengthFieldBasedFrameDecoder {
+		private final boolean restoreOldNettyBehaviour;
 
 		/**
 		 * Creates a new message decoded with the required frame properties.
+		 *
+		 * @param restoreOldNettyBehaviour
+		 * 		restore Netty 4.0.27 code in {@link LengthFieldBasedFrameDecoder#extractFrame} to
+		 * 		copy instead of slicing the buffer
 		 */
-		NettyMessageDecoder() {
+		NettyMessageDecoder(boolean restoreOldNettyBehaviour) {
 			super(Integer.MAX_VALUE, 0, 4, -4, 4);
+			this.restoreOldNettyBehaviour = restoreOldNettyBehaviour;
 		}
 
 		@Override
@@ -262,6 +268,29 @@ public abstract class NettyMessage {
 				msg.release();
 			}
 		}
+
+		@Override
+		protected ByteBuf extractFrame(ChannelHandlerContext ctx, ByteBuf buffer, int index, int length) {
+			if (restoreOldNettyBehaviour) {
+				/*
+				 * For non-credit based code paths with Netty >= 4.0.28.Final:
+				 * These versions contain an improvement by Netty, which slices a Netty buffer
+				 * instead of doing a memory copy [1] in the
+				 * LengthFieldBasedFrameDecoder. In some situations, this
+				 * interacts badly with our Netty pipeline leading to OutOfMemory
+				 * errors.
+				 *
+				 * [1] https://github.com/netty/netty/issues/3704
+				 *
+				 * TODO: remove along with the non-credit based fallback protocol
+				 */
+				ByteBuf frame = ctx.alloc().buffer(length);
+				frame.writeBytes(buffer, index, length);
+				return frame;
+			} else {
+				return super.extractFrame(ctx, buffer, index, length);
+			}
+		}
 	}
 
 	// ------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/7cad1d33/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyProtocol.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyProtocol.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyProtocol.java
index 98ffa93..ebad11b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyProtocol.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyProtocol.java
@@ -83,7 +83,7 @@ public class NettyProtocol {
 
 		return new ChannelHandler[] {
 			messageEncoder,
-			new NettyMessage.NettyMessageDecoder(),
+			new NettyMessage.NettyMessageDecoder(!creditBasedEnabled),
 			serverHandler,
 			queueOfPartitionQueues
 		};
@@ -127,7 +127,7 @@ public class NettyProtocol {
 				new PartitionRequestClientHandler();
 		return new ChannelHandler[] {
 			messageEncoder,
-			new NettyMessage.NettyMessageDecoder(),
+			new NettyMessage.NettyMessageDecoder(!creditBasedEnabled),
 			networkClientHandler};
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/7cad1d33/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyMessageSerializationTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyMessageSerializationTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyMessageSerializationTest.java
index cc732cd..ebdb529 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyMessageSerializationTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyMessageSerializationTest.java
@@ -44,9 +44,11 @@ import static org.junit.Assert.assertTrue;
  */
 public class NettyMessageSerializationTest {
 
+	public static final boolean RESTORE_OLD_NETTY_BEHAVIOUR = false;
+
 	private final EmbeddedChannel channel = new EmbeddedChannel(
 			new NettyMessage.NettyMessageEncoder(), // outbound messages
-			new NettyMessage.NettyMessageDecoder()); // inbound messages
+			new NettyMessage.NettyMessageDecoder(RESTORE_OLD_NETTY_BEHAVIOUR)); // inbound messages
 
 	private final Random random = new Random();
 


[2/2] flink git commit: [FLINK-8768][network] Let NettyMessageDecoder inherit from LengthFieldBasedFrameDecoder

Posted by tr...@apache.org.
[FLINK-8768][network] Let NettyMessageDecoder inherit from LengthFieldBasedFrameDecoder

This removes one additional step from the pipeline, which not only removes
overhead there but also allows us to override the #extractFrame() method to
restore the old Netty 4.0.27 behaviour for non-credit based code paths, which
had a bug with Netty >= 4.0.28 (see FLINK-8759).

This closes #5570.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/fd5b680d
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/fd5b680d
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/fd5b680d

Branch: refs/heads/master
Commit: fd5b680d30546e546ea616d3a4d25de659414f84
Parents: 2b33615
Author: Nico Kruber <ni...@data-artisans.com>
Authored: Fri Feb 23 13:56:29 2018 +0100
Committer: Till Rohrmann <tr...@apache.org>
Committed: Tue Jun 12 11:45:21 2018 +0200

----------------------------------------------------------------------
 .../runtime/io/network/netty/NettyMessage.java  | 115 +++++++++++--------
 .../runtime/io/network/netty/NettyProtocol.java |  32 ++----
 .../netty/NettyMessageSerializationTest.java    |   1 -
 3 files changed, 76 insertions(+), 72 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/fd5b680d/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyMessage.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyMessage.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyMessage.java
index 9f9ee88..2da4099 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyMessage.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyMessage.java
@@ -38,7 +38,6 @@ import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandlerContext;
 import org.apache.flink.shaded.netty4.io.netty.channel.ChannelOutboundHandlerAdapter;
 import org.apache.flink.shaded.netty4.io.netty.channel.ChannelPromise;
 import org.apache.flink.shaded.netty4.io.netty.handler.codec.LengthFieldBasedFrameDecoder;
-import org.apache.flink.shaded.netty4.io.netty.handler.codec.MessageToMessageDecoder;
 
 import javax.annotation.Nullable;
 
@@ -47,7 +46,6 @@ import java.io.ObjectInputStream;
 import java.io.ObjectOutputStream;
 import java.net.ProtocolException;
 import java.nio.ByteBuffer;
-import java.util.List;
 
 import static org.apache.flink.util.Preconditions.checkArgument;
 import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -188,58 +186,81 @@ public abstract class NettyMessage {
 				ctx.write(msg, promise);
 			}
 		}
-
-		// Create the frame length decoder here as it depends on the encoder
-		//
-		// +------------------+------------------+--------++----------------+
-		// | FRAME LENGTH (4) | MAGIC NUMBER (4) | ID (1) || CUSTOM MESSAGE |
-		// +------------------+------------------+--------++----------------+
-		static LengthFieldBasedFrameDecoder createFrameLengthDecoder() {
-			return new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, -4, 4);
-		}
 	}
 
-	@ChannelHandler.Sharable
-	static class NettyMessageDecoder extends MessageToMessageDecoder<ByteBuf> {
+	/**
+	 * Message decoder based on netty's {@link LengthFieldBasedFrameDecoder} but avoiding the
+	 * additional memory copy inside {@link #extractFrame(ChannelHandlerContext, ByteBuf, int, int)}
+	 * since we completely decode the {@link ByteBuf} inside {@link #decode(ChannelHandlerContext,
+	 * ByteBuf)} and will not re-use it afterwards.
+	 *
+	 * <p>The frame-length encoder will be based on this transmission scheme created by {@link NettyMessage#allocateBuffer(ByteBufAllocator, byte, int)}:
+	 * <pre>
+	 * +------------------+------------------+--------++----------------+
+	 * | FRAME LENGTH (4) | MAGIC NUMBER (4) | ID (1) || CUSTOM MESSAGE |
+	 * +------------------+------------------+--------++----------------+
+	 * </pre>
+	 */
+	static class NettyMessageDecoder extends LengthFieldBasedFrameDecoder {
 
-		@Override
-		protected void decode(ChannelHandlerContext ctx, ByteBuf msg, List<Object> out) throws Exception {
-			int magicNumber = msg.readInt();
+		/**
+		 * Creates a new message decoded with the required frame properties.
+		 */
+		NettyMessageDecoder() {
+			super(Integer.MAX_VALUE, 0, 4, -4, 4);
+		}
 
-			if (magicNumber != MAGIC_NUMBER) {
-				throw new IllegalStateException("Network stream corrupted: received incorrect magic number.");
+		@Override
+		protected Object decode(ChannelHandlerContext ctx, ByteBuf in) throws Exception {
+			ByteBuf msg = (ByteBuf) super.decode(ctx, in);
+			if (msg == null) {
+				return null;
 			}
 
-			byte msgId = msg.readByte();
-
-			final NettyMessage decodedMsg;
-			switch (msgId) {
-				case BufferResponse.ID:
-					decodedMsg = BufferResponse.readFrom(msg);
-					break;
-				case PartitionRequest.ID:
-					decodedMsg = PartitionRequest.readFrom(msg);
-					break;
-				case TaskEventRequest.ID:
-					decodedMsg = TaskEventRequest.readFrom(msg, getClass().getClassLoader());
-					break;
-				case ErrorResponse.ID:
-					decodedMsg = ErrorResponse.readFrom(msg);
-					break;
-				case CancelPartitionRequest.ID:
-					decodedMsg = CancelPartitionRequest.readFrom(msg);
-					break;
-				case CloseRequest.ID:
-					decodedMsg = CloseRequest.readFrom(msg);
-					break;
-				case AddCredit.ID:
-					decodedMsg = AddCredit.readFrom(msg);
-					break;
-				default:
-					throw new ProtocolException("Received unknown message from producer: " + msg);
-			}
+			try {
+				int magicNumber = msg.readInt();
+
+				if (magicNumber != MAGIC_NUMBER) {
+					throw new IllegalStateException(
+						"Network stream corrupted: received incorrect magic number.");
+				}
+
+				byte msgId = msg.readByte();
+
+				final NettyMessage decodedMsg;
+				switch (msgId) {
+					case BufferResponse.ID:
+						decodedMsg = BufferResponse.readFrom(msg);
+						break;
+					case PartitionRequest.ID:
+						decodedMsg = PartitionRequest.readFrom(msg);
+						break;
+					case TaskEventRequest.ID:
+						decodedMsg = TaskEventRequest.readFrom(msg, getClass().getClassLoader());
+						break;
+					case ErrorResponse.ID:
+						decodedMsg = ErrorResponse.readFrom(msg);
+						break;
+					case CancelPartitionRequest.ID:
+						decodedMsg = CancelPartitionRequest.readFrom(msg);
+						break;
+					case CloseRequest.ID:
+						decodedMsg = CloseRequest.readFrom(msg);
+						break;
+					case AddCredit.ID:
+						decodedMsg = AddCredit.readFrom(msg);
+						break;
+					default:
+						throw new ProtocolException(
+							"Received unknown message from producer: " + msg);
+				}
 
-			out.add(decodedMsg);
+				return decodedMsg;
+			} finally {
+				// ByteToMessageDecoder cleanup (only the BufferResponse holds on to the decoded
+				// msg but already retain()s the buffer once)
+				msg.release();
+			}
 		}
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/fd5b680d/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyProtocol.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyProtocol.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyProtocol.java
index f09ddfb..98ffa93 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyProtocol.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyProtocol.java
@@ -24,8 +24,6 @@ import org.apache.flink.runtime.io.network.partition.ResultPartitionProvider;
 
 import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandler;
 
-import static org.apache.flink.runtime.io.network.netty.NettyMessage.NettyMessageEncoder.createFrameLengthDecoder;
-
 /**
  * Defines the server and client channel handlers, i.e. the protocol, used by netty.
  */
@@ -34,8 +32,6 @@ public class NettyProtocol {
 	private final NettyMessage.NettyMessageEncoder
 		messageEncoder = new NettyMessage.NettyMessageEncoder();
 
-	private final NettyMessage.NettyMessageDecoder messageDecoder = new NettyMessage.NettyMessageDecoder();
-
 	private final ResultPartitionProvider partitionProvider;
 	private final TaskEventDispatcher taskEventDispatcher;
 
@@ -64,14 +60,9 @@ public class NettyProtocol {
 	 * |    +----------+----------+                        |               |
 	 * |              /|\                                  |               |
 	 * |               |                                   |               |
-	 * |    +----------+----------+                        |               |
-	 * |    | Message decoder     |                        |               |
-	 * |    +----------+----------+                        |               |
-	 * |              /|\                                  |               |
-	 * |               |                                   |               |
-	 * |    +----------+----------+                        |               |
-	 * |    | Frame decoder       |                        |               |
-	 * |    +----------+----------+                        |               |
+	 * |   +-----------+-----------+                       |               |
+	 * |   | Message+Frame decoder |                       |               |
+	 * |   +-----------+-----------+                       |               |
 	 * |              /|\                                  |               |
 	 * +---------------+-----------------------------------+---------------+
 	 * |               | (1) client request               \|/
@@ -92,8 +83,7 @@ public class NettyProtocol {
 
 		return new ChannelHandler[] {
 			messageEncoder,
-			createFrameLengthDecoder(),
-			messageDecoder,
+			new NettyMessage.NettyMessageDecoder(),
 			serverHandler,
 			queueOfPartitionQueues
 		};
@@ -115,14 +105,9 @@ public class NettyProtocol {
 	 * |    +----------+----------+            +-----------+----------+    |
 	 * |              /|\                                 \|/              |
 	 * |               |                                   |               |
-	 * |    +----------+----------+                        |               |
-	 * |    | Message decoder     |                        |               |
-	 * |    +----------+----------+                        |               |
-	 * |              /|\                                  |               |
-	 * |               |                                   |               |
-	 * |    +----------+----------+                        |               |
-	 * |    | Frame decoder       |                        |               |
-	 * |    +----------+----------+                        |               |
+	 * |    +----------+------------+                      |               |
+	 * |    | Message+Frame decoder |                      |               |
+	 * |    +----------+------------+                      |               |
 	 * |              /|\                                  |               |
 	 * +---------------+-----------------------------------+---------------+
 	 * |               | (3) server response              \|/ (2) client request
@@ -142,8 +127,7 @@ public class NettyProtocol {
 				new PartitionRequestClientHandler();
 		return new ChannelHandler[] {
 			messageEncoder,
-			createFrameLengthDecoder(),
-			messageDecoder,
+			new NettyMessage.NettyMessageDecoder(),
 			networkClientHandler};
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/fd5b680d/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyMessageSerializationTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyMessageSerializationTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyMessageSerializationTest.java
index ad0f103..cc732cd 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyMessageSerializationTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyMessageSerializationTest.java
@@ -46,7 +46,6 @@ public class NettyMessageSerializationTest {
 
 	private final EmbeddedChannel channel = new EmbeddedChannel(
 			new NettyMessage.NettyMessageEncoder(), // outbound messages
-			NettyMessage.NettyMessageEncoder.createFrameLengthDecoder(), // inbound messages
 			new NettyMessage.NettyMessageDecoder()); // inbound messages
 
 	private final Random random = new Random();