You are viewing a plain text version of this content. The canonical link to the original message was a hyperlink and is not preserved in this plain text copy.
Posted to commits@flink.apache.org by gy...@apache.org on 2022/11/16 15:07:00 UTC

[flink-kubernetes-operator] branch main updated (6e597d4c -> 7d801050)

This is an automated email from the ASF dual-hosted git repository.

gyfora pushed a change to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git


    from 6e597d4c [FLINK-29609] Shut down JM for terminated applications after configured duration
     new 1fba88e9 [FLINK-29959] Use optimistic locking when updating the status to avoid potential race conditions
     new 7d801050 [hotfix] Remove some unnecessary empty strings from status

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 docs/content/docs/custom-resource/reference.md     |  8 +-
 e2e-tests/test_application_operations.sh           |  6 +-
 e2e-tests/test_sessionjob_operations.sh            |  4 +-
 .../operator/api/status/CommonStatus.java          |  2 +-
 .../kubernetes/operator/api/status/Savepoint.java  | 39 +++++----
 .../operator/api/status/SavepointInfo.java         | 15 ++--
 .../operator/api/utils/BaseTestUtils.java          |  2 +
 ...Exception.java => StatusConflictException.java} | 14 +---
 .../AbstractFlinkDeploymentObserver.java           |  2 +-
 .../operator/reconciler/ReconciliationUtils.java   |  2 +-
 .../deployment/AbstractJobReconciler.java          |  2 +-
 .../kubernetes/operator/utils/SavepointUtils.java  | 34 +++++---
 .../kubernetes/operator/utils/StatusRecorder.java  | 98 +++++++++++++++++-----
 .../kubernetes/operator/FlinkOperatorITCase.java   | 30 +++++--
 .../kubernetes/operator/TestingStatusRecorder.java |  3 +-
 .../controller/FlinkDeploymentControllerTest.java  |  4 +-
 .../operator/controller/RollbackTest.java          |  8 +-
 .../metrics/KubernetesClientMetricsTest.java       |  2 +-
 .../lifecycle/ResourceLifecycleMetricsTest.java    |  2 +-
 .../operator/observer/SavepointObserverTest.java   |  3 +-
 .../deployment/ApplicationReconcilerTest.java      | 13 +--
 21 files changed, 182 insertions(+), 111 deletions(-)
 copy flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/exception/{ReconciliationException.java => StatusConflictException.java} (73%)


[flink-kubernetes-operator] 01/02: [FLINK-29959] Use optimistic locking when updating the status to avoid potential race conditions

Posted by gy...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

gyfora pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git

commit 1fba88e9e6bc32ef4155a20a4713d5623a709356
Author: Gyula Fora <g_...@apple.com>
AuthorDate: Thu Nov 10 18:02:43 2022 +0100

    [FLINK-29959] Use optimistic locking when updating the status to avoid potential race conditions
---
 .../operator/api/utils/BaseTestUtils.java          |  2 +
 .../exception/StatusConflictException.java         | 28 +++++++
 .../kubernetes/operator/utils/StatusRecorder.java  | 98 +++++++++++++++++-----
 .../kubernetes/operator/FlinkOperatorITCase.java   | 30 +++++--
 .../kubernetes/operator/TestingStatusRecorder.java |  3 +-
 .../metrics/KubernetesClientMetricsTest.java       |  2 +-
 6 files changed, 133 insertions(+), 30 deletions(-)

diff --git a/flink-kubernetes-operator-api/src/test/java/org/apache/flink/kubernetes/operator/api/utils/BaseTestUtils.java b/flink-kubernetes-operator-api/src/test/java/org/apache/flink/kubernetes/operator/api/utils/BaseTestUtils.java
index a8ccdbe7..7dde1a8e 100644
--- a/flink-kubernetes-operator-api/src/test/java/org/apache/flink/kubernetes/operator/api/utils/BaseTestUtils.java
+++ b/flink-kubernetes-operator-api/src/test/java/org/apache/flink/kubernetes/operator/api/utils/BaseTestUtils.java
@@ -76,6 +76,7 @@ public class BaseTestUtils {
                         .withNamespace(namespace)
                         .withCreationTimestamp(Instant.now().toString())
                         .withUid(UUID.randomUUID().toString())
+                        .withResourceVersion("1")
                         .build());
         deployment.setSpec(getTestFlinkDeploymentSpec(version));
         return deployment;
@@ -119,6 +120,7 @@ public class BaseTestUtils {
                         .withCreationTimestamp(Instant.now().toString())
                         .withUid(UUID.randomUUID().toString())
                         .withGeneration(1L)
+                        .withResourceVersion("1")
                         .build());
 
         Map<String, String> conf = new HashMap<>();
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/exception/StatusConflictException.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/exception/StatusConflictException.java
new file mode 100644
index 00000000..614b45af
--- /dev/null
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/exception/StatusConflictException.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.exception;
+
+/** Exception for status updates. */
+public class StatusConflictException extends RuntimeException {
+
+    private static final long serialVersionUID = 2260638990044248181L;
+
+    public StatusConflictException(String msg) {
+        super(msg);
+    }
+}
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/StatusRecorder.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/StatusRecorder.java
index 8ff79e22..b1bec2f7 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/StatusRecorder.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/StatusRecorder.java
@@ -18,7 +18,6 @@
 
 package org.apache.flink.kubernetes.operator.utils;
 
