You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by al...@apache.org on 2017/11/29 14:47:21 UTC
flink git commit: [FLINK-8165] ParameterTool serialization fix
Repository: flink
Updated Branches:
refs/heads/master 520a74f4b -> 731896ae7
[FLINK-8165] ParameterTool serialization fix
Closes #5096
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/731896ae
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/731896ae
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/731896ae
Branch: refs/heads/master
Commit: 731896ae709753d5cc225d94585cb0d738e86ca9
Parents: 520a74f
Author: Gyula Fora <gy...@apache.org>
Authored: Wed Nov 29 10:28:50 2017 +0100
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Wed Nov 29 15:45:04 2017 +0100
----------------------------------------------------------------------
.../apache/flink/api/java/utils/ParameterTool.java | 4 ++--
.../api/java/utils/AbstractParameterToolTest.java | 17 +++++++++++++++++
2 files changed, 19 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/731896ae/flink-java/src/main/java/org/apache/flink/api/java/utils/ParameterTool.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/utils/ParameterTool.java b/flink-java/src/main/java/org/apache/flink/api/java/utils/ParameterTool.java
index e42a4b7..7518fa0 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/utils/ParameterTool.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/utils/ParameterTool.java
@@ -618,7 +618,7 @@ public class ParameterTool extends ExecutionConfig.GlobalJobParameters implement
private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
in.defaultReadObject();
- defaultData = Collections.emptyMap();
- unrequestedParameters = Collections.emptySet();
+ defaultData = new ConcurrentHashMap<>(data.size());
+ unrequestedParameters = Collections.newSetFromMap(new ConcurrentHashMap<>(data.size()));
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/731896ae/flink-java/src/test/java/org/apache/flink/api/java/utils/AbstractParameterToolTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/utils/AbstractParameterToolTest.java b/flink-java/src/test/java/org/apache/flink/api/java/utils/AbstractParameterToolTest.java
index f4afdd2..cd37121 100644
--- a/flink-java/src/test/java/org/apache/flink/api/java/utils/AbstractParameterToolTest.java
+++ b/flink-java/src/test/java/org/apache/flink/api/java/utils/AbstractParameterToolTest.java
@@ -20,11 +20,14 @@ package org.apache.flink.api.java.utils;
import org.apache.flink.api.java.ClosureCleaner;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.util.InstantiationUtil;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.rules.TemporaryFolder;
+import static org.junit.Assert.fail;
+
import java.io.FileInputStream;
import java.io.IOException;
import java.util.Properties;
@@ -39,6 +42,20 @@ public abstract class AbstractParameterToolTest {
protected void validate(ParameterTool parameter) {
ClosureCleaner.ensureSerializable(parameter);
+ validatePrivate(parameter);
+
+ // -------- test behaviour after serialization ------------
+ ParameterTool copy = null;
+ try {
+ byte[] b = InstantiationUtil.serializeObject(parameter);
+ copy = InstantiationUtil.deserializeObject(b, getClass().getClassLoader());
+ } catch (Exception e) {
+ fail();
+ }
+ validatePrivate(copy);
+ }
+
+ private void validatePrivate(ParameterTool parameter) {
Assert.assertEquals("myInput", parameter.getRequired("input"));
Assert.assertEquals("myDefaultValue", parameter.get("output", "myDefaultValue"));
Assert.assertEquals(null, parameter.get("whatever"));