Posted to user@flink.apache.org by Oran Shuster <or...@houzz.com> on 2022/01/25 11:37:51 UTC

Example for Jackson JsonNode Kafka serialization schema

In the documentation we have an example of how to implement deserialization
from bytes to Jackson ObjectNode objects: JSONKeyValueDeserializationSchema
https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/connectors/datastream/kafka/

However, there is no example for the other direction: taking an
ObjectNode/JsonNode (or just any POJO class) and using Jackson to serialize
it to a string.

You can write a simple schema like so:


public class JSONKafkaSerializationSchema implements KafkaSerializationSchema<JsonNode> {
    private final ObjectMapper objectMapper = new ObjectMapper();

    @Override
    public ProducerRecord<byte[], byte[]> serialize(JsonNode element, @Nullable Long timestamp) {
        String topic = getTargetTopic(element);

        byte[] value;

        try {
            value = objectMapper.writeValueAsBytes(element);
            return new ProducerRecord<>(topic, value);
        } catch (JsonProcessingException e) {
            return null;
        }
    }

    private String getTargetTopic(JsonNode jsonNode) {
        return jsonNode.get("topic").asText();
    }
}

But this raises a question: what should happen when serialization fails?
If the input class is a simple POJO then Jackson should always succeed in
converting it to bytes, but that's not 100% guaranteed.
In case of failure, can we return null so that the record is discarded?
Null values are discarded in the case of the deserialization schema; from
the documentation: "Returns: The deserialized message as an object (null
if the message cannot be deserialized)."
If this is not possible, what is the proper way to serialize Jackson objects
into bytes in Flink? It's possible to convert everything to a String before
the Kafka producer, but then any logic that determines the target topic
would need to deserialize the string again.

Re: Example for Jackson JsonNode Kafka serialization schema

Posted by Robert Metzger <me...@gmail.com>.
Hi Oran,

As you've already suggested, you could just use a (flat)map function that
takes an ObjectNode and outputs a string.
In the mapper, you can do whatever you want in case of an invalid object:
log it, discard it, write an "error json string", write it to a side
output stream, ...
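
The (flat)map idea above can be sketched roughly as follows. This is only an illustration in plain Java, not Flink code: a Map<String, String> stands in for ObjectNode, the Serializer interface stands in for a call such as ObjectMapper.writeValueAsBytes, and the two lists stand in for the collector and a side output. PreSerializeSketch, TopicRecord and Serializer are hypothetical names that appear nowhere in Flink or Jackson. Emitting the topic together with the bytes is one possible way to avoid parsing the string again later.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java stand-in for the (flat)map step; no Flink or Jackson involved.
class PreSerializeSketch {

    // Hypothetical output type: the topic travels with the payload,
    // so nothing downstream has to parse the JSON again.
    static final class TopicRecord {
        final String topic;
        final byte[] value;

        TopicRecord(String topic, byte[] value) {
            this.topic = topic;
            this.value = value;
        }
    }

    // Stand-in for a serializer call that may throw (assumption, not a real API).
    interface Serializer {
        byte[] toBytes(Map<String, String> element) throws Exception;
    }

    // Mirrors the body of a flatMap: zero or one output record per input.
    // 'out' plays the role of the collector, 'rejected' that of a side output.
    static void flatMap(Map<String, String> element, Serializer serializer,
                        List<TopicRecord> out, List<Map<String, String>> rejected) {
        String topic = element.get("topic");
        if (topic == null) {
            rejected.add(element); // no target topic, cannot be routed
            return;
        }
        try {
            out.add(new TopicRecord(topic, serializer.toBytes(element)));
        } catch (Exception e) {
            rejected.add(element); // serialization failed: discard, log, or side-output
        }
    }

    public static void main(String[] args) {
        // Toy serializer that fails for elements carrying a "poison" key.
        Serializer serializer = m -> {
            if (m.containsKey("poison")) {
                throw new IllegalStateException("cannot serialize");
            }
            return m.toString().getBytes(StandardCharsets.UTF_8);
        };

        Map<String, String> good = new HashMap<>();
        good.put("topic", "orders");
        Map<String, String> bad = new HashMap<>();
        bad.put("topic", "orders");
        bad.put("poison", "true");

        List<TopicRecord> out = new ArrayList<>();
        List<Map<String, String>> rejected = new ArrayList<>();
        flatMap(good, serializer, out, rejected);
        flatMap(bad, serializer, out, rejected);

        System.out.println("emitted=" + out.size() + " rejected=" + rejected.size());
    }
}
```

In an actual job the same logic would presumably live in a FlatMapFunction (discard or log only) or a ProcessFunction (if a side output is wanted), and the Kafka serialization schema would then only wrap the already serialized topic and value in a ProducerRecord, so it has nothing left that can fail.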

