You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2017/11/20 14:50:35 UTC

[1/2] flink git commit: [hotfix][license] Add missing licenses

Repository: flink
Updated Branches:
  refs/heads/master ec863708d -> 0665428cd


[hotfix][license] Add missing licenses

This closes #4794.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/0665428c
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/0665428c
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/0665428c

Branch: refs/heads/master
Commit: 0665428cd6f2834d946486971673e7c85e5c776c
Parents: 90b3b4c
Author: yew1eb <ye...@gmail.com>
Authored: Wed Oct 11 02:52:35 2017 +0800
Committer: zentol <ch...@apache.org>
Committed: Mon Nov 20 15:50:18 2017 +0100

----------------------------------------------------------------------
 flink-dist/src/main/flink-bin/conf/zoo.cfg       | 18 ++++++++++++++++++
 .../assets/images/browserconfig.xml              | 19 +++++++++++++++++++
 .../web-dashboard/web/images/browserconfig.xml   | 19 +++++++++++++++++++
 pom.xml                                          |  2 --
 4 files changed, 56 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/0665428c/flink-dist/src/main/flink-bin/conf/zoo.cfg
----------------------------------------------------------------------
diff --git a/flink-dist/src/main/flink-bin/conf/zoo.cfg b/flink-dist/src/main/flink-bin/conf/zoo.cfg
index a14ec66..f598997 100644
--- a/flink-dist/src/main/flink-bin/conf/zoo.cfg
+++ b/flink-dist/src/main/flink-bin/conf/zoo.cfg
@@ -1,3 +1,21 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+#  limitations under the License.
+################################################################################
+
 # The number of milliseconds of each tick
 tickTime=2000
 

http://git-wip-us.apache.org/repos/asf/flink/blob/0665428c/flink-runtime-web/web-dashboard/assets/images/browserconfig.xml
----------------------------------------------------------------------
diff --git a/flink-runtime-web/web-dashboard/assets/images/browserconfig.xml b/flink-runtime-web/web-dashboard/assets/images/browserconfig.xml
index 81ec113..68bf638 100644
--- a/flink-runtime-web/web-dashboard/assets/images/browserconfig.xml
+++ b/flink-runtime-web/web-dashboard/assets/images/browserconfig.xml
@@ -1,4 +1,23 @@
 <?xml version="1.0" encoding="utf-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
 <browserconfig>
   <msapplication>
     <tile>

http://git-wip-us.apache.org/repos/asf/flink/blob/0665428c/flink-runtime-web/web-dashboard/web/images/browserconfig.xml
----------------------------------------------------------------------
diff --git a/flink-runtime-web/web-dashboard/web/images/browserconfig.xml b/flink-runtime-web/web-dashboard/web/images/browserconfig.xml
index 81ec113..68bf638 100644
--- a/flink-runtime-web/web-dashboard/web/images/browserconfig.xml
+++ b/flink-runtime-web/web-dashboard/web/images/browserconfig.xml
@@ -1,4 +1,23 @@
 <?xml version="1.0" encoding="utf-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
 <browserconfig>
   <msapplication>
     <tile>

http://git-wip-us.apache.org/repos/asf/flink/blob/0665428c/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 4aa5031..b70389a 100644
--- a/pom.xml
+++ b/pom.xml
@@ -996,7 +996,6 @@ under the License.
 						<exclude>flink-runtime-web/web-dashboard/assets/fonts/fontawesome*</exclude>
 
 						<!-- web dashboard non-binary image assets -->
-						<exclude>flink-runtime-web/web-dashboard/assets/images/browserconfig.xml</exclude>
 						<exclude>flink-runtime-web/web-dashboard/assets/images/manifest.json</exclude>
 						<exclude>flink-runtime-web/web-dashboard/assets/images/safari-pinned-tab.svg</exclude>
 
@@ -1031,7 +1030,6 @@ under the License.
 						<!-- Configuration Files. -->
 						<exclude>**/flink-bin/conf/slaves</exclude>
 						<exclude>**/flink-bin/conf/masters</exclude>
