Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2021/02/17 09:48:38 UTC

[GitHub] [kafka] dajac opened a new pull request #10141: KAFKA-12329; kafka-reassign-partitions command should give a better error message when a topic does not exist

dajac opened a new pull request #10141:
URL: https://github.com/apache/kafka/pull/10141


   The `kafka-reassign-partitions` command gives a generic error message when one tries to reassign a topic that does not exist:
   
   ```
   $ ./bin/kafka-reassign-partitions.sh --bootstrap-server localhost:9092 --execute --reassignment-json-file reassignment.json
   Error: org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This server does not host this topic-partition.
   ```
   
   When the reassignment contains multiple topics, it is hard to tell which one does not exist. This PR improves the error message so that it names the missing topic:
   
   ```
   $ ./bin/kafka-reassign-partitions.sh --bootstrap-server localhost:9092 --execute --reassignment-json-file reassignment.json
   Error: org.apache.kafka.common.errors.UnknownTopicOrPartitionException: Topic test-test not found.
   ```
   
   I propose to update the `KafkaAdminClient#describeTopics` method to handle this case directly. I noted that the unit tests of the `ReassignPartitionsCommand` already expect to find the topic name in the error. This works in unit tests because the `MockAdminClient` does that. When the regular admin client is used, however, the generic message is returned unless the topic is absent from the broker's response altogether. This PR makes the handling and the error consistent in all cases in `KafkaAdminClient#describeTopics`.
   
   Alternatively, we could handle this entirely in the `ReassignPartitionsCommand`. We could catch the exception and raise a new one with the message that we want. I don't feel strongly either way, so I went with doing it in the admin client.
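
As a rough illustration of this alternative, the sketch below catches the exception on the calling side and raises a new one that names the topic. It is not the PR's code: `UnknownTopicException` is a hypothetical stand-in for `UnknownTopicOrPartitionException`, and plain `CompletableFuture[String]` values stand in for the futures returned by the admin client.

```scala
import java.util.concurrent.{CompletableFuture, ExecutionException}

// Hypothetical stand-in for Kafka's UnknownTopicOrPartitionException.
class UnknownTopicException(message: String) extends RuntimeException(message)

object CallerSideHandlingSketch {
  // Waits for each per-topic future. A generic "unknown topic" failure is
  // replaced by one whose message names the topic; anything else is rethrown.
  def describeAll(futures: Map[String, CompletableFuture[String]]): Map[String, String] =
    futures.map { case (topicName, future) =>
      try {
        topicName -> future.get()
      } catch {
        case e: ExecutionException =>
          e.getCause match {
            case _: UnknownTopicException =>
              throw new ExecutionException(
                new UnknownTopicException(s"Topic $topicName not found."))
            case _ => throw e
          }
      }
    }

  def main(args: Array[String]): Unit = {
    val missing = new CompletableFuture[String]()
    missing.completeExceptionally(
      new UnknownTopicException("This server does not host this topic-partition."))

    try {
      describeAll(Map("test-test" -> missing))
    } catch {
      case e: ExecutionException => println(e.getCause.getMessage)
    }
  }
}
```

With this shape the admin client keeps its generic error, and only the command decides how to word the message.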
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] ijuma commented on a change in pull request #10141: KAFKA-12329; kafka-reassign-partitions command should give a better error message when a topic does not exist

Posted by GitBox <gi...@apache.org>.
ijuma commented on a change in pull request #10141:
URL: https://github.com/apache/kafka/pull/10141#discussion_r589425927



##########
File path: core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala
##########
@@ -812,6 +812,26 @@ object ReassignPartitionsCommand extends Logging {
     proposedAssignments
   }
 
+  private def describeTopics(adminClient: Admin,
+                             topics: Set[String])
+                             : Map[String, TopicDescription] = {
+    adminClient.describeTopics(topics.asJava).values.asScala.map {
+      case (topicName, topicDescriptionFuture) =>
+        try {
+          topicName -> topicDescriptionFuture.get
+        }
+        catch {
+          case t: ExecutionException =>
+            if (classOf[UnknownTopicOrPartitionException].isInstance(t.getCause)) {

Review comment:
       Oh, one more thing, did you check that the null check is actually needed? I think isInstanceOf is defined for null too.







[GitHub] [kafka] ijuma commented on a change in pull request #10141: KAFKA-12329; kafka-reassign-partitions command should give a better error message when a topic does not exist

Posted by GitBox <gi...@apache.org>.
ijuma commented on a change in pull request #10141:
URL: https://github.com/apache/kafka/pull/10141#discussion_r585901281



##########
File path: core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala
##########
@@ -812,6 +812,26 @@ object ReassignPartitionsCommand extends Logging {
     proposedAssignments
   }
 
+  private def describeTopics(adminClient: Admin,
+                             topics: Set[String])
+                             : Map[String, TopicDescription] = {
+    adminClient.describeTopics(topics.asJava).values.asScala.map {
+      case (topicName, topicDescriptionFuture) =>
+        try {
+          topicName -> topicDescriptionFuture.get
+        }
+        catch {
+          case t: ExecutionException =>
+            if (classOf[UnknownTopicOrPartitionException].isInstance(t.getCause)) {

Review comment:
       Also, you can have the `if` in the `case t` line and then a second `case` for the rethrow case.
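
A minimal sketch of that shape, with the condition moved into a guard on the first `case` and the rethrow in a second `case`. `UnknownTopicException` and the `describe` helper are hypothetical stand-ins, not names from the PR.

```scala
import java.util.concurrent.ExecutionException

object GuardedCatchSketch {
  // Hypothetical stand-in for Kafka's UnknownTopicOrPartitionException.
  class UnknownTopicException(message: String) extends RuntimeException(message)

  // Evaluates `lookup`; only an "unknown topic" cause is rewritten.
  def describe(topicName: String)(lookup: => String): String =
    try {
      lookup
    } catch {
      case t: ExecutionException if t.getCause.isInstanceOf[UnknownTopicException] =>
        throw new ExecutionException(
          new UnknownTopicException(s"Topic $topicName not found."))
      case t: ExecutionException =>
        throw t // every other failure is rethrown unchanged
    }

  def main(args: Array[String]): Unit = {
    try {
      describe("test-test") {
        throw new ExecutionException(new UnknownTopicException("generic message"))
      }
    } catch {
      case e: ExecutionException => println(e.getCause.getMessage)
    }
  }
}
```

Because an exception that matches no `case` propagates on its own, the second `case` is optional; it only makes the rethrow explicit.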







[GitHub] [kafka] chia7712 commented on a change in pull request #10141: KAFKA-12329; kafka-reassign-partitions command should give a better error message when a topic does not exist

Posted by GitBox <gi...@apache.org>.
chia7712 commented on a change in pull request #10141:
URL: https://github.com/apache/kafka/pull/10141#discussion_r584616679



##########
File path: core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala
##########
@@ -812,6 +812,26 @@ object ReassignPartitionsCommand extends Logging {
     proposedAssignments
   }
 
+  private def describeTopics(adminClient: Admin,
+                             topics: Set[String])
+                             : Map[String, TopicDescription] = {
+    adminClient.describeTopics(topics.asJava).values.asScala.map {
+      case (topicName, topicDescriptionFuture) =>
+        try {
+          topicName -> topicDescriptionFuture.get
+        }
+        catch {
+          case t: ExecutionException if t.getCause != null =>

Review comment:
       How about using `classOf[UnknownTopicOrPartitionException].isInstance(t.getCause)`? That includes the null check.
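
For reference, `java.lang.Class#isInstance` is documented to return `false` for a `null` argument, which is why no separate null check is needed with this form. A small sketch, using `IllegalStateException` purely as a placeholder for the Kafka exception class:

```scala
import java.util.concurrent.ExecutionException

object ClassIsInstanceSketch {
  def main(args: Array[String]): Unit = {
    // An ExecutionException built without a cause returns null from getCause.
    val withoutCause = new ExecutionException("no cause", null)
    val withCause = new ExecutionException(new IllegalStateException("boom"))

    println(classOf[IllegalStateException].isInstance(withoutCause.getCause)) // false
    println(classOf[IllegalStateException].isInstance(withCause.getCause))    // true
  }
}
```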







[GitHub] [kafka] ijuma commented on a change in pull request #10141: KAFKA-12329; kafka-reassign-partitions command should give a better error message when a topic does not exist

Posted by GitBox <gi...@apache.org>.
ijuma commented on a change in pull request #10141:
URL: https://github.com/apache/kafka/pull/10141#discussion_r589424932



##########
File path: core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala
##########
@@ -812,6 +812,26 @@ object ReassignPartitionsCommand extends Logging {
     proposedAssignments
   }
 
+  private def describeTopics(adminClient: Admin,
+                             topics: Set[String])
+                             : Map[String, TopicDescription] = {
+    adminClient.describeTopics(topics.asJava).values.asScala.map {
+      case (topicName, topicDescriptionFuture) =>
+        try {
+          topicName -> topicDescriptionFuture.get
+        }
+        catch {
+          case t: ExecutionException =>
+            if (classOf[UnknownTopicOrPartitionException].isInstance(t.getCause)) {

Review comment:
       Hmm, I think this is an example of code that is less readable. If `cause` may be null, it's better to write code that makes that clear rather than a non-obvious alternative that could have been used for many other reasons (using `Option` to handle nullables is fine as a counterexample since it's common usage to do that to handle nulls).
   
   What do you think?
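
To illustrate the `Option` idiom mentioned here, the sketch below wraps the possibly-null cause in `Option` so the null case is visible in the code. The helper name `unknownTopicMessage` and the stand-in `UnknownTopicException` are assumptions made for the example.

```scala
import java.util.concurrent.ExecutionException

object OptionCauseSketch {
  // Hypothetical stand-in for Kafka's UnknownTopicOrPartitionException.
  class UnknownTopicException(message: String) extends RuntimeException(message)

  // Option(null) is None, so a missing cause simply yields None.
  def unknownTopicMessage(t: ExecutionException): Option[String] =
    Option(t.getCause).collect { case c: UnknownTopicException => c.getMessage }

  def main(args: Array[String]): Unit = {
    println(unknownTopicMessage(new ExecutionException("no cause", null)))
    println(unknownTopicMessage(
      new ExecutionException(new UnknownTopicException("Topic foo not found."))))
  }
}
```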







[GitHub] [kafka] dajac commented on pull request #10141: KAFKA-12329; kafka-reassign-partitions command should give a better error message when a topic does not exist

Posted by GitBox <gi...@apache.org>.
dajac commented on pull request #10141:
URL: https://github.com/apache/kafka/pull/10141#issuecomment-787783411


   > @dajac Nice improvement. Some minor comments are left. Please take a look.
   > 
   > 1. https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java#L1893 also checks the existence of the topic. Will it become a redundant check after we merge this PR?
   > 2. Should we apply this readable error message to other APIs? `deleteTopics`, for example.
   
   @chia7712 Thanks for your comment. To answer your questions:
   1. No, it is not. That one throws an exception if the topic was not returned at all by the broker.
   2. That's a good question... I do agree that it is a bit weird to do this only in one case. Thinking about this, I think that it might be better to keep the error generic in the admin client and to handle it on the calling side instead. I will try this so we can compare.





[GitHub] [kafka] ijuma commented on a change in pull request #10141: KAFKA-12329; kafka-reassign-partitions command should give a better error message when a topic does not exist

Posted by GitBox <gi...@apache.org>.
ijuma commented on a change in pull request #10141:
URL: https://github.com/apache/kafka/pull/10141#discussion_r585900837



##########
File path: core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala
##########
@@ -812,6 +812,26 @@ object ReassignPartitionsCommand extends Logging {
     proposedAssignments
   }
 
+  private def describeTopics(adminClient: Admin,
+                             topics: Set[String])
+                             : Map[String, TopicDescription] = {
+    adminClient.describeTopics(topics.asJava).values.asScala.map {
+      case (topicName, topicDescriptionFuture) =>
+        try {
+          topicName -> topicDescriptionFuture.get
+        }
+        catch {
+          case t: ExecutionException =>
+            if (classOf[UnknownTopicOrPartitionException].isInstance(t.getCause)) {

Review comment:
       Hmm, why not write this as `t.getCause.isInstanceOf[UnknownTopicOrPartitionException]`?







[GitHub] [kafka] dajac commented on pull request #10141: KAFKA-12329; kafka-reassign-partitions command should give a better error message when a topic does not exist

Posted by GitBox <gi...@apache.org>.
dajac commented on pull request #10141:
URL: https://github.com/apache/kafka/pull/10141#issuecomment-787870948


   @chia7712 Thanks. I just pushed a commit to address your comment. Could you take another look?





[GitHub] [kafka] dajac merged pull request #10141: KAFKA-12329; kafka-reassign-partitions command should give a better error message when a topic does not exist

Posted by GitBox <gi...@apache.org>.
dajac merged pull request #10141:
URL: https://github.com/apache/kafka/pull/10141


   





[GitHub] [kafka] dajac commented on a change in pull request #10141: KAFKA-12329; kafka-reassign-partitions command should give a better error message when a topic does not exist

Posted by GitBox <gi...@apache.org>.
dajac commented on a change in pull request #10141:
URL: https://github.com/apache/kafka/pull/10141#discussion_r586194412



##########
File path: core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala
##########
@@ -812,6 +812,26 @@ object ReassignPartitionsCommand extends Logging {
     proposedAssignments
   }
 
+  private def describeTopics(adminClient: Admin,
+                             topics: Set[String])
+                             : Map[String, TopicDescription] = {
+    adminClient.describeTopics(topics.asJava).values.asScala.map {
+      case (topicName, topicDescriptionFuture) =>
+        try {
+          topicName -> topicDescriptionFuture.get
+        }
+        catch {
+          case t: ExecutionException =>
+            if (classOf[UnknownTopicOrPartitionException].isInstance(t.getCause)) {

Review comment:
       I initially used `t.getCause != null && t.getCause.isInstanceOf[UnknownTopicOrPartitionException]`. @chia7712 suggested using `classOf[UnknownTopicOrPartitionException].isInstance(t.getCause)` to avoid the explicit null check, since `isInstance` handles it. That seemed reasonable to me so I went with it. Is there a reason not to use it?
   
   I do agree with your second comment.







[GitHub] [kafka] dajac commented on pull request #10141: KAFKA-12329; kafka-reassign-partitions command should give a better error message when a topic does not exist

Posted by GitBox <gi...@apache.org>.
dajac commented on pull request #10141:
URL: https://github.com/apache/kafka/pull/10141#issuecomment-787968243


   Failed test is not related. Merging to trunk.





[GitHub] [kafka] dajac commented on a change in pull request #10141: KAFKA-12329; kafka-reassign-partitions command should give a better error message when a topic does not exist

Posted by GitBox <gi...@apache.org>.
dajac commented on a change in pull request #10141:
URL: https://github.com/apache/kafka/pull/10141#discussion_r589468725



##########
File path: core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala
##########
@@ -812,6 +812,26 @@ object ReassignPartitionsCommand extends Logging {
     proposedAssignments
   }
 
+  private def describeTopics(adminClient: Admin,
+                             topics: Set[String])
+                             : Map[String, TopicDescription] = {
+    adminClient.describeTopics(topics.asJava).values.asScala.map {
+      case (topicName, topicDescriptionFuture) =>
+        try {
+          topicName -> topicDescriptionFuture.get
+        }
+        catch {
+          case t: ExecutionException =>
+            if (classOf[UnknownTopicOrPartitionException].isInstance(t.getCause)) {

Review comment:
       @ijuma Yeah, I do agree with you. For the null check, I was not aware that isInstanceOf handles it. That's good to know, thanks.
   
   I will open a small PR to fix this.







[GitHub] [kafka] chia7712 commented on a change in pull request #10141: KAFKA-12329; kafka-reassign-partitions command should give a better error message when a topic does not exist

Posted by GitBox <gi...@apache.org>.
chia7712 commented on a change in pull request #10141:
URL: https://github.com/apache/kafka/pull/10141#discussion_r589448109



##########
File path: core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala
##########
@@ -812,6 +812,26 @@ object ReassignPartitionsCommand extends Logging {
     proposedAssignments
   }
 
+  private def describeTopics(adminClient: Admin,
+                             topics: Set[String])
+                             : Map[String, TopicDescription] = {
+    adminClient.describeTopics(topics.asJava).values.asScala.map {
+      case (topicName, topicDescriptionFuture) =>
+        try {
+          topicName -> topicDescriptionFuture.get
+        }
+        catch {
+          case t: ExecutionException =>
+            if (classOf[UnknownTopicOrPartitionException].isInstance(t.getCause)) {

Review comment:
       > I think isInstanceOf is defined for null too.
   
   You are right. Scala does define it. https://scala-lang.org/files/archive/spec/2.13/spec.pdf
   
   > `isInstanceOf[T]` always returns false.
   
   Good to know that :)
   
   +1 to use `t.getCause.isInstanceOf[UnknownTopicOrPartitionException]` as it handles the null case.
   
   Sorry for my imprecise comment :(
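
A quick way to see this behavior is the sketch below, which calls `isInstanceOf` on a null reference and also shows that a type pattern does not match `null`. `IllegalStateException` is only a placeholder type.

```scala
object NullIsInstanceOfSketch {
  def main(args: Array[String]): Unit = {
    val cause: Throwable = null

    // No NullPointerException: the test is simply false for null.
    println(cause.isInstanceOf[IllegalStateException]) // false

    // A type pattern behaves the same way and falls through to the default.
    val matched = cause match {
      case _: IllegalStateException => true
      case _                        => false
    }
    println(matched) // false
  }
}
```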
   
   







[GitHub] [kafka] dajac commented on pull request #10141: KAFKA-12329; kafka-reassign-partitions command should give a better error message when a topic does not exist

Posted by GitBox <gi...@apache.org>.
dajac commented on pull request #10141:
URL: https://github.com/apache/kafka/pull/10141#issuecomment-787818715


   @chia7712 I just pushed an update. Please take a look and let me know what you think about it.





[GitHub] [kafka] chia7712 commented on a change in pull request #10141: KAFKA-12329; kafka-reassign-partitions command should give a better error message when a topic does not exist

Posted by GitBox <gi...@apache.org>.
chia7712 commented on a change in pull request #10141:
URL: https://github.com/apache/kafka/pull/10141#discussion_r577570241



##########
File path: clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
##########
@@ -1887,7 +1887,11 @@ void handleResponse(AbstractResponse abstractResponse) {
                     KafkaFutureImpl<TopicDescription> future = entry.getValue();
                     Errors topicError = errors.get(topicName);
                     if (topicError != null) {
-                        future.completeExceptionally(topicError.exception());
+                        if (topicError == Errors.UNKNOWN_TOPIC_OR_PARTITION) {
+                            future.completeExceptionally(new UnknownTopicOrPartitionException("Topic " + topicName + " not found."));

Review comment:
       How about using `topicError.exception("Topic " + topicName + " not found.")`?
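
The idea behind this suggestion can be sketched with a stand-in type. `TopicError` below is hypothetical and only imitates an error value that can build its exception either with a default message or with a caller-supplied one; it is not Kafka's `Errors` enum.

```scala
object ErrorMessageSketch {
  // Hypothetical stand-in for an error code that knows how to build its exception.
  final class TopicError(defaultMessage: String, build: String => RuntimeException) {
    def exception(): RuntimeException = build(defaultMessage)
    def exception(message: String): RuntimeException = build(message)
  }

  val UnknownTopic = new TopicError(
    "This server does not host this topic-partition.",
    msg => new IllegalStateException(msg))

  def main(args: Array[String]): Unit = {
    val topicName = "test-test"
    // Same exception type in both cases; only the message differs.
    println(UnknownTopic.exception().getMessage)
    println(UnknownTopic.exception("Topic " + topicName + " not found.").getMessage)
  }
}
```

Letting the error value construct the exception keeps the mapping from error code to exception type in one place, so the call site only supplies the message.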






