You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2020/06/08 07:47:59 UTC

[flink] branch master updated: [FLINK-18149][k8s] Do not add DeploymentOptionsInternal#CONF_DIR to config map

This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new a4a99ba  [FLINK-18149][k8s] Do not add DeploymentOptionsInternal#CONF_DIR to config map
a4a99ba is described below

commit a4a99bac919d57387d54a2db80a249becf3ba680
Author: wangyang0918 <da...@alibaba-inc.com>
AuthorDate: Fri Jun 5 16:33:37 2020 +0800

    [FLINK-18149][k8s] Do not add DeploymentOptionsInternal#CONF_DIR to config map
    
    DeploymentOptionsInternal#CONF_DIR is an internal option and stores the client config path. It should not be added to config map and used by JobManager pod. Instead, KubernetesConfigOptions#FLINK_CONF_DIR will be used.
    
    This closes #12501.
---
 .../decorators/FlinkConfMountDecorator.java        | 11 ++++-----
 .../parameters/AbstractKubernetesParameters.java   |  4 +++-
 .../flink/kubernetes/KubernetesTestUtils.java      | 15 +++++++++++++
 .../decorators/FlinkConfMountDecoratorTest.java    | 26 +++++++++++++++-------
 .../AbstractKubernetesParametersTest.java          | 16 +++++++++++++
 5 files changed, 58 insertions(+), 14 deletions(-)

diff --git a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/decorators/FlinkConfMountDecorator.java b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/decorators/FlinkConfMountDecorator.java
index 79eb7aa..ef24fa3 100644
--- a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/decorators/FlinkConfMountDecorator.java
+++ b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/decorators/FlinkConfMountDecorator.java
@@ -20,6 +20,7 @@ package org.apache.flink.kubernetes.kubeclient.decorators;
 
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.DeploymentOptionsInternal;
 import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions;
 import org.apache.flink.kubernetes.kubeclient.FlinkPod;
 import org.apache.flink.kubernetes.kubeclient.parameters.AbstractKubernetesParameters;
