You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2017/11/06 15:19:24 UTC
[2/4] flink git commit: [FLINK-7515][network] allow actual 0-length content in NettyMessage#allocateBuffer()
[FLINK-7515][network] allow actual 0-length content in NettyMessage#allocateBuffer()
Previously, a length of "0" meant "unknown content length", but there are cases where
the actual content length is 0, so we now use -1 to mark the unknown-length case.
[FLINK-7515][network] address PR comments
This closes #4592.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/f1c4eb6b
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/f1c4eb6b
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/f1c4eb6b
Branch: refs/heads/master
Commit: f1c4eb6b65c4a7c310c325e3dd3f59e89337b3ff
Parents: e285a41
Author: Nico Kruber <ni...@data-artisans.com>
Authored: Thu Aug 24 17:14:38 2017 +0200
Committer: Till Rohrmann <tr...@apache.org>
Committed: Mon Nov 6 16:19:12 2017 +0100
----------------------------------------------------------------------
.../runtime/io/network/netty/NettyMessage.java | 46 +++++++++++++++++++-
1 file changed, 44 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/f1c4eb6b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyMessage.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyMessage.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyMessage.java
index c035010..e73f61d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyMessage.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyMessage.java
@@ -47,6 +47,7 @@ import java.net.ProtocolException;
import java.nio.ByteBuffer;
import java.util.List;
+import static org.apache.flink.util.Preconditions.checkArgument;
import static org.apache.flink.util.Preconditions.checkNotNull;
/**
@@ -67,12 +68,53 @@ abstract class NettyMessage {
// ------------------------------------------------------------------------
+ /**
+ * Allocates a new (header and contents) buffer and adds some header information for the frame
+ * decoder.
+ *
+ * <p>Before sending the buffer, you must write the actual length after adding the contents as
+ * an integer to position <tt>0</tt>!
+ *
+ * @param allocator
+ * byte buffer allocator to use
+ * @param id
+ * {@link NettyMessage} subclass ID
+ *
+ * @return a newly allocated direct buffer with header data written for {@link
+ * NettyMessageDecoder}
+ */
private static ByteBuf allocateBuffer(ByteBufAllocator allocator, byte id) {
- return allocateBuffer(allocator, id, 0);
+ return allocateBuffer(allocator, id, -1);
}
+ /**
+ * Allocates a new (header and contents) buffer and adds some header information for the frame
+ * decoder.
+ *
+ * <p>If the <tt>length</tt> is unknown, you must write the actual length after adding the
+ * contents as an integer to position <tt>0</tt>!
+ *
+ * @param allocator
+ * byte buffer allocator to use
+ * @param id
+ * {@link NettyMessage} subclass ID
+ * @param length
+ * content length (or <tt>-1</tt> if unknown)
+ *
+ * @return a newly allocated direct buffer with header data written for {@link
+ * NettyMessageDecoder}
+ */
private static ByteBuf allocateBuffer(ByteBufAllocator allocator, byte id, int length) {
- final ByteBuf buffer = length != 0 ? allocator.directBuffer(HEADER_LENGTH + length) : allocator.directBuffer();
+ checkArgument(length <= Integer.MAX_VALUE - HEADER_LENGTH);
+
+ final ByteBuf buffer;
+ if (length != -1) {
+ buffer = allocator.directBuffer(HEADER_LENGTH + length);
+ } else {
+ // content length unknown -> start with the default initial size (rather than HEADER_LENGTH only):
+ buffer = allocator.directBuffer();
+ }
+
buffer.writeInt(HEADER_LENGTH + length);
buffer.writeInt(MAGIC_NUMBER);
buffer.writeByte(id);