You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2020/09/24 08:48:14 UTC

[GitHub] [kafka] dajac opened a new pull request #9334: KAFKA-10516; Disable automatic retry of `THROTTLING_QUOTA_EXCEEDED` errors in the `kafka-topics` command (KIP-599)

dajac opened a new pull request #9334:
URL: https://github.com/apache/kafka/pull/9334


   This PR does two things:
   * As stated in KIP-599, we'd like to disable the automatic retry of `THROTTLING_QUOTA_EXCEEDED` errors in order to avoid letting the command hangs until `default.api.timeout.ms` is reached.
   * Change the default value of `ErrorMessage` to `null` in the `DeleteTopicsResponse`. I have noted that it defaults to an empty string at the moment. The Admin client only uses the default message when the `ErrorMessage` is `null` so we end up with an empty message provided back to the user when `ErrorMessage` is not explicitly set by the broker.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] dajac commented on a change in pull request #9334: KAFKA-10516; Disable automatic retry of `THROTTLING_QUOTA_EXCEEDED` errors in the `kafka-topics` command (KIP-599)

Posted by GitBox <gi...@apache.org>.
dajac commented on a change in pull request #9334:
URL: https://github.com/apache/kafka/pull/9334#discussion_r494856451



##########
File path: core/src/main/scala/kafka/admin/TopicCommand.scala
##########
@@ -69,16 +71,26 @@ object TopicCommand extends Logging {
       else if (opts.hasDeleteOption)
         topicService.deleteTopic(opts)
     } catch {
+      case e: ExecutionException =>
+        if (e.getCause != null)
+          printException(e.getCause)
+        else
+          printException(e)
+        exitCode = 1
       case e: Throwable =>
-        println("Error while executing topic command : " + e.getMessage)
-        error(Utils.stackTrace(e))
+        printException(e)
         exitCode = 1
     } finally {
       topicService.close()
       Exit.exit(exitCode)
     }
   }
 
+  private def printException(e: Throwable): Unit = {
+    println("Error while executing topic command : " + e.getMessage)
+    error(Utils.stackTrace(e))

Review comment:
       That's a very good question. I guess that we did so in order to just print out the stacktrace without the message. Otherwise, we would have the error message printed out by the `println`, followed by the same message printed out by logger, followed by the stacktrace. Having the message twice is not necessary. I had a quick look at other commands and we do so everywhere.
   
   I will open a JIRA to make tools more consistent. Good idea.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] dajac commented on pull request #9334: KAFKA-10516; Disable automatic retry of `THROTTLING_QUOTA_EXCEEDED` errors in the `kafka-topics` command (KIP-599)

Posted by GitBox <gi...@apache.org>.
dajac commented on pull request #9334:
URL: https://github.com/apache/kafka/pull/9334#issuecomment-698784923


   @rajinisivaram Thanks for your comments. I have updated the PR and answered your questions.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] rajinisivaram commented on a change in pull request #9334: KAFKA-10516; Disable automatic retry of `THROTTLING_QUOTA_EXCEEDED` errors in the `kafka-topics` command (KIP-599)

Posted by GitBox <gi...@apache.org>.
rajinisivaram commented on a change in pull request #9334:
URL: https://github.com/apache/kafka/pull/9334#discussion_r494510121



##########
File path: core/src/main/scala/kafka/admin/TopicCommand.scala
##########
@@ -69,16 +71,26 @@ object TopicCommand extends Logging {
       else if (opts.hasDeleteOption)
         topicService.deleteTopic(opts)
     } catch {
+      case e: ExecutionException =>
+        if (e.getCause != null)
+          printException(e.getCause)
+        else
+          printException(e)
+        exitCode = 1
       case e: Throwable =>
-        println("Error while executing topic command : " + e.getMessage)
-        error(Utils.stackTrace(e))
+        printException(e)
         exitCode = 1
     } finally {
       topicService.close()
       Exit.exit(exitCode)
     }
   }
 
+  private def printException(e: Throwable): Unit = {
+    println("Error while executing topic command : " + e.getMessage)
+    error(Utils.stackTrace(e))

Review comment:
       Couldn't we just use `error(message, e)`?

##########
File path: core/src/test/scala/unit/kafka/admin/TopicCommandWithAdminClientTest.scala
##########
@@ -844,4 +851,72 @@ class TopicCommandWithAdminClientTest extends KafkaServerTestHarness with Loggin
     assertEquals(2, rows.size)
     rows(0).startsWith(s"Topic:$testTopicName\tPartitionCount:1")
   }
