You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by sr...@apache.org on 2018/01/18 14:26:00 UTC

[9/9] flink git commit: [hotfix][network] clarify BufferResponse#size() uses (by removing it)

[hotfix][network] clarify BufferResponse#size() uses (by removing it)

This field was only used by the code paths on the receiver and was inconsistent
with what was added on the sending side. We should use the contained buffer's
readableBytes() instead, depending on the actual use case.

This closes #4613.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/9d0dfcba
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/9d0dfcba
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/9d0dfcba

Branch: refs/heads/master
Commit: 9d0dfcba639a206fb7bf06df3b6af48719794d5d
Parents: 4359301
Author: Nico Kruber <ni...@data-artisans.com>
Authored: Thu Jan 11 12:03:45 2018 +0100
Committer: Stefan Richter <s....@data-artisans.com>
Committed: Thu Jan 18 15:24:57 2018 +0100

----------------------------------------------------------------------
 .../io/network/netty/CreditBasedClientHandler.java       |  8 +++++---
 .../flink/runtime/io/network/netty/NettyMessage.java     |  8 --------
 .../io/network/netty/PartitionRequestClientHandler.java  | 11 +++++++----
 3 files changed, 12 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/9d0dfcba/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/CreditBasedClientHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/CreditBasedClientHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/CreditBasedClientHandler.java
index dbddaa7..fdcc179 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/CreditBasedClientHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/CreditBasedClientHandler.java
@@ -31,6 +31,7 @@ import org.apache.flink.runtime.io.network.partition.PartitionNotFoundException;
 import org.apache.flink.runtime.io.network.partition.consumer.InputChannelID;
 import org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel;
 
+import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf;
 import org.apache.flink.shaded.netty4.io.netty.channel.Channel;
 import org.apache.flink.shaded.netty4.io.netty.channel.ChannelFuture;
 import org.apache.flink.shaded.netty4.io.netty.channel.ChannelFutureListener;
