You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by gy...@apache.org on 2022/02/16 10:33:58 UTC
[flink-kubernetes-operator] 05/23: Extract Observer and Reconciler logic from controller
This is an automated email from the ASF dual-hosted git repository.
gyfora pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git
commit 0fe31c540d9181d4edab61650f7c068f94169166
Author: Gyula Fora <g_...@apple.com>
AuthorDate: Wed Feb 2 14:34:01 2022 +0100
Extract Observer and Reconciler logic from controller
---
examples/basic-checkpoint-ha.yaml | 43 +++
examples/basic.yaml | 4 +-
examples/pod-template.yaml | 4 +-
.../controller/FlinkDeploymentController.java | 321 ++-------------------
.../controller/observer/JobStatusObserver.java | 98 +++++++
.../controller/reconciler/JobReconciler.java | 184 ++++++++++++
.../controller/reconciler/SessionReconciler.java | 75 +++++
.../kubernetes/operator/utils/FlinkUtils.java | 55 +++-
8 files changed, 476 insertions(+), 308 deletions(-)
diff --git a/examples/basic-checkpoint-ha.yaml b/examples/basic-checkpoint-ha.yaml
new file mode 100644
index 0000000..bb3cf41
--- /dev/null
+++ b/examples/basic-checkpoint-ha.yaml
@@ -0,0 +1,43 @@
+apiVersion: flink.io/v1alpha1
+kind: FlinkDeployment
+metadata:
+ namespace: default
+ name: basic-checkpoint-ha-example
+spec:
+ image: flink:1.14.3
+ flinkVersion: 1.14.3
+ flinkConfiguration:
+ taskmanager.numberOfTaskSlots: "2"
+ state.savepoints.dir: file:///flink-data/savepoints
+ high-availability: org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
+ high-availability.storageDir: file:///flink-data/ha
+ jobManager:
+ replicas: 1
+ resource:
+ memory: "2048m"
+ cpu: 1
+ taskManager:
+ taskSlots: 2
+ resource:
+ memory: "2048m"
+ cpu: 1
+ podTemplate:
+ spec:
+ serviceAccount: flink-operator
+ containers:
+ - name: flink-main-container
+ volumeMounts:
+ - mountPath: /flink-data
+ name: flink-volume
+ volumes:
+ - name: flink-volume
+ hostPath:
+ # directory location on host
+ path: /tmp/flink
+ # this field is optional
+ type: Directory
+ job:
+ jarURI: local:///opt/flink/examples/streaming/StateMachineExample.jar
+ parallelism: 2
+ upgradeMode: savepoint
+ state: running
diff --git a/examples/basic.yaml b/examples/basic.yaml
index 9fdc289..38a5a90 100644
--- a/examples/basic.yaml
+++ b/examples/basic.yaml
@@ -9,7 +9,6 @@ spec:
flinkConfiguration:
taskmanager.numberOfTaskSlots: "2"
kubernetes.jobmanager.service-account: flink-operator
- kubernetes.container-start-command-template: "%java% %classpath% %jvmmem% %jvmopts% %logging% %class% %args%"
jobManager:
replicas: 1
resource:
@@ -23,5 +22,4 @@ spec:
job:
jarURI: local:///opt/flink/examples/streaming/StateMachineExample.jar
parallelism: 2
- cancelMode: none
- restoreMode: none
+ upgradeMode: stateless
diff --git a/examples/pod-template.yaml b/examples/pod-template.yaml
index 1ce72cd..8563630 100644
--- a/examples/pod-template.yaml
+++ b/examples/pod-template.yaml
@@ -8,13 +8,13 @@ spec:
flinkVersion: 1.14.3
flinkConfiguration:
taskmanager.numberOfTaskSlots: "2"
- kubernetes.jobmanager.service-account: flink-operator
podTemplate:
apiVersion: v1
kind: Pod
metadata:
name: pod-template
spec:
+ serviceAccount: flink-operator
containers:
# Do not change the main container name
- name: flink-main-container
@@ -55,5 +55,3 @@ spec:
job:
jarURI: local:///opt/flink/examples/streaming/StateMachineExample.jar
parallelism: 2
- cancelMode: none
- restoreMode: none
diff --git a/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentController.java b/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentController.java
index 9a64f04..9cee674 100644
--- a/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentController.java
+++ b/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentController.java
@@ -1,28 +1,13 @@
package org.apache.flink.kubernetes.operator.controller;
-import org.apache.flink.api.common.JobID;
-import org.apache.flink.client.cli.ApplicationDeployer;
-import org.apache.flink.client.deployment.ClusterClientFactory;
-import org.apache.flink.client.deployment.ClusterClientServiceLoader;
-import org.apache.flink.client.deployment.ClusterDescriptor;
-import org.apache.flink.client.deployment.DefaultClusterClientServiceLoader;
-import org.apache.flink.client.deployment.application.ApplicationConfiguration;
-import org.apache.flink.client.deployment.application.cli.ApplicationClusterDeployer;
-import org.apache.flink.client.program.ClusterClient;
import org.apache.flink.configuration.Configuration;
-import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions;
-import org.apache.flink.kubernetes.kubeclient.Fabric8FlinkKubeClient;
+import org.apache.flink.kubernetes.operator.controller.observer.JobStatusObserver;
+import org.apache.flink.kubernetes.operator.controller.reconciler.JobReconciler;
+import org.apache.flink.kubernetes.operator.controller.reconciler.SessionReconciler;
import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
-import org.apache.flink.kubernetes.operator.crd.spec.JobState;
-import org.apache.flink.kubernetes.operator.crd.spec.UpgradeMode;
-import org.apache.flink.kubernetes.operator.crd.status.FlinkDeploymentStatus;
-import org.apache.flink.kubernetes.operator.crd.status.JobStatus;
import org.apache.flink.kubernetes.operator.utils.FlinkUtils;
-import org.apache.flink.runtime.client.JobStatusMessage;
-import org.apache.flink.runtime.jobgraph.SavepointConfigOptions;
import io.fabric8.kubernetes.client.KubernetesClient;
-import io.fabric8.kubernetes.client.NamespacedKubernetesClient;
import io.javaoperatorsdk.operator.api.reconciler.Context;
import io.javaoperatorsdk.operator.api.reconciler.ControllerConfiguration;
import io.javaoperatorsdk.operator.api.reconciler.DeleteControl;
@@ -36,15 +21,10 @@ import io.javaoperatorsdk.operator.processing.event.source.EventSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.ArrayList;
-import java.util.Collection;
import java.util.Collections;
import java.util.List;
-import java.util.Map;
import java.util.Optional;
-import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
-import java.util.stream.Collectors;
/** Controller that runs the main reconcile loop for Flink deployments. */
@ControllerConfiguration
@@ -59,300 +39,57 @@ public class FlinkDeploymentController
private final String operatorNamespace;
+ private final JobStatusObserver observer = new JobStatusObserver();
+ private final JobReconciler jobReconciler;
+ private final SessionReconciler sessionReconciler;
+
public FlinkDeploymentController(KubernetesClient kubernetesClient, String namespace) {
this.kubernetesClient = kubernetesClient;
this.operatorNamespace = namespace;
+ this.jobReconciler = new JobReconciler(kubernetesClient);
+ this.sessionReconciler = new SessionReconciler(kubernetesClient);
}
@Override
public DeleteControl cleanup(FlinkDeployment flinkApp, Context context) {
LOG.info("Cleaning up application cluster {}", flinkApp.getMetadata().getName());
- kubernetesClient
- .apps()
- .deployments()
- .inNamespace(flinkApp.getMetadata().getNamespace())
- .withName(flinkApp.getMetadata().getName())
- .cascading(true)
- .delete();
+ FlinkUtils.deleteCluster(flinkApp, kubernetesClient);
return DeleteControl.defaultDelete();
}
@Override
public UpdateControl<FlinkDeployment> reconcile(FlinkDeployment flinkApp, Context context) {
LOG.info("Reconciling {}", flinkApp.getMetadata().getName());
- final String namespace = flinkApp.getMetadata().getNamespace();
- final String clusterId = flinkApp.getMetadata().getName();
- final Configuration effectiveConfig =
- FlinkUtils.getEffectiveConfig(namespace, clusterId, flinkApp.getSpec());
- if (flinkApp.getStatus() == null) {
+ Configuration effectiveConfig = FlinkUtils.getEffectiveConfig(flinkApp);
+
+ boolean success = observer.observeFlinkJobStatus(flinkApp, effectiveConfig);
+ if (success) {
try {
- flinkApp.setStatus(new FlinkDeploymentStatus());
- if (flinkApp.getSpec().getJob() != null
- && flinkApp.getSpec().getJob().getState().equals(JobState.RUNNING)) {
- deployFlinkJob(flinkApp, effectiveConfig, Optional.empty());
- } else {
- deployFlinkSession(flinkApp, effectiveConfig);
- }
+ success = reconcileFlinkDeployment(flinkApp, effectiveConfig);
} catch (Exception e) {
- LOG.error("Error while deploying " + flinkApp.getMetadata().getName());
- return UpdateControl.<FlinkDeployment>noUpdate()
- .rescheduleAfter(JOB_REFRESH_SECONDS, TimeUnit.SECONDS);
- }
- } else {
- boolean validStatus = updateFlinkJobStatus(flinkApp, effectiveConfig);
- if (validStatus) {
- try {
- reconcileDeploymentChanges(flinkApp, effectiveConfig);
- } catch (Exception e) {
- throw new RuntimeException(
- "Could not reconcile deployment change for "
- + flinkApp.getMetadata().getName(),
- e);
- }
+ throw new RuntimeException(
+ "Error while reconciling deployment change for "
+ + flinkApp.getMetadata().getName(),
+ e);
}
}
- flinkApp.getStatus().setSpec(flinkApp.getSpec());
+ if (!success) {
+ return UpdateControl.<FlinkDeployment>noUpdate()
+ .rescheduleAfter(JOB_REFRESH_SECONDS, TimeUnit.SECONDS);
+ }
+ flinkApp.getStatus().setSpec(flinkApp.getSpec());
return UpdateControl.updateStatus(flinkApp)
.rescheduleAfter(JOB_REFRESH_SECONDS, TimeUnit.SECONDS);
}
- private void deployFlinkJob(
- FlinkDeployment flinkApp, Configuration effectiveConfig, Optional<String> savepoint)
- throws Exception {
- LOG.info("Deploying {}", flinkApp.getMetadata().getName());
- if (savepoint.isPresent()) {
- effectiveConfig.set(SavepointConfigOptions.SAVEPOINT_PATH, savepoint.get());
- } else {
- effectiveConfig.removeConfig(SavepointConfigOptions.SAVEPOINT_PATH);
- }
- final ClusterClientServiceLoader clusterClientServiceLoader =
- new DefaultClusterClientServiceLoader();
- final ApplicationDeployer deployer =
- new ApplicationClusterDeployer(clusterClientServiceLoader);
-
- final ApplicationConfiguration applicationConfiguration =
- new ApplicationConfiguration(
- flinkApp.getSpec().getJob().getArgs(),
- flinkApp.getSpec().getJob().getEntryClass());
-
- deployer.run(effectiveConfig, applicationConfiguration);
- LOG.info("{} deployed", flinkApp.getMetadata().getName());
- }
-
- private void deployFlinkSession(FlinkDeployment flinkApp, Configuration effectiveConfig) {
- LOG.info("Deploying session cluster {}", flinkApp.getMetadata().getName());
- final ClusterClientServiceLoader clusterClientServiceLoader =
- new DefaultClusterClientServiceLoader();
- final ClusterClientFactory<String> kubernetesClusterClientFactory =
- clusterClientServiceLoader.getClusterClientFactory(effectiveConfig);
- try (final ClusterDescriptor<String> kubernetesClusterDescriptor =
- kubernetesClusterClientFactory.createClusterDescriptor(effectiveConfig)) {
- kubernetesClusterDescriptor.deploySessionCluster(
- kubernetesClusterClientFactory.getClusterSpecification(effectiveConfig));
- } catch (Exception e) {
- LOG.error("Failed to deploy {}", flinkApp.getMetadata().getName(), e);
- }
- LOG.info("Session cluster {} deployed", flinkApp.getMetadata().getName());
- }
-
- private boolean updateFlinkJobStatus(FlinkDeployment flinkApp, Configuration effectiveConfig) {
- if (flinkApp.getStatus().getSpec().getJob() == null
- || !flinkApp.getStatus().getSpec().getJob().getState().equals(JobState.RUNNING)) {
- return true;
- }
- LOG.info("Getting job statuses for {}", flinkApp.getMetadata().getName());
- FlinkDeploymentStatus flinkAppStatus = flinkApp.getStatus();
- try (ClusterClient<String> clusterClient =
- FlinkUtils.getRestClusterClient(effectiveConfig)) {
- Collection<JobStatusMessage> clusterJobStatuses =
- clusterClient.listJobs().get(10, TimeUnit.SECONDS);
- flinkAppStatus.setJobStatuses(
- mergeJobStatuses(flinkAppStatus.getJobStatuses(), clusterJobStatuses));
- if (clusterJobStatuses.isEmpty()) {
- LOG.info("No jobs found on {} yet, retrying...", flinkApp.getMetadata().getName());
- return false;
- } else {
- LOG.info("Job statuses updated for {}", flinkApp.getMetadata().getName());
- return true;
- }
-
- } catch (Exception e) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Failed to list jobs for {}", flinkApp, e);
- } else {
- LOG.warn(
- "Failed to list jobs for {}, retrying...",
- flinkApp.getMetadata().getName());
- }
-
- return false;
- }
- }
-
- /**
- * Merge previous job statuses with the new ones from the flink job cluster. We match jobs by
- * their name to preserve savepoint information.
- *
- * @return
- */
- private List<JobStatus> mergeJobStatuses(
- List<JobStatus> oldJobStatuses, Collection<JobStatusMessage> clusterJobStatuses) {
- List<JobStatus> newStatuses =
- oldJobStatuses != null ? new ArrayList<>(oldJobStatuses) : new ArrayList<>();
- Map<String, JobStatus> statusMap =
- newStatuses.stream().collect(Collectors.toMap(JobStatus::getJobName, j -> j));
-
- clusterJobStatuses.forEach(
- js -> {
- if (statusMap.containsKey(js.getJobName())) {
- JobStatus oldStatus = statusMap.get(js.getJobName());
- oldStatus.setState(js.getJobState().name());
- oldStatus.setJobId(js.getJobId().toHexString());
- } else {
- newStatuses.add(FlinkUtils.convert(js));
- }
- });
-
- return newStatuses;
- }
-
- private void reconcileDeploymentChanges(FlinkDeployment flinkApp, Configuration effectiveConfig)
- throws Exception {
- boolean specChanged = !flinkApp.getSpec().equals(flinkApp.getStatus().getSpec());
-
- if (specChanged) {
- if (flinkApp.getStatus().getSpec().getJob() != null) {
- if (flinkApp.getSpec().getJob() == null) {
- throw new RuntimeException("Cannot switch from job to session cluster");
- }
- JobState currentJobState = flinkApp.getStatus().getSpec().getJob().getState();
- JobState desiredJobState = flinkApp.getSpec().getJob().getState();
-
- UpgradeMode upgradeMode = flinkApp.getSpec().getJob().getUpgradeMode();
- if (currentJobState == JobState.RUNNING) {
- if (desiredJobState == JobState.RUNNING) {
- upgradeFlinkJob(flinkApp, effectiveConfig);
- }
- if (desiredJobState.equals(JobState.SUSPENDED)) {
- if (upgradeMode == UpgradeMode.STATELESS) {
- cancelJob(flinkApp, effectiveConfig);
- } else {
- suspendJob(flinkApp, effectiveConfig);
- }
- }
- }
- if (currentJobState == JobState.SUSPENDED) {
- if (desiredJobState == JobState.RUNNING) {
- if (upgradeMode == UpgradeMode.STATELESS) {
- deployFlinkJob(flinkApp, effectiveConfig, Optional.empty());
- } else if (upgradeMode == UpgradeMode.SAVEPOINT) {
- restoreFromLastSavepoint(flinkApp, effectiveConfig);
- } else {
- throw new UnsupportedOperationException(
- "Only savepoint and stateless strategies are supported at the moment.");
- }
- }
- }
- } else {
- if (flinkApp.getSpec().getJob() != null) {
- throw new RuntimeException("Cannot switch from session to job cluster");
- }
- upgradeSessionCluster(flinkApp, effectiveConfig);
- }
- }
- }
-
- private void upgradeFlinkJob(FlinkDeployment flinkApp, Configuration effectiveConfig)
- throws Exception {
- LOG.info("Upgrading running job");
- Optional<String> savepoint = cancelJob(flinkApp, effectiveConfig);
- deployFlinkJob(flinkApp, effectiveConfig, savepoint);
- }
-
- private void restoreFromLastSavepoint(FlinkDeployment flinkApp, Configuration effectiveConfig)
- throws Exception {
- JobStatus jobStatus = flinkApp.getStatus().getJobStatuses().get(0);
-
- String savepointLocation = jobStatus.getSavepointLocation();
- if (savepointLocation == null) {
- throw new RuntimeException("Cannot perform stateful restore without a valid savepoint");
- }
- deployFlinkJob(flinkApp, effectiveConfig, Optional.of(savepointLocation));
- }
-
- private Optional<String> suspendJob(FlinkDeployment flinkApp, Configuration effectiveConfig)
- throws Exception {
- LOG.info("Suspending {}", flinkApp.getMetadata().getName());
- JobID jobID = JobID.fromHexString(flinkApp.getStatus().getJobStatuses().get(0).getJobId());
- return cancelJob(flinkApp, jobID, UpgradeMode.SAVEPOINT, effectiveConfig);
- }
-
- private Optional<String> cancelJob(FlinkDeployment flinkApp, Configuration effectiveConfig)
- throws Exception {
- LOG.info("Cancelling {}", flinkApp.getMetadata().getName());
- UpgradeMode upgradeMode = flinkApp.getSpec().getJob().getUpgradeMode();
- JobID jobID = JobID.fromHexString(flinkApp.getStatus().getJobStatuses().get(0).getJobId());
- return cancelJob(flinkApp, jobID, upgradeMode, effectiveConfig);
- }
-
- private Optional<String> cancelJob(
- FlinkDeployment flinkApp,
- JobID jobID,
- UpgradeMode upgradeMode,
- Configuration effectiveConfig)
- throws Exception {
- Optional<String> ret = Optional.empty();
- try (ClusterClient<String> clusterClient =
- FlinkUtils.getRestClusterClient(effectiveConfig)) {
- switch (upgradeMode) {
- case STATELESS:
- clusterClient.cancel(jobID).get(1, TimeUnit.MINUTES);
- break;
- case SAVEPOINT:
- String savepoint =
- clusterClient
- .stopWithSavepoint(jobID, false, null)
- .get(1, TimeUnit.MINUTES);
- ret = Optional.of(savepoint);
- break;
- default:
- throw new RuntimeException("Unsupported upgrade mode " + upgradeMode);
- }
- }
- waitForClusterShutdown(effectiveConfig);
- JobStatus jobStatus =
- flinkApp.getStatus().getJobStatuses().stream()
- .filter(j -> j.getJobId().equals(jobID.toHexString()))
- .findFirst()
- .get();
- jobStatus.setState("suspended");
- jobStatus.setSavepointLocation(ret.orElse(null));
- return ret;
- }
-
- /** We need this due to the buggy flink kube cluster client behaviour for now. */
- private void waitForClusterShutdown(Configuration effectiveConfig) throws InterruptedException {
- Fabric8FlinkKubeClient flinkKubeClient =
- new Fabric8FlinkKubeClient(
- effectiveConfig,
- (NamespacedKubernetesClient) kubernetesClient,
- Executors.newSingleThreadExecutor());
- for (int i = 0; i < 60; i++) {
- if (!flinkKubeClient
- .getRestEndpoint(effectiveConfig.get(KubernetesConfigOptions.CLUSTER_ID))
- .isPresent()) {
- break;
- }
- LOG.info("Waiting for cluster shutdown... ({})", i);
- Thread.sleep(1000);
- }
- }
-
- private void upgradeSessionCluster(FlinkDeployment flinkApp, Configuration effectiveConfig) {
- throw new UnsupportedOperationException("Not implemented yet");
+ private boolean reconcileFlinkDeployment(
+ FlinkDeployment flinkApp, Configuration effectiveConfig) throws Exception {
+ return flinkApp.getSpec().getJob() == null
+ ? sessionReconciler.reconcile(flinkApp, effectiveConfig)
+ : jobReconciler.reconcile(flinkApp, effectiveConfig);
}
@Override
diff --git a/src/main/java/org/apache/flink/kubernetes/operator/controller/observer/JobStatusObserver.java b/src/main/java/org/apache/flink/kubernetes/operator/controller/observer/JobStatusObserver.java
new file mode 100644
index 0000000..869fec0
--- /dev/null
+++ b/src/main/java/org/apache/flink/kubernetes/operator/controller/observer/JobStatusObserver.java
@@ -0,0 +1,98 @@
+package org.apache.flink.kubernetes.operator.controller.observer;
+
+import org.apache.flink.client.program.ClusterClient;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
+import org.apache.flink.kubernetes.operator.crd.spec.JobSpec;
+import org.apache.flink.kubernetes.operator.crd.spec.JobState;
+import org.apache.flink.kubernetes.operator.crd.status.FlinkDeploymentStatus;
+import org.apache.flink.kubernetes.operator.crd.status.JobStatus;
+import org.apache.flink.kubernetes.operator.utils.FlinkUtils;
+import org.apache.flink.runtime.client.JobStatusMessage;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+/** Observes the actual state of the running jobs on the Flink cluster. */
+public class JobStatusObserver {
+
+ private static final Logger LOG = LoggerFactory.getLogger(JobStatusObserver.class);
+
+ public boolean observeFlinkJobStatus(FlinkDeployment flinkApp, Configuration effectiveConfig) {
+ if (flinkApp.getStatus() == null) {
+ // This is the first run, nothing to observe
+ return true;
+ }
+
+ JobSpec jobSpec = flinkApp.getStatus().getSpec().getJob();
+
+ if (jobSpec == null) {
+ // This is a session cluster, nothing to observe
+ return true;
+ }
+
+ if (!jobSpec.getState().equals(JobState.RUNNING)) {
+ // The job is not running, nothing to observe
+ return true;
+ }
+ LOG.info("Getting job statuses for {}", flinkApp.getMetadata().getName());
+ FlinkDeploymentStatus flinkAppStatus = flinkApp.getStatus();
+ try (ClusterClient<String> clusterClient =
+ FlinkUtils.getRestClusterClient(effectiveConfig)) {
+ Collection<JobStatusMessage> clusterJobStatuses =
+ clusterClient.listJobs().get(10, TimeUnit.SECONDS);
+ flinkAppStatus.setJobStatuses(
+ mergeJobStatuses(flinkAppStatus.getJobStatuses(), clusterJobStatuses));
+ if (clusterJobStatuses.isEmpty()) {
+ LOG.info("No jobs found on {} yet, retrying...", flinkApp.getMetadata().getName());
+ return false;
+ } else {
+ LOG.info("Job statuses updated for {}", flinkApp.getMetadata().getName());
+ return true;
+ }
+
+ } catch (Exception e) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Failed to list jobs for {}", flinkApp, e);
+ } else {
+ LOG.warn(
+ "Failed to list jobs for {}, retrying...",
+ flinkApp.getMetadata().getName());
+ }
+
+ return false;
+ }
+ }
+
+ /**
+ * Merge previous job statuses with the new ones from the flink job cluster. We match jobs by
+ * their name to preserve savepoint information.
+ */
+ private List<JobStatus> mergeJobStatuses(
+ List<JobStatus> oldJobStatuses, Collection<JobStatusMessage> clusterJobStatuses) {
+ List<JobStatus> newStatuses =
+ oldJobStatuses != null ? new ArrayList<>(oldJobStatuses) : new ArrayList<>();
+ Map<String, JobStatus> statusMap =
+ newStatuses.stream().collect(Collectors.toMap(JobStatus::getJobName, j -> j));
+
+ clusterJobStatuses.forEach(
+ js -> {
+ if (statusMap.containsKey(js.getJobName())) {
+ JobStatus oldStatus = statusMap.get(js.getJobName());
+ oldStatus.setState(js.getJobState().name());
+ oldStatus.setJobId(js.getJobId().toHexString());
+ } else {
+ newStatuses.add(FlinkUtils.convert(js));
+ }
+ });
+
+ return newStatuses;
+ }
+}
diff --git a/src/main/java/org/apache/flink/kubernetes/operator/controller/reconciler/JobReconciler.java b/src/main/java/org/apache/flink/kubernetes/operator/controller/reconciler/JobReconciler.java
new file mode 100644
index 0000000..6c9ea2b
--- /dev/null
+++ b/src/main/java/org/apache/flink/kubernetes/operator/controller/reconciler/JobReconciler.java
@@ -0,0 +1,184 @@
+package org.apache.flink.kubernetes.operator.controller.reconciler;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.client.cli.ApplicationDeployer;
+import org.apache.flink.client.deployment.ClusterClientServiceLoader;
+import org.apache.flink.client.deployment.DefaultClusterClientServiceLoader;
+import org.apache.flink.client.deployment.application.ApplicationConfiguration;
+import org.apache.flink.client.deployment.application.cli.ApplicationClusterDeployer;
+import org.apache.flink.client.program.ClusterClient;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
+import org.apache.flink.kubernetes.operator.crd.spec.JobState;
+import org.apache.flink.kubernetes.operator.crd.spec.UpgradeMode;
+import org.apache.flink.kubernetes.operator.crd.status.FlinkDeploymentStatus;
+import org.apache.flink.kubernetes.operator.crd.status.JobStatus;
+import org.apache.flink.kubernetes.operator.utils.FlinkUtils;
+import org.apache.flink.runtime.jobgraph.SavepointConfigOptions;
+
+import io.fabric8.kubernetes.client.KubernetesClient;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Optional;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Reconciler responsible for handling the job lifecycle according to the desired and current
+ * states.
+ */
+public class JobReconciler {
+
+ private static final Logger LOG = LoggerFactory.getLogger(JobReconciler.class);
+
+ private final KubernetesClient kubernetesClient;
+
+ public JobReconciler(KubernetesClient kubernetesClient) {
+ this.kubernetesClient = kubernetesClient;
+ }
+
+ public boolean reconcile(FlinkDeployment flinkApp, Configuration effectiveConfig)
+ throws Exception {
+
+ if (flinkApp.getStatus() == null) {
+ flinkApp.setStatus(new FlinkDeploymentStatus());
+ if (!flinkApp.getSpec().getJob().getState().equals(JobState.RUNNING)) {
+ throw new RuntimeException("Job must start in running state");
+ }
+ try {
+ deployFlinkJob(flinkApp, effectiveConfig, Optional.empty());
+ return true;
+ } catch (Exception e) {
+ LOG.error("Error while deploying " + flinkApp.getMetadata().getName());
+ return false;
+ }
+ }
+
+ boolean specChanged = !flinkApp.getSpec().equals(flinkApp.getStatus().getSpec());
+ if (specChanged) {
+ if (flinkApp.getStatus().getSpec().getJob() == null) {
+ throw new RuntimeException("Cannot switch from session to job cluster");
+ }
+ JobState currentJobState = flinkApp.getStatus().getSpec().getJob().getState();
+ JobState desiredJobState = flinkApp.getSpec().getJob().getState();
+
+ UpgradeMode upgradeMode = flinkApp.getSpec().getJob().getUpgradeMode();
+ if (currentJobState == JobState.RUNNING) {
+ if (desiredJobState == JobState.RUNNING) {
+ upgradeFlinkJob(flinkApp, effectiveConfig);
+ }
+ if (desiredJobState.equals(JobState.SUSPENDED)) {
+ if (upgradeMode == UpgradeMode.STATELESS) {
+ cancelJob(flinkApp, effectiveConfig);
+ } else {
+ suspendJob(flinkApp, effectiveConfig);
+ }
+ }
+ }
+ if (currentJobState == JobState.SUSPENDED) {
+ if (desiredJobState == JobState.RUNNING) {
+ if (upgradeMode == UpgradeMode.STATELESS) {
+ deployFlinkJob(flinkApp, effectiveConfig, Optional.empty());
+ } else if (upgradeMode == UpgradeMode.SAVEPOINT) {
+ restoreFromLastSavepoint(flinkApp, effectiveConfig);
+ } else {
+ throw new UnsupportedOperationException(
+ "Only savepoint and stateless strategies are supported at the moment.");
+ }
+ }
+ }
+ }
+ return true;
+ }
+
+ private void deployFlinkJob(
+ FlinkDeployment flinkApp, Configuration effectiveConfig, Optional<String> savepoint)
+ throws Exception {
+ LOG.info("Deploying {}", flinkApp.getMetadata().getName());
+ if (savepoint.isPresent()) {
+ effectiveConfig.set(SavepointConfigOptions.SAVEPOINT_PATH, savepoint.get());
+ } else {
+ effectiveConfig.removeConfig(SavepointConfigOptions.SAVEPOINT_PATH);
+ }
+ final ClusterClientServiceLoader clusterClientServiceLoader =
+ new DefaultClusterClientServiceLoader();
+ final ApplicationDeployer deployer =
+ new ApplicationClusterDeployer(clusterClientServiceLoader);
+
+ final ApplicationConfiguration applicationConfiguration =
+ new ApplicationConfiguration(
+ flinkApp.getSpec().getJob().getArgs(),
+ flinkApp.getSpec().getJob().getEntryClass());
+
+ deployer.run(effectiveConfig, applicationConfiguration);
+ LOG.info("{} deployed", flinkApp.getMetadata().getName());
+ }
+
+ private void upgradeFlinkJob(FlinkDeployment flinkApp, Configuration effectiveConfig)
+ throws Exception {
+ LOG.info("Upgrading running job");
+ Optional<String> savepoint = cancelJob(flinkApp, effectiveConfig);
+ deployFlinkJob(flinkApp, effectiveConfig, savepoint);
+ }
+
+ private void restoreFromLastSavepoint(FlinkDeployment flinkApp, Configuration effectiveConfig)
+ throws Exception {
+ JobStatus jobStatus = flinkApp.getStatus().getJobStatuses().get(0);
+
+ String savepointLocation = jobStatus.getSavepointLocation();
+ if (savepointLocation == null) {
+ throw new RuntimeException("Cannot perform stateful restore without a valid savepoint");
+ }
+ deployFlinkJob(flinkApp, effectiveConfig, Optional.of(savepointLocation));
+ }
+
+ private Optional<String> suspendJob(FlinkDeployment flinkApp, Configuration effectiveConfig)
+ throws Exception {
+ LOG.info("Suspending {}", flinkApp.getMetadata().getName());
+ JobID jobID = JobID.fromHexString(flinkApp.getStatus().getJobStatuses().get(0).getJobId());
+ return cancelJob(flinkApp, jobID, UpgradeMode.SAVEPOINT, effectiveConfig);
+ }
+
+ private Optional<String> cancelJob(FlinkDeployment flinkApp, Configuration effectiveConfig)
+ throws Exception {
+ LOG.info("Cancelling {}", flinkApp.getMetadata().getName());
+ UpgradeMode upgradeMode = flinkApp.getSpec().getJob().getUpgradeMode();
+ JobID jobID = JobID.fromHexString(flinkApp.getStatus().getJobStatuses().get(0).getJobId());
+ return cancelJob(flinkApp, jobID, upgradeMode, effectiveConfig);
+ }
+
+ private Optional<String> cancelJob(
+ FlinkDeployment flinkApp,
+ JobID jobID,
+ UpgradeMode upgradeMode,
+ Configuration effectiveConfig)
+ throws Exception {
+ Optional<String> ret = Optional.empty();
+ try (ClusterClient<String> clusterClient =
+ FlinkUtils.getRestClusterClient(effectiveConfig)) {
+ switch (upgradeMode) {
+ case STATELESS:
+ clusterClient.cancel(jobID).get(1, TimeUnit.MINUTES);
+ break;
+ case SAVEPOINT:
+ String savepoint =
+ clusterClient
+ .stopWithSavepoint(jobID, false, null)
+ .get(1, TimeUnit.MINUTES);
+ ret = Optional.of(savepoint);
+ break;
+ default:
+ throw new RuntimeException("Unsupported upgrade mode " + upgradeMode);
+ }
+ }
+ FlinkUtils.waitForClusterShutdown(kubernetesClient, effectiveConfig);
+ JobStatus jobStatus =
+ flinkApp.getStatus().getJobStatuses().stream()
+ .filter(j -> j.getJobId().equals(jobID.toHexString()))
+ .findFirst()
+ .get();
+ jobStatus.setState("suspended");
+ jobStatus.setSavepointLocation(ret.orElse(null));
+ return ret;
+ }
+}
diff --git a/src/main/java/org/apache/flink/kubernetes/operator/controller/reconciler/SessionReconciler.java b/src/main/java/org/apache/flink/kubernetes/operator/controller/reconciler/SessionReconciler.java
new file mode 100644
index 0000000..4984f08
--- /dev/null
+++ b/src/main/java/org/apache/flink/kubernetes/operator/controller/reconciler/SessionReconciler.java
@@ -0,0 +1,75 @@
+package org.apache.flink.kubernetes.operator.controller.reconciler;
+
+import org.apache.flink.client.deployment.ClusterClientFactory;
+import org.apache.flink.client.deployment.ClusterClientServiceLoader;
+import org.apache.flink.client.deployment.ClusterDescriptor;
+import org.apache.flink.client.deployment.DefaultClusterClientServiceLoader;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
+import org.apache.flink.kubernetes.operator.crd.status.FlinkDeploymentStatus;
+import org.apache.flink.kubernetes.operator.utils.FlinkUtils;
+
+import io.fabric8.kubernetes.client.KubernetesClient;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Reconciler responsible for handling the session cluster lifecycle according to the desired and
+ * current states.
+ */
+public class SessionReconciler {
+
+    private static final Logger LOG = LoggerFactory.getLogger(SessionReconciler.class);
+
+    private final KubernetesClient kubernetesClient;
+
+    public SessionReconciler(KubernetesClient kubernetesClient) {
+        this.kubernetesClient = kubernetesClient;
+    }
+
+    /**
+     * Deploys the session cluster on the first run (status not yet set). On later runs, it
+     * redeploys the cluster whenever the spec differs from the last reconciled spec in the status.
+     *
+     * @param flinkApp deployment to reconcile; its status is created on the first run
+     * @param effectiveConfig effective Flink configuration for the deployment
+     * @return {@code false} if the initial deployment failed and should be retried, {@code true}
+     *     otherwise
+     * @throws Exception if the previously reconciled spec described a job cluster, or if the
+     *     redeployment fails
+     */
+    public boolean reconcile(FlinkDeployment flinkApp, Configuration effectiveConfig)
+            throws Exception {
+        if (flinkApp.getStatus() == null) {
+            flinkApp.setStatus(new FlinkDeploymentStatus());
+            try {
+                deployFlinkSession(flinkApp, effectiveConfig);
+                return true;
+            } catch (Exception e) {
+                // Attach the cause so deployment failures are diagnosable from the logs
+                LOG.error("Error while deploying {}", flinkApp.getMetadata().getName(), e);
+                return false;
+            }
+        }
+
+        boolean specChanged = !flinkApp.getSpec().equals(flinkApp.getStatus().getSpec());
+
+        if (specChanged) {
+            if (flinkApp.getStatus().getSpec().getJob() != null) {
+                throw new RuntimeException("Cannot switch from job to session cluster");
+            }
+            upgradeSessionCluster(flinkApp, effectiveConfig);
+        }
+        return true;
+    }
+
+    /** Deploys a session cluster through the Flink cluster descriptor for the effective config. */
+    private void deployFlinkSession(FlinkDeployment flinkApp, Configuration effectiveConfig)
+            throws Exception {
+        LOG.info("Deploying session cluster {}", flinkApp.getMetadata().getName());
+        final ClusterClientServiceLoader clusterClientServiceLoader =
+                new DefaultClusterClientServiceLoader();
+        final ClusterClientFactory<String> kubernetesClusterClientFactory =
+                clusterClientServiceLoader.getClusterClientFactory(effectiveConfig);
+        try (final ClusterDescriptor<String> kubernetesClusterDescriptor =
+                kubernetesClusterClientFactory.createClusterDescriptor(effectiveConfig)) {
+            kubernetesClusterDescriptor.deploySessionCluster(
+                    kubernetesClusterClientFactory.getClusterSpecification(effectiveConfig));
+        }
+        LOG.info("Session cluster {} deployed", flinkApp.getMetadata().getName());
+    }
+
+    /** Deletes the existing cluster, waits for it to shut down, then deploys it again. */
+    private void upgradeSessionCluster(FlinkDeployment flinkApp, Configuration effectiveConfig)
+            throws Exception {
+        FlinkUtils.deleteCluster(flinkApp, kubernetesClient);
+        FlinkUtils.waitForClusterShutdown(kubernetesClient, effectiveConfig);
+        deployFlinkSession(flinkApp, effectiveConfig);
+    }
+}
diff --git a/src/main/java/org/apache/flink/kubernetes/operator/utils/FlinkUtils.java b/src/main/java/org/apache/flink/kubernetes/operator/utils/FlinkUtils.java
index f31c4d4..007b5af 100644
--- a/src/main/java/org/apache/flink/kubernetes/operator/utils/FlinkUtils.java
+++ b/src/main/java/org/apache/flink/kubernetes/operator/utils/FlinkUtils.java
@@ -13,6 +13,8 @@ import org.apache.flink.configuration.RestOptions;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions;
import org.apache.flink.kubernetes.configuration.KubernetesDeploymentTarget;
+import org.apache.flink.kubernetes.kubeclient.Fabric8FlinkKubeClient;
+import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
import org.apache.flink.kubernetes.operator.crd.spec.FlinkDeploymentSpec;
import org.apache.flink.kubernetes.operator.crd.status.JobStatus;
import org.apache.flink.runtime.client.JobStatusMessage;
@@ -20,6 +22,8 @@ import org.apache.flink.runtime.highavailability.nonha.standalone.StandaloneClie
import org.apache.flink.util.StringUtils;
import io.fabric8.kubernetes.api.model.Pod;
+import io.fabric8.kubernetes.client.KubernetesClient;
+import io.fabric8.kubernetes.client.NamespacedKubernetesClient;
import io.fabric8.kubernetes.client.internal.SerializationUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -29,23 +33,24 @@ import java.io.IOException;
import java.net.URI;
import java.nio.file.Files;
import java.util.Collections;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
/** Flink Utility methods used by the operator. */
public class FlinkUtils {
private static final Logger LOG = LoggerFactory.getLogger(FlinkUtils.class);
- public static Configuration getEffectiveConfig(
- String namespace, String clusterId, FlinkDeploymentSpec spec) {
- try {
- final String flinkConfDir = System.getenv().get(ConfigConstants.ENV_FLINK_CONF_DIR);
- final Configuration effectiveConfig;
+ public static Configuration getEffectiveConfig(FlinkDeployment flinkApp) {
+ String namespace = flinkApp.getMetadata().getNamespace();
+ String clusterId = flinkApp.getMetadata().getName();
+ FlinkDeploymentSpec spec = flinkApp.getSpec();
- if (flinkConfDir != null) {
- effectiveConfig = GlobalConfiguration.loadConfiguration(flinkConfDir);
- } else {
- effectiveConfig = new Configuration();
- }
+ try {
+ String flinkConfDir = System.getenv().get(ConfigConstants.ENV_FLINK_CONF_DIR);
+ Configuration effectiveConfig =
+ flinkConfDir != null
+ ? GlobalConfiguration.loadConfiguration(flinkConfDir)
+ : new Configuration();
effectiveConfig.setString(KubernetesConfigOptions.NAMESPACE, namespace);
effectiveConfig.setString(KubernetesConfigOptions.CLUSTER_ID, clusterId);
@@ -164,4 +169,34 @@ public class FlinkUtils {
jobStatus.setState(message.getJobState().name());
return jobStatus;
}
+
+ public static void deleteCluster(FlinkDeployment flinkApp, KubernetesClient kubernetesClient) {
+ kubernetesClient
+ .apps()
+ .deployments()
+ .inNamespace(flinkApp.getMetadata().getNamespace())
+ .withName(flinkApp.getMetadata().getName())
+ .cascading(true)
+ .delete();
+ }
+
+    /**
+     * Blocks until the REST endpoint of the Flink cluster identified by {@link
+     * KubernetesConfigOptions#CLUSTER_ID} in {@code effectiveConfig} can no longer be found.
+     *
+     * <p>We need this due to the buggy Flink Kubernetes cluster client behaviour for now.
+     * Presumably the point is to avoid redeploying while the old cluster's resources still
+     * exist; TODO confirm.
+     *
+     * <p>The endpoint is polled once per second for at most 60 attempts. If it is still present
+     * at the last check, a warning is logged and the method returns normally. Callers are not
+     * told whether shutdown actually completed.
+     *
+     * @param kubernetesClient client used to query the cluster; it is cast to {@link
+     *     NamespacedKubernetesClient}, so other implementations fail with a ClassCastException
+     * @param effectiveConfig configuration holding the namespace and cluster id to wait for
+     * @throws InterruptedException if the thread is interrupted while sleeping between polls
+     */
+    public static void waitForClusterShutdown(
+            KubernetesClient kubernetesClient, Configuration effectiveConfig)
+            throws InterruptedException {
+        final int maxAttempts = 60;
+        final long pollIntervalMillis = 1000L;
+        // Loop-invariant: the cluster id does not change while we wait.
+        final String clusterId = effectiveConfig.get(KubernetesConfigOptions.CLUSTER_ID);
+
+        // This method owns the executor handed to the Flink kube client. It must be shut down
+        // here, otherwise every call leaks an executor (and its thread, once one is started).
+        // The Flink client itself is deliberately NOT closed.
+        // NOTE(review): closing it would presumably also close the caller's shared
+        // kubernetesClient; confirm before changing.
+        ExecutorService executor = Executors.newSingleThreadExecutor();
+        try {
+            Fabric8FlinkKubeClient flinkKubeClient =
+                    new Fabric8FlinkKubeClient(
+                            effectiveConfig,
+                            (NamespacedKubernetesClient) kubernetesClient,
+                            executor);
+            for (int attempt = 0; attempt < maxAttempts; attempt++) {
+                if (!flinkKubeClient.getRestEndpoint(clusterId).isPresent()) {
+                    return;
+                }
+                LOG.info("Waiting for cluster shutdown... ({})", attempt);
+                Thread.sleep(pollIntervalMillis);
+            }
+            // Previously this case fell through silently; make the timeout visible.
+            LOG.warn(
+                    "Cluster {} was still present after {} shutdown checks, giving up waiting",
+                    clusterId,
+                    maxAttempts);
+        } finally {
+            executor.shutdownNow();
+        }
+    }
}