Posted to user@flink.apache.org by Oytun Tez <oy...@motaword.com> on 2019/05/01 01:16:43 UTC

Re: PatternFlatSelectAdapter - Serialization issue after 1.8 upgrade

Hi all,

Making the tag a static element worked out, thank you!

---
Oytun Tez

*M O T A W O R D*
The World's Fastest Human Translation Platform.
oytun@motaword.com — www.motaword.com


On Tue, Apr 23, 2019 at 10:37 AM Oytun Tez <oy...@motaword.com> wrote:

> Thank you Guowei and Dawid! I am trying your suggestions today and will
> report back.
>
> - I assume the cleaning only needs to be done once because of the
> upgrade, or should it run every time the application starts?
> - `static` sounds like a very simple fix for this. Any drawbacks here?
>
>
> On Tue, Apr 23, 2019 at 2:56 AM Dawid Wysakowicz <dw...@apache.org> wrote:
>
>> Hi Oytun,
>>
>> I think there is a regression introduced in 1.8 in how we handle output
>> tags. The problem is that we do not call the ClosureCleaner on the OutputTag.
>>
>> There are two ways you can work around this issue:
>>
>> 1. Declare the OutputTag static
>>
>> 2. Clean the closure explicitly as Guowei suggested:
>> StreamExecutionEnvironment.clean(pendingProjectsTag)
>>
>> I also opened a JIRA issue to fix this (FLINK-12297 [1]).
>>
>> Best,
>>
>> Dawid
>>
>> [1] https://issues.apache.org/jira/browse/FLINK-12297
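The effect of Dawid's first workaround can be reproduced without Flink. The sketch below is plain Java and only an illustration under stated assumptions: `Tag` is a hypothetical stand-in for Flink's OutputTag, `Operator` stands in for the non-serializable SingleOutputStreamOperator named in the stack trace, and `PendingJob` loosely mirrors the PendingProjects class. An anonymous subclass created inside an instance method keeps a hidden reference to the enclosing object, while one created in a static field initializer has no enclosing instance to capture.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class StaticTagDemo {

    // Hypothetical stand-in for Flink's OutputTag: a Serializable class that is
    // usually instantiated as an anonymous subclass, e.g. new Tag<T>("id") {}.
    static class Tag<T> implements Serializable {
        final String id;
        Tag(String id) { this.id = id; }
    }

    // Stand-in for a non-serializable field such as SingleOutputStreamOperator.
    static class Operator { }

    // Hypothetical job class, loosely modeled on PendingProjects.
    static class PendingJob implements Serializable {
        // Workaround 1: created in a static context, so the anonymous subclass
        // holds no reference to any PendingJob instance.
        static final Tag<String> STATIC_TAG = new Tag<String>("pending") {};

        // Serializable class with a non-serializable field.
        final Operator operator = new Operator();

        Tag<String> tagFromInstanceMethod() {
            // Created inside an instance method: the anonymous subclass keeps a
            // hidden reference to the enclosing PendingJob, and through it to operator.
            return new Tag<String>("pending") {};
        }
    }

    // Returns true if Java serialization of o succeeds.
    static boolean isSerializable(Object o) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(o);
            return true;
        } catch (NotSerializableException e) {
            return false;
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    public static void main(String[] args) {
        PendingJob job = new PendingJob();
        System.out.println("static tag:   " + isSerializable(PendingJob.STATIC_TAG));
        System.out.println("instance tag: " + isSerializable(job.tagFromInstanceMethod()));
    }
}
```

Run as written, this should report `true` for the static tag and `false` for the tag created in the instance method, because serializing the latter drags in the enclosing PendingJob and its Operator field.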
>> On 22/04/2019 03:06, Guowei Ma wrote:
>>
>> I think you could try
>> StreamExecutionEnvironment.clean(pendingProjectsTag).
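In Flink, `clean` is an instance method of StreamExecutionEnvironment, so in practice the call would look like `env.clean(pendingProjectsTag)` on the job's environment; it returns the cleaned object. The plain-Java sketch below imitates the idea in a deliberately simplified way: the hypothetical `nullOuterThis` helper nulls the synthetic `this$0`-style field that javac adds to inner classes. Flink's ClosureCleaner also checks whether that reference is actually used before removing it; this sketch skips the check, so it is only safe for classes like the empty tag subclass here. `Tag`, `Operator`, and `PendingJob` are hypothetical stand-ins.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.lang.reflect.Field;

public class CleanDemo {

    // Hypothetical stand-in for Flink's OutputTag.
    static class Tag<T> implements Serializable {
        final String id;
        Tag(String id) { this.id = id; }
    }

    // Stand-in for a non-serializable field such as SingleOutputStreamOperator.
    static class Operator { }

    static class PendingJob implements Serializable {
        final Operator operator = new Operator();

        Tag<String> newTag() {
            // The body never uses the enclosing PendingJob, but javac still
            // stores a reference to it in a synthetic field.
            return new Tag<String>("pending") {};
        }
    }

    // Hypothetical, simplified imitation of closure cleaning: null out the
    // synthetic enclosing-instance field(s). Unlike Flink's ClosureCleaner,
    // it does not check whether the class actually uses that reference.
    static <F> F nullOuterThis(F f) throws IllegalAccessException {
        for (Field field : f.getClass().getDeclaredFields()) {
            if (field.isSynthetic() && field.getName().startsWith("this$")) {
                field.setAccessible(true);
                field.set(f, null);
            }
        }
        return f;
    }

    // Returns true if Java serialization of o succeeds.
    static boolean isSerializable(Object o) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(o);
            return true;
        } catch (NotSerializableException e) {
            return false;
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    public static void main(String[] args) throws IllegalAccessException {
        Tag<String> tag = new PendingJob().newTag();
        System.out.println("before cleaning: " + isSerializable(tag));
        nullOuterThis(tag);
        System.out.println("after cleaning:  " + isSerializable(tag));
    }
}
```

Before cleaning, serialization fails on the Operator reached through the hidden field; once that field is nulled, the tag serializes on its own.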
>>
>>
>> On Fri, Apr 19, 2019 at 9:58 PM Oytun Tez <oy...@motaword.com> wrote:
>>
>>> Forgot to answer one of your points: the parent class compiles fine
>>> without this CEP selector (the flatSelect variant with a timeout function)...
>>>
>>>
>>> On Fri, Apr 19, 2019 at 9:40 AM Oytun Tez <oy...@motaword.com> wrote:
>>>
>>>> Hey JingsongLee!
>>>>
>>>> Here are some findings...
>>>>
>>>>    - flatSelect *without timeout* works normally:
>>>>    patternStream.flatSelect(PatternFlatSelectFunction) compiles fine.
>>>>    - Converting both the timeout and select selectors to an *inner
>>>>    class* (not static) yields the same result: it doesn't compile.
>>>>    - flatSelect *without* timeout, but with an inner class for
>>>>    PatternFlatSelectFunction, compiles (same as the first bullet).
>>>>    - Tried both of these selectors with an *empty* body, just a skeleton
>>>>    class. Doesn't compile either. The empty-body example is in my first email.
>>>>    - Tried making both selectors *public static inner* classes; that
>>>>    doesn't compile either.
>>>>    - Extracted both the timeout and flat-select selectors into their own
>>>>    *independent classes* in separate files. Doesn't compile.
>>>>    - I am putting the *error stack* below.
>>>>    - Without the timeout selector, in any class or lambda shape, with an
>>>>    empty or full body, flatSelect compiles fine.
>>>>
>>>> Would these findings help? Any ideas?
>>>>
>>>> Here is an error stack:
>>>>
>>>> 09:36:51,925 ERROR com.motaword.ipm.kernel.error.controller.ExceptionHandler     - org.apache.flink.api.common.InvalidProgramException: The implementation of the PatternFlatSelectAdapter is not serializable. The object probably contains or references non serializable fields.
>>>> at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:99)
>>>> at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.clean(StreamExecutionEnvironment.java:1558)
>>>> at org.apache.flink.cep.PatternStreamBuilder.clean(PatternStreamBuilder.java:86)
>>>> at org.apache.flink.cep.PatternStream.process(PatternStream.java:114)
>>>> at org.apache.flink.cep.PatternStream.flatSelect(PatternStream.java:451)
>>>> at org.apache.flink.cep.PatternStream.flatSelect(PatternStream.java:408)
>>>> at com.motaword.ipm.business.invitation.controller.PendingProjects.getPending(PendingProjects.java:89)
>>>> at com.motaword.ipm.business.invitation.controller.PendingProjects.run(PendingProjects.java:45)
>>>> at com.motaword.ipm.business.invitation.boundary.InvitationJob.run(InvitationJob.java:31)
>>>> at com.motaword.ipm.kernel.Application.main(Application.java:63)
>>>> Caused by: java.io.NotSerializableException: org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator
>>>> at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
>>>> at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
>>>> at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
>>>> at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
>>>> at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
>>>> at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
>>>> at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
>>>> at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
>>>> at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
>>>> at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
>>>> at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
>>>> at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
>>>> at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
>>>> at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
>>>> at org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:576)
>>>> at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:81)
>>>> ... 9 more
>>>>
>>>>
>>>> On Fri, Apr 19, 2019 at 3:14 AM JingsongLee <lz...@aliyun.com>
>>>> wrote:
>>>>
>>>>> Hi @Oytun Tez,
>>>>> It looks like your *PatternFlatSelectFunction* is not serializable.
>>>>> Because you use an anonymous inner class, it keeps a reference to the
>>>>> enclosing instance. Check the class to which getPending belongs; maybe
>>>>> that class is not serializable?
>>>>>
>>>>> Alternatively, avoid non-static inner classes and use a static nested class instead.
>>>>>
>>>>> Best, JingsongLee
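JingsongLee's point about anonymous inner classes can be checked with plain Java serialization. In the sketch below, `SelectFunction` is a hypothetical stand-in for a Flink function interface such as PatternFlatSelectFunction (Flink's function interfaces extend java.io.Serializable), and `Operator` plays the role of the non-serializable field. The anonymous implementation created in an instance method captures the enclosing `PendingJob`; the static nested class does not. As the later findings in this thread show, in this particular case it was the output tag, not the functions, that captured the outer instance.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class NestedFunctionDemo {

    // Hypothetical stand-in for a Flink function interface such as
    // PatternFlatSelectFunction; like Flink's, it extends Serializable.
    interface SelectFunction<IN, OUT> extends Serializable {
        OUT select(IN in);
    }

    // Stand-in for a non-serializable field (e.g. a stream operator).
    static class Operator { }

    // Hypothetical class that owns a getPending-style method.
    static class PendingJob implements Serializable {
        final Operator operator = new Operator();

        // Anonymous inner class: holds a hidden reference to this PendingJob.
        SelectFunction<String, Integer> anonymousFunction() {
            return new SelectFunction<String, Integer>() {
                @Override
                public Integer select(String in) {
                    return in.length();
                }
            };
        }

        // Static nested class: no reference to any PendingJob instance.
        static class LengthFunction implements SelectFunction<String, Integer> {
            @Override
            public Integer select(String in) {
                return in.length();
            }
        }
    }

    // Returns true if Java serialization of o succeeds.
    static boolean isSerializable(Object o) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(o);
            return true;
        } catch (NotSerializableException e) {
            return false;
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    public static void main(String[] args) {
        PendingJob job = new PendingJob();
        System.out.println("anonymous:     " + isSerializable(job.anonymousFunction()));
        System.out.println("static nested: " + isSerializable(new PendingJob.LengthFunction()));
    }
}
```

The anonymous function fails to serialize because its hidden field pulls in PendingJob and, through it, the Operator; the static nested class serializes because it carries no such reference.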
>>>>>
>>>>> ------------------------------------------------------------------
>>>>> From: Oytun Tez <oy...@motaword.com>
>>>>> Sent: Friday, April 19, 2019 03:38
>>>>> To: user <us...@flink.apache.org>
>>>>> Subject: PatternFlatSelectAdapter - Serialization issue after 1.8 upgrade
>>>>>
>>>>> Hi all,
>>>>>
>>>>> We are just migrating from 1.6 to 1.8. I encountered a serialization
>>>>> error which, if memory serves, we didn't have before: "The
>>>>> implementation of the *PatternFlatSelectAdapter* is not serializable.
>>>>> The object probably contains or references non serializable fields."
>>>>>
>>>>> The method below simply takes a PatternStream from CEP.pattern() and
>>>>> uses the side output for timed-out events. Can you see anything
>>>>> weird here (WorkerEvent is the input, but the collectors collect Project
>>>>> objects)?
>>>>>
>>>>> protected DataStream<Project> getPending(PatternStream<WorkerEvent> patternStream) {
>>>>>     OutputTag<Project> pendingProjectsTag =
>>>>>             new OutputTag<Project>("invitation-pending-projects") {};
>>>>>
>>>>>     return patternStream.flatSelect(
>>>>>             pendingProjectsTag,
>>>>>             new PatternFlatTimeoutFunction<WorkerEvent, Project>() {
>>>>>                 @Override
>>>>>                 public void timeout(Map<String, List<WorkerEvent>> map, long l,
>>>>>                                     Collector<Project> collector) {
>>>>>                 }
>>>>>             },
>>>>>             new PatternFlatSelectFunction<WorkerEvent, Project>() {
>>>>>                 @Override
>>>>>                 public void flatSelect(Map<String, List<WorkerEvent>> pattern,
>>>>>                                        Collector<Project> collector) {
>>>>>                 }
>>>>>             }
>>>>>     ).name("Select pending projects for invitation")
>>>>>      .getSideOutput(pendingProjectsTag);
>>>>> }
>>>>>
>>>>>
>>>>> --
>> Best,
>> Guowei
>>
>>