You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2016/10/13 15:40:17 UTC
[7/8] flink git commit: [FLINK-4764] [core] Introduce Config Options
[FLINK-4764] [core] Introduce Config Options
This is a more concise and maintainable way to define configuration keys, default values,
deprecated keys, etc.
This closes #2605
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/d71a09cc
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/d71a09cc
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/d71a09cc
Branch: refs/heads/flip-6
Commit: d71a09cc2a36a877e8287db8d9fe84134a4901ba
Parents: 05436f4
Author: Stephan Ewen <se...@apache.org>
Authored: Fri Oct 7 15:24:44 2016 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Thu Oct 13 16:25:49 2016 +0200
----------------------------------------------------------------------
.../flink/configuration/ConfigOption.java | 171 ++++++++
.../flink/configuration/ConfigOptions.java | 116 ++++++
.../flink/configuration/Configuration.java | 407 +++++++++++++++----
.../configuration/DelegatingConfiguration.java | 118 +++++-
.../flink/configuration/ConfigurationTest.java | 95 ++++-
.../DelegatingConfigurationTest.java | 55 +--
.../UnmodifiableConfigurationTest.java | 16 +-
7 files changed, 844 insertions(+), 134 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/d71a09cc/flink-core/src/main/java/org/apache/flink/configuration/ConfigOption.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/ConfigOption.java b/flink-core/src/main/java/org/apache/flink/configuration/ConfigOption.java
new file mode 100644
index 0000000..3531f6d
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/configuration/ConfigOption.java
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.configuration;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+import java.util.Arrays;
+import java.util.Collections;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A {@code ConfigOption} describes a configuration parameter. It encapsulates
+ * the configuration key, deprecated older versions of the key, and an optional
+ * default value for the configuration parameter.
+ *
+ * <p>{@code ConfigOptions} are built via the {@link ConfigOptions} class.
+ * Once created, a config option is immutable.
+ *
+ * @param <T> The type of value associated with the configuration option.
+ */
+@PublicEvolving
+public class ConfigOption<T> {
+
+ private static final String[] EMPTY = new String[0];
+
+ // ------------------------------------------------------------------------
+
+ /** The current key for that config option */
+ private final String key;
+
+ /** The list of deprecated keys, in the order to be checked */
+ private final String[] deprecatedKeys;
+
+ /** The default value for this option */
+ private final T defaultValue;
+
+ // ------------------------------------------------------------------------
+
+ /**
+ * Creates a new config option with no deprecated keys.
+ *
+ * @param key The current key for that config option
+ * @param defaultValue The default value for this option
+ */
+ ConfigOption(String key, T defaultValue) {
+ this.key = checkNotNull(key);
+ this.defaultValue = defaultValue;
+ this.deprecatedKeys = EMPTY;
+ }
+
+ /**
+ * Creates a new config option with deprecated keys.
+ *
+ * <p>The given array is copied, so that later modifications of the caller's
+ * array cannot change this option after it has been created.
+ *
+ * @param key The current key for that config option
+ * @param defaultValue The default value for this option
+ * @param deprecatedKeys The list of deprecated keys, in the order to be checked.
+ * A {@code null} or empty array means that there are no deprecated keys.
+ */
+ ConfigOption(String key, T defaultValue, String... deprecatedKeys) {
+ this.key = checkNotNull(key);
+ this.defaultValue = defaultValue;
+ // "no keys" is normalized to the EMPTY constant; otherwise keep a private copy of the array
+ this.deprecatedKeys = deprecatedKeys == null || deprecatedKeys.length == 0 ? EMPTY : deprecatedKeys.clone();
+ }
+
+ // ------------------------------------------------------------------------
+
+ /**
+ * Creates a new config option, using this option's key and default value, and
+ * adding the given deprecated keys. This option itself is not modified.
+ *
+ * <p>When obtaining a value from the configuration via {@link Configuration#getValue(ConfigOption)},
+ * the deprecated keys will be checked in the order provided to this method. The first key for which
+ * a value is found will be used - that value will be returned.
+ *
+ * @param deprecatedKeys The deprecated keys, in the order in which they should be checked.
+ * @return A new config option, with the given deprecated keys.
+ */
+ public ConfigOption<T> withDeprecatedKeys(String... deprecatedKeys) {
+ return new ConfigOption<>(key, defaultValue, deprecatedKeys);
+ }
+
+ // ------------------------------------------------------------------------
+
+ /**
+ * Gets the configuration key.
+ * @return The configuration key
+ */
+ public String key() {
+ return key;
+ }
+
+ /**
+ * Checks if this option has a default value.
+ * @return True if it has a default value, false if not.
+ */
+ public boolean hasDefaultValue() {
+ return defaultValue != null;
+ }
+
+ /**
+ * Returns the default value, or null, if there is no default value.
+ * @return The default value, or null.
+ */
+ public T defaultValue() {
+ return defaultValue;
+ }
+
+ /**
+ * Checks whether this option has deprecated keys.
+ * @return True if the option has deprecated keys, false if not.
+ */
+ public boolean hasDeprecatedKeys() {
+ return deprecatedKeys != EMPTY;
+ }
+
+ /**
+ * Gets the deprecated keys, in the order to be checked.
+ * @return The option's deprecated keys.
+ */
+ public Iterable<String> deprecatedKeys() {
+ return deprecatedKeys == EMPTY ? Collections.<String>emptyList() : Arrays.asList(deprecatedKeys);
+ }
+
+ // ------------------------------------------------------------------------
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ else if (o != null && o.getClass() == ConfigOption.class) {
+ ConfigOption<?> that = (ConfigOption<?>) o;
+ return this.key.equals(that.key) &&
+ Arrays.equals(this.deprecatedKeys, that.deprecatedKeys) &&
+ (this.defaultValue == null ? that.defaultValue == null :
+ (that.defaultValue != null && this.defaultValue.equals(that.defaultValue)));
+ }
+ else {
+ return false;
+ }
+ }
+
+ @Override
+ public int hashCode() {
+ return 31 * key.hashCode() +
+ 17 * Arrays.hashCode(deprecatedKeys) +
+ (defaultValue != null ? defaultValue.hashCode() : 0);
+ }
+
+ @Override
+ public String toString() {
+ return String.format("Key: '%s' , default: %s (deprecated keys: %s)",
+ key, defaultValue, Arrays.toString(deprecatedKeys));
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/flink/blob/d71a09cc/flink-core/src/main/java/org/apache/flink/configuration/ConfigOptions.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/ConfigOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/ConfigOptions.java
new file mode 100644
index 0000000..f87da0a
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/configuration/ConfigOptions.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.configuration;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * {@code ConfigOptions} are used to build a {@link ConfigOption}.
+ * The option is typically built in one of the following patterns:
+ *
+ * <pre>{@code
+ * // simple string-valued option with a default value
+ * ConfigOption<String> tempDirs = ConfigOptions
+ * .key("tmp.dir")
+ * .defaultValue("/tmp");
+ *
+ * // simple integer-valued option with a default value
+ * ConfigOption<Integer> parallelism = ConfigOptions
+ * .key("application.parallelism")
+ * .defaultValue(100);
+ *
+ * // option with no default value
+ * ConfigOption<String> userName = ConfigOptions
+ * .key("user.name")
+ * .noDefaultValue();
+ *
+ * // option with deprecated keys to check
+ * ConfigOption<Double> threshold = ConfigOptions
+ * .key("cpu.utilization.threshold")
+ * .defaultValue(0.9)
+ * .withDeprecatedKeys("cpu.threshold");
+ * }</pre>
+ */
+@PublicEvolving
+public class ConfigOptions {
+
+ /**
+ * Starts building a new {@link ConfigOption}.
+ *
+ * @param key The key for the config option.
+ * @return The builder for the config option with the given key.
+ */
+ public static OptionBuilder key(String key) {
+ checkNotNull(key);
+ return new OptionBuilder(key);
+ }
+
+ // ------------------------------------------------------------------------
+
+ /**
+ * The option builder is used to create a {@link ConfigOption}.
+ * It is instantiated via {@link ConfigOptions#key(String)}.
+ */
+ public static final class OptionBuilder {
+
+ /** The key for the config option */
+ private final String key;
+
+ /**
+ * Creates a new OptionBuilder.
+ * @param key The key for the config option
+ */
+ OptionBuilder(String key) {
+ this.key = key;
+ }
+
+ /**
+ * Creates a ConfigOption with the given default value.
+ *
+ * <p>This method does not accept "null". For options with no default value, choose
+ * one of the {@code noDefaultValue} methods.
+ *
+ * @param value The default value for the config option
+ * @param <T> The type of the default value.
+ * @return The config option with the default value.
+ */
+ public <T> ConfigOption<T> defaultValue(T value) {
+ checkNotNull(value);
+ return new ConfigOption<T>(key, value);
+ }
+
+ /**
+ * Creates a string-valued option with no default value.
+ * String-valued options are the only ones that can have no
+ * default value.
+ *
+ * @return The created ConfigOption.
+ */
+ public ConfigOption<String> noDefaultValue() {
+ return new ConfigOption<>(key, null);
+ }
+ }
+
+ // ------------------------------------------------------------------------
+
+ /** Not intended to be instantiated */
+ private ConfigOptions() {}
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/d71a09cc/flink-core/src/main/java/org/apache/flink/configuration/Configuration.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/Configuration.java b/flink-core/src/main/java/org/apache/flink/configuration/Configuration.java
index 8ca5d07..f15c669 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/Configuration.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/Configuration.java
@@ -27,6 +27,7 @@ import java.util.Properties;
import java.util.Set;
import org.apache.flink.annotation.Public;
+import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.core.io.IOReadableWritable;
import org.apache.flink.core.memory.DataInputView;
@@ -134,7 +135,33 @@ public class Configuration extends ExecutionConfig.GlobalJobParameters
return o.toString();
}
}
-
+
+ /**
+ * Returns the value associated with the given config option as a string.
+ *
+ * @param configOption The configuration option
+ * @return the (default) value associated with the given config option
+ */
+ @PublicEvolving
+ public String getString(ConfigOption<String> configOption) {
+ Object o = getValueOrDefaultFromOption(configOption);
+ return o == null ? null : o.toString();
+ }
+
+ /**
+ * Returns the value associated with the given config option as a string.
+ * If no value is mapped under any key of the option, it returns the specified
+ * default instead of the option's default value.
+ *
+ * @param configOption The configuration option
+ * @param overrideDefault The value to return if no value is mapped under any key of the option
+ * @return the value associated with the given config option, or {@code overrideDefault}
+ * if no value is mapped
+ */
+ @PublicEvolving
+ public String getString(ConfigOption<String> configOption, String overrideDefault) {
+ Object o = getRawValueFromOption(configOption);
+ return o == null ? overrideDefault : o.toString();
+ }
+
/**
* Adds the given key/value pair to the configuration object.
*
@@ -148,6 +175,20 @@ public class Configuration extends ExecutionConfig.GlobalJobParameters
}
/**
+ * Adds the given value to the configuration object.
+ * The main key of the config option will be used to map the value.
+ *
+ * @param key
+ * the option specifying the key to be added
+ * @param value
+ * the value of the key/value pair to be added
+ */
+ @PublicEvolving
+ public void setString(ConfigOption<String> key, String value) {
+ setValueInternal(key.key(), value);
+ }
+
+ /**
* Returns the value associated with the given key as an integer.
*
* @param key
@@ -161,28 +202,20 @@ public class Configuration extends ExecutionConfig.GlobalJobParameters
if (o == null) {
return defaultValue;
}
-
- if (o.getClass() == Integer.class) {
- return (Integer) o;
- }
- else if (o.getClass() == Long.class) {
- long value = (Long) o;
- if (value <= Integer.MAX_VALUE && value >= Integer.MIN_VALUE) {
- return (int) value;
- } else {
- LOG.warn("Configuration value {} overflows/underflows the integer type.", value);
- return defaultValue;
- }
- }
- else {
- try {
- return Integer.parseInt(o.toString());
- }
- catch (NumberFormatException e) {
- LOG.warn("Configuration cannot evaluate value {} as an integer number", o);
- return defaultValue;
- }
- }
+
+ return convertToInt(o, defaultValue);
+ }
+
+ /**
+ * Returns the value associated with the given config option as an integer.
+ *
+ * @param configOption The configuration option
+ * @return the (default) value associated with the given config option
+ */
+ @PublicEvolving
+ public int getInteger(ConfigOption<Integer> configOption) {
+ Object o = getValueOrDefaultFromOption(configOption);
+ return convertToInt(o, configOption.defaultValue());
}
/**
@@ -198,6 +231,20 @@ public class Configuration extends ExecutionConfig.GlobalJobParameters
}
/**
+ * Adds the given value to the configuration object.
+ * The main key of the config option will be used to map the value.
+ *
+ * @param key
+ * the option specifying the key to be added
+ * @param value
+ * the value of the key/value pair to be added
+ */
+ @PublicEvolving
+ public void setInteger(ConfigOption<Integer> key, int value) {
+ setValueInternal(key.key(), value);
+ }
+
+ /**
* Returns the value associated with the given key as a long.
*
* @param key
@@ -211,22 +258,20 @@ public class Configuration extends ExecutionConfig.GlobalJobParameters
if (o == null) {
return defaultValue;
}
-
- if (o.getClass() == Long.class) {
- return (Long) o;
- }
- else if (o.getClass() == Integer.class) {
- return ((Integer) o).longValue();
- }
- else {
- try {
- return Long.parseLong(o.toString());
- }
- catch (NumberFormatException e) {
- LOG.warn("Configuration cannot evaluate value " + o + " as a long integer number");
- return defaultValue;
- }
- }
+
+ return convertToLong(o, defaultValue);
+ }
+
+ /**
+ * Returns the value associated with the given config option as a long integer.
+ *
+ * @param configOption The configuration option
+ * @return the (default) value associated with the given config option
+ */
+ @PublicEvolving
+ public long getLong(ConfigOption<Long> configOption) {
+ Object o = getValueOrDefaultFromOption(configOption);
+ return convertToLong(o, configOption.defaultValue());
}
/**
@@ -242,6 +287,20 @@ public class Configuration extends ExecutionConfig.GlobalJobParameters
}
/**
+ * Adds the given value to the configuration object.
+ * The main key of the config option will be used to map the value.
+ *
+ * @param key
+ * the option specifying the key to be added
+ * @param value
+ * the value of the key/value pair to be added
+ */
+ @PublicEvolving
+ public void setLong(ConfigOption<Long> key, long value) {
+ setValueInternal(key.key(), value);
+ }
+
+ /**
* Returns the value associated with the given key as a boolean.
*
* @param key
@@ -255,13 +314,20 @@ public class Configuration extends ExecutionConfig.GlobalJobParameters
if (o == null) {
return defaultValue;
}
-
- if (o.getClass() == Boolean.class) {
- return (Boolean) o;
- }
- else {
- return Boolean.parseBoolean(o.toString());
- }
+
+ return convertToBoolean(o);
+ }
+
+ /**
+ * Returns the value associated with the given config option as a boolean.
+ *
+ * @param configOption The configuration option
+ * @return the (default) value associated with the given config option
+ */
+ @PublicEvolving
+ public boolean getBoolean(ConfigOption<Boolean> configOption) {
+ Object o = getValueOrDefaultFromOption(configOption);
+ return convertToBoolean(o);
}
/**
@@ -277,6 +343,20 @@ public class Configuration extends ExecutionConfig.GlobalJobParameters
}
/**
+ * Adds the given value to the configuration object.
+ * The main key of the config option will be used to map the value.
+ *
+ * @param key
+ * the option specifying the key to be added
+ * @param value
+ * the value of the key/value pair to be added
+ */
+ @PublicEvolving
+ public void setBoolean(ConfigOption<Boolean> key, boolean value) {
+ setValueInternal(key.key(), value);
+ }
+
+ /**
* Returns the value associated with the given key as a float.
*
* @param key
@@ -290,28 +370,20 @@ public class Configuration extends ExecutionConfig.GlobalJobParameters
if (o == null) {
return defaultValue;
}
-
- if (o.getClass() == Float.class) {
- return (Float) o;
- }
- else if (o.getClass() == Double.class) {
- double value = ((Double) o);
- if (value <= Float.MAX_VALUE && value >= Float.MIN_VALUE) {
- return (float) value;
- } else {
- LOG.warn("Configuration value {} overflows/underflows the float type.", value);
- return defaultValue;
- }
- }
- else {
- try {
- return Float.parseFloat(o.toString());
- }
- catch (NumberFormatException e) {
- LOG.warn("Configuration cannot evaluate value {} as a float value", o);
- return defaultValue;
- }
- }
+
+ return convertToFloat(o, defaultValue);
+ }
+
+ /**
+ * Returns the value associated with the given config option as a float.
+ *
+ * @param configOption The configuration option
+ * @return the (default) value associated with the given config option
+ */
+ @PublicEvolving
+ public float getFloat(ConfigOption<Float> configOption) {
+ Object o = getValueOrDefaultFromOption(configOption);
+ return convertToFloat(o, configOption.defaultValue());
}
/**
@@ -325,7 +397,21 @@ public class Configuration extends ExecutionConfig.GlobalJobParameters
public void setFloat(String key, float value) {
setValueInternal(key, value);
}
-
+
+ /**
+ * Adds the given value to the configuration object.
+ * The main key of the config option will be used to map the value.
+ *
+ * @param key
+ * the option specifying the key to be added
+ * @param value
+ * the value of the key/value pair to be added
+ */
+ @PublicEvolving
+ public void setFloat(ConfigOption<Float> key, float value) {
+ setValueInternal(key.key(), value);
+ }
+
/**
* Returns the value associated with the given key as a double.
*
@@ -340,22 +426,20 @@ public class Configuration extends ExecutionConfig.GlobalJobParameters
if (o == null) {
return defaultValue;
}
-
- if (o.getClass() == Double.class) {
- return (Double) o;
- }
- else if (o.getClass() == Float.class) {
- return ((Float) o).doubleValue();
- }
- else {
- try {
- return Double.parseDouble(o.toString());
- }
- catch (NumberFormatException e) {
- LOG.warn("Configuration cannot evaluate value {} as a double value", o);
- return defaultValue;
- }
- }
+
+ return convertToDouble(o, defaultValue);
+ }
+
+ /**
+ * Returns the value associated with the given config option as a {@code double}.
+ *
+ * @param configOption The configuration option
+ * @return the (default) value associated with the given config option
+ */
+ @PublicEvolving
+ public double getDouble(ConfigOption<Double> configOption) {
+ Object o = getValueOrDefaultFromOption(configOption);
+ return convertToDouble(o, configOption.defaultValue());
}
/**
@@ -369,7 +453,21 @@ public class Configuration extends ExecutionConfig.GlobalJobParameters
public void setDouble(String key, double value) {
setValueInternal(key, value);
}
-
+
+ /**
+ * Adds the given value to the configuration object.
+ * The main key of the config option will be used to map the value.
+ *
+ * @param key
+ * the option specifying the key to be added
+ * @param value
+ * the value of the key/value pair to be added
+ */
+ @PublicEvolving
+ public void setDouble(ConfigOption<Double> key, double value) {
+ setValueInternal(key.key(), value);
+ }
+
/**
* Returns the value associated with the given key as a byte array.
*
@@ -407,6 +505,18 @@ public class Configuration extends ExecutionConfig.GlobalJobParameters
setValueInternal(key, bytes);
}
+ /**
+ * Returns the value associated with the given config option as a string.
+ *
+ * @param configOption The configuration option
+ * @return the (default) value associated with the given config option
+ */
+ @PublicEvolving
+ public String getValue(ConfigOption<?> configOption) {
+ Object o = getValueOrDefaultFromOption(configOption);
+ return o == null ? null : o.toString();
+ }
+
// --------------------------------------------------------------------------------------------
/**
@@ -523,7 +633,130 @@ public class Configuration extends ExecutionConfig.GlobalJobParameters
return this.confData.get(key);
}
}
-
+
+ private Object getRawValueFromOption(ConfigOption<?> configOption) {
+ // first try the current key
+ Object o = getRawValue(configOption.key());
+
+ if (o != null) {
+ return o;
+ }
+ else if (configOption.hasDeprecatedKeys()) {
+ for (String deprecatedKey : configOption.deprecatedKeys()) {
+ Object oo = getRawValue(deprecatedKey);
+ if (oo != null) {
+ return oo;
+ }
+ }
+ }
+
+ return null;
+ }
+
+ private Object getValueOrDefaultFromOption(ConfigOption<?> configOption) {
+ Object o = getRawValueFromOption(configOption);
+ return o != null ? o : configOption.defaultValue();
+ }
+
+ // --------------------------------------------------------------------------------------------
+ // Type conversion
+ // --------------------------------------------------------------------------------------------
+
+ private int convertToInt(Object o, int defaultValue) {
+ if (o.getClass() == Integer.class) {
+ return (Integer) o;
+ }
+ else if (o.getClass() == Long.class) {
+ long value = (Long) o;
+ if (value <= Integer.MAX_VALUE && value >= Integer.MIN_VALUE) {
+ return (int) value;
+ } else {
+ LOG.warn("Configuration value {} overflows/underflows the integer type.", value);
+ return defaultValue;
+ }
+ }
+ else {
+ try {
+ return Integer.parseInt(o.toString());
+ }
+ catch (NumberFormatException e) {
+ LOG.warn("Configuration cannot evaluate value {} as an integer number", o);
+ return defaultValue;
+ }
+ }
+ }
+
+ /**
+ * Converts a raw (non-null) configuration value to a {@code long}.
+ *
+ * <p>{@code Long} and {@code Integer} values are returned directly (widened where needed).
+ * Any other value is parsed from its string representation.
+ *
+ * @param o The raw value, must not be null
+ * @param defaultValue The value to return if the raw value cannot be parsed as a long
+ * @return The converted value, or the default value if the conversion failed
+ */
+ private long convertToLong(Object o, long defaultValue) {
+ if (o.getClass() == Long.class) {
+ return (Long) o;
+ }
+ else if (o.getClass() == Integer.class) {
+ return ((Integer) o).longValue();
+ }
+ else {
+ try {
+ return Long.parseLong(o.toString());
+ }
+ catch (NumberFormatException e) {
+ // parameterized message, consistent with the other converters
+ LOG.warn("Configuration cannot evaluate value {} as a long integer number", o);
+ return defaultValue;
+ }
+ }
+ }
+
+ private boolean convertToBoolean(Object o) {
+ if (o.getClass() == Boolean.class) {
+ return (Boolean) o;
+ }
+ else {
+ return Boolean.parseBoolean(o.toString());
+ }
+ }
+
+ /**
+ * Converts a raw (non-null) configuration value to a {@code float}.
+ *
+ * <p>{@code Float} values are returned directly. {@code Double} values are narrowed
+ * if they are zero or their magnitude lies within the range of the float type.
+ * Any other value is parsed from its string representation.
+ *
+ * @param o The raw value, must not be null
+ * @param defaultValue The value to return if the raw value cannot be represented as a float
+ * @return The converted value, or the default value if the conversion failed
+ */
+ private float convertToFloat(Object o, float defaultValue) {
+ if (o.getClass() == Float.class) {
+ return (Float) o;
+ }
+ else if (o.getClass() == Double.class) {
+ double value = ((Double) o);
+ // Float.MIN_VALUE is the smallest POSITIVE float, not the most negative one.
+ // Zero and negative values therefore need their own checks.
+ if (value == 0.0
+ || (value >= Float.MIN_VALUE && value <= Float.MAX_VALUE)
+ || (value >= -Float.MAX_VALUE && value <= -Float.MIN_VALUE)) {
+ return (float) value;
+ } else {
+ LOG.warn("Configuration value {} overflows/underflows the float type.", value);
+ return defaultValue;
+ }
+ }
+ else {
+ try {
+ return Float.parseFloat(o.toString());
+ }
+ catch (NumberFormatException e) {
+ LOG.warn("Configuration cannot evaluate value {} as a float value", o);
+ return defaultValue;
+ }
+ }
+ }
+
+ private double convertToDouble(Object o, double defaultValue) {
+ if (o.getClass() == Double.class) {
+ return (Double) o;
+ }
+ else if (o.getClass() == Float.class) {
+ return ((Float) o).doubleValue();
+ }
+ else {
+ try {
+ return Double.parseDouble(o.toString());
+ }
+ catch (NumberFormatException e) {
+ LOG.warn("Configuration cannot evaluate value {} as a double value", o);
+ return defaultValue;
+ }
+ }
+ }
+
+ // --------------------------------------------------------------------------------------------
+ // Serialization
// --------------------------------------------------------------------------------------------
@Override
http://git-wip-us.apache.org/repos/asf/flink/blob/d71a09cc/flink-core/src/main/java/org/apache/flink/configuration/DelegatingConfiguration.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/DelegatingConfiguration.java b/flink-core/src/main/java/org/apache/flink/configuration/DelegatingConfiguration.java
index dba77f3..bd9a962 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/DelegatingConfiguration.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/DelegatingConfiguration.java
@@ -15,6 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.flink.configuration;
import org.apache.flink.core.memory.DataInputView;
@@ -22,7 +23,11 @@ import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.util.Preconditions;
import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
import java.util.HashSet;
+import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
@@ -55,8 +60,7 @@ public final class DelegatingConfiguration extends Configuration {
* @param backingConfig The configuration holding the actual config data.
* @param prefix The prefix prepended to all config keys.
*/
- public DelegatingConfiguration(Configuration backingConfig, String prefix)
- {
+ public DelegatingConfiguration(Configuration backingConfig, String prefix) {
this.backingConfig = Preconditions.checkNotNull(backingConfig);
this.prefix = prefix;
}
@@ -69,11 +73,26 @@ public final class DelegatingConfiguration extends Configuration {
}
@Override
+ public String getString(ConfigOption<String> configOption) {
+ return this.backingConfig.getString(prefixOption(configOption, prefix));
+ }
+
+ @Override
+ public String getString(ConfigOption<String> configOption, String overrideDefault) {
+ return this.backingConfig.getString(prefixOption(configOption, prefix), overrideDefault);
+ }
+
+ @Override
public void setString(String key, String value) {
this.backingConfig.setString(this.prefix + key, value);
}
@Override
+ public void setString(ConfigOption<String> key, String value) {
+ this.backingConfig.setString(prefix + key.key(), value);
+ }
+
+ @Override
public <T> Class<T> getClass(String key, Class<? extends T> defaultValue, ClassLoader classLoader) throws ClassNotFoundException {
return this.backingConfig.getClass(this.prefix + key, defaultValue, classLoader);
}
@@ -89,51 +108,101 @@ public final class DelegatingConfiguration extends Configuration {
}
@Override
+ public int getInteger(ConfigOption<Integer> configOption) {
+ return this.backingConfig.getInteger(prefixOption(configOption, prefix));
+ }
+
+ @Override
public void setInteger(String key, int value) {
this.backingConfig.setInteger(this.prefix + key, value);
}
@Override
+ public void setInteger(ConfigOption<Integer> key, int value) {
+ this.backingConfig.setInteger(prefix + key.key(), value);
+ }
+
+ @Override
public long getLong(String key, long defaultValue) {
return this.backingConfig.getLong(this.prefix + key, defaultValue);
}
@Override
+ public long getLong(ConfigOption<Long> configOption) {
+ return this.backingConfig.getLong(prefixOption(configOption, prefix));
+ }
+
+ @Override
public void setLong(String key, long value) {
this.backingConfig.setLong(this.prefix + key, value);
}
@Override
+ public void setLong(ConfigOption<Long> key, long value) {
+ this.backingConfig.setLong(prefix + key.key(), value);
+ }
+
+ @Override
public boolean getBoolean(String key, boolean defaultValue) {
return this.backingConfig.getBoolean(this.prefix + key, defaultValue);
}
@Override
+ public boolean getBoolean(ConfigOption<Boolean> configOption) {
+ return this.backingConfig.getBoolean(prefixOption(configOption, prefix));
+ }
+
+ @Override
public void setBoolean(String key, boolean value) {
this.backingConfig.setBoolean(this.prefix + key, value);
}
@Override
+ public void setBoolean(ConfigOption<Boolean> key, boolean value) {
+ this.backingConfig.setBoolean(prefix + key.key(), value);
+ }
+
+ @Override
public float getFloat(String key, float defaultValue) {
return this.backingConfig.getFloat(this.prefix + key, defaultValue);
}
@Override
+ public float getFloat(ConfigOption<Float> configOption) {
+ return this.backingConfig.getFloat(prefixOption(configOption, prefix));
+ }
+
+ @Override
public void setFloat(String key, float value) {
this.backingConfig.setFloat(this.prefix + key, value);
}
@Override
+ public void setFloat(ConfigOption<Float> key, float value) {
+ this.backingConfig.setFloat(prefix + key.key(), value);
+ }
+
+ @Override
public double getDouble(String key, double defaultValue) {
return this.backingConfig.getDouble(this.prefix + key, defaultValue);
}
@Override
+ public double getDouble(ConfigOption<Double> configOption) {
+ return this.backingConfig.getDouble(prefixOption(configOption, prefix));
+ }
+
+ @Override
public void setDouble(String key, double value) {
this.backingConfig.setDouble(this.prefix + key, value);
}
@Override
+ public void setDouble(ConfigOption<Double> key, double value) {
+ this.backingConfig.setDouble(prefix + key.key(), value);
+ }
+
+ @Override
public byte[] getBytes(final String key, final byte[] defaultValue) {
return this.backingConfig.getBytes(this.prefix + key, defaultValue);
}
@@ -144,6 +213,11 @@ public final class DelegatingConfiguration extends Configuration {
}
@Override
+ public String getValue(ConfigOption<?> configOption) {
+ return this.backingConfig.getValue(prefixOption(configOption, prefix));
+ }
+
+ @Override
public void addAllToProperties(Properties props) {
// only add keys with our prefix
synchronized (backingConfig.confData) {
@@ -195,6 +269,27 @@ public final class DelegatingConfiguration extends Configuration {
return set;
}
+ @Override
+ public Configuration clone() {
+ return new DelegatingConfiguration(backingConfig.clone(), prefix);
+ }
+
+ /**
+  * Returns the entries visible through this prefixed view as a map.
+  *
+  * <p>Only entries of the backing configuration whose key starts with the prefix are
+  * included, and the prefix is stripped from the returned keys, so that every returned
+  * key can be used directly with the accessors of this delegating configuration.
+  *
+  * @return a new map holding the view's keys (without prefix) and their values
+  */
+ @Override
+ public Map<String, String> toMap() {
+     Map<String, String> map = backingConfig.toMap();
+     Map<String, String> prefixed = new HashMap<>(map.size());
+     final int prefixLen = prefix.length();
+     for (Map.Entry<String, String> entry : map.entrySet()) {
+         String backingKey = entry.getKey();
+         // entries outside the prefix namespace are not part of this view
+         if (backingKey.startsWith(prefix)) {
+             prefixed.put(backingKey.substring(prefixLen), entry.getValue());
+         }
+     }
+
+     return prefixed;
+ }
+
+ @Override
+ public boolean containsKey(String key) {
+ return backingConfig.containsKey(prefix + key);
+ }
+
// --------------------------------------------------------------------------------------------
@Override
@@ -225,4 +320,23 @@ public final class DelegatingConfiguration extends Configuration {
return false;
}
}
+
+ // --------------------------------------------------------------------------------------------
+
+ private static <T> ConfigOption<T> prefixOption(ConfigOption<T> option, String prefix) {
+ String key = prefix + option.key();
+
+ List<String> deprecatedKeys;
+ if (option.hasDeprecatedKeys()) {
+ deprecatedKeys = new ArrayList<>();
+ for (String dk : option.deprecatedKeys()) {
+ deprecatedKeys.add(prefix + dk);
+ }
+ } else {
+ deprecatedKeys = Collections.emptyList();
+ }
+
+ String[] deprecated = deprecatedKeys.toArray(new String[deprecatedKeys.size()]);
+ return new ConfigOption<T>(key, option.defaultValue(), deprecated);
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/d71a09cc/flink-core/src/test/java/org/apache/flink/configuration/ConfigurationTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/configuration/ConfigurationTest.java b/flink-core/src/test/java/org/apache/flink/configuration/ConfigurationTest.java
index cf3c908..91c5f65 100644
--- a/flink-core/src/test/java/org/apache/flink/configuration/ConfigurationTest.java
+++ b/flink-core/src/test/java/org/apache/flink/configuration/ConfigurationTest.java
@@ -20,6 +20,7 @@ package org.apache.flink.configuration;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
@@ -33,7 +34,7 @@ import org.junit.Test;
* objects is tested.
*/
public class ConfigurationTest extends TestLogger {
-
+
private static final byte[] EMPTY_BYTES = new byte[0];
private static final long TOO_LONG = Integer.MAX_VALUE + 10L;
private static final double TOO_LONG_DOUBLE = Double.MAX_VALUE;
@@ -73,7 +74,7 @@ public class ConfigurationTest extends TestLogger {
fail(e.getMessage());
}
}
-
+
@Test
public void testConversions() {
try {
@@ -175,7 +176,7 @@ public class ConfigurationTest extends TestLogger {
fail(e.getMessage());
}
}
-
+
@Test
public void testCopyConstructor() {
try {
@@ -194,4 +195,92 @@ public class ConfigurationTest extends TestLogger {
fail(e.getMessage());
}
}
+
+ @Test
+ public void testOptionWithDefault() {
+ Configuration cfg = new Configuration();
+ cfg.setInteger("int-key", 11);
+ cfg.setString("string-key", "abc");
+ // options whose keys were set above: the configured value must win over the option's default
+ ConfigOption<String> presentStringOption = ConfigOptions.key("string-key").defaultValue("my-beautiful-default");
+ ConfigOption<Integer> presentIntOption = ConfigOptions.key("int-key").defaultValue(87);
+
+ assertEquals("abc", cfg.getString(presentStringOption));
+ assertEquals("abc", cfg.getValue(presentStringOption));
+ // getValue() is expected to yield the string form ("11") of the integer that was set
+ assertEquals(11, cfg.getInteger(presentIntOption));
+ assertEquals("11", cfg.getValue(presentIntOption));
+
+ // test getting default when no value is present
+
+ ConfigOption<String> stringOption = ConfigOptions.key("test").defaultValue("my-beautiful-default");
+ ConfigOption<Integer> intOption = ConfigOptions.key("test2").defaultValue(87);
+
+ // getting strings with default value should work
+ assertEquals("my-beautiful-default", cfg.getValue(stringOption));
+ assertEquals("my-beautiful-default", cfg.getString(stringOption));
+
+ // an explicitly passed override default takes precedence over the option's own default
+ assertEquals("override", cfg.getString(stringOption, "override"));
+
+ // getting a primitive with a default value should work
+ assertEquals(87, cfg.getInteger(intOption));
+ assertEquals("87", cfg.getValue(intOption));
+ }
+
+ @Test
+ public void testOptionWithNoDefault() {
+ Configuration cfg = new Configuration();
+ cfg.setInteger("int-key", 11);
+ cfg.setString("string-key", "abc");
+ // option without a default whose key is present in the configuration
+ ConfigOption<String> presentStringOption = ConfigOptions.key("string-key").noDefaultValue();
+
+ assertEquals("abc", cfg.getString(presentStringOption));
+ assertEquals("abc", cfg.getValue(presentStringOption));
+
+ // test getting null when no value is present and the option has no default
+
+ ConfigOption<String> stringOption = ConfigOptions.key("test").noDefaultValue();
+
+ // both accessors should return null when neither a value nor a default exists
+ assertNull(cfg.getValue(stringOption));
+ assertNull(cfg.getString(stringOption));
+
+ // overriding the null default should work
+ assertEquals("override", cfg.getString(stringOption, "override"));
+ }
+
+ @Test
+ public void testDeprecatedKeys() {
+ Configuration cfg = new Configuration();
+ cfg.setInteger("the-key", 11);
+ cfg.setInteger("old-key", 12);
+ cfg.setInteger("older-key", 13);
+ // current key is set: its value (11) is expected even though deprecated keys are set too
+ ConfigOption<Integer> matchesFirst = ConfigOptions
+ .key("the-key")
+ .defaultValue(-1)
+ .withDeprecatedKeys("old-key", "older-key");
+ // current key is missing: the first deprecated key ("old-key", 12) is expected to be used
+ ConfigOption<Integer> matchesSecond = ConfigOptions
+ .key("does-not-exist")
+ .defaultValue(-1)
+ .withDeprecatedKeys("old-key", "older-key");
+ // only the second deprecated key ("older-key", 13) is set
+ ConfigOption<Integer> matchesThird = ConfigOptions
+ .key("does-not-exist")
+ .defaultValue(-1)
+ .withDeprecatedKeys("foo", "older-key");
+ // neither the current key nor any deprecated key is set: the default (-1) is expected
+ ConfigOption<Integer> notContained = ConfigOptions
+ .key("does-not-exist")
+ .defaultValue(-1)
+ .withDeprecatedKeys("not-there", "also-not-there");
+
+ assertEquals(11, cfg.getInteger(matchesFirst));
+ assertEquals(12, cfg.getInteger(matchesSecond));
+ assertEquals(13, cfg.getInteger(matchesThird));
+ assertEquals(-1, cfg.getInteger(notContained));
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/d71a09cc/flink-core/src/test/java/org/apache/flink/configuration/DelegatingConfigurationTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/configuration/DelegatingConfigurationTest.java b/flink-core/src/test/java/org/apache/flink/configuration/DelegatingConfigurationTest.java
index d8b782d..9298a14 100644
--- a/flink-core/src/test/java/org/apache/flink/configuration/DelegatingConfigurationTest.java
+++ b/flink-core/src/test/java/org/apache/flink/configuration/DelegatingConfigurationTest.java
@@ -24,8 +24,6 @@ import org.junit.Test;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.lang.reflect.Modifier;
-import java.util.Arrays;
-import java.util.Comparator;
import java.util.Set;
import static org.junit.Assert.assertTrue;
@@ -34,60 +32,43 @@ import static org.junit.Assert.assertEquals;
public class DelegatingConfigurationTest {
- /**
- * http://stackoverflow.com/questions/22225663/checking-in-a-unit-test-whether-all-methods-are-delegated
- */
@Test
public void testIfDelegatesImplementAllMethods() throws IllegalAccessException, IllegalArgumentException, InvocationTargetException {
- Comparator<Method> methodComparator = new Comparator<Method>() {
- @Override
- public int compare(Method o1, Method o2) {
- String o1Str = o1.getName() + typeParamToString(o1.getParameterTypes());
- String o2Str = o2.getName() + typeParamToString(o2.getParameterTypes());
- return o1Str.compareTo( o2Str );
- }
-
- private String typeParamToString(Class<?>[] classes) {
- String str = "";
- for(Object t : classes) {
- str += t.toString();
- }
- return str;
- }
- };
-
// For each method in the Configuration class...
Method[] confMethods = Configuration.class.getDeclaredMethods();
Method[] delegateMethods = DelegatingConfiguration.class.getDeclaredMethods();
- Arrays.sort(confMethods, methodComparator);
- Arrays.sort(delegateMethods, methodComparator);
- match : for (Method configurationMethod : confMethods) {
- boolean hasMethod = false;
- if(!Modifier.isPublic(configurationMethod.getModifiers()) ) {
+
+ for (Method configurationMethod : confMethods) {
+ if (!Modifier.isPublic(configurationMethod.getModifiers()) ) {
continue;
}
+
+ boolean hasMethod = false;
+
// Find matching method in wrapper class and call it
- mismatch: for (Method wrapperMethod : delegateMethods) {
+ lookForWrapper: for (Method wrapperMethod : delegateMethods) {
if (configurationMethod.getName().equals(wrapperMethod.getName())) {
-
+
// Get parameters for method
Class<?>[] wrapperMethodParams = wrapperMethod.getParameterTypes();
Class<?>[] configMethodParams = configurationMethod.getParameterTypes();
- if(wrapperMethodParams.length != configMethodParams.length) {
- System.err.println("Length");
- break mismatch;
+ if (wrapperMethodParams.length != configMethodParams.length) {
+ continue;
}
- for(int i = 0; i < wrapperMethodParams.length; i++) {
- if(wrapperMethodParams[i] != configMethodParams[i]) {
- break mismatch;
+
+ for (int i = 0; i < wrapperMethodParams.length; i++) {
+ if (wrapperMethodParams[i] != configMethodParams[i]) {
+ continue lookForWrapper;
}
}
hasMethod = true;
- break match;
+ break;
}
}
- assertTrue("Foo method '" + configurationMethod.getName() + "' has not been wrapped correctly in DelegatingConfiguration wrapper", hasMethod);
+
+ assertTrue("Configuration method '" + configurationMethod.getName() +
+ "' has not been wrapped correctly in DelegatingConfiguration wrapper", hasMethod);
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/d71a09cc/flink-core/src/test/java/org/apache/flink/configuration/UnmodifiableConfigurationTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/configuration/UnmodifiableConfigurationTest.java b/flink-core/src/test/java/org/apache/flink/configuration/UnmodifiableConfigurationTest.java
index 386d03b..26e3d7a 100644
--- a/flink-core/src/test/java/org/apache/flink/configuration/UnmodifiableConfigurationTest.java
+++ b/flink-core/src/test/java/org/apache/flink/configuration/UnmodifiableConfigurationTest.java
@@ -56,6 +56,9 @@ public class UnmodifiableConfigurationTest extends TestLogger {
@Test
public void testExceptionOnSet() {
try {
+ @SuppressWarnings("rawtypes")
+ final ConfigOption rawOption = ConfigOptions.key("testkey").defaultValue("value");
+
Map<Class<?>, Object> parameters = new HashMap<Class<?>, Object>();
parameters.put(byte[].class, new byte[0]);
parameters.put(Class.class, Object.class);
@@ -65,19 +68,22 @@ public class UnmodifiableConfigurationTest extends TestLogger {
parameters.put(double.class, 0.0);
parameters.put(String.class, "");
parameters.put(boolean.class, false);
-
+
Class<UnmodifiableConfiguration> clazz = UnmodifiableConfiguration.class;
UnmodifiableConfiguration config = new UnmodifiableConfiguration(new Configuration());
-
+
for (Method m : clazz.getMethods()) {
if (m.getName().startsWith("set")) {
-
+
+ Class<?> keyClass = m.getParameterTypes()[0];
Class<?> parameterClass = m.getParameterTypes()[1];
+ Object key = keyClass == String.class ? "key" : rawOption;
+
Object parameter = parameters.get(parameterClass);
assertNotNull("method " + m + " not covered by test", parameter);
-
+
try {
- m.invoke(config, "key", parameter);
+ m.invoke(config, key, parameter);
fail("should fail with an exception");
}
catch (InvocationTargetException e) {