You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2020/09/24 18:57:39 UTC

[GitHub] [kafka] rajinisivaram commented on a change in pull request #9334: KAFKA-10516; Disable automatic retry of `THROTTLING_QUOTA_EXCEEDED` errors in the `kafka-topics` command (KIP-599)

rajinisivaram commented on a change in pull request #9334:
URL: https://github.com/apache/kafka/pull/9334#discussion_r494510121



##########
File path: core/src/main/scala/kafka/admin/TopicCommand.scala
##########
@@ -69,16 +71,26 @@ object TopicCommand extends Logging {
       else if (opts.hasDeleteOption)
         topicService.deleteTopic(opts)
     } catch {
+      case e: ExecutionException =>
+        if (e.getCause != null)
+          printException(e.getCause)
+        else
+          printException(e)
+        exitCode = 1
       case e: Throwable =>
-        println("Error while executing topic command : " + e.getMessage)
-        error(Utils.stackTrace(e))
+        printException(e)
         exitCode = 1
     } finally {
       topicService.close()
       Exit.exit(exitCode)
     }
   }
 
+  private def printException(e: Throwable): Unit = {
+    println("Error while executing topic command : " + e.getMessage)
+    error(Utils.stackTrace(e))

Review comment:
       Couldn't we just use `error(message, e)`?

##########
File path: core/src/test/scala/unit/kafka/admin/TopicCommandWithAdminClientTest.scala
##########
@@ -844,4 +851,72 @@ class TopicCommandWithAdminClientTest extends KafkaServerTestHarness with Loggin
     assertEquals(2, rows.size)
     rows(0).startsWith(s"Topic:$testTopicName\tPartitionCount:1")
   }
+
+  @Test
+  def testCreateTopicDoesNotRetryThrottlingQuotaExceededException(): Unit = {
+    val adminClient = Mockito.mock(classOf[Admin])
+    val topicService = AdminClientTopicService(adminClient)
+
+    val result = AdminClientTestUtils.createTopicsResult(testTopicName, Errors.THROTTLING_QUOTA_EXCEEDED.exception())
+    Mockito.when(adminClient.createTopics(ArgumentMatchers.any(), ArgumentMatchers.any())).thenReturn(result)
+
+    assertThrows(classOf[ThrottlingQuotaExceededException],
+      () => topicService.createTopic(new TopicCommandOptions(Array("--topic", testTopicName))))
+
+    val expectedNewTopic = new NewTopic(testTopicName, Option.empty[Integer].asJava, Option.empty[java.lang.Short].asJava)

Review comment:
       Couldn't we just use `Optional.empty` directly instead of creating a Scala `Option` and converting it with `asJava`?

##########
File path: core/src/test/scala/unit/kafka/admin/TopicCommandWithAdminClientTest.scala
##########
@@ -844,4 +851,72 @@ class TopicCommandWithAdminClientTest extends KafkaServerTestHarness with Loggin
     assertEquals(2, rows.size)
     rows(0).startsWith(s"Topic:$testTopicName\tPartitionCount:1")
   }
+
+  @Test
+  def testCreateTopicDoesNotRetryThrottlingQuotaExceededException(): Unit = {
+    val adminClient = Mockito.mock(classOf[Admin])
+    val topicService = AdminClientTopicService(adminClient)
+
+    val result = AdminClientTestUtils.createTopicsResult(testTopicName, Errors.THROTTLING_QUOTA_EXCEEDED.exception())
+    Mockito.when(adminClient.createTopics(ArgumentMatchers.any(), ArgumentMatchers.any())).thenReturn(result)

Review comment:
       It may be worth importing `org.mockito.Mockito._` and `org.mockito.ArgumentMatchers._` to avoid repeating the class names everywhere.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org