Posted to jira@kafka.apache.org by "Ismael Juma (JIRA)" <ji...@apache.org> on 2018/11/28 09:10:00 UTC

[jira] [Commented] (KAFKA-7679) With acks=all a single "stuck" non-leader replica can cause a timeout

    [ https://issues.apache.org/jira/browse/KAFKA-7679?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16701568#comment-16701568 ] 

Ismael Juma commented on KAFKA-7679:
------------------------------------

I think there's a misunderstanding of the current behavior. I believe you're looking for something like [https://cwiki.apache.org/confluence/display/KAFKA/KIP-250+Add+Support+for+Quorum-based+Producer+Acknowledgment]

 

There are some subtle challenges in implementing it correctly, though; see the mailing list discussion.
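
To make the distinction concrete: with acks=all the leader acknowledges a produce request only once *every* replica in the current ISR has the record; min.insync.replicas is only a lower bound on the ISR size, not a write quorum. Roughly (illustrative pseudocode, not Kafka's actual code):

{code:python}
# Illustration only -- not Kafka source. isr_leos = log end offsets of the ISR.
def can_ack_today(isr_leos, required_offset, min_isr):
    # Current semantics: the high watermark (the minimum LEO across the ISR)
    # must reach the required offset, so one stuck ISR member stalls all acks.
    return min(isr_leos) >= required_offset and len(isr_leos) >= min_isr

def can_ack_quorum(isr_leos, required_offset, min_isr):
    # The kind of semantics KIP-250 discusses: acknowledge as soon as
    # min.insync.replicas replicas have the record, whatever the rest do.
    return sum(leo >= required_offset for leo in isr_leos) >= min_isr
{code}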

> With acks=all a single "stuck" non-leader replica can cause a timeout
> ---------------------------------------------------------------------
>
>                 Key: KAFKA-7679
>                 URL: https://issues.apache.org/jira/browse/KAFKA-7679
>             Project: Kafka
>          Issue Type: Bug
>            Reporter: Corentin Chary
>            Priority: Major
>
> From the documentation:
> {code:java}
> acks=all
> This means the leader will wait for the full set of in-sync replicas to acknowledge the record. This guarantees that the record will not be lost as long as at least one in-sync replica remains alive. This is the strongest available guarantee. This is equivalent to the acks=-1 setting.{code}
> {code:java}
> min.insync.replicas
> When a producer sets acks to "all" (or "-1"), min.insync.replicas specifies the minimum number of replicas that must acknowledge a write for the write to be considered successful. If this minimum cannot be met, then the producer will raise an exception (either NotEnoughReplicas or NotEnoughReplicasAfterAppend).
> When used together, min.insync.replicas and acks allow you to enforce greater durability guarantees. A typical scenario would be to create a topic with a replication factor of 3, set min.insync.replicas to 2, and produce with acks of "all". This will ensure that the producer raises an exception if a majority of replicas do not receive a write.{code}
> Given a replication factor of 3 and min.insync.replicas set to 2, I would expect the client to get an acknowledgment as soon as the write reaches the leader and at least one replica. This is what happens on a 3-node cluster when one of the brokers is down, for example.
> However, it looks like this is not the case when a broker is "stuck" (which happens when you have network "blips").
> Here is how I reproduced this, but you can probably do the same with iptables on your own cluster:
> {code:java}
> # Start a cluster with 3 nodes
> $ docker-compose up -d
> $ docker-compose scale kafka=3
> Starting kafka-docker_kafka_1_dbf4109a3095 ... done
> Creating kafka-docker_kafka_2_973a373fa5b5 ... done
> Creating kafka-docker_kafka_3_3d8fab2ac44a ... done
> # Create topics with various settings
> $ docker-compose exec kafka bash
> $ kafka-topics.sh --create --topic tests-irs2 --config min.insync.replicas=2 --zookeeper=${KAFKA_ZOOKEEPER_CONNECT} --partitions=1 --replication-factor=3
> $ kafka-topics.sh --describe --zookeeper ${KAFKA_ZOOKEEPER_CONNECT}
> Topic:tests-irs2 PartitionCount:1 ReplicationFactor:3 Configs:min.insync.replicas=2
> Topic: tests-irs2 Partition: 0 Leader: 1003 Replicas: 1003,1002,1001 Isr: 1003,1002,1001{code}
>  
> Then start a small script that produces a message periodically:
>  
> {code:java}
> # Run the script first to get an idea of the normal latency
> $ KAFKA_BOOTSTRAP_SERVERS=localhost:32784 KAFKA_TOPIC=tests-irs2 KAFKA_ACKS=-1 ../test.py
> localhost:32784 tests-irs2 0.068457s
> localhost:32784 tests-irs2 0.016032s
> localhost:32784 tests-irs2 0.015884s
> localhost:32784 tests-irs2 0.018244s
> localhost:32784 tests-irs2 0.008625s{code}
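>  
> The test script is not included here; a minimal sketch of what it could look like, reconstructed from the traceback below (it uses kafka-python; everything beyond the names visible in the traceback is a guess):
> {code:python}
> import logging
> import os
> import time
> 
> from kafka import KafkaProducer
> 
> MESSAGE_INTERVAL_SECS = 2  # name from the traceback; the value is a guess
> BOOTSTRAP = os.environ['KAFKA_BOOTSTRAP_SERVERS']
> TOPIC = os.environ['KAFKA_TOPIC']
> 
> logging.basicConfig(level=logging.INFO)
> log = logging.getLogger('kafka-producer')
> producer = KafkaProducer(bootstrap_servers=BOOTSTRAP,
>                          acks=int(os.environ.get('KAFKA_ACKS', '-1')))
> 
> def send_message():
>     start = time.time()
>     producer.send(TOPIC, b'ping')
>     # Blocks until the broker acks; raises KafkaTimeoutError when it does not.
>     producer.flush(timeout=MESSAGE_INTERVAL_SECS)
>     print('%s %s %fs' % (BOOTSTRAP, TOPIC, time.time() - start))
> 
> while True:
>     try:
>         send_message()
>     except Exception:
>         log.exception('message send failed')
>     time.sleep(MESSAGE_INTERVAL_SECS)
> {code}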
>  
> Then run `docker pause` on the container hosting broker 1002; the producer starts timing out:
>  
> {code:java}
> 2018-11-27 14:07:47,608 - kafka-producer [ERROR] - KafkaTimeoutError: Timeout waiting for future
> Traceback (most recent call last):
>  File "../test.py", line 27, in send_message
>  producer.flush(timeout=MESSAGE_INTERVAL_SECS)
>  File "/Users/corentin.chary/dev/kafka-tests/kafka-docker/venv/lib/python2.7/site-packages/kafka/producer/kafka.py", line 577, in flush
>  self._accumulator.await_flush_completion(timeout=timeout)
>  File "/Users/corentin.chary/dev/kafka-tests/kafka-docker/venv/lib/python2.7/site-packages/kafka/producer/record_accumulator.py", line 530, in await_flush_completion
>  raise Errors.KafkaTimeoutError('Timeout waiting for future')
> KafkaTimeoutError: KafkaTimeoutError: Timeout waiting for future
> 2018-11-27 14:07:49,618 - kafka-producer [ERROR] - KafkaTimeoutError: Timeout waiting for future
> Traceback (most recent call last):
>  File "../test.py", line 27, in send_message
>  producer.flush(timeout=MESSAGE_INTERVAL_SECS)
>  File "/Users/corentin.chary/dev/kafka-tests/kafka-docker/venv/lib/python2.7/site-packages/kafka/producer/kafka.py", line 577, in flush
>  self._accumulator.await_flush_completion(timeout=timeout)
>  File "/Users/corentin.chary/dev/kafka-tests/kafka-docker/venv/lib/python2.7/site-packages/kafka/producer/record_accumulator.py", line 530, in await_flush_completion
>  raise Errors.KafkaTimeoutError('Timeout waiting for future')
> KafkaTimeoutError: KafkaTimeoutError: Timeout waiting for future
> 2018-11-27 14:07:51,628 - kafka-producer [ERROR] - KafkaTimeoutError: Timeout waiting for future
> Traceback (most recent call last):
>  File "../test.py", line 27, in send_message
>  producer.flush(timeout=MESSAGE_INTERVAL_SECS)
>  File "/Users/corentin.chary/dev/kafka-tests/kafka-docker/venv/lib/python2.7/site-packages/kafka/producer/kafka.py", line 577, in flush
>  self._accumulator.await_flush_completion(timeout=timeout)
>  File "/Users/corentin.chary/dev/kafka-tests/kafka-docker/venv/lib/python2.7/site-packages/kafka/producer/record_accumulator.py", line 530, in await_flush_completion
>  raise Errors.KafkaTimeoutError('Timeout waiting for future')
> KafkaTimeoutError: KafkaTimeoutError: Timeout waiting for future
> localhost:32784 tests-irs2 0.017413s
> {code}
> We keep getting timeouts until the controller finally notices that the broker is gone (once its ZooKeeper session expires) and logs:
> {code:java}
> [2018-11-27 13:15:18,020] INFO [Controller id=1001] Newly added brokers: , deleted brokers: 1002, all live brokers: 1001,1003 (kafka.controller.KafkaController)
> [2018-11-27 13:15:18,020] INFO [RequestSendThread controllerId=1001] Shutting down (kafka.controller.RequestSendThread)
> [2018-11-27 13:15:18,021] INFO [RequestSendThread controllerId=1001] Stopped (kafka.controller.RequestSendThread)
> [2018-11-27 13:15:18,021] INFO [RequestSendThread controllerId=1001] Shutdown completed (kafka.controller.RequestSendThread)
> [2018-11-27 13:15:18,024] INFO [Controller id=1001] Broker failure callback for 1002 (kafka.controller.KafkaController)
> [2018-11-27 13:15:18,030] DEBUG The stop replica request (delete = true) sent to broker 1002 is (kafka.controller.ControllerBrokerRequestBatch)
> [2018-11-27 13:15:18,030] DEBUG The stop replica request (delete = false) sent to broker 1002 is [Topic=tests-irs1,Partition=0,Replica=1002],[Topic=tests-irs2,Partition=0,Replica=1002] (kafka.controller.ControllerBrokerRequestBatch)
> [2018-11-27 13:15:18,030] WARN [Channel manager on controller 1001]: Not sending request (type=StopReplicaRequest, controllerId=1001, controllerEpoch=1, deletePartitions=false, partitions=) to broker 1002, since it is offline. (kafka.controller.ControllerChannelManager)
> [2018-11-27 13:15:18,030] WARN [Channel manager on controller 1001]: Not sending request (type=StopReplicaRequest, controllerId=1001, controllerEpoch=1, deletePartitions=false, partitions=tests-irs1-0) to broker 1002, since it is offline. (kafka.controller.ControllerChannelManager)
> [2018-11-27 13:15:18,030] WARN [Channel manager on controller 1001]: Not sending request (type=StopReplicaRequest, controllerId=1001, controllerEpoch=1, deletePartitions=false, partitions=tests-irs2-0) to broker 1002, since it is offline. (kafka.controller.ControllerChannelManager)
> [2018-11-27 13:15:18,031] DEBUG [Controller id=1001] Unregister BrokerModifications handler for Vector(1002) (kafka.controller.KafkaController)
> [2018-11-27 13:15:23,676] INFO [Controller id=1001] Newly added brokers: 1002, deleted brokers: , all live brokers: 1001,1002,1003 (kafka.controller.KafkaController)
> [2018-11-27 13:15:23,676] DEBUG [Channel manager on controller 1001]: Controller 1001 trying to connect to broker 1002 (kafka.controller.ControllerChannelManager)
> [2018-11-27 13:15:23,680] INFO [Controller id=1001] New broker startup callback for 1002 (kafka.controller.KafkaController)
> [2018-11-27 13:15:23,680] INFO [RequestSendThread controllerId=1001] Starting (kafka.controller.RequestSendThread)
> {code}
>
> Looking at the code, the decision to complete the produce request comes down to this check (in kafka.cluster.Partition):
> {code:java}
> val minIsr = leaderReplica.log.get.config.minInSyncReplicas
> if (leaderReplica.highWatermark.messageOffset >= requiredOffset) {
>   /*
>    * The topic may be configured not to accept messages if there are not enough replicas in ISR
>    * in this scenario the request was already appended locally and then added to the purgatory before the ISR was shrunk
>    */
>   if (minIsr <= curInSyncReplicas.size)
>     (true, Errors.NONE)
>   else
>     (true, Errors.NOT_ENOUGH_REPLICAS_AFTER_APPEND)
> } else
>   (false, Errors.NONE){code}
> So everything should be fine as long as `curInSyncReplicas` is still large enough; except that the gating condition is the high watermark, and here is how it is updated:
> {code:java}
> val allLogEndOffsets = assignedReplicas.filter { replica =>
>   curTime - replica.lastCaughtUpTimeMs <= replicaLagTimeMaxMs || inSyncReplicas.contains(replica)
> }.map(_.logEndOffset)
> val newHighWatermark = allLogEndOffsets.min(new LogOffsetMetadata.OffsetOrdering){code}
> This would mean that a single stuck replica can increase the latency of every producer by up to replicaLagTimeMaxMs, until the replica is finally considered dead and no longer taken into account.
> I guess what we really want here is to only require the offset to be reached on min.insync.replicas replicas (2 here), not on the whole ISR.
> This also means that, today, the producer's latency is the maximum of the individual replicas' latencies.
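> To illustrate, here is a rough Python rendition of the high-watermark rule above (the names and the 10s default for replica.lag.time.max.ms are mine, not taken from the Kafka source):
> {code:python}
> REPLICA_LAG_TIME_MAX_MS = 10000  # replica.lag.time.max.ms (broker default)
> 
> def new_high_watermark(assigned_replicas, in_sync_replicas, cur_time_ms):
>     # A replica holds the high watermark back if it is still in the ISR *or*
>     # was caught up within the last replicaLagTimeMaxMs.
>     candidates = [r for r in assigned_replicas
>                   if cur_time_ms - r['last_caught_up_ms'] <= REPLICA_LAG_TIME_MAX_MS
>                   or r in in_sync_replicas]
>     return min(r['leo'] for r in candidates)
> 
> now = 1000000
> leader   = {'leo': 120, 'last_caught_up_ms': now}
> follower = {'leo': 119, 'last_caught_up_ms': now}
> stuck    = {'leo': 100, 'last_caught_up_ms': now - 5000}  # paused 5s ago, still in ISR
> 
> replicas = [leader, follower, stuck]
> # -> 100: the HW is pinned at the stuck replica's log end offset, so acks=all
> # requests for offsets above 100 keep waiting, even though min.insync.replicas=2
> # is already satisfied by the other two replicas.
> print(new_high_watermark(replicas, replicas, now))
> {code}
>  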
> What do you think?
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)