You are viewing a plain-text version of this content; the link to the canonical version was not preserved in this copy.
Posted to commits@flink.apache.org by gy...@apache.org on 2022/02/22 07:42:02 UTC
[flink-kubernetes-operator] branch main updated: [FLINK-26290] Introduce serviceAccount as direct field, remove taskSlots
This is an automated email from the ASF dual-hosted git repository.
gyfora pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git
The following commit(s) were added to refs/heads/main by this push:
new cdfc7a2 [FLINK-26290] Introduce serviceAccount as direct field, remove taskSlots
cdfc7a2 is described below
commit cdfc7a21efcd0b1a7e1bd1c4f430fa7d1210f4ac
Author: Thomas Weise <th...@apache.org>
AuthorDate: Mon Feb 21 13:53:13 2022 -0800
[FLINK-26290] Introduce serviceAccount as direct field, remove taskSlots
Closes #10
---
e2e-tests/data/cr.yaml | 4 ++--
examples/basic-checkpoint-ha.yaml | 2 +-
examples/basic-ingress.yaml | 4 ++--
examples/basic-session.yaml | 4 ++--
examples/basic.yaml | 4 ++--
examples/pod-template.yaml | 3 ++-
.../kubernetes/operator/crd/spec/FlinkDeploymentSpec.java | 1 +
.../kubernetes/operator/crd/spec/TaskManagerSpec.java | 1 -
.../kubernetes/operator/utils/FlinkConfigBuilder.java | 13 +++++++++----
.../flink/kubernetes/operator/FlinkOperatorITCase.java | 7 +------
.../org/apache/flink/kubernetes/operator/TestUtils.java | 9 ++++-----
.../kubernetes/operator/utils/FlinkConfigBuilderTest.java | 15 ++++++++++-----
12 files changed, 36 insertions(+), 31 deletions(-)
diff --git a/e2e-tests/data/cr.yaml b/e2e-tests/data/cr.yaml
index cb04df1..936a1de 100644
--- a/e2e-tests/data/cr.yaml
+++ b/e2e-tests/data/cr.yaml
@@ -25,11 +25,12 @@ spec:
image: flink:1.14.3
flinkVersion: 1.14.3
flinkConfiguration:
- kubernetes.service-account: flink-operator
+ taskmanager.numberOfTaskSlots: "2"
high-availability: org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
high-availability.storageDir: file:///opt/flink/volume/flink-ha
state.checkpoints.dir: file:///opt/flink/volume/flink-cp
state.savepoints.dir: file:///opt/flink/volume/flink-sp
+ serviceAccount: flink-operator
podTemplate:
apiVersion: v1
kind: Pod
@@ -70,7 +71,6 @@ spec:
memory: "1024m"
cpu: 0.5
taskManager:
- taskSlots: 2
resource:
memory: "1024m"
cpu: 0.5
diff --git a/examples/basic-checkpoint-ha.yaml b/examples/basic-checkpoint-ha.yaml
index d8172b4..42c137a 100644
--- a/examples/basic-checkpoint-ha.yaml
+++ b/examples/basic-checkpoint-ha.yaml
@@ -25,6 +25,7 @@ spec:
image: flink:1.14.3
flinkVersion: 1.14.3
flinkConfiguration:
+ taskmanager.numberOfTaskSlots: "2"
state.savepoints.dir: file:///flink-data/savepoints
high-availability: org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
high-availability.storageDir: file:///flink-data/ha
@@ -34,7 +35,6 @@ spec:
memory: "2048m"
cpu: 1
taskManager:
- taskSlots: 2
resource:
memory: "2048m"
cpu: 1
diff --git a/examples/basic-ingress.yaml b/examples/basic-ingress.yaml
index af058ec..5be18cd 100644
--- a/examples/basic-ingress.yaml
+++ b/examples/basic-ingress.yaml
@@ -28,14 +28,14 @@ spec:
flinkConfiguration:
# rest.address: basic-example.flink.k8s.io
# rest.port: "80"
- kubernetes.jobmanager.service-account: flink-operator
+ taskmanager.numberOfTaskSlots: "2"
+ serviceAccount: flink-operator
jobManager:
replicas: 1
resource:
memory: "2048m"
cpu: 1
taskManager:
- taskSlots: 2
resource:
memory: "2048m"
cpu: 1
diff --git a/examples/basic-session.yaml b/examples/basic-session.yaml
index f956100..613b411 100644
--- a/examples/basic-session.yaml
+++ b/examples/basic-session.yaml
@@ -25,14 +25,14 @@ spec:
image: flink:1.14.3
flinkVersion: 1.14.3
flinkConfiguration:
- kubernetes.jobmanager.service-account: flink-operator
+ taskmanager.numberOfTaskSlots: "2"
+ serviceAccount: flink-operator
jobManager:
replicas: 1
resource:
memory: "2048m"
cpu: 1
taskManager:
- taskSlots: 2
resource:
memory: "2048m"
cpu: 1
diff --git a/examples/basic.yaml b/examples/basic.yaml
index ce1983c..3e6c0e3 100644
--- a/examples/basic.yaml
+++ b/examples/basic.yaml
@@ -25,14 +25,14 @@ spec:
image: flink:1.14.3
flinkVersion: 1.14.3
flinkConfiguration:
- kubernetes.jobmanager.service-account: flink-operator
+ taskmanager.numberOfTaskSlots: "2"
+ serviceAccount: flink-operator
jobManager:
replicas: 1
resource:
memory: "2048m"
cpu: 1
taskManager:
- taskSlots: 2
resource:
memory: "2048m"
cpu: 1
diff --git a/examples/pod-template.yaml b/examples/pod-template.yaml
index 4807639..3e40413 100644
--- a/examples/pod-template.yaml
+++ b/examples/pod-template.yaml
@@ -24,6 +24,8 @@ metadata:
spec:
image: flink:1.14.3
flinkVersion: 1.14.3
+ flinkConfiguration:
+ taskmanager.numberOfTaskSlots: "2"
podTemplate:
apiVersion: v1
kind: Pod
@@ -53,7 +55,6 @@ spec:
memory: "2048m"
cpu: 1
taskManager:
- taskSlots: 2
resource:
memory: "2048m"
cpu: 1
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/crd/spec/FlinkDeploymentSpec.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/crd/spec/FlinkDeploymentSpec.java
index 8abaf5c..933b3a1 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/crd/spec/FlinkDeploymentSpec.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/crd/spec/FlinkDeploymentSpec.java
@@ -33,6 +33,7 @@ import java.util.Map;
public class FlinkDeploymentSpec {
private String image;
private String imagePullPolicy;
+ private String serviceAccount;
private String flinkVersion;
private String ingressDomain;
private Map<String, String> flinkConfiguration;
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/crd/spec/TaskManagerSpec.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/crd/spec/TaskManagerSpec.java
index 8b25127..a440bf2 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/crd/spec/TaskManagerSpec.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/crd/spec/TaskManagerSpec.java
@@ -28,6 +28,5 @@ import lombok.NoArgsConstructor;
@AllArgsConstructor
public class TaskManagerSpec {
private Resource resource;
- private int taskSlots;
private Pod podTemplate;
}
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/FlinkConfigBuilder.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/FlinkConfigBuilder.java
index 7bc4b41..eb54998 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/FlinkConfigBuilder.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/FlinkConfigBuilder.java
@@ -102,6 +102,14 @@ public class FlinkConfigBuilder {
return this;
}
+ public FlinkConfigBuilder applyServiceAccount() {
+ if (spec.getServiceAccount() != null) {
+ effectiveConfig.set(
+ KubernetesConfigOptions.KUBERNETES_SERVICE_ACCOUNT, spec.getServiceAccount());
+ }
+ return this;
+ }
+
public FlinkConfigBuilder applyJobManagerSpec() throws IOException {
if (spec.getJobManager() != null) {
if (spec.getJobManager() != null) {
@@ -124,10 +132,6 @@ public class FlinkConfigBuilder {
spec.getTaskManager().getPodTemplate(),
effectiveConfig,
false);
- if (spec.getTaskManager().getTaskSlots() > 0) {
- effectiveConfig.set(
- TaskManagerOptions.NUM_TASK_SLOTS, spec.getTaskManager().getTaskSlots());
- }
}
return this;
}
@@ -166,6 +170,7 @@ public class FlinkConfigBuilder {
.applyFlinkConfiguration()
.applyImage()
.applyImagePullPolicy()
+ .applyServiceAccount()
.applyCommonPodTemplate()
.applyIngressDomain()
.applyJobManagerSpec()
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/FlinkOperatorITCase.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/FlinkOperatorITCase.java
index a2f04a2..2be5ea6 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/FlinkOperatorITCase.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/FlinkOperatorITCase.java
@@ -38,9 +38,6 @@ import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.HashMap;
-import java.util.Map;
-
import static java.util.concurrent.TimeUnit.MINUTES;
import static org.awaitility.Awaitility.await;
import static org.hamcrest.CoreMatchers.is;
@@ -110,9 +107,7 @@ public class FlinkOperatorITCase {
FlinkDeploymentSpec spec = new FlinkDeploymentSpec();
spec.setImage(IMAGE);
spec.setFlinkVersion(FLINK_VERSION);
- Map config = new HashMap<String, String>();
- config.put("kubernetes.jobmanager.service-account", SERVICE_ACCOUNT);
- spec.setFlinkConfiguration(config);
+ spec.setServiceAccount(SERVICE_ACCOUNT);
Resource resource = new Resource();
resource.setMemory("2048m");
resource.setCpu(1);
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestUtils.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestUtils.java
index eeee351..601f160 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestUtils.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestUtils.java
@@ -17,7 +17,7 @@
package org.apache.flink.kubernetes.operator;
-import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions;
+import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
import org.apache.flink.kubernetes.operator.crd.spec.FlinkDeploymentSpec;
import org.apache.flink.kubernetes.operator.crd.spec.JobManagerSpec;
@@ -86,13 +86,12 @@ public class TestUtils {
return FlinkDeploymentSpec.builder()
.image(IMAGE)
.imagePullPolicy(IMAGE_POLICY)
+ .serviceAccount(SERVICE_ACCOUNT)
.flinkVersion(FLINK_VERSION)
.flinkConfiguration(
- Collections.singletonMap(
- KubernetesConfigOptions.JOB_MANAGER_SERVICE_ACCOUNT.key(),
- SERVICE_ACCOUNT))
+ Collections.singletonMap(TaskManagerOptions.NUM_TASK_SLOTS.key(), "2"))
.jobManager(new JobManagerSpec(new Resource(1, "2048m"), 1, null))
- .taskManager(new TaskManagerSpec(new Resource(1, "2048m"), 2, null))
+ .taskManager(new TaskManagerSpec(new Resource(1, "2048m"), null))
.build();
}
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/FlinkConfigBuilderTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/FlinkConfigBuilderTest.java
index 435585a..eb43bfc 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/FlinkConfigBuilderTest.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/FlinkConfigBuilderTest.java
@@ -91,9 +91,7 @@ public class FlinkConfigBuilderTest {
public void testApplyFlinkConfiguration() {
final Configuration configuration =
new FlinkConfigBuilder(flinkDeployment).applyFlinkConfiguration().build();
- Assert.assertEquals(
- SERVICE_ACCOUNT,
- configuration.get(KubernetesConfigOptions.JOB_MANAGER_SERVICE_ACCOUNT));
+ Assert.assertEquals(2, (int) configuration.get(TaskManagerOptions.NUM_TASK_SLOTS));
}
@Test
@@ -126,6 +124,15 @@ public class FlinkConfigBuilderTest {
}
@Test
+ public void testApplyServiceAccount() {
+ final Configuration configuration =
+ new FlinkConfigBuilder(flinkDeployment).applyServiceAccount().build();
+ Assert.assertEquals(
+ SERVICE_ACCOUNT,
+ configuration.get(KubernetesConfigOptions.KUBERNETES_SERVICE_ACCOUNT));
+ }
+
+ @Test
public void testApplyJobManagerSpec() throws Exception {
final Configuration configuration =
new FlinkConfigBuilder(flinkDeployment).applyJobManagerSpec().build();
@@ -158,8 +165,6 @@ public class FlinkConfigBuilderTest {
configuration.get(TaskManagerOptions.TOTAL_PROCESS_MEMORY));
Assert.assertEquals(
Double.valueOf(1), configuration.get(KubernetesConfigOptions.TASK_MANAGER_CPU));
- Assert.assertEquals(
- Integer.valueOf(2), configuration.get(TaskManagerOptions.NUM_TASK_SLOTS));
Assert.assertEquals("pod2 api version", tmPod.getApiVersion());
}