Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2021/10/09 09:19:29 UTC

[GitHub] [flink] zoltar9264 opened a new pull request #17443: [FLINK-9465][Runtime/Checkpointing] Specify a separate savepoint timeout option via CLI and REST API

zoltar9264 opened a new pull request #17443:
URL: https://github.com/apache/flink/pull/17443


   ## What is the purpose of the change
   
   This pull request adds support for specifying a separate savepoint timeout via the REST API and CLI, as described in [FLINK-9465](https://issues.apache.org/jira/browse/FLINK-9465).
   
   
   ## Brief change log
   
     - *CheckpointCoordinator.CheckpointTriggerRequest* adds a `savepointTimeout` field.
     - *CheckpointCoordinator#createPendingCheckpoint()* adds a `timeout` parameter, which is used as the canceller trigger delay if it is > 0.
     - *CheckpointCoordinator, RestfulGateway, JobMasterGateway, SchedulerNG, ClusterClient* add `triggerSavepoint`/`stopWithSavepoint` methods with a `savepointTimeout` parameter.
     - *CliFrontend#savepoint(), CliFrontend#stop(), SavepointTriggerHandler, StopWithSavepointHandler* migrate to the methods that take `savepointTimeout`.
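The fallback rule from the change log (a positive savepoint timeout replaces the checkpoint timeout as the canceller delay; anything else falls back to the checkpoint timeout) can be sketched as a small helper. `SavepointTimeouts` and `resolveCancellerDelayMs` are hypothetical names for illustration, not part of Flink, and the sketch assumes both values are in milliseconds.

```java
/** Hypothetical helper illustrating the savepoint-timeout fallback rule; not a Flink class. */
final class SavepointTimeouts {

    private SavepointTimeouts() {}

    /**
     * Returns the delay, in milliseconds, after which a pending savepoint would be cancelled.
     *
     * @param savepointTimeoutMs timeout requested for this savepoint; values <= 0 mean "not set"
     * @param checkpointTimeoutMs the job's configured checkpoint timeout
     */
    static long resolveCancellerDelayMs(long savepointTimeoutMs, long checkpointTimeoutMs) {
        // Only a strictly positive savepoint timeout overrides the checkpoint timeout.
        return savepointTimeoutMs > 0 ? savepointTimeoutMs : checkpointTimeoutMs;
    }
}
```

For example, `resolveCancellerDelayMs(0, 600_000)` yields 600000 (fallback), while `resolveCancellerDelayMs(120_000, 600_000)` yields 120000.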
   
   ## Verifying this change
   
   *(Please pick either of the following options)*
   
   This change is a trivial rework / code cleanup without any test coverage.
   
   *(or)*
   
   This change is already covered by existing tests, such as *(please describe tests)*.
   
   *(or)*
   
   This change added tests and can be verified as follows:
   
   *(example:)*
     - *Added integration tests for end-to-end deployment with large payloads (100MB)*
     - *Extended integration test for recovery after master (JobManager) failure*
     - *Added test that validates that TaskInfo is transferred only once across recoveries*
     - *Manually verified the change by running a 4 node cluster with 2 JobManagers and 4 TaskManagers, a stateful streaming program, and killing one JobManager and two TaskManagers during the execution, verifying that recovery happens correctly.*
   
   ## Does this pull request potentially affect one of the following parts:
   
     - Dependencies (does it add or upgrade a dependency): no
     - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: yes
     - The serializers: no
     - The runtime per-record code paths (performance sensitive): no
     - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: yes
     - The S3 file system connector: no
   
   ## Documentation
   
     - Does this pull request introduce a new feature? no
     - If yes, how is the feature documented? (not applicable / docs / JavaDocs / not documented)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] flinkbot edited a comment on pull request #17443: [FLINK-9465][Runtime/Checkpointing] Specify a separate savepoint timeout option via CLI and REST API

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #17443:
URL: https://github.com/apache/flink/pull/17443#issuecomment-939267795


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "79df4a732b55a8e7a0179b337776e36f7df5c83e",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=24906",
       "triggerID" : "79df4a732b55a8e7a0179b337776e36f7df5c83e",
       "triggerType" : "PUSH"
     }, {
       "hash" : "e63def36ef5f877d6d3347ebfe0927e34c58087c",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=24917",
       "triggerID" : "e63def36ef5f877d6d3347ebfe0927e34c58087c",
       "triggerType" : "PUSH"
     }, {
       "hash" : "178f04c20bd675903e22638afe425fe9e38f426c",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=24941",
       "triggerID" : "178f04c20bd675903e22638afe425fe9e38f426c",
       "triggerType" : "PUSH"
     }, {
       "hash" : "d7825ee175e5228e6cdbb4f50f2a3b9ab5383d09",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "d7825ee175e5228e6cdbb4f50f2a3b9ab5383d09",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * e63def36ef5f877d6d3347ebfe0927e34c58087c Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=24917) 
   * 178f04c20bd675903e22638afe425fe9e38f426c Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=24941) 
   * d7825ee175e5228e6cdbb4f50f2a3b9ab5383d09 UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run azure` re-run the last Azure build
   </details>





[GitHub] [flink] flinkbot edited a comment on pull request #17443: [FLINK-9465][Runtime/Checkpointing] Specify a separate savepoint timeout option via CLI and REST API

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #17443:
URL: https://github.com/apache/flink/pull/17443#issuecomment-939267795


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "79df4a732b55a8e7a0179b337776e36f7df5c83e",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=24906",
       "triggerID" : "79df4a732b55a8e7a0179b337776e36f7df5c83e",
       "triggerType" : "PUSH"
     }, {
       "hash" : "e63def36ef5f877d6d3347ebfe0927e34c58087c",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=24917",
       "triggerID" : "e63def36ef5f877d6d3347ebfe0927e34c58087c",
       "triggerType" : "PUSH"
     }, {
       "hash" : "178f04c20bd675903e22638afe425fe9e38f426c",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=24941",
       "triggerID" : "178f04c20bd675903e22638afe425fe9e38f426c",
       "triggerType" : "PUSH"
     }, {
       "hash" : "d7825ee175e5228e6cdbb4f50f2a3b9ab5383d09",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=24948",
       "triggerID" : "d7825ee175e5228e6cdbb4f50f2a3b9ab5383d09",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * e63def36ef5f877d6d3347ebfe0927e34c58087c Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=24917) 
   * 178f04c20bd675903e22638afe425fe9e38f426c Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=24941) 
   * d7825ee175e5228e6cdbb4f50f2a3b9ab5383d09 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=24948) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run azure` re-run the last Azure build
   </details>





[GitHub] [flink] flinkbot commented on pull request #17443: [FLINK-9465][Runtime/Checkpointing] Specify a separate savepoint timeout option via CLI and REST API

Posted by GitBox <gi...@apache.org>.
flinkbot commented on pull request #17443:
URL: https://github.com/apache/flink/pull/17443#issuecomment-939264095


   Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community
   to review your pull request. We will use this comment to track the progress of the review.
   
   
   ## Automated Checks
   Last check on commit 79df4a732b55a8e7a0179b337776e36f7df5c83e (Sat Oct 09 09:23:35 UTC 2021)
   
   **Warnings:**
    * No documentation files were touched! Remember to keep the Flink docs up to date!
   
   
   <sub>Mention the bot in a comment to re-run the automated checks.</sub>
   ## Review Progress
   
   * ❓ 1. The [description] looks good.
   * ❓ 2. There is [consensus] that the contribution should go into Flink.
   * ❓ 3. Needs [attention] from.
   * ❓ 4. The change fits into the overall [architecture].
   * ❓ 5. Overall code [quality] is good.
   
   Please see the [Pull Request Review Guide](https://flink.apache.org/contributing/reviewing-prs.html) for a full explanation of the review process.
   
   The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer or PMC member is required.
   
   <details>
    <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot approve description` to approve one or more aspects (aspects: `description`, `consensus`, `architecture` and `quality`)
    - `@flinkbot approve all` to approve all aspects
    - `@flinkbot approve-until architecture` to approve everything until `architecture`
    - `@flinkbot attention @username1 [@username2 ..]` to require somebody's attention
    - `@flinkbot disapprove architecture` to remove an approval you gave earlier
   </details>





[GitHub] [flink] flinkbot edited a comment on pull request #17443: [FLINK-9465][Runtime/Checkpointing] Specify a separate savepoint timeout option via CLI and REST API

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #17443:
URL: https://github.com/apache/flink/pull/17443#issuecomment-939267795


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "79df4a732b55a8e7a0179b337776e36f7df5c83e",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=24906",
       "triggerID" : "79df4a732b55a8e7a0179b337776e36f7df5c83e",
       "triggerType" : "PUSH"
     }, {
       "hash" : "e63def36ef5f877d6d3347ebfe0927e34c58087c",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "e63def36ef5f877d6d3347ebfe0927e34c58087c",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 79df4a732b55a8e7a0179b337776e36f7df5c83e Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=24906) 
   * e63def36ef5f877d6d3347ebfe0927e34c58087c UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run azure` re-run the last Azure build
   </details>





[GitHub] [flink] flinkbot edited a comment on pull request #17443: [FLINK-9465][Runtime/Checkpointing] Specify a separate savepoint timeout option via CLI and REST API

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #17443:
URL: https://github.com/apache/flink/pull/17443#issuecomment-939267795


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "79df4a732b55a8e7a0179b337776e36f7df5c83e",
       "status" : "CANCELED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=24906",
       "triggerID" : "79df4a732b55a8e7a0179b337776e36f7df5c83e",
       "triggerType" : "PUSH"
     }, {
       "hash" : "e63def36ef5f877d6d3347ebfe0927e34c58087c",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=24917",
       "triggerID" : "e63def36ef5f877d6d3347ebfe0927e34c58087c",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 79df4a732b55a8e7a0179b337776e36f7df5c83e Azure: [CANCELED](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=24906) 
   * e63def36ef5f877d6d3347ebfe0927e34c58087c Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=24917) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run azure` re-run the last Azure build
   </details>





[GitHub] [flink] flinkbot edited a comment on pull request #17443: [FLINK-9465][Runtime/Checkpointing] Specify a separate savepoint timeout option via CLI and REST API

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #17443:
URL: https://github.com/apache/flink/pull/17443#issuecomment-939267795


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "79df4a732b55a8e7a0179b337776e36f7df5c83e",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=24906",
       "triggerID" : "79df4a732b55a8e7a0179b337776e36f7df5c83e",
       "triggerType" : "PUSH"
     }, {
       "hash" : "e63def36ef5f877d6d3347ebfe0927e34c58087c",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=24917",
       "triggerID" : "e63def36ef5f877d6d3347ebfe0927e34c58087c",
       "triggerType" : "PUSH"
     }, {
       "hash" : "178f04c20bd675903e22638afe425fe9e38f426c",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=24941",
       "triggerID" : "178f04c20bd675903e22638afe425fe9e38f426c",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * e63def36ef5f877d6d3347ebfe0927e34c58087c Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=24917) 
   * 178f04c20bd675903e22638afe425fe9e38f426c Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=24941) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run azure` re-run the last Azure build
   </details>





[GitHub] [flink] flinkbot edited a comment on pull request #17443: [FLINK-9465][Runtime/Checkpointing] Specify a separate savepoint timeout option via CLI and REST API

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #17443:
URL: https://github.com/apache/flink/pull/17443#issuecomment-939267795


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "79df4a732b55a8e7a0179b337776e36f7df5c83e",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=24906",
       "triggerID" : "79df4a732b55a8e7a0179b337776e36f7df5c83e",
       "triggerType" : "PUSH"
     }, {
       "hash" : "e63def36ef5f877d6d3347ebfe0927e34c58087c",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=24917",
       "triggerID" : "e63def36ef5f877d6d3347ebfe0927e34c58087c",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * e63def36ef5f877d6d3347ebfe0927e34c58087c Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=24917) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run azure` re-run the last Azure build
   </details>





[GitHub] [flink] flinkbot edited a comment on pull request #17443: [FLINK-9465][Runtime/Checkpointing] Specify a separate savepoint timeout option via CLI and REST API

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #17443:
URL: https://github.com/apache/flink/pull/17443#issuecomment-939267795


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "79df4a732b55a8e7a0179b337776e36f7df5c83e",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=24906",
       "triggerID" : "79df4a732b55a8e7a0179b337776e36f7df5c83e",
       "triggerType" : "PUSH"
     }, {
       "hash" : "e63def36ef5f877d6d3347ebfe0927e34c58087c",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=24917",
       "triggerID" : "e63def36ef5f877d6d3347ebfe0927e34c58087c",
       "triggerType" : "PUSH"
     }, {
       "hash" : "178f04c20bd675903e22638afe425fe9e38f426c",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=24941",
       "triggerID" : "178f04c20bd675903e22638afe425fe9e38f426c",
       "triggerType" : "PUSH"
     }, {
       "hash" : "d7825ee175e5228e6cdbb4f50f2a3b9ab5383d09",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=24948",
       "triggerID" : "d7825ee175e5228e6cdbb4f50f2a3b9ab5383d09",
       "triggerType" : "PUSH"
     }, {
       "hash" : "b26de7ec3bc0ce4d9660eb58891e98873d31d9f6",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=25103",
       "triggerID" : "b26de7ec3bc0ce4d9660eb58891e98873d31d9f6",
       "triggerType" : "PUSH"
     }, {
       "hash" : "b26de7ec3bc0ce4d9660eb58891e98873d31d9f6",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=25103",
       "triggerID" : "944241791",
       "triggerType" : "MANUAL"
     }, {
       "hash" : "a7caba5674c2cda3f6ef12a801e28b1b044a1619",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=25118",
       "triggerID" : "a7caba5674c2cda3f6ef12a801e28b1b044a1619",
       "triggerType" : "PUSH"
     }, {
       "hash" : "a7caba5674c2cda3f6ef12a801e28b1b044a1619",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=25118",
       "triggerID" : "944897942",
       "triggerType" : "MANUAL"
     }, {
       "hash" : "e3f3254caaf76aff6934e0b0bd21502ea8830f5a",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=25119",
       "triggerID" : "e3f3254caaf76aff6934e0b0bd21502ea8830f5a",
       "triggerType" : "PUSH"
     }, {
       "hash" : "06a5f4448434e12912dbd35ac60332ea510e16ff",
       "status" : "SUCCESS",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=25126",
       "triggerID" : "06a5f4448434e12912dbd35ac60332ea510e16ff",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 06a5f4448434e12912dbd35ac60332ea510e16ff Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=25126) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run azure` re-run the last Azure build
   </details>





[GitHub] [flink] tillrohrmann commented on a change in pull request #17443: [FLINK-9465][Runtime/Checkpointing] Specify a separate savepoint timeout option via CLI and REST API

Posted by GitBox <gi...@apache.org>.
tillrohrmann commented on a change in pull request #17443:
URL: https://github.com/apache/flink/pull/17443#discussion_r741964620



##########
File path: docs/content.zh/docs/deployment/cli.md
##########
@@ -121,6 +122,8 @@ You can resume your program from this savepoint with the run command.
 The savepoint folder is optional and needs to be specified if 
 [state.savepoints.dir]({{< ref "docs/deployment/config" >}}#state-savepoints-dir) isn't set.
 
+The savepoint timeout is optional and checkpoint timeout will take effect if isn't set.

Review comment:
       ```suggestion
   The savepoint timeout `--savepointTimeout` is optional and the checkpoint timeout will take effect if it isn't set.
   ```

##########
File path: flink-clients/src/main/java/org/apache/flink/client/cli/SavepointOptions.java
##########
@@ -30,13 +31,18 @@
     private boolean dispose;
     private String disposeSavepointPath;
     private String jarFile;
+    private long savepointTimeout;

Review comment:
       Since we are not storing the unit explicitly:
   
   ```suggestion
       private long savepointTimeoutMs;
   ```

##########
File path: docs/content.zh/docs/deployment/cli.md
##########
@@ -172,6 +176,8 @@ Savepoint completed. Path: file:/tmp/flink-savepoints/savepoint-cca7bc-bb1e257f0
 We have to use `--savepointPath` to specify the savepoint folder if 
 [state.savepoints.dir]({{< ref "docs/deployment/config" >}}#state-savepoints-dir) isn't set.
 
+The savepoint timeout is optional and checkpoint timeout will take effect if isn't set.

Review comment:
       ```suggestion
   The savepoint timeout `--savepointTimeout` is optional and the checkpoint timeout will take effect if it isn't set.
   ```

##########
File path: flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontendParser.java
##########
@@ -132,6 +132,9 @@
     static final Option SAVEPOINT_DISPOSE_OPTION =
             new Option("d", "dispose", true, "Path of savepoint to dispose.");
 
+    static final Option SAVEPOINT_TIMEOUT_OPTION =
+            new Option("st", "savepointTimeout", true, "timeout of savepoint.");

Review comment:
       ```suggestion
               new Option("st", "savepointTimeout", true, "The maximum completion time a savepoint is allowed to take before it is failed.");
   ```
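Since the option is declared with `hasArg = true`, the CLI receives its value as a string. A minimal parsing sketch, assuming the value is given in milliseconds (consistent with the `savepointTimeoutMs` naming suggested in this review) and that an absent value means "not set". `SavepointTimeoutParsing` is hypothetical, and rejecting negative values is a choice made for this sketch, not something the PR specifies.

```java
/** Hypothetical parser for the raw --savepointTimeout value; not Flink's CliFrontend code. */
final class SavepointTimeoutParsing {

    private SavepointTimeoutParsing() {}

    /** Returns the timeout in milliseconds, or 0 ("not set") when no value was given. */
    static long parseSavepointTimeoutMs(String rawValue) {
        if (rawValue == null || rawValue.trim().isEmpty()) {
            return 0L; // not set: the checkpoint timeout applies
        }
        final long value;
        try {
            value = Long.parseLong(rawValue.trim());
        } catch (NumberFormatException e) {
            throw new IllegalArgumentException("Invalid savepoint timeout: " + rawValue, e);
        }
        if (value < 0) {
            // This sketch's own choice: fail fast instead of silently falling back.
            throw new IllegalArgumentException("Savepoint timeout must not be negative: " + value);
        }
        return value;
    }
}
```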

##########
File path: docs/content/docs/deployment/cli.md
##########
@@ -170,6 +174,8 @@ Savepoint completed. Path: file:/tmp/flink-savepoints/savepoint-cca7bc-bb1e257f0
 We have to use `--savepointPath` to specify the savepoint folder if 
 [state.savepoints.dir]({{< ref "docs/deployment/config" >}}#state-savepoints-dir) isn't set.
 
+The savepoint timeout is optional and checkpoint timeout will take effect if isn't set.

Review comment:
       ```suggestion
   The savepoint timeout `--savepointTimeout` is optional and the checkpoint timeout will take effect if it isn't set.
   ```

##########
File path: flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java
##########
@@ -167,6 +187,20 @@
      */
     CompletableFuture<String> triggerSavepoint(JobID jobId, @Nullable String savepointDirectory);
 
+    /**
+     * Triggers a savepoint for the job identified by the job id. The savepoint will be written to
+     * the given savepoint directory, or {@link
+     * org.apache.flink.configuration.CheckpointingOptions#SAVEPOINT_DIRECTORY} if it is null.
+     *
+     * @param jobId job id
+     * @param savepointDirectory directory the savepoint should be written to
+     * @param savepointTimeout Timeout for the savepoint. If it <= 0, checkpoint timeout will take
+     *     effect.
+     * @return path future where the savepoint is located
+     */
+    CompletableFuture<String> triggerSavepoint(
+            JobID jobId, @Nullable String savepointDirectory, long savepointTimeout);

Review comment:
       ```suggestion
               JobID jobId, @Nullable String savepointDirectory, long savepointTimeoutMs);
   ```
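If the existing two-argument overload is kept for current callers, one option is a default method that delegates with `0`, i.e. "not set". The interface below is a hypothetical, simplified stand-in for `ClusterClient` (the job id is a plain `String` rather than Flink's `JobID`); it only illustrates the delegation, not Flink's actual interface.

```java
import java.util.concurrent.CompletableFuture;

/** Hypothetical, simplified stand-in for ClusterClient; not Flink's interface. */
interface SavepointClient {

    /**
     * New overload.
     *
     * @param savepointTimeoutMs timeout in milliseconds; values <= 0 mean the checkpoint timeout
     *     applies
     * @return future completed with the savepoint path
     */
    CompletableFuture<String> triggerSavepoint(
            String jobId, String savepointDirectory, long savepointTimeoutMs);

    /** Existing overload, kept source-compatible by delegating with "no explicit timeout". */
    default CompletableFuture<String> triggerSavepoint(String jobId, String savepointDirectory) {
        return triggerSavepoint(jobId, savepointDirectory, 0L);
    }
}
```

Whether such an overload should stay at all is a separate decision; for the internal `JobMasterGateway`, the review asks to remove the old method instead.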

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
##########
@@ -469,12 +487,34 @@ public boolean isShutdown() {
         final CheckpointProperties properties =
                 CheckpointProperties.forSyncSavepoint(!unalignedCheckpointsEnabled, terminate);
 
-        return triggerSavepointInternal(properties, targetLocation);
+        return triggerSavepointInternal(properties, targetLocation, 0);
+    }
+
+    /**
+     * Triggers a synchronous savepoint with the given savepoint directory as a target.
+     *
+     * @param terminate flag indicating if the job should terminate or just suspend
+     * @param targetLocation Target location for the savepoint, optional. If null, the state
+     *     backend's configured default will be used.
+     * @param savepointTimeout Timeout for the savepoint. If it <= 0, checkpoint timeout will take
+     *     effect.
+     * @return A future to the completed checkpoint
+     * @throws IllegalStateException If no savepoint directory has been specified and no default
+     *     savepoint directory has been configured
+     */
+    public CompletableFuture<CompletedCheckpoint> triggerSynchronousSavepoint(
+            final boolean terminate, @Nullable final String targetLocation, long savepointTimeout) {

Review comment:
       ```suggestion
               final boolean terminate, @Nullable final String targetLocation, long savepointTimeoutMs) {
   ```

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
##########
@@ -514,6 +558,16 @@ public boolean isShutdown() {
             @Nullable String externalSavepointLocation,
             boolean isPeriodic) {
 
+        return triggerCheckpoint(props, externalSavepointLocation, 0, isPeriodic);
+    }
+
+    @VisibleForTesting
+    public CompletableFuture<CompletedCheckpoint> triggerCheckpoint(
+            CheckpointProperties props,
+            @Nullable String externalSavepointLocation,
+            long savepointTimeout,

Review comment:
       ```suggestion
               long savepointTimeoutMs,
   ```

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
##########
@@ -450,7 +450,25 @@ public boolean isShutdown() {
             @Nullable final String targetLocation) {
         final CheckpointProperties properties =
                 CheckpointProperties.forSavepoint(!unalignedCheckpointsEnabled);
-        return triggerSavepointInternal(properties, targetLocation);
+        return triggerSavepointInternal(properties, targetLocation, 0);

Review comment:
       I am wondering whether it would be clearer if we passed `checkpointTimeout` instead of `0` here.
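Passing the configured checkpoint timeout, as suggested, makes the fallback explicit at the call site instead of encoding it as a magic `0`. A simplified sketch of that parameter flow; `SavepointTrigger` is hypothetical and stands in for the relevant slice of `CheckpointCoordinator`, with the internal method reduced to reporting which timeout it would use.

```java
import java.util.concurrent.CompletableFuture;

/** Hypothetical slice of a coordinator, modeling only how the timeout is passed along. */
final class SavepointTrigger {

    private final long checkpointTimeoutMs;

    SavepointTrigger(long checkpointTimeoutMs) {
        this.checkpointTimeoutMs = checkpointTimeoutMs;
    }

    /** No explicit timeout: pass the configured checkpoint timeout rather than 0. */
    CompletableFuture<Long> triggerSavepoint(String targetLocation) {
        return triggerSavepointInternal(targetLocation, checkpointTimeoutMs);
    }

    /** Explicit timeout; non-positive values still fall back to the checkpoint timeout. */
    CompletableFuture<Long> triggerSavepoint(String targetLocation, long savepointTimeoutMs) {
        long effectiveMs = savepointTimeoutMs > 0 ? savepointTimeoutMs : checkpointTimeoutMs;
        return triggerSavepointInternal(targetLocation, effectiveMs);
    }

    /** Stand-in: completes with the timeout that would be used to schedule the canceller. */
    private CompletableFuture<Long> triggerSavepointInternal(String targetLocation, long timeoutMs) {
        return CompletableFuture.completedFuture(timeoutMs);
    }
}
```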

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
##########
@@ -2069,9 +2127,19 @@ private static CheckpointException getCheckpointException(
                 @Nullable String externalSavepointLocation,
                 boolean isPeriodic) {
 
+            this(props, externalSavepointLocation, 0, isPeriodic);
+        }
+
+        CheckpointTriggerRequest(
+                CheckpointProperties props,
+                @Nullable String externalSavepointLocation,
+                long savepointTimeout,
+                boolean isPeriodic) {
+
             this.timestamp = System.currentTimeMillis();
             this.props = checkNotNull(props);
             this.externalSavepointLocation = externalSavepointLocation;
+            this.savepointTimeout = savepointTimeout;

Review comment:
       Either use a `Duration`, or put the unit in the name.
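A sketch of the `Duration` variant for the request object, using an empty `Optional` for "not set" instead of a non-positive sentinel. `SavepointTriggerRequest` is a hypothetical, trimmed-down class, not Flink's `CheckpointTriggerRequest`; rejecting zero and negative durations is this sketch's own choice.

```java
import java.time.Duration;
import java.util.Optional;

/** Hypothetical, trimmed-down trigger request carrying a typed savepoint timeout. */
final class SavepointTriggerRequest {

    final long timestamp;
    private final Duration savepointTimeout; // null means "not set"

    SavepointTriggerRequest(Duration savepointTimeout) {
        if (savepointTimeout != null
                && (savepointTimeout.isZero() || savepointTimeout.isNegative())) {
            throw new IllegalArgumentException(
                    "savepointTimeout must be positive: " + savepointTimeout);
        }
        this.timestamp = System.currentTimeMillis();
        this.savepointTimeout = savepointTimeout;
    }

    Optional<Duration> getSavepointTimeout() {
        return Optional.ofNullable(savepointTimeout);
    }

    /** The conversion to milliseconds happens once, where the canceller delay is needed. */
    long cancellerDelayMs(Duration checkpointTimeout) {
        return getSavepointTimeout().orElse(checkpointTimeout).toMillis();
    }
}
```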

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java
##########
@@ -234,6 +234,22 @@ void failSlot(
             final boolean cancelJob,
             @RpcTimeout final Time timeout);
 
+    /**
+     * Triggers taking a savepoint of the executed job.
+     *
+     * @param targetDirectory to which to write the savepoint data or null if the default savepoint
+     *     directory should be used
+     * @param savepointTimeout Timeout for the savepoint. If it <= 0, checkpoint timeout will take
+     *     effect.
+     * @param timeout for the rpc call
+     * @return Future which is completed with the savepoint path once completed
+     */
+    CompletableFuture<String> triggerSavepoint(
+            @Nullable final String targetDirectory,
+            final long savepointTimeout,
+            final boolean cancelJob,
+            @RpcTimeout final Time timeout);

Review comment:
       Let's remove the old `triggerSavepoint` method.

##########
File path: flink-tests/src/test/java/org/apache/flink/test/streaming/api/environment/RemoteStreamEnvironmentTest.java
##########
@@ -264,12 +264,27 @@ public String getWebInterfaceURL() {
             return null;
         }
 
+        @Override
+        public CompletableFuture<String> stopWithSavepoint(
+                JobID jobId,
+                boolean advanceToEndOfEventTime,
+                @Nullable String savepointDirectory,
+                long savepointTimeout) {
+            return null;

Review comment:
       Same here: `FutureUtils.unsupportedOperationFuture()`.
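The point of the suggestion is that a test double should return a future that fails with `UnsupportedOperationException` rather than `null`, so an unexpected call surfaces as a clear error instead of a `NullPointerException` further down. The JDK-only helper below sketches that behavior; `TestStubs` is hypothetical, and inside Flink the reviewer's `FutureUtils.unsupportedOperationFuture()` is the utility to use.

```java
import java.util.concurrent.CompletableFuture;

/** Hypothetical JDK-only helpers for test doubles. */
final class TestStubs {

    private TestStubs() {}

    /** Returns a future already completed exceptionally with UnsupportedOperationException. */
    static <T> CompletableFuture<T> unsupportedOperationFuture() {
        CompletableFuture<T> future = new CompletableFuture<>();
        future.completeExceptionally(
                new UnsupportedOperationException("Not supported by this stub."));
        return future;
    }

    /** Example stub body, with the parameter list trimmed for brevity. */
    static CompletableFuture<String> stopWithSavepoint(String jobId, long savepointTimeoutMs) {
        return unsupportedOperationFuture();
    }
}
```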

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
##########
@@ -450,7 +450,25 @@ public boolean isShutdown() {
             @Nullable final String targetLocation) {
         final CheckpointProperties properties =
                 CheckpointProperties.forSavepoint(!unalignedCheckpointsEnabled);
-        return triggerSavepointInternal(properties, targetLocation);
+        return triggerSavepointInternal(properties, targetLocation, 0);
+    }
+
+    /**
+     * Triggers a savepoint with the given savepoint directory as a target.
+     *
+     * @param targetLocation Target location for the savepoint, optional. If null, the state
+     *     backend's configured default will be used.
+     * @param savepointTimeout Timeout for the savepoint. If it is <= 0, the checkpoint timeout will take
+     *     effect.
+     * @return A future to the completed checkpoint
+     * @throws IllegalStateException If no savepoint directory has been specified and no default
+     *     savepoint directory has been configured
+     */
+    public CompletableFuture<CompletedCheckpoint> triggerSavepoint(
+            @Nullable final String targetLocation, final long savepointTimeout) {

Review comment:
       If we used a `Duration` type for the `CheckpointCoordinator`, then we wouldn't have to rely on users of this class providing the timeout in the correct unit.
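A minimal sketch of the `Duration`-based variant. It assumes the fallback rule from the PR description: a positive savepoint timeout becomes the canceller delay, and otherwise the checkpoint timeout applies. `SavepointTimeouts` and `effectiveTimeout` are hypothetical names, not Flink API:

```java
import java.time.Duration;

/**
 * Hypothetical helper (not Flink code) that resolves the delay after which a pending
 * savepoint is cancelled. Because the values are Durations, callers cannot mix up units.
 */
final class SavepointTimeouts {

    private SavepointTimeouts() {}

    /**
     * @param savepointTimeout requested savepoint timeout, or null if none was given
     * @param checkpointTimeout the job's configured checkpoint timeout
     * @return savepointTimeout if it is positive, otherwise checkpointTimeout
     */
    static Duration effectiveTimeout(Duration savepointTimeout, Duration checkpointTimeout) {
        if (savepointTimeout == null || savepointTimeout.isZero() || savepointTimeout.isNegative()) {
            // No usable savepoint timeout: fall back to the checkpoint timeout.
            return checkpointTimeout;
        }
        return savepointTimeout;
    }
}
```

A caller could then schedule the canceller with `effectiveTimeout(...).toMillis()` and `TimeUnit.MILLISECONDS`. That way the unit conversion happens in one place instead of at every call site.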

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java
##########
@@ -234,6 +234,22 @@ void failSlot(
             final boolean cancelJob,
             @RpcTimeout final Time timeout);
 
+    /**
+     * Triggers taking a savepoint of the executed job.
+     *
+     * @param targetDirectory to which to write the savepoint data or null if the default savepoint
+     *     directory should be used
+     * @param savepointTimeout Timeout for the savepoint. If it is <= 0, the checkpoint timeout will take
+     *     effect.
+     * @param timeout for the rpc call
+     * @return Future which is completed with the savepoint path once completed
+     */
+    CompletableFuture<String> triggerSavepoint(
+            @Nullable final String targetDirectory,
+            final long savepointTimeout,
+            final boolean cancelJob,
+            @RpcTimeout final Time timeout);

Review comment:
       Alternatively we could rename the old method to `triggerSavepointWithDefaultTimeout` or so.
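A sketch of the renaming alternative. It uses a hypothetical, heavily simplified gateway interface; the real `JobMasterGateway` also takes an RPC timeout and uses Flink types. The old two-argument method survives only as an explicitly named default that delegates with "no savepoint timeout":

```java
import java.time.Duration;
import java.util.concurrent.CompletableFuture;

/** Hypothetical, simplified gateway; not Flink's JobMasterGateway. */
interface SavepointGatewaySketch {

    /**
     * Triggers a savepoint.
     *
     * @param savepointTimeout timeout for the savepoint, or null to use the checkpoint timeout
     */
    CompletableFuture<String> triggerSavepoint(
            String targetDirectory, boolean cancelJob, Duration savepointTimeout);

    /** Former two-argument variant, renamed so that the defaulting is visible at call sites. */
    default CompletableFuture<String> triggerSavepointWithDefaultTimeout(
            String targetDirectory, boolean cancelJob) {
        return triggerSavepoint(targetDirectory, cancelJob, null);
    }
}
```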

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
##########
@@ -450,7 +450,25 @@ public boolean isShutdown() {
             @Nullable final String targetLocation) {
         final CheckpointProperties properties =
                 CheckpointProperties.forSavepoint(!unalignedCheckpointsEnabled);
-        return triggerSavepointInternal(properties, targetLocation);
+        return triggerSavepointInternal(properties, targetLocation, 0);
+    }
+
+    /**
+     * Triggers a savepoint with the given savepoint directory as a target.
+     *
+     * @param targetLocation Target location for the savepoint, optional. If null, the state
+     *     backend's configured default will be used.
+     * @param savepointTimeout Timeout for the savepoint. If it is <= 0, the checkpoint timeout will take

Review comment:
       Unit is missing.

##########
File path: flink-clients/src/main/java/org/apache/flink/client/cli/SavepointOptions.java
##########
@@ -54,4 +60,8 @@ public String getSavepointPath() {
     public String getJarFilePath() {
         return jarFile;
     }
+
+    public long getSavepointTimeout() {

Review comment:
       ```suggestion
       public long getSavepointTimeoutMs() {
   ```

##########
File path: flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java
##########
@@ -156,6 +156,26 @@
             final boolean advanceToEndOfEventTime,
             @Nullable final String savepointDirectory);
 
+    /**
+     * Stops a program on a Flink cluster whose JobManager is configured in this client's
+     * configuration. Stopping works only for streaming programs. Be aware that the program might
+     * continue to run for a while after sending the stop command, because after the sources stop
+     * emitting data, all operators need to finish processing.
+     *
+     * @param jobId the job ID of the streaming program to stop
+     * @param advanceToEndOfEventTime flag indicating if the source should inject a {@code
+     *     MAX_WATERMARK} in the pipeline
+     * @param savepointDirectory directory the savepoint should be written to
+     * @param savepointTimeout Timeout for the savepoint. If it is <= 0, the checkpoint timeout will take
+     *     effect.
+     * @return a {@link CompletableFuture} containing the path where the savepoint is located
+     */
+    CompletableFuture<String> stopWithSavepoint(
+            final JobID jobId,
+            final boolean advanceToEndOfEventTime,
+            @Nullable final String savepointDirectory,
+            final long savepointTimeout);

Review comment:
       ```suggestion
               final long savepointTimeoutMs);
   ```

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
##########
@@ -450,7 +450,25 @@ public boolean isShutdown() {
             @Nullable final String targetLocation) {
         final CheckpointProperties properties =
                 CheckpointProperties.forSavepoint(!unalignedCheckpointsEnabled);
-        return triggerSavepointInternal(properties, targetLocation);
+        return triggerSavepointInternal(properties, targetLocation, 0);
+    }
+
+    /**
+     * Triggers a savepoint with the given savepoint directory as a target.
+     *
+     * @param targetLocation Target location for the savepoint, optional. If null, the state
+     *     backend's configured default will be used.
+     * @param savepointTimeout Timeout for the savepoint. If it is <= 0, the checkpoint timeout will take
+     *     effect.
+     * @return A future to the completed checkpoint
+     * @throws IllegalStateException If no savepoint directory has been specified and no default
+     *     savepoint directory has been configured
+     */
+    public CompletableFuture<CompletedCheckpoint> triggerSavepoint(
+            @Nullable final String targetLocation, final long savepointTimeout) {

Review comment:
       ```suggestion
               @Nullable final String targetLocation, final long savepointTimeoutMs) {
   ```

##########
File path: docs/content/docs/deployment/cli.md
##########
@@ -119,6 +120,8 @@ You can resume your program from this savepoint with the run command.
 The savepoint folder is optional and needs to be specified if 
 [state.savepoints.dir]({{< ref "docs/deployment/config" >}}#state-savepoints-dir) isn't set.
 
+The savepoint timeout is optional; if it isn't set, the checkpoint timeout takes effect.

Review comment:
       ```suggestion
   The savepoint timeout `--savepointTimeout` is optional; if it isn't set, the checkpoint timeout takes effect.
   ```

##########
File path: flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java
##########
@@ -156,6 +156,26 @@
             final boolean advanceToEndOfEventTime,
             @Nullable final String savepointDirectory);
 
+    /**
+     * Stops a program on a Flink cluster whose JobManager is configured in this client's
+     * configuration. Stopping works only for streaming programs. Be aware that the program might
+     * continue to run for a while after sending the stop command, because after the sources stop
+     * emitting data, all operators need to finish processing.
+     *
+     * @param jobId the job ID of the streaming program to stop
+     * @param advanceToEndOfEventTime flag indicating if the source should inject a {@code
+     *     MAX_WATERMARK} in the pipeline
+     * @param savepointDirectory directory the savepoint should be written to
+     * @param savepointTimeout Timeout for the savepoint. If it is <= 0, the checkpoint timeout will take

Review comment:
       Unit description is missing.

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
##########
@@ -716,6 +731,20 @@ JobManagerRunner createJobManagerRunner(JobGraph jobGraph, long initializationTi
                 jobId, gateway -> gateway.stopWithSavepoint(targetDirectory, terminate, timeout));
     }
 
+    @Override
+    public CompletableFuture<String> stopWithSavepoint(
+            final JobID jobId,
+            final String targetDirectory,
+            final long savepointTimeout,
+            final boolean terminate,
+            final Time timeout) {
+        return performOperationOnJobMasterGateway(
+                jobId,
+                gateway ->
+                        gateway.stopWithSavepoint(
+                                targetDirectory, savepointTimeout, terminate, timeout));

Review comment:
       Same here for `stopWithSavepoint`.

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/RestfulGateway.java
##########
@@ -145,19 +145,60 @@
         throw new UnsupportedOperationException();
     }
 
+    /**
+     * Triggers a savepoint with the given savepoint directory as a target.
+     *
+     * @param jobId ID of the job for which the savepoint should be triggered.
+     * @param targetDirectory Target directory for the savepoint.
+     * @param savepointTimeout Timeout for the savepoint. If it is <= 0, the checkpoint timeout will take
+     *     effect.
+     * @param timeout Timeout for the asynchronous operation
+     * @return A future to the {@link CompletedCheckpoint#getExternalPointer() external pointer} of
+     *     the savepoint.
+     */
+    default CompletableFuture<String> triggerSavepoint(
+            JobID jobId,
+            String targetDirectory,
+            long savepointTimeout,
+            boolean cancelJob,
+            @RpcTimeout Time timeout) {
+        throw new UnsupportedOperationException();
+    }
+
+    /**
+     * Stops the job with a savepoint.
+     *
+     * @param jobId ID of the job for which the savepoint should be triggered.
+     * @param targetDirectory to which to write the savepoint data or null if the default savepoint
+     *     directory should be used
+     * @param terminate flag indicating if the job should terminate or just suspend
+     * @param timeout for the rpc call
+     * @return Future which is completed with the savepoint path once completed
+     */
+    default CompletableFuture<String> stopWithSavepoint(
+            final JobID jobId,
+            final String targetDirectory,
+            final boolean terminate,
+            @RpcTimeout final Time timeout) {
+        throw new UnsupportedOperationException();
+    }

Review comment:
       Can we get rid of this method?

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/RestfulGateway.java
##########
@@ -145,19 +145,60 @@
         throw new UnsupportedOperationException();
     }

Review comment:
       Can we get rid of this method?

##########
File path: flink-tests/src/test/java/org/apache/flink/test/streaming/api/environment/RemoteStreamEnvironmentTest.java
##########
@@ -264,12 +264,27 @@ public String getWebInterfaceURL() {
             return null;
         }
 
+        @Override
+        public CompletableFuture<String> stopWithSavepoint(
+                JobID jobId,
+                boolean advanceToEndOfEventTime,
+                @Nullable String savepointDirectory,
+                long savepointTimeout) {
+            return null;
+        }
+
         @Override
         public CompletableFuture<String> triggerSavepoint(
                 JobID jobId, @Nullable String savepointDirectory) {
             return null;
         }
 
+        @Override
+        public CompletableFuture<String> triggerSavepoint(
+                JobID jobId, @Nullable String savepointDirectory, long savepointTimeout) {
+            return null;

Review comment:
       Let's return `FutureUtils.unsupportedOperationFuture()`.
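For illustration, a JDK-only stand-in for what the reviewer suggests. It requires Java 9+ for `CompletableFuture.failedFuture`. The helper class name is hypothetical; in Flink itself the stub would call `FutureUtils.unsupportedOperationFuture()`, as the comment says. Returning an already-failed future means an unexpected call fails right away with an `UnsupportedOperationException`. A returned `null` would typically surface later as a `NullPointerException` in the caller:

```java
import java.util.concurrent.CompletableFuture;

/** Hypothetical JDK-only equivalent of a "not supported" future for test stubs. */
final class UnsupportedFutures {

    private UnsupportedFutures() {}

    /** Returns a future that is already completed exceptionally with UnsupportedOperationException. */
    static <T> CompletableFuture<T> unsupportedOperationFuture() {
        return CompletableFuture.failedFuture(new UnsupportedOperationException());
    }
}
```

Each stub method in the test client would then read `return UnsupportedFutures.unsupportedOperationFuture();` instead of `return null;`.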

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java
##########
@@ -935,9 +941,16 @@ public void reportCheckpointMetrics(
         executionGraphHandler.reportCheckpointMetrics(attemptId, id, metrics);
     }
 
+    @Override
+    public CompletableFuture<String> stopWithSavepoint(String targetDirectory, boolean terminate) {
+        return stopWithSavepoint(targetDirectory, 0, terminate);
+    }
+
     @Override
     public CompletableFuture<String> stopWithSavepoint(
-            @Nullable final String targetDirectory, final boolean terminate) {
+            @Nullable final String targetDirectory,
+            final long savepointTimeout,

Review comment:
       Unit is missing.

##########
File path: flink-clients/src/main/java/org/apache/flink/client/cli/StopOptions.java
##########
@@ -57,6 +64,10 @@ String getTargetDirectory() {
         return targetDirectory;
     }
 
+    long getSavepointTimeout() {

Review comment:
       ```suggestion
       long getSavepointTimeoutMs() {
   ```

##########
File path: flink-clients/src/main/java/org/apache/flink/client/cli/StopOptions.java
##########
@@ -33,6 +34,8 @@
     /** Optional target directory for the savepoint. Overwrites cluster default. */
     private final String targetDirectory;
 
+    private final long savepointTimeout;

Review comment:
       ```suggestion
       private final long savepointTimeoutMs;
   ```

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
##########
@@ -469,12 +487,34 @@ public boolean isShutdown() {
         final CheckpointProperties properties =
                 CheckpointProperties.forSyncSavepoint(!unalignedCheckpointsEnabled, terminate);
 
-        return triggerSavepointInternal(properties, targetLocation);
+        return triggerSavepointInternal(properties, targetLocation, 0);
+    }
+
+    /**
+     * Triggers a synchronous savepoint with the given savepoint directory as a target.
+     *
+     * @param terminate flag indicating if the job should terminate or just suspend
+     * @param targetLocation Target location for the savepoint, optional. If null, the state
+     *     backend's configured default will be used.
+     * @param savepointTimeout Timeout for the savepoint. If it is <= 0, the checkpoint timeout will take

Review comment:
       Unit is missing.

##########
File path: flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java
##########
@@ -167,6 +187,20 @@
      */
     CompletableFuture<String> triggerSavepoint(JobID jobId, @Nullable String savepointDirectory);
 
+    /**
+     * Triggers a savepoint for the job identified by the job id. The savepoint will be written to
+     * the given savepoint directory, or {@link
+     * org.apache.flink.configuration.CheckpointingOptions#SAVEPOINT_DIRECTORY} if it is null.
+     *
+     * @param jobId job id
+     * @param savepointDirectory directory the savepoint should be written to
+     * @param savepointTimeout Timeout for the savepoint. If it is <= 0, the checkpoint timeout will take

Review comment:
       Unit description is missing.

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
##########
@@ -469,12 +487,34 @@ public boolean isShutdown() {
         final CheckpointProperties properties =
                 CheckpointProperties.forSyncSavepoint(!unalignedCheckpointsEnabled, terminate);
 
-        return triggerSavepointInternal(properties, targetLocation);
+        return triggerSavepointInternal(properties, targetLocation, 0);
+    }
+
+    /**
+     * Triggers a synchronous savepoint with the given savepoint directory as a target.
+     *
+     * @param terminate flag indicating if the job should terminate or just suspend
+     * @param targetLocation Target location for the savepoint, optional. If null, the state
+     *     backend's configured default will be used.
+     * @param savepointTimeout Timeout for the savepoint. If it is <= 0, the checkpoint timeout will take
+     *     effect.
+     * @return A future to the completed checkpoint
+     * @throws IllegalStateException If no savepoint directory has been specified and no default
+     *     savepoint directory has been configured
+     */
+    public CompletableFuture<CompletedCheckpoint> triggerSynchronousSavepoint(
+            final boolean terminate, @Nullable final String targetLocation, long savepointTimeout) {
+
+        final CheckpointProperties properties =
+                CheckpointProperties.forSyncSavepoint(!unalignedCheckpointsEnabled, terminate);
+
+        return triggerSavepointInternal(properties, targetLocation, savepointTimeout);
     }
 
     private CompletableFuture<CompletedCheckpoint> triggerSavepointInternal(
             final CheckpointProperties checkpointProperties,
-            @Nullable final String targetLocation) {
+            @Nullable final String targetLocation,
+            long savepointTimeout) {

Review comment:
       ```suggestion
               long savepointTimeoutMs) {
   ```

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
##########
@@ -706,6 +706,21 @@ JobManagerRunner createJobManagerRunner(JobGraph jobGraph, long initializationTi
                 jobId, gateway -> gateway.triggerSavepoint(targetDirectory, cancelJob, timeout));
     }
 
+    @Override
+    public CompletableFuture<String> triggerSavepoint(
+            JobID jobId,
+            String targetDirectory,
+            long savepointTimeout,
+            boolean cancelJob,
+            Time timeout) {

Review comment:
       Can we replace the old `triggerSavepoint` method completely?

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java
##########
@@ -256,6 +272,23 @@ void failSlot(
             final boolean terminate,
             @RpcTimeout final Time timeout);
 
+    /**
+     * Stops the job with a savepoint.
+     *
+     * @param targetDirectory to which to write the savepoint data or null if the default savepoint
+     *     directory should be used
+     * @param savepointTimeout Timeout for the savepoint. If it is <= 0, the checkpoint timeout will take
+     *     effect.
+     * @param terminate flag indicating if the job should terminate or just suspend
+     * @param timeout for the rpc call
+     * @return Future which is completed with the savepoint path once completed
+     */
+    CompletableFuture<String> stopWithSavepoint(
+            @Nullable final String targetDirectory,
+            final long savepointTimeout,
+            final boolean terminate,
+            @RpcTimeout final Time timeout);

Review comment:
       And `stopWithSavepoint`.

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
##########
@@ -800,7 +809,17 @@ private boolean isConnectingToResourceManager(ResourceManagerId resourceManagerI
     public CompletableFuture<String> stopWithSavepoint(
             @Nullable final String targetDirectory, final boolean terminate, final Time timeout) {
 
-        return schedulerNG.stopWithSavepoint(targetDirectory, terminate);
+        return stopWithSavepoint(targetDirectory, 0, terminate, timeout);
+    }
+
+    @Override
+    public CompletableFuture<String> stopWithSavepoint(
+            @Nullable final String targetDirectory,
+            final long savepointTimeout,
+            final boolean terminate,
+            final Time timeout) {
+
+        return schedulerNG.stopWithSavepoint(targetDirectory, savepointTimeout, terminate);

Review comment:
       Same here.

##########
File path: flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontendParser.java
##########
@@ -132,6 +132,9 @@
     static final Option SAVEPOINT_DISPOSE_OPTION =
             new Option("d", "dispose", true, "Path of savepoint to dispose.");
 
+    static final Option SAVEPOINT_TIMEOUT_OPTION =
+            new Option("st", "savepointTimeout", true, "timeout of savepoint.");

Review comment:
       What is the unit of the savepoint timeout? ms? If so, I would suggest making this explicit via `savepointTimeoutMs` and also mentioning the unit in the description.
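A sketch of what an explicit-unit option could look like on the parsing side. It assumes the value is given in milliseconds under a name such as `--savepointTimeoutMs`, which is hypothetical; the PR currently uses `-st`/`--savepointTimeout` and does not state the unit. The PR treats values <= 0 as "use the checkpoint timeout". This sketch differs on purpose: it rejects non-positive input and uses an absent option for the fallback. That is a design choice, not the PR's behavior:

```java
import java.time.Duration;
import java.util.Optional;

/** Hypothetical parser for a millisecond-valued savepoint timeout option; not Flink code. */
final class SavepointTimeoutOption {

    private SavepointTimeoutOption() {}

    /**
     * @param rawValue the option value as given on the command line, or null if absent
     * @return empty if the option was not given (so the checkpoint timeout applies),
     *     otherwise the timeout as a Duration
     */
    static Optional<Duration> parseMillis(String rawValue) {
        if (rawValue == null || rawValue.trim().isEmpty()) {
            return Optional.empty();
        }
        final long millis;
        try {
            millis = Long.parseLong(rawValue.trim());
        } catch (NumberFormatException e) {
            throw new IllegalArgumentException(
                    "Savepoint timeout must be a number of milliseconds: " + rawValue, e);
        }
        if (millis <= 0) {
            throw new IllegalArgumentException("Savepoint timeout must be positive (ms): " + millis);
        }
        return Optional.of(Duration.ofMillis(millis));
    }
}
```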

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
##########
@@ -788,7 +788,16 @@ private boolean isConnectingToResourceManager(ResourceManagerId resourceManagerI
     public CompletableFuture<String> triggerSavepoint(
             @Nullable final String targetDirectory, final boolean cancelJob, final Time timeout) {
 
-        return schedulerNG.triggerSavepoint(targetDirectory, cancelJob);
+        return triggerSavepoint(targetDirectory, 0, cancelJob, timeout);
+    }
+
+    @Override
+    public CompletableFuture<String> triggerSavepoint(
+            @Nullable String targetDirectory,
+            long savepointTimeout,
+            boolean cancelJob,
+            Time timeout) {
+        return schedulerNG.triggerSavepoint(targetDirectory, savepointTimeout, cancelJob);

Review comment:
       Can we replace the old `triggerSavepoint` method here? This should not be public API.

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java
##########
@@ -935,9 +941,16 @@ public void reportCheckpointMetrics(
         executionGraphHandler.reportCheckpointMetrics(attemptId, id, metrics);
     }
 
+    @Override
+    public CompletableFuture<String> stopWithSavepoint(String targetDirectory, boolean terminate) {
+        return stopWithSavepoint(targetDirectory, 0, terminate);
+    }
+
     @Override
     public CompletableFuture<String> stopWithSavepoint(
-            @Nullable final String targetDirectory, final boolean terminate) {
+            @Nullable final String targetDirectory,
+            final long savepointTimeout,
+            final boolean terminate) {

Review comment:
       If we have overloaded methods, then I like to keep the parameter prefix the same across the different variants. I think this makes them easier to use. So in this case, I would put `savepointTimeout` at the end.
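A minimal sketch of that ordering, using a hypothetical class. Both overloads start with `(targetDirectory, terminate)`, and the new parameter is appended last with its unit in the name. The short variant delegates with `0`, which the PR treats as "use the checkpoint timeout":

```java
import java.util.concurrent.CompletableFuture;

/** Hypothetical sketch of overloads sharing a parameter prefix; not Flink's SchedulerBase. */
abstract class StopWithSavepointSketch {

    /** Short variant: delegates with 0, i.e. "use the checkpoint timeout" per the PR. */
    CompletableFuture<String> stopWithSavepoint(String targetDirectory, boolean terminate) {
        return stopWithSavepoint(targetDirectory, terminate, 0L);
    }

    /** Long variant: same leading parameters, with the new parameter appended last. */
    abstract CompletableFuture<String> stopWithSavepoint(
            String targetDirectory, boolean terminate, long savepointTimeoutMs);
}
```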




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] zoltar9264 commented on pull request #17443: [FLINK-9465][Runtime/Checkpointing] Specify a separate savepoint timeout option via CLI and REST API

Posted by GitBox <gi...@apache.org>.
zoltar9264 commented on pull request #17443:
URL: https://github.com/apache/flink/pull/17443#issuecomment-944241791









[GitHub] [flink] flinkbot edited a comment on pull request #17443: [FLINK-9465][Runtime/Checkpointing] Specify a separate savepoint timeout option via CLI and REST API

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #17443:
URL: https://github.com/apache/flink/pull/17443#issuecomment-939267795


   ## CI report:
   
   * 178f04c20bd675903e22638afe425fe9e38f426c Azure: [CANCELED](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=24941) 
   * d7825ee175e5228e6cdbb4f50f2a3b9ab5383d09 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=24948) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run azure` re-run the last Azure build
   </details>





[GitHub] [flink] flinkbot edited a comment on pull request #17443: [FLINK-9465][Runtime/Checkpointing] Specify a separate savepoint timeout option via CLI and REST API

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #17443:
URL: https://github.com/apache/flink/pull/17443#issuecomment-939267795


   ## CI report:
   
   * e3f3254caaf76aff6934e0b0bd21502ea8830f5a Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=25119) 
   * 06a5f4448434e12912dbd35ac60332ea510e16ff Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=25126) 
   





[GitHub] [flink] flinkbot edited a comment on pull request #17443: [FLINK-9465][Runtime/Checkpointing] Specify a separate savepoint timeout option via CLI and REST API

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #17443:
URL: https://github.com/apache/flink/pull/17443#issuecomment-939267795


   ## CI report:
   
   * e3f3254caaf76aff6934e0b0bd21502ea8830f5a Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=25119) 
   * 06a5f4448434e12912dbd35ac60332ea510e16ff UNKNOWN
   





[GitHub] [flink] tillrohrmann commented on a change in pull request #17443: [FLINK-9465][Runtime/Checkpointing] Specify a separate savepoint timeout option via CLI and REST API

Posted by GitBox <gi...@apache.org>.
tillrohrmann commented on a change in pull request #17443:
URL: https://github.com/apache/flink/pull/17443#discussion_r741964620



##########
File path: docs/content.zh/docs/deployment/cli.md
##########
@@ -121,6 +122,8 @@ You can resume your program from this savepoint with the run command.
 The savepoint folder is optional and needs to be specified if 
 [state.savepoints.dir]({{< ref "docs/deployment/config" >}}#state-savepoints-dir) isn't set.
 
+The savepoint timeout is optional and checkpoint timeout will take effect if isn't set.

Review comment:
       ```suggestion
   The savepoint timeout `--savepointTimeout` is optional and checkpoint timeout will take effect if it isn't set.
   ```

##########
File path: flink-clients/src/main/java/org/apache/flink/client/cli/SavepointOptions.java
##########
@@ -30,13 +31,18 @@
     private boolean dispose;
     private String disposeSavepointPath;
     private String jarFile;
+    private long savepointTimeout;

Review comment:
       Since we are not storing the unit explicitly:
   
   ```suggestion
       private long savepointTimeoutMs;
   ```

##########
File path: docs/content.zh/docs/deployment/cli.md
##########
@@ -172,6 +176,8 @@ Savepoint completed. Path: file:/tmp/flink-savepoints/savepoint-cca7bc-bb1e257f0
 We have to use `--savepointPath` to specify the savepoint folder if 
 [state.savepoints.dir]({{< ref "docs/deployment/config" >}}#state-savepoints-dir) isn't set.
 
+The savepoint timeout is optional and checkpoint timeout will take effect if isn't set.

Review comment:
       ```suggestion
   The savepoint timeout `--savepointTimeout` is optional and checkpoint timeout will take effect if it isn't set.
   ```

##########
File path: flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontendParser.java
##########
@@ -132,6 +132,9 @@
     static final Option SAVEPOINT_DISPOSE_OPTION =
             new Option("d", "dispose", true, "Path of savepoint to dispose.");
 
+    static final Option SAVEPOINT_TIMEOUT_OPTION =
+            new Option("st", "savepointTimeout", true, "timeout of savepoint.");

Review comment:
       ```suggestion
               new Option("st", "savepointTimeout", true, "The maximum completion time a savepoint is allowed to take before it is failed.");
   ```

##########
File path: docs/content/docs/deployment/cli.md
##########
@@ -170,6 +174,8 @@ Savepoint completed. Path: file:/tmp/flink-savepoints/savepoint-cca7bc-bb1e257f0
 We have to use `--savepointPath` to specify the savepoint folder if 
 [state.savepoints.dir]({{< ref "docs/deployment/config" >}}#state-savepoints-dir) isn't set.
 
+The savepoint timeout is optional and checkpoint timeout will take effect if isn't set.

Review comment:
       ```suggestion
   The savepoint timeout `--savepointTimeout` is optional and checkpoint timeout will take effect if it isn't set.
   ```

##########
File path: flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java
##########
@@ -167,6 +187,20 @@
      */
     CompletableFuture<String> triggerSavepoint(JobID jobId, @Nullable String savepointDirectory);
 
+    /**
+     * Triggers a savepoint for the job identified by the job id. The savepoint will be written to
+     * the given savepoint directory, or {@link
+     * org.apache.flink.configuration.CheckpointingOptions#SAVEPOINT_DIRECTORY} if it is null.
+     *
+     * @param jobId job id
+     * @param savepointDirectory directory the savepoint should be written to
+     * @param savepointTimeout Timeout for the savepoint. If it <= 0, checkpoint timeout will take
+     *     effect.
+     * @return path future where the savepoint is located
+     */
+    CompletableFuture<String> triggerSavepoint(
+            JobID jobId, @Nullable String savepointDirectory, long savepointTimeout);

Review comment:
       ```suggestion
               JobID jobId, @Nullable String savepointDirectory, long savepointTimeoutMs);
   ```

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
##########
@@ -469,12 +487,34 @@ public boolean isShutdown() {
         final CheckpointProperties properties =
                 CheckpointProperties.forSyncSavepoint(!unalignedCheckpointsEnabled, terminate);
 
-        return triggerSavepointInternal(properties, targetLocation);
+        return triggerSavepointInternal(properties, targetLocation, 0);
+    }
+
+    /**
+     * Triggers a synchronous savepoint with the given savepoint directory as a target.
+     *
+     * @param terminate flag indicating if the job should terminate or just suspend
+     * @param targetLocation Target location for the savepoint, optional. If null, the state
+     *     backend's configured default will be used.
+     * @param savepointTimeout Timeout for the savepoint. If it <= 0, checkpoint timeout will take
+     *     effect.
+     * @return A future to the completed checkpoint
+     * @throws IllegalStateException If no savepoint directory has been specified and no default
+     *     savepoint directory has been configured
+     */
+    public CompletableFuture<CompletedCheckpoint> triggerSynchronousSavepoint(
+            final boolean terminate, @Nullable final String targetLocation, long savepointTimeout) {

Review comment:
       ```suggestion
               final boolean terminate, @Nullable final String targetLocation, long savepointTimeoutMs) {
   ```

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
##########
@@ -514,6 +558,16 @@ public boolean isShutdown() {
             @Nullable String externalSavepointLocation,
             boolean isPeriodic) {
 
+        return triggerCheckpoint(props, externalSavepointLocation, 0, isPeriodic);
+    }
+
+    @VisibleForTesting
+    public CompletableFuture<CompletedCheckpoint> triggerCheckpoint(
+            CheckpointProperties props,
+            @Nullable String externalSavepointLocation,
+            long savepointTimeout,

Review comment:
       ```suggestion
               long savepointTimeoutMs,
   ```

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
##########
@@ -450,7 +450,25 @@ public boolean isShutdown() {
             @Nullable final String targetLocation) {
         final CheckpointProperties properties =
                 CheckpointProperties.forSavepoint(!unalignedCheckpointsEnabled);
-        return triggerSavepointInternal(properties, targetLocation);
+        return triggerSavepointInternal(properties, targetLocation, 0);

Review comment:
       I am wondering whether it would be clearer if we passed `checkpointTimeout` instead of `0` here.
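A minimal sketch of the two options, assuming both timeouts are in milliseconds; the class and method names are hypothetical and this is not the actual `CheckpointCoordinator` code. With a `0` sentinel the fallback is resolved downstream, whereas passing the configured checkpoint timeout makes the default visible at the call site and lets the internal method reject non-positive values outright.

```java
/** Hypothetical sketch; not the actual CheckpointCoordinator implementation. */
final class SavepointTimeoutResolution {

    private SavepointTimeoutResolution() {}

    /** Sentinel style: any value <= 0 means "use the checkpoint timeout", resolved downstream. */
    static long resolveTimeoutMs(long savepointTimeoutMs, long checkpointTimeoutMs) {
        return savepointTimeoutMs > 0 ? savepointTimeoutMs : checkpointTimeoutMs;
    }

    /** Explicit style: the caller passes the checkpoint timeout itself, so no sentinel exists. */
    static long triggerSavepointWithExplicitDefault(long checkpointTimeoutMs) {
        return triggerSavepointInternal(checkpointTimeoutMs);
    }

    /** Stand-in for the internal trigger; returns the timeout it would schedule the canceller with. */
    static long triggerSavepointInternal(long timeoutMs) {
        if (timeoutMs <= 0) {
            throw new IllegalArgumentException("timeout must be positive: " + timeoutMs);
        }
        return timeoutMs;
    }

    public static void main(String[] args) {
        long checkpointTimeoutMs = 600_000L;
        // Sentinel style: 0 silently falls back to the checkpoint timeout.
        System.out.println(resolveTimeoutMs(0L, checkpointTimeoutMs));
        // Explicit style: the default is spelled out where the call is made.
        System.out.println(triggerSavepointWithExplicitDefault(checkpointTimeoutMs));
    }
}
```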

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
##########
@@ -2069,9 +2127,19 @@ private static CheckpointException getCheckpointException(
                 @Nullable String externalSavepointLocation,
                 boolean isPeriodic) {
 
+            this(props, externalSavepointLocation, 0, isPeriodic);
+        }
+
+        CheckpointTriggerRequest(
+                CheckpointProperties props,
+                @Nullable String externalSavepointLocation,
+                long savepointTimeout,
+                boolean isPeriodic) {
+
             this.timestamp = System.currentTimeMillis();
             this.props = checkNotNull(props);
             this.externalSavepointLocation = externalSavepointLocation;
+            this.savepointTimeout = savepointTimeout;

Review comment:
       Either use `Duration` or put the unit in the name.

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java
##########
@@ -234,6 +234,22 @@ void failSlot(
             final boolean cancelJob,
             @RpcTimeout final Time timeout);
 
+    /**
+     * Triggers taking a savepoint of the executed job.
+     *
+     * @param targetDirectory to which to write the savepoint data or null if the default savepoint
+     *     directory should be used
+     * @param savepointTimeout Timeout for the savepoint. If it <= 0, checkpoint timeout will take
+     *     effect.
+     * @param timeout for the rpc call
+     * @return Future which is completed with the savepoint path once completed
+     */
+    CompletableFuture<String> triggerSavepoint(
+            @Nullable final String targetDirectory,
+            final long savepointTimeout,
+            final boolean cancelJob,
+            @RpcTimeout final Time timeout);

Review comment:
       Let's remove the old `triggerSavepoint` method.

##########
File path: flink-tests/src/test/java/org/apache/flink/test/streaming/api/environment/RemoteStreamEnvironmentTest.java
##########
@@ -264,12 +264,27 @@ public String getWebInterfaceURL() {
             return null;
         }
 
+        @Override
+        public CompletableFuture<String> stopWithSavepoint(
+                JobID jobId,
+                boolean advanceToEndOfEventTime,
+                @Nullable String savepointDirectory,
+                long savepointTimeout) {
+            return null;

Review comment:
       Same here: `FutureUtils.unsupportedOperationFuture()`.

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
##########
@@ -450,7 +450,25 @@ public boolean isShutdown() {
             @Nullable final String targetLocation) {
         final CheckpointProperties properties =
                 CheckpointProperties.forSavepoint(!unalignedCheckpointsEnabled);
-        return triggerSavepointInternal(properties, targetLocation);
+        return triggerSavepointInternal(properties, targetLocation, 0);
+    }
+
+    /**
+     * Triggers a savepoint with the given savepoint directory as a target.
+     *
+     * @param targetLocation Target location for the savepoint, optional. If null, the state
+     *     backend's configured default will be used.
+     * @param savepointTimeout Timeout for the savepoint. If it <= 0, checkpoint timeout will take
+     *     effect.
+     * @return A future to the completed checkpoint
+     * @throws IllegalStateException If no savepoint directory has been specified and no default
+     *     savepoint directory has been configured
+     */
+    public CompletableFuture<CompletedCheckpoint> triggerSavepoint(
+            @Nullable final String targetLocation, final long savepointTimeout) {

Review comment:
       If we used a `Duration` type for the `CheckpointCoordinator`, then we wouldn't have to rely on users of this class providing the timeout in the correct unit.
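As a hedged illustration of the `Duration` idea, the sketch below (a hypothetical class, not Flink's API) lets `null` mean "fall back to the checkpoint timeout" and rejects zero or negative values. That rejection is a design choice of this sketch only; the PR as written treats `<= 0` as the fallback.

```java
import java.time.Duration;
import java.util.Objects;

/** Hypothetical sketch of a Duration-typed savepoint timeout; not Flink's actual API. */
final class DurationTimeoutSketch {

    private final Duration checkpointTimeout;

    DurationTimeoutSketch(Duration checkpointTimeout) {
        this.checkpointTimeout = Objects.requireNonNull(checkpointTimeout);
    }

    /**
     * Returns the timeout the canceller would be scheduled with. A null savepoint timeout means
     * "use the checkpoint timeout"; zero or negative values are rejected instead of reinterpreted.
     */
    Duration effectiveSavepointTimeout(Duration savepointTimeout) {
        if (savepointTimeout == null) {
            return checkpointTimeout;
        }
        if (savepointTimeout.isZero() || savepointTimeout.isNegative()) {
            throw new IllegalArgumentException(
                    "savepoint timeout must be positive: " + savepointTimeout);
        }
        return savepointTimeout;
    }

    public static void main(String[] args) {
        DurationTimeoutSketch sketch = new DurationTimeoutSketch(Duration.ofMinutes(10));
        // The unit travels with the value, so seconds cannot be mistaken for milliseconds.
        System.out.println(sketch.effectiveSavepointTimeout(Duration.ofSeconds(30)).toMillis());
        System.out.println(sketch.effectiveSavepointTimeout(null).toMillis());
    }
}
```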

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java
##########
@@ -234,6 +234,22 @@ void failSlot(
             final boolean cancelJob,
             @RpcTimeout final Time timeout);
 
+    /**
+     * Triggers taking a savepoint of the executed job.
+     *
+     * @param targetDirectory to which to write the savepoint data or null if the default savepoint
+     *     directory should be used
+     * @param savepointTimeout Timeout for the savepoint. If it <= 0, checkpoint timeout will take
+     *     effect.
+     * @param timeout for the rpc call
+     * @return Future which is completed with the savepoint path once completed
+     */
+    CompletableFuture<String> triggerSavepoint(
+            @Nullable final String targetDirectory,
+            final long savepointTimeout,
+            final boolean cancelJob,
+            @RpcTimeout final Time timeout);

Review comment:
       Alternatively, we could rename the old method to something like `triggerSavepointWithDefaultTimeout`.
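A sketch of that alternative, with hypothetical names and the RPC `timeout` parameter omitted for brevity: the timeout-aware method becomes the only abstract one, and the old behaviour stays available under an explicit name as a Java default method. Using `0` to mean "use the checkpoint timeout" mirrors the convention in this PR and is an assumption of the sketch.

```java
import java.util.concurrent.CompletableFuture;

/** Hypothetical gateway sketch; not Flink's JobMasterGateway. */
interface SavepointGatewaySketch {

    /** Sentinel meaning "use the configured checkpoint timeout" (assumed, as in the PR). */
    long DEFAULT_TIMEOUT_MS = 0L;

    /** The single abstract entry point; the timeout is appended after the existing parameters. */
    CompletableFuture<String> triggerSavepoint(
            String targetDirectory, boolean cancelJob, long savepointTimeoutMs);

    /** The former no-timeout variant, kept only under a name that states its behaviour. */
    default CompletableFuture<String> triggerSavepointWithDefaultTimeout(
            String targetDirectory, boolean cancelJob) {
        return triggerSavepoint(targetDirectory, cancelJob, DEFAULT_TIMEOUT_MS);
    }
}

final class SavepointGatewayDemo {
    public static void main(String[] args) {
        // Lambda implementation that echoes its arguments instead of taking a savepoint.
        SavepointGatewaySketch gateway =
                (dir, cancel, timeoutMs) ->
                        CompletableFuture.completedFuture(dir + "|" + cancel + "|" + timeoutMs);
        System.out.println(gateway.triggerSavepointWithDefaultTimeout("/tmp/sp", false).join());
    }
}
```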

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
##########
@@ -450,7 +450,25 @@ public boolean isShutdown() {
             @Nullable final String targetLocation) {
         final CheckpointProperties properties =
                 CheckpointProperties.forSavepoint(!unalignedCheckpointsEnabled);
-        return triggerSavepointInternal(properties, targetLocation);
+        return triggerSavepointInternal(properties, targetLocation, 0);
+    }
+
+    /**
+     * Triggers a savepoint with the given savepoint directory as a target.
+     *
+     * @param targetLocation Target location for the savepoint, optional. If null, the state
+     *     backend's configured default will be used.
+     * @param savepointTimeout Timeout for the savepoint. If it <= 0, checkpoint timeout will take

Review comment:
       Unit is missing.

##########
File path: flink-clients/src/main/java/org/apache/flink/client/cli/SavepointOptions.java
##########
@@ -54,4 +60,8 @@ public String getSavepointPath() {
     public String getJarFilePath() {
         return jarFile;
     }
+
+    public long getSavepointTimeout() {

Review comment:
       ```suggestion
       public long getSavepointTimeoutMs() {
   ```

##########
File path: flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java
##########
@@ -156,6 +156,26 @@
             final boolean advanceToEndOfEventTime,
             @Nullable final String savepointDirectory);
 
+    /**
+     * Stops a program on Flink cluster whose job-manager is configured in this client's
+     * configuration. Stopping works only for streaming programs. Be aware, that the program might
+     * continue to run for a while after sending the stop command, because after sources stopped to
+     * emit data all operators need to finish processing.
+     *
+     * @param jobId the job ID of the streaming program to stop
+     * @param advanceToEndOfEventTime flag indicating if the source should inject a {@code
+     *     MAX_WATERMARK} in the pipeline
+     * @param savepointDirectory directory the savepoint should be written to
+     * @param savepointTimeout Timeout for the savepoint. If it <= 0, checkpoint timeout will take
+     *     effect.
+     * @return a {@link CompletableFuture} containing the path where the savepoint is located
+     */
+    CompletableFuture<String> stopWithSavepoint(
+            final JobID jobId,
+            final boolean advanceToEndOfEventTime,
+            @Nullable final String savepointDirectory,
+            final long savepointTimeout);

Review comment:
       ```suggestion
               final long savepointTimeoutMs);
   ```

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
##########
@@ -450,7 +450,25 @@ public boolean isShutdown() {
             @Nullable final String targetLocation) {
         final CheckpointProperties properties =
                 CheckpointProperties.forSavepoint(!unalignedCheckpointsEnabled);
-        return triggerSavepointInternal(properties, targetLocation);
+        return triggerSavepointInternal(properties, targetLocation, 0);
+    }
+
+    /**
+     * Triggers a savepoint with the given savepoint directory as a target.
+     *
+     * @param targetLocation Target location for the savepoint, optional. If null, the state
+     *     backend's configured default will be used.
+     * @param savepointTimeout Timeout for the savepoint. If it <= 0, checkpoint timeout will take
+     *     effect.
+     * @return A future to the completed checkpoint
+     * @throws IllegalStateException If no savepoint directory has been specified and no default
+     *     savepoint directory has been configured
+     */
+    public CompletableFuture<CompletedCheckpoint> triggerSavepoint(
+            @Nullable final String targetLocation, final long savepointTimeout) {

Review comment:
       ```suggestion
               @Nullable final String targetLocation, final long savepointTimeoutMs) {
   ```

##########
File path: docs/content/docs/deployment/cli.md
##########
@@ -119,6 +120,8 @@ You can resume your program from this savepoint with the run command.
 The savepoint folder is optional and needs to be specified if 
 [state.savepoints.dir]({{< ref "docs/deployment/config" >}}#state-savepoints-dir) isn't set.
 
+The savepoint timeout is optional and checkpoint timeout will take effect if isn't set.

Review comment:
       ```suggestion
   The savepoint timeout `--savepointTimeout` is optional and checkpoint timeout will take effect if it isn't set.
   ```

##########
File path: flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java
##########
@@ -156,6 +156,26 @@
             final boolean advanceToEndOfEventTime,
             @Nullable final String savepointDirectory);
 
+    /**
+     * Stops a program on Flink cluster whose job-manager is configured in this client's
+     * configuration. Stopping works only for streaming programs. Be aware, that the program might
+     * continue to run for a while after sending the stop command, because after sources stopped to
+     * emit data all operators need to finish processing.
+     *
+     * @param jobId the job ID of the streaming program to stop
+     * @param advanceToEndOfEventTime flag indicating if the source should inject a {@code
+     *     MAX_WATERMARK} in the pipeline
+     * @param savepointDirectory directory the savepoint should be written to
+     * @param savepointTimeout Timeout for the savepoint. If it <= 0, checkpoint timeout will take

Review comment:
       Unit description is missing.

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
##########
@@ -716,6 +731,20 @@ JobManagerRunner createJobManagerRunner(JobGraph jobGraph, long initializationTi
                 jobId, gateway -> gateway.stopWithSavepoint(targetDirectory, terminate, timeout));
     }
 
+    @Override
+    public CompletableFuture<String> stopWithSavepoint(
+            final JobID jobId,
+            final String targetDirectory,
+            final long savepointTimeout,
+            final boolean terminate,
+            final Time timeout) {
+        return performOperationOnJobMasterGateway(
+                jobId,
+                gateway ->
+                        gateway.stopWithSavepoint(
+                                targetDirectory, savepointTimeout, terminate, timeout));

Review comment:
       Same here for `stopWithSavepoint`.

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/RestfulGateway.java
##########
@@ -145,19 +145,60 @@
         throw new UnsupportedOperationException();
     }
 
+    /**
+     * Triggers a savepoint with the given savepoint directory as a target.
+     *
+     * @param jobId ID of the job for which the savepoint should be triggered.
+     * @param targetDirectory Target directory for the savepoint.
+     * @param savepointTimeout Timeout for the savepoint. If it <= 0, checkpoint timeout will take
+     *     effect.
+     * @param timeout Timeout for the asynchronous operation
+     * @return A future to the {@link CompletedCheckpoint#getExternalPointer() external pointer} of
+     *     the savepoint.
+     */
+    default CompletableFuture<String> triggerSavepoint(
+            JobID jobId,
+            String targetDirectory,
+            long savepointTimeout,
+            boolean cancelJob,
+            @RpcTimeout Time timeout) {
+        throw new UnsupportedOperationException();
+    }
+
+    /**
+     * Stops the job with a savepoint.
+     *
+     * @param jobId ID of the job for which the savepoint should be triggered.
+     * @param targetDirectory to which to write the savepoint data or null if the default savepoint
+     *     directory should be used
+     * @param terminate flag indicating if the job should terminate or just suspend
+     * @param timeout for the rpc call
+     * @return Future which is completed with the savepoint path once completed
+     */
+    default CompletableFuture<String> stopWithSavepoint(
+            final JobID jobId,
+            final String targetDirectory,
+            final boolean terminate,
+            @RpcTimeout final Time timeout) {
+        throw new UnsupportedOperationException();
+    }

Review comment:
       Can we get rid of this method?

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/RestfulGateway.java
##########
@@ -145,19 +145,60 @@
         throw new UnsupportedOperationException();
     }

Review comment:
       Can we get rid of this method?

##########
File path: flink-tests/src/test/java/org/apache/flink/test/streaming/api/environment/RemoteStreamEnvironmentTest.java
##########
@@ -264,12 +264,27 @@ public String getWebInterfaceURL() {
             return null;
         }
 
+        @Override
+        public CompletableFuture<String> stopWithSavepoint(
+                JobID jobId,
+                boolean advanceToEndOfEventTime,
+                @Nullable String savepointDirectory,
+                long savepointTimeout) {
+            return null;
+        }
+
         @Override
         public CompletableFuture<String> triggerSavepoint(
                 JobID jobId, @Nullable String savepointDirectory) {
             return null;
         }
 
+        @Override
+        public CompletableFuture<String> triggerSavepoint(
+                JobID jobId, @Nullable String savepointDirectory, long savepointTimeout) {
+            return null;

Review comment:
       Let's return `FutureUtils.unsupportedOperationFuture()`.
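Returning a failed future instead of `null` means a caller that calls `join()` or chains further stages gets a clear `UnsupportedOperationException` rather than a `NullPointerException`. The helper below is a self-contained stand-in written for this example; in Flink code you would call `FutureUtils.unsupportedOperationFuture()` itself.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

/** Stand-in for FutureUtils.unsupportedOperationFuture(), so the example compiles on its own. */
final class UnsupportedFutureSketch {

    static <T> CompletableFuture<T> unsupportedOperationFuture() {
        CompletableFuture<T> future = new CompletableFuture<>();
        future.completeExceptionally(new UnsupportedOperationException());
        return future;
    }

    /** Test-double method that fails loudly instead of returning null. */
    static CompletableFuture<String> triggerSavepoint(
            String savepointDirectory, long savepointTimeoutMs) {
        return unsupportedOperationFuture();
    }

    public static void main(String[] args) {
        try {
            triggerSavepoint(null, 0L).join();
        } catch (CompletionException e) {
            // join() wraps the original failure in a CompletionException.
            System.out.println(e.getCause().getClass().getSimpleName());
        }
    }
}
```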

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java
##########
@@ -935,9 +941,16 @@ public void reportCheckpointMetrics(
         executionGraphHandler.reportCheckpointMetrics(attemptId, id, metrics);
     }
 
+    @Override
+    public CompletableFuture<String> stopWithSavepoint(String targetDirectory, boolean terminate) {
+        return stopWithSavepoint(targetDirectory, 0, terminate);
+    }
+
     @Override
     public CompletableFuture<String> stopWithSavepoint(
-            @Nullable final String targetDirectory, final boolean terminate) {
+            @Nullable final String targetDirectory,
+            final long savepointTimeout,

Review comment:
       Unit is missing.

##########
File path: flink-clients/src/main/java/org/apache/flink/client/cli/StopOptions.java
##########
@@ -57,6 +64,10 @@ String getTargetDirectory() {
         return targetDirectory;
     }
 
+    long getSavepointTimeout() {

Review comment:
       ```suggestion
       long getSavepointTimeoutMs() {
   ```

##########
File path: flink-clients/src/main/java/org/apache/flink/client/cli/StopOptions.java
##########
@@ -33,6 +34,8 @@
     /** Optional target directory for the savepoint. Overwrites cluster default. */
     private final String targetDirectory;
 
+    private final long savepointTimeout;

Review comment:
       ```suggestion
       private final long savepointTimeoutMs;
   ```

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
##########
@@ -469,12 +487,34 @@ public boolean isShutdown() {
         final CheckpointProperties properties =
                 CheckpointProperties.forSyncSavepoint(!unalignedCheckpointsEnabled, terminate);
 
-        return triggerSavepointInternal(properties, targetLocation);
+        return triggerSavepointInternal(properties, targetLocation, 0);
+    }
+
+    /**
+     * Triggers a synchronous savepoint with the given savepoint directory as a target.
+     *
+     * @param terminate flag indicating if the job should terminate or just suspend
+     * @param targetLocation Target location for the savepoint, optional. If null, the state
+     *     backend's configured default will be used.
+     * @param savepointTimeout Timeout for the savepoint. If it <= 0, checkpoint timeout will take

Review comment:
       Unit is missing.

##########
File path: flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java
##########
@@ -167,6 +187,20 @@
      */
     CompletableFuture<String> triggerSavepoint(JobID jobId, @Nullable String savepointDirectory);
 
+    /**
+     * Triggers a savepoint for the job identified by the job id. The savepoint will be written to
+     * the given savepoint directory, or {@link
+     * org.apache.flink.configuration.CheckpointingOptions#SAVEPOINT_DIRECTORY} if it is null.
+     *
+     * @param jobId job id
+     * @param savepointDirectory directory the savepoint should be written to
+     * @param savepointTimeout Timeout for the savepoint. If it <= 0, checkpoint timeout will take

Review comment:
       Unit description is missing.

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
##########
@@ -469,12 +487,34 @@ public boolean isShutdown() {
         final CheckpointProperties properties =
                 CheckpointProperties.forSyncSavepoint(!unalignedCheckpointsEnabled, terminate);
 
-        return triggerSavepointInternal(properties, targetLocation);
+        return triggerSavepointInternal(properties, targetLocation, 0);
+    }
+
+    /**
+     * Triggers a synchronous savepoint with the given savepoint directory as a target.
+     *
+     * @param terminate flag indicating if the job should terminate or just suspend
+     * @param targetLocation Target location for the savepoint, optional. If null, the state
+     *     backend's configured default will be used.
+     * @param savepointTimeout Timeout for the savepoint. If it <= 0, checkpoint timeout will take
+     *     effect.
+     * @return A future to the completed checkpoint
+     * @throws IllegalStateException If no savepoint directory has been specified and no default
+     *     savepoint directory has been configured
+     */
+    public CompletableFuture<CompletedCheckpoint> triggerSynchronousSavepoint(
+            final boolean terminate, @Nullable final String targetLocation, long savepointTimeout) {
+
+        final CheckpointProperties properties =
+                CheckpointProperties.forSyncSavepoint(!unalignedCheckpointsEnabled, terminate);
+
+        return triggerSavepointInternal(properties, targetLocation, savepointTimeout);
     }
 
     private CompletableFuture<CompletedCheckpoint> triggerSavepointInternal(
             final CheckpointProperties checkpointProperties,
-            @Nullable final String targetLocation) {
+            @Nullable final String targetLocation,
+            long savepointTimeout) {

Review comment:
       ```suggestion
               long savepointTimeoutMs) {
   ```

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
##########
@@ -706,6 +706,21 @@ JobManagerRunner createJobManagerRunner(JobGraph jobGraph, long initializationTi
                 jobId, gateway -> gateway.triggerSavepoint(targetDirectory, cancelJob, timeout));
     }
 
+    @Override
+    public CompletableFuture<String> triggerSavepoint(
+            JobID jobId,
+            String targetDirectory,
+            long savepointTimeout,
+            boolean cancelJob,
+            Time timeout) {

Review comment:
       Can we replace the old `triggerSavepoint` method completely?

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java
##########
@@ -256,6 +272,23 @@ void failSlot(
             final boolean terminate,
             @RpcTimeout final Time timeout);
 
+    /**
+     * Stops the job with a savepoint.
+     *
+     * @param targetDirectory to which to write the savepoint data or null if the default savepoint
+     *     directory should be used
+     * @param savepointTimeout Timeout for the savepoint. If it <= 0, checkpoint timeout will take
+     *     effect.
+     * @param terminate flag indicating if the job should terminate or just suspend
+     * @param timeout for the rpc call
+     * @return Future which is completed with the savepoint path once completed
+     */
+    CompletableFuture<String> stopWithSavepoint(
+            @Nullable final String targetDirectory,
+            final long savepointTimeout,
+            final boolean terminate,
+            @RpcTimeout final Time timeout);

Review comment:
       And `stopWithSavepoint`.

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
##########
@@ -800,7 +809,17 @@ private boolean isConnectingToResourceManager(ResourceManagerId resourceManagerI
     public CompletableFuture<String> stopWithSavepoint(
             @Nullable final String targetDirectory, final boolean terminate, final Time timeout) {
 
-        return schedulerNG.stopWithSavepoint(targetDirectory, terminate);
+        return stopWithSavepoint(targetDirectory, 0, terminate, timeout);
+    }
+
+    @Override
+    public CompletableFuture<String> stopWithSavepoint(
+            @Nullable final String targetDirectory,
+            final long savepointTimeout,
+            final boolean terminate,
+            final Time timeout) {
+
+        return schedulerNG.stopWithSavepoint(targetDirectory, savepointTimeout, terminate);

Review comment:
       Same here.

##########
File path: flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontendParser.java
##########
@@ -132,6 +132,9 @@
     static final Option SAVEPOINT_DISPOSE_OPTION =
             new Option("d", "dispose", true, "Path of savepoint to dispose.");
 
+    static final Option SAVEPOINT_TIMEOUT_OPTION =
+            new Option("st", "savepointTimeout", true, "timeout of savepoint.");

Review comment:
       What is the unit of the savepoint timeout? ms? If so, I would suggest making this explicit via `savepointTimeoutMs` and also mentioning the unit in the description.
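If the unit is milliseconds, the CLI side could parse the raw value once and hand a typed value onwards. A minimal sketch, assuming a hypothetical `--savepointTimeoutMs` option whose value must be a positive integer number of milliseconds; the option name and the validation rules are illustrative only.

```java
import java.time.Duration;

/** Hypothetical parser for a millisecond-valued CLI option; not Flink's CliFrontendParser. */
final class SavepointTimeoutOptionSketch {

    /** Returns null when the option was not given, so the caller can use the checkpoint timeout. */
    static Duration parseSavepointTimeoutMs(String rawValue) {
        if (rawValue == null) {
            return null;
        }
        final long millis;
        try {
            millis = Long.parseLong(rawValue.trim());
        } catch (NumberFormatException e) {
            throw new IllegalArgumentException(
                    "savepoint timeout must be an integer number of milliseconds: " + rawValue, e);
        }
        if (millis <= 0) {
            throw new IllegalArgumentException("savepoint timeout must be positive: " + millis);
        }
        return Duration.ofMillis(millis);
    }

    public static void main(String[] args) {
        System.out.println(parseSavepointTimeoutMs("600000")); // PT10M
        System.out.println(parseSavepointTimeoutMs(null));
    }
}
```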

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
##########
@@ -788,7 +788,16 @@ private boolean isConnectingToResourceManager(ResourceManagerId resourceManagerI
     public CompletableFuture<String> triggerSavepoint(
             @Nullable final String targetDirectory, final boolean cancelJob, final Time timeout) {
 
-        return schedulerNG.triggerSavepoint(targetDirectory, cancelJob);
+        return triggerSavepoint(targetDirectory, 0, cancelJob, timeout);
+    }
+
+    @Override
+    public CompletableFuture<String> triggerSavepoint(
+            @Nullable String targetDirectory,
+            long savepointTimeout,
+            boolean cancelJob,
+            Time timeout) {
+        return schedulerNG.triggerSavepoint(targetDirectory, savepointTimeout, cancelJob);

Review comment:
       Can we replace the old `triggerSavepoint` method here? This should not be public API.

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java
##########
@@ -935,9 +941,16 @@ public void reportCheckpointMetrics(
         executionGraphHandler.reportCheckpointMetrics(attemptId, id, metrics);
     }
 
+    @Override
+    public CompletableFuture<String> stopWithSavepoint(String targetDirectory, boolean terminate) {
+        return stopWithSavepoint(targetDirectory, 0, terminate);
+    }
+
     @Override
     public CompletableFuture<String> stopWithSavepoint(
-            @Nullable final String targetDirectory, final boolean terminate) {
+            @Nullable final String targetDirectory,
+            final long savepointTimeout,
+            final boolean terminate) {

Review comment:
       When we have overloaded methods, I like to keep the parameter prefix the same across all variants, since I think this makes them easier to use. So in this case, I would put `savepointTimeout` at the end.
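A minimal sketch of the ordering described above, using a hypothetical `SavepointStopper` interface rather than the real `SchedulerNG`. Both variants share the `(targetDirectory, terminate)` prefix, the new parameter is appended last, and the short variant delegates with `0`, which this PR uses to mean "fall back to the checkpoint timeout".

```java
import java.util.concurrent.CompletableFuture;

/** Hypothetical stand-in for the scheduler interface; not Flink's actual SchedulerNG. */
interface SavepointStopper {

    /** Short variant: the shared prefix (targetDirectory, terminate) comes first. */
    default CompletableFuture<String> stopWithSavepoint(String targetDirectory, boolean terminate) {
        // 0 means "no separate savepoint timeout", i.e. fall back to the checkpoint timeout.
        return stopWithSavepoint(targetDirectory, terminate, 0L);
    }

    /** Long variant: same prefix, with the new timeout parameter appended at the end. */
    CompletableFuture<String> stopWithSavepoint(
            String targetDirectory, boolean terminate, long savepointTimeoutMs);
}

/** Toy implementation that echoes its arguments so the delegation can be observed. */
class EchoingSavepointStopper implements SavepointStopper {
    @Override
    public CompletableFuture<String> stopWithSavepoint(
            String targetDirectory, boolean terminate, long savepointTimeoutMs) {
        return CompletableFuture.completedFuture(
                targetDirectory + "|" + terminate + "|" + savepointTimeoutMs);
    }
}
```

With this ordering, a call site can switch from the short to the long variant by appending one argument, without reordering the existing ones.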

##########
File path: docs/content.zh/docs/deployment/cli.md
##########
@@ -121,6 +122,8 @@ You can resume your program from this savepoint with the run command.
 The savepoint folder is optional and needs to be specified if 
 [state.savepoints.dir]({{< ref "docs/deployment/config" >}}#state-savepoints-dir) isn't set.
 
+The savepoint timeout is optional and checkpoint timeout will take effect if isn't set.

Review comment:
       ```suggestion
   The savepoint timeout `--savepointTimeout` is optional; if it isn't set, the checkpoint timeout takes effect.
   ```

##########
File path: flink-clients/src/main/java/org/apache/flink/client/cli/SavepointOptions.java
##########
@@ -30,13 +31,18 @@
     private boolean dispose;
     private String disposeSavepointPath;
     private String jarFile;
+    private long savepointTimeout;

Review comment:
       Since we are not storing the unit explicitly:
   
   ```suggestion
       private long savepointTimeoutMs;
   ```

##########
File path: docs/content.zh/docs/deployment/cli.md
##########
@@ -172,6 +176,8 @@ Savepoint completed. Path: file:/tmp/flink-savepoints/savepoint-cca7bc-bb1e257f0
 We have to use `--savepointPath` to specify the savepoint folder if 
 [state.savepoints.dir]({{< ref "docs/deployment/config" >}}#state-savepoints-dir) isn't set.
 
+The savepoint timeout is optional and checkpoint timeout will take effect if isn't set.

Review comment:
       ```suggestion
   The savepoint timeout `--savepointTimeout` is optional; if it isn't set, the checkpoint timeout takes effect.
   ```

##########
File path: flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontendParser.java
##########
@@ -132,6 +132,9 @@
     static final Option SAVEPOINT_DISPOSE_OPTION =
             new Option("d", "dispose", true, "Path of savepoint to dispose.");
 
+    static final Option SAVEPOINT_TIMEOUT_OPTION =
+            new Option("st", "savepointTimeout", true, "timeout of savepoint.");

Review comment:
       ```suggestion
               new Option("st", "savepointTimeout", true, "The maximum completion time a savepoint is allowed to take before it is failed.");
   ```
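A stdlib-only sketch of how the CLI value could be interpreted once the unit is fixed to milliseconds. `SavepointTimeoutOption`, its description text and `parseSavepointTimeoutMs` are hypothetical; the Commons CLI lookup (e.g. `CommandLine#getOptionValue`) is left out, and an absent option maps to `0` so the checkpoint timeout applies, matching the fallback rule in this PR.

```java
/** Hypothetical parsing helper for a "--savepointTimeout" value given in milliseconds. */
final class SavepointTimeoutOption {

    /** Option description that states the unit explicitly (wording is an assumption). */
    static final String DESCRIPTION =
            "The maximum time in milliseconds a savepoint is allowed to take before it is failed. "
                    + "If not set, the checkpoint timeout applies.";

    private SavepointTimeoutOption() {}

    /** Returns the timeout in ms, or 0 when the option is absent (use the checkpoint timeout). */
    static long parseSavepointTimeoutMs(String rawValue) {
        if (rawValue == null || rawValue.trim().isEmpty()) {
            return 0L; // option not given: 0 signals "fall back to the checkpoint timeout"
        }
        try {
            return Long.parseLong(rawValue.trim());
        } catch (NumberFormatException e) {
            // Reject values such as "30s": the option only accepts a plain number of milliseconds.
            throw new IllegalArgumentException(
                    "savepointTimeout must be a number of milliseconds, got: " + rawValue, e);
        }
    }
}
```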

##########
File path: docs/content/docs/deployment/cli.md
##########
@@ -170,6 +174,8 @@ Savepoint completed. Path: file:/tmp/flink-savepoints/savepoint-cca7bc-bb1e257f0
 We have to use `--savepointPath` to specify the savepoint folder if 
 [state.savepoints.dir]({{< ref "docs/deployment/config" >}}#state-savepoints-dir) isn't set.
 
+The savepoint timeout is optional and checkpoint timeout will take effect if isn't set.

Review comment:
       ```suggestion
   The savepoint timeout `--savepointTimeout` is optional; if it isn't set, the checkpoint timeout takes effect.
   ```

##########
File path: flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java
##########
@@ -167,6 +187,20 @@
      */
     CompletableFuture<String> triggerSavepoint(JobID jobId, @Nullable String savepointDirectory);
 
+    /**
+     * Triggers a savepoint for the job identified by the job id. The savepoint will be written to
+     * the given savepoint directory, or {@link
+     * org.apache.flink.configuration.CheckpointingOptions#SAVEPOINT_DIRECTORY} if it is null.
+     *
+     * @param jobId job id
+     * @param savepointDirectory directory the savepoint should be written to
+     * @param savepointTimeout Timeout for the savepoint. If it <= 0, checkpoint timeout will take
+     *     effect.
+     * @return path future where the savepoint is located
+     */
+    CompletableFuture<String> triggerSavepoint(
+            JobID jobId, @Nullable String savepointDirectory, long savepointTimeout);

Review comment:
       ```suggestion
               JobID jobId, @Nullable String savepointDirectory, long savepointTimeoutMs);
   ```

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
##########
@@ -469,12 +487,34 @@ public boolean isShutdown() {
         final CheckpointProperties properties =
                 CheckpointProperties.forSyncSavepoint(!unalignedCheckpointsEnabled, terminate);
 
-        return triggerSavepointInternal(properties, targetLocation);
+        return triggerSavepointInternal(properties, targetLocation, 0);
+    }
+
+    /**
+     * Triggers a synchronous savepoint with the given savepoint directory as a target.
+     *
+     * @param terminate flag indicating if the job should terminate or just suspend
+     * @param targetLocation Target location for the savepoint, optional. If null, the state
+     *     backend's configured default will be used.
+     * @param savepointTimeout Timeout for the savepoint. If it <= 0, checkpoint timeout will take
+     *     effect.
+     * @return A future to the completed checkpoint
+     * @throws IllegalStateException If no savepoint directory has been specified and no default
+     *     savepoint directory has been configured
+     */
+    public CompletableFuture<CompletedCheckpoint> triggerSynchronousSavepoint(
+            final boolean terminate, @Nullable final String targetLocation, long savepointTimeout) {

Review comment:
       ```suggestion
               final boolean terminate, @Nullable final String targetLocation, long savepointTimeoutMs) {
   ```

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
##########
@@ -514,6 +558,16 @@ public boolean isShutdown() {
             @Nullable String externalSavepointLocation,
             boolean isPeriodic) {
 
+        return triggerCheckpoint(props, externalSavepointLocation, 0, isPeriodic);
+    }
+
+    @VisibleForTesting
+    public CompletableFuture<CompletedCheckpoint> triggerCheckpoint(
+            CheckpointProperties props,
+            @Nullable String externalSavepointLocation,
+            long savepointTimeout,

Review comment:
       ```suggestion
               long savepointTimeoutMs,
   ```

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
##########
@@ -450,7 +450,25 @@ public boolean isShutdown() {
             @Nullable final String targetLocation) {
         final CheckpointProperties properties =
                 CheckpointProperties.forSavepoint(!unalignedCheckpointsEnabled);
-        return triggerSavepointInternal(properties, targetLocation);
+        return triggerSavepointInternal(properties, targetLocation, 0);

Review comment:
       I am wondering whether it would be clearer if we passed `checkpointTimeout` instead of `0` here.
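One way to read this suggestion, sketched with hypothetical names: instead of passing a `0` sentinel down and resolving it deep inside the coordinator, the effective timeout is resolved once. `SavepointTimeouts.resolveTimeoutMs` and the `checkpointTimeoutMs` argument are illustrative assumptions; the `<= 0` fallback rule is the one documented in this PR's Javadoc.

```java
/** Hypothetical helper showing how a "<= 0 means default" savepoint timeout resolves. */
final class SavepointTimeouts {

    private SavepointTimeouts() {}

    /**
     * Returns the timeout the savepoint should actually use: the explicit savepoint timeout if it
     * is positive, otherwise the configured checkpoint timeout.
     */
    static long resolveTimeoutMs(long savepointTimeoutMs, long checkpointTimeoutMs) {
        return savepointTimeoutMs > 0 ? savepointTimeoutMs : checkpointTimeoutMs;
    }
}
```

If the overload without a savepoint timeout passed `checkpointTimeout` directly, the internal method would always receive a real value and would not need to interpret `0` at all.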

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
##########
@@ -2069,9 +2127,19 @@ private static CheckpointException getCheckpointException(
                 @Nullable String externalSavepointLocation,
                 boolean isPeriodic) {
 
+            this(props, externalSavepointLocation, 0, isPeriodic);
+        }
+
+        CheckpointTriggerRequest(
+                CheckpointProperties props,
+                @Nullable String externalSavepointLocation,
+                long savepointTimeout,
+                boolean isPeriodic) {
+
             this.timestamp = System.currentTimeMillis();
             this.props = checkNotNull(props);
             this.externalSavepointLocation = externalSavepointLocation;
+            this.savepointTimeout = savepointTimeout;

Review comment:
       Either use `Duration` or put the unit in the name.

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java
##########
@@ -234,6 +234,22 @@ void failSlot(
             final boolean cancelJob,
             @RpcTimeout final Time timeout);
 
+    /**
+     * Triggers taking a savepoint of the executed job.
+     *
+     * @param targetDirectory to which to write the savepoint data or null if the default savepoint
+     *     directory should be used
+     * @param savepointTimeout Timeout for the savepoint. If it <= 0, checkpoint timeout will take
+     *     effect.
+     * @param timeout for the rpc call
+     * @return Future which is completed with the savepoint path once completed
+     */
+    CompletableFuture<String> triggerSavepoint(
+            @Nullable final String targetDirectory,
+            final long savepointTimeout,
+            final boolean cancelJob,
+            @RpcTimeout final Time timeout);

Review comment:
       Let's remove the old `triggerSavepoint` method.

##########
File path: flink-tests/src/test/java/org/apache/flink/test/streaming/api/environment/RemoteStreamEnvironmentTest.java
##########
@@ -264,12 +264,27 @@ public String getWebInterfaceURL() {
             return null;
         }
 
+        @Override
+        public CompletableFuture<String> stopWithSavepoint(
+                JobID jobId,
+                boolean advanceToEndOfEventTime,
+                @Nullable String savepointDirectory,
+                long savepointTimeout) {
+            return null;

Review comment:
       Same here: `FutureUtils.unsupportedOperationFuture()`.

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
##########
@@ -450,7 +450,25 @@ public boolean isShutdown() {
             @Nullable final String targetLocation) {
         final CheckpointProperties properties =
                 CheckpointProperties.forSavepoint(!unalignedCheckpointsEnabled);
-        return triggerSavepointInternal(properties, targetLocation);
+        return triggerSavepointInternal(properties, targetLocation, 0);
+    }
+
+    /**
+     * Triggers a savepoint with the given savepoint directory as a target.
+     *
+     * @param targetLocation Target location for the savepoint, optional. If null, the state
+     *     backend's configured default will be used.
+     * @param savepointTimeout Timeout for the savepoint. If it <= 0, checkpoint timeout will take
+     *     effect.
+     * @return A future to the completed checkpoint
+     * @throws IllegalStateException If no savepoint directory has been specified and no default
+     *     savepoint directory has been configured
+     */
+    public CompletableFuture<CompletedCheckpoint> triggerSavepoint(
+            @Nullable final String targetLocation, final long savepointTimeout) {

Review comment:
       If we used a `Duration` type for the `CheckpointCoordinator`, we wouldn't have to rely on users of this class providing the timeout in the correct unit.
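A sketch of the `Duration` variant, assuming a hypothetical `SavepointRequest` value class: the unit travels with the type, so milliseconds only appear at the boundary where a raw number (for example the CLI value) is converted. Modelling "no explicit timeout" as an empty `Optional` instead of `<= 0` is also an assumption, not what this PR does.

```java
import java.time.Duration;
import java.util.Objects;
import java.util.Optional;

/** Hypothetical value class; the timeout's unit is part of its type. */
final class SavepointRequest {
    private final String targetLocation; // null: use the configured default directory
    private final Duration savepointTimeout; // null: use the checkpoint timeout

    SavepointRequest(String targetLocation, Duration savepointTimeout) {
        if (savepointTimeout != null
                && (savepointTimeout.isNegative() || savepointTimeout.isZero())) {
            throw new IllegalArgumentException(
                    "savepoint timeout must be positive: " + savepointTimeout);
        }
        this.targetLocation = targetLocation;
        this.savepointTimeout = savepointTimeout;
    }

    /** Boundary conversion from a raw millisecond value; values <= 0 mean "not set". */
    static SavepointRequest fromMillis(String targetLocation, long savepointTimeoutMs) {
        return new SavepointRequest(
                targetLocation,
                savepointTimeoutMs > 0 ? Duration.ofMillis(savepointTimeoutMs) : null);
    }

    String getTargetLocation() {
        return targetLocation;
    }

    Optional<Duration> getSavepointTimeout() {
        return Optional.ofNullable(savepointTimeout);
    }

    /** Effective timeout: the explicit one if present, otherwise the checkpoint timeout. */
    Duration effectiveTimeout(Duration checkpointTimeout) {
        return getSavepointTimeout().orElse(Objects.requireNonNull(checkpointTimeout));
    }
}
```

Callers inside the coordinator would then work only with `Duration` and convert to milliseconds (via `Duration#toMillis()`) at the single point where a timer is scheduled.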

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java
##########
@@ -234,6 +234,22 @@ void failSlot(
             final boolean cancelJob,
             @RpcTimeout final Time timeout);
 
+    /**
+     * Triggers taking a savepoint of the executed job.
+     *
+     * @param targetDirectory to which to write the savepoint data or null if the default savepoint
+     *     directory should be used
+     * @param savepointTimeout Timeout for the savepoint. If it <= 0, checkpoint timeout will take
+     *     effect.
+     * @param timeout for the rpc call
+     * @return Future which is completed with the savepoint path once completed
+     */
+    CompletableFuture<String> triggerSavepoint(
+            @Nullable final String targetDirectory,
+            final long savepointTimeout,
+            final boolean cancelJob,
+            @RpcTimeout final Time timeout);

Review comment:
       Alternatively, we could rename the old method to something like `triggerSavepointWithDefaultTimeout`.

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
##########
@@ -450,7 +450,25 @@ public boolean isShutdown() {
             @Nullable final String targetLocation) {
         final CheckpointProperties properties =
                 CheckpointProperties.forSavepoint(!unalignedCheckpointsEnabled);
-        return triggerSavepointInternal(properties, targetLocation);
+        return triggerSavepointInternal(properties, targetLocation, 0);
+    }
+
+    /**
+     * Triggers a savepoint with the given savepoint directory as a target.
+     *
+     * @param targetLocation Target location for the savepoint, optional. If null, the state
+     *     backend's configured default will be used.
+     * @param savepointTimeout Timeout for the savepoint. If it <= 0, checkpoint timeout will take

Review comment:
       Unit is missing.

##########
File path: flink-clients/src/main/java/org/apache/flink/client/cli/SavepointOptions.java
##########
@@ -54,4 +60,8 @@ public String getSavepointPath() {
     public String getJarFilePath() {
         return jarFile;
     }
+
+    public long getSavepointTimeout() {

Review comment:
       ```suggestion
       public long getSavepointTimeoutMs() {
   ```

##########
File path: flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java
##########
@@ -156,6 +156,26 @@
             final boolean advanceToEndOfEventTime,
             @Nullable final String savepointDirectory);
 
+    /**
+     * Stops a program on Flink cluster whose job-manager is configured in this client's
+     * configuration. Stopping works only for streaming programs. Be aware, that the program might
+     * continue to run for a while after sending the stop command, because after sources stopped to
+     * emit data all operators need to finish processing.
+     *
+     * @param jobId the job ID of the streaming program to stop
+     * @param advanceToEndOfEventTime flag indicating if the source should inject a {@code
+     *     MAX_WATERMARK} in the pipeline
+     * @param savepointDirectory directory the savepoint should be written to
+     * @param savepointTimeout Timeout for the savepoint. If it <= 0, checkpoint timeout will take
+     *     effect.
+     * @return a {@link CompletableFuture} containing the path where the savepoint is located
+     */
+    CompletableFuture<String> stopWithSavepoint(
+            final JobID jobId,
+            final boolean advanceToEndOfEventTime,
+            @Nullable final String savepointDirectory,
+            final long savepointTimeout);

Review comment:
       ```suggestion
               final long savepointTimeoutMs);
   ```

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
##########
@@ -450,7 +450,25 @@ public boolean isShutdown() {
             @Nullable final String targetLocation) {
         final CheckpointProperties properties =
                 CheckpointProperties.forSavepoint(!unalignedCheckpointsEnabled);
-        return triggerSavepointInternal(properties, targetLocation);
+        return triggerSavepointInternal(properties, targetLocation, 0);
+    }
+
+    /**
+     * Triggers a savepoint with the given savepoint directory as a target.
+     *
+     * @param targetLocation Target location for the savepoint, optional. If null, the state
+     *     backend's configured default will be used.
+     * @param savepointTimeout Timeout for the savepoint. If it <= 0, checkpoint timeout will take
+     *     effect.
+     * @return A future to the completed checkpoint
+     * @throws IllegalStateException If no savepoint directory has been specified and no default
+     *     savepoint directory has been configured
+     */
+    public CompletableFuture<CompletedCheckpoint> triggerSavepoint(
+            @Nullable final String targetLocation, final long savepointTimeout) {

Review comment:
       ```suggestion
               @Nullable final String targetLocation, final long savepointTimeoutMs) {
   ```

##########
File path: docs/content/docs/deployment/cli.md
##########
@@ -119,6 +120,8 @@ You can resume your program from this savepoint with the run command.
 The savepoint folder is optional and needs to be specified if 
 [state.savepoints.dir]({{< ref "docs/deployment/config" >}}#state-savepoints-dir) isn't set.
 
+The savepoint timeout is optional and checkpoint timeout will take effect if isn't set.

Review comment:
       ```suggestion
   The savepoint timeout `--savepointTimeout` is optional; if it isn't set, the checkpoint timeout takes effect.
   ```

##########
File path: flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java
##########
@@ -156,6 +156,26 @@
             final boolean advanceToEndOfEventTime,
             @Nullable final String savepointDirectory);
 
+    /**
+     * Stops a program on Flink cluster whose job-manager is configured in this client's
+     * configuration. Stopping works only for streaming programs. Be aware, that the program might
+     * continue to run for a while after sending the stop command, because after sources stopped to
+     * emit data all operators need to finish processing.
+     *
+     * @param jobId the job ID of the streaming program to stop
+     * @param advanceToEndOfEventTime flag indicating if the source should inject a {@code
+     *     MAX_WATERMARK} in the pipeline
+     * @param savepointDirectory directory the savepoint should be written to
+     * @param savepointTimeout Timeout for the savepoint. If it <= 0, checkpoint timeout will take

Review comment:
       Unit description is missing.

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
##########
@@ -716,6 +731,20 @@ JobManagerRunner createJobManagerRunner(JobGraph jobGraph, long initializationTi
                 jobId, gateway -> gateway.stopWithSavepoint(targetDirectory, terminate, timeout));
     }
 
+    @Override
+    public CompletableFuture<String> stopWithSavepoint(
+            final JobID jobId,
+            final String targetDirectory,
+            final long savepointTimeout,
+            final boolean terminate,
+            final Time timeout) {
+        return performOperationOnJobMasterGateway(
+                jobId,
+                gateway ->
+                        gateway.stopWithSavepoint(
+                                targetDirectory, savepointTimeout, terminate, timeout));

Review comment:
       Same here for `stopWithSavepoint`.

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/RestfulGateway.java
##########
@@ -145,19 +145,60 @@
         throw new UnsupportedOperationException();
     }
 
+    /**
+     * Triggers a savepoint with the given savepoint directory as a target.
+     *
+     * @param jobId ID of the job for which the savepoint should be triggered.
+     * @param targetDirectory Target directory for the savepoint.
+     * @param savepointTimeout Timeout for the savepoint. If it <= 0, checkpoint timeout will take
+     *     effect.
+     * @param timeout Timeout for the asynchronous operation
+     * @return A future to the {@link CompletedCheckpoint#getExternalPointer() external pointer} of
+     *     the savepoint.
+     */
+    default CompletableFuture<String> triggerSavepoint(
+            JobID jobId,
+            String targetDirectory,
+            long savepointTimeout,
+            boolean cancelJob,
+            @RpcTimeout Time timeout) {
+        throw new UnsupportedOperationException();
+    }
+
+    /**
+     * Stops the job with a savepoint.
+     *
+     * @param jobId ID of the job for which the savepoint should be triggered.
+     * @param targetDirectory to which to write the savepoint data or null if the default savepoint
+     *     directory should be used
+     * @param terminate flag indicating if the job should terminate or just suspend
+     * @param timeout for the rpc call
+     * @return Future which is completed with the savepoint path once completed
+     */
+    default CompletableFuture<String> stopWithSavepoint(
+            final JobID jobId,
+            final String targetDirectory,
+            final boolean terminate,
+            @RpcTimeout final Time timeout) {
+        throw new UnsupportedOperationException();
+    }

Review comment:
       Can we get rid of this method?

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/RestfulGateway.java
##########
@@ -145,19 +145,60 @@
         throw new UnsupportedOperationException();
     }

Review comment:
       Can we get rid of this method?

##########
File path: flink-tests/src/test/java/org/apache/flink/test/streaming/api/environment/RemoteStreamEnvironmentTest.java
##########
@@ -264,12 +264,27 @@ public String getWebInterfaceURL() {
             return null;
         }
 
+        @Override
+        public CompletableFuture<String> stopWithSavepoint(
+                JobID jobId,
+                boolean advanceToEndOfEventTime,
+                @Nullable String savepointDirectory,
+                long savepointTimeout) {
+            return null;
+        }
+
         @Override
         public CompletableFuture<String> triggerSavepoint(
                 JobID jobId, @Nullable String savepointDirectory) {
             return null;
         }
 
+        @Override
+        public CompletableFuture<String> triggerSavepoint(
+                JobID jobId, @Nullable String savepointDirectory, long savepointTimeout) {
+            return null;

Review comment:
       Let's return `FutureUtils.unsupportedOperationFuture()`.
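A stdlib-only sketch of why this matters. Returning `null` from a method typed `CompletableFuture<String>` makes any caller that chains on the result fail with a `NullPointerException`, whereas an exceptionally completed future reports the unsupported call through the normal future API. `UnsupportedFutures.unsupportedOperationFuture` below is a hypothetical stand-in written for this sketch, not Flink's `FutureUtils` code, and `StubClusterClient` simplifies the test stub (a `String` job id instead of `JobID`).

```java
import java.util.concurrent.CompletableFuture;

/** Hypothetical helper mirroring the idea of an "unsupported operation" future. */
final class UnsupportedFutures {

    private UnsupportedFutures() {}

    /** Returns a future that is already completed with an UnsupportedOperationException. */
    static <T> CompletableFuture<T> unsupportedOperationFuture() {
        // Built by hand so it also compiles on Java 8, which lacks CompletableFuture.failedFuture.
        CompletableFuture<T> future = new CompletableFuture<>();
        future.completeExceptionally(new UnsupportedOperationException());
        return future;
    }
}

/** Hypothetical, simplified test stub in the style of the one in RemoteStreamEnvironmentTest. */
class StubClusterClient {
    CompletableFuture<String> triggerSavepoint(
            String jobId, String savepointDirectory, long savepointTimeoutMs) {
        return UnsupportedFutures.unsupportedOperationFuture();
    }
}
```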

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java
##########
@@ -935,9 +941,16 @@ public void reportCheckpointMetrics(
         executionGraphHandler.reportCheckpointMetrics(attemptId, id, metrics);
     }
 
+    @Override
+    public CompletableFuture<String> stopWithSavepoint(String targetDirectory, boolean terminate) {
+        return stopWithSavepoint(targetDirectory, 0, terminate);
+    }
+
     @Override
     public CompletableFuture<String> stopWithSavepoint(
-            @Nullable final String targetDirectory, final boolean terminate) {
+            @Nullable final String targetDirectory,
+            final long savepointTimeout,

Review comment:
       Unit is missing.

##########
File path: flink-clients/src/main/java/org/apache/flink/client/cli/StopOptions.java
##########
@@ -57,6 +64,10 @@ String getTargetDirectory() {
         return targetDirectory;
     }
 
+    long getSavepointTimeout() {

Review comment:
       ```suggestion
       long getSavepointTimeoutMs() {
   ```

##########
File path: flink-clients/src/main/java/org/apache/flink/client/cli/StopOptions.java
##########
@@ -33,6 +34,8 @@
     /** Optional target directory for the savepoint. Overwrites cluster default. */
     private final String targetDirectory;
 
+    private final long savepointTimeout;

Review comment:
       ```suggestion
       private final long savepointTimeoutMs;
   ```

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
##########
@@ -469,12 +487,34 @@ public boolean isShutdown() {
         final CheckpointProperties properties =
                 CheckpointProperties.forSyncSavepoint(!unalignedCheckpointsEnabled, terminate);
 
-        return triggerSavepointInternal(properties, targetLocation);
+        return triggerSavepointInternal(properties, targetLocation, 0);
+    }
+
+    /**
+     * Triggers a synchronous savepoint with the given savepoint directory as a target.
+     *
+     * @param terminate flag indicating if the job should terminate or just suspend
+     * @param targetLocation Target location for the savepoint, optional. If null, the state
+     *     backend's configured default will be used.
+     * @param savepointTimeout Timeout for the savepoint. If it <= 0, checkpoint timeout will take

Review comment:
       Unit is missing.

##########
File path: flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java
##########
@@ -167,6 +187,20 @@
      */
     CompletableFuture<String> triggerSavepoint(JobID jobId, @Nullable String savepointDirectory);
 
+    /**
+     * Triggers a savepoint for the job identified by the job id. The savepoint will be written to
+     * the given savepoint directory, or {@link
+     * org.apache.flink.configuration.CheckpointingOptions#SAVEPOINT_DIRECTORY} if it is null.
+     *
+     * @param jobId job id
+     * @param savepointDirectory directory the savepoint should be written to
+     * @param savepointTimeout Timeout for the savepoint. If it <= 0, checkpoint timeout will take

Review comment:
       Unit description is missing.

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
##########
@@ -469,12 +487,34 @@ public boolean isShutdown() {
         final CheckpointProperties properties =
                 CheckpointProperties.forSyncSavepoint(!unalignedCheckpointsEnabled, terminate);
 
-        return triggerSavepointInternal(properties, targetLocation);
+        return triggerSavepointInternal(properties, targetLocation, 0);
+    }
+
+    /**
+     * Triggers a synchronous savepoint with the given savepoint directory as a target.
+     *
+     * @param terminate flag indicating if the job should terminate or just suspend
+     * @param targetLocation Target location for the savepoint, optional. If null, the state
+     *     backend's configured default will be used.
+     * @param savepointTimeout Timeout for the savepoint. If it <= 0, checkpoint timeout will take
+     *     effect.
+     * @return A future to the completed checkpoint
+     * @throws IllegalStateException If no savepoint directory has been specified and no default
+     *     savepoint directory has been configured
+     */
+    public CompletableFuture<CompletedCheckpoint> triggerSynchronousSavepoint(
+            final boolean terminate, @Nullable final String targetLocation, long savepointTimeout) {
+
+        final CheckpointProperties properties =
+                CheckpointProperties.forSyncSavepoint(!unalignedCheckpointsEnabled, terminate);
+
+        return triggerSavepointInternal(properties, targetLocation, savepointTimeout);
     }
 
     private CompletableFuture<CompletedCheckpoint> triggerSavepointInternal(
             final CheckpointProperties checkpointProperties,
-            @Nullable final String targetLocation) {
+            @Nullable final String targetLocation,
+            long savepointTimeout) {

Review comment:
       ```suggestion
               long savepointTimeoutMs) {
   ```
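The PR description says the new parameter is used as the canceller's trigger delay when it is positive. A rough, self-contained sketch of that idea with a `ScheduledExecutorService`. `PendingSavepoint`, `scheduleCanceller` and the expiry behaviour are hypothetical and far simpler than the real pending-checkpoint handling in `CheckpointCoordinator`.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/** Hypothetical, heavily simplified pending savepoint with a timeout-based canceller. */
final class PendingSavepoint {
    final CompletableFuture<String> completion = new CompletableFuture<>();
    private ScheduledFuture<?> canceller;

    /** Expires the savepoint after savepointTimeoutMs if positive, else after checkpointTimeoutMs. */
    void scheduleCanceller(
            ScheduledExecutorService timer, long savepointTimeoutMs, long checkpointTimeoutMs) {
        long delayMs = savepointTimeoutMs > 0 ? savepointTimeoutMs : checkpointTimeoutMs;
        canceller =
                timer.schedule(
                        () -> {
                            // Fails the savepoint unless it was acknowledged first.
                            completion.completeExceptionally(
                                    new TimeoutException(
                                            "savepoint expired after " + delayMs + " ms"));
                        },
                        delayMs,
                        TimeUnit.MILLISECONDS);
    }

    /** Completes the savepoint and stops the canceller from firing. */
    void acknowledge(String path) {
        if (completion.complete(path) && canceller != null) {
            canceller.cancel(false);
        }
    }
}
```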

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
##########
@@ -706,6 +706,21 @@ JobManagerRunner createJobManagerRunner(JobGraph jobGraph, long initializationTi
                 jobId, gateway -> gateway.triggerSavepoint(targetDirectory, cancelJob, timeout));
     }
 
+    @Override
+    public CompletableFuture<String> triggerSavepoint(
+            JobID jobId,
+            String targetDirectory,
+            long savepointTimeout,
+            boolean cancelJob,
+            Time timeout) {

Review comment:
       Can we replace the old `triggerSavepoint` method completely?

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
##########
@@ -800,7 +809,17 @@ private boolean isConnectingToResourceManager(ResourceManagerId resourceManagerI
     public CompletableFuture<String> stopWithSavepoint(
             @Nullable final String targetDirectory, final boolean terminate, final Time timeout) {
 
-        return schedulerNG.stopWithSavepoint(targetDirectory, terminate);
+        return stopWithSavepoint(targetDirectory, 0, terminate, timeout);
+    }
+
+    @Override
+    public CompletableFuture<String> stopWithSavepoint(
+            @Nullable final String targetDirectory,
+            final long savepointTimeout,
+            final boolean terminate,
+            final Time timeout) {
+
+        return schedulerNG.stopWithSavepoint(targetDirectory, savepointTimeout, terminate);

Review comment:
       Same here.

##########
File path: flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontendParser.java
##########
@@ -132,6 +132,9 @@
     static final Option SAVEPOINT_DISPOSE_OPTION =
             new Option("d", "dispose", true, "Path of savepoint to dispose.");
 
+    static final Option SAVEPOINT_TIMEOUT_OPTION =
+            new Option("st", "savepointTimeout", true, "timeout of savepoint.");

Review comment:
       What is the unit of the savepoint timeout? ms? Then I would suggest to make this explicit via `savepointTimeoutMs` and also mentioning the timeout in the description.

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
##########
@@ -788,7 +788,16 @@ private boolean isConnectingToResourceManager(ResourceManagerId resourceManagerI
     public CompletableFuture<String> triggerSavepoint(
             @Nullable final String targetDirectory, final boolean cancelJob, final Time timeout) {
 
-        return schedulerNG.triggerSavepoint(targetDirectory, cancelJob);
+        return triggerSavepoint(targetDirectory, 0, cancelJob, timeout);
+    }
+
+    @Override
+    public CompletableFuture<String> triggerSavepoint(
+            @Nullable String targetDirectory,
+            long savepointTimeout,
+            boolean cancelJob,
+            Time timeout) {
+        return schedulerNG.triggerSavepoint(targetDirectory, savepointTimeout, cancelJob);

Review comment:
       Can we replace the old `triggerSavepoint` method here? This should not be public API.

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java
##########
@@ -935,9 +941,16 @@ public void reportCheckpointMetrics(
         executionGraphHandler.reportCheckpointMetrics(attemptId, id, metrics);
     }
 
+    @Override
+    public CompletableFuture<String> stopWithSavepoint(String targetDirectory, boolean terminate) {
+        return stopWithSavepoint(targetDirectory, 0, terminate);
+    }
+
     @Override
     public CompletableFuture<String> stopWithSavepoint(
-            @Nullable final String targetDirectory, final boolean terminate) {
+            @Nullable final String targetDirectory,
+            final long savepointTimeout,
+            final boolean terminate) {

Review comment:
       If we have overloaded methods, then I like to keep the parameter prefix the same across the different variants. I think this makes them easier to use. So in this case, I would put `savepointTimeout` at the end.
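
A minimal sketch of the ordering rule, assuming `0` keeps its meaning of "fall back to the checkpoint timeout": every overload shares the same leading parameters and the new timeout is appended last, so the shorter variant simply delegates. `SavepointTrigger` and `RecordingTrigger` are hypothetical; the real `SchedulerNG` signatures are whatever the PR settles on.

```java
import java.util.concurrent.CompletableFuture;

/** Hypothetical interface showing a stable parameter prefix across overloads. */
interface SavepointTrigger {

    /** Existing variant: same leading parameters as the new one. */
    default CompletableFuture<String> stopWithSavepoint(String targetDirectory, boolean terminate) {
        // 0 keeps the documented meaning "use the checkpoint timeout".
        return stopWithSavepoint(targetDirectory, terminate, 0L);
    }

    /** New variant: the extra parameter goes at the end, not in the middle. */
    CompletableFuture<String> stopWithSavepoint(
            String targetDirectory, boolean terminate, long savepointTimeoutMs);
}

/** Toy implementation that records the arguments it was called with. */
final class RecordingTrigger implements SavepointTrigger {
    String lastDirectory;
    boolean lastTerminate;
    long lastTimeoutMs = -1L;

    @Override
    public CompletableFuture<String> stopWithSavepoint(
            String targetDirectory, boolean terminate, long savepointTimeoutMs) {
        lastDirectory = targetDirectory;
        lastTerminate = terminate;
        lastTimeoutMs = savepointTimeoutMs;
        return CompletableFuture.completedFuture(targetDirectory + "/savepoint");
    }
}
```

Existing call sites such as `stopWithSavepoint(dir, terminate)` then gain a timeout by appending one argument, without reordering anything.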




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] tillrohrmann commented on a change in pull request #17443: [FLINK-9465][Runtime/Checkpointing] Specify a separate savepoint timeout option via CLI and REST API

Posted by GitBox <gi...@apache.org>.
tillrohrmann commented on a change in pull request #17443:
URL: https://github.com/apache/flink/pull/17443#discussion_r741964620



##########
File path: docs/content.zh/docs/deployment/cli.md
##########
@@ -121,6 +122,8 @@ You can resume your program from this savepoint with the run command.
 The savepoint folder is optional and needs to be specified if 
 [state.savepoints.dir]({{< ref "docs/deployment/config" >}}#state-savepoints-dir) isn't set.
 
+The savepoint timeout is optional and checkpoint timeout will take effect if isn't set.

Review comment:
       ```suggestion
   The savepoint timeout `--savepointTimeout` is optional; if it isn't set, the checkpoint timeout takes effect.
   ```

##########
File path: flink-clients/src/main/java/org/apache/flink/client/cli/SavepointOptions.java
##########
@@ -30,13 +31,18 @@
     private boolean dispose;
     private String disposeSavepointPath;
     private String jarFile;
+    private long savepointTimeout;

Review comment:
       Since we are not storing the unit explicitly:
   
   ```suggestion
       private long savepointTimeoutMs;
   ```

##########
File path: docs/content.zh/docs/deployment/cli.md
##########
@@ -172,6 +176,8 @@ Savepoint completed. Path: file:/tmp/flink-savepoints/savepoint-cca7bc-bb1e257f0
 We have to use `--savepointPath` to specify the savepoint folder if 
 [state.savepoints.dir]({{< ref "docs/deployment/config" >}}#state-savepoints-dir) isn't set.
 
+The savepoint timeout is optional and checkpoint timeout will take effect if isn't set.

Review comment:
       ```suggestion
   The savepoint timeout `--savepointTimeout` is optional; if it isn't set, the checkpoint timeout takes effect.
   ```

##########
File path: flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontendParser.java
##########
@@ -132,6 +132,9 @@
     static final Option SAVEPOINT_DISPOSE_OPTION =
             new Option("d", "dispose", true, "Path of savepoint to dispose.");
 
+    static final Option SAVEPOINT_TIMEOUT_OPTION =
+            new Option("st", "savepointTimeout", true, "timeout of savepoint.");

Review comment:
       ```suggestion
               new Option("st", "savepointTimeout", true, "The maximum completion time a savepoint is allowed to take before it is failed.");
   ```

##########
File path: docs/content/docs/deployment/cli.md
##########
@@ -170,6 +174,8 @@ Savepoint completed. Path: file:/tmp/flink-savepoints/savepoint-cca7bc-bb1e257f0
 We have to use `--savepointPath` to specify the savepoint folder if 
 [state.savepoints.dir]({{< ref "docs/deployment/config" >}}#state-savepoints-dir) isn't set.
 
+The savepoint timeout is optional and checkpoint timeout will take effect if isn't set.

Review comment:
       ```suggestion
   The savepoint timeout `--savepointTimeout` is optional; if it isn't set, the checkpoint timeout takes effect.
   ```

##########
File path: flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java
##########
@@ -167,6 +187,20 @@
      */
     CompletableFuture<String> triggerSavepoint(JobID jobId, @Nullable String savepointDirectory);
 
+    /**
+     * Triggers a savepoint for the job identified by the job id. The savepoint will be written to
+     * the given savepoint directory, or {@link
+     * org.apache.flink.configuration.CheckpointingOptions#SAVEPOINT_DIRECTORY} if it is null.
+     *
+     * @param jobId job id
+     * @param savepointDirectory directory the savepoint should be written to
+     * @param savepointTimeout Timeout for the savepoint. If it <= 0, checkpoint timeout will take
+     *     effect.
+     * @return path future where the savepoint is located
+     */
+    CompletableFuture<String> triggerSavepoint(
+            JobID jobId, @Nullable String savepointDirectory, long savepointTimeout);

Review comment:
       ```suggestion
               JobID jobId, @Nullable String savepointDirectory, long savepointTimeoutMs);
   ```

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
##########
@@ -469,12 +487,34 @@ public boolean isShutdown() {
         final CheckpointProperties properties =
                 CheckpointProperties.forSyncSavepoint(!unalignedCheckpointsEnabled, terminate);
 
-        return triggerSavepointInternal(properties, targetLocation);
+        return triggerSavepointInternal(properties, targetLocation, 0);
+    }
+
+    /**
+     * Triggers a synchronous savepoint with the given savepoint directory as a target.
+     *
+     * @param terminate flag indicating if the job should terminate or just suspend
+     * @param targetLocation Target location for the savepoint, optional. If null, the state
+     *     backend's configured default will be used.
+     * @param savepointTimeout Timeout for the savepoint. If it <= 0, checkpoint timeout will take
+     *     effect.
+     * @return A future to the completed checkpoint
+     * @throws IllegalStateException If no savepoint directory has been specified and no default
+     *     savepoint directory has been configured
+     */
+    public CompletableFuture<CompletedCheckpoint> triggerSynchronousSavepoint(
+            final boolean terminate, @Nullable final String targetLocation, long savepointTimeout) {

Review comment:
       ```suggestion
               final boolean terminate, @Nullable final String targetLocation, long savepointTimeoutMs) {
   ```

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
##########
@@ -514,6 +558,16 @@ public boolean isShutdown() {
             @Nullable String externalSavepointLocation,
             boolean isPeriodic) {
 
+        return triggerCheckpoint(props, externalSavepointLocation, 0, isPeriodic);
+    }
+
+    @VisibleForTesting
+    public CompletableFuture<CompletedCheckpoint> triggerCheckpoint(
+            CheckpointProperties props,
+            @Nullable String externalSavepointLocation,
+            long savepointTimeout,

Review comment:
       ```suggestion
               long savepointTimeoutMs,
   ```

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
##########
@@ -450,7 +450,25 @@ public boolean isShutdown() {
             @Nullable final String targetLocation) {
         final CheckpointProperties properties =
                 CheckpointProperties.forSavepoint(!unalignedCheckpointsEnabled);
-        return triggerSavepointInternal(properties, targetLocation);
+        return triggerSavepointInternal(properties, targetLocation, 0);

Review comment:
       I am wondering whether it would be clearer if we passed `checkpointTimeout` instead of `0` here.
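
A sketch of what this could look like: the default entry point passes the coordinator's own checkpoint timeout instead of the sentinel `0`, the caller-supplied variant applies the "`<= 0` means default" rule once at the boundary, and the internal method always receives a concrete value. `CoordinatorSketch` and its members are simplified, hypothetical stand-ins for the corresponding parts of `CheckpointCoordinator`; returning a `long` instead of a future is only to keep the sketch small.

```java
/** Hypothetical, simplified stand-in for the coordinator's savepoint entry points. */
final class CoordinatorSketch {
    private final long checkpointTimeoutMs;

    CoordinatorSketch(long checkpointTimeoutMs) {
        if (checkpointTimeoutMs <= 0) {
            throw new IllegalArgumentException("checkpoint timeout must be positive");
        }
        this.checkpointTimeoutMs = checkpointTimeoutMs;
    }

    /** Default variant: passes the checkpoint timeout explicitly instead of the sentinel 0. */
    long triggerSavepoint(String targetLocation) {
        return triggerSavepointInternal(targetLocation, checkpointTimeoutMs);
    }

    /** Caller-supplied variant: the "<= 0 means default" rule is applied once, here. */
    long triggerSavepoint(String targetLocation, long savepointTimeoutMs) {
        long timeoutMs = savepointTimeoutMs > 0 ? savepointTimeoutMs : checkpointTimeoutMs;
        return triggerSavepointInternal(targetLocation, timeoutMs);
    }

    /**
     * Returns the canceller delay that would be scheduled. Every caller passes a concrete
     * timeout, so no sentinel interpretation is needed at this level.
     */
    private long triggerSavepointInternal(String targetLocation, long timeoutMs) {
        assert timeoutMs > 0 : "callers must resolve the timeout first";
        return timeoutMs;
    }
}
```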

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
##########
@@ -2069,9 +2127,19 @@ private static CheckpointException getCheckpointException(
                 @Nullable String externalSavepointLocation,
                 boolean isPeriodic) {
 
+            this(props, externalSavepointLocation, 0, isPeriodic);
+        }
+
+        CheckpointTriggerRequest(
+                CheckpointProperties props,
+                @Nullable String externalSavepointLocation,
+                long savepointTimeout,
+                boolean isPeriodic) {
+
             this.timestamp = System.currentTimeMillis();
             this.props = checkNotNull(props);
             this.externalSavepointLocation = externalSavepointLocation;
+            this.savepointTimeout = savepointTimeout;

Review comment:
       Either use `Duration`, or put the unit in the name.
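
If the `Duration` route were taken, the request object could carry the unit in its type, with `null` standing for "use the checkpoint timeout". A minimal sketch; `TriggerRequestSketch` and `effectiveTimeout` are hypothetical and only mirror the shape of `CheckpointTriggerRequest` in the diff.

```java
import java.time.Duration;
import java.util.Objects;

/** Hypothetical request type: the unit of the timeout lives in its type, not in its name. */
final class TriggerRequestSketch {
    private final String externalSavepointLocation;
    // null means "no savepoint-specific timeout, fall back to the checkpoint timeout".
    private final Duration savepointTimeout;

    TriggerRequestSketch(String externalSavepointLocation, Duration savepointTimeout) {
        this.externalSavepointLocation = externalSavepointLocation;
        this.savepointTimeout = savepointTimeout;
    }

    String getExternalSavepointLocation() {
        return externalSavepointLocation;
    }

    /** Resolves this request's timeout against the coordinator-wide checkpoint timeout. */
    Duration effectiveTimeout(Duration checkpointTimeout) {
        Objects.requireNonNull(checkpointTimeout, "checkpointTimeout");
        if (savepointTimeout == null || savepointTimeout.isZero() || savepointTimeout.isNegative()) {
            return checkpointTimeout;
        }
        return savepointTimeout;
    }
}
```

A caller holding a millisecond count would convert it once, for example with `Duration.ofMillis(savepointTimeoutMs)`, at the CLI or REST boundary.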

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java
##########
@@ -234,6 +234,22 @@ void failSlot(
             final boolean cancelJob,
             @RpcTimeout final Time timeout);
 
+    /**
+     * Triggers taking a savepoint of the executed job.
+     *
+     * @param targetDirectory to which to write the savepoint data or null if the default savepoint
+     *     directory should be used
+     * @param savepointTimeout Timeout for the savepoint. If it <= 0, checkpoint timeout will take
+     *     effect.
+     * @param timeout for the rpc call
+     * @return Future which is completed with the savepoint path once completed
+     */
+    CompletableFuture<String> triggerSavepoint(
+            @Nullable final String targetDirectory,
+            final long savepointTimeout,
+            final boolean cancelJob,
+            @RpcTimeout final Time timeout);

Review comment:
       Let's remove the old `triggerSavepoint` method.

##########
File path: flink-tests/src/test/java/org/apache/flink/test/streaming/api/environment/RemoteStreamEnvironmentTest.java
##########
@@ -264,12 +264,27 @@ public String getWebInterfaceURL() {
             return null;
         }
 
+        @Override
+        public CompletableFuture<String> stopWithSavepoint(
+                JobID jobId,
+                boolean advanceToEndOfEventTime,
+                @Nullable String savepointDirectory,
+                long savepointTimeout) {
+            return null;

Review comment:
       Same here: `FutureUtils.unsupportedOperationFuture()`.

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
##########
@@ -450,7 +450,25 @@ public boolean isShutdown() {
             @Nullable final String targetLocation) {
         final CheckpointProperties properties =
                 CheckpointProperties.forSavepoint(!unalignedCheckpointsEnabled);
-        return triggerSavepointInternal(properties, targetLocation);
+        return triggerSavepointInternal(properties, targetLocation, 0);
+    }
+
+    /**
+     * Triggers a savepoint with the given savepoint directory as a target.
+     *
+     * @param targetLocation Target location for the savepoint, optional. If null, the state
+     *     backend's configured default will be used.
+     * @param savepointTimeout Timeout for the savepoint. If it <= 0, checkpoint timeout will take
+     *     effect.
+     * @return A future to the completed checkpoint
+     * @throws IllegalStateException If no savepoint directory has been specified and no default
+     *     savepoint directory has been configured
+     */
+    public CompletableFuture<CompletedCheckpoint> triggerSavepoint(
+            @Nullable final String targetLocation, final long savepointTimeout) {

Review comment:
       If we used a `Duration` type in the `CheckpointCoordinator`, then we wouldn't have to rely on users of this class providing the timeout in the correct unit.
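
To make the point concrete: once the coordinator receives a `Duration`, the conversion for the timer happens in exactly one place, so a caller cannot pass seconds where milliseconds are expected. The sketch schedules a canceller (the PR description uses the savepoint timeout as the canceller's trigger delay) that fails a pending future when the timeout elapses. `ScheduledExecutorService` is an assumed stand-in for the coordinator's timer, and `CancellerSketch` / `scheduleCanceller` are hypothetical names.

```java
import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/** Hypothetical helper: fails a pending savepoint future if it does not complete in time. */
final class CancellerSketch {
    private CancellerSketch() {}

    static ScheduledFuture<?> scheduleCanceller(
            CompletableFuture<?> pendingSavepoint,
            Duration timeout,
            ScheduledExecutorService timer) {
        // The only unit conversion happens here, driven by the Duration itself.
        return timer.schedule(
                () -> {
                    // No effect if the savepoint already completed.
                    pendingSavepoint.completeExceptionally(
                            new TimeoutException("Savepoint expired after " + timeout));
                },
                timeout.toMillis(),
                TimeUnit.MILLISECONDS);
    }
}
```

When the savepoint completes first, the caller would normally cancel the returned `ScheduledFuture` rather than let the canceller fire.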

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java
##########
@@ -234,6 +234,22 @@ void failSlot(
             final boolean cancelJob,
             @RpcTimeout final Time timeout);
 
+    /**
+     * Triggers taking a savepoint of the executed job.
+     *
+     * @param targetDirectory to which to write the savepoint data or null if the default savepoint
+     *     directory should be used
+     * @param savepointTimeout Timeout for the savepoint. If it <= 0, checkpoint timeout will take
+     *     effect.
+     * @param timeout for the rpc call
+     * @return Future which is completed with the savepoint path once completed
+     */
+    CompletableFuture<String> triggerSavepoint(
+            @Nullable final String targetDirectory,
+            final long savepointTimeout,
+            final boolean cancelJob,
+            @RpcTimeout final Time timeout);

Review comment:
       Alternatively we could rename the old method to `triggerSavepointWithDefaultTimeout` or so.

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
##########
@@ -450,7 +450,25 @@ public boolean isShutdown() {
             @Nullable final String targetLocation) {
         final CheckpointProperties properties =
                 CheckpointProperties.forSavepoint(!unalignedCheckpointsEnabled);
-        return triggerSavepointInternal(properties, targetLocation);
+        return triggerSavepointInternal(properties, targetLocation, 0);
+    }
+
+    /**
+     * Triggers a savepoint with the given savepoint directory as a target.
+     *
+     * @param targetLocation Target location for the savepoint, optional. If null, the state
+     *     backend's configured default will be used.
+     * @param savepointTimeout Timeout for the savepoint. If it <= 0, checkpoint timeout will take

Review comment:
       Unit is missing.

##########
File path: flink-clients/src/main/java/org/apache/flink/client/cli/SavepointOptions.java
##########
@@ -54,4 +60,8 @@ public String getSavepointPath() {
     public String getJarFilePath() {
         return jarFile;
     }
+
+    public long getSavepointTimeout() {

Review comment:
       ```suggestion
       public long getSavepointTimeoutMs() {
   ```

##########
File path: flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java
##########
@@ -156,6 +156,26 @@
             final boolean advanceToEndOfEventTime,
             @Nullable final String savepointDirectory);
 
+    /**
+     * Stops a program on Flink cluster whose job-manager is configured in this client's
+     * configuration. Stopping works only for streaming programs. Be aware, that the program might
+     * continue to run for a while after sending the stop command, because after sources stopped to
+     * emit data all operators need to finish processing.
+     *
+     * @param jobId the job ID of the streaming program to stop
+     * @param advanceToEndOfEventTime flag indicating if the source should inject a {@code
+     *     MAX_WATERMARK} in the pipeline
+     * @param savepointDirectory directory the savepoint should be written to
+     * @param savepointTimeout Timeout for the savepoint. If it <= 0, checkpoint timeout will take
+     *     effect.
+     * @return a {@link CompletableFuture} containing the path where the savepoint is located
+     */
+    CompletableFuture<String> stopWithSavepoint(
+            final JobID jobId,
+            final boolean advanceToEndOfEventTime,
+            @Nullable final String savepointDirectory,
+            final long savepointTimeout);

Review comment:
       ```suggestion
               final long savepointTimeoutMs);
   ```

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
##########
@@ -450,7 +450,25 @@ public boolean isShutdown() {
             @Nullable final String targetLocation) {
         final CheckpointProperties properties =
                 CheckpointProperties.forSavepoint(!unalignedCheckpointsEnabled);
-        return triggerSavepointInternal(properties, targetLocation);
+        return triggerSavepointInternal(properties, targetLocation, 0);
+    }
+
+    /**
+     * Triggers a savepoint with the given savepoint directory as a target.
+     *
+     * @param targetLocation Target location for the savepoint, optional. If null, the state
+     *     backend's configured default will be used.
+     * @param savepointTimeout Timeout for the savepoint. If it <= 0, checkpoint timeout will take
+     *     effect.
+     * @return A future to the completed checkpoint
+     * @throws IllegalStateException If no savepoint directory has been specified and no default
+     *     savepoint directory has been configured
+     */
+    public CompletableFuture<CompletedCheckpoint> triggerSavepoint(
+            @Nullable final String targetLocation, final long savepointTimeout) {

Review comment:
       ```suggestion
               @Nullable final String targetLocation, final long savepointTimeoutMs) {
   ```

##########
File path: docs/content/docs/deployment/cli.md
##########
@@ -119,6 +120,8 @@ You can resume your program from this savepoint with the run command.
 The savepoint folder is optional and needs to be specified if 
 [state.savepoints.dir]({{< ref "docs/deployment/config" >}}#state-savepoints-dir) isn't set.
 
+The savepoint timeout is optional and checkpoint timeout will take effect if isn't set.

Review comment:
       ```suggestion
   The savepoint timeout `--savepointTimeout` is optional; if it isn't set, the checkpoint timeout takes effect.
   ```

##########
File path: flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java
##########
@@ -156,6 +156,26 @@
             final boolean advanceToEndOfEventTime,
             @Nullable final String savepointDirectory);
 
+    /**
+     * Stops a program on Flink cluster whose job-manager is configured in this client's
+     * configuration. Stopping works only for streaming programs. Be aware, that the program might
+     * continue to run for a while after sending the stop command, because after sources stopped to
+     * emit data all operators need to finish processing.
+     *
+     * @param jobId the job ID of the streaming program to stop
+     * @param advanceToEndOfEventTime flag indicating if the source should inject a {@code
+     *     MAX_WATERMARK} in the pipeline
+     * @param savepointDirectory directory the savepoint should be written to
+     * @param savepointTimeout Timeout for the savepoint. If it <= 0, checkpoint timeout will take

Review comment:
       Unit description is missing.

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
##########
@@ -716,6 +731,20 @@ JobManagerRunner createJobManagerRunner(JobGraph jobGraph, long initializationTi
                 jobId, gateway -> gateway.stopWithSavepoint(targetDirectory, terminate, timeout));
     }
 
+    @Override
+    public CompletableFuture<String> stopWithSavepoint(
+            final JobID jobId,
+            final String targetDirectory,
+            final long savepointTimeout,
+            final boolean terminate,
+            final Time timeout) {
+        return performOperationOnJobMasterGateway(
+                jobId,
+                gateway ->
+                        gateway.stopWithSavepoint(
+                                targetDirectory, savepointTimeout, terminate, timeout));

Review comment:
       Same here for `stopWithSavepoint`.

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/RestfulGateway.java
##########
@@ -145,19 +145,60 @@
         throw new UnsupportedOperationException();
     }
 
+    /**
+     * Triggers a savepoint with the given savepoint directory as a target.
+     *
+     * @param jobId ID of the job for which the savepoint should be triggered.
+     * @param targetDirectory Target directory for the savepoint.
+     * @param savepointTimeout Timeout for the savepoint. If it <= 0, checkpoint timeout will take
+     *     effect.
+     * @param timeout Timeout for the asynchronous operation
+     * @return A future to the {@link CompletedCheckpoint#getExternalPointer() external pointer} of
+     *     the savepoint.
+     */
+    default CompletableFuture<String> triggerSavepoint(
+            JobID jobId,
+            String targetDirectory,
+            long savepointTimeout,
+            boolean cancelJob,
+            @RpcTimeout Time timeout) {
+        throw new UnsupportedOperationException();
+    }
+
+    /**
+     * Stops the job with a savepoint.
+     *
+     * @param jobId ID of the job for which the savepoint should be triggered.
+     * @param targetDirectory to which to write the savepoint data or null if the default savepoint
+     *     directory should be used
+     * @param terminate flag indicating if the job should terminate or just suspend
+     * @param timeout for the rpc call
+     * @return Future which is completed with the savepoint path once completed
+     */
+    default CompletableFuture<String> stopWithSavepoint(
+            final JobID jobId,
+            final String targetDirectory,
+            final boolean terminate,
+            @RpcTimeout final Time timeout) {
+        throw new UnsupportedOperationException();
+    }

Review comment:
       Can we get rid of this method?

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/RestfulGateway.java
##########
@@ -145,19 +145,60 @@
         throw new UnsupportedOperationException();
     }

Review comment:
       Can we get rid of this method?

##########
File path: flink-tests/src/test/java/org/apache/flink/test/streaming/api/environment/RemoteStreamEnvironmentTest.java
##########
@@ -264,12 +264,27 @@ public String getWebInterfaceURL() {
             return null;
         }
 
+        @Override
+        public CompletableFuture<String> stopWithSavepoint(
+                JobID jobId,
+                boolean advanceToEndOfEventTime,
+                @Nullable String savepointDirectory,
+                long savepointTimeout) {
+            return null;
+        }
+
         @Override
         public CompletableFuture<String> triggerSavepoint(
                 JobID jobId, @Nullable String savepointDirectory) {
             return null;
         }
 
+        @Override
+        public CompletableFuture<String> triggerSavepoint(
+                JobID jobId, @Nullable String savepointDirectory, long savepointTimeout) {
+            return null;

Review comment:
       Let's return `FutureUtils.unsupportedOperationFuture()`.
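
The reason to prefer a failed future over `null` in such stubs is that callers usually chain on the result, so a `null` return turns a clear "not supported" into a `NullPointerException` somewhere downstream. The sketch uses the JDK's `CompletableFuture.failedFuture` (Java 9+) as a stand-in, on the assumption that Flink's `FutureUtils.unsupportedOperationFuture()` likewise returns a future completed exceptionally with an `UnsupportedOperationException`. `StubClient` is hypothetical, and `JobID` is replaced by `String` to keep it self-contained.

```java
import java.util.concurrent.CompletableFuture;

/** Hypothetical test stub: unsupported operations fail the future instead of returning null. */
final class StubClient {

    CompletableFuture<String> triggerSavepoint(
            String jobId, String savepointDirectory, long savepointTimeoutMs) {
        return unsupported();
    }

    CompletableFuture<String> stopWithSavepoint(
            String jobId,
            boolean advanceToEndOfEventTime,
            String savepointDirectory,
            long savepointTimeoutMs) {
        return unsupported();
    }

    private static <T> CompletableFuture<T> unsupported() {
        // Callers that chain thenApply/handle see the real cause instead of an NPE.
        return CompletableFuture.failedFuture(
                new UnsupportedOperationException("Not supported by this test stub."));
    }
}
```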

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java
##########
@@ -935,9 +941,16 @@ public void reportCheckpointMetrics(
         executionGraphHandler.reportCheckpointMetrics(attemptId, id, metrics);
     }
 
+    @Override
+    public CompletableFuture<String> stopWithSavepoint(String targetDirectory, boolean terminate) {
+        return stopWithSavepoint(targetDirectory, 0, terminate);
+    }
+
     @Override
     public CompletableFuture<String> stopWithSavepoint(
-            @Nullable final String targetDirectory, final boolean terminate) {
+            @Nullable final String targetDirectory,
+            final long savepointTimeout,

Review comment:
       Unit is missing.

##########
File path: flink-clients/src/main/java/org/apache/flink/client/cli/StopOptions.java
##########
@@ -57,6 +64,10 @@ String getTargetDirectory() {
         return targetDirectory;
     }
 
+    long getSavepointTimeout() {

Review comment:
       ```suggestion
       long getSavepointTimeoutMs() {
   ```

##########
File path: flink-clients/src/main/java/org/apache/flink/client/cli/StopOptions.java
##########
@@ -33,6 +34,8 @@
     /** Optional target directory for the savepoint. Overwrites cluster default. */
     private final String targetDirectory;
 
+    private final long savepointTimeout;

Review comment:
       ```suggestion
       private final long savepointTimeoutMs;
   ```

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
##########
@@ -469,12 +487,34 @@ public boolean isShutdown() {
         final CheckpointProperties properties =
                 CheckpointProperties.forSyncSavepoint(!unalignedCheckpointsEnabled, terminate);
 
-        return triggerSavepointInternal(properties, targetLocation);
+        return triggerSavepointInternal(properties, targetLocation, 0);
+    }
+
+    /**
+     * Triggers a synchronous savepoint with the given savepoint directory as a target.
+     *
+     * @param terminate flag indicating if the job should terminate or just suspend
+     * @param targetLocation Target location for the savepoint, optional. If null, the state
+     *     backend's configured default will be used.
+     * @param savepointTimeout Timeout for the savepoint. If it <= 0, checkpoint timeout will take

Review comment:
       Unit is missing.

##########
File path: flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java
##########
@@ -167,6 +187,20 @@
      */
     CompletableFuture<String> triggerSavepoint(JobID jobId, @Nullable String savepointDirectory);
 
+    /**
+     * Triggers a savepoint for the job identified by the job id. The savepoint will be written to
+     * the given savepoint directory, or {@link
+     * org.apache.flink.configuration.CheckpointingOptions#SAVEPOINT_DIRECTORY} if it is null.
+     *
+     * @param jobId job id
+     * @param savepointDirectory directory the savepoint should be written to
+     * @param savepointTimeout Timeout for the savepoint. If it <= 0, checkpoint timeout will take

Review comment:
       Unit description is missing.

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
##########
@@ -469,12 +487,34 @@ public boolean isShutdown() {
         final CheckpointProperties properties =
                 CheckpointProperties.forSyncSavepoint(!unalignedCheckpointsEnabled, terminate);
 
-        return triggerSavepointInternal(properties, targetLocation);
+        return triggerSavepointInternal(properties, targetLocation, 0);
+    }
+
+    /**
+     * Triggers a synchronous savepoint with the given savepoint directory as a target.
+     *
+     * @param terminate flag indicating if the job should terminate or just suspend
+     * @param targetLocation Target location for the savepoint, optional. If null, the state
+     *     backend's configured default will be used.
+     * @param savepointTimeout Timeout for the savepoint. If it <= 0, checkpoint timeout will take
+     *     effect.
+     * @return A future to the completed checkpoint
+     * @throws IllegalStateException If no savepoint directory has been specified and no default
+     *     savepoint directory has been configured
+     */
+    public CompletableFuture<CompletedCheckpoint> triggerSynchronousSavepoint(
+            final boolean terminate, @Nullable final String targetLocation, long savepointTimeout) {
+
+        final CheckpointProperties properties =
+                CheckpointProperties.forSyncSavepoint(!unalignedCheckpointsEnabled, terminate);
+
+        return triggerSavepointInternal(properties, targetLocation, savepointTimeout);
     }
 
     private CompletableFuture<CompletedCheckpoint> triggerSavepointInternal(
             final CheckpointProperties checkpointProperties,
-            @Nullable final String targetLocation) {
+            @Nullable final String targetLocation,
+            long savepointTimeout) {

Review comment:
       ```suggestion
               long savepointTimeoutMs) {
   ```

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
##########
@@ -706,6 +706,21 @@ JobManagerRunner createJobManagerRunner(JobGraph jobGraph, long initializationTi
                 jobId, gateway -> gateway.triggerSavepoint(targetDirectory, cancelJob, timeout));
     }
 
+    @Override
+    public CompletableFuture<String> triggerSavepoint(
+            JobID jobId,
+            String targetDirectory,
+            long savepointTimeout,
+            boolean cancelJob,
+            Time timeout) {

Review comment:
       Can we replace the old `triggerSavepoint` method completely?

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java
##########
@@ -256,6 +272,23 @@ void failSlot(
             final boolean terminate,
             @RpcTimeout final Time timeout);
 
+    /**
+     * Stops the job with a savepoint.
+     *
+     * @param targetDirectory to which to write the savepoint data or null if the default savepoint
+     *     directory should be used
+     * @param savepointTimeout Timeout for the savepoint. If it <= 0, checkpoint timeout will take
+     *     effect.
+     * @param terminate flag indicating if the job should terminate or just suspend
+     * @param timeout for the rpc call
+     * @return Future which is completed with the savepoint path once completed
+     */
+    CompletableFuture<String> stopWithSavepoint(
+            @Nullable final String targetDirectory,
+            final long savepointTimeout,
+            final boolean terminate,
+            @RpcTimeout final Time timeout);

Review comment:
       And `stopWithSavepoint`.

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
##########
@@ -800,7 +809,17 @@ private boolean isConnectingToResourceManager(ResourceManagerId resourceManagerI
     public CompletableFuture<String> stopWithSavepoint(
             @Nullable final String targetDirectory, final boolean terminate, final Time timeout) {
 
-        return schedulerNG.stopWithSavepoint(targetDirectory, terminate);
+        return stopWithSavepoint(targetDirectory, 0, terminate, timeout);
+    }
+
+    @Override
+    public CompletableFuture<String> stopWithSavepoint(
+            @Nullable final String targetDirectory,
+            final long savepointTimeout,
+            final boolean terminate,
+            final Time timeout) {
+
+        return schedulerNG.stopWithSavepoint(targetDirectory, savepointTimeout, terminate);

Review comment:
       Same here.

##########
File path: flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontendParser.java
##########
@@ -132,6 +132,9 @@
     static final Option SAVEPOINT_DISPOSE_OPTION =
             new Option("d", "dispose", true, "Path of savepoint to dispose.");
 
+    static final Option SAVEPOINT_TIMEOUT_OPTION =
+            new Option("st", "savepointTimeout", true, "timeout of savepoint.");

Review comment:
       What is the unit of the savepoint timeout? ms? Then I would suggest to make this explicit via `savepointTimeoutMs` and also mentioning the timeout in the description.

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
##########
@@ -788,7 +788,16 @@ private boolean isConnectingToResourceManager(ResourceManagerId resourceManagerI
     public CompletableFuture<String> triggerSavepoint(
             @Nullable final String targetDirectory, final boolean cancelJob, final Time timeout) {
 
-        return schedulerNG.triggerSavepoint(targetDirectory, cancelJob);
+        return triggerSavepoint(targetDirectory, 0, cancelJob, timeout);
+    }
+
+    @Override
+    public CompletableFuture<String> triggerSavepoint(
+            @Nullable String targetDirectory,
+            long savepointTimeout,
+            boolean cancelJob,
+            Time timeout) {
+        return schedulerNG.triggerSavepoint(targetDirectory, savepointTimeout, cancelJob);

Review comment:
       Can we replace the old `triggerSavepoint` method here? This should not be public API.
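If the old overload is removed, callers that have no savepoint-specific timeout would pass `0` explicitly, as the delegating method above already does. According to the PR's change log, `createPendingCheckpoint()` uses the value as the canceller delay only when it is > 0. The sketch below shows that fallback under the assumption that the regular checkpoint timeout applies otherwise; `SavepointTimeouts` and `cancellerDelayMillis` are hypothetical names, not Flink's actual code.

```java
/** Hypothetical sketch of the "> 0 overrides, otherwise fall back" rule from the change log. */
final class SavepointTimeouts {

    /** Sentinel a caller passes when it has no savepoint-specific timeout. */
    static final long USE_CHECKPOINT_TIMEOUT = 0L;

    private SavepointTimeouts() {}

    /** Delay after which the pending savepoint would be cancelled, in milliseconds. */
    static long cancellerDelayMillis(long savepointTimeoutMillis, long checkpointTimeoutMillis) {
        // Only a strictly positive savepoint timeout overrides the configured checkpoint timeout.
        return savepointTimeoutMillis > 0 ? savepointTimeoutMillis : checkpointTimeoutMillis;
    }
}
```

Keeping a single method with an explicit sentinel avoids exposing two public entry points that differ only by a defaulted argument.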

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java
##########
@@ -935,9 +941,16 @@ public void reportCheckpointMetrics(
         executionGraphHandler.reportCheckpointMetrics(attemptId, id, metrics);
     }
 
+    @Override
+    public CompletableFuture<String> stopWithSavepoint(String targetDirectory, boolean terminate) {
+        return stopWithSavepoint(targetDirectory, 0, terminate);
+    }
+
     @Override
     public CompletableFuture<String> stopWithSavepoint(
-            @Nullable final String targetDirectory, final boolean terminate) {
+            @Nullable final String targetDirectory,
+            final long savepointTimeout,
+            final boolean terminate) {

Review comment:
       If we have overloaded methods, I like to keep the common parameter prefix identical across all variants; I think that makes them easier to use. So in this case, I would put `savepointTimeout` at the end.
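A sketch of the suggested overload layout: both variants share the same leading parameters and the new `savepointTimeout` is appended last, so the short form simply delegates to the long one. `SavepointStopper` and `EchoStopper` are hypothetical types for illustration, not Flink interfaces; using `0` as the "no override" value follows the delegation already in the PR.

```java
import java.util.concurrent.CompletableFuture;

/** Hypothetical interface showing overloads that share the same parameter prefix. */
interface SavepointStopper {

    /** Long form: shared parameters first, the new parameter appended at the end. */
    CompletableFuture<String> stopWithSavepoint(
            String targetDirectory, boolean terminate, long savepointTimeout);

    /** Short form: same prefix, delegates with 0 meaning "no savepoint-specific timeout". */
    default CompletableFuture<String> stopWithSavepoint(String targetDirectory, boolean terminate) {
        return stopWithSavepoint(targetDirectory, terminate, 0L);
    }
}

/** Toy implementation that echoes its arguments, for illustration only. */
final class EchoStopper implements SavepointStopper {

    @Override
    public CompletableFuture<String> stopWithSavepoint(
            String targetDirectory, boolean terminate, long savepointTimeout) {
        return CompletableFuture.completedFuture(
                targetDirectory + "|" + terminate + "|" + savepointTimeout);
    }
}
```

A caller moving from the short form to the long form only appends an argument; the existing arguments keep their positions.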




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] flinkbot edited a comment on pull request #17443: [FLINK-9465][Runtime/Checkpointing] Specify a separate savepoint timeout option via CLI and REST API

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #17443:
URL: https://github.com/apache/flink/pull/17443#issuecomment-939267795


   ## CI report:
   
   * d7825ee175e5228e6cdbb4f50f2a3b9ab5383d09 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=24948) 
   
   Bot commands:
   The @flinkbot bot supports the following commands:
    - `@flinkbot run azure` re-run the last Azure build





[GitHub] [flink] flinkbot edited a comment on pull request #17443: [FLINK-9465][Runtime/Checkpointing] Specify a separate savepoint timeout option via CLI and REST API

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #17443:
URL: https://github.com/apache/flink/pull/17443#issuecomment-939267795


   ## CI report:
   
   * 79df4a732b55a8e7a0179b337776e36f7df5c83e Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=24906) 
   





[GitHub] [flink] flinkbot commented on pull request #17443: [FLINK-9465][Runtime/Checkpointing] Specify a separate savepoint timeout option via CLI and REST API

Posted by GitBox <gi...@apache.org>.
flinkbot commented on pull request #17443:
URL: https://github.com/apache/flink/pull/17443#issuecomment-939267795


   ## CI report:
   
   * 79df4a732b55a8e7a0179b337776e36f7df5c83e UNKNOWN
   





[GitHub] [flink] flinkbot edited a comment on pull request #17443: [FLINK-9465][Runtime/Checkpointing] Specify a separate savepoint timeout option via CLI and REST API

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #17443:
URL: https://github.com/apache/flink/pull/17443#issuecomment-939267795









[GitHub] [flink] flinkbot edited a comment on pull request #17443: [FLINK-9465][Runtime/Checkpointing] Specify a separate savepoint timeout option via CLI and REST API

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #17443:
URL: https://github.com/apache/flink/pull/17443#issuecomment-939267795


   ## CI report:
   
   * e63def36ef5f877d6d3347ebfe0927e34c58087c Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=24917) 
   * 178f04c20bd675903e22638afe425fe9e38f426c UNKNOWN
   

