You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by rm...@apache.org on 2015/11/23 15:03:22 UTC

flink git commit: [FLINK-2017] Add predefined required parameters to ParameterTool

Repository: flink
Updated Branches:
  refs/heads/master cd7963cbe -> 601e8c607


[FLINK-2017] Add predefined required parameters to ParameterTool

- Add RequiredParameters class to handle required parameters in UDFs which can be checked
  and validated against the parameters extracted in ParameterTool
- A required parameter is represented by an Option which has by default only a name and can
  be extended to include possible values, a type, a default value
- Any validation failure will throw a RequiredParametersException

This closes #1097


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/601e8c60
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/601e8c60
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/601e8c60

Branch: refs/heads/master
Commit: 601e8c607d974ef96e45d1dc3d2be367f5ddfb32
Parents: cd7963c
Author: Martin Liesenberg <ma...@retresco.de>
Authored: Sun Sep 6 22:18:13 2015 +0200
Committer: Robert Metzger <rm...@apache.org>
Committed: Mon Nov 23 15:02:41 2015 +0100

----------------------------------------------------------------------
 .../org/apache/flink/api/java/utils/Option.java | 202 +++++++++++++
 .../apache/flink/api/java/utils/OptionType.java |  34 +++
 .../api/java/utils/RequiredParameters.java      | 263 +++++++++++++++++
 .../java/utils/RequiredParametersException.java |  58 ++++
 .../flink/api/java/utils/OptionsTest.java       | 104 +++++++
 .../api/java/utils/RequiredParametersTest.java  | 293 +++++++++++++++++++
 6 files changed, 954 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/601e8c60/flink-java/src/main/java/org/apache/flink/api/java/utils/Option.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/utils/Option.java b/flink-java/src/main/java/org/apache/flink/api/java/utils/Option.java