@@ -144,11 +145,11 @@ public class FlinkConfMountDecorator extends AbstractKubernetesStepDecorator {
 	 * Get properties map for the cluster-side after removal of some keys.
 	 */
 	private Map<String, String> getClusterSidePropertiesMap(Configuration flinkConfig) {
-		final Map<String, String> propertiesMap = flinkConfig.toMap();
-
-		// remove kubernetes.config.file
-		propertiesMap.remove(KubernetesConfigOptions.KUBE_CONFIG_FILE.key());
-		return propertiesMap;
+		final Configuration clusterSideConfig = flinkConfig.clone();
+		// Remove some configuration options that should not be taken to cluster side.
+		clusterSideConfig.removeConfig(KubernetesConfigOptions.KUBE_CONFIG_FILE);
+		clusterSideConfig.removeConfig(DeploymentOptionsInternal.CONF_DIR);
+		return clusterSideConfig.toMap();
 	}
 
 	@VisibleForTesting
diff --git a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/parameters/AbstractKubernetesParameters.java b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/parameters/AbstractKubernetesParameters.java
index c655b63..85a33d6 100644
--- a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/parameters/AbstractKubernetesParameters.java
+++ b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/parameters/AbstractKubernetesParameters.java
@@ -56,7 +56,9 @@ public abstract class AbstractKubernetesParameters implements KubernetesParamete
 
 	@Override
 	public String getConfigDirectory() {
-		final String configDir = flinkConfig.get(DeploymentOptionsInternal.CONF_DIR);
+		final String configDir = flinkConfig.getOptional(DeploymentOptionsInternal.CONF_DIR).orElse(
+			flinkConfig.getString(KubernetesConfigOptions.FLINK_CONF_DIR));
+
 		checkNotNull(configDir);
 		return configDir;
 	}
diff --git a/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/KubernetesTestUtils.java b/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/KubernetesTestUtils.java
index f9dea84..6d1f8e7 100644
--- a/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/KubernetesTestUtils.java
+++ b/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/KubernetesTestUtils.java
@@ -18,8 +18,12 @@
 
 package org.apache.flink.kubernetes;
 
+import org.apache.flink.configuration.Configuration;
+
 import org.apache.flink.shaded.guava18.com.google.common.io.Files;
 
+import org.apache.commons.lang3.StringUtils;
+
 import java.io.File;
 import java.io.IOException;
 import java.nio.charset.StandardCharsets;
@@ -32,4 +36,15 @@ public class KubernetesTestUtils {
 	public static void createTemporyFile(String data, File directory, String fileName) throws IOException {
 		Files.write(data, new File(directory, fileName), StandardCharsets.UTF_8);
 	}
+
+	public static Configuration loadConfigurationFromString(String content) {
+		final Configuration configuration = new Configuration();
+		for (String line : content.split(System.lineSeparator())) {
+			final String[] splits = line.split(":");
+			if (splits.length >= 2) {
+				configuration.setString(splits[0].trim(), StringUtils.substringAfter(line, ":").trim());
+			}
+		}
+		return configuration;
+	}
 }
diff --git a/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/decorators/FlinkConfMountDecoratorTest.java b/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/decorators/FlinkConfMountDecoratorTest.java
index da5069f..05dc938 100644
--- a/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/decorators/FlinkConfMountDecoratorTest.java
+++ b/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/decorators/FlinkConfMountDecoratorTest.java
@@ -18,6 +18,8 @@
 
 package org.apache.flink.kubernetes.kubeclient.decorators;
 
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.DeploymentOptionsInternal;
 import org.apache.flink.kubernetes.KubernetesTestUtils;
 import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions;
 import org.apache.flink.kubernetes.kubeclient.FlinkPod;
@@ -42,9 +44,12 @@ import java.util.List;
 import java.util.Map;
 
 import static org.apache.flink.configuration.GlobalConfiguration.FLINK_CONF_FILENAME;
+import static org.apache.flink.kubernetes.kubeclient.decorators.FlinkConfMountDecorator.getFlinkConfConfigMapName;
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.nullValue;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotEquals;
-import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.assertThat;
 
 /**
  * General tests for the {@link FlinkConfMountDecorator}.
@@ -88,15 +93,20 @@ public class FlinkConfMountDecoratorTest extends KubernetesJobManagerTestBase {
 
 		assertEquals(Constants.API_VERSION, resultConfigMap.getApiVersion());
 
-		assertEquals(flinkConfMountDecorator.getFlinkConfConfigMapName(CLUSTER_ID),
+		assertEquals(getFlinkConfConfigMapName(CLUSTER_ID),
 				resultConfigMap.getMetadata().getName());
 		assertEquals(getCommonLabels(), resultConfigMap.getMetadata().getLabels());
 
 		Map<String, String> resultDatas = resultConfigMap.getData();
 		assertEquals("some data", resultDatas.get("logback.xml"));
 		assertEquals("some data", resultDatas.get("log4j.properties"));
-		assertTrue(resultDatas.get(FLINK_CONF_FILENAME).contains(KubernetesConfigOptions.FLINK_CONF_DIR.key() +
-				": " + FLINK_CONF_DIR_IN_POD));
+
+		final Configuration resultFlinkConfig = KubernetesTestUtils.loadConfigurationFromString(
+			resultDatas.get(FLINK_CONF_FILENAME));
+		assertThat(resultFlinkConfig.get(KubernetesConfigOptions.FLINK_CONF_DIR), is(FLINK_CONF_DIR_IN_POD));
+		// The following config options should not be added to config map
+		assertThat(resultFlinkConfig.get(KubernetesConfigOptions.KUBE_CONFIG_FILE), is(nullValue()));
+		assertThat(resultFlinkConfig.get(DeploymentOptionsInternal.CONF_DIR), is(nullValue()));
 	}
 
 	@Test
@@ -112,7 +122,7 @@ public class FlinkConfMountDecoratorTest extends KubernetesJobManagerTestBase {
 			new VolumeBuilder()
 				.withName(Constants.FLINK_CONF_VOLUME)
 				.withNewConfigMap()
-					.withName(flinkConfMountDecorator.getFlinkConfConfigMapName(CLUSTER_ID))
+					.withName(getFlinkConfConfigMapName(CLUSTER_ID))
 					.withItems(expectedKeyToPaths)
 					.endConfigMap()
 				.build());
@@ -145,7 +155,7 @@ public class FlinkConfMountDecoratorTest extends KubernetesJobManagerTestBase {
 			new VolumeBuilder()
 				.withName(Constants.FLINK_CONF_VOLUME)
 				.withNewConfigMap()
-				.withName(flinkConfMountDecorator.getFlinkConfConfigMapName(CLUSTER_ID))
+				.withName(getFlinkConfConfigMapName(CLUSTER_ID))
 				.withItems(expectedKeyToPaths)
 				.endConfigMap()
 				.build());
@@ -171,7 +181,7 @@ public class FlinkConfMountDecoratorTest extends KubernetesJobManagerTestBase {
 			new VolumeBuilder()
 				.withName(Constants.FLINK_CONF_VOLUME)
 				.withNewConfigMap()
-				.withName(flinkConfMountDecorator.getFlinkConfConfigMapName(CLUSTER_ID))
+				.withName(getFlinkConfConfigMapName(CLUSTER_ID))
 				.withItems(expectedKeyToPaths)
 				.endConfigMap()
 				.build());
@@ -202,7 +212,7 @@ public class FlinkConfMountDecoratorTest extends KubernetesJobManagerTestBase {
 			new VolumeBuilder()
 				.withName(Constants.FLINK_CONF_VOLUME)
 				.withNewConfigMap()
-				.withName(flinkConfMountDecorator.getFlinkConfConfigMapName(CLUSTER_ID))
+				.withName(getFlinkConfConfigMapName(CLUSTER_ID))
 				.withItems(expectedKeyToPaths)
 				.endConfigMap()
 				.build());
diff --git a/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/parameters/AbstractKubernetesParametersTest.java b/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/parameters/AbstractKubernetesParametersTest.java
index 371b75a..3386aa2 100644
--- a/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/parameters/AbstractKubernetesParametersTest.java
+++ b/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/parameters/AbstractKubernetesParametersTest.java
@@ -19,6 +19,7 @@
 package org.apache.flink.kubernetes.kubeclient.parameters;
 
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.DeploymentOptionsInternal;
 import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions;
 import org.apache.flink.kubernetes.utils.Constants;
 import org.apache.flink.util.StringUtils;
@@ -31,6 +32,8 @@ import java.util.Map;
 import java.util.Random;
 
 import static org.apache.flink.core.testutils.CommonTestUtils.assertThrows;
+import static org.hamcrest.Matchers.is;
+import static org.junit.Assert.assertThat;
 
 /**
  * General tests for the {@link AbstractKubernetesParameters}.
@@ -62,6 +65,19 @@ public class AbstractKubernetesParametersTest extends TestLogger {
 		);
 	}
 
+	@Test
+	public void getConfigDirectory() {
+		final String confDir = "/path/of/flink-conf";
+		flinkConfig.set(DeploymentOptionsInternal.CONF_DIR, confDir);
+		assertThat(testingKubernetesParameters.getConfigDirectory(), is(confDir));
+	}
+
+	@Test
+	public void getConfigDirectoryFallbackToPodConfDir() {
+		final String confDirInPod = flinkConfig.get(KubernetesConfigOptions.FLINK_CONF_DIR);
+		assertThat(testingKubernetesParameters.getConfigDirectory(), is(confDirInPod));
+	}
+
 	private class TestingKubernetesParameters extends AbstractKubernetesParameters {
 
 		public TestingKubernetesParameters(Configuration flinkConfig) {