You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by gy...@apache.org on 2022/06/10 06:46:33 UTC

[flink-kubernetes-operator] branch main updated: [FLINK-27913] Remove savepointHistoryMaxCount and savepointHistoryMaxAge from FlinkOperatorConfiguration

This is an automated email from the ASF dual-hosted git repository.

gyfora pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git


The following commit(s) were added to refs/heads/main by this push:
     new 9d1be9c  [FLINK-27913] Remove savepointHistoryMaxCount and savepointHistoryMaxAge from FlinkOperatorConfiguration
9d1be9c is described below

commit 9d1be9cadd2b292b2a9ea37852f1b3cda65d1422
Author: Nicholas Jiang <pr...@163.com>
AuthorDate: Fri Jun 10 14:46:29 2022 +0800

    [FLINK-27913] Remove savepointHistoryMaxCount and savepointHistoryMaxAge from FlinkOperatorConfiguration
---
 .../kubernetes_operator_config_configuration.html  | 12 +++++
 .../config/FlinkOperatorConfiguration.java         | 18 ++++----
 .../config/KubernetesOperatorConfigOptions.java    | 14 ++++++
 .../operator/observer/SavepointObserver.java       | 16 ++++++-
 .../operator/utils/ConfigOptionUtils.java          | 52 +++++++++++++++++++++
 .../operator/observer/SavepointObserverTest.java   | 34 ++++++++++++++
 .../operator/utils/ConfigOptionUtilsTest.java      | 53 ++++++++++++++++++++++
 7 files changed, 189 insertions(+), 10 deletions(-)

diff --git a/docs/layouts/shortcodes/generated/kubernetes_operator_config_configuration.html b/docs/layouts/shortcodes/generated/kubernetes_operator_config_configuration.html
index 08c72d0..71cf1ac 100644
--- a/docs/layouts/shortcodes/generated/kubernetes_operator_config_configuration.html
+++ b/docs/layouts/shortcodes/generated/kubernetes_operator_config_configuration.html
@@ -110,12 +110,24 @@
             <td>Duration</td>
             <td>Maximum age for savepoint history entries to retain. Due to lazy clean-up, the most recent savepoint may live longer than the max age.</td>
         </tr>
+        <tr>
+            <td><h5>kubernetes.operator.savepoint.history.max.age.threshold</h5></td>
+            <td style="word-wrap: break-word;">(none)</td>
+            <td>Duration</td>
+            <td>Maximum age threshold for savepoint history entries to retain.</td>
+        </tr>
         <tr>
             <td><h5>kubernetes.operator.savepoint.history.max.count</h5></td>
             <td style="word-wrap: break-word;">10</td>
             <td>Integer</td>
             <td>Maximum number of savepoint history entries to retain.</td>
         </tr>
+        <tr>
+            <td><h5>kubernetes.operator.savepoint.history.max.count.threshold</h5></td>
+            <td style="word-wrap: break-word;">(none)</td>
+            <td>Integer</td>
+            <td>Maximum number threshold of savepoint history entries to retain.</td>
+        </tr>
         <tr>
             <td><h5>kubernetes.operator.savepoint.trigger.grace-period</h5></td>
             <td style="word-wrap: break-word;">1 min</td>
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkOperatorConfiguration.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkOperatorConfiguration.java
index 36b9383..29e1264 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkOperatorConfiguration.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkOperatorConfiguration.java
@@ -40,8 +40,8 @@ public class FlinkOperatorConfiguration {
     Duration flinkCancelJobTimeout;
     Duration flinkShutdownClusterTimeout;
     String artifactsBaseDir;
-    int savepointHistoryMaxCount;
-    Duration savepointHistoryMaxAge;
+    Integer savepointHistoryCountThreshold;
+    Duration savepointHistoryAgeThreshold;
 
     public static FlinkOperatorConfiguration fromConfiguration(
             Configuration operatorConfig, Set<String> watchedNamespaces) {
@@ -79,12 +79,14 @@ public class FlinkOperatorConfiguration {
                 operatorConfig.get(
                         KubernetesOperatorConfigOptions.OPERATOR_USER_ARTIFACTS_BASE_DIR);
 
-        int savepointHistoryMaxCount =
+        Integer savepointHistoryCountThreshold =
                 operatorConfig.get(
-                        KubernetesOperatorConfigOptions.OPERATOR_SAVEPOINT_HISTORY_MAX_COUNT);
-        Duration savepointHistoryMaxAge =
+                        KubernetesOperatorConfigOptions
+                                .OPERATOR_SAVEPOINT_HISTORY_MAX_COUNT_THRESHOLD);
+        Duration savepointHistoryAgeThreshold =
                 operatorConfig.get(
-                        KubernetesOperatorConfigOptions.OPERATOR_SAVEPOINT_HISTORY_MAX_AGE);
+                        KubernetesOperatorConfigOptions
+                                .OPERATOR_SAVEPOINT_HISTORY_MAX_AGE_THRESHOLD);
 
         String flinkServiceHostOverride = null;
         if (EnvUtils.get("KUBERNETES_SERVICE_HOST") == null) {
@@ -103,7 +105,7 @@ public class FlinkOperatorConfiguration {
                 flinkCancelJobTimeout,
                 flinkShutdownClusterTimeout,
                 artifactsBaseDir,
-                savepointHistoryMaxCount,
-                savepointHistoryMaxAge);
+                savepointHistoryCountThreshold,
+                savepointHistoryAgeThreshold);
     }
 }
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/KubernetesOperatorConfigOptions.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/KubernetesOperatorConfigOptions.java
index b8e03b2..f4eda21 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/KubernetesOperatorConfigOptions.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/KubernetesOperatorConfigOptions.java
@@ -151,6 +151,13 @@ public class KubernetesOperatorConfigOptions {
                     .defaultValue(10)
                     .withDescription("Maximum number of savepoint history entries to retain.");
 
+    public static final ConfigOption<Integer> OPERATOR_SAVEPOINT_HISTORY_MAX_COUNT_THRESHOLD =
+            ConfigOptions.key(OPERATOR_SAVEPOINT_HISTORY_MAX_COUNT.key() + ".threshold")
+                    .intType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "Maximum number threshold of savepoint history entries to retain.");
+
     public static final ConfigOption<Duration> OPERATOR_SAVEPOINT_HISTORY_MAX_AGE =
             ConfigOptions.key("kubernetes.operator.savepoint.history.max.age")
                     .durationType()
