You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by gy...@apache.org on 2022/07/20 15:59:10 UTC
[flink-kubernetes-operator] 04/05: [FLINK-27445] Create Flink Standalone Service
This is an automated email from the ASF dual-hosted git repository.
gyfora pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git
commit 0d05d0ab514387a314179785bb850d41c7dd1e85
Author: Usamah Jassat <us...@amazon.com>
AuthorDate: Fri Jun 17 18:14:38 2022 +0100
[FLINK-27445] Create Flink Standalone Service
---
.../flink/kubernetes/operator/FlinkOperator.java | 16 +-
.../operator/config/FlinkConfigBuilder.java | 2 +-
.../operator/crd/spec/FlinkDeploymentSpec.java | 3 +
.../crd/spec/KubernetesDeploymentMode.java | 47 ++
.../observer/deployment/ObserverFactory.java | 20 +-
.../observer/sessionjob/SessionJobObserver.java | 85 +--
.../AbstractFlinkResourceReconciler.java | 34 +-
.../deployment/AbstractJobReconciler.java | 28 +-
.../deployment/ApplicationReconciler.java | 38 +-
.../reconciler/deployment/ReconcilerFactory.java | 12 +-
.../reconciler/deployment/SessionReconciler.java | 31 +-
.../sessionjob/SessionJobReconciler.java | 27 +-
...FlinkService.java => AbstractFlinkService.java} | 507 +++++++-------
.../kubernetes/operator/service/FlinkService.java | 729 +--------------------
.../operator/service/FlinkServiceFactory.java | 66 ++
.../operator/service/NativeFlinkService.java | 178 +++++
.../operator/service/StandaloneFlinkService.java | 163 +++++
.../kubernetes/operator/utils/FlinkUtils.java | 135 ----
.../kubernetes/operator/TestingFlinkService.java | 22 +-
.../operator/TestingFlinkServiceFactory.java | 52 ++
.../TestingFlinkDeploymentController.java | 7 +-
.../sessionjob/SessionJobObserverTest.java | 9 +-
.../deployment/ApplicationReconcilerTest.java | 6 +-
.../sessionjob/SessionJobReconcilerTest.java | 13 +-
...erviceTest.java => NativeFlinkServiceTest.java} | 12 +-
.../service/StandaloneFlinkServiceTest.java | 138 ++++
.../crds/flinkdeployments.flink.apache.org-v1.yml | 5 +
27 files changed, 1157 insertions(+), 1228 deletions(-)
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/FlinkOperator.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/FlinkOperator.java
index 8933a6c7..4d2f4674 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/FlinkOperator.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/FlinkOperator.java
@@ -37,7 +37,7 @@ import org.apache.flink.kubernetes.operator.observer.deployment.ObserverFactory;
import org.apache.flink.kubernetes.operator.observer.sessionjob.SessionJobObserver;
import org.apache.flink.kubernetes.operator.reconciler.deployment.ReconcilerFactory;
import org.apache.flink.kubernetes.operator.reconciler.sessionjob.SessionJobReconciler;
-import org.apache.flink.kubernetes.operator.service.FlinkService;
+import org.apache.flink.kubernetes.operator.service.FlinkServiceFactory;
import org.apache.flink.kubernetes.operator.utils.EnvUtils;
import org.apache.flink.kubernetes.operator.utils.EventRecorder;
import org.apache.flink.kubernetes.operator.utils.KubernetesClientUtils;
@@ -68,7 +68,7 @@ public class FlinkOperator {
private final Operator operator;
private final KubernetesClient client;
- private final FlinkService flinkService;
+ private final FlinkServiceFactory flinkServiceFactory;
private final FlinkConfigManager configManager;
private final Set<FlinkResourceValidator> validators;
@VisibleForTesting final Set<RegisteredController> registeredControllers = new HashSet<>();
@@ -86,7 +86,7 @@ public class FlinkOperator {
KubernetesClientUtils.getKubernetesClient(
configManager.getOperatorConfiguration(), this.metricGroup);
this.operator = new Operator(client, this::overrideOperatorConfigs);
- this.flinkService = new FlinkService(client, configManager);
+ this.flinkServiceFactory = new FlinkServiceFactory(client, configManager);
this.validators = ValidatorUtils.discoverValidators(configManager);
this.listeners = ListenerUtils.discoverListeners(configManager);
PluginManager pluginManager =
@@ -126,9 +126,10 @@ public class FlinkOperator {
var eventRecorder = EventRecorder.create(client, listeners);
var reconcilerFactory =
new ReconcilerFactory(
- client, flinkService, configManager, eventRecorder, statusRecorder);
+ client, flinkServiceFactory, configManager, eventRecorder, statusRecorder);
var observerFactory =
- new ObserverFactory(flinkService, configManager, statusRecorder, eventRecorder);
+ new ObserverFactory(
+ flinkServiceFactory, configManager, statusRecorder, eventRecorder);
var controller =
new FlinkDeploymentController(
@@ -149,9 +150,10 @@ public class FlinkOperator {
client, new MetricManager<>(metricGroup, configManager), listeners);
var reconciler =
new SessionJobReconciler(
- client, flinkService, configManager, eventRecorder, statusRecorder);
+ client, flinkServiceFactory, configManager, eventRecorder, statusRecorder);
var observer =
- new SessionJobObserver(flinkService, configManager, statusRecorder, eventRecorder);
+ new SessionJobObserver(
+ flinkServiceFactory, configManager, statusRecorder, eventRecorder);
var controller =
new FlinkSessionJobController(
configManager, validators, reconciler, observer, statusRecorder);
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkConfigBuilder.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkConfigBuilder.java
index 2dfd9fd9..8941dd51 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkConfigBuilder.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkConfigBuilder.java
@@ -260,7 +260,7 @@ public class FlinkConfigBuilder {
return effectiveConfig;
}
- protected static Configuration buildFrom(
+ public static Configuration buildFrom(
String namespace, String clusterId, FlinkDeploymentSpec spec, Configuration flinkConfig)
throws IOException, URISyntaxException {
return new FlinkConfigBuilder(namespace, clusterId, spec, flinkConfig)
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/crd/spec/FlinkDeploymentSpec.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/crd/spec/FlinkDeploymentSpec.java
index 14f50afa..4b81f1b8 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/crd/spec/FlinkDeploymentSpec.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/crd/spec/FlinkDeploymentSpec.java
@@ -70,4 +70,7 @@ public class FlinkDeploymentSpec extends AbstractFlinkSpec {
* configContent.
*/
private Map<String, String> logConfiguration;
+
+ /** Deployment mode of the Flink cluster, native or standalone. */
+ private KubernetesDeploymentMode mode;
}
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/crd/spec/KubernetesDeploymentMode.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/crd/spec/KubernetesDeploymentMode.java
new file mode 100644
index 00000000..5537be61
--- /dev/null
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/crd/spec/KubernetesDeploymentMode.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.crd.spec;
+
+import org.apache.flink.annotation.Experimental;
+import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+/** Enum to control Flink deployment mode on Kubernetes. */
+@Experimental
+public enum KubernetesDeploymentMode {
+
+ /**
+ * Deploys Flink using Flink's native Kubernetes support. Only supported for newer
+ * versions of Flink.
+ */
+ @JsonProperty("native")
+ NATIVE,
+
+ /** Deploys Flink on top of Kubernetes in standalone mode. */
+ @JsonProperty("standalone")
+ STANDALONE;
+
+ public static KubernetesDeploymentMode getDeploymentMode(FlinkDeployment flinkDeployment) {
+ return getDeploymentMode(flinkDeployment.getSpec());
+ }
+
+ public static KubernetesDeploymentMode getDeploymentMode(FlinkDeploymentSpec spec) {
+ return spec.getMode() == null ? KubernetesDeploymentMode.NATIVE : spec.getMode();
+ }
+}
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/deployment/ObserverFactory.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/deployment/ObserverFactory.java
index 605c2769..6c2c203d 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/deployment/ObserverFactory.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/deployment/ObserverFactory.java
@@ -22,28 +22,28 @@ import org.apache.flink.kubernetes.operator.config.Mode;
import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
import org.apache.flink.kubernetes.operator.crd.status.FlinkDeploymentStatus;
import org.apache.flink.kubernetes.operator.observer.Observer;
-import org.apache.flink.kubernetes.operator.service.FlinkService;
+import org.apache.flink.kubernetes.operator.service.FlinkServiceFactory;
import org.apache.flink.kubernetes.operator.utils.EventRecorder;
import org.apache.flink.kubernetes.operator.utils.StatusRecorder;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
-/** The factory to create the observer based ob the {@link FlinkDeployment} mode. */
+/** The factory to create the observer based on the {@link FlinkDeployment} mode. */
public class ObserverFactory {
- private final FlinkService flinkService;
+ private final FlinkServiceFactory flinkServiceFactory;
private final FlinkConfigManager configManager;
private final StatusRecorder<FlinkDeploymentStatus> statusRecorder;
private final EventRecorder eventRecorder;
private final Map<Mode, Observer<FlinkDeployment>> observerMap;
public ObserverFactory(
- FlinkService flinkService,
+ FlinkServiceFactory flinkServiceFactory,
FlinkConfigManager configManager,
StatusRecorder<FlinkDeploymentStatus> statusRecorder,
EventRecorder eventRecorder) {
- this.flinkService = flinkService;
+ this.flinkServiceFactory = flinkServiceFactory;
this.configManager = configManager;
this.statusRecorder = statusRecorder;
this.eventRecorder = eventRecorder;
@@ -56,10 +56,16 @@ public class ObserverFactory {
mode -> {
switch (mode) {
case SESSION:
- return new SessionObserver(flinkService, configManager, eventRecorder);
+ return new SessionObserver(
+ flinkServiceFactory.getOrCreate(flinkApp),
+ configManager,
+ eventRecorder);
case APPLICATION:
return new ApplicationObserver(
- flinkService, configManager, statusRecorder, eventRecorder);
+ flinkServiceFactory.getOrCreate(flinkApp),
+ configManager,
+ statusRecorder,
+ eventRecorder);
default:
throw new UnsupportedOperationException(
String.format("Unsupported running mode: %s", mode));
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/sessionjob/SessionJobObserver.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/sessionjob/SessionJobObserver.java
index 5ba397c4..c3aeec46 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/sessionjob/SessionJobObserver.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/sessionjob/SessionJobObserver.java
@@ -32,6 +32,7 @@ import org.apache.flink.kubernetes.operator.observer.context.VoidObserverContext
import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
import org.apache.flink.kubernetes.operator.reconciler.sessionjob.SessionJobReconciler;
import org.apache.flink.kubernetes.operator.service.FlinkService;
+import org.apache.flink.kubernetes.operator.service.FlinkServiceFactory;
import org.apache.flink.kubernetes.operator.utils.EventRecorder;
import org.apache.flink.kubernetes.operator.utils.SavepointUtils;
import org.apache.flink.kubernetes.operator.utils.StatusRecorder;
@@ -52,52 +53,51 @@ import java.util.stream.Collectors;
public class SessionJobObserver implements Observer<FlinkSessionJob> {
private static final Logger LOG = LoggerFactory.getLogger(SessionJobObserver.class);
+ private final FlinkServiceFactory flinkServiceFactory;
private final FlinkConfigManager configManager;
private final EventRecorder eventRecorder;
- private final SavepointObserver<FlinkSessionJobStatus> savepointObserver;
- private final JobStatusObserver<VoidObserverContext> jobStatusObserver;
- private final FlinkService flinkService;
+ private final StatusRecorder<FlinkSessionJobStatus> statusRecorder;
public SessionJobObserver(
- FlinkService flinkService,
+ FlinkServiceFactory flinkServiceFactory,
FlinkConfigManager configManager,
StatusRecorder<FlinkSessionJobStatus> statusRecorder,
EventRecorder eventRecorder) {
+ this.flinkServiceFactory = flinkServiceFactory;
this.configManager = configManager;
this.eventRecorder = eventRecorder;
- this.flinkService = flinkService;
- this.savepointObserver =
- new SavepointObserver<>(flinkService, configManager, statusRecorder, eventRecorder);
- this.jobStatusObserver =
- new JobStatusObserver<>(flinkService, eventRecorder) {
- @Override
- protected void onTimeout(VoidObserverContext sessionJobObserverContext) {}
-
- @Override
- protected Optional<JobStatusMessage> filterTargetJob(
- JobStatus status, List<JobStatusMessage> clusterJobStatuses) {
- var jobId =
- Preconditions.checkNotNull(
- status.getJobId(),
- "The jobID to be observed should not be null");
- var matchedList =
- clusterJobStatuses.stream()
- .filter(job -> job.getJobId().toHexString().equals(jobId))
- .collect(Collectors.toList());
- Preconditions.checkArgument(
- matchedList.size() <= 1,
- String.format(
- "Expected one job for JobID: %s, but %d founded",
- status.getJobId(), matchedList.size()));
-
- if (matchedList.size() == 0) {
- LOG.info("No job found for JobID: {}", jobId);
- return Optional.empty();
- } else {
- return Optional.of(matchedList.get(0));
- }
- }
- };
+ this.statusRecorder = statusRecorder;
+ }
+
+ private JobStatusObserver<VoidObserverContext> getJobStatusObserver(FlinkService flinkService) {
+ return new JobStatusObserver<>(flinkService, eventRecorder) {
+ @Override
+ protected void onTimeout(VoidObserverContext sessionJobObserverContext) {}
+
+ @Override
+ protected Optional<JobStatusMessage> filterTargetJob(
+ JobStatus status, List<JobStatusMessage> clusterJobStatuses) {
+ var jobId =
+ Preconditions.checkNotNull(
+ status.getJobId(), "The jobID to be observed should not be null");
+ var matchedList =
+ clusterJobStatuses.stream()
+ .filter(job -> job.getJobId().toHexString().equals(jobId))
+ .collect(Collectors.toList());
+ Preconditions.checkArgument(
+ matchedList.size() <= 1,
+ String.format(
+ "Expected one job for JobID: %s, but %d founded",
+ status.getJobId(), matchedList.size()));
+
+ if (matchedList.size() == 0) {
+ LOG.info("No job found for JobID: {}", jobId);
+ return Optional.empty();
+ } else {
+ return Optional.of(matchedList.get(0));
+ }
+ }
+ };
}
@Override
@@ -112,11 +112,13 @@ public class SessionJobObserver implements Observer<FlinkSessionJob> {
return;
}
+ FlinkService flinkService = flinkServiceFactory.getOrCreate(flinkDepOpt.get());
+ var jobStatusObserver = getJobStatusObserver(flinkService);
var deployedConfig =
configManager.getSessionJobConfig(flinkDepOpt.get(), flinkSessionJob.getSpec());
var reconciliationStatus = flinkSessionJob.getStatus().getReconciliationStatus();
if (reconciliationStatus.getState() == ReconciliationState.UPGRADING) {
- checkIfAlreadyUpgraded(flinkSessionJob, deployedConfig);
+ checkIfAlreadyUpgraded(flinkSessionJob, deployedConfig, flinkService);
if (reconciliationStatus.getState() == ReconciliationState.UPGRADING) {
return;
}
@@ -127,13 +129,18 @@ public class SessionJobObserver implements Observer<FlinkSessionJob> {
flinkSessionJob, deployedConfig, VoidObserverContext.INSTANCE);
if (jobFound) {
+ SavepointObserver savepointObserver =
+ new SavepointObserver<>(
+ flinkService, configManager, statusRecorder, eventRecorder);
savepointObserver.observeSavepointStatus(flinkSessionJob, deployedConfig);
}
SavepointUtils.resetTriggerIfJobNotRunning(flinkSessionJob, eventRecorder);
}
private void checkIfAlreadyUpgraded(
- FlinkSessionJob flinkSessionJob, Configuration deployedConfig) {
+ FlinkSessionJob flinkSessionJob,
+ Configuration deployedConfig,
+ FlinkService flinkService) {
var uid = flinkSessionJob.getMetadata().getUid();
Collection<JobStatusMessage> jobStatusMessages;
try {
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractFlinkResourceReconciler.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractFlinkResourceReconciler.java
index 7758ac6f..3d458432 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractFlinkResourceReconciler.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractFlinkResourceReconciler.java
@@ -65,7 +65,6 @@ public abstract class AbstractFlinkResourceReconciler<
protected final EventRecorder eventRecorder;
protected final StatusRecorder<STATUS> statusRecorder;
protected final KubernetesClient kubernetesClient;
- protected final FlinkService flinkService;
public static final String MSG_SUSPENDED = "Suspending existing deployment.";
public static final String MSG_SPEC_CHANGED = "Detected spec change, starting reconciliation.";
@@ -74,12 +73,10 @@ public abstract class AbstractFlinkResourceReconciler<
public AbstractFlinkResourceReconciler(
KubernetesClient kubernetesClient,
- FlinkService flinkService,
FlinkConfigManager configManager,
EventRecorder eventRecorder,
StatusRecorder<STATUS> statusRecorder) {
this.kubernetesClient = kubernetesClient;
- this.flinkService = flinkService;
this.configManager = configManager;
this.eventRecorder = eventRecorder;
this.statusRecorder = statusRecorder;
@@ -112,6 +109,7 @@ public abstract class AbstractFlinkResourceReconciler<
cr,
spec,
status,
+ ctx,
deployConfig,
Optional.ofNullable(spec.getJob()).map(JobSpec::getInitialSavepointPath),
false);
@@ -128,7 +126,7 @@ public abstract class AbstractFlinkResourceReconciler<
reconciliationStatus.getState() == ReconciliationState.UPGRADING
|| !currentDeploySpec.equals(lastReconciledSpec);
var observeConfig = getObserveConfig(cr, ctx);
-
+ var flinkService = getFlinkService(cr, ctx);
if (specChanged) {
if (checkNewSpecAlreadyDeployed(cr, deployConfig)) {
return;
@@ -142,8 +140,8 @@ public abstract class AbstractFlinkResourceReconciler<
EventRecorder.Component.JobManagerDeployment,
MSG_SPEC_CHANGED);
}
- reconcileSpecChange(cr, observeConfig, deployConfig);
- } else if (shouldRollBack(cr, observeConfig)) {
+ reconcileSpecChange(cr, ctx, observeConfig, deployConfig);
+ } else if (shouldRollBack(cr, observeConfig, flinkService)) {
// Rollbacks are executed in two steps, we initiate it first then return
if (initiateRollBack(status)) {
return;
@@ -156,7 +154,7 @@ public abstract class AbstractFlinkResourceReconciler<
EventRecorder.Component.JobManagerDeployment,
MSG_ROLLBACK);
rollback(cr, ctx, observeConfig);
- } else if (!reconcileOtherChanges(cr, observeConfig)) {
+ } else if (!reconcileOtherChanges(cr, ctx, observeConfig)) {
LOG.info("Resource fully reconciled, nothing to do...");
}
}
@@ -202,7 +200,8 @@ public abstract class AbstractFlinkResourceReconciler<
* @throws Exception Error during spec upgrade.
*/
protected abstract void reconcileSpecChange(
- CR cr, Configuration observeConfig, Configuration deployConfig) throws Exception;
+ CR cr, Context ctx, Configuration observeConfig, Configuration deployConfig)
+ throws Exception;
/**
* Rollback deployed resource to the last stable spec.
@@ -224,8 +223,8 @@ public abstract class AbstractFlinkResourceReconciler<
* @return True if any further reconciliation action was taken.
* @throws Exception Error during reconciliation.
*/
- protected abstract boolean reconcileOtherChanges(CR cr, Configuration observeConfig)
- throws Exception;
+ protected abstract boolean reconcileOtherChanges(
+ CR cr, Context context, Configuration observeConfig) throws Exception;
@Override
public final DeleteControl cleanup(CR resource, Context context) {
@@ -239,6 +238,7 @@ public abstract class AbstractFlinkResourceReconciler<
* @param spec Spec that should be deployed to Kubernetes.
* @param status Status object of the resource
* @param deployConfig Flink conf for the deployment.
+ * @param ctx Reconciliation context.
* @param savepoint Optional savepoint path for applications and session jobs.
* @param requireHaMetadata Flag used by application deployments to validate HA metadata
* @throws Exception Error during deployment.
@@ -247,6 +247,7 @@ public abstract class AbstractFlinkResourceReconciler<
CR relatedResource,
SPEC spec,
STATUS status,
+ Context ctx,
Configuration deployConfig,
Optional<String> savepoint,
boolean requireHaMetadata)
@@ -261,6 +262,15 @@ public abstract class AbstractFlinkResourceReconciler<
*/
protected abstract DeleteControl cleanupInternal(CR resource, Context context);
+ /**
+ * Get the Flink service related to the resource and context.
+ *
+ * @param resource The Flink resource being reconciled.
+ * @param context The reconciliation context for the resource.
+ * @return The Flink service implementation to use for this resource and context.
+ */
+ protected abstract FlinkService getFlinkService(CR resource, Context context);
+
/**
* Checks whether the desired spec already matches the currently deployed spec. If they match
* the resource status is updated to reflect successful reconciliation.
@@ -296,7 +306,9 @@ public abstract class AbstractFlinkResourceReconciler<
* @return True if the resource should be rolled back.
*/
private boolean shouldRollBack(
- AbstractFlinkResource<SPEC, STATUS> resource, Configuration configuration) {
+ AbstractFlinkResource<SPEC, STATUS> resource,
+ Configuration configuration,
+ FlinkService flinkService) {
var reconciliationStatus = resource.getStatus().getReconciliationStatus();
if (reconciliationStatus.getState() == ReconciliationState.ROLLING_BACK) {
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractJobReconciler.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractJobReconciler.java
index 7f4cbdf1..e3a72a05 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractJobReconciler.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractJobReconciler.java
@@ -28,7 +28,6 @@ import org.apache.flink.kubernetes.operator.crd.status.CommonStatus;
import org.apache.flink.kubernetes.operator.crd.status.JobStatus;
import org.apache.flink.kubernetes.operator.crd.status.ReconciliationState;
import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
-import org.apache.flink.kubernetes.operator.service.FlinkService;
import org.apache.flink.kubernetes.operator.utils.EventRecorder;
import org.apache.flink.kubernetes.operator.utils.SavepointUtils;
import org.apache.flink.kubernetes.operator.utils.StatusRecorder;
@@ -54,11 +53,10 @@ public abstract class AbstractJobReconciler<
public AbstractJobReconciler(
KubernetesClient kubernetesClient,
- FlinkService flinkService,
FlinkConfigManager configManager,
EventRecorder eventRecorder,
StatusRecorder<STATUS> statusRecorder) {
- super(kubernetesClient, flinkService, configManager, eventRecorder, statusRecorder);
+ super(kubernetesClient, configManager, eventRecorder, statusRecorder);
}
@Override
@@ -80,8 +78,8 @@ public abstract class AbstractJobReconciler<
@Override
protected void reconcileSpecChange(
- CR resource, Configuration observeConfig, Configuration deployConfig) throws Exception {
- var deployMeta = resource.getMetadata();
+ CR resource, Context ctx, Configuration observeConfig, Configuration deployConfig)
+ throws Exception {
STATUS status = resource.getStatus();
var reconciliationStatus = status.getReconciliationStatus();
SPEC lastReconciledSpec = reconciliationStatus.deserializeLastReconciledSpec();
@@ -107,7 +105,7 @@ public abstract class AbstractJobReconciler<
MSG_SUSPENDED);
// We must record the upgrade mode used to the status later
currentDeploySpec.getJob().setUpgradeMode(availableUpgradeMode.get());
- cancelJob(resource, availableUpgradeMode.get(), observeConfig);
+ cancelJob(resource, ctx, availableUpgradeMode.get(), observeConfig);
if (desiredJobState == JobState.RUNNING) {
ReconciliationUtils.updateStatusBeforeDeploymentAttempt(resource, deployConfig);
} else {
@@ -123,6 +121,7 @@ public abstract class AbstractJobReconciler<
resource,
currentDeploySpec,
status,
+ ctx,
deployConfig,
// We decide to enforce HA based on how job was previously suspended
lastReconciledSpec.getJob().getUpgradeMode() == UpgradeMode.LAST_STATE);
@@ -174,6 +173,7 @@ public abstract class AbstractJobReconciler<
CR resource,
SPEC spec,
STATUS status,
+ Context ctx,
Configuration deployConfig,
boolean requireHaMetadata)
throws Exception {
@@ -185,11 +185,11 @@ public abstract class AbstractJobReconciler<
.flatMap(s -> Optional.ofNullable(s.getLocation()));
}
- deploy(resource, spec, status, deployConfig, savepointOpt, requireHaMetadata);
+ deploy(resource, spec, status, ctx, deployConfig, savepointOpt, requireHaMetadata);
}
@Override
- protected void rollback(CR resource, Context context, Configuration observeConfig)
+ protected void rollback(CR resource, Context ctx, Configuration observeConfig)
throws Exception {
var reconciliationStatus = resource.getStatus().getReconciliationStatus();
var rollbackSpec = reconciliationStatus.deserializeLastStableSpec();
@@ -198,6 +198,7 @@ public abstract class AbstractJobReconciler<
cancelJob(
resource,
+ ctx,
upgradeMode == UpgradeMode.STATELESS
? UpgradeMode.STATELESS
: UpgradeMode.LAST_STATE,
@@ -207,16 +208,18 @@ public abstract class AbstractJobReconciler<
resource,
rollbackSpec,
resource.getStatus(),
- getDeployConfig(resource.getMetadata(), rollbackSpec, context),
+ ctx,
+ getDeployConfig(resource.getMetadata(), rollbackSpec, ctx),
upgradeMode != UpgradeMode.STATELESS);
reconciliationStatus.setState(ReconciliationState.ROLLED_BACK);
}
@Override
- public boolean reconcileOtherChanges(CR resource, Configuration observeConfig)
+ public boolean reconcileOtherChanges(CR resource, Context context, Configuration observeConfig)
throws Exception {
- return SavepointUtils.triggerSavepointIfNeeded(flinkService, resource, observeConfig);
+ return SavepointUtils.triggerSavepointIfNeeded(
+ getFlinkService(resource, context), resource, observeConfig);
}
/**
@@ -228,5 +231,6 @@ public abstract class AbstractJobReconciler<
* @throws Exception Error during cancellation.
*/
protected abstract void cancelJob(
- CR resource, UpgradeMode upgradeMode, Configuration observeConfig) throws Exception;
+ CR resource, Context ctx, UpgradeMode upgradeMode, Configuration observeConfig)
+ throws Exception;
}
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconciler.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconciler.java
index df0efe37..c03ddc98 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconciler.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconciler.java
@@ -33,6 +33,7 @@ import org.apache.flink.kubernetes.operator.service.FlinkService;
import org.apache.flink.kubernetes.operator.utils.EventRecorder;
import org.apache.flink.kubernetes.operator.utils.FlinkUtils;
import org.apache.flink.kubernetes.operator.utils.IngressUtils;
+import org.apache.flink.kubernetes.operator.utils.SavepointUtils;
import org.apache.flink.kubernetes.operator.utils.StatusRecorder;
import org.apache.flink.runtime.highavailability.JobResultStoreOptions;
import org.apache.flink.runtime.jobgraph.SavepointConfigOptions;
@@ -53,6 +54,7 @@ public class ApplicationReconciler
extends AbstractJobReconciler<FlinkDeployment, FlinkDeploymentSpec, FlinkDeploymentStatus> {
private static final Logger LOG = LoggerFactory.getLogger(ApplicationReconciler.class);
+ protected final FlinkService flinkService;
public ApplicationReconciler(
KubernetesClient kubernetesClient,
@@ -60,7 +62,13 @@ public class ApplicationReconciler
FlinkConfigManager configManager,
EventRecorder eventRecorder,
StatusRecorder<FlinkDeploymentStatus> statusRecorder) {
- super(kubernetesClient, flinkService, configManager, eventRecorder, statusRecorder);
+ super(kubernetesClient, configManager, eventRecorder, statusRecorder);
+ this.flinkService = flinkService;
+ }
+
+ @Override
+ protected FlinkService getFlinkService(FlinkDeployment resource, Context context) {
+ return flinkService;
}
@Override
@@ -118,6 +126,7 @@ public class ApplicationReconciler
FlinkDeployment relatedResource,
FlinkDeploymentSpec spec,
FlinkDeploymentStatus status,
+ Context ctx,
Configuration deployConfig,
Optional<String> savepoint,
boolean requireHaMetadata)
@@ -138,13 +147,7 @@ public class ApplicationReconciler
}
LOG.info("Deleting deployment with terminated application before new deployment");
flinkService.deleteClusterDeployment(relatedResource.getMetadata(), status, true);
- FlinkUtils.waitForClusterShutdown(
- kubernetesClient,
- deployConfig,
- configManager
- .getOperatorConfiguration()
- .getFlinkShutdownClusterTimeout()
- .toSeconds());
+ flinkService.waitForClusterShutdown(deployConfig);
}
eventRecorder.triggerEvent(
relatedResource,
@@ -162,7 +165,10 @@ public class ApplicationReconciler
@Override
protected void cancelJob(
- FlinkDeployment deployment, UpgradeMode upgradeMode, Configuration observeConfig)
+ FlinkDeployment deployment,
+ Context ctx,
+ UpgradeMode upgradeMode,
+ Configuration observeConfig)
throws Exception {
flinkService.cancelJob(deployment, upgradeMode, observeConfig);
}
@@ -186,24 +192,24 @@ public class ApplicationReconciler
}
@Override
- public boolean reconcileOtherChanges(FlinkDeployment deployment, Configuration observeConfig)
- throws Exception {
- if (super.reconcileOtherChanges(deployment, observeConfig)) {
+ public boolean reconcileOtherChanges(
+ FlinkDeployment deployment, Context ctx, Configuration observeConfig) throws Exception {
+ if (SavepointUtils.triggerSavepointIfNeeded(flinkService, deployment, observeConfig)) {
return true;
}
if (shouldRecoverDeployment(observeConfig, deployment)) {
- recoverJmDeployment(deployment, observeConfig);
+ recoverJmDeployment(deployment, ctx, observeConfig);
return true;
}
return false;
}
- private void recoverJmDeployment(FlinkDeployment deployment, Configuration observeConfig)
- throws Exception {
+ private void recoverJmDeployment(
+ FlinkDeployment deployment, Context ctx, Configuration observeConfig) throws Exception {
LOG.info("Missing Flink Cluster deployment, trying to recover...");
FlinkDeploymentSpec specToRecover = ReconciliationUtils.getDeployedSpec(deployment);
- restoreJob(deployment, specToRecover, deployment.getStatus(), observeConfig, true);
+ restoreJob(deployment, specToRecover, deployment.getStatus(), ctx, observeConfig, true);
}
@Override
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ReconcilerFactory.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ReconcilerFactory.java
index 2e556fd1..419e3e41 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ReconcilerFactory.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ReconcilerFactory.java
@@ -22,7 +22,7 @@ import org.apache.flink.kubernetes.operator.config.Mode;
import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
import org.apache.flink.kubernetes.operator.crd.status.FlinkDeploymentStatus;
import org.apache.flink.kubernetes.operator.reconciler.Reconciler;
-import org.apache.flink.kubernetes.operator.service.FlinkService;
+import org.apache.flink.kubernetes.operator.service.FlinkServiceFactory;
import org.apache.flink.kubernetes.operator.utils.EventRecorder;
import org.apache.flink.kubernetes.operator.utils.StatusRecorder;
@@ -35,7 +35,7 @@ import java.util.concurrent.ConcurrentHashMap;
public class ReconcilerFactory {
private final KubernetesClient kubernetesClient;
- private final FlinkService flinkService;
+ private final FlinkServiceFactory flinkServiceFactory;
private final FlinkConfigManager configManager;
private final EventRecorder eventRecorder;
private final StatusRecorder<FlinkDeploymentStatus> deploymentStatusRecorder;
@@ -43,12 +43,12 @@ public class ReconcilerFactory {
public ReconcilerFactory(
KubernetesClient kubernetesClient,
- FlinkService flinkService,
+ FlinkServiceFactory flinkServiceFactory,
FlinkConfigManager configManager,
EventRecorder eventRecorder,
StatusRecorder<FlinkDeploymentStatus> deploymentStatusRecorder) {
this.kubernetesClient = kubernetesClient;
- this.flinkService = flinkService;
+ this.flinkServiceFactory = flinkServiceFactory;
this.configManager = configManager;
this.eventRecorder = eventRecorder;
this.deploymentStatusRecorder = deploymentStatusRecorder;
@@ -63,14 +63,14 @@ public class ReconcilerFactory {
case SESSION:
return new SessionReconciler(
kubernetesClient,
- flinkService,
+ flinkServiceFactory.getOrCreate(flinkApp),
configManager,
eventRecorder,
deploymentStatusRecorder);
case APPLICATION:
return new ApplicationReconciler(
kubernetesClient,
- flinkService,
+ flinkServiceFactory.getOrCreate(flinkApp),
configManager,
eventRecorder,
deploymentStatusRecorder);
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/SessionReconciler.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/SessionReconciler.java
index 791db2fe..527cb8b5 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/SessionReconciler.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/SessionReconciler.java
@@ -29,7 +29,6 @@ import org.apache.flink.kubernetes.operator.crd.status.ReconciliationStatus;
import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
import org.apache.flink.kubernetes.operator.service.FlinkService;
import org.apache.flink.kubernetes.operator.utils.EventRecorder;
-import org.apache.flink.kubernetes.operator.utils.FlinkUtils;
import org.apache.flink.kubernetes.operator.utils.IngressUtils;
import org.apache.flink.kubernetes.operator.utils.StatusRecorder;
@@ -52,6 +51,8 @@ public class SessionReconciler
extends AbstractFlinkResourceReconciler<
FlinkDeployment, FlinkDeploymentSpec, FlinkDeploymentStatus> {
+ protected final FlinkService flinkService;
+
private static final Logger LOG = LoggerFactory.getLogger(SessionReconciler.class);
public SessionReconciler(
@@ -60,7 +61,13 @@ public class SessionReconciler
FlinkConfigManager configManager,
EventRecorder eventRecorder,
StatusRecorder<FlinkDeploymentStatus> statusRecorder) {
- super(kubernetesClient, flinkService, configManager, eventRecorder, statusRecorder);
+ super(kubernetesClient, configManager, eventRecorder, statusRecorder);
+ this.flinkService = flinkService;
+ }
+
+ @Override
+ protected FlinkService getFlinkService(FlinkDeployment resource, Context context) {
+ return flinkService;
}
@Override
@@ -82,7 +89,10 @@ public class SessionReconciler
@Override
protected void reconcileSpecChange(
- FlinkDeployment deployment, Configuration observeConfig, Configuration deployConfig)
+ FlinkDeployment deployment,
+ Context ctx,
+ Configuration observeConfig,
+ Configuration deployConfig)
throws Exception {
deleteSessionCluster(deployment, observeConfig);
@@ -94,6 +104,7 @@ public class SessionReconciler
deployment,
deployment.getSpec(),
deployment.getStatus(),
+ ctx,
deployConfig,
Optional.empty(),
false);
@@ -103,13 +114,7 @@ public class SessionReconciler
private void deleteSessionCluster(FlinkDeployment deployment, Configuration effectiveConfig) {
flinkService.deleteClusterDeployment(
deployment.getMetadata(), deployment.getStatus(), false);
- FlinkUtils.waitForClusterShutdown(
- kubernetesClient,
- effectiveConfig,
- configManager
- .getOperatorConfiguration()
- .getFlinkShutdownClusterTimeout()
- .toSeconds());
+ flinkService.waitForClusterShutdown(effectiveConfig);
}
@Override
@@ -117,6 +122,7 @@ public class SessionReconciler
FlinkDeployment cr,
FlinkDeploymentSpec spec,
FlinkDeploymentStatus status,
+ Context ctx,
Configuration deployConfig,
Optional<String> savepoint,
boolean requireHaMetadata)
@@ -141,6 +147,7 @@ public class SessionReconciler
deployment,
rollbackSpec,
deployment.getStatus(),
+ ctx,
rollbackConfig,
Optional.empty(),
false);
@@ -149,8 +156,8 @@ public class SessionReconciler
}
@Override
- public boolean reconcileOtherChanges(FlinkDeployment flinkApp, Configuration observeConfig)
- throws Exception {
+ public boolean reconcileOtherChanges(
+ FlinkDeployment flinkApp, Context ctx, Configuration observeConfig) throws Exception {
if (shouldRecoverDeployment(observeConfig, flinkApp)) {
recoverSession(flinkApp, observeConfig);
return true;
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/sessionjob/SessionJobReconciler.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/sessionjob/SessionJobReconciler.java
index e04d3bd1..f0dd2520 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/sessionjob/SessionJobReconciler.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/sessionjob/SessionJobReconciler.java
@@ -28,6 +28,7 @@ import org.apache.flink.kubernetes.operator.crd.status.JobManagerDeploymentStatu
import org.apache.flink.kubernetes.operator.crd.status.JobStatus;
import org.apache.flink.kubernetes.operator.reconciler.deployment.AbstractJobReconciler;
import org.apache.flink.kubernetes.operator.service.FlinkService;
+import org.apache.flink.kubernetes.operator.service.FlinkServiceFactory;
import org.apache.flink.kubernetes.operator.utils.EventRecorder;
import org.apache.flink.kubernetes.operator.utils.StatusRecorder;
@@ -45,14 +46,27 @@ public class SessionJobReconciler
extends AbstractJobReconciler<FlinkSessionJob, FlinkSessionJobSpec, FlinkSessionJobStatus> {
private static final Logger LOG = LoggerFactory.getLogger(SessionJobReconciler.class);
+ private final FlinkServiceFactory flinkServiceFactory;
public SessionJobReconciler(
KubernetesClient kubernetesClient,
- FlinkService flinkService,
+ FlinkServiceFactory flinkServiceFactory,
FlinkConfigManager configManager,
EventRecorder eventRecorder,
StatusRecorder<FlinkSessionJobStatus> statusRecorder) {
- super(kubernetesClient, flinkService, configManager, eventRecorder, statusRecorder);
+ super(kubernetesClient, configManager, eventRecorder, statusRecorder);
+ this.flinkServiceFactory = flinkServiceFactory;
+ }
+
+ @Override
+ protected FlinkService getFlinkService(FlinkSessionJob resource, Context context) {
+ Optional<FlinkDeployment> deploymentOpt =
+ context.getSecondaryResource(FlinkDeployment.class);
+
+ if (sessionClusterReady(deploymentOpt)) {
+ return flinkServiceFactory.getOrCreate(deploymentOpt.get());
+ }
+ return null;
}
@Override
@@ -84,10 +98,12 @@ public class SessionJobReconciler
FlinkSessionJob cr,
FlinkSessionJobSpec sessionJobSpec,
FlinkSessionJobStatus status,
+ Context ctx,
Configuration deployConfig,
Optional<String> savepoint,
boolean requireHaMetadata)
throws Exception {
+ FlinkService flinkService = getFlinkService(cr, ctx);
var jobID =
flinkService.submitJobToSessionCluster(
cr.getMetadata(), sessionJobSpec, deployConfig, savepoint.orElse(null));
@@ -101,8 +117,12 @@ public class SessionJobReconciler
@Override
protected void cancelJob(
- FlinkSessionJob resource, UpgradeMode upgradeMode, Configuration observeConfig)
+ FlinkSessionJob resource,
+ Context ctx,
+ UpgradeMode upgradeMode,
+ Configuration observeConfig)
throws Exception {
+ FlinkService flinkService = getFlinkService(resource, ctx);
flinkService.cancelSessionJob(resource, upgradeMode, observeConfig);
}
@@ -117,6 +137,7 @@ public class SessionJobReconciler
try {
cancelJob(
sessionJob,
+ context,
UpgradeMode.STATELESS,
getObserveConfig(sessionJob, context));
} catch (Exception e) {
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/FlinkService.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java
similarity index 87%
copy from flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/FlinkService.java
copy to flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java
index 2deccaee..afb7bf27 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/FlinkService.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java
@@ -20,13 +20,6 @@ package org.apache.flink.kubernetes.operator.service;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobStatus;
-import org.apache.flink.client.cli.ApplicationDeployer;
-import org.apache.flink.client.deployment.ClusterClientFactory;
-import org.apache.flink.client.deployment.ClusterClientServiceLoader;
-import org.apache.flink.client.deployment.ClusterDescriptor;
-import org.apache.flink.client.deployment.DefaultClusterClientServiceLoader;
-import org.apache.flink.client.deployment.application.ApplicationConfiguration;
-import org.apache.flink.client.deployment.application.cli.ApplicationClusterDeployer;
import org.apache.flink.client.program.ClusterClient;
import org.apache.flink.client.program.rest.RestClusterClient;
import org.apache.flink.configuration.CheckpointingOptions;
@@ -43,7 +36,6 @@ import org.apache.flink.kubernetes.operator.crd.spec.FlinkSessionJobSpec;
import org.apache.flink.kubernetes.operator.crd.spec.FlinkVersion;
import org.apache.flink.kubernetes.operator.crd.spec.JobSpec;
import org.apache.flink.kubernetes.operator.crd.spec.UpgradeMode;
-import org.apache.flink.kubernetes.operator.crd.status.FlinkDeploymentStatus;
import org.apache.flink.kubernetes.operator.crd.status.JobManagerDeploymentStatus;
import org.apache.flink.kubernetes.operator.crd.status.Savepoint;
import org.apache.flink.kubernetes.operator.crd.status.SavepointTriggerType;
@@ -94,6 +86,7 @@ import org.apache.flink.util.concurrent.ExecutorThreadFactory;
import io.fabric8.kubernetes.api.model.ObjectMeta;
import io.fabric8.kubernetes.api.model.PodList;
+import io.fabric8.kubernetes.api.model.Service;
import io.fabric8.kubernetes.client.KubernetesClient;
import org.apache.commons.lang3.ObjectUtils;
import org.slf4j.Logger;
@@ -106,6 +99,7 @@ import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.net.SocketAddress;
+import java.net.SocketTimeoutException;
import java.net.URI;
import java.util.Arrays;
import java.util.Collection;
@@ -123,112 +117,42 @@ import java.util.stream.Collectors;
import static org.apache.flink.kubernetes.operator.config.FlinkConfigBuilder.FLINK_VERSION;
-/** Service for submitting and interacting with Flink clusters and jobs. */
-public class FlinkService {
+/**
+ * An abstract {@link FlinkService} containing some common implementations for the native and
+ * standalone Flink Services.
+ */
+public abstract class AbstractFlinkService implements FlinkService {
- private static final Logger LOG = LoggerFactory.getLogger(FlinkService.class);
+ private static final Logger LOG = LoggerFactory.getLogger(AbstractFlinkService.class);
- private final KubernetesClient kubernetesClient;
- private final ArtifactManager artifactManager;
- private final FlinkConfigManager configManager;
+ protected final KubernetesClient kubernetesClient;
+ protected final FlinkConfigManager configManager;
private final ExecutorService executorService;
+ protected final ArtifactManager artifactManager;
- public FlinkService(KubernetesClient kubernetesClient, FlinkConfigManager configManager) {
+ public AbstractFlinkService(
+ KubernetesClient kubernetesClient, FlinkConfigManager configManager) {
this.kubernetesClient = kubernetesClient;
- this.artifactManager = new ArtifactManager(configManager);
this.configManager = configManager;
+ this.artifactManager = new ArtifactManager(configManager);
this.executorService =
Executors.newFixedThreadPool(
4, new ExecutorThreadFactory("Flink-RestClusterClient-IO"));
}
- private static List<JobStatusMessage> toJobStatusMessage(
- MultipleJobsDetails multipleJobsDetails) {
- return multipleJobsDetails.getJobs().stream()
- .map(
- details ->
- new JobStatusMessage(
- details.getJobId(),
- details.getJobName(),
- getEffectiveStatus(details),
- details.getStartTime()))
- .collect(Collectors.toList());
- }
-
- @VisibleForTesting
- protected static JobStatus getEffectiveStatus(JobDetails details) {
- int numRunning = details.getTasksPerState()[ExecutionState.RUNNING.ordinal()];
- int numFinished = details.getTasksPerState()[ExecutionState.FINISHED.ordinal()];
- boolean allRunningOrFinished = details.getNumTasks() == (numRunning + numFinished);
- JobStatus effectiveStatus = details.getStatus();
- if (JobStatus.RUNNING.equals(effectiveStatus) && !allRunningOrFinished) {
- effectiveStatus = JobStatus.CREATED;
- LOG.debug("Adjusting job state from {} to {}", JobStatus.RUNNING, effectiveStatus);
- }
- return effectiveStatus;
- }
+ protected abstract PodList getJmPodList(String namespace, String clusterId);
+ @Override
public KubernetesClient getKubernetesClient() {
return kubernetesClient;
}
- public void submitApplicationCluster(
- JobSpec jobSpec, Configuration conf, boolean requireHaMetadata) throws Exception {
- LOG.info(
- "Deploying application cluster{}",
- requireHaMetadata ? " requiring last-state from HA metadata" : "");
- if (FlinkUtils.isKubernetesHAActivated(conf)) {
- final String clusterId = conf.get(KubernetesConfigOptions.CLUSTER_ID);
- final String namespace = conf.get(KubernetesConfigOptions.NAMESPACE);
- // Delete the job graph in the HA ConfigMaps so that the newly changed job config(e.g.
- // parallelism) could take effect
- FlinkUtils.deleteJobGraphInKubernetesHA(clusterId, namespace, kubernetesClient);
- }
- if (requireHaMetadata) {
- validateHaMetadataExists(conf);
- }
- final ClusterClientServiceLoader clusterClientServiceLoader =
- new DefaultClusterClientServiceLoader();
- final ApplicationDeployer deployer =
- new ApplicationClusterDeployer(clusterClientServiceLoader);
-
- final ApplicationConfiguration applicationConfiguration =
- new ApplicationConfiguration(
- jobSpec.getArgs() != null ? jobSpec.getArgs() : new String[0],
- jobSpec.getEntryClass());
-
- deployer.run(conf, applicationConfiguration);
- LOG.info("Application cluster successfully deployed");
- }
-
+ @Override
public boolean isHaMetadataAvailable(Configuration conf) {
return FlinkUtils.isHaMetadataAvailable(conf, kubernetesClient);
}
- protected void validateHaMetadataExists(Configuration conf) {
- if (!isHaMetadataAvailable(conf)) {
- throw new DeploymentFailedException(
- "HA metadata not available to restore from last state. "
- + "It is possible that the job has finished or terminally failed, or the configmaps have been deleted. "
- + "Manual restore required.",
- "RestoreFailed");
- }
- }
-
- public void submitSessionCluster(Configuration conf) throws Exception {
- LOG.info("Deploying session cluster");
- final ClusterClientServiceLoader clusterClientServiceLoader =
- new DefaultClusterClientServiceLoader();
- final ClusterClientFactory<String> kubernetesClusterClientFactory =
- clusterClientServiceLoader.getClusterClientFactory(conf);
- try (final ClusterDescriptor<String> kubernetesClusterDescriptor =
- kubernetesClusterClientFactory.createClusterDescriptor(conf)) {
- kubernetesClusterDescriptor.deploySessionCluster(
- kubernetesClusterClientFactory.getClusterSpecification(conf));
- }
- LOG.info("Session cluster successfully deployed");
- }
-
+ @Override
public JobID submitJobToSessionCluster(
ObjectMeta meta,
FlinkSessionJobSpec spec,
@@ -242,108 +166,7 @@ public class FlinkService {
return jobID;
}
- private JarRunResponseBody runJar(
- JobSpec job,
- JobID jobID,
- JarUploadResponseBody response,
- Configuration conf,
- String savepoint) {
- String jarId =
- response.getFilename().substring(response.getFilename().lastIndexOf("/") + 1);
- try (RestClusterClient<String> clusterClient =
- (RestClusterClient<String>) getClusterClient(conf)) {
- JarRunHeaders headers = JarRunHeaders.getInstance();
- JarRunMessageParameters parameters = headers.getUnresolvedMessageParameters();
- parameters.jarIdPathParameter.resolve(jarId);
- JarRunRequestBody runRequestBody =
- new JarRunRequestBody(
- job.getEntryClass(),
- null,
- job.getArgs() == null ? null : Arrays.asList(job.getArgs()),
- job.getParallelism() > 0 ? job.getParallelism() : null,
- jobID,
- job.getAllowNonRestoredState(),
- savepoint,
- conf.get(FLINK_VERSION).isNewerVersionThan(FlinkVersion.v1_14)
- ? RestoreMode.DEFAULT
- : null);
- LOG.info("Submitting job: {} to session cluster.", jobID.toHexString());
- return clusterClient
- .sendRequest(headers, parameters, runRequestBody)
- .get(
- configManager
- .getOperatorConfiguration()
- .getFlinkClientTimeout()
- .toSeconds(),
- TimeUnit.SECONDS);
- } catch (Exception e) {
- LOG.error("Failed to submit job to session cluster.", e);
- throw new FlinkRuntimeException(e);
- } finally {
- deleteJar(conf, jarId);
- }
- }
-
- private JarUploadResponseBody uploadJar(
- ObjectMeta objectMeta, FlinkSessionJobSpec spec, Configuration conf) throws Exception {
- String targetDir = artifactManager.generateJarDir(objectMeta, spec);
- File jarFile = artifactManager.fetch(spec.getJob().getJarURI(), conf, targetDir);
- Preconditions.checkArgument(
- jarFile.exists(),
- String.format("The jar file %s not exists", jarFile.getAbsolutePath()));
- JarUploadHeaders headers = JarUploadHeaders.getInstance();
- String clusterId = spec.getDeploymentName();
- String namespace = objectMeta.getNamespace();
- int port = conf.getInteger(RestOptions.PORT);
- String host =
- ObjectUtils.firstNonNull(
- configManager.getOperatorConfiguration().getFlinkServiceHostOverride(),
- ExternalServiceDecorator.getNamespacedExternalServiceName(
- clusterId, namespace));
- try (RestClient restClient = new RestClient(conf, executorService)) {
- // TODO add method in flink#RestClusterClient to support upload jar.
- return restClient
- .sendRequest(
- host,
- port,
- headers,
- EmptyMessageParameters.getInstance(),
- EmptyRequestBody.getInstance(),
- Collections.singletonList(
- new FileUpload(
- jarFile.toPath(), RestConstants.CONTENT_TYPE_JAR)))
- .get(
- configManager
- .getOperatorConfiguration()
- .getFlinkClientTimeout()
- .toSeconds(),
- TimeUnit.SECONDS);
- } finally {
- LOG.debug("Deleting the jar file {}", jarFile);
- FileUtils.deleteFileOrDirectory(jarFile);
- }
- }
-
- private void deleteJar(Configuration conf, String jarId) {
- LOG.debug("Deleting the jar: {}", jarId);
- try (RestClusterClient<String> clusterClient =
- (RestClusterClient<String>) getClusterClient(conf)) {
- JarDeleteHeaders headers = JarDeleteHeaders.getInstance();
- JarDeleteMessageParameters parameters = headers.getUnresolvedMessageParameters();
- parameters.jarIdPathParameter.resolve(jarId);
- clusterClient
- .sendRequest(headers, parameters, EmptyRequestBody.getInstance())
- .get(
- configManager
- .getOperatorConfiguration()
- .getFlinkClientTimeout()
- .toSeconds(),
- TimeUnit.SECONDS);
- } catch (Exception e) {
- LOG.error("Failed to delete the jar: {}.", jarId, e);
- }
- }
-
+ @Override
public boolean isJobManagerPortReady(Configuration config) {
final URI uri;
try (ClusterClient<String> clusterClient = getClusterClient(config)) {
@@ -357,11 +180,13 @@ public class FlinkService {
socket.connect(socketAddress, 1000);
socket.close();
return true;
+ } catch (SocketTimeoutException ste) {
} catch (IOException e) {
}
return false;
}
+ @Override
public Collection<JobStatusMessage> listJobs(Configuration conf) throws Exception {
try (RestClusterClient<String> clusterClient =
(RestClusterClient<String>) getClusterClient(conf)) {
@@ -370,7 +195,7 @@ public class FlinkService {
JobsOverviewHeaders.getInstance(),
EmptyMessageParameters.getInstance(),
EmptyRequestBody.getInstance())
- .thenApply(FlinkService::toJobStatusMessage)
+ .thenApply(AbstractFlinkService::toJobStatusMessage)
.get(
configManager
.getOperatorConfiguration()
@@ -380,6 +205,7 @@ public class FlinkService {
}
}
+ @Override
public JobResult requestJobResult(Configuration conf, JobID jobID) throws Exception {
try (ClusterClient<String> clusterClient = getClusterClient(conf)) {
return clusterClient
@@ -393,22 +219,7 @@ public class FlinkService {
}
}
- @VisibleForTesting
- protected ClusterClient<String> getClusterClient(Configuration config) throws Exception {
- final String clusterId = config.get(KubernetesConfigOptions.CLUSTER_ID);
- final String namespace = config.get(KubernetesConfigOptions.NAMESPACE);
- final int port = config.getInteger(RestOptions.PORT);
- final String host =
- ObjectUtils.firstNonNull(
- configManager.getOperatorConfiguration().getFlinkServiceHostOverride(),
- ExternalServiceDecorator.getNamespacedExternalServiceName(
- clusterId, namespace));
- final String restServerAddress = String.format("http://%s:%s", host, port);
- LOG.debug("Creating RestClusterClient({})", restServerAddress);
- return new RestClusterClient<>(
- config, clusterId, (c, e) -> new StandaloneClientHAServices(restServerAddress));
- }
-
+ @Override
public void cancelJob(FlinkDeployment deployment, UpgradeMode upgradeMode, Configuration conf)
throws Exception {
var deploymentStatus = deployment.getStatus();
@@ -504,30 +315,7 @@ public class FlinkService {
}
}
- @VisibleForTesting
- protected void waitForClusterShutdown(Configuration conf) {
- FlinkUtils.waitForClusterShutdown(
- kubernetesClient,
- conf,
- configManager
- .getOperatorConfiguration()
- .getFlinkShutdownClusterTimeout()
- .toSeconds());
- }
-
- public void deleteClusterDeployment(
- ObjectMeta meta, FlinkDeploymentStatus status, boolean deleteHaData) {
- FlinkUtils.deleteCluster(
- status,
- meta,
- kubernetesClient,
- deleteHaData,
- configManager
- .getOperatorConfiguration()
- .getFlinkShutdownClusterTimeout()
- .toSeconds());
- }
-
+ @Override
public void cancelSessionJob(
FlinkSessionJob sessionJob, UpgradeMode upgradeMode, Configuration conf)
throws Exception {
@@ -536,7 +324,6 @@ public class FlinkService {
var jobIdString = jobStatus.getJobId();
Preconditions.checkNotNull(jobIdString, "The job to be suspend should not be null");
var jobId = JobID.fromHexString(jobIdString);
-
Optional<String> savepointOpt = Optional.empty();
try (ClusterClient<String> clusterClient = getClusterClient(conf)) {
final String clusterId = clusterClient.getClusterId();
@@ -592,7 +379,6 @@ public class FlinkService {
throw new RuntimeException("Unsupported upgrade mode " + upgradeMode);
}
}
-
jobStatus.setState(JobStatus.FINISHED.name());
savepointOpt.ifPresent(
location -> {
@@ -601,6 +387,7 @@ public class FlinkService {
});
}
+ @Override
public void triggerSavepoint(
String jobId,
SavepointTriggerType triggerType,
@@ -639,6 +426,7 @@ public class FlinkService {
}
}
+ @Override
public Optional<Savepoint> getLastCheckpoint(JobID jobId, Configuration conf) throws Exception {
try (RestClusterClient<String> clusterClient =
(RestClusterClient<String>) getClusterClient(conf)) {
@@ -675,6 +463,25 @@ public class FlinkService {
}
}
+ @Override
+ public void disposeSavepoint(String savepointPath, Configuration conf) throws Exception {
+ try (RestClusterClient<String> clusterClient =
+ (RestClusterClient<String>) getClusterClient(conf)) {
+ clusterClient
+ .sendRequest(
+ SavepointDisposalTriggerHeaders.getInstance(),
+ EmptyMessageParameters.getInstance(),
+ new SavepointDisposalRequest(savepointPath))
+ .get(
+ configManager
+ .getOperatorConfiguration()
+ .getFlinkClientTimeout()
+ .getSeconds(),
+ TimeUnit.SECONDS);
+ }
+ }
+
+ @Override
public SavepointFetchResult fetchSavepointInfo(
String triggerId, String jobId, Configuration conf) {
LOG.info("Fetching savepoint result with triggerId: " + triggerId);
@@ -716,23 +523,7 @@ public class FlinkService {
}
}
- public void disposeSavepoint(String savepointPath, Configuration conf) throws Exception {
- try (RestClusterClient<String> clusterClient =
- (RestClusterClient<String>) getClusterClient(conf)) {
- clusterClient
- .sendRequest(
- SavepointDisposalTriggerHeaders.getInstance(),
- EmptyMessageParameters.getInstance(),
- new SavepointDisposalRequest(savepointPath))
- .get(
- configManager
- .getOperatorConfiguration()
- .getFlinkClientTimeout()
- .getSeconds(),
- TimeUnit.SECONDS);
- }
- }
-
+ @Override
public Map<String, String> getClusterInfo(Configuration conf) throws Exception {
Map<String, String> runtimeVersion = new HashMap<>();
@@ -762,6 +553,7 @@ public class FlinkService {
return runtimeVersion;
}
+ @Override
public PodList getJmPodList(FlinkDeployment deployment, Configuration conf) {
final String namespace = conf.getString(KubernetesConfigOptions.NAMESPACE);
final String clusterId;
@@ -770,6 +562,207 @@ public class FlinkService {
} catch (Exception ex) {
throw new RuntimeException(ex);
}
- return FlinkUtils.getJmPodList(kubernetesClient, namespace, clusterId);
+ return getJmPodList(namespace, clusterId);
+ }
+
+ @Override
+ public void waitForClusterShutdown(Configuration conf) {
+ waitForClusterShutdown(
+ conf.getString(KubernetesConfigOptions.NAMESPACE),
+ conf.getString(KubernetesConfigOptions.CLUSTER_ID),
+ configManager
+ .getOperatorConfiguration()
+ .getFlinkShutdownClusterTimeout()
+ .toSeconds());
+ }
+
+ @VisibleForTesting
+ protected ClusterClient<String> getClusterClient(Configuration conf) throws Exception {
+ final String clusterId = conf.get(KubernetesConfigOptions.CLUSTER_ID);
+ final String namespace = conf.get(KubernetesConfigOptions.NAMESPACE);
+ final int port = conf.getInteger(RestOptions.PORT);
+ final String host =
+ ObjectUtils.firstNonNull(
+ configManager.getOperatorConfiguration().getFlinkServiceHostOverride(),
+ ExternalServiceDecorator.getNamespacedExternalServiceName(
+ clusterId, namespace));
+ final String restServerAddress = String.format("http://%s:%s", host, port);
+ LOG.debug("Creating RestClusterClient({})", restServerAddress);
+ return new RestClusterClient<>(
+ conf, clusterId, (c, e) -> new StandaloneClientHAServices(restServerAddress));
+ }
+
+ private JarRunResponseBody runJar(
+ JobSpec job,
+ JobID jobID,
+ JarUploadResponseBody response,
+ Configuration conf,
+ String savepoint) {
+ String jarId =
+ response.getFilename().substring(response.getFilename().lastIndexOf("/") + 1);
+ try (RestClusterClient<String> clusterClient =
+ (RestClusterClient<String>) getClusterClient(conf)) {
+ JarRunHeaders headers = JarRunHeaders.getInstance();
+ JarRunMessageParameters parameters = headers.getUnresolvedMessageParameters();
+ parameters.jarIdPathParameter.resolve(jarId);
+ JarRunRequestBody runRequestBody =
+ new JarRunRequestBody(
+ job.getEntryClass(),
+ null,
+ job.getArgs() == null ? null : Arrays.asList(job.getArgs()),
+ job.getParallelism() > 0 ? job.getParallelism() : null,
+ jobID,
+ job.getAllowNonRestoredState(),
+ savepoint,
+ conf.get(FLINK_VERSION).isNewerVersionThan(FlinkVersion.v1_14)
+ ? RestoreMode.DEFAULT
+ : null);
+ LOG.info("Submitting job: {} to session cluster.", jobID.toHexString());
+ return clusterClient
+ .sendRequest(headers, parameters, runRequestBody)
+ .get(
+ configManager
+ .getOperatorConfiguration()
+ .getFlinkClientTimeout()
+ .toSeconds(),
+ TimeUnit.SECONDS);
+ } catch (Exception e) {
+ LOG.error("Failed to submit job to session cluster.", e);
+ throw new FlinkRuntimeException(e);
+ } finally {
+ deleteJar(conf, jarId);
+ }
+ }
+
+ private JarUploadResponseBody uploadJar(
+ ObjectMeta objectMeta, FlinkSessionJobSpec spec, Configuration conf) throws Exception {
+ String targetDir = artifactManager.generateJarDir(objectMeta, spec);
+ File jarFile = artifactManager.fetch(spec.getJob().getJarURI(), conf, targetDir);
+ Preconditions.checkArgument(
+ jarFile.exists(),
+ String.format("The jar file %s not exists", jarFile.getAbsolutePath()));
+ JarUploadHeaders headers = JarUploadHeaders.getInstance();
+ String clusterId = spec.getDeploymentName();
+ String namespace = objectMeta.getNamespace();
+ int port = conf.getInteger(RestOptions.PORT);
+ String host =
+ ObjectUtils.firstNonNull(
+ configManager.getOperatorConfiguration().getFlinkServiceHostOverride(),
+ ExternalServiceDecorator.getNamespacedExternalServiceName(
+ clusterId, namespace));
+ try (RestClient restClient = new RestClient(conf, executorService)) {
+ // TODO add method in flink#RestClusterClient to support upload jar.
+ return restClient
+ .sendRequest(
+ host,
+ port,
+ headers,
+ EmptyMessageParameters.getInstance(),
+ EmptyRequestBody.getInstance(),
+ Collections.singletonList(
+ new FileUpload(
+ jarFile.toPath(), RestConstants.CONTENT_TYPE_JAR)))
+ .get(
+ configManager
+ .getOperatorConfiguration()
+ .getFlinkClientTimeout()
+ .toSeconds(),
+ TimeUnit.SECONDS);
+ } finally {
+ LOG.debug("Deleting the jar file {}", jarFile);
+ FileUtils.deleteFileOrDirectory(jarFile);
+ }
+ }
+
+ private void deleteJar(Configuration conf, String jarId) {
+ LOG.debug("Deleting the jar: {}", jarId);
+ try (RestClusterClient<String> clusterClient =
+ (RestClusterClient<String>) getClusterClient(conf)) {
+ JarDeleteHeaders headers = JarDeleteHeaders.getInstance();
+ JarDeleteMessageParameters parameters = headers.getUnresolvedMessageParameters();
+ parameters.jarIdPathParameter.resolve(jarId);
+ clusterClient
+ .sendRequest(headers, parameters, EmptyRequestBody.getInstance())
+ .get(
+ configManager
+ .getOperatorConfiguration()
+ .getFlinkClientTimeout()
+ .toSeconds(),
+ TimeUnit.SECONDS);
+ } catch (Exception e) {
+ LOG.error("Failed to delete the jar: {}.", jarId, e);
+ }
+ }
+
+ /** Wait until the Flink cluster has completely shut down. */
+ @VisibleForTesting
+ void waitForClusterShutdown(String namespace, String clusterId, long shutdownTimeout) {
+
+ boolean jobManagerRunning = true;
+ boolean serviceRunning = true;
+
+ for (int i = 0; i < shutdownTimeout; i++) {
+ if (jobManagerRunning) {
+ PodList jmPodList = getJmPodList(namespace, clusterId);
+
+ if (jmPodList == null || jmPodList.getItems().isEmpty()) {
+ jobManagerRunning = false;
+ }
+ }
+
+ if (serviceRunning) {
+ Service service =
+ kubernetesClient
+ .services()
+ .inNamespace(namespace)
+ .withName(
+ ExternalServiceDecorator.getExternalServiceName(clusterId))
+ .fromServer()
+ .get();
+ if (service == null) {
+ serviceRunning = false;
+ }
+ }
+
+ if (!jobManagerRunning && !serviceRunning) {
+ break;
+ }
+ // Log a progress message every 5 seconds while waiting for the Flink cluster to shut down.
+ if ((i + 1) % 5 == 0) {
+ LOG.info("Waiting for cluster shutdown... ({}s)", i + 1);
+ }
+ try {
+ Thread.sleep(1000);
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ LOG.info("Cluster shutdown completed.");
+ }
+
+ private static List<JobStatusMessage> toJobStatusMessage(
+ MultipleJobsDetails multipleJobsDetails) {
+ return multipleJobsDetails.getJobs().stream()
+ .map(
+ details ->
+ new JobStatusMessage(
+ details.getJobId(),
+ details.getJobName(),
+ getEffectiveStatus(details),
+ details.getStartTime()))
+ .collect(Collectors.toList());
+ }
+
+ @VisibleForTesting
+ protected static JobStatus getEffectiveStatus(JobDetails details) {
+ int numRunning = details.getTasksPerState()[ExecutionState.RUNNING.ordinal()];
+ int numFinished = details.getTasksPerState()[ExecutionState.FINISHED.ordinal()];
+ boolean allRunningOrFinished = details.getNumTasks() == (numRunning + numFinished);
+ JobStatus effectiveStatus = details.getStatus();
+ if (JobStatus.RUNNING.equals(effectiveStatus) && !allRunningOrFinished) {
+ effectiveStatus = JobStatus.CREATED;
+ LOG.debug("Adjusting job state from {} to {}", JobStatus.RUNNING, effectiveStatus);
+ }
+ return effectiveStatus;
}
}
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/FlinkService.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/FlinkService.java
index 2deccaee..d7aa1b98 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/FlinkService.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/FlinkService.java
@@ -17,759 +17,80 @@
package org.apache.flink.kubernetes.operator.service;
-import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.JobID;
-import org.apache.flink.api.common.JobStatus;
-import org.apache.flink.client.cli.ApplicationDeployer;
-import org.apache.flink.client.deployment.ClusterClientFactory;
-import org.apache.flink.client.deployment.ClusterClientServiceLoader;
-import org.apache.flink.client.deployment.ClusterDescriptor;
-import org.apache.flink.client.deployment.DefaultClusterClientServiceLoader;
-import org.apache.flink.client.deployment.application.ApplicationConfiguration;
-import org.apache.flink.client.deployment.application.cli.ApplicationClusterDeployer;
-import org.apache.flink.client.program.ClusterClient;
-import org.apache.flink.client.program.rest.RestClusterClient;
-import org.apache.flink.configuration.CheckpointingOptions;
import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.RestOptions;
-import org.apache.flink.core.execution.SavepointFormatType;
-import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions;
-import org.apache.flink.kubernetes.kubeclient.decorators.ExternalServiceDecorator;
-import org.apache.flink.kubernetes.operator.artifact.ArtifactManager;
-import org.apache.flink.kubernetes.operator.config.FlinkConfigManager;
import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
import org.apache.flink.kubernetes.operator.crd.FlinkSessionJob;
import org.apache.flink.kubernetes.operator.crd.spec.FlinkSessionJobSpec;
-import org.apache.flink.kubernetes.operator.crd.spec.FlinkVersion;
import org.apache.flink.kubernetes.operator.crd.spec.JobSpec;
import org.apache.flink.kubernetes.operator.crd.spec.UpgradeMode;
import org.apache.flink.kubernetes.operator.crd.status.FlinkDeploymentStatus;
-import org.apache.flink.kubernetes.operator.crd.status.JobManagerDeploymentStatus;
import org.apache.flink.kubernetes.operator.crd.status.Savepoint;
import org.apache.flink.kubernetes.operator.crd.status.SavepointTriggerType;
-import org.apache.flink.kubernetes.operator.exception.DeploymentFailedException;
import org.apache.flink.kubernetes.operator.observer.SavepointFetchResult;
-import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
-import org.apache.flink.kubernetes.operator.utils.FlinkUtils;
import org.apache.flink.runtime.client.JobStatusMessage;
-import org.apache.flink.runtime.execution.ExecutionState;
-import org.apache.flink.runtime.highavailability.nonha.standalone.StandaloneClientHAServices;
-import org.apache.flink.runtime.jobgraph.RestoreMode;
import org.apache.flink.runtime.jobmaster.JobResult;
-import org.apache.flink.runtime.messages.webmonitor.JobDetails;
-import org.apache.flink.runtime.messages.webmonitor.MultipleJobsDetails;
-import org.apache.flink.runtime.rest.FileUpload;
-import org.apache.flink.runtime.rest.RestClient;
-import org.apache.flink.runtime.rest.handler.async.AsynchronousOperationResult;
-import org.apache.flink.runtime.rest.handler.async.TriggerResponse;
-import org.apache.flink.runtime.rest.messages.DashboardConfiguration;
-import org.apache.flink.runtime.rest.messages.EmptyMessageParameters;
-import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
-import org.apache.flink.runtime.rest.messages.JobsOverviewHeaders;
-import org.apache.flink.runtime.rest.messages.TriggerId;
-import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointDisposalRequest;
-import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointDisposalTriggerHeaders;
-import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointInfo;
-import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointStatusHeaders;
-import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointStatusMessageParameters;
-import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointTriggerHeaders;
-import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointTriggerMessageParameters;
-import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointTriggerRequestBody;
-import org.apache.flink.runtime.rest.util.RestConstants;
-import org.apache.flink.runtime.state.memory.NonPersistentMetadataCheckpointStorageLocation;
-import org.apache.flink.runtime.webmonitor.handlers.JarDeleteHeaders;
-import org.apache.flink.runtime.webmonitor.handlers.JarDeleteMessageParameters;
-import org.apache.flink.runtime.webmonitor.handlers.JarRunHeaders;
-import org.apache.flink.runtime.webmonitor.handlers.JarRunMessageParameters;
-import org.apache.flink.runtime.webmonitor.handlers.JarRunRequestBody;
-import org.apache.flink.runtime.webmonitor.handlers.JarRunResponseBody;
-import org.apache.flink.runtime.webmonitor.handlers.JarUploadHeaders;
-import org.apache.flink.runtime.webmonitor.handlers.JarUploadResponseBody;
-import org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions;
-import org.apache.flink.util.FileUtils;
-import org.apache.flink.util.FlinkException;
-import org.apache.flink.util.FlinkRuntimeException;
-import org.apache.flink.util.Preconditions;
-import org.apache.flink.util.concurrent.ExecutorThreadFactory;
import io.fabric8.kubernetes.api.model.ObjectMeta;
import io.fabric8.kubernetes.api.model.PodList;
import io.fabric8.kubernetes.client.KubernetesClient;
-import org.apache.commons.lang3.ObjectUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import javax.annotation.Nullable;
-import java.io.File;
-import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.net.Socket;
-import java.net.SocketAddress;
-import java.net.URI;
-import java.util.Arrays;
import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
import java.util.Map;
import java.util.Optional;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-import java.util.stream.Collectors;
-
-import static org.apache.flink.kubernetes.operator.config.FlinkConfigBuilder.FLINK_VERSION;
/** Service for submitting and interacting with Flink clusters and jobs. */
public interface FlinkService {

    /** Returns the Kubernetes client used by this service to talk to the cluster API. */
    KubernetesClient getKubernetesClient();

    /**
     * Deploys a Flink application cluster for the given job spec.
     *
     * @param requireHaMetadata when true the deployment must restore from existing HA metadata;
     *     implementations fail the deployment if that metadata is not available
     */
    void submitApplicationCluster(JobSpec jobSpec, Configuration conf, boolean requireHaMetadata)
            throws Exception;

    /** Returns whether HA metadata (e.g. HA ConfigMaps) is available for the given config. */
    boolean isHaMetadataAvailable(Configuration conf);

    /** Deploys an empty Flink session cluster described by the given configuration. */
    void submitSessionCluster(Configuration conf) throws Exception;

    /**
     * Submits a job to an already running session cluster.
     *
     * @param savepoint optional savepoint path to restore the job from, may be null
     * @return the id assigned to the submitted job
     */
    JobID submitJobToSessionCluster(
            ObjectMeta meta,
            FlinkSessionJobSpec spec,
            Configuration conf,
            @Nullable String savepoint)
            throws Exception;

    /** Returns whether the JobManager REST port accepts TCP connections. */
    boolean isJobManagerPortReady(Configuration config);

    /** Lists the status of all jobs known to the target cluster. */
    Collection<JobStatusMessage> listJobs(Configuration conf) throws Exception;

    /** Requests the final result of the given job, blocking until it is available. */
    JobResult requestJobResult(Configuration conf, JobID jobID) throws Exception;

    /**
     * Cancels the job of an application deployment according to the upgrade mode
     * (stateless cancel, stop-with-savepoint, or last-state shutdown) and updates the
     * deployment status accordingly.
     */
    void cancelJob(FlinkDeployment deployment, UpgradeMode upgradeMode, Configuration conf)
            throws Exception;

    /**
     * Deletes the cluster deployment resources for the given Flink deployment.
     *
     * @param deleteHaData whether HA metadata should be removed as well
     */
    void deleteClusterDeployment(
            ObjectMeta meta, FlinkDeploymentStatus status, boolean deleteHaData);

    /**
     * Cancels a session job according to the upgrade mode. LAST_STATE is not a supported
     * upgrade mode for session jobs.
     */
    void cancelSessionJob(FlinkSessionJob sessionJob, UpgradeMode upgradeMode, Configuration conf)
            throws Exception;

    /**
     * Triggers a new savepoint for the given job and records the trigger id on the provided
     * savepoint info.
     */
    void triggerSavepoint(
            String jobId,
            SavepointTriggerType triggerType,
            org.apache.flink.kubernetes.operator.crd.status.SavepointInfo savepointInfo,
            Configuration conf)
            throws Exception;

    /**
     * Fetches the latest externally addressable checkpoint of the job, if any. Implementations
     * may fail when the latest checkpoint is not externally addressable.
     */
    Optional<Savepoint> getLastCheckpoint(JobID jobId, Configuration conf) throws Exception;

    /** Polls the result of a previously triggered savepoint operation. */
    SavepointFetchResult fetchSavepointInfo(String triggerId, String jobId, Configuration conf);

    /** Disposes the savepoint at the given path through the cluster. */
    void disposeSavepoint(String savepointPath, Configuration conf) throws Exception;

    /** Returns cluster runtime information such as the Flink version and revision. */
    Map<String, String> getClusterInfo(Configuration conf) throws Exception;

    /** Returns the list of JobManager pods belonging to the given deployment. */
    PodList getJmPodList(FlinkDeployment deployment, Configuration conf);

    /** Blocks until the cluster resources have been shut down (bounded by operator timeout). */
    void waitForClusterShutdown(Configuration conf);
}
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/FlinkServiceFactory.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/FlinkServiceFactory.java
new file mode 100644
index 00000000..761f1e68
--- /dev/null
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/FlinkServiceFactory.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.service;
+
+import org.apache.flink.kubernetes.operator.config.FlinkConfigManager;
+import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
+import org.apache.flink.kubernetes.operator.crd.spec.KubernetesDeploymentMode;
+
+import io.fabric8.kubernetes.client.KubernetesClient;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+/** The factory to create the FlinkService based on the {@link FlinkDeployment} mode. */
+public class FlinkServiceFactory {
+
+ private final KubernetesClient kubernetesClient;
+ private final FlinkConfigManager configManager;
+ private final Map<KubernetesDeploymentMode, FlinkService> serviceMap;
+
+ public FlinkServiceFactory(
+ KubernetesClient kubernetesClient, FlinkConfigManager configManager) {
+ this.kubernetesClient = kubernetesClient;
+ this.configManager = configManager;
+ this.serviceMap = new ConcurrentHashMap<>();
+ }
+
+ public FlinkService getOrCreate(KubernetesDeploymentMode deploymentMode) {
+ return serviceMap.computeIfAbsent(
+ deploymentMode,
+ mode -> {
+ switch (mode) {
+ case NATIVE:
+ return new NativeFlinkService(kubernetesClient, configManager);
+ case STANDALONE:
+ return new StandaloneFlinkService(kubernetesClient, configManager);
+ default:
+ throw new UnsupportedOperationException(
+ String.format("Unsupported deployment mode: %s", mode));
+ }
+ });
+ }
+
+ public FlinkService getOrCreate(FlinkDeployment deployment) {
+ return getOrCreate(getDeploymentMode(deployment));
+ }
+
+ private KubernetesDeploymentMode getDeploymentMode(FlinkDeployment deployment) {
+ return KubernetesDeploymentMode.getDeploymentMode(deployment);
+ }
+}
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/NativeFlinkService.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/NativeFlinkService.java
new file mode 100644
index 00000000..60b5ee3b
--- /dev/null
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/NativeFlinkService.java
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.service;
+
+import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.client.cli.ApplicationDeployer;
+import org.apache.flink.client.deployment.ClusterClientFactory;
+import org.apache.flink.client.deployment.ClusterClientServiceLoader;
+import org.apache.flink.client.deployment.ClusterDescriptor;
+import org.apache.flink.client.deployment.DefaultClusterClientServiceLoader;
+import org.apache.flink.client.deployment.application.ApplicationConfiguration;
+import org.apache.flink.client.deployment.application.cli.ApplicationClusterDeployer;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions;
+import org.apache.flink.kubernetes.operator.config.FlinkConfigManager;
+import org.apache.flink.kubernetes.operator.crd.spec.JobSpec;
+import org.apache.flink.kubernetes.operator.crd.status.FlinkDeploymentStatus;
+import org.apache.flink.kubernetes.operator.crd.status.JobManagerDeploymentStatus;
+import org.apache.flink.kubernetes.operator.exception.DeploymentFailedException;
+import org.apache.flink.kubernetes.operator.utils.FlinkUtils;
+import org.apache.flink.kubernetes.utils.KubernetesUtils;
+
+import io.fabric8.kubernetes.api.model.ObjectMeta;
+import io.fabric8.kubernetes.api.model.PodList;
+import io.fabric8.kubernetes.client.KubernetesClient;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.flink.kubernetes.utils.Constants.LABEL_CONFIGMAP_TYPE_HIGH_AVAILABILITY;
+
+/**
+ * Implementation of {@link FlinkService} submitting and interacting with Native Kubernetes Flink
+ * clusters and jobs.
+ */
public class NativeFlinkService extends AbstractFlinkService {

    private static final Logger LOG = LoggerFactory.getLogger(NativeFlinkService.class);

    public NativeFlinkService(KubernetesClient kubernetesClient, FlinkConfigManager configManager) {
        super(kubernetesClient, configManager);
    }

    /**
     * Deploys an application cluster through Flink's native Kubernetes integration.
     *
     * @param jobSpec job spec supplying the entry class and program arguments
     * @param conf effective Flink configuration for the deployment
     * @param requireHaMetadata if true, fail fast unless HA metadata exists to restore from
     * @throws Exception if the deployment fails
     */
    @Override
    public void submitApplicationCluster(
            JobSpec jobSpec, Configuration conf, boolean requireHaMetadata) throws Exception {
        LOG.info(
                "Deploying application cluster{}",
                requireHaMetadata ? " requiring last-state from HA metadata" : "");
        if (FlinkUtils.isKubernetesHAActivated(conf)) {
            final String clusterId = conf.get(KubernetesConfigOptions.CLUSTER_ID);
            final String namespace = conf.get(KubernetesConfigOptions.NAMESPACE);
            // Delete the job graph in the HA ConfigMaps so that the newly changed job config(e.g.
            // parallelism) could take effect
            FlinkUtils.deleteJobGraphInKubernetesHA(clusterId, namespace, kubernetesClient);
        }
        if (requireHaMetadata) {
            validateHaMetadataExists(conf);
        }
        final ClusterClientServiceLoader clusterClientServiceLoader =
                new DefaultClusterClientServiceLoader();
        final ApplicationDeployer deployer =
                new ApplicationClusterDeployer(clusterClientServiceLoader);

        // Empty args array when the spec declares none; ApplicationConfiguration requires non-null.
        final ApplicationConfiguration applicationConfiguration =
                new ApplicationConfiguration(
                        jobSpec.getArgs() != null ? jobSpec.getArgs() : new String[0],
                        jobSpec.getEntryClass());

        deployer.run(conf, applicationConfiguration);
        LOG.info("Application cluster successfully deployed");
    }

    // Fails the deployment early when a last-state restore was requested but no HA
    // metadata is available (job finished / terminally failed / configmaps deleted).
    private void validateHaMetadataExists(Configuration conf) {
        if (!isHaMetadataAvailable(conf)) {
            throw new DeploymentFailedException(
                    "HA metadata not available to restore from last state. "
                            + "It is possible that the job has finished or terminally failed, or the configmaps have been deleted. "
                            + "Manual restore required.",
                    "RestoreFailed");
        }
    }

    /**
     * Deploys a session cluster through Flink's native Kubernetes integration.
     *
     * @param conf effective Flink configuration for the deployment
     * @throws Exception if the deployment fails
     */
    @Override
    public void submitSessionCluster(Configuration conf) throws Exception {
        LOG.info("Deploying session cluster");
        final ClusterClientServiceLoader clusterClientServiceLoader =
                new DefaultClusterClientServiceLoader();
        final ClusterClientFactory<String> kubernetesClusterClientFactory =
                clusterClientServiceLoader.getClusterClientFactory(conf);
        try (final ClusterDescriptor<String> kubernetesClusterDescriptor =
                kubernetesClusterClientFactory.createClusterDescriptor(conf)) {
            kubernetesClusterDescriptor.deploySessionCluster(
                    kubernetesClusterClientFactory.getClusterSpecification(conf));
        }
        LOG.info("Session cluster successfully deployed");
    }

    /**
     * Deletes the native-mode cluster deployment, optionally including HA metadata, using the
     * operator-configured shutdown timeout.
     *
     * @param meta ObjectMeta of the deployment (supplies namespace and cluster id)
     * @param status deployment status, updated to reflect the removal
     * @param deleteHaData whether to remove the Kubernetes HA configmaps as well
     */
    @Override
    public void deleteClusterDeployment(
            ObjectMeta meta, FlinkDeploymentStatus status, boolean deleteHaData) {
        deleteCluster(
                status,
                meta,
                deleteHaData,
                configManager
                        .getOperatorConfiguration()
                        .getFlinkShutdownClusterTimeout()
                        .toSeconds());
    }

    // Lists JobManager pods via the native-mode JobManager label selectors.
    @Override
    protected PodList getJmPodList(String namespace, String clusterId) {
        return kubernetesClient
                .pods()
                .inNamespace(namespace)
                .withLabels(KubernetesUtils.getJobManagerSelectors(clusterId))
                .list();
    }

    /**
     * Delete Flink kubernetes cluster by deleting the kubernetes resources directly. Optionally
     * allows deleting the native kubernetes HA resources as well.
     *
     * @param status Deployment status object
     * @param meta ObjectMeta of the deployment
     * @param deleteHaConfigmaps Flag to indicate whether k8s HA metadata should be removed as well
     * @param shutdownTimeout maximum time allowed for cluster shutdown
     */
    private void deleteCluster(
            FlinkDeploymentStatus status,
            ObjectMeta meta,
            boolean deleteHaConfigmaps,
            long shutdownTimeout) {

        String namespace = meta.getNamespace();
        String clusterId = meta.getName();

        LOG.info(
                "Deleting JobManager deployment {}.",
                deleteHaConfigmaps ? "and HA metadata" : "while preserving HA metadata");
        kubernetesClient
                .apps()
                .deployments()
                .inNamespace(namespace)
                .withName(KubernetesUtils.getDeploymentName(clusterId))
                .cascading(true)
                .delete();

        if (deleteHaConfigmaps) {
            // We need to wait for cluster shutdown otherwise HA configmaps might be recreated
            waitForClusterShutdown(namespace, clusterId, shutdownTimeout);
            kubernetesClient
                    .configMaps()
                    .inNamespace(namespace)
                    .withLabels(
                            KubernetesUtils.getConfigMapLabels(
                                    clusterId, LABEL_CONFIGMAP_TYPE_HIGH_AVAILABILITY))
                    .delete();
        }
        // Deleting without HA data keeps the job restorable, so only the deployment status is
        // downgraded; with HA data gone the job is marked FINISHED as nothing remains to restore.
        status.setJobManagerDeploymentStatus(JobManagerDeploymentStatus.MISSING);
        status.getJobStatus().setState(JobStatus.FINISHED.name());
    }
}
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/StandaloneFlinkService.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/StandaloneFlinkService.java
new file mode 100644
index 00000000..5a0b85eb
--- /dev/null
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/StandaloneFlinkService.java
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.service;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.client.deployment.ClusterDeploymentException;
+import org.apache.flink.client.deployment.ClusterSpecification;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.kubernetes.KubernetesClusterClientFactory;
+import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions;
+import org.apache.flink.kubernetes.operator.config.FlinkConfigManager;
+import org.apache.flink.kubernetes.operator.crd.spec.JobSpec;
+import org.apache.flink.kubernetes.operator.crd.status.FlinkDeploymentStatus;
+import org.apache.flink.kubernetes.operator.kubeclient.Fabric8FlinkStandaloneKubeClient;
+import org.apache.flink.kubernetes.operator.kubeclient.FlinkStandaloneKubeClient;
+import org.apache.flink.kubernetes.operator.standalone.KubernetesStandaloneClusterDescriptor;
+import org.apache.flink.kubernetes.operator.utils.StandaloneKubernetesUtils;
+import org.apache.flink.kubernetes.utils.KubernetesUtils;
+import org.apache.flink.util.concurrent.ExecutorThreadFactory;
+
+import io.fabric8.kubernetes.api.model.ObjectMeta;
+import io.fabric8.kubernetes.api.model.PodList;
+import io.fabric8.kubernetes.client.KubernetesClient;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+import static org.apache.flink.kubernetes.utils.Constants.LABEL_CONFIGMAP_TYPE_HIGH_AVAILABILITY;
+
+/**
+ * Implementation of {@link FlinkService} submitting and interacting with Standalone Kubernetes
+ * Flink clusters and jobs.
+ */
public class StandaloneFlinkService extends AbstractFlinkService {

    private static final Logger LOG = LoggerFactory.getLogger(StandaloneFlinkService.class);

    public StandaloneFlinkService(
            KubernetesClient kubernetesClient, FlinkConfigManager configManager) {
        super(kubernetesClient, configManager);
    }

    /**
     * Deploys an application cluster as standalone JM/TM Kubernetes Deployments.
     *
     * @param jobSpec job spec (currently unused here; job submission is handled by the
     *     standalone cluster descriptor/entrypoint)
     * @param conf effective Flink configuration for the deployment
     * @param requireHaMetadata currently ignored in standalone mode (see TODO below)
     * @throws Exception if the deployment fails
     */
    @Override
    public void submitApplicationCluster(
            JobSpec jobSpec, Configuration conf, boolean requireHaMetadata) throws Exception {
        LOG.info("Deploying application cluster");
        // TODO: HA handling (job-graph cleanup / last-state metadata validation, as done in
        // NativeFlinkService) is not implemented for standalone mode yet.
        submitClusterInternal(conf);
        LOG.info("Application cluster successfully deployed");
    }

    /**
     * Deploys a standalone session cluster.
     *
     * @param conf effective Flink configuration for the deployment
     * @throws Exception if the deployment fails
     */
    @Override
    public void submitSessionCluster(Configuration conf) throws Exception {
        LOG.info("Deploying session cluster");
        // TODO: HA handling is not implemented for standalone mode yet (see note above in
        // submitApplicationCluster).
        submitClusterInternal(conf);
        LOG.info("Session cluster successfully deployed");
    }

    /**
     * Deletes the standalone JM and TM Deployments, optionally including HA configmaps.
     *
     * <p>NOTE(review): unlike {@code NativeFlinkService}, this does not update {@code status}
     * (JobManagerDeploymentStatus / job state) after deletion — confirm callers handle the
     * status transition.
     */
    @Override
    public void deleteClusterDeployment(
            ObjectMeta meta, FlinkDeploymentStatus status, boolean deleteHaData) {
        deleteClusterInternal(meta, deleteHaData);
    }

    // Lists JobManager pods via the standalone-mode JobManager label selectors.
    @Override
    protected PodList getJmPodList(String namespace, String clusterId) {
        return kubernetesClient
                .pods()
                .inNamespace(namespace)
                .withLabels(StandaloneKubernetesUtils.getJobManagerSelectors(clusterId))
                .list();
    }

    // Builds a namespace-scoped standalone kube client with its own IO thread pool.
    // Protected + @VisibleForTesting so tests can substitute a mock client.
    @VisibleForTesting
    protected FlinkStandaloneKubeClient createNamespacedKubeClient(
            Configuration configuration, String namespace) {
        final int poolSize =
                configuration.get(KubernetesConfigOptions.KUBERNETES_CLIENT_IO_EXECUTOR_POOL_SIZE);

        ExecutorService executorService =
                Executors.newFixedThreadPool(
                        poolSize,
                        new ExecutorThreadFactory("flink-kubeclient-io-for-standalone-service"));

        return new Fabric8FlinkStandaloneKubeClient(
                configuration,
                Fabric8FlinkStandaloneKubeClient.createNamespacedKubeClient(namespace),
                executorService);
    }

    // Deploys the standalone cluster (JM + TM Deployments) via the standalone cluster
    // descriptor; used by both application and session submission paths.
    private void submitClusterInternal(Configuration conf) throws ClusterDeploymentException {
        final String namespace = conf.get(KubernetesConfigOptions.NAMESPACE);

        FlinkStandaloneKubeClient client = createNamespacedKubeClient(conf, namespace);
        try (final KubernetesStandaloneClusterDescriptor kubernetesClusterDescriptor =
                new KubernetesStandaloneClusterDescriptor(conf, client)) {
            kubernetesClusterDescriptor.deploySessionCluster(getClusterSpecification(conf));
        }
    }

    // Derives the cluster resource specification (memory/cpu/slots) from the configuration.
    private ClusterSpecification getClusterSpecification(Configuration conf) {
        return new KubernetesClusterClientFactory().getClusterSpecification(conf);
    }

    // Deletes TM then JM Deployments; when requested, waits for full shutdown before removing
    // HA configmaps so the exiting processes cannot recreate them.
    private void deleteClusterInternal(ObjectMeta meta, boolean deleteHaConfigmaps) {
        final String clusterId = meta.getName();
        final String namespace = meta.getNamespace();

        LOG.info("Deleting Flink Standalone cluster TM resources");
        kubernetesClient
                .apps()
                .deployments()
                .inNamespace(namespace)
                .withName(StandaloneKubernetesUtils.getTaskManagerDeploymentName(clusterId))
                .cascading(true)
                .delete();

        LOG.info("Deleting Flink Standalone cluster JM resources");
        kubernetesClient
                .apps()
                .deployments()
                .inNamespace(namespace)
                .withName(StandaloneKubernetesUtils.getJobManagerDeploymentName(clusterId))
                .cascading(true)
                .delete();

        if (deleteHaConfigmaps) {
            // We need to wait for cluster shutdown otherwise HA configmaps might be recreated
            waitForClusterShutdown(
                    namespace,
                    clusterId,
                    configManager
                            .getOperatorConfiguration()
                            .getFlinkShutdownClusterTimeout()
                            .toSeconds());
            kubernetesClient
                    .configMaps()
                    .inNamespace(namespace)
                    .withLabels(
                            KubernetesUtils.getConfigMapLabels(
                                    clusterId, LABEL_CONFIGMAP_TYPE_HIGH_AVAILABILITY))
                    .delete();
        }
    }
}
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/FlinkUtils.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/FlinkUtils.java
index c1de80e6..05cb7e03 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/FlinkUtils.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/FlinkUtils.java
@@ -18,18 +18,14 @@
package org.apache.flink.kubernetes.operator.utils;
import org.apache.flink.api.common.JobID;
-import org.apache.flink.api.common.JobStatus;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.CoreOptions;
import org.apache.flink.configuration.HighAvailabilityOptions;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions;
import org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory;
-import org.apache.flink.kubernetes.kubeclient.decorators.ExternalServiceDecorator;
import org.apache.flink.kubernetes.operator.crd.spec.FlinkDeploymentSpec;
import org.apache.flink.kubernetes.operator.crd.spec.FlinkVersion;
-import org.apache.flink.kubernetes.operator.crd.status.FlinkDeploymentStatus;
-import org.apache.flink.kubernetes.operator.crd.status.JobManagerDeploymentStatus;
import org.apache.flink.kubernetes.utils.Constants;
import org.apache.flink.kubernetes.utils.KubernetesUtils;
import org.apache.flink.util.Preconditions;
@@ -42,8 +38,6 @@ import io.fabric8.kubernetes.api.model.ConfigMap;
import io.fabric8.kubernetes.api.model.ConfigMapList;
import io.fabric8.kubernetes.api.model.ObjectMeta;
import io.fabric8.kubernetes.api.model.Pod;
-import io.fabric8.kubernetes.api.model.PodList;
-import io.fabric8.kubernetes.api.model.Service;
import io.fabric8.kubernetes.client.KubernetesClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -54,8 +48,6 @@ import java.util.Iterator;
import java.util.Map;
import java.util.function.Predicate;
-import static org.apache.flink.kubernetes.utils.Constants.LABEL_CONFIGMAP_TYPE_HIGH_AVAILABILITY;
-
/** Flink Utility methods used by the operator. */
public class FlinkUtils {
@@ -107,133 +99,6 @@ public class FlinkUtils {
}
}
- /**
- * Delete Flink kubernetes cluster by deleting the kubernetes resources directly. Optionally
- * allows deleting the native kubernetes HA resources as well.
- *
- * @param status Deployment status object
- * @param meta ObjectMeta of the deployment
- * @param kubernetesClient Kubernetes client
- * @param deleteHaConfigmaps Flag to indicate whether k8s HA metadata should be removed as well
- * @param shutdownTimeout maximum time allowed for cluster shutdown
- */
- public static void deleteCluster(
- FlinkDeploymentStatus status,
- ObjectMeta meta,
- KubernetesClient kubernetesClient,
- boolean deleteHaConfigmaps,
- long shutdownTimeout) {
-
- String namespace = meta.getNamespace();
- String clusterId = meta.getName();
-
- LOG.info(
- "Deleting JobManager deployment {}.",
- deleteHaConfigmaps ? "and HA metadata" : "while preserving HA metadata");
- kubernetesClient
- .apps()
- .deployments()
- .inNamespace(namespace)
- .withName(KubernetesUtils.getDeploymentName(clusterId))
- .cascading(true)
- .delete();
-
- if (deleteHaConfigmaps) {
- // We need to wait for cluster shutdown otherwise HA configmaps might be recreated
- waitForClusterShutdown(kubernetesClient, namespace, clusterId, shutdownTimeout);
- kubernetesClient
- .configMaps()
- .inNamespace(namespace)
- .withLabels(
- KubernetesUtils.getConfigMapLabels(
- clusterId, LABEL_CONFIGMAP_TYPE_HIGH_AVAILABILITY))
- .delete();
- }
- status.setJobManagerDeploymentStatus(JobManagerDeploymentStatus.MISSING);
- status.getJobStatus().setState(JobStatus.FINISHED.name());
- }
-
- /**
- * Wait until the FLink cluster has completely shut down.
- *
- * @param kubernetesClient Kubernetes client.
- * @param namespace Resource namespace.
- * @param clusterId Flink clusterId.
- * @param shutdownTimeout Max time allowed for shutdown.
- */
- public static void waitForClusterShutdown(
- KubernetesClient kubernetesClient,
- String namespace,
- String clusterId,
- long shutdownTimeout) {
-
- boolean jobManagerRunning = true;
- boolean serviceRunning = true;
-
- for (int i = 0; i < shutdownTimeout; i++) {
- if (jobManagerRunning) {
- PodList jmPodList = getJmPodList(kubernetesClient, namespace, clusterId);
-
- if (jmPodList == null || jmPodList.getItems().isEmpty()) {
- jobManagerRunning = false;
- }
- }
-
- if (serviceRunning) {
- Service service =
- kubernetesClient
- .services()
- .inNamespace(namespace)
- .withName(
- ExternalServiceDecorator.getExternalServiceName(clusterId))
- .fromServer()
- .get();
- if (service == null) {
- serviceRunning = false;
- }
- }
-
- if (!jobManagerRunning && !serviceRunning) {
- break;
- }
- // log a message waiting to shutdown Flink cluster every 5 seconds.
- if ((i + 1) % 5 == 0) {
- LOG.info("Waiting for cluster shutdown... ({}s)", i + 1);
- }
- try {
- Thread.sleep(1000);
- } catch (InterruptedException e) {
- throw new RuntimeException(e);
- }
- }
- LOG.info("Cluster shutdown completed.");
- }
-
- /**
- * Wait until the FLink cluster has completely shut down.
- *
- * @param kubernetesClient Kubernetes client.
- * @param conf Flink configuration.
- * @param shutdownTimeout Max time allowed for shutdown.
- */
- public static void waitForClusterShutdown(
- KubernetesClient kubernetesClient, Configuration conf, long shutdownTimeout) {
- FlinkUtils.waitForClusterShutdown(
- kubernetesClient,
- conf.getString(KubernetesConfigOptions.NAMESPACE),
- conf.getString(KubernetesConfigOptions.CLUSTER_ID),
- shutdownTimeout);
- }
-
- public static PodList getJmPodList(
- KubernetesClient kubernetesClient, String namespace, String clusterId) {
- return kubernetesClient
- .pods()
- .inNamespace(namespace)
- .withLabels(KubernetesUtils.getJobManagerSelectors(clusterId))
- .list();
- }
-
public static void deleteJobGraphInKubernetesHA(
String clusterId, String namespace, KubernetesClient kubernetesClient) {
// The HA ConfigMap names have been changed from 1.15, so we use the labels to filter out
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingFlinkService.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingFlinkService.java
index 64a0b6de..2e99c53f 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingFlinkService.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingFlinkService.java
@@ -36,8 +36,9 @@ import org.apache.flink.kubernetes.operator.crd.status.JobManagerDeploymentStatu
import org.apache.flink.kubernetes.operator.crd.status.Savepoint;
import org.apache.flink.kubernetes.operator.crd.status.SavepointInfo;
import org.apache.flink.kubernetes.operator.crd.status.SavepointTriggerType;
+import org.apache.flink.kubernetes.operator.exception.DeploymentFailedException;
import org.apache.flink.kubernetes.operator.observer.SavepointFetchResult;
-import org.apache.flink.kubernetes.operator.service.FlinkService;
+import org.apache.flink.kubernetes.operator.service.AbstractFlinkService;
import org.apache.flink.kubernetes.operator.utils.FlinkUtils;
import org.apache.flink.runtime.client.JobStatusMessage;
import org.apache.flink.runtime.execution.ExecutionState;
@@ -75,7 +76,7 @@ import java.util.function.Consumer;
import java.util.stream.Collectors;
/** Flink service mock for tests. */
-public class TestingFlinkService extends FlinkService {
+public class TestingFlinkService extends AbstractFlinkService {
public static final Map<String, String> CLUSTER_INFO =
Map.of(
@@ -172,6 +173,16 @@ public class TestingFlinkService extends FlinkService {
jobs.add(Tuple2.of(conf.get(SavepointConfigOptions.SAVEPOINT_PATH), jobStatusMessage));
}
+ protected void validateHaMetadataExists(Configuration conf) {
+ if (!isHaMetadataAvailable(conf)) {
+ throw new DeploymentFailedException(
+ "HA metadata not available to restore from last state. "
+ + "It is possible that the job has finished or terminally failed, or the configmaps have been deleted. "
+ + "Manual restore required.",
+ "RestoreFailed");
+ }
+ }
+
@Override
public boolean isHaMetadataAvailable(Configuration conf) {
return FlinkUtils.isKubernetesHAActivated(conf) && haDataAvailable;
@@ -384,7 +395,7 @@ public class TestingFlinkService extends FlinkService {
}
@Override
- protected void waitForClusterShutdown(Configuration conf) {}
+ public void waitForClusterShutdown(Configuration conf) {}
@Override
public void disposeSavepoint(String savepointPath, Configuration conf) {
@@ -430,6 +441,11 @@ public class TestingFlinkService extends FlinkService {
return podList;
}
+ @Override
+ protected PodList getJmPodList(String namespace, String clusterId) {
+ return podList;
+ }
+
public void setJmPodList(PodList podList) {
this.podList = podList;
}
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingFlinkServiceFactory.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingFlinkServiceFactory.java
new file mode 100644
index 00000000..e4e4d5e0
--- /dev/null
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingFlinkServiceFactory.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator;
+
+import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
+import org.apache.flink.kubernetes.operator.crd.spec.KubernetesDeploymentMode;
+import org.apache.flink.kubernetes.operator.service.FlinkService;
+import org.apache.flink.kubernetes.operator.service.FlinkServiceFactory;
+
+import java.lang.module.Configuration;
+
+/** Flink service factory mock for tests. */
+public class TestingFlinkServiceFactory extends FlinkServiceFactory {
+ private final FlinkService flinkService;
+
+ public TestingFlinkServiceFactory() {
+ super(null, null);
+ flinkService = new TestingFlinkService();
+ }
+
+ public TestingFlinkServiceFactory(FlinkService flinkService) {
+ super(null, null);
+ this.flinkService = flinkService;
+ }
+
+ public FlinkService getOrCreate(KubernetesDeploymentMode deploymentMode) {
+ return flinkService;
+ }
+
+ public FlinkService getOrCreate(FlinkDeployment deployment) {
+ return flinkService;
+ }
+
+ public FlinkService getOrCreate(Configuration configuration) {
+ return flinkService;
+ }
+}
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/TestingFlinkDeploymentController.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/TestingFlinkDeploymentController.java
index 2def2b4a..c169e1d9 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/TestingFlinkDeploymentController.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/TestingFlinkDeploymentController.java
@@ -19,6 +19,7 @@ package org.apache.flink.kubernetes.operator.controller;
import org.apache.flink.kubernetes.operator.TestUtils;
import org.apache.flink.kubernetes.operator.TestingFlinkService;
+import org.apache.flink.kubernetes.operator.TestingFlinkServiceFactory;
import org.apache.flink.kubernetes.operator.config.FlinkConfigManager;
import org.apache.flink.kubernetes.operator.crd.AbstractFlinkResource;
import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
@@ -26,6 +27,7 @@ import org.apache.flink.kubernetes.operator.crd.status.FlinkDeploymentStatus;
import org.apache.flink.kubernetes.operator.observer.deployment.ObserverFactory;
import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
import org.apache.flink.kubernetes.operator.reconciler.deployment.ReconcilerFactory;
+import org.apache.flink.kubernetes.operator.service.FlinkServiceFactory;
import org.apache.flink.kubernetes.operator.utils.EventRecorder;
import org.apache.flink.kubernetes.operator.utils.StatusRecorder;
import org.apache.flink.kubernetes.operator.utils.ValidatorUtils;
@@ -67,6 +69,7 @@ public class TestingFlinkDeploymentController
FlinkConfigManager configManager,
KubernetesClient kubernetesClient,
TestingFlinkService flinkService) {
+ FlinkServiceFactory flinkServiceFactory = new TestingFlinkServiceFactory(flinkService);
eventRecorder = new EventRecorder(kubernetesClient, eventCollector);
statusRecorder =
new StatusRecorder<>(
@@ -79,12 +82,12 @@ public class TestingFlinkDeploymentController
ValidatorUtils.discoverValidators(configManager),
new ReconcilerFactory(
kubernetesClient,
- flinkService,
+ flinkServiceFactory,
configManager,
eventRecorder,
statusRecorder),
new ObserverFactory(
- flinkService, configManager, statusRecorder, eventRecorder),
+ flinkServiceFactory, configManager, statusRecorder, eventRecorder),
statusRecorder,
eventRecorder);
}
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/sessionjob/SessionJobObserverTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/sessionjob/SessionJobObserverTest.java
index e7270bd7..f801bfbc 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/sessionjob/SessionJobObserverTest.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/sessionjob/SessionJobObserverTest.java
@@ -24,12 +24,14 @@ import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.RestOptions;
import org.apache.flink.kubernetes.operator.TestUtils;
import org.apache.flink.kubernetes.operator.TestingFlinkService;
+import org.apache.flink.kubernetes.operator.TestingFlinkServiceFactory;
import org.apache.flink.kubernetes.operator.TestingStatusRecorder;
import org.apache.flink.kubernetes.operator.config.FlinkConfigManager;
import org.apache.flink.kubernetes.operator.crd.status.FlinkSessionJobStatus;
import org.apache.flink.kubernetes.operator.crd.status.ReconciliationState;
import org.apache.flink.kubernetes.operator.crd.status.SavepointTriggerType;
import org.apache.flink.kubernetes.operator.reconciler.sessionjob.SessionJobReconciler;
+import org.apache.flink.kubernetes.operator.service.FlinkServiceFactory;
import org.apache.flink.kubernetes.operator.utils.EventRecorder;
import org.apache.flink.kubernetes.operator.utils.FlinkUtils;
import org.apache.flink.kubernetes.operator.utils.SavepointUtils;
@@ -52,6 +54,7 @@ public class SessionJobObserverTest {
private TestingFlinkService flinkService;
private SessionJobObserver observer;
private SessionJobReconciler reconciler;
+ private FlinkServiceFactory flinkServiceFactory;
@BeforeEach
public void before() {
@@ -59,12 +62,14 @@ public class SessionJobObserverTest {
var eventRecorder = new EventRecorder(kubernetesClient, (r, e) -> {});
var statusRecorder = new TestingStatusRecorder<FlinkSessionJobStatus>();
flinkService = new TestingFlinkService();
+ flinkServiceFactory = new TestingFlinkServiceFactory(flinkService);
observer =
- new SessionJobObserver(flinkService, configManager, statusRecorder, eventRecorder);
+ new SessionJobObserver(
+ flinkServiceFactory, configManager, statusRecorder, eventRecorder);
reconciler =
new SessionJobReconciler(
kubernetesClient,
- flinkService,
+ flinkServiceFactory,
configManager,
eventRecorder,
statusRecorder);
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconcilerTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconcilerTest.java
index b7122159..2bb1f089 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconcilerTest.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconcilerTest.java
@@ -488,6 +488,8 @@ public class ApplicationReconcilerTest {
@Test
public void testRandomJobResultStorePath() throws Exception {
FlinkDeployment flinkApp = TestUtils.buildApplicationCluster();
+ Context context = flinkService.getContext();
+
final String haStoragePath = "file:///flink-data/ha";
flinkApp.getSpec()
.getFlinkConfiguration()
@@ -500,14 +502,14 @@ public class ApplicationReconcilerTest {
status.getJobStatus().setState(org.apache.flink.api.common.JobStatus.FINISHED.name());
status.setJobManagerDeploymentStatus(JobManagerDeploymentStatus.READY);
- reconciler.deploy(flinkApp, spec, status, deployConfig, Optional.empty(), false);
+ reconciler.deploy(flinkApp, spec, status, context, deployConfig, Optional.empty(), false);
String path1 = deployConfig.get(JobResultStoreOptions.STORAGE_PATH);
Assertions.assertTrue(path1.startsWith(haStoragePath));
status.getJobStatus().setState(org.apache.flink.api.common.JobStatus.FINISHED.name());
status.setJobManagerDeploymentStatus(JobManagerDeploymentStatus.READY);
- reconciler.deploy(flinkApp, spec, status, deployConfig, Optional.empty(), false);
+ reconciler.deploy(flinkApp, spec, status, context, deployConfig, Optional.empty(), false);
String path2 = deployConfig.get(JobResultStoreOptions.STORAGE_PATH);
Assertions.assertTrue(path2.startsWith(haStoragePath));
assertNotEquals(path1, path2);
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/sessionjob/SessionJobReconcilerTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/sessionjob/SessionJobReconcilerTest.java
index a28e2f19..998fa34d 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/sessionjob/SessionJobReconcilerTest.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/sessionjob/SessionJobReconcilerTest.java
@@ -22,6 +22,7 @@ import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.kubernetes.operator.TestUtils;
import org.apache.flink.kubernetes.operator.TestingFlinkService;
+import org.apache.flink.kubernetes.operator.TestingFlinkServiceFactory;
import org.apache.flink.kubernetes.operator.TestingStatusRecorder;
import org.apache.flink.kubernetes.operator.config.FlinkConfigManager;
import org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions;
@@ -33,6 +34,7 @@ import org.apache.flink.kubernetes.operator.crd.status.FlinkSessionJobStatus;
import org.apache.flink.kubernetes.operator.crd.status.JobStatus;
import org.apache.flink.kubernetes.operator.crd.status.SavepointTriggerType;
import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
+import org.apache.flink.kubernetes.operator.service.FlinkServiceFactory;
import org.apache.flink.kubernetes.operator.utils.EventRecorder;
import org.apache.flink.kubernetes.operator.utils.SavepointUtils;
import org.apache.flink.kubernetes.operator.utils.StatusRecorder;
@@ -69,6 +71,8 @@ public class SessionJobReconcilerTest {
@BeforeEach
public void before() {
+ flinkService = new TestingFlinkService();
+ flinkServiceFactory = new TestingFlinkServiceFactory(flinkService);
eventRecorder =
new EventRecorder(null, (r, e) -> {}) {
@Override
@@ -83,14 +87,17 @@ public class SessionJobReconcilerTest {
};
reconciler =
new SessionJobReconciler(
- null, flinkService, configManager, eventRecorder, statusRecoder);
+ null, flinkServiceFactory, configManager, eventRecorder, statusRecoder);
kubernetesClient.resource(TestUtils.buildSessionJob()).createOrReplace();
statusRecoder = new TestingStatusRecorder<>();
reconciler =
new SessionJobReconciler(
- null, flinkService, configManager, eventRecorder, statusRecoder);
+ null, flinkServiceFactory, configManager, eventRecorder, statusRecoder);
}
+ private FlinkServiceFactory flinkServiceFactory;
+ private FlinkSessionJob sessionJob;
+
@Test
public void testSubmitAndCleanUp() throws Exception {
FlinkSessionJob sessionJob = TestUtils.buildSessionJob();
@@ -460,7 +467,7 @@ public class SessionJobReconcilerTest {
// Force upgrade when savepoint is in progress.
reconciler =
new SessionJobReconciler(
- null, flinkService, configManager, eventRecorder, statusRecoder);
+ null, flinkServiceFactory, configManager, eventRecorder, statusRecoder);
spSessionJob.getSpec().getJob().setParallelism(100);
reconciler.reconcile(spSessionJob, readyContext);
assertEquals(
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/service/FlinkServiceTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/service/NativeFlinkServiceTest.java
similarity index 98%
rename from flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/service/FlinkServiceTest.java
rename to flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/service/NativeFlinkServiceTest.java
index aebd5749..8e151f04 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/service/FlinkServiceTest.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/service/NativeFlinkServiceTest.java
@@ -68,7 +68,7 @@ import static org.junit.jupiter.api.Assertions.fail;
/** @link FlinkService unit tests */
@EnableKubernetesMockClient(crud = true)
-public class FlinkServiceTest {
+public class NativeFlinkServiceTest {
KubernetesClient client;
private final Configuration configuration = new Configuration();
private final FlinkConfigManager configManager = new FlinkConfigManager(configuration);
@@ -308,7 +308,7 @@ public class FlinkServiceTest {
Tuple2.of(ExecutionState.RUNNING, 4));
assertEquals(
org.apache.flink.api.common.JobStatus.RUNNING,
- FlinkService.getEffectiveStatus(allRunning));
+ AbstractFlinkService.getEffectiveStatus(allRunning));
JobDetails allRunningOrFinished =
getJobDetails(
@@ -317,7 +317,7 @@ public class FlinkServiceTest {
Tuple2.of(ExecutionState.FINISHED, 2));
assertEquals(
org.apache.flink.api.common.JobStatus.RUNNING,
- FlinkService.getEffectiveStatus(allRunningOrFinished));
+ AbstractFlinkService.getEffectiveStatus(allRunningOrFinished));
JobDetails allRunningOrScheduled =
getJobDetails(
@@ -326,7 +326,7 @@ public class FlinkServiceTest {
Tuple2.of(ExecutionState.SCHEDULED, 2));
assertEquals(
org.apache.flink.api.common.JobStatus.CREATED,
- FlinkService.getEffectiveStatus(allRunningOrScheduled));
+ AbstractFlinkService.getEffectiveStatus(allRunningOrScheduled));
JobDetails allFinished =
getJobDetails(
@@ -334,7 +334,7 @@ public class FlinkServiceTest {
Tuple2.of(ExecutionState.FINISHED, 4));
assertEquals(
org.apache.flink.api.common.JobStatus.FINISHED,
- FlinkService.getEffectiveStatus(allFinished));
+ AbstractFlinkService.getEffectiveStatus(allFinished));
}
private JobDetails getJobDetails(
@@ -358,7 +358,7 @@ public class FlinkServiceTest {
}
private FlinkService createFlinkService(ClusterClient<String> clusterClient) {
- return new FlinkService(client, new FlinkConfigManager(configuration)) {
+ return new NativeFlinkService(client, new FlinkConfigManager(configuration)) {
@Override
protected ClusterClient<String> getClusterClient(Configuration config) {
return clusterClient;
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/service/StandaloneFlinkServiceTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/service/StandaloneFlinkServiceTest.java
new file mode 100644
index 00000000..bbffabbd
--- /dev/null
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/service/StandaloneFlinkServiceTest.java
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.service;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions;
+import org.apache.flink.kubernetes.operator.TestUtils;
+import org.apache.flink.kubernetes.operator.config.FlinkConfigBuilder;
+import org.apache.flink.kubernetes.operator.config.FlinkConfigManager;
+import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
+import org.apache.flink.kubernetes.operator.utils.StandaloneKubernetesUtils;
+import org.apache.flink.util.concurrent.ExecutorThreadFactory;
+
+import io.fabric8.kubernetes.api.model.ObjectMeta;
+import io.fabric8.kubernetes.api.model.apps.Deployment;
+import io.fabric8.kubernetes.client.NamespacedKubernetesClient;
+import io.fabric8.kubernetes.client.server.mock.EnableKubernetesMockClient;
+import io.fabric8.kubernetes.client.server.mock.KubernetesMockServer;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+import static org.junit.Assert.assertEquals;
+
+/** {@link StandaloneFlinkService} unit tests. */
+@EnableKubernetesMockClient(crud = true)
+public class StandaloneFlinkServiceTest {
+ KubernetesMockServer mockServer;
+
+ private NamespacedKubernetesClient kubernetesClient;
+ StandaloneFlinkService flinkStandaloneService;
+ Configuration configuration = new Configuration();
+
+ @BeforeEach
+ public void setup() {
+ configuration.set(KubernetesConfigOptions.CLUSTER_ID, TestUtils.TEST_DEPLOYMENT_NAME);
+ configuration.set(KubernetesConfigOptions.NAMESPACE, TestUtils.TEST_NAMESPACE);
+
+ kubernetesClient = mockServer.createClient().inAnyNamespace();
+ flinkStandaloneService =
+ new StandaloneFlinkService(kubernetesClient, new FlinkConfigManager(configuration));
+
+ // NOTE(review): an ExecutorService was previously created here but never
+ // used or shut down, leaking a non-daemon thread per test run; removed. If
+ // an IO executor is needed, create and close it inside the test that uses it.
+ }
+
+ @Test
+ public void testDeleteClusterDeployment() throws Exception {
+ FlinkDeployment flinkDeployment = TestUtils.buildSessionCluster();
+ configuration = buildConfig(flinkDeployment, configuration);
+
+ createDeployments();
+
+ List<Deployment> deployments = kubernetesClient.apps().deployments().list().getItems();
+
+ assertEquals(2, deployments.size());
+
+ flinkStandaloneService.deleteClusterDeployment(
+ flinkDeployment.getMetadata(), flinkDeployment.getStatus(), false);
+
+ deployments = kubernetesClient.apps().deployments().list().getItems();
+
+ assertEquals(0, deployments.size());
+ }
+
+ @Test
+ public void testDeleteClusterDeploymentWithHADelete() throws Exception {
+ FlinkDeployment flinkDeployment = TestUtils.buildSessionCluster();
+ configuration = buildConfig(flinkDeployment, configuration);
+
+ createDeployments();
+
+ List<Deployment> deployments = kubernetesClient.apps().deployments().list().getItems();
+ assertEquals(2, deployments.size());
+
+ flinkStandaloneService.deleteClusterDeployment(
+ flinkDeployment.getMetadata(), flinkDeployment.getStatus(), true);
+
+ deployments = kubernetesClient.apps().deployments().list().getItems();
+
+ assertEquals(0, deployments.size());
+ }
+
+ /** Builds the effective Flink configuration for the given deployment spec. */
+ private Configuration buildConfig(FlinkDeployment flinkDeployment, Configuration configuration)
+ throws Exception {
+ return FlinkConfigBuilder.buildFrom(
+ flinkDeployment.getMetadata().getNamespace(),
+ flinkDeployment.getMetadata().getName(),
+ flinkDeployment.getSpec(),
+ configuration);
+ }
+
+ private void createDeployments() {
+ Deployment jmDeployment = new Deployment();
+ ObjectMeta jmMetadata = new ObjectMeta();
+ jmMetadata.setName(
+ StandaloneKubernetesUtils.getJobManagerDeploymentName(
+ TestUtils.TEST_DEPLOYMENT_NAME));
+ jmDeployment.setMetadata(jmMetadata);
+ kubernetesClient
+ .apps()
+ .deployments()
+ .inNamespace(TestUtils.TEST_NAMESPACE)
+ .create(jmDeployment);
+
+ Deployment tmDeployment = new Deployment();
+ ObjectMeta tmMetadata = new ObjectMeta();
+ tmMetadata.setName(
+ StandaloneKubernetesUtils.getTaskManagerDeploymentName(
+ TestUtils.TEST_DEPLOYMENT_NAME));
+ tmDeployment.setMetadata(tmMetadata);
+ kubernetesClient
+ .apps()
+ .deployments()
+ .inNamespace(TestUtils.TEST_NAMESPACE)
+ .create(tmDeployment);
+ }
+}
diff --git a/helm/flink-kubernetes-operator/crds/flinkdeployments.flink.apache.org-v1.yml b/helm/flink-kubernetes-operator/crds/flinkdeployments.flink.apache.org-v1.yml
index 83f3b9e3..86d3a97a 100644
--- a/helm/flink-kubernetes-operator/crds/flinkdeployments.flink.apache.org-v1.yml
+++ b/helm/flink-kubernetes-operator/crds/flinkdeployments.flink.apache.org-v1.yml
@@ -9047,6 +9047,11 @@ spec:
additionalProperties:
type: string
type: object
+ mode:
+ enum:
+ - native
+ - standalone
+ type: string
job:
properties:
jarURI: