You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by cm...@apache.org on 2021/02/19 19:16:59 UTC
[kafka] branch 2.8 updated: MINOR: Fix Raft broker restart issue
when offset partitions are deferred #10155
This is an automated email from the ASF dual-hosted git repository.
cmccabe pushed a commit to branch 2.8
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/2.8 by this push:
new e39bb7a MINOR: Fix Raft broker restart issue when offset partitions are deferred #10155
e39bb7a is described below
commit e39bb7a31cb65a59cd0b1263f382b072a15fa955
Author: Ron Dagostino <rd...@confluent.io>
AuthorDate: Thu Feb 18 14:57:29 2021 -0500
MINOR: Fix Raft broker restart issue when offset partitions are deferred #10155
A Raft-based broker is unable to restart if the broker defers partition
metadata changes for a __consumer_offsets topic-partition. The issue is
that GroupMetadataManager is asked to removeGroupsForPartition() upon
the broker becoming a follower, but in order for that code to function
it requires that the manager's scheduler be started. There are multiple
possible solutions here since removeGroupsForPartition() is a no-op at
this point in the broker startup cycle (nothing has been loaded, so
there is nothing to unload). We could just not invoke the callback. But
it seems more reasonable to not special-case this and instead start
ReplicaManager and the coordinators just before applying the deferred
partition states.
We also mark deferred partitions for which we are a follower as being
online a bit earlier to avoid the NotLeaderOrFollowerException that was
being thrown upon restart. Fixing this issue exposed the above issue
regarding the scheduler not being started.
Reviewers: Colin P. McCabe <cm...@apache.org>, Ismael Juma <is...@juma.me.uk>
---
core/src/main/scala/kafka/server/BrokerServer.scala | 6 ++++--
core/src/main/scala/kafka/server/RaftReplicaChangeDelegate.scala | 4 ++++
core/src/main/scala/kafka/server/RaftReplicaManager.scala | 2 ++
3 files changed, 10 insertions(+), 2 deletions(-)
diff --git a/core/src/main/scala/kafka/server/BrokerServer.scala b/core/src/main/scala/kafka/server/BrokerServer.scala
index 19d65ab..9aae5e3 100644
--- a/core/src/main/scala/kafka/server/BrokerServer.scala
+++ b/core/src/main/scala/kafka/server/BrokerServer.scala
@@ -349,14 +349,16 @@ class BrokerServer(
// Start log manager, which will perform (potentially lengthy) recovery-from-unclean-shutdown if required.
logManager.startup(metadataCache.getAllTopics())
// Start other services that we've delayed starting, in the appropriate order.
- replicaManager.endMetadataChangeDeferral(
- RequestHandlerHelper.onLeadershipChange(groupCoordinator, transactionCoordinator, _, _))
replicaManager.startup()
replicaManager.startHighWatermarkCheckPointThread()
groupCoordinator.startup(() => metadataCache.numPartitions(Topic.GROUP_METADATA_TOPIC_NAME).
getOrElse(config.offsetsTopicPartitions))
transactionCoordinator.startup(() => metadataCache.numPartitions(Topic.TRANSACTION_STATE_TOPIC_NAME).
getOrElse(config.transactionTopicPartitions))
+ // Apply deferred partition metadata changes after starting replica manager and coordinators
+ // so that those services are ready and able to process the changes.
+ replicaManager.endMetadataChangeDeferral(
+ RequestHandlerHelper.onLeadershipChange(groupCoordinator, transactionCoordinator, _, _))
socketServer.startProcessingRequests(authorizerFutures)
diff --git a/core/src/main/scala/kafka/server/RaftReplicaChangeDelegate.scala b/core/src/main/scala/kafka/server/RaftReplicaChangeDelegate.scala
index 0b7ee42..1bf21e0 100644
--- a/core/src/main/scala/kafka/server/RaftReplicaChangeDelegate.scala
+++ b/core/src/main/scala/kafka/server/RaftReplicaChangeDelegate.scala
@@ -35,6 +35,7 @@ trait RaftReplicaChangeDelegateHelper {
def getLogDir(topicPartition: TopicPartition): Option[String]
def error(msg: => String, e: => Throwable): Unit
def markOffline(topicPartition: TopicPartition): Unit
+ def markOnline(partition: Partition): Unit
def completeDelayedFetchOrProduceRequests(topicPartition: TopicPartition): Unit
def isShuttingDown: Boolean
def initialFetchOffset(log: Log): Long
@@ -216,6 +217,9 @@ class RaftReplicaChangeDelegate(helper: RaftReplicaChangeDelegateHelper) {
val leader = allBrokersByIdMap(partition.leaderReplicaIdOpt.get).brokerEndPoint(helper.config.interBrokerListenerName)
val log = partition.localLogOrException
val fetchOffset = helper.initialFetchOffset(log)
+ if (deferredBatches) {
+ helper.markOnline(partition)
+ }
partition.topicPartition -> InitialFetchState(leader, partition.getLeaderEpoch, fetchOffset)
}.toMap
diff --git a/core/src/main/scala/kafka/server/RaftReplicaManager.scala b/core/src/main/scala/kafka/server/RaftReplicaManager.scala
index 255b349..143709d 100644
--- a/core/src/main/scala/kafka/server/RaftReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/RaftReplicaManager.scala
@@ -104,6 +104,8 @@ class RaftReplicaManager(config: KafkaConfig,
override def markOffline(topicPartition: TopicPartition): Unit = raftReplicaManager.markPartitionOffline(topicPartition)
+ override def markOnline(partition: Partition): Unit = raftReplicaManager.allPartitions.put(partition.topicPartition, HostedPartition.Online(partition))
+
override def replicaAlterLogDirsManager: ReplicaAlterLogDirsManager = raftReplicaManager.replicaAlterLogDirsManager
override def replicaFetcherManager: ReplicaFetcherManager = raftReplicaManager.replicaFetcherManager