@@ -158,6 +165,13 @@ public class KubernetesOperatorConfigOptions {
                     .withDescription(
                             "Maximum age for savepoint history entries to retain. Due to lazy clean-up, the most recent savepoint may live longer than the max age.");
 
+    public static final ConfigOption<Duration> OPERATOR_SAVEPOINT_HISTORY_MAX_AGE_THRESHOLD =
+            ConfigOptions.key(OPERATOR_SAVEPOINT_HISTORY_MAX_AGE.key() + ".threshold")
+                    .durationType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "Maximum age threshold for savepoint history entries to retain.");
+
     public static final ConfigOption<Map<String, String>> JAR_ARTIFACT_HTTP_HEADER =
             ConfigOptions.key("kubernetes.operator.user.artifacts.http.header")
                     .mapType()
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/SavepointObserver.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/SavepointObserver.java
index dc129c7..a2a624a 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/SavepointObserver.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/SavepointObserver.java
@@ -21,6 +21,7 @@ import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.kubernetes.operator.config.FlinkConfigManager;
+import org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions;
 import org.apache.flink.kubernetes.operator.crd.AbstractFlinkResource;
 import org.apache.flink.kubernetes.operator.crd.status.CommonStatus;
 import org.apache.flink.kubernetes.operator.crd.status.Savepoint;
@@ -28,6 +29,7 @@ import org.apache.flink.kubernetes.operator.crd.status.SavepointInfo;
 import org.apache.flink.kubernetes.operator.exception.ReconciliationException;
 import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
 import org.apache.flink.kubernetes.operator.service.FlinkService;
+import org.apache.flink.kubernetes.operator.utils.ConfigOptionUtils;
 import org.apache.flink.kubernetes.operator.utils.EventUtils;
 import org.apache.flink.kubernetes.operator.utils.SavepointUtils;
 import org.apache.flink.kubernetes.operator.utils.StatusHelper;
