You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by gy...@apache.org on 2022/02/22 07:42:02 UTC

[flink-kubernetes-operator] branch main updated: [FLINK-26290] Introduce serviceAccount as direct field, remove taskSlots

This is an automated email from the ASF dual-hosted git repository.

gyfora pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git


The following commit(s) were added to refs/heads/main by this push:
     new cdfc7a2  [FLINK-26290] Introduce serviceAccount as direct field, remove taskSlots
cdfc7a2 is described below

commit cdfc7a21efcd0b1a7e1bd1c4f430fa7d1210f4ac
Author: Thomas Weise <th...@apache.org>
AuthorDate: Mon Feb 21 13:53:13 2022 -0800

    [FLINK-26290] Introduce serviceAccount as direct field, remove taskSlots
    
    Closes #10
---
 e2e-tests/data/cr.yaml                                    |  4 ++--
 examples/basic-checkpoint-ha.yaml                         |  2 +-
 examples/basic-ingress.yaml                               |  4 ++--
 examples/basic-session.yaml                               |  4 ++--
 examples/basic.yaml                                       |  4 ++--
 examples/pod-template.yaml                                |  3 ++-
 .../kubernetes/operator/crd/spec/FlinkDeploymentSpec.java |  1 +
 .../kubernetes/operator/crd/spec/TaskManagerSpec.java     |  1 -
 .../kubernetes/operator/utils/FlinkConfigBuilder.java     | 13 +++++++++----
 .../flink/kubernetes/operator/FlinkOperatorITCase.java    |  7 +------
 .../org/apache/flink/kubernetes/operator/TestUtils.java   |  9 ++++-----
 .../kubernetes/operator/utils/FlinkConfigBuilderTest.java | 15 ++++++++++-----
 12 files changed, 36 insertions(+), 31 deletions(-)

diff --git a/e2e-tests/data/cr.yaml b/e2e-tests/data/cr.yaml
index cb04df1..936a1de 100644
--- a/e2e-tests/data/cr.yaml
+++ b/e2e-tests/data/cr.yaml
@@ -25,11 +25,12 @@ spec:
   image: flink:1.14.3
   flinkVersion: 1.14.3
   flinkConfiguration:
-    kubernetes.service-account: flink-operator
+    taskmanager.numberOfTaskSlots: "2"
     high-availability: org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
     high-availability.storageDir: file:///opt/flink/volume/flink-ha
     state.checkpoints.dir: file:///opt/flink/volume/flink-cp
     state.savepoints.dir: file:///opt/flink/volume/flink-sp
+  serviceAccount: flink-operator
   podTemplate:
     apiVersion: v1
     kind: Pod
@@ -70,7 +71,6 @@ spec:
       memory: "1024m"
       cpu: 0.5
   taskManager:
-    taskSlots: 2
     resource:
       memory: "1024m"
       cpu: 0.5
diff --git a/examples/basic-checkpoint-ha.yaml b/examples/basic-checkpoint-ha.yaml
index d8172b4..42c137a 100644
--- a/examples/basic-checkpoint-ha.yaml
+++ b/examples/basic-checkpoint-ha.yaml
@@ -25,6 +25,7 @@ spec:
   image: flink:1.14.3
   flinkVersion: 1.14.3
   flinkConfiguration:
+    taskmanager.numberOfTaskSlots: "2"
     state.savepoints.dir: file:///flink-data/savepoints
     high-availability: org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
     high-availability.storageDir: file:///flink-data/ha
@@ -34,7 +35,6 @@ spec:
       memory: "2048m"
       cpu: 1
   taskManager:
-    taskSlots: 2
     resource:
       memory: "2048m"
       cpu: 1
diff --git a/examples/basic-ingress.yaml b/examples/basic-ingress.yaml
index af058ec..5be18cd 100644
--- a/examples/basic-ingress.yaml
+++ b/examples/basic-ingress.yaml
@@ -28,14 +28,14 @@ spec:
   flinkConfiguration:
 #    rest.address: basic-example.flink.k8s.io
 #    rest.port: "80"
-    kubernetes.jobmanager.service-account: flink-operator
+    taskmanager.numberOfTaskSlots: "2"
+  serviceAccount: flink-operator
   jobManager:
     replicas: 1
     resource:
       memory: "2048m"
       cpu: 1
   taskManager:
-    taskSlots: 2
     resource:
       memory: "2048m"
       cpu: 1
