You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by "Mikael (Jira)" <ji...@apache.org> on 2022/11/29 13:23:00 UTC

[jira] [Updated] (KAFKA-14419) Same message consumed again by the same stream task after partition is lost and reassigned

     [ https://issues.apache.org/jira/browse/KAFKA-14419?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Mikael updated KAFKA-14419:
---------------------------
    Description: 
Trigger scenario:

Four Kafka client application instances on separate EC2 instances with a total of 8 active and 8 standby stream tasks for the same stream topology, consuming from an input topic with 8 partitions. Sometimes a handful of messages are consumed twice by one of the stream tasks when stream tasks on another application instance join the consumer group after an application instance restart.

Additional information:

Messages are produced to the topic by another Kafka streams topology deployed on the same four application instances. I have verified that each message is only produced once by enabling debug logging in the topology flow right before producing each message to the topic.

Logs from stream thread with duplicate consumption:

 
{code:java}
2022-11-21 15:09:33,677 INFO [messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1] o.a.k.c.c.i.ConsumerCoordinator [AbstractCoordinator.java:1066] [Consumer clientId=messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1-consumer, groupId=messages.xms.mt.enqueue.sms] Request joining group due to: group is already rebalancing
2022-11-21 15:09:33,677 INFO [messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1] o.a.k.c.c.i.ConsumerCoordinator [AbstractCoordinator.java:566] [Consumer clientId=messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1-consumer, groupId=messages.xms.mt.enqueue.sms] (Re-)joining group

Input records consumed for the first time

2022-11-21 15:09:33,919 INFO [messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1] o.a.k.c.c.i.ConsumerCoordinator [AbstractCoordinator.java:627] [Consumer clientId=messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1-consumer, groupId=messages.xms.mt.enqueue.sms] Successfully joined group with generation Generation{generationId=8017, memberId='messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1-consumer-77a68a5a-fb15-4808-9d87-30f21eabea74', protocol='stream'}
2022-11-21 15:09:33,920 INFO [messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1] o.a.k.c.c.i.ConsumerCoordinator [AbstractCoordinator.java:826] [Consumer clientId=messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1-consumer, groupId=messages.xms.mt.enqueue.sms] SyncGroup failed: The group began another rebalance. Need to re-join the group. Sent generation was Generation{generationId=8017, memberId='messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1-consumer-77a68a5a-fb15-4808-9d87-30f21eabea74', protocol='stream'}
2022-11-21 15:09:33,922 INFO [messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1] o.a.k.c.c.i.ConsumerCoordinator [AbstractCoordinator.java:1019] [Consumer clientId=messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1-consumer, groupId=messages.xms.mt.enqueue.sms] Resetting generation due to: encountered REBALANCE_IN_PROGRESS from SYNC_GROUP response
2022-11-21 15:09:33,922 INFO [messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1] o.a.k.c.c.i.ConsumerCoordinator [AbstractCoordinator.java:1066] [Consumer clientId=messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1-consumer, groupId=messages.xms.mt.enqueue.sms] Request joining group due to: encountered REBALANCE_IN_PROGRESS from SYNC_GROUP response
2022-11-21 15:09:33,923 INFO [messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1] o.a.k.c.c.i.ConsumerCoordinator [ConsumerCoordinator.java:819] [Consumer clientId=messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1-consumer, groupId=messages.xms.mt.enqueue.sms] Giving away all assigned partitions as lost since generation/memberID has been reset,indicating that consumer is in old state or no longer part of the group
2022-11-21 15:09:33,923 INFO [messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1] o.a.k.c.c.i.ConsumerCoordinator [ConsumerCoordinator.java:354] [Consumer clientId=messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1-consumer, groupId=messages.xms.mt.enqueue.sms] Lost previously assigned partitions messages.xms.mt.batch.enqueue.sms-1
2022-11-21 15:09:33,923 INFO [messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1] o.a.k.s.p.i.StreamThread [StreamsRebalanceListener.java:104] stream-thread [messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1] at state RUNNING: partitions [messages.xms.mt.batch.enqueue.sms-1] lost due to missed rebalance.
        lost active tasks: [0_1]
        lost assigned standby tasks: []
2022-11-21 15:09:33,941 INFO [messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1] o.a.k.s.p.i.StreamTask [StreamTask.java:1220] stream-thread [messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1] task [0_1] Suspended RUNNING
2022-11-21 15:09:33,941 INFO [messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1] o.a.k.s.p.i.StreamTask [StreamTask.java:295] stream-thread [messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1] task [0_1] Suspended running
2022-11-21 15:09:33,941 INFO [messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1] o.a.k.c.c.KafkaConsumer [KafkaConsumer.java:1082] [Consumer clientId=messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1-restore-consumer, groupId=null] Unsubscribed all topics or patterns and assigned partitions
2022-11-21 15:09:33,942 INFO [messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1] o.a.k.s.p.i.RecordCollectorImpl [RecordCollectorImpl.java:333] stream-thread [messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1] task [0_1] Closing record collector dirty
2022-11-21 15:09:33,942 INFO [messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1] o.a.k.s.p.i.StreamTask [StreamTask.java:537] stream-thread [messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1] task [0_1] Closed dirty
2022-11-21 15:09:33,942 INFO [messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1] o.a.k.s.p.i.StreamThread [StreamsRebalanceListener.java:117] stream-thread [messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1] partitions lost took 19 ms.
2022-11-21 15:09:33,942 INFO [messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1] o.a.k.c.c.i.ConsumerCoordinator [AbstractCoordinator.java:1066] [Consumer clientId=messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1-consumer, groupId=messages.xms.mt.enqueue.sms] Request joining group due to: rebalance failed due to 'The group is rebalancing, so a rejoin is needed.' (RebalanceInProgressException)
2022-11-21 15:09:33,942 INFO [messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1] o.a.k.c.c.i.ConsumerCoordinator [AbstractCoordinator.java:566] [Consumer clientId=messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1-consumer, groupId=messages.xms.mt.enqueue.sms] (Re-)joining group
2022-11-21 15:09:35,391 INFO [messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1] o.a.k.c.c.i.ConsumerCoordinator [AbstractCoordinator.java:627] [Consumer clientId=messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1-consumer, groupId=messages.xms.mt.enqueue.sms] Successfully joined group with generation Generation{generationId=8018, memberId='messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1-consumer-77a68a5a-fb15-4808-9d87-30f21eabea74', protocol='stream'}
2022-11-21 15:09:35,395 INFO [messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1] o.a.k.c.c.i.ConsumerCoordinator [AbstractCoordinator.java:802] [Consumer clientId=messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1-consumer, groupId=messages.xms.mt.enqueue.sms] Successfully synced group in generation Generation{generationId=8018, memberId='messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1-consumer-77a68a5a-fb15-4808-9d87-30f21eabea74', protocol='stream'}
2022-11-21 15:09:35,396 INFO [messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1] o.a.k.c.c.i.ConsumerCoordinator [ConsumerCoordinator.java:428] [Consumer clientId=messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1-consumer, groupId=messages.xms.mt.enqueue.sms] Updating assignment with
        Assigned partitions:                       [messages.xms.mt.batch.enqueue.sms-1]
        Current owned partitions:                  []
        Added partitions (assigned - owned):       [messages.xms.mt.batch.enqueue.sms-1]
        Revoked partitions (owned - assigned):     []
2022-11-21 15:09:35,396 INFO [messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1] o.a.k.c.c.i.ConsumerCoordinator [ConsumerCoordinator.java:300] [Consumer clientId=messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1-consumer, groupId=messages.xms.mt.enqueue.sms] Notifying assignor about the new Assignment(partitions=[messages.xms.mt.batch.enqueue.sms-1], userDataSize=52)
2022-11-21 15:09:35,396 INFO [messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1] o.a.k.s.p.i.StreamsPartitionAssignor [StreamsPartitionAssignor.java:1361] stream-thread [messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1-consumer] No followup rebalance was requested, resetting the rebalance schedule.
2022-11-21 15:09:35,396 INFO [messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1] o.a.k.s.p.i.TaskManager [TaskManager.java:273] stream-thread [messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1] Handle new assignment with:
        New active tasks: [0_1]
        New standby tasks: []
        Existing active tasks: []
        Existing standby tasks: []
2022-11-21 15:09:35,396 INFO [messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1] o.a.k.c.c.i.ConsumerCoordinator [ConsumerCoordinator.java:312] [Consumer clientId=messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1-consumer, groupId=messages.xms.mt.enqueue.sms] Adding newly assigned partitions: messages.xms.mt.batch.enqueue.sms-1
2022-11-21 15:09:35,396 INFO [messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1] o.a.k.s.p.i.StreamThread [StreamThread.java:234] stream-thread [messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1] State transition from RUNNING to PARTITIONS_ASSIGNED
2022-11-21 15:09:35,398 INFO [messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1] o.a.k.c.c.i.ConsumerCoordinator [ConsumerCoordinator.java:968] [Consumer clientId=messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1-consumer, groupId=messages.xms.mt.enqueue.sms] Setting offset for partition messages.xms.mt.batch.enqueue.sms-1 to the committed offset FetchPosition{offset=26744389, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[b-1.us1tst-eventbus.ujvxjq.c5.kafka.us-east-1.amazonaws.com:9094 (id: 1 rack: use1-az6)], epoch=19}}
2022-11-21 15:09:35,444 INFO [messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1] o.a.k.s.p.i.StreamTask [StreamTask.java:235] stream-thread [messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1] task [0_1] Initialized
2022-11-21 15:09:35,445 INFO [messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1] o.a.k.s.p.i.StreamTask [StreamTask.java:260] stream-thread [messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1] task [0_1] Restored and ready to run
2022-11-21 15:09:35,445 INFO [messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1] o.a.k.s.p.i.StreamThread [StreamThread.java:866] stream-thread [messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1] Restoration took 49 ms for all tasks [0_1]
2022-11-21 15:09:35,445 INFO [messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1] o.a.k.s.p.i.StreamThread [StreamThread.java:234] stream-thread [messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1] State transition from PARTITIONS_ASSIGNED to RUNNING
22022-11-21 15:09:35,446 INFO [messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1] o.a.k.s.KafkaStreams [KafkaStreams.java:342] stream-client [messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b] State transition from REBALANCING to RUNNING
2022-11-21 15:09:35,446 INFO [messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1] o.a.k.c.c.KafkaConsumer [KafkaConsumer.java:2270] [Consumer clientId=messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1-consumer, groupId=messages.xms.mt.enqueue.sms] Requesting the log end offset for messages.xms.mt.batch.enqueue.sms-1 in order to compute lag

