You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2016/10/13 15:40:17 UTC

[7/8] flink git commit: [FLINK-4764] [core] Introduce Config Options

[FLINK-4764] [core] Introduce Config Options

This is a more concise and maintainable way to define configuration keys, default values,
deprecated keys, etc.

This closes #2605


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/d71a09cc
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/d71a09cc
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/d71a09cc

Branch: refs/heads/flip-6
Commit: d71a09cc2a36a877e8287db8d9fe84134a4901ba
Parents: 05436f4
Author: Stephan Ewen <se...@apache.org>
Authored: Fri Oct 7 15:24:44 2016 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Thu Oct 13 16:25:49 2016 +0200

----------------------------------------------------------------------
 .../flink/configuration/ConfigOption.java       | 171 ++++++++
 .../flink/configuration/ConfigOptions.java      | 116 ++++++
 .../flink/configuration/Configuration.java      | 407 +++++++++++++++----
 .../configuration/DelegatingConfiguration.java  | 118 +++++-
 .../flink/configuration/ConfigurationTest.java  |  95 ++++-
 .../DelegatingConfigurationTest.java            |  55 +--
 .../UnmodifiableConfigurationTest.java          |  16 +-
 7 files changed, 844 insertions(+), 134 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/d71a09cc/flink-core/src/main/java/org/apache/flink/configuration/ConfigOption.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/ConfigOption.java b/flink-core/src/main/java/org/apache/flink/configuration/ConfigOption.java
