Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2022/07/08 06:10:20 UTC

[GitHub] [flink] mas-chen opened a new pull request, #20215: [FLINK-27479] [Connectors / Common] introduce availability helper to manage future for hybr…

mas-chen opened a new pull request, #20215:
URL: https://github.com/apache/flink/pull/20215

   …id source
   
   ## What is the purpose of the change
   
   Hybrid Source gets into a tight busy loop because the availability future is marked completed and never refreshed.
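
A minimal sketch of why a completed future that is never replaced leads to a busy loop. The class and method names here are hypothetical and the loop is only a stand-in for the runtime's polling logic, not Flink's actual implementation. It relies only on the fact that a `CompletableFuture` stays done once completed.

```java
import java.util.concurrent.CompletableFuture;

/** Hypothetical stand-in for a polling loop; not Flink's actual runtime code. */
final class StaleFutureDemo {

    /** Counts how many of maxIterations see the future as done and would therefore poll. */
    static int countPolls(CompletableFuture<Void> availability, int maxIterations) {
        int polls = 0;
        for (int i = 0; i < maxIterations; i++) {
            if (availability.isDone()) {
                polls++; // a runtime would invoke pollNext() here, even if no data is ready
            }
        }
        return polls;
    }

    public static void main(String[] args) {
        // A future that was completed once and never replaced: every iteration polls.
        CompletableFuture<Void> stale = new CompletableFuture<>();
        stale.complete(null);
        System.out.println(countPolls(stale, 1000)); // prints 1000

        // A fresh, incomplete future: the loop never polls until someone completes it.
        CompletableFuture<Void> fresh = new CompletableFuture<>();
        System.out.println(countPolls(fresh, 1000)); // prints 0
    }
}
```

A real runtime would wait on the incomplete future rather than spin. The sketch only shows that a stale completed future gives the caller no reason to ever wait.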
   
   ## Brief change log
   
   - Introduce MultipleFuturesAvailabilityHelper to help manage availability future
   
   ## Verifying this change
   
   This change added tests and can be verified as follows:
   
   - Added basic unit test to verify that availability future is refreshed
   - Existing tests exercise that source continues to emit records properly
   
   I verified this fix on the latest 1.14 branch.
   
   Test setup: Hybrid Source setup (bounded read on short csv file and then switchover to unbounded read from Kafka)
   
   Test Jobs:
   1. Hybrid Source without fix
   2. Hybrid Source with fix
   3. Kafka Source 
   
   
   I have attached the flamegraphs to the Jira ticket. Additionally, I confirmed that test jobs 2 and 3 show similar CPU usage when running in EKS (2% CPU usage), versus ~80% for test job 1.
   
   ## Does this pull request potentially affect one of the following parts:
   
     - Dependencies (does it add or upgrade a dependency): yes
     - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: yes
     - The serializers: no
     - The runtime per-record code paths (performance sensitive): no. This is not a per-record code path, but the change does affect performance
     - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: no
     - The S3 file system connector: no
   
   ## Documentation
   
     - Does this pull request introduce a new feature? no
     - If yes, how is the feature documented? not applicable
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink] mas-chen commented on a diff in pull request #20215: [FLINK-27479] [Connectors / Common] introduce availability helper to manage future for hybr…

Posted by GitBox <gi...@apache.org>.
mas-chen commented on code in PR #20215:
URL: https://github.com/apache/flink/pull/20215#discussion_r945951057


##########
flink-connectors/flink-connector-base/pom.xml:
##########
@@ -37,7 +37,7 @@
 	<dependencies>
 		<dependency>
 			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-core</artifactId>
+			<artifactId>flink-streaming-java</artifactId>

Review Comment:
   +1, I think it makes sense. There's potential for more usages in future connectors to help with future management, no pun intended.
   
   The only dependencies of this class are in `flink-core`, and the classes that depend on `MultipleFuturesAvailabilityHelper` are only in `flink-streaming-java` and now `flink-connector-base`.
   
   





[GitHub] [flink] mas-chen commented on a diff in pull request #20215: [FLINK-27479] [Connectors / Common] introduce availability helper to manage future for hybr…

Posted by GitBox <gi...@apache.org>.
mas-chen commented on code in PR #20215:
URL: https://github.com/apache/flink/pull/20215#discussion_r945977650


##########
flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSourceReader.java:
##########
@@ -231,16 +236,7 @@ private void setCurrentReader(int index) {
         reader.start();
         currentSourceIndex = index;
         currentReader = reader;
-        currentReader
-                .isAvailable()
-                .whenComplete(
-                        (result, ex) -> {
-                            if (ex == null) {
-                                availabilityFuture.complete(result);
-                            } else {
-                                availabilityFuture.completeExceptionally(ex);
-                            }
-                        });

Review Comment:
   @mxm Thanks for taking a look!
   
   The problem isn't only that the future is not renewed after each completion of a source. It also happens because, once `whenComplete()` executes, the future is marked as complete and is never renewed (renewal is how the underlying reader tells the runtime to stop invoking `pollNext()`).
   
   I think it can be refactored in other ways. Proxying `currentReader.isAvailable()` in the Hybrid Source `isAvailable()` could also work, since the underlying reader itself handles renewing the availability future. I'll take a look.
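
The proxying idea mentioned above could look roughly like the following sketch. All types here (`SimpleReader`, `RenewingReader`, `ProxyingReader`) are hypothetical stand-ins, not Flink classes, and returning an already-completed future when no reader is set is an assumption made only for this illustration.

```java
import java.util.concurrent.CompletableFuture;

/** Hypothetical minimal reader interface; Flink's SourceReader has many more methods. */
interface SimpleReader {
    CompletableFuture<Void> isAvailable();
}

/** Stub reader that renews its own availability future after data is drained. */
final class RenewingReader implements SimpleReader {
    private CompletableFuture<Void> available = new CompletableFuture<>();

    @Override
    public CompletableFuture<Void> isAvailable() {
        return available;
    }

    /** Signals that data has arrived. */
    void signalData() {
        available.complete(null);
    }

    /** Called once the data has been consumed: hand out a fresh, incomplete future. */
    void drained() {
        available = new CompletableFuture<>();
    }
}

/** Wrapper that keeps no availability state of its own and delegates to the current reader. */
final class ProxyingReader implements SimpleReader {
    private SimpleReader current;

    void setCurrent(SimpleReader reader) {
        this.current = reader;
    }

    @Override
    public CompletableFuture<Void> isAvailable() {
        // Assumption for this sketch: with no reader yet, report "available" so the caller proceeds.
        return current == null ? CompletableFuture.completedFuture(null) : current.isAvailable();
    }
}
```

Because the wrapper holds no future of its own, there is nothing for it to forget to renew; whether this fits the real reader-switch logic is a separate question that the thread goes on to discuss.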





[GitHub] [flink] flinkbot commented on pull request #20215: [FLINK-27479] [Connectors / Common] introduce availability helper to manage future for hybr…

Posted by GitBox <gi...@apache.org>.
flinkbot commented on PR #20215:
URL: https://github.com/apache/flink/pull/20215#issuecomment-1178598120

   ## CI report:
   
   * d726a8b6ad3f72933a31de68eaf4a96b4b32865e UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run azure` re-run the last Azure build
   </details>




[GitHub] [flink] mxm commented on a diff in pull request #20215: [FLINK-27479] [Connectors / Common] introduce availability helper to manage future for hybr…

Posted by GitBox <gi...@apache.org>.
mxm commented on code in PR #20215:
URL: https://github.com/apache/flink/pull/20215#discussion_r945871802


##########
flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSourceReader.java:
##########
@@ -231,16 +236,7 @@ private void setCurrentReader(int index) {
         reader.start();
         currentSourceIndex = index;
         currentReader = reader;
-        currentReader
-                .isAvailable()
-                .whenComplete(
-                        (result, ex) -> {
-                            if (ex == null) {
-                                availabilityFuture.complete(result);
-                            } else {
-                                availabilityFuture.completeExceptionally(ex);
-                            }
-                        });

Review Comment:
   The removed lines above can be re-added if we make sure to renew `availabilityFuture` after each completion, e.g. add the following after line 242:
   
   ```
                               availabilityFuture = new CompletableFuture<>();
   ``` 
   
   So this becomes:
   
   ```java
           currentReader
                   .isAvailable()
                   .whenComplete(
                           (result, ex) -> {
                               if (ex == null) {
                                   availabilityFuture.complete(result);
                               } else {
                                   availabilityFuture.completeExceptionally(ex);
                               }
                               availabilityFuture = new CompletableFuture<>();
                           });
   ```



##########
flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSourceReader.java:
##########
@@ -133,7 +132,17 @@ public void notifyCheckpointAborted(long checkpointId) throws Exception {
 
     @Override
     public CompletableFuture<Void> isAvailable() {
-        return availabilityFuture;
+        availabilityHelper.resetToUnAvailable();
+        if (currentReader == null) {
+            return (CompletableFuture<Void>) availabilityHelper.getAvailableFuture();
+        } else {
+            Preconditions.checkArgument(
+                    availabilityHelper.getSize() == 1,
+                    "Availability helper is out of sync for current reader: %s",
+                    currentReader);
+            availabilityHelper.anyOf(0, currentReader.isAvailable());
+            return (CompletableFuture<Void>) availabilityHelper.getAvailableFuture();
+        }

Review Comment:
   ```suggestion
   ```



##########
flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSourceReader.java:
##########
@@ -231,16 +236,7 @@ private void setCurrentReader(int index) {
         reader.start();
         currentSourceIndex = index;
         currentReader = reader;
-        currentReader
-                .isAvailable()
-                .whenComplete(
-                        (result, ex) -> {
-                            if (ex == null) {
-                                availabilityFuture.complete(result);
-                            } else {
-                                availabilityFuture.completeExceptionally(ex);
-                            }
-                        });
+        completeAndResetAvailabilityHelper();

Review Comment:
   ```suggestion
   ```



##########
flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSourceReader.java:
##########
@@ -133,7 +132,17 @@ public void notifyCheckpointAborted(long checkpointId) throws Exception {
 
     @Override
     public CompletableFuture<Void> isAvailable() {
-        return availabilityFuture;
+        availabilityHelper.resetToUnAvailable();
+        if (currentReader == null) {
+            return (CompletableFuture<Void>) availabilityHelper.getAvailableFuture();
+        } else {
+            Preconditions.checkArgument(
+                    availabilityHelper.getSize() == 1,
+                    "Availability helper is out of sync for current reader: %s",
+                    currentReader);
+            availabilityHelper.anyOf(0, currentReader.isAvailable());
+            return (CompletableFuture<Void>) availabilityHelper.getAvailableFuture();
+        }

Review Comment:
   Simply return `availabilityFuture`.



##########
flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSourceReader.java:
##########
@@ -58,8 +60,9 @@
     private int currentSourceIndex = -1;
     private boolean isFinalSource;
     private SourceReader<T, ? extends SourceSplit> currentReader;
-    private CompletableFuture<Void> availabilityFuture = new CompletableFuture<>();
     private List<HybridSourceSplit> restoredSplits = new ArrayList<>();
+    private MultipleFuturesAvailabilityHelper availabilityHelper =
+            new MultipleFuturesAvailabilityHelper(0);

Review Comment:
   I think we can keep using `CompletableFuture` directly with a few modifications. Please see the other diffs for how I think this could be done.



##########
flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSourceReader.java:
##########
@@ -231,16 +236,7 @@ private void setCurrentReader(int index) {
         reader.start();
         currentSourceIndex = index;
         currentReader = reader;
-        currentReader
-                .isAvailable()
-                .whenComplete(
-                        (result, ex) -> {
-                            if (ex == null) {
-                                availabilityFuture.complete(result);
-                            } else {
-                                availabilityFuture.completeExceptionally(ex);
-                            }
-                        });

Review Comment:
   Note that this also works for switch events, because the reader is closed at that point, which completes its availability future.



##########
flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSourceReader.java:
##########
@@ -231,16 +236,7 @@ private void setCurrentReader(int index) {
         reader.start();
         currentSourceIndex = index;
         currentReader = reader;
-        currentReader
-                .isAvailable()
-                .whenComplete(
-                        (result, ex) -> {
-                            if (ex == null) {
-                                availabilityFuture.complete(result);
-                            } else {
-                                availabilityFuture.completeExceptionally(ex);
-                            }
-                        });
+        completeAndResetAvailabilityHelper();

Review Comment:
   This logic won't be required anymore.





[GitHub] [flink] mxm commented on a diff in pull request #20215: [FLINK-27479] [Connectors / Common] introduce availability helper to manage future for hybr…

Posted by GitBox <gi...@apache.org>.
mxm commented on code in PR #20215:
URL: https://github.com/apache/flink/pull/20215#discussion_r947885185


##########
flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSourceReader.java:
##########
@@ -231,16 +236,7 @@ private void setCurrentReader(int index) {
         reader.start();
         currentSourceIndex = index;
         currentReader = reader;
-        currentReader
-                .isAvailable()
-                .whenComplete(
-                        (result, ex) -> {
-                            if (ex == null) {
-                                availabilityFuture.complete(result);
-                            } else {
-                                availabilityFuture.completeExceptionally(ex);
-                            }
-                        });

Review Comment:
   Thank you for elaborating. I was new to the code, apologies for not getting the switch case right. I was under the assumption that the underlying reader would complete its future on close when we switch. In that case synchronization wouldn't have been an issue. But that isn't the case.
   
   I had another look and came up with this, which should do the trick: https://github.com/apache/flink/compare/master...mxm:flink:FLINK-27479?expand=1 It's conceptually close to your code, but it avoids using the helper class, which in my eyes makes it much easier to understand.



##########
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/MultipleFuturesAvailabilityHelper.java:
##########
@@ -57,13 +57,17 @@ public CompletableFuture<?> getAvailableFuture() {
         return availableFuture;
     }
 
+    public int getSize() {
+        return futuresToCombine.length;
+    }
+
     public void resetToUnAvailable() {
         if (availableFuture.isDone()) {
             availableFuture = new CompletableFuture<>();
         }
     }
 
-    private void notifyCompletion() {
+    public void notifyCompletion() {

Review Comment:
   Wouldn't changing the modifier here to `public` break the class contract, which is to only ever complete the `availableFuture` returned via `getAvailableFuture` when the combined futures inserted via `anyOf` complete?
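
To illustrate the contract being described, here is a simplified, hypothetical analog of such a helper (named `AnyOfHelper` here; it is not Flink's `MultipleFuturesAvailabilityHelper` and its `anyOf` signature differs). The point of the sketch is that the combined future can only be completed through futures registered via `anyOf`, because the completing method stays private.

```java
import java.util.concurrent.CompletableFuture;

/** Hypothetical, simplified analog of a combine-futures helper; not Flink's class. */
final class AnyOfHelper {
    private CompletableFuture<Void> available = new CompletableFuture<>();

    CompletableFuture<Void> getAvailableFuture() {
        return available;
    }

    /** Replaces the combined future only if the previous one has already completed. */
    void resetToUnavailable() {
        if (available.isDone()) {
            available = new CompletableFuture<>();
        }
    }

    /** Registers an input future; its completion completes the current combined future. */
    void anyOf(CompletableFuture<?> input) {
        input.whenComplete((result, ex) -> notifyCompletion());
    }

    // Kept private so that callers cannot complete the combined future directly.
    private void notifyCompletion() {
        available.complete(null);
    }

    public static void main(String[] args) {
        AnyOfHelper helper = new AnyOfHelper();
        CompletableFuture<Void> input = new CompletableFuture<>();
        helper.anyOf(input);
        System.out.println(helper.getAvailableFuture().isDone()); // prints false
        input.complete(null);
        System.out.println(helper.getAvailableFuture().isDone()); // prints true
    }
}
```

Making `notifyCompletion()` public would let any caller complete the combined future without a registered input having completed, which is the concern raised in the comment above.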





[GitHub] [flink] mas-chen commented on a diff in pull request #20215: [FLINK-27479] [Connectors / Common] introduce availability helper to manage future for hybr…

Posted by GitBox <gi...@apache.org>.
mas-chen commented on code in PR #20215:
URL: https://github.com/apache/flink/pull/20215#discussion_r947506841


##########
flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSourceReader.java:
##########
@@ -231,16 +236,7 @@ private void setCurrentReader(int index) {
         reader.start();
         currentSourceIndex = index;
         currentReader = reader;
-        currentReader
-                .isAvailable()
-                .whenComplete(
-                        (result, ex) -> {
-                            if (ex == null) {
-                                availabilityFuture.complete(result);
-                            } else {
-                                availabilityFuture.completeExceptionally(ex);
-                            }
-                        });

Review Comment:
   If I understand the process in the alternative solution:
   1. A new reader is set.
   2. When the reader future is done, renew the future.
   3. Complete the future when the current reader hits end of input.
   4. Repeat the process.
   
   However, this doesn't solve the case where the streaming source has no end of input. To handle that case, a process would need to run repeatedly outside of this reader-switch handling method, signaling completion from the underlying reader and renewing the future again and again. (This is why the PR introduces the MultipleFuturesAvailabilityHelper to coordinate this additional complexity.)
   
   Additionally, even with bounded sources, this can create a tight loop, since data is not always available if the external system has extended latency.
   
   Also, in the code snippet above, I think there could be a need for synchronization on the future variable, since `whenComplete()` is not guaranteed to run on the task thread unless we use `whenCompleteAsync()` and pass the task executor, which we don't have access to.
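
The threading concern can be seen with plain `CompletableFuture`, independent of Flink. In this sketch (class, method, and thread names are made up for illustration), the callback is registered before the future completes, so it runs on whichever thread calls `complete()`, not on the thread that registered it.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;

final class CallbackThreadDemo {

    /** Returns the name of the thread that executed the whenComplete callback. */
    static String callbackThreadName(String completerName) {
        CompletableFuture<Void> future = new CompletableFuture<>();
        AtomicReference<String> ranOn = new AtomicReference<>();

        // Registered while the future is still incomplete, so the completing thread runs it.
        future.whenComplete((result, ex) -> ranOn.set(Thread.currentThread().getName()));

        // A separate thread (standing in for e.g. a fetcher thread) completes the future.
        Thread completer = new Thread(() -> future.complete(null), completerName);
        completer.start();
        try {
            completer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return ranOn.get();
    }

    public static void main(String[] args) {
        System.out.println(callbackThreadName("io-thread")); // prints "io-thread"
    }
}
```

Any field that such a callback reassigns is therefore written from a thread other than the one reading it, unless the callback is dispatched to a specific executor or the access is otherwise synchronized.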





[GitHub] [flink] mas-chen commented on a diff in pull request #20215: [FLINK-27479] [Connectors / Common] introduce availability helper to manage future for hybr…

Posted by GitBox <gi...@apache.org>.
mas-chen commented on code in PR #20215:
URL: https://github.com/apache/flink/pull/20215#discussion_r960131874


##########
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/MultipleFuturesAvailabilityHelper.java:
##########
@@ -57,13 +57,17 @@ public CompletableFuture<?> getAvailableFuture() {
         return availableFuture;
     }
 
+    public int getSize() {
+        return futuresToCombine.length;
+    }
+
     public void resetToUnAvailable() {
         if (availableFuture.isDone()) {
             availableFuture = new CompletableFuture<>();
         }
     }
 
-    private void notifyCompletion() {
+    public void notifyCompletion() {

Review Comment:
   That's a good point! We can pursue a solution that avoids using this class, since we only really need to maintain one future at a time.





[GitHub] [flink] mas-chen commented on a diff in pull request #20215: [FLINK-27479] [Connectors / Common] introduce availability helper to manage future for hybr…

Posted by GitBox <gi...@apache.org>.
mas-chen commented on code in PR #20215:
URL: https://github.com/apache/flink/pull/20215#discussion_r945977650


##########
flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSourceReader.java:
##########
@@ -231,16 +236,7 @@ private void setCurrentReader(int index) {
         reader.start();
         currentSourceIndex = index;
         currentReader = reader;
-        currentReader
-                .isAvailable()
-                .whenComplete(
-                        (result, ex) -> {
-                            if (ex == null) {
-                                availabilityFuture.complete(result);
-                            } else {
-                                availabilityFuture.completeExceptionally(ex);
-                            }
-                        });

Review Comment:
   @mxm Thanks for taking a look!
   
   The problem isn't only that the future is not renewed after each completion of a source. It also happens because, once `whenComplete()` executes, the future is marked as complete and is never renewed (renewal is how the underlying reader tells the runtime to stop invoking `pollNext()`).
   
   I think it can be refactored in other ways. Proxying `currentReader.isAvailable()` could also work, since the underlying reader itself handles renewing the availability future. I'll take a look.





[GitHub] [flink] mas-chen commented on a diff in pull request #20215: [FLINK-27479] [Connectors / Common] introduce availability helper to manage future for hybr…

Posted by GitBox <gi...@apache.org>.
mas-chen commented on code in PR #20215:
URL: https://github.com/apache/flink/pull/20215#discussion_r945951057


##########
flink-connectors/flink-connector-base/pom.xml:
##########
@@ -37,7 +37,7 @@
 	<dependencies>
 		<dependency>
 			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-core</artifactId>
+			<artifactId>flink-streaming-java</artifactId>

Review Comment:
   +1, I think it makes sense. There's potential for more usages in future connectors to help with future management, no pun intended.
   
   The only dependencies of this class are in `flink-core`, and the classes that depend on `MultipleFuturesAvailabilityHelper` are only in `flink-streaming-java` and now `flink-connector-base`. So there would not be additional dependency changes, since all the related modules already depend on `flink-core`.
   
   





[GitHub] [flink] mas-chen commented on a diff in pull request #20215: [FLINK-27479] [Connectors / Common] introduce availability helper to manage future for hybr…

Posted by GitBox <gi...@apache.org>.
mas-chen commented on code in PR #20215:
URL: https://github.com/apache/flink/pull/20215#discussion_r947506841


##########
flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSourceReader.java:
##########
@@ -231,16 +236,7 @@ private void setCurrentReader(int index) {
         reader.start();
         currentSourceIndex = index;
         currentReader = reader;
-        currentReader
-                .isAvailable()
-                .whenComplete(
-                        (result, ex) -> {
-                            if (ex == null) {
-                                availabilityFuture.complete(result);
-                            } else {
-                                availabilityFuture.completeExceptionally(ex);
-                            }
-                        });

Review Comment:
   If I understand the process in the alternative solution:
   1. A new reader is set.
   2. When the reader future is done, renew the future.
   
   What is the process that renews the renewed future in step 2? Such a process would need to run repeatedly outside of this reader-switch handling method, signaling completion from the underlying reader and renewing the future again and again. (This is why the PR introduces this additional complexity.)
   
   Also, in the code snippet above, I think there could be a need for synchronization on the future, since `whenComplete()` is not guaranteed to run on the task thread unless we use `whenCompleteAsync()` and pass the task executor, which we don't have access to.





[GitHub] [flink] mas-chen commented on a diff in pull request #20215: [FLINK-27479] [Connectors / Common] introduce availability helper to manage future for hybr…

Posted by GitBox <gi...@apache.org>.
mas-chen commented on code in PR #20215:
URL: https://github.com/apache/flink/pull/20215#discussion_r960134253


##########
flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSourceReader.java:
##########
@@ -231,16 +236,7 @@ private void setCurrentReader(int index) {
         reader.start();
         currentSourceIndex = index;
         currentReader = reader;
-        currentReader
-                .isAvailable()
-                .whenComplete(
-                        (result, ex) -> {
-                            if (ex == null) {
-                                availabilityFuture.complete(result);
-                            } else {
-                                availabilityFuture.completeExceptionally(ex);
-                            }
-                        });

Review Comment:
   Makes sense. We only really deal with one future at a time here, so I agree that the helper class adds more complexity than necessary. I worked on integrating your idea and enhanced the unit test.
   
   However, I did not include lines 98-102 (https://github.com/apache/flink/compare/master...mxm:flink:FLINK-27479?expand=1#diff-2a9fc4178aac9e00bad9d172c1011d50a2ecd17ada6ca6260db40728bb978723R98-R102). The underlying reader should not complete its underlying future if it is reporting 'END_OF_INPUT'.





[GitHub] [flink] mas-chen commented on a diff in pull request #20215: [FLINK-27479] [Connectors / Common] introduce availability helper to manage future for hybr…

Posted by GitBox <gi...@apache.org>.
mas-chen commented on code in PR #20215:
URL: https://github.com/apache/flink/pull/20215#discussion_r947506841





-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink] mas-chen commented on a diff in pull request #20215: [FLINK-27479] [Connectors / Common] introduce availability helper to manage future for hybr…

Posted by GitBox <gi...@apache.org>.
mas-chen commented on code in PR #20215:
URL: https://github.com/apache/flink/pull/20215#discussion_r945977650


##########
flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSourceReader.java:
##########
@@ -231,16 +236,7 @@ private void setCurrentReader(int index) {
         reader.start();
         currentSourceIndex = index;
         currentReader = reader;
-        currentReader
-                .isAvailable()
-                .whenComplete(
-                        (result, ex) -> {
-                            if (ex == null) {
-                                availabilityFuture.complete(result);
-                            } else {
-                                availabilityFuture.completeExceptionally(ex);
-                            }
-                        });

Review Comment:
   Thanks for taking a look!
   
   The problem isn't only that the future is not renewed after each completion. It is also that once `whenComplete()` executes, the future is marked as complete and is never renewed, so the underlying reader has no way to tell the runtime to stop invoking `pollNext()`.
   
   I think it can be refactored in other ways. Proxying `currentReader.isAvailable()` could also work, since the underlying reader itself handles renewing the availability future. I'll take a look.
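
The proxying idea mentioned in the comment above can be sketched without any Flink classes. This is a minimal illustration under stated assumptions, not the change that was merged: `Reader`, `ToyReader`, and `ProxyingReader` are hypothetical stand-ins, and only the method name `isAvailable()` comes from the discussion. The point is that the wrapper forwards to the current reader on every call instead of caching a one-shot future that stays completed forever.

```java
import java.util.concurrent.CompletableFuture;

public class ProxyAvailabilitySketch {

    /** Hypothetical stand-in for the part of a source reader that matters here. */
    public interface Reader {
        CompletableFuture<Void> isAvailable();
    }

    /** Toy reader that renews its own availability future once it has been drained. */
    public static final class ToyReader implements Reader {
        private CompletableFuture<Void> available = new CompletableFuture<>();

        @Override
        public CompletableFuture<Void> isAvailable() {
            return available;
        }

        /** Signals that data can be polled. */
        public void dataArrived() {
            available.complete(null);
        }

        /** Signals that no data is left for now; hands out a fresh future. */
        public void drained() {
            if (available.isDone()) {
                available = new CompletableFuture<>();
            }
        }
    }

    /** Wrapper that forwards availability to whichever reader is current. */
    public static final class ProxyingReader implements Reader {
        private Reader current;

        public ProxyingReader(Reader initial) {
            this.current = initial;
        }

        public void switchTo(Reader next) {
            this.current = next;
        }

        @Override
        public CompletableFuture<Void> isAvailable() {
            // No cached future here: the underlying reader decides.
            return current.isAvailable();
        }
    }

    public static void main(String[] args) {
        ToyReader inner = new ToyReader();
        ProxyingReader proxy = new ProxyingReader(inner);
        inner.dataArrived();
        System.out.println("available after data: " + proxy.isAvailable().isDone());
        inner.drained();
        System.out.println("available after drain: " + proxy.isAvailable().isDone());
    }
}
```

With this shape, a caller that checks `isAvailable().isDone()` before polling sees an incomplete future again as soon as the underlying reader renews it, which is the behavior the busy loop was missing.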





[GitHub] [flink] mas-chen commented on a diff in pull request #20215: [FLINK-27479] [Connectors / Common] introduce availability helper to manage future for hybr…

Posted by GitBox <gi...@apache.org>.
mas-chen commented on code in PR #20215:
URL: https://github.com/apache/flink/pull/20215#discussion_r947506841


##########
flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSourceReader.java:
##########
@@ -231,16 +236,7 @@ private void setCurrentReader(int index) {
         reader.start();
         currentSourceIndex = index;
         currentReader = reader;
-        currentReader
-                .isAvailable()
-                .whenComplete(
-                        (result, ex) -> {
-                            if (ex == null) {
-                                availabilityFuture.complete(result);
-                            } else {
-                                availabilityFuture.completeExceptionally(ex);
-                            }
-                        });

Review Comment:
   If I understand the process in the alternative solution:
   1. A new reader is set.
   2. When the reader future is done, renew the future.
   
   What is the process that renews the renewed future in step 2? Such a process would need to run recursively outside of this reader-switch handling method, repeatedly signaling completion from the underlying reader and renewing the future again and again. (This is why the PR introduces this MultipleFuturesAvailabilityHelper to coordinate this additional complexity.)
   
   Also, in the code snippet above, I think the future may need synchronization, since `whenComplete()` is not guaranteed to run on the task thread unless we use `whenCompleteAsync()` and pass the task executor, which we don't have access to.








[GitHub] [flink] mas-chen commented on a diff in pull request #20215: [FLINK-27479] [Connectors / Common] introduce availability helper to manage future for hybr…

Posted by GitBox <gi...@apache.org>.
mas-chen commented on code in PR #20215:
URL: https://github.com/apache/flink/pull/20215#discussion_r945951057


##########
flink-connectors/flink-connector-base/pom.xml:
##########
@@ -37,7 +37,7 @@
 	<dependencies>
 		<dependency>
 			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-core</artifactId>
+			<artifactId>flink-streaming-java</artifactId>

Review Comment:
   +1, I think it makes sense. There's potential for more usages in future connectors to help with future management, no pun intended.
   
   The only dependencies of this class are in `flink-core`, and the classes that depend on `MultipleFuturesAvailabilityHelper` are only in `flink-streaming-java` and now `flink-connector-base`. So there would be no additional dependency changes, assuming the move to `flink-core`.
   
   





[GitHub] [flink] mas-chen commented on a diff in pull request #20215: [FLINK-27479] [Connectors / Common] introduce availability helper to manage future for hybr…

Posted by GitBox <gi...@apache.org>.
mas-chen commented on code in PR #20215:
URL: https://github.com/apache/flink/pull/20215#discussion_r945951057


##########
flink-connectors/flink-connector-base/pom.xml:
##########
@@ -37,7 +37,7 @@
 	<dependencies>
 		<dependency>
 			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-core</artifactId>
+			<artifactId>flink-streaming-java</artifactId>

Review Comment:
   +1, I think it makes sense. There's potential for more usages in future connectors to help with future management, no pun intended.
   
   The only dependencies of this class are in `flink-core`, and the classes that depend on `MultipleFuturesAvailabilityHelper` are only in `flink-streaming-java` and now `flink-connector-base`. So there would be no additional dependency changes.
   
   





[GitHub] [flink] mxm commented on a diff in pull request #20215: [FLINK-27479] [Connectors / Common] introduce availability helper to manage future for hybr…

Posted by GitBox <gi...@apache.org>.
mxm commented on code in PR #20215:
URL: https://github.com/apache/flink/pull/20215#discussion_r946579671


##########
flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSourceReader.java:
##########
@@ -231,16 +236,7 @@ private void setCurrentReader(int index) {
         reader.start();
         currentSourceIndex = index;
         currentReader = reader;
-        currentReader
-                .isAvailable()
-                .whenComplete(
-                        (result, ex) -> {
-                            if (ex == null) {
-                                availabilityFuture.complete(result);
-                            } else {
-                                availabilityFuture.completeExceptionally(ex);
-                            }
-                        });

Review Comment:
   Why wouldn't the above solution work (along with the other proposed changes)? The future would always be renewed once completed, so the runtime would stop polling on any new call to `isAvailable()`.





[GitHub] [flink] mas-chen commented on a diff in pull request #20215: [FLINK-27479] [Connectors / Common] introduce availability helper to manage future for hybr…

Posted by GitBox <gi...@apache.org>.
mas-chen commented on code in PR #20215:
URL: https://github.com/apache/flink/pull/20215#discussion_r960131874


##########
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/MultipleFuturesAvailabilityHelper.java:
##########
@@ -57,13 +57,17 @@ public CompletableFuture<?> getAvailableFuture() {
         return availableFuture;
     }
 
+    public int getSize() {
+        return futuresToCombine.length;
+    }
+
     public void resetToUnAvailable() {
         if (availableFuture.isDone()) {
             availableFuture = new CompletableFuture<>();
         }
     }
 
-    private void notifyCompletion() {
+    public void notifyCompletion() {

Review Comment:
   That's a good point!
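
For readers without the full class at hand, the hunk above can be put in context with a simplified stand-in. This is a sketch under stated assumptions, not the Flink implementation: the names `futuresToCombine`, `availableFuture`, `getAvailableFuture()`, `getSize()`, `resetToUnAvailable()`, and `notifyCompletion()` mirror the diff, while the class name `CombinedAvailabilitySketch` and the `combine()` method are hypothetical and exist only to make the example self-contained.

```java
import java.util.concurrent.CompletableFuture;

/** Simplified, single-threaded stand-in for a helper that combines availability futures. */
public class CombinedAvailabilitySketch {

    private final CompletableFuture<?>[] futuresToCombine;
    private CompletableFuture<?> availableFuture = new CompletableFuture<>();

    public CombinedAvailabilitySketch(int size) {
        this.futuresToCombine = new CompletableFuture<?>[size];
    }

    public CompletableFuture<?> getAvailableFuture() {
        return availableFuture;
    }

    public int getSize() {
        return futuresToCombine.length;
    }

    /** Swaps in a fresh future, but only if the current one has already completed. */
    public void resetToUnAvailable() {
        if (availableFuture.isDone()) {
            availableFuture = new CompletableFuture<>();
        }
    }

    /**
     * Hypothetical registration method: completes the combined future as soon as
     * the given future completes. A slot is only replaced once its previous
     * future is done, so repeated calls do not stack callbacks.
     */
    public void combine(int idx, CompletableFuture<?> future) {
        if (futuresToCombine[idx] == null || futuresToCombine[idx].isDone()) {
            futuresToCombine[idx] = future;
            future.thenRun(this::notifyCompletion);
        }
    }

    public void notifyCompletion() {
        availableFuture.complete(null);
    }

    public static void main(String[] args) {
        CombinedAvailabilitySketch helper = new CombinedAvailabilitySketch(1);
        CompletableFuture<Void> readerFuture = new CompletableFuture<>();
        helper.combine(0, readerFuture);
        readerFuture.complete(null);
        System.out.println("combined done: " + helper.getAvailableFuture().isDone());
        helper.resetToUnAvailable();
        System.out.println("after reset: " + helper.getAvailableFuture().isDone());
    }
}
```

The reset step is what distinguishes this pattern from a one-shot future: after each completion the caller can renew the combined future and register the reader's next availability future against it.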





[GitHub] [flink] mas-chen commented on a diff in pull request #20215: [FLINK-27479] [Connectors / Common] introduce availability helper to manage future for hybr…

Posted by GitBox <gi...@apache.org>.
mas-chen commented on code in PR #20215:
URL: https://github.com/apache/flink/pull/20215#discussion_r947506841


##########
flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSourceReader.java:
##########
@@ -231,16 +236,7 @@ private void setCurrentReader(int index) {
         reader.start();
         currentSourceIndex = index;
         currentReader = reader;
-        currentReader
-                .isAvailable()
-                .whenComplete(
-                        (result, ex) -> {
-                            if (ex == null) {
-                                availabilityFuture.complete(result);
-                            } else {
-                                availabilityFuture.completeExceptionally(ex);
-                            }
-                        });

Review Comment:
   If I understand the process in the alternative solution:
   1. A new reader is set.
   2. When the reader future is done, renew the future.
   3. Complete the future when the current reader hits end of input.
   4. Repeat the process.
   
   However, this doesn't solve the case where the streaming source has no end of input. To handle that case, a process would need to run recursively outside of this reader-switch handling method, repeatedly signaling completion from the underlying reader and renewing the future again and again. (This is why the PR introduces the MultipleFuturesAvailabilityHelper to coordinate this additional complexity.)
   
   Also, in the code snippet above, I think the future variable may need synchronization, since `whenComplete()` is not guaranteed to run on the task thread unless we use `whenCompleteAsync()` and pass the task executor, which we don't have access to.





[GitHub] [flink] tweise commented on a diff in pull request #20215: [FLINK-27479] [Connectors / Common] introduce availability helper to manage future for hybr…

Posted by GitBox <gi...@apache.org>.
tweise commented on code in PR #20215:
URL: https://github.com/apache/flink/pull/20215#discussion_r945367610


##########
flink-connectors/flink-connector-base/pom.xml:
##########
@@ -37,7 +37,7 @@
 	<dependencies>
 		<dependency>
 			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-core</artifactId>
+			<artifactId>flink-streaming-java</artifactId>

Review Comment:
   Should we move `MultipleFuturesAvailabilityHelper` to `flink-core` rather than changing the dependencies of this module in a rather significant way? @zentol WDYT?





[GitHub] [flink] mas-chen commented on pull request #20215: [FLINK-27479] [Connectors / Common] introduce availability helper to manage future for hybr…

Posted by GitBox <gi...@apache.org>.
mas-chen commented on PR #20215:
URL: https://github.com/apache/flink/pull/20215#issuecomment-1261134281

   @flinkbot run azure




[GitHub] [flink] tweise merged pull request #20215: [FLINK-27479] [Connectors / Common] introduce availability helper to manage future for hybr…

Posted by GitBox <gi...@apache.org>.
tweise merged PR #20215:
URL: https://github.com/apache/flink/pull/20215




[GitHub] [flink] mas-chen commented on a diff in pull request #20215: [FLINK-27479] [Connectors / Common] introduce availability helper to manage future for hybr…

Posted by GitBox <gi...@apache.org>.
mas-chen commented on code in PR #20215:
URL: https://github.com/apache/flink/pull/20215#discussion_r947506841


##########
flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSourceReader.java:
##########
@@ -231,16 +236,7 @@ private void setCurrentReader(int index) {
         reader.start();
         currentSourceIndex = index;
         currentReader = reader;
-        currentReader
-                .isAvailable()
-                .whenComplete(
-                        (result, ex) -> {
-                            if (ex == null) {
-                                availabilityFuture.complete(result);
-                            } else {
-                                availabilityFuture.completeExceptionally(ex);
-                            }
-                        });

Review Comment:
   If I understand the process in the alternative solution:
   1. A new reader is set.
   2. When the reader future is done, renew the future.
   3. Complete the future when the current reader hits end of input.
   4. Repeat the process.
   
   However, this doesn't solve the case where the streaming source has no end of input. To handle that case, a process would need to run recursively outside of this reader-switch handling method, repeatedly signaling completion from the underlying reader and renewing the future again and again. (This is why the PR introduces the MultipleFuturesAvailabilityHelper to coordinate this additional complexity.)
   
   Additionally, even with bounded sources, this can create a tight loop, since data is not always available if the external system has extended latency. It's better to let the underlying reader signal availability.
   
   Also, in the code snippet above, I think the future variable may need synchronization, since `whenComplete()` is not guaranteed to run on the task thread unless we use `whenCompleteAsync()` and pass the task executor, which we don't have access to.





[GitHub] [flink] mas-chen commented on a diff in pull request #20215: [FLINK-27479] [Connectors / Common] introduce availability helper to manage future for hybr…

Posted by GitBox <gi...@apache.org>.
mas-chen commented on code in PR #20215:
URL: https://github.com/apache/flink/pull/20215#discussion_r947506841


##########
flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSourceReader.java:
##########
@@ -231,16 +236,7 @@ private void setCurrentReader(int index) {
         reader.start();
         currentSourceIndex = index;
         currentReader = reader;
-        currentReader
-                .isAvailable()
-                .whenComplete(
-                        (result, ex) -> {
-                            if (ex == null) {
-                                availabilityFuture.complete(result);
-                            } else {
-                                availabilityFuture.completeExceptionally(ex);
-                            }
-                        });

Review Comment:
   If I understand the process in the alternative solution:
   1. A new reader is set.
   2. When the reader future is done, renew the future.
   
   What is the process that renews the renewed future in step 2? Such a process would need to run recursively outside of this reader-switch handling method, repeatedly signaling completion from the underlying reader and renewing the future again and again. (This is why the PR introduces the MultipleFuturesAvailabilityHelper to coordinate this additional complexity.)
   
   Also, in the code snippet above, I think the future variable may need synchronization, since `whenComplete()` is not guaranteed to run on the task thread unless we use `whenCompleteAsync()` and pass the task executor, which we don't have access to.


