You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2017/03/16 14:05:44 UTC

[3/8] flink git commit: [FLINK-5692] [core] Add an Option to Deactivate Kryo Fallback for Serializers

[FLINK-5692] [core] Add an Option to Deactivate Kryo Fallback for Serializers

This closes #3373


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/0f99aae1
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/0f99aae1
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/0f99aae1

Branch: refs/heads/master
Commit: 0f99aae1e1f8b693c2ba79a061046bc042113f0b
Parents: 677b508
Author: Jin Mingjian <ji...@gmail.com>
Authored: Tue Feb 21 11:57:21 2017 +0800
Committer: Stephan Ewen <se...@apache.org>
Committed: Thu Mar 16 14:43:26 2017 +0100

----------------------------------------------------------------------
 docs/dev/types_serialization.md                 |  7 +++++
 .../flink/api/common/ExecutionConfig.java       | 29 ++++++++++++++++++++
 .../api/java/typeutils/GenericTypeInfo.java     |  6 ++++
 .../flink/api/common/ExecutionConfigTest.java   | 25 +++++++++++++++++
 .../graph/StreamingJobGraphGeneratorTest.java   |  8 +++++-
 5 files changed, 74 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/0f99aae1/docs/dev/types_serialization.md
----------------------------------------------------------------------
diff --git a/docs/dev/types_serialization.md b/docs/dev/types_serialization.md
index e723c33..20ee071 100644
--- a/docs/dev/types_serialization.md
+++ b/docs/dev/types_serialization.md
@@ -306,6 +306,13 @@ env.getConfig().addDefaultKryoSerializer(Class<?> type, Class<? extends Serializ
 
 There are different variants of these methods available.
 
+If you do not want to fall back to Kryo, and instead want to make sure that you have explicitly provided your own custom serializers for all POJOs, set
+{% highlight java %}
+env.getConfig().disableGenericTypes();
+{% endhighlight %}
+
+If generic types are disabled, an `UnsupportedOperationException` will be thrown when Flink tries to fall back to the default Kryo serializer logic at runtime.
+
 ## Defining Type Information using a Factory
 
 A type information factory allows for plugging-in user-defined type information into the Flink type system.

http://git-wip-us.apache.org/repos/asf/flink/blob/0f99aae1/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java b/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
index 32ea0a3..3bd91c7 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
@@ -109,6 +109,8 @@ public class ExecutionConfig implements Serializable, Archiveable<ArchivedExecut
 
 	private boolean forceKryo = false;
 
+	private boolean disableGenericTypes = false;
+
 	private boolean objectReuse = false;
 
 	private boolean autoTypeRegistrationEnabled = true;
@@ -519,6 +521,31 @@ public class ExecutionConfig implements Serializable, Archiveable<ArchivedExecut
 	}
 
 	/**
+	 * Enable generic types.
+	 *
+	 * @see ExecutionConfig#disableGenericTypes()
+	 */
+	public void enableGenericTypes() {
+		disableGenericTypes = false;
+	}
+
+	/**
+	 * Disable generic types to make sure that you have provided your own custom serializers for
+	 * all POJOs explicitly.
+	 *
+	 * <p>If generic types are disabled,
+	 * an {@link UnsupportedOperationException} will be thrown when Flink
+	 * tries to fall back to the default Kryo serializer logic at runtime.
+	 */
+	public void disableGenericTypes() {
+		disableGenericTypes = true;
+	}
+
+	public boolean hasGenericTypesDisabled() {
+		return disableGenericTypes;
+	}
+
+	/**
 	 * Force Flink to use the AvroSerializer for POJOs.
 	 */
 	public void enableForceAvro() {
@@ -804,6 +831,7 @@ public class ExecutionConfig implements Serializable, Archiveable<ArchivedExecut
 				((restartStrategyConfiguration == null && other.restartStrategyConfiguration == null) ||
 					(null != restartStrategyConfiguration && restartStrategyConfiguration.equals(other.restartStrategyConfiguration))) &&
 				forceKryo == other.forceKryo &&
+				disableGenericTypes == other.disableGenericTypes &&
 				objectReuse == other.objectReuse &&
 				autoTypeRegistrationEnabled == other.autoTypeRegistrationEnabled &&
 				forceAvro == other.forceAvro &&
@@ -830,6 +858,7 @@ public class ExecutionConfig implements Serializable, Archiveable<ArchivedExecut
 			parallelism,
 			restartStrategyConfiguration,
 			forceKryo,
+			disableGenericTypes,
 			objectReuse,
 			autoTypeRegistrationEnabled,
 			forceAvro,

http://git-wip-us.apache.org/repos/asf/flink/blob/0f99aae1/flink-core/src/main/java/org/apache/flink/api/java/typeutils/GenericTypeInfo.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/GenericTypeInfo.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/GenericTypeInfo.java
index bc4e87a..a4cea31 100644
--- a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/GenericTypeInfo.java
+++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/GenericTypeInfo.java
@@ -81,6 +81,12 @@ public class GenericTypeInfo<T> extends TypeInformation<T> implements AtomicType
 	@Override
 	@PublicEvolving
 	public TypeSerializer<T> createSerializer(ExecutionConfig config) {
+		if (config.hasGenericTypesDisabled()) {
+			throw new UnsupportedOperationException(
+				"Generic types are disabled for POJOs serialization, but type " + this.typeClass +
+				" is treated as a generic type.");
+		}
+
 		return new KryoSerializer<T>(this.typeClass, config);
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/0f99aae1/flink-core/src/test/java/org/apache/flink/api/common/ExecutionConfigTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/common/ExecutionConfigTest.java b/flink-core/src/test/java/org/apache/flink/api/common/ExecutionConfigTest.java
index 883ee6c..4956a9a 100644
--- a/flink-core/src/test/java/org/apache/flink/api/common/ExecutionConfigTest.java
+++ b/flink-core/src/test/java/org/apache/flink/api/common/ExecutionConfigTest.java
@@ -18,6 +18,10 @@
 
 package org.apache.flink.api.common;
 
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.typeutils.GenericTypeInfo;
+import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer;
 import org.junit.Test;
 
 import java.util.Arrays;
@@ -25,6 +29,7 @@ import java.util.List;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
 
 public class ExecutionConfigTest {
 
@@ -64,4 +69,24 @@ public class ExecutionConfigTest {
 		assertEquals(parallelism, config.getParallelism());
 	}
 
+	@Test
+	public void testForceCustomSerializerCheck() {
+		ExecutionConfig conf = new ExecutionConfig();
+		TypeInformation<Object> typeInfo = new GenericTypeInfo<Object>(Object.class);
+		TypeSerializer<Object> serializer = typeInfo.createSerializer(conf);
+		assertTrue(serializer instanceof KryoSerializer);
+
+		conf.disableGenericTypes();
+		boolean createSerializerFailed = false;
+		try {
+			typeInfo.createSerializer(conf);
+		} catch (UnsupportedOperationException e) {
+			createSerializerFailed = true;
+		} catch (Throwable t) {
+			fail("Unexpected exception thrown: " + t.getMessage());
+		}
+
+		assertTrue(createSerializerFailed);
+	}
+
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/0f99aae1/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java
index 7c51bc2..968b1c9 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java
@@ -55,7 +55,7 @@ public class StreamingJobGraphGeneratorTest extends TestLogger {
 		StreamGraph streamingJob = new StreamGraph(env);
 		StreamingJobGraphGenerator compiler = new StreamingJobGraphGenerator(streamingJob);
 		
-		boolean closureCleanerEnabled = r.nextBoolean(), forceAvroEnabled = r.nextBoolean(), forceKryoEnabled = r.nextBoolean(), objectReuseEnabled = r.nextBoolean(), sysoutLoggingEnabled = r.nextBoolean();
+		boolean closureCleanerEnabled = r.nextBoolean(), forceAvroEnabled = r.nextBoolean(), forceKryoEnabled = r.nextBoolean(), disableGenericTypes = r.nextBoolean(), objectReuseEnabled = r.nextBoolean(), sysoutLoggingEnabled = r.nextBoolean();
 		int dop = 1 + r.nextInt(10);
 		
 		ExecutionConfig config = streamingJob.getExecutionConfig();
@@ -74,6 +74,11 @@ public class StreamingJobGraphGeneratorTest extends TestLogger {
 		} else {
 			config.disableForceKryo();
 		}
+		if(disableGenericTypes) {
+			config.disableGenericTypes();
+		} else {
+			config.enableGenericTypes();
+		}
 		if(objectReuseEnabled) {
 			config.enableObjectReuse();
 		} else {
@@ -106,6 +111,7 @@ public class StreamingJobGraphGeneratorTest extends TestLogger {
 		assertEquals(closureCleanerEnabled, executionConfig.isClosureCleanerEnabled());
 		assertEquals(forceAvroEnabled, executionConfig.isForceAvroEnabled());
 		assertEquals(forceKryoEnabled, executionConfig.isForceKryoEnabled());
+		assertEquals(disableGenericTypes, executionConfig.hasGenericTypesDisabled());
 		assertEquals(objectReuseEnabled, executionConfig.isObjectReuseEnabled());
 		assertEquals(sysoutLoggingEnabled, executionConfig.isSysoutLoggingEnabled());
 		assertEquals(dop, executionConfig.getParallelism());