Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2021/03/11 17:38:48 UTC

[GitHub] [kafka] wenbingshen opened a new pull request #10304: KAFKA-12454:Add ERROR logging on kafka-log-dirs when given brokerIds do not exist in current kafka cluster

wenbingshen opened a new pull request #10304:
URL: https://github.com/apache/kafka/pull/10304


   When a non-existent brokerId value is given, the kafka-log-dirs tool fails with a timeout error:
   
   Exception in thread "main" java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.TimeoutException: Timed out waiting for a node assignment. Call: describeLogDirs
   at org.apache.kafka.common.internals.KafkaFutureImpl.wrapAndThrow(KafkaFutureImpl.java:45)
   at org.apache.kafka.common.internals.KafkaFutureImpl.access$000(KafkaFutureImpl.java:32)
   at org.apache.kafka.common.internals.KafkaFutureImpl$SingleWaiter.await(KafkaFutureImpl.java:89)
   at org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:260)
   at kafka.admin.LogDirsCommand$.describe(LogDirsCommand.scala:50)
   at kafka.admin.LogDirsCommand$.main(LogDirsCommand.scala:36)
   at kafka.admin.LogDirsCommand.main(LogDirsCommand.scala)
   Caused by: org.apache.kafka.common.errors.TimeoutException: Timed out waiting for a node assignment. Call: describeLogDirs
   
    
   
   When the brokerId entered by the user does not exist, an error message indicating that the node is not present should be printed.
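
   For context, a minimal sketch of the intended check (not the exact patch; it assumes the surrounding LogDirsCommand.describe scope, i.e. `adminClient`, the `out` PrintStream, the user-supplied `brokerListStr`, and the usual Scala/Java collection converters import):

   ```scala
   // Validate the user-supplied broker ids against the ids the cluster reports,
   // instead of letting describeLogDirs time out on an unknown broker.
   val clusterBrokers = adminClient.describeCluster().nodes().get().asScala.map(_.id()).toSet
   val inputBrokers = brokerListStr.split(',').filter(_.nonEmpty).map(_.toInt).toSet
   val nonExistingBrokers = inputBrokers.diff(clusterBrokers)
   if (nonExistingBrokers.nonEmpty) {
       out.println(s"ERROR: The given broker(s) do not exist: ${nonExistingBrokers.mkString(",")}")
   } else {
       out.println("Querying brokers for log directories information")
       val logDirInfosByBroker = adminClient.describeLogDirs(inputBrokers.map(Integer.valueOf).toSeq.asJava)
           .allDescriptions().get().asScala
   }
   ```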
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] wenbingshen commented on a change in pull request #10304: KAFKA-12454:Add ERROR logging on kafka-log-dirs when given brokerIds do not exist in current kafka cluster

Posted by GitBox <gi...@apache.org>.
wenbingshen commented on a change in pull request #10304:
URL: https://github.com/apache/kafka/pull/10304#discussion_r594483345



##########
File path: core/src/main/scala/kafka/admin/LogDirsCommand.scala
##########
@@ -39,19 +39,29 @@ object LogDirsCommand {
     def describe(args: Array[String], out: PrintStream): Unit = {
         val opts = new LogDirsCommandOptions(args)
         val adminClient = createAdminClient(opts)
-        val topicList = opts.options.valueOf(opts.topicListOpt).split(",").filter(!_.isEmpty)
-        val brokerList = Option(opts.options.valueOf(opts.brokerListOpt)) match {
-            case Some(brokerListStr) => brokerListStr.split(',').filter(!_.isEmpty).map(_.toInt)
-            case None => adminClient.describeCluster().nodes().get().asScala.map(_.id()).toArray
-        }
+        val topicList = opts.options.valueOf(opts.topicListOpt).split(",").filter(_.nonEmpty)
+        try {
+            val clusterBrokers = adminClient.describeCluster().nodes().get().asScala.map(_.id()).toSet
+            val (existingBrokers, nonExistingBrokers) = Option(opts.options.valueOf(opts.brokerListOpt)) match {
+                case Some(brokerListStr) =>
+                    val inputBrokers = brokerListStr.split(',').filter(_.nonEmpty).map(_.toInt).toSet
+                    (inputBrokers, inputBrokers.diff(clusterBrokers))
+                case None => (clusterBrokers, Set.empty)
+            }
 
-        out.println("Querying brokers for log directories information")
-        val describeLogDirsResult: DescribeLogDirsResult = adminClient.describeLogDirs(brokerList.map(Integer.valueOf).toSeq.asJava)
-        val logDirInfosByBroker = describeLogDirsResult.allDescriptions.get().asScala.map { case (k, v) => k -> v.asScala }
+            if (nonExistingBrokers.nonEmpty) {
+                out.println(s"ERROR: The given node(s) does not exist from broker-list: ${nonExistingBrokers.mkString(",")}. Current cluster exist node(s): ${clusterBrokers.mkString(",")}")

Review comment:
       Good idea. I will act right away.







[GitHub] [kafka] wenbingshen commented on a change in pull request #10304: KAFKA-12454:Add ERROR logging on kafka-log-dirs when given brokerIds do not exist in current kafka cluster

Posted by GitBox <gi...@apache.org>.
wenbingshen commented on a change in pull request #10304:
URL: https://github.com/apache/kafka/pull/10304#discussion_r595060548



##########
File path: core/src/test/scala/unit/kafka/admin/LogDirsCommandTest.scala
##########
@@ -0,0 +1,68 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package unit.kafka.admin
+
+import java.io.{ByteArrayOutputStream, PrintStream}
+import java.nio.charset.StandardCharsets
+
+import kafka.admin.LogDirsCommand
+import kafka.integration.KafkaServerTestHarness
+import kafka.server.KafkaConfig
+import kafka.utils.TestUtils
+import org.junit.jupiter.api.Assertions.assertTrue
+import org.junit.jupiter.api.Test
+
+import scala.collection.Seq
+
+class LogDirsCommandTest extends KafkaServerTestHarness {
+
+  def generateConfigs: Seq[KafkaConfig] = {
+    TestUtils.createBrokerConfigs(1, zkConnect)
+      .map(KafkaConfig.fromProps)
+  }
+
+  @Test
+  def checkLogDirsCommandOutput(): Unit = {
+    val byteArrayOutputStream = new ByteArrayOutputStream
+    val printStream = new PrintStream(byteArrayOutputStream, false, StandardCharsets.UTF_8.name())
+    //input exist brokerList
+    LogDirsCommand.describe(Array("--bootstrap-server", brokerList, "--broker-list", "0", "--describe"), printStream)
+    val existBrokersContent = new String(byteArrayOutputStream.toByteArray, StandardCharsets.UTF_8)
+    val existBrokersLineIter = existBrokersContent.split("\n").iterator
+
+    assertTrue(existBrokersLineIter.hasNext)
+    assertTrue(existBrokersLineIter.next().contains(s"Querying brokers for log directories information"))
+
+    //input nonExist brokerList
+    byteArrayOutputStream.reset()
+    LogDirsCommand.describe(Array("--bootstrap-server", brokerList, "--broker-list", "0,1,2", "--describe"), printStream)

Review comment:
       @chia7712 I have added the duplicate ids test and it passed.







[GitHub] [kafka] wenbingshen commented on pull request #10304: KAFKA-12454:Add ERROR logging on kafka-log-dirs when given brokerIds do not exist in current kafka cluster

Posted by GitBox <gi...@apache.org>.
wenbingshen commented on pull request #10304:
URL: https://github.com/apache/kafka/pull/10304#issuecomment-800833691


   > @wenbingshen thanks for your patch. a couple of comments. please take a look.
   
   Thanks for your comments. I have submitted the latest code; please review it again. :)





[GitHub] [kafka] wenbingshen commented on pull request #10304: KAFKA-12454:Add ERROR logging on kafka-log-dirs when given brokerIds do not exist in current kafka cluster

Posted by GitBox <gi...@apache.org>.
wenbingshen commented on pull request #10304:
URL: https://github.com/apache/kafka/pull/10304#issuecomment-799498049


   > @wenbingshen Thanks for the updates. I have left a few more minor comments. Also, it seems that the build failed. Could you check it?
   
   Thank you for your review and suggestions. I have submitted the latest code; it has been tested and compiles successfully. Please review it again, thank you.





[GitHub] [kafka] wenbingshen commented on pull request #10304: KAFKA-12454:Add ERROR logging on kafka-log-dirs when given brokerIds do not exist in current kafka cluster

Posted by GitBox <gi...@apache.org>.
wenbingshen commented on pull request #10304:
URL: https://github.com/apache/kafka/pull/10304#issuecomment-800805388


   Good afternoon @chia7712 @dajac Dear committers, if you have any comments on this PR, I will continue to improve it. If you are satisfied with it, can I apply for this PR to be merged into the trunk? :)





[GitHub] [kafka] chia7712 commented on a change in pull request #10304: KAFKA-12454:Add ERROR logging on kafka-log-dirs when given brokerIds do not exist in current kafka cluster

Posted by GitBox <gi...@apache.org>.
chia7712 commented on a change in pull request #10304:
URL: https://github.com/apache/kafka/pull/10304#discussion_r595008454



##########
File path: core/src/main/scala/kafka/admin/LogDirsCommand.scala
##########
@@ -39,19 +39,29 @@ object LogDirsCommand {
     def describe(args: Array[String], out: PrintStream): Unit = {
         val opts = new LogDirsCommandOptions(args)
         val adminClient = createAdminClient(opts)
-        val topicList = opts.options.valueOf(opts.topicListOpt).split(",").filter(!_.isEmpty)
-        val brokerList = Option(opts.options.valueOf(opts.brokerListOpt)) match {
-            case Some(brokerListStr) => brokerListStr.split(',').filter(!_.isEmpty).map(_.toInt)
-            case None => adminClient.describeCluster().nodes().get().asScala.map(_.id()).toArray
-        }
+        val topicList = opts.options.valueOf(opts.topicListOpt).split(",").filter(_.nonEmpty)
+        try {
+            val clusterBrokers = adminClient.describeCluster().nodes().get().asScala.map(_.id()).toSet
+            val (existingBrokers, nonExistingBrokers) = Option(opts.options.valueOf(opts.brokerListOpt)) match {
+                case Some(brokerListStr) =>
+                    val inputBrokers = brokerListStr.split(',').filter(_.nonEmpty).map(_.toInt).toSet
+                    (inputBrokers, inputBrokers.diff(clusterBrokers))
+                case None => (clusterBrokers, Set.empty)
+            }
 
-        out.println("Querying brokers for log directories information")
-        val describeLogDirsResult: DescribeLogDirsResult = adminClient.describeLogDirs(brokerList.map(Integer.valueOf).toSeq.asJava)
-        val logDirInfosByBroker = describeLogDirsResult.allDescriptions.get().asScala.map { case (k, v) => k -> v.asScala }
+            if (nonExistingBrokers.nonEmpty) {
+                out.println(s"ERROR: The given brokers do not exist from --broker-list: ${nonExistingBrokers.mkString(",")}. Current cluster exist brokers: ${clusterBrokers.mkString(",")}")

Review comment:
       How about `current existent brokers: `

##########
File path: core/src/main/scala/kafka/admin/LogDirsCommand.scala
##########
@@ -39,19 +39,29 @@ object LogDirsCommand {
     def describe(args: Array[String], out: PrintStream): Unit = {
         val opts = new LogDirsCommandOptions(args)
         val adminClient = createAdminClient(opts)
-        val topicList = opts.options.valueOf(opts.topicListOpt).split(",").filter(!_.isEmpty)
-        val brokerList = Option(opts.options.valueOf(opts.brokerListOpt)) match {
-            case Some(brokerListStr) => brokerListStr.split(',').filter(!_.isEmpty).map(_.toInt)
-            case None => adminClient.describeCluster().nodes().get().asScala.map(_.id()).toArray
-        }
+        val topicList = opts.options.valueOf(opts.topicListOpt).split(",").filter(_.nonEmpty)

Review comment:
       the resource declaration should be followed by a `try` block.
   ```scala
   val adminClient = createAdminClient(opts)
   try {
   
   }
   ```

##########
File path: core/src/main/scala/kafka/admin/LogDirsCommand.scala
##########
@@ -39,19 +39,29 @@ object LogDirsCommand {
     def describe(args: Array[String], out: PrintStream): Unit = {
         val opts = new LogDirsCommandOptions(args)
         val adminClient = createAdminClient(opts)
-        val topicList = opts.options.valueOf(opts.topicListOpt).split(",").filter(!_.isEmpty)
-        val brokerList = Option(opts.options.valueOf(opts.brokerListOpt)) match {
-            case Some(brokerListStr) => brokerListStr.split(',').filter(!_.isEmpty).map(_.toInt)
-            case None => adminClient.describeCluster().nodes().get().asScala.map(_.id()).toArray
-        }
+        val topicList = opts.options.valueOf(opts.topicListOpt).split(",").filter(_.nonEmpty)
+        try {
+            val clusterBrokers = adminClient.describeCluster().nodes().get().asScala.map(_.id()).toSet
+            val (existingBrokers, nonExistingBrokers) = Option(opts.options.valueOf(opts.brokerListOpt)) match {
+                case Some(brokerListStr) =>
+                    val inputBrokers = brokerListStr.split(',').filter(_.nonEmpty).map(_.toInt).toSet
+                    (inputBrokers, inputBrokers.diff(clusterBrokers))
+                case None => (clusterBrokers, Set.empty)
+            }
 
-        out.println("Querying brokers for log directories information")
-        val describeLogDirsResult: DescribeLogDirsResult = adminClient.describeLogDirs(brokerList.map(Integer.valueOf).toSeq.asJava)
-        val logDirInfosByBroker = describeLogDirsResult.allDescriptions.get().asScala.map { case (k, v) => k -> v.asScala }
+            if (nonExistingBrokers.nonEmpty) {
+                out.println(s"ERROR: The given brokers do not exist from --broker-list: ${nonExistingBrokers.mkString(",")}. Current cluster exist brokers: ${clusterBrokers.mkString(",")}")

Review comment:
       Also, could you separate this line?

##########
File path: core/src/test/scala/unit/kafka/admin/LogDirsCommandTest.scala
##########
@@ -0,0 +1,68 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package unit.kafka.admin
+
+import java.io.{ByteArrayOutputStream, PrintStream}
+import java.nio.charset.StandardCharsets
+
+import kafka.admin.LogDirsCommand
+import kafka.integration.KafkaServerTestHarness
+import kafka.server.KafkaConfig
+import kafka.utils.TestUtils
+import org.junit.jupiter.api.Assertions.assertTrue
+import org.junit.jupiter.api.Test
+
+import scala.collection.Seq
+
+class LogDirsCommandTest extends KafkaServerTestHarness {
+
+  def generateConfigs: Seq[KafkaConfig] = {
+    TestUtils.createBrokerConfigs(1, zkConnect)
+      .map(KafkaConfig.fromProps)
+  }
+
+  @Test
+  def checkLogDirsCommandOutput(): Unit = {
+    val byteArrayOutputStream = new ByteArrayOutputStream
+    val printStream = new PrintStream(byteArrayOutputStream, false, StandardCharsets.UTF_8.name())
+    //input exist brokerList
+    LogDirsCommand.describe(Array("--bootstrap-server", brokerList, "--broker-list", "0", "--describe"), printStream)
+    val existBrokersContent = new String(byteArrayOutputStream.toByteArray, StandardCharsets.UTF_8)
+    val existBrokersLineIter = existBrokersContent.split("\n").iterator
+
+    assertTrue(existBrokersLineIter.hasNext)
+    assertTrue(existBrokersLineIter.next().contains(s"Querying brokers for log directories information"))
+
+    //input nonExist brokerList
+    byteArrayOutputStream.reset()
+    LogDirsCommand.describe(Array("--bootstrap-server", brokerList, "--broker-list", "0,1,2", "--describe"), printStream)

Review comment:
       Could you input duplicate ids and check that the output does not include duplicates?
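
       A rough sketch of such a check, following the style of the test above (the "Received log directory information from brokers" line comes from LogDirsCommand; `assertFalse` would need to be imported alongside `assertTrue`, and the exact substring assertions are illustrative):

   ```scala
   //input duplicate brokerList entries
   byteArrayOutputStream.reset()
   LogDirsCommand.describe(Array("--bootstrap-server", brokerList, "--broker-list", "0,0", "--describe"), printStream)
   val duplicateBrokersContent = new String(byteArrayOutputStream.toByteArray, StandardCharsets.UTF_8)
   // ids are collected into a Set, so broker 0 should be reported exactly once
   assertTrue(duplicateBrokersContent.contains("Received log directory information from brokers 0"))
   assertFalse(duplicateBrokersContent.contains("0,0"))
   ```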

##########
File path: core/src/test/scala/unit/kafka/admin/LogDirsCommandTest.scala
##########
@@ -0,0 +1,68 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package unit.kafka.admin
+
+import java.io.{ByteArrayOutputStream, PrintStream}
+import java.nio.charset.StandardCharsets
+
+import kafka.admin.LogDirsCommand
+import kafka.integration.KafkaServerTestHarness
+import kafka.server.KafkaConfig
+import kafka.utils.TestUtils
+import org.junit.jupiter.api.Assertions.assertTrue
+import org.junit.jupiter.api.Test
+
+import scala.collection.Seq
+
+class LogDirsCommandTest extends KafkaServerTestHarness {
+
+  def generateConfigs: Seq[KafkaConfig] = {
+    TestUtils.createBrokerConfigs(1, zkConnect)
+      .map(KafkaConfig.fromProps)
+  }
+
+  @Test
+  def checkLogDirsCommandOutput(): Unit = {
+    val byteArrayOutputStream = new ByteArrayOutputStream
+    val printStream = new PrintStream(byteArrayOutputStream, false, StandardCharsets.UTF_8.name())
+    //input exist brokerList
+    LogDirsCommand.describe(Array("--bootstrap-server", brokerList, "--broker-list", "0", "--describe"), printStream)
+    val existBrokersContent = new String(byteArrayOutputStream.toByteArray, StandardCharsets.UTF_8)
+    val existBrokersLineIter = existBrokersContent.split("\n").iterator
+
+    assertTrue(existBrokersLineIter.hasNext)
+    assertTrue(existBrokersLineIter.next().contains(s"Querying brokers for log directories information"))
+
+    //input nonExist brokerList

Review comment:
       how about using `nonexistent` instead of `nonExist`?







[GitHub] [kafka] dajac commented on a change in pull request #10304: KAFKA-12454:Add ERROR logging on kafka-log-dirs when given brokerIds do not exist in current kafka cluster

Posted by GitBox <gi...@apache.org>.
dajac commented on a change in pull request #10304:
URL: https://github.com/apache/kafka/pull/10304#discussion_r594476389



##########
File path: core/src/test/scala/unit/kafka/admin/LogDirsCommandTest.scala
##########
@@ -0,0 +1,52 @@
+package unit.kafka.admin

Review comment:
       We must add the licence header here. You can copy it from another file.

##########
File path: core/src/main/scala/kafka/admin/LogDirsCommand.scala
##########
@@ -39,19 +39,29 @@ object LogDirsCommand {
     def describe(args: Array[String], out: PrintStream): Unit = {
         val opts = new LogDirsCommandOptions(args)
         val adminClient = createAdminClient(opts)
-        val topicList = opts.options.valueOf(opts.topicListOpt).split(",").filter(!_.isEmpty)
-        val brokerList = Option(opts.options.valueOf(opts.brokerListOpt)) match {
-            case Some(brokerListStr) => brokerListStr.split(',').filter(!_.isEmpty).map(_.toInt)
-            case None => adminClient.describeCluster().nodes().get().asScala.map(_.id()).toArray
-        }
+        val topicList = opts.options.valueOf(opts.topicListOpt).split(",").filter(_.nonEmpty)
+        try {
+            val clusterBrokers = adminClient.describeCluster().nodes().get().asScala.map(_.id()).toSet
+            val (existingBrokers, nonExistingBrokers) = Option(opts.options.valueOf(opts.brokerListOpt)) match {
+                case Some(brokerListStr) =>
+                    val inputBrokers = brokerListStr.split(',').filter(_.nonEmpty).map(_.toInt).toSet
+                    (inputBrokers, inputBrokers.diff(clusterBrokers))
+                case None => (clusterBrokers, Set.empty)
+            }
 
-        out.println("Querying brokers for log directories information")
-        val describeLogDirsResult: DescribeLogDirsResult = adminClient.describeLogDirs(brokerList.map(Integer.valueOf).toSeq.asJava)
-        val logDirInfosByBroker = describeLogDirsResult.allDescriptions.get().asScala.map { case (k, v) => k -> v.asScala }
+            if (nonExistingBrokers.nonEmpty) {
+                out.println(s"ERROR: The given node(s) does not exist from broker-list: ${nonExistingBrokers.mkString(",")}. Current cluster exist node(s): ${clusterBrokers.mkString(",")}")

Review comment:
       nit: Should we say `--broker-list` instead of `broker-list`? Also, should we say `broker(s)` instead of `node(s)` to be consistent with the message below?

##########
File path: core/src/main/scala/kafka/admin/LogDirsCommand.scala
##########
@@ -39,19 +39,29 @@ object LogDirsCommand {
     def describe(args: Array[String], out: PrintStream): Unit = {
         val opts = new LogDirsCommandOptions(args)
         val adminClient = createAdminClient(opts)
-        val topicList = opts.options.valueOf(opts.topicListOpt).split(",").filter(!_.isEmpty)
-        val brokerList = Option(opts.options.valueOf(opts.brokerListOpt)) match {
-            case Some(brokerListStr) => brokerListStr.split(',').filter(!_.isEmpty).map(_.toInt)
-            case None => adminClient.describeCluster().nodes().get().asScala.map(_.id()).toArray
-        }
+        val topicList = opts.options.valueOf(opts.topicListOpt).split(",").filter(_.nonEmpty)
+        try {
+            val clusterBrokers = adminClient.describeCluster().nodes().get().asScala.map(_.id()).toSet
+            val (existingBrokers, nonExistingBrokers) = Option(opts.options.valueOf(opts.brokerListOpt)) match {
+                case Some(brokerListStr) =>
+                    val inputBrokers = brokerListStr.split(',').filter(_.nonEmpty).map(_.toInt).toSet
+                    (inputBrokers, inputBrokers.diff(clusterBrokers))
+                case None => (clusterBrokers, Set.empty)
+            }
 
-        out.println("Querying brokers for log directories information")
-        val describeLogDirsResult: DescribeLogDirsResult = adminClient.describeLogDirs(brokerList.map(Integer.valueOf).toSeq.asJava)
-        val logDirInfosByBroker = describeLogDirsResult.allDescriptions.get().asScala.map { case (k, v) => k -> v.asScala }
+            if (nonExistingBrokers.nonEmpty) {
+                out.println(s"ERROR: The given node(s) does not exist from broker-list: ${nonExistingBrokers.mkString(",")}. Current cluster exist node(s): ${clusterBrokers.mkString(",")}")
+            } else {
+                out.println("Querying brokers for log directories information")
+                val describeLogDirsResult: DescribeLogDirsResult = adminClient.describeLogDirs(existingBrokers.map(Integer.valueOf).toSeq.asJava)

Review comment:
       nit: `DescribeLogDirsResult` can be removed.







[GitHub] [kafka] chia7712 commented on a change in pull request #10304: KAFKA-12454:Add ERROR logging on kafka-log-dirs when given brokerIds do not exist in current kafka cluster

Posted by GitBox <gi...@apache.org>.
chia7712 commented on a change in pull request #10304:
URL: https://github.com/apache/kafka/pull/10304#discussion_r593915131



##########
File path: core/src/main/scala/kafka/admin/LogDirsCommand.scala
##########
@@ -40,17 +40,27 @@ object LogDirsCommand {
         val opts = new LogDirsCommandOptions(args)
         val adminClient = createAdminClient(opts)
         val topicList = opts.options.valueOf(opts.topicListOpt).split(",").filter(!_.isEmpty)
+        val clusterBrokers: Array[Int] = adminClient.describeCluster().nodes().get().asScala.map(_.id()).toArray
+        var nonExistBrokers: Array[Int] = Array.emptyIntArray
         val brokerList = Option(opts.options.valueOf(opts.brokerListOpt)) match {
-            case Some(brokerListStr) => brokerListStr.split(',').filter(!_.isEmpty).map(_.toInt)
-            case None => adminClient.describeCluster().nodes().get().asScala.map(_.id()).toArray
+            case Some(brokerListStr) => {

Review comment:
       the `{}` is redundant.

##########
File path: core/src/main/scala/kafka/admin/LogDirsCommand.scala
##########
@@ -40,17 +40,27 @@ object LogDirsCommand {
         val opts = new LogDirsCommandOptions(args)
         val adminClient = createAdminClient(opts)
         val topicList = opts.options.valueOf(opts.topicListOpt).split(",").filter(!_.isEmpty)
+        val clusterBrokers: Array[Int] = adminClient.describeCluster().nodes().get().asScala.map(_.id()).toArray
+        var nonExistBrokers: Array[Int] = Array.emptyIntArray
         val brokerList = Option(opts.options.valueOf(opts.brokerListOpt)) match {
-            case Some(brokerListStr) => brokerListStr.split(',').filter(!_.isEmpty).map(_.toInt)
-            case None => adminClient.describeCluster().nodes().get().asScala.map(_.id()).toArray
+            case Some(brokerListStr) => {
+                val inputBrokers: Array[Int] = brokerListStr.split(',').filter(!_.isEmpty).map(_.toInt)
+                nonExistBrokers = inputBrokers.filterNot(brokerId => clusterBrokers.contains(brokerId))
+                inputBrokers
+            }
+            case None => clusterBrokers
         }
 
-        out.println("Querying brokers for log directories information")
-        val describeLogDirsResult: DescribeLogDirsResult = adminClient.describeLogDirs(brokerList.map(Integer.valueOf).toSeq.asJava)
-        val logDirInfosByBroker = describeLogDirsResult.allDescriptions.get().asScala.map { case (k, v) => k -> v.asScala }
+        if (!nonExistBrokers.isEmpty) {

Review comment:
       maybe `nonEmpty`?

##########
File path: core/src/main/scala/kafka/admin/LogDirsCommand.scala
##########
@@ -40,17 +40,27 @@ object LogDirsCommand {
         val opts = new LogDirsCommandOptions(args)
         val adminClient = createAdminClient(opts)
         val topicList = opts.options.valueOf(opts.topicListOpt).split(",").filter(!_.isEmpty)
+        val clusterBrokers: Array[Int] = adminClient.describeCluster().nodes().get().asScala.map(_.id()).toArray
+        var nonExistBrokers: Array[Int] = Array.emptyIntArray
         val brokerList = Option(opts.options.valueOf(opts.brokerListOpt)) match {
-            case Some(brokerListStr) => brokerListStr.split(',').filter(!_.isEmpty).map(_.toInt)
-            case None => adminClient.describeCluster().nodes().get().asScala.map(_.id()).toArray
+            case Some(brokerListStr) => {
+                val inputBrokers: Array[Int] = brokerListStr.split(',').filter(!_.isEmpty).map(_.toInt)

Review comment:
       Could you replace `!_.isEmpty` with `_.nonEmpty`? It seems to me the latter is more readable.

##########
File path: core/src/main/scala/kafka/admin/LogDirsCommand.scala
##########
@@ -40,17 +40,27 @@ object LogDirsCommand {
         val opts = new LogDirsCommandOptions(args)
         val adminClient = createAdminClient(opts)
         val topicList = opts.options.valueOf(opts.topicListOpt).split(",").filter(!_.isEmpty)
+        val clusterBrokers: Array[Int] = adminClient.describeCluster().nodes().get().asScala.map(_.id()).toArray
+        var nonExistBrokers: Array[Int] = Array.emptyIntArray
         val brokerList = Option(opts.options.valueOf(opts.brokerListOpt)) match {
-            case Some(brokerListStr) => brokerListStr.split(',').filter(!_.isEmpty).map(_.toInt)
-            case None => adminClient.describeCluster().nodes().get().asScala.map(_.id()).toArray
+            case Some(brokerListStr) => {
+                val inputBrokers: Array[Int] = brokerListStr.split(',').filter(!_.isEmpty).map(_.toInt)
+                nonExistBrokers = inputBrokers.filterNot(brokerId => clusterBrokers.contains(brokerId))

Review comment:
       How about using `diff` to get `nonExistBrokers`?
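
       For example (a sketch, using the names from the diff above):

   ```scala
   // diff keeps only the ids that are not present in the cluster
   val nonExistBrokers = inputBrokers.diff(clusterBrokers)
   ```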

##########
File path: core/src/main/scala/kafka/admin/LogDirsCommand.scala
##########
@@ -40,17 +40,27 @@ object LogDirsCommand {
         val opts = new LogDirsCommandOptions(args)
         val adminClient = createAdminClient(opts)
         val topicList = opts.options.valueOf(opts.topicListOpt).split(",").filter(!_.isEmpty)
+        val clusterBrokers: Array[Int] = adminClient.describeCluster().nodes().get().asScala.map(_.id()).toArray

Review comment:
       Is `Set[Int]` more suitable for this case? 

##########
File path: core/src/main/scala/kafka/admin/LogDirsCommand.scala
##########
@@ -40,17 +40,27 @@ object LogDirsCommand {
         val opts = new LogDirsCommandOptions(args)
         val adminClient = createAdminClient(opts)
         val topicList = opts.options.valueOf(opts.topicListOpt).split(",").filter(!_.isEmpty)
+        val clusterBrokers: Array[Int] = adminClient.describeCluster().nodes().get().asScala.map(_.id()).toArray
+        var nonExistBrokers: Array[Int] = Array.emptyIntArray
         val brokerList = Option(opts.options.valueOf(opts.brokerListOpt)) match {
-            case Some(brokerListStr) => brokerListStr.split(',').filter(!_.isEmpty).map(_.toInt)
-            case None => adminClient.describeCluster().nodes().get().asScala.map(_.id()).toArray
+            case Some(brokerListStr) => {
+                val inputBrokers: Array[Int] = brokerListStr.split(',').filter(!_.isEmpty).map(_.toInt)
+                nonExistBrokers = inputBrokers.filterNot(brokerId => clusterBrokers.contains(brokerId))
+                inputBrokers
+            }
+            case None => clusterBrokers
         }
 
-        out.println("Querying brokers for log directories information")
-        val describeLogDirsResult: DescribeLogDirsResult = adminClient.describeLogDirs(brokerList.map(Integer.valueOf).toSeq.asJava)
-        val logDirInfosByBroker = describeLogDirsResult.allDescriptions.get().asScala.map { case (k, v) => k -> v.asScala }
+        if (!nonExistBrokers.isEmpty) {
+            out.println(s"ERROR: The given node(s) does not exist from broker-list ${nonExistBrokers.mkString(",")}")

Review comment:
       Could we show existent brokers also?

##########
File path: core/src/main/scala/kafka/admin/LogDirsCommand.scala
##########
@@ -40,17 +40,27 @@ object LogDirsCommand {
         val opts = new LogDirsCommandOptions(args)
         val adminClient = createAdminClient(opts)
         val topicList = opts.options.valueOf(opts.topicListOpt).split(",").filter(!_.isEmpty)
+        val clusterBrokers: Array[Int] = adminClient.describeCluster().nodes().get().asScala.map(_.id()).toArray
+        var nonExistBrokers: Array[Int] = Array.emptyIntArray
         val brokerList = Option(opts.options.valueOf(opts.brokerListOpt)) match {
-            case Some(brokerListStr) => brokerListStr.split(',').filter(!_.isEmpty).map(_.toInt)
-            case None => adminClient.describeCluster().nodes().get().asScala.map(_.id()).toArray
+            case Some(brokerListStr) => {
+                val inputBrokers: Array[Int] = brokerListStr.split(',').filter(!_.isEmpty).map(_.toInt)
+                nonExistBrokers = inputBrokers.filterNot(brokerId => clusterBrokers.contains(brokerId))
+                inputBrokers
+            }
+            case None => clusterBrokers
         }
 
-        out.println("Querying brokers for log directories information")
-        val describeLogDirsResult: DescribeLogDirsResult = adminClient.describeLogDirs(brokerList.map(Integer.valueOf).toSeq.asJava)
-        val logDirInfosByBroker = describeLogDirsResult.allDescriptions.get().asScala.map { case (k, v) => k -> v.asScala }
+        if (!nonExistBrokers.isEmpty) {
+            out.println(s"ERROR: The given node(s) does not exist from broker-list ${nonExistBrokers.mkString(",")}")
+        } else {
+            out.println("Querying brokers for log directories information")
+            val describeLogDirsResult: DescribeLogDirsResult = adminClient.describeLogDirs(brokerList.map(Integer.valueOf).toSeq.asJava)
+            val logDirInfosByBroker = describeLogDirsResult.allDescriptions.get().asScala.map { case (k, v) => k -> v.asScala }
 
-        out.println(s"Received log directory information from brokers ${brokerList.mkString(",")}")
-        out.println(formatAsJson(logDirInfosByBroker, topicList.toSet))
+            out.println(s"Received log directory information from brokers ${brokerList.mkString(",")}")
+            out.println(formatAsJson(logDirInfosByBroker, topicList.toSet))
+        }
         adminClient.close()

Review comment:
       Could you use `try-finally` to release `adminClient`?
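
       For example, a minimal shape (sketch only):

   ```scala
   val adminClient = createAdminClient(opts)
   try {
       // describeCluster / describeLogDirs work goes here
   } finally {
       // always release the client, even if a query above throws
       adminClient.close()
   }
   ```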







[GitHub] [kafka] wenbingshen commented on pull request #10304: KAFKA-12454:Add ERROR logging on kafka-log-dirs when given brokerIds do not exist in current kafka cluster

Posted by GitBox <gi...@apache.org>.
wenbingshen commented on pull request #10304:
URL: https://github.com/apache/kafka/pull/10304#issuecomment-798936865


   > @wenbingshen thanks for this contribution. a couple of comments are left. Most of them are related to code style. Otherwise, LGTM
   
   Thank you very much for your patience and guidance. These comments are very helpful. I have addressed them and submitted the changes; please review them again, thank you.





[GitHub] [kafka] dajac commented on a change in pull request #10304: KAFKA-12454:Add ERROR logging on kafka-log-dirs when given brokerIds do not exist in current kafka cluster

Posted by GitBox <gi...@apache.org>.
dajac commented on a change in pull request #10304:
URL: https://github.com/apache/kafka/pull/10304#discussion_r594184162



##########
File path: core/src/main/scala/kafka/admin/LogDirsCommand.scala
##########
@@ -39,19 +39,31 @@ object LogDirsCommand {
     def describe(args: Array[String], out: PrintStream): Unit = {
         val opts = new LogDirsCommandOptions(args)
         val adminClient = createAdminClient(opts)
-        val topicList = opts.options.valueOf(opts.topicListOpt).split(",").filter(!_.isEmpty)
-        val brokerList = Option(opts.options.valueOf(opts.brokerListOpt)) match {
-            case Some(brokerListStr) => brokerListStr.split(',').filter(!_.isEmpty).map(_.toInt)
-            case None => adminClient.describeCluster().nodes().get().asScala.map(_.id()).toArray
-        }
+        val topicList = opts.options.valueOf(opts.topicListOpt).split(",").filter(_.nonEmpty)
+        var nonExistBrokers: Set[Int] = Set.empty
+        try {
+            val clusterBrokers: Set[Int] = adminClient.describeCluster().nodes().get().asScala.map(_.id()).toSet

Review comment:
       nit: We can drop the explicit `Set[Int]` type annotation.

##########
File path: core/src/main/scala/kafka/admin/LogDirsCommand.scala
##########
@@ -39,19 +39,31 @@ object LogDirsCommand {
     def describe(args: Array[String], out: PrintStream): Unit = {
         val opts = new LogDirsCommandOptions(args)
         val adminClient = createAdminClient(opts)
-        val topicList = opts.options.valueOf(opts.topicListOpt).split(",").filter(!_.isEmpty)
-        val brokerList = Option(opts.options.valueOf(opts.brokerListOpt)) match {
-            case Some(brokerListStr) => brokerListStr.split(',').filter(!_.isEmpty).map(_.toInt)
-            case None => adminClient.describeCluster().nodes().get().asScala.map(_.id()).toArray
-        }
+        val topicList = opts.options.valueOf(opts.topicListOpt).split(",").filter(_.nonEmpty)
+        var nonExistBrokers: Set[Int] = Set.empty
+        try {
+            val clusterBrokers: Set[Int] = adminClient.describeCluster().nodes().get().asScala.map(_.id()).toSet
+            val brokerList = Option(opts.options.valueOf(opts.brokerListOpt)) match {
+                case Some(brokerListStr) =>
+                    val inputBrokers: Set[Int] = brokerListStr.split(',').filter(_.nonEmpty).map(_.toInt).toSet

Review comment:
       ditto.

##########
File path: core/src/main/scala/kafka/admin/LogDirsCommand.scala
##########
@@ -39,19 +39,31 @@ object LogDirsCommand {
     def describe(args: Array[String], out: PrintStream): Unit = {
         val opts = new LogDirsCommandOptions(args)
         val adminClient = createAdminClient(opts)
-        val topicList = opts.options.valueOf(opts.topicListOpt).split(",").filter(!_.isEmpty)
-        val brokerList = Option(opts.options.valueOf(opts.brokerListOpt)) match {
-            case Some(brokerListStr) => brokerListStr.split(',').filter(!_.isEmpty).map(_.toInt)
-            case None => adminClient.describeCluster().nodes().get().asScala.map(_.id()).toArray
-        }
+        val topicList = opts.options.valueOf(opts.topicListOpt).split(",").filter(_.nonEmpty)
+        var nonExistBrokers: Set[Int] = Set.empty
+        try {
+            val clusterBrokers: Set[Int] = adminClient.describeCluster().nodes().get().asScala.map(_.id()).toSet
+            val brokerList = Option(opts.options.valueOf(opts.brokerListOpt)) match {
+                case Some(brokerListStr) =>
+                    val inputBrokers: Set[Int] = brokerListStr.split(',').filter(_.nonEmpty).map(_.toInt).toSet
+                    nonExistBrokers = inputBrokers.diff(clusterBrokers)
+                    inputBrokers
+                case None => clusterBrokers
+            }

Review comment:
       nit: We usually avoid mutable variables unless they are really necessary. In this case, I would rather return `nonExistingBrokers` when the argument is processed. Something like this:
   
   ```
   val (existingBrokers, nonExistingBrokers) = Option(opts.options.valueOf(opts.brokerListOpt)) match {
       case Some(brokerListStr) =>
           val inputBrokers: Set[Int] = brokerListStr.split(',').filter(_.nonEmpty).map(_.toInt).toSet
           (inputBrokers, inputBrokers.diff(clusterBrokers))
       case None => (clusterBrokers, Set.empty)
   }
   ```

##########
File path: core/src/main/scala/kafka/admin/LogDirsCommand.scala
##########
@@ -39,19 +39,31 @@ object LogDirsCommand {
     def describe(args: Array[String], out: PrintStream): Unit = {
         val opts = new LogDirsCommandOptions(args)
         val adminClient = createAdminClient(opts)
-        val topicList = opts.options.valueOf(opts.topicListOpt).split(",").filter(!_.isEmpty)
-        val brokerList = Option(opts.options.valueOf(opts.brokerListOpt)) match {
-            case Some(brokerListStr) => brokerListStr.split(',').filter(!_.isEmpty).map(_.toInt)
-            case None => adminClient.describeCluster().nodes().get().asScala.map(_.id()).toArray
-        }
+        val topicList = opts.options.valueOf(opts.topicListOpt).split(",").filter(_.nonEmpty)
+        var nonExistBrokers: Set[Int] = Set.empty
+        try {
+            val clusterBrokers: Set[Int] = adminClient.describeCluster().nodes().get().asScala.map(_.id()).toSet
+            val brokerList = Option(opts.options.valueOf(opts.brokerListOpt)) match {
+                case Some(brokerListStr) =>
+                    val inputBrokers: Set[Int] = brokerListStr.split(',').filter(_.nonEmpty).map(_.toInt).toSet
+                    nonExistBrokers = inputBrokers.diff(clusterBrokers)
+                    inputBrokers
+                case None => clusterBrokers
+            }
 
-        out.println("Querying brokers for log directories information")
-        val describeLogDirsResult: DescribeLogDirsResult = adminClient.describeLogDirs(brokerList.map(Integer.valueOf).toSeq.asJava)
-        val logDirInfosByBroker = describeLogDirsResult.allDescriptions.get().asScala.map { case (k, v) => k -> v.asScala }
+            if (nonExistBrokers.nonEmpty) {
+                out.println(s"ERROR: The given node(s) does not exist from broker-list: ${nonExistBrokers.mkString(",")}. Current cluster exist node(s): ${clusterBrokers.mkString(",")}")
+            } else {
+                out.println("Querying brokers for log directories information")
+                val describeLogDirsResult: DescribeLogDirsResult = adminClient.describeLogDirs(brokerList.map(Integer.valueOf).toSeq.asJava)

Review comment:
       nit: We can remove `DescribeLogDirsResult`.







[GitHub] [kafka] wenbingshen commented on a change in pull request #10304: KAFKA-12454:Add ERROR logging on kafka-log-dirs when given brokerIds do not exist in current kafka cluster

Posted by GitBox <gi...@apache.org>.
wenbingshen commented on a change in pull request #10304:
URL: https://github.com/apache/kafka/pull/10304#discussion_r595061621



##########
File path: core/src/main/scala/kafka/admin/LogDirsCommand.scala
##########
@@ -39,19 +39,29 @@ object LogDirsCommand {
     def describe(args: Array[String], out: PrintStream): Unit = {
         val opts = new LogDirsCommandOptions(args)
         val adminClient = createAdminClient(opts)
-        val topicList = opts.options.valueOf(opts.topicListOpt).split(",").filter(!_.isEmpty)
-        val brokerList = Option(opts.options.valueOf(opts.brokerListOpt)) match {
-            case Some(brokerListStr) => brokerListStr.split(',').filter(!_.isEmpty).map(_.toInt)
-            case None => adminClient.describeCluster().nodes().get().asScala.map(_.id()).toArray
-        }
+        val topicList = opts.options.valueOf(opts.topicListOpt).split(",").filter(_.nonEmpty)

Review comment:
       Good. It has been modified in the latest submission.







[GitHub] [kafka] wenbingshen commented on pull request #10304: KAFKA-12454:Add ERROR logging on kafka-log-dirs when given brokerIds do not exist in current kafka cluster

Posted by GitBox <gi...@apache.org>.
wenbingshen commented on pull request #10304:
URL: https://github.com/apache/kafka/pull/10304#issuecomment-800160915


   > @wenbingshen thanks for your patch. overall LGTM. a couple of trivial comments are left. please take a look :)
   
   Thanks for your comments. I have addressed them and submitted the latest code; please review it again :)





[GitHub] [kafka] chia7712 merged pull request #10304: KAFKA-12454: Add ERROR logging on kafka-log-dirs when given brokerIds do not exist in current kafka cluster

Posted by GitBox <gi...@apache.org>.
chia7712 merged pull request #10304:
URL: https://github.com/apache/kafka/pull/10304


   





[GitHub] [kafka] wenbingshen commented on a change in pull request #10304: KAFKA-12454:Add ERROR logging on kafka-log-dirs when given brokerIds do not exist in current kafka cluster

Posted by GitBox <gi...@apache.org>.
wenbingshen commented on a change in pull request #10304:
URL: https://github.com/apache/kafka/pull/10304#discussion_r593706745



##########
File path: core/src/main/scala/kafka/admin/LogDirsCommand.scala
##########
@@ -40,9 +40,16 @@ object LogDirsCommand {
         val opts = new LogDirsCommandOptions(args)
         val adminClient = createAdminClient(opts)
         val topicList = opts.options.valueOf(opts.topicListOpt).split(",").filter(!_.isEmpty)
+        val clusterBrokers: Array[Int] = adminClient.describeCluster().nodes().get().asScala.map(_.id()).toArray
         val brokerList = Option(opts.options.valueOf(opts.brokerListOpt)) match {
             case Some(brokerListStr) => brokerListStr.split(',').filter(!_.isEmpty).map(_.toInt)
-            case None => adminClient.describeCluster().nodes().get().asScala.map(_.id()).toArray
+            case None => clusterBrokers
+        }
+
+        val nonExistBrokers: Array[Int] = brokerList.filterNot(brokerId => clusterBrokers.contains(brokerId))
+        if (!nonExistBrokers.isEmpty) {
+          System.err.println(s"The given node(s) does not exist from broker-list ${nonExistBrokers.mkString(",")}")
+          sys.exit(1)

Review comment:
       > Should we do this only when handling the brokers provided by the user? It does not make sense to validate the list of brokers otherwise. What do you think?
   
   Your suggestion is very good. I have changed the logic; please review.







[GitHub] [kafka] wenbingshen commented on a change in pull request #10304: KAFKA-12454:Add ERROR logging on kafka-log-dirs when given brokerIds do not exist in current kafka cluster

Posted by GitBox <gi...@apache.org>.
wenbingshen commented on a change in pull request #10304:
URL: https://github.com/apache/kafka/pull/10304#discussion_r594482210



##########
File path: core/src/test/scala/unit/kafka/admin/LogDirsCommandTest.scala
##########
@@ -0,0 +1,52 @@
+package unit.kafka.admin

Review comment:
       Sorry, after checking the compilation report, I have realized this problem and I have made changes.







[GitHub] [kafka] wenbingshen commented on pull request #10304: KAFKA-12454:Add ERROR logging on kafka-log-dirs when given brokerIds do not exist in current kafka cluster

Posted by GitBox <gi...@apache.org>.
wenbingshen commented on pull request #10304:
URL: https://github.com/apache/kafka/pull/10304#issuecomment-797884025


   > Thanks for the PR. I left a minor suggestion. Could we also add a test case?
   
   Thanks for your comment. Your suggestion is very good: we only need to validate the brokers entered by the user. I have added a unit test; please review it again. Thank you.





[GitHub] [kafka] wenbingshen commented on a change in pull request #10304: KAFKA-12454:Add ERROR logging on kafka-log-dirs when given brokerIds do not exist in current kafka cluster

Posted by GitBox <gi...@apache.org>.
wenbingshen commented on a change in pull request #10304:
URL: https://github.com/apache/kafka/pull/10304#discussion_r594484169



##########
File path: core/src/main/scala/kafka/admin/LogDirsCommand.scala
##########
@@ -39,19 +39,29 @@ object LogDirsCommand {
     def describe(args: Array[String], out: PrintStream): Unit = {
         val opts = new LogDirsCommandOptions(args)
         val adminClient = createAdminClient(opts)
-        val topicList = opts.options.valueOf(opts.topicListOpt).split(",").filter(!_.isEmpty)
-        val brokerList = Option(opts.options.valueOf(opts.brokerListOpt)) match {
-            case Some(brokerListStr) => brokerListStr.split(',').filter(!_.isEmpty).map(_.toInt)
-            case None => adminClient.describeCluster().nodes().get().asScala.map(_.id()).toArray
-        }
+        val topicList = opts.options.valueOf(opts.topicListOpt).split(",").filter(_.nonEmpty)
+        try {
+            val clusterBrokers = adminClient.describeCluster().nodes().get().asScala.map(_.id()).toSet
+            val (existingBrokers, nonExistingBrokers) = Option(opts.options.valueOf(opts.brokerListOpt)) match {
+                case Some(brokerListStr) =>
+                    val inputBrokers = brokerListStr.split(',').filter(_.nonEmpty).map(_.toInt).toSet
+                    (inputBrokers, inputBrokers.diff(clusterBrokers))
+                case None => (clusterBrokers, Set.empty)
+            }
 
-        out.println("Querying brokers for log directories information")
-        val describeLogDirsResult: DescribeLogDirsResult = adminClient.describeLogDirs(brokerList.map(Integer.valueOf).toSeq.asJava)
-        val logDirInfosByBroker = describeLogDirsResult.allDescriptions.get().asScala.map { case (k, v) => k -> v.asScala }
+            if (nonExistingBrokers.nonEmpty) {
+                out.println(s"ERROR: The given node(s) does not exist from broker-list: ${nonExistingBrokers.mkString(",")}. Current cluster exist node(s): ${clusterBrokers.mkString(",")}")
+            } else {
+                out.println("Querying brokers for log directories information")
+                val describeLogDirsResult: DescribeLogDirsResult = adminClient.describeLogDirs(existingBrokers.map(Integer.valueOf).toSeq.asJava)

Review comment:
       Sorry, I forgot this; I will change it right away.







[GitHub] [kafka] wenbingshen commented on pull request #10304: KAFKA-12454:Add ERROR logging on kafka-log-dirs when given brokerIds do not exist in current kafka cluster

Posted by GitBox <gi...@apache.org>.
wenbingshen commented on pull request #10304:
URL: https://github.com/apache/kafka/pull/10304#issuecomment-796915807


   @chia7712 Can you help review this PR? Thank you.





[GitHub] [kafka] wenbingshen commented on pull request #10304: KAFKA-12454:Add ERROR logging on kafka-log-dirs when given brokerIds do not exist in current kafka cluster

Posted by GitBox <gi...@apache.org>.
wenbingshen commented on pull request #10304:
URL: https://github.com/apache/kafka/pull/10304#issuecomment-801593788


   > Good afternoon @chia7712 @dajac Dear committers, if you have any comments on this PR, I will continue to improve it. If you are satisfied with it, can I apply for this PR to be merged into the trunk? :)
   
   Dear @chia7712 @dajac, if there are no other problems, can you help advance this PR? Thanks very much! :)





[GitHub] [kafka] wenbingshen commented on pull request #10304: KAFKA-12454:Add ERROR logging on kafka-log-dirs when given brokerIds do not exist in current kafka cluster

Posted by GitBox <gi...@apache.org>.
wenbingshen commented on pull request #10304:
URL: https://github.com/apache/kafka/pull/10304#issuecomment-799585108


   > @wenbingshen Thanks for the updates. Left a few more minor comments.
   
   Thank you for your comments. I submitted the latest code; please review it, thank you!





[GitHub] [kafka] chia7712 commented on pull request #10304: KAFKA-12454: Add ERROR logging on kafka-log-dirs when given brokerIds do not exist in current kafka cluster

Posted by GitBox <gi...@apache.org>.
chia7712 commented on pull request #10304:
URL: https://github.com/apache/kafka/pull/10304#issuecomment-801702252


   > LGTM. @chia7712 If you are fine with the PR, I let you merge it.
   
   sure. will merge it after I take a final review :)





[GitHub] [kafka] dajac commented on a change in pull request #10304: KAFKA-12454:Add ERROR logging on kafka-log-dirs when given brokerIds do not exist in current kafka cluster

Posted by GitBox <gi...@apache.org>.
dajac commented on a change in pull request #10304:
URL: https://github.com/apache/kafka/pull/10304#discussion_r593503408



##########
File path: core/src/main/scala/kafka/admin/LogDirsCommand.scala
##########
@@ -40,9 +40,16 @@ object LogDirsCommand {
         val opts = new LogDirsCommandOptions(args)
         val adminClient = createAdminClient(opts)
         val topicList = opts.options.valueOf(opts.topicListOpt).split(",").filter(!_.isEmpty)
+        val clusterBrokers: Array[Int] = adminClient.describeCluster().nodes().get().asScala.map(_.id()).toArray
         val brokerList = Option(opts.options.valueOf(opts.brokerListOpt)) match {
             case Some(brokerListStr) => brokerListStr.split(',').filter(!_.isEmpty).map(_.toInt)
-            case None => adminClient.describeCluster().nodes().get().asScala.map(_.id()).toArray
+            case None => clusterBrokers
+        }
+
+        val nonExistBrokers: Array[Int] = brokerList.filterNot(brokerId => clusterBrokers.contains(brokerId))
+        if (!nonExistBrokers.isEmpty) {
+          System.err.println(s"The given node(s) does not exist from broker-list ${nonExistBrokers.mkString(",")}")
+          sys.exit(1)

Review comment:
       Should we do this only when handling the brokers provided by the user? It does not make sense to validate the list of brokers otherwise. What do you think?
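
For illustration, a minimal sketch of that suggestion, not the final patch: validation happens only in the branch where the user actually passed `--broker-list` (variable names here are hypothetical).

```scala
val brokerList = Option(opts.options.valueOf(opts.brokerListOpt)) match {
    case Some(brokerListStr) =>
        // Only user-supplied broker ids need to be checked against the cluster.
        val inputBrokers = brokerListStr.split(',').filter(_.nonEmpty).map(_.toInt)
        val clusterBrokers = adminClient.describeCluster().nodes().get().asScala.map(_.id()).toSet
        val nonExistingBrokers = inputBrokers.filterNot(clusterBrokers.contains)
        if (nonExistingBrokers.nonEmpty) {
            System.err.println(s"The given broker(s) do not exist: ${nonExistingBrokers.mkString(",")}")
            sys.exit(1)
        }
        inputBrokers
    case None =>
        // No --broker-list given: query every broker in the cluster, nothing to validate.
        adminClient.describeCluster().nodes().get().asScala.map(_.id()).toArray
}
```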







[GitHub] [kafka] wenbingshen commented on a change in pull request #10304: KAFKA-12454:Add ERROR logging on kafka-log-dirs when given brokerIds do not exist in current kafka cluster

Posted by GitBox <gi...@apache.org>.
wenbingshen commented on a change in pull request #10304:
URL: https://github.com/apache/kafka/pull/10304#discussion_r595060904



##########
File path: core/src/test/scala/unit/kafka/admin/LogDirsCommandTest.scala
##########
@@ -0,0 +1,68 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package unit.kafka.admin
+
+import java.io.{ByteArrayOutputStream, PrintStream}
+import java.nio.charset.StandardCharsets
+
+import kafka.admin.LogDirsCommand
+import kafka.integration.KafkaServerTestHarness
+import kafka.server.KafkaConfig
+import kafka.utils.TestUtils
+import org.junit.jupiter.api.Assertions.assertTrue
+import org.junit.jupiter.api.Test
+
+import scala.collection.Seq
+
+class LogDirsCommandTest extends KafkaServerTestHarness {
+
+  def generateConfigs: Seq[KafkaConfig] = {
+    TestUtils.createBrokerConfigs(1, zkConnect)
+      .map(KafkaConfig.fromProps)
+  }
+
+  @Test
+  def checkLogDirsCommandOutput(): Unit = {
+    val byteArrayOutputStream = new ByteArrayOutputStream
+    val printStream = new PrintStream(byteArrayOutputStream, false, StandardCharsets.UTF_8.name())
+    //input exist brokerList
+    LogDirsCommand.describe(Array("--bootstrap-server", brokerList, "--broker-list", "0", "--describe"), printStream)
+    val existBrokersContent = new String(byteArrayOutputStream.toByteArray, StandardCharsets.UTF_8)
+    val existBrokersLineIter = existBrokersContent.split("\n").iterator
+
+    assertTrue(existBrokersLineIter.hasNext)
+    assertTrue(existBrokersLineIter.next().contains(s"Querying brokers for log directories information"))
+
+    //input nonExist brokerList

Review comment:
       It has been modified.
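
For context, one way the non-existent broker case could be asserted, assuming (hypothetically) that the command reports unknown broker ids by throwing rather than calling `sys.exit`, so the test JVM keeps running; the exact exception type and message in the merged test may differ.

```scala
import org.junit.jupiter.api.Assertions.assertThrows

// Hypothetical assertion: broker ids 1 and 2 do not exist in this single-broker harness.
assertThrows(classOf[Exception], () =>
    LogDirsCommand.describe(
        Array("--bootstrap-server", brokerList, "--broker-list", "0,1,2", "--describe"),
        printStream))
```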







[GitHub] [kafka] chia7712 commented on a change in pull request #10304: KAFKA-12454:Add ERROR logging on kafka-log-dirs when given brokerIds do not exist in current kafka cluster

Posted by GitBox <gi...@apache.org>.
chia7712 commented on a change in pull request #10304:
URL: https://github.com/apache/kafka/pull/10304#discussion_r595727918



##########
File path: core/src/test/scala/unit/kafka/admin/LogDirsCommandTest.scala
##########
@@ -0,0 +1,77 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package unit.kafka.admin

Review comment:
       the package name should be `kafka.admin` rather than `unit.kafka.admin`
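
In other words, the declaration at the top of the test file should read:

```scala
// The `unit` directory under core/src/test/scala is not part of the package path,
// matching the convention of the other tests under core/src/test/scala/unit/kafka/...
package kafka.admin
```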

##########
File path: core/src/main/scala/kafka/admin/LogDirsCommand.scala
##########
@@ -39,19 +39,30 @@ object LogDirsCommand {
     def describe(args: Array[String], out: PrintStream): Unit = {
         val opts = new LogDirsCommandOptions(args)
         val adminClient = createAdminClient(opts)
-        val topicList = opts.options.valueOf(opts.topicListOpt).split(",").filter(!_.isEmpty)
-        val brokerList = Option(opts.options.valueOf(opts.brokerListOpt)) match {
-            case Some(brokerListStr) => brokerListStr.split(',').filter(!_.isEmpty).map(_.toInt)
-            case None => adminClient.describeCluster().nodes().get().asScala.map(_.id()).toArray
-        }
+        try {
+            val topicList = opts.options.valueOf(opts.topicListOpt).split(",").filter(_.nonEmpty)
+            val clusterBrokers = adminClient.describeCluster().nodes().get().asScala.map(_.id()).toSet
+            val (existingBrokers, nonExistingBrokers) = Option(opts.options.valueOf(opts.brokerListOpt)) match {
+                case Some(brokerListStr) =>
+                    val inputBrokers = brokerListStr.split(',').filter(_.nonEmpty).map(_.toInt).toSet
+                    (inputBrokers, inputBrokers.diff(clusterBrokers))

Review comment:
       As the variable is called `existingBrokers`, it should hold only the brokers that truly exist in the cluster. In short, it should return `inputBrokers.intersect(clusterBrokers)` rather than `inputBrokers`
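
Concretely, as a small illustrative snippet (example values only):

```scala
val clusterBrokers = Set(0, 1, 2)   // brokers actually present in the cluster
val inputBrokers   = Set(1, 2, 5)   // ids the user passed via --broker-list

val existingBrokers    = inputBrokers.intersect(clusterBrokers)  // Set(1, 2)
val nonExistingBrokers = inputBrokers.diff(clusterBrokers)       // Set(5)
```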



