You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by "Cristian Manoliu (Jira)" <ji...@apache.org> on 2020/03/02 15:50:01 UTC

[jira] [Resolved] (KAFKA-9619) Receiving duplicates when application is configured for exactly once

     [ https://issues.apache.org/jira/browse/KAFKA-9619?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Cristian Manoliu resolved KAFKA-9619.
-------------------------------------
    Resolution: Invalid

> Receiving duplicates when application is configured for exactly once
> --------------------------------------------------------------------
>
>                 Key: KAFKA-9619
>                 URL: https://issues.apache.org/jira/browse/KAFKA-9619
>             Project: Kafka
>          Issue Type: Bug
>          Components: consumer, producer 
>    Affects Versions: 2.1.1, 2.4.0
>         Environment: Red Hat Enterprise Linux Server release 6.10 (Santiago)
>            Reporter: Cristian Manoliu
>            Priority: Major
>              Labels: Kafka
>         Attachments: log
>
>
> Hi. There are cases (very rarely, but there are) when I receive duplicates, even if everything is configured for high durability and we use exactly once configuration.
>  
> Please check below the application context and test scenario that causes this issue.
> h2. Kafka Cluster Setup
> 3 x Kafka Brokers (1 on *host1*, 2 on *host2* and 3 on *host3*)
> 3 x Zookeeper instances (1 on *host1*, 2 on *host2* and 3 on *host3*)
> h3. Kafka configuration
> broker.id=1,2,3
>  num.network.threads=2
>  num.io.threads=8
>  socket.send.buffer.bytes=102400
>  socket.receive.buffer.bytes=102400
>  socket.request.max.bytes=104857600
>  log.dirs=/home/kafka/logs/kafka
>  min.insync.replicas=3
>  transaction.state.log.min.isr=3
>  default.replication.factor=3
>  log.retention.minutes=600
>  log.segment.bytes=1073741824
>  log.retention.check.interval.ms=300000
>  zookeeper.connect=host1:2181,host2:2181,host3:2181
>  zookeeper.connection.timeout.ms=6000
>  group.initial.rebalance.delay.ms=1000
>  log.message.timestamp.type=LogAppendTime
>  delete.topic.enable=true
>  auto.create.topics.enable=false
>  unclean.leader.election.enable=false
> h3. ZooKeeper configuration
> tickTime=2000
>  dataDir=/home/kafka/logs/zk
>  clientPort=2181
>  maxClientCnxns=0
>  initLimit=5
>  syncLimit=2
>  server.1=host1:2888:3888
>  server.2=host2:2888:3888
>  server.3=host3:2888:3888
>  autopurge.snapRetainCount=3
>  autopurge.purgeInterval=24
> h2. Kafka internal topics description
> Topic:__transaction_state       PartitionCount:50       ReplicationFactor:3     Configs:segment.bytes=104857600,unclean.leader.election.enable=false,compression.type=uncompressed,cleanup.policy=compact,min.insync.replicas=3
>        Topic: __transaction_state     Partition: 0   Leader: 1       Replicas: 3,2,1 Isr: 1,2,3
>  ​
>  Topic:__consumer_offsets       PartitionCount:50       ReplicationFactor:3     Configs:segment.bytes=104857600,unclean.leader.election.enable=false,min.insync.replicas=3,cleanup.policy=compact,compression.type=producer
>        Topic: __consumer_offsets       Partition: 0   Leader: 1       Replicas: 3,2,1 Isr: 1,2,3
> h2. Application topics
> h3. Topic input-event
> Topic:input-event     PartitionCount:3       ReplicationFactor:3   Configs:retention.ms=28800001,unclean.leader.election.enable=false,min.insync.replicas=3,message.timestamp.difference.max.ms=28800000
>        Topic: input-event     Partition: 0   Leader: 1       Replicas: 1,2,3 Isr: 1,2,3
>        Topic: input-event     Partition: 1   Leader: 2       Replicas: 2,3,1 Isr: 1,2,3
>        Topic: input-event     Partition: 2   Leader: 3       Replicas: 3,1,2 Isr: 1,2,3
> h3. Topic output-event
> Topic:output-event       PartitionCount:3       ReplicationFactor:3   Configs:retention.ms=28800001,unclean.leader.election.enable=false,min.insync.replicas=3,message.timestamp.difference.max.ms=28800000
>        Topic: output-event       Partition: 0   Leader: 2       Replicas: 2,3,1 Isr: 1,2,3
>        Topic: output-event       Partition: 1   Leader: 3       Replicas: 3,1,2 Isr: 1,2,3
>        Topic: output-event       Partition: 2   Leader: 1       Replicas: 1,2,3 Isr: 1,2,3
> h2. Application consumer properties
> o.a.k.clients.consumer.ConsumerConfig : ConsumerConfig values: 
>                auto.commit.interval.ms = 5000
>                auto.offset.reset = earliest
>                bootstrap.servers = [host1:9092, host2:9092, host3:9092]
>                check.crcs = true
>                client.id = 
>                connections.max.idle.ms = 540000
>                default.api.timeout.ms = 60000
>                enable.auto.commit = false
>                exclude.internal.topics = true
>                fetch.max.bytes = 134217728
>                fetch.max.wait.ms = 500
>                fetch.min.bytes = 1
>                group.id = groupId
>                heartbeat.interval.ms = 3000
>                interceptor.classes = []
>                internal.leave.group.on.close = true
>                isolation.level = read_committed
>                key.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
>                max.partition.fetch.bytes = 134217728
>                max.poll.interval.ms = 300000
>                max.poll.records = 1
>                metadata.max.age.ms = 300000
>                metric.reporters = []
>                metrics.num.samples = 2
>                metrics.recording.level = INFO
>                metrics.sample.window.ms = 30000
>                partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
>                receive.buffer.bytes = 65536
>                reconnect.backoff.max.ms = 1000
>                reconnect.backoff.ms = 1000
>                request.timeout.ms = 30000
>                retry.backoff.ms = 1000
>                sasl.client.callback.handler.class = null
>                sasl.jaas.config = null
>                sasl.kerberos.kinit.cmd = /usr/bin/kinit
>                sasl.kerberos.min.time.before.relogin = 60000
>                sasl.kerberos.service.name = null
>                sasl.kerberos.ticket.renew.jitter = 0.05
>                sasl.kerberos.ticket.renew.window.factor = 0.8
>                sasl.login.callback.handler.class = null
>                sasl.login.class = null
>                sasl.login.refresh.buffer.seconds = 300
>                sasl.login.refresh.min.period.seconds = 60
>                sasl.login.refresh.window.factor = 0.8
>                sasl.login.refresh.window.jitter = 0.05
>                sasl.mechanism = GSSAPI
>                security.protocol = PLAINTEXT
>                send.buffer.bytes = 131072
>                session.timeout.ms = 10000
>                ssl.cipher.suites = null
>                ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
>                ssl.endpoint.identification.algorithm = https
>                ssl.key.password = null
>                ssl.keymanager.algorithm = SunX509
>                ssl.keystore.location = null
>                ssl.keystore.password = null
>                ssl.keystore.type = JKS
>                ssl.protocol = TLS
>                ssl.provider = null
>                ssl.secure.random.implementation = null
>                ssl.trustmanager.algorithm = PKIX
>                ssl.truststore.location = null
>                ssl.truststore.password = null
>                ssl.truststore.type = JKS
>                value.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
> h2. Application producer properties
> bootstrapServers = "host1, host2, host3"
>  transactionIdPrefix = "my-producer-"${instance}"
>  "enable.idempotence" = "true"
>  "acks" = "all"
>  "retries" = "2147483647"
>  "transaction.timeout.ms" = "10000"
>  "max.in.flight.requests.per.connection" = "1"
>  "reconnect.backoff.max.ms" = "1000"
>  "reconnect.backoff.ms" = "1000"
>  "retry.backoff.ms" = "1000"
> h2. Application handling commits
> Using {{KafkaTransactionManager}}, we start transaction, write message to output topic using {{KafkaTemplate}} and also send consumer offsets (spring-kafka 2.2.8.RELEASE).
> h2. Test expected/actual
> h3. Test description
> -Write 32,000 messages to input topic
> -Start 3 application instances
> -Start process the messages one by one (max.poll.records = 1)
> -During processing, send *SIGKILL* (kill -9) in parallel to *host1* and *host2* Kafka Brokers for 50 times.
> -Wait 60 seconds
> -During processing, send *SIGKILL* (kill -9) in parallel to *host1* and *host3* Kafka Brokers for 50 times.
> -Wait 60 seconds
> -During processing, send *SIGKILL* (kill -9) in parallel to *host2* and *host3* Kafka Brokers for 50 times.
>  
> Expectation would have been to have 32,000 messages to the output topic, however, sometimes we actually end up with a duplicate (at least one).
> There are times when we end up with 32,000 messages and everything is right.
> Every time the issue occurs, the sequence of events is the same.
> Seeing this {{Attempt to heartbeat failed since group is rebalancing}} right before a commit.
> Attached application log file.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)