You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by gy...@apache.org on 2022/06/19 08:35:45 UTC

[flink-kubernetes-operator] branch main updated: [FLINK-27812] Support Dynamic Change of Watched Namespaces

This is an automated email from the ASF dual-hosted git repository.

gyfora pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git


The following commit(s) were added to refs/heads/main by this push:
     new 1177703  [FLINK-27812] Support Dynamic Change of Watched Namespaces
1177703 is described below

commit 11777032e3c3011c843631471c1b52aecb52dabf
Author: Matyas Orhidi <53...@users.noreply.github.com>
AuthorDate: Sun Jun 19 10:35:41 2022 +0200

    [FLINK-27812] Support Dynamic Change of Watched Namespaces
---
 .../kubernetes_operator_config_configuration.html  | 12 ++++
 .../flink/kubernetes/operator/FlinkOperator.java   | 80 +++++++++++----------
 .../operator/config/FlinkConfigManager.java        | 34 ++++++---
 .../config/FlinkOperatorConfiguration.java         | 22 +++++-
 .../config/KubernetesOperatorConfigOptions.java    | 16 +++++
 .../operator/informer/InformerManagerTest.java     | 46 ------------
 .../operator/utils/ReconciliationUtilsTest.java    |  5 +-
 flink-kubernetes-webhook/pom.xml                   |  8 +++
 .../operator/admission/FlinkOperatorWebhook.java   | 13 +++-
 .../operator/admission/FlinkValidator.java         | 12 +---
 .../admission}/informer/InformerManager.java       | 36 +++++++---
 .../operator/admission/AdmissionHandlerTest.java   |  5 +-
 .../admission/informer/InformerManagerTest.java    | 84 ++++++++++++++++++++++
 .../templates/flink-operator.yaml                  |  7 +-
 14 files changed, 256 insertions(+), 124 deletions(-)

diff --git a/docs/layouts/shortcodes/generated/kubernetes_operator_config_configuration.html b/docs/layouts/shortcodes/generated/kubernetes_operator_config_configuration.html
index 71cf1ac..1bfc9de 100644
--- a/docs/layouts/shortcodes/generated/kubernetes_operator_config_configuration.html
+++ b/docs/layouts/shortcodes/generated/kubernetes_operator_config_configuration.html
@@ -44,6 +44,12 @@
             <td>Boolean</td>
             <td>Whether to enable on-the-fly config changes through the operator configmap.</td>
         </tr>
+        <tr>
+            <td><h5>kubernetes.operator.dynamic.namespaces.enabled</h5></td>
+            <td style="word-wrap: break-word;">false</td>
+            <td>Boolean</td>
+            <td>Enables dynamic change of watched/monitored namespaces. Defaults to false</td>
+        </tr>
         <tr>
             <td><h5>kubernetes.operator.job.upgrade.ignore-pending-savepoint</h5></td>
             <td style="word-wrap: break-word;">false</td>
@@ -146,5 +152,11 @@
             <td>Map</td>
             <td>Custom HTTP header for HttpArtifactFetcher. The header will be applied when getting the session job artifacts. Expected format: headerKey1:headerValue1,headerKey2:headerValue2.</td>
         </tr>
+        <tr>
+            <td><h5>kubernetes.operator.watched.namespaces</h5></td>
+            <td style="word-wrap: break-word;">"JOSDK_ALL_NAMESPACES"</td>
+            <td>String</td>
+            <td>Comma separated list of namespaces the operator monitors for custom resources. Defaults to JOSDK_ALL_NAMESPACES</td>
+        </tr>
     </tbody>
 </table>
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/FlinkOperator.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/FlinkOperator.java
index 4770617..507ab62 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/FlinkOperator.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/FlinkOperator.java
@@ -17,17 +17,13 @@
 
 package org.apache.flink.kubernetes.operator;
 
-import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.core.plugin.PluginManager;
 import org.apache.flink.core.plugin.PluginUtils;
 import org.apache.flink.kubernetes.operator.config.FlinkConfigManager;
-import org.apache.flink.kubernetes.operator.config.FlinkOperatorConfiguration;
-import org.apache.flink.kubernetes.operator.controller.FlinkControllerConfig;
 import org.apache.flink.kubernetes.operator.controller.FlinkDeploymentController;
 import org.apache.flink.kubernetes.operator.controller.FlinkSessionJobController;
-import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
 import org.apache.flink.kubernetes.operator.crd.FlinkSessionJob;
 import org.apache.flink.kubernetes.operator.crd.status.FlinkDeploymentStatus;
 import org.apache.flink.kubernetes.operator.crd.status.FlinkSessionJobStatus;