+
+  @Test
+  def testCreateTopicDoesNotRetryThrottlingQuotaExceededException(): Unit = {
+    val adminClient = Mockito.mock(classOf[Admin])
+    val topicService = AdminClientTopicService(adminClient)
+
+    val result = AdminClientTestUtils.createTopicsResult(testTopicName, Errors.THROTTLING_QUOTA_EXCEEDED.exception())
+    Mockito.when(adminClient.createTopics(ArgumentMatchers.any(), ArgumentMatchers.any())).thenReturn(result)
+
+    assertThrows(classOf[ThrottlingQuotaExceededException],
+      () => topicService.createTopic(new TopicCommandOptions(Array("--topic", testTopicName))))
+
+    val expectedNewTopic = new NewTopic(testTopicName, Option.empty[Integer].asJava, Option.empty[java.lang.Short].asJava)

Review comment:
       We could just use Optional.empty instead of creating in Scala and converting?

##########
File path: core/src/test/scala/unit/kafka/admin/TopicCommandWithAdminClientTest.scala
##########
@@ -844,4 +851,72 @@ class TopicCommandWithAdminClientTest extends KafkaServerTestHarness with Loggin
     assertEquals(2, rows.size)
     rows(0).startsWith(s"Topic:$testTopicName\tPartitionCount:1")
   }
+
+  @Test
+  def testCreateTopicDoesNotRetryThrottlingQuotaExceededException(): Unit = {
+    val adminClient = Mockito.mock(classOf[Admin])
+    val topicService = AdminClientTopicService(adminClient)
+
+    val result = AdminClientTestUtils.createTopicsResult(testTopicName, Errors.THROTTLING_QUOTA_EXCEEDED.exception())
+    Mockito.when(adminClient.createTopics(ArgumentMatchers.any(), ArgumentMatchers.any())).thenReturn(result)

Review comment:
       May be worth importing `org.mockito.Mockito._` and  `org.mockito.ArgumentMatchers._` to avoid repeating the class name everywhere.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] dajac commented on pull request #9334: KAFKA-10516; Disable automatic retry of `THROTTLING_QUOTA_EXCEEDED` errors in the `kafka-topics` command (KIP-599)

Posted by GitBox <gi...@apache.org>.
dajac commented on pull request #9334:
URL: https://github.com/apache/kafka/pull/9334#issuecomment-698784923


   @rajinisivaram Thanks for your comments. I have updated the PR and answered your questions.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] rajinisivaram commented on a change in pull request #9334: KAFKA-10516; Disable automatic retry of `THROTTLING_QUOTA_EXCEEDED` errors in the `kafka-topics` command (KIP-599)

Posted by GitBox <gi...@apache.org>.
rajinisivaram commented on a change in pull request #9334:
URL: https://github.com/apache/kafka/pull/9334#discussion_r494510121



##########
File path: core/src/main/scala/kafka/admin/TopicCommand.scala
##########
@@ -69,16 +71,26 @@ object TopicCommand extends Logging {
       else if (opts.hasDeleteOption)
         topicService.deleteTopic(opts)
     } catch {
+      case e: ExecutionException =>
+        if (e.getCause != null)
+          printException(e.getCause)
+        else
+          printException(e)
+        exitCode = 1
       case e: Throwable =>
-        println("Error while executing topic command : " + e.getMessage)
-        error(Utils.stackTrace(e))
+        printException(e)
         exitCode = 1
     } finally {
       topicService.close()
       Exit.exit(exitCode)
     }
   }
 
+  private def printException(e: Throwable): Unit = {
+    println("Error while executing topic command : " + e.getMessage)
+    error(Utils.stackTrace(e))

Review comment:
       Couldn't we just use `error(message, e)`?

##########
File path: core/src/test/scala/unit/kafka/admin/TopicCommandWithAdminClientTest.scala
##########
@@ -844,4 +851,72 @@ class TopicCommandWithAdminClientTest extends KafkaServerTestHarness with Loggin
     assertEquals(2, rows.size)
     rows(0).startsWith(s"Topic:$testTopicName\tPartitionCount:1")
   }
