You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ti...@apache.org on 2020/03/25 06:36:05 UTC

[flink] branch master updated: [FLINK-16625][utils] Extract BootstrapTools#getEnvironmentVariables to ConfigurationUtils#getPrefixedKeyValuePairs

This is an automated email from the ASF dual-hosted git repository.

tison pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new f80c384  [FLINK-16625][utils] Extract BootstrapTools#getEnvironmentVariables to ConfigurationUtils#getPrefixedKeyValuePairs
f80c384 is described below

commit f80c3847d854e0d6a62577cd1e998b57b67fc9f6
Author: Canbin Zheng <fe...@gmail.com>
AuthorDate: Wed Mar 25 14:35:48 2020 +0800

    [FLINK-16625][utils] Extract BootstrapTools#getEnvironmentVariables to ConfigurationUtils#getPrefixedKeyValuePairs
    
    This closes #11458 .
---
 .../flink/configuration/ConfigurationUtils.java      | 15 +++++++++++++++
 .../flink/configuration/ConfigurationUtilsTest.java  | 20 +++++++++++++++++++-
 .../parameters/AbstractKubernetesParameters.java     | 10 ----------
 .../parameters/KubernetesJobManagerParameters.java   |  3 ++-
 .../runtime/clusterframework/BootstrapTools.java     | 18 ------------------
 .../runtime/clusterframework/BootstrapToolsTest.java |  5 +++--
 .../org/apache/flink/yarn/YarnClusterDescriptor.java |  3 ++-
 7 files changed, 41 insertions(+), 33 deletions(-)

diff --git a/flink-core/src/main/java/org/apache/flink/configuration/ConfigurationUtils.java b/flink-core/src/main/java/org/apache/flink/configuration/ConfigurationUtils.java
index 745d474..9eda0e4 100755
--- a/flink-core/src/main/java/org/apache/flink/configuration/ConfigurationUtils.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/ConfigurationUtils.java
@@ -219,6 +219,21 @@ public class ConfigurationUtils {
 		return configs;
 	}
 
+	/**
+	 * Extract and parse Flink configuration properties with a given name prefix and
+	 * return the result as a Map.
+	 */
+	public static Map<String, String> getPrefixedKeyValuePairs(String prefix, Configuration configuration) {
+		Map<String, String> result  = new HashMap<>();
+		for (Map.Entry<String, String> entry: configuration.toMap().entrySet()) {
+			if (entry.getKey().startsWith(prefix) && entry.getKey().length() > prefix.length()) {
+				String key = entry.getKey().substring(prefix.length());
+				result.put(key, entry.getValue());
+			}
+		}
+		return result;
+	}
+
 	// Make sure that we cannot instantiate this class
 	private ConfigurationUtils() {
 	}
diff --git a/flink-core/src/test/java/org/apache/flink/configuration/ConfigurationUtilsTest.java b/flink-core/src/test/java/org/apache/flink/configuration/ConfigurationUtilsTest.java
index fa9345a..be5d340 100644
--- a/flink-core/src/test/java/org/apache/flink/configuration/ConfigurationUtilsTest.java
+++ b/flink-core/src/test/java/org/apache/flink/configuration/ConfigurationUtilsTest.java
@@ -55,7 +55,7 @@ public class ConfigurationUtilsTest extends TestLogger {
 	}
 
 	@Test
-		public void testHideSensitiveValues() {
+	public void testHideSensitiveValues() {
 		final Map<String, String> keyValuePairs = new HashMap<>();
 		keyValuePairs.put("foobar", "barfoo");
 		final String secretKey1 = "secret.key";
@@ -74,4 +74,22 @@ public class ConfigurationUtilsTest extends TestLogger {
 		assertThat(hiddenSensitiveValues, is(equalTo(expectedKeyValuePairs)));
 	}
 
+	@Test
+	public void testGetPrefixedKeyValuePairs() {
+		final String prefix = "test.prefix.";
+		final Map<String, String> expectedKeyValuePairs = new HashMap<String, String>() {
+			{
+				put("k1", "v1");
+				put("k2", "v2");
+			}
+		};
+
+		final Configuration configuration =  new Configuration();
+		expectedKeyValuePairs.forEach((k, v) -> configuration.setString(prefix + k, v));
+
+		final Map<String, String> resultKeyValuePairs = ConfigurationUtils.getPrefixedKeyValuePairs(prefix, configuration);
+
+		assertThat(resultKeyValuePairs, is(equalTo(expectedKeyValuePairs)));
+	}
+
 }
diff --git a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/parameters/AbstractKubernetesParameters.java b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/parameters/AbstractKubernetesParameters.java
index 8e7c443..4e0916a 100644
--- a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/parameters/AbstractKubernetesParameters.java
+++ b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/parameters/AbstractKubernetesParameters.java
@@ -22,7 +22,6 @@ import org.apache.flink.client.cli.CliFrontend;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions;
 import org.apache.flink.kubernetes.utils.Constants;
-import org.apache.flink.runtime.clusterframework.BootstrapTools;
 
 import io.fabric8.kubernetes.api.model.LocalObjectReference;
 import org.apache.commons.lang3.StringUtils;
@@ -163,13 +162,4 @@ public abstract class AbstractKubernetesParameters implements KubernetesParamete
 
 		return Optional.empty();
 	}
