Posted to jira@kafka.apache.org by "C0urante (via GitHub)" <gi...@apache.org> on 2023/03/16 17:03:27 UTC

[GitHub] [kafka] C0urante commented on a diff in pull request #13373: Kafka-14420 Use incrementalAlterConfigs API for syncing topic configurations (KIP-894)

C0urante commented on code in PR #13373:
URL: https://github.com/apache/kafka/pull/13373#discussion_r1139041573


##########
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceConfig.java:
##########
@@ -73,6 +73,18 @@ public class MirrorSourceConfig extends MirrorConnectorConfig {
     public static final String SYNC_TOPIC_CONFIGS_INTERVAL_SECONDS = SYNC_TOPIC_CONFIGS + INTERVAL_SECONDS_SUFFIX;
     private static final String SYNC_TOPIC_CONFIGS_INTERVAL_SECONDS_DOC = "Frequency of topic config sync.";
     public static final long SYNC_TOPIC_CONFIGS_INTERVAL_SECONDS_DEFAULT = 10 * 60;
+    @Deprecated
+    public static final String USE_INCREMENTAL_ALTER_CONFIG = "use.incremental.alter.configs";
+    private static final String USE_INCREMENTAL_ALTER_CONFIG_DOC = "Deprecated. Which API to use for syncing topic configs. " +
+            "The valid values are \"requested\", \"required\" and \"never\". " +
+            "By default, set to \"requested\", which means the IncrementalAlterConfigs is being used for syncing topic configurations " +
+            "and if any request receives an error from an incompatible broker, it will fallback to using the deprecated AlterConfigs API. " +
+            "If explicitly set to \"required\", the IncrementalAlterConfigs is used without the fallback logic. " +

Review Comment:
   This differs from what the KIP described, which was:
   
   > When explicitly set to "required", MirrorMaker will use the IncrementalAlterConfigs API for syncing topic configurations. If it receives an error from an incompatible broker, the MirrorMaker will report this to the user and fail the connector.



##########
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceConnector.java:
##########
@@ -514,6 +540,37 @@ private void updateTopicConfigs(Map<String, Config> topicConfigs) {
         }));
     }
 
+    // visible for testing
+    void incrementalAlterConfigs(Map<String, Config> topicConfigs) {
+        Map<ConfigResource, Collection<AlterConfigOp>> configOps = new HashMap<>();
+        for (Map.Entry<String, Config> topicConfig : topicConfigs.entrySet()) {
+            Collection<AlterConfigOp> ops = new ArrayList<>();
+            ConfigResource configResource = new ConfigResource(ConfigResource.Type.TOPIC, topicConfig.getKey());
+            for (ConfigEntry config : topicConfig.getValue().entries()) {
+                if (config.isDefault() && !shouldReplicateSourceDefault(config.source())) {
+                    ops.add(new AlterConfigOp(config, AlterConfigOp.OpType.DELETE));
+                } else {
+                    ops.add(new AlterConfigOp(config, AlterConfigOp.OpType.SET));
+                }
+            }
+            configOps.put(configResource, ops);
+        }
+        log.trace("Syncing configs for {} topics.", configOps.size());
+        targetAdminClient.incrementalAlterConfigs(configOps).values().forEach((k, v) -> v.whenComplete((x, e) -> {
+            if (e != null) {
+                if (useIncrementalAlterConfigs.equals(MirrorSourceConfig.USE_INCREMENTAL_ALTER_CONFIG_DEFAULT)

Review Comment:
   We also need to fail the connector if `use.incremental.alter.configs` is set to `required` and the error is an `UnsupportedVersionException`.
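
A rough sketch of the per-mode error handling I have in mind, assuming the three values from the KIP ("requested", "required", "never"). `SyncErrorPolicy`, `Action`, and `onSyncError` are hypothetical names for illustration, and the nested exception is a local stand-in for `org.apache.kafka.common.errors.UnsupportedVersionException` so the snippet compiles without Kafka on the classpath:

```java
// Illustrative sketch only: these names are hypothetical, not MirrorMaker code.
final class SyncErrorPolicy {

    // Local stand-in for org.apache.kafka.common.errors.UnsupportedVersionException.
    static final class UnsupportedVersionException extends RuntimeException {
        UnsupportedVersionException(String message) {
            super(message);
        }
    }

    enum Action { FALL_BACK_TO_DEPRECATED_API, FAIL_CONNECTOR, LOG_ONLY }

    // Decides what to do when an incrementalAlterConfigs request completes with an error.
    static Action onSyncError(String mode, Throwable error) {
        if (!(error instanceof UnsupportedVersionException)) {
            // Any other failure is unrelated to broker compatibility: just log it.
            return Action.LOG_ONLY;
        }
        switch (mode) {
            case "requested":
                // Incompatible broker: switch to the deprecated AlterConfigs API.
                return Action.FALL_BACK_TO_DEPRECATED_API;
            case "required":
                // Per the KIP: report the problem to the user and fail the connector
                // (in a real connector, presumably via ConnectorContext#raiseError).
                return Action.FAIL_CONNECTOR;
            default:
                // "never" does not issue incremental requests, so nothing else to do.
                return Action.LOG_ONLY;
        }
    }

    public static void main(String[] args) {
        Throwable incompatible = new UnsupportedVersionException("broker too old");
        System.out.println(onSyncError("requested", incompatible));
        System.out.println(onSyncError("required", incompatible));
    }
}
```

The point is only that "requested" and "required" should diverge on an `UnsupportedVersionException`; the exact wiring into the callback is up to you.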



##########
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceConnector.java:
##########
@@ -514,6 +540,37 @@ private void updateTopicConfigs(Map<String, Config> topicConfigs) {
         }));
     }
 
+    // visible for testing
+    void incrementalAlterConfigs(Map<String, Config> topicConfigs) {
+        Map<ConfigResource, Collection<AlterConfigOp>> configOps = new HashMap<>();
+        for (Map.Entry<String, Config> topicConfig : topicConfigs.entrySet()) {
+            Collection<AlterConfigOp> ops = new ArrayList<>();
+            ConfigResource configResource = new ConfigResource(ConfigResource.Type.TOPIC, topicConfig.getKey());
+            for (ConfigEntry config : topicConfig.getValue().entries()) {
+                if (config.isDefault() && !shouldReplicateSourceDefault(config.source())) {
+                    ops.add(new AlterConfigOp(config, AlterConfigOp.OpType.DELETE));
+                } else {
+                    ops.add(new AlterConfigOp(config, AlterConfigOp.OpType.SET));
+                }
+            }
+            configOps.put(configResource, ops);
+        }
+        log.trace("Syncing configs for {} topics.", configOps.size());
+        targetAdminClient.incrementalAlterConfigs(configOps).values().forEach((k, v) -> v.whenComplete((x, e) -> {
+            if (e != null) {
+                if (useIncrementalAlterConfigs.equals(MirrorSourceConfig.USE_INCREMENTAL_ALTER_CONFIG_DEFAULT)
+                        && e instanceof UnsupportedVersionException) {
+                    //Fallback logic
+                    log.warn("The target cluster {} is not compatible with IncrementalAlterConfigs API. Therefore using deprecated AlterConfigs API for syncing topic configurations", sourceAndTarget.target(), e);
+                    alterConfigs(topicConfigs);

Review Comment:
   We probably don't want to invoke `alterConfigs` directly here, since a call to that method may already be in progress on the `scheduler` thread.
   
   Also, we shouldn't be syncing `topicConfigs` since those may have grown stale in between when we computed them and when the config alter request completed.
   
   I personally don't think it's necessary to invoke `alterConfigs` at all when we notice the failure; we can instead just wait for the next scheduled topic config sync. However, if that's not good enough for some reason, we should probably use the `scheduler` for the immediate follow-up call to `alterConfigs`; e.g., something like `scheduler.execute(this::syncTopicConfigs, "syncing topic configs (failover)")`.
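
To illustrate the hand-off, here is a minimal sketch in which the callback only enqueues a fresh sync instead of running one itself. It uses a plain single-threaded `ExecutorService` as a stand-in for MirrorMaker's `scheduler`; `FailoverSyncSketch`, `onFallback`, and `syncThreads` are hypothetical names, and the body of `syncTopicConfigs` is a placeholder:

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Illustrative sketch only: a single-threaded executor stands in for the scheduler.
final class FailoverSyncSketch {
    // One worker thread, so two topic config syncs can never overlap.
    private final ExecutorService scheduler =
            Executors.newSingleThreadExecutor(r -> new Thread(r, "scheduler-sketch"));

    // Records which thread ran each sync, purely for demonstration.
    final List<String> syncThreads = new CopyOnWriteArrayList<>();

    // Placeholder for the real sync, which would re-describe the source topics
    // and therefore never act on stale configs.
    void syncTopicConfigs() {
        syncThreads.add(Thread.currentThread().getName());
    }

    // Called from the admin client's callback thread when the fallback triggers:
    // enqueue a sync on the scheduler rather than calling alterConfigs inline.
    void onFallback() {
        scheduler.execute(this::syncTopicConfigs);
    }

    // Waits for queued work to finish, then stops the worker thread.
    void close() {
        scheduler.shutdown();
        try {
            scheduler.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        FailoverSyncSketch sketch = new FailoverSyncSketch();
        sketch.onFallback();
        sketch.close();
        System.out.println("sync ran on: " + sketch.syncThreads);
    }
}
```

Because the follow-up goes through the same single thread as the periodic sync, it cannot race with a sync that is already running, and it recomputes the configs instead of reusing the `topicConfigs` captured by the callback.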



##########
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceConnector.java:
##########
@@ -500,9 +517,18 @@ static Map<String, String> configToMap(Config config) {
                 .collect(Collectors.toMap(ConfigEntry::name, ConfigEntry::value));
     }
 
-    @SuppressWarnings("deprecation")
-    // use deprecated alterConfigs API for broker compatibility back to 0.11.0
     private void updateTopicConfigs(Map<String, Config> topicConfigs) {
+        if (useIncrementalAlterConfigs.equals(MirrorSourceConfig.NEVER_USE_INCREMENTAL_ALTER_CONFIG)) {
+            alterConfigs(topicConfigs);
+        } else {
+            incrementalAlterConfigs(topicConfigs);
+        }
+    }
+
+    // visible for testing
+    // use deprecated alterConfigs API for broker compatibility back to 0.11.0
+    @SuppressWarnings("deprecation")
+    void alterConfigs(Map<String, Config> topicConfigs) {

Review Comment:
   Do you think it might help to rename this to `deprecatedAlterConfigs`, to help contrast with `incrementalAlterConfigs` and make things clearer for people who have to read this code base for the first time in the future?



##########
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceConnector.java:
##########
@@ -96,6 +100,7 @@ public class MirrorSourceConnector extends SourceConnector {
     private Admin sourceAdminClient;
     private Admin targetAdminClient;
     private Admin offsetSyncsAdminClient;
+    private String useIncrementalAlterConfigs;

Review Comment:
   Since the value of this field can be altered on a different thread from the one that checks it, I think we want to add the `volatile` keyword here.
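
Roughly what I mean, as a stripped-down sketch. It assumes the fallback path flips the mode to "never" from the admin client's callback thread while the scheduler thread reads it; `ModeHolderSketch` and its method names are hypothetical:

```java
// Illustrative sketch only: shows the field declaration, not the connector itself.
final class ModeHolderSketch {
    // volatile guarantees that a write made on the callback thread is visible
    // to a later read on the scheduler thread.
    private volatile String useIncrementalAlterConfigs = "requested";

    // Assumed to run on the admin client's callback thread after an
    // UnsupportedVersionException.
    void onIncompatibleBroker() {
        useIncrementalAlterConfigs = "never";
    }

    // Assumed to run on the scheduler thread before each topic config sync.
    // Strings are compared with equals(), not ==.
    boolean shouldUseDeprecatedApi() {
        return "never".equals(useIncrementalAlterConfigs);
    }

    public static void main(String[] args) {
        ModeHolderSketch holder = new ModeHolderSketch();
        holder.onIncompatibleBroker();
        System.out.println(holder.shouldUseDeprecatedApi());
    }
}
```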



##########
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceConnector.java:
##########
@@ -539,11 +596,20 @@ Map<String, Config> describeTopicConfigs(Set<String> topics)
     }
 
     Config targetConfig(Config sourceConfig) {
-        List<ConfigEntry> entries = sourceConfig.entries().stream()
-            .filter(x -> !x.isDefault() && !x.isReadOnly() && !x.isSensitive())
-            .filter(x -> x.source() != ConfigEntry.ConfigSource.STATIC_BROKER_CONFIG)
-            .filter(x -> shouldReplicateTopicConfigurationProperty(x.name()))
-            .collect(Collectors.toList());
+        List<ConfigEntry> entries;
+        if (useIncrementalAlterConfigs == MirrorSourceConfig.NEVER_USE_INCREMENTAL_ALTER_CONFIG) {

Review Comment:
   Do we need to do the `isDefault` check at all? I was under the impression that `ConfigPropertyFilter::shouldReplicateSourceDefault` would be used to control replication of defaults regardless of whether we use the deprecated or the incremental alter configs API.


