Posted to user@flink.apache.org by Yukun Guo <gy...@gmail.com> on 2016/09/18 10:53:53 UTC

Serialization problem for Guava's TreeMultimap

Here is the code snippet:

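import com.google.common.collect.TreeMultimap;
import org.apache.flink.api.common.functions.FoldFunction;
import org.apache.flink.api.java.tuple.Tuple2;

// windowedStream is a WindowedStream<Tuple2<String, Long>, ...> and topK is an
// int defined elsewhere; the fold keeps the entries for the topK largest counts.
windowedStream.fold(TreeMultimap.<Long, String>create(),
        new FoldFunction<Tuple2<String, Long>, TreeMultimap<Long, String>>() {
    @Override
    public TreeMultimap<Long, String> fold(TreeMultimap<Long, String> topKSoFar,
                                           Tuple2<String, Long> itemCount) throws Exception {
        String item = itemCount.f0;
        Long count = itemCount.f1;
        topKSoFar.put(count, item);
        // Evict the smallest count once more than topK distinct counts are held.
        if (topKSoFar.keySet().size() > topK) {
            topKSoFar.removeAll(topKSoFar.keySet().first());
        }
        return topKSoFar;
    }
});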


The problem is that when the fold function is called, the initial value has
been lost, so it throws a NullPointerException. This appears to be due to
failed type extraction and serialization, judging by the log message:
"INFO  TypeExtractor:1685 - No fields detected for class
com.google.common.collect.TreeMultimap. Cannot be used as a PojoType. Will
be handled as GenericType."
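For illustration only, the fold step above can be mirrored with plain JDK collections, assuming a TreeMap<Long, TreeSet<String>> as a stand-in for Guava's TreeMultimap<Long, String> (both keep keys and values sorted). The hypothetical TopKFold.fold helper also sketches a defensive workaround for the symptom described above: if the accumulator arrives as null, it starts from an empty map instead of throwing. In the real FoldFunction the equivalent guard would be `if (topKSoFar == null) topKSoFar = TreeMultimap.create();`, assuming the lost initial value really does show up as null.

```java
import java.util.TreeMap;
import java.util.TreeSet;

// Stdlib stand-in for the TreeMultimap-based fold: keys are counts, values are items.
public class TopKFold {

    // One fold step: add (item, count), then evict the smallest count if more
    // than topK distinct counts are held. A null accumulator is replaced by a
    // fresh map rather than causing a NullPointerException.
    public static TreeMap<Long, TreeSet<String>> fold(TreeMap<Long, TreeSet<String>> topKSoFar,
                                                      String item, long count, int topK) {
        if (topKSoFar == null) {
            topKSoFar = new TreeMap<>();
        }
        topKSoFar.computeIfAbsent(count, k -> new TreeSet<>()).add(item);
        if (topKSoFar.size() > topK) {
            topKSoFar.remove(topKSoFar.firstKey());
        }
        return topKSoFar;
    }

    public static void main(String[] args) {
        TreeMap<Long, TreeSet<String>> acc = null;
        acc = fold(acc, "a", 3L, 2);
        acc = fold(acc, "b", 1L, 2);
        acc = fold(acc, "c", 5L, 2);
        acc = fold(acc, "d", 5L, 2);
        System.out.println(acc); // prints {3=[a], 5=[c, d]}
    }
}
```

Because eviction is keyed on distinct counts, ties can leave more than topK items in the result: with topK = 2 above, three items survive because "c" and "d" share the count 5.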

I have tried the following two fixes, but neither of them worked:

1. Writing a class TreeMultimapSerializer that extends Kryo's
Serializer<T>, and calling
`env.addDefaultKryoSerializer(TreeMultimap.class, new
TreeMultimapSerializer())`. The write/read methods are almost line-by-line
translations of TreeMultimap's own serialization code.

2. TreeMultimap implements the Serializable interface, so Kryo can fall
back to standard Java serialization. Since Kryo's JavaSerializer
itself is not serializable, I wrote an adapter to make it fit the
"addDefaultKryoSerializer" API.
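The thread never shows the actual TreeMultimapSerializer, so the following is only a minimal sketch of what approach 1 could look like. It assumes the Kryo 2.x `Serializer` API that Flink bundled at the time (the `read` signature differs in Kryo 5), and it hard-codes `Long` keys and `String` values with natural ordering, matching the snippet above. It writes the number of distinct keys, then for each key its value count and values, and rebuilds the multimap with `TreeMultimap.create()`.

```java
import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.Serializer;
import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;
import com.google.common.collect.TreeMultimap;

import java.io.Serializable;
import java.util.Collection;
import java.util.Map;

// Sketch of a Kryo serializer for TreeMultimap<Long, String> (natural ordering only).
// Implements Serializable so an instance can be registered in Flink's ExecutionConfig.
public class TreeMultimapSerializer extends Serializer<TreeMultimap<Long, String>>
        implements Serializable {

    private static final long serialVersionUID = 1L;

    @Override
    public void write(Kryo kryo, Output output, TreeMultimap<Long, String> multimap) {
        Map<Long, Collection<String>> byKey = multimap.asMap();
        // Number of distinct keys, then per key: the key, its value count, its values.
        output.writeInt(byKey.size(), true);
        for (Map.Entry<Long, Collection<String>> entry : byKey.entrySet()) {
            output.writeLong(entry.getKey());
            output.writeInt(entry.getValue().size(), true);
            for (String value : entry.getValue()) {
                output.writeString(value);
            }
        }
    }

    @Override
    public TreeMultimap<Long, String> read(Kryo kryo, Input input,
                                           Class<TreeMultimap<Long, String>> type) {
        // Rebuild with natural ordering; custom comparators are not preserved.
        TreeMultimap<Long, String> multimap = TreeMultimap.create();
        int keyCount = input.readInt(true);
        for (int i = 0; i < keyCount; i++) {
            long key = input.readLong();
            int valueCount = input.readInt(true);
            for (int j = 0; j < valueCount; j++) {
                multimap.put(key, input.readString());
            }
        }
        return multimap;
    }
}
```

Because the class implements `java.io.Serializable`, it can be passed as an instance, e.g. `env.getConfig().addDefaultKryoSerializer(TreeMultimap.class, new TreeMultimapSerializer())`. The thread later traces this particular NullPointerException to StateDescriptor serialization (FLINK-4640), so a correct Kryo serializer alone may not have fixed it.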

Could you please give me some working examples for custom Kryo
serialization in Flink?


Best regards,
Yukun

Re: Serialization problem for Guava's TreeMultimap

Posted by Yukun Guo <gy...@gmail.com>.
Thank you for quickly fixing it!


On 20 September 2016 at 17:17, Fabian Hueske <fh...@gmail.com> wrote:


Re: Serialization problem for Guava's TreeMultimap

Posted by Fabian Hueske <fh...@gmail.com>.
Hi Yukun,

I debugged this issue and found that this is a bug in the serialization of
the StateDescriptor.
I have created FLINK-4640 [1] to resolve the issue.

Thanks for reporting the issue.

Best, Fabian

[1] https://issues.apache.org/jira/browse/FLINK-4640

2016-09-20 10:35 GMT+02:00 Yukun Guo <gy...@gmail.com>:

> Some detail: if running the FoldFunction on a KeyedStream, everything
> works fine. So it must relate to the way WindowedStream handles type
> extraction.
>
> In case any Flink experts would like to reproduce it, I have created a
> repo on Github: github.com/gyk/flink-multimap
>
> On 20 September 2016 at 10:33, Yukun Guo <gy...@gmail.com> wrote:
>
>> Hi,
>>
>> The same error occurs after changing the code, unfortunately.
>>
>> BTW, registerTypeWithKryoSerializer requires the 2nd argument to be a `T
>> serializer` where T extends Serializer<?> & Serializable, so I pass a
>> custom GenericJavaSerializer<T>, but I guess this doesn't matter much.
>>
>>
>> On 19 September 2016 at 18:02, Stephan Ewen <se...@apache.org> wrote:
>>
>>> Hi!
>>>
>>> Can you use "env.getConfig().registerTypeWithKryoSerializer(TreeMultimap.class, JavaSerializer.class)"
>>> ?
>>>
>>> Best,
>>> Stephan
>>>
>>>
>>> On Sun, Sep 18, 2016 at 12:53 PM, Yukun Guo <gy...@gmail.com> wrote:
>>>
>>>> Here is the code snippet:
>>>>
>>>> windowedStream.fold(TreeMultimap.<Long, String>create(), new FoldFunction<Tuple2<String, Long>, TreeMultimap<Long, String>>() {
>>>>     @Override
>>>>     public TreeMultimap<Long, String> fold(TreeMultimap<Long, String> topKSoFar,
>>>>                                            Tuple2<String, Long> itemCount) throws Exception {
>>>>         String item = itemCount.f0;
>>>>         Long count = itemCount.f1;
>>>>         topKSoFar.put(count, item);
>>>>         if (topKSoFar.keySet().size() > topK) {
>>>>             topKSoFar.removeAll(topKSoFar.keySet().first());
>>>>         }
>>>>         return topKSoFar;
>>>>     }
>>>> });
>>>>
>>>>
>>>> The problem is when fold function getting called, the initial value has
>>>> lost therefore it encounters a NullPointerException. This is due to failed
>>>> type extraction and serialization, as shown in the log message:
>>>> "INFO  TypeExtractor:1685 - No fields detected for class
>>>> com.google.common.collect.TreeMultimap. Cannot be used as a PojoType.
>>>> Will be handled as GenericType."
>>>>
>>>> I have tried the following two ways to fix it but neither of them
>>>> worked:
>>>>
>>>> 1. Writing a class TreeMultimapSerializer which extends Kryo's
>>>> Serializer<T>, and calling `env.addDefaultKryoSerializer(TreeMultimap.class,
>>>> new TreeMultimapSerializer()`. The write/read methods are almost
>>>> line-by-line translations from TreeMultimap's own implementation.
>>>>
>>>> 2. TreeMultimap has implemented Serializable interface so Kryo can fall
>>>> back to use the standard Java serialization. Since Kryo's JavaSerializer
>>>> itself is not serializable, I wrote an adapter to make it fit the
>>>> "addDefaultKryoSerializer" API.
>>>>
>>>> Could you please give me some working examples for custom Kryo
>>>> serialization in Flink?
>>>>
>>>>
>>>> Best regards,
>>>> Yukun
>>>>
>>>>
>>>
>>
>

Re: Serialization problem for Guava's TreeMultimap

Posted by Yukun Guo <gy...@gmail.com>.
Some more detail: when the FoldFunction runs on a KeyedStream, everything
works fine, so the problem must be related to the way WindowedStream handles
type extraction.

In case any Flink experts would like to reproduce it, I have created a repo
on GitHub: github.com/gyk/flink-multimap

On 20 September 2016 at 10:33, Yukun Guo <gy...@gmail.com> wrote:

> Hi,
>
> The same error occurs after changing the code, unfortunately.
>
> BTW, registerTypeWithKryoSerializer requires the 2nd argument to be a `T
> serializer` where T extends Serializer<?> & Serializable, so I pass a
> custom GenericJavaSerializer<T>, but I guess this doesn't matter much.
>
>
> On 19 September 2016 at 18:02, Stephan Ewen <se...@apache.org> wrote:
>
>> Hi!
>>
>> Can you use "env.getConfig().registerTypeWithKryoSerializer(
>> TreeMultimap.class, JavaSerializer.class)" ?
>>
>> Best,
>> Stephan
>>
>>
>> On Sun, Sep 18, 2016 at 12:53 PM, Yukun Guo <gy...@gmail.com> wrote:
>>
>>> Here is the code snippet:
>>>
>>> windowedStream.fold(TreeMultimap.<Long, String>create(), new FoldFunction<Tuple2<String, Long>, TreeMultimap<Long, String>>() {
>>>     @Override
>>>     public TreeMultimap<Long, String> fold(TreeMultimap<Long, String> topKSoFar,
>>>                                            Tuple2<String, Long> itemCount) throws Exception {
>>>         String item = itemCount.f0;
>>>         Long count = itemCount.f1;
>>>         topKSoFar.put(count, item);
>>>         if (topKSoFar.keySet().size() > topK) {
>>>             topKSoFar.removeAll(topKSoFar.keySet().first());
>>>         }
>>>         return topKSoFar;
>>>     }
>>> });
>>>
>>>
>>> The problem is when fold function getting called, the initial value has
>>> lost therefore it encounters a NullPointerException. This is due to failed
>>> type extraction and serialization, as shown in the log message:
>>> "INFO  TypeExtractor:1685 - No fields detected for class
>>> com.google.common.collect.TreeMultimap. Cannot be used as a PojoType.
>>> Will be handled as GenericType."
>>>
>>> I have tried the following two ways to fix it but neither of them worked:
>>>
>>> 1. Writing a class TreeMultimapSerializer which extends Kryo's
>>> Serializer<T>, and calling `env.addDefaultKryoSerializer(TreeMultimap.class,
>>> new TreeMultimapSerializer()`. The write/read methods are almost
>>> line-by-line translations from TreeMultimap's own implementation.
>>>
>>> 2. TreeMultimap has implemented Serializable interface so Kryo can fall
>>> back to use the standard Java serialization. Since Kryo's JavaSerializer
>>> itself is not serializable, I wrote an adapter to make it fit the
>>> "addDefaultKryoSerializer" API.
>>>
>>> Could you please give me some working examples for custom Kryo
>>> serialization in Flink?
>>>
>>>
>>> Best regards,
>>> Yukun
>>>
>>>
>>
>

Re: Serialization problem for Guava's TreeMultimap

Posted by Yukun Guo <gy...@gmail.com>.
Hi,

The same error occurs after changing the code, unfortunately.

BTW, registerTypeWithKryoSerializer requires the 2nd argument to be a `T
serializer` where T extends Serializer<?> & Serializable, so I pass a
custom GenericJavaSerializer<T>, but I guess this doesn't matter much.
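The GenericJavaSerializer<T> mentioned above is not shown in the thread. A minimal sketch of such an adapter, assuming Kryo 2.x's `Serializer` API, could extend `Serializer<T>` directly, implement `java.io.Serializable` so it satisfies the `T extends Serializer<?> & Serializable` bound, and wrap Kryo's `Output`/`Input` in Java object streams, much as Kryo's own JavaSerializer does.

```java
import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.KryoException;
import com.esotericsoftware.kryo.Serializer;
import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;

import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

// Hypothetical adapter: a Kryo Serializer<T> that delegates to plain Java
// serialization and is itself java.io.Serializable.
public class GenericJavaSerializer<T> extends Serializer<T> implements Serializable {

    private static final long serialVersionUID = 1L;

    @Override
    public void write(Kryo kryo, Output output, T object) {
        try {
            // Flush but do not close: closing would also close Kryo's Output.
            ObjectOutputStream objectOut = new ObjectOutputStream(output);
            objectOut.writeObject(object);
            objectOut.flush();
        } catch (IOException e) {
            throw new KryoException("Java serialization failed", e);
        }
    }

    @Override
    @SuppressWarnings("unchecked")
    public T read(Kryo kryo, Input input, Class<T> type) {
        try {
            ObjectInputStream objectIn = new ObjectInputStream(input);
            return (T) objectIn.readObject();
        } catch (IOException | ClassNotFoundException e) {
            throw new KryoException("Java deserialization failed", e);
        }
    }
}
```

Registration would then look like `env.getConfig().registerTypeWithKryoSerializer(TreeMultimap.class, new GenericJavaSerializer<TreeMultimap<Long, String>>())`. One untested caveat: a plain ObjectInputStream uses its own class-loader lookup, which may not see user-code classes inside a Flink job.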


On 19 September 2016 at 18:02, Stephan Ewen <se...@apache.org> wrote:

> Hi!
>
> Can you use "env.getConfig().registerTypeWithKryoSerializer
> (TreeMultimap.class, JavaSerializer.class)" ?
>
> Best,
> Stephan
>
>
> On Sun, Sep 18, 2016 at 12:53 PM, Yukun Guo <gy...@gmail.com> wrote:
>
>> Here is the code snippet:
>>
>> windowedStream.fold(TreeMultimap.<Long, String>create(), new FoldFunction<Tuple2<String, Long>, TreeMultimap<Long, String>>() {
>>     @Override
>>     public TreeMultimap<Long, String> fold(TreeMultimap<Long, String> topKSoFar,
>>                                            Tuple2<String, Long> itemCount) throws Exception {
>>         String item = itemCount.f0;
>>         Long count = itemCount.f1;
>>         topKSoFar.put(count, item);
>>         if (topKSoFar.keySet().size() > topK) {
>>             topKSoFar.removeAll(topKSoFar.keySet().first());
>>         }
>>         return topKSoFar;
>>     }
>> });
>>
>>
>> The problem is when fold function getting called, the initial value has
>> lost therefore it encounters a NullPointerException. This is due to failed
>> type extraction and serialization, as shown in the log message:
>> "INFO  TypeExtractor:1685 - No fields detected for class
>> com.google.common.collect.TreeMultimap. Cannot be used as a PojoType.
>> Will be handled as GenericType."
>>
>> I have tried the following two ways to fix it but neither of them worked:
>>
>> 1. Writing a class TreeMultimapSerializer which extends Kryo's
>> Serializer<T>, and calling `env.addDefaultKryoSerializer(TreeMultimap.class,
>> new TreeMultimapSerializer()`. The write/read methods are almost
>> line-by-line translations from TreeMultimap's own implementation.
>>
>> 2. TreeMultimap has implemented Serializable interface so Kryo can fall
>> back to use the standard Java serialization. Since Kryo's JavaSerializer
>> itself is not serializable, I wrote an adapter to make it fit the
>> "addDefaultKryoSerializer" API.
>>
>> Could you please give me some working examples for custom Kryo
>> serialization in Flink?
>>
>>
>> Best regards,
>> Yukun
>>
>>
>

Re: Serialization problem for Guava's TreeMultimap

Posted by Stephan Ewen <se...@apache.org>.
Hi!

Can you use
"env.getConfig().registerTypeWithKryoSerializer(TreeMultimap.class, JavaSerializer.class)"?

Best,
Stephan


On Sun, Sep 18, 2016 at 12:53 PM, Yukun Guo <gy...@gmail.com> wrote:

> Here is the code snippet:
>
> windowedStream.fold(TreeMultimap.<Long, String>create(), new FoldFunction<Tuple2<String, Long>, TreeMultimap<Long, String>>() {
>     @Override
>     public TreeMultimap<Long, String> fold(TreeMultimap<Long, String> topKSoFar,
>                                            Tuple2<String, Long> itemCount) throws Exception {
>         String item = itemCount.f0;
>         Long count = itemCount.f1;
>         topKSoFar.put(count, item);
>         if (topKSoFar.keySet().size() > topK) {
>             topKSoFar.removeAll(topKSoFar.keySet().first());
>         }
>         return topKSoFar;
>     }
> });
>
>
> The problem is when fold function getting called, the initial value has
> lost therefore it encounters a NullPointerException. This is due to failed
> type extraction and serialization, as shown in the log message:
> "INFO  TypeExtractor:1685 - No fields detected for class
> com.google.common.collect.TreeMultimap. Cannot be used as a PojoType.
> Will be handled as GenericType."
>
> I have tried the following two ways to fix it but neither of them worked:
>
> 1. Writing a class TreeMultimapSerializer which extends Kryo's
> Serializer<T>, and calling `env.addDefaultKryoSerializer(TreeMultimap.class,
> new TreeMultimapSerializer()`. The write/read methods are almost
> line-by-line translations from TreeMultimap's own implementation.
>
> 2. TreeMultimap has implemented Serializable interface so Kryo can fall
> back to use the standard Java serialization. Since Kryo's JavaSerializer
> itself is not serializable, I wrote an adapter to make it fit the
> "addDefaultKryoSerializer" API.
>
> Could you please give me some working examples for custom Kryo
> serialization in Flink?
>
>
> Best regards,
> Yukun
>
>