You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by cm...@apache.org on 2021/02/08 20:09:47 UTC
[kafka] branch 2.8 updated: MINOR: Add ClusterTool as specified in
KIP-631 (#10047)
This is an automated email from the ASF dual-hosted git repository.
cmccabe pushed a commit to branch 2.8
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/2.8 by this push:
new 68f5c9b MINOR: Add ClusterTool as specified in KIP-631 (#10047)
68f5c9b is described below
commit 68f5c9b8a39a7b71c163c03bf5b6c796be6b5cf1
Author: Colin Patrick McCabe <cm...@confluent.io>
AuthorDate: Mon Feb 8 12:07:39 2021 -0800
MINOR: Add ClusterTool as specified in KIP-631 (#10047)
Add ClusterTool as specified in KIP-631. It can report the current cluster ID, and also send the new RPC for removing broker registrations.
Reviewers: David Arthur <mu...@gmail.com>
---
bin/kafka-cluster.sh | 17 +++
build.gradle | 1 +
.../kafka/clients/admin/MockAdminClient.java | 25 ++++-
core/src/main/scala/kafka/tools/ClusterTool.scala | 125 +++++++++++++++++++++
core/src/main/scala/kafka/tools/TerseFailure.scala | 30 +++++
.../scala/unit/kafka/tools/ClusterToolTest.scala | 74 ++++++++++++
6 files changed, 268 insertions(+), 4 deletions(-)
diff --git a/bin/kafka-cluster.sh b/bin/kafka-cluster.sh
new file mode 100755
index 0000000..574007e
--- /dev/null
+++ b/bin/kafka-cluster.sh
@@ -0,0 +1,17 @@
+#!/bin/bash
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+exec "$(dirname "$0")/kafka-run-class.sh" kafka.tools.ClusterTool "$@"
diff --git a/build.gradle b/build.gradle
index d9b185e..602645d 100644
--- a/build.gradle
+++ b/build.gradle
@@ -751,6 +751,7 @@ project(':core') {
compile project(':clients')
compile project(':metadata')
compile project(':raft')
+ compile libs.argparse4j
compile libs.jacksonDatabind
compile libs.jacksonModuleScala
compile libs.jacksonDataformatCsv
diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java b/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java
index a7dd619..c647e9f 100644
--- a/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java
+++ b/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java
@@ -38,6 +38,7 @@ import org.apache.kafka.common.errors.ReplicaNotAvailableException;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.errors.TopicExistsException;
import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
+import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.apache.kafka.common.internals.KafkaFutureImpl;
import org.apache.kafka.common.requests.DescribeLogDirsResponse;
import org.apache.kafka.common.quota.ClientQuotaAlteration;
@@ -69,6 +70,7 @@ public class MockAdminClient extends AdminClient {
new HashMap<>();
private final Map<TopicPartition, Long> beginningOffsets;
private final Map<TopicPartition, Long> endOffsets;
+ private final boolean usingRaftController;
private final String clusterId;
private final List<List<String>> brokerLogDirs;
private final List<Map<String, String>> brokerConfigs;
@@ -90,6 +92,7 @@ public class MockAdminClient extends AdminClient {
private Node controller = null;
private List<List<String>> brokerLogDirs = new ArrayList<>();
private Short defaultPartitions;
+ private boolean usingRaftController = false;
private Integer defaultReplicationFactor;
public Builder() {
@@ -135,6 +138,11 @@ public class MockAdminClient extends AdminClient {
return this;
}
+ public Builder usingRaftController(boolean usingRaftController) {
+ this.usingRaftController = usingRaftController;
+ return this;
+ }
+
public Builder defaultPartitions(short numPartitions) {
this.defaultPartitions = numPartitions;
return this;
@@ -146,7 +154,8 @@ public class MockAdminClient extends AdminClient {
clusterId,
defaultPartitions != null ? defaultPartitions.shortValue() : 1,
defaultReplicationFactor != null ? defaultReplicationFactor.shortValue() : Math.min(brokers.size(), 3),
- brokerLogDirs);
+ brokerLogDirs,
+ usingRaftController);
}
}
@@ -156,7 +165,7 @@ public class MockAdminClient extends AdminClient {
public MockAdminClient(List<Node> brokers, Node controller) {
this(brokers, controller, DEFAULT_CLUSTER_ID, 1, brokers.size(),
- Collections.nCopies(brokers.size(), DEFAULT_LOG_DIRS));
+ Collections.nCopies(brokers.size(), DEFAULT_LOG_DIRS), false);
}
private MockAdminClient(List<Node> brokers,
@@ -164,7 +173,8 @@ public class MockAdminClient extends AdminClient {
String clusterId,
int defaultPartitions,
int defaultReplicationFactor,
- List<List<String>> brokerLogDirs) {
+ List<List<String>> brokerLogDirs,
+ boolean usingRaftController) {
this.brokers = brokers;
controller(controller);
this.clusterId = clusterId;
@@ -177,6 +187,7 @@ public class MockAdminClient extends AdminClient {
}
this.beginningOffsets = new HashMap<>();
this.endOffsets = new HashMap<>();
+ this.usingRaftController = usingRaftController;
}
synchronized public void controller(Node controller) {
@@ -889,7 +900,13 @@ public class MockAdminClient extends AdminClient {
@Override
public DecommissionBrokerResult decommissionBroker(int brokerId, DecommissionBrokerOptions options) {
- throw new UnsupportedOperationException("Not implemented yet");
+ if (usingRaftController) {
+ return new DecommissionBrokerResult(KafkaFuture.completedFuture(null));
+ } else {
+ KafkaFutureImpl<Void> future = new KafkaFutureImpl<>();
+ future.completeExceptionally(new UnsupportedVersionException(""));
+ return new DecommissionBrokerResult(future);
+ }
}
@Override
diff --git a/core/src/main/scala/kafka/tools/ClusterTool.scala b/core/src/main/scala/kafka/tools/ClusterTool.scala
new file mode 100644
index 0000000..f0d3d90
--- /dev/null
+++ b/core/src/main/scala/kafka/tools/ClusterTool.scala
@@ -0,0 +1,125 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.tools
+
+import java.io.PrintStream
+import java.util.Properties
+import java.util.concurrent.ExecutionException
+
+import kafka.utils.{Exit, Logging}
+import net.sourceforge.argparse4j.ArgumentParsers
+import net.sourceforge.argparse4j.impl.Arguments.store
+import org.apache.kafka.clients.admin.Admin
+import org.apache.kafka.common.errors.UnsupportedVersionException
+import org.apache.kafka.common.utils.Utils
+
+/** Command-line tool run by bin/kafka-cluster.sh: prints the cluster ID or decommissions a broker. */
+object ClusterTool extends Logging {
+  /** Parses the arguments, builds the admin client properties and runs the chosen sub-command. */
+  def main(args: Array[String]): Unit = {
+    try {
+      val parser = ArgumentParsers.
+        newArgumentParser("kafka-cluster").
+        defaultHelp(true).
+        description("The Kafka cluster tool.")
+      val subparsers = parser.addSubparsers().dest("command")
+
+      val clusterIdParser = subparsers.addParser("cluster-id").
+        help("Get information about the ID of a cluster.")
+      val decommissionParser = subparsers.addParser("decommission").
+        help("Decommission a broker.")
+      List(clusterIdParser, decommissionParser).foreach(parser => {
+        parser.addArgument("--bootstrap-server", "-b").
+          action(store()).
+          help("A list of host/port pairs to use for establishing the connection to the kafka cluster.")
+        parser.addArgument("--config", "-c").
+          action(store()).
+          help("A property file containing configs to be passed to AdminClient.")
+      })
+      // Required: a missing --id parses as null, which would fail with an NPE when unboxed to Int.
+      decommissionParser.addArgument("--id", "-i").
+        `type`(classOf[Integer]).
+        action(store()).
+        required(true).
+        help("The ID of the broker to decommission.")
+
+      val namespace = parser.parseArgsOrFail(args)
+      val command = namespace.getString("command")
+      val configPath = namespace.getString("config")
+      val properties = if (configPath == null) new Properties() else Utils.loadProps(configPath)
+      // A --bootstrap-server given on the command line overrides the value from the config file.
+      Option(namespace.getString("bootstrap_server")).
+        foreach(b => properties.setProperty("bootstrap.servers", b))
+      if (properties.getProperty("bootstrap.servers") == null) {
+        throw new TerseFailure("Please specify --bootstrap-server.")
+      }
+
+      command match {
+        case "cluster-id" =>
+          val adminClient = Admin.create(properties)
+          try {
+            clusterIdCommand(System.out, adminClient)
+          } finally {
+            adminClient.close()
+          }
+          Exit.exit(0)
+        case "decommission" =>
+          val adminClient = Admin.create(properties)
+          try {
+            decommissionCommand(System.out, adminClient, namespace.getInt("id"))
+          } finally {
+            adminClient.close()
+          }
+          Exit.exit(0)
+        case _ =>
+          throw new RuntimeException(s"Unknown command $command")
+      }
+    } catch {
+      // TerseFailure carries a user-facing message, so only that message is printed.
+      case e: TerseFailure =>
+        System.err.println(e.getMessage)
+        Exit.exit(1)
+    }
+  }
+
+  /** Prints the cluster ID reported by `adminClient` to `stream`, or a notice when there is none. */
+  def clusterIdCommand(stream: PrintStream,
+                       adminClient: Admin): Unit = {
+    Option(adminClient.describeCluster().clusterId().get()) match {
+      case None => stream.println("No cluster ID found. The Kafka version is probably too old.")
+      case Some(id) => stream.println(s"Cluster ID: ${id}")
+    }
+  }
+
+  /**
+   * Asks the cluster to remove the registration of broker `id` and prints the outcome to `stream`.
+   * A failure caused by UnsupportedVersionException is reported as a message; others propagate.
+   */
+  def decommissionCommand(stream: PrintStream,
+                          adminClient: Admin,
+                          id: Int): Unit = {
+    try {
+      adminClient.decommissionBroker(id).all().get()
+      stream.println(s"Broker ${id} is no longer registered. Note that if the broker " +
+        "is still running, or is restarted, it will re-register.")
+    } catch {
+      case e: ExecutionException if e.getCause.isInstanceOf[UnsupportedVersionException] =>
+        stream.println("The target cluster does not support broker decommissioning.")
+    }
+  }
+}
diff --git a/core/src/main/scala/kafka/tools/TerseFailure.scala b/core/src/main/scala/kafka/tools/TerseFailure.scala
new file mode 100644
index 0000000..c37b613
--- /dev/null
+++ b/core/src/main/scala/kafka/tools/TerseFailure.scala
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.tools
+
+import org.apache.kafka.common.KafkaException
+
+/**
+ * Signals that a command has failed in a way that should be reported to the user as
+ * a plain message. The intent is that the message is printed before exiting and that
+ * no stack trace is shown.
+ *
+ * @param message The text to print out before the command exits.
+ */
+class TerseFailure(message: String)
+  extends KafkaException(message)
diff --git a/core/src/test/scala/unit/kafka/tools/ClusterToolTest.scala b/core/src/test/scala/unit/kafka/tools/ClusterToolTest.scala
new file mode 100644
index 0000000..0ce100c
--- /dev/null
+++ b/core/src/test/scala/unit/kafka/tools/ClusterToolTest.scala
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.tools
+
+import java.io.{ByteArrayOutputStream, PrintStream}
+import org.apache.kafka.clients.admin.MockAdminClient
+import org.junit.jupiter.api.Assertions._
+import org.junit.jupiter.api.{Test, Timeout}
+
+/** Unit tests for the ClusterTool commands, run against MockAdminClient with captured output. */
+@Timeout(value = 60)
+class ClusterToolTest {
+  /** Runs `command` against a PrintStream backed by memory and returns everything it printed. */
+  private def capture(command: PrintStream => Unit): String = {
+    val buffer = new ByteArrayOutputStream()
+    command(new PrintStream(buffer))
+    buffer.toString()
+  }
+
+  /** Appends the platform line separator, which is what PrintStream.println writes. */
+  private def line(text: String): String = text + System.lineSeparator()
+
+  @Test
+  def testPrintClusterId(): Unit = {
+    val adminClient = new MockAdminClient.Builder().
+      clusterId("QtNwvtfVQ3GEFpzOmDEE-w").
+      build()
+    assertEquals(line("Cluster ID: QtNwvtfVQ3GEFpzOmDEE-w"),
+      capture(ClusterTool.clusterIdCommand(_, adminClient)))
+  }
+
+  @Test
+  def testClusterTooOldToHaveId(): Unit = {
+    val adminClient = new MockAdminClient.Builder().
+      clusterId(null).
+      build()
+    assertEquals(line("No cluster ID found. The Kafka version is probably too old."),
+      capture(ClusterTool.clusterIdCommand(_, adminClient)))
+  }
+
+  @Test
+  def testDecommissionBroker(): Unit = {
+    val adminClient = new MockAdminClient.Builder().numBrokers(3).
+      usingRaftController(true).
+      build()
+    assertEquals(line("Broker 0 is no longer registered. Note that if the broker " +
+      "is still running, or is restarted, it will re-register."),
+      capture(ClusterTool.decommissionCommand(_, adminClient, 0)))
+  }
+
+  @Test
+  def testLegacyModeClusterCannotDecommissionBroker(): Unit = {
+    val adminClient = new MockAdminClient.Builder().numBrokers(3).
+      usingRaftController(false).
+      build()
+    assertEquals(line("The target cluster does not support broker decommissioning."),
+      capture(ClusterTool.decommissionCommand(_, adminClient, 0)))
+  }
+}