new file mode 100644
index 0000000..3531f6d
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/configuration/ConfigOption.java
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.configuration;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+import java.util.Arrays;
+import java.util.Collections;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A {@code ConfigOption} describes a configuration parameter. It encapsulates
+ * the configuration key, deprecated older versions of the key, and an optional
+ * default value for the configuration parameter.
+ * 
+ * <p>{@code ConfigOption}s are built via the {@link ConfigOptions} class.
+ * Once created, a config option is immutable.
+ * 
+ * @param <T> The type of value associated with the configuration option.
+ */
+@PublicEvolving
+public class ConfigOption<T> {
+
+	private static final String[] EMPTY = new String[0];
+
+	// ------------------------------------------------------------------------
+
+	/** The current key for that config option */
+	private final String key;
+
+	/** The list of deprecated keys, in the order to be checked */
+	private final String[] deprecatedKeys;
+
+	/** The default value for this option */
+	private final T defaultValue;
+
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Creates a new config option with no deprecated keys.
+	 *
+	 * @param key             The current key for that config option
+	 * @param defaultValue    The default value for this option
+	 */
+	ConfigOption(String key, T defaultValue) {
+		this.key = checkNotNull(key);
+		this.defaultValue = defaultValue;
+		this.deprecatedKeys = EMPTY;
+	}
+
+	/**
+	 * Creates a new config option with deprecated keys.
+	 *
+	 * @param key             The current key for that config option
+	 * @param defaultValue    The default value for this option
+	 * @param deprecatedKeys  The list of deprecated keys, in the order to be checked
+	 */
+	ConfigOption(String key, T defaultValue, String... deprecatedKeys) {
+		this.key = checkNotNull(key);
+		this.defaultValue = defaultValue;
+		this.deprecatedKeys = deprecatedKeys == null || deprecatedKeys.length == 0 ? EMPTY : deprecatedKeys;
+	}
+
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Creates a new config option, using this option's key and default value, and
+	 * adding the given deprecated keys.
+	 * 
+	 * <p>When obtaining a value from the configuration via {@link Configuration#getValue(ConfigOption)},
+	 * the deprecated keys will be checked in the order provided to this method. The first key for which
+	 * a value is found will be used - that value will be returned.
+	 * 
+	 * @param deprecatedKeys The deprecated keys, in the order in which they should be checked.
+	 * @return A new config option with the given deprecated keys.
+	 */
+	public ConfigOption<T> withDeprecatedKeys(String... deprecatedKeys) {
+		return new ConfigOption<>(key, defaultValue, deprecatedKeys);
+	}
+
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Gets the configuration key.
+	 * @return The configuration key
+	 */
+	public String key() {
+		return key;
+	}
+
+	/**
+	 * Checks if this option has a default value.
+	 * @return True if it has a default value, false if not.
+	 */
+	public boolean hasDefaultValue() {
+		return defaultValue != null;
+	}
+
+	/**
+	 * Returns the default value, or null, if there is no default value.
+	 * @return The default value, or null.
+	 */
+	public T defaultValue() {
+		return defaultValue;
+	}
+
+	/**
+	 * Checks whether this option has deprecated keys.
+	 * @return True if the option has deprecated keys, false if not.
+	 */
+	public boolean hasDeprecatedKeys() {
+		return deprecatedKeys != EMPTY;
+	}
+
+	/**
+	 * Gets the deprecated keys, in the order to be checked.
+	 * @return The option's deprecated keys.
+	 */
+	public Iterable<String> deprecatedKeys() {
+		return deprecatedKeys == EMPTY ? Collections.<String>emptyList() : Arrays.asList(deprecatedKeys);
+	}
+
+	// ------------------------------------------------------------------------
+
+	@Override
+	public boolean equals(Object o) {
+		if (this == o) {
+			return true;
+		}
+		else if (o != null && o.getClass() == ConfigOption.class) {
+			ConfigOption<?> that = (ConfigOption<?>) o;
+			return this.key.equals(that.key) &&
+					Arrays.equals(this.deprecatedKeys, that.deprecatedKeys) &&
+					(this.defaultValue == null ? that.defaultValue == null :
+							(that.defaultValue != null && this.defaultValue.equals(that.defaultValue)));
+		}
+		else {
+			return false;
+		}
+	}
+
+	@Override
+	public int hashCode() {
+		return 31 * key.hashCode() +
+				17 * Arrays.hashCode(deprecatedKeys) +
+				(defaultValue != null ? defaultValue.hashCode() : 0);
+	}
+
+	@Override
+	public String toString() {
+		return String.format("Key: '%s' , default: %s (deprecated keys: %s)",
+				key, defaultValue, Arrays.toString(deprecatedKeys));
+	}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/d71a09cc/flink-core/src/main/java/org/apache/flink/configuration/ConfigOptions.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/ConfigOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/ConfigOptions.java
new file mode 100644
index 0000000..f87da0a
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/configuration/ConfigOptions.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.configuration;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * {@code ConfigOptions} are used to build a {@link ConfigOption}.
+ * The option is typically built using one of the following patterns:
+ * 
+ * <pre>{@code
+ * // simple string-valued option with a default value
+ * ConfigOption<String> tempDirs = ConfigOptions
+ *     .key("tmp.dir")
+ *     .defaultValue("/tmp");
+ * 
+ * // simple integer-valued option with a default value
+ * ConfigOption<Integer> parallelism = ConfigOptions
+ *     .key("application.parallelism")
+ *     .defaultValue(100);
+ * 
+ * // option with no default value
+ * ConfigOption<String> userName = ConfigOptions
+ *     .key("user.name")
+ *     .noDefaultValue();
+ * 
+ * // option with deprecated keys to check
+ * ConfigOption<Double> threshold = ConfigOptions
+ *     .key("cpu.utilization.threshold")
+ *     .defaultValue(0.9)
+ *     .withDeprecatedKeys("cpu.threshold");
+ * }</pre>
+ */
+@PublicEvolving
+public class ConfigOptions {
+
+	/**
+	 * Starts building a new {@link ConfigOption}.
+	 * 
+	 * @param key The key for the config option.
+	 * @return The builder for the config option with the given key.
+	 */
+	public static OptionBuilder key(String key) {
+		checkNotNull(key);
+		return new OptionBuilder(key);
+	}
+
+	// ------------------------------------------------------------------------
+
+	/**
+	 * The option builder is used to create a {@link ConfigOption}.
+	 * It is instantiated via {@link ConfigOptions#key(String)}.
+	 */
+	public static final class OptionBuilder {
+
+		/** The key for the config option */
+		private final String key;
+
+		/**
+		 * Creates a new OptionBuilder.
+		 * @param key The key for the config option
+		 */
+		OptionBuilder(String key) {
+			this.key = key;
+		}
+
+		/**
+		 * Creates a ConfigOption with the given default value.
+		 * 
+		 * <p>This method does not accept {@code null}. For options with no default value, use
+		 * the {@link #noDefaultValue()} method instead.
+		 * 
+		 * @param value The default value for the config option
+		 * @param <T> The type of the default value.
+		 * @return The config option with the default value.
+		 */
+		public <T> ConfigOption<T> defaultValue(T value) {
+			checkNotNull(value);
+			return new ConfigOption<T>(key, value);
+		}
+
+		/**
+		 * Creates a string-valued option with no default value.
+		 * String-valued options are the only ones that can have no
+		 * default value.
+		 * 
+		 * @return The created ConfigOption.
+		 */
+		public ConfigOption<String> noDefaultValue() {
+			return new ConfigOption<>(key, null);
+		}
+	}
+
+	// ------------------------------------------------------------------------
+
+	/** Not intended to be instantiated */
+	private ConfigOptions() {}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/d71a09cc/flink-core/src/main/java/org/apache/flink/configuration/Configuration.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/Configuration.java b/flink-core/src/main/java/org/apache/flink/configuration/Configuration.java
index 8ca5d07..f15c669 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/Configuration.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/Configuration.java
@@ -27,6 +27,7 @@ import java.util.Properties;
 import java.util.Set;
 
 import org.apache.flink.annotation.Public;
+import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.core.io.IOReadableWritable;
 import org.apache.flink.core.memory.DataInputView;
@@ -134,7 +135,33 @@ public class Configuration extends ExecutionConfig.GlobalJobParameters
 			return o.toString();
 		}
 	}
-	
+
+	/**
+	 * Returns the value associated with the given config option as a string.
+	 *
+	 * @param configOption The configuration option
+	 * @return the (default) value associated with the given config option
+	 */
+	@PublicEvolving
+	public String getString(ConfigOption<String> configOption) {
+		Object o = getValueOrDefaultFromOption(configOption);
+		return o == null ? null : o.toString();
+	}
+
+	/**
+	 * Returns the value associated with the given config option as a string.
+	 * If no value is mapped under any key of the option, it returns the specified
+	 * default instead of the option's default value.
+	 *
+	 * @param configOption The configuration option
+	 * @return the value found under the option's keys, or {@code overrideDefault} if none is set
+	 */
+	@PublicEvolving
+	public String getString(ConfigOption<String> configOption, String overrideDefault) {
+		Object o = getRawValueFromOption(configOption);
+		return o == null ? overrideDefault : o.toString();
+	}
+
 	/**
 	 * Adds the given key/value pair to the configuration object.
 	 * 
@@ -148,6 +175,20 @@ public class Configuration extends ExecutionConfig.GlobalJobParameters
 	}
 
 	/**
+	 * Adds the given value to the configuration object.
+	 * The main key of the config option will be used to map the value.
+	 *
+	 * @param key
+	 *        the option specifying the key to be added
+	 * @param value
+	 *        the value of the key/value pair to be added
+	 */
+	@PublicEvolving
+	public void setString(ConfigOption<String> key, String value) {
+		setValueInternal(key.key(), value);
+	}
+
+	/**
 	 * Returns the value associated with the given key as an integer.
 	 * 
 	 * @param key
@@ -161,28 +202,20 @@ public class Configuration extends ExecutionConfig.GlobalJobParameters
 		if (o == null) {
 			return defaultValue;
 		}
-		
-		if (o.getClass() == Integer.class) {
-			return (Integer) o;
-		}
-		else if (o.getClass() == Long.class) {
-			long value = (Long) o;
-			if (value <= Integer.MAX_VALUE && value >= Integer.MIN_VALUE) {
-				return (int) value;
-			} else {
-				LOG.warn("Configuration value {} overflows/underflows the integer type.", value);
-				return defaultValue;
-			}
-		}
-		else {
-			try {
-				return Integer.parseInt(o.toString());
-			}
-			catch (NumberFormatException e) {
-				LOG.warn("Configuration cannot evaluate value {} as an integer number", o);
-				return defaultValue;
-			}
-		}
+
+		return convertToInt(o, defaultValue);
+	}
+
+	/**
+	 * Returns the value associated with the given config option as an integer.
+	 *
+	 * @param configOption The configuration option
+	 * @return the (default) value associated with the given config option
+	 */
+	@PublicEvolving
+	public int getInteger(ConfigOption<Integer> configOption) {
+		Object o = getValueOrDefaultFromOption(configOption);
+		return convertToInt(o, configOption.defaultValue());
 	}
 
 	/**
@@ -198,6 +231,20 @@ public class Configuration extends ExecutionConfig.GlobalJobParameters
 	}
 
 	/**
+	 * Adds the given value to the configuration object.
+	 * The main key of the config option will be used to map the value.
+	 *
+	 * @param key
+	 *        the option specifying the key to be added
+	 * @param value
+	 *        the value of the key/value pair to be added
+	 */
+	@PublicEvolving
+	public void setInteger(ConfigOption<Integer> key, int value) {
+		setValueInternal(key.key(), value);
+	}
+
+	/**
 	 * Returns the value associated with the given key as a long.
 	 * 
 	 * @param key
@@ -211,22 +258,20 @@ public class Configuration extends ExecutionConfig.GlobalJobParameters
 		if (o == null) {
 			return defaultValue;
 		}
-		
-		if (o.getClass() == Long.class) {
-			return (Long) o;
-		}
-		else if (o.getClass() == Integer.class) {
-			return ((Integer) o).longValue();
-		}
-		else {
-			try {
-				return Long.parseLong(o.toString());
-			}
-			catch (NumberFormatException e) {
-				LOG.warn("Configuration cannot evaluate value " + o + " as a long integer number");
-				return defaultValue;
-			}
-		}
+
+		return convertToLong(o, defaultValue);
+	}
+
+	/**
+	 * Returns the value associated with the given config option as a long integer.
+	 *
+	 * @param configOption The configuration option
+	 * @return the (default) value associated with the given config option
+	 */
+	@PublicEvolving
+	public long getLong(ConfigOption<Long> configOption) {
+		Object o = getValueOrDefaultFromOption(configOption);
+		return convertToLong(o, configOption.defaultValue());
 	}
 
 	/**
@@ -242,6 +287,20 @@ public class Configuration extends ExecutionConfig.GlobalJobParameters
 	}
 
 	/**
+	 * Adds the given value to the configuration object.
+	 * The main key of the config option will be used to map the value.
+	 *
+	 * @param key
+	 *        the option specifying the key to be added
+	 * @param value
+	 *        the value of the key/value pair to be added
+	 */
+	@PublicEvolving
+	public void setLong(ConfigOption<Long> key, long value) {
+		setValueInternal(key.key(), value);
+	}
+
+	/**
 	 * Returns the value associated with the given key as a boolean.
 	 * 
 	 * @param key
@@ -255,13 +314,20 @@ public class Configuration extends ExecutionConfig.GlobalJobParameters
 		if (o == null) {
 			return defaultValue;
 		}
-		
-		if (o.getClass() == Boolean.class) {
-			return (Boolean) o;
-		}
-		else {
-			return Boolean.parseBoolean(o.toString());
-		}
+
+		return convertToBoolean(o);
+	}
+
+	/**
+	 * Returns the value associated with the given config option as a boolean.
+	 *
+	 * @param configOption The configuration option
+	 * @return the (default) value associated with the given config option
+	 */
+	@PublicEvolving
+	public boolean getBoolean(ConfigOption<Boolean> configOption) {
+		Object o = getValueOrDefaultFromOption(configOption);
+		return convertToBoolean(o);
 	}
 
 	/**
@@ -277,6 +343,20 @@ public class Configuration extends ExecutionConfig.GlobalJobParameters
 	}
 
 	/**
+	 * Adds the given value to the configuration object.
+	 * The main key of the config option will be used to map the value.
+	 *
+	 * @param key
+	 *        the option specifying the key to be added
+	 * @param value
+	 *        the value of the key/value pair to be added
+	 */
+	@PublicEvolving
+	public void setBoolean(ConfigOption<Boolean> key, boolean value) {
+		setValueInternal(key.key(), value);
+	}
+
+	/**
 	 * Returns the value associated with the given key as a float.
 	 * 
 	 * @param key
@@ -290,28 +370,20 @@ public class Configuration extends ExecutionConfig.GlobalJobParameters
 		if (o == null) {
 			return defaultValue;
 		}
-		
-		if (o.getClass() == Float.class) {
-			return (Float) o;
-		}
-		else if (o.getClass() == Double.class) {
-			double value = ((Double) o);
-			if (value <= Float.MAX_VALUE && value >= Float.MIN_VALUE) {
-				return (float) value;
-			} else {
-				LOG.warn("Configuration value {} overflows/underflows the float type.", value);
-				return defaultValue;
-			}
-		}
-		else {
-			try {
-				return Float.parseFloat(o.toString());
-			}
-			catch (NumberFormatException e) {
-				LOG.warn("Configuration cannot evaluate value {} as a float value", o);
-				return defaultValue;
-			}
-		}
+
+		return convertToFloat(o, defaultValue);
+	}
+
+	/**
+	 * Returns the value associated with the given config option as a float.
+	 *
+	 * @param configOption The configuration option
+	 * @return the (default) value associated with the given config option
+	 */
+	@PublicEvolving
+	public float getFloat(ConfigOption<Float> configOption) {
+		Object o = getValueOrDefaultFromOption(configOption);
+		return convertToFloat(o, configOption.defaultValue());
 	}
 
 	/**
@@ -325,7 +397,21 @@ public class Configuration extends ExecutionConfig.GlobalJobParameters
 	public void setFloat(String key, float value) {
 		setValueInternal(key, value);
 	}
-	
+
+	/**
+	 * Adds the given value to the configuration object.
+	 * The main key of the config option will be used to map the value.
+	 *
+	 * @param key
+	 *        the option specifying the key to be added
+	 * @param value
+	 *        the value of the key/value pair to be added
+	 */
+	@PublicEvolving
+	public void setFloat(ConfigOption<Float> key, float value) {
+		setValueInternal(key.key(), value);
+	}
+
 	/**
 	 * Returns the value associated with the given key as a double.
 	 * 
@@ -340,22 +426,20 @@ public class Configuration extends ExecutionConfig.GlobalJobParameters
 		if (o == null) {
 			return defaultValue;
 		}
-		
-		if (o.getClass() == Double.class) {
-			return (Double) o;
-		}
-		else if (o.getClass() == Float.class) {
-			return ((Float) o).doubleValue();
-		}
-		else {
-			try {
-				return Double.parseDouble(o.toString());
-			}
-			catch (NumberFormatException e) {
-				LOG.warn("Configuration cannot evaluate value {} as a double value", o);
-				return defaultValue;
-			}
-		}
+
+		return convertToDouble(o, defaultValue);
+	}
+
+	/**
+	 * Returns the value associated with the given config option as a {@code double}.
+	 *
+	 * @param configOption The configuration option
+	 * @return the (default) value associated with the given config option
+	 */
+	@PublicEvolving
+	public double getDouble(ConfigOption<Double> configOption) {
+		Object o = getValueOrDefaultFromOption(configOption);
+		return convertToDouble(o, configOption.defaultValue());
 	}
 
 	/**
@@ -369,7 +453,21 @@ public class Configuration extends ExecutionConfig.GlobalJobParameters
 	public void setDouble(String key, double value) {
 		setValueInternal(key, value);
 	}
-	
+
+	/**
+	 * Adds the given value to the configuration object.
+	 * The main key of the config option will be used to map the value.
+	 *
+	 * @param key
+	 *        the option specifying the key to be added
+	 * @param value
+	 *        the value of the key/value pair to be added
+	 */
+	@PublicEvolving
+	public void setDouble(ConfigOption<Double> key, double value) {
+		setValueInternal(key.key(), value);
+	}
+
 	/**
 	 * Returns the value associated with the given key as a byte array.
 	 * 
@@ -407,6 +505,18 @@ public class Configuration extends ExecutionConfig.GlobalJobParameters
 		setValueInternal(key, bytes);
 	}
 
+	/**
+	 * Returns the value associated with the given config option as a string.
+	 *
+	 * @param configOption The configuration option
+	 * @return the (default) value associated with the given config option
+	 */
+	@PublicEvolving
+	public String getValue(ConfigOption<?> configOption) {
+		Object o = getValueOrDefaultFromOption(configOption);
+		return o == null ? null : o.toString();
+	}
+
 	// --------------------------------------------------------------------------------------------
 	
 	/**
@@ -523,7 +633,130 @@ public class Configuration extends ExecutionConfig.GlobalJobParameters
 			return this.confData.get(key);
 		}
 	}
-	
+
+	private Object getRawValueFromOption(ConfigOption<?> configOption) {
+		// first try the current key
+		Object o = getRawValue(configOption.key());
+
+		if (o != null) {
+			return o;
+		}
+		else if (configOption.hasDeprecatedKeys()) {
+			for (String deprecatedKey : configOption.deprecatedKeys()) {
+				Object oo = getRawValue(deprecatedKey);
+				if (oo != null) {
+					return oo;
+				}
+			}
+		}
+
+		return null;
+	}
+
+	private Object getValueOrDefaultFromOption(ConfigOption<?> configOption) {
+		Object o = getRawValueFromOption(configOption);
+		return o != null ? o : configOption.defaultValue();
+	}
+
+	// --------------------------------------------------------------------------------------------
+	//  Type conversion
+	// --------------------------------------------------------------------------------------------
+
+	private int convertToInt(Object o, int defaultValue) {
+		if (o.getClass() == Integer.class) {
+			return (Integer) o;
+		}
+		else if (o.getClass() == Long.class) {
+			long value = (Long) o;
+			if (value <= Integer.MAX_VALUE && value >= Integer.MIN_VALUE) {
+				return (int) value;
+			} else {
+				LOG.warn("Configuration value {} overflows/underflows the integer type.", value);
+				return defaultValue;
+			}
+		}
+		else {
+			try {
+				return Integer.parseInt(o.toString());
+			}
+			catch (NumberFormatException e) {
+				LOG.warn("Configuration cannot evaluate value {} as an integer number", o);
+				return defaultValue;
+			}
+		}
+	}
+
+	private long convertToLong(Object o, long defaultValue) {
+		if (o.getClass() == Long.class) {
+			return (Long) o;
+		}
+		else if (o.getClass() == Integer.class) {
+			return ((Integer) o).longValue();
+		}
+		else {
+			try {
+				return Long.parseLong(o.toString());
+			}
+			catch (NumberFormatException e) {
+				LOG.warn("Configuration cannot evaluate value " + o + " as a long integer number");
+				return defaultValue;
+			}
+		}
+	}
+
+	private boolean convertToBoolean(Object o) {
+		if (o.getClass() == Boolean.class) {
+			return (Boolean) o;
+		}
+		else {
+			return Boolean.parseBoolean(o.toString());
+		}
+	}
+
+	private float convertToFloat(Object o, float defaultValue) {
+		if (o.getClass() == Float.class) {
+			return (Float) o;
+		}
+		else if (o.getClass() == Double.class) {
+			double value = ((Double) o);
+			if (value <= Float.MAX_VALUE && value >= Float.MIN_VALUE) {
+				return (float) value;
+			} else {
+				LOG.warn("Configuration value {} overflows/underflows the float type.", value);
+				return defaultValue;
+			}
+		}
+		else {
+			try {
+				return Float.parseFloat(o.toString());
+			}
+			catch (NumberFormatException e) {
+				LOG.warn("Configuration cannot evaluate value {} as a float value", o);
+				return defaultValue;
+			}
+		}
+	}
+
+	private double convertToDouble(Object o, double defaultValue) {
+		if (o.getClass() == Double.class) {
+			return (Double) o;
+		}
+		else if (o.getClass() == Float.class) {
+			return ((Float) o).doubleValue();
+		}
+		else {
+			try {
+				return Double.parseDouble(o.toString());
+			}
+			catch (NumberFormatException e) {
+				LOG.warn("Configuration cannot evaluate value {} as a double value", o);
+				return defaultValue;
+			}
+		}
+	}
+
+	// --------------------------------------------------------------------------------------------
+	//  Serialization
 	// --------------------------------------------------------------------------------------------
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/d71a09cc/flink-core/src/main/java/org/apache/flink/configuration/DelegatingConfiguration.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/DelegatingConfiguration.java b/flink-core/src/main/java/org/apache/flink/configuration/DelegatingConfiguration.java
index dba77f3..bd9a962 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/DelegatingConfiguration.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/DelegatingConfiguration.java
@@ -15,6 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.flink.configuration;
 
 import org.apache.flink.core.memory.DataInputView;
@@ -22,7 +23,11 @@ import org.apache.flink.core.memory.DataOutputView;
 import org.apache.flink.util.Preconditions;
 
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
 import java.util.HashSet;
+import java.util.List;
 import java.util.Map;
 import java.util.Properties;
 import java.util.Set;
@@ -55,8 +60,7 @@ public final class DelegatingConfiguration extends Configuration {
 	 * @param backingConfig The configuration holding the actual config data.
 	 * @param prefix The prefix prepended to all config keys.
 	 */
-	public DelegatingConfiguration(Configuration backingConfig, String prefix)
-	{
+	public DelegatingConfiguration(Configuration backingConfig, String prefix) {
 		this.backingConfig = Preconditions.checkNotNull(backingConfig);
 		this.prefix = prefix;
 	}
@@ -69,11 +73,26 @@ public final class DelegatingConfiguration extends Configuration {
 	}
 
 	@Override
+	public String getString(ConfigOption<String> configOption) {
+		return  this.backingConfig.getString(prefixOption(configOption, prefix));
+	}
+
+	@Override
+	public String getString(ConfigOption<String> configOption, String overrideDefault) {
+		return  this.backingConfig.getString(prefixOption(configOption, prefix), overrideDefault);
+	}
+
+	@Override
 	public void setString(String key, String value) {
 		this.backingConfig.setString(this.prefix + key, value);
 	}
 
 	@Override
+	public void setString(ConfigOption<String> key, String value) {
+		this.backingConfig.setString(prefix + key.key(), value);
+	}
+
+	@Override
 	public <T> Class<T> getClass(String key, Class<? extends T> defaultValue, ClassLoader classLoader) throws ClassNotFoundException {
 		return this.backingConfig.getClass(this.prefix + key, defaultValue, classLoader);
 	}
@@ -89,51 +108,101 @@ public final class DelegatingConfiguration extends Configuration {
 	}
 
 	@Override
+	public int getInteger(ConfigOption<Integer> configOption) {
+		return  this.backingConfig.getInteger(prefixOption(configOption, prefix));
+	}
+
+	@Override
 	public void setInteger(String key, int value) {
 		this.backingConfig.setInteger(this.prefix + key, value);
 	}
 
 	@Override
+	public void setInteger(ConfigOption<Integer> key, int value) {
+		this.backingConfig.setInteger(prefix + key.key(), value);
+	}
+
+	@Override
 	public long getLong(String key, long defaultValue) {
 		return this.backingConfig.getLong(this.prefix + key, defaultValue);
 	}
 
 	@Override
+	public long getLong(ConfigOption<Long> configOption) {
+		return  this.backingConfig.getLong(prefixOption(configOption, prefix));
+	}
+
+	@Override
 	public void setLong(String key, long value) {
 		this.backingConfig.setLong(this.prefix + key, value);
 	}
 
 	@Override
+	public void setLong(ConfigOption<Long> key, long value) {
+		this.backingConfig.setLong(prefix + key.key(), value);
+	}
+
+	@Override
 	public boolean getBoolean(String key, boolean defaultValue) {
 		return this.backingConfig.getBoolean(this.prefix + key, defaultValue);
 	}
 
 	@Override
+	public boolean getBoolean(ConfigOption<Boolean> configOption) {
+		return  this.backingConfig.getBoolean(prefixOption(configOption, prefix));
+	}
+
+	@Override
 	public void setBoolean(String key, boolean value) {
 		this.backingConfig.setBoolean(this.prefix + key, value);
 	}
 
 	@Override
+	public void setBoolean(ConfigOption<Boolean> key, boolean value) {
+		this.backingConfig.setBoolean(prefix + key.key(), value);
+	}
+
+	@Override
 	public float getFloat(String key, float defaultValue) {
 		return this.backingConfig.getFloat(this.prefix + key, defaultValue);
 	}
 
 	@Override
+	public float getFloat(ConfigOption<Float> configOption) {
+		return this.backingConfig.getFloat(prefixOption(configOption, prefix));
+	}
+
+	@Override
 	public void setFloat(String key, float value) {
 		this.backingConfig.setFloat(this.prefix + key, value);
 	}
 
 	@Override
+	public void setFloat(ConfigOption<Float> key, float value) {
+		this.backingConfig.setFloat(prefix + key.key(), value);
+	}
+
+	@Override
 	public double getDouble(String key, double defaultValue) {
 		return this.backingConfig.getDouble(this.prefix + key, defaultValue);
 	}
 
 	@Override
+	public double getDouble(ConfigOption<Double> configOption) {
+		return this.backingConfig.getDouble(prefixOption(configOption, prefix));
+	}
+
+	@Override
 	public void setDouble(String key, double value) {
 		this.backingConfig.setDouble(this.prefix + key, value);
 	}
 
 	@Override
+	public void setDouble(ConfigOption<Double> key, double value) {
+		this.backingConfig.setDouble(prefix + key.key(), value);
+	}
+
+	@Override
 	public byte[] getBytes(final String key, final byte[] defaultValue) {
 		return this.backingConfig.getBytes(this.prefix + key, defaultValue);
 	}
@@ -144,6 +213,11 @@ public final class DelegatingConfiguration extends Configuration {
 	}
 
 	@Override
+	public String getValue(ConfigOption<?> configOption) {
+		return backingConfig.getValue(prefixOption(configOption, this.prefix)); // look up the prefixed option
+	}
+
+	@Override
 	public void addAllToProperties(Properties props) {
 		// only add keys with our prefix
 		synchronized (backingConfig.confData) {
@@ -195,6 +269,27 @@ public final class DelegatingConfiguration extends Configuration {
 		return set;
 	}
 
+	@Override
+	public Configuration clone() {
+		return new DelegatingConfiguration(this.backingConfig.clone(), this.prefix); // backing config is cloned as well
+	}
+
+	@Override
+	public Map<String, String> toMap() {
+		Map<String, String> view = new HashMap<>();
+		for (Map.Entry<String, String> entry : backingConfig.toMap().entrySet()) {
+			if (entry.getKey().startsWith(prefix)) { // only keys under our prefix belong to this view
+				view.put(entry.getKey().substring(prefix.length()), entry.getValue()); // expose without prefix
+			}
+		}
+		return view;
+	}
+
+	@Override
+	public boolean containsKey(String key) {
+		return this.backingConfig.containsKey(this.prefix + key); // check the prefixed key in the backing config
+	}
+
 	// --------------------------------------------------------------------------------------------
 
 	@Override
@@ -225,4 +320,23 @@ public final class DelegatingConfiguration extends Configuration {
 			return false;
 		}
 	}
+
+	// --------------------------------------------------------------------------------------------
+
+	private static <T> ConfigOption<T> prefixOption(ConfigOption<T> option, String prefix) {
+		String key = prefix + option.key();
+
+		List<String> deprecatedKeys;
+		if (option.hasDeprecatedKeys()) {
+			deprecatedKeys = new ArrayList<>();
+			for (String dk : option.deprecatedKeys()) {
+				deprecatedKeys.add(prefix + dk);
+			}
+		} else {
+			deprecatedKeys = Collections.emptyList();
+		}
+
+		String[] deprecated = deprecatedKeys.toArray(new String[deprecatedKeys.size()]);
+		return new ConfigOption<T>(key, option.defaultValue(), deprecated);
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/d71a09cc/flink-core/src/test/java/org/apache/flink/configuration/ConfigurationTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/configuration/ConfigurationTest.java b/flink-core/src/test/java/org/apache/flink/configuration/ConfigurationTest.java
index cf3c908..91c5f65 100644
--- a/flink-core/src/test/java/org/apache/flink/configuration/ConfigurationTest.java
+++ b/flink-core/src/test/java/org/apache/flink/configuration/ConfigurationTest.java
@@ -20,6 +20,7 @@ package org.apache.flink.configuration;
 
 import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
@@ -33,7 +34,7 @@ import org.junit.Test;
  * objects is tested.
  */
 public class ConfigurationTest extends TestLogger {
-	
+
 	private static final byte[] EMPTY_BYTES = new byte[0];
 	private static final long TOO_LONG = Integer.MAX_VALUE + 10L;
 	private static final double TOO_LONG_DOUBLE = Double.MAX_VALUE;
@@ -73,7 +74,7 @@ public class ConfigurationTest extends TestLogger {
 			fail(e.getMessage());
 		}
 	}
-	
+
 	@Test
 	public void testConversions() {
 		try {
@@ -175,7 +176,7 @@ public class ConfigurationTest extends TestLogger {
 			fail(e.getMessage());
 		}
 	}
-	
+
 	@Test
 	public void testCopyConstructor() {
 		try {
@@ -194,4 +195,92 @@ public class ConfigurationTest extends TestLogger {
 			fail(e.getMessage());
 		}
 	}
+
+	@Test
+	public void testOptionWithDefault() {
+		final Configuration config = new Configuration();
+		config.setInteger("int-key", 11);
+		config.setString("string-key", "abc");
+
+		final ConfigOption<String> configuredString = ConfigOptions.key("string-key").defaultValue("my-beautiful-default");
+		final ConfigOption<Integer> configuredInt = ConfigOptions.key("int-key").defaultValue(87);
+
+		assertEquals("abc", config.getString(configuredString));
+		assertEquals("abc", config.getValue(configuredString));
+
+		assertEquals(11, config.getInteger(configuredInt));
+		assertEquals("11", config.getValue(configuredInt));
+
+		// options whose keys are absent from the configuration must yield their defaults
+
+		final ConfigOption<String> absentString = ConfigOptions.key("test").defaultValue("my-beautiful-default");
+		final ConfigOption<Integer> absentInt = ConfigOptions.key("test2").defaultValue(87);
+
+		// the string default is returned both by getValue() and by getString()
+		assertEquals("my-beautiful-default", config.getValue(absentString));
+		assertEquals("my-beautiful-default", config.getString(absentString));
+
+		// an explicitly passed override takes precedence over the option's default
+		assertEquals("override", config.getString(absentString, "override"));
+
+		// a primitive default is returned typed, and in its string form by getValue()
+		assertEquals(87, config.getInteger(absentInt));
+		assertEquals("87", config.getValue(absentInt));
+	}
+
+	@Test
+	public void testOptionWithNoDefault() {
+		final Configuration config = new Configuration();
+		config.setInteger("int-key", 11);
+		config.setString("string-key", "abc");
+
+		final ConfigOption<String> configuredString = ConfigOptions.key("string-key").noDefaultValue();
+
+		assertEquals("abc", config.getString(configuredString));
+		assertEquals("abc", config.getValue(configuredString));
+
+		// an option without a default whose key is absent from the configuration
+
+		final ConfigOption<String> absentString = ConfigOptions.key("test").noDefaultValue();
+
+		// with neither a value nor a default, both lookups are expected to return null
+		assertNull(config.getValue(absentString));
+		assertNull(config.getString(absentString));
+
+		// an explicitly passed override replaces the missing default
+		assertEquals("override", config.getString(absentString, "override"));
+	}
+
+	@Test
+	public void testDeprecatedKeys() {
+		final Configuration config = new Configuration();
+		config.setInteger("the-key", 11);
+		config.setInteger("old-key", 12);
+		config.setInteger("older-key", 13);
+
+		final ConfigOption<Integer> hitsCurrentKey = ConfigOptions
+				.key("the-key")
+				.defaultValue(-1)
+				.withDeprecatedKeys("old-key", "older-key");
+
+		final ConfigOption<Integer> hitsFirstDeprecated = ConfigOptions
+				.key("does-not-exist")
+				.defaultValue(-1)
+				.withDeprecatedKeys("old-key", "older-key");
+
+		final ConfigOption<Integer> hitsSecondDeprecated = ConfigOptions
+				.key("does-not-exist")
+				.defaultValue(-1)
+				.withDeprecatedKeys("foo", "older-key");
+
+		final ConfigOption<Integer> hitsNothing = ConfigOptions
+				.key("does-not-exist")
+				.defaultValue(-1)
+				.withDeprecatedKeys("not-there", "also-not-there");
+
+		assertEquals(11, config.getInteger(hitsCurrentKey)); // current key is set: deprecated keys are not used
+		assertEquals(12, config.getInteger(hitsFirstDeprecated)); // current key missing: first deprecated key
+		assertEquals(13, config.getInteger(hitsSecondDeprecated)); // "foo" missing too: second deprecated key
+		assertEquals(-1, config.getInteger(hitsNothing)); // no key is set: the default value
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/d71a09cc/flink-core/src/test/java/org/apache/flink/configuration/DelegatingConfigurationTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/configuration/DelegatingConfigurationTest.java b/flink-core/src/test/java/org/apache/flink/configuration/DelegatingConfigurationTest.java
index d8b782d..9298a14 100644
--- a/flink-core/src/test/java/org/apache/flink/configuration/DelegatingConfigurationTest.java
+++ b/flink-core/src/test/java/org/apache/flink/configuration/DelegatingConfigurationTest.java
@@ -24,8 +24,6 @@ import org.junit.Test;
 import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Method;
 import java.lang.reflect.Modifier;
-import java.util.Arrays;
-import java.util.Comparator;
 import java.util.Set;
 
 import static org.junit.Assert.assertTrue;
@@ -34,60 +32,43 @@ import static org.junit.Assert.assertEquals;
 
 public class DelegatingConfigurationTest {
 
-	/**
-	 * http://stackoverflow.com/questions/22225663/checking-in-a-unit-test-whether-all-methods-are-delegated
-	 */
 	@Test
 	public void testIfDelegatesImplementAllMethods() throws IllegalAccessException, IllegalArgumentException, InvocationTargetException {
 
-		Comparator<Method> methodComparator = new Comparator<Method>() {
-			@Override
-			public int compare(Method o1, Method o2) {
-				String o1Str = o1.getName() + typeParamToString(o1.getParameterTypes());
-				String o2Str = o2.getName() + typeParamToString(o2.getParameterTypes());
-				return o1Str.compareTo( o2Str ); 
-			}
-
-			private String typeParamToString(Class<?>[] classes) {
-				String str = "";
-				for(Object t : classes) {
-					str += t.toString();
-				}
-				return str;
-			}
-		};
-		
 		// For each method in the Configuration class...
 		Method[] confMethods = Configuration.class.getDeclaredMethods();
 		Method[] delegateMethods = DelegatingConfiguration.class.getDeclaredMethods();
-		Arrays.sort(confMethods, methodComparator);
-		Arrays.sort(delegateMethods, methodComparator);
-		match : for (Method configurationMethod : confMethods) {
-			boolean hasMethod = false;
-			if(!Modifier.isPublic(configurationMethod.getModifiers()) ) {
+
+		for (Method configurationMethod : confMethods) {
+			if (!Modifier.isPublic(configurationMethod.getModifiers()) ) {
 				continue;
 			}
+
+			boolean hasMethod = false;
+
 			// Find matching method in wrapper class and call it
-			mismatch: for (Method wrapperMethod : delegateMethods) {
+			lookForWrapper: for (Method wrapperMethod : delegateMethods) {
 				if (configurationMethod.getName().equals(wrapperMethod.getName())) {
-					
+
 					// Get parameters for method
 					Class<?>[] wrapperMethodParams = wrapperMethod.getParameterTypes();
 					Class<?>[] configMethodParams = configurationMethod.getParameterTypes();
-					if(wrapperMethodParams.length != configMethodParams.length) {
-						System.err.println("Length");
-						break mismatch;
+					if (wrapperMethodParams.length != configMethodParams.length) {
+						continue;
 					}
-					for(int i = 0; i < wrapperMethodParams.length; i++) {
-						if(wrapperMethodParams[i] != configMethodParams[i]) {
-							break mismatch;
+
+					for (int i = 0; i < wrapperMethodParams.length; i++) {
+						if (wrapperMethodParams[i] != configMethodParams[i]) {
+							continue lookForWrapper;
 						}
 					}
 					hasMethod = true;
-					break match;
+					break;
 				}
 			}
-			assertTrue("Foo method '" + configurationMethod.getName() + "' has not been wrapped correctly in DelegatingConfiguration wrapper", hasMethod);
+
+			assertTrue("Configuration method '" + configurationMethod.getName() + 
+					"' has not been wrapped correctly in DelegatingConfiguration wrapper", hasMethod);
 		}
 	}
 	

http://git-wip-us.apache.org/repos/asf/flink/blob/d71a09cc/flink-core/src/test/java/org/apache/flink/configuration/UnmodifiableConfigurationTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/configuration/UnmodifiableConfigurationTest.java b/flink-core/src/test/java/org/apache/flink/configuration/UnmodifiableConfigurationTest.java
index 386d03b..26e3d7a 100644
--- a/flink-core/src/test/java/org/apache/flink/configuration/UnmodifiableConfigurationTest.java
+++ b/flink-core/src/test/java/org/apache/flink/configuration/UnmodifiableConfigurationTest.java
@@ -56,6 +56,9 @@ public class UnmodifiableConfigurationTest extends TestLogger {
 	@Test
 	public void testExceptionOnSet() {
 		try {
+			@SuppressWarnings("rawtypes")
+			final ConfigOption rawOption = ConfigOptions.key("testkey").defaultValue("value");
+
 			Map<Class<?>, Object> parameters = new HashMap<Class<?>, Object>();
 			parameters.put(byte[].class, new byte[0]);
 			parameters.put(Class.class, Object.class);
@@ -65,19 +68,22 @@ public class UnmodifiableConfigurationTest extends TestLogger {
 			parameters.put(double.class, 0.0);
 			parameters.put(String.class, "");
 			parameters.put(boolean.class, false);
-					
+
 			Class<UnmodifiableConfiguration> clazz = UnmodifiableConfiguration.class;
 			UnmodifiableConfiguration config = new UnmodifiableConfiguration(new Configuration());
-			
+
 			for (Method m : clazz.getMethods()) {
 				if (m.getName().startsWith("set")) {
-					
+
+					Class<?> keyClass = m.getParameterTypes()[0];
 					Class<?> parameterClass = m.getParameterTypes()[1];
+					Object key = keyClass == String.class ? "key" : rawOption;
+
 					Object parameter = parameters.get(parameterClass);
 					assertNotNull("method " + m + " not covered by test", parameter);
-					
+
 					try {
-						m.invoke(config, "key", parameter);
+						m.invoke(config, key, parameter);
 						fail("should fail with an exception");
 					}
 					catch (InvocationTargetException e) {