Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2021/10/08 15:38:51 UTC

[GitHub] [flink] akalash opened a new pull request #17440: [FLINK-24468][runtime] Wait for the channel activation before creating partition request client

akalash opened a new pull request #17440:
URL: https://github.com/apache/flink/pull/17440


   
   ## What is the purpose of the change
   
   *This PR fixes an NPE that occurs when the channel context is used inside the ClientHandler before the channel has been activated.*
   
   
   ## Brief change log
   
     - *Wait for the channel activation before creating partition request client*
     - *Ignore exceptions thrown during the throughput calculation so that they do not cause subsequent calculations to fail*
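
As an illustration of the failure mode described above, here is a minimal sketch, not the PR's actual code. `FakeClientHandler`, `activationFuture` and `written` are hypothetical names, and a `StringBuilder` stands in for the Netty channel context. It shows why touching the context before activation fails, and how deferring the call until activation avoids the NPE.

```java
import java.util.concurrent.CompletableFuture;

final class ActivationGuardSketch {

    /** Hypothetical handler: its context only exists once the channel is active. */
    static final class FakeClientHandler {
        // Stand-in for the Netty channel context; null until channelActive() runs.
        private volatile StringBuilder ctx;
        private final CompletableFuture<Void> activated = new CompletableFuture<>();

        void channelActive() {
            ctx = new StringBuilder();
            activated.complete(null);
        }

        // Dereferences ctx, so it throws a NullPointerException before activation.
        void notifyCreditAvailable(String channelId) {
            ctx.append(channelId).append(';');
        }

        CompletableFuture<Void> activationFuture() {
            return activated;
        }

        String written() {
            return ctx == null ? "" : ctx.toString();
        }
    }

    /** Returns true if using the handler before activation fails with an NPE. */
    static boolean failsBeforeActivation() {
        FakeClientHandler handler = new FakeClientHandler();
        try {
            handler.notifyCreditAvailable("ch-1");
            return false;
        } catch (NullPointerException expected) {
            return true;
        }
    }

    /** Defers the call until activation, so the context is guaranteed to exist. */
    static String deferredUntilActivation() {
        FakeClientHandler handler = new FakeClientHandler();
        handler.activationFuture().thenRun(() -> handler.notifyCreditAvailable("ch-1"));
        handler.channelActive();
        return handler.written();
    }

    public static void main(String[] args) {
        System.out.println("NPE before activation: " + failsBeforeActivation());
        System.out.println("written after activation: " + deferredUntilActivation());
    }
}
```

The sketch chains on a future rather than blocking; the PR title describes waiting for activation before creating the partition request client, which is a different way of enforcing the same ordering.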
   
   
   ## Verifying this change
   
   This change added tests and can be verified as follows:
   
     - *Added a test for CreditBasedPartitionRequestClientHandler*
   
   ## Does this pull request potentially affect one of the following parts:
   
     - Dependencies (does it add or upgrade a dependency): (yes / **no**)
     - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes / **no**)
     - The serializers: (yes / **no** / don't know)
     - The runtime per-record code paths (performance sensitive): (yes / **no** / don't know)
     - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (yes / **no** / don't know)
     - The S3 file system connector: (yes / **no** / don't know)
   
   ## Documentation
   
     - Does this pull request introduce a new feature? (yes / **no**)
     - If yes, how is the feature documented? (not applicable / docs / JavaDocs / not documented)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] akalash commented on pull request #17440: [FLINK-24468][runtime] Wait for the channel activation before creating partition request client

Posted by GitBox <gi...@apache.org>.
akalash commented on pull request #17440:
URL: https://github.com/apache/flink/pull/17440#issuecomment-946787559


   @dawidwys, thanks for the idea. I have checked it, and it looks like it should work without any problems; this implementation also looks more correct than the current one. Let's wait for the tests on the latest commits, and then you can take another look.





[GitHub] [flink] akalash commented on a change in pull request #17440: [FLINK-24468][runtime] Wait for the channel activation before creating partition request client

Posted by GitBox <gi...@apache.org>.
akalash commented on a change in pull request #17440:
URL: https://github.com/apache/flink/pull/17440#discussion_r732608314



##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyPartitionRequestClient.java
##########
@@ -240,4 +244,57 @@ private void checkNotClosed() throws IOException {
                     String.format("Channel to '%s' closed.", remoteAddr), localAddr);
         }
     }
+
+    static class AddCreditMessage extends ClientOutboundMessage {

Review comment:
       No particular reason. All the message classes here can be private.







[GitHub] [flink] flinkbot edited a comment on pull request #17440: [FLINK-24468][runtime] Wait for the channel activation before creating partition request client

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #17440:
URL: https://github.com/apache/flink/pull/17440#issuecomment-938816662


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "37cf758d9eaf3ee1a1f6342714abd14ae8d80ea9",
       "status" : "SUCCESS",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=24871",
       "triggerID" : "37cf758d9eaf3ee1a1f6342714abd14ae8d80ea9",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 37cf758d9eaf3ee1a1f6342714abd14ae8d80ea9 Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=24871) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run azure` re-run the last Azure build
   </details>





[GitHub] [flink] flinkbot edited a comment on pull request #17440: [FLINK-24468][runtime] Wait for the channel activation before creating partition request client

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #17440:
URL: https://github.com/apache/flink/pull/17440#issuecomment-938816662


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "37cf758d9eaf3ee1a1f6342714abd14ae8d80ea9",
       "status" : "SUCCESS",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=24871",
       "triggerID" : "37cf758d9eaf3ee1a1f6342714abd14ae8d80ea9",
       "triggerType" : "PUSH"
     }, {
       "hash" : "7d56eb7d002b7bf5c268b98e12a6e4879cd68216",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=25042",
       "triggerID" : "7d56eb7d002b7bf5c268b98e12a6e4879cd68216",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 37cf758d9eaf3ee1a1f6342714abd14ae8d80ea9 Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=24871) 
   * 7d56eb7d002b7bf5c268b98e12a6e4879cd68216 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=25042) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run azure` re-run the last Azure build
   </details>





[GitHub] [flink] pnowojski commented on pull request #17440: [FLINK-24468][runtime] Wait for the channel activation before creating partition request client

Posted by GitBox <gi...@apache.org>.
pnowojski commented on pull request #17440:
URL: https://github.com/apache/flink/pull/17440#issuecomment-945645624


   Most likely we added `notifyNewBufferSize` to the `ChannelHandler` to follow the same path as `notifyCreditAvailable`. I, at least, have not given much thought to whether it should actually be implemented the way you are proposing. Maybe you are right, @dawidwys.
   
   I don't know why `notifyCreditAvailable` was added the way it was. Maybe that was also the wrong place to add it.
   
   I haven't read the full thread, so I'm not sure: if we changed it the way you are proposing, @dawidwys, would it solve, or help solve, the problems in this ticket?





[GitHub] [flink] dawidwys commented on a change in pull request #17440: [FLINK-24468][runtime] Wait for the channel activation before creating partition request client

Posted by GitBox <gi...@apache.org>.
dawidwys commented on a change in pull request #17440:
URL: https://github.com/apache/flink/pull/17440#discussion_r732625408



##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyPartitionRequestClient.java
##########
@@ -195,22 +195,26 @@ public void operationComplete(ChannelFuture future) throws Exception {
 
     @Override
     public void notifyCreditAvailable(RemoteInputChannel inputChannel) {
-        clientHandler.notifyCreditAvailable(inputChannel);
+        sendToChannel(new AddCreditMessage(inputChannel));
     }
 
     @Override
     public void notifyNewBufferSize(RemoteInputChannel inputChannel, int bufferSize) {
-        clientHandler.notifyNewBufferSize(inputChannel, bufferSize);
+        sendToChannel(new NewBufferSizeMessage(inputChannel, bufferSize));
     }
 
     @Override
     public void resumeConsumption(RemoteInputChannel inputChannel) {
-        clientHandler.resumeConsumption(inputChannel);
+        sendToChannel(new ResumeConsumptionMessage(inputChannel));
     }
 
     @Override
     public void acknowledgeAllRecordsProcessed(RemoteInputChannel inputChannel) {
-        clientHandler.acknowledgeAllRecordsProcessed(inputChannel);
+        sendToChannel(new AcknowledgeAllRecordsProcessedMessage(inputChannel));
+    }
+
+    private void sendToChannel(ClientOutboundMessage message) {
+        tcpChannel.eventLoop().execute(() -> tcpChannel.pipeline().fireUserEventTriggered(message));

Review comment:
       I am fine with merging it as is. However, I am curious whether it was, or still is, necessary (I could not find an answer, but I have not searched for long). I am not too familiar with how Netty works, though.
   
   BTW, I've just looked into it again. If I follow `ChannelPipeline#fireUserEventTriggered` -> `DefaultChannelPipeline#fireUserEventTriggered` -> `AbstractChannelHandlerContext.invokeUserEventTriggered(this.head, event);`, I can see code like this:
   
   ```
       static void invokeUserEventTriggered(final AbstractChannelHandlerContext next, final Object event) {
           ObjectUtil.checkNotNull(event, "event");
           EventExecutor executor = next.executor();
           if (executor.inEventLoop()) {
               next.invokeUserEventTriggered(event);
           } else {
               executor.execute(new Runnable() {
                   public void run() {
                       next.invokeUserEventTriggered(event);
                   }
               });
           }
   
       }
   ```
   
   which would indicate that submitting it explicitly to the `eventLoop` is unnecessary.
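
The dispatch pattern in the quoted method can be tried out without Netty. The following is a minimal sketch under stated assumptions: `MiniEventLoop` and `fireUserEvent` are hypothetical stand-ins built on a JDK single-thread executor, not Netty classes. It shows that the handler ends up on the loop thread whether the event is fired from outside the loop or from within it.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

final class EventLoopDispatchSketch {

    /** Simplified stand-in for a single-threaded event loop; this is not Netty's EventLoop. */
    static final class MiniEventLoop {
        private volatile Thread loopThread;
        private final ExecutorService executor =
                Executors.newSingleThreadExecutor(
                        r -> {
                            Thread t = new Thread(r, "mini-event-loop");
                            t.setDaemon(true);
                            loopThread = t;
                            return t;
                        });

        boolean inEventLoop() {
            return Thread.currentThread() == loopThread;
        }

        void execute(Runnable task) {
            executor.execute(task);
        }

        // Same shape as the quoted method: run inline on the loop thread, else hand off.
        void fireUserEvent(String event, Consumer<String> handler) {
            if (inEventLoop()) {
                handler.accept(event);
            } else {
                execute(() -> handler.accept(event));
            }
        }

        void shutdown() {
            executor.shutdown();
        }
    }

    /** Fires one event from the caller's thread and one from the loop; returns the handler threads. */
    static List<String> handlerThreads() {
        MiniEventLoop loop = new MiniEventLoop();
        List<String> threads = new CopyOnWriteArrayList<>();
        CountDownLatch done = new CountDownLatch(2);
        Consumer<String> handler =
                event -> {
                    threads.add(Thread.currentThread().getName());
                    done.countDown();
                };
        loop.fireUserEvent("add-credit", handler); // fired from outside the loop
        loop.execute(() -> loop.fireUserEvent("new-buffer-size", handler)); // fired on the loop
        try {
            done.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        loop.shutdown();
        return threads;
    }

    public static void main(String[] args) {
        System.out.println(handlerThreads());
    }
}
```

Under this model, wrapping the call in an extra `execute` only adds a second hand-off; whether that also holds for every Netty configuration is not something this sketch can confirm.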







[GitHub] [flink] flinkbot commented on pull request #17440: [FLINK-24468][runtime] Wait for the channel activation before creating partition request client

Posted by GitBox <gi...@apache.org>.
flinkbot commented on pull request #17440:
URL: https://github.com/apache/flink/pull/17440#issuecomment-938816662


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "37cf758d9eaf3ee1a1f6342714abd14ae8d80ea9",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "37cf758d9eaf3ee1a1f6342714abd14ae8d80ea9",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 37cf758d9eaf3ee1a1f6342714abd14ae8d80ea9 UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run azure` re-run the last Azure build
   </details>





[GitHub] [flink] flinkbot edited a comment on pull request #17440: [FLINK-24468][runtime] Wait for the channel activation before creating partition request client

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #17440:
URL: https://github.com/apache/flink/pull/17440#issuecomment-938816662


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "37cf758d9eaf3ee1a1f6342714abd14ae8d80ea9",
       "status" : "SUCCESS",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=24871",
       "triggerID" : "37cf758d9eaf3ee1a1f6342714abd14ae8d80ea9",
       "triggerType" : "PUSH"
     }, {
       "hash" : "7d56eb7d002b7bf5c268b98e12a6e4879cd68216",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "7d56eb7d002b7bf5c268b98e12a6e4879cd68216",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 37cf758d9eaf3ee1a1f6342714abd14ae8d80ea9 Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=24871) 
   * 7d56eb7d002b7bf5c268b98e12a6e4879cd68216 UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run azure` re-run the last Azure build
   </details>





[GitHub] [flink] flinkbot edited a comment on pull request #17440: [FLINK-24468][runtime] Wait for the channel activation before creating partition request client

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #17440:
URL: https://github.com/apache/flink/pull/17440#issuecomment-938816662


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "37cf758d9eaf3ee1a1f6342714abd14ae8d80ea9",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=24871",
       "triggerID" : "37cf758d9eaf3ee1a1f6342714abd14ae8d80ea9",
       "triggerType" : "PUSH"
     }, {
       "hash" : "7d56eb7d002b7bf5c268b98e12a6e4879cd68216",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=25042",
       "triggerID" : "7d56eb7d002b7bf5c268b98e12a6e4879cd68216",
       "triggerType" : "PUSH"
     }, {
       "hash" : "16b451eba79ef7015e19adf6f0eb7c8b39c70715",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=25189",
       "triggerID" : "16b451eba79ef7015e19adf6f0eb7c8b39c70715",
       "triggerType" : "PUSH"
     }, {
       "hash" : "658994fafd3947a2a9a49fa91d3c7e4fce3e52b5",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=25227",
       "triggerID" : "658994fafd3947a2a9a49fa91d3c7e4fce3e52b5",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 16b451eba79ef7015e19adf6f0eb7c8b39c70715 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=25189) 
   * 658994fafd3947a2a9a49fa91d3c7e4fce3e52b5 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=25227) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run azure` re-run the last Azure build
   </details>





[GitHub] [flink] flinkbot edited a comment on pull request #17440: [FLINK-24468][runtime] Wait for the channel activation before creating partition request client

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #17440:
URL: https://github.com/apache/flink/pull/17440#issuecomment-938816662


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "37cf758d9eaf3ee1a1f6342714abd14ae8d80ea9",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=24871",
       "triggerID" : "37cf758d9eaf3ee1a1f6342714abd14ae8d80ea9",
       "triggerType" : "PUSH"
     }, {
       "hash" : "7d56eb7d002b7bf5c268b98e12a6e4879cd68216",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=25042",
       "triggerID" : "7d56eb7d002b7bf5c268b98e12a6e4879cd68216",
       "triggerType" : "PUSH"
     }, {
       "hash" : "16b451eba79ef7015e19adf6f0eb7c8b39c70715",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=25189",
       "triggerID" : "16b451eba79ef7015e19adf6f0eb7c8b39c70715",
       "triggerType" : "PUSH"
     }, {
       "hash" : "658994fafd3947a2a9a49fa91d3c7e4fce3e52b5",
       "status" : "SUCCESS",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=25227",
       "triggerID" : "658994fafd3947a2a9a49fa91d3c7e4fce3e52b5",
       "triggerType" : "PUSH"
     }, {
       "hash" : "4be189160a729c2814f3e2400300632d97e76470",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=25286",
       "triggerID" : "4be189160a729c2814f3e2400300632d97e76470",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 658994fafd3947a2a9a49fa91d3c7e4fce3e52b5 Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=25227) 
   * 4be189160a729c2814f3e2400300632d97e76470 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=25286) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run azure` re-run the last Azure build
   </details>





[GitHub] [flink] dawidwys edited a comment on pull request #17440: [FLINK-24468][runtime] Wait for the channel activation before creating partition request client

Posted by GitBox <gi...@apache.org>.
dawidwys edited a comment on pull request #17440:
URL: https://github.com/apache/flink/pull/17440#issuecomment-941031617


   I found the reason why the exception was swallowed. Please see: https://issues.apache.org/jira/browse/FLINK-24515.
   
   I think we should not swallow exceptions if debloating fails. Therefore I'd remove the `try/catch` block and replace the `submit` with `execute` in `org.apache.flink.streaming.runtime.tasks.StreamTask#scheduleBufferDebloater`:
   
   ```
       private void scheduleBufferDebloater() {
           // See https://issues.apache.org/jira/browse/FLINK-23560
           // If there are no input gates, there is no point of calculating the throughput and running
           // the debloater. At the same time, for SourceStreamTask using legacy sources and checkpoint
           // lock, enqueuing even a single mailbox action can cause performance regression. This is
           // especially visible in batch, with disabled checkpointing and no processing time timers.
           if (getEnvironment().getAllInputGates().length == 0) {
               return;
           }
           systemTimerService.registerTimer(
                   systemTimerService.getCurrentProcessingTime() + bufferDebloatPeriod,
                   timestamp ->
                           mainMailboxExecutor.execute(
                                   () -> {
                                       debloat();
                                       scheduleBufferDebloater();
                                   },
                                   "Buffer size recalculation"));
       }
   ```
   
   As for the `waitForActivation`, I need to think about it for a bit more.
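
To illustrate the general difference between `submit` and `execute`, here is a minimal sketch that uses the plain JDK `ExecutorService` as an analogy. This is an assumption for illustration only: Flink's `MailboxExecutor` is a different class, and FLINK-24515 describes its actual behavior. With `submit`, the exception is captured in the returned `Future` and goes unnoticed unless someone calls `get()`; with `execute`, it reaches the worker thread's uncaught-exception handler.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

final class SubmitVsExecuteSketch {

    /** What happened to the exception thrown by one failing task. */
    static final class Outcome {
        final Throwable seenByHandler; // reached the thread's uncaught-exception handler
        final Throwable heldByFuture; // captured inside the Future returned by submit()

        Outcome(Throwable seenByHandler, Throwable heldByFuture) {
            this.seenByHandler = seenByHandler;
            this.heldByFuture = heldByFuture;
        }
    }

    static Outcome runFailingTask(boolean useSubmit) {
        AtomicReference<Throwable> seenByHandler = new AtomicReference<>();
        CountDownLatch handlerCalled = new CountDownLatch(1);
        ExecutorService executor =
                Executors.newSingleThreadExecutor(
                        r -> {
                            Thread t = new Thread(r, "sketch-worker");
                            t.setDaemon(true);
                            t.setUncaughtExceptionHandler(
                                    (thread, error) -> {
                                        seenByHandler.set(error);
                                        handlerCalled.countDown();
                                    });
                            return t;
                        });
        Runnable failing =
                () -> {
                    throw new IllegalStateException("debloat failed");
                };
        Throwable heldByFuture = null;
        try {
            if (useSubmit) {
                Future<?> future = executor.submit(failing);
                try {
                    future.get();
                } catch (ExecutionException e) {
                    heldByFuture = e.getCause(); // only visible because we asked the Future
                }
            } else {
                executor.execute(failing);
                handlerCalled.await(5, TimeUnit.SECONDS);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            executor.shutdown();
        }
        return new Outcome(seenByHandler.get(), heldByFuture);
    }

    public static void main(String[] args) {
        Outcome submitted = runFailingTask(true);
        Outcome executed = runFailingTask(false);
        System.out.println("submit:  handler saw " + submitted.seenByHandler
                + ", future held " + submitted.heldByFuture);
        System.out.println("execute: handler saw " + executed.seenByHandler);
    }
}
```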





[GitHub] [flink] dawidwys commented on pull request #17440: [FLINK-24468][runtime] Wait for the channel activation before creating partition request client

Posted by GitBox <gi...@apache.org>.
dawidwys commented on pull request #17440:
URL: https://github.com/apache/flink/pull/17440#issuecomment-941031617


   I found the reason why the exception was swallowed. Please see: https://issues.apache.org/jira/browse/FLINK-24515.
   
   I think we should not swallow exceptions if debloating fails. Therefore I'd remove the `try/catch` block.
   
   As for the `waitForActivation`, I need to think about it for a bit more.





[GitHub] [flink] flinkbot edited a comment on pull request #17440: [FLINK-24468][runtime] Wait for the channel activation before creating partition request client

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #17440:
URL: https://github.com/apache/flink/pull/17440#issuecomment-938816662


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "37cf758d9eaf3ee1a1f6342714abd14ae8d80ea9",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=24871",
       "triggerID" : "37cf758d9eaf3ee1a1f6342714abd14ae8d80ea9",
       "triggerType" : "PUSH"
     }, {
       "hash" : "7d56eb7d002b7bf5c268b98e12a6e4879cd68216",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=25042",
       "triggerID" : "7d56eb7d002b7bf5c268b98e12a6e4879cd68216",
       "triggerType" : "PUSH"
     }, {
       "hash" : "16b451eba79ef7015e19adf6f0eb7c8b39c70715",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=25189",
       "triggerID" : "16b451eba79ef7015e19adf6f0eb7c8b39c70715",
       "triggerType" : "PUSH"
     }, {
       "hash" : "658994fafd3947a2a9a49fa91d3c7e4fce3e52b5",
       "status" : "SUCCESS",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=25227",
       "triggerID" : "658994fafd3947a2a9a49fa91d3c7e4fce3e52b5",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 658994fafd3947a2a9a49fa91d3c7e4fce3e52b5 Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=25227) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run azure` re-run the last Azure build
   </details>





[GitHub] [flink] akalash commented on pull request #17440: [FLINK-24468][runtime] Wait for the channel activation before creating partition request client

Posted by GitBox <gi...@apache.org>.
akalash commented on pull request #17440:
URL: https://github.com/apache/flink/pull/17440#issuecomment-945739757


   @dawidwys, I will try to implement it this way.





[GitHub] [flink] flinkbot edited a comment on pull request #17440: [FLINK-24468][runtime] Wait for the channel activation before creating partition request client

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #17440:
URL: https://github.com/apache/flink/pull/17440#issuecomment-938816662


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "37cf758d9eaf3ee1a1f6342714abd14ae8d80ea9",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=24871",
       "triggerID" : "37cf758d9eaf3ee1a1f6342714abd14ae8d80ea9",
       "triggerType" : "PUSH"
     }, {
       "hash" : "7d56eb7d002b7bf5c268b98e12a6e4879cd68216",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=25042",
       "triggerID" : "7d56eb7d002b7bf5c268b98e12a6e4879cd68216",
       "triggerType" : "PUSH"
     }, {
       "hash" : "16b451eba79ef7015e19adf6f0eb7c8b39c70715",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=25189",
       "triggerID" : "16b451eba79ef7015e19adf6f0eb7c8b39c70715",
       "triggerType" : "PUSH"
     }, {
       "hash" : "658994fafd3947a2a9a49fa91d3c7e4fce3e52b5",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=25227",
       "triggerID" : "658994fafd3947a2a9a49fa91d3c7e4fce3e52b5",
       "triggerType" : "PUSH"
     }, {
       "hash" : "4be189160a729c2814f3e2400300632d97e76470",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=25286",
       "triggerID" : "4be189160a729c2814f3e2400300632d97e76470",
       "triggerType" : "PUSH"
     }, {
       "hash" : "3616cbfa7a3d536f132a2fb2f8a758974cbcfef0",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "3616cbfa7a3d536f132a2fb2f8a758974cbcfef0",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 4be189160a729c2814f3e2400300632d97e76470 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=25286) 
   * 3616cbfa7a3d536f132a2fb2f8a758974cbcfef0 UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run azure` re-run the last Azure build
   </details>





[GitHub] [flink] dawidwys commented on pull request #17440: [FLINK-24468][runtime] Wait for the channel activation before creating partition request client

Posted by GitBox <gi...@apache.org>.
dawidwys commented on pull request #17440:
URL: https://github.com/apache/flink/pull/17440#issuecomment-943278706


   I spent some time looking into the `waitForChannelActivation` approach. I am a bit concerned that the synchronous waiting may prolong the startup too much. If I understand the code correctly, we will wait sequentially for each channel to become active before requesting a partition on the next channel.
   
   As the initial error shows, activation may take some time. I was instead wondering whether we could return the future itself (I know we would have to pass it through a couple of layers). Then we could start buffer debloating once a combined future for the activation of all channels completes.
   Something like:
   
   ```
           CompletableFuture.allOf(
                           Arrays.stream(environment.getAllInputGates())
                                   .map(InputGate::getAllChannelsActivatedFuture)
                                   .toArray(CompletableFuture[]::new))
                   .whenComplete(
                           (r, ex) -> {
                               scheduleBufferDebloater();
                           });
   ```
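
The idea in the snippet above can be tried out in isolation with plain JDK futures. The following is a minimal sketch, not the PR's code: `getAllChannelsActivatedFuture` does not exist in Flink as far as this thread shows, so the per-gate futures are passed in directly, and `scheduleBufferDebloater` is represented by a hypothetical `Runnable`. Unlike the `whenComplete` variant above, this sketch uses `thenRun`, so scheduling happens only if every activation future completed successfully.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;

final class AllChannelsActivatedSketch {

    /** Completes when every per-gate future has completed; fails if any of them failed. */
    static CompletableFuture<Void> allActivated(List<CompletableFuture<Void>> perGateFutures) {
        return CompletableFuture.allOf(perGateFutures.toArray(new CompletableFuture[0]));
    }

    /**
     * Runs the (hypothetical) scheduling hook once all gates are active.
     * thenRun is skipped when any activation future completed exceptionally.
     */
    static CompletableFuture<Void> scheduleWhenActivated(
            List<CompletableFuture<Void>> perGateFutures, Runnable scheduleBufferDebloater) {
        return allActivated(perGateFutures).thenRun(scheduleBufferDebloater);
    }

    public static void main(String[] args) {
        CompletableFuture<Void> gate1 = new CompletableFuture<>();
        CompletableFuture<Void> gate2 = new CompletableFuture<>();

        scheduleWhenActivated(
                java.util.Arrays.asList(gate1, gate2),
                () -> System.out.println("debloater scheduled"));

        gate1.complete(null); // nothing is printed yet, gate2 is still pending
        gate2.complete(null); // now the hook runs
    }
}
```

Whether a failed activation should still start debloating (as `whenComplete` would) or not (as `thenRun` does here) is a design choice the thread leaves open.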





[GitHub] [flink] dawidwys commented on a change in pull request #17440: [FLINK-24468][runtime] Wait for the channel activation before creating partition request client

Posted by GitBox <gi...@apache.org>.
dawidwys commented on a change in pull request #17440:
URL: https://github.com/apache/flink/pull/17440#discussion_r732472566



##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyPartitionRequestClient.java
##########
@@ -240,4 +244,57 @@ private void checkNotClosed() throws IOException {
                     String.format("Channel to '%s' closed.", remoteAddr), localAddr);
         }
     }
+
+    static class AddCreditMessage extends ClientOutboundMessage {

Review comment:
       Any reason for those classes to be in the default scope instead of `private`?

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/ClientOutboundMessage.java
##########
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.netty;
+
+import org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel;
+
+import javax.annotation.Nullable;
+
+/** Abstract class for representing the output message. */
+public abstract class ClientOutboundMessage {

Review comment:
       Any reason for this class to be `public` instead of the default scope?







[GitHub] [flink] flinkbot edited a comment on pull request #17440: [FLINK-24468][runtime] Wait for the channel activation before creating partition request client

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #17440:
URL: https://github.com/apache/flink/pull/17440#issuecomment-938816662


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "37cf758d9eaf3ee1a1f6342714abd14ae8d80ea9",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=24871",
       "triggerID" : "37cf758d9eaf3ee1a1f6342714abd14ae8d80ea9",
       "triggerType" : "PUSH"
     }, {
       "hash" : "7d56eb7d002b7bf5c268b98e12a6e4879cd68216",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=25042",
       "triggerID" : "7d56eb7d002b7bf5c268b98e12a6e4879cd68216",
       "triggerType" : "PUSH"
     }, {
       "hash" : "16b451eba79ef7015e19adf6f0eb7c8b39c70715",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=25189",
       "triggerID" : "16b451eba79ef7015e19adf6f0eb7c8b39c70715",
       "triggerType" : "PUSH"
     }, {
       "hash" : "658994fafd3947a2a9a49fa91d3c7e4fce3e52b5",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=25227",
       "triggerID" : "658994fafd3947a2a9a49fa91d3c7e4fce3e52b5",
       "triggerType" : "PUSH"
     }, {
       "hash" : "4be189160a729c2814f3e2400300632d97e76470",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=25286",
       "triggerID" : "4be189160a729c2814f3e2400300632d97e76470",
       "triggerType" : "PUSH"
     }, {
       "hash" : "3616cbfa7a3d536f132a2fb2f8a758974cbcfef0",
       "status" : "SUCCESS",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=25297",
       "triggerID" : "3616cbfa7a3d536f132a2fb2f8a758974cbcfef0",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 3616cbfa7a3d536f132a2fb2f8a758974cbcfef0 Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=25297) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run azure` re-run the last Azure build
   </details>





[GitHub] [flink] akalash commented on a change in pull request #17440: [FLINK-24468][runtime] Wait for the channel activation before creating partition request client

Posted by GitBox <gi...@apache.org>.
akalash commented on a change in pull request #17440:
URL: https://github.com/apache/flink/pull/17440#discussion_r733419066



##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyPartitionRequestClient.java
##########
@@ -195,22 +195,26 @@ public void operationComplete(ChannelFuture future) throws Exception {
 
     @Override
     public void notifyCreditAvailable(RemoteInputChannel inputChannel) {
-        clientHandler.notifyCreditAvailable(inputChannel);
+        sendToChannel(new AddCreditMessage(inputChannel));
     }
 
     @Override
     public void notifyNewBufferSize(RemoteInputChannel inputChannel, int bufferSize) {
-        clientHandler.notifyNewBufferSize(inputChannel, bufferSize);
+        sendToChannel(new NewBufferSizeMessage(inputChannel, bufferSize));
     }
 
     @Override
     public void resumeConsumption(RemoteInputChannel inputChannel) {
-        clientHandler.resumeConsumption(inputChannel);
+        sendToChannel(new ResumeConsumptionMessage(inputChannel));
     }
 
     @Override
     public void acknowledgeAllRecordsProcessed(RemoteInputChannel inputChannel) {
-        clientHandler.acknowledgeAllRecordsProcessed(inputChannel);
+        sendToChannel(new AcknowledgeAllRecordsProcessedMessage(inputChannel));
+    }
+
+    private void sendToChannel(ClientOutboundMessage message) {
+        tcpChannel.eventLoop().execute(() -> tcpChannel.pipeline().fireUserEventTriggered(message));

Review comment:
       Unfortunately, it is not possible to get rid of `tcpChannel.eventLoop().execute`: `executor.inEventLoop()` is always `true` because this call happens in the network thread, but we still don't want to block the current execution. We want to send the message in another thread (or in the same thread, but later).
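
The "same thread, but later" behaviour can be illustrated without Netty. In this minimal sketch a JDK single-thread executor stands in for the channel's event loop; that substitution is an assumption made for illustration, and the step names are hypothetical. The point it shows is that calling `execute` from the loop's own thread only queues the task, so the caller finishes before the queued work runs.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

final class DeferredExecuteSketch {

    /** Returns the order in which the steps ran on the single "loop" thread. */
    static List<String> run() {
        List<String> order = Collections.synchronizedList(new ArrayList<>());
        // Stand-in for the event loop: one thread draining a task queue.
        ExecutorService loop = Executors.newSingleThreadExecutor();
        CountDownLatch deferredDone = new CountDownLatch(1);
        try {
            loop.execute(() -> {
                order.add("caller: before execute");
                // Already on the loop thread, yet this task is only queued here.
                loop.execute(() -> {
                    order.add("deferred: send message");
                    deferredDone.countDown();
                });
                order.add("caller: after execute");
            });
            if (!deferredDone.await(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("deferred task did not run in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            loop.shutdown();
        }
        return new ArrayList<>(order);
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```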

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyPartitionRequestClient.java
##########
@@ -195,22 +195,26 @@ public void operationComplete(ChannelFuture future) throws Exception {
 
     @Override
     public void notifyCreditAvailable(RemoteInputChannel inputChannel) {
-        clientHandler.notifyCreditAvailable(inputChannel);
+        sendToChannel(new AddCreditMessage(inputChannel));
     }
 
     @Override
     public void notifyNewBufferSize(RemoteInputChannel inputChannel, int bufferSize) {
-        clientHandler.notifyNewBufferSize(inputChannel, bufferSize);
+        sendToChannel(new NewBufferSizeMessage(inputChannel, bufferSize));
     }
 
     @Override
     public void resumeConsumption(RemoteInputChannel inputChannel) {
-        clientHandler.resumeConsumption(inputChannel);
+        sendToChannel(new ResumeConsumptionMessage(inputChannel));
     }
 
     @Override
     public void acknowledgeAllRecordsProcessed(RemoteInputChannel inputChannel) {
-        clientHandler.acknowledgeAllRecordsProcessed(inputChannel);
+        sendToChannel(new AcknowledgeAllRecordsProcessedMessage(inputChannel));
+    }
+
+    private void sendToChannel(ClientOutboundMessage message) {
+        tcpChannel.eventLoop().execute(() -> tcpChannel.pipeline().fireUserEventTriggered(message));

Review comment:
       Actually, I may be wrong here, and this problem may be related only to the test. Right now I am not ready to answer more precisely.







[GitHub] [flink] dawidwys commented on pull request #17440: [FLINK-24468][runtime] Wait for the channel activation before creating partition request client

Posted by GitBox <gi...@apache.org>.
dawidwys commented on pull request #17440:
URL: https://github.com/apache/flink/pull/17440#issuecomment-945691431


   I understand that `notifyNewBufferSize`, `resumeConsumption` and `acknowledgeAllRecordsProcessed` were added that way because of how `notifyCreditAvailable` was introduced.
   
   I think it was added inside the `NetworkClientHandler` in order to have two parallel implementations, one for credit-based and one for the legacy flow control, and there were already two classes for those. Overall, I think that was an unfortunate choice. I'd recommend trying out what I am suggesting. It should fix the threading issue, because we would use netty mechanisms exclusively and we would not depend on the activation message arriving first. @akalash would you like to try it out?





[GitHub] [flink] dawidwys edited a comment on pull request #17440: [FLINK-24468][runtime] Wait for the channel activation before creating partition request client

Posted by GitBox <gi...@apache.org>.
dawidwys edited a comment on pull request #17440:
URL: https://github.com/apache/flink/pull/17440#issuecomment-941031617


   I found the reason why the exception was swallowed. Please see: https://issues.apache.org/jira/browse/FLINK-24515.
   
   I think we should not swallow exceptions if debloating fails. Therefore I'd remove the `try/catch` block and replace the `submit` with `execute` in `org.apache.flink.streaming.runtime.tasks.StreamTask#scheduleBufferDebloater`:
   
   ```
       private void scheduleBufferDebloater() {
           // See https://issues.apache.org/jira/browse/FLINK-23560
           // If there are no input gates, there is no point of calculating the throughput and running
           // the debloater. At the same time, for SourceStreamTask using legacy sources and checkpoint
           // lock, enqueuing even a single mailbox action can cause performance regression. This is
           // especially visible in batch, with disabled checkpointing and no processing time timers.
           if (getEnvironment().getAllInputGates().length == 0) {
               return;
           }
           systemTimerService.registerTimer(
                   systemTimerService.getCurrentProcessingTime() + bufferDebloatPeriod,
                   timestamp ->
                           mainMailboxExecutor.execute(
                                   () -> {
                                       debloat();
                                       scheduleBufferDebloater();
                                   },
                                   "Buffer size recalculation"));
       }
   ```
   
   As for the `waitForActivation`, I need to think about it for a bit more.
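
The `submit` versus `execute` difference mentioned above has a close analogue in the JDK's `ExecutorService`, which is used here as a stand-in. This is a minimal sketch under that assumption: it does not use Flink's `MailboxExecutor`, and the failing action is hypothetical. With `submit`, the exception is stored in the returned `Future` and stays invisible unless someone calls `get()`; with `execute`, it escapes the task and reaches the thread's uncaught-exception handler.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;

final class SubmitVersusExecuteSketch {

    // Hypothetical action that fails, standing in for a failing debloat() call.
    private static final Runnable FAILING_ACTION = () -> {
        throw new IllegalStateException("debloat failed");
    };

    /** submit(): the failure is stored in the Future and surfaces only via get(). */
    static String viaSubmit() {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            Future<?> result = executor.submit(FAILING_ACTION);
            try {
                result.get(5, TimeUnit.SECONDS);
                return "no failure";
            } catch (ExecutionException e) {
                return "stored in Future: " + e.getCause().getMessage();
            }
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            executor.shutdown();
        }
    }

    /** execute(): the failure escapes the task and reaches the uncaught handler. */
    static String viaExecute() {
        CompletableFuture<Throwable> uncaught = new CompletableFuture<>();
        ThreadFactory factory = runnable -> {
            Thread thread = new Thread(runnable);
            thread.setUncaughtExceptionHandler((t, ex) -> uncaught.complete(ex));
            return thread;
        };
        ExecutorService executor = Executors.newSingleThreadExecutor(factory);
        try {
            executor.execute(FAILING_ACTION);
            return "reached uncaught handler: "
                    + uncaught.get(5, TimeUnit.SECONDS).getMessage();
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(viaSubmit());
        System.out.println(viaExecute());
    }
}
```

If nobody inspects the `Future` returned by `submit`, the failure is effectively swallowed, which matches the symptom described in FLINK-24515.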





[GitHub] [flink] akalash commented on pull request #17440: [FLINK-24468][runtime] Wait for the channel activation before creating partition request client

Posted by GitBox <gi...@apache.org>.
akalash commented on pull request #17440:
URL: https://github.com/apache/flink/pull/17440#issuecomment-940200558


   > Do we really just stop debloating or do we trigger a failover?
   
   As far as I can see, we just stop debloating and no failover is triggered.
   
   > Shouldn't the exception from the mailbox processor fail the task?
   
   I was also surprised, but it looks like we just ignore this error (that is exactly what should be figured out later).
   
   > Why does it happen only for announcing the new buffer size? 
   
   If we already have input data, this NPE is impossible, and that is the situation whenever the other methods are called: everything except the announcement of the new buffer size is a reaction to input data.
   
   > Shouldn't we have the same issue for all other methods of CreditBasedPartitionRequestClientHandler such as e.g. notifyCreditAvailable?
   
   As you can see, I made this NPE impossible by waiting for channel initialization at startup, so everything should work well now.
   
   > Is it an error if we continue processing? IMO, at best it should be warn, I am even thinking if it shouldn't be just a debug. What are users supposed to do with the line in logs?
   
   I am not really sure that we need this try/catch block at all. My main fix is waiting for the initialization; I made these changes just in case, but it is probably much better to understand how Flink behaves when an exception is thrown inside the action. (In general, though, I agree that `debug` suits better here.)





[GitHub] [flink] akalash commented on a change in pull request #17440: [FLINK-24468][runtime] Wait for the channel activation before creating partition request client

Posted by GitBox <gi...@apache.org>.
akalash commented on a change in pull request #17440:
URL: https://github.com/apache/flink/pull/17440#discussion_r728809401



##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
##########
@@ -788,9 +788,14 @@ private void scheduleBufferDebloater() {
 
     @VisibleForTesting
     void debloat() {
-        long throughput = throughputCalculator.calculateThroughput();
-        if (bufferDebloater != null) {
-            bufferDebloater.recalculateBufferSize(throughput);
+        try {
+            long throughput = throughputCalculator.calculateThroughput();
+            if (bufferDebloater != null) {
+                bufferDebloater.recalculateBufferSize(throughput);
+            }
+            // Don't fail the further calculation if the current one fails.
+        } catch (Throwable ex) {
+            LOG.error("Throughput calculation fails, try next time.", ex);

Review comment:
       I have removed the try-catch block.







[GitHub] [flink] dawidwys commented on pull request #17440: [FLINK-24468][runtime] Wait for the channel activation before creating partition request client

Posted by GitBox <gi...@apache.org>.
dawidwys commented on pull request #17440:
URL: https://github.com/apache/flink/pull/17440#issuecomment-946436144


   Looking at the test results, it seems to be working just fine (the e2e failure is unrelated). How do you feel about the solution, @akalash?
   
   If you see no obstacles, I'd suggest proceeding with it.





[GitHub] [flink] dawidwys commented on a change in pull request #17440: [FLINK-24468][runtime] Wait for the channel activation before creating partition request client

Posted by GitBox <gi...@apache.org>.
dawidwys commented on a change in pull request #17440:
URL: https://github.com/apache/flink/pull/17440#discussion_r725972098



##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
##########
@@ -788,9 +788,14 @@ private void scheduleBufferDebloater() {
 
     @VisibleForTesting
     void debloat() {
-        long throughput = throughputCalculator.calculateThroughput();
-        if (bufferDebloater != null) {
-            bufferDebloater.recalculateBufferSize(throughput);
+        try {
+            long throughput = throughputCalculator.calculateThroughput();
+            if (bufferDebloater != null) {
+                bufferDebloater.recalculateBufferSize(throughput);
+            }
+            // Don't fail the further calculation if the current one fails.
+        } catch (Throwable ex) {
+            LOG.error("Throughput calculation fails, try next time.", ex);

Review comment:
       Is it an error if we continue processing? IMO, at best it should be `warn`; I am even wondering whether it shouldn't be just `debug`. What are users supposed to do with this line in the logs?







[GitHub] [flink] dawidwys commented on pull request #17440: [FLINK-24468][runtime] Wait for the channel activation before creating partition request client

Posted by GitBox <gi...@apache.org>.
dawidwys commented on pull request #17440:
URL: https://github.com/apache/flink/pull/17440#issuecomment-939886457


   Another question. Why does it happen only for announcing the new buffer size? Shouldn't we have the same issue for all other methods of `CreditBasedPartitionRequestClientHandler` such as e.g. `notifyCreditAvailable`?





[GitHub] [flink] dawidwys commented on pull request #17440: [FLINK-24468][runtime] Wait for the channel activation before creating partition request client

Posted by GitBox <gi...@apache.org>.
dawidwys commented on pull request #17440:
URL: https://github.com/apache/flink/pull/17440#issuecomment-945629976


   Another idea to throw into the mix:
   
   Why do we even have methods like `notifyCreditAvailable`, `notifyNewBufferSize`, `resumeConsumption` and `acknowledgeAllRecordsProcessed` in the `NetworkClientHandler`? It seems to me those methods go against the purpose of a `ChannelHandler` in netty. As far as I understand, handlers are supposed to be reactive components that act on a certain event happening in the network stack. That is not the case with the aforementioned methods.
   
   On the other hand, we have the `NettyPartitionRequestClient`, which is an active client of the network stack; with it we can e.g. `requestSubpartition` or `sendTaskEvent`. I am wondering why we don't implement methods like `notifyNewBufferSize` there.
   Something like:
   ```
       @Override
       public void notifyNewBufferSize(RemoteInputChannel inputChannel, int bufferSize) {
           tcpChannel
                   .pipeline()
                   .fireUserEventTriggered(
                           new CreditBasedPartitionRequestClientHandler
                                   .NewBufferSizeMessage(inputChannel, bufferSize));
       }
   ```
   or (I am not sure about the threading model)
   
   ```
           tcpChannel
                   .eventLoop()
                   .execute(
                           () -> {
                               tcpChannel
                                       .pipeline()
                                       .fireUserEventTriggered(
                                               new CreditBasedPartitionRequestClientHandler
                                                       .NewBufferSizeMessage(
                                                       inputChannel, bufferSize));
                           });
   ```
   I am sorry, I do not have much expertise in netty, so it would be really nice to hear from @pnowojski on the idea above.
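
The split proposed above, an active client that fires events and a handler that only reacts to them, can be sketched without Netty. Everything here is a simplified stand-in and an assumption for illustration: a `String` id replaces `RemoteInputChannel`, `buildMessage` is a hypothetical method, and a direct method call replaces `pipeline().fireUserEventTriggered(...)`.

```java
import java.util.ArrayList;
import java.util.List;

final class UserEventDispatchSketch {

    /** Stand-in for an outbound message; a String id replaces RemoteInputChannel. */
    abstract static class ClientOutboundMessage {
        final String channelId;

        ClientOutboundMessage(String channelId) {
            this.channelId = channelId;
        }

        /** Hypothetical: builds what would be written to the wire. */
        abstract String buildMessage();
    }

    static final class NewBufferSizeMessage extends ClientOutboundMessage {
        private final int bufferSize;

        NewBufferSizeMessage(String channelId, int bufferSize) {
            super(channelId);
            this.bufferSize = bufferSize;
        }

        @Override
        String buildMessage() {
            return "NewBufferSize[" + channelId + ", " + bufferSize + "]";
        }
    }

    static final class AddCreditMessage extends ClientOutboundMessage {
        AddCreditMessage(String channelId) {
            super(channelId);
        }

        @Override
        String buildMessage() {
            return "AddCredit[" + channelId + "]";
        }
    }

    /** Stand-in for the channel handler: it only reacts to events it is handed. */
    static final class ReactiveHandler {
        final List<String> written = new ArrayList<>();

        void userEventTriggered(Object event) {
            if (event instanceof ClientOutboundMessage) {
                written.add(((ClientOutboundMessage) event).buildMessage());
            }
            // A real Netty handler would forward other events along the pipeline.
        }
    }

    /** Stand-in for the active client: it owns the public API and only fires events. */
    static final class ActiveClient {
        private final ReactiveHandler handler;

        ActiveClient(ReactiveHandler handler) {
            this.handler = handler;
        }

        void notifyNewBufferSize(String channelId, int bufferSize) {
            handler.userEventTriggered(new NewBufferSizeMessage(channelId, bufferSize));
        }

        void notifyCreditAvailable(String channelId) {
            handler.userEventTriggered(new AddCreditMessage(channelId));
        }
    }

    public static void main(String[] args) {
        ReactiveHandler handler = new ReactiveHandler();
        ActiveClient client = new ActiveClient(handler);
        client.notifyNewBufferSize("ch-1", 4096);
        client.notifyCreditAvailable("ch-1");
        handler.written.forEach(System.out::println);
    }
}
```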





[GitHub] [flink] flinkbot edited a comment on pull request #17440: [FLINK-24468][runtime] Wait for the channel activation before creating partition request client

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #17440:
URL: https://github.com/apache/flink/pull/17440#issuecomment-938816662


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "37cf758d9eaf3ee1a1f6342714abd14ae8d80ea9",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=24871",
       "triggerID" : "37cf758d9eaf3ee1a1f6342714abd14ae8d80ea9",
       "triggerType" : "PUSH"
     }, {
       "hash" : "7d56eb7d002b7bf5c268b98e12a6e4879cd68216",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=25042",
       "triggerID" : "7d56eb7d002b7bf5c268b98e12a6e4879cd68216",
       "triggerType" : "PUSH"
     }, {
       "hash" : "16b451eba79ef7015e19adf6f0eb7c8b39c70715",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=25189",
       "triggerID" : "16b451eba79ef7015e19adf6f0eb7c8b39c70715",
       "triggerType" : "PUSH"
     }, {
       "hash" : "658994fafd3947a2a9a49fa91d3c7e4fce3e52b5",
       "status" : "SUCCESS",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=25227",
       "triggerID" : "658994fafd3947a2a9a49fa91d3c7e4fce3e52b5",
       "triggerType" : "PUSH"
     }, {
       "hash" : "4be189160a729c2814f3e2400300632d97e76470",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "4be189160a729c2814f3e2400300632d97e76470",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 658994fafd3947a2a9a49fa91d3c7e4fce3e52b5 Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=25227) 
   * 4be189160a729c2814f3e2400300632d97e76470 UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run azure` re-run the last Azure build
   </details>





[GitHub] [flink] flinkbot edited a comment on pull request #17440: [FLINK-24468][runtime] Wait for the channel activation before creating partition request client

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #17440:
URL: https://github.com/apache/flink/pull/17440#issuecomment-938816662


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "37cf758d9eaf3ee1a1f6342714abd14ae8d80ea9",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=24871",
       "triggerID" : "37cf758d9eaf3ee1a1f6342714abd14ae8d80ea9",
       "triggerType" : "PUSH"
     }, {
       "hash" : "7d56eb7d002b7bf5c268b98e12a6e4879cd68216",
       "status" : "SUCCESS",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=25042",
       "triggerID" : "7d56eb7d002b7bf5c268b98e12a6e4879cd68216",
       "triggerType" : "PUSH"
     }, {
       "hash" : "16b451eba79ef7015e19adf6f0eb7c8b39c70715",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "16b451eba79ef7015e19adf6f0eb7c8b39c70715",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 7d56eb7d002b7bf5c268b98e12a6e4879cd68216 Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=25042) 
   * 16b451eba79ef7015e19adf6f0eb7c8b39c70715 UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run azure` re-run the last Azure build
   </details>





[GitHub] [flink] akalash commented on a change in pull request #17440: [FLINK-24468][runtime] Wait for the channel activation before creating partition request client

Posted by GitBox <gi...@apache.org>.
akalash commented on a change in pull request #17440:
URL: https://github.com/apache/flink/pull/17440#discussion_r732607351



##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/ClientOutboundMessage.java
##########
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.netty;
+
+import org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel;
+
+import javax.annotation.Nullable;
+
+/** Abstract class for representing the output message. */
+public abstract class ClientOutboundMessage {

Review comment:
       No particular reason. It can be default scope.







[GitHub] [flink] flinkbot edited a comment on pull request #17440: [FLINK-24468][runtime] Wait for the channel activation before creating partition request client

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #17440:
URL: https://github.com/apache/flink/pull/17440#issuecomment-938816662


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "37cf758d9eaf3ee1a1f6342714abd14ae8d80ea9",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=24871",
       "triggerID" : "37cf758d9eaf3ee1a1f6342714abd14ae8d80ea9",
       "triggerType" : "PUSH"
     }, {
       "hash" : "7d56eb7d002b7bf5c268b98e12a6e4879cd68216",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=25042",
       "triggerID" : "7d56eb7d002b7bf5c268b98e12a6e4879cd68216",
       "triggerType" : "PUSH"
     }, {
       "hash" : "16b451eba79ef7015e19adf6f0eb7c8b39c70715",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=25189",
       "triggerID" : "16b451eba79ef7015e19adf6f0eb7c8b39c70715",
       "triggerType" : "PUSH"
     }, {
       "hash" : "658994fafd3947a2a9a49fa91d3c7e4fce3e52b5",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=25227",
       "triggerID" : "658994fafd3947a2a9a49fa91d3c7e4fce3e52b5",
       "triggerType" : "PUSH"
     }, {
       "hash" : "4be189160a729c2814f3e2400300632d97e76470",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=25286",
       "triggerID" : "4be189160a729c2814f3e2400300632d97e76470",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 4be189160a729c2814f3e2400300632d97e76470 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=25286) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run azure` re-run the last Azure build
   </details>


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] flinkbot edited a comment on pull request #17440: [FLINK-24468][runtime] Wait for the channel activation before creating partition request client

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #17440:
URL: https://github.com/apache/flink/pull/17440#issuecomment-938816662


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "37cf758d9eaf3ee1a1f6342714abd14ae8d80ea9",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=24871",
       "triggerID" : "37cf758d9eaf3ee1a1f6342714abd14ae8d80ea9",
       "triggerType" : "PUSH"
     }, {
       "hash" : "7d56eb7d002b7bf5c268b98e12a6e4879cd68216",
       "status" : "SUCCESS",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=25042",
       "triggerID" : "7d56eb7d002b7bf5c268b98e12a6e4879cd68216",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 7d56eb7d002b7bf5c268b98e12a6e4879cd68216 Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=25042) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run azure` re-run the last Azure build
   </details>





[GitHub] [flink] flinkbot edited a comment on pull request #17440: [FLINK-24468][runtime] Wait for the channel activation before creating partition request client

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #17440:
URL: https://github.com/apache/flink/pull/17440#issuecomment-938816662


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "37cf758d9eaf3ee1a1f6342714abd14ae8d80ea9",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=24871",
       "triggerID" : "37cf758d9eaf3ee1a1f6342714abd14ae8d80ea9",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 37cf758d9eaf3ee1a1f6342714abd14ae8d80ea9 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=24871) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run azure` re-run the last Azure build
   </details>





[GitHub] [flink] dawidwys merged pull request #17440: [FLINK-24468][runtime] Wait for the channel activation before creating partition request client

Posted by GitBox <gi...@apache.org>.
dawidwys merged pull request #17440:
URL: https://github.com/apache/flink/pull/17440


   





[GitHub] [flink] flinkbot edited a comment on pull request #17440: [FLINK-24468][runtime] Wait for the channel activation before creating partition request client

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #17440:
URL: https://github.com/apache/flink/pull/17440#issuecomment-938816662


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "37cf758d9eaf3ee1a1f6342714abd14ae8d80ea9",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=24871",
       "triggerID" : "37cf758d9eaf3ee1a1f6342714abd14ae8d80ea9",
       "triggerType" : "PUSH"
     }, {
       "hash" : "7d56eb7d002b7bf5c268b98e12a6e4879cd68216",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=25042",
       "triggerID" : "7d56eb7d002b7bf5c268b98e12a6e4879cd68216",
       "triggerType" : "PUSH"
     }, {
       "hash" : "16b451eba79ef7015e19adf6f0eb7c8b39c70715",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=25189",
       "triggerID" : "16b451eba79ef7015e19adf6f0eb7c8b39c70715",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 16b451eba79ef7015e19adf6f0eb7c8b39c70715 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=25189) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run azure` re-run the last Azure build
   </details>





[GitHub] [flink] dawidwys commented on a change in pull request #17440: [FLINK-24468][runtime] Wait for the channel activation before creating partition request client

Posted by GitBox <gi...@apache.org>.
dawidwys commented on a change in pull request #17440:
URL: https://github.com/apache/flink/pull/17440#discussion_r732474755



##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyPartitionRequestClient.java
##########
@@ -195,22 +195,26 @@ public void operationComplete(ChannelFuture future) throws Exception {
 
     @Override
     public void notifyCreditAvailable(RemoteInputChannel inputChannel) {
-        clientHandler.notifyCreditAvailable(inputChannel);
+        sendToChannel(new AddCreditMessage(inputChannel));
     }
 
     @Override
     public void notifyNewBufferSize(RemoteInputChannel inputChannel, int bufferSize) {
-        clientHandler.notifyNewBufferSize(inputChannel, bufferSize);
+        sendToChannel(new NewBufferSizeMessage(inputChannel, bufferSize));
     }
 
     @Override
     public void resumeConsumption(RemoteInputChannel inputChannel) {
-        clientHandler.resumeConsumption(inputChannel);
+        sendToChannel(new ResumeConsumptionMessage(inputChannel));
     }
 
     @Override
     public void acknowledgeAllRecordsProcessed(RemoteInputChannel inputChannel) {
-        clientHandler.acknowledgeAllRecordsProcessed(inputChannel);
+        sendToChannel(new AcknowledgeAllRecordsProcessedMessage(inputChannel));
+    }
+
+    private void sendToChannel(ClientOutboundMessage message) {
+        tcpChannel.eventLoop().execute(() -> tcpChannel.pipeline().fireUserEventTriggered(message));

Review comment:
       Did you investigate if we need to submit it through the `eventLoop` or would it be enough to call `tcpChannel.pipeline().fireUserEventTriggered(message)`?







[GitHub] [flink] akalash commented on a change in pull request #17440: [FLINK-24468][runtime] Wait for the channel activation before creating partition request client

Posted by GitBox <gi...@apache.org>.
akalash commented on a change in pull request #17440:
URL: https://github.com/apache/flink/pull/17440#discussion_r732611368



##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyPartitionRequestClient.java
##########
@@ -195,22 +195,26 @@ public void operationComplete(ChannelFuture future) throws Exception {
 
     @Override
     public void notifyCreditAvailable(RemoteInputChannel inputChannel) {
-        clientHandler.notifyCreditAvailable(inputChannel);
+        sendToChannel(new AddCreditMessage(inputChannel));
     }
 
     @Override
     public void notifyNewBufferSize(RemoteInputChannel inputChannel, int bufferSize) {
-        clientHandler.notifyNewBufferSize(inputChannel, bufferSize);
+        sendToChannel(new NewBufferSizeMessage(inputChannel, bufferSize));
     }
 
     @Override
     public void resumeConsumption(RemoteInputChannel inputChannel) {
-        clientHandler.resumeConsumption(inputChannel);
+        sendToChannel(new ResumeConsumptionMessage(inputChannel));
     }
 
     @Override
     public void acknowledgeAllRecordsProcessed(RemoteInputChannel inputChannel) {
-        clientHandler.acknowledgeAllRecordsProcessed(inputChannel);
+        sendToChannel(new AcknowledgeAllRecordsProcessedMessage(inputChannel));
+    }
+
+    private void sendToChannel(ClientOutboundMessage message) {
+        tcpChannel.eventLoop().execute(() -> tcpChannel.pipeline().fireUserEventTriggered(message));

Review comment:
       Yes, I did. Before these changes, we always submitted it through the `eventLoop`, so I don't see a reason to change that.
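
       To illustrate the hand-off pattern that `sendToChannel` relies on, here is a minimal sketch using only the JDK. It is not the Netty or Flink code: `EventLoopHandOffSketch`, `handledMessages` and `snapshotAndClose` are hypothetical names, and a single-threaded `ExecutorService` stands in for `tcpChannel.eventLoop()`. The idea is only that callers on any thread enqueue a task, so the handler state is read and written by one thread.

       ```java
       import java.util.ArrayList;
       import java.util.List;
       import java.util.concurrent.Callable;
       import java.util.concurrent.ExecutorService;
       import java.util.concurrent.Executors;

       /** Hypothetical stand-in for a channel whose handler state is owned by one thread. */
       class EventLoopHandOffSketch {
           // Plays the role of tcpChannel.eventLoop(); an assumption for illustration only.
           private final ExecutorService eventLoop = Executors.newSingleThreadExecutor();

           // Handler state without any locking: only tasks running on the event loop touch it.
           private final List<String> handledMessages = new ArrayList<>();

           /** Analogue of sendToChannel: callers on any thread only enqueue a task. */
           void sendToChannel(String message) {
               eventLoop.execute(() -> handledMessages.add(message));
           }

           /** Copies the state on the event loop thread as well, then stops the loop. */
           List<String> snapshotAndClose() throws Exception {
               Callable<List<String>> snapshot = () -> new ArrayList<>(handledMessages);
               try {
                   return eventLoop.submit(snapshot).get();
               } finally {
                   eventLoop.shutdown();
               }
           }
       }
       ```

       Because the executor has a single thread and a FIFO queue, messages are handled in submission order and the list needs no synchronization of its own.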







[GitHub] [flink] flinkbot edited a comment on pull request #17440: [FLINK-24468][runtime] Wait for the channel activation before creating partition request client

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #17440:
URL: https://github.com/apache/flink/pull/17440#issuecomment-938816662


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "37cf758d9eaf3ee1a1f6342714abd14ae8d80ea9",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=24871",
       "triggerID" : "37cf758d9eaf3ee1a1f6342714abd14ae8d80ea9",
       "triggerType" : "PUSH"
     }, {
       "hash" : "7d56eb7d002b7bf5c268b98e12a6e4879cd68216",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=25042",
       "triggerID" : "7d56eb7d002b7bf5c268b98e12a6e4879cd68216",
       "triggerType" : "PUSH"
     }, {
       "hash" : "16b451eba79ef7015e19adf6f0eb7c8b39c70715",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=25189",
       "triggerID" : "16b451eba79ef7015e19adf6f0eb7c8b39c70715",
       "triggerType" : "PUSH"
     }, {
       "hash" : "658994fafd3947a2a9a49fa91d3c7e4fce3e52b5",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "658994fafd3947a2a9a49fa91d3c7e4fce3e52b5",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 16b451eba79ef7015e19adf6f0eb7c8b39c70715 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=25189) 
   * 658994fafd3947a2a9a49fa91d3c7e4fce3e52b5 UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run azure` re-run the last Azure build
   </details>





[GitHub] [flink] flinkbot edited a comment on pull request #17440: [FLINK-24468][runtime] Wait for the channel activation before creating partition request client

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #17440:
URL: https://github.com/apache/flink/pull/17440#issuecomment-938816662


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "37cf758d9eaf3ee1a1f6342714abd14ae8d80ea9",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=24871",
       "triggerID" : "37cf758d9eaf3ee1a1f6342714abd14ae8d80ea9",
       "triggerType" : "PUSH"
     }, {
       "hash" : "7d56eb7d002b7bf5c268b98e12a6e4879cd68216",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=25042",
       "triggerID" : "7d56eb7d002b7bf5c268b98e12a6e4879cd68216",
       "triggerType" : "PUSH"
     }, {
       "hash" : "16b451eba79ef7015e19adf6f0eb7c8b39c70715",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=25189",
       "triggerID" : "16b451eba79ef7015e19adf6f0eb7c8b39c70715",
       "triggerType" : "PUSH"
     }, {
       "hash" : "658994fafd3947a2a9a49fa91d3c7e4fce3e52b5",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=25227",
       "triggerID" : "658994fafd3947a2a9a49fa91d3c7e4fce3e52b5",
       "triggerType" : "PUSH"
     }, {
       "hash" : "4be189160a729c2814f3e2400300632d97e76470",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=25286",
       "triggerID" : "4be189160a729c2814f3e2400300632d97e76470",
       "triggerType" : "PUSH"
     }, {
       "hash" : "3616cbfa7a3d536f132a2fb2f8a758974cbcfef0",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=25297",
       "triggerID" : "3616cbfa7a3d536f132a2fb2f8a758974cbcfef0",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 4be189160a729c2814f3e2400300632d97e76470 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=25286) 
   * 3616cbfa7a3d536f132a2fb2f8a758974cbcfef0 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=25297) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run azure` re-run the last Azure build
   </details>





[GitHub] [flink] flinkbot commented on pull request #17440: [FLINK-24468][runtime] Wait for the channel activation before creating partition request client

Posted by GitBox <gi...@apache.org>.
flinkbot commented on pull request #17440:
URL: https://github.com/apache/flink/pull/17440#issuecomment-938741489


   Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community
   to review your pull request. We will use this comment to track the progress of the review.
   
   
   ## Automated Checks
   Last check on commit 37cf758d9eaf3ee1a1f6342714abd14ae8d80ea9 (Fri Oct 08 15:42:04 UTC 2021)
   
   **Warnings:**
    * No documentation files were touched! Remember to keep the Flink docs up to date!
   
   
   <sub>Mention the bot in a comment to re-run the automated checks.</sub>
   ## Review Progress
   
   * ❓ 1. The [description] looks good.
   * ❓ 2. There is [consensus] that the contribution should go into Flink.
   * ❓ 3. Needs [attention] from.
   * ❓ 4. The change fits into the overall [architecture].
   * ❓ 5. Overall code [quality] is good.
   
   Please see the [Pull Request Review Guide](https://flink.apache.org/contributing/reviewing-prs.html) for a full explanation of the review process.
   The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer or PMC member is required.
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot approve description` to approve one or more aspects (aspects: `description`, `consensus`, `architecture` and `quality`)
    - `@flinkbot approve all` to approve all aspects
    - `@flinkbot approve-until architecture` to approve everything until `architecture`
    - `@flinkbot attention @username1 [@username2 ..]` to require somebody's attention
    - `@flinkbot disapprove architecture` to remove an approval you gave earlier
   </details>





[GitHub] [flink] flinkbot edited a comment on pull request #17440: [FLINK-24468][runtime] Wait for the channel activation before creating partition request client

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #17440:
URL: https://github.com/apache/flink/pull/17440#issuecomment-938816662


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "37cf758d9eaf3ee1a1f6342714abd14ae8d80ea9",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=24871",
       "triggerID" : "37cf758d9eaf3ee1a1f6342714abd14ae8d80ea9",
       "triggerType" : "PUSH"
     }, {
       "hash" : "7d56eb7d002b7bf5c268b98e12a6e4879cd68216",
       "status" : "SUCCESS",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=25042",
       "triggerID" : "7d56eb7d002b7bf5c268b98e12a6e4879cd68216",
       "triggerType" : "PUSH"
     }, {
       "hash" : "16b451eba79ef7015e19adf6f0eb7c8b39c70715",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=25189",
       "triggerID" : "16b451eba79ef7015e19adf6f0eb7c8b39c70715",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 7d56eb7d002b7bf5c268b98e12a6e4879cd68216 Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=25042) 
   * 16b451eba79ef7015e19adf6f0eb7c8b39c70715 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=25189) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run azure` re-run the last Azure build
   </details>





[GitHub] [flink] akalash commented on pull request #17440: [FLINK-24468][runtime] Wait for the channel activation before creating partition request client

Posted by GitBox <gi...@apache.org>.
akalash commented on pull request #17440:
URL: https://github.com/apache/flink/pull/17440#issuecomment-943469088


   I'm not really sure about this proposal, because we already create the client via clientBuilder and then, for some reason, we need to wait on a future before calling one particular method. That looks pretty fragile: if tomorrow we add one more method like that, we will need to duplicate the waiting.
   In my opinion, the problem is that we return an unprepared client. When we create the PartitionRequestClient, we cannot use it as is; we have to wait until it is ready, which seems wrong. At the same time, I agree with you that at least the requestPartition method can safely be called without waiting for channel initialization.
   So, at least right now, I see two solutions:
   
   - Separating PartitionRequestClient into two classes, one for requestPartition and another for everything else (this change looks pretty expensive and I don't think it makes sense now)
   - Adding a wait for channel activation inside every method that uses the context:
   
   ```
   Context receiveContext() {
     if (ctx == null) {
       channelActivationFuture.get();
     }
     return ctx;
   }
   
   void announceBufferSize(int bufferSize) {
     Context ctx = receiveContext();
     ctx.....
   }
   ```
   The only open question for this solution is how to handle the InterruptedException, which is not typical for this place right now.
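
   As a rough sketch of the second option, and of one possible answer to the InterruptedException question, the snippet below uses only the JDK. All names besides `receiveContext`, `announceBufferSize` and `channelActivationFuture` are hypothetical, the future is assumed to carry the context itself, and wrapping the interruption in an `IOException` after restoring the interrupt flag is an assumption, not a decision taken in this PR.

   ```java
   import java.io.IOException;
   import java.util.concurrent.CompletableFuture;
   import java.util.concurrent.ExecutionException;

   /** Hypothetical client that waits for channel activation before using the context. */
   class ActivationAwareClientSketch {
       /** Stand-in for the channel handler context; records the last announced size. */
       static final class Context {
           int lastAnnouncedBufferSize = -1;
       }

       // Completed by the (hypothetical) handler once the channel becomes active.
       private final CompletableFuture<Context> channelActivationFuture = new CompletableFuture<>();

       void channelActive(Context ctx) {
           channelActivationFuture.complete(ctx);
       }

       /** Blocks the caller until the channel is active, then returns the context. */
       Context receiveContext() throws IOException {
           try {
               return channelActivationFuture.get();
           } catch (InterruptedException e) {
               // Assumption: restore the flag and surface the interruption as an IOException.
               Thread.currentThread().interrupt();
               throw new IOException("Interrupted while waiting for channel activation", e);
           } catch (ExecutionException e) {
               throw new IOException("Channel activation failed", e.getCause());
           }
       }

       void announceBufferSize(int bufferSize) throws IOException {
           Context ctx = receiveContext();
           ctx.lastAnnouncedBufferSize = bufferSize;
       }
   }
   ```

   Every method that needs the context goes through `receiveContext()`, so the waiting lives in one place, at the cost of each such method now declaring a checked exception.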
   
   I will try to come up with other ideas.

