You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by cm...@apache.org on 2022/05/26 21:07:39 UTC
[kafka] branch trunk updated: MINOR: Fix some bugs with UNREGISTER_BROKER
This is an automated email from the ASF dual-hosted git repository.
cmccabe pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 7143267f71 MINOR: Fix some bugs with UNREGISTER_BROKER
7143267f71 is described below
commit 7143267f71ca0c14957d8560fbc42a5f8aac564d
Author: Colin Patrick McCabe <cm...@apache.org>
AuthorDate: Thu May 26 14:07:29 2022 -0700
MINOR: Fix some bugs with UNREGISTER_BROKER
Fix some bugs in the KRaft unregisterBroker API and add a junit test.
1. kafka-cluster.sh unregister should fail if no broker ID is passed.
2. UnregisterBrokerRequest must be marked as a KRaft broker API so
that KRaft brokers can receive it.
3. KafkaApis.scala must forward UNREGISTER_BROKER to the controller.
Reviewers: Jason Gustafson <ja...@confluent.io>, dengziming <de...@gmail.com>, David Jacot <dj...@confluent.io>
---
.../common/message/UnregisterBrokerRequest.json | 2 +-
core/src/main/scala/kafka/server/KafkaApis.scala | 1 +
core/src/main/scala/kafka/tools/ClusterTool.scala | 1 +
.../kafka/server/KRaftClusterTest.scala | 53 ++++++++++++++++++++++
4 files changed, 56 insertions(+), 1 deletion(-)
diff --git a/clients/src/main/resources/common/message/UnregisterBrokerRequest.json b/clients/src/main/resources/common/message/UnregisterBrokerRequest.json
index 4fb8d8df4e..05fd315bba 100644
--- a/clients/src/main/resources/common/message/UnregisterBrokerRequest.json
+++ b/clients/src/main/resources/common/message/UnregisterBrokerRequest.json
@@ -16,7 +16,7 @@
{
"apiKey": 64,
"type": "request",
- "listeners": ["controller"],
+ "listeners": ["broker", "controller"],
"name": "UnregisterBrokerRequest",
"validVersions": "0",
"flexibleVersions": "0+",
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index dd3fb2dfea..e2537f4e2f 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -229,6 +229,7 @@ class KafkaApis(val requestChannel: RequestChannel,
case ApiKeys.ENVELOPE => handleEnvelope(request, requestLocal)
case ApiKeys.DESCRIBE_CLUSTER => handleDescribeCluster(request)
case ApiKeys.DESCRIBE_PRODUCERS => handleDescribeProducersRequest(request)
+ case ApiKeys.UNREGISTER_BROKER => forwardToControllerOrFail(request)
case ApiKeys.DESCRIBE_TRANSACTIONS => handleDescribeTransactionsRequest(request)
case ApiKeys.LIST_TRANSACTIONS => handleListTransactionsRequest(request)
case ApiKeys.ALLOCATE_PRODUCER_IDS => handleAllocateProducerIdsRequest(request)
diff --git a/core/src/main/scala/kafka/tools/ClusterTool.scala b/core/src/main/scala/kafka/tools/ClusterTool.scala
index b868f72fc7..ed82eeba0e 100644
--- a/core/src/main/scala/kafka/tools/ClusterTool.scala
+++ b/core/src/main/scala/kafka/tools/ClusterTool.scala
@@ -52,6 +52,7 @@ object ClusterTool extends Logging {
unregisterParser.addArgument("--id", "-i").
`type`(classOf[Integer]).
action(store()).
+ required(true).
help("The ID of the broker to unregister.")
val namespace = parser.parseArgsOrFail(args)
diff --git a/core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala b/core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala
index 69688da4f4..509facf921 100644
--- a/core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala
+++ b/core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala
@@ -37,6 +37,7 @@ import org.apache.kafka.clients.admin.AlterConfigOp.OpType
import org.apache.kafka.common.config.ConfigResource
import org.apache.kafka.common.config.ConfigResource.Type
import org.apache.kafka.common.protocol.Errors._
+import org.apache.kafka.image.ClusterImage
import org.apache.kafka.server.common.MetadataVersion
import org.slf4j.LoggerFactory
@@ -725,4 +726,56 @@ class KRaftClusterTest {
cluster.close()
}
}
+
+  /** Returns the cluster portion of the metadata image currently cached by the given broker. */
+  private def clusterImage(cluster: KafkaClusterTestKit, brokerId: Int): ClusterImage = {
+    val broker = cluster.brokers().get(brokerId)
+    val image = broker.metadataCache.currentImage()
+    image.cluster()
+  }
+
+  /**
+   * True only when `image` contains a registration for `brokerId` and that registration
+   * is not fenced; a broker with no registration is reported as not unfenced.
+   */
+  private def brokerIsUnfenced(image: ClusterImage, brokerId: Int): Boolean = {
+    // Option(...) turns a null lookup result into None, for which exists yields false.
+    val registration = Option(image.brokers().get(brokerId))
+    registration.exists(reg => !reg.fenced())
+  }
+
+  /** True when `image` holds no registration at all for `brokerId`. */
+  private def brokerIsAbsent(image: ClusterImage, brokerId: Int): Boolean = {
+    val registration = Option(image.brokers().get(brokerId))
+    // Only the presence of an entry matters here; its fenced state is not consulted.
+    registration.isEmpty
+  }
+
+  /**
+   * Shuts down broker 0, waits until broker 1's metadata image no longer shows it unfenced,
+   * then unregisters it via the Admin client and waits for its registration to disappear.
+   */
+  @Test
+  def testUnregisterBroker(): Unit = {
+    val nodes = new TestKitNodes.Builder().setNumBrokerNodes(4).setNumControllerNodes(3).build()
+    val cluster = new KafkaClusterTestKit.Builder(nodes).build()
+    try {
+      cluster.format()
+      cluster.startup()
+      cluster.waitForReadyBrokers()
+      // Every check reads broker 1's metadata image, because broker 0 is the one being removed.
+      TestUtils.waitUntilTrue(() => brokerIsUnfenced(clusterImage(cluster, 1), 0),
+        "Timed out waiting for broker 0 to be unfenced.")
+      cluster.brokers().get(0).shutdown()
+      TestUtils.waitUntilTrue(() => !brokerIsUnfenced(clusterImage(cluster, 1), 0),
+        "Timed out waiting for broker 0 to be fenced.")
+      val admin = Admin.create(cluster.clientProperties())
+      // Block on the result so a rejected request fails here instead of as a later timeout.
+      try admin.unregisterBroker(0).all().get() finally admin.close()
+      TestUtils.waitUntilTrue(() => brokerIsAbsent(clusterImage(cluster, 1), 0),
+        "Timed out waiting for broker 0 to be unregistered.")
+    } finally {
+      cluster.close()
+    }
+  }
}