You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by gy...@apache.org on 2022/02/22 06:26:41 UTC

[flink-kubernetes-operator] branch main updated: [FLINK-26163] Refactor FlinkUtils#getEffectiveConfig into smaller pieces

This is an automated email from the ASF dual-hosted git repository.

gyfora pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git


The following commit(s) were added to refs/heads/main by this push:
     new 29ef2f5  [FLINK-26163] Refactor FlinkUtils#getEffectiveConfig into smaller pieces
29ef2f5 is described below

commit 29ef2f552a8a206c88ad967120a360639b778d44
Author: bgeng777 <ge...@alibaba-inc.com>
AuthorDate: Thu Feb 17 17:41:28 2022 +0800

    [FLINK-26163] Refactor FlinkUtils#getEffectiveConfig into smaller pieces
    
    Closes #9
---
 .../operator/utils/FlinkConfigBuilder.java         | 212 +++++++++++++++++++++
 .../kubernetes/operator/utils/FlinkUtils.java      | 127 +-----------
 .../flink/kubernetes/operator/TestUtils.java       |  48 +++--
 .../operator/utils/FlinkConfigBuilderTest.java     | 187 ++++++++++++++++++
 .../kubernetes/operator/utils/FlinkUtilsTest.java  |  27 +--
 5 files changed, 441 insertions(+), 160 deletions(-)

diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/FlinkConfigBuilder.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/FlinkConfigBuilder.java
new file mode 100644
index 0000000..7bc4b41
--- /dev/null
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/FlinkConfigBuilder.java
@@ -0,0 +1,212 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.utils;
+
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.CoreOptions;
+import org.apache.flink.configuration.DeploymentOptions;
+import org.apache.flink.configuration.JobManagerOptions;
+import org.apache.flink.configuration.MemorySize;
+import org.apache.flink.configuration.PipelineOptions;
+import org.apache.flink.configuration.TaskManagerOptions;
+import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions;
+import org.apache.flink.kubernetes.configuration.KubernetesDeploymentTarget;
+import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
+import org.apache.flink.kubernetes.operator.crd.spec.FlinkDeploymentSpec;
+import org.apache.flink.kubernetes.operator.crd.spec.Resource;
+import org.apache.flink.util.StringUtils;
+
+import io.fabric8.kubernetes.api.model.Pod;
+import io.fabric8.kubernetes.client.internal.SerializationUtils;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.nio.file.Files;
+import java.util.Collections;
+
+import static org.apache.flink.kubernetes.operator.utils.FlinkUtils.mergePodTemplates;
+
+/** Builder to get effective flink config from {@link FlinkDeployment}. */
+public class FlinkConfigBuilder {
+    private final FlinkDeployment deploy;
+    private final FlinkDeploymentSpec spec;
+    private final Configuration effectiveConfig;
+
+    public FlinkConfigBuilder(FlinkDeployment deploy) {
+        this.deploy = deploy;
+        this.spec = this.deploy.getSpec();
+        this.effectiveConfig =
+                FlinkUtils.loadConfiguration(
+                        System.getenv().get(ConfigConstants.ENV_FLINK_CONF_DIR));
+    }
+
+    public FlinkConfigBuilder applyImage() {
+        if (!StringUtils.isNullOrWhitespaceOnly(spec.getImage())) {
+            effectiveConfig.set(KubernetesConfigOptions.CONTAINER_IMAGE, spec.getImage());
+        }
+        return this;
+    }
+
+    public FlinkConfigBuilder applyImagePullPolicy() {
+        if (!StringUtils.isNullOrWhitespaceOnly(spec.getImagePullPolicy())) {
+            effectiveConfig.set(
+                    KubernetesConfigOptions.CONTAINER_IMAGE_PULL_POLICY,
+                    KubernetesConfigOptions.ImagePullPolicy.valueOf(spec.getImagePullPolicy()));
+        }
+        return this;
+    }
+
+    public FlinkConfigBuilder applyFlinkConfiguration() {
+        // Parse config from spec's flinkConfiguration
+        if (spec.getFlinkConfiguration() != null && !spec.getFlinkConfiguration().isEmpty()) {
+            spec.getFlinkConfiguration().forEach(effectiveConfig::setString);
+        }
+        return this;
+    }
+
+    public FlinkConfigBuilder applyCommonPodTemplate() throws IOException {
+        if (spec.getPodTemplate() != null) {
+            effectiveConfig.set(
+                    KubernetesConfigOptions.KUBERNETES_POD_TEMPLATE,
+                    createTempFile(spec.getPodTemplate()));
+        }
+        return this;
+    }
+
+    public FlinkConfigBuilder applyIngressDomain() {
+        // Web UI
+        if (spec.getIngressDomain() != null) {
+            effectiveConfig.set(
+                    KubernetesConfigOptions.REST_SERVICE_EXPOSED_TYPE,
+                    KubernetesConfigOptions.ServiceExposedType.ClusterIP);
+        }
+        return this;
+    }
+
+    public FlinkConfigBuilder applyJobManagerSpec() throws IOException {
+        if (spec.getJobManager() != null) {
+            if (spec.getJobManager() != null) {
+                setResource(spec.getJobManager().getResource(), effectiveConfig, true);
+                setPodTemplate(
+                        spec.getPodTemplate(),
+                        spec.getJobManager().getPodTemplate(),
+                        effectiveConfig,
+                        true);
+            }
+        }
+        return this;
+    }
+
+    public FlinkConfigBuilder applyTaskManagerSpec() throws IOException {
+        if (spec.getTaskManager() != null) {
+            setResource(spec.getTaskManager().getResource(), effectiveConfig, false);
+            setPodTemplate(
+                    spec.getPodTemplate(),
+                    spec.getTaskManager().getPodTemplate(),
+                    effectiveConfig,
+                    false);
+            if (spec.getTaskManager().getTaskSlots() > 0) {
+                effectiveConfig.set(
+                        TaskManagerOptions.NUM_TASK_SLOTS, spec.getTaskManager().getTaskSlots());
+            }
+        }
+        return this;
+    }
+
+    public FlinkConfigBuilder applyJobOrSessionSpec() throws URISyntaxException {
+        if (spec.getJob() != null) {
+            effectiveConfig.set(
+                    DeploymentOptions.TARGET, KubernetesDeploymentTarget.APPLICATION.getName());
+            final URI uri = new URI(spec.getJob().getJarURI());
+            effectiveConfig.set(PipelineOptions.JARS, Collections.singletonList(uri.toString()));
+
+            if (spec.getJob().getParallelism() > 0) {
+                effectiveConfig.set(
+                        CoreOptions.DEFAULT_PARALLELISM, spec.getJob().getParallelism());
+            }
+        } else {
+            effectiveConfig.set(
+                    DeploymentOptions.TARGET, KubernetesDeploymentTarget.SESSION.getName());
+        }
+        return this;
+    }
+
+    public Configuration build() {
+
+        // Set cluster config
+        final String namespace = deploy.getMetadata().getNamespace();
+        final String clusterId = deploy.getMetadata().getName();
+        effectiveConfig.setString(KubernetesConfigOptions.NAMESPACE, namespace);
+        effectiveConfig.setString(KubernetesConfigOptions.CLUSTER_ID, clusterId);
+        return effectiveConfig;
+    }
+
+    public static Configuration buildFrom(FlinkDeployment dep)
+            throws IOException, URISyntaxException {
+        return new FlinkConfigBuilder(dep)
+                .applyFlinkConfiguration()
+                .applyImage()
+                .applyImagePullPolicy()
+                .applyCommonPodTemplate()
+                .applyIngressDomain()
+                .applyJobManagerSpec()
+                .applyTaskManagerSpec()
+                .applyJobOrSessionSpec()
+                .build();
+    }
+
+    private static void setResource(
+            Resource resource, Configuration effectiveConfig, boolean isJM) {
+        if (resource != null) {
+            final ConfigOption<MemorySize> memoryConfigOption =
+                    isJM
+                            ? JobManagerOptions.TOTAL_PROCESS_MEMORY
+                            : TaskManagerOptions.TOTAL_PROCESS_MEMORY;
+            final ConfigOption<Double> cpuConfigOption =
+                    isJM
+                            ? KubernetesConfigOptions.JOB_MANAGER_CPU
+                            : KubernetesConfigOptions.TASK_MANAGER_CPU;
+            effectiveConfig.setString(memoryConfigOption.key(), resource.getMemory());
+            effectiveConfig.setDouble(cpuConfigOption.key(), resource.getCpu());
+        }
+    }
+
+    private static void setPodTemplate(
+            Pod basicPod, Pod appendPod, Configuration effectiveConfig, boolean isJM)
+            throws IOException {
+        if (basicPod != null) {
+            final ConfigOption<String> podConfigOption =
+                    isJM
+                            ? KubernetesConfigOptions.JOB_MANAGER_POD_TEMPLATE
+                            : KubernetesConfigOptions.TASK_MANAGER_POD_TEMPLATE;
+            effectiveConfig.setString(
+                    podConfigOption, createTempFile(mergePodTemplates(basicPod, appendPod)));
+        }
+    }
+
+    private static String createTempFile(Pod podTemplate) throws IOException {
+        final File tmp = File.createTempFile("podTemplate_", ".yaml");
+        Files.write(tmp.toPath(), SerializationUtils.dumpAsYaml(podTemplate).getBytes());
+        tmp.deleteOnExit();
+        return tmp.getAbsolutePath();
+    }
+}
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/FlinkUtils.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/FlinkUtils.java
index dfcc2ff..5c5e71b 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/FlinkUtils.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/FlinkUtils.java
@@ -17,19 +17,9 @@
 
 package org.apache.flink.kubernetes.operator.utils;
 
