You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2017/11/06 15:19:24 UTC

[2/4] flink git commit: [FLINK-7515][network] allow actual 0-length content in NettyMessage#allocateBuffer()

[FLINK-7515][network] allow actual 0-length content in NettyMessage#allocateBuffer()

Previously, a length of "0" meant "unknown content length", but there are cases
where the actual content length is 0, so we now use -1 to mark an unknown length.

[FLINK-7515][network] address PR comments

This closes #4592.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/f1c4eb6b
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/f1c4eb6b
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/f1c4eb6b

Branch: refs/heads/master
Commit: f1c4eb6b65c4a7c310c325e3dd3f59e89337b3ff
Parents: e285a41
Author: Nico Kruber <ni...@data-artisans.com>
Authored: Thu Aug 24 17:14:38 2017 +0200
Committer: Till Rohrmann <tr...@apache.org>
Committed: Mon Nov 6 16:19:12 2017 +0100

----------------------------------------------------------------------
 .../runtime/io/network/netty/NettyMessage.java  | 46 +++++++++++++++++++-
 1 file changed, 44 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/f1c4eb6b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyMessage.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyMessage.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyMessage.java
index c035010..e73f61d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyMessage.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyMessage.java
@@ -47,6 +47,7 @@ import java.net.ProtocolException;
 import java.nio.ByteBuffer;
 import java.util.List;
 
+import static org.apache.flink.util.Preconditions.checkArgument;
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /**
@@ -67,12 +68,53 @@ abstract class NettyMessage {
 
 	// ------------------------------------------------------------------------
 
+	/**
+	 * Allocates a new (header and contents) buffer and adds some header information for the frame
+	 * decoder.
+	 *
+	 * <p>Before sending the buffer, you must write the actual length after adding the contents as
+	 * an integer to position <tt>0</tt>!
+	 *
+	 * @param allocator
+	 * 		byte buffer allocator to use
+	 * @param id
+	 * 		{@link NettyMessage} subclass ID
+	 *
+	 * @return a newly allocated direct buffer with header data written for {@link
+	 * NettyMessageDecoder}
+	 */
 	private static ByteBuf allocateBuffer(ByteBufAllocator allocator, byte id) {
-		return allocateBuffer(allocator, id, 0);
+		return allocateBuffer(allocator, id, -1);
 	}
 
+	/**
+	 * Allocates a new (header and contents) buffer and adds some header information for the frame
+	 * decoder.
+	 *
+	 * <p>If the <tt>length</tt> is unknown, you must write the actual length after adding the
+	 * contents as an integer to position <tt>0</tt>!
+	 *
+	 * @param allocator
+	 * 		byte buffer allocator to use
+	 * @param id
+	 * 		{@link NettyMessage} subclass ID
+	 * @param length
+	 * 		content length (or <tt>-1</tt> if unknown)
+	 *
+	 * @return a newly allocated direct buffer with header data written for {@link
+	 * NettyMessageDecoder}
+	 */
 	private static ByteBuf allocateBuffer(ByteBufAllocator allocator, byte id, int length) {
-		final ByteBuf buffer = length != 0 ? allocator.directBuffer(HEADER_LENGTH + length) : allocator.directBuffer();
+		checkArgument(length <= Integer.MAX_VALUE - HEADER_LENGTH);
+
+		final ByteBuf buffer;
+		if (length != -1) {
+			buffer = allocator.directBuffer(HEADER_LENGTH + length);
+		} else {
+			// content length unknown -> start with the default initial size (rather than HEADER_LENGTH only):
+			buffer = allocator.directBuffer();
+		}
+
 		buffer.writeInt(HEADER_LENGTH + length);
 		buffer.writeInt(MAGIC_NUMBER);
 		buffer.writeByte(id);