-import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.kubernetes.operator.api.AbstractFlinkResource;
 import org.apache.flink.kubernetes.operator.api.FlinkDeployment;
 import org.apache.flink.kubernetes.operator.api.lifecycle.ResourceLifecycleState;
@@ -26,13 +25,16 @@ import org.apache.flink.kubernetes.operator.api.listener.FlinkResourceListener;
 import org.apache.flink.kubernetes.operator.api.status.CommonStatus;
 import org.apache.flink.kubernetes.operator.api.status.FlinkDeploymentStatus;
 import org.apache.flink.kubernetes.operator.api.status.FlinkSessionJobStatus;
+import org.apache.flink.kubernetes.operator.exception.StatusConflictException;
 import org.apache.flink.kubernetes.operator.listener.AuditUtils;
 import org.apache.flink.kubernetes.operator.metrics.MetricManager;
 
+import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.databind.node.ObjectNode;
-import io.fabric8.kubernetes.api.model.HasMetadata;
 import io.fabric8.kubernetes.client.KubernetesClient;
+import io.fabric8.kubernetes.client.KubernetesClientException;
+import io.javaoperatorsdk.operator.processing.event.ResourceID;
 import lombok.SneakyThrows;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -50,7 +52,7 @@ public class StatusRecorder<
 
     protected final ObjectMapper objectMapper = new ObjectMapper();
 
-    protected final ConcurrentHashMap<Tuple2<String, String>, ObjectNode> statusCache =
+    protected final ConcurrentHashMap<ResourceID, ObjectNode> statusCache =
             new ConcurrentHashMap<>();
 
     private final KubernetesClient client;
