Posted to jira@kafka.apache.org by "Takeshi Yamasaki (Jira)" <ji...@apache.org> on 2021/07/05 10:12:00 UTC
[jira] [Updated] (KAFKA-9619) Receiving duplicates when application is configured for exactly once
[ https://issues.apache.org/jira/browse/KAFKA-9619?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Takeshi Yamasaki updated KAFKA-9619:
------------------------------------
Description:
Hi. There are cases (very rarely, but there are) when I receive duplicates, even if everything is configured for high durability and we use an exactly-once configuration.
Please check below the application context and test scenario that causes this issue.
h2. Kafka Cluster Setup
3 x Kafka Brokers (1 on *host1*, 2 on *host2* and 3 on *host3*)
3 x Zookeeper instances (1 on *host1*, 2 on *host2* and 3 on *host3*)
h3. Kafka configuration
broker.id=1,2,3
num.network.threads=2
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/home/kafka/logs/kafka
min.insync.replicas=3
transaction.state.log.min.isr=3
default.replication.factor=3
log.retention.minutes=600
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connect=host1:2181,host2:2181,host3:2181
zookeeper.connection.timeout.ms=6000
group.initial.rebalance.delay.ms=1000
log.message.timestamp.type=LogAppendTime
delete.topic.enable=true
auto.create.topics.enable=false
unclean.leader.election.enable=false
h3. ZooKeeper configuration
tickTime=2000
dataDir=/home/kafka/logs/zk
clientPort=2181
maxClientCnxns=0
initLimit=5
syncLimit=2
server.1=host1:2888:3888
server.2=host2:2888:3888
server.3=host3:2888:3888
autopurge.snapRetainCount=3
autopurge.purgeInterval=24
h2. Kafka internal topics description
Topic:__transaction_state PartitionCount:50 ReplicationFactor:3 Configs:segment.bytes=104857600,unclean.leader.election.enable=false,compression.type=uncompressed,cleanup.policy=compact,min.insync.replicas=3
Topic: __transaction_state Partition: 0 Leader: 1 Replicas: 3,2,1 Isr: 1,2,3
Topic:__consumer_offsets PartitionCount:50 ReplicationFactor:3 Configs:segment.bytes=104857600,unclean.leader.election.enable=false,min.insync.replicas=3,cleanup.policy=compact,compression.type=producer
Topic: __consumer_offsets Partition: 0 Leader: 1 Replicas: 3,2,1 Isr: 1,2,3
h2. Application topics
h3. Topic input-event
Topic:input-event PartitionCount:3 ReplicationFactor:3 Configs:retention.ms=28800001,unclean.leader.election.enable=false,min.insync.replicas=3,message.timestamp.difference.max.ms=28800000
Topic: input-event Partition: 0 Leader: 1 Replicas: 1,2,3 Isr: 1,2,3
Topic: input-event Partition: 1 Leader: 2 Replicas: 2,3,1 Isr: 1,2,3
Topic: input-event Partition: 2 Leader: 3 Replicas: 3,1,2 Isr: 1,2,3
h3. Topic output-event
Topic:output-event PartitionCount:3 ReplicationFactor:3 Configs:retention.ms=28800001,unclean.leader.election.enable=false,min.insync.replicas=3,message.timestamp.difference.max.ms=28800000
Topic: output-event Partition: 0 Leader: 2 Replicas: 2,3,1 Isr: 1,2,3
Topic: output-event Partition: 1 Leader: 3 Replicas: 3,1,2 Isr: 1,2,3
Topic: output-event Partition: 2 Leader: 1 Replicas: 1,2,3 Isr: 1,2,3
h2. Application consumer properties
o.a.k.clients.consumer.ConsumerConfig : ConsumerConfig values:
auto.commit.interval.ms = 5000
auto.offset.reset = earliest
bootstrap.servers = [host1:9092, host2:9092, host3:9092]
check.crcs = true
client.id =
connections.max.idle.ms = 540000
default.api.timeout.ms = 60000
enable.auto.commit = false
exclude.internal.topics = true
fetch.max.bytes = 134217728
fetch.max.wait.ms = 500
fetch.min.bytes = 1
group.id = groupId
heartbeat.interval.ms = 3000
interceptor.classes = []
internal.leave.group.on.close = true
isolation.level = read_committed
key.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
max.partition.fetch.bytes = 134217728
max.poll.interval.ms = 300000
max.poll.records = 1
metadata.max.age.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
receive.buffer.bytes = 65536
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 1000
request.timeout.ms = 30000
retry.backoff.ms = 1000
sasl.client.callback.handler.class = null
sasl.jaas.config = null
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.login.callback.handler.class = null
sasl.login.class = null
sasl.login.refresh.buffer.seconds = 300
sasl.login.refresh.min.period.seconds = 60
sasl.login.refresh.window.factor = 0.8
sasl.login.refresh.window.jitter = 0.05
sasl.mechanism = GSSAPI
security.protocol = PLAINTEXT
send.buffer.bytes = 131072
session.timeout.ms = 10000
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
ssl.endpoint.identification.algorithm = https
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLS
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS
value.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
h2. Application producer properties
bootstrapServers = "host1, host2, host3"
transactionIdPrefix = "my-producer-"${instance}
"enable.idempotence" = "true"
"acks" = "all"
"retries" = "2147483647"
"transaction.timeout.ms" = "10000"
"max.in.flight.requests.per.connection" = "1"
"reconnect.backoff.max.ms" = "1000"
"reconnect.backoff.ms" = "1000"
"retry.backoff.ms" = "1000"
h2. Application handling commits
Using {{KafkaTransactionManager}}, we start a transaction, write the message to the output topic using {{KafkaTemplate}}, and also send the consumer offsets to the transaction (spring-kafka 2.2.8.RELEASE).
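The commit pattern above can be sketched as a small in-memory simulation (plain Python, not the actual Kafka/spring-kafka API; {{log}} and {{offsets}} are illustrative stand-ins for the output topic and {{__consumer_offsets}}). It shows why committing the output record and the consumer offset atomically makes a crash-and-replay safe: an aborted transaction leaves neither behind, so the reprocessed message is still produced exactly once.

```python
# In-memory sketch of a transactional read-process-write loop.
# Not the Kafka API: a "transaction" here either commits both the
# output record and the new offset, or neither.

def run(messages, crash_after_send=False):
    log, offsets = [], {"input": 0}
    while offsets["input"] < len(messages):
        msg = messages[offsets["input"]]
        # --- begin transaction ---
        staged_output = f"out({msg})"
        staged_offset = offsets["input"] + 1
        if crash_after_send:
            # Crash before commit: the staged output is aborted and the
            # offset is not advanced, so on restart the same message is
            # re-read and produced exactly once.
            crash_after_send = False
            continue
        log.append(staged_output)         # output record and offset are
        offsets["input"] = staged_offset  # committed together
        # --- commit transaction ---
    return log

# No crash: one output per input message.
assert run(["a", "b", "c"]) == ["out(a)", "out(b)", "out(c)"]
# A crash mid-transaction still yields exactly one copy of each message.
assert run(["a", "b", "c"], crash_after_send=True) == ["out(a)", "out(b)", "out(c)"]
```

If the offset were committed separately from the output (the non-transactional pattern), the same crash would replay the message after its output had already been written, which is exactly the duplicate this report is about.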
h2. Test expected/actual
h3. Test description
-Write 32,000 messages to the input topic
-Start 3 application instances
-Start processing the messages one by one (max.poll.records = 1)
-During processing, send *SIGKILL* (kill -9) in parallel to the *host1* and *host2* Kafka Brokers, 50 times.
-Wait 60 seconds
-During processing, send *SIGKILL* (kill -9) in parallel to the *host1* and *host3* Kafka Brokers, 50 times.
-Wait 60 seconds
-During processing, send *SIGKILL* (kill -9) in parallel to the *host2* and *host3* Kafka Brokers, 50 times.
Expectation would have been to have 32,000 messages in the output topic; however, sometimes we actually end up with a duplicate (at least one).
There are times when we end up with 32,000 messages and everything is right.
Every time the issue occurs, the sequence of events is the same.
Seeing this {{Attempt to heartbeat failed since group is rebalancing}} right before a commit.
Application log file attached.
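The rebalance-right-before-commit observation in the log is exactly the window that transactional producer fencing is designed to close: when a restarted instance re-initializes its {{transactional.id}}, the transaction coordinator bumps the producer epoch, and a late commit from the older "zombie" instance is rejected rather than producing a duplicate. A minimal in-memory sketch of that epoch check (illustrative class and names, not the broker implementation):

```python
# Sketch of producer-epoch fencing as performed by the transaction
# coordinator: init_producer_id bumps the epoch for a transactional.id,
# and a commit carrying a stale epoch is rejected (fenced).

class Coordinator:
    def __init__(self):
        self.epochs = {}  # transactional.id -> current epoch

    def init_producer_id(self, txn_id):
        self.epochs[txn_id] = self.epochs.get(txn_id, -1) + 1
        return self.epochs[txn_id]

    def commit(self, txn_id, epoch):
        if epoch != self.epochs[txn_id]:
            # In Kafka this surfaces as ProducerFencedException.
            raise RuntimeError("fenced: stale producer epoch")
        return "committed"

coord = Coordinator()
old = coord.init_producer_id("my-producer-1")  # original instance
new = coord.init_producer_id("my-producer-1")  # restarted instance
assert coord.commit("my-producer-1", new) == "committed"
try:
    coord.commit("my-producer-1", old)         # zombie's late commit
    assert False, "stale epoch must be fenced"
except RuntimeError:
    pass
```

If a duplicate still appears under this scheme, it points at a gap in the fencing/commit handshake under broker failures, which is what the attached log should help pin down.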
was:
Hi. There are cases (very rarely, but there are) when I receive duplicates, even if everything is configured for high durability and we use an exactly-once configuration.
Please check below the application context and test scenario that causes this issue.
h2. Kafka Cluster Setup
3 x Kafka Brokers (1 on *host1*, 2 on *host2* and 3 on *host3*)
3 x Zookeeper instances (1 on *host1*, 2 on *host2* and 3 on *host3*)
h3. Kafka configuration
broker.id=1,2,3
num.network.threads=2
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/home/kafka/logs/kafka
min.insync.replicas=3
transaction.state.log.min.isr=3
default.replication.factor=3
log.retention.minutes=600
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connect=host1:2181,host2:2181,host3:2181
zookeeper.connection.timeout.ms=6000
group.initial.rebalance.delay.ms=1000
log.message.timestamp.type=LogAppendTime
delete.topic.enable=true
auto.create.topics.enable=false
unclean.leader.election.enable=false
h3. ZooKeeper configuration
tickTime=2000
dataDir=/home/kafka/logs/zk
clientPort=2181
maxClientCnxns=0
initLimit=5
syncLimit=2
server.1=host1:2888:3888
server.2=host2:2888:3888
server.3=host3:2888:3888
autopurge.snapRetainCount=3
autopurge.purgeInterval=24
h2. Kafka internal topics description
Topic:__transaction_state PartitionCount:50 ReplicationFactor:3 Configs:segment.bytes=104857600,unclean.leader.election.enable=false,compression.type=uncompressed,cleanup.policy=compact,min.insync.replicas=3
Topic: __transaction_state Partition: 0 Leader: 1 Replicas: 3,2,1 Isr: 1,2,3
Topic:__consumer_offsets PartitionCount:50 ReplicationFactor:3 Configs:segment.bytes=104857600,unclean.leader.election.enable=false,min.insync.replicas=3,cleanup.policy=compact,compression.type=producer
Topic: __consumer_offsets Partition: 0 Leader: 1 Replicas: 3,2,1 Isr: 1,2,3
h2. Application topics
h3. Topic input-event
Topic:input-event PartitionCount:3 ReplicationFactor:3 Configs:retention.ms=28800001,unclean.leader.election.enable=false,min.insync.replicas=3,message.timestamp.difference.max.ms=28800000
Topic: input-event Partition: 0 Leader: 1 Replicas: 1,2,3 Isr: 1,2,3
Topic: input-event Partition: 1 Leader: 2 Replicas: 2,3,1 Isr: 1,2,3
Topic: input-event Partition: 2 Leader: 3 Replicas: 3,1,2 Isr: 1,2,3
h3. Topic output-event
Topic:output-event PartitionCount:3 ReplicationFactor:3 Configs:retention.ms=28800001,unclean.leader.election.enable=false,min.insync.replicas=3,message.timestamp.difference.max.ms=28800000
Topic: output-event Partition: 0 Leader: 2 Replicas: 2,3,1 Isr: 1,2,3
Topic: output-event Partition: 1 Leader: 3 Replicas: 3,1,2 Isr: 1,2,3
Topic: output-event Partition: 2 Leader: 1 Replicas: 1,2,3 Isr: 1,2,3
h2. Application consumer properties
o.a.k.clients.consumer.ConsumerConfig : ConsumerConfig values:
auto.commit.interval.ms = 5000
auto.offset.reset = earliest
bootstrap.servers = [host1:9092, host2:9092, host3:9092]
check.crcs = true
client.id =
connections.max.idle.ms = 540000
default.api.timeout.ms = 60000
enable.auto.commit = false
exclude.internal.topics = true
fetch.max.bytes = 134217728
fetch.max.wait.ms = 500
fetch.min.bytes = 1
group.id = groupId
heartbeat.interval.ms = 3000
interceptor.classes = []
internal.leave.group.on.close = true
isolation.level = read_committed
key.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
max.partition.fetch.bytes = 134217728
max.poll.interval.ms = 300000
max.poll.records = 1
metadata.max.age.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
receive.buffer.bytes = 65536
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 1000
request.timeout.ms = 30000
retry.backoff.ms = 1000
sasl.client.callback.handler.class = null
sasl.jaas.config = null
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.login.callback.handler.class = null
sasl.login.class = null
sasl.login.refresh.buffer.seconds = 300
sasl.login.refresh.min.period.seconds = 60
sasl.login.refresh.window.factor = 0.8
sasl.login.refresh.window.jitter = 0.05
sasl.mechanism = GSSAPI
security.protocol = PLAINTEXT
send.buffer.bytes = 131072
session.timeout.ms = 10000
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
ssl.endpoint.identification.algorithm = https
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLS
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS
value.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
h2. Application producer properties
bootstrapServers = "host1, host2, host3"
transactionIdPrefix = "my-producer-"${instance}
"enable.idempotence" = "true"
"acks" = "all"
"retries" = "2147483647"
"transaction.timeout.ms" = "10000"
"max.in.flight.requests.per.connection" = "1"
"reconnect.backoff.max.ms" = "1000"
"reconnect.backoff.ms" = "1000"
"retry.backoff.ms" = "1000"
h2. Application handling commits
Using {{KafkaTransactionManager}}, we start a transaction, write the message to the output topic using {{KafkaTemplate}}, and also send the consumer offsets to the transaction (spring-kafka 2.2.8.RELEASE).
h2. Test expected/actual
h3. Test description
-Write 32,000 messages to the input topic
-Start 3 application instances
-Start processing the messages one by one (max.poll.records = 1)
-During processing, send *SIGKILL* (kill -9) in parallel to the *host1* and *host2* Kafka Brokers, 50 times.
-Wait 60 seconds
-During processing, send *SIGKILL* (kill -9) in parallel to the *host1* and *host3* Kafka Brokers, 50 times.
-Wait 60 seconds
-During processing, send *SIGKILL* (kill -9) in parallel to the *host2* and *host3* Kafka Brokers, 50 times.
Expectation would have been to have 32,000 messages in the output topic; however, sometimes we actually end up with a duplicate (at least one).
There are times when we end up with 32,000 messages and everything is right.
Every time the issue occurs, the sequence of events is the same.
Seeing this {{Attempt to heartbeat failed since group is rebalancing}} right before a commit.
Application log file attached.
> Receiving duplicates when application is configured for exactly once
> --------------------------------------------------------------------
>
> Key: KAFKA-9619
> URL: https://issues.apache.org/jira/browse/KAFKA-9619
> Project: Kafka
> Issue Type: Bug
> Components: consumer, producer
> Affects Versions: 2.1.1, 2.4.0
> Environment: Red Hat Enterprise Linux Server release 6.10 (Santiago)
> Reporter: Cristian Manoliu
> Priority: Major
> Labels: Kafka
> Attachments: log
>
>
--
This message was sent by Atlassian Jira
(v8.3.4#803005)