You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by gy...@apache.org on 2022/03/06 17:50:35 UTC
[flink-kubernetes-operator] branch main updated: [FLINK-26377] Extract the Reconciler interface
This is an automated email from the ASF dual-hosted git repository.
gyfora pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git
The following commit(s) were added to refs/heads/main by this push:
new 4d0e9b3 [FLINK-26377] Extract the Reconciler interface
4d0e9b3 is described below
commit 4d0e9b3c55328ccf0fa5fe0ca72bdf97a30938cd
Author: Aitozi <yu...@alibaba-inc.com>
AuthorDate: Mon Mar 7 01:50:29 2022 +0800
[FLINK-26377] Extract the Reconciler interface
---
.../flink/kubernetes/operator/FlinkOperator.java | 12 ++--
.../controller/FlinkDeploymentController.java | 25 +++-----
.../metrics/KubernetesOperatorMetricGroup.java | 6 +-
.../operator/reconciler/BaseReconciler.java | 17 ++---
.../kubernetes/operator/reconciler/Reconciler.java | 59 +++++++++++++++++
.../operator/reconciler/ReconcilerFactory.java | 74 ++++++++++++++++++++++
.../controller/FlinkDeploymentControllerTest.java | 10 +--
7 files changed, 159 insertions(+), 44 deletions(-)
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/FlinkOperator.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/FlinkOperator.java
index 3112a1f..a579372 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/FlinkOperator.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/FlinkOperator.java
@@ -23,8 +23,7 @@ import org.apache.flink.kubernetes.operator.controller.FlinkControllerConfig;
import org.apache.flink.kubernetes.operator.controller.FlinkDeploymentController;
import org.apache.flink.kubernetes.operator.metrics.OperatorMetricUtils;
import org.apache.flink.kubernetes.operator.observer.Observer;
-import org.apache.flink.kubernetes.operator.reconciler.JobReconciler;
-import org.apache.flink.kubernetes.operator.reconciler.SessionReconciler;
+import org.apache.flink.kubernetes.operator.reconciler.ReconcilerFactory;
import org.apache.flink.kubernetes.operator.service.FlinkService;
import org.apache.flink.kubernetes.operator.utils.FlinkUtils;
import org.apache.flink.kubernetes.operator.validation.DefaultDeploymentValidator;
@@ -60,12 +59,10 @@ public class FlinkOperator {
FlinkOperatorConfiguration.fromConfiguration(defaultConfig.getOperatorConfig());
Observer observer = new Observer(flinkService);
- JobReconciler jobReconciler =
- new JobReconciler(client, flinkService, operatorConfiguration);
- SessionReconciler sessionReconciler =
- new SessionReconciler(client, flinkService, operatorConfiguration);
FlinkDeploymentValidator validator = new DefaultDeploymentValidator();
+ ReconcilerFactory factory =
+ new ReconcilerFactory(client, flinkService, operatorConfiguration);
FlinkDeploymentController controller =
new FlinkDeploymentController(
@@ -75,8 +72,7 @@ public class FlinkOperator {
namespace,
validator,
observer,
- jobReconciler,
- sessionReconciler);
+ factory);
FlinkControllerConfig controllerConfig = new FlinkControllerConfig(controller);
controller.setControllerConfig(controllerConfig);
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentController.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentController.java
index 8c45af0..5ce515a 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentController.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentController.java
@@ -25,9 +25,7 @@ import org.apache.flink.kubernetes.operator.crd.status.ReconciliationStatus;
import org.apache.flink.kubernetes.operator.exception.InvalidDeploymentException;
import org.apache.flink.kubernetes.operator.exception.ReconciliationException;
import org.apache.flink.kubernetes.operator.observer.Observer;
-import org.apache.flink.kubernetes.operator.reconciler.BaseReconciler;
-import org.apache.flink.kubernetes.operator.reconciler.JobReconciler;
-import org.apache.flink.kubernetes.operator.reconciler.SessionReconciler;
+import org.apache.flink.kubernetes.operator.reconciler.ReconcilerFactory;
import org.apache.flink.kubernetes.operator.utils.FlinkUtils;
import org.apache.flink.kubernetes.operator.utils.OperatorUtils;
import org.apache.flink.kubernetes.operator.validation.FlinkDeploymentValidator;
@@ -66,8 +64,7 @@ public class FlinkDeploymentController
private final FlinkDeploymentValidator validator;
private final Observer observer;
- private final JobReconciler jobReconciler;
- private final SessionReconciler sessionReconciler;
+ private final ReconcilerFactory reconcilerFactory;
private final DefaultConfig defaultConfig;
private final FlinkOperatorConfiguration operatorConfiguration;
@@ -80,16 +77,14 @@ public class FlinkDeploymentController
String operatorNamespace,
FlinkDeploymentValidator validator,
Observer observer,
- JobReconciler jobReconciler,
- SessionReconciler sessionReconciler) {
+ ReconcilerFactory reconcilerFactory) {
this.defaultConfig = defaultConfig;
this.operatorConfiguration = operatorConfiguration;
this.kubernetesClient = kubernetesClient;
this.operatorNamespace = operatorNamespace;
this.validator = validator;
this.observer = observer;
- this.jobReconciler = jobReconciler;
- this.sessionReconciler = sessionReconciler;
+ this.reconcilerFactory = reconcilerFactory;
}
@Override
@@ -99,8 +94,9 @@ public class FlinkDeploymentController
FlinkUtils.getEffectiveConfig(flinkApp, defaultConfig.getFlinkConfig());
observer.observe(flinkApp, context, effectiveConfig);
- return getReconciler(flinkApp)
- .shutdownAndDelete(operatorNamespace, flinkApp, effectiveConfig);
+ return reconcilerFactory
+ .getOrCreate(flinkApp)
+ .cleanup(operatorNamespace, flinkApp, effectiveConfig);
}
@Override
@@ -126,7 +122,8 @@ public class FlinkDeploymentController
try {
UpdateControl<FlinkDeployment> updateControl =
- getReconciler(flinkApp)
+ reconcilerFactory
+ .getOrCreate(flinkApp)
.reconcile(operatorNamespace, flinkApp, context, effectiveConfig);
updateForReconciliationSuccess(flinkApp);
return updateControl;
@@ -139,10 +136,6 @@ public class FlinkDeploymentController
}
}
- private BaseReconciler getReconciler(FlinkDeployment flinkDeployment) {
- return flinkDeployment.getSpec().getJob() == null ? sessionReconciler : jobReconciler;
- }
-
private void updateForReconciliationSuccess(FlinkDeployment flinkApp) {
ReconciliationStatus reconciliationStatus = flinkApp.getStatus().getReconciliationStatus();
reconciliationStatus.setSuccess(true);
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/KubernetesOperatorMetricGroup.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/KubernetesOperatorMetricGroup.java
index 83beb22..3b1c088 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/KubernetesOperatorMetricGroup.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/KubernetesOperatorMetricGroup.java
@@ -31,9 +31,9 @@ public class KubernetesOperatorMetricGroup
extends AbstractMetricGroup<KubernetesOperatorMetricGroup> {
private static final String GROUP_NAME = "k8soperator";
- private String namespace;
- private String name;
- private String hostname;
+ private final String namespace;
+ private final String name;
+ private final String hostname;
private KubernetesOperatorMetricGroup(
MetricRegistry registry,
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/BaseReconciler.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/BaseReconciler.java
index b4a98d8..1317899 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/BaseReconciler.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/BaseReconciler.java
@@ -26,12 +26,10 @@ import org.apache.flink.kubernetes.operator.utils.FlinkUtils;
import org.apache.flink.kubernetes.operator.utils.IngressUtils;
import io.fabric8.kubernetes.client.KubernetesClient;
-import io.javaoperatorsdk.operator.api.reconciler.Context;
import io.javaoperatorsdk.operator.api.reconciler.DeleteControl;
-import io.javaoperatorsdk.operator.api.reconciler.UpdateControl;
/** BaseReconciler with functionality that is common to job and session modes. */
-public abstract class BaseReconciler {
+public abstract class BaseReconciler implements Reconciler {
protected final FlinkOperatorConfiguration operatorConfiguration;
protected final KubernetesClient kubernetesClient;
@@ -46,14 +44,13 @@ public abstract class BaseReconciler {
this.operatorConfiguration = operatorConfiguration;
}
- public abstract UpdateControl<FlinkDeployment> reconcile(
- String operatorNamespace,
- FlinkDeployment flinkApp,
- Context context,
- Configuration effectiveConfig)
- throws Exception;
+ @Override
+ public DeleteControl cleanup(
+ String operatorNamespace, FlinkDeployment flinkApp, Configuration effectiveConfig) {
+ return shutdownAndDelete(operatorNamespace, flinkApp, effectiveConfig);
+ }
- public DeleteControl shutdownAndDelete(
+ private DeleteControl shutdownAndDelete(
String operatorNamespace, FlinkDeployment flinkApp, Configuration effectiveConfig) {
if (JobManagerDeploymentStatus.READY
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/Reconciler.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/Reconciler.java
new file mode 100644
index 0000000..f07a9d8
--- /dev/null
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/Reconciler.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.reconciler;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
+
+import io.javaoperatorsdk.operator.api.reconciler.Context;
+import io.javaoperatorsdk.operator.api.reconciler.DeleteControl;
+import io.javaoperatorsdk.operator.api.reconciler.UpdateControl;
+
+/** The interface of reconciler. */
+public interface Reconciler {
+
+ /**
+ * This is called when receiving the create or update event of the FlinkDeployment resource.
+ *
+ * @param operatorNamespace The namespace of the operator
+ * @param flinkApp the FlinkDeployment resource that has been created or updated
+ * @param context the context with which the operation is executed
+ * @param effectiveConfig the effective config of the flinkApp
+ * @return UpdateControl to manage updates on the custom resource (usually the status) after
+ * reconciliation.
+ */
+ UpdateControl<FlinkDeployment> reconcile(
+ String operatorNamespace,
+ FlinkDeployment flinkApp,
+ Context context,
+ Configuration effectiveConfig)
+ throws Exception;
+
+ /**
+ * This is called when receiving the delete event of FlinkDeployment resource. This method is
+ * meant to cleanup the associated components like the Flink job components.
+ *
+ * @param operatorNamespace The namespace of the operator
+ * @param flinkApp the FlinkDeployment resource that has been deleted
+ * @param effectiveConfig the effective config of the flinkApp
+ * @return DeleteControl to manage the delete behavior
+ */
+ DeleteControl cleanup(
+ String operatorNamespace, FlinkDeployment flinkApp, Configuration effectiveConfig);
+}
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/ReconcilerFactory.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/ReconcilerFactory.java
new file mode 100644
index 0000000..94050ce
--- /dev/null
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/ReconcilerFactory.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.reconciler;
+
+import org.apache.flink.kubernetes.operator.config.FlinkOperatorConfiguration;
+import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
+import org.apache.flink.kubernetes.operator.service.FlinkService;
+
+import io.fabric8.kubernetes.client.KubernetesClient;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+/** The factory to create reconciler based on app mode. */
+public class ReconcilerFactory {
+
+ private final KubernetesClient kubernetesClient;
+ private final FlinkService flinkService;
+ private final FlinkOperatorConfiguration operatorConfiguration;
+ private final Map<Mode, Reconciler> reconcilerMap;
+
+ public ReconcilerFactory(
+ KubernetesClient kubernetesClient,
+ FlinkService flinkService,
+ FlinkOperatorConfiguration operatorConfiguration) {
+ this.kubernetesClient = kubernetesClient;
+ this.flinkService = flinkService;
+ this.operatorConfiguration = operatorConfiguration;
+ this.reconcilerMap = new ConcurrentHashMap<>();
+ }
+
+ public Reconciler getOrCreate(FlinkDeployment flinkApp) {
+ return reconcilerMap.computeIfAbsent(
+ getMode(flinkApp),
+ mode -> {
+ switch (mode) {
+ case SESSION:
+ return new SessionReconciler(
+ kubernetesClient, flinkService, operatorConfiguration);
+ case APPLICATION:
+ return new JobReconciler(
+ kubernetesClient, flinkService, operatorConfiguration);
+ default:
+ throw new UnsupportedOperationException(
+ String.format("Unsupported running mode: %s", mode));
+ }
+ });
+ }
+
+ private Mode getMode(FlinkDeployment flinkApp) {
+ return flinkApp.getSpec().getJob() != null ? Mode.APPLICATION : Mode.SESSION;
+ }
+
+ enum Mode {
+ APPLICATION,
+ SESSION
+ }
+}
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java
index ebd6f42..60632ec 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java
@@ -29,9 +29,8 @@ import org.apache.flink.kubernetes.operator.crd.status.JobStatus;
import org.apache.flink.kubernetes.operator.crd.status.ReconciliationStatus;
import org.apache.flink.kubernetes.operator.observer.JobManagerDeploymentStatus;
import org.apache.flink.kubernetes.operator.observer.Observer;
-import org.apache.flink.kubernetes.operator.reconciler.JobReconciler;
import org.apache.flink.kubernetes.operator.reconciler.JobReconcilerTest;
-import org.apache.flink.kubernetes.operator.reconciler.SessionReconciler;
+import org.apache.flink.kubernetes.operator.reconciler.ReconcilerFactory;
import org.apache.flink.kubernetes.operator.utils.FlinkUtils;
import org.apache.flink.kubernetes.operator.validation.DefaultDeploymentValidator;
import org.apache.flink.runtime.client.JobStatusMessage;
@@ -279,9 +278,6 @@ public class FlinkDeploymentControllerTest {
private FlinkDeploymentController createTestController(
KubernetesClient kubernetesClient, TestingFlinkService flinkService) {
Observer observer = new Observer(flinkService);
- JobReconciler jobReconciler = new JobReconciler(null, flinkService, operatorConfiguration);
- SessionReconciler sessionReconciler =
- new SessionReconciler(null, flinkService, operatorConfiguration);
FlinkDeploymentController controller =
new FlinkDeploymentController(
@@ -291,8 +287,8 @@ public class FlinkDeploymentControllerTest {
"test",
new DefaultDeploymentValidator(),
observer,
- jobReconciler,
- sessionReconciler);
+ new ReconcilerFactory(
+ kubernetesClient, flinkService, operatorConfiguration));
controller.setControllerConfig(new FlinkControllerConfig(controller));
return controller;
}