diff --git a/examples/basic-session.yaml b/examples/basic-session.yaml
index f956100..613b411 100644
--- a/examples/basic-session.yaml
+++ b/examples/basic-session.yaml
@@ -25,14 +25,14 @@ spec:
   image: flink:1.14.3
   flinkVersion: 1.14.3
   flinkConfiguration:
-    kubernetes.jobmanager.service-account: flink-operator
+    taskmanager.numberOfTaskSlots: "2"
+  serviceAccount: flink-operator
   jobManager:
     replicas: 1
     resource:
       memory: "2048m"
       cpu: 1
   taskManager:
-    taskSlots: 2
     resource:
       memory: "2048m"
       cpu: 1
diff --git a/examples/basic.yaml b/examples/basic.yaml
index ce1983c..3e6c0e3 100644
--- a/examples/basic.yaml
+++ b/examples/basic.yaml
@@ -25,14 +25,14 @@ spec:
   image: flink:1.14.3
   flinkVersion: 1.14.3
   flinkConfiguration:
-    kubernetes.jobmanager.service-account: flink-operator
+    taskmanager.numberOfTaskSlots: "2"
+  serviceAccount: flink-operator
   jobManager:
     replicas: 1
     resource:
       memory: "2048m"
       cpu: 1
   taskManager:
-    taskSlots: 2
     resource:
       memory: "2048m"
       cpu: 1
diff --git a/examples/pod-template.yaml b/examples/pod-template.yaml
index 4807639..3e40413 100644
--- a/examples/pod-template.yaml
+++ b/examples/pod-template.yaml
@@ -24,6 +24,8 @@ metadata:
 spec:
   image: flink:1.14.3
   flinkVersion: 1.14.3
+  flinkConfiguration:
+    taskmanager.numberOfTaskSlots: "2"
   podTemplate:
     apiVersion: v1
     kind: Pod
@@ -53,7 +55,6 @@ spec:
       memory: "2048m"
       cpu: 1
   taskManager:
