Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2022/04/21 16:41:19 UTC

[GitHub] [flink-kubernetes-operator] gyfora opened a new pull request, #177: [FLINK-27303][FLINK-27309] Introduce FlinkConfigManager for efficient config management

gyfora opened a new pull request, #177:
URL: https://github.com/apache/flink-kubernetes-operator/pull/177

   This PR improves/reworks the configuration management of the operator. Problems with the current approach:
     - Configuration and FlinkOperatorConfiguration objects are passed around everywhere
     - Every component generates effective configurations on its own (without consistency, which can cause problems)
     - Configs are regenerated again and again, creating a large number of temporary files in every reconcile iteration
     - Configuration is completely static and impossible to change without an operator restart
     
   To tackle all these problems at once I have introduced the `FlinkConfigManager` object, which is responsible for:
    - Generating configuration for deployment / cluster interactions (observe)
    - Keeping the config generation logic consistently aware of the currently deployed spec (this is important due to the rollback logic)
    - Caching the generated configs for a given specification to speed things up and avoid extra tmp files
    - Allowing default configuration to be modified on the fly by watching ConfigMap changes
    
   While these changes touch almost all components, they are mostly cosmetic. The core improvements are located in the `FlinkConfigManager` class and in which config is requested in the different parts of the reconcile logic; a rough usage sketch is included below.
   
   **NOTE: Tests for the `FlinkConfigManager` are still todo before merging**
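
   A rough usage sketch, based only on the `FlinkConfigManager` methods added in this PR (the surrounding reconciler wiring and the empty default config are illustrative, not the exact operator code):

       import org.apache.flink.configuration.Configuration;
       import org.apache.flink.kubernetes.operator.config.FlinkConfigManager;
       import org.apache.flink.kubernetes.operator.config.FlinkOperatorConfiguration;
       import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;

       /** Illustrative only: how a reconciler could ask FlinkConfigManager for configs. */
       public class ConfigManagerUsageSketch {

           // One shared manager instance; in the operator it would be created at startup
           private final FlinkConfigManager configManager =
                   new FlinkConfigManager(new Configuration());

           void reconcile(FlinkDeployment deployment) {
               // Effective config for deploying the desired (not yet applied) spec
               Configuration deployConfig =
                       configManager.getDeployConfig(deployment.getMetadata(), deployment.getSpec());

               // Effective config matching the spec currently running on the cluster
               Configuration observeConfig = configManager.getObserveConfig(deployment);

               // Operator-level settings derived from the shared default configuration
               FlinkOperatorConfiguration operatorConfig = configManager.getOperatorConfiguration();
           }
       }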


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink-kubernetes-operator] gyfora commented on a diff in pull request #177: [FLINK-27303][FLINK-27309] Introduce FlinkConfigManager for efficient config management

Posted by GitBox <gi...@apache.org>.
gyfora commented on code in PR #177:
URL: https://github.com/apache/flink-kubernetes-operator/pull/177#discussion_r857316377


##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkConfigBuilder.java:
##########
@@ -177,19 +192,21 @@ public FlinkConfigBuilder applyJobManagerSpec() throws IOException {
         return this;
     }
 
-    public FlinkConfigBuilder applyTaskManagerSpec() throws IOException {
+    protected FlinkConfigBuilder applyTaskManagerSpec() throws IOException {
         if (spec.getTaskManager() != null) {
             setResource(spec.getTaskManager().getResource(), effectiveConfig, false);
-            setPodTemplate(
-                    spec.getPodTemplate(),
-                    spec.getTaskManager().getPodTemplate(),
-                    effectiveConfig,
-                    false);
+            if (forDeployment) {
+                setPodTemplate(

Review Comment:
   I have added cleanup for the temp files. With this I also removed the `forDeployment` logic because it is not really necessary anymore (temp files will be cleaned up anyway).
   
   I set the expire-after-access timeout to 1 hour to avoid any problems with cleanup during deployment; see the sketch below.
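
   A minimal sketch of that cache wiring, assuming a plain String key (the real key type, size limit and cleanup hook live in `FlinkConfigManager` / `FlinkConfigBuilder`):

       import org.apache.flink.configuration.Configuration;

       import org.apache.flink.shaded.guava30.com.google.common.cache.Cache;
       import org.apache.flink.shaded.guava30.com.google.common.cache.CacheBuilder;
       import org.apache.flink.shaded.guava30.com.google.common.cache.RemovalListener;

       import java.util.concurrent.TimeUnit;

       /** Illustrative only: expire-after-access cache that cleans up temp files on eviction. */
       public class ConfigCacheSketch {

           Cache<String, Configuration> buildCache() {
               // Delete the temp files belonging to a config once it is evicted from the cache
               RemovalListener<String, Configuration> cleanupOnEviction =
                       notification -> cleanupTmpFiles(notification.getValue());

               return CacheBuilder.newBuilder()
                       .maximumSize(1000)
                       // Expire entries one hour after last access so a config used by an
                       // in-flight deployment is not cleaned up underneath it
                       .expireAfterAccess(1, TimeUnit.HOURS)
                       .removalListener(cleanupOnEviction)
                       .build();
           }

           private void cleanupTmpFiles(Configuration config) {
               // Stand-in for the cleanup introduced in this PR (FlinkConfigBuilder.cleanupTmpFiles)
           }
       }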



##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkConfigBuilder.java:
##########
@@ -55,31 +54,45 @@
 
 /** Builder to get effective flink config from {@link FlinkDeployment}. */
 public class FlinkConfigBuilder {
-    private final ObjectMeta meta;
+    private final String namespace;
+    private final String clusterId;
     private final FlinkDeploymentSpec spec;
     private final Configuration effectiveConfig;
-
-    public static final Duration DEFAULT_CHECKPOINTING_INTERVAL = Duration.ofMinutes(5);
-
-    public FlinkConfigBuilder(FlinkDeployment deploy, Configuration flinkConfig) {
-        this(deploy.getMetadata(), deploy.getSpec(), flinkConfig);
+    private final boolean forDeployment;
+
+    protected static final Duration DEFAULT_CHECKPOINTING_INTERVAL = Duration.ofMinutes(5);
+
+    protected FlinkConfigBuilder(
+            FlinkDeployment deployment, Configuration flinkConfig, boolean forDeployment) {
+        this(
+                deployment.getMetadata().getNamespace(),
+                deployment.getMetadata().getName(),
+                deployment.getSpec(),
+                flinkConfig,
+                forDeployment);
     }
 
-    public FlinkConfigBuilder(
-            ObjectMeta metadata, FlinkDeploymentSpec spec, Configuration flinkConfig) {
-        this.meta = metadata;
+    protected FlinkConfigBuilder(
+            String namespace,
+            String clusterId,
+            FlinkDeploymentSpec spec,
+            Configuration flinkConfig,
+            boolean forDeployment) {
+        this.namespace = namespace;
+        this.clusterId = clusterId;
         this.spec = spec;
         this.effectiveConfig = new Configuration(flinkConfig);
+        this.forDeployment = forDeployment;

Review Comment:
   I removed this already





[GitHub] [flink-kubernetes-operator] gyfora commented on a diff in pull request #177: [FLINK-27303][FLINK-27309] Introduce FlinkConfigManager for efficient config management

Posted by GitBox <gi...@apache.org>.
gyfora commented on code in PR #177:
URL: https://github.com/apache/flink-kubernetes-operator/pull/177#discussion_r857840866


##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/sessionjob/FlinkSessionJobReconciler.java:
##########
@@ -119,38 +116,39 @@ public void reconcile(FlinkSessionJob flinkSessionJob, Context context) throws E
                 if (desiredJobState == JobState.RUNNING) {
                     LOG.info("Upgrading/Restarting running job, suspending first...");
                 }
-                stateAfterReconcile = suspendJob(flinkSessionJob, upgradeMode, effectiveConfig);
+                stateAfterReconcile = suspendJob(flinkSessionJob, upgradeMode, deployedConfig);
             }
             if (currentJobState == JobState.SUSPENDED && desiredJobState == JobState.RUNNING) {
                 if (upgradeMode == UpgradeMode.STATELESS) {
-                    submitAndInitStatus(flinkSessionJob, effectiveConfig, null);
+                    submitAndInitStatus(flinkSessionJob, deployedConfig, null);

Review Comment:
   I think this is correct; we need to get the config that is running on the cluster at the moment, which is the observeConfig.





[GitHub] [flink-kubernetes-operator] Aitozi commented on a diff in pull request #177: [FLINK-27303][FLINK-27309] Introduce FlinkConfigManager for efficient config management

Posted by GitBox <gi...@apache.org>.
Aitozi commented on code in PR #177:
URL: https://github.com/apache/flink-kubernetes-operator/pull/177#discussion_r857133548


##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkConfigManager.java:
##########
@@ -0,0 +1,214 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.config;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.java.tuple.Tuple4;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.GlobalConfiguration;
+import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
+import org.apache.flink.kubernetes.operator.crd.spec.FlinkDeploymentSpec;
+import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
+import org.apache.flink.kubernetes.operator.utils.EnvUtils;
+import org.apache.flink.kubernetes.operator.utils.OperatorUtils;
+import org.apache.flink.util.FileUtils;
+
+import org.apache.flink.shaded.guava30.com.google.common.cache.Cache;
+import org.apache.flink.shaded.guava30.com.google.common.cache.CacheBuilder;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import io.fabric8.kubernetes.api.model.ObjectMeta;
+import io.fabric8.kubernetes.client.KubernetesClient;
+import lombok.SneakyThrows;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.nio.charset.Charset;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions.OPERATOR_DYNAMIC_CONFIG_CHECK_INTERVAL;
+import static org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions.OPERATOR_DYNAMIC_CONFIG_ENABLED;
+
+/** Configuration manager for the Flink operator. */
+public class FlinkConfigManager {
+
+    private static final Logger LOG = LoggerFactory.getLogger(FlinkConfigManager.class);
+    private static final ObjectMapper objectMapper = new ObjectMapper();
+
+    public static final String OP_CM_NAME = "flink-operator-config";
+
+    private static final int CACHE_SIZE = 1000;
+
+    private volatile Configuration defaultConfig;
+    private volatile FlinkOperatorConfiguration operatorConfiguration;
+
+    private final Cache<Tuple4<String, String, ObjectNode, Boolean>, Configuration> cache;
+    private Set<String> namespaces = OperatorUtils.getWatchedNamespaces();
+    private ConfigUpdater configUpdater;
+
+    public FlinkConfigManager(KubernetesClient client) {
+        this(readConfiguration(client));
+
+        if (defaultConfig.getBoolean(OPERATOR_DYNAMIC_CONFIG_ENABLED)) {
+            scheduleConfigWatcher(client);
+        }
+    }
+
+    public FlinkConfigManager(Configuration defaultConfig) {
+        this.cache = CacheBuilder.newBuilder().maximumSize(CACHE_SIZE).build();
+        updateDefaultConfig(
+                defaultConfig,
+                FlinkOperatorConfiguration.fromConfiguration(defaultConfig, namespaces));
+    }
+
+    public Configuration getDefaultConfig() {
+        return defaultConfig;
+    }
+
+    @VisibleForTesting
+    protected void updateDefaultConfig(
+            Configuration defaultConfig, FlinkOperatorConfiguration operatorConfiguration) {
+        this.defaultConfig = defaultConfig;
+        cache.invalidateAll();
+        this.operatorConfiguration = operatorConfiguration;
+    }
+
+    public FlinkOperatorConfiguration getOperatorConfiguration() {
+        return operatorConfiguration;
+    }
+
+    public Configuration getObserveConfig(FlinkDeployment deployment) {
+        return getConfig(
+                deployment.getMetadata(), ReconciliationUtils.getDeployedSpec(deployment), false);
+    }
+
+    public Configuration getDeployConfig(ObjectMeta objectMeta, FlinkDeploymentSpec spec) {
+        return getConfig(objectMeta, spec, true);
+    }
+
+    @SneakyThrows
+    private Configuration getConfig(
+            ObjectMeta objectMeta, FlinkDeploymentSpec spec, boolean forDeploy) {
+
+        var key =
+                Tuple4.of(
+                        objectMeta.getNamespace(),
+                        objectMeta.getName(),
+                        objectMapper.convertValue(spec, ObjectNode.class),
+                        forDeploy);
+
+        var conf = cache.getIfPresent(key);
+        if (conf == null) {
+            conf = generateConfig(key);
+            cache.put(key, conf);
+        }
+
+        // Always return a copy of the configuration to avoid polluting the cache
+        return conf.clone();
+    }
+
+    private Configuration generateConfig(Tuple4<String, String, ObjectNode, Boolean> key) {
+        try {
+            LOG.info("Generating new {} config", key.f3 ? "deployment" : "observe");
+            return FlinkConfigBuilder.buildFrom(
+                    key.f0,
+                    key.f1,
+                    objectMapper.convertValue(key.f2, FlinkDeploymentSpec.class),
+                    defaultConfig,
+                    key.f3);
+        } catch (Exception e) {
+            throw new RuntimeException("Failed to load configuration", e);
+        }
+    }
+
+    private void scheduleConfigWatcher(KubernetesClient client) {
+        var checkInterval = defaultConfig.get(OPERATOR_DYNAMIC_CONFIG_CHECK_INTERVAL);
+        var millis = checkInterval.toMillis();
+        configUpdater = new ConfigUpdater(client);
+        Executors.newSingleThreadScheduledExecutor()
+                .scheduleAtFixedRate(configUpdater, millis, millis, TimeUnit.MILLISECONDS);
+        LOG.info("Enabled dynamic config updates, checking config changes every {}", checkInterval);
+    }
+
+    @VisibleForTesting
+    public void setWatchedNamespaces(Set<String> namespaces) {
+        this.namespaces = namespaces;
+        operatorConfiguration =
+                FlinkOperatorConfiguration.fromConfiguration(defaultConfig, namespaces);
+    }
+
+    @VisibleForTesting
+    public Runnable getConfigUpdater() {
+        return configUpdater;
+    }
+
+    private static Configuration readConfiguration(KubernetesClient client) {
+        var operatorNamespace = EnvUtils.getOrDefault(EnvUtils.ENV_OPERATOR_NAMESPACE, "default");
+        var configMap =
+                client.configMaps().inNamespace(operatorNamespace).withName(OP_CM_NAME).get();
+        Map<String, String> data = configMap.getData();
+        Path tmpDir = null;
+        try {
+            tmpDir = Files.createTempDirectory("flink-conf");
+            Files.write(
+                    tmpDir.resolve(GlobalConfiguration.FLINK_CONF_FILENAME),
+                    data.get(GlobalConfiguration.FLINK_CONF_FILENAME)
+                            .getBytes(Charset.defaultCharset()));
+            return GlobalConfiguration.loadConfiguration(tmpDir.toAbsolutePath().toString());
+        } catch (IOException ioe) {
+            throw new RuntimeException("Could not load default config", ioe);
+        } finally {
+            if (tmpDir != null) {
+                FileUtils.deleteDirectoryQuietly(tmpDir.toFile());
+            }
+        }
+    }
+
+    class ConfigUpdater implements Runnable {
+        KubernetesClient kubeClient;
+
+        public ConfigUpdater(KubernetesClient kubeClient) {
+            this.kubeClient = kubeClient;
+        }
+
+        public void run() {
+            try {
+                LOG.debug("Checking for config update changes...");
+                var newConf = readConfiguration(kubeClient);

Review Comment:
   Why not read the config directly from the mounted dir? I think we do not have to interact with the ConfigMap directly. The same applies to the `FlinkConfigManager` constructor. That way, the `FlinkConfigManager` also does not have to hold a `KubernetesClient`; a rough sketch is below.
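
   A rough sketch of what that could look like (the env var handling here is illustrative; `GlobalConfiguration.loadConfiguration` does the actual parsing):

       import org.apache.flink.configuration.Configuration;
       import org.apache.flink.configuration.GlobalConfiguration;

       /** Illustrative only: read the default config from the mounted conf directory. */
       public class MountedConfigReaderSketch {

           static Configuration readDefaultConfig() {
               // Assumes the operator ConfigMap is mounted at the directory FLINK_CONF_DIR points to
               String confDir = System.getenv("FLINK_CONF_DIR");
               return confDir != null
                       ? GlobalConfiguration.loadConfiguration(confDir)
                       : GlobalConfiguration.loadConfiguration();
           }
       }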





[GitHub] [flink-kubernetes-operator] morhidi commented on a diff in pull request #177: [FLINK-27303][FLINK-27309] Introduce FlinkConfigManager for efficient config management

Posted by GitBox <gi...@apache.org>.
morhidi commented on code in PR #177:
URL: https://github.com/apache/flink-kubernetes-operator/pull/177#discussion_r857766928


##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkConfigManager.java:
##########
@@ -0,0 +1,193 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.config;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.GlobalConfiguration;
+import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
+import org.apache.flink.kubernetes.operator.crd.spec.FlinkDeploymentSpec;
+import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
+import org.apache.flink.kubernetes.operator.utils.OperatorUtils;
+
+import org.apache.flink.shaded.guava30.com.google.common.cache.Cache;
+import org.apache.flink.shaded.guava30.com.google.common.cache.CacheBuilder;
+import org.apache.flink.shaded.guava30.com.google.common.cache.CacheLoader;
+import org.apache.flink.shaded.guava30.com.google.common.cache.LoadingCache;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import io.fabric8.kubernetes.api.model.ObjectMeta;
+import lombok.Builder;
+import lombok.SneakyThrows;
+import lombok.Value;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.Duration;
+import java.util.Set;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+
+import static org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions.OPERATOR_DYNAMIC_CONFIG_CHECK_INTERVAL;
+import static org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions.OPERATOR_DYNAMIC_CONFIG_ENABLED;
+
+/** Configuration manager for the Flink operator. */
+public class FlinkConfigManager {
+
+    private static final Logger LOG = LoggerFactory.getLogger(FlinkConfigManager.class);
+    private static final ObjectMapper objectMapper = new ObjectMapper();
+
+    private static final int MAX_CACHE_SIZE = 1000;
+    private static final Duration CACHE_TIMEOUT = Duration.ofMinutes(30);
+
+    private volatile Configuration defaultConfig;

Review Comment:
   I can live with it, but `flinkConfiguration` and `operatorConfiguration` clearly say what they are meant to be used for.





[GitHub] [flink-kubernetes-operator] bgeng777 commented on a diff in pull request #177: [FLINK-27303][FLINK-27309] Introduce FlinkConfigManager for efficient config management

Posted by GitBox <gi...@apache.org>.
bgeng777 commented on code in PR #177:
URL: https://github.com/apache/flink-kubernetes-operator/pull/177#discussion_r857835554


##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/sessionjob/FlinkSessionJobReconciler.java:
##########
@@ -119,38 +116,39 @@ public void reconcile(FlinkSessionJob flinkSessionJob, Context context) throws E
                 if (desiredJobState == JobState.RUNNING) {
                     LOG.info("Upgrading/Restarting running job, suspending first...");
                 }
-                stateAfterReconcile = suspendJob(flinkSessionJob, upgradeMode, effectiveConfig);
+                stateAfterReconcile = suspendJob(flinkSessionJob, upgradeMode, deployedConfig);
             }
             if (currentJobState == JobState.SUSPENDED && desiredJobState == JobState.RUNNING) {
                 if (upgradeMode == UpgradeMode.STATELESS) {
-                    submitAndInitStatus(flinkSessionJob, effectiveConfig, null);
+                    submitAndInitStatus(flinkSessionJob, deployedConfig, null);

Review Comment:
   Shouldn't we use `getDeployConfig` here to submit a new job instead of `getObserveConfig`?





[GitHub] [flink-kubernetes-operator] gyfora commented on a diff in pull request #177: [FLINK-27303][FLINK-27309] Introduce FlinkConfigManager for efficient config management

Posted by GitBox <gi...@apache.org>.
gyfora commented on code in PR #177:
URL: https://github.com/apache/flink-kubernetes-operator/pull/177#discussion_r857317507


##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkConfigBuilder.java:
##########
@@ -177,19 +192,21 @@ public FlinkConfigBuilder applyJobManagerSpec() throws IOException {
         return this;
     }
 
-    public FlinkConfigBuilder applyTaskManagerSpec() throws IOException {
+    protected FlinkConfigBuilder applyTaskManagerSpec() throws IOException {
         if (spec.getTaskManager() != null) {
             setResource(spec.getTaskManager().getResource(), effectiveConfig, false);
-            setPodTemplate(
-                    spec.getPodTemplate(),
-                    spec.getTaskManager().getPodTemplate(),
-                    effectiveConfig,
-                    false);
+            if (forDeployment) {
+                setPodTemplate(

Review Comment:
   But you are right, this could still be a problem if the cleanup is triggered based on the ConfigMap update. I will think about it a little.





[GitHub] [flink-kubernetes-operator] gyfora commented on a diff in pull request #177: [FLINK-27303][FLINK-27309] Introduce FlinkConfigManager for efficient config management

Posted by GitBox <gi...@apache.org>.
gyfora commented on code in PR #177:
URL: https://github.com/apache/flink-kubernetes-operator/pull/177#discussion_r857606754


##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkConfigManager.java:
##########
@@ -0,0 +1,183 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.config;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.java.tuple.Tuple4;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.GlobalConfiguration;
+import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
+import org.apache.flink.kubernetes.operator.crd.spec.FlinkDeploymentSpec;
+import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
+import org.apache.flink.kubernetes.operator.utils.OperatorUtils;
+
+import org.apache.flink.shaded.guava30.com.google.common.cache.Cache;
+import org.apache.flink.shaded.guava30.com.google.common.cache.CacheBuilder;
+import org.apache.flink.shaded.guava30.com.google.common.cache.CacheLoader;
+import org.apache.flink.shaded.guava30.com.google.common.cache.LoadingCache;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import io.fabric8.kubernetes.api.model.ObjectMeta;
+import lombok.SneakyThrows;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.Duration;
+import java.util.Set;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+
+import static org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions.OPERATOR_DYNAMIC_CONFIG_CHECK_INTERVAL;
+import static org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions.OPERATOR_DYNAMIC_CONFIG_ENABLED;
+
+/** Configuration manager for the Flink operator. */
+public class FlinkConfigManager {
+
+    private static final Logger LOG = LoggerFactory.getLogger(FlinkConfigManager.class);
+    private static final ObjectMapper objectMapper = new ObjectMapper();
+
+    private static final int MAX_CACHE_SIZE = 1000;
+    private static final Duration CACHE_TIMEOUT = Duration.ofMinutes(30);
+
+    private volatile Configuration defaultConfig;
+    private volatile FlinkOperatorConfiguration operatorConfiguration;
+    private final AtomicLong defaultConfigVersion = new AtomicLong(0);
+
+    private final LoadingCache<Tuple4<Long, String, String, ObjectNode>, Configuration> cache;
+    private Set<String> namespaces = OperatorUtils.getWatchedNamespaces();
+
+    public FlinkConfigManager() {
+        this(GlobalConfiguration.loadConfiguration());
+    }
+
+    public FlinkConfigManager(Configuration defaultConfig) {
+        this.cache =
+                CacheBuilder.newBuilder()
+                        .maximumSize(MAX_CACHE_SIZE)
+                        .expireAfterAccess(CACHE_TIMEOUT)
+                        .removalListener(
+                                removalNotification ->
+                                        FlinkConfigBuilder.cleanupTmpFiles(
+                                                (Configuration) removalNotification.getValue()))
+                        .build(
+                                new CacheLoader<>() {
+                                    @Override
+                                    public Configuration load(
+                                            Tuple4<Long, String, String, ObjectNode> k) {
+                                        return generateConfig(k);
+                                    }
+                                });
+
+        updateDefaultConfig(defaultConfig);
+        if (defaultConfig.getBoolean(OPERATOR_DYNAMIC_CONFIG_ENABLED)) {
+            scheduleConfigWatcher();
+        }
+    }
+
+    public Configuration getDefaultConfig() {
+        return defaultConfig;
+    }
+
+    @VisibleForTesting
+    public void updateDefaultConfig(Configuration newConf) {
+        if (newConf.equals(defaultConfig)) {
+            LOG.info("Default configuration did not change, nothing to do...");
+            return;
+        }
+
+        LOG.info("Updating default configuration");
+        this.operatorConfiguration =
+                FlinkOperatorConfiguration.fromConfiguration(newConf, namespaces);
+        this.defaultConfig = newConf.clone();
+        // We do not invalidate the cache to avoid deleting currently used temp files, simply bump
+        // the version
+        this.defaultConfigVersion.incrementAndGet();
+    }
+
+    public FlinkOperatorConfiguration getOperatorConfiguration() {
+        return operatorConfiguration;
+    }
+
+    public Configuration getDeployConfig(ObjectMeta objectMeta, FlinkDeploymentSpec spec) {
+        return getConfig(objectMeta, spec);
+    }
+
+    public Configuration getObserveConfig(FlinkDeployment deployment) {
+        return getConfig(deployment.getMetadata(), ReconciliationUtils.getDeployedSpec(deployment));
+    }
+
+    @SneakyThrows
+    private Configuration getConfig(ObjectMeta objectMeta, FlinkDeploymentSpec spec) {
+        var key =
+                Tuple4.of(
+                        defaultConfigVersion.get(),
+                        objectMeta.getNamespace(),
+                        objectMeta.getName(),
+                        objectMapper.convertValue(spec, ObjectNode.class));
+
+        // Always return a copy of the configuration to avoid polluting the cache
+        return cache.get(key).clone();
+    }
+
+    private Configuration generateConfig(Tuple4<Long, String, String, ObjectNode> key) {
+        try {
+            LOG.info("Generating new config");
+            return FlinkConfigBuilder.buildFrom(
+                    key.f1,
+                    key.f2,
+                    objectMapper.convertValue(key.f3, FlinkDeploymentSpec.class),
+                    defaultConfig);
+        } catch (Exception e) {
+            throw new RuntimeException("Failed to load configuration", e);
+        }
+    }
+
+    private void scheduleConfigWatcher() {
+        var checkInterval = defaultConfig.get(OPERATOR_DYNAMIC_CONFIG_CHECK_INTERVAL);

Review Comment:
   I use that object in the logging





[GitHub] [flink-kubernetes-operator] gyfora merged pull request #177: [FLINK-27303][FLINK-27309] Introduce FlinkConfigManager for efficient config management

Posted by GitBox <gi...@apache.org>.
gyfora merged PR #177:
URL: https://github.com/apache/flink-kubernetes-operator/pull/177




[GitHub] [flink-kubernetes-operator] Aitozi commented on a diff in pull request #177: [FLINK-27303][FLINK-27309] Introduce FlinkConfigManager for efficient config management

Posted by GitBox <gi...@apache.org>.
Aitozi commented on code in PR #177:
URL: https://github.com/apache/flink-kubernetes-operator/pull/177#discussion_r857214801


##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkConfigBuilder.java:
##########
@@ -177,19 +192,21 @@ public FlinkConfigBuilder applyJobManagerSpec() throws IOException {
         return this;
     }
 
-    public FlinkConfigBuilder applyTaskManagerSpec() throws IOException {
+    protected FlinkConfigBuilder applyTaskManagerSpec() throws IOException {
         if (spec.getTaskManager() != null) {
             setResource(spec.getTaskManager().getResource(), effectiveConfig, false);
-            setPodTemplate(
-                    spec.getPodTemplate(),
-                    spec.getTaskManager().getPodTemplate(),
-                    effectiveConfig,
-                    false);
+            if (forDeployment) {
+                setPodTemplate(

Review Comment:
   Thinking it over again, it may cause a sync problem if the config has just been used to deploy and the async removal is scheduled (if we clean up the temp files when the cache is invalidated or an entry expires). Maybe the cache lifecycle can be aligned with the job lifecycle, which means we could clean up the cache and config in the cleanup method? WDYT?





[GitHub] [flink-kubernetes-operator] wangyang0918 commented on pull request #177: [FLINK-27303][FLINK-27309] Introduce FlinkConfigManager for efficient config management

Posted by GitBox <gi...@apache.org>.
wangyang0918 commented on PR #177:
URL: https://github.com/apache/flink-kubernetes-operator/pull/177#issuecomment-1106075709

   Just a quick note: instead of creating a watch on the ConfigMap, I prefer to simply use `ScheduledExecutorService#scheduleAtFixedRate` to detect default configuration file changes on the fly. It is similar to `monitorInterval` in log4j. Because a naked watch is not stable and could be closed in many cases, we would need to use an informer instead, but an informer might be overkill.
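
   A minimal sketch of that polling approach (interval and reload logic are illustrative; the operator would plug in its own config update handling):

       import org.apache.flink.configuration.Configuration;
       import org.apache.flink.configuration.GlobalConfiguration;

       import java.util.concurrent.Executors;
       import java.util.concurrent.ScheduledExecutorService;
       import java.util.concurrent.TimeUnit;

       /** Illustrative only: poll the mounted config file at a fixed rate, log4j monitorInterval style. */
       public class ConfigPollingSketch {

           private volatile Configuration currentConfig = GlobalConfiguration.loadConfiguration();

           void startWatcher(long intervalMillis) {
               ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
               executor.scheduleAtFixedRate(
                       () -> {
                           // Reload the config from disk and swap it in only if something changed
                           Configuration newConfig = GlobalConfiguration.loadConfiguration();
                           if (!newConfig.equals(currentConfig)) {
                               currentConfig = newConfig;
                           }
                       },
                       intervalMillis,
                       intervalMillis,
                       TimeUnit.MILLISECONDS);
           }
       }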




[GitHub] [flink-kubernetes-operator] gyfora commented on a diff in pull request #177: [FLINK-27303][FLINK-27309] Introduce FlinkConfigManager for efficient config management

Posted by GitBox <gi...@apache.org>.
gyfora commented on code in PR #177:
URL: https://github.com/apache/flink-kubernetes-operator/pull/177#discussion_r857135960


##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkConfigManager.java:
##########
@@ -0,0 +1,214 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.config;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.java.tuple.Tuple4;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.GlobalConfiguration;
+import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
+import org.apache.flink.kubernetes.operator.crd.spec.FlinkDeploymentSpec;
+import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
+import org.apache.flink.kubernetes.operator.utils.EnvUtils;
+import org.apache.flink.kubernetes.operator.utils.OperatorUtils;
+import org.apache.flink.util.FileUtils;
+
+import org.apache.flink.shaded.guava30.com.google.common.cache.Cache;
+import org.apache.flink.shaded.guava30.com.google.common.cache.CacheBuilder;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import io.fabric8.kubernetes.api.model.ObjectMeta;
+import io.fabric8.kubernetes.client.KubernetesClient;
+import lombok.SneakyThrows;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.nio.charset.Charset;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions.OPERATOR_DYNAMIC_CONFIG_CHECK_INTERVAL;
+import static org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions.OPERATOR_DYNAMIC_CONFIG_ENABLED;
+
+/** Configuration manager for the Flink operator. */
+public class FlinkConfigManager {
+
+    private static final Logger LOG = LoggerFactory.getLogger(FlinkConfigManager.class);
+    private static final ObjectMapper objectMapper = new ObjectMapper();
+
+    public static final String OP_CM_NAME = "flink-operator-config";
+
+    private static final int CACHE_SIZE = 1000;
+
+    private volatile Configuration defaultConfig;
+    private volatile FlinkOperatorConfiguration operatorConfiguration;
+
+    private final Cache<Tuple4<String, String, ObjectNode, Boolean>, Configuration> cache;
+    private Set<String> namespaces = OperatorUtils.getWatchedNamespaces();
+    private ConfigUpdater configUpdater;
+
+    public FlinkConfigManager(KubernetesClient client) {
+        this(readConfiguration(client));
+
+        if (defaultConfig.getBoolean(OPERATOR_DYNAMIC_CONFIG_ENABLED)) {
+            scheduleConfigWatcher(client);
+        }
+    }
+
+    public FlinkConfigManager(Configuration defaultConfig) {
+        this.cache = CacheBuilder.newBuilder().maximumSize(CACHE_SIZE).build();
+        updateDefaultConfig(
+                defaultConfig,
+                FlinkOperatorConfiguration.fromConfiguration(defaultConfig, namespaces));
+    }
+
+    public Configuration getDefaultConfig() {
+        return defaultConfig;
+    }
+
+    @VisibleForTesting
+    protected void updateDefaultConfig(
+            Configuration defaultConfig, FlinkOperatorConfiguration operatorConfiguration) {
+        this.defaultConfig = defaultConfig;
+        cache.invalidateAll();
+        this.operatorConfiguration = operatorConfiguration;
+    }
+
+    public FlinkOperatorConfiguration getOperatorConfiguration() {
+        return operatorConfiguration;
+    }
+
+    public Configuration getObserveConfig(FlinkDeployment deployment) {
+        return getConfig(
+                deployment.getMetadata(), ReconciliationUtils.getDeployedSpec(deployment), false);
+    }
+
+    public Configuration getDeployConfig(ObjectMeta objectMeta, FlinkDeploymentSpec spec) {
+        return getConfig(objectMeta, spec, true);
+    }
+
+    @SneakyThrows
+    private Configuration getConfig(
+            ObjectMeta objectMeta, FlinkDeploymentSpec spec, boolean forDeploy) {
+
+        var key =
+                Tuple4.of(
+                        objectMeta.getNamespace(),
+                        objectMeta.getName(),
+                        objectMapper.convertValue(spec, ObjectNode.class),
+                        forDeploy);
+
+        var conf = cache.getIfPresent(key);
+        if (conf == null) {
+            conf = generateConfig(key);
+            cache.put(key, conf);
+        }
+
+        // Always return a copy of the configuration to avoid polluting the cache
+        return conf.clone();
+    }
+
+    private Configuration generateConfig(Tuple4<String, String, ObjectNode, Boolean> key) {
+        try {
+            LOG.info("Generating new {} config", key.f3 ? "deployment" : "observe");
+            return FlinkConfigBuilder.buildFrom(
+                    key.f0,
+                    key.f1,
+                    objectMapper.convertValue(key.f2, FlinkDeploymentSpec.class),
+                    defaultConfig,
+                    key.f3);
+        } catch (Exception e) {
+            throw new RuntimeException("Failed to load configuration", e);
+        }
+    }
+
+    private void scheduleConfigWatcher(KubernetesClient client) {
+        var checkInterval = defaultConfig.get(OPERATOR_DYNAMIC_CONFIG_CHECK_INTERVAL);
+        var millis = checkInterval.toMillis();
+        configUpdater = new ConfigUpdater(client);
+        Executors.newSingleThreadScheduledExecutor()
+                .scheduleAtFixedRate(configUpdater, millis, millis, TimeUnit.MILLISECONDS);
+        LOG.info("Enabled dynamic config updates, checking config changes every {}", checkInterval);
+    }
+
+    @VisibleForTesting
+    public void setWatchedNamespaces(Set<String> namespaces) {
+        this.namespaces = namespaces;
+        operatorConfiguration =
+                FlinkOperatorConfiguration.fromConfiguration(defaultConfig, namespaces);
+    }
+
+    @VisibleForTesting
+    public Runnable getConfigUpdater() {
+        return configUpdater;
+    }
+
+    private static Configuration readConfiguration(KubernetesClient client) {
+        var operatorNamespace = EnvUtils.getOrDefault(EnvUtils.ENV_OPERATOR_NAMESPACE, "default");
+        var configMap =
+                client.configMaps().inNamespace(operatorNamespace).withName(OP_CM_NAME).get();
+        Map<String, String> data = configMap.getData();
+        Path tmpDir = null;
+        try {
+            tmpDir = Files.createTempDirectory("flink-conf");
+            Files.write(
+                    tmpDir.resolve(GlobalConfiguration.FLINK_CONF_FILENAME),
+                    data.get(GlobalConfiguration.FLINK_CONF_FILENAME)
+                            .getBytes(Charset.defaultCharset()));
+            return GlobalConfiguration.loadConfiguration(tmpDir.toAbsolutePath().toString());
+        } catch (IOException ioe) {
+            throw new RuntimeException("Could not load default config", ioe);
+        } finally {
+            if (tmpDir != null) {
+                FileUtils.deleteDirectoryQuietly(tmpDir.toFile());
+            }
+        }
+    }
+
+    class ConfigUpdater implements Runnable {
+        KubernetesClient kubeClient;
+
+        public ConfigUpdater(KubernetesClient kubeClient) {
+            this.kubeClient = kubeClient;
+        }
+
+        public void run() {
+            try {
+                LOG.debug("Checking for config update changes...");
+                var newConf = readConfiguration(kubeClient);

Review Comment:
   Nice, that will make this much simpler, I agree.





[GitHub] [flink-kubernetes-operator] gyfora commented on a diff in pull request #177: [FLINK-27303][FLINK-27309] Introduce FlinkConfigManager for efficient config management

Posted by GitBox <gi...@apache.org>.
gyfora commented on code in PR #177:
URL: https://github.com/apache/flink-kubernetes-operator/pull/177#discussion_r857134764


##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkConfigBuilder.java:
##########
@@ -177,19 +192,21 @@ public FlinkConfigBuilder applyJobManagerSpec() throws IOException {
         return this;
     }
 
-    public FlinkConfigBuilder applyTaskManagerSpec() throws IOException {
+    protected FlinkConfigBuilder applyTaskManagerSpec() throws IOException {
         if (spec.getTaskManager() != null) {
             setResource(spec.getTaskManager().getResource(), effectiveConfig, false);
-            setPodTemplate(
-                    spec.getPodTemplate(),
-                    spec.getTaskManager().getPodTemplate(),
-                    effectiveConfig,
-                    false);
+            if (forDeployment) {
+                setPodTemplate(

Review Comment:
   I will try to add that, good idea





[GitHub] [flink-kubernetes-operator] gyfora commented on pull request #177: [FLINK-27303][FLINK-27309] Introduce FlinkConfigManager for efficient config management

Posted by GitBox <gi...@apache.org>.
gyfora commented on PR #177:
URL: https://github.com/apache/flink-kubernetes-operator/pull/177#issuecomment-1106114156

   > Just a quick note: instead of creating a watch on the ConfigMap, I prefer to simply use `ScheduledExecutorService#scheduleAtFixedRate` to detect default configuration file changes on the fly. It is similar to `monitorInterval` in log4j. Because a naked watch is not stable and could be closed in many cases, we would need to use an informer instead, but an informer might be overkill.
   
   Makes sense @wangyang0918, that's a simple change to make it more robust.




[GitHub] [flink-kubernetes-operator] gyfora commented on a diff in pull request #177: [FLINK-27303][FLINK-27309] Introduce FlinkConfigManager for efficient config management

Posted by GitBox <gi...@apache.org>.
gyfora commented on code in PR #177:
URL: https://github.com/apache/flink-kubernetes-operator/pull/177#discussion_r857758034


##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkConfigManager.java:
##########
@@ -0,0 +1,193 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.config;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.GlobalConfiguration;
+import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
+import org.apache.flink.kubernetes.operator.crd.spec.FlinkDeploymentSpec;
+import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
+import org.apache.flink.kubernetes.operator.utils.OperatorUtils;
+
+import org.apache.flink.shaded.guava30.com.google.common.cache.Cache;
+import org.apache.flink.shaded.guava30.com.google.common.cache.CacheBuilder;
+import org.apache.flink.shaded.guava30.com.google.common.cache.CacheLoader;
+import org.apache.flink.shaded.guava30.com.google.common.cache.LoadingCache;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import io.fabric8.kubernetes.api.model.ObjectMeta;
+import lombok.Builder;
+import lombok.SneakyThrows;
+import lombok.Value;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.Duration;
+import java.util.Set;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+
+import static org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions.OPERATOR_DYNAMIC_CONFIG_CHECK_INTERVAL;
+import static org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions.OPERATOR_DYNAMIC_CONFIG_ENABLED;
+
+/** Configuration manager for the Flink operator. */
+public class FlinkConfigManager {
+
+    private static final Logger LOG = LoggerFactory.getLogger(FlinkConfigManager.class);
+    private static final ObjectMapper objectMapper = new ObjectMapper();
+
+    private static final int MAX_CACHE_SIZE = 1000;
+    private static final Duration CACHE_TIMEOUT = Duration.ofMinutes(30);
+
+    private volatile Configuration defaultConfig;

Review Comment:
   I think `baseConfiguration` would also be appropriate.





[GitHub] [flink-kubernetes-operator] Aitozi commented on a diff in pull request #177: [FLINK-27303][FLINK-27309] Introduce FlinkConfigManager for efficient config management

Posted by GitBox <gi...@apache.org>.
Aitozi commented on code in PR #177:
URL: https://github.com/apache/flink-kubernetes-operator/pull/177#discussion_r857472264


##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkConfigManager.java:
##########
@@ -0,0 +1,183 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.config;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.java.tuple.Tuple4;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.GlobalConfiguration;
+import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
+import org.apache.flink.kubernetes.operator.crd.spec.FlinkDeploymentSpec;
+import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
+import org.apache.flink.kubernetes.operator.utils.OperatorUtils;
+
+import org.apache.flink.shaded.guava30.com.google.common.cache.Cache;
+import org.apache.flink.shaded.guava30.com.google.common.cache.CacheBuilder;
+import org.apache.flink.shaded.guava30.com.google.common.cache.CacheLoader;
+import org.apache.flink.shaded.guava30.com.google.common.cache.LoadingCache;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import io.fabric8.kubernetes.api.model.ObjectMeta;
+import lombok.SneakyThrows;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.Duration;
+import java.util.Set;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+
+import static org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions.OPERATOR_DYNAMIC_CONFIG_CHECK_INTERVAL;
+import static org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions.OPERATOR_DYNAMIC_CONFIG_ENABLED;
+
+/** Configuration manager for the Flink operator. */
+public class FlinkConfigManager {
+
+    private static final Logger LOG = LoggerFactory.getLogger(FlinkConfigManager.class);
+    private static final ObjectMapper objectMapper = new ObjectMapper();
+
+    private static final int MAX_CACHE_SIZE = 1000;
+    private static final Duration CACHE_TIMEOUT = Duration.ofMinutes(30);
+
+    private volatile Configuration defaultConfig;
+    private volatile FlinkOperatorConfiguration operatorConfiguration;
+    private final AtomicLong defaultConfigVersion = new AtomicLong(0);
+
+    private final LoadingCache<Tuple4<Long, String, String, ObjectNode>, Configuration> cache;
+    private Set<String> namespaces = OperatorUtils.getWatchedNamespaces();
+
+    public FlinkConfigManager() {
+        this(GlobalConfiguration.loadConfiguration());
+    }
+
+    public FlinkConfigManager(Configuration defaultConfig) {
+        this.cache =
+                CacheBuilder.newBuilder()
+                        .maximumSize(MAX_CACHE_SIZE)
+                        .expireAfterAccess(CACHE_TIMEOUT)
+                        .removalListener(
+                                removalNotification ->
+                                        FlinkConfigBuilder.cleanupTmpFiles(
+                                                (Configuration) removalNotification.getValue()))
+                        .build(
+                                new CacheLoader<>() {
+                                    @Override
+                                    public Configuration load(
+                                            Tuple4<Long, String, String, ObjectNode> k) {
+                                        return generateConfig(k);
+                                    }
+                                });
+
+        updateDefaultConfig(defaultConfig);
+        if (defaultConfig.getBoolean(OPERATOR_DYNAMIC_CONFIG_ENABLED)) {
+            scheduleConfigWatcher();
+        }
+    }
+
+    public Configuration getDefaultConfig() {
+        return defaultConfig;
+    }
+
+    @VisibleForTesting
+    public void updateDefaultConfig(Configuration newConf) {
+        if (newConf.equals(defaultConfig)) {
+            LOG.info("Default configuration did not change, nothing to do...");
+            return;
+        }
+
+        LOG.info("Updating default configuration");
+        this.operatorConfiguration =
+                FlinkOperatorConfiguration.fromConfiguration(newConf, namespaces);
+        this.defaultConfig = newConf.clone();
+        // We do not invalidate the cache to avoid deleting currently used temp files, simply bump
+        // the version
+        this.defaultConfigVersion.incrementAndGet();
+    }
+
+    public FlinkOperatorConfiguration getOperatorConfiguration() {
+        return operatorConfiguration;
+    }
+
+    public Configuration getDeployConfig(ObjectMeta objectMeta, FlinkDeploymentSpec spec) {
+        return getConfig(objectMeta, spec);
+    }
+
+    public Configuration getObserveConfig(FlinkDeployment deployment) {
+        return getConfig(deployment.getMetadata(), ReconciliationUtils.getDeployedSpec(deployment));
+    }
+
+    @SneakyThrows
+    private Configuration getConfig(ObjectMeta objectMeta, FlinkDeploymentSpec spec) {
+        var key =
+                Tuple4.of(
+                        defaultConfigVersion.get(),
+                        objectMeta.getNamespace(),
+                        objectMeta.getName(),
+                        objectMapper.convertValue(spec, ObjectNode.class));
+
+        // Always return a copy of the configuration to avoid polluting the cache
+        return cache.get(key).clone();
+    }
+
+    private Configuration generateConfig(Tuple4<Long, String, String, ObjectNode> key) {
+        try {
+            LOG.info("Generating new config");
+            return FlinkConfigBuilder.buildFrom(
+                    key.f1,
+                    key.f2,
+                    objectMapper.convertValue(key.f3, FlinkDeploymentSpec.class),
+                    defaultConfig);
+        } catch (Exception e) {
+            throw new RuntimeException("Failed to load configuration", e);
+        }
+    }
+
+    private void scheduleConfigWatcher() {
+        var checkInterval = defaultConfig.get(OPERATOR_DYNAMIC_CONFIG_CHECK_INTERVAL);

Review Comment:
   nit: we can directly use `defaultConfig.get(OPERATOR_DYNAMIC_CONFIG_CHECK_INTERVAL).toMillis();`



##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkConfigManager.java:
##########
@@ -0,0 +1,183 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.config;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.java.tuple.Tuple4;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.GlobalConfiguration;
+import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
+import org.apache.flink.kubernetes.operator.crd.spec.FlinkDeploymentSpec;
+import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
+import org.apache.flink.kubernetes.operator.utils.OperatorUtils;
+
+import org.apache.flink.shaded.guava30.com.google.common.cache.Cache;
+import org.apache.flink.shaded.guava30.com.google.common.cache.CacheBuilder;
+import org.apache.flink.shaded.guava30.com.google.common.cache.CacheLoader;
+import org.apache.flink.shaded.guava30.com.google.common.cache.LoadingCache;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import io.fabric8.kubernetes.api.model.ObjectMeta;
+import lombok.SneakyThrows;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.Duration;
+import java.util.Set;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+
+import static org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions.OPERATOR_DYNAMIC_CONFIG_CHECK_INTERVAL;
+import static org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions.OPERATOR_DYNAMIC_CONFIG_ENABLED;
+
+/** Configuration manager for the Flink operator. */
+public class FlinkConfigManager {
+
+    private static final Logger LOG = LoggerFactory.getLogger(FlinkConfigManager.class);
+    private static final ObjectMapper objectMapper = new ObjectMapper();
+
+    private static final int MAX_CACHE_SIZE = 1000;
+    private static final Duration CACHE_TIMEOUT = Duration.ofMinutes(30);
+
+    private volatile Configuration defaultConfig;
+    private volatile FlinkOperatorConfiguration operatorConfiguration;
+    private final AtomicLong defaultConfigVersion = new AtomicLong(0);
+
+    private final LoadingCache<Tuple4<Long, String, String, ObjectNode>, Configuration> cache;

Review Comment:
   Can we replace the `Tuple4` key with a simple inner class? It would make the meaning of the key fields clearer.
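   For illustration, a minimal sketch of such a key class (field names are assumptions, not taken from the PR; Lombok is already used in this module):

   import com.fasterxml.jackson.databind.node.ObjectNode;
   import lombok.Value;

   /** Sketch of a dedicated cache key replacing the Tuple4. */
   @Value
   class ConfigCacheKey {
       long defaultConfigVersion; // bumped whenever the default config changes
       String namespace;          // FlinkDeployment namespace
       String name;               // FlinkDeployment name
       ObjectNode spec;           // serialized FlinkDeploymentSpec
   }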





[GitHub] [flink-kubernetes-operator] morhidi commented on a diff in pull request #177: [FLINK-27303][FLINK-27309] Introduce FlinkConfigManager for efficient config management

Posted by GitBox <gi...@apache.org>.
morhidi commented on code in PR #177:
URL: https://github.com/apache/flink-kubernetes-operator/pull/177#discussion_r857730608


##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkConfigManager.java:
##########
@@ -0,0 +1,193 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.config;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.GlobalConfiguration;
+import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
+import org.apache.flink.kubernetes.operator.crd.spec.FlinkDeploymentSpec;
+import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
+import org.apache.flink.kubernetes.operator.utils.OperatorUtils;
+
+import org.apache.flink.shaded.guava30.com.google.common.cache.Cache;
+import org.apache.flink.shaded.guava30.com.google.common.cache.CacheBuilder;
+import org.apache.flink.shaded.guava30.com.google.common.cache.CacheLoader;
+import org.apache.flink.shaded.guava30.com.google.common.cache.LoadingCache;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import io.fabric8.kubernetes.api.model.ObjectMeta;
+import lombok.Builder;
+import lombok.SneakyThrows;
+import lombok.Value;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.Duration;
+import java.util.Set;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+
+import static org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions.OPERATOR_DYNAMIC_CONFIG_CHECK_INTERVAL;
+import static org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions.OPERATOR_DYNAMIC_CONFIG_ENABLED;
+
+/** Configuration manager for the Flink operator. */
+public class FlinkConfigManager {
+
+    private static final Logger LOG = LoggerFactory.getLogger(FlinkConfigManager.class);
+    private static final ObjectMapper objectMapper = new ObjectMapper();
+
+    private static final int MAX_CACHE_SIZE = 1000;
+    private static final Duration CACHE_TIMEOUT = Duration.ofMinutes(30);
+
+    private volatile Configuration defaultConfig;

Review Comment:
   Why don't we call it `flinkConfig`? `defaultConfig` is a bit overloaded here





[GitHub] [flink-kubernetes-operator] Aitozi commented on a diff in pull request #177: [FLINK-27303][FLINK-27309] Introduce FlinkConfigManager for efficient config management

Posted by GitBox <gi...@apache.org>.
Aitozi commented on code in PR #177:
URL: https://github.com/apache/flink-kubernetes-operator/pull/177#discussion_r857135596


##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkConfigManager.java:
##########
@@ -0,0 +1,214 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.config;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.java.tuple.Tuple4;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.GlobalConfiguration;
+import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
+import org.apache.flink.kubernetes.operator.crd.spec.FlinkDeploymentSpec;
+import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
+import org.apache.flink.kubernetes.operator.utils.EnvUtils;
+import org.apache.flink.kubernetes.operator.utils.OperatorUtils;
+import org.apache.flink.util.FileUtils;
+
+import org.apache.flink.shaded.guava30.com.google.common.cache.Cache;
+import org.apache.flink.shaded.guava30.com.google.common.cache.CacheBuilder;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import io.fabric8.kubernetes.api.model.ObjectMeta;
+import io.fabric8.kubernetes.client.KubernetesClient;
+import lombok.SneakyThrows;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.nio.charset.Charset;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions.OPERATOR_DYNAMIC_CONFIG_CHECK_INTERVAL;
+import static org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions.OPERATOR_DYNAMIC_CONFIG_ENABLED;
+
+/** Configuration manager for the Flink operator. */
+public class FlinkConfigManager {
+
+    private static final Logger LOG = LoggerFactory.getLogger(FlinkConfigManager.class);
+    private static final ObjectMapper objectMapper = new ObjectMapper();
+
+    public static final String OP_CM_NAME = "flink-operator-config";
+
+    private static final int CACHE_SIZE = 1000;
+
+    private volatile Configuration defaultConfig;
+    private volatile FlinkOperatorConfiguration operatorConfiguration;
+
+    private final Cache<Tuple4<String, String, ObjectNode, Boolean>, Configuration> cache;
+    private Set<String> namespaces = OperatorUtils.getWatchedNamespaces();
+    private ConfigUpdater configUpdater;
+
+    public FlinkConfigManager(KubernetesClient client) {
+        this(readConfiguration(client));
+
+        if (defaultConfig.getBoolean(OPERATOR_DYNAMIC_CONFIG_ENABLED)) {
+            scheduleConfigWatcher(client);
+        }
+    }
+
+    public FlinkConfigManager(Configuration defaultConfig) {
+        this.cache = CacheBuilder.newBuilder().maximumSize(CACHE_SIZE).build();
+        updateDefaultConfig(
+                defaultConfig,
+                FlinkOperatorConfiguration.fromConfiguration(defaultConfig, namespaces));
+    }
+
+    public Configuration getDefaultConfig() {
+        return defaultConfig;
+    }
+
+    @VisibleForTesting
+    protected void updateDefaultConfig(
+            Configuration defaultConfig, FlinkOperatorConfiguration operatorConfiguration) {
+        this.defaultConfig = defaultConfig;
+        cache.invalidateAll();
+        this.operatorConfiguration = operatorConfiguration;
+    }
+
+    public FlinkOperatorConfiguration getOperatorConfiguration() {
+        return operatorConfiguration;
+    }
+
+    public Configuration getObserveConfig(FlinkDeployment deployment) {
+        return getConfig(
+                deployment.getMetadata(), ReconciliationUtils.getDeployedSpec(deployment), false);
+    }
+
+    public Configuration getDeployConfig(ObjectMeta objectMeta, FlinkDeploymentSpec spec) {
+        return getConfig(objectMeta, spec, true);
+    }
+
+    @SneakyThrows
+    private Configuration getConfig(
+            ObjectMeta objectMeta, FlinkDeploymentSpec spec, boolean forDeploy) {
+
+        var key =
+                Tuple4.of(
+                        objectMeta.getNamespace(),
+                        objectMeta.getName(),
+                        objectMapper.convertValue(spec, ObjectNode.class),
+                        forDeploy);
+
+        var conf = cache.getIfPresent(key);
+        if (conf == null) {
+            conf = generateConfig(key);
+            cache.put(key, conf);
+        }
+
+        // Always return a copy of the configuration to avoid polluting the cache
+        return conf.clone();
+    }
+
+    private Configuration generateConfig(Tuple4<String, String, ObjectNode, Boolean> key) {
+        try {
+            LOG.info("Generating new {} config", key.f3 ? "deployment" : "observe");
+            return FlinkConfigBuilder.buildFrom(
+                    key.f0,
+                    key.f1,
+                    objectMapper.convertValue(key.f2, FlinkDeploymentSpec.class),
+                    defaultConfig,
+                    key.f3);
+        } catch (Exception e) {
+            throw new RuntimeException("Failed to load configuration", e);
+        }
+    }
+
+    private void scheduleConfigWatcher(KubernetesClient client) {
+        var checkInterval = defaultConfig.get(OPERATOR_DYNAMIC_CONFIG_CHECK_INTERVAL);
+        var millis = checkInterval.toMillis();
+        configUpdater = new ConfigUpdater(client);
+        Executors.newSingleThreadScheduledExecutor()
+                .scheduleAtFixedRate(configUpdater, millis, millis, TimeUnit.MILLISECONDS);
+        LOG.info("Enabled dynamic config updates, checking config changes every {}", checkInterval);
+    }
+
+    @VisibleForTesting
+    public void setWatchedNamespaces(Set<String> namespaces) {
+        this.namespaces = namespaces;
+        operatorConfiguration =
+                FlinkOperatorConfiguration.fromConfiguration(defaultConfig, namespaces);
+    }
+
+    @VisibleForTesting
+    public Runnable getConfigUpdater() {
+        return configUpdater;
+    }
+
+    private static Configuration readConfiguration(KubernetesClient client) {
+        var operatorNamespace = EnvUtils.getOrDefault(EnvUtils.ENV_OPERATOR_NAMESPACE, "default");
+        var configMap =
+                client.configMaps().inNamespace(operatorNamespace).withName(OP_CM_NAME).get();
+        Map<String, String> data = configMap.getData();
+        Path tmpDir = null;
+        try {
+            tmpDir = Files.createTempDirectory("flink-conf");
+            Files.write(
+                    tmpDir.resolve(GlobalConfiguration.FLINK_CONF_FILENAME),
+                    data.get(GlobalConfiguration.FLINK_CONF_FILENAME)
+                            .getBytes(Charset.defaultCharset()));
+            return GlobalConfiguration.loadConfiguration(tmpDir.toAbsolutePath().toString());
+        } catch (IOException ioe) {
+            throw new RuntimeException("Could not load default config", ioe);
+        } finally {
+            if (tmpDir != null) {
+                FileUtils.deleteDirectoryQuietly(tmpDir.toFile());
+            }
+        }
+    }
+
+    class ConfigUpdater implements Runnable {
+        KubernetesClient kubeClient;
+
+        public ConfigUpdater(KubernetesClient kubeClient) {
+            this.kubeClient = kubeClient;
+        }
+
+        public void run() {
+            try {
+                LOG.debug("Checking for config update changes...");
+                var newConf = readConfiguration(kubeClient);

Review Comment:
   As far as I know, it is.





[GitHub] [flink-kubernetes-operator] Aitozi commented on a diff in pull request #177: [FLINK-27303][FLINK-27309] Introduce FlinkConfigManager for efficient config management

Posted by GitBox <gi...@apache.org>.
Aitozi commented on code in PR #177:
URL: https://github.com/apache/flink-kubernetes-operator/pull/177#discussion_r857214801


##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkConfigBuilder.java:
##########
@@ -177,19 +192,21 @@ public FlinkConfigBuilder applyJobManagerSpec() throws IOException {
         return this;
     }
 
-    public FlinkConfigBuilder applyTaskManagerSpec() throws IOException {
+    protected FlinkConfigBuilder applyTaskManagerSpec() throws IOException {
         if (spec.getTaskManager() != null) {
             setResource(spec.getTaskManager().getResource(), effectiveConfig, false);
-            setPodTemplate(
-                    spec.getPodTemplate(),
-                    spec.getTaskManager().getPodTemplate(),
-                    effectiveConfig,
-                    false);
+            if (forDeployment) {
+                setPodTemplate(

Review Comment:
   Thinking it over again, this may introduce a sync problem if the config is only used to deploy and the async replacement is scheduled. Maybe the cache lifecycle could be aligned with the job lifecycle, i.e. we clean up the cache and the config in the cleanup method? WDYT?
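   For what it's worth, a rough sketch of that idea (the hook below is hypothetical and not part of this PR):

   // Hypothetical hook on FlinkConfigManager, invoked from the reconciler's cleanup path.
   public void onDeploymentCleanup(ObjectMeta meta, FlinkDeploymentSpec spec) {
       var key =
               Tuple4.of(
                       defaultConfigVersion.get(),
                       meta.getNamespace(),
                       meta.getName(),
                       objectMapper.convertValue(spec, ObjectNode.class));
       // Evicting the entry triggers the removal listener, which deletes the generated temp files.
       cache.invalidate(key);
   }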





[GitHub] [flink-kubernetes-operator] gyfora commented on pull request #177: [FLINK-27303][FLINK-27309] Introduce FlinkConfigManager for efficient config management

Posted by GitBox <gi...@apache.org>.
gyfora commented on PR #177:
URL: https://github.com/apache/flink-kubernetes-operator/pull/177#issuecomment-1105455646

   cc @Aitozi @bgeng777 @morhidi @tweise @wangyang0918 




[GitHub] [flink-kubernetes-operator] Aitozi commented on a diff in pull request #177: [FLINK-27303][FLINK-27309] Introduce FlinkConfigManager for efficient config management

Posted by GitBox <gi...@apache.org>.
Aitozi commented on code in PR #177:
URL: https://github.com/apache/flink-kubernetes-operator/pull/177#discussion_r857132299


##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkConfigBuilder.java:
##########
@@ -55,31 +54,45 @@
 
 /** Builder to get effective flink config from {@link FlinkDeployment}. */
 public class FlinkConfigBuilder {
-    private final ObjectMeta meta;
+    private final String namespace;
+    private final String clusterId;
     private final FlinkDeploymentSpec spec;
     private final Configuration effectiveConfig;
-
-    public static final Duration DEFAULT_CHECKPOINTING_INTERVAL = Duration.ofMinutes(5);
-
-    public FlinkConfigBuilder(FlinkDeployment deploy, Configuration flinkConfig) {
-        this(deploy.getMetadata(), deploy.getSpec(), flinkConfig);
+    private final boolean forDeployment;
+
+    protected static final Duration DEFAULT_CHECKPOINTING_INTERVAL = Duration.ofMinutes(5);
+
+    protected FlinkConfigBuilder(
+            FlinkDeployment deployment, Configuration flinkConfig, boolean forDeployment) {
+        this(
+                deployment.getMetadata().getNamespace(),
+                deployment.getMetadata().getName(),
+                deployment.getSpec(),
+                flinkConfig,
+                forDeployment);
     }
 
-    public FlinkConfigBuilder(
-            ObjectMeta metadata, FlinkDeploymentSpec spec, Configuration flinkConfig) {
-        this.meta = metadata;
+    protected FlinkConfigBuilder(
+            String namespace,
+            String clusterId,
+            FlinkDeploymentSpec spec,
+            Configuration flinkConfig,
+            boolean forDeployment) {
+        this.namespace = namespace;
+        this.clusterId = clusterId;
         this.spec = spec;
         this.effectiveConfig = new Configuration(flinkConfig);
+        this.forDeployment = forDeployment;

Review Comment:
   Should we add a comment for `forDeployment`? It seems to exist mainly to avoid creating the temporary pod template files, right?



##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkConfigManager.java:
##########
@@ -0,0 +1,214 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.config;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.java.tuple.Tuple4;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.GlobalConfiguration;
+import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
+import org.apache.flink.kubernetes.operator.crd.spec.FlinkDeploymentSpec;
+import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
+import org.apache.flink.kubernetes.operator.utils.EnvUtils;
+import org.apache.flink.kubernetes.operator.utils.OperatorUtils;
+import org.apache.flink.util.FileUtils;
+
+import org.apache.flink.shaded.guava30.com.google.common.cache.Cache;
+import org.apache.flink.shaded.guava30.com.google.common.cache.CacheBuilder;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import io.fabric8.kubernetes.api.model.ObjectMeta;
+import io.fabric8.kubernetes.client.KubernetesClient;
+import lombok.SneakyThrows;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.nio.charset.Charset;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions.OPERATOR_DYNAMIC_CONFIG_CHECK_INTERVAL;
+import static org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions.OPERATOR_DYNAMIC_CONFIG_ENABLED;
+
+/** Configuration manager for the Flink operator. */
+public class FlinkConfigManager {
+
+    private static final Logger LOG = LoggerFactory.getLogger(FlinkConfigManager.class);
+    private static final ObjectMapper objectMapper = new ObjectMapper();
+
+    public static final String OP_CM_NAME = "flink-operator-config";
+
+    private static final int CACHE_SIZE = 1000;
+
+    private volatile Configuration defaultConfig;
+    private volatile FlinkOperatorConfiguration operatorConfiguration;
+
+    private final Cache<Tuple4<String, String, ObjectNode, Boolean>, Configuration> cache;
+    private Set<String> namespaces = OperatorUtils.getWatchedNamespaces();
+    private ConfigUpdater configUpdater;
+
+    public FlinkConfigManager(KubernetesClient client) {
+        this(readConfiguration(client));
+
+        if (defaultConfig.getBoolean(OPERATOR_DYNAMIC_CONFIG_ENABLED)) {
+            scheduleConfigWatcher(client);
+        }
+    }
+
+    public FlinkConfigManager(Configuration defaultConfig) {
+        this.cache = CacheBuilder.newBuilder().maximumSize(CACHE_SIZE).build();
+        updateDefaultConfig(
+                defaultConfig,
+                FlinkOperatorConfiguration.fromConfiguration(defaultConfig, namespaces));
+    }
+
+    public Configuration getDefaultConfig() {
+        return defaultConfig;
+    }
+
+    @VisibleForTesting
+    protected void updateDefaultConfig(
+            Configuration defaultConfig, FlinkOperatorConfiguration operatorConfiguration) {
+        this.defaultConfig = defaultConfig;
+        cache.invalidateAll();
+        this.operatorConfiguration = operatorConfiguration;
+    }
+
+    public FlinkOperatorConfiguration getOperatorConfiguration() {
+        return operatorConfiguration;
+    }
+
+    public Configuration getObserveConfig(FlinkDeployment deployment) {
+        return getConfig(
+                deployment.getMetadata(), ReconciliationUtils.getDeployedSpec(deployment), false);
+    }
+
+    public Configuration getDeployConfig(ObjectMeta objectMeta, FlinkDeploymentSpec spec) {
+        return getConfig(objectMeta, spec, true);
+    }
+
+    @SneakyThrows
+    private Configuration getConfig(
+            ObjectMeta objectMeta, FlinkDeploymentSpec spec, boolean forDeploy) {
+
+        var key =
+                Tuple4.of(
+                        objectMeta.getNamespace(),
+                        objectMeta.getName(),
+                        objectMapper.convertValue(spec, ObjectNode.class),
+                        forDeploy);
+
+        var conf = cache.getIfPresent(key);
+        if (conf == null) {
+            conf = generateConfig(key);
+            cache.put(key, conf);
+        }
+
+        // Always return a copy of the configuration to avoid polluting the cache
+        return conf.clone();
+    }
+
+    private Configuration generateConfig(Tuple4<String, String, ObjectNode, Boolean> key) {
+        try {
+            LOG.info("Generating new {} config", key.f3 ? "deployment" : "observe");
+            return FlinkConfigBuilder.buildFrom(
+                    key.f0,
+                    key.f1,
+                    objectMapper.convertValue(key.f2, FlinkDeploymentSpec.class),
+                    defaultConfig,
+                    key.f3);
+        } catch (Exception e) {
+            throw new RuntimeException("Failed to load configuration", e);
+        }
+    }
+
+    private void scheduleConfigWatcher(KubernetesClient client) {
+        var checkInterval = defaultConfig.get(OPERATOR_DYNAMIC_CONFIG_CHECK_INTERVAL);
+        var millis = checkInterval.toMillis();
+        configUpdater = new ConfigUpdater(client);
+        Executors.newSingleThreadScheduledExecutor()
+                .scheduleAtFixedRate(configUpdater, millis, millis, TimeUnit.MILLISECONDS);
+        LOG.info("Enabled dynamic config updates, checking config changes every {}", checkInterval);
+    }
+
+    @VisibleForTesting
+    public void setWatchedNamespaces(Set<String> namespaces) {
+        this.namespaces = namespaces;
+        operatorConfiguration =
+                FlinkOperatorConfiguration.fromConfiguration(defaultConfig, namespaces);
+    }
+
+    @VisibleForTesting
+    public Runnable getConfigUpdater() {
+        return configUpdater;
+    }
+
+    private static Configuration readConfiguration(KubernetesClient client) {
+        var operatorNamespace = EnvUtils.getOrDefault(EnvUtils.ENV_OPERATOR_NAMESPACE, "default");
+        var configMap =
+                client.configMaps().inNamespace(operatorNamespace).withName(OP_CM_NAME).get();
+        Map<String, String> data = configMap.getData();
+        Path tmpDir = null;
+        try {
+            tmpDir = Files.createTempDirectory("flink-conf");
+            Files.write(
+                    tmpDir.resolve(GlobalConfiguration.FLINK_CONF_FILENAME),
+                    data.get(GlobalConfiguration.FLINK_CONF_FILENAME)
+                            .getBytes(Charset.defaultCharset()));
+            return GlobalConfiguration.loadConfiguration(tmpDir.toAbsolutePath().toString());
+        } catch (IOException ioe) {
+            throw new RuntimeException("Could not load default config", ioe);
+        } finally {
+            if (tmpDir != null) {
+                FileUtils.deleteDirectoryQuietly(tmpDir.toFile());
+            }
+        }
+    }
+
+    class ConfigUpdater implements Runnable {
+        KubernetesClient kubeClient;
+
+        public ConfigUpdater(KubernetesClient kubeClient) {
+            this.kubeClient = kubeClient;
+        }
+
+        public void run() {
+            try {
+                LOG.debug("Checking for config update changes...");
+                var newConf = readConfiguration(kubeClient);

Review Comment:
   Why not read the config directly from the mounted dir? I think we do not have to interact with the ConfigMap directly. The same applies to the `FlinkConfigManager` constructor.
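   As a sketch, reading from the mounted conf dir could look roughly like this (the env var name and default path are assumptions):

   // Sketch only: load flink-conf.yaml from the directory where the operator config is mounted.
   private static Configuration readConfigurationFromMountedDir() {
       String confDir = System.getenv().getOrDefault("FLINK_CONF_DIR", "/opt/flink/conf");
       return GlobalConfiguration.loadConfiguration(confDir);
   }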



##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkConfigBuilder.java:
##########
@@ -177,19 +192,21 @@ public FlinkConfigBuilder applyJobManagerSpec() throws IOException {
         return this;
     }
 
-    public FlinkConfigBuilder applyTaskManagerSpec() throws IOException {
+    protected FlinkConfigBuilder applyTaskManagerSpec() throws IOException {
         if (spec.getTaskManager() != null) {
             setResource(spec.getTaskManager().getResource(), effectiveConfig, false);
-            setPodTemplate(
-                    spec.getPodTemplate(),
-                    spec.getTaskManager().getPodTemplate(),
-                    effectiveConfig,
-                    false);
+            if (forDeployment) {
+                setPodTemplate(

Review Comment:
   The temp files may still stick around for a long time while the operator is running, because their cleanup relies on `deleteOnExit`. Can we delete the temp files when the cache entries expire?
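   One way to do that, roughly along the lines the PR later takes, is an access-based expiry combined with a Guava removal listener; a short sketch (RemovalListener comes from the same shaded Guava package used elsewhere in this class, and cleanupTmpFiles is the helper added in this PR):

   // Sketch: the temp files generated for a cached config are deleted when its entry is evicted.
   RemovalListener<Tuple4<Long, String, String, ObjectNode>, Configuration> cleanupListener =
           notification -> FlinkConfigBuilder.cleanupTmpFiles(notification.getValue());

   Cache<Tuple4<Long, String, String, ObjectNode>, Configuration> cache =
           CacheBuilder.newBuilder()
                   .maximumSize(1000)
                   .expireAfterAccess(Duration.ofMinutes(30))
                   .removalListener(cleanupListener)
                   .build();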





[GitHub] [flink-kubernetes-operator] Aitozi commented on a diff in pull request #177: [FLINK-27303][FLINK-27309] Introduce FlinkConfigManager for efficient config management

Posted by GitBox <gi...@apache.org>.
Aitozi commented on code in PR #177:
URL: https://github.com/apache/flink-kubernetes-operator/pull/177#discussion_r857478174


##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkConfigBuilder.java:
##########
@@ -177,19 +192,21 @@ public FlinkConfigBuilder applyJobManagerSpec() throws IOException {
         return this;
     }
 
-    public FlinkConfigBuilder applyTaskManagerSpec() throws IOException {
+    protected FlinkConfigBuilder applyTaskManagerSpec() throws IOException {
         if (spec.getTaskManager() != null) {
             setResource(spec.getTaskManager().getResource(), effectiveConfig, false);
-            setPodTemplate(
-                    spec.getPodTemplate(),
-                    spec.getTaskManager().getPodTemplate(),
-                    effectiveConfig,
-                    false);
+            if (forDeployment) {
+                setPodTemplate(

Review Comment:
   It looks good now, +1 for this, apart from two other minor comments.





[GitHub] [flink-kubernetes-operator] gyfora commented on a diff in pull request #177: [FLINK-27303][FLINK-27309] Introduce FlinkConfigManager for efficient config management

Posted by GitBox <gi...@apache.org>.
gyfora commented on code in PR #177:
URL: https://github.com/apache/flink-kubernetes-operator/pull/177#discussion_r857172750


##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkConfigManager.java:
##########
@@ -0,0 +1,214 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.config;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.java.tuple.Tuple4;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.GlobalConfiguration;
+import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
+import org.apache.flink.kubernetes.operator.crd.spec.FlinkDeploymentSpec;
+import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
+import org.apache.flink.kubernetes.operator.utils.EnvUtils;
+import org.apache.flink.kubernetes.operator.utils.OperatorUtils;
+import org.apache.flink.util.FileUtils;
+
+import org.apache.flink.shaded.guava30.com.google.common.cache.Cache;
+import org.apache.flink.shaded.guava30.com.google.common.cache.CacheBuilder;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import io.fabric8.kubernetes.api.model.ObjectMeta;
+import io.fabric8.kubernetes.client.KubernetesClient;
+import lombok.SneakyThrows;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.nio.charset.Charset;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions.OPERATOR_DYNAMIC_CONFIG_CHECK_INTERVAL;
+import static org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions.OPERATOR_DYNAMIC_CONFIG_ENABLED;
+
+/** Configuration manager for the Flink operator. */
+public class FlinkConfigManager {
+
+    private static final Logger LOG = LoggerFactory.getLogger(FlinkConfigManager.class);
+    private static final ObjectMapper objectMapper = new ObjectMapper();
+
+    public static final String OP_CM_NAME = "flink-operator-config";
+
+    private static final int CACHE_SIZE = 1000;
+
+    private volatile Configuration defaultConfig;
+    private volatile FlinkOperatorConfiguration operatorConfiguration;
+
+    private final Cache<Tuple4<String, String, ObjectNode, Boolean>, Configuration> cache;
+    private Set<String> namespaces = OperatorUtils.getWatchedNamespaces();
+    private ConfigUpdater configUpdater;
+
+    public FlinkConfigManager(KubernetesClient client) {
+        this(readConfiguration(client));
+
+        if (defaultConfig.getBoolean(OPERATOR_DYNAMIC_CONFIG_ENABLED)) {
+            scheduleConfigWatcher(client);
+        }
+    }
+
+    public FlinkConfigManager(Configuration defaultConfig) {
+        this.cache = CacheBuilder.newBuilder().maximumSize(CACHE_SIZE).build();
+        updateDefaultConfig(
+                defaultConfig,
+                FlinkOperatorConfiguration.fromConfiguration(defaultConfig, namespaces));
+    }
+
+    public Configuration getDefaultConfig() {
+        return defaultConfig;
+    }
+
+    @VisibleForTesting
+    protected void updateDefaultConfig(
+            Configuration defaultConfig, FlinkOperatorConfiguration operatorConfiguration) {
+        this.defaultConfig = defaultConfig;
+        cache.invalidateAll();
+        this.operatorConfiguration = operatorConfiguration;
+    }
+
+    public FlinkOperatorConfiguration getOperatorConfiguration() {
+        return operatorConfiguration;
+    }
+
+    public Configuration getObserveConfig(FlinkDeployment deployment) {
+        return getConfig(
+                deployment.getMetadata(), ReconciliationUtils.getDeployedSpec(deployment), false);
+    }
+
+    public Configuration getDeployConfig(ObjectMeta objectMeta, FlinkDeploymentSpec spec) {
+        return getConfig(objectMeta, spec, true);
+    }
+
+    @SneakyThrows
+    private Configuration getConfig(
+            ObjectMeta objectMeta, FlinkDeploymentSpec spec, boolean forDeploy) {
+
+        var key =
+                Tuple4.of(
+                        objectMeta.getNamespace(),
+                        objectMeta.getName(),
+                        objectMapper.convertValue(spec, ObjectNode.class),
+                        forDeploy);
+
+        var conf = cache.getIfPresent(key);
+        if (conf == null) {
+            conf = generateConfig(key);
+            cache.put(key, conf);
+        }
+
+        // Always return a copy of the configuration to avoid polluting the cache
+        return conf.clone();
+    }
+
+    private Configuration generateConfig(Tuple4<String, String, ObjectNode, Boolean> key) {
+        try {
+            LOG.info("Generating new {} config", key.f3 ? "deployment" : "observe");
+            return FlinkConfigBuilder.buildFrom(
+                    key.f0,
+                    key.f1,
+                    objectMapper.convertValue(key.f2, FlinkDeploymentSpec.class),
+                    defaultConfig,
+                    key.f3);
+        } catch (Exception e) {
+            throw new RuntimeException("Failed to load configuration", e);
+        }
+    }
+
+    private void scheduleConfigWatcher(KubernetesClient client) {
+        var checkInterval = defaultConfig.get(OPERATOR_DYNAMIC_CONFIG_CHECK_INTERVAL);
+        var millis = checkInterval.toMillis();
+        configUpdater = new ConfigUpdater(client);
+        Executors.newSingleThreadScheduledExecutor()
+                .scheduleAtFixedRate(configUpdater, millis, millis, TimeUnit.MILLISECONDS);
+        LOG.info("Enabled dynamic config updates, checking config changes every {}", checkInterval);
+    }
+
+    @VisibleForTesting
+    public void setWatchedNamespaces(Set<String> namespaces) {
+        this.namespaces = namespaces;
+        operatorConfiguration =
+                FlinkOperatorConfiguration.fromConfiguration(defaultConfig, namespaces);
+    }
+
+    @VisibleForTesting
+    public Runnable getConfigUpdater() {
+        return configUpdater;
+    }
+
+    private static Configuration readConfiguration(KubernetesClient client) {
+        var operatorNamespace = EnvUtils.getOrDefault(EnvUtils.ENV_OPERATOR_NAMESPACE, "default");
+        var configMap =
+                client.configMaps().inNamespace(operatorNamespace).withName(OP_CM_NAME).get();
+        Map<String, String> data = configMap.getData();
+        Path tmpDir = null;
+        try {
+            tmpDir = Files.createTempDirectory("flink-conf");
+            Files.write(
+                    tmpDir.resolve(GlobalConfiguration.FLINK_CONF_FILENAME),
+                    data.get(GlobalConfiguration.FLINK_CONF_FILENAME)
+                            .getBytes(Charset.defaultCharset()));
+            return GlobalConfiguration.loadConfiguration(tmpDir.toAbsolutePath().toString());
+        } catch (IOException ioe) {
+            throw new RuntimeException("Could not load default config", ioe);
+        } finally {
+            if (tmpDir != null) {
+                FileUtils.deleteDirectoryQuietly(tmpDir.toFile());
+            }
+        }
+    }
+
+    class ConfigUpdater implements Runnable {
+        KubernetesClient kubeClient;
+
+        public ConfigUpdater(KubernetesClient kubeClient) {
+            this.kubeClient = kubeClient;
+        }
+
+        public void run() {
+            try {
+                LOG.debug("Checking for config update changes...");
+                var newConf = readConfiguration(kubeClient);

Review Comment:
   I fixed this and verified that it works correctly





[GitHub] [flink-kubernetes-operator] gyfora commented on a diff in pull request #177: [FLINK-27303][FLINK-27309] Introduce FlinkConfigManager for efficient config management

Posted by GitBox <gi...@apache.org>.
gyfora commented on code in PR #177:
URL: https://github.com/apache/flink-kubernetes-operator/pull/177#discussion_r857422227


##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkConfigBuilder.java:
##########
@@ -177,19 +192,21 @@ public FlinkConfigBuilder applyJobManagerSpec() throws IOException {
         return this;
     }
 
-    public FlinkConfigBuilder applyTaskManagerSpec() throws IOException {
+    protected FlinkConfigBuilder applyTaskManagerSpec() throws IOException {
         if (spec.getTaskManager() != null) {
             setResource(spec.getTaskManager().getResource(), effectiveConfig, false);
-            setPodTemplate(
-                    spec.getPodTemplate(),
-                    spec.getTaskManager().getPodTemplate(),
-                    effectiveConfig,
-                    false);
+            if (forDeployment) {
+                setPodTemplate(

Review Comment:
   @Aitozi I have pushed a fix for this: instead of invalidating the cache I bump a config version (a long field that is part of the cache key). This avoids race conditions between default config updates and temp file cleanup.
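   Condensed from the code above, the gist of the versioned-key approach:

   // The default-config version is part of the cache key, so after an update new lookups miss
   // and regenerate configs, while stale entries simply age out and their temp files are
   // removed by the removal listener.
   private final AtomicLong defaultConfigVersion = new AtomicLong(0);

   public void updateDefaultConfig(Configuration newConf) {
       this.operatorConfiguration = FlinkOperatorConfiguration.fromConfiguration(newConf, namespaces);
       this.defaultConfig = newConf.clone();
       defaultConfigVersion.incrementAndGet(); // no cache.invalidateAll() here
   }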





[GitHub] [flink-kubernetes-operator] gyfora commented on a diff in pull request #177: [FLINK-27303][FLINK-27309] Introduce FlinkConfigManager for efficient config management

Posted by GitBox <gi...@apache.org>.
gyfora commented on code in PR #177:
URL: https://github.com/apache/flink-kubernetes-operator/pull/177#discussion_r857757055


##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkConfigManager.java:
##########
@@ -0,0 +1,193 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.config;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.GlobalConfiguration;
+import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
+import org.apache.flink.kubernetes.operator.crd.spec.FlinkDeploymentSpec;
+import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
+import org.apache.flink.kubernetes.operator.utils.OperatorUtils;
+
+import org.apache.flink.shaded.guava30.com.google.common.cache.Cache;
+import org.apache.flink.shaded.guava30.com.google.common.cache.CacheBuilder;
+import org.apache.flink.shaded.guava30.com.google.common.cache.CacheLoader;
+import org.apache.flink.shaded.guava30.com.google.common.cache.LoadingCache;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import io.fabric8.kubernetes.api.model.ObjectMeta;
+import lombok.Builder;
+import lombok.SneakyThrows;
+import lombok.Value;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.Duration;
+import java.util.Set;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+
+import static org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions.OPERATOR_DYNAMIC_CONFIG_CHECK_INTERVAL;
+import static org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions.OPERATOR_DYNAMIC_CONFIG_ENABLED;
+
+/** Configuration manager for the Flink operator. */
+public class FlinkConfigManager {
+
+    private static final Logger LOG = LoggerFactory.getLogger(FlinkConfigManager.class);
+    private static final ObjectMapper objectMapper = new ObjectMapper();
+
+    private static final int MAX_CACHE_SIZE = 1000;
+    private static final Duration CACHE_TIMEOUT = Duration.ofMinutes(30);
+
+    private volatile Configuration defaultConfig;

Review Comment:
   It's the default config onto which the `flinkConfiguration` section of each `FlinkDeployment` is applied.
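   In other words, roughly (a simplified sketch of the layering, not the exact FlinkConfigBuilder code):

   // Start from the operator-level defaults, then apply the per-deployment overrides on top.
   Configuration effectiveConfig = new Configuration(defaultConfig);
   Map<String, String> specConf = deployment.getSpec().getFlinkConfiguration();
   if (specConf != null) {
       specConf.forEach(effectiveConfig::setString);
   }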





[GitHub] [flink-kubernetes-operator] bgeng777 commented on a diff in pull request #177: [FLINK-27303][FLINK-27309] Introduce FlinkConfigManager for efficient config management

Posted by GitBox <gi...@apache.org>.
bgeng777 commented on code in PR #177:
URL: https://github.com/apache/flink-kubernetes-operator/pull/177#discussion_r857809206


##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractDeploymentReconciler.java:
##########
@@ -39,27 +39,22 @@ public abstract class AbstractDeploymentReconciler implements Reconciler<FlinkDe
 
     private static final Logger LOG = LoggerFactory.getLogger(AbstractDeploymentReconciler.class);
 
-    protected final FlinkOperatorConfiguration operatorConfiguration;
+    protected final FlinkConfigManager configManager;
     protected final KubernetesClient kubernetesClient;
     protected final FlinkService flinkService;
-    protected final Configuration defaultConfig;
 
     public AbstractDeploymentReconciler(
             KubernetesClient kubernetesClient,
             FlinkService flinkService,
-            FlinkOperatorConfiguration operatorConfiguration,
-            Configuration defaultConfig) {
-
+            FlinkConfigManager configManager) {
         this.kubernetesClient = kubernetesClient;
         this.flinkService = flinkService;
-        this.operatorConfiguration = operatorConfiguration;
-        this.defaultConfig = defaultConfig;
+        this.configManager = configManager;
     }
 
     @Override
     public DeleteControl cleanup(FlinkDeployment flinkApp, Context context) {
-        Configuration effectiveConfig = FlinkUtils.getEffectiveConfig(flinkApp, defaultConfig);
-        return shutdownAndDelete(flinkApp, effectiveConfig);
+        return shutdownAndDelete(flinkApp, configManager.getObserveConfig(flinkApp));

Review Comment:
   The older code built the config based on the current `flinkApp` spec, but IIUC `getObserveConfig` here will use the `lastReconciledSpec` or `lastStableSpec`. Is this change intentional?





[GitHub] [flink-kubernetes-operator] gyfora commented on a diff in pull request #177: [FLINK-27303][FLINK-27309] Introduce FlinkConfigManager for efficient config management

Posted by GitBox <gi...@apache.org>.
gyfora commented on code in PR #177:
URL: https://github.com/apache/flink-kubernetes-operator/pull/177#discussion_r857825101


##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractDeploymentReconciler.java:
##########
@@ -39,27 +39,22 @@ public abstract class AbstractDeploymentReconciler implements Reconciler<FlinkDe
 
     private static final Logger LOG = LoggerFactory.getLogger(AbstractDeploymentReconciler.class);
 
-    protected final FlinkOperatorConfiguration operatorConfiguration;
+    protected final FlinkConfigManager configManager;
     protected final KubernetesClient kubernetesClient;
     protected final FlinkService flinkService;
-    protected final Configuration defaultConfig;
 
     public AbstractDeploymentReconciler(
             KubernetesClient kubernetesClient,
             FlinkService flinkService,
-            FlinkOperatorConfiguration operatorConfiguration,
-            Configuration defaultConfig) {
-
+            FlinkConfigManager configManager) {
         this.kubernetesClient = kubernetesClient;
         this.flinkService = flinkService;
-        this.operatorConfiguration = operatorConfiguration;
-        this.defaultConfig = defaultConfig;
+        this.configManager = configManager;
     }
 
     @Override
     public DeleteControl cleanup(FlinkDeployment flinkApp, Context context) {
-        Configuration effectiveConfig = FlinkUtils.getEffectiveConfig(flinkApp, defaultConfig);
-        return shutdownAndDelete(flinkApp, effectiveConfig);
+        return shutdownAndDelete(flinkApp, configManager.getObserveConfig(flinkApp));

Review Comment:
   I tried to change this consistently everywhere we interact with the running job, because that job was technically deployed with either the lastReconciled or the lastStable spec, as you see here.





[GitHub] [flink-kubernetes-operator] gyfora commented on a diff in pull request #177: [FLINK-27303][FLINK-27309] Introduce FlinkConfigManager for efficient config management

Posted by GitBox <gi...@apache.org>.
gyfora commented on code in PR #177:
URL: https://github.com/apache/flink-kubernetes-operator/pull/177#discussion_r857641025


##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkConfigManager.java:
##########
@@ -0,0 +1,183 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.config;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.java.tuple.Tuple4;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.GlobalConfiguration;
+import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
+import org.apache.flink.kubernetes.operator.crd.spec.FlinkDeploymentSpec;
+import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
+import org.apache.flink.kubernetes.operator.utils.OperatorUtils;
+
+import org.apache.flink.shaded.guava30.com.google.common.cache.Cache;
+import org.apache.flink.shaded.guava30.com.google.common.cache.CacheBuilder;
+import org.apache.flink.shaded.guava30.com.google.common.cache.CacheLoader;
+import org.apache.flink.shaded.guava30.com.google.common.cache.LoadingCache;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import io.fabric8.kubernetes.api.model.ObjectMeta;
+import lombok.SneakyThrows;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.Duration;
+import java.util.Set;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+
+import static org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions.OPERATOR_DYNAMIC_CONFIG_CHECK_INTERVAL;
+import static org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions.OPERATOR_DYNAMIC_CONFIG_ENABLED;
+
+/** Configuration manager for the Flink operator. */
+public class FlinkConfigManager {
+
+    private static final Logger LOG = LoggerFactory.getLogger(FlinkConfigManager.class);
+    private static final ObjectMapper objectMapper = new ObjectMapper();
+
+    private static final int MAX_CACHE_SIZE = 1000;
+    private static final Duration CACHE_TIMEOUT = Duration.ofMinutes(30);
+
+    private volatile Configuration defaultConfig;
+    private volatile FlinkOperatorConfiguration operatorConfiguration;
+    private final AtomicLong defaultConfigVersion = new AtomicLong(0);
+
+    private final LoadingCache<Tuple4<Long, String, String, ObjectNode>, Configuration> cache;

Review Comment:
   fixed





[GitHub] [flink-kubernetes-operator] gyfora commented on pull request #177: [FLINK-27303][FLINK-27309] Introduce FlinkConfigManager for efficient config management

Posted by GitBox <gi...@apache.org>.
gyfora commented on PR #177:
URL: https://github.com/apache/flink-kubernetes-operator/pull/177#issuecomment-1106522084

   Reworked the watcher logic and added some more tests




[GitHub] [flink-kubernetes-operator] gyfora commented on a diff in pull request #177: [FLINK-27303][FLINK-27309] Introduce FlinkConfigManager for efficient config management

Posted by GitBox <gi...@apache.org>.
gyfora commented on code in PR #177:
URL: https://github.com/apache/flink-kubernetes-operator/pull/177#discussion_r857135110


##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkConfigManager.java:
##########
@@ -0,0 +1,214 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.config;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.java.tuple.Tuple4;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.GlobalConfiguration;
+import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
+import org.apache.flink.kubernetes.operator.crd.spec.FlinkDeploymentSpec;
+import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
+import org.apache.flink.kubernetes.operator.utils.EnvUtils;
+import org.apache.flink.kubernetes.operator.utils.OperatorUtils;
+import org.apache.flink.util.FileUtils;
+
+import org.apache.flink.shaded.guava30.com.google.common.cache.Cache;
+import org.apache.flink.shaded.guava30.com.google.common.cache.CacheBuilder;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import io.fabric8.kubernetes.api.model.ObjectMeta;
+import io.fabric8.kubernetes.client.KubernetesClient;
+import lombok.SneakyThrows;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.nio.charset.Charset;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions.OPERATOR_DYNAMIC_CONFIG_CHECK_INTERVAL;
+import static org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions.OPERATOR_DYNAMIC_CONFIG_ENABLED;
+
+/** Configuration manager for the Flink operator. */
+public class FlinkConfigManager {
+
+    private static final Logger LOG = LoggerFactory.getLogger(FlinkConfigManager.class);
+    private static final ObjectMapper objectMapper = new ObjectMapper();
+
+    public static final String OP_CM_NAME = "flink-operator-config";
+
+    private static final int CACHE_SIZE = 1000;
+
+    private volatile Configuration defaultConfig;
+    private volatile FlinkOperatorConfiguration operatorConfiguration;
+
+    private final Cache<Tuple4<String, String, ObjectNode, Boolean>, Configuration> cache;
+    private Set<String> namespaces = OperatorUtils.getWatchedNamespaces();
+    private ConfigUpdater configUpdater;
+
+    public FlinkConfigManager(KubernetesClient client) {
+        this(readConfiguration(client));
+
+        if (defaultConfig.getBoolean(OPERATOR_DYNAMIC_CONFIG_ENABLED)) {
+            scheduleConfigWatcher(client);
+        }
+    }
+
+    public FlinkConfigManager(Configuration defaultConfig) {
+        this.cache = CacheBuilder.newBuilder().maximumSize(CACHE_SIZE).build();
+        updateDefaultConfig(
+                defaultConfig,
+                FlinkOperatorConfiguration.fromConfiguration(defaultConfig, namespaces));
+    }
+
+    public Configuration getDefaultConfig() {
+        return defaultConfig;
+    }
+
+    @VisibleForTesting
+    protected void updateDefaultConfig(
+            Configuration defaultConfig, FlinkOperatorConfiguration operatorConfiguration) {
+        this.defaultConfig = defaultConfig;
+        cache.invalidateAll();
+        this.operatorConfiguration = operatorConfiguration;
+    }
+
+    public FlinkOperatorConfiguration getOperatorConfiguration() {
+        return operatorConfiguration;
+    }
+
+    public Configuration getObserveConfig(FlinkDeployment deployment) {
+        return getConfig(
+                deployment.getMetadata(), ReconciliationUtils.getDeployedSpec(deployment), false);
+    }
+
+    public Configuration getDeployConfig(ObjectMeta objectMeta, FlinkDeploymentSpec spec) {
+        return getConfig(objectMeta, spec, true);
+    }
+
+    @SneakyThrows
+    private Configuration getConfig(
+            ObjectMeta objectMeta, FlinkDeploymentSpec spec, boolean forDeploy) {
+
+        var key =
+                Tuple4.of(
+                        objectMeta.getNamespace(),
+                        objectMeta.getName(),
+                        objectMapper.convertValue(spec, ObjectNode.class),
+                        forDeploy);
+
+        var conf = cache.getIfPresent(key);
+        if (conf == null) {
+            conf = generateConfig(key);
+            cache.put(key, conf);
+        }
+
+        // Always return a copy of the configuration to avoid polluting the cache
+        return conf.clone();
+    }
+
+    private Configuration generateConfig(Tuple4<String, String, ObjectNode, Boolean> key) {
+        try {
+            LOG.info("Generating new {} config", key.f3 ? "deployment" : "observe");
+            return FlinkConfigBuilder.buildFrom(
+                    key.f0,
+                    key.f1,
+                    objectMapper.convertValue(key.f2, FlinkDeploymentSpec.class),
+                    defaultConfig,
+                    key.f3);
+        } catch (Exception e) {
+            throw new RuntimeException("Failed to load configuration", e);
+        }
+    }
+
+    private void scheduleConfigWatcher(KubernetesClient client) {
+        var checkInterval = defaultConfig.get(OPERATOR_DYNAMIC_CONFIG_CHECK_INTERVAL);
+        var millis = checkInterval.toMillis();
+        configUpdater = new ConfigUpdater(client);
+        Executors.newSingleThreadScheduledExecutor()
+                .scheduleAtFixedRate(configUpdater, millis, millis, TimeUnit.MILLISECONDS);
+        LOG.info("Enabled dynamic config updates, checking config changes every {}", checkInterval);
+    }
+
+    @VisibleForTesting
+    public void setWatchedNamespaces(Set<String> namespaces) {
+        this.namespaces = namespaces;
+        operatorConfiguration =
+                FlinkOperatorConfiguration.fromConfiguration(defaultConfig, namespaces);
+    }
+
+    @VisibleForTesting
+    public Runnable getConfigUpdater() {
+        return configUpdater;
+    }
+
+    private static Configuration readConfiguration(KubernetesClient client) {
+        var operatorNamespace = EnvUtils.getOrDefault(EnvUtils.ENV_OPERATOR_NAMESPACE, "default");
+        var configMap =
+                client.configMaps().inNamespace(operatorNamespace).withName(OP_CM_NAME).get();
+        Map<String, String> data = configMap.getData();
+        Path tmpDir = null;
+        try {
+            tmpDir = Files.createTempDirectory("flink-conf");
+            Files.write(
+                    tmpDir.resolve(GlobalConfiguration.FLINK_CONF_FILENAME),
+                    data.get(GlobalConfiguration.FLINK_CONF_FILENAME)
+                            .getBytes(Charset.defaultCharset()));
+            return GlobalConfiguration.loadConfiguration(tmpDir.toAbsolutePath().toString());
+        } catch (IOException ioe) {
+            throw new RuntimeException("Could not load default config", ioe);
+        } finally {
+            if (tmpDir != null) {
+                FileUtils.deleteDirectoryQuietly(tmpDir.toFile());
+            }
+        }
+    }
+
+    class ConfigUpdater implements Runnable {
+        KubernetesClient kubeClient;
+
+        public ConfigUpdater(KubernetesClient kubeClient) {
+            this.kubeClient = kubeClient;
+        }
+
+        public void run() {
+            try {
+                LOG.debug("Checking for config update changes...");
+                var newConf = readConfiguration(kubeClient);

Review Comment:
   Are the mounted files updated automatically if the ConfigMap changes?