@@ -148,13 +150,23 @@ public class SavepointObserver<STATUS extends CommonStatus<?>> {
 
         // maintain history
         List<Savepoint> savepointHistory = currentSavepointInfo.getSavepointHistory();
-        int maxCount = configManager.getOperatorConfiguration().getSavepointHistoryMaxCount();
+        int maxCount =
+                ConfigOptionUtils.getValueWithThreshold(
+                        deployedConfig,
+                        KubernetesOperatorConfigOptions.OPERATOR_SAVEPOINT_HISTORY_MAX_COUNT,
+                        configManager
+                                .getOperatorConfiguration()
+                                .getSavepointHistoryCountThreshold());
         while (savepointHistory.size() > maxCount) {
             // remove oldest entries
             disposeSavepointQuietly(savepointHistory.remove(0), deployedConfig);
         }
 
-        Duration maxAge = configManager.getOperatorConfiguration().getSavepointHistoryMaxAge();
+        Duration maxAge =
+                ConfigOptionUtils.getValueWithThreshold(
+                        deployedConfig,
+                        KubernetesOperatorConfigOptions.OPERATOR_SAVEPOINT_HISTORY_MAX_AGE,
+                        configManager.getOperatorConfiguration().getSavepointHistoryAgeThreshold());
         long maxTms = System.currentTimeMillis() - maxAge.toMillis();
         Iterator<Savepoint> it = savepointHistory.iterator();
         while (it.hasNext()) {
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/ConfigOptionUtils.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/ConfigOptionUtils.java
new file mode 100644
index 0000000..4d9321b
--- /dev/null
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/ConfigOptionUtils.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.utils;
+
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.Configuration;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** Static helpers for reading {@link ConfigOption} values from a {@link Configuration}. */
+public class ConfigOptionUtils {
+
+    private static final Logger LOG = LoggerFactory.getLogger(ConfigOptionUtils.class);
+
+    /**
+     * Reads {@code configOption} from {@code config} and caps it at {@code configThreshold}.
+     *
+     * @param config The effective config the option value is read from.
+     * @param configOption The config option to look up.
+     * @param configThreshold Upper bound for the value; {@code null} disables the cap.
+     * @return The threshold (after a WARN log) if the value exceeds it, else the value itself.
+     */
+    public static <T extends Comparable<T>> T getValueWithThreshold(
+            Configuration config, ConfigOption<T> configOption, T configThreshold) {
+        T configValue = config.get(configOption); // NOTE(review): null if unset w/o default? confirm
+        if (configThreshold != null && configValue.compareTo(configThreshold) > 0) {
+            LOG.warn(
+                    "Uses the config threshold [{}] instead of the config value [{}] of '{}'.",
+                    configThreshold,
+                    configValue,
+                    configOption.key());
+            return configThreshold;
+        }
+        return configValue;
+    }
+}
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/SavepointObserverTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/SavepointObserverTest.java
index ef335b5..44aa3bf 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/SavepointObserverTest.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/SavepointObserverTest.java
@@ -88,6 +88,40 @@ public class SavepointObserverTest {
                 Collections.singletonList(sp1.getLocation()), flinkService.getDisposedSavepoints());
     }
 
+    @Test
+    public void testAgeBasedDisposeWithAgeThreshold() {
+        Configuration conf = new Configuration();
+        conf.set(
+                KubernetesOperatorConfigOptions.OPERATOR_SAVEPOINT_HISTORY_MAX_AGE,
+                Duration.ofMillis(System.currentTimeMillis() * 2)); // max age larger than "now"
+        conf.set(
+                KubernetesOperatorConfigOptions.OPERATOR_SAVEPOINT_HISTORY_MAX_AGE_THRESHOLD,
+                Duration.ofMillis(5)); // threshold far below the configured max age
+        configManager.updateDefaultConfig(conf);
+
+        SavepointObserver observer =
+                new SavepointObserver(flinkService, configManager, new TestingStatusHelper<>());
+        SavepointInfo spInfo = new SavepointInfo();
+
+        Savepoint sp1 = new Savepoint(1, "sp1", SavepointTriggerType.MANUAL);
+        spInfo.updateLastSavepoint(sp1);
+        observer.cleanupSavepointHistory(spInfo, sp1, conf);
+        Assertions.assertIterableEquals(
+                Collections.singletonList(sp1), spInfo.getSavepointHistory());
+        Assertions.assertIterableEquals(
+                Collections.emptyList(), flinkService.getDisposedSavepoints());
+
+        Savepoint sp2 = new Savepoint(2, "sp2", SavepointTriggerType.MANUAL);
+        spInfo.updateLastSavepoint(sp2);
+        observer.cleanupSavepointHistory(spInfo, sp2, conf);
+        Assertions.assertIterableEquals(
+                Collections.singletonList(sp2), spInfo.getSavepointHistory());
+        Assertions.assertIterableEquals(
+                Collections.singletonList(sp1.getLocation()), flinkService.getDisposedSavepoints());
+
+        configManager.updateDefaultConfig(new Configuration()); // presumably resets shared config
+    }
+
     @Test
     public void testPeriodicSavepoint() throws Exception {
         var conf = new Configuration();
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/ConfigOptionUtilsTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/ConfigOptionUtilsTest.java
new file mode 100644
index 0000000..8461111
--- /dev/null
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/ConfigOptionUtilsTest.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.utils;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions;
+
+import org.junit.jupiter.api.Test;
+
+import java.time.Duration;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+/** Tests that {@link ConfigOptionUtils#getValueWithThreshold} caps values at the threshold. */
+public class ConfigOptionUtilsTest {
+
+    @Test
+    public void testValueWithThreshold() {
+        Configuration config = new Configuration();
+        config.set(
+                KubernetesOperatorConfigOptions.OPERATOR_SAVEPOINT_HISTORY_MAX_AGE,
+                Duration.ofMillis(5));
+        assertEquals( // NOTE(review): JUnit order is (expected, actual); swapped here
+                ConfigOptionUtils.getValueWithThreshold(
+                        config,
+                        KubernetesOperatorConfigOptions.OPERATOR_SAVEPOINT_HISTORY_MAX_AGE,
+                        Duration.ofMillis(10)), // threshold above the value: value is kept
+                Duration.ofMillis(5));
+
+        assertEquals(
+                ConfigOptionUtils.getValueWithThreshold(
+                        config,
+                        KubernetesOperatorConfigOptions.OPERATOR_SAVEPOINT_HISTORY_MAX_AGE,
+                        Duration.ofMillis(4)), // threshold below the value: threshold is returned
+                Duration.ofMillis(4));
+    }
+}