Posted to users@kafka.apache.org by za...@email.cz on 2016/08/21 12:44:43 UTC

Consumer failure and producer topic deduplication

Hi,

I am using Kafka for a fairly specific use case that needs a lot of
control. I use the new KafkaConsumer to consume events from an input topic,
process the data and publish to multiple different topics. The use case
requires fairly strict delivery and consistency guarantees: all messages
must be published to the resulting topics "exactly once". The case I need
to handle is the one where a message is published, but the consumer dies
before committing the offset. That would normally result in an
at-least-once delivery guarantee and therefore possibly duplicated
messages, because after a rebalance a new consumer in the group would
publish the message at the same uncommitted offset again.

Therefore I am using an ExactlyOnceDeliveryConsumerRebalanceListener to
perform correcting actions during the group rebalance after a failure. I am
using a SimpleConsumer (for granular control and the ability to read/commit
using the original consumer group) to find these issues, publish the
partially processed records as necessary and, most importantly, commit the
offset to avoid duplicates if required. However, the result of
'simpleConsumer.commitOffsets(new OffsetCommitRequest(group, topicPartition))'
is 'OffsetCommitResponse(Map([topic,0] -> 25),0)', i.e. an
UNKNOWN_MEMBER_ID error.

Can anyone please explain why I might be getting this error and how to
avoid it in my scenario?

Thank you,

Martin

RE: Consumer failure and producer topic deduplication

Posted by Martin Gainty <mg...@hotmail.com>.
Here is a Scala test which produces UNKNOWN_MEMBER_ID:

  @Test
  def testJoinGroupUnknownConsumerNewGroup() {
    val joinGroupResult = joinGroup(groupId, memberId, DefaultSessionTimeout, protocolType, protocols)
    val joinGroupErrorCode = joinGroupResult.errorCode
    assertEquals(Errors.UNKNOWN_MEMBER_ID.code, joinGroupErrorCode)
  }

An attribute of the supplied group trait (a Scala trait is roughly
comparable to a Java interface) is either null or invalid. Log the groupId
and memberId and verify them, e.g.:

  // groupId and memberId are strings, so format them with %s
  log.info("Logging GroupId %s MemberId %s".format(groupId, memberId))

Martin




Re: Consumer failure and producer topic deduplication

Posted by za...@email.cz.
Thanks for your response. I now understand why the error is occurring and
was able to resolve the issue. I will describe the solution for others who
may run into the same problem. Simply put, I need to commit the offset
during the group rebalance.

I tried SimpleConsumer (described in my original email) and also the new
KafkaConsumer, which results in
"org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be
completed since the group has already rebalanced and assigned the
partitions to another member. This means that the time between subsequent
calls to poll() was longer than the configured session.timeout.ms, which
typically implies that the poll loop is spending too much time message
processing. You can address this either by increasing the session timeout
or by reducing the maximum size of batches returned in poll() with
max.poll.records."

That error makes sense, because I was trying to commit with an incorrect
memberId. The cause was not that processing records between poll() calls
took too long, as the error message suggests, but that the consumer had
failed; the outcome was similar.

I eventually tried committing through the KafkaConsumer instance whose
ConsumerRebalanceListener received the onPartitionsAssigned() callback.
That instance seems to be the member that received the partition assignment
during the rebalance, so I am now able to ensure the next poll from the
group starts from the newly committed offset.

Thanks,

Martin
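
The behaviour described in this thread can be illustrated with a small toy
model. This is only a sketch under stated assumptions, not Kafka's
implementation: ToyGroupCoordinator, CommitResult and all method names are
hypothetical, and the model deliberately ignores generations, session
timeouts and the rest of the real group protocol. It only shows the idea
that a commit is accepted from an id that is currently a member of the
group and rejected from the id of a member that has already failed.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

/** Toy stand-in for a group coordinator (hypothetical, not Kafka code). */
final class ToyGroupCoordinator {
    // Hypothetical result codes; only the name UNKNOWN_MEMBER_ID mirrors the thread.
    enum CommitResult { OK, UNKNOWN_MEMBER_ID }

    private final Set<String> activeMembers = new HashSet<>();
    private final Map<Integer, Long> committedOffsets = new HashMap<>();
    private int nextId = 0;

    /** A member joins (for example during a rebalance) and receives a fresh id. */
    String join() {
        String memberId = "member-" + nextId++;
        activeMembers.add(memberId);
        return memberId;
    }

    /** A member fails; the coordinator forgets its id. */
    void expire(String memberId) {
        activeMembers.remove(memberId);
    }

    /** Commits are accepted only from ids that are currently in the group. */
    CommitResult commit(String memberId, int partition, long offset) {
        if (!activeMembers.contains(memberId)) {
            return CommitResult.UNKNOWN_MEMBER_ID;
        }
        committedOffsets.put(partition, offset);
        return CommitResult.OK;
    }

    /** Returns the committed offset for a partition, or -1 if there is none. */
    long committed(int partition) {
        return committedOffsets.getOrDefault(partition, -1L);
    }
}

final class ToyCoordinatorDemo {
    public static void main(String[] args) {
        ToyGroupCoordinator coordinator = new ToyGroupCoordinator();

        String failed = coordinator.join();
        coordinator.commit(failed, 0, 10L);       // normal commit while alive
        coordinator.expire(failed);               // the consumer dies
        String replacement = coordinator.join();  // rebalance brings in a new member

        // Committing with the id of the failed member is rejected.
        System.out.println(coordinator.commit(failed, 0, 11L));      // UNKNOWN_MEMBER_ID
        // Committing with the id of the current member is accepted.
        System.out.println(coordinator.commit(replacement, 0, 11L)); // OK
        System.out.println(coordinator.committed(0));                // 11
    }
}
```

In the terms of the thread, the "replacement" id plays the role of the
KafkaConsumer instance whose listener received onPartitionsAssigned(),
which is why committing through that instance succeeded.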
