You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by pn...@apache.org on 2022/01/31 16:28:57 UTC

[flink] 01/02: [FLINK-25728][task] Avoid unnecessary CompletableFuture.thenRun calls on idle inputProcessor's availableFuture, preventing memory leaks.

This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a commit to branch release-1.14
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 6f4de732da43d320167e2692d68bbb9692c62cdb
Author: wangpengcheng <wp...@gmail.com>
AuthorDate: Fri Jan 21 13:41:57 2022 +0800

    [FLINK-25728][task] Avoid unnecessary CompletableFuture.thenRun calls on idle inputProcessor's availableFuture, preventing memory leaks.
---
 .../runtime/io/StreamMultipleInputProcessor.java   | 83 +++++++++++++++++++---
 1 file changed, 75 insertions(+), 8 deletions(-)

diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamMultipleInputProcessor.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamMultipleInputProcessor.java
index baacf26..cb91d8e 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamMultipleInputProcessor.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamMultipleInputProcessor.java
@@ -19,6 +19,7 @@
 package org.apache.flink.streaming.runtime.io;
 
 import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.runtime.checkpoint.CheckpointException;
 import org.apache.flink.runtime.checkpoint.channel.ChannelStateWriter;
 import org.apache.flink.streaming.api.operators.InputSelection;
@@ -27,8 +28,7 @@ import org.apache.flink.util.ExceptionUtils;
 
 import java.io.IOException;
 import java.util.concurrent.CompletableFuture;
-
-import static org.apache.flink.util.concurrent.FutureUtils.assertNoException;
+import java.util.function.Consumer;
 
 /** Input processor for {@link MultipleInputStreamOperator}. */
 @Internal
@@ -37,6 +37,8 @@ public final class StreamMultipleInputProcessor implements StreamInputProcessor
     private final MultipleInputSelectionHandler inputSelectionHandler;
 
     private final StreamOneInputProcessor<?>[] inputProcessors;
+
+    private final MultipleInputAvailabilityHelper availabilityHelper;
     /** Always try to read from the first input. */
     private int lastReadInputIndex = 1;
 
@@ -47,6 +49,14 @@ public final class StreamMultipleInputProcessor implements StreamInputProcessor
             StreamOneInputProcessor<?>[] inputProcessors) {
         this.inputSelectionHandler = inputSelectionHandler;
         this.inputProcessors = inputProcessors;
+        this.availabilityHelper =
+                MultipleInputAvailabilityHelper.newInstance(inputProcessors.length);
+    }
+
+    @Override
+    public boolean isAvailable() {
+        return inputSelectionHandler.isAnyInputAvailable()
+                || inputSelectionHandler.areAllInputsFinished();
     }
 
     @Override
@@ -55,17 +65,19 @@ public final class StreamMultipleInputProcessor implements StreamInputProcessor
                 || inputSelectionHandler.areAllInputsFinished()) {
             return AVAILABLE;
         }
-        final CompletableFuture<?> anyInputAvailable = new CompletableFuture<>();
+        /*
+         * According to the issue linked below, the implementation of `CompletableFuture.anyOf` in
+         * JDK 8 has a memory-retention problem, which was fixed in JDK 9.
+         * https://bugs.openjdk.java.net/browse/JDK-8160402
+         */
+        availabilityHelper.resetToUnAvailable();
         for (int i = 0; i < inputProcessors.length; i++) {
             if (!inputSelectionHandler.isInputFinished(i)
                     && inputSelectionHandler.isInputSelected(i)) {
-                assertNoException(
-                        inputProcessors[i]
-                                .getAvailableFuture()
-                                .thenRun(() -> anyInputAvailable.complete(null)));
+                availabilityHelper.anyOf(i, inputProcessors[i].getAvailableFuture());
             }
         }
-        return anyInputAvailable;
+        return availabilityHelper.getAvailableFuture();
     }
 
     @Override
@@ -157,4 +169,59 @@ public final class StreamMultipleInputProcessor implements StreamInputProcessor
             }
         }
     }
+
+    /** Visible for testing only. Do not use outside of StreamMultipleInputProcessor. */
+    @VisibleForTesting
+    public static class MultipleInputAvailabilityHelper {
+        private final CompletableFuture<?>[] cachedAvailableFutures;
+        private final Consumer[] onCompletion;
+        private volatile CompletableFuture<?> availableFuture;
+
+        public CompletableFuture<?> getAvailableFuture() {
+            return availableFuture;
+        }
+
+        public static MultipleInputAvailabilityHelper newInstance(int inputSize) {
+            MultipleInputAvailabilityHelper obj = new MultipleInputAvailabilityHelper(inputSize);
+            return obj;
+        }
+
+        private MultipleInputAvailabilityHelper(int inputSize) {
+            this.cachedAvailableFutures = new CompletableFuture[inputSize];
+            this.onCompletion = new Consumer[inputSize];
+            availableFuture = new CompletableFuture<>();
+            for (int i = 0; i < cachedAvailableFutures.length; i++) {
+                final int inputIdx = i;
+                onCompletion[i] = (Void) -> notifyCompletion(inputIdx);
+            }
+        }
+
+        /** Replaces availableFuture with a fresh incomplete future if the current one is done. */
+        public void resetToUnAvailable() {
+            if (availableFuture.isDone()) {
+                availableFuture = new CompletableFuture<>();
+            }
+        }
+
+        private void notifyCompletion(int idx) {
+            availableFuture.complete(null);
+            cachedAvailableFutures[idx] = AVAILABLE;
+        }
+
+        /**
+         * Implements "or" logic: completes availableFuture as soon as any registered input is done.
+         *
+         * @param idx index of the input that {@code dep} belongs to
+         * @param dep availability future of that input; completes availableFuture at once if done
+         */
+        public void anyOf(final int idx, CompletableFuture<?> dep) {
+            if (dep == AVAILABLE || dep.isDone()) {
+                cachedAvailableFutures[idx] = dep;
+                availableFuture.complete(null);
+            } else if (dep != cachedAvailableFutures[idx]) { // skip a future that is already cached
+                cachedAvailableFutures[idx] = dep;
+                dep.thenAccept(onCompletion[idx]);
+            }
+        }
+    }
 }