You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by pn...@apache.org on 2022/01/31 16:36:45 UTC

[flink] branch release-1.13 updated (fe5a171 -> 9653999)

This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a change to branch release-1.13
in repository https://gitbox.apache.org/repos/asf/flink.git.


    from fe5a171  [FLINK-25486][Runtime/Coordination] Fix the bug that flink will lost state when zookeeper leader changes
     new 41dec90  [FLINK-25728][task] Avoid unnecessary CompletableFuture.thenRun calls on idle inputProcessor's availableFuture, preventing memory leaks.
     new 9653999  [FLINK-25728][task] Simplify MultipleInputAvailabilityHelper

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../runtime/io/StreamMultipleInputProcessor.java   | 68 ++++++++++++++++++++--
 1 file changed, 62 insertions(+), 6 deletions(-)

[flink] 02/02: [FLINK-25728][task] Simplify MultipleInputAvailabilityHelper

Posted by pn...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a commit to branch release-1.13
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 9653999a27b2924fa9d362413e2a521be316c86d
Author: Piotr Nowojski <pi...@gmail.com>
AuthorDate: Mon Jan 31 13:05:33 2022 +0100

    [FLINK-25728][task] Simplify MultipleInputAvailabilityHelper
---
 .../runtime/io/StreamMultipleInputProcessor.java   | 91 ++++++++++------------
 1 file changed, 40 insertions(+), 51 deletions(-)

diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamMultipleInputProcessor.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamMultipleInputProcessor.java
index db9b372..c290813 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamMultipleInputProcessor.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamMultipleInputProcessor.java
@@ -19,7 +19,6 @@
 package org.apache.flink.streaming.runtime.io;
 
 import org.apache.flink.annotation.Internal;
-import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.core.io.InputStatus;
 import org.apache.flink.runtime.checkpoint.CheckpointException;
 import org.apache.flink.runtime.checkpoint.channel.ChannelStateWriter;
@@ -29,7 +28,8 @@ import org.apache.flink.util.ExceptionUtils;
 
 import java.io.IOException;
 import java.util.concurrent.CompletableFuture;
-import java.util.function.Consumer;
+
+import static org.apache.flink.runtime.concurrent.FutureUtils.assertNoException;
 
 /** Input processor for {@link MultipleInputStreamOperator}. */
 @Internal
@@ -50,14 +50,7 @@ public final class StreamMultipleInputProcessor implements StreamInputProcessor
             StreamOneInputProcessor<?>[] inputProcessors) {
         this.inputSelectionHandler = inputSelectionHandler;
         this.inputProcessors = inputProcessors;
-        this.availabilityHelper =
-                MultipleInputAvailabilityHelper.newInstance(inputProcessors.length);
-    }
-
-    @Override
-    public boolean isAvailable() {
-        return inputSelectionHandler.isAnyInputAvailable()
-                || inputSelectionHandler.areAllInputsFinished();
+        this.availabilityHelper = new MultipleInputAvailabilityHelper(inputProcessors.length);
     }
 
     @Override
@@ -66,11 +59,7 @@ public final class StreamMultipleInputProcessor implements StreamInputProcessor
                 || inputSelectionHandler.areAllInputsFinished()) {
             return AVAILABLE;
         }
-        /*
-         * According to the following issue. The implementation of `CompletableFuture.anyOf` in jdk8
-         * has some memory issue. This issue is fixed in jdk9.
-         * https://bugs.openjdk.java.net/browse/JDK-8160402
-         */
+
         availabilityHelper.resetToUnAvailable();
         for (int i = 0; i < inputProcessors.length; i++) {
             if (!inputSelectionHandler.isInputFinished(i)
@@ -172,57 +161,57 @@ public final class StreamMultipleInputProcessor implements StreamInputProcessor
         }
     }
 
