You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by gy...@apache.org on 2022/02/16 10:34:06 UTC
[flink-kubernetes-operator] 13/23: Introduce FlinkService for cluster interactions
This is an automated email from the ASF dual-hosted git repository.
gyfora pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git
commit 5ea10e385ed3a9b16e60cef3d012f4c8ee5a008c
Author: Gyula Fora <g_...@apple.com>
AuthorDate: Mon Feb 7 14:43:04 2022 +0100
Introduce FlinkService for cluster interactions
* Extract FlinkService for cluster interactions
* Update some dependencies
---
pom.xml | 8 +-
.../flink/kubernetes/operator/FlinkOperator.java | 20 ++-
.../controller/FlinkDeploymentController.java | 22 +--
.../observer/JobStatusObserver.java | 17 ++-
.../{controller => }/reconciler/JobReconciler.java | 64 ++-------
.../reconciler/SessionReconciler.java | 34 ++---
.../kubernetes/operator/service/FlinkService.java | 155 +++++++++++++++++++++
.../kubernetes/operator/utils/FlinkUtils.java | 41 ------
8 files changed, 222 insertions(+), 139 deletions(-)
diff --git a/pom.xml b/pom.xml
index ee19b02..4838678 100644
--- a/pom.xml
+++ b/pom.xml
@@ -32,14 +32,14 @@ under the License.
<maven-failsafe-plugin.version>3.0.0-M4</maven-failsafe-plugin.version>
<operator.sdk.version>2.0.1</operator.sdk.version>
- <fabric8.version>5.11.2</fabric8.version>
- <lombok.version>1.18.10</lombok.version>
+ <fabric8.version>5.12.1</fabric8.version>
+ <lombok.version>1.18.22</lombok.version>
<scala.version>2.12</scala.version>
<flink.version>1.14.3</flink.version>
<slf4j.version>1.7.15</slf4j.version>
- <log4j.version>2.17.0</log4j.version>
+ <log4j.version>2.17.1</log4j.version>
<spotless.version>2.4.2</spotless.version>
<awaitility.version>4.1.0</awaitility.version>
@@ -311,7 +311,7 @@ under the License.
<exclude>**/target/**</exclude>
<exclude>apache-maven-3.2.5/**</exclude>
<!-- PyCharm -->
- <exclude>**/.idea/**</exclude>
+ <exclude>**/.idea/**</exclude>
</excludes>
</configuration>
</plugin>
diff --git a/src/main/java/org/apache/flink/kubernetes/operator/FlinkOperator.java b/src/main/java/org/apache/flink/kubernetes/operator/FlinkOperator.java
index a930d09..3a52f82 100644
--- a/src/main/java/org/apache/flink/kubernetes/operator/FlinkOperator.java
+++ b/src/main/java/org/apache/flink/kubernetes/operator/FlinkOperator.java
@@ -18,9 +18,12 @@
package org.apache.flink.kubernetes.operator;
import org.apache.flink.kubernetes.operator.controller.FlinkDeploymentController;
+import org.apache.flink.kubernetes.operator.observer.JobStatusObserver;
+import org.apache.flink.kubernetes.operator.reconciler.JobReconciler;
+import org.apache.flink.kubernetes.operator.reconciler.SessionReconciler;
+import org.apache.flink.kubernetes.operator.service.FlinkService;
import io.fabric8.kubernetes.client.DefaultKubernetesClient;
-import io.fabric8.kubernetes.client.KubernetesClient;
import io.javaoperatorsdk.operator.Operator;
import io.javaoperatorsdk.operator.api.config.ConfigurationServiceOverrider;
import io.javaoperatorsdk.operator.config.runtime.DefaultConfigurationService;
@@ -41,7 +44,7 @@ public class FlinkOperator {
LOG.info("Starting Flink Kubernetes Operator");
- KubernetesClient client = new DefaultKubernetesClient();
+ DefaultKubernetesClient client = new DefaultKubernetesClient();
String namespace = client.getNamespace();
if (namespace == null) {
namespace = "default";
@@ -51,7 +54,18 @@ public class FlinkOperator {
client,
new ConfigurationServiceOverrider(DefaultConfigurationService.instance())
.build());
- operator.register(new FlinkDeploymentController(client, namespace));
+
+ FlinkService flinkService = new FlinkService(client);
+
+ JobStatusObserver observer = new JobStatusObserver(flinkService);
+ JobReconciler jobReconciler = new JobReconciler(client, flinkService);
+ SessionReconciler sessionReconciler = new SessionReconciler(client, flinkService);
+
+ FlinkDeploymentController controller =
+ new FlinkDeploymentController(
+ client, namespace, observer, jobReconciler, sessionReconciler);
+
+ operator.register(controller);
operator.installShutdownHook();
operator.start();
diff --git a/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentController.java b/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentController.java
index 94d5b4d..5ab08e5 100644
--- a/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentController.java
+++ b/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentController.java
@@ -18,10 +18,10 @@
package org.apache.flink.kubernetes.operator.controller;
import org.apache.flink.configuration.Configuration;
-import org.apache.flink.kubernetes.operator.controller.observer.JobStatusObserver;
-import org.apache.flink.kubernetes.operator.controller.reconciler.JobReconciler;
-import org.apache.flink.kubernetes.operator.controller.reconciler.SessionReconciler;
import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
+import org.apache.flink.kubernetes.operator.observer.JobStatusObserver;
+import org.apache.flink.kubernetes.operator.reconciler.JobReconciler;
+import org.apache.flink.kubernetes.operator.reconciler.SessionReconciler;
import org.apache.flink.kubernetes.operator.utils.FlinkUtils;
import io.fabric8.kubernetes.client.KubernetesClient;
@@ -56,15 +56,21 @@ public class FlinkDeploymentController
private final String operatorNamespace;
- private final JobStatusObserver observer = new JobStatusObserver();
+ private final JobStatusObserver observer;
private final JobReconciler jobReconciler;
private final SessionReconciler sessionReconciler;
- public FlinkDeploymentController(KubernetesClient kubernetesClient, String namespace) {
+ public FlinkDeploymentController(
+ KubernetesClient kubernetesClient,
+ String operatorNamespace,
+ JobStatusObserver observer,
+ JobReconciler jobReconciler,
+ SessionReconciler sessionReconciler) {
this.kubernetesClient = kubernetesClient;
- this.operatorNamespace = namespace;
- this.jobReconciler = new JobReconciler(kubernetesClient);
- this.sessionReconciler = new SessionReconciler(kubernetesClient);
+ this.operatorNamespace = operatorNamespace;
+ this.observer = observer;
+ this.jobReconciler = jobReconciler;
+ this.sessionReconciler = sessionReconciler;
}
@Override
diff --git a/src/main/java/org/apache/flink/kubernetes/operator/controller/observer/JobStatusObserver.java b/src/main/java/org/apache/flink/kubernetes/operator/observer/JobStatusObserver.java
similarity index 91%
rename from src/main/java/org/apache/flink/kubernetes/operator/controller/observer/JobStatusObserver.java
rename to src/main/java/org/apache/flink/kubernetes/operator/observer/JobStatusObserver.java
index e1e6504..115d7f5 100644
--- a/src/main/java/org/apache/flink/kubernetes/operator/controller/observer/JobStatusObserver.java
+++ b/src/main/java/org/apache/flink/kubernetes/operator/observer/JobStatusObserver.java
@@ -15,16 +15,15 @@
* limitations under the License.
*/
-package org.apache.flink.kubernetes.operator.controller.observer;
+package org.apache.flink.kubernetes.operator.observer;
-import org.apache.flink.client.program.ClusterClient;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
import org.apache.flink.kubernetes.operator.crd.spec.JobSpec;
import org.apache.flink.kubernetes.operator.crd.spec.JobState;
import org.apache.flink.kubernetes.operator.crd.status.FlinkDeploymentStatus;
import org.apache.flink.kubernetes.operator.crd.status.JobStatus;
-import org.apache.flink.kubernetes.operator.utils.FlinkUtils;
+import org.apache.flink.kubernetes.operator.service.FlinkService;
import org.apache.flink.runtime.client.JobStatusMessage;
import org.slf4j.Logger;
@@ -34,13 +33,18 @@ import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
-import java.util.concurrent.TimeUnit;
/** Observes the actual state of the running jobs on the Flink cluster. */
public class JobStatusObserver {
private static final Logger LOG = LoggerFactory.getLogger(JobStatusObserver.class);
+ private final FlinkService flinkService;
+
+ public JobStatusObserver(FlinkService flinkService) {
+ this.flinkService = flinkService;
+ }
+
public boolean observeFlinkJobStatus(FlinkDeployment flinkApp, Configuration effectiveConfig) {
if (flinkApp.getStatus() == null) {
// This is the first run, nothing to observe
@@ -60,10 +64,9 @@ public class JobStatusObserver {
}
LOG.info("Getting job statuses for {}", flinkApp.getMetadata().getName());
FlinkDeploymentStatus flinkAppStatus = flinkApp.getStatus();
- try (ClusterClient<String> clusterClient =
- FlinkUtils.getRestClusterClient(effectiveConfig)) {
+ try {
Collection<JobStatusMessage> clusterJobStatuses =
- clusterClient.listJobs().get(10, TimeUnit.SECONDS);
+ flinkService.listJobs(effectiveConfig);
if (clusterJobStatuses.isEmpty()) {
LOG.info("No jobs found on {} yet, retrying...", flinkApp.getMetadata().getName());
return false;
diff --git a/src/main/java/org/apache/flink/kubernetes/operator/controller/reconciler/JobReconciler.java b/src/main/java/org/apache/flink/kubernetes/operator/reconciler/JobReconciler.java
similarity index 70%
rename from src/main/java/org/apache/flink/kubernetes/operator/controller/reconciler/JobReconciler.java
rename to src/main/java/org/apache/flink/kubernetes/operator/reconciler/JobReconciler.java
index 2ae431d..1dc7184 100644
--- a/src/main/java/org/apache/flink/kubernetes/operator/controller/reconciler/JobReconciler.java
+++ b/src/main/java/org/apache/flink/kubernetes/operator/reconciler/JobReconciler.java
@@ -15,15 +15,9 @@
* limitations under the License.
*/
-package org.apache.flink.kubernetes.operator.controller.reconciler;
+package org.apache.flink.kubernetes.operator.reconciler;
import org.apache.flink.api.common.JobID;
-import org.apache.flink.client.cli.ApplicationDeployer;
-import org.apache.flink.client.deployment.ClusterClientServiceLoader;
-import org.apache.flink.client.deployment.DefaultClusterClientServiceLoader;
-import org.apache.flink.client.deployment.application.ApplicationConfiguration;
-import org.apache.flink.client.deployment.application.cli.ApplicationClusterDeployer;
-import org.apache.flink.client.program.ClusterClient;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
import org.apache.flink.kubernetes.operator.crd.spec.JobSpec;
@@ -31,7 +25,7 @@ import org.apache.flink.kubernetes.operator.crd.spec.JobState;
import org.apache.flink.kubernetes.operator.crd.spec.UpgradeMode;
import org.apache.flink.kubernetes.operator.crd.status.FlinkDeploymentStatus;
import org.apache.flink.kubernetes.operator.crd.status.JobStatus;
-import org.apache.flink.kubernetes.operator.utils.FlinkUtils;
+import org.apache.flink.kubernetes.operator.service.FlinkService;
import org.apache.flink.kubernetes.operator.utils.KubernetesUtils;
import org.apache.flink.runtime.jobgraph.SavepointConfigOptions;
@@ -40,7 +34,6 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Optional;
-import java.util.concurrent.TimeUnit;
/**
* Reconciler responsible for handling the job lifecycle according to the desired and current
@@ -51,9 +44,11 @@ public class JobReconciler {
private static final Logger LOG = LoggerFactory.getLogger(JobReconciler.class);
private final KubernetesClient kubernetesClient;
+ private final FlinkService flinkService;
- public JobReconciler(KubernetesClient kubernetesClient) {
+ public JobReconciler(KubernetesClient kubernetesClient, FlinkService flinkService) {
this.kubernetesClient = kubernetesClient;
+ this.flinkService = flinkService;
}
public boolean reconcile(FlinkDeployment flinkApp, Configuration effectiveConfig)
@@ -118,24 +113,12 @@ public class JobReconciler {
private void deployFlinkJob(
FlinkDeployment flinkApp, Configuration effectiveConfig, Optional<String> savepoint)
throws Exception {
- LOG.info("Deploying {}", flinkApp.getMetadata().getName());
if (savepoint.isPresent()) {
effectiveConfig.set(SavepointConfigOptions.SAVEPOINT_PATH, savepoint.get());
} else {
effectiveConfig.removeConfig(SavepointConfigOptions.SAVEPOINT_PATH);
}
- final ClusterClientServiceLoader clusterClientServiceLoader =
- new DefaultClusterClientServiceLoader();
- final ApplicationDeployer deployer =
- new ApplicationClusterDeployer(clusterClientServiceLoader);
-
- final ApplicationConfiguration applicationConfiguration =
- new ApplicationConfiguration(
- flinkApp.getSpec().getJob().getArgs(),
- flinkApp.getSpec().getJob().getEntryClass());
-
- deployer.run(effectiveConfig, applicationConfiguration);
- LOG.info("{} deployed", flinkApp.getMetadata().getName());
+ flinkService.submitApplicationCluster(flinkApp, effectiveConfig);
}
private void upgradeFlinkJob(FlinkDeployment flinkApp, Configuration effectiveConfig)
@@ -159,43 +142,24 @@ public class JobReconciler {
private Optional<String> suspendJob(FlinkDeployment flinkApp, Configuration effectiveConfig)
throws Exception {
LOG.info("Suspending {}", flinkApp.getMetadata().getName());
- JobID jobID = JobID.fromHexString(flinkApp.getStatus().getJobStatus().getJobId());
- return cancelJob(flinkApp, jobID, UpgradeMode.SAVEPOINT, effectiveConfig);
+ return cancelJob(flinkApp, UpgradeMode.SAVEPOINT, effectiveConfig);
}
private Optional<String> cancelJob(FlinkDeployment flinkApp, Configuration effectiveConfig)
throws Exception {
LOG.info("Cancelling {}", flinkApp.getMetadata().getName());
UpgradeMode upgradeMode = flinkApp.getSpec().getJob().getUpgradeMode();
- JobID jobID = JobID.fromHexString(flinkApp.getStatus().getJobStatus().getJobId());
- return cancelJob(flinkApp, jobID, upgradeMode, effectiveConfig);
+ return cancelJob(flinkApp, upgradeMode, effectiveConfig);
}
private Optional<String> cancelJob(
- FlinkDeployment flinkApp,
- JobID jobID,
- UpgradeMode upgradeMode,
- Configuration effectiveConfig)
+ FlinkDeployment flinkApp, UpgradeMode upgradeMode, Configuration effectiveConfig)
throws Exception {
- Optional<String> savepointOpt = Optional.empty();
- try (ClusterClient<String> clusterClient =
- FlinkUtils.getRestClusterClient(effectiveConfig)) {
- switch (upgradeMode) {
- case STATELESS:
- clusterClient.cancel(jobID).get(1, TimeUnit.MINUTES);
- break;
- case SAVEPOINT:
- String savepoint =
- clusterClient
- .stopWithSavepoint(jobID, false, null)
- .get(1, TimeUnit.MINUTES);
- savepointOpt = Optional.of(savepoint);
- break;
- default:
- throw new RuntimeException("Unsupported upgrade mode " + upgradeMode);
- }
- }
- FlinkUtils.waitForClusterShutdown(kubernetesClient, effectiveConfig);
+ Optional<String> savepointOpt =
+ flinkService.cancelJob(
+ JobID.fromHexString(flinkApp.getStatus().getJobStatus().getJobId()),
+ upgradeMode,
+ effectiveConfig);
JobStatus jobStatus = flinkApp.getStatus().getJobStatus();
jobStatus.setState("suspended");
savepointOpt.ifPresent(jobStatus::setSavepointLocation);
diff --git a/src/main/java/org/apache/flink/kubernetes/operator/controller/reconciler/SessionReconciler.java b/src/main/java/org/apache/flink/kubernetes/operator/reconciler/SessionReconciler.java
similarity index 61%
rename from src/main/java/org/apache/flink/kubernetes/operator/controller/reconciler/SessionReconciler.java
rename to src/main/java/org/apache/flink/kubernetes/operator/reconciler/SessionReconciler.java
index 04a0568..374ee24 100644
--- a/src/main/java/org/apache/flink/kubernetes/operator/controller/reconciler/SessionReconciler.java
+++ b/src/main/java/org/apache/flink/kubernetes/operator/reconciler/SessionReconciler.java
@@ -15,16 +15,12 @@
* limitations under the License.
*/
-package org.apache.flink.kubernetes.operator.controller.reconciler;
+package org.apache.flink.kubernetes.operator.reconciler;
-import org.apache.flink.client.deployment.ClusterClientFactory;
-import org.apache.flink.client.deployment.ClusterClientServiceLoader;
-import org.apache.flink.client.deployment.ClusterDescriptor;
-import org.apache.flink.client.deployment.DefaultClusterClientServiceLoader;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
import org.apache.flink.kubernetes.operator.crd.status.FlinkDeploymentStatus;
-import org.apache.flink.kubernetes.operator.utils.FlinkUtils;
+import org.apache.flink.kubernetes.operator.service.FlinkService;
import org.apache.flink.kubernetes.operator.utils.KubernetesUtils;
import io.fabric8.kubernetes.client.KubernetesClient;
@@ -40,9 +36,11 @@ public class SessionReconciler {
private static final Logger LOG = LoggerFactory.getLogger(SessionReconciler.class);
private final KubernetesClient kubernetesClient;
+ private final FlinkService flinkService;
- public SessionReconciler(KubernetesClient kubernetesClient) {
+ public SessionReconciler(KubernetesClient kubernetesClient, FlinkService flinkService) {
this.kubernetesClient = kubernetesClient;
+ this.flinkService = flinkService;
}
public boolean reconcile(FlinkDeployment flinkApp, Configuration effectiveConfig)
@@ -50,7 +48,7 @@ public class SessionReconciler {
if (flinkApp.getStatus() == null) {
flinkApp.setStatus(new FlinkDeploymentStatus());
try {
- deployFlinkSession(flinkApp, effectiveConfig);
+ flinkService.submitSessionCluster(flinkApp, effectiveConfig);
KubernetesUtils.deployIngress(flinkApp, effectiveConfig, kubernetesClient);
return true;
} catch (Exception e) {
@@ -70,25 +68,9 @@ public class SessionReconciler {
return true;
}
- private void deployFlinkSession(FlinkDeployment flinkApp, Configuration effectiveConfig)
- throws Exception {
- LOG.info("Deploying session cluster {}", flinkApp.getMetadata().getName());
- final ClusterClientServiceLoader clusterClientServiceLoader =
- new DefaultClusterClientServiceLoader();
- final ClusterClientFactory<String> kubernetesClusterClientFactory =
- clusterClientServiceLoader.getClusterClientFactory(effectiveConfig);
- try (final ClusterDescriptor<String> kubernetesClusterDescriptor =
- kubernetesClusterClientFactory.createClusterDescriptor(effectiveConfig)) {
- kubernetesClusterDescriptor.deploySessionCluster(
- kubernetesClusterClientFactory.getClusterSpecification(effectiveConfig));
- }
- LOG.info("Session cluster {} deployed", flinkApp.getMetadata().getName());
- }
-
private void upgradeSessionCluster(FlinkDeployment flinkApp, Configuration effectiveConfig)
throws Exception {
- FlinkUtils.deleteCluster(flinkApp, kubernetesClient);
- FlinkUtils.waitForClusterShutdown(kubernetesClient, effectiveConfig);
- deployFlinkSession(flinkApp, effectiveConfig);
+ flinkService.stopSessionCluster(flinkApp, effectiveConfig);
+ flinkService.submitSessionCluster(flinkApp, effectiveConfig);
}
}
diff --git a/src/main/java/org/apache/flink/kubernetes/operator/service/FlinkService.java b/src/main/java/org/apache/flink/kubernetes/operator/service/FlinkService.java
new file mode 100644
index 0000000..d17ce1e
--- /dev/null
+++ b/src/main/java/org/apache/flink/kubernetes/operator/service/FlinkService.java
@@ -0,0 +1,238 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.service;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.client.cli.ApplicationDeployer;
+import org.apache.flink.client.deployment.ClusterClientFactory;
+import org.apache.flink.client.deployment.ClusterClientServiceLoader;
+import org.apache.flink.client.deployment.ClusterDescriptor;
+import org.apache.flink.client.deployment.DefaultClusterClientServiceLoader;
+import org.apache.flink.client.deployment.application.ApplicationConfiguration;
+import org.apache.flink.client.deployment.application.cli.ApplicationClusterDeployer;
+import org.apache.flink.client.program.ClusterClient;
+import org.apache.flink.client.program.rest.RestClusterClient;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.RestOptions;
+import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions;
+import org.apache.flink.kubernetes.kubeclient.Fabric8FlinkKubeClient;
+import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
+import org.apache.flink.kubernetes.operator.crd.spec.UpgradeMode;
+import org.apache.flink.kubernetes.operator.utils.FlinkUtils;
+import org.apache.flink.runtime.client.JobStatusMessage;
+import org.apache.flink.runtime.highavailability.nonha.standalone.StandaloneClientHAServices;
+
+import io.fabric8.kubernetes.client.NamespacedKubernetesClient;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collection;
+import java.util.Optional;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Service for submitting and interacting with Flink clusters and jobs.
+ *
+ * <p>Bundles the Flink client deployment APIs, the REST cluster client and the operator's
+ * Kubernetes client behind a single entry point used by the observer and the reconcilers.
+ */
+public class FlinkService {
+
+    private static final Logger LOG = LoggerFactory.getLogger(FlinkService.class);
+
+    /** Timeout for listing the jobs of a cluster through its REST endpoint. */
+    private static final long LIST_JOBS_TIMEOUT_SECONDS = 10;
+
+    /** Timeout for cancelling or stopping a job through the cluster's REST endpoint. */
+    private static final long CANCEL_JOB_TIMEOUT_MINUTES = 1;
+
+    /** How many times to poll for the REST endpoint to disappear before giving up. */
+    private static final int SHUTDOWN_POLL_ATTEMPTS = 60;
+
+    /** Delay between two consecutive shutdown polls. */
+    private static final long SHUTDOWN_POLL_INTERVAL_MILLIS = 1000;
+
+    private final NamespacedKubernetesClient kubernetesClient;
+
+    /**
+     * Creates a new service.
+     *
+     * @param kubernetesClient Kubernetes client shared with the rest of the operator; this
+     *     service uses it but never closes it
+     */
+    public FlinkService(NamespacedKubernetesClient kubernetesClient) {
+        this.kubernetesClient = kubernetesClient;
+    }
+
+    /**
+     * Deploys a Flink application cluster for the given deployment.
+     *
+     * @param deployment deployment whose job spec provides the entry class and program arguments
+     * @param conf effective Flink configuration to deploy with
+     * @throws Exception if the deployment fails
+     */
+    public void submitApplicationCluster(FlinkDeployment deployment, Configuration conf)
+            throws Exception {
+        LOG.info("Deploying application cluster {}", deployment.getMetadata().getName());
+        final ClusterClientServiceLoader clusterClientServiceLoader =
+                new DefaultClusterClientServiceLoader();
+        final ApplicationDeployer deployer =
+                new ApplicationClusterDeployer(clusterClientServiceLoader);
+
+        final ApplicationConfiguration applicationConfiguration =
+                new ApplicationConfiguration(
+                        deployment.getSpec().getJob().getArgs(),
+                        deployment.getSpec().getJob().getEntryClass());
+
+        deployer.run(conf, applicationConfiguration);
+        LOG.info("Application cluster {} deployed", deployment.getMetadata().getName());
+    }
+
+    /**
+     * Deploys a Flink session cluster for the given deployment.
+     *
+     * @param deployment deployment to deploy a session cluster for
+     * @param conf effective Flink configuration to deploy with
+     * @throws Exception if the deployment fails
+     */
+    public void submitSessionCluster(FlinkDeployment deployment, Configuration conf)
+            throws Exception {
+        LOG.info("Deploying session cluster {}", deployment.getMetadata().getName());
+        final ClusterClientServiceLoader clusterClientServiceLoader =
+                new DefaultClusterClientServiceLoader();
+        final ClusterClientFactory<String> kubernetesClusterClientFactory =
+                clusterClientServiceLoader.getClusterClientFactory(conf);
+        try (final ClusterDescriptor<String> kubernetesClusterDescriptor =
+                kubernetesClusterClientFactory.createClusterDescriptor(conf)) {
+            kubernetesClusterDescriptor.deploySessionCluster(
+                    kubernetesClusterClientFactory.getClusterSpecification(conf));
+        }
+        LOG.info("Session cluster {} deployed", deployment.getMetadata().getName());
+    }
+
+    /**
+     * Lists the jobs known to the cluster's REST endpoint.
+     *
+     * @param conf effective Flink configuration used to locate the REST endpoint
+     * @return status messages of the jobs reported by the cluster
+     * @throws Exception if the REST call fails or does not complete in time
+     */
+    public Collection<JobStatusMessage> listJobs(Configuration conf) throws Exception {
+        try (ClusterClient<String> clusterClient = getClusterClient(conf)) {
+            return clusterClient.listJobs().get(LIST_JOBS_TIMEOUT_SECONDS, TimeUnit.SECONDS);
+        }
+    }
+
+    /**
+     * Cancels a job according to the given upgrade mode and waits for the cluster to shut down.
+     *
+     * @param jobID job to cancel
+     * @param upgradeMode {@link UpgradeMode#STATELESS} cancels the job, {@link
+     *     UpgradeMode#SAVEPOINT} stops it with a savepoint
+     * @param conf effective Flink configuration used to locate the cluster
+     * @return the savepoint location for {@link UpgradeMode#SAVEPOINT}, empty otherwise
+     * @throws Exception if the REST call fails, times out, or the upgrade mode is unsupported
+     */
+    public Optional<String> cancelJob(JobID jobID, UpgradeMode upgradeMode, Configuration conf)
+            throws Exception {
+        Optional<String> savepointOpt = Optional.empty();
+        try (ClusterClient<String> clusterClient = getClusterClient(conf)) {
+            switch (upgradeMode) {
+                case STATELESS:
+                    clusterClient.cancel(jobID).get(CANCEL_JOB_TIMEOUT_MINUTES, TimeUnit.MINUTES);
+                    break;
+                case SAVEPOINT:
+                    String savepoint =
+                            clusterClient
+                                    .stopWithSavepoint(jobID, false, null)
+                                    .get(CANCEL_JOB_TIMEOUT_MINUTES, TimeUnit.MINUTES);
+                    savepointOpt = Optional.of(savepoint);
+                    break;
+                default:
+                    throw new RuntimeException("Unsupported upgrade mode " + upgradeMode);
+            }
+        }
+        waitForClusterShutdown(conf);
+        return savepointOpt;
+    }
+
+    /**
+     * Deletes the session cluster of the given deployment and waits for it to shut down.
+     *
+     * @param deployment deployment whose cluster is deleted
+     * @param conf effective Flink configuration identifying the cluster
+     * @throws Exception if deleting or waiting for the cluster fails
+     */
+    public void stopSessionCluster(FlinkDeployment deployment, Configuration conf)
+            throws Exception {
+        FlinkUtils.deleteCluster(deployment, kubernetesClient);
+        waitForClusterShutdown(conf);
+    }
+
+    /**
+     * Creates a REST cluster client for the cluster described by {@code config}.
+     *
+     * <p>The REST host defaults to {@code <cluster-id>-rest.<namespace>} unless {@link
+     * RestOptions#ADDRESS} is set. The caller is responsible for closing the returned client.
+     */
+    private ClusterClient<String> getClusterClient(Configuration config) throws Exception {
+        final String clusterId = config.get(KubernetesConfigOptions.CLUSTER_ID);
+        final String namespace = config.get(KubernetesConfigOptions.NAMESPACE);
+        final int port = config.getInteger(RestOptions.PORT);
+        final String host =
+                config.getString(
+                        RestOptions.ADDRESS, String.format("%s-rest.%s", clusterId, namespace));
+        final String restServerAddress = String.format("http://%s:%s", host, port);
+        LOG.debug("Creating RestClusterClient({})", restServerAddress);
+        return new RestClusterClient<>(
+                config, clusterId, (c, e) -> new StandaloneClientHAServices(restServerAddress));
+    }
+
+    /**
+     * Polls until the REST endpoint of the cluster identified by {@link
+     * KubernetesConfigOptions#CLUSTER_ID} is gone, giving up after a bounded number of polls.
+     * Giving up is logged as a warning but is not treated as an error.
+     *
+     * <p>We need this due to the buggy flink kube cluster client behaviour for now.
+     */
+    private void waitForClusterShutdown(Configuration conf) throws Exception {
+        final String clusterId = conf.get(KubernetesConfigOptions.CLUSTER_ID);
+        final ExecutorService kubeClientExecutor = Executors.newSingleThreadExecutor();
+        try {
+            // Only the executor is released below. Fabric8FlinkKubeClient#close() is deliberately
+            // not called: in Flink 1.14 it also closes the wrapped, operator-wide shared client.
+            final Fabric8FlinkKubeClient flinkKubeClient =
+                    new Fabric8FlinkKubeClient(conf, kubernetesClient, kubeClientExecutor);
+            for (int i = 0; i < SHUTDOWN_POLL_ATTEMPTS; i++) {
+                if (!flinkKubeClient.getRestEndpoint(clusterId).isPresent()) {
+                    return;
+                }
+                LOG.info("Waiting for cluster shutdown... ({})", i);
+                Thread.sleep(SHUTDOWN_POLL_INTERVAL_MILLIS);
+            }
+            LOG.warn(
+                    "Gave up waiting for cluster {} to shut down after {} attempts",
+                    clusterId,
+                    SHUTDOWN_POLL_ATTEMPTS);
+        } finally {
+            kubeClientExecutor.shutdownNow();
+        }
+    }
+}
diff --git a/src/main/java/org/apache/flink/kubernetes/operator/utils/FlinkUtils.java b/src/main/java/org/apache/flink/kubernetes/operator/utils/FlinkUtils.java
index 18fd3c1..e9fbac0 100644
--- a/src/main/java/org/apache/flink/kubernetes/operator/utils/FlinkUtils.java
+++ b/src/main/java/org/apache/flink/kubernetes/operator/utils/FlinkUtils.java
@@ -17,8 +17,6 @@
package org.apache.flink.kubernetes.operator.utils;
-import org.apache.flink.client.program.ClusterClient;
-import org.apache.flink.client.program.rest.RestClusterClient;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.CoreOptions;
@@ -26,14 +24,11 @@ import org.apache.flink.configuration.DeploymentOptions;
import org.apache.flink.configuration.GlobalConfiguration;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.configuration.PipelineOptions;
-import org.apache.flink.configuration.RestOptions;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions;
import org.apache.flink.kubernetes.configuration.KubernetesDeploymentTarget;
-import org.apache.flink.kubernetes.kubeclient.Fabric8FlinkKubeClient;
import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
import org.apache.flink.kubernetes.operator.crd.spec.FlinkDeploymentSpec;
-import org.apache.flink.runtime.highavailability.nonha.standalone.StandaloneClientHAServices;
import org.apache.flink.util.StringUtils;
import com.fasterxml.jackson.databind.JsonNode;
@@ -42,7 +37,6 @@ import com.fasterxml.jackson.databind.node.ArrayNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import io.fabric8.kubernetes.api.model.Pod;
import io.fabric8.kubernetes.client.KubernetesClient;
-import io.fabric8.kubernetes.client.NamespacedKubernetesClient;
import io.fabric8.kubernetes.client.internal.SerializationUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -53,7 +47,6 @@ import java.net.URI;
import java.nio.file.Files;
import java.util.Collections;
import java.util.Iterator;
-import java.util.concurrent.Executors;
/** Flink Utility methods used by the operator. */
public class FlinkUtils {
@@ -224,20 +217,6 @@ public class FlinkUtils {
}
}
- public static ClusterClient<String> getRestClusterClient(Configuration config)
- throws Exception {
- final String clusterId = config.get(KubernetesConfigOptions.CLUSTER_ID);
- final String namespace = config.get(KubernetesConfigOptions.NAMESPACE);
- final int port = config.getInteger(RestOptions.PORT);
- final String host =
- config.getString(
- RestOptions.ADDRESS, String.format("%s-rest.%s", clusterId, namespace));
- final String restServerAddress = String.format("http://%s:%s", host, port);
- LOG.info("Creating RestClusterClient({})", restServerAddress);
- return new RestClusterClient<>(
- config, clusterId, (c, e) -> new StandaloneClientHAServices(restServerAddress));
- }
-
public static void deleteCluster(FlinkDeployment flinkApp, KubernetesClient kubernetesClient) {
kubernetesClient
.apps()
@@ -247,24 +226,4 @@ public class FlinkUtils {
.cascading(true)
.delete();
}
-
- /** We need this due to the buggy flink kube cluster client behaviour for now. */
- public static void waitForClusterShutdown(
- KubernetesClient kubernetesClient, Configuration effectiveConfig)
- throws InterruptedException {
- Fabric8FlinkKubeClient flinkKubeClient =
- new Fabric8FlinkKubeClient(
- effectiveConfig,
- (NamespacedKubernetesClient) kubernetesClient,
- Executors.newSingleThreadExecutor());
- for (int i = 0; i < 60; i++) {
- if (!flinkKubeClient
- .getRestEndpoint(effectiveConfig.get(KubernetesConfigOptions.CLUSTER_ID))
- .isPresent()) {
- break;
- }
- LOG.info("Waiting for cluster shutdown... ({})", i);
- Thread.sleep(1000);
- }
- }
}