You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by gy...@apache.org on 2022/03/04 07:42:15 UTC
[flink-kubernetes-operator] branch main updated: [FLINK-26436] InformerEventSources should only watch selected namespace
This is an automated email from the ASF dual-hosted git repository.
gyfora pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git
The following commit(s) were added to refs/heads/main by this push:
new d7f9482 [FLINK-26436] InformerEventSources should only watch selected namespace
d7f9482 is described below
commit d7f9482a6a64c4c936a716011f2087bbe148edfd
Author: Gyula Fora <g_...@apple.com>
AuthorDate: Tue Mar 1 15:37:23 2022 +0100
[FLINK-26436] InformerEventSources should only watch selected namespace
---
.../flink/kubernetes/operator/FlinkOperator.java | 1 +
.../controller/FlinkDeploymentController.java | 41 +++++-----
.../kubernetes/operator/utils/OperatorUtils.java | 64 +++++++++++++++
.../controller/FlinkDeploymentControllerTest.java | 94 ++++++++++++++++++----
4 files changed, 163 insertions(+), 37 deletions(-)
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/FlinkOperator.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/FlinkOperator.java
index 284860d..3112a1f 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/FlinkOperator.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/FlinkOperator.java
@@ -79,6 +79,7 @@ public class FlinkOperator {
sessionReconciler);
FlinkControllerConfig controllerConfig = new FlinkControllerConfig(controller);
+ controller.setControllerConfig(controllerConfig);
controllerConfig.setConfigurationService(configurationService);
operator.register(controller, controllerConfig);
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentController.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentController.java
index bc27ac5..8c45af0 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentController.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentController.java
@@ -29,12 +29,11 @@ import org.apache.flink.kubernetes.operator.reconciler.BaseReconciler;
import org.apache.flink.kubernetes.operator.reconciler.JobReconciler;
import org.apache.flink.kubernetes.operator.reconciler.SessionReconciler;
import org.apache.flink.kubernetes.operator.utils.FlinkUtils;
+import org.apache.flink.kubernetes.operator.utils.OperatorUtils;
import org.apache.flink.kubernetes.operator.validation.FlinkDeploymentValidator;
-import org.apache.flink.kubernetes.utils.Constants;
+import org.apache.flink.util.Preconditions;
-import io.fabric8.kubernetes.api.model.apps.Deployment;
import io.fabric8.kubernetes.client.KubernetesClient;
-import io.fabric8.kubernetes.client.informers.SharedIndexInformer;
import io.javaoperatorsdk.operator.api.reconciler.Context;
import io.javaoperatorsdk.operator.api.reconciler.ControllerConfiguration;
import io.javaoperatorsdk.operator.api.reconciler.DeleteControl;
@@ -45,13 +44,13 @@ import io.javaoperatorsdk.operator.api.reconciler.Reconciler;
import io.javaoperatorsdk.operator.api.reconciler.RetryInfo;
import io.javaoperatorsdk.operator.api.reconciler.UpdateControl;
import io.javaoperatorsdk.operator.processing.event.source.EventSource;
-import io.javaoperatorsdk.operator.processing.event.source.informer.InformerEventSource;
-import io.javaoperatorsdk.operator.processing.event.source.informer.Mappers;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.List;
import java.util.Optional;
+import java.util.Set;
+import java.util.stream.Collectors;
/** Controller that runs the main reconcile loop for Flink deployments. */
@ControllerConfiguration(generationAwareEventProcessing = false)
@@ -72,6 +71,8 @@ public class FlinkDeploymentController
private final DefaultConfig defaultConfig;
private final FlinkOperatorConfiguration operatorConfiguration;
+ private FlinkControllerConfig controllerConfig;
+
public FlinkDeploymentController(
DefaultConfig defaultConfig,
FlinkOperatorConfiguration operatorConfiguration,
@@ -156,22 +157,16 @@ public class FlinkDeploymentController
}
@Override
- public List<EventSource> prepareEventSources(
- EventSourceContext<FlinkDeployment> eventSourceContext) {
- // reconcile when job manager deployment is ready
- SharedIndexInformer<Deployment> deploymentInformer =
- kubernetesClient
- .apps()
- .deployments()
- .inAnyNamespace()
- .withLabel(Constants.LABEL_TYPE_KEY, Constants.LABEL_TYPE_NATIVE_TYPE)
- .withLabel(
- Constants.LABEL_COMPONENT_KEY,
- Constants.LABEL_COMPONENT_JOB_MANAGER)
- .runnableInformer(0);
- return List.of(
- new InformerEventSource<>(
- deploymentInformer, Mappers.fromLabel(Constants.LABEL_APP_KEY)));
+ public List<EventSource> prepareEventSources(EventSourceContext<FlinkDeployment> ctx) {
+ Preconditions.checkNotNull(controllerConfig, "Controller config cannot be null");
+ Set<String> effectiveNamespaces = controllerConfig.getEffectiveNamespaces();
+ if (effectiveNamespaces.isEmpty()) {
+ return List.of(OperatorUtils.createJmDepInformerEventSource(kubernetesClient));
+ } else {
+ return effectiveNamespaces.stream()
+ .map(ns -> OperatorUtils.createJmDepInformerEventSource(kubernetesClient, ns))
+ .collect(Collectors.toList());
+ }
}
@Override
@@ -187,4 +182,8 @@ public class FlinkDeploymentController
(e instanceof ReconciliationException) ? e.getCause().toString() : e.toString());
return Optional.of(flinkApp);
}
+
+ public void setControllerConfig(FlinkControllerConfig config) {
+ this.controllerConfig = config;
+ }
}
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/OperatorUtils.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/OperatorUtils.java
new file mode 100644
index 0000000..48e55ee
--- /dev/null
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/OperatorUtils.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.utils;
+
+import org.apache.flink.kubernetes.utils.Constants;
+
+import io.fabric8.kubernetes.api.model.HasMetadata;
+import io.fabric8.kubernetes.api.model.apps.Deployment;
+import io.fabric8.kubernetes.api.model.apps.DeploymentList;
+import io.fabric8.kubernetes.client.KubernetesClient;
+import io.fabric8.kubernetes.client.dsl.FilterWatchListDeletable;
+import io.fabric8.kubernetes.client.informers.SharedIndexInformer;
+import io.javaoperatorsdk.operator.processing.event.source.informer.InformerEventSource;
+import io.javaoperatorsdk.operator.processing.event.source.informer.Mappers;
+
+/** Operator SDK related utility functions. */
+public class OperatorUtils {
+
+ public static InformerEventSource<Deployment, HasMetadata> createJmDepInformerEventSource(
+ KubernetesClient kubernetesClient, String namespace) {
+ return createJmDepInformerEventSource(
+ kubernetesClient.apps().deployments().inNamespace(namespace), namespace);
+ }
+
+ public static InformerEventSource<Deployment, HasMetadata> createJmDepInformerEventSource(
+ KubernetesClient kubernetesClient) {
+ return createJmDepInformerEventSource(
+ kubernetesClient.apps().deployments().inAnyNamespace(), "all");
+ }
+
+ private static InformerEventSource<Deployment, HasMetadata> createJmDepInformerEventSource(
+ FilterWatchListDeletable<Deployment, DeploymentList> filteredClient, String name) {
+ SharedIndexInformer<Deployment> informer =
+ filteredClient
+ .withLabel(Constants.LABEL_TYPE_KEY, Constants.LABEL_TYPE_NATIVE_TYPE)
+ .withLabel(
+ Constants.LABEL_COMPONENT_KEY,
+ Constants.LABEL_COMPONENT_JOB_MANAGER)
+ .runnableInformer(0);
+
+ return new InformerEventSource<>(informer, Mappers.fromLabel(Constants.LABEL_APP_KEY)) {
+ @Override
+ public String name() {
+ return super.name() + "-" + name;
+ }
+ };
+ }
+}
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java
index e464123..ebd6f42 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java
@@ -36,12 +36,22 @@ import org.apache.flink.kubernetes.operator.utils.FlinkUtils;
import org.apache.flink.kubernetes.operator.validation.DefaultDeploymentValidator;
import org.apache.flink.runtime.client.JobStatusMessage;
+import io.fabric8.kubernetes.client.KubernetesClient;
+import io.fabric8.kubernetes.client.NamespacedKubernetesClient;
+import io.fabric8.kubernetes.client.server.mock.KubernetesMockServer;
import io.javaoperatorsdk.operator.api.reconciler.Context;
import io.javaoperatorsdk.operator.api.reconciler.UpdateControl;
+import io.javaoperatorsdk.operator.processing.event.source.EventSource;
+import okhttp3.mockwebserver.MockWebServer;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import java.util.ArrayList;
+import java.util.HashMap;
import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
@@ -56,10 +66,29 @@ public class FlinkDeploymentControllerTest {
private final FlinkOperatorConfiguration operatorConfiguration =
FlinkOperatorConfiguration.fromConfiguration(new Configuration());
+ private TestingFlinkService flinkService;
+ private FlinkDeploymentController testController;
+
+ private KubernetesMockServer mockServer;
+ private NamespacedKubernetesClient kubernetesClient;
+
+ @BeforeEach
+ public void setup() {
+ flinkService = new TestingFlinkService();
+ mockServer = new KubernetesMockServer(new MockWebServer(), new HashMap<>(), true);
+ mockServer.init();
+ kubernetesClient = mockServer.createClient();
+ testController = createTestController(kubernetesClient, flinkService);
+ }
+
+ @AfterEach
+ public void tearDown() {
+ kubernetesClient.close();
+ mockServer.shutdown();
+ }
+
@Test
public void verifyBasicReconcileLoop() {
- TestingFlinkService flinkService = new TestingFlinkService();
- FlinkDeploymentController testController = createTestController(flinkService);
FlinkDeployment appCluster = TestUtils.buildApplicationCluster();
UpdateControl<FlinkDeployment> updateControl;
@@ -124,8 +153,6 @@ public class FlinkDeploymentControllerTest {
@Test
public void verifyUpgradeFromSavepoint() {
- TestingFlinkService flinkService = new TestingFlinkService();
- FlinkDeploymentController testController = createTestController(flinkService);
FlinkDeployment appCluster = TestUtils.buildApplicationCluster();
appCluster.getSpec().getJob().setUpgradeMode(UpgradeMode.SAVEPOINT);
appCluster.getSpec().getJob().setInitialSavepointPath("s0");
@@ -177,8 +204,6 @@ public class FlinkDeploymentControllerTest {
@Test
public void verifyStatelessUpgrade() {
- TestingFlinkService flinkService = new TestingFlinkService();
- FlinkDeploymentController testController = createTestController(flinkService);
FlinkDeployment appCluster = TestUtils.buildApplicationCluster();
appCluster.getSpec().getJob().setUpgradeMode(UpgradeMode.STATELESS);
appCluster.getSpec().getJob().setInitialSavepointPath("s0");
@@ -218,20 +243,57 @@ public class FlinkDeploymentControllerTest {
assertEquals(null, jobs.get(0).f0);
}
- private FlinkDeploymentController createTestController(TestingFlinkService flinkService) {
+ @Test
+ public void testPrepareEventSource() {
+ // Test watch all
+ testController.setControllerConfig(
+ new FlinkControllerConfig(testController) {
+ @Override
+ public Set<String> getEffectiveNamespaces() {
+ return Set.of();
+ }
+ });
+ List<EventSource> eventSources = testController.prepareEventSources(null);
+ assertEquals(1, eventSources.size());
+ assertTrue(eventSources.get(0).name().endsWith("-all"));
+
+ // Test watch namespaces
+ Set<String> namespaces = Set.of("ns1", "ns2", "ns3");
+ testController.setControllerConfig(
+ new FlinkControllerConfig(testController) {
+ @Override
+ public Set<String> getEffectiveNamespaces() {
+ return namespaces;
+ }
+ });
+ eventSources = testController.prepareEventSources(null);
+ assertEquals(3, eventSources.size());
+ assertEquals(
+ namespaces,
+ eventSources.stream()
+ .map(EventSource::name)
+ .map(s -> s.substring(s.length() - 3))
+ .collect(Collectors.toSet()));
+ }
+
+ private FlinkDeploymentController createTestController(
+ KubernetesClient kubernetesClient, TestingFlinkService flinkService) {
Observer observer = new Observer(flinkService);
JobReconciler jobReconciler = new JobReconciler(null, flinkService, operatorConfiguration);
SessionReconciler sessionReconciler =
new SessionReconciler(null, flinkService, operatorConfiguration);
- return new FlinkDeploymentController(
- FlinkUtils.loadDefaultConfig(),
- operatorConfiguration,
- null,
- "test",
- new DefaultDeploymentValidator(),
- observer,
- jobReconciler,
- sessionReconciler);
+ FlinkDeploymentController controller =
+ new FlinkDeploymentController(
+ FlinkUtils.loadDefaultConfig(),
+ operatorConfiguration,
+ kubernetesClient,
+ "test",
+ new DefaultDeploymentValidator(),
+ observer,
+ jobReconciler,
+ sessionReconciler);
+ controller.setControllerConfig(new FlinkControllerConfig(controller));
+ return controller;
}
}