Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2022/05/31 06:27:41 UTC

[GitHub] [flink-kubernetes-operator] gyfora opened a new pull request, #249: [FLINK-26179] Support for periodic savepoints

gyfora opened a new pull request, #249:
URL: https://github.com/apache/flink-kubernetes-operator/pull/249

   This PR contains a few key improvements for savepoint triggering / management:
   
    1. Unify and simplify savepoint triggering between application and session jobs
    2. Simplify savepoint info update logic
    3. Add trigger type information to savepoints (we can also add new features to the cleanup logic based on this as a next step)
    4. Introduce periodic savepoint triggering
    5. Improve test coverage
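Point 3 is what allows cleanup to treat savepoints differently depending on how they were triggered. The sketch below only illustrates the idea: `SavepointEntry` and the keep-newest-N retention rule are hypothetical, not the operator's actual classes or cleanup policy. The four enum values mirror the `SavepointTriggerType` enum discussed later in this thread.

```java
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;

/** Hypothetical sketch: a savepoint history whose entries remember how they were triggered. */
final class SavepointHistorySketch {

    /** Same four values as the SavepointTriggerType enum added in this PR. */
    enum TriggerType { MANUAL, PERIODIC, UPGRADE, UNKNOWN }

    /** Hypothetical history entry: when the savepoint was taken, where it lives, and why. */
    static final class SavepointEntry {
        private final long timestampMillis;
        private final String location;
        private final TriggerType triggerType;

        SavepointEntry(long timestampMillis, String location, TriggerType triggerType) {
            this.timestampMillis = timestampMillis;
            this.location = location;
            this.triggerType = triggerType;
        }

        long getTimestampMillis() { return timestampMillis; }
        String getLocation() { return location; }
        TriggerType getTriggerType() { return triggerType; }
    }

    /**
     * Hypothetical retention rule: keep the newest {@code keep} PERIODIC savepoints and return the
     * older PERIODIC ones as disposal candidates. Other trigger types are never returned.
     */
    static List<SavepointEntry> periodicToDispose(List<SavepointEntry> history, int keep) {
        int toKeep = Math.max(keep, 0);
        List<SavepointEntry> periodicNewestFirst =
                history.stream()
                        .filter(sp -> sp.getTriggerType() == TriggerType.PERIODIC)
                        .sorted(Comparator.comparingLong(SavepointEntry::getTimestampMillis).reversed())
                        .collect(Collectors.toList());
        if (periodicNewestFirst.size() <= toKeep) {
            return List.of();
        }
        return periodicNewestFirst.subList(toKeep, periodicNewestFirst.size());
    }
}
```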


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink-kubernetes-operator] gyfora commented on a diff in pull request #249: [FLINK-26179] Support for periodic savepoints

gyfora commented on code in PR #249:
URL: https://github.com/apache/flink-kubernetes-operator/pull/249#discussion_r885742939


##########
docs/layouts/shortcodes/generated/kubernetes_operator_config_configuration.html:
##########
@@ -74,6 +74,12 @@
             <td>Duration</td>
             <td>The interval before a savepoint trigger attempt is marked as unsuccessful.</td>
         </tr>
+        <tr>
+            <td><h5>kubernetes.operator.periodic.savepoint.interval</h5></td>

Review Comment:
   Users can choose to set it globally or override it on a per-job level; it works both ways.
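A minimal sketch of the "global default, per-job override" behavior, assuming the effective configuration is simply the operator-level map overlaid with the job's own entries. The class and method names are hypothetical; only the config key comes from this PR.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

/** Hypothetical sketch of "set globally, override per job" for a single config key. */
final class EffectiveConfigSketch {

    static final String PERIODIC_SAVEPOINT_INTERVAL =
            "kubernetes.operator.periodic.savepoint.interval";

    /** Operator-wide defaults first, then the job's own entries, so the job wins on conflicts. */
    static Map<String, String> effectiveConfig(
            Map<String, String> operatorDefaults, Map<String, String> jobConfig) {
        Map<String, String> merged = new HashMap<>(operatorDefaults);
        merged.putAll(jobConfig);
        return merged;
    }

    /** The raw interval string, if either level set it. */
    static Optional<String> periodicInterval(Map<String, String> conf) {
        return Optional.ofNullable(conf.get(PERIODIC_SAVEPOINT_INTERVAL));
    }
}
```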





[GitHub] [flink-kubernetes-operator] gyfora commented on pull request #249: [FLINK-26179] Support for periodic savepoints

gyfora commented on PR #249:
URL: https://github.com/apache/flink-kubernetes-operator/pull/249#issuecomment-1141716343

   cc @Aitozi @morhidi 




[GitHub] [flink-kubernetes-operator] gyfora commented on a diff in pull request #249: [FLINK-26179] Support for periodic savepoints

gyfora commented on code in PR #249:
URL: https://github.com/apache/flink-kubernetes-operator/pull/249#discussion_r886324137


##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/validation/DefaultValidator.java:
##########
@@ -197,6 +197,12 @@ private Optional<String> validateJobSpec(JobSpec job, Map<String, String> confMa
                         String.format(
                                 "Savepoint could not be manually triggered for the running job while config key[%s] is not set",
                                 CheckpointingOptions.SAVEPOINT_DIRECTORY.key()));
+            } else if (configuration.contains(
+                    KubernetesOperatorConfigOptions.PERIODIC_SAVEPOINT_INTERVAL)) {
+                return Optional.of(

Review Comment:
   They are not that bad :) At least it gives a clear error.
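For context, a sketch of the kind of check this validator branch performs, assuming periodic savepoints, like manual ones, require `state.savepoints.dir` (the key behind `CheckpointingOptions.SAVEPOINT_DIRECTORY`). The class and the periodic-savepoint error text are hypothetical; only the manual-trigger message is taken from the diff above.

```java
import java.util.Map;
import java.util.Optional;

/** Hypothetical sketch: reject savepoint requests when no savepoint directory is configured. */
final class SavepointConfigValidatorSketch {

    static final String SAVEPOINT_DIRECTORY = "state.savepoints.dir";
    static final String PERIODIC_SAVEPOINT_INTERVAL =
            "kubernetes.operator.periodic.savepoint.interval";

    /** Returns an error message, or empty if the configuration is acceptable. */
    static Optional<String> validate(Map<String, String> conf, boolean manualTriggerRequested) {
        if (conf.containsKey(SAVEPOINT_DIRECTORY)) {
            // A target directory exists, so both manual and periodic savepoints can be taken.
            return Optional.empty();
        }
        if (manualTriggerRequested) {
            return Optional.of(
                    String.format(
                            "Savepoint could not be manually triggered for the running job while config key[%s] is not set",
                            SAVEPOINT_DIRECTORY));
        }
        if (conf.containsKey(PERIODIC_SAVEPOINT_INTERVAL)) {
            // Hypothetical wording: fail fast instead of silently never taking periodic savepoints.
            return Optional.of(
                    String.format(
                            "Periodic savepoints cannot be enabled while config key[%s] is not set",
                            SAVEPOINT_DIRECTORY));
        }
        return Optional.empty();
    }
}
```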





[GitHub] [flink-kubernetes-operator] morhidi commented on a diff in pull request #249: [FLINK-26179] Support for periodic savepoints

morhidi commented on code in PR #249:
URL: https://github.com/apache/flink-kubernetes-operator/pull/249#discussion_r885753975


##########
docs/layouts/shortcodes/generated/kubernetes_operator_config_configuration.html:
##########
@@ -74,6 +74,12 @@
             <td>Duration</td>
             <td>The interval before a savepoint trigger attempt is marked as unsuccessful.</td>
         </tr>
+        <tr>
+            <td><h5>kubernetes.operator.periodic.savepoint.interval</h5></td>

Review Comment:
   Do we want to 'enforce' savepoint creation when this is set globally? Can users turn it off?





[GitHub] [flink-kubernetes-operator] Sushant20 commented on pull request #249: [FLINK-26179] Support for periodic savepoints

Sushant20 commented on PR #249:
URL: https://github.com/apache/flink-kubernetes-operator/pull/249#issuecomment-1143124527

   @gyfora Related question: if periodic savepoints are enabled, how do we restore the job from a specific savepoint path with the native-integration operator? Something similar is documented for another operator: https://github.com/GoogleCloudPlatform/flink-on-k8s-operator/blob/master/docs/savepoints_guide.md#starting-a-job-from-a-savepoint




[GitHub] [flink-kubernetes-operator] Aitozi commented on a diff in pull request #249: [FLINK-26179] Support for periodic savepoints

Aitozi commented on code in PR #249:
URL: https://github.com/apache/flink-kubernetes-operator/pull/249#discussion_r886216711


##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/FlinkService.java:
##########
@@ -640,8 +642,8 @@ public SavepointFetchResult fetchSavepointInfo(
                 }
             }
             Savepoint savepoint =
-                    new Savepoint(
-                            System.currentTimeMillis(), response.get().resource().getLocation());
+                    Savepoint.of(
+                            response.get().resource().getLocation(), SavepointTriggerType.UNKNOWN);

Review Comment:
   I think the `UNKNOWN` type is only used for the zero value in the `SavepointInfo`. We could use the correct `SavepointTriggerType` here directly, so we don't have to reset it later.





[GitHub] [flink-kubernetes-operator] SteNicholas commented on a diff in pull request #249: [FLINK-26179] Support for periodic savepoints

SteNicholas commented on code in PR #249:
URL: https://github.com/apache/flink-kubernetes-operator/pull/249#discussion_r886370685


##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/crd/status/SavepointTriggerType.java:
##########
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.crd.status;
+
+/** Savepoint trigger mechanism. */
+public enum SavepointTriggerType {
+    /** Savepoint manually triggered by changing the savepointTriggerNonce. */
+    MANUAL,
+    /** Savepoint periodically triggered by the operator. */
+    PERIODIC,
+    /** Savepoint triggered during stateful upgrade. */
+    UPGRADE,
+    /** Savepoint trigger mechanism unknown. */
+    UNKNOWN

Review Comment:
   @gyfora, do the cases mentioned above need to be explained in the comments or in the documentation?





[GitHub] [flink-kubernetes-operator] gyfora commented on a diff in pull request #249: [FLINK-26179] Support for periodic savepoints

gyfora commented on code in PR #249:
URL: https://github.com/apache/flink-kubernetes-operator/pull/249#discussion_r886327997


##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/FlinkService.java:
##########
@@ -640,8 +642,8 @@ public SavepointFetchResult fetchSavepointInfo(
                 }
             }
             Savepoint savepoint =
-                    new Savepoint(
-                            System.currentTimeMillis(), response.get().resource().getLocation());
+                    Savepoint.of(
+                            response.get().resource().getLocation(), SavepointTriggerType.UNKNOWN);

Review Comment:
   I will actually change the fetch method to return only the location, which is the only thing it actually gets.
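A sketch of that shape, under the assumption that the trigger type is recorded when the savepoint is requested, so the fetch only needs to supply the location and no `UNKNOWN` placeholder has to be patched afterwards. All names here are hypothetical stand-ins for `SavepointInfo` and `Savepoint`.

```java
import java.util.Optional;

/** Hypothetical sketch: the trigger type is remembered at trigger time, not at fetch time. */
final class SavepointCompletionSketch {

    enum TriggerType { MANUAL, PERIODIC, UPGRADE, UNKNOWN }

    /** Hypothetical in-flight trigger state, recorded when the savepoint is requested. */
    static final class PendingTrigger {
        final String triggerId;
        final TriggerType triggerType;

        PendingTrigger(String triggerId, TriggerType triggerType) {
            this.triggerId = triggerId;
            this.triggerType = triggerType;
        }
    }

    /** Hypothetical completed savepoint as it would be stored in the status history. */
    static final class CompletedSavepoint {
        final long timestampMillis;
        final String location;
        final TriggerType triggerType;

        CompletedSavepoint(long timestampMillis, String location, TriggerType triggerType) {
            this.timestampMillis = timestampMillis;
            this.location = location;
            this.triggerType = triggerType;
        }
    }

    /**
     * Combines the fetched location (the only thing the fetch knows) with the recorded trigger
     * type. Returns empty while the savepoint is still in progress.
     */
    static Optional<CompletedSavepoint> complete(
            PendingTrigger pending, Optional<String> fetchedLocation, long nowMillis) {
        return fetchedLocation.map(
                location -> new CompletedSavepoint(nowMillis, location, pending.triggerType));
    }
}
```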





[GitHub] [flink-kubernetes-operator] gyfora commented on pull request #249: [FLINK-26179] Support for periodic savepoints

gyfora commented on PR #249:
URL: https://github.com/apache/flink-kubernetes-operator/pull/249#issuecomment-1143141982

   @Aitozi I fixed your comments




[GitHub] [flink-kubernetes-operator] morhidi commented on pull request #249: [FLINK-26179] Support for periodic savepoints

morhidi commented on PR #249:
URL: https://github.com/apache/flink-kubernetes-operator/pull/249#issuecomment-1142279072

   Hitting this locally after a while; it doesn't seem to be related to the current change:
   ```
   2022-05-31 17:07:55,479 o.a.f.k.o.o.SavepointObserver  [INFO ] [default.basic-checkpoint-ha-example] Disposing savepoint Savepoint(timeStamp=1654009519162, location=file:/flink-data/savepoints/savepoint-000000-930e88f68777, triggerType=PERIODIC)
   2022-05-31 17:07:55,480 o.a.f.s.n.i.n.c.AbstractChannel [WARN ] [.] Force-closing a channel whose registration task was not accepted by an event loop: [id: 0x2cc71569]
   java.util.concurrent.RejectedExecutionException: event executor terminated
   	at org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor.reject(SingleThreadEventExecutor.java:923)
   	at org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor.offerTask(SingleThreadEventExecutor.java:350)
   	at org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor.addTask(SingleThreadEventExecutor.java:343)
   	at org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor.execute(SingleThreadEventExecutor.java:825)
   	at org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor.execute(SingleThreadEventExecutor.java:815)
   	at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannel$AbstractUnsafe.register(AbstractChannel.java:483)
   	at org.apache.flink.shaded.netty4.io.netty.channel.SingleThreadEventLoop.register(SingleThreadEventLoop.java:87)
   	at org.apache.flink.shaded.netty4.io.netty.channel.SingleThreadEventLoop.register(SingleThreadEventLoop.java:81)
   	at org.apache.flink.shaded.netty4.io.netty.channel.MultithreadEventLoopGroup.register(MultithreadEventLoopGroup.java:86)
   	at org.apache.flink.shaded.netty4.io.netty.bootstrap.AbstractBootstrap.initAndRegister(AbstractBootstrap.java:323)
   	at org.apache.flink.shaded.netty4.io.netty.bootstrap.Bootstrap.doResolveAndConnect(Bootstrap.java:155)
   	at org.apache.flink.shaded.netty4.io.netty.bootstrap.Bootstrap.connect(Bootstrap.java:139)
   	at org.apache.flink.shaded.netty4.io.netty.bootstrap.Bootstrap.connect(Bootstrap.java:123)
   	at org.apache.flink.runtime.rest.RestClient.submitRequest(RestClient.java:467)
   	at org.apache.flink.runtime.rest.RestClient.sendRequest(RestClient.java:390)
   	at org.apache.flink.runtime.rest.RestClient.sendRequest(RestClient.java:304)
   	at org.apache.flink.client.program.rest.RestClusterClient.lambda$null$32(RestClusterClient.java:863)
   	at java.base/java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:1072)
   	at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506)
   	at java.base/java.util.concurrent.CompletableFuture.postFire(CompletableFuture.java:610)
   	at java.base/java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:649)
   	at java.base/java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:478)
   	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
   	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
   	at java.base/java.lang.Thread.run(Thread.java:829)
   ```




[GitHub] [flink-kubernetes-operator] gyfora commented on pull request #249: [FLINK-26179] Support for periodic savepoints

gyfora commented on PR #249:
URL: https://github.com/apache/flink-kubernetes-operator/pull/249#issuecomment-1142326848

   @morhidi I wasn't planning on changing the history logic; that would also be a breaking change compared to previous versions, so it would need transformation logic to make it compatible.
   
   I suggest we leave it as is; it's mostly for internal operator use anyway.




[GitHub] [flink-kubernetes-operator] SteNicholas commented on a diff in pull request #249: [FLINK-26179] Support for periodic savepoints

SteNicholas commented on code in PR #249:
URL: https://github.com/apache/flink-kubernetes-operator/pull/249#discussion_r886365184


##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/SavepointUtils.java:
##########
@@ -17,40 +17,153 @@
 
 package org.apache.flink.kubernetes.operator.utils;
 
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.configuration.Configuration;
 import org.apache.flink.kubernetes.operator.config.FlinkOperatorConfiguration;
-import org.apache.flink.kubernetes.operator.crd.spec.JobSpec;
-import org.apache.flink.kubernetes.operator.crd.status.FlinkDeploymentStatus;
+import org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions;
+import org.apache.flink.kubernetes.operator.crd.AbstractFlinkResource;
 import org.apache.flink.kubernetes.operator.crd.status.JobStatus;
 import org.apache.flink.kubernetes.operator.crd.status.SavepointInfo;
+import org.apache.flink.kubernetes.operator.crd.status.SavepointTriggerType;
+import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
+import org.apache.flink.kubernetes.operator.service.FlinkService;
 
+import io.fabric8.kubernetes.client.KubernetesClient;
 import org.apache.commons.lang3.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.time.Duration;
+import java.time.Instant;
+import java.util.Optional;
 
 /** Savepoint utilities. */
 public class SavepointUtils {
 
+    private static final Logger LOG = LoggerFactory.getLogger(SavepointUtils.class);
+
     public static boolean savepointInProgress(JobStatus jobStatus) {
         return StringUtils.isNotEmpty(jobStatus.getSavepointInfo().getTriggerId());
     }
 
-    public static boolean shouldTriggerSavepoint(JobSpec jobSpec, FlinkDeploymentStatus status) {
-        if (savepointInProgress(status.getJobStatus())) {
+    /**
+     * Triggers any pending manual or periodic savepoints and updates the status accordingly.
+     *
+     * @param flinkService {@link FlinkService} used to trigger savepoints
+     * @param resource Resource that should be savepointed
+     * @param conf Observe config of the resource
+     * @return True if a savepoint was triggered
+     */
+    public static boolean triggerSavepointIfNeeded(
+            FlinkService flinkService, AbstractFlinkResource<?, ?> resource, Configuration conf)
+            throws Exception {
+
+        Optional<SavepointTriggerType> triggerOpt = shouldTriggerSavepoint(resource, conf);
+        if (triggerOpt.isEmpty()) {
             return false;
         }
-        return jobSpec.getSavepointTriggerNonce() != null
-                && !jobSpec.getSavepointTriggerNonce()
-                        .equals(
-                                status.getReconciliationStatus()
-                                        .deserializeLastReconciledSpec()
-                                        .getJob()
-                                        .getSavepointTriggerNonce());
+
+        var triggerType = triggerOpt.get();
+        flinkService.triggerSavepoint(
+                resource.getStatus().getJobStatus().getJobId(),
+                triggerType,
+                resource.getStatus().getJobStatus().getSavepointInfo(),
+                conf);
+
+        if (triggerType == SavepointTriggerType.MANUAL) {
+            ReconciliationUtils.updateSavepointReconciliationSuccess(resource);
+        }
+        return true;
+    }
+
+    /**
+     * Checks whether savepoint should be triggered based on the current status and spec and if yes,
+     * returns the correct {@link SavepointTriggerType}.
+     *
+     * <p>This logic is responsible for both manual and periodic savepoint triggering.
+     *
+     * @param resource Resource to be savepointed
+     * @param conf Observe configuration of the resource
+     * @return Optional {@link SavepointTriggerType}
+     */
+    @VisibleForTesting
+    protected static Optional<SavepointTriggerType> shouldTriggerSavepoint(
+            AbstractFlinkResource<?, ?> resource, Configuration conf) {
+
+        var status = resource.getStatus();
+        var jobSpec = resource.getSpec().getJob();
+        var jobStatus = status.getJobStatus();
+
+        if (!ReconciliationUtils.isJobRunning(status) || savepointInProgress(jobStatus)) {
+            return Optional.empty();
+        }
+
+        var triggerNonceChanged =
+                jobSpec.getSavepointTriggerNonce() != null
+                        && !jobSpec.getSavepointTriggerNonce()
+                                .equals(
+                                        status.getReconciliationStatus()
+                                                .deserializeLastReconciledSpec()
+                                                .getJob()
+                                                .getSavepointTriggerNonce());
+        if (triggerNonceChanged) {
+            return Optional.of(SavepointTriggerType.MANUAL);
+        }
+
+        var savepointInterval =
+                conf.getOptional(KubernetesOperatorConfigOptions.PERIODIC_SAVEPOINT_INTERVAL)
+                        .map(Duration::toMillis)
+                        .orElse(0L);

Review Comment:
   Could the default value of `PERIODIC_SAVEPOINT_INTERVAL` be 0 milliseconds?
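If the default were 0 ms, an unset interval and the `.orElse(0L)` fallback above would mean the same thing. A minimal sketch of the resulting check, assuming any non-positive interval disables periodic savepoints. The method and its parameters are hypothetical, not the code in this PR.

```java
import java.time.Duration;
import java.util.Optional;

/** Hypothetical sketch: decide whether a periodic savepoint is due. */
final class PeriodicSavepointCheckSketch {

    static boolean periodicSavepointDue(
            Optional<Duration> interval, long lastPeriodicSavepointMillis, long nowMillis) {
        // An unset interval behaves exactly like a 0 ms default: periodic savepoints are off.
        long intervalMillis = interval.map(Duration::toMillis).orElse(0L);
        if (intervalMillis <= 0) {
            return false;
        }
        // Due once at least one full interval has elapsed since the last periodic savepoint.
        return nowMillis - lastPeriodicSavepointMillis >= intervalMillis;
    }
}
```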





[GitHub] [flink-kubernetes-operator] gyfora commented on a diff in pull request #249: [FLINK-26179] Support for periodic savepoints

gyfora commented on code in PR #249:
URL: https://github.com/apache/flink-kubernetes-operator/pull/249#discussion_r886366106


##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/SavepointUtils.java:
##########
@@ -17,40 +17,153 @@
 
+        var savepointInterval =
+                conf.getOptional(KubernetesOperatorConfigOptions.PERIODIC_SAVEPOINT_INTERVAL)
+                        .map(Duration::toMillis)
+                        .orElse(0L);

Review Comment:
   makes sense





[GitHub] [flink-kubernetes-operator] morhidi commented on pull request #249: [FLINK-26179] Support for periodic savepoints

morhidi commented on PR #249:
URL: https://github.com/apache/flink-kubernetes-operator/pull/249#issuecomment-1142428887

   > @morhidi I wasn't planning on changing the history logic; that would also be a breaking change compared to previous versions, so it would need transformation logic to make it compatible.
   > 
   > I suggest we leave it as is; it's mostly for internal operator use anyway.
   
   Fair enough. Actually, I just noticed that timestamps are always handled as strings across Kubernetes resources; we could even use an additional field for this to keep backward compatibility, but it's good for now. We can rethink what information we really want to keep in the status once we start moving the internals to ConfigMaps.
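For reference, the `timeStamp=1654009519162` value in the log above is epoch milliseconds. A sketch of rendering it as an ISO-8601 string (the RFC 3339 style Kubernetes uses for timestamps) with nothing but `java.time.Instant`; the helper names are hypothetical.

```java
import java.time.Instant;

/** Hypothetical helpers for storing a savepoint timestamp as a string instead of a long. */
final class StatusTimestampSketch {

    /** Epoch millis to an ISO-8601 UTC string, e.g. for an additional status field. */
    static String toIsoString(long epochMillis) {
        return Instant.ofEpochMilli(epochMillis).toString();
    }

    /** Inverse conversion, so the existing numeric field could still be derived. */
    static long fromIsoString(String iso) {
        return Instant.parse(iso).toEpochMilli();
    }
}
```

With the value from the log, `toIsoString(1654009519162L)` yields `2022-05-31T15:05:19.162Z`, which matches the 17:07 CEST log time minus a couple of minutes.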




[GitHub] [flink-kubernetes-operator] gyfora commented on pull request #249: [FLINK-26179] Support for periodic savepoints

gyfora commented on PR #249:
URL: https://github.com/apache/flink-kubernetes-operator/pull/249#issuecomment-1142295834

   @morhidi I also ran into this issue with the disposal. I think it's unrelated to my change; this is what prompted me to try to improve the dispose logic a little. I will dig further into this tomorrow.




[GitHub] [flink-kubernetes-operator] morhidi commented on a diff in pull request #249: [FLINK-26179] Support for periodic savepoints

morhidi commented on code in PR #249:
URL: https://github.com/apache/flink-kubernetes-operator/pull/249#discussion_r885774741


##########
docs/layouts/shortcodes/generated/kubernetes_operator_config_configuration.html:
##########
@@ -74,6 +74,12 @@
             <td>Duration</td>
             <td>The interval before a savepoint trigger attempt is marked as unsuccessful.</td>
         </tr>
+        <tr>
+            <td><h5>kubernetes.operator.periodic.savepoint.interval</h5></td>

Review Comment:
   Having an operator config for this feels a little bit off; I'd expect a dedicated parameter for this, hence my request for clarification.





[GitHub] [flink-kubernetes-operator] Aitozi commented on a diff in pull request #249: [FLINK-26179] Support for periodic savepoints

Aitozi commented on code in PR #249:
URL: https://github.com/apache/flink-kubernetes-operator/pull/249#discussion_r886202321


##########
docs/layouts/shortcodes/generated/kubernetes_operator_config_configuration.html:
##########
@@ -74,6 +74,12 @@
             <td>Duration</td>
             <td>The interval before a savepoint trigger attempt is marked as unsuccessful.</td>
         </tr>
+        <tr>
+            <td><h5>kubernetes.operator.periodic.savepoint.interval</h5></td>
+            <td style="word-wrap: break-word;">(none)</td>
+            <td>Duration</td>
+            <td>Interval at which periodic savepoints will be triggered. The triggering schedule is not guaranteed an will be done as part of the regular reschedule loop.</td>

Review Comment:
   I can't quite understand the sentence `is not guaranteed an will be done`. Is this a typo?



##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/SavepointUtils.java:
##########
@@ -17,40 +17,146 @@
 
 package org.apache.flink.kubernetes.operator.utils;
 
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.configuration.Configuration;
 import org.apache.flink.kubernetes.operator.config.FlinkOperatorConfiguration;
-import org.apache.flink.kubernetes.operator.crd.spec.JobSpec;
-import org.apache.flink.kubernetes.operator.crd.status.FlinkDeploymentStatus;
+import org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions;
+import org.apache.flink.kubernetes.operator.crd.AbstractFlinkResource;
 import org.apache.flink.kubernetes.operator.crd.status.JobStatus;
 import org.apache.flink.kubernetes.operator.crd.status.SavepointInfo;
+import org.apache.flink.kubernetes.operator.crd.status.SavepointTriggerType;
+import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
+import org.apache.flink.kubernetes.operator.service.FlinkService;
 
+import io.fabric8.kubernetes.client.KubernetesClient;
 import org.apache.commons.lang3.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.time.Duration;
+import java.time.Instant;
+import java.util.Optional;
 
 /** Savepoint utilities. */
 public class SavepointUtils {
 
+    private static final Logger LOG = LoggerFactory.getLogger(SavepointUtils.class);
+
     public static boolean savepointInProgress(JobStatus jobStatus) {
         return StringUtils.isNotEmpty(jobStatus.getSavepointInfo().getTriggerId());
     }
 
-    public static boolean shouldTriggerSavepoint(JobSpec jobSpec, FlinkDeploymentStatus status) {
-        if (savepointInProgress(status.getJobStatus())) {
+    /**
+     * Triggers any pending manual or periodic savepoints and updates the status accordingly.
+     *
+     * @param flinkService {@link FlinkService} used to trigger savepoints
+     * @param resource Resource that should be savepointed
+     * @param conf Observe config of the resource
+     * @return True if a savepoint was triggered
+     */
+    public static boolean triggerSavepointIfNeeded(
+            FlinkService flinkService, AbstractFlinkResource<?, ?> resource, Configuration conf)
+            throws Exception {
+
+        Optional<SavepointTriggerType> triggerOpt = shouldTriggerSavepoint(resource, conf);
+        if (triggerOpt.isEmpty()) {
             return false;
         }
-        return jobSpec.getSavepointTriggerNonce() != null
-                && !jobSpec.getSavepointTriggerNonce()
-                        .equals(
-                                status.getReconciliationStatus()
-                                        .deserializeLastReconciledSpec()
-                                        .getJob()
-                                        .getSavepointTriggerNonce());
+
+        var triggerType = triggerOpt.get();
+        flinkService.triggerSavepoint(
+                resource.getStatus().getJobStatus().getJobId(),
+                triggerType,
+                resource.getStatus().getJobStatus().getSavepointInfo(),
+                conf);
+
+        if (triggerType == SavepointTriggerType.MANUAL) {
+            ReconciliationUtils.updateSavepointReconciliationSuccess(resource);
+        }
+        return true;
+    }
+
+    /**
+     * Checks whether savepoint should be triggered based on the current status and spec and if yes,
+     * returns the correct {@link SavepointTriggerType}.
+     *
+     * <p>This logic is responsible for both manual and periodic savepoint triggering.
+     *
+     * @param resource Resource to be savepointed
+     * @param conf Observe configuration of the resource
+     * @return Optional {@link SavepointTriggerType}
+     */
+    @VisibleForTesting
+    protected static Optional<SavepointTriggerType> shouldTriggerSavepoint(
+            AbstractFlinkResource<?, ?> resource, Configuration conf) {
+
+        var status = resource.getStatus();
+        var jobSpec = resource.getSpec().getJob();
+        var jobStatus = status.getJobStatus();
+
+        if (!ReconciliationUtils.isJobRunning(status) || savepointInProgress(jobStatus)) {
+            return Optional.empty();
+        }
+
+        var triggerNonceChanged =
+                jobSpec.getSavepointTriggerNonce() != null
+                        && !jobSpec.getSavepointTriggerNonce()
+                                .equals(
+                                        status.getReconciliationStatus()
+                                                .deserializeLastReconciledSpec()
+                                                .getJob()
+                                                .getSavepointTriggerNonce());
+        if (triggerNonceChanged) {
+            return Optional.of(SavepointTriggerType.MANUAL);
+        }
+
+        var savepointInterval =
+                conf.getOptional(KubernetesOperatorConfigOptions.PERIODIC_SAVEPOINT_INTERVAL)
+                        .map(Duration::toMillis)
+                        .orElse(Long.MAX_VALUE);
+
+        var lastPeriodicTriggerTs =
+                jobStatus.getSavepointInfo().getLastPeriodicSavepointTimestamp();
+
+        // When the resource is first created/periodic savepointing enabled we have to compare
+        // against the creation timestamp for triggering the first periodic savepoint
+        if (lastPeriodicTriggerTs.equals(0L)) {
+            lastPeriodicTriggerTs =
+                    Instant.parse(resource.getMetadata().getCreationTimestamp()).toEpochMilli();
+        }
+
+        long timeElapsed = System.currentTimeMillis() - lastPeriodicTriggerTs;
+        if (timeElapsed >= savepointInterval) {
+            LOG.info(
+                    "Triggering new periodic savepoint after {} seconds",
+                    Duration.ofMillis(timeElapsed).toSeconds());
+            return Optional.of(SavepointTriggerType.PERIODIC);
+        }
+        return Optional.empty();
     }
 
     public static boolean gracePeriodEnded(
             FlinkOperatorConfiguration configuration, SavepointInfo savepointInfo) {
-        Duration gracePeriod = configuration.getSavepointTriggerGracePeriod();
-        long triggerTimestamp = savepointInfo.getTriggerTimestamp();
-        return (System.currentTimeMillis() - triggerTimestamp) > gracePeriod.toMillis();
+        var elapsed = System.currentTimeMillis() - savepointInfo.getTriggerTimestamp();
+        return elapsed > configuration.getSavepointTriggerGracePeriod().toMillis();
+    }
+
+    public static void resetTriggerIfJobNotRunning(
+            KubernetesClient client, AbstractFlinkResource<?, ?> resource) {
+        var status = resource.getStatus();
+        var jobStatus = status.getJobStatus();
+        if (!ReconciliationUtils.isJobRunning(status)
+                && SavepointUtils.savepointInProgress(jobStatus)) {
+            jobStatus.getSavepointInfo().resetTrigger();
+            LOG.error("Job is not running, cancelling savepoint operation");
+            EventUtils.createOrUpdateEvent(
+                    client,
+                    resource,
+                    EventUtils.Type.Warning,
+                    "SavepointError",
+                    "Savepoint failed for savepointTriggerNonce: "
+                            + resource.getSpec().getJob().getSavepointTriggerNonce(),

Review Comment:
   nit: Since the savepoint can be triggered either periodically or through the savepointTriggerNonce, this error message may be inaccurate. Maybe we could improve it by using the trigger type stored in savepointInfo?
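A minimal sketch of this suggestion, assuming the trigger type of the in-flight savepoint is available from `savepointInfo` when the error event is built. `TriggerType`, `SavepointErrors` and `savepointErrorMessage` are hypothetical names used for illustration, not the operator's API.

```java
/** Hypothetical stand-in for the operator's SavepointTriggerType enum. */
enum TriggerType { MANUAL, PERIODIC, UPGRADE, UNKNOWN }

/** Hypothetical helper that builds a trigger-type-aware savepoint error message. */
final class SavepointErrors {
    private SavepointErrors() {}

    static String savepointErrorMessage(TriggerType type, Long triggerNonce) {
        // Only manual savepoints are driven by the nonce, so only they mention it.
        if (type == TriggerType.MANUAL) {
            return "Savepoint failed for savepointTriggerNonce: " + triggerNonce;
        }
        // Periodic, upgrade or unknown triggers: report the trigger type instead.
        return "Savepoint failed (trigger type: " + type + ")";
    }
}
```

For a periodic savepoint the event would then read `Savepoint failed (trigger type: PERIODIC)` instead of quoting a nonce that did not cause the trigger.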



##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/validation/DefaultValidator.java:
##########
@@ -197,6 +197,12 @@ private Optional<String> validateJobSpec(JobSpec job, Map<String, String> confMa
                         String.format(
                                 "Savepoint could not be manually triggered for the running job while config key[%s] is not set",
                                 CheckpointingOptions.SAVEPOINT_DIRECTORY.key()));
+            } else if (configuration.contains(
+                    KubernetesOperatorConfigOptions.PERIODIC_SAVEPOINT_INTERVAL)) {
+                return Optional.of(

Review Comment:
   nit: The error messages of these three checks are a bit redundant 😄
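One possible way to address the nit, sketched only: route every check through a single message template so the messages differ only in the feature name. `ValidationMessages` and `missingSavepointDir` are hypothetical; in the validator the key would come from `CheckpointingOptions.SAVEPOINT_DIRECTORY.key()` rather than a hard-coded string.

```java
/** Hypothetical helper: one template for every check that needs a savepoint directory. */
final class ValidationMessages {
    private ValidationMessages() {}

    // Key of Flink's CheckpointingOptions.SAVEPOINT_DIRECTORY option.
    static final String SAVEPOINT_DIR_KEY = "state.savepoints.dir";

    /** Builds the error for a feature that cannot work without a savepoint directory. */
    static String missingSavepointDir(String feature) {
        return String.format("Config key [%s] must be set to use %s", SAVEPOINT_DIR_KEY, feature);
    }
}
```

Each check would then return something like `Optional.of(ValidationMessages.missingSavepointDir("periodic savepoints"))`, keeping the wording consistent across checks.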



##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/FlinkService.java:
##########
@@ -640,8 +642,8 @@ public SavepointFetchResult fetchSavepointInfo(
                 }
             }
             Savepoint savepoint =
-                    new Savepoint(
-                            System.currentTimeMillis(), response.get().resource().getLocation());
+                    Savepoint.of(
+                            response.get().resource().getLocation(), SavepointTriggerType.UNKNOWN);

Review Comment:
   I think the `UNKNOWN` type is only used as the zero value in `SavepointInfo`. We could use the correct `SavepointTriggerType` here directly, so we would not have to reset it later.
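A sketch of this alternative, assuming the trigger type of the pending savepoint is already known when its result is fetched (for example, stored next to the trigger id). `SavepointRecord` and `SavepointCompletion` are hypothetical, simplified stand-ins for the operator's `Savepoint` class and fetch logic.

```java
import java.util.Objects;

/** Hypothetical stand-in for SavepointTriggerType. */
enum TriggerType { MANUAL, PERIODIC, UPGRADE, UNKNOWN }

/** Hypothetical, simplified savepoint record created with its final trigger type. */
final class SavepointRecord {
    final long timeStamp;
    final String location;
    final TriggerType triggerType;

    private SavepointRecord(long timeStamp, String location, TriggerType triggerType) {
        this.timeStamp = timeStamp;
        this.location = location;
        this.triggerType = triggerType;
    }

    static SavepointRecord of(String location, TriggerType triggerType) {
        // Fail fast instead of silently falling back to UNKNOWN.
        return new SavepointRecord(
                System.currentTimeMillis(), location, Objects.requireNonNull(triggerType));
    }
}

/** Hypothetical completion handler: the pending trigger type is passed through directly. */
final class SavepointCompletion {
    private SavepointCompletion() {}

    static SavepointRecord onCompleted(String location, TriggerType pendingTriggerType) {
        return SavepointRecord.of(location, pendingTriggerType);
    }
}
```

Requiring a non-null type makes a forgotten type fail loudly instead of leaving an `UNKNOWN` value to be patched up later.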





[GitHub] [flink-kubernetes-operator] gyfora commented on a diff in pull request #249: [FLINK-26179] Support for periodic savepoints

Posted by GitBox <gi...@apache.org>.
gyfora commented on code in PR #249:
URL: https://github.com/apache/flink-kubernetes-operator/pull/249#discussion_r886363745


##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/ReconciliationUtils.java:
##########
@@ -286,17 +286,15 @@ private static boolean jmMissingForRunningDeployment(FlinkDeployment deployment)
                         == JobManagerDeploymentStatus.MISSING);
     }
 
-    public static boolean isJobInTerminalState(FlinkDeploymentStatus status) {
+    public static boolean isJobInTerminalState(CommonStatus<?> status) {
         var jobState = status.getJobStatus().getState();
         return org.apache.flink.api.common.JobStatus.valueOf(jobState).isGloballyTerminalState();
     }
 
-    public static boolean isJobRunning(FlinkDeploymentStatus status) {
-        JobManagerDeploymentStatus deploymentStatus = status.getJobManagerDeploymentStatus();
-        return deploymentStatus == JobManagerDeploymentStatus.READY
-                && org.apache.flink.api.common.JobStatus.RUNNING
-                        .name()
-                        .equals(status.getJobStatus().getState());
+    public static boolean isJobRunning(CommonStatus<?> status) {
+        return org.apache.flink.api.common.JobStatus.RUNNING

Review Comment:
   I agree that this should be improved, but it is completely out of scope for this PR :)
   
   I will open a follow-up JIRA.





[GitHub] [flink-kubernetes-operator] Aitozi commented on a diff in pull request #249: [FLINK-26179] Support for periodic savepoints

Posted by GitBox <gi...@apache.org>.
Aitozi commented on code in PR #249:
URL: https://github.com/apache/flink-kubernetes-operator/pull/249#discussion_r886395807


##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/FlinkService.java:
##########
@@ -653,7 +655,10 @@ public SavepointFetchResult fetchSavepointInfo(
     public void disposeSavepoint(String savepointPath, Configuration conf) throws Exception {
         try (RestClusterClient<String> clusterClient =
                 (RestClusterClient<String>) getClusterClient(conf)) {
-            clusterClient.disposeSavepoint(savepointPath);
+            clusterClient.sendRequest(

Review Comment:
   Got it.





[GitHub] [flink-kubernetes-operator] gyfora commented on a diff in pull request #249: [FLINK-26179] Support for periodic savepoints

Posted by GitBox <gi...@apache.org>.
gyfora commented on code in PR #249:
URL: https://github.com/apache/flink-kubernetes-operator/pull/249#discussion_r885771756


##########
docs/layouts/shortcodes/generated/kubernetes_operator_config_configuration.html:
##########
@@ -74,6 +74,12 @@
             <td>Duration</td>
             <td>The interval before a savepoint trigger attempt is marked as unsuccessful.</td>
         </tr>
+        <tr>
+            <td><h5>kubernetes.operator.periodic.savepoint.interval</h5></td>

Review Comment:
   I can change this so that setting it to 0 explicitly turns it off.
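A minimal sketch of the semantics discussed in this thread, assuming a per-job setting overrides the operator-wide default and that `0` means "explicitly disabled". With the check as written in the `SavepointUtils` diff, an interval of 0 would make `timeElapsed >= savepointInterval` true on every loop, which is why an explicit off value needs handling. `PeriodicSavepointConfig` and `effectiveInterval` are hypothetical names; the operator itself reads the option through Flink's `Configuration`.

```java
import java.time.Duration;
import java.util.Optional;

/** Hypothetical resolution of the effective periodic savepoint interval. */
final class PeriodicSavepointConfig {
    private PeriodicSavepointConfig() {}

    /**
     * A per-job value overrides the operator-wide default, and a zero (or negative) interval
     * turns periodic savepoints off. An empty result means "do not trigger periodically".
     */
    static Optional<Duration> effectiveInterval(
            Optional<Duration> global, Optional<Duration> perJob) {
        return perJob.or(() -> global).filter(d -> !d.isZero() && !d.isNegative());
    }
}
```

A job can therefore opt out of a globally configured interval by setting its own value to `0`.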





[GitHub] [flink-kubernetes-operator] gyfora commented on a diff in pull request #249: [FLINK-26179] Support for periodic savepoints

Posted by GitBox <gi...@apache.org>.
gyfora commented on code in PR #249:
URL: https://github.com/apache/flink-kubernetes-operator/pull/249#discussion_r886321792


##########
docs/layouts/shortcodes/generated/kubernetes_operator_config_configuration.html:
##########
@@ -74,6 +74,12 @@
             <td>Duration</td>
             <td>The interval before a savepoint trigger attempt is marked as unsuccessful.</td>
         </tr>
+        <tr>
+            <td><h5>kubernetes.operator.periodic.savepoint.interval</h5></td>
+            <td style="word-wrap: break-word;">(none)</td>
+            <td>Duration</td>
+            <td>Interval at which periodic savepoints will be triggered. The triggering schedule is not guaranteed an will be done as part of the regular reschedule loop.</td>

Review Comment:
   fixing





[GitHub] [flink-kubernetes-operator] morhidi commented on a diff in pull request #249: [FLINK-26179] Support for periodic savepoints

Posted by GitBox <gi...@apache.org>.
morhidi commented on code in PR #249:
URL: https://github.com/apache/flink-kubernetes-operator/pull/249#discussion_r885753975


##########
docs/layouts/shortcodes/generated/kubernetes_operator_config_configuration.html:
##########
@@ -74,6 +74,12 @@
             <td>Duration</td>
             <td>The interval before a savepoint trigger attempt is marked as unsuccessful.</td>
         </tr>
+        <tr>
+            <td><h5>kubernetes.operator.periodic.savepoint.interval</h5></td>

Review Comment:
   Do we 'enforce' savepoint creation when this is set globally? Can users turn it off?





[GitHub] [flink-kubernetes-operator] Aitozi commented on a diff in pull request #249: [FLINK-26179] Support for periodic savepoints

Posted by GitBox <gi...@apache.org>.
Aitozi commented on code in PR #249:
URL: https://github.com/apache/flink-kubernetes-operator/pull/249#discussion_r886221893


##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/FlinkService.java:
##########
@@ -653,7 +655,10 @@ public SavepointFetchResult fetchSavepointInfo(
     public void disposeSavepoint(String savepointPath, Configuration conf) throws Exception {
         try (RestClusterClient<String> clusterClient =
                 (RestClusterClient<String>) getClusterClient(conf)) {
-            clusterClient.disposeSavepoint(savepointPath);
+            clusterClient.sendRequest(

Review Comment:
   I checked the internal implementation of `disposeSavepoint`. Why is the current way better?





[GitHub] [flink-kubernetes-operator] morhidi commented on a diff in pull request #249: [FLINK-26179] Support for periodic savepoints

Posted by GitBox <gi...@apache.org>.
morhidi commented on code in PR #249:
URL: https://github.com/apache/flink-kubernetes-operator/pull/249#discussion_r885689686


##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/KubernetesOperatorConfigOptions.java:
##########
@@ -163,4 +163,12 @@ public class KubernetesOperatorConfigOptions {
                     .withDescription(
                             "Custom HTTP header for HttpArtifactFetcher. The header will be applied when getting the session job artifacts. "
                                     + "Expected format: headerKey1:headerValue1,headerKey2:headerValue2.");
+
+    public static final ConfigOption<Duration> PERIODIC_SAVEPOINT_INTERVAL =
+            ConfigOptions.key("kubernetes.operator.periodic.savepoint.interval")
+                    .durationType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "Interval at which periodic savepoints will be triggered. "
+                                    + "The triggering schedule is not guaranteed an will be done as part of the regular reschedule loop.");

Review Comment:
   nit: `reconcile loop` sounds better to me here than `reschedule loop`.
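To illustrate why the description says the schedule is not guaranteed, here is a dependency-free sketch that follows the order of checks in the `shouldTriggerSavepoint` change in this PR. `TriggerDecision` and its parameters are hypothetical; the real method reads these values from the resource status and config.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.Optional;

/** Hypothetical stand-in for the MANUAL/PERIODIC values of SavepointTriggerType. */
enum TriggerType { MANUAL, PERIODIC }

/** Hypothetical version of the trigger decision evaluated on each reconcile loop. */
final class TriggerDecision {
    private TriggerDecision() {}

    static Optional<TriggerType> decide(
            boolean jobRunning,
            boolean savepointInProgress,
            boolean nonceChanged,
            Optional<Duration> interval,
            long lastPeriodicTs,
            Instant creationTime,
            long nowMillis) {
        // Never trigger while the job is not running or another savepoint is in flight.
        if (!jobRunning || savepointInProgress) {
            return Optional.empty();
        }
        // A changed savepointTriggerNonce takes precedence over the periodic schedule.
        if (nonceChanged) {
            return Optional.of(TriggerType.MANUAL);
        }
        if (interval.isEmpty()) {
            return Optional.empty();
        }
        // No periodic savepoint yet: measure from the resource creation time instead.
        long last = lastPeriodicTs == 0L ? creationTime.toEpochMilli() : lastPeriodicTs;
        boolean due = nowMillis - last >= interval.get().toMillis();
        return due ? Optional.of(TriggerType.PERIODIC) : Optional.empty();
    }
}
```

The check only runs when a reconciliation happens. If reconciliation ran, say, every 60 seconds with a 5 minute interval, the savepoint would fire on the first loop at or after the 5 minute mark, so it can lag by up to one reconcile interval. Passing `nowMillis` in explicitly (rather than calling `System.currentTimeMillis()` as the diff does) keeps the sketch deterministic in tests.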





[GitHub] [flink-kubernetes-operator] gyfora merged pull request #249: [FLINK-26179] Support for periodic savepoints

Posted by GitBox <gi...@apache.org>.
gyfora merged PR #249:
URL: https://github.com/apache/flink-kubernetes-operator/pull/249




[GitHub] [flink-kubernetes-operator] SteNicholas commented on a diff in pull request #249: [FLINK-26179] Support for periodic savepoints

Posted by GitBox <gi...@apache.org>.
SteNicholas commented on code in PR #249:
URL: https://github.com/apache/flink-kubernetes-operator/pull/249#discussion_r886359679


##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/ReconciliationUtils.java:
##########
@@ -286,17 +286,15 @@ private static boolean jmMissingForRunningDeployment(FlinkDeployment deployment)
                         == JobManagerDeploymentStatus.MISSING);
     }
 
-    public static boolean isJobInTerminalState(FlinkDeploymentStatus status) {
+    public static boolean isJobInTerminalState(CommonStatus<?> status) {
         var jobState = status.getJobStatus().getState();
         return org.apache.flink.api.common.JobStatus.valueOf(jobState).isGloballyTerminalState();
     }
 
-    public static boolean isJobRunning(FlinkDeploymentStatus status) {
-        JobManagerDeploymentStatus deploymentStatus = status.getJobManagerDeploymentStatus();
-        return deploymentStatus == JobManagerDeploymentStatus.READY
-                && org.apache.flink.api.common.JobStatus.RUNNING
-                        .name()
-                        .equals(status.getJobStatus().getState());
+    public static boolean isJobRunning(CommonStatus<?> status) {
+        return org.apache.flink.api.common.JobStatus.RUNNING

Review Comment:
   Does this really indicate whether the job is running? IMO, this should also check the execution states of the job's tasks.
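A sketch of the stricter check suggested here, assuming the operator can obtain per-task execution states (for example from the job details exposed by Flink's REST API; how they are fetched is left out). `TaskState` is a hypothetical subset mirroring Flink's `ExecutionState` names, and `JobRunningCheck` is a hypothetical name. Whether FINISHED tasks (e.g. bounded sources) should count as running is a design choice; this sketch does not count them.

```java
import java.util.Arrays;

/** Hypothetical subset mirroring the names of Flink's task ExecutionState values. */
enum TaskState {
    CREATED, SCHEDULED, DEPLOYING, INITIALIZING, RUNNING, FINISHED, CANCELING, CANCELED, FAILED
}

/** Hypothetical stricter running check that also looks at task states. */
final class JobRunningCheck {
    private JobRunningCheck() {}

    static boolean isJobFullyRunning(String jobState, TaskState... taskStates) {
        // The job-level state must be RUNNING, as in the current check; in addition,
        // require at least one task and every task to be RUNNING.
        return "RUNNING".equals(jobState)
                && taskStates.length > 0
                && Arrays.stream(taskStates).allMatch(s -> s == TaskState.RUNNING);
    }
}
```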





[GitHub] [flink-kubernetes-operator] morhidi commented on pull request #249: [FLINK-26179] Support for periodic savepoints

Posted by GitBox <gi...@apache.org>.
morhidi commented on PR #249:
URL: https://github.com/apache/flink-kubernetes-operator/pull/249#issuecomment-1142302043

   @gyfora I was comparing the savepoint history with the one shown on the dashboard. Would it make sense to display it the same way, e.g. latest on top, with human-readable timestamps?
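A minimal sketch of the suggested presentation, assuming each history entry carries an epoch-millisecond timestamp (as the `System.currentTimeMillis()` call in the `FlinkService` diff suggests) and a location. `SavepointHistoryView` and `Entry` are hypothetical names, and rendering in UTC is an arbitrary choice.

```java
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

/** Hypothetical rendering of the savepoint history: newest first, readable UTC timestamps. */
final class SavepointHistoryView {
    private SavepointHistoryView() {}

    /** Hypothetical history entry: epoch-millisecond timestamp plus savepoint location. */
    static final class Entry {
        final long timeStamp;
        final String location;

        Entry(long timeStamp, String location) {
            this.timeStamp = timeStamp;
            this.location = location;
        }
    }

    private static final DateTimeFormatter FORMAT =
            DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss").withZone(ZoneOffset.UTC);

    static List<String> render(List<Entry> history) {
        // Copy before sorting so the caller's list is left untouched.
        List<Entry> sorted = new ArrayList<>(history);
        sorted.sort(Comparator.comparingLong((Entry e) -> e.timeStamp).reversed());

        List<String> lines = new ArrayList<>();
        for (Entry e : sorted) {
            lines.add(FORMAT.format(Instant.ofEpochMilli(e.timeStamp)) + " UTC  " + e.location);
        }
        return lines;
    }
}
```

Sorting a copy keeps the stored status in insertion order, so only the display changes.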




[GitHub] [flink-kubernetes-operator] gyfora commented on a diff in pull request #249: [FLINK-26179] Support for periodic savepoints

Posted by GitBox <gi...@apache.org>.
gyfora commented on code in PR #249:
URL: https://github.com/apache/flink-kubernetes-operator/pull/249#discussion_r886372165


##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/crd/status/SavepointTriggerType.java:
##########
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.crd.status;
+
+/** Savepoint trigger mechanism. */
+public enum SavepointTriggerType {
+    /** Savepoint manually triggered by changing the savepointTriggerNonce. */
+    MANUAL,
+    /** Savepoint periodically triggered by the operator. */
+    PERIODIC,
+    /** Savepoint triggered during stateful upgrade. */
+    UPGRADE,
+    /** Savepoint trigger mechanism unknown. */
+    UNKNOWN

Review Comment:
   I will add it, but it's pretty much an internal detail 





[GitHub] [flink-kubernetes-operator] morhidi commented on a diff in pull request #249: [FLINK-26179] Support for periodic savepoints

Posted by GitBox <gi...@apache.org>.
morhidi commented on code in PR #249:
URL: https://github.com/apache/flink-kubernetes-operator/pull/249#discussion_r885725325


##########
docs/layouts/shortcodes/generated/kubernetes_operator_config_configuration.html:
##########
@@ -74,6 +74,12 @@
             <td>Duration</td>
             <td>The interval before a savepoint trigger attempt is marked as unsuccessful.</td>
         </tr>
+        <tr>
+            <td><h5>kubernetes.operator.periodic.savepoint.interval</h5></td>

Review Comment:
   Do we expect this property being set globally?





[GitHub] [flink-kubernetes-operator] morhidi commented on pull request #249: [FLINK-26179] Support for periodic savepoints

Posted by GitBox <gi...@apache.org>.
morhidi commented on PR #249:
URL: https://github.com/apache/flink-kubernetes-operator/pull/249#issuecomment-1141737882

   Thanks @gyfora, looks promising. I'll have a look later today.




[GitHub] [flink-kubernetes-operator] gyfora commented on a diff in pull request #249: [FLINK-26179] Support for periodic savepoints

Posted by GitBox <gi...@apache.org>.
gyfora commented on code in PR #249:
URL: https://github.com/apache/flink-kubernetes-operator/pull/249#discussion_r886323405


##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/SavepointUtils.java:
##########
@@ -17,40 +17,146 @@
 
 package org.apache.flink.kubernetes.operator.utils;
 
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.configuration.Configuration;
 import org.apache.flink.kubernetes.operator.config.FlinkOperatorConfiguration;
-import org.apache.flink.kubernetes.operator.crd.spec.JobSpec;
-import org.apache.flink.kubernetes.operator.crd.status.FlinkDeploymentStatus;
+import org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions;
+import org.apache.flink.kubernetes.operator.crd.AbstractFlinkResource;
 import org.apache.flink.kubernetes.operator.crd.status.JobStatus;
 import org.apache.flink.kubernetes.operator.crd.status.SavepointInfo;
+import org.apache.flink.kubernetes.operator.crd.status.SavepointTriggerType;
+import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
+import org.apache.flink.kubernetes.operator.service.FlinkService;
 
+import io.fabric8.kubernetes.client.KubernetesClient;
 import org.apache.commons.lang3.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.time.Duration;
+import java.time.Instant;
+import java.util.Optional;
 
 /** Savepoint utilities. */
 public class SavepointUtils {
 
+    private static final Logger LOG = LoggerFactory.getLogger(SavepointUtils.class);
+
     public static boolean savepointInProgress(JobStatus jobStatus) {
         return StringUtils.isNotEmpty(jobStatus.getSavepointInfo().getTriggerId());
     }
 
-    public static boolean shouldTriggerSavepoint(JobSpec jobSpec, FlinkDeploymentStatus status) {
-        if (savepointInProgress(status.getJobStatus())) {
+    /**
+     * Triggers any pending manual or periodic savepoints and updates the status accordingly.
+     *
+     * @param flinkService {@link FlinkService} used to trigger savepoints
+     * @param resource Resource that should be savepointed
+     * @param conf Observe config of the resource
+     * @return True if a savepoint was triggered
+     */
+    public static boolean triggerSavepointIfNeeded(
+            FlinkService flinkService, AbstractFlinkResource<?, ?> resource, Configuration conf)
+            throws Exception {
+
+        Optional<SavepointTriggerType> triggerOpt = shouldTriggerSavepoint(resource, conf);
+        if (triggerOpt.isEmpty()) {
             return false;
         }
-        return jobSpec.getSavepointTriggerNonce() != null
-                && !jobSpec.getSavepointTriggerNonce()
-                        .equals(
-                                status.getReconciliationStatus()
-                                        .deserializeLastReconciledSpec()
-                                        .getJob()
-                                        .getSavepointTriggerNonce());
+
+        var triggerType = triggerOpt.get();
+        flinkService.triggerSavepoint(
+                resource.getStatus().getJobStatus().getJobId(),
+                triggerType,
+                resource.getStatus().getJobStatus().getSavepointInfo(),
+                conf);
+
+        if (triggerType == SavepointTriggerType.MANUAL) {
+            ReconciliationUtils.updateSavepointReconciliationSuccess(resource);
+        }
+        return true;
+    }
+
+    /**
+     * Checks whether savepoint should be triggered based on the current status and spec and if yes,
+     * returns the correct {@link SavepointTriggerType}.
+     *
+     * <p>This logic is responsible for both manual and periodic savepoint triggering.
+     *
+     * @param resource Resource to be savepointed
+     * @param conf Observe configuration of the resource
+     * @return Optional {@link SavepointTriggerType}
+     */
+    @VisibleForTesting
+    protected static Optional<SavepointTriggerType> shouldTriggerSavepoint(
+            AbstractFlinkResource<?, ?> resource, Configuration conf) {
+
+        var status = resource.getStatus();
+        var jobSpec = resource.getSpec().getJob();
+        var jobStatus = status.getJobStatus();
+
+        if (!ReconciliationUtils.isJobRunning(status) || savepointInProgress(jobStatus)) {
+            return Optional.empty();
+        }
+
+        var triggerNonceChanged =
+                jobSpec.getSavepointTriggerNonce() != null
+                        && !jobSpec.getSavepointTriggerNonce()
+                                .equals(
+                                        status.getReconciliationStatus()
+                                                .deserializeLastReconciledSpec()
+                                                .getJob()
+                                                .getSavepointTriggerNonce());
+        if (triggerNonceChanged) {
+            return Optional.of(SavepointTriggerType.MANUAL);
+        }
+
+        var savepointInterval =
+                conf.getOptional(KubernetesOperatorConfigOptions.PERIODIC_SAVEPOINT_INTERVAL)
+                        .map(Duration::toMillis)
+                        .orElse(Long.MAX_VALUE);
+
+        var lastPeriodicTriggerTs =
+                jobStatus.getSavepointInfo().getLastPeriodicSavepointTimestamp();
+
+        // When the resource is first created/periodic savepointing enabled we have to compare
+        // against the creation timestamp for triggering the first periodic savepoint
+        if (lastPeriodicTriggerTs.equals(0L)) {
+            lastPeriodicTriggerTs =
+                    Instant.parse(resource.getMetadata().getCreationTimestamp()).toEpochMilli();
+        }
+
+        long timeElapsed = System.currentTimeMillis() - lastPeriodicTriggerTs;
+        if (timeElapsed >= savepointInterval) {
+            LOG.info(
+                    "Triggering new periodic savepoint after {} seconds",
+                    Duration.ofMillis(timeElapsed).toSeconds());
+            return Optional.of(SavepointTriggerType.PERIODIC);
+        }
+        return Optional.empty();
     }
 
     public static boolean gracePeriodEnded(
             FlinkOperatorConfiguration configuration, SavepointInfo savepointInfo) {
-        Duration gracePeriod = configuration.getSavepointTriggerGracePeriod();
-        long triggerTimestamp = savepointInfo.getTriggerTimestamp();
-        return (System.currentTimeMillis() - triggerTimestamp) > gracePeriod.toMillis();
+        var elapsed = System.currentTimeMillis() - savepointInfo.getTriggerTimestamp();
+        return elapsed > configuration.getSavepointTriggerGracePeriod().toMillis();
+    }
+
+    public static void resetTriggerIfJobNotRunning(
+            KubernetesClient client, AbstractFlinkResource<?, ?> resource) {
+        var status = resource.getStatus();
+        var jobStatus = status.getJobStatus();
+        if (!ReconciliationUtils.isJobRunning(status)
+                && SavepointUtils.savepointInProgress(jobStatus)) {
+            jobStatus.getSavepointInfo().resetTrigger();
+            LOG.error("Job is not running, cancelling savepoint operation");
+            EventUtils.createOrUpdateEvent(
+                    client,
+                    resource,
+                    EventUtils.Type.Warning,
+                    "SavepointError",
+                    "Savepoint failed for savepointTriggerNonce: "
+                            + resource.getSpec().getJob().getSavepointTriggerNonce(),

Review Comment:
   👍 





[GitHub] [flink-kubernetes-operator] gyfora commented on a diff in pull request #249: [FLINK-26179] Support for periodic savepoints

Posted by GitBox <gi...@apache.org>.
gyfora commented on code in PR #249:
URL: https://github.com/apache/flink-kubernetes-operator/pull/249#discussion_r885762773


##########
docs/layouts/shortcodes/generated/kubernetes_operator_config_configuration.html:
##########
@@ -74,6 +74,12 @@
             <td>Duration</td>
             <td>The interval before a savepoint trigger attempt is marked as unsuccessful.</td>
         </tr>
+        <tr>
+            <td><h5>kubernetes.operator.periodic.savepoint.interval</h5></td>

Review Comment:
   At the moment there is no way to "turn it off" if it's defined globally, but you could technically set it to a very large interval so that it never triggers.
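A minimal Java sketch of the semantics described in this thread, assuming the per-job value simply takes precedence over the global default and that an unset interval means "never trigger" (mirroring the diff's `orElse(Long.MAX_VALUE)`). Plain `Map`s stand in for Flink's `Configuration`; the class and method names are hypothetical.

```java
import java.time.Duration;
import java.util.Map;
import java.util.Optional;

/**
 * Hypothetical sketch: resolve the effective periodic savepoint interval from a global
 * default and an optional per-job override, then apply the elapsed-time check.
 * Plain maps stand in for Flink's Configuration; this is not the operator's code.
 */
final class PeriodicIntervalSketch {
    static final String KEY = "kubernetes.operator.periodic.savepoint.interval";

    /** Per-job value wins over the global one; if neither is set, never trigger. */
    static long effectiveIntervalMillis(Map<String, Duration> global, Map<String, Duration> job) {
        return Optional.ofNullable(job.get(KEY))
                .or(() -> Optional.ofNullable(global.get(KEY)))
                .map(Duration::toMillis)
                .orElse(Long.MAX_VALUE);
    }

    /** Trigger once at least one full interval has elapsed since the last periodic trigger. */
    static boolean shouldTrigger(long nowMillis, long lastTriggerMillis, long intervalMillis) {
        return nowMillis - lastTriggerMillis >= intervalMillis;
    }

    public static void main(String[] args) {
        var global = Map.of(KEY, Duration.ofHours(1));
        // No per-job override: the global 1h interval applies.
        long inherited = effectiveIntervalMillis(global, Map.of());
        // Effectively "disable" it for one job by overriding with ~100 years.
        long disabled = effectiveIntervalMillis(global, Map.of(KEY, Duration.ofDays(36_500)));
        long now = Duration.ofDays(2).toMillis();
        System.out.println(shouldTrigger(now, 0L, inherited)); // true
        System.out.println(shouldTrigger(now, 0L, disabled));  // false
    }
}
```

In this model a zero or very small interval has the opposite effect: the elapsed-time check passes on every evaluation, so a very large value is the only practical "off" switch.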





[GitHub] [flink-kubernetes-operator] gyfora commented on a diff in pull request #249: [FLINK-26179] Support for periodic savepoints

Posted by GitBox <gi...@apache.org>.
gyfora commented on code in PR #249:
URL: https://github.com/apache/flink-kubernetes-operator/pull/249#discussion_r886364266


##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/crd/status/SavepointTriggerType.java:
##########
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.crd.status;
+
+/** Savepoint trigger mechanism. */
+public enum SavepointTriggerType {
+    /** Savepoint manually triggered by changing the savepointTriggerNonce. */
+    MANUAL,
+    /** Savepoint periodically triggered by the operator. */
+    PERIODIC,
+    /** Savepoint triggered during stateful upgrade. */
+    UPGRADE,
+    /** Savepoint trigger mechanism unknown. */
+    UNKNOWN

Review Comment:
   It is used when the mechanism is not known. For example, savepoints triggered before this update will not have this information.
   
   The same applies when the savepoint info is retrieved directly from the Flink cluster.
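One way to picture this fallback is a small Java sketch, assuming the stored trigger type may be absent (older savepoint info, or info fetched from the cluster) and should then read as `UNKNOWN`. The enum is copied from the diff so the example compiles on its own; the `TriggerTypes.fromStored` helper is hypothetical, and mapping unrecognized strings to `UNKNOWN` is a defensive choice of this sketch, not something stated in the thread.

```java
import java.util.Locale;

/** Copy of the enum from the diff, so this sketch compiles on its own. */
enum SavepointTriggerType { MANUAL, PERIODIC, UPGRADE, UNKNOWN }

/** Hypothetical helper: map a possibly missing stored value to a trigger type. */
final class TriggerTypes {
    static SavepointTriggerType fromStored(String stored) {
        // Savepoint info written before this change (or fetched from Flink) has no type.
        if (stored == null || stored.isBlank()) {
            return SavepointTriggerType.UNKNOWN;
        }
        try {
            return SavepointTriggerType.valueOf(stored.trim().toUpperCase(Locale.ROOT));
        } catch (IllegalArgumentException e) {
            // Defensive choice for this sketch: unrecognized values are also UNKNOWN.
            return SavepointTriggerType.UNKNOWN;
        }
    }

    public static void main(String[] args) {
        System.out.println(fromStored("PERIODIC")); // PERIODIC
        System.out.println(fromStored(null));       // UNKNOWN
    }
}
```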





[GitHub] [flink-kubernetes-operator] SteNicholas commented on a diff in pull request #249: [FLINK-26179] Support for periodic savepoints

Posted by GitBox <gi...@apache.org>.
SteNicholas commented on code in PR #249:
URL: https://github.com/apache/flink-kubernetes-operator/pull/249#discussion_r886361040


##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/crd/status/SavepointTriggerType.java:
##########
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.crd.status;
+
+/** Savepoint trigger mechanism. */
+public enum SavepointTriggerType {
+    /** Savepoint manually triggered by changing the savepointTriggerNonce. */
+    MANUAL,
+    /** Savepoint periodically triggered by the operator. */
+    PERIODIC,
+    /** Savepoint triggered during stateful upgrade. */
+    UPGRADE,
+    /** Savepoint trigger mechanism unknown. */
+    UNKNOWN

Review Comment:
   Which case is the `UNKNOWN` used for?





[GitHub] [flink-kubernetes-operator] gyfora commented on pull request #249: [FLINK-26179] Support for periodic savepoints

Posted by GitBox <gi...@apache.org>.
gyfora commented on PR #249:
URL: https://github.com/apache/flink-kubernetes-operator/pull/249#issuecomment-1143133417

   @Sushant20 The operator intentionally doesn't support restoring from arbitrary savepoints "on-the-fly", to avoid confusion about which savepoint will actually be used (the last available savepoint or the user-specified one). I think this is a confusing aspect of the Google operator's design, because it's not clear what happens if the job is already running.
   
   What we support here:
   
   1. Delete the FlinkDeployment resource. This will stop the job and clean up all the resources.
   2. Set `FlinkDeployment.spec.job.initialSavepointPath` to your desired savepoint restore path
   3. Re-create the FlinkDeployment resource
   
   As the name suggests, the `initialSavepointPath` setting only takes effect when a given deployment is submitted for the first time. After that, the operator always uses the latest savepoint/checkpoint information to avoid any state loss.
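The restore rule above can be modelled in a few lines of Java. This is a hypothetical sketch, not the operator's implementation: `restorePath`, its parameters and the `s3://bucket/...` paths are illustrative, and returning an empty result when no later snapshot exists is an assumption of the sketch.

```java
import java.util.Optional;

/** Hypothetical model of when initialSavepointPath is honoured. */
final class RestorePathSketch {
    static Optional<String> restorePath(
            boolean firstSubmission, String initialSavepointPath, String latestSnapshotPath) {
        if (firstSubmission) {
            // First deployment of this resource: use the user-provided path, if any.
            return Optional.ofNullable(initialSavepointPath);
        }
        // Later upgrades/restarts: always use the latest known state, even if
        // initialSavepointPath was edited in the spec in the meantime.
        return Optional.ofNullable(latestSnapshotPath);
    }

    public static void main(String[] args) {
        // Prints Optional[s3://bucket/sp-1]
        System.out.println(restorePath(true, "s3://bucket/sp-1", null));
        // Prints Optional[s3://bucket/sp-7]
        System.out.println(restorePath(false, "s3://bucket/sp-1", "s3://bucket/sp-7"));
    }
}
```

In this model, deleting and re-creating the FlinkDeployment is exactly what makes `firstSubmission` true again, which is why the three steps above are required.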




[GitHub] [flink-kubernetes-operator] gyfora commented on a diff in pull request #249: [FLINK-26179] Support for periodic savepoints

Posted by GitBox <gi...@apache.org>.
gyfora commented on code in PR #249:
URL: https://github.com/apache/flink-kubernetes-operator/pull/249#discussion_r886332488


##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/FlinkService.java:
##########
@@ -653,7 +655,10 @@ public SavepointFetchResult fetchSavepointInfo(
     public void disposeSavepoint(String savepointPath, Configuration conf) throws Exception {
         try (RestClusterClient<String> clusterClient =
                 (RestClusterClient<String>) getClusterClient(conf)) {
-            clusterClient.disposeSavepoint(savepointPath);
+            clusterClient.sendRequest(

Review Comment:
   The `disposeSavepoint` method of the `RestClusterClient` first triggers the dispose action and then keeps polling Flink until it completes.
   
   The way we use it, this is completely unnecessary, as we do not handle or retry dispose actions on our side anyway. This way we save some REST API calls.
   
   Unfortunately, however, we lose the error logs if the dispose actually fails.
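The trade-off can be illustrated with a self-contained Java sketch that contrasts a blocking "trigger, then poll" dispose with a single fire-and-forget request. `FakeDisposeEndpoint` is a hypothetical stand-in that only counts requests; it is not Flink's REST API or the `RestClusterClient`, and the poll count is arbitrary.

```java
/** Hypothetical comparison of blocking vs fire-and-forget savepoint disposal. */
final class DisposeSketch {
    /** Fake endpoint: counts requests and reports completion after N polls. */
    static final class FakeDisposeEndpoint {
        int requests;
        private final int pollsUntilDone;
        private final boolean fails;
        private int polls;

        FakeDisposeEndpoint(int pollsUntilDone, boolean fails) {
            this.pollsUntilDone = pollsUntilDone;
            this.fails = fails;
        }

        void trigger() { requests++; }

        /** Returns true once the dispose operation has finished. */
        boolean poll() {
            requests++;
            return ++polls >= pollsUntilDone;
        }

        boolean failed() { return fails; }
    }

    /** Blocking style: trigger, then poll until done; a failure reaches the caller. */
    static void disposeAndWait(FakeDisposeEndpoint endpoint) {
        endpoint.trigger();
        while (!endpoint.poll()) {
            // A real client would back off between polls.
        }
        if (endpoint.failed()) {
            throw new IllegalStateException("dispose failed");
        }
    }

    /** Fire-and-forget style: one request; the outcome is never observed. */
    static void disposeFireAndForget(FakeDisposeEndpoint endpoint) {
        endpoint.trigger();
    }

    public static void main(String[] args) {
        var blocking = new FakeDisposeEndpoint(3, false);
        disposeAndWait(blocking);
        var async = new FakeDisposeEndpoint(3, true);
        disposeFireAndForget(async); // no exception: the failure goes unnoticed
        System.out.println(blocking.requests + " vs " + async.requests); // 4 vs 1
    }
}
```

The fire-and-forget variant saves the polling requests but, as noted in the comment, any failure is never surfaced.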


