You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2022/06/05 21:53:55 UTC

[GitHub] [flink-kubernetes-operator] gyfora opened a new pull request, #255: [FLINK-27257] Retry failed savepoints within grace period

gyfora opened a new pull request, #255:
URL: https://github.com/apache/flink-kubernetes-operator/pull/255

   This PR changes the savepoint triggering / observe flow by only updating the lastReconciledSpec.savepointTriggerNonce after the manual savepoint was successfully completed.
   
   This change allows the reconciler to seamlessly retry the savepoints within the grace period without adding new complicated retry logic.
   
   Other changes:
   
    - Rename the grace period config to kubernetes.operator.savepoint.trigger.grace-period to more closely reflect the functionality and make it per-job configurable
    - Some other related cleanups and minor refactorings in the SavepointObserver


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink-kubernetes-operator] Aitozi commented on a diff in pull request #255: [FLINK-27257] Retry failed savepoints within grace period

Posted by GitBox <gi...@apache.org>.
Aitozi commented on code in PR #255:
URL: https://github.com/apache/flink-kubernetes-operator/pull/255#discussion_r890269122


##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/SavepointObserver.java:
##########
@@ -69,96 +69,78 @@ public void observeSavepointStatus(
                         .map(Savepoint::getLocation)
                         .orElse(null);
 
-        observeTriggeredSavepointProgress(savepointInfo, jobId, deployedConfig)
-                .ifPresent(
-                        err ->
-                                EventUtils.createOrUpdateEvent(
-                                        flinkService.getKubernetesClient(),
-                                        resource,
-                                        EventUtils.Type.Warning,
-                                        "SavepointError",
-                                        SavepointUtils.createSavepointError(
-                                                savepointInfo,
-                                                resource.getSpec()
-                                                        .getJob()
-                                                        .getSavepointTriggerNonce()),
-                                        EventUtils.Component.Operator));
-
-        // We only need to observe latest checkpoint/savepoint for terminal jobs
-        if (JobStatus.valueOf(jobStatus.getState()).isGloballyTerminalState()) {
-            observeLatestSavepoint(savepointInfo, jobId, deployedConfig);
+        // If any manual or periodic savepoint is in progress, observe it
+        if (SavepointUtils.savepointInProgress(jobStatus)) {
+            observeTriggeredSavepoint(resource, jobId, deployedConfig);
         }
 
-        var currentLastSpPath =
-                Optional.ofNullable(savepointInfo.getLastSavepoint())
-                        .map(Savepoint::getLocation)
-                        .orElse(null);
-
-        // If the last savepoint information changes we need to patch the status
-        // to avoid losing this in case of an operator failure after the cluster was shut down
-        if (currentLastSpPath != null && !currentLastSpPath.equals(previousLastSpPath)) {
-            LOG.info(
-                    "Updating resource status after observing new last savepoint {}",
-                    currentLastSpPath);
-            statusHelper.patchAndCacheStatus(resource);
+        // If job is in globally terminal state, observe last savepoint
+        if (ReconciliationUtils.isJobInTerminalState(resource.getStatus())) {
+            observeLatestSavepoint(savepointInfo, jobId, deployedConfig);
         }
+
+        patchStatusOnSavepointChange(resource, savepointInfo, previousLastSpPath);
     }
 
     /**
      * Observe the savepoint result based on the current savepoint info.
      *
-     * @param currentSavepointInfo the current savepoint info.
+     * @param resource the resource being observed
      * @param jobID the jobID of the observed job.
      * @param deployedConfig Deployed job config.
      * @return The observed error, if no error observed, {@code Optional.empty()} will be returned.
      */
-    private Optional<String> observeTriggeredSavepointProgress(
-            SavepointInfo currentSavepointInfo, String jobID, Configuration deployedConfig) {
-        if (StringUtils.isEmpty(currentSavepointInfo.getTriggerId())) {
-            LOG.debug("Savepoint not in progress");
-            return Optional.empty();
-        }
+    private void observeTriggeredSavepoint(
+            AbstractFlinkResource<?, ?> resource, String jobID, Configuration deployedConfig) {
+
+        var savepointInfo = resource.getStatus().getJobStatus().getSavepointInfo();
+
         LOG.info("Observing savepoint status.");
-        SavepointFetchResult savepointFetchResult =
+        var savepointFetchResult =
                 flinkService.fetchSavepointInfo(
-                        currentSavepointInfo.getTriggerId(), jobID, deployedConfig);
+                        savepointInfo.getTriggerId(), jobID, deployedConfig);
 
         if (savepointFetchResult.isPending()) {
-            if (SavepointUtils.gracePeriodEnded(
-                    configManager.getOperatorConfiguration(), currentSavepointInfo)) {
-                String errorMsg =
-                        "Savepoint operation timed out after "
-                                + configManager
-                                        .getOperatorConfiguration()
-                                        .getSavepointTriggerGracePeriod();
-                currentSavepointInfo.resetTrigger();
-                LOG.error(errorMsg);
-                return Optional.of(errorMsg);
-            } else {
-                LOG.info("Savepoint operation not finished yet, waiting within grace period...");
-                return Optional.empty();
-            }
+            LOG.info("Savepoint operation not finished yet...");
+            return;
         }
 
         if (savepointFetchResult.getError() != null) {
-            currentSavepointInfo.resetTrigger();
-            return Optional.of(savepointFetchResult.getError());
+            var err = savepointFetchResult.getError();
+            if (SavepointUtils.gracePeriodEnded(deployedConfig, savepointInfo)) {
+                LOG.error(
+                        "Savepoint attempt failed after grace period. Won't be retried again: "
+                                + err);
+                ReconciliationUtils.updateLastReconciledSavepointTrigger(savepointInfo, resource);

Review Comment:
   The function name `updateLastReconciledSavepointTrigger ` is a bit confused. It looks like a semantic of `triggerNonceFinished` (no matter success or failed). And it do not have to touch the common status's error I think 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink-kubernetes-operator] gyfora merged pull request #255: [FLINK-27257] Retry failed savepoints within grace period

Posted by GitBox <gi...@apache.org>.
gyfora merged PR #255:
URL: https://github.com/apache/flink-kubernetes-operator/pull/255


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink-kubernetes-operator] gyfora commented on pull request #255: [FLINK-27257] Retry failed savepoints within grace period

Posted by GitBox <gi...@apache.org>.
gyfora commented on PR #255:
URL: https://github.com/apache/flink-kubernetes-operator/pull/255#issuecomment-1147637009

   @Aitozi updated the PR according to your comments :)


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink-kubernetes-operator] gyfora commented on a diff in pull request #255: [FLINK-27257] Retry failed savepoints within grace period

Posted by GitBox <gi...@apache.org>.
gyfora commented on code in PR #255:
URL: https://github.com/apache/flink-kubernetes-operator/pull/255#discussion_r890306984


##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/SavepointObserver.java:
##########
@@ -69,96 +69,78 @@ public void observeSavepointStatus(
                         .map(Savepoint::getLocation)
                         .orElse(null);
 
-        observeTriggeredSavepointProgress(savepointInfo, jobId, deployedConfig)
-                .ifPresent(
-                        err ->
-                                EventUtils.createOrUpdateEvent(
-                                        flinkService.getKubernetesClient(),
-                                        resource,
-                                        EventUtils.Type.Warning,
-                                        "SavepointError",
-                                        SavepointUtils.createSavepointError(
-                                                savepointInfo,
-                                                resource.getSpec()
-                                                        .getJob()
-                                                        .getSavepointTriggerNonce()),
-                                        EventUtils.Component.Operator));
-
-        // We only need to observe latest checkpoint/savepoint for terminal jobs
-        if (JobStatus.valueOf(jobStatus.getState()).isGloballyTerminalState()) {
-            observeLatestSavepoint(savepointInfo, jobId, deployedConfig);
+        // If any manual or periodic savepoint is in progress, observe it
+        if (SavepointUtils.savepointInProgress(jobStatus)) {
+            observeTriggeredSavepoint(resource, jobId, deployedConfig);
         }
 
-        var currentLastSpPath =
-                Optional.ofNullable(savepointInfo.getLastSavepoint())
-                        .map(Savepoint::getLocation)
-                        .orElse(null);
-
-        // If the last savepoint information changes we need to patch the status
-        // to avoid losing this in case of an operator failure after the cluster was shut down
-        if (currentLastSpPath != null && !currentLastSpPath.equals(previousLastSpPath)) {
-            LOG.info(
-                    "Updating resource status after observing new last savepoint {}",
-                    currentLastSpPath);
-            statusHelper.patchAndCacheStatus(resource);
+        // If job is in globally terminal state, observe last savepoint
+        if (ReconciliationUtils.isJobInTerminalState(resource.getStatus())) {
+            observeLatestSavepoint(savepointInfo, jobId, deployedConfig);
         }
+
+        patchStatusOnSavepointChange(resource, savepointInfo, previousLastSpPath);
     }
 
     /**
      * Observe the savepoint result based on the current savepoint info.
      *
-     * @param currentSavepointInfo the current savepoint info.
+     * @param resource the resource being observed
      * @param jobID the jobID of the observed job.
      * @param deployedConfig Deployed job config.
      * @return The observed error, if no error observed, {@code Optional.empty()} will be returned.
      */
-    private Optional<String> observeTriggeredSavepointProgress(
-            SavepointInfo currentSavepointInfo, String jobID, Configuration deployedConfig) {
-        if (StringUtils.isEmpty(currentSavepointInfo.getTriggerId())) {
-            LOG.debug("Savepoint not in progress");
-            return Optional.empty();
-        }
+    private void observeTriggeredSavepoint(
+            AbstractFlinkResource<?, ?> resource, String jobID, Configuration deployedConfig) {
+
+        var savepointInfo = resource.getStatus().getJobStatus().getSavepointInfo();
+
         LOG.info("Observing savepoint status.");
-        SavepointFetchResult savepointFetchResult =
+        var savepointFetchResult =
                 flinkService.fetchSavepointInfo(
-                        currentSavepointInfo.getTriggerId(), jobID, deployedConfig);
+                        savepointInfo.getTriggerId(), jobID, deployedConfig);
 
         if (savepointFetchResult.isPending()) {

Review Comment:
   This is an intentional change. Flink already has a checkpoint/savepoint timeout setting that governs the savepointing time. Grace period now only affects triggering.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink-kubernetes-operator] gyfora commented on a diff in pull request #255: [FLINK-27257] Retry failed savepoints within grace period

Posted by GitBox <gi...@apache.org>.
gyfora commented on code in PR #255:
URL: https://github.com/apache/flink-kubernetes-operator/pull/255#discussion_r890305515


##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/SavepointObserver.java:
##########
@@ -69,96 +69,78 @@ public void observeSavepointStatus(
                         .map(Savepoint::getLocation)
                         .orElse(null);
 
-        observeTriggeredSavepointProgress(savepointInfo, jobId, deployedConfig)
-                .ifPresent(
-                        err ->
-                                EventUtils.createOrUpdateEvent(
-                                        flinkService.getKubernetesClient(),
-                                        resource,
-                                        EventUtils.Type.Warning,
-                                        "SavepointError",
-                                        SavepointUtils.createSavepointError(
-                                                savepointInfo,
-                                                resource.getSpec()
-                                                        .getJob()
-                                                        .getSavepointTriggerNonce()),
-                                        EventUtils.Component.Operator));
-
-        // We only need to observe latest checkpoint/savepoint for terminal jobs
-        if (JobStatus.valueOf(jobStatus.getState()).isGloballyTerminalState()) {
-            observeLatestSavepoint(savepointInfo, jobId, deployedConfig);
+        // If any manual or periodic savepoint is in progress, observe it
+        if (SavepointUtils.savepointInProgress(jobStatus)) {
+            observeTriggeredSavepoint(resource, jobId, deployedConfig);
         }
 
-        var currentLastSpPath =
-                Optional.ofNullable(savepointInfo.getLastSavepoint())
-                        .map(Savepoint::getLocation)
-                        .orElse(null);
-
-        // If the last savepoint information changes we need to patch the status
-        // to avoid losing this in case of an operator failure after the cluster was shut down
-        if (currentLastSpPath != null && !currentLastSpPath.equals(previousLastSpPath)) {
-            LOG.info(
-                    "Updating resource status after observing new last savepoint {}",
-                    currentLastSpPath);
-            statusHelper.patchAndCacheStatus(resource);
+        // If job is in globally terminal state, observe last savepoint
+        if (ReconciliationUtils.isJobInTerminalState(resource.getStatus())) {
+            observeLatestSavepoint(savepointInfo, jobId, deployedConfig);
         }
+
+        patchStatusOnSavepointChange(resource, savepointInfo, previousLastSpPath);

Review Comment:
   No that's not necessary, the point is that it is executed before any reconciliation



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink-kubernetes-operator] Aitozi commented on a diff in pull request #255: [FLINK-27257] Retry failed savepoints within grace period

Posted by GitBox <gi...@apache.org>.
Aitozi commented on code in PR #255:
URL: https://github.com/apache/flink-kubernetes-operator/pull/255#discussion_r890648242


##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/SavepointObserver.java:
##########
@@ -69,96 +69,78 @@ public void observeSavepointStatus(
                         .map(Savepoint::getLocation)
                         .orElse(null);
 
-        observeTriggeredSavepointProgress(savepointInfo, jobId, deployedConfig)
-                .ifPresent(
-                        err ->
-                                EventUtils.createOrUpdateEvent(
-                                        flinkService.getKubernetesClient(),
-                                        resource,
-                                        EventUtils.Type.Warning,
-                                        "SavepointError",
-                                        SavepointUtils.createSavepointError(
-                                                savepointInfo,
-                                                resource.getSpec()
-                                                        .getJob()
-                                                        .getSavepointTriggerNonce()),
-                                        EventUtils.Component.Operator));
-
-        // We only need to observe latest checkpoint/savepoint for terminal jobs
-        if (JobStatus.valueOf(jobStatus.getState()).isGloballyTerminalState()) {
-            observeLatestSavepoint(savepointInfo, jobId, deployedConfig);
+        // If any manual or periodic savepoint is in progress, observe it
+        if (SavepointUtils.savepointInProgress(jobStatus)) {
+            observeTriggeredSavepoint(resource, jobId, deployedConfig);
         }
 
-        var currentLastSpPath =
-                Optional.ofNullable(savepointInfo.getLastSavepoint())
-                        .map(Savepoint::getLocation)
-                        .orElse(null);
-
-        // If the last savepoint information changes we need to patch the status
-        // to avoid losing this in case of an operator failure after the cluster was shut down
-        if (currentLastSpPath != null && !currentLastSpPath.equals(previousLastSpPath)) {
-            LOG.info(
-                    "Updating resource status after observing new last savepoint {}",
-                    currentLastSpPath);
-            statusHelper.patchAndCacheStatus(resource);
+        // If job is in globally terminal state, observe last savepoint
+        if (ReconciliationUtils.isJobInTerminalState(resource.getStatus())) {
+            observeLatestSavepoint(savepointInfo, jobId, deployedConfig);
         }
+
+        patchStatusOnSavepointChange(resource, savepointInfo, previousLastSpPath);
     }
 
     /**
      * Observe the savepoint result based on the current savepoint info.
      *
-     * @param currentSavepointInfo the current savepoint info.
+     * @param resource the resource being observed
      * @param jobID the jobID of the observed job.
      * @param deployedConfig Deployed job config.
      * @return The observed error, if no error observed, {@code Optional.empty()} will be returned.
      */
-    private Optional<String> observeTriggeredSavepointProgress(
-            SavepointInfo currentSavepointInfo, String jobID, Configuration deployedConfig) {
-        if (StringUtils.isEmpty(currentSavepointInfo.getTriggerId())) {
-            LOG.debug("Savepoint not in progress");
-            return Optional.empty();
-        }
+    private void observeTriggeredSavepoint(
+            AbstractFlinkResource<?, ?> resource, String jobID, Configuration deployedConfig) {
+
+        var savepointInfo = resource.getStatus().getJobStatus().getSavepointInfo();
+
         LOG.info("Observing savepoint status.");
-        SavepointFetchResult savepointFetchResult =
+        var savepointFetchResult =
                 flinkService.fetchSavepointInfo(
-                        currentSavepointInfo.getTriggerId(), jobID, deployedConfig);
+                        savepointInfo.getTriggerId(), jobID, deployedConfig);
 
         if (savepointFetchResult.isPending()) {

Review Comment:
   Get your point



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink-kubernetes-operator] Aitozi commented on a diff in pull request #255: [FLINK-27257] Retry failed savepoints within grace period

Posted by GitBox <gi...@apache.org>.
Aitozi commented on code in PR #255:
URL: https://github.com/apache/flink-kubernetes-operator/pull/255#discussion_r890209182


##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/KubernetesOperatorConfigOptions.java:
##########
@@ -58,9 +58,11 @@ public class KubernetesOperatorConfigOptions {
                             "The interval for observing status for in-progress operations such as deployment and savepoints.");
 
     public static final ConfigOption<Duration> OPERATOR_OBSERVER_SAVEPOINT_TRIGGER_GRACE_PERIOD =

Review Comment:
   The variable name could also be refactor accordingly.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink-kubernetes-operator] gyfora commented on pull request #255: [FLINK-27257] Retry failed savepoints within grace period

Posted by GitBox <gi...@apache.org>.
gyfora commented on PR #255:
URL: https://github.com/apache/flink-kubernetes-operator/pull/255#issuecomment-1147262306

   @morhidi @Aitozi @SteNicholas 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink-kubernetes-operator] Aitozi commented on a diff in pull request #255: [FLINK-27257] Retry failed savepoints within grace period

Posted by GitBox <gi...@apache.org>.
Aitozi commented on code in PR #255:
URL: https://github.com/apache/flink-kubernetes-operator/pull/255#discussion_r890243163


##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/SavepointObserver.java:
##########
@@ -69,96 +69,78 @@ public void observeSavepointStatus(
                         .map(Savepoint::getLocation)
                         .orElse(null);
 
-        observeTriggeredSavepointProgress(savepointInfo, jobId, deployedConfig)
-                .ifPresent(
-                        err ->
-                                EventUtils.createOrUpdateEvent(
-                                        flinkService.getKubernetesClient(),
-                                        resource,
-                                        EventUtils.Type.Warning,
-                                        "SavepointError",
-                                        SavepointUtils.createSavepointError(
-                                                savepointInfo,
-                                                resource.getSpec()
-                                                        .getJob()
-                                                        .getSavepointTriggerNonce()),
-                                        EventUtils.Component.Operator));
-
-        // We only need to observe latest checkpoint/savepoint for terminal jobs
-        if (JobStatus.valueOf(jobStatus.getState()).isGloballyTerminalState()) {
-            observeLatestSavepoint(savepointInfo, jobId, deployedConfig);
+        // If any manual or periodic savepoint is in progress, observe it
+        if (SavepointUtils.savepointInProgress(jobStatus)) {
+            observeTriggeredSavepoint(resource, jobId, deployedConfig);
         }
 
-        var currentLastSpPath =
-                Optional.ofNullable(savepointInfo.getLastSavepoint())
-                        .map(Savepoint::getLocation)
-                        .orElse(null);
-
-        // If the last savepoint information changes we need to patch the status
-        // to avoid losing this in case of an operator failure after the cluster was shut down
-        if (currentLastSpPath != null && !currentLastSpPath.equals(previousLastSpPath)) {
-            LOG.info(
-                    "Updating resource status after observing new last savepoint {}",
-                    currentLastSpPath);
-            statusHelper.patchAndCacheStatus(resource);
+        // If job is in globally terminal state, observe last savepoint
+        if (ReconciliationUtils.isJobInTerminalState(resource.getStatus())) {
+            observeLatestSavepoint(savepointInfo, jobId, deployedConfig);
         }
+
+        patchStatusOnSavepointChange(resource, savepointInfo, previousLastSpPath);
     }
 
     /**
      * Observe the savepoint result based on the current savepoint info.
      *
-     * @param currentSavepointInfo the current savepoint info.
+     * @param resource the resource being observed
      * @param jobID the jobID of the observed job.
      * @param deployedConfig Deployed job config.
      * @return The observed error, if no error observed, {@code Optional.empty()} will be returned.
      */
-    private Optional<String> observeTriggeredSavepointProgress(
-            SavepointInfo currentSavepointInfo, String jobID, Configuration deployedConfig) {
-        if (StringUtils.isEmpty(currentSavepointInfo.getTriggerId())) {
-            LOG.debug("Savepoint not in progress");
-            return Optional.empty();
-        }
+    private void observeTriggeredSavepoint(
+            AbstractFlinkResource<?, ?> resource, String jobID, Configuration deployedConfig) {
+
+        var savepointInfo = resource.getStatus().getJobStatus().getSavepointInfo();
+
         LOG.info("Observing savepoint status.");
-        SavepointFetchResult savepointFetchResult =
+        var savepointFetchResult =
                 flinkService.fetchSavepointInfo(
-                        currentSavepointInfo.getTriggerId(), jobID, deployedConfig);
+                        savepointInfo.getTriggerId(), jobID, deployedConfig);
 
         if (savepointFetchResult.isPending()) {

Review Comment:
   If it is in pending, it will not check the grace period now, So the savepoint may take a longer time than the grace period time now 



##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/SavepointObserver.java:
##########
@@ -69,96 +69,78 @@ public void observeSavepointStatus(
                         .map(Savepoint::getLocation)
                         .orElse(null);
 
-        observeTriggeredSavepointProgress(savepointInfo, jobId, deployedConfig)
-                .ifPresent(
-                        err ->
-                                EventUtils.createOrUpdateEvent(
-                                        flinkService.getKubernetesClient(),
-                                        resource,
-                                        EventUtils.Type.Warning,
-                                        "SavepointError",
-                                        SavepointUtils.createSavepointError(
-                                                savepointInfo,
-                                                resource.getSpec()
-                                                        .getJob()
-                                                        .getSavepointTriggerNonce()),
-                                        EventUtils.Component.Operator));
-
-        // We only need to observe latest checkpoint/savepoint for terminal jobs
-        if (JobStatus.valueOf(jobStatus.getState()).isGloballyTerminalState()) {
-            observeLatestSavepoint(savepointInfo, jobId, deployedConfig);
+        // If any manual or periodic savepoint is in progress, observe it
+        if (SavepointUtils.savepointInProgress(jobStatus)) {
+            observeTriggeredSavepoint(resource, jobId, deployedConfig);
         }
 
-        var currentLastSpPath =
-                Optional.ofNullable(savepointInfo.getLastSavepoint())
-                        .map(Savepoint::getLocation)
-                        .orElse(null);
-
-        // If the last savepoint information changes we need to patch the status
-        // to avoid losing this in case of an operator failure after the cluster was shut down
-        if (currentLastSpPath != null && !currentLastSpPath.equals(previousLastSpPath)) {
-            LOG.info(
-                    "Updating resource status after observing new last savepoint {}",
-                    currentLastSpPath);
-            statusHelper.patchAndCacheStatus(resource);
+        // If job is in globally terminal state, observe last savepoint
+        if (ReconciliationUtils.isJobInTerminalState(resource.getStatus())) {
+            observeLatestSavepoint(savepointInfo, jobId, deployedConfig);
         }
+
+        patchStatusOnSavepointChange(resource, savepointInfo, previousLastSpPath);
     }
 
     /**
      * Observe the savepoint result based on the current savepoint info.
      *
-     * @param currentSavepointInfo the current savepoint info.
+     * @param resource the resource being observed
      * @param jobID the jobID of the observed job.
      * @param deployedConfig Deployed job config.
      * @return The observed error, if no error observed, {@code Optional.empty()} will be returned.

Review Comment:
   no return value now



##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/SavepointObserver.java:
##########
@@ -69,96 +69,78 @@ public void observeSavepointStatus(
                         .map(Savepoint::getLocation)
                         .orElse(null);
 
-        observeTriggeredSavepointProgress(savepointInfo, jobId, deployedConfig)
-                .ifPresent(
-                        err ->
-                                EventUtils.createOrUpdateEvent(
-                                        flinkService.getKubernetesClient(),
-                                        resource,
-                                        EventUtils.Type.Warning,
-                                        "SavepointError",
-                                        SavepointUtils.createSavepointError(
-                                                savepointInfo,
-                                                resource.getSpec()
-                                                        .getJob()
-                                                        .getSavepointTriggerNonce()),
-                                        EventUtils.Component.Operator));
-
-        // We only need to observe latest checkpoint/savepoint for terminal jobs
-        if (JobStatus.valueOf(jobStatus.getState()).isGloballyTerminalState()) {
-            observeLatestSavepoint(savepointInfo, jobId, deployedConfig);
+        // If any manual or periodic savepoint is in progress, observe it
+        if (SavepointUtils.savepointInProgress(jobStatus)) {
+            observeTriggeredSavepoint(resource, jobId, deployedConfig);
         }
 
-        var currentLastSpPath =
-                Optional.ofNullable(savepointInfo.getLastSavepoint())
-                        .map(Savepoint::getLocation)
-                        .orElse(null);
-
-        // If the last savepoint information changes we need to patch the status
-        // to avoid losing this in case of an operator failure after the cluster was shut down
-        if (currentLastSpPath != null && !currentLastSpPath.equals(previousLastSpPath)) {
-            LOG.info(
-                    "Updating resource status after observing new last savepoint {}",
-                    currentLastSpPath);
-            statusHelper.patchAndCacheStatus(resource);
+        // If job is in globally terminal state, observe last savepoint
+        if (ReconciliationUtils.isJobInTerminalState(resource.getStatus())) {
+            observeLatestSavepoint(savepointInfo, jobId, deployedConfig);
         }
+
+        patchStatusOnSavepointChange(resource, savepointInfo, previousLastSpPath);

Review Comment:
   do we have to put this in a finally block to ensure this will always be done ?



##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/KubernetesOperatorConfigOptions.java:
##########
@@ -58,9 +58,11 @@ public class KubernetesOperatorConfigOptions {
                             "The interval for observing status for in-progress operations such as deployment and savepoints.");
 
     public static final ConfigOption<Duration> OPERATOR_OBSERVER_SAVEPOINT_TRIGGER_GRACE_PERIOD =

Review Comment:
   The variable name could also be refactor according.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink-kubernetes-operator] gyfora commented on a diff in pull request #255: [FLINK-27257] Retry failed savepoints within grace period

Posted by GitBox <gi...@apache.org>.
gyfora commented on code in PR #255:
URL: https://github.com/apache/flink-kubernetes-operator/pull/255#discussion_r890310614


##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/SavepointObserver.java:
##########
@@ -69,96 +69,78 @@ public void observeSavepointStatus(
                         .map(Savepoint::getLocation)
                         .orElse(null);
 
-        observeTriggeredSavepointProgress(savepointInfo, jobId, deployedConfig)
-                .ifPresent(
-                        err ->
-                                EventUtils.createOrUpdateEvent(
-                                        flinkService.getKubernetesClient(),
-                                        resource,
-                                        EventUtils.Type.Warning,
-                                        "SavepointError",
-                                        SavepointUtils.createSavepointError(
-                                                savepointInfo,
-                                                resource.getSpec()
-                                                        .getJob()
-                                                        .getSavepointTriggerNonce()),
-                                        EventUtils.Component.Operator));
-
-        // We only need to observe latest checkpoint/savepoint for terminal jobs
-        if (JobStatus.valueOf(jobStatus.getState()).isGloballyTerminalState()) {
-            observeLatestSavepoint(savepointInfo, jobId, deployedConfig);
+        // If any manual or periodic savepoint is in progress, observe it
+        if (SavepointUtils.savepointInProgress(jobStatus)) {
+            observeTriggeredSavepoint(resource, jobId, deployedConfig);
         }
 
-        var currentLastSpPath =
-                Optional.ofNullable(savepointInfo.getLastSavepoint())
-                        .map(Savepoint::getLocation)
-                        .orElse(null);
-
-        // If the last savepoint information changes we need to patch the status
-        // to avoid losing this in case of an operator failure after the cluster was shut down
-        if (currentLastSpPath != null && !currentLastSpPath.equals(previousLastSpPath)) {
-            LOG.info(
-                    "Updating resource status after observing new last savepoint {}",
-                    currentLastSpPath);
-            statusHelper.patchAndCacheStatus(resource);
+        // If job is in globally terminal state, observe last savepoint
+        if (ReconciliationUtils.isJobInTerminalState(resource.getStatus())) {
+            observeLatestSavepoint(savepointInfo, jobId, deployedConfig);
         }
+
+        patchStatusOnSavepointChange(resource, savepointInfo, previousLastSpPath);
     }
 
     /**
      * Observe the savepoint result based on the current savepoint info.
      *
-     * @param currentSavepointInfo the current savepoint info.
+     * @param resource the resource being observed
      * @param jobID the jobID of the observed job.
      * @param deployedConfig Deployed job config.
      * @return The observed error, if no error observed, {@code Optional.empty()} will be returned.
      */
-    private Optional<String> observeTriggeredSavepointProgress(
-            SavepointInfo currentSavepointInfo, String jobID, Configuration deployedConfig) {
-        if (StringUtils.isEmpty(currentSavepointInfo.getTriggerId())) {
-            LOG.debug("Savepoint not in progress");
-            return Optional.empty();
-        }
+    private void observeTriggeredSavepoint(
+            AbstractFlinkResource<?, ?> resource, String jobID, Configuration deployedConfig) {
+
+        var savepointInfo = resource.getStatus().getJobStatus().getSavepointInfo();
+
         LOG.info("Observing savepoint status.");
-        SavepointFetchResult savepointFetchResult =
+        var savepointFetchResult =
                 flinkService.fetchSavepointInfo(
-                        currentSavepointInfo.getTriggerId(), jobID, deployedConfig);
+                        savepointInfo.getTriggerId(), jobID, deployedConfig);
 
         if (savepointFetchResult.isPending()) {
-            if (SavepointUtils.gracePeriodEnded(
-                    configManager.getOperatorConfiguration(), currentSavepointInfo)) {
-                String errorMsg =
-                        "Savepoint operation timed out after "
-                                + configManager
-                                        .getOperatorConfiguration()
-                                        .getSavepointTriggerGracePeriod();
-                currentSavepointInfo.resetTrigger();
-                LOG.error(errorMsg);
-                return Optional.of(errorMsg);
-            } else {
-                LOG.info("Savepoint operation not finished yet, waiting within grace period...");
-                return Optional.empty();
-            }
+            LOG.info("Savepoint operation not finished yet...");
+            return;
         }
 
         if (savepointFetchResult.getError() != null) {
-            currentSavepointInfo.resetTrigger();
-            return Optional.of(savepointFetchResult.getError());
+            var err = savepointFetchResult.getError();
+            if (SavepointUtils.gracePeriodEnded(deployedConfig, savepointInfo)) {
+                LOG.error(
+                        "Savepoint attempt failed after grace period. Won't be retried again: "
+                                + err);
+                ReconciliationUtils.updateLastReconciledSavepointTrigger(savepointInfo, resource);

Review Comment:
   Good point



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink-kubernetes-operator] gyfora commented on a diff in pull request #255: [FLINK-27257] Retry failed savepoints within grace period

Posted by GitBox <gi...@apache.org>.
gyfora commented on code in PR #255:
URL: https://github.com/apache/flink-kubernetes-operator/pull/255#discussion_r890307980


##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/KubernetesOperatorConfigOptions.java:
##########
@@ -58,9 +58,11 @@ public class KubernetesOperatorConfigOptions {
                             "The interval for observing status for in-progress operations such as deployment and savepoints.");
 
     public static final ConfigOption<Duration> OPERATOR_OBSERVER_SAVEPOINT_TRIGGER_GRACE_PERIOD =

Review Comment:
   👍 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org