You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by pn...@apache.org on 2022/02/04 11:42:09 UTC
[flink] 04/04: [FLINK-25827][task] Fix potential memory leak in SourceOperator when using CompletableFuture.anyOf
This is an automated email from the ASF dual-hosted git repository.
pnowojski pushed a commit to branch release-1.14
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 9365510e8b0f5baaa25311bb44dd5c03d6773738
Author: Piotr Nowojski <pi...@gmail.com>
AuthorDate: Mon Jan 31 17:49:07 2022 +0100
[FLINK-25827][task] Fix potential memory leak in SourceOperator when using CompletableFuture.anyOf
---
.../streaming/api/operators/SourceOperator.java | 27 ++++++++++++----------
1 file changed, 15 insertions(+), 12 deletions(-)
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java
index eb82768..0866a52 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java
@@ -49,6 +49,7 @@ import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.api.operators.source.TimestampsAndWatermarks;
import org.apache.flink.streaming.api.operators.util.SimpleVersionedListState;
import org.apache.flink.streaming.runtime.io.DataInputStatus;
+import org.apache.flink.streaming.runtime.io.MultipleFuturesAvailabilityHelper;
import org.apache.flink.streaming.runtime.io.PushingAsyncDataInput;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
@@ -487,24 +488,26 @@ public class SourceOperator<OUT, SplitT extends SourceSplit> extends AbstractStr
private static class SourceOperatorAvailabilityHelper {
private final CompletableFuture<Void> forcedStopFuture = new CompletableFuture<>();
- private CompletableFuture<Void> currentReaderFuture;
- private CompletableFuture<?> currentCombinedFuture;
+ private final MultipleFuturesAvailabilityHelper availabilityHelper;
+
+ private SourceOperatorAvailabilityHelper() {
+ availabilityHelper = new MultipleFuturesAvailabilityHelper(2);
+ availabilityHelper.anyOf(0, forcedStopFuture);
+ }
public CompletableFuture<?> update(CompletableFuture<Void> sourceReaderFuture) {
- if (sourceReaderFuture == AvailabilityProvider.AVAILABLE) {
- return sourceReaderFuture;
- } else if (sourceReaderFuture == currentReaderFuture) {
- return currentCombinedFuture;
- } else {
- currentReaderFuture = sourceReaderFuture;
- currentCombinedFuture =
- CompletableFuture.anyOf(forcedStopFuture, sourceReaderFuture);
- return currentCombinedFuture;
+ if (sourceReaderFuture == AvailabilityProvider.AVAILABLE
+ || sourceReaderFuture.isDone()) {
+ return AvailabilityProvider.AVAILABLE;
}
+ availabilityHelper.resetToUnAvailable();
+ availabilityHelper.anyOf(0, forcedStopFuture);
+ availabilityHelper.anyOf(1, sourceReaderFuture);
+ return availabilityHelper.getAvailableFuture();
}
public void forceStop() {
- this.forcedStopFuture.complete(null);
+ forcedStopFuture.complete(null);
}
}
}