You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by xt...@apache.org on 2020/12/25 09:05:10 UTC

[flink] 01/02: [FLINK-20664][k8s] Support setting service account for TaskManager pod.

This is an automated email from the ASF dual-hosted git repository.

xtsong pushed a commit to branch release-1.12
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 195d85539b7c70d2436a5e25e6fe6eb4c20f694f
Author: blublinsky <bo...@lightbend.com>
AuthorDate: Mon Dec 21 08:39:35 2020 -0600

    [FLINK-20664][k8s] Support setting service account for TaskManager pod.
---
 .../generated/kubernetes_config_configuration.html | 16 +++++-
 .../configuration/KubernetesConfigOptions.java     | 24 +++++++-
 .../decorators/InitTaskManagerDecorator.java       |  1 +
 .../parameters/KubernetesJobManagerParameters.java |  3 +-
 .../KubernetesTaskManagerParameters.java           |  6 ++
 .../InitJobManagerDecoratorAccountTest.java        | 64 ++++++++++++++++++++++
 .../decorators/InitJobManagerDecoratorTest.java    |  2 +-
 .../InitTaskManagerDecoratorAccountTest.java       | 64 ++++++++++++++++++++++
 .../decorators/InitTaskManagerDecoratorTest.java   |  7 +++
 9 files changed, 181 insertions(+), 6 deletions(-)

diff --git a/docs/_includes/generated/kubernetes_config_configuration.html b/docs/_includes/generated/kubernetes_config_configuration.html
index 629e8d6..2bf8f1e 100644
--- a/docs/_includes/generated/kubernetes_config_configuration.html
+++ b/docs/_includes/generated/kubernetes_config_configuration.html
@@ -112,9 +112,9 @@
         </tr>
         <tr>
             <td><h5>kubernetes.jobmanager.service-account</h5></td>
-            <td style="word-wrap: break-word;">"default"</td>
+            <td style="word-wrap: break-word;">(none)</td>
             <td>String</td>
-            <td>Service account that is used by jobmanager within kubernetes cluster. The job manager uses this service account when requesting taskmanager pods from the API server.</td>
+            <td>Service account that is used by jobmanager within kubernetes cluster. The job manager uses this service account when requesting taskmanager pods from the API server. If not explicitly configured, config option 'kubernetes.service-account' will be used.</td>
         </tr>
         <tr>
             <td><h5>kubernetes.jobmanager.tolerations</h5></td>
@@ -147,6 +147,12 @@
             <td>The user-specified secrets that will be mounted into Flink container. The value should be in the form of <span markdown="span">`foo:/opt/secrets-foo,bar:/opt/secrets-bar`</span>.</td>
         </tr>
         <tr>
+            <td><h5>kubernetes.service-account</h5></td>
+            <td style="word-wrap: break-word;">"default"</td>
+            <td>String</td>
+            <td>Service account that is used by jobmanager and taskmanager within kubernetes cluster. Notice that this can be overwritten by config options 'kubernetes.jobmanager.service-account' and 'kubernetes.taskmanager.service-account' for jobmanager and taskmanager respectively.</td>
+        </tr>
+        <tr>
             <td><h5>kubernetes.taskmanager.annotations</h5></td>
             <td style="word-wrap: break-word;">(none)</td>
             <td>Map</td>
@@ -171,6 +177,12 @@
             <td>The node selector to be set for TaskManager pods. Specified as key:value pairs separated by commas. For example, environment:production,disk:ssd.</td>
         </tr>
         <tr>
+            <td><h5>kubernetes.taskmanager.service-account</h5></td>
+            <td style="word-wrap: break-word;">(none)</td>
+            <td>String</td>
+            <td>Service account that is used by taskmanager within kubernetes cluster. The task manager uses this service account when watching config maps on the API server to retrieve leader address of jobmanager and resourcemanager. If not explicitly configured, config option 'kubernetes.service-account' will be used.</td>
+        </tr>
+        <tr>
             <td><h5>kubernetes.taskmanager.tolerations</h5></td>
             <td style="word-wrap: break-word;">(none)</td>
             <td>List&lt;Map&gt;</td>