Same input records consumed for the second time{code}
Streams consumer configuration:
{noformat}
        allow.auto.create.topics = false
        auto.commit.interval.ms = 5000
        auto.offset.reset = earliest
        bootstrap.servers = [b-3.us1tst-eventbus.ujvxjq.c5.kafka.us-east-1.amazonaws.com:9094, b-1.us1tst-eventbus.ujvxjq.c5.kafka.us-east-1.amazonaws.com:9094, b-2.us1tst-eventbus.ujvxjq.c5.kafka.us-east-1.amazonaws.com:9094]
        check.crcs = true
        client.dns.lookup = use_all_dns_ips
        client.id = messages.xms.mms.mt-05bfc9d3-7f4b-48d4-9c8c-cf9d3e496fef-StreamThread-1-consumer
        client.rack = 
        connections.max.idle.ms = 540000
        default.api.timeout.ms = 60000
        enable.auto.commit = false
        exclude.internal.topics = true
        fetch.max.bytes = 52428800
        fetch.max.wait.ms = 500
        fetch.min.bytes = 1
        group.id = messages.xms.mms.mt
        group.instance.id = null
        heartbeat.interval.ms = 1500
        interceptor.classes = []
        internal.leave.group.on.close = true
        internal.throw.on.fetch.stable.offset.unsupported = false
        isolation.level = read_committed
        key.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
        max.partition.fetch.bytes = 1048576
        max.poll.interval.ms = 300000
        max.poll.records = 1000
        metadata.max.age.ms = 300000
        metric.reporters = []
        metrics.num.samples = 2
        metrics.recording.level = INFO
        metrics.sample.window.ms = 30000
        partition.assignment.strategy = [org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor]
        receive.buffer.bytes = 65536
        reconnect.backoff.max.ms = 1000
        reconnect.backoff.ms = 50
        request.timeout.ms = 30000
        retry.backoff.ms = 100
        sasl.client.callback.handler.class = null
        sasl.jaas.config = null
        sasl.kerberos.kinit.cmd = /usr/bin/kinit
        sasl.kerberos.min.time.before.relogin = 60000
        sasl.kerberos.service.name = null
        sasl.kerberos.ticket.renew.jitter = 0.05
        sasl.kerberos.ticket.renew.window.factor = 0.8
        sasl.login.callback.handler.class = null
        sasl.login.class = null
        sasl.login.connect.timeout.ms = null
        sasl.login.read.timeout.ms = null
        sasl.login.refresh.buffer.seconds = 300
        sasl.login.refresh.min.period.seconds = 60
        sasl.login.refresh.window.factor = 0.8
        sasl.login.refresh.window.jitter = 0.05
        sasl.login.retry.backoff.max.ms = 10000
        sasl.login.retry.backoff.ms = 100
        sasl.mechanism = GSSAPI
        sasl.oauthbearer.clock.skew.seconds = 30
        sasl.oauthbearer.expected.audience = null
        sasl.oauthbearer.expected.issuer = null
        sasl.oauthbearer.jwks.endpoint.refresh.ms = 3600000
        sasl.oauthbearer.jwks.endpoint.retry.backoff.max.ms = 10000
        sasl.oauthbearer.jwks.endpoint.retry.backoff.ms = 100
        sasl.oauthbearer.jwks.endpoint.url = null
        sasl.oauthbearer.scope.claim.name = scope
        sasl.oauthbearer.sub.claim.name = sub
        sasl.oauthbearer.token.endpoint.url = null
        security.protocol = SSL
        security.providers = null
        send.buffer.bytes = 131072
        session.timeout.ms = 6000
        socket.connection.setup.timeout.max.ms = 30000
        socket.connection.setup.timeout.ms = 10000
        ssl.cipher.suites = null
        ssl.enabled.protocols = [TLSv1.2, TLSv1.3]
        ssl.endpoint.identification.algorithm = https
        ssl.engine.factory.class = null
        ssl.key.password = null
        ssl.keymanager.algorithm = SunX509
        ssl.keystore.certificate.chain = null
        ssl.keystore.key = null
        ssl.keystore.location = /opt/apps/msl/xms-gateway/conf/xms.us1tst.jks
        ssl.keystore.password = [hidden]
        ssl.keystore.type = JKS
        ssl.protocol = TLSv1.3
        ssl.provider = null
        ssl.secure.random.implementation = null
        ssl.trustmanager.algorithm = PKIX
        ssl.truststore.certificates = null
        ssl.truststore.location = null
        ssl.truststore.password = null
        ssl.truststore.type = JKS
        value.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer{noformat}
 

