Posted to users@kafka.apache.org by za...@email.cz on 2016/08/21 12:44:43 UTC
Consumer failure and producer topic deduplication
Hi,
I am using Kafka for a fairly specific use case that needs a lot of
control. I use the new KafkaConsumer to consume events from an input topic,
process the data, and publish to multiple different topics. The use case
requires fairly strict delivery and consistency guarantees: all messages
must be published to the resulting topics "exactly once". I am using an
ExactlyOnceDeliveryConsumerRebalanceListener to handle the case where a
message is published but the consumer dies before committing its offset.
That would normally give only an at-least-once delivery guarantee, and
therefore possibly duplicated messages, because after a rebalance a new
consumer in the group would resume from the same uncommitted offset and
publish the same records again.
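
To make the failure mode above concrete, here is a toy, in-memory simulation in plain Java. It uses no Kafka client: the "partition" is a list of strings, the group's committed offset is a single int, and "publishing" appends to an output list. The names AtLeastOnceReplay, runWorker and AtLeastOnceReplayDemo are hypothetical. The worker publishes each record and only then commits, so a crash between the two steps makes the next member replay that record.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model (hypothetical, not Kafka code) of publish-then-commit processing.
class AtLeastOnceReplay {
    final List<String> input;
    final List<String> output = new ArrayList<>();
    int committedOffset = 0; // next offset the group will read from

    AtLeastOnceReplay(List<String> input) {
        this.input = input;
    }

    // Processes records from the committed offset onward. If crashAfter is
    // non-negative, the worker "dies" right after publishing the record at
    // that offset and before committing it.
    void runWorker(int crashAfter) {
        for (int offset = committedOffset; offset < input.size(); offset++) {
            output.add(input.get(offset).toUpperCase()); // publish downstream
            if (offset == crashAfter) {
                return; // crash: the publish happened, the commit did not
            }
            committedOffset = offset + 1; // commit only after publishing
        }
    }
}

class AtLeastOnceReplayDemo {
    public static void main(String[] args) {
        AtLeastOnceReplay sim = new AtLeastOnceReplay(List.of("a", "b", "c"));
        sim.runWorker(1);  // first member crashes after publishing "B"
        sim.runWorker(-1); // after the rebalance, a new member resumes at offset 1
        System.out.println(sim.output); // prints [A, B, B, C]
    }
}
```

Committing before publishing would remove the duplicate but risk losing the record instead (at-most-once); closing that window is the job of the rebalance listener described in this thread.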
This listener performs corrective actions during the group rebalance after
a failure. I am using a SimpleConsumer (for granular control and the ability
to read/commit using the original consumer group) to find these issues,
publish the partially processed records as necessary and, most importantly,
commit the offset to avoid duplicates if required. However, the result of
'simpleConsumer.commitOffsets(new OffsetCommitRequest(group, topicPartition))'
is 'OffsetCommitResponse(Map([topic,0] -> 25),0)', i.e. error code 25,
UNKNOWN_MEMBER_ID.
Can anyone please explain why I might be getting this error and how to avoid
it in my scenario?
Thank you,
Martin
RE: Consumer failure and producer topic deduplication
Posted by Martin Gainty <mg...@hotmail.com>.
Here is a Scala test which produces UNKNOWN_MEMBER_ID:

    @Test
    def testJoinGroupUnknownConsumerNewGroup() {
      val joinGroupResult = joinGroup(groupId, memberId, DefaultSessionTimeout, protocolType, protocols)
      val joinGroupErrorCode = joinGroupResult.errorCode
      assertEquals(Errors.UNKNOWN_MEMBER_ID.code, joinGroupErrorCode)
    }

An attribute of the supplied group (a Scala trait is roughly like a Java interface) is either null or invalid. Log the groupId and memberId and verify them, e.g. (both are strings, so use %s rather than %d):

    log.info("Logging GroupId %s MemberId %s".format(groupId, memberId))
Martin
Re: Consumer failure and producer topic deduplication
Posted by za...@email.cz.
Thanks for your response. I now understand why the error occurs and was able to resolve the issue, so I will describe the solution for others who may run into the same problem. Simply put, I needed to commit the offset during the group rebalance.

I tried the SimpleConsumer (described in my original email) and also the new KafkaConsumer, which results in "org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured session.timeout.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing the session timeout or by reducing the maximum size of batches returned in poll() with max.poll.records."

That error makes sense, because I was trying to commit with an incorrect memberId. The cause was not that processing records between poll() calls took too long, as the message suggests, but that the original consumer had failed; the result was similar.

I eventually committed through the KafkaConsumer instance that owns the particular ConsumerRebalanceListener instance whose onPartitionsAssigned() was triggered. That consumer appears to be the member that received the partition assignment during the rebalance, so its commit is accepted and I can now ensure that the next poll from the group starts from the newly committed offset.
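
The resolution above can be illustrated with a toy model in plain Java. It uses no Kafka client: ToyCoordinator, ToyConsumer, AssignmentListener and RebalanceCommitDemo are hypothetical stand-ins, not Kafka classes. The coordinator accepts a commit only from a current member of the current generation, so a commit carrying an empty or dead member id fails with code 25 (UNKNOWN_MEMBER_ID), while a commit issued from the assignment callback of the member that now owns the partition succeeds. This is a simplification: the real coordinator's checks, their order, and the other error codes it can return vary by Kafka version.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Toy group coordinator (hypothetical, not broker code). Error codes reuse
// Kafka's numbering: 0 = NONE, 22 = ILLEGAL_GENERATION, 25 = UNKNOWN_MEMBER_ID.
class ToyCoordinator {
    static final short NONE = 0;
    static final short ILLEGAL_GENERATION = 22;
    static final short UNKNOWN_MEMBER_ID = 25;

    final Set<String> members = new HashSet<>();
    final Map<Integer, Long> committed = new HashMap<>(); // partition -> offset
    int generation = 0;

    // A rebalance replaces the membership and starts a new generation.
    void rebalance(Set<String> newMembers) {
        members.clear();
        members.addAll(newMembers);
        generation++;
    }

    // Accepts a commit only from a current member of the current generation.
    short commit(String memberId, int generationId, int partition, long offset) {
        if (!members.contains(memberId)) {
            return UNKNOWN_MEMBER_ID;
        }
        if (generationId != generation) {
            return ILLEGAL_GENERATION;
        }
        committed.put(partition, offset);
        return NONE;
    }
}

// Hypothetical stand-in for ConsumerRebalanceListener.onPartitionsAssigned().
interface AssignmentListener {
    void onPartitionsAssigned(ToyConsumer owner, int partition);
}

// Hypothetical stand-in for one consumer instance in the group.
class ToyConsumer {
    final String memberId;
    final ToyCoordinator coordinator;
    final AssignmentListener listener;
    int generationId = -1; // unknown until the consumer has joined

    ToyConsumer(String memberId, ToyCoordinator coordinator, AssignmentListener listener) {
        this.memberId = memberId;
        this.coordinator = coordinator;
        this.listener = listener;
    }

    // Called after a rebalance hands this member a partition.
    void onAssignment(int partition) {
        generationId = coordinator.generation;
        listener.onPartitionsAssigned(this, partition);
    }

    // Commits with this member's own id and generation.
    short commit(int partition, long offset) {
        return coordinator.commit(memberId, generationId, partition, offset);
    }
}

class RebalanceCommitDemo {
    public static void main(String[] args) {
        ToyCoordinator coordinator = new ToyCoordinator();
        coordinator.rebalance(Set.of("member-1", "member-2"));
        // member-1 dies; the next rebalance leaves member-2 owning partition 0.
        coordinator.rebalance(Set.of("member-2"));

        // A commit from outside the group (empty member id) is rejected.
        System.out.println(coordinator.commit("", -1, 0, 100L)); // prints 25
        // So is a commit that still uses the dead member's id.
        System.out.println(coordinator.commit("member-1", 1, 0, 100L)); // prints 25

        // Committing from the new owner's assignment callback succeeds.
        ToyConsumer survivor = new ToyConsumer("member-2", coordinator,
                (owner, partition) -> System.out.println(owner.commit(partition, 100L)));
        survivor.onAssignment(0); // prints 0
        System.out.println(coordinator.committed); // prints {0=100}
    }
}
```

In the real client, the analogous move is presumably to commit through the same KafkaConsumer instance (for example with commitSync(Map<TopicPartition, OffsetAndMetadata>)) from inside that consumer's onPartitionsAssigned() callback, which is what the fix above describes.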
Thanks,
Martin

> From: mgainty@hotmail.com
> To: users@kafka.apache.org
> Subject: RE: Consumer failure and producer topic deduplication
> Date: Sun, 21 Aug 2016 19:22:13 GMT