@@ -292,7 +293,8 @@ class CreditBasedClientHandler extends ChannelInboundHandlerAdapter {
 
 	private void decodeBufferOrEvent(RemoteInputChannel inputChannel, NettyMessage.BufferResponse bufferOrEvent) throws Throwable {
 		try {
-			int size = bufferOrEvent.getSize();
+			ByteBuf nettyBuffer = bufferOrEvent.getNettyBuffer();
+			final int size = nettyBuffer.readableBytes();
 			if (bufferOrEvent.isBuffer()) {
 				// ---- Buffer ------------------------------------------------
 
@@ -305,7 +307,7 @@ class CreditBasedClientHandler extends ChannelInboundHandlerAdapter {
 
 				NetworkBuffer buffer = (NetworkBuffer) inputChannel.requestBuffer();
 				if (buffer != null) {
-					bufferOrEvent.getNettyBuffer().readBytes(buffer, size);
+					nettyBuffer.readBytes(buffer, size);
 
 					inputChannel.onBuffer(buffer, bufferOrEvent.sequenceNumber, bufferOrEvent.backlog);
 				} else if (inputChannel.isReleased()) {
@@ -317,7 +319,7 @@ class CreditBasedClientHandler extends ChannelInboundHandlerAdapter {
 				// ---- Event -------------------------------------------------
 				// TODO We can just keep the serialized data in the Netty buffer and release it later at the reader
 				byte[] byteArray = new byte[size];
-				bufferOrEvent.getNettyBuffer().readBytes(byteArray);
+				nettyBuffer.readBytes(byteArray);
 
 				MemorySegment memSeg = MemorySegmentFactory.wrap(byteArray);
 				Buffer buffer = new NetworkBuffer(memSeg, FreeingBufferRecycler.INSTANCE, false);

http://git-wip-us.apache.org/repos/asf/flink/blob/9d0dfcba/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyMessage.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyMessage.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyMessage.java
index db3a925..5160853 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyMessage.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyMessage.java
@@ -230,8 +230,6 @@ public abstract class NettyMessage {
 
 		final boolean isBuffer;
 
-		final int size;
-
 		@Nullable
 		ByteBuf retainedSlice;
 
@@ -244,7 +242,6 @@ public abstract class NettyMessage {
 			// retainedSlice is set in this case.
 			this.buffer = null;
 			this.retainedSlice = checkNotNull(retainedSlice);
-			this.size = retainedSlice.writerIndex();
 			this.isBuffer = isBuffer;
 			this.sequenceNumber = sequenceNumber;
 			this.receiverId = checkNotNull(receiverId);
@@ -255,7 +252,6 @@ public abstract class NettyMessage {
 			this.buffer = checkNotNull(buffer);
 			this.retainedSlice = null;
 			this.isBuffer = buffer.isBuffer();
-			this.size = buffer.getMaxCapacity();
 			this.sequenceNumber = sequenceNumber;
 			this.receiverId = checkNotNull(receiverId);
 			this.backlog = backlog;
@@ -265,10 +261,6 @@ public abstract class NettyMessage {
 			return isBuffer;
 		}
 
-		int getSize() {
-			return size;
-		}
-
 		ByteBuf getNettyBuffer() {
 			return retainedSlice;
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/9d0dfcba/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientHandler.java
index c7b20de..8e4d8cd 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientHandler.java
@@ -33,6 +33,7 @@ import org.apache.flink.runtime.io.network.partition.consumer.InputChannelID;
 import org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel;
 
 import org.apache.flink.shaded.guava18.com.google.common.collect.Maps;
+import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf;
 import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandlerContext;
 import org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandlerAdapter;
 
@@ -271,7 +272,8 @@ class PartitionRequestClientHandler extends ChannelInboundHandlerAdapter {
 		boolean releaseNettyBuffer = true;
 
 		try {
-			final int receivedSize = bufferOrEvent.getSize();
+			ByteBuf nettyBuffer = bufferOrEvent.getNettyBuffer();
+			final int receivedSize = nettyBuffer.readableBytes();
 
 			if (bufferOrEvent.isBuffer()) {
 				// ---- Buffer ------------------------------------------------
@@ -295,7 +297,7 @@ class PartitionRequestClientHandler extends ChannelInboundHandlerAdapter {
 					NetworkBuffer buffer = (NetworkBuffer) bufferProvider.requestBuffer();
 
 					if (buffer != null) {
-						bufferOrEvent.getNettyBuffer().readBytes(buffer, receivedSize);
+						nettyBuffer.readBytes(buffer, receivedSize);
 
 						inputChannel.onBuffer(buffer, bufferOrEvent.sequenceNumber, -1);
 
@@ -315,7 +317,7 @@ class PartitionRequestClientHandler extends ChannelInboundHandlerAdapter {
 				// ---- Event -------------------------------------------------
 				// TODO We can just keep the serialized data in the Netty buffer and release it later at the reader
 				byte[] byteArray = new byte[receivedSize];
-				bufferOrEvent.getNettyBuffer().readBytes(byteArray);
+				nettyBuffer.readBytes(byteArray);
 
 				MemorySegment memSeg = MemorySegmentFactory.wrap(byteArray);
 				Buffer buffer = new NetworkBuffer(memSeg, FreeingBufferRecycler.INSTANCE, false);
@@ -452,7 +454,8 @@ class PartitionRequestClientHandler extends ChannelInboundHandlerAdapter {
 					throw new IllegalStateException("Running buffer availability task w/o a buffer.");
 				}
 
-				stagedBufferResponse.getNettyBuffer().readBytes(buffer, stagedBufferResponse.getSize());
+				ByteBuf nettyBuffer = stagedBufferResponse.getNettyBuffer();
+				nettyBuffer.readBytes(buffer, nettyBuffer.readableBytes());
 				stagedBufferResponse.releaseBuffer();
 
 				RemoteInputChannel inputChannel = inputChannels.get(stagedBufferResponse.receiverId);