You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2019/01/09 10:33:43 UTC

[flink] branch master updated: [FLINK-10436] Add ConfigOption#withFallbackKeys (#6872)

This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 1e2aa8e  [FLINK-10436] Add ConfigOption#withFallbackKeys (#6872)
1e2aa8e is described below

commit 1e2aa8e9f35e7a943a4ed56a47834ee50bab3b47
Author: ZILI CHEN <wa...@gmail.com>
AuthorDate: Wed Jan 9 18:33:36 2019 +0800

    [FLINK-10436] Add ConfigOption#withFallbackKeys (#6872)
    
    * [FLINK-10436] Add ConfigOption#withFallbackKeys
    
    * [hotfix] correct javadoc
    
    * [FLINK-10436] Code quality improvement
---
 .../apache/flink/configuration/ConfigOption.java   | 89 +++++++++++++---------
 .../apache/flink/configuration/Configuration.java  | 37 +++++----
 .../configuration/DelegatingConfiguration.java     | 12 +--
 .../apache/flink/configuration/FallbackKey.java    | 80 +++++++++++++++++++
 .../apache/flink/configuration/RestOptions.java    |  2 +-
 .../flink/configuration/ConfigurationTest.java     | 33 ++++++++
 6 files changed, 198 insertions(+), 55 deletions(-)

