Posted to user@flink.apache.org by Dawid Wysakowicz <dw...@apache.org> on 2020/10/07 06:53:35 UTC

Re: Live updating Serialization Schemas in Flink

Hi,

Unfortunately I don't have a nice solution for you. I would also
generally discourage such a pattern. Multiple or dynamic schemas are
usually handled with the help of a schema registry. In that case an id
is serialized along with each record, and you use that id to look up
the schema. In that scenario you don't need to poll for schemas, which
can easily go out of sync or cause the other problems you described.
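
As a rough sketch of the idea (not a definitive implementation): each
message carries the id of the schema that wrote it, and the reader
resolves the schema by that id instead of polling for a list. The class
name, the 4-byte id prefix, and the in-memory registry below are all
hypothetical stand-ins chosen for illustration; a real registry client
would fetch and cache schemas over the network, and the payload would be
Avro-encoded bytes rather than a plain string.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Sketch: frame each record as [4-byte schema id][payload] and resolve the schema by id. */
public class SchemaIdFraming {

    /** Hypothetical stand-in for a schema registry client (assumption, not a real API). */
    static final class InMemoryRegistry {
        private final Map<Integer, String> schemasById = new ConcurrentHashMap<>();

        void register(int id, String schemaJson) {
            schemasById.put(id, schemaJson);
        }

        String lookup(int id) {
            String schema = schemasById.get(id);
            if (schema == null) {
                throw new IllegalArgumentException("Unknown schema id: " + id);
            }
            return schema;
        }
    }

    /** Prefix the already-encoded payload with the id of the schema that wrote it. */
    static byte[] frame(int schemaId, byte[] payload) {
        return ByteBuffer.allocate(Integer.BYTES + payload.length)
                .putInt(schemaId)
                .put(payload)
                .array();
    }

    /** Read the schema id from the first four bytes of a framed message. */
    static int readSchemaId(byte[] framed) {
        return ByteBuffer.wrap(framed).getInt();
    }

    /** Return the bytes that follow the schema id. */
    static byte[] readPayload(byte[] framed) {
        byte[] payload = new byte[framed.length - Integer.BYTES];
        ByteBuffer.wrap(framed, Integer.BYTES, payload.length).get(payload);
        return payload;
    }

    public static void main(String[] args) {
        InMemoryRegistry registry = new InMemoryRegistry();
        registry.register(7, "{\"type\":\"string\"}");

        // Writer side: attach the schema id to the encoded record.
        byte[] framed = frame(7, "hello".getBytes(StandardCharsets.UTF_8));

        // Reader side: the id in the message says which schema to use.
        String schema = registry.lookup(readSchemaId(framed));
        String value = new String(readPayload(framed), StandardCharsets.UTF_8);
        System.out.println("schema=" + schema + " value=" + value);
    }
}
```

Because the id travels with the record, a serializer only needs a
lookup-by-id (plus a local cache) and never has to know the full,
current list of schemas up front.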

Best of luck with figuring out your problem.

Best,

Dawid

On 24/09/2020 23:37, Hunter Herman wrote:
>
> Hi flink-users! I need advice about how to tackle a programming
> problem I’m facing. I have a bunch of jobs that look something like
> this sketch
>
> Source<GenericRecord> kafkaSource;
>
> kafkaSource
>     .map(function that takes generic record)
>     .map( ... )
>     ...
>     .sink(kafka sink that takes in generic records)
>
> The reason we represent data as GenericRecords is that the Avro
> schemas that are in play vary during runtime. We write schema
> descriptions to a separate topic. We're aware of the performance
> penalty of passing GenericRecords to/from operators/kafka, so we wrote
> our own Kafka serialization schema and Kryo serializer for
> GenericRecords. The tricky part is our custom serializer needs to know
> what the current list of schemas is so it can figure out how to ser/de
> messages as they pass through the graph.
>
> I can't for the life of me figure out how to pass this info into our
> serializers in a sane way. The methods I'm aware of are:
>
> 1. A static field somewhere that polls an external system for the
> list of records. We already do this but we think it will cause class
> loader leaks, since the polling thread is created inside the custom
> serializer, and it's not clear where we should cancel it.
>
> 2. Broadcast state. We could try to stream the schemas around
> our graph using broadcast state, but that means it's going to be much
> less ergonomic to write these jobs; every single operator in the graph
> will have to receive the broadcast state, and will have to handle
> serialization internally instead of using our custom serializer.
>
> 3. A static field which is populated by the Kafka stream. This
> avoids the thread leak of (1), but I don't think Flink guarantees we
> can get this static field populated in every task slot. It's hard to
> control where the Kafka stream will be processed.
>
> I know it's a complicated situation, so I hope it came across clearly.
> I feel pretty stumped, as none of the solutions I've considered seem
> adequate. Is there another option I haven't thought of? Is there a
> better way to manage a dynamic set of Avro schemas without restarting?
> Would love any advice! Thanks!
>
> SO:
> https://stackoverflow.com/questions/64054753/live-updating-serialization-schemas-in-flink
>
>