You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ce...@apache.org on 2023/02/13 15:09:40 UTC
[kafka] branch trunk updated: KAFKA-14021: Implement new KIP-618 APIs in MirrorSourceConnector (#12366)
This is an automated email from the ASF dual-hosted git repository.
cegerton pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 8cfafba2794 KAFKA-14021: Implement new KIP-618 APIs in MirrorSourceConnector (#12366)
8cfafba2794 is described below
commit 8cfafba2794562840b0f1c537e304f084b9359cf
Author: Chris Egerton <ch...@aiven.io>
AuthorDate: Mon Feb 13 10:09:14 2023 -0500
KAFKA-14021: Implement new KIP-618 APIs in MirrorSourceConnector (#12366)
Reviewers: Mickael Maison <mi...@gmail.com>
---
.../apache/kafka/common/config/AbstractConfig.java | 9 +-
.../java/org/apache/kafka/common/utils/Utils.java | 38 +++++++
.../org/apache/kafka/common/utils/UtilsTest.java | 18 +++
connect/mirror/README.md | 42 +++++++
.../connect/mirror/MirrorConnectorConfig.java | 20 ++--
.../kafka/connect/mirror/MirrorMakerConfig.java | 8 +-
.../connect/mirror/MirrorSourceConnector.java | 49 ++++++++
.../connect/mirror/MirrorSourceConnectorTest.java | 124 +++++++++++++++++++++
.../DedicatedMirrorIntegrationTest.java | 3 +-
.../IdentityReplicationIntegrationTest.java | 4 +-
.../MirrorConnectorsIntegrationBaseTest.java | 12 +-
...MirrorConnectorsIntegrationExactlyOnceTest.java | 52 +++++++++
.../runtime/distributed/DistributedConfig.java | 2 +-
13 files changed, 351 insertions(+), 30 deletions(-)
diff --git a/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java b/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java
index e3fda4d9f54..e620f18f7d8 100644
--- a/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java
+++ b/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java
@@ -274,14 +274,7 @@ public class AbstractConfig {
*/
public Map<String, Object> originalsWithPrefix(String prefix, boolean strip) {
Map<String, Object> result = new RecordingMap<>(prefix, false);
- for (Map.Entry<String, ?> entry : originals.entrySet()) {
- if (entry.getKey().startsWith(prefix) && entry.getKey().length() > prefix.length()) {
- if (strip)
- result.put(entry.getKey().substring(prefix.length()), entry.getValue());
- else
- result.put(entry.getKey(), entry.getValue());
- }
- }
+ result.putAll(Utils.entriesWithPrefix(originals, prefix, strip));
return result;
}
diff --git a/clients/src/main/java/org/apache/kafka/common/utils/Utils.java b/clients/src/main/java/org/apache/kafka/common/utils/Utils.java
index 4ac8d7d2fe1..0c3d6f15636 100644
--- a/clients/src/main/java/org/apache/kafka/common/utils/Utils.java
+++ b/clients/src/main/java/org/apache/kafka/common/utils/Utils.java
@@ -1520,4 +1520,42 @@ public final class Utils {
throw new IllegalArgumentException("Expected string to end with " + oldSuffix + " but string is " + str);
return str.substring(0, str.length() - oldSuffix.length()) + newSuffix;
}
+
+ /**
+ * Find all key/value pairs whose keys begin with the given prefix, and remove that prefix from all
+ * resulting keys.
+ * @param map the map to filter key/value pairs from
+ * @param prefix the prefix to search keys for
+ * @return a {@link Map} containing a key/value pair for every key/value pair in the {@code map}
+ * parameter whose key begins with the given {@code prefix} and whose corresponding keys have
+ * the prefix stripped from them; may be empty, but never null
+ * @param <V> the type of values stored in the map
+ */
+ public static <V> Map<String, V> entriesWithPrefix(Map<String, V> map, String prefix) {
+ return entriesWithPrefix(map, prefix, true);
+ }
+
+ /**
+ * Find all key/value pairs whose keys begin with the given prefix, optionally removing that prefix
+ * from all resulting keys.
+ * @param map the map to filter key/value pairs from
+ * @param prefix the prefix to search keys for
+ * @param strip whether the keys of the returned map should not include the prefix
+ * @return a {@link Map} containing a key/value pair for every key/value pair in the {@code map}
+ * parameter whose key begins with the given {@code prefix}; may be empty, but never null
+ * @param <V> the type of values stored in the map
+ */
+ public static <V> Map<String, V> entriesWithPrefix(Map<String, V> map, String prefix, boolean strip) {
+ Map<String, V> result = new HashMap<>();
+ for (Map.Entry<String, V> entry : map.entrySet()) {
+ if (entry.getKey().startsWith(prefix) && entry.getKey().length() > prefix.length()) {
+ if (strip)
+ result.put(entry.getKey().substring(prefix.length()), entry.getValue());
+ else
+ result.put(entry.getKey(), entry.getValue());
+ }
+ }
+ return result;
+ }
+
}
diff --git a/clients/src/test/java/org/apache/kafka/common/utils/UtilsTest.java b/clients/src/test/java/org/apache/kafka/common/utils/UtilsTest.java
index 2366da33662..bed5ed62dd6 100755
--- a/clients/src/test/java/org/apache/kafka/common/utils/UtilsTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/utils/UtilsTest.java
@@ -44,6 +44,7 @@ import java.time.temporal.ChronoField;
import java.time.temporal.ChronoUnit;
import java.util.Arrays;
import java.util.Collections;
+import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Properties;
@@ -936,4 +937,21 @@ public class UtilsTest {
assertEquals("foo.txt", Utils.replaceSuffix("foo", "", ".txt"));
}
+ @Test
+ public void testEntriesWithPrefix() {
+ Map<String, Object> props = new HashMap<>();
+ props.put("foo.bar", "abc");
+ props.put("setting", "def");
+
+ // With stripping
+ Map<String, Object> expected = Collections.singletonMap("bar", "abc");
+ Map<String, Object> actual = Utils.entriesWithPrefix(props, "foo.");
+ assertEquals(expected, actual);
+
+ // Without stripping
+ expected = Collections.singletonMap("foo.bar", "abc");
+ actual = Utils.entriesWithPrefix(props, "foo.", false);
+ assertEquals(expected, actual);
+ }
+
}
diff --git a/connect/mirror/README.md b/connect/mirror/README.md
index 3c8aebc635f..db016fd3742 100644
--- a/connect/mirror/README.md
+++ b/connect/mirror/README.md
@@ -194,6 +194,48 @@ it is important to keep configuration consistent across flows to the same
target cluster. In most cases, your entire organization should use a single
MM2 configuration file.
+### Exactly-once
+Exactly-once semantics are supported for dedicated MM2 clusters as of version 3.5.0.
+
+For new MM2 clusters, set the `exactly.once.source.support` property to `enabled` for
+all targeted Kafka clusters that should be written to with exactly-once semantics. For example,
+to enable exactly-once for writes to cluster `B`, add the following to your MM2 config file:
+
+ B.exactly.once.source.support = enabled
+
+For existing MM2 clusters, a two-step upgrade is necessary. Instead of immediately
+setting the `exactly.once.source.support` property to `enabled`, first set it to `preparing` on
+all nodes in the cluster. Once this is complete, it can be set to `enabled` on all nodes in the
+cluster, in a second round of restarts.
+
+In either case, it is also necessary to enable intra-cluster communication between your MM2
+nodes, as described in
+[KIP-710](https://cwiki.apache.org/confluence/display/KAFKA/KIP-710%3A+Full+support+for+distributed+mode+in+dedicated+MirrorMaker+2.0+clusters).
+To do this, the `dedicated.mode.enable.internal.rest` property must be set to `true` in your MM2 config.
+In addition, many of the REST-related
+[configuration properties available for Kafka Connect](https://kafka.apache.org/documentation/#connectconfigs)
+can be specified in your MM2 config. For example, to enable intra-cluster communication in your MM2
+cluster with each node listening on port 8080 of their local machine, add this to your config file:
+
+ dedicated.mode.enable.internal.rest = true
+ listeners = http://localhost:8080
+
+**Note that, if intra-cluster communication is enabled in production environments, it is highly
+recommended to secure the REST servers brought up by each MM2 node. See the configuration
+properties for Kafka Connect for information on how this can be accomplished.**
+
+It is also recommended to filter records from aborted transactions out from replicated data
+when running MM2. To do this, ensure that the consumer used to read from source clusters is
+configured with `isolation.level` set to `read_committed`. If replicating data from cluster `A`,
+this can be done for all replication flows that read from that cluster by adding the following
+to your MM2 config:
+
+ A.consumer.isolation.level = read_committed
+
+As a final note, under the hood, MM2 uses Kafka Connect source connectors to replicate data.
+For more information on exactly-once support for these kinds of connectors, see the
+[relevant docs page](https://kafka.apache.org/documentation/#connect_exactlyoncesource).
+
## Remote topics
MM2 employs a naming convention to ensure that records from different
diff --git a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorConnectorConfig.java b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorConnectorConfig.java
index e65134d7416..6896cf74157 100644
--- a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorConnectorConfig.java
+++ b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorConnectorConfig.java
@@ -146,14 +146,18 @@ public abstract class MirrorConnectorConfig extends AbstractConfig {
}
Map<String, Object> sourceConsumerConfig() {
- Map<String, Object> props = new HashMap<>();
- props.putAll(originalsWithPrefix(SOURCE_CLUSTER_PREFIX));
- props.keySet().retainAll(MirrorClientConfig.CLIENT_CONFIG_DEF.names());
- props.putAll(originalsWithPrefix(CONSUMER_CLIENT_PREFIX));
- props.putAll(originalsWithPrefix(SOURCE_PREFIX + CONSUMER_CLIENT_PREFIX));
- props.put(ENABLE_AUTO_COMMIT_CONFIG, "false");
- props.putIfAbsent(AUTO_OFFSET_RESET_CONFIG, "earliest");
- return props;
+ return sourceConsumerConfig(originals());
+ }
+
+ static Map<String, Object> sourceConsumerConfig(Map<String, ?> props) {
+ Map<String, Object> result = new HashMap<>();
+ result.putAll(Utils.entriesWithPrefix(props, SOURCE_PREFIX));
+ result.keySet().retainAll(MirrorClientConfig.CLIENT_CONFIG_DEF.names());
+ result.putAll(Utils.entriesWithPrefix(props, CONSUMER_CLIENT_PREFIX));
+ result.putAll(Utils.entriesWithPrefix(props, SOURCE_PREFIX + CONSUMER_CLIENT_PREFIX));
+ result.put(ENABLE_AUTO_COMMIT_CONFIG, "false");
+ result.putIfAbsent(AUTO_OFFSET_RESET_CONFIG, "earliest");
+ return result;
}
Map<String, Object> targetAdminConfig() {
diff --git a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMakerConfig.java b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMakerConfig.java
index 85f4e9d79e4..3acc6bf7624 100644
--- a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMakerConfig.java
+++ b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMakerConfig.java
@@ -300,15 +300,11 @@ public class MirrorMakerConfig extends AbstractConfig {
}
private Map<String, String> stringsWithPrefixStripped(String prefix) {
- return originalsStrings().entrySet().stream()
- .filter(x -> x.getKey().startsWith(prefix))
- .collect(Collectors.toMap(x -> x.getKey().substring(prefix.length()), Entry::getValue));
+ return Utils.entriesWithPrefix(originalsStrings(), prefix);
}
private Map<String, String> stringsWithPrefix(String prefix) {
- Map<String, String> strings = originalsStrings();
- strings.keySet().removeIf(x -> !x.startsWith(prefix));
- return strings;
+ return Utils.entriesWithPrefix(originalsStrings(), prefix, false);
}
static Map<String, String> clusterConfigsWithPrefix(String prefix, Map<String, String> props) {
diff --git a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceConnector.java b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceConnector.java
index b49daf0ff76..0ecc69c0b37 100644
--- a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceConnector.java
+++ b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceConnector.java
@@ -16,10 +16,15 @@
*/
package org.apache.kafka.connect.mirror;
+import java.util.Locale;
import java.util.Map.Entry;
import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.IsolationLevel;
+import org.apache.kafka.common.config.ConfigValue;
import org.apache.kafka.connect.connector.Task;
+import org.apache.kafka.connect.source.ExactlyOnceSupport;
import org.apache.kafka.connect.source.SourceConnector;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigResource;
@@ -47,6 +52,7 @@ import org.apache.kafka.clients.admin.CreateTopicsOptions;
import java.util.Map;
import java.util.List;
import java.util.ArrayList;
+import java.util.Objects;
import java.util.Set;
import java.util.HashSet;
import java.util.Collection;
@@ -69,6 +75,8 @@ public class MirrorSourceConnector extends SourceConnector {
private static final ResourcePatternFilter ANY_TOPIC = new ResourcePatternFilter(ResourceType.TOPIC,
null, PatternType.ANY);
private static final AclBindingFilter ANY_TOPIC_ACL = new AclBindingFilter(ANY_TOPIC, AccessControlEntryFilter.ANY);
+ private static final String READ_COMMITTED = IsolationLevel.READ_COMMITTED.toString().toLowerCase(Locale.ROOT);
+ private static final String EXACTLY_ONCE_SUPPORT_CONFIG = "exactly.once.support";
private Scheduler scheduler;
private MirrorSourceConfig config;
@@ -188,11 +196,52 @@ public class MirrorSourceConnector extends SourceConnector {
return MirrorSourceConfig.CONNECTOR_CONFIG_DEF;
}
+ @Override
+ public org.apache.kafka.common.config.Config validate(Map<String, String> props) {
+ List<ConfigValue> configValues = super.validate(props).configValues();
+ if ("required".equals(props.get(EXACTLY_ONCE_SUPPORT_CONFIG))) {
+ if (!consumerUsesReadCommitted(props)) {
+ ConfigValue exactlyOnceSupport = configValues.stream()
+ .filter(cv -> EXACTLY_ONCE_SUPPORT_CONFIG.equals(cv.name()))
+ .findAny()
+ .orElseGet(() -> {
+ ConfigValue result = new ConfigValue(EXACTLY_ONCE_SUPPORT_CONFIG);
+ configValues.add(result);
+ return result;
+ });
+ // The Connect framework will already generate an error for this property if we return ExactlyOnceSupport.UNSUPPORTED
+ // from our exactlyOnceSupport method, but it will be fairly generic
+ // We add a second error message here to give users more insight into why this specific connector can't support exactly-once
+ // guarantees with the given configuration
+ exactlyOnceSupport.addErrorMessage(
+ "MirrorSourceConnector can only provide exactly-once guarantees when its source consumer is configured with "
+ + ConsumerConfig.ISOLATION_LEVEL_CONFIG + " set to '" + READ_COMMITTED + "'; "
+ + "otherwise, records from aborted and uncommitted transactions will be replicated from the "
+ + "source cluster to the target cluster."
+ );
+ }
+ }
+ return new org.apache.kafka.common.config.Config(configValues);
+ }
+
@Override
public String version() {
return AppInfoParser.getVersion();
}
+ @Override
+ public ExactlyOnceSupport exactlyOnceSupport(Map<String, String> props) {
+ return consumerUsesReadCommitted(props)
+ ? ExactlyOnceSupport.SUPPORTED
+ : ExactlyOnceSupport.UNSUPPORTED;
+ }
+
+ private boolean consumerUsesReadCommitted(Map<String, String> props) {
+ Object consumerIsolationLevel = MirrorSourceConfig.sourceConsumerConfig(props)
+ .get(ConsumerConfig.ISOLATION_LEVEL_CONFIG);
+ return Objects.equals(READ_COMMITTED, consumerIsolationLevel);
+ }
+
// visible for testing
List<TopicPartition> findSourceTopicPartitions()
throws InterruptedException, ExecutionException {
diff --git a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorSourceConnectorTest.java b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorSourceConnectorTest.java
index 304b42d71c5..92e37e5fd15 100644
--- a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorSourceConnectorTest.java
+++ b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorSourceConnectorTest.java
@@ -21,6 +21,7 @@ import org.apache.kafka.common.acl.AccessControlEntry;
import org.apache.kafka.common.acl.AclBinding;
import org.apache.kafka.common.acl.AclOperation;
import org.apache.kafka.common.acl.AclPermissionType;
+import org.apache.kafka.common.config.ConfigValue;
import org.apache.kafka.common.resource.PatternType;
import org.apache.kafka.common.resource.ResourcePattern;
import org.apache.kafka.common.resource.ResourceType;
@@ -29,8 +30,13 @@ import org.apache.kafka.connect.connector.ConnectorContext;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.clients.admin.NewTopic;
+import org.apache.kafka.connect.source.ExactlyOnceSupport;
import org.junit.jupiter.api.Test;
+import static org.apache.kafka.clients.consumer.ConsumerConfig.ISOLATION_LEVEL_CONFIG;
+import static org.apache.kafka.connect.mirror.MirrorConnectorConfig.CONSUMER_CLIENT_PREFIX;
+import static org.apache.kafka.connect.mirror.MirrorConnectorConfig.SOURCE_PREFIX;
+import static org.apache.kafka.connect.mirror.MirrorSourceConfig.OFFSET_LAG_MAX;
import static org.apache.kafka.connect.mirror.MirrorSourceConfig.TASK_TOPIC_PARTITIONS;
import static org.apache.kafka.connect.mirror.TestUtils.makeProps;
import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -55,6 +61,8 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
+import java.util.stream.Collectors;
public class MirrorSourceConnectorTest {
@@ -334,4 +342,120 @@ public class MirrorSourceConnectorTest {
assertDoesNotThrow(() -> connector.isCycle(".b"));
}
+ @Test
+ public void testExactlyOnceSupport() {
+ String readCommitted = "read_committed";
+ String readUncommitted = "read_uncommitted";
+ String readGarbage = "read_garbage";
+
+ // Connector is configured correctly, but exactly-once can't be supported
+ assertExactlyOnceSupport(null, null, false);
+ assertExactlyOnceSupport(readUncommitted, null, false);
+ assertExactlyOnceSupport(null, readUncommitted, false);
+ assertExactlyOnceSupport(readUncommitted, readUncommitted, false);
+
+ // Connector is configured correctly, and exactly-once can be supported
+ assertExactlyOnceSupport(readCommitted, null, true);
+ assertExactlyOnceSupport(null, readCommitted, true);
+ assertExactlyOnceSupport(readUncommitted, readCommitted, true);
+ assertExactlyOnceSupport(readCommitted, readCommitted, true);
+
+ // Connector is configured incorrectly, but is able to react gracefully
+ assertExactlyOnceSupport(readGarbage, null, false);
+ assertExactlyOnceSupport(null, readGarbage, false);
+ assertExactlyOnceSupport(readGarbage, readGarbage, false);
+ assertExactlyOnceSupport(readCommitted, readGarbage, false);
+ assertExactlyOnceSupport(readUncommitted, readGarbage, false);
+ assertExactlyOnceSupport(readGarbage, readUncommitted, false);
+ assertExactlyOnceSupport(readGarbage, readCommitted, true);
+ }
+
+ private void assertExactlyOnceSupport(String defaultIsolationLevel, String sourceIsolationLevel, boolean expected) {
+ Map<String, String> props = makeProps();
+ if (defaultIsolationLevel != null) {
+ props.put(CONSUMER_CLIENT_PREFIX + ISOLATION_LEVEL_CONFIG, defaultIsolationLevel);
+ }
+ if (sourceIsolationLevel != null) {
+ props.put(SOURCE_PREFIX + CONSUMER_CLIENT_PREFIX + ISOLATION_LEVEL_CONFIG, sourceIsolationLevel);
+ }
+ ExactlyOnceSupport expectedSupport = expected ? ExactlyOnceSupport.SUPPORTED : ExactlyOnceSupport.UNSUPPORTED;
+ ExactlyOnceSupport actualSupport = new MirrorSourceConnector().exactlyOnceSupport(props);
+ assertEquals(expectedSupport, actualSupport);
+ }
+
+ @Test
+ public void testExactlyOnceSupportValidation() {
+ String exactlyOnceSupport = "exactly.once.support";
+
+ Map<String, String> props = makeProps();
+ Optional<ConfigValue> configValue = validateProperty(exactlyOnceSupport, props);
+ assertEquals(Optional.empty(), configValue);
+
+ props.put(exactlyOnceSupport, "requested");
+ configValue = validateProperty(exactlyOnceSupport, props);
+ assertEquals(Optional.empty(), configValue);
+
+ props.put(exactlyOnceSupport, "garbage");
+ configValue = validateProperty(exactlyOnceSupport, props);
+ assertEquals(Optional.empty(), configValue);
+
+ props.put(exactlyOnceSupport, "required");
+ configValue = validateProperty(exactlyOnceSupport, props);
+ assertTrue(configValue.isPresent());
+ List<String> errorMessages = configValue.get().errorMessages();
+ assertEquals(1, errorMessages.size());
+ String errorMessage = errorMessages.get(0);
+ assertTrue(
+ errorMessages.get(0).contains(ISOLATION_LEVEL_CONFIG),
+ "Error message \"" + errorMessage + "\" should have mentioned the 'isolation.level' consumer property"
+ );
+
+ props.put(CONSUMER_CLIENT_PREFIX + ISOLATION_LEVEL_CONFIG, "read_committed");
+ configValue = validateProperty(exactlyOnceSupport, props);
+ assertEquals(Optional.empty(), configValue);
+
+ // Make sure that an unrelated invalid property doesn't cause an exception to be thrown and is instead handled and reported gracefully
+ props.put(OFFSET_LAG_MAX, "bad");
+ // Ensure that the issue with the invalid property is reported...
+ configValue = validateProperty(OFFSET_LAG_MAX, props);
+ assertTrue(configValue.isPresent());
+ errorMessages = configValue.get().errorMessages();
+ assertEquals(1, errorMessages.size());
+ errorMessage = errorMessages.get(0);
+ assertTrue(
+ errorMessages.get(0).contains(OFFSET_LAG_MAX),
+ "Error message \"" + errorMessage + "\" should have mentioned the 'offset.lag.max' property"
+ );
+ // ... and that it does not cause any issues with validation for exactly-once support...
+ configValue = validateProperty(exactlyOnceSupport, props);
+ assertEquals(Optional.empty(), configValue);
+
+ // ... regardless of whether validation for exactly-once support does or does not find an error
+ props.remove(CONSUMER_CLIENT_PREFIX + ISOLATION_LEVEL_CONFIG);
+ configValue = validateProperty(exactlyOnceSupport, props);
+ assertTrue(configValue.isPresent());
+ errorMessages = configValue.get().errorMessages();
+ assertEquals(1, errorMessages.size());
+ errorMessage = errorMessages.get(0);
+ assertTrue(
+ errorMessages.get(0).contains(ISOLATION_LEVEL_CONFIG),
+ "Error message \"" + errorMessage + "\" should have mentioned the 'isolation.level' consumer property"
+ );
+ }
+
+ private Optional<ConfigValue> validateProperty(String name, Map<String, String> props) {
+ List<ConfigValue> results = new MirrorSourceConnector().validate(props)
+ .configValues().stream()
+ .filter(cv -> name.equals(cv.name()))
+ .collect(Collectors.toList());
+
+ assertTrue(results.size() <= 1, "Connector produced multiple config values for '" + name + "' property");
+
+ if (results.isEmpty())
+ return Optional.empty();
+
+ ConfigValue result = results.get(0);
+ assertNotNull(result, "Connector should not have record null config value for '" + name + "' property");
+ return Optional.of(result);
+ }
}
diff --git a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/DedicatedMirrorIntegrationTest.java b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/DedicatedMirrorIntegrationTest.java
index f78de9bced6..e2db7b3865e 100644
--- a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/DedicatedMirrorIntegrationTest.java
+++ b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/DedicatedMirrorIntegrationTest.java
@@ -199,7 +199,8 @@ public class DedicatedMirrorIntegrationTest {
// Enable exactly-once support to both validate that MirrorMaker can run with
// that feature turned on, and to force cross-worker communication before
// task startup
- put(a + ".exactly.once.source.support", "enabled");
+ put(b + ".exactly.once.source.support", "enabled");
+ put(a + ".consumer.isolation.level", "read_committed");
put(ab + ".enabled", "true");
put(ab + ".topics", "^" + testTopicPrefix + ".*");
// The name of the offset syncs topic will contain the name of the cluster in
diff --git a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/IdentityReplicationIntegrationTest.java b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/IdentityReplicationIntegrationTest.java
index 1a961be564c..dd683f1acfe 100644
--- a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/IdentityReplicationIntegrationTest.java
+++ b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/IdentityReplicationIntegrationTest.java
@@ -225,7 +225,7 @@ public class IdentityReplicationIntegrationTest extends MirrorConnectorsIntegrat
consumerProps, "test-topic-1");
waitForConsumerGroupOffsetSync(backup, backupConsumer, Collections.singletonList("test-topic-1"),
- consumerGroupName, NUM_RECORDS_PRODUCED);
+ consumerGroupName, NUM_RECORDS_PRODUCED, true);
ConsumerRecords<byte[], byte[]> records = backupConsumer.poll(CONSUMER_POLL_TIMEOUT_MS);
@@ -254,7 +254,7 @@ public class IdentityReplicationIntegrationTest extends MirrorConnectorsIntegrat
"group.id", consumerGroupName), "test-topic-1", "test-topic-2");
waitForConsumerGroupOffsetSync(backup, backupConsumer, Arrays.asList("test-topic-1", "test-topic-2"),
- consumerGroupName, NUM_RECORDS_PRODUCED);
+ consumerGroupName, NUM_RECORDS_PRODUCED, true);
records = backupConsumer.poll(CONSUMER_POLL_TIMEOUT_MS);
// similar reasoning as above, no more records to consume by the same consumer group at backup cluster
diff --git a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationBaseTest.java b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationBaseTest.java
index 5ca9b110707..27da7054b67 100644
--- a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationBaseTest.java
+++ b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationBaseTest.java
@@ -118,6 +118,7 @@ public class MirrorConnectorsIntegrationBaseTest {
protected Properties backupBrokerProps = new Properties();
protected Map<String, String> primaryWorkerProps = new HashMap<>();
protected Map<String, String> backupWorkerProps = new HashMap<>();
+ protected boolean exactOffsetTranslation = true;
@BeforeEach
public void startClusters() throws Exception {
@@ -450,7 +451,7 @@ public class MirrorConnectorsIntegrationBaseTest {
consumerProps, "primary.test-topic-1");
waitForConsumerGroupOffsetSync(backup, backupConsumer, Collections.singletonList("primary.test-topic-1"),
- consumerGroupName, NUM_RECORDS_PRODUCED);
+ consumerGroupName, NUM_RECORDS_PRODUCED, exactOffsetTranslation);
ConsumerRecords<byte[], byte[]> records = backupConsumer.poll(CONSUMER_POLL_TIMEOUT_MS);
@@ -479,7 +480,7 @@ public class MirrorConnectorsIntegrationBaseTest {
"group.id", consumerGroupName), "primary.test-topic-1", "primary.test-topic-2");
waitForConsumerGroupOffsetSync(backup, backupConsumer, Arrays.asList("primary.test-topic-1", "primary.test-topic-2"),
- consumerGroupName, NUM_RECORDS_PRODUCED);
+ consumerGroupName, NUM_RECORDS_PRODUCED, exactOffsetTranslation);
records = backupConsumer.poll(CONSUMER_POLL_TIMEOUT_MS);
// similar reasoning as above, no more records to consume by the same consumer group at backup cluster
@@ -708,7 +709,7 @@ public class MirrorConnectorsIntegrationBaseTest {
* offsets are eventually synced to the expected offset numbers
*/
protected static <T> void waitForConsumerGroupOffsetSync(EmbeddedConnectCluster connect,
- Consumer<T, T> consumer, List<String> topics, String consumerGroupId, int numRecords)
+ Consumer<T, T> consumer, List<String> topics, String consumerGroupId, int numRecords, boolean exactOffsetTranslation)
throws InterruptedException {
try (Admin adminClient = connect.kafka().createAdminClient()) {
List<TopicPartition> tps = new ArrayList<>(NUM_PARTITIONS * topics.size());
@@ -728,8 +729,11 @@ public class MirrorConnectorsIntegrationBaseTest {
Map<TopicPartition, Long> offsets = consumer.endOffsets(tps, CONSUMER_POLL_TIMEOUT_MS);
long totalOffsets = offsets.values().stream().mapToLong(l -> l).sum();
+ boolean totalOffsetsMatch = exactOffsetTranslation
+ ? totalOffsets == expectedTotalOffsets
+ : totalOffsets >= expectedTotalOffsets;
// make sure the consumer group offsets are synced to expected number
- return totalOffsets == expectedTotalOffsets && consumerGroupOffsetTotal > 0;
+ return totalOffsetsMatch && consumerGroupOffsetTotal > 0;
}, OFFSET_SYNC_DURATION_MS, "Consumer group offset sync is not complete in time");
}
}
diff --git a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationExactlyOnceTest.java b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationExactlyOnceTest.java
new file mode 100644
index 00000000000..a50b21bd58b
--- /dev/null
+++ b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationExactlyOnceTest.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.mirror.integration;
+
+import org.apache.kafka.connect.runtime.distributed.DistributedConfig;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Tag;
+
+import java.util.Arrays;
+import java.util.Properties;
+
+/**
+ * Tests MM2 replication with exactly-once support enabled on the Connect clusters.
+ */
+@Tag("integration")
+public class MirrorConnectorsIntegrationExactlyOnceTest extends MirrorConnectorsIntegrationBaseTest {
+
+ @BeforeEach
+ public void startClusters() throws Exception {
+ mm2Props.put(
+ PRIMARY_CLUSTER_ALIAS + "." + DistributedConfig.EXACTLY_ONCE_SOURCE_SUPPORT_CONFIG,
+ DistributedConfig.ExactlyOnceSourceSupport.ENABLED.toString()
+ );
+ mm2Props.put(
+ BACKUP_CLUSTER_ALIAS + "." + DistributedConfig.EXACTLY_ONCE_SOURCE_SUPPORT_CONFIG,
+ DistributedConfig.ExactlyOnceSourceSupport.ENABLED.toString()
+ );
+ for (Properties brokerProps : Arrays.asList(primaryBrokerProps, backupBrokerProps)) {
+ brokerProps.put("transaction.state.log.replication.factor", "1");
+ brokerProps.put("transaction.state.log.min.isr", "1");
+ }
+ // Transaction marker records will cause translated offsets to not match
+ // between source and target
+ exactOffsetTranslation = false;
+ super.startClusters();
+ }
+
+}
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedConfig.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedConfig.java
index 9cacb6e1ee6..1c0ebea2866 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedConfig.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedConfig.java
@@ -212,7 +212,7 @@ public class DistributedConfig extends WorkerConfig {
+ "on other JVMs, no default is used and a value for this property must be manually specified in the worker config.";
private Crypto crypto;
- private enum ExactlyOnceSourceSupport {
+ public enum ExactlyOnceSourceSupport {
DISABLED(false),
PREPARING(true),
ENABLED(true);