-import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.CoreOptions;
-import org.apache.flink.configuration.DeploymentOptions;
 import org.apache.flink.configuration.GlobalConfiguration;
-import org.apache.flink.configuration.JobManagerOptions;
-import org.apache.flink.configuration.PipelineOptions;
-import org.apache.flink.configuration.TaskManagerOptions;
-import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions;
-import org.apache.flink.kubernetes.configuration.KubernetesDeploymentTarget;
 import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
-import org.apache.flink.kubernetes.operator.crd.spec.FlinkDeploymentSpec;
-import org.apache.flink.util.StringUtils;
 
 import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.ObjectMapper;
@@ -37,15 +27,9 @@ import com.fasterxml.jackson.databind.node.ArrayNode;
 import com.fasterxml.jackson.databind.node.ObjectNode;
 import io.fabric8.kubernetes.api.model.Pod;
 import io.fabric8.kubernetes.client.KubernetesClient;
-import io.fabric8.kubernetes.client.internal.SerializationUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.File;
-import java.io.IOException;
-import java.net.URI;
-import java.nio.file.Files;
-import java.util.Collections;
 import java.util.Iterator;
 
 /** Flink Utility methods used by the operator. */
@@ -55,109 +39,9 @@ public class FlinkUtils {
     private static final ObjectMapper MAPPER = new ObjectMapper();
 
     public static Configuration getEffectiveConfig(FlinkDeployment flinkApp) {
-        String namespace = flinkApp.getMetadata().getNamespace();
-        String clusterId = flinkApp.getMetadata().getName();
-        FlinkDeploymentSpec spec = flinkApp.getSpec();
-
         try {
-            String flinkConfDir = System.getenv().get(ConfigConstants.ENV_FLINK_CONF_DIR);
-            Configuration effectiveConfig = loadConfiguration(flinkConfDir);
-
-            effectiveConfig.setString(KubernetesConfigOptions.NAMESPACE, namespace);
-            effectiveConfig.setString(KubernetesConfigOptions.CLUSTER_ID, clusterId);
-
-            if (spec.getIngressDomain() != null) {
-                effectiveConfig.set(
-                        KubernetesConfigOptions.REST_SERVICE_EXPOSED_TYPE,
-                        KubernetesConfigOptions.ServiceExposedType.ClusterIP);
-            }
-
-            if (spec.getJob() != null) {
-                effectiveConfig.set(
-                        DeploymentOptions.TARGET, KubernetesDeploymentTarget.APPLICATION.getName());
-            } else {
-                effectiveConfig.set(
-                        DeploymentOptions.TARGET, KubernetesDeploymentTarget.SESSION.getName());
-            }
-
-            if (!StringUtils.isNullOrWhitespaceOnly(spec.getImage())) {
-                effectiveConfig.set(KubernetesConfigOptions.CONTAINER_IMAGE, spec.getImage());
-            }
-
-            if (!StringUtils.isNullOrWhitespaceOnly(spec.getImagePullPolicy())) {
-                effectiveConfig.set(
-                        KubernetesConfigOptions.CONTAINER_IMAGE_PULL_POLICY,
-                        KubernetesConfigOptions.ImagePullPolicy.valueOf(spec.getImagePullPolicy()));
-            }
-
-            if (spec.getFlinkConfiguration() != null && !spec.getFlinkConfiguration().isEmpty()) {
-                spec.getFlinkConfiguration().forEach(effectiveConfig::setString);
-            }
-
-            // Pod template
-            if (spec.getPodTemplate() != null) {
-                effectiveConfig.set(
-                        KubernetesConfigOptions.KUBERNETES_POD_TEMPLATE,
-                        createTempFile(spec.getPodTemplate()));
-            }
-
-            if (spec.getJobManager() != null) {
-                if (spec.getJobManager().getResource() != null) {
-                    effectiveConfig.setString(
-                            JobManagerOptions.TOTAL_PROCESS_MEMORY.key(),
-                            spec.getJobManager().getResource().getMemory());
-                    effectiveConfig.set(
-                            KubernetesConfigOptions.JOB_MANAGER_CPU,
-                            spec.getJobManager().getResource().getCpu());
-                }
-
-                if (spec.getJobManager().getPodTemplate() != null) {
-                    effectiveConfig.set(
-                            KubernetesConfigOptions.JOB_MANAGER_POD_TEMPLATE,
-                            createTempFile(
-                                    mergePodTemplates(
-                                            spec.getPodTemplate(),
-                                            spec.getJobManager().getPodTemplate())));
-                }
-            }
-
-            if (spec.getTaskManager() != null) {
-                if (spec.getTaskManager().getTaskSlots() > 0) {
-                    effectiveConfig.set(
-                            TaskManagerOptions.NUM_TASK_SLOTS,
-                            spec.getTaskManager().getTaskSlots());
-                }
-
-                if (spec.getTaskManager().getResource() != null) {
-                    effectiveConfig.setString(
-                            TaskManagerOptions.TOTAL_PROCESS_MEMORY.key(),
-                            spec.getTaskManager().getResource().getMemory());
-                    effectiveConfig.set(
-                            KubernetesConfigOptions.TASK_MANAGER_CPU,
-                            spec.getTaskManager().getResource().getCpu());
-                }
-
-                if (spec.getTaskManager().getPodTemplate() != null) {
-                    effectiveConfig.set(
-                            KubernetesConfigOptions.TASK_MANAGER_POD_TEMPLATE,
-                            createTempFile(
-                                    mergePodTemplates(
-                                            spec.getPodTemplate(),
-                                            spec.getTaskManager().getPodTemplate())));
-                }
-            }
-
-            if (spec.getJob() != null) {
-                final URI uri = new URI(spec.getJob().getJarURI());
-                effectiveConfig.set(
-                        PipelineOptions.JARS, Collections.singletonList(uri.toString()));
-
-                if (spec.getJob().getParallelism() > 0) {
-                    effectiveConfig.set(
-                            CoreOptions.DEFAULT_PARALLELISM, spec.getJob().getParallelism());
-                }
-            }
-
+            final Configuration effectiveConfig = FlinkConfigBuilder.buildFrom(flinkApp);
+            LOG.debug("Effective config: {}", effectiveConfig);
             return effectiveConfig;
         } catch (Exception e) {
             throw new RuntimeException("Failed to load configuration", e);
@@ -172,13 +56,6 @@ public class FlinkUtils {
         return configuration;
     }
 
-    private static String createTempFile(Pod podTemplate) throws IOException {
-        File tmp = File.createTempFile("podTemplate_", ".yaml");
-        Files.write(tmp.toPath(), SerializationUtils.dumpAsYaml(podTemplate).getBytes());
-        tmp.deleteOnExit();
-        return tmp.getAbsolutePath();
-    }
-
     public static Pod mergePodTemplates(Pod toPod, Pod fromPod) {
         if (fromPod == null) {
             return toPod;
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestUtils.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestUtils.java
index 6f27b9b..eeee351 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestUtils.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestUtils.java
@@ -29,9 +29,13 @@ import org.apache.flink.kubernetes.operator.crd.status.FlinkDeploymentStatus;
 
 import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.ObjectMapper;
+import io.fabric8.kubernetes.api.model.Container;
 import io.fabric8.kubernetes.api.model.ObjectMetaBuilder;
+import io.fabric8.kubernetes.api.model.Pod;
+import io.fabric8.kubernetes.api.model.PodSpec;
 
 import java.util.Collections;
+import java.util.List;
 
 /** Testing utilities. */
 public class TestUtils {
@@ -42,6 +46,8 @@ public class TestUtils {
     public static final String SERVICE_ACCOUNT = "flink-operator";
     public static final String FLINK_VERSION = "latest";
     public static final String IMAGE = String.format("flink:%s", FLINK_VERSION);
+    public static final String IMAGE_POLICY = "IfNotPresent";
+    public static final String SAMPLE_JAR = "local:///tmp/sample.jar";
 
     public static FlinkDeployment buildSessionCluster() {
         FlinkDeployment deployment = new FlinkDeployment();
@@ -51,17 +57,7 @@ public class TestUtils {
                         .withName("test-cluster")
                         .withNamespace(TEST_NAMESPACE)
                         .build());
-        deployment.setSpec(
-                FlinkDeploymentSpec.builder()
-                        .image(IMAGE)
-                        .flinkVersion(FLINK_VERSION)
-                        .flinkConfiguration(
-                                Collections.singletonMap(
-                                        KubernetesConfigOptions.JOB_MANAGER_SERVICE_ACCOUNT.key(),
-                                        SERVICE_ACCOUNT))
-                        .jobManager(new JobManagerSpec(new Resource(1, "2048m"), 1, null))
-                        .taskManager(new TaskManagerSpec(new Resource(1, "2048m"), 2, null))
-                        .build());
+        deployment.setSpec(getTestFlinkDeploymentSpec());
         return deployment;
     }
 
@@ -69,11 +65,7 @@ public class TestUtils {
         FlinkDeployment deployment = buildSessionCluster();
         deployment
                 .getSpec()
-                .setJob(
-                        JobSpec.builder()
-                                .jarURI("local:///tmp/sample.jar")
-                                .state(JobState.RUNNING)
-                                .build());
+                .setJob(JobSpec.builder().jarURI(SAMPLE_JAR).state(JobState.RUNNING).build());
         return deployment;
     }
 
@@ -89,4 +81,28 @@ public class TestUtils {
             throw new IllegalStateException(e);
         }
     }
+
+    public static FlinkDeploymentSpec getTestFlinkDeploymentSpec() {
+        return FlinkDeploymentSpec.builder()
+                .image(IMAGE)
+                .imagePullPolicy(IMAGE_POLICY)
+                .flinkVersion(FLINK_VERSION)
+                .flinkConfiguration(
+                        Collections.singletonMap(
+                                KubernetesConfigOptions.JOB_MANAGER_SERVICE_ACCOUNT.key(),
+                                SERVICE_ACCOUNT))
+                .jobManager(new JobManagerSpec(new Resource(1, "2048m"), 1, null))
+                .taskManager(new TaskManagerSpec(new Resource(1, "2048m"), 2, null))
+                .build();
+    }
+
+    public static Pod getTestPod(String hostname, String apiVersion, List<Container> containers) {
+        final PodSpec podSpec = new PodSpec();
+        podSpec.setHostname(hostname);
+        podSpec.setContainers(containers);
+        final Pod pod = new Pod();
+        pod.setApiVersion(apiVersion);
+        pod.setSpec(podSpec);
+        return pod;
+    }
 }
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/FlinkConfigBuilderTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/FlinkConfigBuilderTest.java
new file mode 100644
index 0000000..435585a
--- /dev/null
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/FlinkConfigBuilderTest.java
@@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.utils;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.CoreOptions;
+import org.apache.flink.configuration.DeploymentOptions;
+import org.apache.flink.configuration.JobManagerOptions;
+import org.apache.flink.configuration.MemorySize;
+import org.apache.flink.configuration.PipelineOptions;
+import org.apache.flink.configuration.TaskManagerOptions;
+import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions;
+import org.apache.flink.kubernetes.configuration.KubernetesDeploymentTarget;
+import org.apache.flink.kubernetes.operator.TestUtils;
+import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.dataformat.yaml.YAMLFactory;
+import io.fabric8.kubernetes.api.model.Container;
+import io.fabric8.kubernetes.api.model.Pod;
+import org.junit.Assert;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.Arrays;
+
+import static org.apache.flink.kubernetes.operator.TestUtils.IMAGE;
+import static org.apache.flink.kubernetes.operator.TestUtils.IMAGE_POLICY;
+import static org.apache.flink.kubernetes.operator.TestUtils.SAMPLE_JAR;
+import static org.apache.flink.kubernetes.operator.TestUtils.SERVICE_ACCOUNT;
+
+/** FlinkConfigBuilderTest. */
+public class FlinkConfigBuilderTest {
+    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(new YAMLFactory());
+    private static FlinkDeployment flinkDeployment;
+
+    @BeforeAll
+    public static void prepareFlinkDeployment() {
+        flinkDeployment = TestUtils.buildApplicationCluster();
+        final Container container0 = new Container();
+        container0.setName("container0");
+        final Pod pod0 =
+                TestUtils.getTestPod(
+                        "pod0 hostname", "pod0 api version", Arrays.asList(container0));
+        final Pod pod1 =
+                TestUtils.getTestPod("pod1 hostname", "pod1 api version", new ArrayList<>());
+        final Pod pod2 =
+                TestUtils.getTestPod("pod2 hostname", "pod2 api version", new ArrayList<>());
+
+        flinkDeployment.getSpec().setPodTemplate(pod0);
+        flinkDeployment.getSpec().setIngressDomain("test.com");
+        flinkDeployment.getSpec().getJobManager().setPodTemplate(pod1);
+        flinkDeployment.getSpec().getTaskManager().setPodTemplate(pod2);
+        flinkDeployment.getSpec().getJob().setParallelism(2);
+    }
+
+    @Test
+    public void testApplyImage() {
+        final Configuration configuration =
+                new FlinkConfigBuilder(flinkDeployment).applyImage().build();
+        Assert.assertEquals(IMAGE, configuration.get(KubernetesConfigOptions.CONTAINER_IMAGE));
+    }
+
+    @Test
+    public void testApplyImagePolicy() {
+        final Configuration configuration =
+                new FlinkConfigBuilder(flinkDeployment).applyImagePullPolicy().build();
+        Assert.assertEquals(
+                IMAGE_POLICY,
+                configuration.get(KubernetesConfigOptions.CONTAINER_IMAGE_PULL_POLICY).toString());
+    }
+
+    @Test
+    public void testApplyFlinkConfiguration() {
+        final Configuration configuration =
+                new FlinkConfigBuilder(flinkDeployment).applyFlinkConfiguration().build();
+        Assert.assertEquals(
+                SERVICE_ACCOUNT,
+                configuration.get(KubernetesConfigOptions.JOB_MANAGER_SERVICE_ACCOUNT));
+    }
+
+    @Test
+    public void testApplyCommonPodTemplate() throws Exception {
+        final Configuration configuration =
+                new FlinkConfigBuilder(flinkDeployment).applyCommonPodTemplate().build();
+        final Pod jmPod =
+                OBJECT_MAPPER.readValue(
+                        new File(
+                                configuration.getString(
+                                        KubernetesConfigOptions.JOB_MANAGER_POD_TEMPLATE)),
+                        Pod.class);
+        final Pod tmPod =
+                OBJECT_MAPPER.readValue(
+                        new File(
+                                configuration.getString(
+                                        KubernetesConfigOptions.TASK_MANAGER_POD_TEMPLATE)),
+                        Pod.class);
+        Assert.assertEquals("container0", jmPod.getSpec().getContainers().get(0).getName());
+        Assert.assertEquals("container0", tmPod.getSpec().getContainers().get(0).getName());
+    }
+
+    @Test
+    public void testApplyIngressDomain() {
+        final Configuration configuration =
+                new FlinkConfigBuilder(flinkDeployment).applyIngressDomain().build();
+        Assert.assertEquals(
+                KubernetesConfigOptions.ServiceExposedType.ClusterIP,
+                configuration.get(KubernetesConfigOptions.REST_SERVICE_EXPOSED_TYPE));
+    }
+
+    @Test
+    public void testApplyJobManagerSpec() throws Exception {
+        final Configuration configuration =
+                new FlinkConfigBuilder(flinkDeployment).applyJobManagerSpec().build();
+        final Pod jmPod =
+                OBJECT_MAPPER.readValue(
+                        new File(
+                                configuration.getString(
+                                        KubernetesConfigOptions.JOB_MANAGER_POD_TEMPLATE)),
+                        Pod.class);
+        Assert.assertEquals(
+                MemorySize.parse("2048m"),
+                configuration.get(JobManagerOptions.TOTAL_PROCESS_MEMORY));
+        Assert.assertEquals(
+                Double.valueOf(1), configuration.get(KubernetesConfigOptions.JOB_MANAGER_CPU));
+        Assert.assertEquals("pod1 api version", jmPod.getApiVersion());
+    }
+
+    @Test
+    public void testApplyTaskManagerSpec() throws Exception {
+        final Configuration configuration =
+                new FlinkConfigBuilder(flinkDeployment).applyTaskManagerSpec().build();
+        final Pod tmPod =
+                OBJECT_MAPPER.readValue(
+                        new File(
+                                configuration.getString(
+                                        KubernetesConfigOptions.TASK_MANAGER_POD_TEMPLATE)),
+                        Pod.class);
+        Assert.assertEquals(
+                MemorySize.parse("2048m"),
+                configuration.get(TaskManagerOptions.TOTAL_PROCESS_MEMORY));
+        Assert.assertEquals(
+                Double.valueOf(1), configuration.get(KubernetesConfigOptions.TASK_MANAGER_CPU));
+        Assert.assertEquals(
+                Integer.valueOf(2), configuration.get(TaskManagerOptions.NUM_TASK_SLOTS));
+        Assert.assertEquals("pod2 api version", tmPod.getApiVersion());
+    }
+
+    @Test
+    public void testApplyJobOrSessionSpec() throws Exception {
+        final Configuration configuration =
+                new FlinkConfigBuilder(flinkDeployment).applyJobOrSessionSpec().build();
+        Assert.assertEquals(
+                KubernetesDeploymentTarget.APPLICATION.getName(),
+                configuration.get(DeploymentOptions.TARGET));
+        Assert.assertEquals(SAMPLE_JAR, configuration.get(PipelineOptions.JARS).get(0));
+        Assert.assertEquals(Integer.valueOf(2), configuration.get(CoreOptions.DEFAULT_PARALLELISM));
+    }
+
+    @Test
+    public void testBuildFrom() throws Exception {
+        final Configuration configuration = FlinkConfigBuilder.buildFrom(flinkDeployment);
+        final String namespace = flinkDeployment.getMetadata().getNamespace();
+        final String clusterId = flinkDeployment.getMetadata().getName();
+        // Most configs have been tested by previous unit tests, thus we only verify the namespace
+        // and clusterId here.
+        Assert.assertEquals(namespace, configuration.get(KubernetesConfigOptions.NAMESPACE));
+        Assert.assertEquals(clusterId, configuration.get(KubernetesConfigOptions.CLUSTER_ID));
+    }
+}
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/FlinkUtilsTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/FlinkUtilsTest.java
index cf95292..2822e94 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/FlinkUtilsTest.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/FlinkUtilsTest.java
@@ -18,11 +18,10 @@
 
 package org.apache.flink.kubernetes.operator.utils;
 
-import com.fasterxml.jackson.databind.JsonNode;
-import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.flink.kubernetes.operator.TestUtils;
+
 import io.fabric8.kubernetes.api.model.Container;
 import io.fabric8.kubernetes.api.model.Pod;
-import io.fabric8.kubernetes.api.model.PodSpec;
 import org.junit.Assert;
 import org.junit.jupiter.api.Test;
 
@@ -39,23 +38,13 @@ public class FlinkUtilsTest {
         Container container2 = new Container();
         container2.setName("container2");
 
-        PodSpec podSpec1 = new PodSpec();
-        podSpec1.setHostname("pod1 hostname");
-        podSpec1.setContainers(Arrays.asList(container2));
-        Pod pod1 = new Pod();
-        pod1.setApiVersion("pod1 api version");
-        pod1.setSpec(podSpec1);
-
-        PodSpec podSpec2 = new PodSpec();
-        podSpec2.setHostname("pod2 hostname");
-        podSpec2.setContainers(Arrays.asList(container1, container2));
-        Pod pod2 = new Pod();
-        pod2.setApiVersion("pod2 api version");
-        pod2.setSpec(podSpec2);
+        Pod pod1 =
+                TestUtils.getTestPod(
+                        "pod1 hostname", "pod1 api version", Arrays.asList(container2));
 
-        ObjectMapper mapper = new ObjectMapper();
-        JsonNode node1 = mapper.valueToTree(pod1);
-        JsonNode node2 = mapper.valueToTree(pod2);
+        Pod pod2 =
+                TestUtils.getTestPod(
+                        "pod2 hostname", "pod2 api version", Arrays.asList(container1, container2));
 
         Pod mergedPod = FlinkUtils.mergePodTemplates(pod1, pod2);