You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by cm...@apache.org on 2021/02/19 19:07:27 UTC
[kafka] branch trunk updated: MINOR: Fix Raft broker restart issue
when offset partitions are deferred #10155
This is an automated email from the ASF dual-hosted git repository.
cmccabe pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 76de614 MINOR: Fix Raft broker restart issue when offset partitions are deferred #10155
76de614 is described below
commit 76de61475bf61fdd746ba035cc8410b38462982d
Author: Ron Dagostino <rd...@confluent.io>
AuthorDate: Thu Feb 18 14:57:29 2021 -0500
MINOR: Fix Raft broker restart issue when offset partitions are deferred #10155
A Raft-based broker is unable to restart if the broker defers partition
metadata changes for a __consumer_offsets topic-partition. The issue is
that GroupMetadataManager is asked to removeGroupsForPartition() upon
the broker becoming a follower, but in order for that code to function
it requires that the manager's scheduler be started. There are multiple
possible solutions here since removeGroupsForPartition() is a no-op at
this point in the broker startup cycle (nothing has been loaded, so
there is nothing to unload). We could just not invoke the callback. But
it seems more reasonable to not special-case this and instead start
ReplicaManager and the coordinators just before applying the deferred
partition states.
We also mark deferred partitions for which we are a follower as being
online a bit earlier, to avoid a NotLeaderOrFollowerException that was
being thrown upon restart. Fixing this issue exposed the above issue
regarding the scheduler not being started.
Reviewers: Colin P. McCabe <cm...@apache.org>, Ismael Juma <is...@juma.me.uk>
---
core/src/main/scala/kafka/server/BrokerServer.scala | 6 ++++--
core/src/main/scala/kafka/server/RaftReplicaChangeDelegate.scala | 4 ++++
core/src/main/scala/kafka/server/RaftReplicaManager.scala | 2 ++
3 files changed, 10 insertions(+), 2 deletions(-)
diff --git a/core/src/main/scala/kafka/server/BrokerServer.scala b/core/src/main/scala/kafka/server/BrokerServer.scala
index 19d65ab..9aae5e3 100644
--- a/core/src/main/scala/kafka/server/BrokerServer.scala
+++ b/core/src/main/scala/kafka/server/BrokerServer.scala
@@ -349,14 +349,16 @@ class BrokerServer(
// Start log manager, which will perform (potentially lengthy) recovery-from-unclean-shutdown if required.
logManager.startup(metadataCache.getAllTopics())
// Start other services that we've delayed starting, in the appropriate order.
- replicaManager.endMetadataChangeDeferral(
- RequestHandlerHelper.onLeadershipChange(groupCoordinator, transactionCoordinator, _, _))
replicaManager.startup()
replicaManager.startHighWatermarkCheckPointThread()
groupCoordinator.startup(() => metadataCache.numPartitions(Topic.GROUP_METADATA_TOPIC_NAME).
getOrElse(config.offsetsTopicPartitions))
transactionCoordinator.startup(() => metadataCache.numPartitions(Topic.TRANSACTION_STATE_TOPIC_NAME).
getOrElse(config.transactionTopicPartitions))
+ // Apply deferred partition metadata changes after starting replica manager and coordinators
+ // so that those services are ready and able to process the changes.
+ replicaManager.endMetadataChangeDeferral(
+ RequestHandlerHelper.onLeadershipChange(groupCoordinator, transactionCoordinator, _, _))
socketServer.startProcessingRequests(authorizerFutures)
diff --git a/core/src/main/scala/kafka/server/RaftReplicaChangeDelegate.scala b/core/src/main/scala/kafka/server/RaftReplicaChangeDelegate.scala
index 0b7ee42..1bf21e0 100644
--- a/core/src/main/scala/kafka/server/RaftReplicaChangeDelegate.scala
+++ b/core/src/main/scala/kafka/server/RaftReplicaChangeDelegate.scala
@@ -35,6 +35,7 @@ trait RaftReplicaChangeDelegateHelper {
def getLogDir(topicPartition: TopicPartition): Option[String]
def error(msg: => String, e: => Throwable): Unit
def markOffline(topicPartition: TopicPartition): Unit
+ def markOnline(partition: Partition): Unit
def completeDelayedFetchOrProduceRequests(topicPartition: TopicPartition): Unit
def isShuttingDown: Boolean
def initialFetchOffset(log: Log): Long
@@ -216,6 +217,9 @@ class RaftReplicaChangeDelegate(helper: RaftReplicaChangeDelegateHelper) {
val leader = allBrokersByIdMap(partition.leaderReplicaIdOpt.get).brokerEndPoint(helper.config.interBrokerListenerName)
val log = partition.localLogOrException
val fetchOffset = helper.initialFetchOffset(log)
+ if (deferredBatches) {
+ helper.markOnline(partition)
+ }
partition.topicPartition -> InitialFetchState(leader, partition.getLeaderEpoch, fetchOffset)
}.toMap
diff --git a/core/src/main/scala/kafka/server/RaftReplicaManager.scala b/core/src/main/scala/kafka/server/RaftReplicaManager.scala
index 255b349..143709d 100644
--- a/core/src/main/scala/kafka/server/RaftReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/RaftReplicaManager.scala
@@ -104,6 +104,8 @@ class RaftReplicaManager(config: KafkaConfig,
override def markOffline(topicPartition: TopicPartition): Unit = raftReplicaManager.markPartitionOffline(topicPartition)
+ override def markOnline(partition: Partition): Unit = raftReplicaManager.allPartitions.put(partition.topicPartition, HostedPartition.Online(partition))
+
override def replicaAlterLogDirsManager: ReplicaAlterLogDirsManager = raftReplicaManager.replicaAlterLogDirsManager
override def replicaFetcherManager: ReplicaFetcherManager = raftReplicaManager.replicaFetcherManager