-    taskSlots: 2
     resource:
       memory: "2048m"
       cpu: 1
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/crd/spec/FlinkDeploymentSpec.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/crd/spec/FlinkDeploymentSpec.java
index 8abaf5c..933b3a1 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/crd/spec/FlinkDeploymentSpec.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/crd/spec/FlinkDeploymentSpec.java
@@ -33,6 +33,7 @@ import java.util.Map;
 public class FlinkDeploymentSpec {
     private String image;
     private String imagePullPolicy;
+    private String serviceAccount;
     private String flinkVersion;
     private String ingressDomain;
     private Map<String, String> flinkConfiguration;
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/crd/spec/TaskManagerSpec.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/crd/spec/TaskManagerSpec.java
index 8b25127..a440bf2 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/crd/spec/TaskManagerSpec.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/crd/spec/TaskManagerSpec.java
@@ -28,6 +28,5 @@ import lombok.NoArgsConstructor;
 @AllArgsConstructor
 public class TaskManagerSpec {
     private Resource resource;
-    private int taskSlots;
     private Pod podTemplate;
 }
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/FlinkConfigBuilder.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/FlinkConfigBuilder.java
index 7bc4b41..eb54998 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/FlinkConfigBuilder.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/FlinkConfigBuilder.java
@@ -102,6 +102,14 @@ public class FlinkConfigBuilder {
         return this;
     }
 
+    public FlinkConfigBuilder applyServiceAccount() {
+        if (spec.getServiceAccount() != null) {
+            effectiveConfig.set(
+                    KubernetesConfigOptions.KUBERNETES_SERVICE_ACCOUNT, spec.getServiceAccount());
+        }
+        return this;
+    }
+
     public FlinkConfigBuilder applyJobManagerSpec() throws IOException {
         if (spec.getJobManager() != null) {
             if (spec.getJobManager() != null) {
@@ -124,10 +132,6 @@ public class FlinkConfigBuilder {
                     spec.getTaskManager().getPodTemplate(),
                     effectiveConfig,
                     false);
-            if (spec.getTaskManager().getTaskSlots() > 0) {
-                effectiveConfig.set(
-                        TaskManagerOptions.NUM_TASK_SLOTS, spec.getTaskManager().getTaskSlots());
-            }
         }
         return this;
     }
@@ -166,6 +170,7 @@ public class FlinkConfigBuilder {
                 .applyFlinkConfiguration()
                 .applyImage()
                 .applyImagePullPolicy()
+                .applyServiceAccount()
                 .applyCommonPodTemplate()
                 .applyIngressDomain()
                 .applyJobManagerSpec()
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/FlinkOperatorITCase.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/FlinkOperatorITCase.java
index a2f04a2..2be5ea6 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/FlinkOperatorITCase.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/FlinkOperatorITCase.java
@@ -38,9 +38,6 @@ import org.junit.jupiter.api.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.HashMap;
-import java.util.Map;
-
 import static java.util.concurrent.TimeUnit.MINUTES;
 import static org.awaitility.Awaitility.await;
 import static org.hamcrest.CoreMatchers.is;
@@ -110,9 +107,7 @@ public class FlinkOperatorITCase {
         FlinkDeploymentSpec spec = new FlinkDeploymentSpec();
         spec.setImage(IMAGE);
         spec.setFlinkVersion(FLINK_VERSION);
-        Map config = new HashMap<String, String>();
-        config.put("kubernetes.jobmanager.service-account", SERVICE_ACCOUNT);
-        spec.setFlinkConfiguration(config);
+        spec.setServiceAccount(SERVICE_ACCOUNT);
         Resource resource = new Resource();
         resource.setMemory("2048m");
         resource.setCpu(1);
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestUtils.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestUtils.java
index eeee351..601f160 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestUtils.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestUtils.java
@@ -17,7 +17,7 @@
 
 package org.apache.flink.kubernetes.operator;
 
-import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions;
+import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
 import org.apache.flink.kubernetes.operator.crd.spec.FlinkDeploymentSpec;
 import org.apache.flink.kubernetes.operator.crd.spec.JobManagerSpec;
@@ -86,13 +86,12 @@ public class TestUtils {
         return FlinkDeploymentSpec.builder()
                 .image(IMAGE)
                 .imagePullPolicy(IMAGE_POLICY)
+                .serviceAccount(SERVICE_ACCOUNT)
                 .flinkVersion(FLINK_VERSION)
                 .flinkConfiguration(
-                        Collections.singletonMap(
-                                KubernetesConfigOptions.JOB_MANAGER_SERVICE_ACCOUNT.key(),
-                                SERVICE_ACCOUNT))
+                        Collections.singletonMap(TaskManagerOptions.NUM_TASK_SLOTS.key(), "2"))
                 .jobManager(new JobManagerSpec(new Resource(1, "2048m"), 1, null))
-                .taskManager(new TaskManagerSpec(new Resource(1, "2048m"), 2, null))
+                .taskManager(new TaskManagerSpec(new Resource(1, "2048m"), null))
                 .build();
     }
 
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/FlinkConfigBuilderTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/FlinkConfigBuilderTest.java
index 435585a..eb43bfc 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/FlinkConfigBuilderTest.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/FlinkConfigBuilderTest.java
@@ -91,9 +91,7 @@ public class FlinkConfigBuilderTest {
     public void testApplyFlinkConfiguration() {
         final Configuration configuration =
                 new FlinkConfigBuilder(flinkDeployment).applyFlinkConfiguration().build();
-        Assert.assertEquals(
-                SERVICE_ACCOUNT,
-                configuration.get(KubernetesConfigOptions.JOB_MANAGER_SERVICE_ACCOUNT));
+        Assert.assertEquals(2, (int) configuration.get(TaskManagerOptions.NUM_TASK_SLOTS));
     }
 
     @Test
@@ -126,6 +124,15 @@ public class FlinkConfigBuilderTest {
     }
 
     @Test
+    public void testApplyServiceAccount() {
+        final Configuration configuration =
+                new FlinkConfigBuilder(flinkDeployment).applyServiceAccount().build();
+        Assert.assertEquals(
+                SERVICE_ACCOUNT,
+                configuration.get(KubernetesConfigOptions.KUBERNETES_SERVICE_ACCOUNT));
+    }
+
+    @Test
     public void testApplyJobManagerSpec() throws Exception {
         final Configuration configuration =
                 new FlinkConfigBuilder(flinkDeployment).applyJobManagerSpec().build();
@@ -158,8 +165,6 @@ public class FlinkConfigBuilderTest {
                 configuration.get(TaskManagerOptions.TOTAL_PROCESS_MEMORY));
         Assert.assertEquals(
                 Double.valueOf(1), configuration.get(KubernetesConfigOptions.TASK_MANAGER_CPU));
-        Assert.assertEquals(
-                Integer.valueOf(2), configuration.get(TaskManagerOptions.NUM_TASK_SLOTS));
         Assert.assertEquals("pod2 api version", tmPod.getApiVersion());
     }