Posted to commits@kafka.apache.org by ce...@apache.org on 2023/02/13 15:09:40 UTC

[kafka] branch trunk updated: KAFKA-14021: Implement new KIP-618 APIs in MirrorSourceConnector (#12366)

This is an automated email from the ASF dual-hosted git repository.

cegerton pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 8cfafba2794 KAFKA-14021: Implement new KIP-618 APIs in MirrorSourceConnector (#12366)
8cfafba2794 is described below

commit 8cfafba2794562840b0f1c537e304f084b9359cf
Author: Chris Egerton <ch...@aiven.io>
AuthorDate: Mon Feb 13 10:09:14 2023 -0500

    KAFKA-14021: Implement new KIP-618 APIs in MirrorSourceConnector (#12366)
    
    Reviewers: Mickael Maison <mi...@gmail.com>
---
 .../apache/kafka/common/config/AbstractConfig.java |   9 +-
 .../java/org/apache/kafka/common/utils/Utils.java  |  38 +++++++
 .../org/apache/kafka/common/utils/UtilsTest.java   |  18 +++
 connect/mirror/README.md                           |  42 +++++++
 .../connect/mirror/MirrorConnectorConfig.java      |  20 ++--
 .../kafka/connect/mirror/MirrorMakerConfig.java    |   8 +-
 .../connect/mirror/MirrorSourceConnector.java      |  49 ++++++++
 .../connect/mirror/MirrorSourceConnectorTest.java  | 124 +++++++++++++++++++++
 .../DedicatedMirrorIntegrationTest.java            |   3 +-
 .../IdentityReplicationIntegrationTest.java        |   4 +-
 .../MirrorConnectorsIntegrationBaseTest.java       |  12 +-
 ...MirrorConnectorsIntegrationExactlyOnceTest.java |  52 +++++++++
 .../runtime/distributed/DistributedConfig.java     |   2 +-
 13 files changed, 351 insertions(+), 30 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java b/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java
index e3fda4d9f54..e620f18f7d8 100644
--- a/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java
+++ b/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java
@@ -274,14 +274,7 @@ public class AbstractConfig {
      */
     public Map<String, Object> originalsWithPrefix(String prefix, boolean strip) {
         Map<String, Object> result = new RecordingMap<>(prefix, false);
-        for (Map.Entry<String, ?> entry : originals.entrySet()) {
-            if (entry.getKey().startsWith(prefix) && entry.getKey().length() > prefix.length()) {
-                if (strip)
-                    result.put(entry.getKey().substring(prefix.length()), entry.getValue());
-                else
-                    result.put(entry.getKey(), entry.getValue());
-            }
-        }
+        result.putAll(Utils.entriesWithPrefix(originals, prefix, strip));
         return result;
     }
 
diff --git a/clients/src/main/java/org/apache/kafka/common/utils/Utils.java b/clients/src/main/java/org/apache/kafka/common/utils/Utils.java
index 4ac8d7d2fe1..0c3d6f15636 100644
--- a/clients/src/main/java/org/apache/kafka/common/utils/Utils.java
+++ b/clients/src/main/java/org/apache/kafka/common/utils/Utils.java
@@ -1520,4 +1520,42 @@ public final class Utils {
             throw new IllegalArgumentException("Expected string to end with " + oldSuffix + " but string is " + str);
         return str.substring(0, str.length() - oldSuffix.length()) + newSuffix;
     }
+
+    /**
+     * Find all key/value pairs whose keys begin with the given prefix, and remove that prefix from all
+     * resulting keys.
+     * @param map the map to filter key/value pairs from
+     * @param prefix the prefix to search keys for
+     * @return a {@link Map} containing an entry for every key/value pair in the {@code map}
+     * parameter whose key begins with the given {@code prefix}, with that prefix stripped
+     * from the resulting keys; may be empty, but never null
+     * @param <V> the type of values stored in the map
+     */
+    public static <V> Map<String, V> entriesWithPrefix(Map<String, V> map, String prefix) {
+        return entriesWithPrefix(map, prefix, true);
+    }
+
+    /**
+     * Find all key/value pairs whose keys begin with the given prefix, optionally removing that prefix
+     * from all resulting keys.
+     * @param map the map to filter key/value pairs from
+     * @param prefix the prefix to search keys for
+     * @param strip if {@code true}, strip the prefix from the keys of the returned map
+     * @return a {@link Map} containing a key/value pair for every key/value pair in the {@code map}
+     * parameter whose key begins with the given {@code prefix}; may be empty, but never null
+     * @param <V> the type of values stored in the map
+     */
+    public static <V> Map<String, V> entriesWithPrefix(Map<String, V> map, String prefix, boolean strip) {
+        Map<String, V> result = new HashMap<>();
+        for (Map.Entry<String, V> entry : map.entrySet()) {
+            if (entry.getKey().startsWith(prefix) && entry.getKey().length() > prefix.length()) {
+                if (strip)
+                    result.put(entry.getKey().substring(prefix.length()), entry.getValue());
+                else
+                    result.put(entry.getKey(), entry.getValue());
+            }
+        }
+        return result;
+    }
+
 }
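
For reference, a minimal sketch of how the new helper behaves (hypothetical demo
class and property keys; not part of this patch). Note that, as in the original
originalsWithPrefix logic, a key exactly equal to the prefix is excluded:

    import java.util.HashMap;
    import java.util.Map;
    import org.apache.kafka.common.utils.Utils;

    public class EntriesWithPrefixDemo {
        public static void main(String[] args) {
            Map<String, String> props = new HashMap<>();
            props.put("producer.acks", "all");       // matches the prefix
            props.put("producer.", "skipped");       // equal to the prefix; excluded
            props.put("compression.type", "lz4");    // does not match

            // Two-arg overload strips the prefix: prints {acks=all}
            System.out.println(Utils.entriesWithPrefix(props, "producer."));
            // Three-arg overload with strip=false keeps it: prints {producer.acks=all}
            System.out.println(Utils.entriesWithPrefix(props, "producer.", false));
        }
    }
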
diff --git a/clients/src/test/java/org/apache/kafka/common/utils/UtilsTest.java b/clients/src/test/java/org/apache/kafka/common/utils/UtilsTest.java
index 2366da33662..bed5ed62dd6 100755
--- a/clients/src/test/java/org/apache/kafka/common/utils/UtilsTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/utils/UtilsTest.java
@@ -44,6 +44,7 @@ import java.time.temporal.ChronoField;
 import java.time.temporal.ChronoUnit;
 import java.util.Arrays;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Map;
 import java.util.Properties;
@@ -936,4 +937,21 @@ public class UtilsTest {
         assertEquals("foo.txt", Utils.replaceSuffix("foo", "", ".txt"));
     }
 
+    @Test
+    public void testEntriesWithPrefix() {
+        Map<String, Object> props = new HashMap<>();
+        props.put("foo.bar", "abc");
+        props.put("setting", "def");
+
+        // With stripping
+        Map<String, Object> expected = Collections.singletonMap("bar", "abc");
+        Map<String, Object> actual = Utils.entriesWithPrefix(props, "foo.");
+        assertEquals(expected, actual);
+
+        // Without stripping
+        expected = Collections.singletonMap("foo.bar", "abc");
+        actual = Utils.entriesWithPrefix(props, "foo.", false);
+        assertEquals(expected, actual);
+    }
+
 }
diff --git a/connect/mirror/README.md b/connect/mirror/README.md
index 3c8aebc635f..db016fd3742 100644
--- a/connect/mirror/README.md
+++ b/connect/mirror/README.md
@@ -194,6 +194,48 @@ it is important to keep configuration consistent across flows to the same
 target cluster. In most cases, your entire organization should use a single
 MM2 configuration file.
 
+### Exactly-once
+Exactly-once semantics are supported for dedicated MM2 clusters as of version 3.5.0.
+
+For new MM2 clusters, set the `exactly.once.source.support` property to `enabled` for
+all targeted Kafka clusters that should be written to with exactly-once semantics. For example,
+to enable exactly-once for writes to cluster `B`, add the following to your MM2 config file:
+
+    B.exactly.once.source.support = enabled
+
+For existing MM2 clusters, a two-step upgrade is necessary. Instead of immediately
+setting the `exactly.once.source.support` property to `enabled`, first set it to `preparing` on
+all nodes in the cluster. Once this is complete, it can be set to `enabled` on all nodes in the
+cluster, in a second round of restarts.
+
+In either case, it is also necessary to enable intra-cluster communication between your MM2
+nodes, as described in
+[KIP-710](https://cwiki.apache.org/confluence/display/KAFKA/KIP-710%3A+Full+support+for+distributed+mode+in+dedicated+MirrorMaker+2.0+clusters).
+To do this, the `dedicated.mode.enable.internal.rest` property must be set to `true` in your MM2 config.
+In addition, many of the REST-related
+[configuration properties available for Kafka Connect](https://kafka.apache.org/documentation/#connectconfigs)
+can be specified in your MM2 config. For example, to enable intra-cluster communication in your MM2
+cluster with each node listening on port 8080 of its local machine, add this to your config file:
+
+    dedicated.mode.enable.internal.rest = true
+    listeners = http://localhost:8080
+
+**Note that, if intra-cluster communication is enabled in production environments, it is highly
+recommended to secure the REST servers brought up by each MM2 node. See the configuration
+properties for Kafka Connect for information on how this can be accomplished.**
+
+It is also recommended to filter records from aborted transactions out of the replicated data
+when running MM2. To do this, ensure that the consumer used to read from source clusters is
+configured with `isolation.level` set to `read_committed`. If replicating data from cluster `A`,
+this can be done for all replication flows that read from that cluster by adding the following
+to your MM2 config:
+
+    A.consumer.isolation.level = read_committed
+
+As a final note, under the hood, MM2 uses Kafka Connect source connectors to replicate data.
+For more information on exactly-once support for these kinds of connectors, see the
+[relevant docs page](https://kafka.apache.org/documentation/#connect_exactlyoncesource).
+
 ## Remote topics
 
 MM2 employs a naming convention to ensure that records from different
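
Putting the new README section together, a minimal exactly-once setup for a new
dedicated MM2 cluster might look like the following sketch (editorial example,
not part of the diff above; cluster aliases, bootstrap servers, and the listener
port are placeholders):

    clusters = A, B
    A.bootstrap.servers = A_host:9092
    B.bootstrap.servers = B_host:9092
    A->B.enabled = true

    # Exactly-once writes to cluster B; new clusters can go straight to
    # 'enabled', existing clusters should roll through 'preparing' first
    B.exactly.once.source.support = enabled

    # Intra-cluster communication between MM2 nodes (KIP-710)
    dedicated.mode.enable.internal.rest = true
    listeners = http://localhost:8080

    # Drop records from aborted transactions when reading from cluster A
    A.consumer.isolation.level = read_committed
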
diff --git a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorConnectorConfig.java b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorConnectorConfig.java
index e65134d7416..6896cf74157 100644
--- a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorConnectorConfig.java
+++ b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorConnectorConfig.java
@@ -146,14 +146,18 @@ public abstract class MirrorConnectorConfig extends AbstractConfig {
     }
 
     Map<String, Object> sourceConsumerConfig() {
-        Map<String, Object> props = new HashMap<>();
-        props.putAll(originalsWithPrefix(SOURCE_CLUSTER_PREFIX));
-        props.keySet().retainAll(MirrorClientConfig.CLIENT_CONFIG_DEF.names());
-        props.putAll(originalsWithPrefix(CONSUMER_CLIENT_PREFIX));
-        props.putAll(originalsWithPrefix(SOURCE_PREFIX + CONSUMER_CLIENT_PREFIX));
-        props.put(ENABLE_AUTO_COMMIT_CONFIG, "false");
-        props.putIfAbsent(AUTO_OFFSET_RESET_CONFIG, "earliest");
-        return props;
+        return sourceConsumerConfig(originals());
+    }
+
+    static Map<String, Object> sourceConsumerConfig(Map<String, ?> props) {
+        Map<String, Object> result = new HashMap<>();
+        result.putAll(Utils.entriesWithPrefix(props, SOURCE_CLUSTER_PREFIX));
+        result.keySet().retainAll(MirrorClientConfig.CLIENT_CONFIG_DEF.names());
+        result.putAll(Utils.entriesWithPrefix(props, CONSUMER_CLIENT_PREFIX));
+        result.putAll(Utils.entriesWithPrefix(props, SOURCE_PREFIX + CONSUMER_CLIENT_PREFIX));
+        result.put(ENABLE_AUTO_COMMIT_CONFIG, "false");
+        result.putIfAbsent(AUTO_OFFSET_RESET_CONFIG, "earliest");
+        return result;
     }
 
     Map<String, Object> targetAdminConfig() {
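
The precedence among the prefixed configs above runs from least to most
specific; for example, given connector properties like these (values are
hypothetical):

    source.cluster.bootstrap.servers = source_host:9092
    consumer.isolation.level = read_uncommitted
    source.consumer.isolation.level = read_committed

the source.consumer. entry is applied last and wins, so the resulting consumer
config contains isolation.level = read_committed along with the forced
enable.auto.commit = false and the auto.offset.reset = earliest default.
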
diff --git a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMakerConfig.java b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMakerConfig.java
index 85f4e9d79e4..3acc6bf7624 100644
--- a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMakerConfig.java
+++ b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMakerConfig.java
@@ -300,15 +300,11 @@ public class MirrorMakerConfig extends AbstractConfig {
     }
  
     private Map<String, String> stringsWithPrefixStripped(String prefix) {
-        return originalsStrings().entrySet().stream()
-            .filter(x -> x.getKey().startsWith(prefix))
-            .collect(Collectors.toMap(x -> x.getKey().substring(prefix.length()), Entry::getValue));
+        return Utils.entriesWithPrefix(originalsStrings(), prefix);
     }
 
     private Map<String, String> stringsWithPrefix(String prefix) {
-        Map<String, String> strings = originalsStrings();
-        strings.keySet().removeIf(x -> !x.startsWith(prefix));
-        return strings;
+        return Utils.entriesWithPrefix(originalsStrings(), prefix, false);
     }
 
     static Map<String, String> clusterConfigsWithPrefix(String prefix, Map<String, String> props) {
diff --git a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceConnector.java b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceConnector.java
index b49daf0ff76..0ecc69c0b37 100644
--- a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceConnector.java
+++ b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceConnector.java
@@ -16,10 +16,15 @@
  */
 package org.apache.kafka.connect.mirror;
 
+import java.util.Locale;
 import java.util.Map.Entry;
 
 import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.IsolationLevel;
+import org.apache.kafka.common.config.ConfigValue;
 import org.apache.kafka.connect.connector.Task;
+import org.apache.kafka.connect.source.ExactlyOnceSupport;
 import org.apache.kafka.connect.source.SourceConnector;
 import org.apache.kafka.common.config.ConfigDef;
 import org.apache.kafka.common.config.ConfigResource;
@@ -47,6 +52,7 @@ import org.apache.kafka.clients.admin.CreateTopicsOptions;
 import java.util.Map;
 import java.util.List;
 import java.util.ArrayList;
+import java.util.Objects;
 import java.util.Set;
 import java.util.HashSet;
 import java.util.Collection;
@@ -69,6 +75,8 @@ public class MirrorSourceConnector extends SourceConnector {
     private static final ResourcePatternFilter ANY_TOPIC = new ResourcePatternFilter(ResourceType.TOPIC,
         null, PatternType.ANY);
     private static final AclBindingFilter ANY_TOPIC_ACL = new AclBindingFilter(ANY_TOPIC, AccessControlEntryFilter.ANY);
+    private static final String READ_COMMITTED = IsolationLevel.READ_COMMITTED.toString().toLowerCase(Locale.ROOT);
+    private static final String EXACTLY_ONCE_SUPPORT_CONFIG = "exactly.once.support";
 
     private Scheduler scheduler;
     private MirrorSourceConfig config;
@@ -188,11 +196,52 @@ public class MirrorSourceConnector extends SourceConnector {
         return MirrorSourceConfig.CONNECTOR_CONFIG_DEF;
     }
 
+    @Override
+    public org.apache.kafka.common.config.Config validate(Map<String, String> props) {
+        List<ConfigValue> configValues = super.validate(props).configValues();
+        if ("required".equals(props.get(EXACTLY_ONCE_SUPPORT_CONFIG))) {
+            if (!consumerUsesReadCommitted(props)) {
+                ConfigValue exactlyOnceSupport = configValues.stream()
+                        .filter(cv -> EXACTLY_ONCE_SUPPORT_CONFIG.equals(cv.name()))
+                        .findAny()
+                        .orElseGet(() -> {
+                            ConfigValue result = new ConfigValue(EXACTLY_ONCE_SUPPORT_CONFIG);
+                            configValues.add(result);
+                            return result;
+                        });
+                // The Connect framework will already generate an error for this property if we return ExactlyOnceSupport.UNSUPPORTED
+                // from our exactlyOnceSupport method, but it will be fairly generic
+                // We add a second error message here to give users more insight into why this specific connector can't support exactly-once
+                // guarantees with the given configuration
+                exactlyOnceSupport.addErrorMessage(
+                        "MirrorSourceConnector can only provide exactly-once guarantees when its source consumer is configured with "
+                                + ConsumerConfig.ISOLATION_LEVEL_CONFIG + " set to '" + READ_COMMITTED + "'; "
+                                + "otherwise, records from aborted and uncommitted transactions will be replicated from the "
+                                + "source cluster to the target cluster."
+                );
+            }
+        }
+        return new org.apache.kafka.common.config.Config(configValues);
+    }
+
     @Override
     public String version() {
         return AppInfoParser.getVersion();
     }
 
+    @Override
+    public ExactlyOnceSupport exactlyOnceSupport(Map<String, String> props) {
+        return consumerUsesReadCommitted(props)
+                ? ExactlyOnceSupport.SUPPORTED
+                : ExactlyOnceSupport.UNSUPPORTED;
+    }
+
+    private boolean consumerUsesReadCommitted(Map<String, String> props) {
+        Object consumerIsolationLevel = MirrorSourceConfig.sourceConsumerConfig(props)
+                .get(ConsumerConfig.ISOLATION_LEVEL_CONFIG);
+        return Objects.equals(READ_COMMITTED, consumerIsolationLevel);
+    }
+
     // visible for testing
     List<TopicPartition> findSourceTopicPartitions()
             throws InterruptedException, ExecutionException {
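
A minimal sketch of the two new overrides in action (standalone usage for
illustration only; in practice the Connect framework calls these during
preflight validation):

    import java.util.HashMap;
    import java.util.Map;
    import org.apache.kafka.connect.mirror.MirrorSourceConnector;

    public class ExactlyOnceSupportCheck {
        public static void main(String[] args) {
            Map<String, String> props = new HashMap<>();
            // No isolation.level configured, so exactly-once can't be guaranteed:
            System.out.println(new MirrorSourceConnector().exactlyOnceSupport(props)); // UNSUPPORTED

            // The source-specific consumer override flips the answer:
            props.put("source.consumer.isolation.level", "read_committed");
            System.out.println(new MirrorSourceConnector().exactlyOnceSupport(props)); // SUPPORTED
        }
    }

With exactly.once.support = required in the props, validate(props) additionally
attaches the targeted error message above to that property whenever the source
consumer is not using read_committed.
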
diff --git a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorSourceConnectorTest.java b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorSourceConnectorTest.java
index 304b42d71c5..92e37e5fd15 100644
--- a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorSourceConnectorTest.java
+++ b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorSourceConnectorTest.java
@@ -21,6 +21,7 @@ import org.apache.kafka.common.acl.AccessControlEntry;
 import org.apache.kafka.common.acl.AclBinding;
 import org.apache.kafka.common.acl.AclOperation;
 import org.apache.kafka.common.acl.AclPermissionType;
+import org.apache.kafka.common.config.ConfigValue;
 import org.apache.kafka.common.resource.PatternType;
 import org.apache.kafka.common.resource.ResourcePattern;
 import org.apache.kafka.common.resource.ResourceType;
@@ -29,8 +30,13 @@ import org.apache.kafka.connect.connector.ConnectorContext;
 import org.apache.kafka.clients.admin.ConfigEntry;
 import org.apache.kafka.clients.admin.NewTopic;
 
+import org.apache.kafka.connect.source.ExactlyOnceSupport;
 import org.junit.jupiter.api.Test;
 
+import static org.apache.kafka.clients.consumer.ConsumerConfig.ISOLATION_LEVEL_CONFIG;
+import static org.apache.kafka.connect.mirror.MirrorConnectorConfig.CONSUMER_CLIENT_PREFIX;
+import static org.apache.kafka.connect.mirror.MirrorConnectorConfig.SOURCE_PREFIX;
+import static org.apache.kafka.connect.mirror.MirrorSourceConfig.OFFSET_LAG_MAX;
 import static org.apache.kafka.connect.mirror.MirrorSourceConfig.TASK_TOPIC_PARTITIONS;
 import static org.apache.kafka.connect.mirror.TestUtils.makeProps;
 import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -55,6 +61,8 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
+import java.util.stream.Collectors;
 
 public class MirrorSourceConnectorTest {
 
@@ -334,4 +342,120 @@ public class MirrorSourceConnectorTest {
         assertDoesNotThrow(() -> connector.isCycle(".b"));
     }
 
+    @Test
+    public void testExactlyOnceSupport() {
+        String readCommitted = "read_committed";
+        String readUncommitted = "read_uncommitted";
+        String readGarbage = "read_garbage";
+
+        // Connector is configured correctly, but exactly-once can't be supported
+        assertExactlyOnceSupport(null, null, false);
+        assertExactlyOnceSupport(readUncommitted, null, false);
+        assertExactlyOnceSupport(null, readUncommitted, false);
+        assertExactlyOnceSupport(readUncommitted, readUncommitted, false);
+
+        // Connector is configured correctly, and exactly-once can be supported
+        assertExactlyOnceSupport(readCommitted, null, true);
+        assertExactlyOnceSupport(null, readCommitted, true);
+        assertExactlyOnceSupport(readUncommitted, readCommitted, true);
+        assertExactlyOnceSupport(readCommitted, readCommitted, true);
+
+        // Connector is configured incorrectly, but is able to react gracefully
+        assertExactlyOnceSupport(readGarbage, null, false);
+        assertExactlyOnceSupport(null, readGarbage, false);
+        assertExactlyOnceSupport(readGarbage, readGarbage, false);
+        assertExactlyOnceSupport(readCommitted, readGarbage, false);
+        assertExactlyOnceSupport(readUncommitted, readGarbage, false);
+        assertExactlyOnceSupport(readGarbage, readUncommitted, false);
+        assertExactlyOnceSupport(readGarbage, readCommitted, true);
+    }
+
+    private void assertExactlyOnceSupport(String defaultIsolationLevel, String sourceIsolationLevel, boolean expected) {
+        Map<String, String> props = makeProps();
+        if (defaultIsolationLevel != null) {
+            props.put(CONSUMER_CLIENT_PREFIX + ISOLATION_LEVEL_CONFIG, defaultIsolationLevel);
+        }
+        if (sourceIsolationLevel != null) {
+            props.put(SOURCE_PREFIX + CONSUMER_CLIENT_PREFIX + ISOLATION_LEVEL_CONFIG, sourceIsolationLevel);
+        }
+        ExactlyOnceSupport expectedSupport = expected ? ExactlyOnceSupport.SUPPORTED : ExactlyOnceSupport.UNSUPPORTED;
+        ExactlyOnceSupport actualSupport = new MirrorSourceConnector().exactlyOnceSupport(props);
+        assertEquals(expectedSupport, actualSupport);
+    }
+
+    @Test
+    public void testExactlyOnceSupportValidation() {
+        String exactlyOnceSupport = "exactly.once.support";
+
+        Map<String, String> props = makeProps();
+        Optional<ConfigValue> configValue = validateProperty(exactlyOnceSupport, props);
+        assertEquals(Optional.empty(), configValue);
+
+        props.put(exactlyOnceSupport, "requested");
+        configValue = validateProperty(exactlyOnceSupport, props);
+        assertEquals(Optional.empty(), configValue);
+
+        props.put(exactlyOnceSupport, "garbage");
+        configValue = validateProperty(exactlyOnceSupport, props);
+        assertEquals(Optional.empty(), configValue);
+
+        props.put(exactlyOnceSupport, "required");
+        configValue = validateProperty(exactlyOnceSupport, props);
+        assertTrue(configValue.isPresent());
+        List<String> errorMessages = configValue.get().errorMessages();
+        assertEquals(1, errorMessages.size());
+        String errorMessage = errorMessages.get(0);
+        assertTrue(
+                errorMessages.get(0).contains(ISOLATION_LEVEL_CONFIG),
+                "Error message \"" + errorMessage + "\" should have mentioned the 'isolation.level' consumer property"
+        );
+
+        props.put(CONSUMER_CLIENT_PREFIX + ISOLATION_LEVEL_CONFIG, "read_committed");
+        configValue = validateProperty(exactlyOnceSupport, props);
+        assertEquals(Optional.empty(), configValue);
+
+        // Make sure that an unrelated invalid property doesn't cause an exception to be thrown and is instead handled and reported gracefully
+        props.put(OFFSET_LAG_MAX, "bad");
+        // Ensure that the issue with the invalid property is reported...
+        configValue = validateProperty(OFFSET_LAG_MAX, props);
+        assertTrue(configValue.isPresent());
+        errorMessages = configValue.get().errorMessages();
+        assertEquals(1, errorMessages.size());
+        errorMessage = errorMessages.get(0);
+        assertTrue(
+                errorMessages.get(0).contains(OFFSET_LAG_MAX),
+                "Error message \"" + errorMessage + "\" should have mentioned the 'offset.lag.max' property"
+        );
+        // ... and that it does not cause any issues with validation for exactly-once support...
+        configValue = validateProperty(exactlyOnceSupport, props);
+        assertEquals(Optional.empty(), configValue);
+
+        // ... regardless of whether validation for exactly-once support does or does not find an error
+        props.remove(CONSUMER_CLIENT_PREFIX + ISOLATION_LEVEL_CONFIG);
+        configValue = validateProperty(exactlyOnceSupport, props);
+        assertTrue(configValue.isPresent());
+        errorMessages = configValue.get().errorMessages();
+        assertEquals(1, errorMessages.size());
+        errorMessage = errorMessages.get(0);
+        assertTrue(
+                errorMessages.get(0).contains(ISOLATION_LEVEL_CONFIG),
+                "Error message \"" + errorMessage + "\" should have mentioned the 'isolation.level' consumer property"
+        );
+    }
+
+    private Optional<ConfigValue> validateProperty(String name, Map<String, String> props) {
+        List<ConfigValue> results = new MirrorSourceConnector().validate(props)
+                .configValues().stream()
+                .filter(cv -> name.equals(cv.name()))
+                .collect(Collectors.toList());
+
+        assertTrue(results.size() <= 1, "Connector produced multiple config values for '" + name + "' property");
+
+        if (results.isEmpty())
+            return Optional.empty();
+
+        ConfigValue result = results.get(0);
+        assertNotNull(result, "Connector should not have recorded a null config value for '" + name + "' property");
+        return Optional.of(result);
+    }
 }
diff --git a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/DedicatedMirrorIntegrationTest.java b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/DedicatedMirrorIntegrationTest.java
index f78de9bced6..e2db7b3865e 100644
--- a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/DedicatedMirrorIntegrationTest.java
+++ b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/DedicatedMirrorIntegrationTest.java
@@ -199,7 +199,8 @@ public class DedicatedMirrorIntegrationTest {
                     // Enable exactly-once support to both validate that MirrorMaker can run with
                     // that feature turned on, and to force cross-worker communication before
                     // task startup
-                    put(a + ".exactly.once.source.support", "enabled");
+                    put(b + ".exactly.once.source.support", "enabled");
+                    put(a + ".consumer.isolation.level", "read_committed");
                     put(ab + ".enabled", "true");
                     put(ab + ".topics", "^" + testTopicPrefix + ".*");
                     // The name of the offset syncs topic will contain the name of the cluster in
diff --git a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/IdentityReplicationIntegrationTest.java b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/IdentityReplicationIntegrationTest.java
index 1a961be564c..dd683f1acfe 100644
--- a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/IdentityReplicationIntegrationTest.java
+++ b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/IdentityReplicationIntegrationTest.java
@@ -225,7 +225,7 @@ public class IdentityReplicationIntegrationTest extends MirrorConnectorsIntegrat
                 consumerProps, "test-topic-1");
 
         waitForConsumerGroupOffsetSync(backup, backupConsumer, Collections.singletonList("test-topic-1"),
-                consumerGroupName, NUM_RECORDS_PRODUCED);
+                consumerGroupName, NUM_RECORDS_PRODUCED, true);
 
         ConsumerRecords<byte[], byte[]> records = backupConsumer.poll(CONSUMER_POLL_TIMEOUT_MS);
 
@@ -254,7 +254,7 @@ public class IdentityReplicationIntegrationTest extends MirrorConnectorsIntegrat
                 "group.id", consumerGroupName), "test-topic-1", "test-topic-2");
 
         waitForConsumerGroupOffsetSync(backup, backupConsumer, Arrays.asList("test-topic-1", "test-topic-2"),
-                consumerGroupName, NUM_RECORDS_PRODUCED);
+                consumerGroupName, NUM_RECORDS_PRODUCED, true);
 
         records = backupConsumer.poll(CONSUMER_POLL_TIMEOUT_MS);
         // similar reasoning as above, no more records to consume by the same consumer group at backup cluster
diff --git a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationBaseTest.java b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationBaseTest.java
index 5ca9b110707..27da7054b67 100644
--- a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationBaseTest.java
+++ b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationBaseTest.java
@@ -118,6 +118,7 @@ public class MirrorConnectorsIntegrationBaseTest {
     protected Properties backupBrokerProps = new Properties();
     protected Map<String, String> primaryWorkerProps = new HashMap<>();
     protected Map<String, String> backupWorkerProps = new HashMap<>();
+    protected boolean exactOffsetTranslation = true;
     
     @BeforeEach
     public void startClusters() throws Exception {
@@ -450,7 +451,7 @@ public class MirrorConnectorsIntegrationBaseTest {
             consumerProps, "primary.test-topic-1");
 
         waitForConsumerGroupOffsetSync(backup, backupConsumer, Collections.singletonList("primary.test-topic-1"), 
-            consumerGroupName, NUM_RECORDS_PRODUCED);
+            consumerGroupName, NUM_RECORDS_PRODUCED, exactOffsetTranslation);
 
         ConsumerRecords<byte[], byte[]> records = backupConsumer.poll(CONSUMER_POLL_TIMEOUT_MS);
 
@@ -479,7 +480,7 @@ public class MirrorConnectorsIntegrationBaseTest {
             "group.id", consumerGroupName), "primary.test-topic-1", "primary.test-topic-2");
 
         waitForConsumerGroupOffsetSync(backup, backupConsumer, Arrays.asList("primary.test-topic-1", "primary.test-topic-2"), 
-            consumerGroupName, NUM_RECORDS_PRODUCED);
+            consumerGroupName, NUM_RECORDS_PRODUCED, exactOffsetTranslation);
 
         records = backupConsumer.poll(CONSUMER_POLL_TIMEOUT_MS);
         // similar reasoning as above, no more records to consume by the same consumer group at backup cluster
@@ -708,7 +709,7 @@ public class MirrorConnectorsIntegrationBaseTest {
      * offsets are eventually synced to the expected offset numbers
      */
     protected static <T> void waitForConsumerGroupOffsetSync(EmbeddedConnectCluster connect,
-            Consumer<T, T> consumer, List<String> topics, String consumerGroupId, int numRecords)
+            Consumer<T, T> consumer, List<String> topics, String consumerGroupId, int numRecords, boolean exactOffsetTranslation)
             throws InterruptedException {
         try (Admin adminClient = connect.kafka().createAdminClient()) {
             List<TopicPartition> tps = new ArrayList<>(NUM_PARTITIONS * topics.size());
@@ -728,8 +729,11 @@ public class MirrorConnectorsIntegrationBaseTest {
                 Map<TopicPartition, Long> offsets = consumer.endOffsets(tps, CONSUMER_POLL_TIMEOUT_MS);
                 long totalOffsets = offsets.values().stream().mapToLong(l -> l).sum();
 
+                boolean totalOffsetsMatch = exactOffsetTranslation
+                        ? totalOffsets == expectedTotalOffsets
+                        : totalOffsets >= expectedTotalOffsets;
                 // make sure the consumer group offsets are synced to expected number
-                return totalOffsets == expectedTotalOffsets && consumerGroupOffsetTotal > 0;
+                return totalOffsetsMatch && consumerGroupOffsetTotal > 0;
             }, OFFSET_SYNC_DURATION_MS, "Consumer group offset sync is not complete in time");
         }
     }
diff --git a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationExactlyOnceTest.java b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationExactlyOnceTest.java
new file mode 100644
index 00000000000..a50b21bd58b
--- /dev/null
+++ b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationExactlyOnceTest.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.mirror.integration;
+
+import org.apache.kafka.connect.runtime.distributed.DistributedConfig;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Tag;
+
+import java.util.Arrays;
+import java.util.Properties;
+
+/**
+ * Tests MM2 replication with exactly-once support enabled on the Connect clusters.
+ */
+@Tag("integration")
+public class MirrorConnectorsIntegrationExactlyOnceTest extends MirrorConnectorsIntegrationBaseTest {
+
+    @BeforeEach
+    public void startClusters() throws Exception {
+        mm2Props.put(
+                PRIMARY_CLUSTER_ALIAS + "." + DistributedConfig.EXACTLY_ONCE_SOURCE_SUPPORT_CONFIG,
+                DistributedConfig.ExactlyOnceSourceSupport.ENABLED.toString()
+        );
+        mm2Props.put(
+                BACKUP_CLUSTER_ALIAS + "." + DistributedConfig.EXACTLY_ONCE_SOURCE_SUPPORT_CONFIG,
+                DistributedConfig.ExactlyOnceSourceSupport.ENABLED.toString()
+        );
+        for (Properties brokerProps : Arrays.asList(primaryBrokerProps, backupBrokerProps)) {
+            brokerProps.put("transaction.state.log.replication.factor", "1");
+            brokerProps.put("transaction.state.log.min.isr", "1");
+        }
+        // Transaction marker records will cause translated offsets to not match
+        // between source and target
+        exactOffsetTranslation = false;
+        super.startClusters();
+    }
+
+}
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedConfig.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedConfig.java
index 9cacb6e1ee6..1c0ebea2866 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedConfig.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedConfig.java
@@ -212,7 +212,7 @@ public class DistributedConfig extends WorkerConfig {
         + "on other JVMs, no default is used and a value for this property must be manually specified in the worker config.";
     private Crypto crypto;
 
-    private enum ExactlyOnceSourceSupport {
+    public enum ExactlyOnceSourceSupport {
         DISABLED(false),
         PREPARING(true),
         ENABLED(true);