Posted to dev@kafka.apache.org by "Maysam Yabandeh (JIRA)" <ji...@apache.org> on 2016/07/14 00:30:20 UTC

[jira] [Resolved] (KAFKA-3963) Missing messages from the controller to brokers

     [ https://issues.apache.org/jira/browse/KAFKA-3963?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Maysam Yabandeh resolved KAFKA-3963.
------------------------------------
    Resolution: Invalid

> Missing messages from the controller to brokers
> -----------------------------------------------
>
>                 Key: KAFKA-3963
>                 URL: https://issues.apache.org/jira/browse/KAFKA-3963
>             Project: Kafka
>          Issue Type: Bug
>          Components: core
>            Reporter: Maysam Yabandeh
>            Priority: Minor
>             Fix For: 0.10.1.0
>
>
> The controller takes messages from a queue and sends each one to the designated broker. If the controller times out waiting for a response from the broker (30s), it closes the connection and retries after a backoff period; however, it does not return the message to the queue. As a result, the retry starts with the next message, and the previous message might never have been received by the broker.
> {code}
>     val QueueItem(apiKey, apiVersion, request, callback) = queue.take()
> ...
>           try {
> ...
>               clientResponse = networkClient.blockingSendAndReceive(clientRequest)(time)
> ...
>             }
>           } catch {
>             case e: Throwable => // if the send was not successful, reconnect to broker and resend the message
>               warn(("Controller %d epoch %d fails to send request %s to broker %s. " +
>                 "Reconnecting to broker.").format(controllerId, controllerContext.epoch,
>                   request.toString, brokerNode.toString()), e)
>               networkClient.close(brokerNode.idString)
> ...
>           }
> {code}
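To make the reported concern concrete, here is a minimal, self-contained sketch of the two behaviours being contrasted. It is not Kafka code: `FlakyBroker`, `drainDroppingOnFailure` and `drainRetryingSameItem` are hypothetical names invented for illustration, and the elided parts of the excerpt above are not reconstructed here. Since this issue was resolved as Invalid, read the first function as a model of what the report describes, not as confirmed controller behaviour.

```scala
import java.util.concurrent.LinkedBlockingQueue
import scala.collection.mutable.ArrayBuffer

object SendLoopSketch {
  // Hypothetical stand-in for a broker connection: the first `failuresLeft`
  // sends throw, as if the response had timed out.
  final class FlakyBroker(private var failuresLeft: Int) {
    val received: ArrayBuffer[String] = ArrayBuffer.empty[String]
    def send(msg: String): Unit = {
      if (failuresLeft > 0) {
        failuresLeft -= 1
        throw new java.io.IOException("timed out waiting for response")
      }
      received += msg
    }
  }

  // Behaviour as described in the report: an item whose send fails is
  // neither retried nor put back, so the loop moves on to the next item.
  def drainDroppingOnFailure(queue: LinkedBlockingQueue[String], broker: FlakyBroker): Unit =
    while (!queue.isEmpty) {
      val msg = queue.take()
      try broker.send(msg)
      catch { case _: java.io.IOException => () } // a reconnect would go here; msg is lost
    }

  // Alternative: hold on to the taken item until a send succeeds, so the
  // order of delivered messages matches the order in the queue.
  def drainRetryingSameItem(queue: LinkedBlockingQueue[String], broker: FlakyBroker): Unit =
    while (!queue.isEmpty) {
      val msg = queue.take()
      var sent = false
      while (!sent) {
        try { broker.send(msg); sent = true }
        catch { case _: java.io.IOException => () } // a reconnect and backoff would go here
      }
    }
}
```

With a queue holding "UpdateMetadata" then "LeaderAndIsr" and a broker that fails once, the first function delivers only "LeaderAndIsr", while the second delivers both in order.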
> This could violate the ordering semantics that developers assumed when writing the controller-broker protocol. For example, the controller code sends metadata updates BEFORE sending LeaderAndIsrRequest when it communicates with a newly joined broker for the first time.
> {code}
>   def onBrokerStartup(newBrokers: Seq[Int]) {
>     info("New broker startup callback for %s".format(newBrokers.mkString(",")))
>     val newBrokersSet = newBrokers.toSet
>     // send update metadata request to all live and shutting down brokers. Old brokers will get to know of the new
>     // broker via this update.
>     // In cases of controlled shutdown leaders will not be elected when a new broker comes up. So at least in the
>     // common controlled shutdown case, the metadata will reach the new brokers faster
>     sendUpdateMetadataRequest(controllerContext.liveOrShuttingDownBrokerIds.toSeq)
>     // the very first thing to do when a new broker comes up is send it the entire list of partitions that it is
>     // supposed to host. Based on that the broker starts the high watermark threads for the input list of partitions
>     val allReplicasOnNewBrokers = controllerContext.replicasOnBrokers(newBrokersSet)
>     replicaStateMachine.handleStateChanges(allReplicasOnNewBrokers, OnlineReplica)
> {code}
> This is important because, without the metadata cached in the broker, the LeaderAndIsrRequests that ask the broker to become a follower would fail, since there is no metadata for the leader of the partition.
> {code}
>         metadataCache.getAliveBrokers.find(_.id == newLeaderBrokerId) match {
>           // Only change partition state when the leader is available
>           case Some(leaderBroker) =>
> ...
>           case None =>
>             // The leader broker should always be present in the metadata cache.
>             // If not, we should record the error message and abort the transition process for this partition
>             stateChangeLogger.error(("Broker %d received LeaderAndIsrRequest with correlation id %d from controller" +
>               " %d epoch %d for partition [%s,%d] but cannot become follower since the new leader %d is unavailable.")
> {code}
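The ordering dependency described above can be modelled with a small sketch. All names here (`BrokerSketch`, `UpdateMetadata`, `BecomeFollower`, `handle`) are hypothetical simplifications, not Kafka's request classes or broker API; the assumption is only that a broker refuses to become a follower when the new leader is absent from its cache of alive brokers.

```scala
object OrderingSketch {
  // Hypothetical, simplified controller-to-broker requests.
  sealed trait ControllerRequest
  final case class UpdateMetadata(aliveBrokerIds: Set[Int]) extends ControllerRequest
  final case class BecomeFollower(topic: String, partition: Int, leaderId: Int)
      extends ControllerRequest

  // Hypothetical broker that caches which brokers are alive.
  final class BrokerSketch {
    private var aliveBrokers: Set[Int] = Set.empty

    // Right(...) on success, Left(...) when the state transition is aborted.
    def handle(request: ControllerRequest): Either[String, String] = request match {
      case UpdateMetadata(ids) =>
        aliveBrokers = ids
        Right(s"cached ${ids.size} alive brokers")
      case BecomeFollower(topic, partition, leaderId) =>
        // Only change partition state when the leader is known to be alive.
        if (aliveBrokers.contains(leaderId))
          Right(s"following broker $leaderId for [$topic,$partition]")
        else
          Left(s"cannot become follower for [$topic,$partition]: leader $leaderId is unavailable")
    }
  }
}
```

If the `UpdateMetadata` request never arrives, a later `BecomeFollower` yields a `Left`; if it arrives first, the same request yields a `Right`. That is why a lost first message would matter more than a merely delayed one.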



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)