diff --git a/flink-core/src/main/java/org/apache/flink/configuration/ConfigOption.java b/flink-core/src/main/java/org/apache/flink/configuration/ConfigOption.java
index 9e6fc32..d5fb555 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/ConfigOption.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/ConfigOption.java
@@ -39,7 +39,7 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
 @PublicEvolving
 public class ConfigOption<T> {
 
-	private static final String[] EMPTY = new String[0];
+	private static final FallbackKey[] EMPTY = new FallbackKey[0];
 
 	// ------------------------------------------------------------------------
 
@@ -47,7 +47,7 @@ public class ConfigOption<T> {
 	private final String key;
 
 	/** The list of deprecated keys, in the order to be checked. */
-	private final String[] deprecatedKeys;
+	private final FallbackKey[] fallbackKeys;
 
 	/** The default value for this option. */
 	private final T defaultValue;
@@ -58,54 +58,72 @@ public class ConfigOption<T> {
 	// ------------------------------------------------------------------------
 
 	/**
-	 * Creates a new config option with no deprecated keys.
+	 * Creates a new config option with no fallback keys.
 	 *
-	 * @param key             The current key for that config option
-	 * @param defaultValue    The default value for this option
+	 * @param key The current key for that config option
+	 * @param defaultValue The default value for this option
 	 */
 	ConfigOption(String key, T defaultValue) {
 		this.key = checkNotNull(key);
 		this.description = Description.builder().text("").build();
 		this.defaultValue = defaultValue;
-		this.deprecatedKeys = EMPTY;
+		this.fallbackKeys = EMPTY;
 	}
 
 	/**
-	 * Creates a new config option with deprecated keys.
+	 * Creates a new config option with fallback keys.
 	 *
-	 * @param key             The current key for that config option
-	 * @param description     Description for that option
-	 * @param defaultValue    The default value for this option
-	 * @param deprecatedKeys  The list of deprecated keys, in the order to be checked
+	 * @param key The current key for that config option
+	 * @param description Description for that option
+	 * @param defaultValue The default value for this option
+	 * @param fallbackKeys The list of fallback keys, in the order to be checked
 	 * @deprecated use version with {@link Description} instead
 	 */
 	@Deprecated
-	ConfigOption(String key, String description, T defaultValue, String... deprecatedKeys) {
+	ConfigOption(String key, String description, T defaultValue, FallbackKey... fallbackKeys) {
 		this.key = checkNotNull(key);
 		this.description = Description.builder().text(description).build();
 		this.defaultValue = defaultValue;
-		this.deprecatedKeys = deprecatedKeys == null || deprecatedKeys.length == 0 ? EMPTY : deprecatedKeys;
+		this.fallbackKeys = fallbackKeys == null || fallbackKeys.length == 0 ? EMPTY : fallbackKeys;
 	}
 
 	/**
-	 * Creates a new config option with deprecated keys.
+	 * Creates a new config option with fallback keys.
 	 *
-	 * @param key             The current key for that config option
-	 * @param description     Description for that option
-	 * @param defaultValue    The default value for this option
-	 * @param deprecatedKeys  The list of deprecated keys, in the order to be checked
+	 * @param key The current key for that config option
+	 * @param description Description for that option
+	 * @param defaultValue The default value for this option
+	 * @param fallbackKeys The list of fallback keys, in the order to be checked
 	 */
-	ConfigOption(String key, Description description, T defaultValue, String... deprecatedKeys) {
+	ConfigOption(String key, Description description, T defaultValue, FallbackKey... fallbackKeys) {
 		this.key = checkNotNull(key);
 		this.description = description;
 		this.defaultValue = defaultValue;
-		this.deprecatedKeys = deprecatedKeys == null || deprecatedKeys.length == 0 ? EMPTY : deprecatedKeys;
+		this.fallbackKeys = fallbackKeys == null || fallbackKeys.length == 0 ? EMPTY : fallbackKeys;
 	}
 
 	// ------------------------------------------------------------------------
 
 	/**
 	 * Creates a new config option, using this option's key and default value, and
+	 * adding the given fallback keys.
+	 *
+	 * <p>When obtaining a value from the configuration via {@link Configuration#getValue(ConfigOption)},
+	 * the fallback keys will be checked in the order provided to this method. The first key for which
+	 * a value is found will be used - that value will be returned.
+	 *
+	 * @param fallbackKeys The fallback keys, in the order in which they should be checked.
+	 * @return A new config option, with the given fallback keys.
+	 */
+	public ConfigOption<T> withFallbackKeys(String... fallbackKeys) {
+		FallbackKey[] fallbackKeyArray = Arrays.stream(fallbackKeys)
+			.map(FallbackKey::createFallbackKey)
+			.toArray(FallbackKey[]::new);
+		return new ConfigOption<>(key, description, defaultValue, fallbackKeyArray);
+	}
+
+	/**
+	 * Creates a new config option, using this option's key and default value, and
 	 * adding the given deprecated keys.
 	 *
 	 * <p>When obtaining a value from the configuration via {@link Configuration#getValue(ConfigOption)},
@@ -116,7 +134,10 @@ public class ConfigOption<T> {
 	 * @return A new config options, with the given deprecated keys.
 	 */
 	public ConfigOption<T> withDeprecatedKeys(String... deprecatedKeys) {
-		return new ConfigOption<>(key, description, defaultValue, deprecatedKeys);
+		FallbackKey[] fallbackKeys = Arrays.stream(deprecatedKeys)
+			.map(FallbackKey::createDeprecatedKey)
+			.toArray(FallbackKey[]::new);
+		return new ConfigOption<>(key, description, defaultValue, fallbackKeys);
 	}
 
 	/**
@@ -138,7 +159,7 @@ public class ConfigOption<T> {
 	 * @return A new config option, with given description.
 	 */
 	public ConfigOption<T> withDescription(final Description description) {
-		return new ConfigOption<>(key, description, defaultValue, deprecatedKeys);
+		return new ConfigOption<>(key, description, defaultValue, fallbackKeys);
 	}
 
 	// ------------------------------------------------------------------------
@@ -168,19 +189,19 @@ public class ConfigOption<T> {
 	}
 
 	/**
-	 * Checks whether this option has deprecated keys.
-	 * @return True if the option has deprecated keys, false if not.
+	 * Checks whether this option has fallback keys.
+	 * @return True if the option has fallback keys, false if not.
 	 */
-	public boolean hasDeprecatedKeys() {
-		return deprecatedKeys != EMPTY;
+	public boolean hasFallbackKeys() {
+		return fallbackKeys != EMPTY;
 	}
 
 	/**
-	 * Gets the deprecated keys, in the order to be checked.
-	 * @return The option's deprecated keys.
+	 * Gets the fallback keys, in the order to be checked.
+	 * @return The option's fallback keys.
 	 */
-	public Iterable<String> deprecatedKeys() {
-		return deprecatedKeys == EMPTY ? Collections.<String>emptyList() : Arrays.asList(deprecatedKeys);
+	public Iterable<FallbackKey> fallbackKeys() {
+		return (fallbackKeys == EMPTY) ? Collections.emptyList() : Arrays.asList(fallbackKeys);
 	}
 
 	/**
@@ -201,7 +222,7 @@ public class ConfigOption<T> {
 		else if (o != null && o.getClass() == ConfigOption.class) {
 			ConfigOption<?> that = (ConfigOption<?>) o;
 			return this.key.equals(that.key) &&
-					Arrays.equals(this.deprecatedKeys, that.deprecatedKeys) &&
+					Arrays.equals(this.fallbackKeys, that.fallbackKeys) &&
 					(this.defaultValue == null ? that.defaultValue == null :
 							(that.defaultValue != null && this.defaultValue.equals(that.defaultValue)));
 		}
@@ -213,13 +234,13 @@ public class ConfigOption<T> {
 	@Override
 	public int hashCode() {
 		return 31 * key.hashCode() +
-				17 * Arrays.hashCode(deprecatedKeys) +
+				17 * Arrays.hashCode(fallbackKeys) +
 				(defaultValue != null ? defaultValue.hashCode() : 0);
 	}
 
 	@Override
 	public String toString() {
-		return String.format("Key: '%s' , default: %s (deprecated keys: %s)",
-				key, defaultValue, Arrays.toString(deprecatedKeys));
+		return String.format("Key: '%s' , default: %s (fallback keys: %s)",
+				key, defaultValue, Arrays.toString(fallbackKeys));
 	}
 }
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/Configuration.java b/flink-core/src/main/java/org/apache/flink/configuration/Configuration.java
index 00c4c38..07c0290 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/Configuration.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/Configuration.java
@@ -701,12 +701,11 @@ public class Configuration extends ExecutionConfig.GlobalJobParameters
 			if (this.confData.containsKey(configOption.key())) {
 				return true;
 			}
-			else if (configOption.hasDeprecatedKeys()) {
-				// try the deprecated keys
-				for (String deprecatedKey : configOption.deprecatedKeys()) {
-					if (this.confData.containsKey(deprecatedKey)) {
-						LOG.warn("Config uses deprecated configuration key '{}' instead of proper key '{}'",
-							deprecatedKey, configOption.key());
+			else if (configOption.hasFallbackKeys()) {
+				// try the fallback keys
+				for (FallbackKey fallbackKey : configOption.fallbackKeys()) {
+					if (this.confData.containsKey(fallbackKey.getKey())) {
+						loggingFallback(fallbackKey, configOption);
 						return true;
 					}
 				}
@@ -741,11 +740,10 @@ public class Configuration extends ExecutionConfig.GlobalJobParameters
 			// try the current key
 			Object oldValue = this.confData.remove(configOption.key());
 			if (oldValue == null){
-				for (String deprecatedKey : configOption.deprecatedKeys()){
-					oldValue = this.confData.remove(deprecatedKey);
+				for (FallbackKey fallbackKey : configOption.fallbackKeys()){
+					oldValue = this.confData.remove(fallbackKey.getKey());
 					if (oldValue != null){
-						LOG.warn("Config uses deprecated configuration key '{}' instead of proper key '{}'",
-							deprecatedKey, configOption.key());
+						loggingFallback(fallbackKey, configOption);
 						return true;
 					}
 				}
@@ -789,13 +787,12 @@ public class Configuration extends ExecutionConfig.GlobalJobParameters
 			// found a value for the current proper key
 			return o;
 		}
-		else if (configOption.hasDeprecatedKeys()) {
+		else if (configOption.hasFallbackKeys()) {
 			// try the deprecated keys
-			for (String deprecatedKey : configOption.deprecatedKeys()) {
-				Object oo = getRawValue(deprecatedKey);
+			for (FallbackKey fallbackKey : configOption.fallbackKeys()) {
+				Object oo = getRawValue(fallbackKey.getKey());
 				if (oo != null) {
-					LOG.warn("Config uses deprecated configuration key '{}' instead of proper key '{}'",
-							deprecatedKey, configOption.key());
+					loggingFallback(fallbackKey, configOption);
 					return oo;
 				}
 			}
@@ -809,6 +806,16 @@ public class Configuration extends ExecutionConfig.GlobalJobParameters
 		return o != null ? o : configOption.defaultValue();
 	}
 
+	private void loggingFallback(FallbackKey fallbackKey, ConfigOption<?> configOption) {
+		if (fallbackKey.isDeprecated()) {
+			LOG.warn("Config uses deprecated configuration key '{}' instead of proper key '{}'",
+				fallbackKey.getKey(), configOption.key());
+		} else {
+			LOG.info("Config uses fallback configuration key '{}' instead of key '{}'",
+				fallbackKey.getKey(), configOption.key());
+		}
+	}
+
 	// --------------------------------------------------------------------------------------------
 	//  Type conversion
 	// --------------------------------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/DelegatingConfiguration.java b/flink-core/src/main/java/org/apache/flink/configuration/DelegatingConfiguration.java
index 1a637f6..0a0a777 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/DelegatingConfiguration.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/DelegatingConfiguration.java
@@ -32,6 +32,8 @@ import java.util.Map;
 import java.util.Properties;
 import java.util.Set;
 
+import static org.apache.flink.configuration.FallbackKey.createDeprecatedKey;
+
 /**
  * A configuration that manages a subset of keys with a common prefix from a given configuration.
  */
@@ -361,17 +363,17 @@ public final class DelegatingConfiguration extends Configuration {
 	private static <T> ConfigOption<T> prefixOption(ConfigOption<T> option, String prefix) {
 		String key = prefix + option.key();
 
-		List<String> deprecatedKeys;
-		if (option.hasDeprecatedKeys()) {
+		List<FallbackKey> deprecatedKeys;
+		if (option.hasFallbackKeys()) {
 			deprecatedKeys = new ArrayList<>();
-			for (String dk : option.deprecatedKeys()) {
-				deprecatedKeys.add(prefix + dk);
+			for (FallbackKey dk : option.fallbackKeys()) {
+				deprecatedKeys.add(createDeprecatedKey(prefix + dk.getKey()));
 			}
 		} else {
 			deprecatedKeys = Collections.emptyList();
 		}
 
-		String[] deprecated = deprecatedKeys.toArray(new String[deprecatedKeys.size()]);
+		FallbackKey[] deprecated = deprecatedKeys.toArray(new FallbackKey[0]);
 		return new ConfigOption<>(key,
 			option.description(),
 			option.defaultValue(),
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/FallbackKey.java b/flink-core/src/main/java/org/apache/flink/configuration/FallbackKey.java
new file mode 100644
index 0000000..6c67727
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/configuration/FallbackKey.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.configuration;
+
+/**
+ * A key that a {@link ConfigOption} falls back to when the option's own key is not configured.
+ */
+public class FallbackKey {
+
+	// -------------------------
+	//  Factory methods
+	// -------------------------
+
+	static FallbackKey createFallbackKey(String key) {
+		return new FallbackKey(key, false);
+	}
+
+	static FallbackKey createDeprecatedKey(String key) {
+		return new FallbackKey(key, true);
+	}
+
+	// ------------------------------------------------------------------------
+
+	private final String key;
+
+	private final boolean isDeprecated;
+
+	public String getKey() {
+		return key;
+	}
+
+	public boolean isDeprecated() {
+		return isDeprecated;
+	}
+
+	private FallbackKey(String key, boolean isDeprecated) {
+		this.key = key;
+		this.isDeprecated = isDeprecated;
+	}
+
+	// ------------------------------------------------------------------------
+
+	@Override
+	public boolean equals(Object o) {
+		if (this == o) {
+			return true;
+		} else if (o != null && o.getClass() == FallbackKey.class) {
+			FallbackKey that = (FallbackKey) o;
+			return this.key.equals(that.key) && (this.isDeprecated == that.isDeprecated);
+		} else {
+			return false;
+		}
+	}
+
+	@Override
+	public int hashCode() {
+		return 31 * key.hashCode() + (isDeprecated ? 1 : 0);
+	}
+
+	@Override
+	public String toString() {
+		return String.format("{key=%s, isDeprecated=%s}", key, isDeprecated);
+	}
+}
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/RestOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/RestOptions.java
index 5c1b6d6..d4f5a74 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/RestOptions.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/RestOptions.java
@@ -43,7 +43,7 @@ public class RestOptions {
 	public static final ConfigOption<String> ADDRESS =
 		key("rest.address")
 			.noDefaultValue()
-			.withDeprecatedKeys(JobManagerOptions.ADDRESS.key())
+			.withFallbackKeys(JobManagerOptions.ADDRESS.key())
 			.withDescription("The address that should be used by clients to connect to the server.");
 
 	/**
diff --git a/flink-core/src/test/java/org/apache/flink/configuration/ConfigurationTest.java b/flink-core/src/test/java/org/apache/flink/configuration/ConfigurationTest.java
index 3b98a44..d0d382f 100644
--- a/flink-core/src/test/java/org/apache/flink/configuration/ConfigurationTest.java
+++ b/flink-core/src/test/java/org/apache/flink/configuration/ConfigurationTest.java
@@ -306,6 +306,39 @@ public class ConfigurationTest extends TestLogger {
 	}
 
 	@Test
+	public void testFallbackKeys() {
+		Configuration cfg = new Configuration();
+		cfg.setInteger("the-key", 11);
+		cfg.setInteger("old-key", 12);
+		cfg.setInteger("older-key", 13);
+
+		ConfigOption<Integer> matchesFirst = ConfigOptions
+			.key("the-key")
+			.defaultValue(-1)
+			.withFallbackKeys("old-key", "older-key");
+
+		ConfigOption<Integer> matchesSecond = ConfigOptions
+			.key("does-not-exist")
+			.defaultValue(-1)
+			.withFallbackKeys("old-key", "older-key");
+
+		ConfigOption<Integer> matchesThird = ConfigOptions
+			.key("does-not-exist")
+			.defaultValue(-1)
+			.withFallbackKeys("foo", "older-key");
+
+		ConfigOption<Integer> notContained = ConfigOptions
+			.key("does-not-exist")
+			.defaultValue(-1)
+			.withFallbackKeys("not-there", "also-not-there");
+
+		assertEquals(11, cfg.getInteger(matchesFirst));
+		assertEquals(12, cfg.getInteger(matchesSecond));
+		assertEquals(13, cfg.getInteger(matchesThird));
+		assertEquals(-1, cfg.getInteger(notContained));
+	}
+
+	@Test
 	public void testRemove(){
 		Configuration cfg = new Configuration();
 		cfg.setInteger("a", 1);