Posted to jira@kafka.apache.org by "Takeshi Yamasaki (Jira)" <ji...@apache.org> on 2021/07/05 10:12:00 UTC

[jira] [Updated] (KAFKA-9619) Receiving duplicates when application is configured for exactly once

     [ https://issues.apache.org/jira/browse/KAFKA-9619?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Takeshi Yamasaki updated KAFKA-9619:
------------------------------------
    Description: 
Hi. There are cases (very rare, but they do happen) where we receive duplicates, even though everything is configured for high durability and we use an exactly-once configuration.

 

Please find below the application context and test scenario that cause this issue.
h2. Kafka Cluster Setup

3 x Kafka Brokers (1 on *host1*, 2 on *host2* and 3 on *host3*)

3 x Zookeeper instances (1 on *host1*, 2 on *host2* and 3 on *host3*)
h3. Kafka configuration

broker.id=1,2,3
 num.network.threads=2
 num.io.threads=8
 socket.send.buffer.bytes=102400
 socket.receive.buffer.bytes=102400
 socket.request.max.bytes=104857600
 log.dirs=/home/kafka/logs/kafka
 min.insync.replicas=3
 transaction.state.log.min.isr=3
 default.replication.factor=3
 log.retention.minutes=600
 log.segment.bytes=1073741824
 log.retention.check.interval.ms=300000
 zookeeper.connect=host1:2181,host2:2181,host3:2181
 zookeeper.connection.timeout.ms=6000
 group.initial.rebalance.delay.ms=1000
 log.message.timestamp.type=LogAppendTime
 delete.topic.enable=true
 auto.create.topics.enable=false
 unclean.leader.election.enable=false
h3. ZooKeeper configuration

tickTime=2000
 dataDir=/home/kafka/logs/zk
 clientPort=2181
 maxClientCnxns=0
 initLimit=5
 syncLimit=2
 server.1=host1:2888:3888
 server.2=host2:2888:3888
 server.3=host3:2888:3888
 autopurge.snapRetainCount=3
 autopurge.purgeInterval=24
h2. Kafka internal topics description

Topic:__transaction_state       PartitionCount:50       ReplicationFactor:3     Configs:segment.bytes=104857600,unclean.leader.election.enable=false,compression.type=uncompressed,cleanup.policy=compact,min.insync.replicas=3
       Topic: __transaction_state     Partition: 0   Leader: 1       Replicas: 3,2,1 Isr: 1,2,3
 ​
 Topic:__consumer_offsets       PartitionCount:50       ReplicationFactor:3     Configs:segment.bytes=104857600,unclean.leader.election.enable=false,min.insync.replicas=3,cleanup.policy=compact,compression.type=producer
       Topic: __consumer_offsets       Partition: 0   Leader: 1       Replicas: 3,2,1 Isr: 1,2,3
h2. Application topics
h3. Topic input-event

Topic:input-event     PartitionCount:3       ReplicationFactor:3   Configs:retention.ms=28800001,unclean.leader.election.enable=false,min.insync.replicas=3,message.timestamp.difference.max.ms=28800000
       Topic: input-event     Partition: 0   Leader: 1       Replicas: 1,2,3 Isr: 1,2,3
       Topic: input-event     Partition: 1   Leader: 2       Replicas: 2,3,1 Isr: 1,2,3
       Topic: input-event     Partition: 2   Leader: 3       Replicas: 3,1,2 Isr: 1,2,3
h3. Topic output-event

Topic:output-event       PartitionCount:3       ReplicationFactor:3   Configs:retention.ms=28800001,unclean.leader.election.enable=false,min.insync.replicas=3,message.timestamp.difference.max.ms=28800000
       Topic: output-event       Partition: 0   Leader: 2       Replicas: 2,3,1 Isr: 1,2,3
       Topic: output-event       Partition: 1   Leader: 3       Replicas: 3,1,2 Isr: 1,2,3
       Topic: output-event       Partition: 2   Leader: 1       Replicas: 1,2,3 Isr: 1,2,3
h2. Application consumer properties

o.a.k.clients.consumer.ConsumerConfig : ConsumerConfig values: 
               auto.commit.interval.ms = 5000
               auto.offset.reset = earliest
               bootstrap.servers = [host1:9092, host2:9092, host3:9092]
               check.crcs = true
               client.id = 
               connections.max.idle.ms = 540000
               default.api.timeout.ms = 60000
               enable.auto.commit = false
               exclude.internal.topics = true
               fetch.max.bytes = 134217728
               fetch.max.wait.ms = 500
               fetch.min.bytes = 1
               group.id = groupId
               heartbeat.interval.ms = 3000
               interceptor.classes = []
               internal.leave.group.on.close = true
               isolation.level = read_committed
               key.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
               max.partition.fetch.bytes = 134217728
               max.poll.interval.ms = 300000
               max.poll.records = 1
               metadata.max.age.ms = 300000
               metric.reporters = []
               metrics.num.samples = 2
               metrics.recording.level = INFO
               metrics.sample.window.ms = 30000
               partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
               receive.buffer.bytes = 65536
               reconnect.backoff.max.ms = 1000
               reconnect.backoff.ms = 1000
               request.timeout.ms = 30000
               retry.backoff.ms = 1000
               sasl.client.callback.handler.class = null
               sasl.jaas.config = null
               sasl.kerberos.kinit.cmd = /usr/bin/kinit
               sasl.kerberos.min.time.before.relogin = 60000
               sasl.kerberos.service.name = null
               sasl.kerberos.ticket.renew.jitter = 0.05
               sasl.kerberos.ticket.renew.window.factor = 0.8
               sasl.login.callback.handler.class = null
               sasl.login.class = null
               sasl.login.refresh.buffer.seconds = 300
               sasl.login.refresh.min.period.seconds = 60
               sasl.login.refresh.window.factor = 0.8
               sasl.login.refresh.window.jitter = 0.05
               sasl.mechanism = GSSAPI
               security.protocol = PLAINTEXT
               send.buffer.bytes = 131072
               session.timeout.ms = 10000
               ssl.cipher.suites = null
               ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
               ssl.endpoint.identification.algorithm = https
               ssl.key.password = null
               ssl.keymanager.algorithm = SunX509
               ssl.keystore.location = null
               ssl.keystore.password = null
               ssl.keystore.type = JKS
               ssl.protocol = TLS
               ssl.provider = null
               ssl.secure.random.implementation = null
               ssl.trustmanager.algorithm = PKIX
               ssl.truststore.location = null
               ssl.truststore.password = null
               ssl.truststore.type = JKS
               value.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
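
For illustration only, here is a minimal sketch of a consumer built with the exactly-once-relevant values from the dump above (this is not our actual code; the class and method names are made up, and host names, group id and topic come from the sections above):
{code:java}
import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;

public class ExactlyOnceConsumerSketch {

    public static KafkaConsumer<byte[], byte[]> buildConsumer() {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "host1:9092,host2:9092,host3:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "groupId");
        // Offsets are committed only through the producer transaction, never automatically.
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
        // Only read records from committed transactions.
        props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
        // Process one record per poll, as in the test scenario.
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "1");
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());

        KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Collections.singletonList("input-event"));
        return consumer;
    }
}
{code}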
h2. Application producer properties

bootstrapServers = "host1, host2, host3"
 transactionIdPrefix = "my-producer-"${instance}
 "enable.idempotence" = "true"
 "acks" = "all"
 "retries" = "2147483647"
 "transaction.timeout.ms" = "10000"
 "max.in.flight.requests.per.connection" = "1"
 "reconnect.backoff.max.ms" = "1000"
 "reconnect.backoff.ms" = "1000"
 "retry.backoff.ms" = "1000"
h2. Application handling commits

Using {{KafkaTransactionManager}}, we start a transaction, write the message to the output topic using {{KafkaTemplate}}, and also send the consumer offsets to that transaction (spring-kafka 2.2.8.RELEASE).
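
The exact code is not included in this ticket, so the following is only a rough sketch of what that read-process-write cycle looks like with these classes (class and method names are illustrative; group id and topic names are the ones shown above):
{code:java}
import java.time.Duration;
import java.util.Collections;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.transaction.KafkaTransactionManager;
import org.springframework.transaction.support.TransactionTemplate;

public class CommitHandlingSketch {

    private final Consumer<byte[], byte[]> consumer;            // configured as in the consumer section
    private final KafkaTemplate<byte[], byte[]> kafkaTemplate;  // built from the transactional factory
    private final TransactionTemplate transactionTemplate;

    public CommitHandlingSketch(Consumer<byte[], byte[]> consumer,
                                KafkaTemplate<byte[], byte[]> kafkaTemplate,
                                KafkaTransactionManager<byte[], byte[]> txManager) {
        this.consumer = consumer;
        this.kafkaTemplate = kafkaTemplate;
        this.transactionTemplate = new TransactionTemplate(txManager);
    }

    public void pollAndProcess() {
        for (ConsumerRecord<byte[], byte[]> record : consumer.poll(Duration.ofSeconds(1))) {
            // KafkaTransactionManager begins the producer transaction for this callback.
            transactionTemplate.execute(status -> {
                // Write the result to the output topic inside the transaction.
                kafkaTemplate.send("output-event", record.value());
                // Send the consumed offset to the same transaction, so the output record
                // and the offset commit become visible atomically (or not at all).
                kafkaTemplate.sendOffsetsToTransaction(
                        Collections.singletonMap(
                                new TopicPartition(record.topic(), record.partition()),
                                new OffsetAndMetadata(record.offset() + 1)),
                        "groupId");
                return null;
            });
        }
    }
}
{code}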
h2. Test expected/actual
h3. Test description

-Write 32,000 messages to input topic

-Start 3 application instances

-Start processing the messages one by one (max.poll.records = 1)

-During processing, send *SIGKILL* (kill -9) to the *host1* and *host2* Kafka Brokers in parallel, 50 times.

-Wait 60 seconds

-During processing, send *SIGKILL* (kill -9) to the *host1* and *host3* Kafka Brokers in parallel, 50 times.

-Wait 60 seconds

-During processing, send *SIGKILL* (kill -9) to the *host2* and *host3* Kafka Brokers in parallel, 50 times.

 

The expectation was to end up with 32,000 messages in the output topic; however, sometimes we actually end up with at least one duplicate.

There are also runs where we get exactly 32,000 messages and everything is correct.

Every time the issue occurs, the sequence of events is the same.

We see {{Attempt to heartbeat failed since group is rebalancing}} right before the commit.

Application log file attached.

  was:
Hi. There are cases (very rare, but they do happen) where we receive duplicates, even though everything is configured for high durability and we use an exactly-once configuration.

 

Please check below the application context and test scenario that causes this issue.
h2. Kafka Cluster Setup

3 x Kafka Brokers (1 on *host1*, 2 on *host2* and 3 on *host3*)

3 x Zookeeper instances (1 on *host1*, 2 on *host2* and 3 on *host3*)
h3. Kafka configuration

broker.id=1,2,3
 num.network.threads=2
 num.io.threads=8
 socket.send.buffer.bytes=102400
 socket.receive.buffer.bytes=102400
 socket.request.max.bytes=104857600
 log.dirs=/home/kafka/logs/kafka
 min.insync.replicas=3
 transaction.state.log.min.isr=3
 default.replication.factor=3
 log.retention.minutes=600
 log.segment.bytes=1073741824
 log.retention.check.interval.ms=300000
 zookeeper.connect=host1:2181,host2:2181,host3:2181
 zookeeper.connection.timeout.ms=6000
 group.initial.rebalance.delay.ms=1000
 log.message.timestamp.type=LogAppendTime
 delete.topic.enable=true
 auto.create.topics.enable=false
 unclean.leader.election.enable=false
h3. ZooKeeper configuration

tickTime=2000
 dataDir=/home/kafka/logs/zk
 clientPort=2181
 maxClientCnxns=0
 initLimit=5
 syncLimit=2
 server.1=host1:2888:3888
 server.2=host2:2888:3888
 server.3=host3:2888:3888
 autopurge.snapRetainCount=3
 autopurge.purgeInterval=24
h2. Kafka internal topics description

Topic:__transaction_state       PartitionCount:50       ReplicationFactor:3     Configs:segment.bytes=104857600,unclean.leader.election.enable=false,compression.type=uncompressed,cleanup.policy=compact,min.insync.replicas=3
       Topic: __transaction_state     Partition: 0   Leader: 1       Replicas: 3,2,1 Isr: 1,2,3
 ​
 Topic:__consumer_offsets       PartitionCount:50       ReplicationFactor:3     Configs:segment.bytes=104857600,unclean.leader.election.enable=false,min.insync.replicas=3,cleanup.policy=compact,compression.type=producer
       Topic: __consumer_offsets       Partition: 0   Leader: 1       Replicas: 3,2,1 Isr: 1,2,3
h2. Application topics
h3. Topic input-event

Topic:input-event     PartitionCount:3       ReplicationFactor:3   Configs:retention.ms=28800001,unclean.leader.election.enable=false,min.insync.replicas=3,message.timestamp.difference.max.ms=28800000
       Topic: input-event     Partition: 0   Leader: 1       Replicas: 1,2,3 Isr: 1,2,3
       Topic: input-event     Partition: 1   Leader: 2       Replicas: 2,3,1 Isr: 1,2,3
       Topic: input-event     Partition: 2   Leader: 3       Replicas: 3,1,2 Isr: 1,2,3
h3. Topic output-event

Topic:output-event       PartitionCount:3       ReplicationFactor:3   Configs:retention.ms=28800001,unclean.leader.election.enable=false,min.insync.replicas=3,message.timestamp.difference.max.ms=28800000
       Topic: output-event       Partition: 0   Leader: 2       Replicas: 2,3,1 Isr: 1,2,3
       Topic: output-event       Partition: 1   Leader: 3       Replicas: 3,1,2 Isr: 1,2,3
       Topic: output-event       Partition: 2   Leader: 1       Replicas: 1,2,3 Isr: 1,2,3
h2. Application consumer properties

o.a.k.clients.consumer.ConsumerConfig : ConsumerConfig values: 
               auto.commit.interval.ms = 5000
               auto.offset.reset = earliest
               bootstrap.servers = [host1:9092, host2:9092, host3:9092]
               check.crcs = true
               client.id = 
               connections.max.idle.ms = 540000
               default.api.timeout.ms = 60000
               enable.auto.commit = false
               exclude.internal.topics = true
               fetch.max.bytes = 134217728
               fetch.max.wait.ms = 500
               fetch.min.bytes = 1
               group.id = groupId
               heartbeat.interval.ms = 3000
               interceptor.classes = []
               internal.leave.group.on.close = true
               isolation.level = read_committed
               key.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
               max.partition.fetch.bytes = 134217728
               max.poll.interval.ms = 300000
               max.poll.records = 1
               metadata.max.age.ms = 300000
               metric.reporters = []
               metrics.num.samples = 2
               metrics.recording.level = INFO
               metrics.sample.window.ms = 30000
               partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
               receive.buffer.bytes = 65536
               reconnect.backoff.max.ms = 1000
               reconnect.backoff.ms = 1000
               request.timeout.ms = 30000
               retry.backoff.ms = 1000
               sasl.client.callback.handler.class = null
               sasl.jaas.config = null
               sasl.kerberos.kinit.cmd = /usr/bin/kinit
               sasl.kerberos.min.time.before.relogin = 60000
               sasl.kerberos.service.name = null
               sasl.kerberos.ticket.renew.jitter = 0.05
               sasl.kerberos.ticket.renew.window.factor = 0.8
               sasl.login.callback.handler.class = null
               sasl.login.class = null
               sasl.login.refresh.buffer.seconds = 300
               sasl.login.refresh.min.period.seconds = 60
               sasl.login.refresh.window.factor = 0.8
               sasl.login.refresh.window.jitter = 0.05
               sasl.mechanism = GSSAPI
               security.protocol = PLAINTEXT
               send.buffer.bytes = 131072
               session.timeout.ms = 10000
               ssl.cipher.suites = null
               ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
               ssl.endpoint.identification.algorithm = https
               ssl.key.password = null
               ssl.keymanager.algorithm = SunX509
               ssl.keystore.location = null
               ssl.keystore.password = null
               ssl.keystore.type = JKS
               ssl.protocol = TLS
               ssl.provider = null
               ssl.secure.random.implementation = null
               ssl.trustmanager.algorithm = PKIX
               ssl.truststore.location = null
               ssl.truststore.password = null
               ssl.truststore.type = JKS
               value.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
h2. Application producer properties

bootstrapServers = "host1, host2, host3"
 transactionIdPrefix = "my-producer-"${instance}
 "enable.idempotence" = "true"
 "acks" = "all"
 "retries" = "2147483647"
 "transaction.timeout.ms" = "10000"
 "max.in.flight.requests.per.connection" = "1"
 "reconnect.backoff.max.ms" = "1000"
 "reconnect.backoff.ms" = "1000"
 "retry.backoff.ms" = "1000"
h2. Application handling commits

Using {{KafkaTransactionManager}}, we start a transaction, write the message to the output topic using {{KafkaTemplate}}, and also send the consumer offsets to that transaction (spring-kafka 2.2.8.RELEASE).
h2. Test expected/actual
h3. Test description

-Write 32,000 messages to input topic

-Start 3 application instances

-Start processing the messages one by one (max.poll.records = 1)

-During processing, send *SIGKILL* (kill -9) to the *host1* and *host2* Kafka Brokers in parallel, 50 times.

-Wait 60 seconds

-During processing, send *SIGKILL* (kill -9) to the *host1* and *host3* Kafka Brokers in parallel, 50 times.

-Wait 60 seconds

-During processing, send *SIGKILL* (kill -9) to the *host2* and *host3* Kafka Brokers in parallel, 50 times.

 

The expectation was to end up with 32,000 messages in the output topic; however, sometimes we actually end up with at least one duplicate.

There are also runs where we get exactly 32,000 messages and everything is correct.

Every time the issue occurs, the sequence of events is the same.

We see {{Attempt to heartbeat failed since group is rebalancing}} right before the commit.

Application log file attached.


> Receiving duplicates when application is configured for exactly once
> --------------------------------------------------------------------
>
>                 Key: KAFKA-9619
>                 URL: https://issues.apache.org/jira/browse/KAFKA-9619
>             Project: Kafka
>          Issue Type: Bug
>          Components: consumer, producer 
>    Affects Versions: 2.1.1, 2.4.0
>         Environment: Red Hat Enterprise Linux Server release 6.10 (Santiago)
>            Reporter: Cristian Manoliu
>            Priority: Major
>              Labels: Kafka
>         Attachments: log
>



--
This message was sent by Atlassian Jira
(v8.3.4#803005)