Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2021/04/14 21:17:12 UTC

[GitHub] [kafka] ableegoldman commented on a change in pull request #10532: KAFKA-8531: Change default replication factor config

ableegoldman commented on a change in pull request #10532:
URL: https://github.com/apache/kafka/pull/10532#discussion_r613592763



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java
##########
@@ -439,13 +440,34 @@ private String getBrokerSideConfigValue(final Config brokerSideTopicConfig,
                         final Throwable cause = executionException.getCause();
                         if (cause instanceof TopicExistsException) {
                             // This topic didn't exist earlier or its leader not known before; just retain it for next round of validation.
-                            log.info("Could not create topic {}. Topic is probably marked for deletion (number of partitions is unknown).\n" +
-                                "Will retry to create this topic in {} ms (to let broker finish async delete operation first).\n" +
-                                "Error message was: {}", topicName, retryBackOffMs, cause.toString());
+                            log.info(
+                                "Could not create topic {}. Topic is probably marked for deletion (number of partitions is unknown).\n"
+                                    +
+                                    "Will retry to create this topic in {} ms (to let broker finish async delete operation first).\n"
+                                    +
+                                    "Error message was: {}", topicName, retryBackOffMs,
+                                cause.toString());
                         } else {
                             log.error("Unexpected error during topic creation for {}.\n" +
                                 "Error message was: {}", topicName, cause.toString());
-                            throw new StreamsException(String.format("Could not create topic %s.", topicName), cause);
+
+                            if (cause instanceof UnsupportedVersionException) {
+                                final String errorMessage = cause.getMessage();
+                                if (errorMessage != null &&
+                                    errorMessage.startsWith("Creating topics with default partitions/replication factor are only supported in CreateTopicRequest version 4+")) {
+
+                                    throw new StreamsException(String.format(

Review comment:
       nit: can we also log an error before throwing, possibly including the actual `cause` message?
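
A minimal, self-contained sketch of the shape suggested here, not the actual patch. Assumptions: the two nested exception classes are simplified stand-ins for Kafka's `UnsupportedVersionException` and `StreamsException`, `java.util.logging` replaces the slf4j `log` used in `InternalTopicManager`, and the helper name `handleCreateTopicFailure`, the constant name, and the wording of the new message are all hypothetical. Only the prefix string is copied from the diff.

```java
import java.util.logging.Level;
import java.util.logging.Logger;

class LogBeforeThrowSketch {

    // Simplified stand-in for org.apache.kafka.common.errors.UnsupportedVersionException.
    static class UnsupportedVersionException extends RuntimeException {
        UnsupportedVersionException(final String message) {
            super(message);
        }
    }

    // Simplified stand-in for org.apache.kafka.streams.errors.StreamsException.
    static class StreamsException extends RuntimeException {
        StreamsException(final String message, final Throwable cause) {
            super(message, cause);
        }
    }

    // java.util.logging is used only to keep the sketch dependency-free.
    private static final Logger LOG = Logger.getLogger(LogBeforeThrowSketch.class.getName());

    // Prefix copied from the diff; the constant name is hypothetical.
    static final String DEFAULTS_UNSUPPORTED_PREFIX =
        "Creating topics with default partitions/replication factor are only supported in CreateTopicRequest version 4+";

    // Hypothetical helper: always throws, but logs the broker's message first
    // when the failure is the "defaults not supported" case.
    static void handleCreateTopicFailure(final String topicName, final Throwable cause) {
        if (cause instanceof UnsupportedVersionException) {
            final String errorMessage = cause.getMessage();
            if (errorMessage != null && errorMessage.startsWith(DEFAULTS_UNSUPPORTED_PREFIX)) {
                // Hypothetical wording; the point is that the log line and the
                // exception both carry the original cause message.
                final String message = String.format(
                    "Could not create topic %s because the broker does not support default "
                        + "partitions/replication factor. Error message was: %s",
                    topicName, errorMessage);
                LOG.log(Level.SEVERE, message);
                throw new StreamsException(message, cause);
            }
        }
        // Fallback mirrors the line removed in the diff.
        throw new StreamsException(String.format("Could not create topic %s.", topicName), cause);
    }

    public static void main(final String[] args) {
        try {
            handleCreateTopicFailure(
                "my-topic", new UnsupportedVersionException(DEFAULTS_UNSUPPORTED_PREFIX));
        } catch (final StreamsException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

Building the message once and reusing it for both the log call and the exception keeps the two from drifting apart, and passing `cause` to the exception preserves the original stack trace.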

##########
File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopicManagerTest.java
##########
@@ -16,9 +16,11 @@
  */
 package org.apache.kafka.streams.processor.internals;
 
+import java.util.Collection;

Review comment:
       nit: this import is in the wrong place