The message about lost partition that is highlighted in red above only occurs when messages are consumed twice, which happens roughly two times out of ten in my application restart test scenario.

This issue no longer occurs when the patch suggested in KAFKA-14362 is applied.

  was:
Trigger scenario:

Four Kafka client application instances on separate EC2 instances with a total of 8 active and 8 standby stream tasks for the same stream topology, consuming from an input topic with 8 partitions. Sometimes a handful of messages are consumed twice by one of the stream tasks when stream tasks on another application instance join the consumer group after an application instance restart.

Additional information:

Messages are produced to the topic by another Kafka streams topology deployed on the same four application instances. I have verified that each message is only produced once by enabling debug logging in the topology flow right before producing each message to the topic.

Logs from stream thread with duplicate consumption:

 
{code:java}
2022-11-21 15:09:33,677 INFO [messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1] o.a.k.c.c.i.ConsumerCoordinator [AbstractCoordinator.java:1066] [Consumer clientId=messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1-consumer, groupId=messages.xms.mt.enqueue.sms] Request joining group due to: group is already rebalancing
2022-11-21 15:09:33,677 INFO [messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1] o.a.k.c.c.i.ConsumerCoordinator [AbstractCoordinator.java:566] [Consumer clientId=messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1-consumer, groupId=messages.xms.mt.enqueue.sms] (Re-)joining group

