Posted to users@kafka.apache.org by Timur Yusupov <tt...@gmail.com> on 2017/04/27 15:52:18 UTC

KafkaStreams pause specific topic partition consumption

I see it is possible to pause consumption of specific topic partitions
when using KafkaConsumer directly, but it looks like this is not possible
when using KafkaStreams.

There are the following use cases for that:
1) Doing batch processing using Kafka Streams (I found the
https://cwiki.apache.org/confluence/display/KAFKA/KIP-95%3A+Incremental+Batch+Processing+for+Kafka+Streams
proposal for Kafka Streams, but according to
https://issues.apache.org/jira/browse/KAFKA-4437 there is no active
development on that side). So we are considering just pausing specific
topic partitions as soon as we reach the stop offsets for them.

2) Assume we process multiple topics in some parallel way and want to pause
some topics while waiting for other topics to catch up.

Actually, the first use case is more important for us, so it would be good
to know whether there is a way to do this, or whether improvements are
already planned that would allow pausing consumption of specific topic
partitions in KafkaStreams.

-- 
Best Regards, Timur.

Re: KafkaStreams pause specific topic partition consumption

Posted by Timur Yusupov <tt...@gmail.com>.
Got it, thanks, Matthias!


Re: KafkaStreams pause specific topic partition consumption

Posted by "Matthias J. Sax" <ma...@confluent.io>.
Yes. That is something you would need to do externally, too.

There is a KIP for a tool
(https://cwiki.apache.org/confluence/display/KAFKA/KIP-122%3A+Add+Reset+Consumer+Group+Offsets+tooling)
-- but you can also do this using a single `KafkaConsumer` with
`group.id == application.id` that gets all partitions assigned and does
the corresponding seek plus commitSync.

Note, you need to make sure that the Streams "consumer group" is completely
inactive to avoid conflicts. You could add a check similar to
https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/tools/StreamsResetter.java#L94
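The order of operations described above can be sketched roughly as
follows. This is only an illustration: `OffsetClient`, `OffsetReset` and
`RecordingClient` are hypothetical names, and the interface merely stands
in for the `KafkaConsumer` calls `assign`, `seek` and `commitSync` (plus
some way of checking that the group has no active members), so that the
sketch runs without a broker. Partitions are written as "topic-partition"
strings here, and manual assignment is an assumption about how the single
consumer gets all partitions.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

/** Hypothetical stand-in for the few consumer calls the reset needs. */
interface OffsetClient {
    boolean groupHasActiveMembers();          // some "is the group inactive?" check
    void assign(List<String> partitions);     // cf. KafkaConsumer#assign
    void seek(String partition, long offset); // cf. KafkaConsumer#seek
    void commitSync();                        // cf. KafkaConsumer#commitSync
}

final class OffsetReset {
    /** Rewinds the group to the given offsets; refuses to run while the group is active. */
    static void rewind(OffsetClient client, Map<String, Long> targetOffsets) {
        if (client.groupHasActiveMembers()) {
            throw new IllegalStateException("Streams application is still running");
        }
        List<String> partitions = new ArrayList<>(targetOffsets.keySet());
        client.assign(partitions);                  // take all partitions
        for (String tp : partitions) {
            client.seek(tp, targetOffsets.get(tp)); // move the position
        }
        client.commitSync();                        // store positions as committed offsets
    }
}

/** Recording fake, used only to show the resulting call order. */
final class RecordingClient implements OffsetClient {
    final List<String> calls = new ArrayList<>();
    boolean active = false;

    public boolean groupHasActiveMembers() { return active; }
    public void assign(List<String> partitions) { calls.add("assign " + partitions); }
    public void seek(String partition, long offset) { calls.add("seek " + partition + " " + offset); }
    public void commitSync() { calls.add("commitSync"); }
}
```

With a real consumer, the client would be configured with `group.id`
equal to the Streams `application.id`, so that the committed offsets are
the ones Streams picks up on its next start.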


-Matthias

On 5/8/17 4:01 PM, Timur Yusupov wrote:
> That means that, in order to process the filtered-out records in the
> next batch, we have to seek KafkaStreams back, right?


Re: KafkaStreams pause specific topic partition consumption

Posted by Timur Yusupov <tt...@gmail.com>.
That means that, in order to process the filtered-out records in the
next batch, we have to seek KafkaStreams back, right?



-- 
Best regards, Timur.

Re: KafkaStreams pause specific topic partition consumption

Posted by "Matthias J. Sax" <ma...@confluent.io>.
I see.

If you do the step of storing the end offsets in your database before
starting up Streams, this would work.

What you could do as a workaround (even if it might not be a nice
solution) is to apply a `transform()` as your first operator. Within
`transform()` you get access to the current record's offset via
`context.offset()` (the context object is provided via `init()`). Thus, you
can implement an "offset filter" and also track whether all partitions have
reached their end offset (you also get a record's partition via the context).

Thus, if a record is after the partition's end offset, you just filter
the record out. If all partitions have reached their end offset, you can set
a flag to notify your "main" thread to close() the Kafka Streams instance.
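A minimal sketch of the filtering and "all done" bookkeeping, kept free of
Kafka classes so it runs on its own. `BatchOffsetFilter` is a hypothetical
name. It assumes the stored end offsets are exclusive (last record's offset
plus one, which is what `KafkaConsumer#endOffsets` returns) and keyed as
"topic-partition". Inside `transform()` one would call
`accept(context.topic(), context.partition(), context.offset())` and drop
the record when it returns false.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;

/** Filters out records past a per-partition end offset and signals when all partitions are done. */
final class BatchOffsetFilter {
    // Key: "topic-partition"; value: exclusive end offset (first offset NOT in the batch).
    private final Map<String, Long> endOffsets;
    private final Set<String> finished = ConcurrentHashMap.newKeySet();
    private final CountDownLatch allDone;

    BatchOffsetFilter(Map<String, Long> endOffsets) {
        this.endOffsets = new HashMap<>(endOffsets);
        this.allDone = new CountDownLatch(this.endOffsets.size());
        // A partition whose end offset is 0 has nothing to process.
        for (Map.Entry<String, Long> e : this.endOffsets.entrySet()) {
            if (e.getValue() <= 0) {
                markFinished(e.getKey());
            }
        }
    }

    /** Returns true if the record belongs to the current batch and should be processed. */
    boolean accept(String topic, int partition, long offset) {
        String tp = topic + "-" + partition;
        Long end = endOffsets.get(tp);
        if (end == null) {
            return false; // partition is not part of this batch
        }
        if (offset >= end - 1) {
            markFinished(tp); // last record of the batch, or already past the end
        }
        return offset < end;
    }

    private void markFinished(String tp) {
        if (finished.add(tp)) {
            allDone.countDown();
        }
    }

    /** The "flag" for the main thread: await() returns once every partition reached its end. */
    CountDownLatch allDone() {
        return allDone;
    }
}
```

The main thread would then block on `allDone().await()` and call close()
on the Kafka Streams instance afterwards. One limitation of this sketch: if
the offset just before the end is missing from the log (for example after
compaction), the partition is only marked as finished once a later record
arrives.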

Does this make sense?


-Matthias



Re: KafkaStreams pause specific topic partition consumption

Posted by Timur Yusupov <tt...@gmail.com>.
Matthias,

Thanks for your answers.

>> So we are considering just pausing specific
>> topic partitions as soon as we reach the stop offsets for them.
> I am just wondering how you would do this in a fault-tolerant way (if
> you had a pause API)?
At the start of a batch cycle we have to store the end offsets for the
topic partitions we are interested in somewhere (for our use case, the
database we already use will work). Then we just need to process all
messages up to the stored end offsets. If the application is restarted,
it first checks the database for stored end offsets.
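Roughly, the restart logic could look like the sketch below. All names
(`EndOffsetStore`, `InMemoryEndOffsetStore`, `BatchBoundaries`, the batch
id) are hypothetical; the in-memory store only stands in for the database
so the sketch runs on its own, and the supplier stands in for whatever
queries the current end offsets of the topic partitions.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.function.Supplier;

/** Hypothetical persistence interface; a real one would be backed by the database. */
interface EndOffsetStore {
    Optional<Map<String, Long>> load(String batchId);
    void save(String batchId, Map<String, Long> endOffsets);
}

/** In-memory stand-in so the sketch runs without a database. */
final class InMemoryEndOffsetStore implements EndOffsetStore {
    private final Map<String, Map<String, Long>> rows = new HashMap<>();

    public Optional<Map<String, Long>> load(String batchId) {
        return Optional.ofNullable(rows.get(batchId));
    }

    public void save(String batchId, Map<String, Long> endOffsets) {
        rows.put(batchId, new HashMap<>(endOffsets));
    }
}

final class BatchBoundaries {
    /**
     * Returns the end offsets for the batch. After a restart the stored values
     * win, so the batch boundary stays the same even if the topics have grown.
     */
    static Map<String, Long> loadOrCreate(EndOffsetStore store, String batchId,
                                          Supplier<Map<String, Long>> currentEndOffsets) {
        Optional<Map<String, Long>> stored = store.load(batchId);
        if (stored.isPresent()) {
            return stored.get();
        }
        Map<String, Long> fresh = currentEndOffsets.get();
        store.save(batchId, fresh); // persist before any record is processed
        return fresh;
    }
}
```

The point of the design is that the boundary is written once per batch and
only read afterwards, so a crash in the middle of the batch does not move
the stop offsets.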

>> 2) Assume we process multiple topics in some parallel way and want to
>> pause some topics while waiting for other topics to catch up.
> Streams synchronizes topics on time automatically for you. So I am
> wondering why this does not work for you?
Right, this is probably a bad example, but use case 1) with batch
processing is still relevant.




-- 
Best regards, Timur.

Re: KafkaStreams pause specific topic partition consumption

Posted by "Matthias J. Sax" <ma...@confluent.io>.
Timur,

there is no API to pause/resume partitions in Streams, because Streams
manages its internal consumer itself.

The "batch processing KIP" is currently delayed -- but I am sure we will
pick it up again. Hopefully after 0.11 is released.

>> So we are considering just pausing specific
>> topic partitions as soon as we reach the stop offsets for them.

I am just wondering how you would do this in a fault-tolerant way (if
you had a pause API)?

>> 2) Assume we process multiple topics in some parallel way and want to pause
>> some topics while waiting for other topics to catch up.

Streams synchronizes topics on time automatically for you. So I am
wondering why this does not work for you?


-Matthias