-    /** Visible for testing only. Do not use out side of StreamMultipleInputProcessor. */
-    @VisibleForTesting
-    public static class MultipleInputAvailabilityHelper {
-        private final CompletableFuture<?>[] cachedAvailableFutures;
-        private final Consumer[] onCompletion;
-        private volatile CompletableFuture<?> availableFuture;
-
+    /**
+     * This class is semi-thread safe. Only method {@link #notifyCompletion()} is allowed to be
+     * executed from an outside of the task thread.
+     *
+     * <p>It solves a problem of a potential memory leak as described in FLINK-25728. In short we
+     * have to ensure, that if there is one input (future) that rarely (or never) completes, that
+     * such future would not prevent previously returned combined futures (like {@link
+     * CompletableFuture#anyOf(CompletableFuture[])} from being garbage collected. Additionally, we
+     * don't want to accumulate more and more completion stages on such rarely completed future, so
+     * we are registering {@link CompletableFuture#thenRun(Runnable)} only if it has not already
+     * been done.
+     *
+     * <p>Note {@link #resetToUnAvailable()} doesn't de register previously registered futures. If
+     * future was registered in the past, but for whatever reason now it is not, such future can
+     * still complete the newly created future.
+     *
+     * <p>It might be no longer needed after upgrading to JDK9
+     * (https://bugs.openjdk.java.net/browse/JDK-8160402).
+     */
+    private static class MultipleInputAvailabilityHelper {
+        private final CompletableFuture<?>[] futuresToCombine;
+
+        private volatile CompletableFuture<?> availableFuture = new CompletableFuture<>();
+
+        public MultipleInputAvailabilityHelper(int inputSize) {
+            futuresToCombine = new CompletableFuture[inputSize];
+        }
+
+        /** @return combined future using anyOf logic */
         public CompletableFuture<?> getAvailableFuture() {
             return availableFuture;
         }
 
-        public static MultipleInputAvailabilityHelper newInstance(int inputSize) {
-            MultipleInputAvailabilityHelper obj = new MultipleInputAvailabilityHelper(inputSize);
-            return obj;
-        }
-
-        private MultipleInputAvailabilityHelper(int inputSize) {
-            this.cachedAvailableFutures = new CompletableFuture[inputSize];
-            this.onCompletion = new Consumer[inputSize];
-            availableFuture = new CompletableFuture<>();
-            for (int i = 0; i < cachedAvailableFutures.length; i++) {
-                final int inputIdx = i;
-                onCompletion[i] = (Void) -> notifyCompletion(inputIdx);
-            }
-        }
-
-        /** Reset availableFuture to fresh unavailable. */
         public void resetToUnAvailable() {
             if (availableFuture.isDone()) {
                 availableFuture = new CompletableFuture<>();
             }
         }
 
-        private void notifyCompletion(int idx) {
+        private void notifyCompletion() {
             availableFuture.complete(null);
-            cachedAvailableFutures[idx] = AVAILABLE;
         }
 
         /**
-         * Implement `Or` logic.
-         *
-         * @param idx
-         * @param dep
+         * Combine {@code availabilityFuture} using anyOf logic with other previously registered
+         * futures.
          */
-        public void anyOf(final int idx, CompletableFuture<?> dep) {
-            if (dep == AVAILABLE || dep.isDone()) {
-                cachedAvailableFutures[idx] = dep;
-                availableFuture.complete(null);
-            } else if (dep != cachedAvailableFutures[idx]) {
-                cachedAvailableFutures[idx] = dep;
-                dep.thenAccept(onCompletion[idx]);
+        public void anyOf(final int idx, CompletableFuture<?> availabilityFuture) {
+            if (futuresToCombine[idx] == null || futuresToCombine[idx].isDone()) {
+                futuresToCombine[idx] = availabilityFuture;
+                assertNoException(availabilityFuture.thenRun(this::notifyCompletion));
             }
         }
     }

[flink] 01/02: [FLINK-25728][task] Avoid unnecessary CompletableFuture.thenRun calls on idle inputProcessor's availableFuture, preventing memory leaks.

Posted by pn...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a commit to branch release-1.13
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 41dec908de4be749d4315069ef9a40a75e27f941
Author: wangpengcheng <wp...@gmail.com>
AuthorDate: Fri Jan 21 13:41:57 2022 +0800

    [FLINK-25728][task] Avoid unnecessary CompletableFuture.thenRun calls on idle inputProcessor's availableFuture, preventing memory leaks.
---
 .../runtime/io/StreamMultipleInputProcessor.java   | 83 +++++++++++++++++++---
 1 file changed, 75 insertions(+), 8 deletions(-)

diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamMultipleInputProcessor.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamMultipleInputProcessor.java
index f2ac737..db9b372 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamMultipleInputProcessor.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamMultipleInputProcessor.java
@@ -19,6 +19,7 @@
 package org.apache.flink.streaming.runtime.io;
 
 import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.core.io.InputStatus;
 import org.apache.flink.runtime.checkpoint.CheckpointException;
 import org.apache.flink.runtime.checkpoint.channel.ChannelStateWriter;
@@ -28,8 +29,7 @@ import org.apache.flink.util.ExceptionUtils;
 
 import java.io.IOException;
 import java.util.concurrent.CompletableFuture;
-
-import static org.apache.flink.runtime.concurrent.FutureUtils.assertNoException;
+import java.util.function.Consumer;
 
 /** Input processor for {@link MultipleInputStreamOperator}. */
 @Internal
@@ -38,6 +38,8 @@ public final class StreamMultipleInputProcessor implements StreamInputProcessor
     private final MultipleInputSelectionHandler inputSelectionHandler;
 
     private final StreamOneInputProcessor<?>[] inputProcessors;
+
+    private final MultipleInputAvailabilityHelper availabilityHelper;
     /** Always try to read from the first input. */
     private int lastReadInputIndex = 1;
 
@@ -48,6 +50,14 @@ public final class StreamMultipleInputProcessor implements StreamInputProcessor
             StreamOneInputProcessor<?>[] inputProcessors) {
         this.inputSelectionHandler = inputSelectionHandler;
         this.inputProcessors = inputProcessors;
+        this.availabilityHelper =
+                MultipleInputAvailabilityHelper.newInstance(inputProcessors.length);
+    }
+
+    @Override
+    public boolean isAvailable() {
+        return inputSelectionHandler.isAnyInputAvailable()
+                || inputSelectionHandler.areAllInputsFinished();
     }
 
     @Override
@@ -56,17 +66,19 @@ public final class StreamMultipleInputProcessor implements StreamInputProcessor
                 || inputSelectionHandler.areAllInputsFinished()) {
             return AVAILABLE;
         }
-        final CompletableFuture<?> anyInputAvailable = new CompletableFuture<>();
+        /*
+         * According to the following issue. The implementation of `CompletableFuture.anyOf` in jdk8
+         * has some memory issue. This issue is fixed in jdk9.
+         * https://bugs.openjdk.java.net/browse/JDK-8160402
+         */
+        availabilityHelper.resetToUnAvailable();
         for (int i = 0; i < inputProcessors.length; i++) {
             if (!inputSelectionHandler.isInputFinished(i)
                     && inputSelectionHandler.isInputSelected(i)) {
-                assertNoException(
-                        inputProcessors[i]
-                                .getAvailableFuture()
-                                .thenRun(() -> anyInputAvailable.complete(null)));
+                availabilityHelper.anyOf(i, inputProcessors[i].getAvailableFuture());
             }
         }
-        return anyInputAvailable;
+        return availabilityHelper.getAvailableFuture();
     }
 
     @Override
@@ -159,4 +171,59 @@ public final class StreamMultipleInputProcessor implements StreamInputProcessor
             }
         }
     }
