You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2017/03/16 14:05:45 UTC
[4/8] flink git commit: [hotfix] [core] Fix/cleanup serialization
test for ExecutionConfig
[hotfix] [core] Fix/cleanup serialization test for ExecutionConfig
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/afd36f98
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/afd36f98
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/afd36f98
Branch: refs/heads/master
Commit: afd36f9814ee282df8e3a58e846911f6efa54c61
Parents: d498cbe
Author: Stephan Ewen <se...@apache.org>
Authored: Wed Mar 15 15:25:20 2017 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Thu Mar 16 14:43:26 2017 +0100
----------------------------------------------------------------------
.../flink/api/common/ExecutionConfigTest.java | 76 ++++++++++++++++++-
.../graph/StreamingJobGraphGeneratorTest.java | 79 +-------------------
2 files changed, 74 insertions(+), 81 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/afd36f98/flink-core/src/test/java/org/apache/flink/api/common/ExecutionConfigTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/common/ExecutionConfigTest.java b/flink-core/src/test/java/org/apache/flink/api/common/ExecutionConfigTest.java
index d000ff9..7e98604 100644
--- a/flink-core/src/test/java/org/apache/flink/api/common/ExecutionConfigTest.java
+++ b/flink-core/src/test/java/org/apache/flink/api/common/ExecutionConfigTest.java
@@ -22,12 +22,18 @@ import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.typeutils.GenericTypeInfo;
import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer;
+import org.apache.flink.core.testutils.CommonTestUtils;
+import org.apache.flink.util.SerializedValue;
+
import org.junit.Test;
+import java.io.IOException;
import java.util.Arrays;
import java.util.List;
+import java.util.Random;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
@@ -39,17 +45,17 @@ public class ExecutionConfigTest {
List<Class<?>> types = Arrays.<Class<?>>asList(Double.class, Integer.class, Double.class);
List<Class<?>> expectedTypes = Arrays.<Class<?>>asList(Double.class, Integer.class);
- for(Class<?> tpe: types) {
+ for (Class<?> tpe: types) {
config.registerKryoType(tpe);
}
int counter = 0;
- for(Class<?> tpe: config.getRegisteredKryoTypes()){
+ for (Class<?> tpe: config.getRegisteredKryoTypes()){
assertEquals(tpe, expectedTypes.get(counter++));
}
- assertTrue(counter == expectedTypes.size());
+ assertEquals(expectedTypes.size(), counter);
}
@Test
@@ -88,4 +94,68 @@ public class ExecutionConfigTest {
// expected
}
}
+
+ @Test
+ public void testExecutionConfigSerialization() throws IOException, ClassNotFoundException {
+ final Random r = new Random();
+
+ final int parallelism = 1 + r.nextInt(10);
+ final boolean closureCleanerEnabled = r.nextBoolean(),
+ forceAvroEnabled = r.nextBoolean(),
+ forceKryoEnabled = r.nextBoolean(),
+ disableGenericTypes = r.nextBoolean(),
+ objectReuseEnabled = r.nextBoolean(),
+ sysoutLoggingEnabled = r.nextBoolean();
+
+ final ExecutionConfig config = new ExecutionConfig();
+
+ if (closureCleanerEnabled) {
+ config.enableClosureCleaner();
+ } else {
+ config.disableClosureCleaner();
+ }
+ if (forceAvroEnabled) {
+ config.enableForceAvro();
+ } else {
+ config.disableForceAvro();
+ }
+ if (forceKryoEnabled) {
+ config.enableForceKryo();
+ } else {
+ config.disableForceKryo();
+ }
+ if (disableGenericTypes) {
+ config.disableGenericTypes();
+ } else {
+ config.enableGenericTypes();
+ }
+ if (objectReuseEnabled) {
+ config.enableObjectReuse();
+ } else {
+ config.disableObjectReuse();
+ }
+ if (sysoutLoggingEnabled) {
+ config.enableSysoutLogging();
+ } else {
+ config.disableSysoutLogging();
+ }
+ config.setParallelism(parallelism);
+
+ final ExecutionConfig copy1 = CommonTestUtils.createCopySerializable(config);
+ final ExecutionConfig copy2 = new SerializedValue<>(config).deserializeValue(getClass().getClassLoader());
+
+ assertNotNull(copy1);
+ assertNotNull(copy2);
+
+ assertEquals(config, copy1);
+ assertEquals(config, copy2);
+
+ assertEquals(closureCleanerEnabled, copy1.isClosureCleanerEnabled());
+ assertEquals(forceAvroEnabled, copy1.isForceAvroEnabled());
+ assertEquals(forceKryoEnabled, copy1.isForceKryoEnabled());
+ assertEquals(disableGenericTypes, copy1.hasGenericTypesDisabled());
+ assertEquals(objectReuseEnabled, copy1.isObjectReuseEnabled());
+ assertEquals(sysoutLoggingEnabled, copy1.isSysoutLoggingEnabled());
+ assertEquals(parallelism, copy1.getParallelism());
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/afd36f98/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java
index 968b1c9..5f1973c 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java
@@ -17,7 +17,6 @@
package org.apache.flink.streaming.api.graph;
-import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
@@ -27,96 +26,20 @@ import org.apache.flink.runtime.jobgraph.tasks.JobSnapshottingSettings;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
-import org.apache.flink.util.InstantiationUtil;
-import org.apache.flink.util.SerializedValue;
import org.apache.flink.util.TestLogger;
+
import org.junit.Test;
-import java.io.IOException;
import java.util.List;
import java.util.Map;
-import java.util.Random;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
@SuppressWarnings("serial")
public class StreamingJobGraphGeneratorTest extends TestLogger {
-
- @Test
- public void testExecutionConfigSerialization() throws IOException, ClassNotFoundException {
- final long seed = System.currentTimeMillis();
- final Random r = new Random(seed);
-
- StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-
- StreamGraph streamingJob = new StreamGraph(env);
- StreamingJobGraphGenerator compiler = new StreamingJobGraphGenerator(streamingJob);
-
- boolean closureCleanerEnabled = r.nextBoolean(), forceAvroEnabled = r.nextBoolean(), forceKryoEnabled = r.nextBoolean(), disableGenericTypes = r.nextBoolean(), objectReuseEnabled = r.nextBoolean(), sysoutLoggingEnabled = r.nextBoolean();
- int dop = 1 + r.nextInt(10);
-
- ExecutionConfig config = streamingJob.getExecutionConfig();
- if(closureCleanerEnabled) {
- config.enableClosureCleaner();
- } else {
- config.disableClosureCleaner();
- }
- if(forceAvroEnabled) {
- config.enableForceAvro();
- } else {
- config.disableForceAvro();
- }
- if(forceKryoEnabled) {
- config.enableForceKryo();
- } else {
- config.disableForceKryo();
- }
- if(disableGenericTypes) {
- config.disableGenericTypes();
- } else {
- config.enableGenericTypes();
- }
- if(objectReuseEnabled) {
- config.enableObjectReuse();
- } else {
- config.disableObjectReuse();
- }
- if(sysoutLoggingEnabled) {
- config.enableSysoutLogging();
- } else {
- config.disableSysoutLogging();
- }
- config.setParallelism(dop);
-
- JobGraph jobGraph = compiler.createJobGraph();
-
- final String EXEC_CONFIG_KEY = "runtime.config";
-
- InstantiationUtil.writeObjectToConfig(jobGraph.getSerializedExecutionConfig(),
- jobGraph.getJobConfiguration(),
- EXEC_CONFIG_KEY);
-
- SerializedValue<ExecutionConfig> serializedExecutionConfig = InstantiationUtil.readObjectFromConfig(
- jobGraph.getJobConfiguration(),
- EXEC_CONFIG_KEY,
- Thread.currentThread().getContextClassLoader());
- assertNotNull(serializedExecutionConfig);
-
- ExecutionConfig executionConfig = serializedExecutionConfig.deserializeValue(getClass().getClassLoader());
-
- assertEquals(closureCleanerEnabled, executionConfig.isClosureCleanerEnabled());
- assertEquals(forceAvroEnabled, executionConfig.isForceAvroEnabled());
- assertEquals(forceKryoEnabled, executionConfig.isForceKryoEnabled());
- assertEquals(disableGenericTypes, executionConfig.hasGenericTypesDisabled());
- assertEquals(objectReuseEnabled, executionConfig.isObjectReuseEnabled());
- assertEquals(sysoutLoggingEnabled, executionConfig.isSysoutLoggingEnabled());
- assertEquals(dop, executionConfig.getParallelism());
- }
-
@Test
public void testParallelismOneNotChained() {