You are viewing a plain text version of this content. The canonical link for it is here.
Posted to users@kafka.apache.org by Dominik Safaric <do...@gmail.com> on 2016/11/01 12:17:03 UTC

Modify in-flight messages

Is it possible to somehow modify the Kafka message payload before being sent to the consumer for consumption? Such as for example adding a timestamp to the current message payload indicating the time of message consumption. 

Dominik Šafarić

Re: Modify in-flight messages

Posted by Eno Thereska <en...@gmail.com>.
Hi Dominik,

Not sure if this is 100% relevant, but since I noticed you saying that you are benchmarking stream processing engines, one way to modify a message would be to use the Kafka Streams library, where you consume a message from a topic, modify it as needed/do some processing, and then produce further to another topic.

Thanks
Eno

> On 1 Nov 2016, at 15:46, Dominik Safaric <do...@gmail.com> wrote:
> 
> Dear Michael,
> 
> Thanks for sharing this information with me. 
> 
> I am aware of the fact that each message has a timestamp indicating either the log append or creation time. 
> 
> But in my case, this is not enough since I want to derive the consumption time of messages. The reason for this is because we are currently in the process of an empirical research benchmarking several stream processing engines. 
> 
> Anyhow, I’ll take a look at the intercept classes.
> 
> Dominik
> 
>> On 1 Nov 2016, at 14:40, Mickael Maison <mi...@gmail.com> wrote:
>> 
>> Hi Dominik,
>> 
>> On both the consumer and producer you can use the
>> "interceptor.classes" config to specify classes that intercept and can
>> modify records when they are sent/received.
>> Also as of Kafka 0.10, messages have a timestamp field. See
>> http://kafka.apache.org/0101/javadoc/org/apache/kafka/clients/consumer/ConsumerRecord.html#timestamp().
>> 
>> On Tue, Nov 1, 2016 at 12:17 PM, Dominik Safaric
>> <do...@gmail.com> wrote:
>>> Is it possible to somehow modify the Kafka message payload before being sent to the consumer for consumption? Such as for example adding a timestamp to the current message payload indicating the time of message consumption.
>>> 
>>> Dominik Šafarić
> 


Re: Modify in-flight messages

Posted by Dominik Safaric <do...@gmail.com>.
Dear Michael,

Thanks for sharing this information with me. 

I am aware of the fact that each message has a timestamp indicating either the log append or creation time. 

But in my case, this is not enough since I want to derive the consumption time of messages. The reason for this is because we are currently in the process of an empirical research benchmarking several stream processing engines. 

Anyhow, I’ll take a look at the intercept classes.

Dominik

> On 1 Nov 2016, at 14:40, Mickael Maison <mi...@gmail.com> wrote:
> 
> Hi Dominik,
> 
> On both the consumer and producer you can use the
> "interceptor.classes" config to specify classes that intercept and can
> modify records when they are sent/received.
> Also as of Kafka 0.10, messages have a timestamp field. See
> http://kafka.apache.org/0101/javadoc/org/apache/kafka/clients/consumer/ConsumerRecord.html#timestamp().
> 
> On Tue, Nov 1, 2016 at 12:17 PM, Dominik Safaric
> <do...@gmail.com> wrote:
>> Is it possible to somehow modify the Kafka message payload before being sent to the consumer for consumption? Such as for example adding a timestamp to the current message payload indicating the time of message consumption.
>> 
>> Dominik Šafarić


Re: Modify in-flight messages

Posted by Mickael Maison <mi...@gmail.com>.
Hi Dominik,

On both the consumer and producer you can use the
"interceptor.classes" config to specify classes that intercept and can
modify records when they are sent/received.
Also as of Kafka 0.10, messages have a timestamp field. See
http://kafka.apache.org/0101/javadoc/org/apache/kafka/clients/consumer/ConsumerRecord.html#timestamp().

On Tue, Nov 1, 2016 at 12:17 PM, Dominik Safaric
<do...@gmail.com> wrote:
> Is it possible to somehow modify the Kafka message payload before being sent to the consumer for consumption? Such as for example adding a timestamp to the current message payload indicating the time of message consumption.
>
> Dominik Šafarić