Posted to user@flink.apache.org by Christopher Gustafson <ch...@kth.se> on 2022/03/16 13:59:19 UTC

Adding a custom Kafka deserializer to a StateFun job

Hi,


I am writing a StateFun application using remote functions, and I want to include a custom Kafka deserializer that adds the timestamp of each Kafka ingress message to the messages sent between my remote functions. I can't find a way to do this with remote functions: the only option I have found, custom types, comes with examples that deserialize just the value of the incoming Kafka message and ignore the rest of the record metadata.


I have been looking at the following example:

    public static final Type<User> TYPE = SimpleType.simpleImmutableTypeFrom(
        TypeName.typeNameFromString("com.example/User"),
        mapper::writeValueAsBytes,
        bytes -> mapper.readValue(bytes, User.class));


But instead of deserializing only the value, I want to use my own KafkaIngressDeserializer that also adds the record's timestamp to the object being returned. Is there a way to do this?
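
To make the goal concrete, here is a minimal sketch of the kind of wrapping I have in mind. It does not use the StateFun or Kafka APIs: `TimestampedEnvelope` and its methods are hypothetical names, and it assumes the timestamp and value bytes have already been read from the Kafka record. It only shows the timestamp travelling together with the original payload.

```java
import java.nio.ByteBuffer;
import java.util.Arrays;

// Hypothetical helper, not part of StateFun or Kafka: prefixes the original
// value bytes with the record timestamp so both can be passed along together.
final class TimestampedEnvelope {

    private TimestampedEnvelope() {}

    // Layout: 8 bytes of big-endian timestamp, followed by the untouched value.
    static byte[] wrap(long timestampMillis, byte[] value) {
        return ByteBuffer.allocate(Long.BYTES + value.length)
            .putLong(timestampMillis)
            .put(value)
            .array();
    }

    // Reads back the timestamp stored in the first 8 bytes.
    static long timestampOf(byte[] envelope) {
        return ByteBuffer.wrap(envelope).getLong();
    }

    // Returns a copy of the original value bytes that follow the timestamp.
    static byte[] payloadOf(byte[] envelope) {
        return Arrays.copyOfRange(envelope, Long.BYTES, envelope.length);
    }
}
```

A function receiving such an envelope could call `timestampOf` and `payloadOf` before deserializing the payload as usual. What I am missing is the place in a remote-function setup where the wrapping step could run on the ingress side.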


Best Regards,

Christopher Gustafson