Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2022/05/22 10:24:25 UTC

[GitHub] [flink-kubernetes-operator] gyfora opened a new pull request, #236: [hotfix] Fix last checkpoint observe with empty history

gyfora opened a new pull request, #236:
URL: https://github.com/apache/flink-kubernetes-operator/pull/236

   Fixes a very important corner case where the latest checkpoint wasn't observed correctly after a job was restored.
   
   The current logic relies on being able to observe the latest checkpoint/savepoint correctly in case of a terminal job state and setting it in the status.
   
   Previously this relied entirely on the checkpoint history, without accounting for the possibility that the history is empty when the job fails after a restore but before its first checkpoint completes.
   
   At the moment this bug would cause jobs to fall back to an invalid earlier checkpoint in some cases.
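
   A minimal sketch of the fallback described above, assuming the history is available as a newest-first list of external paths and the restore path as an `Optional`. The class and method names here are hypothetical stand-ins, not the operator's actual `FlinkService` code.

```java
import java.util.List;
import java.util.Optional;

public class LastCheckpointFallback {

    /**
     * Returns the newest checkpoint path from the history, or the restore
     * path when the history is empty (for example, when the job failed after
     * a restore but before its first checkpoint).
     */
    static Optional<String> lastCheckpointPath(
            List<String> historyPaths, Optional<String> restorePath) {
        // Assumption: historyPaths is ordered newest-first.
        Optional<String> latest = historyPaths.stream().findFirst();
        if (!latest.isPresent()) {
            // Empty history: fall back to the checkpoint the job was restored from.
            latest = restorePath;
        }
        return latest;
    }

    public static void main(String[] args) {
        // Empty history, restore path present: the restore path is returned.
        System.out.println(lastCheckpointPath(List.of(), Optional.of("s3://bucket/chk-42")));
        // Non-empty history: the first (newest) entry wins over the restore path.
        System.out.println(
                lastCheckpointPath(List.of("s3://bucket/chk-43"), Optional.of("s3://bucket/chk-42")));
    }
}
```

   Without the fallback, the first call would yield an empty result and the status would keep pointing at whatever older checkpoint was recorded before.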


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink-kubernetes-operator] wangyang0918 commented on a diff in pull request #236: [hotfix] Fix last checkpoint observe with empty history

Posted by GitBox <gi...@apache.org>.
wangyang0918 commented on code in PR #236:
URL: https://github.com/apache/flink-kubernetes-operator/pull/236#discussion_r878864746


##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/FlinkService.java:
##########
@@ -633,8 +623,26 @@ public Optional<Savepoint> getLastCheckpoint(JobID jobId, Configuration conf) th
                                                                             .FIELD_NAME_EXTERNAL_PATH)
                                                             .asText()));
 
+            if (latestCheckpointOpt.isEmpty()) {
+                // Use restore checkpoint instead
+                LOG.info(
+                        "Could not find any checkpoints in the history, returning restore checkpoint if present");
+                latestCheckpointOpt = checkpoints.getRestorePath().map(Savepoint::of);
+            }
+
             if (!latestCheckpointOpt.isPresent()) {
-                LOG.warn("Could not find any externally addressable checkpoints.");
+                LOG.warn("Could not find any checkpoints.");
+            }
+
+            if (latestCheckpointOpt.isPresent()
+                    && latestCheckpointOpt
+                            .get()
+                            .equals(

Review Comment:
   Comparing `Savepoint` with a `String`.
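
   To illustrate the concern, here is a small sketch of why such a comparison compiles but never matches. `SavepointInfo` is a hypothetical stand-in for the operator's `Savepoint` class, assumed to hold a `location` string and to define `equals` over its own type only.

```java
import java.util.Objects;
import java.util.Optional;

public class SavepointEqualsPitfall {

    /** Hypothetical stand-in for a savepoint object with a location. */
    static final class SavepointInfo {
        private final String location;

        SavepointInfo(String location) {
            this.location = location;
        }

        String getLocation() {
            return location;
        }

        @Override
        public boolean equals(Object o) {
            if (this == o) {
                return true;
            }
            if (!(o instanceof SavepointInfo)) {
                return false; // a String argument always ends up here
            }
            return Objects.equals(location, ((SavepointInfo) o).location);
        }

        @Override
        public int hashCode() {
            return Objects.hash(location);
        }
    }

    /** Compiles because equals takes Object, but is always false for a String. */
    static boolean buggyMatches(Optional<SavepointInfo> latest, String path) {
        return latest.isPresent() && latest.get().equals(path);
    }

    /** Compares the location string with the path instead. */
    static boolean fixedMatches(Optional<SavepointInfo> latest, String path) {
        return latest.map(SavepointInfo::getLocation).filter(path::equals).isPresent();
    }

    public static void main(String[] args) {
        Optional<SavepointInfo> latest = Optional.of(new SavepointInfo("s3://bucket/chk-1"));
        System.out.println(buggyMatches(latest, "s3://bucket/chk-1"));
        System.out.println(fixedMatches(latest, "s3://bucket/chk-1"));
    }
}
```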



##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/CheckpointHistoryWrapper.java:
##########
@@ -19,24 +19,43 @@
 package org.apache.flink.kubernetes.operator.service;
 
 import org.apache.flink.runtime.rest.messages.ResponseBody;
+import org.apache.flink.runtime.rest.messages.checkpoints.CheckpointingStatistics;
 
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonIgnoreProperties;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
 
 import lombok.Data;
 import lombok.NoArgsConstructor;
 
 import java.util.List;
+import java.util.Optional;
+
+import static org.apache.flink.runtime.rest.messages.checkpoints.CheckpointingStatistics.FIELD_NAME_HISTORY;
+import static org.apache.flink.runtime.rest.messages.checkpoints.CheckpointingStatistics.FIELD_NAME_LATEST_CHECKPOINTS;
+import static org.apache.flink.runtime.rest.messages.checkpoints.CheckpointingStatistics.RestoredCheckpointStatistics.FIELD_NAME_EXTERNAL_PATH;
 
 /** Custom Response for handling checkpoint history in a multi-version compatible way. */
 @JsonIgnoreProperties(ignoreUnknown = true)
 @Data
 @NoArgsConstructor
 public class CheckpointHistoryWrapper implements ResponseBody {
 
-    public static final String FIELD_NAME_HISTORY = "history";
-
     @JsonProperty(FIELD_NAME_HISTORY)
     private List<ObjectNode> history;
+
+    @JsonProperty(FIELD_NAME_LATEST_CHECKPOINTS)

Review Comment:
   I am not sure whether we could always use the field `latest` instead of `history`.





[GitHub] [flink-kubernetes-operator] gyfora merged pull request #236: [hotfix] Fix last checkpoint observe with empty history

Posted by GitBox <gi...@apache.org>.
gyfora merged PR #236:
URL: https://github.com/apache/flink-kubernetes-operator/pull/236




[GitHub] [flink-kubernetes-operator] gyfora commented on a diff in pull request #236: [hotfix] Fix last checkpoint observe with empty history

Posted by GitBox <gi...@apache.org>.
gyfora commented on code in PR #236:
URL: https://github.com/apache/flink-kubernetes-operator/pull/236#discussion_r878938669


##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/CheckpointHistoryWrapper.java:
##########
@@ -19,24 +19,43 @@
 package org.apache.flink.kubernetes.operator.service;
 
 import org.apache.flink.runtime.rest.messages.ResponseBody;
+import org.apache.flink.runtime.rest.messages.checkpoints.CheckpointingStatistics;
 
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonIgnoreProperties;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
 
 import lombok.Data;
 import lombok.NoArgsConstructor;
 
 import java.util.List;
+import java.util.Optional;
+
+import static org.apache.flink.runtime.rest.messages.checkpoints.CheckpointingStatistics.FIELD_NAME_HISTORY;
+import static org.apache.flink.runtime.rest.messages.checkpoints.CheckpointingStatistics.FIELD_NAME_LATEST_CHECKPOINTS;
+import static org.apache.flink.runtime.rest.messages.checkpoints.CheckpointingStatistics.RestoredCheckpointStatistics.FIELD_NAME_EXTERNAL_PATH;
 
 /** Custom Response for handling checkpoint history in a multi-version compatible way. */
 @JsonIgnoreProperties(ignoreUnknown = true)
 @Data
 @NoArgsConstructor
 public class CheckpointHistoryWrapper implements ResponseBody {
 
-    public static final String FIELD_NAME_HISTORY = "history";
-
     @JsonProperty(FIELD_NAME_HISTORY)
     private List<ObjectNode> history;
+
+    @JsonProperty(FIELD_NAME_LATEST_CHECKPOINTS)

Review Comment:
   We could, but in that case we would still need to check whether the latest savepoint or the latest checkpoint is newer.
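
   A rough sketch of that check, assuming each of the two "latest" entries carries a timestamp and an external path. The `Snapshot` type and its field names are hypothetical and only illustrate the comparison, not the actual REST response classes.

```java
import java.util.Comparator;
import java.util.Optional;
import java.util.stream.Stream;

public class NewestOfLatest {

    /** Hypothetical holder for one "latest" entry (checkpoint or savepoint). */
    static final class Snapshot {
        final long timestamp;
        final String externalPath;

        Snapshot(long timestamp, String externalPath) {
            this.timestamp = timestamp;
            this.externalPath = externalPath;
        }
    }

    /** Returns whichever of the two entries has the larger timestamp, if any is present. */
    static Optional<Snapshot> newest(
            Optional<Snapshot> latestCheckpoint, Optional<Snapshot> latestSavepoint) {
        return Stream.of(latestCheckpoint, latestSavepoint)
                .filter(Optional::isPresent)
                .map(Optional::get)
                .max(Comparator.comparingLong((Snapshot s) -> s.timestamp));
    }

    public static void main(String[] args) {
        Optional<Snapshot> checkpoint = Optional.of(new Snapshot(100L, "s3://bucket/chk-7"));
        Optional<Snapshot> savepoint = Optional.of(new Snapshot(200L, "s3://bucket/savepoint-3"));
        // The savepoint is newer here, so its path is selected.
        newest(checkpoint, savepoint).ifPresent(s -> System.out.println(s.externalPath));
    }
}
```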





[GitHub] [flink-kubernetes-operator] gyfora commented on a diff in pull request #236: [hotfix] Fix last checkpoint observe with empty history

Posted by GitBox <gi...@apache.org>.
gyfora commented on code in PR #236:
URL: https://github.com/apache/flink-kubernetes-operator/pull/236#discussion_r878938574


##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/FlinkService.java:
##########
@@ -633,8 +623,26 @@ public Optional<Savepoint> getLastCheckpoint(JobID jobId, Configuration conf) th
                                                                             .FIELD_NAME_EXTERNAL_PATH)
                                                             .asText()));
 
+            if (latestCheckpointOpt.isEmpty()) {
+                // Use restore checkpoint instead
+                LOG.info(
+                        "Could not find any checkpoints in the history, returning restore checkpoint if present");
+                latestCheckpointOpt = checkpoints.getRestorePath().map(Savepoint::of);
+            }
+
             if (!latestCheckpointOpt.isPresent()) {
-                LOG.warn("Could not find any externally addressable checkpoints.");
+                LOG.warn("Could not find any checkpoints.");
+            }
+
+            if (latestCheckpointOpt.isPresent()
+                    && latestCheckpointOpt
+                            .get()
+                            .equals(

Review Comment:
   Good catch. I was planning to add a test for this anyway :)





[GitHub] [flink-kubernetes-operator] gyfora commented on pull request #236: [hotfix] Fix last checkpoint observe with empty history

Posted by GitBox <gi...@apache.org>.
gyfora commented on PR #236:
URL: https://github.com/apache/flink-kubernetes-operator/pull/236#issuecomment-1133864932

   @wangyang0918 @Aitozi @morhidi this is a critical fix that should be reviewed and merged before the first RC.

