Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2020/06/08 01:42:07 UTC

[GitHub] [kafka] kkonstantine commented on a change in pull request #8828: KAFKA-9216: Enforce that Connect’s internal topics use `compact` cleanup policy

kkonstantine commented on a change in pull request #8828:
URL: https://github.com/apache/kafka/pull/8828#discussion_r436421647



##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/util/TopicAdmin.java
##########
@@ -375,6 +383,152 @@ public boolean createTopic(NewTopic topic) {
         return existingTopics;
     }
 
+    /**
+     * Verify the named topic uses only compaction for the cleanup policy.
+     *
+     * @param topic             the name of the topic
+     * @param workerTopicConfig the name of the worker configuration that specifies the topic name
+     * @return true if the admin client could be used to verify the topic setting, or false if
+     *         the verification could not be performed, likely because the admin client principal
+     *         did not have the required permissions or because the broker was older than 0.11.0.0
+     * @throws ConfigException if the actual topic setting did not match the required setting
+     */
+    public boolean verifyTopicCleanupPolicyOnlyCompact(String topic, String workerTopicConfig,
+            String topicPurpose) {
+        Set<String> cleanupPolicies = topicCleanupPolicy(topic);
+        if (cleanupPolicies == null || cleanupPolicies.isEmpty()) {
+            log.debug("Unable to use admin client to verify that the cleanup policy of '{}' "
+                      + "topic is '{}', either because the broker is an older "
+                      + "version or because the Kafka principal used for Connect "
+                      + "internal topics does not have the required permission to "
+                      + "describe topic configurations.", topic, TopicConfig.CLEANUP_POLICY_COMPACT);
+            return false;
+        }
+        String cleanupPolicyStr = String.join(",", cleanupPolicies);
+        log.debug("Found cleanup policy for '{}' topic is '{}'", topic, cleanupPolicyStr);
+        Set<String> expectedPolicies = Collections.singleton(TopicConfig.CLEANUP_POLICY_COMPACT);
+        String expectedPolicyStr = String.join(",", expectedPolicies);
+        if (cleanupPolicies != null && !cleanupPolicies.equals(expectedPolicies)) {
+            String msg = String.format("Topic '%s' supplied via the '%s' property is required "
+                                       + "to have '%s=%s' to guarantee consistency and durability of "
+                                       + "%s, but found '%s'. "
+                                       + "Correct the topic before restarting Connect.",

Review comment:
       nit: should we keep the 2-tab alignment for everything?

##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java
##########
@@ -94,6 +95,7 @@
     protected final StatusBackingStore statusBackingStore;
     protected final ConfigBackingStore configBackingStore;
     private final ConnectorClientConfigOverridePolicy connectorClientConfigOverridePolicy;
+    protected final AtomicBoolean running = new AtomicBoolean(false);

Review comment:
       For plain reads and writes, a `volatile boolean` gives the same visibility guarantees as an `AtomicBoolean`. If we are not going to use any of the compare-and-set/get-and-set capabilities of the atomic class, you may want to consider using `volatile` instead to avoid the boilerplate of calling get/set on that boolean variable. Of course, the decision is also a matter of style.
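A minimal sketch of the two options discussed in this comment. The class names are hypothetical and not part of Kafka Connect; only the `running` field name comes from the diff. With a `volatile` field, plain reads and writes are visible across threads without any method calls; `AtomicBoolean` earns its extra `get()`/`set()` calls mainly when an atomic read-modify-write such as `compareAndSet` is needed, for example to make a stop method idempotent.

```java
import java.util.concurrent.atomic.AtomicBoolean;

public class RunningFlagSketch {

    // Option 1: a volatile field. Reads and writes are visible across threads,
    // and callers use the field directly instead of get()/set().
    static class VolatileFlag {
        private volatile boolean running = false;

        void start() {
            running = true;
        }

        void stop() {
            running = false;
        }

        boolean isRunning() {
            return running;
        }
    }

    // Option 2: an AtomicBoolean. Worth the extra boilerplate when an atomic
    // read-modify-write is needed, as in stop() below.
    static class AtomicFlag {
        private final AtomicBoolean running = new AtomicBoolean(false);

        void start() {
            running.set(true);
        }

        // Returns true only for the call that actually flips the flag from true to false.
        boolean stop() {
            return running.compareAndSet(true, false);
        }

        boolean isRunning() {
            return running.get();
        }
    }

    public static void main(String[] args) {
        VolatileFlag v = new VolatileFlag();
        v.start();
        System.out.println("volatile running: " + v.isRunning()); // true
        v.stop();
        System.out.println("volatile running: " + v.isRunning()); // false

        AtomicFlag a = new AtomicFlag();
        a.start();
        System.out.println("first stop flipped flag: " + a.stop());  // true
        System.out.println("second stop flipped flag: " + a.stop()); // false
    }
}
```

The second `stop()` call on the atomic flag returns false because `compareAndSet` finds the flag already cleared; a plain `volatile` field cannot express that check-and-clear as a single atomic step.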

##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java
##########
@@ -492,7 +493,14 @@ public void putSessionKey(SessionKey sessionKey) {
             public void run() {
                 log.debug("Creating admin client to manage Connect internal config topic");
                 try (TopicAdmin admin = new TopicAdmin(adminProps)) {
-                    admin.createTopics(topicDescription);
+                    // Create the topic if it doesn't exist
+                    Set<String> newTopics = admin.createTopics(topicDescription);

Review comment:
       Using varargs has some unintended consequences for naming here. Should we stick to the singular form, given that we expect at most one topic to be created?

##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/util/TopicAdmin.java
##########
@@ -375,6 +383,152 @@ public boolean createTopic(NewTopic topic) {
         return existingTopics;
     }
 
+    /**
+     * Verify the named topic uses only compaction for the cleanup policy.
+     *
+     * @param topic             the name of the topic
+     * @param workerTopicConfig the name of the worker configuration that specifies the topic name
+     * @return true if the admin client could be used to verify the topic setting, or false if
+     *         the verification could not be performed, likely because the admin client principal
+     *         did not have the required permissions or because the broker was older than 0.11.0.0
+     * @throws ConfigException if the actual topic setting did not match the required setting
+     */
+    public boolean verifyTopicCleanupPolicyOnlyCompact(String topic, String workerTopicConfig,
+            String topicPurpose) {
+        Set<String> cleanupPolicies = topicCleanupPolicy(topic);
+        if (cleanupPolicies == null || cleanupPolicies.isEmpty()) {
+            log.debug("Unable to use admin client to verify that the cleanup policy of '{}' "
+                      + "topic is '{}', either because the broker is an older "
+                      + "version or because the Kafka principal used for Connect "
+                      + "internal topics does not have the required permission to "
+                      + "describe topic configurations.", topic, TopicConfig.CLEANUP_POLICY_COMPACT);
+            return false;
+        }
+        String cleanupPolicyStr = String.join(",", cleanupPolicies);
+        log.debug("Found cleanup policy for '{}' topic is '{}'", topic, cleanupPolicyStr);
+        Set<String> expectedPolicies = Collections.singleton(TopicConfig.CLEANUP_POLICY_COMPACT);
+        String expectedPolicyStr = String.join(",", expectedPolicies);
+        if (cleanupPolicies != null && !cleanupPolicies.equals(expectedPolicies)) {
+            String msg = String.format("Topic '%s' supplied via the '%s' property is required "
+                                       + "to have '%s=%s' to guarantee consistency and durability of "
+                                       + "%s, but found '%s'. "
+                                       + "Correct the topic before restarting Connect.",
+                    topic, workerTopicConfig, TopicConfig.CLEANUP_POLICY_CONFIG, expectedPolicyStr,
+                    topicPurpose, cleanupPolicyStr);
+            throw new ConfigException(msg);
+        }
+        return true;
+    }
+
+    /**
+     * Get the cleanup policy for a topic.
+     *
+     * @param topic the name of the topic
+     * @return the set of cleanup policies set for the topic; may be empty if the topic exists

Review comment:
       I wonder if distinguishing the two cases like that, with `null` and empty, pays off.
   Why not return an empty collection in both cases and simplify the checks on the return values of this method?
   
   This method doesn't seem to be the one to use when somebody wants to determine whether a topic exists or not. 
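A minimal sketch of the reviewer's suggestion, assuming a plain `Map<String, String>` stands in for the Kafka `Config` returned by `describeTopicConfig` (null when the topic is missing or its configuration cannot be described), and with `IllegalStateException` standing in for Kafka's `ConfigException`. `CleanupPolicySketch` and `verifyOnlyCompact` are hypothetical names. Returning an empty set in both cases lets the caller replace the `null || isEmpty()` check with a single `isEmpty()` call.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.Locale;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

public class CleanupPolicySketch {

    static final String CLEANUP_POLICY_CONFIG = "cleanup.policy";
    static final String CLEANUP_POLICY_COMPACT = "compact";

    // topicConfig is a hypothetical stand-in for describeTopicConfig(topic): null when the
    // topic does not exist or its configuration cannot be described.
    static Set<String> topicCleanupPolicy(Map<String, String> topicConfig) {
        String policyStr = (topicConfig == null) ? null : topicConfig.get(CLEANUP_POLICY_CONFIG);
        if (policyStr == null) {
            // Same answer for "unknown" and "not set": an empty set, never null.
            return Collections.emptySet();
        }
        return Arrays.stream(policyStr.split(","))
                .map(String::trim)
                .map(s -> s.toLowerCase(Locale.ROOT))
                .collect(Collectors.toSet());
    }

    // The caller needs a single isEmpty() check instead of "== null || isEmpty()".
    static boolean verifyOnlyCompact(Map<String, String> topicConfig) {
        Set<String> policies = topicCleanupPolicy(topicConfig);
        if (policies.isEmpty()) {
            return false; // verification could not be performed
        }
        if (!policies.equals(Collections.singleton(CLEANUP_POLICY_COMPACT))) {
            throw new IllegalStateException("Expected " + CLEANUP_POLICY_CONFIG + "="
                    + CLEANUP_POLICY_COMPACT + " but found " + String.join(",", policies));
        }
        return true;
    }

    public static void main(String[] args) {
        System.out.println(verifyOnlyCompact(Map.of(CLEANUP_POLICY_CONFIG, "compact"))); // true
        System.out.println(verifyOnlyCompact(null));                                     // false
        System.out.println(verifyOnlyCompact(Map.of()));                                 // false
        try {
            verifyOnlyCompact(Map.of(CLEANUP_POLICY_CONFIG, "compact,delete"));
        } catch (IllegalStateException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

The trade-off is that callers can no longer tell a missing topic from one without a cleanup policy, which, as the comment notes, this method is not meant to answer anyway.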

##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/util/TopicAdmin.java
##########
@@ -375,6 +383,152 @@ public boolean createTopic(NewTopic topic) {
         return existingTopics;
     }
 
+    /**
+     * Verify the named topic uses only compaction for the cleanup policy.
+     *
+     * @param topic             the name of the topic
+     * @param workerTopicConfig the name of the worker configuration that specifies the topic name
+     * @return true if the admin client could be used to verify the topic setting, or false if
+     *         the verification could not be performed, likely because the admin client principal
+     *         did not have the required permissions or because the broker was older than 0.11.0.0
+     * @throws ConfigException if the actual topic setting did not match the required setting
+     */
+    public boolean verifyTopicCleanupPolicyOnlyCompact(String topic, String workerTopicConfig,
+            String topicPurpose) {
+        Set<String> cleanupPolicies = topicCleanupPolicy(topic);
+        if (cleanupPolicies == null || cleanupPolicies.isEmpty()) {
+            log.debug("Unable to use admin client to verify that the cleanup policy of '{}' "
+                      + "topic is '{}', either because the broker is an older "
+                      + "version or because the Kafka principal used for Connect "
+                      + "internal topics does not have the required permission to "
+                      + "describe topic configurations.", topic, TopicConfig.CLEANUP_POLICY_COMPACT);
+            return false;
+        }
+        String cleanupPolicyStr = String.join(",", cleanupPolicies);
+        log.debug("Found cleanup policy for '{}' topic is '{}'", topic, cleanupPolicyStr);
+        Set<String> expectedPolicies = Collections.singleton(TopicConfig.CLEANUP_POLICY_COMPACT);
+        String expectedPolicyStr = String.join(",", expectedPolicies);
+        if (cleanupPolicies != null && !cleanupPolicies.equals(expectedPolicies)) {
+            String msg = String.format("Topic '%s' supplied via the '%s' property is required "
+                                       + "to have '%s=%s' to guarantee consistency and durability of "
+                                       + "%s, but found '%s'. "
+                                       + "Correct the topic before restarting Connect.",
+                    topic, workerTopicConfig, TopicConfig.CLEANUP_POLICY_CONFIG, expectedPolicyStr,
+                    topicPurpose, cleanupPolicyStr);
+            throw new ConfigException(msg);
+        }
+        return true;
+    }
+
+    /**
+     * Get the cleanup policy for a topic.
+     *
+     * @param topic the name of the topic
+     * @return the set of cleanup policies set for the topic; may be empty if the topic exists
+     *         but has no cleanup policy, or may be null if the topic does not exist
+     */
+    public Set<String> topicCleanupPolicy(String topic) {
+        Config topicConfig = describeTopicConfig(topic);
+        if (topicConfig == null) {
+            return null;
+        }
+        ConfigEntry entry = topicConfig.get(CLEANUP_POLICY_CONFIG);
+        if (entry != null && entry.value() != null) {
+            String policyStr = entry.value();
+            return Arrays.stream(policyStr.split(","))
+                         .map(String::trim)
+                         .map(String::toLowerCase)
+                         .collect(Collectors.toSet());
+        }
+        return Collections.emptySet();
+    }
+
+    /**
+     * Attempt to fetch the topic configuration for the given topic.
+     * Apache Kafka added support for describing topic configurations in 0.11.0.0, so this method
+     * works as expected with that and later versions. With brokers older than 0.11.0.0, this method
+     * is unable to get the topic configurations and always returns a null value.
+     *
+     * <p>If the topic does not exist, a null value is returned.
+     *
+     * @param topic the name of the topic for which the topic configuration should be obtained
+     * @return the topic configuration, or null if the topic does not exist or cannot be described
+     * @throws RetriableException if a retriable error occurs, the operation takes too long, or the
+     *         thread is interrupted while attempting to perform this operation
+     * @throws ConnectException if a non-retriable error occurs
+     */
+    public Config describeTopicConfig(String topic) {
+        return describeTopicConfigs(topic).get(topic);
+    }
+
+    /**
+     * Attempt to fetch the topic configurations for the given topics.
+     * Apache Kafka added support for describing topic configurations in 0.11.0.0, so this method
+     * works as expected with that and later versions. With brokers older than 0.11.0.0, this method
+     * is unable to get the topic configurations and always returns an empty map.
+     *
+     * <p>An entry with a null Config is placed into the resulting map for any topic that does
+     * not exist on the brokers.
+     *
+     * @param topicNames the names of the topics whose configurations should be obtained
+     * @return the configurations keyed by topic name, with a null value for each missing topic
+     * @throws RetriableException if a retriable error occurs, the operation takes too long, or the
+     *         thread is interrupted while attempting to perform this operation
+     * @throws ConnectException if a non-retriable error occurs
+     */
+    public Map<String, Config> describeTopicConfigs(String...topicNames) {

Review comment:
       nit: don't we need a space between the varargs type and the variable name? I'm surprised mainly at checkstyle here. 
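The javadoc in this hunk documents a contract in which every requested topic gets an entry in the result and topics that do not exist map to null. A minimal sketch of that contract, written with the conventional `String... topicNames` spacing the comment asks about. A hypothetical in-memory map replaces the Kafka `Admin` client and `Config` type, so this shows only the shape of the result, not how `TopicAdmin` talks to the broker; returning an empty map for an empty argument list is also an assumption.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

public class DescribeConfigsSketch {

    // Hypothetical in-memory stand-in for the broker: topic name -> (config name -> value).
    private final Map<String, Map<String, String>> brokerTopics;

    DescribeConfigsSketch(Map<String, Map<String, String>> brokerTopics) {
        this.brokerTopics = brokerTopics;
    }

    // Note the space after the ellipsis: "String... topicNames".
    // One entry per requested topic, with a null value for any topic that does not exist.
    Map<String, Map<String, String>> describeTopicConfigs(String... topicNames) {
        if (topicNames == null || topicNames.length == 0) {
            return Collections.emptyMap();
        }
        Map<String, Map<String, String>> result = new HashMap<>();
        for (String topic : topicNames) {
            result.put(topic, brokerTopics.get(topic)); // HashMap permits null values
        }
        return result;
    }

    // Single-topic convenience, mirroring describeTopicConfig: null when the topic is missing.
    Map<String, String> describeTopicConfig(String topic) {
        return describeTopicConfigs(topic).get(topic);
    }

    public static void main(String[] args) {
        // "connect-configs" is just an example topic name.
        DescribeConfigsSketch admin = new DescribeConfigsSketch(
                Map.of("connect-configs", Map.of("cleanup.policy", "compact")));
        Map<String, Map<String, String>> configs =
                admin.describeTopicConfigs("connect-configs", "missing-topic");
        System.out.println(configs.containsKey("missing-topic")); // true
        System.out.println(configs.get("missing-topic"));         // null
        System.out.println(admin.describeTopicConfig("connect-configs").get("cleanup.policy")); // compact
    }
}
```

`String...topicNames` and `String... topicNames` compile identically; the space is purely a formatting convention, which is why it falls to a style checker rather than the compiler to catch.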




----------------------------------------------------------------
This is an automated message from the Apache Git Service.