You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by nk...@apache.org on 2018/09/19 10:28:20 UTC
[flink] 03/09: [hotfix][network] some minor improvements around the network stack
This is an automated email from the ASF dual-hosted git repository.
nkruber pushed a commit to branch release-1.5
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 26f4355fc30c0e8e8e186b0f7148aea79d5446bc
Author: Nico Kruber <ni...@data-artisans.com>
AuthorDate: Thu Sep 13 12:23:33 2018 +0200
[hotfix][network] some minor improvements around the network stack
---
.../api/serialization/SpillingAdaptiveSpanningRecordDeserializer.java | 4 ++--
.../org/apache/flink/runtime/io/network/buffer/NetworkBuffer.java | 2 +-
.../io/network/netty/CreditBasedPartitionRequestClientHandler.java | 3 +--
.../flink/runtime/io/network/netty/PartitionRequestClientHandler.java | 3 +--
4 files changed, 5 insertions(+), 7 deletions(-)
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpillingAdaptiveSpanningRecordDeserializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpillingAdaptiveSpanningRecordDeserializer.java
index 196287b..8630ace 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpillingAdaptiveSpanningRecordDeserializer.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpillingAdaptiveSpanningRecordDeserializer.java
@@ -502,8 +502,8 @@ public class SpillingAdaptiveSpanningRecordDeserializer<T extends IOReadableWrit
int segmentRemaining = numBytes;
// check where to go. if we have a partial length, we need to complete it first
if (this.lengthBuffer.position() > 0) {
- int toPut = Math.min(this.lengthBuffer.remaining(), numBytes);
- segment.get(offset, this.lengthBuffer, toPut);
+ int toPut = Math.min(this.lengthBuffer.remaining(), segmentRemaining);
+ segment.get(segmentPosition, this.lengthBuffer, toPut);
// did we complete the length?
if (this.lengthBuffer.hasRemaining()) {
return;
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBuffer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBuffer.java
index deb0f4d..a5bf30f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBuffer.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBuffer.java
@@ -92,7 +92,7 @@ public class NetworkBuffer extends AbstractReferenceCountedByteBuf implements Bu
/**
* Creates a new buffer instance backed by the given <tt>memorySegment</tt> with <tt>0</tt> for
- * the <tt>readerIndex</tt> and <tt>writerIndex</tt>.
+ * the <tt>readerIndex</tt> and <tt>size</tt> as <tt>writerIndex</tt>.
*
* @param memorySegment
* backing memory segment (defines {@link #maxCapacity})
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/CreditBasedPartitionRequestClientHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/CreditBasedPartitionRequestClientHandler.java
index 9aa3920..90daf75 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/CreditBasedPartitionRequestClientHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/CreditBasedPartitionRequestClientHandler.java
@@ -327,8 +327,7 @@ class CreditBasedPartitionRequestClientHandler extends ChannelInboundHandlerAdap
nettyBuffer.readBytes(byteArray);
MemorySegment memSeg = MemorySegmentFactory.wrap(byteArray);
- Buffer buffer = new NetworkBuffer(memSeg, FreeingBufferRecycler.INSTANCE, false);
- buffer.setSize(receivedSize);
+ Buffer buffer = new NetworkBuffer(memSeg, FreeingBufferRecycler.INSTANCE, false, receivedSize);
inputChannel.onBuffer(buffer, bufferOrEvent.sequenceNumber, bufferOrEvent.backlog);
}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientHandler.java
index 367c62d..796e86f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientHandler.java
@@ -337,8 +337,7 @@ class PartitionRequestClientHandler extends ChannelInboundHandlerAdapter impleme
nettyBuffer.readBytes(byteArray);
MemorySegment memSeg = MemorySegmentFactory.wrap(byteArray);
- Buffer buffer = new NetworkBuffer(memSeg, FreeingBufferRecycler.INSTANCE, false);
- buffer.setSize(receivedSize);
+ Buffer buffer = new NetworkBuffer(memSeg, FreeingBufferRecycler.INSTANCE, false, receivedSize);
inputChannel.onBuffer(buffer, bufferOrEvent.sequenceNumber, -1);