You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2021/03/17 07:57:51 UTC

[GitHub] [kafka] rodesai commented on a change in pull request #10317: KAFKA-10357: Add setup method to internal topics

rodesai commented on a change in pull request #10317:
URL: https://github.com/apache/kafka/pull/10317#discussion_r595778710



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java
##########
@@ -580,4 +562,118 @@ private String getBrokerSideConfigValue(final Config brokerSideTopicConfig,
 
         return topicsToCreate;
     }
+
+    /**
+     * Sets up internal topics.
+     *
+     * Either the given topic are all created or the method fails with an exception.
+     *
+     * @param topicConfigs internal topics to setup
+     */
+    public void setup(final Map<String, InternalTopicConfig> topicConfigs) {
+        log.info("Starting to setup internal topics {}.", topicConfigs.keySet());
+
+        final long now = time.milliseconds();
+        final long deadline = now + retryTimeoutMs;
+
+        final Map<String, Map<String, String>> newTopicConfigs = topicConfigs.values().stream()
+            .collect(Collectors.toMap(
+                InternalTopicConfig::name,
+                topicConfig -> topicConfig.getProperties(defaultTopicConfigs, windowChangeLogAdditionalRetention)
+            ));
+        final Set<String> topicStillToCreate = new HashSet<>(topicConfigs.keySet());
+        while (!topicStillToCreate.isEmpty()) {
+            final Set<NewTopic> newTopics = topicStillToCreate.stream()
+                .map(topicName -> new NewTopic(
+                        topicName,
+                        topicConfigs.get(topicName).numberOfPartitions(),
+                        Optional.of(replicationFactor)
+                    ).configs(newTopicConfigs.get(topicName))
+                ).collect(Collectors.toSet());
+
+            log.info("Going to create internal topics: " + newTopics);
+            final CreateTopicsResult createTopicsResult = adminClient.createTopics(newTopics);
+
+            final Map<String, KafkaFuture<Void>> createResultForTopic = createTopicsResult.values();
+            while (!createResultForTopic.isEmpty()) {
+                for (final InternalTopicConfig topicConfig : topicConfigs.values()) {
+                    final String topicName = topicConfig.name();
+                    if (!createResultForTopic.containsKey(topicName)) {
+                        throw new IllegalStateException("Create topic results do not contain internal topic " + topicName
+                            + " to setup. " + BUG_ERROR_MESSAGE);
+                    }
+                    final KafkaFuture<Void> createResult = createResultForTopic.get(topicName);
+                    if (createResult.isDone()) {
+                        try {
+                            createResult.get();
+                            topicStillToCreate.remove(topicName);
+                        } catch (final ExecutionException executionException) {
+                            final Throwable cause = executionException.getCause();
+                            if (cause instanceof TopicExistsException) {
+                                log.info("Internal topic {} already exists. Topic is probably marked for deletion. " +
+                                    "Will retry to create this topic later (to let broker complete async delete operation first)",
+                                    topicName);
+                            } else if (cause instanceof TimeoutException) {
+                                log.info("Creating internal topic {} timed out.", topicName);
+                            } else {
+                                log.error("Unexpected error during creation of internal topic: ", cause);
+                                throw new StreamsException(
+                                    String.format("Could not create internal topic %s for the following reason: ", topicName),
+                                    cause
+                                );
+                            }
+                        } catch (final InterruptedException interruptedException) {
+                            throw new InterruptException(interruptedException);
+                        } finally {
+                            createResultForTopic.remove(topicName);
+                        }
+                    }
+                }
+
+                maybeThrowTimeoutException(
+                    Collections.singletonList(topicStillToCreate),
+                    deadline,
+                    String.format("Could not create internal topics within %d milliseconds. This can happen if the " +

Review comment:
       In this case I think we should include some error details here. In particular, the last seen error for each topic. I'm worried about cases where we try to create but the create times out but is eventually successful. We'd return an error back, but the user would have no way to know that setup failed because an internal topic already exists.

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java
##########
@@ -580,4 +562,118 @@ private String getBrokerSideConfigValue(final Config brokerSideTopicConfig,
 
         return topicsToCreate;
     }
+
+    /**
+     * Sets up internal topics.
+     *
+     * Either the given topic are all created or the method fails with an exception.
+     *
+     * @param topicConfigs internal topics to setup
+     */
+    public void setup(final Map<String, InternalTopicConfig> topicConfigs) {
+        log.info("Starting to setup internal topics {}.", topicConfigs.keySet());
+
+        final long now = time.milliseconds();
+        final long deadline = now + retryTimeoutMs;
+
+        final Map<String, Map<String, String>> newTopicConfigs = topicConfigs.values().stream()

Review comment:
       naming nit: topicConfigsWithRetention

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java
##########
@@ -580,4 +562,118 @@ private String getBrokerSideConfigValue(final Config brokerSideTopicConfig,
 
         return topicsToCreate;
     }
+
+    /**
+     * Sets up internal topics.
+     *
+     * Either the given topic are all created or the method fails with an exception.
+     *
+     * @param topicConfigs internal topics to setup
+     */
+    public void setup(final Map<String, InternalTopicConfig> topicConfigs) {

Review comment:
       should we make an effort to clean up created topics on failure? currently this method is not retriable




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org