You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by gy...@apache.org on 2022/03/04 07:42:15 UTC

[flink-kubernetes-operator] branch main updated: [FLINK-26436] InformerEventSources should only watch selected namespace

This is an automated email from the ASF dual-hosted git repository.

gyfora pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git


The following commit(s) were added to refs/heads/main by this push:
     new d7f9482  [FLINK-26436] InformerEventSources should only watch selected namespace
d7f9482 is described below

commit d7f9482a6a64c4c936a716011f2087bbe148edfd
Author: Gyula Fora <g_...@apple.com>
AuthorDate: Tue Mar 1 15:37:23 2022 +0100

    [FLINK-26436] InformerEventSources should only watch selected namespace
---
 .../flink/kubernetes/operator/FlinkOperator.java   |  1 +
 .../controller/FlinkDeploymentController.java      | 41 +++++-----
 .../kubernetes/operator/utils/OperatorUtils.java   | 64 +++++++++++++++
 .../controller/FlinkDeploymentControllerTest.java  | 94 ++++++++++++++++++----
 4 files changed, 163 insertions(+), 37 deletions(-)

diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/FlinkOperator.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/FlinkOperator.java
index 284860d..3112a1f 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/FlinkOperator.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/FlinkOperator.java
@@ -79,6 +79,7 @@ public class FlinkOperator {
                         sessionReconciler);
 
         FlinkControllerConfig controllerConfig = new FlinkControllerConfig(controller);
+        controller.setControllerConfig(controllerConfig);
         controllerConfig.setConfigurationService(configurationService);
 
         operator.register(controller, controllerConfig);
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentController.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentController.java
index bc27ac5..8c45af0 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentController.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentController.java
@@ -29,12 +29,11 @@ import org.apache.flink.kubernetes.operator.reconciler.BaseReconciler;
 import org.apache.flink.kubernetes.operator.reconciler.JobReconciler;
 import org.apache.flink.kubernetes.operator.reconciler.SessionReconciler;
 import org.apache.flink.kubernetes.operator.utils.FlinkUtils;
+import org.apache.flink.kubernetes.operator.utils.OperatorUtils;
 import org.apache.flink.kubernetes.operator.validation.FlinkDeploymentValidator;
-import org.apache.flink.kubernetes.utils.Constants;
+import org.apache.flink.util.Preconditions;
 
-import io.fabric8.kubernetes.api.model.apps.Deployment;
 import io.fabric8.kubernetes.client.KubernetesClient;
-import io.fabric8.kubernetes.client.informers.SharedIndexInformer;
 import io.javaoperatorsdk.operator.api.reconciler.Context;
 import io.javaoperatorsdk.operator.api.reconciler.ControllerConfiguration;
 import io.javaoperatorsdk.operator.api.reconciler.DeleteControl;
@@ -45,13 +44,13 @@ import io.javaoperatorsdk.operator.api.reconciler.Reconciler;
 import io.javaoperatorsdk.operator.api.reconciler.RetryInfo;
 import io.javaoperatorsdk.operator.api.reconciler.UpdateControl;
 import io.javaoperatorsdk.operator.processing.event.source.EventSource;
-import io.javaoperatorsdk.operator.processing.event.source.informer.InformerEventSource;
-import io.javaoperatorsdk.operator.processing.event.source.informer.Mappers;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.List;
 import java.util.Optional;
+import java.util.Set;
+import java.util.stream.Collectors;
 
 /** Controller that runs the main reconcile loop for Flink deployments. */
 @ControllerConfiguration(generationAwareEventProcessing = false)
@@ -72,6 +71,8 @@ public class FlinkDeploymentController
     private final DefaultConfig defaultConfig;
     private final FlinkOperatorConfiguration operatorConfiguration;
 
+    private FlinkControllerConfig controllerConfig;
+
     public FlinkDeploymentController(
             DefaultConfig defaultConfig,
             FlinkOperatorConfiguration operatorConfiguration,
@@ -156,22 +157,16 @@ public class FlinkDeploymentController
     }
 
     @Override
