You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by cm...@apache.org on 2022/05/26 21:07:39 UTC

[kafka] branch trunk updated: MINOR: Fix some bugs with UNREGISTER_BROKER

This is an automated email from the ASF dual-hosted git repository.

cmccabe pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 7143267f71 MINOR: Fix some bugs with UNREGISTER_BROKER
7143267f71 is described below

commit 7143267f71ca0c14957d8560fbc42a5f8aac564d
Author: Colin Patrick McCabe <cm...@apache.org>
AuthorDate: Thu May 26 14:07:29 2022 -0700

    MINOR: Fix some bugs with UNREGISTER_BROKER
    
    Fix some bugs in the KRaft unregisterBroker API and add a junit test.
    
    1. kafka-cluster-tool.sh unregister should fail if no broker ID is passed.
    
    2. UnregisterBrokerRequest must be marked as a KRaft broker API so
    that KRaft brokers can receive it.
    
    3. KafkaApis.scala must forward UNREGISTER_BROKER to the controller.
    
    Reviewers: Jason Gustafson <ja...@confluent.io>, dengziming <de...@gmail.com>, David Jacot <dj...@confluent.io>
---
 .../common/message/UnregisterBrokerRequest.json    |  2 +-
 core/src/main/scala/kafka/server/KafkaApis.scala   |  1 +
 core/src/main/scala/kafka/tools/ClusterTool.scala  |  1 +
 .../kafka/server/KRaftClusterTest.scala            | 53 ++++++++++++++++++++++
 4 files changed, 56 insertions(+), 1 deletion(-)

diff --git a/clients/src/main/resources/common/message/UnregisterBrokerRequest.json b/clients/src/main/resources/common/message/UnregisterBrokerRequest.json
index 4fb8d8df4e..05fd315bba 100644
--- a/clients/src/main/resources/common/message/UnregisterBrokerRequest.json
+++ b/clients/src/main/resources/common/message/UnregisterBrokerRequest.json
@@ -16,7 +16,7 @@
 {
   "apiKey": 64,
   "type": "request",
-  "listeners": ["controller"],
+  "listeners": ["broker", "controller"],
   "name": "UnregisterBrokerRequest",
   "validVersions": "0",
   "flexibleVersions": "0+",
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index dd3fb2dfea..e2537f4e2f 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -229,6 +229,7 @@ class KafkaApis(val requestChannel: RequestChannel,
         case ApiKeys.ENVELOPE => handleEnvelope(request, requestLocal)
         case ApiKeys.DESCRIBE_CLUSTER => handleDescribeCluster(request)
         case ApiKeys.DESCRIBE_PRODUCERS => handleDescribeProducersRequest(request)
+        case ApiKeys.UNREGISTER_BROKER => forwardToControllerOrFail(request)
         case ApiKeys.DESCRIBE_TRANSACTIONS => handleDescribeTransactionsRequest(request)
         case ApiKeys.LIST_TRANSACTIONS => handleListTransactionsRequest(request)
         case ApiKeys.ALLOCATE_PRODUCER_IDS => handleAllocateProducerIdsRequest(request)
diff --git a/core/src/main/scala/kafka/tools/ClusterTool.scala b/core/src/main/scala/kafka/tools/ClusterTool.scala
index b868f72fc7..ed82eeba0e 100644
--- a/core/src/main/scala/kafka/tools/ClusterTool.scala
+++ b/core/src/main/scala/kafka/tools/ClusterTool.scala
@@ -52,6 +52,7 @@ object ClusterTool extends Logging {
       unregisterParser.addArgument("--id", "-i").
         `type`(classOf[Integer]).
         action(store()).
+        required(true).
         help("The ID of the broker to unregister.")
 
       val namespace = parser.parseArgsOrFail(args)
diff --git a/core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala b/core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala
index 69688da4f4..509facf921 100644
--- a/core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala
+++ b/core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala
@@ -37,6 +37,7 @@ import org.apache.kafka.clients.admin.AlterConfigOp.OpType
 import org.apache.kafka.common.config.ConfigResource
 import org.apache.kafka.common.config.ConfigResource.Type
 import org.apache.kafka.common.protocol.Errors._
+import org.apache.kafka.image.ClusterImage
 import org.apache.kafka.server.common.MetadataVersion
 import org.slf4j.LoggerFactory
 
@@ -725,4 +726,56 @@ class KRaftClusterTest {
       cluster.close()
     }
   }
+
+  private def clusterImage(
+    cluster: KafkaClusterTestKit,
+    brokerId: Int
+  ): ClusterImage = {
+    cluster.brokers().get(brokerId).metadataCache.currentImage().cluster()
+  }
+
+  private def brokerIsUnfenced(
+    image: ClusterImage,
+    brokerId: Int
+  ): Boolean = {
+    Option(image.brokers().get(brokerId)) match {
+      case None => false
+      case Some(registration) => !registration.fenced()
+    }
+  }
+
+  private def brokerIsAbsent(
+    image: ClusterImage,
+    brokerId: Int
+  ): Boolean = {
+    Option(image.brokers().get(brokerId)).isEmpty
+  }
+
+  @Test
+  def testUnregisterBroker(): Unit = {
+    val cluster = new KafkaClusterTestKit.Builder(
+      new TestKitNodes.Builder().
+        setNumBrokerNodes(4).
+        setNumControllerNodes(3).build()).build()
+    try {
+      cluster.format()
+      cluster.startup()
+      cluster.waitForReadyBrokers()
+      TestUtils.waitUntilTrue(() => brokerIsUnfenced(clusterImage(cluster, 1), 0),
+        "Timed out waiting for broker 0 to be unfenced.")
+      cluster.brokers().get(0).shutdown()
+      TestUtils.waitUntilTrue(() => !brokerIsUnfenced(clusterImage(cluster, 1), 0),
+        "Timed out waiting for broker 0 to be fenced.")
+      val admin = Admin.create(cluster.clientProperties())
+      try {
+        admin.unregisterBroker(0)
+      } finally {
+        admin.close()
+      }
+      TestUtils.waitUntilTrue(() => brokerIsAbsent(clusterImage(cluster, 1), 0),
+        "Timed out waiting for broker 0 to be fenced.")
+    } finally {
+      cluster.close()
+    }
+  }
 }