@@ -49,15 +45,18 @@ import org.apache.flink.metrics.MetricGroup;
 import io.fabric8.kubernetes.client.DefaultKubernetesClient;
 import io.fabric8.kubernetes.client.KubernetesClient;
 import io.javaoperatorsdk.operator.Operator;
+import io.javaoperatorsdk.operator.RegisteredController;
 import io.javaoperatorsdk.operator.api.config.ConfigurationServiceOverrider;
+import io.javaoperatorsdk.operator.api.config.ControllerConfigurationOverrider;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import javax.annotation.Nullable;
 
+import java.util.HashSet;
 import java.util.Set;
+import java.util.UUID;
 import java.util.concurrent.Executors;
-import java.util.function.Consumer;
 
 /** Main Class for Flink native k8s operator. */
 public class FlinkOperator {
@@ -69,16 +68,16 @@ public class FlinkOperator {
     private final FlinkService flinkService;
     private final FlinkConfigManager configManager;
     private final Set<FlinkResourceValidator> validators;
+    private final Set<RegisteredController> registeredControllers = new HashSet<>();
     private final MetricGroup metricGroup;
 
     public FlinkOperator(@Nullable Configuration conf) {
         this.client = new DefaultKubernetesClient();
-        this.configManager = conf != null ? new FlinkConfigManager(conf) : new FlinkConfigManager();
-        this.operator =
-                new Operator(
-                        client,
-                        getConfigurationServiceOverriderConsumer(
-                                configManager.getOperatorConfiguration()));
+        this.configManager =
+                conf != null
+                        ? new FlinkConfigManager(conf) // For testing only
+                        : new FlinkConfigManager(this::handleNamespaceChanges);
+        this.operator = new Operator(client, this::overrideOperatorConfigs);
         this.flinkService = new FlinkService(client, configManager);
         this.validators = ValidatorUtils.discoverValidators(configManager);
         this.metricGroup =
@@ -88,20 +87,25 @@ public class FlinkOperator {
         FileSystem.initialize(configManager.getDefaultConfig(), pluginManager);
     }
 
-    @VisibleForTesting
-    protected static Consumer<ConfigurationServiceOverrider>
-            getConfigurationServiceOverriderConsumer(
-                    FlinkOperatorConfiguration operatorConfiguration) {
-        return overrider -> {
-            int parallelism = operatorConfiguration.getReconcilerMaxParallelism();
-            if (parallelism == -1) {
-                LOG.info("Configuring operator with unbounded reconciliation thread pool.");
-                overrider.withExecutorService(Executors.newCachedThreadPool());
-            } else {
-                LOG.info("Configuring operator with {} reconciliation threads.", parallelism);
-                overrider.withConcurrentReconciliationThreads(parallelism);
-            }
-        };
+    private void handleNamespaceChanges(Set<String> namespaces) {
+        registeredControllers.forEach(
+                controller -> {
+                    if (controller.allowsNamespaceChanges()) {
+                        LOG.info("Changing namespaces on {} to {}", controller, namespaces);
+                        controller.changeNamespaces(namespaces);
+                    }
+                });
+    }
+
+    private void overrideOperatorConfigs(ConfigurationServiceOverrider overrider) {
+        int parallelism = configManager.getOperatorConfiguration().getReconcilerMaxParallelism();
+        if (parallelism == -1) {
+            LOG.info("Configuring operator with unbounded reconciliation thread pool.");
+            overrider.withExecutorService(Executors.newCachedThreadPool());
+        } else {
+            LOG.info("Configuring operator with {} reconciliation threads.", parallelism);
+            overrider.withConcurrentReconciliationThreads(parallelism);
+        }
     }
 
     private void registerDeploymentController() {
@@ -120,12 +124,7 @@ public class FlinkOperator {
                         observerFactory,
                         new MetricManager<>(metricGroup),
                         statusHelper);
-
-        FlinkControllerConfig<FlinkDeployment> controllerConfig =
-                new FlinkControllerConfig<>(
-                        controller,
-                        configManager.getOperatorConfiguration().getWatchedNamespaces());
-        operator.register(controller, controllerConfig);
+        registeredControllers.add(operator.register(controller, this::overrideControllerConfigs));
     }
 
     private void registerSessionJobController() {
@@ -143,11 +142,20 @@ public class FlinkOperator {
                         new MetricManager<>(metricGroup),
                         statusHelper);
 
-        FlinkControllerConfig<FlinkSessionJob> controllerConfig =
-                new FlinkControllerConfig<>(
-                        controller,
-                        configManager.getOperatorConfiguration().getWatchedNamespaces());
-        operator.register(controller, controllerConfig);
+        registeredControllers.add(operator.register(controller, this::overrideControllerConfigs));
+    }
+
+    private void overrideControllerConfigs(ControllerConfigurationOverrider<?> overrider) {
+        // TODO: https://github.com/java-operator-sdk/java-operator-sdk/issues/1259
+        String[] watchedNamespaces =
+                configManager
+                        .getOperatorConfiguration()
+                        .getWatchedNamespaces()
+                        .toArray(String[]::new);
+        String fakeNs = UUID.randomUUID().toString();
+        overrider.settingNamespace(fakeNs);
+        overrider.addingNamespaces(watchedNamespaces);
+        overrider.removingNamespaces(fakeNs);
     }
 
     public void run() {
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkConfigManager.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkConfigManager.java
index 3a7c1c3..c8e4bc3 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkConfigManager.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkConfigManager.java
@@ -25,7 +25,6 @@ import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
 import org.apache.flink.kubernetes.operator.crd.FlinkSessionJob;
 import org.apache.flink.kubernetes.operator.crd.spec.FlinkDeploymentSpec;
 import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
-import org.apache.flink.kubernetes.operator.utils.EnvUtils;
 
 import org.apache.flink.shaded.guava30.com.google.common.cache.Cache;
 import org.apache.flink.shaded.guava30.com.google.common.cache.CacheBuilder;
@@ -38,16 +37,19 @@ import io.fabric8.kubernetes.api.model.ObjectMeta;
 import lombok.Builder;
 import lombok.SneakyThrows;
 import lombok.Value;
+import org.apache.commons.lang3.ObjectUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.time.Duration;
 import java.util.Map;
+import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Consumer;
 
 import static org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions.OPERATOR_DYNAMIC_CONFIG_CHECK_INTERVAL;
 import static org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions.OPERATOR_DYNAMIC_CONFIG_ENABLED;
@@ -61,15 +63,21 @@ public class FlinkConfigManager {
     private volatile Configuration defaultConfig;
     private volatile FlinkOperatorConfiguration operatorConfiguration;
     private final AtomicLong defaultConfigVersion = new AtomicLong(0);
-
     private final LoadingCache<Key, Configuration> cache;
-    private final Set<String> namespaces = EnvUtils.getWatchedNamespaces();
+    private final Consumer<Set<String>> namespaceListener;
 
-    public FlinkConfigManager() {
-        this(GlobalConfiguration.loadConfiguration());
+    @VisibleForTesting
+    public FlinkConfigManager(Configuration defaultConfig) {
+        this(defaultConfig, ns -> {});
     }
 
-    public FlinkConfigManager(Configuration defaultConfig) {
+    public FlinkConfigManager(Consumer<Set<String>> namespaceListener) {
+        this(GlobalConfiguration.loadConfiguration(), namespaceListener);
+    }
+
+    public FlinkConfigManager(
+            Configuration defaultConfig, Consumer<Set<String>> namespaceListener) {
+        this.namespaceListener = namespaceListener;
         Duration cacheTimeout =
                 defaultConfig.get(KubernetesOperatorConfigOptions.OPERATOR_CONFIG_CACHE_TIMEOUT);
         this.cache =
@@ -109,14 +117,22 @@ public class FlinkConfigManager {
 
     @VisibleForTesting
     public void updateDefaultConfig(Configuration newConf) {
-        if (newConf.equals(defaultConfig)) {
+        if (ObjectUtils.allNotNull(this.defaultConfig, newConf)
+                && this.defaultConfig.toMap().equals(newConf.toMap())) {
             LOG.info("Default configuration did not change, nothing to do...");
             return;
         }
 
         LOG.info("Updating default configuration to {}", newConf);
-        this.operatorConfiguration =
-                FlinkOperatorConfiguration.fromConfiguration(newConf, namespaces);
+        var oldNs =
+                Optional.ofNullable(this.operatorConfiguration)
+                        .map(FlinkOperatorConfiguration::getWatchedNamespaces)
+                        .orElse(Set.of());
+        this.operatorConfiguration = FlinkOperatorConfiguration.fromConfiguration(newConf);
+        var newNs = this.operatorConfiguration.getWatchedNamespaces();
+        if (this.operatorConfiguration.getDynamicNamespacesEnabled() && !oldNs.equals(newNs)) {
+            this.namespaceListener.accept(operatorConfiguration.getWatchedNamespaces());
+        }
         this.defaultConfig = newConf.clone();
         // We do not invalidate the cache to avoid deleting currently used temp files,
         // simply bump the version
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkOperatorConfiguration.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkOperatorConfiguration.java
index 29e1264..c9b1f7e 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkOperatorConfiguration.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkOperatorConfiguration.java
@@ -24,12 +24,16 @@ import org.apache.flink.kubernetes.operator.utils.EnvUtils;
 import lombok.Value;
 
 import java.time.Duration;
+import java.util.Arrays;
+import java.util.HashSet;
 import java.util.Set;
 
 /** Configuration class for operator. */
 @Value
 public class FlinkOperatorConfiguration {
 
+    private static final String NAMESPACES_SPLITTER_KEY = "\\s*,\\s*";
+
     Duration reconcileInterval;
     int reconcilerMaxParallelism;
     Duration progressCheckInterval;
@@ -37,14 +41,14 @@ public class FlinkOperatorConfiguration {
     Duration flinkClientTimeout;
     String flinkServiceHostOverride;
     Set<String> watchedNamespaces;
+    Boolean dynamicNamespacesEnabled;
     Duration flinkCancelJobTimeout;
     Duration flinkShutdownClusterTimeout;
     String artifactsBaseDir;
     Integer savepointHistoryCountThreshold;
     Duration savepointHistoryAgeThreshold;
 
-    public static FlinkOperatorConfiguration fromConfiguration(
-            Configuration operatorConfig, Set<String> watchedNamespaces) {
+    public static FlinkOperatorConfiguration fromConfiguration(Configuration operatorConfig) {
         Duration reconcileInterval =
                 operatorConfig.get(
                         KubernetesOperatorConfigOptions.OPERATOR_RECONCILER_RESCHEDULE_INTERVAL);
@@ -94,6 +98,19 @@ public class FlinkOperatorConfiguration {
             flinkServiceHostOverride = "localhost";
         }
 
+        var watchedNamespaces =
+                new HashSet<>(
+                        Arrays.asList(
+                                operatorConfig
+                                        .get(
+                                                KubernetesOperatorConfigOptions
+                                                        .OPERATOR_WATCHED_NAMESPACES)
+                                        .split(NAMESPACES_SPLITTER_KEY)));
+
+        boolean dynamicNamespacesEnabled =
+                operatorConfig.get(
+                        KubernetesOperatorConfigOptions.OPERATOR_DYNAMIC_NAMESPACES_ENABLED);
+
         return new FlinkOperatorConfiguration(
                 reconcileInterval,
                 reconcilerMaxParallelism,
@@ -102,6 +119,7 @@ public class FlinkOperatorConfiguration {
                 flinkClientTimeout,
                 flinkServiceHostOverride,
                 watchedNamespaces,
+                dynamicNamespacesEnabled,
                 flinkCancelJobTimeout,
                 flinkShutdownClusterTimeout,
                 artifactsBaseDir,
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/KubernetesOperatorConfigOptions.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/KubernetesOperatorConfigOptions.java
index f4eda21..5ad1655 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/KubernetesOperatorConfigOptions.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/KubernetesOperatorConfigOptions.java
@@ -22,6 +22,7 @@ import org.apache.flink.configuration.ConfigOption;
 import org.apache.flink.configuration.ConfigOptions;
 
 import io.javaoperatorsdk.operator.api.config.ConfigurationService;
+import io.javaoperatorsdk.operator.api.reconciler.Constants;
 
 import java.time.Duration;
 import java.util.Map;
@@ -187,4 +188,19 @@ public class KubernetesOperatorConfigOptions {
                     .withDescription(
                             "Interval at which periodic savepoints will be triggered. "
                                     + "The triggering schedule is not guaranteed, savepoints will be triggered as part of the regular reconcile loop.");
+
+    public static final ConfigOption<String> OPERATOR_WATCHED_NAMESPACES =
+            ConfigOptions.key("kubernetes.operator.watched.namespaces")
+                    .stringType()
+                    .defaultValue(Constants.WATCH_ALL_NAMESPACES)
+                    .withDescription(
+                            "Comma separated list of namespaces the operator monitors for custom resources. Defaults to "
+                                    + Constants.WATCH_ALL_NAMESPACES);
+
+    public static final ConfigOption<Boolean> OPERATOR_DYNAMIC_NAMESPACES_ENABLED =
+            ConfigOptions.key("kubernetes.operator.dynamic.namespaces.enabled")
+                    .booleanType()
+                    .defaultValue(false)
+                    .withDescription(
+                            "Enables dynamic change of watched/monitored namespaces. Defaults to false");
 }
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/informer/InformerManagerTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/informer/InformerManagerTest.java
deleted file mode 100644
index 08fcddd..0000000
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/informer/InformerManagerTest.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.kubernetes.operator.informer;
-
-import io.fabric8.kubernetes.client.KubernetesClient;
-import io.fabric8.kubernetes.client.server.mock.EnableKubernetesMockClient;
-import io.fabric8.kubernetes.client.server.mock.KubernetesMockServer;
-import org.junit.jupiter.api.Assertions;
-import org.junit.jupiter.api.Test;
-
-import java.util.Set;
-
-import static io.javaoperatorsdk.operator.api.reconciler.Constants.DEFAULT_NAMESPACES_SET;
-
-/** Test for {@link InformerManager}. */
-@EnableKubernetesMockClient(crud = true)
-public class InformerManagerTest {
-
-    private KubernetesMockServer mockServer;
-    private KubernetesClient kubernetesClient;
-
-    @Test
-    public void testNamespacedInformerCreated() {
-        var informerManager = new InformerManager(DEFAULT_NAMESPACES_SET, kubernetesClient);
-        Assertions.assertNotNull(informerManager.getFlinkDepInformer("ns1"));
-
-        informerManager = new InformerManager(Set.of("ns1", "ns2"), kubernetesClient);
-        Assertions.assertNotNull(informerManager.getFlinkDepInformer("ns1"));
-        Assertions.assertNotNull(informerManager.getFlinkDepInformer("ns2"));
-    }
-}
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/ReconciliationUtilsTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/ReconciliationUtilsTest.java
index 7e1f21e..3b3f4b2 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/ReconciliationUtilsTest.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/ReconciliationUtilsTest.java
@@ -32,8 +32,6 @@ import com.fasterxml.jackson.databind.node.ObjectNode;
 import io.javaoperatorsdk.operator.api.reconciler.UpdateControl;
 import org.junit.jupiter.api.Test;
 
-import java.util.Collections;
-
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertNotEquals;
@@ -42,8 +40,7 @@ import static org.junit.jupiter.api.Assertions.assertNotEquals;
 public class ReconciliationUtilsTest {
 
     FlinkOperatorConfiguration operatorConfiguration =
-            FlinkOperatorConfiguration.fromConfiguration(
-                    new Configuration(), Collections.emptySet());
+            FlinkOperatorConfiguration.fromConfiguration(new Configuration());
 
     @Test
     public void testRescheduleUpgradeImmediately() {
diff --git a/flink-kubernetes-webhook/pom.xml b/flink-kubernetes-webhook/pom.xml
index 0daa0f9..853092d 100644
--- a/flink-kubernetes-webhook/pom.xml
+++ b/flink-kubernetes-webhook/pom.xml
@@ -52,6 +52,14 @@ under the License.
         </dependency>
 
         <!-- Test -->
+
+        <dependency>
+            <groupId>io.fabric8</groupId>
+            <artifactId>kubernetes-server-mock</artifactId>
+            <version>${fabric8.version}</version>
+            <scope>test</scope>
+        </dependency>
+
         <dependency>
             <groupId>org.junit.jupiter</groupId>
             <artifactId>junit-jupiter-api</artifactId>
diff --git a/flink-kubernetes-webhook/src/main/java/org/apache/flink/kubernetes/operator/admission/FlinkOperatorWebhook.java b/flink-kubernetes-webhook/src/main/java/org/apache/flink/kubernetes/operator/admission/FlinkOperatorWebhook.java
index a6b9960..890d6f7 100644
--- a/flink-kubernetes-webhook/src/main/java/org/apache/flink/kubernetes/operator/admission/FlinkOperatorWebhook.java
+++ b/flink-kubernetes-webhook/src/main/java/org/apache/flink/kubernetes/operator/admission/FlinkOperatorWebhook.java
@@ -17,6 +17,7 @@
 
 package org.apache.flink.kubernetes.operator.admission;
 
+import org.apache.flink.kubernetes.operator.admission.informer.InformerManager;
 import org.apache.flink.kubernetes.operator.admission.mutator.FlinkMutator;
 import org.apache.flink.kubernetes.operator.config.FlinkConfigManager;
 import org.apache.flink.kubernetes.operator.utils.EnvUtils;
@@ -37,6 +38,7 @@ import org.apache.flink.shaded.netty4.io.netty.handler.ssl.SslContextBuilder;
 import org.apache.flink.shaded.netty4.io.netty.handler.ssl.SupportedCipherSuiteFilter;
 import org.apache.flink.shaded.netty4.io.netty.handler.stream.ChunkedWriteHandler;
 
+import io.fabric8.kubernetes.client.DefaultKubernetesClient;
 import org.apache.commons.lang3.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -59,11 +61,18 @@ public class FlinkOperatorWebhook {
 
     public static void main(String[] args) throws Exception {
         EnvUtils.logEnvironmentInfo(LOG, "Flink Kubernetes Webhook", args);
-        FlinkConfigManager configManager = new FlinkConfigManager();
+        var informerManager = new InformerManager(new DefaultKubernetesClient());
+        var configManager = new FlinkConfigManager(informerManager::setNamespaces);
+        if (!configManager.getOperatorConfiguration().getDynamicNamespacesEnabled()) {
+            informerManager.setNamespaces(
+                    configManager.getOperatorConfiguration().getWatchedNamespaces());
+        }
         Set<FlinkResourceValidator> validators = ValidatorUtils.discoverValidators(configManager);
+
         AdmissionHandler endpoint =
                 new AdmissionHandler(
-                        new FlinkValidator(validators, configManager), new FlinkMutator());
+                        new FlinkValidator(validators, informerManager), new FlinkMutator());
+
         ChannelInitializer<SocketChannel> initializer = createChannelInitializer(endpoint);
         NioEventLoopGroup bossGroup = new NioEventLoopGroup(1);
         NioEventLoopGroup workerGroup = new NioEventLoopGroup();
diff --git a/flink-kubernetes-webhook/src/main/java/org/apache/flink/kubernetes/operator/admission/FlinkValidator.java b/flink-kubernetes-webhook/src/main/java/org/apache/flink/kubernetes/operator/admission/FlinkValidator.java
index a1cb2bb..8a4c58f 100644
--- a/flink-kubernetes-webhook/src/main/java/org/apache/flink/kubernetes/operator/admission/FlinkValidator.java
+++ b/flink-kubernetes-webhook/src/main/java/org/apache/flink/kubernetes/operator/admission/FlinkValidator.java
@@ -17,17 +17,15 @@
 
 package org.apache.flink.kubernetes.operator.admission;
 
-import org.apache.flink.kubernetes.operator.config.FlinkConfigManager;
+import org.apache.flink.kubernetes.operator.admission.informer.InformerManager;
 import org.apache.flink.kubernetes.operator.crd.CrdConstants;
 import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
 import org.apache.flink.kubernetes.operator.crd.FlinkSessionJob;
-import org.apache.flink.kubernetes.operator.informer.InformerManager;
 import org.apache.flink.kubernetes.operator.validation.FlinkResourceValidator;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
 import io.fabric8.kubernetes.api.model.HasMetadata;
 import io.fabric8.kubernetes.api.model.KubernetesResource;
-import io.fabric8.kubernetes.client.DefaultKubernetesClient;
 import io.fabric8.kubernetes.client.informers.cache.Cache;
 import io.javaoperatorsdk.admissioncontroller.NotAllowedException;
 import io.javaoperatorsdk.admissioncontroller.Operation;
@@ -46,13 +44,9 @@ public class FlinkValidator implements Validator<HasMetadata> {
     private final Set<FlinkResourceValidator> validators;
     private final InformerManager informerManager;
 
-    public FlinkValidator(
-            Set<FlinkResourceValidator> validators, FlinkConfigManager configManager) {
+    public FlinkValidator(Set<FlinkResourceValidator> validators, InformerManager informerManager) {
         this.validators = validators;
-        this.informerManager =
-                new InformerManager(
-                        configManager.getOperatorConfiguration().getWatchedNamespaces(),
-                        new DefaultKubernetesClient());
+        this.informerManager = informerManager;
     }
 
     @Override
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/informer/InformerManager.java b/flink-kubernetes-webhook/src/main/java/org/apache/flink/kubernetes/operator/admission/informer/InformerManager.java
similarity index 78%
rename from flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/informer/InformerManager.java
rename to flink-kubernetes-webhook/src/main/java/org/apache/flink/kubernetes/operator/admission/informer/InformerManager.java
index 453c41e..2915414 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/informer/InformerManager.java
+++ b/flink-kubernetes-webhook/src/main/java/org/apache/flink/kubernetes/operator/admission/informer/InformerManager.java
@@ -15,10 +15,9 @@
  * limitations under the License.
  */
 
-package org.apache.flink.kubernetes.operator.informer;
+package org.apache.flink.kubernetes.operator.admission.informer;
 
 import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
-import org.apache.flink.kubernetes.operator.crd.FlinkSessionJob;
 import org.apache.flink.util.Preconditions;
 
 import io.fabric8.kubernetes.api.model.HasMetadata;
@@ -31,6 +30,7 @@ import org.slf4j.LoggerFactory;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
 
 import static io.javaoperatorsdk.operator.api.reconciler.Constants.DEFAULT_NAMESPACES_SET;
 
@@ -38,16 +38,12 @@ import static io.javaoperatorsdk.operator.api.reconciler.Constants.DEFAULT_NAMES
 public class InformerManager {
 
     private static final Logger LOG = LoggerFactory.getLogger(InformerManager.class);
-    public static final String CLUSTER_ID_INDEX = "clusterId_index";
-    private final Set<String> watchedNamespaces;
+    private final Set<String> watchedNamespaces = ConcurrentHashMap.newKeySet();
     private final KubernetesClient kubernetesClient;
-    private volatile Map<String, SharedIndexInformer<FlinkSessionJob>> sessionJobInformers;
     private volatile Map<String, SharedIndexInformer<FlinkDeployment>> flinkDepInformers;
 
-    public InformerManager(Set<String> watchedNamespaces, KubernetesClient kubernetesClient) {
-        this.watchedNamespaces = watchedNamespaces;
+    public InformerManager(KubernetesClient kubernetesClient) {
         this.kubernetesClient = kubernetesClient;
-        LOG.info("Created informer manager with watchedNamespaces: {}", watchedNamespaces);
     }
 
     public SharedIndexInformer<FlinkDeployment> getFlinkDepInformer(String namespace) {
@@ -67,7 +63,7 @@ public class InformerManager {
             synchronized (this) {
                 if (flinkDepInformers == null) {
                     var runnableInformers =
-                            createRunnableInformer(
+                            createRunnableInformers(
                                     FlinkDeployment.class, watchedNamespaces, kubernetesClient);
                     for (Map.Entry<String, SharedIndexInformer<FlinkDeployment>> runnableInformer :
                             runnableInformers.entrySet()) {
@@ -83,7 +79,7 @@ public class InformerManager {
     }
 
     private static <CR extends HasMetadata>
-            Map<String, SharedIndexInformer<CR>> createRunnableInformer(
+            Map<String, SharedIndexInformer<CR>> createRunnableInformers(
                     Class<CR> resourceClass,
                     Set<String> effectiveNamespaces,
                     KubernetesClient kubernetesClient) {
@@ -104,4 +100,24 @@ public class InformerManager {
             return informers;
         }
     }
+
+    public void setNamespaces(Set<String> watchedNamespaces) {
+        LOG.info("Setting namespaces to {}", watchedNamespaces);
+        this.watchedNamespaces.clear();
+        this.watchedNamespaces.addAll(watchedNamespaces);
+        if (flinkDepInformers != null) {
+            synchronized (this) {
+                if (flinkDepInformers != null) {
+                    flinkDepInformers
+                            .entrySet()
+                            .forEach(
+                                    entry -> {
+                                        LOG.info("Stopping informer in {})", entry.getKey());
+                                        entry.getValue().stop();
+                                    });
+                }
+                flinkDepInformers = null;
+            }
+        }
+    }
 }
diff --git a/flink-kubernetes-webhook/src/test/java/org/apache/flink/kubernetes/operator/admission/AdmissionHandlerTest.java b/flink-kubernetes-webhook/src/test/java/org/apache/flink/kubernetes/operator/admission/AdmissionHandlerTest.java
index 327981a..e8f061b 100644
--- a/flink-kubernetes-webhook/src/test/java/org/apache/flink/kubernetes/operator/admission/AdmissionHandlerTest.java
+++ b/flink-kubernetes-webhook/src/test/java/org/apache/flink/kubernetes/operator/admission/AdmissionHandlerTest.java
@@ -17,6 +17,7 @@
 
 package org.apache.flink.kubernetes.operator.admission;
 
+import org.apache.flink.kubernetes.operator.admission.informer.InformerManager;
 import org.apache.flink.kubernetes.operator.admission.mutator.FlinkMutator;
 import org.apache.flink.kubernetes.operator.config.FlinkConfigManager;
 import org.apache.flink.kubernetes.operator.crd.CrdConstants;
@@ -62,8 +63,8 @@ public class AdmissionHandlerTest {
     private final AdmissionHandler admissionHandler =
             new AdmissionHandler(
                     new FlinkValidator(
-                            ValidatorUtils.discoverValidators(new FlinkConfigManager()),
-                            new FlinkConfigManager()),
+                            ValidatorUtils.discoverValidators(new FlinkConfigManager(ns -> {})),
+                            new InformerManager(null)),
                     new FlinkMutator());
 
     @Test
diff --git a/flink-kubernetes-webhook/src/test/java/org/apache/flink/kubernetes/operator/admission/informer/InformerManagerTest.java b/flink-kubernetes-webhook/src/test/java/org/apache/flink/kubernetes/operator/admission/informer/InformerManagerTest.java
new file mode 100644
index 0000000..d258d6c
--- /dev/null
+++ b/flink-kubernetes-webhook/src/test/java/org/apache/flink/kubernetes/operator/admission/informer/InformerManagerTest.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.admission.informer;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.kubernetes.operator.config.FlinkConfigManager;
+
+import io.fabric8.kubernetes.client.KubernetesClient;
+import io.fabric8.kubernetes.client.server.mock.EnableKubernetesMockClient;
+import io.fabric8.kubernetes.client.server.mock.KubernetesMockServer;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.util.Map;
+import java.util.Set;
+
+import static io.javaoperatorsdk.operator.api.reconciler.Constants.DEFAULT_NAMESPACES_SET;
+import static org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions.OPERATOR_DYNAMIC_NAMESPACES_ENABLED;
+import static org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions.OPERATOR_WATCHED_NAMESPACES;
+
+/** Test for {@link InformerManager}. */
+@EnableKubernetesMockClient(crud = true)
+public class InformerManagerTest {
+
+    private KubernetesMockServer mockServer;
+    private KubernetesClient kubernetesClient;
+
+    @Test
+    public void testNamespacedInformerCreated() {
+        var informerManager = new InformerManager(kubernetesClient);
+        informerManager.setNamespaces(DEFAULT_NAMESPACES_SET);
+        Assertions.assertNotNull(informerManager.getFlinkDepInformer("ns1"));
+
+        informerManager.setNamespaces(Set.of("ns1", "ns2"));
+        Assertions.assertNotNull(informerManager.getFlinkDepInformer("ns1"));
+        Assertions.assertNotNull(informerManager.getFlinkDepInformer("ns2"));
+
+        informerManager.setNamespaces(Set.of("ns1", "ns2", "ns3"));
+        Assertions.assertNotNull(informerManager.getFlinkDepInformer("ns1"));
+        Assertions.assertNotNull(informerManager.getFlinkDepInformer("ns2"));
+        Assertions.assertNotNull(informerManager.getFlinkDepInformer("ns3"));
+    }
+
+    @Test
+    public void testDynamicNamespaces() {
+        InformerManager informerManager = new InformerManager(kubernetesClient);
+        Configuration config =
+                Configuration.fromMap(Map.of(OPERATOR_WATCHED_NAMESPACES.key(), "ns1"));
+        FlinkConfigManager configManager =
+                new FlinkConfigManager(config, ns -> informerManager.setNamespaces(ns));
+        informerManager.setNamespaces(
+                configManager.getOperatorConfiguration().getWatchedNamespaces());
+        Assertions.assertNotNull(informerManager.getFlinkDepInformer("ns1"));
+
+        // dynamic namespaces disabled
+        config.set(OPERATOR_WATCHED_NAMESPACES, "ns1,ns2");
+        configManager.updateDefaultConfig(config);
+        Assertions.assertThrows(
+                NullPointerException.class, () -> informerManager.getFlinkDepInformer("ns2"));
+
+        // dynamic namespaces enabled
+        config.set(OPERATOR_DYNAMIC_NAMESPACES_ENABLED, true);
+        config.set(OPERATOR_WATCHED_NAMESPACES, "ns1,ns2,ns3");
+        configManager.updateDefaultConfig(config);
+        Assertions.assertNotNull(informerManager.getFlinkDepInformer("ns1"));
+        Assertions.assertNotNull(informerManager.getFlinkDepInformer("ns2"));
+        Assertions.assertNotNull(informerManager.getFlinkDepInformer("ns3"));
+    }
+}
diff --git a/helm/flink-kubernetes-operator/templates/flink-operator.yaml b/helm/flink-kubernetes-operator/templates/flink-operator.yaml
index 4e9df19..fc4e699 100644
--- a/helm/flink-kubernetes-operator/templates/flink-operator.yaml
+++ b/helm/flink-kubernetes-operator/templates/flink-operator.yaml
@@ -68,8 +68,6 @@ spec:
               value: -Dlog4j.configurationFile=/opt/flink/conf/log4j-operator.properties
             - name: JVM_ARGS
               value: {{ .Values.jvmArgs.operator }}
-            - name: FLINK_OPERATOR_WATCH_NAMESPACES
-              value: {{ join "," .Values.watchNamespaces  }}
           securityContext:
             {{- toYaml .Values.operatorSecurityContext | nindent 12 }}
           volumeMounts:
@@ -111,8 +109,6 @@ spec:
               value: /opt/flink/plugins
             - name: OPERATOR_NAMESPACE
               value: {{ .Release.Namespace }}
-            - name: FLINK_OPERATOR_WATCH_NAMESPACES
-              value: {{ join "," .Values.watchNamespaces  }}
           securityContext:
             {{- toYaml .Values.webhookSecurityContext | nindent 12 }}
           volumeMounts:
@@ -160,6 +156,9 @@ data:
 {{- end }}
 {{- if index (.Values.defaultConfiguration) "flink-conf.yaml" }}
   {{- index (.Values.defaultConfiguration) "flink-conf.yaml" | nindent 4 -}}
+{{- end }}
+{{- if .Values.watchNamespaces }}
+    kubernetes.operator.watched.namespaces: {{ join "," .Values.watchNamespaces  }}
 {{- end }}
   log4j-operator.properties: |+
 {{- if .Values.defaultConfiguration.append }}