You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by gy...@apache.org on 2022/03/06 17:50:35 UTC

[flink-kubernetes-operator] branch main updated: [FLINK-26377] Extract the Reconciler interface

This is an automated email from the ASF dual-hosted git repository.

gyfora pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git


The following commit(s) were added to refs/heads/main by this push:
     new 4d0e9b3  [FLINK-26377] Extract the Reconciler interface
4d0e9b3 is described below

commit 4d0e9b3c55328ccf0fa5fe0ca72bdf97a30938cd
Author: Aitozi <yu...@alibaba-inc.com>
AuthorDate: Mon Mar 7 01:50:29 2022 +0800

    [FLINK-26377] Extract the Reconciler interface
---
 .../flink/kubernetes/operator/FlinkOperator.java   | 12 ++--
 .../controller/FlinkDeploymentController.java      | 25 +++-----
 .../metrics/KubernetesOperatorMetricGroup.java     |  6 +-
 .../operator/reconciler/BaseReconciler.java        | 17 ++---
 .../kubernetes/operator/reconciler/Reconciler.java | 59 +++++++++++++++++
 .../operator/reconciler/ReconcilerFactory.java     | 74 ++++++++++++++++++++++
 .../controller/FlinkDeploymentControllerTest.java  | 10 +--
 7 files changed, 159 insertions(+), 44 deletions(-)

diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/FlinkOperator.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/FlinkOperator.java
index 3112a1f..a579372 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/FlinkOperator.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/FlinkOperator.java
@@ -23,8 +23,7 @@ import org.apache.flink.kubernetes.operator.controller.FlinkControllerConfig;
 import org.apache.flink.kubernetes.operator.controller.FlinkDeploymentController;
 import org.apache.flink.kubernetes.operator.metrics.OperatorMetricUtils;
 import org.apache.flink.kubernetes.operator.observer.Observer;
-import org.apache.flink.kubernetes.operator.reconciler.JobReconciler;
-import org.apache.flink.kubernetes.operator.reconciler.SessionReconciler;
+import org.apache.flink.kubernetes.operator.reconciler.ReconcilerFactory;
 import org.apache.flink.kubernetes.operator.service.FlinkService;
 import org.apache.flink.kubernetes.operator.utils.FlinkUtils;
 import org.apache.flink.kubernetes.operator.validation.DefaultDeploymentValidator;
@@ -60,12 +59,10 @@ public class FlinkOperator {
                 FlinkOperatorConfiguration.fromConfiguration(defaultConfig.getOperatorConfig());
 
         Observer observer = new Observer(flinkService);
-        JobReconciler jobReconciler =
-                new JobReconciler(client, flinkService, operatorConfiguration);
-        SessionReconciler sessionReconciler =
-                new SessionReconciler(client, flinkService, operatorConfiguration);
 
         FlinkDeploymentValidator validator = new DefaultDeploymentValidator();
+        ReconcilerFactory factory =
+                new ReconcilerFactory(client, flinkService, operatorConfiguration);
 
         FlinkDeploymentController controller =
                 new FlinkDeploymentController(
@@ -75,8 +72,7 @@ public class FlinkOperator {
                         namespace,
                         validator,
                         observer,
-                        jobReconciler,
-                        sessionReconciler);
+                        factory);
 
         FlinkControllerConfig controllerConfig = new FlinkControllerConfig(controller);
         controller.setControllerConfig(controllerConfig);
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentController.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentController.java
index 8c45af0..5ce515a 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentController.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentController.java
@@ -25,9 +25,7 @@ import org.apache.flink.kubernetes.operator.crd.status.ReconciliationStatus;
 import org.apache.flink.kubernetes.operator.exception.InvalidDeploymentException;
 import org.apache.flink.kubernetes.operator.exception.ReconciliationException;
 import org.apache.flink.kubernetes.operator.observer.Observer;
-import org.apache.flink.kubernetes.operator.reconciler.BaseReconciler;
-import org.apache.flink.kubernetes.operator.reconciler.JobReconciler;
-import org.apache.flink.kubernetes.operator.reconciler.SessionReconciler;
+import org.apache.flink.kubernetes.operator.reconciler.ReconcilerFactory;
 import org.apache.flink.kubernetes.operator.utils.FlinkUtils;
 import org.apache.flink.kubernetes.operator.utils.OperatorUtils;
 import org.apache.flink.kubernetes.operator.validation.FlinkDeploymentValidator;
@@ -66,8 +64,7 @@ public class FlinkDeploymentController
 
     private final FlinkDeploymentValidator validator;
     private final Observer observer;
-    private final JobReconciler jobReconciler;
-    private final SessionReconciler sessionReconciler;
+    private final ReconcilerFactory reconcilerFactory;
     private final DefaultConfig defaultConfig;
     private final FlinkOperatorConfiguration operatorConfiguration;
 
@@ -80,16 +77,14 @@ public class FlinkDeploymentController
             String operatorNamespace,
             FlinkDeploymentValidator validator,
             Observer observer,
-            JobReconciler jobReconciler,
-            SessionReconciler sessionReconciler) {
+            ReconcilerFactory reconcilerFactory) {
         this.defaultConfig = defaultConfig;
         this.operatorConfiguration = operatorConfiguration;
         this.kubernetesClient = kubernetesClient;
         this.operatorNamespace = operatorNamespace;
         this.validator = validator;
         this.observer = observer;
-        this.jobReconciler = jobReconciler;
-        this.sessionReconciler = sessionReconciler;
+        this.reconcilerFactory = reconcilerFactory;
     }
 
     @Override
