Posted to dev@kafka.apache.org by "Joel Koshy (JIRA)" <ji...@apache.org> on 2014/04/10 03:04:35 UTC

[jira] [Created] (KAFKA-1382) Update zkVersion on partition state update failures

Joel Koshy created KAFKA-1382:
---------------------------------

             Summary: Update zkVersion on partition state update failures
                 Key: KAFKA-1382
                 URL: https://issues.apache.org/jira/browse/KAFKA-1382
             Project: Kafka
          Issue Type: Bug
            Reporter: Joel Koshy


Our updateIsr code is currently:

  private def updateIsr(newIsr: Set[Replica]) {
    debug("Updated ISR for partition [%s,%d] to %s".format(topic, partitionId, newIsr.mkString(",")))
    val newLeaderAndIsr = new LeaderAndIsr(localBrokerId, leaderEpoch, newIsr.map(r => r.brokerId).toList, zkVersion)
    // use the epoch of the controller that made the leadership decision, instead of the current controller epoch
    val (updateSucceeded, newVersion) = ZkUtils.conditionalUpdatePersistentPath(zkClient,
      ZkUtils.getTopicPartitionLeaderAndIsrPath(topic, partitionId),
      ZkUtils.leaderAndIsrZkData(newLeaderAndIsr, controllerEpoch), zkVersion)
    if (updateSucceeded){
      inSyncReplicas = newIsr
      zkVersion = newVersion
      trace("ISR updated to [%s] and zkVersion updated to [%d]".format(newIsr.mkString(","), zkVersion))
    } else {
      info("Cached zkVersion [%d] not equal to that in zookeeper, skip updating ISR".format(zkVersion))
    }
  }

We encountered an interesting scenario recently when a large producer fully
saturated the broker's NIC for over an hour. The large volume of data led to
a number of ISR shrinks (and subsequent expands). The NIC saturation
affected the zookeeper client heartbeats and led to a session timeout. The
timeline was roughly as follows:

- Attempt to expand ISR
- Expansion written to zookeeper (confirmed in zookeeper transaction logs)
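- Session timeout after around 13 seconds. The configured timeout is 20
  seconds, and the zookeeper client times out a connection after two-thirds
  of that without hearing from the server, so that lines up.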
- zkclient reconnects to zookeeper (with the same session ID) and retries
  the write - but uses the old zkVersion. This fails because the zkVersion
  has already been updated (above).
- The ISR expand keeps failing after that and the only way to get out of it
  is to bounce the broker.

In the above code, if the zkVersion is different we should probably update
the cached version and even retry the expansion until it succeeds.
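
A minimal sketch of that idea follows. It is not the actual Partition or
ZkUtils code: VersionedStore is a hypothetical in-memory stand-in for the
leader-and-ISR znode, and updateWithRefresh and maxRetries are names made up
for illustration. On a version mismatch it re-reads the stored value; if the
stored data already equals what we tried to write (the case in the timeline
above, where the first write landed before the reconnect), it just adopts the
stored version, otherwise it retries with the refreshed version.

```scala
// Hypothetical in-memory stand-in for a versioned znode; not the ZkUtils API.
final class VersionedStore(initial: String) {
  private var data: String = initial
  private var version: Int = 0

  // Returns the stored data together with its current version.
  def read(): (String, Int) = synchronized { (data, version) }

  // Writes only if expectedVersion matches the stored version.
  // Returns (true, new version) on success and (false, -1) on a mismatch.
  def conditionalUpdate(newData: String, expectedVersion: Int): (Boolean, Int) = synchronized {
    if (expectedVersion == version) {
      data = newData
      version += 1
      (true, version)
    } else {
      (false, -1)
    }
  }
}

object IsrUpdateSketch {
  // Attempts the conditional write with the cached version. On a mismatch it
  // re-reads the store, and either accepts the stored version (when the data
  // already matches) or retries with the refreshed version.
  // Returns Some(version to cache) on success, None once retries run out.
  @annotation.tailrec
  def updateWithRefresh(store: VersionedStore, newData: String,
                        cachedVersion: Int, maxRetries: Int): Option[Int] = {
    val (succeeded, newVersion) = store.conditionalUpdate(newData, cachedVersion)
    if (succeeded) Some(newVersion)
    else {
      val (currentData, currentVersion) = store.read()
      if (currentData == newData) Some(currentVersion) // an earlier attempt already landed
      else if (maxRetries <= 0) None
      else updateWithRefresh(store, newData, currentVersion, maxRetries - 1)
    }
  }
}
```

One caveat with this sketch: retrying with a refreshed version overwrites
whatever another writer (for example the controller) stored in the meantime,
so a real fix would probably need to check whether the leader or epoch has
changed before retrying.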




