You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by pn...@apache.org on 2022/01/31 16:28:58 UTC
[flink] 02/02: [FLINK-25728][task] Simplify MultipleInputAvailabilityHelper
This is an automated email from the ASF dual-hosted git repository.
pnowojski pushed a commit to branch release-1.14
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 761a4623ddac2b24a96d00fef33c32bcea29c92f
Author: Piotr Nowojski <pi...@gmail.com>
AuthorDate: Mon Jan 31 13:05:33 2022 +0100
[FLINK-25728][task] Simplify MultipleInputAvailabilityHelper
---
.../runtime/io/StreamMultipleInputProcessor.java | 91 ++++++++++------------
1 file changed, 40 insertions(+), 51 deletions(-)
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamMultipleInputProcessor.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamMultipleInputProcessor.java
index cb91d8e..ba7e46e 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamMultipleInputProcessor.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamMultipleInputProcessor.java
@@ -19,7 +19,6 @@
package org.apache.flink.streaming.runtime.io;
import org.apache.flink.annotation.Internal;
-import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.runtime.checkpoint.CheckpointException;
import org.apache.flink.runtime.checkpoint.channel.ChannelStateWriter;
import org.apache.flink.streaming.api.operators.InputSelection;
@@ -28,7 +27,8 @@ import org.apache.flink.util.ExceptionUtils;
import java.io.IOException;
import java.util.concurrent.CompletableFuture;
-import java.util.function.Consumer;
+
+import static org.apache.flink.util.concurrent.FutureUtils.assertNoException;
/** Input processor for {@link MultipleInputStreamOperator}. */
@Internal
@@ -49,14 +49,7 @@ public final class StreamMultipleInputProcessor implements StreamInputProcessor
StreamOneInputProcessor<?>[] inputProcessors) {
this.inputSelectionHandler = inputSelectionHandler;
this.inputProcessors = inputProcessors;
- this.availabilityHelper =
- MultipleInputAvailabilityHelper.newInstance(inputProcessors.length);
- }
-
- @Override
- public boolean isAvailable() {
- return inputSelectionHandler.isAnyInputAvailable()
- || inputSelectionHandler.areAllInputsFinished();
+ this.availabilityHelper = new MultipleInputAvailabilityHelper(inputProcessors.length);
}
@Override
@@ -65,11 +58,7 @@ public final class StreamMultipleInputProcessor implements StreamInputProcessor
|| inputSelectionHandler.areAllInputsFinished()) {
return AVAILABLE;
}
- /*
- * According to the following issue. The implementation of `CompletableFuture.anyOf` in jdk8
- * has some memory issue. This issue is fixed in jdk9.
- * https://bugs.openjdk.java.net/browse/JDK-8160402
- */
+
availabilityHelper.resetToUnAvailable();
for (int i = 0; i < inputProcessors.length; i++) {
if (!inputSelectionHandler.isInputFinished(i)
@@ -170,57 +159,57 @@ public final class StreamMultipleInputProcessor implements StreamInputProcessor
}
}
- /** Visible for testing only. Do not use out side of StreamMultipleInputProcessor. */
- @VisibleForTesting
- public static class MultipleInputAvailabilityHelper {
- private final CompletableFuture<?>[] cachedAvailableFutures;
- private final Consumer[] onCompletion;
- private volatile CompletableFuture<?> availableFuture;
-
+ /**
+ * This class is semi-thread safe. Only method {@link #notifyCompletion()} is allowed to be
+ * executed from outside of the task thread.
+ *
+ * <p>It solves the potential memory leak described in FLINK-25728. In short, we have to
+ * ensure that if there is one input (future) that rarely (or never) completes, such a future
+ * does not prevent previously returned combined futures (like those from {@link
+ * CompletableFuture#anyOf(CompletableFuture[])}) from being garbage collected. Additionally, we
+ * don't want to accumulate more and more completion stages on such a rarely completed future,
+ * so we register {@link CompletableFuture#thenRun(Runnable)} only if it has not already been
+ * done.
+ *
+ * <p>Note that {@link #resetToUnAvailable()} doesn't deregister previously registered futures.
+ * If a future was registered in the past, but for whatever reason is no longer registered, it
+ * can still complete the newly created future.
+ *
+ * <p>It might no longer be needed after upgrading to JDK 9
+ * (https://bugs.openjdk.java.net/browse/JDK-8160402).
+ */
+ private static class MultipleInputAvailabilityHelper {
+ private final CompletableFuture<?>[] futuresToCombine;
+
+ private volatile CompletableFuture<?> availableFuture = new CompletableFuture<>();
+
+ public MultipleInputAvailabilityHelper(int inputSize) {
+ futuresToCombine = new CompletableFuture[inputSize];
+ }
+
+ /** @return combined future using anyOf logic */
public CompletableFuture<?> getAvailableFuture() {
return availableFuture;
}
- public static MultipleInputAvailabilityHelper newInstance(int inputSize) {
- MultipleInputAvailabilityHelper obj = new MultipleInputAvailabilityHelper(inputSize);
- return obj;
- }
-
- private MultipleInputAvailabilityHelper(int inputSize) {
- this.cachedAvailableFutures = new CompletableFuture[inputSize];
- this.onCompletion = new Consumer[inputSize];
- availableFuture = new CompletableFuture<>();
- for (int i = 0; i < cachedAvailableFutures.length; i++) {
- final int inputIdx = i;
- onCompletion[i] = (Void) -> notifyCompletion(inputIdx);
- }
- }
-
- /** Reset availableFuture to fresh unavailable. */
public void resetToUnAvailable() {
if (availableFuture.isDone()) {
availableFuture = new CompletableFuture<>();
}
}
- private void notifyCompletion(int idx) {
+ private void notifyCompletion() {
availableFuture.complete(null);
- cachedAvailableFutures[idx] = AVAILABLE;
}
/**
- * Implement `Or` logic.
- *
- * @param idx
- * @param dep
+ * Combine {@code availabilityFuture} using anyOf logic with other previously registered
+ * futures.
*/
- public void anyOf(final int idx, CompletableFuture<?> dep) {
- if (dep == AVAILABLE || dep.isDone()) {
- cachedAvailableFutures[idx] = dep;
- availableFuture.complete(null);
- } else if (dep != cachedAvailableFutures[idx]) {
- cachedAvailableFutures[idx] = dep;
- dep.thenAccept(onCompletion[idx]);
+ public void anyOf(final int idx, CompletableFuture<?> availabilityFuture) {
+ if (futuresToCombine[idx] == null || futuresToCombine[idx].isDone()) {
+ futuresToCombine[idx] = availabilityFuture;
+ assertNoException(availabilityFuture.thenRun(this::notifyCompletion));
}
}
}