You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by "ASF GitHub Bot (JIRA)" <ji...@apache.org> on 2018/10/25 20:59:00 UTC

[jira] [Commented] (FLINK-6857) Add global default Kryo serializer configuration to StreamExecutionEnvironment

    [ https://issues.apache.org/jira/browse/FLINK-6857?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16664270#comment-16664270 ] 

ASF GitHub Bot commented on FLINK-6857:
---------------------------------------

zentol closed pull request #4166: [FLINK-6857] [types] Add global default Kryo serializer configuration…
URL: https://github.com/apache/flink/pull/4166
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java b/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
index 26e6af1e945..1b02176fe5d 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
@@ -18,7 +18,9 @@
 
 package org.apache.flink.api.common;
 
+import com.esotericsoftware.kryo.Kryo;
 import com.esotericsoftware.kryo.Serializer;
+
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.annotation.Public;
 import org.apache.flink.annotation.PublicEvolving;
@@ -146,6 +148,8 @@
 	 */
 	private long taskCancellationTimeoutMillis = -1;
 
+	private Kryo kryo = new Kryo();
+
 	// ------------------------------- User code values --------------------------------------------
 
 	private GlobalJobParameters globalJobParameters;
@@ -678,7 +682,8 @@ public void setGlobalJobParameters(GlobalJobParameters globalJobParameters) {
 	 * Adds a new Kryo default serializer to the Runtime.
 	 *
 	 * Note that the serializer instance must be serializable (as defined by java.io.Serializable),
-	 * because it may be distributed to the worker nodes by java serialization.
+	 * because it may be distributed to the worker nodes by java serialization. Also, this method
+	 * can only tied to specific class which correspond to the addDefaultSerializer method in Kryo.
 	 *
 	 * @param type The class of the types serialized with the given serializer.
 	 * @param serializer The serializer to use.
@@ -694,6 +699,9 @@ public void setGlobalJobParameters(GlobalJobParameters globalJobParameters) {
 	/**
 	 * Adds a new Kryo default serializer to the Runtime.
 	 *
+	 * Note that this method can only tied to specific class which correspond
+	 * to the addDefaultSerializer method in Kryo.
+	 *
 	 * @param type The class of the types serialized with the given serializer.
 	 * @param serializerClass The class of the serializer to use.
 	 */
@@ -704,6 +712,25 @@ public void addDefaultKryoSerializer(Class<?> type, Class<? extends Serializer<?
 		defaultKryoSerializerClasses.put(type, serializerClass);
 	}
 
+	/**
+	 * Sets a new Kryo default serializer to the Runtime.
+	 *
+	 * Note that this method is different from {@link #addDefaultKryoSerializer(Class, Class)},
+	 * you can specify your own serializer class to use when no {@link #addDefaultKryoSerializer(Class, Class)
+	 * default serializers} match an object's type.
+	 *
+	 * @param type The class of the types serialized with the given serializer.
+	 * @param serializerClass The class of the serializer to use.
+	 */
+	public void setDefaultKryoSerializer(Class<?> type, Class<? extends Serializer<?>> serializerClass) {
+		if (type == null || serializerClass == null) {
+			throw new NullPointerException("Cannot register null class or serializer.");
+		}
+		kryo.newInstance(type);
+		kryo.setDefaultSerializer(serializerClass);
+		defaultKryoSerializerClasses.put(type, serializerClass);
+	}
+
 	/**
 	 * Registers the given type with a Kryo Serializer.
 	 *
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
index 46c821edfa2..b4f328a8907 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
@@ -559,6 +559,22 @@ public void addDefaultKryoSerializer(Class<?> type, Class<? extends Serializer<?
 		config.addDefaultKryoSerializer(type, serializerClass);
 	}
 
+	/**
+	 * Sets a new Kryo default serializer to the Runtime.
+	 *
+	 * <p>Note that this method is different from {@link #addDefaultKryoSerializer(Class, Class)},
+	 * you can specify your own serializer class to use when no {@link #addDefaultKryoSerializer(Class, Class)
+	 * default serializers} match an object's type.
+	 *
+	 * @param type
+	 * 		The class of the types serialized with the given serializer.
+	 * @param serializerClass
+	 * 		The class of the serializer to use.
+	 */
+	public void setDefaultKryoSerializer(Class<?> type, Class<? extends Serializer<?>> serializerClass) {
+		config.setDefaultKryoSerializer(type, serializerClass);
+	}
+
 	/**
 	 * Registers the given type with a Kryo Serializer.
 	 *


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


> Add global default Kryo serializer configuration to StreamExecutionEnvironment
> ------------------------------------------------------------------------------
>
>                 Key: FLINK-6857
>                 URL: https://issues.apache.org/jira/browse/FLINK-6857
>             Project: Flink
>          Issue Type: Improvement
>          Components: Configuration, Type Serialization System
>            Reporter: Tzu-Li (Gordon) Tai
>            Assignee: zhangminglei
>            Priority: Major
>              Labels: pull-request-available
>
> See ML for original discussion: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/KryoException-Encountered-unregistered-class-ID-td13476.html.
> We should have an additional {{setDefaultKryoSerializer}} method that allows overriding the global default serializer that is not tied to specific classes (out-of-the-box Kryo uses the {{FieldSerializer}} if no matches for default serializer settings can be found for a class). Internally in Flink's {{KryoSerializer}}, this would only be a matter of proxying that configured global default serializer for Kryo by calling {{Kryo.setDefaultSerializer(...)}} on the created Kryo instance.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)