@@ -76,13 +78,10 @@ public class StatusRecorder<
      */
     @SneakyThrows
     public void patchAndCacheStatus(CR resource) {
-        // This is necessary so the client wouldn't fail of the underlying resource spec was updated
-        // in the meantime
-        resource.getMetadata().setResourceVersion(null);
-
         ObjectNode newStatusNode =
                 objectMapper.convertValue(resource.getStatus(), ObjectNode.class);
-        ObjectNode previousStatusNode = statusCache.put(getKey(resource), newStatusNode);
+        var resourceId = ResourceID.fromResource(resource);
+        ObjectNode previousStatusNode = statusCache.get(resourceId);
 
         if (newStatusNode.equals(previousStatusNode)) {
             LOG.debug("No status change.");
@@ -97,20 +96,79 @@ public class StatusRecorder<
 
         Exception err = null;
         for (int i = 0; i < 3; i++) {
-            // In any case we retry the status update 3 times to avoid some intermittent
-            // connectivity errors if any
+            // We retry the status update 3 times to avoid some intermittent connectivity errors
             try {
-                client.resource(resource).patchStatus();
-                statusUpdateListener.accept(resource, prevStatus);
-                metricManager.onUpdate(resource);
-                return;
-            } catch (Exception e) {
+                replaceStatus(resource, prevStatus);
+                err = null;
+            } catch (KubernetesClientException e) {
                 LOG.error("Error while patching status, retrying {}/3...", (i + 1), e);
                 Thread.sleep(1000);
                 err = e;
             }
         }
-        throw err;
+
+        if (err != null) {
+            throw err;
+        }
+
+        statusCache.put(resourceId, newStatusNode);
+        statusUpdateListener.accept(resource, prevStatus);
+        metricManager.onUpdate(resource);
+    }
+
+    private void replaceStatus(CR resource, STATUS prevStatus) throws JsonProcessingException {
+        int retries = 0;
+        while (true) {
+            try {
+                var updated = client.resource(resource).lockResourceVersion().replaceStatus();
+
+                // If we successfully replaced the status, update the resource version so we know
+                // what to lock next in the same reconciliation loop
+                resource.getMetadata()
+                        .setResourceVersion(updated.getMetadata().getResourceVersion());
+                return;
+            } catch (KubernetesClientException kce) {
+                // 409 is the error code for conflicts resulting from the locking
+                if (kce.getCode() == 409) {
+                    var currentVersion = resource.getMetadata().getResourceVersion();
+                    LOG.debug(
+                            "Could not apply status update for resource version {}",
+                            currentVersion);
+
+                    var latest = client.resource(resource).fromServer().get();
+                    var latestVersion = latest.getMetadata().getResourceVersion();
+
+                    if (latestVersion.equals(currentVersion)) {
+                        // This should not happen as long as the client works consistently
+                        LOG.error("Unable to fetch latest resource version");
+                        throw kce;
+                    }
+
+                    if (latest.getStatus().equals(prevStatus)) {
+                        if (retries++ < 3) {
+                            LOG.debug(
+                                    "Retrying status update for latest version {}", latestVersion);
+                            resource.getMetadata().setResourceVersion(latestVersion);
+                        } else {
+                            // If we cannot get the latest version in 3 tries we throw the error to
+                            // retry with delay
+                            throw kce;
+                        }
+                    } else {
+                        throw new StatusConflictException(
+                                "Status have been modified externally in version "
+                                        + latestVersion
+                                        + " Previous: "
+                                        + objectMapper.writeValueAsString(prevStatus)
+                                        + " Latest: "
+                                        + objectMapper.writeValueAsString(latest.getStatus()));
+                    }
+                } else {
+                    // We simply throw non conflict errors, to trigger retry with delay
+                    throw kce;
+                }
+            }
+        }
     }
 
     /**
@@ -124,7 +182,7 @@ public class StatusRecorder<
      * @param resource Resource for which the status should be updated from the cache
      */
     public void updateStatusFromCache(CR resource) {
-        var key = getKey(resource);
+        var key = ResourceID.fromResource(resource);
         var cachedStatus = statusCache.get(key);
         if (cachedStatus != null) {
             resource.setStatus(
@@ -147,14 +205,10 @@ public class StatusRecorder<
      * @param resource Flink resource.
      */
     public void removeCachedStatus(CR resource) {
-        statusCache.remove(getKey(resource));
+        statusCache.remove(ResourceID.fromResource(resource));
         metricManager.onRemove(resource);
     }
 
-    protected static Tuple2<String, String> getKey(HasMetadata resource) {
-        return Tuple2.of(resource.getMetadata().getNamespace(), resource.getMetadata().getName());
-    }
-
     public static <S extends CommonStatus<?>, CR extends AbstractFlinkResource<?, S>>
             StatusRecorder<CR, S> create(
                     KubernetesClient kubernetesClient,
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/FlinkOperatorITCase.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/FlinkOperatorITCase.java
index 8aa50359..1a0b0ac1 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/FlinkOperatorITCase.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/FlinkOperatorITCase.java
@@ -23,6 +23,10 @@ import org.apache.flink.kubernetes.operator.api.spec.FlinkVersion;
 import org.apache.flink.kubernetes.operator.api.spec.JobManagerSpec;
 import org.apache.flink.kubernetes.operator.api.spec.Resource;
 import org.apache.flink.kubernetes.operator.api.spec.TaskManagerSpec;
+import org.apache.flink.kubernetes.operator.api.status.FlinkDeploymentStatus;
+import org.apache.flink.kubernetes.operator.exception.StatusConflictException;
+import org.apache.flink.kubernetes.operator.metrics.MetricManager;
+import org.apache.flink.kubernetes.operator.utils.StatusRecorder;
 
 import io.fabric8.kubernetes.api.model.Namespace;
 import io.fabric8.kubernetes.api.model.NamespaceBuilder;
@@ -43,6 +47,7 @@ import static java.util.concurrent.TimeUnit.MINUTES;
 import static org.awaitility.Awaitility.await;
 import static org.hamcrest.CoreMatchers.is;
 import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.jupiter.api.Assertions.fail;
 
 /** Flink Operator integration test. */
 public class FlinkOperatorITCase {
@@ -50,10 +55,11 @@ public class FlinkOperatorITCase {
     private static final String TEST_NAMESPACE = "flink-operator-test";
     private static final String SERVICE_ACCOUNT = "flink-operator";
     private static final String CLUSTER_ROLE_BINDING = "flink-operator-role-binding";
-    private static final String FLINK_VERSION = "1.15.1";
+    private static final String FLINK_VERSION = "1.15";
     private static final String IMAGE = String.format("flink:%s", FLINK_VERSION);
     private static final Logger LOG = LoggerFactory.getLogger(FlinkOperatorITCase.class);
-    private KubernetesClient client;
+    public static final String SESSION_NAME = "test-session-cluster";
+    private static KubernetesClient client;
 
     @BeforeEach
     public void setup() {
@@ -83,7 +89,7 @@ public class FlinkOperatorITCase {
     public void test() {
         FlinkDeployment flinkDeployment = buildSessionCluster();
         LOG.info("Deploying {}", flinkDeployment.getMetadata().getName());
-        client.resource(flinkDeployment).createOrReplace();
+        var v1 = client.resource(flinkDeployment).createOrReplace();
 
         await().atMost(1, MINUTES)
                 .untilAsserted(
@@ -92,16 +98,28 @@ public class FlinkOperatorITCase {
                                         client.apps()
                                                 .deployments()
                                                 .inNamespace(TEST_NAMESPACE)
-                                                .withName(flinkDeployment.getMetadata().getName())
+                                                .withName(SESSION_NAME)
                                                 .isReady(),
                                         is(true)));
+
+        // Test status recorder locking logic
+        var statusRecorder =
+                new StatusRecorder<FlinkDeployment, FlinkDeploymentStatus>(
+                        client, new MetricManager<>(), (a, b) -> {});
+        try {
+            v1.getStatus().setError("e2");
+            // Should throw error as status was modified externally
+            statusRecorder.patchAndCacheStatus(v1);
+            fail();
+        } catch (StatusConflictException expected) {
+        }
     }
 
     private static FlinkDeployment buildSessionCluster() {
         FlinkDeployment deployment = new FlinkDeployment();
         deployment.setMetadata(
                 new ObjectMetaBuilder()
-                        .withName("test-session-cluster")
+                        .withName(SESSION_NAME)
                         .withNamespace(TEST_NAMESPACE)
                         .build());
         FlinkDeploymentSpec spec = new FlinkDeploymentSpec();
@@ -122,7 +140,7 @@ public class FlinkOperatorITCase {
         return deployment;
     }
 
-    private void rbacSetup() {
+    private static void rbacSetup() {
         LOG.info("Creating service account {}", SERVICE_ACCOUNT);
         ServiceAccount serviceAccount =
                 new ServiceAccountBuilder()
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingStatusRecorder.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingStatusRecorder.java
index ebe8fb39..7cd010e3 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingStatusRecorder.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingStatusRecorder.java
@@ -24,6 +24,7 @@ import org.apache.flink.kubernetes.operator.metrics.MetricManager;
 import org.apache.flink.kubernetes.operator.utils.StatusRecorder;
 
 import com.fasterxml.jackson.databind.node.ObjectNode;
+import io.javaoperatorsdk.operator.processing.event.ResourceID;
 
 /** Testing statusRecorder. */
 public class TestingStatusRecorder<
@@ -37,7 +38,7 @@ public class TestingStatusRecorder<
     @Override
     public void patchAndCacheStatus(CR resource) {
         statusCache.put(
-                getKey(resource),
+                ResourceID.fromResource(resource),
                 objectMapper.convertValue(resource.getStatus(), ObjectNode.class));
     }
 }
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/metrics/KubernetesClientMetricsTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/metrics/KubernetesClientMetricsTest.java
index 8bacff26..64f27da6 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/metrics/KubernetesClientMetricsTest.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/metrics/KubernetesClientMetricsTest.java
@@ -92,7 +92,7 @@ public class KubernetesClientMetricsTest {
                         mockServer.createClient().getConfiguration());
 
         var deployment = TestUtils.buildApplicationCluster();
-        kubernetesClient.resource(deployment).get();
+        kubernetesClient.resource(deployment).fromServer().get();
         assertFalse(listener.getCounter(listener.getMetricId(REQUEST_COUNTER_ID)).isPresent());
         assertFalse(listener.getMeter(listener.getMetricId(REQUEST_METER_ID)).isPresent());
         assertFalse(


[flink-kubernetes-operator] 02/02: [hotfix] Remove some unnecessary empty strings from status

Posted by gy...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

gyfora pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git

commit 7d80105020e603eb582f42d3462a2f1de832db49
Author: Gyula Fora <g_...@apple.com>
AuthorDate: Thu Nov 10 18:21:04 2022 +0100

    [hotfix] Remove some unnecessary empty strings from status
---
 docs/content/docs/custom-resource/reference.md     |  8 ++---
 e2e-tests/test_application_operations.sh           |  6 ++--
 e2e-tests/test_sessionjob_operations.sh            |  4 +--
 .../operator/api/status/CommonStatus.java          |  2 +-
 .../kubernetes/operator/api/status/Savepoint.java  | 39 ++++++++++++----------
 .../operator/api/status/SavepointInfo.java         | 15 +++++----
 .../AbstractFlinkDeploymentObserver.java           |  2 +-
 .../operator/reconciler/ReconciliationUtils.java   |  2 +-
 .../deployment/AbstractJobReconciler.java          |  2 +-
 .../kubernetes/operator/utils/SavepointUtils.java  | 34 ++++++++++++-------
 .../controller/FlinkDeploymentControllerTest.java  |  4 +--
 .../operator/controller/RollbackTest.java          |  8 ++---
 .../lifecycle/ResourceLifecycleMetricsTest.java    |  2 +-
 .../operator/observer/SavepointObserverTest.java   |  3 +-
 .../deployment/ApplicationReconcilerTest.java      | 13 +-------
 15 files changed, 74 insertions(+), 70 deletions(-)

diff --git a/docs/content/docs/custom-resource/reference.md b/docs/content/docs/custom-resource/reference.md
index 543f67aa..8977e5e6 100644
--- a/docs/content/docs/custom-resource/reference.md
+++ b/docs/content/docs/custom-resource/reference.md
@@ -274,8 +274,8 @@ This page serves as a full reference for FlinkDeployment custom resource definit
 | timeStamp | long | Millisecond timestamp at the start of the savepoint operation. |
 | location | java.lang.String | External pointer of the savepoint can be used to recover jobs. |
 | triggerType | org.apache.flink.kubernetes.operator.api.status.SavepointTriggerType | Savepoint trigger mechanism. |
-| formatType | org.apache.flink.kubernetes.operator.api.status.SavepointFormatType |  |
-| triggerNonce | java.lang.Long | Nonce value used when the savepoint was triggered manually {@link SavepointTriggerType#MANUAL}, defaults to 0. |
+| formatType | org.apache.flink.kubernetes.operator.api.status.SavepointFormatType | Savepoint format. |
+| triggerNonce | java.lang.Long | Nonce value used when the savepoint was triggered manually {@link SavepointTriggerType#MANUAL}, null for other types of savepoints. |
 
 ### SavepointFormatType
 **Class**: org.apache.flink.kubernetes.operator.api.status.SavepointFormatType
@@ -297,9 +297,9 @@ This page serves as a full reference for FlinkDeployment custom resource definit
 | ----------| ---- | ---- |
 | lastSavepoint | org.apache.flink.kubernetes.operator.api.status.Savepoint | Last completed savepoint by the operator. |
 | triggerId | java.lang.String | Trigger id of a pending savepoint operation. |
-| triggerTimestamp | long | Trigger timestamp of a pending savepoint operation. |
+| triggerTimestamp | java.lang.Long | Trigger timestamp of a pending savepoint operation. |
 | triggerType | org.apache.flink.kubernetes.operator.api.status.SavepointTriggerType | Savepoint trigger mechanism. |
-| formatType | org.apache.flink.kubernetes.operator.api.status.SavepointFormatType |  |
+| formatType | org.apache.flink.kubernetes.operator.api.status.SavepointFormatType | Savepoint format. |
 | savepointHistory | java.util.List<org.apache.flink.kubernetes.operator.api.status.Savepoint> | List of recent savepoints. |
 | lastPeriodicSavepointTimestamp | long | Trigger timestamp of last periodic savepoint operation. |
 
diff --git a/e2e-tests/test_application_operations.sh b/e2e-tests/test_application_operations.sh
index c9d3565e..457972c8 100755
--- a/e2e-tests/test_application_operations.sh
+++ b/e2e-tests/test_application_operations.sh
@@ -45,8 +45,8 @@ job_id=$(kubectl logs $jm_pod_name -c flink-main-container | grep -E -o 'Job [a-
 # Testing trigger savepoint
 kubectl patch $APPLICATION_IDENTIFIER --type merge --patch '{"spec":{"job": {"savepointTriggerNonce": 123456 } } }'
 wait_for_logs $jm_pod_name "Triggering savepoint for job" ${TIMEOUT} || exit 1
-wait_for_status $APPLICATION_IDENTIFIER '.status.jobStatus.savepointInfo.triggerId' "" $TIMEOUT || exit 1
-wait_for_status $APPLICATION_IDENTIFIER '.status.jobStatus.savepointInfo.triggerTimestamp' 0 $TIMEOUT || exit 1
+wait_for_status $APPLICATION_IDENTIFIER '.status.jobStatus.savepointInfo.triggerId' null $TIMEOUT || exit 1
+wait_for_status $APPLICATION_IDENTIFIER '.status.jobStatus.savepointInfo.triggerTimestamp' null $TIMEOUT || exit 1
 location=$(kubectl get $APPLICATION_IDENTIFIER -o yaml | yq '.status.jobStatus.savepointInfo.lastSavepoint.location')
 if [ "$location" == "" ];then
   echo "lost savepoint location"
@@ -68,5 +68,3 @@ wait_for_status flinkdep/flink-example-statemachine '.status.jobStatus.state' RU
 assert_available_slots 1 $CLUSTER_ID
 
 echo "Successfully run the last-state upgrade test"
-
-
diff --git a/e2e-tests/test_sessionjob_operations.sh b/e2e-tests/test_sessionjob_operations.sh
index 638f3522..b1c88fc2 100755
--- a/e2e-tests/test_sessionjob_operations.sh
+++ b/e2e-tests/test_sessionjob_operations.sh
@@ -46,8 +46,8 @@ assert_available_slots 0 $CLUSTER_ID
 # Testing trigger savepoint
 kubectl patch sessionjob ${SESSION_JOB_NAME} --type merge --patch '{"spec":{"job": {"savepointTriggerNonce": 123456 } } }'
 wait_for_logs $jm_pod_name "Triggering savepoint for job" ${TIMEOUT} || exit 1
-wait_for_status $SESSION_JOB_IDENTIFIER '.status.jobStatus.savepointInfo.triggerId' "" $TIMEOUT || exit 1
-wait_for_status $SESSION_JOB_IDENTIFIER '.status.jobStatus.savepointInfo.triggerTimestamp' 0 $TIMEOUT || exit 1
+wait_for_status $SESSION_JOB_IDENTIFIER '.status.jobStatus.savepointInfo.triggerId' null $TIMEOUT || exit 1
+wait_for_status $SESSION_JOB_IDENTIFIER '.status.jobStatus.savepointInfo.triggerTimestamp' null $TIMEOUT || exit 1
 location=$(kubectl get $SESSION_JOB_IDENTIFIER -o yaml | yq '.status.jobStatus.savepointInfo.lastSavepoint.location')
 if [ "$location" == "" ];then
   echo "lost savepoint location"
diff --git a/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/status/CommonStatus.java b/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/status/CommonStatus.java
index 5ed0c318..b90949af 100644
--- a/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/status/CommonStatus.java
+++ b/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/status/CommonStatus.java
@@ -41,7 +41,7 @@ public abstract class CommonStatus<SPEC extends AbstractFlinkSpec> {
     private JobStatus jobStatus = new JobStatus();
 
     /** Error information about the FlinkDeployment/FlinkSessionJob. */
-    private String error = "";
+    private String error;
 
     /**
      * Current reconciliation status of this resource.
diff --git a/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/status/Savepoint.java b/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/status/Savepoint.java
index 026ffce0..e754a1cf 100644
--- a/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/status/Savepoint.java
+++ b/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/status/Savepoint.java
@@ -22,6 +22,8 @@ import org.apache.flink.annotation.Experimental;
 import lombok.Data;
 import lombok.NoArgsConstructor;
 
+import javax.annotation.Nullable;
+
 /** Represents information about a finished savepoint. */
 @Experimental
 @Data
@@ -36,42 +38,43 @@ public class Savepoint {
     /** Savepoint trigger mechanism. */
     private SavepointTriggerType triggerType = SavepointTriggerType.UNKNOWN;
 
+    /** Savepoint format. */
     private SavepointFormatType formatType = SavepointFormatType.UNKNOWN;
 
     /**
      * Nonce value used when the savepoint was triggered manually {@link
-     * SavepointTriggerType#MANUAL}, defaults to 0.
+     * SavepointTriggerType#MANUAL}, null for other types of savepoints.
      */
-    private Long triggerNonce = 0L;
+    private Long triggerNonce;
 
     public Savepoint(
             long timeStamp,
             String location,
-            SavepointTriggerType triggerType,
-            SavepointFormatType formatType,
-            Long triggerNonce) {
+            @Nullable SavepointTriggerType triggerType,
+            @Nullable SavepointFormatType formatType,
+            @Nullable Long triggerNonce) {
         this.timeStamp = timeStamp;
         this.location = location;
-        this.triggerType = triggerType;
-        this.formatType = formatType;
-        setTriggerNonce(triggerNonce);
+        if (triggerType != null) {
+            this.triggerType = triggerType;
+        }
+        if (formatType != null) {
+            this.formatType = formatType;
+        }
+        this.triggerNonce = triggerNonce;
     }
 
     public static Savepoint of(String location, SavepointTriggerType triggerType) {
         return new Savepoint(
-                System.currentTimeMillis(), location, triggerType, SavepointFormatType.UNKNOWN, 0L);
+                System.currentTimeMillis(),
+                location,
+                triggerType,
+                SavepointFormatType.UNKNOWN,
+                null);
     }
 
     public static Savepoint of(
             String location, SavepointTriggerType triggerType, SavepointFormatType formatType) {
-        return new Savepoint(System.currentTimeMillis(), location, triggerType, formatType, 0L);
-    }
-
-    public void setTriggerNonce(Long triggerNonce) {
-        if (triggerNonce == null) {
-            this.triggerNonce = 0L;
-        } else {
-            this.triggerNonce = triggerNonce;
-        }
+        return new Savepoint(System.currentTimeMillis(), location, triggerType, formatType, null);
     }
 }
diff --git a/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/status/SavepointInfo.java b/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/status/SavepointInfo.java
index ce7562dc..a270e83f 100644
--- a/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/status/SavepointInfo.java
+++ b/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/status/SavepointInfo.java
@@ -38,15 +38,16 @@ public class SavepointInfo {
     private Savepoint lastSavepoint;
 
     /** Trigger id of a pending savepoint operation. */
-    private String triggerId = "";
+    private String triggerId;
 
     /** Trigger timestamp of a pending savepoint operation. */
-    private long triggerTimestamp = 0L;
+    private Long triggerTimestamp;
 
     /** Savepoint trigger mechanism. */
-    private SavepointTriggerType triggerType = SavepointTriggerType.UNKNOWN;
+    private SavepointTriggerType triggerType;
 
-    private SavepointFormatType formatType = SavepointFormatType.UNKNOWN;
+    /** Savepoint format. */
+    private SavepointFormatType formatType;
 
     /** List of recent savepoints. */
     private List<Savepoint> savepointHistory = new ArrayList<>();
@@ -63,8 +64,10 @@ public class SavepointInfo {
     }
 
     public void resetTrigger() {
-        this.triggerId = "";
-        this.triggerTimestamp = 0L;
+        this.triggerId = null;
+        this.triggerTimestamp = null;
+        this.triggerType = null;
+        this.formatType = null;
     }
 
     /**
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/deployment/AbstractFlinkDeploymentObserver.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/deployment/AbstractFlinkDeploymentObserver.java
index 6755281d..5cd8f165 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/deployment/AbstractFlinkDeploymentObserver.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/deployment/AbstractFlinkDeploymentObserver.java
@@ -207,7 +207,7 @@ public abstract class AbstractFlinkDeploymentObserver
         if (status.getJobManagerDeploymentStatus() != JobManagerDeploymentStatus.ERROR
                 && !JobStatus.FAILED.name().equals(dep.getStatus().getJobStatus().getState())
                 && reconciliationStatus.isLastReconciledSpecStable()) {
-            status.setError("");
+            status.setError(null);
         }
     }
 
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/ReconciliationUtils.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/ReconciliationUtils.java
index 9f1da2d3..a7d1b05a 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/ReconciliationUtils.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/ReconciliationUtils.java
@@ -103,7 +103,7 @@ public class ReconciliationUtils {
         var reconciliationStatus = status.getReconciliationStatus();
 
         // Clear errors
-        status.setError("");
+        status.setError(null);
         reconciliationStatus.setReconciliationTimestamp(System.currentTimeMillis());
         reconciliationStatus.setState(
                 upgrading ? ReconciliationState.UPGRADING : ReconciliationState.DEPLOYED);
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractJobReconciler.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractJobReconciler.java
index 8c6f7c65..370d96dd 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractJobReconciler.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractJobReconciler.java
@@ -247,7 +247,7 @@ public abstract class AbstractJobReconciler<
                 && observeConfig.getBoolean(OPERATOR_JOB_RESTART_FAILED)) {
             LOG.info("Stopping failed Flink job...");
             cleanupAfterFailedJob(resource, context, observeConfig);
-            resource.getStatus().setError("");
+            resource.getStatus().setError(null);
             resubmitJob(resource, context, observeConfig, false);
             return true;
         } else {
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/SavepointUtils.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/SavepointUtils.java
index b353a314..29966692 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/SavepointUtils.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/SavepointUtils.java
@@ -54,13 +54,6 @@ public class SavepointUtils {
         var jobStatus = status.getJobStatus();
         var savepointInfo = jobStatus.getSavepointInfo();
 
-        // For non-manual savepointing, we always report SUCCEEDED if no longer pending
-        if (savepointInfo.getTriggerType() != SavepointTriggerType.MANUAL) {
-            return savepointInProgress(jobStatus)
-                    ? SavepointStatus.PENDING
-                    : SavepointStatus.SUCCEEDED;
-        }
-
         var targetSavepointTriggerNonce = resource.getSpec().getJob().getSavepointTriggerNonce();
         var reconcileSavepointTriggerNonce =
                 status.getReconciliationStatus()
@@ -68,19 +61,36 @@ public class SavepointUtils {
                         .getJob()
                         .getSavepointTriggerNonce();
 
+        if (savepointInfo.getTriggerId() != null) {
+            return SavepointStatus.PENDING;
+        }
+
         // if savepointTriggerNonce is cleared, savepoint is not triggered.
-        // For manual savepointing, we report pending status
+        // For manual savepoints, we report pending status
         // during retries while the triggerId gets reset between retries.
         if (targetSavepointTriggerNonce != null
                 && !Objects.equals(targetSavepointTriggerNonce, reconcileSavepointTriggerNonce)) {
             return SavepointStatus.PENDING;
         }
-        if (savepointInfo.getLastSavepoint() != null
-                && Objects.equals(
-                        reconcileSavepointTriggerNonce,
-                        savepointInfo.getLastSavepoint().getTriggerNonce())) {
+
+        var lastSavepoint = savepointInfo.getLastSavepoint();
+        if (lastSavepoint != null) {
+            // Last savepoint's triggerNonce matches the reconciled triggerNonce
+            if (Objects.equals(
+                    reconcileSavepointTriggerNonce,
+                    savepointInfo.getLastSavepoint().getTriggerNonce())) {
+                return SavepointStatus.SUCCEEDED;
+            }
+
+            // Last savepoint was not manual
+            if (lastSavepoint.getTriggerType() != SavepointTriggerType.MANUAL) {
+                return SavepointStatus.SUCCEEDED;
+            }
+        } else {
+            // Currently, we return SUCCEEDED if no savepoints were ever taken
             return SavepointStatus.SUCCEEDED;
         }
+
         return SavepointStatus.ABANDONED;
     }
 
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java
index 2bb1f02c..9669242e 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java
@@ -131,7 +131,7 @@ public class FlinkDeploymentControllerTest {
         // Validate reconciliation status
         ReconciliationStatus<FlinkDeploymentSpec> reconciliationStatus =
                 appCluster.getStatus().getReconciliationStatus();
-        assertEquals("", appCluster.getStatus().getError());
+        assertNull(appCluster.getStatus().getError());
         assertEquals(appCluster.getSpec(), reconciliationStatus.deserializeLastReconciledSpec());
         assertNull(appCluster.getStatus().getReconciliationStatus().getLastStableSpec());
 
@@ -971,7 +971,7 @@ public class FlinkDeploymentControllerTest {
         assertEquals(
                 JobManagerDeploymentStatus.READY,
                 appCluster.getStatus().getJobManagerDeploymentStatus());
-        assertEquals("", appCluster.getStatus().getError());
+        assertNull(appCluster.getStatus().getError());
 
         assertEquals(
                 appCluster.getStatus().getReconciliationStatus().getLastReconciledSpec(),
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/RollbackTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/RollbackTest.java
index 0efed72e..f2fc4d05 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/RollbackTest.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/RollbackTest.java
@@ -343,7 +343,7 @@ public class RollbackTest {
         assertEquals(
                 ReconciliationState.DEPLOYED,
                 deployment.getStatus().getReconciliationStatus().getState());
-        assertEquals("", deployment.getStatus().getError());
+        assertNull(deployment.getStatus().getError());
 
         deployment.getSpec().setRestartNonce(456L);
         triggerRollback.run();
@@ -375,7 +375,7 @@ public class RollbackTest {
             assertEquals(
                     ReconciliationState.DEPLOYED,
                     deployment.getStatus().getReconciliationStatus().getState());
-            assertEquals("", deployment.getStatus().getError());
+            assertNull(deployment.getStatus().getError());
 
             deployment.getSpec().getJob().setState(JobState.RUNNING);
             testController.reconcile(deployment, context);
@@ -388,7 +388,7 @@ public class RollbackTest {
             assertEquals(
                     ReconciliationState.DEPLOYED,
                     deployment.getStatus().getReconciliationStatus().getState());
-            assertEquals("", deployment.getStatus().getError());
+            assertNull(deployment.getStatus().getError());
 
             // Verify suspending a rolled back job
             triggerRollback.run();
@@ -406,7 +406,7 @@ public class RollbackTest {
             assertEquals(
                     ReconciliationState.DEPLOYED,
                     deployment.getStatus().getReconciliationStatus().getState());
-            assertEquals("", deployment.getStatus().getError());
+            assertNull(deployment.getStatus().getError());
         }
     }
 }
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/metrics/lifecycle/ResourceLifecycleMetricsTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/metrics/lifecycle/ResourceLifecycleMetricsTest.java
index 90e72524..61516a89 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/metrics/lifecycle/ResourceLifecycleMetricsTest.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/metrics/lifecycle/ResourceLifecycleMetricsTest.java
@@ -81,7 +81,7 @@ public class ResourceLifecycleMetricsTest {
         application.getStatus().getJobStatus().setState(JobStatus.FAILED.name());
         assertEquals(FAILED, application.getStatus().getLifecycleState());
 
-        application.getStatus().setError("");
+        application.getStatus().setError(null);
 
         application
                 .getStatus()
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/SavepointObserverTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/SavepointObserverTest.java
index dd4d3d07..85c51d58 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/SavepointObserverTest.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/SavepointObserverTest.java
@@ -43,6 +43,7 @@ import java.util.Collections;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNull;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
 /** Tests for {@link SavepointObserver}. */
@@ -172,6 +173,6 @@ public class SavepointObserverTest {
         assertEquals(savepointInfo.getLastSavepoint(), savepointInfo.getSavepointHistory().get(0));
         assertEquals(
                 SavepointTriggerType.PERIODIC, savepointInfo.getLastSavepoint().getTriggerType());
-        assertEquals(0L, savepointInfo.getLastSavepoint().getTriggerNonce());
+        assertNull(savepointInfo.getLastSavepoint().getTriggerNonce());
     }
 }
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconcilerTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconcilerTest.java
index b2b2bb11..39b03365 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconcilerTest.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconcilerTest.java
@@ -287,23 +287,13 @@ public class ApplicationReconcilerTest {
                 SavepointTriggerType.MANUAL,
                 spDeployment.getStatus().getJobStatus().getSavepointInfo().getTriggerType());
 
-        spDeployment.getStatus().getJobStatus().getSavepointInfo().resetTrigger();
         ReconciliationUtils.updateLastReconciledSavepointTriggerNonce(
                 spDeployment.getStatus().getJobStatus().getSavepointInfo(), spDeployment);
+        spDeployment.getStatus().getJobStatus().getSavepointInfo().resetTrigger();
 
         // don't trigger when nonce is the same
         reconciler.reconcile(spDeployment, context);
         assertFalse(SavepointUtils.savepointInProgress(spDeployment.getStatus().getJobStatus()));
-
-        spDeployment.getStatus().getJobStatus().getSavepointInfo().resetTrigger();
-        ReconciliationUtils.updateLastReconciledSavepointTriggerNonce(
-                spDeployment.getStatus().getJobStatus().getSavepointInfo(), spDeployment);
-        // If manual savepointing is done but the last savepont is not updated, it is considered
-        // abandoned.
-        assertEquals(
-                SavepointStatus.ABANDONED, SavepointUtils.getLastSavepointStatus(spDeployment));
-
-        updateLastSavepoint(spDeployment);
         assertEquals(
                 SavepointStatus.SUCCEEDED, SavepointUtils.getLastSavepointStatus(spDeployment));
 
@@ -329,7 +319,6 @@ public class ApplicationReconcilerTest {
         assertEquals(
                 SavepointTriggerType.MANUAL,
                 spDeployment.getStatus().getJobStatus().getSavepointInfo().getTriggerType());
-        spDeployment.getStatus().getJobStatus().getSavepointInfo().resetTrigger();
         ReconciliationUtils.updateLastReconciledSavepointTriggerNonce(
                 spDeployment.getStatus().getJobStatus().getSavepointInfo(), spDeployment);
         updateLastSavepoint(spDeployment);