You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by pn...@apache.org on 2022/01/31 16:28:57 UTC
[flink] 01/02: [FLINK-25728][task] Avoid unnecessary CompletableFuture.thenRun calls on idle inputProcessor's availableFuture, preventing memory leaks.
This is an automated email from the ASF dual-hosted git repository.
pnowojski pushed a commit to branch release-1.14
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 6f4de732da43d320167e2692d68bbb9692c62cdb
Author: wangpengcheng <wp...@gmail.com>
AuthorDate: Fri Jan 21 13:41:57 2022 +0800
[FLINK-25728][task] Avoid unnecessary CompletableFuture.thenRun calls on idle inputProcessor's availableFuture, preventing memory leaks.
---
.../runtime/io/StreamMultipleInputProcessor.java | 83 +++++++++++++++++++---
1 file changed, 75 insertions(+), 8 deletions(-)
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamMultipleInputProcessor.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamMultipleInputProcessor.java
index baacf26..cb91d8e 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamMultipleInputProcessor.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamMultipleInputProcessor.java
@@ -19,6 +19,7 @@
package org.apache.flink.streaming.runtime.io;
import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.runtime.checkpoint.CheckpointException;
import org.apache.flink.runtime.checkpoint.channel.ChannelStateWriter;
import org.apache.flink.streaming.api.operators.InputSelection;
@@ -27,8 +28,7 @@ import org.apache.flink.util.ExceptionUtils;
import java.io.IOException;
import java.util.concurrent.CompletableFuture;
-
-import static org.apache.flink.util.concurrent.FutureUtils.assertNoException;
+import java.util.function.Consumer;
/** Input processor for {@link MultipleInputStreamOperator}. */
@Internal
@@ -37,6 +37,8 @@ public final class StreamMultipleInputProcessor implements StreamInputProcessor
private final MultipleInputSelectionHandler inputSelectionHandler;
private final StreamOneInputProcessor<?>[] inputProcessors;
+
+ private final MultipleInputAvailabilityHelper availabilityHelper;
/** Always try to read from the first input. */
private int lastReadInputIndex = 1;
@@ -47,6 +49,14 @@ public final class StreamMultipleInputProcessor implements StreamInputProcessor
StreamOneInputProcessor<?>[] inputProcessors) {
this.inputSelectionHandler = inputSelectionHandler;
this.inputProcessors = inputProcessors;
+ this.availabilityHelper =
+ MultipleInputAvailabilityHelper.newInstance(inputProcessors.length);
+ }
+
+ @Override
+ public boolean isAvailable() {
+ return inputSelectionHandler.isAnyInputAvailable()
+ || inputSelectionHandler.areAllInputsFinished();
}
@Override
@@ -55,17 +65,19 @@ public final class StreamMultipleInputProcessor implements StreamInputProcessor
|| inputSelectionHandler.areAllInputsFinished()) {
return AVAILABLE;
}
- final CompletableFuture<?> anyInputAvailable = new CompletableFuture<>();
+ /*
+ * `CompletableFuture.anyOf` is not used here: according to the following issue, its
+ * implementation in JDK 8 has a memory issue, which is fixed in JDK 9.
+ * https://bugs.openjdk.java.net/browse/JDK-8160402
+ */
+ availabilityHelper.resetToUnAvailable();
for (int i = 0; i < inputProcessors.length; i++) {
if (!inputSelectionHandler.isInputFinished(i)
&& inputSelectionHandler.isInputSelected(i)) {
- assertNoException(
- inputProcessors[i]
- .getAvailableFuture()
- .thenRun(() -> anyInputAvailable.complete(null)));
+ availabilityHelper.anyOf(i, inputProcessors[i].getAvailableFuture());
}
}
- return anyInputAvailable;
+ return availabilityHelper.getAvailableFuture();
}
@Override
@@ -157,4 +169,59 @@ public final class StreamMultipleInputProcessor implements StreamInputProcessor
}
}
}
+
+ /** Visible for testing only. Do not use outside of StreamMultipleInputProcessor. */
+ @VisibleForTesting
+ public static class MultipleInputAvailabilityHelper {
+ private final CompletableFuture<?>[] cachedAvailableFutures;
+ private final Consumer[] onCompletion;
+ private volatile CompletableFuture<?> availableFuture;
+
+ public CompletableFuture<?> getAvailableFuture() {
+ return availableFuture;
+ }
+
+ public static MultipleInputAvailabilityHelper newInstance(int inputSize) {
+ MultipleInputAvailabilityHelper obj = new MultipleInputAvailabilityHelper(inputSize);
+ return obj;
+ }
+
+ private MultipleInputAvailabilityHelper(int inputSize) {
+ this.cachedAvailableFutures = new CompletableFuture[inputSize];
+ this.onCompletion = new Consumer[inputSize];
+ availableFuture = new CompletableFuture<>();
+ for (int i = 0; i < cachedAvailableFutures.length; i++) {
+ final int inputIdx = i;
+ onCompletion[i] = (Void) -> notifyCompletion(inputIdx);
+ }
+ }
+
+ /** Resets availableFuture to a fresh, uncompleted future if the current one is already done. */
+ public void resetToUnAvailable() {
+ if (availableFuture.isDone()) {
+ availableFuture = new CompletableFuture<>();
+ }
+ }
+
+ private void notifyCompletion(int idx) {
+ availableFuture.complete(null);
+ cachedAvailableFutures[idx] = AVAILABLE;
+ }
+
+ /**
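+ * Implements `Or` logic: availableFuture completes once any registered input does.
+ *
+ * @param idx index of the input that {@code dep} belongs to
+ * @param dep availability future of the input at {@code idx}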
+ */
+ public void anyOf(final int idx, CompletableFuture<?> dep) {
+ if (dep == AVAILABLE || dep.isDone()) {
+ cachedAvailableFutures[idx] = dep;
+ availableFuture.complete(null);
+ } else if (dep != cachedAvailableFutures[idx]) {
+ cachedAvailableFutures[idx] = dep;
+ dep.thenAccept(onCompletion[idx]);
+ }
+ }
+ }
}