Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2022/06/09 06:19:40 UTC

[GitHub] [flink-kubernetes-operator] gyfora commented on a diff in pull request #264: [FLINK-27913] Remove savepointHistoryMaxCount and savepointHistoryMaxAge from FlinkOperatorConfiguration

gyfora commented on code in PR #264:
URL: https://github.com/apache/flink-kubernetes-operator/pull/264#discussion_r893110295


##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/KubernetesOperatorConfigOptions.java:
##########
@@ -145,12 +145,25 @@ public class KubernetesOperatorConfigOptions {
                     .withDescription(
                             "Whether to enable recovery of missing/deleted jobmanager deployments.");
 
+    public static final ConfigOption<Integer> OPERATOR_SAVEPOINT_HISTORY_COUNT_THRESHOLD =
+            ConfigOptions.key("kubernetes.operator.savepoint.history.count.threshold")
+                    .intType()
+                    .noDefaultValue()
+                    .withDescription("Number threshold of savepoint history entries to retain.");
+
     public static final ConfigOption<Integer> OPERATOR_SAVEPOINT_HISTORY_MAX_COUNT =
             ConfigOptions.key("kubernetes.operator.savepoint.history.max.count")
                     .intType()
                     .defaultValue(10)
                     .withDescription("Maximum number of savepoint history entries to retain.");
 
+    public static final ConfigOption<Duration> OPERATOR_SAVEPOINT_HISTORY_AGE_THRESHOLD =
+            ConfigOptions.key("kubernetes.operator.savepoint.history.age.threshold")
+                    .durationType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "Age threshold for savepoint history entries to retain. Due to lazy clean-up, the most recent savepoint may live longer than the max age.");
+

Review Comment:
   I think the new config keys should be:
   `OPERATOR_SAVEPOINT_HISTORY_MAX_AGE.key() + ".threshold"`
   `OPERATOR_SAVEPOINT_HISTORY_MAX_COUNT.key() + ".threshold"`
   
   That would make this straightforward for users and admins.



##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/SavepointObserver.java:
##########
@@ -148,13 +149,38 @@ void cleanupSavepointHistory(
 
         // maintain history
         List<Savepoint> savepointHistory = currentSavepointInfo.getSavepointHistory();
-        int maxCount = configManager.getOperatorConfiguration().getSavepointHistoryMaxCount();
+        Integer countThreshold =
+                configManager.getOperatorConfiguration().getSavepointHistoryCountThreshold();
+        int maxCount =
+                deployedConfig.get(
+                        KubernetesOperatorConfigOptions.OPERATOR_SAVEPOINT_HISTORY_MAX_COUNT);
+        if (countThreshold != null && maxCount > countThreshold) {
+            LOG.warn(
+                    "Maximum number of savepoint history entries to retain changes to {}. The value of '{}' exceeds the value of '{}'",
+                    countThreshold,
+                    KubernetesOperatorConfigOptions.OPERATOR_SAVEPOINT_HISTORY_MAX_COUNT.key(),
+                    KubernetesOperatorConfigOptions.OPERATOR_SAVEPOINT_HISTORY_COUNT_THRESHOLD
+                            .key());
+            maxCount = countThreshold;
+        }

Review Comment:
   Can we refactor this pattern into a utility method? 
   ```
   public static <T extends Comparable<T>> T getValueWithThreshold(ConfigOption<T> option, ConfigOption<T> threshold, Configuration config) {
   ...
   }
   ```
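
A minimal sketch of the capping logic such a utility could share between the count and age checks. It is only an illustration: the class name `ThresholdUtil` and the method `capAtThreshold` are hypothetical, and plain values stand in for the `ConfigOption`/`Configuration` lookups so the block compiles without Flink on the classpath.

```java
import java.util.Objects;

/** Hypothetical helper; the class and method names are illustrative, not taken from the PR. */
final class ThresholdUtil {

    private ThresholdUtil() {}

    /**
     * Returns {@code value}, capped at {@code threshold} when a threshold is set.
     * A null threshold is treated as "no threshold configured" and leaves the value unchanged.
     */
    static <T extends Comparable<T>> T capAtThreshold(T value, T threshold) {
        Objects.requireNonNull(value, "value");
        if (threshold != null && value.compareTo(threshold) > 0) {
            // The configured value exceeds the operator-level threshold, so the threshold wins.
            // In the operator, the warning from the diff above would presumably be logged here.
            return threshold;
        }
        return value;
    }
}
```

Because the bound is `Comparable<T>`, the same method covers both the `Integer` count and the `Duration` age. In the operator, the two arguments would likely come from `config.get(option)` and `config.getOptional(threshold).orElse(null)`.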



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org