You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by pn...@apache.org on 2020/05/21 17:41:11 UTC

[flink] 01/04: [FLINK-17842][network] Fix performance regression in SpanningWrapper#clear

This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 2da9ac4b2108ec13ac97a8aff90c43449b2efb4e
Author: Piotr Nowojski <pi...@gmail.com>
AuthorDate: Wed May 20 15:17:24 2020 +0200

    [FLINK-17842][network] Fix performance regression in SpanningWrapper#clear
    
    For some reason the following commit:
    54155744bd [FLINK-17547][task] Use RefCountedFile in SpanningWrapper
    
    caused a performance regression in various benchmarks. It's hard to tell why,
    as none of the benchmarks use spill files (the records are too small), so
    our best guess is that the combination of the AtomicInteger inside RefCountedFile
    and the NullPointerException handling interfered with the JIT's ability to
    eliminate the memory barrier (from the AtomicInteger) on the hot path.
---
 .../io/network/api/serialization/SpanningWrapper.java      | 14 ++++++++++++--
 1 file changed, 12 insertions(+), 2 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpanningWrapper.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpanningWrapper.java
index 45d6ad7..7e3a2da 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpanningWrapper.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpanningWrapper.java
@@ -50,7 +50,7 @@ import static org.apache.flink.runtime.io.network.api.serialization.NonSpanningW
 import static org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.LENGTH_BYTES;
 import static org.apache.flink.util.CloseableIterator.empty;
 import static org.apache.flink.util.FileUtils.writeCompletely;
-import static org.apache.flink.util.IOUtils.closeAllQuietly;
+import static org.apache.flink.util.IOUtils.closeQuietly;
 
 final class SpanningWrapper {
 
@@ -249,7 +249,17 @@ final class SpanningWrapper {
 		leftOverLimit = 0;
 		accumulatedRecordBytes = 0;
 
-		closeAllQuietly(spillingChannel, spillFileReader, () -> spillFile.release());
+		if (spillingChannel != null) {
+			closeQuietly(spillingChannel);
+		}
+		if (spillFileReader != null) {
+			closeQuietly(spillFileReader);
+		}
+		if (spillFile != null) {
+			// It's important to avoid AtomicInteger access inside `release()` on the hot path
+			closeQuietly(() -> spillFile.release());
+		}
+
 		spillingChannel = null;
 		spillFileReader = null;
 		spillFile = null;