Input records consumed for the first time

2022-11-21 15:09:33,919 INFO [messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1] o.a.k.c.c.i.ConsumerCoordinator [AbstractCoordinator.java:627] [Consumer clientId=messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1-consumer, groupId=messages.xms.mt.enqueue.sms] Successfully joined group with generation Generation{generationId=8017, memberId='messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1-consumer-77a68a5a-fb15-4808-9d87-30f21eabea74', protocol='stream'}
2022-11-21 15:09:33,920 INFO [messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1] o.a.k.c.c.i.ConsumerCoordinator [AbstractCoordinator.java:826] [Consumer clientId=messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1-consumer, groupId=messages.xms.mt.enqueue.sms] SyncGroup failed: The group began another rebalance. Need to re-join the group. Sent generation was Generation{generationId=8017, memberId='messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1-consumer-77a68a5a-fb15-4808-9d87-30f21eabea74', protocol='stream'}
2022-11-21 15:09:33,922 INFO [messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1] o.a.k.c.c.i.ConsumerCoordinator [AbstractCoordinator.java:1019] [Consumer clientId=messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1-consumer, groupId=messages.xms.mt.enqueue.sms] Resetting generation due to: encountered REBALANCE_IN_PROGRESS from SYNC_GROUP response
2022-11-21 15:09:33,922 INFO [messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1] o.a.k.c.c.i.ConsumerCoordinator [AbstractCoordinator.java:1066] [Consumer clientId=messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1-consumer, groupId=messages.xms.mt.enqueue.sms] Request joining group due to: encountered REBALANCE_IN_PROGRESS from SYNC_GROUP response
2022-11-21 15:09:33,923 INFO [messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1] o.a.k.c.c.i.ConsumerCoordinator [ConsumerCoordinator.java:819] [Consumer clientId=messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1-consumer, groupId=messages.xms.mt.enqueue.sms] Giving away all assigned partitions as lost since generation/memberID has been reset,indicating that consumer is in old state or no longer part of the group
2022-11-21 15:09:33,923 INFO [messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1] o.a.k.c.c.i.ConsumerCoordinator [ConsumerCoordinator.java:354] [Consumer clientId=messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1-consumer, groupId=messages.xms.mt.enqueue.sms] Lost previously assigned partitions messages.xms.mt.batch.enqueue.sms-1
2022-11-21 15:09:33,923 INFO [messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1] o.a.k.s.p.i.StreamThread [StreamsRebalanceListener.java:104] stream-thread [messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1] at state RUNNING: partitions [messages.xms.mt.batch.enqueue.sms-1] lost due to missed rebalance.
        lost active tasks: [0_1]
        lost assigned standby tasks: []
2022-11-21 15:09:33,941 INFO [messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1] o.a.k.s.p.i.StreamTask [StreamTask.java:1220] stream-thread [messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1] task [0_1] Suspended RUNNING
2022-11-21 15:09:33,941 INFO [messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1] o.a.k.s.p.i.StreamTask [StreamTask.java:295] stream-thread [messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1] task [0_1] Suspended running
2022-11-21 15:09:33,941 INFO [messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1] o.a.k.c.c.KafkaConsumer [KafkaConsumer.java:1082] [Consumer clientId=messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1-restore-consumer, groupId=null] Unsubscribed all topics or patterns and assigned partitions
2022-11-21 15:09:33,942 INFO [messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1] o.a.k.s.p.i.RecordCollectorImpl [RecordCollectorImpl.java:333] stream-thread [messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1] task [0_1] Closing record collector dirty
2022-11-21 15:09:33,942 INFO [messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1] o.a.k.s.p.i.StreamTask [StreamTask.java:537] stream-thread [messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1] task [0_1] Closed dirty
2022-11-21 15:09:33,942 INFO [messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1] o.a.k.s.p.i.StreamThread [StreamsRebalanceListener.java:117] stream-thread [messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1] partitions lost took 19 ms.
2022-11-21 15:09:33,942 INFO [messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1] o.a.k.c.c.i.ConsumerCoordinator [AbstractCoordinator.java:1066] [Consumer clientId=messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1-consumer, groupId=messages.xms.mt.enqueue.sms] Request joining group due to: rebalance failed due to 'The group is rebalancing, so a rejoin is needed.' (RebalanceInProgressException)
2022-11-21 15:09:33,942 INFO [messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1] o.a.k.c.c.i.ConsumerCoordinator [AbstractCoordinator.java:566] [Consumer clientId=messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1-consumer, groupId=messages.xms.mt.enqueue.sms] (Re-)joining group
2022-11-21 15:09:35,391 INFO [messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1] o.a.k.c.c.i.ConsumerCoordinator [AbstractCoordinator.java:627] [Consumer clientId=messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1-consumer, groupId=messages.xms.mt.enqueue.sms] Successfully joined group with generation Generation{generationId=8018, memberId='messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1-consumer-77a68a5a-fb15-4808-9d87-30f21eabea74', protocol='stream'}
2022-11-21 15:09:35,395 INFO [messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1] o.a.k.c.c.i.ConsumerCoordinator [AbstractCoordinator.java:802] [Consumer clientId=messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1-consumer, groupId=messages.xms.mt.enqueue.sms] Successfully synced group in generation Generation{generationId=8018, memberId='messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1-consumer-77a68a5a-fb15-4808-9d87-30f21eabea74', protocol='stream'}
2022-11-21 15:09:35,396 INFO [messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1] o.a.k.c.c.i.ConsumerCoordinator [ConsumerCoordinator.java:428] [Consumer clientId=messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1-consumer, groupId=messages.xms.mt.enqueue.sms] Updating assignment with
        Assigned partitions:                       [messages.xms.mt.batch.enqueue.sms-1]
        Current owned partitions:                  []
        Added partitions (assigned - owned):       [messages.xms.mt.batch.enqueue.sms-1]
        Revoked partitions (owned - assigned):     []
