You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2019/01/09 10:33:43 UTC
[flink] branch master updated: [FLINK-10436] Add ConfigOption#withFallbackKeys (#6872)
This is an automated email from the ASF dual-hosted git repository.
trohrmann pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 1e2aa8e [FLINK-10436] Add ConfigOption#withFallbackKeys (#6872)
1e2aa8e is described below
commit 1e2aa8e9f35e7a943a4ed56a47834ee50bab3b47
Author: ZILI CHEN <wa...@gmail.com>
AuthorDate: Wed Jan 9 18:33:36 2019 +0800
[FLINK-10436] Add ConfigOption#withFallbackKeys (#6872)
* [FLINK-10436] Add ConfigOption#withFallbackKeys
* [hotfix] correct javadoc
* [FLINK-10436] Code quality improvement
---
.../apache/flink/configuration/ConfigOption.java | 89 +++++++++++++---------
.../apache/flink/configuration/Configuration.java | 37 +++++----
.../configuration/DelegatingConfiguration.java | 12 +--
.../apache/flink/configuration/FallbackKey.java | 80 +++++++++++++++++++
.../apache/flink/configuration/RestOptions.java | 2 +-
.../flink/configuration/ConfigurationTest.java | 33 ++++++++
6 files changed, 198 insertions(+), 55 deletions(-)
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/ConfigOption.java b/flink-core/src/main/java/org/apache/flink/configuration/ConfigOption.java
index 9e6fc32..d5fb555 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/ConfigOption.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/ConfigOption.java
@@ -39,7 +39,7 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
@PublicEvolving
public class ConfigOption<T> {
- private static final String[] EMPTY = new String[0];
+ private static final FallbackKey[] EMPTY = new FallbackKey[0];
// ------------------------------------------------------------------------
@@ -47,7 +47,7 @@ public class ConfigOption<T> {
private final String key;
/** The list of deprecated keys, in the order to be checked. */
- private final String[] deprecatedKeys;
+ private final FallbackKey[] fallbackKeys;
/** The default value for this option. */
private final T defaultValue;
@@ -58,54 +58,72 @@ public class ConfigOption<T> {
// ------------------------------------------------------------------------
/**
- * Creates a new config option with no deprecated keys.
+ * Creates a new config option with no fallback keys.
*
- * @param key The current key for that config option
- * @param defaultValue The default value for this option
+ * @param key The current key for that config option
+ * @param defaultValue The default value for this option
*/
ConfigOption(String key, T defaultValue) {
this.key = checkNotNull(key);
this.description = Description.builder().text("").build();
this.defaultValue = defaultValue;
- this.deprecatedKeys = EMPTY;
+ this.fallbackKeys = EMPTY;
}
/**
- * Creates a new config option with deprecated keys.
+ * Creates a new config option with fallback keys.
*
- * @param key The current key for that config option
- * @param description Description for that option
- * @param defaultValue The default value for this option
- * @param deprecatedKeys The list of deprecated keys, in the order to be checked
+ * @param key The current key for that config option
+ * @param description Description for that option
+ * @param defaultValue The default value for this option
+ * @param fallbackKeys The list of fallback keys, in the order to be checked
* @deprecated use version with {@link Description} instead
*/
@Deprecated
- ConfigOption(String key, String description, T defaultValue, String... deprecatedKeys) {
+ ConfigOption(String key, String description, T defaultValue, FallbackKey... fallbackKeys) {
this.key = checkNotNull(key);
this.description = Description.builder().text(description).build();
this.defaultValue = defaultValue;
- this.deprecatedKeys = deprecatedKeys == null || deprecatedKeys.length == 0 ? EMPTY : deprecatedKeys;
+ this.fallbackKeys = fallbackKeys == null || fallbackKeys.length == 0 ? EMPTY : fallbackKeys;
}
/**
- * Creates a new config option with deprecated keys.
+ * Creates a new config option with fallback keys.
*
- * @param key The current key for that config option
- * @param description Description for that option
- * @param defaultValue The default value for this option
- * @param deprecatedKeys The list of deprecated keys, in the order to be checked
+ * @param key The current key for that config option
+ * @param description Description for that option
+ * @param defaultValue The default value for this option
+ * @param fallbackKeys The list of fallback keys, in the order to be checked
*/
- ConfigOption(String key, Description description, T defaultValue, String... deprecatedKeys) {
+ ConfigOption(String key, Description description, T defaultValue, FallbackKey... fallbackKeys) {
this.key = checkNotNull(key);
this.description = description;
this.defaultValue = defaultValue;
- this.deprecatedKeys = deprecatedKeys == null || deprecatedKeys.length == 0 ? EMPTY : deprecatedKeys;
+ this.fallbackKeys = fallbackKeys == null || fallbackKeys.length == 0 ? EMPTY : fallbackKeys;
}
// ------------------------------------------------------------------------
/**
* Creates a new config option, using this option's key and default value, and
+ * adding the given fallback keys.
+ *
+ * <p>When obtaining a value from the configuration via {@link Configuration#getValue(ConfigOption)},
+ * the fallback keys will be checked in the order provided to this method. The first key for which
+ * a value is found will be used - that value will be returned.
+ *
+ * @param fallbackKeys The fallback keys, in the order in which they should be checked.
+ * @return A new config option, with the given fallback keys.
+ */
+ public ConfigOption<T> withFallbackKeys(String... fallbackKeys) {
+     // Wrap each name as a non-deprecated fallback key, keeping the caller's order.
+     final FallbackKey[] wrappedKeys = new FallbackKey[fallbackKeys.length];
+     Arrays.setAll(wrappedKeys, i -> FallbackKey.createFallbackKey(fallbackKeys[i]));
+     return new ConfigOption<>(key, description, defaultValue, wrappedKeys);
+ }
+
+ /**
+ * Creates a new config option, using this option's key and default value, and
* adding the given deprecated keys.
*
* <p>When obtaining a value from the configuration via {@link Configuration#getValue(ConfigOption)},
@@ -116,7 +134,10 @@ public class ConfigOption<T> {
* @return A new config options, with the given deprecated keys.
*/
public ConfigOption<T> withDeprecatedKeys(String... deprecatedKeys) {
- return new ConfigOption<>(key, description, defaultValue, deprecatedKeys);
+ FallbackKey[] fallbackKeys = Arrays.stream(deprecatedKeys)
+ .map(FallbackKey::createDeprecatedKey)
+ .toArray(FallbackKey[]::new);
+ return new ConfigOption<>(key, description, defaultValue, fallbackKeys);
}
/**
@@ -138,7 +159,7 @@ public class ConfigOption<T> {
* @return A new config option, with given description.
*/
public ConfigOption<T> withDescription(final Description description) {
- return new ConfigOption<>(key, description, defaultValue, deprecatedKeys);
+ return new ConfigOption<>(key, description, defaultValue, fallbackKeys);
}
// ------------------------------------------------------------------------
@@ -168,19 +189,19 @@ public class ConfigOption<T> {
}
/**
- * Checks whether this option has deprecated keys.
- * @return True if the option has deprecated keys, false if not.
+ * Checks whether this option has fallback keys.
+ * @return True if the option has fallback keys, false if not.
*/
- public boolean hasDeprecatedKeys() {
- return deprecatedKeys != EMPTY;
+ public boolean hasFallbackKeys() {
+ return fallbackKeys != EMPTY;
}
/**
- * Gets the deprecated keys, in the order to be checked.
- * @return The option's deprecated keys.
+ * Gets the fallback keys, in the order to be checked.
+ * @return The option's fallback keys.
*/
- public Iterable<String> deprecatedKeys() {
- return deprecatedKeys == EMPTY ? Collections.<String>emptyList() : Arrays.asList(deprecatedKeys);
+ public Iterable<FallbackKey> fallbackKeys() {
+ return (fallbackKeys == EMPTY) ? Collections.emptyList() : Arrays.asList(fallbackKeys);
}
/**
@@ -201,7 +222,7 @@ public class ConfigOption<T> {
else if (o != null && o.getClass() == ConfigOption.class) {
ConfigOption<?> that = (ConfigOption<?>) o;
return this.key.equals(that.key) &&
- Arrays.equals(this.deprecatedKeys, that.deprecatedKeys) &&
+ Arrays.equals(this.fallbackKeys, that.fallbackKeys) &&
(this.defaultValue == null ? that.defaultValue == null :
(that.defaultValue != null && this.defaultValue.equals(that.defaultValue)));
}
@@ -213,13 +234,13 @@ public class ConfigOption<T> {
@Override
public int hashCode() {
return 31 * key.hashCode() +
- 17 * Arrays.hashCode(deprecatedKeys) +
+ 17 * Arrays.hashCode(fallbackKeys) +
(defaultValue != null ? defaultValue.hashCode() : 0);
}
@Override
public String toString() {
- return String.format("Key: '%s' , default: %s (deprecated keys: %s)",
- key, defaultValue, Arrays.toString(deprecatedKeys));
+ return String.format("Key: '%s' , default: %s (fallback keys: %s)",
+ key, defaultValue, Arrays.toString(fallbackKeys));
}
}
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/Configuration.java b/flink-core/src/main/java/org/apache/flink/configuration/Configuration.java
index 00c4c38..07c0290 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/Configuration.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/Configuration.java
@@ -701,12 +701,11 @@ public class Configuration extends ExecutionConfig.GlobalJobParameters
if (this.confData.containsKey(configOption.key())) {
return true;
}
- else if (configOption.hasDeprecatedKeys()) {
- // try the deprecated keys
- for (String deprecatedKey : configOption.deprecatedKeys()) {
- if (this.confData.containsKey(deprecatedKey)) {
- LOG.warn("Config uses deprecated configuration key '{}' instead of proper key '{}'",
- deprecatedKey, configOption.key());
+ else if (configOption.hasFallbackKeys()) {
+ // try the fallback keys
+ for (FallbackKey fallbackKey : configOption.fallbackKeys()) {
+ if (this.confData.containsKey(fallbackKey.getKey())) {
+ loggingFallback(fallbackKey, configOption);
return true;
}
}
@@ -741,11 +740,10 @@ public class Configuration extends ExecutionConfig.GlobalJobParameters
// try the current key
Object oldValue = this.confData.remove(configOption.key());
if (oldValue == null){
- for (String deprecatedKey : configOption.deprecatedKeys()){
- oldValue = this.confData.remove(deprecatedKey);
+ for (FallbackKey fallbackKey : configOption.fallbackKeys()){
+ oldValue = this.confData.remove(fallbackKey.getKey());
if (oldValue != null){
- LOG.warn("Config uses deprecated configuration key '{}' instead of proper key '{}'",
- deprecatedKey, configOption.key());
+ loggingFallback(fallbackKey, configOption);
return true;
}
}
@@ -789,13 +787,12 @@ public class Configuration extends ExecutionConfig.GlobalJobParameters
// found a value for the current proper key
return o;
}
- else if (configOption.hasDeprecatedKeys()) {
+ else if (configOption.hasFallbackKeys()) {
// try the deprecated keys
- for (String deprecatedKey : configOption.deprecatedKeys()) {
- Object oo = getRawValue(deprecatedKey);
+ for (FallbackKey fallbackKey : configOption.fallbackKeys()) {
+ Object oo = getRawValue(fallbackKey.getKey());
if (oo != null) {
- LOG.warn("Config uses deprecated configuration key '{}' instead of proper key '{}'",
- deprecatedKey, configOption.key());
+ loggingFallback(fallbackKey, configOption);
return oo;
}
}
@@ -809,6 +806,16 @@ public class Configuration extends ExecutionConfig.GlobalJobParameters
return o != null ? o : configOption.defaultValue();
}
+ private void loggingFallback(FallbackKey fallbackKey, ConfigOption<?> configOption) {
+     // Deprecated keys are reported at WARN level; plain fallback keys only at INFO level.
+     final String usedKey = fallbackKey.getKey();
+     if (!fallbackKey.isDeprecated()) {
+         LOG.info("Config uses fallback configuration key '{}' instead of key '{}'", usedKey, configOption.key());
+         return;
+     }
+     LOG.warn("Config uses deprecated configuration key '{}' instead of proper key '{}'", usedKey, configOption.key());
+ }
+
// --------------------------------------------------------------------------------------------
// Type conversion
// --------------------------------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/DelegatingConfiguration.java b/flink-core/src/main/java/org/apache/flink/configuration/DelegatingConfiguration.java
index 1a637f6..0a0a777 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/DelegatingConfiguration.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/DelegatingConfiguration.java
@@ -32,6 +32,8 @@ import java.util.Map;
import java.util.Properties;
import java.util.Set;
+import static org.apache.flink.configuration.FallbackKey.createDeprecatedKey;
+
/**
* A configuration that manages a subset of keys with a common prefix from a given configuration.
*/
@@ -361,17 +363,17 @@ public final class DelegatingConfiguration extends Configuration {
private static <T> ConfigOption<T> prefixOption(ConfigOption<T> option, String prefix) {
String key = prefix + option.key();
- List<String> deprecatedKeys;
- if (option.hasDeprecatedKeys()) {
+ List<FallbackKey> deprecatedKeys;
+ if (option.hasFallbackKeys()) {
deprecatedKeys = new ArrayList<>();
- for (String dk : option.deprecatedKeys()) {
- deprecatedKeys.add(prefix + dk);
+ for (FallbackKey dk : option.fallbackKeys()) {
+ deprecatedKeys.add(createDeprecatedKey(prefix + dk.getKey()));
}
} else {
deprecatedKeys = Collections.emptyList();
}
- String[] deprecated = deprecatedKeys.toArray(new String[deprecatedKeys.size()]);
+ FallbackKey[] deprecated = deprecatedKeys.toArray(new FallbackKey[0]);
return new ConfigOption<>(key,
option.description(),
option.defaultValue(),
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/FallbackKey.java b/flink-core/src/main/java/org/apache/flink/configuration/FallbackKey.java
new file mode 100644
index 0000000..6c67727
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/configuration/FallbackKey.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.configuration;
+
+/**
+ * An alternative key that is consulted when an option's own key is not configured.
+ */
+public final class FallbackKey {
+    /** Creates a plain (non-deprecated) fallback key for the given key string. */
+    static FallbackKey createFallbackKey(String key) {
+        return new FallbackKey(key, false);
+    }
+
+    /** Creates a fallback key that is flagged as deprecated for the given key string. */
+    static FallbackKey createDeprecatedKey(String key) {
+        return new FallbackKey(key, true);
+    }
+
+    // ------------------------------------------------------------------------
+
+    /** The configuration key string; never null. */
+    private final String key;
+
+    /** Whether this key was registered as deprecated rather than as a plain fallback. */
+    private final boolean isDeprecated;
+
+    /** Rejects a null key up front, because equals() and hashCode() dereference it. */
+    private FallbackKey(String key, boolean isDeprecated) {
+        this.key = java.util.Objects.requireNonNull(key, "key");
+        this.isDeprecated = isDeprecated;
+    }
+
+    /** Returns the configuration key string. */
+    public String getKey() {
+        return key;
+    }
+
+    /** Returns whether this key is deprecated. */
+    public boolean isDeprecated() {
+        return isDeprecated;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || o.getClass() != FallbackKey.class) {
+            return false;
+        }
+        FallbackKey that = (FallbackKey) o;
+        return this.isDeprecated == that.isDeprecated && this.key.equals(that.key);
+    }
+
+    @Override
+    public int hashCode() {
+        return 31 * key.hashCode() + (isDeprecated ? 1 : 0);
+    }
+
+    @Override
+    public String toString() {
+        return String.format("{key=%s, isDeprecated=%s}", key, isDeprecated);
+    }
+}
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/RestOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/RestOptions.java
index 5c1b6d6..d4f5a74 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/RestOptions.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/RestOptions.java
@@ -43,7 +43,7 @@ public class RestOptions {
public static final ConfigOption<String> ADDRESS =
key("rest.address")
.noDefaultValue()
- .withDeprecatedKeys(JobManagerOptions.ADDRESS.key())
+ .withFallbackKeys(JobManagerOptions.ADDRESS.key())
.withDescription("The address that should be used by clients to connect to the server.");
/**
diff --git a/flink-core/src/test/java/org/apache/flink/configuration/ConfigurationTest.java b/flink-core/src/test/java/org/apache/flink/configuration/ConfigurationTest.java
index 3b98a44..d0d382f 100644
--- a/flink-core/src/test/java/org/apache/flink/configuration/ConfigurationTest.java
+++ b/flink-core/src/test/java/org/apache/flink/configuration/ConfigurationTest.java
@@ -306,6 +306,39 @@ public class ConfigurationTest extends TestLogger {
}
@Test
+ public void testFallbackKeys() {
+     // Three keys with distinct values, so each assertion reveals which key was read.
+     final Configuration config = new Configuration();
+     config.setInteger("the-key", 11);
+     config.setInteger("old-key", 12);
+     config.setInteger("older-key", 13);
+
+     // The option's own key is set, so no fallback key is consulted.
+     final ConfigOption<Integer> ownKeyPresent = ConfigOptions.key("the-key")
+         .defaultValue(-1)
+         .withFallbackKeys("old-key", "older-key");
+     assertEquals(11, config.getInteger(ownKeyPresent));
+
+     // The own key is missing; the first fallback key carries a value.
+     final ConfigOption<Integer> firstFallbackPresent = ConfigOptions.key("does-not-exist")
+         .defaultValue(-1)
+         .withFallbackKeys("old-key", "older-key");
+     assertEquals(12, config.getInteger(firstFallbackPresent));
+
+     // Only the second fallback key carries a value.
+     final ConfigOption<Integer> secondFallbackPresent = ConfigOptions.key("does-not-exist")
+         .defaultValue(-1)
+         .withFallbackKeys("foo", "older-key");
+     assertEquals(13, config.getInteger(secondFallbackPresent));
+
+     // Neither the own key nor any fallback key is set: the default value is returned.
+     final ConfigOption<Integer> nothingPresent = ConfigOptions.key("does-not-exist")
+         .defaultValue(-1)
+         .withFallbackKeys("not-there", "also-not-there");
+     assertEquals(-1, config.getInteger(nothingPresent));
+ }
+
+ @Test
public void testRemove(){
Configuration cfg = new Configuration();
cfg.setInteger("a", 1);