Posted to issues@flink.apache.org by "Jiayi Liao (Jira)" <ji...@apache.org> on 2020/04/22 07:54:00 UTC

[jira] [Comment Edited] (FLINK-17320) Java8 lambda expression cannot be serialized.

    [ https://issues.apache.org/jira/browse/FLINK-17320?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17089415#comment-17089415 ] 

Jiayi Liao edited comment on FLINK-17320 at 4/22/20, 7:53 AM:
--------------------------------------------------------------

After some investigation, I found that the {{PriorityQueue}} can be serialized with a few changes:


{code:java}
// The intersection cast makes the lambda implement Serializable as well as Comparator.
PriorityQueue<String> pq = new PriorityQueue<>((Comparator<String> & Serializable) (o1, o2) -> o1.length() - o2.length());
pq.add("1234135");
pq.add("12323424135");

KryoSerializer kryoSerializer = new KryoSerializer(PriorityQueue.class, new ExecutionConfig());
// Register the class that defines the lambda, plus the types Kryo needs to write a closure.
kryoSerializer.getKryo().register(getClass());
kryoSerializer.getKryo().register(java.lang.invoke.SerializedLambda.class);
kryoSerializer.getKryo().register(ClosureSerializer.Closure.class, new ClosureSerializer());

kryoSerializer.serialize(pq, new DataOutputSerializer(10240));
{code}
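
For context, here is a minimal sketch of what the intersection cast changes. It uses the plain JDK only (no Flink, no Kryo), and the class name {{SerializedLambdaDemo}} and the helper {{describe}} are made up for illustration. A lambda whose target type includes {{Serializable}} gets a synthetic {{writeReplace}} method that returns a {{java.lang.invoke.SerializedLambda}} naming the capturing class, which is presumably why both {{SerializedLambda}} and the enclosing class are registered above. Reading {{writeReplace}} through reflection relies on how current JDKs compile serializable lambdas, so treat that part as an assumption.

```java
import java.io.Serializable;
import java.lang.invoke.SerializedLambda;
import java.lang.reflect.Method;
import java.util.Comparator;

public class SerializedLambdaDemo {

    /** Returns the SerializedLambda behind a serializable lambda, or null if the lambda is not Serializable. */
    public static SerializedLambda describe(Object lambda) throws Exception {
        if (!(lambda instanceof Serializable)) {
            return null;
        }
        // Serializable lambdas carry a private synthetic writeReplace() method.
        Method writeReplace = lambda.getClass().getDeclaredMethod("writeReplace");
        writeReplace.setAccessible(true);
        return (SerializedLambda) writeReplace.invoke(lambda);
    }

    /** A plain lambda: Comparator alone does not extend Serializable. */
    public static Comparator<String> plain() {
        return (a, b) -> a.length() - b.length();
    }

    /** The same lambda with the intersection cast used in the comment above. */
    public static Comparator<String> serializable() {
        return (Comparator<String> & Serializable) (a, b) -> a.length() - b.length();
    }

    public static void main(String[] args) throws Exception {
        System.out.println(describe(plain())); // prints "null"
        SerializedLambda info = describe(serializable());
        // The capturing class is reported in internal (slash-separated) form.
        System.out.println(info.getCapturingClass());
    }
}
```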

I'm not sure whether this serialization problem can be handled on Flink's side (a plain lambda expression really is not serializable), but if it cannot be solved, can we at least add a hint to the error message?
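
To illustrate the point about lambdas, here is a small sketch that uses only JDK serialization, not Flink's {{KryoSerializer}}. The class {{LambdaQueueDemo}} and its helpers are hypothetical names. A {{PriorityQueue}} writes its comparator along with its elements, so a plain lambda comparator fails with {{NotSerializableException}}, while the same lambda with the {{Serializable}} intersection cast survives a round trip.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.Comparator;
import java.util.PriorityQueue;

public class LambdaQueueDemo {

    /** A plain lambda comparator, not Serializable. */
    public static Comparator<String> plain() {
        return (a, b) -> a.length() - b.length();
    }

    /** The same comparator, made Serializable by the intersection cast. */
    public static Comparator<String> serializable() {
        return (Comparator<String> & Serializable) (a, b) -> a.length() - b.length();
    }

    /** Builds a queue holding the two sample strings from the issue. */
    public static PriorityQueue<String> newQueue(Comparator<String> comparator) {
        PriorityQueue<String> pq = new PriorityQueue<>(comparator);
        pq.add("12323424135");
        pq.add("1234135");
        return pq;
    }

    /** Serializes a value with Java serialization and reads it back. */
    @SuppressWarnings("unchecked")
    public static <T> T roundTrip(T value) throws IOException, ClassNotFoundException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(value);
        }
        try (ObjectInputStream in =
                new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
            return (T) in.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        try {
            roundTrip(newQueue(plain()));
        } catch (NotSerializableException e) {
            System.out.println("plain lambda comparator is rejected: " + e);
        }
        PriorityQueue<String> copy = roundTrip(newQueue(serializable()));
        System.out.println(copy.poll()); // shortest string first: 1234135
    }
}
```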

cc [~jark][~lzljs3620320]



> Java8 lambda expression cannot be serialized.
> ---------------------------------------------
>
>                 Key: FLINK-17320
>                 URL: https://issues.apache.org/jira/browse/FLINK-17320
>             Project: Flink
>          Issue Type: Bug
>          Components: API / Type Serialization System, Table SQL / Runtime
>    Affects Versions: 1.9.0
>            Reporter: Jiayi Liao
>            Priority: Major
>
> This happens when we use {{java.util.PriorityQueue}} in a custom UDAF. The code below reproduces the serialization error.
> {code:java}
> @Test
> public void test() throws IOException {
>         PriorityQueue<String> pq = new PriorityQueue<>((o1, o2) -> o1.length() - o2.length());
>         pq.add("1234135");
>         pq.add("12323424135");
>         KryoSerializer kryoSerializer = new KryoSerializer(PriorityQueue.class, new ExecutionConfig());
>         kryoSerializer.serialize(pq, new DataOutputSerializer(10240));
> }
> {code}
> The following NPE is thrown:
> {code:java}
> Caused by: java.lang.NullPointerException
>         at com.esotericsoftware.kryo.util.DefaultClassResolver.writeClass(DefaultClassResolver.java:80)
>         at com.esotericsoftware.kryo.Kryo.writeClass(Kryo.java:488)
>         at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:593)
>         at org.apache.flink.runtime.types.PriorityQueueSerializer.write(PriorityQueueSerializer.java:67)
>         at org.apache.flink.runtime.types.PriorityQueueSerializer.write(PriorityQueueSerializer.java:40)
>         at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:599)
>         at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.serialize(KryoSerializer.java:307)
>         at org.apache.flink.util.InstantiationUtil.serializeToByteArray(InstantiationUtil.java:526)
> {code}


