You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by cm...@apache.org on 2021/02/19 19:07:27 UTC

[kafka] branch trunk updated: MINOR: Fix Raft broker restart issue when offset partitions are deferred #10155

This is an automated email from the ASF dual-hosted git repository.

cmccabe pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 76de614  MINOR: Fix Raft broker restart issue when offset partitions are deferred #10155
76de614 is described below

commit 76de61475bf61fdd746ba035cc8410b38462982d
Author: Ron Dagostino <rd...@confluent.io>
AuthorDate: Thu Feb 18 14:57:29 2021 -0500

    MINOR: Fix Raft broker restart issue when offset partitions are deferred #10155
    
    A Raft-based broker is unable to restart if the broker defers partition
    metadata changes for a __consumer_offsets topic-partition. The issue is
    that GroupMetadataManager is asked to removeGroupsForPartition() upon
    the broker becoming a follower, but in order for that code to function
    it requires that the manager's scheduler be started. There are multiple
    possible solutions here since removeGroupsForPartition() is a no-op at
    this point in the broker startup cycle (nothing has been loaded, so
    there is nothing to unload). We could just not invoke the callback. But
    it seems more reasonable to not special-case this and instead start
    ReplicaManager and the coordinators just before applying the deferred
    partition states.
    
    We also mark deferred partitions for which we are a follower as being
    online a bit earlier to avoid the NotLeaderOrFollowerException that was
    being thrown upon restart. Fixing that exception is what exposed the
    issue described above, in which the scheduler had not been started.
    
    Reviewers: Colin P. McCabe <cm...@apache.org>, Ismael Juma <is...@juma.me.uk>
---
 core/src/main/scala/kafka/server/BrokerServer.scala              | 6 ++++--
 core/src/main/scala/kafka/server/RaftReplicaChangeDelegate.scala | 4 ++++
 core/src/main/scala/kafka/server/RaftReplicaManager.scala        | 2 ++
 3 files changed, 10 insertions(+), 2 deletions(-)

diff --git a/core/src/main/scala/kafka/server/BrokerServer.scala b/core/src/main/scala/kafka/server/BrokerServer.scala
index 19d65ab..9aae5e3 100644
--- a/core/src/main/scala/kafka/server/BrokerServer.scala
+++ b/core/src/main/scala/kafka/server/BrokerServer.scala
@@ -349,14 +349,16 @@ class BrokerServer(
       // Start log manager, which will perform (potentially lengthy) recovery-from-unclean-shutdown if required.
       logManager.startup(metadataCache.getAllTopics())
       // Start other services that we've delayed starting, in the appropriate order.
-      replicaManager.endMetadataChangeDeferral(
-        RequestHandlerHelper.onLeadershipChange(groupCoordinator, transactionCoordinator, _, _))
       replicaManager.startup()
       replicaManager.startHighWatermarkCheckPointThread()
       groupCoordinator.startup(() => metadataCache.numPartitions(Topic.GROUP_METADATA_TOPIC_NAME).
         getOrElse(config.offsetsTopicPartitions))
       transactionCoordinator.startup(() => metadataCache.numPartitions(Topic.TRANSACTION_STATE_TOPIC_NAME).
         getOrElse(config.transactionTopicPartitions))
+      // Apply deferred partition metadata changes after starting replica manager and coordinators
+      // so that those services are ready and able to process the changes.
+      replicaManager.endMetadataChangeDeferral(
+        RequestHandlerHelper.onLeadershipChange(groupCoordinator, transactionCoordinator, _, _))
 
       socketServer.startProcessingRequests(authorizerFutures)
 
diff --git a/core/src/main/scala/kafka/server/RaftReplicaChangeDelegate.scala b/core/src/main/scala/kafka/server/RaftReplicaChangeDelegate.scala
index 0b7ee42..1bf21e0 100644
--- a/core/src/main/scala/kafka/server/RaftReplicaChangeDelegate.scala
+++ b/core/src/main/scala/kafka/server/RaftReplicaChangeDelegate.scala
@@ -35,6 +35,7 @@ trait RaftReplicaChangeDelegateHelper {
   def getLogDir(topicPartition: TopicPartition): Option[String]
   def error(msg: => String, e: => Throwable): Unit
   def markOffline(topicPartition: TopicPartition): Unit
+  def markOnline(partition: Partition): Unit
   def completeDelayedFetchOrProduceRequests(topicPartition: TopicPartition): Unit
   def isShuttingDown: Boolean
   def initialFetchOffset(log: Log): Long
@@ -216,6 +217,9 @@ class RaftReplicaChangeDelegate(helper: RaftReplicaChangeDelegateHelper) {
             val leader = allBrokersByIdMap(partition.leaderReplicaIdOpt.get).brokerEndPoint(helper.config.interBrokerListenerName)
             val log = partition.localLogOrException
             val fetchOffset = helper.initialFetchOffset(log)
+            if (deferredBatches) {
+              helper.markOnline(partition)
+            }
             partition.topicPartition -> InitialFetchState(leader, partition.getLeaderEpoch, fetchOffset)
           }.toMap
 
diff --git a/core/src/main/scala/kafka/server/RaftReplicaManager.scala b/core/src/main/scala/kafka/server/RaftReplicaManager.scala
index 255b349..143709d 100644
--- a/core/src/main/scala/kafka/server/RaftReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/RaftReplicaManager.scala
@@ -104,6 +104,8 @@ class RaftReplicaManager(config: KafkaConfig,
 
     override def markOffline(topicPartition: TopicPartition): Unit = raftReplicaManager.markPartitionOffline(topicPartition)
 
+    override def markOnline(partition: Partition): Unit = raftReplicaManager.allPartitions.put(partition.topicPartition, HostedPartition.Online(partition))
+
     override def replicaAlterLogDirsManager: ReplicaAlterLogDirsManager = raftReplicaManager.replicaAlterLogDirsManager
 
     override def replicaFetcherManager: ReplicaFetcherManager = raftReplicaManager.replicaFetcherManager