@@ -99,8 +94,9 @@ public class FlinkDeploymentController
                 FlinkUtils.getEffectiveConfig(flinkApp, defaultConfig.getFlinkConfig());
 
         observer.observe(flinkApp, context, effectiveConfig);
-        return getReconciler(flinkApp)
-                .shutdownAndDelete(operatorNamespace, flinkApp, effectiveConfig);
+        return reconcilerFactory
+                .getOrCreate(flinkApp)
+                .cleanup(operatorNamespace, flinkApp, effectiveConfig);
     }
 
     @Override
@@ -126,7 +122,8 @@ public class FlinkDeploymentController
 
         try {
             UpdateControl<FlinkDeployment> updateControl =
-                    getReconciler(flinkApp)
+                    reconcilerFactory
+                            .getOrCreate(flinkApp)
                             .reconcile(operatorNamespace, flinkApp, context, effectiveConfig);
             updateForReconciliationSuccess(flinkApp);
             return updateControl;
@@ -139,10 +136,6 @@ public class FlinkDeploymentController
         }
     }
 
-    private BaseReconciler getReconciler(FlinkDeployment flinkDeployment) {
-        return flinkDeployment.getSpec().getJob() == null ? sessionReconciler : jobReconciler;
-    }
-
     private void updateForReconciliationSuccess(FlinkDeployment flinkApp) {
         ReconciliationStatus reconciliationStatus = flinkApp.getStatus().getReconciliationStatus();
         reconciliationStatus.setSuccess(true);
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/KubernetesOperatorMetricGroup.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/KubernetesOperatorMetricGroup.java
index 83beb22..3b1c088 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/KubernetesOperatorMetricGroup.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/KubernetesOperatorMetricGroup.java
@@ -31,9 +31,9 @@ public class KubernetesOperatorMetricGroup
         extends AbstractMetricGroup<KubernetesOperatorMetricGroup> {
 
     private static final String GROUP_NAME = "k8soperator";
-    private String namespace;
-    private String name;
-    private String hostname;
+    private final String namespace;
+    private final String name;
+    private final String hostname;
 
     private KubernetesOperatorMetricGroup(
             MetricRegistry registry,
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/BaseReconciler.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/BaseReconciler.java
index b4a98d8..1317899 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/BaseReconciler.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/BaseReconciler.java
@@ -26,12 +26,10 @@ import org.apache.flink.kubernetes.operator.utils.FlinkUtils;
 import org.apache.flink.kubernetes.operator.utils.IngressUtils;
 
 import io.fabric8.kubernetes.client.KubernetesClient;
-import io.javaoperatorsdk.operator.api.reconciler.Context;
 import io.javaoperatorsdk.operator.api.reconciler.DeleteControl;
-import io.javaoperatorsdk.operator.api.reconciler.UpdateControl;
 
 /** BaseReconciler with functionality that is common to job and session modes. */
-public abstract class BaseReconciler {
+public abstract class BaseReconciler implements Reconciler {
 
     protected final FlinkOperatorConfiguration operatorConfiguration;
     protected final KubernetesClient kubernetesClient;
@@ -46,14 +44,13 @@ public abstract class BaseReconciler {
         this.operatorConfiguration = operatorConfiguration;
     }
 
-    public abstract UpdateControl<FlinkDeployment> reconcile(
-            String operatorNamespace,
-            FlinkDeployment flinkApp,
-            Context context,
-            Configuration effectiveConfig)
-            throws Exception;
+    @Override
+    public DeleteControl cleanup(
+            String operatorNamespace, FlinkDeployment flinkApp, Configuration effectiveConfig) {
+        return shutdownAndDelete(operatorNamespace, flinkApp, effectiveConfig);
+    }
 
-    public DeleteControl shutdownAndDelete(
+    private DeleteControl shutdownAndDelete(
             String operatorNamespace, FlinkDeployment flinkApp, Configuration effectiveConfig) {
 
         if (JobManagerDeploymentStatus.READY
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/Reconciler.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/Reconciler.java
new file mode 100644
index 0000000..f07a9d8
--- /dev/null
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/Reconciler.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.reconciler;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
+
+import io.javaoperatorsdk.operator.api.reconciler.Context;
+import io.javaoperatorsdk.operator.api.reconciler.DeleteControl;
+import io.javaoperatorsdk.operator.api.reconciler.UpdateControl;
+
+/** The interface of reconciler. */
+public interface Reconciler {
+
+    /**
+     * This is called when receiving the create or update event of the FlinkDeployment resource.
+     *
+     * @param operatorNamespace The namespace of the operator
+     * @param flinkApp the FlinkDeployment resource that has been created or updated
+     * @param context the context with which the operation is executed
+     * @param effectiveConfig the effective config of the flinkApp
+     * @return UpdateControl to manage updates on the custom resource (usually the status) after
+     *     reconciliation.
+     */
+    UpdateControl<FlinkDeployment> reconcile(
+            String operatorNamespace,
+            FlinkDeployment flinkApp,
+            Context context,
+            Configuration effectiveConfig)
+            throws Exception;
+
+    /**
+     * This is called when receiving the delete event of FlinkDeployment resource. This method is
+     * meant to cleanup the associated components like the Flink job components.
+     *
+     * @param operatorNamespace The namespace of the operator
+     * @param flinkApp the FlinkDeployment resource that has been deleted
+     * @param effectiveConfig the effective config of the flinkApp
+     * @return DeleteControl to manage the delete behavior
+     */
+    DeleteControl cleanup(
+            String operatorNamespace, FlinkDeployment flinkApp, Configuration effectiveConfig);
+}
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/ReconcilerFactory.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/ReconcilerFactory.java
new file mode 100644
index 0000000..94050ce
--- /dev/null
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/ReconcilerFactory.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.reconciler;
+
+import org.apache.flink.kubernetes.operator.config.FlinkOperatorConfiguration;
+import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
+import org.apache.flink.kubernetes.operator.service.FlinkService;
+
+import io.fabric8.kubernetes.client.KubernetesClient;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+/** The factory to create reconciler based on app mode. */
+public class ReconcilerFactory {
+
+    private final KubernetesClient kubernetesClient;
+    private final FlinkService flinkService;
+    private final FlinkOperatorConfiguration operatorConfiguration;
+    private final Map<Mode, Reconciler> reconcilerMap;
+
+    public ReconcilerFactory(
+            KubernetesClient kubernetesClient,
+            FlinkService flinkService,
+            FlinkOperatorConfiguration operatorConfiguration) {
+        this.kubernetesClient = kubernetesClient;
+        this.flinkService = flinkService;
+        this.operatorConfiguration = operatorConfiguration;
+        this.reconcilerMap = new ConcurrentHashMap<>();
+    }
+
+    public Reconciler getOrCreate(FlinkDeployment flinkApp) {
+        return reconcilerMap.computeIfAbsent(
+                getMode(flinkApp),
+                mode -> {
+                    switch (mode) {
+                        case SESSION:
+                            return new SessionReconciler(
+                                    kubernetesClient, flinkService, operatorConfiguration);
+                        case APPLICATION:
+                            return new JobReconciler(
+                                    kubernetesClient, flinkService, operatorConfiguration);
+                        default:
+                            throw new UnsupportedOperationException(
+                                    String.format("Unsupported running mode: %s", mode));
+                    }
+                });
+    }
+
+    private Mode getMode(FlinkDeployment flinkApp) {
+        return flinkApp.getSpec().getJob() != null ? Mode.APPLICATION : Mode.SESSION;
+    }
+
+    enum Mode {
+        APPLICATION,
+        SESSION
+    }
+}
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java
index ebd6f42..60632ec 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java
@@ -29,9 +29,8 @@ import org.apache.flink.kubernetes.operator.crd.status.JobStatus;
 import org.apache.flink.kubernetes.operator.crd.status.ReconciliationStatus;
 import org.apache.flink.kubernetes.operator.observer.JobManagerDeploymentStatus;
 import org.apache.flink.kubernetes.operator.observer.Observer;
-import org.apache.flink.kubernetes.operator.reconciler.JobReconciler;
 import org.apache.flink.kubernetes.operator.reconciler.JobReconcilerTest;
-import org.apache.flink.kubernetes.operator.reconciler.SessionReconciler;
+import org.apache.flink.kubernetes.operator.reconciler.ReconcilerFactory;
 import org.apache.flink.kubernetes.operator.utils.FlinkUtils;
 import org.apache.flink.kubernetes.operator.validation.DefaultDeploymentValidator;
 import org.apache.flink.runtime.client.JobStatusMessage;
@@ -279,9 +278,6 @@ public class FlinkDeploymentControllerTest {
     private FlinkDeploymentController createTestController(
             KubernetesClient kubernetesClient, TestingFlinkService flinkService) {
         Observer observer = new Observer(flinkService);
-        JobReconciler jobReconciler = new JobReconciler(null, flinkService, operatorConfiguration);
-        SessionReconciler sessionReconciler =
-                new SessionReconciler(null, flinkService, operatorConfiguration);
 
         FlinkDeploymentController controller =
                 new FlinkDeploymentController(
@@ -291,8 +287,8 @@ public class FlinkDeploymentControllerTest {
                         "test",
                         new DefaultDeploymentValidator(),
                         observer,
-                        jobReconciler,
-                        sessionReconciler);
+                        new ReconcilerFactory(
+                                kubernetesClient, flinkService, operatorConfiguration));
         controller.setControllerConfig(new FlinkControllerConfig(controller));
         return controller;
     }