You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2021/03/04 20:52:20 UTC

[GitHub] [kafka] cadonna opened a new pull request #10266: KAFKA-10357: Add validation method for internal topics

cadonna opened a new pull request #10266:
URL: https://github.com/apache/kafka/pull/10266


   For KIP-698, we need a way to validate internal topics before
   we create them. This PR adds a validation method to the
   InternalTopicManager for that purpose.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] guozhangwang merged pull request #10266: KAFKA-10357: Add validation method for internal topics

Posted by GitBox <gi...@apache.org>.
guozhangwang merged pull request #10266:
URL: https://github.com/apache/kafka/pull/10266


   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] cadonna commented on a change in pull request #10266: KAFKA-10357: Add validation method for internal topics

Posted by GitBox <gi...@apache.org>.
cadonna commented on a change in pull request #10266:
URL: https://github.com/apache/kafka/pull/10266#discussion_r591240160



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java
##########
@@ -91,6 +104,225 @@ public InternalTopicManager(final Time time,
         }
     }
 
+    static class ValidationResult {
+        public Set<String> missingTopics = new HashSet<>();
+        public Map<String, List<String>> misconfigurationsForTopics = new HashMap<>();
+    }
+
+    public ValidationResult validate(final Map<String, InternalTopicConfig> topicConfigs) {
+        log.debug("Starting to validate internal topics {}.", topicConfigs.keySet());

Review comment:
       Makes sense! Will do!




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] cadonna commented on pull request #10266: KAFKA-10357: Add validation method for internal topics

Posted by GitBox <gi...@apache.org>.
cadonna commented on pull request #10266:
URL: https://github.com/apache/kafka/pull/10266#issuecomment-790933530


   Call for review: @mjsax @rodesai @ableegoldman @guozhangwang 


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] cadonna commented on pull request #10266: KAFKA-10357: Add validation method for internal topics

Posted by GitBox <gi...@apache.org>.
cadonna commented on pull request #10266:
URL: https://github.com/apache/kafka/pull/10266#issuecomment-795163588


   @rodesai I included your feedback. Could you have another look, please?


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] cadonna commented on a change in pull request #10266: KAFKA-10357: Add validation method for internal topics

Posted by GitBox <gi...@apache.org>.
cadonna commented on a change in pull request #10266:
URL: https://github.com/apache/kafka/pull/10266#discussion_r591259233



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java
##########
@@ -91,6 +104,225 @@ public InternalTopicManager(final Time time,
         }
     }
 
+    static class ValidationResult {
+        public Set<String> missingTopics = new HashSet<>();
+        public Map<String, List<String>> misconfigurationsForTopics = new HashMap<>();
+    }
+
+    public ValidationResult validate(final Map<String, InternalTopicConfig> topicConfigs) {

Review comment:
       Will do!




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] cadonna commented on a change in pull request #10266: KAFKA-10357: Add validation method for internal topics

Posted by GitBox <gi...@apache.org>.
cadonna commented on a change in pull request #10266:
URL: https://github.com/apache/kafka/pull/10266#discussion_r591241911



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java
##########
@@ -91,6 +104,225 @@ public InternalTopicManager(final Time time,
         }
     }
 
