Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2022/12/01 17:14:26 UTC

[GitHub] [kafka] mimaison commented on a diff in pull request #12366: KAFKA-14021: Implement new KIP-618 APIs in MirrorSourceConnector

mimaison commented on code in PR #12366:
URL: https://github.com/apache/kafka/pull/12366#discussion_r1037373964


##########
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceConnector.java:
##########
@@ -185,11 +193,52 @@ public ConfigDef config() {
         return MirrorConnectorConfig.CONNECTOR_CONFIG_DEF;
     }
 
+    @Override
+    public org.apache.kafka.common.config.Config validate(Map<String, String> props) {
+        List<ConfigValue> configValues = MirrorConnectorConfig.CONNECTOR_CONFIG_DEF.validate(props);
+        if ("required".equals(props.get(EXACTLY_ONCE_SUPPORT_CONFIG))) {
+            if (!consumerUsesReadCommitted(props)) {
+                ConfigValue exactlyOnceSupport = configValues.stream()
+                        .filter(cv -> EXACTLY_ONCE_SUPPORT_CONFIG.equals(cv.name()))
+                        .findAny()
+                        .orElseGet(() -> {
+                            ConfigValue result = new ConfigValue(EXACTLY_ONCE_SUPPORT_CONFIG);
+                            configValues.add(result);
+                            return result;
+                        });
+                // The Connect framework will already generate an error for this property if we return ExactlyOnceSupport.UNSUPPORTED
+                // from our exactlyOnceSupport method, but it will be fairly generic
+                // We add a second error message here to give users more insight into why this specific connector can't support exactly-once
+                // guarantees with the given configuration
+                exactlyOnceSupport.addErrorMessage(
+                        "Mirror Maker 2 can only provide exactly-once guarantees when its source consumer is configured with "
+                                + ConsumerConfig.ISOLATION_LEVEL_CONFIG + " set to '" + READ_COMMITTED + "'; "
+                                + "otherwise, records from aborted and uncommitted transactions will be replicated from the "
+                                + "source cluster to the target cluster."
+                );
+            }
+        }
+        return new org.apache.kafka.common.config.Config(configValues);
+    }
+
     @Override
     public String version() {
         return AppInfoParser.getVersion();
     }
 
+    @Override
+    public ExactlyOnceSupport exactlyOnceSupport(Map<String, String> props) {
+        return consumerUsesReadCommitted(props)
+                ? ExactlyOnceSupport.SUPPORTED
+                : ExactlyOnceSupport.UNSUPPORTED;
+    }
+
+    private boolean consumerUsesReadCommitted(Map<String, String> props) {
+        MirrorConnectorConfig config = new MirrorConnectorConfig(props);
+        Object consumerIsolationLevel = config.sourceConsumerConfig().get(ConsumerConfig.ISOLATION_LEVEL_CONFIG);
+        return Objects.equals(READ_COMMITTED, consumerIsolationLevel);

Review Comment:
   Ah yes, you're right! Thanks.
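
   For readers following the thread, the check in the hunk above can be sketched in isolation as below. This is only an illustrative sketch, not the PR's code: it has no Kafka dependency, the class name `ExactlyOnceCheckSketch`, the `Support` enum, and the `validationError` helper are hypothetical stand-ins, and it assumes the isolation level reaches the source consumer through a `source.consumer.`-prefixed property, whereas the real `MirrorConnectorConfig.sourceConsumerConfig()` merges several prefixes.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;

// Hypothetical, dependency-free stand-in for the logic in the diff.
class ExactlyOnceCheckSketch {
    // String values assumed to match the Kafka constants of the same names.
    static final String EXACTLY_ONCE_SUPPORT_CONFIG = "exactly.once.support";
    static final String ISOLATION_LEVEL_CONFIG = "isolation.level";
    static final String READ_COMMITTED = "read_committed";
    // Assumption: simplified lookup via a single prefix.
    static final String SOURCE_CONSUMER_PREFIX = "source.consumer.";

    // Stand-in for Connect's ExactlyOnceSupport enum.
    enum Support { SUPPORTED, UNSUPPORTED }

    // True only when the source consumer is explicitly set to read_committed.
    static boolean consumerUsesReadCommitted(Map<String, String> props) {
        String level = props.get(SOURCE_CONSUMER_PREFIX + ISOLATION_LEVEL_CONFIG);
        return Objects.equals(READ_COMMITTED, level);
    }

    static Support exactlyOnceSupport(Map<String, String> props) {
        return consumerUsesReadCommitted(props) ? Support.SUPPORTED : Support.UNSUPPORTED;
    }

    // Returns the extra, connector-specific error only when exactly-once is
    // required but the consumer could read uncommitted records.
    static Optional<String> validationError(Map<String, String> props) {
        boolean required = "required".equals(props.get(EXACTLY_ONCE_SUPPORT_CONFIG));
        if (required && !consumerUsesReadCommitted(props)) {
            return Optional.of("source consumer must set " + ISOLATION_LEVEL_CONFIG
                    + " to '" + READ_COMMITTED + "' for exactly-once guarantees");
        }
        return Optional.empty();
    }

    public static void main(String[] args) {
        Map<String, String> props = new HashMap<>();
        props.put(EXACTLY_ONCE_SUPPORT_CONFIG, "required");
        // Isolation level not set yet, so the extra error is reported.
        System.out.println(validationError(props).isPresent());
        props.put(SOURCE_CONSUMER_PREFIX + ISOLATION_LEVEL_CONFIG, READ_COMMITTED);
        // With read_committed configured, the error disappears.
        System.out.println(validationError(props).isPresent());
    }
}
```

   The point the sketch tries to capture is that `exactlyOnceSupport` and `validate` share one predicate, so the generic framework error and the connector-specific message cannot disagree.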


