You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2018/06/12 09:45:34 UTC
[1/2] flink git commit: [FLINK-8759][network] preparations for the
update of netty to version 4.1
Repository: flink
Updated Branches:
refs/heads/master 2b33615b6 -> 7cad1d33a
[FLINK-8759][network] preparations for the update of netty to version 4.1
This closes #5571.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/7cad1d33
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/7cad1d33
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/7cad1d33
Branch: refs/heads/master
Commit: 7cad1d33a6e82dd1b7255dcb3671c719da43097d
Parents: fd5b680
Author: Nico Kruber <ni...@data-artisans.com>
Authored: Fri Feb 23 14:06:00 2018 +0100
Committer: Till Rohrmann <tr...@apache.org>
Committed: Tue Jun 12 11:45:21 2018 +0200
----------------------------------------------------------------------
.../runtime/io/network/netty/NettyMessage.java | 31 +++++++++++++++++++-
.../runtime/io/network/netty/NettyProtocol.java | 4 +--
.../netty/NettyMessageSerializationTest.java | 4 ++-
3 files changed, 35 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/7cad1d33/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyMessage.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyMessage.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyMessage.java
index 2da4099..ca0de6b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyMessage.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyMessage.java
@@ -202,12 +202,18 @@ public abstract class NettyMessage {
* </pre>
*/
static class NettyMessageDecoder extends LengthFieldBasedFrameDecoder {
+ private final boolean restoreOldNettyBehaviour;
/**
* Creates a new message decoded with the required frame properties.
+ *
+ * @param restoreOldNettyBehaviour
+ * restore Netty 4.0.27 code in {@link LengthFieldBasedFrameDecoder#extractFrame} to
+ * copy instead of slicing the buffer
*/
- NettyMessageDecoder() {
+ NettyMessageDecoder(boolean restoreOldNettyBehaviour) {
super(Integer.MAX_VALUE, 0, 4, -4, 4);
+ this.restoreOldNettyBehaviour = restoreOldNettyBehaviour;
}
@Override
@@ -262,6 +268,29 @@ public abstract class NettyMessage {
msg.release();
}
}
+
+ @Override
+ protected ByteBuf extractFrame(ChannelHandlerContext ctx, ByteBuf buffer, int index, int length) {
+ if (restoreOldNettyBehaviour) {
+ /*
+ * For non-credit based code paths with Netty >= 4.0.28.Final:
+ * These versions contain an improvement by Netty, which slices a Netty buffer
+ * instead of doing a memory copy [1] in the
+ * LengthFieldBasedFrameDecoder. In some situations, this
+ * interacts badly with our Netty pipeline leading to OutOfMemory
+ * errors.
+ *
+ * [1] https://github.com/netty/netty/issues/3704
+ *
+ * TODO: remove along with the non-credit based fallback protocol
+ */
+ ByteBuf frame = ctx.alloc().buffer(length);
+ frame.writeBytes(buffer, index, length);
+ return frame;
+ } else {
+ return super.extractFrame(ctx, buffer, index, length);
+ }
+ }
}
// ------------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/7cad1d33/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyProtocol.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyProtocol.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyProtocol.java
index 98ffa93..ebad11b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyProtocol.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyProtocol.java
@@ -83,7 +83,7 @@ public class NettyProtocol {
return new ChannelHandler[] {
messageEncoder,
- new NettyMessage.NettyMessageDecoder(),
+ new NettyMessage.NettyMessageDecoder(!creditBasedEnabled),
serverHandler,
queueOfPartitionQueues
};
@@ -127,7 +127,7 @@ public class NettyProtocol {
new PartitionRequestClientHandler();
return new ChannelHandler[] {
messageEncoder,
- new NettyMessage.NettyMessageDecoder(),
+ new NettyMessage.NettyMessageDecoder(!creditBasedEnabled),
networkClientHandler};
}
http://git-wip-us.apache.org/repos/asf/flink/blob/7cad1d33/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyMessageSerializationTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyMessageSerializationTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyMessageSerializationTest.java
index cc732cd..ebdb529 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyMessageSerializationTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyMessageSerializationTest.java
@@ -44,9 +44,11 @@ import static org.junit.Assert.assertTrue;
*/
public class NettyMessageSerializationTest {
+ public static final boolean RESTORE_OLD_NETTY_BEHAVIOUR = false;
+
private final EmbeddedChannel channel = new EmbeddedChannel(
new NettyMessage.NettyMessageEncoder(), // outbound messages
- new NettyMessage.NettyMessageDecoder()); // inbound messages
+ new NettyMessage.NettyMessageDecoder(RESTORE_OLD_NETTY_BEHAVIOUR)); // inbound messages
private final Random random = new Random();
[2/2] flink git commit: [FLINK-8768][network] Let NettyMessageDecoder
inherit from LengthFieldBasedFrameDecoder
Posted by tr...@apache.org.
[FLINK-8768][network] Let NettyMessageDecoder inherit from LengthFieldBasedFrameDecoder
This removes one additional step from the pipeline, which not only removes
overhead there but also allows us to override the #extractFrame() method to
restore the old Netty 4.0.27 behaviour for non-credit based code paths, which
had a bug with Netty >= 4.0.28 (see FLINK-8759).
This closes #5570.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/fd5b680d
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/fd5b680d
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/fd5b680d
Branch: refs/heads/master
Commit: fd5b680d30546e546ea616d3a4d25de659414f84
Parents: 2b33615
Author: Nico Kruber <ni...@data-artisans.com>
Authored: Fri Feb 23 13:56:29 2018 +0100
Committer: Till Rohrmann <tr...@apache.org>
Committed: Tue Jun 12 11:45:21 2018 +0200
----------------------------------------------------------------------
.../runtime/io/network/netty/NettyMessage.java | 115 +++++++++++--------
.../runtime/io/network/netty/NettyProtocol.java | 32 ++----
.../netty/NettyMessageSerializationTest.java | 1 -
3 files changed, 76 insertions(+), 72 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/fd5b680d/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyMessage.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyMessage.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyMessage.java
index 9f9ee88..2da4099 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyMessage.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyMessage.java
@@ -38,7 +38,6 @@ import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandlerContext;
import org.apache.flink.shaded.netty4.io.netty.channel.ChannelOutboundHandlerAdapter;
import org.apache.flink.shaded.netty4.io.netty.channel.ChannelPromise;
import org.apache.flink.shaded.netty4.io.netty.handler.codec.LengthFieldBasedFrameDecoder;
-import org.apache.flink.shaded.netty4.io.netty.handler.codec.MessageToMessageDecoder;
import javax.annotation.Nullable;
@@ -47,7 +46,6 @@ import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.net.ProtocolException;
import java.nio.ByteBuffer;
-import java.util.List;
import static org.apache.flink.util.Preconditions.checkArgument;
import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -188,58 +186,81 @@ public abstract class NettyMessage {
ctx.write(msg, promise);
}
}
-
- // Create the frame length decoder here as it depends on the encoder
- //
- // +------------------+------------------+--------++----------------+
- // | FRAME LENGTH (4) | MAGIC NUMBER (4) | ID (1) || CUSTOM MESSAGE |
- // +------------------+------------------+--------++----------------+
- static LengthFieldBasedFrameDecoder createFrameLengthDecoder() {
- return new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, -4, 4);
- }
}
- @ChannelHandler.Sharable
- static class NettyMessageDecoder extends MessageToMessageDecoder<ByteBuf> {
+ /**
+ * Message decoder based on netty's {@link LengthFieldBasedFrameDecoder} but avoiding the
+ * additional memory copy inside {@link #extractFrame(ChannelHandlerContext, ByteBuf, int, int)}
+ * since we completely decode the {@link ByteBuf} inside {@link #decode(ChannelHandlerContext,
+ * ByteBuf)} and will not re-use it afterwards.
+ *
+ * <p>The frame-length encoder will be based on this transmission scheme created by {@link NettyMessage#allocateBuffer(ByteBufAllocator, byte, int)}:
+ * <pre>
+ * +------------------+------------------+--------++----------------+
+ * | FRAME LENGTH (4) | MAGIC NUMBER (4) | ID (1) || CUSTOM MESSAGE |
+ * +------------------+------------------+--------++----------------+
+ * </pre>
+ */
+ static class NettyMessageDecoder extends LengthFieldBasedFrameDecoder {
- @Override
- protected void decode(ChannelHandlerContext ctx, ByteBuf msg, List<Object> out) throws Exception {
- int magicNumber = msg.readInt();
+ /**
+ * Creates a new message decoded with the required frame properties.
+ */
+ NettyMessageDecoder() {
+ super(Integer.MAX_VALUE, 0, 4, -4, 4);
+ }
- if (magicNumber != MAGIC_NUMBER) {
- throw new IllegalStateException("Network stream corrupted: received incorrect magic number.");
+ @Override
+ protected Object decode(ChannelHandlerContext ctx, ByteBuf in) throws Exception {
+ ByteBuf msg = (ByteBuf) super.decode(ctx, in);
+ if (msg == null) {
+ return null;
}
- byte msgId = msg.readByte();
-
- final NettyMessage decodedMsg;
- switch (msgId) {
- case BufferResponse.ID:
- decodedMsg = BufferResponse.readFrom(msg);
- break;
- case PartitionRequest.ID:
- decodedMsg = PartitionRequest.readFrom(msg);
- break;
- case TaskEventRequest.ID:
- decodedMsg = TaskEventRequest.readFrom(msg, getClass().getClassLoader());
- break;
- case ErrorResponse.ID:
- decodedMsg = ErrorResponse.readFrom(msg);
- break;
- case CancelPartitionRequest.ID:
- decodedMsg = CancelPartitionRequest.readFrom(msg);
- break;
- case CloseRequest.ID:
- decodedMsg = CloseRequest.readFrom(msg);
- break;
- case AddCredit.ID:
- decodedMsg = AddCredit.readFrom(msg);
- break;
- default:
- throw new ProtocolException("Received unknown message from producer: " + msg);
- }
+ try {
+ int magicNumber = msg.readInt();
+
+ if (magicNumber != MAGIC_NUMBER) {
+ throw new IllegalStateException(
+ "Network stream corrupted: received incorrect magic number.");
+ }
+
+ byte msgId = msg.readByte();
+
+ final NettyMessage decodedMsg;
+ switch (msgId) {
+ case BufferResponse.ID:
+ decodedMsg = BufferResponse.readFrom(msg);
+ break;
+ case PartitionRequest.ID:
+ decodedMsg = PartitionRequest.readFrom(msg);
+ break;
+ case TaskEventRequest.ID:
+ decodedMsg = TaskEventRequest.readFrom(msg, getClass().getClassLoader());
+ break;
+ case ErrorResponse.ID:
+ decodedMsg = ErrorResponse.readFrom(msg);
+ break;
+ case CancelPartitionRequest.ID:
+ decodedMsg = CancelPartitionRequest.readFrom(msg);
+ break;
+ case CloseRequest.ID:
+ decodedMsg = CloseRequest.readFrom(msg);
+ break;
+ case AddCredit.ID:
+ decodedMsg = AddCredit.readFrom(msg);
+ break;
+ default:
+ throw new ProtocolException(
+ "Received unknown message from producer: " + msg);
+ }
- out.add(decodedMsg);
+ return decodedMsg;
+ } finally {
+ // ByteToMessageDecoder cleanup (only the BufferResponse holds on to the decoded
+ // msg but already retain()s the buffer once)
+ msg.release();
+ }
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/fd5b680d/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyProtocol.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyProtocol.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyProtocol.java
index f09ddfb..98ffa93 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyProtocol.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyProtocol.java
@@ -24,8 +24,6 @@ import org.apache.flink.runtime.io.network.partition.ResultPartitionProvider;
import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandler;
-import static org.apache.flink.runtime.io.network.netty.NettyMessage.NettyMessageEncoder.createFrameLengthDecoder;
-
/**
* Defines the server and client channel handlers, i.e. the protocol, used by netty.
*/
@@ -34,8 +32,6 @@ public class NettyProtocol {
private final NettyMessage.NettyMessageEncoder
messageEncoder = new NettyMessage.NettyMessageEncoder();
- private final NettyMessage.NettyMessageDecoder messageDecoder = new NettyMessage.NettyMessageDecoder();
-
private final ResultPartitionProvider partitionProvider;
private final TaskEventDispatcher taskEventDispatcher;
@@ -64,14 +60,9 @@ public class NettyProtocol {
* | +----------+----------+ | |
* | /|\ | |
* | | | |
- * | +----------+----------+ | |
- * | | Message decoder | | |
- * | +----------+----------+ | |
- * | /|\ | |
- * | | | |
- * | +----------+----------+ | |
- * | | Frame decoder | | |
- * | +----------+----------+ | |
+ * | +-----------+-----------+ | |
+ * | | Message+Frame decoder | | |
+ * | +-----------+-----------+ | |
* | /|\ | |
* +---------------+-----------------------------------+---------------+
* | | (1) client request \|/
@@ -92,8 +83,7 @@ public class NettyProtocol {
return new ChannelHandler[] {
messageEncoder,
- createFrameLengthDecoder(),
- messageDecoder,
+ new NettyMessage.NettyMessageDecoder(),
serverHandler,
queueOfPartitionQueues
};
@@ -115,14 +105,9 @@ public class NettyProtocol {
* | +----------+----------+ +-----------+----------+ |
* | /|\ \|/ |
* | | | |
- * | +----------+----------+ | |
- * | | Message decoder | | |
- * | +----------+----------+ | |
- * | /|\ | |
- * | | | |
- * | +----------+----------+ | |
- * | | Frame decoder | | |
- * | +----------+----------+ | |
+ * | +----------+------------+ | |
+ * | | Message+Frame decoder | | |
+ * | +----------+------------+ | |
* | /|\ | |
* +---------------+-----------------------------------+---------------+
* | | (3) server response \|/ (2) client request
@@ -142,8 +127,7 @@ public class NettyProtocol {
new PartitionRequestClientHandler();
return new ChannelHandler[] {
messageEncoder,
- createFrameLengthDecoder(),
- messageDecoder,
+ new NettyMessage.NettyMessageDecoder(),
networkClientHandler};
}
http://git-wip-us.apache.org/repos/asf/flink/blob/fd5b680d/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyMessageSerializationTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyMessageSerializationTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyMessageSerializationTest.java
index ad0f103..cc732cd 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyMessageSerializationTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyMessageSerializationTest.java
@@ -46,7 +46,6 @@ public class NettyMessageSerializationTest {
private final EmbeddedChannel channel = new EmbeddedChannel(
new NettyMessage.NettyMessageEncoder(), // outbound messages
- NettyMessage.NettyMessageEncoder.createFrameLengthDecoder(), // inbound messages
new NettyMessage.NettyMessageDecoder()); // inbound messages
private final Random random = new Random();