+
+    /** Visible for testing only. Do not use out side of StreamMultipleInputProcessor. */
+    @VisibleForTesting
+    public static class MultipleInputAvailabilityHelper {
+        private final CompletableFuture<?>[] cachedAvailableFutures;
+        private final Consumer[] onCompletion;
+        private volatile CompletableFuture<?> availableFuture;
+
+        public CompletableFuture<?> getAvailableFuture() {
+            return availableFuture;
+        }
+
+        public static MultipleInputAvailabilityHelper newInstance(int inputSize) {
+            MultipleInputAvailabilityHelper obj = new MultipleInputAvailabilityHelper(inputSize);
+            return obj;
+        }
+
+        private MultipleInputAvailabilityHelper(int inputSize) {
+            this.cachedAvailableFutures = new CompletableFuture[inputSize];
+            this.onCompletion = new Consumer[inputSize];
+            availableFuture = new CompletableFuture<>();
+            for (int i = 0; i < cachedAvailableFutures.length; i++) {
+                final int inputIdx = i;
+                onCompletion[i] = (Void) -> notifyCompletion(inputIdx);
+            }
+        }
+
+        /** Reset availableFuture to fresh unavailable. */
+        public void resetToUnAvailable() {
+            if (availableFuture.isDone()) {
+                availableFuture = new CompletableFuture<>();
+            }
+        }
+
+        private void notifyCompletion(int idx) {
+            availableFuture.complete(null);
+            cachedAvailableFutures[idx] = AVAILABLE;
+        }
+
+        /**
+         * Implement `Or` logic.
+         *
+         * @param idx
+         * @param dep
+         */
+        public void anyOf(final int idx, CompletableFuture<?> dep) {
+            if (dep == AVAILABLE || dep.isDone()) {
+                cachedAvailableFutures[idx] = dep;
+                availableFuture.complete(null);
+            } else if (dep != cachedAvailableFutures[idx]) {
+                cachedAvailableFutures[idx] = dep;
+                dep.thenAccept(onCompletion[idx]);
+            }
+        }
+    }
 }