Posted to users@kafka.apache.org by Phil Luckhurst <ph...@encycle.com> on 2016/04/21 11:18:56 UTC

Detecting rebalance while processing ConsumerRecords (0.9.0.1)

This is an example of the scenario I'm trying to avoid, where two consumers end up processing the same records from a partition at the same time.


1. I have a topic with 2 partitions and two consumers, A and B, which have each been assigned one partition from the topic.

2. Consumer A is processing a batch of 100 records from partition 1. Processing takes much longer than expected, so after 20 records the consumer appears dead and a rebalance occurs.

3. Consumer B is now assigned both partitions and starts processing partition 1 from the last offset committed by Consumer A.

4. Consumer A, unaware that it caused the rebalance, is still processing the remaining records in its batch of 100, which are the same records that Consumer B is now also processing from the same partition.

Is there a way to detect, inside the ConsumerRecords loop of Consumer A, that it has been marked dead, so that it can skip the remaining records in the batch and go to the next poll(), which will trigger another rebalance and let it rejoin the group? I have added a ConsumerRebalanceListener, but because onPartitionsRevoked() is only invoked from within poll(), I am not notified until all the records from the current poll batch have been processed.

This shows where I think I need the check.

while (true) {
    ConsumerRecords<String, String> records = consumer.poll(100);
    for (ConsumerRecord<String, String> record : records) {
        // I think I need a check here to jump out of the loop and call poll() again?
        // Note: isDead() is hypothetical; KafkaConsumer has no such method.
        if (consumer.isDead())
            break;

        // ... process the record ...
    }
}

I've looked at the suggestions for using pause and resume so that poll() can be called during the record processing loop which might allow me to do it like this?


1. Call pause at the start of the loop.

2. Call poll(0) in the loop which will trigger the call to ConsumerRebalanceListener onPartitionsRevoked() and perform the rebalance.

3. If ConsumerRebalanceListener onPartitionsRevoked() was called then I would call resume and break out of the record processing loop so the main poll() request is called again.

4. Call resume at the end of the record processing loop.
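
The four steps above could be sketched roughly as follows. This is only an illustration of the control flow, not a tested Kafka integration: PausedBatchLoop, GroupClient, pauseAll(), heartbeatPoll() and resumeAll() are hypothetical stand-ins for consumer.pause(...), consumer.poll(0) and consumer.resume(...), and the revoked flag is assumed to be set from ConsumerRebalanceListener.onPartitionsRevoked().

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;

/** Sketch of the pause / poll(0) / resume flow; all names here are hypothetical. */
final class PausedBatchLoop {

    /** Stand-in for the three KafkaConsumer calls used in the steps above. */
    interface GroupClient {
        void pauseAll();      // would wrap consumer.pause(...)
        void heartbeatPoll(); // would wrap consumer.poll(0); may fire onPartitionsRevoked()
        void resumeAll();     // would wrap consumer.resume(...)
    }

    /** Assumed to be set to true from ConsumerRebalanceListener.onPartitionsRevoked(). */
    final AtomicBoolean revoked = new AtomicBoolean(false);

    /** Processes records until the batch ends or a revocation is seen; returns the count handled. */
    <R> int processBatch(List<R> batch, GroupClient client, Consumer<R> handler) {
        int handled = 0;
        revoked.set(false);
        client.pauseAll();                 // step 1
        try {
            for (R record : batch) {
                client.heartbeatPoll();    // step 2
                if (revoked.get()) {
                    break;                 // step 3: abandon the rest of the batch
                }
                handler.accept(record);
                handled++;
            }
        } finally {
            client.resumeAll();            // steps 3 and 4: always resume before the next real poll
        }
        return handled;
    }
}
```

Putting the resume call in a finally block is a design choice of this sketch, so that the consumer is never left paused if a record handler throws.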

Is that a viable solution to the problem or is there a better way to do this?

Thanks
Phil Luckhurst

RE: Detecting rebalance while processing ConsumerRecords (0.9.0.1)

Posted by vinay sharma <vi...@gmail.com>.
I saw this in the 0.9.0.1 code. I am sure about it because I am catching
this exception and running logic that stops further record processing
if autocommit is off; otherwise I keep processing the records from the
current poll even if the commit fails. This is because Kafka marks all
records sent to the user as committed just before a rebalance only when
autocommit is on; with autocommit off, the next poll returns records from
the consumer's last committed offset. I have seen some cases where
autocommit also fails, but that happened rarely and I did not get a chance
to investigate it.
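
One possible reading of that logic, as a minimal sketch. It does not use the Kafka client: CommitFailurePolicy, CommitFailed and OffsetCommitter are hypothetical names, with CommitFailed standing in for org.apache.kafka.clients.consumer.CommitFailedException and OffsetCommitter for a per-record consumer.commitSync(offsets) call.

```java
import java.util.ArrayList;
import java.util.List;

/** Sketch of "stop on commit failure unless autocommit is on"; all names are hypothetical. */
final class CommitFailurePolicy {

    /** Stand-in for org.apache.kafka.clients.consumer.CommitFailedException. */
    static final class CommitFailed extends RuntimeException {
        CommitFailed(String message) { super(message); }
    }

    /** Stand-in for a per-record consumer.commitSync(offsets) call. */
    interface OffsetCommitter {
        void commit(long offset) throws CommitFailed;
    }

    /**
     * Processes offsets in order, committing after each one.
     * Returns the offsets that were actually processed.
     */
    static List<Long> process(List<Long> batch, OffsetCommitter committer, boolean autoCommit) {
        List<Long> processed = new ArrayList<>();
        for (long offset : batch) {
            processed.add(offset);             // the real record handling would go here
            try {
                committer.commit(offset + 1);  // commit the position of the next record to read
            } catch (CommitFailed e) {
                if (!autoCommit) {
                    break;  // uncommitted records are expected to be redelivered after the rebalance
                }
                // autocommit on: keep going with the rest of the current batch
            }
        }
        return processed;
    }
}
```

Note that the record whose commit fails has already been processed in this sketch, so it can still be handled a second time by whichever consumer picks up the partition.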

Regards,
Vinay
On Apr 29, 2016 6:12 AM, "Phil Luckhurst" <ph...@encycle.com>
wrote:

> Hi Vinay,
>
> This statement is very interesting.
>
> "I noticed that in case where a consumer is marked dead or a rebalance is
> in progress, kafka throws CommitFailedException. A KafkaException is thrown
> only when something unknown has happened which is not yet categorized."
>
> I will test this out but was this with 0.9.0.1 or 0.10.1.0? If commitSync
> fails with CommitFailedException when the consumer is marked dead then that
> is the final piece to my jigsaw. If I get that exception I know I can
> safely abandon the current batch of records I am processing and return to
> my poll command knowing that the ones I haven’t committed will be picked up
> after the rebalance completes.
>
> Many thanks,
> Phil

RE: Detecting rebalance while processing ConsumerRecords (0.9.0.1)

Posted by Phil Luckhurst <ph...@encycle.com>.
Hi Vinay,

This statement is very interesting.

"I noticed that in case where a consumer is marked dead or a rebalance is in progress, kafka throws CommitFailedException. A KafkaException is thrown only when something unknown has happened which is not yet categorized."

I will test this out, but was this with 0.9.0.1 or 0.10.1.0? If commitSync fails with CommitFailedException when the consumer is marked dead then that is the final piece to my jigsaw. If I get that exception I know I can safely abandon the current batch of records I am processing and return to my poll command knowing that the ones I haven’t committed will be picked up after the rebalance completes.

Many thanks,
Phil 

-----Original Message-----
From: vinay sharma [mailto:vinsharma.tech@gmail.com] 
Sent: 28 April 2016 21:34
To: users@kafka.apache.org
Subject: Re: Detecting rebalance while processing ConsumerRecords (0.9.0.1)

Hi Phil,

I tested my code and it is correct. I do see heartbeats being missed sometimes, which causes a session timeout for the consumer, and its generation is marked dead. I see long time windows with no heartbeat even though I commit during those windows, and there is no reason to skip a heartbeat because the last heartbeat was sent longer ago than the configured heartbeat interval. I ran tests with a Kafka client jar built from trunk (0.10.1.0) and it works fine because of the latest changes, where each commitSync acts as a heartbeat.

Calling commitSync after processing each record works fine because it triggers more heartbeat requests in a given time interval, which makes it extremely unlikely that heartbeats are missed for longer than the session timeout.

You wrote in a previous mail that

"I thought having commitSync throw an explicit 'rebalance in progress' type
> exception rather than just a KafkaException would allow this to be 
> easily identified and handled."


I noticed that in case where a consumer is marked dead or a rebalance is in progress, kafka throws CommitFailedException. A KafkaException is thrown only when something unknown has happened which is not yet categorized.


Regards,
Vinay


Re: Detecting rebalance while processing ConsumerRecords (0.9.0.1)

Posted by vinay sharma <vi...@gmail.com>.
Hi Phil,

I tested my code and it is correct. I do see heartbeats being missed
sometimes, which causes a session timeout for the consumer, and its
generation is marked dead. I see long time windows with no heartbeat even
though I commit during those windows, and there is no reason to skip a
heartbeat because the last heartbeat was sent longer ago than the configured
heartbeat interval. I ran tests with a Kafka client jar built from trunk
(0.10.1.0) and it works fine because of the latest changes, where each
commitSync acts as a heartbeat.

Calling commitSync after processing each record works fine because it
triggers more heartbeat requests in a given time interval, which makes it
extremely unlikely that heartbeats are missed for longer than the session
timeout.

You wrote in a previous mail that

"I thought having commitSync throw an explicit 'rebalance in progress' type
> exception rather than just a KafkaException would allow this to be easily
> identified and handled."


I noticed that in case where a consumer is marked dead or a rebalance is in
progress, kafka throws CommitFailedException. A KafkaException is thrown
only when something unknown has happened which is not yet categorized.


Regards,
Vinay

On Wed, Apr 27, 2016 at 7:52 AM, vinay sharma <vi...@gmail.com>
wrote:

> Hi Phil,
>
> This sounds great. Thanks for trying these settings. This probably means
> something is wrong in my code or setup. I will check what is causing this
> issue in my case.
>
> I have a 3 broker 1 zk cluster and my topic has 3 partitions with
> replication factor 3.
>
> Regards,
> Vinay Sharma
>

RE: Detecting rebalance while processing ConsumerRecords (0.9.0.1)

Posted by vinay sharma <vi...@gmail.com>.
Hi Phil,

This sounds great. Thanks for trying these settings. This probably means
something is wrong in my code or setup. I will check what is causing this
issue in my case.

I have a 3 broker 1 zk cluster and my topic has 3 partitions with
replication factor 3.

Regards,
Vinay Sharma

RE: Detecting rebalance while processing ConsumerRecords (0.9.0.1)

Posted by Phil Luckhurst <ph...@encycle.com>.
Hi Vinay,

I tried this out as you suggested by setting metadata.max.age.ms = 40000 (session.timeout.ms=30000)
I then ran my consumer with a batch of 25 messages where each message takes 4 seconds to process and I call commitSync(offsets) after each message to ensure the heartbeat keeps the consumer alive.
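
For reference, the settings for this test could be written out as below. It is only a sketch using plain java.util.Properties with the standard consumer config keys: the RefreshTestConfig class and the messagesPerRefresh helper are hypothetical, and enable.auto.commit=false is an assumption based on the manual commitSync(offsets) calls.

```java
import java.util.Properties;

/** Sketch of the test configuration described above; the class name is hypothetical. */
final class RefreshTestConfig {

    static Properties consumerProps() {
        Properties props = new Properties();
        props.put("session.timeout.ms", "30000");   // session timeout used in the test
        props.put("metadata.max.age.ms", "40000");  // metadata refresh every 40 s (default 300000)
        props.put("enable.auto.commit", "false");   // assumption: offsets are committed via commitSync(offsets)
        return props;
    }

    /** Number of messages processed between two metadata refreshes at a fixed time per message. */
    static long messagesPerRefresh(Properties props, long processingMs) {
        return Long.parseLong(props.getProperty("metadata.max.age.ms")) / processingMs;
    }
}
```

With 4 seconds per message, messagesPerRefresh(consumerProps(), 4000) gives 10, so a batch of 25 messages (about 100 seconds in total) spans at least two metadata refreshes and runs well past the 30-second session timeout.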

For the messages before metadata.max.age.ms expired I see as expected that the commitSync also logs the heartbeat.

2016-04-27 09:45:11,267 DEBUG [pool-3-thread-2] org.apache.kafka.clients.consumer.internals.AbstractCoordinator: Received successful heartbeat response.
2016-04-27 09:45:11,270 DEBUG [pool-3-thread-2] org.apache.kafka.clients.consumer.internals.ConsumerCoordinator: Committed offset 184 for partition phil-pa-1-device-update-1

Then the automatic metadata refresh is logged.

2016-04-27 09:45:15,276 DEBUG [pool-3-thread-2] org.apache.kafka.clients.NetworkClient: Sending metadata request ClientRequest(expectResponse=true, callback=null, request=RequestSend(header={api_key=3,api_version=0,correlation_id=29,client_id=consumer-3}, body={topics=[phil-pa-1-device-update]}), isInitiatedByNetworkClient, createdTimeMs=1461746715276, sendTimeMs=0) to node 0
2016-04-27 09:45:15,277 DEBUG [pool-3-thread-2] org.apache.kafka.clients.Metadata: Updated cluster metadata version 3 to Cluster(nodes = [Node(0, ta-eng-kafka2, 9092)], partitions = [Partition(topic = phil-pa-1-device-update, partition = 0, leader = 0, replicas = [0,], isr = [0,], Partition(topic = phil-pa-1-device-update, partition = 1, leader = 0, replicas = [0,], isr = [0,]])

The very next commitSync call succeeds but, as you said, does not perform the heartbeat: we don't get the log message "Received successful heartbeat response". But we only miss the heartbeat on that one message; the commitSync calls that follow it get the heartbeat back again.

2016-04-27 09:45:23,301 DEBUG [pool-3-thread-2] org.apache.kafka.clients.consumer.internals.AbstractCoordinator: Received successful heartbeat response.
2016-04-27 09:45:23,309 DEBUG [pool-3-thread-2] org.apache.kafka.clients.consumer.internals.ConsumerCoordinator: Committed offset 187 for partition phil-pa-1-device-update-1

After another 40 seconds we see the next metadata refresh request and the same thing happens again. The first commitSync after the metadata request does not perform the heartbeat but the ones that follow it do. This means that in our case we call commitSync often enough that the metadata request does not cause us an issue.

Thanks,
Phil Luckhurst


-----Original Message-----
From: vinay sharma [mailto:vinsharma.tech@gmail.com] 
Sent: 26 April 2016 17:29
To: users@kafka.apache.org
Subject: Re: Detecting rebalance while processing ConsumerRecords (0.9.0.1)

Hi Phil,

The config ConsumerConfig.METADATA_MAX_AGE_CONFIG has a default of 300000 ms. This config drives a mechanism where the consumer periodically issues a proactive metadata refresh request. I have seen that I get the log message about a successful heartbeat along with the commit only before this request. Once this request has been issued, the next successful heartbeat comes only after the next poll.
This will cause a rebalance and mark the current consumer dead if there is no poll in the 30 seconds after the metadata refresh (where the session timeout is 30 seconds).

Regards,
Vinay

Re: Detecting rebalance while processing ConsumerRecords (0.9.0.1)

Posted by vinay sharma <vi...@gmail.com>.
Hi Phil,

The config ConsumerConfig.METADATA_MAX_AGE_CONFIG has a default of 300000 ms.
This config drives a mechanism where the consumer periodically issues a
proactive metadata refresh request. I have seen that I get the log message
about a successful heartbeat along with the commit only before this request.
Once this request has been issued, the next successful heartbeat comes only
after the next poll. This will cause a rebalance and mark the current
consumer dead if there is no poll in the 30 seconds after the metadata
refresh (where the session timeout is 30 seconds).

Regards,
Vinay

Re: Detecting rebalance while processing ConsumerRecords (0.9.0.1)

Posted by vinay sharma <vi...@gmail.com>.
try setting

props.put(ConsumerConfig.METADATA_MAX_AGE_CONFIG, 40000);

default is 300000 ms

On Tue, Apr 26, 2016 at 11:34 AM, Phil Luckhurst <phil.luckhurst@encycle.com
> wrote:

> Hi Vinay,
>
> Thanks for that information, it's good to know that will be fixed but I'm
> not sure what would trigger this to happen in the 0.9.0.1 release? What
> would cause the metadata refresh to be called while the consumer is
> processing a batch of messages where I'm committing after each message and
> each message is processed within session.timeout.ms=30000 ?
>
> I added a sleep call to my consumer so that it took 4 seconds to process
> each message in a batch where my session.timeout.ms=30000.   Each
> commitSync(offsets) call results in the following log messages.
>
> 2016-04-26 16:07:08,877 DEBUG [pool-3-thread-2]
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator: Received
> successful heartbeat response.
> 2016-04-26 16:07:08,878 DEBUG [pool-3-thread-2]
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator: Committed
> offset 132 for partition phil-pa-1-device-update-1
>
> I ran this with a batch of messages that took longer than
> session.timeout.ms=30000 to be processed but I never saw a metadata
> refresh. I searched through my older logs and I only seem to see the
> metadata requests when the consumers first start running and from a
> producer that is in the same process.
>
> I guess a consumer rebalance will also trigger a metadata refresh but what
> else might?
>
> Thanks
> Phil Luckhurst
>
> -----Original Message-----
> From: vinay sharma [mailto:vinsharma.tech@gmail.com]
> Sent: 26 April 2016 13:24
> To: users@kafka.apache.org
> Subject: RE: Detecting rebalance while processing ConsumerRecords (0.9.0.1)
>
> Hi Phil,
>
> CommitSync sends a heartbeat request on each call, but it seems that
> somehow it stops sending heartbeat requests after a metadata refresh until
> the next poll. I asked about this on the dev list and learned that this is
> fixed in 0.10.0.0, which is the next version. I have not gone into the
> details of the defect, but it seems something was fixed related to the time
> reset of the heartbeat task so that the next heartbeat request time is
> calculated correctly. From the next version, commitSync will act as a
> heartbeat, as per the defect.
>
> Regards,
> Vinay Sharma
> On Apr 26, 2016 4:53 AM, "Phil Luckhurst" <ph...@encycle.com>
> wrote:
>
> > Hi Vinay,
> >
> > "Regarding identifying a rebalance, how about comparing array used for
> > consumer pause with current assignments of consumer?"
> >
> > I did consider checking that in the commitSync exception handler but
> > didn't try it because if this is in the consumer that has caused the
> > rebalance (i.e. the one that appears to be dead) I didn't think its
> > partition assignments would have been updated when handling the
> > exception, the ConsumerRebalanceListener callbacks have not yet been
> > called - I can give it a try though. That's why I thought having
> > commitSync throw an explicit 'rebalance in progress' type exception
> > rather than just a KafkaException would allow this to be easily
> > identified and handled.
> >
> > The information about the metadata request is useful, I'll watch out
> > for that if we change our commit logic.
> >
> > Thanks
> > Phil Luckhurst
> >
> >
> > -----Original Message-----
> > From: vinay sharma [mailto:vinsharma.tech@gmail.com]
> > Sent: 25 April 2016 20:30
> > To: users@kafka.apache.org
> > Subject: Re: Detecting rebalance while processing ConsumerRecords
> > (0.9.0.1)
> >
> > Hi Phil,
> >
> > Regarding identifying a rebalance, how about comparing array used for
> > consumer pause with current assignments of consumer?
> >
> > Regarding a rebalance after a metadata refresh request, that will not
> > happen if you are committing after each record. I have a session timeout
> > of 30000 ms, and if I commit the last processed records before the session
> > times out then everything is fine, except that after a metadata refresh
> > request I see a rebalance which causes "Error UNKNOWN_MEMBER_ID occurred
> > while committing offsets" on further commits from the consumer until the
> > next poll. This error means that even committing at regular intervals
> > (which sends a heartbeat) somehow does not save the consumer from timing
> > out during a metadata refresh. This issue does not happen if I am
> > committing after each record, that is every 2-4 seconds, or if a commit
> > happens right after the metadata refresh response.
> >
> > Regards,
> > Vinay Sharma
> >
> >
> > On Mon, Apr 25, 2016 at 11:27 AM, Phil Luckhurst <
> > phil.luckhurst@encycle.com
> > > wrote:
> >
> > > Hi Vinay,
> > >
> > > I'm currently calling commitSync(Map<TopicPartition, OffsetAndMetadata>
> > > offsets) after each message to just write the partition offset for
> > > that specific message.  Our messages can take several seconds to
> > > process and this only seems to be adding 1 or 2 milliseconds to the
> > > time so is not looking like a significant overhead and is acting as
> > > our heartbeat.
> > >
> > > WRT " I see that kafka sends a metadata refresh request after every
> > > 300000 ms (default) and even though nothing changed (no new
> > > consumer, broker or partition) this refresh generally triggers a
> > > rebalance  (at least in my tests) ."
> > >
> > > I'm not seeing this. We've got a ConsumerRebalanceListener
> > > implemented on our consumers and I don't see this get called
> > > even though I see lots of metadata requests being sent. We can also
> > > have quiet periods where we often exceed the 300000 ms refresh
> > > default and those metadata requests don't trigger a rebalance either.
> > >
> > > I'm calling consumer.pause(consumer.assignment().toArray(new
> > > TopicPartition[0])) at the start of each batch and
> > > consumer.resume(consumer.assignment().toArray(new
> > > TopicPartition[0])) at the end. This allows us to call poll(0) in the
> > > message loop if we need to block on a message for more than
> > > session.timeout.ms (this can happen if an external system is
> > > temporarily unavailable). Again this seems to work ok and does not
> > > trigger a rebalance.
> > >
> > > The only issue we've found is as mentioned before where a rebalance
> > > occurs while we are processing a batch of messages. When that
> > > happens the commitSync fails with a KafkaException and the message
> > > states this is due to a rebalance. We'd like to skip the rest of the
> > > batch when this happens but to do that we'd need to know for sure
> > > that it was because of a rebalance but KafkaException could be
> > > thrown for other reasons. A KafkaRebalanceException or even a method
> > > we could call on the consumer would allow us to safely abort the
> > > current processing loop knowing that the remaining messages would be
> > > picked up by another consumer after the rebalance - that would stop us
> > > processing duplicates.
> > >
> > > Thanks
> > > Phil Luckhurst
> > >
> > >
> > > -----Original Message-----
> > > From: vinay sharma [mailto:vinsharma.tech@gmail.com]
> > > Sent: 22 April 2016 14:24
> > > To: users@kafka.apache.org
> > > Subject: Re: Detecting rebalance while processing ConsumerRecords
> > > (0.9.0.1)
> > >
> > > Hi Phil,
> > >
> > > Regarding pause and resume, I have not tried this approach but I
> > > think it may not be feasible. If your consumer no longer
> > > has the partition assigned from which the record being processed was
> > > fetched, or even if the partition is somehow assigned to the consumer
> > > again, you may still not be able to do this and will see
> > > UNKNOWN_MEMBER_ID or ILLEGAL_GENERATION_ID errors in the logs. Let me
> > > know if this approach works for you. I will also try it then.
> > >
> > > Kafka never sends the same record to 2 consumers in the same consumer
> > > group. If another consumer got the same record, it means that the first
> > > consumer doesn't have that particular partition assigned to it anymore.
> > > Even if you want to commit from the first consumer you will not be able
> > > to, and the commit will throw an exception. Even after increasing
> > > sessionTimeOut a rebalance can still occur. I see that Kafka sends a
> > > metadata refresh request every 300000 ms (default) and even
> > > though nothing changed (no new consumer, broker or
> > > partition) this refresh generally triggers a rebalance (at least in
> > > my tests).
> > >
> > > Calling commitSync renews the session and keeps the consumer alive.
> > > commitSync is a blocking operation so you may not want to call it
> > > for each record processed. You can try calling commitSync(Offset) just
> > > before starting to process a record only if, say, 75% of the
> > > configured session timeout has elapsed. This will keep your consumer
> > > alive during rare longer processing times and will also avoid committing
> > > each record. But as I said earlier, this will not guarantee that
> > > a rebalance will not happen. A metadata refresh or other change may
> > > still trigger a rebalance, but if you take the above approach then at
> > > least a rebalance will not occur because of a session timeout during a
> > > longer processing time.
> > >
> > > If you maintain offsets outside of Kafka and configure consumers to
> > > coordinate with each other through these external offsets then you
> > > can skip processing duplicate records even if Kafka sends the same
> > > records twice.
> > > Through these external offsets you will have to devise a way to skip
> > > a record if already processed by another consumer or wait if the same
> > > record is in process by another consumer.
> > >
> > >
> > > Regards,
> > > Vinay Sharma
> > >
> > >
> > >
> > > On Thu, Apr 21, 2016 at 2:09 PM, Phil Luckhurst <
> > > phil.luckhurst@encycle.com>
> > > wrote:
> > >
> > > > Thanks for all the responses. Unfortunately it seems that
> > > > currently there is no foolproof solution to this. It's not a
> > > > problem with the stored offsets as it will happen even if I do a
> > > > commitSync after each record is processed. It's the unprocessed
> > > > records in the batch that get processed twice.
> > > >
> > > > I'm now taking the approach of trying to limit the possibility of
> > > > a rebalance as much as possible by reducing the data returned by
> > > > poll.
> > > > I'm also using the pause, poll, resume pattern to ensure the
> > > > consumer doesn't cause a rebalance if the processing loop takes
> > > > longer than session.timeout.ms.
> > > >
> > > > Cheers,
> > > > Phil
> > > >
> > > >
> > > >
> > > > On 21 Apr 2016 16:24, at 16:24, vinay sharma
> > > > <vi...@gmail.com>
> > > > wrote:
> > > > >Hi,
> > > > >
> > > > >By design Kafka ensures that the same record is not sent to multiple
> > > > >consumers in the same consumer group. The issue is caused by a
> > > > >rebalance while processing is going on and records are not yet
> > > > >committed. In my view there are only 2 possible solutions to it:
> > > > >1) As mentioned in the documentation, store offsets outside of Kafka
> > > > >(https://kafka.apache.org/090/javadoc/index.html?org/apache/kafka/clients/consumer/KafkaConsumer.html).
> > > > >This is a complete solution but will definitely add extra
> > > > >development and also extra processing to each message. The problem
> > > > >may still exist if, at the time of a crash, the consumer was out of
> > > > >sync with both the external custom offset storage and the offsets
> > > > >stored in Kafka.
> > > > >2) As mentioned in the fix for defect 919 (
> > > > >https://issues.apache.org/jira/browse/KAFKA-919), set autocommit
> > > > >to true.
> > > > >This will make Kafka commit fetched records before rebalancing.
> > > > >The only drawback is that some records may never be processed if the
> > > > >consumer crashes while processing records which are already
> > > > >marked committed due to the rebalance.
> > > > >
> > > > >Regards,
> > > > >Vinay Sharma
> > > >
> > >
> >
>

RE: Detecting rebalance while processing ConsumerRecords (0.9.0.1)

Posted by Phil Luckhurst <ph...@encycle.com>.
Hi Vinay,

Thanks for that information, it's good to know that will be fixed but I'm not sure what would trigger this to happen in the 0.9.0.1 release? What would cause the metadata refresh to be called while the consumer is processing a batch of messages where I'm committing after each message and each message is processed within session.timeout.ms=30000 ?

I added a sleep call to my consumer so that it took 4 seconds to process each message in a batch where my session.timeout.ms=30000.   Each commitSync(offsets) call results in the following log messages.

2016-04-26 16:07:08,877 DEBUG [pool-3-thread-2] org.apache.kafka.clients.consumer.internals.AbstractCoordinator: Received successful heartbeat response.
2016-04-26 16:07:08,878 DEBUG [pool-3-thread-2] org.apache.kafka.clients.consumer.internals.ConsumerCoordinator: Committed offset 132 for partition phil-pa-1-device-update-1

I ran this with a batch of messages that took longer than session.timeout.ms=30000 to be processed but I never saw a metadata refresh. I searched through my older logs and I only seem to see the metadata requests when the consumers first start running and from a producer that is in the same process.

I guess a consumer rebalance will also trigger a metadata refresh but what else might?

Thanks
Phil Luckhurst

-----Original Message-----
From: vinay sharma [mailto:vinsharma.tech@gmail.com] 
Sent: 26 April 2016 13:24
To: users@kafka.apache.org
Subject: RE: Detecting rebalance while processing ConsumerRecords (0.9.0.1)

Hi Phil,

CommitSync sends a heartbeat request on each call, but it seems that somehow it stops sending heartbeat requests after a metadata refresh until the next poll. I asked about this on the dev list and learned that this is fixed in 0.10.0.0, which is the next version. I have not gone into the details of the defect, but it seems something was fixed related to the time reset of the heartbeat task so that the next heartbeat request time is calculated correctly. From the next version, commitSync will act as a heartbeat, as per the defect.

Regards,
Vinay Sharma
On Apr 26, 2016 4:53 AM, "Phil Luckhurst" <ph...@encycle.com>
wrote:

> Hi Vinay,
>
> "Regarding identifying a rebalance, how about comparing array used for 
> consumer pause with current assignments of consumer?"
>
> I did consider checking that in the commitSync exception handler but 
> didn't try it because if this is in the consumer that has caused the 
> rebalance (i.e. the one that appears to be dead) I didn't think its 
> partition assignments would have been updated when handling the 
> exception, the ConsumerRebalanceListener callbacks have not yet been 
> called - I can give it a try though. That's why I thought having 
> commitSync throw an explicit 'rebalance in progress' type exception 
> rather than just a KafkaException would allow this to be easily identified and handled.
>
> The information about the metadata request is useful, I'll watch out 
> for that if we change our commit logic.
>
> Thanks
> Phil Luckhurst
>
>
> -----Original Message-----
> From: vinay sharma [mailto:vinsharma.tech@gmail.com]
> Sent: 25 April 2016 20:30
> To: users@kafka.apache.org
> Subject: Re: Detecting rebalance while processing ConsumerRecords 
> (0.9.0.1)
>
> Hi Phil,
>
> Regarding identifying a rebalance, how about comparing array used for 
> consumer pause with current assignments of consumer?
>
> Regarding a rebalance after a metadata refresh request, that will not
> happen if you are committing after each record. I have a session timeout
> of 30000 ms, and if I commit the last processed records before the session
> times out then everything is fine, except that after a metadata refresh
> request I see a rebalance which causes "Error UNKNOWN_MEMBER_ID occurred
> while committing offsets" on further commits from the consumer until the
> next poll. This error means that even committing at regular intervals
> (which sends a heartbeat) somehow does not save the consumer from timing
> out during a metadata refresh. This issue does not happen if I am
> committing after each record, that is every 2-4 seconds, or if a commit
> happens right after the metadata refresh response.
>
> Regards,
> Vinay Sharma
>
>
> On Mon, Apr 25, 2016 at 11:27 AM, Phil Luckhurst < 
> phil.luckhurst@encycle.com
> > wrote:
>
> > Hi Vinay,
> >
> > I'm currently calling commitSync(Map<TopicPartition, OffsetAndMetadata>
> > offsets) after each message to just write the partition offset for
> > that specific message.  Our messages can take several seconds to
> > process and this only seems to be adding 1 or 2 milliseconds to the
> > time so is not looking like a significant overhead and is acting as
> > our heartbeat.
> >
> > WRT " I see that kafka sends a metadata refresh request after every
> > 300000 ms (default) and even though nothing changed (no new
> > consumer, broker or partition) this refresh generally triggers a
> > rebalance  (at least in my tests) ."
> >
> > I'm not seeing this. We've got a ConsumerRebalanceListener
> > implemented on our consumers and I don't see this get called
> > even though I see lots of metadata requests being sent. We can also
> > have quiet periods where we often exceed the 300000 ms refresh
> > default and those metadata requests don't trigger a rebalance either.
> >
> > I'm calling consumer.pause(consumer.assignment().toArray(new
> > TopicPartition[0])) at the start of each batch and
> > consumer.resume(consumer.assignment().toArray(new
> > TopicPartition[0])) at the end. This allows us to call poll(0) in the
> > message loop if we need to block on a message for more than
> > session.timeout.ms (this can happen if an external system is
> > temporarily unavailable). Again this seems to work ok and does not
> > trigger a rebalance.
> >
> > The only issue we've found is as mentioned before where a rebalance
> > occurs while we are processing a batch of messages. When that
> > happens the commitSync fails with a KafkaException and the message
> > states this is due to a rebalance. We'd like to skip the rest of the
> > batch when this happens but to do that we'd need to know for sure
> > that it was because of a rebalance but KafkaException could be
> > thrown for other reasons. A KafkaRebalanceException or even a method
> > we could call on the consumer would allow us to safely abort the
> > current processing loop knowing that the remaining messages would be
> > picked up by another consumer after the rebalance - that would stop us
> > processing duplicates.
> >
> > Thanks
> > Phil Luckhurst
> >
> >
> > -----Original Message-----
> > From: vinay sharma [mailto:vinsharma.tech@gmail.com]
> > Sent: 22 April 2016 14:24
> > To: users@kafka.apache.org
> > Subject: Re: Detecting rebalance while processing ConsumerRecords
> > (0.9.0.1)
> >
> > Hi Phil,
> >
> > Regarding pause and resume,I have not tried this approach but i 
> > think this approach may not be feasible. If your consumer no longer 
> > has that partition assigned from which record being processed was 
> > fetched or even if partition is assigned again to consumer somehow 
> > you may still not be able to do this and see UNKNOWN_MEMBER_ID or 
> > error ILLEGAL_GENERATION_ID in logs.Let me know if this approach 
> > works for
> you. i will also try this then.
> >
> > Kafka never sends same record to 2 consumers in same consumer group.
> > If another consumer got the same record then it means that first 
> > consumer doesn't has that particular partition assigned to it anymore.
> > Even if you want to commit from first consumer you will not be able 
> > to and commit will throw exception. Even after increasing 
> > sessionTimeOut a rebalance can still occur. I see that kafka sends a 
> > metadata refresh request after every 300000 ms (default) and even 
> > though nothing changed (no new consumer, broker or
> > partition) this refresh generally triggers a rebalance  (atleast in 
> > my
> > tests) .
> >
> > Calling commitSync renews session and keeps consumer alive. 
> > commitSync is a blocking operation so you may not want to call it on 
> > each record processing. You can try calling commitSync(Offset) just 
> > before starting to process a record only if lets say 75% of 
> > configured session time is elapsed. This will keep your consumer 
> > alive during rare longer processing time and will also not commit 
> > each record. But as i said earlier this will not guarantee that 
> > rebalance will not happen. A metadata refresh or other change may 
> > still trigger rebalance but if you take above approach then atleast 
> > a rebalance will not occur because of session time out during a longer processing time.
> >
> > if you maintain offsets outside of kafka and configure consumers to 
> > coordinate with each other through these external offsets then you 
> > can skip processing duplicate records even if kafka sends same records twice.
> > Through these external offsets you will have to device a way to skip 
> > a record if already processed by another consumer or wait if same 
> > record is in process by another consumer.
> >
> >
> > Regards,
> > Vinay Sharma
> >
> >
> >
> > On Thu, Apr 21, 2016 at 2:09 PM, Phil Luckhurst < 
> > phil.luckhurst@encycle.com>
> > wrote:
> >
> > > Thanks for all the responses. Unfortunately it seems that 
> > > currently there is no fool proof solution to this. It's not a 
> > > problem with the stored offsets as it will happen even if I do a 
> > > commitSync after each record is processed. It's the unprocessed 
> > > records in the batch that get processed twice.
> > >
> > > I'm now taking the approach of trying to limit the possibility of 
> > > a rebalance as much as possible by reducing the data returned by poll.
> > > I'm also using the pause, poll, resume pattern to ensure the 
> > > consumer doesn't cause a rebalance if the processing loop takes 
> > > longer than session.timeout.ms.
> > >
> > > Cheers,
> > > Phil
> > >
> > >
> > >
> > > On 21 Apr 2016 16:24, at 16:24, vinay sharma 
> > > <vi...@gmail.com>
> > > wrote:
> > > >Hi,
> > > >
> > > >By design Kafka does ensure not to send same record to multiple 
> > > >consumers in same consumer group. Issue is because of rebalance 
> > > >while a processing is going on and records are not yet commited. 
> > > >In my view there are only 2 possible solutions to it
> > > >1) As mentioned in documentation, store offsets outside of kafka 
> > > >(
> > > >
> > > https://kafka.apache.org/090/javadoc/index.html?org/apache/kafka/c
> > > li
> > > en
> > > ts/consumer/KafkaConsumer.html
> > > ).
> > > >This is a complete solution but will definitely add extra 
> > > >developement and also extra processing to each message. Problem 
> > > >may still exist if at the time of a crash consumer was out of 
> > > >sync from external custom offset storage and offsets stored in kafka both.
> > > >2)  As mentioned in fix for defect 919 (
> > > >https://issues.apache.org/jira/browse/KAFKA-919) set autocommit 
> > > >to true.
> > > >This will make kafka commit fetched records before rebalancing.
> > > >Only drawback is that some records may never be processed if 
> > > >consumer crashes while processing records which are already 
> > > >marked committed due to rebalance.
> > > >
> > > >Regards,
> > > >Vinay Sharma
> > >
> >
>

RE: Detecting rebalance while processing ConsumerRecords (0.9.0.1)

Posted by vinay sharma <vi...@gmail.com>.
Hi Phil,

commitSync sends a heartbeat request on each call, but it seems that it
somehow stops sending heartbeat requests after a metadata refresh until the
next poll. I asked about this on the dev list and learned that this is fixed
in 0.10.0.0, which is the next version. I have not gone into the details of
the defect, but it seems the fix relates to the time reset of the heartbeat
task, so that the next heartbeat request time is calculated correctly.
According to the defect, from the next version commitSync will act as a
heartbeat.

Regards,
Vinay Sharma

RE: Detecting rebalance while processing ConsumerRecords (0.9.0.1)

Posted by Phil Luckhurst <ph...@encycle.com>.
Hi Vinay,

"Regarding identifying a rebalance, how about comparing array used for consumer pause with current assignments of consumer?"

I did consider checking that in the commitSync exception handler but didn't try it. If this is the consumer that caused the rebalance (i.e. the one that appears to be dead), I didn't think its partition assignments would have been updated by the time the exception is handled, because the ConsumerRebalanceListener callbacks have not yet been called. I can give it a try though. That's why I thought having commitSync throw an explicit 'rebalance in progress' type exception, rather than just a KafkaException, would allow this to be easily identified and handled.
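
The comparison being discussed can be sketched roughly as follows. This is only an illustration under stated assumptions: partitions are modelled as plain strings such as "topic-1" so that it runs without the Kafka client on the classpath, and AssignmentCheck and lostPartitions are hypothetical names. Whether consumer.assignment() has already changed at the point where commitSync fails is exactly the open question above, so the sketch shows the comparison itself and nothing more.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

class AssignmentCheck {

    // Returns the partitions that were paused at the start of the batch
    // but are missing from the current assignment.
    // (Strings stand in for TopicPartition; this is an assumption of the sketch.)
    static Set<String> lostPartitions(String[] pausedAtBatchStart, Set<String> currentAssignment) {
        Set<String> lost = new HashSet<>(Arrays.asList(pausedAtBatchStart));
        lost.removeAll(currentAssignment);
        return lost;
    }

    public static void main(String[] args) {
        String[] paused = {"topic-0", "topic-1"};
        Set<String> current = new HashSet<>(Arrays.asList("topic-0"));
        // Prints the partitions this consumer no longer owns.
        System.out.println(lostPartitions(paused, current));
    }
}
```

A non-empty result would suggest that a rebalance has taken partitions away since the batch started; an empty result proves nothing if the assignment is only refreshed on the next poll().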

The information about the metadata request is useful; I'll watch out for that if we change our commit logic.

Thanks
Phil Luckhurst



Re: Detecting rebalance while processing ConsumerRecords (0.9.0.1)

Posted by vinay sharma <vi...@gmail.com>.
Hi Phil,

Regarding identifying a rebalance, how about comparing the array used for
consumer pause with the current assignments of the consumer?

Regarding the rebalance after a metadata refresh request, that will not
happen if you are committing after each record. I have a session timeout of
30000 ms, and if I commit the last processed records before the session times
out then everything is fine, except that after a metadata refresh request I
see a rebalance which causes "Error UNKNOWN_MEMBER_ID occurred while
committing offsets" on further commits from the consumer until the next poll.
This error means that even committing at regular intervals (which sends a
heartbeat) somehow does not save the consumer from timing out during a
metadata refresh. This issue does not happen if I am committing after each
record, that is every 2-4 seconds, or if a commit happens right after the
metadata refresh response.
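
For illustration, the "commit before the session times out" idea can be sketched as a small timer that says when a commit is due. This is a minimal sketch, not code from this thread: HeartbeatCommitTimer and its methods are hypothetical names, and the 0.75 fraction is taken from the 75% suggestion in the 22 April message quoted below. As described above, interval commits like this did not prevent the UNKNOWN_MEMBER_ID case after a metadata refresh on 0.9.0.1.

```java
class HeartbeatCommitTimer {
    private final long sessionTimeoutMs;
    private final double fraction;
    private long lastCommitMs;

    HeartbeatCommitTimer(long sessionTimeoutMs, double fraction, long nowMs) {
        this.sessionTimeoutMs = sessionTimeoutMs;
        this.fraction = fraction;
        this.lastCommitMs = nowMs;
    }

    // True once at least fraction * sessionTimeoutMs has elapsed since the last commit.
    boolean shouldCommit(long nowMs) {
        return nowMs - lastCommitMs >= (long) (sessionTimeoutMs * fraction);
    }

    // Call after a successful commit to restart the interval.
    void markCommitted(long nowMs) {
        lastCommitMs = nowMs;
    }

    public static void main(String[] args) {
        HeartbeatCommitTimer timer = new HeartbeatCommitTimer(30000L, 0.75, 0L);
        System.out.println(timer.shouldCommit(10000L)); // well inside the session timeout
        System.out.println(timer.shouldCommit(25000L)); // past 75% of the session timeout
    }
}
```

In a processing loop, shouldCommit(System.currentTimeMillis()) would be checked just before each record, with commitSync and markCommitted called only when it returns true.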

Regards,
Vinay Sharma


On Mon, Apr 25, 2016 at 11:27 AM, Phil Luckhurst <phil.luckhurst@encycle.com
> wrote:

> Hi Vinay,
>
> I'm currently calling commitSync(Map<TopicPartition, OffsetAndMetadata>
> offsets) after each message to just write the partition offset for that
> specific message.  Our messages can take several seconds to process and
> this only seems to be adding 1 or 2 milliseconds to the time so is not
> looking like a significant overhead and is acting as our heartbeat.
>
> WRT " I see that kafka sends a metadata refresh request after every 300000
> ms (default) and even though nothing changed (no new consumer, broker or
> partition) this refresh generally triggers a rebalance  (at least in my
> tests) ."
>
> I'm not seeing this. We've got a ConsumerRebalanceListener implemented on
> our consumers and I don't see this get called even though I see lots
> of metadata requests being sent. We can also have quiet periods where we
> often exceed the 300000 ms refresh default and those metadata requests
> don't trigger a rebalance either.
>
> I'm calling consumer.pause(consumer.assignment().toArray(new
> TopicPartition[0])) at the start of each batch and
> consumer.resume(consumer.assignment().toArray(new TopicPartition[0])) at the
> end. This allows us to call poll(0) in the message loop if we need to block
> on a message for more than session.timeout.ms ( this can happen if an
> external system is temporarily unavailable). Again this seems to work ok
> and does not trigger a rebalance.
>
> The only issue we've found is as mentioned before where a rebalance occurs
> while we are processing a batch of messages. When that happens the
> commitSync fails with a KafkaException and the message states this is due
> to a rebalance. We'd like to skip the rest of the batch when this happens
> but to do that we'd need to know for sure that it was because of a
> rebalance but KafkaException could be thrown for other reasons. A
> KafkaRebalanceException or even a method we could call on the consumer
> would allow us to safely abort the current processing loop knowing that the
> remaining messages would be picked up by another consumer after the
> rebalance - that would stop us processing duplicates.
>
> Thanks
> Phil Luckhurst
>
>
> -----Original Message-----
> From: vinay sharma [mailto:vinsharma.tech@gmail.com]
> Sent: 22 April 2016 14:24
> To: users@kafka.apache.org
> Subject: Re: Detecting rebalance while processing ConsumerRecords (0.9.0.1)
>
> Hi Phil,
>
> Regarding pause and resume, I have not tried this approach, but I think it
> may not be feasible. If your consumer no longer has the partition assigned
> from which the record being processed was fetched, or even if the partition
> is somehow assigned to the consumer again, you may still not be able to do
> this and may see UNKNOWN_MEMBER_ID or ILLEGAL_GENERATION_ID errors in the
> logs. Let me know if this approach works for you; I will also try it then.
>
> Kafka never sends the same record to 2 consumers in the same consumer
> group. If another consumer got the same record, it means that the first
> consumer doesn't have that particular partition assigned to it anymore.
> Even if you want to commit from the first consumer, you will not be able to,
> and the commit will throw an exception. Even after increasing sessionTimeOut
> a rebalance can still occur. I see that kafka sends a metadata refresh
> request after every 300000 ms (default) and even though nothing changed (no
> new consumer, broker or partition) this refresh generally triggers a
> rebalance (at least in my tests).
>
> Calling commitSync renews the session and keeps the consumer alive.
> commitSync is a blocking operation, so you may not want to call it for each
> record processed. You can try calling commitSync(Offset) just before
> starting to process a record, but only if, let's say, 75% of the configured
> session timeout has elapsed. This will keep your consumer alive during
> rare longer processing times and will also avoid committing each record. But
> as I said earlier, this will not guarantee that a rebalance will not happen.
> A metadata refresh or other change may still trigger a rebalance, but if you
> take the above approach then at least a rebalance will not occur because of
> a session timeout during a longer processing time.
>
> If you maintain offsets outside of Kafka and configure consumers to
> coordinate with each other through these external offsets, then you can skip
> processing duplicate records even if Kafka sends the same records twice.
> Through these external offsets you will have to devise a way to skip a
> record if it was already processed by another consumer, or to wait if the
> same record is being processed by another consumer.
>
>
> Regards,
> Vinay Sharma
>
>
>
> On Thu, Apr 21, 2016 at 2:09 PM, Phil Luckhurst <
> phil.luckhurst@encycle.com>
> wrote:
>
> > Thanks for all the responses. Unfortunately it seems that currently
> > there is no foolproof solution to this. It's not a problem with the
> > stored offsets as it will happen even if I do a commitSync after each
> > record is processed. It's the unprocessed records in the batch that
> > get processed twice.
> >
> > I'm now taking the approach of trying to limit the possibility of a
> > rebalance as much as possible by reducing the data returned by poll.
> > I'm also using the pause, poll, resume pattern to ensure the consumer
> > doesn't cause a rebalance if the processing loop takes longer than
> > session.timeout.ms.
> >
> > Cheers,
> > Phil
> >
> >
> >
> > On 21 Apr 2016 16:24, at 16:24, vinay sharma
> > <vi...@gmail.com>
> > wrote:
> > >Hi,
> > >
> > >By design, Kafka ensures that the same record is not sent to multiple
> > >consumers in the same consumer group. The issue arises because of a
> > >rebalance while processing is going on and records are not yet
> > >committed. In my view there are only 2 possible solutions to it:
> > >1) As mentioned in the documentation, store offsets outside of Kafka (
> > >https://kafka.apache.org/090/javadoc/index.html?org/apache/kafka/clients/consumer/KafkaConsumer.html
> > >).
> > >This is a complete solution but will definitely add extra
> > >development and also extra processing to each message. The problem may
> > >still exist if, at the time of a crash, the consumer was out of sync with
> > >both the external custom offset storage and the offsets stored in Kafka.
> > >2) As mentioned in the fix for defect 919 (
> > >https://issues.apache.org/jira/browse/KAFKA-919), set autocommit to
> > >true.
> > >This will make Kafka commit fetched records before rebalancing. The only
> > >drawback is that some records may never be processed if the consumer
> > >crashes while processing records which are already marked committed
> > >due to the rebalance.
> > >
> > >Regards,
> > >Vinay Sharma
> >
>

RE: Detecting rebalance while processing ConsumerRecords (0.9.0.1)

Posted by Phil Luckhurst <ph...@encycle.com>.
Hi Vinay,

I'm currently calling commitSync(Map<TopicPartition, OffsetAndMetadata> offsets) after each message to write just the partition offset for that specific message. Our messages can take several seconds to process and this only seems to add 1 or 2 milliseconds to the time, so it does not look like a significant overhead, and it also acts as our heartbeat.

WRT "I see that kafka sends a metadata refresh request after every 300000 ms (default) and even though nothing changed (no new consumer, broker or partition) this refresh generally triggers a rebalance (at least in my tests)."

I'm not seeing this. We've got a ConsumerRebalanceListener implemented on our consumers and I don't see it get called even though I see lots of metadata requests being sent. We can also have quiet periods where we often exceed the 300000 ms refresh default, and those metadata requests don't trigger a rebalance either.

I'm calling consumer.pause(consumer.assignment().toArray(new TopicPartition[0])) at the start of each batch and consumer.resume(consumer.assignment().toArray(new TopicPartition[0])) at the end. This allows us to call poll(0) in the message loop if we need to block on a message for more than session.timeout.ms (this can happen if an external system is temporarily unavailable). Again this seems to work OK and does not trigger a rebalance.
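
A minimal sketch of the pause, poll(0), resume loop described above, assuming a flag that a ConsumerRebalanceListener would set from onPartitionsRevoked(). ConsumerLike and PausedBatchLoop are hypothetical names, introduced only so the sketch compiles without the Kafka client on the classpath; in real code the three methods would map onto consumer.pause(...), consumer.poll(0) and consumer.resume(...).

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;

// Hypothetical stand-in for the three KafkaConsumer calls used in the pattern.
interface ConsumerLike {
    void pauseAll();      // stands for consumer.pause(assignment)
    void heartbeatPoll(); // stands for consumer.poll(0) while paused; callbacks may fire here
    void resumeAll();     // stands for consumer.resume(assignment)
}

final class PausedBatchLoop {
    // Assumed to be set to true by the listener's onPartitionsRevoked() callback.
    final AtomicBoolean revoked = new AtomicBoolean(false);

    // Handles records until the batch is done or a revocation is observed.
    // Returns the number of records actually handled.
    <R> int processBatch(ConsumerLike consumer, List<R> batch, Consumer<R> handler) {
        int handled = 0;
        revoked.set(false);
        consumer.pauseAll();
        try {
            for (R record : batch) {
                consumer.heartbeatPoll();
                if (revoked.get()) {
                    break; // partitions were revoked; leave the rest to the new owner
                }
                handler.accept(record);
                handled++;
            }
        } finally {
            consumer.resumeAll(); // always resume, even if the handler throws
        }
        return handled;
    }
}
```

The sketch ignores anything the poll(0) call itself might return after a reassignment, so a real loop would need to deal with that case as well.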

The only issue we've found is the one mentioned before, where a rebalance occurs while we are processing a batch of messages. When that happens the commitSync fails with a KafkaException and the message states this is due to a rebalance. We'd like to skip the rest of the batch when this happens, but to do that we'd need to know for sure that it was because of a rebalance, and a KafkaException could be thrown for other reasons. A KafkaRebalanceException, or even a method we could call on the consumer, would allow us to safely abort the current processing loop knowing that the remaining messages would be picked up by another consumer after the rebalance. That would stop us processing duplicates.
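
The abort-on-rebalance idea can be sketched as follows, under assumptions: a commit failure of one specific type is treated as the signal to drop the rest of the batch, and every other failure propagates. RebalanceSignal is a hypothetical stand-in for a rebalance-specific exception (I believe the 0.9 javadoc for commitSync lists a CommitFailedException, which would be worth checking for your client version), and Committer is a hypothetical wrapper around commitSync(offsets).

```java
import java.util.List;
import java.util.function.Consumer;
import java.util.function.ToLongFunction;

// Hypothetical stand-in for a rebalance-specific commit failure.
final class RebalanceSignal extends RuntimeException {
    RebalanceSignal(String message) { super(message); }
}

// Hypothetical wrapper around a synchronous commit of a single offset.
interface Committer {
    void commit(long nextOffset); // may throw RebalanceSignal or any other RuntimeException
}

final class AbortOnRebalance {
    // Handles and commits records one by one; returns how many were committed.
    static <R> int run(List<R> batch, Consumer<R> handler,
                       ToLongFunction<R> offsetOf, Committer committer) {
        int committed = 0;
        for (R record : batch) {
            handler.accept(record);
            try {
                // Commit the offset of the next record to read: current offset + 1.
                committer.commit(offsetOf.applyAsLong(record) + 1);
            } catch (RebalanceSignal e) {
                // The partition has moved to another consumer: skip the rest of the batch.
                break;
            }
            committed++;
        }
        return committed;
    }
}
```

Note that the record whose commit failed has already been handled once, so the consumer that takes over the partition will still see it again.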

Thanks
Phil Luckhurst


-----Original Message-----
From: vinay sharma [mailto:vinsharma.tech@gmail.com] 
Sent: 22 April 2016 14:24
To: users@kafka.apache.org
Subject: Re: Detecting rebalance while processing ConsumerRecords (0.9.0.1)

Hi Phil,

Regarding pause and resume, I have not tried this approach, but I think it may not be feasible. If your consumer no longer has the partition assigned from which the record being processed was fetched, or even if the partition is somehow assigned to the consumer again, you may still not be able to do this and may see UNKNOWN_MEMBER_ID or ILLEGAL_GENERATION_ID errors in the logs. Let me know if this approach works for you. I will also try it then.

Kafka never sends the same record to 2 consumers in the same consumer group. If another consumer got the same record, it means that the first consumer doesn't have that particular partition assigned to it anymore. Even if you want to commit from the first consumer you will not be able to, and the commit will throw an exception. Even after increasing sessionTimeOut a rebalance can still occur. I see that kafka sends a metadata refresh request every 300000 ms (default), and even though nothing changed (no new consumer, broker or partition) this refresh generally triggers a rebalance (at least in my tests).

Calling commitSync renews the session and keeps the consumer alive. commitSync is a blocking operation, so you may not want to call it for every record processed. You can try calling commitSync(Offset) just before starting to process a record, but only if, let's say, 75% of the configured session timeout has elapsed. This will keep your consumer alive during the rare longer processing times and will also avoid committing on every record. But as I said earlier, this will not guarantee that a rebalance will not happen. A metadata refresh or other change may still trigger a rebalance, but if you take the above approach then at least a rebalance will not occur because of a session timeout during a longer processing time.

If you maintain offsets outside of kafka and configure consumers to coordinate with each other through these external offsets, then you can skip processing duplicate records even if kafka sends the same records twice.
Through these external offsets you will have to devise a way to skip a record if it was already processed by another consumer, or wait if the same record is being processed by another consumer.


Regards,
Vinay Sharma



On Thu, Apr 21, 2016 at 2:09 PM, Phil Luckhurst <ph...@encycle.com>
wrote:

> Thanks for all the responses. Unfortunately it seems that currently 
> there is no fool proof solution to this. It's not a problem with the 
> stored offsets as it will happen even if I do a commitSync after each 
> record is processed. It's the unprocessed records in the batch that 
> get processed twice.
>
> I'm now taking the approach of trying to limit the possibility of a 
> rebalance as much as possible by reducing the data returned by poll. 
> I'm also using the pause, poll, resume pattern to ensure the consumer 
> doesn't cause a rebalance if the processing loop takes longer than 
> session.timeout.ms.
>
> Cheers,
> Phil
>
>
>
> On 21 Apr 2016 16:24, at 16:24, vinay sharma 
> <vi...@gmail.com>
> wrote:
> >Hi,
> >
> >By design Kafka does ensure not to send same record to multiple 
> >consumers in same consumer group. Issue is because of rebalance while 
> >a processing is going on and records are not yet commited. In my view 
> >there are only 2 possible solutions to it
> >1) As mentioned in documentation, store offsets outside of kafka (
> >
> https://kafka.apache.org/090/javadoc/index.html?org/apache/kafka/clien
> ts/consumer/KafkaConsumer.html
> ).
> >This is a complete solution but will definitely add extra 
> >developement and also extra processing to each message. Problem may 
> >still exist if at the time of a crash consumer was out of sync from 
> >external custom offset storage and offsets stored in kafka both.
> >2)  As mentioned in fix for defect 919 (
> >https://issues.apache.org/jira/browse/KAFKA-919) set autocommit to 
> >true.
> >This will make kafka commit fetched records before rebalancing. Only 
> >drawback is that some records may never be processed if consumer 
> >crashes while processing records which are already marked committed 
> >due to rebalance.
> >
> >Regards,
> >Vinay Sharma
>

Re: Detecting rebalance while processing ConsumerRecords (0.9.0.1)

Posted by vinay sharma <vi...@gmail.com>.
Hi Phil,

Regarding pause and resume, I have not tried this approach, but I think it
may not be feasible. If your consumer no longer has the partition assigned
from which the record being processed was fetched, or even if the partition
is somehow assigned to the consumer again, you may still not be able to do
this and may see UNKNOWN_MEMBER_ID or ILLEGAL_GENERATION_ID errors in the
logs. Let me know if this approach works for you. I will also try it then.

Kafka never sends the same record to 2 consumers in the same consumer group.
If another consumer got the same record, it means that the first consumer
doesn't have that particular partition assigned to it anymore. Even if you
want to commit from the first consumer you will not be able to, and the
commit will throw an exception. Even after increasing sessionTimeOut a
rebalance can still occur. I see that kafka sends a metadata refresh request
every 300000 ms (default), and even though nothing changed (no new consumer,
broker or partition) this refresh generally triggers a rebalance (at least
in my tests).

Calling commitSync renews the session and keeps the consumer alive.
commitSync is a blocking operation, so you may not want to call it for every
record processed. You can try calling commitSync(Offset) just before
starting to process a record, but only if, let's say, 75% of the configured
session timeout has elapsed. This will keep your consumer alive during the
rare longer processing times and will also avoid committing on every record.
But as I said earlier, this will not guarantee that a rebalance will not
happen. A metadata refresh or other change may still trigger a rebalance,
but if you take the above approach then at least a rebalance will not occur
because of a session timeout during a longer processing time.
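
A small sketch of the "commit only once 75% of the session timeout has elapsed" check suggested above. KeepAliveGate is a hypothetical helper, not part of the Kafka client, and the clock is passed in as plain milliseconds so the logic can be exercised without a consumer.

```java
// Decides when a keep-alive commit is due, based on time elapsed since the last one.
final class KeepAliveGate {
    private final long thresholdMs;
    private long lastCommitMs;

    // fraction is the share of the session timeout after which a commit is due, e.g. 0.75.
    KeepAliveGate(long sessionTimeoutMs, double fraction, long nowMs) {
        this.thresholdMs = (long) (sessionTimeoutMs * fraction);
        this.lastCommitMs = nowMs;
    }

    // True when at least the threshold has elapsed since the last recorded commit.
    boolean commitDue(long nowMs) {
        return nowMs - lastCommitMs >= thresholdMs;
    }

    // Call after a successful commit to restart the clock.
    void markCommitted(long nowMs) {
        lastCommitMs = nowMs;
    }
}
```

In a processing loop this would be checked before each record, for example with gate.commitDue(System.currentTimeMillis()), followed by the commit and gate.markCommitted(...) when it returns true.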

If you maintain offsets outside of kafka and configure consumers to
coordinate with each other through these external offsets, then you can skip
processing duplicate records even if kafka sends the same records twice.
Through these external offsets you will have to devise a way to skip a
record if it was already processed by another consumer, or wait if the same
record is being processed by another consumer.
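
One possible shape for that coordination, sketched with an in-memory map standing in for the external store. RecordRegistry and its method names are hypothetical; a real deployment would need a store shared by all consumers that offers an atomic put-if-absent.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// In-memory stand-in for an external store shared by all consumers in the group.
final class RecordRegistry {
    enum State { IN_PROGRESS, DONE }
    enum Claim { CLAIMED, BUSY_ELSEWHERE, ALREADY_DONE }

    private final ConcurrentMap<String, State> states = new ConcurrentHashMap<>();

    private static String key(String topic, int partition, long offset) {
        return topic + "-" + partition + "@" + offset;
    }

    // Atomically claims a record for processing unless another consumer got there first.
    Claim tryClaim(String topic, int partition, long offset) {
        State previous = states.putIfAbsent(key(topic, partition, offset), State.IN_PROGRESS);
        if (previous == null) {
            return Claim.CLAIMED;
        }
        return previous == State.DONE ? Claim.ALREADY_DONE : Claim.BUSY_ELSEWHERE;
    }

    // Marks a claimed record as fully processed.
    void markDone(String topic, int partition, long offset) {
        states.put(key(topic, partition, offset), State.DONE);
    }
}
```

A consumer would skip on ALREADY_DONE and wait or retry on BUSY_ELSEWHERE. The sketch has no expiry, so a consumer that crashes after claiming would leave the record stuck in IN_PROGRESS.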


Regards,
Vinay Sharma



On Thu, Apr 21, 2016 at 2:09 PM, Phil Luckhurst <ph...@encycle.com>
wrote:

> Thanks for all the responses. Unfortunately it seems that currently there
> is no fool proof solution to this. It's not a problem with the stored
> offsets as it will happen even if I do a commitSync after each record is
> processed. It's the unprocessed records in the batch that get processed
> twice.
>
> I'm now taking the approach of trying to limit the possibility of a
> rebalance as much as possible by reducing the data returned by poll. I'm
> also using the pause, poll, resume pattern to ensure the consumer doesn't
> cause a rebalance if the processing loop takes longer than
> session.timeout.ms.
>
> Cheers,
> Phil
>
>
>
> On 21 Apr 2016 16:24, at 16:24, vinay sharma <vi...@gmail.com>
> wrote:
> >Hi,
> >
> >By design Kafka does ensure not to send same record to multiple
> >consumers
> >in same consumer group. Issue is because of rebalance while a
> >processing is
> >going on and records are not yet commited. In my view there are only 2
> >possible solutions to it
> >1) As mentioned in documentation, store offsets outside of kafka (
> >
> https://kafka.apache.org/090/javadoc/index.html?org/apache/kafka/clients/consumer/KafkaConsumer.html
> ).
> >This is a complete solution but will definitely add extra developement
> >and
> >also extra processing to each message. Problem may still exist if at
> >the
> >time of a crash consumer was out of sync from external custom offset
> >storage and offsets stored in kafka both.
> >2)  As mentioned in fix for defect 919 (
> >https://issues.apache.org/jira/browse/KAFKA-919) set autocommit to
> >true.
> >This will make kafka commit fetched records before rebalancing. Only
> >drawback is that some records may never be processed if consumer
> >crashes
> >while processing records which are already marked committed due to
> >rebalance.
> >
> >Regards,
> >Vinay Sharma
>

Re: Detecting rebalance while processing ConsumerRecords (0.9.0.1)

Posted by Phil Luckhurst <ph...@encycle.com>.
Thanks for all the responses. Unfortunately it seems that currently there is no foolproof solution to this. It's not a problem with the stored offsets, as it will happen even if I do a commitSync after each record is processed. It's the unprocessed records in the batch that get processed twice.

I'm now taking the approach of trying to limit the possibility of a rebalance as much as possible by reducing the data returned by poll. I'm also using the pause, poll, resume pattern to ensure the consumer doesn't cause a rebalance if the processing loop takes longer than session.timeout.ms.
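
For illustration, the settings involved could be assembled like this. The property names are the consumer configuration keys; the values are placeholders rather than recommendations, and ConsumerTuning is a hypothetical helper name.

```java
import java.util.Properties;

final class ConsumerTuning {
    // Builds consumer properties that keep each poll() small. Values are illustrative only.
    static Properties smallBatchProps(String bootstrapServers, String groupId) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServers);
        props.setProperty("group.id", groupId);
        // Offsets are committed by hand after each record, so auto-commit stays off.
        props.setProperty("enable.auto.commit", "false");
        // Upper bound on bytes fetched per partition; smaller means fewer records per poll().
        props.setProperty("max.partition.fetch.bytes", "16384");
        // How long the coordinator waits for a heartbeat before treating the consumer as dead.
        props.setProperty("session.timeout.ms", "30000");
        return props;
    }
}
```

One caveat: max.partition.fetch.bytes has to stay at least as large as the largest message on the topic, otherwise the consumer can get stuck on a message it cannot fetch.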

Cheers,
Phil



On 21 Apr 2016 16:24, at 16:24, vinay sharma <vi...@gmail.com> wrote:
>Hi,
>
>By design Kafka does ensure not to send same record to multiple
>consumers
>in same consumer group. Issue is because of rebalance while a
>processing is
>going on and records are not yet commited. In my view there are only 2
>possible solutions to it
>1) As mentioned in documentation, store offsets outside of kafka (
>https://kafka.apache.org/090/javadoc/index.html?org/apache/kafka/clients/consumer/KafkaConsumer.html).
>This is a complete solution but will definitely add extra developement
>and
>also extra processing to each message. Problem may still exist if at
>the
>time of a crash consumer was out of sync from external custom offset
>storage and offsets stored in kafka both.
>2)  As mentioned in fix for defect 919 (
>https://issues.apache.org/jira/browse/KAFKA-919) set autocommit to
>true.
>This will make kafka commit fetched records before rebalancing. Only
>drawback is that some records may never be processed if consumer
>crashes
>while processing records which are already marked committed due to
>rebalance.
>
>Regards,
>Vinay Sharma

Re: Detecting rebalance while processing ConsumerRecords (0.9.0.1)

Posted by vinay sharma <vi...@gmail.com>.
Hi,

By design Kafka ensures that the same record is not sent to multiple
consumers in the same consumer group. The issue arises when a rebalance
happens while processing is going on and the records are not yet committed.
In my view there are only 2 possible solutions to it:
1) As mentioned in the documentation, store offsets outside of kafka (
https://kafka.apache.org/090/javadoc/index.html?org/apache/kafka/clients/consumer/KafkaConsumer.html).
This is a complete solution but will definitely add extra development and
also extra processing to each message. The problem may still exist if, at
the time of a crash, the consumer was out of sync with both the external
custom offset storage and the offsets stored in kafka.
2) As mentioned in the fix for defect 919 (
https://issues.apache.org/jira/browse/KAFKA-919), set autocommit to true.
This will make kafka commit fetched records before rebalancing. The only
drawback is that some records may never be processed if the consumer crashes
while processing records which are already marked committed due to the
rebalance.
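
A rough sketch of option 1, assuming the result and the offset can be written to the external store in one atomic step. ResultAndOffsetStore is a hypothetical in-memory stand-in; with a database the apply step would be a single transaction.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// In-memory stand-in for a store that keeps results and offsets together.
final class ResultAndOffsetStore {
    private final Map<Integer, Long> nextOffsetByPartition = new HashMap<>();
    private final List<String> results = new ArrayList<>();

    // Applies the result and advances the offset in one step; returns false for replays.
    synchronized boolean apply(int partition, long offset, String result) {
        long next = nextOffsetByPartition.getOrDefault(partition, 0L);
        if (offset < next) {
            return false; // already applied by this or another consumer
        }
        results.add(result);
        nextOffsetByPartition.put(partition, offset + 1);
        return true;
    }

    // Offset a consumer should seek to after being assigned the partition.
    synchronized long nextOffset(int partition) {
        return nextOffsetByPartition.getOrDefault(partition, 0L);
    }

    synchronized int resultCount() {
        return results.size();
    }
}
```

A consumer that is assigned a partition would seek to nextOffset(partition) instead of relying on the offset committed in kafka, and a record replayed after a rebalance would be rejected by apply().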

Regards,
Vinay Sharma

Re: Detecting rebalance while processing ConsumerRecords (0.9.0.1)

Posted by Tom Crayford <tc...@heroku.com>.
Note that Kafka is not designed to prevent duplicate records anyway. For
example, if your app writes into an external system (for example a
database) once per consumer record, and you do synchronous offset commit
after every consumer record, you can still have duplicate messages. Here's
the case worked through:

Usually your consumer will be going through a loop of:

Iterate over message from poll()
  Write to external system
  Commit offset of message

If a crash of the consumer or a rebalance happens between the write to the
external system and the offset commit, you're always going to receive
duplicate writes to your external system, no matter what. The only real
solution is to make your writes to the external system tolerant of
duplicates (i.e. they have to be idempotent).
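
As a minimal illustration of an idempotent write, assuming the external system supports keyed upserts: deriving the key from the record's topic, partition and offset means a replayed record overwrites its own row. IdempotentSink is a hypothetical in-memory stand-in for that system.

```java
import java.util.HashMap;
import java.util.Map;

// In-memory stand-in for an external system that supports keyed upserts.
final class IdempotentSink {
    private final Map<String, String> rows = new HashMap<>();
    private int appliedWrites = 0;

    // Writes the value under a key derived from the record's coordinates.
    // Replaying the same record overwrites the same row instead of adding a new one.
    void write(String topic, int partition, long offset, String value) {
        String key = topic + "-" + partition + "@" + offset;
        rows.put(key, value);
        appliedWrites++;
    }

    int rowCount() {
        return rows.size();
    }

    int appliedWrites() {
        return appliedWrites;
    }
}
```

Writing the same record twice leaves a single row behind, which is the property that makes duplicates from a crash or rebalance harmless.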

On Thu, Apr 21, 2016 at 3:49 PM, vinay sharma <vi...@gmail.com>
wrote:

> regarding pause and resume approach, I think there will still be a chance
> that you end up processing duplicate records. Rebalance can still get
> triggered due to numerous reasons while you are processing records.
>
> On Thu, Apr 21, 2016 at 10:34 AM, vinay sharma <vi...@gmail.com>
> wrote:
>
> > I was also struggling with this problem. I have found one way to do it
> > without making consumers aware of each others processing or assignment
> > state. You can set autocommit to true. Irrespective of autocommit
> interval
> > setting autocommit true will make kafka commit all records already sent
> to
> > consumers.
> >
> > On Thu, Apr 21, 2016 at 5:18 AM, Phil Luckhurst <
> > phil.luckhurst@encycle.com> wrote:
> >
> >> This is an example of  the scenario I'm trying avoid where 2 consumers
> >> end up processing the same records from a partition at the same time.
> >>
> >>
> >> 1.       I have a topic with 2 partitions and two consumers A and B
> which
> >> have each been assigned a partition from the topic.
> >>
> >> 2.       Consumer A is processing a batch of 100 records from partition
> 1
> >> and it takes much longer than expected so after 20 records the consumer
> >> appears dead so a rebalance occurs.
> >>
> >> 3.       Consumer B now gets assigned both partitions and starts
> >> processing partition 1 from the last committed offset by consumer A.
> >>
> >> 4.       Consumer A which, unknown to it, was the cause of the rebalance
> >> is still processing the remaining records in its batch of 100 which are
> the
> >> same records that Consumer B is also now processing from the same
> partition.
> >>
> >> Is there a way that I can detect in the ConsumerRecords loop of Consumer
> >> A that it has been marked dead and should skip the remaining records in
> the
> >> batch and do the next poll() which will cause it to trigger another
> >> rebalance and rejoin the group? I have added a ConsumerRebalanceListener
> >> but as onPartitionsRevoked() only gets called when poll() is called I
> don't
> >> get notified until all the records from the current poll batch have been
> >> processed.
> >>
> >> This shows where I think I need the check.
> >>
> >> while (true) {
> >>                 ConsumerRecords<String, String> records =
> >> consumer.poll(100);
> >>                 records.forEach((ConsumerRecord<String, String> crs) ->
> {
> >>                                 //  I think I need a check here to jump
> >> out the loop and call poll() again?
> >>                                 If (consumer.isDead())
> >>                                                 continue;
> >>
> >>                 }
> >> }
> >>
> >> I've looked at the suggestions for using pause and resume so that poll()
> >> can be called during the record processing loop which might allow me to
> do
> >> it like this?
> >>
> >>
> >> 1.       Call pause at the start of the loop.
> >>
> >> 2.       Call poll(0) in the loop which will trigger the call to
> >> ConsumerRebalanceListener onPartitionsRevoked() and perform the
> rebalance.
> >>
> >> 3.       If ConsumerRebalanceListener onPartitionsRevoked() was called
> >> then I would call resume and break out of the record processing loop so
> the
> >> main poll() request is called again.
> >>
> >> 4.       Call resume at the end of the record processing loop.
> >>
> >> Is that a viable solution to the problem or is there a better way to do
> >> this?
> >>
> >> Thanks
> >> Phil Luckhurst
> >>
> >
> >
>

Re: Detecting rebalance while processing ConsumerRecords (0.9.0.1)

Posted by vinay sharma <vi...@gmail.com>.
Regarding the pause and resume approach, I think there will still be a chance
that you end up processing duplicate records. A rebalance can still get
triggered for numerous reasons while you are processing records.

On Thu, Apr 21, 2016 at 10:34 AM, vinay sharma <vi...@gmail.com>
wrote:

> I was also struggling with this problem. I have found one way to do it
> without making consumers aware of each others processing or assignment
> state. You can set autocommit to true. Irrespective of autocommit interval
> setting autocommit true will make kafka commit all records already sent to
> consumers.
>
> On Thu, Apr 21, 2016 at 5:18 AM, Phil Luckhurst <
> phil.luckhurst@encycle.com> wrote:
>
>> This is an example of  the scenario I'm trying avoid where 2 consumers
>> end up processing the same records from a partition at the same time.
>>
>>
>> 1.       I have a topic with 2 partitions and two consumers A and B which
>> have each been assigned a partition from the topic.
>>
>> 2.       Consumer A is processing a batch of 100 records from partition 1
>> and it takes much longer than expected so after 20 records the consumer
>> appears dead so a rebalance occurs.
>>
>> 3.       Consumer B now gets assigned both partitions and starts
>> processing partition 1 from the last committed offset by consumer A.
>>
>> 4.       Consumer A which, unknown to it, was the cause of the rebalance
>> is still processing the remaining records in its batch of 100 which are the
>> same records that Consumer B is also now processing from the same partition.
>>
>> Is there a way that I can detect in the ConsumerRecords loop of Consumer
>> A that it has been marked dead and should skip the remaining records in the
>> batch and do the next poll() which will cause it to trigger another
>> rebalance and rejoin the group? I have added a ConsumerRebalanceListener
>> but as onPartitionsRevoked() only gets called when poll() is called I don't
>> get notified until all the records from the current poll batch have been
>> processed.
>>
>> This shows where I think I need the check.
>>
>> while (true) {
>>                 ConsumerRecords<String, String> records =
>> consumer.poll(100);
>>                 records.forEach((ConsumerRecord<String, String> crs) -> {
>>                                 //  I think I need a check here to jump
>> out the loop and call poll() again?
>>                                 If (consumer.isDead())
>>                                                 continue;
>>
>>                 }
>> }
>>
>> I've looked at the suggestions for using pause and resume so that poll()
>> can be called during the record processing loop which might allow me to do
>> it like this?
>>
>>
>> 1.       Call pause at the start of the loop.
>>
>> 2.       Call poll(0) in the loop which will trigger the call to
>> ConsumerRebalanceListener onPartitionsRevoked() and perform the rebalance.
>>
>> 3.       If ConsumerRebalanceListener onPartitionsRevoked() was called
>> then I would call resume and break out of the record processing loop so the
>> main poll() request is called again.
>>
>> 4.       Call resume at the end of the record processing loop.
>>
>> Is that a viable solution to the problem or is there a better way to do
>> this?
>>
>> Thanks
>> Phil Luckhurst
>>
>
>

Re: Detecting rebalance while processing ConsumerRecords (0.9.0.1)

Posted by vinay sharma <vi...@gmail.com>.
I was also struggling with this problem. I have found one way to do it
without making consumers aware of each other's processing or assignment
state. You can set autocommit to true. Irrespective of the autocommit
interval, setting autocommit to true will make kafka commit all records
already sent to consumers.

On Thu, Apr 21, 2016 at 5:18 AM, Phil Luckhurst <ph...@encycle.com>
wrote:

> This is an example of  the scenario I'm trying avoid where 2 consumers end
> up processing the same records from a partition at the same time.
>
>
> 1.       I have a topic with 2 partitions and two consumers A and B which
> have each been assigned a partition from the topic.
>
> 2.       Consumer A is processing a batch of 100 records from partition 1
> and it takes much longer than expected so after 20 records the consumer
> appears dead so a rebalance occurs.
>
> 3.       Consumer B now gets assigned both partitions and starts
> processing partition 1 from the last committed offset by consumer A.
>
> 4.       Consumer A which, unknown to it, was the cause of the rebalance
> is still processing the remaining records in its batch of 100 which are the
> same records that Consumer B is also now processing from the same partition.
>
> Is there a way that I can detect in the ConsumerRecords loop of Consumer A
> that it has been marked dead and should skip the remaining records in the
> batch and do the next poll() which will cause it to trigger another
> rebalance and rejoin the group? I have added a ConsumerRebalanceListener
> but as onPartitionsRevoked() only gets called when poll() is called I don't
> get notified until all the records from the current poll batch have been
> processed.
>
> This shows where I think I need the check.
>
> while (true) {
>                 ConsumerRecords<String, String> records =
> consumer.poll(100);
>                 records.forEach((ConsumerRecord<String, String> crs) -> {
>                                 //  I think I need a check here to jump
> out the loop and call poll() again?
>                                 If (consumer.isDead())
>                                                 continue;
>
>                 }
> }
>
> I've looked at the suggestions for using pause and resume so that poll()
> can be called during the record processing loop which might allow me to do
> it like this?
>
>
> 1.       Call pause at the start of the loop.
>
> 2.       Call poll(0) in the loop which will trigger the call to
> ConsumerRebalanceListener onPartitionsRevoked() and perform the rebalance.
>
> 3.       If ConsumerRebalanceListener onPartitionsRevoked() was called
> then I would call resume and break out of the record processing loop so the
> main poll() request is called again.
>
> 4.       Call resume at the end of the record processing loop.
>
> Is that a viable solution to the problem or is there a better way to do
> this?
>
> Thanks
> Phil Luckhurst
>