Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2021/01/29 17:27:06 UTC

[GitHub] [flink] rkhachatryan opened a new pull request #14814: [WIP] (1.11) [FLINK-21132][runtime] Don't end input on stop with savepoint

rkhachatryan opened a new pull request #14814:
URL: https://github.com/apache/flink/pull/14814


   ## What is the purpose of the change
   
   (1.11 backport)
   
   ```
   EndOfInput was used to handle any stopping of the job. When
   stopping with savepoint, the input is not actually ended.
   This causes issues with some sinks (e.g. Iceberg).
   
   With this change, endInput is not called for stop-with-savepoint.
   
   To differentiate stop-with-savepoint from other cases,
   only checkpoints (RPC/barriers) are considered and not network EOP.
   That's enough because EOP is only injected after the CP completion
   (i.e. when the downstream has also been notified of the sync
   savepoint by CP barriers).
   ```
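   
   The rule described above can be modelled without Flink. This is a minimal sketch, assuming a downstream task always observes the synchronous-savepoint barrier before the end-of-partition (EOP) event, as the commit message states. The `Event` enum, `onEvent`, and the recorded `"finish"` step are hypothetical names for illustration; real Flink tasks use different classes and sources are notified by RPC rather than by barriers.
   
   ```java
   import java.util.ArrayList;
   import java.util.List;
   
   // Hypothetical, Flink-free model: decide whether to call endInput() based on
   // checkpoint signals seen earlier, not on the EOP event itself.
   public class StopWithSavepointModel {
   
       // Events a task may observe, in arrival order (hypothetical names).
       enum Event { SYNC_SAVEPOINT_BARRIER, END_OF_PARTITION }
   
       private boolean stopWithSavepointSeen = false;
       private final List<String> calls = new ArrayList<>();
   
       void onEvent(Event event) {
           switch (event) {
               case SYNC_SAVEPOINT_BARRIER:
                   // The barrier arrives before EOP, because EOP is only
                   // injected after the checkpoint completes.
                   stopWithSavepointSeen = true;
                   break;
               case END_OF_PARTITION:
                   if (!stopWithSavepointSeen) {
                       // Bounded input genuinely ended.
                       calls.add("endInput");
                   }
                   // In both cases the task finishes; only a true end of
                   // input triggers endInput().
                   calls.add("finish");
                   break;
           }
       }
   
       List<String> calls() {
           return calls;
       }
   
       public static void main(String[] args) {
           StopWithSavepointModel bounded = new StopWithSavepointModel();
           bounded.onEvent(Event.END_OF_PARTITION);
           System.out.println(bounded.calls()); // [endInput, finish]
   
           StopWithSavepointModel stopped = new StopWithSavepointModel();
           stopped.onEvent(Event.SYNC_SAVEPOINT_BARRIER);
           stopped.onEvent(Event.END_OF_PARTITION);
           System.out.println(stopped.calls()); // [finish]
       }
   }
   ```
   
   Keying the decision on a flag set by the checkpoint path keeps EOP handling identical for both cases; only the earlier barrier changes the outcome.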
   
   ## Verifying this change
   
   Added `SavepointITCase.testStopSavepointWithBoundedInput`.
   
   ## Does this pull request potentially affect one of the following parts:
   
     - Dependencies (does it add or upgrade a dependency): no
     - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: **yes** (`endInput` is not called anymore for stop-with-savepoint)
     - The serializers: no
     - The runtime per-record code paths (performance sensitive): no
     - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn/Mesos, ZooKeeper: no
     - The S3 file system connector: no
   
   ## Documentation
   
     - Does this pull request introduce a new feature? no
     - If yes, how is the feature documented? not applicable
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] flinkbot edited a comment on pull request #14814: (1.11) [FLINK-21132][runtime] Don't end input on stop with savepoint

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #14814:
URL: https://github.com/apache/flink/pull/14814#issuecomment-769949284


   ## CI report:
   
   * 123e752166344997c2c8ce3e3edd32f9d4328987 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12763) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>





[GitHub] [flink] flinkbot edited a comment on pull request #14814: (1.11) [FLINK-21132][runtime] Don't end input on stop with savepoint

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #14814:
URL: https://github.com/apache/flink/pull/14814#issuecomment-769941674


   Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community
   to review your pull request. We will use this comment to track the progress of the review.
   
   
   ## Automated Checks
   Last check on commit 71e3f697f43505aa7a0ec43c6f3a8f95d8067c47 (Fri May 28 08:16:53 UTC 2021)
   
   **Warnings:**
    * No documentation files were touched! Remember to keep the Flink docs up to date!
   
   
   <sub>Mention the bot in a comment to re-run the automated checks.</sub>
   ## Review Progress
   
   * ❓ 1. The [description] looks good.
   * ❓ 2. There is [consensus] that the contribution should go into Flink.
   * ❓ 3. Needs [attention] from.
   * ❓ 4. The change fits into the overall [architecture].
   * ❓ 5. Overall code [quality] is good.
   
   Please see the [Pull Request Review Guide](https://flink.apache.org/contributing/reviewing-prs.html) for a full explanation of the review process.
   
   The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer or PMC member is required.
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot approve description` to approve one or more aspects (aspects: `description`, `consensus`, `architecture` and `quality`)
    - `@flinkbot approve all` to approve all aspects
    - `@flinkbot approve-until architecture` to approve everything until `architecture`
    - `@flinkbot attention @username1 [@username2 ..]` to require somebody's attention
    - `@flinkbot disapprove architecture` to remove an approval you gave earlier
   </details>





[GitHub] [flink] flinkbot edited a comment on pull request #14814: (1.11) [FLINK-21132][runtime] Don't end input on stop with savepoint

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #14814:
URL: https://github.com/apache/flink/pull/14814#issuecomment-769949284


   ## CI report:
   
   * c4c98aab569657323593a9e329ae36c7f7d3c2e1 UNKNOWN
   * b458c15ff328dbead0e2dfecd81dfeb859232e14 Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12811) 
   * 71e3f697f43505aa7a0ec43c6f3a8f95d8067c47 UNKNOWN
   



[GitHub] [flink] flinkbot edited a comment on pull request #14814: (1.11) [FLINK-21132][runtime] Don't end input on stop with savepoint

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #14814:
URL: https://github.com/apache/flink/pull/14814#issuecomment-769949284


   ## CI report:
   
   * 123e752166344997c2c8ce3e3edd32f9d4328987 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12763) 
   * c4c98aab569657323593a9e329ae36c7f7d3c2e1 UNKNOWN
   



[GitHub] [flink] flinkbot edited a comment on pull request #14814: (1.11) [FLINK-21132][runtime] Don't end input on stop with savepoint

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #14814:
URL: https://github.com/apache/flink/pull/14814#issuecomment-769949284


   ## CI report:
   
   * ffa5eb22c3782576880dde2a07454e64b9a9f752 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12755) 
   * d77d30aa8d4918513092e9e984cd03232bfd1d6f Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12762) 
   



[GitHub] [flink] flinkbot edited a comment on pull request #14814: [WIP] (1.11) [FLINK-21132][runtime] Don't end input on stop with savepoint

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #14814:
URL: https://github.com/apache/flink/pull/14814#issuecomment-769949284


   ## CI report:
   
   * cad0310547eafdfd2eee8dddb14813ce940d3abd Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12673) 
   



[GitHub] [flink] pnowojski merged pull request #14814: (1.11) [FLINK-21132][runtime] Don't end input on stop with savepoint

Posted by GitBox <gi...@apache.org>.
pnowojski merged pull request #14814:
URL: https://github.com/apache/flink/pull/14814


   





[GitHub] [flink] flinkbot edited a comment on pull request #14814: (1.11) [FLINK-21132][runtime] Don't end input on stop with savepoint

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #14814:
URL: https://github.com/apache/flink/pull/14814#issuecomment-769949284


   ## CI report:
   
   * d77d30aa8d4918513092e9e984cd03232bfd1d6f Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12762) 
   * 123e752166344997c2c8ce3e3edd32f9d4328987 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12763) 
   



[GitHub] [flink] flinkbot edited a comment on pull request #14814: (1.11) [FLINK-21132][runtime] Don't end input on stop with savepoint

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #14814:
URL: https://github.com/apache/flink/pull/14814#issuecomment-769949284


   ## CI report:
   
   * ffa5eb22c3782576880dde2a07454e64b9a9f752 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12755) 
   * d77d30aa8d4918513092e9e984cd03232bfd1d6f UNKNOWN
   






[GitHub] [flink] rkhachatryan commented on a change in pull request #14814: (1.11) [FLINK-21132][runtime] Don't end input on stop with savepoint

Posted by GitBox <gi...@apache.org>.
rkhachatryan commented on a change in pull request #14814:
URL: https://github.com/apache/flink/pull/14814#discussion_r568848982



##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java
##########
@@ -163,9 +163,18 @@ protected void processInput(MailboxDefaultAction.Controller controller) throws E
                                             .isPresent()) {
                                 mailboxProcessor.reportThrowable(
                                         new CancelTaskException(sourceThreadThrowable));
-                            } else if (!isFinished && sourceThreadThrowable != null) {
+                            } else if (!wasStoppedExternally && sourceThreadThrowable != null) {
                                 mailboxProcessor.reportThrowable(sourceThreadThrowable);
+                            } else if (sourceThreadThrowable != null
+                                    || isCanceled()
+                                    || wasStoppedExternally) {
+                                mailboxProcessor.allActionsCompleted();
                             } else {
+                                // this is a "true" end of input regardless of whether
+                                // stop-with-savepoint was issued or not
+                                synchronized (lock) {

Review comment:
       This can be either the task thread or the source thread.
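
The branch order in the `SourceStreamTask` diff above decides whether the task signals a true end of input. A simplified, Flink-free sketch of that decision follows. It reuses the flag names from the diff (`sourceThreadThrowable`, `isCanceled`, `wasStoppedExternally`) but the `Outcome` enum and `onSourceThreadExit` method are hypothetical. It omits the first branch (the `CancelTaskException` case, whose condition is elided in the diff) and the `synchronized (lock)` block, which, per the comment, may run on either the task thread or the source thread.

```java
// Hypothetical model of the completion branches in the SourceStreamTask diff.
public class SourceCompletionModel {

    enum Outcome { REPORT_FAILURE, COMPLETE_WITHOUT_END_INPUT, END_INPUT_THEN_COMPLETE }

    static Outcome onSourceThreadExit(
            Throwable sourceThreadThrowable, boolean isCanceled, boolean wasStoppedExternally) {
        if (!wasStoppedExternally && sourceThreadThrowable != null) {
            // A genuine source failure: report it to the mailbox.
            return Outcome.REPORT_FAILURE;
        } else if (sourceThreadThrowable != null || isCanceled || wasStoppedExternally) {
            // Cancelled or stopped externally (e.g. stop-with-savepoint):
            // complete all mailbox actions but do NOT signal end of input.
            return Outcome.COMPLETE_WITHOUT_END_INPUT;
        } else {
            // The source ran to completion on its own: a "true" end of input.
            return Outcome.END_INPUT_THEN_COMPLETE;
        }
    }

    public static void main(String[] args) {
        System.out.println(onSourceThreadExit(null, false, false)); // END_INPUT_THEN_COMPLETE
        System.out.println(onSourceThreadExit(null, false, true)); // COMPLETE_WITHOUT_END_INPUT
        System.out.println(onSourceThreadExit(new RuntimeException("boom"), false, false)); // REPORT_FAILURE
    }
}
```

Note that an exception thrown after an external stop falls into the second branch, so it is swallowed rather than reported, and endInput is still skipped.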







[GitHub] [flink] flinkbot edited a comment on pull request #14814: (1.11) [FLINK-21132][runtime] Don't end input on stop with savepoint

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #14814:
URL: https://github.com/apache/flink/pull/14814#issuecomment-769949284


   ## CI report:
   
   * c4c98aab569657323593a9e329ae36c7f7d3c2e1 UNKNOWN
   * b458c15ff328dbead0e2dfecd81dfeb859232e14 Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12811) 
   * 71e3f697f43505aa7a0ec43c6f3a8f95d8067c47 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12873) 
   



[GitHub] [flink] flinkbot edited a comment on pull request #14814: [WIP] (1.11) [FLINK-21132][runtime] Don't end input on stop with savepoint

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #14814:
URL: https://github.com/apache/flink/pull/14814#issuecomment-769949284


   ## CI report:
   
   * cad0310547eafdfd2eee8dddb14813ce940d3abd Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12673) 
   



[GitHub] [flink] rkhachatryan commented on a change in pull request #14814: (1.11) [FLINK-21132][runtime] Don't end input on stop with savepoint

Posted by GitBox <gi...@apache.org>.
rkhachatryan commented on a change in pull request #14814:
URL: https://github.com/apache/flink/pull/14814#discussion_r568846031



##########
File path: flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
##########
@@ -361,6 +374,171 @@ public void testTriggerSavepointWithCheckpointingDisabled() throws Exception {
         }
     }
 
+    static class BoundedPassThroughOperator<T> extends AbstractStreamOperator<T>
+            implements OneInputStreamOperator<T, T>, BoundedOneInput {
+        static volatile CountDownLatch progressLatch;
+        static volatile CountDownLatch snapshotLatch;
+        static volatile boolean inputEnded;
+
+        private transient boolean processed;
+
+        BoundedPassThroughOperator(ChainingStrategy chainingStrategy) {
+            this.chainingStrategy = chainingStrategy;
+        }
+
+        private static void allowSnapshots() {
+            snapshotLatch.countDown();
+        }
+
+        @Override
+        public void endInput() throws Exception {
+            inputEnded = true;
+        }
+
+        @Override
+        public void processElement(StreamRecord<T> element) throws Exception {
+            output.collect(element);
+            if (!processed) {
+                processed = true;
+                progressLatch.countDown();
+            }
+        }
+
+        @Override
+        public void snapshotState(StateSnapshotContext context) throws Exception {
+            snapshotLatch.await();
+            super.snapshotState(context);
+        }
+
+        // --------------------------------------------------------------------
+
+        static CountDownLatch getProgressLatch() {
+            return progressLatch;
+        }
+
+        static void resetForTest(int parallelism, boolean allowSnapshots) {
+            progressLatch = new CountDownLatch(parallelism);
+            snapshotLatch = new CountDownLatch(allowSnapshots ? 0 : parallelism);
+            inputEnded = false;
+        }
+    }
+
+    @Test
+    public void testStopSavepointWithBoundedInputConcurrently() throws Exception {
+        final int numTaskManagers = 2;
+        final int numSlotsPerTaskManager = 2;
+
+        while (true) {
+
+            final MiniClusterResourceFactory clusterFactory =
+                    new MiniClusterResourceFactory(
+                            numTaskManagers,
+                            numSlotsPerTaskManager,
+                            getFileBasedCheckpointsConfig());
+
+            StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
+            env.setParallelism(1);
+
+            // It's only possible to test this with chaining. Without it, JM fails the job before
+            // the downstream gets the abort notification
+            BoundedPassThroughOperator<Integer> operator =
+                    new BoundedPassThroughOperator<>(ChainingStrategy.ALWAYS);
+            InfiniteTestSource source = new InfiniteTestSource();
+            DataStream<Integer> stream =
+                    env.addSource(source)
+                            .transform("pass-through", BasicTypeInfo.INT_TYPE_INFO, operator);
+
+            stream.addSink(new DiscardingSink<>());
+
+            final JobGraph jobGraph = env.getStreamGraph().getJobGraph();
+            final JobID jobId = jobGraph.getJobID();
+
+            MiniClusterWithClientResource cluster = clusterFactory.get();
+            cluster.before();
+            ClusterClient<?> client = cluster.getClusterClient();
+
+            try {
+                BoundedPassThroughOperator.resetForTest(1, false);
+                InfiniteTestSource.resetForTest();
+
+                client.submitJob(jobGraph).get();
+
+                BoundedPassThroughOperator.getProgressLatch().await();
+                InfiniteTestSource.suspendAll(); // prevent deadlock in cancelAllAndAwait
+                CompletableFuture<String> stop = client.stopWithSavepoint(jobId, false, null);
+                // await checkpoint start (not explicit signals to avoid deadlocks)
+                Thread.sleep(500);
+                InfiniteTestSource.cancelAllAndAwait(); // emulate end of input

Review comment:
       Yes, I tried to avoid using `sleep` here, but ended up with either a deadlock (because of chaining) or a test failure (because `BoundedPassThroughOperator#snapshotState` is called too late).
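If no explicit signal can be made reliable in this chained setup, one possible compromise (a sketch, not part of this PR) is to bound the wait instead of sleeping unconditionally. The test continues as soon as a signal arrives and otherwise falls back to the same delay it uses today. `BoundedWaitSketch` and `awaitOrTimeout` are hypothetical names; only `CountDownLatch#await(long, TimeUnit)` is a real JDK API.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical helper, not Flink or PR code.
class BoundedWaitSketch {
    /**
     * Waits until {@code signal} reaches zero, but at most {@code timeoutMillis}.
     * Returns true if the signal arrived in time, false if the timeout elapsed.
     * Unlike Thread.sleep it returns early on the happy path; unlike an unbounded
     * await it cannot hang the test if the signal is never sent.
     */
    static boolean awaitOrTimeout(CountDownLatch signal, long timeoutMillis)
            throws InterruptedException {
        return signal.await(timeoutMillis, TimeUnit.MILLISECONDS);
    }

    public static void main(String[] args) throws InterruptedException {
        CountDownLatch checkpointStarted = new CountDownLatch(1);
        // In a real test, whatever observes the checkpoint start would count this down.
        new Thread(checkpointStarted::countDown).start();
        boolean signalled = awaitOrTimeout(checkpointStarted, 500);
        System.out.println(signalled ? "proceeding after signal" : "proceeding after timeout");
    }
}
```

This does not remove the underlying ordering problem: if the signal fires too late, the test behaves exactly as it does with the fixed sleep.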
   







[GitHub] [flink] flinkbot edited a comment on pull request #14814: (1.11) [FLINK-21132][runtime] Don't end input on stop with savepoint

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #14814:
URL: https://github.com/apache/flink/pull/14814#issuecomment-769949284


   ## CI report:
   
   * c4c98aab569657323593a9e329ae36c7f7d3c2e1 UNKNOWN
   * b458c15ff328dbead0e2dfecd81dfeb859232e14 Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12811) 
   





[GitHub] [flink] flinkbot edited a comment on pull request #14814: (1.11) [FLINK-21132][runtime] Don't end input on stop with savepoint

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #14814:
URL: https://github.com/apache/flink/pull/14814#issuecomment-769949284


   ## CI report:
   
   * 53237d304da31ce8b2bdba5103192007122f23c7 Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12682) 
   * ffa5eb22c3782576880dde2a07454e64b9a9f752 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12755) 
   





[GitHub] [flink] pnowojski commented on a change in pull request #14814: (1.11) [FLINK-21132][runtime] Don't end input on stop with savepoint

Posted by GitBox <gi...@apache.org>.
pnowojski commented on a change in pull request #14814:
URL: https://github.com/apache/flink/pull/14814#discussion_r568753925



##########
File path: flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
##########
@@ -184,13 +189,93 @@
 
     @Rule public final Timeout timeoutPerTest = Timeout.seconds(30);
 
+    @Test
+    public void testSyncSavepointCompleted() throws Exception {
+        testSyncSavepointWithEndInput(StreamTask::notifyCheckpointCompleteAsync, false);
+    }
+
+    @Test
+    public void testSyncSavepointAborted() throws Exception {
+        testSyncSavepointWithEndInput(
+                (task, id) -> task.abortCheckpointOnBarrier(id, new RuntimeException()), true);
+    }

Review comment:
       I think this is a bit fishy, as it encodes an incorrect contract: `abortCheckpointOnBarrier` cannot happen after `triggerCheckpointOnBarrier` in this scenario. Maybe it's better to drop this test?
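The ordering described above can be written down as an invariant. The class below is a hypothetical model, not Flink code: it only reuses the `StreamTask` method names for readability, and assumes that for a single checkpoint ID a task either triggers on the barrier or aborts on it, but never triggers first and aborts afterwards. A test that drives a task through that forbidden sequence would violate this model.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical model of the barrier contract discussed above; not Flink code.
class BarrierContractModel {
    private final Set<Long> triggered = new HashSet<>();

    /** Records that the checkpoint was triggered after its barrier arrived. */
    void triggerCheckpointOnBarrier(long checkpointId) {
        triggered.add(checkpointId);
    }

    /** Rejects an abort for a checkpoint that was already triggered on its barrier. */
    void abortCheckpointOnBarrier(long checkpointId) {
        if (triggered.contains(checkpointId)) {
            throw new IllegalStateException(
                    "abortCheckpointOnBarrier after triggerCheckpointOnBarrier for checkpoint "
                            + checkpointId);
        }
    }
}
```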

##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java
##########
@@ -165,7 +165,14 @@ protected void processInput(MailboxDefaultAction.Controller controller) throws E
                                         new CancelTaskException(sourceThreadThrowable));
                             } else if (!isFinished && sourceThreadThrowable != null) {
                                 mailboxProcessor.reportThrowable(sourceThreadThrowable);
+                            } else if (sourceThreadThrowable != null
+                                    || isCanceled()
+                                    || isFinished) {

Review comment:
       `isFinished` is a very confusing name in this context, where it decides whether or not to issue `endOfInput`.
   
   How about `wasStoppedExternally`?
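The naming matters because the flag answers one question: did the source reach the end of its data by itself? As a hypothetical illustration (the enum and class are not part of `SourceStreamTask`), and assuming the new branch above is the one that skips `endInput`, the decision reduces to a single predicate: only natural exhaustion ends the input, while stop-with-savepoint, cancellation and failure do not.

```java
// Hypothetical reasons a source thread can stop; not a Flink type.
enum SourceExit {
    EXHAUSTED,          // the source ran out of data on its own
    STOPPED_EXTERNALLY, // e.g. stop-with-savepoint (the suggested `wasStoppedExternally` case)
    CANCELED,
    FAILED
}

final class EndInputDecision {
    private EndInputDecision() {}

    /** Only a source that really reached the end of its data should trigger endInput. */
    static boolean shouldEndInput(SourceExit exit) {
        return exit == SourceExit.EXHAUSTED;
    }
}
```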

##########
File path: flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
##########
@@ -361,6 +374,171 @@ public void testTriggerSavepointWithCheckpointingDisabled() throws Exception {
         }
     }
 
+    static class BoundedPassThroughOperator<T> extends AbstractStreamOperator<T>
+            implements OneInputStreamOperator<T, T>, BoundedOneInput {
+        static volatile CountDownLatch progressLatch;
+        static volatile CountDownLatch snapshotLatch;
+        static volatile boolean inputEnded;
+
+        private transient boolean processed;
+
+        BoundedPassThroughOperator(ChainingStrategy chainingStrategy) {
+            this.chainingStrategy = chainingStrategy;
+        }
+
+        private static void allowSnapshots() {
+            snapshotLatch.countDown();
+        }
+
+        @Override
+        public void endInput() throws Exception {
+            inputEnded = true;
+        }
+
+        @Override
+        public void processElement(StreamRecord<T> element) throws Exception {
+            output.collect(element);
+            if (!processed) {
+                processed = true;
+                progressLatch.countDown();
+            }
+        }
+
+        @Override
+        public void snapshotState(StateSnapshotContext context) throws Exception {
+            snapshotLatch.await();
+            super.snapshotState(context);
+        }
+
+        // --------------------------------------------------------------------
+
+        static CountDownLatch getProgressLatch() {
+            return progressLatch;
+        }
+
+        static void resetForTest(int parallelism, boolean allowSnapshots) {
+            progressLatch = new CountDownLatch(parallelism);
+            snapshotLatch = new CountDownLatch(allowSnapshots ? 0 : parallelism);
+            inputEnded = false;
+        }
+    }
+
+    @Test
+    public void testStopSavepointWithBoundedInputConcurrently() throws Exception {
+        final int numTaskManagers = 2;
+        final int numSlotsPerTaskManager = 2;
+
+        while (true) {
+
+            final MiniClusterResourceFactory clusterFactory =
+                    new MiniClusterResourceFactory(
+                            numTaskManagers,
+                            numSlotsPerTaskManager,
+                            getFileBasedCheckpointsConfig());
+
+            StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
+            env.setParallelism(1);
+
+            // It's only possible to test this with chaining. Without it, JM fails the job before
+            // the downstream gets the abort notification
+            BoundedPassThroughOperator<Integer> operator =
+                    new BoundedPassThroughOperator<>(ChainingStrategy.ALWAYS);
+            InfiniteTestSource source = new InfiniteTestSource();
+            DataStream<Integer> stream =
+                    env.addSource(source)
+                            .transform("pass-through", BasicTypeInfo.INT_TYPE_INFO, operator);
+
+            stream.addSink(new DiscardingSink<>());
+
+            final JobGraph jobGraph = env.getStreamGraph().getJobGraph();
+            final JobID jobId = jobGraph.getJobID();
+
+            MiniClusterWithClientResource cluster = clusterFactory.get();
+            cluster.before();
+            ClusterClient<?> client = cluster.getClusterClient();
+
+            try {
+                BoundedPassThroughOperator.resetForTest(1, false);
+                InfiniteTestSource.resetForTest();
+
+                client.submitJob(jobGraph).get();
+
+                BoundedPassThroughOperator.getProgressLatch().await();
+                InfiniteTestSource.suspendAll(); // prevent deadlock in cancelAllAndAwait
+                CompletableFuture<String> stop = client.stopWithSavepoint(jobId, false, null);
+                // await checkpoint start (not explicit signals to avoid deadlocks)
+                Thread.sleep(500);
+                InfiniteTestSource.cancelAllAndAwait(); // emulate end of input

Review comment:
       Are you sure that this is the right way to test it? Why do we need this `sleep(500)`, and why can't we use latches instead? For example, counting down a latch in `BoundedPassThroughOperator#snapshotState`?
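The latch handshake suggested here can be sketched in plain Java, outside Flink: the operator's `snapshotState` counts down a `snapshotStarted` latch and then blocks, and the test thread waits on that latch instead of sleeping before it emulates the end of input. All names are hypothetical and the sketch ignores Flink's threading model; the PR author's reply in this thread notes that in the chained setup this pattern led to a deadlock or fired too late, so it illustrates the idea rather than a drop-in fix.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;

// Hypothetical sketch of a latch handshake replacing Thread.sleep(500); not Flink code.
class SnapshotHandshakeSketch {
    private final CountDownLatch snapshotStarted = new CountDownLatch(1); // operator -> test
    private final CountDownLatch allowSnapshot = new CountDownLatch(1);   // test -> operator
    private final List<String> events = new CopyOnWriteArrayList<>();

    /** Stand-in for BoundedPassThroughOperator#snapshotState, run on the "task" thread. */
    private void snapshotState() throws InterruptedException {
        snapshotStarted.countDown(); // signal that the checkpoint has reached the operator
        allowSnapshot.await();       // hold the snapshot until the test releases it
        events.add("snapshot-completed");
    }

    /** Stand-in for the test body; returns the order in which the two steps happened. */
    List<String> run() throws InterruptedException {
        Thread task = new Thread(() -> {
            try {
                snapshotState();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        task.start();
        snapshotStarted.await();             // replaces Thread.sleep(500)
        events.add("end-of-input-emulated"); // where cancelAllAndAwait() would go
        allowSnapshot.countDown();           // let the snapshot finish
        task.join();
        return events;
    }
}
```

With this shape the ordering no longer depends on timing, but the operator must be able to block in `snapshotState` without preventing the source from being canceled, which is the constraint the chained setup appears to break.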







[GitHub] [flink] flinkbot edited a comment on pull request #14814: (1.11) [FLINK-21132][runtime] Don't end input on stop with savepoint

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #14814:
URL: https://github.com/apache/flink/pull/14814#issuecomment-769949284


   ## CI report:
   
   * 123e752166344997c2c8ce3e3edd32f9d4328987 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12763) 
   * c4c98aab569657323593a9e329ae36c7f7d3c2e1 UNKNOWN
   * b458c15ff328dbead0e2dfecd81dfeb859232e14 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12811) 
   





[GitHub] [flink] flinkbot edited a comment on pull request #14814: [WIP] (1.11) [FLINK-21132][runtime] Don't end input on stop with savepoint

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #14814:
URL: https://github.com/apache/flink/pull/14814#issuecomment-769949284


   ## CI report:
   
   * cad0310547eafdfd2eee8dddb14813ce940d3abd Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12673) 
   * 53237d304da31ce8b2bdba5103192007122f23c7 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12682) 
   





[GitHub] [flink] flinkbot edited a comment on pull request #14814: (1.11) [FLINK-21132][runtime] Don't end input on stop with savepoint

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #14814:
URL: https://github.com/apache/flink/pull/14814#issuecomment-769949284


   ## CI report:
   
   * ffa5eb22c3782576880dde2a07454e64b9a9f752 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12755) 
   





[GitHub] [flink] flinkbot edited a comment on pull request #14814: (1.11) [FLINK-21132][runtime] Don't end input on stop with savepoint

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #14814:
URL: https://github.com/apache/flink/pull/14814#issuecomment-769949284


   ## CI report:
   
   * c4c98aab569657323593a9e329ae36c7f7d3c2e1 UNKNOWN
   * 71e3f697f43505aa7a0ec43c6f3a8f95d8067c47 Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12873) 
   





[GitHub] [flink] flinkbot edited a comment on pull request #14814: [WIP] (1.11) [FLINK-21132][runtime] Don't end input on stop with savepoint

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #14814:
URL: https://github.com/apache/flink/pull/14814#issuecomment-769949284


   ## CI report:
   
   * 53237d304da31ce8b2bdba5103192007122f23c7 Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12682) 
   





[GitHub] [flink] flinkbot edited a comment on pull request #14814: (1.11) [FLINK-21132][runtime] Don't end input on stop with savepoint

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #14814:
URL: https://github.com/apache/flink/pull/14814#issuecomment-769949284


   ## CI report:
   
   * d77d30aa8d4918513092e9e984cd03232bfd1d6f Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12762) 
   





[GitHub] [flink] rkhachatryan commented on a change in pull request #14814: (1.11) [FLINK-21132][runtime] Don't end input on stop with savepoint

Posted by GitBox <gi...@apache.org>.
rkhachatryan commented on a change in pull request #14814:
URL: https://github.com/apache/flink/pull/14814#discussion_r568846031



##########
File path: flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
##########
@@ -361,6 +374,171 @@ public void testTriggerSavepointWithCheckpointingDisabled() throws Exception {
         }
     }
 
+    static class BoundedPassThroughOperator<T> extends AbstractStreamOperator<T>
+            implements OneInputStreamOperator<T, T>, BoundedOneInput {
+        static volatile CountDownLatch progressLatch;
+        static volatile CountDownLatch snapshotLatch;
+        static volatile boolean inputEnded;
+
+        private transient boolean processed;
+
+        BoundedPassThroughOperator(ChainingStrategy chainingStrategy) {
+            this.chainingStrategy = chainingStrategy;
+        }
+
+        private static void allowSnapshots() {
+            snapshotLatch.countDown();
+        }
+
+        @Override
+        public void endInput() throws Exception {
+            inputEnded = true;
+        }
+
+        @Override
+        public void processElement(StreamRecord<T> element) throws Exception {
+            output.collect(element);
+            if (!processed) {
+                processed = true;
+                progressLatch.countDown();
+            }
+        }
+
+        @Override
+        public void snapshotState(StateSnapshotContext context) throws Exception {
+            snapshotLatch.await();
+            super.snapshotState(context);
+        }
+
+        // --------------------------------------------------------------------
+
+        static CountDownLatch getProgressLatch() {
+            return progressLatch;
+        }
+
+        static void resetForTest(int parallelism, boolean allowSnapshots) {
+            progressLatch = new CountDownLatch(parallelism);
+            snapshotLatch = new CountDownLatch(allowSnapshots ? 0 : parallelism);
+            inputEnded = false;
+        }
+    }
+
+    @Test
+    public void testStopSavepointWithBoundedInputConcurrently() throws Exception {
+        final int numTaskManagers = 2;
+        final int numSlotsPerTaskManager = 2;
+
+        while (true) {
+
+            final MiniClusterResourceFactory clusterFactory =
+                    new MiniClusterResourceFactory(
+                            numTaskManagers,
+                            numSlotsPerTaskManager,
+                            getFileBasedCheckpointsConfig());
+
+            StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
+            env.setParallelism(1);
+
+            // It's only possible to test this with chaining. Without it, JM fails the job before
+            // the downstream gets the abort notification
+            BoundedPassThroughOperator<Integer> operator =
+                    new BoundedPassThroughOperator<>(ChainingStrategy.ALWAYS);
+            InfiniteTestSource source = new InfiniteTestSource();
+            DataStream<Integer> stream =
+                    env.addSource(source)
+                            .transform("pass-through", BasicTypeInfo.INT_TYPE_INFO, operator);
+
+            stream.addSink(new DiscardingSink<>());
+
+            final JobGraph jobGraph = env.getStreamGraph().getJobGraph();
+            final JobID jobId = jobGraph.getJobID();
+
+            MiniClusterWithClientResource cluster = clusterFactory.get();
+            cluster.before();
+            ClusterClient<?> client = cluster.getClusterClient();
+
+            try {
+                BoundedPassThroughOperator.resetForTest(1, false);
+                InfiniteTestSource.resetForTest();
+
+                client.submitJob(jobGraph).get();
+
+                BoundedPassThroughOperator.getProgressLatch().await();
+                InfiniteTestSource.suspendAll(); // prevent deadlock in cancelAllAndAwait
+                CompletableFuture<String> stop = client.stopWithSavepoint(jobId, false, null);
+                // await checkpoint start (not explicit signals to avoid deadlocks)
+                Thread.sleep(500);
+                InfiniteTestSource.cancelAllAndAwait(); // emulate end of input

Review comment:
       Yes, I tried to avoid using `sleep` here, but ended up with either a deadlock (because of chaining) or a test failure (because `BoundedPassThroughOperator#snapshotState` is called too late).
   

##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java
##########
@@ -163,9 +163,18 @@ protected void processInput(MailboxDefaultAction.Controller controller) throws E
                                             .isPresent()) {
                                 mailboxProcessor.reportThrowable(
                                         new CancelTaskException(sourceThreadThrowable));
-                            } else if (!isFinished && sourceThreadThrowable != null) {
+                            } else if (!wasStoppedExternally && sourceThreadThrowable != null) {
                                 mailboxProcessor.reportThrowable(sourceThreadThrowable);
+                            } else if (sourceThreadThrowable != null
+                                    || isCanceled()
+                                    || wasStoppedExternally) {
+                                mailboxProcessor.allActionsCompleted();
                             } else {
+                                // this is a "true" end of input regardless of whether
+                                // stop-with-savepoint was issued or not
+                                synchronized (lock) {

Review comment:
       This can be either the task thread or the source thread.
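Because this code path may run on either thread, both threads have to synchronize on the same lock. Below is a minimal, hypothetical Java sketch of that pattern. `EndOfInputGuard`, `onSourceFinished` and the call counter are illustrative names, not Flink API. The "run at most once" flag is an assumption added for the example, and the counter only stands in for whatever the real `synchronized (lock)` block does.

```java
import java.util.concurrent.atomic.AtomicInteger;

/**
 * Hypothetical sketch (not Flink code): a completion callback that may be
 * invoked from either the task thread or the source thread, so the
 * end-of-input work is guarded by one shared lock.
 */
class EndOfInputGuard {
    private final Object lock = new Object();
    private boolean inputEnded; // guarded by lock
    private final AtomicInteger endInputCalls = new AtomicInteger();

    /** Safe to call from any thread; the guarded work runs at most once. */
    void onSourceFinished() {
        synchronized (lock) {
            if (!inputEnded) {
                inputEnded = true;
                // stands in for the real end-of-input handling
                endInputCalls.incrementAndGet();
            }
        }
    }

    /** Calls the callback from two threads and returns how often the work ran. */
    static int runFromBothThreads() {
        EndOfInputGuard guard = new EndOfInputGuard();
        Thread taskThread = new Thread(guard::onSourceFinished, "task-thread");
        Thread sourceThread = new Thread(guard::onSourceFinished, "source-thread");
        taskThread.start();
        sourceThread.start();
        try {
            taskThread.join();
            sourceThread.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return guard.endInputCalls.get();
    }

    public static void main(String[] args) {
        System.out.println(runFromBothThreads()); // prints 1
    }
}
```

Whichever thread wins the race, the guarded work runs exactly once, because the flag is only read and written while holding the lock.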







[GitHub] [flink] flinkbot commented on pull request #14814: [WIP] (1.11) [FLINK-21132][runtime] Don't end input on stop with savepoint

Posted by GitBox <gi...@apache.org>.
flinkbot commented on pull request #14814:
URL: https://github.com/apache/flink/pull/14814#issuecomment-769949284


   ## CI report:
   
   * cad0310547eafdfd2eee8dddb14813ce940d3abd UNKNOWN
   





[GitHub] [flink] flinkbot edited a comment on pull request #14814: [WIP] (1.11) [FLINK-21132][runtime] Don't end input on stop with savepoint

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #14814:
URL: https://github.com/apache/flink/pull/14814#issuecomment-769949284


   ## CI report:
   
   * cad0310547eafdfd2eee8dddb14813ce940d3abd Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12673) 
   * 53237d304da31ce8b2bdba5103192007122f23c7 UNKNOWN
   





[GitHub] [flink] flinkbot edited a comment on pull request #14814: (1.11) [FLINK-21132][runtime] Don't end input on stop with savepoint

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #14814:
URL: https://github.com/apache/flink/pull/14814#issuecomment-769949284


   ## CI report:
   
   * 123e752166344997c2c8ce3e3edd32f9d4328987 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12763) 
   * c4c98aab569657323593a9e329ae36c7f7d3c2e1 UNKNOWN
   * b458c15ff328dbead0e2dfecd81dfeb859232e14 UNKNOWN
   





[GitHub] [flink] pnowojski commented on a change in pull request #14814: (1.11) [FLINK-21132][runtime] Don't end input on stop with savepoint

Posted by GitBox <gi...@apache.org>.
pnowojski commented on a change in pull request #14814:
URL: https://github.com/apache/flink/pull/14814#discussion_r568753925



##########
File path: flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
##########
@@ -184,13 +189,93 @@
 
     @Rule public final Timeout timeoutPerTest = Timeout.seconds(30);
 
+    @Test
+    public void testSyncSavepointCompleted() throws Exception {
+        testSyncSavepointWithEndInput(StreamTask::notifyCheckpointCompleteAsync, false);
+    }
+
+    @Test
+    public void testSyncSavepointAborted() throws Exception {
+        testSyncSavepointWithEndInput(
+                (task, id) -> task.abortCheckpointOnBarrier(id, new RuntimeException()), true);
+    }

Review comment:
       I think this is a bit fishy, as it encodes an incorrect contract: `abortCheckpointOnBarrier` cannot happen after `triggerCheckpointOnBarrier` in this scenario. Maybe it's better to drop this test?

##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java
##########
@@ -165,7 +165,14 @@ protected void processInput(MailboxDefaultAction.Controller controller) throws E
                                         new CancelTaskException(sourceThreadThrowable));
                             } else if (!isFinished && sourceThreadThrowable != null) {
                                 mailboxProcessor.reportThrowable(sourceThreadThrowable);
+                            } else if (sourceThreadThrowable != null
+                                    || isCanceled()
+                                    || isFinished) {

Review comment:
       `isFinished` is a very confusing name in this context, where it decides whether to issue `endOfInput` or not.
   
   How about `wasStoppedExternally`?
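To make the naming question concrete, here is a hypothetical Java model of the three visible branches in the updated `SourceStreamTask` diff quoted earlier in this thread, which already uses the `wasStoppedExternally` name. The model starts after that diff's first branch, which is truncated in the quote. `SourceCompletionDecision`, `Action` and `decide` are illustrative names, not Flink code. The model only shows which flag combinations lead to reporting a failure, to completing without an end of input, or to a real end of input.

```java
/**
 * Hypothetical model (not Flink code) of the branch order in the quoted
 * SourceStreamTask diff, starting after its first, truncated branch.
 */
class SourceCompletionDecision {
    enum Action { REPORT_FAILURE, COMPLETE_WITHOUT_END_INPUT, END_INPUT }

    static Action decide(
            Throwable sourceThreadThrowable, boolean canceled, boolean wasStoppedExternally) {
        if (!wasStoppedExternally && sourceThreadThrowable != null) {
            // the source failed on its own: propagate the error
            return Action.REPORT_FAILURE;
        } else if (sourceThreadThrowable != null || canceled || wasStoppedExternally) {
            // stopped externally or canceled: finish without signalling end of input
            return Action.COMPLETE_WITHOUT_END_INPUT;
        } else {
            // the source returned normally: a "true" end of input
            return Action.END_INPUT;
        }
    }

    public static void main(String[] args) {
        System.out.println(decide(null, false, false)); // END_INPUT
        System.out.println(decide(null, false, true)); // COMPLETE_WITHOUT_END_INPUT
        System.out.println(decide(new RuntimeException(), false, false)); // REPORT_FAILURE
        System.out.println(decide(new RuntimeException(), false, true)); // COMPLETE_WITHOUT_END_INPUT
    }
}
```

Read this way, the flag answers "did someone outside the source ask it to stop?", which the name `wasStoppedExternally` states directly and `isFinished` does not.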

##########
File path: flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
##########
@@ -361,6 +374,171 @@ public void testTriggerSavepointWithCheckpointingDisabled() throws Exception {
         }
     }
 
+    static class BoundedPassThroughOperator<T> extends AbstractStreamOperator<T>
+            implements OneInputStreamOperator<T, T>, BoundedOneInput {
+        static volatile CountDownLatch progressLatch;
+        static volatile CountDownLatch snapshotLatch;
+        static volatile boolean inputEnded;
+
+        private transient boolean processed;
+
+        BoundedPassThroughOperator(ChainingStrategy chainingStrategy) {
+            this.chainingStrategy = chainingStrategy;
+        }
+
+        private static void allowSnapshots() {
+            snapshotLatch.countDown();
+        }
+
+        @Override
+        public void endInput() throws Exception {
+            inputEnded = true;
+        }
+
+        @Override
+        public void processElement(StreamRecord<T> element) throws Exception {
+            output.collect(element);
+            if (!processed) {
+                processed = true;
+                progressLatch.countDown();
+            }
+        }
+
+        @Override
+        public void snapshotState(StateSnapshotContext context) throws Exception {
+            snapshotLatch.await();
+            super.snapshotState(context);
+        }
+
+        // --------------------------------------------------------------------
+
+        static CountDownLatch getProgressLatch() {
+            return progressLatch;
+        }
+
+        static void resetForTest(int parallelism, boolean allowSnapshots) {
+            progressLatch = new CountDownLatch(parallelism);
+            snapshotLatch = new CountDownLatch(allowSnapshots ? 0 : parallelism);
+            inputEnded = false;
+        }
+    }
+
+    @Test
+    public void testStopSavepointWithBoundedInputConcurrently() throws Exception {
+        final int numTaskManagers = 2;
+        final int numSlotsPerTaskManager = 2;
+
+        while (true) {
+
+            final MiniClusterResourceFactory clusterFactory =
+                    new MiniClusterResourceFactory(
+                            numTaskManagers,
+                            numSlotsPerTaskManager,
+                            getFileBasedCheckpointsConfig());
+
+            StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
+            env.setParallelism(1);
+
+            // It's only possible to test this with chaining. Without it, JM fails the job before
+            // the downstream gets the abort notification
+            BoundedPassThroughOperator<Integer> operator =
+                    new BoundedPassThroughOperator<>(ChainingStrategy.ALWAYS);
+            InfiniteTestSource source = new InfiniteTestSource();
+            DataStream<Integer> stream =
+                    env.addSource(source)
+                            .transform("pass-through", BasicTypeInfo.INT_TYPE_INFO, operator);
+
+            stream.addSink(new DiscardingSink<>());
+
+            final JobGraph jobGraph = env.getStreamGraph().getJobGraph();
+            final JobID jobId = jobGraph.getJobID();
+
+            MiniClusterWithClientResource cluster = clusterFactory.get();
+            cluster.before();
+            ClusterClient<?> client = cluster.getClusterClient();
+
+            try {
+                BoundedPassThroughOperator.resetForTest(1, false);
+                InfiniteTestSource.resetForTest();
+
+                client.submitJob(jobGraph).get();
+
+                BoundedPassThroughOperator.getProgressLatch().await();
+                InfiniteTestSource.suspendAll(); // prevent deadlock in cancelAllAndAwait
+                CompletableFuture<String> stop = client.stopWithSavepoint(jobId, false, null);
+                // await checkpoint start (not explicit signals to avoid deadlocks)
+                Thread.sleep(500);
+                InfiniteTestSource.cancelAllAndAwait(); // emulate end of input

Review comment:
       Are you sure that this is the right way to test it? Why do we need this `sleep(500)`, and why can't we use latches instead? For example, counting down a latch in `BoundedPassThroughOperator#snapshotState`?
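The latch-based handshake suggested here can be sketched in plain Java without any Flink dependency. The snapshot code signals that it has started and then waits for permission to continue. The test thread waits on that signal instead of calling `Thread.sleep(500)`, performs its action, and releases the snapshot. `SnapshotHandshake` and its latches are hypothetical names, and the sketch only shows the generic pattern. In the real chained job, the emulated step (`InfiniteTestSource.cancelAllAndAwait()`) may need the very thread that is blocked in `snapshotState`. That would be consistent with the deadlock the author reports in this thread, and the sketch does not resolve it.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

/** Hypothetical sketch: replace a fixed sleep with an explicit latch handshake. */
class SnapshotHandshake {
    static List<String> run() {
        CountDownLatch snapshotStarted = new CountDownLatch(1); // counted down by the "operator"
        CountDownLatch snapshotAllowed = new CountDownLatch(1); // counted down by the test
        List<String> events = new CopyOnWriteArrayList<>();

        // Stands in for snapshotState() running on the task thread.
        Thread taskThread = new Thread(() -> {
            events.add("snapshot-started");
            snapshotStarted.countDown();
            try {
                snapshotAllowed.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
            events.add("snapshot-finished");
        }, "task-thread");
        taskThread.setDaemon(true); // do not keep the JVM alive if the test bails out
        taskThread.start();

        try {
            // Wait for an explicit signal instead of Thread.sleep(500).
            if (!snapshotStarted.await(10, TimeUnit.SECONDS)) {
                throw new IllegalStateException("snapshot did not start in time");
            }
            events.add("end-of-input-emulated"); // where the test's own action would go
            snapshotAllowed.countDown();
            taskThread.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return events;
    }

    public static void main(String[] args) {
        // prints [snapshot-started, end-of-input-emulated, snapshot-finished]
        System.out.println(run());
    }
}
```

The event order is fixed by the latches rather than by timing, which is the property a fixed `sleep` cannot guarantee.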







[GitHub] [flink] flinkbot edited a comment on pull request #14814: (1.11) [FLINK-21132][runtime] Don't end input on stop with savepoint

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #14814:
URL: https://github.com/apache/flink/pull/14814#issuecomment-769949284


   ## CI report:
   
   * d77d30aa8d4918513092e9e984cd03232bfd1d6f Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12762) 
   * 123e752166344997c2c8ce3e3edd32f9d4328987 UNKNOWN
   





[GitHub] [flink] flinkbot edited a comment on pull request #14814: (1.11) [FLINK-21132][runtime] Don't end input on stop with savepoint

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #14814:
URL: https://github.com/apache/flink/pull/14814#issuecomment-769949284


   ## CI report:
   
   * 53237d304da31ce8b2bdba5103192007122f23c7 Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12682) 
   * ffa5eb22c3782576880dde2a07454e64b9a9f752 UNKNOWN
   





[GitHub] [flink] flinkbot commented on pull request #14814: [WIP] (1.11) [FLINK-21132][runtime] Don't end input on stop with savepoint

Posted by GitBox <gi...@apache.org>.
flinkbot commented on pull request #14814:
URL: https://github.com/apache/flink/pull/14814#issuecomment-769941674


   Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community
   to review your pull request. We will use this comment to track the progress of the review.
   
   
   ## Automated Checks
   Last check on commit cad0310547eafdfd2eee8dddb14813ce940d3abd (Fri Jan 29 17:29:00 UTC 2021)
   
   **Warnings:**
    * No documentation files were touched! Remember to keep the Flink docs up to date!
   
   
   <sub>Mention the bot in a comment to re-run the automated checks.</sub>
   ## Review Progress
   
   * ❓ 1. The [description] looks good.
   * ❓ 2. There is [consensus] that the contribution should go into Flink.
   * ❓ 3. Needs [attention] from.
   * ❓ 4. The change fits into the overall [architecture].
   * ❓ 5. Overall code [quality] is good.
   
   Please see the [Pull Request Review Guide](https://flink.apache.org/contributing/reviewing-prs.html) for a full explanation of the review process. The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer or PMC member is required.<details>
    <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot approve description` to approve one or more aspects (aspects: `description`, `consensus`, `architecture` and `quality`)
    - `@flinkbot approve all` to approve all aspects
    - `@flinkbot approve-until architecture` to approve everything until `architecture`
    - `@flinkbot attention @username1 [@username2 ..]` to require somebody's attention
    - `@flinkbot disapprove architecture` to remove an approval you gave earlier
   </details>

