You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by "Cristian Manoliu (Jira)" <ji...@apache.org> on 2020/03/02 15:50:00 UTC
[jira] [Commented] (KAFKA-9619) Receiving duplicates when
application is configured for exactly once
[ https://issues.apache.org/jira/browse/KAFKA-9619?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17049338#comment-17049338 ]
Cristian Manoliu commented on KAFKA-9619:
-----------------------------------------
Apologies. This was an application issue due the fact that transaction id was not correctly, not set at topic.partition level.
> Receiving duplicates when application is configured for exactly once
> --------------------------------------------------------------------
>
> Key: KAFKA-9619
> URL: https://issues.apache.org/jira/browse/KAFKA-9619
> Project: Kafka
> Issue Type: Bug
> Components: consumer, producer
> Affects Versions: 2.1.1, 2.4.0
> Environment: Red Hat Enterprise Linux Server release 6.10 (Santiago)
> Reporter: Cristian Manoliu
> Priority: Major
> Labels: Kafka
> Attachments: log
>
>
> Hi. There are cases (very rarely, but there are) when I receive duplicates, even if everything is configured for high durability and we use exactly once configuration.
>
> Please check below the application context and test scenario that causes this issue.
> h2. Kafka Cluster Setup
> 3 x Kafka Brokers (1 on *host1*, 2 on *host2* and 3 on *host3*)
> 3 x Zookeeper instances (1 on *host1*, 2 on *host2* and 3 on *host3*)
> h3. Kafka configuration
> broker.id=1,2,3
> num.network.threads=2
> num.io.threads=8
> socket.send.buffer.bytes=102400
> socket.receive.buffer.bytes=102400
> socket.request.max.bytes=104857600
> log.dirs=/home/kafka/logs/kafka
> min.insync.replicas=3
> transaction.state.log.min.isr=3
> default.replication.factor=3
> log.retention.minutes=600
> log.segment.bytes=1073741824
> log.retention.check.interval.ms=300000
> zookeeper.connect=host1:2181,host2:2181,host3:2181
> zookeeper.connection.timeout.ms=6000
> group.initial.rebalance.delay.ms=1000
> log.message.timestamp.type=LogAppendTime
> delete.topic.enable=true
> auto.create.topics.enable=false
> unclean.leader.election.enable=false
> h3. ZooKeeper configuration
> tickTime=2000
> dataDir=/home/kafka/logs/zk
> clientPort=2181
> maxClientCnxns=0
> initLimit=5
> syncLimit=2
> server.1=host1:2888:3888
> server.2=host2:2888:3888
> server.3=host3:2888:3888
> autopurge.snapRetainCount=3
> autopurge.purgeInterval=24
> h2. Kafka internal topics description
> Topic:__transaction_state PartitionCount:50 ReplicationFactor:3 Configs:segment.bytes=104857600,unclean.leader.election.enable=false,compression.type=uncompressed,cleanup.policy=compact,min.insync.replicas=3
> Topic: __transaction_state Partition: 0 Leader: 1 Replicas: 3,2,1 Isr: 1,2,3
>
> Topic:__consumer_offsets PartitionCount:50 ReplicationFactor:3 Configs:segment.bytes=104857600,unclean.leader.election.enable=false,min.insync.replicas=3,cleanup.policy=compact,compression.type=producer
> Topic: __consumer_offsets Partition: 0 Leader: 1 Replicas: 3,2,1 Isr: 1,2,3
> h2. Application topics
> h3. Topic input-event
> Topic:input-event PartitionCount:3 ReplicationFactor:3 Configs:retention.ms=28800001,unclean.leader.election.enable=false,min.insync.replicas=3,message.timestamp.difference.max.ms=28800000
> Topic: input-event Partition: 0 Leader: 1 Replicas: 1,2,3 Isr: 1,2,3
> Topic: input-event Partition: 1 Leader: 2 Replicas: 2,3,1 Isr: 1,2,3
> Topic: input-event Partition: 2 Leader: 3 Replicas: 3,1,2 Isr: 1,2,3
> h3. Topic output-event
> Topic:output-event PartitionCount:3 ReplicationFactor:3 Configs:retention.ms=28800001,unclean.leader.election.enable=false,min.insync.replicas=3,message.timestamp.difference.max.ms=28800000
> Topic: output-event Partition: 0 Leader: 2 Replicas: 2,3,1 Isr: 1,2,3
> Topic: output-event Partition: 1 Leader: 3 Replicas: 3,1,2 Isr: 1,2,3
> Topic: output-event Partition: 2 Leader: 1 Replicas: 1,2,3 Isr: 1,2,3
> h2. Application consumer properties
> o.a.k.clients.consumer.ConsumerConfig : ConsumerConfig values:
> auto.commit.interval.ms = 5000
> auto.offset.reset = earliest
> bootstrap.servers = [host1:9092, host2:9092, host3:9092]
> check.crcs = true
> client.id =
> connections.max.idle.ms = 540000
> default.api.timeout.ms = 60000
> enable.auto.commit = false
> exclude.internal.topics = true
> fetch.max.bytes = 134217728
> fetch.max.wait.ms = 500
> fetch.min.bytes = 1
> group.id = groupId
> heartbeat.interval.ms = 3000
> interceptor.classes = []
> internal.leave.group.on.close = true
> isolation.level = read_committed
> key.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
> max.partition.fetch.bytes = 134217728
> max.poll.interval.ms = 300000
> max.poll.records = 1
> metadata.max.age.ms = 300000
> metric.reporters = []
> metrics.num.samples = 2
> metrics.recording.level = INFO
> metrics.sample.window.ms = 30000
> partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
> receive.buffer.bytes = 65536
> reconnect.backoff.max.ms = 1000
> reconnect.backoff.ms = 1000
> request.timeout.ms = 30000
> retry.backoff.ms = 1000
> sasl.client.callback.handler.class = null
> sasl.jaas.config = null
> sasl.kerberos.kinit.cmd = /usr/bin/kinit
> sasl.kerberos.min.time.before.relogin = 60000
> sasl.kerberos.service.name = null
> sasl.kerberos.ticket.renew.jitter = 0.05
> sasl.kerberos.ticket.renew.window.factor = 0.8
> sasl.login.callback.handler.class = null
> sasl.login.class = null
> sasl.login.refresh.buffer.seconds = 300
> sasl.login.refresh.min.period.seconds = 60
> sasl.login.refresh.window.factor = 0.8
> sasl.login.refresh.window.jitter = 0.05
> sasl.mechanism = GSSAPI
> security.protocol = PLAINTEXT
> send.buffer.bytes = 131072
> session.timeout.ms = 10000
> ssl.cipher.suites = null
> ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
> ssl.endpoint.identification.algorithm = https
> ssl.key.password = null
> ssl.keymanager.algorithm = SunX509
> ssl.keystore.location = null
> ssl.keystore.password = null
> ssl.keystore.type = JKS
> ssl.protocol = TLS
> ssl.provider = null
> ssl.secure.random.implementation = null
> ssl.trustmanager.algorithm = PKIX
> ssl.truststore.location = null
> ssl.truststore.password = null
> ssl.truststore.type = JKS
> value.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
> h2. Application producer properties
> bootstrapServers = "host1, host2, host3"
> transactionIdPrefix = "my-producer-"${instance}"
> "enable.idempotence" = "true"
> "acks" = "all"
> "retries" = "2147483647"
> "transaction.timeout.ms" = "10000"
> "max.in.flight.requests.per.connection" = "1"
> "reconnect.backoff.max.ms" = "1000"
> "reconnect.backoff.ms" = "1000"
> "retry.backoff.ms" = "1000"
> h2. Application handling commits
> Using {{KafkaTransactionManager}}, we start transaction, write message to output topic using {{KafkaTemplate}} and also send consumer offsets (spring-kafka 2.2.8.RELEASE).
> h2. Test expected/actual
> h3. Test description
> -Write 32,000 messages to input topic
> -Start 3 application instances
> -Start process the messages one by one (max.poll.records = 1)
> -During processing, send *SIGKILL* (kill -9) in parallel to *host1* and *host2* Kafka Brokers for 50 times.
> -Wait 60 seconds
> -During processing, send *SIGKILL* (kill -9) in parallel to *host1* and *host3* Kafka Brokers for 50 times.
> -Wait 60 seconds
> -During processing, send *SIGKILL* (kill -9) in parallel to *host2* and *host3* Kafka Brokers for 50 times.
>
> Expectation would have been to have 32,000 messages to the output topic, however, sometimes we actually end up with a duplicate (at least one).
> There are times when we end up with 32,000 messages and everything is right.
> Every time the issue occurs, the sequence of events is the same.
> Seeing this {{Attempt to heartbeat failed since group is rebalancing}} right before a commit.
> Attached application log file.
--
This message was sent by Atlassian Jira
(v8.3.4#803005)