Posted to users@kafka.apache.org by pradeep s <sr...@gmail.com> on 2018/10/24 05:33:18 UTC

Consumer Pause & Scheduled Resume

Hi,
I have a requirement to start Kafka streaming at a scheduled time and
then pause the stream when the consumer poll returns empty fetches for
three or more polls.

I am starting a consumer poll loop during application startup using a
single-thread executor and then pausing the consumer when the poll
returns empty for three polls.

When the schedule kicks in, I call *consumer.resume()*.

Is this approach correct?
Will it cause any issue if the consumer calls poll() on a paused consumer?

Skeleton Code
============

public class OfferItemImageConsumer implements Runnable {

    @Override
    public void run() {
        try {
            do {
                ConsumerRecords<String, String> records = kafkaConsumer.poll(kafkaConfig.getPollTimeoutMs());
                writeAndPauseEmptyFetch(records);
                processRecords(records);
            } while (!consumerLoopClosed.get());
        } catch (RuntimeException ex) {
            handleConsumerLoopException(ex);
        } finally {
            kafkaConsumer.close();
        }
    }

    private void writeAndPauseEmptyFetch(ConsumerRecords<String, String> records) {
        if (records.isEmpty()) {
            emptyFetchCount++;
        } else {
            emptyFetchCount = 0; // only consecutive empty fetches should trigger a pause
        }
        if (emptyFetchCount > EMPTY_FETCH_THRESHOLD && !consumerPaused) {
            writeImageData();
            emptyFetchCount = 0;
            kafkaConsumer.pause(kafkaConsumer.assignment());
            consumerPaused = true;
        }
    }
}

=================================

public class ItemImageStreamScheduler {

    private static final int TERMINATION_TIMEOUT = 10;

    private ExecutorService executorService = Executors.newSingleThreadExecutor();

    private final OfferItemImageConsumer offerItemImageConsumer;
    private final ItemImageStreamConfig itemImageStreamConfig;
    private final KafkaConsumer<String, String> kafkaConsumer;

    @EventListener(ApplicationReadyEvent.class)
    void startStreaming() {
        executorService.submit(offerItemImageConsumer);
    }

    @Scheduled
    void resumeStreaming() {
        kafkaConsumer.resume(kafkaConsumer.assignment());
    }
}

Thanks

Pradeep

Re: Consumer Pause & Scheduled Resume

Posted by pradeep s <sr...@gmail.com>.
Code Snippet Without continuous polling
==================================
public class OfferItemImageScheduler {

    @Scheduled(cron = "0 0/2 * * * ?")
    void startStreaming() {
        kafkaConsumer.resume(kafkaConsumer.assignment());
        offerItemImageConsumer.streamMessages(kafkaConsumer);
        kafkaConsumer.pause(kafkaConsumer.assignment());
    }
}

=========================

public class OfferItemImageConsumer {

    public boolean streamMessages(KafkaConsumer<String, String> kafkaConsumer) {
        try {
            do {
                ConsumerRecords<String, String> records = kafkaConsumer.poll(kafkaConfig.getPollTimeoutMs());
                log.info("Kafka poll returned {} records", records.count());
                checkEmptyFetch(records);
                processRecords(records);
            } while (!consumerPaused.get() && !consumerLoopClosed.get());
        } catch (WakeupException wakeupException) {
            // do nothing if the WakeupException came from the shutdown hook
            if (!consumerLoopClosed.get()) {
                handleConsumerLoopException(wakeupException);
            }
        } catch (RuntimeException ex) {
            handleConsumerLoopException(ex);
        } finally {
            resetConsumerStatus();
        }
        return true;
    }
}
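
The WakeupException branch above implies a shutdown hook that calls wakeup()
from another thread. A rough sketch of such a hook follows (class and parameter
names are illustrative, not from the original code); KafkaConsumer.wakeup() is
the one consumer method that is safe to call from a different thread:

import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class ConsumerShutdownHook {

    // Register with the same consumer instance and flag used by the poll loop.
    public static void register(KafkaConsumer<String, String> kafkaConsumer,
                                AtomicBoolean consumerLoopClosed) {
        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            consumerLoopClosed.set(true);  // ask the loop to exit
            kafkaConsumer.wakeup();        // interrupts a blocked poll() with WakeupException
        }));
    }
}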


Re: Consumer Pause & Scheduled Resume

Posted by Manoj Khangaonkar <kh...@gmail.com>.
Hi Pradeep

The poll, pause, and resume need to happen on the same thread -- in the
same while loop.

If a scheduler is the trigger for pause or resume, do not call pause/resume
from the scheduler thread. Instead, set a variable in the class that has the
poll loop. The poll loop can check the variable and pause/resume as necessary.

For the rebalance scenario, you should implement the ConsumerRebalanceListener
interface and register it with the consumer. It will get called when partitions
are assigned or revoked. There you can call pause or resume again.
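
A minimal sketch of such a listener (class and variable names are
illustrative; it assumes a flag, owned by the polling thread, that says
whether consumption should currently be paused -- the callbacks run inside
poll() on that same thread, so calling pause() there is safe):

import java.util.Collection;
import java.util.function.BooleanSupplier;

import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

public class PauseAwareRebalanceListener implements ConsumerRebalanceListener {

    private final KafkaConsumer<String, String> consumer;
    private final BooleanSupplier shouldBePaused; // e.g. () -> consumerPaused

    public PauseAwareRebalanceListener(KafkaConsumer<String, String> consumer,
                                       BooleanSupplier shouldBePaused) {
        this.consumer = consumer;
        this.shouldBePaused = shouldBePaused;
    }

    @Override
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        // Nothing to do for pause/resume; commit offsets here if committing manually.
    }

    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        // Newly assigned partitions arrive unpaused, so re-pause them if we are
        // outside the scheduled processing window.
        if (shouldBePaused.getAsBoolean()) {
            consumer.pause(partitions);
        }
    }
}

It would be registered when subscribing, for example:
kafkaConsumer.subscribe(Collections.singletonList(topic),
    new PauseAwareRebalanceListener(kafkaConsumer, () -> consumerPaused));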

Hope this helps

regards


-- 
http://khangaonkar.blogspot.com/

Re: Consumer Pause & Scheduled Resume

Posted by pradeep s <sr...@gmail.com>.
Hi Manoj/Matthias,
My requirement is to run the consumer once daily, stream the messages,
and pause when I encounter a few empty fetches.
I am planning to run two consumers and pause consumption based on
empty fetches for a topic with 4 partitions.
To avoid the consumer multi-threaded access issue, I run the consumer,
exit the poll loop, and call pause on the same thread. In this case, I
will not be polling continuously.
When the next schedule kicks in, I will resume polling.
Will the consumer resume call cause issues, since the scheduled loop is
triggered a long time after the polling stopped? (Or is the old approach
of continuous polling the correct one?)
Also, Manoj, can you please explain the rebalance scenario: if the
consumer is paused for two partitions and gets an assignment for another
two partitions (because of a pod termination), how can I pause
consumption if it is not the scheduled time to process the records?
Thanks
Pradeep

Re: Consumer Pause & Scheduled Resume

Posted by Manoj Khangaonkar <kh...@gmail.com>.
One item to be aware of with pause and resume is that it applies to the
partitions currently assigned to the consumer.

But partitions can get revoked, or additional partitions can get assigned
to the consumer.

With reassignment, you might expect the consumer to be paused but
suddenly start getting messages because a new partition got assigned.

Use the ConsumerRebalanceListener to pause or resume any new partitions.

regards

-- 
http://khangaonkar.blogspot.com/

Re: Consumer Pause & Scheduled Resume

Posted by "Matthias J. Sax" <ma...@confluent.io>.
That is correct: clients are not thread safe.

You can use an `AtomicBoolean needToResume` that you share over both
threads and that is initially false.

In your scheduled method, you set the variable to true.

In your main consumer, each time before you call poll(), you check if
the variable is set to true. If yes, you resume() and reset the variable
to false.
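
A rough sketch of that pattern (class and field names are illustrative,
not from the original code; it assumes the consumer is already subscribed):

import java.time.Duration;
import java.util.concurrent.atomic.AtomicBoolean;

import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class PausableConsumerLoop implements Runnable {

    private final KafkaConsumer<String, String> kafkaConsumer;
    private final AtomicBoolean needToResume = new AtomicBoolean(false);
    private final AtomicBoolean consumerLoopClosed = new AtomicBoolean(false);

    public PausableConsumerLoop(KafkaConsumer<String, String> kafkaConsumer) {
        this.kafkaConsumer = kafkaConsumer;
    }

    // Called from the scheduler thread (e.g. the @Scheduled method):
    // it only flips the flag and never touches the consumer directly.
    public void requestResume() {
        needToResume.set(true);
    }

    @Override
    public void run() {
        try {
            while (!consumerLoopClosed.get()) {
                if (needToResume.compareAndSet(true, false)) {
                    kafkaConsumer.resume(kafkaConsumer.assignment());
                }
                // Keep polling even while paused so the consumer stays in the group.
                ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofMillis(500));
                // ... process records, count empty fetches, and call
                // kafkaConsumer.pause(kafkaConsumer.assignment()) when appropriate ...
            }
        } finally {
            kafkaConsumer.close();
        }
    }
}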

Hope this helps.

-Matthias


Re: Consumer Pause & Scheduled Resume

Posted by pradeep s <sr...@gmail.com>.
Thanks, Matthias. I am facing the issue when I try to call resume
from the scheduled method.
I was getting an exception that the Kafka consumer is not safe for
multi-threaded access. I am trying to see how I can call pause and resume
on the same thread. There will be only one thread running for consumption.


Re: Consumer Pause & Scheduled Resume

Posted by "Matthias J. Sax" <ma...@confluent.io>.
There is no issue if you call `poll()` while all partitions are paused. In
fact, if you want to make sure that the consumer does not fall out of
the consumer group, you must call `poll()` at a regular interval so that
you do not hit the `max.poll.interval.ms` timeout.
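
For example (a small illustration reusing the names from the skeleton code):

kafkaConsumer.pause(kafkaConsumer.assignment());
// Returns an empty batch while every assigned partition is paused, but the
// call still counts as progress for max.poll.interval.ms, so the consumer
// keeps its group membership.
ConsumerRecords<String, String> records = kafkaConsumer.poll(kafkaConfig.getPollTimeoutMs());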


-Matthias

Re: Consumer Pause & Scheduled Resume

Posted by pradeep s <sr...@gmail.com>.
Pause and resume are required since I am running a pod in Kubernetes and I
am not shutting down the app.
