Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2022/04/24 14:18:26 UTC

[GitHub] [flink-kubernetes-operator] Aitozi commented on a diff in pull request #177: [FLINK-27303][FLINK-27309] Introduce FlinkConfigManager for efficient config management

Aitozi commented on code in PR #177:
URL: https://github.com/apache/flink-kubernetes-operator/pull/177#discussion_r857132299


##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkConfigBuilder.java:
##########
@@ -55,31 +54,45 @@
 
 /** Builder to get effective flink config from {@link FlinkDeployment}. */
 public class FlinkConfigBuilder {
-    private final ObjectMeta meta;
+    private final String namespace;
+    private final String clusterId;
     private final FlinkDeploymentSpec spec;
     private final Configuration effectiveConfig;
-
-    public static final Duration DEFAULT_CHECKPOINTING_INTERVAL = Duration.ofMinutes(5);
-
-    public FlinkConfigBuilder(FlinkDeployment deploy, Configuration flinkConfig) {
-        this(deploy.getMetadata(), deploy.getSpec(), flinkConfig);
+    private final boolean forDeployment;
+
+    protected static final Duration DEFAULT_CHECKPOINTING_INTERVAL = Duration.ofMinutes(5);
+
+    protected FlinkConfigBuilder(
+            FlinkDeployment deployment, Configuration flinkConfig, boolean forDeployment) {
+        this(
+                deployment.getMetadata().getNamespace(),
+                deployment.getMetadata().getName(),
+                deployment.getSpec(),
+                flinkConfig,
+                forDeployment);
     }
 
-    public FlinkConfigBuilder(
-            ObjectMeta metadata, FlinkDeploymentSpec spec, Configuration flinkConfig) {
-        this.meta = metadata;
+    protected FlinkConfigBuilder(
+            String namespace,
+            String clusterId,
+            FlinkDeploymentSpec spec,
+            Configuration flinkConfig,
+            boolean forDeployment) {
+        this.namespace = namespace;
+        this.clusterId = clusterId;
         this.spec = spec;
         this.effectiveConfig = new Configuration(flinkConfig);
+        this.forDeployment = forDeployment;

Review Comment:
   Should we add a comment for `forDeployment`? It seems to be there mainly to avoid creating the temp pod template file, right?
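   A rough sketch of the kind of comment it could carry (the wording below is my own guess at the flag's intent, not the PR author's):
   ```java
   /**
    * Whether the resulting config is built for an actual deployment. Only in
    * that case does the builder need to materialize the (merged) pod template
    * into a temporary file; observe-only configs skip that step.
    */
   private final boolean forDeployment;
   ```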



##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkConfigManager.java:
##########
@@ -0,0 +1,214 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.config;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.java.tuple.Tuple4;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.GlobalConfiguration;
+import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
+import org.apache.flink.kubernetes.operator.crd.spec.FlinkDeploymentSpec;
+import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
+import org.apache.flink.kubernetes.operator.utils.EnvUtils;
+import org.apache.flink.kubernetes.operator.utils.OperatorUtils;
+import org.apache.flink.util.FileUtils;
+
+import org.apache.flink.shaded.guava30.com.google.common.cache.Cache;
+import org.apache.flink.shaded.guava30.com.google.common.cache.CacheBuilder;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import io.fabric8.kubernetes.api.model.ObjectMeta;
+import io.fabric8.kubernetes.client.KubernetesClient;
+import lombok.SneakyThrows;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.nio.charset.Charset;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions.OPERATOR_DYNAMIC_CONFIG_CHECK_INTERVAL;
+import static org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions.OPERATOR_DYNAMIC_CONFIG_ENABLED;
+
+/** Configuration manager for the Flink operator. */
+public class FlinkConfigManager {
+
+    private static final Logger LOG = LoggerFactory.getLogger(FlinkConfigManager.class);
+    private static final ObjectMapper objectMapper = new ObjectMapper();
+
+    public static final String OP_CM_NAME = "flink-operator-config";
+
+    private static final int CACHE_SIZE = 1000;
+
+    private volatile Configuration defaultConfig;
+    private volatile FlinkOperatorConfiguration operatorConfiguration;
+
+    private final Cache<Tuple4<String, String, ObjectNode, Boolean>, Configuration> cache;
+    private Set<String> namespaces = OperatorUtils.getWatchedNamespaces();
+    private ConfigUpdater configUpdater;
+
+    public FlinkConfigManager(KubernetesClient client) {
+        this(readConfiguration(client));
+
+        if (defaultConfig.getBoolean(OPERATOR_DYNAMIC_CONFIG_ENABLED)) {
+            scheduleConfigWatcher(client);
+        }
+    }
+
+    public FlinkConfigManager(Configuration defaultConfig) {
+        this.cache = CacheBuilder.newBuilder().maximumSize(CACHE_SIZE).build();
+        updateDefaultConfig(
+                defaultConfig,
+                FlinkOperatorConfiguration.fromConfiguration(defaultConfig, namespaces));
+    }
+
+    public Configuration getDefaultConfig() {
+        return defaultConfig;
+    }
+
+    @VisibleForTesting
+    protected void updateDefaultConfig(
+            Configuration defaultConfig, FlinkOperatorConfiguration operatorConfiguration) {
+        this.defaultConfig = defaultConfig;
+        cache.invalidateAll();
+        this.operatorConfiguration = operatorConfiguration;
+    }
+
+    public FlinkOperatorConfiguration getOperatorConfiguration() {
+        return operatorConfiguration;
+    }
+
+    public Configuration getObserveConfig(FlinkDeployment deployment) {
+        return getConfig(
+                deployment.getMetadata(), ReconciliationUtils.getDeployedSpec(deployment), false);
+    }
+
+    public Configuration getDeployConfig(ObjectMeta objectMeta, FlinkDeploymentSpec spec) {
+        return getConfig(objectMeta, spec, true);
+    }
+
+    @SneakyThrows
+    private Configuration getConfig(
+            ObjectMeta objectMeta, FlinkDeploymentSpec spec, boolean forDeploy) {
+
+        var key =
+                Tuple4.of(
+                        objectMeta.getNamespace(),
+                        objectMeta.getName(),
+                        objectMapper.convertValue(spec, ObjectNode.class),
+                        forDeploy);
+
+        var conf = cache.getIfPresent(key);
+        if (conf == null) {
+            conf = generateConfig(key);
+            cache.put(key, conf);
+        }
+
+        // Always return a copy of the configuration to avoid polluting the cache
+        return conf.clone();
+    }
+
+    private Configuration generateConfig(Tuple4<String, String, ObjectNode, Boolean> key) {
+        try {
+            LOG.info("Generating new {} config", key.f3 ? "deployment" : "observe");
+            return FlinkConfigBuilder.buildFrom(
+                    key.f0,
+                    key.f1,
+                    objectMapper.convertValue(key.f2, FlinkDeploymentSpec.class),
+                    defaultConfig,
+                    key.f3);
+        } catch (Exception e) {
+            throw new RuntimeException("Failed to load configuration", e);
+        }
+    }
+
+    private void scheduleConfigWatcher(KubernetesClient client) {
+        var checkInterval = defaultConfig.get(OPERATOR_DYNAMIC_CONFIG_CHECK_INTERVAL);
+        var millis = checkInterval.toMillis();
+        configUpdater = new ConfigUpdater(client);
+        Executors.newSingleThreadScheduledExecutor()
+                .scheduleAtFixedRate(configUpdater, millis, millis, TimeUnit.MILLISECONDS);
+        LOG.info("Enabled dynamic config updates, checking config changes every {}", checkInterval);
+    }
+
+    @VisibleForTesting
+    public void setWatchedNamespaces(Set<String> namespaces) {
+        this.namespaces = namespaces;
+        operatorConfiguration =
+                FlinkOperatorConfiguration.fromConfiguration(defaultConfig, namespaces);
+    }
+
+    @VisibleForTesting
+    public Runnable getConfigUpdater() {
+        return configUpdater;
+    }
+
+    private static Configuration readConfiguration(KubernetesClient client) {
+        var operatorNamespace = EnvUtils.getOrDefault(EnvUtils.ENV_OPERATOR_NAMESPACE, "default");
+        var configMap =
+                client.configMaps().inNamespace(operatorNamespace).withName(OP_CM_NAME).get();
+        Map<String, String> data = configMap.getData();
+        Path tmpDir = null;
+        try {
+            tmpDir = Files.createTempDirectory("flink-conf");
+            Files.write(
+                    tmpDir.resolve(GlobalConfiguration.FLINK_CONF_FILENAME),
+                    data.get(GlobalConfiguration.FLINK_CONF_FILENAME)
+                            .getBytes(Charset.defaultCharset()));
+            return GlobalConfiguration.loadConfiguration(tmpDir.toAbsolutePath().toString());
+        } catch (IOException ioe) {
+            throw new RuntimeException("Could not load default config", ioe);
+        } finally {
+            if (tmpDir != null) {
+                FileUtils.deleteDirectoryQuietly(tmpDir.toFile());
+            }
+        }
+    }
+
+    class ConfigUpdater implements Runnable {
+        KubernetesClient kubeClient;
+
+        public ConfigUpdater(KubernetesClient kubeClient) {
+            this.kubeClient = kubeClient;
+        }
+
+        public void run() {
+            try {
+                LOG.debug("Checking for config update changes...");
+                var newConf = readConfiguration(kubeClient);

Review Comment:
   Why not read the config directly from the mounted dir? I don't think we have to interact with the ConfigMap through the Kubernetes client. The same applies to the `FlinkConfigManager` constructor.
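   Just as an illustration, something along these lines (a minimal sketch; `FLINK_OPERATOR_CONF_DIR` and the default path are made-up names, and it assumes the operator ConfigMap is mounted as files into the operator pod):
   ```java
   import org.apache.flink.configuration.Configuration;
   import org.apache.flink.configuration.GlobalConfiguration;

   class MountedConfigReader {
       // Reads flink-conf.yaml from the directory where the operator ConfigMap
       // is mounted, instead of fetching the ConfigMap via the Kubernetes client
       // and writing it to a temp dir first.
       static Configuration readConfiguration() {
           String confDir =
                   System.getenv().getOrDefault("FLINK_OPERATOR_CONF_DIR", "/opt/flink/conf");
           return GlobalConfiguration.loadConfiguration(confDir);
       }
   }
   ```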



##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkConfigBuilder.java:
##########
@@ -177,19 +192,21 @@ public FlinkConfigBuilder applyJobManagerSpec() throws IOException {
         return this;
     }
 
-    public FlinkConfigBuilder applyTaskManagerSpec() throws IOException {
+    protected FlinkConfigBuilder applyTaskManagerSpec() throws IOException {
         if (spec.getTaskManager() != null) {
             setResource(spec.getTaskManager().getResource(), effectiveConfig, false);
-            setPodTemplate(
-                    spec.getPodTemplate(),
-                    spec.getTaskManager().getPodTemplate(),
-                    effectiveConfig,
-                    false);
+            if (forDeployment) {
+                setPodTemplate(

Review Comment:
   The temp file may still stick around for a long time while the operator is running, because its cleanup relies on `deleteOnExit`. Can we delete the temp files when the corresponding cache entries expire?
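   For example, something like the sketch below (just an illustration: it assumes the builder records the generated template path under the TaskManager pod-template option key, and the exact key/wiring would need to match what `FlinkConfigBuilder` actually writes):
   ```java
   import java.io.File;

   import org.apache.flink.api.java.tuple.Tuple4;
   import org.apache.flink.configuration.Configuration;

   import org.apache.flink.shaded.guava30.com.google.common.cache.Cache;
   import org.apache.flink.shaded.guava30.com.google.common.cache.CacheBuilder;
   import org.apache.flink.shaded.guava30.com.google.common.cache.RemovalListener;

   import com.fasterxml.jackson.databind.node.ObjectNode;

   class ConfigCacheWithCleanup {
       private static final int CACHE_SIZE = 1000;

       // Deletes the temp pod template file (if any) together with the evicted
       // cache entry, instead of relying on deleteOnExit().
       final Cache<Tuple4<String, String, ObjectNode, Boolean>, Configuration> cache =
               CacheBuilder.newBuilder()
                       .maximumSize(CACHE_SIZE)
                       .removalListener(
                               (RemovalListener<Tuple4<String, String, ObjectNode, Boolean>, Configuration>)
                                       notification -> {
                                           // Illustrative key only; the builder would have to
                                           // record whichever temp template path it actually wrote.
                                           String template =
                                                   notification
                                                           .getValue()
                                                           .getString(
                                                                   "kubernetes.pod-template-file.taskmanager",
                                                                   null);
                                           if (template != null) {
                                               new File(template).delete();
                                           }
                                       })
                       .build();
   }
   ```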



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org