You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by rm...@apache.org on 2015/11/23 15:03:22 UTC
flink git commit: [FLINK-2017] Add predefined required parameters to
ParameterTool
Repository: flink
Updated Branches:
refs/heads/master cd7963cbe -> 601e8c607
[FLINK-2017] Add predefined required parameters to ParameterTool
- Add RequiredParameters class to handle required parameters in UDFs which can be checked
and validated against the parameters extracted in ParameterTool
- A required parameter is represented by an Option which has by default only a name and can
be extended to include possible values, a type, a default value
- Any validation failure will throw a RequiredParametersException
This closes #1097
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/601e8c60
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/601e8c60
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/601e8c60
Branch: refs/heads/master
Commit: 601e8c607d974ef96e45d1dc3d2be367f5ddfb32
Parents: cd7963c
Author: Martin Liesenberg <ma...@retresco.de>
Authored: Sun Sep 6 22:18:13 2015 +0200
Committer: Robert Metzger <rm...@apache.org>
Committed: Mon Nov 23 15:02:41 2015 +0100
----------------------------------------------------------------------
.../org/apache/flink/api/java/utils/Option.java | 202 +++++++++++++
.../apache/flink/api/java/utils/OptionType.java | 34 +++
.../api/java/utils/RequiredParameters.java | 263 +++++++++++++++++
.../java/utils/RequiredParametersException.java | 58 ++++
.../flink/api/java/utils/OptionsTest.java | 104 +++++++
.../api/java/utils/RequiredParametersTest.java | 293 +++++++++++++++++++
6 files changed, 954 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/601e8c60/flink-java/src/main/java/org/apache/flink/api/java/utils/Option.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/utils/Option.java b/flink-java/src/main/java/org/apache/flink/api/java/utils/Option.java
new file mode 100644
index 0000000..5b3cfc7
--- /dev/null
+++ b/flink-java/src/main/java/org/apache/flink/api/java/utils/Option.java
@@ -0,0 +1,202 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.utils;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Objects;
+import java.util.Set;
+
+/**
+ * Internal representation of a parameter passed to a user defined function.
+ *
+ * An option is identified by its (long) name and can optionally carry a short name, a type
+ * (defaulting to {@link OptionType#STRING}), a default value, a set of allowed values and a help text.
+ * All configuration methods return the option itself so that calls can be chained.
+ */
+public class Option {
+
+    private final String longName;
+    private String shortName;
+
+    private String defaultValue;
+    private final Set<String> choices;
+
+    private String helpText;
+    private OptionType type = OptionType.STRING;
+
+    /**
+     * Create an option which only has a name, no allowed values and the type {@link OptionType#STRING}.
+     *
+     * @param name - the (long) name of the parameter
+     */
+    public Option(String name) {
+        this.longName = name;
+        this.choices = new HashSet<>();
+    }
+
+    /**
+     * Define an alternative / short name of the parameter.
+     * Only one alternative per parameter is allowed.
+     *
+     * @param shortName - short version of the parameter name
+     * @return the updated Option
+     */
+    public Option alt(String shortName) {
+        this.shortName = shortName;
+        return this;
+    }
+
+    /**
+     * Define the type of the Option.
+     *
+     * @param type - the type which the value of the Option can be cast to.
+     * @return the updated Option
+     */
+    public Option type(OptionType type) {
+        this.type = type;
+        return this;
+    }
+
+    /**
+     * Define a default value for the option.
+     *
+     * @param defaultValue - the default value
+     * @return the updated Option
+     * @throws RequiredParametersException if the list of possible values for the parameter is not empty and
+     *         the default value passed is not in the list.
+     */
+    public Option defaultValue(String defaultValue) throws RequiredParametersException {
+        if (!this.choices.isEmpty() && !this.choices.contains(defaultValue)) {
+            throw new RequiredParametersException("Default value " + defaultValue +
+                " is not in the list of valid values for option " + this.longName);
+        }
+        return this.setDefaultValue(defaultValue);
+    }
+
+    /**
+     * Restrict the list of possible values of the parameter.
+     *
+     * The given values are added to the values which have already been defined by previous calls.
+     *
+     * @param choices - the allowed values of the parameter.
+     * @return the updated Option
+     * @throws RequiredParametersException if a default value is defined which is neither contained in the
+     *         values passed nor in the values defined before.
+     */
+    public Option choices(String... choices) throws RequiredParametersException {
+        // The default only has to be part of the resulting set of valid values: a default which was
+        // accepted with an earlier batch of choices must not be rejected when more choices are added.
+        if (this.defaultValue != null
+                && !this.choices.contains(this.defaultValue)
+                && !Arrays.asList(choices).contains(this.defaultValue)) {
+            throw new RequiredParametersException("Valid values for option " + this.longName +
+                " do not contain defined default value " + defaultValue);
+        }
+        Collections.addAll(this.choices, choices);
+        return this;
+    }
+
+    /**
+     * Add a help text, explaining the parameter.
+     *
+     * @param helpText - the help text.
+     * @return the updated Option
+     */
+    public Option help(String helpText) {
+        this.helpText = helpText;
+        return this;
+    }
+
+    public String getName() {
+        return this.longName;
+    }
+
+    public boolean hasAlt() {
+        return this.shortName != null;
+    }
+
+    public boolean hasType() {
+        return this.type != null;
+    }
+
+    public OptionType getType() {
+        return this.type;
+    }
+
+    public String getAlt() {
+        return this.shortName;
+    }
+
+    public String getHelpText() {
+        return this.helpText;
+    }
+
+    public Set<String> getChoices() {
+        return this.choices;
+    }
+
+    public boolean hasDefaultValue() {
+        return this.defaultValue != null;
+    }
+
+    public String getDefaultValue() {
+        return this.defaultValue;
+    }
+
+    private Option setDefaultValue(String defaultValue) {
+        this.defaultValue = defaultValue;
+        return this;
+    }
+
+    /**
+     * Check whether the given value can be converted to the type defined for this option.
+     *
+     * A {@code null} value is only accepted for {@link OptionType#STRING}; for every other type
+     * {@code false} is returned.
+     *
+     * @param value - the value to check
+     * @return true if the value can be parsed as the type of the option, false otherwise
+     * @throws IllegalStateException if the type of the option is not handled by this method
+     */
+    public boolean isCastableToDefinedType(String value) {
+        switch (this.type) {
+            case INTEGER:
+                try {
+                    Integer.parseInt(value);
+                } catch (NumberFormatException nfe) {
+                    return false;
+                }
+                return true;
+            case LONG:
+                try {
+                    Long.parseLong(value);
+                } catch (NumberFormatException nfe) {
+                    return false;
+                }
+                return true;
+            case FLOAT:
+                // unlike Integer.parseInt, Float.parseFloat throws a NullPointerException for null
+                if (value == null) {
+                    return false;
+                }
+                try {
+                    Float.parseFloat(value);
+                } catch (NumberFormatException nfe) {
+                    return false;
+                }
+                return true;
+            case DOUBLE:
+                // unlike Long.parseLong, Double.parseDouble throws a NullPointerException for null
+                if (value == null) {
+                    return false;
+                }
+                try {
+                    Double.parseDouble(value);
+                } catch (NumberFormatException nfe) {
+                    return false;
+                }
+                return true;
+            case BOOLEAN:
+                // only the exact literals are accepted
+                return Objects.equals(value, "true") || Objects.equals(value, "false");
+            case STRING:
+                return true;
+        }
+        throw new IllegalStateException("Invalid value for OptionType " + this.type + " for option " + this.longName);
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/601e8c60/flink-java/src/main/java/org/apache/flink/api/java/utils/OptionType.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/utils/OptionType.java b/flink-java/src/main/java/org/apache/flink/api/java/utils/OptionType.java
new file mode 100644
index 0000000..744da21
--- /dev/null
+++ b/flink-java/src/main/java/org/apache/flink/api/java/utils/OptionType.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.api.java.utils;
+
+/**
+ * Types which the parameters managed with {@link RequiredParameters} can take.
+ *
+ * Each name maps directly to the corresponding Java type. Whether a value matches a type is
+ * decided in {@link Option#isCastableToDefinedType(String)}.
+ */
+public enum OptionType {
+ // value has to be parsable by Integer.parseInt
+ INTEGER,
+ // value has to be parsable by Long.parseLong
+ LONG,
+ // value has to be parsable by Double.parseDouble
+ DOUBLE,
+ // value has to be parsable by Float.parseFloat
+ FLOAT,
+ // value has to be exactly "true" or "false"
+ BOOLEAN,
+ // any value is accepted; this is the type an Option has if none is set explicitly
+ STRING
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/601e8c60/flink-java/src/main/java/org/apache/flink/api/java/utils/RequiredParameters.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/utils/RequiredParameters.java b/flink-java/src/main/java/org/apache/flink/api/java/utils/RequiredParameters.java
new file mode 100644
index 0000000..5cdac4c
--- /dev/null
+++ b/flink-java/src/main/java/org/apache/flink/api/java/utils/RequiredParameters.java
@@ -0,0 +1,263 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.utils;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.LinkedList;
+import java.util.Map;
+import java.util.Objects;
+
+/**
+ * Facility to manage required parameters in user defined functions.
+ *
+ * Parameters are registered as {@link Option}s via {@link #add(String)} or {@link #add(Option)} and are
+ * checked against the values held by a {@link ParameterTool} via {@link #applyTo(ParameterTool)}.
+ */
+public class RequiredParameters {
+
+    private static final String HELP_TEXT_PARAM_DELIMITER = "\t";
+    private static final String HELP_TEXT_LINE_DELIMITER = "\n";
+    private static final int HELP_TEXT_LENGTH_PER_PARAM = 100;
+
+    // the registered options, keyed by their (long) name
+    private final Map<String, Option> data;
+
+    public RequiredParameters() {
+        this.data = new HashMap<>();
+    }
+
+    /**
+     * Add a parameter based on its name.
+     *
+     * @param name - the name of the parameter
+     * @return - an {@link Option} object representing the parameter
+     * @throws RequiredParametersException if an option with the same name is already defined
+     */
+    public Option add(String name) throws RequiredParametersException {
+        if (this.data.containsKey(name)) {
+            throw new RequiredParametersException("Option with key " + name + " already exists.");
+        }
+        Option option = new Option(name);
+        this.data.put(name, option);
+        return option;
+    }
+
+    /**
+     * Add a parameter encapsulated in an {@link Option} object.
+     *
+     * @param option - the parameter
+     * @throws RequiredParametersException if an option with the same name is already defined
+     */
+    public void add(Option option) throws RequiredParametersException {
+        if (this.data.containsKey(option.getName())) {
+            throw new RequiredParametersException("Option with key " + option.getName() + " already exists.");
+        }
+        this.data.put(option.getName(), option);
+    }
+
+    /**
+     * Check for all required parameters defined:
+     * - has a value been passed
+     * - if not, does the parameter have an associated default value
+     * - does the type of the parameter match the one defined in RequiredParameters
+     * - does the value provided in the parameterTool adhere to the choices defined in the option
+     *
+     * Values which are passed on the short name of an option are moved to its long name and are subject to
+     * the same type and choices checks. Default values are written into the data of the parameterTool.
+     *
+     * If any check fails, a RequiredParametersException is thrown
+     *
+     * @param parameterTool - parameters supplied by the user.
+     * @throws RequiredParametersException if any of the specified checks fail
+     */
+    public void applyTo(ParameterTool parameterTool) throws RequiredParametersException {
+        List<String> missingArguments = new LinkedList<>();
+        for (Option o : data.values()) {
+            if (parameterTool.data.containsKey(o.getName())) {
+                if (Objects.equals(parameterTool.data.get(o.getName()), ParameterTool.NO_VALUE_KEY)) {
+                    // the parameter has been passed, but no value, check if there is a default value
+                    checkAndApplyDefaultValue(o, parameterTool.data);
+                } else {
+                    // a value has been passed in the parameterTool, now check if it adheres to all constraints
+                    checkAmbiguousValues(o, parameterTool.data);
+                    checkIsCastableToDefinedType(o, parameterTool.data);
+                    checkChoices(o, parameterTool.data);
+                }
+            } else {
+                // check if there is a default name or a value passed for a possibly defined alternative name.
+                if (hasNoDefaultValueAndNoValuePassedOnAlternativeName(o, parameterTool.data)) {
+                    missingArguments.add(o.getName());
+                }
+            }
+        }
+        if (!missingArguments.isEmpty()) {
+            throw new RequiredParametersException(this.missingArgumentsText(missingArguments), missingArguments);
+        }
+    }
+
+    // check if the given parameter has a default value (or a value on its short name) and add it to the
+    // passed map if that is the case, else throw an exception
+    private void checkAndApplyDefaultValue(Option o, Map<String, String> data) throws RequiredParametersException {
+        if (hasNoDefaultValueAndNoValuePassedOnAlternativeName(o, data)) {
+            throw new RequiredParametersException("No default value for undefined parameter " + o.getName());
+        }
+    }
+
+    // check if the value in the given map which corresponds to the name of the given option
+    // is castable to the type of the option (if any is defined)
+    private void checkIsCastableToDefinedType(Option o, Map<String, String> data) throws RequiredParametersException {
+        if (o.hasType() && !o.isCastableToDefinedType(data.get(o.getName()))) {
+            throw new RequiredParametersException("Value for parameter " + o.getName() +
+                " cannot be cast to type " + o.getType());
+        }
+    }
+
+    // check if the value in the given map which corresponds to the name of the given option
+    // adheres to the list of given choices for the param in the options (if any are defined)
+    private void checkChoices(Option o, Map<String, String> data) throws RequiredParametersException {
+        if (o.getChoices().size() > 0 && !o.getChoices().contains(data.get(o.getName()))) {
+            throw new RequiredParametersException("Value " + data.get(o.getName()) +
+                " is not in the list of valid choices for key " + o.getName());
+        }
+    }
+
+    // move value passed on alternative name to standard name or apply default value if any defined
+    // else return true to indicate parameter is 'really' missing
+    private boolean hasNoDefaultValueAndNoValuePassedOnAlternativeName(Option o, Map<String, String> data)
+            throws RequiredParametersException {
+        if (o.hasAlt() && data.containsKey(o.getAlt())) {
+            String valueOnAlt = data.get(o.getAlt());
+            data.put(o.getName(), valueOnAlt);
+            // A value which arrives via the short name has to satisfy the same constraints as one passed
+            // on the long name. The 'no value' marker is not a user supplied value and is left unchecked.
+            if (!Objects.equals(valueOnAlt, ParameterTool.NO_VALUE_KEY)) {
+                checkIsCastableToDefinedType(o, data);
+                checkChoices(o, data);
+            }
+            return false;
+        }
+        if (!o.hasDefaultValue()) {
+            return true;
+        }
+        // make the default visible under both names
+        data.put(o.getName(), o.getDefaultValue());
+        if (o.hasAlt()) {
+            data.put(o.getAlt(), o.getDefaultValue());
+        }
+        return false;
+    }
+
+    // given that the map contains a value for the name of the option passed
+    // check if it also contains a value for the shortName in option (if any is defined)
+    private void checkAmbiguousValues(Option o, Map<String, String> data) throws RequiredParametersException {
+        if (data.containsKey(o.getAlt()) && !Objects.equals(data.get(o.getAlt()), ParameterTool.NO_VALUE_KEY)) {
+            throw new RequiredParametersException("Value passed for parameter " + o.getName() +
+                " is ambiguous. Value passed for short and long name.");
+        }
+    }
+
+    /**
+     * Build a help text for the defined parameters.
+     *
+     * The format of the help text will be:
+     * Required Parameters:
+     * \t -:shortName:, --:name: \t :helpText: \t default: :defaultValue: \t choices: :choices: \n
+     *
+     * @return a formatted help String.
+     */
+    public String getHelp() {
+        StringBuilder sb = new StringBuilder(data.size() * HELP_TEXT_LENGTH_PER_PARAM);
+
+        sb.append("Required Parameters:");
+        sb.append(HELP_TEXT_LINE_DELIMITER);
+
+        for (Option o : data.values()) {
+            sb.append(this.helpText(o));
+        }
+        sb.append(HELP_TEXT_LINE_DELIMITER);
+
+        return sb.toString();
+    }
+
+    /**
+     * Build a help text for the defined parameters and list the missing arguments at the end of the text.
+     *
+     * The format of the help text will be:
+     * Required Parameters:
+     * \t -:shortName:, --:name: \t :helpText: \t default: :defaultValue: \t choices: :choices: \n
+     *
+     * Missing parameters:
+     * \t param1 param2 ... paramN
+     *
+     * @param missingArguments - a list of missing parameters
+     * @return a formatted help String.
+     */
+    public String getHelp(List<String> missingArguments) {
+        return this.getHelp() + this.missingArgumentsText(missingArguments);
+    }
+
+    /**
+     * for the given option create a line for the help text which looks like:
+     * \t -:shortName:, --:name: \t :helpText: \t default: :defaultValue: \t choices: :choices:
+     */
+    private String helpText(Option option) {
+        StringBuilder sb = new StringBuilder(HELP_TEXT_LENGTH_PER_PARAM);
+        sb.append(HELP_TEXT_PARAM_DELIMITER);
+
+        // if there is a short name, add it.
+        if (option.hasAlt()) {
+            sb.append("-");
+            sb.append(option.getAlt());
+            sb.append(", ");
+        }
+
+        // add the name
+        sb.append("--");
+        sb.append(option.getName());
+        sb.append(HELP_TEXT_PARAM_DELIMITER);
+
+        // if there is a help text, add it
+        if (option.getHelpText() != null) {
+            sb.append(option.getHelpText());
+            sb.append(HELP_TEXT_PARAM_DELIMITER);
+        }
+
+        // if there is a default value, add it.
+        if (option.hasDefaultValue()) {
+            sb.append("default: ");
+            sb.append(option.getDefaultValue());
+            sb.append(HELP_TEXT_PARAM_DELIMITER);
+        }
+
+        // if there is a list of choices add it.
+        if (!option.getChoices().isEmpty()) {
+            sb.append("choices: ");
+            for (String choice : option.getChoices()) {
+                sb.append(choice);
+                sb.append(" ");
+            }
+        }
+        sb.append(HELP_TEXT_LINE_DELIMITER);
+
+        return sb.toString();
+    }
+
+    // build the text listing the names of all missing arguments, separated by a blank
+    private String missingArgumentsText(List<String> missingArguments) {
+        StringBuilder sb = new StringBuilder(missingArguments.size() * 10);
+
+        sb.append("Missing arguments for:");
+        sb.append(HELP_TEXT_LINE_DELIMITER);
+
+        for (String arg : missingArguments) {
+            sb.append(arg);
+            sb.append(" ");
+        }
+
+        return sb.toString();
+    }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/601e8c60/flink-java/src/main/java/org/apache/flink/api/java/utils/RequiredParametersException.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/utils/RequiredParametersException.java b/flink-java/src/main/java/org/apache/flink/api/java/utils/RequiredParametersException.java
new file mode 100644
index 0000000..941bf25
--- /dev/null
+++ b/flink-java/src/main/java/org/apache/flink/api/java/utils/RequiredParametersException.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.java.utils;
+
+import java.util.LinkedList;
+import java.util.List;
+
+/**
+ * Signals that the validation of {@link RequiredParameters} against the supplied parameters failed.
+ *
+ * Optionally carries the names of the arguments which were found to be missing.
+ */
+public class RequiredParametersException extends Exception {
+
+    // names of the missing arguments; stays null unless set via the (message, missingArguments) constructor
+    private List<String> missingArguments;
+
+    public RequiredParametersException() {
+        super();
+    }
+
+    public RequiredParametersException(String message) {
+        super(message);
+    }
+
+    public RequiredParametersException(Throwable cause) {
+        super(cause);
+    }
+
+    public RequiredParametersException(String message, Throwable cause) {
+        super(message, cause);
+    }
+
+    public RequiredParametersException(String message, List<String> missingArguments) {
+        super(message);
+        this.missingArguments = missingArguments;
+    }
+
+    /**
+     * Get the names of the missing arguments.
+     *
+     * @return the list passed at construction time, or a new empty list if none was passed
+     */
+    public List<String> getMissingArguments() {
+        return this.missingArguments != null ? this.missingArguments : new LinkedList<String>();
+    }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/601e8c60/flink-java/src/test/java/org/apache/flink/api/java/utils/OptionsTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/utils/OptionsTest.java b/flink-java/src/test/java/org/apache/flink/api/java/utils/OptionsTest.java
new file mode 100644
index 0000000..d4f0bac
--- /dev/null
+++ b/flink-java/src/test/java/org/apache/flink/api/java/utils/OptionsTest.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.utils;
+
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+
+import static org.junit.Assert.fail;
+
+/**
+ * Tests for the {@link Option} class.
+ */
+public class OptionsTest {
+
+    @Rule
+    public ExpectedException expectedException = ExpectedException.none();
+
+    // choices are defined first, the default value defined afterwards is not one of them
+    @Test
+    public void testChoicesWithInvalidDefaultValue() throws RequiredParametersException {
+        expectedException.expect(RequiredParametersException.class);
+        expectedException.expectMessage("Default value d is not in the list of valid values for option choices");
+
+        Option option = new Option("choices").choices("a", "b", "c");
+        option.defaultValue("d");
+    }
+
+    @Test
+    public void testChoicesWithValidDefaultValue() {
+        Option option = null;
+        try {
+            option = new Option("choices").choices("a", "b", "c");
+            option = option.defaultValue("a");
+        } catch (RequiredParametersException e) {
+            fail("Exception thrown: " + e.getMessage());
+        }
+
+        // JUnit expects the arguments in the order (expected, actual)
+        Assert.assertEquals("a", option.getDefaultValue());
+    }
+
+    // the default value is defined first, the choices defined afterwards do not contain it
+    @Test
+    public void testChoicesWithInvalidDefautlValue() throws RequiredParametersException {
+        expectedException.expect(RequiredParametersException.class);
+        expectedException.expectMessage("Valid values for option choices do not contain defined default value x");
+
+        Option option = new Option("choices").defaultValue("x");
+        option.choices("a", "b");
+    }
+
+    @Test
+    public void testIsCastableToDefinedTypeWithDefaultType() {
+        Option option = new Option("name");
+        Assert.assertTrue(option.isCastableToDefinedType("some value"));
+    }
+
+    @Test
+    public void testIsCastableToDefinedTypeWithMatchingTypes() {
+        // Integer
+        Option optionInteger = new Option("name").type(OptionType.INTEGER);
+        Assert.assertTrue(optionInteger.isCastableToDefinedType("15"));
+
+        // Long
+        Option optionLong = new Option("name").type(OptionType.LONG);
+        Assert.assertTrue(optionLong.isCastableToDefinedType("15"));
+
+        // Float
+        Option optionFloat = new Option("name").type(OptionType.FLOAT);
+        Assert.assertTrue(optionFloat.isCastableToDefinedType("15.5"));
+
+        // Double
+        Option optionDouble = new Option("name").type(OptionType.DOUBLE);
+        Assert.assertTrue(optionDouble.isCastableToDefinedType("15.0"));
+
+        // Boolean
+        Option optionBoolean = new Option("name").type(OptionType.BOOLEAN);
+        Assert.assertTrue(optionBoolean.isCastableToDefinedType("true"));
+
+    }
+
+    @Test
+    public void testIsCastableToDefinedTypeWithNonMatchingTypes() {
+        // Integer
+        Option optionInteger = new Option("name").type(OptionType.INTEGER);
+        Assert.assertFalse(optionInteger.isCastableToDefinedType("true"));
+
+        // Long
+        Option optionLong = new Option("name").type(OptionType.LONG);
+        Assert.assertFalse(optionLong.isCastableToDefinedType("15.0"));
+
+        // Float
+        Option optionFloat = new Option("name").type(OptionType.FLOAT);
+        Assert.assertFalse(optionFloat.isCastableToDefinedType("name"));
+
+        // Double
+        Option optionDouble = new Option("name").type(OptionType.DOUBLE);
+        Assert.assertFalse(optionDouble.isCastableToDefinedType("name"));
+
+        // Boolean
+        Option optionBoolean = new Option("name").type(OptionType.BOOLEAN);
+        Assert.assertFalse(optionBoolean.isCastableToDefinedType("15"));
+
+    }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/601e8c60/flink-java/src/test/java/org/apache/flink/api/java/utils/RequiredParametersTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/utils/RequiredParametersTest.java b/flink-java/src/test/java/org/apache/flink/api/java/utils/RequiredParametersTest.java
new file mode 100644
index 0000000..72d95df
--- /dev/null
+++ b/flink-java/src/test/java/org/apache/flink/api/java/utils/RequiredParametersTest.java
@@ -0,0 +1,293 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.utils;
+
+import static org.hamcrest.CoreMatchers.containsString;
+import static org.hamcrest.CoreMatchers.not;
+import static org.junit.Assert.fail;
+
+import org.hamcrest.CoreMatchers;
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+
+import java.util.Arrays;
+
+/**
+ * Tests for the {@link RequiredParameters} class and its interaction with {@link ParameterTool}.
+ *
+ * <p>Negative tests declare the expected exception through the {@link ExpectedException} rule;
+ * positive tests inspect the {@code data} map of the {@link ParameterTool} after
+ * {@code applyTo} has run.
+ */
+public class RequiredParametersTest {
+
+ /** Rule used by the negative tests to declare the expected exception type and message. */
+ @Rule
+ public ExpectedException expectedException = ExpectedException.none();
+
+ /**
+  * Adding two {@link Option}s with the same name must be rejected.
+  */
+ @Test
+ public void testAddWithAlreadyExistingParameter() throws RequiredParametersException {
+
+     expectedException.expect(RequiredParametersException.class);
+     expectedException.expectMessage("Option with key berlin already exists.");
+
+     RequiredParameters required = new RequiredParameters();
+     required.add(new Option("berlin"));
+     required.add(new Option("berlin"));
+ }
+
+ /**
+  * Adding the same name twice through the string based {@code add} must be rejected as well.
+  */
+ @Test
+ public void testStringBasedAddWithAlreadyExistingParameter() throws RequiredParametersException {
+
+     expectedException.expect(RequiredParametersException.class);
+     expectedException.expectMessage("Option with key berlin already exists.");
+
+     RequiredParameters required = new RequiredParameters();
+     required.add("berlin");
+     required.add("berlin");
+ }
+
+ /**
+  * A required option that is absent from the (empty) argument list must be reported as missing.
+  */
+ @Test
+ public void testApplyToWithMissingParameters() throws RequiredParametersException {
+
+     expectedException.expect(RequiredParametersException.class);
+     expectedException.expectMessage(CoreMatchers.allOf(
+         containsString("Missing arguments for:"),
+         containsString("munich ")));
+
+     ParameterTool parameter = ParameterTool.fromArgs(new String[]{});
+     RequiredParameters required = new RequiredParameters();
+     required.add(new Option("munich"));
+
+     required.applyTo(parameter);
+ }
+
+ /**
+  * A key passed without a value ({@code --berlin}) for an option that defines no default value
+  * must be rejected.
+  */
+ @Test
+ public void testApplyToWithMissingDefaultValues() throws RequiredParametersException {
+
+     expectedException.expect(RequiredParametersException.class);
+     expectedException.expectMessage("No default value for undefined parameter berlin");
+
+     ParameterTool parameter = ParameterTool.fromArgs(new String[]{"--berlin"});
+     RequiredParameters required = new RequiredParameters();
+     required.add(new Option("berlin"));
+
+     required.applyTo(parameter);
+ }
+
+ /**
+  * A value that is not among the option's declared choices must be rejected.
+  */
+ @Test
+ public void testApplyToWithInvalidParameterValueBasedOnOptionChoices() throws RequiredParametersException {
+
+     expectedException.expect(RequiredParametersException.class);
+     expectedException.expectMessage("Value river is not in the list of valid choices for key berlin");
+
+     ParameterTool parameter = ParameterTool.fromArgs(new String[]{"--berlin", "river"});
+     RequiredParameters required = new RequiredParameters();
+     required.add(new Option("berlin").choices("city", "metropolis"));
+
+     required.applyTo(parameter);
+ }
+
+ /**
+  * Passing different values on both the long name and the short (alt) name is ambiguous
+  * and must be rejected.
+  */
+ @Test
+ public void testApplyToWithParameterDefinedOnShortAndLongName() throws RequiredParametersException {
+
+     expectedException.expect(RequiredParametersException.class);
+     expectedException.expectMessage("Value passed for parameter berlin is ambiguous. " +
+         "Value passed for short and long name.");
+
+     ParameterTool parameter = ParameterTool.fromArgs(new String[]{"--berlin", "value", "--b", "another"});
+     RequiredParameters required = new RequiredParameters();
+     required.add(new Option("berlin").alt("b"));
+
+     required.applyTo(parameter);
+ }
+
+ /**
+  * A value passed only on the short name must afterwards be available under both
+  * the long and the short name.
+  */
+ @Test
+ public void testApplyToMovesValuePassedOnShortNameToLongNameIfLongNameIsUndefined() {
+     ParameterTool parameter = ParameterTool.fromArgs(new String[]{"--b", "value"});
+     RequiredParameters required = new RequiredParameters();
+
+     try {
+         required.add(new Option("berlin").alt("b"));
+         required.applyTo(parameter);
+         // JUnit convention: expected value first, actual value second
+         Assert.assertEquals("value", parameter.data.get("berlin"));
+         Assert.assertEquals("value", parameter.data.get("b"));
+     } catch (RequiredParametersException e) {
+         fail("Exception thrown " + e.getMessage());
+     }
+ }
+
+ /**
+  * When the long key is present without a value and the short key carries a value,
+  * the short key's value must win over the option's default value.
+  */
+ @Test
+ public void testDefaultValueDoesNotOverrideValuePassedOnShortKeyIfLongKeyIsNotPassedButPresent() {
+     ParameterTool parameter = ParameterTool.fromArgs(new String[]{"--berlin", "--b", "value"});
+     RequiredParameters required = new RequiredParameters();
+
+     try {
+         required.add(new Option("berlin").alt("b").defaultValue("something"));
+         required.applyTo(parameter);
+         Assert.assertEquals("value", parameter.data.get("berlin"));
+         Assert.assertEquals("value", parameter.data.get("b"));
+     } catch (RequiredParametersException e) {
+         fail("Exception thrown " + e.getMessage());
+     }
+ }
+
+ /**
+  * A value that cannot be cast to the option's declared type must be rejected.
+  */
+ @Test
+ public void testApplyToWithNonCastableType() throws RequiredParametersException {
+
+     expectedException.expect(RequiredParametersException.class);
+     expectedException.expectMessage("Value for parameter flag cannot be cast to type BOOLEAN");
+
+     ParameterTool parameter = ParameterTool.fromArgs(new String[]{"--flag", "15"});
+     RequiredParameters required = new RequiredParameters();
+     required.add(new Option("flag").type(OptionType.BOOLEAN));
+
+     required.applyTo(parameter);
+ }
+
+ /**
+  * A plain option whose value is passed on the command line keeps that value.
+  */
+ @Test
+ public void testApplyToWithSimpleOption() {
+     ParameterTool parameter = ParameterTool.fromArgs(new String[]{"--berlin", "value"});
+     RequiredParameters required = new RequiredParameters();
+     try {
+         required.add(new Option("berlin"));
+         required.applyTo(parameter);
+         Assert.assertEquals("value", parameter.data.get("berlin"));
+     } catch (RequiredParametersException e) {
+         fail("Exception thrown " + e.getMessage());
+     }
+ }
+
+ /**
+  * A key passed without a value receives the option's default value.
+  */
+ @Test
+ public void testApplyToWithOptionAndDefaultValue() {
+     ParameterTool parameter = ParameterTool.fromArgs(new String[]{"--berlin"});
+     RequiredParameters required = new RequiredParameters();
+     try {
+         required.add(new Option("berlin").defaultValue("value"));
+         required.applyTo(parameter);
+         Assert.assertEquals("value", parameter.data.get("berlin"));
+     } catch (RequiredParametersException e) {
+         fail("Exception thrown " + e.getMessage());
+     }
+ }
+
+ /**
+  * A key passed without a value receives the default value under both
+  * the long and the short name.
+  */
+ @Test
+ public void testApplyToWithOptionWithLongAndShortNameAndDefaultValue() {
+     ParameterTool parameter = ParameterTool.fromArgs(new String[]{"--berlin"});
+     RequiredParameters required = new RequiredParameters();
+     try {
+         required.add(new Option("berlin").alt("b").defaultValue("value"));
+         required.applyTo(parameter);
+         Assert.assertEquals("value", parameter.data.get("berlin"));
+         Assert.assertEquals("value", parameter.data.get("b"));
+     } catch (RequiredParametersException e) {
+         fail("Exception thrown " + e.getMessage());
+     }
+ }
+
+ /**
+  * With several required options, an option that was not passed at all but defines a default
+  * gets that default (under long and short name), while a passed option keeps its value.
+  */
+ @Test
+ public void testApplyToWithOptionMultipleOptionsAndOneDefaultValue() {
+     ParameterTool parameter = ParameterTool.fromArgs(new String[]{"--input", "abc"});
+     RequiredParameters rq = new RequiredParameters();
+     try {
+         rq.add("input");
+         rq.add(new Option("parallelism").alt("p").defaultValue("1").type(OptionType.INTEGER));
+         rq.applyTo(parameter);
+         Assert.assertEquals("1", parameter.data.get("parallelism"));
+         Assert.assertEquals("1", parameter.data.get("p"));
+         Assert.assertEquals("abc", parameter.data.get("input"));
+     } catch (RequiredParametersException e) {
+         fail("Exception thrown " + e.getMessage());
+     }
+ }
+
+ /**
+  * With an empty argument list, every option that defines a default value ends up with
+  * that default. Note that none of the options here declares an explicit {@link OptionType};
+  * the defaults merely look like a string, a number and a boolean.
+  */
+ @Test
+ public void testApplyToWithMultipleTypes() {
+     ParameterTool parameter = ParameterTool.fromArgs(new String[]{});
+     RequiredParameters required = new RequiredParameters();
+     try {
+         required.add(new Option("berlin").defaultValue("value"));
+         required.add(new Option("count").defaultValue("15"));
+         required.add(new Option("someFlag").alt("sf").defaultValue("true"));
+
+         required.applyTo(parameter);
+
+         Assert.assertEquals("value", parameter.data.get("berlin"));
+         Assert.assertEquals("15", parameter.data.get("count"));
+         Assert.assertEquals("true", parameter.data.get("someFlag"));
+         Assert.assertEquals("true", parameter.data.get("sf"));
+
+     } catch (RequiredParametersException e) {
+         fail("Exception thrown " + e.getMessage());
+     }
+ }
+
+ /**
+  * The help text of an option with alt name, default value, help text and choices
+  * must mention the names, the default and the choices.
+  */
+ @Test
+ public void testPrintHelpForFullySetOption() {
+     RequiredParameters required = new RequiredParameters();
+     try {
+         required.add(new Option("option").defaultValue("some").help("help").alt("o").choices("some", "options"));
+
+         String helpText = required.getHelp();
+         Assert.assertThat(helpText, CoreMatchers.allOf(
+             containsString("Required Parameters:"),
+             containsString("-o, --option"),
+             containsString("default: some"),
+             containsString("choices: "),
+             containsString("some"),
+             containsString("options")));
+
+     } catch (RequiredParametersException e) {
+         fail("Exception thrown " + e.getMessage());
+     }
+ }
+
+ /**
+  * The help text for several options lists each of them, and must not mention
+  * "choices" or "default" when no option defines them.
+  */
+ @Test
+ public void testPrintHelpForMultipleParams() {
+     RequiredParameters required = new RequiredParameters();
+     try {
+         required.add("input");
+         required.add("output");
+         required.add(new Option("parallelism").alt("p").help("Set the parallelism for all operators").type(OptionType.INTEGER));
+
+         String helpText = required.getHelp();
+         Assert.assertThat(helpText, CoreMatchers.allOf(
+             containsString("Required Parameters:"),
+             containsString("--input"),
+             containsString("--output"),
+             containsString("-p, --parallelism"),
+             containsString("Set the parallelism for all operators")));
+
+         Assert.assertThat(helpText, CoreMatchers.allOf(
+             not(containsString("choices")),
+             not(containsString("default"))));
+     } catch (RequiredParametersException e) {
+         fail("Exception thrown " + e.getMessage());
+     }
+ }
+
+ /**
+  * The help text built from a list of missing parameter names must contain the
+  * "Missing arguments for:" header and every given name.
+  */
+ @Test
+ public void testPrintHelpWithMissingParams() {
+     RequiredParameters required = new RequiredParameters();
+
+     String helpText = required.getHelp(Arrays.asList("param1", "param2", "paramN"));
+     Assert.assertThat(helpText, CoreMatchers.allOf(
+         containsString("Missing arguments for:"),
+         containsString("param1 "),
+         containsString("param2 "),
+         containsString("paramN ")));
+ }
+}