You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2017/03/16 14:05:45 UTC

[4/8] flink git commit: [hotfix] [core] Fix/cleanup serialization test for ExecutionConfig

[hotfix] [core] Fix/cleanup serialization test for ExecutionConfig


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/afd36f98
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/afd36f98
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/afd36f98

Branch: refs/heads/master
Commit: afd36f9814ee282df8e3a58e846911f6efa54c61
Parents: d498cbe
Author: Stephan Ewen <se...@apache.org>
Authored: Wed Mar 15 15:25:20 2017 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Thu Mar 16 14:43:26 2017 +0100

----------------------------------------------------------------------
 .../flink/api/common/ExecutionConfigTest.java   | 76 ++++++++++++++++++-
 .../graph/StreamingJobGraphGeneratorTest.java   | 79 +-------------------
 2 files changed, 74 insertions(+), 81 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/afd36f98/flink-core/src/test/java/org/apache/flink/api/common/ExecutionConfigTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/common/ExecutionConfigTest.java b/flink-core/src/test/java/org/apache/flink/api/common/ExecutionConfigTest.java
index d000ff9..7e98604 100644
--- a/flink-core/src/test/java/org/apache/flink/api/common/ExecutionConfigTest.java
+++ b/flink-core/src/test/java/org/apache/flink/api/common/ExecutionConfigTest.java
@@ -22,12 +22,18 @@ import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.java.typeutils.GenericTypeInfo;
 import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer;
+import org.apache.flink.core.testutils.CommonTestUtils;
+import org.apache.flink.util.SerializedValue;
+
 import org.junit.Test;
 
+import java.io.IOException;
 import java.util.Arrays;
 import java.util.List;
