You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by cm...@apache.org on 2023/11/27 21:39:35 UTC

(kafka) branch trunk updated: KAFKA-15819: KafkaServer.shutdown should free KafkaRaftManager (#14751)

This is an automated email from the ASF dual-hosted git repository.

cmccabe pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new fb16a2d6288 KAFKA-15819: KafkaServer.shutdown should free KafkaRaftManager (#14751)
fb16a2d6288 is described below

commit fb16a2d628827813b08def493b8cd7d32ccb14ca
Author: Greg Harris <gr...@aiven.io>
AuthorDate: Mon Nov 27 13:39:28 2023 -0800

    KAFKA-15819: KafkaServer.shutdown should free KafkaRaftManager (#14751)
    
    The other call sites for KafkaRaftManager (SharedServer, TestRaftServer, MetadataShell) appear to shut down
    the KafkaRaftManager when shutting down themselves. The call site in ZK-mode KafkaServer should behave
    the same way.
    
    Reviewers: Colin P. McCabe <cm...@apache.org>
---
 core/src/main/scala/kafka/server/KafkaServer.scala | 6 +++++-
 1 file changed, 5 insertions(+), 1 deletion(-)

diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala
index 66e2cf42713..b68801faaa8 100755
--- a/core/src/main/scala/kafka/server/KafkaServer.scala
+++ b/core/src/main/scala/kafka/server/KafkaServer.scala
@@ -197,6 +197,7 @@ class KafkaServer(
   def kafkaController: KafkaController = _kafkaController
 
   var lifecycleManager: BrokerLifecycleManager = _
+  private var raftManager: KafkaRaftManager[ApiMessageAndVersion] = _
 
   @volatile var brokerEpochManager: ZkBrokerEpochManager = _
 
@@ -415,7 +416,7 @@ class KafkaServer(
           // If the ZK broker is in migration mode, start up a RaftManager to learn about the new KRaft controller
           val controllerQuorumVotersFuture = CompletableFuture.completedFuture(
             RaftConfig.parseVoterConnections(config.quorumVoters))
-          val raftManager = new KafkaRaftManager[ApiMessageAndVersion](
+          raftManager = new KafkaRaftManager[ApiMessageAndVersion](
             metaPropsEnsemble.clusterId().get(),
             config,
             new MetadataRecordSerde,
@@ -1010,6 +1011,9 @@ class KafkaServer(
         // Clear all reconfigurable instances stored in DynamicBrokerConfig
         config.dynamicConfig.clear()
 
+        if (raftManager != null)
+          CoreUtils.swallow(raftManager.shutdown(), this)
+
         if (lifecycleManager != null) {
           lifecycleManager.close()
         }