You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by gy...@apache.org on 2022/06/10 06:46:33 UTC
[flink-kubernetes-operator] branch main updated: [FLINK-27913] Remove savepointHistoryMaxCount and savepointHistoryMaxAge from FlinkOperatorConfiguration
This is an automated email from the ASF dual-hosted git repository.
gyfora pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git
The following commit(s) were added to refs/heads/main by this push:
new 9d1be9c [FLINK-27913] Remove savepointHistoryMaxCount and savepointHistoryMaxAge from FlinkOperatorConfiguration
9d1be9c is described below
commit 9d1be9cadd2b292b2a9ea37852f1b3cda65d1422
Author: Nicholas Jiang <pr...@163.com>
AuthorDate: Fri Jun 10 14:46:29 2022 +0800
[FLINK-27913] Remove savepointHistoryMaxCount and savepointHistoryMaxAge from FlinkOperatorConfiguration
---
.../kubernetes_operator_config_configuration.html | 12 +++++
.../config/FlinkOperatorConfiguration.java | 18 ++++----
.../config/KubernetesOperatorConfigOptions.java | 14 ++++++
.../operator/observer/SavepointObserver.java | 16 ++++++-
.../operator/utils/ConfigOptionUtils.java | 52 +++++++++++++++++++++
.../operator/observer/SavepointObserverTest.java | 34 ++++++++++++++
.../operator/utils/ConfigOptionUtilsTest.java | 53 ++++++++++++++++++++++
7 files changed, 189 insertions(+), 10 deletions(-)
diff --git a/docs/layouts/shortcodes/generated/kubernetes_operator_config_configuration.html b/docs/layouts/shortcodes/generated/kubernetes_operator_config_configuration.html
index 08c72d0..71cf1ac 100644
--- a/docs/layouts/shortcodes/generated/kubernetes_operator_config_configuration.html
+++ b/docs/layouts/shortcodes/generated/kubernetes_operator_config_configuration.html
@@ -110,12 +110,24 @@
<td>Duration</td>
<td>Maximum age for savepoint history entries to retain. Due to lazy clean-up, the most recent savepoint may live longer than the max age.</td>
</tr>
+ <tr>
+ <td><h5>kubernetes.operator.savepoint.history.max.age.threshold</h5></td>
+ <td style="word-wrap: break-word;">(none)</td>
+ <td>Duration</td>
+ <td>Maximum age threshold for savepoint history entries to retain.</td>
+ </tr>
<tr>
<td><h5>kubernetes.operator.savepoint.history.max.count</h5></td>
<td style="word-wrap: break-word;">10</td>
<td>Integer</td>
<td>Maximum number of savepoint history entries to retain.</td>
</tr>
+ <tr>
+ <td><h5>kubernetes.operator.savepoint.history.max.count.threshold</h5></td>
+ <td style="word-wrap: break-word;">(none)</td>
+ <td>Integer</td>
+ <td>Maximum number threshold of savepoint history entries to retain.</td>
+ </tr>
<tr>
<td><h5>kubernetes.operator.savepoint.trigger.grace-period</h5></td>
<td style="word-wrap: break-word;">1 min</td>
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkOperatorConfiguration.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkOperatorConfiguration.java
index 36b9383..29e1264 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkOperatorConfiguration.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkOperatorConfiguration.java
@@ -40,8 +40,8 @@ public class FlinkOperatorConfiguration {
Duration flinkCancelJobTimeout;
Duration flinkShutdownClusterTimeout;
String artifactsBaseDir;
- int savepointHistoryMaxCount;
- Duration savepointHistoryMaxAge;
+ Integer savepointHistoryCountThreshold;
+ Duration savepointHistoryAgeThreshold;
public static FlinkOperatorConfiguration fromConfiguration(
Configuration operatorConfig, Set<String> watchedNamespaces) {
@@ -79,12 +79,14 @@ public class FlinkOperatorConfiguration {
operatorConfig.get(
KubernetesOperatorConfigOptions.OPERATOR_USER_ARTIFACTS_BASE_DIR);
- int savepointHistoryMaxCount =
+ Integer savepointHistoryCountThreshold =
operatorConfig.get(
- KubernetesOperatorConfigOptions.OPERATOR_SAVEPOINT_HISTORY_MAX_COUNT);
- Duration savepointHistoryMaxAge =
+ KubernetesOperatorConfigOptions
+ .OPERATOR_SAVEPOINT_HISTORY_MAX_COUNT_THRESHOLD);
+ Duration savepointHistoryAgeThreshold =
operatorConfig.get(
- KubernetesOperatorConfigOptions.OPERATOR_SAVEPOINT_HISTORY_MAX_AGE);
+ KubernetesOperatorConfigOptions
+ .OPERATOR_SAVEPOINT_HISTORY_MAX_AGE_THRESHOLD);
String flinkServiceHostOverride = null;
if (EnvUtils.get("KUBERNETES_SERVICE_HOST") == null) {
@@ -103,7 +105,7 @@ public class FlinkOperatorConfiguration {
flinkCancelJobTimeout,
flinkShutdownClusterTimeout,
artifactsBaseDir,
- savepointHistoryMaxCount,
- savepointHistoryMaxAge);
+ savepointHistoryCountThreshold,
+ savepointHistoryAgeThreshold);
}
}
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/KubernetesOperatorConfigOptions.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/KubernetesOperatorConfigOptions.java
index b8e03b2..f4eda21 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/KubernetesOperatorConfigOptions.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/KubernetesOperatorConfigOptions.java
@@ -151,6 +151,13 @@ public class KubernetesOperatorConfigOptions {
.defaultValue(10)
.withDescription("Maximum number of savepoint history entries to retain.");
+ public static final ConfigOption<Integer> OPERATOR_SAVEPOINT_HISTORY_MAX_COUNT_THRESHOLD =
+ ConfigOptions.key(OPERATOR_SAVEPOINT_HISTORY_MAX_COUNT.key() + ".threshold")
+ .intType()
+ .noDefaultValue()
+ .withDescription(
+ "Maximum number threshold of savepoint history entries to retain.");
+
public static final ConfigOption<Duration> OPERATOR_SAVEPOINT_HISTORY_MAX_AGE =
ConfigOptions.key("kubernetes.operator.savepoint.history.max.age")
.durationType()
@@ -158,6 +165,13 @@ public class KubernetesOperatorConfigOptions {
.withDescription(
"Maximum age for savepoint history entries to retain. Due to lazy clean-up, the most recent savepoint may live longer than the max age.");
+ public static final ConfigOption<Duration> OPERATOR_SAVEPOINT_HISTORY_MAX_AGE_THRESHOLD =
+ ConfigOptions.key(OPERATOR_SAVEPOINT_HISTORY_MAX_AGE.key() + ".threshold")
+ .durationType()
+ .noDefaultValue()
+ .withDescription(
+ "Maximum age threshold for savepoint history entries to retain.");
+
public static final ConfigOption<Map<String, String>> JAR_ARTIFACT_HTTP_HEADER =
ConfigOptions.key("kubernetes.operator.user.artifacts.http.header")
.mapType()
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/SavepointObserver.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/SavepointObserver.java
index dc129c7..a2a624a 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/SavepointObserver.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/SavepointObserver.java
@@ -21,6 +21,7 @@ import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.JobID;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.kubernetes.operator.config.FlinkConfigManager;
+import org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions;
import org.apache.flink.kubernetes.operator.crd.AbstractFlinkResource;
import org.apache.flink.kubernetes.operator.crd.status.CommonStatus;
import org.apache.flink.kubernetes.operator.crd.status.Savepoint;
@@ -28,6 +29,7 @@ import org.apache.flink.kubernetes.operator.crd.status.SavepointInfo;
import org.apache.flink.kubernetes.operator.exception.ReconciliationException;
import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
import org.apache.flink.kubernetes.operator.service.FlinkService;
+import org.apache.flink.kubernetes.operator.utils.ConfigOptionUtils;
import org.apache.flink.kubernetes.operator.utils.EventUtils;
import org.apache.flink.kubernetes.operator.utils.SavepointUtils;
import org.apache.flink.kubernetes.operator.utils.StatusHelper;
@@ -148,13 +150,23 @@ public class SavepointObserver<STATUS extends CommonStatus<?>> {
// maintain history
List<Savepoint> savepointHistory = currentSavepointInfo.getSavepointHistory();
- int maxCount = configManager.getOperatorConfiguration().getSavepointHistoryMaxCount();
+ int maxCount =
+ ConfigOptionUtils.getValueWithThreshold(
+ deployedConfig,
+ KubernetesOperatorConfigOptions.OPERATOR_SAVEPOINT_HISTORY_MAX_COUNT,
+ configManager
+ .getOperatorConfiguration()
+ .getSavepointHistoryCountThreshold());
while (savepointHistory.size() > maxCount) {
// remove oldest entries
disposeSavepointQuietly(savepointHistory.remove(0), deployedConfig);
}
- Duration maxAge = configManager.getOperatorConfiguration().getSavepointHistoryMaxAge();
+ Duration maxAge =
+ ConfigOptionUtils.getValueWithThreshold(
+ deployedConfig,
+ KubernetesOperatorConfigOptions.OPERATOR_SAVEPOINT_HISTORY_MAX_AGE,
+ configManager.getOperatorConfiguration().getSavepointHistoryAgeThreshold());
long maxTms = System.currentTimeMillis() - maxAge.toMillis();
Iterator<Savepoint> it = savepointHistory.iterator();
while (it.hasNext()) {
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/ConfigOptionUtils.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/ConfigOptionUtils.java
new file mode 100644
index 0000000..4d9321b
--- /dev/null
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/ConfigOptionUtils.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.utils;
+
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.Configuration;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** {@link ConfigOption} utilities. */
+public class ConfigOptionUtils {
+
+ private static final Logger LOG = LoggerFactory.getLogger(ConfigOptionUtils.class);
+
+ /**
+ * Gets the value of {@link ConfigOption} with threshold.
+ *
+ * @param config The effective config
+ * @param configOption The config option.
+ * @param configThreshold The config threshold.
+ * @return The value of {@link ConfigOption} with threshold.
+ */
+ public static <T extends Comparable<T>> T getValueWithThreshold(
+ Configuration config, ConfigOption<T> configOption, T configThreshold) {
+ T configValue = config.get(configOption);
+ if (configThreshold != null && configValue.compareTo(configThreshold) > 0) {
+ LOG.warn(
+ "Uses the config threshold [{}] instead of the config value [{}] of '{}'.",
+ configThreshold,
+ configValue,
+ configOption.key());
+ return configThreshold;
+ }
+ return configValue;
+ }
+}
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/SavepointObserverTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/SavepointObserverTest.java
index ef335b5..44aa3bf 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/SavepointObserverTest.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/SavepointObserverTest.java
@@ -88,6 +88,40 @@ public class SavepointObserverTest {
Collections.singletonList(sp1.getLocation()), flinkService.getDisposedSavepoints());
}
+ @Test
+ public void testAgeBasedDisposeWithAgeThreshold() {
+ Configuration conf = new Configuration();
+ conf.set(
+ KubernetesOperatorConfigOptions.OPERATOR_SAVEPOINT_HISTORY_MAX_AGE,
+ Duration.ofMillis(System.currentTimeMillis() * 2));
+ conf.set(
+ KubernetesOperatorConfigOptions.OPERATOR_SAVEPOINT_HISTORY_MAX_AGE_THRESHOLD,
+ Duration.ofMillis(5));
+ configManager.updateDefaultConfig(conf);
+
+ SavepointObserver observer =
+ new SavepointObserver(flinkService, configManager, new TestingStatusHelper<>());
+ SavepointInfo spInfo = new SavepointInfo();
+
+ Savepoint sp1 = new Savepoint(1, "sp1", SavepointTriggerType.MANUAL);
+ spInfo.updateLastSavepoint(sp1);
+ observer.cleanupSavepointHistory(spInfo, sp1, conf);
+ Assertions.assertIterableEquals(
+ Collections.singletonList(sp1), spInfo.getSavepointHistory());
+ Assertions.assertIterableEquals(
+ Collections.emptyList(), flinkService.getDisposedSavepoints());
+
+ Savepoint sp2 = new Savepoint(2, "sp2", SavepointTriggerType.MANUAL);
+ spInfo.updateLastSavepoint(sp2);
+ observer.cleanupSavepointHistory(spInfo, sp2, conf);
+ Assertions.assertIterableEquals(
+ Collections.singletonList(sp2), spInfo.getSavepointHistory());
+ Assertions.assertIterableEquals(
+ Collections.singletonList(sp1.getLocation()), flinkService.getDisposedSavepoints());
+
+ configManager.updateDefaultConfig(new Configuration());
+ }
+
@Test
public void testPeriodicSavepoint() throws Exception {
var conf = new Configuration();
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/ConfigOptionUtilsTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/ConfigOptionUtilsTest.java
new file mode 100644
index 0000000..8461111
--- /dev/null
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/ConfigOptionUtilsTest.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.utils;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions;
+
+import org.junit.jupiter.api.Test;
+
+import java.time.Duration;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+/** Test class for {@link ConfigOptionUtils}. */
+public class ConfigOptionUtilsTest {
+
+ @Test
+ public void testValueWithThreshold() {
+ Configuration config = new Configuration();
+ config.set(
+ KubernetesOperatorConfigOptions.OPERATOR_SAVEPOINT_HISTORY_MAX_AGE,
+ Duration.ofMillis(5));
+ assertEquals(
+ ConfigOptionUtils.getValueWithThreshold(
+ config,
+ KubernetesOperatorConfigOptions.OPERATOR_SAVEPOINT_HISTORY_MAX_AGE,
+ Duration.ofMillis(10)),
+ Duration.ofMillis(5));
+
+ assertEquals(
+ ConfigOptionUtils.getValueWithThreshold(
+ config,
+ KubernetesOperatorConfigOptions.OPERATOR_SAVEPOINT_HISTORY_MAX_AGE,
+ Duration.ofMillis(4)),
+ Duration.ofMillis(4));
+ }
+}