You are viewing a plain-text version of this content. The canonical link for it is here (the hyperlink was not preserved in this plain-text version).
Posted to commits@flink.apache.org by pn...@apache.org on 2022/01/31 16:28:58 UTC

[flink] 02/02: [FLINK-25728][task] Simplify MultipleInputAvailabilityHelper

This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a commit to branch release-1.14
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 761a4623ddac2b24a96d00fef33c32bcea29c92f
Author: Piotr Nowojski <pi...@gmail.com>
AuthorDate: Mon Jan 31 13:05:33 2022 +0100

    [FLINK-25728][task] Simplify MultipleInputAvailabilityHelper
---
 .../runtime/io/StreamMultipleInputProcessor.java   | 91 ++++++++++------------
 1 file changed, 40 insertions(+), 51 deletions(-)

diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamMultipleInputProcessor.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamMultipleInputProcessor.java
index cb91d8e..ba7e46e 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamMultipleInputProcessor.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamMultipleInputProcessor.java
@@ -19,7 +19,6 @@
 package org.apache.flink.streaming.runtime.io;
 
 import org.apache.flink.annotation.Internal;
-import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.runtime.checkpoint.CheckpointException;
 import org.apache.flink.runtime.checkpoint.channel.ChannelStateWriter;
 import org.apache.flink.streaming.api.operators.InputSelection;
@@ -28,7 +27,8 @@ import org.apache.flink.util.ExceptionUtils;
 
 import java.io.IOException;
 import java.util.concurrent.CompletableFuture;
-import java.util.function.Consumer;
+
+import static org.apache.flink.util.concurrent.FutureUtils.assertNoException;
 
 /** Input processor for {@link MultipleInputStreamOperator}. */
 @Internal
@@ -49,14 +49,7 @@ public final class StreamMultipleInputProcessor implements StreamInputProcessor
             StreamOneInputProcessor<?>[] inputProcessors) {
         this.inputSelectionHandler = inputSelectionHandler;
         this.inputProcessors = inputProcessors;
-        this.availabilityHelper =
-                MultipleInputAvailabilityHelper.newInstance(inputProcessors.length);
-    }
-
-    @Override
-    public boolean isAvailable() {
-        return inputSelectionHandler.isAnyInputAvailable()
-                || inputSelectionHandler.areAllInputsFinished();
+        this.availabilityHelper = new MultipleInputAvailabilityHelper(inputProcessors.length);
     }
 
     @Override
@@ -65,11 +58,7 @@ public final class StreamMultipleInputProcessor implements StreamInputProcessor
                 || inputSelectionHandler.areAllInputsFinished()) {
             return AVAILABLE;
         }
-        /*
-         * According to the following issue. The implementation of `CompletableFuture.anyOf` in jdk8
-         * has some memory issue. This issue is fixed in jdk9.
-         * https://bugs.openjdk.java.net/browse/JDK-8160402
-         */
+
         availabilityHelper.resetToUnAvailable();
         for (int i = 0; i < inputProcessors.length; i++) {
             if (!inputSelectionHandler.isInputFinished(i)
@@ -170,57 +159,57 @@ public final class StreamMultipleInputProcessor implements StreamInputProcessor
         }
     }
 
-    /** Visible for testing only. Do not use out side of StreamMultipleInputProcessor. */
-    @VisibleForTesting
-    public static class MultipleInputAvailabilityHelper {
-        private final CompletableFuture<?>[] cachedAvailableFutures;
-        private final Consumer[] onCompletion;
-        private volatile CompletableFuture<?> availableFuture;
-
+    /**
+     * This class is only partially thread-safe: only {@link #notifyCompletion()} may be executed
+     * from outside the task thread.
+     *
+     * <p>It solves a potential memory leak described in FLINK-25728. In short, we have to ensure
+     * that if one input (future) rarely (or never) completes, such a future does not prevent
+     * previously returned combined futures (like the ones created by {@link
+     * CompletableFuture#anyOf(CompletableFuture[])}) from being garbage collected. Additionally, we
+     * don't want to accumulate more and more completion stages on such a rarely completed future,
+     * so we register {@link CompletableFuture#thenRun(Runnable)} only if that has not already been
+     * done.
+     *
+     * <p>Note that {@link #resetToUnAvailable()} doesn't deregister previously registered
+     * futures. If a future was registered in the past but, for whatever reason, is no longer
+     * registered now, it can still complete the newly created future.
+     *
+     * <p>It might no longer be needed after upgrading to JDK 9
+     * (https://bugs.openjdk.java.net/browse/JDK-8160402).
+     */
+    private static class MultipleInputAvailabilityHelper {
+        private final CompletableFuture<?>[] futuresToCombine;
+
+        private volatile CompletableFuture<?> availableFuture = new CompletableFuture<>();
+
+        public MultipleInputAvailabilityHelper(int inputSize) {
+            futuresToCombine = new CompletableFuture[inputSize];
+        }
+
+        /** @return combined future using anyOf logic */
         public CompletableFuture<?> getAvailableFuture() {
             return availableFuture;
         }
 
-        public static MultipleInputAvailabilityHelper newInstance(int inputSize) {
-            MultipleInputAvailabilityHelper obj = new MultipleInputAvailabilityHelper(inputSize);
-            return obj;
-        }
-
-        private MultipleInputAvailabilityHelper(int inputSize) {
-            this.cachedAvailableFutures = new CompletableFuture[inputSize];
-            this.onCompletion = new Consumer[inputSize];
-            availableFuture = new CompletableFuture<>();
-            for (int i = 0; i < cachedAvailableFutures.length; i++) {
-                final int inputIdx = i;
-                onCompletion[i] = (Void) -> notifyCompletion(inputIdx);
-            }
-        }
-
-        /** Reset availableFuture to fresh unavailable. */
         public void resetToUnAvailable() {
             if (availableFuture.isDone()) {
                 availableFuture = new CompletableFuture<>();
             }
         }
 
-        private void notifyCompletion(int idx) {
+        private void notifyCompletion() {
             availableFuture.complete(null);
-            cachedAvailableFutures[idx] = AVAILABLE;
         }
 
         /**
-         * Implement `Or` logic.
-         *
-         * @param idx
-         * @param dep
+         * Combine {@code availabilityFuture} using anyOf logic with other previously registered
+         * futures.
          */
-        public void anyOf(final int idx, CompletableFuture<?> dep) {
-            if (dep == AVAILABLE || dep.isDone()) {
-                cachedAvailableFutures[idx] = dep;
-                availableFuture.complete(null);
-            } else if (dep != cachedAvailableFutures[idx]) {
-                cachedAvailableFutures[idx] = dep;
-                dep.thenAccept(onCompletion[idx]);
+        public void anyOf(final int idx, CompletableFuture<?> availabilityFuture) {
+            if (futuresToCombine[idx] == null || futuresToCombine[idx].isDone()) {
+                futuresToCombine[idx] = availabilityFuture;
+                assertNoException(availabilityFuture.thenRun(this::notifyCompletion));
             }
         }
     }