+    static class ValidationResult {
+        public Set<String> missingTopics = new HashSet<>();
+        public Map<String, List<String>> misconfigurationsForTopics = new HashMap<>();
+    }
+
+    public ValidationResult validate(final Map<String, InternalTopicConfig> topicConfigs) {
+        log.debug("Starting to validate internal topics {}.", topicConfigs.keySet());
+
+        final long now = time.milliseconds();
+        final long deadline = now + retryTimeoutMs;
+
+        final ValidationResult validationResult = new ValidationResult();
+        final Set<String> topicDescriptionsStillToValidate = new HashSet<>(topicConfigs.keySet());
+        final Set<String> topicConfigsStillToValidate = new HashSet<>(topicConfigs.keySet());
+        while (!topicDescriptionsStillToValidate.isEmpty() || !topicConfigsStillToValidate.isEmpty()) {
+            Map<String, KafkaFuture<TopicDescription>> descriptionsForTopic = Collections.emptyMap();
+            if (!topicDescriptionsStillToValidate.isEmpty()) {
+                final DescribeTopicsResult describeTopicsResult = adminClient.describeTopics(topicDescriptionsStillToValidate);
+                descriptionsForTopic = describeTopicsResult.values();
+            }
+            Map<String, KafkaFuture<Config>> configsForTopic = Collections.emptyMap();
+            if (!topicConfigsStillToValidate.isEmpty()) {
+                final DescribeConfigsResult describeConfigsResult = adminClient.describeConfigs(
+                    topicConfigsStillToValidate.stream()
+                        .map(topic -> new ConfigResource(Type.TOPIC, topic))
+                        .collect(Collectors.toSet())
+                );
+                configsForTopic = describeConfigsResult.values().entrySet().stream()
+                    .collect(Collectors.toMap(entry -> entry.getKey().name(), Map.Entry::getValue));
+            }
+
+            while (!descriptionsForTopic.isEmpty() || !configsForTopic.isEmpty()) {
+                if (!descriptionsForTopic.isEmpty()) {
+                    doValidateTopic(
+                        validationResult,
+                        descriptionsForTopic,
+                        topicConfigs,
+                        topicDescriptionsStillToValidate,
+                        (brokerSide, streamsSide) -> validatePartitionCount(validationResult, brokerSide, streamsSide)
+                    );
+                }
+                if (!configsForTopic.isEmpty()) {
+                    doValidateTopic(
+                        validationResult,
+                        configsForTopic,
+                        topicConfigs,
+                        topicConfigsStillToValidate,
+                        (brokerSide, streamsSide) -> validateCleanupPolicy(validationResult, brokerSide, streamsSide)
+                    );
+                }
+            }
+
+            maybeThrowTimeoutException(topicDescriptionsStillToValidate, deadline);
+        }
+
+        log.debug("Completed validation of internal topics {}.", topicConfigs.keySet());
+        return validationResult;
+    }
+
+    private <V> void doValidateTopic(final ValidationResult validationResult,
+                                     final Map<String, KafkaFuture<V>> futuresForTopic,
+                                     final Map<String, InternalTopicConfig> topicsConfigs,
+                                     final Set<String> topicsStillToValidate,
+                                     final BiConsumer<InternalTopicConfig, V> validator) {
+        for (final InternalTopicConfig topicConfig : topicsConfigs.values()) {
+            final String topicName = topicConfig.name();
+            if (!futuresForTopic.containsKey(topicName)) {
+                throw new IllegalStateException("Described configs do not contain topics to validate. " + BUG_ERROR_MESSAGE);
+            }
+            final KafkaFuture<V> future = futuresForTopic.get(topicName);
+            if (future.isDone()) {
+                try {
+                    final V brokerSideTopicConfig = future.get();
+                    validator.accept(topicConfig, brokerSideTopicConfig);
+                    topicsStillToValidate.remove(topicName);
+                } catch (final ExecutionException executionException) {
+                    final Throwable cause = executionException.getCause();
+                    if (cause instanceof UnknownTopicOrPartitionException) {
+                        log.debug("Internal topic {} is missing",

Review comment:
       Will do!




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] cadonna commented on a change in pull request #10266: KAFKA-10357: Add validation method for internal topics

Posted by GitBox <gi...@apache.org>.
cadonna commented on a change in pull request #10266:
URL: https://github.com/apache/kafka/pull/10266#discussion_r591236863



##########
File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopicManagerTest.java
##########
@@ -451,6 +465,550 @@ public void shouldExhaustRetriesOnMarkedForDeletionTopic() {
         );
     }
 
+    @Test
+    public void shouldValidateSuccessfully() {
+        mockAdminClient.addTopic(
+            false,
+            topic1,
+            Collections.singletonList(new TopicPartitionInfo(0, broker1, cluster, Collections.emptyList())),
+            null
+        );
+        final InternalTopicConfig internalTopicConfig = new RepartitionTopicConfig(topic1, Collections.emptyMap());
+        internalTopicConfig.setNumberOfPartitions(1);
+
+        final ValidationResult validationResult = internalTopicManager.validate(Collections.singletonMap(topic1, internalTopicConfig));
+
+        assertThat(validationResult.missingTopics, empty());
+        assertThat(validationResult.misconfigurationsForTopics, anEmptyMap());
+    }
+
+    @Test
+    public void shouldValidateSuccessfullyWithEmptyInternalTopics() {
+        mockAdminClient.addTopic(
+            false,
+            topic1,
+            Collections.singletonList(new TopicPartitionInfo(0, broker1, cluster, Collections.emptyList())),
+            null
+        );
+        final InternalTopicConfig internalTopicConfig = new RepartitionTopicConfig(topic1, Collections.emptyMap());

Review comment:
       Good catch! Thanks!




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] rodesai commented on a change in pull request #10266: KAFKA-10357: Add validation method for internal topics

Posted by GitBox <gi...@apache.org>.
rodesai commented on a change in pull request #10266:
URL: https://github.com/apache/kafka/pull/10266#discussion_r588053733



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java
##########
@@ -91,6 +104,225 @@ public InternalTopicManager(final Time time,
         }
     }
 
+    static class ValidationResult {
+        public Set<String> missingTopics = new HashSet<>();
+        public Map<String, List<String>> misconfigurationsForTopics = new HashMap<>();
+    }
+
+    public ValidationResult validate(final Map<String, InternalTopicConfig> topicConfigs) {
+        log.debug("Starting to validate internal topics {}.", topicConfigs.keySet());
+
+        final long now = time.milliseconds();
+        final long deadline = now + retryTimeoutMs;
+
+        final ValidationResult validationResult = new ValidationResult();
+        final Set<String> topicDescriptionsStillToValidate = new HashSet<>(topicConfigs.keySet());
+        final Set<String> topicConfigsStillToValidate = new HashSet<>(topicConfigs.keySet());
+        while (!topicDescriptionsStillToValidate.isEmpty() || !topicConfigsStillToValidate.isEmpty()) {
+            Map<String, KafkaFuture<TopicDescription>> descriptionsForTopic = Collections.emptyMap();
+            if (!topicDescriptionsStillToValidate.isEmpty()) {
+                final DescribeTopicsResult describeTopicsResult = adminClient.describeTopics(topicDescriptionsStillToValidate);
+                descriptionsForTopic = describeTopicsResult.values();
+            }
+            Map<String, KafkaFuture<Config>> configsForTopic = Collections.emptyMap();
+            if (!topicConfigsStillToValidate.isEmpty()) {
+                final DescribeConfigsResult describeConfigsResult = adminClient.describeConfigs(
+                    topicConfigsStillToValidate.stream()
+                        .map(topic -> new ConfigResource(Type.TOPIC, topic))
+                        .collect(Collectors.toSet())
+                );
+                configsForTopic = describeConfigsResult.values().entrySet().stream()
+                    .collect(Collectors.toMap(entry -> entry.getKey().name(), Map.Entry::getValue));
+            }
+
+            while (!descriptionsForTopic.isEmpty() || !configsForTopic.isEmpty()) {
+                if (!descriptionsForTopic.isEmpty()) {
+                    doValidateTopic(
+                        validationResult,
+                        descriptionsForTopic,
+                        topicConfigs,
+                        topicDescriptionsStillToValidate,
+                        (brokerSide, streamsSide) -> validatePartitionCount(validationResult, brokerSide, streamsSide)
+                    );
+                }
+                if (!configsForTopic.isEmpty()) {
+                    doValidateTopic(
+                        validationResult,
+                        configsForTopic,
+                        topicConfigs,
+                        topicConfigsStillToValidate,
+                        (brokerSide, streamsSide) -> validateCleanupPolicy(validationResult, brokerSide, streamsSide)
+                    );
+                }
+            }
+
+            maybeThrowTimeoutException(topicDescriptionsStillToValidate, deadline);
+        }
+
+        log.debug("Completed validation of internal topics {}.", topicConfigs.keySet());
+        return validationResult;
+    }
+
+    private <V> void doValidateTopic(final ValidationResult validationResult,
+                                     final Map<String, KafkaFuture<V>> futuresForTopic,
+                                     final Map<String, InternalTopicConfig> topicsConfigs,
+                                     final Set<String> topicsStillToValidate,
+                                     final BiConsumer<InternalTopicConfig, V> validator) {
+        for (final InternalTopicConfig topicConfig : topicsConfigs.values()) {
+            final String topicName = topicConfig.name();
+            if (!futuresForTopic.containsKey(topicName)) {
+                throw new IllegalStateException("Described configs do not contain topics to validate. " + BUG_ERROR_MESSAGE);
+            }
+            final KafkaFuture<V> future = futuresForTopic.get(topicName);
+            if (future.isDone()) {
+                try {
+                    final V brokerSideTopicConfig = future.get();
+                    validator.accept(topicConfig, brokerSideTopicConfig);
+                    topicsStillToValidate.remove(topicName);
+                } catch (final ExecutionException executionException) {
+                    final Throwable cause = executionException.getCause();
+                    if (cause instanceof UnknownTopicOrPartitionException) {
+                        log.debug("Internal topic {} is missing",
+                            topicName);
+                        validationResult.missingTopics.add(topicName);
+                        topicsStillToValidate.remove(topicName);
+                    } else if (cause instanceof LeaderNotAvailableException) {
+                        log.debug("The leader of internal topic {} is not available.", topicName);
+                    } else if (cause instanceof TimeoutException) {
+                        log.debug("Retrieving data for internal topic {} timed out.", topicName);
+                    } else {
+                        log.error("Unexpected error during internal topic validation: ", cause);
+                        throw new StreamsException(
+                            String.format("Could not validate internal topic %s for the following reason: ", topicName),
+                            cause
+                        );
+                    }
+                } catch (final InterruptedException interruptedException) {
+                    throw new InterruptException(interruptedException);
+                } finally {
+                    futuresForTopic.remove(topicName);
+                }
+            }
+        }
+
+        if (!futuresForTopic.isEmpty()) {
+            Utils.sleep(retryBackOffMs);

Review comment:
       why are we using RETRY_BACKOFF_MS_CONFIG here? this is just a sleep to avoid a tight loop in checking whether the futures are done right? In that case we wouldn't actually be issuing a new request, so we can probably just use a constant set to some small value (e.g. a few hundred ms).

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java
##########
@@ -91,6 +104,225 @@ public InternalTopicManager(final Time time,
         }
     }
 
+    static class ValidationResult {
+        public Set<String> missingTopics = new HashSet<>();
+        public Map<String, List<String>> misconfigurationsForTopics = new HashMap<>();
+    }
+
+    public ValidationResult validate(final Map<String, InternalTopicConfig> topicConfigs) {
+        log.debug("Starting to validate internal topics {}.", topicConfigs.keySet());

Review comment:
       why not just log this stuff at info level? we're not on a performance path (this gets executed once every rebalance right?), and it can always come in handy when you're debugging. 

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java
##########
@@ -91,6 +104,225 @@ public InternalTopicManager(final Time time,
         }
     }
 
+    static class ValidationResult {
+        public Set<String> missingTopics = new HashSet<>();
+        public Map<String, List<String>> misconfigurationsForTopics = new HashMap<>();
+    }
+
+    public ValidationResult validate(final Map<String, InternalTopicConfig> topicConfigs) {
+        log.debug("Starting to validate internal topics {}.", topicConfigs.keySet());
+
+        final long now = time.milliseconds();
+        final long deadline = now + retryTimeoutMs;
+
+        final ValidationResult validationResult = new ValidationResult();
+        final Set<String> topicDescriptionsStillToValidate = new HashSet<>(topicConfigs.keySet());
+        final Set<String> topicConfigsStillToValidate = new HashSet<>(topicConfigs.keySet());
+        while (!topicDescriptionsStillToValidate.isEmpty() || !topicConfigsStillToValidate.isEmpty()) {
+            Map<String, KafkaFuture<TopicDescription>> descriptionsForTopic = Collections.emptyMap();
+            if (!topicDescriptionsStillToValidate.isEmpty()) {
+                final DescribeTopicsResult describeTopicsResult = adminClient.describeTopics(topicDescriptionsStillToValidate);
+                descriptionsForTopic = describeTopicsResult.values();
+            }
+            Map<String, KafkaFuture<Config>> configsForTopic = Collections.emptyMap();
+            if (!topicConfigsStillToValidate.isEmpty()) {
+                final DescribeConfigsResult describeConfigsResult = adminClient.describeConfigs(
+                    topicConfigsStillToValidate.stream()
+                        .map(topic -> new ConfigResource(Type.TOPIC, topic))
+                        .collect(Collectors.toSet())
+                );
+                configsForTopic = describeConfigsResult.values().entrySet().stream()
+                    .collect(Collectors.toMap(entry -> entry.getKey().name(), Map.Entry::getValue));
+            }
+
+            while (!descriptionsForTopic.isEmpty() || !configsForTopic.isEmpty()) {
+                if (!descriptionsForTopic.isEmpty()) {
+                    doValidateTopic(
+                        validationResult,
+                        descriptionsForTopic,
+                        topicConfigs,
+                        topicDescriptionsStillToValidate,
+                        (brokerSide, streamsSide) -> validatePartitionCount(validationResult, brokerSide, streamsSide)
+                    );
+                }
+                if (!configsForTopic.isEmpty()) {
+                    doValidateTopic(
+                        validationResult,
+                        configsForTopic,
+                        topicConfigs,
+                        topicConfigsStillToValidate,
+                        (brokerSide, streamsSide) -> validateCleanupPolicy(validationResult, brokerSide, streamsSide)
+                    );
+                }
+            }
+
+            maybeThrowTimeoutException(topicDescriptionsStillToValidate, deadline);

Review comment:
       should we move this into the above loop so that we timeout if the futures never complete?

##########
File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopicManagerTest.java
##########
@@ -451,6 +465,550 @@ public void shouldExhaustRetriesOnMarkedForDeletionTopic() {
         );
     }
 
+    @Test
+    public void shouldValidateSuccessfully() {
+        mockAdminClient.addTopic(
+            false,
+            topic1,
+            Collections.singletonList(new TopicPartitionInfo(0, broker1, cluster, Collections.emptyList())),
+            null
+        );
+        final InternalTopicConfig internalTopicConfig = new RepartitionTopicConfig(topic1, Collections.emptyMap());
+        internalTopicConfig.setNumberOfPartitions(1);
+
+        final ValidationResult validationResult = internalTopicManager.validate(Collections.singletonMap(topic1, internalTopicConfig));
+
+        assertThat(validationResult.missingTopics, empty());
+        assertThat(validationResult.misconfigurationsForTopics, anEmptyMap());
+    }
+
+    @Test
+    public void shouldValidateSuccessfullyWithEmptyInternalTopics() {
+        mockAdminClient.addTopic(
+            false,
+            topic1,
+            Collections.singletonList(new TopicPartitionInfo(0, broker1, cluster, Collections.emptyList())),
+            null
+        );
+        final InternalTopicConfig internalTopicConfig = new RepartitionTopicConfig(topic1, Collections.emptyMap());

Review comment:
       this doesn't seem to be used

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java
##########
@@ -91,6 +104,225 @@ public InternalTopicManager(final Time time,
         }
     }
 
+    static class ValidationResult {
+        public Set<String> missingTopics = new HashSet<>();
+        public Map<String, List<String>> misconfigurationsForTopics = new HashMap<>();
+    }
+
+    public ValidationResult validate(final Map<String, InternalTopicConfig> topicConfigs) {
+        log.debug("Starting to validate internal topics {}.", topicConfigs.keySet());
+
+        final long now = time.milliseconds();
+        final long deadline = now + retryTimeoutMs;
+
+        final ValidationResult validationResult = new ValidationResult();
+        final Set<String> topicDescriptionsStillToValidate = new HashSet<>(topicConfigs.keySet());
+        final Set<String> topicConfigsStillToValidate = new HashSet<>(topicConfigs.keySet());
+        while (!topicDescriptionsStillToValidate.isEmpty() || !topicConfigsStillToValidate.isEmpty()) {
+            Map<String, KafkaFuture<TopicDescription>> descriptionsForTopic = Collections.emptyMap();
+            if (!topicDescriptionsStillToValidate.isEmpty()) {
+                final DescribeTopicsResult describeTopicsResult = adminClient.describeTopics(topicDescriptionsStillToValidate);
+                descriptionsForTopic = describeTopicsResult.values();
+            }
+            Map<String, KafkaFuture<Config>> configsForTopic = Collections.emptyMap();
+            if (!topicConfigsStillToValidate.isEmpty()) {
+                final DescribeConfigsResult describeConfigsResult = adminClient.describeConfigs(
+                    topicConfigsStillToValidate.stream()
+                        .map(topic -> new ConfigResource(Type.TOPIC, topic))
+                        .collect(Collectors.toSet())
+                );
+                configsForTopic = describeConfigsResult.values().entrySet().stream()
+                    .collect(Collectors.toMap(entry -> entry.getKey().name(), Map.Entry::getValue));
+            }
+
+            while (!descriptionsForTopic.isEmpty() || !configsForTopic.isEmpty()) {
+                if (!descriptionsForTopic.isEmpty()) {
+                    doValidateTopic(
+                        validationResult,
+                        descriptionsForTopic,
+                        topicConfigs,
+                        topicDescriptionsStillToValidate,
+                        (brokerSide, streamsSide) -> validatePartitionCount(validationResult, brokerSide, streamsSide)
+                    );
+                }
+                if (!configsForTopic.isEmpty()) {
+                    doValidateTopic(
+                        validationResult,
+                        configsForTopic,
+                        topicConfigs,
+                        topicConfigsStillToValidate,
+                        (brokerSide, streamsSide) -> validateCleanupPolicy(validationResult, brokerSide, streamsSide)
+                    );
+                }
+            }
+
+            maybeThrowTimeoutException(topicDescriptionsStillToValidate, deadline);
+        }
+
+        log.debug("Completed validation of internal topics {}.", topicConfigs.keySet());
+        return validationResult;
+    }
+
+    private <V> void doValidateTopic(final ValidationResult validationResult,
+                                     final Map<String, KafkaFuture<V>> futuresForTopic,
+                                     final Map<String, InternalTopicConfig> topicsConfigs,
+                                     final Set<String> topicsStillToValidate,
+                                     final BiConsumer<InternalTopicConfig, V> validator) {
+        for (final InternalTopicConfig topicConfig : topicsConfigs.values()) {
+            final String topicName = topicConfig.name();
+            if (!futuresForTopic.containsKey(topicName)) {
+                throw new IllegalStateException("Described configs do not contain topics to validate. " + BUG_ERROR_MESSAGE);
+            }
+            final KafkaFuture<V> future = futuresForTopic.get(topicName);
+            if (future.isDone()) {
+                try {
+                    final V brokerSideTopicConfig = future.get();
+                    validator.accept(topicConfig, brokerSideTopicConfig);
+                    topicsStillToValidate.remove(topicName);
+                } catch (final ExecutionException executionException) {
+                    final Throwable cause = executionException.getCause();
+                    if (cause instanceof UnknownTopicOrPartitionException) {
+                        log.debug("Internal topic {} is missing",
+                            topicName);
+                        validationResult.missingTopics.add(topicName);
+                        topicsStillToValidate.remove(topicName);
+                    } else if (cause instanceof LeaderNotAvailableException) {
+                        log.debug("The leader of internal topic {} is not available.", topicName);
+                    } else if (cause instanceof TimeoutException) {
+                        log.debug("Retrieving data for internal topic {} timed out.", topicName);
+                    } else {
+                        log.error("Unexpected error during internal topic validation: ", cause);
+                        throw new StreamsException(
+                            String.format("Could not validate internal topic %s for the following reason: ", topicName),
+                            cause
+                        );
+                    }
+                } catch (final InterruptedException interruptedException) {
+                    throw new InterruptException(interruptedException);
+                } finally {
+                    futuresForTopic.remove(topicName);
+                }
+            }
+        }
+
+        if (!futuresForTopic.isEmpty()) {
+            Utils.sleep(retryBackOffMs);
+        }
+    }
+
+    private void maybeThrowTimeoutException(final Set<String> topicsStillToValidate,
+                                            final long deadline) {
+        if (!topicsStillToValidate.isEmpty()) {
+            final long now = time.milliseconds();
+            if (now >= deadline) {
+                final String timeoutError = String.format("Could not validate internal topics within %d milliseconds. " +
+                    "This can happen if the Kafka cluster is temporarily not available.", retryTimeoutMs);
+                log.error(timeoutError);
+                throw new TimeoutException(timeoutError);
+            }
+            log.info(
+                "Internal topics {} could not be validated. Will retry in {} milliseconds. Remaining time in milliseconds: {}",
+                topicsStillToValidate,
+                retryBackOffMs,
+                deadline - now
+            );
+            Utils.sleep(retryBackOffMs);
+        }
+    }
+
+    private void validatePartitionCount(final ValidationResult validationResult,
+                                        final InternalTopicConfig topicConfig,
+                                        final TopicDescription topicDescription) {
+        final String topicName = topicConfig.name();
+        final int requiredPartitionCount = topicConfig.numberOfPartitions()
+            .orElseThrow(() -> new IllegalStateException("No partition count is specified for internal topic " +
+                topicName + ". " + BUG_ERROR_MESSAGE));
+        final int actualPartitionCount = topicDescription.partitions().size();
+        if (actualPartitionCount != requiredPartitionCount) {
+            validationResult.misconfigurationsForTopics.computeIfAbsent(
+                topicConfig.name(),
+                ignored -> new ArrayList<>()
+            ).add("Internal topic " + topicName + " requires " + requiredPartitionCount + " partitions, " +
+                "but the existing topic on the broker has " + actualPartitionCount + " partitions.");
+        }
+    }
+
+    private void validateCleanupPolicy(final ValidationResult validationResult,
+                                       final InternalTopicConfig topicConfig,
+                                       final Config brokerSideTopicConfig) {
+        if (topicConfig instanceof UnwindowedChangelogTopicConfig) {
+            validateCleanupPolicyForUnwindowedChangelogs(validationResult, topicConfig, brokerSideTopicConfig);
+        } else if (topicConfig instanceof WindowedChangelogTopicConfig) {
+            validateCleanupPolicyForWindowedChangelogs(validationResult, topicConfig, brokerSideTopicConfig);
+        }
+    }
+
+    private void validateCleanupPolicyForUnwindowedChangelogs(final ValidationResult validationResult,
+                                                              final InternalTopicConfig topicConfig,
+                                                              final Config brokerSideTopicConfig) {
+        final String topicName = topicConfig.name();
+        final String cleanupPolicy = brokerSideTopicConfig.get(TopicConfig.CLEANUP_POLICY_CONFIG).value();
+        if (cleanupPolicy.contains(TopicConfig.CLEANUP_POLICY_DELETE)) {
+            addMisconfiguration(
+                validationResult,
+                topicName,
+                "Cleanup policy of existing internal topic " + topicName + " should not contain \""
+                + TopicConfig.CLEANUP_POLICY_DELETE + "\"."
+            );
+        }
+    }
+
+    private void validateCleanupPolicyForWindowedChangelogs(final ValidationResult validationResult,
+                                                            final InternalTopicConfig topicConfig,
+                                                            final Config brokerSideTopicConfig) {
+        final String topicName = topicConfig.name();
+        final String cleanupPolicy = getBrokerSideConfigValue(brokerSideTopicConfig, TopicConfig.CLEANUP_POLICY_CONFIG, topicName);
+        if (cleanupPolicy.contains(TopicConfig.CLEANUP_POLICY_DELETE)) {
+            final long brokerSideRetentionMs =
+                Long.parseLong(getBrokerSideConfigValue(brokerSideTopicConfig, TopicConfig.RETENTION_MS_CONFIG, topicName));
+            final Map<String, String> streamsSideConfig =
+                topicConfig.getProperties(defaultTopicConfigs, windowChangeLogAdditionalRetention);
+            final long streamsSideRetentionMs = Long.parseLong(streamsSideConfig.get(TopicConfig.RETENTION_MS_CONFIG));
+            if (brokerSideRetentionMs < streamsSideRetentionMs) {
+                addMisconfiguration(
+                    validationResult,
+                    topicName,
+                    "Retention time of existing internal topic " + topicName + " is " + brokerSideRetentionMs +
+                    " but should be " + streamsSideRetentionMs + " or larger."
+                );
+            }
+            final String brokerSideRetentionBytes =
+                getBrokerSideConfigValue(brokerSideTopicConfig, TopicConfig.RETENTION_BYTES_CONFIG, topicName);
+            if (brokerSideRetentionBytes != null) {
+                addMisconfiguration(
+                    validationResult,
+                    topicName,
+                    "Retention byte of existing internal topic " + topicName + " is set but it should be unset."
+                );
+            }
+        }
+    }
+
+    private String getBrokerSideConfigValue(final Config brokerSideTopicConfig,
+                                            final String configName,
+                                            final String topicName) {
+        final ConfigEntry brokerSideConfigEntry = brokerSideTopicConfig.get(configName);
+        if (brokerSideConfigEntry == null) {
+            throw new IllegalStateException("The config " + configName + " for topic " +

Review comment:
       should this include the guidance from BUG_ERROR_MESSAGE?

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java
##########
@@ -91,6 +104,225 @@ public InternalTopicManager(final Time time,
         }
     }
 
+    static class ValidationResult {
+        public Set<String> missingTopics = new HashSet<>();
+        public Map<String, List<String>> misconfigurationsForTopics = new HashMap<>();
+    }
+
+    public ValidationResult validate(final Map<String, InternalTopicConfig> topicConfigs) {
+        log.debug("Starting to validate internal topics {}.", topicConfigs.keySet());
+
+        final long now = time.milliseconds();
+        final long deadline = now + retryTimeoutMs;
+
+        final ValidationResult validationResult = new ValidationResult();
+        final Set<String> topicDescriptionsStillToValidate = new HashSet<>(topicConfigs.keySet());
+        final Set<String> topicConfigsStillToValidate = new HashSet<>(topicConfigs.keySet());
+        while (!topicDescriptionsStillToValidate.isEmpty() || !topicConfigsStillToValidate.isEmpty()) {
+            Map<String, KafkaFuture<TopicDescription>> descriptionsForTopic = Collections.emptyMap();
+            if (!topicDescriptionsStillToValidate.isEmpty()) {
+                final DescribeTopicsResult describeTopicsResult = adminClient.describeTopics(topicDescriptionsStillToValidate);
+                descriptionsForTopic = describeTopicsResult.values();
+            }
+            Map<String, KafkaFuture<Config>> configsForTopic = Collections.emptyMap();
+            if (!topicConfigsStillToValidate.isEmpty()) {
+                final DescribeConfigsResult describeConfigsResult = adminClient.describeConfigs(
+                    topicConfigsStillToValidate.stream()
+                        .map(topic -> new ConfigResource(Type.TOPIC, topic))
+                        .collect(Collectors.toSet())
+                );
+                configsForTopic = describeConfigsResult.values().entrySet().stream()
+                    .collect(Collectors.toMap(entry -> entry.getKey().name(), Map.Entry::getValue));
+            }
+
+            while (!descriptionsForTopic.isEmpty() || !configsForTopic.isEmpty()) {
+                if (!descriptionsForTopic.isEmpty()) {
+                    doValidateTopic(
+                        validationResult,
+                        descriptionsForTopic,
+                        topicConfigs,
+                        topicDescriptionsStillToValidate,
+                        (brokerSide, streamsSide) -> validatePartitionCount(validationResult, brokerSide, streamsSide)
+                    );
+                }
+                if (!configsForTopic.isEmpty()) {
+                    doValidateTopic(
+                        validationResult,
+                        configsForTopic,
+                        topicConfigs,
+                        topicConfigsStillToValidate,
+                        (brokerSide, streamsSide) -> validateCleanupPolicy(validationResult, brokerSide, streamsSide)
+                    );
+                }
+            }
+
+            maybeThrowTimeoutException(topicDescriptionsStillToValidate, deadline);
+        }
+
+        log.debug("Completed validation of internal topics {}.", topicConfigs.keySet());
+        return validationResult;
+    }
+
+    private <V> void doValidateTopic(final ValidationResult validationResult,
+                                     final Map<String, KafkaFuture<V>> futuresForTopic,
+                                     final Map<String, InternalTopicConfig> topicsConfigs,
+                                     final Set<String> topicsStillToValidate,
+                                     final BiConsumer<InternalTopicConfig, V> validator) {
+        for (final InternalTopicConfig topicConfig : topicsConfigs.values()) {
+            final String topicName = topicConfig.name();
+            if (!futuresForTopic.containsKey(topicName)) {
+                throw new IllegalStateException("Described configs do not contain topics to validate. " + BUG_ERROR_MESSAGE);
+            }
+            final KafkaFuture<V> future = futuresForTopic.get(topicName);
+            if (future.isDone()) {
+                try {
+                    final V brokerSideTopicConfig = future.get();
+                    validator.accept(topicConfig, brokerSideTopicConfig);
+                    topicsStillToValidate.remove(topicName);
+                } catch (final ExecutionException executionException) {
+                    final Throwable cause = executionException.getCause();
+                    if (cause instanceof UnknownTopicOrPartitionException) {
+                        log.debug("Internal topic {} is missing",

Review comment:
       ditto about the log level (also for the below uses of `debug`)

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java
##########
@@ -91,6 +104,225 @@ public InternalTopicManager(final Time time,
         }
     }
 
+    static class ValidationResult {
+        public Set<String> missingTopics = new HashSet<>();
+        public Map<String, List<String>> misconfigurationsForTopics = new HashMap<>();
+    }
+
+    public ValidationResult validate(final Map<String, InternalTopicConfig> topicConfigs) {
+        log.debug("Starting to validate internal topics {}.", topicConfigs.keySet());
+
+        final long now = time.milliseconds();
+        final long deadline = now + retryTimeoutMs;
+
+        final ValidationResult validationResult = new ValidationResult();
+        final Set<String> topicDescriptionsStillToValidate = new HashSet<>(topicConfigs.keySet());
+        final Set<String> topicConfigsStillToValidate = new HashSet<>(topicConfigs.keySet());
+        while (!topicDescriptionsStillToValidate.isEmpty() || !topicConfigsStillToValidate.isEmpty()) {
+            Map<String, KafkaFuture<TopicDescription>> descriptionsForTopic = Collections.emptyMap();
+            if (!topicDescriptionsStillToValidate.isEmpty()) {
+                final DescribeTopicsResult describeTopicsResult = adminClient.describeTopics(topicDescriptionsStillToValidate);
+                descriptionsForTopic = describeTopicsResult.values();
+            }
+            Map<String, KafkaFuture<Config>> configsForTopic = Collections.emptyMap();
+            if (!topicConfigsStillToValidate.isEmpty()) {
+                final DescribeConfigsResult describeConfigsResult = adminClient.describeConfigs(
+                    topicConfigsStillToValidate.stream()
+                        .map(topic -> new ConfigResource(Type.TOPIC, topic))
+                        .collect(Collectors.toSet())
+                );
+                configsForTopic = describeConfigsResult.values().entrySet().stream()
+                    .collect(Collectors.toMap(entry -> entry.getKey().name(), Map.Entry::getValue));
+            }
+
+            while (!descriptionsForTopic.isEmpty() || !configsForTopic.isEmpty()) {
+                if (!descriptionsForTopic.isEmpty()) {
+                    doValidateTopic(
+                        validationResult,
+                        descriptionsForTopic,
+                        topicConfigs,
+                        topicDescriptionsStillToValidate,
+                        (brokerSide, streamsSide) -> validatePartitionCount(validationResult, brokerSide, streamsSide)
+                    );
+                }
+                if (!configsForTopic.isEmpty()) {
+                    doValidateTopic(
+                        validationResult,
+                        configsForTopic,
+                        topicConfigs,
+                        topicConfigsStillToValidate,
+                        (brokerSide, streamsSide) -> validateCleanupPolicy(validationResult, brokerSide, streamsSide)
+                    );
+                }
+            }
+
+            maybeThrowTimeoutException(topicDescriptionsStillToValidate, deadline);
+        }
+
+        log.debug("Completed validation of internal topics {}.", topicConfigs.keySet());
+        return validationResult;
+    }
+
+    private <V> void doValidateTopic(final ValidationResult validationResult,
+                                     final Map<String, KafkaFuture<V>> futuresForTopic,
+                                     final Map<String, InternalTopicConfig> topicsConfigs,
+                                     final Set<String> topicsStillToValidate,
+                                     final BiConsumer<InternalTopicConfig, V> validator) {
+        for (final InternalTopicConfig topicConfig : topicsConfigs.values()) {
+            final String topicName = topicConfig.name();
+            if (!futuresForTopic.containsKey(topicName)) {
+                throw new IllegalStateException("Described configs do not contain topics to validate. " + BUG_ERROR_MESSAGE);
+            }
+            final KafkaFuture<V> future = futuresForTopic.get(topicName);
+            if (future.isDone()) {
+                try {
+                    final V brokerSideTopicConfig = future.get();
+                    validator.accept(topicConfig, brokerSideTopicConfig);
+                    topicsStillToValidate.remove(topicName);
+                } catch (final ExecutionException executionException) {
+                    final Throwable cause = executionException.getCause();
+                    if (cause instanceof UnknownTopicOrPartitionException) {
+                        log.debug("Internal topic {} is missing",
+                            topicName);
+                        validationResult.missingTopics.add(topicName);
+                        topicsStillToValidate.remove(topicName);
+                    } else if (cause instanceof LeaderNotAvailableException) {
+                        log.debug("The leader of internal topic {} is not available.", topicName);
+                    } else if (cause instanceof TimeoutException) {
+                        log.debug("Retrieving data for internal topic {} timed out.", topicName);
+                    } else {
+                        log.error("Unexpected error during internal topic validation: ", cause);
+                        throw new StreamsException(
+                            String.format("Could not validate internal topic %s for the following reason: ", topicName),
+                            cause
+                        );
+                    }
+                } catch (final InterruptedException interruptedException) {
+                    throw new InterruptException(interruptedException);
+                } finally {
+                    futuresForTopic.remove(topicName);
+                }
+            }
+        }
+
+        if (!futuresForTopic.isEmpty()) {
+            Utils.sleep(retryBackOffMs);
+        }
+    }
+
+    private void maybeThrowTimeoutException(final Set<String> topicsStillToValidate,
+                                            final long deadline) {
+        if (!topicsStillToValidate.isEmpty()) {
+            final long now = time.milliseconds();
+            if (now >= deadline) {
+                final String timeoutError = String.format("Could not validate internal topics within %d milliseconds. " +
+                    "This can happen if the Kafka cluster is temporarily not available.", retryTimeoutMs);
+                log.error(timeoutError);
+                throw new TimeoutException(timeoutError);
+            }
+            log.info(
+                "Internal topics {} could not be validated. Will retry in {} milliseconds. Remaining time in milliseconds: {}",
+                topicsStillToValidate,
+                retryBackOffMs,
+                deadline - now
+            );
+            Utils.sleep(retryBackOffMs);
+        }
+    }
+
+    private void validatePartitionCount(final ValidationResult validationResult,
+                                        final InternalTopicConfig topicConfig,
+                                        final TopicDescription topicDescription) {
+        final String topicName = topicConfig.name();
+        final int requiredPartitionCount = topicConfig.numberOfPartitions()
+            .orElseThrow(() -> new IllegalStateException("No partition count is specified for internal topic " +
+                topicName + ". " + BUG_ERROR_MESSAGE));
+        final int actualPartitionCount = topicDescription.partitions().size();
+        if (actualPartitionCount != requiredPartitionCount) {
+            validationResult.misconfigurationsForTopics.computeIfAbsent(

Review comment:
       use `addMisconfiguration`?

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java
##########
@@ -91,6 +104,225 @@ public InternalTopicManager(final Time time,
         }
     }
 
+    static class ValidationResult {
+        public Set<String> missingTopics = new HashSet<>();
+        public Map<String, List<String>> misconfigurationsForTopics = new HashMap<>();
+    }
+
+    public ValidationResult validate(final Map<String, InternalTopicConfig> topicConfigs) {
+        log.debug("Starting to validate internal topics {}.", topicConfigs.keySet());
+
+        final long now = time.milliseconds();
+        final long deadline = now + retryTimeoutMs;
+
+        final ValidationResult validationResult = new ValidationResult();
+        final Set<String> topicDescriptionsStillToValidate = new HashSet<>(topicConfigs.keySet());
+        final Set<String> topicConfigsStillToValidate = new HashSet<>(topicConfigs.keySet());
+        while (!topicDescriptionsStillToValidate.isEmpty() || !topicConfigsStillToValidate.isEmpty()) {
+            Map<String, KafkaFuture<TopicDescription>> descriptionsForTopic = Collections.emptyMap();
+            if (!topicDescriptionsStillToValidate.isEmpty()) {
+                final DescribeTopicsResult describeTopicsResult = adminClient.describeTopics(topicDescriptionsStillToValidate);
+                descriptionsForTopic = describeTopicsResult.values();
+            }
+            Map<String, KafkaFuture<Config>> configsForTopic = Collections.emptyMap();
+            if (!topicConfigsStillToValidate.isEmpty()) {
+                final DescribeConfigsResult describeConfigsResult = adminClient.describeConfigs(
+                    topicConfigsStillToValidate.stream()
+                        .map(topic -> new ConfigResource(Type.TOPIC, topic))
+                        .collect(Collectors.toSet())
+                );
+                configsForTopic = describeConfigsResult.values().entrySet().stream()
+                    .collect(Collectors.toMap(entry -> entry.getKey().name(), Map.Entry::getValue));
+            }
+
+            while (!descriptionsForTopic.isEmpty() || !configsForTopic.isEmpty()) {
+                if (!descriptionsForTopic.isEmpty()) {
+                    doValidateTopic(
+                        validationResult,
+                        descriptionsForTopic,
+                        topicConfigs,
+                        topicDescriptionsStillToValidate,
+                        (brokerSide, streamsSide) -> validatePartitionCount(validationResult, brokerSide, streamsSide)
+                    );
+                }
+                if (!configsForTopic.isEmpty()) {
+                    doValidateTopic(
+                        validationResult,
+                        configsForTopic,
+                        topicConfigs,
+                        topicConfigsStillToValidate,
+                        (brokerSide, streamsSide) -> validateCleanupPolicy(validationResult, brokerSide, streamsSide)
+                    );
+                }
+            }
+
+            maybeThrowTimeoutException(topicDescriptionsStillToValidate, deadline);
+        }
+
+        log.debug("Completed validation of internal topics {}.", topicConfigs.keySet());
+        return validationResult;
+    }
+
+    private <V> void doValidateTopic(final ValidationResult validationResult,
+                                     final Map<String, KafkaFuture<V>> futuresForTopic,
+                                     final Map<String, InternalTopicConfig> topicsConfigs,
+                                     final Set<String> topicsStillToValidate,
+                                     final BiConsumer<InternalTopicConfig, V> validator) {
+        for (final InternalTopicConfig topicConfig : topicsConfigs.values()) {
+            final String topicName = topicConfig.name();
+            if (!futuresForTopic.containsKey(topicName)) {
+                throw new IllegalStateException("Described configs do not contain topics to validate. " + BUG_ERROR_MESSAGE);
+            }
+            final KafkaFuture<V> future = futuresForTopic.get(topicName);
+            if (future.isDone()) {
+                try {
+                    final V brokerSideTopicConfig = future.get();
+                    validator.accept(topicConfig, brokerSideTopicConfig);
+                    topicsStillToValidate.remove(topicName);
+                } catch (final ExecutionException executionException) {
+                    final Throwable cause = executionException.getCause();
+                    if (cause instanceof UnknownTopicOrPartitionException) {
+                        log.debug("Internal topic {} is missing",
+                            topicName);
+                        validationResult.missingTopics.add(topicName);
+                        topicsStillToValidate.remove(topicName);
+                    } else if (cause instanceof LeaderNotAvailableException) {
+                        log.debug("The leader of internal topic {} is not available.", topicName);
+                    } else if (cause instanceof TimeoutException) {
+                        log.debug("Retrieving data for internal topic {} timed out.", topicName);
+                    } else {
+                        log.error("Unexpected error during internal topic validation: ", cause);
+                        throw new StreamsException(
+                            String.format("Could not validate internal topic %s for the following reason: ", topicName),
+                            cause
+                        );
+                    }
+                } catch (final InterruptedException interruptedException) {
+                    throw new InterruptException(interruptedException);
+                } finally {
+                    futuresForTopic.remove(topicName);
+                }
+            }
+        }
+
+        if (!futuresForTopic.isEmpty()) {
+            Utils.sleep(retryBackOffMs);
+        }
+    }
+
+    private void maybeThrowTimeoutException(final Set<String> topicsStillToValidate,
+                                            final long deadline) {
+        if (!topicsStillToValidate.isEmpty()) {
+            final long now = time.milliseconds();
+            if (now >= deadline) {
+                final String timeoutError = String.format("Could not validate internal topics within %d milliseconds. " +
+                    "This can happen if the Kafka cluster is temporarily not available.", retryTimeoutMs);
+                log.error(timeoutError);
+                throw new TimeoutException(timeoutError);
+            }
+            log.info(
+                "Internal topics {} could not be validated. Will retry in {} milliseconds. Remaining time in milliseconds: {}",
+                topicsStillToValidate,
+                retryBackOffMs,
+                deadline - now
+            );
+            Utils.sleep(retryBackOffMs);
+        }
+    }
+
+    private void validatePartitionCount(final ValidationResult validationResult,
+                                        final InternalTopicConfig topicConfig,
+                                        final TopicDescription topicDescription) {
+        final String topicName = topicConfig.name();
+        final int requiredPartitionCount = topicConfig.numberOfPartitions()
+            .orElseThrow(() -> new IllegalStateException("No partition count is specified for internal topic " +
+                topicName + ". " + BUG_ERROR_MESSAGE));
+        final int actualPartitionCount = topicDescription.partitions().size();
+        if (actualPartitionCount != requiredPartitionCount) {
+            validationResult.misconfigurationsForTopics.computeIfAbsent(
+                topicConfig.name(),
+                ignored -> new ArrayList<>()
+            ).add("Internal topic " + topicName + " requires " + requiredPartitionCount + " partitions, " +
+                "but the existing topic on the broker has " + actualPartitionCount + " partitions.");
+        }
+    }
+
+    private void validateCleanupPolicy(final ValidationResult validationResult,
+                                       final InternalTopicConfig topicConfig,
+                                       final Config brokerSideTopicConfig) {
+        if (topicConfig instanceof UnwindowedChangelogTopicConfig) {
+            validateCleanupPolicyForUnwindowedChangelogs(validationResult, topicConfig, brokerSideTopicConfig);
+        } else if (topicConfig instanceof WindowedChangelogTopicConfig) {
+            validateCleanupPolicyForWindowedChangelogs(validationResult, topicConfig, brokerSideTopicConfig);
+        }
+    }
+
+    private void validateCleanupPolicyForUnwindowedChangelogs(final ValidationResult validationResult,
+                                                              final InternalTopicConfig topicConfig,
+                                                              final Config brokerSideTopicConfig) {
+        final String topicName = topicConfig.name();
+        final String cleanupPolicy = brokerSideTopicConfig.get(TopicConfig.CLEANUP_POLICY_CONFIG).value();
+        if (cleanupPolicy.contains(TopicConfig.CLEANUP_POLICY_DELETE)) {
+            addMisconfiguration(
+                validationResult,
+                topicName,
+                "Cleanup policy of existing internal topic " + topicName + " should not contain \""
+                + TopicConfig.CLEANUP_POLICY_DELETE + "\"."
+            );
+        }
+    }
+
+    private void validateCleanupPolicyForWindowedChangelogs(final ValidationResult validationResult,
+                                                            final InternalTopicConfig topicConfig,
+                                                            final Config brokerSideTopicConfig) {
+        final String topicName = topicConfig.name();
+        final String cleanupPolicy = getBrokerSideConfigValue(brokerSideTopicConfig, TopicConfig.CLEANUP_POLICY_CONFIG, topicName);
+        if (cleanupPolicy.contains(TopicConfig.CLEANUP_POLICY_DELETE)) {
+            final long brokerSideRetentionMs =
+                Long.parseLong(getBrokerSideConfigValue(brokerSideTopicConfig, TopicConfig.RETENTION_MS_CONFIG, topicName));
+            final Map<String, String> streamsSideConfig =

Review comment:
       wouldn't we want to check the configs the user provided here rather than the defaults?

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java
##########
@@ -91,6 +104,225 @@ public InternalTopicManager(final Time time,
         }
     }
 
+    static class ValidationResult {
+        public Set<String> missingTopics = new HashSet<>();
+        public Map<String, List<String>> misconfigurationsForTopics = new HashMap<>();
+    }
+
+    public ValidationResult validate(final Map<String, InternalTopicConfig> topicConfigs) {

Review comment:
       add a javadoc explaining what this method is doing and what the return value is?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] cadonna commented on a change in pull request #10266: KAFKA-10357: Add validation method for internal topics

Posted by GitBox <gi...@apache.org>.
cadonna commented on a change in pull request #10266:
URL: https://github.com/apache/kafka/pull/10266#discussion_r590561174



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java
##########
@@ -91,6 +104,225 @@ public InternalTopicManager(final Time time,
         }
     }
 
+    static class ValidationResult {
+        public Set<String> missingTopics = new HashSet<>();
+        public Map<String, List<String>> misconfigurationsForTopics = new HashMap<>();
+    }
+
+    public ValidationResult validate(final Map<String, InternalTopicConfig> topicConfigs) {
+        log.debug("Starting to validate internal topics {}.", topicConfigs.keySet());
+
+        final long now = time.milliseconds();
+        final long deadline = now + retryTimeoutMs;
+
+        final ValidationResult validationResult = new ValidationResult();
+        final Set<String> topicDescriptionsStillToValidate = new HashSet<>(topicConfigs.keySet());
+        final Set<String> topicConfigsStillToValidate = new HashSet<>(topicConfigs.keySet());
+        while (!topicDescriptionsStillToValidate.isEmpty() || !topicConfigsStillToValidate.isEmpty()) {
+            Map<String, KafkaFuture<TopicDescription>> descriptionsForTopic = Collections.emptyMap();
+            if (!topicDescriptionsStillToValidate.isEmpty()) {
+                final DescribeTopicsResult describeTopicsResult = adminClient.describeTopics(topicDescriptionsStillToValidate);
+                descriptionsForTopic = describeTopicsResult.values();
+            }
+            Map<String, KafkaFuture<Config>> configsForTopic = Collections.emptyMap();
+            if (!topicConfigsStillToValidate.isEmpty()) {
+                final DescribeConfigsResult describeConfigsResult = adminClient.describeConfigs(
+                    topicConfigsStillToValidate.stream()
+                        .map(topic -> new ConfigResource(Type.TOPIC, topic))
+                        .collect(Collectors.toSet())
+                );
+                configsForTopic = describeConfigsResult.values().entrySet().stream()
+                    .collect(Collectors.toMap(entry -> entry.getKey().name(), Map.Entry::getValue));
+            }
+
+            while (!descriptionsForTopic.isEmpty() || !configsForTopic.isEmpty()) {
+                if (!descriptionsForTopic.isEmpty()) {
+                    doValidateTopic(
+                        validationResult,
+                        descriptionsForTopic,
+                        topicConfigs,
+                        topicDescriptionsStillToValidate,
+                        (brokerSide, streamsSide) -> validatePartitionCount(validationResult, brokerSide, streamsSide)
+                    );
+                }
+                if (!configsForTopic.isEmpty()) {
+                    doValidateTopic(
+                        validationResult,
+                        configsForTopic,
+                        topicConfigs,
+                        topicConfigsStillToValidate,
+                        (brokerSide, streamsSide) -> validateCleanupPolicy(validationResult, brokerSide, streamsSide)
+                    );
+                }
+            }
+
+            maybeThrowTimeoutException(topicDescriptionsStillToValidate, deadline);
+        }
+
+        log.debug("Completed validation of internal topics {}.", topicConfigs.keySet());
+        return validationResult;
+    }
+
+    private <V> void doValidateTopic(final ValidationResult validationResult,
+                                     final Map<String, KafkaFuture<V>> futuresForTopic,
+                                     final Map<String, InternalTopicConfig> topicsConfigs,
+                                     final Set<String> topicsStillToValidate,
+                                     final BiConsumer<InternalTopicConfig, V> validator) {
+        for (final InternalTopicConfig topicConfig : topicsConfigs.values()) {
+            final String topicName = topicConfig.name();
+            if (!futuresForTopic.containsKey(topicName)) {
+                throw new IllegalStateException("Described configs do not contain topics to validate. " + BUG_ERROR_MESSAGE);
+            }
+            final KafkaFuture<V> future = futuresForTopic.get(topicName);
+            if (future.isDone()) {
+                try {
+                    final V brokerSideTopicConfig = future.get();
+                    validator.accept(topicConfig, brokerSideTopicConfig);
+                    topicsStillToValidate.remove(topicName);
+                } catch (final ExecutionException executionException) {
+                    final Throwable cause = executionException.getCause();
+                    if (cause instanceof UnknownTopicOrPartitionException) {
+                        log.debug("Internal topic {} is missing",
+                            topicName);
+                        validationResult.missingTopics.add(topicName);
+                        topicsStillToValidate.remove(topicName);
+                    } else if (cause instanceof LeaderNotAvailableException) {
+                        log.debug("The leader of internal topic {} is not available.", topicName);
+                    } else if (cause instanceof TimeoutException) {
+                        log.debug("Retrieving data for internal topic {} timed out.", topicName);
+                    } else {
+                        log.error("Unexpected error during internal topic validation: ", cause);
+                        throw new StreamsException(
+                            String.format("Could not validate internal topic %s for the following reason: ", topicName),
+                            cause
+                        );
+                    }
+                } catch (final InterruptedException interruptedException) {
+                    throw new InterruptException(interruptedException);
+                } finally {
+                    futuresForTopic.remove(topicName);
+                }
+            }
+        }
+
+        if (!futuresForTopic.isEmpty()) {
+            Utils.sleep(retryBackOffMs);
+        }
+    }
+
+    private void maybeThrowTimeoutException(final Set<String> topicsStillToValidate,
+                                            final long deadline) {
+        if (!topicsStillToValidate.isEmpty()) {
+            final long now = time.milliseconds();
+            if (now >= deadline) {
+                final String timeoutError = String.format("Could not validate internal topics within %d milliseconds. " +
+                    "This can happen if the Kafka cluster is temporarily not available.", retryTimeoutMs);
+                log.error(timeoutError);
+                throw new TimeoutException(timeoutError);
+            }
+            log.info(
+                "Internal topics {} could not be validated. Will retry in {} milliseconds. Remaining time in milliseconds: {}",
+                topicsStillToValidate,
+                retryBackOffMs,
+                deadline - now
+            );
+            Utils.sleep(retryBackOffMs);
+        }
+    }
+
+    private void validatePartitionCount(final ValidationResult validationResult,
+                                        final InternalTopicConfig topicConfig,
+                                        final TopicDescription topicDescription) {
+        final String topicName = topicConfig.name();
+        final int requiredPartitionCount = topicConfig.numberOfPartitions()
+            .orElseThrow(() -> new IllegalStateException("No partition count is specified for internal topic " +
+                topicName + ". " + BUG_ERROR_MESSAGE));
+        final int actualPartitionCount = topicDescription.partitions().size();
+        if (actualPartitionCount != requiredPartitionCount) {
+            validationResult.misconfigurationsForTopics.computeIfAbsent(

Review comment:
       I moved `addMisconfiguration()` into `ValidationResults` and replaced the code you commented on. 




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] cadonna commented on pull request #10266: KAFKA-10357: Add validation method for internal topics

Posted by GitBox <gi...@apache.org>.
cadonna commented on pull request #10266:
URL: https://github.com/apache/kafka/pull/10266#issuecomment-796690925


   Call for committer review: @mjsax @ableegoldman @guozhangwang


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] cadonna commented on a change in pull request #10266: KAFKA-10357: Add validation method for internal topics

Posted by GitBox <gi...@apache.org>.
cadonna commented on a change in pull request #10266:
URL: https://github.com/apache/kafka/pull/10266#discussion_r590519538



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java
##########
@@ -91,6 +104,225 @@ public InternalTopicManager(final Time time,
         }
     }
 
+    static class ValidationResult {
+        public Set<String> missingTopics = new HashSet<>();
+        public Map<String, List<String>> misconfigurationsForTopics = new HashMap<>();
+    }
+
+    public ValidationResult validate(final Map<String, InternalTopicConfig> topicConfigs) {
+        log.debug("Starting to validate internal topics {}.", topicConfigs.keySet());
+
+        final long now = time.milliseconds();
+        final long deadline = now + retryTimeoutMs;
+
+        final ValidationResult validationResult = new ValidationResult();
+        final Set<String> topicDescriptionsStillToValidate = new HashSet<>(topicConfigs.keySet());
+        final Set<String> topicConfigsStillToValidate = new HashSet<>(topicConfigs.keySet());
+        while (!topicDescriptionsStillToValidate.isEmpty() || !topicConfigsStillToValidate.isEmpty()) {
+            Map<String, KafkaFuture<TopicDescription>> descriptionsForTopic = Collections.emptyMap();
+            if (!topicDescriptionsStillToValidate.isEmpty()) {
+                final DescribeTopicsResult describeTopicsResult = adminClient.describeTopics(topicDescriptionsStillToValidate);
+                descriptionsForTopic = describeTopicsResult.values();
+            }
+            Map<String, KafkaFuture<Config>> configsForTopic = Collections.emptyMap();
+            if (!topicConfigsStillToValidate.isEmpty()) {
+                final DescribeConfigsResult describeConfigsResult = adminClient.describeConfigs(
+                    topicConfigsStillToValidate.stream()
+                        .map(topic -> new ConfigResource(Type.TOPIC, topic))
+                        .collect(Collectors.toSet())
+                );
+                configsForTopic = describeConfigsResult.values().entrySet().stream()
+                    .collect(Collectors.toMap(entry -> entry.getKey().name(), Map.Entry::getValue));
+            }
+
+            while (!descriptionsForTopic.isEmpty() || !configsForTopic.isEmpty()) {
+                if (!descriptionsForTopic.isEmpty()) {
+                    doValidateTopic(
+                        validationResult,
+                        descriptionsForTopic,
+                        topicConfigs,
+                        topicDescriptionsStillToValidate,
+                        (brokerSide, streamsSide) -> validatePartitionCount(validationResult, brokerSide, streamsSide)
+                    );
+                }
+                if (!configsForTopic.isEmpty()) {
+                    doValidateTopic(
+                        validationResult,
+                        configsForTopic,
+                        topicConfigs,
+                        topicConfigsStillToValidate,
+                        (brokerSide, streamsSide) -> validateCleanupPolicy(validationResult, brokerSide, streamsSide)
+                    );
+                }
+            }
+
+            maybeThrowTimeoutException(topicDescriptionsStillToValidate, deadline);

Review comment:
       Good point! Will do!




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] cadonna commented on a change in pull request #10266: KAFKA-10357: Add validation method for internal topics

Posted by GitBox <gi...@apache.org>.
cadonna commented on a change in pull request #10266:
URL: https://github.com/apache/kafka/pull/10266#discussion_r590511478



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java
##########
@@ -91,6 +104,225 @@ public InternalTopicManager(final Time time,
         }
     }
 
+    static class ValidationResult {
+        public Set<String> missingTopics = new HashSet<>();
+        public Map<String, List<String>> misconfigurationsForTopics = new HashMap<>();
+    }
+
+    public ValidationResult validate(final Map<String, InternalTopicConfig> topicConfigs) {
+        log.debug("Starting to validate internal topics {}.", topicConfigs.keySet());
+
+        final long now = time.milliseconds();
+        final long deadline = now + retryTimeoutMs;
+
+        final ValidationResult validationResult = new ValidationResult();
+        final Set<String> topicDescriptionsStillToValidate = new HashSet<>(topicConfigs.keySet());
+        final Set<String> topicConfigsStillToValidate = new HashSet<>(topicConfigs.keySet());
+        while (!topicDescriptionsStillToValidate.isEmpty() || !topicConfigsStillToValidate.isEmpty()) {
+            Map<String, KafkaFuture<TopicDescription>> descriptionsForTopic = Collections.emptyMap();
+            if (!topicDescriptionsStillToValidate.isEmpty()) {
+                final DescribeTopicsResult describeTopicsResult = adminClient.describeTopics(topicDescriptionsStillToValidate);
+                descriptionsForTopic = describeTopicsResult.values();
+            }
+            Map<String, KafkaFuture<Config>> configsForTopic = Collections.emptyMap();
+            if (!topicConfigsStillToValidate.isEmpty()) {
+                final DescribeConfigsResult describeConfigsResult = adminClient.describeConfigs(
+                    topicConfigsStillToValidate.stream()
+                        .map(topic -> new ConfigResource(Type.TOPIC, topic))
+                        .collect(Collectors.toSet())
+                );
+                configsForTopic = describeConfigsResult.values().entrySet().stream()
+                    .collect(Collectors.toMap(entry -> entry.getKey().name(), Map.Entry::getValue));
+            }
+
+            while (!descriptionsForTopic.isEmpty() || !configsForTopic.isEmpty()) {
+                if (!descriptionsForTopic.isEmpty()) {
+                    doValidateTopic(
+                        validationResult,
+                        descriptionsForTopic,
+                        topicConfigs,
+                        topicDescriptionsStillToValidate,
+                        (brokerSide, streamsSide) -> validatePartitionCount(validationResult, brokerSide, streamsSide)
+                    );
+                }
+                if (!configsForTopic.isEmpty()) {
+                    doValidateTopic(
+                        validationResult,
+                        configsForTopic,
+                        topicConfigs,
+                        topicConfigsStillToValidate,
+                        (brokerSide, streamsSide) -> validateCleanupPolicy(validationResult, brokerSide, streamsSide)
+                    );
+                }
+            }
+
+            maybeThrowTimeoutException(topicDescriptionsStillToValidate, deadline);
+        }
+
+        log.debug("Completed validation of internal topics {}.", topicConfigs.keySet());
+        return validationResult;
+    }
+
+    private <V> void doValidateTopic(final ValidationResult validationResult,
+                                     final Map<String, KafkaFuture<V>> futuresForTopic,
+                                     final Map<String, InternalTopicConfig> topicsConfigs,
+                                     final Set<String> topicsStillToValidate,
+                                     final BiConsumer<InternalTopicConfig, V> validator) {
+        for (final InternalTopicConfig topicConfig : topicsConfigs.values()) {
+            final String topicName = topicConfig.name();
+            if (!futuresForTopic.containsKey(topicName)) {
+                throw new IllegalStateException("Described configs do not contain topics to validate. " + BUG_ERROR_MESSAGE);
+            }
+            final KafkaFuture<V> future = futuresForTopic.get(topicName);
+            if (future.isDone()) {
+                try {
+                    final V brokerSideTopicConfig = future.get();
+                    validator.accept(topicConfig, brokerSideTopicConfig);
+                    topicsStillToValidate.remove(topicName);
+                } catch (final ExecutionException executionException) {
+                    final Throwable cause = executionException.getCause();
+                    if (cause instanceof UnknownTopicOrPartitionException) {
+                        log.debug("Internal topic {} is missing",
+                            topicName);
+                        validationResult.missingTopics.add(topicName);
+                        topicsStillToValidate.remove(topicName);
+                    } else if (cause instanceof LeaderNotAvailableException) {
+                        log.debug("The leader of internal topic {} is not available.", topicName);
+                    } else if (cause instanceof TimeoutException) {
+                        log.debug("Retrieving data for internal topic {} timed out.", topicName);
+                    } else {
+                        log.error("Unexpected error during internal topic validation: ", cause);
+                        throw new StreamsException(
+                            String.format("Could not validate internal topic %s for the following reason: ", topicName),
+                            cause
+                        );
+                    }
+                } catch (final InterruptedException interruptedException) {
+                    throw new InterruptException(interruptedException);
+                } finally {
+                    futuresForTopic.remove(topicName);
+                }
+            }
+        }
+
+        if (!futuresForTopic.isEmpty()) {
+            Utils.sleep(retryBackOffMs);

Review comment:
       That is a good point. I set it to 100 ms and moved the sleep from the `doValidateTopic()` method to the end of the inner while loop since it does not make sense to wait between checking the futures for `describeTopics()` and the futures of `describeConfigs()`.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] cadonna commented on a change in pull request #10266: KAFKA-10357: Add validation method for internal topics

Posted by GitBox <gi...@apache.org>.
cadonna commented on a change in pull request #10266:
URL: https://github.com/apache/kafka/pull/10266#discussion_r591162851



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java
##########
@@ -91,6 +104,225 @@ public InternalTopicManager(final Time time,
         }
     }
 
+    static class ValidationResult {
+        public Set<String> missingTopics = new HashSet<>();
+        public Map<String, List<String>> misconfigurationsForTopics = new HashMap<>();
+    }
+
+    public ValidationResult validate(final Map<String, InternalTopicConfig> topicConfigs) {
+        log.debug("Starting to validate internal topics {}.", topicConfigs.keySet());
+
+        final long now = time.milliseconds();
+        final long deadline = now + retryTimeoutMs;
+
+        final ValidationResult validationResult = new ValidationResult();
+        final Set<String> topicDescriptionsStillToValidate = new HashSet<>(topicConfigs.keySet());
+        final Set<String> topicConfigsStillToValidate = new HashSet<>(topicConfigs.keySet());
+        while (!topicDescriptionsStillToValidate.isEmpty() || !topicConfigsStillToValidate.isEmpty()) {
+            Map<String, KafkaFuture<TopicDescription>> descriptionsForTopic = Collections.emptyMap();
+            if (!topicDescriptionsStillToValidate.isEmpty()) {
+                final DescribeTopicsResult describeTopicsResult = adminClient.describeTopics(topicDescriptionsStillToValidate);
+                descriptionsForTopic = describeTopicsResult.values();
+            }
+            Map<String, KafkaFuture<Config>> configsForTopic = Collections.emptyMap();
+            if (!topicConfigsStillToValidate.isEmpty()) {
+                final DescribeConfigsResult describeConfigsResult = adminClient.describeConfigs(
+                    topicConfigsStillToValidate.stream()
+                        .map(topic -> new ConfigResource(Type.TOPIC, topic))
+                        .collect(Collectors.toSet())
+                );
+                configsForTopic = describeConfigsResult.values().entrySet().stream()
+                    .collect(Collectors.toMap(entry -> entry.getKey().name(), Map.Entry::getValue));
+            }
+
+            while (!descriptionsForTopic.isEmpty() || !configsForTopic.isEmpty()) {
+                if (!descriptionsForTopic.isEmpty()) {
+                    doValidateTopic(
+                        validationResult,
+                        descriptionsForTopic,
+                        topicConfigs,
+                        topicDescriptionsStillToValidate,
+                        (brokerSide, streamsSide) -> validatePartitionCount(validationResult, brokerSide, streamsSide)
+                    );
+                }
+                if (!configsForTopic.isEmpty()) {
+                    doValidateTopic(
+                        validationResult,
+                        configsForTopic,
+                        topicConfigs,
+                        topicConfigsStillToValidate,
+                        (brokerSide, streamsSide) -> validateCleanupPolicy(validationResult, brokerSide, streamsSide)
+                    );
+                }
+            }
+
+            maybeThrowTimeoutException(topicDescriptionsStillToValidate, deadline);
+        }
+
+        log.debug("Completed validation of internal topics {}.", topicConfigs.keySet());
+        return validationResult;
+    }
+
+    private <V> void doValidateTopic(final ValidationResult validationResult,
+                                     final Map<String, KafkaFuture<V>> futuresForTopic,
+                                     final Map<String, InternalTopicConfig> topicsConfigs,
+                                     final Set<String> topicsStillToValidate,
+                                     final BiConsumer<InternalTopicConfig, V> validator) {
+        for (final InternalTopicConfig topicConfig : topicsConfigs.values()) {
+            final String topicName = topicConfig.name();
+            if (!futuresForTopic.containsKey(topicName)) {
+                throw new IllegalStateException("Described configs do not contain topics to validate. " + BUG_ERROR_MESSAGE);
+            }
+            final KafkaFuture<V> future = futuresForTopic.get(topicName);
+            if (future.isDone()) {
+                try {
+                    final V brokerSideTopicConfig = future.get();
+                    validator.accept(topicConfig, brokerSideTopicConfig);
+                    topicsStillToValidate.remove(topicName);
+                } catch (final ExecutionException executionException) {
+                    final Throwable cause = executionException.getCause();
+                    if (cause instanceof UnknownTopicOrPartitionException) {
+                        log.debug("Internal topic {} is missing",
+                            topicName);
+                        validationResult.missingTopics.add(topicName);
+                        topicsStillToValidate.remove(topicName);
+                    } else if (cause instanceof LeaderNotAvailableException) {
+                        log.debug("The leader of internal topic {} is not available.", topicName);
+                    } else if (cause instanceof TimeoutException) {
+                        log.debug("Retrieving data for internal topic {} timed out.", topicName);
+                    } else {
+                        log.error("Unexpected error during internal topic validation: ", cause);
+                        throw new StreamsException(
+                            String.format("Could not validate internal topic %s for the following reason: ", topicName),
+                            cause
+                        );
+                    }
+                } catch (final InterruptedException interruptedException) {
+                    throw new InterruptException(interruptedException);
+                } finally {
+                    futuresForTopic.remove(topicName);
+                }
+            }
+        }
+
+        if (!futuresForTopic.isEmpty()) {
+            Utils.sleep(retryBackOffMs);
+        }
+    }
+
+    private void maybeThrowTimeoutException(final Set<String> topicsStillToValidate,
+                                            final long deadline) {
+        if (!topicsStillToValidate.isEmpty()) {
+            final long now = time.milliseconds();
+            if (now >= deadline) {
+                final String timeoutError = String.format("Could not validate internal topics within %d milliseconds. " +
+                    "This can happen if the Kafka cluster is temporarily not available.", retryTimeoutMs);
+                log.error(timeoutError);
+                throw new TimeoutException(timeoutError);
+            }
+            log.info(
+                "Internal topics {} could not be validated. Will retry in {} milliseconds. Remaining time in milliseconds: {}",
+                topicsStillToValidate,
+                retryBackOffMs,
+                deadline - now
+            );
+            Utils.sleep(retryBackOffMs);
+        }
+    }
+
+    private void validatePartitionCount(final ValidationResult validationResult,
+                                        final InternalTopicConfig topicConfig,
+                                        final TopicDescription topicDescription) {
+        final String topicName = topicConfig.name();
+        final int requiredPartitionCount = topicConfig.numberOfPartitions()
+            .orElseThrow(() -> new IllegalStateException("No partition count is specified for internal topic " +
+                topicName + ". " + BUG_ERROR_MESSAGE));
+        final int actualPartitionCount = topicDescription.partitions().size();
+        if (actualPartitionCount != requiredPartitionCount) {
+            validationResult.misconfigurationsForTopics.computeIfAbsent(
+                topicConfig.name(),
+                ignored -> new ArrayList<>()
+            ).add("Internal topic " + topicName + " requires " + requiredPartitionCount + " partitions, " +
+                "but the existing topic on the broker has " + actualPartitionCount + " partitions.");
+        }
+    }
+
+    private void validateCleanupPolicy(final ValidationResult validationResult,
+                                       final InternalTopicConfig topicConfig,
+                                       final Config brokerSideTopicConfig) {
+        if (topicConfig instanceof UnwindowedChangelogTopicConfig) {
+            validateCleanupPolicyForUnwindowedChangelogs(validationResult, topicConfig, brokerSideTopicConfig);
+        } else if (topicConfig instanceof WindowedChangelogTopicConfig) {
+            validateCleanupPolicyForWindowedChangelogs(validationResult, topicConfig, brokerSideTopicConfig);
+        }
+    }
+
+    private void validateCleanupPolicyForUnwindowedChangelogs(final ValidationResult validationResult,
+                                                              final InternalTopicConfig topicConfig,
+                                                              final Config brokerSideTopicConfig) {
+        final String topicName = topicConfig.name();
+        final String cleanupPolicy = brokerSideTopicConfig.get(TopicConfig.CLEANUP_POLICY_CONFIG).value();
+        if (cleanupPolicy.contains(TopicConfig.CLEANUP_POLICY_DELETE)) {
+            addMisconfiguration(
+                validationResult,
+                topicName,
+                "Cleanup policy of existing internal topic " + topicName + " should not contain \""
+                + TopicConfig.CLEANUP_POLICY_DELETE + "\"."
+            );
+        }
+    }
+
+    private void validateCleanupPolicyForWindowedChangelogs(final ValidationResult validationResult,
+                                                            final InternalTopicConfig topicConfig,
+                                                            final Config brokerSideTopicConfig) {
+        final String topicName = topicConfig.name();
+        final String cleanupPolicy = getBrokerSideConfigValue(brokerSideTopicConfig, TopicConfig.CLEANUP_POLICY_CONFIG, topicName);
+        if (cleanupPolicy.contains(TopicConfig.CLEANUP_POLICY_DELETE)) {
+            final long brokerSideRetentionMs =
+                Long.parseLong(getBrokerSideConfigValue(brokerSideTopicConfig, TopicConfig.RETENTION_MS_CONFIG, topicName));
+            final Map<String, String> streamsSideConfig =

Review comment:
       We check the configs the users provided here. Method `getProperties()` takes the topic configs from the Streams configs and overrides it with the topic configs the user provided in the code. Admittedly, it is not that clear from the code and it can be improved, but I would avoid changing it in this PR. See the unit test `shouldReportMisconfigurationsOfCleanupPolicyForUnwindowedChangelogTopics()` and below for a confirmation.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] cadonna commented on a change in pull request #10266: KAFKA-10357: Add validation method for internal topics

Posted by GitBox <gi...@apache.org>.
cadonna commented on a change in pull request #10266:
URL: https://github.com/apache/kafka/pull/10266#discussion_r591162851



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java
##########
@@ -91,6 +104,225 @@ public InternalTopicManager(final Time time,
         }
     }
 
+    static class ValidationResult {
+        public Set<String> missingTopics = new HashSet<>();
+        public Map<String, List<String>> misconfigurationsForTopics = new HashMap<>();
+    }
+
+    public ValidationResult validate(final Map<String, InternalTopicConfig> topicConfigs) {
+        log.debug("Starting to validate internal topics {}.", topicConfigs.keySet());
+
+        final long now = time.milliseconds();
+        final long deadline = now + retryTimeoutMs;
+
+        final ValidationResult validationResult = new ValidationResult();
+        final Set<String> topicDescriptionsStillToValidate = new HashSet<>(topicConfigs.keySet());
+        final Set<String> topicConfigsStillToValidate = new HashSet<>(topicConfigs.keySet());
+        while (!topicDescriptionsStillToValidate.isEmpty() || !topicConfigsStillToValidate.isEmpty()) {
+            Map<String, KafkaFuture<TopicDescription>> descriptionsForTopic = Collections.emptyMap();
+            if (!topicDescriptionsStillToValidate.isEmpty()) {
+                final DescribeTopicsResult describeTopicsResult = adminClient.describeTopics(topicDescriptionsStillToValidate);
+                descriptionsForTopic = describeTopicsResult.values();
+            }
+            Map<String, KafkaFuture<Config>> configsForTopic = Collections.emptyMap();
+            if (!topicConfigsStillToValidate.isEmpty()) {
+                final DescribeConfigsResult describeConfigsResult = adminClient.describeConfigs(
+                    topicConfigsStillToValidate.stream()
+                        .map(topic -> new ConfigResource(Type.TOPIC, topic))
+                        .collect(Collectors.toSet())
+                );
+                configsForTopic = describeConfigsResult.values().entrySet().stream()
+                    .collect(Collectors.toMap(entry -> entry.getKey().name(), Map.Entry::getValue));
+            }
+
+            while (!descriptionsForTopic.isEmpty() || !configsForTopic.isEmpty()) {
+                if (!descriptionsForTopic.isEmpty()) {
+                    doValidateTopic(
+                        validationResult,
+                        descriptionsForTopic,
+                        topicConfigs,
+                        topicDescriptionsStillToValidate,
+                        (brokerSide, streamsSide) -> validatePartitionCount(validationResult, brokerSide, streamsSide)
+                    );
+                }
+                if (!configsForTopic.isEmpty()) {
+                    doValidateTopic(
+                        validationResult,
+                        configsForTopic,
+                        topicConfigs,
+                        topicConfigsStillToValidate,
+                        (brokerSide, streamsSide) -> validateCleanupPolicy(validationResult, brokerSide, streamsSide)
+                    );
+                }
+            }
+
+            maybeThrowTimeoutException(topicDescriptionsStillToValidate, deadline);
+        }
+
+        log.debug("Completed validation of internal topics {}.", topicConfigs.keySet());
+        return validationResult;
+    }
+
+    private <V> void doValidateTopic(final ValidationResult validationResult,
+                                     final Map<String, KafkaFuture<V>> futuresForTopic,
+                                     final Map<String, InternalTopicConfig> topicsConfigs,
+                                     final Set<String> topicsStillToValidate,
+                                     final BiConsumer<InternalTopicConfig, V> validator) {
+        for (final InternalTopicConfig topicConfig : topicsConfigs.values()) {
+            final String topicName = topicConfig.name();
+            if (!futuresForTopic.containsKey(topicName)) {
+                throw new IllegalStateException("Described configs do not contain topics to validate. " + BUG_ERROR_MESSAGE);
+            }
+            final KafkaFuture<V> future = futuresForTopic.get(topicName);
+            if (future.isDone()) {
+                try {
+                    final V brokerSideTopicConfig = future.get();
+                    validator.accept(topicConfig, brokerSideTopicConfig);
+                    topicsStillToValidate.remove(topicName);
+                } catch (final ExecutionException executionException) {
+                    final Throwable cause = executionException.getCause();
+                    if (cause instanceof UnknownTopicOrPartitionException) {
+                        log.debug("Internal topic {} is missing",
+                            topicName);
+                        validationResult.missingTopics.add(topicName);
+                        topicsStillToValidate.remove(topicName);
+                    } else if (cause instanceof LeaderNotAvailableException) {
+                        log.debug("The leader of internal topic {} is not available.", topicName);
+                    } else if (cause instanceof TimeoutException) {
+                        log.debug("Retrieving data for internal topic {} timed out.", topicName);
+                    } else {
+                        log.error("Unexpected error during internal topic validation: ", cause);
+                        throw new StreamsException(
+                            String.format("Could not validate internal topic %s for the following reason: ", topicName),
+                            cause
+                        );
+                    }
+                } catch (final InterruptedException interruptedException) {
+                    throw new InterruptException(interruptedException);
+                } finally {
+                    futuresForTopic.remove(topicName);
+                }
+            }
+        }
+
+        if (!futuresForTopic.isEmpty()) {
+            Utils.sleep(retryBackOffMs);
+        }
+    }
+
+    private void maybeThrowTimeoutException(final Set<String> topicsStillToValidate,
+                                            final long deadline) {
+        if (!topicsStillToValidate.isEmpty()) {
+            final long now = time.milliseconds();
+            if (now >= deadline) {
+                final String timeoutError = String.format("Could not validate internal topics within %d milliseconds. " +
+                    "This can happen if the Kafka cluster is temporarily not available.", retryTimeoutMs);
+                log.error(timeoutError);
+                throw new TimeoutException(timeoutError);
+            }
+            log.info(
+                "Internal topics {} could not be validated. Will retry in {} milliseconds. Remaining time in milliseconds: {}",
+                topicsStillToValidate,
+                retryBackOffMs,
+                deadline - now
+            );
+            Utils.sleep(retryBackOffMs);
+        }
+    }
+
+    private void validatePartitionCount(final ValidationResult validationResult,
+                                        final InternalTopicConfig topicConfig,
+                                        final TopicDescription topicDescription) {
+        final String topicName = topicConfig.name();
+        final int requiredPartitionCount = topicConfig.numberOfPartitions()
+            .orElseThrow(() -> new IllegalStateException("No partition count is specified for internal topic " +
+                topicName + ". " + BUG_ERROR_MESSAGE));
+        final int actualPartitionCount = topicDescription.partitions().size();
+        if (actualPartitionCount != requiredPartitionCount) {
+            validationResult.misconfigurationsForTopics.computeIfAbsent(
+                topicConfig.name(),
+                ignored -> new ArrayList<>()
+            ).add("Internal topic " + topicName + " requires " + requiredPartitionCount + " partitions, " +
+                "but the existing topic on the broker has " + actualPartitionCount + " partitions.");
+        }
+    }
+
+    private void validateCleanupPolicy(final ValidationResult validationResult,
+                                       final InternalTopicConfig topicConfig,
+                                       final Config brokerSideTopicConfig) {
+        if (topicConfig instanceof UnwindowedChangelogTopicConfig) {
+            validateCleanupPolicyForUnwindowedChangelogs(validationResult, topicConfig, brokerSideTopicConfig);
+        } else if (topicConfig instanceof WindowedChangelogTopicConfig) {
+            validateCleanupPolicyForWindowedChangelogs(validationResult, topicConfig, brokerSideTopicConfig);
+        }
+    }
+
+    private void validateCleanupPolicyForUnwindowedChangelogs(final ValidationResult validationResult,
+                                                              final InternalTopicConfig topicConfig,
+                                                              final Config brokerSideTopicConfig) {
+        final String topicName = topicConfig.name();
+        final String cleanupPolicy = brokerSideTopicConfig.get(TopicConfig.CLEANUP_POLICY_CONFIG).value();
+        if (cleanupPolicy.contains(TopicConfig.CLEANUP_POLICY_DELETE)) {
+            addMisconfiguration(
+                validationResult,
+                topicName,
+                "Cleanup policy of existing internal topic " + topicName + " should not contain \""
+                + TopicConfig.CLEANUP_POLICY_DELETE + "\"."
+            );
+        }
+    }
+
+    private void validateCleanupPolicyForWindowedChangelogs(final ValidationResult validationResult,
+                                                            final InternalTopicConfig topicConfig,
+                                                            final Config brokerSideTopicConfig) {
+        final String topicName = topicConfig.name();
+        final String cleanupPolicy = getBrokerSideConfigValue(brokerSideTopicConfig, TopicConfig.CLEANUP_POLICY_CONFIG, topicName);
+        if (cleanupPolicy.contains(TopicConfig.CLEANUP_POLICY_DELETE)) {
+            final long brokerSideRetentionMs =
+                Long.parseLong(getBrokerSideConfigValue(brokerSideTopicConfig, TopicConfig.RETENTION_MS_CONFIG, topicName));
+            final Map<String, String> streamsSideConfig =

Review comment:
       We check the configs the users provided here. Method `getProperties()` takes the topic configs from the Streams configs and overrides it with the topic configs the user provided in the code. Admittedly, it is not that clear from the code and it can be improved, but I would avoid changing it in this PR. 




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] cadonna commented on a change in pull request #10266: KAFKA-10357: Add validation method for internal topics

Posted by GitBox <gi...@apache.org>.
cadonna commented on a change in pull request #10266:
URL: https://github.com/apache/kafka/pull/10266#discussion_r587817305



##########
File path: clients/src/main/java/org/apache/kafka/clients/admin/DescribeConfigsResult.java
##########
@@ -36,7 +36,7 @@
 
     private final Map<ConfigResource, KafkaFuture<Config>> futures;
 
-    DescribeConfigsResult(Map<ConfigResource, KafkaFuture<Config>> futures) {
+    protected DescribeConfigsResult(Map<ConfigResource, KafkaFuture<Config>> futures) {

Review comment:
       Needed for a mock in the tests.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] cadonna commented on a change in pull request #10266: KAFKA-10357: Add validation method for internal topics

Posted by GitBox <gi...@apache.org>.
cadonna commented on a change in pull request #10266:
URL: https://github.com/apache/kafka/pull/10266#discussion_r591234992



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java
##########
@@ -91,6 +104,225 @@ public InternalTopicManager(final Time time,
         }
     }
 
+    static class ValidationResult {
+        public Set<String> missingTopics = new HashSet<>();
+        public Map<String, List<String>> misconfigurationsForTopics = new HashMap<>();
+    }
+
+    public ValidationResult validate(final Map<String, InternalTopicConfig> topicConfigs) {
+        log.debug("Starting to validate internal topics {}.", topicConfigs.keySet());
+
+        final long now = time.milliseconds();
+        final long deadline = now + retryTimeoutMs;
+
+        final ValidationResult validationResult = new ValidationResult();
+        final Set<String> topicDescriptionsStillToValidate = new HashSet<>(topicConfigs.keySet());
+        final Set<String> topicConfigsStillToValidate = new HashSet<>(topicConfigs.keySet());
+        while (!topicDescriptionsStillToValidate.isEmpty() || !topicConfigsStillToValidate.isEmpty()) {
+            Map<String, KafkaFuture<TopicDescription>> descriptionsForTopic = Collections.emptyMap();
+            if (!topicDescriptionsStillToValidate.isEmpty()) {
+                final DescribeTopicsResult describeTopicsResult = adminClient.describeTopics(topicDescriptionsStillToValidate);
+                descriptionsForTopic = describeTopicsResult.values();
+            }
+            Map<String, KafkaFuture<Config>> configsForTopic = Collections.emptyMap();
+            if (!topicConfigsStillToValidate.isEmpty()) {
+                final DescribeConfigsResult describeConfigsResult = adminClient.describeConfigs(
+                    topicConfigsStillToValidate.stream()
+                        .map(topic -> new ConfigResource(Type.TOPIC, topic))
+                        .collect(Collectors.toSet())
+                );
+                configsForTopic = describeConfigsResult.values().entrySet().stream()
+                    .collect(Collectors.toMap(entry -> entry.getKey().name(), Map.Entry::getValue));
+            }
+
+            while (!descriptionsForTopic.isEmpty() || !configsForTopic.isEmpty()) {
+                if (!descriptionsForTopic.isEmpty()) {
+                    doValidateTopic(
+                        validationResult,
+                        descriptionsForTopic,
+                        topicConfigs,
+                        topicDescriptionsStillToValidate,
+                        (brokerSide, streamsSide) -> validatePartitionCount(validationResult, brokerSide, streamsSide)
+                    );
+                }
+                if (!configsForTopic.isEmpty()) {
+                    doValidateTopic(
+                        validationResult,
+                        configsForTopic,
+                        topicConfigs,
+                        topicConfigsStillToValidate,
+                        (brokerSide, streamsSide) -> validateCleanupPolicy(validationResult, brokerSide, streamsSide)
+                    );
+                }
+            }
+
+            maybeThrowTimeoutException(topicDescriptionsStillToValidate, deadline);
+        }
+
+        log.debug("Completed validation of internal topics {}.", topicConfigs.keySet());
+        return validationResult;
+    }
+
+    private <V> void doValidateTopic(final ValidationResult validationResult,
+                                     final Map<String, KafkaFuture<V>> futuresForTopic,
+                                     final Map<String, InternalTopicConfig> topicsConfigs,
+                                     final Set<String> topicsStillToValidate,
+                                     final BiConsumer<InternalTopicConfig, V> validator) {
+        for (final InternalTopicConfig topicConfig : topicsConfigs.values()) {
+            final String topicName = topicConfig.name();
+            if (!futuresForTopic.containsKey(topicName)) {
+                throw new IllegalStateException("Described configs do not contain topics to validate. " + BUG_ERROR_MESSAGE);
+            }
+            final KafkaFuture<V> future = futuresForTopic.get(topicName);
+            if (future.isDone()) {
+                try {
+                    final V brokerSideTopicConfig = future.get();
+                    validator.accept(topicConfig, brokerSideTopicConfig);
+                    topicsStillToValidate.remove(topicName);
+                } catch (final ExecutionException executionException) {
+                    final Throwable cause = executionException.getCause();
+                    if (cause instanceof UnknownTopicOrPartitionException) {
+                        log.debug("Internal topic {} is missing",
+                            topicName);
+                        validationResult.missingTopics.add(topicName);
+                        topicsStillToValidate.remove(topicName);
+                    } else if (cause instanceof LeaderNotAvailableException) {
+                        log.debug("The leader of internal topic {} is not available.", topicName);
+                    } else if (cause instanceof TimeoutException) {
+                        log.debug("Retrieving data for internal topic {} timed out.", topicName);
+                    } else {
+                        log.error("Unexpected error during internal topic validation: ", cause);
+                        throw new StreamsException(
+                            String.format("Could not validate internal topic %s for the following reason: ", topicName),
+                            cause
+                        );
+                    }
+                } catch (final InterruptedException interruptedException) {
+                    throw new InterruptException(interruptedException);
+                } finally {
+                    futuresForTopic.remove(topicName);
+                }
+            }
+        }
+
+        if (!futuresForTopic.isEmpty()) {
+            Utils.sleep(retryBackOffMs);
+        }
+    }
+
+    private void maybeThrowTimeoutException(final Set<String> topicsStillToValidate,
+                                            final long deadline) {
+        if (!topicsStillToValidate.isEmpty()) {
+            final long now = time.milliseconds();
+            if (now >= deadline) {
+                final String timeoutError = String.format("Could not validate internal topics within %d milliseconds. " +
+                    "This can happen if the Kafka cluster is temporarily not available.", retryTimeoutMs);
+                log.error(timeoutError);
+                throw new TimeoutException(timeoutError);
+            }
+            log.info(
+                "Internal topics {} could not be validated. Will retry in {} milliseconds. Remaining time in milliseconds: {}",
+                topicsStillToValidate,
+                retryBackOffMs,
+                deadline - now
+            );
+            Utils.sleep(retryBackOffMs);
+        }
+    }
+
+    private void validatePartitionCount(final ValidationResult validationResult,
+                                        final InternalTopicConfig topicConfig,
+                                        final TopicDescription topicDescription) {
+        final String topicName = topicConfig.name();
+        final int requiredPartitionCount = topicConfig.numberOfPartitions()
+            .orElseThrow(() -> new IllegalStateException("No partition count is specified for internal topic " +
+                topicName + ". " + BUG_ERROR_MESSAGE));
+        final int actualPartitionCount = topicDescription.partitions().size();
+        if (actualPartitionCount != requiredPartitionCount) {
+            validationResult.misconfigurationsForTopics.computeIfAbsent(
+                topicConfig.name(),
+                ignored -> new ArrayList<>()
+            ).add("Internal topic " + topicName + " requires " + requiredPartitionCount + " partitions, " +
+                "but the existing topic on the broker has " + actualPartitionCount + " partitions.");
+        }
+    }
+
+    private void validateCleanupPolicy(final ValidationResult validationResult,
+                                       final InternalTopicConfig topicConfig,
+                                       final Config brokerSideTopicConfig) {
+        if (topicConfig instanceof UnwindowedChangelogTopicConfig) {
+            validateCleanupPolicyForUnwindowedChangelogs(validationResult, topicConfig, brokerSideTopicConfig);
+        } else if (topicConfig instanceof WindowedChangelogTopicConfig) {
+            validateCleanupPolicyForWindowedChangelogs(validationResult, topicConfig, brokerSideTopicConfig);
+        }
+    }
+
+    private void validateCleanupPolicyForUnwindowedChangelogs(final ValidationResult validationResult,
+                                                              final InternalTopicConfig topicConfig,
+                                                              final Config brokerSideTopicConfig) {
+        final String topicName = topicConfig.name();
+        final String cleanupPolicy = brokerSideTopicConfig.get(TopicConfig.CLEANUP_POLICY_CONFIG).value();
+        if (cleanupPolicy.contains(TopicConfig.CLEANUP_POLICY_DELETE)) {
+            addMisconfiguration(
+                validationResult,
+                topicName,
+                "Cleanup policy of existing internal topic " + topicName + " should not contain \""
+                + TopicConfig.CLEANUP_POLICY_DELETE + "\"."
+            );
+        }
+    }
+
+    private void validateCleanupPolicyForWindowedChangelogs(final ValidationResult validationResult,
+                                                            final InternalTopicConfig topicConfig,
+                                                            final Config brokerSideTopicConfig) {
+        final String topicName = topicConfig.name();
+        final String cleanupPolicy = getBrokerSideConfigValue(brokerSideTopicConfig, TopicConfig.CLEANUP_POLICY_CONFIG, topicName);
+        if (cleanupPolicy.contains(TopicConfig.CLEANUP_POLICY_DELETE)) {
+            final long brokerSideRetentionMs =
+                Long.parseLong(getBrokerSideConfigValue(brokerSideTopicConfig, TopicConfig.RETENTION_MS_CONFIG, topicName));
+            final Map<String, String> streamsSideConfig =
+                topicConfig.getProperties(defaultTopicConfigs, windowChangeLogAdditionalRetention);
+            final long streamsSideRetentionMs = Long.parseLong(streamsSideConfig.get(TopicConfig.RETENTION_MS_CONFIG));
+            if (brokerSideRetentionMs < streamsSideRetentionMs) {
+                addMisconfiguration(
+                    validationResult,
+                    topicName,
+                    "Retention time of existing internal topic " + topicName + " is " + brokerSideRetentionMs +
+                    " but should be " + streamsSideRetentionMs + " or larger."
+                );
+            }
+            final String brokerSideRetentionBytes =
+                getBrokerSideConfigValue(brokerSideTopicConfig, TopicConfig.RETENTION_BYTES_CONFIG, topicName);
+            if (brokerSideRetentionBytes != null) {
+                addMisconfiguration(
+                    validationResult,
+                    topicName,
+                    "Retention byte of existing internal topic " + topicName + " is set but it should be unset."
+                );
+            }
+        }
+    }
+
+    private String getBrokerSideConfigValue(final Config brokerSideTopicConfig,
+                                            final String configName,
+                                            final String topicName) {
+        final ConfigEntry brokerSideConfigEntry = brokerSideTopicConfig.get(configName);
+        if (brokerSideConfigEntry == null) {
+            throw new IllegalStateException("The config " + configName + " for topic " +

Review comment:
       Yes, will do!




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org