diff --git a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/configuration/KubernetesConfigOptions.java b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/configuration/KubernetesConfigOptions.java
index 86a7e76..4e54af8 100644
--- a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/configuration/KubernetesConfigOptions.java
+++ b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/configuration/KubernetesConfigOptions.java
@@ -38,6 +38,8 @@ import static org.apache.flink.configuration.description.TextElement.code;
 @PublicEvolving
 public class KubernetesConfigOptions {
 
+	private static final String KUBERNETES_SERVICE_ACCOUNT_KEY = "kubernetes.service-account";
+
 	public static final ConfigOption<String> CONTEXT =
 		key("kubernetes.context")
 		.stringType()
@@ -56,9 +58,27 @@ public class KubernetesConfigOptions {
 	public static final ConfigOption<String> JOB_MANAGER_SERVICE_ACCOUNT =
 		key("kubernetes.jobmanager.service-account")
 		.stringType()
-		.defaultValue("default")
+		.noDefaultValue()
 		.withDescription("Service account that is used by jobmanager within kubernetes cluster. " +
-			"The job manager uses this service account when requesting taskmanager pods from the API server.");
+			"The job manager uses this service account when requesting taskmanager pods from the API server. " +
+			"If not explicitly configured, config option '" + KUBERNETES_SERVICE_ACCOUNT_KEY + "' will be used.");
+
+	public static final ConfigOption<String> TASK_MANAGER_SERVICE_ACCOUNT =
+		key("kubernetes.taskmanager.service-account")
+		.stringType()
+		.noDefaultValue()
+		.withDescription("Service account that is used by taskmanager within kubernetes cluster. " +
+			"The task manager uses this service account when watching config maps on the API server to retrieve " +
+			"leader address of jobmanager and resourcemanager. If not explicitly configured, config option '" +
+			KUBERNETES_SERVICE_ACCOUNT_KEY + "' will be used.");
+
+	public static final ConfigOption<String> KUBERNETES_SERVICE_ACCOUNT =
+		key(KUBERNETES_SERVICE_ACCOUNT_KEY)
+			.stringType()
+			.defaultValue("default")
+			.withDescription("Service account that is used by jobmanager and taskmanager within kubernetes cluster. " +
+				"Notice that this can be overwritten by config options '" + JOB_MANAGER_SERVICE_ACCOUNT.key() +
+				"' and '" + TASK_MANAGER_SERVICE_ACCOUNT.key() + "' for jobmanager and taskmanager respectively.");
 
 	public static final ConfigOption<Double> JOB_MANAGER_CPU =
 		key("kubernetes.jobmanager.cpu")
diff --git a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/decorators/InitTaskManagerDecorator.java b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/decorators/InitTaskManagerDecorator.java
index 3f40048..ee6202d 100644
--- a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/decorators/InitTaskManagerDecorator.java
+++ b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/decorators/InitTaskManagerDecorator.java
@@ -58,6 +58,7 @@ public class InitTaskManagerDecorator extends AbstractKubernetesStepDecorator {
 				.withAnnotations(kubernetesTaskManagerParameters.getAnnotations())
 				.endMetadata()
 			.editOrNewSpec()
+				.withServiceAccountName(kubernetesTaskManagerParameters.getServiceAccount())
 				.withRestartPolicy(Constants.RESTART_POLICY_OF_NEVER)
 				.withImagePullSecrets(kubernetesTaskManagerParameters.getImagePullSecrets())
 				.withNodeSelector(kubernetesTaskManagerParameters.getNodeSelector())
diff --git a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/parameters/KubernetesJobManagerParameters.java b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/parameters/KubernetesJobManagerParameters.java
index f888f0e..b9dee12 100644
--- a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/parameters/KubernetesJobManagerParameters.java
+++ b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/parameters/KubernetesJobManagerParameters.java
@@ -120,7 +120,8 @@ public class KubernetesJobManagerParameters extends AbstractKubernetesParameters
 	}
 
 	public String getServiceAccount() {
-		return flinkConfig.getString(KubernetesConfigOptions.JOB_MANAGER_SERVICE_ACCOUNT);
+		return flinkConfig.getOptional(KubernetesConfigOptions.JOB_MANAGER_SERVICE_ACCOUNT)
+			.orElse(flinkConfig.getString(KubernetesConfigOptions.KUBERNETES_SERVICE_ACCOUNT));
 	}
 
 	public String getEntrypointClass() {
diff --git a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/parameters/KubernetesTaskManagerParameters.java b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/parameters/KubernetesTaskManagerParameters.java
index bd2c7a5..6100445 100644
--- a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/parameters/KubernetesTaskManagerParameters.java
+++ b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/parameters/KubernetesTaskManagerParameters.java
@@ -106,6 +106,12 @@ public class KubernetesTaskManagerParameters extends AbstractKubernetesParameter
 		return containeredTaskManagerParameters.getTaskExecutorProcessSpec().getCpuCores().getValue().doubleValue();
 	}
 
+	public String getServiceAccount() {
+		return flinkConfig.getOptional(KubernetesConfigOptions.TASK_MANAGER_SERVICE_ACCOUNT)
+			.orElse(flinkConfig.getString(KubernetesConfigOptions.KUBERNETES_SERVICE_ACCOUNT));
+
+	}
+
 	public Map<String, Long> getTaskManagerExternalResources() {
 		return taskManagerExternalResources;
 	}
diff --git a/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/decorators/InitJobManagerDecoratorAccountTest.java b/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/decorators/InitJobManagerDecoratorAccountTest.java
new file mode 100644
index 0000000..20cc6d9
--- /dev/null
+++ b/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/decorators/InitJobManagerDecoratorAccountTest.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.kubeclient.decorators;
+
+import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions;
+import org.apache.flink.kubernetes.kubeclient.FlinkPod;
+import org.apache.flink.kubernetes.kubeclient.KubernetesJobManagerTestBase;
+
+import io.fabric8.kubernetes.api.model.Pod;
+import org.junit.Test;
+
+import static org.hamcrest.Matchers.is;
+import static org.junit.Assert.assertThat;
+
+/**
+ * Tests for {@link InitJobManagerDecorator} decorating service account.
+ */
+public class InitJobManagerDecoratorAccountTest extends KubernetesJobManagerTestBase {
+
+	private static final String SERVICE_ACCOUNT_NAME = "service-test";
+	private static final String JOB_MANGER_SERVICE_ACCOUNT_NAME = "jm-service-test";
+
+	private Pod resultPod;
+
+	@Override
+	protected void setupFlinkConfig() {
+		super.setupFlinkConfig();
+
+		this.flinkConfig.set(KubernetesConfigOptions.KUBERNETES_SERVICE_ACCOUNT, SERVICE_ACCOUNT_NAME);
+		this.flinkConfig.set(KubernetesConfigOptions.JOB_MANAGER_SERVICE_ACCOUNT, JOB_MANGER_SERVICE_ACCOUNT_NAME);
+	}
+
+	@Override
+	protected void onSetup() throws Exception {
+		super.onSetup();
+
+		final InitJobManagerDecorator initJobManagerDecorator =
+			new InitJobManagerDecorator(this.kubernetesJobManagerParameters);
+		final FlinkPod resultFlinkPod = initJobManagerDecorator.decorateFlinkPod(this.baseFlinkPod);
+
+		this.resultPod = resultFlinkPod.getPod();
+	}
+
+	@Test
+	public void testPodServiceAccountName() {
+		assertThat(this.resultPod.getSpec().getServiceAccountName(), is(JOB_MANGER_SERVICE_ACCOUNT_NAME));
+	}
+}
diff --git a/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/decorators/InitJobManagerDecoratorTest.java b/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/decorators/InitJobManagerDecoratorTest.java
index 5b64171..9256167 100644
--- a/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/decorators/InitJobManagerDecoratorTest.java
+++ b/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/decorators/InitJobManagerDecoratorTest.java
@@ -74,7 +74,7 @@ public class InitJobManagerDecoratorTest extends KubernetesJobManagerTestBase {
 	protected void setupFlinkConfig() {
 		super.setupFlinkConfig();
 
-		this.flinkConfig.set(KubernetesConfigOptions.JOB_MANAGER_SERVICE_ACCOUNT, SERVICE_ACCOUNT_NAME);
+		this.flinkConfig.set(KubernetesConfigOptions.KUBERNETES_SERVICE_ACCOUNT, SERVICE_ACCOUNT_NAME);
 		this.flinkConfig.set(KubernetesConfigOptions.CONTAINER_IMAGE_PULL_SECRETS, IMAGE_PULL_SECRETS);
 		this.flinkConfig.set(KubernetesConfigOptions.JOB_MANAGER_ANNOTATIONS, ANNOTATIONS);
 		this.flinkConfig.setString(KubernetesConfigOptions.JOB_MANAGER_TOLERATIONS.key(), TOLERATION_STRING);
diff --git a/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/decorators/InitTaskManagerDecoratorAccountTest.java b/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/decorators/InitTaskManagerDecoratorAccountTest.java
new file mode 100644
index 0000000..f14f97b
--- /dev/null
+++ b/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/decorators/InitTaskManagerDecoratorAccountTest.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.kubeclient.decorators;
+
+import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions;
+import org.apache.flink.kubernetes.kubeclient.FlinkPod;
+import org.apache.flink.kubernetes.kubeclient.KubernetesTaskManagerTestBase;
+
+import io.fabric8.kubernetes.api.model.Pod;
+import org.junit.Test;
+
+import static org.hamcrest.Matchers.is;
+import static org.junit.Assert.assertThat;
+
+/**
+ * Tests for {@link InitTaskManagerDecorator} decorating service account.
+ */
+public class InitTaskManagerDecoratorAccountTest extends KubernetesTaskManagerTestBase {
+
+	private static final String SERVICE_ACCOUNT_NAME = "service-test";
+	private static final String TASK_MANAGER_SERVICE_ACCOUNT_NAME = "tm-service-test";
+
+	private Pod resultPod;
+
+	@Override
+	protected void setupFlinkConfig() {
+		super.setupFlinkConfig();
+
+		this.flinkConfig.set(KubernetesConfigOptions.KUBERNETES_SERVICE_ACCOUNT, SERVICE_ACCOUNT_NAME);
+		this.flinkConfig.set(KubernetesConfigOptions.TASK_MANAGER_SERVICE_ACCOUNT, TASK_MANAGER_SERVICE_ACCOUNT_NAME);
+	}
+
+	@Override
+	protected void onSetup() throws Exception {
+		super.onSetup();
+
+		final InitTaskManagerDecorator initTaskManagerDecorator =
+			new InitTaskManagerDecorator(kubernetesTaskManagerParameters);
+
+		final FlinkPod resultFlinkPod = initTaskManagerDecorator.decorateFlinkPod(this.baseFlinkPod);
+		this.resultPod = resultFlinkPod.getPod();
+	}
+
+	@Test
+	public void testPodServiceAccountName() {
+		assertThat(this.resultPod.getSpec().getServiceAccountName(), is(TASK_MANAGER_SERVICE_ACCOUNT_NAME));
+	}
+}
diff --git a/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/decorators/InitTaskManagerDecoratorTest.java b/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/decorators/InitTaskManagerDecoratorTest.java
index d88db03..6fc6274 100644
--- a/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/decorators/InitTaskManagerDecoratorTest.java
+++ b/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/decorators/InitTaskManagerDecoratorTest.java
@@ -53,6 +53,7 @@ import static org.junit.Assert.assertThat;
  */
 public class InitTaskManagerDecoratorTest extends KubernetesTaskManagerTestBase {
 
+	private static final String SERVICE_ACCOUNT_NAME = "service-test";
 	private static final List<String> IMAGE_PULL_SECRETS = Arrays.asList("s1", "s2", "s3");
 	private static final Map<String, String> ANNOTATIONS = new HashMap<String, String>() {
 		{
@@ -77,6 +78,7 @@ public class InitTaskManagerDecoratorTest extends KubernetesTaskManagerTestBase
 	protected void setupFlinkConfig() {
 		super.setupFlinkConfig();
 
+		this.flinkConfig.set(KubernetesConfigOptions.KUBERNETES_SERVICE_ACCOUNT, SERVICE_ACCOUNT_NAME);
 		this.flinkConfig.set(KubernetesConfigOptions.CONTAINER_IMAGE_PULL_SECRETS, IMAGE_PULL_SECRETS);
 		this.flinkConfig.set(KubernetesConfigOptions.TASK_MANAGER_ANNOTATIONS, ANNOTATIONS);
 		this.flinkConfig.setString(KubernetesConfigOptions.TASK_MANAGER_TOLERATIONS.key(), TOLERATION_STRING);
@@ -188,6 +190,11 @@ public class InitTaskManagerDecoratorTest extends KubernetesTaskManagerTestBase
 	}
 
 	@Test
+	public void testPodServiceAccountName() {
+		assertThat(this.resultPod.getSpec().getServiceAccountName(), is(SERVICE_ACCOUNT_NAME));
+	}
+
+	@Test
 	public void testRestartPolicy() {
 		final String resultRestartPolicy = this.resultPod.getSpec().getRestartPolicy();