You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2020/06/08 07:47:59 UTC
[flink] branch master updated: [FLINK-18149][k8s] Do not add
DeploymentOptionsInternal#CONF_DIR to config map
This is an automated email from the ASF dual-hosted git repository.
trohrmann pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new a4a99ba [FLINK-18149][k8s] Do not add DeploymentOptionsInternal#CONF_DIR to config map
a4a99ba is described below
commit a4a99bac919d57387d54a2db80a249becf3ba680
Author: wangyang0918 <da...@alibaba-inc.com>
AuthorDate: Fri Jun 5 16:33:37 2020 +0800
[FLINK-18149][k8s] Do not add DeploymentOptionsInternal#CONF_DIR to config map
DeploymentOptionsInternal#CONF_DIR is an internal option that stores the client config path. It should not be added to the config map or used by the JobManager pod. Instead, KubernetesConfigOptions#FLINK_CONF_DIR will be used.
This closes #12501.
---
.../decorators/FlinkConfMountDecorator.java | 11 ++++-----
.../parameters/AbstractKubernetesParameters.java | 4 +++-
.../flink/kubernetes/KubernetesTestUtils.java | 15 +++++++++++++
.../decorators/FlinkConfMountDecoratorTest.java | 26 +++++++++++++++-------
.../AbstractKubernetesParametersTest.java | 16 +++++++++++++
5 files changed, 58 insertions(+), 14 deletions(-)
diff --git a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/decorators/FlinkConfMountDecorator.java b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/decorators/FlinkConfMountDecorator.java
index 79eb7aa..ef24fa3 100644
--- a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/decorators/FlinkConfMountDecorator.java
+++ b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/decorators/FlinkConfMountDecorator.java
@@ -20,6 +20,7 @@ package org.apache.flink.kubernetes.kubeclient.decorators;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.DeploymentOptionsInternal;
import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions;
import org.apache.flink.kubernetes.kubeclient.FlinkPod;
import org.apache.flink.kubernetes.kubeclient.parameters.AbstractKubernetesParameters;
@@ -144,11 +145,11 @@ public class FlinkConfMountDecorator extends AbstractKubernetesStepDecorator {
* Get properties map for the cluster-side after removal of some keys.
*/
private Map<String, String> getClusterSidePropertiesMap(Configuration flinkConfig) {
- final Map<String, String> propertiesMap = flinkConfig.toMap();
-
- // remove kubernetes.config.file
- propertiesMap.remove(KubernetesConfigOptions.KUBE_CONFIG_FILE.key());
- return propertiesMap;
+ final Configuration clusterSideConfig = flinkConfig.clone();
+ // Remove some configuration options that should not be taken to cluster side.
+ clusterSideConfig.removeConfig(KubernetesConfigOptions.KUBE_CONFIG_FILE);
+ clusterSideConfig.removeConfig(DeploymentOptionsInternal.CONF_DIR);
+ return clusterSideConfig.toMap();
}
@VisibleForTesting
diff --git a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/parameters/AbstractKubernetesParameters.java b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/parameters/AbstractKubernetesParameters.java
index c655b63..85a33d6 100644
--- a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/parameters/AbstractKubernetesParameters.java
+++ b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/parameters/AbstractKubernetesParameters.java
@@ -56,7 +56,9 @@ public abstract class AbstractKubernetesParameters implements KubernetesParamete
@Override
public String getConfigDirectory() {
- final String configDir = flinkConfig.get(DeploymentOptionsInternal.CONF_DIR);
+ final String configDir = flinkConfig.getOptional(DeploymentOptionsInternal.CONF_DIR).orElse(
+ flinkConfig.getString(KubernetesConfigOptions.FLINK_CONF_DIR));
+
checkNotNull(configDir);
return configDir;
}
diff --git a/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/KubernetesTestUtils.java b/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/KubernetesTestUtils.java
index f9dea84..6d1f8e7 100644
--- a/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/KubernetesTestUtils.java
+++ b/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/KubernetesTestUtils.java
@@ -18,8 +18,12 @@
package org.apache.flink.kubernetes;
+import org.apache.flink.configuration.Configuration;
+
import org.apache.flink.shaded.guava18.com.google.common.io.Files;
+import org.apache.commons.lang3.StringUtils;
+
import java.io.File;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
@@ -32,4 +36,15 @@ public class KubernetesTestUtils {
public static void createTemporyFile(String data, File directory, String fileName) throws IOException {
Files.write(data, new File(directory, fileName), StandardCharsets.UTF_8);
}
+
+ public static Configuration loadConfigurationFromString(String content) {
+ final Configuration configuration = new Configuration();
+ for (String line : content.split(System.lineSeparator())) {
+ final String[] splits = line.split(":");
+ if (splits.length >= 2) {
+ configuration.setString(splits[0].trim(), StringUtils.substringAfter(line, ":").trim());
+ }
+ }
+ return configuration;
+ }
}
diff --git a/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/decorators/FlinkConfMountDecoratorTest.java b/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/decorators/FlinkConfMountDecoratorTest.java
index da5069f..05dc938 100644
--- a/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/decorators/FlinkConfMountDecoratorTest.java
+++ b/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/decorators/FlinkConfMountDecoratorTest.java
@@ -18,6 +18,8 @@
package org.apache.flink.kubernetes.kubeclient.decorators;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.DeploymentOptionsInternal;
import org.apache.flink.kubernetes.KubernetesTestUtils;
import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions;
import org.apache.flink.kubernetes.kubeclient.FlinkPod;
@@ -42,9 +44,12 @@ import java.util.List;
import java.util.Map;
import static org.apache.flink.configuration.GlobalConfiguration.FLINK_CONF_FILENAME;
+import static org.apache.flink.kubernetes.kubeclient.decorators.FlinkConfMountDecorator.getFlinkConfConfigMapName;
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.nullValue;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotEquals;
-import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.assertThat;
/**
* General tests for the {@link FlinkConfMountDecorator}.
@@ -88,15 +93,20 @@ public class FlinkConfMountDecoratorTest extends KubernetesJobManagerTestBase {
assertEquals(Constants.API_VERSION, resultConfigMap.getApiVersion());
- assertEquals(flinkConfMountDecorator.getFlinkConfConfigMapName(CLUSTER_ID),
+ assertEquals(getFlinkConfConfigMapName(CLUSTER_ID),
resultConfigMap.getMetadata().getName());
assertEquals(getCommonLabels(), resultConfigMap.getMetadata().getLabels());
Map<String, String> resultDatas = resultConfigMap.getData();
assertEquals("some data", resultDatas.get("logback.xml"));
assertEquals("some data", resultDatas.get("log4j.properties"));
- assertTrue(resultDatas.get(FLINK_CONF_FILENAME).contains(KubernetesConfigOptions.FLINK_CONF_DIR.key() +
- ": " + FLINK_CONF_DIR_IN_POD));
+
+ final Configuration resultFlinkConfig = KubernetesTestUtils.loadConfigurationFromString(
+ resultDatas.get(FLINK_CONF_FILENAME));
+ assertThat(resultFlinkConfig.get(KubernetesConfigOptions.FLINK_CONF_DIR), is(FLINK_CONF_DIR_IN_POD));
+ // The following config options should not be added to config map
+ assertThat(resultFlinkConfig.get(KubernetesConfigOptions.KUBE_CONFIG_FILE), is(nullValue()));
+ assertThat(resultFlinkConfig.get(DeploymentOptionsInternal.CONF_DIR), is(nullValue()));
}
@Test
@@ -112,7 +122,7 @@ public class FlinkConfMountDecoratorTest extends KubernetesJobManagerTestBase {
new VolumeBuilder()
.withName(Constants.FLINK_CONF_VOLUME)
.withNewConfigMap()
- .withName(flinkConfMountDecorator.getFlinkConfConfigMapName(CLUSTER_ID))
+ .withName(getFlinkConfConfigMapName(CLUSTER_ID))
.withItems(expectedKeyToPaths)
.endConfigMap()
.build());
@@ -145,7 +155,7 @@ public class FlinkConfMountDecoratorTest extends KubernetesJobManagerTestBase {
new VolumeBuilder()
.withName(Constants.FLINK_CONF_VOLUME)
.withNewConfigMap()
- .withName(flinkConfMountDecorator.getFlinkConfConfigMapName(CLUSTER_ID))
+ .withName(getFlinkConfConfigMapName(CLUSTER_ID))
.withItems(expectedKeyToPaths)
.endConfigMap()
.build());
@@ -171,7 +181,7 @@ public class FlinkConfMountDecoratorTest extends KubernetesJobManagerTestBase {
new VolumeBuilder()
.withName(Constants.FLINK_CONF_VOLUME)
.withNewConfigMap()
- .withName(flinkConfMountDecorator.getFlinkConfConfigMapName(CLUSTER_ID))
+ .withName(getFlinkConfConfigMapName(CLUSTER_ID))
.withItems(expectedKeyToPaths)
.endConfigMap()
.build());
@@ -202,7 +212,7 @@ public class FlinkConfMountDecoratorTest extends KubernetesJobManagerTestBase {
new VolumeBuilder()
.withName(Constants.FLINK_CONF_VOLUME)
.withNewConfigMap()
- .withName(flinkConfMountDecorator.getFlinkConfConfigMapName(CLUSTER_ID))
+ .withName(getFlinkConfConfigMapName(CLUSTER_ID))
.withItems(expectedKeyToPaths)
.endConfigMap()
.build());
diff --git a/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/parameters/AbstractKubernetesParametersTest.java b/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/parameters/AbstractKubernetesParametersTest.java
index 371b75a..3386aa2 100644
--- a/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/parameters/AbstractKubernetesParametersTest.java
+++ b/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/parameters/AbstractKubernetesParametersTest.java
@@ -19,6 +19,7 @@
package org.apache.flink.kubernetes.kubeclient.parameters;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.DeploymentOptionsInternal;
import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions;
import org.apache.flink.kubernetes.utils.Constants;
import org.apache.flink.util.StringUtils;
@@ -31,6 +32,8 @@ import java.util.Map;
import java.util.Random;
import static org.apache.flink.core.testutils.CommonTestUtils.assertThrows;
+import static org.hamcrest.Matchers.is;
+import static org.junit.Assert.assertThat;
/**
* General tests for the {@link AbstractKubernetesParameters}.
@@ -62,6 +65,19 @@ public class AbstractKubernetesParametersTest extends TestLogger {
);
}
+ @Test
+ public void getConfigDirectory() {
+ final String confDir = "/path/of/flink-conf";
+ flinkConfig.set(DeploymentOptionsInternal.CONF_DIR, confDir);
+ assertThat(testingKubernetesParameters.getConfigDirectory(), is(confDir));
+ }
+
+ @Test
+ public void getConfigDirectoryFallbackToPodConfDir() {
+ final String confDirInPod = flinkConfig.get(KubernetesConfigOptions.FLINK_CONF_DIR);
+ assertThat(testingKubernetesParameters.getConfigDirectory(), is(confDirInPod));
+ }
+
private class TestingKubernetesParameters extends AbstractKubernetesParameters {
public TestingKubernetesParameters(Configuration flinkConfig) {