You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2020/08/28 14:48:17 UTC

[flink] 01/09: [FLINK-19023][network] Remove unnecessary buffer pruning in RecordSerializer

This is an automated email from the ASF dual-hosted git repository.

sewen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 2c273f86e41866bc737de1686aa0925a4749671b
Author: Stephan Ewen <se...@apache.org>
AuthorDate: Fri Aug 21 17:16:11 2020 +0200

    [FLINK-19023][network] Remove unnecessary buffer pruning in RecordSerializer
    
    This removes the behavior of the SpanningRecordSerializer to prune its internal serialization buffer
    under special circumstances.
    
    Previously, the buffer was pruned only when both of the following conditions held:
      - The buffer had become larger than a certain threshold (5MB)
      - The full record end lined up exactly with a full buffer length (this condition was
        introduced at some point; it is not clear what its purpose is)
    
    This optimization virtually never kicks in (because of the second condition) and is also unnecessary.
    There is only a single serializer on the sender side, so pruning its buffer does not help to reduce
    the maximum memory footprint needed in any way.
---
 .../flink/core/memory/DataOutputSerializer.java    | 26 +---------------------
 .../api/serialization/RecordSerializer.java        |  7 ------
 .../serialization/SpanningRecordSerializer.java    |  6 -----
 .../api/writer/ChannelSelectorRecordWriter.java    |  9 +-------
 .../io/network/api/writer/RecordWriter.java        |  4 +---
 5 files changed, 3 insertions(+), 49 deletions(-)

diff --git a/flink-core/src/main/java/org/apache/flink/core/memory/DataOutputSerializer.java b/flink-core/src/main/java/org/apache/flink/core/memory/DataOutputSerializer.java
index 1255fc3..bae5a32 100644
--- a/flink-core/src/main/java/org/apache/flink/core/memory/DataOutputSerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/core/memory/DataOutputSerializer.java
@@ -20,9 +20,6 @@ package org.apache.flink.core.memory;
 
 import org.apache.flink.util.Preconditions;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 import java.io.EOFException;
 import java.io.IOException;
 import java.io.UTFDataFormatException;
@@ -35,14 +32,6 @@ import java.util.Arrays;
  */
 public class DataOutputSerializer implements DataOutputView, MemorySegmentWritable {
 
-	private static final Logger LOG = LoggerFactory.getLogger(DataOutputSerializer.class);
-
-	private static final int PRUNE_BUFFER_THRESHOLD = 5 * 1024 * 1024;
-
-	// ------------------------------------------------------------------------
-
-	private final byte[] startBuffer;
-
 	private byte[] buffer;
 
 	private int position;
@@ -56,8 +45,7 @@ public class DataOutputSerializer implements DataOutputView, MemorySegmentWritab
 			throw new IllegalArgumentException();
 		}
 
-		this.startBuffer = new byte[startSize];
-		this.buffer = this.startBuffer;
+		this.buffer = new byte[startSize];
 		this.wrapper = ByteBuffer.wrap(buffer);
 	}
 
@@ -109,18 +97,6 @@ public class DataOutputSerializer implements DataOutputView, MemorySegmentWritab
 		return this.position;
 	}
 
-	public void pruneBuffer() {
-		clear();
-		if (this.buffer.length > PRUNE_BUFFER_THRESHOLD) {
-			if (LOG.isDebugEnabled()) {
-				LOG.debug("Releasing serialization buffer of " + this.buffer.length + " bytes.");
-			}
-
-			this.buffer = this.startBuffer;
-			this.wrapper = ByteBuffer.wrap(this.buffer);
-		}
-	}
-
 	@Override
 	public String toString() {
 		return String.format("[pos=%d cap=%d]", this.position, this.buffer.length);
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/RecordSerializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/RecordSerializer.java
index c0cf35d..6f73917 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/RecordSerializer.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/RecordSerializer.java
@@ -82,13 +82,6 @@ public interface RecordSerializer<T extends IOReadableWritable> {
 	SerializationResult copyToBufferBuilder(BufferBuilder bufferBuilder);
 
 	/**
-	 * Clears the buffer and checks to decrease the size of intermediate data serialization buffer
-	 * after finishing the whole serialization process including
-	 * {@link #serializeRecord(IOReadableWritable)} and {@link #copyToBufferBuilder(BufferBuilder)}.
-	 */
-	void prune();
-
-	/**
 	 * Supports copying an intermediate data serialization buffer to multiple target buffers
 	 * by resetting its initial position before each copying.
 	 */
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializer.java
index d6da1ad..1bb9ec8 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializer.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializer.java
@@ -108,12 +108,6 @@ public class SpanningRecordSerializer<T extends IOReadableWritable> implements R
 	}
 
 	@Override
-	public void prune() {
-		serializationBuffer.pruneBuffer();
-		dataBuffer = serializationBuffer.wrapAsByteBuffer();
-	}
-
-	@Override
 	public boolean hasSerializedData() {
 		return dataBuffer.hasRemaining();
 	}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/ChannelSelectorRecordWriter.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/ChannelSelectorRecordWriter.java
index 298e357..2e14988 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/ChannelSelectorRecordWriter.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/ChannelSelectorRecordWriter.java
@@ -75,15 +75,8 @@ public final class ChannelSelectorRecordWriter<T extends IOReadableWritable> ext
 
 		serializer.serializeRecord(record);
 
-		boolean pruneAfterCopying = false;
 		for (int targetChannel = 0; targetChannel < numberOfChannels; targetChannel++) {
-			if (copyFromSerializerToTargetChannel(targetChannel)) {
-				pruneAfterCopying = true;
-			}
-		}
-
-		if (pruneAfterCopying) {
-			serializer.prune();
+			copyFromSerializerToTargetChannel(targetChannel);
 		}
 	}
 
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java
index 49c79bc..fe171ab 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java
@@ -120,9 +120,7 @@ public abstract class RecordWriter<T extends IOReadableWritable> implements Avai
 		serializer.serializeRecord(record);
 
 		// Make sure we don't hold onto the large intermediate serialization buffer for too long
-		if (copyFromSerializerToTargetChannel(targetChannel)) {
-			serializer.prune();
-		}
+		copyFromSerializerToTargetChannel(targetChannel);
 	}
 
 	/**