+
+  @Test
+  def testCreateTopicDoesNotRetryThrottlingQuotaExceededException(): Unit = {
+    val adminClient = Mockito.mock(classOf[Admin])
+    val topicService = AdminClientTopicService(adminClient)
+
+    val result = AdminClientTestUtils.createTopicsResult(testTopicName, Errors.THROTTLING_QUOTA_EXCEEDED.exception())
+    Mockito.when(adminClient.createTopics(ArgumentMatchers.any(), ArgumentMatchers.any())).thenReturn(result)
+
+    assertThrows(classOf[ThrottlingQuotaExceededException],
+      () => topicService.createTopic(new TopicCommandOptions(Array("--topic", testTopicName))))
+
+    val expectedNewTopic = new NewTopic(testTopicName, Option.empty[Integer].asJava, Option.empty[java.lang.Short].asJava)

Review comment:
       We could just use Optional.empty instead of creating in Scala and converting?

##########
File path: core/src/test/scala/unit/kafka/admin/TopicCommandWithAdminClientTest.scala
##########
@@ -844,4 +851,72 @@ class TopicCommandWithAdminClientTest extends KafkaServerTestHarness with Loggin
     assertEquals(2, rows.size)
     rows(0).startsWith(s"Topic:$testTopicName\tPartitionCount:1")
   }
+
+  @Test
+  def testCreateTopicDoesNotRetryThrottlingQuotaExceededException(): Unit = {
+    val adminClient = Mockito.mock(classOf[Admin])
+    val topicService = AdminClientTopicService(adminClient)
+
+    val result = AdminClientTestUtils.createTopicsResult(testTopicName, Errors.THROTTLING_QUOTA_EXCEEDED.exception())
+    Mockito.when(adminClient.createTopics(ArgumentMatchers.any(), ArgumentMatchers.any())).thenReturn(result)

Review comment:
       May be worth importing `org.mockito.Mockito._` and  `org.mockito.ArgumentMatchers._` to avoid repeating the class name everywhere.

##########
File path: core/src/main/scala/kafka/admin/TopicCommand.scala
##########
@@ -69,16 +71,26 @@ object TopicCommand extends Logging {
       else if (opts.hasDeleteOption)
         topicService.deleteTopic(opts)
     } catch {
+      case e: ExecutionException =>
+        if (e.getCause != null)
+          printException(e.getCause)
+        else
+          printException(e)
+        exitCode = 1
       case e: Throwable =>
-        println("Error while executing topic command : " + e.getMessage)
-        error(Utils.stackTrace(e))
+        printException(e)
         exitCode = 1
     } finally {
       topicService.close()
       Exit.exit(exitCode)
     }
   }
 
+  private def printException(e: Throwable): Unit = {
+    println("Error while executing topic command : " + e.getMessage)
+    error(Utils.stackTrace(e))

Review comment:
       Sorry, I should have been more clear. I just meant, why do we call `Utils.stackTrace(e))` when `error(message, e)` gives you a stack trace for free. In terms of changing from `error` to `debug`, I think we include stack track in other commands. But each command seems to use something different. ConfigCommand, for example, doesn't print stacktrace for config exceptions. It logs both the error message and stacktrace to stderr. AclCommand prints for everything to stdout. Let's just keep `error` for now and maybe open a JIRA to improve and make tools consistent later?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] dajac commented on a change in pull request #9334: KAFKA-10516; Disable automatic retry of `THROTTLING_QUOTA_EXCEEDED` errors in the `kafka-topics` command (KIP-599)

Posted by GitBox <gi...@apache.org>.
dajac commented on a change in pull request #9334:
URL: https://github.com/apache/kafka/pull/9334#discussion_r494815168



##########
File path: core/src/main/scala/kafka/admin/TopicCommand.scala
##########
@@ -69,16 +71,26 @@ object TopicCommand extends Logging {
       else if (opts.hasDeleteOption)
         topicService.deleteTopic(opts)
     } catch {
+      case e: ExecutionException =>
+        if (e.getCause != null)
+          printException(e.getCause)
+        else
+          printException(e)
+        exitCode = 1
       case e: Throwable =>
-        println("Error while executing topic command : " + e.getMessage)
-        error(Utils.stackTrace(e))
+        printException(e)
         exitCode = 1
     } finally {
       topicService.close()
       Exit.exit(exitCode)
     }
   }
 
