You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by nk...@apache.org on 2018/09/19 19:32:48 UTC

[flink] 03/11: [hotfix][network] some minor improvements around the network stack

This is an automated email from the ASF dual-hosted git repository.

nkruber pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 213e085cc75174e458a35e669059aa142e971d6b
Author: Nico Kruber <ni...@data-artisans.com>
AuthorDate: Thu Sep 13 12:23:33 2018 +0200

    [hotfix][network] some minor improvements around the network stack
---
 .../api/serialization/SpillingAdaptiveSpanningRecordDeserializer.java | 4 ++--
 .../org/apache/flink/runtime/io/network/buffer/NetworkBuffer.java     | 2 +-
 .../io/network/netty/CreditBasedPartitionRequestClientHandler.java    | 3 +--
 .../flink/runtime/io/network/netty/PartitionRequestClientHandler.java | 3 +--
 4 files changed, 5 insertions(+), 7 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpillingAdaptiveSpanningRecordDeserializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpillingAdaptiveSpanningRecordDeserializer.java
index 196287b..8630ace 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpillingAdaptiveSpanningRecordDeserializer.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpillingAdaptiveSpanningRecordDeserializer.java
@@ -502,8 +502,8 @@ public class SpillingAdaptiveSpanningRecordDeserializer<T extends IOReadableWrit
 			int segmentRemaining = numBytes;
 			// check where to go. if we have a partial length, we need to complete it first
 			if (this.lengthBuffer.position() > 0) {
-				int toPut = Math.min(this.lengthBuffer.remaining(), numBytes);
-				segment.get(offset, this.lengthBuffer, toPut);
+				int toPut = Math.min(this.lengthBuffer.remaining(), segmentRemaining);
+				segment.get(segmentPosition, this.lengthBuffer, toPut);
 				// did we complete the length?
 				if (this.lengthBuffer.hasRemaining()) {
 					return;
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBuffer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBuffer.java
index 489be39..05b7582 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBuffer.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBuffer.java
@@ -93,7 +93,7 @@ public class NetworkBuffer extends AbstractReferenceCountedByteBuf implements Bu
 
 	/**
 	 * Creates a new buffer instance backed by the given <tt>memorySegment</tt> with <tt>0</tt> for
-	 * the <tt>readerIndex</tt> and <tt>writerIndex</tt>.
+	 * the <tt>readerIndex</tt> and <tt>size</tt> as <tt>writerIndex</tt>.
 	 *
 	 * @param memorySegment
 	 * 		backing memory segment (defines {@link #maxCapacity})
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/CreditBasedPartitionRequestClientHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/CreditBasedPartitionRequestClientHandler.java
index 9aa3920..90daf75 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/CreditBasedPartitionRequestClientHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/CreditBasedPartitionRequestClientHandler.java
@@ -327,8 +327,7 @@ class CreditBasedPartitionRequestClientHandler extends ChannelInboundHandlerAdap
 				nettyBuffer.readBytes(byteArray);
 
 				MemorySegment memSeg = MemorySegmentFactory.wrap(byteArray);
-				Buffer buffer = new NetworkBuffer(memSeg, FreeingBufferRecycler.INSTANCE, false);
-				buffer.setSize(receivedSize);
+				Buffer buffer = new NetworkBuffer(memSeg, FreeingBufferRecycler.INSTANCE, false, receivedSize);
 
 				inputChannel.onBuffer(buffer, bufferOrEvent.sequenceNumber, bufferOrEvent.backlog);
 			}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientHandler.java
index 367c62d..796e86f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientHandler.java
@@ -337,8 +337,7 @@ class PartitionRequestClientHandler extends ChannelInboundHandlerAdapter impleme
 				nettyBuffer.readBytes(byteArray);
 
 				MemorySegment memSeg = MemorySegmentFactory.wrap(byteArray);
-				Buffer buffer = new NetworkBuffer(memSeg, FreeingBufferRecycler.INSTANCE, false);
-				buffer.setSize(receivedSize);
+				Buffer buffer = new NetworkBuffer(memSeg, FreeingBufferRecycler.INSTANCE, false, receivedSize);
 
 				inputChannel.onBuffer(buffer, bufferOrEvent.sequenceNumber, -1);