Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2022/06/01 00:05:11 UTC

[GitHub] [flink-kubernetes-operator] Aitozi commented on a diff in pull request #249: [FLINK-26179] Support for periodic savepoints

Aitozi commented on code in PR #249:
URL: https://github.com/apache/flink-kubernetes-operator/pull/249#discussion_r886202321


##########
docs/layouts/shortcodes/generated/kubernetes_operator_config_configuration.html:
##########
@@ -74,6 +74,12 @@
             <td>Duration</td>
             <td>The interval before a savepoint trigger attempt is marked as unsuccessful.</td>
         </tr>
+        <tr>
+            <td><h5>kubernetes.operator.periodic.savepoint.interval</h5></td>
+            <td style="word-wrap: break-word;">(none)</td>
+            <td>Duration</td>
+            <td>Interval at which periodic savepoints will be triggered. The triggering schedule is not guaranteed an will be done as part of the regular reschedule loop.</td>

Review Comment:
   I can't quite understand this sentence: `is not guaranteed an will be done`. Is this a typo?
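Assuming the intended wording is "is not guaranteed and will be done as part of the regular reschedule loop", the behaviour can be illustrated with a minimal sketch. It models only the timestamp part of `shouldTriggerSavepoint` from the diff below (an unset interval disables periodic savepoints, and before the first periodic savepoint the resource creation timestamp is the reference point). It ignores job state, in-flight savepoints and manual nonces. The class and method names (`PeriodicSavepointCheck`, `isDue`, `firstTriggerOffset`) are hypothetical, and the fixed reconcile-loop period is a simplifying assumption.

```java
import java.time.Duration;
import java.time.Instant;

/** Hypothetical, simplified model of the periodic check in SavepointUtils#shouldTriggerSavepoint. */
final class PeriodicSavepointCheck {

    private PeriodicSavepointCheck() {}

    /**
     * @param interval configured interval, or null when periodic savepoints are disabled
     * @param lastPeriodicTs epoch millis of the last periodic savepoint, 0 if there was none yet
     * @param creationTimestamp resource creation time as an ISO-8601 instant string
     * @param nowMillis current time in epoch millis
     */
    static boolean isDue(
            Duration interval, long lastPeriodicTs, String creationTimestamp, long nowMillis) {
        // An unset interval disables periodic savepoints (the diff maps it to Long.MAX_VALUE).
        long intervalMillis = interval == null ? Long.MAX_VALUE : interval.toMillis();
        // Without a previous periodic savepoint, measure from the resource creation time.
        long reference =
                lastPeriodicTs == 0L
                        ? Instant.parse(creationTimestamp).toEpochMilli()
                        : lastPeriodicTs;
        return nowMillis - reference >= intervalMillis;
    }

    /**
     * Simulates a reconcile loop that runs every loopMillis and returns the offset (millis after
     * creation) at which the first periodic savepoint would actually be triggered.
     */
    static long firstTriggerOffset(Duration interval, long loopMillis) {
        if (interval == null || loopMillis <= 0) {
            throw new IllegalArgumentException("interval and a positive loop period are required");
        }
        String created = "2022-06-01T00:00:00Z";
        long createdMillis = Instant.parse(created).toEpochMilli();
        long t = 0;
        // The check only runs when the loop fires, so the trigger lands on the first tick >= interval.
        while (!isDue(interval, 0L, created, createdMillis + t)) {
            t += loopMillis;
        }
        return t;
    }
}
```

With a 10 minute interval and a reconcile loop every 4 minutes, the first savepoint is triggered 12 minutes after creation rather than at exactly 10 minutes, which is the "not guaranteed" part of the description.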



##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/SavepointUtils.java:
##########
@@ -17,40 +17,146 @@
 
 package org.apache.flink.kubernetes.operator.utils;
 
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.configuration.Configuration;
 import org.apache.flink.kubernetes.operator.config.FlinkOperatorConfiguration;
-import org.apache.flink.kubernetes.operator.crd.spec.JobSpec;
-import org.apache.flink.kubernetes.operator.crd.status.FlinkDeploymentStatus;
+import org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions;
+import org.apache.flink.kubernetes.operator.crd.AbstractFlinkResource;
 import org.apache.flink.kubernetes.operator.crd.status.JobStatus;
 import org.apache.flink.kubernetes.operator.crd.status.SavepointInfo;
+import org.apache.flink.kubernetes.operator.crd.status.SavepointTriggerType;
+import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
+import org.apache.flink.kubernetes.operator.service.FlinkService;
 
+import io.fabric8.kubernetes.client.KubernetesClient;
 import org.apache.commons.lang3.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.time.Duration;
+import java.time.Instant;
+import java.util.Optional;
 
 /** Savepoint utilities. */
 public class SavepointUtils {
 
+    private static final Logger LOG = LoggerFactory.getLogger(SavepointUtils.class);
+
     public static boolean savepointInProgress(JobStatus jobStatus) {
         return StringUtils.isNotEmpty(jobStatus.getSavepointInfo().getTriggerId());
     }
 
-    public static boolean shouldTriggerSavepoint(JobSpec jobSpec, FlinkDeploymentStatus status) {
-        if (savepointInProgress(status.getJobStatus())) {
+    /**
+     * Triggers any pending manual or periodic savepoints and updates the status accordingly.
+     *
+     * @param flinkService {@link FlinkService} used to trigger savepoints
+     * @param resource Resource that should be savepointed
+     * @param conf Observe config of the resource
+     * @return True if a savepoint was triggered
+     */
+    public static boolean triggerSavepointIfNeeded(
+            FlinkService flinkService, AbstractFlinkResource<?, ?> resource, Configuration conf)
+            throws Exception {
+
+        Optional<SavepointTriggerType> triggerOpt = shouldTriggerSavepoint(resource, conf);
+        if (triggerOpt.isEmpty()) {
             return false;
         }
-        return jobSpec.getSavepointTriggerNonce() != null
-                && !jobSpec.getSavepointTriggerNonce()
-                        .equals(
-                                status.getReconciliationStatus()
-                                        .deserializeLastReconciledSpec()
-                                        .getJob()
-                                        .getSavepointTriggerNonce());
+
+        var triggerType = triggerOpt.get();
+        flinkService.triggerSavepoint(
+                resource.getStatus().getJobStatus().getJobId(),
+                triggerType,
+                resource.getStatus().getJobStatus().getSavepointInfo(),
+                conf);
+
+        if (triggerType == SavepointTriggerType.MANUAL) {
+            ReconciliationUtils.updateSavepointReconciliationSuccess(resource);
+        }
+        return true;
+    }
+
+    /**
+     * Checks whether savepoint should be triggered based on the current status and spec and if yes,
+     * returns the correct {@link SavepointTriggerType}.
+     *
+     * <p>This logic is responsible for both manual and periodic savepoint triggering.
+     *
+     * @param resource Resource to be savepointed
+     * @param conf Observe configuration of the resource
+     * @return Optional @{@link SavepointTriggerType}
+     */
+    @VisibleForTesting
+    protected static Optional<SavepointTriggerType> shouldTriggerSavepoint(
+            AbstractFlinkResource<?, ?> resource, Configuration conf) {
+
+        var status = resource.getStatus();
+        var jobSpec = resource.getSpec().getJob();
+        var jobStatus = status.getJobStatus();
+
+        if (!ReconciliationUtils.isJobRunning(status) || savepointInProgress(jobStatus)) {
+            return Optional.empty();
+        }
+
+        var triggerNonceChanged =
+                jobSpec.getSavepointTriggerNonce() != null
+                        && !jobSpec.getSavepointTriggerNonce()
+                                .equals(
+                                        status.getReconciliationStatus()
+                                                .deserializeLastReconciledSpec()
+                                                .getJob()
+                                                .getSavepointTriggerNonce());
+        if (triggerNonceChanged) {
+            return Optional.of(SavepointTriggerType.MANUAL);
+        }
+
+        var savepointInterval =
+                conf.getOptional(KubernetesOperatorConfigOptions.PERIODIC_SAVEPOINT_INTERVAL)
+                        .map(Duration::toMillis)
+                        .orElse(Long.MAX_VALUE);
+
+        var lastPeriodicTriggerTs =
+                jobStatus.getSavepointInfo().getLastPeriodicSavepointTimestamp();
+
+        // When the resource is first created/periodic savepointing enabled we have to compare
+        // against the creation timestamp for triggering the first periodic savepoint
+        if (lastPeriodicTriggerTs.equals(0L)) {
+            lastPeriodicTriggerTs =
+                    Instant.parse(resource.getMetadata().getCreationTimestamp()).toEpochMilli();
+        }
+
+        long timeElapsed = System.currentTimeMillis() - lastPeriodicTriggerTs;
+        if (timeElapsed >= savepointInterval) {
+            LOG.info(
+                    "Triggering new periodic savepoint after {} seconds",
+                    Duration.ofMillis(timeElapsed).toSeconds());
+            return Optional.of(SavepointTriggerType.PERIODIC);
+        }
+        return Optional.empty();
     }
 
     public static boolean gracePeriodEnded(
             FlinkOperatorConfiguration configuration, SavepointInfo savepointInfo) {
-        Duration gracePeriod = configuration.getSavepointTriggerGracePeriod();
-        long triggerTimestamp = savepointInfo.getTriggerTimestamp();
-        return (System.currentTimeMillis() - triggerTimestamp) > gracePeriod.toMillis();
+        var elapsed = System.currentTimeMillis() - savepointInfo.getTriggerTimestamp();
+        return elapsed > configuration.getSavepointTriggerGracePeriod().toMillis();
+    }
+
+    public static void resetTriggerIfJobNotRunning(
+            KubernetesClient client, AbstractFlinkResource<?, ?> resource) {
+        var status = resource.getStatus();
+        var jobStatus = status.getJobStatus();
+        if (!ReconciliationUtils.isJobRunning(status)
+                && SavepointUtils.savepointInProgress(jobStatus)) {
+            jobStatus.getSavepointInfo().resetTrigger();
+            LOG.error("Job is not running, cancelling savepoint operation");
+            EventUtils.createOrUpdateEvent(
+                    client,
+                    resource,
+                    EventUtils.Type.Warning,
+                    "SavepointError",
+                    "Savepoint failed for savepointTriggerNonce: "
+                            + resource.getSpec().getJob().getSavepointTriggerNonce(),

Review Comment:
   nit: Since the savepoint can be triggered either periodically or via the triggerNonce, this error message may be inaccurate. Maybe we could improve it by including the trigger type stored in savepointInfo?
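A minimal sketch of that suggestion, assuming the type of the in-flight trigger is available when the event is created (the diff does not show whether `SavepointInfo` records it). `TriggerType` is a hypothetical stand-in for `SavepointTriggerType` with the three values used in this PR, `SavepointErrorMessages.savepointFailed` is a hypothetical helper, and the nonce is modelled as a `Long` (an assumption).

```java
/** Stand-in for SavepointTriggerType with the values referenced in this PR. */
enum TriggerType {
    MANUAL,
    PERIODIC,
    UNKNOWN
}

/** Hypothetical helper that builds the "SavepointError" event message from the trigger type. */
final class SavepointErrorMessages {

    private SavepointErrorMessages() {}

    static String savepointFailed(TriggerType type, Long triggerNonce) {
        switch (type) {
            case MANUAL:
                // Only manual savepoints are tied to a savepointTriggerNonce.
                return "Manual savepoint failed for savepointTriggerNonce: " + triggerNonce;
            case PERIODIC:
                return "Periodic savepoint failed";
            default:
                // UNKNOWN: fall back to a neutral message instead of blaming the nonce.
                return "Savepoint failed";
        }
    }
}
```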



##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/validation/DefaultValidator.java:
##########
@@ -197,6 +197,12 @@ private Optional<String> validateJobSpec(JobSpec job, Map<String, String> confMa
                         String.format(
                                 "Savepoint could not be manually triggered for the running job while config key[%s] is not set",
                                 CheckpointingOptions.SAVEPOINT_DIRECTORY.key()));
+            } else if (configuration.contains(
+                    KubernetesOperatorConfigOptions.PERIODIC_SAVEPOINT_INTERVAL)) {
+                return Optional.of(

Review Comment:
   nit: The error messages of these three checks are a bit redundant 😄
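One way to reduce the redundancy, sketched under assumptions: the checks all fail for the same reason (the savepoint directory is not configured), so a single formatter can produce the message and each check only names the feature. `SavepointDirValidation`, `missingSavepointDir` and the boolean parameters of `validate` are hypothetical; `state.savepoints.dir` is the key behind `CheckpointingOptions.SAVEPOINT_DIRECTORY`. Only two of the checks are visible in the hunk, so the sketch covers those two.

```java
import java.util.Optional;

/** Hypothetical sketch: one shared message for every check that needs a savepoint directory. */
final class SavepointDirValidation {

    // Key of CheckpointingOptions.SAVEPOINT_DIRECTORY in Flink.
    static final String SAVEPOINT_DIR_KEY = "state.savepoints.dir";

    private SavepointDirValidation() {}

    static String missingSavepointDir(String feature) {
        return String.format("%s: config key [%s] must be set", feature, SAVEPOINT_DIR_KEY);
    }

    /** Returns an error if a savepoint feature is requested while no directory is configured. */
    static Optional<String> validate(
            boolean savepointDirSet, boolean manualTriggerRequested, boolean periodicEnabled) {
        if (savepointDirSet) {
            return Optional.empty();
        }
        if (manualTriggerRequested) {
            return Optional.of(missingSavepointDir("Manual savepoint triggering"));
        }
        if (periodicEnabled) {
            return Optional.of(missingSavepointDir("Periodic savepoints"));
        }
        return Optional.empty();
    }
}
```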



##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/FlinkService.java:
##########
@@ -640,8 +642,8 @@ public SavepointFetchResult fetchSavepointInfo(
                 }
             }
             Savepoint savepoint =
-                    new Savepoint(
-                            System.currentTimeMillis(), response.get().resource().getLocation());
+                    Savepoint.of(
+                            response.get().resource().getLocation(), SavepointTriggerType.UNKNOWN);

Review Comment:
   I think the `UNKNOWN` type is only used for the zero value in the `SavepointInfo`. We could use the correct `SavepointTriggerType` here directly, so we do not have to reset it later.
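A minimal sketch of that suggestion, assuming the type of the pending trigger is known at fetch time (for example because it was recorded when the savepoint was triggered; the hunk does not show this). `TriggerKind`, `SavepointRecord` and `SavepointFetchSketch.completed` are hypothetical, simplified stand-ins for `SavepointTriggerType`, `Savepoint` and the relevant part of `fetchSavepointInfo`.

```java
/** Stand-in for SavepointTriggerType. */
enum TriggerKind {
    MANUAL,
    PERIODIC,
    UNKNOWN
}

/** Hypothetical, simplified Savepoint value: timestamp, location and trigger type. */
final class SavepointRecord {
    final long timeStamp;
    final String location;
    final TriggerKind triggerType;

    private SavepointRecord(long timeStamp, String location, TriggerKind triggerType) {
        this.timeStamp = timeStamp;
        this.location = location;
        this.triggerType = triggerType;
    }

    /** Mirrors the Savepoint.of(location, type) factory used in the diff. */
    static SavepointRecord of(String location, TriggerKind triggerType) {
        return new SavepointRecord(System.currentTimeMillis(), location, triggerType);
    }
}

/** Hypothetical fetch-side helper that passes the pending trigger type straight through. */
final class SavepointFetchSketch {

    private SavepointFetchSketch() {}

    static SavepointRecord completed(String location, TriggerKind pendingTriggerType) {
        // Use the type of the in-flight trigger instead of UNKNOWN, so no later fix-up is needed.
        return SavepointRecord.of(location, pendingTriggerType);
    }
}
```

With this shape `UNKNOWN` stays reserved for the default value of an empty `SavepointInfo`, as described above.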



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
