Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2022/01/24 13:22:04 UTC

[GitHub] [flink] wpc009 opened a new pull request #18475: [FLINK-25728] Protential memory leaks in StreamMultipleInputProcessor

wpc009 opened a new pull request #18475:
URL: https://github.com/apache/flink/pull/18475


   
   ## What is the purpose of the change
   
   Fixes a memory leak in *StreamMultipleInputProcessor*; see the details in [FLINK-25728](https://issues.apache.org/jira/projects/FLINK/issues/FLINK-25728?filter=allopenissues).
   
   
   ## Brief change log
   
   - Adds a nested helper class, *MultipleInputAvailabilityHelper*, to *StreamMultipleInputProcessor*.
   - Delegates *StreamMultipleInputProcessor*'s availability logic to *MultipleInputAvailabilityHelper*, which breaks the CompletableFuture chain.
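
To illustrate what "breaking the CompletableFuture chain" may mean, here is a minimal sketch. It assumes the leak comes from callbacks piling up on an input future that stays incomplete for a long time, which is only one reading of the change log and of the test name. The class `DependentStackingSketch` and both of its methods are hypothetical and are not part of Flink.

```java
import java.util.concurrent.CompletableFuture;

final class DependentStackingSketch {

    /** Re-registers a callback on every round, the way a re-built future chain would. */
    static int stackedDependents(int rounds) {
        CompletableFuture<Void> idleInput = new CompletableFuture<>(); // never completes
        for (int i = 0; i < rounds; i++) {
            idleInput.thenRun(() -> { }); // each call adds one more dependent to idleInput
        }
        return idleInput.getNumberOfDependents();
    }

    /** Registers the callback once and remembers that it is already registered. */
    static int registeredOnce(int rounds) {
        CompletableFuture<Void> idleInput = new CompletableFuture<>(); // never completes
        CompletableFuture<?> cached = null; // hypothetical stand-in for a cached-futures slot
        for (int i = 0; i < rounds; i++) {
            if (cached != idleInput) {
                idleInput.thenRun(() -> { });
                cached = idleInput;
            }
        }
        return idleInput.getNumberOfDependents();
    }
}
```

`getNumberOfDependents()` is documented as an estimate meant for monitoring, but in this single-threaded sketch it is enough to show the contrast: the first method retains one dependent per round, while the second retains a single one no matter how many rounds run.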
   
   ## Verifying this change
   
   Please make sure both new and modified tests in this PR follow the conventions defined in our code quality guide: https://flink.apache.org/contributing/code-style-and-quality-common.html#testing
   
   - Added the JUnit test `org.apache.flink.streaming.runtime.tasks.MultipleInputStreamTaskTest.testCompletableFutureStackingIssue()`.
   - Manually verified the change by running a mock stream locally.
   
   ## Does this pull request potentially affect one of the following parts:
   
     - Dependencies (does it add or upgrade a dependency): yes, adding one dependency in test scope.
     - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: no
     - The serializers: no
     - The runtime per-record code paths (performance sensitive): yes
     - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: no
     - The S3 file system connector: no
   
   ## Documentation
   
     - Does this pull request introduce a new feature? (yes / no) no
     - If yes, how is the feature documented? (not applicable / docs / JavaDocs / not documented)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] pnowojski edited a comment on pull request #18475: [FLINK-25728] Protential memory leaks in StreamMultipleInputProcessor

Posted by GitBox <gi...@apache.org>.
pnowojski edited a comment on pull request #18475:
URL: https://github.com/apache/flink/pull/18475#issuecomment-1025585234


   Your efforts were not for nothing. We deeply appreciate that you reported and analysed this bug. Half a year ago another user reported similar symptoms, but neither he nor we were able to track the problem down back then. Analysing it was definitely the most valuable and important part of this issue.
   
   Apart from the things that I've already commented on, there are a couple of other issues. We will also potentially need to deduplicate this code with a fix for FLINK-25827. To speed things up, I will take over your commit and drop the tests (we will need to reimplement them in https://issues.apache.org/jira/browse/FLINK-25869 ), while merging most of your production code as it is.
   
   > I couldn't see how the callback can get the obsolete object. A dirty inconsistency between CPU caches cannot last that long. The callback will see the correct object.
   
   I'm afraid you still don't quite get it. If you are curious to learn more about the subject, please look around for other resources on it.





[GitHub] [flink] pnowojski commented on a change in pull request #18475: [FLINK-25728] Protential memory leaks in StreamMultipleInputProcessor

Posted by GitBox <gi...@apache.org>.
pnowojski commented on a change in pull request #18475:
URL: https://github.com/apache/flink/pull/18475#discussion_r794476580



##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamMultipleInputProcessor.java
##########
@@ -157,4 +186,66 @@ private void fullCheckAndSetAvailable() {
             }
         }
     }
+
+    /** Visible for testing only. Do not use out side StreamMultipleInputProcessor. */
+    @VisibleForTesting
+    public static class MultipleInputAvailabilityHelper {
+        private final CompletableFuture<?>[] cachedAvailableFutures;
+        private final Consumer[] onCompletion;
+        private CompletableFuture<?> availableFuture;
+
+        public CompletableFuture<?> getAvailableFuture() {
+            return availableFuture;
+        }
+
+        public static MultipleInputAvailabilityHelper newInstance(int inputSize) {
+            MultipleInputAvailabilityHelper obj = new MultipleInputAvailabilityHelper(inputSize);
+            return obj;
+        }
+
+        private MultipleInputAvailabilityHelper(int inputSize) {
+            this.cachedAvailableFutures = new CompletableFuture[inputSize];
+            this.onCompletion = new Consumer[inputSize];
+        }
+
+        @VisibleForTesting
+        public void init() {
+            for (int i = 0; i < cachedAvailableFutures.length; i++) {
+                final int inputIdx = i;
+                onCompletion[i] = (Void) -> notifyCompletion(inputIdx);
+            }
+        }
+
+        public boolean isInvalid() {
+            return availableFuture == null || availableFuture.isDone();
+        }
+
+        public void resetToUnavailable() {
+            availableFuture = new CompletableFuture<>();
+        }
+
+        private void notifyCompletion(int idx) {
+            if (availableFuture != null && !availableFuture.isDone()) {
+                availableFuture.complete(null);
+            }
+            cachedAvailableFutures[idx] = AVAILABLE;
+        }

Review comment:
       > Even with volatile, the future completion can still happen in between the maybeReset() call (let's say, after the isDone() check and before the set operation), and the completion callback will see the obsolete, old, already completed anyAvailable future and try to complete it.
   > It makes no difference. The AtomicReference is not preventing this. AtomicReference's set and get are plain methods on a plain object.
   
   As I was trying to explain above in point 2.
   
   > Take a closer look at the order of execution of the methods maybeReset() and registerFuture() in my version
   
   If an input becomes available just after the `anyAvailable.get().isDone()` check, but before `anyAvailable.set(new CompletableFuture<>())`, the combined/returned future will become available anyway. Such an input would return `true` for `needsToBeRegistered()`, since its future would be completed; `registerFuture()` would then be executed, which would immediately complete the combined future.
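
The ordering argument above can be replayed in a single thread. The sketch below is a hedged reconstruction, not the reviewer's actual code: only the names `anyAvailable`, `maybeReset()`, `needsToBeRegistered()` and `registerFuture()` come from this thread, while the class `AnyAvailableSketch`, the `registered` array and the method bodies are assumptions made for illustration.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;

final class AnyAvailableSketch {
    private final AtomicReference<CompletableFuture<Void>> anyAvailable =
            new AtomicReference<>(new CompletableFuture<Void>());
    private final CompletableFuture<?>[] registered; // assumed bookkeeping, one slot per input

    AnyAvailableSketch(int inputs) {
        registered = new CompletableFuture<?>[inputs];
    }

    CompletableFuture<Void> getAvailableFuture() {
        return anyAvailable.get();
    }

    void maybeReset() {
        if (anyAvailable.get().isDone()) {
            anyAvailable.set(new CompletableFuture<Void>());
        }
    }

    boolean needsToBeRegistered(int idx) {
        // A completed (or missing) registration means the callback has been consumed.
        return registered[idx] == null || registered[idx].isDone();
    }

    void registerFuture(int idx, CompletableFuture<?> inputFuture) {
        registered[idx] = inputFuture;
        // Runs immediately when inputFuture is already complete.
        inputFuture.thenRun(() -> anyAvailable.get().complete(null));
    }

    /** Replays the interleaving from the comment above in one thread. */
    static boolean lateCompletionStillWakesUp() {
        AnyAvailableSketch s = new AnyAvailableSketch(1);
        CompletableFuture<Void> input = new CompletableFuture<>();
        s.registerFuture(0, input);
        s.getAvailableFuture().complete(null); // the previous round has already finished
        input.complete(null);                  // the callback hits the old, completed future
        s.maybeReset();                        // swaps in a fresh, incomplete future
        boolean freshFutureIncomplete = !s.getAvailableFuture().isDone();
        if (s.needsToBeRegistered(0)) {
            s.registerFuture(0, input);        // already complete, so the callback fires at once
        }
        return freshFutureIncomplete && s.getAvailableFuture().isDone();
    }
}
```

In this replay the late completion lands on the stale future and is lost, yet the subsequent re-registration completes the fresh future, which is the outcome the comment describes. A single-threaded replay says nothing about visibility between threads; that part of the discussion is about the happens-before guarantee.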







[GitHub] [flink] pnowojski commented on pull request #18475: [FLINK-25728] Protential memory leaks in StreamMultipleInputProcessor

Posted by GitBox <gi...@apache.org>.
pnowojski commented on pull request #18475:
URL: https://github.com/apache/flink/pull/18475#issuecomment-1024187218


   > I have add some e2e test for this. It will fail with an OOM error on the most recent 1.14.3 release.
   
   I was running your test for over 20 minutes without any problems locally.
   
   Can you post what JDK version you are using?





[GitHub] [flink] wpc009 commented on pull request #18475: [FLINK-25728] Protential memory leaks in StreamMultipleInputProcessor

Posted by GitBox <gi...@apache.org>.
wpc009 commented on pull request #18475:
URL: https://github.com/apache/flink/pull/18475#issuecomment-1021816172


   Maybe I should push to `release-1.14` branch?





[GitHub] [flink] wpc009 commented on pull request #18475: [FLINK-25728] Protential memory leaks in StreamMultipleInputProcessor

Posted by GitBox <gi...@apache.org>.
wpc009 commented on pull request #18475:
URL: https://github.com/apache/flink/pull/18475#issuecomment-1023885086


   I have a fix.
   I tested it on `flink-benchmark`. I'm preparing some end-to-end tests and can push them by the end of today.





[GitHub] [flink] pnowojski commented on a change in pull request #18475: [FLINK-25728] Protential memory leaks in StreamMultipleInputProcessor

Posted by GitBox <gi...@apache.org>.
pnowojski commented on a change in pull request #18475:
URL: https://github.com/apache/flink/pull/18475#discussion_r794416248



##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamMultipleInputProcessor.java
##########

Review comment:
       > An AtomicReference without CAS operations is no different from a plain field reference.
   
   All atomic classes are wrappers around a `volatile` field, plus indeed some extra atomic CAS operations, and as such [they provide a happens-before relationship](https://docs.oracle.com/javase/tutorial/essential/concurrency/atomic.html).
   







[GitHub] [flink] pnowojski commented on pull request #18475: [FLINK-25728] Protential memory leaks in StreamMultipleInputProcessor

Posted by GitBox <gi...@apache.org>.
pnowojski commented on pull request #18475:
URL: https://github.com/apache/flink/pull/18475#issuecomment-1024243886


   The case in the 3rd chart is not working in your version. If the `Complete` action happens at the very end (around the `to here` arrow), there is no guarantee that it will "complete the newly created future". As you haven't established a happens-before relationship between the `Reset` and `Complete` actions, `Complete` might see an outdated, already completed version of `availableFuture` and fail to wake up the task thread.
   
   Could you check my other questions?
   > Can you post what JDK version you are using? How are you executing this test? Are you using some extra steps, maybe manually changing some configuration/environmental variables?
   >
   > Also shouldn't the test disable checkpointing? Otherwise I think every CheckpointBarrier would switch every input to available, releasing all of the accumulated CompletableFuture's.





[GitHub] [flink] flinkbot edited a comment on pull request #18475: [FLINK-25728] Protential memory leaks in StreamMultipleInputProcessor

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #18475:
URL: https://github.com/apache/flink/pull/18475#issuecomment-1020104543


   ## CI report:
   
   * 3ade9719b565beeacf7761ab71d5abb7ba62e62a Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=30045) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run azure` re-run the last Azure build
   </details>





[GitHub] [flink] wpc009 edited a comment on pull request #18475: [FLINK-25728] Protential memory leaks in StreamMultipleInputProcessor

Posted by GitBox <gi...@apache.org>.
wpc009 edited a comment on pull request #18475:
URL: https://github.com/apache/flink/pull/18475#issuecomment-1020772870


   > ## CI report:
   > * [3ade971](https://github.com/apache/flink/commit/3ade9719b565beeacf7761ab71d5abb7ba62e62a) Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=30045)
   > 
   > Bot commands
   > The @flinkbot bot supports the following commands:
   > * `@flinkbot run azure` re-run the last Azure build
   
   It seems that the **e2e_2_ci** test ran out of its time quota, and the ongoing test job was killed by the pipeline.
   I have run the tests in my local environment, and all of them passed.
   I don't know what is going wrong.





[GitHub] [flink] wpc009 commented on pull request #18475: [FLINK-25728] Protential memory leaks in StreamMultipleInputProcessor

Posted by GitBox <gi...@apache.org>.
wpc009 commented on pull request #18475:
URL: https://github.com/apache/flink/pull/18475#issuecomment-1021431119


   > I'm afraid that those "time outs" might be because of some bug in this code causing a deadlock. I've also tried [to run a benchmark request](http://codespeed.dak8s.net:8080/job/flink-benchmark-request/601/) on this PR and the `MultipleInputBenchmark` benchmark run (from this repo https://github.com/apache/flink-benchmarks/) has also deadlocked.
   
   I ran the tests in my local environment, where they behave normally.
   <img width="1436" alt="image" src="https://user-images.githubusercontent.com/2689362/151027213-1b898ff9-32a3-4834-90c5-ac247ca3973b.png">
   
   This is the pyflink test that failed in the Azure pipeline. It runs normally in my local environment.
   By the way, it seems that the HEAD of the master branch cannot pass all tests.





[GitHub] [flink] pnowojski commented on a change in pull request #18475: [FLINK-25728] Protential memory leaks in StreamMultipleInputProcessor

Posted by GitBox <gi...@apache.org>.
pnowojski commented on a change in pull request #18475:
URL: https://github.com/apache/flink/pull/18475#discussion_r794416248



##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamMultipleInputProcessor.java
##########

Review comment:
       > An AtomicReference without CAS operations is no different from a plain field reference.
   
   I'm afraid you are wrong in this regard. All atomic classes are wrappers around a `volatile` field, plus indeed some extra atomic CAS operations, and as such [they provide a happens-before relationship](https://www.tutorialspoint.com/java_concurrency/concurrency_atomicreference.htm):
   
   > That is, a set has a happens-before relationship with any subsequent get on the same variable. The atomic compareAndSet method also has these memory consistency features.
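
The quoted guarantee can be seen in a small, self-contained sketch. The class `HappensBeforeSketch`, its `Box` holder and the value 42 are hypothetical and chosen only for illustration; the point is that a plain write made before `AtomicReference.set()` is visible to a thread that observes the new reference through `get()`, even without any CAS operation.

```java
import java.util.concurrent.atomic.AtomicReference;

final class HappensBeforeSketch {
    static final class Box {
        int payload; // deliberately a plain, non-volatile field
    }

    static int publishAndRead() {
        AtomicReference<Box> ref = new AtomicReference<>();
        int[] seen = new int[1];
        Thread reader = new Thread(() -> {
            Box b;
            while ((b = ref.get()) == null) {
                Thread.yield(); // spin until the writer's set() is observed
            }
            seen[0] = b.payload; // ordered after the plain write via set() -> get()
        });
        reader.start();

        Box box = new Box();
        box.payload = 42; // plain write ...
        ref.set(box);     // ... published by set(), which has volatile-write semantics

        try {
            reader.join(); // join() orders the reader's write to seen[0] before the read below
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return seen[0];
    }
}
```

With a plain field in place of the `AtomicReference`, the Java memory model would not guarantee that the spinning reader ever observes the new reference or the payload written before it.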
   
   










[GitHub] [flink] wpc009 commented on a change in pull request #18475: [FLINK-25728] Protential memory leaks in StreamMultipleInputProcessor

Posted by GitBox <gi...@apache.org>.
wpc009 commented on a change in pull request #18475:
URL: https://github.com/apache/flink/pull/18475#discussion_r794451818



##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamMultipleInputProcessor.java
##########

Review comment:
       In this scenario, we may not benefit much from the `volatile` field, since it only prevents stale cached values between CPU cores.
   
   > So I don't see any race condition in my version. Yours works quite similarly after all, but your version's lack of AtomicReference creates an opportunity, for example in point 3., that if an input becomes available, it will attempt to complete the wrong, obsolete, old, already completed future.
   
   Even with `volatile`, the future completion can still happen in between the `maybeReset()` call (let's say, after the `isDone()` check and before the `set` operation), and the completion callback will see the obsolete, old, already completed `anyAvailable` future and try to complete it.
   It makes no difference. The AtomicReference is not preventing this. AtomicReference's `set` and `get` are plain methods on a plain object. Without the `CAS` operation, the only difference here is the `volatile` field. (`volatile` enforces a write-back to main memory (the RAM), preventing inconsistent values of the same memory address between fast CPU caches.)
   But since we cannot prevent the race condition, the future completion still has a chance to see the old future object. So `volatile` is the least of our concerns here.
   Anyway, there is no harm in making `availableFuture` `volatile`.
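
For reference, a minimal sketch of what making `availableFuture` `volatile` could look like. It is a simplification of the helper in the diff, reduced to a single input; the class name `VolatileHelperSketch` and the choice to read the field once into a local variable are assumptions, not the PR's code. It addresses only visibility of the field across threads and does not by itself close the window between the `isDone()` check and the reset that is being debated here.

```java
import java.util.concurrent.CompletableFuture;

final class VolatileHelperSketch {
    // volatile: a write in resetToUnavailable() happens-before later reads of the field
    private volatile CompletableFuture<Void> availableFuture = new CompletableFuture<>();

    CompletableFuture<Void> getAvailableFuture() {
        return availableFuture;
    }

    boolean isInvalid() {
        CompletableFuture<Void> current = availableFuture; // one volatile read
        return current == null || current.isDone();
    }

    void resetToUnavailable() {
        availableFuture = new CompletableFuture<>();
    }

    void notifyCompletion() {
        CompletableFuture<Void> current = availableFuture; // one volatile read
        if (current != null) {
            // complete() returns false and changes nothing if the future is already done
            current.complete(null);
        }
    }
}
```

Reading the field once per method keeps the null check and the `complete()` call on the same object, so a concurrent reset cannot slip in between them within a single call.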







[GitHub] [flink] wpc009 commented on pull request #18475: [FLINK-25728] Protential memory leaks in StreamMultipleInputProcessor

Posted by GitBox <gi...@apache.org>.
wpc009 commented on pull request #18475:
URL: https://github.com/apache/flink/pull/18475#issuecomment-1020772870


   > ## CI report:
   > * [3ade971](https://github.com/apache/flink/commit/3ade9719b565beeacf7761ab71d5abb7ba62e62a) Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=30045)
   > 
   > Bot commands
   > The @flinkbot bot supports the following commands:
   > * `@flinkbot run azure` re-run the last Azure build
   
   It seems the e2e test ran out of its time quota, and the pipeline killed the unfinished test job.





[GitHub] [flink] pnowojski commented on a change in pull request #18475: [FLINK-25728] Protential memory leaks in StreamMultipleInputProcessor

Posted by GitBox <gi...@apache.org>.
pnowojski commented on a change in pull request #18475:
URL: https://github.com/apache/flink/pull/18475#discussion_r794368740



##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamMultipleInputProcessor.java
##########
@@ -157,4 +186,66 @@ private void fullCheckAndSetAvailable() {
             }
         }
     }
+
+    /** Visible for testing only. Do not use out side StreamMultipleInputProcessor. */
+    @VisibleForTesting
+    public static class MultipleInputAvailabilityHelper {
+        private final CompletableFuture<?>[] cachedAvailableFutures;
+        private final Consumer[] onCompletion;
+        private CompletableFuture<?> availableFuture;
+
+        public CompletableFuture<?> getAvailableFuture() {
+            return availableFuture;
+        }
+
+        public static MultipleInputAvailabilityHelper newInstance(int inputSize) {
+            MultipleInputAvailabilityHelper obj = new MultipleInputAvailabilityHelper(inputSize);
+            return obj;
+        }
+
+        private MultipleInputAvailabilityHelper(int inputSize) {
+            this.cachedAvailableFutures = new CompletableFuture[inputSize];
+            this.onCompletion = new Consumer[inputSize];
+        }
+
+        @VisibleForTesting
+        public void init() {
+            for (int i = 0; i < cachedAvailableFutures.length; i++) {
+                final int inputIdx = i;
+                onCompletion[i] = (Void) -> notifyCompletion(inputIdx);
+            }
+        }
+
+        public boolean isInvalid() {
+            return availableFuture == null || availableFuture.isDone();
+        }
+
+        public void resetToUnavailable() {
+            availableFuture = new CompletableFuture<>();
+        }
+
+        private void notifyCompletion(int idx) {
+            if (availableFuture != null && !availableFuture.isDone()) {
+                availableFuture.complete(null);
+            }
+            cachedAvailableFutures[idx] = AVAILABLE;
+        }

Review comment:
       > In your fix, the AtomicReference can not prevent that either.
   
   It doesn't need to. Take a closer look at the order of execution of the methods `maybeReset()` and `registerFuture()` in my version, and think about what happens if one of the inputs' availability futures completes concurrently.
   1. If an input becomes available before we call `maybeReset()`, its future will be hooked up to the new combined future.
   2. If an input becomes available just after the `anyAvailable.get().isDone()` check, but before `anyAvailable.set(new CompletableFuture<>())`, the combined/returned future will be available anyway. We will clean up everything in the next `getAvailableFuture()` call.
   3. If an input becomes available after `maybeReset()`, it will just complete the combined/returned future, regardless of whether we register it or not.
   4. `registerFuture()` calls will take care of setting up all of the inputs' futures.
   
   So I don't see any race condition in my version. Yours works quite similarly after all, but your version's lack of `AtomicReference` creates an opportunity, for example in point 3, for an input that becomes available to attempt to complete the wrong, obsolete, already completed future.
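   
   The four cases above can be sketched roughly as follows. This is a hypothetical reconstruction, not the code from either PR: only the names `maybeReset()`, `registerFuture()`, `anyAvailable` and `getAvailableFuture()` come from the discussion, while the class name and the method bodies are assumptions.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;

/** Hypothetical sketch; method bodies are assumed, only the names come from the thread. */
class AnyAvailableSketch {
    private final AtomicReference<CompletableFuture<Void>> anyAvailable =
            new AtomicReference<>(new CompletableFuture<>());

    /** Swap in a fresh combined future only if the current one is already done. */
    void maybeReset() {
        if (anyAvailable.get().isDone()) {
            anyAvailable.set(new CompletableFuture<>());
        }
    }

    /** When the input completes, complete whichever combined future is current at that time. */
    void registerFuture(CompletableFuture<?> inputFuture) {
        inputFuture.thenRun(() -> anyAvailable.get().complete(null));
    }

    CompletableFuture<Void> getAvailableFuture() {
        return anyAvailable.get();
    }

    public static void main(String[] args) {
        AnyAvailableSketch helper = new AnyAvailableSketch();
        CompletableFuture<Void> input = new CompletableFuture<>();
        helper.registerFuture(input);
        CompletableFuture<Void> combined = helper.getAvailableFuture();
        input.complete(null); // the input becomes available after registration
        System.out.println("combined done: " + combined.isDone());
    }
}
```

   The callback reads the reference when it runs rather than capturing a future at registration time, which is the property the numbered cases rely on.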







[GitHub] [flink] wpc009 edited a comment on pull request #18475: [FLINK-25728] Protential memory leaks in StreamMultipleInputProcessor

Posted by GitBox <gi...@apache.org>.
wpc009 edited a comment on pull request #18475:
URL: https://github.com/apache/flink/pull/18475#issuecomment-1022813985


   It seems that newer JDK releases have already fixed the memory issue in `CompletableFuture.anyOf`.
   Sadly, this issue still exists in JDK 8.
   Maybe it's time to abandon this old truck.
   https://bugs.openjdk.java.net/browse/JDK-8160402
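   
   A minimal probe for this behaviour might look like the sketch below. It is an assumption about how one could observe the retention, not a test from the PR: the class and method names are made up, and the number it prints is expected to depend on the JDK version, so no expected output is given.

```java
import java.util.concurrent.CompletableFuture;

/** Hypothetical probe; class and method names are illustrative only. */
class AnyOfRetentionProbe {
    /** Calls anyOf repeatedly against one long-lived future and reports its dependents. */
    static int probe(int rounds) {
        CompletableFuture<Void> longLived = new CompletableFuture<>(); // never completed here
        for (int i = 0; i < rounds; i++) {
            CompletableFuture<Void> shortLived = new CompletableFuture<>();
            CompletableFuture<Object> any = CompletableFuture.anyOf(longLived, shortLived);
            shortLived.complete(null); // completes `any`, longLived stays pending
            if (!any.isDone()) {
                throw new IllegalStateException("anyOf result should be complete");
            }
        }
        // Estimated number of completions still attached to the long-lived future.
        return longLived.getNumberOfDependents();
    }

    public static void main(String[] args) {
        System.out.println("java.version = " + System.getProperty("java.version"));
        System.out.println("dependents after 10000 rounds = " + probe(10_000));
    }
}
```

   Running it on JDK 8 and on a newer release and comparing the two numbers is one way to check whether the retention described in the linked bug applies to a given runtime.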





[GitHub] [flink] wpc009 commented on a change in pull request #18475: [FLINK-25728] Protential memory leaks in StreamMultipleInputProcessor

Posted by GitBox <gi...@apache.org>.
wpc009 commented on a change in pull request #18475:
URL: https://github.com/apache/flink/pull/18475#discussion_r794490007



##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamMultipleInputProcessor.java
##########

Review comment:
       Maybe take a look at this. I drew a graph to illustrate why this fix is correct.
   > I made a graph to explain how this fix works, even with the race condition between the future's `notifyCompletion` and `checkReusableAndReset`.
   > ![flink-issue-timeline drawio](https://user-images.githubusercontent.com/2689362/151551134-a30c100e-26a9-4061-9467-6c9b03657bcc.png)







[GitHub] [flink] flinkbot edited a comment on pull request #18475: [FLINK-25728] Protential memory leaks in StreamMultipleInputProcessor

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #18475:
URL: https://github.com/apache/flink/pull/18475#issuecomment-1020104543


   ## CI report:
   
   * 3ade9719b565beeacf7761ab71d5abb7ba62e62a Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=30045) 
   * 1ede9e74ea4b64e9227ea3230d4adc189356c39c Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=30351) 
   * 28a8175852359690e62861bd0a4800318cca61dc UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run azure` re-run the last Azure build
   </details>





[GitHub] [flink] pnowojski edited a comment on pull request #18475: [FLINK-25728] Protential memory leaks in StreamMultipleInputProcessor

Posted by GitBox <gi...@apache.org>.
pnowojski edited a comment on pull request #18475:
URL: https://github.com/apache/flink/pull/18475#issuecomment-1024243886


   As far as I can tell, the case in the 3rd chart does not work in your version. If the `Complete` action happens at the very end (around the `to here` arrow), there is no guarantee that it will "complete the newly created future". Since you haven't established a happens-before relationship between the `Reset` and `Complete` actions, `Complete` might see an outdated, already completed version of `availableFuture` and fail to wake up the task thread.
   
   Could you check my other questions?
   > Can you post what JDK version you are using? How are you executing this test? Are you using some extra steps, maybe manually changing some configuration or environment variables?
   >
   > Also, shouldn't the test disable checkpointing? Otherwise I think every CheckpointBarrier would switch every input to available, releasing all of the accumulated `CompletableFuture`s.
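   
   The concern about `Complete` seeing an outdated future can be modelled in a single thread. The sketch below does not reproduce the actual race; it only hands the callback a stale reference explicitly and applies the same guard as `notifyCompletion` in the PR diff. The class and method names are hypothetical.

```java
import java.util.concurrent.CompletableFuture;

/** Hypothetical single-threaded model of a Complete action that observes a stale reference. */
class StaleCompleteSketch {
    /** Applies the notifyCompletion guard to the observed future; returns whether the current one is done. */
    static boolean completeThrough(
            CompletableFuture<Void> observedByCallback, CompletableFuture<Void> currentFuture) {
        if (observedByCallback != null && !observedByCallback.isDone()) {
            observedByCallback.complete(null);
        }
        return currentFuture.isDone();
    }

    public static void main(String[] args) {
        CompletableFuture<Void> old = CompletableFuture.completedFuture(null);
        CompletableFuture<Void> fresh = new CompletableFuture<>(); // result of the Reset action
        // Callback still sees the old, already completed future: nothing wakes the task thread.
        System.out.println("stale view wakes task: " + completeThrough(old, fresh));
        // Callback sees the new future: the task thread is woken up.
        System.out.println("fresh view wakes task: " + completeThrough(fresh, fresh));
    }
}
```

   Whether the stale view can actually occur at runtime is exactly the happens-before question raised above; the sketch only shows the consequence if it does.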





[GitHub] [flink] flinkbot edited a comment on pull request #18475: [FLINK-25728] Protential memory leaks in StreamMultipleInputProcessor

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #18475:
URL: https://github.com/apache/flink/pull/18475#issuecomment-1020104543


   ## CI report:
   
   * 28aefb8e368a9a89bb4d1967da27235cfe2ffcaa Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=30384) 
   * db2210c8ec60f9a5608d3fdea53a753d623d896d UNKNOWN





[GitHub] [flink] flinkbot edited a comment on pull request #18475: [FLINK-25728] Protential memory leaks in StreamMultipleInputProcessor

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #18475:
URL: https://github.com/apache/flink/pull/18475#issuecomment-1020104543


   ## CI report:
   
   * 3ade9719b565beeacf7761ab71d5abb7ba62e62a Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=30045) 
   * 1ede9e74ea4b64e9227ea3230d4adc189356c39c Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=30351) 





[GitHub] [flink] wpc009 commented on a change in pull request #18475: [FLINK-25728] Protential memory leaks in StreamMultipleInputProcessor

Posted by GitBox <gi...@apache.org>.
wpc009 commented on a change in pull request #18475:
URL: https://github.com/apache/flink/pull/18475#discussion_r794346464



##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamMultipleInputProcessor.java
##########

Review comment:
       `notifyCompletion` may compete with `checkReusableAndReset`. We cannot prevent that from happening.
   But this race may not be harmful.
   In your fix, the `AtomicReference` cannot prevent that either. The `isDone()` check is still separate from the reset.
   Both of our fixes work. I have tested them with 300 test runs and did not see any kind of hang.







[GitHub] [flink] wpc009 commented on pull request #18475: [FLINK-25728] Protential memory leaks in StreamMultipleInputProcessor

Posted by GitBox <gi...@apache.org>.
wpc009 commented on pull request #18475:
URL: https://github.com/apache/flink/pull/18475#issuecomment-1024054444


   > @wpc009, could you check whether my PR solves the memory leak? My proposal is quite a bit simpler.
   > 
   > Regarding the tests, unfortunately I don't see a way of providing a fast and reliable unit test/itcase. The only idea I have is some longer-running stress test, but...
   > 
   > Can you provide the steps to reproduce the problem, and what JDK version are you using? I've tried running your tests, I have even tried to reproduce the problem on my own, and I could not observe this memory leak, even after 20 minutes of running a job that should have created a new leaked `CompletableFuture` every ~1ms. The number of `CompletableFuture` instances (and its inner classes) remained more or less constant (oscillating a lot, but without a clear upward trend).
   
   You may not see a clear upward trend with a large network buffer, since the leak needs the inputs to switch between AVAILABLE and UNAVAILABLE quickly. You can try reducing the network buffer size and increasing the message rate of the high-throughput input.
   You will see lots of uncollectable `CompletableFuture` and `UniRun` instances.
   
   In a normal production environment, it may take around 1 hour to run out of memory.





[GitHub] [flink] wpc009 commented on a change in pull request #18475: [FLINK-25728] Protential memory leaks in StreamMultipleInputProcessor

Posted by GitBox <gi...@apache.org>.
wpc009 commented on a change in pull request #18475:
URL: https://github.com/apache/flink/pull/18475#discussion_r794387909



##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamMultipleInputProcessor.java
##########

Review comment:
       > So I don't see any race condition in my version. Yours works quite similar after all, but your version's lack of AtomicReference creates an opportunity, for example in point 3., that if input becomes available, it will attempt to complete wrong, obsolete, old, already completed future.
   
   The race condition exists in both of our fixes. An `AtomicReference` used without `CAS` operations is no different from a plain field reference. We both rely on a redundant completion check to make sure that we do not miss any completion notification.
   
   I'm quite sure that replacing the `AtomicReference` with a plain field will make no difference.
   But this is not to say that your fix does not work; it is only an opinion on the `AtomicReference` usage.
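   
   For reference, the difference between a check-then-act reset and a `CAS`-based one can be sketched as below. Neither PR is stated to use `compareAndSet`; the class and method names are hypothetical and the sketch only illustrates what "with `CAS`" would mean here.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;

/** Hypothetical comparison of a get/set reset and a compareAndSet reset. */
class CasResetSketch {
    private final AtomicReference<CompletableFuture<Void>> anyAvailable =
            new AtomicReference<>(CompletableFuture.completedFuture(null));

    /** Check-then-act: another thread may run between get() and set(). */
    void plainReset() {
        if (anyAvailable.get().isDone()) {
            anyAvailable.set(new CompletableFuture<>());
        }
    }

    /** CAS: the swap succeeds only if the reference is still the one that was inspected. */
    boolean casReset() {
        CompletableFuture<Void> seen = anyAvailable.get();
        return seen.isDone() && anyAvailable.compareAndSet(seen, new CompletableFuture<>());
    }

    CompletableFuture<Void> current() {
        return anyAvailable.get();
    }

    public static void main(String[] args) {
        CasResetSketch sketch = new CasResetSketch();
        System.out.println("first casReset swapped: " + sketch.casReset());
        System.out.println("second casReset swapped: " + sketch.casReset());
    }
}
```

   Even without `compareAndSet`, `AtomicReference.get()` and `set()` have volatile read and write semantics, which is the visibility aspect debated in the rest of the thread.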







[GitHub] [flink] flinkbot commented on pull request #18475: [FLINK-25728] Protential memory leaks in StreamMultipleInputProcessor

Posted by GitBox <gi...@apache.org>.
flinkbot commented on pull request #18475:
URL: https://github.com/apache/flink/pull/18475#issuecomment-1020101175


   Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community
   to review your pull request. We will use this comment to track the progress of the review.
   
   
   ## Automated Checks
   Last check on commit 3ade9719b565beeacf7761ab71d5abb7ba62e62a (Mon Jan 24 13:27:49 UTC 2022)
   
   **Warnings:**
    * **1 pom.xml files were touched**: Check for build and licensing issues.
    * No documentation files were touched! Remember to keep the Flink docs up to date!
    * **This pull request references an unassigned [Jira ticket](https://issues.apache.org/jira/browse/FLINK-25728).** According to the [code contribution guide](https://flink.apache.org/contributing/contribute-code.html), tickets need to be assigned before starting with the implementation work.
   
   
   <sub>Mention the bot in a comment to re-run the automated checks.</sub>
   ## Review Progress
   
   * ❓ 1. The [description] looks good.
   * ❓ 2. There is [consensus] that the contribution should go into Flink.
   * ❓ 3. Needs [attention] from.
   * ❓ 4. The change fits into the overall [architecture].
   * ❓ 5. Overall code [quality] is good.
   
   Please see the [Pull Request Review Guide](https://flink.apache.org/contributing/reviewing-prs.html) for a full explanation of the review process.
   <details>
   <summary>Bot commands</summary>
    The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer or PMC member is required.
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot approve description` to approve one or more aspects (aspects: `description`, `consensus`, `architecture` and `quality`)
    - `@flinkbot approve all` to approve all aspects
    - `@flinkbot approve-until architecture` to approve everything until `architecture`
    - `@flinkbot attention @username1 [@username2 ..]` to require somebody's attention
    - `@flinkbot disapprove architecture` to remove an approval you gave earlier
   </details>





[GitHub] [flink] flinkbot edited a comment on pull request #18475: [FLINK-25728] Protential memory leaks in StreamMultipleInputProcessor

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #18475:
URL: https://github.com/apache/flink/pull/18475#issuecomment-1020104543


   ## CI report:
   
   * 3ade9719b565beeacf7761ab71d5abb7ba62e62a Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=30045) 





[GitHub] [flink] wpc009 commented on a change in pull request #18475: [FLINK-25728] Protential memory leaks in StreamMultipleInputProcessor

Posted by GitBox <gi...@apache.org>.
wpc009 commented on a change in pull request #18475:
URL: https://github.com/apache/flink/pull/18475#discussion_r794451818



##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamMultipleInputProcessor.java
##########

Review comment:
       In this scenario, we may not benefit much from the `volatile` field, since it only prevents stale cached values between CPU cores.
   
   > So I don't see any race condition in my version. Yours works quite similar after all, but your version's lack of AtomicReference creates an opportunity, for example in point 3., that if input becomes available, it will attempt to complete wrong, obsolete, old, already completed future.
   
   Even with `volatile`, the future completion can still happen in the middle of the `maybeReset()` call, and the completion callback will then see the obsolete, old, already completed `anyAvailable` future and try to complete it.
   It makes no difference. The AtomicReference does not prevent this. AtomicReference's `set` and `get` are plain methods on a plain object. Without the `CAS` operation, the only difference here is the `volatile` field. But since we cannot prevent the race condition, the future completion still has a chance to see the old future object. So `volatile` is the least of our concerns here.
   But, anyway, there is no harm in making `availableFuture` `volatile`.
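   
   Making `availableFuture` `volatile` could look roughly like the sketch below. It is a simplified, hypothetical variant of the helper from the diff (no per-input bookkeeping, made-up class name), and the small demo cannot prove anything about visibility, because `Thread.start()` and `Thread.join()` already establish happens-before edges on their own.

```java
import java.util.concurrent.CompletableFuture;

/** Hypothetical, simplified helper with a volatile field; not the PR's class. */
class VolatileAvailabilitySketch {
    private volatile CompletableFuture<Void> availableFuture;

    void resetToUnavailable() {
        availableFuture = new CompletableFuture<>(); // volatile write
    }

    void notifyCompletion() {
        CompletableFuture<Void> current = availableFuture; // single volatile read
        if (current != null && !current.isDone()) {
            current.complete(null);
        }
    }

    boolean isInvalid() {
        CompletableFuture<Void> current = availableFuture;
        return current == null || current.isDone();
    }

    public static void main(String[] args) throws InterruptedException {
        VolatileAvailabilitySketch helper = new VolatileAvailabilitySketch();
        helper.resetToUnavailable();
        Thread completer = new Thread(helper::notifyCompletion);
        completer.start();
        completer.join();
        System.out.println("invalid after completion: " + helper.isInvalid());
    }
}
```

   Reading the field once into a local variable keeps the null check, the `isDone()` check and the `complete` call on the same object even if a reset happens in between.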







[GitHub] [flink] wpc009 commented on pull request #18475: [FLINK-25728] Protential memory leaks in StreamMultipleInputProcessor

Posted by GitBox <gi...@apache.org>.
wpc009 commented on pull request #18475:
URL: https://github.com/apache/flink/pull/18475#issuecomment-1024444960


   In your version, under the same circumstances, `anyAvailable.get()` may return the obsolete object as well. `AtomicReference` cannot guarantee the happens-before relationship you want.
   So the situation is the same in your version: the originally registered callback calls `complete` on an obsolete object. But since the input's future is already complete, the following `registerFuture` adds a new `thenRun` to the input's future, which completes the result future. The difference is that I check the input future's state and complete the result future manually. In your version the callback runs twice: the first run may miss because of the timing issue, and the second works correctly.
   That costs a slightly larger memory footprint (a `Runnable` object and a `UniRun` object), but the difference is minor.
   Either way the result future is completed correctly and the stream does not hang.
   
   You keep saying my version will not work under such circumstances. What I am saying is that our solutions are roughly the same, and both work. The difference is minor: fewer object creations in mine, maybe fewer lines of code in yours.
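   
   The two recovery paths described above can be sketched side by side. This is a minimal illustration, not code from either PR: the class and method names are made up, and the only library behaviour it relies on is that `thenRun` on an already completed `CompletableFuture` runs the action immediately.

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical sketch; class and method names are illustrative, not Flink code.
final class ReRegisterSketch {

    /** Re-registering: thenRun on an already completed input fires the callback at once. */
    static boolean viaThenRun(CompletableFuture<?> input) {
        CompletableFuture<Void> result = new CompletableFuture<>();
        input.thenRun(() -> result.complete(null));
        return result.isDone();
    }

    /** Manual check: complete the result directly when the input is already done. */
    static boolean viaManualCheck(CompletableFuture<?> input) {
        CompletableFuture<Void> result = new CompletableFuture<>();
        if (input.isDone()) {
            result.complete(null); // no extra callback object is allocated
        } else {
            input.thenRun(() -> result.complete(null));
        }
        return result.isDone();
    }

    public static void main(String[] args) {
        CompletableFuture<Void> done = CompletableFuture.completedFuture(null);
        System.out.println(viaThenRun(done));
        System.out.println(viaManualCheck(done));
    }
}
```

   With a completed input both methods leave the result future completed; the manual check simply skips allocating the extra callback.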





[GitHub] [flink] wpc009 commented on pull request #18475: [FLINK-25728] Protential memory leaks in StreamMultipleInputProcessor

Posted by GitBox <gi...@apache.org>.
wpc009 commented on pull request #18475:
URL: https://github.com/apache/flink/pull/18475#issuecomment-1024448903


   I can write a test for the scenario you described, in which you expect my fix to fail. It would manually create a gap: resolve the reference first, then sleep for a long time, then try to complete. During the long sleep the other thread has the opportunity to finish the reset and register steps.
   I'm quite sure it will work just fine, but I can still write this test.
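   
   A rough sketch of such a test follows. It is not the PR's test: the helper class is a hypothetical stand-in for the availability helper, and two `CountDownLatch`es take the place of the long sleep so that the interleaving is forced rather than merely likely.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;

// Hypothetical stand-in for the availability helper; not the PR's actual class.
final class StaleFutureRaceSketch {
    private volatile CompletableFuture<Void> availableFuture = new CompletableFuture<>();

    /** Reset, then re-check the input and complete the fresh future by hand if it is done. */
    void resetAndRegister(CompletableFuture<?> input) {
        availableFuture = new CompletableFuture<>();
        if (input.isDone()) {
            availableFuture.complete(null);
        }
    }

    /** Returns whether the current result future is completed despite the stale completion. */
    static boolean run() {
        StaleFutureRaceSketch helper = new StaleFutureRaceSketch();
        CompletableFuture<Void> input = new CompletableFuture<>();
        CountDownLatch referenceResolved = new CountDownLatch(1);
        CountDownLatch resetFinished = new CountDownLatch(1);

        Thread callback = new Thread(() -> {
            CompletableFuture<Void> stale = helper.availableFuture; // resolve the reference first
            referenceResolved.countDown();
            awaitQuietly(resetFinished);                            // the artificial gap
            stale.complete(null);                                   // completes the obsolete future
        });

        input.complete(null);            // the input becomes available
        callback.start();
        awaitQuietly(referenceResolved);
        helper.resetAndRegister(input);  // the other thread finishes reset and register
        resetFinished.countDown();
        try {
            callback.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return helper.availableFuture.isDone();
    }

    private static void awaitQuietly(CountDownLatch latch) {
        try {
            latch.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

   In this sketch the callback completes only the obsolete future, yet the current future is still completed because the re-check after the reset sees that the input is already done.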





[GitHub] [flink] flinkbot edited a comment on pull request #18475: [FLINK-25728] Protential memory leaks in StreamMultipleInputProcessor

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #18475:
URL: https://github.com/apache/flink/pull/18475#issuecomment-1020104543


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "3ade9719b565beeacf7761ab71d5abb7ba62e62a",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=30045",
       "triggerID" : "3ade9719b565beeacf7761ab71d5abb7ba62e62a",
       "triggerType" : "PUSH"
     }, {
       "hash" : "3ade9719b565beeacf7761ab71d5abb7ba62e62a",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=30045",
       "triggerID" : "1020772870",
       "triggerType" : "MANUAL"
     }, {
       "hash" : "1ede9e74ea4b64e9227ea3230d4adc189356c39c",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=30351",
       "triggerID" : "1ede9e74ea4b64e9227ea3230d4adc189356c39c",
       "triggerType" : "PUSH"
     }, {
       "hash" : "4c368879072e4ae60cac37aaa1b7a49454a95fda",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "4c368879072e4ae60cac37aaa1b7a49454a95fda",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 3ade9719b565beeacf7761ab71d5abb7ba62e62a Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=30045) 
   * 1ede9e74ea4b64e9227ea3230d4adc189356c39c Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=30351) 
   * 4c368879072e4ae60cac37aaa1b7a49454a95fda UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run azure` re-run the last Azure build
   </details>





[GitHub] [flink] pnowojski commented on a change in pull request #18475: [FLINK-25728] Protential memory leaks in StreamMultipleInputProcessor

Posted by GitBox <gi...@apache.org>.
pnowojski commented on a change in pull request #18475:
URL: https://github.com/apache/flink/pull/18475#discussion_r794416248



##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamMultipleInputProcessor.java
##########
@@ -157,4 +186,66 @@ private void fullCheckAndSetAvailable() {
             }
         }
     }
+
+    /** Visible for testing only. Do not use out side StreamMultipleInputProcessor. */
+    @VisibleForTesting
+    public static class MultipleInputAvailabilityHelper {
+        private final CompletableFuture<?>[] cachedAvailableFutures;
+        private final Consumer[] onCompletion;
+        private CompletableFuture<?> availableFuture;
+
+        public CompletableFuture<?> getAvailableFuture() {
+            return availableFuture;
+        }
+
+        public static MultipleInputAvailabilityHelper newInstance(int inputSize) {
+            MultipleInputAvailabilityHelper obj = new MultipleInputAvailabilityHelper(inputSize);
+            return obj;
+        }
+
+        private MultipleInputAvailabilityHelper(int inputSize) {
+            this.cachedAvailableFutures = new CompletableFuture[inputSize];
+            this.onCompletion = new Consumer[inputSize];
+        }
+
+        @VisibleForTesting
+        public void init() {
+            for (int i = 0; i < cachedAvailableFutures.length; i++) {
+                final int inputIdx = i;
+                onCompletion[i] = (Void) -> notifyCompletion(inputIdx);
+            }
+        }
+
+        public boolean isInvalid() {
+            return availableFuture == null || availableFuture.isDone();
+        }
+
+        public void resetToUnavailable() {
+            availableFuture = new CompletableFuture<>();
+        }
+
+        private void notifyCompletion(int idx) {
+            if (availableFuture != null && !availableFuture.isDone()) {
+                availableFuture.complete(null);
+            }
+            cachedAvailableFutures[idx] = AVAILABLE;
+        }

Review comment:
       > The AtomicReference without CAS operations is no difference with plain field reference.
   
   All atomic classes are wrappers around a `volatile` field, plus some extra atomic operations such as CAS, and [they provide a happens-before relationship](https://docs.oracle.com/javase/tutorial/essential/concurrency/atomic.html):
   
   > That is, a set has a happens-before relationship with any subsequent get on the same variable. The atomic compareAndSet method also has these memory consistency features.
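   
   As a small illustration of that guarantee (a sketch with made-up names, unrelated to the Flink classes): a plain field written before `AtomicReference.set` is visible to a thread that later observes the reference through `get`.

```java
import java.util.concurrent.atomic.AtomicReference;

// Illustrative only; Box and publishAndRead are made-up names.
final class PublicationSketch {
    static final class Box {
        int payload; // deliberately a plain, non-volatile field
    }

    static int publishAndRead() {
        AtomicReference<Box> ref = new AtomicReference<>();
        Thread writer = new Thread(() -> {
            Box box = new Box();
            box.payload = 42; // plain write...
            ref.set(box);     // ...published by the volatile write inside set()
        });
        writer.start();

        Box seen;
        while ((seen = ref.get()) == null) { // volatile read inside get()
            Thread.yield();
        }
        return seen.payload; // the happens-before edge makes the plain write visible
    }

    public static void main(String[] args) {
        System.out.println(publishAndRead());
    }
}
```

   The same visibility would follow from declaring the reference field `volatile` directly; `AtomicReference` adds the CAS-style operations on top.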
   
   







[GitHub] [flink] wpc009 commented on a change in pull request #18475: [FLINK-25728] Protential memory leaks in StreamMultipleInputProcessor

Posted by GitBox <gi...@apache.org>.
wpc009 commented on a change in pull request #18475:
URL: https://github.com/apache/flink/pull/18475#discussion_r794485145



##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamMultipleInputProcessor.java
##########
@@ -157,4 +186,66 @@ private void fullCheckAndSetAvailable() {
             }
         }
     }
+
+    /** Visible for testing only. Do not use out side StreamMultipleInputProcessor. */
+    @VisibleForTesting
+    public static class MultipleInputAvailabilityHelper {
+        private final CompletableFuture<?>[] cachedAvailableFutures;
+        private final Consumer[] onCompletion;
+        private CompletableFuture<?> availableFuture;
+
+        public CompletableFuture<?> getAvailableFuture() {
+            return availableFuture;
+        }
+
+        public static MultipleInputAvailabilityHelper newInstance(int inputSize) {
+            MultipleInputAvailabilityHelper obj = new MultipleInputAvailabilityHelper(inputSize);
+            return obj;
+        }
+
+        private MultipleInputAvailabilityHelper(int inputSize) {
+            this.cachedAvailableFutures = new CompletableFuture[inputSize];
+            this.onCompletion = new Consumer[inputSize];
+        }
+
+        @VisibleForTesting
+        public void init() {
+            for (int i = 0; i < cachedAvailableFutures.length; i++) {
+                final int inputIdx = i;
+                onCompletion[i] = (Void) -> notifyCompletion(inputIdx);
+            }
+        }
+
+        public boolean isInvalid() {
+            return availableFuture == null || availableFuture.isDone();
+        }
+
+        public void resetToUnavailable() {
+            availableFuture = new CompletableFuture<>();
+        }
+
+        private void notifyCompletion(int idx) {
+            if (availableFuture != null && !availableFuture.isDone()) {
+                availableFuture.complete(null);
+            }
+            cachedAvailableFutures[idx] = AVAILABLE;
+        }

Review comment:
       I'm not saying your fix doesn't work. I'm only saying that it still works even without `AtomicReference`.
   >  Yours works quite similar after all, but your version's lack of AtomicReference creates an opportunity, for example in point 3.
   
   Just to clarify: the lack of `AtomicReference` does not affect the result, which is still correct. Both of our fixes work.








[GitHub] [flink] wpc009 commented on pull request #18475: [FLINK-25728] Protential memory leaks in StreamMultipleInputProcessor

Posted by GitBox <gi...@apache.org>.
wpc009 commented on pull request #18475:
URL: https://github.com/apache/flink/pull/18475#issuecomment-1024817221


   I still believe this is a working fix, and no more complex than yours. They are on the same level.
   If the decision has been made, all I can say is that this issue took a lot of effort: tracking down the code, analyzing the heap, working out a solution, and testing it. It is not pleasant to see that effort go for nothing. 





[GitHub] [flink] flinkbot edited a comment on pull request #18475: [FLINK-25728] Protential memory leaks in StreamMultipleInputProcessor

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #18475:
URL: https://github.com/apache/flink/pull/18475#issuecomment-1020104543


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "3ade9719b565beeacf7761ab71d5abb7ba62e62a",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=30045",
       "triggerID" : "3ade9719b565beeacf7761ab71d5abb7ba62e62a",
       "triggerType" : "PUSH"
     }, {
       "hash" : "3ade9719b565beeacf7761ab71d5abb7ba62e62a",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=30045",
       "triggerID" : "1020772870",
       "triggerType" : "MANUAL"
     }, {
       "hash" : "1ede9e74ea4b64e9227ea3230d4adc189356c39c",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=30351",
       "triggerID" : "1ede9e74ea4b64e9227ea3230d4adc189356c39c",
       "triggerType" : "PUSH"
     }, {
       "hash" : "0939ca27ca9eeaa9302561097780f850fec65917",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "0939ca27ca9eeaa9302561097780f850fec65917",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 3ade9719b565beeacf7761ab71d5abb7ba62e62a Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=30045) 
   * 1ede9e74ea4b64e9227ea3230d4adc189356c39c Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=30351) 
   * 0939ca27ca9eeaa9302561097780f850fec65917 UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run azure` re-run the last Azure build
   </details>





[GitHub] [flink] pnowojski commented on a change in pull request #18475: [FLINK-25728] Protential memory leaks in StreamMultipleInputProcessor

Posted by GitBox <gi...@apache.org>.
pnowojski commented on a change in pull request #18475:
URL: https://github.com/apache/flink/pull/18475#discussion_r794332462



##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamMultipleInputProcessor.java
##########
@@ -157,4 +186,66 @@ private void fullCheckAndSetAvailable() {
             }
         }
     }
+
+    /** Visible for testing only. Do not use out side StreamMultipleInputProcessor. */
+    @VisibleForTesting
+    public static class MultipleInputAvailabilityHelper {
+        private final CompletableFuture<?>[] cachedAvailableFutures;
+        private final Consumer[] onCompletion;
+        private CompletableFuture<?> availableFuture;
+
+        public CompletableFuture<?> getAvailableFuture() {
+            return availableFuture;
+        }
+
+        public static MultipleInputAvailabilityHelper newInstance(int inputSize) {
+            MultipleInputAvailabilityHelper obj = new MultipleInputAvailabilityHelper(inputSize);
+            return obj;
+        }
+
+        private MultipleInputAvailabilityHelper(int inputSize) {
+            this.cachedAvailableFutures = new CompletableFuture[inputSize];
+            this.onCompletion = new Consumer[inputSize];
+        }
+
+        @VisibleForTesting
+        public void init() {
+            for (int i = 0; i < cachedAvailableFutures.length; i++) {
+                final int inputIdx = i;
+                onCompletion[i] = (Void) -> notifyCompletion(inputIdx);
+            }
+        }
+
+        public boolean isInvalid() {
+            return availableFuture == null || availableFuture.isDone();
+        }
+
+        public void resetToUnavailable() {
+            availableFuture = new CompletableFuture<>();
+        }
+
+        private void notifyCompletion(int idx) {
+            if (availableFuture != null && !availableFuture.isDone()) {
+                availableFuture.complete(null);
+            }
+            cachedAvailableFutures[idx] = AVAILABLE;
+        }

Review comment:
       I think this code still has the potential for a deadlock, since the thread calling `notifyCompletion()` can see an outdated value of the `availableFuture` field.







[GitHub] [flink] wpc009 edited a comment on pull request #18475: [FLINK-25728] Protential memory leaks in StreamMultipleInputProcessor

Posted by GitBox <gi...@apache.org>.
wpc009 edited a comment on pull request #18475:
URL: https://github.com/apache/flink/pull/18475#issuecomment-1024054444


   > @wpc009 . Could you check my PR if it's solving the memory leak? My proposal is quite a bit simpler.
   > 
   > Regarding the tests, unfortunately I don't see a way for providing a fast and reliable unit test/itcase. The only idea I have is some longer running stress test, but...
   > 
   > Can you provide a steps to reproduce the problem and what JDK version are you using? I've tried running your tests, I have even tried to reproduce the problem on my own, and I could not observe this memory, even after 20 minutes of running a job that should have created a new leaked `CompletableFuture` every ~1ms. The number of `CompletableFuture` (and it's inner classes) remained more or less constant (oscillating a lot, but without a clear upward trend).
   
   You may not see a clear upward trend with a large network buffer, because the leak requires the input to switch between AVAILABLE and UNAVAILABLE quickly. Try reducing the size of the network buffer and increasing the message rate of the high-throughput input.
   You will then see lots of uncollectable `CompletableFuture` and `UniRun` instances.
   
   In a normal production environment, it may take around 1 hour to run out of memory.
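   
   To show why such instances can pile up, here is a minimal sketch under one assumption: that on every availability switch a fresh callback is registered with `thenRun` on an input future that stays incomplete for a long time. The class and method names are hypothetical and this is not the actual Flink code path.

```java
import java.util.concurrent.CompletableFuture;

// Assumption-laden sketch of how dependents can pile up; not the actual Flink code path.
final class DependentLeakSketch {

    /** Registers one callback per "availability switch" on an input that never completes. */
    static int registerCallbacks(int switches) {
        CompletableFuture<Void> idleInput = new CompletableFuture<>(); // long-lived, stays incomplete
        for (int i = 0; i < switches; i++) {
            CompletableFuture<Void> anyAvailable = new CompletableFuture<>();
            // Each call pushes a dependent (a UniRun internally) onto idleInput,
            // which keeps the callback and anyAvailable reachable.
            idleInput.thenRun(() -> anyAvailable.complete(null));
        }
        return idleInput.getNumberOfDependents();
    }

    public static void main(String[] args) {
        System.out.println(registerCallbacks(1000));
    }
}
```

   The dependents are released only when the idle input finally completes, so the faster the other input flips between AVAILABLE and UNAVAILABLE, the faster they accumulate.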





[GitHub] [flink] flinkbot edited a comment on pull request #18475: [FLINK-25728] Protential memory leaks in StreamMultipleInputProcessor

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #18475:
URL: https://github.com/apache/flink/pull/18475#issuecomment-1020104543


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "3ade9719b565beeacf7761ab71d5abb7ba62e62a",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=30045",
       "triggerID" : "3ade9719b565beeacf7761ab71d5abb7ba62e62a",
       "triggerType" : "PUSH"
     }, {
       "hash" : "3ade9719b565beeacf7761ab71d5abb7ba62e62a",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=30045",
       "triggerID" : "1020772870",
       "triggerType" : "MANUAL"
     }, {
       "hash" : "1ede9e74ea4b64e9227ea3230d4adc189356c39c",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=30351",
       "triggerID" : "1ede9e74ea4b64e9227ea3230d4adc189356c39c",
       "triggerType" : "PUSH"
     }, {
       "hash" : "28a8175852359690e62861bd0a4800318cca61dc",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "28a8175852359690e62861bd0a4800318cca61dc",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 3ade9719b565beeacf7761ab71d5abb7ba62e62a Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=30045) 
   * 1ede9e74ea4b64e9227ea3230d4adc189356c39c Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=30351) 
   * 28a8175852359690e62861bd0a4800318cca61dc UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run azure` re-run the last Azure build
   </details>





[GitHub] [flink] flinkbot edited a comment on pull request #18475: [FLINK-25728] Protential memory leaks in StreamMultipleInputProcessor

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #18475:
URL: https://github.com/apache/flink/pull/18475#issuecomment-1020104543


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "3ade9719b565beeacf7761ab71d5abb7ba62e62a",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=30045",
       "triggerID" : "3ade9719b565beeacf7761ab71d5abb7ba62e62a",
       "triggerType" : "PUSH"
     }, {
       "hash" : "3ade9719b565beeacf7761ab71d5abb7ba62e62a",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=30045",
       "triggerID" : "1020772870",
       "triggerType" : "MANUAL"
     }, {
       "hash" : "1ede9e74ea4b64e9227ea3230d4adc189356c39c",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=30351",
       "triggerID" : "1ede9e74ea4b64e9227ea3230d4adc189356c39c",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 3ade9719b565beeacf7761ab71d5abb7ba62e62a Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=30045) 
   * 1ede9e74ea4b64e9227ea3230d4adc189356c39c Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=30351) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run azure` re-run the last Azure build
   </details>





[GitHub] [flink] wpc009 commented on a change in pull request #18475: [FLINK-25728] Protential memory leaks in StreamMultipleInputProcessor

Posted by GitBox <gi...@apache.org>.
wpc009 commented on a change in pull request #18475:
URL: https://github.com/apache/flink/pull/18475#discussion_r794332088



##########
File path: pom.xml
##########
@@ -224,6 +224,14 @@ under the License.
 			<scope>test</scope>
 		</dependency>
 
+		<dependency>

Review comment:
       I forgot to remove this dependency from the pom. I will remove it once the Azure pipeline finishes.







[GitHub] [flink] wpc009 commented on a change in pull request #18475: [FLINK-25728] Protential memory leaks in StreamMultipleInputProcessor

Posted by GitBox <gi...@apache.org>.
wpc009 commented on a change in pull request #18475:
URL: https://github.com/apache/flink/pull/18475#discussion_r794356621



##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamMultipleInputProcessor.java
##########
@@ -157,4 +186,66 @@ private void fullCheckAndSetAvailable() {
             }
         }
     }
+
+    /** Visible for testing only. Do not use out side StreamMultipleInputProcessor. */
+    @VisibleForTesting
+    public static class MultipleInputAvailabilityHelper {
+        private final CompletableFuture<?>[] cachedAvailableFutures;
+        private final Consumer[] onCompletion;
+        private CompletableFuture<?> availableFuture;
+
+        public CompletableFuture<?> getAvailableFuture() {
+            return availableFuture;
+        }
+
+        public static MultipleInputAvailabilityHelper newInstance(int inputSize) {
+            MultipleInputAvailabilityHelper obj = new MultipleInputAvailabilityHelper(inputSize);
+            return obj;
+        }
+
+        private MultipleInputAvailabilityHelper(int inputSize) {
+            this.cachedAvailableFutures = new CompletableFuture[inputSize];
+            this.onCompletion = new Consumer[inputSize];
+        }
+
+        @VisibleForTesting
+        public void init() {
+            for (int i = 0; i < cachedAvailableFutures.length; i++) {
+                final int inputIdx = i;
+                onCompletion[i] = (Void) -> notifyCompletion(inputIdx);
+            }
+        }
+
+        public boolean isInvalid() {
+            return availableFuture == null || availableFuture.isDone();
+        }
+
+        public void resetToUnavailable() {
+            availableFuture = new CompletableFuture<>();
+        }
+
+        private void notifyCompletion(int idx) {
+            if (availableFuture != null && !availableFuture.isDone()) {
+                availableFuture.complete(null);
+            }
+            cachedAvailableFutures[idx] = AVAILABLE;
+        }

Review comment:
   | Benchmark | Mode | Cnt | Score | Error | Units |
   | --------- | ---- | --- | ----- | ----- | ----- |
   | MultipleInputBenchmark.multiInputMapSink | thrpt | 300 | 16874.869 | ± 327.760 | ops/ms |







[GitHub] [flink] wpc009 commented on a change in pull request #18475: [FLINK-25728] Protential memory leaks in StreamMultipleInputProcessor

Posted by GitBox <gi...@apache.org>.
wpc009 commented on a change in pull request #18475:
URL: https://github.com/apache/flink/pull/18475#discussion_r794493752



##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamMultipleInputProcessor.java
##########
@@ -157,4 +186,66 @@ private void fullCheckAndSetAvailable() {
             }
         }
     }
+
+    /** Visible for testing only. Do not use out side StreamMultipleInputProcessor. */
+    @VisibleForTesting
+    public static class MultipleInputAvailabilityHelper {
+        private final CompletableFuture<?>[] cachedAvailableFutures;
+        private final Consumer[] onCompletion;
+        private CompletableFuture<?> availableFuture;
+
+        public CompletableFuture<?> getAvailableFuture() {
+            return availableFuture;
+        }
+
+        public static MultipleInputAvailabilityHelper newInstance(int inputSize) {
+            MultipleInputAvailabilityHelper obj = new MultipleInputAvailabilityHelper(inputSize);
+            return obj;
+        }
+
+        private MultipleInputAvailabilityHelper(int inputSize) {
+            this.cachedAvailableFutures = new CompletableFuture[inputSize];
+            this.onCompletion = new Consumer[inputSize];
+        }
+
+        @VisibleForTesting
+        public void init() {
+            for (int i = 0; i < cachedAvailableFutures.length; i++) {
+                final int inputIdx = i;
+                onCompletion[i] = (Void) -> notifyCompletion(inputIdx);
+            }
+        }
+
+        public boolean isInvalid() {
+            return availableFuture == null || availableFuture.isDone();
+        }
+
+        public void resetToUnavailable() {
+            availableFuture = new CompletableFuture<>();
+        }
+
+        private void notifyCompletion(int idx) {
+            if (availableFuture != null && !availableFuture.isDone()) {
+                availableFuture.complete(null);
+            }
+            cachedAvailableFutures[idx] = AVAILABLE;
+        }

Review comment:
       The only disagreement here is about the use of `AtomicReference`. 
   My opinion is that `AtomicReference` is not crucial in this scenario. Yours seems to be that `AtomicReference` is the missing piece without which my fix cannot produce a correct result.
   I think both of our fixes work. I will make some modifications to address your concern about `volatile`.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] wpc009 commented on pull request #18475: [FLINK-25728] Protential memory leaks in StreamMultipleInputProcessor

Posted by GitBox <gi...@apache.org>.
wpc009 commented on pull request #18475:
URL: https://github.com/apache/flink/pull/18475#issuecomment-1020772870


   > ## CI report:
   > * [3ade971](https://github.com/apache/flink/commit/3ade9719b565beeacf7761ab71d5abb7ba62e62a) Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=30045)
   > 
   > Bot commands
   > The @flinkbot bot supports the following commands:
   > * `@flinkbot run azure` re-run the last Azure build
   
   It seems the e2e test ran out of its time quota, and the pipeline killed the unfinished test job.





[GitHub] [flink] pnowojski commented on a change in pull request #18475: [FLINK-25728] Protential memory leaks in StreamMultipleInputProcessor

Posted by GitBox <gi...@apache.org>.
pnowojski commented on a change in pull request #18475:
URL: https://github.com/apache/flink/pull/18475#discussion_r792408057



##########
File path: flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/MultipleInputStreamTaskTest.java
##########
@@ -101,10 +101,15 @@
 import org.junit.Before;
 import org.junit.Rule;
 import org.junit.Test;
+import org.junit.jupiter.api.Assertions;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
 import org.junit.runners.Parameterized.Parameter;
 import org.junit.runners.Parameterized.Parameters;
+import org.mockito.Mockito;
+import org.powermock.api.mockito.PowerMockito;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.rule.PowerMockRule;

Review comment:
       That is part of the reason why we generally do not accept new test code using Mockito: tests shouldn't rely on private implementation details of the production code.

##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamMultipleInputProcessor.java
##########
@@ -55,17 +77,24 @@ public StreamMultipleInputProcessor(
                 || inputSelectionHandler.areAllInputsFinished()) {
             return AVAILABLE;
         }
-        final CompletableFuture<?> anyInputAvailable = new CompletableFuture<>();
         for (int i = 0; i < inputProcessors.length; i++) {
             if (!inputSelectionHandler.isInputFinished(i)
-                    && inputSelectionHandler.isInputSelected(i)) {
-                assertNoException(
-                        inputProcessors[i]
-                                .getAvailableFuture()
-                                .thenRun(() -> anyInputAvailable.complete(null)));
+                    && inputSelectionHandler.isInputSelected(i)
+                    && inputProcessors[i].getAvailableFuture() == AVAILABLE) {
+                return AVAILABLE;

Review comment:
       But
   1. In what scenario does this extra shortcut do something more than the old check?
   2. If there is such a scenario, have you tested that it's worth the added complexity?







[GitHub] [flink] pnowojski commented on pull request #18475: [FLINK-25728] Protential memory leaks in StreamMultipleInputProcessor

Posted by GitBox <gi...@apache.org>.
pnowojski commented on pull request #18475:
URL: https://github.com/apache/flink/pull/18475#issuecomment-1021298130


   I'm afraid that those "time outs" might be because of some bug in this code causing a deadlock. I've also tried [to run a benchmark request](http://codespeed.dak8s.net:8080/job/flink-benchmark-request/601/) on this PR and the `MultipleInputBenchmark` benchmark run (from this repo https://github.com/apache/flink-benchmarks/) has also deadlocked.
   
   





[GitHub] [flink] wpc009 commented on pull request #18475: [FLINK-25728] Protential memory leaks in StreamMultipleInputProcessor

Posted by GitBox <gi...@apache.org>.
wpc009 commented on pull request #18475:
URL: https://github.com/apache/flink/pull/18475#issuecomment-1022813985


   It seems that newer JDK releases have already fixed the memory issue in `CompletableFuture.anyOf`. 
   Sadly, the issue still exists in JDK 8. It's time to abandon this old truck.
   https://bugs.openjdk.java.net/browse/JDK-8160402





[GitHub] [flink] flinkbot edited a comment on pull request #18475: [FLINK-25728] Protential memory leaks in StreamMultipleInputProcessor

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #18475:
URL: https://github.com/apache/flink/pull/18475#issuecomment-1020104543


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "3ade9719b565beeacf7761ab71d5abb7ba62e62a",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=30045",
       "triggerID" : "3ade9719b565beeacf7761ab71d5abb7ba62e62a",
       "triggerType" : "PUSH"
     }, {
       "hash" : "3ade9719b565beeacf7761ab71d5abb7ba62e62a",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=30045",
       "triggerID" : "1020772870",
       "triggerType" : "MANUAL"
     } ]
   }-->
   ## CI report:
   
   * 3ade9719b565beeacf7761ab71d5abb7ba62e62a Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=30045) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run azure` re-run the last Azure build
   </details>





[GitHub] [flink] wpc009 edited a comment on pull request #18475: [FLINK-25728] Protential memory leaks in StreamMultipleInputProcessor

Posted by GitBox <gi...@apache.org>.
wpc009 edited a comment on pull request #18475:
URL: https://github.com/apache/flink/pull/18475#issuecomment-1020772870


   > ## CI report:
   > * [3ade971](https://github.com/apache/flink/commit/3ade9719b565beeacf7761ab71d5abb7ba62e62a) Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=30045)
   > 
   > Bot commands
   > The @flinkbot bot supports the following commands:
   > * `@flinkbot run azure` re-run the last Azure build
   
   It seems that the **e2e_2_ci** test ran out of its time quota, and the pipeline killed the unfinished test job.
   I have run the tests in my local environment, and all of them passed. 
   I do not know what is going wrong.





[GitHub] [flink] wpc009 edited a comment on pull request #18475: [FLINK-25728] Protential memory leaks in StreamMultipleInputProcessor

Posted by GitBox <gi...@apache.org>.
wpc009 edited a comment on pull request #18475:
URL: https://github.com/apache/flink/pull/18475#issuecomment-1024372868


   > As far as I can tell, the case in the 3rd chart is not working in your version. If `Complete` action happens at the very very end (around `to here` arrow), there is no guarantee that it will "complete the newly created future". As you haven't established happens-before relationship between `Reset` and `Complete` actions, `Complete` might see an outdated, already completed, version of the `availableFuture`, failing to wake up the task thread.
   
   Can I understand it this way? You are saying that if `Complete` happens at the very end, then without `volatile` the thread running `notifyCompletion` might not see the new `availableFuture`. It would try to complete the obsolete future and leave the newly created one untouched, which causes the hang. 
   
   I agree that this could happen without `volatile`. But before `notifyCompletion` is entered, the input's future is already completed. At the `anyOf` stage, the input's future is checked and found to be completed, which triggers the following `availableFuture.complete()`. So it will not hang. 
   
   I can make `availableFuture` `volatile`. That way, `Complete` will not be able to see the obsolete future.
   
   I have been running the `MultipleInputBenchmark` test for hours, and no stream hang has shown up.
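   
   The argument above (a completion that lands on the obsolete future is not lost, because each input's own future is checked again after the reset) can be sketched as follows. This is a minimal illustration under stated assumptions, not the code of this PR: `RecheckAfterResetSketch` and its methods are hypothetical names, and a plain loop stands in for the `anyOf` stage.

```java
import java.util.concurrent.CompletableFuture;

/** Hypothetical sketch: install a fresh combined future, then re-check every input. */
public class RecheckAfterResetSketch {
    // volatile: callback threads that read the field after the write see the fresh future
    private volatile CompletableFuture<Void> availableFuture = new CompletableFuture<>();

    /** Called by the task thread: reset first, then look at the inputs again. */
    public CompletableFuture<Void> resetAndRecheck(CompletableFuture<?>[] inputs) {
        CompletableFuture<Void> fresh = new CompletableFuture<>();
        availableFuture = fresh;
        for (CompletableFuture<?> input : inputs) {
            if (input.isDone()) {
                // This input became available earlier; its callback may have completed
                // the previous (now obsolete) future, so complete the fresh one too.
                fresh.complete(null);
                break;
            }
        }
        return fresh;
    }

    /** Called from an input's completion callback. */
    public void notifyCompletion() {
        availableFuture.complete(null);
    }

    public static void main(String[] args) {
        RecheckAfterResetSketch helper = new RecheckAfterResetSketch();
        CompletableFuture<Void> input = CompletableFuture.completedFuture(null);
        CompletableFuture<Void> combined =
                helper.resetAndRecheck(new CompletableFuture<?>[] {input});
        System.out.println("combined future done: " + combined.isDone());
    }
}
```

   The order matters in this sketch: the fresh future is published before the inputs are checked, so a callback that fires later completes the fresh future and a callback that fired earlier is caught by the loop.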
   <img width="297" alt="image" src="https://user-images.githubusercontent.com/2689362/151582292-01aacf62-2bff-4fb7-a160-ec40552e69cd.png">





[GitHub] [flink] pnowojski commented on pull request #18475: [FLINK-25728] Protential memory leaks in StreamMultipleInputProcessor

Posted by GitBox <gi...@apache.org>.
pnowojski commented on pull request #18475:
URL: https://github.com/apache/flink/pull/18475#issuecomment-1025964209


   Merged to master as cb8c6567d205e21d844679220d727379409c6b24





[GitHub] [flink] wpc009 edited a comment on pull request #18475: [FLINK-25728] Protential memory leaks in StreamMultipleInputProcessor

Posted by GitBox <gi...@apache.org>.
wpc009 edited a comment on pull request #18475:
URL: https://github.com/apache/flink/pull/18475#issuecomment-1023885086


   > Good find @wpc009, but unfortunately we have to support JDK8 for at least a couple of more releases.
   > 
   > I'm testing another approach to solve this problem here: #18538 hopefully without deadlocks.
   > 
   > Please @wpc009 let me know what you think about it.
   
   Sure thing.
   By the way, I came up with a solution to the deadlocks. It has been tested with `flink-benchmark` hundreds of times. 
   I'm preparing some end-to-end tests and will push them at the end of today.
   





[GitHub] [flink] wpc009 edited a comment on pull request #18475: [FLINK-25728] Protential memory leaks in StreamMultipleInputProcessor

Posted by GitBox <gi...@apache.org>.
wpc009 edited a comment on pull request #18475:
URL: https://github.com/apache/flink/pull/18475#issuecomment-1024054444


   > 
   
   
   
   > @wpc009 . Could you check my PR if it's solving the memory leak? My proposal is quite a bit simpler.
   > 
   > Regarding the tests, unfortunately I don't see a way for providing a fast and reliable unit test/itcase. The only idea I have is some longer running stress test, but...
   > 
   > Can you provide a steps to reproduce the problem and what JDK version are you using? I've tried running your tests, I have even tried to reproduce the problem on my own, and I could not observe this memory, even after 20 minutes of running a job that should have created a new leaked `CompletableFuture` every ~1ms. The number of `CompletableFuture` (and it's inner classes) remained more or less constant (oscillating a lot, but without a clear upward trend).
   
   You may not see a clear upward trend with a large network buffer, since the leak needs the input to switch between AVAILABLE and UNAVAILABLE quickly. Try reducing the size of the network buffer and increasing the message rate of the high-throughput input. Then you will see lots of uncollectable `CompletableFuture` and `UniRun` instances, and their number keeps increasing.
   Eventually there will be an OOM.
   
   In a normal production environment, it may take around 1 hour to run out of memory.
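   
   The accumulation itself can be observed in isolation with plain `CompletableFuture`s. The sketch below is an illustration under stated assumptions rather than a Flink reproduction: `DependentAccumulationSketch` and `registerRepeatedly` are hypothetical names, and a future that never completes stands in for an input that stays unavailable while another input keeps waking up the task. Each round registers one more callback on it, in the same shape as the removed `thenRun(() -> anyInputAvailable.complete(null))` call.

```java
import java.util.concurrent.CompletableFuture;

/** Hypothetical sketch: callbacks pile up on a future that never completes. */
public class DependentAccumulationSketch {

    /** Registers one callback per round and returns the dependents still retained. */
    public static int registerRepeatedly(int rounds) {
        CompletableFuture<Void> idleInput = new CompletableFuture<>();
        for (int i = 0; i < rounds; i++) {
            CompletableFuture<Void> anyInputAvailable = new CompletableFuture<>();
            // The callback (and the future it captures) stays reachable from idleInput.
            idleInput.thenRun(() -> anyInputAvailable.complete(null));
            // Some other input wakes up the task, so this round's future is done,
            // but the callback registered above is not removed from idleInput.
            anyInputAvailable.complete(null);
        }
        // Per the Javadoc this is an estimate; here nothing runs concurrently.
        return idleInput.getNumberOfDependents();
    }

    public static void main(String[] args) {
        System.out.println("retained dependents: " + registerRepeatedly(1000));
    }
}
```

   In a heap dump, each retained dependent of this kind shows up as a `UniRun` plus the `CompletableFuture` instances it references, which matches the instance types named above.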





[GitHub] [flink] wpc009 commented on a change in pull request #18475: [FLINK-25728] Protential memory leaks in StreamMultipleInputProcessor

Posted by GitBox <gi...@apache.org>.
wpc009 commented on a change in pull request #18475:
URL: https://github.com/apache/flink/pull/18475#discussion_r794451818



##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamMultipleInputProcessor.java
##########
@@ -157,4 +186,66 @@ private void fullCheckAndSetAvailable() {
             }
         }
     }
+
+    /** Visible for testing only. Do not use out side StreamMultipleInputProcessor. */
+    @VisibleForTesting
+    public static class MultipleInputAvailabilityHelper {
+        private final CompletableFuture<?>[] cachedAvailableFutures;
+        private final Consumer[] onCompletion;
+        private CompletableFuture<?> availableFuture;
+
+        public CompletableFuture<?> getAvailableFuture() {
+            return availableFuture;
+        }
+
+        public static MultipleInputAvailabilityHelper newInstance(int inputSize) {
+            MultipleInputAvailabilityHelper obj = new MultipleInputAvailabilityHelper(inputSize);
+            return obj;
+        }
+
+        private MultipleInputAvailabilityHelper(int inputSize) {
+            this.cachedAvailableFutures = new CompletableFuture[inputSize];
+            this.onCompletion = new Consumer[inputSize];
+        }
+
+        @VisibleForTesting
+        public void init() {
+            for (int i = 0; i < cachedAvailableFutures.length; i++) {
+                final int inputIdx = i;
+                onCompletion[i] = (Void) -> notifyCompletion(inputIdx);
+            }
+        }
+
+        public boolean isInvalid() {
+            return availableFuture == null || availableFuture.isDone();
+        }
+
+        public void resetToUnavailable() {
+            availableFuture = new CompletableFuture<>();
+        }
+
+        private void notifyCompletion(int idx) {
+            if (availableFuture != null && !availableFuture.isDone()) {
+                availableFuture.complete(null);
+            }
+            cachedAvailableFutures[idx] = AVAILABLE;
+        }

Review comment:
       In this scenario, we may not benefit much from a `volatile` field, since it only prevents stale cached values between CPU cores. 
   
   > So I don't see any race condition in my version. Yours works quite similar after all, but your version's lack of AtomicReference creates an opportunity, for example in point 3., that if input becomes available, it will attempt to complete wrong, obsolete, old, already completed future.
   
   Even with `volatile`, the future completion can still happen in the middle of the `maybeReset()` call (say, after the `isDone()` check and before the `set` operation), and the completion callback will see the obsolete, old, already completed `anyAvailable` future and try to complete it.
   It makes no difference; `AtomicReference` does not prevent this. `AtomicReference`'s `set` and `get` are plain methods on a plain object. Without a `CAS` operation, the only difference here is the `volatile` field. But since we cannot prevent the race condition, the future completion still has a chance to see the old future object. So `volatile` is the least of our concerns here.
   But anyway, there is no harm in making `availableFuture` `volatile`.
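   
   For reference, `AtomicReference.get()` and `set()` have the memory effects of a volatile read and write; what `AtomicReference` adds on top of a `volatile` field is `compareAndSet`. A minimal sketch of a CAS-based reset follows. It is not the code of either PR: `CasResetSketch`, `maybeReset` and `notifyAvailable` are hypothetical names chosen for illustration.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;

/** Hypothetical sketch: replace a completed future using compareAndSet. */
public class CasResetSketch {
    private final AtomicReference<CompletableFuture<Void>> anyAvailable =
            new AtomicReference<>(new CompletableFuture<>());

    /** Replaces a completed future with a fresh one and returns the current future. */
    public CompletableFuture<Void> maybeReset() {
        CompletableFuture<Void> current = anyAvailable.get();
        if (!current.isDone()) {
            return current;
        }
        CompletableFuture<Void> fresh = new CompletableFuture<>();
        // Installs `fresh` only if no other thread replaced `current` since the get() above.
        return anyAvailable.compareAndSet(current, fresh) ? fresh : anyAvailable.get();
    }

    /** Completion callback: completes whichever future it reads at this moment. */
    public void notifyAvailable() {
        anyAvailable.get().complete(null);
    }

    public static void main(String[] args) {
        CasResetSketch sketch = new CasResetSketch();
        sketch.notifyAvailable();
        System.out.println("fresh future done: " + sketch.maybeReset().isDone());
    }
}
```

   Note that the CAS only arbitrates between concurrent resets. A callback that reads the reference just before a reset can still complete the old future, which is the window described above; closing it needs an additional step, such as checking the inputs again after the reset.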







[GitHub] [flink] wpc009 commented on a change in pull request #18475: [FLINK-25728] Protential memory leaks in StreamMultipleInputProcessor

Posted by GitBox <gi...@apache.org>.
wpc009 commented on a change in pull request #18475:
URL: https://github.com/apache/flink/pull/18475#discussion_r794451818



##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamMultipleInputProcessor.java
##########
@@ -157,4 +186,66 @@ private void fullCheckAndSetAvailable() {
             }
         }
     }
+
+    /** Visible for testing only. Do not use out side StreamMultipleInputProcessor. */
+    @VisibleForTesting
+    public static class MultipleInputAvailabilityHelper {
+        private final CompletableFuture<?>[] cachedAvailableFutures;
+        private final Consumer[] onCompletion;
+        private CompletableFuture<?> availableFuture;
+
+        public CompletableFuture<?> getAvailableFuture() {
+            return availableFuture;
+        }
+
+        public static MultipleInputAvailabilityHelper newInstance(int inputSize) {
+            MultipleInputAvailabilityHelper obj = new MultipleInputAvailabilityHelper(inputSize);
+            return obj;
+        }
+
+        private MultipleInputAvailabilityHelper(int inputSize) {
+            this.cachedAvailableFutures = new CompletableFuture[inputSize];
+            this.onCompletion = new Consumer[inputSize];
+        }
+
+        @VisibleForTesting
+        public void init() {
+            for (int i = 0; i < cachedAvailableFutures.length; i++) {
+                final int inputIdx = i;
+                onCompletion[i] = (Void) -> notifyCompletion(inputIdx);
+            }
+        }
+
+        public boolean isInvalid() {
+            return availableFuture == null || availableFuture.isDone();
+        }
+
+        public void resetToUnavailable() {
+            availableFuture = new CompletableFuture<>();
+        }
+
+        private void notifyCompletion(int idx) {
+            if (availableFuture != null && !availableFuture.isDone()) {
+                availableFuture.complete(null);
+            }
+            cachedAvailableFutures[idx] = AVAILABLE;
+        }

Review comment:
       In this scenario, we may not benefit much from a `volatile` field, since it only prevents stale cached values between CPU cores. 
   
   > So I don't see any race condition in my version. Yours works quite similar after all, but your version's lack of AtomicReference creates an opportunity, for example in point 3., that if input becomes available, it will attempt to complete wrong, obsolete, old, already completed future.
   
   Even with `volatile`, the future completion can still happen in the middle of the `maybeReset()` call, and the completion callback will see the obsolete, old, already completed `anyAvailable` future and try to complete it.
   It makes no difference; `AtomicReference` does not prevent this. `AtomicReference`'s `set` and `get` are plain methods on a plain object, without a `CAS` operation. The only difference here is the `volatile` field. But since we cannot prevent the race condition here, the future completion still has a chance to see the old future object. So `volatile` is the least of our concerns here.
   But anyway, there is no harm in making `availableFuture` `volatile`.







[GitHub] [flink] flinkbot edited a comment on pull request #18475: [FLINK-25728] Protential memory leaks in StreamMultipleInputProcessor

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #18475:
URL: https://github.com/apache/flink/pull/18475#issuecomment-1020104543


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "3ade9719b565beeacf7761ab71d5abb7ba62e62a",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=30045",
       "triggerID" : "3ade9719b565beeacf7761ab71d5abb7ba62e62a",
       "triggerType" : "PUSH"
     }, {
       "hash" : "3ade9719b565beeacf7761ab71d5abb7ba62e62a",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=30045",
       "triggerID" : "1020772870",
       "triggerType" : "MANUAL"
     }, {
       "hash" : "1ede9e74ea4b64e9227ea3230d4adc189356c39c",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=30351",
       "triggerID" : "1ede9e74ea4b64e9227ea3230d4adc189356c39c",
       "triggerType" : "PUSH"
     }, {
       "hash" : "4c368879072e4ae60cac37aaa1b7a49454a95fda",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=30357",
       "triggerID" : "4c368879072e4ae60cac37aaa1b7a49454a95fda",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 3ade9719b565beeacf7761ab71d5abb7ba62e62a Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=30045) 
   * 1ede9e74ea4b64e9227ea3230d4adc189356c39c Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=30351) 
   * 4c368879072e4ae60cac37aaa1b7a49454a95fda Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=30357) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run azure` re-run the last Azure build
   </details>





[GitHub] [flink] wpc009 commented on a change in pull request #18475: [FLINK-25728] Protential memory leaks in StreamMultipleInputProcessor

Posted by GitBox <gi...@apache.org>.
wpc009 commented on a change in pull request #18475:
URL: https://github.com/apache/flink/pull/18475#discussion_r794493752



##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamMultipleInputProcessor.java
##########
@@ -157,4 +186,66 @@ private void fullCheckAndSetAvailable() {
             }
         }
     }
+
+    /** Visible for testing only. Do not use out side StreamMultipleInputProcessor. */
+    @VisibleForTesting
+    public static class MultipleInputAvailabilityHelper {
+        private final CompletableFuture<?>[] cachedAvailableFutures;
+        private final Consumer[] onCompletion;
+        private CompletableFuture<?> availableFuture;
+
+        public CompletableFuture<?> getAvailableFuture() {
+            return availableFuture;
+        }
+
+        public static MultipleInputAvailabilityHelper newInstance(int inputSize) {
+            MultipleInputAvailabilityHelper obj = new MultipleInputAvailabilityHelper(inputSize);
+            return obj;
+        }
+
+        private MultipleInputAvailabilityHelper(int inputSize) {
+            this.cachedAvailableFutures = new CompletableFuture[inputSize];
+            this.onCompletion = new Consumer[inputSize];
+        }
+
+        @VisibleForTesting
+        public void init() {
+            for (int i = 0; i < cachedAvailableFutures.length; i++) {
+                final int inputIdx = i;
+                onCompletion[i] = (Void) -> notifyCompletion(inputIdx);
+            }
+        }
+
+        public boolean isInvalid() {
+            return availableFuture == null || availableFuture.isDone();
+        }
+
+        public void resetToUnavailable() {
+            availableFuture = new CompletableFuture<>();
+        }
+
+        private void notifyCompletion(int idx) {
+            if (availableFuture != null && !availableFuture.isDone()) {
+                availableFuture.complete(null);
+            }
+            cachedAvailableFutures[idx] = AVAILABLE;
+        }

Review comment:
       The only divergence here is on the use of `AtomicReference`. 
   My opinion is that `AtomicReference` is not crucial in this scenario. Yours seems to be that `AtomicReference` is the missing piece of my fix, without which it cannot produce a correct result.
   I think both of our fixes work. I will make one last modification to address your concern about `volatile`.







[GitHub] [flink] wpc009 commented on pull request #18475: [FLINK-25728] Protential memory leaks in StreamMultipleInputProcessor

Posted by GitBox <gi...@apache.org>.
wpc009 commented on pull request #18475:
URL: https://github.com/apache/flink/pull/18475#issuecomment-1024349502


   BellSoft Liberica jdk8u292.
   <img width="723" alt="image" src="https://user-images.githubusercontent.com/2689362/151578190-851c24c5-f7ba-4f4d-b1b7-03ea96e45fd6.png">
   





[GitHub] [flink] flinkbot edited a comment on pull request #18475: [FLINK-25728] Protential memory leaks in StreamMultipleInputProcessor

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #18475:
URL: https://github.com/apache/flink/pull/18475#issuecomment-1020104543


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "3ade9719b565beeacf7761ab71d5abb7ba62e62a",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=30045",
       "triggerID" : "3ade9719b565beeacf7761ab71d5abb7ba62e62a",
       "triggerType" : "PUSH"
     }, {
       "hash" : "3ade9719b565beeacf7761ab71d5abb7ba62e62a",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=30045",
       "triggerID" : "1020772870",
       "triggerType" : "MANUAL"
     }, {
       "hash" : "1ede9e74ea4b64e9227ea3230d4adc189356c39c",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=30351",
       "triggerID" : "1ede9e74ea4b64e9227ea3230d4adc189356c39c",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 3ade9719b565beeacf7761ab71d5abb7ba62e62a Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=30045) 
   * 1ede9e74ea4b64e9227ea3230d4adc189356c39c Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=30351) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run azure` re-run the last Azure build
   </details>





[GitHub] [flink] pnowojski commented on pull request #18475: [FLINK-25728] Protential memory leaks in StreamMultipleInputProcessor

Posted by GitBox <gi...@apache.org>.
pnowojski commented on pull request #18475:
URL: https://github.com/apache/flink/pull/18475#issuecomment-1025585234


   Your efforts didn't go for nothing. We deeply appreciate that you have reported and analysed this bug. A half year ago another user reported similar symptoms, but neither he nor we were able to track it down back then. Analysing it was definitely the most valuable and important part of this issue.
   
   Apart from the things that I've already commented on, there are a couple of other smaller (stylistic) issues. We will also potentially need to deduplicate this code with a fix for FLINK-25827. To speed things up, I will take over your commit and drop the tests (we will need to reimplement them in https://issues.apache.org/jira/browse/FLINK-25869 ), while merging most of your production code as it is. 
   
   >  I couldn't see how can the callback get the obsolete object. Dirty inconsistence between CPU cache cannot last that long. The callback will see the correct object.
   
   I'm afraid you still don't quite get it. If you are further curious about the subject, please search around for some other resources. 





[GitHub] [flink] wpc009 edited a comment on pull request #18475: [FLINK-25728] Protential memory leaks in StreamMultipleInputProcessor

Posted by GitBox <gi...@apache.org>.
wpc009 edited a comment on pull request #18475:
URL: https://github.com/apache/flink/pull/18475#issuecomment-1020772870


   > ## CI report:
   > * [3ade971](https://github.com/apache/flink/commit/3ade9719b565beeacf7761ab71d5abb7ba62e62a) Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=30045)
   > 
   > Bot commands
   > The @flinkbot bot supports the following commands:
   > * `@flinkbot run azure` re-run the last Azure build
   
   It seems that the **e2e_2_ci** test ran out of its time quota, and the pipeline killed the unfinished test job. 





[GitHub] [flink] wpc009 edited a comment on pull request #18475: [FLINK-25728] Protential memory leaks in StreamMultipleInputProcessor

Posted by GitBox <gi...@apache.org>.
wpc009 edited a comment on pull request #18475:
URL: https://github.com/apache/flink/pull/18475#issuecomment-1021815826


   The `release-1.14` branch cannot pass the CI pipeline either.
   Check this out:
   https://dev.azure.com/wpc009/Flink/_build/results?buildId=5&view=results
   
   What can I do to pass these tests?
   





[GitHub] [flink] pnowojski commented on a change in pull request #18475: [FLINK-25728] Protential memory leaks in StreamMultipleInputProcessor

Posted by GitBox <gi...@apache.org>.
pnowojski commented on a change in pull request #18475:
URL: https://github.com/apache/flink/pull/18475#discussion_r791869485



##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamMultipleInputProcessor.java
##########
@@ -55,17 +77,24 @@ public StreamMultipleInputProcessor(
                 || inputSelectionHandler.areAllInputsFinished()) {
             return AVAILABLE;
         }
-        final CompletableFuture<?> anyInputAvailable = new CompletableFuture<>();
         for (int i = 0; i < inputProcessors.length; i++) {
             if (!inputSelectionHandler.isInputFinished(i)
-                    && inputSelectionHandler.isInputSelected(i)) {
-                assertNoException(
-                        inputProcessors[i]
-                                .getAvailableFuture()
-                                .thenRun(() -> anyInputAvailable.complete(null)));
+                    && inputSelectionHandler.isInputSelected(i)
+                    && inputProcessors[i].getAvailableFuture() == AVAILABLE) {
+                return AVAILABLE;

Review comment:
       Isn't this a duplicate of `inputSelectionHandler.isAnyInputAvailable()` check from the `if` statement above (L54:56 in the original code)?

##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamMultipleInputProcessor.java
##########
@@ -157,4 +186,66 @@ private void fullCheckAndSetAvailable() {
             }
         }
     }
+
+    /** Visible for testing only. Do not use out side StreamMultipleInputProcessor. */
+    @VisibleForTesting
+    public static class MultipleInputAvailabilityHelper {
+        private final CompletableFuture<?>[] cachedAvailableFutures;
+        private final Consumer[] onCompletion;
+        private CompletableFuture<?> availableFuture;
+
+        public CompletableFuture<?> getAvailableFuture() {
+            return availableFuture;
+        }
+
+        public static MultipleInputAvailabilityHelper newInstance(int inputSize) {
+            MultipleInputAvailabilityHelper obj = new MultipleInputAvailabilityHelper(inputSize);
+            return obj;
+        }
+
+        private MultipleInputAvailabilityHelper(int inputSize) {
+            this.cachedAvailableFutures = new CompletableFuture[inputSize];
+            this.onCompletion = new Consumer[inputSize];
+        }
+
+        @VisibleForTesting
+        public void init() {
+            for (int i = 0; i < cachedAvailableFutures.length; i++) {
+                final int inputIdx = i;
+                onCompletion[i] = (Void) -> notifyCompletion(inputIdx);
+            }
+        }
+
+        public boolean isInvalid() {
+            return availableFuture == null || availableFuture.isDone();
+        }
+
+        public void resetToUnavailable() {
+            availableFuture = new CompletableFuture<>();
+        }
+
+        private void notifyCompletion(int idx) {
+            if (availableFuture != null && !availableFuture.isDone()) {
+                availableFuture.complete(null);
+            }
+            cachedAvailableFutures[idx] = AVAILABLE;
+        }

Review comment:
       My guess is that this code might be the reason behind the deadlocks. `notifyCompletion()` is executed from different threads (whoever is completing the `StreamOneInputProcessor` availability future) while it accesses non-thread-safe fields here.
   
   I haven't yet thought how it should be fixed, but we should avoid adding extra synchronisation overhead in this code.
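
A minimal sketch of the threading concern raised above, using only `java.util.concurrent.CompletableFuture` and no Flink classes. The class and method names (`CallbackThreadDemo`, `callbackThreadName`) and the thread name `completer` are hypothetical and exist only for illustration. The sketch registers a callback on one thread and completes the future on another, then reports which thread actually ran the callback.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;

final class CallbackThreadDemo {

    /** Returns the name of the thread that executed the completion callback. */
    static String callbackThreadName() {
        CompletableFuture<Void> inputAvailable = new CompletableFuture<>();
        AtomicReference<String> ranOn = new AtomicReference<>();

        // Registered by the calling thread while the future is still incomplete.
        inputAvailable.thenRun(() -> ranOn.set(Thread.currentThread().getName()));

        // A different thread completes the future, standing in for
        // "whoever is completing the availability future".
        Thread completer = new Thread(() -> inputAvailable.complete(null), "completer");
        completer.start();
        try {
            completer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return ranOn.get();
    }

    public static void main(String[] args) {
        System.out.println(callbackThreadName());
    }
}
```

Because the callback here is registered before the only `complete` call, it runs on the completing thread rather than on the thread that registered it. Any plain field the callback touches is therefore shared between threads, which is the situation the review comment describes.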

##########
File path: flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/MultipleInputStreamTaskTest.java
##########
@@ -101,10 +101,15 @@
 import org.junit.Before;
 import org.junit.Rule;
 import org.junit.Test;
+import org.junit.jupiter.api.Assertions;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
 import org.junit.runners.Parameterized.Parameter;
 import org.junit.runners.Parameterized.Parameters;
+import org.mockito.Mockito;
+import org.powermock.api.mockito.PowerMockito;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.rule.PowerMockRule;

Review comment:
       Could this test be rewritten to avoid using mockito?
   
   Generally speaking, we [heavily discourage Mockito usage](https://flink.apache.org/contributing/code-style-and-quality-common.html#avoid-mockito---use-reusable-test-implementations).

##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamMultipleInputProcessor.java
##########
@@ -47,6 +49,26 @@ public StreamMultipleInputProcessor(
             StreamOneInputProcessor<?>[] inputProcessors) {
         this.inputSelectionHandler = inputSelectionHandler;
         this.inputProcessors = inputProcessors;
+        this.availabilityHelper =
+                MultipleInputAvailabilityHelper.newInstance(inputProcessors.length);
+        this.availabilityHelper.init();
+    }
+
+    @Override
+    public boolean isAvailable() {
+        if (inputSelectionHandler.isAnyInputAvailable()
+                || inputSelectionHandler.areAllInputsFinished()) {
+            return true;
+        } else {
+            boolean isAvailable = false;
+            for (int i = 0; i < inputProcessors.length; i++) {
+                isAvailable =
+                        !inputSelectionHandler.isInputFinished(i)
+                                && inputSelectionHandler.isInputSelected(i)
+                                && inputProcessors[i].isAvailable();
+            }
+            return isAvailable;

Review comment:
       The same question as below. Isn't this whole loop a duplicated check from the `inputSelectionHandler.isAnyInputAvailable()` condition? In other words, couldn't we just simply `return false` in this `else` branch?







[GitHub] [flink] pnowojski closed pull request #18475: [FLINK-25728] Protential memory leaks in StreamMultipleInputProcessor

Posted by GitBox <gi...@apache.org>.
pnowojski closed pull request #18475:
URL: https://github.com/apache/flink/pull/18475


   





[GitHub] [flink] wpc009 edited a comment on pull request #18475: [FLINK-25728] Protential memory leaks in StreamMultipleInputProcessor

Posted by GitBox <gi...@apache.org>.
wpc009 edited a comment on pull request #18475:
URL: https://github.com/apache/flink/pull/18475#issuecomment-1024815295


   Suppose `Complete` denotes the completion of the input future. (That is a little different from my graph, where it refers to the line that calls `complete` on the result future.)
   Then, when `anyOf` is being executed, the input future is not ready yet, and it completes some time after that. I can't see how the callback could get the obsolete object. A stale inconsistency between CPU caches cannot last that long, so the callback will see the correct object. Anyway, I have already changed `availableFuture` to `volatile`, which makes this concern go away.





[GitHub] [flink] flinkbot edited a comment on pull request #18475: [FLINK-25728] Protential memory leaks in StreamMultipleInputProcessor

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #18475:
URL: https://github.com/apache/flink/pull/18475#issuecomment-1020104543


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "3ade9719b565beeacf7761ab71d5abb7ba62e62a",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=30045",
       "triggerID" : "3ade9719b565beeacf7761ab71d5abb7ba62e62a",
       "triggerType" : "PUSH"
     }, {
       "hash" : "3ade9719b565beeacf7761ab71d5abb7ba62e62a",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=30045",
       "triggerID" : "1020772870",
       "triggerType" : "MANUAL"
     }, {
       "hash" : "1ede9e74ea4b64e9227ea3230d4adc189356c39c",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=30351",
       "triggerID" : "1ede9e74ea4b64e9227ea3230d4adc189356c39c",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 3ade9719b565beeacf7761ab71d5abb7ba62e62a Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=30045) 
   * 1ede9e74ea4b64e9227ea3230d4adc189356c39c Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=30351) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run azure` re-run the last Azure build
   </details>





[GitHub] [flink] pnowojski edited a comment on pull request #18475: [FLINK-25728] Protential memory leaks in StreamMultipleInputProcessor

Posted by GitBox <gi...@apache.org>.
pnowojski edited a comment on pull request #18475:
URL: https://github.com/apache/flink/pull/18475#issuecomment-1024187218


   > I have added an e2e test for this. It will fail with an OOM error on the most recent 1.14.3 release.
   
   I ran your test locally for over 20 minutes without any problems.
   
   Can you post which JDK version you are using? How are you executing this test? Are you using some extra steps, maybe manually changing some configuration or environment variables?
   
   Also, shouldn't the test disable checkpointing? Otherwise I think every `CheckpointBarrier` would switch every input to available, releasing all of the accumulated `CompletableFuture`s.
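
The accumulation mentioned above can be sketched with plain `CompletableFuture`s. This is an illustration under assumptions, not the Flink code: the class name `DependentAccumulationDemo`, the method name and the `polls` parameter are hypothetical, and `inputAvailable` stands in for the availability future of an input that stays unavailable for a long time. Each simulated poll registers a new `thenRun` callback on it, following the pattern of the code removed by this pull request.

```java
import java.util.concurrent.CompletableFuture;

final class DependentAccumulationDemo {

    /** Returns {dependents before completion, dependents after completion}. */
    static int[] dependentsBeforeAndAfter(int polls) {
        // Stands in for an input that does not become available for a long time.
        CompletableFuture<Void> inputAvailable = new CompletableFuture<>();

        for (int i = 0; i < polls; i++) {
            // A fresh result future per poll, as in the removed code.
            CompletableFuture<Void> anyInputAvailable = new CompletableFuture<>();
            // Each registration is retained by inputAvailable until it completes.
            inputAvailable.thenRun(() -> anyInputAvailable.complete(null));
        }

        int before = inputAvailable.getNumberOfDependents();
        // Completing the input runs and releases every registered callback.
        inputAvailable.complete(null);
        int after = inputAvailable.getNumberOfDependents();
        return new int[] {before, after};
    }

    public static void main(String[] args) {
        int[] counts = dependentsBeforeAndAfter(1000);
        System.out.println(counts[0] + " dependents before, " + counts[1] + " after");
    }
}
```

`getNumberOfDependents()` is documented as an estimate intended for monitoring, so the sketch only uses it to make the growth visible. The second number illustrates the point in the comment: anything that completes the input future, such as a barrier making the input available, would release the accumulated callbacks and hide the leak.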





[GitHub] [flink] flinkbot edited a comment on pull request #18475: [FLINK-25728] Protential memory leaks in StreamMultipleInputProcessor

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #18475:
URL: https://github.com/apache/flink/pull/18475#issuecomment-1020104543


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "3ade9719b565beeacf7761ab71d5abb7ba62e62a",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=30045",
       "triggerID" : "3ade9719b565beeacf7761ab71d5abb7ba62e62a",
       "triggerType" : "PUSH"
     }, {
       "hash" : "3ade9719b565beeacf7761ab71d5abb7ba62e62a",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=30045",
       "triggerID" : "1020772870",
       "triggerType" : "MANUAL"
     }, {
       "hash" : "1ede9e74ea4b64e9227ea3230d4adc189356c39c",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=30351",
       "triggerID" : "1ede9e74ea4b64e9227ea3230d4adc189356c39c",
       "triggerType" : "PUSH"
     }, {
       "hash" : "4c368879072e4ae60cac37aaa1b7a49454a95fda",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "4c368879072e4ae60cac37aaa1b7a49454a95fda",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 3ade9719b565beeacf7761ab71d5abb7ba62e62a Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=30045) 
   * 1ede9e74ea4b64e9227ea3230d4adc189356c39c Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=30351) 
   * 4c368879072e4ae60cac37aaa1b7a49454a95fda UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run azure` re-run the last Azure build
   </details>





[GitHub] [flink] wpc009 commented on a change in pull request #18475: [FLINK-25728] Protential memory leaks in StreamMultipleInputProcessor

Posted by GitBox <gi...@apache.org>.
wpc009 commented on a change in pull request #18475:
URL: https://github.com/apache/flink/pull/18475#discussion_r794451818



##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamMultipleInputProcessor.java
##########
@@ -157,4 +186,66 @@ private void fullCheckAndSetAvailable() {
             }
         }
     }
+
+    /** Visible for testing only. Do not use out side StreamMultipleInputProcessor. */
+    @VisibleForTesting
+    public static class MultipleInputAvailabilityHelper {
+        private final CompletableFuture<?>[] cachedAvailableFutures;
+        private final Consumer[] onCompletion;
+        private CompletableFuture<?> availableFuture;
+
+        public CompletableFuture<?> getAvailableFuture() {
+            return availableFuture;
+        }
+
+        public static MultipleInputAvailabilityHelper newInstance(int inputSize) {
+            MultipleInputAvailabilityHelper obj = new MultipleInputAvailabilityHelper(inputSize);
+            return obj;
+        }
+
+        private MultipleInputAvailabilityHelper(int inputSize) {
+            this.cachedAvailableFutures = new CompletableFuture[inputSize];
+            this.onCompletion = new Consumer[inputSize];
+        }
+
+        @VisibleForTesting
+        public void init() {
+            for (int i = 0; i < cachedAvailableFutures.length; i++) {
+                final int inputIdx = i;
+                onCompletion[i] = (Void) -> notifyCompletion(inputIdx);
+            }
+        }
+
+        public boolean isInvalid() {
+            return availableFuture == null || availableFuture.isDone();
+        }
+
+        public void resetToUnavailable() {
+            availableFuture = new CompletableFuture<>();
+        }
+
+        private void notifyCompletion(int idx) {
+            if (availableFuture != null && !availableFuture.isDone()) {
+                availableFuture.complete(null);
+            }
+            cachedAvailableFutures[idx] = AVAILABLE;
+        }

Review comment:
       In this scenario, we may not benefit much from a `volatile` field, since it only prevents stale values in the memory caches of different CPU cores. 
   
   > So I don't see any race condition in my version. Yours works quite similar after all, but your version's lack of AtomicReference creates an opportunity, for example in point 3., that if input becomes available, it will attempt to complete wrong, obsolete, old, already completed future.
   
   Even with `volatile`, the future completion can still happen in the middle of the `maybeReset()` call (say, after the `isDone()` check and before the `set` operation), so the completion callback will see the obsolete, old, already completed `anyAvailable` future and try to complete it.
   It makes no difference; `AtomicReference` does not prevent this. `AtomicReference`'s `set` and `get` are plain methods on a plain object. Without a `CAS` operation, the only difference here is the `volatile` field. But since we cannot prevent the race condition, the future completion still has a chance to see the old future object. So `volatile` is the least of our concerns here.
   But anyway, there is no harm in making `availableFuture` `volatile`.
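
The interleaving described in this comment can be replayed deterministically on a single thread. The sketch below is only an illustration of the claim, with hypothetical names (`StaleFutureInterleavingDemo`, `needsReset`, `seenByCallback`); it does not reproduce the Flink classes, and whether the real code can reach this interleaving, or recovers from it by re-checking the inputs afterwards, is exactly what the thread is debating.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;

final class StaleFutureInterleavingDemo {

    /** Replays the interleaving step by step; returns whether the new future got completed. */
    static boolean newFutureCompletedAfterInterleaving() {
        // The previous, already completed "any input available" future.
        CompletableFuture<Void> old = CompletableFuture.completedFuture(null);
        AtomicReference<CompletableFuture<Void>> anyAvailable = new AtomicReference<>(old);

        // Task thread, first half of a check-then-act reset: the isDone() check.
        boolean needsReset = anyAvailable.get().isDone();

        // Callback thread: reads the reference in between and still sees the old future.
        CompletableFuture<Void> seenByCallback = anyAvailable.get();

        // Task thread, second half: a plain set() without compare-and-set.
        if (needsReset) {
            anyAvailable.set(new CompletableFuture<>());
        }

        // Callback thread: completes what it saw, which is a no-op on the old future.
        seenByCallback.complete(null);

        // The freshly installed future has not been completed by the callback.
        return anyAvailable.get().isDone();
    }

    public static void main(String[] args) {
        System.out.println(newFutureCompletedAfterInterleaving());
    }
}
```

In this replay the `get` and `set` calls each behave atomically and with `volatile` semantics, yet the callback still completes the old future, because nothing ties the read in the callback to the write in the reset.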







[GitHub] [flink] wpc009 commented on pull request #18475: [FLINK-25728] Protential memory leaks in StreamMultipleInputProcessor

Posted by GitBox <gi...@apache.org>.
wpc009 commented on pull request #18475:
URL: https://github.com/apache/flink/pull/18475#issuecomment-1024372868


   > As far as I can tell, the case in the 3rd chart is not working in your version. If `Complete` action happens at the very very end (around `to here` arrow), there is no guarantee that it will "complete the newly created future". As you haven't established happens-before relationship between `Reset` and `Complete` actions, `Complete` might see an outdated, already completed, version of the `availableFuture`, failing to wake up the task thread.
   
   Do I understand this correctly? You are saying that if `Complete` happens at the very end, then without `volatile` the thread running `notifyCompletion` might not see the new `availableFuture`, so `notifyCompletion` tries to complete the obsolete future, and that causes the hang. 
   
   I would say that without `volatile` this could happen. But before `notifyCompletion` is entered, the input's future is already completed. At the `anyOf` stage, the input's future is checked and found to be correctly completed, which triggers the subsequent `availableFuture.complete()`. So it will not hang. 
   
   I can make `availableFuture` `volatile`. That way, `Complete` will not be able to see the obsolete future.
   
   I have been running the `MultipleInputBenchmark` test for hours, and no stream hang has shown up.
   <img width="297" alt="image" src="https://user-images.githubusercontent.com/2689362/151582292-01aacf62-2bff-4fb7-a160-ec40552e69cd.png">
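
The argument above relies on what happens when a callback is registered on a future that has already completed. A minimal sketch of that behaviour, with hypothetical names (`LateRegistrationDemo`, `inputAvailable`) and no Flink classes, assuming the OpenJDK implementation of `CompletableFuture`:

```java
import java.util.concurrent.CompletableFuture;

final class LateRegistrationDemo {

    /** Returns whether the callback had already run when the registration call returned. */
    static boolean callbackRanBeforeRegistrationReturned() {
        CompletableFuture<Void> inputAvailable = new CompletableFuture<>();
        CompletableFuture<Void> availableFuture = new CompletableFuture<>();

        // The input becomes available before anyone has registered a callback.
        inputAvailable.complete(null);

        // Registering on an already completed future: the non-async callback
        // is executed by the registering thread during this call.
        inputAvailable.thenAccept(ignored -> availableFuture.complete(null));

        return availableFuture.isDone();
    }

    public static void main(String[] args) {
        System.out.println(callbackRanBeforeRegistrationReturned());
    }
}
```

The `CompletableFuture` documentation only states that non-async dependent actions may be performed by the completing thread or by any other caller of a completion method, so the immediate execution shown here is best treated as observed behaviour. It also says nothing about which `availableFuture` instance a concurrently running callback would observe, which is the separate visibility question raised in the quoted review comment.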





[GitHub] [flink] wpc009 edited a comment on pull request #18475: [FLINK-25728] Protential memory leaks in StreamMultipleInputProcessor

Posted by GitBox <gi...@apache.org>.
wpc009 edited a comment on pull request #18475:
URL: https://github.com/apache/flink/pull/18475#issuecomment-1024349502


   The JDK is `BellSoft Liberica jdk8u292`.
   No other environment or configuration changes, just setting `JAVA_HOME` and `distDir`.
   Check out the test `MultipleInputStreamMemoryIssueTest`. 
   Checkpointing is disabled by default.
   <img width="723" alt="image" src="https://user-images.githubusercontent.com/2689362/151578190-851c24c5-f7ba-4f4d-b1b7-03ea96e45fd6.png">
   





[GitHub] [flink] wpc009 commented on a change in pull request #18475: [FLINK-25728] Protential memory leaks in StreamMultipleInputProcessor

Posted by GitBox <gi...@apache.org>.
wpc009 commented on a change in pull request #18475:
URL: https://github.com/apache/flink/pull/18475#discussion_r794663542



##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamMultipleInputProcessor.java
##########
@@ -157,4 +170,68 @@ private void fullCheckAndSetAvailable() {
             }
         }
     }
+
+    /** Visible for testing only. Do not use out side of StreamMultipleInputProcessor. */
+    @VisibleForTesting
+    public static class MultipleInputAvailabilityHelper {
+        private final CompletableFuture<?>[] cachedAvailableFutures;
+        private final Consumer[] onCompletion;
+        private CompletableFuture<?> availableFuture;
+        public CompletableFuture<?> getAvailableFuture() {
+            return availableFuture;
+        }
+
+        public static MultipleInputAvailabilityHelper newInstance(int inputSize) {
+            MultipleInputAvailabilityHelper obj = new MultipleInputAvailabilityHelper(inputSize);
+            return obj;
+        }
+
+        private MultipleInputAvailabilityHelper(int inputSize) {
+            this.cachedAvailableFutures = new CompletableFuture[inputSize];
+            this.onCompletion = new Consumer[inputSize];
+            for (int i = 0; i < cachedAvailableFutures.length; i++) {
+                final int inputIdx = i;
+                onCompletion[i] = (Void) -> notifyCompletion(inputIdx);
+            }
+        }
+
+        /**
+         * Check the finished state of availableFuture. Reuse if possible. Renew {availableFuture}
+         * if previous availableFuture is already completed.
+         *
+         * @return true if availableFuture is renewed. false, reuse previous availableFuture.
+         */
+        public boolean checkReusableAndReset() {

Review comment:
       This reusability check may be useless. When the inputProcessor's `getAvailableFuture` is called, the `availableFuture` is definitely completed. I will remove the check and revert to the previous version, which only resets `availableFuture` to a fresh new future.







[GitHub] [flink] flinkbot edited a comment on pull request #18475: [FLINK-25728] Protential memory leaks in StreamMultipleInputProcessor

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #18475:
URL: https://github.com/apache/flink/pull/18475#issuecomment-1020104543


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "3ade9719b565beeacf7761ab71d5abb7ba62e62a",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=30045",
       "triggerID" : "3ade9719b565beeacf7761ab71d5abb7ba62e62a",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 3ade9719b565beeacf7761ab71d5abb7ba62e62a Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=30045) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run azure` re-run the last Azure build
   </details>





[GitHub] [flink] flinkbot edited a comment on pull request #18475: [FLINK-25728] Protential memory leaks in StreamMultipleInputProcessor

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #18475:
URL: https://github.com/apache/flink/pull/18475#issuecomment-1020104543


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "3ade9719b565beeacf7761ab71d5abb7ba62e62a",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=30045",
       "triggerID" : "3ade9719b565beeacf7761ab71d5abb7ba62e62a",
       "triggerType" : "PUSH"
     }, {
       "hash" : "3ade9719b565beeacf7761ab71d5abb7ba62e62a",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=30045",
       "triggerID" : "1020772870",
       "triggerType" : "MANUAL"
     }, {
       "hash" : "1ede9e74ea4b64e9227ea3230d4adc189356c39c",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=30351",
       "triggerID" : "1ede9e74ea4b64e9227ea3230d4adc189356c39c",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 3ade9719b565beeacf7761ab71d5abb7ba62e62a Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=30045) 
   * 1ede9e74ea4b64e9227ea3230d4adc189356c39c Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=30351) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run azure` re-run the last Azure build
   </details>





[GitHub] [flink] pnowojski commented on a change in pull request #18475: [FLINK-25728] Protential memory leaks in StreamMultipleInputProcessor

Posted by GitBox <gi...@apache.org>.
pnowojski commented on a change in pull request #18475:
URL: https://github.com/apache/flink/pull/18475#discussion_r794416248



##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamMultipleInputProcessor.java
##########
@@ -157,4 +186,66 @@ private void fullCheckAndSetAvailable() {
             }
         }
     }
+
+    /** Visible for testing only. Do not use out side StreamMultipleInputProcessor. */
+    @VisibleForTesting
+    public static class MultipleInputAvailabilityHelper {
+        private final CompletableFuture<?>[] cachedAvailableFutures;
+        private final Consumer[] onCompletion;
+        private CompletableFuture<?> availableFuture;
+
+        public CompletableFuture<?> getAvailableFuture() {
+            return availableFuture;
+        }
+
+        public static MultipleInputAvailabilityHelper newInstance(int inputSize) {
+            MultipleInputAvailabilityHelper obj = new MultipleInputAvailabilityHelper(inputSize);
+            return obj;
+        }
+
+        private MultipleInputAvailabilityHelper(int inputSize) {
+            this.cachedAvailableFutures = new CompletableFuture[inputSize];
+            this.onCompletion = new Consumer[inputSize];
+        }
+
+        @VisibleForTesting
+        public void init() {
+            for (int i = 0; i < cachedAvailableFutures.length; i++) {
+                final int inputIdx = i;
+                onCompletion[i] = (Void) -> notifyCompletion(inputIdx);
+            }
+        }
+
+        public boolean isInvalid() {
+            return availableFuture == null || availableFuture.isDone();
+        }
+
+        public void resetToUnavailable() {
+            availableFuture = new CompletableFuture<>();
+        }
+
+        private void notifyCompletion(int idx) {
+            if (availableFuture != null && !availableFuture.isDone()) {
+                availableFuture.complete(null);
+            }
+            cachedAvailableFutures[idx] = AVAILABLE;
+        }

Review comment:
       > The AtomicReference without CAS operations is no difference with plain field reference.
   
   All atomic classes are wrappers around a `volatile` field, plus some extra atomic CAS operations, and as such [they provide a happens-before relationship](https://docs.oracle.com/javase/tutorial/essential/concurrency/atomic.html).
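
A minimal sketch of the visibility point, not code from the PR: `AtomicReference.set()` behaves like a volatile write and `get()` like a volatile read, so a plain field written before `set()` is visible to a thread that has observed the new value through `get()`. The class and field names here are hypothetical.

```java
import java.util.concurrent.atomic.AtomicReference;

final class AtomicPublishSketch {
    // Deliberately a plain (non-volatile) field.
    private int payload;
    // set() has volatile-write semantics, get() has volatile-read semantics.
    private final AtomicReference<Object> ready = new AtomicReference<>();

    void publish(int value) {
        payload = value;          // plain write ...
        ready.set(Boolean.TRUE);  // ... published by the volatile-style write
    }

    int awaitAndRead() {
        while (ready.get() == null) {
            Thread.yield();       // spin until the writer has published
        }
        return payload;           // ordered after the write to payload
    }

    static int demo() {
        AtomicPublishSketch sketch = new AtomicPublishSketch();
        int[] seen = new int[1];
        Thread reader = new Thread(() -> seen[0] = sketch.awaitAndRead());
        reader.start();
        sketch.publish(42);
        try {
            reader.join();        // join() makes the reader's write to seen[0] visible here
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        }
        return seen[0];
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints 42
    }
}
```

With a plain `Object` field in place of the `AtomicReference`, the Java memory model would give no such guarantee for the reader thread.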
   







[GitHub] [flink] pnowojski commented on a change in pull request #18475: [FLINK-25728] Protential memory leaks in StreamMultipleInputProcessor

Posted by GitBox <gi...@apache.org>.
pnowojski commented on a change in pull request #18475:
URL: https://github.com/apache/flink/pull/18475#discussion_r794368740



##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamMultipleInputProcessor.java
##########
@@ -157,4 +186,66 @@ private void fullCheckAndSetAvailable() {
             }
         }
     }
+
+    /** Visible for testing only. Do not use out side StreamMultipleInputProcessor. */
+    @VisibleForTesting
+    public static class MultipleInputAvailabilityHelper {
+        private final CompletableFuture<?>[] cachedAvailableFutures;
+        private final Consumer[] onCompletion;
+        private CompletableFuture<?> availableFuture;
+
+        public CompletableFuture<?> getAvailableFuture() {
+            return availableFuture;
+        }
+
+        public static MultipleInputAvailabilityHelper newInstance(int inputSize) {
+            MultipleInputAvailabilityHelper obj = new MultipleInputAvailabilityHelper(inputSize);
+            return obj;
+        }
+
+        private MultipleInputAvailabilityHelper(int inputSize) {
+            this.cachedAvailableFutures = new CompletableFuture[inputSize];
+            this.onCompletion = new Consumer[inputSize];
+        }
+
+        @VisibleForTesting
+        public void init() {
+            for (int i = 0; i < cachedAvailableFutures.length; i++) {
+                final int inputIdx = i;
+                onCompletion[i] = (Void) -> notifyCompletion(inputIdx);
+            }
+        }
+
+        public boolean isInvalid() {
+            return availableFuture == null || availableFuture.isDone();
+        }
+
+        public void resetToUnavailable() {
+            availableFuture = new CompletableFuture<>();
+        }
+
+        private void notifyCompletion(int idx) {
+            if (availableFuture != null && !availableFuture.isDone()) {
+                availableFuture.complete(null);
+            }
+            cachedAvailableFutures[idx] = AVAILABLE;
+        }

Review comment:
       > In your fix, the AtomicReference can not prevent that either.
   
   It doesn't need to. Take a closer look at the order of execution of the methods `maybeReset()` and `registerFuture()` in my version, and think about what happens if one of the inputs' availability futures completes concurrently.
   1. If an input becomes available before we call `maybeReset()`, its future will be hooked up to the new combined future.
   2. If an input becomes available just after the `anyAvailable.get().isDone()` check, but before `anyAvailable.set(new CompletableFuture<>())`, the combined/returned future will be made available anyway via the register call. We will clean up everything in the next `getAvailableFuture()` call.
   3. If an input becomes available after `maybeReset()`, it will just complete the combined/returned future, regardless of whether we register it or not.
   4. The `registerFuture()` calls take care of setting up all of the inputs' futures.
   
   So I don't see any race condition in my version. Yours works quite similarly after all, but your version's lack of `AtomicReference` creates an opportunity, for example in point 3, for an input that becomes available to attempt to complete the wrong, obsolete, already completed future.
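
The ordering described in the four points can be sketched roughly as follows. This is a hypothetical reconstruction, not the reviewer's actual code: only the names `maybeReset()`, `registerFuture()`, `anyAvailable` and `getAvailableFuture()` come from the comment, while the `idx` parameter, the `registered` cache and the class name are assumptions made for illustration.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;

final class AnyAvailableSketch {
    // Assumption: per-input futures that already carry a callback, so an idle
    // input is not registered again on every call.
    private final CompletableFuture<?>[] registered;
    // Combined future; get()/set() give volatile read/write semantics.
    private final AtomicReference<CompletableFuture<Void>> anyAvailable =
            new AtomicReference<>(new CompletableFuture<>());

    AnyAvailableSketch(int inputCount) {
        registered = new CompletableFuture<?>[inputCount];
    }

    /** Replaces the combined future only if the current one has completed. */
    void maybeReset() {
        if (anyAvailable.get().isDone()) {
            anyAvailable.set(new CompletableFuture<>());
        }
    }

    /** Hooks an input's future up to whichever combined future is current when it fires. */
    void registerFuture(int idx, CompletableFuture<?> inputFuture) {
        if (registered[idx] == null || registered[idx].isDone()) {
            registered[idx] = inputFuture;
            inputFuture.thenRun(() -> anyAvailable.get().complete(null));
        }
    }

    CompletableFuture<Void> getAvailableFuture() {
        return anyAvailable.get();
    }

    static boolean demo() {
        AnyAvailableSketch helper = new AnyAvailableSketch(2);
        CompletableFuture<Void> in0 = new CompletableFuture<>();
        CompletableFuture<Void> in1 = new CompletableFuture<>();
        helper.maybeReset();
        helper.registerFuture(0, in0);
        helper.registerFuture(1, in1);
        CompletableFuture<Void> first = helper.getAvailableFuture();
        in1.complete(null);                 // input 1 becomes available
        boolean firstDone = first.isDone();

        helper.maybeReset();                // first is done, so a new combined future is set
        CompletableFuture<Void> second = helper.getAvailableFuture();
        helper.registerFuture(0, in0);      // no-op: in0 is pending and already registered
        in0.complete(null);                 // the old callback completes the *current* future
        return firstDone && second != first && second.isDone();
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints true
    }
}
```

The callback reads the reference at completion time rather than capturing a future at registration time, which is why a late completion lands on the most recently published combined future.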
   
   >  The isDone() check is still seperate with the reset.
   
   This `isDone()` check is irrelevant, as it can be executed on the wrong/old/obsolete instance of the `CompletableFuture` in your case.
   
   > Both ours fix is working.
   
   Maybe the deadlock is very rare, or maybe you are not testing the right scenario. You would need a scenario where at first both inputs are flickering between available and unavailable, and then at the same time, while the 2nd input is unavailable, the 1st input becomes permanently unavailable while the 2nd input becomes available (and the 2nd input's availability is swallowed).
   
   Furthermore, it might be working perfectly fine now, but if the JDK optimises the code a bit differently, or the JDK is upgraded, it will stop working.







[GitHub] [flink] flinkbot edited a comment on pull request #18475: [FLINK-25728] Protential memory leaks in StreamMultipleInputProcessor

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #18475:
URL: https://github.com/apache/flink/pull/18475#issuecomment-1020104543


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "3ade9719b565beeacf7761ab71d5abb7ba62e62a",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=30045",
       "triggerID" : "3ade9719b565beeacf7761ab71d5abb7ba62e62a",
       "triggerType" : "PUSH"
     }, {
       "hash" : "3ade9719b565beeacf7761ab71d5abb7ba62e62a",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=30045",
       "triggerID" : "1020772870",
       "triggerType" : "MANUAL"
     }, {
       "hash" : "1ede9e74ea4b64e9227ea3230d4adc189356c39c",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=30351",
       "triggerID" : "1ede9e74ea4b64e9227ea3230d4adc189356c39c",
       "triggerType" : "PUSH"
     }, {
       "hash" : "4c368879072e4ae60cac37aaa1b7a49454a95fda",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=30357",
       "triggerID" : "4c368879072e4ae60cac37aaa1b7a49454a95fda",
       "triggerType" : "PUSH"
     }, {
       "hash" : "28aefb8e368a9a89bb4d1967da27235cfe2ffcaa",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=30384",
       "triggerID" : "28aefb8e368a9a89bb4d1967da27235cfe2ffcaa",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 28aefb8e368a9a89bb4d1967da27235cfe2ffcaa Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=30384) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run azure` re-run the last Azure build
   </details>





[GitHub] [flink] flinkbot edited a comment on pull request #18475: [FLINK-25728] Protential memory leaks in StreamMultipleInputProcessor

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #18475:
URL: https://github.com/apache/flink/pull/18475#issuecomment-1020104543


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "3ade9719b565beeacf7761ab71d5abb7ba62e62a",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=30045",
       "triggerID" : "3ade9719b565beeacf7761ab71d5abb7ba62e62a",
       "triggerType" : "PUSH"
     }, {
       "hash" : "3ade9719b565beeacf7761ab71d5abb7ba62e62a",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=30045",
       "triggerID" : "1020772870",
       "triggerType" : "MANUAL"
     }, {
       "hash" : "1ede9e74ea4b64e9227ea3230d4adc189356c39c",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=30351",
       "triggerID" : "1ede9e74ea4b64e9227ea3230d4adc189356c39c",
       "triggerType" : "PUSH"
     }, {
       "hash" : "0939ca27ca9eeaa9302561097780f850fec65917",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "0939ca27ca9eeaa9302561097780f850fec65917",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 3ade9719b565beeacf7761ab71d5abb7ba62e62a Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=30045) 
   * 1ede9e74ea4b64e9227ea3230d4adc189356c39c Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=30351) 
   * 0939ca27ca9eeaa9302561097780f850fec65917 UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run azure` re-run the last Azure build
   </details>





[GitHub] [flink] flinkbot edited a comment on pull request #18475: [FLINK-25728] Protential memory leaks in StreamMultipleInputProcessor

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #18475:
URL: https://github.com/apache/flink/pull/18475#issuecomment-1020104543


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "3ade9719b565beeacf7761ab71d5abb7ba62e62a",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=30045",
       "triggerID" : "3ade9719b565beeacf7761ab71d5abb7ba62e62a",
       "triggerType" : "PUSH"
     }, {
       "hash" : "3ade9719b565beeacf7761ab71d5abb7ba62e62a",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=30045",
       "triggerID" : "1020772870",
       "triggerType" : "MANUAL"
     }, {
       "hash" : "1ede9e74ea4b64e9227ea3230d4adc189356c39c",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=30351",
       "triggerID" : "1ede9e74ea4b64e9227ea3230d4adc189356c39c",
       "triggerType" : "PUSH"
     }, {
       "hash" : "28a8175852359690e62861bd0a4800318cca61dc",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "28a8175852359690e62861bd0a4800318cca61dc",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 3ade9719b565beeacf7761ab71d5abb7ba62e62a Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=30045) 
   * 1ede9e74ea4b64e9227ea3230d4adc189356c39c Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=30351) 
   * 28a8175852359690e62861bd0a4800318cca61dc UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run azure` re-run the last Azure build
   </details>





[GitHub] [flink] wpc009 commented on a change in pull request #18475: [FLINK-25728] Protential memory leaks in StreamMultipleInputProcessor

Posted by GitBox <gi...@apache.org>.
wpc009 commented on a change in pull request #18475:
URL: https://github.com/apache/flink/pull/18475#discussion_r792294286



##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamMultipleInputProcessor.java
##########
@@ -55,17 +77,24 @@ public StreamMultipleInputProcessor(
                 || inputSelectionHandler.areAllInputsFinished()) {
             return AVAILABLE;
         }
-        final CompletableFuture<?> anyInputAvailable = new CompletableFuture<>();
         for (int i = 0; i < inputProcessors.length; i++) {
             if (!inputSelectionHandler.isInputFinished(i)
-                    && inputSelectionHandler.isInputSelected(i)) {
-                assertNoException(
-                        inputProcessors[i]
-                                .getAvailableFuture()
-                                .thenRun(() -> anyInputAvailable.complete(null)));
+                    && inputSelectionHandler.isInputSelected(i)
+                    && inputProcessors[i].getAvailableFuture() == AVAILABLE) {
+                return AVAILABLE;

Review comment:
       Maybe this shortcut is useless. I will remove it.

##########
File path: flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/MultipleInputStreamTaskTest.java
##########
@@ -101,10 +101,15 @@
 import org.junit.Before;
 import org.junit.Rule;
 import org.junit.Test;
+import org.junit.jupiter.api.Assertions;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
 import org.junit.runners.Parameterized.Parameter;
 import org.junit.runners.Parameterized.Parameters;
+import org.mockito.Mockito;
+import org.powermock.api.mockito.PowerMockito;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.rule.PowerMockRule;

Review comment:
       This issue is about excessive object creation and holding references to completed future objects,
   so this behavior is hard to test without Mockito.
   Any good suggestions for this? I'm OK with removing Mockito if we can verify that no extra future objects are created on an idle input.

##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamMultipleInputProcessor.java
##########
@@ -157,4 +186,66 @@ private void fullCheckAndSetAvailable() {
             }
         }
     }
+
+    /** Visible for testing only. Do not use out side StreamMultipleInputProcessor. */
+    @VisibleForTesting
+    public static class MultipleInputAvailabilityHelper {
+        private final CompletableFuture<?>[] cachedAvailableFutures;
+        private final Consumer[] onCompletion;
+        private CompletableFuture<?> availableFuture;
+
+        public CompletableFuture<?> getAvailableFuture() {
+            return availableFuture;
+        }
+
+        public static MultipleInputAvailabilityHelper newInstance(int inputSize) {
+            MultipleInputAvailabilityHelper obj = new MultipleInputAvailabilityHelper(inputSize);
+            return obj;
+        }
+
+        private MultipleInputAvailabilityHelper(int inputSize) {
+            this.cachedAvailableFutures = new CompletableFuture[inputSize];
+            this.onCompletion = new Consumer[inputSize];
+        }
+
+        @VisibleForTesting
+        public void init() {
+            for (int i = 0; i < cachedAvailableFutures.length; i++) {
+                final int inputIdx = i;
+                onCompletion[i] = (Void) -> notifyCompletion(inputIdx);
+            }
+        }
+
+        public boolean isInvalid() {
+            return availableFuture == null || availableFuture.isDone();
+        }
+
+        public void resetToUnavailable() {
+            availableFuture = new CompletableFuture<>();
+        }
+
+        private void notifyCompletion(int idx) {
+            if (availableFuture != null && !availableFuture.isDone()) {
+                availableFuture.complete(null);
+            }
+            cachedAvailableFutures[idx] = AVAILABLE;
+        }

Review comment:
       I confirmed that the MultipleInputStream benchmark got blocked during the test.
   This may be related to a race between `notifyCompletion` calls and `resetToUnavailable` calls.
   I'm working on a solution.

##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamMultipleInputProcessor.java
##########
@@ -47,6 +49,26 @@ public StreamMultipleInputProcessor(
             StreamOneInputProcessor<?>[] inputProcessors) {
         this.inputSelectionHandler = inputSelectionHandler;
         this.inputProcessors = inputProcessors;
+        this.availabilityHelper =
+                MultipleInputAvailabilityHelper.newInstance(inputProcessors.length);
+        this.availabilityHelper.init();
+    }
+
+    @Override
+    public boolean isAvailable() {
+        if (inputSelectionHandler.isAnyInputAvailable()
+                || inputSelectionHandler.areAllInputsFinished()) {
+            return true;
+        } else {
+            boolean isAvailable = false;
+            for (int i = 0; i < inputProcessors.length; i++) {
+                isAvailable =
+                        !inputSelectionHandler.isInputFinished(i)
+                                && inputSelectionHandler.isInputSelected(i)
+                                && inputProcessors[i].isAvailable();
+            }
+            return isAvailable;

Review comment:
       I see your point. The `availableInputsMask` inside `inputSelectionHandler` and the `future` both indicate whether the corresponding input is available or not. I will make some modifications.

##########
File path: flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/MultipleInputStreamTaskTest.java
##########
@@ -101,10 +101,15 @@
 import org.junit.Before;
 import org.junit.Rule;
 import org.junit.Test;
+import org.junit.jupiter.api.Assertions;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
 import org.junit.runners.Parameterized.Parameter;
 import org.junit.runners.Parameterized.Parameters;
+import org.mockito.Mockito;
+import org.powermock.api.mockito.PowerMockito;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.rule.PowerMockRule;

Review comment:
       I see.
   You mean just remove this unit test, since the internal private implementation details of `StreamMultipleInputProcessor` cannot be tested?
   Can I understand it that way?

##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamMultipleInputProcessor.java
##########
@@ -55,17 +77,24 @@ public StreamMultipleInputProcessor(
                 || inputSelectionHandler.areAllInputsFinished()) {
             return AVAILABLE;
         }
-        final CompletableFuture<?> anyInputAvailable = new CompletableFuture<>();
         for (int i = 0; i < inputProcessors.length; i++) {
             if (!inputSelectionHandler.isInputFinished(i)
-                    && inputSelectionHandler.isInputSelected(i)) {
-                assertNoException(
-                        inputProcessors[i]
-                                .getAvailableFuture()
-                                .thenRun(() -> anyInputAvailable.complete(null)));
+                    && inputSelectionHandler.isInputSelected(i)
+                    && inputProcessors[i].getAvailableFuture() == AVAILABLE) {
+                return AVAILABLE;

Review comment:
       Yeah, you were right.
   The shortcut for-loop is redundant with `inputSelectionHandler.isAnyInputAvailable()`. 
   I will make a change.

##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamMultipleInputProcessor.java
##########
@@ -47,6 +49,26 @@ public StreamMultipleInputProcessor(
             StreamOneInputProcessor<?>[] inputProcessors) {
         this.inputSelectionHandler = inputSelectionHandler;
         this.inputProcessors = inputProcessors;
+        this.availabilityHelper =
+                MultipleInputAvailabilityHelper.newInstance(inputProcessors.length);
+        this.availabilityHelper.init();
+    }
+
+    @Override
+    public boolean isAvailable() {
+        if (inputSelectionHandler.isAnyInputAvailable()
+                || inputSelectionHandler.areAllInputsFinished()) {
+            return true;
+        } else {
+            boolean isAvailable = false;
+            for (int i = 0; i < inputProcessors.length; i++) {
+                isAvailable =
+                        !inputSelectionHandler.isInputFinished(i)
+                                && inputSelectionHandler.isInputSelected(i)
+                                && inputProcessors[i].isAvailable();
+            }
+            return isAvailable;

Review comment:
       The original code does not override the `isAvailable` method from `AvailabilityProvider`, which calls `getAvailableFuture` first and determines the availability status from the status of the future. `getAvailableFuture` creates a new instance each time someone calls it. So I duplicated the code to avoid calling `getAvailableFuture`, in order to reduce the memory footprint.
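
A rough illustration of the allocation concern, using a simplified stand-in interface rather than Flink's actual `AvailabilityProvider`: when the default `isAvailable()` goes through `getAvailableFuture()`, every availability check inherits whatever allocation that method performs, and an override that answers from state directly avoids it. All names below are hypothetical.

```java
import java.util.concurrent.CompletableFuture;

final class AvailabilitySketch {
    /** Simplified stand-in for an availability interface; not Flink's real API. */
    interface Provider {
        CompletableFuture<?> getAvailableFuture();

        // The default check goes through the future.
        default boolean isAvailable() {
            return getAvailableFuture().isDone();
        }
    }

    /** Allocates a fresh future on every call, as the comment describes the original method. */
    static class Allocating implements Provider {
        int allocations = 0;
        boolean inputReady = false;

        @Override
        public CompletableFuture<?> getAvailableFuture() {
            allocations++;
            CompletableFuture<Void> future = new CompletableFuture<>();
            if (inputReady) {
                future.complete(null);
            }
            return future;
        }
    }

    /** Overrides isAvailable() so that polling does not touch the future at all. */
    static class Direct extends Allocating {
        @Override
        public boolean isAvailable() {
            return inputReady;
        }
    }

    public static void main(String[] args) {
        Allocating allocating = new Allocating();
        Direct direct = new Direct();
        for (int i = 0; i < 1000; i++) {
            allocating.isAvailable();
            direct.isAvailable();
        }
        // prints "1000 vs 0"
        System.out.println(allocating.allocations + " vs " + direct.allocations);
    }
}
```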

##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamMultipleInputProcessor.java
##########
@@ -157,4 +186,66 @@ private void fullCheckAndSetAvailable() {
             }
         }
     }
+
+    /** Visible for testing only. Do not use out side StreamMultipleInputProcessor. */
+    @VisibleForTesting
+    public static class MultipleInputAvailabilityHelper {
+        private final CompletableFuture<?>[] cachedAvailableFutures;
+        private final Consumer[] onCompletion;
+        private CompletableFuture<?> availableFuture;
+
+        public CompletableFuture<?> getAvailableFuture() {
+            return availableFuture;
+        }
+
+        public static MultipleInputAvailabilityHelper newInstance(int inputSize) {
+            MultipleInputAvailabilityHelper obj = new MultipleInputAvailabilityHelper(inputSize);
+            return obj;
+        }
+
+        private MultipleInputAvailabilityHelper(int inputSize) {
+            this.cachedAvailableFutures = new CompletableFuture[inputSize];
+            this.onCompletion = new Consumer[inputSize];
+        }
+
+        @VisibleForTesting
+        public void init() {
+            for (int i = 0; i < cachedAvailableFutures.length; i++) {
+                final int inputIdx = i;
+                onCompletion[i] = (Void) -> notifyCompletion(inputIdx);
+            }
+        }
+
+        public boolean isInvalid() {
+            return availableFuture == null || availableFuture.isDone();
+        }
+
+        public void resetToUnavailable() {
+            availableFuture = new CompletableFuture<>();
+        }
+
+        private void notifyCompletion(int idx) {
+            if (availableFuture != null && !availableFuture.isDone()) {
+                availableFuture.complete(null);
+            }
+            cachedAvailableFutures[idx] = AVAILABLE;
+        }

Review comment:
       I think the race will cause duplicate future `complete` calls. I do not know how it would get stuck.
   I will check these benchmarks and try to find what causes these deadlocks.

##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamMultipleInputProcessor.java
##########
@@ -47,6 +49,26 @@ public StreamMultipleInputProcessor(
             StreamOneInputProcessor<?>[] inputProcessors) {
         this.inputSelectionHandler = inputSelectionHandler;
         this.inputProcessors = inputProcessors;
+        this.availabilityHelper =
+                MultipleInputAvailabilityHelper.newInstance(inputProcessors.length);
+        this.availabilityHelper.init();
+    }
+
+    @Override
+    public boolean isAvailable() {
+        if (inputSelectionHandler.isAnyInputAvailable()
+                || inputSelectionHandler.areAllInputsFinished()) {
+            return true;
+        } else {
+            boolean isAvailable = false;
+            for (int i = 0; i < inputProcessors.length; i++) {
+                isAvailable =
+                        !inputSelectionHandler.isInputFinished(i)
+                                && inputSelectionHandler.isInputSelected(i)
+                                && inputProcessors[i].isAvailable();
+            }
+            return isAvailable;

Review comment:
       This implementation may not be optimal. I may not fully understand the difference between `inputSelectionHandler.isAnyInputAvailable` and the `for` loop below.
   This part is borrowed from the original code, replacing the `thenRun` part with aggregating the results of `inputProcessors[i].isAvailable()`.
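
As a side note on the aggregation itself: in the quoted hunk the loop assigns `isAvailable` on every iteration, so only the last input's result survives. A sketch of an OR-accumulating version is shown below, with hypothetical boolean arrays standing in for `inputSelectionHandler` and `inputProcessors`.

```java
final class AnySelectedAvailableSketch {
    /** Returns true if at least one input is unfinished, selected and available. */
    static boolean anyAvailable(boolean[] finished, boolean[] selected, boolean[] available) {
        boolean result = false;
        for (int i = 0; i < finished.length; i++) {
            // OR-accumulate; a plain assignment would keep only the last input's result.
            result |= !finished[i] && selected[i] && available[i];
        }
        return result;
    }

    public static void main(String[] args) {
        boolean[] finished = {false, false};
        boolean[] selected = {true, true};
        boolean[] available = {true, false}; // only the first input is available
        System.out.println(anyAvailable(finished, selected, available)); // prints true
    }
}
```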
   







[GitHub] [flink] wpc009 edited a comment on pull request #18475: [FLINK-25728] Protential memory leaks in StreamMultipleInputProcessor

Posted by GitBox <gi...@apache.org>.
wpc009 edited a comment on pull request #18475:
URL: https://github.com/apache/flink/pull/18475#issuecomment-1024372868


   > As far as I can tell, the case in the 3rd chart is not working in your version. If `Complete` action happens at the very very end (around `to here` arrow), there is no guarantee that it will "complete the newly created future". As you haven't established happens-before relationship between `Reset` and `Complete` actions, `Complete` might see an outdated, already completed, version of the `availableFuture`, failing to wake up the task thread.
   
   Can I understand it this way? What you are saying is that if `Complete` happens at the very end, then, because `volatile` is not used, the thread running `notifyCompletion` might not see the new `availableFuture`; it tries to complete the obsolete future and leaves the newly created future untouched. That causes the stream to hang.
   
   I would say that without `volatile` that could happen. But before entering `notifyCompletion`, the input's future is already completed. At the `anyOf` stage, it will check the input's future, which is correctly completed, and thus trigger the following `availableFuture.complete()`. It will not halt.
   
   I can make `availableFuture` `volatile`. That way, `Complete` will not be able to see the obsolete future.
   
   I have been running the `MultipleInputBenchmark` test for hours. No stream hang-up has shown up.
   <img width="297" alt="image" src="https://user-images.githubusercontent.com/2689362/151582292-01aacf62-2bff-4fb7-a160-ec40552e69cd.png">
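
The `volatile` change discussed in this comment can be sketched roughly as follows. This is a minimal illustration, not the code from the PR: `VolatileAvailabilitySketch` and `demo` are hypothetical names, and the helper is reduced to a single future. Note that the demo itself is deterministic even without `volatile`, because `Thread.start()` and `Thread.join()` already establish ordering; it only shows the shape of the handoff between the resetting thread and the notifying thread.

```java
import java.util.concurrent.CompletableFuture;

/** Hypothetical stand-in for the helper under review; not the actual Flink class. */
final class VolatileAvailabilitySketch {
    // volatile: a write to this field happens-before every later read of it,
    // so a notifier that reads after the reset sees the fresh future.
    private volatile CompletableFuture<Void> availableFuture = new CompletableFuture<>();

    CompletableFuture<Void> getAvailableFuture() {
        return availableFuture;
    }

    /** Called by the task thread before it waits again. */
    void resetToUnavailable() {
        availableFuture = new CompletableFuture<>();
    }

    /** Called from whichever thread completes an input future. */
    void notifyCompletion() {
        // Read the field once so that we act on a single, consistent object.
        CompletableFuture<Void> current = availableFuture;
        current.complete(null); // no-op if it was already completed
    }

    static boolean demo() {
        VolatileAvailabilitySketch helper = new VolatileAvailabilitySketch();
        helper.notifyCompletion();   // completes the first future
        helper.resetToUnavailable(); // task thread installs a fresh one
        CompletableFuture<Void> fresh = helper.getAvailableFuture();
        Thread notifier = new Thread(helper::notifyCompletion);
        notifier.start();
        try {
            notifier.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
        return fresh.isDone();
    }

    public static void main(String[] args) {
        System.out.println("fresh future completed: " + demo());
    }
}
```

Reading the field into a local variable before calling `complete` avoids checking one future and completing another if a reset happens in between.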





[GitHub] [flink] flinkbot edited a comment on pull request #18475: [FLINK-25728] Protential memory leaks in StreamMultipleInputProcessor

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #18475:
URL: https://github.com/apache/flink/pull/18475#issuecomment-1020104543


   ## CI report:
   
   * 28aefb8e368a9a89bb4d1967da27235cfe2ffcaa Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=30384) 
   * db2210c8ec60f9a5608d3fdea53a753d623d896d Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=30429) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run azure` re-run the last Azure build
   </details>





[GitHub] [flink] flinkbot edited a comment on pull request #18475: [FLINK-25728] Protential memory leaks in StreamMultipleInputProcessor

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #18475:
URL: https://github.com/apache/flink/pull/18475#issuecomment-1020104543


   ## CI report:
   
   * 3ade9719b565beeacf7761ab71d5abb7ba62e62a Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=30045) 
   * 1ede9e74ea4b64e9227ea3230d4adc189356c39c Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=30351) 
   * 4c368879072e4ae60cac37aaa1b7a49454a95fda UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run azure` re-run the last Azure build
   </details>





[GitHub] [flink] wpc009 commented on a change in pull request #18475: [FLINK-25728] Protential memory leaks in StreamMultipleInputProcessor

Posted by GitBox <gi...@apache.org>.
wpc009 commented on a change in pull request #18475:
URL: https://github.com/apache/flink/pull/18475#discussion_r794356621



##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamMultipleInputProcessor.java
##########
@@ -157,4 +186,66 @@ private void fullCheckAndSetAvailable() {
             }
         }
     }
+
+    /** Visible for testing only. Do not use out side StreamMultipleInputProcessor. */
+    @VisibleForTesting
+    public static class MultipleInputAvailabilityHelper {
+        private final CompletableFuture<?>[] cachedAvailableFutures;
+        private final Consumer[] onCompletion;
+        private CompletableFuture<?> availableFuture;
+
+        public CompletableFuture<?> getAvailableFuture() {
+            return availableFuture;
+        }
+
+        public static MultipleInputAvailabilityHelper newInstance(int inputSize) {
+            MultipleInputAvailabilityHelper obj = new MultipleInputAvailabilityHelper(inputSize);
+            return obj;
+        }
+
+        private MultipleInputAvailabilityHelper(int inputSize) {
+            this.cachedAvailableFutures = new CompletableFuture[inputSize];
+            this.onCompletion = new Consumer[inputSize];
+        }
+
+        @VisibleForTesting
+        public void init() {
+            for (int i = 0; i < cachedAvailableFutures.length; i++) {
+                final int inputIdx = i;
+                onCompletion[i] = (Void) -> notifyCompletion(inputIdx);
+            }
+        }
+
+        public boolean isInvalid() {
+            return availableFuture == null || availableFuture.isDone();
+        }
+
+        public void resetToUnavailable() {
+            availableFuture = new CompletableFuture<>();
+        }
+
+        private void notifyCompletion(int idx) {
+            if (availableFuture != null && !availableFuture.isDone()) {
+                availableFuture.complete(null);
+            }
+            cachedAvailableFutures[idx] = AVAILABLE;
+        }

Review comment:
       300 `MultipleInputBenchmark` test runs without a hang-up.
   
   | Benchmark | Mode | Cnt | Score | Error | Units |
   | --- | --- | --- | --- | --- | --- |
   | MultipleInputBenchmark.multiInputMapSink | thrpt | 300 | 16874.869 | ± 327.760 | ops/ms |







[GitHub] [flink] flinkbot edited a comment on pull request #18475: [FLINK-25728] Protential memory leaks in StreamMultipleInputProcessor

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #18475:
URL: https://github.com/apache/flink/pull/18475#issuecomment-1020104543


   ## CI report:
   
   * 4c368879072e4ae60cac37aaa1b7a49454a95fda Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=30357) 
   * 28aefb8e368a9a89bb4d1967da27235cfe2ffcaa Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=30384) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run azure` re-run the last Azure build
   </details>





[GitHub] [flink] wpc009 commented on pull request #18475: [FLINK-25728] Protential memory leaks in StreamMultipleInputProcessor

Posted by GitBox <gi...@apache.org>.
wpc009 commented on pull request #18475:
URL: https://github.com/apache/flink/pull/18475#issuecomment-1024815295


   Suppose `Complete` denotes the completion of the input future. (That is a little different from my graph, where it refers to the line that calls `complete` on the result future.)
   When `anyOf` is executed and the input future is not ready at that time, it completes some time later. I cannot see how the callback could get the obsolete object: an inconsistency between CPU caches cannot last that long, so the callback will see the correct object. Anyway, I have already changed `availableFuture` to `volatile`, which makes this concern go away.
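
The two `anyOf` cases mentioned in this thread (an input that is already completed when `anyOf` is called, and an input that completes later) can be illustrated with plain `CompletableFuture` calls. This is only a sketch of the JDK behaviour under a single thread; `AnyOfSketch` and its method names are hypothetical and do not appear in the PR.

```java
import java.util.concurrent.CompletableFuture;

/** Hypothetical illustration of CompletableFuture.anyOf; not code from the PR. */
final class AnyOfSketch {

    /** One input is already completed when anyOf is called. */
    static boolean alreadyCompletedInput() {
        CompletableFuture<Void> done = CompletableFuture.completedFuture(null);
        CompletableFuture<Void> pending = new CompletableFuture<>();
        // anyOf inspects its inputs, so the result is completed right away.
        return CompletableFuture.anyOf(done, pending).isDone();
    }

    /** No input is ready at the time of the anyOf call; one completes afterwards. */
    static boolean lateCompletion() {
        CompletableFuture<Void> a = new CompletableFuture<>();
        CompletableFuture<Void> b = new CompletableFuture<>();
        CompletableFuture<Object> any = CompletableFuture.anyOf(a, b);
        boolean doneBefore = any.isDone();
        // Completing an input propagates to the anyOf result in the completing thread.
        b.complete(null);
        return !doneBefore && any.isDone();
    }

    public static void main(String[] args) {
        System.out.println("already completed input: " + alreadyCompletedInput());
        System.out.println("late completion: " + lateCompletion());
    }
}
```

In the second case the dependent future is completed by the thread that calls `complete` on the input, which is why the visibility of shared fields read from such callbacks matters.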





[GitHub] [flink] wpc009 commented on pull request #18475: [FLINK-25728] Protential memory leaks in StreamMultipleInputProcessor

Posted by GitBox <gi...@apache.org>.
wpc009 commented on pull request #18475:
URL: https://github.com/apache/flink/pull/18475#issuecomment-1021815826


   The master branch can not pass the CI pipeline either.
   Check this out.
   https://dev.azure.com/wpc009/Flink/_build/results?buildId=5&view=results
   





[GitHub] [flink] wpc009 edited a comment on pull request #18475: [FLINK-25728] Protential memory leaks in StreamMultipleInputProcessor

Posted by GitBox <gi...@apache.org>.
wpc009 edited a comment on pull request #18475:
URL: https://github.com/apache/flink/pull/18475#issuecomment-1020772870









[GitHub] [flink] wpc009 edited a comment on pull request #18475: [FLINK-25728] Protential memory leaks in StreamMultipleInputProcessor

Posted by GitBox <gi...@apache.org>.
wpc009 edited a comment on pull request #18475:
URL: https://github.com/apache/flink/pull/18475#issuecomment-1021815826


   The `release-1.14` branch cannot pass the CI pipeline either.
   Check this out.
   https://dev.azure.com/wpc009/Flink/_build/results?buildId=5&view=results
   





[GitHub] [flink] wpc009 removed a comment on pull request #18475: [FLINK-25728] Protential memory leaks in StreamMultipleInputProcessor

Posted by GitBox <gi...@apache.org>.
wpc009 removed a comment on pull request #18475:
URL: https://github.com/apache/flink/pull/18475#issuecomment-1021816172


   Maybe I should push to the `release-1.14` branch?





[GitHub] [flink] flinkbot edited a comment on pull request #18475: [FLINK-25728] Protential memory leaks in StreamMultipleInputProcessor

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #18475:
URL: https://github.com/apache/flink/pull/18475#issuecomment-1020104543


   ## CI report:
   
   * db2210c8ec60f9a5608d3fdea53a753d623d896d Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=30429) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run azure` re-run the last Azure build
   </details>





[GitHub] [flink] wpc009 commented on a change in pull request #18475: [FLINK-25728] Protential memory leaks in StreamMultipleInputProcessor

Posted by GitBox <gi...@apache.org>.
wpc009 commented on a change in pull request #18475:
URL: https://github.com/apache/flink/pull/18475#discussion_r794346464



##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamMultipleInputProcessor.java
##########
@@ -157,4 +186,66 @@ private void fullCheckAndSetAvailable() {
             }
         }
     }
+
+    /** Visible for testing only. Do not use out side StreamMultipleInputProcessor. */
+    @VisibleForTesting
+    public static class MultipleInputAvailabilityHelper {
+        private final CompletableFuture<?>[] cachedAvailableFutures;
+        private final Consumer[] onCompletion;
+        private CompletableFuture<?> availableFuture;
+
+        public CompletableFuture<?> getAvailableFuture() {
+            return availableFuture;
+        }
+
+        public static MultipleInputAvailabilityHelper newInstance(int inputSize) {
+            MultipleInputAvailabilityHelper obj = new MultipleInputAvailabilityHelper(inputSize);
+            return obj;
+        }
+
+        private MultipleInputAvailabilityHelper(int inputSize) {
+            this.cachedAvailableFutures = new CompletableFuture[inputSize];
+            this.onCompletion = new Consumer[inputSize];
+        }
+
+        @VisibleForTesting
+        public void init() {
+            for (int i = 0; i < cachedAvailableFutures.length; i++) {
+                final int inputIdx = i;
+                onCompletion[i] = (Void) -> notifyCompletion(inputIdx);
+            }
+        }
+
+        public boolean isInvalid() {
+            return availableFuture == null || availableFuture.isDone();
+        }
+
+        public void resetToUnavailable() {
+            availableFuture = new CompletableFuture<>();
+        }
+
+        private void notifyCompletion(int idx) {
+            if (availableFuture != null && !availableFuture.isDone()) {
+                availableFuture.complete(null);
+            }
+            cachedAvailableFutures[idx] = AVAILABLE;
+        }

Review comment:
       `notifyCompletion` may race with `checkReusableAndReset`. We cannot prevent that without using synchronization locks.
   However, this race may not be harmful.
   In your fix, the `AtomicReference` cannot prevent that either. The `isDone()` check is still separate from the reset.
   Both of our fixes work. I have tested them with 300 test runs and did not see any kind of hang-up.
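
The point that the `isDone()` check stays separate from the reset, even with an `AtomicReference`, can be sketched like this. It is an illustrative reduction under assumed names (`CheckThenActSketch`, `resetIfDone`), not either of the fixes discussed in this PR. It also relies on the fact that `CompletableFuture.complete` returns `false` when the future was already completed, so a duplicate notification is harmless.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;

/** Hypothetical illustration of the check-then-act gap; not code from the PR. */
final class CheckThenActSketch {
    private final AtomicReference<CompletableFuture<Void>> ref =
            new AtomicReference<>(new CompletableFuture<>());

    /** The check and the swap are two steps; a notifier may run between them. */
    CompletableFuture<Void> resetIfDone() {
        CompletableFuture<Void> current = ref.get();
        if (current.isDone()) { // step 1: check
            CompletableFuture<Void> fresh = new CompletableFuture<>();
            // step 2: swap; compareAndSet only detects a concurrent replacement,
            // it does not make the isDone() check part of the same atomic step.
            return ref.compareAndSet(current, fresh) ? fresh : ref.get();
        }
        return current;
    }

    /** Returns true only if this call performed the completion. */
    boolean notifyCompletion() {
        return ref.get().complete(null);
    }

    static boolean[] demo() {
        CheckThenActSketch sketch = new CheckThenActSketch();
        boolean first = sketch.notifyCompletion();  // completes the initial future
        boolean second = sketch.notifyCompletion(); // already completed, so no effect
        CompletableFuture<Void> fresh = sketch.resetIfDone();
        boolean third = sketch.notifyCompletion();  // completes the fresh future
        return new boolean[] {first, second, third, fresh.isDone()};
    }

    public static void main(String[] args) {
        System.out.println(java.util.Arrays.toString(demo()));
    }
}
```

Whether the remaining gap between the check and the swap is harmful depends on how the caller re-checks its inputs after the reset, which is the question debated in this thread.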







[GitHub] [flink] wpc009 commented on a change in pull request #18475: [FLINK-25728] Protential memory leaks in StreamMultipleInputProcessor

Posted by GitBox <gi...@apache.org>.
wpc009 commented on a change in pull request #18475:
URL: https://github.com/apache/flink/pull/18475#discussion_r794346464



##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamMultipleInputProcessor.java
##########
@@ -157,4 +186,66 @@ private void fullCheckAndSetAvailable() {
             }
         }
     }
+
+    /** Visible for testing only. Do not use out side StreamMultipleInputProcessor. */
+    @VisibleForTesting
+    public static class MultipleInputAvailabilityHelper {
+        private final CompletableFuture<?>[] cachedAvailableFutures;
+        private final Consumer[] onCompletion;
+        private CompletableFuture<?> availableFuture;
+
+        public CompletableFuture<?> getAvailableFuture() {
+            return availableFuture;
+        }
+
+        public static MultipleInputAvailabilityHelper newInstance(int inputSize) {
+            MultipleInputAvailabilityHelper obj = new MultipleInputAvailabilityHelper(inputSize);
+            return obj;
+        }
+
+        private MultipleInputAvailabilityHelper(int inputSize) {
+            this.cachedAvailableFutures = new CompletableFuture[inputSize];
+            this.onCompletion = new Consumer[inputSize];
+        }
+
+        @VisibleForTesting
+        public void init() {
+            for (int i = 0; i < cachedAvailableFutures.length; i++) {
+                final int inputIdx = i;
+                onCompletion[i] = (Void) -> notifyCompletion(inputIdx);
+            }
+        }
+
+        public boolean isInvalid() {
+            return availableFuture == null || availableFuture.isDone();
+        }
+
+        public void resetToUnavailable() {
+            availableFuture = new CompletableFuture<>();
+        }
+
+        private void notifyCompletion(int idx) {
+            if (availableFuture != null && !availableFuture.isDone()) {
+                availableFuture.complete(null);
+            }
+            cachedAvailableFutures[idx] = AVAILABLE;
+        }

Review comment:
       `notifyCompletion` may race with `checkReusableAndReset`. We cannot prevent that without using synchronization locks.
   However, this race may not be harmful.
   In your fix, the `AtomicReference` cannot prevent that either. The `isDone()` check is still separate from the reset.
   Both of our fixes work. I have tested them with 300 test runs and did not see any kind of hang-up.







[GitHub] [flink] wpc009 commented on a change in pull request #18475: [FLINK-25728] Protential memory leaks in StreamMultipleInputProcessor

Posted by GitBox <gi...@apache.org>.
wpc009 commented on a change in pull request #18475:
URL: https://github.com/apache/flink/pull/18475#discussion_r794493752



##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamMultipleInputProcessor.java
##########
@@ -157,4 +186,66 @@ private void fullCheckAndSetAvailable() {
             }
         }
     }
+
+    /** Visible for testing only. Do not use out side StreamMultipleInputProcessor. */
+    @VisibleForTesting
+    public static class MultipleInputAvailabilityHelper {
+        private final CompletableFuture<?>[] cachedAvailableFutures;
+        private final Consumer[] onCompletion;
+        private CompletableFuture<?> availableFuture;
+
+        public CompletableFuture<?> getAvailableFuture() {
+            return availableFuture;
+        }
+
+        public static MultipleInputAvailabilityHelper newInstance(int inputSize) {
+            MultipleInputAvailabilityHelper obj = new MultipleInputAvailabilityHelper(inputSize);
+            return obj;
+        }
+
+        private MultipleInputAvailabilityHelper(int inputSize) {
+            this.cachedAvailableFutures = new CompletableFuture[inputSize];
+            this.onCompletion = new Consumer[inputSize];
+        }
+
+        @VisibleForTesting
+        public void init() {
+            for (int i = 0; i < cachedAvailableFutures.length; i++) {
+                final int inputIdx = i;
+                onCompletion[i] = (Void) -> notifyCompletion(inputIdx);
+            }
+        }
+
+        public boolean isInvalid() {
+            return availableFuture == null || availableFuture.isDone();
+        }
+
+        public void resetToUnavailable() {
+            availableFuture = new CompletableFuture<>();
+        }
+
+        private void notifyCompletion(int idx) {
+            if (availableFuture != null && !availableFuture.isDone()) {
+                availableFuture.complete(null);
+            }
+            cachedAvailableFutures[idx] = AVAILABLE;
+        }

Review comment:
       The only point of disagreement here is the use of `AtomicReference`.
   My opinion is that `AtomicReference` is not crucial in this scenario. Yours seems to be that `AtomicReference` is the missing piece without which my fix cannot produce a correct result.
   I think both of our fixes work. I will make one last modification to address your concern about `volatile`.







[GitHub] [flink] wpc009 commented on a change in pull request #18475: [FLINK-25728] Protential memory leaks in StreamMultipleInputProcessor

Posted by GitBox <gi...@apache.org>.
wpc009 commented on a change in pull request #18475:
URL: https://github.com/apache/flink/pull/18475#discussion_r794493752



##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamMultipleInputProcessor.java
##########
@@ -157,4 +186,66 @@ private void fullCheckAndSetAvailable() {
             }
         }
     }
+
+    /** Visible for testing only. Do not use out side StreamMultipleInputProcessor. */
+    @VisibleForTesting
+    public static class MultipleInputAvailabilityHelper {
+        private final CompletableFuture<?>[] cachedAvailableFutures;
+        private final Consumer[] onCompletion;
+        private CompletableFuture<?> availableFuture;
+
+        public CompletableFuture<?> getAvailableFuture() {
+            return availableFuture;
+        }
+
+        public static MultipleInputAvailabilityHelper newInstance(int inputSize) {
+            MultipleInputAvailabilityHelper obj = new MultipleInputAvailabilityHelper(inputSize);
+            return obj;
+        }
+
+        private MultipleInputAvailabilityHelper(int inputSize) {
+            this.cachedAvailableFutures = new CompletableFuture[inputSize];
+            this.onCompletion = new Consumer[inputSize];
+        }
+
+        @VisibleForTesting
+        public void init() {
+            for (int i = 0; i < cachedAvailableFutures.length; i++) {
+                final int inputIdx = i;
+                onCompletion[i] = (Void) -> notifyCompletion(inputIdx);
+            }
+        }
+
+        public boolean isInvalid() {
+            return availableFuture == null || availableFuture.isDone();
+        }
+
+        public void resetToUnavailable() {
+            availableFuture = new CompletableFuture<>();
+        }
+
+        private void notifyCompletion(int idx) {
+            if (availableFuture != null && !availableFuture.isDone()) {
+                availableFuture.complete(null);
+            }
+            cachedAvailableFutures[idx] = AVAILABLE;
+        }

Review comment:
       The only point of disagreement here is the use of `AtomicReference`.
   My opinion is that `AtomicReference` is not crucial in this scenario. Yours seems to be that `AtomicReference` is the missing piece without which my fix cannot produce a correct result.
   I think both of our fixes work. I will make one last modification to address your concern about `volatile`.







[GitHub] [flink] wpc009 commented on a change in pull request #18475: [FLINK-25728] Protential memory leaks in StreamMultipleInputProcessor

Posted by GitBox <gi...@apache.org>.
wpc009 commented on a change in pull request #18475:
URL: https://github.com/apache/flink/pull/18475#discussion_r794490007



##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamMultipleInputProcessor.java
##########
@@ -157,4 +186,66 @@ private void fullCheckAndSetAvailable() {
             }
         }
     }
+
+    /** Visible for testing only. Do not use out side StreamMultipleInputProcessor. */
+    @VisibleForTesting
+    public static class MultipleInputAvailabilityHelper {
+        private final CompletableFuture<?>[] cachedAvailableFutures;
+        private final Consumer[] onCompletion;
+        private CompletableFuture<?> availableFuture;
+
+        public CompletableFuture<?> getAvailableFuture() {
+            return availableFuture;
+        }
+
+        public static MultipleInputAvailabilityHelper newInstance(int inputSize) {
+            MultipleInputAvailabilityHelper obj = new MultipleInputAvailabilityHelper(inputSize);
+            return obj;
+        }
+
+        private MultipleInputAvailabilityHelper(int inputSize) {
+            this.cachedAvailableFutures = new CompletableFuture[inputSize];
+            this.onCompletion = new Consumer[inputSize];
+        }
+
+        @VisibleForTesting
+        public void init() {
+            for (int i = 0; i < cachedAvailableFutures.length; i++) {
+                final int inputIdx = i;
+                onCompletion[i] = (Void) -> notifyCompletion(inputIdx);
+            }
+        }
+
+        public boolean isInvalid() {
+            return availableFuture == null || availableFuture.isDone();
+        }
+
+        public void resetToUnavailable() {
+            availableFuture = new CompletableFuture<>();
+        }
+
+        private void notifyCompletion(int idx) {
+            if (availableFuture != null && !availableFuture.isDone()) {
+                availableFuture.complete(null);
+            }
+            cachedAvailableFutures[idx] = AVAILABLE;
+        }

Review comment:
       Maybe check this. I drew a graph to illustrate why this fix is correct.
   > I made a graph to explain how this fix works, even with the race condition between the future's `notifyCompletion` and `checkReusableAndReset`.
   > ![flink-issue-timeline drawio](https://user-images.githubusercontent.com/2689362/151551134-a30c100e-26a9-4061-9467-6c9b03657bcc.png)







[GitHub] [flink] flinkbot edited a comment on pull request #18475: [FLINK-25728] Protential memory leaks in StreamMultipleInputProcessor

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #18475:
URL: https://github.com/apache/flink/pull/18475#issuecomment-1020104543


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "3ade9719b565beeacf7761ab71d5abb7ba62e62a",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=30045",
       "triggerID" : "3ade9719b565beeacf7761ab71d5abb7ba62e62a",
       "triggerType" : "PUSH"
     }, {
       "hash" : "3ade9719b565beeacf7761ab71d5abb7ba62e62a",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=30045",
       "triggerID" : "1020772870",
       "triggerType" : "MANUAL"
     }, {
       "hash" : "1ede9e74ea4b64e9227ea3230d4adc189356c39c",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=30351",
       "triggerID" : "1ede9e74ea4b64e9227ea3230d4adc189356c39c",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 3ade9719b565beeacf7761ab71d5abb7ba62e62a Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=30045) 
   * 1ede9e74ea4b64e9227ea3230d4adc189356c39c Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=30351) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run azure` re-run the last Azure build
   </details>





[GitHub] [flink] flinkbot edited a comment on pull request #18475: [FLINK-25728] Protential memory leaks in StreamMultipleInputProcessor

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #18475:
URL: https://github.com/apache/flink/pull/18475#issuecomment-1020104543


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "3ade9719b565beeacf7761ab71d5abb7ba62e62a",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=30045",
       "triggerID" : "3ade9719b565beeacf7761ab71d5abb7ba62e62a",
       "triggerType" : "PUSH"
     }, {
       "hash" : "3ade9719b565beeacf7761ab71d5abb7ba62e62a",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=30045",
       "triggerID" : "1020772870",
       "triggerType" : "MANUAL"
     }, {
       "hash" : "1ede9e74ea4b64e9227ea3230d4adc189356c39c",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "1ede9e74ea4b64e9227ea3230d4adc189356c39c",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 3ade9719b565beeacf7761ab71d5abb7ba62e62a Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=30045) 
   * 1ede9e74ea4b64e9227ea3230d4adc189356c39c UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run azure` re-run the last Azure build
   </details>





[GitHub] [flink] pnowojski edited a comment on pull request #18475: [FLINK-25728] Protential memory leaks in StreamMultipleInputProcessor

Posted by GitBox <gi...@apache.org>.
pnowojski edited a comment on pull request #18475:
URL: https://github.com/apache/flink/pull/18475#issuecomment-1025585234


   Your efforts were not for nothing. We deeply appreciate that you reported and analysed this bug. Half a year ago another user reported similar symptoms, but neither he nor we were able to track it down back then. Analysing it was definitely the most valuable and important part of this issue.
   
   Apart from the things that I've already commented on, there are a couple of other issues in your version. We will also potentially need to deduplicate this code with a fix for FLINK-25827. To speed things up, I will take over your commit and drop the tests (we will need to reimplement them in https://issues.apache.org/jira/browse/FLINK-25869 ), while merging most of your production code as it is, and fixing the remaining issues.
   
   >  I couldn't see how the callback can get the obsolete object. Dirty inconsistency between CPU caches cannot last that long. The callback will see the correct object.
   
   I'm afraid you still don't quite get it. If you are curious to learn more about the subject, please search around for some other resources.
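
For readers following the visibility argument quoted above: under the Java Memory Model, a write to a plain (non-`volatile`) field is not guaranteed to become visible to another thread at any particular time unless a happens-before edge connects the two, regardless of how quickly hardware caches synchronise. The following is a minimal sketch of safe publication through a `volatile` flag. The class and method names are hypothetical and are not part of Flink or of this PR.

```java
/** Hypothetical illustration of safe publication; not Flink code. */
final class VisibilitySketch {
    // The volatile write in publish() happens-before any read that observes ready == true.
    private volatile boolean ready;
    // Plain field: made visible to the reader only through the volatile flag above.
    private int payload;

    /** Writer side: store the data first, then raise the flag. */
    void publish(int value) {
        payload = value;
        ready = true;
    }

    /** Reader side: spin until the flag is observed, then read the data. */
    int awaitPayload() {
        while (!ready) {
            Thread.yield();
        }
        return payload;
    }

    /** Publishes from a second thread and reads from the calling thread. */
    static int demo() {
        VisibilitySketch sketch = new VisibilitySketch();
        new Thread(() -> sketch.publish(42)).start();
        return sketch.awaitPayload();
    }
}
```

If `ready` were a plain field, the loop in `awaitPayload()` would be allowed to spin forever, because the JIT compiler may hoist the read out of the loop.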





[GitHub] [flink] flinkbot edited a comment on pull request #18475: [FLINK-25728] Protential memory leaks in StreamMultipleInputProcessor

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #18475:
URL: https://github.com/apache/flink/pull/18475#issuecomment-1020104543


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "3ade9719b565beeacf7761ab71d5abb7ba62e62a",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=30045",
       "triggerID" : "3ade9719b565beeacf7761ab71d5abb7ba62e62a",
       "triggerType" : "PUSH"
     }, {
       "hash" : "3ade9719b565beeacf7761ab71d5abb7ba62e62a",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=30045",
       "triggerID" : "1020772870",
       "triggerType" : "MANUAL"
     }, {
       "hash" : "1ede9e74ea4b64e9227ea3230d4adc189356c39c",
       "status" : "CANCELED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=30351",
       "triggerID" : "1ede9e74ea4b64e9227ea3230d4adc189356c39c",
       "triggerType" : "PUSH"
     }, {
       "hash" : "4c368879072e4ae60cac37aaa1b7a49454a95fda",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=30357",
       "triggerID" : "4c368879072e4ae60cac37aaa1b7a49454a95fda",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 1ede9e74ea4b64e9227ea3230d4adc189356c39c Azure: [CANCELED](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=30351) 
   * 4c368879072e4ae60cac37aaa1b7a49454a95fda Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=30357) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run azure` re-run the last Azure build
   </details>





[GitHub] [flink] wpc009 commented on a change in pull request #18475: [FLINK-25728] Protential memory leaks in StreamMultipleInputProcessor

Posted by GitBox <gi...@apache.org>.
wpc009 commented on a change in pull request #18475:
URL: https://github.com/apache/flink/pull/18475#discussion_r794485145



##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamMultipleInputProcessor.java
##########
@@ -157,4 +186,66 @@ private void fullCheckAndSetAvailable() {
             }
         }
     }
+
+    /** Visible for testing only. Do not use out side StreamMultipleInputProcessor. */
+    @VisibleForTesting
+    public static class MultipleInputAvailabilityHelper {
+        private final CompletableFuture<?>[] cachedAvailableFutures;
+        private final Consumer[] onCompletion;
+        private CompletableFuture<?> availableFuture;
+
+        public CompletableFuture<?> getAvailableFuture() {
+            return availableFuture;
+        }
+
+        public static MultipleInputAvailabilityHelper newInstance(int inputSize) {
+            MultipleInputAvailabilityHelper obj = new MultipleInputAvailabilityHelper(inputSize);
+            return obj;
+        }
+
+        private MultipleInputAvailabilityHelper(int inputSize) {
+            this.cachedAvailableFutures = new CompletableFuture[inputSize];
+            this.onCompletion = new Consumer[inputSize];
+        }
+
+        @VisibleForTesting
+        public void init() {
+            for (int i = 0; i < cachedAvailableFutures.length; i++) {
+                final int inputIdx = i;
+                onCompletion[i] = (Void) -> notifyCompletion(inputIdx);
+            }
+        }
+
+        public boolean isInvalid() {
+            return availableFuture == null || availableFuture.isDone();
+        }
+
+        public void resetToUnavailable() {
+            availableFuture = new CompletableFuture<>();
+        }
+
+        private void notifyCompletion(int idx) {
+            if (availableFuture != null && !availableFuture.isDone()) {
+                availableFuture.complete(null);
+            }
+            cachedAvailableFutures[idx] = AVAILABLE;
+        }

Review comment:
       I'm not saying your fix doesn't work. I'm just saying it still works even without `AtomicReference`. So does my fix here.
   







[GitHub] [flink] wpc009 commented on pull request #18475: [FLINK-25728] Protential memory leaks in StreamMultipleInputProcessor

Posted by GitBox <gi...@apache.org>.
wpc009 commented on pull request #18475:
URL: https://github.com/apache/flink/pull/18475#issuecomment-1024191941


   I made a graph to explain how this fix works, even with the race condition between the future's `notifyCompletion` and `checkReusableAndReset`.
   ![flink-issue-timeline drawio](https://user-images.githubusercontent.com/2689362/151551134-a30c100e-26a9-4061-9467-6c9b03657bcc.png)
   
   





[GitHub] [flink] wpc009 edited a comment on pull request #18475: [FLINK-25728] Protential memory leaks in StreamMultipleInputProcessor

Posted by GitBox <gi...@apache.org>.
wpc009 edited a comment on pull request #18475:
URL: https://github.com/apache/flink/pull/18475#issuecomment-1021815826


   The master branch cannot pass the CI pipeline either.
   Check this out.
   https://dev.azure.com/wpc009/Flink/_build/results?buildId=5&view=results
   





[GitHub] [flink] wpc009 removed a comment on pull request #18475: [FLINK-25728] Protential memory leaks in StreamMultipleInputProcessor

Posted by GitBox <gi...@apache.org>.
wpc009 removed a comment on pull request #18475:
URL: https://github.com/apache/flink/pull/18475#issuecomment-1021815826


   The `release-1.14` branch cannot pass the CI pipeline either.
   Check this out.
   https://dev.azure.com/wpc009/Flink/_build/results?buildId=5&view=results
   
   What can I do to pass these tests?
   





[GitHub] [flink] wpc009 commented on a change in pull request #18475: [FLINK-25728] Protential memory leaks in StreamMultipleInputProcessor

Posted by GitBox <gi...@apache.org>.
wpc009 commented on a change in pull request #18475:
URL: https://github.com/apache/flink/pull/18475#discussion_r791934091



##########
File path: flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/MultipleInputStreamTaskTest.java
##########
@@ -101,10 +101,15 @@
 import org.junit.Before;
 import org.junit.Rule;
 import org.junit.Test;
+import org.junit.jupiter.api.Assertions;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
 import org.junit.runners.Parameterized.Parameter;
 import org.junit.runners.Parameterized.Parameters;
+import org.mockito.Mockito;
+import org.powermock.api.mockito.PowerMockito;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.rule.PowerMockRule;

Review comment:
       Without Mockito, it's not easy to test the internal state of **StreamMultipleInputProcessor**.
   I couldn't come up with another solution for this.







[GitHub] [flink] flinkbot edited a comment on pull request #18475: [FLINK-25728] Protential memory leaks in StreamMultipleInputProcessor

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #18475:
URL: https://github.com/apache/flink/pull/18475#issuecomment-1020104543









[GitHub] [flink] pnowojski commented on pull request #18475: [FLINK-25728] Protential memory leaks in StreamMultipleInputProcessor

Posted by GitBox <gi...@apache.org>.
pnowojski commented on pull request #18475:
URL: https://github.com/apache/flink/pull/18475#issuecomment-1021298130


   I'm afraid that those "time outs" might be because of some bug in this code causing a deadlock. I've also tried [to run a benchmark request](http://codespeed.dak8s.net:8080/job/flink-benchmark-request/601/) on this PR and the `MultipleInputBenchmark` benchmark run (from this repo https://github.com/apache/flink-benchmarks/) has also deadlocked.
   
   





[GitHub] [flink] flinkbot edited a comment on pull request #18475: [FLINK-25728] Protential memory leaks in StreamMultipleInputProcessor

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #18475:
URL: https://github.com/apache/flink/pull/18475#issuecomment-1020104543


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "3ade9719b565beeacf7761ab71d5abb7ba62e62a",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=30045",
       "triggerID" : "3ade9719b565beeacf7761ab71d5abb7ba62e62a",
       "triggerType" : "PUSH"
     }, {
       "hash" : "3ade9719b565beeacf7761ab71d5abb7ba62e62a",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=30045",
       "triggerID" : "1020772870",
       "triggerType" : "MANUAL"
     } ]
   }-->
   ## CI report:
   
   * 3ade9719b565beeacf7761ab71d5abb7ba62e62a Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=30045) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run azure` re-run the last Azure build
   </details>





[GitHub] [flink] pnowojski commented on a change in pull request #18475: [FLINK-25728] Protential memory leaks in StreamMultipleInputProcessor

Posted by GitBox <gi...@apache.org>.
pnowojski commented on a change in pull request #18475:
URL: https://github.com/apache/flink/pull/18475#discussion_r791869485



##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamMultipleInputProcessor.java
##########
@@ -55,17 +77,24 @@ public StreamMultipleInputProcessor(
                 || inputSelectionHandler.areAllInputsFinished()) {
             return AVAILABLE;
         }
-        final CompletableFuture<?> anyInputAvailable = new CompletableFuture<>();
         for (int i = 0; i < inputProcessors.length; i++) {
             if (!inputSelectionHandler.isInputFinished(i)
-                    && inputSelectionHandler.isInputSelected(i)) {
-                assertNoException(
-                        inputProcessors[i]
-                                .getAvailableFuture()
-                                .thenRun(() -> anyInputAvailable.complete(null)));
+                    && inputSelectionHandler.isInputSelected(i)
+                    && inputProcessors[i].getAvailableFuture() == AVAILABLE) {
+                return AVAILABLE;

Review comment:
       Isn't this a duplicate of `inputSelectionHandler.isAnyInputAvailable()` check from the `if` statement above (L54:56 in the original code)?

##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamMultipleInputProcessor.java
##########
@@ -157,4 +186,66 @@ private void fullCheckAndSetAvailable() {
             }
         }
     }
+
+    /** Visible for testing only. Do not use out side StreamMultipleInputProcessor. */
+    @VisibleForTesting
+    public static class MultipleInputAvailabilityHelper {
+        private final CompletableFuture<?>[] cachedAvailableFutures;
+        private final Consumer[] onCompletion;
+        private CompletableFuture<?> availableFuture;
+
+        public CompletableFuture<?> getAvailableFuture() {
+            return availableFuture;
+        }
+
+        public static MultipleInputAvailabilityHelper newInstance(int inputSize) {
+            MultipleInputAvailabilityHelper obj = new MultipleInputAvailabilityHelper(inputSize);
+            return obj;
+        }
+
+        private MultipleInputAvailabilityHelper(int inputSize) {
+            this.cachedAvailableFutures = new CompletableFuture[inputSize];
+            this.onCompletion = new Consumer[inputSize];
+        }
+
+        @VisibleForTesting
+        public void init() {
+            for (int i = 0; i < cachedAvailableFutures.length; i++) {
+                final int inputIdx = i;
+                onCompletion[i] = (Void) -> notifyCompletion(inputIdx);
+            }
+        }
+
+        public boolean isInvalid() {
+            return availableFuture == null || availableFuture.isDone();
+        }
+
+        public void resetToUnavailable() {
+            availableFuture = new CompletableFuture<>();
+        }
+
+        private void notifyCompletion(int idx) {
+            if (availableFuture != null && !availableFuture.isDone()) {
+                availableFuture.complete(null);
+            }
+            cachedAvailableFutures[idx] = AVAILABLE;
+        }

Review comment:
       My guess is that this code is the reason behind the deadlocks. `notifyCompletion()` is executed from different threads (whichever thread completes the `StreamOneInputProcessor` availability future) while it accesses non-thread-safe fields here.
   
   I haven't yet thought how it should be fixed, but we should avoid adding extra synchronisation overhead in this code.
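
To make the concern concrete, here is a minimal sketch, not the fix adopted in this PR. The method names `resetToUnavailable` and `notifyCompletion` follow the diff above. The class name `AvailabilityHelperSketch` and the use of `volatile` are assumptions made for illustration only, and the sketch does not by itself settle every race discussed in this thread.

```java
import java.util.concurrent.CompletableFuture;

/** Hypothetical, simplified stand-in for the helper under review; not Flink code. */
final class AvailabilityHelperSketch {
    // Assumption: volatile, because the task thread writes this field while
    // whichever thread completes an input future reads it.
    private volatile CompletableFuture<Void> availableFuture = new CompletableFuture<>();

    /** Called from the task thread when no input is currently available. */
    CompletableFuture<Void> resetToUnavailable() {
        CompletableFuture<Void> fresh = new CompletableFuture<>();
        availableFuture = fresh;
        return fresh;
    }

    /** May be called from any thread. */
    void notifyCompletion() {
        // Read the field exactly once, so that this method acts on a single object
        // even if the task thread swaps the field concurrently.
        CompletableFuture<Void> current = availableFuture;
        // complete() is thread-safe and simply returns false if already completed.
        current.complete(null);
    }

    CompletableFuture<Void> getAvailableFuture() {
        return availableFuture;
    }
}
```

In the reviewed code, `availableFuture` is read several times inside `notifyCompletion()` (null check, `isDone()`, `complete()`), so each read may observe a different object when another thread resets the field in between.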

##########
File path: flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/MultipleInputStreamTaskTest.java
##########
@@ -101,10 +101,15 @@
 import org.junit.Before;
 import org.junit.Rule;
 import org.junit.Test;
+import org.junit.jupiter.api.Assertions;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
 import org.junit.runners.Parameterized.Parameter;
 import org.junit.runners.Parameterized.Parameters;
+import org.mockito.Mockito;
+import org.powermock.api.mockito.PowerMockito;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.rule.PowerMockRule;

Review comment:
       Could this test be rewritten to avoid using mockito?
   
   We [generally strongly discourage Mockito usage](https://flink.apache.org/contributing/code-style-and-quality-common.html#avoid-mockito---use-reusable-test-implementations)
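
As a rough illustration of the "reusable test implementation" idea, the sketch below replaces a mock with a small hand-written class whose availability the test controls directly. `InputSketch` and `ControllableInput` are hypothetical names invented for this example. They are not Flink's actual interfaces, and a real test would implement whatever interface the processor under test depends on.

```java
import java.util.concurrent.CompletableFuture;

/** Hypothetical minimal interface; not Flink's actual API. */
interface InputSketch {
    CompletableFuture<?> getAvailableFuture();
}

/** Hand-written, reusable test implementation used in place of a Mockito mock. */
final class ControllableInput implements InputSketch {
    private CompletableFuture<Void> future = new CompletableFuture<>();
    private int futureRequests;

    @Override
    public CompletableFuture<?> getAvailableFuture() {
        futureRequests++; // lets a test assert how often the future was requested
        return future;
    }

    /** Completes the current future, signalling that data is available. */
    void makeAvailable() {
        future.complete(null);
    }

    /** Swaps in a fresh pending future if the current one is already completed. */
    void makeUnavailable() {
        if (future.isDone()) {
            future = new CompletableFuture<>();
        }
    }

    int getFutureRequests() {
        return futureRequests;
    }
}
```

A test can then drive `makeAvailable()` and `makeUnavailable()` explicitly and assert on observable behaviour rather than on mocked internals.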

##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamMultipleInputProcessor.java
##########
@@ -47,6 +49,26 @@ public StreamMultipleInputProcessor(
             StreamOneInputProcessor<?>[] inputProcessors) {
         this.inputSelectionHandler = inputSelectionHandler;
         this.inputProcessors = inputProcessors;
+        this.availabilityHelper =
+                MultipleInputAvailabilityHelper.newInstance(inputProcessors.length);
+        this.availabilityHelper.init();
+    }
+
+    @Override
+    public boolean isAvailable() {
+        if (inputSelectionHandler.isAnyInputAvailable()
+                || inputSelectionHandler.areAllInputsFinished()) {
+            return true;
+        } else {
+            boolean isAvailable = false;
+            for (int i = 0; i < inputProcessors.length; i++) {
+                isAvailable =
+                        !inputSelectionHandler.isInputFinished(i)
+                                && inputSelectionHandler.isInputSelected(i)
+                                && inputProcessors[i].isAvailable();
+            }
+            return isAvailable;

Review comment:
       The same question as below. Isn't this whole loop a duplicated check from the `inputSelectionHandler.isAnyInputAvailable()` condition? In other words, couldn't we just simply `return false` in this `else` branch?
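
When reasoning about whether the loop adds anything, it may help to note how it is shaped in the diff: `isAvailable` is reassigned on every iteration, so the value returned reflects only the last input. A small sketch of that shape next to an "any input" variant follows. The class name and the `boolean[]` parameter, which stands for the per-input "not finished, selected and available" condition, are hypothetical simplifications.

```java
/** Hypothetical illustration of the loop shape in the diff; not Flink code. */
final class AvailabilityLoopSketch {
    /** Mirrors the diff: each iteration overwrites the result of the previous one. */
    static boolean lastWins(boolean[] selectedAndAvailable) {
        boolean isAvailable = false;
        for (int i = 0; i < selectedAndAvailable.length; i++) {
            isAvailable = selectedAndAvailable[i];
        }
        return isAvailable;
    }

    /** "Any input available" semantics: return as soon as one input qualifies. */
    static boolean anyAvailable(boolean[] selectedAndAvailable) {
        for (boolean qualifies : selectedAndAvailable) {
            if (qualifies) {
                return true;
            }
        }
        return false;
    }
}
```

For the input `{true, false}`, the first method returns `false` and the second returns `true`.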







[GitHub] [flink] wpc009 commented on a change in pull request #18475: [FLINK-25728] Protential memory leaks in StreamMultipleInputProcessor

Posted by GitBox <gi...@apache.org>.
wpc009 commented on a change in pull request #18475:
URL: https://github.com/apache/flink/pull/18475#discussion_r791934091



##########
File path: flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/MultipleInputStreamTaskTest.java
##########
@@ -101,10 +101,15 @@
 import org.junit.Before;
 import org.junit.Rule;
 import org.junit.Test;
+import org.junit.jupiter.api.Assertions;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
 import org.junit.runners.Parameterized.Parameter;
 import org.junit.runners.Parameterized.Parameters;
+import org.mockito.Mockito;
+import org.powermock.api.mockito.PowerMockito;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.rule.PowerMockRule;

Review comment:
       Without Mockito, it's not easy to test the internal input processors of **StreamMultipleInputProcessor**.
   I couldn't come up with another solution for this.







[GitHub] [flink] flinkbot commented on pull request #18475: [FLINK-25728] Protential memory leaks in StreamMultipleInputProcessor

Posted by GitBox <gi...@apache.org>.
flinkbot commented on pull request #18475:
URL: https://github.com/apache/flink/pull/18475#issuecomment-1020104543


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "3ade9719b565beeacf7761ab71d5abb7ba62e62a",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "3ade9719b565beeacf7761ab71d5abb7ba62e62a",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 3ade9719b565beeacf7761ab71d5abb7ba62e62a UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run azure` re-run the last Azure build
   </details>





[GitHub] [flink] wpc009 commented on a change in pull request #18475: [FLINK-25728] Protential memory leaks in StreamMultipleInputProcessor

Posted by GitBox <gi...@apache.org>.
wpc009 commented on a change in pull request #18475:
URL: https://github.com/apache/flink/pull/18475#discussion_r791934091



##########
File path: flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/MultipleInputStreamTaskTest.java
##########
@@ -101,10 +101,15 @@
 import org.junit.Before;
 import org.junit.Rule;
 import org.junit.Test;
+import org.junit.jupiter.api.Assertions;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
 import org.junit.runners.Parameterized.Parameter;
 import org.junit.runners.Parameterized.Parameters;
+import org.mockito.Mockito;
+import org.powermock.api.mockito.PowerMockito;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.rule.PowerMockRule;

Review comment:
       Without Mockito, it's not easy to test the internal input processors of the **StreamMultipleInputProcessor**.
   I couldn't come up with another solution to this.

##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamMultipleInputProcessor.java
##########
@@ -55,17 +77,24 @@ public StreamMultipleInputProcessor(
                 || inputSelectionHandler.areAllInputsFinished()) {
             return AVAILABLE;
         }
-        final CompletableFuture<?> anyInputAvailable = new CompletableFuture<>();
         for (int i = 0; i < inputProcessors.length; i++) {
             if (!inputSelectionHandler.isInputFinished(i)
-                    && inputSelectionHandler.isInputSelected(i)) {
-                assertNoException(
-                        inputProcessors[i]
-                                .getAvailableFuture()
-                                .thenRun(() -> anyInputAvailable.complete(null)));
+                    && inputSelectionHandler.isInputSelected(i)
+                    && inputProcessors[i].getAvailableFuture() == AVAILABLE) {
+                return AVAILABLE;

Review comment:
       The `for` loop from L80 to L86 is a shortcut: if any inputProcessor's availableFuture is **AVAILABLE**, it returns early and avoids creating the new instance that follows.
   The `if` condition is identical to the original.

##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamMultipleInputProcessor.java
##########
@@ -55,17 +77,24 @@ public StreamMultipleInputProcessor(
                 || inputSelectionHandler.areAllInputsFinished()) {
             return AVAILABLE;
         }
-        final CompletableFuture<?> anyInputAvailable = new CompletableFuture<>();
         for (int i = 0; i < inputProcessors.length; i++) {
             if (!inputSelectionHandler.isInputFinished(i)
-                    && inputSelectionHandler.isInputSelected(i)) {
-                assertNoException(
-                        inputProcessors[i]
-                                .getAvailableFuture()
-                                .thenRun(() -> anyInputAvailable.complete(null)));
+                    && inputSelectionHandler.isInputSelected(i)
+                    && inputProcessors[i].getAvailableFuture() == AVAILABLE) {
+                return AVAILABLE;

Review comment:
       The for loop from L80 to L86 is a short cut: if any inputProcessor's available future is **AVAILABLE**, it returns immediately and avoids creating the new instance below.
   The `if` condition is identical to the original.
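
For context, the removed lines hooked a new `thenRun` callback onto every selected input's future on each call. Below is a minimal sketch of how such callbacks might pile up, assuming an input whose future never completes. `CallbackAccumulationSketch`, `combineOnce` and `dependentsAfter` are hypothetical names for illustration, not Flink code.

```java
import java.util.concurrent.CompletableFuture;

/** Hypothetical demo class; not part of Flink. */
final class CallbackAccumulationSketch {

    /**
     * Mimics the removed pattern: every call creates a fresh combined future
     * and hooks a new callback onto the input's future.
     */
    static CompletableFuture<?> combineOnce(CompletableFuture<?> inputFuture) {
        final CompletableFuture<?> anyInputAvailable = new CompletableFuture<>();
        inputFuture.thenRun(() -> anyInputAvailable.complete(null));
        return anyInputAvailable;
    }

    /** Calls combineOnce repeatedly on one idle input and counts its dependents. */
    static int dependentsAfter(int calls) {
        // Assumption: this input stays idle, so its future is never completed.
        CompletableFuture<Void> idleInput = new CompletableFuture<>();
        for (int i = 0; i < calls; i++) {
            combineOnce(idleInput);
        }
        return idleInput.getNumberOfDependents();
    }

    public static void main(String[] args) {
        System.out.println("dependents on idle input: " + dependentsAfter(1000));
    }
}
```

`getNumberOfDependents()` is documented as an estimate, but in a single-threaded run it is enough to show whether the callbacks stay attached to the idle future.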




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] wpc009 edited a comment on pull request #18475: [FLINK-25728] Protential memory leaks in StreamMultipleInputProcessor

Posted by GitBox <gi...@apache.org>.
wpc009 edited a comment on pull request #18475:
URL: https://github.com/apache/flink/pull/18475#issuecomment-1023885086


   > Good find @wpc009, but unfortunately we have to support JDK8 for at least a couple of more releases.
   > 
   > I'm testing another approach to solve this problem here: #18538 hopefully without deadlocks.
   > 
   > Please @wpc009 let me know what you think about it.
   
   Sure thing.
   By the way, I came up with a solution which does not have deadlocks. It has been tested with `flink-benchmark` hundreds of times. 
   I'm preparing some end-to-end tests and will push them by the end of the day.
   





[GitHub] [flink] flinkbot edited a comment on pull request #18475: [FLINK-25728] Protential memory leaks in StreamMultipleInputProcessor

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #18475:
URL: https://github.com/apache/flink/pull/18475#issuecomment-1020104543


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "3ade9719b565beeacf7761ab71d5abb7ba62e62a",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=30045",
       "triggerID" : "3ade9719b565beeacf7761ab71d5abb7ba62e62a",
       "triggerType" : "PUSH"
     }, {
       "hash" : "3ade9719b565beeacf7761ab71d5abb7ba62e62a",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=30045",
       "triggerID" : "1020772870",
       "triggerType" : "MANUAL"
     }, {
       "hash" : "1ede9e74ea4b64e9227ea3230d4adc189356c39c",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=30351",
       "triggerID" : "1ede9e74ea4b64e9227ea3230d4adc189356c39c",
       "triggerType" : "PUSH"
     }, {
       "hash" : "4c368879072e4ae60cac37aaa1b7a49454a95fda",
       "status" : "SUCCESS",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=30357",
       "triggerID" : "4c368879072e4ae60cac37aaa1b7a49454a95fda",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 4c368879072e4ae60cac37aaa1b7a49454a95fda Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=30357) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run azure` re-run the last Azure build
   </details>





[GitHub] [flink] wpc009 commented on pull request #18475: [FLINK-25728] Protential memory leaks in StreamMultipleInputProcessor

Posted by GitBox <gi...@apache.org>.
wpc009 commented on pull request #18475:
URL: https://github.com/apache/flink/pull/18475#issuecomment-1024041597


   > @wpc009 . Could you check my PR if it's solving the memory leak? My proposal is quite a bit simpler.
   > 
   > Regarding the tests, unfortunately I don't see a way for providing a fast and reliable unit test/itcase. The only idea I have is some longer running stress test, but...
   > 
   > Can you provide steps to reproduce the problem, and which JDK version are you using? I've tried running your tests, I have even tried to reproduce the problem on my own, and I could not observe this memory leak, even after 20 minutes of running a job that should have created a new leaked `CompletableFuture` every ~1ms. The number of `CompletableFuture` instances (and its inner classes) remained more or less constant (oscillating a lot, but without a clear upward trend).
   
   I have added an e2e test for this. It fails with an `OOM` error on the most recent `1.14.3` release.





[GitHub] [flink] wpc009 commented on pull request #18475: [FLINK-25728] Protential memory leaks in StreamMultipleInputProcessor

Posted by GitBox <gi...@apache.org>.
wpc009 commented on pull request #18475:
URL: https://github.com/apache/flink/pull/18475#issuecomment-1023958486


   Here are some benchmark results (from my laptop):
   | Benchmark                                          | Mode  | Cnt | Score     | Error      | Units  |
   |----------------------------------------------------|-------|-----|-----------|------------|--------|
   | MultipleInputBenchmark.multiInputChainedIdleSource | thrpt | 30  | 27196.170 | ± 244.064  | ops/ms |
   | MultipleInputBenchmark.multiInputMapSink           | thrpt | 30  | 16289.002 | ± 1666.231 | ops/ms |
   | MultipleInputBenchmark.multiInputOneIdleMapSink    | thrpt | 30  | 9257.355  | ± 585.416  | ops/ms |
   





[GitHub] [flink] flinkbot edited a comment on pull request #18475: [FLINK-25728] Protential memory leaks in StreamMultipleInputProcessor

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #18475:
URL: https://github.com/apache/flink/pull/18475#issuecomment-1020104543


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "3ade9719b565beeacf7761ab71d5abb7ba62e62a",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=30045",
       "triggerID" : "3ade9719b565beeacf7761ab71d5abb7ba62e62a",
       "triggerType" : "PUSH"
     }, {
       "hash" : "3ade9719b565beeacf7761ab71d5abb7ba62e62a",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=30045",
       "triggerID" : "1020772870",
       "triggerType" : "MANUAL"
     }, {
       "hash" : "1ede9e74ea4b64e9227ea3230d4adc189356c39c",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=30351",
       "triggerID" : "1ede9e74ea4b64e9227ea3230d4adc189356c39c",
       "triggerType" : "PUSH"
     }, {
       "hash" : "4c368879072e4ae60cac37aaa1b7a49454a95fda",
       "status" : "SUCCESS",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=30357",
       "triggerID" : "4c368879072e4ae60cac37aaa1b7a49454a95fda",
       "triggerType" : "PUSH"
     }, {
       "hash" : "28aefb8e368a9a89bb4d1967da27235cfe2ffcaa",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "28aefb8e368a9a89bb4d1967da27235cfe2ffcaa",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 4c368879072e4ae60cac37aaa1b7a49454a95fda Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=30357) 
   * 28aefb8e368a9a89bb4d1967da27235cfe2ffcaa UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run azure` re-run the last Azure build
   </details>





[GitHub] [flink] wpc009 commented on a change in pull request #18475: [FLINK-25728] Protential memory leaks in StreamMultipleInputProcessor

Posted by GitBox <gi...@apache.org>.
wpc009 commented on a change in pull request #18475:
URL: https://github.com/apache/flink/pull/18475#discussion_r794663542



##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamMultipleInputProcessor.java
##########
@@ -157,4 +170,68 @@ private void fullCheckAndSetAvailable() {
             }
         }
     }
+
+    /** Visible for testing only. Do not use out side of StreamMultipleInputProcessor. */
+    @VisibleForTesting
+    public static class MultipleInputAvailabilityHelper {
+        private final CompletableFuture<?>[] cachedAvailableFutures;
+        private final Consumer[] onCompletion;
+        private CompletableFuture<?> availableFuture;
+        public CompletableFuture<?> getAvailableFuture() {
+            return availableFuture;
+        }
+
+        public static MultipleInputAvailabilityHelper newInstance(int inputSize) {
+            MultipleInputAvailabilityHelper obj = new MultipleInputAvailabilityHelper(inputSize);
+            return obj;
+        }
+
+        private MultipleInputAvailabilityHelper(int inputSize) {
+            this.cachedAvailableFutures = new CompletableFuture[inputSize];
+            this.onCompletion = new Consumer[inputSize];
+            for (int i = 0; i < cachedAvailableFutures.length; i++) {
+                final int inputIdx = i;
+                onCompletion[i] = (Void) -> notifyCompletion(inputIdx);
+            }
+        }
+
+        /**
+         * Check the finished state of availableFuture. Reuse if possible. Renew {availableFuture}
+         * if previous availableFuture is already completed.
+         *
+         * @return true if availableFuture is renewed. false, reuse previous availableFuture.
+         */
+        public boolean checkReusableAndReset() {

Review comment:
       This reusability check may be useless. By the time the inputProcessor's `getAvailableFuture` is called, the `availableFuture` is definitely completed, so the check should be removed. 







[GitHub] [flink] wpc009 edited a comment on pull request #18475: [FLINK-25728] Protential memory leaks in StreamMultipleInputProcessor

Posted by GitBox <gi...@apache.org>.
wpc009 edited a comment on pull request #18475:
URL: https://github.com/apache/flink/pull/18475#issuecomment-1024372868


   > As far as I can tell, the case in the 3rd chart is not working in your version. If `Complete` action happens at the very very end (around `to here` arrow), there is no guarantee that it will "complete the newly created future". As you haven't established happens-before relationship between `Reset` and `Complete` actions, `Complete` might see an outdated, already completed, version of the `availableFuture`, failing to wake up the task thread.
   
   Can I understand it this way? 
   Say there are two threads: thread1 runs the `getAvailableFuture` function, and thread2 handles the future's completion callback. Consider the following scenarios.
   1. Just before thread2 reaches the `availableFuture.complete(null)` line, thread1 finishes resetting `availableFuture` and calls `anyOf` on each input's future. 
   2. thread2 reaches the `availableFuture.complete(null)` line and has resolved the `availableFuture` object, but has not yet called the `complete` method. During that window, thread1 finishes the reset and the `anyOf` calls. 
   
   In 1, thread1's reset of `availableFuture` happens first; thread2 resolves the variable and calls `complete` afterwards.
   So, barring dirty-cache interference (which can be avoided by making `availableFuture` volatile), thread2 gets the fresh future and completes it. Even if thread2 gets the obsolete instance and fails to complete the new future, thread1 can still put the state right in the `anyOf` calls, since the input's future was already completed at the very beginning.
   In 2, thread2 holds an obsolete instance and tries to complete it. Still, thread1 can put the state right in `anyOf`. This is the same as the dirty-cache scenario.
   I would say that, without `volatile`, `notifyCompletion` could see an obsolete future. But the input's future is already completed before `notifyCompletion` is entered. The `anyOf` function rechecks whether the input's future is completed and thereby triggers the following `availableFuture.complete()`. The stream will not halt. 
   
   Anyway, it's cheap to make `availableFuture` `volatile`. That way, `Complete` will not be able to see the obsolete future even in this scenario.
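
The recheck argument above can be sketched as follows. This is a simplified, hypothetical stand-in for the helper in the PR: the class name and the single cached callback are assumptions. It only covers the case where the input's future is already complete when `anyOf` runs, which is the case the argument relies on.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;

/** Hypothetical demo class; names are illustrative, not Flink's API. */
final class AnyOfRecheckSketch {
    // volatile so that the callback thread reads the most recently written future
    private volatile CompletableFuture<Void> availableFuture = new CompletableFuture<>();

    // Cached callback: it reads the field when it fires, not when it is registered.
    private final Consumer<Object> onCompletion = ignored -> availableFuture.complete(null);

    /** Replaces the combined future once the previous one has completed. */
    void resetToUnavailable() {
        if (availableFuture.isDone()) {
            availableFuture = new CompletableFuture<>();
        }
    }

    /** Hooks the cached callback onto an input's future. */
    void anyOf(CompletableFuture<?> inputFuture) {
        inputFuture.thenAccept(onCompletion);
    }

    CompletableFuture<Void> getAvailableFuture() {
        return availableFuture;
    }

    public static void main(String[] args) {
        AnyOfRecheckSketch helper = new AnyOfRecheckSketch();
        helper.getAvailableFuture().complete(null); // the previous round has finished
        helper.resetToUnavailable();                // thread1 installs a fresh future
        // The input completed earlier, so the callback runs during registration.
        helper.anyOf(CompletableFuture.completedFuture(null));
        System.out.println("fresh future done: " + helper.getAvailableFuture().isDone());
    }
}
```

The sketch says nothing about an input that completes only after `anyOf` has returned; in that case the outcome depends on which future the callback observes when it fires.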
   
   I can't see any point at which this version would fail, and the tests back that up.
   I have been running the `MultipleInputBenchmark` test for hours. No stream hang-up has shown up.
   <img width="297" alt="image" src="https://user-images.githubusercontent.com/2689362/151582292-01aacf62-2bff-4fb7-a160-ec40552e69cd.png">





[GitHub] [flink] flinkbot edited a comment on pull request #18475: [FLINK-25728] Protential memory leaks in StreamMultipleInputProcessor

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #18475:
URL: https://github.com/apache/flink/pull/18475#issuecomment-1020104543


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "3ade9719b565beeacf7761ab71d5abb7ba62e62a",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=30045",
       "triggerID" : "3ade9719b565beeacf7761ab71d5abb7ba62e62a",
       "triggerType" : "PUSH"
     }, {
       "hash" : "3ade9719b565beeacf7761ab71d5abb7ba62e62a",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=30045",
       "triggerID" : "1020772870",
       "triggerType" : "MANUAL"
     }, {
       "hash" : "1ede9e74ea4b64e9227ea3230d4adc189356c39c",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=30351",
       "triggerID" : "1ede9e74ea4b64e9227ea3230d4adc189356c39c",
       "triggerType" : "PUSH"
     }, {
       "hash" : "28a8175852359690e62861bd0a4800318cca61dc",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "28a8175852359690e62861bd0a4800318cca61dc",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 3ade9719b565beeacf7761ab71d5abb7ba62e62a Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=30045) 
   * 1ede9e74ea4b64e9227ea3230d4adc189356c39c Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=30351) 
   * 28a8175852359690e62861bd0a4800318cca61dc UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run azure` re-run the last Azure build
   </details>





[GitHub] [flink] pnowojski commented on a change in pull request #18475: [FLINK-25728] Protential memory leaks in StreamMultipleInputProcessor

Posted by GitBox <gi...@apache.org>.
pnowojski commented on a change in pull request #18475:
URL: https://github.com/apache/flink/pull/18475#discussion_r794368740



##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamMultipleInputProcessor.java
##########
@@ -157,4 +186,66 @@ private void fullCheckAndSetAvailable() {
             }
         }
     }
+
+    /** Visible for testing only. Do not use out side StreamMultipleInputProcessor. */
+    @VisibleForTesting
+    public static class MultipleInputAvailabilityHelper {
+        private final CompletableFuture<?>[] cachedAvailableFutures;
+        private final Consumer[] onCompletion;
+        private CompletableFuture<?> availableFuture;
+
+        public CompletableFuture<?> getAvailableFuture() {
+            return availableFuture;
+        }
+
+        public static MultipleInputAvailabilityHelper newInstance(int inputSize) {
+            MultipleInputAvailabilityHelper obj = new MultipleInputAvailabilityHelper(inputSize);
+            return obj;
+        }
+
+        private MultipleInputAvailabilityHelper(int inputSize) {
+            this.cachedAvailableFutures = new CompletableFuture[inputSize];
+            this.onCompletion = new Consumer[inputSize];
+        }
+
+        @VisibleForTesting
+        public void init() {
+            for (int i = 0; i < cachedAvailableFutures.length; i++) {
+                final int inputIdx = i;
+                onCompletion[i] = (Void) -> notifyCompletion(inputIdx);
+            }
+        }
+
+        public boolean isInvalid() {
+            return availableFuture == null || availableFuture.isDone();
+        }
+
+        public void resetToUnavailable() {
+            availableFuture = new CompletableFuture<>();
+        }
+
+        private void notifyCompletion(int idx) {
+            if (availableFuture != null && !availableFuture.isDone()) {
+                availableFuture.complete(null);
+            }
+            cachedAvailableFutures[idx] = AVAILABLE;
+        }

Review comment:
       > In your fix, the AtomicReference can not prevent that either.
   
   It doesn't need to. Take a closer look at the order of execution of the methods `maybeReset()` and `registerFuture()` in my version, and think about what happens if one of the inputs' availability futures completes concurrently. 
   1. If an input becomes available before we call `maybeReset()`, its future will be hooked up to the new combined future.
   2. If an input becomes available just after the `anyAvailable.get().isDone()` check, but before `anyAvailable.set(new CompletableFuture<>())`, the combined/returned future will be available anyway. We will clean up everything in the next `getAvailableFuture()` call.
   3. If an input becomes available after `maybeReset()`, it will just complete the combined/returned future, regardless of whether we register it or not.
   4. `registerFuture()` calls will take care of setting up all of the inputs' futures.
   
   So I don't see any race condition in my version. Yours works quite similarly after all, but its lack of an `AtomicReference` creates an opportunity, for example in point 3, for an input that becomes available to attempt to complete the wrong (obsolete, already completed) future.
   
   >  The isDone() check is still separate from the reset.
   
   This `isDone()` check is irrelevant, as in your case it can be executed on the wrong/old/obsolete instance of the `CompletableFuture`.
   
   > Both of our fixes are working.
   
   Maybe the deadlock is very rare, or maybe you are not testing the right scenario. You would need a scenario where at first the availability of both inputs is flickering, and then at the same moment, while the 2nd input is unavailable, the 1st input becomes permanently unavailable and the 2nd input becomes available (and the 2nd input's availability is swallowed).
   
   Furthermore, it might be working perfectly fine now, but if the JDK optimises the code a bit differently, or the JDK is upgraded, it could stop working.
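
A rough sketch of the ordering described in points 1 to 4, assuming an `AtomicReference` named `anyAvailable` as in the comment. The class name, the `registered` array, and the reuse check in `registerFuture` are assumptions for illustration, not the code of #18538.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;

/** Hypothetical sketch; only maybeReset, registerFuture and anyAvailable come from the comment. */
final class AnyAvailableSketch {
    private final CompletableFuture<?>[] registered;
    private final AtomicReference<CompletableFuture<Void>> anyAvailable =
            new AtomicReference<>(new CompletableFuture<Void>());

    AnyAvailableSketch(int inputCount) {
        this.registered = new CompletableFuture<?>[inputCount];
    }

    /** Installs a fresh combined future only if the current one has completed. */
    void maybeReset() {
        if (anyAvailable.get().isDone()) {
            anyAvailable.set(new CompletableFuture<>());
        }
    }

    /** Hooks an input's future up once; a registration that is still pending is reused. */
    void registerFuture(int idx, CompletableFuture<?> inputFuture) {
        if (registered[idx] == null || registered[idx].isDone()) {
            registered[idx] = inputFuture;
            // The callback reads the reference when it fires, not when it is registered.
            inputFuture.thenRun(() -> anyAvailable.get().complete(null));
        }
    }

    CompletableFuture<Void> getAvailableFuture() {
        return anyAvailable.get();
    }

    public static void main(String[] args) {
        AnyAvailableSketch helper = new AnyAvailableSketch(2);
        CompletableFuture<Void> input0 = new CompletableFuture<>();
        helper.maybeReset();
        helper.registerFuture(0, input0);
        helper.registerFuture(0, input0); // reused, no second callback is attached
        CompletableFuture<Void> combined = helper.getAvailableFuture();
        input0.complete(null);
        System.out.println("combined done: " + combined.isDone());
    }
}
```

Because the callback dereferences `anyAvailable` only when it fires, a late completion targets whichever combined future is current at that moment, which is the property point 3 depends on.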







[GitHub] [flink] wpc009 edited a comment on pull request #18475: [FLINK-25728] Protential memory leaks in StreamMultipleInputProcessor

Posted by GitBox <gi...@apache.org>.
wpc009 edited a comment on pull request #18475:
URL: https://github.com/apache/flink/pull/18475#issuecomment-1024372868


   > As far as I can tell, the case in the 3rd chart is not working in your version. If `Complete` action happens at the very very end (around `to here` arrow), there is no guarantee that it will "complete the newly created future". As you haven't established happens-before relationship between `Reset` and `Complete` actions, `Complete` might see an outdated, already completed, version of the `availableFuture`, failing to wake up the task thread.
   
   Can I understand it this way? What you are saying is: if `Complete` happens at the very end, and `volatile` is not used, the thread running `notifyCompletion` might not see the new `availableFuture`; it then tries to complete the obsolete future, leaving the newly created future untouched. That causes the stream to hang. 
   
   I would say that, without `volatile`, `notifyCompletion` could see an obsolete future. But the input's future is already completed before `notifyCompletion` is entered. The `anyOf` function rechecks whether the input's future is completed and thereby triggers the following `availableFuture.complete()`. The stream will not halt. 
   
   The only way the `notifyCompletion` callback could fail to see the correct `availableFuture` in this scenario is a dirty CPU cache, which can be prevented by making `availableFuture` volatile. Even if it sees an obsolete future, the following `anyOf` is still able to complete `availableFuture` correctly.
   
   Anyway, it's cheap to make `availableFuture` `volatile`. That way, `Complete` will not be able to see the obsolete future even in this scenario.
   
   I have been running the `MultipleInputBenchmark` test for hours. No stream hang-up has shown up.
   <img width="297" alt="image" src="https://user-images.githubusercontent.com/2689362/151582292-01aacf62-2bff-4fb7-a160-ec40552e69cd.png">





[GitHub] [flink] pnowojski commented on pull request #18475: [FLINK-25728] Protential memory leaks in StreamMultipleInputProcessor

Posted by GitBox <gi...@apache.org>.
pnowojski commented on pull request #18475:
URL: https://github.com/apache/flink/pull/18475#issuecomment-1024637122


   > I would say without volatile notifyCompletion see a obsolete future could happen. But, before entry the notifyCompletion, the input's future is already completed. In the anyOf function, it will recheck whether the future of the input is completed, and thus triggered the following availableFuture.complete(). Stream will not halt.
   
   No. In the scenario I described above:
   
   >  If Complete action happens at the very very end (around to here arrow), 
   
   when `anyOf` is executed, the input's future will not yet be completed.
   
   Please do not add test coverage for that, it would be unreliable anyway. 
   
   I don't know why I'm not able to reproduce this issue locally. I believe it can happen, but I simply cannot verify it. Anyway, as I wrote in the ticket, I had a chat with other committers and we decided to merge the fix for this issue without test coverage for the time being. We will add some test coverage later as part of https://issues.apache.org/jira/browse/FLINK-25869. I would propose to go with my fix, as it's a bit simpler and we will probably need to re-use the same code later to fix https://issues.apache.org/jira/browse/FLINK-25827.

