Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2022/06/09 01:11:22 UTC

[GitHub] [flink-kubernetes-operator] SteNicholas opened a new pull request, #264: [FLINK-27913] Remove savepointHistoryMaxCount and savepointHistoryMaxAge from FlinkOperatorConfiguration

SteNicholas opened a new pull request, #264:
URL: https://github.com/apache/flink-kubernetes-operator/pull/264

   Currently, `savepointHistoryMaxCount` and `savepointHistoryMaxAge` are fields of the `FlinkOperatorConfiguration` class, which means that users cannot override them from their own deployment. Both fields should be removed, and the values should be read directly from the effective config. A maximum allowed value for each of these settings, configured at the `FlinkOperatorConfiguration` level, should be introduced so that users cannot grow the status size without bound and cause problems for the Kubernetes API.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink-kubernetes-operator] gyfora commented on a diff in pull request #264: [FLINK-27913] Remove savepointHistoryMaxCount and savepointHistoryMaxAge from FlinkOperatorConfiguration

Posted by GitBox <gi...@apache.org>.
gyfora commented on code in PR #264:
URL: https://github.com/apache/flink-kubernetes-operator/pull/264#discussion_r893206802


##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/ConfigOptionUtils.java:
##########
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.utils;
+
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.Configuration;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** {@link ConfigOption} utilities. */
+public class ConfigOptionUtils {
+
+    private static final Logger LOG = LoggerFactory.getLogger(ConfigOptionUtils.class);
+
+    /**
+     * Gets the value of {@link ConfigOption} with threshold.
+     *
+     * @param configOption The config option.
+     * @param thresholdOption The threshold option.
+     * @param effectiveConfig The effective config.
+     * @param operatorConfig The operator config.
+     * @return The value of {@link ConfigOption} with threshold.
+     */
+    public static <T extends Comparable<T>> T getValueWithThreshold(
+            ConfigOption<T> configOption,
+            ConfigOption<T> thresholdOption,
+            Configuration effectiveConfig,

Review Comment:
   You could simply call the parameter `conf` or `config`; this is just a utility method.





[GitHub] [flink-kubernetes-operator] gyfora commented on a diff in pull request #264: [FLINK-27913] Remove savepointHistoryMaxCount and savepointHistoryMaxAge from FlinkOperatorConfiguration

Posted by GitBox <gi...@apache.org>.
gyfora commented on code in PR #264:
URL: https://github.com/apache/flink-kubernetes-operator/pull/264#discussion_r893195704


##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkOperatorConfiguration.java:
##########
@@ -40,8 +40,7 @@ public class FlinkOperatorConfiguration {
     Duration flinkCancelJobTimeout;
     Duration flinkShutdownClusterTimeout;
     String artifactsBaseDir;
-    int savepointHistoryMaxCount;
-    Duration savepointHistoryMaxAge;
+    Configuration operatorConfig;

Review Comment:
   I would prefer to keep the concrete threshold values as fields here, to keep this consistent.



##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/ConfigOptionUtils.java:
##########
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.utils;
+
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.Configuration;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** {@link ConfigOption} utilities. */
+public class ConfigOptionUtils {
+
+    private static final Logger LOG = LoggerFactory.getLogger(ConfigOptionUtils.class);
+
+    /**
+     * Gets the value of {@link ConfigOption} with threshold.
+     *
+     * @param configOption The config option.
+     * @param thresholdOption The threshold option.
+     * @param effectiveConfig The effective config.
+     * @param operatorConfig The operator config.
+     * @return The value of {@link ConfigOption} with threshold.
+     */
+    public static <T extends Comparable<T>> T getValueWithThreshold(
+            ConfigOption<T> configOption,
+            ConfigOption<T> thresholdOption,
+            Configuration effectiveConfig,
+            Configuration operatorConfig) {

Review Comment:
   Instead of passing `thresholdOption` and `operatorConfig`, we could simply pass the threshold value that we read from the operator config, which would make this simpler.
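
A minimal sketch of the simplified shape suggested in this comment, assuming the caller has already read both values. The class and method names (`ThresholdSketch`, `capAtThreshold`) are hypothetical and are not taken from the PR; the sketch avoids Flink types so that it runs on its own.

```java
import java.time.Duration;

/** Hypothetical illustration only; not the code that was merged in the PR. */
final class ThresholdSketch {

    private ThresholdSketch() {}

    /**
     * Returns the configured value, capped at the threshold when one is set.
     *
     * @param value the value read from the effective (per-deployment) config
     * @param threshold the operator-level cap, or null when no cap is configured
     */
    static <T extends Comparable<T>> T capAtThreshold(T value, T threshold) {
        if (threshold != null && value.compareTo(threshold) > 0) {
            return threshold;
        }
        return value;
    }

    public static void main(String[] args) {
        // A count above the cap is reduced to the cap.
        System.out.println(capAtThreshold(50, 20));
        // A count below the cap is returned unchanged.
        System.out.println(capAtThreshold(10, 20));
        // A null threshold means that no cap applies.
        System.out.println(capAtThreshold(Duration.ofHours(48), null));
    }
}
```

Taking the threshold as a plain value keeps the helper independent of where the cap comes from, which is the simplification the comment asks for.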




[GitHub] [flink-kubernetes-operator] gyfora commented on a diff in pull request #264: [FLINK-27913] Remove savepointHistoryMaxCount and savepointHistoryMaxAge from FlinkOperatorConfiguration

Posted by GitBox <gi...@apache.org>.
gyfora commented on code in PR #264:
URL: https://github.com/apache/flink-kubernetes-operator/pull/264#discussion_r893110295


##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/KubernetesOperatorConfigOptions.java:
##########
@@ -145,12 +145,25 @@ public class KubernetesOperatorConfigOptions {
                     .withDescription(
                             "Whether to enable recovery of missing/deleted jobmanager deployments.");
 
+    public static final ConfigOption<Integer> OPERATOR_SAVEPOINT_HISTORY_COUNT_THRESHOLD =
+            ConfigOptions.key("kubernetes.operator.savepoint.history.count.threshold")
+                    .intType()
+                    .noDefaultValue()
+                    .withDescription("Number threshold of savepoint history entries to retain.");
+
     public static final ConfigOption<Integer> OPERATOR_SAVEPOINT_HISTORY_MAX_COUNT =
             ConfigOptions.key("kubernetes.operator.savepoint.history.max.count")
                     .intType()
                     .defaultValue(10)
                     .withDescription("Maximum number of savepoint history entries to retain.");
 
+    public static final ConfigOption<Duration> OPERATOR_SAVEPOINT_HISTORY_AGE_THRESHOLD =
+            ConfigOptions.key("kubernetes.operator.savepoint.history.age.threshold")
+                    .durationType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "Age threshold for savepoint history entries to retain. Due to lazy clean-up, the most recent savepoint may live longer than the max age.");
+

Review Comment:
   I think the new config keys should be:
   `OPERATOR_SAVEPOINT_HISTORY_MAX_AGE.key() + ".threshold"`
   `OPERATOR_SAVEPOINT_HISTORY_MAX_COUNT.key() + ".threshold"`
   
   This would make the relationship between each option and its threshold straightforward for users and admins.
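
To illustrate the naming scheme proposed in this comment, the sketch below derives each threshold key from its base key using plain strings. `ThresholdKeys` and `thresholdKey` are hypothetical names. The `max.count` key is copied from the diff above; the `max.age` key does not appear in the diff and is assumed by analogy.

```java
/** Hypothetical illustration of deriving threshold keys from the base option keys. */
final class ThresholdKeys {

    // Copied from the diff above.
    static final String MAX_COUNT_KEY = "kubernetes.operator.savepoint.history.max.count";

    // Assumption: not shown in the diff, named by analogy with the max.count key.
    static final String MAX_AGE_KEY = "kubernetes.operator.savepoint.history.max.age";

    private ThresholdKeys() {}

    /** Appends the ".threshold" suffix to a base option key. */
    static String thresholdKey(String baseKey) {
        return baseKey + ".threshold";
    }

    public static void main(String[] args) {
        System.out.println(thresholdKey(MAX_COUNT_KEY));
        System.out.println(thresholdKey(MAX_AGE_KEY));
    }
}
```

With this scheme an admin who knows the base key can find the matching cap without consulting a separate list of option names.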



##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/SavepointObserver.java:
##########
@@ -148,13 +149,38 @@ void cleanupSavepointHistory(
 
         // maintain history
         List<Savepoint> savepointHistory = currentSavepointInfo.getSavepointHistory();
-        int maxCount = configManager.getOperatorConfiguration().getSavepointHistoryMaxCount();
+        Integer countThreshold =
+                configManager.getOperatorConfiguration().getSavepointHistoryCountThreshold();
+        int maxCount =
+                deployedConfig.get(
+                        KubernetesOperatorConfigOptions.OPERATOR_SAVEPOINT_HISTORY_MAX_COUNT);
+        if (countThreshold != null && maxCount > countThreshold) {
+            LOG.warn(
+                    "Maximum number of savepoint history entries to retain changes to {}. The value of '{}' exceeds the value of '{}'",
+                    countThreshold,
+                    KubernetesOperatorConfigOptions.OPERATOR_SAVEPOINT_HISTORY_MAX_COUNT.key(),
+                    KubernetesOperatorConfigOptions.OPERATOR_SAVEPOINT_HISTORY_COUNT_THRESHOLD
+                            .key());
+            maxCount = countThreshold;
+        }

Review Comment:
   Can we refactor this pattern into a utility method? 
   ```
   public static <T extends Comparable<T>> T getValueWithThreshold(ConfigOption<T> option, ConfigOption<T> threshold, Configuration config) {
   ...
   }
   ```
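
One way the elided body could look, sketched without Flink dependencies so that it runs on its own. The `Option` class and the `Map`-based config are hypothetical stand-ins for `ConfigOption` and `Configuration`, and the short keys in `main` are made up for the example; this is not the implementation from the PR.

```java
import java.util.HashMap;
import java.util.Map;

/** Dependency-free sketch; Option and the Map-based config are hypothetical stand-ins. */
final class ThresholdLookupSketch {

    /** Minimal stand-in for a config option: a key plus a default (null means no default). */
    static final class Option<T> {
        final String key;
        final T defaultValue;

        Option(String key, T defaultValue) {
            this.key = key;
            this.defaultValue = defaultValue;
        }
    }

    private ThresholdLookupSketch() {}

    /** Reads an option from the map, falling back to the option's default. */
    @SuppressWarnings("unchecked")
    static <T> T read(Map<String, Object> config, Option<T> option) {
        Object raw = config.get(option.key);
        return raw != null ? (T) raw : option.defaultValue;
    }

    /**
     * Returns the value of {@code option}, capped at {@code threshold} when the threshold is set.
     * Assumes {@code option} has a default, so the value itself is never null.
     */
    static <T extends Comparable<T>> T getValueWithThreshold(
            Option<T> option, Option<T> threshold, Map<String, Object> config) {
        T value = read(config, option);
        T cap = read(config, threshold);
        if (cap != null && value.compareTo(cap) > 0) {
            System.err.printf(
                    "'%s' = %s exceeds '%s' = %s; using the threshold%n",
                    option.key, value, threshold.key, cap);
            return cap;
        }
        return value;
    }

    public static void main(String[] args) {
        // Made-up keys for the example; the threshold option has no default.
        Option<Integer> maxCount = new Option<>("savepoint.history.max.count", 10);
        Option<Integer> countCap = new Option<>("savepoint.history.max.count.threshold", null);

        Map<String, Object> config = new HashMap<>();
        config.put(maxCount.key, 50);
        // No threshold configured: the user value is returned as-is.
        System.out.println(getValueWithThreshold(maxCount, countCap, config));

        config.put(countCap.key, 20);
        // Threshold configured and exceeded: the threshold wins and a warning is printed.
        System.out.println(getValueWithThreshold(maxCount, countCap, config));
    }
}
```

The `Comparable` bound lets the same helper serve both the count (`Integer`) and the age (`Duration`) options.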




[GitHub] [flink-kubernetes-operator] gyfora merged pull request #264: [FLINK-27913] Remove savepointHistoryMaxCount and savepointHistoryMaxAge from FlinkOperatorConfiguration

Posted by GitBox <gi...@apache.org>.
gyfora merged PR #264:
URL: https://github.com/apache/flink-kubernetes-operator/pull/264

