You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by gy...@apache.org on 2022/02/22 06:26:41 UTC
[flink-kubernetes-operator] branch main updated: [FLINK-26163] Refactor FlinkUtils#getEffectiveConfig into smaller pieces
This is an automated email from the ASF dual-hosted git repository.
gyfora pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git
The following commit(s) were added to refs/heads/main by this push:
new 29ef2f5 [FLINK-26163] Refactor FlinkUtils#getEffectiveConfig into smaller pieces
29ef2f5 is described below
commit 29ef2f552a8a206c88ad967120a360639b778d44
Author: bgeng777 <ge...@alibaba-inc.com>
AuthorDate: Thu Feb 17 17:41:28 2022 +0800
[FLINK-26163] Refactor FlinkUtils#getEffectiveConfig into smaller pieces
Closes #9
---
.../operator/utils/FlinkConfigBuilder.java | 212 +++++++++++++++++++++
.../kubernetes/operator/utils/FlinkUtils.java | 127 +-----------
.../flink/kubernetes/operator/TestUtils.java | 48 +++--
.../operator/utils/FlinkConfigBuilderTest.java | 187 ++++++++++++++++++
.../kubernetes/operator/utils/FlinkUtilsTest.java | 27 +--
5 files changed, 441 insertions(+), 160 deletions(-)
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/FlinkConfigBuilder.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/FlinkConfigBuilder.java
new file mode 100644
index 0000000..7bc4b41
--- /dev/null
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/FlinkConfigBuilder.java
@@ -0,0 +1,212 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.utils;
+
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.CoreOptions;
+import org.apache.flink.configuration.DeploymentOptions;
+import org.apache.flink.configuration.JobManagerOptions;
+import org.apache.flink.configuration.MemorySize;
+import org.apache.flink.configuration.PipelineOptions;
+import org.apache.flink.configuration.TaskManagerOptions;
+import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions;
+import org.apache.flink.kubernetes.configuration.KubernetesDeploymentTarget;
+import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
+import org.apache.flink.kubernetes.operator.crd.spec.FlinkDeploymentSpec;
+import org.apache.flink.kubernetes.operator.crd.spec.Resource;
+import org.apache.flink.util.StringUtils;
+
+import io.fabric8.kubernetes.api.model.Pod;
+import io.fabric8.kubernetes.client.internal.SerializationUtils;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.util.Collections;
+
+import static org.apache.flink.kubernetes.operator.utils.FlinkUtils.mergePodTemplates;
+
+/** Builder to get effective flink config from {@link FlinkDeployment}. */
+public class FlinkConfigBuilder {
+    private final FlinkDeployment deploy;
+    private final FlinkDeploymentSpec spec;
+    private final Configuration effectiveConfig;
+
+    public FlinkConfigBuilder(FlinkDeployment deploy) {
+        this.deploy = deploy;
+        this.spec = this.deploy.getSpec();
+        // Start from the base configuration found in FLINK_CONF_DIR; the apply*()
+        // methods then layer deployment-specific settings on top of it.
+        this.effectiveConfig =
+                FlinkUtils.loadConfiguration(
+                        System.getenv().get(ConfigConstants.ENV_FLINK_CONF_DIR));
+    }
+
+    /** Applies the container image from the spec, if one is set. */
+    public FlinkConfigBuilder applyImage() {
+        if (!StringUtils.isNullOrWhitespaceOnly(spec.getImage())) {
+            effectiveConfig.set(KubernetesConfigOptions.CONTAINER_IMAGE, spec.getImage());
+        }
+        return this;
+    }
+
+    /** Applies the image pull policy from the spec, if one is set. */
+    public FlinkConfigBuilder applyImagePullPolicy() {
+        if (!StringUtils.isNullOrWhitespaceOnly(spec.getImagePullPolicy())) {
+            effectiveConfig.set(
+                    KubernetesConfigOptions.CONTAINER_IMAGE_PULL_POLICY,
+                    KubernetesConfigOptions.ImagePullPolicy.valueOf(spec.getImagePullPolicy()));
+        }
+        return this;
+    }
+
+    /** Copies every entry of the spec's flinkConfiguration map into the effective config. */
+    public FlinkConfigBuilder applyFlinkConfiguration() {
+        if (spec.getFlinkConfiguration() != null && !spec.getFlinkConfiguration().isEmpty()) {
+            spec.getFlinkConfiguration().forEach(effectiveConfig::setString);
+        }
+        return this;
+    }
+
+    /**
+     * Dumps the common pod template to a temp file and points the shared
+     * KUBERNETES_POD_TEMPLATE option at it.
+     *
+     * @throws IOException if the pod template cannot be written to a temp file
+     */
+    public FlinkConfigBuilder applyCommonPodTemplate() throws IOException {
+        if (spec.getPodTemplate() != null) {
+            effectiveConfig.set(
+                    KubernetesConfigOptions.KUBERNETES_POD_TEMPLATE,
+                    createTempFile(spec.getPodTemplate()));
+        }
+        return this;
+    }
+
+    /** Exposes the rest service as ClusterIP when an ingress domain is configured. */
+    public FlinkConfigBuilder applyIngressDomain() {
+        if (spec.getIngressDomain() != null) {
+            effectiveConfig.set(
+                    KubernetesConfigOptions.REST_SERVICE_EXPOSED_TYPE,
+                    KubernetesConfigOptions.ServiceExposedType.ClusterIP);
+        }
+        return this;
+    }
+
+    /**
+     * Applies JobManager resource and pod template settings, if a JobManager spec is present.
+     *
+     * @throws IOException if the merged pod template cannot be written to a temp file
+     */
+    public FlinkConfigBuilder applyJobManagerSpec() throws IOException {
+        // Fixed: the original nested the identical null check twice.
+        if (spec.getJobManager() != null) {
+            setResource(spec.getJobManager().getResource(), effectiveConfig, true);
+            setPodTemplate(
+                    spec.getPodTemplate(),
+                    spec.getJobManager().getPodTemplate(),
+                    effectiveConfig,
+                    true);
+        }
+        return this;
+    }
+
+    /**
+     * Applies TaskManager resource, pod template and slot settings, if a TaskManager spec is
+     * present.
+     *
+     * @throws IOException if the merged pod template cannot be written to a temp file
+     */
+    public FlinkConfigBuilder applyTaskManagerSpec() throws IOException {
+        if (spec.getTaskManager() != null) {
+            setResource(spec.getTaskManager().getResource(), effectiveConfig, false);
+            setPodTemplate(
+                    spec.getPodTemplate(),
+                    spec.getTaskManager().getPodTemplate(),
+                    effectiveConfig,
+                    false);
+            // Only override the cluster default when a positive slot count is given.
+            if (spec.getTaskManager().getTaskSlots() > 0) {
+                effectiveConfig.set(
+                        TaskManagerOptions.NUM_TASK_SLOTS, spec.getTaskManager().getTaskSlots());
+            }
+        }
+        return this;
+    }
+
+    /**
+     * Chooses the deployment target: application mode (with jar URI and parallelism) when a job
+     * spec is present, session mode otherwise.
+     *
+     * @throws URISyntaxException if the job's jar URI is malformed
+     */
+    public FlinkConfigBuilder applyJobOrSessionSpec() throws URISyntaxException {
+        if (spec.getJob() != null) {
+            effectiveConfig.set(
+                    DeploymentOptions.TARGET, KubernetesDeploymentTarget.APPLICATION.getName());
+            // Parse eagerly so an invalid URI fails fast here rather than at submission time.
+            final URI uri = new URI(spec.getJob().getJarURI());
+            effectiveConfig.set(PipelineOptions.JARS, Collections.singletonList(uri.toString()));
+
+            if (spec.getJob().getParallelism() > 0) {
+                effectiveConfig.set(
+                        CoreOptions.DEFAULT_PARALLELISM, spec.getJob().getParallelism());
+            }
+        } else {
+            effectiveConfig.set(
+                    DeploymentOptions.TARGET, KubernetesDeploymentTarget.SESSION.getName());
+        }
+        return this;
+    }
+
+    /** Finalizes the config with the deployment's namespace and cluster id and returns it. */
+    public Configuration build() {
+        final String namespace = deploy.getMetadata().getNamespace();
+        final String clusterId = deploy.getMetadata().getName();
+        effectiveConfig.setString(KubernetesConfigOptions.NAMESPACE, namespace);
+        effectiveConfig.setString(KubernetesConfigOptions.CLUSTER_ID, clusterId);
+        return effectiveConfig;
+    }
+
+    /** Builds the full effective configuration for the given deployment in one call. */
+    public static Configuration buildFrom(FlinkDeployment dep)
+            throws IOException, URISyntaxException {
+        return new FlinkConfigBuilder(dep)
+                .applyFlinkConfiguration()
+                .applyImage()
+                .applyImagePullPolicy()
+                .applyCommonPodTemplate()
+                .applyIngressDomain()
+                .applyJobManagerSpec()
+                .applyTaskManagerSpec()
+                .applyJobOrSessionSpec()
+                .build();
+    }
+
+    /** Sets total process memory and CPU for the JobManager (isJM=true) or TaskManager. */
+    private static void setResource(
+            Resource resource, Configuration effectiveConfig, boolean isJM) {
+        if (resource != null) {
+            final ConfigOption<MemorySize> memoryConfigOption =
+                    isJM
+                            ? JobManagerOptions.TOTAL_PROCESS_MEMORY
+                            : TaskManagerOptions.TOTAL_PROCESS_MEMORY;
+            final ConfigOption<Double> cpuConfigOption =
+                    isJM
+                            ? KubernetesConfigOptions.JOB_MANAGER_CPU
+                            : KubernetesConfigOptions.TASK_MANAGER_CPU;
+            effectiveConfig.setString(memoryConfigOption.key(), resource.getMemory());
+            effectiveConfig.setDouble(cpuConfigOption.key(), resource.getCpu());
+        }
+    }
+
+    /**
+     * Merges the common pod template with the role-specific one and stores the result under the
+     * JM/TM pod template option.
+     *
+     * <p>NOTE(review): when basicPod (the common template) is null, a non-null role-specific
+     * appendPod is silently ignored -- confirm whether mergePodTemplates tolerates a null toPod
+     * and whether the guard should also cover appendPod != null.
+     */
+    private static void setPodTemplate(
+            Pod basicPod, Pod appendPod, Configuration effectiveConfig, boolean isJM)
+            throws IOException {
+        if (basicPod != null) {
+            final ConfigOption<String> podConfigOption =
+                    isJM
+                            ? KubernetesConfigOptions.JOB_MANAGER_POD_TEMPLATE
+                            : KubernetesConfigOptions.TASK_MANAGER_POD_TEMPLATE;
+            effectiveConfig.setString(
+                    podConfigOption, createTempFile(mergePodTemplates(basicPod, appendPod)));
+        }
+    }
+
+    /**
+     * Dumps the pod template as YAML into a temp file (removed on JVM exit) and returns its
+     * absolute path.
+     *
+     * <p>NOTE(review): io.fabric8.kubernetes.client.internal.SerializationUtils is an internal
+     * fabric8 API; consider io.fabric8.kubernetes.client.utils.Serialization#asYaml instead.
+     */
+    private static String createTempFile(Pod podTemplate) throws IOException {
+        final File tmp = File.createTempFile("podTemplate_", ".yaml");
+        // Fixed: use an explicit charset instead of the platform default.
+        Files.write(
+                tmp.toPath(),
+                SerializationUtils.dumpAsYaml(podTemplate).getBytes(StandardCharsets.UTF_8));
+        tmp.deleteOnExit();
+        return tmp.getAbsolutePath();
+    }
+}
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/FlinkUtils.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/FlinkUtils.java
index dfcc2ff..5c5e71b 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/FlinkUtils.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/FlinkUtils.java
@@ -17,19 +17,9 @@
package org.apache.flink.kubernetes.operator.utils;
-import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.CoreOptions;
-import org.apache.flink.configuration.DeploymentOptions;
import org.apache.flink.configuration.GlobalConfiguration;
-import org.apache.flink.configuration.JobManagerOptions;
-import org.apache.flink.configuration.PipelineOptions;
-import org.apache.flink.configuration.TaskManagerOptions;
-import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions;
-import org.apache.flink.kubernetes.configuration.KubernetesDeploymentTarget;
import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
-import org.apache.flink.kubernetes.operator.crd.spec.FlinkDeploymentSpec;
-import org.apache.flink.util.StringUtils;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
@@ -37,15 +27,9 @@ import com.fasterxml.jackson.databind.node.ArrayNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import io.fabric8.kubernetes.api.model.Pod;
import io.fabric8.kubernetes.client.KubernetesClient;
-import io.fabric8.kubernetes.client.internal.SerializationUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.File;
-import java.io.IOException;
-import java.net.URI;
-import java.nio.file.Files;
-import java.util.Collections;
import java.util.Iterator;
/** Flink Utility methods used by the operator. */
@@ -55,109 +39,9 @@ public class FlinkUtils {
private static final ObjectMapper MAPPER = new ObjectMapper();
public static Configuration getEffectiveConfig(FlinkDeployment flinkApp) {
- String namespace = flinkApp.getMetadata().getNamespace();
- String clusterId = flinkApp.getMetadata().getName();
- FlinkDeploymentSpec spec = flinkApp.getSpec();
-
try {
- String flinkConfDir = System.getenv().get(ConfigConstants.ENV_FLINK_CONF_DIR);
- Configuration effectiveConfig = loadConfiguration(flinkConfDir);
-
- effectiveConfig.setString(KubernetesConfigOptions.NAMESPACE, namespace);
- effectiveConfig.setString(KubernetesConfigOptions.CLUSTER_ID, clusterId);
-
- if (spec.getIngressDomain() != null) {
- effectiveConfig.set(
- KubernetesConfigOptions.REST_SERVICE_EXPOSED_TYPE,
- KubernetesConfigOptions.ServiceExposedType.ClusterIP);
- }
-
- if (spec.getJob() != null) {
- effectiveConfig.set(
- DeploymentOptions.TARGET, KubernetesDeploymentTarget.APPLICATION.getName());
- } else {
- effectiveConfig.set(
- DeploymentOptions.TARGET, KubernetesDeploymentTarget.SESSION.getName());
- }
-
- if (!StringUtils.isNullOrWhitespaceOnly(spec.getImage())) {
- effectiveConfig.set(KubernetesConfigOptions.CONTAINER_IMAGE, spec.getImage());
- }
-
- if (!StringUtils.isNullOrWhitespaceOnly(spec.getImagePullPolicy())) {
- effectiveConfig.set(
- KubernetesConfigOptions.CONTAINER_IMAGE_PULL_POLICY,
- KubernetesConfigOptions.ImagePullPolicy.valueOf(spec.getImagePullPolicy()));
- }
-
- if (spec.getFlinkConfiguration() != null && !spec.getFlinkConfiguration().isEmpty()) {
- spec.getFlinkConfiguration().forEach(effectiveConfig::setString);
- }
-
- // Pod template
- if (spec.getPodTemplate() != null) {
- effectiveConfig.set(
- KubernetesConfigOptions.KUBERNETES_POD_TEMPLATE,
- createTempFile(spec.getPodTemplate()));
- }
-
- if (spec.getJobManager() != null) {
- if (spec.getJobManager().getResource() != null) {
- effectiveConfig.setString(
- JobManagerOptions.TOTAL_PROCESS_MEMORY.key(),
- spec.getJobManager().getResource().getMemory());
- effectiveConfig.set(
- KubernetesConfigOptions.JOB_MANAGER_CPU,
- spec.getJobManager().getResource().getCpu());
- }
-
- if (spec.getJobManager().getPodTemplate() != null) {
- effectiveConfig.set(
- KubernetesConfigOptions.JOB_MANAGER_POD_TEMPLATE,
- createTempFile(
- mergePodTemplates(
- spec.getPodTemplate(),
- spec.getJobManager().getPodTemplate())));
- }
- }
-
- if (spec.getTaskManager() != null) {
- if (spec.getTaskManager().getTaskSlots() > 0) {
- effectiveConfig.set(
- TaskManagerOptions.NUM_TASK_SLOTS,
- spec.getTaskManager().getTaskSlots());
- }
-
- if (spec.getTaskManager().getResource() != null) {
- effectiveConfig.setString(
- TaskManagerOptions.TOTAL_PROCESS_MEMORY.key(),
- spec.getTaskManager().getResource().getMemory());
- effectiveConfig.set(
- KubernetesConfigOptions.TASK_MANAGER_CPU,
- spec.getTaskManager().getResource().getCpu());
- }
-
- if (spec.getTaskManager().getPodTemplate() != null) {
- effectiveConfig.set(
- KubernetesConfigOptions.TASK_MANAGER_POD_TEMPLATE,
- createTempFile(
- mergePodTemplates(
- spec.getPodTemplate(),
- spec.getTaskManager().getPodTemplate())));
- }
- }
-
- if (spec.getJob() != null) {
- final URI uri = new URI(spec.getJob().getJarURI());
- effectiveConfig.set(
- PipelineOptions.JARS, Collections.singletonList(uri.toString()));
-
- if (spec.getJob().getParallelism() > 0) {
- effectiveConfig.set(
- CoreOptions.DEFAULT_PARALLELISM, spec.getJob().getParallelism());
- }
- }
-
+ final Configuration effectiveConfig = FlinkConfigBuilder.buildFrom(flinkApp);
+ LOG.debug("Effective config: {}", effectiveConfig);
return effectiveConfig;
} catch (Exception e) {
throw new RuntimeException("Failed to load configuration", e);
@@ -172,13 +56,6 @@ public class FlinkUtils {
return configuration;
}
- private static String createTempFile(Pod podTemplate) throws IOException {
- File tmp = File.createTempFile("podTemplate_", ".yaml");
- Files.write(tmp.toPath(), SerializationUtils.dumpAsYaml(podTemplate).getBytes());
- tmp.deleteOnExit();
- return tmp.getAbsolutePath();
- }
-
public static Pod mergePodTemplates(Pod toPod, Pod fromPod) {
if (fromPod == null) {
return toPod;
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestUtils.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestUtils.java
index 6f27b9b..eeee351 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestUtils.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestUtils.java
@@ -29,9 +29,13 @@ import org.apache.flink.kubernetes.operator.crd.status.FlinkDeploymentStatus;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
+import io.fabric8.kubernetes.api.model.Container;
import io.fabric8.kubernetes.api.model.ObjectMetaBuilder;
+import io.fabric8.kubernetes.api.model.Pod;
+import io.fabric8.kubernetes.api.model.PodSpec;
import java.util.Collections;
+import java.util.List;
/** Testing utilities. */
public class TestUtils {
@@ -42,6 +46,8 @@ public class TestUtils {
public static final String SERVICE_ACCOUNT = "flink-operator";
public static final String FLINK_VERSION = "latest";
public static final String IMAGE = String.format("flink:%s", FLINK_VERSION);
+ public static final String IMAGE_POLICY = "IfNotPresent";
+ public static final String SAMPLE_JAR = "local:///tmp/sample.jar";
public static FlinkDeployment buildSessionCluster() {
FlinkDeployment deployment = new FlinkDeployment();
@@ -51,17 +57,7 @@ public class TestUtils {
.withName("test-cluster")
.withNamespace(TEST_NAMESPACE)
.build());
- deployment.setSpec(
- FlinkDeploymentSpec.builder()
- .image(IMAGE)
- .flinkVersion(FLINK_VERSION)
- .flinkConfiguration(
- Collections.singletonMap(
- KubernetesConfigOptions.JOB_MANAGER_SERVICE_ACCOUNT.key(),
- SERVICE_ACCOUNT))
- .jobManager(new JobManagerSpec(new Resource(1, "2048m"), 1, null))
- .taskManager(new TaskManagerSpec(new Resource(1, "2048m"), 2, null))
- .build());
+ deployment.setSpec(getTestFlinkDeploymentSpec());
return deployment;
}
@@ -69,11 +65,7 @@ public class TestUtils {
FlinkDeployment deployment = buildSessionCluster();
deployment
.getSpec()
- .setJob(
- JobSpec.builder()
- .jarURI("local:///tmp/sample.jar")
- .state(JobState.RUNNING)
- .build());
+ .setJob(JobSpec.builder().jarURI(SAMPLE_JAR).state(JobState.RUNNING).build());
return deployment;
}
@@ -89,4 +81,28 @@ public class TestUtils {
throw new IllegalStateException(e);
}
}
+
+ /** Returns a baseline test spec: image, pull policy, JM service account, JM/TM resources. */
+ public static FlinkDeploymentSpec getTestFlinkDeploymentSpec() {
+ return FlinkDeploymentSpec.builder()
+ .image(IMAGE)
+ .imagePullPolicy(IMAGE_POLICY)
+ .flinkVersion(FLINK_VERSION)
+ .flinkConfiguration(
+ Collections.singletonMap(
+ KubernetesConfigOptions.JOB_MANAGER_SERVICE_ACCOUNT.key(),
+ SERVICE_ACCOUNT))
+ .jobManager(new JobManagerSpec(new Resource(1, "2048m"), 1, null))
+ .taskManager(new TaskManagerSpec(new Resource(1, "2048m"), 2, null))
+ .build();
+ }
+
+ /** Builds a Pod with the given hostname, apiVersion and containers for pod-template tests. */
+ public static Pod getTestPod(String hostname, String apiVersion, List<Container> containers) {
+ final PodSpec podSpec = new PodSpec();
+ podSpec.setHostname(hostname);
+ podSpec.setContainers(containers);
+ final Pod pod = new Pod();
+ pod.setApiVersion(apiVersion);
+ pod.setSpec(podSpec);
+ return pod;
+ }
}
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/FlinkConfigBuilderTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/FlinkConfigBuilderTest.java
new file mode 100644
index 0000000..435585a
--- /dev/null
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/FlinkConfigBuilderTest.java
@@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.utils;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.CoreOptions;
+import org.apache.flink.configuration.DeploymentOptions;
+import org.apache.flink.configuration.JobManagerOptions;
+import org.apache.flink.configuration.MemorySize;
+import org.apache.flink.configuration.PipelineOptions;
+import org.apache.flink.configuration.TaskManagerOptions;
+import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions;
+import org.apache.flink.kubernetes.configuration.KubernetesDeploymentTarget;
+import org.apache.flink.kubernetes.operator.TestUtils;
+import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.dataformat.yaml.YAMLFactory;
+import io.fabric8.kubernetes.api.model.Container;
+import io.fabric8.kubernetes.api.model.Pod;
+import org.junit.Assert;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.Arrays;
+
+import static org.apache.flink.kubernetes.operator.TestUtils.IMAGE;
+import static org.apache.flink.kubernetes.operator.TestUtils.IMAGE_POLICY;
+import static org.apache.flink.kubernetes.operator.TestUtils.SAMPLE_JAR;
+import static org.apache.flink.kubernetes.operator.TestUtils.SERVICE_ACCOUNT;
+
+/** FlinkConfigBuilderTest. */
+public class FlinkConfigBuilderTest {
+ private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(new YAMLFactory());
+ private static FlinkDeployment flinkDeployment;
+
+ @BeforeAll
+ public static void prepareFlinkDeployment() {
+ flinkDeployment = TestUtils.buildApplicationCluster();
+ final Container container0 = new Container();
+ container0.setName("container0");
+ final Pod pod0 =
+ TestUtils.getTestPod(
+ "pod0 hostname", "pod0 api version", Arrays.asList(container0));
+ final Pod pod1 =
+ TestUtils.getTestPod("pod1 hostname", "pod1 api version", new ArrayList<>());
+ final Pod pod2 =
+ TestUtils.getTestPod("pod2 hostname", "pod2 api version", new ArrayList<>());
+
+ flinkDeployment.getSpec().setPodTemplate(pod0);
+ flinkDeployment.getSpec().setIngressDomain("test.com");
+ flinkDeployment.getSpec().getJobManager().setPodTemplate(pod1);
+ flinkDeployment.getSpec().getTaskManager().setPodTemplate(pod2);
+ flinkDeployment.getSpec().getJob().setParallelism(2);
+ }
+
+ @Test
+ public void testApplyImage() {
+ final Configuration configuration =
+ new FlinkConfigBuilder(flinkDeployment).applyImage().build();
+ Assert.assertEquals(IMAGE, configuration.get(KubernetesConfigOptions.CONTAINER_IMAGE));
+ }
+
+ @Test
+ public void testApplyImagePolicy() {
+ final Configuration configuration =
+ new FlinkConfigBuilder(flinkDeployment).applyImagePullPolicy().build();
+ Assert.assertEquals(
+ IMAGE_POLICY,
+ configuration.get(KubernetesConfigOptions.CONTAINER_IMAGE_PULL_POLICY).toString());
+ }
+
+ @Test
+ public void testApplyFlinkConfiguration() {
+ final Configuration configuration =
+ new FlinkConfigBuilder(flinkDeployment).applyFlinkConfiguration().build();
+ Assert.assertEquals(
+ SERVICE_ACCOUNT,
+ configuration.get(KubernetesConfigOptions.JOB_MANAGER_SERVICE_ACCOUNT));
+ }
+
+ @Test
+ public void testApplyCommonPodTemplate() throws Exception {
+ final Configuration configuration =
+ new FlinkConfigBuilder(flinkDeployment).applyCommonPodTemplate().build();
+ final Pod jmPod =
+ OBJECT_MAPPER.readValue(
+ new File(
+ configuration.getString(
+ KubernetesConfigOptions.JOB_MANAGER_POD_TEMPLATE)),
+ Pod.class);
+ final Pod tmPod =
+ OBJECT_MAPPER.readValue(
+ new File(
+ configuration.getString(
+ KubernetesConfigOptions.TASK_MANAGER_POD_TEMPLATE)),
+ Pod.class);
+ Assert.assertEquals("container0", jmPod.getSpec().getContainers().get(0).getName());
+ Assert.assertEquals("container0", tmPod.getSpec().getContainers().get(0).getName());
+ }
+
+ @Test
+ public void testApplyIngressDomain() {
+ final Configuration configuration =
+ new FlinkConfigBuilder(flinkDeployment).applyIngressDomain().build();
+ Assert.assertEquals(
+ KubernetesConfigOptions.ServiceExposedType.ClusterIP,
+ configuration.get(KubernetesConfigOptions.REST_SERVICE_EXPOSED_TYPE));
+ }
+
+ @Test
+ public void testApplyJobManagerSpec() throws Exception {
+ final Configuration configuration =
+ new FlinkConfigBuilder(flinkDeployment).applyJobManagerSpec().build();
+ final Pod jmPod =
+ OBJECT_MAPPER.readValue(
+ new File(
+ configuration.getString(
+ KubernetesConfigOptions.JOB_MANAGER_POD_TEMPLATE)),
+ Pod.class);
+ Assert.assertEquals(
+ MemorySize.parse("2048m"),
+ configuration.get(JobManagerOptions.TOTAL_PROCESS_MEMORY));
+ Assert.assertEquals(
+ Double.valueOf(1), configuration.get(KubernetesConfigOptions.JOB_MANAGER_CPU));
+ Assert.assertEquals("pod1 api version", jmPod.getApiVersion());
+ }
+
+ @Test
+ public void testApplyTaskManagerSpec() throws Exception {
+ final Configuration configuration =
+ new FlinkConfigBuilder(flinkDeployment).applyTaskManagerSpec().build();
+ final Pod tmPod =
+ OBJECT_MAPPER.readValue(
+ new File(
+ configuration.getString(
+ KubernetesConfigOptions.TASK_MANAGER_POD_TEMPLATE)),
+ Pod.class);
+ Assert.assertEquals(
+ MemorySize.parse("2048m"),
+ configuration.get(TaskManagerOptions.TOTAL_PROCESS_MEMORY));
+ Assert.assertEquals(
+ Double.valueOf(1), configuration.get(KubernetesConfigOptions.TASK_MANAGER_CPU));
+ Assert.assertEquals(
+ Integer.valueOf(2), configuration.get(TaskManagerOptions.NUM_TASK_SLOTS));
+ Assert.assertEquals("pod2 api version", tmPod.getApiVersion());
+ }
+
+ @Test
+ public void testApplyJobOrSessionSpec() throws Exception {
+ final Configuration configuration =
+ new FlinkConfigBuilder(flinkDeployment).applyJobOrSessionSpec().build();
+ Assert.assertEquals(
+ KubernetesDeploymentTarget.APPLICATION.getName(),
+ configuration.get(DeploymentOptions.TARGET));
+ Assert.assertEquals(SAMPLE_JAR, configuration.get(PipelineOptions.JARS).get(0));
+ Assert.assertEquals(Integer.valueOf(2), configuration.get(CoreOptions.DEFAULT_PARALLELISM));
+ }
+
+ @Test
+ public void testBuildFrom() throws Exception {
+ final Configuration configuration = FlinkConfigBuilder.buildFrom(flinkDeployment);
+ final String namespace = flinkDeployment.getMetadata().getNamespace();
+ final String clusterId = flinkDeployment.getMetadata().getName();
+ // Most configs have been tested by previous unit tests, thus we only verify the namespace
+ // and clusterId here.
+ Assert.assertEquals(namespace, configuration.get(KubernetesConfigOptions.NAMESPACE));
+ Assert.assertEquals(clusterId, configuration.get(KubernetesConfigOptions.CLUSTER_ID));
+ }
+}
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/FlinkUtilsTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/FlinkUtilsTest.java
index cf95292..2822e94 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/FlinkUtilsTest.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/FlinkUtilsTest.java
@@ -18,11 +18,10 @@
package org.apache.flink.kubernetes.operator.utils;
-import com.fasterxml.jackson.databind.JsonNode;
-import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.flink.kubernetes.operator.TestUtils;
+
import io.fabric8.kubernetes.api.model.Container;
import io.fabric8.kubernetes.api.model.Pod;
-import io.fabric8.kubernetes.api.model.PodSpec;
import org.junit.Assert;
import org.junit.jupiter.api.Test;
@@ -39,23 +38,13 @@ public class FlinkUtilsTest {
Container container2 = new Container();
container2.setName("container2");
- PodSpec podSpec1 = new PodSpec();
- podSpec1.setHostname("pod1 hostname");
- podSpec1.setContainers(Arrays.asList(container2));
- Pod pod1 = new Pod();
- pod1.setApiVersion("pod1 api version");
- pod1.setSpec(podSpec1);
-
- PodSpec podSpec2 = new PodSpec();
- podSpec2.setHostname("pod2 hostname");
- podSpec2.setContainers(Arrays.asList(container1, container2));
- Pod pod2 = new Pod();
- pod2.setApiVersion("pod2 api version");
- pod2.setSpec(podSpec2);
+ Pod pod1 =
+ TestUtils.getTestPod(
+ "pod1 hostname", "pod1 api version", Arrays.asList(container2));
- ObjectMapper mapper = new ObjectMapper();
- JsonNode node1 = mapper.valueToTree(pod1);
- JsonNode node2 = mapper.valueToTree(pod2);
+ Pod pod2 =
+ TestUtils.getTestPod(
+ "pod2 hostname", "pod2 api version", Arrays.asList(container1, container2));
Pod mergedPod = FlinkUtils.mergePodTemplates(pod1, pod2);