Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2022/06/06 16:05:59 UTC

[GitHub] [flink-kubernetes-operator] gyfora commented on a diff in pull request #255: [FLINK-27257] Retry failed savepoints within grace period

gyfora commented on code in PR #255:
URL: https://github.com/apache/flink-kubernetes-operator/pull/255#discussion_r890305515


##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/SavepointObserver.java:
##########
@@ -69,96 +69,78 @@ public void observeSavepointStatus(
                         .map(Savepoint::getLocation)
                         .orElse(null);
 
-        observeTriggeredSavepointProgress(savepointInfo, jobId, deployedConfig)
-                .ifPresent(
-                        err ->
-                                EventUtils.createOrUpdateEvent(
-                                        flinkService.getKubernetesClient(),
-                                        resource,
-                                        EventUtils.Type.Warning,
-                                        "SavepointError",
-                                        SavepointUtils.createSavepointError(
-                                                savepointInfo,
-                                                resource.getSpec()
-                                                        .getJob()
-                                                        .getSavepointTriggerNonce()),
-                                        EventUtils.Component.Operator));
-
-        // We only need to observe latest checkpoint/savepoint for terminal jobs
-        if (JobStatus.valueOf(jobStatus.getState()).isGloballyTerminalState()) {
-            observeLatestSavepoint(savepointInfo, jobId, deployedConfig);
+        // If any manual or periodic savepoint is in progress, observe it
+        if (SavepointUtils.savepointInProgress(jobStatus)) {
+            observeTriggeredSavepoint(resource, jobId, deployedConfig);
         }
 
-        var currentLastSpPath =
-                Optional.ofNullable(savepointInfo.getLastSavepoint())
-                        .map(Savepoint::getLocation)
-                        .orElse(null);
-
-        // If the last savepoint information changes we need to patch the status
-        // to avoid losing this in case of an operator failure after the cluster was shut down
-        if (currentLastSpPath != null && !currentLastSpPath.equals(previousLastSpPath)) {
-            LOG.info(
-                    "Updating resource status after observing new last savepoint {}",
-                    currentLastSpPath);
-            statusHelper.patchAndCacheStatus(resource);
+        // If job is in globally terminal state, observe last savepoint
+        if (ReconciliationUtils.isJobInTerminalState(resource.getStatus())) {
+            observeLatestSavepoint(savepointInfo, jobId, deployedConfig);
         }
+
+        patchStatusOnSavepointChange(resource, savepointInfo, previousLastSpPath);

Review Comment:
   No, that's not necessary; the point is that it is executed before any reconciliation.
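The ordering this comment relies on can be sketched as follows. This is a minimal, hypothetical sketch. `Savepoint`, `SavepointInfo`, `SavepointPatchSketch` and `runOnce` are simplified stand-ins, not the operator's real classes. The status patch is recorded in a list instead of calling `statusHelper.patchAndCacheStatus(resource)`. The helper body mirrors the inline logic that the diff removes: the status is patched only when a new last-savepoint location has been observed. Because this happens in the observer, the patch is done before the reconciler runs.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

// Hypothetical, simplified stand-ins for the operator's status classes.
record Savepoint(String location) {}

final class SavepointInfo {
    Savepoint lastSavepoint; // null until a savepoint has been observed
}

final class SavepointPatchSketch {
    // Records each simulated status patch (stand-in for patchAndCacheStatus).
    static final List<String> patchedLocations = new ArrayList<>();

    // Mirrors the removed inline logic: patch only when a new last
    // savepoint location appears, so it survives an operator failure.
    static boolean patchStatusOnSavepointChange(SavepointInfo info, String previousLastSpPath) {
        String current = Optional.ofNullable(info.lastSavepoint)
                .map(Savepoint::location)
                .orElse(null);
        if (current != null && !current.equals(previousLastSpPath)) {
            patchedLocations.add(current);
            return true;
        }
        return false;
    }

    // Hypothetical single loop pass: observe (and maybe patch) first, then reconcile.
    static List<String> runOnce(SavepointInfo info, String previousLastSpPath) {
        List<String> steps = new ArrayList<>();
        if (patchStatusOnSavepointChange(info, previousLastSpPath)) {
            steps.add("patch-status");
        }
        steps.add("reconcile"); // the reconciler only sees the already-patched status
        return steps;
    }
}
```

Keeping the patch inside the observer means no extra check is needed in the reconciler. By the time reconciliation starts, any newly observed savepoint has already been persisted.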



-- 
This is an automated message from the Apache Git Service.