Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2022/01/25 16:30:44 UTC

[GitHub] [flink] pnowojski commented on a change in pull request #18475: [FLINK-25728] Protential memory leaks in StreamMultipleInputProcessor

pnowojski commented on a change in pull request #18475:
URL: https://github.com/apache/flink/pull/18475#discussion_r791869485



##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamMultipleInputProcessor.java
##########
@@ -55,17 +77,24 @@ public StreamMultipleInputProcessor(
                 || inputSelectionHandler.areAllInputsFinished()) {
             return AVAILABLE;
         }
-        final CompletableFuture<?> anyInputAvailable = new CompletableFuture<>();
         for (int i = 0; i < inputProcessors.length; i++) {
             if (!inputSelectionHandler.isInputFinished(i)
-                    && inputSelectionHandler.isInputSelected(i)) {
-                assertNoException(
-                        inputProcessors[i]
-                                .getAvailableFuture()
-                                .thenRun(() -> anyInputAvailable.complete(null)));
+                    && inputSelectionHandler.isInputSelected(i)
+                    && inputProcessors[i].getAvailableFuture() == AVAILABLE) {
+                return AVAILABLE;

Review comment:
       Isn't this a duplicate of the `inputSelectionHandler.isAnyInputAvailable()` check in the `if` statement above (L54:56 in the original code)?

##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamMultipleInputProcessor.java
##########
@@ -157,4 +186,66 @@ private void fullCheckAndSetAvailable() {
             }
         }
     }
+
+    /** Visible for testing only. Do not use out side StreamMultipleInputProcessor. */
+    @VisibleForTesting
+    public static class MultipleInputAvailabilityHelper {
+        private final CompletableFuture<?>[] cachedAvailableFutures;
+        private final Consumer[] onCompletion;
+        private CompletableFuture<?> availableFuture;
+
+        public CompletableFuture<?> getAvailableFuture() {
+            return availableFuture;
+        }
+
+        public static MultipleInputAvailabilityHelper newInstance(int inputSize) {
+            MultipleInputAvailabilityHelper obj = new MultipleInputAvailabilityHelper(inputSize);
+            return obj;
+        }
+
+        private MultipleInputAvailabilityHelper(int inputSize) {
+            this.cachedAvailableFutures = new CompletableFuture[inputSize];
+            this.onCompletion = new Consumer[inputSize];
+        }
+
+        @VisibleForTesting
+        public void init() {
+            for (int i = 0; i < cachedAvailableFutures.length; i++) {
+                final int inputIdx = i;
+                onCompletion[i] = (Void) -> notifyCompletion(inputIdx);
+            }
+        }
+
+        public boolean isInvalid() {
+            return availableFuture == null || availableFuture.isDone();
+        }
+
+        public void resetToUnavailable() {
+            availableFuture = new CompletableFuture<>();
+        }
+
+        private void notifyCompletion(int idx) {
+            if (availableFuture != null && !availableFuture.isDone()) {
+                availableFuture.complete(null);
+            }
+            cachedAvailableFutures[idx] = AVAILABLE;
+        }

Review comment:
       My guess is that this code is the reason behind the deadlocks. `notifyCompletion()` is executed from different threads (whichever thread completes the `StreamOneInputProcessor` availability future), yet it accesses non-thread-safe fields here.
   
   I haven't yet thought about how this should be fixed, but we should avoid adding extra synchronisation overhead in this code.
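
For context on the threading concern: a callback registered with `thenRun` on a not-yet-completed `CompletableFuture` is typically executed by whichever thread later completes that future. The standalone sketch below illustrates this with plain JDK classes only; `CallbackThreadDemo` and the thread name `completer` are hypothetical and not part of the PR or of Flink.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical demo class, not part of Flink or the PR.
final class CallbackThreadDemo {

    /** Returns the name of the thread that executed the dependent callback. */
    static String callbackThreadName() {
        CompletableFuture<Void> inputAvailable = new CompletableFuture<>();
        AtomicReference<String> ranOn = new AtomicReference<>();

        // Registered while the future is still incomplete, so the callback is
        // deferred until some thread completes the future.
        CompletableFuture<Void> callbackDone =
                inputAvailable.thenRun(() -> ranOn.set(Thread.currentThread().getName()));

        // A separate thread completes the future, as a producer would.
        Thread completer = new Thread(() -> inputAvailable.complete(null), "completer");
        completer.start();
        try {
            completer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }

        callbackDone.join();
        return ranOn.get();
    }

    public static void main(String[] args) {
        System.out.println("callback ran on: " + callbackThreadName());
    }
}
```

In the hunk above, the analogous callback reads `availableFuture` and writes `cachedAvailableFutures[idx]`, both plain fields, so those accesses can happen on whichever thread completes an input's availability future.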

##########
File path: flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/MultipleInputStreamTaskTest.java
##########
@@ -101,10 +101,15 @@
 import org.junit.Before;
 import org.junit.Rule;
 import org.junit.Test;
+import org.junit.jupiter.api.Assertions;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
 import org.junit.runners.Parameterized.Parameter;
 import org.junit.runners.Parameterized.Parameters;
+import org.mockito.Mockito;
+import org.powermock.api.mockito.PowerMockito;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.rule.PowerMockRule;

Review comment:
       Could this test be rewritten to avoid using Mockito?
   
   We [generally strongly discourage Mockito usage](https://flink.apache.org/contributing/code-style-and-quality-common.html#avoid-mockito---use-reusable-test-implementations).
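
As a rough illustration of the "reusable test implementation" style, the sketch below replaces a mock with a small hand-written class whose state the test controls directly. `FutureSource` and `TestFutureSource` are hypothetical names chosen for this example; they are assumptions, not Flink classes and not what the PR uses.

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical stand-in for the interface the code under test depends on.
interface FutureSource {
    CompletableFuture<?> getAvailableFuture();
}

// Hand-written test implementation: no mocking framework required.
final class TestFutureSource implements FutureSource {
    private CompletableFuture<Void> future = new CompletableFuture<>();
    private int callCount;

    @Override
    public CompletableFuture<?> getAvailableFuture() {
        callCount++; // lets a test check how often the future was requested
        return future;
    }

    /** Completes the current future, simulating data becoming available. */
    void makeAvailable() {
        future.complete(null);
    }

    /** Replaces the future with a fresh, incomplete one. */
    void resetToUnavailable() {
        future = new CompletableFuture<>();
    }

    int getCallCount() {
        return callCount;
    }
}
```

A test would construct `TestFutureSource`, hand it to the code under test, and drive it with `makeAvailable()` and `resetToUnavailable()` instead of stubbing calls.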

##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamMultipleInputProcessor.java
##########
@@ -47,6 +49,26 @@ public StreamMultipleInputProcessor(
             StreamOneInputProcessor<?>[] inputProcessors) {
         this.inputSelectionHandler = inputSelectionHandler;
         this.inputProcessors = inputProcessors;
+        this.availabilityHelper =
+                MultipleInputAvailabilityHelper.newInstance(inputProcessors.length);
+        this.availabilityHelper.init();
+    }
+
+    @Override
+    public boolean isAvailable() {
+        if (inputSelectionHandler.isAnyInputAvailable()
+                || inputSelectionHandler.areAllInputsFinished()) {
+            return true;
+        } else {
+            boolean isAvailable = false;
+            for (int i = 0; i < inputProcessors.length; i++) {
+                isAvailable =
+                        !inputSelectionHandler.isInputFinished(i)
+                                && inputSelectionHandler.isInputSelected(i)
+                                && inputProcessors[i].isAvailable();
+            }
+            return isAvailable;

Review comment:
       The same question as below. Isn't this whole loop a duplicate of the `inputSelectionHandler.isAnyInputAvailable()` check? In other words, couldn't we simply `return false` in this `else` branch?
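
If the answer to that question is yes, the method reduces to the single condition already in the `if`. The sketch below shows that shape only; `SelectionState` is a hypothetical stand-in for the two `inputSelectionHandler` methods used in the hunk, and whether the loop is in fact redundant is the open question, not something this sketch establishes.

```java
// Hypothetical stand-in for the two handler methods used in the hunk above.
interface SelectionState {
    boolean isAnyInputAvailable();

    boolean areAllInputsFinished();
}

final class SimplifiedAvailability {

    /** Same result as the reviewed method with its else branch returning false. */
    static boolean isAvailable(SelectionState state) {
        return state.isAnyInputAvailable() || state.areAllInputsFinished();
    }
}
```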



