Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2020/01/24 11:16:16 UTC

[GitHub] [flink] pnowojski opened a new pull request #10939: [FLINK-15750][network] Moving output flushing to the mailbox

pnowojski opened a new pull request #10939: [FLINK-15750][network] Moving output flushing to the mailbox
URL: https://github.com/apache/flink/pull/10939
 
 
   Almost all interactions with the network stack happen from the mailbox thread, except for ResultPartitionWriter#flushAll, which is called from the OutputFlusher thread.
       
   This commit moves ResultPartitionWriter#flushAll to the mailbox thread (as a mailbox action), which further simplifies the threading model.
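
As a rough illustration of the threading change described above, the sketch below uses a single-threaded `ExecutorService` as a stand-in for the task's mailbox: a helper thread only enqueues the flush, and the flush body itself runs on the mailbox thread. `MailboxFlushSketch`, `FakeWriter`, and the thread names are hypothetical and are not part of this pull request or of Flink's API.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical sketch: none of these names are Flink classes.
final class MailboxFlushSketch {

    /** Stand-in for a record writer; remembers which thread flushed it. */
    static final class FakeWriter {
        volatile String flushedBy;

        void flushAll() {
            flushedBy = Thread.currentThread().getName();
        }
    }

    /**
     * A helper "timer" thread enqueues one flush into the mailbox and waits for it.
     * Returns the name of the thread that actually executed the flush.
     */
    static String flushViaMailbox() {
        FakeWriter writer = new FakeWriter();
        // Single-threaded executor standing in for the task's mailbox thread.
        ExecutorService mailbox =
            Executors.newSingleThreadExecutor(r -> new Thread(r, "mailbox"));
        try {
            Runnable flushAction = writer::flushAll;
            Thread timer = new Thread(() -> {
                try {
                    // Enqueue only; the flush body runs on the mailbox thread.
                    mailbox.submit(flushAction).get();
                } catch (Exception e) {
                    throw new RuntimeException(e);
                }
            }, "flush-timer");
            timer.start();
            timer.join();
            return writer.flushedBy;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        } finally {
            mailbox.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("flushed by: " + flushViaMailbox());
    }
}
```

Because the writer is only ever touched from one thread in this arrangement, it would not need its own synchronization for flushing, which is the simplification the description refers to.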
   
   ## Verifying this change
   
   This is covered by a variety of existing unit, integration and end-to-end tests.
   
   ## Does this pull request potentially affect one of the following parts:
   
     - Dependencies (does it add or upgrade a dependency): (yes / **no**)
     - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes / **no**)
     - The serializers: (yes / **no** / don't know)
     - The runtime per-record code paths (performance sensitive): (**yes** / no / don't know)
     - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / **no** / don't know)
     - The S3 file system connector: (yes / **no** / don't know)
   
   ## Documentation
   
     - Does this pull request introduce a new feature? (yes / **no**)
     - If yes, how is the feature documented? (**not applicable** / docs / JavaDocs / not documented)
   

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [flink] flinkbot edited a comment on issue #10939: [FLINK-15750][network] Moving output flushing to the mailbox

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on issue #10939: [FLINK-15750][network] Moving output flushing to the mailbox
URL: https://github.com/apache/flink/pull/10939#issuecomment-578098092
 
 
   <!--
   Meta data
   Hash:dd110c7f3f82e14edc384300e988e1b132cef614 Status:FAILURE URL:https://travis-ci.com/flink-ci/flink/builds/145913516 TriggerType:PUSH TriggerID:dd110c7f3f82e14edc384300e988e1b132cef614
   Hash:dd110c7f3f82e14edc384300e988e1b132cef614 Status:FAILURE URL:https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=4598 TriggerType:PUSH TriggerID:dd110c7f3f82e14edc384300e988e1b132cef614
   Hash:327aba3c6cf204264965a6b9891d8ac8f883d20e Status:UNKNOWN URL:TBD TriggerType:PUSH TriggerID:327aba3c6cf204264965a6b9891d8ac8f883d20e
   -->
   ## CI report:
   
   * dd110c7f3f82e14edc384300e988e1b132cef614 Travis: [FAILURE](https://travis-ci.com/flink-ci/flink/builds/145913516) Azure: [FAILURE](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=4598) 
   * 327aba3c6cf204264965a6b9891d8ac8f883d20e UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>


[GitHub] [flink] flinkbot edited a comment on issue #10939: [FLINK-15750][network] Moving output flushing to the mailbox

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on issue #10939: [FLINK-15750][network] Moving output flushing to the mailbox
URL: https://github.com/apache/flink/pull/10939#issuecomment-578090307
 
 
   Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community
   to review your pull request. We will use this comment to track the progress of the review.
   
   
   ## Automated Checks
   Last check on commit 5978349776ae9c87fef872887df47ee880ab12af (Wed Apr 15 11:38:30 UTC 2020)
   
   **Warnings:**
    * No documentation files were touched! Remember to keep the Flink docs up to date!
   
   
   <sub>Mention the bot in a comment to re-run the automated checks.</sub>
   ## Review Progress
   
   * ❓ 1. The [description] looks good.
   * ❓ 2. There is [consensus] that the contribution should go into Flink.
   * ❓ 3. Needs [attention] from.
   * ❓ 4. The change fits into the overall [architecture].
   * ❓ 5. Overall code [quality] is good.
   
   Please see the [Pull Request Review Guide](https://flink.apache.org/contributing/reviewing-prs.html) for a full explanation of the review process.

   The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer or PMC member is required.
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot approve description` to approve one or more aspects (aspects: `description`, `consensus`, `architecture` and `quality`)
    - `@flinkbot approve all` to approve all aspects
    - `@flinkbot approve-until architecture` to approve everything until `architecture`
    - `@flinkbot attention @username1 [@username2 ..]` to require somebody's attention
    - `@flinkbot disapprove architecture` to remove an approval you gave earlier
   </details>


[GitHub] [flink] flinkbot edited a comment on issue #10939: [FLINK-15750][network] Moving output flushing to the mailbox

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on issue #10939: [FLINK-15750][network] Moving output flushing to the mailbox
URL: https://github.com/apache/flink/pull/10939#issuecomment-578098092
 
 
   <!--
   Meta data
   Hash:dd110c7f3f82e14edc384300e988e1b132cef614 Status:FAILURE URL:https://travis-ci.com/flink-ci/flink/builds/145913516 TriggerType:PUSH TriggerID:dd110c7f3f82e14edc384300e988e1b132cef614
   Hash:dd110c7f3f82e14edc384300e988e1b132cef614 Status:FAILURE URL:https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=4598 TriggerType:PUSH TriggerID:dd110c7f3f82e14edc384300e988e1b132cef614
   Hash:327aba3c6cf204264965a6b9891d8ac8f883d20e Status:FAILURE URL:https://travis-ci.com/flink-ci/flink/builds/145951800 TriggerType:PUSH TriggerID:327aba3c6cf204264965a6b9891d8ac8f883d20e
   Hash:327aba3c6cf204264965a6b9891d8ac8f883d20e Status:FAILURE URL:https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=4603 TriggerType:PUSH TriggerID:327aba3c6cf204264965a6b9891d8ac8f883d20e
   Hash:5978349776ae9c87fef872887df47ee880ab12af Status:UNKNOWN URL:TBD TriggerType:PUSH TriggerID:5978349776ae9c87fef872887df47ee880ab12af
   -->
   ## CI report:
   
   * dd110c7f3f82e14edc384300e988e1b132cef614 Travis: [FAILURE](https://travis-ci.com/flink-ci/flink/builds/145913516) Azure: [FAILURE](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=4598) 
   * 327aba3c6cf204264965a6b9891d8ac8f883d20e Travis: [FAILURE](https://travis-ci.com/flink-ci/flink/builds/145951800) Azure: [FAILURE](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=4603) 
   * 5978349776ae9c87fef872887df47ee880ab12af UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>


[GitHub] [flink] flinkbot edited a comment on issue #10939: [FLINK-15750][network] Moving output flushing to the mailbox

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on issue #10939: [FLINK-15750][network] Moving output flushing to the mailbox
URL: https://github.com/apache/flink/pull/10939#issuecomment-578098092
 
 
   <!--
   Meta data
   Hash:dd110c7f3f82e14edc384300e988e1b132cef614 Status:FAILURE URL:https://travis-ci.com/flink-ci/flink/builds/145913516 TriggerType:PUSH TriggerID:dd110c7f3f82e14edc384300e988e1b132cef614
   Hash:dd110c7f3f82e14edc384300e988e1b132cef614 Status:FAILURE URL:https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=4598 TriggerType:PUSH TriggerID:dd110c7f3f82e14edc384300e988e1b132cef614
   -->
   ## CI report:
   
   * dd110c7f3f82e14edc384300e988e1b132cef614 Travis: [FAILURE](https://travis-ci.com/flink-ci/flink/builds/145913516) Azure: [FAILURE](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=4598) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>


[GitHub] [flink] flinkbot edited a comment on issue #10939: [FLINK-15750][network] Moving output flushing to the mailbox

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on issue #10939: [FLINK-15750][network] Moving output flushing to the mailbox
URL: https://github.com/apache/flink/pull/10939#issuecomment-578098092
 
 
   <!--
   Meta data
   Hash:dd110c7f3f82e14edc384300e988e1b132cef614 Status:FAILURE URL:https://travis-ci.com/flink-ci/flink/builds/145913516 TriggerType:PUSH TriggerID:dd110c7f3f82e14edc384300e988e1b132cef614
   Hash:dd110c7f3f82e14edc384300e988e1b132cef614 Status:FAILURE URL:https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=4598 TriggerType:PUSH TriggerID:dd110c7f3f82e14edc384300e988e1b132cef614
   Hash:327aba3c6cf204264965a6b9891d8ac8f883d20e Status:FAILURE URL:https://travis-ci.com/flink-ci/flink/builds/145951800 TriggerType:PUSH TriggerID:327aba3c6cf204264965a6b9891d8ac8f883d20e
   Hash:327aba3c6cf204264965a6b9891d8ac8f883d20e Status:FAILURE URL:https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=4603 TriggerType:PUSH TriggerID:327aba3c6cf204264965a6b9891d8ac8f883d20e
   Hash:5978349776ae9c87fef872887df47ee880ab12af Status:FAILURE URL:https://travis-ci.com/flink-ci/flink/builds/145957617 TriggerType:PUSH TriggerID:5978349776ae9c87fef872887df47ee880ab12af
   Hash:5978349776ae9c87fef872887df47ee880ab12af Status:FAILURE URL:https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=4606 TriggerType:PUSH TriggerID:5978349776ae9c87fef872887df47ee880ab12af
   -->
   ## CI report:
   
   * dd110c7f3f82e14edc384300e988e1b132cef614 Travis: [FAILURE](https://travis-ci.com/flink-ci/flink/builds/145913516) Azure: [FAILURE](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=4598) 
   * 327aba3c6cf204264965a6b9891d8ac8f883d20e Travis: [FAILURE](https://travis-ci.com/flink-ci/flink/builds/145951800) Azure: [FAILURE](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=4603) 
   * 5978349776ae9c87fef872887df47ee880ab12af Travis: [FAILURE](https://travis-ci.com/flink-ci/flink/builds/145957617) Azure: [FAILURE](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=4606) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>


[GitHub] [flink] flinkbot edited a comment on issue #10939: [FLINK-15750][network] Moving output flushing to the mailbox

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on issue #10939: [FLINK-15750][network] Moving output flushing to the mailbox
URL: https://github.com/apache/flink/pull/10939#issuecomment-578098092
 
 
   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "dd110c7f3f82e14edc384300e988e1b132cef614",
       "status" : "FAILURE",
       "url" : "https://travis-ci.com/flink-ci/flink/builds/145913516",
       "triggerID" : "dd110c7f3f82e14edc384300e988e1b132cef614",
       "triggerType" : "PUSH"
     }, {
       "hash" : "dd110c7f3f82e14edc384300e988e1b132cef614",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=4598",
       "triggerID" : "dd110c7f3f82e14edc384300e988e1b132cef614",
       "triggerType" : "PUSH"
     }, {
       "hash" : "327aba3c6cf204264965a6b9891d8ac8f883d20e",
       "status" : "FAILURE",
       "url" : "https://travis-ci.com/flink-ci/flink/builds/145951800",
       "triggerID" : "327aba3c6cf204264965a6b9891d8ac8f883d20e",
       "triggerType" : "PUSH"
     }, {
       "hash" : "327aba3c6cf204264965a6b9891d8ac8f883d20e",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=4603",
       "triggerID" : "327aba3c6cf204264965a6b9891d8ac8f883d20e",
       "triggerType" : "PUSH"
     }, {
       "hash" : "5978349776ae9c87fef872887df47ee880ab12af",
       "status" : "FAILURE",
       "url" : "https://travis-ci.com/flink-ci/flink/builds/145957617",
       "triggerID" : "5978349776ae9c87fef872887df47ee880ab12af",
       "triggerType" : "PUSH"
     }, {
       "hash" : "5978349776ae9c87fef872887df47ee880ab12af",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=4606",
       "triggerID" : "5978349776ae9c87fef872887df47ee880ab12af",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * dd110c7f3f82e14edc384300e988e1b132cef614 Travis: [FAILURE](https://travis-ci.com/flink-ci/flink/builds/145913516) Azure: [FAILURE](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=4598) 
   * 327aba3c6cf204264965a6b9891d8ac8f883d20e Travis: [FAILURE](https://travis-ci.com/flink-ci/flink/builds/145951800) Azure: [FAILURE](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=4603) 
   * 5978349776ae9c87fef872887df47ee880ab12af Travis: [FAILURE](https://travis-ci.com/flink-ci/flink/builds/145957617) Azure: [FAILURE](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=4606) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>


[GitHub] [flink] flinkbot edited a comment on issue #10939: [FLINK-15750][network] Moving output flushing to the mailbox

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on issue #10939: [FLINK-15750][network] Moving output flushing to the mailbox
URL: https://github.com/apache/flink/pull/10939#issuecomment-578098092
 
 
   <!--
   Meta data
   Hash:dd110c7f3f82e14edc384300e988e1b132cef614 Status:PENDING URL:https://travis-ci.com/flink-ci/flink/builds/145913516 TriggerType:PUSH TriggerID:dd110c7f3f82e14edc384300e988e1b132cef614
   Hash:dd110c7f3f82e14edc384300e988e1b132cef614 Status:PENDING URL:https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=4598 TriggerType:PUSH TriggerID:dd110c7f3f82e14edc384300e988e1b132cef614
   -->
   ## CI report:
   
   * dd110c7f3f82e14edc384300e988e1b132cef614 Travis: [PENDING](https://travis-ci.com/flink-ci/flink/builds/145913516) Azure: [PENDING](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=4598) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>


[GitHub] [flink] rkhachatryan commented on a change in pull request #10939: [FLINK-15750][network] Moving output flushing to the mailbox

Posted by GitBox <gi...@apache.org>.
rkhachatryan commented on a change in pull request #10939: [FLINK-15750][network] Moving output flushing to the mailbox
URL: https://github.com/apache/flink/pull/10939#discussion_r370706740
 
 

 ##########
 File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
 ##########
 @@ -1451,7 +1485,9 @@ private void checkpointStreamOperator(StreamOperator<?> op) throws Exception {
 
 	private static <OUT> List<RecordWriter<SerializationDelegate<StreamRecord<OUT>>>> createRecordWriters(
 			StreamConfig configuration,
-			Environment environment) {
+			Environment environment,
+			OutputFlusher.OutputFlushers outputFlushers,
 
 Review comment:
   nit: this method could create immutable flushers and return them, instead of receiving a collection and passing it
   through the call chain. That would make the code easier to reason about.
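
One way to read this suggestion, as a minimal sketch under assumptions: the factory builds the writers and their flushers together and returns both as unmodifiable lists, so nothing mutable travels through the call chain. `Writer`, `Flusher`, and `WritersAndFlushers` are hypothetical stand-ins and are not the types used in `StreamTask`.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical sketch: Writer, Flusher and WritersAndFlushers are illustrative names only.
final class ImmutableFlushersSketch {

    static final class Writer {
        final String output;

        Writer(String output) {
            this.output = output;
        }
    }

    static final class Flusher {
        final Writer writer;

        Flusher(Writer writer) {
            this.writer = writer;
        }
    }

    /** Immutable result holder returned by the factory. */
    static final class WritersAndFlushers {
        final List<Writer> writers;
        final List<Flusher> flushers;

        WritersAndFlushers(List<Writer> writers, List<Flusher> flushers) {
            // Defensive copies wrapped as read-only views.
            this.writers = Collections.unmodifiableList(new ArrayList<>(writers));
            this.flushers = Collections.unmodifiableList(new ArrayList<>(flushers));
        }
    }

    /** Builds writers and their flushers together instead of filling a caller-owned collection. */
    static WritersAndFlushers createRecordWriters(List<String> outputs) {
        List<Writer> writers = new ArrayList<>();
        List<Flusher> flushers = new ArrayList<>();
        for (String output : outputs) {
            Writer writer = new Writer(output);
            writers.add(writer);
            flushers.add(new Flusher(writer));
        }
        return new WritersAndFlushers(writers, flushers);
    }

    /** Returns true if the returned flusher list rejects modification. */
    static boolean flushersAreImmutable(WritersAndFlushers result) {
        try {
            result.flushers.add(new Flusher(new Writer("extra")));
            return false;
        } catch (UnsupportedOperationException expected) {
            return true;
        }
    }
}
```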


[GitHub] [flink] flinkbot commented on issue #10939: [FLINK-15750][network] Moving output flushing to the mailbox

Posted by GitBox <gi...@apache.org>.
flinkbot commented on issue #10939: [FLINK-15750][network] Moving output flushing to the mailbox
URL: https://github.com/apache/flink/pull/10939#issuecomment-578090307
 
 
   Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community
   to review your pull request. We will use this comment to track the progress of the review.
   
   
   ## Automated Checks
   Last check on commit 94f6f5db651232f2e73b8088addd120df1be7c93 (Fri Jan 24 11:18:37 UTC 2020)
   
   **Warnings:**
    * No documentation files were touched! Remember to keep the Flink docs up to date!
   
   
   <sub>Mention the bot in a comment to re-run the automated checks.</sub>
   ## Review Progress
   
   * ❓ 1. The [description] looks good.
   * ❓ 2. There is [consensus] that the contribution should go into Flink.
   * ❓ 3. Needs [attention] from.
   * ❓ 4. The change fits into the overall [architecture].
   * ❓ 5. Overall code [quality] is good.
   
   Please see the [Pull Request Review Guide](https://flink.apache.org/contributing/reviewing-prs.html) for a full explanation of the review process.

   The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer or PMC member is required.
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot approve description` to approve one or more aspects (aspects: `description`, `consensus`, `architecture` and `quality`)
    - `@flinkbot approve all` to approve all aspects
    - `@flinkbot approve-until architecture` to approve everything until `architecture`
    - `@flinkbot attention @username1 [@username2 ..]` to require somebody's attention
    - `@flinkbot disapprove architecture` to remove an approval you gave earlier
   </details>


[GitHub] [flink] rkhachatryan commented on a change in pull request #10939: [FLINK-15750][network] Moving output flushing to the mailbox

Posted by GitBox <gi...@apache.org>.
rkhachatryan commented on a change in pull request #10939: [FLINK-15750][network] Moving output flushing to the mailbox
URL: https://github.com/apache/flink/pull/10939#discussion_r370714527
 
 

 ##########
 File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/OutputFlusher.java
 ##########
 @@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.io;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.runtime.io.network.api.writer.RecordWriter;
+import org.apache.flink.streaming.api.operators.MailboxExecutor;
+import org.apache.flink.util.FlinkException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.concurrent.Future;
+
+/**
+ * A dedicated thread that periodically flushes the output buffers, to set upper latency bounds.
+ *
+ * <p>The thread is daemonic, because it is only a utility thread.
+ */
+public class OutputFlusher extends Thread {
+	/** Default name for the output flush thread, if no name with a task reference is given. */
+	@VisibleForTesting
+	public static final String OUTPUT_FLUSH_THREAD_NAME = "OutputFlusher";
+
+	private static final Logger LOG = LoggerFactory.getLogger(OutputFlusher.class);
+
+	private final RecordWriter<?> recordWriter;
+	private final long timeout;
+	private final MailboxExecutor mailboxExecutor;
+
+	private volatile boolean running = true;
+
+	public OutputFlusher(
+			RecordWriter<?> recordWriter,
+			String taskName,
+			long timeout,
+			MailboxExecutor mailboxExecutor) {
+		super(OUTPUT_FLUSH_THREAD_NAME + " for " + taskName);
+		this.recordWriter = recordWriter;
+		this.timeout = timeout;
+		this.mailboxExecutor = mailboxExecutor;
+		setDaemon(true);
 
 Review comment:
   `setDaemon(true);` doesn't make sense for the flushers created in `StreamTask`, because we wait for them in `OutputFlushers.close()`.
   So it's safer not to set this flag in that case (for other cases, pass it through the constructor).
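
A minimal sketch of passing the flag through the constructor, assuming a simplified flusher thread. The extra `daemon` parameter and the class names are hypothetical and do not appear in the diff above.

```java
// Hypothetical sketch: the extra constructor parameter is an assumption, not the PR's code.
final class DaemonFlagSketch {

    static final class Flusher extends Thread {
        Flusher(String taskName, boolean daemon) {
            super("OutputFlusher for " + taskName);
            // Must be set before start(); callers that join() on close would pass false.
            setDaemon(daemon);
        }

        @Override
        public void run() {
            // Flushing loop omitted in this sketch.
        }
    }

    /** Creates a flusher with the requested flag and reports what the thread ended up with. */
    static boolean isDaemon(boolean requested) {
        return new Flusher("demo-task", requested).isDaemon();
    }
}
```

Setting the flag explicitly in both cases also avoids inheriting the daemon status of whichever thread happens to construct the flusher.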


[GitHub] [flink] rkhachatryan commented on a change in pull request #10939: [FLINK-15750][network] Moving output flushing to the mailbox

Posted by GitBox <gi...@apache.org>.
rkhachatryan commented on a change in pull request #10939: [FLINK-15750][network] Moving output flushing to the mailbox
URL: https://github.com/apache/flink/pull/10939#discussion_r370703274
 
 

 ##########
 File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/OutputFlusher.java
 ##########
 @@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.io;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.runtime.io.network.api.writer.RecordWriter;
+import org.apache.flink.streaming.api.operators.MailboxExecutor;
+import org.apache.flink.util.FlinkException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.concurrent.Future;
+
+/**
+ * A dedicated thread that periodically flushes the output buffers, to set upper latency bounds.
+ *
+ * <p>The thread is daemonic, because it is only a utility thread.
+ */
+public class OutputFlusher extends Thread {
+	/** Default name for the output flush thread, if no name with a task reference is given. */
+	@VisibleForTesting
+	public static final String OUTPUT_FLUSH_THREAD_NAME = "OutputFlusher";
+
+	private static final Logger LOG = LoggerFactory.getLogger(OutputFlusher.class);
+
+	private final RecordWriter<?> recordWriter;
+	private final long timeout;
+	private final MailboxExecutor mailboxExecutor;
+
+	private volatile boolean running = true;
+
+	public OutputFlusher(
+			RecordWriter<?> recordWriter,
+			String taskName,
+			long timeout,
+			MailboxExecutor mailboxExecutor) {
+		super(OUTPUT_FLUSH_THREAD_NAME + " for " + taskName);
+		this.recordWriter = recordWriter;
+		this.timeout = timeout;
+		this.mailboxExecutor = mailboxExecutor;
+		setDaemon(true);
+	}
+
+	public void terminate() {
+		running = false;
+		interrupt();
+	}
+
+	@Override
+	public void run() {
+		try {
+			Future<?> future = null;
+
+			while (running) {
+				try {
+					if (future != null) {
+						future.get();
+					}
+					Thread.sleep(timeout);
+				} catch (InterruptedException e) {
+					// propagate this if we are still running, because it should not happen
+					// in that case
+					if (running) {
+						throw new Exception(e);
+					}
+				}
+
+				recordWriter.flushAll();
+			}
+		} catch (Throwable t) {
+			LOG.error("An exception happened while flushing the outputs", t);
+			mailboxExecutor.execute(
+				() -> {
+					throw new FlinkException("OutputFlusher thread has failed", t);
+				},
+				"OutputFlusher");
+		}
+	}
+
+	/**
+	 * Closeable collection of {@link OutputFlusher}.
+	 */
+	public static class OutputFlushers implements AutoCloseable {
+		private final ArrayList<OutputFlusher> outputFlushers = new ArrayList<>();
+
+		public void addOutputFlusher(OutputFlusher outputFlusher) {
+			outputFlushers.add(outputFlusher);
 
 Review comment:
   nit: adding a flusher shouldn't be allowed after `close()` has been called.
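
A minimal sketch of such a guard, assuming a simplified collection that stores plain `Runnable`s instead of `OutputFlusher` threads. The `closed` flag and the choice of `IllegalStateException` are assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch: a simplified closeable collection with an "already closed" guard.
final class ClosedGuardSketch {

    static final class Flushers implements AutoCloseable {
        private final List<Runnable> flushers = new ArrayList<>();
        private boolean closed;

        synchronized void add(Runnable flusher) {
            if (closed) {
                throw new IllegalStateException("Cannot add a flusher after close()");
            }
            flushers.add(flusher);
        }

        synchronized int size() {
            return flushers.size();
        }

        @Override
        public synchronized void close() {
            // Terminating the flushers is omitted here; only the guard is shown.
            closed = true;
        }
    }

    /** Returns true if adding after close() is rejected and earlier additions are kept. */
    static boolean addAfterCloseIsRejected() {
        Flushers flushers = new Flushers();
        flushers.add(() -> { });
        flushers.close();
        try {
            flushers.add(() -> { });
            return false;
        } catch (IllegalStateException expected) {
            return flushers.size() == 1;
        }
    }
}
```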


[GitHub] [flink] rkhachatryan commented on a change in pull request #10939: [FLINK-15750][network] Moving output flushing to the mailbox

Posted by GitBox <gi...@apache.org>.
rkhachatryan commented on a change in pull request #10939: [FLINK-15750][network] Moving output flushing to the mailbox
URL: https://github.com/apache/flink/pull/10939#discussion_r370702676
 
 

 ##########
 File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/OutputFlusher.java
 ##########
 @@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.io;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.runtime.io.network.api.writer.RecordWriter;
+import org.apache.flink.streaming.api.operators.MailboxExecutor;
+import org.apache.flink.util.FlinkException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.concurrent.Future;
+
+/**
+ * A dedicated thread that periodically flushes the output buffers, to set upper latency bounds.
+ *
+ * <p>The thread is daemonic, because it is only a utility thread.
+ */
+public class OutputFlusher extends Thread {
+	/** Default name for the output flush thread, if no name with a task reference is given. */
+	@VisibleForTesting
+	public static final String OUTPUT_FLUSH_THREAD_NAME = "OutputFlusher";
+
+	private static final Logger LOG = LoggerFactory.getLogger(OutputFlusher.class);
+
+	private final RecordWriter<?> recordWriter;
+	private final long timeout;
+	private final MailboxExecutor mailboxExecutor;
+
+	private volatile boolean running = true;
+
+	public OutputFlusher(
+			RecordWriter<?> recordWriter,
+			String taskName,
+			long timeout,
+			MailboxExecutor mailboxExecutor) {
+		super(OUTPUT_FLUSH_THREAD_NAME + " for " + taskName);
+		this.recordWriter = recordWriter;
+		this.timeout = timeout;
+		this.mailboxExecutor = mailboxExecutor;
+		setDaemon(true);
+	}
+
+	public void terminate() {
+		running = false;
+		interrupt();
+	}
+
+	@Override
+	public void run() {
+		try {
+			Future<?> future = null;
+
+			while (running) {
+				try {
+					if (future != null) {
+						future.get();
+					}
+					Thread.sleep(timeout);
+				} catch (InterruptedException e) {
+					// propagate this if we are still running, because it should not happen
+					// in that case
+					if (running) {
+						throw new Exception(e);
+					}
+				}
+
+				recordWriter.flushAll();
+			}
+		} catch (Throwable t) {
+			LOG.error("An exception happened while flushing the outputs", t);
+			mailboxExecutor.execute(
+				() -> {
+					throw new FlinkException("OutputFlusher thread has failed", t);
+				},
+				"OutputFlusher");
+		}
+	}
+
+	/**
+	 * Closeable collection of {@link OutputFlusher}.
+	 */
+	public static class OutputFlushers implements AutoCloseable {
+		private final ArrayList<OutputFlusher> outputFlushers = new ArrayList<>();
+
+		public void addOutputFlusher(OutputFlusher outputFlusher) {
+			outputFlushers.add(outputFlusher);
+		}
+
+		public void close() {
+			for (OutputFlusher outputFlusher : outputFlushers) {
+				outputFlusher.terminate();
+				try {
+					outputFlusher.join();
 
 Review comment:
   nit: it would be faster to first terminate all flushers and only then join them, so that the threads shut down concurrently rather than one at a time.
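
   A minimal sketch of the suggested ordering, assuming a simplified stand-in for the flusher thread. `SketchFlusher` and `SketchFlushers` are hypothetical names used only for illustration and are not part of this PR; the real `OutputFlusher` also flushes a `RecordWriter`, which is omitted here.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for OutputFlusher: idles until it is terminated. */
class SketchFlusher extends Thread {
    private volatile boolean running = true;

    void terminate() {
        running = false;
        interrupt();
    }

    @Override
    public void run() {
        while (running) {
            try {
                Thread.sleep(50);
            } catch (InterruptedException e) {
                // Expected after terminate(); the loop condition ends the thread.
            }
        }
    }
}

/** Hypothetical closeable collection that signals every flusher before joining any of them. */
class SketchFlushers implements AutoCloseable {
    private final List<SketchFlusher> flushers = new ArrayList<>();

    void add(SketchFlusher flusher) {
        flushers.add(flusher);
    }

    @Override
    public void close() {
        // Phase 1: ask all threads to stop, so they wind down concurrently.
        for (SketchFlusher flusher : flushers) {
            flusher.terminate();
        }
        // Phase 2: wait for each thread; the total wait is bounded by the slowest one.
        for (SketchFlusher flusher : flushers) {
            try {
                flusher.join();
            } catch (InterruptedException e) {
                // Restore the interrupt flag and stop waiting.
                Thread.currentThread().interrupt();
                return;
            }
        }
    }
}
```

   With the terminate-then-join loop from the diff, each `join()` would have to finish before the next flusher is even asked to stop; splitting the loop in two avoids that serialization.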

[GitHub] [flink] rkhachatryan commented on a change in pull request #10939: [FLINK-15750][network] Moving output flushing to the mailbox

Posted by GitBox <gi...@apache.org>.
rkhachatryan commented on a change in pull request #10939: [FLINK-15750][network] Moving output flushing to the mailbox
URL: https://github.com/apache/flink/pull/10939#discussion_r370694018
 
 

 ##########
 File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/OutputFlusher.java
 ##########
 @@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.io;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.runtime.io.network.api.writer.RecordWriter;
+import org.apache.flink.streaming.api.operators.MailboxExecutor;
+import org.apache.flink.util.FlinkException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.concurrent.Future;
+
+/**
+ * A dedicated thread that periodically flushes the output buffers, to set upper latency bounds.
+ *
+ * <p>The thread is daemonic, because it is only a utility thread.
+ */
+public class OutputFlusher extends Thread {
+	/** Default name for the output flush thread, if no name with a task reference is given. */
+	@VisibleForTesting
+	public static final String OUTPUT_FLUSH_THREAD_NAME = "OutputFlusher";
+
+	private static final Logger LOG = LoggerFactory.getLogger(OutputFlusher.class);
+
+	private final RecordWriter<?> recordWriter;
+	private final long timeout;
+	private final MailboxExecutor mailboxExecutor;
+
+	private volatile boolean running = true;
+
+	public OutputFlusher(
+			RecordWriter<?> recordWriter,
+			String taskName,
+			long timeout,
+			MailboxExecutor mailboxExecutor) {
+		super(OUTPUT_FLUSH_THREAD_NAME + " for " + taskName);
+		this.recordWriter = recordWriter;
+		this.timeout = timeout;
+		this.mailboxExecutor = mailboxExecutor;
+		setDaemon(true);
+	}
+
+	public void terminate() {
+		running = false;
+		interrupt();
+	}
+
+	@Override
+	public void run() {
+		try {
+			Future<?> future = null;
+
+			while (running) {
+				try {
+					if (future != null) {
 
 Review comment:
   Isn't this condition always false? `future` is initialized to `null` and never reassigned.

[GitHub] [flink] rkhachatryan commented on a change in pull request #10939: [FLINK-15750][network] Moving output flushing to the mailbox

Posted by GitBox <gi...@apache.org>.
rkhachatryan commented on a change in pull request #10939: [FLINK-15750][network] Moving output flushing to the mailbox
URL: https://github.com/apache/flink/pull/10939#discussion_r370711678
 
 

 ##########
 File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
 ##########
 @@ -1492,13 +1532,27 @@ private void checkpointStreamOperator(StreamOperator<?> op) throws Exception {
 
 		RecordWriter<SerializationDelegate<StreamRecord<OUT>>> output = new RecordWriterBuilder<SerializationDelegate<StreamRecord<OUT>>>()
 			.setChannelSelector(outputPartitioner)
-			.setTimeout(bufferTimeout)
-			.setTaskName(taskName)
+			.setFlushAlways(bufferTimeout == 0)
 			.build(bufferWriter);
 		output.setMetricGroup(environment.getMetricGroup().getIOMetricGroup());
+
+		if (bufferTimeout > 0) {
+			outputFlushers.addOutputFlusher(startOutputFlusher(output, taskName, bufferTimeout, mailboxExecutor));
+		}
 		return output;
 	}
 
+	private static OutputFlusher startOutputFlusher(
+			RecordWriter<?> output,
+			String taskName,
+			long bufferTimeout,
+			MailboxExecutor mailboxExecutor) {
+		Preconditions.checkArgument(bufferTimeout >= -1);
 
 Review comment:
   This check should live in `OutputFlusher`, because it applies to any `OutputFlusher`.
   Also, the values -1 and 0 don't make sense here (nor do very small values; perhaps anything under 10?).
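
   A rough sketch of validating the timeout inside the flusher itself, assuming a simplified class. `ValidatingFlusher` is a hypothetical name; plain `IllegalArgumentException` is used to keep the example self-contained, where the PR code would presumably use `Preconditions.checkArgument`. Only non-positive values are rejected here; whether very small positive values should also be rejected is left open, as in the comment above.

```java
/** Hypothetical, simplified flusher that validates its own timeout. */
class ValidatingFlusher {
    private final long timeoutMillis;

    ValidatingFlusher(long timeoutMillis) {
        // Reject -1 and 0: a periodic flusher only makes sense with a positive interval.
        if (timeoutMillis <= 0) {
            throw new IllegalArgumentException(
                "Flush timeout must be positive, but was " + timeoutMillis);
        }
        this.timeoutMillis = timeoutMillis;
    }

    long getTimeoutMillis() {
        return timeoutMillis;
    }
}
```

   Keeping the check in the constructor means every caller gets the same validation, not only `StreamTask`.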

[GitHub] [flink] rkhachatryan commented on a change in pull request #10939: [FLINK-15750][network] Moving output flushing to the mailbox

Posted by GitBox <gi...@apache.org>.
rkhachatryan commented on a change in pull request #10939: [FLINK-15750][network] Moving output flushing to the mailbox
URL: https://github.com/apache/flink/pull/10939#discussion_r370688709
 
 

 ##########
 File path: flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/LongRecordWriterThread.java
 ##########
 @@ -63,46 +73,65 @@ public synchronized void shutdown() {
 	 * @param records
 	 * 		number of records to send
 	 */
-	public synchronized void setRecordsToSend(long records) {
-		checkState(!recordsToSend.isDone());
-		recordsToSend.complete(records);
+	public void setRecordsToSend(long records) {
+		mailboxExecutor.execute(() -> resumeAndSetRecordsToSend(records), "resumeAndSetRecordsToSend");
 
 Review comment:
   Is the `checkState` check missing now?
   (Multiple calls here overwrite previous values and can lead to wrong results.)
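
   For illustration, a guard equivalent to the removed `checkState(!recordsToSend.isDone())` could look like the sketch below. `RecordsToSendSketch` is a hypothetical, simplified class; the body of `resumeAndSetRecordsToSend` is not shown in this diff, so this only demonstrates the guard, not the PR's actual implementation.

```java
import java.util.concurrent.CompletableFuture;

/** Hypothetical holder that refuses to overwrite an already-set record count. */
class RecordsToSendSketch {
    private final CompletableFuture<Long> recordsToSend = new CompletableFuture<>();

    void setRecordsToSend(long records) {
        // Fail loudly on a second call instead of silently ignoring or overwriting.
        if (recordsToSend.isDone()) {
            throw new IllegalStateException("recordsToSend was already set");
        }
        recordsToSend.complete(records);
    }

    long getRecordsToSend() {
        // Blocks until a value has been set.
        return recordsToSend.join();
    }
}
```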

[GitHub] [flink] flinkbot edited a comment on issue #10939: [FLINK-15750][network] Moving output flushing to the mailbox

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on issue #10939: [FLINK-15750][network] Moving output flushing to the mailbox
URL: https://github.com/apache/flink/pull/10939#issuecomment-578098092
 
 
   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "dd110c7f3f82e14edc384300e988e1b132cef614",
       "status" : "DELETED",
       "url" : "https://travis-ci.com/flink-ci/flink/builds/145913516",
       "triggerID" : "dd110c7f3f82e14edc384300e988e1b132cef614",
       "triggerType" : "PUSH"
     }, {
       "hash" : "dd110c7f3f82e14edc384300e988e1b132cef614",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=4598",
       "triggerID" : "dd110c7f3f82e14edc384300e988e1b132cef614",
       "triggerType" : "PUSH"
     }, {
       "hash" : "327aba3c6cf204264965a6b9891d8ac8f883d20e",
       "status" : "DELETED",
       "url" : "https://travis-ci.com/flink-ci/flink/builds/145951800",
       "triggerID" : "327aba3c6cf204264965a6b9891d8ac8f883d20e",
       "triggerType" : "PUSH"
     }, {
       "hash" : "327aba3c6cf204264965a6b9891d8ac8f883d20e",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=4603",
       "triggerID" : "327aba3c6cf204264965a6b9891d8ac8f883d20e",
       "triggerType" : "PUSH"
     }, {
       "hash" : "5978349776ae9c87fef872887df47ee880ab12af",
       "status" : "FAILURE",
       "url" : "https://travis-ci.com/flink-ci/flink/builds/145957617",
       "triggerID" : "5978349776ae9c87fef872887df47ee880ab12af",
       "triggerType" : "PUSH"
     }, {
       "hash" : "5978349776ae9c87fef872887df47ee880ab12af",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=4606",
       "triggerID" : "5978349776ae9c87fef872887df47ee880ab12af",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 5978349776ae9c87fef872887df47ee880ab12af Travis: [FAILURE](https://travis-ci.com/flink-ci/flink/builds/145957617) Azure: [FAILURE](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=4606) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>

[GitHub] [flink] rkhachatryan commented on a change in pull request #10939: [FLINK-15750][network] Moving output flushing to the mailbox

Posted by GitBox <gi...@apache.org>.
rkhachatryan commented on a change in pull request #10939: [FLINK-15750][network] Moving output flushing to the mailbox
URL: https://github.com/apache/flink/pull/10939#discussion_r370711678
 
 

 ##########
 File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
 ##########
 @@ -1492,13 +1532,27 @@ private void checkpointStreamOperator(StreamOperator<?> op) throws Exception {
 
 		RecordWriter<SerializationDelegate<StreamRecord<OUT>>> output = new RecordWriterBuilder<SerializationDelegate<StreamRecord<OUT>>>()
 			.setChannelSelector(outputPartitioner)
-			.setTimeout(bufferTimeout)
-			.setTaskName(taskName)
+			.setFlushAlways(bufferTimeout == 0)
 			.build(bufferWriter);
 		output.setMetricGroup(environment.getMetricGroup().getIOMetricGroup());
+
+		if (bufferTimeout > 0) {
+			outputFlushers.addOutputFlusher(startOutputFlusher(output, taskName, bufferTimeout, mailboxExecutor));
+		}
 		return output;
 	}
 
+	private static OutputFlusher startOutputFlusher(
+			RecordWriter<?> output,
+			String taskName,
+			long bufferTimeout,
+			MailboxExecutor mailboxExecutor) {
+		Preconditions.checkArgument(bufferTimeout >= -1);
 
 Review comment:
   This check should live in `OutputFlusher`, because it applies to any `OutputFlusher`.
   Also, the values -1 and 0, and perhaps anything under 10, don't make sense.

[GitHub] [flink] rkhachatryan commented on a change in pull request #10939: [FLINK-15750][network] Moving output flushing to the mailbox

Posted by GitBox <gi...@apache.org>.
rkhachatryan commented on a change in pull request #10939: [FLINK-15750][network] Moving output flushing to the mailbox
URL: https://github.com/apache/flink/pull/10939#discussion_r370714527
 
 

 ##########
 File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/OutputFlusher.java
 ##########
 @@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.io;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.runtime.io.network.api.writer.RecordWriter;
+import org.apache.flink.streaming.api.operators.MailboxExecutor;
+import org.apache.flink.util.FlinkException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.concurrent.Future;
+
+/**
+ * A dedicated thread that periodically flushes the output buffers, to set upper latency bounds.
+ *
+ * <p>The thread is daemonic, because it is only a utility thread.
+ */
+public class OutputFlusher extends Thread {
+	/** Default name for the output flush thread, if no name with a task reference is given. */
+	@VisibleForTesting
+	public static final String OUTPUT_FLUSH_THREAD_NAME = "OutputFlusher";
+
+	private static final Logger LOG = LoggerFactory.getLogger(OutputFlusher.class);
+
+	private final RecordWriter<?> recordWriter;
+	private final long timeout;
+	private final MailboxExecutor mailboxExecutor;
+
+	private volatile boolean running = true;
+
+	public OutputFlusher(
+			RecordWriter<?> recordWriter,
+			String taskName,
+			long timeout,
+			MailboxExecutor mailboxExecutor) {
+		super(OUTPUT_FLUSH_THREAD_NAME + " for " + taskName);
+		this.recordWriter = recordWriter;
+		this.timeout = timeout;
+		this.mailboxExecutor = mailboxExecutor;
+		setDaemon(true);
 
 Review comment:
   Setting the daemon flag doesn't make sense for the flushers created in `StreamTask`, since we wait for them in `OutputFlushers.close()`.
   So it's safer not to set this flag in that case (for other cases, pass it through the constructor).
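
   One possible shape for passing the flag through the constructor, sketched with a hypothetical `ConfigurableFlusher` class (the name, the `daemon` parameter, and the empty `run()` body are assumptions for illustration only):

```java
/** Hypothetical flusher thread whose daemon status is chosen by the caller. */
class ConfigurableFlusher extends Thread {

    ConfigurableFlusher(String name, boolean daemon) {
        super(name);
        // The caller decides: threads that are joined on close() need not be daemons.
        setDaemon(daemon);
    }

    @Override
    public void run() {
        // Periodic flushing is omitted in this sketch.
    }
}
```

   A caller that joins the thread on close, as `StreamTask` does via `OutputFlushers.close()`, would pass `false`; other callers could pass `true`.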

[GitHub] [flink] flinkbot commented on issue #10939: [FLINK-15750][network] Moving output flushing to the mailbox

Posted by GitBox <gi...@apache.org>.
flinkbot commented on issue #10939: [FLINK-15750][network] Moving output flushing to the mailbox
URL: https://github.com/apache/flink/pull/10939#issuecomment-578098092
 
 
   <!--
   Meta data
   Hash:dd110c7f3f82e14edc384300e988e1b132cef614 Status:UNKNOWN URL:TBD TriggerType:PUSH TriggerID:dd110c7f3f82e14edc384300e988e1b132cef614
   -->
   ## CI report:
   
   * dd110c7f3f82e14edc384300e988e1b132cef614 UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>
