Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2020/05/22 10:16:14 UTC

[GitHub] [kafka] showuon opened a new pull request #8712: KAFKA-10006: Don't create internal topics when LeaderNotAvailableException

showuon opened a new pull request #8712:
URL: https://github.com/apache/kafka/pull/8712


   
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] showuon commented on a change in pull request #8712: KAFKA-10006: Don't create internal topics when LeaderNotAvailableException

Posted by GitBox <gi...@apache.org>.
showuon commented on a change in pull request #8712:
URL: https://github.com/apache/kafka/pull/8712#discussion_r448099358



##########
File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopicManagerTest.java
##########
@@ -287,12 +290,124 @@ public void shouldLogWhenTopicNotFoundAndNotThrowException() {
 
             assertThat(
                 appender.getMessages(),
-                hasItem("stream-thread [" + threadName + "] Topic internal-topic is unknown or not found, hence not existed yet:" +
-                    " org.apache.kafka.common.errors.UnknownTopicOrPartitionException: Topic internal-topic not found.")
+                hasItem("stream-thread [" + threadName + "] Topic internal-topic is unknown or not found, hence not existed yet.\n" +
+                    "Error message was: org.apache.kafka.common.errors.UnknownTopicOrPartitionException: Topic internal-topic not found.")
             );
         }
     }
 
+    @Test
+    public void shouldCreateTopicWhenTopicLeaderNotAvailableAndThenTopicNotFound() {
+        final AdminClient admin = EasyMock.createNiceMock(AdminClient.class);
+        final InternalTopicManager topicManager = new InternalTopicManager(admin, new StreamsConfig(config));
+
+        final KafkaFutureImpl<TopicDescription> topicDescriptionLeaderNotAvailableFuture = new KafkaFutureImpl<>();
+        topicDescriptionLeaderNotAvailableFuture.completeExceptionally(new LeaderNotAvailableException("Leader Not Available!"));
+        final KafkaFutureImpl<TopicDescription> topicDescriptionUnknownTopicFuture = new KafkaFutureImpl<>();
+        topicDescriptionUnknownTopicFuture.completeExceptionally(new UnknownTopicOrPartitionException("Unknown Topic!"));
+        final KafkaFutureImpl<CreateTopicsResult.TopicMetadataAndConfig> topicCreationFuture = new KafkaFutureImpl<>();
+        topicCreationFuture.complete(EasyMock.createNiceMock(CreateTopicsResult.TopicMetadataAndConfig.class));
+
+        EasyMock.expect(admin.describeTopics(Collections.singleton(leaderNotAvailableTopic)))
+            .andReturn(new MockDescribeTopicsResult(
+                Collections.singletonMap(leaderNotAvailableTopic, topicDescriptionLeaderNotAvailableFuture)))
+            .once();
+        // return empty set for 1st time
+        EasyMock.expect(admin.createTopics(Collections.emptySet()))
+            .andReturn(new MockCreateTopicsResult(Collections.emptyMap())).once();
+
+        EasyMock.expect(admin.describeTopics(Collections.singleton(leaderNotAvailableTopic)))
+            .andReturn(new MockDescribeTopicsResult(
+                Collections.singletonMap(leaderNotAvailableTopic, topicDescriptionUnknownTopicFuture)))
+            .once();
+        EasyMock.expect(admin.createTopics(Collections.singleton(
+                new NewTopic(leaderNotAvailableTopic, Optional.of(1), Optional.of((short) 1))
+            .configs(Utils.mkMap(Utils.mkEntry(TopicConfig.CLEANUP_POLICY_CONFIG, TopicConfig.CLEANUP_POLICY_DELETE),
+                Utils.mkEntry(TopicConfig.MESSAGE_TIMESTAMP_TYPE_CONFIG, "CreateTime"),
+                Utils.mkEntry(TopicConfig.SEGMENT_BYTES_CONFIG, "52428800"),
+                Utils.mkEntry(TopicConfig.RETENTION_MS_CONFIG, "-1"))))))
+            .andReturn(new MockCreateTopicsResult(Collections.singletonMap(leaderNotAvailableTopic, topicCreationFuture))).once();
+
+        EasyMock.replay(admin);
+
+        final InternalTopicConfig internalTopicConfig = new RepartitionTopicConfig(leaderNotAvailableTopic, Collections.emptyMap());
+        internalTopicConfig.setNumberOfPartitions(1);
+        final Map<String, InternalTopicConfig> topicConfigMap = new HashMap<>();
+        topicConfigMap.put(leaderNotAvailableTopic, internalTopicConfig);
+        topicManager.makeReady(topicConfigMap);
+
+        EasyMock.verify(admin);
+    }
+
+    @Test
+    public void shouldCompleteValidateWhenTopicLeaderNotAvailableAndThenDescribeSuccess() {

Review comment:
       Yes, they are different scenarios. See the diagram below for reference (red and green mark the two cases).
   
   ![image](https://user-images.githubusercontent.com/43372967/86200914-905e3800-bb90-11ea-8d2c-c5e9c7b71cda.png)
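Because the diagram is only available as an image, here is a rough text rendering of the two scenarios covered by the new tests (`shouldCreateTopicWhenTopicLeaderNotAvailableAndThenTopicNotFound` and `shouldCompleteValidateWhenTopicLeaderNotAvailableAndThenDescribeSuccess`). It is a hypothetical per-topic model, not Kafka Streams code, and the enum and method names are illustrative. A `LeaderNotAvailableException` leaves the topic's existence unknown, so the topic is described again in the next round, and only then is it either created or validated.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of one internal topic across makeReady rounds; not Kafka Streams code.
class LeaderNotAvailableScenarios {
    enum DescribeOutcome { SUCCESS, UNKNOWN_TOPIC, LEADER_NOT_AVAILABLE }
    enum Action { VALIDATE, CREATE, DESCRIBE_AGAIN }

    static Action actionFor(DescribeOutcome outcome) {
        switch (outcome) {
            case SUCCESS:
                return Action.VALIDATE;       // topic exists: check its partition count
            case UNKNOWN_TOPIC:
                return Action.CREATE;         // topic definitely does not exist
            case LEADER_NOT_AVAILABLE:
                return Action.DESCRIBE_AGAIN; // existence unknown: do not create yet
            default:
                throw new IllegalArgumentException("Unexpected outcome " + outcome);
        }
    }

    // Applies the outcomes of successive describe calls until the topic is resolved.
    static List<Action> replay(List<DescribeOutcome> outcomes) {
        List<Action> actions = new ArrayList<>();
        for (DescribeOutcome outcome : outcomes) {
            Action action = actionFor(outcome);
            actions.add(action);
            if (action != Action.DESCRIBE_AGAIN) {
                break; // resolved: either created or validated
            }
        }
        return actions;
    }

    public static void main(String[] args) {
        // Leader not available, then topic not found: the topic is created.
        System.out.println(replay(List.of(DescribeOutcome.LEADER_NOT_AVAILABLE, DescribeOutcome.UNKNOWN_TOPIC)));
        // prints [DESCRIBE_AGAIN, CREATE]
        // Leader not available, then describe succeeds: the topic is validated.
        System.out.println(replay(List.of(DescribeOutcome.LEADER_NOT_AVAILABLE, DescribeOutcome.SUCCESS)));
        // prints [DESCRIBE_AGAIN, VALIDATE]
    }
}
```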
   
   







[GitHub] [kafka] showuon commented on a change in pull request #8712: KAFKA-10006: Don't create internal topics when LeaderNotAvailableException

showuon commented on a change in pull request #8712:
URL: https://github.com/apache/kafka/pull/8712#discussion_r436446907



##########
File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopicManagerTest.java
##########
@@ -287,12 +290,49 @@ public void shouldLogWhenTopicNotFoundAndNotThrowException() {
 
             assertThat(
                 appender.getMessages(),
-                hasItem("stream-thread [" + threadName + "] Topic internal-topic is unknown or not found, hence not existed yet:" +
-                    " org.apache.kafka.common.errors.UnknownTopicOrPartitionException: Topic internal-topic not found.")
+                hasItem("stream-thread [" + threadName + "] Topic internal-topic is unknown or not found, hence not existed yet.\n" +
+                    "Error message was: org.apache.kafka.common.errors.UnknownTopicOrPartitionException: Topic internal-topic not found.")
             );
         }
     }
 
+    @Test
+    public void shouldLogWhenTopicLeaderNotAvailableAndThrowException() {
+        final String topicLeaderNotAvailable = "LeaderNotAvailableTopic";
+        mockAdminClient.addTopic(
+            false,
+            topicLeaderNotAvailable,
+            Collections.singletonList(new TopicPartitionInfo(0, broker1, cluster, Collections.emptyList())),
+            null);
+
+        final InternalTopicConfig internalTopicConfig = new RepartitionTopicConfig(topicLeaderNotAvailable, Collections.emptyMap());
+        internalTopicConfig.setNumberOfPartitions(1);
+
+        final Map<String, InternalTopicConfig> topicConfigMap = new HashMap<>();
+        topicConfigMap.put(topicLeaderNotAvailable, internalTopicConfig);
+
+        LogCaptureAppender.setClassLoggerToDebug(InternalTopicManager.class);
+        try (final LogCaptureAppender appender = LogCaptureAppender.createAndRegister(InternalTopicManager.class)) {
+            final StreamsException exception = assertThrows(
+                StreamsException.class,
+                () -> internalTopicManager.makeReady(topicConfigMap));
+
+            final String expectedMessage = "Could not create topics after 1 retries. This can happen if the Kafka cluster is temporary not available";

Review comment:
       Good suggestion. I removed the log message check and now only verify that the exception is thrown. Thanks.
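Since the test now asserts only on the exception, the behaviour under test can be sketched as a bounded retry loop that gives up when topics remain unresolved. This is a simplified, hypothetical model of the `while (!topicsNotReady.isEmpty() && remainingRetries >= 0)` loop in `makeReady`; `RetrySketch`, the `oneRound` parameter, and the use of `IllegalStateException` instead of `StreamsException` are assumptions made to keep it self-contained.

```java
import java.util.Set;
import java.util.function.Supplier;

// Hypothetical simplification of the makeReady retry loop; not the real InternalTopicManager.
class RetrySketch {
    // `oneRound` performs one describe/create round and returns the topics still not ready.
    // Returns the number of rounds used, or throws once all retries are spent.
    static int makeReady(Supplier<Set<String>> oneRound, int retries) {
        int remainingRetries = retries;
        int rounds = 0;
        Set<String> notReady;
        do {
            notReady = oneRound.get();
            rounds++;
            remainingRetries--;
        } while (!notReady.isEmpty() && remainingRetries >= 0);

        if (!notReady.isEmpty()) {
            throw new IllegalStateException(
                "Could not create topics after " + retries + " retries: " + notReady);
        }
        return rounds;
    }
}
```

With `retries = 1`, a topic whose leader never becomes available is attempted twice before the exception is thrown.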







[GitHub] [kafka] showuon commented on a change in pull request #8712: KAFKA-10006: Don't create internal topics when LeaderNotAvailableException

showuon commented on a change in pull request #8712:
URL: https://github.com/apache/kafka/pull/8712#discussion_r436460691



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java
##########
@@ -100,11 +103,11 @@ public InternalTopicManager(final Admin adminClient, final StreamsConfig streams
         // have existed with the expected number of partitions, or some create topic returns fatal errors.
         log.debug("Starting to validate internal topics {} in partition assignor.", topics);
 
-        int remainingRetries = retries;
+        remainingRetries = retries;

Review comment:
       OK, fixed. Thanks.







[GitHub] [kafka] abbccdda commented on pull request #8712: KAFKA-10006: Don't create internal topics when LeaderNotAvailableException

abbccdda commented on pull request #8712:
URL: https://github.com/apache/kafka/pull/8712#issuecomment-653159837


   Tracked in https://issues.apache.org/jira/browse/KAFKA-9831 





[GitHub] [kafka] showuon commented on a change in pull request #8712: KAFKA-10006: Don't create internal topics when LeaderNotAvailableException

showuon commented on a change in pull request #8712:
URL: https://github.com/apache/kafka/pull/8712#discussion_r445625112



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java
##########
@@ -179,31 +180,43 @@ public InternalTopicManager(final Admin adminClient, final StreamsConfig streams
      * Topics that were not able to get its description will simply not be returned
      */
     // visible for testing
-    protected Map<String, Integer> getNumPartitions(final Set<String> topics) {
-        log.debug("Trying to check if topics {} have been created with expected number of partitions.", topics);
-
-        final DescribeTopicsResult describeTopicsResult = adminClient.describeTopics(topics);
+    protected Map<String, Integer> getNumPartitions(final Set<String> topics,
+                                                    final HashSet<String> tempUnknownTopics,
+                                                    final int remainingRetries) {
+        final Set<String> allTopicsToDescribe = new HashSet<>(topics);
+        allTopicsToDescribe.addAll(tempUnknownTopics);

Review comment:
       `allTopicsToDescribe` gives the `tempUnknownTopics` a chance to be described again during the retries in the `makeReady` method. In `makeReady`, we want to know which topics exist (so we can validate them) and which topics don't exist (so we need to create them). But for the `LeaderNotAvailable` topics, we can't tell whether they exist or not. So we need to merge `topics` (the topics to validate) and `tempUnknownTopics` here and describe them all (again).
   
   Thanks.
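A simplified sketch of that merge, assuming describe results come from a plain function instead of `adminClient.describeTopics`. `DescribeRoundSketch`, `Described`, and the `describe` parameter are hypothetical. Existing topics map to their partition count, leader-not-available topics are collected for the next round, and topics that are not found are left out so the caller can create them.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.function.Function;

// Hypothetical sketch of one describe round; names are illustrative, not Kafka APIs.
class DescribeRoundSketch {
    // Result of describing one topic: a partition count, "not found", or "leader not available".
    static final class Described {
        final Integer numPartitions;      // non-null only when the topic exists
        final boolean leaderNotAvailable;

        Described(Integer numPartitions, boolean leaderNotAvailable) {
            this.numPartitions = numPartitions;
            this.leaderNotAvailable = leaderNotAvailable;
        }
    }

    // Describes the topics to validate plus last round's unknown topics.
    // `tempUnknownTopics` is refilled with the topics whose leader is still not available.
    static Map<String, Integer> getNumPartitions(Set<String> topics,
                                                 Set<String> tempUnknownTopics,
                                                 Function<String, Described> describe) {
        Set<String> allTopicsToDescribe = new HashSet<>(topics);
        allTopicsToDescribe.addAll(tempUnknownTopics);
        tempUnknownTopics.clear();

        Map<String, Integer> existing = new HashMap<>();
        for (String topic : allTopicsToDescribe) {
            Described d = describe.apply(topic);
            if (d.leaderNotAvailable) {
                tempUnknownTopics.add(topic);               // existence still unknown
            } else if (d.numPartitions != null) {
                existing.put(topic, d.numPartitions);       // exists: validate later
            }
            // otherwise: not found, omitted so the caller can create it
        }
        return existing;
    }
}
```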







[GitHub] [kafka] showuon commented on a change in pull request #8712: KAFKA-10006: Don't create internal topics when LeaderNotAvailableException

showuon commented on a change in pull request #8712:
URL: https://github.com/apache/kafka/pull/8712#discussion_r429289380



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java
##########
@@ -247,11 +261,19 @@ public InternalTopicManager(final Admin adminClient, final StreamsConfig streams
                     log.error(errorMsg);
                     throw new StreamsException(errorMsg);
                 }
-            } else {
+            } else if (!needRetryTopics.contains(topicName)) {
                 topicsToCreate.add(topicName);
             }
         }
 
         return topicsToCreate;
     }
+
+    private boolean isNeedRetry(final Set<String> topicsNotReady) {
+        return !topicsNotReady.isEmpty() || hasNeedRetryTopic();

Review comment:
       If any topic hit a `LeaderNotAvailableException`, we also need to retry.
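As code, the retry condition has two independent triggers. This is a minimal sketch: the diff's `isNeedRetry` delegates the second check to a `hasNeedRetryTopic()` helper whose body is not shown, so passing `tempUnknownTopics` in as a parameter is an assumption.

```java
import java.util.Set;

// Hypothetical stand-in for isNeedRetry; not the real InternalTopicManager method.
class RetryConditionSketch {
    // Retry while some topics are not ready yet, or while some topics could not be
    // described because their leader was not available (existence still unknown).
    static boolean isNeedRetry(Set<String> topicsNotReady, Set<String> tempUnknownTopics) {
        return !topicsNotReady.isEmpty() || !tempUnknownTopics.isEmpty();
    }
}
```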







[GitHub] [kafka] showuon commented on a change in pull request #8712: KAFKA-10006: Don't create internal topics when LeaderNotAvailableException

showuon commented on a change in pull request #8712:
URL: https://github.com/apache/kafka/pull/8712#discussion_r446126644



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java
##########
@@ -125,35 +131,37 @@ public InternalTopicManager(final Admin adminClient, final StreamsConfig streams
 
                 final CreateTopicsResult createTopicsResult = adminClient.createTopics(newTopics);
 
-                for (final Map.Entry<String, KafkaFuture<Void>> createTopicResult : createTopicsResult.values().entrySet()) {
-                    final String topicName = createTopicResult.getKey();
-                    try {
-                        createTopicResult.getValue().get();
-                        topicsNotReady.remove(topicName);
-                    } catch (final InterruptedException fatalException) {
-                        // this should not happen; if it ever happens it indicate a bug
-                        Thread.currentThread().interrupt();
-                        log.error(INTERRUPTED_ERROR_MESSAGE, fatalException);
-                        throw new IllegalStateException(INTERRUPTED_ERROR_MESSAGE, fatalException);
-                    } catch (final ExecutionException executionException) {
-                        final Throwable cause = executionException.getCause();
-                        if (cause instanceof TopicExistsException) {
-                            // This topic didn't exist earlier or its leader not known before; just retain it for next round of validation.
-                            log.info("Could not create topic {}. Topic is probably marked for deletion (number of partitions is unknown).\n" +
-                                "Will retry to create this topic in {} ms (to let broker finish async delete operation first).\n" +
-                                "Error message was: {}", topicName, retryBackOffMs, cause.toString());
-                        } else {
-                            log.error("Unexpected error during topic creation for {}.\n" +
-                                "Error message was: {}", topicName, cause.toString());
-                            throw new StreamsException(String.format("Could not create topic %s.", topicName), cause);
+                if (createTopicsResult != null) {
+                    for (final Map.Entry<String, KafkaFuture<Void>> createTopicResult : createTopicsResult.values().entrySet()) {

Review comment:
       We need the null check on `createTopicsResult` because `newTopics` might be empty.
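The guarded loop handles one future per requested topic. A self-contained sketch of the same handling, using `CompletableFuture` in place of `KafkaFuture` and a hypothetical `TopicAlreadyExists` exception in place of `TopicExistsException`; following the comment, a `null` result is treated as nothing having been requested.

```java
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

// Hypothetical stand-in for the createTopics result handling; not the Kafka Admin API.
class CreateResultSketch {
    static class TopicAlreadyExists extends RuntimeException {
        TopicAlreadyExists(String msg) { super(msg); }
    }

    // Removes successfully created topics from `topicsNotReady`; topics that already exist
    // stay in the set so the next round validates them again; any other error is fatal.
    static void handleCreateResults(Map<String, CompletableFuture<Void>> results,
                                    Set<String> topicsNotReady) {
        if (results == null) {
            return; // nothing was requested (e.g. newTopics was empty)
        }
        for (Map.Entry<String, CompletableFuture<Void>> entry : results.entrySet()) {
            String topicName = entry.getKey();
            try {
                entry.getValue().get();
                topicsNotReady.remove(topicName);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("Interrupted while creating " + topicName, e);
            } catch (ExecutionException e) {
                if (!(e.getCause() instanceof TopicAlreadyExists)) {
                    throw new IllegalStateException("Could not create topic " + topicName, e.getCause());
                }
                // Already exists (e.g. still being deleted): keep it for the next round.
            }
        }
    }
}
```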







[GitHub] [kafka] ableegoldman commented on pull request #8712: KAFKA-10006: Don't create internal topics when LeaderNotAvailableException

ableegoldman commented on pull request #8712:
URL: https://github.com/apache/kafka/pull/8712#issuecomment-635065576


   Hey @showuon, thanks for the PR! I'm pretty busy at the moment, especially with the 2.6 deadline, but it's on my todo list to review. 





[GitHub] [kafka] showuon commented on a change in pull request #8712: KAFKA-10006: Don't create internal topics when LeaderNotAvailableException

showuon commented on a change in pull request #8712:
URL: https://github.com/apache/kafka/pull/8712#discussion_r448103331



##########
File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopicManagerTest.java
##########
@@ -287,12 +290,124 @@ public void shouldLogWhenTopicNotFoundAndNotThrowException() {
 
             assertThat(
                 appender.getMessages(),
-                hasItem("stream-thread [" + threadName + "] Topic internal-topic is unknown or not found, hence not existed yet:" +
-                    " org.apache.kafka.common.errors.UnknownTopicOrPartitionException: Topic internal-topic not found.")
+                hasItem("stream-thread [" + threadName + "] Topic internal-topic is unknown or not found, hence not existed yet.\n" +
+                    "Error message was: org.apache.kafka.common.errors.UnknownTopicOrPartitionException: Topic internal-topic not found.")
             );
         }
     }
 
+    @Test
+    public void shouldCreateTopicWhenTopicLeaderNotAvailableAndThenTopicNotFound() {
+        final AdminClient admin = EasyMock.createNiceMock(AdminClient.class);
+        final InternalTopicManager topicManager = new InternalTopicManager(admin, new StreamsConfig(config));
+
+        final KafkaFutureImpl<TopicDescription> topicDescriptionLeaderNotAvailableFuture = new KafkaFutureImpl<>();
+        topicDescriptionLeaderNotAvailableFuture.completeExceptionally(new LeaderNotAvailableException("Leader Not Available!"));
+        final KafkaFutureImpl<TopicDescription> topicDescriptionUnknownTopicFuture = new KafkaFutureImpl<>();
+        topicDescriptionUnknownTopicFuture.completeExceptionally(new UnknownTopicOrPartitionException("Unknown Topic!"));
+        final KafkaFutureImpl<CreateTopicsResult.TopicMetadataAndConfig> topicCreationFuture = new KafkaFutureImpl<>();
+        topicCreationFuture.complete(EasyMock.createNiceMock(CreateTopicsResult.TopicMetadataAndConfig.class));
+
+        EasyMock.expect(admin.describeTopics(Collections.singleton(leaderNotAvailableTopic)))
+            .andReturn(new MockDescribeTopicsResult(
+                Collections.singletonMap(leaderNotAvailableTopic, topicDescriptionLeaderNotAvailableFuture)))
+            .once();
+        // return empty set for 1st time
+        EasyMock.expect(admin.createTopics(Collections.emptySet()))
+            .andReturn(new MockCreateTopicsResult(Collections.emptyMap())).once();
+
+        EasyMock.expect(admin.describeTopics(Collections.singleton(leaderNotAvailableTopic)))
+            .andReturn(new MockDescribeTopicsResult(
+                Collections.singletonMap(leaderNotAvailableTopic, topicDescriptionUnknownTopicFuture)))
+            .once();
+        EasyMock.expect(admin.createTopics(Collections.singleton(
+                new NewTopic(leaderNotAvailableTopic, Optional.of(1), Optional.of((short) 1))
+            .configs(Utils.mkMap(Utils.mkEntry(TopicConfig.CLEANUP_POLICY_CONFIG, TopicConfig.CLEANUP_POLICY_DELETE),
+                Utils.mkEntry(TopicConfig.MESSAGE_TIMESTAMP_TYPE_CONFIG, "CreateTime"),
+                Utils.mkEntry(TopicConfig.SEGMENT_BYTES_CONFIG, "52428800"),
+                Utils.mkEntry(TopicConfig.RETENTION_MS_CONFIG, "-1"))))))
+            .andReturn(new MockCreateTopicsResult(Collections.singletonMap(leaderNotAvailableTopic, topicCreationFuture))).once();
+
+        EasyMock.replay(admin);
+
+        final InternalTopicConfig internalTopicConfig = new RepartitionTopicConfig(leaderNotAvailableTopic, Collections.emptyMap());
+        internalTopicConfig.setNumberOfPartitions(1);
+        final Map<String, InternalTopicConfig> topicConfigMap = new HashMap<>();
+        topicConfigMap.put(leaderNotAvailableTopic, internalTopicConfig);
+        topicManager.makeReady(topicConfigMap);
+
+        EasyMock.verify(admin);
+    }
+
+    @Test
+    public void shouldCompleteValidateWhenTopicLeaderNotAvailableAndThenDescribeSuccess() {
+        final AdminClient admin = EasyMock.createNiceMock(AdminClient.class);
+        final InternalTopicManager topicManager = new InternalTopicManager(admin, new StreamsConfig(config));
+        final TopicPartitionInfo partitionInfo = new TopicPartitionInfo(0, broker1,
+                Collections.singletonList(broker1), Collections.singletonList(broker1));
+
+        final KafkaFutureImpl<TopicDescription> topicDescriptionFailFuture = new KafkaFutureImpl<>();
+        topicDescriptionFailFuture.completeExceptionally(new LeaderNotAvailableException("Leader Not Available!"));
+        final KafkaFutureImpl<TopicDescription> topicDescriptionSuccessFuture = new KafkaFutureImpl<>();
+        topicDescriptionSuccessFuture.complete(
+            new TopicDescription(topic, false, Collections.singletonList(partitionInfo), Collections.emptySet())
+        );
+
+        EasyMock.expect(admin.describeTopics(Collections.singleton(leaderNotAvailableTopic)))
+            .andReturn(new MockDescribeTopicsResult(
+                Collections.singletonMap(leaderNotAvailableTopic, topicDescriptionFailFuture)))
+            .once();
+        EasyMock.expect(admin.createTopics(Collections.emptySet()))
+            .andReturn(new MockCreateTopicsResult(Collections.emptyMap())).once();
+
+        EasyMock.expect(admin.describeTopics(Collections.singleton(leaderNotAvailableTopic)))

Review comment:
       Good, thanks.







[GitHub] [kafka] showuon edited a comment on pull request #8712: KAFKA-10006: Don't create internal topics when LeaderNotAvailableException

showuon edited a comment on pull request #8712:
URL: https://github.com/apache/kafka/pull/8712#issuecomment-649896512


   Hi @abbccdda, last night before I went to sleep, I kept thinking about your question:
   > Why do we need allTopicsToDescribe?
   
   Then I realized my change has one small logic flaw: if a topic gets an `UnknownTopicOrPartitionException` after a `LeaderNotAvailableException`, we won't create it. I'll fix it and then add two more tests for the two different situations. Will let you know. Thanks.
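The flaw can be illustrated with a small hypothetical replay (not Kafka Streams code). If the set of leader-not-available topics is kept across rounds, a topic that later describes as unknown stays shielded from creation; rebuilding the set every round, as the later revision of the diff does by declaring `final Set<String> tempUnknownTopics = new HashSet<>();` inside the `while` loop, lets the topic be created once it is confirmed missing. The accumulating variant is only one way such a flaw can arise; it is an assumption, not a description of the original code.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical per-round model; not the real InternalTopicManager.
class PerRoundUnknownSketch {
    enum Outcome { EXISTS, UNKNOWN_TOPIC, LEADER_NOT_AVAILABLE }

    // Replays several describe rounds and returns every topic that gets created.
    // With rebuildEachRound == false the unknown set accumulates across rounds,
    // so a topic once seen as leader-not-available is never created.
    static Set<String> createdTopics(List<Map<String, Outcome>> rounds, boolean rebuildEachRound) {
        Set<String> tempUnknownTopics = new HashSet<>();
        Set<String> created = new HashSet<>();
        for (Map<String, Outcome> round : rounds) {
            if (rebuildEachRound) {
                tempUnknownTopics = new HashSet<>();
            }
            for (Map.Entry<String, Outcome> e : round.entrySet()) {
                if (e.getValue() == Outcome.LEADER_NOT_AVAILABLE) {
                    tempUnknownTopics.add(e.getKey());
                }
            }
            for (Map.Entry<String, Outcome> e : round.entrySet()) {
                // Create only topics confirmed missing and not shielded as "unknown".
                if (e.getValue() == Outcome.UNKNOWN_TOPIC && !tempUnknownTopics.contains(e.getKey())) {
                    created.add(e.getKey());
                }
            }
        }
        return created;
    }
}
```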





[GitHub] [kafka] showuon commented on a change in pull request #8712: KAFKA-10006: Don't create internal topics when LeaderNotAvailableException

showuon commented on a change in pull request #8712:
URL: https://github.com/apache/kafka/pull/8712#discussion_r448685224



##########
File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopicManagerTest.java
##########
@@ -287,12 +290,124 @@ public void shouldLogWhenTopicNotFoundAndNotThrowException() {
 
             assertThat(
                 appender.getMessages(),
-                hasItem("stream-thread [" + threadName + "] Topic internal-topic is unknown or not found, hence not existed yet:" +
-                    " org.apache.kafka.common.errors.UnknownTopicOrPartitionException: Topic internal-topic not found.")
+                hasItem("stream-thread [" + threadName + "] Topic internal-topic is unknown or not found, hence not existed yet.\n" +
+                    "Error message was: org.apache.kafka.common.errors.UnknownTopicOrPartitionException: Topic internal-topic not found.")
             );
         }
     }
 
+    @Test
+    public void shouldCreateTopicWhenTopicLeaderNotAvailableAndThenTopicNotFound() {
+        final AdminClient admin = EasyMock.createNiceMock(AdminClient.class);
+        final InternalTopicManager topicManager = new InternalTopicManager(admin, new StreamsConfig(config));
+
+        final KafkaFutureImpl<TopicDescription> topicDescriptionLeaderNotAvailableFuture = new KafkaFutureImpl<>();
+        topicDescriptionLeaderNotAvailableFuture.completeExceptionally(new LeaderNotAvailableException("Leader Not Available!"));
+        final KafkaFutureImpl<TopicDescription> topicDescriptionUnknownTopicFuture = new KafkaFutureImpl<>();
+        topicDescriptionUnknownTopicFuture.completeExceptionally(new UnknownTopicOrPartitionException("Unknown Topic!"));
+        final KafkaFutureImpl<CreateTopicsResult.TopicMetadataAndConfig> topicCreationFuture = new KafkaFutureImpl<>();
+        topicCreationFuture.complete(EasyMock.createNiceMock(CreateTopicsResult.TopicMetadataAndConfig.class));
+
+        EasyMock.expect(admin.describeTopics(Collections.singleton(leaderNotAvailableTopic)))
+            .andReturn(new MockDescribeTopicsResult(
+                Collections.singletonMap(leaderNotAvailableTopic, topicDescriptionLeaderNotAvailableFuture)))
+            .once();
+        // return empty set for 1st time
+        EasyMock.expect(admin.createTopics(Collections.emptySet()))
+            .andReturn(new MockCreateTopicsResult(Collections.emptyMap())).once();
+
+        EasyMock.expect(admin.describeTopics(Collections.singleton(leaderNotAvailableTopic)))
+            .andReturn(new MockDescribeTopicsResult(
+                Collections.singletonMap(leaderNotAvailableTopic, topicDescriptionUnknownTopicFuture)))
+            .once();
+        EasyMock.expect(admin.createTopics(Collections.singleton(
+                new NewTopic(leaderNotAvailableTopic, Optional.of(1), Optional.of((short) 1))
+            .configs(Utils.mkMap(Utils.mkEntry(TopicConfig.CLEANUP_POLICY_CONFIG, TopicConfig.CLEANUP_POLICY_DELETE),
+                Utils.mkEntry(TopicConfig.MESSAGE_TIMESTAMP_TYPE_CONFIG, "CreateTime"),
+                Utils.mkEntry(TopicConfig.SEGMENT_BYTES_CONFIG, "52428800"),
+                Utils.mkEntry(TopicConfig.RETENTION_MS_CONFIG, "-1"))))))
+            .andReturn(new MockCreateTopicsResult(Collections.singletonMap(leaderNotAvailableTopic, topicCreationFuture))).once();
+
+        EasyMock.replay(admin);
+
+        final InternalTopicConfig internalTopicConfig = new RepartitionTopicConfig(leaderNotAvailableTopic, Collections.emptyMap());
+        internalTopicConfig.setNumberOfPartitions(1);
+        final Map<String, InternalTopicConfig> topicConfigMap = new HashMap<>();
+        topicConfigMap.put(leaderNotAvailableTopic, internalTopicConfig);
+        topicManager.makeReady(topicConfigMap);
+
+        EasyMock.verify(admin);
+    }
+
+    @Test
+    public void shouldCompleteValidateWhenTopicLeaderNotAvailableAndThenDescribeSuccess() {

Review comment:
       Actually, I drew it myself in MS Office PowerPoint, haha.







[GitHub] [kafka] abbccdda commented on a change in pull request #8712: KAFKA-10006: Don't create internal topics when LeaderNotAvailableException

abbccdda commented on a change in pull request #8712:
URL: https://github.com/apache/kafka/pull/8712#discussion_r448457381



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java
##########
@@ -100,13 +100,19 @@ public InternalTopicManager(final Admin adminClient, final StreamsConfig streams
         final Set<String> newlyCreatedTopics = new HashSet<>();
 
         while (!topicsNotReady.isEmpty() && remainingRetries >= 0) {
-            topicsNotReady = validateTopics(topicsNotReady, topics);
+            final Set<String> tempUnknownTopics = new HashSet<>();

Review comment:
       Sure, I don't feel strongly about reverting the naming, since we already have a reason for it.







[GitHub] [kafka] ableegoldman commented on a change in pull request #8712: KAFKA-10006: Don't create internal topics when LeaderNotAvailableException

ableegoldman commented on a change in pull request #8712:
URL: https://github.com/apache/kafka/pull/8712#discussion_r444555724



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java
##########
@@ -98,9 +98,10 @@ public InternalTopicManager(final Admin adminClient, final StreamsConfig streams
         int remainingRetries = retries;
         Set<String> topicsNotReady = new HashSet<>(topics.keySet());
         final Set<String> newlyCreatedTopics = new HashSet<>();
+        final HashSet<String> leaderNotAvailableTopics = new HashSet<>();

Review comment:
       Can we give this a more descriptive name? It might be obvious to you, but I think someone just looking at this code for the first time would not get that this actually means topics that may or may not already exist.
   That said, I'm struggling to think of a good alternative. Maybe `possiblyCreatedTopics` or `unknownTopics`? Any better ideas?

##########
File path: clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java
##########
@@ -330,6 +331,12 @@ synchronized public DescribeTopicsResult describeTopics(Collection<String> topic
                 future.completeExceptionally(new UnknownTopicOrPartitionException("Topic " + requestedTopic + " not found."));
                 topicDescriptions.put(requestedTopic, future);
             }
+            // try to simulate the leader not available situation when topic name is "LeaderNotAvailableTopic"
+            if (requestedTopic.equals("LeaderNotAvailableTopic")) {
+                KafkaFutureImpl<TopicDescription> future = new KafkaFutureImpl<>();
+                future.completeExceptionally(new LeaderNotAvailableException("The leader of Topic " + requestedTopic + " is not available."));

Review comment:
       Is it possible to use `EasyMock` instead of adding this to the actual `MockAdminClient`? I know it's kind of a pain to set up, but I think it'll make the test a lot clearer. I did something similar in `StreamsPartitionAssignorTest` to mock the results of the `listOffsets` request.

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java
##########
@@ -157,16 +161,16 @@ public InternalTopicManager(final Admin adminClient, final StreamsConfig streams
             }
 
 
-            if (!topicsNotReady.isEmpty()) {
-                log.info("Topics {} can not be made ready with {} retries left", topicsNotReady, retries);
+            if (isNeedRetry(topicsNotReady)) {
+                log.info("Topics {} can not be made ready with {} retries left", topicsNotReady, remainingRetries);

Review comment:
       Good catch

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java
##########
@@ -242,11 +256,16 @@ public InternalTopicManager(final Admin adminClient, final StreamsConfig streams
                     log.error(errorMsg);
                     throw new StreamsException(errorMsg);
                 }
-            } else {
+            } else if (!leaderNotAvailableTopics.contains(topicName)) {
                 topicsToCreate.add(topicName);
             }
         }
 
         return topicsToCreate;
     }
+
+    private boolean shouldRetry(final Set<String> topicsNotReady, final HashSet<String> leaderNotAvailableTopics) {
+        // If there's topic with LeaderNotAvailableException, we still need retry
+        return !topicsNotReady.isEmpty() || leaderNotAvailableTopics.size() > 0;

Review comment:
       Can we just use `!isEmpty` for both sets?
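   With `!isEmpty()` on both sets, the retry check could look roughly like the standalone sketch below. It is an illustration, not the PR's code: the class name `RetryCheck` is hypothetical, and the second parameter is typed as `Set<String>` rather than `HashSet<String>`.

```java
import java.util.Collections;
import java.util.Set;

// Hypothetical standalone class mirroring the shouldRetry helper in the diff above.
final class RetryCheck {
    // Keep retrying while some topics are not ready yet, or while some topics
    // are still in a transient LeaderNotAvailableException state.
    static boolean shouldRetry(final Set<String> topicsNotReady,
                               final Set<String> leaderNotAvailableTopics) {
        return !topicsNotReady.isEmpty() || !leaderNotAvailableTopics.isEmpty();
    }

    public static void main(final String[] args) {
        System.out.println(shouldRetry(Collections.emptySet(), Collections.emptySet()));              // false
        System.out.println(shouldRetry(Collections.emptySet(), Collections.singleton("some-topic"))); // true
    }
}
```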







[GitHub] [kafka] showuon commented on pull request #8712: KAFKA-10006: Don't create internal topics when LeaderNotAvailableException

Posted by GitBox <gi...@apache.org>.
showuon commented on pull request #8712:
URL: https://github.com/apache/kafka/pull/8712#issuecomment-649896512


   Hi @abbccdda, last night before I went to sleep, I kept thinking about your question:
   > Why do we need allTopicsToDescribe?
   
   Then I realized my change has one small logic flaw: if a topic gets `UnknownTopicException` after a `LeaderNotAvailableException`, we won't create it. I'll fix it and then add 2 more tests for the 2 different situations. Will let you know. Thanks.
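   The flaw comes from deciding whether to create a topic based on an earlier `LeaderNotAvailableException` instead of the latest describe result. A minimal sketch of the intended behavior, assuming a hypothetical `DescribeOutcome` enum that stands in for the three results discussed here (success, `UnknownTopicOrPartitionException`, `LeaderNotAvailableException`); it does not use the real Admin client.

```java
import java.util.Collections;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

final class TopicsToCreate {
    // Hypothetical stand-in for the per-topic outcome of one describeTopics call.
    enum DescribeOutcome { FOUND, UNKNOWN_TOPIC, LEADER_NOT_AVAILABLE }

    // Decides which topics to create from the LATEST describe outcome only.
    // tempUnknownTopics is rebuilt on every call, so a topic that was
    // "leader not available" last time but is now unknown does get created.
    static Set<String> select(final Map<String, DescribeOutcome> latestOutcomes,
                              final Set<String> tempUnknownTopics) {
        tempUnknownTopics.clear();
        final Set<String> toCreate = new HashSet<>();
        for (final Map.Entry<String, DescribeOutcome> entry : latestOutcomes.entrySet()) {
            switch (entry.getValue()) {
                case UNKNOWN_TOPIC:
                    toCreate.add(entry.getKey());          // topic does not exist: create it
                    break;
                case LEADER_NOT_AVAILABLE:
                    tempUnknownTopics.add(entry.getKey()); // may already exist: describe again later
                    break;
                default:
                    break;                                 // FOUND: partition count checked elsewhere
            }
        }
        return toCreate;
    }

    public static void main(final String[] args) {
        final Set<String> temp = new HashSet<>();
        // 1st attempt: leader not available, so nothing is created yet.
        System.out.println(select(Collections.singletonMap("t", DescribeOutcome.LEADER_NOT_AVAILABLE), temp)); // []
        // 2nd attempt: the topic is now reported unknown, so it must be created.
        System.out.println(select(Collections.singletonMap("t", DescribeOutcome.UNKNOWN_TOPIC), temp));        // [t]
    }
}
```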





[GitHub] [kafka] showuon edited a comment on pull request #8712: KAFKA-10006: Don't create internal topics when LeaderNotAvailableException

Posted by GitBox <gi...@apache.org>.
showuon edited a comment on pull request #8712:
URL: https://github.com/apache/kafka/pull/8712#issuecomment-648846927


   Hi @ableegoldman, thanks for the good suggestion! I've made the update in this commit: https://github.com/apache/kafka/pull/8712/commits/f378c34d10c57d796412ac946f867e5787f93af0. In this commit I:
   
   1. rewrote the tests with `EasyMock` to make them better. While rewriting them, I found a bug I had missed before: when we retry `describeTopics` in `getNumPartitions`, we should also add the `tempUnknownTopics` to the list of topics to describe, since they need to be retried too. Thanks for the suggestion!
   2. removed the unneeded hack in `MockAdminClient`
   3. renamed the variable `leaderNotAvailableTopics` to `tempUnknownTopics`
   4. used `isEmpty()` for consistency
   
   Thank you.
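   The bug in item 1 is about which topics get described on a retry. A rough sketch of the idea, assuming a plain `Map` (the hypothetical `cluster` parameter) in place of the `describeTopics` call: a topic with a known partition count counts as described successfully, and any other topic is treated as still leader-not-available for the purposes of this sketch.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

final class DescribeRetrySketch {
    // Simplified, hypothetical version of getNumPartitions: `cluster` maps a topic
    // name to its partition count when describing it succeeds; every other topic
    // is treated as failing with LeaderNotAvailableException.
    static Map<String, Integer> numPartitions(final Set<String> topics,
                                              final Set<String> tempUnknownTopics,
                                              final Map<String, Integer> cluster) {
        // Describe the requested topics plus those left over from an earlier
        // LeaderNotAvailableException (the allTopicsToDescribe idea).
        final Set<String> allTopicsToDescribe = new HashSet<>(topics);
        allTopicsToDescribe.addAll(tempUnknownTopics);

        final Map<String, Integer> existedTopicPartition = new HashMap<>();
        for (final String topic : allTopicsToDescribe) {
            final Integer partitions = cluster.get(topic);
            if (partitions != null) {
                existedTopicPartition.put(topic, partitions);
                tempUnknownTopics.remove(topic); // described successfully: no longer pending
            } else {
                tempUnknownTopics.add(topic);    // still pending: retry on the next pass
            }
        }
        return existedTopicPartition;
    }

    public static void main(final String[] args) {
        final Set<String> tempUnknownTopics = new HashSet<>(Collections.singleton("b"));
        final Map<String, Integer> cluster = new HashMap<>();
        cluster.put("a", 1);
        cluster.put("b", 4);
        // "b" is described again even though only "a" was requested.
        System.out.println(numPartitions(Collections.singleton("a"), tempUnknownTopics, cluster)); // {a=1, b=4}
        System.out.println(tempUnknownTopics);                                                      // []
    }
}
```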





[GitHub] [kafka] abbccdda commented on pull request #8712: KAFKA-10006: Don't create internal topics when LeaderNotAvailableException

Posted by GitBox <gi...@apache.org>.
abbccdda commented on pull request #8712:
URL: https://github.com/apache/kafka/pull/8712#issuecomment-651505551


   retest this please





[GitHub] [kafka] showuon commented on a change in pull request #8712: KAFKA-10006: Don't create internal topics when LeaderNotAvailableException

Posted by GitBox <gi...@apache.org>.
showuon commented on a change in pull request #8712:
URL: https://github.com/apache/kafka/pull/8712#discussion_r436479515



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java
##########
@@ -59,6 +59,9 @@ private InternalAdminClientConfig(final Map<?, ?> props) {
 
     private final int retries;
     private final long retryBackOffMs;
+    private int remainingRetries;
+
+    private HashSet<String> leaderNotAvailableTopics = new HashSet<>();

Review comment:
       Oh, nice catch! You're correct, @abbccdda: `leaderNotAvailableTopics` might still be non-empty after a call to `makeReady`. I changed it to a local variable. Thank you.
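       The issue here is shared state: a set stored in a field outlives a single `makeReady` call. The hypothetical class below contrasts the two shapes; it is only an illustration, not the PR's code.

```java
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;

// Hypothetical illustration of field vs. local state across calls.
final class FieldVsLocal {
    // Problematic shape: entries added by one call are still there in the next.
    private final Set<String> leaderNotAvailableTopicsField = new HashSet<>();

    Set<String> withField(final Set<String> leaderNotAvailableNow) {
        leaderNotAvailableTopicsField.addAll(leaderNotAvailableNow);
        return new HashSet<>(leaderNotAvailableTopicsField);
    }

    // Fixed shape: a fresh local set per call, so nothing leaks between calls.
    Set<String> withLocal(final Set<String> leaderNotAvailableNow) {
        final Set<String> leaderNotAvailableTopics = new HashSet<>(leaderNotAvailableNow);
        return leaderNotAvailableTopics;
    }

    public static void main(final String[] args) {
        final FieldVsLocal manager = new FieldVsLocal();
        manager.withField(Collections.singleton("t"));
        System.out.println(manager.withField(Collections.emptySet())); // [t]  (stale entry)
        System.out.println(manager.withLocal(Collections.emptySet())); // []
    }
}
```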







[GitHub] [kafka] abbccdda commented on pull request #8712: KAFKA-10006: Don't create internal topics when LeaderNotAvailableException

Posted by GitBox <gi...@apache.org>.
abbccdda commented on pull request #8712:
URL: https://github.com/apache/kafka/pull/8712#issuecomment-653159433









[GitHub] [kafka] showuon edited a comment on pull request #8712: KAFKA-10006: Don't create internal topics when LeaderNotAvailableException

Posted by GitBox <gi...@apache.org>.
showuon edited a comment on pull request #8712:
URL: https://github.com/apache/kafka/pull/8712#issuecomment-650135514


   Hi @abbccdda, I've fixed the issue. In the final commit I:
   0. reverted the unnecessary changes
   1. returned `topicsNotReady` to `makeReady` with `tempUnknownTopics` included, and skipped creating those topics so they wait for the next retry
   2. now create a new, empty `tempUnknownTopics` set on each retry, since `topicsNotReady` now includes the `tempUnknownTopics` (see the diagram below)
   3. added 2 more tests, covering 3 cases in total:
      3.1 shouldCreateTopicWhenTopicLeaderNotAvailableAndThenTopicNotFound
      3.2 shouldCompleteValidateWhenTopicLeaderNotAvailableAndThenDescribeSuccess
      3.3 shouldThrowExceptionWhenKeepsTopicLeaderNotAvailable
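   The flow in items 1 and 2 can be sketched as a simplified loop. This is a hypothetical model, not the actual `InternalTopicManager` code: `describe` and `create` stand in for the `describeTopics`/`createTopics` calls, topic creation is assumed to always succeed, the back-off sleep is omitted, and an `IllegalStateException` is used in place of the exception the real code raises.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Deque;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Consumer;
import java.util.function.Function;

final class MakeReadySketch {
    // Hypothetical per-topic outcome of one describe round.
    enum Outcome { READY, UNKNOWN_TOPIC, LEADER_NOT_AVAILABLE }

    static void makeReady(final Set<String> topics,
                          final int retries,
                          final Function<Set<String>, Map<String, Outcome>> describe,
                          final Consumer<Set<String>> create) {
        Set<String> topicsNotReady = new HashSet<>(topics);
        int remainingRetries = retries;
        while (!topicsNotReady.isEmpty() && remainingRetries >= 0) {
            // A new, empty set on every pass.
            final Set<String> tempUnknownTopics = new HashSet<>();
            final Set<String> stillNotReady = new HashSet<>();
            for (final Map.Entry<String, Outcome> entry : describe.apply(topicsNotReady).entrySet()) {
                if (entry.getValue() == Outcome.LEADER_NOT_AVAILABLE) {
                    tempUnknownTopics.add(entry.getKey());
                    stillNotReady.add(entry.getKey()); // not ready, but don't create it yet
                } else if (entry.getValue() == Outcome.UNKNOWN_TOPIC) {
                    stillNotReady.add(entry.getKey()); // needs to be created
                }
                // READY (or missing from the map): nothing left to do for this topic.
            }
            topicsNotReady = stillNotReady;

            final Set<String> topicsToCreate = new HashSet<>(topicsNotReady);
            topicsToCreate.removeAll(tempUnknownTopics);
            create.accept(topicsToCreate);
            topicsNotReady.removeAll(topicsToCreate); // sketch assumes creation always succeeds

            if (!topicsNotReady.isEmpty()) {
                remainingRetries--;                     // the real loop also backs off here
            }
        }
        if (!topicsNotReady.isEmpty()) {
            throw new IllegalStateException("Topics not ready after " + retries + " retries: " + topicsNotReady);
        }
    }

    public static void main(final String[] args) {
        // Case 3.1: leader not available first, then topic not found.
        final Deque<Outcome> responses = new ArrayDeque<>(
            Arrays.asList(Outcome.LEADER_NOT_AVAILABLE, Outcome.UNKNOWN_TOPIC));
        final List<Set<String>> createCalls = new ArrayList<>();
        makeReady(Collections.singleton("t"), 2,
            ignored -> Collections.singletonMap("t", responses.poll()),
            createCalls::add);
        System.out.println(createCalls); // [[], [t]]
    }
}
```

   The first pass calls `create` with an empty set, which matches the empty `createTopics` expectation in `shouldCreateTopicWhenTopicLeaderNotAvailableAndThenTopicNotFound`.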
   
   Below is the sequence diagram for `makeReady`. I hope it makes clear what I did. Thanks.
   <img width="1361" alt="image" src="https://user-images.githubusercontent.com/43372967/85853165-a06ac600-b7e4-11ea-888e-ac98dc26cca0.png">
   





[GitHub] [kafka] abbccdda commented on pull request #8712: KAFKA-10006: Don't create internal topics when LeaderNotAvailableException

Posted by GitBox <gi...@apache.org>.
abbccdda commented on pull request #8712:
URL: https://github.com/apache/kafka/pull/8712#issuecomment-648962849


   retest this





[GitHub] [kafka] showuon commented on a change in pull request #8712: KAFKA-10006: Don't create internal topics when LeaderNotAvailableException

Posted by GitBox <gi...@apache.org>.
showuon commented on a change in pull request #8712:
URL: https://github.com/apache/kafka/pull/8712#discussion_r448100994



##########
File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopicManagerTest.java
##########
@@ -287,12 +290,124 @@ public void shouldLogWhenTopicNotFoundAndNotThrowException() {
 
             assertThat(
                 appender.getMessages(),
-                hasItem("stream-thread [" + threadName + "] Topic internal-topic is unknown or not found, hence not existed yet:" +
-                    " org.apache.kafka.common.errors.UnknownTopicOrPartitionException: Topic internal-topic not found.")
+                hasItem("stream-thread [" + threadName + "] Topic internal-topic is unknown or not found, hence not existed yet.\n" +
+                    "Error message was: org.apache.kafka.common.errors.UnknownTopicOrPartitionException: Topic internal-topic not found.")
             );
         }
     }
 
+    @Test
+    public void shouldCreateTopicWhenTopicLeaderNotAvailableAndThenTopicNotFound() {
+        final AdminClient admin = EasyMock.createNiceMock(AdminClient.class);
+        final InternalTopicManager topicManager = new InternalTopicManager(admin, new StreamsConfig(config));
+
+        final KafkaFutureImpl<TopicDescription> topicDescriptionLeaderNotAvailableFuture = new KafkaFutureImpl<>();
+        topicDescriptionLeaderNotAvailableFuture.completeExceptionally(new LeaderNotAvailableException("Leader Not Available!"));
+        final KafkaFutureImpl<TopicDescription> topicDescriptionUnknownTopicFuture = new KafkaFutureImpl<>();
+        topicDescriptionUnknownTopicFuture.completeExceptionally(new UnknownTopicOrPartitionException("Unknown Topic!"));
+        final KafkaFutureImpl<CreateTopicsResult.TopicMetadataAndConfig> topicCreationFuture = new KafkaFutureImpl<>();
+        topicCreationFuture.complete(EasyMock.createNiceMock(CreateTopicsResult.TopicMetadataAndConfig.class));
+
+        EasyMock.expect(admin.describeTopics(Collections.singleton(leaderNotAvailableTopic)))
+            .andReturn(new MockDescribeTopicsResult(
+                Collections.singletonMap(leaderNotAvailableTopic, topicDescriptionLeaderNotAvailableFuture)))
+            .once();
+        // return empty set for 1st time
+        EasyMock.expect(admin.createTopics(Collections.emptySet()))
+            .andReturn(new MockCreateTopicsResult(Collections.emptyMap())).once();
+
+        EasyMock.expect(admin.describeTopics(Collections.singleton(leaderNotAvailableTopic)))
+            .andReturn(new MockDescribeTopicsResult(
+                Collections.singletonMap(leaderNotAvailableTopic, topicDescriptionUnknownTopicFuture)))
+            .once();
+        EasyMock.expect(admin.createTopics(Collections.singleton(
+                new NewTopic(leaderNotAvailableTopic, Optional.of(1), Optional.of((short) 1))
+            .configs(Utils.mkMap(Utils.mkEntry(TopicConfig.CLEANUP_POLICY_CONFIG, TopicConfig.CLEANUP_POLICY_DELETE),
+                Utils.mkEntry(TopicConfig.MESSAGE_TIMESTAMP_TYPE_CONFIG, "CreateTime"),
+                Utils.mkEntry(TopicConfig.SEGMENT_BYTES_CONFIG, "52428800"),
+                Utils.mkEntry(TopicConfig.RETENTION_MS_CONFIG, "-1"))))))
+            .andReturn(new MockCreateTopicsResult(Collections.singletonMap(leaderNotAvailableTopic, topicCreationFuture))).once();
+
+        EasyMock.replay(admin);
+
+        final InternalTopicConfig internalTopicConfig = new RepartitionTopicConfig(leaderNotAvailableTopic, Collections.emptyMap());
+        internalTopicConfig.setNumberOfPartitions(1);
+        final Map<String, InternalTopicConfig> topicConfigMap = new HashMap<>();
+        topicConfigMap.put(leaderNotAvailableTopic, internalTopicConfig);
+        topicManager.makeReady(topicConfigMap);

Review comment:
       Good. Thanks.







[GitHub] [kafka] showuon commented on a change in pull request #8712: KAFKA-10006: Don't create internal topics when LeaderNotAvailableException

Posted by GitBox <gi...@apache.org>.
showuon commented on a change in pull request #8712:
URL: https://github.com/apache/kafka/pull/8712#discussion_r444741037



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java
##########
@@ -242,11 +256,16 @@ public InternalTopicManager(final Admin adminClient, final StreamsConfig streams
                     log.error(errorMsg);
                     throw new StreamsException(errorMsg);
                 }
-            } else {
+            } else if (!leaderNotAvailableTopics.contains(topicName)) {
                 topicsToCreate.add(topicName);
             }
         }
 
         return topicsToCreate;
     }
+
+    private boolean shouldRetry(final Set<String> topicsNotReady, final HashSet<String> leaderNotAvailableTopics) {
+        // If there's topic with LeaderNotAvailableException, we still need retry
+        return !topicsNotReady.isEmpty() || leaderNotAvailableTopics.size() > 0;

Review comment:
       Good suggestion. Updated.







[GitHub] [kafka] showuon commented on pull request #8712: KAFKA-10006: Don't create internal topics when LeaderNotAvailableException

Posted by GitBox <gi...@apache.org>.
showuon commented on pull request #8712:
URL: https://github.com/apache/kafka/pull/8712#issuecomment-652713031


   @abbccdda, thanks for pointing out the duplicate line. I didn't notice it, my bad! I've fixed it. Thanks.





[GitHub] [kafka] showuon commented on pull request #8712: KAFKA-10006: Don't create internal topics when LeaderNotAvailableException

Posted by GitBox <gi...@apache.org>.
showuon commented on pull request #8712:
URL: https://github.com/apache/kafka/pull/8712#issuecomment-635071774


   Thanks, @ableegoldman. Take your time. :)





[GitHub] [kafka] showuon commented on a change in pull request #8712: KAFKA-10006: Don't create internal topics when LeaderNotAvailableException

Posted by GitBox <gi...@apache.org>.
showuon commented on a change in pull request #8712:
URL: https://github.com/apache/kafka/pull/8712#discussion_r448095124



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java
##########
@@ -100,13 +100,19 @@ public InternalTopicManager(final Admin adminClient, final StreamsConfig streams
         final Set<String> newlyCreatedTopics = new HashSet<>();
 
         while (!topicsNotReady.isEmpty() && remainingRetries >= 0) {
-            topicsNotReady = validateTopics(topicsNotReady, topics);
+            final Set<String> tempUnknownTopics = new HashSet<>();

Review comment:
       Well, this variable name was actually suggested by @ableegoldman, and I also think `tempUnknownTopics` is more descriptive. Is that OK with you?
   
   https://github.com/apache/kafka/pull/8712#discussion_r444555724
   
   ![image](https://user-images.githubusercontent.com/43372967/86199833-b6cea400-bb8d-11ea-81b5-7ba5b6064709.png)
   







[GitHub] [kafka] showuon edited a comment on pull request #8712: KAFKA-10006: Don't create internal topics when LeaderNotAvailableException

Posted by GitBox <gi...@apache.org>.
showuon edited a comment on pull request #8712:
URL: https://github.com/apache/kafka/pull/8712#issuecomment-652177809


   Thanks for the comments, @abbccdda. I've addressed them in this commit: https://github.com/apache/kafka/pull/8712/commits/4b57a606a3834323302b6d3d33ab95e5b88d183b
   
   In this commit I:
   1. simplified the code
   2. used the existing topic name in the tests instead of creating a new one (`leaderNotAvailableTopic`)
   3. used `numRetries` to indicate that we are trying to go beyond the retry limit





[GitHub] [kafka] showuon edited a comment on pull request #8712: KAFKA-10006: Don't create internal topics when LeaderNotAvailableException

Posted by GitBox <gi...@apache.org>.
showuon edited a comment on pull request #8712:
URL: https://github.com/apache/kafka/pull/8712#issuecomment-650135514


   Hi @abbccdda, I've fixed the issue. In the final commit I:
   0. reverted the unnecessary changes
   1. returned `topicsNotReady` to `makeReady` with `tempUnknownTopics` included, and skipped creating those topics so they wait for the next retry
   2. now create a new, empty `tempUnknownTopics` set on each retry, since `topicsNotReady` now includes the `tempUnknownTopics` (see the diagram below)
   3. added 2 more tests, covering 3 cases in total:
      3.1 shouldCreateTopicWhenTopicLeaderNotAvailableAndThenTopicNotFound
      3.2 shouldCompleteValidateWhenTopicLeaderNotAvailableAndThenDescribeSuccess
      3.3 shouldThrowExceptionWhenKeepsTopicLeaderNotAvailable
   
   Below is the sequence diagram for `makeReady`. I hope it makes clear what I did. Thanks.
   <img width="1371" alt="image" src="https://user-images.githubusercontent.com/43372967/85853736-a4e3ae80-b7e5-11ea-84b5-47984ee170bb.png">
   
   





[GitHub] [kafka] showuon commented on a change in pull request #8712: KAFKA-10006: Don't create internal topics when LeaderNotAvailableException

Posted by GitBox <gi...@apache.org>.
showuon commented on a change in pull request #8712:
URL: https://github.com/apache/kafka/pull/8712#discussion_r445601207



##########
File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopicManagerTest.java
##########
@@ -108,7 +111,8 @@ public void shouldReturnCorrectPartitionCounts() {
             topic,
             Collections.singletonList(new TopicPartitionInfo(0, broker1, singleReplica, Collections.emptyList())),
             null);
-        assertEquals(Collections.singletonMap(topic, 1), internalTopicManager.getNumPartitions(Collections.singleton(topic)));
+        assertEquals(Collections.singletonMap(topic, 1),
+                internalTopicManager.getNumPartitions(Collections.singleton(topic), new HashSet<String>(), 1));

Review comment:
       Updated. Thanks.

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java
##########
@@ -218,13 +231,16 @@ public InternalTopicManager(final Admin adminClient, final StreamsConfig streams
     /**
      * Check the existing topics to have correct number of partitions; and return the remaining topics that needs to be created
      */
-    private Set<String> validateTopics(final Set<String> topicsToValidate, final Map<String, InternalTopicConfig> topicsMap) {
+    private Set<String> validateTopics(final Set<String> topicsToValidate,
+                                       final Map<String, InternalTopicConfig> topicsMap,
+                                       final HashSet<String> tempUnknownTopics,

Review comment:
       Fixed.

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java
##########
@@ -179,31 +180,43 @@ public InternalTopicManager(final Admin adminClient, final StreamsConfig streams
      * Topics that were not able to get its description will simply not be returned
      */
     // visible for testing
-    protected Map<String, Integer> getNumPartitions(final Set<String> topics) {
-        log.debug("Trying to check if topics {} have been created with expected number of partitions.", topics);
-
-        final DescribeTopicsResult describeTopicsResult = adminClient.describeTopics(topics);
+    protected Map<String, Integer> getNumPartitions(final Set<String> topics,
+                                                    final HashSet<String> tempUnknownTopics,
+                                                    final int remainingRetries) {
+        final Set<String> allTopicsToDescribe = new HashSet<>(topics);
+        allTopicsToDescribe.addAll(tempUnknownTopics);
+        log.debug("Trying to check if topics {} have been created with expected number of partitions.", allTopicsToDescribe);
+
+        final DescribeTopicsResult describeTopicsResult = adminClient.describeTopics(allTopicsToDescribe);
         final Map<String, KafkaFuture<TopicDescription>> futures = describeTopicsResult.values();
 
         final Map<String, Integer> existedTopicPartition = new HashMap<>();
         for (final Map.Entry<String, KafkaFuture<TopicDescription>> topicFuture : futures.entrySet()) {
             final String topicName = topicFuture.getKey();
             try {
                 final TopicDescription topicDescription = topicFuture.getValue().get();
-                existedTopicPartition.put(
-                    topicFuture.getKey(),
-                    topicDescription.partitions().size());
+                existedTopicPartition.put(topicName, topicDescription.partitions().size());
+                tempUnknownTopics.remove(topicName);
             } catch (final InterruptedException fatalException) {
                 // this should not happen; if it ever happens it indicate a bug
                 Thread.currentThread().interrupt();
                 log.error(INTERRUPTED_ERROR_MESSAGE, fatalException);
                 throw new IllegalStateException(INTERRUPTED_ERROR_MESSAGE, fatalException);
             } catch (final ExecutionException couldNotDescribeTopicException) {
                 final Throwable cause = couldNotDescribeTopicException.getCause();
-                if (cause instanceof UnknownTopicOrPartitionException ||
-                    cause instanceof LeaderNotAvailableException) {
-                    // This topic didn't exist or leader is not known yet, proceed to try to create it
-                    log.debug("Topic {} is unknown or not found, hence not existed yet: {}", topicName, cause.toString());
+                if (cause instanceof UnknownTopicOrPartitionException) {
+                    // This topic didn't exist, proceed to try to create it
+                    log.debug("Topic {} is unknown or not found, hence not existed yet.\n" +
+                        "Error message was: {}", topicName, cause.toString());
+                } else if (cause instanceof LeaderNotAvailableException) {
+                    tempUnknownTopics.add(topicName);
+                    if (remainingRetries > 0) {

Review comment:
       Updated.

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java
##########
@@ -179,31 +180,43 @@ public InternalTopicManager(final Admin adminClient, final StreamsConfig streams
      * Topics that were not able to get its description will simply not be returned
      */
     // visible for testing
-    protected Map<String, Integer> getNumPartitions(final Set<String> topics) {
-        log.debug("Trying to check if topics {} have been created with expected number of partitions.", topics);
-
-        final DescribeTopicsResult describeTopicsResult = adminClient.describeTopics(topics);
+    protected Map<String, Integer> getNumPartitions(final Set<String> topics,
+                                                    final HashSet<String> tempUnknownTopics,
+                                                    final int remainingRetries) {

Review comment:
       Updated.

##########
File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopicManagerTest.java
##########
@@ -287,12 +291,41 @@ public void shouldLogWhenTopicNotFoundAndNotThrowException() {
 
             assertThat(
                 appender.getMessages(),
-                hasItem("stream-thread [" + threadName + "] Topic internal-topic is unknown or not found, hence not existed yet:" +
-                    " org.apache.kafka.common.errors.UnknownTopicOrPartitionException: Topic internal-topic not found.")
+                hasItem("stream-thread [" + threadName + "] Topic internal-topic is unknown or not found, hence not existed yet.\n" +
+                    "Error message was: org.apache.kafka.common.errors.UnknownTopicOrPartitionException: Topic internal-topic not found.")
             );
         }
     }
 
+    @Test
+    public void shouldLogWhenTopicLeaderNotAvailableAndThrowException() {
+        final String leaderNotAvailableTopic = "LeaderNotAvailableTopic";
+        final AdminClient admin = EasyMock.createNiceMock(AdminClient.class);
+        final InternalTopicManager topicManager = new InternalTopicManager(admin, new StreamsConfig(config));
+
+        final KafkaFutureImpl<TopicDescription> topicDescriptionFailFuture = new KafkaFutureImpl<>();
+        topicDescriptionFailFuture.completeExceptionally(new LeaderNotAvailableException("Leader Not Available!"));
+
+        // simulate describeTopics got LeaderNotAvailableException
+        EasyMock.expect(admin.describeTopics(Collections.singleton(leaderNotAvailableTopic)))
+                .andReturn(new MockDescribeTopicsResult(

Review comment:
       Nice catch! Fixed.

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java
##########
@@ -243,10 +259,18 @@ public InternalTopicManager(final Admin adminClient, final StreamsConfig streams
                     throw new StreamsException(errorMsg);
                 }
             } else {
-                topicsToCreate.add(topicName);
+                // for the tempUnknownTopics, we'll check again later if retries > 0

Review comment:
       Fixed

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java
##########
@@ -98,9 +98,10 @@ public InternalTopicManager(final Admin adminClient, final StreamsConfig streams
         int remainingRetries = retries;
         Set<String> topicsNotReady = new HashSet<>(topics.keySet());
         final Set<String> newlyCreatedTopics = new HashSet<>();
+        final HashSet<String> tempUnknownTopics = new HashSet<>();

Review comment:
       Fixed. Thanks.







[GitHub] [kafka] showuon commented on pull request #8712: KAFKA-10006: Don't create internal topics when LeaderNotAvailableException

Posted by GitBox <gi...@apache.org>.
showuon commented on pull request #8712:
URL: https://github.com/apache/kafka/pull/8712#issuecomment-648846927


   Hi @ableegoldman, thanks for the good suggestion! I've made the update in this commit: https://github.com/apache/kafka/pull/8712/commits/f378c34d10c57d796412ac946f867e5787f93af0. In this commit I:
   
   1. rewrote the tests with `EasyMock` to make them better. While rewriting them, I found a bug I had missed before: when we retry `describeTopics`, we should also add the `tempUnknownTopics` to the list, since they need to be retried too. Thanks for the suggestion!
   2. removed the unneeded hack in `MockAdminClient`
   3. renamed the variable `leaderNotAvailableTopics` to `tempUnknownTopics`
   4. used `isEmpty()` for consistency
   
   Thank you.





[GitHub] [kafka] showuon edited a comment on pull request #8712: KAFKA-10006: Don't create internal topics when LeaderNotAvailableException

Posted by GitBox <gi...@apache.org>.
showuon edited a comment on pull request #8712:
URL: https://github.com/apache/kafka/pull/8712#issuecomment-649896512


   Hi @abbccdda, last night before I went to sleep, I kept thinking about your question:
   > Why do we need allTopicsToDescribe?
   
   Then I realized my change has one small logic flaw: if a topic gets `UnknownTopicException` after a `LeaderNotAvailableException` has been thrown, we won't create it. I'll fix it and then add 2 more tests for the 2 different situations. Will let you know. Thanks.





[GitHub] [kafka] abbccdda commented on a change in pull request #8712: KAFKA-10006: Don't create internal topics when LeaderNotAvailableException

Posted by GitBox <gi...@apache.org>.
abbccdda commented on a change in pull request #8712:
URL: https://github.com/apache/kafka/pull/8712#discussion_r448460197



##########
File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopicManagerTest.java
##########
@@ -287,12 +290,124 @@ public void shouldLogWhenTopicNotFoundAndNotThrowException() {
 
             assertThat(
                 appender.getMessages(),
-                hasItem("stream-thread [" + threadName + "] Topic internal-topic is unknown or not found, hence not existed yet:" +
-                    " org.apache.kafka.common.errors.UnknownTopicOrPartitionException: Topic internal-topic not found.")
+                hasItem("stream-thread [" + threadName + "] Topic internal-topic is unknown or not found, hence not existed yet.\n" +
+                    "Error message was: org.apache.kafka.common.errors.UnknownTopicOrPartitionException: Topic internal-topic not found.")
             );
         }
     }
 
+    @Test
+    public void shouldCreateTopicWhenTopicLeaderNotAvailableAndThenTopicNotFound() {
+        final AdminClient admin = EasyMock.createNiceMock(AdminClient.class);
+        final InternalTopicManager topicManager = new InternalTopicManager(admin, new StreamsConfig(config));
+
+        final KafkaFutureImpl<TopicDescription> topicDescriptionLeaderNotAvailableFuture = new KafkaFutureImpl<>();
+        topicDescriptionLeaderNotAvailableFuture.completeExceptionally(new LeaderNotAvailableException("Leader Not Available!"));
+        final KafkaFutureImpl<TopicDescription> topicDescriptionUnknownTopicFuture = new KafkaFutureImpl<>();
+        topicDescriptionUnknownTopicFuture.completeExceptionally(new UnknownTopicOrPartitionException("Unknown Topic!"));
+        final KafkaFutureImpl<CreateTopicsResult.TopicMetadataAndConfig> topicCreationFuture = new KafkaFutureImpl<>();
+        topicCreationFuture.complete(EasyMock.createNiceMock(CreateTopicsResult.TopicMetadataAndConfig.class));
+
+        EasyMock.expect(admin.describeTopics(Collections.singleton(leaderNotAvailableTopic)))
+            .andReturn(new MockDescribeTopicsResult(
+                Collections.singletonMap(leaderNotAvailableTopic, topicDescriptionLeaderNotAvailableFuture)))
+            .once();
+        // return empty set for 1st time
+        EasyMock.expect(admin.createTopics(Collections.emptySet()))
+            .andReturn(new MockCreateTopicsResult(Collections.emptyMap())).once();
+
+        EasyMock.expect(admin.describeTopics(Collections.singleton(leaderNotAvailableTopic)))
+            .andReturn(new MockDescribeTopicsResult(
+                Collections.singletonMap(leaderNotAvailableTopic, topicDescriptionUnknownTopicFuture)))
+            .once();
+        EasyMock.expect(admin.createTopics(Collections.singleton(
+                new NewTopic(leaderNotAvailableTopic, Optional.of(1), Optional.of((short) 1))
+            .configs(Utils.mkMap(Utils.mkEntry(TopicConfig.CLEANUP_POLICY_CONFIG, TopicConfig.CLEANUP_POLICY_DELETE),
+                Utils.mkEntry(TopicConfig.MESSAGE_TIMESTAMP_TYPE_CONFIG, "CreateTime"),
+                Utils.mkEntry(TopicConfig.SEGMENT_BYTES_CONFIG, "52428800"),
+                Utils.mkEntry(TopicConfig.RETENTION_MS_CONFIG, "-1"))))))
+            .andReturn(new MockCreateTopicsResult(Collections.singletonMap(leaderNotAvailableTopic, topicCreationFuture))).once();
+
+        EasyMock.replay(admin);
+
+        final InternalTopicConfig internalTopicConfig = new RepartitionTopicConfig(leaderNotAvailableTopic, Collections.emptyMap());
+        internalTopicConfig.setNumberOfPartitions(1);
+        final Map<String, InternalTopicConfig> topicConfigMap = new HashMap<>();
+        topicConfigMap.put(leaderNotAvailableTopic, internalTopicConfig);
+        topicManager.makeReady(topicConfigMap);
+
+        EasyMock.verify(admin);
+    }
+
+    @Test
+    public void shouldCompleteValidateWhenTopicLeaderNotAvailableAndThenDescribeSuccess() {

Review comment:
       Thanks for explaining! May I ask what software you used to generate the graph?







[GitHub] [kafka] showuon commented on a change in pull request #8712: KAFKA-10006: Don't create internal topics when LeaderNotAvailableException

Posted by GitBox <gi...@apache.org>.
showuon commented on a change in pull request #8712:
URL: https://github.com/apache/kafka/pull/8712#discussion_r429288449



##########
File path: clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java
##########
@@ -330,6 +331,12 @@ synchronized public DescribeTopicsResult describeTopics(Collection<String> topic
                 future.completeExceptionally(new UnknownTopicOrPartitionException("Topic " + requestedTopic + " not found."));
                 topicDescriptions.put(requestedTopic, future);
             }
+            // try to simulate the leader not available situation when topic name is "LeaderNotAvailableTopic"
+            if (requestedTopic.equals("LeaderNotAvailableTopic")) {
+                KafkaFutureImpl<TopicDescription> future = new KafkaFutureImpl<>();
+                future.completeExceptionally(new LeaderNotAvailableException("The leader of Topic " + requestedTopic + " is not available."));

Review comment:
       This simulates a `LeaderNotAvailableException` in `MockAdminClient` when the topic name is `LeaderNotAvailableTopic`.
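The same idea can be sketched without Kafka on the classpath, using `java.util.concurrent.CompletableFuture` in place of `KafkaFutureImpl`. `LeaderNotAvailableStub`, `MockDescriber` and `describePartitionCount` are hypothetical stand-ins. The caller shows how the simulated failure surfaces as the cause of an `ExecutionException`, the same `getCause()` check the `InternalTopicManager` diff uses for topic creation.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

// Hypothetical stand-in for org.apache.kafka.common.errors.LeaderNotAvailableException.
final class LeaderNotAvailableStub extends RuntimeException {
    LeaderNotAvailableStub(final String message) { super(message); }
}

final class MockDescriber {
    // Magic topic name that triggers the simulated failure, kept as a constant.
    static final String LEADER_NOT_AVAILABLE_TOPIC = "LeaderNotAvailableTopic";

    // Returns an already-completed future: failed for the magic topic name,
    // otherwise pretending the topic has one partition.
    static CompletableFuture<Integer> describePartitionCount(final String topic) {
        final CompletableFuture<Integer> future = new CompletableFuture<>();
        if (LEADER_NOT_AVAILABLE_TOPIC.equals(topic)) {
            future.completeExceptionally(new LeaderNotAvailableStub(
                "The leader of Topic " + topic + " is not available."));
        } else {
            future.complete(1);
        }
        return future;
    }

    // Caller side: unwrap the ExecutionException and branch on the cause's type.
    static boolean isLeaderNotAvailable(final String topic) {
        try {
            describePartitionCount(topic).get();
            return false;
        } catch (final ExecutionException e) {
            return e.getCause() instanceof LeaderNotAvailableStub;
        } catch (final InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while describing " + topic, e);
        }
    }
}
```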







[GitHub] [kafka] showuon commented on a change in pull request #8712: KAFKA-10006: Don't create internal topics when LeaderNotAvailableException

Posted by GitBox <gi...@apache.org>.
showuon commented on a change in pull request #8712:
URL: https://github.com/apache/kafka/pull/8712#discussion_r436444027



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java
##########
@@ -195,20 +198,30 @@ public InternalTopicManager(final Admin adminClient, final StreamsConfig streams
             final String topicName = topicFuture.getKey();
             try {
                 final TopicDescription topicDescription = topicFuture.getValue().get();
-                existedTopicPartition.put(
-                    topicFuture.getKey(),
-                    topicDescription.partitions().size());
+                existedTopicPartition.put(topicName, topicDescription.partitions().size());
+                if (leaderNotAvailableTopics.contains(topicName)) {

Review comment:
       Good point! `remove` returns `true` if the topic was in the set and `false` otherwise, so a separate `contains` check isn't needed. Fixed.
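A minimal sketch of the idiom, with a hypothetical `recordDescribed` helper and a `Map<String, Integer>` of partition counts standing in for the described `TopicDescription`s (`existedTopicPartition` and `tempUnknownTopics` are the names used in this PR):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;

final class TempUnknownTracker {
    // Hypothetical helper: records the partition count of every successfully
    // described topic and returns those that had been temporarily unknown.
    static List<String> recordDescribed(final Map<String, Integer> partitionCounts,
                                        final Set<String> tempUnknownTopics,
                                        final Map<String, Integer> existedTopicPartition) {
        final List<String> recovered = new ArrayList<>();
        for (final Map.Entry<String, Integer> e : partitionCounts.entrySet()) {
            existedTopicPartition.put(e.getKey(), e.getValue());
            // Set.remove returns true only if the topic was present, so this
            // single call replaces a contains() check followed by remove().
            if (tempUnknownTopics.remove(e.getKey())) {
                recovered.add(e.getKey());
            }
        }
        return recovered;
    }
}
```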







[GitHub] [kafka] showuon commented on a change in pull request #8712: KAFKA-10006: Don't create internal topics when LeaderNotAvailableException

Posted by GitBox <gi...@apache.org>.
showuon commented on a change in pull request #8712:
URL: https://github.com/apache/kafka/pull/8712#discussion_r429288449



##########
File path: clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java
##########
@@ -330,6 +331,12 @@ synchronized public DescribeTopicsResult describeTopics(Collection<String> topic
                 future.completeExceptionally(new UnknownTopicOrPartitionException("Topic " + requestedTopic + " not found."));
                 topicDescriptions.put(requestedTopic, future);
             }
+            // try to simulate the leader not available situation when topic name is "LeaderNotAvailableTopic"
+            if (requestedTopic.equals("LeaderNotAvailableTopic")) {
+                KafkaFutureImpl<TopicDescription> future = new KafkaFutureImpl<>();
+                future.completeExceptionally(new LeaderNotAvailableException("The leader of Topic " + requestedTopic + " is not available."));

Review comment:
       This simulates a `LeaderNotAvailableException` in `MockAdminClient` whenever the topic name is set to `LeaderNotAvailableTopic`.







[GitHub] [kafka] abbccdda commented on pull request #8712: KAFKA-10006: Don't create internal topics when LeaderNotAvailableException

Posted by GitBox <gi...@apache.org>.
abbccdda commented on pull request #8712:
URL: https://github.com/apache/kafka/pull/8712#issuecomment-652778801


   retest this please





[GitHub] [kafka] abbccdda commented on pull request #8712: KAFKA-10006: Don't create internal topics when LeaderNotAvailableException

Posted by GitBox <gi...@apache.org>.
abbccdda commented on pull request #8712:
URL: https://github.com/apache/kafka/pull/8712#issuecomment-653216172


   Only flaky tests are still failing; merging.





[GitHub] [kafka] showuon commented on pull request #8712: KAFKA-10006: Don't create internal topics when LeaderNotAvailableException

Posted by GitBox <gi...@apache.org>.
showuon commented on pull request #8712:
URL: https://github.com/apache/kafka/pull/8712#issuecomment-647885294


   Hi @abbccdda @ableegoldman, sorry to bother you, but this PR has been open for a while. Do you have any other comments, or should I find someone else to review it? Thanks.





[GitHub] [kafka] showuon commented on pull request #8712: KAFKA-10006: Don't create internal topics when LeaderNotAvailableException

Posted by GitBox <gi...@apache.org>.
showuon commented on pull request #8712:
URL: https://github.com/apache/kafka/pull/8712#issuecomment-632729141


   Hi @ableegoldman, could you review this PR? Thanks.





[GitHub] [kafka] abbccdda merged pull request #8712: KAFKA-10006: Don't create internal topics when LeaderNotAvailableException

Posted by GitBox <gi...@apache.org>.
abbccdda merged pull request #8712:
URL: https://github.com/apache/kafka/pull/8712


   





[GitHub] [kafka] showuon commented on a change in pull request #8712: KAFKA-10006: Don't create internal topics when LeaderNotAvailableException

Posted by GitBox <gi...@apache.org>.
showuon commented on a change in pull request #8712:
URL: https://github.com/apache/kafka/pull/8712#discussion_r446126644



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java
##########
@@ -125,35 +131,37 @@ public InternalTopicManager(final Admin adminClient, final StreamsConfig streams
 
                 final CreateTopicsResult createTopicsResult = adminClient.createTopics(newTopics);
 
-                for (final Map.Entry<String, KafkaFuture<Void>> createTopicResult : createTopicsResult.values().entrySet()) {
-                    final String topicName = createTopicResult.getKey();
-                    try {
-                        createTopicResult.getValue().get();
-                        topicsNotReady.remove(topicName);
-                    } catch (final InterruptedException fatalException) {
-                        // this should not happen; if it ever happens it indicate a bug
-                        Thread.currentThread().interrupt();
-                        log.error(INTERRUPTED_ERROR_MESSAGE, fatalException);
-                        throw new IllegalStateException(INTERRUPTED_ERROR_MESSAGE, fatalException);
-                    } catch (final ExecutionException executionException) {
-                        final Throwable cause = executionException.getCause();
-                        if (cause instanceof TopicExistsException) {
-                            // This topic didn't exist earlier or its leader not known before; just retain it for next round of validation.
-                            log.info("Could not create topic {}. Topic is probably marked for deletion (number of partitions is unknown).\n" +
-                                "Will retry to create this topic in {} ms (to let broker finish async delete operation first).\n" +
-                                "Error message was: {}", topicName, retryBackOffMs, cause.toString());
-                        } else {
-                            log.error("Unexpected error during topic creation for {}.\n" +
-                                "Error message was: {}", topicName, cause.toString());
-                            throw new StreamsException(String.format("Could not create topic %s.", topicName), cause);
+                if (createTopicsResult != null) {
+                    for (final Map.Entry<String, KafkaFuture<Void>> createTopicResult : createTopicsResult.values().entrySet()) {

Review comment:
       We need the null check for `createTopicsResult` since `newTopics` might be empty.
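A sketch of the guarded loop, assuming a `Map<String, CompletableFuture<Void>>` stands in for `createTopicsResult.values()` and a hypothetical `TopicExistsStub` stands in for `TopicExistsException`; `CreateResultHandler.apply` is likewise an illustrative name. As in the diff, a topic-exists failure keeps the topic for the next retry, while other failures are rethrown, here as `IllegalStateException` rather than the `StreamsException` the real code throws.

```java
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

// Hypothetical stand-in for org.apache.kafka.common.errors.TopicExistsException.
final class TopicExistsStub extends RuntimeException {
    TopicExistsStub(final String message) { super(message); }
}

final class CreateResultHandler {
    // Applies per-topic creation results to topicsNotReady. A null result
    // is treated as "nothing to process" instead of causing a NullPointerException.
    static void apply(final Map<String, CompletableFuture<Void>> createResults,
                      final Set<String> topicsNotReady) {
        if (createResults == null) {
            return;
        }
        for (final Map.Entry<String, CompletableFuture<Void>> e : createResults.entrySet()) {
            try {
                e.getValue().get();
                topicsNotReady.remove(e.getKey()); // created successfully
            } catch (final InterruptedException fatal) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("Interrupted while creating topics", fatal);
            } catch (final ExecutionException ee) {
                if (!(ee.getCause() instanceof TopicExistsStub)) {
                    throw new IllegalStateException("Could not create topic " + e.getKey(), ee.getCause());
                }
                // Topic probably marked for deletion: keep it for the next retry.
            }
        }
    }
}
```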







[GitHub] [kafka] showuon commented on a change in pull request #8712: KAFKA-10006: Don't create internal topics when LeaderNotAvailableException

Posted by GitBox <gi...@apache.org>.
showuon commented on a change in pull request #8712:
URL: https://github.com/apache/kafka/pull/8712#discussion_r448099598



##########
File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopicManagerTest.java
##########
@@ -287,12 +290,124 @@ public void shouldLogWhenTopicNotFoundAndNotThrowException() {
 
             assertThat(
                 appender.getMessages(),
-                hasItem("stream-thread [" + threadName + "] Topic internal-topic is unknown or not found, hence not existed yet:" +
-                    " org.apache.kafka.common.errors.UnknownTopicOrPartitionException: Topic internal-topic not found.")
+                hasItem("stream-thread [" + threadName + "] Topic internal-topic is unknown or not found, hence not existed yet.\n" +
+                    "Error message was: org.apache.kafka.common.errors.UnknownTopicOrPartitionException: Topic internal-topic not found.")
             );
         }
     }
 
+    @Test
+    public void shouldCreateTopicWhenTopicLeaderNotAvailableAndThenTopicNotFound() {
+        final AdminClient admin = EasyMock.createNiceMock(AdminClient.class);
+        final InternalTopicManager topicManager = new InternalTopicManager(admin, new StreamsConfig(config));
+
+        final KafkaFutureImpl<TopicDescription> topicDescriptionLeaderNotAvailableFuture = new KafkaFutureImpl<>();
+        topicDescriptionLeaderNotAvailableFuture.completeExceptionally(new LeaderNotAvailableException("Leader Not Available!"));
+        final KafkaFutureImpl<TopicDescription> topicDescriptionUnknownTopicFuture = new KafkaFutureImpl<>();
+        topicDescriptionUnknownTopicFuture.completeExceptionally(new UnknownTopicOrPartitionException("Unknown Topic!"));
+        final KafkaFutureImpl<CreateTopicsResult.TopicMetadataAndConfig> topicCreationFuture = new KafkaFutureImpl<>();
+        topicCreationFuture.complete(EasyMock.createNiceMock(CreateTopicsResult.TopicMetadataAndConfig.class));
+
+        EasyMock.expect(admin.describeTopics(Collections.singleton(leaderNotAvailableTopic)))
+            .andReturn(new MockDescribeTopicsResult(
+                Collections.singletonMap(leaderNotAvailableTopic, topicDescriptionLeaderNotAvailableFuture)))
+            .once();
+        // return empty set for 1st time
+        EasyMock.expect(admin.createTopics(Collections.emptySet()))
+            .andReturn(new MockCreateTopicsResult(Collections.emptyMap())).once();
+
+        EasyMock.expect(admin.describeTopics(Collections.singleton(leaderNotAvailableTopic)))
+            .andReturn(new MockDescribeTopicsResult(
+                Collections.singletonMap(leaderNotAvailableTopic, topicDescriptionUnknownTopicFuture)))
+            .once();
+        EasyMock.expect(admin.createTopics(Collections.singleton(
+                new NewTopic(leaderNotAvailableTopic, Optional.of(1), Optional.of((short) 1))
+            .configs(Utils.mkMap(Utils.mkEntry(TopicConfig.CLEANUP_POLICY_CONFIG, TopicConfig.CLEANUP_POLICY_DELETE),
+                Utils.mkEntry(TopicConfig.MESSAGE_TIMESTAMP_TYPE_CONFIG, "CreateTime"),
+                Utils.mkEntry(TopicConfig.SEGMENT_BYTES_CONFIG, "52428800"),
+                Utils.mkEntry(TopicConfig.RETENTION_MS_CONFIG, "-1"))))))
+            .andReturn(new MockCreateTopicsResult(Collections.singletonMap(leaderNotAvailableTopic, topicCreationFuture))).once();
+
+        EasyMock.replay(admin);
+
+        final InternalTopicConfig internalTopicConfig = new RepartitionTopicConfig(leaderNotAvailableTopic, Collections.emptyMap());
+        internalTopicConfig.setNumberOfPartitions(1);
+        final Map<String, InternalTopicConfig> topicConfigMap = new HashMap<>();
+        topicConfigMap.put(leaderNotAvailableTopic, internalTopicConfig);
+        topicManager.makeReady(topicConfigMap);
+
+        EasyMock.verify(admin);
+    }
+
+    @Test
+    public void shouldCompleteValidateWhenTopicLeaderNotAvailableAndThenDescribeSuccess() {
+        final AdminClient admin = EasyMock.createNiceMock(AdminClient.class);
+        final InternalTopicManager topicManager = new InternalTopicManager(admin, new StreamsConfig(config));
+        final TopicPartitionInfo partitionInfo = new TopicPartitionInfo(0, broker1,
+                Collections.singletonList(broker1), Collections.singletonList(broker1));
+
+        final KafkaFutureImpl<TopicDescription> topicDescriptionFailFuture = new KafkaFutureImpl<>();
+        topicDescriptionFailFuture.completeExceptionally(new LeaderNotAvailableException("Leader Not Available!"));
+        final KafkaFutureImpl<TopicDescription> topicDescriptionSuccessFuture = new KafkaFutureImpl<>();
+        topicDescriptionSuccessFuture.complete(
+            new TopicDescription(topic, false, Collections.singletonList(partitionInfo), Collections.emptySet())
+        );
+
+        EasyMock.expect(admin.describeTopics(Collections.singleton(leaderNotAvailableTopic)))
+            .andReturn(new MockDescribeTopicsResult(
+                Collections.singletonMap(leaderNotAvailableTopic, topicDescriptionFailFuture)))
+            .once();
+        EasyMock.expect(admin.createTopics(Collections.emptySet()))
+            .andReturn(new MockCreateTopicsResult(Collections.emptyMap())).once();
+
+        EasyMock.expect(admin.describeTopics(Collections.singleton(leaderNotAvailableTopic)))
+            .andReturn(new MockDescribeTopicsResult(
+                Collections.singletonMap(leaderNotAvailableTopic, topicDescriptionSuccessFuture)))
+            .once();
+
+        EasyMock.replay(admin);
+
+        final InternalTopicConfig internalTopicConfig = new RepartitionTopicConfig(leaderNotAvailableTopic, Collections.emptyMap());
+        internalTopicConfig.setNumberOfPartitions(1);
+        final Map<String, InternalTopicConfig> topicConfigMap = new HashMap<>();
+        topicConfigMap.put(leaderNotAvailableTopic, internalTopicConfig);
+        topicManager.makeReady(topicConfigMap);
+
+        EasyMock.verify(admin);
+    }
+
+    @Test
+    public void shouldThrowExceptionWhenKeepsTopicLeaderNotAvailable() {
+        final String leaderNotAvailableTopic = "LeaderNotAvailableTopic";
+        final AdminClient admin = EasyMock.createNiceMock(AdminClient.class);
+        final InternalTopicManager topicManager = new InternalTopicManager(admin, new StreamsConfig(config));
+
+        final KafkaFutureImpl<TopicDescription> topicDescriptionFailFuture = new KafkaFutureImpl<>();
+        topicDescriptionFailFuture.completeExceptionally(new LeaderNotAvailableException("Leader Not Available!"));
+
+        // simulate describeTopics got LeaderNotAvailableException
+        EasyMock.expect(admin.describeTopics(Collections.singleton(leaderNotAvailableTopic)))
+            .andReturn(new MockDescribeTopicsResult(
+                Collections.singletonMap(leaderNotAvailableTopic, topicDescriptionFailFuture)))
+            .times(2);
+
+        EasyMock.expect(admin.createTopics(Collections.emptySet()))
+            .andReturn(new MockCreateTopicsResult(Collections.emptyMap())).once();
+
+        EasyMock.replay(admin);
+
+        final InternalTopicConfig internalTopicConfig = new RepartitionTopicConfig(leaderNotAvailableTopic, Collections.emptyMap());
+        internalTopicConfig.setNumberOfPartitions(1);
+        final Map<String, InternalTopicConfig> topicConfigMap = new HashMap<>();
+        topicConfigMap.put(leaderNotAvailableTopic, internalTopicConfig);
+
+        final StreamsException exception = assertThrows(

Review comment:
       Nice catch!







[GitHub] [kafka] showuon edited a comment on pull request #8712: KAFKA-10006: Don't create internal topics when LeaderNotAvailableException

Posted by GitBox <gi...@apache.org>.
showuon edited a comment on pull request #8712:
URL: https://github.com/apache/kafka/pull/8712#issuecomment-650135514


   Hi @abbccdda, I've fixed the issue. Here is what I did in the final commit:
       0. Reverted the unnecessary changes.
       1. Returned `topicsNotReady`, including `tempUnknownTopics`, to `makeReady`, without creating those topics, so that they wait for the next retry.
       2. Created an empty `tempUnknownTopics` set in each retry, since `topicsNotReady` now includes `tempUnknownTopics` (see the diagram below).
       3. Added 2 more tests, covering 3 test cases in total:
         3.1 shouldCreateTopicWhenTopicLeaderNotAvailableAndThenTopicNotFound
         3.2 shouldCompleteValidateWhenTopicLeaderNotAvailableAndThenDescribeSuccess
         3.3 shouldThrowExceptionWhenKeepsTopicLeaderNotAvailable
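A simplified simulation of the resulting `makeReady` loop, assuming topic creation always succeeds and each round's `describeTopics` results are reduced to a hypothetical `TopicState`. `tempUnknownTopics` is rebuilt empty in every round, and only temporarily unknown topics are carried over to the next describe. `MakeReadySketch` is an illustrative name, and the retry accounting (one initial round plus `retries` extra rounds) is an assumption for illustration, not taken from the Kafka source.

```java
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
import java.util.function.Function;

// Hypothetical per-round result of describing a topic.
enum TopicState { EXISTS, LEADER_NOT_AVAILABLE, NOT_FOUND }

final class MakeReadySketch {
    // Simulates the retry loop: one initial round plus up to `retries` more.
    // `describe` returns each requested topic's state for the current round.
    // Topics reported NOT_FOUND are recorded as created (creation assumed to succeed).
    static Set<String> makeReady(final Set<String> topics,
                                 final Function<Set<String>, Map<String, TopicState>> describe,
                                 final int retries) {
        final Set<String> created = new TreeSet<>();
        Set<String> topicsNotReady = new TreeSet<>(topics);
        int remainingRetries = retries;
        while (true) {
            // tempUnknownTopics is rebuilt empty in every round.
            final Set<String> tempUnknownTopics = new TreeSet<>();
            for (final Map.Entry<String, TopicState> e : describe.apply(topicsNotReady).entrySet()) {
                if (e.getValue() == TopicState.NOT_FOUND) {
                    created.add(e.getKey());           // unknown: safe to create now
                } else if (e.getValue() == TopicState.LEADER_NOT_AVAILABLE) {
                    tempUnknownTopics.add(e.getKey()); // not created; describe again later
                }
            }
            // Only temporarily unknown topics still need another describe.
            topicsNotReady = tempUnknownTopics;
            if (topicsNotReady.isEmpty()) {
                return created;
            }
            if (remainingRetries <= 0) {
                throw new IllegalStateException("Could not make topics " + topicsNotReady + " ready");
            }
            remainingRetries--;
        }
    }
}
```

Scripting `describe` to return `LEADER_NOT_AVAILABLE` and then `NOT_FOUND`, `LEADER_NOT_AVAILABLE` and then `EXISTS`, or `LEADER_NOT_AVAILABLE` on every round mirrors the three test cases listed above.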
   
   Below is the sequence diagram for `makeReady`. I hope it makes clear what I did. Thanks.
   Sequence diagram: https://user-images.githubusercontent.com/43372967/85853920-ee33fe00-b7e5-11ea-8710-6bdc21bf44e7.png
   
   
   





[GitHub] [kafka] showuon edited a comment on pull request #8712: KAFKA-10006: Don't create internal topics when LeaderNotAvailableException

Posted by GitBox <gi...@apache.org>.
showuon edited a comment on pull request #8712:
URL: https://github.com/apache/kafka/pull/8712#issuecomment-650135514









[GitHub] [kafka] showuon commented on pull request #8712: KAFKA-10006: Don't create internal topics when LeaderNotAvailableException

Posted by GitBox <gi...@apache.org>.
showuon commented on pull request #8712:
URL: https://github.com/apache/kafka/pull/8712#issuecomment-649604155


   Hi @abbccdda, thanks for the comments. I've addressed them in this commit: https://github.com/apache/kafka/pull/8712/commits/70d41212fe8a4d5055993c56e30c2342498e1664. Could you please review again? Thanks.





[GitHub] [kafka] showuon commented on a change in pull request #8712: KAFKA-10006: Don't create internal topics when LeaderNotAvailableException

Posted by GitBox <gi...@apache.org>.
showuon commented on a change in pull request #8712:
URL: https://github.com/apache/kafka/pull/8712#discussion_r429276664



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java
##########
@@ -157,16 +161,16 @@ public InternalTopicManager(final Admin adminClient, final StreamsConfig streams
             }
 
 
-            if (!topicsNotReady.isEmpty()) {
-                log.info("Topics {} can not be made ready with {} retries left", topicsNotReady, retries);
+            if (isNeedRetry(topicsNotReady)) {
+                log.info("Topics {} can not be made ready with {} retries left", topicsNotReady, remainingRetries);

Review comment:
       The log should use `remainingRetries`, since the message says `with {} retries left`.







[GitHub] [kafka] showuon commented on pull request #8712: KAFKA-10006: Don't create internal topics when LeaderNotAvailableException

Posted by GitBox <gi...@apache.org>.
showuon commented on pull request #8712:
URL: https://github.com/apache/kafka/pull/8712#issuecomment-640396593


   Thanks @abbccdda, I've addressed your review comments in this commit: https://github.com/apache/kafka/pull/8712/commits/0567ecbc82c01b0c6e596fe5c28918f3b530a263. Thank you.





[GitHub] [kafka] showuon commented on pull request #8712: KAFKA-10006: Don't create internal topics when LeaderNotAvailableException

Posted by GitBox <gi...@apache.org>.
showuon commented on pull request #8712:
URL: https://github.com/apache/kafka/pull/8712#issuecomment-652177809


   Thanks for the comments, @abbccdda. I've addressed them in this commit: https://github.com/apache/kafka/pull/8712/commits/4b57a606a3834323302b6d3d33ab95e5b88d183b
   
   In this commit, I:
   1. Simplified the code.
   2. Used the existing topic name in the tests instead of creating a new one (`leaderNotAvailableTopic`).
   3. Used `numRetries` to indicate that we are trying to go beyond the retry limit.





[GitHub] [kafka] showuon commented on pull request #8712: KAFKA-10006: Don't create internal topics when LeaderNotAvailableException

Posted by GitBox <gi...@apache.org>.
showuon commented on pull request #8712:
URL: https://github.com/apache/kafka/pull/8712#issuecomment-650135514


   Hi @abbccdda, I've fixed the issue. Here is what I did in the final commit:
       1. Returned `topicsNotReady`, including `tempUnknownTopics`, to `makeReady`, without creating those topics, so that they wait for the next retry.
       2. Created an empty `tempUnknownTopics` set in each retry, since `topicsNotReady` now includes `tempUnknownTopics` (see the diagram below).
       3. Added 2 more tests, covering 3 test cases in total:
         3.1 shouldCreateTopicWhenTopicLeaderNotAvailableAndThenTopicNotFound
         3.2 shouldCompleteValidateWhenTopicLeaderNotAvailableAndThenDescribeSuccess
         3.3 shouldThrowExceptionWhenKeepsTopicLeaderNotAvailable
   
   Below is the sequence diagram for `makeReady`. I hope it makes clear what I did. Thanks.
   Sequence diagram: https://user-images.githubusercontent.com/43372967/85853165-a06ac600-b7e4-11ea-888e-ac98dc26cca0.png
   





[GitHub] [kafka] showuon commented on a change in pull request #8712: KAFKA-10006: Don't create internal topics when LeaderNotAvailableException

Posted by GitBox <gi...@apache.org>.
showuon commented on a change in pull request #8712:
URL: https://github.com/apache/kafka/pull/8712#discussion_r444739758



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java
##########
@@ -98,9 +98,10 @@ public InternalTopicManager(final Admin adminClient, final StreamsConfig streams
         int remainingRetries = retries;
         Set<String> topicsNotReady = new HashSet<>(topics.keySet());
         final Set<String> newlyCreatedTopics = new HashSet<>();
+        final HashSet<String> leaderNotAvailableTopics = new HashSet<>();

Review comment:
       Good suggestion. I changed it to `tempUnknownTopics`, because they are only temporarily unknown topics. What do you think?







[GitHub] [kafka] showuon commented on a change in pull request #8712: KAFKA-10006: Don't create internal topics when LeaderNotAvailableException

Posted by GitBox <gi...@apache.org>.
showuon commented on a change in pull request #8712:
URL: https://github.com/apache/kafka/pull/8712#discussion_r436446552



##########
File path: clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java
##########
@@ -330,6 +331,12 @@ synchronized public DescribeTopicsResult describeTopics(Collection<String> topic
                 future.completeExceptionally(new UnknownTopicOrPartitionException("Topic " + requestedTopic + " not found."));
                 topicDescriptions.put(requestedTopic, future);
             }
+            // try to simulate the leader not available situation when topic name is "LeaderNotAvailableTopic"

Review comment:
       Good suggestion. I made it a constant. Thanks.







[GitHub] [kafka] showuon commented on pull request #8712: KAFKA-10006: Don't create internal topics when LeaderNotAvailableException

Posted by GitBox <gi...@apache.org>.
showuon commented on pull request #8712:
URL: https://github.com/apache/kafka/pull/8712#issuecomment-635002983


   Hi @abbccdda @ableegoldman, please help review this PR. Thanks.





[GitHub] [kafka] abbccdda commented on a change in pull request #8712: KAFKA-10006: Don't create internal topics when LeaderNotAvailableException

Posted by GitBox <gi...@apache.org>.
abbccdda commented on a change in pull request #8712:
URL: https://github.com/apache/kafka/pull/8712#discussion_r436283473



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java
##########
@@ -100,11 +103,11 @@ public InternalTopicManager(final Admin adminClient, final StreamsConfig streams
         // have existed with the expected number of partitions, or some create topic returns fatal errors.
         log.debug("Starting to validate internal topics {} in partition assignor.", topics);
 
-        int remainingRetries = retries;
+        remainingRetries = retries;

Review comment:
       This is a personal preference, but I think we should not keep a temporary variable as part of the class fields. We could instead change the signatures of internal methods such as `validateTopics` to pass `remainingRetries` around.

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java
##########
@@ -195,20 +198,30 @@ public InternalTopicManager(final Admin adminClient, final StreamsConfig streams
             final String topicName = topicFuture.getKey();
             try {
                 final TopicDescription topicDescription = topicFuture.getValue().get();
-                existedTopicPartition.put(
-                    topicFuture.getKey(),
-                    topicDescription.partitions().size());
+                existedTopicPartition.put(topicName, topicDescription.partitions().size());
+                if (leaderNotAvailableTopics.contains(topicName)) {

Review comment:
       This contains check is unnecessary.

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java
##########
@@ -59,6 +59,9 @@ private InternalAdminClientConfig(final Map<?, ?> props) {
 
     private final int retries;
     private final long retryBackOffMs;
+    private int remainingRetries;
+
+    private HashSet<String> leaderNotAvailableTopics = new HashSet<>();

Review comment:
       Similar to `remainingRetries`, it doesn't make sense to keep a non-empty `leaderNotAvailableTopics` after each call to `makeReady`; I would prefer building it as a local variable. cc @ableegoldman

##########
File path: clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java
##########
@@ -330,6 +331,12 @@ synchronized public DescribeTopicsResult describeTopics(Collection<String> topic
                 future.completeExceptionally(new UnknownTopicOrPartitionException("Topic " + requestedTopic + " not found."));
                 topicDescriptions.put(requestedTopic, future);
             }
+            // try to simulate the leader not available situation when topic name is "LeaderNotAvailableTopic"

Review comment:
       This workaround is very hard for other developers to find. At a minimum, we should define a constant and make it part of the `MockAdminClient` class.
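       A minimal sketch of the suggestion, assuming a simplified fake admin client: `FakeAdmin`, its `describeTopics` returning `CompletableFuture<Integer>` partition counts, and `FakeLeaderNotAvailableException` are hypothetical stand-ins, not Kafka's `MockAdminClient` API. The only point is that the trigger name becomes a public, discoverable constant instead of a string literal buried in `describeTopics`.

   ```java
   import java.util.HashMap;
   import java.util.Map;
   import java.util.Set;
   import java.util.concurrent.CompletableFuture;

   // Hypothetical stand-in for org.apache.kafka.common.errors.LeaderNotAvailableException,
   // so this sketch compiles without Kafka on the classpath.
   class FakeLeaderNotAvailableException extends RuntimeException {
       FakeLeaderNotAvailableException(final String message) {
           super(message);
       }
   }

   class FakeAdmin {
       // Documented trigger name that tests can reference directly.
       public static final String LEADER_NOT_AVAILABLE_TOPIC = "LeaderNotAvailableTopic";

       // Returns one future per topic; the trigger topic fails, others report 1 partition.
       Map<String, CompletableFuture<Integer>> describeTopics(final Set<String> topics) {
           final Map<String, CompletableFuture<Integer>> result = new HashMap<>();
           for (final String topic : topics) {
               final CompletableFuture<Integer> future = new CompletableFuture<>();
               if (LEADER_NOT_AVAILABLE_TOPIC.equals(topic)) {
                   future.completeExceptionally(
                       new FakeLeaderNotAvailableException("Leader not available for " + topic));
               } else {
                   future.complete(1);
               }
               result.put(topic, future);
           }
           return result;
       }
   }
   ```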

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java
##########
@@ -59,6 +59,9 @@ private InternalAdminClientConfig(final Map<?, ?> props) {
 
     private final int retries;
     private final long retryBackOffMs;
+    private int remainingRetries;
+
+    private HashSet<String> leaderNotAvailableTopics = new HashSet<>();

Review comment:
       This should be declared final.

##########
File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopicManagerTest.java
##########
@@ -287,12 +290,49 @@ public void shouldLogWhenTopicNotFoundAndNotThrowException() {
 
             assertThat(
                 appender.getMessages(),
-                hasItem("stream-thread [" + threadName + "] Topic internal-topic is unknown or not found, hence not existed yet:" +
-                    " org.apache.kafka.common.errors.UnknownTopicOrPartitionException: Topic internal-topic not found.")
+                hasItem("stream-thread [" + threadName + "] Topic internal-topic is unknown or not found, hence not existed yet.\n" +
+                    "Error message was: org.apache.kafka.common.errors.UnknownTopicOrPartitionException: Topic internal-topic not found.")
             );
         }
     }
 
+    @Test
+    public void shouldLogWhenTopicLeaderNotAvailableAndThrowException() {
+        final String topicLeaderNotAvailable = "LeaderNotAvailableTopic";
+        mockAdminClient.addTopic(
+            false,
+            topicLeaderNotAvailable,
+            Collections.singletonList(new TopicPartitionInfo(0, broker1, cluster, Collections.emptyList())),
+            null);
+
+        final InternalTopicConfig internalTopicConfig = new RepartitionTopicConfig(topicLeaderNotAvailable, Collections.emptyMap());
+        internalTopicConfig.setNumberOfPartitions(1);
+
+        final Map<String, InternalTopicConfig> topicConfigMap = new HashMap<>();
+        topicConfigMap.put(topicLeaderNotAvailable, internalTopicConfig);
+
+        LogCaptureAppender.setClassLoggerToDebug(InternalTopicManager.class);
+        try (final LogCaptureAppender appender = LogCaptureAppender.createAndRegister(InternalTopicManager.class)) {
+            final StreamsException exception = assertThrows(
+                StreamsException.class,
+                () -> internalTopicManager.makeReady(topicConfigMap));
+
+            final String expectedMessage = "Could not create topics after 1 retries. This can happen if the Kafka cluster is temporary not available";

Review comment:
       Testing against the log message is error-prone and hard to maintain. I think just checking that the thrown exception has the expected type should be sufficient.
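       A minimal sketch of checking only the exception type, written in plain Java so it runs without JUnit. `ExceptionTypeCheck.expectThrows` is a hypothetical helper that mimics the shape of JUnit's `assertThrows`; in the test above, the existing `assertThrows(StreamsException.class, () -> internalTopicManager.makeReady(topicConfigMap))` call already does this, so the `expectedMessage` comparison could simply be dropped.

   ```java
   final class ExceptionTypeCheck {
       // Hypothetical stand-in for JUnit's assertThrows: it checks only the type
       // and returns the exception, so message wording can change freely.
       static <T extends Throwable> T expectThrows(final Class<T> type, final Runnable action) {
           try {
               action.run();
           } catch (final Throwable t) {
               if (type.isInstance(t)) {
                   return type.cast(t);
               }
               throw new AssertionError(
                   "Expected " + type.getName() + " but got " + t.getClass().getName(), t);
           }
           throw new AssertionError("Expected " + type.getName() + " but nothing was thrown");
       }
   }
   ```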







[GitHub] [kafka] showuon commented on a change in pull request #8712: KAFKA-10006: Don't create internal topics when LeaderNotAvailableException

Posted by GitBox <gi...@apache.org>.
showuon commented on a change in pull request #8712:
URL: https://github.com/apache/kafka/pull/8712#discussion_r448100930



##########
File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopicManagerTest.java
##########
@@ -287,12 +290,124 @@ public void shouldLogWhenTopicNotFoundAndNotThrowException() {
 
             assertThat(
                 appender.getMessages(),
-                hasItem("stream-thread [" + threadName + "] Topic internal-topic is unknown or not found, hence not existed yet:" +
-                    " org.apache.kafka.common.errors.UnknownTopicOrPartitionException: Topic internal-topic not found.")
+                hasItem("stream-thread [" + threadName + "] Topic internal-topic is unknown or not found, hence not existed yet.\n" +
+                    "Error message was: org.apache.kafka.common.errors.UnknownTopicOrPartitionException: Topic internal-topic not found.")
             );
         }
     }
 
+    @Test
+    public void shouldCreateTopicWhenTopicLeaderNotAvailableAndThenTopicNotFound() {
+        final AdminClient admin = EasyMock.createNiceMock(AdminClient.class);
+        final InternalTopicManager topicManager = new InternalTopicManager(admin, new StreamsConfig(config));
+
+        final KafkaFutureImpl<TopicDescription> topicDescriptionLeaderNotAvailableFuture = new KafkaFutureImpl<>();
+        topicDescriptionLeaderNotAvailableFuture.completeExceptionally(new LeaderNotAvailableException("Leader Not Available!"));
+        final KafkaFutureImpl<TopicDescription> topicDescriptionUnknownTopicFuture = new KafkaFutureImpl<>();
+        topicDescriptionUnknownTopicFuture.completeExceptionally(new UnknownTopicOrPartitionException("Unknown Topic!"));
+        final KafkaFutureImpl<CreateTopicsResult.TopicMetadataAndConfig> topicCreationFuture = new KafkaFutureImpl<>();
+        topicCreationFuture.complete(EasyMock.createNiceMock(CreateTopicsResult.TopicMetadataAndConfig.class));
+
+        EasyMock.expect(admin.describeTopics(Collections.singleton(leaderNotAvailableTopic)))
+            .andReturn(new MockDescribeTopicsResult(
+                Collections.singletonMap(leaderNotAvailableTopic, topicDescriptionLeaderNotAvailableFuture)))
+            .once();
+        // return empty set for 1st time
+        EasyMock.expect(admin.createTopics(Collections.emptySet()))
+            .andReturn(new MockCreateTopicsResult(Collections.emptyMap())).once();
+
+        EasyMock.expect(admin.describeTopics(Collections.singleton(leaderNotAvailableTopic)))
+            .andReturn(new MockDescribeTopicsResult(
+                Collections.singletonMap(leaderNotAvailableTopic, topicDescriptionUnknownTopicFuture)))
+            .once();
+        EasyMock.expect(admin.createTopics(Collections.singleton(
+                new NewTopic(leaderNotAvailableTopic, Optional.of(1), Optional.of((short) 1))
+            .configs(Utils.mkMap(Utils.mkEntry(TopicConfig.CLEANUP_POLICY_CONFIG, TopicConfig.CLEANUP_POLICY_DELETE),
+                Utils.mkEntry(TopicConfig.MESSAGE_TIMESTAMP_TYPE_CONFIG, "CreateTime"),
+                Utils.mkEntry(TopicConfig.SEGMENT_BYTES_CONFIG, "52428800"),
+                Utils.mkEntry(TopicConfig.RETENTION_MS_CONFIG, "-1"))))))
+            .andReturn(new MockCreateTopicsResult(Collections.singletonMap(leaderNotAvailableTopic, topicCreationFuture))).once();
+
+        EasyMock.replay(admin);
+
+        final InternalTopicConfig internalTopicConfig = new RepartitionTopicConfig(leaderNotAvailableTopic, Collections.emptyMap());
+        internalTopicConfig.setNumberOfPartitions(1);
+        final Map<String, InternalTopicConfig> topicConfigMap = new HashMap<>();
+        topicConfigMap.put(leaderNotAvailableTopic, internalTopicConfig);
+        topicManager.makeReady(topicConfigMap);
+
+        EasyMock.verify(admin);
+    }
+
+    @Test
+    public void shouldCompleteValidateWhenTopicLeaderNotAvailableAndThenDescribeSuccess() {
+        final AdminClient admin = EasyMock.createNiceMock(AdminClient.class);
+        final InternalTopicManager topicManager = new InternalTopicManager(admin, new StreamsConfig(config));
+        final TopicPartitionInfo partitionInfo = new TopicPartitionInfo(0, broker1,
+                Collections.singletonList(broker1), Collections.singletonList(broker1));
+
+        final KafkaFutureImpl<TopicDescription> topicDescriptionFailFuture = new KafkaFutureImpl<>();
+        topicDescriptionFailFuture.completeExceptionally(new LeaderNotAvailableException("Leader Not Available!"));
+        final KafkaFutureImpl<TopicDescription> topicDescriptionSuccessFuture = new KafkaFutureImpl<>();
+        topicDescriptionSuccessFuture.complete(
+            new TopicDescription(topic, false, Collections.singletonList(partitionInfo), Collections.emptySet())
+        );
+
+        EasyMock.expect(admin.describeTopics(Collections.singleton(leaderNotAvailableTopic)))
+            .andReturn(new MockDescribeTopicsResult(
+                Collections.singletonMap(leaderNotAvailableTopic, topicDescriptionFailFuture)))
+            .once();
+        EasyMock.expect(admin.createTopics(Collections.emptySet()))
+            .andReturn(new MockCreateTopicsResult(Collections.emptyMap())).once();
+
+        EasyMock.expect(admin.describeTopics(Collections.singleton(leaderNotAvailableTopic)))
+            .andReturn(new MockDescribeTopicsResult(
+                Collections.singletonMap(leaderNotAvailableTopic, topicDescriptionSuccessFuture)))
+            .once();
+
+        EasyMock.replay(admin);
+
+        final InternalTopicConfig internalTopicConfig = new RepartitionTopicConfig(leaderNotAvailableTopic, Collections.emptyMap());
+        internalTopicConfig.setNumberOfPartitions(1);
+        final Map<String, InternalTopicConfig> topicConfigMap = new HashMap<>();
+        topicConfigMap.put(leaderNotAvailableTopic, internalTopicConfig);
+        topicManager.makeReady(topicConfigMap);
+
+        EasyMock.verify(admin);
+    }
+
+    @Test
+    public void shouldThrowExceptionWhenKeepsTopicLeaderNotAvailable() {
+        final String leaderNotAvailableTopic = "LeaderNotAvailableTopic";
+        final AdminClient admin = EasyMock.createNiceMock(AdminClient.class);
+        final InternalTopicManager topicManager = new InternalTopicManager(admin, new StreamsConfig(config));
+
+        final KafkaFutureImpl<TopicDescription> topicDescriptionFailFuture = new KafkaFutureImpl<>();
+        topicDescriptionFailFuture.completeExceptionally(new LeaderNotAvailableException("Leader Not Available!"));
+
+        // simulate describeTopics got LeaderNotAvailableException
+        EasyMock.expect(admin.describeTopics(Collections.singleton(leaderNotAvailableTopic)))
+            .andReturn(new MockDescribeTopicsResult(
+                Collections.singletonMap(leaderNotAvailableTopic, topicDescriptionFailFuture)))
+            .times(2);

Review comment:
       Good suggestion!







[GitHub] [kafka] abbccdda commented on a change in pull request #8712: KAFKA-10006: Don't create internal topics when LeaderNotAvailableException

Posted by GitBox <gi...@apache.org>.
abbccdda commented on a change in pull request #8712:
URL: https://github.com/apache/kafka/pull/8712#discussion_r445277549



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java
##########
@@ -179,31 +180,43 @@ public InternalTopicManager(final Admin adminClient, final StreamsConfig streams
      * Topics that were not able to get its description will simply not be returned
      */
     // visible for testing
-    protected Map<String, Integer> getNumPartitions(final Set<String> topics) {
-        log.debug("Trying to check if topics {} have been created with expected number of partitions.", topics);
-
-        final DescribeTopicsResult describeTopicsResult = adminClient.describeTopics(topics);
+    protected Map<String, Integer> getNumPartitions(final Set<String> topics,
+                                                    final HashSet<String> tempUnknownTopics,
+                                                    final int remainingRetries) {

Review comment:
       We could just pass in a boolean here to indicate whether there are retries remaining.
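       A hypothetical sketch of the boolean-flag signature, assuming each describe failure has already been classified. `DescribeErrorHandling`, `Failure` and `handle` are invented names, and `IllegalStateException` stands in for `StreamsException`. The caller would pass `remainingRetries > 0`, so the helper never needs the counter itself.

   ```java
   import java.util.Set;

   final class DescribeErrorHandling {
       // Hypothetical labels for the two describe failures discussed in this PR.
       enum Failure { UNKNOWN_TOPIC, LEADER_NOT_AVAILABLE }

       // The helper only needs to know whether another attempt is allowed.
       static void handle(final String topic,
                          final Failure failure,
                          final boolean hasRemainingRetries,
                          final Set<String> topicsToCreate,
                          final Set<String> tempUnknownTopics) {
           if (failure == Failure.UNKNOWN_TOPIC) {
               topicsToCreate.add(topic); // genuinely missing: create it
               return;
           }
           // Guard clause: fail fast when no retries are left.
           if (!hasRemainingRetries) {
               throw new IllegalStateException(
                   "The leader of topic " + topic + " is not available and no retries are left");
           }
           tempUnknownTopics.add(topic); // describe again on the next attempt; do not create
       }
   }
   ```

   The early throw keeps the retry path un-nested, in the same spirit as the if-else reduction suggested further down in this review.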

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java
##########
@@ -98,9 +98,10 @@ public InternalTopicManager(final Admin adminClient, final StreamsConfig streams
         int remainingRetries = retries;
         Set<String> topicsNotReady = new HashSet<>(topics.keySet());
         final Set<String> newlyCreatedTopics = new HashSet<>();
+        final HashSet<String> tempUnknownTopics = new HashSet<>();

Review comment:
       s/HashSet/Set?

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java
##########
@@ -243,10 +259,18 @@ public InternalTopicManager(final Admin adminClient, final StreamsConfig streams
                     throw new StreamsException(errorMsg);
                 }
             } else {
-                topicsToCreate.add(topicName);
+                // for the tempUnknownTopics, we'll check again later if retries > 0

Review comment:
       This could be merged with the `else` above.

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java
##########
@@ -179,31 +180,43 @@ public InternalTopicManager(final Admin adminClient, final StreamsConfig streams
      * Topics that were not able to get its description will simply not be returned
      */
     // visible for testing
-    protected Map<String, Integer> getNumPartitions(final Set<String> topics) {
-        log.debug("Trying to check if topics {} have been created with expected number of partitions.", topics);
-
-        final DescribeTopicsResult describeTopicsResult = adminClient.describeTopics(topics);
+    protected Map<String, Integer> getNumPartitions(final Set<String> topics,
+                                                    final HashSet<String> tempUnknownTopics,
+                                                    final int remainingRetries) {
+        final Set<String> allTopicsToDescribe = new HashSet<>(topics);
+        allTopicsToDescribe.addAll(tempUnknownTopics);

Review comment:
       Why do we need `allTopicsToDescribe`? It seems to be used only once locally.

##########
File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopicManagerTest.java
##########
@@ -287,12 +291,41 @@ public void shouldLogWhenTopicNotFoundAndNotThrowException() {
 
             assertThat(
                 appender.getMessages(),
-                hasItem("stream-thread [" + threadName + "] Topic internal-topic is unknown or not found, hence not existed yet:" +
-                    " org.apache.kafka.common.errors.UnknownTopicOrPartitionException: Topic internal-topic not found.")
+                hasItem("stream-thread [" + threadName + "] Topic internal-topic is unknown or not found, hence not existed yet.\n" +
+                    "Error message was: org.apache.kafka.common.errors.UnknownTopicOrPartitionException: Topic internal-topic not found.")
             );
         }
     }
 
+    @Test
+    public void shouldLogWhenTopicLeaderNotAvailableAndThrowException() {
+        final String leaderNotAvailableTopic = "LeaderNotAvailableTopic";
+        final AdminClient admin = EasyMock.createNiceMock(AdminClient.class);
+        final InternalTopicManager topicManager = new InternalTopicManager(admin, new StreamsConfig(config));
+
+        final KafkaFutureImpl<TopicDescription> topicDescriptionFailFuture = new KafkaFutureImpl<>();
+        topicDescriptionFailFuture.completeExceptionally(new LeaderNotAvailableException("Leader Not Available!"));
+
+        // simulate describeTopics got LeaderNotAvailableException
+        EasyMock.expect(admin.describeTopics(Collections.singleton(leaderNotAvailableTopic)))
+                .andReturn(new MockDescribeTopicsResult(

Review comment:
       Use 4-space indentation to align with the other tests.

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java
##########
@@ -179,31 +180,43 @@ public InternalTopicManager(final Admin adminClient, final StreamsConfig streams
      * Topics that were not able to get its description will simply not be returned
      */
     // visible for testing
-    protected Map<String, Integer> getNumPartitions(final Set<String> topics) {
-        log.debug("Trying to check if topics {} have been created with expected number of partitions.", topics);
-
-        final DescribeTopicsResult describeTopicsResult = adminClient.describeTopics(topics);
+    protected Map<String, Integer> getNumPartitions(final Set<String> topics,
+                                                    final HashSet<String> tempUnknownTopics,
+                                                    final int remainingRetries) {
+        final Set<String> allTopicsToDescribe = new HashSet<>(topics);
+        allTopicsToDescribe.addAll(tempUnknownTopics);
+        log.debug("Trying to check if topics {} have been created with expected number of partitions.", allTopicsToDescribe);
+
+        final DescribeTopicsResult describeTopicsResult = adminClient.describeTopics(allTopicsToDescribe);
         final Map<String, KafkaFuture<TopicDescription>> futures = describeTopicsResult.values();
 
         final Map<String, Integer> existedTopicPartition = new HashMap<>();
         for (final Map.Entry<String, KafkaFuture<TopicDescription>> topicFuture : futures.entrySet()) {
             final String topicName = topicFuture.getKey();
             try {
                 final TopicDescription topicDescription = topicFuture.getValue().get();
-                existedTopicPartition.put(
-                    topicFuture.getKey(),
-                    topicDescription.partitions().size());
+                existedTopicPartition.put(topicName, topicDescription.partitions().size());
+                tempUnknownTopics.remove(topicName);
             } catch (final InterruptedException fatalException) {
                 // this should not happen; if it ever happens it indicate a bug
                 Thread.currentThread().interrupt();
                 log.error(INTERRUPTED_ERROR_MESSAGE, fatalException);
                 throw new IllegalStateException(INTERRUPTED_ERROR_MESSAGE, fatalException);
             } catch (final ExecutionException couldNotDescribeTopicException) {
                 final Throwable cause = couldNotDescribeTopicException.getCause();
-                if (cause instanceof UnknownTopicOrPartitionException ||
-                    cause instanceof LeaderNotAvailableException) {
-                    // This topic didn't exist or leader is not known yet, proceed to try to create it
-                    log.debug("Topic {} is unknown or not found, hence not existed yet: {}", topicName, cause.toString());
+                if (cause instanceof UnknownTopicOrPartitionException) {
+                    // This topic didn't exist, proceed to try to create it
+                    log.debug("Topic {} is unknown or not found, hence not existed yet.\n" +
+                        "Error message was: {}", topicName, cause.toString());
+                } else if (cause instanceof LeaderNotAvailableException) {
+                    tempUnknownTopics.add(topicName);
+                    if (remainingRetries > 0) {

Review comment:
       We could reduce the if-else block to:
   ```
   if (remainingRetries <= 0) {
       // ran out of retries, throw the exception directly
       throw new StreamsException(
           String.format("The leader of the Topic %s is not available after %d retries.", topicName, retries), cause);
   }
   log.debug("The leader of the Topic {} is not available, with {} retries left.\n" +
       "Error message was: {}", topicName, remainingRetries, cause.toString());
   ```

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java
##########
@@ -218,13 +231,16 @@ public InternalTopicManager(final Admin adminClient, final StreamsConfig streams
     /**
      * Check the existing topics to have correct number of partitions; and return the remaining topics that needs to be created
      */
-    private Set<String> validateTopics(final Set<String> topicsToValidate, final Map<String, InternalTopicConfig> topicsMap) {
+    private Set<String> validateTopics(final Set<String> topicsToValidate,
+                                       final Map<String, InternalTopicConfig> topicsMap,
+                                       final HashSet<String> tempUnknownTopics,

Review comment:
       Similarly here, we could reduce this to `Set`.

##########
File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopicManagerTest.java
##########
@@ -108,7 +111,8 @@ public void shouldReturnCorrectPartitionCounts() {
             topic,
             Collections.singletonList(new TopicPartitionInfo(0, broker1, singleReplica, Collections.emptyList())),
             null);
-        assertEquals(Collections.singletonMap(topic, 1), internalTopicManager.getNumPartitions(Collections.singleton(topic)));
+        assertEquals(Collections.singletonMap(topic, 1),
+                internalTopicManager.getNumPartitions(Collections.singleton(topic), new HashSet<String>(), 1));

Review comment:
       We could use `Collections.emptySet()` if this is reduced to `Set`.
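       A small sketch of why the `Set` parameter type matters here; `SetParameterSketch` and its two methods are hypothetical. `Collections.emptySet()` is immutable: `remove` of an absent element simply returns `false`, but `add` throws `UnsupportedOperationException`. So passing `Collections.emptySet()` should be fine for a topic that describes successfully (the `tempUnknownTopics.remove(topicName)` path), but it would fail on the path in the `getNumPartitions` diff quoted above that calls `tempUnknownTopics.add(topicName)` for a `LeaderNotAvailableException`.

   ```java
   import java.util.Set;

   final class SetParameterSketch {
       // Declaring the parameter as Set rather than HashSet lets callers pass
       // any Set implementation, including Collections.emptySet().
       static int removeDescribed(final Set<String> tempUnknownTopics, final String describedTopic) {
           // For HashSet and Collections.emptySet(), this returns false when absent.
           tempUnknownTopics.remove(describedTopic);
           return tempUnknownTopics.size();
       }

       // Adding requires a mutable Set; Collections.emptySet() rejects it.
       static void markLeaderNotAvailable(final Set<String> tempUnknownTopics, final String topic) {
           tempUnknownTopics.add(topic);
       }
   }
   ```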



