You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by nk...@apache.org on 2018/09/19 19:32:47 UTC

[flink] 02/11: [hotfix][network] ensure deserialization buffer capacity for the whole record length

This is an automated email from the ASF dual-hosted git repository.

nkruber pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit bbb8b0a10f4f48c47ffbeb325c7e3ce0203a552f
Author: Nico Kruber <ni...@data-artisans.com>
AuthorDate: Thu Sep 13 12:14:05 2018 +0200

    [hotfix][network] ensure deserialization buffer capacity for the whole record length
    
    Once we know the record length, and provided we are not spilling, we should
    size the buffer to the full expected record size immediately, rather than
    growing it incrementally for each received buffer chunk.
---
 .../serialization/SpillingAdaptiveSpanningRecordDeserializer.java | 8 ++++----
 1 file changed, 4 insertions(+), 4 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpillingAdaptiveSpanningRecordDeserializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpillingAdaptiveSpanningRecordDeserializer.java
index 41ee03d..196287b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpillingAdaptiveSpanningRecordDeserializer.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpillingAdaptiveSpanningRecordDeserializer.java
@@ -485,7 +485,7 @@ public class SpillingAdaptiveSpanningRecordDeserializer<T extends IOReadableWrit
 			}
 			else {
 				// collect in memory
-				ensureBufferCapacity(numBytesChunk);
+				ensureBufferCapacity(nextRecordLength);
 				partial.segment.get(partial.position, buffer, 0, numBytesChunk);
 			}
 
@@ -515,6 +515,8 @@ public class SpillingAdaptiveSpanningRecordDeserializer<T extends IOReadableWrit
 					segmentRemaining -= toPut;
 					if (this.recordLength > THRESHOLD_FOR_SPILLING) {
 						this.spillingChannel = createSpillingChannel();
+					} else {
+						ensureBufferCapacity(this.recordLength);
 					}
 				}
 			}
@@ -527,9 +529,7 @@ public class SpillingAdaptiveSpanningRecordDeserializer<T extends IOReadableWrit
 				// spill to file
 				ByteBuffer toWrite = segment.wrap(segmentPosition, toCopy);
 				this.spillingChannel.write(toWrite);
-			}
-			else {
-				ensureBufferCapacity(accumulatedRecordBytes + toCopy);
+			} else {
 				segment.get(segmentPosition, buffer, this.accumulatedRecordBytes, toCopy);
 			}