Posted to dev@kafka.apache.org by "Dmitry Bugaychenko (JIRA)" <ji...@apache.org> on 2015/03/24 20:35:53 UTC

[jira] [Comment Edited] (KAFKA-2029) Improving controlled shutdown for rolling updates

    [ https://issues.apache.org/jira/browse/KAFKA-2029?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14378433#comment-14378433 ] 

Dmitry Bugaychenko edited comment on KAFKA-2029 at 3/24/15 7:34 PM:
--------------------------------------------------------------------

We tried prioritizing controller messages, but it didn't help. Communication with a single broker is synchronous, but different brokers may handle requests at different speeds. As a result, with a large queue one broker can fall far behind another while the controller thinks everything is fine. By adding "tracked" we ensure that *all* brokers have completed the leadership movement, so no broker can fall behind the others by more than one partition.
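
To make the "tracked" idea concrete, here is a minimal sketch, not the actual patch. It assumes a hypothetical `send` function that delivers a request to one broker asynchronously and calls back when the broker has processed it; `TrackedMoves`, `moveTracked` and `moveAll` are illustrative names only. The controller waits for every broker's acknowledgement before moving the next partition:

```scala
import java.util.concurrent.{CountDownLatch, Executors, TimeUnit}

// Hypothetical stand-in for the controller-to-broker channel (not Kafka code).
object TrackedMoves {
  private val pool = Executors.newCachedThreadPool()

  // Deliver a request to one broker asynchronously; onDone runs once it is processed.
  def send(brokerId: Int, partition: String, onDone: () => Unit): Unit =
    pool.execute(new Runnable {
      def run(): Unit = {
        Thread.sleep(10L * brokerId) // simulate brokers answering at different speeds
        onDone()
      }
    })

  // Move one partition and block until every broker has acknowledged, or the timeout expires.
  def moveTracked(brokers: Seq[Int], partition: String, timeoutMs: Long): Boolean = {
    val acks = new CountDownLatch(brokers.size)
    brokers.foreach(b => send(b, partition, () => acks.countDown()))
    acks.await(timeoutMs, TimeUnit.MILLISECONDS)
  }

  // Move partitions one at a time; stop at the first one that is not acknowledged in time.
  // Returns the partitions that were fully acknowledged.
  def moveAll(brokers: Seq[Int], partitions: List[String], timeoutMs: Long): List[String] =
    partitions.takeWhile(p => moveTracked(brokers, p, timeoutMs))

  def shutdown(): Unit = pool.shutdown()
}
```

Because the next partition is not started until the latch reaches zero, the slowest broker bounds the pace and no broker can lag by more than the partition currently in flight.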

The fix for controller message prioritization was in RequestChannel.scala:
{code}
...
class RequestChannel(val numProcessors: Int, val queueSize: Int) extends KafkaMetricsGroup {
...
  private val requestQueue = new LinkedBlockingDeque[RequestChannel.Request](queueSize)
...
  /** Send a request to be handled, potentially blocking until there is room in the queue for the request */
  def sendRequest(request: RequestChannel.Request) {
    if (request.requestId == RequestKeys.LeaderAndIsrKey || request.requestId == RequestKeys.StopReplicaKey) {
      // ODKL Patch: prioritize controller requests over data requests.
      requestQueue.putFirst(request)
      info("Escalated controller request: " + request.requestObj.describe(details = true))
    } else {
      requestQueue.putLast(request)
    }
  }
...
  /** Get the next request or block until there is one */
  def receiveRequest(): RequestChannel.Request =
    requestQueue.takeFirst()
...
{code}

It increased GC overhead but hasn't improved the speed of partition movement. It looks like network request processing is not the bottleneck.
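
For anyone who wants to try the queueing behaviour in isolation, here is a small self-contained sketch of the same deque technique. `Req` and `PriorityDequeDemo` are hypothetical stand-ins, not Kafka classes; only `LinkedBlockingDeque` and its `putFirst`/`putLast`/`takeFirst`/`pollFirst` methods are real:

```scala
import java.util.concurrent.LinkedBlockingDeque

object PriorityDequeDemo {
  // Hypothetical request type standing in for RequestChannel.Request.
  final case class Req(name: String, fromController: Boolean)

  private val queue = new LinkedBlockingDeque[Req](16)

  // Controller requests jump to the head of the deque, data requests go to the tail.
  def send(r: Req): Unit =
    if (r.fromController) queue.putFirst(r) else queue.putLast(r)

  // Block until a request is available and take it from the head.
  def receive(): Req = queue.takeFirst()

  // Non-blocking helper for the demo: remove everything currently queued, in handling order.
  def drain(): List[String] =
    Iterator.continually(queue.pollFirst()).takeWhile(_ != null).map(_.name).toList
}
```

Sending produce-1, produce-2, leaderAndIsr-1 and stopReplica-1 in that order and then draining yields stopReplica-1, leaderAndIsr-1, produce-1, produce-2. Note that consecutive `putFirst` calls put the most recent escalated request in front, so escalated requests that are queued at the same time are handled newest first.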


> Improving controlled shutdown for rolling updates
> -------------------------------------------------
>
>                 Key: KAFKA-2029
>                 URL: https://issues.apache.org/jira/browse/KAFKA-2029
>             Project: Kafka
>          Issue Type: Improvement
>          Components: controller
>    Affects Versions: 0.8.1.1
>            Reporter: Dmitry Bugaychenko
>            Assignee: Neha Narkhede
>            Priority: Critical
>
> Controlled shutdown as currently implemented can cause numerous problems: deadlocks, local and global data loss, partitions without a leader, etc. In some cases the only way to restore the cluster is to stop it completely using kill -9 and start it again.
> Note 1: I'm aware of KAFKA-1305, but the proposed workaround to increase queue size makes things much worse (see discussion there).
> Note 2: The problems described here can occur in any setup, but they are extremely painful in setups with large brokers (36 disks, 1000+ assigned partitions per broker in our case).
> Note 3: These improvements are actually workarounds, and it is worth considering a global refactoring of the controller (make it single-threaded, or even get rid of it in favour of ZK leader elections for partitions).
> The problems and improvements are:
> # Controlled shutdown takes a long time (10+ minutes): the broker sends multiple shutdown requests, finally considers the shutdown failed and proceeds to an unclean shutdown, while the controller gets stuck for a while (holding a lock and waiting for free space in the controller-to-broker queue). After the broker starts back up it receives a follower request and erases its high watermarks (with a message that "replica does not exist", because the controller hasn't yet sent a request with the replica assignment); then, when the controller starts replicas on the broker, it deletes all local data (due to the missing high watermarks). Furthermore, the controller starts processing the pending shutdown request and stops replicas on the broker, leaving it in a non-functional state. A solution might be to increase the time the broker waits for the controller's response to a shutdown request, but this timeout is taken from controller.socket.timeout.ms, which is global for all broker-controller communication, and setting it to 30 minutes is dangerous. *Proposed solution: introduce a dedicated config parameter for this timeout with a high default*.
> # If a broker goes down during controlled shutdown and does not come back, the controller gets stuck in a deadlock (one thread owns the lock and tries to add a message to the dead broker's queue, the send thread is in an infinite loop retrying the message to the dead broker, and the broker failure handler is waiting for the lock). There are numerous partitions without a leader and the only way out is to kill -9 the controller. *Proposed solution: add a timeout for adding a message to the broker's queue*. ControllerChannelManager.sendRequest:
> {code}
>   def sendRequest(brokerId : Int, request : RequestOrResponse, callback: (RequestOrResponse) => Unit = null) {
>     brokerLock synchronized {
>       val stateInfoOpt = brokerStateInfo.get(brokerId)
>       stateInfoOpt match {
>         case Some(stateInfo) =>
>           // ODKL Patch: prevent infinite hang on trying to send message to a dead broker.
>           // TODO: Move timeout to config
>           if (!stateInfo.messageQueue.offer((request, callback), 10, TimeUnit.SECONDS)) {
>             error("Timed out trying to send message to broker " + brokerId.toString)
>             // Do not throw, as it brings controller into completely non-functional state
>             // "Controller to broker state change requests batch is not empty while creating a new one"
>             //throw new IllegalStateException("Timed out trying to send message to broker " + brokerId.toString)
>           }
>         case None =>
>           warn("Not sending request %s to broker %d, since it is offline.".format(request, brokerId))
>       }
>     }
>   }
> {code}
> # When a broker that is the controller starts shutting down while auto leader rebalance is running, it eventually deadlocks (the shutdown thread owns the lock and waits for the rebalance thread to exit, while the rebalance thread waits for the lock). *Proposed solution: use a bounded wait in the rebalance thread*. KafkaController.scala:
> {code}
>   // ODKL Patch to prevent deadlocks in shutdown.
>   /**
>    * Execute the given function inside the lock
>    */
>   def inLockIfRunning[T](lock: ReentrantLock)(fun: => T): T = {
>     if (isRunning || lock.isHeldByCurrentThread) {
>       // TODO: Configure timeout.
>       if (!lock.tryLock(10, TimeUnit.SECONDS)) {
>         throw new IllegalStateException("Failed to acquire controller lock in 10 seconds.");
>       }
>       try {
>         return fun
>       } finally {
>         lock.unlock()
>       }
>     } else {
>       throw new IllegalStateException("Controller is not running, not allowed to lock.")
>     }
>   }
>   private def checkAndTriggerPartitionRebalance(): Unit = {
>     // Use inLockIfRunning here instead of inLock
>   }
> {code}
> # Both OfflinePartitionLeaderSelector and ControlledShutdownLeaderSelector prefer the oldest replica in the ISR (the one that joined the ISR first). In a rolling update this means moving partitions to the tail, which increases the overall number of movements and ultimately overloads the last broker significantly (with 4 brokers and RF 3 the last one gets 3/4 of the leadership). In case of multiple failures this logic can cause a significant imbalance in leadership. *Proposed solution: move leadership to the preferred replica if possible, or otherwise to the youngest replica (in controlled shutdown) or the second preferred replica (in offline partition)*:
> {code}
> class OfflinePartitionLeaderSelector(controllerContext: ControllerContext) extends PartitionLeaderSelector with Logging {
> ...
>             // ODKL Patch: Trying to select replica from ISR not depending on ISR join order, but following the
>             // assignment order. Preferred replica is the first one, thus if possible it'll be chosen, but most
>             // probably it is the dead one, thus we fallback to second preferred replica. Here we do not care about
>             // overloading second preferred replica as we do not expect rolling crashes.
>             val newLeader = liveBrokersInIsr.sortBy(x => assignedReplicas.indexOf(x)).head
> ...
> }
> class ControlledShutdownLeaderSelector(controllerContext: ControllerContext)
>         extends PartitionLeaderSelector
>         with Logging {
> ...
>     // ODKL Patch: Trying to select replica from ISR not depending on ISR join order. If preferred replica is in ISR, choose
>     // it, otherwise choose the last replica from ISR - it is expected to be the youngest (most probably already survived rolling
>     // update)
>     val newLeaderOpt = if (newIsr.contains(assignedReplicas.head)) assignedReplicas.headOption else newIsr.lastOption
> ...
> }
> {code}
> # Auto leader rebalance started simultaneously with controlled shutdown competes with it for space in the queue and can slow down the process. If the queue size is large it can also cause significant data loss (for a few minutes there might be multiple brokers each considering itself the leader and accepting produce requests). *Proposed solution: add throttling to the auto rebalance*:
> {code}
> private def checkAndTriggerPartitionRebalance(): Unit = {
> ...
>           if (imbalanceRatio > (config.leaderImbalancePerBrokerPercentage.toDouble / 100)) {
>             info("Balancing broker " + leaderBroker + " with imbalance rate " + imbalanceRatio)
>             topicsNotInPreferredReplica.foreach {
>               case (topicPartition, replicas) => {
>                 inLockIfRunning(controllerContext.controllerLock) {
>                   // do this check only if the broker is live and there are no partitions being reassigned currently
>                   // and preferred replica election is not in progress
>                   if (controllerContext.liveBrokerIds.contains(leaderBroker) &&
>                     controllerContext.partitionsBeingReassigned.size == 0 &&
>                     controllerContext.partitionsUndergoingPreferredReplicaElection.size == 0 &&
>                     !deleteTopicManager.isTopicQueuedUpForDeletion(topicPartition.topic) &&
>                     !deleteTopicManager.isTopicDeletionInProgress(topicPartition.topic) &&
>                     controllerContext.allTopics.contains(topicPartition.topic)) {
>                     onPreferredReplicaElection(Set(topicPartition), true)
>                   }
>                 }
>                 // ODKL patch: prevent too fast preferred replica elections.
>                 // TODO: Make configurable/use true throttling
>                 Utils.swallow(Thread.sleep(2000))
>               }
>             }
>             info("Balancing broker " + leaderBroker + " done")
>           }
> ...
> }
> {code}
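
The last item in the description leaves a TODO about replacing the fixed Thread.sleep(2000) with true throttling. One possible shape for that, as a rough sketch only: `MinIntervalThrottle` is a hypothetical helper, not part of Kafka, and the injectable clock and sleep functions are assumptions made so that it can be tested without waiting. It enforces a minimum interval between the end of one action and the start of the next, and sleeps only for the remaining time:

```scala
// Hypothetical helper, not part of Kafka: enforces a minimum interval between actions.
final class MinIntervalThrottle(
    minIntervalMs: Long,
    clock: () => Long = () => System.nanoTime() / 1000000L, // monotonic milliseconds
    sleep: Long => Unit = (ms: Long) => Thread.sleep(ms)) {

  // Completion time of the previous action, if any.
  private var lastRunMs: Option[Long] = None

  // Run the action, first waiting out whatever remains of the minimum interval.
  def throttled[T](action: => T): T = synchronized {
    lastRunMs.foreach { last =>
      val waitMs = minIntervalMs - (clock() - last)
      if (waitMs > 0) sleep(waitMs)
    }
    try action
    finally lastRunMs = Some(clock())
  }
}
```

In the rebalance loop this would wrap the whole `inLockIfRunning(...) { ... }` call, so that, as with the sleep in the patch, the waiting happens outside the controller lock. The interval would come from a config parameter rather than a constant.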


