Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2020/06/07 22:03:04 UTC

[GitHub] [kafka] rhauch opened a new pull request #8828: KAFKA-9216: Enforce that Connect’s internal topics use `compact` cleanup policy

rhauch opened a new pull request #8828:
URL: https://github.com/apache/kafka/pull/8828


   Supplements #8270 
   
   This change adds a check to `KafkaConfigBackingStore`, `KafkaOffsetBackingStore`, and `KafkaStatusBackingStore` that uses the admin client to verify that the internal topics are compacted and do not use the `delete` cleanup policy.
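A minimal, dependency-free sketch of the decision this check makes. It assumes the topic's `cleanup.policy` has already been read through the admin client (for example with `Admin.describeConfigs`) and parsed into a set of lowercase policy names. `CleanupPolicyCheck` and `verifyOnlyCompact` are hypothetical names, and `IllegalStateException` stands in for the `ConfigException` that Connect throws.

```java
import java.util.Collections;
import java.util.Set;

// Hypothetical helper illustrating the three outcomes of the cleanup-policy check.
class CleanupPolicyCheck {
    static final String COMPACT = "compact";

    // Returns true if the policies are exactly {compact}, false if they could not be
    // determined (empty set), and throws if the topic uses any other policy.
    // IllegalStateException is a stand-in for Connect's ConfigException.
    static boolean verifyOnlyCompact(String topic, Set<String> policies) {
        if (policies.isEmpty()) {
            // Topic config unavailable: missing topic, old broker, or insufficient ACLs
            return false;
        }
        if (!policies.equals(Collections.singleton(COMPACT))) {
            throw new IllegalStateException("Topic '" + topic + "' must have cleanup.policy="
                    + COMPACT + " but has cleanup.policy=" + String.join(",", policies));
        }
        return true;
    }

    public static void main(String[] args) {
        System.out.println(verifyOnlyCompact("connect-configs", Set.of("compact"))); // true
        System.out.println(verifyOnlyCompact("connect-configs", Set.of()));          // false
    }
}
```

The three outcomes mirror the contract in the reviewed Javadoc: `true` when the setting was verified, `false` when it could not be read, and an exception when the topic uses any other policy.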
   
   Connect already creates the internal topics with `cleanup.policy=compact` if they do not yet exist when the Connect workers start; newly created topics are always compacted, overriding any user-specified `cleanup.policy`. However, if the topics already existed (for example, because a user manually created them before starting Connect or changed their settings afterward), the worker previously did not verify that they were compacted.
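The create-time half of this behavior can be sketched as copying any user-supplied topic settings and then unconditionally setting `cleanup.policy=compact`. This is an illustrative helper with hypothetical names, not Connect's actual implementation. Because it only applies when Connect itself creates the topic, it cannot protect topics that already exist, which is the gap the new check fills.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper: builds the settings for a newly created internal topic.
class InternalTopicConfigs {
    static final String CLEANUP_POLICY = "cleanup.policy";

    // Copy the user-supplied settings, then force compaction so that a user-specified
    // cleanup.policy (e.g. "delete") cannot take effect on a newly created internal topic.
    // The input map is left unchanged.
    static Map<String, String> forceCompact(Map<String, String> userSettings) {
        Map<String, String> result = new HashMap<>(userSettings);
        result.put(CLEANUP_POLICY, "compact");
        return result;
    }

    public static void main(String[] args) {
        Map<String, String> user = new HashMap<>();
        user.put("cleanup.policy", "delete");
        user.put("min.insync.replicas", "2");
        System.out.println(forceCompact(user).get("cleanup.policy")); // compact
    }
}
```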
   
   This change helps guard against running Connect with internal topics that have the `delete` cleanup policy enabled; with that policy, the broker removes all connector configurations, source offsets, and connector and task statuses that are older than the retention time. For example, the broker could delete the configuration of a long-running connector, which would cause problems upon a subsequent rebalance or restart of the Connect worker(s).
   
   Connect requires its internal topics to be compacted rather than having records deleted after a retention period. This check therefore only enforces existing expectations and does not need a KIP.
   
   Added unit tests for the new logic, and an integration test verifying that the worker fails to start if any of the three internal topics has a `cleanup.policy` other than exactly `compact`.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] rhauch merged pull request #8828: KAFKA-9216: Enforce that Connect’s internal topics use `compact` cleanup policy

Posted by GitBox <gi...@apache.org>.
rhauch merged pull request #8828:
URL: https://github.com/apache/kafka/pull/8828


   





[GitHub] [kafka] rhauch commented on a change in pull request #8828: KAFKA-9216: Enforce that Connect’s internal topics use `compact` cleanup policy

Posted by GitBox <gi...@apache.org>.
rhauch commented on a change in pull request #8828:
URL: https://github.com/apache/kafka/pull/8828#discussion_r438412308



##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/util/TopicAdmin.java
##########
@@ -375,6 +383,152 @@ public boolean createTopic(NewTopic topic) {
         return existingTopics;
     }
 
+    /**
+     * Verify the named topic uses only compaction for the cleanup policy.
+     *
+     * @param topic             the name of the topic
+     * @param workerTopicConfig the name of the worker configuration that specifies the topic name
+     * @return true if the admin client could be used to verify the topic setting, or false if
+     *         the verification could not be performed, likely because the admin client principal
+     *         did not have the required permissions or because the broker was older than 0.11.0.0
+     * @throws ConfigException if the actual topic setting did not match the required setting
+     */
+    public boolean verifyTopicCleanupPolicyOnlyCompact(String topic, String workerTopicConfig,
+            String topicPurpose) {
+        Set<String> cleanupPolicies = topicCleanupPolicy(topic);
+        if (cleanupPolicies == null || cleanupPolicies.isEmpty()) {

Review comment:
       After some testing, I've discovered that even if the topic-specific settings don't explicitly specify the `cleanup.policy`, the config description for the topic returned by the admin client will still include `cleanup.policy=<broker's log.cleanup.policy>`. That means if we have permissions and the topic exists, we will always get the `cleanup.policy` for the topic.
   
   I've also cleaned up the semantics of the `topicCleanupPolicy(...)` method so that it never returns null. If the returned set is empty, either the topic doesn't exist or the admin client could not return the topic settings (e.g., old broker, insufficient ACLs, etc.). Either way, this simplifies the logic a bit.
   
   I hope that helps address your concern/question.
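The never-null contract described above can be sketched as follows, assuming the raw value of the topic's `cleanup.policy` entry is passed in and may be `null` when no entry was returned. The split, trim, and lowercase steps follow the later revision of `topicCleanupPolicy(...)` quoted in this thread; the class name is hypothetical, and lowercasing with `Locale.ROOT` is a choice of this sketch rather than something the PR shows.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.Locale;
import java.util.Set;
import java.util.stream.Collectors;

// Hypothetical parser for a cleanup.policy value such as "compact" or "compact,delete".
class CleanupPolicyParser {
    // Never returns null. An empty set means "unknown": the topic is missing
    // or its configuration could not be read.
    static Set<String> parse(String rawValue) {
        if (rawValue == null) {
            return Collections.emptySet();
        }
        return Arrays.stream(rawValue.split(","))
                .map(String::trim)
                .filter(s -> !s.isEmpty())
                .map(s -> s.toLowerCase(Locale.ROOT))
                .collect(Collectors.toSet());
    }

    public static void main(String[] args) {
        System.out.println(parse(" Compact , delete ").size()); // 2
        System.out.println(parse(null).isEmpty());              // true
    }
}
```

Callers then need a single `isEmpty()` check instead of separate `null` and empty checks.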







[GitHub] [kafka] C0urante commented on a change in pull request #8828: KAFKA-9216: Enforce that Connect’s internal topics use `compact` cleanup policy

Posted by GitBox <gi...@apache.org>.
C0urante commented on a change in pull request #8828:
URL: https://github.com/apache/kafka/pull/8828#discussion_r438303698



##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/util/TopicAdmin.java
##########
@@ -375,6 +383,152 @@ public boolean createTopic(NewTopic topic) {
         return existingTopics;
     }
 
+    /**
+     * Verify the named topic uses only compaction for the cleanup policy.
+     *
+     * @param topic             the name of the topic
+     * @param workerTopicConfig the name of the worker configuration that specifies the topic name
+     * @return true if the admin client could be used to verify the topic setting, or false if
+     *         the verification could not be performed, likely because the admin client principal
+     *         did not have the required permissions or because the broker was older than 0.11.0.0
+     * @throws ConfigException if the actual topic setting did not match the required setting
+     */
+    public boolean verifyTopicCleanupPolicyOnlyCompact(String topic, String workerTopicConfig,
+            String topicPurpose) {
+        Set<String> cleanupPolicies = topicCleanupPolicy(topic);
+        if (cleanupPolicies == null || cleanupPolicies.isEmpty()) {

Review comment:
       No kidding... I assumed it was possible to create topics without cleanup policies but it looks like you're right. My bad!







[GitHub] [kafka] C0urante commented on a change in pull request #8828: KAFKA-9216: Enforce that Connect’s internal topics use `compact` cleanup policy

Posted by GitBox <gi...@apache.org>.
C0urante commented on a change in pull request #8828:
URL: https://github.com/apache/kafka/pull/8828#discussion_r438417337



##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/util/TopicAdmin.java
##########
@@ -375,6 +383,152 @@ public boolean createTopic(NewTopic topic) {
         return existingTopics;
     }
 
+    /**
+     * Verify the named topic uses only compaction for the cleanup policy.
+     *
+     * @param topic             the name of the topic
+     * @param workerTopicConfig the name of the worker configuration that specifies the topic name
+     * @return true if the admin client could be used to verify the topic setting, or false if
+     *         the verification could not be performed, likely because the admin client principal
+     *         did not have the required permissions or because the broker was older than 0.11.0.0
+     * @throws ConfigException if the actual topic setting did not match the required setting
+     */
+    public boolean verifyTopicCleanupPolicyOnlyCompact(String topic, String workerTopicConfig,
+            String topicPurpose) {
+        Set<String> cleanupPolicies = topicCleanupPolicy(topic);
+        if (cleanupPolicies == null || cleanupPolicies.isEmpty()) {

Review comment:
       I think my concern was invalid to begin with, but your refactoring is certainly an improvement. LGTM 👍 







[GitHub] [kafka] rhauch commented on pull request #8828: KAFKA-9216: Enforce that Connect’s internal topics use `compact` cleanup policy

Posted by GitBox <gi...@apache.org>.
rhauch commented on pull request #8828:
URL: https://github.com/apache/kafka/pull/8828#issuecomment-642272317


   @kkonstantine and @C0urante: thanks for the review. I think I've incorporated all of your feedback and addressed all of your questions. @C0urante, I've even tried to improve the failure message to say what needs to be done if the topic has an unacceptable `cleanup.policy`.
   
   I'd appreciate another pass. Thanks!





[GitHub] [kafka] kkonstantine commented on a change in pull request #8828: KAFKA-9216: Enforce that Connect’s internal topics use `compact` cleanup policy

Posted by GitBox <gi...@apache.org>.
kkonstantine commented on a change in pull request #8828:
URL: https://github.com/apache/kafka/pull/8828#discussion_r438488444



##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/util/TopicAdmin.java
##########
@@ -375,6 +383,162 @@ public boolean createTopic(NewTopic topic) {
         return existingTopics;
     }
 
+    /**
+     * Verify the named topic uses only compaction for the cleanup policy.
+     *
+     * @param topic             the name of the topic
+     * @param workerTopicConfig the name of the worker configuration that specifies the topic name
+     * @return true if the admin client could be used to verify the topic setting, or false if
+     *         the verification could not be performed, likely because the admin client principal
+     *         did not have the required permissions or because the broker was older than 0.11.0.0
+     * @throws ConfigException if the actual topic setting did not match the required setting
+     */
+    public boolean verifyTopicCleanupPolicyOnlyCompact(String topic, String workerTopicConfig,
+            String topicPurpose) {
+        Set<String> cleanupPolicies = topicCleanupPolicy(topic);
+        if (cleanupPolicies.isEmpty()) {
+            log.debug("Unable to use admin client to verify the cleanup policy of '{}' "
+                      + "topic is '{}', either because the broker is an older "
+                      + "version or because the Kafka principal used for Connect "
+                      + "internal topics does not have the required permission to "
+                      + "describe topic configurations.", topic, TopicConfig.CLEANUP_POLICY_COMPACT);
+            return false;
+        }
+        Set<String> expectedPolicies = Collections.singleton(TopicConfig.CLEANUP_POLICY_COMPACT);
+        if (!cleanupPolicies.equals(expectedPolicies)) {
+            String expectedPolicyStr = String.join(",", expectedPolicies);
+            String cleanupPolicyStr = String.join(",", cleanupPolicies);
+            String msg = String.format("Topic '%s' supplied via the '%s' property is required "
+                    + "to have '%s=%s' to guarantee consistency and durability of "
+                    + "%s, but found the topic currently has '%s=%s'. Continuing would likely "
+                    + "result in eventually losing %s and problems restarting this Connect "
+                    + "cluster in the future. Change the '%s' property in the "
+                    + "Connect worker configurations to use a topic with '%s=%s'.",
+                    topic, workerTopicConfig, TopicConfig.CLEANUP_POLICY_CONFIG, expectedPolicyStr,
+                    topicPurpose, TopicConfig.CLEANUP_POLICY_CONFIG, cleanupPolicyStr, topicPurpose,
+                    workerTopicConfig, TopicConfig.CLEANUP_POLICY_CONFIG, expectedPolicyStr);
+            throw new ConfigException(msg);
+        }
+        return true;
+    }
+
+    /**
+     * Get the cleanup policy for a topic.
+     *
+     * @param topic the name of the topic
+     * @return the set of cleanup policies set for the topic; may be empty if the topic does not
+     *         exist or the topic's cleanup policy could not be retrieved
+     */
+    public Set<String> topicCleanupPolicy(String topic) {
+        Config topicConfig = describeTopicConfig(topic);
+        if (topicConfig == null) {
+            // The topic must not exist
+            log.debug("Unable to find topic '{}' when getting cleanup policy", topic);
+            return Collections.emptySet();
+        }
+        ConfigEntry entry = topicConfig.get(CLEANUP_POLICY_CONFIG);
+        if (entry != null && entry.value() != null) {
+            String policyStr = entry.value();
+            log.debug("Found cleanup.policy={} for topic '{}'", policyStr, topic);

Review comment:
       Same question about the log level as on the `log.debug` call in `verifyTopicCleanupPolicyOnlyCompact(...)`: should this be `info`?

##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/util/TopicAdmin.java
##########
@@ -375,6 +383,162 @@ public boolean createTopic(NewTopic topic) {
         return existingTopics;
     }
 
+    /**
+     * Verify the named topic uses only compaction for the cleanup policy.
+     *
+     * @param topic             the name of the topic
+     * @param workerTopicConfig the name of the worker configuration that specifies the topic name
+     * @return true if the admin client could be used to verify the topic setting, or false if
+     *         the verification could not be performed, likely because the admin client principal
+     *         did not have the required permissions or because the broker was older than 0.11.0.0
+     * @throws ConfigException if the actual topic setting did not match the required setting
+     */
+    public boolean verifyTopicCleanupPolicyOnlyCompact(String topic, String workerTopicConfig,
+            String topicPurpose) {
+        Set<String> cleanupPolicies = topicCleanupPolicy(topic);
+        if (cleanupPolicies.isEmpty()) {
+            log.debug("Unable to use admin client to verify the cleanup policy of '{}' "

Review comment:
       Should we consider `info`? It's a one-time message, right?

##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/util/TopicAdmin.java
##########
@@ -375,6 +383,162 @@ public boolean createTopic(NewTopic topic) {
         return existingTopics;
     }
 
+    /**
+     * Verify the named topic uses only compaction for the cleanup policy.
+     *
+     * @param topic             the name of the topic
+     * @param workerTopicConfig the name of the worker configuration that specifies the topic name
+     * @return true if the admin client could be used to verify the topic setting, or false if
+     *         the verification could not be performed, likely because the admin client principal
+     *         did not have the required permissions or because the broker was older than 0.11.0.0
+     * @throws ConfigException if the actual topic setting did not match the required setting
+     */
+    public boolean verifyTopicCleanupPolicyOnlyCompact(String topic, String workerTopicConfig,
+            String topicPurpose) {
+        Set<String> cleanupPolicies = topicCleanupPolicy(topic);
+        if (cleanupPolicies.isEmpty()) {
+            log.debug("Unable to use admin client to verify the cleanup policy of '{}' "
+                      + "topic is '{}', either because the broker is an older "
+                      + "version or because the Kafka principal used for Connect "
+                      + "internal topics does not have the required permission to "
+                      + "describe topic configurations.", topic, TopicConfig.CLEANUP_POLICY_COMPACT);
+            return false;
+        }
+        Set<String> expectedPolicies = Collections.singleton(TopicConfig.CLEANUP_POLICY_COMPACT);
+        if (!cleanupPolicies.equals(expectedPolicies)) {
+            String expectedPolicyStr = String.join(",", expectedPolicies);
+            String cleanupPolicyStr = String.join(",", cleanupPolicies);
+            String msg = String.format("Topic '%s' supplied via the '%s' property is required "
+                    + "to have '%s=%s' to guarantee consistency and durability of "
+                    + "%s, but found the topic currently has '%s=%s'. Continuing would likely "
+                    + "result in eventually losing %s and problems restarting this Connect "
+                    + "cluster in the future. Change the '%s' property in the "
+                    + "Connect worker configurations to use a topic with '%s=%s'.",
+                    topic, workerTopicConfig, TopicConfig.CLEANUP_POLICY_CONFIG, expectedPolicyStr,
+                    topicPurpose, TopicConfig.CLEANUP_POLICY_CONFIG, cleanupPolicyStr, topicPurpose,
+                    workerTopicConfig, TopicConfig.CLEANUP_POLICY_CONFIG, expectedPolicyStr);
+            throw new ConfigException(msg);
+        }
+        return true;
+    }
+
+    /**
+     * Get the cleanup policy for a topic.
+     *
+     * @param topic the name of the topic
+     * @return the set of cleanup policies set for the topic; may be empty if the topic does not
+     *         exist or the topic's cleanup policy could not be retrieved
+     */
+    public Set<String> topicCleanupPolicy(String topic) {
+        Config topicConfig = describeTopicConfig(topic);
+        if (topicConfig == null) {
+            // The topic must not exist
+            log.debug("Unable to find topic '{}' when getting cleanup policy", topic);
+            return Collections.emptySet();
+        }
+        ConfigEntry entry = topicConfig.get(CLEANUP_POLICY_CONFIG);
+        if (entry != null && entry.value() != null) {
+            String policyStr = entry.value();
+            log.debug("Found cleanup.policy={} for topic '{}'", policyStr, topic);
+            return Arrays.stream(policyStr.split(","))
+                         .map(String::trim)
+                         .filter(s -> !s.isEmpty())
+                         .map(String::toLowerCase)
+                         .collect(Collectors.toSet());
+        }
+        // This is unexpected, as the topic config should include the cleanup.policy even if
+        // the topic settings don't override the broker's log.cleanup.policy. But just to be safe.
+        log.debug("Found no cleanup.policy for topic '{}'", topic);

Review comment:
       Same question about the log level as on the `log.debug` call in `verifyTopicCleanupPolicyOnlyCompact(...)`: should this be `info`?







[GitHub] [kafka] rhauch commented on a change in pull request #8828: KAFKA-9216: Enforce that Connect’s internal topics use `compact` cleanup policy

Posted by GitBox <gi...@apache.org>.
rhauch commented on a change in pull request #8828:
URL: https://github.com/apache/kafka/pull/8828#discussion_r438292803



##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java
##########
@@ -94,6 +95,7 @@
     protected final StatusBackingStore statusBackingStore;
     protected final ConfigBackingStore configBackingStore;
     private final ConnectorClientConfigOverridePolicy connectorClientConfigOverridePolicy;
+    protected final AtomicBoolean running = new AtomicBoolean(false);

Review comment:
       ack







[GitHub] [kafka] kkonstantine commented on a change in pull request #8828: KAFKA-9216: Enforce that Connect’s internal topics use `compact` cleanup policy

Posted by GitBox <gi...@apache.org>.
kkonstantine commented on a change in pull request #8828:
URL: https://github.com/apache/kafka/pull/8828#discussion_r436421647



##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/util/TopicAdmin.java
##########
@@ -375,6 +383,152 @@ public boolean createTopic(NewTopic topic) {
         return existingTopics;
     }
 
+    /**
+     * Verify the named topic uses only compaction for the cleanup policy.
+     *
+     * @param topic             the name of the topic
+     * @param workerTopicConfig the name of the worker configuration that specifies the topic name
+     * @return true if the admin client could be used to verify the topic setting, or false if
+     *         the verification could not be performed, likely because the admin client principal
+     *         did not have the required permissions or because the broker was older than 0.11.0.0
+     * @throws ConfigException if the actual topic setting did not match the required setting
+     */
+    public boolean verifyTopicCleanupPolicyOnlyCompact(String topic, String workerTopicConfig,
+            String topicPurpose) {
+        Set<String> cleanupPolicies = topicCleanupPolicy(topic);
+        if (cleanupPolicies == null || cleanupPolicies.isEmpty()) {
+            log.debug("Unable to use admin client to verify the cleanup policy of '{}' "
+                      + "topic is '{}', either because the broker is an older "
+                      + "version or because the Kafka principal used for Connect "
+                      + "internal topics does not have the required permission to "
+                      + "describe topic configurations.", topic, TopicConfig.CLEANUP_POLICY_COMPACT);
+            return false;
+        }
+        String cleanupPolicyStr = String.join(",", cleanupPolicies);
+        log.debug("Found cleanup policy for '{}' topic is '{}'", topic, cleanupPolicyStr);
+        Set<String> expectedPolicies = Collections.singleton(TopicConfig.CLEANUP_POLICY_COMPACT);
+        String expectedPolicyStr = String.join(",", expectedPolicies);
+        if (cleanupPolicies != null && !cleanupPolicies.equals(expectedPolicies)) {
+            String msg = String.format("Topic '%s' supplied via the '%s' property is required "
+                                       + "to have '%s=%s' to guarantee consistency and durability of "
+                                       + "%s, but found '%s'. "
+                                       + "Correct the topic before restarting Connect.",

Review comment:
       nit: should we keep the two-tab alignment for everything?

##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java
##########
@@ -94,6 +95,7 @@
     protected final StatusBackingStore statusBackingStore;
     protected final ConfigBackingStore configBackingStore;
     private final ConnectorClientConfigOverridePolicy connectorClientConfigOverridePolicy;
+    protected final AtomicBoolean running = new AtomicBoolean(false);

Review comment:
       For plain reads and writes, a `volatile boolean` gives the same visibility guarantees as `AtomicBoolean`, so if we are not going to use any of the compare-and-set/get-and-set capabilities of the atomic class, maybe you'd want to consider using `volatile` instead to avoid the boilerplate of calling get/set on that boolean variable. But of course, the decision is also a matter of style.
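To illustrate the trade-off: for plain reads and writes from different threads, a `volatile` field provides the same visibility as `AtomicBoolean.get()`/`set()`, while `compareAndSet` is what the atomic class adds, for example to let only the first caller of a start method proceed. `HerderState` is a hypothetical stand-in, not `AbstractHerder`.

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical class contrasting a volatile flag with an AtomicBoolean.
class HerderState {
    // Plain flag: a write by one thread is visible to subsequent reads by other threads.
    private volatile boolean running = false;

    // Atomic flag: supports check-then-act as a single atomic step.
    private final AtomicBoolean started = new AtomicBoolean(false);

    void markRunning(boolean value) {
        running = value;
    }

    boolean isRunning() {
        return running;
    }

    // Returns true only for the first caller. A volatile boolean alone cannot guarantee
    // this, because "if (!flag) flag = true;" is two separate steps that can interleave.
    boolean startOnce() {
        return started.compareAndSet(false, true);
    }

    public static void main(String[] args) {
        HerderState state = new HerderState();
        state.markRunning(true);
        System.out.println(state.isRunning()); // true
        System.out.println(state.startOnce()); // true
        System.out.println(state.startOnce()); // false
    }
}
```

If the flag is only set on start and cleared on stop, the `volatile` field is sufficient; `AtomicBoolean` earns its extra boilerplate only when an atomic check-then-act such as `startOnce()` is needed.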

##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java
##########
@@ -492,7 +493,14 @@ public void putSessionKey(SessionKey sessionKey) {
             public void run() {
                 log.debug("Creating admin client to manage Connect internal config topic");
                 try (TopicAdmin admin = new TopicAdmin(adminProps)) {
-                    admin.createTopics(topicDescription);
+                    // Create the topic if it doesn't exist
+                    Set<String> newTopics = admin.createTopics(topicDescription);

Review comment:
       Varargs have some unintended consequences for naming here. Should we stick to the singular form, given that we expect at most a single topic to be created?
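A sketch of the naming point, using a hypothetical in-memory stand-in for `TopicAdmin` that models topics by name only (the real methods take `NewTopic` descriptions). With a single internal topic, a singular method that returns whether the topic was newly created reads more naturally than a varargs call that returns a set.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical in-memory stand-in for TopicAdmin; topics are modeled by name only.
class FakeTopicAdmin {
    private final Set<String> existing = new HashSet<>();

    // Varargs form: returns the subset of names that were newly created.
    Set<String> createTopics(String... topics) {
        Set<String> created = new HashSet<>();
        for (String topic : topics) {
            if (existing.add(topic)) {
                created.add(topic);
            }
        }
        return created;
    }

    // Singular form: true if the topic was newly created, false if it already existed.
    boolean createTopic(String topic) {
        return !createTopics(topic).isEmpty();
    }

    public static void main(String[] args) {
        FakeTopicAdmin admin = new FakeTopicAdmin();
        boolean created = admin.createTopic("connect-configs");
        if (!created) {
            // The topic already existed, so this is where its cleanup policy would be verified.
            System.out.println("verify existing topic");
        }
        System.out.println(created); // true
    }
}
```

At the call site, a `false` result signals a pre-existing topic whose settings still need to be checked.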

##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/util/TopicAdmin.java
##########
@@ -375,6 +383,152 @@ public boolean createTopic(NewTopic topic) {
         return existingTopics;
     }
 
+    /**
+     * Verify the named topic uses only compaction for the cleanup policy.
+     *
+     * @param topic             the name of the topic
+     * @param workerTopicConfig the name of the worker configuration that specifies the topic name
+     * @return true if the admin client could be used to verify the topic setting, or false if
+     *         the verification could not be performed, likely because the admin client principal
+     *         did not have the required permissions or because the broker was older than 0.11.0.0
+     * @throws ConfigException if the actual topic setting did not match the required setting
+     */
+    public boolean verifyTopicCleanupPolicyOnlyCompact(String topic, String workerTopicConfig,
+            String topicPurpose) {
+        Set<String> cleanupPolicies = topicCleanupPolicy(topic);
+        if (cleanupPolicies == null || cleanupPolicies.isEmpty()) {
+            log.debug("Unable to use admin client to verify the cleanup policy of '{}' "
+                      + "topic is '{}', either because the broker is an older "
+                      + "version or because the Kafka principal used for Connect "
+                      + "internal topics does not have the required permission to "
+                      + "describe topic configurations.", topic, TopicConfig.CLEANUP_POLICY_COMPACT);
+            return false;
+        }
+        String cleanupPolicyStr = String.join(",", cleanupPolicies);
+        log.debug("Found cleanup policy for '{}' topic is '{}'", topic, cleanupPolicyStr);
+        Set<String> expectedPolicies = Collections.singleton(TopicConfig.CLEANUP_POLICY_COMPACT);
+        String expectedPolicyStr = String.join(",", expectedPolicies);
+        if (cleanupPolicies != null && !cleanupPolicies.equals(expectedPolicies)) {
+            String msg = String.format("Topic '%s' supplied via the '%s' property is required "
+                                       + "to have '%s=%s' to guarantee consistency and durability of "
+                                       + "%s, but found '%s'. "
+                                       + "Correct the topic before restarting Connect.",
+                    topic, workerTopicConfig, TopicConfig.CLEANUP_POLICY_CONFIG, expectedPolicyStr,
+                    topicPurpose, cleanupPolicyStr);
+            throw new ConfigException(msg);
+        }
+        return true;
+    }
+
+    /**
+     * Get the cleanup policy for a topic.
+     *
+     * @param topic the name of the topic
+     * @return the set of cleanup policies set for the topic; may be empty if the topic exists

Review comment:
       I wonder whether distinguishing between `null` and empty like this pays off.
   Why not return an empty collection in both cases and simplify the checks on this method's return value?
   
   This method doesn't seem to be the one to use when somebody wants to determine whether a topic exists.
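A minimal sketch of the suggestion above, assuming the described topic configuration is simplified to a plain `Map<String, String>` (null when the topic could not be described) instead of Kafka's `Config`. The class `CleanupPolicySketch` and this signature are hypothetical. Every failure path collapses to an empty set, so callers need only an `isEmpty()` check:

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.Locale;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

final class CleanupPolicySketch {
    // Same key as TopicConfig.CLEANUP_POLICY_CONFIG, inlined to avoid a Kafka dependency.
    static final String CLEANUP_POLICY_CONFIG = "cleanup.policy";

    /**
     * Hypothetical variant of topicCleanupPolicy that never returns null.
     *
     * @param topicConfig simplified stand-in for the described topic configuration;
     *                    null if the topic does not exist or could not be described
     * @return the lower-cased cleanup policies, or an empty set if none could be determined
     */
    static Set<String> topicCleanupPolicy(Map<String, String> topicConfig) {
        if (topicConfig == null) {
            return Collections.emptySet();
        }
        String policyStr = topicConfig.get(CLEANUP_POLICY_CONFIG);
        if (policyStr == null) {
            return Collections.emptySet();
        }
        // Same parsing as the diff (split on commas, trim, lower-case), with an explicit locale.
        return Arrays.stream(policyStr.split(","))
                .map(String::trim)
                .map(s -> s.toLowerCase(Locale.ROOT))
                .collect(Collectors.toSet());
    }
}
```

The trade-off is that a caller can no longer tell a topic whose configuration could not be described apart from one that reports no cleanup policy at all.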

##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/util/TopicAdmin.java
##########
@@ -375,6 +383,152 @@ public boolean createTopic(NewTopic topic) {
         return existingTopics;
     }
 
+    /**
+     * Verify the named topic uses only compaction for the cleanup policy.
+     *
+     * @param topic             the name of the topic
+     * @param workerTopicConfig the name of the worker configuration that specifies the topic name
+     * @return true if the admin client could be used to verify the topic setting, or false if
+     *         the verification could not be performed, likely because the admin client principal
+     *         did not have the required permissions or because the broker was older than 0.11.0.0
+     * @throws ConfigException if the actual topic setting did not match the required setting
+     */
+    public boolean verifyTopicCleanupPolicyOnlyCompact(String topic, String workerTopicConfig,
+            String topicPurpose) {
+        Set<String> cleanupPolicies = topicCleanupPolicy(topic);
+        if (cleanupPolicies == null || cleanupPolicies.isEmpty()) {
+            log.debug("Unable to use admin client to verify the cleanup policy of '{}' "
+                      + "topic is '{}', either because the broker is an older "
+                      + "version or because the Kafka principal used for Connect "
+                      + "internal topics does not have the required permission to "
+                      + "describe topic configurations.", topic, TopicConfig.CLEANUP_POLICY_COMPACT);
+            return false;
+        }
+        String cleanupPolicyStr = String.join(",", cleanupPolicies);
+        log.debug("Found cleanup policy for '{}' topic is '{}'", topic, cleanupPolicyStr);
+        Set<String> expectedPolicies = Collections.singleton(TopicConfig.CLEANUP_POLICY_COMPACT);
+        String expectedPolicyStr = String.join(",", expectedPolicies);
+        if (cleanupPolicies != null && !cleanupPolicies.equals(expectedPolicies)) {
+            String msg = String.format("Topic '%s' supplied via the '%s' property is required "
+                                       + "to have '%s=%s' to guarantee consistency and durability of "
+                                       + "%s, but found '%s'. "
+                                       + "Correct the topic before restarting Connect.",
+                    topic, workerTopicConfig, TopicConfig.CLEANUP_POLICY_CONFIG, expectedPolicyStr,
+                    topicPurpose, cleanupPolicyStr);
+            throw new ConfigException(msg);
+        }
+        return true;
+    }
+
+    /**
+     * Get the cleanup policy for a topic.
+     *
+     * @param topic the name of the topic
+     * @return the set of cleanup policies set for the topic; may be empty if the topic exists
+     *         but has no cleanup policy, or may be null if the topic does not exist
+     */
+    public Set<String> topicCleanupPolicy(String topic) {
+        Config topicConfig = describeTopicConfig(topic);
+        if (topicConfig == null) {
+            return null;
+        }
+        ConfigEntry entry = topicConfig.get(CLEANUP_POLICY_CONFIG);
+        if (entry != null && entry.value() != null) {
+            String policyStr = entry.value();
+            return Arrays.stream(policyStr.split(","))
+                         .map(String::trim)
+                         .map(String::toLowerCase)
+                         .collect(Collectors.toSet());
+        }
+        return Collections.emptySet();
+    }
+
+    /**
+     * Attempt to fetch the topic configuration for the given topic.
+     * Apache Kafka added support for describing topic configurations in 0.11.0.0, so this method
+     * works as expected with that and later versions. With brokers older than 0.11.0.0, this method
+     * is unable to get the topic configurations and always returns a null value.
+     *
+     * <p>If the topic does not exist, a null value is returned.
+     *
+     * @param topic the name of the topic for which the topic configuration should be obtained
+     * @return the topic configuration, or null if the topic does not exist or its configuration could not be described
+     * @throws RetriableException if a retriable error occurs, the operation takes too long, or the
+     *         thread is interrupted while attempting to perform this operation
+     * @throws ConnectException if a non retriable error occurs
+     */
+    public Config describeTopicConfig(String topic) {
+        return describeTopicConfigs(topic).get(topic);
+    }
+
+    /**
+     * Attempt to fetch the topic configurations for the given topics.
+     * Apache Kafka added support for describing topic configurations in 0.11.0.0, so this method
+     * works as expected with that and later versions. With brokers older than 0.11.0.0, this method
+     * is unable to get the topic configurations and always returns an empty map.
+     *
+     * <p>An entry with a null Config is placed into the resulting map for any topic that does
+     * not exist on the brokers.
+     *
+     * @param topicNames the topics to obtain configurations
+     * @return a map from topic name to configuration; empty if no topic configurations could be described
+     * @throws RetriableException if a retriable error occurs, the operation takes too long, or the
+     *         thread is interrupted while attempting to perform this operation
+     * @throws ConnectException if a non retriable error occurs
+     */
+    public Map<String, Config> describeTopicConfigs(String...topicNames) {

Review comment:
       nit: shouldn't there be a space between the varargs type and the parameter name (`String... topicNames`)? I'm mainly surprised that checkstyle didn't flag it.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] C0urante commented on a change in pull request #8828: KAFKA-9216: Enforce that Connect’s internal topics use `compact` cleanup policy

Posted by GitBox <gi...@apache.org>.
C0urante commented on a change in pull request #8828:
URL: https://github.com/apache/kafka/pull/8828#discussion_r436881026



##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/util/TopicAdmin.java
##########
@@ -375,6 +383,152 @@ public boolean createTopic(NewTopic topic) {
         return existingTopics;
     }
 
+    /**
+     * Verify the named topic uses only compaction for the cleanup policy.
+     *
+     * @param topic             the name of the topic
+     * @param workerTopicConfig the name of the worker configuration that specifies the topic name
+     * @return true if the admin client could be used to verify the topic setting, or false if
+     *         the verification could not be performed, likely because the admin client principal
+     *         did not have the required permissions or because the broker was older than 0.11.0.0
+     * @throws ConfigException if the actual topic setting did not match the required setting
+     */
+    public boolean verifyTopicCleanupPolicyOnlyCompact(String topic, String workerTopicConfig,
+            String topicPurpose) {
+        Set<String> cleanupPolicies = topicCleanupPolicy(topic);
+        if (cleanupPolicies == null || cleanupPolicies.isEmpty()) {

Review comment:
       Is it possible that this will also be true if there isn't a cleanup policy configured on the topic?
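One way to keep those cases apart is a three-outcome check. This is a hypothetical sketch, not the PR's code: it assumes the caller passes null when the configuration could not be described and an empty set when the topic was described but reported no policy, and it treats the latter as "not compact" because the topic then falls back to the broker's `log.cleanup.policy`, whose default is `delete`:

```java
import java.util.Collections;
import java.util.Set;

final class CleanupPolicyCheckSketch {
    enum Outcome { UNVERIFIED, COMPACT_ONLY, NOT_COMPACT_ONLY }

    private static final Set<String> EXPECTED = Collections.singleton("compact");

    /**
     * Hypothetical check that keeps "could not describe" separate from "no policy reported".
     *
     * @param describedPolicies null if the topic configuration could not be described;
     *                          otherwise the parsed policies, which may be empty
     */
    static Outcome check(Set<String> describedPolicies) {
        if (describedPolicies == null) {
            // Old broker or missing permission: nothing can be concluded about the topic.
            return Outcome.UNVERIFIED;
        }
        // Empty means the topic was described but reported no policy; the broker-level
        // default then applies ("delete" unless changed), so this is not treated as compact.
        return describedPolicies.equals(EXPECTED) ? Outcome.COMPACT_ONLY : Outcome.NOT_COMPACT_ONLY;
    }
}
```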






[GitHub] [kafka] rhauch commented on a change in pull request #8828: KAFKA-9216: Enforce that Connect’s internal topics use `compact` cleanup policy

Posted by GitBox <gi...@apache.org>.
rhauch commented on a change in pull request #8828:
URL: https://github.com/apache/kafka/pull/8828#discussion_r438521713



##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/util/TopicAdmin.java
##########
@@ -375,6 +383,162 @@ public boolean createTopic(NewTopic topic) {
         return existingTopics;
     }
 
+    /**
+     * Verify the named topic uses only compaction for the cleanup policy.
+     *
+     * @param topic             the name of the topic
+     * @param workerTopicConfig the name of the worker configuration that specifies the topic name
+     * @return true if the admin client could be used to verify the topic setting, or false if
+     *         the verification could not be performed, likely because the admin client principal
+     *         did not have the required permissions or because the broker was older than 0.11.0.0
+     * @throws ConfigException if the actual topic setting did not match the required setting
+     */
+    public boolean verifyTopicCleanupPolicyOnlyCompact(String topic, String workerTopicConfig,
+            String topicPurpose) {
+        Set<String> cleanupPolicies = topicCleanupPolicy(topic);
+        if (cleanupPolicies.isEmpty()) {
+            log.debug("Unable to use admin client to verify the cleanup policy of '{}' "

Review comment:
       I could see changing this to `info`, because this is important. But the other log messages really just track that we're using the admin client and what we're finding, so I think `debug` is best there. If the cleanup policy is wrong, we fail the worker anyway, and the exception message says why; if the cleanup policy is acceptable, I don't think it's worth logging at `info`.
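The log-level split described above can be sketched as follows. This is a hypothetical illustration: `java.lang.System.Logger` stands in for the SLF4J logger that `TopicAdmin` uses, `IllegalStateException` stands in for Kafka's `ConfigException`, and `verify` is not the PR's method:

```java
import java.util.Set;

final class CleanupPolicyLoggingSketch {
    // System.Logger stands in for the SLF4J logger used in TopicAdmin.
    private static final System.Logger LOG =
            System.getLogger(CleanupPolicyLoggingSketch.class.getName());

    private static final Set<String> EXPECTED = Set.of("compact");

    /** Hypothetical verification showing the log level chosen for each outcome. */
    static boolean verify(String topic, Set<String> policies) {
        if (policies.isEmpty()) {
            // Operators should notice that the safety check was skipped, so use INFO.
            LOG.log(System.Logger.Level.INFO,
                    "Unable to verify that topic ''{0}'' uses cleanup.policy=compact", topic);
            return false;
        }
        // Routine bookkeeping about what the admin client found: DEBUG is enough.
        LOG.log(System.Logger.Level.DEBUG,
                "Found cleanup policy {0} for topic ''{1}''", policies, topic);
        if (!policies.equals(EXPECTED)) {
            // No extra log line: the exception carries the message and the worker fails.
            throw new IllegalStateException("Topic '" + topic
                    + "' must use cleanup.policy=compact but found " + String.join(",", policies));
        }
        return true;
    }
}
```

The `''` sequences are needed because `System.Logger` formats parameterized messages with `java.text.MessageFormat`, where a single quote starts a quoted section.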