-    public List<EventSource> prepareEventSources(
-            EventSourceContext<FlinkDeployment> eventSourceContext) {
-        // reconcile when job manager deployment is ready
-        SharedIndexInformer<Deployment> deploymentInformer =
-                kubernetesClient
-                        .apps()
-                        .deployments()
-                        .inAnyNamespace()
-                        .withLabel(Constants.LABEL_TYPE_KEY, Constants.LABEL_TYPE_NATIVE_TYPE)
-                        .withLabel(
-                                Constants.LABEL_COMPONENT_KEY,
-                                Constants.LABEL_COMPONENT_JOB_MANAGER)
-                        .runnableInformer(0);
-        return List.of(
-                new InformerEventSource<>(
-                        deploymentInformer, Mappers.fromLabel(Constants.LABEL_APP_KEY)));
+    public List<EventSource> prepareEventSources(EventSourceContext<FlinkDeployment> ctx) {
+        Preconditions.checkNotNull(controllerConfig, "Controller config cannot be null");
+        Set<String> effectiveNamespaces = controllerConfig.getEffectiveNamespaces();
+        if (effectiveNamespaces.isEmpty()) {
+            return List.of(OperatorUtils.createJmDepInformerEventSource(kubernetesClient));
+        } else {
+            return effectiveNamespaces.stream()
+                    .map(ns -> OperatorUtils.createJmDepInformerEventSource(kubernetesClient, ns))
+                    .collect(Collectors.toList());
+        }
     }
 
     @Override
@@ -187,4 +182,8 @@ public class FlinkDeploymentController
                 (e instanceof ReconciliationException) ? e.getCause().toString() : e.toString());
         return Optional.of(flinkApp);
     }
+
+    public void setControllerConfig(FlinkControllerConfig config) {
+        this.controllerConfig = config;
+    }
 }
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/OperatorUtils.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/OperatorUtils.java
new file mode 100644
index 0000000..48e55ee
--- /dev/null
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/OperatorUtils.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.utils;
+
+import org.apache.flink.kubernetes.utils.Constants;
+
+import io.fabric8.kubernetes.api.model.HasMetadata;
+import io.fabric8.kubernetes.api.model.apps.Deployment;
+import io.fabric8.kubernetes.api.model.apps.DeploymentList;
+import io.fabric8.kubernetes.client.KubernetesClient;
+import io.fabric8.kubernetes.client.dsl.FilterWatchListDeletable;
+import io.fabric8.kubernetes.client.informers.SharedIndexInformer;
+import io.javaoperatorsdk.operator.processing.event.source.informer.InformerEventSource;
+import io.javaoperatorsdk.operator.processing.event.source.informer.Mappers;
+
+/** Operator SDK related utility functions. */
+public class OperatorUtils {
+
+    public static InformerEventSource<Deployment, HasMetadata> createJmDepInformerEventSource(
+            KubernetesClient kubernetesClient, String namespace) {
+        return createJmDepInformerEventSource(
+                kubernetesClient.apps().deployments().inNamespace(namespace), namespace);
+    }
+
+    public static InformerEventSource<Deployment, HasMetadata> createJmDepInformerEventSource(
+            KubernetesClient kubernetesClient) {
+        return createJmDepInformerEventSource(
+                kubernetesClient.apps().deployments().inAnyNamespace(), "all");
+    }
+
+    private static InformerEventSource<Deployment, HasMetadata> createJmDepInformerEventSource(
+            FilterWatchListDeletable<Deployment, DeploymentList> filteredClient, String name) {
+        SharedIndexInformer<Deployment> informer =
+                filteredClient
+                        .withLabel(Constants.LABEL_TYPE_KEY, Constants.LABEL_TYPE_NATIVE_TYPE)
+                        .withLabel(
+                                Constants.LABEL_COMPONENT_KEY,
+                                Constants.LABEL_COMPONENT_JOB_MANAGER)
+                        .runnableInformer(0);
+
+        return new InformerEventSource<>(informer, Mappers.fromLabel(Constants.LABEL_APP_KEY)) {
+            @Override
+            public String name() {
+                return super.name() + "-" + name;
+            }
+        };
+    }
+}
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java
index e464123..ebd6f42 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java
@@ -36,12 +36,22 @@ import org.apache.flink.kubernetes.operator.utils.FlinkUtils;
 import org.apache.flink.kubernetes.operator.validation.DefaultDeploymentValidator;
 import org.apache.flink.runtime.client.JobStatusMessage;
 
+import io.fabric8.kubernetes.client.KubernetesClient;
+import io.fabric8.kubernetes.client.NamespacedKubernetesClient;
+import io.fabric8.kubernetes.client.server.mock.KubernetesMockServer;
 import io.javaoperatorsdk.operator.api.reconciler.Context;
 import io.javaoperatorsdk.operator.api.reconciler.UpdateControl;
