You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2022/01/25 17:16:34 UTC

[GitHub] [flink] wpc009 commented on a change in pull request #18475: [FLINK-25728] Protential memory leaks in StreamMultipleInputProcessor

wpc009 commented on a change in pull request #18475:
URL: https://github.com/apache/flink/pull/18475#discussion_r791934091



##########
File path: flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/MultipleInputStreamTaskTest.java
##########
@@ -101,10 +101,15 @@
 import org.junit.Before;
 import org.junit.Rule;
 import org.junit.Test;
+import org.junit.jupiter.api.Assertions;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
 import org.junit.runners.Parameterized.Parameter;
 import org.junit.runners.Parameterized.Parameters;
+import org.mockito.Mockito;
+import org.powermock.api.mockito.PowerMockito;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.rule.PowerMockRule;

Review comment:
       With out mockito it's not easy to test the internal input processors of the **StreamMultipleInputProcessor**. 
   I didn't came up with other solution to this.

##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamMultipleInputProcessor.java
##########
@@ -55,17 +77,24 @@ public StreamMultipleInputProcessor(
                 || inputSelectionHandler.areAllInputsFinished()) {
             return AVAILABLE;
         }
-        final CompletableFuture<?> anyInputAvailable = new CompletableFuture<>();
         for (int i = 0; i < inputProcessors.length; i++) {
             if (!inputSelectionHandler.isInputFinished(i)
-                    && inputSelectionHandler.isInputSelected(i)) {
-                assertNoException(
-                        inputProcessors[i]
-                                .getAvailableFuture()
-                                .thenRun(() -> anyInputAvailable.complete(null)));
+                    && inputSelectionHandler.isInputSelected(i)
+                    && inputProcessors[i].getAvailableFuture() == AVAILABLE) {
+                return AVAILABLE;

Review comment:
       the for loop through L80 to L86 is try to do some short cut if any of the inputProcessor's avaiableFuture is **AVAILABLE**, avoid the following new instance creation.
   The `if` condition is identical with the original.

##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamMultipleInputProcessor.java
##########
@@ -55,17 +77,24 @@ public StreamMultipleInputProcessor(
                 || inputSelectionHandler.areAllInputsFinished()) {
             return AVAILABLE;
         }
-        final CompletableFuture<?> anyInputAvailable = new CompletableFuture<>();
         for (int i = 0; i < inputProcessors.length; i++) {
             if (!inputSelectionHandler.isInputFinished(i)
-                    && inputSelectionHandler.isInputSelected(i)) {
-                assertNoException(
-                        inputProcessors[i]
-                                .getAvailableFuture()
-                                .thenRun(() -> anyInputAvailable.complete(null)));
+                    && inputSelectionHandler.isInputSelected(i)
+                    && inputProcessors[i].getAvailableFuture() == AVAILABLE) {
+                return AVAILABLE;

Review comment:
       the for loop through L80 to L86 tries to do some short cut if any of the inputProcessor's avaiableFuture is **AVAILABLE**, avoid the following new instance creation.
   The `if` condition is identical with the original.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org