Posted to user@flink.apache.org by forideal <fs...@163.com> on 2020/08/14 10:38:37 UTC
Flink SQL UDAF com.esotericsoftware.kryo.KryoException: Encountered
unregistered class ID
Hi,
I wrote a UDAF following this article: https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/functions/udfs.html#aggregation-functions. With the in-memory state backend, the job runs normally. However, when I chose RocksDB as the state backend, I encountered the error below. Thank you for helping me look into this problem.
The error is as follows:
com.esotericsoftware.kryo.KryoException: Encountered unregistered class ID: 87
Serialization trace:
list (com.red.data.platform.RedConcat$ConcatString)
at com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:119)
at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:641)
at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:99)
at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:346)
at org.apache.flink.util.InstantiationUtil.deserializeFromByt
public class RedConcat extends AggregateFunction<String, RedConcat.ConcatString> {
    public class ConcatString {
        public List<String> list = new ArrayList<>();
        public void add(String toString) {
            if (list != null) {
                if (list.size() < 100) {
                    list.add(toString);
                }
            }
        }
    }
    @Override
    public boolean isDeterministic() {
        return false;
    }
    @Override
    public ConcatString createAccumulator() {
        return new ConcatString();
    }
    @Override
    public void open(FunctionContext context)
            throws Exception {
    }
Best forideal
Re: Flink SQL UDAF com.esotericsoftware.kryo.KryoException:
Encountered unregistered class ID
Posted by Timo Walther <tw...@apache.org>.
Hi Forideal,
Luckily, these problems will be a thing of the past in Flink 1.12, when
UDAFs are updated to the new type system [1]. Lists will be natively
supported, and registering custom Kryo serializers will work consistently
as well.
Until then, another workaround is to override getAccumulatorType() and
define the PojoTypeInfo of ConcatString manually, e.g. replacing the
GenericTypeInfo<List> with a proper
org.apache.flink.api.java.typeutils.ListTypeInfo.
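A minimal sketch of this workaround, assuming Flink 1.11 and the legacy type system; it has not been run against the job from this thread. It uses Types.POJO as a shortcut for building the PojoTypeInfo, makes ConcatString a static nested class so that it can qualify as a POJO, and fills in hypothetical accumulate() and getValue() bodies (the comma separator is an assumption), because the code in the original post is truncated.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.typeutils.ListTypeInfo;
import org.apache.flink.table.functions.AggregateFunction;

public class RedConcat extends AggregateFunction<String, RedConcat.ConcatString> {

    // Static nested class with a public field and an implicit public
    // no-arg constructor, so that it can be described as a POJO.
    public static class ConcatString {
        public List<String> list = new ArrayList<>();
    }

    @Override
    public ConcatString createAccumulator() {
        return new ConcatString();
    }

    // Hypothetical body: the original post does not show accumulate().
    public void accumulate(ConcatString acc, String value) {
        if (value != null && acc.list.size() < 100) {
            acc.list.add(value);
        }
    }

    // Hypothetical body: the separator is an assumption.
    @Override
    public String getValue(ConcatString acc) {
        return String.join(",", acc.list);
    }

    // Describe the accumulator explicitly so that the "list" field is
    // handled as a ListTypeInfo instead of a Kryo-backed GenericTypeInfo.
    @Override
    public TypeInformation<ConcatString> getAccumulatorType() {
        Map<String, TypeInformation<?>> fields = new HashMap<>();
        fields.put("list", new ListTypeInfo<>(Types.STRING));
        return Types.POJO(ConcatString.class, fields);
    }
}
```

With the accumulator type declared this way, the list field should no longer go through Kryo, so no Kryo registration for ArrayList should be needed for it.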
I hope this helps.
Regards,
Timo
[1] https://issues.apache.org/jira/browse/FLINK-15803
On 15.08.20 08:07, forideal wrote:
> Hi Robert Metzger,
>
> I am happy to share my code:
>
> public class ConcatString {
>     public List<String> list = new ArrayList<>();
>
>     public void add(String toString) {
>         if (list != null) {
>             if (list.size() < 100) {
>                 list.add(toString);
>             }
>         }
>     }
> }
>
> > Are you registering your custom types in the ExecutionConfig? (If
> > so, it increases the chances of this error happening.)
>
> Let me describe my scenario. We have built a SQL platform based on
> Flink and want to support user-defined UDFs/UDAFs, so that users only
> submit SQL and do not need to write any other code. The serialization
> problem does exist there.
>
> I currently work around this problem as follows.
> First:
> this.env.getConfig().registerTypeWithKryoSerializer(ArrayList.class,
> org.apache.flink.api.java.typeutils.runtime.kryo.JavaSerializer.class);
> Second: ConcatString extends ArrayList:
>
> public class ConcatString extends ArrayList<String> {
>
>     @Override
>     public boolean add(String toString) {
>         if (this.size() < 1000) {
>             super.add(toString);
>             return true;
>         }
>         return false;
>     }
>
>     public List<String> getList() {
>         return this;
>     }
>
> }
>
> Best forideal
Re:Re: Flink SQL UDAF com.esotericsoftware.kryo.KryoException:
Encountered unregistered class ID
Posted by forideal <fs...@163.com>.
Hi Robert Metzger,
I am happy to share my code:
public class ConcatString {
    public List<String> list = new ArrayList<>();
    public void add(String toString) {
        if (list != null) {
            if (list.size() < 100) {
                list.add(toString);
            }
        }
    }
}
> Are you registering your custom types in the ExecutionConfig? (If so, it increases the chances of this error happening.)
Let me describe my scenario. We have built a SQL platform based on Flink and want to support user-defined UDFs/UDAFs, so that users only submit SQL and do not need to write any other code. The serialization problem does exist there.
I currently work around this problem as follows.
First: this.env.getConfig().registerTypeWithKryoSerializer(ArrayList.class, org.apache.flink.api.java.typeutils.runtime.kryo.JavaSerializer.class);
Second: ConcatString extends ArrayList:
public class ConcatString extends ArrayList<String> {
    @Override
    public boolean add(String toString) {
        if (this.size() < 1000) {
            super.add(toString);
            return true;
        }
        return false;
    }
    public List<String> getList() {
        return this;
    }
}
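To show how the capped accumulator above behaves on its own, here is a small sketch that uses only the JDK and no Flink classes. The wrapper class CappedListSketch and the constant MAX_ELEMENTS are hypothetical names introduced for illustration; the limit of 1000 is taken from the workaround above.

```java
import java.util.ArrayList;

class CappedListSketch {

    // Same idea as the workaround above: an ArrayList that silently
    // refuses new elements once it holds MAX_ELEMENTS entries.
    static class ConcatString extends ArrayList<String> {
        private static final long serialVersionUID = 1L;
        static final int MAX_ELEMENTS = 1000;

        @Override
        public boolean add(String value) {
            if (size() < MAX_ELEMENTS) {
                return super.add(value);
            }
            return false;
        }
    }

    public static void main(String[] args) {
        ConcatString acc = new ConcatString();
        for (int i = 0; i < 1500; i++) {
            acc.add("v" + i);
        }
        // Only the first 1000 values were kept.
        System.out.println(acc.size());
        // Because ArrayList implements java.io.Serializable, so does the subclass.
        System.out.println(acc instanceof java.io.Serializable);
    }
}
```

Since the subclass inherits Serializable from ArrayList, it can be handled by a serializer that relies on Java serialization.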
Best forideal
At 2020-08-14 21:46:45, "Robert Metzger" <rm...@apache.org> wrote:
Hi Forideal,
With RocksDB, we need to serialize the data (to store it on disk), whereas with the memory backend the data (in this case RedConcat.ConcatString instances) stays on the heap as objects, so we won't run into this issue there.
Are you registering your custom types in the ExecutionConfig? (If so, it increases the chances of this error happening.)
Could you share the code of RedConcat.ConcatString as well?
I would not be surprised if this is a bug in Flink. Using a UDAF with custom types is probably not a very common use case.
Best,
Robert
Re: Flink SQL UDAF com.esotericsoftware.kryo.KryoException:
Encountered unregistered class ID
Posted by Robert Metzger <rm...@apache.org>.
Hi Forideal,
With RocksDB, we need to serialize the data (to store it on disk),
whereas with the memory backend the data (in this
case RedConcat.ConcatString instances) stays on the heap as objects, so we
won't run into this issue there.
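The difference can be illustrated with a small JDK-only sketch. Note the assumptions: java.io serialization is used here purely as a stand-in for Flink's serializers (the error in this thread comes from Kryo, not from Java serialization), and StateBackendSketch, Accumulator, readFromHeap and readFromBytes are hypothetical names, not Flink APIs.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;

class StateBackendSketch {

    static class Accumulator implements Serializable {
        private static final long serialVersionUID = 1L;
        final List<String> list = new ArrayList<>();
    }

    // Heap-style state: the stored object itself is handed back,
    // so no serializer is ever involved.
    static Accumulator readFromHeap(Accumulator stored) {
        return stored;
    }

    // Disk-style state: the object is turned into bytes and rebuilt
    // on every read, so a working serializer is required.
    static Accumulator readFromBytes(Accumulator stored) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(stored);
            }
            try (ObjectInputStream in =
                    new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (Accumulator) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException("serialization round trip failed", e);
        }
    }

    public static void main(String[] args) {
        Accumulator acc = new Accumulator();
        acc.list.add("a");
        acc.list.add("b");

        System.out.println(readFromHeap(acc) == acc); // prints "true": same object
        Accumulator copy = readFromBytes(acc);
        System.out.println(copy == acc);              // prints "false": rebuilt from bytes
        System.out.println(copy.list);                // prints "[a, b]"
    }
}
```

Any problem in the serializer shows up only on the second path, which is why the same job can run with heap state and fail once state is written out as bytes.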
Are you registering your custom types in the ExecutionConfig? (If so, it
increases the chances of this error happening.)
Could you share the code of RedConcat.ConcatString as well?
I would not be surprised if this is a bug in Flink. Using a UDAF with
custom types is probably not a very common use case.
Best,
Robert