You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by gy...@apache.org on 2022/06/20 07:38:37 UTC
[flink-kubernetes-operator] branch main updated: [FLINK-27688] Add pluggable FlinkResourceListener interface
This is an automated email from the ASF dual-hosted git repository.
gyfora pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git
The following commit(s) were added to refs/heads/main by this push:
new 869c029 [FLINK-27688] Add pluggable FlinkResourceListener interface
869c029 is described below
commit 869c029dc0662299ce34d1570510e65f4fe3794d
Author: Gyula Fora <g_...@apple.com>
AuthorDate: Thu May 19 12:24:21 2022 +0200
[FLINK-27688] Add pluggable FlinkResourceListener interface
---
docs/content/docs/operations/listeners.md | 41 ++++++
docs/content/docs/operations/validator.md | 18 +--
flink-kubernetes-operator/pom.xml | 6 +-
.../flink/kubernetes/operator/FlinkOperator.java | 43 ++++---
.../controller/FlinkDeploymentController.java | 27 ++--
.../controller/FlinkSessionJobController.java | 18 +--
.../operator/listener/FlinkResourceListener.java | 65 ++++++++++
.../operator/listener/ListenerUtils.java | 137 +++++++++++++++++++++
.../operator/observer/JobStatusObserver.java | 8 +-
.../operator/observer/SavepointObserver.java | 17 +--
.../deployment/AbstractDeploymentObserver.java | 15 ++-
.../observer/deployment/ApplicationObserver.java | 15 +--
.../observer/deployment/ObserverFactory.java | 22 ++--
.../observer/deployment/SessionObserver.java | 8 +-
.../observer/sessionjob/SessionJobObserver.java | 18 +--
.../operator/reconciler/ReconciliationUtils.java | 10 +-
.../deployment/AbstractDeploymentReconciler.java | 6 +-
.../deployment/ApplicationReconciler.java | 6 +-
.../reconciler/deployment/ReconcilerFactory.java | 10 +-
.../reconciler/deployment/SessionReconciler.java | 9 +-
.../kubernetes/operator/utils/EventRecorder.java | 97 +++++++++++++++
.../kubernetes/operator/utils/EventUtils.java | 7 +-
.../kubernetes/operator/utils/SavepointUtils.java | 6 +-
.../{StatusHelper.java => StatusRecorder.java} | 74 +++++++++--
...ator-assembly.xml => test-plugins-assembly.xml} | 1 +
.../flink/kubernetes/operator/TestUtils.java | 35 +++++-
...tatusHelper.java => TestingStatusRecorder.java} | 14 +--
.../listener/FlinkResourceListenerTest.java | 114 +++++++++++++++++
.../operator/listener/ListenerUtilsTest.java | 73 +++++++++++
.../operator/listener/TestingListener.java | 71 +++++++++++
.../operator/observer/SavepointObserverTest.java | 27 ++--
.../deployment/ApplicationObserverTest.java | 34 ++---
.../observer/deployment/SessionObserverTest.java | 4 +-
.../sessionjob/SessionJobObserverTest.java | 30 +++--
.../deployment/ApplicationReconcilerTest.java | 37 ++----
.../deployment/SessionReconcilerTest.java | 5 +-
.../kubernetes/operator/utils/EventUtilsTest.java | 6 +-
...atusHelperTest.java => StatusRecorderTest.java} | 6 +-
.../operator/utils/ValidatorUtilsTest.java | 27 ++--
...ernetes.operator.listener.FlinkResourceListener | 16 +++
40 files changed, 945 insertions(+), 238 deletions(-)
diff --git a/docs/content/docs/operations/listeners.md b/docs/content/docs/operations/listeners.md
new file mode 100644
index 0000000..755a725
--- /dev/null
+++ b/docs/content/docs/operations/listeners.md
@@ -0,0 +1,41 @@
+---
+title: "Custom Listeners"
+weight: 5
+type: docs
+aliases:
+- /operations/listeners.html
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+# Custom Flink Resource Listeners
+
+The Flink Kubernetes Operator allows users to listen to events and status updates triggered for the Flink Resources managed by the operator.
+This feature enables tighter integration with the user's own data platform.
+
+By implementing the `FlinkResourceListener` interface, users can listen to both events and status updates per resource type (`FlinkDeployment` / `FlinkSessionJob`). The listener methods are called after the respective events or status updates have been triggered by the system.
+Using the context provided to each listener method, users can also access the related Flink resource and the `KubernetesClient` itself, for example to trigger further events on demand.
+
+Similar to [custom validator implementations]({{< ref "docs/operations/validator" >}}), resource listeners are loaded via the Flink [Plugins](https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/filesystems/plugins) mechanism.
+
+In order to enable your custom `FlinkResourceListener` you need to:
+
+ 1. Implement the interface
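+ 2. Add the fully qualified name of your listener class to the `META-INF/services/org.apache.flink.kubernetes.operator.listener.FlinkResourceListener` service file
+ 3. Package your JAR and add it to the plugins directory of your operator image (`/opt/flink/plugins`), then enable the listener in the operator configuration with `kubernetes.operator.plugins.listeners.<name>.class: <fully qualified listener class>`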
diff --git a/docs/content/docs/operations/validator.md b/docs/content/docs/operations/validator.md
index 587cc63..316965e 100644
--- a/docs/content/docs/operations/validator.md
+++ b/docs/content/docs/operations/validator.md
@@ -1,5 +1,5 @@
---
-title: "Validator"
+title: "Custom Resource Validators"
weight: 5
type: docs
aliases:
@@ -26,21 +26,21 @@ under the License.
# Custom `FlinkResourceValidator` implementation
-`FlinkResourceValidator`, an interface for validating the resources of `FlinkDeployment` and `FlinkSessionJob`, is a pluggable component based on the [Plugins](https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/filesystems/plugins) mechanism. During development, we can customize the implementation of `FlinkResourceValidator` and make sure to retain the service definition in `META-INF/services`.
+`FlinkResourceValidator`, an interface for validating the resources of `FlinkDeployment` and `FlinkSessionJob`, is a pluggable component based on the [Plugins](https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/filesystems/plugins) mechanism. During development, we can customize the implementation of `FlinkResourceValidator` and make sure to retain the service definition in `META-INF/services`.
The following steps demonstrate how to develop and use a custom validator.
1. Implement `FlinkResourceValidator` interface:
```java
package org.apache.flink.kubernetes.operator.validation;
-
+
import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
import org.apache.flink.kubernetes.operator.crd.FlinkSessionJob;
-
+
import java.util.Optional;
-
+
/** Custom validator implementation of {@link FlinkResourceValidator}. */
public class CustomValidator implements FlinkResourceValidator {
-
+
@Override
public Optional<String> validateDeployment(FlinkDeployment deployment) {
if (deployment.getSpec().getFlinkVersion() == null) {
@@ -48,7 +48,7 @@ The following steps demonstrate how to develop and use a custom validator.
}
return Optional.empty();
}
-
+
@Override
public Optional<String> validateSessionJob(
FlinkSessionJob sessionJob, Optional<FlinkDeployment> session) {
@@ -67,7 +67,7 @@ The following steps demonstrate how to develop and use a custom validator.
3. Use the Maven tool to package the project and generate the custom validator JAR.
-4. Create Dockerfile to build a custom image from the `apache/flink-kubernetes-operator` official image and copy the generated JAR to custom validator plugin directory.
+4. Create Dockerfile to build a custom image from the `apache/flink-kubernetes-operator` official image and copy the generated JAR to custom validator plugin directory.
`/opt/flink/plugins` is the value of `FLINK_PLUGINS_DIR` environment variable in the flink-kubernetes-operator helm chart. The structure of custom validator directory under `/opt/flink/plugins` is as follows:
```text
/opt/flink/plugins
@@ -75,7 +75,7 @@ The following steps demonstrate how to develop and use a custom validator.
│ ├── custom-validator.jar
└── ...
```
-
+
With the custom validator directory location, the Dockerfile is defined as follows:
```shell script
FROM apache/flink-kubernetes-operator
diff --git a/flink-kubernetes-operator/pom.xml b/flink-kubernetes-operator/pom.xml
index e6f60b9..2dd3eb7 100644
--- a/flink-kubernetes-operator/pom.xml
+++ b/flink-kubernetes-operator/pom.xml
@@ -277,16 +277,16 @@ under the License.
<artifactId>maven-assembly-plugin</artifactId>
<executions>
<execution>
- <id>create-test-validator-jar</id>
+ <id>create-test-plugin-jar</id>
<phase>process-test-classes</phase>
<goals>
<goal>single</goal>
</goals>
<configuration>
- <finalName>test-validator</finalName>
+ <finalName>test-plugins</finalName>
<attach>false</attach>
<descriptors>
- <descriptor>src/test/assembly/test-validator-assembly.xml</descriptor>
+ <descriptor>src/test/assembly/test-plugins-assembly.xml</descriptor>
</descriptors>
</configuration>
</execution>
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/FlinkOperator.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/FlinkOperator.java
index 507ab62..d5313d4 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/FlinkOperator.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/FlinkOperator.java
@@ -24,20 +24,20 @@ import org.apache.flink.core.plugin.PluginUtils;
import org.apache.flink.kubernetes.operator.config.FlinkConfigManager;
import org.apache.flink.kubernetes.operator.controller.FlinkDeploymentController;
import org.apache.flink.kubernetes.operator.controller.FlinkSessionJobController;
-import org.apache.flink.kubernetes.operator.crd.FlinkSessionJob;
import org.apache.flink.kubernetes.operator.crd.status.FlinkDeploymentStatus;
import org.apache.flink.kubernetes.operator.crd.status.FlinkSessionJobStatus;
+import org.apache.flink.kubernetes.operator.listener.FlinkResourceListener;
+import org.apache.flink.kubernetes.operator.listener.ListenerUtils;
import org.apache.flink.kubernetes.operator.metrics.MetricManager;
import org.apache.flink.kubernetes.operator.metrics.OperatorMetricUtils;
-import org.apache.flink.kubernetes.operator.observer.Observer;
import org.apache.flink.kubernetes.operator.observer.deployment.ObserverFactory;
import org.apache.flink.kubernetes.operator.observer.sessionjob.SessionJobObserver;
-import org.apache.flink.kubernetes.operator.reconciler.Reconciler;
import org.apache.flink.kubernetes.operator.reconciler.deployment.ReconcilerFactory;
import org.apache.flink.kubernetes.operator.reconciler.sessionjob.FlinkSessionJobReconciler;
import org.apache.flink.kubernetes.operator.service.FlinkService;
import org.apache.flink.kubernetes.operator.utils.EnvUtils;
-import org.apache.flink.kubernetes.operator.utils.StatusHelper;
+import org.apache.flink.kubernetes.operator.utils.EventRecorder;
+import org.apache.flink.kubernetes.operator.utils.StatusRecorder;
import org.apache.flink.kubernetes.operator.utils.ValidatorUtils;
import org.apache.flink.kubernetes.operator.validation.FlinkResourceValidator;
import org.apache.flink.metrics.MetricGroup;
@@ -53,6 +53,7 @@ import org.slf4j.LoggerFactory;
import javax.annotation.Nullable;
+import java.util.Collection;
import java.util.HashSet;
import java.util.Set;
import java.util.UUID;
@@ -70,6 +71,7 @@ public class FlinkOperator {
private final Set<FlinkResourceValidator> validators;
private final Set<RegisteredController> registeredControllers = new HashSet<>();
private final MetricGroup metricGroup;
+ private final Collection<FlinkResourceListener> listeners;
public FlinkOperator(@Nullable Configuration conf) {
this.client = new DefaultKubernetesClient();
@@ -80,6 +82,7 @@ public class FlinkOperator {
this.operator = new Operator(client, this::overrideOperatorConfigs);
this.flinkService = new FlinkService(client, configManager);
this.validators = ValidatorUtils.discoverValidators(configManager);
+ this.listeners = ListenerUtils.discoverListeners(configManager);
this.metricGroup =
OperatorMetricUtils.initOperatorMetrics(configManager.getDefaultConfig());
PluginManager pluginManager =
@@ -109,13 +112,14 @@ public class FlinkOperator {
}
private void registerDeploymentController() {
- StatusHelper<FlinkDeploymentStatus> statusHelper = new StatusHelper<>(client);
- ReconcilerFactory reconcilerFactory =
- new ReconcilerFactory(client, flinkService, configManager);
- ObserverFactory observerFactory =
- new ObserverFactory(client, flinkService, configManager, statusHelper);
-
- FlinkDeploymentController controller =
+ var statusRecorder = StatusRecorder.<FlinkDeploymentStatus>create(client, listeners);
+ var eventRecorder = EventRecorder.create(client, listeners);
+ var reconcilerFactory =
+ new ReconcilerFactory(client, flinkService, configManager, eventRecorder);
+ var observerFactory =
+ new ObserverFactory(flinkService, configManager, statusRecorder, eventRecorder);
+
+ var controller =
new FlinkDeploymentController(
configManager,
client,
@@ -123,24 +127,25 @@ public class FlinkOperator {
reconcilerFactory,
observerFactory,
new MetricManager<>(metricGroup),
- statusHelper);
+ statusRecorder,
+ eventRecorder);
registeredControllers.add(operator.register(controller, this::overrideControllerConfigs));
}
private void registerSessionJobController() {
- Reconciler<FlinkSessionJob> reconciler =
- new FlinkSessionJobReconciler(client, flinkService, configManager);
- StatusHelper<FlinkSessionJobStatus> statusHelper = new StatusHelper<>(client);
- Observer<FlinkSessionJob> observer =
- new SessionJobObserver(flinkService, configManager, statusHelper);
- FlinkSessionJobController controller =
+ var reconciler = new FlinkSessionJobReconciler(client, flinkService, configManager);
+ var eventRecorder = EventRecorder.create(client, listeners);
+ var statusRecorder = StatusRecorder.<FlinkSessionJobStatus>create(client, listeners);
+ var observer =
+ new SessionJobObserver(flinkService, configManager, statusRecorder, eventRecorder);
+ var controller =
new FlinkSessionJobController(
configManager,
validators,
reconciler,
observer,
new MetricManager<>(metricGroup),
- statusHelper);
+ statusRecorder);
registeredControllers.add(operator.register(controller, this::overrideControllerConfigs));
}
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentController.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentController.java
index 439024c..df69871 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentController.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentController.java
@@ -29,9 +29,10 @@ import org.apache.flink.kubernetes.operator.metrics.MetricManager;
import org.apache.flink.kubernetes.operator.observer.deployment.ObserverFactory;
import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
import org.apache.flink.kubernetes.operator.reconciler.deployment.ReconcilerFactory;
+import org.apache.flink.kubernetes.operator.utils.EventRecorder;
import org.apache.flink.kubernetes.operator.utils.EventSourceUtils;
import org.apache.flink.kubernetes.operator.utils.EventUtils;
-import org.apache.flink.kubernetes.operator.utils.StatusHelper;
+import org.apache.flink.kubernetes.operator.utils.StatusRecorder;
import org.apache.flink.kubernetes.operator.validation.FlinkResourceValidator;
import io.fabric8.kubernetes.client.KubernetesClient;
@@ -70,7 +71,8 @@ public class FlinkDeploymentController
private final ReconcilerFactory reconcilerFactory;
private final ObserverFactory observerFactory;
private final MetricManager<FlinkDeployment> metricManager;
- private final StatusHelper<FlinkDeploymentStatus> statusHelper;
+ private final StatusRecorder<FlinkDeploymentStatus> statusRecorder;
+ private final EventRecorder eventRecorder;
private final ConcurrentHashMap<Tuple2<String, String>, FlinkDeploymentStatus> statusCache =
new ConcurrentHashMap<>();
@@ -81,27 +83,29 @@ public class FlinkDeploymentController
ReconcilerFactory reconcilerFactory,
ObserverFactory observerFactory,
MetricManager<FlinkDeployment> metricManager,
- StatusHelper<FlinkDeploymentStatus> statusHelper) {
+ StatusRecorder<FlinkDeploymentStatus> statusRecorder,
+ EventRecorder eventRecorder) {
this.configManager = configManager;
this.kubernetesClient = kubernetesClient;
this.validators = validators;
this.reconcilerFactory = reconcilerFactory;
this.observerFactory = observerFactory;
this.metricManager = metricManager;
- this.statusHelper = statusHelper;
+ this.statusRecorder = statusRecorder;
+ this.eventRecorder = eventRecorder;
}
@Override
public DeleteControl cleanup(FlinkDeployment flinkApp, Context context) {
LOG.info("Deleting FlinkDeployment");
- statusHelper.updateStatusFromCache(flinkApp);
+ statusRecorder.updateStatusFromCache(flinkApp);
try {
observerFactory.getOrCreate(flinkApp).observe(flinkApp, context);
} catch (DeploymentFailedException dfe) {
// ignore during cleanup
}
metricManager.onRemove(flinkApp);
- statusHelper.removeCachedStatus(flinkApp);
+ statusRecorder.removeCachedStatus(flinkApp);
return reconcilerFactory.getOrCreate(flinkApp).cleanup(flinkApp, context);
}
@@ -109,13 +113,13 @@ public class FlinkDeploymentController
public UpdateControl<FlinkDeployment> reconcile(FlinkDeployment flinkApp, Context context)
throws Exception {
LOG.info("Starting reconciliation");
- statusHelper.updateStatusFromCache(flinkApp);
+ statusRecorder.updateStatusFromCache(flinkApp);
FlinkDeployment previousDeployment = ReconciliationUtils.clone(flinkApp);
try {
observerFactory.getOrCreate(flinkApp).observe(flinkApp, context);
if (!validateDeployment(flinkApp)) {
metricManager.onUpdate(flinkApp);
- statusHelper.patchAndCacheStatus(flinkApp);
+ statusRecorder.patchAndCacheStatus(flinkApp);
return ReconciliationUtils.toUpdateControl(
configManager.getOperatorConfiguration(),
flinkApp,
@@ -131,7 +135,7 @@ public class FlinkDeploymentController
LOG.info("End of reconciliation");
metricManager.onUpdate(flinkApp);
- statusHelper.patchAndCacheStatus(flinkApp);
+ statusRecorder.patchAndCacheStatus(flinkApp);
return ReconciliationUtils.toUpdateControl(
configManager.getOperatorConfiguration(), flinkApp, previousDeployment, true);
}
@@ -141,8 +145,7 @@ public class FlinkDeploymentController
flinkApp.getStatus().setJobManagerDeploymentStatus(JobManagerDeploymentStatus.ERROR);
flinkApp.getStatus().getJobStatus().setState(JobStatus.RECONCILING.name());
ReconciliationUtils.updateForReconciliationError(flinkApp, dfe.getMessage());
- EventUtils.createOrUpdateEvent(
- kubernetesClient,
+ eventRecorder.triggerEvent(
flinkApp,
EventUtils.Type.Warning,
dfe.getReason(),
@@ -162,7 +165,7 @@ public class FlinkDeploymentController
public ErrorStatusUpdateControl<FlinkDeployment> updateErrorStatus(
FlinkDeployment flinkDeployment, Context<FlinkDeployment> context, Exception e) {
return ReconciliationUtils.toErrorStatusUpdateControl(
- flinkDeployment, context.getRetryInfo(), e, metricManager, statusHelper);
+ flinkDeployment, context.getRetryInfo(), e, metricManager, statusRecorder);
}
private boolean validateDeployment(FlinkDeployment deployment) {
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkSessionJobController.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkSessionJobController.java
index a4b21f4..30ff1ea 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkSessionJobController.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkSessionJobController.java
@@ -27,7 +27,7 @@ import org.apache.flink.kubernetes.operator.observer.Observer;
import org.apache.flink.kubernetes.operator.reconciler.Reconciler;
import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
import org.apache.flink.kubernetes.operator.utils.EventSourceUtils;
-import org.apache.flink.kubernetes.operator.utils.StatusHelper;
+import org.apache.flink.kubernetes.operator.utils.StatusRecorder;
import org.apache.flink.kubernetes.operator.validation.FlinkResourceValidator;
import io.javaoperatorsdk.operator.api.reconciler.Cleaner;
@@ -62,7 +62,7 @@ public class FlinkSessionJobController
private final Reconciler<FlinkSessionJob> reconciler;
private final Observer<FlinkSessionJob> observer;
private final MetricManager<FlinkSessionJob> metricManager;
- private final StatusHelper<FlinkSessionJobStatus> statusHelper;
+ private final StatusRecorder<FlinkSessionJobStatus> statusRecorder;
public FlinkSessionJobController(
FlinkConfigManager configManager,
@@ -70,13 +70,13 @@ public class FlinkSessionJobController
Reconciler<FlinkSessionJob> reconciler,
Observer<FlinkSessionJob> observer,
MetricManager<FlinkSessionJob> metricManager,
- StatusHelper<FlinkSessionJobStatus> statusHelper) {
+ StatusRecorder<FlinkSessionJobStatus> statusRecorder) {
this.configManager = configManager;
this.validators = validators;
this.reconciler = reconciler;
this.observer = observer;
this.metricManager = metricManager;
- this.statusHelper = statusHelper;
+ this.statusRecorder = statusRecorder;
}
@Override
@@ -84,13 +84,13 @@ public class FlinkSessionJobController
FlinkSessionJob flinkSessionJob, Context context) {
LOG.info("Starting reconciliation");
- statusHelper.updateStatusFromCache(flinkSessionJob);
+ statusRecorder.updateStatusFromCache(flinkSessionJob);
FlinkSessionJob previousJob = ReconciliationUtils.clone(flinkSessionJob);
observer.observe(flinkSessionJob, context);
if (!validateSessionJob(flinkSessionJob, context)) {
metricManager.onUpdate(flinkSessionJob);
- statusHelper.patchAndCacheStatus(flinkSessionJob);
+ statusRecorder.patchAndCacheStatus(flinkSessionJob);
return ReconciliationUtils.toUpdateControl(
configManager.getOperatorConfiguration(), flinkSessionJob, previousJob, false);
}
@@ -101,7 +101,7 @@ public class FlinkSessionJobController
throw new ReconciliationException(e);
}
metricManager.onUpdate(flinkSessionJob);
- statusHelper.patchAndCacheStatus(flinkSessionJob);
+ statusRecorder.patchAndCacheStatus(flinkSessionJob);
return ReconciliationUtils.toUpdateControl(
configManager.getOperatorConfiguration(), flinkSessionJob, previousJob, true);
}
@@ -110,7 +110,7 @@ public class FlinkSessionJobController
public DeleteControl cleanup(FlinkSessionJob sessionJob, Context context) {
LOG.info("Deleting FlinkSessionJob");
metricManager.onRemove(sessionJob);
- statusHelper.removeCachedStatus(sessionJob);
+ statusRecorder.removeCachedStatus(sessionJob);
return reconciler.cleanup(sessionJob, context);
}
@@ -118,7 +118,7 @@ public class FlinkSessionJobController
public ErrorStatusUpdateControl<FlinkSessionJob> updateErrorStatus(
FlinkSessionJob sessionJob, Context<FlinkSessionJob> context, Exception e) {
return ReconciliationUtils.toErrorStatusUpdateControl(
- sessionJob, context.getRetryInfo(), e, metricManager, statusHelper);
+ sessionJob, context.getRetryInfo(), e, metricManager, statusRecorder);
}
@Override
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/listener/FlinkResourceListener.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/listener/FlinkResourceListener.java
new file mode 100644
index 0000000..5cd6143
--- /dev/null
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/listener/FlinkResourceListener.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.listener;
+
+import org.apache.flink.core.plugin.Plugin;
+import org.apache.flink.kubernetes.operator.crd.AbstractFlinkResource;
+import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
+import org.apache.flink.kubernetes.operator.crd.FlinkSessionJob;
+import org.apache.flink.kubernetes.operator.crd.status.CommonStatus;
+import org.apache.flink.kubernetes.operator.crd.status.FlinkDeploymentStatus;
+import org.apache.flink.kubernetes.operator.crd.status.FlinkSessionJobStatus;
+
+import io.fabric8.kubernetes.api.model.Event;
+import io.fabric8.kubernetes.client.KubernetesClient;
+
+/** Listener interface for Flink resource related events and status changes. */
+public interface FlinkResourceListener extends Plugin {
+ /** Callback for status updates of {@link FlinkDeployment} resources. */
+ void onDeploymentStatusUpdate(StatusUpdateContext<FlinkDeployment, FlinkDeploymentStatus> ctx);
+ /** Callback for events recorded against {@link FlinkDeployment} resources. */
+ void onDeploymentEvent(ResourceEventContext<FlinkDeployment> ctx);
+ /** Callback for status updates of {@link FlinkSessionJob} resources. */
+ void onSessionJobStatusUpdate(StatusUpdateContext<FlinkSessionJob, FlinkSessionJobStatus> ctx);
+ /** Callback for events recorded against {@link FlinkSessionJob} resources. */
+ void onSessionJobEvent(ResourceEventContext<FlinkSessionJob> ctx);
+
+ /** Base for Resource Event and StatusUpdate contexts. */
+ interface ResourceContext<R extends AbstractFlinkResource<?, ?>> {
+ R getFlinkResource();
+ /** Client made available to the listener, e.g. to trigger further events on demand. */
+ KubernetesClient getKubernetesClient();
+ }
+
+ /** Context for Resource Event listener methods. */
+ interface ResourceEventContext<R extends AbstractFlinkResource<?, ?>>
+ extends ResourceContext<R> {
+ Event getEvent();
+ }
+
+ /** Context for Status listener methods. */
+ interface StatusUpdateContext<R extends AbstractFlinkResource<?, S>, S extends CommonStatus<?>>
+ extends ResourceContext<R> {
+ /** Returns the status currently set on the resource from {@link #getFlinkResource()}. */
+ default S getNewStatus() {
+ return getFlinkResource().getStatus();
+ }
+ /** NOTE(review): presumably the status before this update; confirm against the caller. */
+ S getPreviousStatus();
+ }
+}
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/listener/ListenerUtils.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/listener/ListenerUtils.java
new file mode 100644
index 0000000..f5ce71d
--- /dev/null
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/listener/ListenerUtils.java
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.listener;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.ConfigOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.CoreOptions;
+import org.apache.flink.configuration.DelegatingConfiguration;
+import org.apache.flink.core.plugin.PluginUtils;
+import org.apache.flink.kubernetes.operator.config.FlinkConfigManager;
+import org.apache.flink.kubernetes.operator.utils.FlinkUtils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+/** Flink resource listener utilities. */
+public class ListenerUtils {
+
+    private static final Logger LOG = LoggerFactory.getLogger(ListenerUtils.class);
+
+    private static final String PREFIX = "kubernetes.operator.plugins.listeners.";
+    private static final String SUFFIX = ".class";
+    private static final Pattern PTN =
+            Pattern.compile(Pattern.quote(PREFIX) + "([\\S&&[^.]]*)" + Pattern.quote(SUFFIX));
+    private static final List<String> EXTRA_PARENT_FIRST_PATTERNS =
+            List.of("io.fabric8", "com.fasterxml");
+
+    /**
+     * Load {@link FlinkResourceListener} implementations from the plugin directory. Only listeners
+     * whose class is named by a {@code <name>.class} key are enabled; others are logged and skipped.
+     *
+     * <p>Example: {@code kubernetes.operator.plugins.listeners.test.class: com.myorg.MyListener},
+     * {@code kubernetes.operator.plugins.listeners.test.k1: v1}.
+     *
+     * @param configManager {@link FlinkConfigManager} to access plugin configurations.
+     * @return Enabled listeners, each already configured with its own listener configuration.
+     */
+    public static Collection<FlinkResourceListener> discoverListeners(
+            FlinkConfigManager configManager) {
+        var listeners = new ArrayList<FlinkResourceListener>();
+        var conf = getListenerBaseConf(configManager);
+
+        Map<String, Configuration> listenerConfigs = loadListenerConfigs(conf);
+        PluginUtils.createPluginManagerFromRootFolder(conf)
+                .load(FlinkResourceListener.class)
+                .forEachRemaining(
+                        listener -> {
+                            String clazz = listener.getClass().getName();
+                            LOG.info(
+                                    "Discovered resource listener from plugin directory[{}]: {}.",
+                                    System.getenv()
+                                            .getOrDefault(
+                                                    ConfigConstants.ENV_FLINK_PLUGINS_DIR,
+                                                    ConfigConstants.DEFAULT_FLINK_PLUGINS_DIRS),
+                                    clazz);
+
+                            if (listenerConfigs.containsKey(clazz)) {
+                                LOG.info("Initializing {}", clazz);
+                                listener.configure(listenerConfigs.get(clazz));
+                                listeners.add(listener);
+                            } else {
+                                LOG.warn("No configuration found for {}", clazz);
+                            }
+                        });
+        return listeners;
+    }
+    /** Copies the default config and appends the extra parent-first classloader patterns. */
+    private static Configuration getListenerBaseConf(FlinkConfigManager configManager) {
+        var conf = new Configuration(configManager.getDefaultConfig());
+        List<String> additionalPatterns =
+                new ArrayList<>(
+                        conf.getOptional(
+                                        CoreOptions
+                                                .PLUGIN_ALWAYS_PARENT_FIRST_LOADER_PATTERNS_ADDITIONAL)
+                                .orElse(Collections.emptyList()));
+        additionalPatterns.addAll(EXTRA_PARENT_FIRST_PATTERNS);
+        conf.set(
+                CoreOptions.PLUGIN_ALWAYS_PARENT_FIRST_LOADER_PATTERNS_ADDITIONAL,
+                additionalPatterns);
+        return conf;
+    }
+    /** Maps each enabled listener class name to a config view scoped to its own key prefix. */
+    @VisibleForTesting
+    protected static Map<String, Configuration> loadListenerConfigs(Configuration configuration) {
+        Map<String, Configuration> listenerConfigs = new HashMap<>();
+        for (var enableListeners : findEnabledListeners(configuration)) {
+            DelegatingConfiguration delegatingConfiguration =
+                    new DelegatingConfiguration(configuration, PREFIX + enableListeners.f0 + '.');
+            listenerConfigs.put(enableListeners.f1, delegatingConfiguration);
+        }
+        return listenerConfigs;
+    }
+    /** Collects (listener name, class name) pairs from all keys matching {@link #PTN}. */
+    private static Set<Tuple2<String, String>> findEnabledListeners(Configuration configuration) {
+        Set<Tuple2<String, String>> result = new HashSet<>();
+        for (String key : configuration.keySet()) {
+            Matcher matcher = PTN.matcher(key);
+            if (matcher.matches()) {
+                result.add(
+                        Tuple2.of(
+                                matcher.group(1),
+                                configuration.get(
+                                        ConfigOptions.key(key).stringType().noDefaultValue())));
+            }
+        }
+        return result;
+    }
+}
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/JobStatusObserver.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/JobStatusObserver.java
index afc9582..5c31d88 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/JobStatusObserver.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/JobStatusObserver.java
@@ -21,6 +21,7 @@ import org.apache.flink.configuration.Configuration;
import org.apache.flink.kubernetes.operator.crd.AbstractFlinkResource;
import org.apache.flink.kubernetes.operator.crd.status.JobStatus;
import org.apache.flink.kubernetes.operator.service.FlinkService;
+import org.apache.flink.kubernetes.operator.utils.EventRecorder;
import org.apache.flink.kubernetes.operator.utils.EventUtils;
import org.apache.flink.runtime.client.JobStatusMessage;
@@ -38,9 +39,11 @@ public abstract class JobStatusObserver<CTX> {
private static final Logger LOG = LoggerFactory.getLogger(JobStatusObserver.class);
private static final int MAX_ERROR_STRING_LENGTH = 512;
private final FlinkService flinkService;
+ private final EventRecorder eventRecorder;
- public JobStatusObserver(FlinkService flinkService) {
+ public JobStatusObserver(FlinkService flinkService, EventRecorder eventRecorder) {
this.flinkService = flinkService;
+ this.eventRecorder = eventRecorder;
}
/**
@@ -135,8 +138,7 @@ public abstract class JobStatusObserver<CTX> {
LOG.info(message);
setErrorIfPresent(resource, clusterJobStatus, deployedConfig);
- EventUtils.createOrUpdateEvent(
- flinkService.getKubernetesClient(),
+ eventRecorder.triggerEvent(
resource,
EventUtils.Type.Normal,
"Status Changed",
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/SavepointObserver.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/SavepointObserver.java
index a2a624a..202f7b3 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/SavepointObserver.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/SavepointObserver.java
@@ -30,9 +30,10 @@ import org.apache.flink.kubernetes.operator.exception.ReconciliationException;
import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
import org.apache.flink.kubernetes.operator.service.FlinkService;
import org.apache.flink.kubernetes.operator.utils.ConfigOptionUtils;
+import org.apache.flink.kubernetes.operator.utils.EventRecorder;
import org.apache.flink.kubernetes.operator.utils.EventUtils;
import org.apache.flink.kubernetes.operator.utils.SavepointUtils;
-import org.apache.flink.kubernetes.operator.utils.StatusHelper;
+import org.apache.flink.kubernetes.operator.utils.StatusRecorder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -49,15 +50,18 @@ public class SavepointObserver<STATUS extends CommonStatus<?>> {
private final FlinkService flinkService;
private final FlinkConfigManager configManager;
- private final StatusHelper<STATUS> statusHelper;
+ private final StatusRecorder<STATUS> statusRecorder;
+ private final EventRecorder eventRecorder;
public SavepointObserver(
FlinkService flinkService,
FlinkConfigManager configManager,
- StatusHelper<STATUS> statusHelper) {
+ StatusRecorder<STATUS> statusRecorder,
+ EventRecorder eventRecorder) {
this.flinkService = flinkService;
this.configManager = configManager;
- this.statusHelper = statusHelper;
+ this.statusRecorder = statusRecorder;
+ this.eventRecorder = eventRecorder;
}
public void observeSavepointStatus(
@@ -114,8 +118,7 @@ public class SavepointObserver<STATUS extends CommonStatus<?>> {
+ err);
ReconciliationUtils.updateLastReconciledSavepointTriggerNonce(
savepointInfo, resource);
- EventUtils.createOrUpdateEvent(
- flinkService.getKubernetesClient(),
+ eventRecorder.triggerEvent(
resource,
EventUtils.Type.Warning,
"SavepointError",
@@ -219,7 +222,7 @@ public class SavepointObserver<STATUS extends CommonStatus<?>> {
LOG.info(
"Updating resource status after observing new last savepoint {}",
currentLastSpPath);
- statusHelper.patchAndCacheStatus(resource);
+ statusRecorder.patchAndCacheStatus(resource);
}
}
}
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/deployment/AbstractDeploymentObserver.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/deployment/AbstractDeploymentObserver.java
index 7711c4c..d3217f3 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/deployment/AbstractDeploymentObserver.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/deployment/AbstractDeploymentObserver.java
@@ -32,6 +32,7 @@ import org.apache.flink.kubernetes.operator.exception.DeploymentFailedException;
import org.apache.flink.kubernetes.operator.observer.Observer;
import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
import org.apache.flink.kubernetes.operator.service.FlinkService;
+import org.apache.flink.kubernetes.operator.utils.EventRecorder;
import org.apache.flink.kubernetes.operator.utils.EventUtils;
import org.apache.flink.kubernetes.operator.utils.SavepointUtils;
@@ -43,7 +44,6 @@ import io.fabric8.kubernetes.api.model.apps.Deployment;
import io.fabric8.kubernetes.api.model.apps.DeploymentCondition;
import io.fabric8.kubernetes.api.model.apps.DeploymentSpec;
import io.fabric8.kubernetes.api.model.apps.DeploymentStatus;
-import io.fabric8.kubernetes.client.KubernetesClient;
import io.javaoperatorsdk.operator.api.reconciler.Context;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -58,17 +58,17 @@ public abstract class AbstractDeploymentObserver implements Observer<FlinkDeploy
protected final Logger logger = LoggerFactory.getLogger(this.getClass());
- protected final KubernetesClient kubernetesClient;
protected final FlinkService flinkService;
protected final FlinkConfigManager configManager;
+ protected final EventRecorder eventRecorder;
public AbstractDeploymentObserver(
- KubernetesClient kubernetesClient,
FlinkService flinkService,
- FlinkConfigManager configManager) {
- this.kubernetesClient = kubernetesClient;
+ FlinkConfigManager configManager,
+ EventRecorder eventRecorder) {
this.flinkService = flinkService;
this.configManager = configManager;
+ this.eventRecorder = eventRecorder;
}
@Override
@@ -101,7 +101,7 @@ public abstract class AbstractDeploymentObserver implements Observer<FlinkDeploy
observeClusterInfo(flinkApp, observeConfig);
}
- SavepointUtils.resetTriggerIfJobNotRunning(flinkService.getKubernetesClient(), flinkApp);
+ SavepointUtils.resetTriggerIfJobNotRunning(flinkApp, eventRecorder);
clearErrorsIfDeploymentIsHealthy(flinkApp);
}
@@ -244,8 +244,7 @@ public abstract class AbstractDeploymentObserver implements Observer<FlinkDeploy
String err = "Missing JobManager deployment";
logger.error(err);
ReconciliationUtils.updateForReconciliationError(deployment, err);
- EventUtils.createOrUpdateEvent(
- kubernetesClient,
+ eventRecorder.triggerEvent(
deployment,
EventUtils.Type.Warning,
"Missing",
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/deployment/ApplicationObserver.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/deployment/ApplicationObserver.java
index dac96a5..27a3150 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/deployment/ApplicationObserver.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/deployment/ApplicationObserver.java
@@ -27,10 +27,10 @@ import org.apache.flink.kubernetes.operator.observer.JobStatusObserver;
import org.apache.flink.kubernetes.operator.observer.SavepointObserver;
import org.apache.flink.kubernetes.operator.observer.context.ApplicationObserverContext;
import org.apache.flink.kubernetes.operator.service.FlinkService;
-import org.apache.flink.kubernetes.operator.utils.StatusHelper;
+import org.apache.flink.kubernetes.operator.utils.EventRecorder;
+import org.apache.flink.kubernetes.operator.utils.StatusRecorder;
import org.apache.flink.runtime.client.JobStatusMessage;
-import io.fabric8.kubernetes.client.KubernetesClient;
import io.javaoperatorsdk.operator.api.reconciler.Context;
import java.util.List;
@@ -46,14 +46,15 @@ public class ApplicationObserver extends AbstractDeploymentObserver {
private final JobStatusObserver<ApplicationObserverContext> jobStatusObserver;
public ApplicationObserver(
- KubernetesClient kubernetesClient,
FlinkService flinkService,
FlinkConfigManager configManager,
- StatusHelper<FlinkDeploymentStatus> statusHelper) {
- super(kubernetesClient, flinkService, configManager);
- this.savepointObserver = new SavepointObserver<>(flinkService, configManager, statusHelper);
+ StatusRecorder<FlinkDeploymentStatus> statusRecorder,
+ EventRecorder eventRecorder) {
+ super(flinkService, configManager, eventRecorder);
+ this.savepointObserver =
+ new SavepointObserver(flinkService, configManager, statusRecorder, eventRecorder);
this.jobStatusObserver =
- new JobStatusObserver<>(flinkService) {
+ new JobStatusObserver<>(flinkService, eventRecorder) {
@Override
public void onTimeout(ApplicationObserverContext ctx) {
observeJmDeployment(ctx.flinkApp, ctx.context, ctx.deployedConfig);
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/deployment/ObserverFactory.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/deployment/ObserverFactory.java
index e307022..605c276 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/deployment/ObserverFactory.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/deployment/ObserverFactory.java
@@ -23,9 +23,8 @@ import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
import org.apache.flink.kubernetes.operator.crd.status.FlinkDeploymentStatus;
import org.apache.flink.kubernetes.operator.observer.Observer;
import org.apache.flink.kubernetes.operator.service.FlinkService;
-import org.apache.flink.kubernetes.operator.utils.StatusHelper;
-
-import io.fabric8.kubernetes.client.KubernetesClient;
+import org.apache.flink.kubernetes.operator.utils.EventRecorder;
+import org.apache.flink.kubernetes.operator.utils.StatusRecorder;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
@@ -33,21 +32,21 @@ import java.util.concurrent.ConcurrentHashMap;
/** The factory to create the observer based on the {@link FlinkDeployment} mode. */
public class ObserverFactory {
- private final KubernetesClient kubernetesClient;
private final FlinkService flinkService;
private final FlinkConfigManager configManager;
- private final StatusHelper<FlinkDeploymentStatus> statusHelper;
+ private final StatusRecorder<FlinkDeploymentStatus> statusRecorder;
+ private final EventRecorder eventRecorder;
private final Map<Mode, Observer<FlinkDeployment>> observerMap;
public ObserverFactory(
- KubernetesClient kubernetesClient,
FlinkService flinkService,
FlinkConfigManager configManager,
- StatusHelper<FlinkDeploymentStatus> statusHelper) {
- this.kubernetesClient = kubernetesClient;
+ StatusRecorder<FlinkDeploymentStatus> statusRecorder,
+ EventRecorder eventRecorder) {
this.flinkService = flinkService;
this.configManager = configManager;
- this.statusHelper = statusHelper;
+ this.statusRecorder = statusRecorder;
+ this.eventRecorder = eventRecorder;
this.observerMap = new ConcurrentHashMap<>();
}
@@ -57,11 +56,10 @@ public class ObserverFactory {
mode -> {
switch (mode) {
case SESSION:
- return new SessionObserver(
- kubernetesClient, flinkService, configManager);
+ return new SessionObserver(flinkService, configManager, eventRecorder);
case APPLICATION:
return new ApplicationObserver(
- kubernetesClient, flinkService, configManager, statusHelper);
+ flinkService, configManager, statusRecorder, eventRecorder);
default:
throw new UnsupportedOperationException(
String.format("Unsupported running mode: %s", mode));
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/deployment/SessionObserver.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/deployment/SessionObserver.java
index 4612fb3..fc6f4ed 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/deployment/SessionObserver.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/deployment/SessionObserver.java
@@ -21,8 +21,8 @@ import org.apache.flink.configuration.Configuration;
import org.apache.flink.kubernetes.operator.config.FlinkConfigManager;
import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
import org.apache.flink.kubernetes.operator.service.FlinkService;
+import org.apache.flink.kubernetes.operator.utils.EventRecorder;
-import io.fabric8.kubernetes.client.KubernetesClient;
import io.javaoperatorsdk.operator.api.reconciler.Context;
import java.util.concurrent.TimeoutException;
@@ -31,10 +31,10 @@ import java.util.concurrent.TimeoutException;
public class SessionObserver extends AbstractDeploymentObserver {
public SessionObserver(
- KubernetesClient kubernetesClient,
FlinkService flinkService,
- FlinkConfigManager configManager) {
- super(kubernetesClient, flinkService, configManager);
+ FlinkConfigManager configManager,
+ EventRecorder eventRecorder) {
+ super(flinkService, configManager, eventRecorder);
}
@Override
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/sessionjob/SessionJobObserver.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/sessionjob/SessionJobObserver.java
index 7d16a6a..6169a4d 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/sessionjob/SessionJobObserver.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/sessionjob/SessionJobObserver.java
@@ -28,8 +28,9 @@ import org.apache.flink.kubernetes.operator.observer.SavepointObserver;
import org.apache.flink.kubernetes.operator.observer.context.VoidObserverContext;
import org.apache.flink.kubernetes.operator.reconciler.sessionjob.SessionJobHelper;
import org.apache.flink.kubernetes.operator.service.FlinkService;
+import org.apache.flink.kubernetes.operator.utils.EventRecorder;
import org.apache.flink.kubernetes.operator.utils.SavepointUtils;
-import org.apache.flink.kubernetes.operator.utils.StatusHelper;
+import org.apache.flink.kubernetes.operator.utils.StatusRecorder;
import org.apache.flink.runtime.client.JobStatusMessage;
import org.apache.flink.util.Preconditions;
@@ -45,20 +46,22 @@ import java.util.stream.Collectors;
public class SessionJobObserver implements Observer<FlinkSessionJob> {
private static final Logger LOG = LoggerFactory.getLogger(SessionJobObserver.class);
- private final FlinkService flinkService;
private final FlinkConfigManager configManager;
+ private final EventRecorder eventRecorder;
private final SavepointObserver<FlinkSessionJobStatus> savepointObserver;
private final JobStatusObserver<VoidObserverContext> jobStatusObserver;
public SessionJobObserver(
FlinkService flinkService,
FlinkConfigManager configManager,
- StatusHelper<FlinkSessionJobStatus> statusHelper) {
- this.flinkService = flinkService;
+ StatusRecorder<FlinkSessionJobStatus> statusRecorder,
+ EventRecorder eventRecorder) {
this.configManager = configManager;
- this.savepointObserver = new SavepointObserver<>(flinkService, configManager, statusHelper);
+ this.eventRecorder = eventRecorder;
+ this.savepointObserver =
+ new SavepointObserver(flinkService, configManager, statusRecorder, eventRecorder);
this.jobStatusObserver =
- new JobStatusObserver<>(flinkService) {
+ new JobStatusObserver<>(flinkService, eventRecorder) {
@Override
protected void onTimeout(VoidObserverContext sessionJobObserverContext) {}
@@ -114,7 +117,6 @@ public class SessionJobObserver implements Observer<FlinkSessionJob> {
if (jobFound) {
savepointObserver.observeSavepointStatus(flinkSessionJob, deployedConfig);
}
- SavepointUtils.resetTriggerIfJobNotRunning(
- flinkService.getKubernetesClient(), flinkSessionJob);
+ SavepointUtils.resetTriggerIfJobNotRunning(flinkSessionJob, eventRecorder);
}
}
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/ReconciliationUtils.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/ReconciliationUtils.java
index bd2c5bb..85bdf52 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/ReconciliationUtils.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/ReconciliationUtils.java
@@ -40,7 +40,7 @@ import org.apache.flink.kubernetes.operator.exception.ReconciliationException;
import org.apache.flink.kubernetes.operator.metrics.MetricManager;
import org.apache.flink.kubernetes.operator.service.FlinkService;
import org.apache.flink.kubernetes.operator.utils.FlinkUtils;
-import org.apache.flink.kubernetes.operator.utils.StatusHelper;
+import org.apache.flink.kubernetes.operator.utils.StatusRecorder;
import org.apache.flink.util.Preconditions;
import com.fasterxml.jackson.core.JsonProcessingException;
@@ -372,7 +372,7 @@ public class ReconciliationUtils {
* @param retryInfo Current RetryInformation
* @param e Exception that caused the retry
* @param metricManager Metric manager to be updated
- * @param statusHelper StatusHelper object for patching status
+ * @param statusRecorder StatusRecorder object for patching status
* @return This always returns Empty optional currently, due to the status update logic
*/
public static <STATUS extends CommonStatus<?>, R extends AbstractFlinkResource<?, STATUS>>
@@ -381,7 +381,7 @@ public class ReconciliationUtils {
Optional<RetryInfo> retryInfo,
Exception e,
MetricManager<R> metricManager,
- StatusHelper<STATUS> statusHelper) {
+ StatusRecorder<STATUS> statusRecorder) {
retryInfo.ifPresent(
r -> {
@@ -391,12 +391,12 @@ public class ReconciliationUtils {
r.isLastAttempt());
});
- statusHelper.updateStatusFromCache(resource);
+ statusRecorder.updateStatusFromCache(resource);
ReconciliationUtils.updateForReconciliationError(
resource,
(e instanceof ReconciliationException) ? e.getCause().toString() : e.toString());
metricManager.onUpdate(resource);
- statusHelper.patchAndCacheStatus(resource);
+ statusRecorder.patchAndCacheStatus(resource);
// Status was updated already, no need to return anything
return ErrorStatusUpdateControl.noStatusUpdate();
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractDeploymentReconciler.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractDeploymentReconciler.java
index 6c77657..350ff2e 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractDeploymentReconciler.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractDeploymentReconciler.java
@@ -27,6 +27,7 @@ import org.apache.flink.kubernetes.operator.crd.status.ReconciliationStatus;
import org.apache.flink.kubernetes.operator.reconciler.Reconciler;
import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
import org.apache.flink.kubernetes.operator.service.FlinkService;
+import org.apache.flink.kubernetes.operator.utils.EventRecorder;
import io.fabric8.kubernetes.client.KubernetesClient;
import io.javaoperatorsdk.operator.api.reconciler.Context;
@@ -40,16 +41,19 @@ public abstract class AbstractDeploymentReconciler implements Reconciler<FlinkDe
private static final Logger LOG = LoggerFactory.getLogger(AbstractDeploymentReconciler.class);
protected final FlinkConfigManager configManager;
+ protected final EventRecorder eventRecorder;
protected final KubernetesClient kubernetesClient;
protected final FlinkService flinkService;
public AbstractDeploymentReconciler(
KubernetesClient kubernetesClient,
FlinkService flinkService,
- FlinkConfigManager configManager) {
+ FlinkConfigManager configManager,
+ EventRecorder eventRecorder) {
this.kubernetesClient = kubernetesClient;
this.flinkService = flinkService;
this.configManager = configManager;
+ this.eventRecorder = eventRecorder;
}
@Override
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconciler.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconciler.java
index 01bfe69..595b7ef 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconciler.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconciler.java
@@ -35,6 +35,7 @@ import org.apache.flink.kubernetes.operator.crd.status.ReconciliationStatus;
import org.apache.flink.kubernetes.operator.exception.DeploymentFailedException;
import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
import org.apache.flink.kubernetes.operator.service.FlinkService;
+import org.apache.flink.kubernetes.operator.utils.EventRecorder;
import org.apache.flink.kubernetes.operator.utils.FlinkUtils;
import org.apache.flink.kubernetes.operator.utils.IngressUtils;
import org.apache.flink.kubernetes.operator.utils.SavepointUtils;
@@ -62,8 +63,9 @@ public class ApplicationReconciler extends AbstractDeploymentReconciler {
public ApplicationReconciler(
KubernetesClient kubernetesClient,
FlinkService flinkService,
- FlinkConfigManager configManager) {
- super(kubernetesClient, flinkService, configManager);
+ FlinkConfigManager configManager,
+ EventRecorder eventRecorder) {
+ super(kubernetesClient, flinkService, configManager, eventRecorder);
}
@Override
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ReconcilerFactory.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ReconcilerFactory.java
index 7603469..468526e 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ReconcilerFactory.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ReconcilerFactory.java
@@ -22,6 +22,7 @@ import org.apache.flink.kubernetes.operator.config.Mode;
import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
import org.apache.flink.kubernetes.operator.reconciler.Reconciler;
import org.apache.flink.kubernetes.operator.service.FlinkService;
+import org.apache.flink.kubernetes.operator.utils.EventRecorder;
import io.fabric8.kubernetes.client.KubernetesClient;
@@ -34,15 +35,18 @@ public class ReconcilerFactory {
private final KubernetesClient kubernetesClient;
private final FlinkService flinkService;
private final FlinkConfigManager configManager;
+ private final EventRecorder eventRecorder;
private final Map<Mode, Reconciler<FlinkDeployment>> reconcilerMap;
public ReconcilerFactory(
KubernetesClient kubernetesClient,
FlinkService flinkService,
- FlinkConfigManager configManager) {
+ FlinkConfigManager configManager,
+ EventRecorder eventRecorder) {
this.kubernetesClient = kubernetesClient;
this.flinkService = flinkService;
this.configManager = configManager;
+ this.eventRecorder = eventRecorder;
this.reconcilerMap = new ConcurrentHashMap<>();
}
@@ -53,10 +57,10 @@ public class ReconcilerFactory {
switch (mode) {
case SESSION:
return new SessionReconciler(
- kubernetesClient, flinkService, configManager);
+ kubernetesClient, flinkService, configManager, eventRecorder);
case APPLICATION:
return new ApplicationReconciler(
- kubernetesClient, flinkService, configManager);
+ kubernetesClient, flinkService, configManager, eventRecorder);
default:
throw new UnsupportedOperationException(
String.format("Unsupported running mode: %s", mode));
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/SessionReconciler.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/SessionReconciler.java
index 6057178..ccf74f5 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/SessionReconciler.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/SessionReconciler.java
@@ -28,6 +28,7 @@ import org.apache.flink.kubernetes.operator.crd.status.ReconciliationState;
import org.apache.flink.kubernetes.operator.crd.status.ReconciliationStatus;
import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
import org.apache.flink.kubernetes.operator.service.FlinkService;
+import org.apache.flink.kubernetes.operator.utils.EventRecorder;
import org.apache.flink.kubernetes.operator.utils.EventUtils;
import org.apache.flink.kubernetes.operator.utils.FlinkUtils;
import org.apache.flink.kubernetes.operator.utils.IngressUtils;
@@ -52,8 +53,9 @@ public class SessionReconciler extends AbstractDeploymentReconciler {
public SessionReconciler(
KubernetesClient kubernetesClient,
FlinkService flinkService,
- FlinkConfigManager configManager) {
- super(kubernetesClient, flinkService, configManager);
+ FlinkConfigManager configManager,
+ EventRecorder eventRecorder) {
+ super(kubernetesClient, flinkService, configManager, eventRecorder);
}
@Override
@@ -155,8 +157,7 @@ public class SessionReconciler extends AbstractDeploymentReconciler {
sessionJobs.stream()
.map(job -> job.getMetadata().getName())
.collect(Collectors.toList()));
- if (EventUtils.createOrUpdateEvent(
- kubernetesClient,
+ if (eventRecorder.triggerEvent(
flinkApp,
EventUtils.Type.Warning,
"Cleanup",
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/EventRecorder.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/EventRecorder.java
new file mode 100644
index 0000000..ff87e8f
--- /dev/null
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/EventRecorder.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.utils;
+
+import org.apache.flink.kubernetes.operator.crd.AbstractFlinkResource;
+import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
+import org.apache.flink.kubernetes.operator.listener.FlinkResourceListener;
+
+import io.fabric8.kubernetes.api.model.Event;
+import io.fabric8.kubernetes.client.KubernetesClient;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collection;
+import java.util.function.BiConsumer;
+
+ /**
+  * Helper class for creating Kubernetes events for Flink resources.
+  *
+  * <p>Every event written through {@link #triggerEvent} is also handed to the event listener
+  * callback this recorder was constructed with.
+  */
+ public class EventRecorder {
+
+     private static final Logger LOG = LoggerFactory.getLogger(EventRecorder.class);
+
+     private final KubernetesClient client;
+     private final BiConsumer<AbstractFlinkResource<?, ?>, Event> eventListener;
+
+     /**
+      * @param client Kubernetes client used to write the events
+      * @param eventListener callback receiving the resource together with each recorded event
+      */
+     public EventRecorder(
+             KubernetesClient client, BiConsumer<AbstractFlinkResource<?, ?>, Event> eventListener) {
+         this.client = client;
+         this.eventListener = eventListener;
+     }
+
+     /**
+      * Creates a recorder that forwards every recorded event to all of the given listeners.
+      *
+      * @param client Kubernetes client used to write the events and exposed to the listeners
+      * @param listeners listeners to notify; they are invoked in the collection's iteration order
+      * @return recorder wired to the listeners
+      */
+     public static EventRecorder create(
+             KubernetesClient client, Collection<FlinkResourceListener> listeners) {
+         return new EventRecorder(
+                 client,
+                 (resource, event) ->
+                         listeners.forEach(
+                                 listener -> notifyListener(listener, client, resource, event)));
+     }
+
+     /**
+      * Records an event for the resource via {@link EventUtils#createOrUpdateEvent} and passes
+      * the resulting event to the event listener.
+      *
+      * @param resource resource the event is about
+      * @param type event type
+      * @param reason event reason
+      * @param message event message
+      * @param component component reporting the event
+      * @return the value returned by {@link EventUtils#createOrUpdateEvent}
+      */
+     public boolean triggerEvent(
+             AbstractFlinkResource<?, ?> resource,
+             EventUtils.Type type,
+             String reason,
+             String message,
+             EventUtils.Component component) {
+         return EventUtils.createOrUpdateEvent(
+                 client,
+                 resource,
+                 type,
+                 reason,
+                 message,
+                 component,
+                 recorded -> eventListener.accept(resource, recorded));
+     }
+
+     /**
+      * Wraps the event in a fresh context object and dispatches it to the listener callback
+      * matching the resource type: {@link FlinkDeployment} instances go to the deployment
+      * callback, everything else to the session job callback.
+      */
+     private static void notifyListener(
+             FlinkResourceListener listener,
+             KubernetesClient client,
+             AbstractFlinkResource<?, ?> resource,
+             Event event) {
+         var ctx =
+                 new FlinkResourceListener.ResourceEventContext() {
+                     @Override
+                     public Event getEvent() {
+                         return event;
+                     }
+
+                     @Override
+                     public AbstractFlinkResource<?, ?> getFlinkResource() {
+                         return resource;
+                     }
+
+                     @Override
+                     public KubernetesClient getKubernetesClient() {
+                         return client;
+                     }
+                 };
+
+         if (resource instanceof FlinkDeployment) {
+             listener.onDeploymentEvent(ctx);
+         } else {
+             listener.onSessionJobEvent(ctx);
+         }
+     }
+ }
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/EventUtils.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/EventUtils.java
index f710378..100b857 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/EventUtils.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/EventUtils.java
@@ -17,12 +17,14 @@
package org.apache.flink.kubernetes.operator.utils;
+import io.fabric8.kubernetes.api.model.Event;
import io.fabric8.kubernetes.api.model.EventBuilder;
import io.fabric8.kubernetes.api.model.HasMetadata;
import io.fabric8.kubernetes.api.model.ObjectReferenceBuilder;
import io.fabric8.kubernetes.client.KubernetesClient;
import java.time.Instant;
+import java.util.function.Consumer;
/**
* The util to generate an event for the target resource. It is copied from
@@ -63,7 +65,8 @@ public class EventUtils {
Type type,
String reason,
String message,
- Component component) {
+ Component component,
+ Consumer<Event> eventListener) {
var eventName = generateEventName(target, type, reason, message, component);
var existing =
@@ -86,6 +89,7 @@ public class EventUtils {
.events()
.inNamespace(target.getMetadata().getNamespace())
.createOrReplace(existing);
+ eventListener.accept(existing);
return false;
} else {
var event =
@@ -113,6 +117,7 @@ public class EventUtils {
.endMetadata()
.build();
client.v1().events().inNamespace(target.getMetadata().getNamespace()).create(event);
+ eventListener.accept(event);
return true;
}
}
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/SavepointUtils.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/SavepointUtils.java
index 6858281..23e5871 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/SavepointUtils.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/SavepointUtils.java
@@ -27,7 +27,6 @@ import org.apache.flink.kubernetes.operator.crd.status.SavepointTriggerType;
import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
import org.apache.flink.kubernetes.operator.service.FlinkService;
-import io.fabric8.kubernetes.client.KubernetesClient;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -140,7 +139,7 @@ public class SavepointUtils {
}
public static void resetTriggerIfJobNotRunning(
- KubernetesClient client, AbstractFlinkResource<?, ?> resource) {
+ AbstractFlinkResource<?, ?> resource, EventRecorder eventRecorder) {
var status = resource.getStatus();
var jobStatus = status.getJobStatus();
if (!ReconciliationUtils.isJobRunning(status)
@@ -149,8 +148,7 @@ public class SavepointUtils {
ReconciliationUtils.updateLastReconciledSavepointTriggerNonce(savepointInfo, resource);
savepointInfo.resetTrigger();
LOG.error("Job is not running, cancelling savepoint operation");
- EventUtils.createOrUpdateEvent(
- client,
+ eventRecorder.triggerEvent(
resource,
EventUtils.Type.Warning,
"SavepointError",
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/StatusHelper.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/StatusRecorder.java
similarity index 57%
rename from flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/StatusHelper.java
rename to flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/StatusRecorder.java
index ca96244..ae32f92 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/StatusHelper.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/StatusRecorder.java
@@ -19,7 +19,12 @@
package org.apache.flink.kubernetes.operator.utils;
import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.kubernetes.operator.crd.AbstractFlinkResource;
+import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
import org.apache.flink.kubernetes.operator.crd.status.CommonStatus;
+import org.apache.flink.kubernetes.operator.crd.status.FlinkDeploymentStatus;
+import org.apache.flink.kubernetes.operator.crd.status.FlinkSessionJobStatus;
+import org.apache.flink.kubernetes.operator.listener.FlinkResourceListener;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
@@ -30,12 +35,14 @@ import lombok.SneakyThrows;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.Collection;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.function.BiConsumer;
/** Helper class for status management and updates. */
-public class StatusHelper<STATUS extends CommonStatus<?>> {
+public class StatusRecorder<STATUS extends CommonStatus<?>> {
- private static final Logger LOG = LoggerFactory.getLogger(StatusHelper.class);
+ private static final Logger LOG = LoggerFactory.getLogger(StatusRecorder.class);
protected final ObjectMapper objectMapper = new ObjectMapper();
@@ -43,9 +50,13 @@ public class StatusHelper<STATUS extends CommonStatus<?>> {
new ConcurrentHashMap<>();
private final KubernetesClient client;
+ private final BiConsumer<AbstractFlinkResource<?, STATUS>, STATUS> statusUpdateListener;
- public StatusHelper(KubernetesClient client) {
+ public StatusRecorder(
+ KubernetesClient client,
+ BiConsumer<AbstractFlinkResource<?, STATUS>, STATUS> statusUpdateListener) {
this.client = client;
+ this.statusUpdateListener = statusUpdateListener;
}
/**
@@ -57,8 +68,7 @@ public class StatusHelper<STATUS extends CommonStatus<?>> {
* @param resource Resource for which status update should be performed
*/
@SneakyThrows
- public <T extends CustomResource<?, STATUS>> void patchAndCacheStatus(T resource) {
-
+ public <T extends AbstractFlinkResource<?, STATUS>> void patchAndCacheStatus(T resource) {
Class<T> resourceClass = (Class<T>) resource.getClass();
String namespace = resource.getMetadata().getNamespace();
String name = resource.getMetadata().getName();
@@ -67,14 +77,21 @@ public class StatusHelper<STATUS extends CommonStatus<?>> {
// in the meantime
resource.getMetadata().setResourceVersion(null);
- ObjectNode newStatus = objectMapper.convertValue(resource.getStatus(), ObjectNode.class);
- ObjectNode previousStatus = statusCache.put(getKey(resource), newStatus);
+ ObjectNode newStatusNode =
+ objectMapper.convertValue(resource.getStatus(), ObjectNode.class);
+ ObjectNode previousStatusNode = statusCache.put(getKey(resource), newStatusNode);
- if (newStatus.equals(previousStatus)) {
+ if (newStatusNode.equals(previousStatusNode)) {
LOG.debug("No status change.");
return;
}
+ var statusClass =
+ (resource instanceof FlinkDeployment)
+ ? FlinkDeploymentStatus.class
+ : FlinkSessionJobStatus.class;
+ var prevStatus = (STATUS) objectMapper.convertValue(previousStatusNode, statusClass);
+
Exception err = null;
for (int i = 0; i < 3; i++) {
// In any case we retry the status update 3 times to avoid some intermittent
@@ -84,6 +101,7 @@ public class StatusHelper<STATUS extends CommonStatus<?>> {
.inNamespace(namespace)
.withName(name)
.patchStatus(resource);
+ statusUpdateListener.accept(resource, prevStatus);
return;
} catch (Exception e) {
LOG.error("Error while patching status, retrying {}/3...", (i + 1), e);
@@ -105,12 +123,16 @@ public class StatusHelper<STATUS extends CommonStatus<?>> {
* @param resource Resource for which the status should be updated from the cache
*/
public <T extends CustomResource<?, STATUS>> void updateStatusFromCache(T resource) {
- var cachedStatus = statusCache.get(getKey(resource));
+ var key = getKey(resource);
+ var cachedStatus = statusCache.get(key);
if (cachedStatus != null) {
resource.setStatus(
(STATUS)
objectMapper.convertValue(
cachedStatus, resource.getStatus().getClass()));
+ } else {
+ // Initialize cache with current status copy
+ statusCache.put(key, objectMapper.convertValue(resource.getStatus(), ObjectNode.class));
}
}
@@ -122,4 +144,38 @@ public class StatusHelper<STATUS extends CommonStatus<?>> {
protected static Tuple2<String, String> getKey(HasMetadata resource) {
return Tuple2.of(resource.getMetadata().getNamespace(), resource.getMetadata().getName());
}
+
+ /**
+ * Creates a {@link StatusRecorder} whose status update callback forwards to the given listeners.
+ *
+ * <p>For every callback invocation a {@link FlinkResourceListener.StatusUpdateContext} is built
+ * that exposes the previous status, the resource and the Kubernetes client. Each listener
+ * receives it through {@code onDeploymentStatusUpdate} when the resource is a {@link
+ * FlinkDeployment}, and through {@code onSessionJobStatusUpdate} otherwise.
+ *
+ * @param kubernetesClient Client passed to the recorder and exposed to listeners via the context
+ * @param listeners Listeners to notify, in iteration order of the collection
+ * @param <S> Status type handled by the returned recorder
+ * @return Recorder wired to notify the listeners
+ */
+ public static <S extends CommonStatus<?>> StatusRecorder<S> create(
+ KubernetesClient kubernetesClient, Collection<FlinkResourceListener> listeners) {
+ BiConsumer<AbstractFlinkResource<?, S>, S> consumer =
+ (resource, previousStatus) -> {
+ // NOTE(review): the context is instantiated as a raw type here; confirm
+ // whether explicit type arguments can be supplied instead.
+ var ctx =
+ new FlinkResourceListener.StatusUpdateContext() {
+ @Override
+ public S getPreviousStatus() {
+ return previousStatus;
+ }
+
+ @Override
+ public AbstractFlinkResource<?, S> getFlinkResource() {
+ return resource;
+ }
+
+ @Override
+ public KubernetesClient getKubernetesClient() {
+ return kubernetesClient;
+ }
+ };
+
+ listeners.forEach(
+ listener -> {
+ // Anything that is not a FlinkDeployment is routed to the
+ // session job callback.
+ if (resource instanceof FlinkDeployment) {
+ listener.onDeploymentStatusUpdate(ctx);
+ } else {
+ listener.onSessionJobStatusUpdate(ctx);
+ }
+ });
+ };
+ return new StatusRecorder<>(kubernetesClient, consumer);
+ }
}
diff --git a/flink-kubernetes-operator/src/test/assembly/test-validator-assembly.xml b/flink-kubernetes-operator/src/test/assembly/test-plugins-assembly.xml
similarity index 93%
rename from flink-kubernetes-operator/src/test/assembly/test-validator-assembly.xml
rename to flink-kubernetes-operator/src/test/assembly/test-plugins-assembly.xml
index b4abc44..93d7807 100644
--- a/flink-kubernetes-operator/src/test/assembly/test-validator-assembly.xml
+++ b/flink-kubernetes-operator/src/test/assembly/test-plugins-assembly.xml
@@ -29,6 +29,7 @@ under the License.
<!-- the service impl -->
<includes>
<include>org/apache/flink/kubernetes/operator/validation/TestValidator.class</include>
+ <include>org/apache/flink/kubernetes/operator/listener/TestingListener.class</include>
</includes>
</fileSet>
<fileSet>
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestUtils.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestUtils.java
index 2ad7c1e..c619633 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestUtils.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestUtils.java
@@ -41,7 +41,8 @@ import org.apache.flink.kubernetes.operator.exception.DeploymentFailedException;
import org.apache.flink.kubernetes.operator.metrics.MetricManager;
import org.apache.flink.kubernetes.operator.observer.deployment.ObserverFactory;
import org.apache.flink.kubernetes.operator.reconciler.deployment.ReconcilerFactory;
-import org.apache.flink.kubernetes.operator.utils.StatusHelper;
+import org.apache.flink.kubernetes.operator.utils.EventRecorder;
+import org.apache.flink.kubernetes.operator.utils.StatusRecorder;
import org.apache.flink.kubernetes.operator.utils.ValidatorUtils;
import org.apache.flink.metrics.testutils.MetricListener;
@@ -68,8 +69,13 @@ import okhttp3.Headers;
import okhttp3.mockwebserver.RecordedRequest;
import org.junit.jupiter.api.Assertions;
+import java.io.File;
+import java.io.IOException;
import java.lang.reflect.Field;
import java.net.HttpURLConnection;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
import java.time.Instant;
import java.util.Collections;
import java.util.HashMap;
@@ -80,6 +86,8 @@ import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
+import static org.junit.Assert.assertTrue;
+
/** Testing utilities. */
public class TestUtils {
@@ -92,6 +100,9 @@ public class TestUtils {
public static final String IMAGE_POLICY = "IfNotPresent";
public static final String SAMPLE_JAR = "local:///tmp/sample.jar";
+ private static final String TEST_PLUGINS = "test-plugins";
+ private static final String PlUGINS_JAR = TEST_PLUGINS + "-test-jar.jar";
+
public static FlinkDeployment buildSessionCluster() {
return buildSessionCluster(FlinkVersion.v1_15);
}
@@ -420,6 +431,17 @@ public class TestUtils {
};
}
+ /**
+ * Prepares a plugins root directory for tests inside the given folder.
+ *
+ * <p>Creates the {@code TEST_PLUGINS} sub-directory and copies the test plugins jar ({@code
+ * PlUGINS_JAR}, looked up relative to {@code target}) into it. The method fails an assertion
+ * when {@code mkdirs()} returns false or when the jar does not exist.
+ *
+ * @param temporaryFolder Folder under which the plugin directory is created
+ * @return Absolute path of {@code temporaryFolder}, i.e. the parent of the plugin directory
+ * @throws IOException If copying the jar fails
+ */
+ public static String getTestPluginsRootDir(Path temporaryFolder) throws IOException {
+ // The local names still say "validator"; the folder and jar hold all test plugins.
+ File testValidatorFolder = new File(temporaryFolder.toFile(), TEST_PLUGINS);
+ assertTrue(testValidatorFolder.mkdirs());
+ File testValidatorJar = new File("target", PlUGINS_JAR);
+ assertTrue(testValidatorJar.exists());
+ Files.copy(
+ testValidatorJar.toPath(), Paths.get(testValidatorFolder.toString(), PlUGINS_JAR));
+
+ return temporaryFolder.toAbsolutePath().toString();
+ }
+
// This code is taken slightly modified from: http://stackoverflow.com/a/7201825/568695
// it changes the environment variables of this JVM. Use only for testing purposes!
@SuppressWarnings("unchecked")
@@ -455,15 +477,18 @@ public class TestUtils {
KubernetesClient kubernetesClient,
TestingFlinkService flinkService) {
- var statusHelper = new StatusHelper<FlinkDeploymentStatus>(kubernetesClient);
+ var statusRecorder =
+ new StatusRecorder<FlinkDeploymentStatus>(kubernetesClient, (r, s) -> {});
+ var eventRecorder = new EventRecorder(kubernetesClient, (r, e) -> {});
return new FlinkDeploymentController(
configManager,
kubernetesClient,
ValidatorUtils.discoverValidators(configManager),
- new ReconcilerFactory(kubernetesClient, flinkService, configManager),
- new ObserverFactory(kubernetesClient, flinkService, configManager, statusHelper),
+ new ReconcilerFactory(kubernetesClient, flinkService, configManager, eventRecorder),
+ new ObserverFactory(flinkService, configManager, statusRecorder, eventRecorder),
new MetricManager<>(new MetricListener().getMetricGroup()),
- statusHelper);
+ statusRecorder,
+ eventRecorder);
}
/** Testing ResponseProvider. */
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingStatusHelper.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingStatusRecorder.java
similarity index 72%
rename from flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingStatusHelper.java
rename to flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingStatusRecorder.java
index ed29596..0243fd5 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingStatusHelper.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingStatusRecorder.java
@@ -18,21 +18,21 @@
package org.apache.flink.kubernetes.operator;
+import org.apache.flink.kubernetes.operator.crd.AbstractFlinkResource;
import org.apache.flink.kubernetes.operator.crd.status.CommonStatus;
-import org.apache.flink.kubernetes.operator.utils.StatusHelper;
+import org.apache.flink.kubernetes.operator.utils.StatusRecorder;
import com.fasterxml.jackson.databind.node.ObjectNode;
-import io.fabric8.kubernetes.client.CustomResource;
-/** Testing StatusHelper. */
-public class TestingStatusHelper<STATUS extends CommonStatus<?>> extends StatusHelper<STATUS> {
+/** Testing StatusRecorder. */
+public class TestingStatusRecorder<STATUS extends CommonStatus<?>> extends StatusRecorder<STATUS> {
- public TestingStatusHelper() {
- super(null);
+ public TestingStatusRecorder() {
+ super(null, (r, s) -> {});
}
@Override
- public <T extends CustomResource<?, STATUS>> void patchAndCacheStatus(T resource) {
+ public <T extends AbstractFlinkResource<?, STATUS>> void patchAndCacheStatus(T resource) {
statusCache.put(
getKey(resource),
objectMapper.convertValue(resource.getStatus(), ObjectNode.class));
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/listener/FlinkResourceListenerTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/listener/FlinkResourceListenerTest.java
new file mode 100644
index 0000000..e624b6c
--- /dev/null
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/listener/FlinkResourceListenerTest.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.listener;
+
+import org.apache.flink.kubernetes.operator.TestUtils;
+import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
+import org.apache.flink.kubernetes.operator.crd.status.FlinkDeploymentStatus;
+import org.apache.flink.kubernetes.operator.crd.status.JobManagerDeploymentStatus;
+import org.apache.flink.kubernetes.operator.utils.EventRecorder;
+import org.apache.flink.kubernetes.operator.utils.EventUtils;
+import org.apache.flink.kubernetes.operator.utils.StatusRecorder;
+
+import io.fabric8.kubernetes.client.KubernetesClient;
+import io.fabric8.kubernetes.client.server.mock.EnableKubernetesMockClient;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.util.List;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * Tests for {@link FlinkResourceListener}.
+ *
+ * <p>Verifies that recorders created through {@code StatusRecorder.create} and {@code
+ * EventRecorder.create} notify every registered listener about status updates and events.
+ */
+@EnableKubernetesMockClient(crud = true)
+public class FlinkResourceListenerTest {
+
+ // Not assigned in this class; presumably injected by the mock client extension enabled above.
+ private KubernetesClient kubernetesClient;
+
+ /** Stores an application cluster resource in the client before each test. */
+ @BeforeEach
+ public void before() {
+ kubernetesClient.resource(TestUtils.buildApplicationCluster()).createOrReplace();
+ }
+
+ /**
+ * Runs two listeners through an unchanged status patch, two status changes and two events, and
+ * checks that both listeners record the same notifications.
+ */
+ @Test
+ public void testListeners() {
+ var listener1 = new TestingListener();
+ var listener2 = new TestingListener();
+ var listeners = List.<FlinkResourceListener>of(listener1, listener2);
+
+ var statusRecorder =
+ StatusRecorder.<FlinkDeploymentStatus>create(kubernetesClient, listeners);
+ var eventRecorder = EventRecorder.create(kubernetesClient, listeners);
+
+ var deployment = TestUtils.buildApplicationCluster();
+ statusRecorder.updateStatusFromCache(deployment);
+
+ // Patching without modifying the status is expected to notify nobody.
+ statusRecorder.patchAndCacheStatus(deployment);
+ assertTrue(listener1.updates.isEmpty());
+ assertTrue(listener2.updates.isEmpty());
+ assertTrue(listener1.events.isEmpty());
+ assertTrue(listener2.events.isEmpty());
+
+ // First real status change: one update per listener.
+ deployment.getStatus().setJobManagerDeploymentStatus(JobManagerDeploymentStatus.ERROR);
+
+ statusRecorder.patchAndCacheStatus(deployment);
+ assertEquals(1, listener1.updates.size());
+ assertEquals(deployment, listener1.updates.get(0).getFlinkResource());
+
+ assertEquals(1, listener2.updates.size());
+ assertEquals(deployment, listener2.updates.get(0).getFlinkResource());
+
+ // Second status change: a second update per listener.
+ deployment.getStatus().setJobManagerDeploymentStatus(JobManagerDeploymentStatus.DEPLOYING);
+ statusRecorder.patchAndCacheStatus(deployment);
+
+ // NOTE(review): index 0 is checked again below although a second update was just
+ // recorded; get(1) may have been intended. Confirm.
+ assertEquals(2, listener1.updates.size());
+ assertEquals(deployment, listener1.updates.get(0).getFlinkResource());
+ assertEquals(2, listener2.updates.size());
+ assertEquals(deployment, listener2.updates.get(0).getFlinkResource());
+
+ // The second update must report ERROR as the previous and DEPLOYING as the new status.
+ var updateContext =
+ (FlinkResourceListener.StatusUpdateContext<FlinkDeployment, FlinkDeploymentStatus>)
+ listener1.updates.get(1);
+ assertEquals(
+ JobManagerDeploymentStatus.ERROR,
+ updateContext.getPreviousStatus().getJobManagerDeploymentStatus());
+ assertEquals(
+ JobManagerDeploymentStatus.DEPLOYING,
+ updateContext.getNewStatus().getJobManagerDeploymentStatus());
+
+ // Triggering the identical event twice is expected to notify the listener both times.
+ eventRecorder.triggerEvent(
+ deployment,
+ EventUtils.Type.Warning,
+ "SavepointError",
+ "err",
+ EventUtils.Component.Operator);
+ assertEquals(1, listener1.events.size());
+ eventRecorder.triggerEvent(
+ deployment,
+ EventUtils.Type.Warning,
+ "SavepointError",
+ "err",
+ EventUtils.Component.Operator);
+ assertEquals(2, listener1.events.size());
+
+ // Both listeners must have received equal events in the same order.
+ for (int i = 0; i < listener1.events.size(); i++) {
+ assertEquals(listener1.events.get(i).getEvent(), listener2.events.get(i).getEvent());
+ }
+ }
+}
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/listener/ListenerUtilsTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/listener/ListenerUtilsTest.java
new file mode 100644
index 0000000..ba5ddc3
--- /dev/null
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/listener/ListenerUtilsTest.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.listener;
+
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.kubernetes.operator.TestUtils;
+import org.apache.flink.kubernetes.operator.config.FlinkConfigManager;
+
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.io.IOException;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+/**
+ * Test for {@link ListenerUtils}.
+ *
+ * <p>Checks that a listener configured under the {@code kubernetes.operator.plugins.listeners}
+ * prefix is discovered and receives only its own configuration keys.
+ */
+public class ListenerUtilsTest {
+
+ // Temporary directory used as the root for the test plugins folder.
+ @TempDir Path temporaryFolder;
+
+ /**
+ * Configures a single listener named {@code test1}, points the plugins directory environment
+ * variable at a prepared test plugins folder, and verifies the discovered listener and the
+ * configuration passed to it.
+ */
+ @Test
+ public void testListenerConfiguration() throws IOException {
+ Map<String, String> testConfig = new HashMap<>();
+
+ testConfig.put(
+ "kubernetes.operator.plugins.listeners.test1.class",
+ TestingListener.class.getName());
+ testConfig.put("kubernetes.operator.plugins.listeners.test1.k1", "v1");
+ testConfig.put("kubernetes.operator.plugins.listeners.test1.k2", "v2");
+ // Key outside the listener prefix; the assertion below expects it to be absent from the
+ // listener configuration.
+ testConfig.put("k3", "v3");
+
+ Map<String, String> originalEnv = System.getenv();
+ try {
+ Map<String, String> systemEnv = new HashMap<>(originalEnv);
+ systemEnv.put(
+ ConfigConstants.ENV_FLINK_PLUGINS_DIR,
+ TestUtils.getTestPluginsRootDir(temporaryFolder));
+ TestUtils.setEnv(systemEnv);
+ var listeners =
+ new ArrayList<>(
+ ListenerUtils.discoverListeners(
+ new FlinkConfigManager(Configuration.fromMap(testConfig))));
+ assertEquals(1, listeners.size());
+
+ // The listener is expected to see its keys with the prefix stripped.
+ var testingListener = (TestingListener) listeners.get(0);
+ assertEquals(
+ Map.of("k1", "v1", "k2", "v2", "class", TestingListener.class.getName()),
+ testingListener.config.toMap());
+ } finally {
+ // Restore the environment that was captured before the override.
+ TestUtils.setEnv(originalEnv);
+ }
+ }
+}
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/listener/TestingListener.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/listener/TestingListener.java
new file mode 100644
index 0000000..bdb3917
--- /dev/null
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/listener/TestingListener.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.listener;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
+import org.apache.flink.kubernetes.operator.crd.FlinkSessionJob;
+import org.apache.flink.kubernetes.operator.crd.status.FlinkDeploymentStatus;
+import org.apache.flink.kubernetes.operator.crd.status.FlinkSessionJobStatus;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Listener implementation for testing.
+ *
+ * <p>Records every status update context, every event context and the last configuration it
+ * receives in public fields so tests can inspect them. Deployment and session job callbacks are
+ * funneled into the same lists.
+ */
+public class TestingListener implements FlinkResourceListener {
+
+ // Status update contexts in the order they were received.
+ public List<StatusUpdateContext<?, ?>> updates = new ArrayList<>();
+ // Event contexts in the order they were received.
+ public List<ResourceEventContext<?>> events = new ArrayList<>();
+ // Configuration passed to configure(); null until configure() is called.
+ public Configuration config;
+
+ /** Appends the given status update context to {@link #updates}. */
+ public void onStatusUpdate(StatusUpdateContext<?, ?> ctx) {
+ updates.add(ctx);
+ }
+
+ /** Appends the given event context to {@link #events}. */
+ public void onEvent(ResourceEventContext<?> ctx) {
+ events.add(ctx);
+ }
+
+ @Override
+ public void onDeploymentStatusUpdate(
+ StatusUpdateContext<FlinkDeployment, FlinkDeploymentStatus> ctx) {
+ onStatusUpdate(ctx);
+ }
+
+ @Override
+ public void onDeploymentEvent(ResourceEventContext<FlinkDeployment> ctx) {
+ onEvent(ctx);
+ }
+
+ @Override
+ public void onSessionJobStatusUpdate(
+ StatusUpdateContext<FlinkSessionJob, FlinkSessionJobStatus> ctx) {
+ onStatusUpdate(ctx);
+ }
+
+ @Override
+ public void onSessionJobEvent(ResourceEventContext<FlinkSessionJob> ctx) {
+ onEvent(ctx);
+ }
+
+ /** Stores the configuration so tests can assert on what the listener was given. */
+ @Override
+ public void configure(Configuration config) {
+ this.config = config;
+ }
+}
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/SavepointObserverTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/SavepointObserverTest.java
index 44aa3bf..946f7b5 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/SavepointObserverTest.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/SavepointObserverTest.java
@@ -20,15 +20,19 @@ package org.apache.flink.kubernetes.operator.observer;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.kubernetes.operator.TestUtils;
import org.apache.flink.kubernetes.operator.TestingFlinkService;
-import org.apache.flink.kubernetes.operator.TestingStatusHelper;
+import org.apache.flink.kubernetes.operator.TestingStatusRecorder;
import org.apache.flink.kubernetes.operator.config.FlinkConfigManager;
import org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions;
import org.apache.flink.kubernetes.operator.crd.status.Savepoint;
import org.apache.flink.kubernetes.operator.crd.status.SavepointInfo;
import org.apache.flink.kubernetes.operator.crd.status.SavepointTriggerType;
+import org.apache.flink.kubernetes.operator.utils.EventRecorder;
import org.apache.flink.kubernetes.operator.utils.SavepointUtils;
+import io.fabric8.kubernetes.client.KubernetesClient;
+import io.fabric8.kubernetes.client.server.mock.EnableKubernetesMockClient;
import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import java.time.Duration;
@@ -39,15 +43,25 @@ import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
/** Tests for {@link SavepointObserver}. */
+@EnableKubernetesMockClient(crud = true)
public class SavepointObserverTest {
+ private KubernetesClient kubernetesClient;
private final FlinkConfigManager configManager = new FlinkConfigManager(new Configuration());
private final TestingFlinkService flinkService = new TestingFlinkService();
+ private SavepointObserver observer;
+ private final EventRecorder eventRecorder = new EventRecorder(null, (r, e) -> {});
+
+ @BeforeEach
+ public void before() {
+ var eventRecorder = new EventRecorder(kubernetesClient, (r, e) -> {});
+ observer =
+ new SavepointObserver(
+ flinkService, configManager, new TestingStatusRecorder<>(), eventRecorder);
+ }
@Test
public void testBasicObserve() {
- SavepointObserver observer =
- new SavepointObserver(flinkService, configManager, new TestingStatusHelper<>());
SavepointInfo spInfo = new SavepointInfo();
Assertions.assertTrue(spInfo.getSavepointHistory().isEmpty());
@@ -67,8 +81,6 @@ public class SavepointObserverTest {
KubernetesOperatorConfigOptions.OPERATOR_SAVEPOINT_HISTORY_MAX_AGE,
Duration.ofMillis(5));
- SavepointObserver observer =
- new SavepointObserver(flinkService, configManager, new TestingStatusHelper<>());
SavepointInfo spInfo = new SavepointInfo();
Savepoint sp1 = new Savepoint(1, "sp1", SavepointTriggerType.MANUAL);
@@ -98,9 +110,6 @@ public class SavepointObserverTest {
KubernetesOperatorConfigOptions.OPERATOR_SAVEPOINT_HISTORY_MAX_AGE_THRESHOLD,
Duration.ofMillis(5));
configManager.updateDefaultConfig(conf);
-
- SavepointObserver observer =
- new SavepointObserver(flinkService, configManager, new TestingStatusHelper<>());
SavepointInfo spInfo = new SavepointInfo();
Savepoint sp1 = new Savepoint(1, "sp1", SavepointTriggerType.MANUAL);
@@ -135,8 +144,6 @@ public class SavepointObserverTest {
jobStatus.setState("RUNNING");
var savepointInfo = jobStatus.getSavepointInfo();
- var observer =
- new SavepointObserver(flinkService, configManager, new TestingStatusHelper<>());
flinkService.triggerSavepoint(null, SavepointTriggerType.PERIODIC, savepointInfo, conf);
var triggerTs = savepointInfo.getTriggerTimestamp();
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/deployment/ApplicationObserverTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/deployment/ApplicationObserverTest.java
index 53353db..b9e30f8 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/deployment/ApplicationObserverTest.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/deployment/ApplicationObserverTest.java
@@ -22,7 +22,7 @@ import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.kubernetes.operator.TestUtils;
import org.apache.flink.kubernetes.operator.TestingFlinkService;
-import org.apache.flink.kubernetes.operator.TestingStatusHelper;
+import org.apache.flink.kubernetes.operator.TestingStatusRecorder;
import org.apache.flink.kubernetes.operator.config.FlinkConfigManager;
import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
import org.apache.flink.kubernetes.operator.crd.spec.JobState;
@@ -32,6 +32,7 @@ import org.apache.flink.kubernetes.operator.crd.status.JobStatus;
import org.apache.flink.kubernetes.operator.crd.status.SavepointTriggerType;
import org.apache.flink.kubernetes.operator.exception.DeploymentFailedException;
import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
+import org.apache.flink.kubernetes.operator.utils.EventRecorder;
import org.apache.flink.kubernetes.operator.utils.SavepointUtils;
import org.apache.flink.runtime.client.JobStatusMessage;
@@ -39,6 +40,7 @@ import io.fabric8.kubernetes.client.KubernetesClient;
import io.fabric8.kubernetes.client.server.mock.EnableKubernetesMockClient;
import io.javaoperatorsdk.operator.api.reconciler.Context;
import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import java.time.Duration;
@@ -57,13 +59,19 @@ public class ApplicationObserverTest {
private KubernetesClient kubernetesClient;
private final Context readyContext = TestUtils.createContextWithReadyJobManagerDeployment();
private final FlinkConfigManager configManager = new FlinkConfigManager(new Configuration());
+ private final TestingFlinkService flinkService = new TestingFlinkService();
+ private ApplicationObserver observer;
+
+ @BeforeEach
+ public void before() {
+ var eventRecorder = new EventRecorder(kubernetesClient, (r, e) -> {});
+ observer =
+ new ApplicationObserver(
+ flinkService, configManager, new TestingStatusRecorder<>(), eventRecorder);
+ }
@Test
public void observeApplicationCluster() throws Exception {
- TestingFlinkService flinkService = new TestingFlinkService(kubernetesClient);
- ApplicationObserver observer =
- new ApplicationObserver(
- kubernetesClient, flinkService, configManager, new TestingStatusHelper<>());
FlinkDeployment deployment = TestUtils.buildApplicationCluster();
deployment.setStatus(new FlinkDeploymentStatus());
@@ -167,13 +175,9 @@ public class ApplicationObserverTest {
@Test
public void testEventGeneratedWhenStatusChanged() throws Exception {
- TestingFlinkService flinkService = new TestingFlinkService(kubernetesClient);
FlinkDeployment deployment = TestUtils.buildApplicationCluster();
Configuration conf =
configManager.getDeployConfig(deployment.getMetadata(), deployment.getSpec());
- ApplicationObserver observer =
- new ApplicationObserver(
- kubernetesClient, flinkService, configManager, new TestingStatusHelper<>());
flinkService.submitApplicationCluster(deployment.getSpec().getJob(), conf, false);
deployment.setStatus(new FlinkDeploymentStatus());
@@ -207,13 +211,9 @@ public class ApplicationObserverTest {
@Test
public void testErrorForwardToStatusWhenJobFailed() throws Exception {
- TestingFlinkService flinkService = new TestingFlinkService(kubernetesClient);
FlinkDeployment deployment = TestUtils.buildApplicationCluster();
Configuration conf =
configManager.getDeployConfig(deployment.getMetadata(), deployment.getSpec());
- ApplicationObserver observer =
- new ApplicationObserver(
- kubernetesClient, flinkService, configManager, new TestingStatusHelper<>());
flinkService.submitApplicationCluster(deployment.getSpec().getJob(), conf, false);
deployment.setStatus(new FlinkDeploymentStatus());
@@ -235,10 +235,6 @@ public class ApplicationObserverTest {
@Test
public void observeSavepoint() throws Exception {
- TestingFlinkService flinkService = new TestingFlinkService(kubernetesClient);
- ApplicationObserver observer =
- new ApplicationObserver(
- kubernetesClient, flinkService, configManager, new TestingStatusHelper<>());
FlinkDeployment deployment = TestUtils.buildApplicationCluster();
Configuration conf =
configManager.getDeployConfig(deployment.getMetadata(), deployment.getSpec());
@@ -436,10 +432,6 @@ public class ApplicationObserverTest {
@Test
public void observeListJobsError() {
- TestingFlinkService flinkService = new TestingFlinkService();
- ApplicationObserver observer =
- new ApplicationObserver(
- kubernetesClient, flinkService, configManager, new TestingStatusHelper<>());
FlinkDeployment deployment = TestUtils.buildApplicationCluster();
bringToReadyStatus(deployment);
observer.observe(deployment, readyContext);
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/deployment/SessionObserverTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/deployment/SessionObserverTest.java
index 3d96fc2..515cdb8 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/deployment/SessionObserverTest.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/deployment/SessionObserverTest.java
@@ -23,6 +23,7 @@ import org.apache.flink.kubernetes.operator.TestingFlinkService;
import org.apache.flink.kubernetes.operator.config.FlinkConfigManager;
import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
import org.apache.flink.kubernetes.operator.crd.status.JobManagerDeploymentStatus;
+import org.apache.flink.kubernetes.operator.utils.EventRecorder;
import io.javaoperatorsdk.operator.api.reconciler.Context;
import org.junit.jupiter.api.Test;
@@ -39,7 +40,8 @@ public class SessionObserverTest {
public void observeSessionCluster() {
TestingFlinkService flinkService = new TestingFlinkService();
FlinkDeployment deployment = TestUtils.buildSessionCluster();
- SessionObserver observer = new SessionObserver(null, flinkService, configManager);
+ SessionObserver observer =
+ new SessionObserver(flinkService, configManager, new EventRecorder(null, null));
deployment
.getStatus()
.getReconciliationStatus()
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/sessionjob/SessionJobObserverTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/sessionjob/SessionJobObserverTest.java
index 766c35c..9f4a315 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/sessionjob/SessionJobObserverTest.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/sessionjob/SessionJobObserverTest.java
@@ -23,15 +23,18 @@ import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.RestOptions;
import org.apache.flink.kubernetes.operator.TestUtils;
import org.apache.flink.kubernetes.operator.TestingFlinkService;
-import org.apache.flink.kubernetes.operator.TestingStatusHelper;
+import org.apache.flink.kubernetes.operator.TestingStatusRecorder;
import org.apache.flink.kubernetes.operator.config.FlinkConfigManager;
+import org.apache.flink.kubernetes.operator.crd.status.FlinkSessionJobStatus;
import org.apache.flink.kubernetes.operator.crd.status.SavepointTriggerType;
import org.apache.flink.kubernetes.operator.reconciler.sessionjob.FlinkSessionJobReconciler;
+import org.apache.flink.kubernetes.operator.utils.EventRecorder;
import org.apache.flink.kubernetes.operator.utils.SavepointUtils;
import io.fabric8.kubernetes.client.KubernetesClient;
import io.fabric8.kubernetes.client.server.mock.EnableKubernetesMockClient;
import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import java.util.Map;
@@ -39,16 +42,25 @@ import java.util.Map;
/** Tests for {@link SessionJobObserver}. */
@EnableKubernetesMockClient(crud = true)
public class SessionJobObserverTest {
+
private KubernetesClient kubernetesClient;
private final FlinkConfigManager configManager = new FlinkConfigManager(new Configuration());
+ private final TestingFlinkService flinkService = new TestingFlinkService();
+ private SessionJobObserver observer;
+ private FlinkSessionJobReconciler reconciler;
+
+ @BeforeEach
+ public void before() {
+ var eventRecorder = new EventRecorder(kubernetesClient, (r, e) -> {});
+ TestingStatusRecorder<FlinkSessionJobStatus> statusRecorder = new TestingStatusRecorder<>();
+ observer =
+ new SessionJobObserver(flinkService, configManager, statusRecorder, eventRecorder);
+ reconciler = new FlinkSessionJobReconciler(kubernetesClient, flinkService, configManager);
+ }
@Test
public void testBasicObserve() throws Exception {
final var sessionJob = TestUtils.buildSessionJob();
- final var flinkService = new TestingFlinkService(kubernetesClient);
- final var reconciler = new FlinkSessionJobReconciler(null, flinkService, configManager);
- final var observer =
- new SessionJobObserver(flinkService, configManager, new TestingStatusHelper<>());
final var readyContext = TestUtils.createContextWithReadyFlinkDeployment();
// observe the brand new job, nothing to do.
@@ -106,10 +118,6 @@ public class SessionJobObserverTest {
@Test
public void testObserveWithEffectiveConfig() throws Exception {
final var sessionJob = TestUtils.buildSessionJob();
- final var flinkService = new TestingFlinkService(kubernetesClient);
- final var reconciler = new FlinkSessionJobReconciler(null, flinkService, configManager);
- final var observer =
- new SessionJobObserver(flinkService, configManager, new TestingStatusHelper<>());
final var readyContext =
TestUtils.createContextWithReadyFlinkDeployment(
Map.of(RestOptions.PORT.key(), "8088"));
@@ -132,10 +140,6 @@ public class SessionJobObserverTest {
@Test
public void testObserveSavepoint() throws Exception {
final var sessionJob = TestUtils.buildSessionJob();
- final var flinkService = new TestingFlinkService(kubernetesClient);
- final var reconciler = new FlinkSessionJobReconciler(null, flinkService, configManager);
- final var observer =
- new SessionJobObserver(flinkService, configManager, new TestingStatusHelper<>());
final var readyContext = TestUtils.createContextWithReadyFlinkDeployment();
// submit job
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconcilerTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconcilerTest.java
index 81978fb..ca9d819 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconcilerTest.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconcilerTest.java
@@ -37,6 +37,7 @@ import org.apache.flink.kubernetes.operator.crd.status.Savepoint;
import org.apache.flink.kubernetes.operator.crd.status.SavepointTriggerType;
import org.apache.flink.kubernetes.operator.exception.DeploymentFailedException;
import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
+import org.apache.flink.kubernetes.operator.utils.EventRecorder;
import org.apache.flink.kubernetes.operator.utils.SavepointUtils;
import org.apache.flink.runtime.client.JobStatusMessage;
import org.apache.flink.runtime.highavailability.JobResultStoreOptions;
@@ -46,6 +47,7 @@ import io.fabric8.kubernetes.client.KubernetesClient;
import io.fabric8.kubernetes.client.server.mock.EnableKubernetesMockClient;
import io.javaoperatorsdk.operator.api.reconciler.Context;
import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.EnumSource;
@@ -67,15 +69,22 @@ public class ApplicationReconcilerTest {
private KubernetesClient kubernetesClient;
private final FlinkConfigManager configManager = new FlinkConfigManager(new Configuration());
+ private TestingFlinkService flinkService = new TestingFlinkService();
+ private ApplicationReconciler reconciler;
+
+ @BeforeEach
+ public void before() {
+ var eventRecorder = new EventRecorder(kubernetesClient, (r, e) -> {});
+ reconciler =
+ new ApplicationReconciler(
+ kubernetesClient, flinkService, configManager, eventRecorder);
+ }
@ParameterizedTest
@EnumSource(FlinkVersion.class)
public void testUpgrade(FlinkVersion flinkVersion) throws Exception {
- TestingFlinkService flinkService = new TestingFlinkService();
Context context = flinkService.getContext();
- ApplicationReconciler reconciler =
- new ApplicationReconciler(kubernetesClient, flinkService, configManager);
FlinkDeployment deployment = TestUtils.buildApplicationCluster(flinkVersion);
reconciler.reconcile(deployment, context);
@@ -168,11 +177,8 @@ public class ApplicationReconcilerTest {
@Test
public void testUpgradeModeChangeFromSavepointToLastState() throws Exception {
final String expectedSavepointPath = "savepoint_0";
- TestingFlinkService flinkService = new TestingFlinkService();
Context context = flinkService.getContext();
- final ApplicationReconciler reconciler =
- new ApplicationReconciler(kubernetesClient, flinkService, configManager);
final FlinkDeployment deployment = TestUtils.buildApplicationCluster();
reconciler.reconcile(deployment, context);
@@ -211,10 +217,7 @@ public class ApplicationReconcilerTest {
@Test
public void triggerSavepoint() throws Exception {
- TestingFlinkService flinkService = new TestingFlinkService();
Context context = flinkService.getContext();
- ApplicationReconciler reconciler =
- new ApplicationReconciler(kubernetesClient, flinkService, configManager);
FlinkDeployment deployment = TestUtils.buildApplicationCluster();
reconciler.reconcile(deployment, context);
@@ -303,12 +306,9 @@ public class ApplicationReconcilerTest {
@Test
public void testUpgradeModeChangedToLastStateShouldTriggerSavepointWhileHADisabled()
throws Exception {
- TestingFlinkService flinkService = new TestingFlinkService();
flinkService.setHaDataAvailable(false);
Context context = flinkService.getContext();
- final ApplicationReconciler reconciler =
- new ApplicationReconciler(kubernetesClient, flinkService, configManager);
final FlinkDeployment deployment = TestUtils.buildApplicationCluster();
deployment.getSpec().getFlinkConfiguration().remove(HighAvailabilityOptions.HA_MODE.key());
@@ -352,11 +352,8 @@ public class ApplicationReconcilerTest {
@Test
public void testUpgradeModeChangedToLastStateShouldNotTriggerSavepointWhileHAEnabled()
throws Exception {
- TestingFlinkService flinkService = new TestingFlinkService();
Context context = flinkService.getContext();
- final ApplicationReconciler reconciler =
- new ApplicationReconciler(kubernetesClient, flinkService, configManager);
final FlinkDeployment deployment = TestUtils.buildApplicationCluster();
reconciler.reconcile(deployment, context);
@@ -389,11 +386,8 @@ public class ApplicationReconcilerTest {
@Test
public void triggerRestart() throws Exception {
- TestingFlinkService flinkService = new TestingFlinkService();
Context context = flinkService.getContext();
- ApplicationReconciler reconciler =
- new ApplicationReconciler(kubernetesClient, flinkService, configManager);
FlinkDeployment deployment = TestUtils.buildApplicationCluster();
reconciler.reconcile(deployment, context);
@@ -451,10 +445,7 @@ public class ApplicationReconcilerTest {
@Test
public void testJobUpgradeIgnorePendingSavepoint() throws Exception {
- TestingFlinkService flinkService = new TestingFlinkService();
Context context = flinkService.getContext();
- ApplicationReconciler reconciler =
- new ApplicationReconciler(kubernetesClient, flinkService, configManager);
FlinkDeployment deployment = TestUtils.buildApplicationCluster();
reconciler.reconcile(deployment, context);
List<Tuple2<String, JobStatusMessage>> runningJobs = flinkService.listJobs();
@@ -489,10 +480,6 @@ public class ApplicationReconcilerTest {
@Test
public void testRandomJobResultStorePath() throws Exception {
-
- TestingFlinkService flinkService = new TestingFlinkService();
- ApplicationReconciler reconciler =
- new ApplicationReconciler(kubernetesClient, flinkService, configManager);
FlinkDeployment flinkApp = TestUtils.buildApplicationCluster();
final String haStoragePath = "file:///flink-data/ha";
flinkApp.getSpec()
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/SessionReconcilerTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/SessionReconcilerTest.java
index 0f7cfbf..e62cc0a 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/SessionReconcilerTest.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/SessionReconcilerTest.java
@@ -22,6 +22,7 @@ import org.apache.flink.kubernetes.operator.TestUtils;
import org.apache.flink.kubernetes.operator.TestingFlinkService;
import org.apache.flink.kubernetes.operator.config.FlinkConfigManager;
import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
+import org.apache.flink.kubernetes.operator.utils.EventRecorder;
import io.javaoperatorsdk.operator.api.reconciler.Context;
import org.junit.jupiter.api.Test;
@@ -36,6 +37,7 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
public class SessionReconcilerTest {
private final FlinkConfigManager configManager = new FlinkConfigManager(new Configuration());
+ private final EventRecorder eventRecorder = new EventRecorder(null, (r, e) -> {});
@Test
public void testStartSession() throws Exception {
@@ -50,7 +52,8 @@ public class SessionReconcilerTest {
}
};
- SessionReconciler reconciler = new SessionReconciler(null, flinkService, configManager);
+ SessionReconciler reconciler =
+ new SessionReconciler(null, flinkService, configManager, eventRecorder);
FlinkDeployment deployment = TestUtils.buildSessionCluster();
reconciler.reconcile(deployment, context);
assertEquals(1, count.get());
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/EventUtilsTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/EventUtilsTest.java
index db2dfec..29ee2c5 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/EventUtilsTest.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/EventUtilsTest.java
@@ -51,7 +51,8 @@ public class EventUtilsTest {
EventUtils.Type.Warning,
reason,
message,
- EventUtils.Component.Operator));
+ EventUtils.Component.Operator,
+ e -> {}));
var event =
kubernetesClient
.v1()
@@ -70,7 +71,8 @@ public class EventUtilsTest {
EventUtils.Type.Warning,
reason,
message,
- EventUtils.Component.Operator));
+ EventUtils.Component.Operator,
+ e -> {}));
event =
kubernetesClient
.v1()
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/StatusHelperTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/StatusRecorderTest.java
similarity index 93%
rename from flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/StatusHelperTest.java
rename to flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/StatusRecorderTest.java
index 8527f93..209c1d2 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/StatusHelperTest.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/StatusRecorderTest.java
@@ -28,16 +28,16 @@ import org.junit.jupiter.api.Test;
import static org.junit.jupiter.api.Assertions.assertTrue;
-/** Test for {@link StatusHelper}. */
+/** Test for {@link StatusRecorder}. */
@EnableKubernetesMockClient(crud = true)
-public class StatusHelperTest {
+public class StatusRecorderTest {
private KubernetesClient kubernetesClient;
private KubernetesMockServer mockServer;
@Test
public void testPatchOnlyWhenChanged() throws InterruptedException {
- var helper = new StatusHelper<FlinkDeploymentStatus>(kubernetesClient);
+ var helper = new StatusRecorder<FlinkDeploymentStatus>(kubernetesClient, (e, s) -> {});
var deployment = TestUtils.buildApplicationCluster();
kubernetesClient.resource(deployment).createOrReplace();
var lastRequest = mockServer.getLastRequest();
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/ValidatorUtilsTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/ValidatorUtilsTest.java
index 2417047..cee4071 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/ValidatorUtilsTest.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/ValidatorUtilsTest.java
@@ -25,14 +25,11 @@ import org.apache.flink.kubernetes.operator.config.FlinkConfigManager;
import org.apache.flink.kubernetes.operator.validation.DefaultValidator;
import org.apache.flink.kubernetes.operator.validation.TestValidator;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
-import java.io.File;
import java.io.IOException;
-import java.nio.file.Files;
-import java.nio.file.Paths;
+import java.nio.file.Path;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
@@ -40,30 +37,20 @@ import java.util.Map;
import java.util.stream.Collectors;
import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
/** Test class for {@link ValidatorUtils}. */
public class ValidatorUtilsTest {
- @Rule public TemporaryFolder temporaryFolder = new TemporaryFolder();
-
- private static final String VALIDATOR_NAME = "test-validator";
- private static final String VALIDATOR_JAR = VALIDATOR_NAME + "-test-jar.jar";
+ @TempDir public Path temporaryFolder;
@Test
public void testDiscoverValidators() throws IOException {
- File validatorRootFolder = temporaryFolder.newFolder();
- File testValidatorFolder = new File(validatorRootFolder, VALIDATOR_NAME);
- assertTrue(testValidatorFolder.mkdirs());
- File testValidatorJar = new File("target", VALIDATOR_JAR);
- assertTrue(testValidatorJar.exists());
- Files.copy(
- testValidatorJar.toPath(),
- Paths.get(testValidatorFolder.toString(), VALIDATOR_JAR));
Map<String, String> originalEnv = System.getenv();
try {
Map<String, String> systemEnv = new HashMap<>(originalEnv);
- systemEnv.put(ConfigConstants.ENV_FLINK_PLUGINS_DIR, validatorRootFolder.getPath());
+ systemEnv.put(
+ ConfigConstants.ENV_FLINK_PLUGINS_DIR,
+ TestUtils.getTestPluginsRootDir(temporaryFolder));
TestUtils.setEnv(systemEnv);
assertEquals(
new HashSet<>(
diff --git a/flink-kubernetes-operator/src/test/resources/META-INF/services/org.apache.flink.kubernetes.operator.listener.FlinkResourceListener b/flink-kubernetes-operator/src/test/resources/META-INF/services/org.apache.flink.kubernetes.operator.listener.FlinkResourceListener
new file mode 100644
index 0000000..8e89fa4
--- /dev/null
+++ b/flink-kubernetes-operator/src/test/resources/META-INF/services/org.apache.flink.kubernetes.operator.listener.FlinkResourceListener
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+org.apache.flink.kubernetes.operator.listener.TestingListener
\ No newline at end of file