2022-11-21 15:09:35,396 INFO [messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1] o.a.k.c.c.i.ConsumerCoordinator [ConsumerCoordinator.java:300] [Consumer clientId=messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1-consumer, groupId=messages.xms.mt.enqueue.sms] Notifying assignor about the new Assignment(partitions=[messages.xms.mt.batch.enqueue.sms-1], userDataSize=52)
2022-11-21 15:09:35,396 INFO [messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1] o.a.k.s.p.i.StreamsPartitionAssignor [StreamsPartitionAssignor.java:1361] stream-thread [messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1-consumer] No followup rebalance was requested, resetting the rebalance schedule.
2022-11-21 15:09:35,396 INFO [messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1] o.a.k.s.p.i.TaskManager [TaskManager.java:273] stream-thread [messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1] Handle new assignment with:
        New active tasks: [0_1]
        New standby tasks: []
        Existing active tasks: []
        Existing standby tasks: []
2022-11-21 15:09:35,396 INFO [messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1] o.a.k.c.c.i.ConsumerCoordinator [ConsumerCoordinator.java:312] [Consumer clientId=messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1-consumer, groupId=messages.xms.mt.enqueue.sms] Adding newly assigned partitions: messages.xms.mt.batch.enqueue.sms-1
2022-11-21 15:09:35,396 INFO [messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1] o.a.k.s.p.i.StreamThread [StreamThread.java:234] stream-thread [messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1] State transition from RUNNING to PARTITIONS_ASSIGNED
2022-11-21 15:09:35,398 INFO [messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1] o.a.k.c.c.i.ConsumerCoordinator [ConsumerCoordinator.java:968] [Consumer clientId=messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1-consumer, groupId=messages.xms.mt.enqueue.sms] Setting offset for partition messages.xms.mt.batch.enqueue.sms-1 to the committed offset FetchPosition{offset=26744389, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[b-1.us1tst-eventbus.ujvxjq.c5.kafka.us-east-1.amazonaws.com:9094 (id: 1 rack: use1-az6)], epoch=19}}
2022-11-21 15:09:35,444 INFO [messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1] o.a.k.s.p.i.StreamTask [StreamTask.java:235] stream-thread [messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1] task [0_1] Initialized
2022-11-21 15:09:35,445 INFO [messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1] o.a.k.s.p.i.StreamTask [StreamTask.java:260] stream-thread [messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1] task [0_1] Restored and ready to run
2022-11-21 15:09:35,445 INFO [messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1] o.a.k.s.p.i.StreamThread [StreamThread.java:866] stream-thread [messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1] Restoration took 49 ms for all tasks [0_1]
2022-11-21 15:09:35,445 INFO [messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1] o.a.k.s.p.i.StreamThread [StreamThread.java:234] stream-thread [messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1] State transition from PARTITIONS_ASSIGNED to RUNNING
22022-11-21 15:09:35,446 INFO [messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1] o.a.k.s.KafkaStreams [KafkaStreams.java:342] stream-client [messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b] State transition from REBALANCING to RUNNING
2022-11-21 15:09:35,446 INFO [messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1] o.a.k.c.c.KafkaConsumer [KafkaConsumer.java:2270] [Consumer clientId=messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1-consumer, groupId=messages.xms.mt.enqueue.sms] Requesting the log end offset for messages.xms.mt.batch.enqueue.sms-1 in order to compute lag

Same input records consumed for the second time{code}
 

The message about lost partition that is highlighted in red above only occurs when messages are consumed twice, which happens roughly two times out of ten in my application restart test scenario.

This issue no longer occurs when the patch suggested in KAFKA-14362 is applied.