-						<exclude>**/flink-bin/conf/zoo.cfg</exclude>
 						<!-- Administrative files in the main trunk. -->
 						<exclude>**/README.md</exclude>
 						<exclude>.github/**</exclude>


[2/2] flink git commit: [FLINK-7943] Make ParameterTool thread safe

Posted by ch...@apache.org.
[FLINK-7943] Make ParameterTool thread safe

This commit changes the serialization of the ParameterTool such that only the
data map is contained. The defaultData and the unrequestedParameters maps are
not serialized because they are only used on the client side. Additionally, the
defaultData and unrequestedParameters maps are made thread safe by using
ConcurrentHashMaps.

This closes #4921.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/90b3b4cf
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/90b3b4cf
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/90b3b4cf

Branch: refs/heads/master
Commit: 90b3b4cfd2f0769e23a768061023f912d1a0cf50
Parents: ec86370
Author: Till Rohrmann <tr...@apache.org>
Authored: Mon Oct 30 14:15:20 2017 +0100
Committer: zentol <ch...@apache.org>
Committed: Mon Nov 20 15:50:18 2017 +0100

----------------------------------------------------------------------
 .../flink/api/common/ExecutionConfigTest.java   |  3 +-
 .../flink/api/java/utils/ParameterTool.java     | 64 +++++++++++++--
 .../api/java/utils/RequiredParameters.java      | 22 +++--
 .../flink/api/java/utils/ParameterToolTest.java | 85 ++++++++++++++++++++
 .../api/java/utils/RequiredParametersTest.java  | 18 +++--
 5 files changed, 167 insertions(+), 25 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/90b3b4cf/flink-core/src/test/java/org/apache/flink/api/common/ExecutionConfigTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/common/ExecutionConfigTest.java b/flink-core/src/test/java/org/apache/flink/api/common/ExecutionConfigTest.java
index 7e98604..f9beb40 100644
--- a/flink-core/src/test/java/org/apache/flink/api/common/ExecutionConfigTest.java
+++ b/flink-core/src/test/java/org/apache/flink/api/common/ExecutionConfigTest.java
@@ -24,6 +24,7 @@ import org.apache.flink.api.java.typeutils.GenericTypeInfo;
 import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer;
 import org.apache.flink.core.testutils.CommonTestUtils;
 import org.apache.flink.util.SerializedValue;
+import org.apache.flink.util.TestLogger;
 
 import org.junit.Test;
 
@@ -37,7 +38,7 @@ import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
-public class ExecutionConfigTest {
+public class ExecutionConfigTest extends TestLogger {
 
 	@Test
 	public void testDoubleTypeRegistration() {

http://git-wip-us.apache.org/repos/asf/flink/blob/90b3b4cf/flink-java/src/main/java/org/apache/flink/api/java/utils/ParameterTool.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/utils/ParameterTool.java b/flink-java/src/main/java/org/apache/flink/api/java/utils/ParameterTool.java
index 894b66d..e42a4b7 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/utils/ParameterTool.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/utils/ParameterTool.java
@@ -32,6 +32,7 @@ import java.io.FileNotFoundException;
 import java.io.FileOutputStream;
 import java.io.IOException;
 import java.io.InputStream;
+import java.io.ObjectInputStream;
 import java.io.OutputStream;
 import java.io.Serializable;
 import java.util.Arrays;
@@ -39,8 +40,10 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Map;
+import java.util.Objects;
 import java.util.Properties;
 import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
 
 /**
  * This class provides simple utility methods for reading and parsing program arguments from different sources.
@@ -212,13 +215,38 @@ public class ParameterTool extends ExecutionConfig.GlobalJobParameters implement
 
 	// ------------------ ParameterUtil  ------------------------
 	protected final Map<String, String> data;
-	protected final Map<String, String> defaultData;
-	protected final Set<String> unrequestedParameters;
+
+	// data which is only used on the client and does not need to be transmitted
+	protected transient Map<String, String> defaultData;
+	protected transient Set<String> unrequestedParameters;
 
 	private ParameterTool(Map<String, String> data) {
-		this.data = new HashMap<>(data);
-		this.defaultData = new HashMap<>();
-		this.unrequestedParameters = new HashSet<>(data.keySet());
+		this.data = Collections.unmodifiableMap(new HashMap<>(data));
+
+		this.defaultData = new ConcurrentHashMap<>(data.size());
+
+		this.unrequestedParameters = Collections.newSetFromMap(new ConcurrentHashMap<>(data.size()));
+
+		unrequestedParameters.addAll(data.keySet());
+	}
+
+	@Override
+	public boolean equals(Object o) {
+		if (this == o) {
+			return true;
+		}
+		if (o == null || getClass() != o.getClass()) {
+			return false;
+		}
+		ParameterTool that = (ParameterTool) o;
+		return Objects.equals(data, that.data) &&
+			Objects.equals(defaultData, that.defaultData) &&
+			Objects.equals(unrequestedParameters, that.unrequestedParameters);
+	}
+
+	@Override
+	public int hashCode() {
+		return Objects.hash(data, defaultData, unrequestedParameters);
 	}
 
 	/**
@@ -560,9 +588,21 @@ public class ParameterTool extends ExecutionConfig.GlobalJobParameters implement
 	 * @return The Merged {@link ParameterTool}
 	 */
 	public ParameterTool mergeWith(ParameterTool other) {
-		ParameterTool ret = new ParameterTool(this.data);
-		ret.data.putAll(other.data);
-		ret.unrequestedParameters.addAll(other.unrequestedParameters);
+		Map<String, String> resultData = new HashMap<>(data.size() + other.data.size());
+		resultData.putAll(data);
+		resultData.putAll(other.data);
+
+		ParameterTool ret = new ParameterTool(resultData);
+
+		final HashSet<String> requestedParametersLeft = new HashSet<>(data.keySet());
+		requestedParametersLeft.removeAll(unrequestedParameters);
+
+		final HashSet<String> requestedParametersRight = new HashSet<>(other.data.keySet());
+		requestedParametersRight.removeAll(other.unrequestedParameters);
+
+		ret.unrequestedParameters.removeAll(requestedParametersLeft);
+		ret.unrequestedParameters.removeAll(requestedParametersRight);
+
 		return ret;
 	}
 