new file mode 100644
index 0000000..5b3cfc7
--- /dev/null
+++ b/flink-java/src/main/java/org/apache/flink/api/java/utils/Option.java
@@ -0,0 +1,202 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.utils;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Objects;
+import java.util.Set;
+
+/**
+ * Internal representation of a parameter passed to a user defined function.
+ *
+ * <p>An option is identified by its (long) name. Optionally it carries a short name, a type
+ * (defaults to {@link OptionType#STRING}), a default value, a set of allowed values and a help text.
+ * All configuration methods return the option itself so that calls can be chained.
+ */
+public class Option {
+
+	private final String longName;
+	private String shortName;
+
+	private String defaultValue;
+	private final Set<String> choices;
+
+	private String helpText;
+	private OptionType type = OptionType.STRING;
+
+	/**
+	 * Create an option with the given name, no restrictions on its values and type
+	 * {@link OptionType#STRING}.
+	 *
+	 * @param name - the (long) name of the parameter
+	 */
+	public Option(String name) {
+		this.longName = name;
+		this.choices = new HashSet<>();
+	}
+
+	/**
+	 * Define an alternative / short name of the parameter.
+	 * Only one alternative per parameter is allowed.
+	 *
+	 * @param shortName - short version of the parameter name
+	 * @return the updated Option
+	 */
+	public Option alt(String shortName) {
+		this.shortName = shortName;
+		return this;
+	}
+
+
+	/**
+	 * Define the type of the Option.
+	 *
+	 * @param type - the type which the value of the Option can be cast to.
+	 * @return the updated Option
+	 */
+	public Option type(OptionType type) {
+		this.type = type;
+		return this;
+	}
+
+	/**
+	 * Define a default value for the option.
+	 *
+	 * @param defaultValue - the default value
+	 * @return the updated Option
+	 * @throws RequiredParametersException if the list of possible values for the parameter is not empty and the
+	 * default value passed is not in the list.
+	 */
+	public Option defaultValue(String defaultValue) throws RequiredParametersException {
+		// an empty set of choices means that the values of the option are not restricted
+		if (!this.choices.isEmpty() && !this.choices.contains(defaultValue)) {
+			throw new RequiredParametersException("Default value " + defaultValue +
+					" is not in the list of valid values for option " + this.longName);
+		}
+		return this.setDefaultValue(defaultValue);
+	}
+
+	/**
+	 * Restrict the list of possible values of the parameter.
+	 *
+	 * The given values are added to the values registered by earlier calls.
+	 *
+	 * @param choices - the allowed values of the parameter.
+	 * @return the updated Option
+	 * @throws RequiredParametersException if a default value is defined which is neither among the values
+	 * registered so far nor among the values passed.
+	 */
+	public Option choices(String... choices) throws RequiredParametersException {
+		// The default value has to be part of the resulting set of choices, i.e. of the already
+		// registered values plus the new ones. Checking the new values alone would reject a valid
+		// definition such as choices("a", "b").defaultValue("a").choices("c").
+		if (this.defaultValue != null
+				&& !this.choices.contains(this.defaultValue)
+				&& !Arrays.asList(choices).contains(this.defaultValue)) {
+			throw new RequiredParametersException("Valid values for option " + this.longName +
+					" do not contain defined default value " + defaultValue);
+		}
+		Collections.addAll(this.choices, choices);
+		return this;
+	}
+
+	/**
+	 * Add a help text, explaining the parameter.
+	 *
+	 * @param helpText - the help text.
+	 * @return the updated Option
+	 */
+	public Option help(String helpText) {
+		this.helpText = helpText;
+		return this;
+	}
+
+	/**
+	 * @return the (long) name of the option.
+	 */
+	public String getName() {
+		return this.longName;
+	}
+
+	/**
+	 * @return true if a short name has been defined for the option.
+	 */
+	public boolean hasAlt() {
+		return this.shortName != null;
+	}
+
+	/**
+	 * @return true unless the type has been explicitly set to null.
+	 */
+	public boolean hasType() {
+		return this.type != null;
+	}
+
+	/**
+	 * @return the type of the option, {@link OptionType#STRING} if none has been set.
+	 */
+	public OptionType getType() {
+		return this.type;
+	}
+
+	/**
+	 * @return the short name of the option or null if none has been defined.
+	 */
+	public String getAlt() {
+		return this.shortName;
+	}
+
+	/**
+	 * @return the help text of the option or null if none has been defined.
+	 */
+	public String getHelpText() {
+		return this.helpText;
+	}
+
+	/**
+	 * @return the set of allowed values, empty if the values are not restricted. The set returned is
+	 * the one used internally by the option, callers should treat it as read-only.
+	 */
+	public Set<String> getChoices() {
+		return this.choices;
+	}
+
+	/**
+	 * @return true if a default value has been defined for the option.
+	 */
+	public boolean hasDefaultValue() {
+		return this.defaultValue != null;
+	}
+
+	/**
+	 * @return the default value of the option or null if none has been defined.
+	 */
+	public String getDefaultValue() {
+		return this.defaultValue;
+	}
+
+	private Option setDefaultValue(String defaultValue) {
+		this.defaultValue = defaultValue;
+		return this;
+	}
+
+	/**
+	 * Check whether the given value can be converted to the type defined for the option.
+	 *
+	 * Numeric types are checked with the parse method of the corresponding Java wrapper class, a boolean
+	 * has to be exactly "true" or "false" and every value is a valid string.
+	 *
+	 * @param value - the value to check, may be null
+	 * @return true if the value can be converted to the type of the option or if no type is defined,
+	 * false otherwise. A null value is only accepted for the type STRING.
+	 */
+	public boolean isCastableToDefinedType(String value) {
+		if (this.type == null) {
+			// no type defined, hence there is no constraint the value could violate
+			return true;
+		}
+		if (value == null) {
+			// Float.parseFloat and Double.parseDouble throw a NullPointerException instead of a
+			// NumberFormatException for null, so answer for all types in one place
+			return this.type == OptionType.STRING;
+		}
+		try {
+			switch (this.type) {
+				case INTEGER:
+					Integer.parseInt(value);
+					return true;
+				case LONG:
+					Long.parseLong(value);
+					return true;
+				case FLOAT:
+					Float.parseFloat(value);
+					return true;
+				case DOUBLE:
+					Double.parseDouble(value);
+					return true;
+				case BOOLEAN:
+					return Objects.equals(value, "true") || Objects.equals(value, "false");
+				case STRING:
+					return true;
+				default:
+					throw new IllegalStateException("Invalid value for OptionType " + this.type + " for option " + this.longName);
+			}
+		} catch (NumberFormatException nfe) {
+			// the value is not a valid representation of the numeric type
+			return false;
+		}
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/601e8c60/flink-java/src/main/java/org/apache/flink/api/java/utils/OptionType.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/utils/OptionType.java b/flink-java/src/main/java/org/apache/flink/api/java/utils/OptionType.java
new file mode 100644
index 0000000..744da21
--- /dev/null
+++ b/flink-java/src/main/java/org/apache/flink/api/java/utils/OptionType.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.api.java.utils;
+
+/**
+ * Types which the parameters managed with {@link RequiredParameters} can take.
+ *
+ * The name of each constant maps directly to the corresponding Java type.
+ */
+public enum OptionType {
+	INTEGER,
+	LONG,
+	DOUBLE,
+	FLOAT,
+	BOOLEAN,
+	// the type an Option has unless another one is set explicitly
+	STRING
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/601e8c60/flink-java/src/main/java/org/apache/flink/api/java/utils/RequiredParameters.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/utils/RequiredParameters.java b/flink-java/src/main/java/org/apache/flink/api/java/utils/RequiredParameters.java
new file mode 100644
index 0000000..5cdac4c
--- /dev/null
+++ b/flink-java/src/main/java/org/apache/flink/api/java/utils/RequiredParameters.java
@@ -0,0 +1,263 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.utils;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.LinkedList;
+import java.util.Map;
+import java.util.Objects;
+
+/**
+ * Facility to manage required parameters in user defined functions.
+ *
+ * <p>Parameters are registered as {@link Option}s with one of the add methods.
+ * {@link #applyTo(ParameterTool)} validates the values held by a {@link ParameterTool} against the
+ * registered options and fills in default values, {@link #getHelp()} renders a help text for them.
+ */
+public class RequiredParameters {
+
+	private static final String HELP_TEXT_PARAM_DELIMITER = "\t";
+	private static final String HELP_TEXT_LINE_DELIMITER = "\n";
+	private static final int HELP_TEXT_LENGTH_PER_PARAM = 100;
+
+	// the registered options, keyed by their (long) name
+	private final Map<String, Option> data;
+
+	public RequiredParameters() {
+		this.data = new HashMap<>();
+	}
+
+	/**
+	 * Add a parameter based on its name.
+	 *
+	 * @param name - the name of the parameter
+	 * @return - an {@link Option} object representing the parameter
+	 * @throws RequiredParametersException if an option with the same name is already defined
+	 */
+	public Option add(String name) throws RequiredParametersException {
+		Option option = new Option(name);
+		// registration and the duplicate check are shared with add(Option)
+		this.add(option);
+		return option;
+	}
+
+	/**
+	 * Add a parameter encapsulated in an {@link Option} object.
+	 *
+	 * @param option - the parameter
+	 * @throws RequiredParametersException if an option with the same name is already defined
+	 */
+	public void add(Option option) throws RequiredParametersException {
+		if (!this.data.containsKey(option.getName())) {
+			this.data.put(option.getName(), option);
+		} else {
+			throw new RequiredParametersException("Option with key " + option.getName() + " already exists.");
+		}
+	}
+
+	/**
+	 * Check for all required parameters defined:
+	 * - has a value been passed
+	 *   - if not, does the parameter have an associated default value
+	 * - does the type of the parameter match the one defined in RequiredParameters
+	 * - does the value provided in the parameterTool adhere to the choices defined in the option
+	 *
+	 * The type and the choices are checked as well if the value has been passed on the short name
+	 * of the option.
+	 *
+	 * If any check fails, a RequiredParametersException is thrown
+	 *
+	 * @param parameterTool - parameters supplied by the user.
+	 * @throws RequiredParametersException if any of the specified checks fail
+	 */
+	public void applyTo(ParameterTool parameterTool) throws RequiredParametersException {
+		List<String> missingArguments = new LinkedList<>();
+		for (Option o : data.values()) {
+			if (parameterTool.data.containsKey(o.getName())) {
+				if (Objects.equals(parameterTool.data.get(o.getName()), ParameterTool.NO_VALUE_KEY)) {
+					// the parameter has been passed, but no value, check if there is a default value
+					checkAndApplyDefaultValue(o, parameterTool.data);
+				} else {
+					// a value has been passed in the parameterTool, now check if it adheres to all constraints
+					checkAmbiguousValues(o, parameterTool.data);
+					checkIsCastableToDefinedType(o, parameterTool.data);
+					checkChoices(o, parameterTool.data);
+				}
+			} else {
+				// check if there is a default value or a value passed for a possibly defined alternative name.
+				if (hasNoDefaultValueAndNoValuePassedOnAlternativeName(o, parameterTool.data)) {
+					missingArguments.add(o.getName());
+				}
+			}
+		}
+		// parameters which are missing are collected first and reported together
+		if (!missingArguments.isEmpty()) {
+			throw new RequiredParametersException(this.missingArgumentsText(missingArguments), missingArguments);
+		}
+	}
+
+	// check if the given parameter has a default value (or a value passed on its short name)
+	// and add it to the passed map if that is the case, else throw an exception
+	private void checkAndApplyDefaultValue(Option o, Map<String, String> data) throws RequiredParametersException {
+		if (hasNoDefaultValueAndNoValuePassedOnAlternativeName(o, data)) {
+			throw new RequiredParametersException("No default value for undefined parameter " + o.getName());
+		}
+	}
+
+	// check if the value in the given map which corresponds to the name of the given option
+	// is castable to the type of the option (if any is defined)
+	private void checkIsCastableToDefinedType(Option o, Map<String, String> data) throws RequiredParametersException {
+		if (o.hasType() && !o.isCastableToDefinedType(data.get(o.getName()))) {
+			throw new RequiredParametersException("Value for parameter " + o.getName() +
+					" cannot be cast to type " + o.getType());
+		}
+	}
+
+	// check if the value in the given map which corresponds to the name of the given option
+	// adheres to the list of given choices for the param in the options (if any are defined)
+	private void checkChoices(Option o, Map<String, String> data) throws RequiredParametersException {
+		if (o.getChoices().size() > 0 && !o.getChoices().contains(data.get(o.getName()))) {
+			throw new RequiredParametersException("Value " + data.get(o.getName()) +
+					" is not in the list of valid choices for key " + o.getName());
+		}
+	}
+
+	// move value passed on alternative name to standard name or apply default value if any defined
+	// else return true to indicate parameter is 'really' missing
+	private boolean hasNoDefaultValueAndNoValuePassedOnAlternativeName(Option o, Map<String, String> data)
+			throws RequiredParametersException {
+		if (o.hasAlt() && data.containsKey(o.getAlt())) {
+			String valueOnAlt = data.get(o.getAlt());
+			data.put(o.getName(), valueOnAlt);
+			// a value passed on the short name has to adhere to the same constraints as a value
+			// passed on the long name; a short name passed without a value is moved as is
+			if (!Objects.equals(valueOnAlt, ParameterTool.NO_VALUE_KEY)) {
+				checkIsCastableToDefinedType(o, data);
+				checkChoices(o, data);
+			}
+			return false;
+		}
+		if (o.hasDefaultValue()) {
+			// make the default value available on the long and (if defined) on the short name
+			data.put(o.getName(), o.getDefaultValue());
+			if (o.hasAlt()) {
+				data.put(o.getAlt(), o.getDefaultValue());
+			}
+			return false;
+		}
+		return true;
+	}
+
+	// given that the map contains a value for the name of the option passed
+	// check if it also contains a value for the shortName in option (if any is defined)
+	private void checkAmbiguousValues(Option o, Map<String, String> data) throws RequiredParametersException{
+		// without a short name there is nothing to look up, do not query the map with a null key
+		if (o.hasAlt() && data.containsKey(o.getAlt())
+				&& !Objects.equals(data.get(o.getAlt()), ParameterTool.NO_VALUE_KEY)) {
+			throw new RequiredParametersException("Value passed for parameter " + o.getName() +
+					" is ambiguous. Value passed for short and long name.");
+		}
+	}
+
+	/**
+	 * Build a help text for the defined parameters.
+	 *
+	 * The format of the help text will be:
+	 * Required Parameters:
+	 * \t -:shortName:, --:name: \t :helpText: \t default: :defaultValue: \t choices: :choices: \n
+	 *
+	 * @return a formatted help String.
+	 */
+	public String getHelp() {
+		StringBuilder sb = new StringBuilder(data.size() * HELP_TEXT_LENGTH_PER_PARAM);
+
+		sb.append("Required Parameters:");
+		sb.append(HELP_TEXT_LINE_DELIMITER);
+
+		// one line per registered option
+		for (Option o : data.values()) {
+			sb.append(this.helpText(o));
+		}
+		sb.append(HELP_TEXT_LINE_DELIMITER);
+
+		return sb.toString();
+	}
+
+	/**
+	 * Build a help text for the defined parameters and list the missing arguments at the end of the text.
+	 *
+	 * The format of the help text will be:
+	 * Required Parameters:
+	 * \t -:shortName:, --:name: \t :helpText: \t default: :defaultValue: \t choices: :choices: \n
+	 *
+	 * Missing arguments for:
+	 * param1 param2 ... paramN
+	 *
+	 * @param missingArguments - a list of missing parameters
+	 * @return a formatted help String.
+	 */
+	public String getHelp(List<String> missingArguments) {
+		return this.getHelp() + this.missingArgumentsText(missingArguments);
+	}
+
+	/**
+	 * for the given option create a line for the help text which looks like:
+	 * \t -:shortName:, --:name: \t :helpText: \t default: :defaultValue: \t choices: :choices:
+	 */
+	private String helpText(Option option) {
+		StringBuilder sb = new StringBuilder(HELP_TEXT_LENGTH_PER_PARAM);
+		sb.append(HELP_TEXT_PARAM_DELIMITER);
+
+		// if there is a short name, add it.
+		if (option.hasAlt()) {
+			sb.append("-");
+			sb.append(option.getAlt());
+			sb.append(", ");
+		}
+
+		// add the name
+		sb.append("--");
+		sb.append(option.getName());
+		sb.append(HELP_TEXT_PARAM_DELIMITER);
+
+		// if there is a help text, add it
+		if (option.getHelpText() != null) {
+			sb.append(option.getHelpText());
+			sb.append(HELP_TEXT_PARAM_DELIMITER);
+		}
+
+		// if there is a default value, add it.
+		if (option.hasDefaultValue()) {
+			sb.append("default: ");
+			sb.append(option.getDefaultValue());
+			sb.append(HELP_TEXT_PARAM_DELIMITER);
+		}
+
+		// if there is a list of choices add it.
+		if (!option.getChoices().isEmpty()) {
+			sb.append("choices: ");
+			for (String choice : option.getChoices()) {
+				sb.append(choice);
+				sb.append(" ");
+			}
+		}
+		sb.append(HELP_TEXT_LINE_DELIMITER);
+
+		return sb.toString();
+	}
+
+	// create the text listing the names of the given missing arguments, separated by blanks
+	private String missingArgumentsText(List<String> missingArguments) {
+		StringBuilder sb = new StringBuilder(missingArguments.size() * 10);
+
+		sb.append("Missing arguments for:");
+		sb.append(HELP_TEXT_LINE_DELIMITER);
+
+		for (String arg : missingArguments) {
+			sb.append(arg);
+			sb.append(" ");
+		}
+
+		return sb.toString();
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/601e8c60/flink-java/src/main/java/org/apache/flink/api/java/utils/RequiredParametersException.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/utils/RequiredParametersException.java b/flink-java/src/main/java/org/apache/flink/api/java/utils/RequiredParametersException.java
new file mode 100644
index 0000000..941bf25
--- /dev/null
+++ b/flink-java/src/main/java/org/apache/flink/api/java/utils/RequiredParametersException.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.java.utils;
+
+import java.util.LinkedList;
+import java.util.List;
+
+/**
+ * Exception which is thrown if validation of {@link RequiredParameters} fails.
+ *
+ * <p>If the validation failed because of missing parameters, their names are available via
+ * {@link #getMissingArguments()}.
+ */
+public class RequiredParametersException extends Exception {
+
+	// exceptions are serializable, pin the version of the serialized form
+	private static final long serialVersionUID = 1L;
+
+	// names of the missing parameters, null if the exception has been created without them
+	private List<String> missingArguments;
+
+	public RequiredParametersException() {
+		super();
+	}
+
+	/**
+	 * @param message - the detail message
+	 * @param missingArguments - the names of the parameters which are missing, may be null
+	 */
+	public RequiredParametersException(String message, List<String> missingArguments) {
+		super(message);
+		// keep a snapshot, later changes to the list of the caller must not alter the exception
+		if (missingArguments != null) {
+			this.missingArguments = new LinkedList<String>(missingArguments);
+		}
+	}
+
+	public RequiredParametersException(String message) {
+		super(message);
+	}
+
+	public RequiredParametersException(String message, Throwable cause) {
+		super(message, cause);
+	}
+
+	public RequiredParametersException(Throwable cause) {
+		super(cause);
+	}
+
+	/**
+	 * @return the names of the missing parameters, an empty list if none have been passed on creation.
+	 * Never null.
+	 */
+	public List<String> getMissingArguments() {
+		if (missingArguments == null) {
+			return new LinkedList<>();
+		}
+		return this.missingArguments;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/601e8c60/flink-java/src/test/java/org/apache/flink/api/java/utils/OptionsTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/utils/OptionsTest.java b/flink-java/src/test/java/org/apache/flink/api/java/utils/OptionsTest.java
new file mode 100644
index 0000000..d4f0bac
--- /dev/null
+++ b/flink-java/src/test/java/org/apache/flink/api/java/utils/OptionsTest.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.utils;
+
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+
+import static org.junit.Assert.fail;
+
+/**
+ * Tests for the {@link Option} class.
+ */
+public class OptionsTest {
+
+	@Rule
+	public ExpectedException expectedException = ExpectedException.none();
+
+	// choices are defined first, the default value set afterwards is not one of them
+	@Test
+	public void testChoicesWithInvalidDefaultValue() throws RequiredParametersException {
+		expectedException.expect(RequiredParametersException.class);
+		expectedException.expectMessage("Default value d is not in the list of valid values for option choices");
+
+		Option option = new Option("choices").choices("a", "b", "c");
+		option.defaultValue("d");
+	}
+
+	@Test
+	public void testChoicesWithValidDefaultValue() {
+		try {
+			Option option = new Option("choices").choices("a", "b", "c");
+			option = option.defaultValue("a");
+
+			// asserting inside the try block avoids dereferencing an option which was never created
+			Assert.assertEquals("a", option.getDefaultValue());
+		} catch (RequiredParametersException e) {
+			fail("Exception thrown: " + e.getMessage());
+		}
+	}
+
+	// the default value is defined first, the choices set afterwards do not contain it
+	@Test
+	public void testDefaultValueWithInvalidChoices() throws RequiredParametersException {
+		expectedException.expect(RequiredParametersException.class);
+		expectedException.expectMessage("Valid values for option choices do not contain defined default value x");
+
+		Option option = new Option("choices").defaultValue("x");
+		option.choices("a", "b");
+	}
+
+	@Test
+	public void testIsCastableToDefinedTypeWithDefaultType() {
+		Option option = new Option("name");
+		Assert.assertTrue(option.isCastableToDefinedType("some value"));
+	}
+
+	@Test
+	public void testIsCastableToDefinedTypeWithMatchingTypes() {
+		// Integer
+		Option optionInteger = new Option("name").type(OptionType.INTEGER);
+		Assert.assertTrue(optionInteger.isCastableToDefinedType("15"));
+
+		// Long, the value does not fit into an int
+		Option optionLong = new Option("name").type(OptionType.LONG);
+		Assert.assertTrue(optionLong.isCastableToDefinedType("4294967296"));
+
+		// Float
+		Option optionFloat = new Option("name").type(OptionType.FLOAT);
+		Assert.assertTrue(optionFloat.isCastableToDefinedType("15.5"));
+
+		// Double
+		Option optionDouble = new Option("name").type(OptionType.DOUBLE);
+		Assert.assertTrue(optionDouble.isCastableToDefinedType("15.0"));
+
+		// Boolean
+		Option optionBoolean = new Option("name").type(OptionType.BOOLEAN);
+		Assert.assertTrue(optionBoolean.isCastableToDefinedType("true"));
+
+	}
+
+	@Test
+	public void testIsCastableToDefinedTypeWithNonMatchingTypes() {
+		// Integer
+		Option optionInteger = new Option("name").type(OptionType.INTEGER);
+		Assert.assertFalse(optionInteger.isCastableToDefinedType("true"));
+
+		// Long
+		Option optionLong = new Option("name").type(OptionType.LONG);
+		Assert.assertFalse(optionLong.isCastableToDefinedType("15.0"));
+
+		// Float
+		Option optionFloat = new Option("name").type(OptionType.FLOAT);
+		Assert.assertFalse(optionFloat.isCastableToDefinedType("name"));
+
+		// Double
+		Option optionDouble = new Option("name").type(OptionType.DOUBLE);
+		Assert.assertFalse(optionDouble.isCastableToDefinedType("name"));
+
+		// Boolean
+		Option optionBoolean = new Option("name").type(OptionType.BOOLEAN);
+		Assert.assertFalse(optionBoolean.isCastableToDefinedType("15"));
+
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/601e8c60/flink-java/src/test/java/org/apache/flink/api/java/utils/RequiredParametersTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/utils/RequiredParametersTest.java b/flink-java/src/test/java/org/apache/flink/api/java/utils/RequiredParametersTest.java
new file mode 100644
index 0000000..72d95df
--- /dev/null
+++ b/flink-java/src/test/java/org/apache/flink/api/java/utils/RequiredParametersTest.java
@@ -0,0 +1,293 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.utils;
+
+import static org.hamcrest.CoreMatchers.containsString;
+import static org.hamcrest.CoreMatchers.not;
+import static org.junit.Assert.fail;
+
+import org.hamcrest.CoreMatchers;
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+
+import java.util.Arrays;
+
+/**
+ * Tests for the RequiredParameters class and its interactions with ParameterTool.
+ */
+public class RequiredParametersTest {
+
+	@Rule
+	public ExpectedException expectedException = ExpectedException.none();
+
+	@Test
+	public void testAddWithAlreadyExistingParameter() throws RequiredParametersException {
+
+		expectedException.expect(RequiredParametersException.class);
+		expectedException.expectMessage("Option with key berlin already exists.");
+
+		RequiredParameters required = new RequiredParameters();
+		required.add(new Option("berlin"));
+		required.add(new Option("berlin"));
+	}
+
+	@Test
+	public void testStringBasedAddWithAlreadyExistingParameter() throws RequiredParametersException {
+
+		expectedException.expect(RequiredParametersException.class);
+		expectedException.expectMessage("Option with key berlin already exists.");
+
+		RequiredParameters required = new RequiredParameters();
+		required.add("berlin");
+		required.add("berlin");
+	}
+
+	@Test
+	public void testApplyToWithMissingParameters() throws RequiredParametersException {
+
+		expectedException.expect(RequiredParametersException.class);
+		expectedException.expectMessage(CoreMatchers.allOf(
+				containsString("Missing arguments for:"),
+				containsString("munich ")));
+
+		ParameterTool parameter = ParameterTool.fromArgs(new String[]{});
+		RequiredParameters required = new RequiredParameters();
+		required.add(new Option("munich"));
+
+		required.applyTo(parameter);
+	}
+
+	@Test
+	public void testApplyToWithMissingDefaultValues() throws RequiredParametersException {
+
+		expectedException.expect(RequiredParametersException.class);
+		expectedException.expectMessage("No default value for undefined parameter berlin");
+
+		ParameterTool parameter = ParameterTool.fromArgs(new String[]{"--berlin"});
+		RequiredParameters required = new RequiredParameters();
+		required.add(new Option("berlin"));
+
+		required.applyTo(parameter);
+	}
+
+	@Test
+	public void testApplyToWithInvalidParameterValueBasedOnOptionChoices() throws RequiredParametersException {
+
+		expectedException.expect(RequiredParametersException.class);
+		expectedException.expectMessage("Value river is not in the list of valid choices for key berlin");
+
+		ParameterTool parameter = ParameterTool.fromArgs(new String[]{"--berlin", "river"});
+		RequiredParameters required = new RequiredParameters();
+		required.add(new Option("berlin").choices("city", "metropolis"));
+
+		required.applyTo(parameter);
+	}
+
+	@Test
+	public void testApplyToWithParameterDefinedOnShortAndLongName() throws RequiredParametersException {
+
+		expectedException.expect(RequiredParametersException.class);
+		expectedException.expectMessage("Value passed for parameter berlin is ambiguous. " +
+				"Value passed for short and long name.");
+
+		ParameterTool parameter = ParameterTool.fromArgs(new String[]{"--berlin", "value", "--b", "another"});
+		RequiredParameters required = new RequiredParameters();
+		required.add(new Option("berlin").alt("b"));
+
+		required.applyTo(parameter);
+	}
+
+	@Test
+	public void testApplyToMovesValuePassedOnShortNameToLongNameIfLongNameIsUndefined() {
+		ParameterTool parameter = ParameterTool.fromArgs(new String[]{"--b", "value"});
+		RequiredParameters required = new RequiredParameters();
+
+		try {
+			required.add(new Option("berlin").alt("b"));
+			required.applyTo(parameter);
+			Assert.assertEquals(parameter.data.get("berlin"), "value");
+			Assert.assertEquals(parameter.data.get("b"), "value");
+		} catch (RequiredParametersException e) {
+			fail("Exception thrown " + e.getMessage());
+		}
+	}
+
+	@Test
+	public void testDefaultValueDoesNotOverrideValuePassedOnShortKeyIfLongKeyIsNotPassedButPresent() {
+		ParameterTool parameter = ParameterTool.fromArgs(new String[]{"--berlin", "--b", "value"});
+		RequiredParameters required = new RequiredParameters();
+
+		try {
+			required.add(new Option("berlin").alt("b").defaultValue("something"));
+			required.applyTo(parameter);
+			Assert.assertEquals(parameter.data.get("berlin"), "value");
+			Assert.assertEquals(parameter.data.get("b"), "value");
+		} catch (RequiredParametersException e) {
+			fail("Exception thrown " + e.getMessage());
+		}
+	}
+
+	@Test
+	public void testApplyToWithNonCastableType() throws RequiredParametersException {
+
+		expectedException.expect(RequiredParametersException.class);
+		expectedException.expectMessage("Value for parameter flag cannot be cast to type BOOLEAN");
+
+		ParameterTool parameter = ParameterTool.fromArgs(new String[]{"--flag", "15"});
+		RequiredParameters required = new RequiredParameters();
+		required.add(new Option("flag").type(OptionType.BOOLEAN));
+
+		required.applyTo(parameter);
+	}
+
+	@Test
+	public void testApplyToWithSimpleOption() {
+		ParameterTool parameter = ParameterTool.fromArgs(new String[]{"--berlin", "value"});
+		RequiredParameters required = new RequiredParameters();
+		try {
+			required.add(new Option("berlin"));
+			required.applyTo(parameter);
+			Assert.assertEquals(parameter.data.get("berlin"), "value");
+		} catch (RequiredParametersException e) {
+			fail("Exception thrown " + e.getMessage());
+		}
+	}
+
+	@Test
+	public void testApplyToWithOptionAndDefaultValue() {
+		ParameterTool parameter = ParameterTool.fromArgs(new String[]{"--berlin"});
+		RequiredParameters required = new RequiredParameters();
+		try {
+			required.add(new Option("berlin").defaultValue("value"));
+			required.applyTo(parameter);
+			Assert.assertEquals(parameter.data.get("berlin"), "value");
+		} catch (RequiredParametersException e) {
+			fail("Exception thrown " + e.getMessage());
+		}
+	}
+
+	@Test
+	public void testApplyToWithOptionWithLongAndShortNameAndDefaultValue() {
+		ParameterTool parameter = ParameterTool.fromArgs(new String[]{"--berlin"});
+		RequiredParameters required = new RequiredParameters();
+		try {
+			required.add(new Option("berlin").alt("b").defaultValue("value"));
+			required.applyTo(parameter);
+			Assert.assertEquals(parameter.data.get("berlin"), "value");
+			Assert.assertEquals(parameter.data.get("b"), "value");
+		} catch (RequiredParametersException e) {
+			fail("Exception thrown " + e.getMessage());
+		}
+	}
+
+	@Test
+	public void testApplyToWithOptionMultipleOptionsAndOneDefaultValue() {
+		ParameterTool parameter = ParameterTool.fromArgs(new String[]{"--input", "abc"});
+		RequiredParameters rq = new RequiredParameters();
+		try {
+			rq.add("input");
+			rq.add(new Option("parallelism").alt("p").defaultValue("1").type(OptionType.INTEGER));
+			rq.applyTo(parameter);
+			Assert.assertEquals(parameter.data.get("parallelism"), "1");
+			Assert.assertEquals(parameter.data.get("p"), "1");
+			Assert.assertEquals(parameter.data.get("input"), "abc");
+		} catch (RequiredParametersException e) {
+			fail("Exception thrown " + e.getMessage());
+		}
+	}
+
+	@Test
+	public void testApplyToWithMultipleTypes() {
+		ParameterTool parameter = ParameterTool.fromArgs(new String[]{});
+		RequiredParameters required = new RequiredParameters();
+		try {
+			required.add(new Option("berlin").defaultValue("value"));
+			required.add(new Option("count").defaultValue("15"));
+			required.add(new Option("someFlag").alt("sf").defaultValue("true"));
+
+			required.applyTo(parameter);
+
+			Assert.assertEquals(parameter.data.get("berlin"), "value");
+			Assert.assertEquals(parameter.data.get("count"), "15");
+			Assert.assertEquals(parameter.data.get("someFlag"), "true");
+			Assert.assertEquals(parameter.data.get("sf"), "true");
+
+		} catch (RequiredParametersException e) {
+			fail("Exception thrown " + e.getMessage());
+		}
+	}
+
+	@Test
+	public void testPrintHelpForFullySetOption() {
+		RequiredParameters required = new RequiredParameters();
+		try {
+			required.add(new Option("option").defaultValue("some").help("help").alt("o").choices("some", "options"));
+
+			String helpText = required.getHelp();
+			Assert.assertThat(helpText, CoreMatchers.allOf(
+					containsString("Required Parameters:"),
+					containsString("-o, --option"),
+					containsString("default: some"),
+					containsString("choices: "),
+					containsString("some"),
+					containsString("options")));
+
+		} catch (RequiredParametersException e) {
+			fail("Exception thrown " + e.getMessage());
+		}
+	}
+
+	@Test
+	public void testPrintHelpForMultipleParams() {
+		RequiredParameters required = new RequiredParameters();
+		try {
+			required.add("input");
+			required.add("output");
+			required.add(new Option("parallelism").alt("p").help("Set the parallelism for all operators").type(OptionType.INTEGER));
+
+			String helpText = required.getHelp();
+			Assert.assertThat(helpText, CoreMatchers.allOf(
+					containsString("Required Parameters:"),
+					containsString("--input"),
+					containsString("--output"),
+					containsString("-p, --parallelism"),
+					containsString("Set the parallelism for all operators")));
+
+			Assert.assertThat(helpText, CoreMatchers.allOf(
+					not(containsString("choices")),
+					not(containsString("default"))));
+		} catch (RequiredParametersException e) {
+			fail("Exception thrown " + e.getMessage());
+		}
+	}
+
+	@Test
+	public void testPrintHelpWithMissingParams() {
+		RequiredParameters required = new RequiredParameters();
+
+		String helpText = required.getHelp(Arrays.asList("param1", "param2", "paramN"));
+		Assert.assertThat(helpText, CoreMatchers.allOf(
+				containsString("Missing arguments for:"),
+				containsString("param1 "),
+				containsString("param2 "),
+				containsString("paramN ")));
+	}
+}