Posted to users@kafka.apache.org by Marcos Juarez <mj...@gmail.com> on 2017/04/13 23:03:37 UTC

Delayed consumer in a Streams topology

I'm building a prototype with Kafka Streams that will be consuming from the
same topic twice, once with no delay, just like any normal consumer, and
once with a 60 minute delay, using the new timestamp-per-message field.  It
will also store state coming from other topics that are being read
simultaneously.

The reason why I'm consuming twice from the same topic, with one of them
delayed, is that our processor needs to know, for any particular record, if
there are any more records related to that coming within the next 60
minutes, and change some of the fields accordingly, before sending them
down as the final version of that event.

Everything seems to be supported by the normal Streams DSL, except for a
way to specify a consumption delay from a particular source topic.

I know I can just create a delayed topic myself, and then consume from
that, but the topic volume in this case prevents us from doing this.  I
don't want to duplicate the data and load in the cluster.  This topic
currently handles ~2B records per day, and will grow in the future to
potentially 10B records per day.

Any ideas on how I could handle this using Kafka Streams?  Or if it's
better to use the lower level Streams API for that, can you point me to a
starting point?  I've been reading docs and javadocs for a while now, and
I'm not sure where I would add/configure this.

Thanks!

Marcos Juarez

Re: Delayed consumer in a Streams topology

Posted by "Matthias J. Sax" <ma...@confluent.io>.
Hi,

Reading a topic twice, which is the first requirement you have, is
not possible (and not necessary IMHO) with the Streams API, regardless of
a "delayed" read. The reason is that Streams uses a single consumer
group.id internally and thus can commit only one offset per
topic-partition, but reading a topic twice would require committing two
offsets.

But as mentioned already, I don't think you need to read the data twice
anyway. You can just read it once, and buffer all data from the last 60
minutes within your application.
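
As a rough illustration of that buffering idea, here is a minimal plain-Java
sketch with no Kafka dependencies. The class DelayBuffer, its method names,
and the rule that "related" means "same key" are all hypothetical, since the
thread does not say how related records are identified. It also assumes
records arrive in timestamp order.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical sketch: hold each record for a fixed delay and count related records seen meanwhile. */
final class DelayBuffer {

    /** A buffered record plus the number of same-key records that arrived within the delay. */
    static final class Pending {
        final String key;
        final String value;
        final long timestampMs;
        int relatedAfter = 0;

        Pending(String key, String value, long timestampMs) {
            this.key = key;
            this.value = value;
            this.timestampMs = timestampMs;
        }
    }

    private final long delayMs;
    // Records in arrival order (assumed to match timestamp order).
    private final Deque<Pending> byArrival = new ArrayDeque<>();
    // Still-buffered records grouped by key, so follow-ups can be matched quickly.
    private final Map<String, List<Pending>> byKey = new HashMap<>();

    DelayBuffer(long delayMs) {
        this.delayMs = delayMs;
    }

    /** Buffers a record and updates buffered same-key records that it follows within the delay. */
    void add(String key, String value, long timestampMs) {
        List<Pending> sameKey = byKey.computeIfAbsent(key, k -> new ArrayList<>());
        for (Pending earlier : sameKey) {
            if (timestampMs - earlier.timestampMs < delayMs) {
                earlier.relatedAfter++;
            }
        }
        Pending pending = new Pending(key, value, timestampMs);
        sameKey.add(pending);
        byArrival.addLast(pending);
    }

    /** Removes and returns, oldest first, the records whose delay has elapsed as of nowMs. */
    List<Pending> drain(long nowMs) {
        List<Pending> ready = new ArrayList<>();
        while (!byArrival.isEmpty() && byArrival.peekFirst().timestampMs + delayMs <= nowMs) {
            Pending pending = byArrival.pollFirst();
            List<Pending> sameKey = byKey.get(pending.key);
            sameKey.remove(pending);
            if (sameKey.isEmpty()) {
                byKey.remove(pending.key);
            }
            ready.add(pending);
        }
        return ready;
    }
}
```

Each record is emitted once, 60 minutes after its timestamp, already knowing
whether related records followed it, so no second read of the topic is
needed. At ~2B records per day an hour is roughly 83M records, so a real
application would probably keep this buffer in a Streams state store rather
than on the heap, and would call something like drain() from a periodic
punctuation.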

I am not sure how you identify "related" records. It would be good to
understand some more details to give better help. Maybe you can use
a windowed aggregation or a windowed join.

In any case, you can fall back to the lower-level Processor API. It gives
you full control and allows you to express any logic you need.

Hope this helps.

-Matthias