+  private def printException(e: Throwable): Unit = {
+    println("Error while executing topic command : " + e.getMessage)
+    error(Utils.stackTrace(e))

Review comment:
       Do you mean using `error(message, e)` to replace both `println` and `error`? I think that we are using 'println` here in order to print the message to stdout without any logger related stuff and regardless of how the logger is configured. Changing to using `error(message, e)` would break this and potentially break existing application due to introducing the logger related stuff for that message. I think that we should keep `println` here.
   
   However, I wonder if using `error` is appropriate here as it basically prints a stacktrace for every errors. The UX does not look good. Would it make sense to use `debug` instead? The message of the exception is printed anyway and I don't think that the stacktrace provides much to regular users. WDYT?
   
   




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] rajinisivaram merged pull request #9334: KAFKA-10516; Disable automatic retry of `THROTTLING_QUOTA_EXCEEDED` errors in the `kafka-topics` command (KIP-599)

Posted by GitBox <gi...@apache.org>.
rajinisivaram merged pull request #9334:
URL: https://github.com/apache/kafka/pull/9334


   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] dajac commented on a change in pull request #9334: KAFKA-10516; Disable automatic retry of `THROTTLING_QUOTA_EXCEEDED` errors in the `kafka-topics` command (KIP-599)

Posted by GitBox <gi...@apache.org>.
dajac commented on a change in pull request #9334:
URL: https://github.com/apache/kafka/pull/9334#discussion_r494810769



##########
File path: core/src/test/scala/unit/kafka/admin/TopicCommandWithAdminClientTest.scala
##########
@@ -844,4 +851,72 @@ class TopicCommandWithAdminClientTest extends KafkaServerTestHarness with Loggin
     assertEquals(2, rows.size)
     rows(0).startsWith(s"Topic:$testTopicName\tPartitionCount:1")
   }
+
+  @Test
+  def testCreateTopicDoesNotRetryThrottlingQuotaExceededException(): Unit = {
+    val adminClient = Mockito.mock(classOf[Admin])
+    val topicService = AdminClientTopicService(adminClient)
+
+    val result = AdminClientTestUtils.createTopicsResult(testTopicName, Errors.THROTTLING_QUOTA_EXCEEDED.exception())
+    Mockito.when(adminClient.createTopics(ArgumentMatchers.any(), ArgumentMatchers.any())).thenReturn(result)

Review comment:
       That makes sense. I had to introduce an alias for `ArgumentMatchers.eq` to not conflict with `eq`. I went with `eqThat` to remain inline with `argThat`.

##########
File path: core/src/main/scala/kafka/admin/TopicCommand.scala
##########
@@ -69,16 +71,26 @@ object TopicCommand extends Logging {
       else if (opts.hasDeleteOption)
         topicService.deleteTopic(opts)
     } catch {
+      case e: ExecutionException =>
+        if (e.getCause != null)
+          printException(e.getCause)
+        else
+          printException(e)
+        exitCode = 1
       case e: Throwable =>
-        println("Error while executing topic command : " + e.getMessage)
-        error(Utils.stackTrace(e))
+        printException(e)
         exitCode = 1
     } finally {
       topicService.close()
       Exit.exit(exitCode)
     }
   }
 
+  private def printException(e: Throwable): Unit = {
+    println("Error while executing topic command : " + e.getMessage)
+    error(Utils.stackTrace(e))

Review comment:
       Do you mean using `error(message, e)` to replace both `println` and `error`? I think that we are using 'println` here in order to print the message to stdout without any logger related stuff and regardless of how the logger is configured. Changing to using `error(message, e)` would break this and potentially break existing application due to introducing the logger related stuff for that message. I think that we should keep `println` here.
   
   However, I wonder if using `error` is appropriate here as it basically prints a stacktrace for every errors. The UX does not look good. Would it make sense to use `debug` instead? The message of the exception is printed anyway and I don't think that the stacktrace provides much to regular users. WDYT?
   
   

##########
File path: core/src/main/scala/kafka/admin/TopicCommand.scala
##########
@@ -69,16 +71,26 @@ object TopicCommand extends Logging {
       else if (opts.hasDeleteOption)
         topicService.deleteTopic(opts)
     } catch {
+      case e: ExecutionException =>
+        if (e.getCause != null)
+          printException(e.getCause)
+        else
+          printException(e)
+        exitCode = 1
       case e: Throwable =>
-        println("Error while executing topic command : " + e.getMessage)
-        error(Utils.stackTrace(e))
+        printException(e)
         exitCode = 1
     } finally {
       topicService.close()
       Exit.exit(exitCode)
     }
   }
 
