You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by gy...@apache.org on 2022/07/20 15:59:09 UTC
[flink-kubernetes-operator] 03/05: [FLINK-27444] Add KubernetesStandaloneClusterDescriptor and FlinkStandaloneKubeClient
This is an automated email from the ASF dual-hosted git repository.
gyfora pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git
commit 261fed2076efe385ede148152c946eb7c5f1f48d
Author: Usamah Jassat <us...@amazon.com>
AuthorDate: Mon Jun 13 15:00:12 2022 +0100
[FLINK-27444] Add KubernetesStandaloneClusterDescriptor and FlinkStandaloneKubeClient
---
flink-kubernetes-standalone/pom.xml | 1 -
.../Fabric8FlinkStandaloneKubeClient.java | 71 ++++++
.../kubeclient/FlinkStandaloneKubeClient.java | 27 ++
.../CmdStandaloneJobManagerDecorator.java | 6 +
.../decorators/UserLibMountDecorator.java | 13 +
.../StandaloneKubernetesJobManagerFactory.java | 124 ++++++++++
.../StandaloneKubernetesTaskManagerFactory.java | 90 +++++++
.../StandaloneKubernetesJobManagerParameters.java | 8 +
.../KubernetesStandaloneClusterDescriptor.java | 257 +++++++++++++++++++
.../src/main/resources/META-INF/NOTICE | 5 +
.../Fabric8FlinkStandaloneKubeClientTest.java | 104 ++++++++
.../CmdStandaloneJobManagerDecoratorTest.java | 26 +-
.../decorators/UserLibMountDecoratorTest.java | 50 +++-
.../StandaloneKubernetesJobManagerFactoryTest.java | 273 +++++++++++++++++++++
...StandaloneKubernetesTaskManagerFactoryTest.java | 150 +++++++++++
.../KubernetesStandaloneClusterDescriptorTest.java | 182 ++++++++++++++
16 files changed, 1383 insertions(+), 4 deletions(-)
diff --git a/flink-kubernetes-standalone/pom.xml b/flink-kubernetes-standalone/pom.xml
index f2a1c2f6..6125efe3 100644
--- a/flink-kubernetes-standalone/pom.xml
+++ b/flink-kubernetes-standalone/pom.xml
@@ -29,7 +29,6 @@ under the License.
</parent>
-
<artifactId>flink-kubernetes-standalone</artifactId>
<name>Flink Kubernetes Standalone</name>
<packaging>jar</packaging>
diff --git a/flink-kubernetes-standalone/src/main/java/org/apache/flink/kubernetes/operator/kubeclient/Fabric8FlinkStandaloneKubeClient.java b/flink-kubernetes-standalone/src/main/java/org/apache/flink/kubernetes/operator/kubeclient/Fabric8FlinkStandaloneKubeClient.java
new file mode 100644
index 00000000..45454dd7
--- /dev/null
+++ b/flink-kubernetes-standalone/src/main/java/org/apache/flink/kubernetes/operator/kubeclient/Fabric8FlinkStandaloneKubeClient.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.kubeclient;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.kubernetes.kubeclient.Fabric8FlinkKubeClient;
+import org.apache.flink.kubernetes.operator.utils.StandaloneKubernetesUtils;
+
+import io.fabric8.kubernetes.api.model.apps.Deployment;
+import io.fabric8.kubernetes.client.DefaultKubernetesClient;
+import io.fabric8.kubernetes.client.NamespacedKubernetesClient;
+
+import java.util.concurrent.ExecutorService;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/** The implementation of {@link FlinkStandaloneKubeClient}. */
+public class Fabric8FlinkStandaloneKubeClient extends Fabric8FlinkKubeClient
+ implements FlinkStandaloneKubeClient {
+
+ private final NamespacedKubernetesClient internalClient;
+
+ public Fabric8FlinkStandaloneKubeClient(
+ Configuration flinkConfig,
+ NamespacedKubernetesClient client,
+ ExecutorService executorService) {
+ super(flinkConfig, client, executorService);
+ internalClient = checkNotNull(client);
+ }
+
+ @Override
+ public void createTaskManagerDeployment(Deployment tmDeployment) {
+ this.internalClient.apps().deployments().create(tmDeployment);
+ }
+
+ @Override
+ public void stopAndCleanupCluster(String clusterId) {
+ this.internalClient
+ .apps()
+ .deployments()
+ .withName(StandaloneKubernetesUtils.getJobManagerDeploymentName(clusterId))
+ .cascading(true)
+ .delete();
+
+ this.internalClient
+ .apps()
+ .deployments()
+ .withName(StandaloneKubernetesUtils.getTaskManagerDeploymentName(clusterId))
+ .cascading(true)
+ .delete();
+ }
+
+ public static NamespacedKubernetesClient createNamespacedKubeClient(String namespace) {
+ return new DefaultKubernetesClient().inNamespace(namespace);
+ }
+}
diff --git a/flink-kubernetes-standalone/src/main/java/org/apache/flink/kubernetes/operator/kubeclient/FlinkStandaloneKubeClient.java b/flink-kubernetes-standalone/src/main/java/org/apache/flink/kubernetes/operator/kubeclient/FlinkStandaloneKubeClient.java
new file mode 100644
index 00000000..9a7f8207
--- /dev/null
+++ b/flink-kubernetes-standalone/src/main/java/org/apache/flink/kubernetes/operator/kubeclient/FlinkStandaloneKubeClient.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.kubeclient;
+
+import org.apache.flink.kubernetes.kubeclient.FlinkKubeClient;
+
+import io.fabric8.kubernetes.api.model.apps.Deployment;
+
+/** Extension of the FlinkKubeClient that is used for Flink standalone deployments. */
+public interface FlinkStandaloneKubeClient extends FlinkKubeClient {
+ void createTaskManagerDeployment(Deployment tmDeployment);
+}
diff --git a/flink-kubernetes-standalone/src/main/java/org/apache/flink/kubernetes/operator/kubeclient/decorators/CmdStandaloneJobManagerDecorator.java b/flink-kubernetes-standalone/src/main/java/org/apache/flink/kubernetes/operator/kubeclient/decorators/CmdStandaloneJobManagerDecorator.java
index 430abe2e..9bfcc866 100644
--- a/flink-kubernetes-standalone/src/main/java/org/apache/flink/kubernetes/operator/kubeclient/decorators/CmdStandaloneJobManagerDecorator.java
+++ b/flink-kubernetes-standalone/src/main/java/org/apache/flink/kubernetes/operator/kubeclient/decorators/CmdStandaloneJobManagerDecorator.java
@@ -79,6 +79,12 @@ public class CmdStandaloneJobManagerDecorator extends AbstractKubernetesStepDeco
args.add(mainClass);
}
+ Boolean allowNonRestoredState = kubernetesJobManagerParameters.getAllowNonRestoredState();
+ if (allowNonRestoredState != null) {
+ args.add("--allowNonRestoredState");
+ args.add(allowNonRestoredState.toString());
+ }
+
return args;
}
}
diff --git a/flink-kubernetes-standalone/src/main/java/org/apache/flink/kubernetes/operator/kubeclient/decorators/UserLibMountDecorator.java b/flink-kubernetes-standalone/src/main/java/org/apache/flink/kubernetes/operator/kubeclient/decorators/UserLibMountDecorator.java
index a66026b2..207637ac 100644
--- a/flink-kubernetes-standalone/src/main/java/org/apache/flink/kubernetes/operator/kubeclient/decorators/UserLibMountDecorator.java
+++ b/flink-kubernetes-standalone/src/main/java/org/apache/flink/kubernetes/operator/kubeclient/decorators/UserLibMountDecorator.java
@@ -27,6 +27,9 @@ import io.fabric8.kubernetes.api.model.Pod;
import io.fabric8.kubernetes.api.model.PodBuilder;
import io.fabric8.kubernetes.api.model.Volume;
import io.fabric8.kubernetes.api.model.VolumeBuilder;
+import io.fabric8.kubernetes.api.model.VolumeMount;
+
+import java.util.List;
import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -51,6 +54,10 @@ public class UserLibMountDecorator extends AbstractKubernetesStepDecorator {
return flinkPod;
}
+ if (mainContainerHasUserLibPath(flinkPod)) {
+ return flinkPod;
+ }
+
final Volume userLibVolume =
new VolumeBuilder()
.withName(USER_LIB_VOLUME)
@@ -78,4 +85,10 @@ public class UserLibMountDecorator extends AbstractKubernetesStepDecorator {
.withMainContainer(mountedMainContainer)
.build();
}
+
+ private boolean mainContainerHasUserLibPath(FlinkPod flinkPod) {
+ List<VolumeMount> volumeMounts = flinkPod.getMainContainer().getVolumeMounts();
+ return volumeMounts.stream()
+ .anyMatch(volumeMount -> volumeMount.getMountPath().startsWith(USER_LIB_PATH));
+ }
}
diff --git a/flink-kubernetes-standalone/src/main/java/org/apache/flink/kubernetes/operator/kubeclient/factory/StandaloneKubernetesJobManagerFactory.java b/flink-kubernetes-standalone/src/main/java/org/apache/flink/kubernetes/operator/kubeclient/factory/StandaloneKubernetesJobManagerFactory.java
new file mode 100644
index 00000000..f789db93
--- /dev/null
+++ b/flink-kubernetes-standalone/src/main/java/org/apache/flink/kubernetes/operator/kubeclient/factory/StandaloneKubernetesJobManagerFactory.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.kubeclient.factory;
+
+import org.apache.flink.kubernetes.kubeclient.FlinkPod;
+import org.apache.flink.kubernetes.kubeclient.KubernetesJobManagerSpecification;
+import org.apache.flink.kubernetes.kubeclient.decorators.EnvSecretsDecorator;
+import org.apache.flink.kubernetes.kubeclient.decorators.ExternalServiceDecorator;
+import org.apache.flink.kubernetes.kubeclient.decorators.FlinkConfMountDecorator;
+import org.apache.flink.kubernetes.kubeclient.decorators.HadoopConfMountDecorator;
+import org.apache.flink.kubernetes.kubeclient.decorators.InitJobManagerDecorator;
+import org.apache.flink.kubernetes.kubeclient.decorators.InternalServiceDecorator;
+import org.apache.flink.kubernetes.kubeclient.decorators.KerberosMountDecorator;
+import org.apache.flink.kubernetes.kubeclient.decorators.KubernetesStepDecorator;
+import org.apache.flink.kubernetes.kubeclient.decorators.MountSecretsDecorator;
+import org.apache.flink.kubernetes.kubeclient.parameters.KubernetesJobManagerParameters;
+import org.apache.flink.kubernetes.kubeclient.resources.KubernetesOwnerReference;
+import org.apache.flink.kubernetes.operator.kubeclient.decorators.CmdStandaloneJobManagerDecorator;
+import org.apache.flink.kubernetes.operator.kubeclient.decorators.UserLibMountDecorator;
+import org.apache.flink.kubernetes.operator.kubeclient.parameters.StandaloneKubernetesJobManagerParameters;
+import org.apache.flink.kubernetes.operator.utils.StandaloneKubernetesUtils;
+import org.apache.flink.kubernetes.utils.Constants;
+import org.apache.flink.util.Preconditions;
+
+import io.fabric8.kubernetes.api.model.Container;
+import io.fabric8.kubernetes.api.model.HasMetadata;
+import io.fabric8.kubernetes.api.model.Pod;
+import io.fabric8.kubernetes.api.model.PodBuilder;
+import io.fabric8.kubernetes.api.model.apps.Deployment;
+import io.fabric8.kubernetes.api.model.apps.DeploymentBuilder;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.stream.Collectors;
+
+/**
+ * Utility class for constructing all the Kubernetes resources for the JobManager when deploying
+ * in standalone mode. This can include the Deployment, the ConfigMap(s), and the Service(s).
+ */
+public class StandaloneKubernetesJobManagerFactory {
+ public static KubernetesJobManagerSpecification buildKubernetesJobManagerSpecification(
+ FlinkPod podTemplate,
+ StandaloneKubernetesJobManagerParameters kubernetesJobManagerParameters)
+ throws IOException {
+ FlinkPod flinkPod = Preconditions.checkNotNull(podTemplate).copy();
+ List<HasMetadata> accompanyingResources = new ArrayList<>();
+
+ final KubernetesStepDecorator[] stepDecorators =
+ new KubernetesStepDecorator[] {
+ new InitJobManagerDecorator(kubernetesJobManagerParameters),
+ new EnvSecretsDecorator(kubernetesJobManagerParameters),
+ new MountSecretsDecorator(kubernetesJobManagerParameters),
+ new CmdStandaloneJobManagerDecorator(kubernetesJobManagerParameters),
+ new InternalServiceDecorator(kubernetesJobManagerParameters),
+ new ExternalServiceDecorator(kubernetesJobManagerParameters),
+ new HadoopConfMountDecorator(kubernetesJobManagerParameters),
+ new KerberosMountDecorator(kubernetesJobManagerParameters),
+ new FlinkConfMountDecorator(kubernetesJobManagerParameters),
+ new UserLibMountDecorator(kubernetesJobManagerParameters),
+ };
+
+ for (KubernetesStepDecorator stepDecorator : stepDecorators) {
+ flinkPod = stepDecorator.decorateFlinkPod(flinkPod);
+ accompanyingResources.addAll(stepDecorator.buildAccompanyingKubernetesResources());
+ }
+
+ final Deployment deployment =
+ createJobManagerDeployment(flinkPod, kubernetesJobManagerParameters);
+
+ return new KubernetesJobManagerSpecification(deployment, accompanyingResources);
+ }
+
+ private static Deployment createJobManagerDeployment(
+ FlinkPod flinkPod, KubernetesJobManagerParameters kubernetesJobManagerParameters) {
+ final Container resolvedMainContainer = flinkPod.getMainContainer();
+
+ final Pod resolvedPod =
+ new PodBuilder(flinkPod.getPodWithoutMainContainer())
+ .editOrNewSpec()
+ .addToContainers(resolvedMainContainer)
+ .endSpec()
+ .build();
+ return new DeploymentBuilder()
+ .withApiVersion(Constants.APPS_API_VERSION)
+ .editOrNewMetadata()
+ .withName(
+ StandaloneKubernetesUtils.getJobManagerDeploymentName(
+ kubernetesJobManagerParameters.getClusterId()))
+ .withAnnotations(kubernetesJobManagerParameters.getAnnotations())
+ .withLabels(kubernetesJobManagerParameters.getLabels())
+ .withOwnerReferences(
+ kubernetesJobManagerParameters.getOwnerReference().stream()
+ .map(e -> KubernetesOwnerReference.fromMap(e).getInternalResource())
+ .collect(Collectors.toList()))
+ .endMetadata()
+ .editOrNewSpec()
+ .withReplicas(kubernetesJobManagerParameters.getReplicas())
+ .editOrNewTemplate()
+ .withMetadata(resolvedPod.getMetadata())
+ .withSpec(resolvedPod.getSpec())
+ .endTemplate()
+ .editOrNewSelector()
+ .addToMatchLabels(kubernetesJobManagerParameters.getSelectors())
+ .endSelector()
+ .endSpec()
+ .build();
+ }
+}
diff --git a/flink-kubernetes-standalone/src/main/java/org/apache/flink/kubernetes/operator/kubeclient/factory/StandaloneKubernetesTaskManagerFactory.java b/flink-kubernetes-standalone/src/main/java/org/apache/flink/kubernetes/operator/kubeclient/factory/StandaloneKubernetesTaskManagerFactory.java
new file mode 100644
index 00000000..8bcd9990
--- /dev/null
+++ b/flink-kubernetes-standalone/src/main/java/org/apache/flink/kubernetes/operator/kubeclient/factory/StandaloneKubernetesTaskManagerFactory.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.kubeclient.factory;
+
+import org.apache.flink.kubernetes.kubeclient.FlinkPod;
+import org.apache.flink.kubernetes.kubeclient.decorators.EnvSecretsDecorator;
+import org.apache.flink.kubernetes.kubeclient.decorators.FlinkConfMountDecorator;
+import org.apache.flink.kubernetes.kubeclient.decorators.HadoopConfMountDecorator;
+import org.apache.flink.kubernetes.kubeclient.decorators.KerberosMountDecorator;
+import org.apache.flink.kubernetes.kubeclient.decorators.KubernetesStepDecorator;
+import org.apache.flink.kubernetes.kubeclient.decorators.MountSecretsDecorator;
+import org.apache.flink.kubernetes.operator.kubeclient.decorators.CmdStandaloneTaskManagerDecorator;
+import org.apache.flink.kubernetes.operator.kubeclient.decorators.InitStandaloneTaskManagerDecorator;
+import org.apache.flink.kubernetes.operator.kubeclient.parameters.StandaloneKubernetesTaskManagerParameters;
+import org.apache.flink.kubernetes.operator.utils.StandaloneKubernetesUtils;
+import org.apache.flink.kubernetes.utils.Constants;
+import org.apache.flink.util.Preconditions;
+
+import io.fabric8.kubernetes.api.model.Pod;
+import io.fabric8.kubernetes.api.model.PodBuilder;
+import io.fabric8.kubernetes.api.model.apps.Deployment;
+import io.fabric8.kubernetes.api.model.apps.DeploymentBuilder;
+
+/** Utility class for constructing the TaskManager Deployment when deploying in standalone mode. */
+public class StandaloneKubernetesTaskManagerFactory {
+
+ public static Deployment buildKubernetesTaskManagerDeployment(
+ FlinkPod podTemplate,
+ StandaloneKubernetesTaskManagerParameters kubernetesTaskManagerParameters) {
+ FlinkPod flinkPod = Preconditions.checkNotNull(podTemplate).copy();
+
+ final KubernetesStepDecorator[] stepDecorators =
+ new KubernetesStepDecorator[] {
+ new InitStandaloneTaskManagerDecorator(kubernetesTaskManagerParameters),
+ new EnvSecretsDecorator(kubernetesTaskManagerParameters),
+ new MountSecretsDecorator(kubernetesTaskManagerParameters),
+ new CmdStandaloneTaskManagerDecorator(kubernetesTaskManagerParameters),
+ new HadoopConfMountDecorator(kubernetesTaskManagerParameters),
+ new KerberosMountDecorator(kubernetesTaskManagerParameters),
+ new FlinkConfMountDecorator(kubernetesTaskManagerParameters)
+ };
+
+ for (KubernetesStepDecorator stepDecorator : stepDecorators) {
+ flinkPod = stepDecorator.decorateFlinkPod(flinkPod);
+ }
+
+ final Pod resolvedPod =
+ new PodBuilder(flinkPod.getPodWithoutMainContainer())
+ .editOrNewSpec()
+ .addToContainers(flinkPod.getMainContainer())
+ .endSpec()
+ .build();
+
+ return new DeploymentBuilder()
+ .withApiVersion(Constants.APPS_API_VERSION)
+ .editOrNewMetadata()
+ .withName(
+ StandaloneKubernetesUtils.getTaskManagerDeploymentName(
+ kubernetesTaskManagerParameters.getClusterId()))
+ .withAnnotations(kubernetesTaskManagerParameters.getAnnotations())
+ .withLabels(kubernetesTaskManagerParameters.getLabels())
+ .endMetadata()
+ .editOrNewSpec()
+ .withReplicas(kubernetesTaskManagerParameters.getReplicas())
+ .editOrNewTemplate()
+ .withMetadata(resolvedPod.getMetadata())
+ .withSpec(resolvedPod.getSpec())
+ .endTemplate()
+ .editOrNewSelector()
+ .addToMatchLabels(kubernetesTaskManagerParameters.getSelectors())
+ .endSelector()
+ .endSpec()
+ .build();
+ }
+}
diff --git a/flink-kubernetes-standalone/src/main/java/org/apache/flink/kubernetes/operator/kubeclient/parameters/StandaloneKubernetesJobManagerParameters.java b/flink-kubernetes-standalone/src/main/java/org/apache/flink/kubernetes/operator/kubeclient/parameters/StandaloneKubernetesJobManagerParameters.java
index bdc28d48..fbfcbdeb 100644
--- a/flink-kubernetes-standalone/src/main/java/org/apache/flink/kubernetes/operator/kubeclient/parameters/StandaloneKubernetesJobManagerParameters.java
+++ b/flink-kubernetes-standalone/src/main/java/org/apache/flink/kubernetes/operator/kubeclient/parameters/StandaloneKubernetesJobManagerParameters.java
@@ -24,6 +24,7 @@ import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions;
import org.apache.flink.kubernetes.kubeclient.parameters.KubernetesJobManagerParameters;
import org.apache.flink.kubernetes.operator.standalone.StandaloneKubernetesConfigOptionsInternal;
import org.apache.flink.kubernetes.operator.utils.StandaloneKubernetesUtils;
+import org.apache.flink.runtime.jobgraph.SavepointConfigOptions;
import java.util.Collections;
import java.util.HashMap;
@@ -80,4 +81,11 @@ public class StandaloneKubernetesJobManagerParameters extends KubernetesJobManag
}
return flinkConfig.getString(ApplicationConfiguration.APPLICATION_MAIN_CLASS);
}
+
+ public Boolean getAllowNonRestoredState() {
+ if (flinkConfig.contains(SavepointConfigOptions.SAVEPOINT_IGNORE_UNCLAIMED_STATE)) {
+ return flinkConfig.get(SavepointConfigOptions.SAVEPOINT_IGNORE_UNCLAIMED_STATE);
+ }
+ return null;
+ }
}
diff --git a/flink-kubernetes-standalone/src/main/java/org/apache/flink/kubernetes/operator/standalone/KubernetesStandaloneClusterDescriptor.java b/flink-kubernetes-standalone/src/main/java/org/apache/flink/kubernetes/operator/standalone/KubernetesStandaloneClusterDescriptor.java
new file mode 100644
index 00000000..cfed561a
--- /dev/null
+++ b/flink-kubernetes-standalone/src/main/java/org/apache/flink/kubernetes/operator/standalone/KubernetesStandaloneClusterDescriptor.java
@@ -0,0 +1,257 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.standalone;
+
+import org.apache.flink.client.deployment.ClusterDeploymentException;
+import org.apache.flink.client.deployment.ClusterDescriptor;
+import org.apache.flink.client.deployment.ClusterRetrieveException;
+import org.apache.flink.client.deployment.ClusterSpecification;
+import org.apache.flink.client.deployment.application.ApplicationConfiguration;
+import org.apache.flink.client.program.ClusterClient;
+import org.apache.flink.client.program.ClusterClientProvider;
+import org.apache.flink.client.program.rest.RestClusterClient;
+import org.apache.flink.configuration.BlobServerOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.HighAvailabilityOptions;
+import org.apache.flink.configuration.JobManagerOptions;
+import org.apache.flink.configuration.RestOptions;
+import org.apache.flink.configuration.TaskManagerOptions;
+import org.apache.flink.kubernetes.KubernetesClusterDescriptor;
+import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions;
+import org.apache.flink.kubernetes.kubeclient.Endpoint;
+import org.apache.flink.kubernetes.kubeclient.FlinkPod;
+import org.apache.flink.kubernetes.kubeclient.KubernetesJobManagerSpecification;
+import org.apache.flink.kubernetes.operator.kubeclient.FlinkStandaloneKubeClient;
+import org.apache.flink.kubernetes.operator.kubeclient.factory.StandaloneKubernetesJobManagerFactory;
+import org.apache.flink.kubernetes.operator.kubeclient.factory.StandaloneKubernetesTaskManagerFactory;
+import org.apache.flink.kubernetes.operator.kubeclient.parameters.StandaloneKubernetesJobManagerParameters;
+import org.apache.flink.kubernetes.operator.kubeclient.parameters.StandaloneKubernetesTaskManagerParameters;
+import org.apache.flink.kubernetes.utils.Constants;
+import org.apache.flink.kubernetes.utils.KubernetesUtils;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils;
+import org.apache.flink.runtime.highavailability.nonha.standalone.StandaloneClientHAServices;
+import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
+import org.apache.flink.runtime.rpc.AddressResolution;
+
+import io.fabric8.kubernetes.api.model.apps.Deployment;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Optional;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/** Standalone Kubernetes specific {@link ClusterDescriptor} implementation. */
+public class KubernetesStandaloneClusterDescriptor extends KubernetesClusterDescriptor {
+
+ private static final Logger LOG =
+ LoggerFactory.getLogger(KubernetesStandaloneClusterDescriptor.class);
+
+ private static final String CLUSTER_DESCRIPTION = "Standalone Kubernetes cluster";
+
+ private final Configuration flinkConfig;
+
+ private final FlinkStandaloneKubeClient client;
+
+ private final String clusterId;
+
+ public KubernetesStandaloneClusterDescriptor(
+ Configuration flinkConfig, FlinkStandaloneKubeClient client) {
+ super(flinkConfig, client);
+ this.flinkConfig = checkNotNull(flinkConfig);
+ this.client = checkNotNull(client);
+ this.clusterId =
+ checkNotNull(
+ flinkConfig.getString(KubernetesConfigOptions.CLUSTER_ID),
+ "ClusterId must be specified!");
+ }
+
+ @Override
+ public String getClusterDescription() {
+ return CLUSTER_DESCRIPTION;
+ }
+
+ /**
+ * Deploys a standalone session cluster.
+ *
+ * <p>Sets the internal cluster mode in {@code flinkConfig} to {@code SESSION}, creates the
+ * cluster through {@code deployClusterInternal}, and logs the JobManager web interface URL
+ * obtained from a short-lived {@link ClusterClient} that is closed again before returning.
+ *
+ * @param clusterSpecification specification forwarded to the internal deployment
+ * @return provider that creates clients for the deployed cluster
+ * @throws ClusterDeploymentException if the internal deployment fails
+ */
+ @Override
+ public ClusterClientProvider<String> deploySessionCluster(
+ ClusterSpecification clusterSpecification) throws ClusterDeploymentException {
+ flinkConfig.set(
+ StandaloneKubernetesConfigOptionsInternal.CLUSTER_MODE,
+ StandaloneKubernetesConfigOptionsInternal.ClusterMode.SESSION);
+
+ // Keep the type argument: a raw ClusterClientProvider would make both the
+ // getClusterClient() call and the return statement unchecked conversions.
+ final ClusterClientProvider<String> clusterClientProvider =
+ deployClusterInternal(clusterSpecification);
+
+ try (ClusterClient<String> clusterClient = clusterClientProvider.getClusterClient()) {
+ LOG.info(
+ "Created flink session cluster {} successfully, JobManager Web Interface: {}",
+ clusterId,
+ clusterClient.getWebInterfaceURL());
+ }
+ return clusterClientProvider;
+ }
+
+ /**
+ * Deploys a standalone application cluster.
+ *
+ * <p>Sets the internal cluster mode in {@code flinkConfig} to {@code APPLICATION}, applies the
+ * given application configuration to {@code flinkConfig}, creates the cluster through
+ * {@code deployClusterInternal}, and logs the JobManager web interface URL obtained from a
+ * short-lived {@link ClusterClient} that is closed again before returning.
+ *
+ * @param clusterSpecification specification forwarded to the internal deployment
+ * @param applicationConfiguration application settings written into {@code flinkConfig}
+ * @return provider that creates clients for the deployed cluster
+ * @throws ClusterDeploymentException if the internal deployment fails
+ */
+ @Override
+ public ClusterClientProvider<String> deployApplicationCluster(
+ ClusterSpecification clusterSpecification,
+ ApplicationConfiguration applicationConfiguration)
+ throws ClusterDeploymentException {
+ flinkConfig.set(
+ StandaloneKubernetesConfigOptionsInternal.CLUSTER_MODE,
+ StandaloneKubernetesConfigOptionsInternal.ClusterMode.APPLICATION);
+ applicationConfiguration.applyToConfiguration(flinkConfig);
+ // Keep the type argument: a raw ClusterClientProvider would make both the
+ // getClusterClient() call and the return statement unchecked conversions.
+ final ClusterClientProvider<String> clusterClientProvider =
+ deployClusterInternal(clusterSpecification);
+
+ try (ClusterClient<String> clusterClient = clusterClientProvider.getClusterClient()) {
+ LOG.info(
+ "Created flink application cluster {} successfully, JobManager Web Interface: {}",
+ clusterId,
+ clusterClient.getWebInterfaceURL());
+ }
+ return clusterClientProvider;
+ }
+
+ private ClusterClientProvider<String> deployClusterInternal(
+ ClusterSpecification clusterSpecification) throws ClusterDeploymentException {
+
+ // Rpc, blob, rest, taskManagerRpc ports need to be exposed, so update them to fixed values.
+ KubernetesUtils.checkAndUpdatePortConfigOption(
+ flinkConfig, BlobServerOptions.PORT, Constants.BLOB_SERVER_PORT);
+ KubernetesUtils.checkAndUpdatePortConfigOption(
+ flinkConfig, TaskManagerOptions.RPC_PORT, Constants.TASK_MANAGER_RPC_PORT);
+ KubernetesUtils.checkAndUpdatePortConfigOption(
+ flinkConfig, RestOptions.BIND_PORT, Constants.REST_PORT);
+
+ if (HighAvailabilityMode.isHighAvailabilityModeActivated(flinkConfig)) {
+ flinkConfig.setString(HighAvailabilityOptions.HA_CLUSTER_ID, clusterId);
+ KubernetesUtils.checkAndUpdatePortConfigOption(
+ flinkConfig,
+ HighAvailabilityOptions.HA_JOB_MANAGER_PORT_RANGE,
+ flinkConfig.get(JobManagerOptions.PORT));
+ }
+
+ // Deploy JM + resources
+ try {
+ KubernetesJobManagerSpecification jmSpec = getJobManagerSpec(clusterSpecification);
+ Deployment tmDeployment = getTaskManagerDeployment(clusterSpecification);
+
+ client.createJobManagerComponent(jmSpec);
+ client.createTaskManagerDeployment(tmDeployment);
+
+ return createClusterClientProvider(clusterId);
+ } catch (Exception e) {
+ try {
+ LOG.warn(
+ "Failed to create the Kubernetes cluster \"{}\", try to clean up the residual resources.",
+ clusterId);
+ client.stopAndCleanupCluster(clusterId);
+ } catch (Exception e1) {
+ LOG.info(
+ "Failed to stop and clean up the Kubernetes cluster \"{}\".",
+ clusterId,
+ e1);
+ }
+ throw new ClusterDeploymentException(
+ "Could not create Kubernetes cluster \"" + clusterId + "\".", e);
+ }
+ }
+
+ /**
+ * Builds the JobManager specification for this cluster.
+ *
+ * <p>Creates the JobManager parameters from {@code flinkConfig} and the given cluster
+ * specification, loads the pod template file when the parameters provide one (otherwise
+ * starts from an empty {@link FlinkPod}), and hands both to
+ * {@link StandaloneKubernetesJobManagerFactory}.
+ *
+ * @param clusterSpecification specification used to create the JobManager parameters
+ * @return the JobManager specification produced by the factory
+ * @throws IOException if the factory fails while building the specification
+ */
+ private KubernetesJobManagerSpecification getJobManagerSpec(
+ ClusterSpecification clusterSpecification) throws IOException {
+ final StandaloneKubernetesJobManagerParameters kubernetesJobManagerParameters =
+ new StandaloneKubernetesJobManagerParameters(flinkConfig, clusterSpecification);
+
+ // orElseGet: only build the empty fallback pod when no template file is configured.
+ final FlinkPod podTemplate =
+ kubernetesJobManagerParameters
+ .getPodTemplateFilePath()
+ .map(
+ file ->
+ KubernetesUtils.loadPodFromTemplateFile(
+ client, file, Constants.MAIN_CONTAINER_NAME))
+ .orElseGet(() -> new FlinkPod.Builder().build());
+
+ return StandaloneKubernetesJobManagerFactory.buildKubernetesJobManagerSpecification(
+ podTemplate, kubernetesJobManagerParameters);
+ }
+
+ /**
+ * Builds the TaskManager Deployment for this cluster.
+ *
+ * <p>Creates the TaskManager parameters from {@code flinkConfig} and the given cluster
+ * specification, loads the pod template file when the parameters provide one (otherwise
+ * starts from an empty {@link FlinkPod}), and hands both to
+ * {@link StandaloneKubernetesTaskManagerFactory}.
+ *
+ * @param clusterSpecification specification used to create the TaskManager parameters
+ * @return the TaskManager Deployment produced by the factory
+ */
+ private Deployment getTaskManagerDeployment(ClusterSpecification clusterSpecification) {
+ final StandaloneKubernetesTaskManagerParameters kubernetesTaskManagerParameters =
+ new StandaloneKubernetesTaskManagerParameters(flinkConfig, clusterSpecification);
+
+ // orElseGet: only build the empty fallback pod when no template file is configured.
+ final FlinkPod podTemplate =
+ kubernetesTaskManagerParameters
+ .getPodTemplateFilePath()
+ .map(
+ file ->
+ KubernetesUtils.loadPodFromTemplateFile(
+ client, file, Constants.MAIN_CONTAINER_NAME))
+ .orElseGet(() -> new FlinkPod.Builder().build());
+
+ return StandaloneKubernetesTaskManagerFactory.buildKubernetesTaskManagerDeployment(
+ podTemplate, kubernetesTaskManagerParameters);
+ }
+
+ private ClusterClientProvider<String> createClusterClientProvider(String clusterId) {
+ return () -> {
+ final Configuration configuration = new Configuration(flinkConfig);
+
+ final Optional<Endpoint> restEndpoint = client.getRestEndpoint(clusterId);
+
+ if (restEndpoint.isPresent()) {
+ configuration.setString(RestOptions.ADDRESS, restEndpoint.get().getAddress());
+ configuration.setInteger(RestOptions.PORT, restEndpoint.get().getPort());
+ } else {
+ throw new RuntimeException(
+ new ClusterRetrieveException(
+ "Could not get the rest endpoint of " + clusterId));
+ }
+
+ try {
+ // Flink client will always use Kubernetes service to contact with jobmanager. So we
+ // have a pre-configured web monitor address. Using StandaloneClientHAServices to
+ // create RestClusterClient is reasonable.
+ return new RestClusterClient<>(
+ configuration,
+ clusterId,
+ (effectiveConfiguration, fatalErrorHandler) ->
+ new StandaloneClientHAServices(
+ getWebMonitorAddress(effectiveConfiguration)));
+ } catch (Exception e) {
+ throw new RuntimeException(
+ new ClusterRetrieveException("Could not create the RestClusterClient.", e));
+ }
+ };
+ }
+
+ private String getWebMonitorAddress(Configuration configuration) throws Exception {
+ AddressResolution resolution = AddressResolution.TRY_ADDRESS_RESOLUTION;
+ final KubernetesConfigOptions.ServiceExposedType serviceType =
+ configuration.get(KubernetesConfigOptions.REST_SERVICE_EXPOSED_TYPE);
+ if (serviceType.isClusterIP()) {
+ resolution = AddressResolution.NO_ADDRESS_RESOLUTION;
+ LOG.warn(
+ "Please note that Flink client operations(e.g. cancel, list, stop,"
+ + " savepoint, etc.) won't work from outside the Kubernetes cluster"
+ + " since '{}' has been set to {}.",
+ KubernetesConfigOptions.REST_SERVICE_EXPOSED_TYPE.key(),
+ serviceType);
+ }
+ return HighAvailabilityServicesUtils.getWebMonitorAddress(configuration, resolution);
+ }
+}
diff --git a/flink-kubernetes-standalone/src/main/resources/META-INF/NOTICE b/flink-kubernetes-standalone/src/main/resources/META-INF/NOTICE
new file mode 100644
index 00000000..993ca343
--- /dev/null
+++ b/flink-kubernetes-standalone/src/main/resources/META-INF/NOTICE
@@ -0,0 +1,5 @@
+flink-kubernetes-operator-standalone
+Copyright 2014-2022 The Apache Software Foundation
+
+This product includes software developed at
+The Apache Software Foundation (http://www.apache.org/).
\ No newline at end of file
diff --git a/flink-kubernetes-standalone/src/test/java/org/apache/flink/kubernetes/operator/kubeclient/Fabric8FlinkStandaloneKubeClientTest.java b/flink-kubernetes-standalone/src/test/java/org/apache/flink/kubernetes/operator/kubeclient/Fabric8FlinkStandaloneKubeClientTest.java
new file mode 100644
index 00000000..9c52eb32
--- /dev/null
+++ b/flink-kubernetes-standalone/src/test/java/org/apache/flink/kubernetes/operator/kubeclient/Fabric8FlinkStandaloneKubeClientTest.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.kubeclient;
+
+import org.apache.flink.client.deployment.ClusterSpecification;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.kubernetes.kubeclient.FlinkPod;
+import org.apache.flink.kubernetes.kubeclient.KubernetesJobManagerSpecification;
+import org.apache.flink.kubernetes.operator.kubeclient.factory.StandaloneKubernetesJobManagerFactory;
+import org.apache.flink.kubernetes.operator.kubeclient.factory.StandaloneKubernetesTaskManagerFactory;
+import org.apache.flink.kubernetes.operator.kubeclient.parameters.StandaloneKubernetesJobManagerParameters;
+import org.apache.flink.kubernetes.operator.kubeclient.parameters.StandaloneKubernetesTaskManagerParameters;
+import org.apache.flink.kubernetes.operator.kubeclient.utils.TestUtils;
+import org.apache.flink.util.concurrent.Executors;
+
+import io.fabric8.kubernetes.api.model.apps.Deployment;
+import io.fabric8.kubernetes.client.NamespacedKubernetesClient;
+import io.fabric8.kubernetes.client.server.mock.EnableKubernetesMockClient;
+import io.fabric8.kubernetes.client.server.mock.KubernetesMockServer;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.util.List;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+/** {@link Fabric8FlinkStandaloneKubeClient} unit tests. */
+@EnableKubernetesMockClient(crud = true)
+public class Fabric8FlinkStandaloneKubeClientTest {
+    private static final String NAMESPACE = "test";
+
+    // NOTE(review): never assigned in this class; presumably injected by the
+    // @EnableKubernetesMockClient extension before setup() runs - confirm.
+    KubernetesMockServer mockServer;
+    protected NamespacedKubernetesClient kubernetesClient;
+    private FlinkStandaloneKubeClient flinkKubeClient;
+    private StandaloneKubernetesTaskManagerParameters taskManagerParameters;
+    private Deployment tmDeployment;
+    private ClusterSpecification clusterSpecification;
+    // Assigned in setup(); no eager default is needed.
+    private Configuration flinkConfig;
+
+    /**
+     * Creates, for every test, a fresh Kubernetes client from the mock server, the client under
+     * test, a cluster specification and a TaskManager deployment built from an empty pod.
+     */
+    @BeforeEach
+    public final void setup() {
+        flinkConfig = TestUtils.createTestFlinkConfig();
+        kubernetesClient = mockServer.createClient();
+
+        flinkKubeClient =
+                new Fabric8FlinkStandaloneKubeClient(
+                        flinkConfig, kubernetesClient, Executors.newDirectExecutorService());
+        clusterSpecification = TestUtils.createClusterSpecification();
+
+        taskManagerParameters =
+                new StandaloneKubernetesTaskManagerParameters(flinkConfig, clusterSpecification);
+
+        tmDeployment =
+                StandaloneKubernetesTaskManagerFactory.buildKubernetesTaskManagerDeployment(
+                        new FlinkPod.Builder().build(), taskManagerParameters);
+    }
+
+    /** Creating the TaskManager deployment must result in exactly one stored Deployment. */
+    @Test
+    public void testCreateTaskManagerDeployment() {
+        flinkKubeClient.createTaskManagerDeployment(tmDeployment);
+
+        assertEquals(1, listDeployments().size());
+    }
+
+    /**
+     * After creating both the JobManager component and the TaskManager deployment, stopping the
+     * cluster must leave no Deployment behind.
+     *
+     * @throws Exception if building or creating the JobManager component fails
+     */
+    @Test
+    public void testStopAndCleanupCluster() throws Exception {
+        // Reuse the specification built in setup() instead of shadowing the field with a copy.
+        final StandaloneKubernetesJobManagerParameters jmParameters =
+                new StandaloneKubernetesJobManagerParameters(flinkConfig, clusterSpecification);
+        final KubernetesJobManagerSpecification jmSpec =
+                StandaloneKubernetesJobManagerFactory.buildKubernetesJobManagerSpecification(
+                        new FlinkPod.Builder().build(), jmParameters);
+
+        flinkKubeClient.createJobManagerComponent(jmSpec);
+        flinkKubeClient.createTaskManagerDeployment(tmDeployment);
+
+        // One JobManager deployment plus one TaskManager deployment.
+        assertEquals(2, listDeployments().size());
+
+        flinkKubeClient.stopAndCleanupCluster(taskManagerParameters.getClusterId());
+
+        assertEquals(0, listDeployments().size());
+    }
+
+    /** Lists the Deployments currently present in the {@link #NAMESPACE} namespace. */
+    private List<Deployment> listDeployments() {
+        return kubernetesClient.apps().deployments().inNamespace(NAMESPACE).list().getItems();
+    }
+}
diff --git a/flink-kubernetes-standalone/src/test/java/org/apache/flink/kubernetes/operator/kubeclient/decorators/CmdStandaloneJobManagerDecoratorTest.java b/flink-kubernetes-standalone/src/test/java/org/apache/flink/kubernetes/operator/kubeclient/decorators/CmdStandaloneJobManagerDecoratorTest.java
index 49355574..8bdb0f33 100644
--- a/flink-kubernetes-standalone/src/test/java/org/apache/flink/kubernetes/operator/kubeclient/decorators/CmdStandaloneJobManagerDecoratorTest.java
+++ b/flink-kubernetes-standalone/src/test/java/org/apache/flink/kubernetes/operator/kubeclient/decorators/CmdStandaloneJobManagerDecoratorTest.java
@@ -18,11 +18,13 @@
package org.apache.flink.kubernetes.operator.kubeclient.decorators;
import org.apache.flink.client.deployment.ClusterSpecification;
+import org.apache.flink.client.deployment.application.ApplicationConfiguration;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions;
import org.apache.flink.kubernetes.kubeclient.FlinkPod;
import org.apache.flink.kubernetes.operator.kubeclient.parameters.StandaloneKubernetesJobManagerParameters;
import org.apache.flink.kubernetes.operator.standalone.StandaloneKubernetesConfigOptionsInternal;
+import org.apache.flink.runtime.jobgraph.SavepointConfigOptions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
@@ -67,7 +69,29 @@ public class CmdStandaloneJobManagerDecoratorTest {
}
@Test
- public void testApplicationCommandAdded() {
+ public void testApplicationCommandAndArgsAdded() {
+ // Application mode with an explicit main class and the ignore-unclaimed-state option set to
+ // false; the decorated pod is then checked for the resulting command and arguments.
+ final String testMainClass = "org.main.class";
+ configuration.set(
+ StandaloneKubernetesConfigOptionsInternal.CLUSTER_MODE,
+ StandaloneKubernetesConfigOptionsInternal.ClusterMode.APPLICATION);
+ configuration.set(ApplicationConfiguration.APPLICATION_MAIN_CLASS, testMainClass);
+ configuration.set(SavepointConfigOptions.SAVEPOINT_IGNORE_UNCLAIMED_STATE, false);
+
+ FlinkPod decoratedPod = decorator.decorateFlinkPod(new FlinkPod.Builder().build());
+ // The command must consist of exactly the mocked entry path.
+ assertThat(
+ decoratedPod.getMainContainer().getCommand(), containsInAnyOrder(MOCK_ENTRYPATH));
+ // The args must be exactly these five entries; their order is not checked.
+ assertThat(
+ decoratedPod.getMainContainer().getArgs(),
+ containsInAnyOrder(
+ CmdStandaloneJobManagerDecorator.APPLICATION_MODE_ARG,
+ "--allowNonRestoredState",
+ "false",
+ "--job-classname",
+ testMainClass));
+ }
+
+ @Test
+ public void testApplicationOptionalArgsNotAdded() {
configuration.set(
StandaloneKubernetesConfigOptionsInternal.CLUSTER_MODE,
StandaloneKubernetesConfigOptionsInternal.ClusterMode.APPLICATION);
diff --git a/flink-kubernetes-standalone/src/test/java/org/apache/flink/kubernetes/operator/kubeclient/decorators/UserLibMountDecoratorTest.java b/flink-kubernetes-standalone/src/test/java/org/apache/flink/kubernetes/operator/kubeclient/decorators/UserLibMountDecoratorTest.java
index 07c8af20..1a867521 100644
--- a/flink-kubernetes-standalone/src/test/java/org/apache/flink/kubernetes/operator/kubeclient/decorators/UserLibMountDecoratorTest.java
+++ b/flink-kubernetes-standalone/src/test/java/org/apache/flink/kubernetes/operator/kubeclient/decorators/UserLibMountDecoratorTest.java
@@ -23,6 +23,10 @@ import org.apache.flink.kubernetes.kubeclient.FlinkPod;
import org.apache.flink.kubernetes.operator.kubeclient.parameters.StandaloneKubernetesJobManagerParameters;
import org.apache.flink.kubernetes.operator.standalone.StandaloneKubernetesConfigOptionsInternal;
+import io.fabric8.kubernetes.api.model.ContainerBuilder;
+import io.fabric8.kubernetes.api.model.PodBuilder;
+import io.fabric8.kubernetes.api.model.PodSpecBuilder;
+import io.fabric8.kubernetes.api.model.VolumeBuilder;
import io.fabric8.kubernetes.api.model.VolumeMount;
import org.junit.jupiter.api.Test;
@@ -32,7 +36,7 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
public class UserLibMountDecoratorTest {
@Test
- public void testVolumeAdded() {
+ public void testVolumeAddedApplicationMode() {
StandaloneKubernetesJobManagerParameters jmParameters =
createJmParamsWithClusterMode(
StandaloneKubernetesConfigOptionsInternal.ClusterMode.APPLICATION);
@@ -54,7 +58,7 @@ public class UserLibMountDecoratorTest {
}
@Test
- public void testVolumeNotAdded() {
+ public void testVolumeNotAddedSessionMode() {
StandaloneKubernetesJobManagerParameters jmParameters =
createJmParamsWithClusterMode(
StandaloneKubernetesConfigOptionsInternal.ClusterMode.SESSION);
@@ -69,6 +73,48 @@ public class UserLibMountDecoratorTest {
assertEquals(0, decoratedPod.getPodWithoutMainContainer().getSpec().getVolumes().size());
}
+ /**
+  * Decorating, in application mode, a pod that already declares a volume mounted at
+  * {@code /opt/flink/usrlib} must leave it with exactly one volume and one volume mount.
+  */
+ @Test
+ public void testVolumeNotAddedExistingVolumeMount() {
+     final UserLibMountDecorator userLibDecorator =
+             new UserLibMountDecorator(
+                     createJmParamsWithClusterMode(
+                             StandaloneKubernetesConfigOptionsInternal.ClusterMode.APPLICATION));
+
+     final String existingVolume = "flink-artifact";
+     final String existingMountPath = "/opt/flink/usrlib";
+
+     // Pod template that already carries its own volume and a mount for it.
+     final FlinkPod podWithMount =
+             new FlinkPod.Builder()
+                     .withMainContainer(
+                             new ContainerBuilder()
+                                     .addNewVolumeMount()
+                                     .withName(existingVolume)
+                                     .withMountPath(existingMountPath)
+                                     .endVolumeMount()
+                                     .build())
+                     .withPod(
+                             new PodBuilder()
+                                     .withSpec(
+                                             new PodSpecBuilder()
+                                                     .addNewVolumeLike(
+                                                             new VolumeBuilder()
+                                                                     .withName(existingVolume)
+                                                                     .withNewEmptyDir()
+                                                                     .endEmptyDir()
+                                                                     .build())
+                                                     .endVolume()
+                                                     .build())
+                                     .build())
+                     .build();
+
+     // Sanity check on the input, then the same expectation on the decorated result.
+     assertSingleVolumeAndMount(podWithMount);
+     assertSingleVolumeAndMount(userLibDecorator.decorateFlinkPod(podWithMount));
+ }
+
+ /** Asserts that the pod has exactly one main-container volume mount and one pod volume. */
+ private static void assertSingleVolumeAndMount(FlinkPod pod) {
+     assertEquals(1, pod.getMainContainer().getVolumeMounts().size());
+     assertEquals(1, pod.getPodWithoutMainContainer().getSpec().getVolumes().size());
+ }
+
private StandaloneKubernetesJobManagerParameters createJmParamsWithClusterMode(
StandaloneKubernetesConfigOptionsInternal.ClusterMode clusterMode) {
return new StandaloneKubernetesJobManagerParameters(
diff --git a/flink-kubernetes-standalone/src/test/java/org/apache/flink/kubernetes/operator/kubeclient/factory/StandaloneKubernetesJobManagerFactoryTest.java b/flink-kubernetes-standalone/src/test/java/org/apache/flink/kubernetes/operator/kubeclient/factory/StandaloneKubernetesJobManagerFactoryTest.java
new file mode 100644
index 00000000..832ce97a
--- /dev/null
+++ b/flink-kubernetes-standalone/src/test/java/org/apache/flink/kubernetes/operator/kubeclient/factory/StandaloneKubernetesJobManagerFactoryTest.java
@@ -0,0 +1,273 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.kubeclient.factory;
+
+import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions;
+import org.apache.flink.kubernetes.kubeclient.FlinkPod;
+import org.apache.flink.kubernetes.kubeclient.KubernetesJobManagerSpecification;
+import org.apache.flink.kubernetes.kubeclient.decorators.ExternalServiceDecorator;
+import org.apache.flink.kubernetes.kubeclient.decorators.FlinkConfMountDecorator;
+import org.apache.flink.kubernetes.kubeclient.decorators.InternalServiceDecorator;
+import org.apache.flink.kubernetes.kubeclient.services.HeadlessClusterIPService;
+import org.apache.flink.kubernetes.operator.kubeclient.parameters.ParametersTestBase;
+import org.apache.flink.kubernetes.operator.kubeclient.parameters.StandaloneKubernetesJobManagerParameters;
+import org.apache.flink.kubernetes.operator.kubeclient.utils.TestUtils;
+import org.apache.flink.kubernetes.operator.utils.StandaloneKubernetesUtils;
+import org.apache.flink.kubernetes.utils.Constants;
+
+import io.fabric8.kubernetes.api.model.ConfigMap;
+import io.fabric8.kubernetes.api.model.Container;
+import io.fabric8.kubernetes.api.model.ContainerPort;
+import io.fabric8.kubernetes.api.model.ContainerPortBuilder;
+import io.fabric8.kubernetes.api.model.HasMetadata;
+import io.fabric8.kubernetes.api.model.ObjectMeta;
+import io.fabric8.kubernetes.api.model.PodSpec;
+import io.fabric8.kubernetes.api.model.Quantity;
+import io.fabric8.kubernetes.api.model.ResourceRequirements;
+import io.fabric8.kubernetes.api.model.Service;
+import io.fabric8.kubernetes.api.model.apps.DeploymentSpec;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.kubernetes.configuration.KubernetesConfigOptions.REST_SERVICE_EXPOSED_TYPE;
+import static org.apache.flink.kubernetes.operator.kubeclient.utils.TestUtils.CLUSTER_ID;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNull;
+
+/** {@link StandaloneKubernetesJobManagerFactory} unit tests. */
+public class StandaloneKubernetesJobManagerFactoryTest extends ParametersTestBase {
+
+    private KubernetesJobManagerSpecification jmSpec;
+
+    /**
+     * Builds the JobManager specification under test from the shared test configuration and the
+     * pod template provided by the base class.
+     *
+     * @throws Exception if the specification cannot be built
+     */
+    @BeforeEach
+    public void setup() throws Exception {
+        setupFlinkConfig();
+        // NOTE(review): presumably a deliberately non-existent directory so that no local
+        // configuration files are picked up - confirm.
+        flinkConfig.set(KubernetesConfigOptions.FLINK_CONF_DIR, "/missing/dir");
+        FlinkPod podTemplate = createPodTemplate();
+        StandaloneKubernetesJobManagerParameters jmParameters =
+                new StandaloneKubernetesJobManagerParameters(
+                        flinkConfig, TestUtils.createClusterSpecification());
+        jmSpec =
+                StandaloneKubernetesJobManagerFactory.buildKubernetesJobManagerSpecification(
+                        podTemplate, jmParameters);
+    }
+
+    /** The deployment must carry the JobManager name, selector + user labels and annotations. */
+    @Test
+    public void testDeploymentMetadata() {
+        ObjectMeta deploymentMetadata = jmSpec.getDeployment().getMetadata();
+        assertEquals(
+                StandaloneKubernetesUtils.getJobManagerDeploymentName(CLUSTER_ID),
+                deploymentMetadata.getName());
+
+        final Map<String, String> expectedLabels =
+                new HashMap<>(StandaloneKubernetesUtils.getJobManagerSelectors(CLUSTER_ID));
+        expectedLabels.putAll(userLabels);
+        assertEquals(expectedLabels, deploymentMetadata.getLabels());
+
+        assertEquals(userAnnotations, deploymentMetadata.getAnnotations());
+    }
+
+    /** The deployment must request one replica and select pods by the JobManager selectors. */
+    @Test
+    public void testDeploymentSpec() {
+        DeploymentSpec deploymentSpec = jmSpec.getDeployment().getSpec();
+
+        assertEquals(1, deploymentSpec.getReplicas());
+        assertEquals(
+                StandaloneKubernetesUtils.getJobManagerSelectors(CLUSTER_ID),
+                deploymentSpec.getSelector().getMatchLabels());
+    }
+
+    /** The pod template metadata must merge selector, user and template labels/annotations. */
+    @Test
+    public void testTemplateMetadata() {
+        final ObjectMeta podMetadata = jmSpec.getDeployment().getSpec().getTemplate().getMetadata();
+
+        final Map<String, String> expectedLabels =
+                new HashMap<>(StandaloneKubernetesUtils.getJobManagerSelectors(CLUSTER_ID));
+        expectedLabels.putAll(userLabels);
+        expectedLabels.putAll(templateLabels);
+        assertEquals(expectedLabels, podMetadata.getLabels());
+
+        final Map<String, String> expectedAnnotations = new HashMap<>(userAnnotations);
+        expectedAnnotations.putAll(templateAnnotations);
+        assertEquals(expectedAnnotations, podMetadata.getAnnotations());
+    }
+
+    /**
+     * Verifies the pod spec: a single main container with the expected image settings,
+     * environment, ports, resource requests/limits, volumes and volume mounts.
+     */
+    @Test
+    public void testTemplateSpec() {
+        final PodSpec podSpec = jmSpec.getDeployment().getSpec().getTemplate().getSpec();
+
+        assertEquals(1, podSpec.getContainers().size());
+        assertEquals(TestUtils.SERVICE_ACCOUNT, podSpec.getServiceAccountName());
+        // Config and secret volumes
+        assertEquals(3, podSpec.getVolumes().size());
+
+        final Container mainContainer = podSpec.getContainers().get(0);
+        assertEquals(Constants.MAIN_CONTAINER_NAME, mainContainer.getName());
+        assertEquals(TestUtils.IMAGE, mainContainer.getImage());
+        assertEquals(TestUtils.IMAGE_POLICY, mainContainer.getImagePullPolicy());
+
+        final Map<String, String> envs = new HashMap<>();
+        mainContainer.getEnv().forEach(env -> envs.put(env.getName(), env.getValue()));
+
+        // The pod IP variable is expected with a null literal value.
+        // NOTE(review): presumably because it is populated through a field reference - confirm.
+        Map<String, String> expectedEnvs = new HashMap<>(templateEnvs);
+        expectedEnvs.put(Constants.ENV_FLINK_POD_IP_ADDRESS, null);
+        assertEquals(expectedEnvs, envs);
+
+        final List<ContainerPort> expectedContainerPorts =
+                Arrays.asList(
+                        new ContainerPortBuilder()
+                                .withName(Constants.JOB_MANAGER_RPC_PORT_NAME)
+                                .withContainerPort(6123)
+                                .build(),
+                        new ContainerPortBuilder()
+                                .withName(Constants.BLOB_SERVER_PORT_NAME)
+                                .withContainerPort(Constants.BLOB_SERVER_PORT)
+                                .build(),
+                        new ContainerPortBuilder()
+                                .withName(Constants.REST_PORT_NAME)
+                                .withContainerPort(Constants.REST_PORT)
+                                .build(),
+                        new ContainerPortBuilder()
+                                .withName(TEMPLATE_PORT_NAME)
+                                .withContainerPort(TEMPLATE_PORT)
+                                .build());
+
+        assertThat(mainContainer.getPorts(), containsInAnyOrder(expectedContainerPorts.toArray()));
+
+        final ResourceRequirements resourceRequirements = mainContainer.getResources();
+
+        final Map<String, Quantity> requests = resourceRequirements.getRequests();
+        assertEquals(Double.toString(TestUtils.JOB_MANAGER_CPU), requests.get("cpu").getAmount());
+        assertEquals(
+                String.valueOf(TestUtils.JOB_MANAGER_MEMORY_MB),
+                requests.get("memory").getAmount());
+
+        final Map<String, Quantity> limits = resourceRequirements.getLimits();
+        assertEquals(Double.toString(TestUtils.JOB_MANAGER_CPU), limits.get("cpu").getAmount());
+        assertEquals(
+                String.valueOf(TestUtils.JOB_MANAGER_MEMORY_MB), limits.get("memory").getAmount());
+
+        assertEquals(3, mainContainer.getVolumeMounts().size());
+    }
+
+    /** The specification must come with three extra resources: two Services and a ConfigMap. */
+    @Test
+    public void testAdditionalResourcesSize() throws IOException {
+        final List<HasMetadata> resultAdditionalResources = this.jmSpec.getAccompanyingResources();
+        assertEquals(3, resultAdditionalResources.size());
+
+        assertEquals(2, accompanyingResourcesOfType(Service.class).size());
+        assertEquals(1, accompanyingResourcesOfType(ConfigMap.class).size());
+    }
+
+    /** Verifies labels, type, ports and selectors of the internal and the REST service. */
+    @Test
+    public void testServices() throws IOException {
+        final List<Service> resultServices = accompanyingResourcesOfType(Service.class);
+
+        assertEquals(2, resultServices.size());
+
+        final Service resultInternalService =
+                singleServiceNamed(
+                        resultServices, InternalServiceDecorator.getInternalServiceName(CLUSTER_ID));
+        final Service resultRestService =
+                singleServiceNamed(
+                        resultServices, ExternalServiceDecorator.getExternalServiceName(CLUSTER_ID));
+
+        assertEquals(2, resultInternalService.getMetadata().getLabels().size());
+
+        assertNull(resultInternalService.getSpec().getType());
+        assertEquals(
+                HeadlessClusterIPService.HEADLESS_CLUSTER_IP,
+                resultInternalService.getSpec().getClusterIP());
+        assertEquals(2, resultInternalService.getSpec().getPorts().size());
+        assertEquals(3, resultInternalService.getSpec().getSelector().size());
+
+        assertEquals(2, resultRestService.getMetadata().getLabels().size());
+
+        assertEquals(
+                REST_SERVICE_EXPOSED_TYPE.defaultValue().toString(),
+                resultRestService.getSpec().getType());
+        assertEquals(1, resultRestService.getSpec().getPorts().size());
+        assertEquals(3, resultRestService.getSpec().getSelector().size());
+    }
+
+    /** The Flink configuration ConfigMap must exist with two labels and one data entry. */
+    @Test
+    public void testFlinkConfConfigMap() throws IOException {
+        final String expectedName = FlinkConfMountDecorator.getFlinkConfConfigMapName(CLUSTER_ID);
+        // Fail with a descriptive error rather than an IndexOutOfBoundsException when missing.
+        final ConfigMap resultConfigMap =
+                accompanyingResourcesOfType(ConfigMap.class).stream()
+                        .filter(x -> expectedName.equals(x.getMetadata().getName()))
+                        .findFirst()
+                        .orElseThrow(
+                                () ->
+                                        new AssertionError(
+                                                "No ConfigMap named "
+                                                        + expectedName
+                                                        + " among the accompanying resources"));
+
+        assertEquals(2, resultConfigMap.getMetadata().getLabels().size());
+
+        final Map<String, String> resultData = resultConfigMap.getData();
+        assertEquals(1, resultData.size());
+    }
+
+    /** Returns the accompanying resources of the specification that are instances of {@code type}. */
+    private <T extends HasMetadata> List<T> accompanyingResourcesOfType(Class<T> type) {
+        return jmSpec.getAccompanyingResources().stream()
+                .filter(type::isInstance)
+                .map(type::cast)
+                .collect(Collectors.toList());
+    }
+
+    /** Asserts that exactly one of the given services has the given name and returns it. */
+    private static Service singleServiceNamed(List<Service> services, String name) {
+        final List<Service> candidates =
+                services.stream()
+                        .filter(x -> name.equals(x.getMetadata().getName()))
+                        .collect(Collectors.toList());
+        assertEquals(1, candidates.size());
+        return candidates.get(0);
+    }
+}
diff --git a/flink-kubernetes-standalone/src/test/java/org/apache/flink/kubernetes/operator/kubeclient/factory/StandaloneKubernetesTaskManagerFactoryTest.java b/flink-kubernetes-standalone/src/test/java/org/apache/flink/kubernetes/operator/kubeclient/factory/StandaloneKubernetesTaskManagerFactoryTest.java
new file mode 100644
index 00000000..8c8d4e62
--- /dev/null
+++ b/flink-kubernetes-standalone/src/test/java/org/apache/flink/kubernetes/operator/kubeclient/factory/StandaloneKubernetesTaskManagerFactoryTest.java
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.kubeclient.factory;
+
+import org.apache.flink.kubernetes.kubeclient.FlinkPod;
+import org.apache.flink.kubernetes.operator.kubeclient.parameters.ParametersTestBase;
+import org.apache.flink.kubernetes.operator.kubeclient.parameters.StandaloneKubernetesTaskManagerParameters;
+import org.apache.flink.kubernetes.operator.kubeclient.utils.TestUtils;
+import org.apache.flink.kubernetes.operator.utils.StandaloneKubernetesUtils;
+import org.apache.flink.kubernetes.utils.Constants;
+
+import io.fabric8.kubernetes.api.model.Container;
+import io.fabric8.kubernetes.api.model.ContainerPort;
+import io.fabric8.kubernetes.api.model.ContainerPortBuilder;
+import io.fabric8.kubernetes.api.model.ObjectMeta;
+import io.fabric8.kubernetes.api.model.PodSpec;
+import io.fabric8.kubernetes.api.model.Quantity;
+import io.fabric8.kubernetes.api.model.ResourceRequirements;
+import io.fabric8.kubernetes.api.model.apps.Deployment;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+/** {@link StandaloneKubernetesTaskManagerFactory} unit tests. */
+public class StandaloneKubernetesTaskManagerFactoryTest extends ParametersTestBase {
+
+    private Deployment deployment;
+
+    /**
+     * Builds the TaskManager deployment under test from the shared test configuration and the
+     * pod template provided by the base class.
+     */
+    @BeforeEach
+    public void setup() {
+        setupFlinkConfig();
+        final FlinkPod template = createPodTemplate();
+        final StandaloneKubernetesTaskManagerParameters parameters =
+                new StandaloneKubernetesTaskManagerParameters(
+                        flinkConfig, TestUtils.createClusterSpecification());
+        deployment =
+                StandaloneKubernetesTaskManagerFactory.buildKubernetesTaskManagerDeployment(
+                        template, parameters);
+    }
+
+    /** The deployment must carry the TaskManager name, selector + user labels and annotations. */
+    @Test
+    public void testDeploymentMetadata() {
+        final ObjectMeta metadata = deployment.getMetadata();
+        assertEquals(
+                StandaloneKubernetesUtils.getTaskManagerDeploymentName(TestUtils.CLUSTER_ID),
+                metadata.getName());
+
+        assertEquals(selectorsAndUserLabels(), deployment.getMetadata().getLabels());
+
+        assertEquals(userAnnotations, deployment.getMetadata().getAnnotations());
+    }
+
+    /** The deployment must request one replica and select pods by the TaskManager selectors. */
+    @Test
+    public void testDeploymentSpec() {
+        assertEquals(1, deployment.getSpec().getReplicas());
+
+        final Map<String, String> matchLabels = deployment.getSpec().getSelector().getMatchLabels();
+        assertEquals(
+                StandaloneKubernetesUtils.getTaskManagerSelectors(TestUtils.CLUSTER_ID),
+                matchLabels);
+    }
+
+    /** The pod template metadata must merge selector, user and template labels/annotations. */
+    @Test
+    public void testTemplateMetadata() {
+        final ObjectMeta templateMetadata = deployment.getSpec().getTemplate().getMetadata();
+
+        final Map<String, String> labels = selectorsAndUserLabels();
+        labels.putAll(templateLabels);
+        assertEquals(labels, templateMetadata.getLabels());
+
+        final Map<String, String> annotations = new HashMap<>(userAnnotations);
+        annotations.putAll(templateAnnotations);
+        assertEquals(annotations, templateMetadata.getAnnotations());
+    }
+
+    /**
+     * Verifies the pod spec: a single main container with the expected image settings,
+     * environment, ports, resource requests/limits, volumes and volume mounts.
+     */
+    @Test
+    public void testTemplateSpec() {
+        final PodSpec spec = deployment.getSpec().getTemplate().getSpec();
+
+        assertEquals(1, spec.getContainers().size());
+        assertEquals(TestUtils.SERVICE_ACCOUNT, spec.getServiceAccountName());
+        // Config and secret volumes
+        assertEquals(2, spec.getVolumes().size());
+
+        final Container container = spec.getContainers().get(0);
+        assertEquals(Constants.MAIN_CONTAINER_NAME, container.getName());
+        assertEquals(TestUtils.IMAGE, container.getImage());
+        assertEquals(TestUtils.IMAGE_POLICY, container.getImagePullPolicy());
+
+        // Flatten the env list into name -> value pairs for comparison with the template envs.
+        final Map<String, String> actualEnvs = new HashMap<>();
+        container.getEnv().forEach(e -> actualEnvs.put(e.getName(), e.getValue()));
+
+        assertEquals(templateEnvs, actualEnvs);
+
+        final ContainerPort rpcPort =
+                new ContainerPortBuilder()
+                        .withName(Constants.TASK_MANAGER_RPC_PORT_NAME)
+                        .withContainerPort(Constants.TASK_MANAGER_RPC_PORT)
+                        .build();
+        final ContainerPort templatePort =
+                new ContainerPortBuilder()
+                        .withName(TEMPLATE_PORT_NAME)
+                        .withContainerPort(TEMPLATE_PORT)
+                        .build();
+        final List<ContainerPort> expectedPorts = Arrays.asList(rpcPort, templatePort);
+
+        assertThat(container.getPorts(), containsInAnyOrder(expectedPorts.toArray()));
+
+        final ResourceRequirements resources = container.getResources();
+        assertTaskManagerCpuAndMemory(resources.getRequests());
+        assertTaskManagerCpuAndMemory(resources.getLimits());
+
+        assertEquals(2, container.getVolumeMounts().size());
+    }
+
+    /** Returns a mutable map of the TaskManager selectors overlaid with the user labels. */
+    private Map<String, String> selectorsAndUserLabels() {
+        final Map<String, String> labels =
+                new HashMap<>(
+                        StandaloneKubernetesUtils.getTaskManagerSelectors(TestUtils.CLUSTER_ID));
+        labels.putAll(userLabels);
+        return labels;
+    }
+
+    /** Asserts that the cpu and memory amounts equal the TaskManager test constants. */
+    private static void assertTaskManagerCpuAndMemory(Map<String, Quantity> quantities) {
+        assertEquals(
+                Double.toString(TestUtils.TASK_MANAGER_CPU), quantities.get("cpu").getAmount());
+        assertEquals(
+                String.valueOf(TestUtils.TASK_MANAGER_MEMORY_MB),
+                quantities.get("memory").getAmount());
+    }
+}
diff --git a/flink-kubernetes-standalone/src/test/java/org/apache/flink/kubernetes/operator/standalone/KubernetesStandaloneClusterDescriptorTest.java b/flink-kubernetes-standalone/src/test/java/org/apache/flink/kubernetes/operator/standalone/KubernetesStandaloneClusterDescriptorTest.java
new file mode 100644
index 00000000..8aae921e
--- /dev/null
+++ b/flink-kubernetes-standalone/src/test/java/org/apache/flink/kubernetes/operator/standalone/KubernetesStandaloneClusterDescriptorTest.java
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.standalone;
+
+import org.apache.flink.client.deployment.ClusterSpecification;
+import org.apache.flink.client.deployment.application.ApplicationConfiguration;
+import org.apache.flink.client.program.ClusterClient;
+import org.apache.flink.client.program.ClusterClientProvider;
+import org.apache.flink.configuration.BlobServerOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.RestOptions;
+import org.apache.flink.configuration.TaskManagerOptions;
+import org.apache.flink.kubernetes.kubeclient.decorators.ExternalServiceDecorator;
+import org.apache.flink.kubernetes.operator.kubeclient.Fabric8FlinkStandaloneKubeClient;
+import org.apache.flink.kubernetes.operator.kubeclient.FlinkStandaloneKubeClient;
+import org.apache.flink.kubernetes.operator.kubeclient.utils.TestUtils;
+import org.apache.flink.kubernetes.utils.Constants;
+import org.apache.flink.util.concurrent.Executors;
+
+import io.fabric8.kubernetes.api.model.apps.Deployment;
+import io.fabric8.kubernetes.client.NamespacedKubernetesClient;
+import io.fabric8.kubernetes.client.server.mock.EnableKubernetesMockClient;
+import io.fabric8.kubernetes.client.server.mock.KubernetesMockServer;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.util.List;
+import java.util.stream.Collectors;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/** @link KubernetesStandaloneClusterDescriptor unit tests */
+@EnableKubernetesMockClient(crud = true)
+public class KubernetesStandaloneClusterDescriptorTest {
+
+ private KubernetesStandaloneClusterDescriptor clusterDescriptor;
+ KubernetesMockServer mockServer;
+ private NamespacedKubernetesClient kubernetesClient;
+ private FlinkStandaloneKubeClient flinkKubeClient;
+ private Configuration flinkConfig = new Configuration();
+
+ @BeforeEach
+ public void setup() {
+ flinkConfig = TestUtils.createTestFlinkConfig();
+ kubernetesClient = mockServer.createClient().inNamespace(TestUtils.TEST_NAMESPACE);
+ flinkKubeClient =
+ new Fabric8FlinkStandaloneKubeClient(
+ flinkConfig, kubernetesClient, Executors.newDirectExecutorService());
+
+ clusterDescriptor = new KubernetesStandaloneClusterDescriptor(flinkConfig, flinkKubeClient);
+ }
+
+ @Test
+ public void testDeploySessionCluster() throws Exception {
+ ClusterSpecification clusterSpecification = TestUtils.createClusterSpecification();
+
+ flinkConfig.setString(BlobServerOptions.PORT, String.valueOf(0));
+ flinkConfig.setString(TaskManagerOptions.RPC_PORT, String.valueOf(0));
+ flinkConfig.setString(RestOptions.BIND_PORT, String.valueOf(0));
+
+ ClusterClientProvider clusterClientProvider =
+ clusterDescriptor.deploySessionCluster(clusterSpecification);
+
+ List<Deployment> deployments =
+ kubernetesClient
+ .apps()
+ .deployments()
+ .inNamespace(TestUtils.TEST_NAMESPACE)
+ .list()
+ .getItems();
+ String expectedJMDeploymentName = TestUtils.CLUSTER_ID;
+ String expectedTMDeploymentName = TestUtils.CLUSTER_ID + "-taskmanager";
+
+ assertEquals(2, deployments.size());
+ assertThat(
+ deployments.stream()
+ .map(d -> d.getMetadata().getName())
+ .collect(Collectors.toList()),
+ containsInAnyOrder(expectedJMDeploymentName, expectedTMDeploymentName));
+ assertEquals(
+ flinkConfig.get(BlobServerOptions.PORT),
+ String.valueOf(Constants.BLOB_SERVER_PORT));
+ assertEquals(
+ flinkConfig.get(TaskManagerOptions.RPC_PORT),
+ String.valueOf(Constants.TASK_MANAGER_RPC_PORT));
+ assertEquals(flinkConfig.get(RestOptions.BIND_PORT), String.valueOf(Constants.REST_PORT));
+
+ Deployment jmDeployment =
+ deployments.stream()
+ .filter(d -> d.getMetadata().getName().equals(expectedJMDeploymentName))
+ .findFirst()
+ .orElse(null);
+ assertTrue(
+ jmDeployment.getSpec().getTemplate().getSpec().getContainers().stream()
+ .anyMatch(c -> c.getArgs().contains("jobmanager")));
+
+ ClusterClient clusterClient = clusterClientProvider.getClusterClient();
+
+ String expectedWebUrl =
+ String.format(
+ "http://%s:%d",
+ ExternalServiceDecorator.getNamespacedExternalServiceName(
+ TestUtils.CLUSTER_ID, TestUtils.TEST_NAMESPACE),
+ Constants.REST_PORT);
+ assertEquals(expectedWebUrl, clusterClient.getWebInterfaceURL());
+ }
+
+ @Test
+ public void testDeployApplicationCluster() throws Exception {
+ ClusterSpecification clusterSpecification = TestUtils.createClusterSpecification();
+
+ flinkConfig.setString(BlobServerOptions.PORT, String.valueOf(0));
+ flinkConfig.setString(TaskManagerOptions.RPC_PORT, String.valueOf(0));
+ flinkConfig.setString(RestOptions.BIND_PORT, String.valueOf(0));
+
+ ClusterClientProvider clusterClientProvider =
+ clusterDescriptor.deployApplicationCluster(
+ clusterSpecification,
+ ApplicationConfiguration.fromConfiguration(flinkConfig));
+
+ List<Deployment> deployments =
+ kubernetesClient
+ .apps()
+ .deployments()
+ .inNamespace(TestUtils.TEST_NAMESPACE)
+ .list()
+ .getItems();
+ String expectedJMDeploymentName = TestUtils.CLUSTER_ID;
+ String expectedTMDeploymentName = TestUtils.CLUSTER_ID + "-taskmanager";
+
+ assertEquals(2, deployments.size());
+ assertThat(
+ deployments.stream()
+ .map(d -> d.getMetadata().getName())
+ .collect(Collectors.toList()),
+ containsInAnyOrder(expectedJMDeploymentName, expectedTMDeploymentName));
+ assertEquals(
+ flinkConfig.get(BlobServerOptions.PORT),
+ String.valueOf(Constants.BLOB_SERVER_PORT));
+ assertEquals(
+ flinkConfig.get(TaskManagerOptions.RPC_PORT),
+ String.valueOf(Constants.TASK_MANAGER_RPC_PORT));
+ assertEquals(flinkConfig.get(RestOptions.BIND_PORT), String.valueOf(Constants.REST_PORT));
+
+ Deployment jmDeployment =
+ deployments.stream()
+ .filter(d -> d.getMetadata().getName().equals(expectedJMDeploymentName))
+ .findFirst()
+ .orElse(null);
+ assertTrue(
+ jmDeployment.getSpec().getTemplate().getSpec().getContainers().stream()
+ .anyMatch(c -> c.getArgs().contains("standalone-job")));
+
+ ClusterClient clusterClient = clusterClientProvider.getClusterClient();
+
+ String expectedWebUrl =
+ String.format(
+ "http://%s:%d",
+ ExternalServiceDecorator.getNamespacedExternalServiceName(
+ TestUtils.CLUSTER_ID, TestUtils.TEST_NAMESPACE),
+ Constants.REST_PORT);
+ assertEquals(expectedWebUrl, clusterClient.getWebInterfaceURL());
+ }
+}