You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ti...@apache.org on 2020/03/25 06:36:05 UTC
[flink] branch master updated: [FLINK-16625][utils] Extract BootstrapTools#getEnvironmentVariables to ConfigurationUtils#getPrefixedKeyValuePairs
This is an automated email from the ASF dual-hosted git repository.
tison pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new f80c384 [FLINK-16625][utils] Extract BootstrapTools#getEnvironmentVariables to ConfigurationUtils#getPrefixedKeyValuePairs
f80c384 is described below
commit f80c3847d854e0d6a62577cd1e998b57b67fc9f6
Author: Canbin Zheng <fe...@gmail.com>
AuthorDate: Wed Mar 25 14:35:48 2020 +0800
[FLINK-16625][utils] Extract BootstrapTools#getEnvironmentVariables to ConfigurationUtils#getPrefixedKeyValuePairs
This closes #11458.
---
.../flink/configuration/ConfigurationUtils.java | 15 +++++++++++++++
.../flink/configuration/ConfigurationUtilsTest.java | 20 +++++++++++++++++++-
.../parameters/AbstractKubernetesParameters.java | 10 ----------
.../parameters/KubernetesJobManagerParameters.java | 3 ++-
.../runtime/clusterframework/BootstrapTools.java | 18 ------------------
.../runtime/clusterframework/BootstrapToolsTest.java | 5 +++--
.../org/apache/flink/yarn/YarnClusterDescriptor.java | 3 ++-
7 files changed, 41 insertions(+), 33 deletions(-)
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/ConfigurationUtils.java b/flink-core/src/main/java/org/apache/flink/configuration/ConfigurationUtils.java
index 745d474..9eda0e4 100755
--- a/flink-core/src/main/java/org/apache/flink/configuration/ConfigurationUtils.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/ConfigurationUtils.java
@@ -219,6 +219,21 @@ public class ConfigurationUtils {
return configs;
}
+ /**
+ * Extract and parse Flink configuration properties with a given name prefix and
+ * return the result as a Map.
+ */
+ public static Map<String, String> getPrefixedKeyValuePairs(String prefix, Configuration configuration) {
+ Map<String, String> result = new HashMap<>();
+ for (Map.Entry<String, String> entry: configuration.toMap().entrySet()) {
+ if (entry.getKey().startsWith(prefix) && entry.getKey().length() > prefix.length()) {
+ String key = entry.getKey().substring(prefix.length());
+ result.put(key, entry.getValue());
+ }
+ }
+ return result;
+ }
+
// Make sure that we cannot instantiate this class
private ConfigurationUtils() {
}
diff --git a/flink-core/src/test/java/org/apache/flink/configuration/ConfigurationUtilsTest.java b/flink-core/src/test/java/org/apache/flink/configuration/ConfigurationUtilsTest.java
index fa9345a..be5d340 100644
--- a/flink-core/src/test/java/org/apache/flink/configuration/ConfigurationUtilsTest.java
+++ b/flink-core/src/test/java/org/apache/flink/configuration/ConfigurationUtilsTest.java
@@ -55,7 +55,7 @@ public class ConfigurationUtilsTest extends TestLogger {
}
@Test
- public void testHideSensitiveValues() {
+ public void testHideSensitiveValues() {
final Map<String, String> keyValuePairs = new HashMap<>();
keyValuePairs.put("foobar", "barfoo");
final String secretKey1 = "secret.key";
@@ -74,4 +74,22 @@ public class ConfigurationUtilsTest extends TestLogger {
assertThat(hiddenSensitiveValues, is(equalTo(expectedKeyValuePairs)));
}
+ @Test
+ public void testGetPrefixedKeyValuePairs() {
+ final String prefix = "test.prefix.";
+ final Map<String, String> expectedKeyValuePairs = new HashMap<String, String>() {
+ {
+ put("k1", "v1");
+ put("k2", "v2");
+ }
+ };
+
+ final Configuration configuration = new Configuration();
+ expectedKeyValuePairs.forEach((k, v) -> configuration.setString(prefix + k, v));
+
+ final Map<String, String> resultKeyValuePairs = ConfigurationUtils.getPrefixedKeyValuePairs(prefix, configuration);
+
+ assertThat(resultKeyValuePairs, is(equalTo(expectedKeyValuePairs)));
+ }
+
}
diff --git a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/parameters/AbstractKubernetesParameters.java b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/parameters/AbstractKubernetesParameters.java
index 8e7c443..4e0916a 100644
--- a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/parameters/AbstractKubernetesParameters.java
+++ b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/parameters/AbstractKubernetesParameters.java
@@ -22,7 +22,6 @@ import org.apache.flink.client.cli.CliFrontend;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions;
import org.apache.flink.kubernetes.utils.Constants;
-import org.apache.flink.runtime.clusterframework.BootstrapTools;
import io.fabric8.kubernetes.api.model.LocalObjectReference;
import org.apache.commons.lang3.StringUtils;
@@ -163,13 +162,4 @@ public abstract class AbstractKubernetesParameters implements KubernetesParamete
return Optional.empty();
}
-
- /**
- * Extract container customized environment variable properties with a given name prefix.
- * @param envPrefix the given property name prefix
- * @return a Map storing with customized environment variable key/value pairs.
- */
- protected Map<String, String> getPrefixedEnvironments(String envPrefix) {
- return BootstrapTools.getEnvironmentVariables(envPrefix, flinkConfig);
- }
}
diff --git a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/parameters/KubernetesJobManagerParameters.java b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/parameters/KubernetesJobManagerParameters.java
index 3f14601..cff893b 100644
--- a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/parameters/KubernetesJobManagerParameters.java
+++ b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/parameters/KubernetesJobManagerParameters.java
@@ -21,6 +21,7 @@ package org.apache.flink.kubernetes.kubeclient.parameters;
import org.apache.flink.client.deployment.ClusterSpecification;
import org.apache.flink.configuration.BlobServerOptions;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.ConfigurationUtils;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.configuration.ResourceManagerOptions;
import org.apache.flink.configuration.RestOptions;
@@ -63,7 +64,7 @@ public class KubernetesJobManagerParameters extends AbstractKubernetesParameters
@Override
public Map<String, String> getEnvironments() {
- return getPrefixedEnvironments(ResourceManagerOptions.CONTAINERIZED_MASTER_ENV_PREFIX);
+ return ConfigurationUtils.getPrefixedKeyValuePairs(ResourceManagerOptions.CONTAINERIZED_MASTER_ENV_PREFIX, flinkConfig);
}
@Override
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java
index 494d150..cc294b2 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java
@@ -713,22 +713,4 @@ public class BootstrapTools {
}
return memory - heapLimit;
}
-
- /**
- * Method to extract environment variables from the flinkConfiguration based on the given prefix String.
- *
- * @param envPrefix Prefix for the environment variables key
- * @param flinkConfiguration The Flink config to get the environment variable definition from
- */
- public static Map<String, String> getEnvironmentVariables(String envPrefix, Configuration flinkConfiguration) {
- Map<String, String> result = new HashMap<>();
- for (Map.Entry<String, String> entry: flinkConfiguration.toMap().entrySet()) {
- if (entry.getKey().startsWith(envPrefix) && entry.getKey().length() > envPrefix.length()) {
- // remove prefix
- String key = entry.getKey().substring(envPrefix.length());
- result.put(key, entry.getValue());
- }
- }
- return result;
- }
}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/BootstrapToolsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/BootstrapToolsTest.java
index cee17ca..0a9ccf2 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/BootstrapToolsTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/BootstrapToolsTest.java
@@ -23,6 +23,7 @@ import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.ConfigurationUtils;
import org.apache.flink.configuration.CoreOptions;
import org.apache.flink.configuration.GlobalConfiguration;
import org.apache.flink.configuration.MemorySize;
@@ -531,7 +532,7 @@ public class BootstrapToolsTest extends TestLogger {
Configuration testConf = new Configuration();
testConf.setString("containerized.master.env.LD_LIBRARY_PATH", "/usr/lib/native");
- Map<String, String> res = BootstrapTools.getEnvironmentVariables("containerized.master.env.", testConf);
+ Map<String, String> res = ConfigurationUtils.getPrefixedKeyValuePairs("containerized.master.env.", testConf);
Assert.assertEquals(1, res.size());
Map.Entry<String, String> entry = res.entrySet().iterator().next();
@@ -544,7 +545,7 @@ public class BootstrapToolsTest extends TestLogger {
Configuration testConf = new Configuration();
testConf.setString("containerized.master.env.", "/usr/lib/native");
- Map<String, String> res = BootstrapTools.getEnvironmentVariables("containerized.master.env.", testConf);
+ Map<String, String> res = ConfigurationUtils.getPrefixedKeyValuePairs("containerized.master.env.", testConf);
Assert.assertEquals(0, res.size());
}
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java
index 4d6f691..cde732b 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java
@@ -30,6 +30,7 @@ import org.apache.flink.client.program.rest.RestClusterClient;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.ConfigUtils;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.ConfigurationUtils;
import org.apache.flink.configuration.CoreOptions;
import org.apache.flink.configuration.HighAvailabilityOptions;
import org.apache.flink.configuration.IllegalConfigurationException;
@@ -947,7 +948,7 @@ public class YarnClusterDescriptor implements ClusterDescriptor<ApplicationId> {
final Map<String, String> appMasterEnv = new HashMap<>();
// set user specified app master environment variables
appMasterEnv.putAll(
- BootstrapTools.getEnvironmentVariables(ResourceManagerOptions.CONTAINERIZED_MASTER_ENV_PREFIX, configuration));
+ ConfigurationUtils.getPrefixedKeyValuePairs(ResourceManagerOptions.CONTAINERIZED_MASTER_ENV_PREFIX, configuration));
// set Flink app class path
appMasterEnv.put(YarnConfigKeys.ENV_FLINK_CLASSPATH, classPathBuilder.toString());