@@ -573,4 +613,12 @@ public class ParameterTool extends ExecutionConfig.GlobalJobParameters implement
 		return data;
 	}
 
+	// ------------------------- Serialization ---------------------------------------------
+
+	private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
+		in.defaultReadObject();
+
+		defaultData = Collections.emptyMap();
+		unrequestedParameters = Collections.emptySet();
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/90b3b4cf/flink-java/src/main/java/org/apache/flink/api/java/utils/RequiredParameters.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/utils/RequiredParameters.java b/flink-java/src/main/java/org/apache/flink/api/java/utils/RequiredParameters.java
index 1d2e73a..676a472 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/utils/RequiredParameters.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/utils/RequiredParameters.java
@@ -83,24 +83,28 @@ public class RequiredParameters {
 	 * <p>If any check fails, a RequiredParametersException is thrown
 	 *
 	 * @param parameterTool - parameters supplied by the user.
+	 * @return the updated ParameterTool containing all the required parameters
 	 * @throws RequiredParametersException if any of the specified checks fail
 	 */
-	public void applyTo(ParameterTool parameterTool) throws RequiredParametersException {
+	public ParameterTool applyTo(ParameterTool parameterTool) throws RequiredParametersException {
 		List<String> missingArguments = new LinkedList<>();
+
+		HashMap<String, String> newParameters = new HashMap<>(parameterTool.toMap());
+
 		for (Option o : data.values()) {
-			if (parameterTool.data.containsKey(o.getName())) {
-				if (Objects.equals(parameterTool.data.get(o.getName()), ParameterTool.NO_VALUE_KEY)) {
+			if (newParameters.containsKey(o.getName())) {
+				if (Objects.equals(newParameters.get(o.getName()), ParameterTool.NO_VALUE_KEY)) {
 					// the parameter has been passed, but no value, check if there is a default value
-					checkAndApplyDefaultValue(o, parameterTool.data);
+					checkAndApplyDefaultValue(o, newParameters);
 				} else {
 					// a value has been passed in the parameterTool, now check if it adheres to all constraints
-					checkAmbiguousValues(o, parameterTool.data);
-					checkIsCastableToDefinedType(o, parameterTool.data);
-					checkChoices(o, parameterTool.data);
+					checkAmbiguousValues(o, newParameters);
+					checkIsCastableToDefinedType(o, newParameters);
+					checkChoices(o, newParameters);
 				}
 			} else {
 				// check if there is a default name or a value passed for a possibly defined alternative name.
-				if (hasNoDefaultValueAndNoValuePassedOnAlternativeName(o, parameterTool.data)) {
+				if (hasNoDefaultValueAndNoValuePassedOnAlternativeName(o, newParameters)) {
 					missingArguments.add(o.getName());
 				}
 			}
@@ -108,6 +112,8 @@ public class RequiredParameters {
 		if (!missingArguments.isEmpty()) {
 			throw new RequiredParametersException(this.missingArgumentsText(missingArguments), missingArguments);
 		}
+
+		return ParameterTool.fromMap(newParameters);
 	}
 
 	// check if the given parameter has a default value and add it to the passed map if that is the case

http://git-wip-us.apache.org/repos/asf/flink/blob/90b3b4cf/flink-java/src/test/java/org/apache/flink/api/java/utils/ParameterToolTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/utils/ParameterToolTest.java b/flink-java/src/test/java/org/apache/flink/api/java/utils/ParameterToolTest.java
index 6ad2022..ccd472b 100644
--- a/flink-java/src/test/java/org/apache/flink/api/java/utils/ParameterToolTest.java
+++ b/flink-java/src/test/java/org/apache/flink/api/java/utils/ParameterToolTest.java
@@ -23,16 +23,29 @@ import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.ExpectedException;
 
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
 import java.io.File;
 import java.io.FileInputStream;
 import java.io.FileOutputStream;
 import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
 import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.HashSet;
 import java.util.Map;
 import java.util.Properties;
 import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
 
 /**
  * Tests for {@link ParameterTool}.
@@ -574,6 +587,78 @@ public class ParameterToolTest extends AbstractParameterToolTest {
 		Assert.assertEquals(Collections.emptySet(), parameter.getUnrequestedParameters());
 	}
 
+	/**
+	 * Tests that we can concurrently serialize and access the ParameterTool. See FLINK-7943.
+	 */
+	@Test
+	public void testConcurrentExecutionConfigSerialization() throws ExecutionException, InterruptedException {
+
+		final int numInputs = 10;
+		Collection<String> input = new ArrayList<>(numInputs);
+
+		for (int i = 0; i < numInputs; i++) {
+			input.add("--" + UUID.randomUUID());
+			input.add(UUID.randomUUID().toString());
+		}
+
+		final String[] args = input.toArray(new String[0]);
+
+		final ParameterTool parameterTool = ParameterTool.fromArgs(args);
+
+		final int numThreads = 5;
+		final int numSerializations = 100;
+
+		final Collection<CompletableFuture<Void>> futures = new ArrayList<>(numSerializations);
+
+		final ExecutorService executorService = Executors.newFixedThreadPool(numThreads);
+
+		try {
+			for (int i = 0; i < numSerializations; i++) {
+				futures.add(
+					CompletableFuture.runAsync(
+						() -> {
+							try {
+								serializeDeserialize(parameterTool);
+							} catch (Exception e) {
+								throw new CompletionException(e);
+							}
+						},
+						executorService));
+			}
+
+			for (CompletableFuture<Void> future : futures) {
+				future.get();
+			}
+		} finally {
+			executorService.shutdownNow();
+			executorService.awaitTermination(1000L, TimeUnit.MILLISECONDS);
+		}
+	}
+
+	/**
+	 * Accesses parameter tool parameters and then serializes the given parameter tool and deserializes again.
+	 * @param parameterTool to serialize/deserialize
+	 */
+	private void serializeDeserialize(ParameterTool parameterTool) throws IOException, ClassNotFoundException {
+		// weirdly enough, this call has side effects making the ParameterTool serialization fail if not
+		// using a concurrent data structure.
+		parameterTool.get(UUID.randomUUID().toString());
+
+		try (
+			ByteArrayOutputStream baos = new ByteArrayOutputStream();
+			ObjectOutputStream oos = new ObjectOutputStream(baos)) {
+			oos.writeObject(parameterTool);
+			oos.close();
+			baos.close();
+
+			ByteArrayInputStream bais = new ByteArrayInputStream(baos.toByteArray());
+			ObjectInputStream ois = new ObjectInputStream(bais);
+
+			// this should work :-)
+			ParameterTool deserializedParameterTool = ((ParameterTool) ois.readObject());
+		}
+	}
+
 	private static <T> Set<T> createHashSet(T... elements) {
 		Set<T> set = new HashSet<>();
 		for (T element : elements) {

http://git-wip-us.apache.org/repos/asf/flink/blob/90b3b4cf/flink-java/src/test/java/org/apache/flink/api/java/utils/RequiredParametersTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/utils/RequiredParametersTest.java b/flink-java/src/test/java/org/apache/flink/api/java/utils/RequiredParametersTest.java
index 11d1267..e8273ef 100644
--- a/flink-java/src/test/java/org/apache/flink/api/java/utils/RequiredParametersTest.java
+++ b/flink-java/src/test/java/org/apache/flink/api/java/utils/RequiredParametersTest.java
@@ -18,6 +18,8 @@
 
 package org.apache.flink.api.java.utils;
 
+import org.apache.flink.util.TestLogger;
+
 import org.hamcrest.CoreMatchers;
 import org.junit.Assert;
 import org.junit.Rule;
@@ -33,7 +35,7 @@ import static org.junit.Assert.fail;
 /**
  * Tests for RequiredParameter class and its interactions with ParameterTool.
  */
-public class RequiredParametersTest {
+public class RequiredParametersTest extends TestLogger {
 
 	@Rule
 	public ExpectedException expectedException = ExpectedException.none();
@@ -122,7 +124,7 @@ public class RequiredParametersTest {
 
 		try {
 			required.add(new Option("berlin").alt("b"));
-			required.applyTo(parameter);
+			parameter = required.applyTo(parameter);
 			Assert.assertEquals(parameter.data.get("berlin"), "value");
 			Assert.assertEquals(parameter.data.get("b"), "value");
 		} catch (RequiredParametersException e) {
@@ -137,7 +139,7 @@ public class RequiredParametersTest {
 
 		try {
 			required.add(new Option("berlin").alt("b").defaultValue("something"));
-			required.applyTo(parameter);
+			parameter = required.applyTo(parameter);
 			Assert.assertEquals(parameter.data.get("berlin"), "value");
 			Assert.assertEquals(parameter.data.get("b"), "value");
 		} catch (RequiredParametersException e) {
@@ -164,7 +166,7 @@ public class RequiredParametersTest {
 		RequiredParameters required = new RequiredParameters();
 		try {
 			required.add(new Option("berlin"));
-			required.applyTo(parameter);
+			parameter = required.applyTo(parameter);
 			Assert.assertEquals(parameter.data.get("berlin"), "value");
 		} catch (RequiredParametersException e) {
 			fail("Exception thrown " + e.getMessage());
@@ -177,7 +179,7 @@ public class RequiredParametersTest {
 		RequiredParameters required = new RequiredParameters();
 		try {
 			required.add(new Option("berlin").defaultValue("value"));
-			required.applyTo(parameter);
+			parameter = required.applyTo(parameter);
 			Assert.assertEquals(parameter.data.get("berlin"), "value");
 		} catch (RequiredParametersException e) {
 			fail("Exception thrown " + e.getMessage());
@@ -190,7 +192,7 @@ public class RequiredParametersTest {
 		RequiredParameters required = new RequiredParameters();
 		try {
 			required.add(new Option("berlin").alt("b").defaultValue("value"));
-			required.applyTo(parameter);
+			parameter = required.applyTo(parameter);
 			Assert.assertEquals(parameter.data.get("berlin"), "value");
 			Assert.assertEquals(parameter.data.get("b"), "value");
 		} catch (RequiredParametersException e) {
@@ -205,7 +207,7 @@ public class RequiredParametersTest {
 		try {
 			rq.add("input");
 			rq.add(new Option("parallelism").alt("p").defaultValue("1").type(OptionType.INTEGER));
-			rq.applyTo(parameter);
+			parameter = rq.applyTo(parameter);
 			Assert.assertEquals(parameter.data.get("parallelism"), "1");
 			Assert.assertEquals(parameter.data.get("p"), "1");
 			Assert.assertEquals(parameter.data.get("input"), "abc");
@@ -223,7 +225,7 @@ public class RequiredParametersTest {
 			required.add(new Option("count").defaultValue("15"));
 			required.add(new Option("someFlag").alt("sf").defaultValue("true"));
 
-			required.applyTo(parameter);
+			parameter = required.applyTo(parameter);
 
 			Assert.assertEquals(parameter.data.get("berlin"), "value");
 			Assert.assertEquals(parameter.data.get("count"), "15");