Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2020/06/06 17:03:14 UTC

[GitHub] [kafka] abbccdda commented on a change in pull request #8712: KAFKA-10006: Don't create internal topics when LeaderNotAvailableException

abbccdda commented on a change in pull request #8712:
URL: https://github.com/apache/kafka/pull/8712#discussion_r436283473



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java
##########
@@ -100,11 +103,11 @@ public InternalTopicManager(final Admin adminClient, final StreamsConfig streams
         // have existed with the expected number of partitions, or some create topic returns fatal errors.
         log.debug("Starting to validate internal topics {} in partition assignor.", topics);
 
-        int remainingRetries = retries;
+        remainingRetries = retries;

Review comment:
       This is a personal preference, but I think we should not keep a temporary variable as part of the class state. We could instead change the internal function signatures (such as `validateTopics`) to pass `remainingRetries` around.

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java
##########
@@ -195,20 +198,30 @@ public InternalTopicManager(final Admin adminClient, final StreamsConfig streams
             final String topicName = topicFuture.getKey();
             try {
                 final TopicDescription topicDescription = topicFuture.getValue().get();
-                existedTopicPartition.put(
-                    topicFuture.getKey(),
-                    topicDescription.partitions().size());
+                existedTopicPartition.put(topicName, topicDescription.partitions().size());
+                if (leaderNotAvailableTopics.contains(topicName)) {

Review comment:
       This `contains` check is unnecessary.

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java
##########
@@ -59,6 +59,9 @@ private InternalAdminClientConfig(final Map<?, ?> props) {
 
     private final int retries;
     private final long retryBackOffMs;
+    private int remainingRetries;
+
+    private HashSet<String> leaderNotAvailableTopics = new HashSet<>();

Review comment:
       As with `remainingRetries`, it doesn't make sense for `leaderNotAvailableTopics` to remain non-empty after each call to `makeReady`. I would prefer building it as a local variable. cc @ableegoldman
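
A minimal sketch of what this suggestion could look like, assuming a heavily simplified stand-in for `InternalTopicManager`. The class name `TopicManagerSketch`, the `failuresLeft` map, and the return values are hypothetical and exist only to make the example runnable; the real class talks to an `Admin` client. The point is that both the retry counter and the set of leaderless topics live inside a single `makeReady` call and are passed to or returned from the helper, so nothing carries over between calls.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical, simplified stand-in for InternalTopicManager; not the real Kafka class.
final class TopicManagerSketch {
    private final int retries;
    // Assumption for the demo: how many describe attempts still fail per topic
    // with "leader not available" before the topic becomes ready.
    private final Map<String, Integer> failuresLeft;

    TopicManagerSketch(final int retries, final Map<String, Integer> failuresBeforeReady) {
        this.retries = retries;
        this.failuresLeft = new HashMap<>(failuresBeforeReady);
    }

    // Returns the number of validation rounds used; throws once retries are exhausted.
    int makeReady(final Set<String> topics) {
        int remainingRetries = retries;              // local, so every call starts fresh
        Set<String> pending = new HashSet<>(topics);
        int rounds = 0;
        while (!pending.isEmpty()) {
            rounds++;
            // The helper receives the counter and returns the topics still lacking a leader.
            pending = validateTopics(pending, remainingRetries);
            remainingRetries--;
        }
        return rounds;
    }

    private Set<String> validateTopics(final Set<String> topics, final int remainingRetries) {
        final Set<String> leaderNotAvailableTopics = new HashSet<>();  // local per round
        for (final String topic : topics) {
            final int left = failuresLeft.getOrDefault(topic, 0);
            if (left > 0) {
                failuresLeft.put(topic, left - 1);
                leaderNotAvailableTopics.add(topic);
            }
        }
        if (!leaderNotAvailableTopics.isEmpty() && remainingRetries <= 0) {
            throw new IllegalStateException(
                "Topics not ready after " + retries + " retries: " + leaderNotAvailableTopics);
        }
        return leaderNotAvailableTopics;
    }
}
```

With this shape, two consecutive `makeReady` calls on the same instance each get the full retry budget, which is the property a field-based counter makes harder to guarantee.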

##########
File path: clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java
##########
@@ -330,6 +331,12 @@ synchronized public DescribeTopicsResult describeTopics(Collection<String> topic
                 future.completeExceptionally(new UnknownTopicOrPartitionException("Topic " + requestedTopic + " not found."));
                 topicDescriptions.put(requestedTopic, future);
             }
+            // try to simulate the leader not available situation when topic name is "LeaderNotAvailableTopic"

Review comment:
       This workaround is very hard for other developers to discover. At a minimum, we should define a constant and make it part of the `MockAdminClient` class.
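
One way this could look, as a dependency-free sketch rather than the actual `MockAdminClient` change. The class `MockAdminSketch`, the constant name `LEADER_NOT_AVAILABLE_TOPIC`, the method `describePartitionCount`, and the nested `LeaderNotAvailable` exception are all hypothetical stand-ins; only the topic name string comes from the diff above.

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical, trimmed-down mock; the real MockAdminClient has a different API.
final class MockAdminSketch {
    // Named constant so tests reference the special topic name explicitly
    // instead of relying on a string literal hidden inside the mock.
    static final String LEADER_NOT_AVAILABLE_TOPIC = "LeaderNotAvailableTopic";

    // Stand-in for Kafka's LeaderNotAvailableException, to keep the sketch self-contained.
    static final class LeaderNotAvailable extends RuntimeException {
        LeaderNotAvailable(final String message) {
            super(message);
        }
    }

    // Returns a future that fails for the special topic and succeeds otherwise.
    CompletableFuture<Integer> describePartitionCount(final String topic) {
        final CompletableFuture<Integer> future = new CompletableFuture<>();
        if (LEADER_NOT_AVAILABLE_TOPIC.equals(topic)) {
            future.completeExceptionally(
                new LeaderNotAvailable("The leader of topic " + topic + " is not available."));
        } else {
            future.complete(1);
        }
        return future;
    }
}
```

A test would then create its topic with `MockAdminSketch.LEADER_NOT_AVAILABLE_TOPIC`, which makes the simulated failure visible at the call site.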

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java
##########
@@ -59,6 +59,9 @@ private InternalAdminClientConfig(final Map<?, ?> props) {
 
     private final int retries;
     private final long retryBackOffMs;
+    private int remainingRetries;
+
+    private HashSet<String> leaderNotAvailableTopics = new HashSet<>();

Review comment:
       This should be declared final.

##########
File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopicManagerTest.java
##########
@@ -287,12 +290,49 @@ public void shouldLogWhenTopicNotFoundAndNotThrowException() {
 
             assertThat(
                 appender.getMessages(),
-                hasItem("stream-thread [" + threadName + "] Topic internal-topic is unknown or not found, hence not existed yet:" +
-                    " org.apache.kafka.common.errors.UnknownTopicOrPartitionException: Topic internal-topic not found.")
+                hasItem("stream-thread [" + threadName + "] Topic internal-topic is unknown or not found, hence not existed yet.\n" +
+                    "Error message was: org.apache.kafka.common.errors.UnknownTopicOrPartitionException: Topic internal-topic not found.")
             );
         }
     }
 
+    @Test
+    public void shouldLogWhenTopicLeaderNotAvailableAndThrowException() {
+        final String topicLeaderNotAvailable = "LeaderNotAvailableTopic";
+        mockAdminClient.addTopic(
+            false,
+            topicLeaderNotAvailable,
+            Collections.singletonList(new TopicPartitionInfo(0, broker1, cluster, Collections.emptyList())),
+            null);
+
+        final InternalTopicConfig internalTopicConfig = new RepartitionTopicConfig(topicLeaderNotAvailable, Collections.emptyMap());
+        internalTopicConfig.setNumberOfPartitions(1);
+
+        final Map<String, InternalTopicConfig> topicConfigMap = new HashMap<>();
+        topicConfigMap.put(topicLeaderNotAvailable, internalTopicConfig);
+
+        LogCaptureAppender.setClassLoggerToDebug(InternalTopicManager.class);
+        try (final LogCaptureAppender appender = LogCaptureAppender.createAndRegister(InternalTopicManager.class)) {
+            final StreamsException exception = assertThrows(
+                StreamsException.class,
+                () -> internalTopicManager.makeReady(topicConfigMap));
+
+            final String expectedMessage = "Could not create topics after 1 retries. This can happen if the Kafka cluster is temporary not available";

Review comment:
       Testing against the log message is error-prone and hard to maintain. I think just making sure the thrown exception has the expected type should be sufficient.
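
To illustrate the idea without a test framework, here is a small sketch that checks only the exception type. The helper `expectThrows` is a hypothetical hand-written analogue of JUnit's `assertThrows`, and `StreamsLikeException` and `makeReadyThatFails` are stand-ins for `StreamsException` and the failing `makeReady` call. In the real test, the existing `assertThrows(StreamsException.class, ...)` call would play this role on its own.

```java
// Hypothetical sketch; names here are illustrative and not part of Kafka or JUnit.
final class ExceptionTypeCheckSketch {
    // Stand-in for StreamsException.
    static final class StreamsLikeException extends RuntimeException {
        StreamsLikeException(final String message) {
            super(message);
        }
    }

    // Runs the action and returns the thrown exception if it has the expected type;
    // fails with an AssertionError otherwise.
    static <T extends Throwable> T expectThrows(final Class<T> expectedType, final Runnable action) {
        try {
            action.run();
        } catch (final Throwable thrown) {
            if (expectedType.isInstance(thrown)) {
                return expectedType.cast(thrown);
            }
            throw new AssertionError("Unexpected exception type: " + thrown.getClass().getName(), thrown);
        }
        throw new AssertionError("Expected " + expectedType.getName() + " but nothing was thrown");
    }

    // Simulates a failing makeReady call; the message wording is free to change
    // without breaking a test that checks only the type.
    static void makeReadyThatFails() {
        throw new StreamsLikeException("Could not create topics (wording may change)");
    }
}
```

A check such as `expectThrows(StreamsLikeException.class, ExceptionTypeCheckSketch::makeReadyThatFails)` keeps passing when the message text is reworded, which is the maintenance benefit described above.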



