Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2020/06/23 23:14:00 UTC

[GitHub] [kafka] ableegoldman commented on a change in pull request #8712: KAFKA-10006: Don't create internal topics when LeaderNotAvailableException

ableegoldman commented on a change in pull request #8712:
URL: https://github.com/apache/kafka/pull/8712#discussion_r444555724



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java
##########
@@ -98,9 +98,10 @@ public InternalTopicManager(final Admin adminClient, final StreamsConfig streams
         int remainingRetries = retries;
         Set<String> topicsNotReady = new HashSet<>(topics.keySet());
         final Set<String> newlyCreatedTopics = new HashSet<>();
+        final HashSet<String> leaderNotAvailableTopics = new HashSet<>();

Review comment:
       Can we give this a more descriptive name? It might be obvious to you, but I think someone looking at this code for the first time would not realize that this set holds topics that may or may not already exist.
       That said, I'm struggling to think of a good alternative. Maybe `possiblyCreatedTopics` or `unknownTopics`? Any better ideas?

##########
File path: clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java
##########
@@ -330,6 +331,12 @@ synchronized public DescribeTopicsResult describeTopics(Collection<String> topic
                 future.completeExceptionally(new UnknownTopicOrPartitionException("Topic " + requestedTopic + " not found."));
                 topicDescriptions.put(requestedTopic, future);
             }
+            // try to simulate the leader not available situation when topic name is "LeaderNotAvailableTopic"
+            if (requestedTopic.equals("LeaderNotAvailableTopic")) {
+                KafkaFutureImpl<TopicDescription> future = new KafkaFutureImpl<>();
+                future.completeExceptionally(new LeaderNotAvailableException("The leader of Topic " + requestedTopic + " is not available."));

Review comment:
       Is it possible to use `EasyMock` instead of adding this to the actual `MockAdminClient`? I know it's kind of a pain to set up, but I think it'll make the test a lot clearer. I did something similar in `StreamsPartitionAssignorTest` to mock the results of the `listOffsets` request.

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java
##########
@@ -157,16 +161,16 @@ public InternalTopicManager(final Admin adminClient, final StreamsConfig streams
             }
 
 
-            if (!topicsNotReady.isEmpty()) {
-                log.info("Topics {} can not be made ready with {} retries left", topicsNotReady, retries);
+            if (isNeedRetry(topicsNotReady)) {
+                log.info("Topics {} can not be made ready with {} retries left", topicsNotReady, remainingRetries);

Review comment:
       Good catch

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java
##########
@@ -242,11 +256,16 @@ public InternalTopicManager(final Admin adminClient, final StreamsConfig streams
                     log.error(errorMsg);
                     throw new StreamsException(errorMsg);
                 }
-            } else {
+            } else if (!leaderNotAvailableTopics.contains(topicName)) {
                 topicsToCreate.add(topicName);
             }
         }
 
         return topicsToCreate;
     }
+
+    private boolean shouldRetry(final Set<String> topicsNotReady, final HashSet<String> leaderNotAvailableTopics) {
+        // If there's topic with LeaderNotAvailableException, we still need retry
+        return !topicsNotReady.isEmpty() || leaderNotAvailableTopics.size() > 0;

Review comment:
       Can we just use `!isEmpty` for both sets?
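
       A minimal sketch of what that could look like, assuming both parameters can be typed as `Set<String>` rather than `HashSet<String>`. The wrapper class `RetryCheckSketch` and its `main` method are hypothetical and exist only so the snippet compiles on its own; the real helper would stay a private method in `InternalTopicManager`.

```java
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;

// Hypothetical stand-alone wrapper, not part of the pull request.
public class RetryCheckSketch {

    // Retry while any topic is not ready, or while any topic last reported
    // LeaderNotAvailableException (it may or may not exist yet).
    static boolean shouldRetry(final Set<String> topicsNotReady,
                               final Set<String> leaderNotAvailableTopics) {
        return !topicsNotReady.isEmpty() || !leaderNotAvailableTopics.isEmpty();
    }

    public static void main(final String[] args) {
        final Set<String> none = Collections.emptySet();
        final Set<String> pending = new HashSet<>(Collections.singleton("some-internal-topic"));

        System.out.println(shouldRetry(none, none));
        System.out.println(shouldRetry(pending, none));
        System.out.println(shouldRetry(none, pending));
    }
}
```

       Using `isEmpty()` on both sets keeps the two halves of the condition symmetric, and accepting the `Set` interface avoids tying callers to `HashSet`.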