+import java.util.Random;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
@@ -39,17 +45,17 @@ public class ExecutionConfigTest {
 		List<Class<?>> types = Arrays.<Class<?>>asList(Double.class, Integer.class, Double.class);
 		List<Class<?>> expectedTypes = Arrays.<Class<?>>asList(Double.class, Integer.class);
 
-		for(Class<?> tpe: types) {
+		for (Class<?> tpe: types) {
 			config.registerKryoType(tpe);
 		}
 
 		int counter = 0;
 
-		for(Class<?> tpe: config.getRegisteredKryoTypes()){
+		for (Class<?> tpe: config.getRegisteredKryoTypes()){
 			assertEquals(tpe, expectedTypes.get(counter++));
 		}
 
-		assertTrue(counter == expectedTypes.size());
+		assertEquals(expectedTypes.size(), counter);
 	}
 
 	@Test
@@ -88,4 +94,68 @@ public class ExecutionConfigTest {
 			// expected
 		}
 	}
+
+	@Test
+	public void testExecutionConfigSerialization() throws IOException, ClassNotFoundException {
+		final Random r = new Random();
+
+		final int parallelism = 1 + r.nextInt(10);
+		final boolean closureCleanerEnabled = r.nextBoolean(), 
+				forceAvroEnabled = r.nextBoolean(),
+				forceKryoEnabled = r.nextBoolean(),
+				disableGenericTypes = r.nextBoolean(),
+				objectReuseEnabled = r.nextBoolean(),
+				sysoutLoggingEnabled = r.nextBoolean();
+
+		final ExecutionConfig config = new ExecutionConfig();
+
+		if (closureCleanerEnabled) {
+			config.enableClosureCleaner();
+		} else {
+			config.disableClosureCleaner();
+		}
+		if (forceAvroEnabled) {
+			config.enableForceAvro();
+		} else {
+			config.disableForceAvro();
+		}
+		if (forceKryoEnabled) {
+			config.enableForceKryo();
+		} else {
+			config.disableForceKryo();
+		}
+		if (disableGenericTypes) {
+			config.disableGenericTypes();
+		} else {
+			config.enableGenericTypes();
+		}
+		if (objectReuseEnabled) {
+			config.enableObjectReuse();
+		} else {
+			config.disableObjectReuse();
+		}
+		if (sysoutLoggingEnabled) {
+			config.enableSysoutLogging();
+		} else {
+			config.disableSysoutLogging();
+		}
+		config.setParallelism(parallelism);
+
+		final ExecutionConfig copy1 = CommonTestUtils.createCopySerializable(config);
+		final ExecutionConfig copy2 = new SerializedValue<>(config).deserializeValue(getClass().getClassLoader());
+
+		assertNotNull(copy1);
+		assertNotNull(copy2);
+
+		assertEquals(config, copy1);
+		assertEquals(config, copy2);
+
+		assertEquals(closureCleanerEnabled, copy1.isClosureCleanerEnabled());
+		assertEquals(forceAvroEnabled, copy1.isForceAvroEnabled());
+		assertEquals(forceKryoEnabled, copy1.isForceKryoEnabled());
+		assertEquals(disableGenericTypes, copy1.hasGenericTypesDisabled());
+		assertEquals(objectReuseEnabled, copy1.isObjectReuseEnabled());
+		assertEquals(sysoutLoggingEnabled, copy1.isSysoutLoggingEnabled());
+		assertEquals(parallelism, copy1.getParallelism());
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/afd36f98/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java
index 968b1c9..5f1973c 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java
@@ -17,7 +17,6 @@
 
 package org.apache.flink.streaming.api.graph;
 
-import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
@@ -27,96 +26,20 @@ import org.apache.flink.runtime.jobgraph.tasks.JobSnapshottingSettings;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.sink.SinkFunction;
-import org.apache.flink.util.InstantiationUtil;
-import org.apache.flink.util.SerializedValue;
 import org.apache.flink.util.TestLogger;
+
 import org.junit.Test;
 
-import java.io.IOException;
 import java.util.List;
 import java.util.Map;
-import java.util.Random;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 
 @SuppressWarnings("serial")
 public class StreamingJobGraphGeneratorTest extends TestLogger {
-	
-	@Test
-	public void testExecutionConfigSerialization() throws IOException, ClassNotFoundException {
-		final long seed = System.currentTimeMillis();
-		final Random r = new Random(seed);
-
-		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-
-		StreamGraph streamingJob = new StreamGraph(env);
-		StreamingJobGraphGenerator compiler = new StreamingJobGraphGenerator(streamingJob);
-		
-		boolean closureCleanerEnabled = r.nextBoolean(), forceAvroEnabled = r.nextBoolean(), forceKryoEnabled = r.nextBoolean(), disableGenericTypes = r.nextBoolean(), objectReuseEnabled = r.nextBoolean(), sysoutLoggingEnabled = r.nextBoolean();
-		int dop = 1 + r.nextInt(10);
-		
-		ExecutionConfig config = streamingJob.getExecutionConfig();
-		if(closureCleanerEnabled) {
-			config.enableClosureCleaner();
-		} else {
-			config.disableClosureCleaner();
-		}
-		if(forceAvroEnabled) {
-			config.enableForceAvro();
-		} else {
-			config.disableForceAvro();
-		}
-		if(forceKryoEnabled) {
-			config.enableForceKryo();
-		} else {
-			config.disableForceKryo();
-		}
-		if(disableGenericTypes) {
-			config.disableGenericTypes();
-		} else {
-			config.enableGenericTypes();
-		}
-		if(objectReuseEnabled) {
-			config.enableObjectReuse();
-		} else {
-			config.disableObjectReuse();
-		}
-		if(sysoutLoggingEnabled) {
-			config.enableSysoutLogging();
-		} else {
-			config.disableSysoutLogging();
-		}
-		config.setParallelism(dop);
-		
-		JobGraph jobGraph = compiler.createJobGraph();
-
-		final String EXEC_CONFIG_KEY = "runtime.config";
-
-		InstantiationUtil.writeObjectToConfig(jobGraph.getSerializedExecutionConfig(),
-			jobGraph.getJobConfiguration(),
-			EXEC_CONFIG_KEY);
-
-		SerializedValue<ExecutionConfig> serializedExecutionConfig = InstantiationUtil.readObjectFromConfig(
-				jobGraph.getJobConfiguration(),
-				EXEC_CONFIG_KEY,
-				Thread.currentThread().getContextClassLoader());
 
-		assertNotNull(serializedExecutionConfig);
-
-		ExecutionConfig executionConfig = serializedExecutionConfig.deserializeValue(getClass().getClassLoader());
-
-		assertEquals(closureCleanerEnabled, executionConfig.isClosureCleanerEnabled());
-		assertEquals(forceAvroEnabled, executionConfig.isForceAvroEnabled());
-		assertEquals(forceKryoEnabled, executionConfig.isForceKryoEnabled());
-		assertEquals(disableGenericTypes, executionConfig.hasGenericTypesDisabled());
-		assertEquals(objectReuseEnabled, executionConfig.isObjectReuseEnabled());
-		assertEquals(sysoutLoggingEnabled, executionConfig.isSysoutLoggingEnabled());
-		assertEquals(dop, executionConfig.getParallelism());
-	}
-	
 	@Test
 	public void testParallelismOneNotChained() {