You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2019/01/09 10:33:38 UTC

[GitHub] tillrohrmann closed pull request #6872: [FLINK-10436] Add ConfigOption#withFallbackKeys

tillrohrmann closed pull request #6872: [FLINK-10436] Add ConfigOption#withFallbackKeys
URL: https://github.com/apache/flink/pull/6872
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/flink-core/src/main/java/org/apache/flink/configuration/ConfigOption.java b/flink-core/src/main/java/org/apache/flink/configuration/ConfigOption.java
index 9e6fc324670..d5fb5551767 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/ConfigOption.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/ConfigOption.java
@@ -39,7 +39,7 @@
 @PublicEvolving
 public class ConfigOption<T> {
 
-	private static final String[] EMPTY = new String[0];
+	private static final FallbackKey[] EMPTY = new FallbackKey[0];
 
 	// ------------------------------------------------------------------------
 
@@ -47,7 +47,7 @@
 	private final String key;
 
 	/** The list of deprecated keys, in the order to be checked. */
-	private final String[] deprecatedKeys;
+	private final FallbackKey[] fallbackKeys;
 
 	/** The default value for this option. */
 	private final T defaultValue;
@@ -58,52 +58,70 @@
 	// ------------------------------------------------------------------------
 
 	/**
-	 * Creates a new config option with no deprecated keys.
+	 * Creates a new config option with no fallback keys.
 	 *
-	 * @param key             The current key for that config option
-	 * @param defaultValue    The default value for this option
+	 * @param key The current key for that config option
+	 * @param defaultValue The default value for this option
 	 */
 	ConfigOption(String key, T defaultValue) {
 		this.key = checkNotNull(key);
 		this.description = Description.builder().text("").build();
 		this.defaultValue = defaultValue;
-		this.deprecatedKeys = EMPTY;
+		this.fallbackKeys = EMPTY;
 	}
 
 	/**
-	 * Creates a new config option with deprecated keys.
+	 * Creates a new config option with fallback keys.
 	 *
-	 * @param key             The current key for that config option
-	 * @param description     Description for that option
-	 * @param defaultValue    The default value for this option
-	 * @param deprecatedKeys  The list of deprecated keys, in the order to be checked
+	 * @param key The current key for that config option
+	 * @param description Description for that option
+	 * @param defaultValue The default value for this option
+	 * @param fallbackKeys The list of fallback keys, in the order to be checked
 	 * @deprecated use version with {@link Description} instead
 	 */
 	@Deprecated
-	ConfigOption(String key, String description, T defaultValue, String... deprecatedKeys) {
+	ConfigOption(String key, String description, T defaultValue, FallbackKey... fallbackKeys) {
 		this.key = checkNotNull(key);
 		this.description = Description.builder().text(description).build();
 		this.defaultValue = defaultValue;
-		this.deprecatedKeys = deprecatedKeys == null || deprecatedKeys.length == 0 ? EMPTY : deprecatedKeys;
+		this.fallbackKeys = fallbackKeys == null || fallbackKeys.length == 0 ? EMPTY : fallbackKeys;
 	}
 
 	/**
-	 * Creates a new config option with deprecated keys.
+	 * Creates a new config option with fallback keys.
 	 *
-	 * @param key             The current key for that config option
-	 * @param description     Description for that option
-	 * @param defaultValue    The default value for this option
-	 * @param deprecatedKeys  The list of deprecated keys, in the order to be checked
+	 * @param key The current key for that config option
+	 * @param description Description for that option
+	 * @param defaultValue The default value for this option
+	 * @param fallbackKeys The list of fallback keys, in the order to be checked
 	 */
-	ConfigOption(String key, Description description, T defaultValue, String... deprecatedKeys) {
+	ConfigOption(String key, Description description, T defaultValue, FallbackKey... fallbackKeys) {
 		this.key = checkNotNull(key);
 		this.description = description;
 		this.defaultValue = defaultValue;
-		this.deprecatedKeys = deprecatedKeys == null || deprecatedKeys.length == 0 ? EMPTY : deprecatedKeys;
+		this.fallbackKeys = fallbackKeys == null || fallbackKeys.length == 0 ? EMPTY : fallbackKeys;
 	}
 
 	// ------------------------------------------------------------------------
 
+	/**
+	 * Creates a new config option, using this option's key and default value, and
+	 * adding the given fallback keys.
+	 *
+	 * <p>When obtaining a value from the configuration via {@link Configuration#getValue(ConfigOption)},
+	 * the fallback keys will be checked in the order provided to this method. The first key for which
+	 * a value is found will be used - that value will be returned.
+	 *
+	 * @param fallbackKeys The fallback keys, in the order in which they should be checked.
+	 * @return A new config options, with the given fallback keys.
+	 */
+	public ConfigOption<T> withFallbackKeys(String... fallbackKeys) {
+		FallbackKey[] fallbackKeyArray = Arrays.stream(fallbackKeys)
+			.map(FallbackKey::createFallbackKey)
+			.toArray(FallbackKey[]::new);
+		return new ConfigOption<>(key, description, defaultValue, fallbackKeyArray);
+	}
+
 	/**
 	 * Creates a new config option, using this option's key and default value, and
 	 * adding the given deprecated keys.
@@ -116,7 +134,10 @@
 	 * @return A new config options, with the given deprecated keys.
 	 */
 	public ConfigOption<T> withDeprecatedKeys(String... deprecatedKeys) {
-		return new ConfigOption<>(key, description, defaultValue, deprecatedKeys);
+		FallbackKey[] fallbackKeys = Arrays.stream(deprecatedKeys)
+			.map(FallbackKey::createDeprecatedKey)
+			.toArray(FallbackKey[]::new);
+		return new ConfigOption<>(key, description, defaultValue, fallbackKeys);
 	}
 
 	/**
@@ -138,7 +159,7 @@
 	 * @return A new config option, with given description.
 	 */
 	public ConfigOption<T> withDescription(final Description description) {
-		return new ConfigOption<>(key, description, defaultValue, deprecatedKeys);
+		return new ConfigOption<>(key, description, defaultValue, fallbackKeys);
 	}
 
 	// ------------------------------------------------------------------------
@@ -168,19 +189,19 @@ public T defaultValue() {
 	}
 
 	/**
-	 * Checks whether this option has deprecated keys.
-	 * @return True if the option has deprecated keys, false if not.
+	 * Checks whether this option has fallback keys.
+	 * @return True if the option has fallback keys, false if not.
 	 */
-	public boolean hasDeprecatedKeys() {
-		return deprecatedKeys != EMPTY;
+	public boolean hasFallbackKeys() {
+		return fallbackKeys != EMPTY;
 	}
 
 	/**
-	 * Gets the deprecated keys, in the order to be checked.
-	 * @return The option's deprecated keys.
+	 * Gets the fallback keys, in the order to be checked.
+	 * @return The option's fallback keys.
 	 */
-	public Iterable<String> deprecatedKeys() {
-		return deprecatedKeys == EMPTY ? Collections.<String>emptyList() : Arrays.asList(deprecatedKeys);
+	public Iterable<FallbackKey> fallbackKeys() {
+		return (fallbackKeys == EMPTY) ? Collections.emptyList() : Arrays.asList(fallbackKeys);
 	}
 
 	/**
@@ -201,7 +222,7 @@ public boolean equals(Object o) {
 		else if (o != null && o.getClass() == ConfigOption.class) {
 			ConfigOption<?> that = (ConfigOption<?>) o;
 			return this.key.equals(that.key) &&
-					Arrays.equals(this.deprecatedKeys, that.deprecatedKeys) &&
+					Arrays.equals(this.fallbackKeys, that.fallbackKeys) &&
 					(this.defaultValue == null ? that.defaultValue == null :
 							(that.defaultValue != null && this.defaultValue.equals(that.defaultValue)));
 		}
@@ -213,13 +234,13 @@ else if (o != null && o.getClass() == ConfigOption.class) {
 	@Override
 	public int hashCode() {
 		return 31 * key.hashCode() +
-				17 * Arrays.hashCode(deprecatedKeys) +
+				17 * Arrays.hashCode(fallbackKeys) +
 				(defaultValue != null ? defaultValue.hashCode() : 0);
 	}
 
 	@Override
 	public String toString() {
-		return String.format("Key: '%s' , default: %s (deprecated keys: %s)",
-				key, defaultValue, Arrays.toString(deprecatedKeys));
+		return String.format("Key: '%s' , default: %s (fallback keys: %s)",
+				key, defaultValue, Arrays.toString(fallbackKeys));
 	}
 }
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/Configuration.java b/flink-core/src/main/java/org/apache/flink/configuration/Configuration.java
index 00c4c38e820..07c0290d66c 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/Configuration.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/Configuration.java
@@ -701,12 +701,11 @@ public boolean contains(ConfigOption<?> configOption) {
 			if (this.confData.containsKey(configOption.key())) {
 				return true;
 			}
-			else if (configOption.hasDeprecatedKeys()) {
-				// try the deprecated keys
-				for (String deprecatedKey : configOption.deprecatedKeys()) {
-					if (this.confData.containsKey(deprecatedKey)) {
-						LOG.warn("Config uses deprecated configuration key '{}' instead of proper key '{}'",
-							deprecatedKey, configOption.key());
+			else if (configOption.hasFallbackKeys()) {
+				// try the fallback keys
+				for (FallbackKey fallbackKey : configOption.fallbackKeys()) {
+					if (this.confData.containsKey(fallbackKey.getKey())) {
+						loggingFallback(fallbackKey, configOption);
 						return true;
 					}
 				}
@@ -741,11 +740,10 @@ else if (configOption.hasDeprecatedKeys()) {
 			// try the current key
 			Object oldValue = this.confData.remove(configOption.key());
 			if (oldValue == null){
-				for (String deprecatedKey : configOption.deprecatedKeys()){
-					oldValue = this.confData.remove(deprecatedKey);
+				for (FallbackKey fallbackKey : configOption.fallbackKeys()){
+					oldValue = this.confData.remove(fallbackKey.getKey());
 					if (oldValue != null){
-						LOG.warn("Config uses deprecated configuration key '{}' instead of proper key '{}'",
-							deprecatedKey, configOption.key());
+						loggingFallback(fallbackKey, configOption);
 						return true;
 					}
 				}
@@ -789,13 +787,12 @@ private Object getRawValueFromOption(ConfigOption<?> configOption) {
 			// found a value for the current proper key
 			return o;
 		}
-		else if (configOption.hasDeprecatedKeys()) {
+		else if (configOption.hasFallbackKeys()) {
 			// try the deprecated keys
-			for (String deprecatedKey : configOption.deprecatedKeys()) {
-				Object oo = getRawValue(deprecatedKey);
+			for (FallbackKey fallbackKey : configOption.fallbackKeys()) {
+				Object oo = getRawValue(fallbackKey.getKey());
 				if (oo != null) {
-					LOG.warn("Config uses deprecated configuration key '{}' instead of proper key '{}'",
-							deprecatedKey, configOption.key());
+					loggingFallback(fallbackKey, configOption);
 					return oo;
 				}
 			}
@@ -809,6 +806,16 @@ private Object getValueOrDefaultFromOption(ConfigOption<?> configOption) {
 		return o != null ? o : configOption.defaultValue();
 	}
 
+	private void loggingFallback(FallbackKey fallbackKey, ConfigOption<?> configOption) {
+		if (fallbackKey.isDeprecated()) {
+			LOG.warn("Config uses deprecated configuration key '{}' instead of proper key '{}'",
+				fallbackKey.getKey(), configOption.key());
+		} else {
+			LOG.info("Config uses fallback configuration key '{}' instead of key '{}'",
+				fallbackKey.getKey(), configOption.key());
+		}
+	}
+
 	// --------------------------------------------------------------------------------------------
 	//  Type conversion
 	// --------------------------------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/DelegatingConfiguration.java b/flink-core/src/main/java/org/apache/flink/configuration/DelegatingConfiguration.java
index 1a637f65626..0a0a7773e2b 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/DelegatingConfiguration.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/DelegatingConfiguration.java
@@ -32,6 +32,8 @@
 import java.util.Properties;
 import java.util.Set;
 
+import static org.apache.flink.configuration.FallbackKey.createDeprecatedKey;
+
 /**
  * A configuration that manages a subset of keys with a common prefix from a given configuration.
  */
@@ -361,17 +363,17 @@ public boolean equals(Object obj) {
 	private static <T> ConfigOption<T> prefixOption(ConfigOption<T> option, String prefix) {
 		String key = prefix + option.key();
 
-		List<String> deprecatedKeys;
-		if (option.hasDeprecatedKeys()) {
+		List<FallbackKey> deprecatedKeys;
+		if (option.hasFallbackKeys()) {
 			deprecatedKeys = new ArrayList<>();
-			for (String dk : option.deprecatedKeys()) {
-				deprecatedKeys.add(prefix + dk);
+			for (FallbackKey dk : option.fallbackKeys()) {
+				deprecatedKeys.add(createDeprecatedKey(prefix + dk.getKey()));
 			}
 		} else {
 			deprecatedKeys = Collections.emptyList();
 		}
 
-		String[] deprecated = deprecatedKeys.toArray(new String[deprecatedKeys.size()]);
+		FallbackKey[] deprecated = deprecatedKeys.toArray(new FallbackKey[0]);
 		return new ConfigOption<>(key,
 			option.description(),
 			option.defaultValue(),
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/FallbackKey.java b/flink-core/src/main/java/org/apache/flink/configuration/FallbackKey.java
new file mode 100644
index 00000000000..6c67727ce94
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/configuration/FallbackKey.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.configuration;
+
+/**
+ * A key with FallbackKeys will fall back to the FallbackKeys if it itself is not configured.
+ */
+public class FallbackKey {
+
+	// -------------------------
+	//  Factory methods
+	// -------------------------
+
+	static FallbackKey createFallbackKey(String key) {
+		return new FallbackKey(key, false);
+	}
+
+	static FallbackKey createDeprecatedKey(String key) {
+		return new FallbackKey(key, true);
+	}
+
+	// ------------------------------------------------------------------------
+
+	private final String key;
+
+	private final boolean isDeprecated;
+
+	public String getKey() {
+		return key;
+	}
+
+	public boolean isDeprecated() {
+		return isDeprecated;
+	}
+
+	private FallbackKey(String key, boolean isDeprecated) {
+		this.key = key;
+		this.isDeprecated = isDeprecated;
+	}
+
+	// ------------------------------------------------------------------------
+
+	@Override
+	public boolean equals(Object o) {
+		if (this == o) {
+			return true;
+		} else if (o != null && o.getClass() == FallbackKey.class) {
+			FallbackKey that = (FallbackKey) o;
+			return this.key.equals(that.key) && (this.isDeprecated == that.isDeprecated);
+		} else {
+			return false;
+		}
+	}
+
+	@Override
+	public int hashCode() {
+		return 31 * key.hashCode() + (isDeprecated ? 1 : 0);
+	}
+
+	@Override
+	public String toString() {
+		return String.format("{key=%s, isDeprecated=%s}", key, isDeprecated);
+	}
+}
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/RestOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/RestOptions.java
index edfd39be808..2e60747b2db 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/RestOptions.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/RestOptions.java
@@ -43,7 +43,7 @@
 	public static final ConfigOption<String> ADDRESS =
 		key("rest.address")
 			.noDefaultValue()
-			.withDeprecatedKeys(JobManagerOptions.ADDRESS.key())
+			.withFallbackKeys(JobManagerOptions.ADDRESS.key())
 			.withDescription("The address that should be used by clients to connect to the server.");
 
 	/**
diff --git a/flink-core/src/test/java/org/apache/flink/configuration/ConfigurationTest.java b/flink-core/src/test/java/org/apache/flink/configuration/ConfigurationTest.java
index 3b98a449089..d0d382fe6e4 100644
--- a/flink-core/src/test/java/org/apache/flink/configuration/ConfigurationTest.java
+++ b/flink-core/src/test/java/org/apache/flink/configuration/ConfigurationTest.java
@@ -305,6 +305,39 @@ public void testDeprecatedKeys() {
 		assertEquals(-1, cfg.getInteger(notContained));
 	}
 
+	@Test
+	public void testFallbackKeys() {
+		Configuration cfg = new Configuration();
+		cfg.setInteger("the-key", 11);
+		cfg.setInteger("old-key", 12);
+		cfg.setInteger("older-key", 13);
+
+		ConfigOption<Integer> matchesFirst = ConfigOptions
+			.key("the-key")
+			.defaultValue(-1)
+			.withFallbackKeys("old-key", "older-key");
+
+		ConfigOption<Integer> matchesSecond = ConfigOptions
+			.key("does-not-exist")
+			.defaultValue(-1)
+			.withFallbackKeys("old-key", "older-key");
+
+		ConfigOption<Integer> matchesThird = ConfigOptions
+			.key("does-not-exist")
+			.defaultValue(-1)
+			.withFallbackKeys("foo", "older-key");
+
+		ConfigOption<Integer> notContained = ConfigOptions
+			.key("does-not-exist")
+			.defaultValue(-1)
+			.withFallbackKeys("not-there", "also-not-there");
+
+		assertEquals(11, cfg.getInteger(matchesFirst));
+		assertEquals(12, cfg.getInteger(matchesSecond));
+		assertEquals(13, cfg.getInteger(matchesThird));
+		assertEquals(-1, cfg.getInteger(notContained));
+	}
+
 	@Test
 	public void testRemove(){
 		Configuration cfg = new Configuration();


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services