+import io.javaoperatorsdk.operator.processing.event.source.EventSource;
+import okhttp3.mockwebserver.MockWebServer;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
@@ -56,10 +66,29 @@ public class FlinkDeploymentControllerTest {
     private final FlinkOperatorConfiguration operatorConfiguration =
             FlinkOperatorConfiguration.fromConfiguration(new Configuration());
 
+    private TestingFlinkService flinkService;
+    private FlinkDeploymentController testController;
+
+    private KubernetesMockServer mockServer;
+    private NamespacedKubernetesClient kubernetesClient;
+
+    @BeforeEach
+    public void setup() {
+        flinkService = new TestingFlinkService();
+        mockServer = new KubernetesMockServer(new MockWebServer(), new HashMap<>(), true);
+        mockServer.init();
+        kubernetesClient = mockServer.createClient();
+        testController = createTestController(kubernetesClient, flinkService);
+    }
+
+    @AfterEach
+    public void tearDown() {
+        kubernetesClient.close();
+        mockServer.shutdown();
+    }
+
     @Test
     public void verifyBasicReconcileLoop() {
-        TestingFlinkService flinkService = new TestingFlinkService();
-        FlinkDeploymentController testController = createTestController(flinkService);
         FlinkDeployment appCluster = TestUtils.buildApplicationCluster();
 
         UpdateControl<FlinkDeployment> updateControl;
@@ -124,8 +153,6 @@ public class FlinkDeploymentControllerTest {
 
     @Test
     public void verifyUpgradeFromSavepoint() {
-        TestingFlinkService flinkService = new TestingFlinkService();
-        FlinkDeploymentController testController = createTestController(flinkService);
         FlinkDeployment appCluster = TestUtils.buildApplicationCluster();
         appCluster.getSpec().getJob().setUpgradeMode(UpgradeMode.SAVEPOINT);
         appCluster.getSpec().getJob().setInitialSavepointPath("s0");
@@ -177,8 +204,6 @@ public class FlinkDeploymentControllerTest {
 
     @Test
     public void verifyStatelessUpgrade() {
-        TestingFlinkService flinkService = new TestingFlinkService();
-        FlinkDeploymentController testController = createTestController(flinkService);
         FlinkDeployment appCluster = TestUtils.buildApplicationCluster();
         appCluster.getSpec().getJob().setUpgradeMode(UpgradeMode.STATELESS);
         appCluster.getSpec().getJob().setInitialSavepointPath("s0");
@@ -218,20 +243,57 @@ public class FlinkDeploymentControllerTest {
         assertEquals(null, jobs.get(0).f0);
     }
 
-    private FlinkDeploymentController createTestController(TestingFlinkService flinkService) {
+    @Test
+    public void testPrepareEventSource() {
+        // Test watch all
+        testController.setControllerConfig(
+                new FlinkControllerConfig(testController) {
+                    @Override
+                    public Set<String> getEffectiveNamespaces() {
+                        return Set.of();
+                    }
+                });
+        List<EventSource> eventSources = testController.prepareEventSources(null);
+        assertEquals(1, eventSources.size());
+        assertTrue(eventSources.get(0).name().endsWith("-all"));
+
+        // Test watch namespaces
+        Set<String> namespaces = Set.of("ns1", "ns2", "ns3");
+        testController.setControllerConfig(
+                new FlinkControllerConfig(testController) {
+                    @Override
+                    public Set<String> getEffectiveNamespaces() {
+                        return namespaces;
+                    }
+                });
+        eventSources = testController.prepareEventSources(null);
+        assertEquals(3, eventSources.size());
+        assertEquals(
+                namespaces,
+                eventSources.stream()
+                        .map(EventSource::name)
+                        .map(s -> s.substring(s.length() - 3))
+                        .collect(Collectors.toSet()));
+    }
+
+    private FlinkDeploymentController createTestController(
+            KubernetesClient kubernetesClient, TestingFlinkService flinkService) {
         Observer observer = new Observer(flinkService);
         JobReconciler jobReconciler = new JobReconciler(null, flinkService, operatorConfiguration);
         SessionReconciler sessionReconciler =
                 new SessionReconciler(null, flinkService, operatorConfiguration);
 
-        return new FlinkDeploymentController(
-                FlinkUtils.loadDefaultConfig(),
-                operatorConfiguration,
-                null,
-                "test",
-                new DefaultDeploymentValidator(),
-                observer,
-                jobReconciler,
-                sessionReconciler);
+        FlinkDeploymentController controller =
+                new FlinkDeploymentController(
+                        FlinkUtils.loadDefaultConfig(),
+                        operatorConfiguration,
+                        kubernetesClient,
+                        "test",
+                        new DefaultDeploymentValidator(),
+                        observer,
+                        jobReconciler,
+                        sessionReconciler);
+        controller.setControllerConfig(new FlinkControllerConfig(controller));
+        return controller;
     }
 }