+  private def printException(e: Throwable): Unit = {
+    println("Error while executing topic command : " + e.getMessage)
+    error(Utils.stackTrace(e))

Review comment:
       That's a very good question. I guess that we did so in order to just print out the stacktrace without the message. Otherwise, we would have the error message printed out by the `println`, followed by the same message printed out by logger, followed by the stacktrace. Having the message twice is not necessary. I had a quick look at other commands and we do so everywhere.
   
   I will open a JIRA to make tools more consistent. Good idea.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] dajac commented on a change in pull request #9334: KAFKA-10516; Disable automatic retry of `THROTTLING_QUOTA_EXCEEDED` errors in the `kafka-topics` command (KIP-599)

Posted by GitBox <gi...@apache.org>.
dajac commented on a change in pull request #9334:
URL: https://github.com/apache/kafka/pull/9334#discussion_r494810769



##########
File path: core/src/test/scala/unit/kafka/admin/TopicCommandWithAdminClientTest.scala
##########
@@ -844,4 +851,72 @@ class TopicCommandWithAdminClientTest extends KafkaServerTestHarness with Loggin
     assertEquals(2, rows.size)
     rows(0).startsWith(s"Topic:$testTopicName\tPartitionCount:1")
   }
+
+  @Test
+  def testCreateTopicDoesNotRetryThrottlingQuotaExceededException(): Unit = {
+    val adminClient = Mockito.mock(classOf[Admin])
+    val topicService = AdminClientTopicService(adminClient)
+
+    val result = AdminClientTestUtils.createTopicsResult(testTopicName, Errors.THROTTLING_QUOTA_EXCEEDED.exception())
+    Mockito.when(adminClient.createTopics(ArgumentMatchers.any(), ArgumentMatchers.any())).thenReturn(result)

Review comment:
       That makes sense. I had to introduce an alias for `ArgumentMatchers.eq` to not conflict with `eq`. I went with `eqThat` to remain inline with `argThat`.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] rajinisivaram commented on a change in pull request #9334: KAFKA-10516; Disable automatic retry of `THROTTLING_QUOTA_EXCEEDED` errors in the `kafka-topics` command (KIP-599)

Posted by GitBox <gi...@apache.org>.
rajinisivaram commented on a change in pull request #9334:
URL: https://github.com/apache/kafka/pull/9334#discussion_r494837556



##########
File path: core/src/main/scala/kafka/admin/TopicCommand.scala
##########
@@ -69,16 +71,26 @@ object TopicCommand extends Logging {
       else if (opts.hasDeleteOption)
         topicService.deleteTopic(opts)
     } catch {
+      case e: ExecutionException =>
+        if (e.getCause != null)
+          printException(e.getCause)
+        else
+          printException(e)
+        exitCode = 1
       case e: Throwable =>
-        println("Error while executing topic command : " + e.getMessage)
-        error(Utils.stackTrace(e))
+        printException(e)
         exitCode = 1
     } finally {
       topicService.close()
       Exit.exit(exitCode)
     }
   }
 
+  private def printException(e: Throwable): Unit = {
+    println("Error while executing topic command : " + e.getMessage)
+    error(Utils.stackTrace(e))

Review comment:
       Sorry, I should have been more clear. I just meant, why do we call `Utils.stackTrace(e))` when `error(message, e)` gives you a stack trace for free. In terms of changing from `error` to `debug`, I think we include stack track in other commands. But each command seems to use something different. ConfigCommand, for example, doesn't print stacktrace for config exceptions. It logs both the error message and stacktrace to stderr. AclCommand prints for everything to stdout. Let's just keep `error` for now and maybe open a JIRA to improve and make tools consistent later?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org