You are viewing a plain text version of this content. (The canonical link to the original was not preserved in this plain-text copy.)
Posted to commits@flink.apache.org by pn...@apache.org on 2022/02/04 11:42:09 UTC

[flink] 04/04: [FLINK-25827][task] Fix potential memory leak in SourceOperator when using CompletableFuture.anyOf

This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a commit to branch release-1.14
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 9365510e8b0f5baaa25311bb44dd5c03d6773738
Author: Piotr Nowojski <pi...@gmail.com>
AuthorDate: Mon Jan 31 17:49:07 2022 +0100

    [FLINK-25827][task] Fix potential memory leak in SourceOperator when using CompletableFuture.anyOf
---
 .../streaming/api/operators/SourceOperator.java    | 27 ++++++++++++----------
 1 file changed, 15 insertions(+), 12 deletions(-)

diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java
index eb82768..0866a52 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java
@@ -49,6 +49,7 @@ import org.apache.flink.streaming.api.graph.StreamConfig;
 import org.apache.flink.streaming.api.operators.source.TimestampsAndWatermarks;
 import org.apache.flink.streaming.api.operators.util.SimpleVersionedListState;
 import org.apache.flink.streaming.runtime.io.DataInputStatus;
+import org.apache.flink.streaming.runtime.io.MultipleFuturesAvailabilityHelper;
 import org.apache.flink.streaming.runtime.io.PushingAsyncDataInput;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
@@ -487,24 +488,26 @@ public class SourceOperator<OUT, SplitT extends SourceSplit> extends AbstractStr
 
     private static class SourceOperatorAvailabilityHelper {
         private final CompletableFuture<Void> forcedStopFuture = new CompletableFuture<>();
-        private CompletableFuture<Void> currentReaderFuture;
-        private CompletableFuture<?> currentCombinedFuture;
+        private final MultipleFuturesAvailabilityHelper availabilityHelper;
+
+        private SourceOperatorAvailabilityHelper() {
+            availabilityHelper = new MultipleFuturesAvailabilityHelper(2);
+            availabilityHelper.anyOf(0, forcedStopFuture);
+        }
 
         public CompletableFuture<?> update(CompletableFuture<Void> sourceReaderFuture) {
-            if (sourceReaderFuture == AvailabilityProvider.AVAILABLE) {
-                return sourceReaderFuture;
-            } else if (sourceReaderFuture == currentReaderFuture) {
-                return currentCombinedFuture;
-            } else {
-                currentReaderFuture = sourceReaderFuture;
-                currentCombinedFuture =
-                        CompletableFuture.anyOf(forcedStopFuture, sourceReaderFuture);
-                return currentCombinedFuture;
+            if (sourceReaderFuture == AvailabilityProvider.AVAILABLE
+                    || sourceReaderFuture.isDone()) {
+                return AvailabilityProvider.AVAILABLE;
             }
+            availabilityHelper.resetToUnAvailable();
+            availabilityHelper.anyOf(0, forcedStopFuture);
+            availabilityHelper.anyOf(1, sourceReaderFuture);
+            return availabilityHelper.getAvailableFuture();
         }
 
         public void forceStop() {
-            this.forcedStopFuture.complete(null);
+            forcedStopFuture.complete(null);
         }
     }
 }