You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2022/01/24 16:13:46 UTC

[GitHub] [flink] dawidwys opened a new pull request #18482: [FLINK-25744] Support native savepoints

dawidwys opened a new pull request #18482:
URL: https://github.com/apache/flink/pull/18482


   ## What is the purpose of the change
   
   We introduce a savepoint type flag both to the CLI and REST API that
   controls the binary format of the savepoint to take. Native savepoints
   are taken in the state backend specific binary format. They can be
   faster to take and restore from, but they do not support e.g. changing
   the state backend.
   
   This commit does not support RocksDB incremental savepoints yet. When
   incremental savepoints are configured, native savepoints will fail.
   
   
   ## Brief change log
   
   See commit log
   
   ## Verifying this change
   
   Added tests in `SavepointFormatITCase`
   
   ## Does this pull request potentially affect one of the following parts:
   
     - Dependencies (does it add or upgrade a dependency): (yes / **no**)
     - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (**yes** / no)
     - The serializers: (yes / **no** / don't know)
     - The runtime per-record code paths (performance sensitive): (yes / **no** / don't know)
     - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (**yes** / no / don't know)
     - The S3 file system connector: (yes / **no** / don't know)
   
   ## Documentation
   
     - Does this pull request introduce a new feature? (**yes** / no)
     - If yes, how is the feature documented? (not applicable / docs / **JavaDocs** / not documented)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] flinkbot edited a comment on pull request #18482: [FLINK-25744] Support native savepoints

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #18482:
URL: https://github.com/apache/flink/pull/18482#issuecomment-1020278176


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "91675405ae82d0e945f46caaa685d65763b296c3",
       "status" : "SUCCESS",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=30070",
       "triggerID" : "91675405ae82d0e945f46caaa685d65763b296c3",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 91675405ae82d0e945f46caaa685d65763b296c3 Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=30070) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run azure` re-run the last Azure build
   </details>


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] flinkbot commented on pull request #18482: [FLINK-25744] Support native savepoints

Posted by GitBox <gi...@apache.org>.
flinkbot commented on pull request #18482:
URL: https://github.com/apache/flink/pull/18482#issuecomment-1020275759


   Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community
   to review your pull request. We will use this comment to track the progress of the review.
   
   
   ## Automated Checks
   Last check on commit 91675405ae82d0e945f46caaa685d65763b296c3 (Mon Jan 24 16:18:17 UTC 2022)
   
   **Warnings:**
    * No documentation files were touched! Remember to keep the Flink docs up to date!
   
   
   <sub>Mention the bot in a comment to re-run the automated checks.</sub>
   ## Review Progress
   
   * ❓ 1. The [description] looks good.
   * ❓ 2. There is [consensus] that the contribution should go into to Flink.
   * ❓ 3. Needs [attention] from.
   * ❓ 4. The change fits into the overall [architecture].
   * ❓ 5. Overall code [quality] is good.
   
   Please see the [Pull Request Review Guide](https://flink.apache.org/contributing/reviewing-prs.html) for a full explanation of the review process.<details>
    The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer of PMC member is required <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot approve description` to approve one or more aspects (aspects: `description`, `consensus`, `architecture` and `quality`)
    - `@flinkbot approve all` to approve all aspects
    - `@flinkbot approve-until architecture` to approve everything until `architecture`
    - `@flinkbot attention @username1 [@username2 ..]` to require somebody's attention
    - `@flinkbot disapprove architecture` to remove an approval you gave earlier
   </details>


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] akalash commented on a change in pull request #18482: [FLINK-25744] Support native savepoints

Posted by GitBox <gi...@apache.org>.
akalash commented on a change in pull request #18482:
URL: https://github.com/apache/flink/pull/18482#discussion_r793602159



##########
File path: flink-clients/src/main/java/org/apache/flink/client/cli/CancelOptions.java
##########
@@ -33,11 +37,21 @@
     /** Optional target directory for the savepoint. Overwrites cluster default. */
     private final String targetDirectory;
 
+    private final SavepointFormatType formatType;
+
     public CancelOptions(CommandLine line) {
         super(line);
         this.args = line.getArgs();
         this.withSavepoint = line.hasOption(CANCEL_WITH_SAVEPOINT_OPTION.getOpt());
         this.targetDirectory = line.getOptionValue(CANCEL_WITH_SAVEPOINT_OPTION.getOpt());
+        if (line.hasOption(SAVEPOINT_FORMAT_OPTION)) {
+            formatType =
+                    ConfigurationUtils.convertValue(
+                            line.getOptionValue(SAVEPOINT_FORMAT_OPTION),
+                            SavepointFormatType.class);
+        } else {
+            formatType = SavepointFormatType.DEFAULT;

Review comment:
       I am talking about this:
   ```
   if (line.hasOption(SAVEPOINT_FORMAT_OPTION)) {
               formatType =
                       ConfigurationUtils.convertValue(
                               line.getOptionValue(SAVEPOINT_FORMAT_OPTION),
                               SavepointFormatType.class);
           } else {
               formatType = SavepointFormatType.DEFAULT;
   }
   ```
   I actually expect here something like this:
   ```
   formatType = SavepointFormatType.fromString(line.getOptionValue(SAVEPOINT_FORMAT_OPTION));
   ```
   Another proposal to create `convertValue` method with default:
   ```
   ConfigurationUtils.convertValue(
                               line.getOptionValue(SAVEPOINT_FORMAT_OPTION),
                               SavepointFormatType.class, SavepointFormatType.DEFAULT);
   ```
   But in general, I understand your point that it is a kind of different representation level. But anyway I would expect the method something like `getOrDefault(line, SAVEPOINT_FORMAT_OPTION)`. In fact, it is not critical for me, if you think that any changes don't make any sense we can leave it as is.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] dawidwys commented on a change in pull request #18482: [FLINK-25744] Support native savepoints

Posted by GitBox <gi...@apache.org>.
dawidwys commented on a change in pull request #18482:
URL: https://github.com/apache/flink/pull/18482#discussion_r793575880



##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
##########
@@ -498,13 +501,6 @@ public boolean isShutdown() {
             @Nullable String externalSavepointLocation,
             boolean isPeriodic) {
 
-        if (props.getCheckpointType().getPostCheckpointAction() == PostCheckpointAction.TERMINATE
-                && !(props.isSynchronous() && props.isSavepoint())) {
-            return FutureUtils.completedExceptionally(
-                    new IllegalArgumentException(
-                            "Only synchronous savepoints are allowed to advance the watermark to MAX."));
-        }

Review comment:
       I find those checks at this locaiton pointless. Valid combinations of properties of a `SnapshotType` are limited by factory methods of `CheckpointType` & `SavepointType`. I see no reason for checking it again here.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] dawidwys merged pull request #18482: [FLINK-25744] Support native savepoints

Posted by GitBox <gi...@apache.org>.
dawidwys merged pull request #18482:
URL: https://github.com/apache/flink/pull/18482


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] dawidwys commented on a change in pull request #18482: [FLINK-25744] Support native savepoints

Posted by GitBox <gi...@apache.org>.
dawidwys commented on a change in pull request #18482:
URL: https://github.com/apache/flink/pull/18482#discussion_r793581546



##########
File path: flink-clients/src/main/java/org/apache/flink/client/cli/CancelOptions.java
##########
@@ -33,11 +37,21 @@
     /** Optional target directory for the savepoint. Overwrites cluster default. */
     private final String targetDirectory;
 
+    private final SavepointFormatType formatType;
+
     public CancelOptions(CommandLine line) {
         super(line);
         this.args = line.getArgs();
         this.withSavepoint = line.hasOption(CANCEL_WITH_SAVEPOINT_OPTION.getOpt());
         this.targetDirectory = line.getOptionValue(CANCEL_WITH_SAVEPOINT_OPTION.getOpt());
+        if (line.hasOption(SAVEPOINT_FORMAT_OPTION)) {
+            formatType =
+                    ConfigurationUtils.convertValue(
+                            line.getOptionValue(SAVEPOINT_FORMAT_OPTION),
+                            SavepointFormatType.class);
+        } else {
+            formatType = SavepointFormatType.DEFAULT;

Review comment:
       Which ifs? We are not converting from a string here, but from a `CommandLine`. For that reason I don't think `SavepointFormatType` is the correct place, but I'll give it a second thought where in the cli package we could put that code.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] flinkbot edited a comment on pull request #18482: [FLINK-25744] Support native savepoints

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #18482:
URL: https://github.com/apache/flink/pull/18482#issuecomment-1020278176


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "91675405ae82d0e945f46caaa685d65763b296c3",
       "status" : "SUCCESS",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=30070",
       "triggerID" : "91675405ae82d0e945f46caaa685d65763b296c3",
       "triggerType" : "PUSH"
     }, {
       "hash" : "cdc389d9c5827d45c922a10dc63b7c1ef3b6df3f",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "cdc389d9c5827d45c922a10dc63b7c1ef3b6df3f",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 91675405ae82d0e945f46caaa685d65763b296c3 Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=30070) 
   * cdc389d9c5827d45c922a10dc63b7c1ef3b6df3f UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run azure` re-run the last Azure build
   </details>


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] dawidwys commented on pull request #18482: [FLINK-25744] Support native savepoints

Posted by GitBox <gi...@apache.org>.
dawidwys commented on pull request #18482:
URL: https://github.com/apache/flink/pull/18482#issuecomment-1025562743


   Hey, @akalash I added a single commit for documentation of the savepoint format. Could you take a look at #cdc389d as well?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] akalash commented on a change in pull request #18482: [FLINK-25744] Support native savepoints

Posted by GitBox <gi...@apache.org>.
akalash commented on a change in pull request #18482:
URL: https://github.com/apache/flink/pull/18482#discussion_r793828154



##########
File path: flink-clients/src/main/java/org/apache/flink/client/cli/CancelOptions.java
##########
@@ -33,11 +37,21 @@
     /** Optional target directory for the savepoint. Overwrites cluster default. */
     private final String targetDirectory;
 
+    private final SavepointFormatType formatType;
+
     public CancelOptions(CommandLine line) {
         super(line);
         this.args = line.getArgs();
         this.withSavepoint = line.hasOption(CANCEL_WITH_SAVEPOINT_OPTION.getOpt());
         this.targetDirectory = line.getOptionValue(CANCEL_WITH_SAVEPOINT_OPTION.getOpt());
+        if (line.hasOption(SAVEPOINT_FORMAT_OPTION)) {
+            formatType =
+                    ConfigurationUtils.convertValue(
+                            line.getOptionValue(SAVEPOINT_FORMAT_OPTION),
+                            SavepointFormatType.class);
+        } else {
+            formatType = SavepointFormatType.DEFAULT;

Review comment:
       I think you can leave it as is now. I still feel that it is not great since I see the pretty classic pattern `deserialize-if-not-null-or-default` but I am not ready to say what will be better since we already have several layers String->ConfigOption->String->Object(or something similar) and I don't want to make it more difficult.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] flinkbot commented on pull request #18482: [FLINK-25744] Support native savepoints

Posted by GitBox <gi...@apache.org>.
flinkbot commented on pull request #18482:
URL: https://github.com/apache/flink/pull/18482#issuecomment-1020278176


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "91675405ae82d0e945f46caaa685d65763b296c3",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "91675405ae82d0e945f46caaa685d65763b296c3",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 91675405ae82d0e945f46caaa685d65763b296c3 UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run azure` re-run the last Azure build
   </details>


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] akalash commented on a change in pull request #18482: [FLINK-25744] Support native savepoints

Posted by GitBox <gi...@apache.org>.
akalash commented on a change in pull request #18482:
URL: https://github.com/apache/flink/pull/18482#discussion_r793538049



##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
##########
@@ -498,13 +501,6 @@ public boolean isShutdown() {
             @Nullable String externalSavepointLocation,
             boolean isPeriodic) {
 
-        if (props.getCheckpointType().getPostCheckpointAction() == PostCheckpointAction.TERMINATE
-                && !(props.isSynchronous() && props.isSavepoint())) {
-            return FutureUtils.completedExceptionally(
-                    new IllegalArgumentException(
-                            "Only synchronous savepoints are allowed to advance the watermark to MAX."));
-        }

Review comment:
       I don't really get why it was removed?

##########
File path: flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
##########
@@ -867,9 +867,9 @@ public int numKeyValueStateEntries() {
     }
 
     @Override
-    public boolean requiresLegacySynchronousTimerSnapshots(CheckpointType checkpointType) {
+    public boolean requiresLegacySynchronousTimerSnapshots(SnapshotType checkpointType) {
         return priorityQueueFactory instanceof HeapPriorityQueueSetFactory
-                && checkpointType == CheckpointType.CHECKPOINT;
+                && !checkpointType.isSavepoint();

Review comment:
       Before changes it was only `CheckpointType.CHECKPOINT`, after changes it is `CheckpointType.CHECKPOINT` and `CheckpointType.FULL_CHECKPOINT`. Is it ok?

##########
File path: flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointOptionsTest.java
##########
@@ -55,9 +56,17 @@ public void testSavepoint() throws Exception {
         final byte[] locationBytes = new byte[rnd.nextInt(41) + 1];
         rnd.nextBytes(locationBytes);
 
+        final SnapshotType[] snapshotTypes = {
+            CHECKPOINT,
+            FULL_CHECKPOINT,
+            SavepointType.savepoint(SavepointFormatType.CANONICAL),
+            SavepointType.suspend(SavepointFormatType.CANONICAL),
+            SavepointType.terminate(SavepointFormatType.CANONICAL)
+        };

Review comment:
       Since it is not enum anymore there is highly likely to forget about this place when a new snapshot type will be added. Is it possible to avoid such a problem?

##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamOperatorStateHandler.java
##########
@@ -269,9 +273,14 @@ void snapshotState(
         }
     }
 
+    private boolean isCanonicalSavepoint(SnapshotType snapshotType) {

Review comment:
       I just share the concern that using an explicit cast to SavepointType here doesn't look so good. But I see that it is problem with current behavior in general, not with this PR.

##########
File path: flink-clients/src/main/java/org/apache/flink/client/cli/CancelOptions.java
##########
@@ -33,11 +37,21 @@
     /** Optional target directory for the savepoint. Overwrites cluster default. */
     private final String targetDirectory;
 
+    private final SavepointFormatType formatType;
+
     public CancelOptions(CommandLine line) {
         super(line);
         this.args = line.getArgs();
         this.withSavepoint = line.hasOption(CANCEL_WITH_SAVEPOINT_OPTION.getOpt());
         this.targetDirectory = line.getOptionValue(CANCEL_WITH_SAVEPOINT_OPTION.getOpt());
+        if (line.hasOption(SAVEPOINT_FORMAT_OPTION)) {
+            formatType =
+                    ConfigurationUtils.convertValue(
+                            line.getOptionValue(SAVEPOINT_FORMAT_OPTION),
+                            SavepointFormatType.class);
+        } else {
+            formatType = SavepointFormatType.DEFAULT;

Review comment:
       Maybe is it better to do these `if-else` things inside `SavepointFormatType` and here just call `SavepointFormatType.fromString()`?

##########
File path: flink-clients/src/main/java/org/apache/flink/client/cli/SavepointOptions.java
##########
@@ -37,6 +42,14 @@ public SavepointOptions(CommandLine line) {
         dispose = line.hasOption(SAVEPOINT_DISPOSE_OPTION.getOpt());
         disposeSavepointPath = line.getOptionValue(SAVEPOINT_DISPOSE_OPTION.getOpt());
         jarFile = line.getOptionValue(JAR_OPTION.getOpt());
+        if (line.hasOption(SAVEPOINT_FORMAT_OPTION)) {
+            formatType =
+                    ConfigurationUtils.convertValue(
+                            line.getOptionValue(SAVEPOINT_FORMAT_OPTION),
+                            SavepointFormatType.class);
+        } else {
+            formatType = SavepointFormatType.DEFAULT;

Review comment:
       The same comment about moving it inside `SavepointFormatType`




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] dawidwys commented on a change in pull request #18482: [FLINK-25744] Support native savepoints

Posted by GitBox <gi...@apache.org>.
dawidwys commented on a change in pull request #18482:
URL: https://github.com/apache/flink/pull/18482#discussion_r793575880



##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
##########
@@ -498,13 +501,6 @@ public boolean isShutdown() {
             @Nullable String externalSavepointLocation,
             boolean isPeriodic) {
 
-        if (props.getCheckpointType().getPostCheckpointAction() == PostCheckpointAction.TERMINATE
-                && !(props.isSynchronous() && props.isSavepoint())) {
-            return FutureUtils.completedExceptionally(
-                    new IllegalArgumentException(
-                            "Only synchronous savepoints are allowed to advance the watermark to MAX."));
-        }

Review comment:
       I find those checks at this locaiton pointless. Valid combinations of properties of a `SnapshotType` are limited by factory methods of `CheckpointType` & `SavepointType`. I see no reason for checking it again here.
   
   Moreover a call to `props.isSynchronous()` was even more pointless as internally it was checking `postCheckpointAction != PostCheckpointAction.NONE`.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] flinkbot edited a comment on pull request #18482: [FLINK-25744] Support native savepoints

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #18482:
URL: https://github.com/apache/flink/pull/18482#issuecomment-1020278176


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "91675405ae82d0e945f46caaa685d65763b296c3",
       "status" : "SUCCESS",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=30070",
       "triggerID" : "91675405ae82d0e945f46caaa685d65763b296c3",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 91675405ae82d0e945f46caaa685d65763b296c3 Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=30070) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run azure` re-run the last Azure build
   </details>


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] dawidwys commented on a change in pull request #18482: [FLINK-25744] Support native savepoints

Posted by GitBox <gi...@apache.org>.
dawidwys commented on a change in pull request #18482:
URL: https://github.com/apache/flink/pull/18482#discussion_r793579611



##########
File path: flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
##########
@@ -867,9 +867,9 @@ public int numKeyValueStateEntries() {
     }
 
     @Override
-    public boolean requiresLegacySynchronousTimerSnapshots(CheckpointType checkpointType) {
+    public boolean requiresLegacySynchronousTimerSnapshots(SnapshotType checkpointType) {
         return priorityQueueFactory instanceof HeapPriorityQueueSetFactory
-                && checkpointType == CheckpointType.CHECKPOINT;
+                && !checkpointType.isSavepoint();

Review comment:
       Yes, actually it was a bug before.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] flinkbot edited a comment on pull request #18482: [FLINK-25744] Support native savepoints

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #18482:
URL: https://github.com/apache/flink/pull/18482#issuecomment-1020278176


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "91675405ae82d0e945f46caaa685d65763b296c3",
       "status" : "SUCCESS",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=30070",
       "triggerID" : "91675405ae82d0e945f46caaa685d65763b296c3",
       "triggerType" : "PUSH"
     }, {
       "hash" : "cdc389d9c5827d45c922a10dc63b7c1ef3b6df3f",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=30504",
       "triggerID" : "cdc389d9c5827d45c922a10dc63b7c1ef3b6df3f",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 91675405ae82d0e945f46caaa685d65763b296c3 Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=30070) 
   * cdc389d9c5827d45c922a10dc63b7c1ef3b6df3f Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=30504) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run azure` re-run the last Azure build
   </details>


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] dawidwys commented on a change in pull request #18482: [FLINK-25744] Support native savepoints

Posted by GitBox <gi...@apache.org>.
dawidwys commented on a change in pull request #18482:
URL: https://github.com/apache/flink/pull/18482#discussion_r793698672



##########
File path: flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointOptionsTest.java
##########
@@ -55,9 +56,17 @@ public void testSavepoint() throws Exception {
         final byte[] locationBytes = new byte[rnd.nextInt(41) + 1];
         rnd.nextBytes(locationBytes);
 
+        final SnapshotType[] snapshotTypes = {
+            CHECKPOINT,
+            FULL_CHECKPOINT,
+            SavepointType.savepoint(SavepointFormatType.CANONICAL),
+            SavepointType.suspend(SavepointFormatType.CANONICAL),
+            SavepointType.terminate(SavepointFormatType.CANONICAL)
+        };

Review comment:
       Hmm... good point. So far I could not figure out a solution. However I am wondering how important it is we cover all different configurations. In the end we are pretty much testing that java serialization works well here. Is that crucial that we test all possible values?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] flinkbot edited a comment on pull request #18482: [FLINK-25744] Support native savepoints

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #18482:
URL: https://github.com/apache/flink/pull/18482#issuecomment-1020278176


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "91675405ae82d0e945f46caaa685d65763b296c3",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=30070",
       "triggerID" : "91675405ae82d0e945f46caaa685d65763b296c3",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 91675405ae82d0e945f46caaa685d65763b296c3 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=30070) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run azure` re-run the last Azure build
   </details>


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] dawidwys commented on a change in pull request #18482: [FLINK-25744] Support native savepoints

Posted by GitBox <gi...@apache.org>.
dawidwys commented on a change in pull request #18482:
URL: https://github.com/apache/flink/pull/18482#discussion_r793583263



##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamOperatorStateHandler.java
##########
@@ -269,9 +273,14 @@ void snapshotState(
         }
     }
 
+    private boolean isCanonicalSavepoint(SnapshotType snapshotType) {

Review comment:
       Yes, I also did not like the casting, therefore I am more than happy to hear alternatives.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] dawidwys commented on a change in pull request #18482: [FLINK-25744] Support native savepoints

Posted by GitBox <gi...@apache.org>.
dawidwys commented on a change in pull request #18482:
URL: https://github.com/apache/flink/pull/18482#discussion_r793674168



##########
File path: flink-clients/src/main/java/org/apache/flink/client/cli/CancelOptions.java
##########
@@ -33,11 +37,21 @@
     /** Optional target directory for the savepoint. Overwrites cluster default. */
     private final String targetDirectory;
 
+    private final SavepointFormatType formatType;
+
     public CancelOptions(CommandLine line) {
         super(line);
         this.args = line.getArgs();
         this.withSavepoint = line.hasOption(CANCEL_WITH_SAVEPOINT_OPTION.getOpt());
         this.targetDirectory = line.getOptionValue(CANCEL_WITH_SAVEPOINT_OPTION.getOpt());
+        if (line.hasOption(SAVEPOINT_FORMAT_OPTION)) {
+            formatType =
+                    ConfigurationUtils.convertValue(
+                            line.getOptionValue(SAVEPOINT_FORMAT_OPTION),
+                            SavepointFormatType.class);
+        } else {
+            formatType = SavepointFormatType.DEFAULT;

Review comment:
       I could do:
   ```
           this.formatType =
                   ConfigurationUtils.convertValue(
                           line.getOptionValue(
                                   SAVEPOINT_FORMAT_OPTION, SavepointFormatType.DEFAULT.toString()),
                           SavepointFormatType.class);
   ```
   
   Do you think that would be better? I would not like to extend the `convertValue` to handle default values or treat `null` specially. The default value for `ConfigOption(s)` is handled on a higher level.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] flinkbot edited a comment on pull request #18482: [FLINK-25744] Support native savepoints

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #18482:
URL: https://github.com/apache/flink/pull/18482#issuecomment-1020278176


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "91675405ae82d0e945f46caaa685d65763b296c3",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=30070",
       "triggerID" : "91675405ae82d0e945f46caaa685d65763b296c3",
       "triggerType" : "PUSH"
     }, {
       "hash" : "cdc389d9c5827d45c922a10dc63b7c1ef3b6df3f",
       "status" : "SUCCESS",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=30504",
       "triggerID" : "cdc389d9c5827d45c922a10dc63b7c1ef3b6df3f",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * cdc389d9c5827d45c922a10dc63b7c1ef3b6df3f Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=30504) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run azure` re-run the last Azure build
   </details>


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] akalash commented on a change in pull request #18482: [FLINK-25744] Support native savepoints

Posted by GitBox <gi...@apache.org>.
akalash commented on a change in pull request #18482:
URL: https://github.com/apache/flink/pull/18482#discussion_r793811815



##########
File path: flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointOptionsTest.java
##########
@@ -55,9 +56,17 @@ public void testSavepoint() throws Exception {
         final byte[] locationBytes = new byte[rnd.nextInt(41) + 1];
         rnd.nextBytes(locationBytes);
 
+        final SnapshotType[] snapshotTypes = {
+            CHECKPOINT,
+            FULL_CHECKPOINT,
+            SavepointType.savepoint(SavepointFormatType.CANONICAL),
+            SavepointType.suspend(SavepointFormatType.CANONICAL),
+            SavepointType.terminate(SavepointFormatType.CANONICAL)
+        };

Review comment:
       I would say it will be nice to test everything but of course, it is not so crucial. Perhaps, we should test the serialization of SavepointType separately using the constructor so if somebody adds a new parameter they will be forced to change the test.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org