Posted to user@flink.apache.org by Xin Li <xi...@shopback.com> on 2022/03/08 07:33:03 UTC

Flink Statefun Kafka Ingress Record Key Deserializer

*Hello Flink Team,*

I am currently using Flink Stateful Functions in my project to consume
Avro-serialized events (both key and value are serialized) from Kafka.
There seems to be no configuration that lets users customize how the
Kafka record's key is deserialized: I noticed that the key deserializer
is fixed to a UTF-8 string deserializer here:
RoutableKafkaIngressDeserializer.java
<https://github.com/apache/flink-statefun/blob/master/statefun-flink/statefun-flink-io-bundle/src/main/java/org/apache/flink/statefun/flink/io/kafka/binders/ingress/v1/RoutableKafkaIngressDeserializer.java>
.

As a result, the deserialized key comes out as garbled text, and hash
values are then computed from these garbled strings, which is likely to
produce an uneven record distribution and cause data skew.
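
To illustrate the mismatch described above, here is a minimal, self-contained
sketch. It assumes the simplest possible case, a key that is a single Avro
string (Avro's binary encoding writes a zigzag varint length followed by the
UTF-8 bytes). The class and the helper avroEncodeShortString are hypothetical
and only imitate what an Avro serializer would write for short strings; real
keys may be records or carry extra header bytes, which this sketch ignores.

```java
import java.nio.charset.StandardCharsets;

public class KeyAsUtf8Demo {

    // Hypothetical helper: imitates Avro's binary encoding of a string,
    // i.e. a zigzag varint length followed by the UTF-8 bytes.
    // Only handles strings of up to 63 bytes, where the length fits in one byte.
    static byte[] avroEncodeShortString(String s) {
        byte[] utf8 = s.getBytes(StandardCharsets.UTF_8);
        if (utf8.length > 63) {
            throw new IllegalArgumentException("sketch only supports up to 63 bytes");
        }
        byte[] out = new byte[utf8.length + 1];
        out[0] = (byte) (utf8.length << 1); // zigzag of a non-negative n is 2n
        System.arraycopy(utf8, 0, out, 1, utf8.length);
        return out;
    }

    // What a plain UTF-8 string deserializer does with the raw key bytes.
    static String decodeAsUtf8(byte[] keyBytes) {
        return new String(keyBytes, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        byte[] rawKey = avroEncodeShortString("user-42");
        String id = decodeAsUtf8(rawKey);
        // The length prefix ends up inside the string as a control character,
        // so the id is not the logical key the producer intended.
        System.out.println("equals logical key: " + id.equals("user-42"));
        System.out.println("decoded length: " + id.length());
    }
}
```

In this simplified case the decoded id is the logical key with a stray
control character in front; with richer Avro key schemas the difference
would be larger.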

I wonder whether the community would consider adding a configuration
option that lets users customize the key deserializer in the Stateful
Functions Kafka ingress?

Looking forward to hearing from you

Best regards

*Xin Li*

Re: Flink Statefun Kafka Ingress Record Key Deserializer

Posted by Igal Shilman <ig...@gmail.com>.
Hello Xin Li,

Indeed, the built-in ingress that ships with StateFun requires the key
to be a UTF-8 string. This string then becomes the id part of the
target address.
StateFun is extensible via the StatefulFunctionModule [1], and customizing
the Kafka ingress is also possible; take a look here [2].
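
As a rough sketch of the key-decoding step that a customized ingress would
need, the following self-contained class turns an Avro-encoded string key back
into a plain string that could serve as the id. It assumes the key is a single
Avro string (zigzag varint length followed by UTF-8 bytes) with no additional
header bytes; the class and method names are hypothetical. Wiring this into a
custom deserializer class for the builder linked in [2] is not shown here.

```java
import java.nio.charset.StandardCharsets;

public class AvroStringKeyDecoder {

    // Decodes a byte array holding exactly one Avro-encoded string:
    // a zigzag varint length followed by that many UTF-8 bytes.
    // Assumption: no schema-registry header or other prefix is present.
    static String decodeAvroString(byte[] data) {
        long raw = 0;
        int shift = 0;
        int pos = 0;
        while (true) {
            if (pos >= data.length || shift > 63) {
                throw new IllegalArgumentException("malformed varint length");
            }
            int b = data[pos++] & 0xFF;
            raw |= (long) (b & 0x7F) << shift; // low 7 bits carry the payload
            if ((b & 0x80) == 0) {
                break; // high bit clear marks the last varint byte
            }
            shift += 7;
        }
        long length = (raw >>> 1) ^ -(raw & 1); // undo zigzag encoding
        if (length < 0 || length > data.length - pos) {
            throw new IllegalArgumentException("length prefix does not match payload");
        }
        return new String(data, pos, (int) length, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        // 0x0E is the zigzag varint for length 7, followed by "user-42".
        byte[] rawKey = {0x0E, 'u', 's', 'e', 'r', '-', '4', '2'};
        System.out.println(decodeAvroString(rawKey));
    }
}
```

For keys that are Avro records rather than bare strings, the same idea
applies, but decoding would need the Avro library and the writer schema.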

Unfortunately, a more generic solution is not currently available.
Previously we had an XPath-based solution for Protobuf, but it wasn't used
by the community.

If you are interested in working on a contribution along similar lines (a
very simple XPath-like key extraction), I'd be happy to guide you through it.

Thanks,
Igal.


[1]
https://github.com/apache/flink-statefun/blob/master/statefun-sdk-embedded%2Fsrc%2Fmain%2Fjava%2Forg%2Fapache%2Fflink%2Fstatefun%2Fsdk%2Fspi%2FStatefulFunctionModule.java
[2]
https://github.com/apache/flink-statefun/blob/master/statefun-kafka-io%2Fsrc%2Fmain%2Fjava%2Forg%2Fapache%2Fflink%2Fstatefun%2Fsdk%2Fkafka%2FKafkaIngressBuilder.java#L107
