You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by gy...@apache.org on 2022/07/20 15:59:11 UTC
[flink-kubernetes-operator] 05/05: [FLINK-27446] Add standalone mode validation and config building
This is an automated email from the ASF dual-hosted git repository.
gyfora pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git
commit 79b169964cd3aa1ea3383b3180a601939fbd3a04
Author: Usamah Jassat <us...@amazon.com>
AuthorDate: Tue Jun 21 17:35:08 2022 +0100
[FLINK-27446] Add standalone mode validation and config building
---
.github/workflows/ci.yml | 2 +
docs/content/docs/custom-resource/reference.md | 11 +++
e2e-tests/data/flinkdep-cr.yaml | 1 +
e2e-tests/data/multi-sessionjob.yaml | 2 +
e2e-tests/data/sessionjob-cr.yaml | 1 +
.../operator/config/FlinkConfigBuilder.java | 72 +++++++++++++++++-
.../observer/deployment/ObserverFactory.java | 15 ++--
.../deployment/ApplicationReconciler.java | 3 +-
.../reconciler/deployment/ReconcilerFactory.java | 15 ++--
.../operator/service/AbstractFlinkService.java | 43 ++++++++++-
.../operator/service/FlinkServiceFactory.java | 7 ++
.../operator/service/NativeFlinkService.java | 39 +++-------
.../operator/service/StandaloneFlinkService.java | 36 +++++++--
.../operator/utils/EventSourceUtils.java | 6 +-
.../kubernetes/operator/utils/FlinkUtils.java | 4 +
.../operator/validation/DefaultValidator.java | 36 ++++++++-
.../kubernetes/operator/TestingFlinkService.java | 12 +++
.../operator/config/FlinkConfigBuilderTest.java | 85 ++++++++++++++++++++++
.../TestingFlinkDeploymentController.java | 1 +
.../sessionjob/SessionJobObserverTest.java | 3 +-
.../sessionjob/SessionJobReconcilerTest.java | 2 +-
.../operator/validation/DefaultValidatorTest.java | 69 ++++++++++++++++++
22 files changed, 400 insertions(+), 65 deletions(-)
diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml
index 1dde3421..b34a0d28 100644
--- a/.github/workflows/ci.yml
+++ b/.github/workflows/ci.yml
@@ -97,6 +97,7 @@ jobs:
matrix:
version: ["v1_15","v1_14","v1_13"]
namespace: ["default","flink"]
+ mode: ["native", "standalone"]
test:
- test_application_kubernetes_ha.sh
- test_application_operations.sh
@@ -155,6 +156,7 @@ jobs:
run: |
sed -i "s/image: flink:.*/image: ${{ matrix.image }}/" e2e-tests/data/*.yaml
sed -i "s/flinkVersion: .*/flinkVersion: ${{ matrix.version }}/" e2e-tests/data/*.yaml
+ sed -i "s/mode: .*/mode: ${{ matrix.mode }}/" e2e-tests/data/*.yaml
git diff HEAD
echo "Running e2e-tests/$test"
bash e2e-tests/${{ matrix.test }} || exit 1
diff --git a/docs/content/docs/custom-resource/reference.md b/docs/content/docs/custom-resource/reference.md
index 100db9d4..b2399505 100644
--- a/docs/content/docs/custom-resource/reference.md
+++ b/docs/content/docs/custom-resource/reference.md
@@ -57,6 +57,7 @@ This page serves as a full reference for FlinkDeployment custom resource definit
| jobManager | org.apache.flink.kubernetes.operator.crd.spec.JobManagerSpec | JobManager specs. |
| taskManager | org.apache.flink.kubernetes.operator.crd.spec.TaskManagerSpec | TaskManager specs. |
| logConfiguration | java.util.Map<java.lang.String,java.lang.String> | Log configuration overrides for the Flink deployment. Format logConfigFileName -> configContent. |
+| mode | org.apache.flink.kubernetes.operator.crd.spec.KubernetesDeploymentMode | Deployment mode of the Flink cluster, native or standalone. |
### FlinkSessionJobSpec
**Class**: org.apache.flink.kubernetes.operator.crd.spec.FlinkSessionJobSpec
@@ -131,6 +132,16 @@ This page serves as a full reference for FlinkDeployment custom resource definit
| RUNNING | Job is expected to be processing data. |
| SUSPENDED | Processing is suspended with the intention of continuing later. |
+### KubernetesDeploymentMode
+**Class**: org.apache.flink.kubernetes.operator.crd.spec.KubernetesDeploymentMode
+
+**Description**: Enum to control Flink deployment mode on Kubernetes.
+
+| Value | Docs |
+| ----- | ---- |
| NATIVE | Deploys Flink using Flink's native Kubernetes support. Only supported for newer versions of Flink. |
| STANDALONE | Deploys Flink on top of Kubernetes in standalone mode. |
+
### Resource
**Class**: org.apache.flink.kubernetes.operator.crd.spec.Resource
diff --git a/e2e-tests/data/flinkdep-cr.yaml b/e2e-tests/data/flinkdep-cr.yaml
index d48af2be..18e73b27 100644
--- a/e2e-tests/data/flinkdep-cr.yaml
+++ b/e2e-tests/data/flinkdep-cr.yaml
@@ -83,6 +83,7 @@ spec:
entryClass: org.apache.flink.streaming.examples.statemachine.StateMachineExample
parallelism: 2
upgradeMode: last-state
+ mode: native
---
apiVersion: v1
diff --git a/e2e-tests/data/multi-sessionjob.yaml b/e2e-tests/data/multi-sessionjob.yaml
index ed5c892f..77991953 100644
--- a/e2e-tests/data/multi-sessionjob.yaml
+++ b/e2e-tests/data/multi-sessionjob.yaml
@@ -65,6 +65,7 @@ spec:
resource:
memory: "1024m"
cpu: 0.25
+ mode: native
---
apiVersion: flink.apache.org/v1beta1
@@ -116,6 +117,7 @@ spec:
resource:
memory: "1024m"
cpu: 0.25
+ mode: native
---
apiVersion: flink.apache.org/v1beta1
diff --git a/e2e-tests/data/sessionjob-cr.yaml b/e2e-tests/data/sessionjob-cr.yaml
index 53b49f0a..1953cdec 100644
--- a/e2e-tests/data/sessionjob-cr.yaml
+++ b/e2e-tests/data/sessionjob-cr.yaml
@@ -65,6 +65,7 @@ spec:
resource:
memory: "1024m"
cpu: 0.5
+ mode: native
---
apiVersion: flink.apache.org/v1beta1
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkConfigBuilder.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkConfigBuilder.java
index 8941dd51..706d1108 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkConfigBuilder.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkConfigBuilder.java
@@ -17,6 +17,7 @@
package org.apache.flink.kubernetes.operator.config;
+import org.apache.flink.client.deployment.application.ApplicationConfiguration;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
import org.apache.flink.configuration.Configuration;
@@ -32,8 +33,12 @@ import org.apache.flink.kubernetes.configuration.KubernetesDeploymentTarget;
import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
import org.apache.flink.kubernetes.operator.crd.spec.FlinkDeploymentSpec;
import org.apache.flink.kubernetes.operator.crd.spec.FlinkVersion;
+import org.apache.flink.kubernetes.operator.crd.spec.JobSpec;
+import org.apache.flink.kubernetes.operator.crd.spec.KubernetesDeploymentMode;
import org.apache.flink.kubernetes.operator.crd.spec.Resource;
import org.apache.flink.kubernetes.operator.crd.spec.UpgradeMode;
+import org.apache.flink.kubernetes.operator.standalone.StandaloneKubernetesConfigOptionsInternal;
+import org.apache.flink.kubernetes.operator.utils.FlinkUtils;
import org.apache.flink.runtime.jobgraph.SavepointConfigOptions;
import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
@@ -59,6 +64,8 @@ import static org.apache.flink.configuration.DeploymentOptions.SUBMIT_FAILED_JOB
import static org.apache.flink.configuration.DeploymentOptionsInternal.CONF_DIR;
import static org.apache.flink.configuration.WebOptions.CANCEL_ENABLE;
import static org.apache.flink.kubernetes.configuration.KubernetesConfigOptions.REST_SERVICE_EXPOSED_TYPE;
+import static org.apache.flink.kubernetes.operator.standalone.StandaloneKubernetesConfigOptionsInternal.ClusterMode.APPLICATION;
+import static org.apache.flink.kubernetes.operator.standalone.StandaloneKubernetesConfigOptionsInternal.ClusterMode.SESSION;
import static org.apache.flink.kubernetes.operator.utils.FlinkUtils.mergePodTemplates;
import static org.apache.flink.kubernetes.utils.Constants.CONFIG_FILE_LOG4J_NAME;
import static org.apache.flink.kubernetes.utils.Constants.CONFIG_FILE_LOGBACK_NAME;
@@ -216,30 +223,89 @@ public class FlinkConfigBuilder {
spec.getTaskManager().getPodTemplate(),
effectiveConfig,
false);
+
+ if (spec.getTaskManager().getReplicas() != null
+ && spec.getTaskManager().getReplicas() > 0) {
+ effectiveConfig.set(
+ StandaloneKubernetesConfigOptionsInternal.KUBERNETES_TASKMANAGER_REPLICAS,
+ spec.getTaskManager().getReplicas());
+ }
+ }
+
+ if (spec.getJob() != null
+ && KubernetesDeploymentMode.getDeploymentMode(spec)
+ == KubernetesDeploymentMode.STANDALONE) {
+ if (!effectiveConfig.contains(
+ StandaloneKubernetesConfigOptionsInternal.KUBERNETES_TASKMANAGER_REPLICAS)) {
+ effectiveConfig.set(
+ StandaloneKubernetesConfigOptionsInternal.KUBERNETES_TASKMANAGER_REPLICAS,
+ FlinkUtils.getNumTaskManagers(effectiveConfig, getParallelism()));
+ }
}
return this;
}
protected FlinkConfigBuilder applyJobOrSessionSpec() throws URISyntaxException {
+ KubernetesDeploymentMode deploymentMode = KubernetesDeploymentMode.getDeploymentMode(spec);
+
if (spec.getJob() != null) {
+ JobSpec jobSpec = spec.getJob();
effectiveConfig.set(
DeploymentOptions.TARGET, KubernetesDeploymentTarget.APPLICATION.getName());
- final URI uri = new URI(spec.getJob().getJarURI());
+ final URI uri = new URI(jobSpec.getJarURI());
effectiveConfig.set(PipelineOptions.JARS, Collections.singletonList(uri.toString()));
effectiveConfig.set(CoreOptions.DEFAULT_PARALLELISM, getParallelism());
- if (spec.getJob().getAllowNonRestoredState() != null) {
+ if (jobSpec.getAllowNonRestoredState() != null) {
effectiveConfig.set(
SavepointConfigOptions.SAVEPOINT_IGNORE_UNCLAIMED_STATE,
- spec.getJob().getAllowNonRestoredState());
+ jobSpec.getAllowNonRestoredState());
+ }
+
+ if (jobSpec.getEntryClass() != null) {
+ effectiveConfig.set(
+ ApplicationConfiguration.APPLICATION_MAIN_CLASS, jobSpec.getEntryClass());
}
} else {
effectiveConfig.set(
DeploymentOptions.TARGET, KubernetesDeploymentTarget.SESSION.getName());
}
+
+ if (deploymentMode == KubernetesDeploymentMode.STANDALONE) {
+ effectiveConfig.set(DeploymentOptions.TARGET, "remote");
+ effectiveConfig.set(
+ StandaloneKubernetesConfigOptionsInternal.CLUSTER_MODE,
+ spec.getJob() == null ? SESSION : APPLICATION);
+
+ if (spec.getJob() != null) {
+ effectiveConfig.set(
+ PipelineOptions.CLASSPATHS,
+ Collections.singletonList(getStandaloneJarURI(spec.getJob())));
+ }
+ }
return this;
}
+ private String getStandaloneJarURI(JobSpec jobSpec) throws URISyntaxException {
+ URI uri = new URI(jobSpec.getJarURI());
+
+ // Running an application job through standalone mode doesn't require the file URI scheme
+ // and doesn't accept the local scheme which is used for native mode, so convert it here
+ // to improve compatibility at the operator layer
+ if (uri.getScheme().equals("local")) {
+ uri =
+ new URI(
+ "file",
+ uri.getAuthority() == null ? "" : uri.getAuthority(),
+ uri.getPath(),
+ uri.getQuery(),
+ uri.getFragment());
+ }
+
+ return uri.toASCIIString();
+ }
+
private int getParallelism() {
if (spec.getTaskManager() != null && spec.getTaskManager().getReplicas() != null) {
if (spec.getJob().getParallelism() > 0) {
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/deployment/ObserverFactory.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/deployment/ObserverFactory.java
index 6c2c203d..b41d34b8 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/deployment/ObserverFactory.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/deployment/ObserverFactory.java
@@ -17,9 +17,11 @@
package org.apache.flink.kubernetes.operator.observer.deployment;
+import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.kubernetes.operator.config.FlinkConfigManager;
import org.apache.flink.kubernetes.operator.config.Mode;
import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
+import org.apache.flink.kubernetes.operator.crd.spec.KubernetesDeploymentMode;
import org.apache.flink.kubernetes.operator.crd.status.FlinkDeploymentStatus;
import org.apache.flink.kubernetes.operator.observer.Observer;
import org.apache.flink.kubernetes.operator.service.FlinkServiceFactory;
@@ -36,7 +38,8 @@ public class ObserverFactory {
private final FlinkConfigManager configManager;
private final StatusRecorder<FlinkDeploymentStatus> statusRecorder;
private final EventRecorder eventRecorder;
- private final Map<Mode, Observer<FlinkDeployment>> observerMap;
+ private final Map<Tuple2<Mode, KubernetesDeploymentMode>, Observer<FlinkDeployment>>
+ observerMap;
public ObserverFactory(
FlinkServiceFactory flinkServiceFactory,
@@ -52,9 +55,11 @@ public class ObserverFactory {
public Observer<FlinkDeployment> getOrCreate(FlinkDeployment flinkApp) {
return observerMap.computeIfAbsent(
- Mode.getMode(flinkApp),
- mode -> {
- switch (mode) {
+ Tuple2.of(
+ Mode.getMode(flinkApp),
+ KubernetesDeploymentMode.getDeploymentMode(flinkApp)),
+ modes -> {
+ switch (modes.f0) {
case SESSION:
return new SessionObserver(
flinkServiceFactory.getOrCreate(flinkApp),
@@ -68,7 +73,7 @@ public class ObserverFactory {
eventRecorder);
default:
throw new UnsupportedOperationException(
- String.format("Unsupported running mode: %s", mode));
+ String.format("Unsupported running mode: %s", modes.f0));
}
});
}
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconciler.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconciler.java
index c03ddc98..ec0389e2 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconciler.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconciler.java
@@ -33,7 +33,6 @@ import org.apache.flink.kubernetes.operator.service.FlinkService;
import org.apache.flink.kubernetes.operator.utils.EventRecorder;
import org.apache.flink.kubernetes.operator.utils.FlinkUtils;
import org.apache.flink.kubernetes.operator.utils.IngressUtils;
-import org.apache.flink.kubernetes.operator.utils.SavepointUtils;
import org.apache.flink.kubernetes.operator.utils.StatusRecorder;
import org.apache.flink.runtime.highavailability.JobResultStoreOptions;
import org.apache.flink.runtime.jobgraph.SavepointConfigOptions;
@@ -194,7 +193,7 @@ public class ApplicationReconciler
@Override
public boolean reconcileOtherChanges(
FlinkDeployment deployment, Context ctx, Configuration observeConfig) throws Exception {
- if (SavepointUtils.triggerSavepointIfNeeded(flinkService, deployment, observeConfig)) {
+ if (super.reconcileOtherChanges(deployment, ctx, observeConfig)) {
return true;
}
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ReconcilerFactory.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ReconcilerFactory.java
index 419e3e41..2d11f495 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ReconcilerFactory.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ReconcilerFactory.java
@@ -17,9 +17,11 @@
package org.apache.flink.kubernetes.operator.reconciler.deployment;
+import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.kubernetes.operator.config.FlinkConfigManager;
import org.apache.flink.kubernetes.operator.config.Mode;
import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
+import org.apache.flink.kubernetes.operator.crd.spec.KubernetesDeploymentMode;
import org.apache.flink.kubernetes.operator.crd.status.FlinkDeploymentStatus;
import org.apache.flink.kubernetes.operator.reconciler.Reconciler;
import org.apache.flink.kubernetes.operator.service.FlinkServiceFactory;
@@ -39,7 +41,8 @@ public class ReconcilerFactory {
private final FlinkConfigManager configManager;
private final EventRecorder eventRecorder;
private final StatusRecorder<FlinkDeploymentStatus> deploymentStatusRecorder;
- private final Map<Mode, Reconciler<FlinkDeployment>> reconcilerMap;
+ private final Map<Tuple2<Mode, KubernetesDeploymentMode>, Reconciler<FlinkDeployment>>
+ reconcilerMap;
public ReconcilerFactory(
KubernetesClient kubernetesClient,
@@ -57,9 +60,11 @@ public class ReconcilerFactory {
public Reconciler<FlinkDeployment> getOrCreate(FlinkDeployment flinkApp) {
return reconcilerMap.computeIfAbsent(
- Mode.getMode(flinkApp),
- mode -> {
- switch (mode) {
+ Tuple2.of(
+ Mode.getMode(flinkApp),
+ KubernetesDeploymentMode.getDeploymentMode(flinkApp)),
+ modes -> {
+ switch (modes.f0) {
case SESSION:
return new SessionReconciler(
kubernetesClient,
@@ -76,7 +81,7 @@ public class ReconcilerFactory {
deploymentStatusRecorder);
default:
throw new UnsupportedOperationException(
- String.format("Unsupported running mode: %s", mode));
+ String.format("Unsupported running mode: %s", modes.f0));
}
});
}
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java
index afb7bf27..c4b32529 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java
@@ -142,11 +142,33 @@ public abstract class AbstractFlinkService implements FlinkService {
protected abstract PodList getJmPodList(String namespace, String clusterId);
+ protected abstract void deployApplicationCluster(JobSpec jobSpec, Configuration conf)
+ throws Exception;
+
@Override
public KubernetesClient getKubernetesClient() {
return kubernetesClient;
}
+ @Override
+ public void submitApplicationCluster(
+ JobSpec jobSpec, Configuration conf, boolean requireHaMetadata) throws Exception {
+ LOG.info(
+ "Deploying application cluster{}",
+ requireHaMetadata ? " requiring last-state from HA metadata" : "");
+ if (FlinkUtils.isKubernetesHAActivated(conf)) {
+ final String clusterId = conf.get(KubernetesConfigOptions.CLUSTER_ID);
+ final String namespace = conf.get(KubernetesConfigOptions.NAMESPACE);
+ // Delete the job graph in the HA ConfigMaps so that the newly changed job config(e.g.
+ // parallelism) could take effect
+ FlinkUtils.deleteJobGraphInKubernetesHA(clusterId, namespace, kubernetesClient);
+ }
+ if (requireHaMetadata) {
+ validateHaMetadataExists(conf);
+ }
+ deployApplicationCluster(jobSpec, conf);
+ }
+
@Override
public boolean isHaMetadataAvailable(Configuration conf) {
return FlinkUtils.isHaMetadataAvailable(conf, kubernetesClient);
@@ -219,8 +241,11 @@ public abstract class AbstractFlinkService implements FlinkService {
}
}
- @Override
- public void cancelJob(FlinkDeployment deployment, UpgradeMode upgradeMode, Configuration conf)
+ protected void cancelJob(
+ FlinkDeployment deployment,
+ UpgradeMode upgradeMode,
+ Configuration conf,
+ boolean deleteClusterAfterSavepoint)
throws Exception {
var deploymentStatus = deployment.getStatus();
var jobIdString = deploymentStatus.getJobStatus().getJobId();
@@ -290,6 +315,9 @@ public abstract class AbstractFlinkService implements FlinkService {
ExecutionCheckpointingOptions.CHECKPOINTING_TIMEOUT.key()),
exception);
}
+ if (deleteClusterAfterSavepoint) {
+ deleteClusterDeployment(deployment.getMetadata(), deploymentStatus, true);
+ }
break;
case LAST_STATE:
deleteClusterDeployment(deployment.getMetadata(), deploymentStatus, false);
@@ -697,7 +725,6 @@ public abstract class AbstractFlinkService implements FlinkService {
/** Wait until the Flink cluster has completely shut down. */
@VisibleForTesting
void waitForClusterShutdown(String namespace, String clusterId, long shutdownTimeout) {
-
boolean jobManagerRunning = true;
boolean serviceRunning = true;
@@ -765,4 +792,14 @@ public abstract class AbstractFlinkService implements FlinkService {
}
return effectiveStatus;
}
+
+ private void validateHaMetadataExists(Configuration conf) {
+ if (!isHaMetadataAvailable(conf)) {
+ throw new DeploymentFailedException(
+ "HA metadata not available to restore from last state. "
+ + "It is possible that the job has finished or terminally failed, or the configmaps have been deleted. "
+ + "Manual restore required.",
+ "RestoreFailed");
+ }
+ }
}
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/FlinkServiceFactory.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/FlinkServiceFactory.java
index 761f1e68..9fce6eec 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/FlinkServiceFactory.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/FlinkServiceFactory.java
@@ -22,6 +22,8 @@ import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
import org.apache.flink.kubernetes.operator.crd.spec.KubernetesDeploymentMode;
import io.fabric8.kubernetes.client.KubernetesClient;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
@@ -33,6 +35,8 @@ public class FlinkServiceFactory {
private final FlinkConfigManager configManager;
private final Map<KubernetesDeploymentMode, FlinkService> serviceMap;
+ private static final Logger LOG = LoggerFactory.getLogger(FlinkServiceFactory.class);
+
public FlinkServiceFactory(
KubernetesClient kubernetesClient, FlinkConfigManager configManager) {
this.kubernetesClient = kubernetesClient;
@@ -46,8 +50,10 @@ public class FlinkServiceFactory {
mode -> {
switch (mode) {
case NATIVE:
+ LOG.info("Using NativeFlinkService");
return new NativeFlinkService(kubernetesClient, configManager);
case STANDALONE:
+ LOG.info("Using StandaloneFlinkService");
return new StandaloneFlinkService(kubernetesClient, configManager);
default:
throw new UnsupportedOperationException(
@@ -57,6 +63,7 @@ public class FlinkServiceFactory {
}
public FlinkService getOrCreate(FlinkDeployment deployment) {
+ LOG.info("Getting service for {}", deployment.getMetadata().getName());
return getOrCreate(getDeploymentMode(deployment));
}
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/NativeFlinkService.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/NativeFlinkService.java
index 60b5ee3b..47d3fc01 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/NativeFlinkService.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/NativeFlinkService.java
@@ -26,13 +26,12 @@ import org.apache.flink.client.deployment.DefaultClusterClientServiceLoader;
import org.apache.flink.client.deployment.application.ApplicationConfiguration;
import org.apache.flink.client.deployment.application.cli.ApplicationClusterDeployer;
import org.apache.flink.configuration.Configuration;
-import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions;
import org.apache.flink.kubernetes.operator.config.FlinkConfigManager;
+import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
import org.apache.flink.kubernetes.operator.crd.spec.JobSpec;
+import org.apache.flink.kubernetes.operator.crd.spec.UpgradeMode;
import org.apache.flink.kubernetes.operator.crd.status.FlinkDeploymentStatus;
import org.apache.flink.kubernetes.operator.crd.status.JobManagerDeploymentStatus;
-import org.apache.flink.kubernetes.operator.exception.DeploymentFailedException;
-import org.apache.flink.kubernetes.operator.utils.FlinkUtils;
import org.apache.flink.kubernetes.utils.KubernetesUtils;
import io.fabric8.kubernetes.api.model.ObjectMeta;
@@ -56,21 +55,8 @@ public class NativeFlinkService extends AbstractFlinkService {
}
@Override
- public void submitApplicationCluster(
- JobSpec jobSpec, Configuration conf, boolean requireHaMetadata) throws Exception {
- LOG.info(
- "Deploying application cluster{}",
- requireHaMetadata ? " requiring last-state from HA metadata" : "");
- if (FlinkUtils.isKubernetesHAActivated(conf)) {
- final String clusterId = conf.get(KubernetesConfigOptions.CLUSTER_ID);
- final String namespace = conf.get(KubernetesConfigOptions.NAMESPACE);
- // Delete the job graph in the HA ConfigMaps so that the newly changed job config(e.g.
- // parallelism) could take effect
- FlinkUtils.deleteJobGraphInKubernetesHA(clusterId, namespace, kubernetesClient);
- }
- if (requireHaMetadata) {
- validateHaMetadataExists(conf);
- }
+ protected void deployApplicationCluster(JobSpec jobSpec, Configuration conf) throws Exception {
+ LOG.info("Deploying application cluster");
final ClusterClientServiceLoader clusterClientServiceLoader =
new DefaultClusterClientServiceLoader();
final ApplicationDeployer deployer =
@@ -85,16 +71,6 @@ public class NativeFlinkService extends AbstractFlinkService {
LOG.info("Application cluster successfully deployed");
}
- private void validateHaMetadataExists(Configuration conf) {
- if (!isHaMetadataAvailable(conf)) {
- throw new DeploymentFailedException(
- "HA metadata not available to restore from last state. "
- + "It is possible that the job has finished or terminally failed, or the configmaps have been deleted. "
- + "Manual restore required.",
- "RestoreFailed");
- }
- }
-
@Override
public void submitSessionCluster(Configuration conf) throws Exception {
LOG.info("Deploying session cluster");
@@ -110,6 +86,13 @@ public class NativeFlinkService extends AbstractFlinkService {
LOG.info("Session cluster successfully deployed");
}
+ @Override
+ public void cancelJob(
+ FlinkDeployment deployment, UpgradeMode upgradeMode, Configuration configuration)
+ throws Exception {
+ cancelJob(deployment, upgradeMode, configuration, false);
+ }
+
@Override
public void deleteClusterDeployment(
ObjectMeta meta, FlinkDeploymentStatus status, boolean deleteHaData) {
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/StandaloneFlinkService.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/StandaloneFlinkService.java
index 5a0b85eb..08c2ef4e 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/StandaloneFlinkService.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/StandaloneFlinkService.java
@@ -20,11 +20,15 @@ package org.apache.flink.kubernetes.operator.service;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.client.deployment.ClusterDeploymentException;
import org.apache.flink.client.deployment.ClusterSpecification;
+import org.apache.flink.client.deployment.application.ApplicationConfiguration;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.kubernetes.KubernetesClusterClientFactory;
import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions;
import org.apache.flink.kubernetes.operator.config.FlinkConfigManager;
+import org.apache.flink.kubernetes.operator.config.Mode;
+import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
import org.apache.flink.kubernetes.operator.crd.spec.JobSpec;
+import org.apache.flink.kubernetes.operator.crd.spec.UpgradeMode;
import org.apache.flink.kubernetes.operator.crd.status.FlinkDeploymentStatus;
import org.apache.flink.kubernetes.operator.kubeclient.Fabric8FlinkStandaloneKubeClient;
import org.apache.flink.kubernetes.operator.kubeclient.FlinkStandaloneKubeClient;
@@ -58,22 +62,25 @@ public class StandaloneFlinkService extends AbstractFlinkService {
}
@Override
- public void submitApplicationCluster(
- JobSpec jobSpec, Configuration conf, boolean requireHaMetadata) throws Exception {
+ protected void deployApplicationCluster(JobSpec jobSpec, Configuration conf) throws Exception {
LOG.info("Deploying application cluster");
- // TODO some HA stuff?
- submitClusterInternal(conf);
+ submitClusterInternal(conf, Mode.APPLICATION);
LOG.info("Application cluster successfully deployed");
}
@Override
public void submitSessionCluster(Configuration conf) throws Exception {
LOG.info("Deploying session cluster");
- // TODO some HA stuff?
- submitClusterInternal(conf);
+ submitClusterInternal(conf, Mode.SESSION);
LOG.info("Session cluster successfully deployed");
}
+ @Override
+ public void cancelJob(FlinkDeployment deployment, UpgradeMode upgradeMode, Configuration conf)
+ throws Exception {
+ cancelJob(deployment, upgradeMode, conf, true);
+ }
+
@Override
public void deleteClusterDeployment(
ObjectMeta meta, FlinkDeploymentStatus status, boolean deleteHaData) {
@@ -106,13 +113,26 @@ public class StandaloneFlinkService extends AbstractFlinkService {
executorService);
}
- private void submitClusterInternal(Configuration conf) throws ClusterDeploymentException {
+ private void submitClusterInternal(Configuration conf, Mode mode)
+ throws ClusterDeploymentException {
final String namespace = conf.get(KubernetesConfigOptions.NAMESPACE);
FlinkStandaloneKubeClient client = createNamespacedKubeClient(conf, namespace);
try (final KubernetesStandaloneClusterDescriptor kubernetesClusterDescriptor =
new KubernetesStandaloneClusterDescriptor(conf, client)) {
- kubernetesClusterDescriptor.deploySessionCluster(getClusterSpecification(conf));
+ switch (mode) {
+ case APPLICATION:
+ kubernetesClusterDescriptor.deployApplicationCluster(
+ getClusterSpecification(conf),
+ ApplicationConfiguration.fromConfiguration(conf));
+ break;
+ case SESSION:
+ kubernetesClusterDescriptor.deploySessionCluster(getClusterSpecification(conf));
+ break;
+ default:
+ throw new UnsupportedOperationException(
+ String.format("Unsupported running mode: %s", mode));
+ }
}
}
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/EventSourceUtils.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/EventSourceUtils.java
index 62bea6dd..db238f28 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/EventSourceUtils.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/EventSourceUtils.java
@@ -45,11 +45,7 @@ public class EventSourceUtils {
public static InformerEventSource<Deployment, FlinkDeployment> getDeploymentInformerEventSource(
EventSourceContext<FlinkDeployment> context) {
final String labelSelector =
- Map.of(
- Constants.LABEL_TYPE_KEY,
- Constants.LABEL_TYPE_NATIVE_TYPE,
- Constants.LABEL_COMPONENT_KEY,
- Constants.LABEL_COMPONENT_JOB_MANAGER)
+ Map.of(Constants.LABEL_COMPONENT_KEY, Constants.LABEL_COMPONENT_JOB_MANAGER)
.entrySet().stream()
.map(Object::toString)
.collect(Collectors.joining(","));
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/FlinkUtils.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/FlinkUtils.java
index 05cb7e03..d2f4c359 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/FlinkUtils.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/FlinkUtils.java
@@ -167,6 +167,10 @@ public class FlinkUtils {
public static int getNumTaskManagers(Configuration conf) {
int parallelism = conf.get(CoreOptions.DEFAULT_PARALLELISM);
+ return getNumTaskManagers(conf, parallelism);
+ }
+
+ public static int getNumTaskManagers(Configuration conf, int parallelism) {
int taskSlots = conf.get(TaskManagerOptions.NUM_TASK_SLOTS);
return (parallelism + taskSlots - 1) / taskSlots;
}
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/validation/DefaultValidator.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/validation/DefaultValidator.java
index de3c7b68..9e92dc93 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/validation/DefaultValidator.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/validation/DefaultValidator.java
@@ -32,6 +32,7 @@ import org.apache.flink.kubernetes.operator.crd.spec.IngressSpec;
import org.apache.flink.kubernetes.operator.crd.spec.JobManagerSpec;
import org.apache.flink.kubernetes.operator.crd.spec.JobSpec;
import org.apache.flink.kubernetes.operator.crd.spec.JobState;
+import org.apache.flink.kubernetes.operator.crd.spec.KubernetesDeploymentMode;
import org.apache.flink.kubernetes.operator.crd.spec.Resource;
import org.apache.flink.kubernetes.operator.crd.spec.TaskManagerSpec;
import org.apache.flink.kubernetes.operator.crd.spec.UpgradeMode;
@@ -88,7 +89,11 @@ public class DefaultValidator implements FlinkResourceValidator {
deployment.getMetadata().getName(),
deployment.getMetadata().getNamespace()),
validateLogConfig(spec.getLogConfiguration()),
- validateJobSpec(spec.getJob(), spec.getTaskManager(), effectiveConfig),
+ validateJobSpec(
+ spec.getJob(),
+ spec.getTaskManager(),
+ effectiveConfig,
+ KubernetesDeploymentMode.getDeploymentMode(deployment)),
validateJmSpec(spec.getJobManager(), effectiveConfig),
validateTmSpec(spec.getTaskManager()),
validateSpecChange(deployment, effectiveConfig),
@@ -173,7 +178,10 @@ public class DefaultValidator implements FlinkResourceValidator {
}
private Optional<String> validateJobSpec(
- JobSpec job, @Nullable TaskManagerSpec tm, Map<String, String> confMap) {
+ JobSpec job,
+ @Nullable TaskManagerSpec tm,
+ Map<String, String> confMap,
+ KubernetesDeploymentMode mode) {
if (job == null) {
return Optional.empty();
}
@@ -306,6 +314,24 @@ public class DefaultValidator implements FlinkResourceValidator {
return Optional.of("Cannot switch from job to session cluster");
}
+ KubernetesDeploymentMode oldDeploymentMode =
+ oldSpec.getMode() == null ? KubernetesDeploymentMode.NATIVE : oldSpec.getMode();
+
+ KubernetesDeploymentMode newDeploymentMode =
+ newSpec.getMode() == null ? KubernetesDeploymentMode.NATIVE : newSpec.getMode();
+
+ if (oldDeploymentMode == KubernetesDeploymentMode.NATIVE
+ && newDeploymentMode != KubernetesDeploymentMode.NATIVE) {
+ return Optional.of(
+ "Cannot switch from native kubernetes to standalone kubernetes cluster");
+ }
+
+ if (oldDeploymentMode == KubernetesDeploymentMode.STANDALONE
+ && newDeploymentMode != KubernetesDeploymentMode.STANDALONE) {
+ return Optional.of(
+ "Cannot switch from standalone kubernetes to native kubernetes cluster");
+ }
+
JobSpec oldJob = oldSpec.getJob();
JobSpec newJob = newSpec.getJob();
if (oldJob != null && newJob != null) {
@@ -358,7 +384,11 @@ public class DefaultValidator implements FlinkResourceValidator {
return firstPresent(
validateNotApplicationCluster(sessionCluster),
validateSessionClusterId(sessionJob, sessionCluster),
- validateJobSpec(sessionJob.getSpec().getJob(), null, effectiveConfig));
+ validateJobSpec(
+ sessionJob.getSpec().getJob(),
+ null,
+ effectiveConfig,
+ KubernetesDeploymentMode.getDeploymentMode(sessionCluster)));
}
private Optional<String> validateJobNotEmpty(FlinkSessionJob sessionJob) {
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingFlinkService.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingFlinkService.java
index 2e99c53f..4ff0a33f 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingFlinkService.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingFlinkService.java
@@ -31,6 +31,7 @@ import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
import org.apache.flink.kubernetes.operator.crd.spec.FlinkSessionJobSpec;
import org.apache.flink.kubernetes.operator.crd.spec.FlinkVersion;
import org.apache.flink.kubernetes.operator.crd.spec.JobSpec;
+import org.apache.flink.kubernetes.operator.crd.spec.UpgradeMode;
import org.apache.flink.kubernetes.operator.crd.status.FlinkDeploymentStatus;
import org.apache.flink.kubernetes.operator.crd.status.JobManagerDeploymentStatus;
import org.apache.flink.kubernetes.operator.crd.status.Savepoint;
@@ -156,6 +157,10 @@ public class TestingFlinkService extends AbstractFlinkService {
if (requireHaMetadata) {
validateHaMetadataExists(conf);
}
+ deployApplicationCluster(jobSpec, conf);
+ }
+
+ protected void deployApplicationCluster(JobSpec jobSpec, Configuration conf) throws Exception {
if (deployFailure) {
throw new Exception("Deployment failure");
}
@@ -358,6 +363,13 @@ public class TestingFlinkService extends AbstractFlinkService {
0);
}
+ @Override
+ public void cancelJob(
+ FlinkDeployment deployment, UpgradeMode upgradeMode, Configuration configuration)
+ throws Exception {
+ cancelJob(deployment, upgradeMode, configuration, false);
+ }
+
private String cancelJob(FlinkVersion flinkVersion, JobID jobID, boolean savepoint)
throws Exception {
Optional<Tuple2<String, JobStatusMessage>> jobOpt =
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/config/FlinkConfigBuilderTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/config/FlinkConfigBuilderTest.java
index df5bf415..f33a9986 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/config/FlinkConfigBuilderTest.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/config/FlinkConfigBuilderTest.java
@@ -17,6 +17,8 @@
package org.apache.flink.kubernetes.operator.config;
+import org.apache.flink.client.deployment.application.ApplicationConfiguration;
+import org.apache.flink.configuration.ConfigUtils;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.CoreOptions;
import org.apache.flink.configuration.DeploymentOptions;
@@ -33,9 +35,11 @@ import org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory;
import org.apache.flink.kubernetes.operator.TestUtils;
import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
import org.apache.flink.kubernetes.operator.crd.spec.IngressSpec;
+import org.apache.flink.kubernetes.operator.crd.spec.KubernetesDeploymentMode;
import org.apache.flink.kubernetes.operator.crd.spec.TaskManagerSpec;
import org.apache.flink.kubernetes.operator.crd.spec.UpgradeMode;
import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
+import org.apache.flink.kubernetes.operator.standalone.StandaloneKubernetesConfigOptionsInternal;
import org.apache.flink.kubernetes.utils.Constants;
import org.apache.flink.runtime.jobgraph.SavepointConfigOptions;
import org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions;
@@ -54,6 +58,7 @@ import java.net.URISyntaxException;
import java.nio.file.Files;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.List;
import java.util.Map;
import static org.apache.flink.kubernetes.operator.TestUtils.IMAGE;
@@ -62,6 +67,8 @@ import static org.apache.flink.kubernetes.operator.TestUtils.SAMPLE_JAR;
import static org.apache.flink.kubernetes.operator.TestUtils.SERVICE_ACCOUNT;
import static org.apache.flink.kubernetes.operator.config.FlinkConfigBuilder.DEFAULT_CHECKPOINTING_INTERVAL;
import static org.apache.flink.kubernetes.utils.Constants.CONFIG_FILE_LOG4J_NAME;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.containsInAnyOrder;
/** FlinkConfigBuilderTest. */
public class FlinkConfigBuilderTest {
@@ -347,6 +354,84 @@ public class FlinkConfigBuilderTest {
configuration.getBoolean(SavepointConfigOptions.SAVEPOINT_IGNORE_UNCLAIMED_STATE));
}
+ @Test
+ public void testApplyStandaloneApplicationSpec() throws URISyntaxException, IOException {
+ FlinkDeployment dep = ReconciliationUtils.clone(flinkDeployment);
+ final String entryClass = "entry.class";
+ final String jarUri = "local:///flink/opt/StateMachine.jar";
+ final String correctedJarUri = "file:///flink/opt/StateMachine.jar";
+ dep.getSpec().setMode(KubernetesDeploymentMode.STANDALONE);
+ dep.getSpec().getJob().setEntryClass(entryClass);
+ dep.getSpec().getJob().setJarURI(jarUri);
+ dep.getSpec().setTaskManager(new TaskManagerSpec());
+ dep.getSpec().getTaskManager().setReplicas(3);
+ dep.getSpec().getFlinkConfiguration().put(TaskManagerOptions.NUM_TASK_SLOTS.key(), "2");
+
+ Configuration configuration =
+ new FlinkConfigBuilder(dep, new Configuration())
+ .applyFlinkConfiguration()
+ .applyTaskManagerSpec()
+ .applyJobOrSessionSpec()
+ .build();
+
+ Assertions.assertEquals("remote", configuration.getString(DeploymentOptions.TARGET));
+ Assertions.assertEquals(
+ StandaloneKubernetesConfigOptionsInternal.ClusterMode.APPLICATION,
+ configuration.get(StandaloneKubernetesConfigOptionsInternal.CLUSTER_MODE));
+ Assertions.assertEquals(6, configuration.getInteger(CoreOptions.DEFAULT_PARALLELISM));
+ Assertions.assertEquals(
+ entryClass,
+ configuration.getString(ApplicationConfiguration.APPLICATION_MAIN_CLASS));
+ Assertions.assertEquals(
+ 3,
+ configuration.get(
+ StandaloneKubernetesConfigOptionsInternal.KUBERNETES_TASKMANAGER_REPLICAS));
+ List<String> classpaths =
+ ConfigUtils.decodeListFromConfig(
+ configuration, PipelineOptions.CLASSPATHS, String::toString);
+ assertThat(classpaths, containsInAnyOrder(correctedJarUri));
+
+ dep.getSpec().getTaskManager().setReplicas(null);
+ dep.getSpec().getJob().setParallelism(10);
+
+ configuration =
+ new FlinkConfigBuilder(dep, new Configuration())
+ .applyFlinkConfiguration()
+ .applyTaskManagerSpec()
+ .applyJobOrSessionSpec()
+ .build();
+ Assertions.assertEquals(
+ 5,
+ configuration.get(
+ StandaloneKubernetesConfigOptionsInternal.KUBERNETES_TASKMANAGER_REPLICAS));
+ }
+
+ @Test
+ public void testApplyStandaloneSessionSpec() throws URISyntaxException, IOException {
+ FlinkDeployment dep = ReconciliationUtils.clone(flinkDeployment);
+ dep.getSpec().setMode(KubernetesDeploymentMode.STANDALONE);
+ dep.getSpec().setJob(null);
+ dep.getSpec().setTaskManager(new TaskManagerSpec());
+ dep.getSpec().getTaskManager().setReplicas(5);
+ dep.getSpec().getFlinkConfiguration().put(TaskManagerOptions.NUM_TASK_SLOTS.key(), "2");
+
+ Configuration configuration =
+ new FlinkConfigBuilder(dep, new Configuration())
+ .applyFlinkConfiguration()
+ .applyTaskManagerSpec()
+ .applyJobOrSessionSpec()
+ .build();
+
+ Assertions.assertEquals("remote", configuration.getString(DeploymentOptions.TARGET));
+ Assertions.assertEquals(
+ StandaloneKubernetesConfigOptionsInternal.ClusterMode.SESSION,
+ configuration.get(StandaloneKubernetesConfigOptionsInternal.CLUSTER_MODE));
+ Assertions.assertEquals(
+ 5,
+ configuration.get(
+ StandaloneKubernetesConfigOptionsInternal.KUBERNETES_TASKMANAGER_REPLICAS));
+ }
+
@Test
public void testBuildFrom() throws Exception {
final Configuration configuration =
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/TestingFlinkDeploymentController.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/TestingFlinkDeploymentController.java
index c169e1d9..088d46bf 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/TestingFlinkDeploymentController.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/TestingFlinkDeploymentController.java
@@ -70,6 +70,7 @@ public class TestingFlinkDeploymentController
KubernetesClient kubernetesClient,
TestingFlinkService flinkService) {
FlinkServiceFactory flinkServiceFactory = new TestingFlinkServiceFactory(flinkService);
+
eventRecorder = new EventRecorder(kubernetesClient, eventCollector);
statusRecorder =
new StatusRecorder<>(
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/sessionjob/SessionJobObserverTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/sessionjob/SessionJobObserverTest.java
index f801bfbc..5dddb7b9 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/sessionjob/SessionJobObserverTest.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/sessionjob/SessionJobObserverTest.java
@@ -54,7 +54,6 @@ public class SessionJobObserverTest {
private TestingFlinkService flinkService;
private SessionJobObserver observer;
private SessionJobReconciler reconciler;
- private FlinkServiceFactory flinkServiceFactory;
@BeforeEach
public void before() {
@@ -62,7 +61,7 @@ public class SessionJobObserverTest {
var eventRecorder = new EventRecorder(kubernetesClient, (r, e) -> {});
var statusRecorder = new TestingStatusRecorder<FlinkSessionJobStatus>();
flinkService = new TestingFlinkService();
- flinkServiceFactory = flinkServiceFactory = new TestingFlinkServiceFactory(flinkService);
+ FlinkServiceFactory flinkServiceFactory = new TestingFlinkServiceFactory(flinkService);
observer =
new SessionJobObserver(
flinkServiceFactory, configManager, statusRecorder, eventRecorder);
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/sessionjob/SessionJobReconcilerTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/sessionjob/SessionJobReconcilerTest.java
index 998fa34d..d6f342c9 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/sessionjob/SessionJobReconcilerTest.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/sessionjob/SessionJobReconcilerTest.java
@@ -72,7 +72,7 @@ public class SessionJobReconcilerTest {
@BeforeEach
public void before() {
flinkService = new TestingFlinkService();
- flinkServiceFactory = flinkServiceFactory = new TestingFlinkServiceFactory(flinkService);
+ flinkServiceFactory = new TestingFlinkServiceFactory(flinkService);
eventRecorder =
new EventRecorder(null, (r, e) -> {}) {
@Override
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/validation/DefaultValidatorTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/validation/DefaultValidatorTest.java
index 5189ab47..23e75bc1 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/validation/DefaultValidatorTest.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/validation/DefaultValidatorTest.java
@@ -32,6 +32,7 @@ import org.apache.flink.kubernetes.operator.crd.spec.FlinkVersion;
import org.apache.flink.kubernetes.operator.crd.spec.IngressSpec;
import org.apache.flink.kubernetes.operator.crd.spec.JobSpec;
import org.apache.flink.kubernetes.operator.crd.spec.JobState;
+import org.apache.flink.kubernetes.operator.crd.spec.KubernetesDeploymentMode;
import org.apache.flink.kubernetes.operator.crd.spec.TaskManagerSpec;
import org.apache.flink.kubernetes.operator.crd.spec.UpgradeMode;
import org.apache.flink.kubernetes.operator.crd.status.FlinkDeploymentReconciliationStatus;
@@ -301,6 +302,74 @@ public class DefaultValidatorTest {
},
"Cannot switch from session to job cluster");
+ testError(
+ dep -> {
+ dep.setStatus(new FlinkDeploymentStatus());
+ dep.getStatus().setJobStatus(new JobStatus());
+
+ dep.getStatus()
+ .setReconciliationStatus(new FlinkDeploymentReconciliationStatus());
+ dep.getSpec().setMode(KubernetesDeploymentMode.STANDALONE);
+ FlinkDeploymentSpec spec = ReconciliationUtils.clone(dep.getSpec());
+
+ spec.setMode(KubernetesDeploymentMode.NATIVE);
+ dep.getStatus()
+ .getReconciliationStatus()
+ .serializeAndSetLastReconciledSpec(spec, dep);
+ },
+ "Cannot switch from native kubernetes to standalone kubernetes cluster");
+
+ testError(
+ dep -> {
+ dep.setStatus(new FlinkDeploymentStatus());
+ dep.getStatus().setJobStatus(new JobStatus());
+
+ dep.getStatus()
+ .setReconciliationStatus(new FlinkDeploymentReconciliationStatus());
+ dep.getSpec().setMode(KubernetesDeploymentMode.STANDALONE);
+ FlinkDeploymentSpec spec = ReconciliationUtils.clone(dep.getSpec());
+
+ spec.setMode(null);
+ dep.getStatus()
+ .getReconciliationStatus()
+ .serializeAndSetLastReconciledSpec(spec, dep);
+ },
+ "Cannot switch from native kubernetes to standalone kubernetes cluster");
+
+ testError(
+ dep -> {
+ dep.setStatus(new FlinkDeploymentStatus());
+ dep.getStatus().setJobStatus(new JobStatus());
+
+ dep.getStatus()
+ .setReconciliationStatus(new FlinkDeploymentReconciliationStatus());
+ dep.getSpec().setMode(null);
+ FlinkDeploymentSpec spec = ReconciliationUtils.clone(dep.getSpec());
+
+ spec.setMode(KubernetesDeploymentMode.STANDALONE);
+ dep.getStatus()
+ .getReconciliationStatus()
+ .serializeAndSetLastReconciledSpec(spec, dep);
+ },
+ "Cannot switch from standalone kubernetes to native kubernetes cluster");
+
+ testError(
+ dep -> {
+ dep.setStatus(new FlinkDeploymentStatus());
+ dep.getStatus().setJobStatus(new JobStatus());
+
+ dep.getStatus()
+ .setReconciliationStatus(new FlinkDeploymentReconciliationStatus());
+ dep.getSpec().setMode(KubernetesDeploymentMode.NATIVE);
+ FlinkDeploymentSpec spec = ReconciliationUtils.clone(dep.getSpec());
+
+ spec.setMode(KubernetesDeploymentMode.STANDALONE);
+ dep.getStatus()
+ .getReconciliationStatus()
+ .serializeAndSetLastReconciledSpec(spec, dep);
+ },
+ "Cannot switch from standalone kubernetes to native kubernetes cluster");
+
// Test upgrade mode change validation
testError(
dep -> {