You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by gy...@apache.org on 2022/06/19 08:35:45 UTC
[flink-kubernetes-operator] branch main updated: [FLINK-27812] Support Dynamic Change of Watched Namespaces
This is an automated email from the ASF dual-hosted git repository.
gyfora pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git
The following commit(s) were added to refs/heads/main by this push:
new 1177703 [FLINK-27812] Support Dynamic Change of Watched Namespaces
1177703 is described below
commit 11777032e3c3011c843631471c1b52aecb52dabf
Author: Matyas Orhidi <53...@users.noreply.github.com>
AuthorDate: Sun Jun 19 10:35:41 2022 +0200
[FLINK-27812] Support Dynamic Change of Watched Namespaces
---
.../kubernetes_operator_config_configuration.html | 12 ++++
.../flink/kubernetes/operator/FlinkOperator.java | 80 +++++++++++----------
.../operator/config/FlinkConfigManager.java | 34 ++++++---
.../config/FlinkOperatorConfiguration.java | 22 +++++-
.../config/KubernetesOperatorConfigOptions.java | 16 +++++
.../operator/informer/InformerManagerTest.java | 46 ------------
.../operator/utils/ReconciliationUtilsTest.java | 5 +-
flink-kubernetes-webhook/pom.xml | 8 +++
.../operator/admission/FlinkOperatorWebhook.java | 13 +++-
.../operator/admission/FlinkValidator.java | 12 +---
.../admission}/informer/InformerManager.java | 36 +++++++---
.../operator/admission/AdmissionHandlerTest.java | 5 +-
.../admission/informer/InformerManagerTest.java | 84 ++++++++++++++++++++++
.../templates/flink-operator.yaml | 7 +-
14 files changed, 256 insertions(+), 124 deletions(-)
diff --git a/docs/layouts/shortcodes/generated/kubernetes_operator_config_configuration.html b/docs/layouts/shortcodes/generated/kubernetes_operator_config_configuration.html
index 71cf1ac..1bfc9de 100644
--- a/docs/layouts/shortcodes/generated/kubernetes_operator_config_configuration.html
+++ b/docs/layouts/shortcodes/generated/kubernetes_operator_config_configuration.html
@@ -44,6 +44,12 @@
<td>Boolean</td>
<td>Whether to enable on-the-fly config changes through the operator configmap.</td>
</tr>
+ <tr>
+ <td><h5>kubernetes.operator.dynamic.namespaces.enabled</h5></td>
+ <td style="word-wrap: break-word;">false</td>
+ <td>Boolean</td>
+ <td>Enables dynamic change of watched/monitored namespaces. Defaults to false</td>
+ </tr>
<tr>
<td><h5>kubernetes.operator.job.upgrade.ignore-pending-savepoint</h5></td>
<td style="word-wrap: break-word;">false</td>
@@ -146,5 +152,11 @@
<td>Map</td>
<td>Custom HTTP header for HttpArtifactFetcher. The header will be applied when getting the session job artifacts. Expected format: headerKey1:headerValue1,headerKey2:headerValue2.</td>
</tr>
+ <tr>
+ <td><h5>kubernetes.operator.watched.namespaces</h5></td>
+ <td style="word-wrap: break-word;">"JOSDK_ALL_NAMESPACES"</td>
+ <td>String</td>
+ <td>Comma separated list of namespaces the operator monitors for custom resources. Defaults to JOSDK_ALL_NAMESPACES</td>
+ </tr>
</tbody>
</table>
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/FlinkOperator.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/FlinkOperator.java
index 4770617..507ab62 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/FlinkOperator.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/FlinkOperator.java
@@ -17,17 +17,13 @@
package org.apache.flink.kubernetes.operator;
-import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.plugin.PluginManager;
import org.apache.flink.core.plugin.PluginUtils;
import org.apache.flink.kubernetes.operator.config.FlinkConfigManager;
-import org.apache.flink.kubernetes.operator.config.FlinkOperatorConfiguration;
-import org.apache.flink.kubernetes.operator.controller.FlinkControllerConfig;
import org.apache.flink.kubernetes.operator.controller.FlinkDeploymentController;
import org.apache.flink.kubernetes.operator.controller.FlinkSessionJobController;
-import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
import org.apache.flink.kubernetes.operator.crd.FlinkSessionJob;
import org.apache.flink.kubernetes.operator.crd.status.FlinkDeploymentStatus;
import org.apache.flink.kubernetes.operator.crd.status.FlinkSessionJobStatus;
@@ -49,15 +45,18 @@ import org.apache.flink.metrics.MetricGroup;
import io.fabric8.kubernetes.client.DefaultKubernetesClient;
import io.fabric8.kubernetes.client.KubernetesClient;
import io.javaoperatorsdk.operator.Operator;
+import io.javaoperatorsdk.operator.RegisteredController;
import io.javaoperatorsdk.operator.api.config.ConfigurationServiceOverrider;
+import io.javaoperatorsdk.operator.api.config.ControllerConfigurationOverrider;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.annotation.Nullable;
+import java.util.HashSet;
import java.util.Set;
+import java.util.UUID;
import java.util.concurrent.Executors;
-import java.util.function.Consumer;
/** Main Class for Flink native k8s operator. */
public class FlinkOperator {
@@ -69,16 +68,16 @@ public class FlinkOperator {
private final FlinkService flinkService;
private final FlinkConfigManager configManager;
private final Set<FlinkResourceValidator> validators;
+ private final Set<RegisteredController> registeredControllers = new HashSet<>();
private final MetricGroup metricGroup;
public FlinkOperator(@Nullable Configuration conf) {
this.client = new DefaultKubernetesClient();
- this.configManager = conf != null ? new FlinkConfigManager(conf) : new FlinkConfigManager();
- this.operator =
- new Operator(
- client,
- getConfigurationServiceOverriderConsumer(
- configManager.getOperatorConfiguration()));
+ this.configManager =
+ conf != null
+ ? new FlinkConfigManager(conf) // For testing only
+ : new FlinkConfigManager(this::handleNamespaceChanges);
+ this.operator = new Operator(client, this::overrideOperatorConfigs);
this.flinkService = new FlinkService(client, configManager);
this.validators = ValidatorUtils.discoverValidators(configManager);
this.metricGroup =
@@ -88,20 +87,25 @@ public class FlinkOperator {
FileSystem.initialize(configManager.getDefaultConfig(), pluginManager);
}
- @VisibleForTesting
- protected static Consumer<ConfigurationServiceOverrider>
- getConfigurationServiceOverriderConsumer(
- FlinkOperatorConfiguration operatorConfiguration) {
- return overrider -> {
- int parallelism = operatorConfiguration.getReconcilerMaxParallelism();
- if (parallelism == -1) {
- LOG.info("Configuring operator with unbounded reconciliation thread pool.");
- overrider.withExecutorService(Executors.newCachedThreadPool());
- } else {
- LOG.info("Configuring operator with {} reconciliation threads.", parallelism);
- overrider.withConcurrentReconciliationThreads(parallelism);
- }
- };
+ private void handleNamespaceChanges(Set<String> namespaces) {
+ registeredControllers.forEach(
+ controller -> {
+ if (controller.allowsNamespaceChanges()) {
+ LOG.info("Changing namespaces on {} to {}", controller, namespaces);
+ controller.changeNamespaces(namespaces);
+ }
+ });
+ }
+
+ private void overrideOperatorConfigs(ConfigurationServiceOverrider overrider) {
+ int parallelism = configManager.getOperatorConfiguration().getReconcilerMaxParallelism();
+ if (parallelism == -1) {
+ LOG.info("Configuring operator with unbounded reconciliation thread pool.");
+ overrider.withExecutorService(Executors.newCachedThreadPool());
+ } else {
+ LOG.info("Configuring operator with {} reconciliation threads.", parallelism);
+ overrider.withConcurrentReconciliationThreads(parallelism);
+ }
}
private void registerDeploymentController() {
@@ -120,12 +124,7 @@ public class FlinkOperator {
observerFactory,
new MetricManager<>(metricGroup),
statusHelper);
-
- FlinkControllerConfig<FlinkDeployment> controllerConfig =
- new FlinkControllerConfig<>(
- controller,
- configManager.getOperatorConfiguration().getWatchedNamespaces());
- operator.register(controller, controllerConfig);
+ registeredControllers.add(operator.register(controller, this::overrideControllerConfigs));
}
private void registerSessionJobController() {
@@ -143,11 +142,20 @@ public class FlinkOperator {
new MetricManager<>(metricGroup),
statusHelper);
- FlinkControllerConfig<FlinkSessionJob> controllerConfig =
- new FlinkControllerConfig<>(
- controller,
- configManager.getOperatorConfiguration().getWatchedNamespaces());
- operator.register(controller, controllerConfig);
+ registeredControllers.add(operator.register(controller, this::overrideControllerConfigs));
+ }
+
+ private void overrideControllerConfigs(ControllerConfigurationOverrider<?> overrider) {
+ // TODO: https://github.com/java-operator-sdk/java-operator-sdk/issues/1259
+ String[] watchedNamespaces =
+ configManager
+ .getOperatorConfiguration()
+ .getWatchedNamespaces()
+ .toArray(String[]::new);
+ String fakeNs = UUID.randomUUID().toString();
+ overrider.settingNamespace(fakeNs);
+ overrider.addingNamespaces(watchedNamespaces);
+ overrider.removingNamespaces(fakeNs);
}
public void run() {
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkConfigManager.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkConfigManager.java
index 3a7c1c3..c8e4bc3 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkConfigManager.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkConfigManager.java
@@ -25,7 +25,6 @@ import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
import org.apache.flink.kubernetes.operator.crd.FlinkSessionJob;
import org.apache.flink.kubernetes.operator.crd.spec.FlinkDeploymentSpec;
import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
-import org.apache.flink.kubernetes.operator.utils.EnvUtils;
import org.apache.flink.shaded.guava30.com.google.common.cache.Cache;
import org.apache.flink.shaded.guava30.com.google.common.cache.CacheBuilder;
@@ -38,16 +37,19 @@ import io.fabric8.kubernetes.api.model.ObjectMeta;
import lombok.Builder;
import lombok.SneakyThrows;
import lombok.Value;
+import org.apache.commons.lang3.ObjectUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.time.Duration;
import java.util.Map;
+import java.util.Optional;
import java.util.Set;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Consumer;
import static org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions.OPERATOR_DYNAMIC_CONFIG_CHECK_INTERVAL;
import static org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions.OPERATOR_DYNAMIC_CONFIG_ENABLED;
@@ -61,15 +63,21 @@ public class FlinkConfigManager {
private volatile Configuration defaultConfig;
private volatile FlinkOperatorConfiguration operatorConfiguration;
private final AtomicLong defaultConfigVersion = new AtomicLong(0);
-
private final LoadingCache<Key, Configuration> cache;
- private final Set<String> namespaces = EnvUtils.getWatchedNamespaces();
+ private final Consumer<Set<String>> namespaceListener;
- public FlinkConfigManager() {
- this(GlobalConfiguration.loadConfiguration());
+ @VisibleForTesting
+ public FlinkConfigManager(Configuration defaultConfig) {
+ this(defaultConfig, ns -> {});
}
- public FlinkConfigManager(Configuration defaultConfig) {
+ public FlinkConfigManager(Consumer<Set<String>> namespaceListener) {
+ this(GlobalConfiguration.loadConfiguration(), namespaceListener);
+ }
+
+ public FlinkConfigManager(
+ Configuration defaultConfig, Consumer<Set<String>> namespaceListener) {
+ this.namespaceListener = namespaceListener;
Duration cacheTimeout =
defaultConfig.get(KubernetesOperatorConfigOptions.OPERATOR_CONFIG_CACHE_TIMEOUT);
this.cache =
@@ -109,14 +117,22 @@ public class FlinkConfigManager {
@VisibleForTesting
public void updateDefaultConfig(Configuration newConf) {
- if (newConf.equals(defaultConfig)) {
+ if (ObjectUtils.allNotNull(this.defaultConfig, newConf)
+ && this.defaultConfig.toMap().equals(newConf.toMap())) {
LOG.info("Default configuration did not change, nothing to do...");
return;
}
LOG.info("Updating default configuration to {}", newConf);
- this.operatorConfiguration =
- FlinkOperatorConfiguration.fromConfiguration(newConf, namespaces);
+ var oldNs =
+ Optional.ofNullable(this.operatorConfiguration)
+ .map(FlinkOperatorConfiguration::getWatchedNamespaces)
+ .orElse(Set.of());
+ this.operatorConfiguration = FlinkOperatorConfiguration.fromConfiguration(newConf);
+ var newNs = this.operatorConfiguration.getWatchedNamespaces();
+ if (this.operatorConfiguration.getDynamicNamespacesEnabled() && !oldNs.equals(newNs)) {
+ this.namespaceListener.accept(operatorConfiguration.getWatchedNamespaces());
+ }
this.defaultConfig = newConf.clone();
// We do not invalidate the cache to avoid deleting currently used temp files,
// simply bump the version
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkOperatorConfiguration.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkOperatorConfiguration.java
index 29e1264..c9b1f7e 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkOperatorConfiguration.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkOperatorConfiguration.java
@@ -24,12 +24,16 @@ import org.apache.flink.kubernetes.operator.utils.EnvUtils;
import lombok.Value;
import java.time.Duration;
+import java.util.Arrays;
+import java.util.HashSet;
import java.util.Set;
/** Configuration class for operator. */
@Value
public class FlinkOperatorConfiguration {
+ private static final String NAMESPACES_SPLITTER_KEY = "\\s*,\\s*";
+
Duration reconcileInterval;
int reconcilerMaxParallelism;
Duration progressCheckInterval;
@@ -37,14 +41,14 @@ public class FlinkOperatorConfiguration {
Duration flinkClientTimeout;
String flinkServiceHostOverride;
Set<String> watchedNamespaces;
+ Boolean dynamicNamespacesEnabled;
Duration flinkCancelJobTimeout;
Duration flinkShutdownClusterTimeout;
String artifactsBaseDir;
Integer savepointHistoryCountThreshold;
Duration savepointHistoryAgeThreshold;
- public static FlinkOperatorConfiguration fromConfiguration(
- Configuration operatorConfig, Set<String> watchedNamespaces) {
+ public static FlinkOperatorConfiguration fromConfiguration(Configuration operatorConfig) {
Duration reconcileInterval =
operatorConfig.get(
KubernetesOperatorConfigOptions.OPERATOR_RECONCILER_RESCHEDULE_INTERVAL);
@@ -94,6 +98,19 @@ public class FlinkOperatorConfiguration {
flinkServiceHostOverride = "localhost";
}
+ var watchedNamespaces =
+ new HashSet<>(
+ Arrays.asList(
+ operatorConfig
+ .get(
+ KubernetesOperatorConfigOptions
+ .OPERATOR_WATCHED_NAMESPACES)
+ .split(NAMESPACES_SPLITTER_KEY)));
+
+ boolean dynamicNamespacesEnabled =
+ operatorConfig.get(
+ KubernetesOperatorConfigOptions.OPERATOR_DYNAMIC_NAMESPACES_ENABLED);
+
return new FlinkOperatorConfiguration(
reconcileInterval,
reconcilerMaxParallelism,
@@ -102,6 +119,7 @@ public class FlinkOperatorConfiguration {
flinkClientTimeout,
flinkServiceHostOverride,
watchedNamespaces,
+ dynamicNamespacesEnabled,
flinkCancelJobTimeout,
flinkShutdownClusterTimeout,
artifactsBaseDir,
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/KubernetesOperatorConfigOptions.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/KubernetesOperatorConfigOptions.java
index f4eda21..5ad1655 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/KubernetesOperatorConfigOptions.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/KubernetesOperatorConfigOptions.java
@@ -22,6 +22,7 @@ import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
import io.javaoperatorsdk.operator.api.config.ConfigurationService;
+import io.javaoperatorsdk.operator.api.reconciler.Constants;
import java.time.Duration;
import java.util.Map;
@@ -187,4 +188,19 @@ public class KubernetesOperatorConfigOptions {
.withDescription(
"Interval at which periodic savepoints will be triggered. "
+ "The triggering schedule is not guaranteed, savepoints will be triggered as part of the regular reconcile loop.");
+
+ public static final ConfigOption<String> OPERATOR_WATCHED_NAMESPACES =
+ ConfigOptions.key("kubernetes.operator.watched.namespaces")
+ .stringType()
+ .defaultValue(Constants.WATCH_ALL_NAMESPACES)
+ .withDescription(
+ "Comma separated list of namespaces the operator monitors for custom resources. Defaults to "
+ + Constants.WATCH_ALL_NAMESPACES);
+
+ public static final ConfigOption<Boolean> OPERATOR_DYNAMIC_NAMESPACES_ENABLED =
+ ConfigOptions.key("kubernetes.operator.dynamic.namespaces.enabled")
+ .booleanType()
+ .defaultValue(false)
+ .withDescription(
+ "Enables dynamic change of watched/monitored namespaces. Defaults to false");
}
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/informer/InformerManagerTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/informer/InformerManagerTest.java
deleted file mode 100644
index 08fcddd..0000000
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/informer/InformerManagerTest.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.kubernetes.operator.informer;
-
-import io.fabric8.kubernetes.client.KubernetesClient;
-import io.fabric8.kubernetes.client.server.mock.EnableKubernetesMockClient;
-import io.fabric8.kubernetes.client.server.mock.KubernetesMockServer;
-import org.junit.jupiter.api.Assertions;
-import org.junit.jupiter.api.Test;
-
-import java.util.Set;
-
-import static io.javaoperatorsdk.operator.api.reconciler.Constants.DEFAULT_NAMESPACES_SET;
-
-/** Test for {@link InformerManager}. */
-@EnableKubernetesMockClient(crud = true)
-public class InformerManagerTest {
-
- private KubernetesMockServer mockServer;
- private KubernetesClient kubernetesClient;
-
- @Test
- public void testNamespacedInformerCreated() {
- var informerManager = new InformerManager(DEFAULT_NAMESPACES_SET, kubernetesClient);
- Assertions.assertNotNull(informerManager.getFlinkDepInformer("ns1"));
-
- informerManager = new InformerManager(Set.of("ns1", "ns2"), kubernetesClient);
- Assertions.assertNotNull(informerManager.getFlinkDepInformer("ns1"));
- Assertions.assertNotNull(informerManager.getFlinkDepInformer("ns2"));
- }
-}
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/ReconciliationUtilsTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/ReconciliationUtilsTest.java
index 7e1f21e..3b3f4b2 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/ReconciliationUtilsTest.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/ReconciliationUtilsTest.java
@@ -32,8 +32,6 @@ import com.fasterxml.jackson.databind.node.ObjectNode;
import io.javaoperatorsdk.operator.api.reconciler.UpdateControl;
import org.junit.jupiter.api.Test;
-import java.util.Collections;
-
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotEquals;
@@ -42,8 +40,7 @@ import static org.junit.jupiter.api.Assertions.assertNotEquals;
public class ReconciliationUtilsTest {
FlinkOperatorConfiguration operatorConfiguration =
- FlinkOperatorConfiguration.fromConfiguration(
- new Configuration(), Collections.emptySet());
+ FlinkOperatorConfiguration.fromConfiguration(new Configuration());
@Test
public void testRescheduleUpgradeImmediately() {
diff --git a/flink-kubernetes-webhook/pom.xml b/flink-kubernetes-webhook/pom.xml
index 0daa0f9..853092d 100644
--- a/flink-kubernetes-webhook/pom.xml
+++ b/flink-kubernetes-webhook/pom.xml
@@ -52,6 +52,14 @@ under the License.
</dependency>
<!-- Test -->
+
+ <dependency>
+ <groupId>io.fabric8</groupId>
+ <artifactId>kubernetes-server-mock</artifactId>
+ <version>${fabric8.version}</version>
+ <scope>test</scope>
+ </dependency>
+
<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter-api</artifactId>
diff --git a/flink-kubernetes-webhook/src/main/java/org/apache/flink/kubernetes/operator/admission/FlinkOperatorWebhook.java b/flink-kubernetes-webhook/src/main/java/org/apache/flink/kubernetes/operator/admission/FlinkOperatorWebhook.java
index a6b9960..890d6f7 100644
--- a/flink-kubernetes-webhook/src/main/java/org/apache/flink/kubernetes/operator/admission/FlinkOperatorWebhook.java
+++ b/flink-kubernetes-webhook/src/main/java/org/apache/flink/kubernetes/operator/admission/FlinkOperatorWebhook.java
@@ -17,6 +17,7 @@
package org.apache.flink.kubernetes.operator.admission;
+import org.apache.flink.kubernetes.operator.admission.informer.InformerManager;
import org.apache.flink.kubernetes.operator.admission.mutator.FlinkMutator;
import org.apache.flink.kubernetes.operator.config.FlinkConfigManager;
import org.apache.flink.kubernetes.operator.utils.EnvUtils;
@@ -37,6 +38,7 @@ import org.apache.flink.shaded.netty4.io.netty.handler.ssl.SslContextBuilder;
import org.apache.flink.shaded.netty4.io.netty.handler.ssl.SupportedCipherSuiteFilter;
import org.apache.flink.shaded.netty4.io.netty.handler.stream.ChunkedWriteHandler;
+import io.fabric8.kubernetes.client.DefaultKubernetesClient;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -59,11 +61,18 @@ public class FlinkOperatorWebhook {
public static void main(String[] args) throws Exception {
EnvUtils.logEnvironmentInfo(LOG, "Flink Kubernetes Webhook", args);
- FlinkConfigManager configManager = new FlinkConfigManager();
+ var informerManager = new InformerManager(new DefaultKubernetesClient());
+ var configManager = new FlinkConfigManager(informerManager::setNamespaces);
+ if (!configManager.getOperatorConfiguration().getDynamicNamespacesEnabled()) {
+ informerManager.setNamespaces(
+ configManager.getOperatorConfiguration().getWatchedNamespaces());
+ }
Set<FlinkResourceValidator> validators = ValidatorUtils.discoverValidators(configManager);
+
AdmissionHandler endpoint =
new AdmissionHandler(
- new FlinkValidator(validators, configManager), new FlinkMutator());
+ new FlinkValidator(validators, informerManager), new FlinkMutator());
+
ChannelInitializer<SocketChannel> initializer = createChannelInitializer(endpoint);
NioEventLoopGroup bossGroup = new NioEventLoopGroup(1);
NioEventLoopGroup workerGroup = new NioEventLoopGroup();
diff --git a/flink-kubernetes-webhook/src/main/java/org/apache/flink/kubernetes/operator/admission/FlinkValidator.java b/flink-kubernetes-webhook/src/main/java/org/apache/flink/kubernetes/operator/admission/FlinkValidator.java
index a1cb2bb..8a4c58f 100644
--- a/flink-kubernetes-webhook/src/main/java/org/apache/flink/kubernetes/operator/admission/FlinkValidator.java
+++ b/flink-kubernetes-webhook/src/main/java/org/apache/flink/kubernetes/operator/admission/FlinkValidator.java
@@ -17,17 +17,15 @@
package org.apache.flink.kubernetes.operator.admission;
-import org.apache.flink.kubernetes.operator.config.FlinkConfigManager;
+import org.apache.flink.kubernetes.operator.admission.informer.InformerManager;
import org.apache.flink.kubernetes.operator.crd.CrdConstants;
import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
import org.apache.flink.kubernetes.operator.crd.FlinkSessionJob;
-import org.apache.flink.kubernetes.operator.informer.InformerManager;
import org.apache.flink.kubernetes.operator.validation.FlinkResourceValidator;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.fabric8.kubernetes.api.model.HasMetadata;
import io.fabric8.kubernetes.api.model.KubernetesResource;
-import io.fabric8.kubernetes.client.DefaultKubernetesClient;
import io.fabric8.kubernetes.client.informers.cache.Cache;
import io.javaoperatorsdk.admissioncontroller.NotAllowedException;
import io.javaoperatorsdk.admissioncontroller.Operation;
@@ -46,13 +44,9 @@ public class FlinkValidator implements Validator<HasMetadata> {
private final Set<FlinkResourceValidator> validators;
private final InformerManager informerManager;
- public FlinkValidator(
- Set<FlinkResourceValidator> validators, FlinkConfigManager configManager) {
+ public FlinkValidator(Set<FlinkResourceValidator> validators, InformerManager informerManager) {
this.validators = validators;
- this.informerManager =
- new InformerManager(
- configManager.getOperatorConfiguration().getWatchedNamespaces(),
- new DefaultKubernetesClient());
+ this.informerManager = informerManager;
}
@Override
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/informer/InformerManager.java b/flink-kubernetes-webhook/src/main/java/org/apache/flink/kubernetes/operator/admission/informer/InformerManager.java
similarity index 78%
rename from flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/informer/InformerManager.java
rename to flink-kubernetes-webhook/src/main/java/org/apache/flink/kubernetes/operator/admission/informer/InformerManager.java
index 453c41e..2915414 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/informer/InformerManager.java
+++ b/flink-kubernetes-webhook/src/main/java/org/apache/flink/kubernetes/operator/admission/informer/InformerManager.java
@@ -15,10 +15,9 @@
* limitations under the License.
*/
-package org.apache.flink.kubernetes.operator.informer;
+package org.apache.flink.kubernetes.operator.admission.informer;
import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
-import org.apache.flink.kubernetes.operator.crd.FlinkSessionJob;
import org.apache.flink.util.Preconditions;
import io.fabric8.kubernetes.api.model.HasMetadata;
@@ -31,6 +30,7 @@ import org.slf4j.LoggerFactory;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
import static io.javaoperatorsdk.operator.api.reconciler.Constants.DEFAULT_NAMESPACES_SET;
@@ -38,16 +38,12 @@ import static io.javaoperatorsdk.operator.api.reconciler.Constants.DEFAULT_NAMES
public class InformerManager {
private static final Logger LOG = LoggerFactory.getLogger(InformerManager.class);
- public static final String CLUSTER_ID_INDEX = "clusterId_index";
- private final Set<String> watchedNamespaces;
+ private final Set<String> watchedNamespaces = ConcurrentHashMap.newKeySet();
private final KubernetesClient kubernetesClient;
- private volatile Map<String, SharedIndexInformer<FlinkSessionJob>> sessionJobInformers;
private volatile Map<String, SharedIndexInformer<FlinkDeployment>> flinkDepInformers;
- public InformerManager(Set<String> watchedNamespaces, KubernetesClient kubernetesClient) {
- this.watchedNamespaces = watchedNamespaces;
+ public InformerManager(KubernetesClient kubernetesClient) {
this.kubernetesClient = kubernetesClient;
- LOG.info("Created informer manager with watchedNamespaces: {}", watchedNamespaces);
}
public SharedIndexInformer<FlinkDeployment> getFlinkDepInformer(String namespace) {
@@ -67,7 +63,7 @@ public class InformerManager {
synchronized (this) {
if (flinkDepInformers == null) {
var runnableInformers =
- createRunnableInformer(
+ createRunnableInformers(
FlinkDeployment.class, watchedNamespaces, kubernetesClient);
for (Map.Entry<String, SharedIndexInformer<FlinkDeployment>> runnableInformer :
runnableInformers.entrySet()) {
@@ -83,7 +79,7 @@ public class InformerManager {
}
private static <CR extends HasMetadata>
- Map<String, SharedIndexInformer<CR>> createRunnableInformer(
+ Map<String, SharedIndexInformer<CR>> createRunnableInformers(
Class<CR> resourceClass,
Set<String> effectiveNamespaces,
KubernetesClient kubernetesClient) {
@@ -104,4 +100,24 @@ public class InformerManager {
return informers;
}
}
+
+ public void setNamespaces(Set<String> watchedNamespaces) {
+ LOG.info("Setting namespaces to {}", watchedNamespaces);
+ this.watchedNamespaces.clear();
+ this.watchedNamespaces.addAll(watchedNamespaces);
+ if (flinkDepInformers != null) {
+ synchronized (this) {
+ if (flinkDepInformers != null) {
+ flinkDepInformers
+ .entrySet()
+ .forEach(
+ entry -> {
+ LOG.info("Stopping informer in {})", entry.getKey());
+ entry.getValue().stop();
+ });
+ }
+ flinkDepInformers = null;
+ }
+ }
+ }
}
diff --git a/flink-kubernetes-webhook/src/test/java/org/apache/flink/kubernetes/operator/admission/AdmissionHandlerTest.java b/flink-kubernetes-webhook/src/test/java/org/apache/flink/kubernetes/operator/admission/AdmissionHandlerTest.java
index 327981a..e8f061b 100644
--- a/flink-kubernetes-webhook/src/test/java/org/apache/flink/kubernetes/operator/admission/AdmissionHandlerTest.java
+++ b/flink-kubernetes-webhook/src/test/java/org/apache/flink/kubernetes/operator/admission/AdmissionHandlerTest.java
@@ -17,6 +17,7 @@
package org.apache.flink.kubernetes.operator.admission;
+import org.apache.flink.kubernetes.operator.admission.informer.InformerManager;
import org.apache.flink.kubernetes.operator.admission.mutator.FlinkMutator;
import org.apache.flink.kubernetes.operator.config.FlinkConfigManager;
import org.apache.flink.kubernetes.operator.crd.CrdConstants;
@@ -62,8 +63,8 @@ public class AdmissionHandlerTest {
private final AdmissionHandler admissionHandler =
new AdmissionHandler(
new FlinkValidator(
- ValidatorUtils.discoverValidators(new FlinkConfigManager()),
- new FlinkConfigManager()),
+ ValidatorUtils.discoverValidators(new FlinkConfigManager(ns -> {})),
+ new InformerManager(null)),
new FlinkMutator());
@Test
diff --git a/flink-kubernetes-webhook/src/test/java/org/apache/flink/kubernetes/operator/admission/informer/InformerManagerTest.java b/flink-kubernetes-webhook/src/test/java/org/apache/flink/kubernetes/operator/admission/informer/InformerManagerTest.java
new file mode 100644
index 0000000..d258d6c
--- /dev/null
+++ b/flink-kubernetes-webhook/src/test/java/org/apache/flink/kubernetes/operator/admission/informer/InformerManagerTest.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.admission.informer;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.kubernetes.operator.config.FlinkConfigManager;
+
+import io.fabric8.kubernetes.client.KubernetesClient;
+import io.fabric8.kubernetes.client.server.mock.EnableKubernetesMockClient;
+import io.fabric8.kubernetes.client.server.mock.KubernetesMockServer;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.util.Map;
+import java.util.Set;
+
+import static io.javaoperatorsdk.operator.api.reconciler.Constants.DEFAULT_NAMESPACES_SET;
+import static org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions.OPERATOR_DYNAMIC_NAMESPACES_ENABLED;
+import static org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions.OPERATOR_WATCHED_NAMESPACES;
+
+/** Verifies that {@link InformerManager} serves informers for its watched namespaces. */
+@EnableKubernetesMockClient(crud = true)
+public class InformerManagerTest {
+    private KubernetesMockServer mockServer;
+    private KubernetesClient kubernetesClient;
+
+    /** Asserts that an informer can be looked up for every given namespace, in order. */
+    private static void assertInformersPresent(InformerManager manager, String... namespaces) {
+        for (String namespace : namespaces) {
+            Assertions.assertNotNull(manager.getFlinkDepInformer(namespace));
+        }
+    }
+
+    /** Checks that informer lookups succeed after each reassignment of the namespace set. */
+    @Test
+    public void testNamespacedInformerCreated() {
+        InformerManager manager = new InformerManager(kubernetesClient);
+        manager.setNamespaces(DEFAULT_NAMESPACES_SET);
+        assertInformersPresent(manager, "ns1");
+
+        manager.setNamespaces(Set.of("ns1", "ns2"));
+        assertInformersPresent(manager, "ns1", "ns2");
+
+        manager.setNamespaces(Set.of("ns1", "ns2", "ns3"));
+        assertInformersPresent(manager, "ns1", "ns2", "ns3");
+    }
+
+    /** Checks that config updates change the informers only with dynamic namespaces enabled. */
+    @Test
+    public void testDynamicNamespaces() {
+        var manager = new InformerManager(kubernetesClient);
+        var conf = Configuration.fromMap(Map.of(OPERATOR_WATCHED_NAMESPACES.key(), "ns1"));
+        var configManager = new FlinkConfigManager(conf, names -> manager.setNamespaces(names));
+        manager.setNamespaces(configManager.getOperatorConfiguration().getWatchedNamespaces());
+        assertInformersPresent(manager, "ns1");
+
+        // Dynamic namespaces not enabled: the lookup for the newly listed ns2 must throw.
+        conf.set(OPERATOR_WATCHED_NAMESPACES, "ns1,ns2");
+        configManager.updateDefaultConfig(conf);
+        Assertions.assertThrows(
+                NullPointerException.class, () -> manager.getFlinkDepInformer("ns2"));
+
+        // Dynamic namespaces enabled: after the update all three namespaces must resolve.
+        conf.set(OPERATOR_DYNAMIC_NAMESPACES_ENABLED, true);
+        conf.set(OPERATOR_WATCHED_NAMESPACES, "ns1,ns2,ns3");
+        configManager.updateDefaultConfig(conf);
+        assertInformersPresent(manager, "ns1", "ns2", "ns3");
+    }
+}
diff --git a/helm/flink-kubernetes-operator/templates/flink-operator.yaml b/helm/flink-kubernetes-operator/templates/flink-operator.yaml
index 4e9df19..fc4e699 100644
--- a/helm/flink-kubernetes-operator/templates/flink-operator.yaml
+++ b/helm/flink-kubernetes-operator/templates/flink-operator.yaml
@@ -68,8 +68,6 @@ spec:
value: -Dlog4j.configurationFile=/opt/flink/conf/log4j-operator.properties
- name: JVM_ARGS
value: {{ .Values.jvmArgs.operator }}
- - name: FLINK_OPERATOR_WATCH_NAMESPACES
- value: {{ join "," .Values.watchNamespaces }}
securityContext:
{{- toYaml .Values.operatorSecurityContext | nindent 12 }}
volumeMounts:
@@ -111,8 +109,6 @@ spec:
value: /opt/flink/plugins
- name: OPERATOR_NAMESPACE
value: {{ .Release.Namespace }}
- - name: FLINK_OPERATOR_WATCH_NAMESPACES
- value: {{ join "," .Values.watchNamespaces }}
securityContext:
{{- toYaml .Values.webhookSecurityContext | nindent 12 }}
volumeMounts:
@@ -160,6 +156,9 @@ data:
{{- end }}
{{- if index (.Values.defaultConfiguration) "flink-conf.yaml" }}
{{- index (.Values.defaultConfiguration) "flink-conf.yaml" | nindent 4 -}}
+{{- end }}
+{{- if .Values.watchNamespaces }}
+ kubernetes.operator.watched.namespaces: {{ join "," .Values.watchNamespaces }}
{{- end }}
log4j-operator.properties: |+
{{- if .Values.defaultConfiguration.append }}