> Same message consumed again by the same stream task after partition is lost and reassigned
> ------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-14419
>                 URL: https://issues.apache.org/jira/browse/KAFKA-14419
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 3.3.1
>         Environment: AWS EC2 CentOS Linux 3.10.0-1160.76.1.el7.x86_64
>            Reporter: Mikael
>            Priority: Major
>
> Trigger scenario:
> Four Kafka client application instances on separate EC2 instances with a total of 8 active and 8 standby stream tasks for the same stream topology, consuming from an input topic with 8 partitions. Sometimes a handful of messages are consumed twice by one of the stream tasks when stream tasks on another application instance join the consumer group after an application instance restart.
> Additional information:
> Messages are produced to the topic by another Kafka streams topology deployed on the same four application instances. I have verified that each message is only produced once by enabling debug logging in the topology flow right before producing each message to the topic.
> Logs from stream thread with duplicate consumption:
>  
> {code:java}
> 2022-11-21 15:09:33,677 INFO [messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1] o.a.k.c.c.i.ConsumerCoordinator [AbstractCoordinator.java:1066] [Consumer clientId=messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1-consumer, groupId=messages.xms.mt.enqueue.sms] Request joining group due to: group is already rebalancing
> 2022-11-21 15:09:33,677 INFO [messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1] o.a.k.c.c.i.ConsumerCoordinator [AbstractCoordinator.java:566] [Consumer clientId=messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1-consumer, groupId=messages.xms.mt.enqueue.sms] (Re-)joining group
> Input records consumed for the first time
> 2022-11-21 15:09:33,919 INFO [messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1] o.a.k.c.c.i.ConsumerCoordinator [AbstractCoordinator.java:627] [Consumer clientId=messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1-consumer, groupId=messages.xms.mt.enqueue.sms] Successfully joined group with generation Generation{generationId=8017, memberId='messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1-consumer-77a68a5a-fb15-4808-9d87-30f21eabea74', protocol='stream'}
> 2022-11-21 15:09:33,920 INFO [messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1] o.a.k.c.c.i.ConsumerCoordinator [AbstractCoordinator.java:826] [Consumer clientId=messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1-consumer, groupId=messages.xms.mt.enqueue.sms] SyncGroup failed: The group began another rebalance. Need to re-join the group. Sent generation was Generation{generationId=8017, memberId='messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1-consumer-77a68a5a-fb15-4808-9d87-30f21eabea74', protocol='stream'}
> 2022-11-21 15:09:33,922 INFO [messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1] o.a.k.c.c.i.ConsumerCoordinator [AbstractCoordinator.java:1019] [Consumer clientId=messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1-consumer, groupId=messages.xms.mt.enqueue.sms] Resetting generation due to: encountered REBALANCE_IN_PROGRESS from SYNC_GROUP response
> 2022-11-21 15:09:33,922 INFO [messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1] o.a.k.c.c.i.ConsumerCoordinator [AbstractCoordinator.java:1066] [Consumer clientId=messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1-consumer, groupId=messages.xms.mt.enqueue.sms] Request joining group due to: encountered REBALANCE_IN_PROGRESS from SYNC_GROUP response
> 2022-11-21 15:09:33,923 INFO [messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1] o.a.k.c.c.i.ConsumerCoordinator [ConsumerCoordinator.java:819] [Consumer clientId=messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1-consumer, groupId=messages.xms.mt.enqueue.sms] Giving away all assigned partitions as lost since generation/memberID has been reset,indicating that consumer is in old state or no longer part of the group
> 2022-11-21 15:09:33,923 INFO [messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1] o.a.k.c.c.i.ConsumerCoordinator [ConsumerCoordinator.java:354] [Consumer clientId=messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1-consumer, groupId=messages.xms.mt.enqueue.sms] Lost previously assigned partitions messages.xms.mt.batch.enqueue.sms-1
> 2022-11-21 15:09:33,923 INFO [messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1] o.a.k.s.p.i.StreamThread [StreamsRebalanceListener.java:104] stream-thread [messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1] at state RUNNING: partitions [messages.xms.mt.batch.enqueue.sms-1] lost due to missed rebalance.
>         lost active tasks: [0_1]
>         lost assigned standby tasks: []
> 2022-11-21 15:09:33,941 INFO [messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1] o.a.k.s.p.i.StreamTask [StreamTask.java:1220] stream-thread [messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1] task [0_1] Suspended RUNNING
> 2022-11-21 15:09:33,941 INFO [messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1] o.a.k.s.p.i.StreamTask [StreamTask.java:295] stream-thread [messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1] task [0_1] Suspended running
> 2022-11-21 15:09:33,941 INFO [messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1] o.a.k.c.c.KafkaConsumer [KafkaConsumer.java:1082] [Consumer clientId=messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1-restore-consumer, groupId=null] Unsubscribed all topics or patterns and assigned partitions
> 2022-11-21 15:09:33,942 INFO [messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1] o.a.k.s.p.i.RecordCollectorImpl [RecordCollectorImpl.java:333] stream-thread [messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1] task [0_1] Closing record collector dirty
> 2022-11-21 15:09:33,942 INFO [messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1] o.a.k.s.p.i.StreamTask [StreamTask.java:537] stream-thread [messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1] task [0_1] Closed dirty
> 2022-11-21 15:09:33,942 INFO [messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1] o.a.k.s.p.i.StreamThread [StreamsRebalanceListener.java:117] stream-thread [messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1] partitions lost took 19 ms.
> 2022-11-21 15:09:33,942 INFO [messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1] o.a.k.c.c.i.ConsumerCoordinator [AbstractCoordinator.java:1066] [Consumer clientId=messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1-consumer, groupId=messages.xms.mt.enqueue.sms] Request joining group due to: rebalance failed due to 'The group is rebalancing, so a rejoin is needed.' (RebalanceInProgressException)
> 2022-11-21 15:09:33,942 INFO [messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1] o.a.k.c.c.i.ConsumerCoordinator [AbstractCoordinator.java:566] [Consumer clientId=messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1-consumer, groupId=messages.xms.mt.enqueue.sms] (Re-)joining group
> 2022-11-21 15:09:35,391 INFO [messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1] o.a.k.c.c.i.ConsumerCoordinator [AbstractCoordinator.java:627] [Consumer clientId=messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1-consumer, groupId=messages.xms.mt.enqueue.sms] Successfully joined group with generation Generation{generationId=8018, memberId='messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1-consumer-77a68a5a-fb15-4808-9d87-30f21eabea74', protocol='stream'}
> 2022-11-21 15:09:35,395 INFO [messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1] o.a.k.c.c.i.ConsumerCoordinator [AbstractCoordinator.java:802] [Consumer clientId=messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1-consumer, groupId=messages.xms.mt.enqueue.sms] Successfully synced group in generation Generation{generationId=8018, memberId='messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1-consumer-77a68a5a-fb15-4808-9d87-30f21eabea74', protocol='stream'}
> 2022-11-21 15:09:35,396 INFO [messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1] o.a.k.c.c.i.ConsumerCoordinator [ConsumerCoordinator.java:428] [Consumer clientId=messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1-consumer, groupId=messages.xms.mt.enqueue.sms] Updating assignment with
>         Assigned partitions:                       [messages.xms.mt.batch.enqueue.sms-1]
>         Current owned partitions:                  []
>         Added partitions (assigned - owned):       [messages.xms.mt.batch.enqueue.sms-1]
>         Revoked partitions (owned - assigned):     []
> 2022-11-21 15:09:35,396 INFO [messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1] o.a.k.c.c.i.ConsumerCoordinator [ConsumerCoordinator.java:300] [Consumer clientId=messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1-consumer, groupId=messages.xms.mt.enqueue.sms] Notifying assignor about the new Assignment(partitions=[messages.xms.mt.batch.enqueue.sms-1], userDataSize=52)
> 2022-11-21 15:09:35,396 INFO [messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1] o.a.k.s.p.i.StreamsPartitionAssignor [StreamsPartitionAssignor.java:1361] stream-thread [messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1-consumer] No followup rebalance was requested, resetting the rebalance schedule.
> 2022-11-21 15:09:35,396 INFO [messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1] o.a.k.s.p.i.TaskManager [TaskManager.java:273] stream-thread [messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1] Handle new assignment with:
>         New active tasks: [0_1]
>         New standby tasks: []
>         Existing active tasks: []
>         Existing standby tasks: []
> 2022-11-21 15:09:35,396 INFO [messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1] o.a.k.c.c.i.ConsumerCoordinator [ConsumerCoordinator.java:312] [Consumer clientId=messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1-consumer, groupId=messages.xms.mt.enqueue.sms] Adding newly assigned partitions: messages.xms.mt.batch.enqueue.sms-1
> 2022-11-21 15:09:35,396 INFO [messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1] o.a.k.s.p.i.StreamThread [StreamThread.java:234] stream-thread [messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1] State transition from RUNNING to PARTITIONS_ASSIGNED
> 2022-11-21 15:09:35,398 INFO [messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1] o.a.k.c.c.i.ConsumerCoordinator [ConsumerCoordinator.java:968] [Consumer clientId=messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1-consumer, groupId=messages.xms.mt.enqueue.sms] Setting offset for partition messages.xms.mt.batch.enqueue.sms-1 to the committed offset FetchPosition{offset=26744389, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[b-1.us1tst-eventbus.ujvxjq.c5.kafka.us-east-1.amazonaws.com:9094 (id: 1 rack: use1-az6)], epoch=19}}
> 2022-11-21 15:09:35,444 INFO [messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1] o.a.k.s.p.i.StreamTask [StreamTask.java:235] stream-thread [messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1] task [0_1] Initialized
> 2022-11-21 15:09:35,445 INFO [messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1] o.a.k.s.p.i.StreamTask [StreamTask.java:260] stream-thread [messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1] task [0_1] Restored and ready to run
> 2022-11-21 15:09:35,445 INFO [messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1] o.a.k.s.p.i.StreamThread [StreamThread.java:866] stream-thread [messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1] Restoration took 49 ms for all tasks [0_1]
> 2022-11-21 15:09:35,445 INFO [messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1] o.a.k.s.p.i.StreamThread [StreamThread.java:234] stream-thread [messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1] State transition from PARTITIONS_ASSIGNED to RUNNING
> 22022-11-21 15:09:35,446 INFO [messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1] o.a.k.s.KafkaStreams [KafkaStreams.java:342] stream-client [messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b] State transition from REBALANCING to RUNNING
> 2022-11-21 15:09:35,446 INFO [messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1] o.a.k.c.c.KafkaConsumer [KafkaConsumer.java:2270] [Consumer clientId=messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1-consumer, groupId=messages.xms.mt.enqueue.sms] Requesting the log end offset for messages.xms.mt.batch.enqueue.sms-1 in order to compute lag
> Same input records consumed for the second time{code}
> Streams consumer configuration:
> {noformat}
>         allow.auto.create.topics = false
>         auto.commit.interval.ms = 5000
>         auto.offset.reset = earliest
>         bootstrap.servers = [b-3.us1tst-eventbus.ujvxjq.c5.kafka.us-east-1.amazonaws.com:9094, b-1.us1tst-eventbus.ujvxjq.c5.kafka.us-east-1.amazonaws.com:9094, b-2.us1tst-eventbus.ujvxjq.c5.kafka.us-east-1.amazonaws.com:9094]
>         check.crcs = true
>         client.dns.lookup = use_all_dns_ips
>         client.id = messages.xms.mms.mt-05bfc9d3-7f4b-48d4-9c8c-cf9d3e496fef-StreamThread-1-consumer
>         client.rack = 
>         connections.max.idle.ms = 540000
>         default.api.timeout.ms = 60000
>         enable.auto.commit = false
>         exclude.internal.topics = true
>         fetch.max.bytes = 52428800
>         fetch.max.wait.ms = 500
>         fetch.min.bytes = 1
>         group.id = messages.xms.mms.mt
>         group.instance.id = null
>         heartbeat.interval.ms = 1500
>         interceptor.classes = []
>         internal.leave.group.on.close = true
>         internal.throw.on.fetch.stable.offset.unsupported = false
>         isolation.level = read_committed
>         key.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
>         max.partition.fetch.bytes = 1048576
>         max.poll.interval.ms = 300000
>         max.poll.records = 1000
>         metadata.max.age.ms = 300000
>         metric.reporters = []
>         metrics.num.samples = 2
>         metrics.recording.level = INFO
>         metrics.sample.window.ms = 30000
>         partition.assignment.strategy = [org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor]
>         receive.buffer.bytes = 65536
>         reconnect.backoff.max.ms = 1000
>         reconnect.backoff.ms = 50
>         request.timeout.ms = 30000
>         retry.backoff.ms = 100
>         sasl.client.callback.handler.class = null
>         sasl.jaas.config = null
>         sasl.kerberos.kinit.cmd = /usr/bin/kinit
>         sasl.kerberos.min.time.before.relogin = 60000
>         sasl.kerberos.service.name = null
>         sasl.kerberos.ticket.renew.jitter = 0.05
>         sasl.kerberos.ticket.renew.window.factor = 0.8
>         sasl.login.callback.handler.class = null
>         sasl.login.class = null
>         sasl.login.connect.timeout.ms = null
>         sasl.login.read.timeout.ms = null
>         sasl.login.refresh.buffer.seconds = 300
>         sasl.login.refresh.min.period.seconds = 60
>         sasl.login.refresh.window.factor = 0.8
>         sasl.login.refresh.window.jitter = 0.05
>         sasl.login.retry.backoff.max.ms = 10000
>         sasl.login.retry.backoff.ms = 100
>         sasl.mechanism = GSSAPI
>         sasl.oauthbearer.clock.skew.seconds = 30
>         sasl.oauthbearer.expected.audience = null
>         sasl.oauthbearer.expected.issuer = null
>         sasl.oauthbearer.jwks.endpoint.refresh.ms = 3600000
>         sasl.oauthbearer.jwks.endpoint.retry.backoff.max.ms = 10000
>         sasl.oauthbearer.jwks.endpoint.retry.backoff.ms = 100
>         sasl.oauthbearer.jwks.endpoint.url = null
>         sasl.oauthbearer.scope.claim.name = scope
>         sasl.oauthbearer.sub.claim.name = sub
>         sasl.oauthbearer.token.endpoint.url = null
>         security.protocol = SSL
>         security.providers = null
>         send.buffer.bytes = 131072
>         session.timeout.ms = 6000
>         socket.connection.setup.timeout.max.ms = 30000
>         socket.connection.setup.timeout.ms = 10000
>         ssl.cipher.suites = null
>         ssl.enabled.protocols = [TLSv1.2, TLSv1.3]
>         ssl.endpoint.identification.algorithm = https
>         ssl.engine.factory.class = null
>         ssl.key.password = null
>         ssl.keymanager.algorithm = SunX509
>         ssl.keystore.certificate.chain = null
>         ssl.keystore.key = null
>         ssl.keystore.location = /opt/apps/msl/xms-gateway/conf/xms.us1tst.jks
>         ssl.keystore.password = [hidden]
>         ssl.keystore.type = JKS
>         ssl.protocol = TLSv1.3
>         ssl.provider = null
>         ssl.secure.random.implementation = null
>         ssl.trustmanager.algorithm = PKIX
>         ssl.truststore.certificates = null
>         ssl.truststore.location = null
>         ssl.truststore.password = null
>         ssl.truststore.type = JKS
>         value.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer{noformat}
>  
> The message about lost partition that is highlighted in red above only occurs when messages are consumed twice, which happens roughly two times out of ten in my application restart test scenario.
> This issue no longer occurs when the patch suggested in KAFKA-14362 is applied.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)