You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2017/03/16 14:05:44 UTC
[3/8] flink git commit: [FLINK-5692] [core] Add an Option to
Deactivate Kryo Fallback for Serializers
[FLINK-5692] [core] Add an Option to Deactivate Kryo Fallback for Serializers
This closes #3373
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/0f99aae1
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/0f99aae1
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/0f99aae1
Branch: refs/heads/master
Commit: 0f99aae1e1f8b693c2ba79a061046bc042113f0b
Parents: 677b508
Author: Jin Mingjian <ji...@gmail.com>
Authored: Tue Feb 21 11:57:21 2017 +0800
Committer: Stephan Ewen <se...@apache.org>
Committed: Thu Mar 16 14:43:26 2017 +0100
----------------------------------------------------------------------
docs/dev/types_serialization.md | 7 +++++
.../flink/api/common/ExecutionConfig.java | 29 ++++++++++++++++++++
.../api/java/typeutils/GenericTypeInfo.java | 6 ++++
.../flink/api/common/ExecutionConfigTest.java | 25 +++++++++++++++++
.../graph/StreamingJobGraphGeneratorTest.java | 8 +++++-
5 files changed, 74 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/0f99aae1/docs/dev/types_serialization.md
----------------------------------------------------------------------
diff --git a/docs/dev/types_serialization.md b/docs/dev/types_serialization.md
index e723c33..20ee071 100644
--- a/docs/dev/types_serialization.md
+++ b/docs/dev/types_serialization.md
@@ -306,6 +306,13 @@ env.getConfig().addDefaultKryoSerializer(Class<?> type, Class<? extends Serializ
There are different variants of these methods available.
+If you do not want to fall back to Kryo and further make sure that you have provided your own custom serializers for all POJOs explicitly, set
+{% highlight java %}
+env.getConfig().disableGenericTypes();
+{% endhighlight %}
+
+If generic types disabled, an {@link UnsupportedOperationException} will be thrown when Flink tries to fall back to the default Kryo serializer logic in the runtime.
+
## Defining Type Information using a Factory
A type information factory allows for plugging-in user-defined type information into the Flink type system.
http://git-wip-us.apache.org/repos/asf/flink/blob/0f99aae1/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java b/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
index 32ea0a3..3bd91c7 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
@@ -109,6 +109,8 @@ public class ExecutionConfig implements Serializable, Archiveable<ArchivedExecut
private boolean forceKryo = false;
+ private boolean disableGenericTypes = false;
+
private boolean objectReuse = false;
private boolean autoTypeRegistrationEnabled = true;
@@ -519,6 +521,31 @@ public class ExecutionConfig implements Serializable, Archiveable<ArchivedExecut
}
/**
+ * Enable generic types.
+ *
+ * @see ExecutionConfig#disableGenericTypes()
+ */
+ public void enableGenericTypes() {
+ disableGenericTypes = false;
+ }
+
+ /**
+ * Disable generic types to make sure that you have provided your own custom serializers for
+ * all POJOs explicitly.
+ *
+ * If generic types disabled,
+ * an {@link UnsupportedOperationException} will be thrown when Flink
+ * tries to fall back to the default Kryo serializer logic in the runtime.
+ */
+ public void disableGenericTypes() {
+ disableGenericTypes = true;
+ }
+
+ public boolean hasGenericTypesDisabled() {
+ return disableGenericTypes;
+ }
+
+ /**
* Force Flink to use the AvroSerializer for POJOs.
*/
public void enableForceAvro() {
@@ -804,6 +831,7 @@ public class ExecutionConfig implements Serializable, Archiveable<ArchivedExecut
((restartStrategyConfiguration == null && other.restartStrategyConfiguration == null) ||
(null != restartStrategyConfiguration && restartStrategyConfiguration.equals(other.restartStrategyConfiguration))) &&
forceKryo == other.forceKryo &&
+ disableGenericTypes == other.disableGenericTypes &&
objectReuse == other.objectReuse &&
autoTypeRegistrationEnabled == other.autoTypeRegistrationEnabled &&
forceAvro == other.forceAvro &&
@@ -830,6 +858,7 @@ public class ExecutionConfig implements Serializable, Archiveable<ArchivedExecut
parallelism,
restartStrategyConfiguration,
forceKryo,
+ disableGenericTypes,
objectReuse,
autoTypeRegistrationEnabled,
forceAvro,
http://git-wip-us.apache.org/repos/asf/flink/blob/0f99aae1/flink-core/src/main/java/org/apache/flink/api/java/typeutils/GenericTypeInfo.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/GenericTypeInfo.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/GenericTypeInfo.java
index bc4e87a..a4cea31 100644
--- a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/GenericTypeInfo.java
+++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/GenericTypeInfo.java
@@ -81,6 +81,12 @@ public class GenericTypeInfo<T> extends TypeInformation<T> implements AtomicType
@Override
@PublicEvolving
public TypeSerializer<T> createSerializer(ExecutionConfig config) {
+ if (config.hasGenericTypesDisabled()) {
+ throw new UnsupportedOperationException(
+ "Generic types are disabled for POJOs serialization, but type " + this.typeClass +
+ " is treated as a generic type.");
+ }
+
return new KryoSerializer<T>(this.typeClass, config);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/0f99aae1/flink-core/src/test/java/org/apache/flink/api/common/ExecutionConfigTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/common/ExecutionConfigTest.java b/flink-core/src/test/java/org/apache/flink/api/common/ExecutionConfigTest.java
index 883ee6c..4956a9a 100644
--- a/flink-core/src/test/java/org/apache/flink/api/common/ExecutionConfigTest.java
+++ b/flink-core/src/test/java/org/apache/flink/api/common/ExecutionConfigTest.java
@@ -18,6 +18,10 @@
package org.apache.flink.api.common;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.typeutils.GenericTypeInfo;
+import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer;
import org.junit.Test;
import java.util.Arrays;
@@ -25,6 +29,7 @@ import java.util.List;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
public class ExecutionConfigTest {
@@ -64,4 +69,24 @@ public class ExecutionConfigTest {
assertEquals(parallelism, config.getParallelism());
}
+ @Test
+ public void testForceCustomSerializerCheck() {
+ ExecutionConfig conf = new ExecutionConfig();
+ TypeInformation<Object> typeInfo = new GenericTypeInfo<Object>(Object.class);
+ TypeSerializer<Object> serializer = typeInfo.createSerializer(conf);
+ assertTrue(serializer instanceof KryoSerializer);
+
+ conf.disableGenericTypes();
+ boolean createSerializerFailed = false;
+ try {
+ typeInfo.createSerializer(conf);
+ } catch (UnsupportedOperationException e) {
+ createSerializerFailed = true;
+ } catch (Throwable t) {
+ fail("Unexpected exception thrown: " + t.getMessage());
+ }
+
+ assertTrue(createSerializerFailed);
+ }
+
}
http://git-wip-us.apache.org/repos/asf/flink/blob/0f99aae1/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java
index 7c51bc2..968b1c9 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java
@@ -55,7 +55,7 @@ public class StreamingJobGraphGeneratorTest extends TestLogger {
StreamGraph streamingJob = new StreamGraph(env);
StreamingJobGraphGenerator compiler = new StreamingJobGraphGenerator(streamingJob);
- boolean closureCleanerEnabled = r.nextBoolean(), forceAvroEnabled = r.nextBoolean(), forceKryoEnabled = r.nextBoolean(), objectReuseEnabled = r.nextBoolean(), sysoutLoggingEnabled = r.nextBoolean();
+ boolean closureCleanerEnabled = r.nextBoolean(), forceAvroEnabled = r.nextBoolean(), forceKryoEnabled = r.nextBoolean(), disableGenericTypes = r.nextBoolean(), objectReuseEnabled = r.nextBoolean(), sysoutLoggingEnabled = r.nextBoolean();
int dop = 1 + r.nextInt(10);
ExecutionConfig config = streamingJob.getExecutionConfig();
@@ -74,6 +74,11 @@ public class StreamingJobGraphGeneratorTest extends TestLogger {
} else {
config.disableForceKryo();
}
+ if(disableGenericTypes) {
+ config.disableGenericTypes();
+ } else {
+ config.enableGenericTypes();
+ }
if(objectReuseEnabled) {
config.enableObjectReuse();
} else {
@@ -106,6 +111,7 @@ public class StreamingJobGraphGeneratorTest extends TestLogger {
assertEquals(closureCleanerEnabled, executionConfig.isClosureCleanerEnabled());
assertEquals(forceAvroEnabled, executionConfig.isForceAvroEnabled());
assertEquals(forceKryoEnabled, executionConfig.isForceKryoEnabled());
+ assertEquals(disableGenericTypes, executionConfig.hasGenericTypesDisabled());
assertEquals(objectReuseEnabled, executionConfig.isObjectReuseEnabled());
assertEquals(sysoutLoggingEnabled, executionConfig.isSysoutLoggingEnabled());
assertEquals(dop, executionConfig.getParallelism());