-
-	/**
-	 * Extract container customized environment variable properties with a given name prefix.
-	 * @param envPrefix the given property name prefix
-	 * @return a Map storing with customized environment variable key/value pairs.
-	 */
-	protected Map<String, String> getPrefixedEnvironments(String envPrefix) {
-		return BootstrapTools.getEnvironmentVariables(envPrefix, flinkConfig);
-	}
 }
diff --git a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/parameters/KubernetesJobManagerParameters.java b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/parameters/KubernetesJobManagerParameters.java
index 3f14601..cff893b 100644
--- a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/parameters/KubernetesJobManagerParameters.java
+++ b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/parameters/KubernetesJobManagerParameters.java
@@ -21,6 +21,7 @@ package org.apache.flink.kubernetes.kubeclient.parameters;
 import org.apache.flink.client.deployment.ClusterSpecification;
 import org.apache.flink.configuration.BlobServerOptions;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.ConfigurationUtils;
 import org.apache.flink.configuration.JobManagerOptions;
 import org.apache.flink.configuration.ResourceManagerOptions;
 import org.apache.flink.configuration.RestOptions;
@@ -63,7 +64,7 @@ public class KubernetesJobManagerParameters extends AbstractKubernetesParameters
 
 	@Override
 	public Map<String, String> getEnvironments() {
-		return getPrefixedEnvironments(ResourceManagerOptions.CONTAINERIZED_MASTER_ENV_PREFIX);
+		return ConfigurationUtils.getPrefixedKeyValuePairs(ResourceManagerOptions.CONTAINERIZED_MASTER_ENV_PREFIX, flinkConfig);
 	}
 
 	@Override
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java
index 494d150..cc294b2 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java
@@ -713,22 +713,4 @@ public class BootstrapTools {
 		}
 		return memory - heapLimit;
 	}
-
-	/**
-	 * Method to extract environment variables from the flinkConfiguration based on the given prefix String.
-	 *
-	 * @param envPrefix Prefix for the environment variables key
-	 * @param flinkConfiguration The Flink config to get the environment variable definition from
-	 */
-	public static Map<String, String> getEnvironmentVariables(String envPrefix, Configuration flinkConfiguration) {
-		Map<String, String> result  = new HashMap<>();
-		for (Map.Entry<String, String> entry: flinkConfiguration.toMap().entrySet()) {
-			if (entry.getKey().startsWith(envPrefix) && entry.getKey().length() > envPrefix.length()) {
-				// remove prefix
-				String key = entry.getKey().substring(envPrefix.length());
-				result.put(key, entry.getValue());
-			}
-		}
-		return result;
-	}
 }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/BootstrapToolsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/BootstrapToolsTest.java
index cee17ca..0a9ccf2 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/BootstrapToolsTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/BootstrapToolsTest.java
@@ -23,6 +23,7 @@ import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.ConfigOption;
 import org.apache.flink.configuration.ConfigOptions;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.ConfigurationUtils;
 import org.apache.flink.configuration.CoreOptions;
 import org.apache.flink.configuration.GlobalConfiguration;
 import org.apache.flink.configuration.MemorySize;
@@ -531,7 +532,7 @@ public class BootstrapToolsTest extends TestLogger {
 		Configuration testConf = new Configuration();
 		testConf.setString("containerized.master.env.LD_LIBRARY_PATH", "/usr/lib/native");
 
-		Map<String, String> res = BootstrapTools.getEnvironmentVariables("containerized.master.env.", testConf);
+		Map<String, String> res = ConfigurationUtils.getPrefixedKeyValuePairs("containerized.master.env.", testConf);
 
 		Assert.assertEquals(1, res.size());
 		Map.Entry<String, String> entry = res.entrySet().iterator().next();
@@ -544,7 +545,7 @@ public class BootstrapToolsTest extends TestLogger {
 		Configuration testConf = new Configuration();
 		testConf.setString("containerized.master.env.", "/usr/lib/native");
 
-		Map<String, String> res = BootstrapTools.getEnvironmentVariables("containerized.master.env.", testConf);
+		Map<String, String> res = ConfigurationUtils.getPrefixedKeyValuePairs("containerized.master.env.", testConf);
 
 		Assert.assertEquals(0, res.size());
 	}
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java
index 4d6f691..cde732b 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java
@@ -30,6 +30,7 @@ import org.apache.flink.client.program.rest.RestClusterClient;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.ConfigUtils;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.ConfigurationUtils;
 import org.apache.flink.configuration.CoreOptions;
 import org.apache.flink.configuration.HighAvailabilityOptions;
 import org.apache.flink.configuration.IllegalConfigurationException;
@@ -947,7 +948,7 @@ public class YarnClusterDescriptor implements ClusterDescriptor<ApplicationId> {
 		final Map<String, String> appMasterEnv = new HashMap<>();
 		// set user specified app master environment variables
 		appMasterEnv.putAll(
-			BootstrapTools.getEnvironmentVariables(ResourceManagerOptions.CONTAINERIZED_MASTER_ENV_PREFIX, configuration));
+			ConfigurationUtils.getPrefixedKeyValuePairs(ResourceManagerOptions.CONTAINERIZED_MASTER_ENV_PREFIX, configuration));
 		// set Flink app class path
 		appMasterEnv.put(YarnConfigKeys.ENV_FLINK_CLASSPATH, classPathBuilder.toString());