Posted to users@kafka.apache.org by Avanish Mishra <av...@yahoo.com.INVALID> on 2016/02/25 07:38:03 UTC

Not able to read committed offset on node failures with replication setup.

We are running a 10-node Kafka cluster in a test setup with a replication factor of 3 and topics with min.insync.replicas set to 2.
Recently I noticed that a few nodes halted on restart after multiple node failures with this FATAL message:

"Halting because log truncation is not allowed for topic 1613_spam, Current leader 2003's latest offset 20 is less than replica 2004's latest offset 21 (kafka.server.ReplicaFetcherThread)"
My understanding is that this can happen if there is a slow replica in the ISR which doesn't have the latest committed message and high watermark. As min.insync.replicas is 2, a write is committed once it completes on the leader and 1 follower. Since replica.lag.time.max.ms is 10000, a slow replica can stay in the ISR for up to 10 seconds without fetching any message. If the leader goes down within that interval and the slow follower is elected as leader, this results in a new leader whose latest offset is less than the follower's. Is this explanation correct, or am I missing something? What is the best way to recover committed messages in such a situation?
 
We are running the cluster with the following settings:
-  replication factor: 3
-  min.insync.replicas: 2
-  request.required.acks: -1
-  unclean.leader.election.enable: false
-  replica.lag.time.max.ms: 10000
-  replica.high.watermark.checkpoint.interval.ms: 1000
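
For reference, a minimal producer sketch (illustrative only; it assumes the standard org.apache.kafka.clients Java producer used from Scala, and the broker addresses are placeholders) of what the write path looks like with these settings: acks=all is the new-producer equivalent of request.required.acks=-1, so together with min.insync.replicas=2 an acknowledged send means the record is on the leader and at least one follower.

import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}

object AckedProducerSketch extends App {
  val props = new Properties()
  // Placeholder broker list; replace with the real bootstrap servers.
  props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "broker1:9092,broker2:9092")
  // acks=all is the new-producer equivalent of request.required.acks=-1.
  props.put(ProducerConfig.ACKS_CONFIG, "all")
  props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
    "org.apache.kafka.common.serialization.StringSerializer")
  props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
    "org.apache.kafka.common.serialization.StringSerializer")

  val producer = new KafkaProducer[String, String](props)
  // With acks=all and min.insync.replicas=2 on the topic, get() returns only after the
  // leader and at least one in-sync follower have the record; if fewer than 2 replicas
  // are in sync, the send fails (NotEnoughReplicasException) instead of being committed.
  val metadata = producer.send(new ProducerRecord[String, String]("1613_spam", "key", "value")).get()
  println(s"committed at offset ${metadata.offset()}")
  producer.close()
}
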


Thanks 
Avanish

Re: Not able to read committed offset on node failures with replication setup.

Posted by YouPeng Yang <yy...@gmail.com>.
Hi
  Here is the doc comment and code of ReplicaFetcherThread.handleOffsetOutOfRange, which may be the answer:

  /**
   * Handle a partition whose offset is out of range and return a new fetch offset.
   */
  def handleOffsetOutOfRange(topicAndPartition: TopicAndPartition): Long = {
    val replica = replicaMgr.getReplica(topicAndPartition.topic, topicAndPartition.partition).get

    /**
     * Unclean leader election: A follower goes down, in the meanwhile the leader keeps appending
     * messages. The follower comes back up and before it has completely caught up with the
     * leader's logs, all replicas in the ISR go down. The follower is now uncleanly elected as
     * the new leader, and it starts appending messages from the client. The old leader comes
     * back up, becomes a follower and it may discover that the current leader's end offset is
     * behind its own end offset.
     *
     * In such a case, truncate the current follower's log to the current leader's end offset
     * and continue fetching.
     *
     * There is a potential for a mismatch between the logs of the two replicas here. We don't
     * fix this mismatch as of now.
     */
    val leaderEndOffset = simpleConsumer.earliestOrLatestOffset(topicAndPartition,
      OffsetRequest.LatestTime, brokerConfig.brokerId)
    if (leaderEndOffset < replica.logEndOffset.messageOffset) {
      // Prior to truncating the follower's log, ensure that doing so is not disallowed by the
      // configuration for unclean leader election. This situation could only happen if the
      // unclean election configuration for a topic changes while a replica is down. Otherwise,
      // we should never encounter this situation since a non-ISR leader cannot be elected if
      // disallowed by the broker configuration.
      if (!LogConfig.fromProps(brokerConfig.toProps,
          AdminUtils.fetchTopicConfig(replicaMgr.zkClient, topicAndPartition.topic)).uncleanLeaderElectionEnable) {
        // Log a fatal error and shutdown the broker to ensure that data loss does not
        // unexpectedly occur.
        fatal("Halting because log truncation is not allowed for topic %s,".format(topicAndPartition.topic) +
          " Current leader %d's latest offset %d is less than replica %d's latest offset %d"
            .format(sourceBroker.id, leaderEndOffset, brokerConfig.brokerId, replica.logEndOffset.messageOffset))
        Runtime.getRuntime.halt(1)
      }

      replicaMgr.logManager.truncateTo(Map(topicAndPartition -> leaderEndOffset))
      warn("Replica %d for partition %s reset its fetch offset from %d to current leader %d's latest offset %d"
        .format(brokerConfig.brokerId, topicAndPartition, replica.logEndOffset.messageOffset,
          sourceBroker.id, leaderEndOffset))
      leaderEndOffset
    } else {
      /**
       * The follower could have been down for a long time and when it starts up, its end offset
       * could be smaller than the leader's start offset because the leader has deleted old logs
       * (log.logEndOffset < leaderStartOffset).
       *
       * Roll out a new log at the follower with the start offset equal to the current leader's
       * start offset and continue fetching.
       */
      val leaderStartOffset = simpleConsumer.earliestOrLatestOffset(topicAndPartition,
        OffsetRequest.EarliestTime, brokerConfig.brokerId)
      replicaMgr.logManager.truncateFullyAndStartAt(topicAndPartition, leaderStartOffset)
      warn("Replica %d for partition %s reset its fetch offset from %d to current leader %d's start offset %d"
        .format(brokerConfig.brokerId, topicAndPartition, replica.logEndOffset.messageOffset,
          sourceBroker.id, leaderStartOffset))
      leaderStartOffset
    }
  }
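
To tie this back to the FATAL line in the original mail, here is a toy standalone sketch (an illustration only, not the broker's actual types) of the decision that code makes, using the reported numbers: the current leader 2003 reports a latest offset of 20 while the restarting replica 2004 has 21, and unclean leader election is disabled.

object TruncationDecisionSketch extends App {
  // Numbers taken from the FATAL message above (hypothetical standalone illustration).
  val leaderEndOffset = 20L          // latest offset reported by the current leader (broker 2003)
  val replicaLogEndOffset = 21L      // this replica's own log end offset (broker 2004)
  val uncleanLeaderElectionEnable = false

  if (leaderEndOffset < replicaLogEndOffset) {
    if (!uncleanLeaderElectionEnable) {
      // unclean.leader.election.enable=false: refuse to discard the extra messages,
      // so the broker halts instead of truncating; this is the FATAL path that was hit.
      println(s"HALT: leader offset $leaderEndOffset < replica offset $replicaLogEndOffset")
    } else {
      // If unclean election were allowed, the follower would truncate its log to the
      // leader's end offset and resume fetching from there.
      println(s"truncate local log to $leaderEndOffset and continue fetching")
    }
  } else {
    println("replica is not ahead of the leader; normal fetching continues")
  }
}

In other words, with unclean.leader.election.enable=false the broker refuses to silently truncate its own log from offset 21 back to the leader's offset 20 and halts instead, so that data loss does not happen unexpectedly.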
