Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2022/11/30 17:57:15 UTC

[GitHub] [kafka] mimaison commented on a diff in pull request #12366: KAFKA-14021: Implement new KIP-618 APIs in MirrorSourceConnector

mimaison commented on code in PR #12366:
URL: https://github.com/apache/kafka/pull/12366#discussion_r1036231680


##########
connect/mirror/README.md:
##########
@@ -194,6 +194,10 @@ it is important to keep configuration consistent across flows to the same
 target cluster. In most cases, your entire organization should use a single
 MM2 configuration file.
 
+### Exactly-once
+Exactly-once delivery is not currently supported for dedicated MM2 clusters. In order to run MM2 with exactly-once, deploy it onto a
+Connect cluster that is configured with exactly-once support enabled. 

Review Comment:
   Should we explicitly say `Connect distributed cluster`?



##########
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceConnector.java:
##########
@@ -185,11 +193,52 @@ public ConfigDef config() {
         return MirrorConnectorConfig.CONNECTOR_CONFIG_DEF;
     }
 
+    @Override
+    public org.apache.kafka.common.config.Config validate(Map<String, String> props) {
+        List<ConfigValue> configValues = MirrorConnectorConfig.CONNECTOR_CONFIG_DEF.validate(props);
+        if ("required".equals(props.get(EXACTLY_ONCE_SUPPORT_CONFIG))) {
+            if (!consumerUsesReadCommitted(props)) {
+                ConfigValue exactlyOnceSupport = configValues.stream()
+                        .filter(cv -> EXACTLY_ONCE_SUPPORT_CONFIG.equals(cv.name()))
+                        .findAny()
+                        .orElseGet(() -> {
+                            ConfigValue result = new ConfigValue(EXACTLY_ONCE_SUPPORT_CONFIG);
+                            configValues.add(result);
+                            return result;
+                        });
+                // The Connect framework will already generate an error for this property if we return ExactlyOnceSupport.UNSUPPORTED
+                // from our exactlyOnceSupport method, but it will be fairly generic
+                // We add a second error message here to give users more insight into why this specific connector can't support exactly-once
+                // guarantees with the given configuration
+                exactlyOnceSupport.addErrorMessage(
+                        "Mirror Maker 2 can only provide exactly-once guarantees when its source consumer is configured with "

Review Comment:
   I wonder if we should say `MirrorSourceConnector` instead of `Mirror Maker 2`.



##########
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceConnector.java:
##########
@@ -69,6 +75,8 @@ public class MirrorSourceConnector extends SourceConnector {
     private static final ResourcePatternFilter ANY_TOPIC = new ResourcePatternFilter(ResourceType.TOPIC,
         null, PatternType.ANY);
     private static final AclBindingFilter ANY_TOPIC_ACL = new AclBindingFilter(ANY_TOPIC, AccessControlEntryFilter.ANY);
+    private static final String READ_COMMITTED = IsolationLevel.READ_COMMITTED.toString().toLowerCase(Locale.ROOT);
+    private static final String EXACTLY_ONCE_SUPPORT_CONFIG = "exactly.once.support";

Review Comment:
   It's unfortunate we don't have this constant in the public API.



##########
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceConnector.java:
##########
@@ -185,11 +193,52 @@ public ConfigDef config() {
         return MirrorConnectorConfig.CONNECTOR_CONFIG_DEF;
     }
 
+    @Override
+    public org.apache.kafka.common.config.Config validate(Map<String, String> props) {
+        List<ConfigValue> configValues = MirrorConnectorConfig.CONNECTOR_CONFIG_DEF.validate(props);
+        if ("required".equals(props.get(EXACTLY_ONCE_SUPPORT_CONFIG))) {
+            if (!consumerUsesReadCommitted(props)) {
+                ConfigValue exactlyOnceSupport = configValues.stream()
+                        .filter(cv -> EXACTLY_ONCE_SUPPORT_CONFIG.equals(cv.name()))
+                        .findAny()
+                        .orElseGet(() -> {
+                            ConfigValue result = new ConfigValue(EXACTLY_ONCE_SUPPORT_CONFIG);
+                            configValues.add(result);
+                            return result;
+                        });
+                // The Connect framework will already generate an error for this property if we return ExactlyOnceSupport.UNSUPPORTED
+                // from our exactlyOnceSupport method, but it will be fairly generic
+                // We add a second error message here to give users more insight into why this specific connector can't support exactly-once
+                // guarantees with the given configuration
+                exactlyOnceSupport.addErrorMessage(
+                        "Mirror Maker 2 can only provide exactly-once guarantees when its source consumer is configured with "
+                                + ConsumerConfig.ISOLATION_LEVEL_CONFIG + " set to '" + READ_COMMITTED + "'; "
+                                + "otherwise, records from aborted and uncommitted transactions will be replicated from the "
+                                + "source cluster to the target cluster."
+                );
+            }
+        }
+        return new org.apache.kafka.common.config.Config(configValues);
+    }
+
     @Override
     public String version() {
         return AppInfoParser.getVersion();
     }
 
+    @Override
+    public ExactlyOnceSupport exactlyOnceSupport(Map<String, String> props) {
+        return consumerUsesReadCommitted(props)
+                ? ExactlyOnceSupport.SUPPORTED
+                : ExactlyOnceSupport.UNSUPPORTED;
+    }
+
+    private boolean consumerUsesReadCommitted(Map<String, String> props) {
+        MirrorConnectorConfig config = new MirrorConnectorConfig(props);
+        Object consumerIsolationLevel = config.sourceConsumerConfig().get(ConsumerConfig.ISOLATION_LEVEL_CONFIG);
+        return Objects.equals(READ_COMMITTED, consumerIsolationLevel);

Review Comment:
   Is `consumerIsolationLevel` guaranteed to be in lower case here?
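
If the value can reach this point in any case, normalizing it before the comparison would sidestep the question. A minimal sketch, assuming the isolation level is read from a raw map of consumer properties: the class `IsolationLevelCheck` and the method `usesReadCommitted` are hypothetical names that are not part of this PR, and the two string constants are inlined here (rather than taken from `ConsumerConfig` and `IsolationLevel`) only so the example stands alone.

```java
import java.util.Locale;
import java.util.Map;

// Hypothetical helper, not part of the PR: illustrates a case-insensitive check.
public class IsolationLevelCheck {
    // Inlined for the sketch; the PR derives this from IsolationLevel.READ_COMMITTED.
    static final String READ_COMMITTED = "read_committed";
    // Inlined for the sketch; the PR uses ConsumerConfig.ISOLATION_LEVEL_CONFIG.
    static final String ISOLATION_LEVEL_CONFIG = "isolation.level";

    // Returns true only if the property is present and equals "read_committed",
    // ignoring case and surrounding whitespace.
    static boolean usesReadCommitted(Map<String, ?> consumerProps) {
        Object raw = consumerProps.get(ISOLATION_LEVEL_CONFIG);
        if (raw == null) {
            return false;
        }
        String normalized = raw.toString().trim().toLowerCase(Locale.ROOT);
        return READ_COMMITTED.equals(normalized);
    }
}
```

With this approach a missing property is treated as not read-committed, which matches the behavior of the `Objects.equals` call in the diff when the lookup returns `null`.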


