Posted to user@flink.apache.org by Peter Schrott <pe...@bluerootlabs.io> on 2023/01/24 10:52:52 UTC

Problem with custom SerializationSchema in Flink 1.15

Hi Flink-User!


I recently updated a Flink job from Flink version 1.13 to 1.15 (managed by AWS). The Flink Job is written in Java.

I found out that the Kinesis Producer was deprecated in favour of the Kinesis Streams Sink [1]. When upgrading to the new sink I stumbled upon a problem with a custom SerializationSchema. I am using a custom implementation of SerializationSchema to serialize result POJOs to JSON using Jackson's ObjectMapper. This ObjectMapper is initialised and set up in the open() method of the SerializationSchema. The problem is that this open() method is not called on initialisation.

I have not found any bug report or other indication of this issue. Is this known or am I just “holding it wrong” (aka missing something)?

I created a minimal reproducible example in my GitHub repo: https://github.com/peterschrott/flink-sink-open


Best & Thanks,
Peter


[1] https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/connectors/datastream/kinesis/#kinesis-producer

Re: Problem with custom SerializationSchema in Flink 1.15

Posted by Chesnay Schepler <ch...@apache.org>.
It's a known issue that various connectors/wrappers/etc did not respect 
the schema lifecycle.

This was fixed in 1.16.0 in 
https://issues.apache.org/jira/browse/FLINK-28807.

You will have to lazily initialize the mapper in the serialize() method 
for previous versions.
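
A minimal sketch of that lazy initialization, written against the JDK only so that it runs without Flink or Jackson on the classpath. Everything named here is an assumption for illustration: Schema is a reduced stand-in for Flink's SerializationSchema, JsonWriter stands in for Jackson's ObjectMapper, and Result, LazySchema and roundTrip are hypothetical. The point is only that the helper is a transient field created on first use in serialize(), so the schema works even if open() is never invoked.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.nio.charset.StandardCharsets;

final class LazyInitSketch {

    /** Reduced stand-in for Flink's SerializationSchema (assumption: only these two methods matter here). */
    interface Schema<T> extends Serializable {
        default void open() {}          // not relied upon in this sketch
        byte[] serialize(T element);
    }

    /** Hypothetical stand-in for Jackson's ObjectMapper; deliberately not Serializable. */
    static final class JsonWriter {
        byte[] write(Result r) {
            String json = "{\"name\":\"" + r.name + "\",\"value\":" + r.value + "}";
            return json.getBytes(StandardCharsets.UTF_8);
        }
    }

    /** Hypothetical result POJO. */
    static final class Result {
        final String name;
        final int value;

        Result(String name, int value) {
            this.name = name;
            this.value = value;
        }
    }

    /** Schema that does not depend on open() ever being called. */
    static final class LazySchema implements Schema<Result> {
        private static final long serialVersionUID = 1L;

        // transient: not shipped with the schema, so it is null after deserialization
        private transient JsonWriter writer;

        @Override
        public byte[] serialize(Result element) {
            if (writer == null) {
                writer = new JsonWriter(); // set up the helper on first use
            }
            return writer.write(element);
        }
    }

    /** Java-serializes and deserializes an object, roughly what happens when a job is distributed. */
    static <T extends Serializable> T roundTrip(T obj) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(obj);
            }
            try (ObjectInputStream in =
                    new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                @SuppressWarnings("unchecked")
                T copy = (T) in.readObject();
                return copy;
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException("serialization round trip failed", e);
        }
    }

    public static void main(String[] args) {
        LazySchema shipped = roundTrip(new LazySchema()); // open() is never called
        byte[] out = shipped.serialize(new Result("a", 1));
        System.out.println(new String(out, StandardCharsets.UTF_8));
    }
}
```

In a real job the same null check would guard the creation and configuration of the ObjectMapper inside the custom SerializationSchema. On 1.16.0 or later, where the fix referenced above applies, the setup could presumably move back into open().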



Re: Problem with custom SerializationSchema in Flink 1.15

Posted by Peter Schrott <pe...@bluerootlabs.io>.
Hi André,

thanks for the answer. 

Unfortunately the removal of the super call does not fix the issue. 

From the code I can see that the open() method is only called by the legacy FlinkKinesisProducer [1]. For that reason I run into the same issue when I am using the new FlinkFirehoseSink. Both new sinks have the base class AsyncSinkBase.

I will dig deeper into the example you pointed out. 

Best, Peter

[1]


> On 24. Jan 2023, at 12:16, André Midea Jasiskis <mi...@gmail.com> wrote:
> 
> Hi Peter,
> 
> Have you tried removing the super() call? 
> 
> Here is an example of how this interface is used in Flink for a similar goal: https://github.com/apache/flink/blob/master/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonRowDataSerializationSchema.java#L88
> 
> Best,
> Andre
> 