Posted to dev@kafka.apache.org by "Matthias J. Sax (JIRA)" <ji...@apache.org> on 2017/01/25 18:00:32 UTC

[jira] [Commented] (KAFKA-4691) ProducerInterceptor.onSend() is called after key and value are serialized

    [ https://issues.apache.org/jira/browse/KAFKA-4691?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15838245#comment-15838245 ] 

Matthias J. Sax commented on KAFKA-4691:
----------------------------------------

I am not sure if we can fix this soon. The problem is that Streams uses a single `KafkaProducer` for all output topics. Those output topics might have different key and value types, while a `KafkaProducer` can only be configured with a single pair of serializers: one for the key and one for the value. Thus, Kafka Streams configures the producer to use {{byte[]}} as key and value type and applies the appropriate serializer per output topic before handing the record to the producer. As a consequence, any interceptor registered on that producer only ever sees serialized bytes. Right now, I would rather close this as "won't fix". cc [~guozhang]
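To make the mechanism concrete, here is a minimal sketch in plain Java. It does not use the Kafka client library: `BytesRecord`, `BytesInterceptor`, `BytesProducer`, and `collect` are hypothetical stand-ins for the producer record, the producer interceptor, the shared `byte[]`-typed producer, and the Streams record collector. The topic names and serialization formats are also assumptions made for illustration. It only shows why the interceptor receives bytes, and that it has to deserialize them itself if it wants to inspect the payload.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

public class ByteProducerSketch {

    /** Hypothetical stand-in for a record whose key and value are already serialized. */
    static final class BytesRecord {
        final String topic;
        final byte[] key;
        final byte[] value;

        BytesRecord(String topic, byte[] key, byte[] value) {
            this.topic = topic;
            this.key = key;
            this.value = value;
        }
    }

    /** Hypothetical stand-in for an interceptor hook on a byte[]/byte[] producer. */
    interface BytesInterceptor {
        BytesRecord onSend(BytesRecord record);
    }

    /** Hypothetical stand-in for the single producer shared by all output topics. */
    static final class BytesProducer {
        private final BytesInterceptor interceptor;
        final List<BytesRecord> sent = new ArrayList<>();

        BytesProducer(BytesInterceptor interceptor) {
            this.interceptor = interceptor;
        }

        void send(BytesRecord record) {
            // The interceptor runs here, after the caller has already serialized.
            sent.add(interceptor.onSend(record));
        }
    }

    /** Plays the role of the record collector: serialize per topic, then hand bytes to the producer. */
    static <K, V> void collect(BytesProducer producer, String topic, K key, V value,
                               Function<K, byte[]> keySerializer,
                               Function<V, byte[]> valueSerializer) {
        producer.send(new BytesRecord(topic, keySerializer.apply(key), valueSerializer.apply(value)));
    }

    /** Sends one record to each of two topics and returns what the interceptor could recover. */
    static List<String> run() {
        List<String> seen = new ArrayList<>();
        BytesInterceptor interceptor = record -> {
            // Only bytes are visible; the interceptor must know each topic's format to decode them.
            if (record.topic.equals("names")) {
                seen.add(new String(record.value, StandardCharsets.UTF_8));
            } else {
                seen.add(String.valueOf(ByteBuffer.wrap(record.value).getLong()));
            }
            return record;
        };
        BytesProducer producer = new BytesProducer(interceptor);

        // Two output topics with different value types share the same producer.
        collect(producer, "names", "k1", "alice",
                s -> s.getBytes(StandardCharsets.UTF_8),
                s -> s.getBytes(StandardCharsets.UTF_8));
        collect(producer, "counts", "k1", 42L,
                s -> s.getBytes(StandardCharsets.UTF_8),
                n -> ByteBuffer.allocate(Long.BYTES).putLong(n).array());
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

The per-topic branch inside the interceptor is the "additional deserialization" the reporter mentions: it works, but the interceptor has to duplicate knowledge about each topic's format that the typed serializers already had.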

> ProducerInterceptor.onSend() is called after key and value are serialized
> -------------------------------------------------------------------------
>
>                 Key: KAFKA-4691
>                 URL: https://issues.apache.org/jira/browse/KAFKA-4691
>             Project: Kafka
>          Issue Type: Bug
>          Components: clients, streams
>    Affects Versions: 0.10.1.1
>            Reporter: Francesco Lemma
>              Labels: easyfix
>         Attachments: 2017-01-24 00_50_55-SDG_CR33_DevStudio - Java EE - org.apache.kafka.streams.processor.internals.Reco.png
>
>
> According to the JavaDoc (https://kafka.apache.org/0101/javadoc/org/apache/kafka/clients/producer/ProducerInterceptor.html): "This is called from KafkaProducer.send(ProducerRecord) and KafkaProducer.send(ProducerRecord, Callback) methods, before key and value get serialized and partition is assigned (if partition is not specified in ProducerRecord)".
> However, when using this with Kafka Streams (StreamsConfig.producerPrefix(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG)), the key and value contained in the record object are already serialized.
> As you can see from the screenshot, the serialization is performed inside RecordCollectionImpl.send(ProducerRecord<K, V> record, Serializer<K> keySerializer, Serializer<V> valueSerializer, StreamPartitioner<K, V> partitioner), that is, before the call to the producer's send method, which is what triggers the interceptor.
> This makes it impossible to perform any kind of operation involving the key or value of the message without at least performing an additional deserialization.


