You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by cm...@apache.org on 2021/02/08 20:09:47 UTC

[kafka] branch 2.8 updated: MINOR: Add ClusterTool as specified in KIP-631 (#10047)

This is an automated email from the ASF dual-hosted git repository.

cmccabe pushed a commit to branch 2.8
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.8 by this push:
     new 68f5c9b  MINOR: Add ClusterTool as specified in KIP-631 (#10047)
68f5c9b is described below

commit 68f5c9b8a39a7b71c163c03bf5b6c796be6b5cf1
Author: Colin Patrick McCabe <cm...@confluent.io>
AuthorDate: Mon Feb 8 12:07:39 2021 -0800

    MINOR: Add ClusterTool as specified in KIP-631 (#10047)
    
    Add ClusterTool as specified in KIP-631. It can report the current cluster ID, and also send the new RPC for removing broker registrations.
    
    Reviewers: David Arthur <mu...@gmail.com>
---
 bin/kafka-cluster.sh                               |  17 +++
 build.gradle                                       |   1 +
 .../kafka/clients/admin/MockAdminClient.java       |  25 ++++-
 core/src/main/scala/kafka/tools/ClusterTool.scala  | 125 +++++++++++++++++++++
 core/src/main/scala/kafka/tools/TerseFailure.scala |  30 +++++
 .../scala/unit/kafka/tools/ClusterToolTest.scala   |  74 ++++++++++++
 6 files changed, 268 insertions(+), 4 deletions(-)

diff --git a/bin/kafka-cluster.sh b/bin/kafka-cluster.sh
new file mode 100755
index 0000000..574007e
--- /dev/null
+++ b/bin/kafka-cluster.sh
@@ -0,0 +1,17 @@
+#!/bin/bash
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+exec "$(dirname "$0")/kafka-run-class.sh" kafka.tools.ClusterTool "$@"
diff --git a/build.gradle b/build.gradle
index d9b185e..602645d 100644
--- a/build.gradle
+++ b/build.gradle
@@ -751,6 +751,7 @@ project(':core') {
     compile project(':clients')
     compile project(':metadata')
     compile project(':raft')
+    compile libs.argparse4j
     compile libs.jacksonDatabind
     compile libs.jacksonModuleScala
     compile libs.jacksonDataformatCsv
diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java b/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java
index a7dd619..c647e9f 100644
--- a/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java
+++ b/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java
@@ -38,6 +38,7 @@ import org.apache.kafka.common.errors.ReplicaNotAvailableException;
 import org.apache.kafka.common.errors.TimeoutException;
 import org.apache.kafka.common.errors.TopicExistsException;
 import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
+import org.apache.kafka.common.errors.UnsupportedVersionException;
 import org.apache.kafka.common.internals.KafkaFutureImpl;
 import org.apache.kafka.common.requests.DescribeLogDirsResponse;
 import org.apache.kafka.common.quota.ClientQuotaAlteration;
@@ -69,6 +70,7 @@ public class MockAdminClient extends AdminClient {
         new HashMap<>();
     private final Map<TopicPartition, Long> beginningOffsets;
     private final Map<TopicPartition, Long> endOffsets;
+    private final boolean usingRaftController;
     private final String clusterId;
     private final List<List<String>> brokerLogDirs;
     private final List<Map<String, String>> brokerConfigs;
@@ -90,6 +92,7 @@ public class MockAdminClient extends AdminClient {
         private Node controller = null;
         private List<List<String>> brokerLogDirs = new ArrayList<>();
         private Short defaultPartitions;
+        private boolean usingRaftController = false;
         private Integer defaultReplicationFactor;
 
         public Builder() {
@@ -135,6 +138,11 @@ public class MockAdminClient extends AdminClient {
             return this;
         }
 
+        public Builder usingRaftController(boolean usingRaftController) {
+            this.usingRaftController = usingRaftController;
+            return this;
+        }
+
         public Builder defaultPartitions(short numPartitions) {
             this.defaultPartitions = numPartitions;
             return this;
@@ -146,7 +154,8 @@ public class MockAdminClient extends AdminClient {
                 clusterId,
                 defaultPartitions != null ? defaultPartitions.shortValue() : 1,
                 defaultReplicationFactor != null ? defaultReplicationFactor.shortValue() : Math.min(brokers.size(), 3),
-                brokerLogDirs);
+                brokerLogDirs,
+                usingRaftController);
         }
     }
 
@@ -156,7 +165,7 @@ public class MockAdminClient extends AdminClient {
 
     public MockAdminClient(List<Node> brokers, Node controller) {
         this(brokers, controller, DEFAULT_CLUSTER_ID, 1, brokers.size(),
-            Collections.nCopies(brokers.size(), DEFAULT_LOG_DIRS));
+            Collections.nCopies(brokers.size(), DEFAULT_LOG_DIRS), false);
     }
 
     private MockAdminClient(List<Node> brokers,
@@ -164,7 +173,8 @@ public class MockAdminClient extends AdminClient {
                             String clusterId,
                             int defaultPartitions,
                             int defaultReplicationFactor,
-                            List<List<String>> brokerLogDirs) {
+                            List<List<String>> brokerLogDirs,
+                            boolean usingRaftController) {
         this.brokers = brokers;
         controller(controller);
         this.clusterId = clusterId;
@@ -177,6 +187,7 @@ public class MockAdminClient extends AdminClient {
         }
         this.beginningOffsets = new HashMap<>();
         this.endOffsets = new HashMap<>();
+        this.usingRaftController = usingRaftController;
     }
 
     synchronized public void controller(Node controller) {
@@ -889,7 +900,13 @@ public class MockAdminClient extends AdminClient {
 
     @Override
     public DecommissionBrokerResult decommissionBroker(int brokerId, DecommissionBrokerOptions options) {
-        throw new UnsupportedOperationException("Not implemented yet");
+        if (usingRaftController) {
+            return new DecommissionBrokerResult(KafkaFuture.completedFuture(null));
+        } else {
+            KafkaFutureImpl<Void> future = new KafkaFutureImpl<>();
+            future.completeExceptionally(new UnsupportedVersionException(""));
+            return new DecommissionBrokerResult(future);
+        }
     }
 
     @Override
diff --git a/core/src/main/scala/kafka/tools/ClusterTool.scala b/core/src/main/scala/kafka/tools/ClusterTool.scala
new file mode 100644
index 0000000..f0d3d90
--- /dev/null
+++ b/core/src/main/scala/kafka/tools/ClusterTool.scala
@@ -0,0 +1,161 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.tools
+
+import java.io.PrintStream
+import java.util.Properties
+import java.util.concurrent.ExecutionException
+
+import kafka.utils.{Exit, Logging}
+import net.sourceforge.argparse4j.ArgumentParsers
+import net.sourceforge.argparse4j.impl.Arguments.store
+import org.apache.kafka.clients.admin.Admin
+import org.apache.kafka.common.errors.UnsupportedVersionException
+import org.apache.kafka.common.utils.Utils
+
+/**
+ * Command-line tool (launched by bin/kafka-cluster.sh) that prints the cluster ID
+ * or asks the cluster to decommission a broker, using the Admin client.
+ */
+object ClusterTool extends Logging {
+  /**
+   * Parses the command line, builds the Admin client configuration and runs the
+   * selected sub-command.
+   *
+   * The process exits with status 0 once a sub-command returns, and with status 1
+   * (printing only the message, no stack trace) when a TerseFailure is thrown.
+   *
+   * @param args the raw command-line arguments
+   */
+  def main(args: Array[String]): Unit = {
+    try {
+      val parser = ArgumentParsers.
+        newArgumentParser("kafka-cluster").
+        defaultHelp(true).
+        description("The Kafka cluster tool.")
+      val subparsers = parser.addSubparsers().dest("command")
+
+      val clusterIdParser = subparsers.addParser("cluster-id").
+        help("Get information about the ID of a cluster.")
+      val decommissionParser = subparsers.addParser("decommission").
+        help("Decommission a broker.")
+      // Both sub-commands accept the same connection options.
+      List(clusterIdParser, decommissionParser).foreach(subparser => {
+        subparser.addArgument("--bootstrap-server", "-b").
+          action(store()).
+          help("A list of host/port pairs to use for establishing the connection to the kafka cluster.")
+        subparser.addArgument("--config", "-c").
+          action(store()).
+          help("A property file containing configs to be passed to AdminClient.")
+      })
+      // --id is mandatory: without it getInt("id") yields null, which would fail
+      // with a NullPointerException when unboxed for decommissionCommand.
+      decommissionParser.addArgument("--id", "-i").
+        `type`(classOf[Integer]).
+        action(store()).
+        required(true).
+        help("The ID of the broker to decommission.")
+
+      val namespace = parser.parseArgsOrFail(args)
+      val command = namespace.getString("command")
+      val configPath = namespace.getString("config")
+      val properties = if (configPath == null) {
+        new Properties()
+      } else {
+        Utils.loadProps(configPath)
+      }
+      // An explicit --bootstrap-server overrides any value from the config file.
+      Option(namespace.getString("bootstrap_server")).
+        foreach(b => properties.setProperty("bootstrap.servers", b))
+      if (properties.getProperty("bootstrap.servers") == null) {
+        throw new TerseFailure("Please specify --bootstrap-server.")
+      }
+
+      command match {
+        case "cluster-id" =>
+          val adminClient = Admin.create(properties)
+          try {
+            clusterIdCommand(System.out, adminClient)
+          } finally {
+            adminClient.close()
+          }
+          Exit.exit(0)
+        case "decommission" =>
+          val adminClient = Admin.create(properties)
+          try {
+            decommissionCommand(System.out, adminClient, namespace.getInt("id"))
+          } finally {
+            adminClient.close()
+          }
+          Exit.exit(0)
+        case _ =>
+          throw new RuntimeException(s"Unknown command $command")
+      }
+    } catch {
+      case e: TerseFailure =>
+        System.err.println(e.getMessage)
+        // Use the same exit call as the success paths instead of System.exit.
+        Exit.exit(1)
+    }
+  }
+
+  /**
+   * Prints the cluster ID reported by describeCluster(), or a notice when the
+   * cluster reports none (a null ID).
+   *
+   * @param stream      where the result is printed
+   * @param adminClient the Admin client used to query the cluster
+   */
+  def clusterIdCommand(stream: PrintStream,
+                       adminClient: Admin): Unit = {
+    val clusterId = Option(adminClient.describeCluster().clusterId().get())
+    clusterId match {
+      case None => stream.println("No cluster ID found. The Kafka version is probably too old.")
+      case Some(id) => stream.println(s"Cluster ID: ${id}")
+    }
+  }
+
+  /**
+   * Asks the cluster to decommission the given broker and reports the outcome on
+   * the stream.
+   *
+   * An UnsupportedVersionException from the cluster is reported as a plain message;
+   * any other failure is rethrown as the original ExecutionException.
+   *
+   * @param stream      where the result is printed
+   * @param adminClient the Admin client used to send the request
+   * @param id          the ID of the broker to decommission
+   */
+  def decommissionCommand(stream: PrintStream,
+                          adminClient: Admin,
+                          id: Int): Unit = {
+    try {
+      adminClient.decommissionBroker(id).all().get()
+      stream.println(s"Broker ${id} is no longer registered. Note that if the broker " +
+        "is still running, or is restarted, it will re-register.")
+    } catch {
+      case e: ExecutionException =>
+        e.getCause match {
+          case _: UnsupportedVersionException =>
+            stream.println("The target cluster does not support broker decommissioning.")
+          case _ =>
+            throw e
+        }
+    }
+  }
+}
diff --git a/core/src/main/scala/kafka/tools/TerseFailure.scala b/core/src/main/scala/kafka/tools/TerseFailure.scala
new file mode 100644
index 0000000..c37b613
--- /dev/null
+++ b/core/src/main/scala/kafka/tools/TerseFailure.scala
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.tools
+
+import org.apache.kafka.common.KafkaException
+
+/**
+ * A KafkaException thrown to make a command-line tool fail with a short message only;
+ * ClusterTool.main catches it, prints getMessage to stderr and exits with status 1.
+ *
+ * @param message     The text shown to the user before exiting; it is printed
+ *                    without a stack trace.
+ */
+class TerseFailure(message: String) extends KafkaException(message) {
+}
diff --git a/core/src/test/scala/unit/kafka/tools/ClusterToolTest.scala b/core/src/test/scala/unit/kafka/tools/ClusterToolTest.scala
new file mode 100644
index 0000000..0ce100c
--- /dev/null
+++ b/core/src/test/scala/unit/kafka/tools/ClusterToolTest.scala
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.tools
+
+import java.io.{ByteArrayOutputStream, PrintStream}
+import org.apache.kafka.clients.admin.MockAdminClient
+import org.junit.jupiter.api.Assertions._
+import org.junit.jupiter.api.{Test, Timeout}
+
+/**
+ * Unit tests for the ClusterTool commands, run against a MockAdminClient.
+ *
+ * Expected output is built with System.lineSeparator() because the commands print
+ * with println, whose line ending depends on the platform.
+ */
+@Timeout(value = 60)
+class ClusterToolTest {
+  // A configured cluster ID is printed verbatim.
+  @Test
+  def testPrintClusterId(): Unit = {
+    val adminClient = new MockAdminClient.Builder().
+      clusterId("QtNwvtfVQ3GEFpzOmDEE-w").
+      build()
+    val stream = new ByteArrayOutputStream()
+    ClusterTool.clusterIdCommand(new PrintStream(stream), adminClient)
+    assertEquals("Cluster ID: QtNwvtfVQ3GEFpzOmDEE-w" + System.lineSeparator(),
+      stream.toString())
+  }
+
+  // A null cluster ID yields the "probably too old" notice instead of an ID.
+  @Test
+  def testClusterTooOldToHaveId(): Unit = {
+    val adminClient = new MockAdminClient.Builder().
+      clusterId(null).
+      build()
+    val stream = new ByteArrayOutputStream()
+    ClusterTool.clusterIdCommand(new PrintStream(stream), adminClient)
+    assertEquals("No cluster ID found. The Kafka version is probably too old." + System.lineSeparator(),
+      stream.toString())
+  }
+
+  // With usingRaftController(true) the mock accepts the request, so success is reported.
+  @Test
+  def testDecommissionBroker(): Unit = {
+    val adminClient = new MockAdminClient.Builder().numBrokers(3).
+      usingRaftController(true).
+      build()
+    val stream = new ByteArrayOutputStream()
+    ClusterTool.decommissionCommand(new PrintStream(stream), adminClient, 0)
+    assertEquals("Broker 0 is no longer registered. Note that if the broker is still running, " +
+      "or is restarted, it will re-register." + System.lineSeparator(),
+      stream.toString())
+  }
+
+  // With usingRaftController(false) the mock fails with UnsupportedVersionException,
+  // which the tool reports as a plain message.
+  @Test
+  def testLegacyModeClusterCannotDecommissionBroker(): Unit = {
+    val adminClient = new MockAdminClient.Builder().numBrokers(3).
+      usingRaftController(false).
+      build()
+    val stream = new ByteArrayOutputStream()
+    ClusterTool.decommissionCommand(new PrintStream(stream), adminClient, 0)
+    assertEquals("The target cluster does not support broker decommissioning." + System.lineSeparator(),
+      stream.toString())
+  }
+}