Posted to dev@kafka.apache.org by "Jun Rao (JIRA)" <ji...@apache.org> on 2016/06/02 22:50:59 UTC

[jira] [Commented] (KAFKA-3693) Race condition between highwatermark-checkpoint thread and handleLeaderAndIsrRequest at broker start-up

    [ https://issues.apache.org/jira/browse/KAFKA-3693?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15313234#comment-15313234 ] 

Jun Rao commented on KAFKA-3693:
--------------------------------

[~maysamyabandeh], the logic in KafkaController.shutdownBroker() is actually expected. This logic is triggered when a broker is being shut down in a controlled way. In this case, the broker first sends a request to the controller to indicate that it intends to shut down. On receiving this request, the controller calls KafkaController.shutdownBroker() and tries to move the leaders off the broker being shut down, one partition at a time. There is no need to handle all partitions at the same time in this case, since this is not the first LeaderAndIsrRequest that the controller sends to the broker.

Do you think you can reproduce the issue in some simpler way?

> Race condition between highwatermark-checkpoint thread and handleLeaderAndIsrRequest at broker start-up
> -------------------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-3693
>                 URL: https://issues.apache.org/jira/browse/KAFKA-3693
>             Project: Kafka
>          Issue Type: Bug
>          Components: core
>    Affects Versions: 0.9.0.1
>            Reporter: Maysam Yabandeh
>
> Upon broker start-up, a race between the highwatermark-checkpoint thread writing the replication-offset-checkpoint file and the handleLeaderAndIsrRequest thread reading from it causes the highwatermark for some partitions to be reset to 0. In the good case, this causes the replica to truncate its entire log to 0 and hence to start fetching terabytes of data from the lead broker, which sometimes leads to hours of downtime. In the bad cases we observed, the reset offset can propagate to the recovery-point-offset-checkpoint file, causing a lead broker to truncate the file. This seems to have the potential to lead to data loss if the truncation happens at both follower and leader brokers.
> This is the particular faulty scenario manifested in our tests:
> # The broker restarts and receives LeaderAndIsr from the controller
> # The LeaderAndIsr message, however, does not contain all the partitions (probably because other brokers were churning at the same time)
> # becomeLeaderOrFollower calls getOrCreatePartition and updates the allPartitions with the partitions included in the LeaderAndIsr message {code}
>   def getOrCreatePartition(topic: String, partitionId: Int): Partition = {
>     var partition = allPartitions.get((topic, partitionId))
>     if (partition == null) {
>       allPartitions.putIfNotExists((topic, partitionId), new Partition(topic, partitionId, time, this))
> {code}
> # The highwatermark-checkpoint thread jumps in, taking a snapshot of the (partial) allReplicas' high watermarks into the replication-offset-checkpoint file {code}  def checkpointHighWatermarks() {
>     val replicas = allPartitions.values.map(_.getReplica(config.brokerId)).collect{case Some(replica) => replica}{code} hence overwriting the previous highwatermarks.
> # Later, becomeLeaderOrFollower calls makeLeaders and makeFollowers, which read the (now partial) file through Partition::getOrCreateReplica {code}
>           val checkpoint = replicaManager.highWatermarkCheckpoints(log.dir.getParentFile.getAbsolutePath)
>           val offsetMap = checkpoint.read
>           if (!offsetMap.contains(TopicAndPartition(topic, partitionId)))
>             info("No checkpointed highwatermark is found for partition [%s,%d]".format(topic, partitionId))
> {code}
> We are not entirely sure whether the initial LeaderAndIsr message including only a subset of the partitions is critical in making this race condition manifest. But it is an important detail, since it shows that a solution based on not letting the highwatermark-checkpoint thread jump in while a LeaderAndIsr message is being processed would not suffice.
> The solution we are thinking of is to force the initialization of allPartitions with the partitions listed in the replication-offset-checkpoint file (and perhaps the recovery-point-offset-checkpoint file too) when a server starts.
> Thoughts?
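
For readers following the quoted scenario, here is a minimal, single-threaded sketch of the interleaving and of the proposed pre-seeding idea. It is not Kafka code: CheckpointFile, Broker, register and seedFromCheckpoint are hypothetical names invented for illustration, the "file" is an in-memory map, and creating a partition and reading its checkpointed high watermark are collapsed into one step. It only assumes, as the description states, that the checkpoint writer replaces the whole file with whatever is currently in memory.

```scala
import scala.collection.mutable

// Hypothetical stand-ins for illustration only; none of these names are Kafka APIs.
object CheckpointRaceSketch {
  type TopicPartition = (String, Int)

  // Stand-in for the replication-offset-checkpoint file.
  // Assumption: every write replaces the entire contents.
  final class CheckpointFile(initial: Map[TopicPartition, Long]) {
    private var contents: Map[TopicPartition, Long] = initial
    def write(offsets: Map[TopicPartition, Long]): Unit = contents = offsets
    def read(): Map[TopicPartition, Long] = contents
  }

  // Stand-in for the broker's in-memory partitions and their high watermarks.
  final class Broker(checkpoint: CheckpointFile) {
    val highWatermarks = mutable.Map.empty[TopicPartition, Long]

    // A newly seen partition takes its high watermark from the file;
    // a missing entry is treated as 0, as in the reported behaviour.
    def register(tp: TopicPartition): Unit =
      if (!highWatermarks.contains(tp))
        highWatermarks(tp) = checkpoint.read().getOrElse(tp, 0L)

    // Sketch of the proposed mitigation: load every partition listed in
    // the file before any checkpoint can be written.
    def seedFromCheckpoint(): Unit =
      checkpoint.read().keys.foreach(register)

    // Periodic checkpoint: snapshots only what is in memory right now.
    def checkpointHighWatermarks(): Unit =
      checkpoint.write(highWatermarks.toMap)
  }

  // Replays the interleaving from the description in a fixed order.
  def run(seedFirst: Boolean): Map[TopicPartition, Long] = {
    val file = new CheckpointFile(Map(("t", 0) -> 100L, ("t", 1) -> 200L))
    val broker = new Broker(file)
    if (seedFirst) broker.seedFromCheckpoint()
    broker.register(("t", 0))         // first LeaderAndIsr covers only t-0
    broker.checkpointHighWatermarks() // checkpoint fires on the partial view
    broker.register(("t", 1))         // t-1 arrives later and reads the file
    broker.highWatermarks.toMap
  }

  def main(args: Array[String]): Unit = {
    println(run(seedFirst = false)) // t-1 loses its checkpointed offset
    println(run(seedFirst = true))  // both offsets survive
  }
}
```

In this toy model the partition that arrives after the partial checkpoint comes back with a high watermark of 0 unless the in-memory map was seeded from the file first. Whether seeding alone is sufficient in the real broker is exactly the open question in this thread.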



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)