You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ce...@apache.org on 2023/02/23 13:19:12 UTC
[kafka] 04/04: KAFKA-12468, KAFKA-13659, KAFKA-12566: Fix MM2 causing negative downstream lag, syncing stale offsets, and flaky integration tests (#13178)
This is an automated email from the ASF dual-hosted git repository.
cegerton pushed a commit to branch 3.4
in repository https://gitbox.apache.org/repos/asf/kafka.git
commit 175a342580aef327fdc8bbbc09f616c1509850f8
Author: Greg Harris <gr...@aiven.io>
AuthorDate: Fri Feb 17 14:25:17 2023 -0800
KAFKA-12468, KAFKA-13659, KAFKA-12566: Fix MM2 causing negative downstream lag, syncing stale offsets, and flaky integration tests (#13178)
KAFKA-12468: Fix negative lag on down consumer groups synced by MirrorMaker 2
KAFKA-13659: Stop syncing consumer groups with stale offsets in MirrorMaker 2
KAFKA-12566: Fix flaky MirrorMaker 2 integration tests
Reviewers: Chris Egerton <ch...@aiven.io>
---
.../connect/mirror/MirrorCheckpointConfig.java | 6 +
.../kafka/connect/mirror/MirrorCheckpointTask.java | 17 +-
.../kafka/connect/mirror/OffsetSyncStore.java | 119 +++++++---
.../connect/mirror/MirrorCheckpointTaskTest.java | 32 ++-
.../kafka/connect/mirror/OffsetSyncStoreTest.java | 86 +++++--
.../IdentityReplicationIntegrationTest.java | 45 ++--
.../MirrorConnectorsIntegrationBaseTest.java | 259 ++++++++++++++++-----
...irrorConnectorsIntegrationTransactionsTest.java | 66 ++++++
.../apache/kafka/connect/util/KafkaBasedLog.java | 24 +-
.../org/apache/kafka/connect/util/TopicAdmin.java | 3 +-
10 files changed, 506 insertions(+), 151 deletions(-)
diff --git a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointConfig.java b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointConfig.java
index e21d22af1d4..122d8ad1e7f 100644
--- a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointConfig.java
+++ b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointConfig.java
@@ -150,6 +150,12 @@ public class MirrorCheckpointConfig extends MirrorConnectorConfig {
: targetConsumerConfig();
}
+ Map<String, Object> offsetSyncsTopicAdminConfig() {
+ return SOURCE_CLUSTER_ALIAS_DEFAULT.equals(offsetSyncsTopicLocation())
+ ? sourceAdminConfig()
+ : targetAdminConfig();
+ }
+
Duration consumerPollTimeout() {
return Duration.ofMillis(getLong(CONSUMER_POLL_TIMEOUT_MILLIS));
}
diff --git a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointTask.java b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointTask.java
index 95b7b5a2bda..9f5a4b00e69 100644
--- a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointTask.java
+++ b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointTask.java
@@ -102,10 +102,13 @@ public class MirrorCheckpointTask extends SourceTask {
idleConsumerGroupsOffset = new HashMap<>();
checkpointsPerConsumerGroup = new HashMap<>();
scheduler = new Scheduler(MirrorCheckpointTask.class, config.adminTimeout());
- scheduler.scheduleRepeating(this::refreshIdleConsumerGroupOffset, config.syncGroupOffsetsInterval(),
- "refreshing idle consumers group offsets at target cluster");
- scheduler.scheduleRepeatingDelayed(this::syncGroupOffset, config.syncGroupOffsetsInterval(),
- "sync idle consumer group offset from source to target");
+ scheduler.execute(() -> {
+ offsetSyncStore.start();
+ scheduler.scheduleRepeating(this::refreshIdleConsumerGroupOffset, config.syncGroupOffsetsInterval(),
+ "refreshing idle consumers group offsets at target cluster");
+ scheduler.scheduleRepeatingDelayed(this::syncGroupOffset, config.syncGroupOffsetsInterval(),
+ "sync idle consumer group offset from source to target");
+ }, "starting offset sync store");
}
@Override
@@ -136,7 +139,11 @@ public class MirrorCheckpointTask extends SourceTask {
try {
long deadline = System.currentTimeMillis() + interval.toMillis();
while (!stopping && System.currentTimeMillis() < deadline) {
- offsetSyncStore.update(pollTimeout);
+ Thread.sleep(pollTimeout.toMillis());
+ }
+ if (stopping) {
+ // we are stopping, return early.
+ return null;
}
List<SourceRecord> records = new ArrayList<>();
for (String group : consumerGroups) {
diff --git a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/OffsetSyncStore.java b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/OffsetSyncStore.java
index 1cfdb1d265c..0169446aa04 100644
--- a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/OffsetSyncStore.java
+++ b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/OffsetSyncStore.java
@@ -16,65 +16,124 @@
*/
package org.apache.kafka.connect.mirror;
-import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.admin.AdminClientConfig;
+import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.common.errors.WakeupException;
-import org.apache.kafka.common.serialization.ByteArrayDeserializer;
+import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.connect.util.KafkaBasedLog;
+import org.apache.kafka.connect.util.TopicAdmin;
-import java.util.Map;
-import java.util.HashMap;
import java.util.Collections;
-import java.time.Duration;
+import java.util.Map;
import java.util.Optional;
import java.util.OptionalLong;
+import java.util.concurrent.ConcurrentHashMap;
/** Used internally by MirrorMaker. Stores offset syncs and performs offset translation. */
class OffsetSyncStore implements AutoCloseable {
- private final KafkaConsumer<byte[], byte[]> consumer;
- private final Map<TopicPartition, OffsetSync> offsetSyncs = new HashMap<>();
- private final TopicPartition offsetSyncTopicPartition;
+ private final KafkaBasedLog<byte[], byte[]> backingStore;
+ private final Map<TopicPartition, OffsetSync> offsetSyncs = new ConcurrentHashMap<>();
+ private final TopicAdmin admin;
+ protected volatile boolean readToEnd = false;
OffsetSyncStore(MirrorCheckpointConfig config) {
- consumer = new KafkaConsumer<>(config.offsetSyncsTopicConsumerConfig(),
- new ByteArrayDeserializer(), new ByteArrayDeserializer());
- offsetSyncTopicPartition = new TopicPartition(config.offsetSyncsTopic(), 0);
- consumer.assign(Collections.singleton(offsetSyncTopicPartition));
+ Consumer<byte[], byte[]> consumer = null;
+ TopicAdmin admin = null;
+ KafkaBasedLog<byte[], byte[]> store;
+ try {
+ consumer = MirrorUtils.newConsumer(config.offsetSyncsTopicConsumerConfig());
+ admin = new TopicAdmin(
+ config.offsetSyncsTopicAdminConfig().get(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG),
+ config.forwardingAdmin(config.offsetSyncsTopicAdminConfig()));
+ store = createBackingStore(config, consumer, admin);
+ } catch (Throwable t) {
+ Utils.closeQuietly(consumer, "consumer for offset syncs");
+ Utils.closeQuietly(admin, "admin client for offset syncs");
+ throw t;
+ }
+ this.admin = admin;
+ this.backingStore = store;
}
- // for testing
- OffsetSyncStore(KafkaConsumer<byte[], byte[]> consumer, TopicPartition offsetSyncTopicPartition) {
- this.consumer = consumer;
- this.offsetSyncTopicPartition = offsetSyncTopicPartition;
+ private KafkaBasedLog<byte[], byte[]> createBackingStore(MirrorCheckpointConfig config, Consumer<byte[], byte[]> consumer, TopicAdmin admin) {
+ return new KafkaBasedLog<byte[], byte[]>(
+ config.offsetSyncsTopic(),
+ Collections.emptyMap(),
+ Collections.emptyMap(),
+ () -> admin,
+ (error, record) -> this.handleRecord(record),
+ Time.SYSTEM,
+ ignored -> {
+ }
+ ) {
+ @Override
+ protected Producer<byte[], byte[]> createProducer() {
+ return null;
+ }
+
+ @Override
+ protected Consumer<byte[], byte[]> createConsumer() {
+ return consumer;
+ }
+
+ @Override
+ protected boolean readPartition(TopicPartition topicPartition) {
+ return topicPartition.partition() == 0;
+ }
+ };
+ }
+
+ OffsetSyncStore() {
+ this.admin = null;
+ this.backingStore = null;
+ }
+
+ /**
+ * Start the OffsetSyncStore, blocking until all previous Offset Syncs have been read from backing storage.
+ */
+ public void start() {
+ backingStore.start();
+ readToEnd = true;
}
OptionalLong translateDownstream(TopicPartition sourceTopicPartition, long upstreamOffset) {
+ if (!readToEnd) {
+ // If we have not read to the end of the syncs topic at least once, decline to translate any offsets.
+ // This prevents emitting stale offsets while initially reading the offset syncs topic.
+ return OptionalLong.empty();
+ }
Optional<OffsetSync> offsetSync = latestOffsetSync(sourceTopicPartition);
if (offsetSync.isPresent()) {
if (offsetSync.get().upstreamOffset() > upstreamOffset) {
// Offset is too far in the past to translate accurately
return OptionalLong.of(-1L);
}
- long upstreamStep = upstreamOffset - offsetSync.get().upstreamOffset();
+ // If the consumer group is ahead of the offset sync, we can translate the upstream offset only 1
+ // downstream offset past the offset sync itself. This is because we know that future records must appear
+ // ahead of the offset sync, but we cannot estimate how many offsets from the upstream topic
+ // will be written vs dropped. If we overestimate, then we may skip the correct offset and have data loss.
+ // This also handles consumer groups at the end of a topic whose offsets point past the last valid record.
+ // This may cause re-reading of records depending on the age of the offset sync.
+ // s=offset sync pair, ?=record may or may not be replicated, g=consumer group offset, r=re-read record
+ // source |-s?????r???g-|
+ // | ______/
+ // | /
+ // vv
+ // target |-sg----r-----|
+ long upstreamStep = upstreamOffset == offsetSync.get().upstreamOffset() ? 0 : 1;
return OptionalLong.of(offsetSync.get().downstreamOffset() + upstreamStep);
} else {
return OptionalLong.empty();
}
}
- // poll and handle records
- synchronized void update(Duration pollTimeout) {
- try {
- consumer.poll(pollTimeout).forEach(this::handleRecord);
- } catch (WakeupException e) {
- // swallow
- }
- }
-
- public synchronized void close() {
- consumer.wakeup();
- Utils.closeQuietly(consumer, "offset sync store consumer");
+ @Override
+ public void close() {
+ Utils.closeQuietly(backingStore != null ? backingStore::stop : null, "backing store for offset syncs");
+ Utils.closeQuietly(admin, "admin client for offset syncs");
}
protected void handleRecord(ConsumerRecord<byte[], byte[]> record) {
diff --git a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorCheckpointTaskTest.java b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorCheckpointTaskTest.java
index 20735cd2334..500cb6c131a 100644
--- a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorCheckpointTaskTest.java
+++ b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorCheckpointTaskTest.java
@@ -52,11 +52,16 @@ public class MirrorCheckpointTaskTest {
@Test
public void testCheckpoint() {
+ long t1UpstreamOffset = 3L;
+ long t1DownstreamOffset = 4L;
+ long t2UpstreamOffset = 7L;
+ long t2DownstreamOffset = 8L;
OffsetSyncStoreTest.FakeOffsetSyncStore offsetSyncStore = new OffsetSyncStoreTest.FakeOffsetSyncStore();
+ offsetSyncStore.start();
MirrorCheckpointTask mirrorCheckpointTask = new MirrorCheckpointTask("source1", "target2",
new DefaultReplicationPolicy(), offsetSyncStore, Collections.emptyMap(), Collections.emptyMap());
- offsetSyncStore.sync(new TopicPartition("topic1", 2), 3L, 4L);
- offsetSyncStore.sync(new TopicPartition("target2.topic5", 6), 7L, 8L);
+ offsetSyncStore.sync(new TopicPartition("topic1", 2), t1UpstreamOffset, t1DownstreamOffset);
+ offsetSyncStore.sync(new TopicPartition("target2.topic5", 6), t2UpstreamOffset, t2DownstreamOffset);
Optional<Checkpoint> optionalCheckpoint1 = mirrorCheckpointTask.checkpoint("group9", new TopicPartition("topic1", 2),
new OffsetAndMetadata(10, null));
assertTrue(optionalCheckpoint1.isPresent());
@@ -70,7 +75,7 @@ public class MirrorCheckpointTaskTest {
"checkpoint group9 sourcePartition failed");
assertEquals(10, checkpoint1.upstreamOffset(),
"checkpoint group9 upstreamOffset failed");
- assertEquals(11, checkpoint1.downstreamOffset(),
+ assertEquals(t1DownstreamOffset + 1, checkpoint1.downstreamOffset(),
"checkpoint group9 downstreamOffset failed");
assertEquals(123L, sourceRecord1.timestamp().longValue(),
"checkpoint group9 timestamp failed");
@@ -87,10 +92,27 @@ public class MirrorCheckpointTaskTest {
"checkpoint group11 sourcePartition failed");
assertEquals(12, checkpoint2.upstreamOffset(),
"checkpoint group11 upstreamOffset failed");
- assertEquals(13, checkpoint2.downstreamOffset(),
+ assertEquals(t2DownstreamOffset + 1, checkpoint2.downstreamOffset(),
"checkpoint group11 downstreamOffset failed");
assertEquals(234L, sourceRecord2.timestamp().longValue(),
"checkpoint group11 timestamp failed");
+ Optional<Checkpoint> optionalCheckpoint3 = mirrorCheckpointTask.checkpoint("group13", new TopicPartition("target2.topic5", 6),
+ new OffsetAndMetadata(7, null));
+ assertTrue(optionalCheckpoint3.isPresent());
+ Checkpoint checkpoint3 = optionalCheckpoint3.get();
+ SourceRecord sourceRecord3 = mirrorCheckpointTask.checkpointRecord(checkpoint3, 234L);
+ assertEquals(new TopicPartition("topic5", 6), checkpoint3.topicPartition(),
+ "checkpoint group13 topic5 failed");
+ assertEquals("group13", checkpoint3.consumerGroupId(),
+ "checkpoint group13 consumerGroupId failed");
+ assertEquals("group13", Checkpoint.unwrapGroup(sourceRecord3.sourcePartition()),
+ "checkpoint group13 sourcePartition failed");
+ assertEquals(t2UpstreamOffset, checkpoint3.upstreamOffset(),
+ "checkpoint group13 upstreamOffset failed");
+ assertEquals(t2DownstreamOffset, checkpoint3.downstreamOffset(),
+ "checkpoint group13 downstreamOffset failed");
+ assertEquals(234L, sourceRecord3.timestamp().longValue(),
+ "checkpoint group13 timestamp failed");
}
@Test
@@ -150,6 +172,7 @@ public class MirrorCheckpointTaskTest {
@Test
public void testNoCheckpointForTopicWithoutOffsetSyncs() {
OffsetSyncStoreTest.FakeOffsetSyncStore offsetSyncStore = new OffsetSyncStoreTest.FakeOffsetSyncStore();
+ offsetSyncStore.start();
MirrorCheckpointTask mirrorCheckpointTask = new MirrorCheckpointTask("source1", "target2",
new DefaultReplicationPolicy(), offsetSyncStore, Collections.emptyMap(), Collections.emptyMap());
offsetSyncStore.sync(new TopicPartition("topic1", 0), 3L, 4L);
@@ -165,6 +188,7 @@ public class MirrorCheckpointTaskTest {
@Test
public void testNoCheckpointForTopicWithNullOffsetAndMetadata() {
OffsetSyncStoreTest.FakeOffsetSyncStore offsetSyncStore = new OffsetSyncStoreTest.FakeOffsetSyncStore();
+ offsetSyncStore.start();
MirrorCheckpointTask mirrorCheckpointTask = new MirrorCheckpointTask("source1", "target2",
new DefaultReplicationPolicy(), offsetSyncStore, Collections.emptyMap(), Collections.emptyMap());
offsetSyncStore.sync(new TopicPartition("topic1", 0), 1L, 3L);
diff --git a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/OffsetSyncStoreTest.java b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/OffsetSyncStoreTest.java
index 9224a088081..163e5b72250 100644
--- a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/OffsetSyncStoreTest.java
+++ b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/OffsetSyncStoreTest.java
@@ -21,6 +21,8 @@ import org.apache.kafka.common.TopicPartition;
import org.junit.jupiter.api.Test;
+import java.util.OptionalLong;
+
import static org.junit.jupiter.api.Assertions.assertEquals;
public class OffsetSyncStoreTest {
@@ -30,7 +32,13 @@ public class OffsetSyncStoreTest {
static class FakeOffsetSyncStore extends OffsetSyncStore {
FakeOffsetSyncStore() {
- super(null, null);
+ super();
+ }
+
+ @Override
+ public void start() {
+ // do not call super to avoid NPE without a KafkaBasedLog.
+ readToEnd = true;
}
void sync(TopicPartition topicPartition, long upstreamOffset, long downstreamOffset) {
@@ -44,29 +52,57 @@ public class OffsetSyncStoreTest {
@Test
public void testOffsetTranslation() {
- FakeOffsetSyncStore store = new FakeOffsetSyncStore();
-
- store.sync(tp, 100, 200);
- assertEquals(250L, store.translateDownstream(tp, 150).getAsLong(),
- "Failure in translating downstream offset 250");
-
- // Translate exact offsets
- store.sync(tp, 150, 251);
- assertEquals(251L, store.translateDownstream(tp, 150).getAsLong(),
- "Failure in translating exact downstream offset 251");
-
- // Use old offset (5) prior to any sync -> can't translate
- assertEquals(-1, store.translateDownstream(tp, 5).getAsLong(),
- "Expected old offset to not translate");
-
- // Downstream offsets reset
- store.sync(tp, 200, 10);
- assertEquals(10L, store.translateDownstream(tp, 200).getAsLong(),
- "Failure in resetting translation of downstream offset");
-
- // Upstream offsets reset
- store.sync(tp, 20, 20);
- assertEquals(20L, store.translateDownstream(tp, 20).getAsLong(),
- "Failure in resetting translation of upstream offset");
+ try (FakeOffsetSyncStore store = new FakeOffsetSyncStore()) {
+ store.start();
+
+ // Emit synced downstream offset without dead-reckoning
+ store.sync(tp, 100, 200);
+ assertEquals(OptionalLong.of(201), store.translateDownstream(tp, 150));
+
+ // Translate exact offsets
+ store.sync(tp, 150, 251);
+ assertEquals(OptionalLong.of(251), store.translateDownstream(tp, 150));
+
+ // Use old offset (5) prior to any sync -> can't translate
+ assertEquals(OptionalLong.of(-1), store.translateDownstream(tp, 5));
+
+ // Downstream offsets reset
+ store.sync(tp, 200, 10);
+ assertEquals(OptionalLong.of(10), store.translateDownstream(tp, 200));
+
+ // Upstream offsets reset
+ store.sync(tp, 20, 20);
+ assertEquals(OptionalLong.of(20), store.translateDownstream(tp, 20));
+ }
+ }
+
+ @Test
+ public void testNoTranslationIfStoreNotStarted() {
+ try (FakeOffsetSyncStore store = new FakeOffsetSyncStore()) {
+ // no offsets exist and store is not started
+ assertEquals(OptionalLong.empty(), store.translateDownstream(tp, 0));
+ assertEquals(OptionalLong.empty(), store.translateDownstream(tp, 100));
+ assertEquals(OptionalLong.empty(), store.translateDownstream(tp, 200));
+
+ // read a sync during startup
+ store.sync(tp, 100, 200);
+ assertEquals(OptionalLong.empty(), store.translateDownstream(tp, 0));
+ assertEquals(OptionalLong.empty(), store.translateDownstream(tp, 100));
+ assertEquals(OptionalLong.empty(), store.translateDownstream(tp, 200));
+
+ // After the store is started all offsets are visible
+ store.start();
+ assertEquals(OptionalLong.of(-1), store.translateDownstream(tp, 0));
+ assertEquals(OptionalLong.of(200), store.translateDownstream(tp, 100));
+ assertEquals(OptionalLong.of(201), store.translateDownstream(tp, 200));
+ }
+ }
+
+ @Test
+ public void testNoTranslationIfNoOffsetSync() {
+ try (FakeOffsetSyncStore store = new FakeOffsetSyncStore()) {
+ store.start();
+ assertEquals(OptionalLong.empty(), store.translateDownstream(tp, 0));
+ }
}
}
diff --git a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/IdentityReplicationIntegrationTest.java b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/IdentityReplicationIntegrationTest.java
index 1a961be564c..fd58877208f 100644
--- a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/IdentityReplicationIntegrationTest.java
+++ b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/IdentityReplicationIntegrationTest.java
@@ -27,7 +27,6 @@ import org.apache.kafka.connect.mirror.MirrorClient;
import org.apache.kafka.connect.mirror.MirrorHeartbeatConnector;
import org.apache.kafka.connect.mirror.MirrorMakerConfig;
-import java.time.Duration;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
@@ -107,11 +106,8 @@ public class IdentityReplicationIntegrationTest extends MirrorConnectorsIntegrat
assertTrue(backup.kafka().consume(1, CHECKPOINT_DURATION_MS, "primary.checkpoints.internal").count() > 0,
"Checkpoints were not emitted downstream to backup cluster.");
- Map<TopicPartition, OffsetAndMetadata> backupOffsets = backupClient.remoteConsumerOffsets(consumerGroupName, PRIMARY_CLUSTER_ALIAS,
- Duration.ofMillis(CHECKPOINT_DURATION_MS));
-
- assertTrue(backupOffsets.containsKey(
- new TopicPartition("test-topic-1", 0)), "Offsets not translated downstream to backup cluster. Found: " + backupOffsets);
+ Map<TopicPartition, OffsetAndMetadata> backupOffsets = waitForCheckpointOnAllPartitions(
+ backupClient, consumerGroupName, PRIMARY_CLUSTER_ALIAS, "test-topic-1");
// Failover consumer group to backup cluster.
try (Consumer<byte[], byte[]> primaryConsumer = backup.kafka().createConsumer(Collections.singletonMap("group.id", consumerGroupName))) {
@@ -221,18 +217,19 @@ public class IdentityReplicationIntegrationTest extends MirrorConnectorsIntegrat
topicShouldNotBeCreated(primary, "backup.test-topic-1");
waitForTopicCreated(backup, "test-topic-1");
// create a consumer at backup cluster with same consumer group Id to consume 1 topic
- Consumer<byte[], byte[]> backupConsumer = backup.kafka().createConsumerAndSubscribeTo(
- consumerProps, "test-topic-1");
+ try (Consumer<byte[], byte[]> backupConsumer = backup.kafka().createConsumerAndSubscribeTo(
+ consumerProps, "test-topic-1")) {
- waitForConsumerGroupOffsetSync(backup, backupConsumer, Collections.singletonList("test-topic-1"),
+ waitForConsumerGroupFullSync(backup, Collections.singletonList("test-topic-1"),
consumerGroupName, NUM_RECORDS_PRODUCED);
- ConsumerRecords<byte[], byte[]> records = backupConsumer.poll(CONSUMER_POLL_TIMEOUT_MS);
+ ConsumerRecords<byte[], byte[]> records = backupConsumer.poll(CONSUMER_POLL_TIMEOUT_MS);
- // the size of consumer record should be zero, because the offsets of the same consumer group
- // have been automatically synchronized from primary to backup by the background job, so no
- // more records to consume from the replicated topic by the same consumer group at backup cluster
- assertEquals(0, records.count(), "consumer record size is not zero");
+ // the size of consumer record should be zero, because the offsets of the same consumer group
+ // have been automatically synchronized from primary to backup by the background job, so no
+ // more records to consume from the replicated topic by the same consumer group at backup cluster
+ assertEquals(0, records.count(), "consumer record size is not zero");
+ }
// now create a new topic in primary cluster
primary.kafka().createTopic("test-topic-2", NUM_PARTITIONS);
@@ -244,22 +241,24 @@ public class IdentityReplicationIntegrationTest extends MirrorConnectorsIntegrat
// create a consumer at primary cluster to consume the new topic
try (Consumer<byte[], byte[]> consumer1 = primary.kafka().createConsumerAndSubscribeTo(Collections.singletonMap(
- "group.id", "consumer-group-1"), "test-topic-2")) {
+ "group.id", consumerGroupName), "test-topic-2")) {
// we need to wait for consuming all the records for MM2 replicating the expected offsets
waitForConsumingAllRecords(consumer1, NUM_RECORDS_PRODUCED);
}
// create a consumer at backup cluster with same consumer group Id to consume old and new topic
- backupConsumer = backup.kafka().createConsumerAndSubscribeTo(Collections.singletonMap(
- "group.id", consumerGroupName), "test-topic-1", "test-topic-2");
+ try (Consumer<byte[], byte[]> backupConsumer = backup.kafka().createConsumerAndSubscribeTo(Collections.singletonMap(
+ "group.id", consumerGroupName), "test-topic-1", "test-topic-2")) {
- waitForConsumerGroupOffsetSync(backup, backupConsumer, Arrays.asList("test-topic-1", "test-topic-2"),
- consumerGroupName, NUM_RECORDS_PRODUCED);
+ waitForConsumerGroupFullSync(backup, Arrays.asList("test-topic-1", "test-topic-2"),
+ consumerGroupName, NUM_RECORDS_PRODUCED);
+
+ ConsumerRecords<byte[], byte[]> records = backupConsumer.poll(CONSUMER_POLL_TIMEOUT_MS);
+ // similar reasoning as above, no more records to consume by the same consumer group at backup cluster
+ assertEquals(0, records.count(), "consumer record size is not zero");
+ }
- records = backupConsumer.poll(CONSUMER_POLL_TIMEOUT_MS);
- // similar reasoning as above, no more records to consume by the same consumer group at backup cluster
- assertEquals(0, records.count(), "consumer record size is not zero");
- backupConsumer.close();
+ assertMonotonicCheckpoints(backup, "primary.checkpoints.internal");
}
/*
diff --git a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationBaseTest.java b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationBaseTest.java
index 5ca9b110707..82772ceb06e 100644
--- a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationBaseTest.java
+++ b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationBaseTest.java
@@ -20,6 +20,8 @@ import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.Config;
import org.apache.kafka.clients.admin.DescribeConfigsResult;
+import org.apache.kafka.clients.admin.ListOffsetsResult;
+import org.apache.kafka.clients.admin.OffsetSpec;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
@@ -37,7 +39,6 @@ import org.apache.kafka.connect.mirror.MirrorSourceConnector;
import org.apache.kafka.connect.mirror.SourceAndTarget;
import org.apache.kafka.connect.mirror.Checkpoint;
import org.apache.kafka.connect.mirror.MirrorCheckpointConnector;
-import org.apache.kafka.connect.mirror.ReplicationPolicy;
import org.apache.kafka.connect.util.clusters.EmbeddedConnectCluster;
import org.apache.kafka.connect.util.clusters.EmbeddedKafkaCluster;
import org.apache.kafka.connect.util.clusters.UngracefulShutdownException;
@@ -54,6 +55,8 @@ import java.util.Properties;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Function;
import java.util.stream.Collectors;
import org.junit.jupiter.api.Tag;
@@ -93,6 +96,7 @@ public class MirrorConnectorsIntegrationBaseTest {
private static final int OFFSET_SYNC_DURATION_MS = 30_000;
private static final int TOPIC_SYNC_DURATION_MS = 60_000;
private static final int REQUEST_TIMEOUT_DURATION_MS = 60_000;
+ private static final int CHECKPOINT_INTERVAL_DURATION_MS = 1_000;
private static final int NUM_WORKERS = 3;
protected static final Duration CONSUMER_POLL_TIMEOUT_MS = Duration.ofMillis(500L);
protected static final String PRIMARY_CLUSTER_ALIAS = "primary";
@@ -118,7 +122,7 @@ public class MirrorConnectorsIntegrationBaseTest {
protected Properties backupBrokerProps = new Properties();
protected Map<String, String> primaryWorkerProps = new HashMap<>();
protected Map<String, String> backupWorkerProps = new HashMap<>();
-
+
@BeforeEach
public void startClusters() throws Exception {
startClusters(new HashMap<String, String>() {{
@@ -302,13 +306,8 @@ public class MirrorConnectorsIntegrationBaseTest {
assertTrue(backup.kafka().consume(1, CHECKPOINT_DURATION_MS, "primary.checkpoints.internal").count() > 0,
"Checkpoints were not emitted downstream to backup cluster.");
- Map<TopicPartition, OffsetAndMetadata> backupOffsets = backupClient.remoteConsumerOffsets(consumerGroupName, PRIMARY_CLUSTER_ALIAS,
- Duration.ofMillis(CHECKPOINT_DURATION_MS));
-
- for (int i = 0; i < NUM_PARTITIONS; i++) {
- assertTrue(backupOffsets.containsKey(new TopicPartition("primary.test-topic-1", i)),
- "Offsets not translated downstream to backup cluster. Found: " + backupOffsets);
- }
+ Map<TopicPartition, OffsetAndMetadata> backupOffsets = waitForCheckpointOnAllPartitions(
+ backupClient, consumerGroupName, PRIMARY_CLUSTER_ALIAS, "primary.test-topic-1");
// Failover consumer group to backup cluster.
try (Consumer<byte[], byte[]> primaryConsumer = backup.kafka().createConsumer(Collections.singletonMap("group.id", consumerGroupName))) {
@@ -324,11 +323,10 @@ public class MirrorConnectorsIntegrationBaseTest {
"Checkpoints were not emitted upstream to primary cluster.");
}
- waitForCondition(() -> primaryClient.remoteConsumerOffsets(consumerGroupName, BACKUP_CLUSTER_ALIAS,
- Duration.ofMillis(CHECKPOINT_DURATION_MS)).containsKey(new TopicPartition("backup.test-topic-1", 0)), CHECKPOINT_DURATION_MS, "Offsets not translated downstream to primary cluster.");
+ Map<TopicPartition, OffsetAndMetadata> primaryOffsets = waitForCheckpointOnAllPartitions(
+ primaryClient, consumerGroupName, BACKUP_CLUSTER_ALIAS, "backup.test-topic-1");
- Map<TopicPartition, OffsetAndMetadata> primaryOffsets = primaryClient.remoteConsumerOffsets(consumerGroupName, BACKUP_CLUSTER_ALIAS,
- Duration.ofMillis(CHECKPOINT_DURATION_MS));
+ assertMonotonicCheckpoints(backup, "primary.checkpoints.internal");
primaryClient.close();
backupClient.close();
@@ -415,7 +413,7 @@ public class MirrorConnectorsIntegrationBaseTest {
assertEquals(0, offset.offset(), "Offset of last partition is not zero");
}
}
-
+
@Test
public void testOneWayReplicationWithAutoOffsetSync() throws InterruptedException {
produceMessages(primary, "test-topic-1");
@@ -446,18 +444,19 @@ public class MirrorConnectorsIntegrationBaseTest {
topicShouldNotBeCreated(primary, "backup.test-topic-1");
waitForTopicCreated(backup, "primary.test-topic-1");
// create a consumer at backup cluster with same consumer group Id to consume 1 topic
- Consumer<byte[], byte[]> backupConsumer = backup.kafka().createConsumerAndSubscribeTo(
- consumerProps, "primary.test-topic-1");
+ try (Consumer<byte[], byte[]> backupConsumer = backup.kafka().createConsumerAndSubscribeTo(
+ consumerProps, "primary.test-topic-1")) {
- waitForConsumerGroupOffsetSync(backup, backupConsumer, Collections.singletonList("primary.test-topic-1"),
- consumerGroupName, NUM_RECORDS_PRODUCED);
+ waitForConsumerGroupFullSync(backup, Collections.singletonList("primary.test-topic-1"),
+ consumerGroupName, NUM_RECORDS_PRODUCED);
- ConsumerRecords<byte[], byte[]> records = backupConsumer.poll(CONSUMER_POLL_TIMEOUT_MS);
+ ConsumerRecords<byte[], byte[]> records = backupConsumer.poll(CONSUMER_POLL_TIMEOUT_MS);
- // the size of consumer record should be zero, because the offsets of the same consumer group
- // have been automatically synchronized from primary to backup by the background job, so no
- // more records to consume from the replicated topic by the same consumer group at backup cluster
- assertEquals(0, records.count(), "consumer record size is not zero");
+ // the size of consumer record should be zero, because the offsets of the same consumer group
+ // have been automatically synchronized from primary to backup by the background job, so no
+ // more records to consume from the replicated topic by the same consumer group at backup cluster
+ assertEquals(0, records.count(), "consumer record size is not zero");
+ }
// now create a new topic in primary cluster
primary.kafka().createTopic("test-topic-2", NUM_PARTITIONS);
@@ -469,22 +468,24 @@ public class MirrorConnectorsIntegrationBaseTest {
// create a consumer at primary cluster to consume the new topic
try (Consumer<byte[], byte[]> consumer1 = primary.kafka().createConsumerAndSubscribeTo(Collections.singletonMap(
- "group.id", "consumer-group-1"), "test-topic-2")) {
+ "group.id", consumerGroupName), "test-topic-2")) {
// we need to wait for consuming all the records for MM2 replicating the expected offsets
waitForConsumingAllRecords(consumer1, NUM_RECORDS_PRODUCED);
}
// create a consumer at backup cluster with same consumer group Id to consume old and new topic
- backupConsumer = backup.kafka().createConsumerAndSubscribeTo(Collections.singletonMap(
- "group.id", consumerGroupName), "primary.test-topic-1", "primary.test-topic-2");
+ try (Consumer<byte[], byte[]> backupConsumer = backup.kafka().createConsumerAndSubscribeTo(Collections.singletonMap(
+ "group.id", consumerGroupName), "primary.test-topic-1", "primary.test-topic-2")) {
+
+ waitForConsumerGroupFullSync(backup, Arrays.asList("primary.test-topic-1", "primary.test-topic-2"),
+ consumerGroupName, NUM_RECORDS_PRODUCED);
- waitForConsumerGroupOffsetSync(backup, backupConsumer, Arrays.asList("primary.test-topic-1", "primary.test-topic-2"),
- consumerGroupName, NUM_RECORDS_PRODUCED);
+ ConsumerRecords<byte[], byte[]> records = backupConsumer.poll(CONSUMER_POLL_TIMEOUT_MS);
+ // similar reasoning as above, no more records to consume by the same consumer group at backup cluster
+ assertEquals(0, records.count(), "consumer record size is not zero");
+ }
- records = backupConsumer.poll(CONSUMER_POLL_TIMEOUT_MS);
- // similar reasoning as above, no more records to consume by the same consumer group at backup cluster
- assertEquals(0, records.count(), "consumer record size is not zero");
- backupConsumer.close();
+ assertMonotonicCheckpoints(backup, PRIMARY_CLUSTER_ALIAS + ".checkpoints.internal");
}
@Test
@@ -501,10 +502,14 @@ public class MirrorConnectorsIntegrationBaseTest {
// Ensure the offset syncs topic is created in the target cluster
waitForTopicCreated(backup, "mm2-offset-syncs." + PRIMARY_CLUSTER_ALIAS + ".internal");
+ String consumerGroupName = "consumer-group-syncs-on-target";
+ Map<String, Object> consumerProps = Collections.singletonMap("group.id", consumerGroupName);
+
produceMessages(primary, "test-topic-1");
- ReplicationPolicy replicationPolicy = new MirrorClient(mm2Config.clientConfig(BACKUP_CLUSTER_ALIAS)).replicationPolicy();
- String remoteTopic = replicationPolicy.formatRemoteTopic(PRIMARY_CLUSTER_ALIAS, "test-topic-1");
+ warmUpConsumer(consumerProps);
+
+ String remoteTopic = remoteTopicName("test-topic-1", PRIMARY_CLUSTER_ALIAS);
// Check offsets are pushed to the checkpoint topic
Consumer<byte[], byte[]> backupConsumer = backup.kafka().createConsumerAndSubscribeTo(Collections.singletonMap(
@@ -522,6 +527,8 @@ public class MirrorConnectorsIntegrationBaseTest {
"Unable to find checkpoints for " + PRIMARY_CLUSTER_ALIAS + ".test-topic-1"
);
+ assertMonotonicCheckpoints(backup, PRIMARY_CLUSTER_ALIAS + ".checkpoints.internal");
+
// Ensure no offset-syncs topics have been created on the primary cluster
Set<String> primaryTopics = primary.kafka().createAdminClient().listTopics().names().get();
assertFalse(primaryTopics.contains("mm2-offset-syncs." + PRIMARY_CLUSTER_ALIAS + ".internal"));
@@ -574,6 +581,17 @@ public class MirrorConnectorsIntegrationBaseTest {
// Send some records to test-topic-no-checkpoints in the source cluster
produceMessages(primary, "test-topic-no-checkpoints");
+ try (Consumer<byte[], byte[]> consumer = primary.kafka().createConsumer(consumerProps)) {
+ Collection<TopicPartition> tps = Arrays.asList(tp1, tp2);
+ Map<TopicPartition, Long> endOffsets = consumer.endOffsets(tps);
+ Map<TopicPartition, OffsetAndMetadata> offsetsToCommit = endOffsets.entrySet().stream()
+ .collect(Collectors.toMap(
+ Map.Entry::getKey,
+ e -> new OffsetAndMetadata(e.getValue())
+ ));
+ consumer.commitSync(offsetsToCommit);
+ }
+
waitForCondition(() -> {
Map<TopicPartition, OffsetAndMetadata> translatedOffsets = backupClient.remoteConsumerOffsets(
consumerGroupName, PRIMARY_CLUSTER_ALIAS, Duration.ofSeconds(30L));
@@ -582,6 +600,33 @@ public class MirrorConnectorsIntegrationBaseTest {
}, OFFSET_SYNC_DURATION_MS, "Checkpoints were not emitted correctly to backup cluster");
}
+    /**
+     * Verifies that consumer group offsets are still fully translated to the backup cluster, and that
+     * the emitted checkpoints never move backwards, after the MirrorMaker connectors are restarted.
+     */
+    @Test
+    public void testRestartReplication() throws InterruptedException {
+        String consumerGroupName = "consumer-group-restart";
+        Map<String, Object> consumerProps = Collections.singletonMap("group.id", consumerGroupName);
+        String remoteTopic = remoteTopicName("test-topic-1", PRIMARY_CLUSTER_ALIAS);
+        // Derive the checkpoints topic from the cluster alias (as the other tests in this class do)
+        // instead of hard-coding the alias value into the topic name.
+        String checkpointTopic = PRIMARY_CLUSTER_ALIAS + ".checkpoints.internal";
+        warmUpConsumer(consumerProps);
+        mm2Props.put("sync.group.offsets.enabled", "true");
+        mm2Props.put("sync.group.offsets.interval.seconds", "1");
+        mm2Config = new MirrorMakerConfig(mm2Props);
+        waitUntilMirrorMakerIsRunning(backup, CONNECTOR_LIST, mm2Config, PRIMARY_CLUSTER_ALIAS, BACKUP_CLUSTER_ALIAS);
+
+        // First round: produce, consume on the primary, and wait for the group to be synced downstream.
+        produceMessages(primary, "test-topic-1");
+        try (Consumer<byte[], byte[]> primaryConsumer = primary.kafka().createConsumerAndSubscribeTo(consumerProps, "test-topic-1")) {
+            waitForConsumingAllRecords(primaryConsumer, NUM_RECORDS_PRODUCED);
+        }
+        waitForConsumerGroupFullSync(backup, Collections.singletonList(remoteTopic), consumerGroupName, NUM_RECORDS_PRODUCED);
+
+        restartMirrorMakerConnectors(backup, CONNECTOR_LIST);
+        assertMonotonicCheckpoints(backup, checkpointTopic);
+        // NOTE(review): fixed pause after the restart; presumably gives the restarted connectors time
+        // to resume before more records are produced - confirm whether a condition wait could replace it.
+        Thread.sleep(5000);
+
+        // Second round: the same topic now holds twice as many records that must be translated.
+        produceMessages(primary, "test-topic-1");
+        try (Consumer<byte[], byte[]> primaryConsumer = primary.kafka().createConsumerAndSubscribeTo(consumerProps, "test-topic-1")) {
+            waitForConsumingAllRecords(primaryConsumer, NUM_RECORDS_PRODUCED);
+        }
+        waitForConsumerGroupFullSync(backup, Collections.singletonList(remoteTopic), consumerGroupName, 2 * NUM_RECORDS_PRODUCED);
+        assertMonotonicCheckpoints(backup, checkpointTopic);
+    }
+
+
private TopicPartition remoteTopicPartition(TopicPartition tp, String alias) {
return new TopicPartition(remoteTopicName(tp.topic(), alias), tp.partition());
}
@@ -644,6 +689,12 @@ public class MirrorConnectorsIntegrationBaseTest {
}
}
+    /**
+     * Restart every given MirrorMaker connector on the Connect cluster, addressing each connector
+     * by the simple name of its class.
+     * @param connectCluster Connect cluster on which the connectors are restarted
+     * @param connectorClasses classes of the connectors to restart
+     */
+    protected static void restartMirrorMakerConnectors(EmbeddedConnectCluster connectCluster, List<Class<? extends Connector>> connectorClasses) {
+        connectorClasses.forEach(connectorClass -> {
+            String connectorName = connectorClass.getSimpleName();
+            connectCluster.restartConnectorAndTasks(connectorName, false, true, false);
+        });
+    }
+
/*
* wait for the topic created on the cluster
*/
@@ -689,7 +740,7 @@ public class MirrorConnectorsIntegrationBaseTest {
protected void produceMessages(EmbeddedConnectCluster cluster, String topicName) {
Map<String, String> recordSent = generateRecords(NUM_RECORDS_PRODUCED);
for (Map.Entry<String, String> entry : recordSent.entrySet()) {
- cluster.kafka().produce(topicName, entry.getKey(), entry.getValue());
+ produce(cluster.kafka(), topicName, null, entry.getKey(), entry.getValue());
}
}
@@ -700,16 +751,70 @@ public class MirrorConnectorsIntegrationBaseTest {
int cnt = 0;
for (int r = 0; r < NUM_RECORDS_PER_PARTITION; r++)
for (int p = 0; p < numPartitions; p++)
- cluster.kafka().produce(topicName, p, "key", "value-" + cnt++);
+ produce(cluster.kafka(), topicName, p, "key", "value-" + cnt++);
}
-
+
+ /**
+ * Produce a single test record to a Kafka cluster by delegating to the cluster's own produce method.
+ * This method is an extension point: subclasses can override it to configure and use their own
+ * Kafka Producer instead of using the built-in default.
+ * @param cluster Kafka cluster that should receive the record
+ * @param topic Topic to send the record to, non-null
+ * @param partition Partition to send the record to, may be null
+ * @param key Kafka key for the record
+ * @param value Kafka value for the record
+ */
+ protected void produce(EmbeddedKafkaCluster cluster, String topic, Integer partition, String key, String value) {
+ cluster.produce(topic, partition, key, value);
+ }
+
+    /**
+     * Poll the translated offsets of a consumer group until a checkpoint is present for every
+     * partition (0 .. NUM_PARTITIONS - 1) of the given topic, then return the offsets that were
+     * observed on the successful attempt.
+     * @param client MirrorClient used to look up the remote consumer offsets
+     * @param consumerGroupName consumer group whose translated offsets are awaited
+     * @param remoteClusterAlias alias of the cluster the offsets are translated from
+     * @param topicName topic that must have a checkpoint on each partition
+     * @return the full offset map returned by the client once all partitions were covered
+     * @throws InterruptedException if interrupted while waiting
+     */
+    protected static Map<TopicPartition, OffsetAndMetadata> waitForCheckpointOnAllPartitions(
+            MirrorClient client, String consumerGroupName, String remoteClusterAlias, String topicName
+    ) throws InterruptedException {
+        String timeoutMessage = String.format(
+                "Offsets for consumer group %s not translated from %s for topic %s",
+                consumerGroupName,
+                remoteClusterAlias,
+                topicName);
+        AtomicReference<Map<TopicPartition, OffsetAndMetadata>> translated = new AtomicReference<>();
+        waitForCondition(() -> {
+            Map<TopicPartition, OffsetAndMetadata> candidate = client.remoteConsumerOffsets(
+                    consumerGroupName, remoteClusterAlias, Duration.ofMillis(3000));
+            for (int partition = 0; partition < NUM_PARTITIONS; partition++) {
+                if (candidate.containsKey(new TopicPartition(topicName, partition))) {
+                    continue;
+                }
+                // Stop at the first gap; the next attempt re-reads the checkpoints from scratch.
+                log.info("Checkpoint is missing for {}: {}-{}", consumerGroupName, topicName, partition);
+                return false;
+            }
+            translated.set(candidate);
+            return true;
+        }, CHECKPOINT_DURATION_MS, timeoutMessage);
+        return translated.get();
+    }
+
/*
* given consumer group, topics and expected number of records, make sure the consumer group
* offsets are eventually synced to the expected offset numbers
*/
- protected static <T> void waitForConsumerGroupOffsetSync(EmbeddedConnectCluster connect,
- Consumer<T, T> consumer, List<String> topics, String consumerGroupId, int numRecords)
- throws InterruptedException {
+ protected static <T> void waitForConsumerGroupFullSync(
+ EmbeddedConnectCluster connect, List<String> topics, String consumerGroupId, int numRecords
+ ) throws InterruptedException {
+ int expectedRecords = numRecords * topics.size();
+ Map<String, Object> consumerProps = new HashMap<>();
+ consumerProps.put("isolation.level", "read_committed");
+ consumerProps.put("auto.offset.reset", "earliest");
+ Map<TopicPartition, Long> lastOffset = new HashMap<>();
+ try (Consumer<byte[], byte[]> consumer = connect.kafka().createConsumerAndSubscribeTo(consumerProps, topics.toArray(new String[0]))) {
+ final AtomicInteger totalConsumedRecords = new AtomicInteger(0);
+ waitForCondition(() -> {
+ ConsumerRecords<byte[], byte[]> records = consumer.poll(CONSUMER_POLL_TIMEOUT_MS);
+ records.forEach(record -> lastOffset.put(new TopicPartition(record.topic(), record.partition()), record.offset()));
+ return expectedRecords == totalConsumedRecords.addAndGet(records.count());
+ }, RECORD_CONSUME_DURATION_MS, "Consumer cannot consume all records in time");
+ }
try (Admin adminClient = connect.kafka().createAdminClient()) {
List<TopicPartition> tps = new ArrayList<>(NUM_PARTITIONS * topics.size());
for (int partitionIndex = 0; partitionIndex < NUM_PARTITIONS; partitionIndex++) {
@@ -717,23 +822,54 @@ public class MirrorConnectorsIntegrationBaseTest {
tps.add(new TopicPartition(topic, partitionIndex));
}
}
- long expectedTotalOffsets = numRecords * topics.size();
waitForCondition(() -> {
Map<TopicPartition, OffsetAndMetadata> consumerGroupOffsets =
- adminClient.listConsumerGroupOffsets(consumerGroupId).partitionsToOffsetAndMetadata().get();
- long consumerGroupOffsetTotal = consumerGroupOffsets.values().stream()
- .mapToLong(OffsetAndMetadata::offset).sum();
-
- Map<TopicPartition, Long> offsets = consumer.endOffsets(tps, CONSUMER_POLL_TIMEOUT_MS);
- long totalOffsets = offsets.values().stream().mapToLong(l -> l).sum();
-
- // make sure the consumer group offsets are synced to expected number
- return totalOffsets == expectedTotalOffsets && consumerGroupOffsetTotal > 0;
+ adminClient.listConsumerGroupOffsets(consumerGroupId).partitionsToOffsetAndMetadata().get();
+ Map<TopicPartition, OffsetSpec> endOffsetRequest = tps.stream()
+ .collect(Collectors.toMap(Function.identity(), ignored -> OffsetSpec.latest()));
+ Map<TopicPartition, ListOffsetsResult.ListOffsetsResultInfo> endOffsets =
+ adminClient.listOffsets(endOffsetRequest).all().get();
+
+ for (TopicPartition tp : tps) {
+ assertTrue(consumerGroupOffsets.containsKey(tp),
+ "TopicPartition " + tp + " does not have translated offsets");
+ assertTrue(consumerGroupOffsets.get(tp).offset() > lastOffset.get(tp),
+ "TopicPartition " + tp + " does not have fully-translated offsets");
+ assertTrue(consumerGroupOffsets.get(tp).offset() <= endOffsets.get(tp).offset(),
+ "TopicPartition " + tp + " has downstream offsets beyond the log end, this would lead to negative lag metrics");
+ }
+ return true;
}, OFFSET_SYNC_DURATION_MS, "Consumer group offset sync is not complete in time");
}
}
+    /**
+     * Read the given checkpoint topic from the earliest offset and assert that, for each consumer
+     * group and topic partition, the downstream offset of a checkpoint is never lower than that of
+     * the checkpoint read before it. Reading stops once the consumer reports no lag on partition 0
+     * of the checkpoint topic or CHECKPOINT_DURATION_MS has elapsed; not catching up in time fails
+     * the final assertion.
+     * @param cluster Connect cluster whose Kafka cluster hosts the checkpoint topic
+     * @param checkpointTopic name of the checkpoint topic to verify
+     */
+    protected static void assertMonotonicCheckpoints(EmbeddedConnectCluster cluster, String checkpointTopic) {
+        TopicPartition checkpointTopicPartition = new TopicPartition(checkpointTopic, 0);
+        Map<String, Object> readFromStart = Collections.singletonMap("auto.offset.reset", "earliest");
+        try (Consumer<byte[], byte[]> checkpointConsumer = cluster.kafka().createConsumerAndSubscribeTo(readFromStart, checkpointTopic)) {
+            Map<String, Map<TopicPartition, Checkpoint>> latestByGroup = new HashMap<>();
+            long deadline = System.currentTimeMillis() + CHECKPOINT_DURATION_MS;
+            boolean keepReading = true;
+            while (keepReading) {
+                for (ConsumerRecord<byte[], byte[]> record : checkpointConsumer.poll(Duration.ofSeconds(1L))) {
+                    Checkpoint current = Checkpoint.deserializeRecord(record);
+                    Map<TopicPartition, Checkpoint> latestForGroup = latestByGroup.computeIfAbsent(
+                            current.consumerGroupId(),
+                            ignored -> new HashMap<>());
+                    // The first checkpoint seen for a partition is compared against itself.
+                    Checkpoint previous = latestForGroup.getOrDefault(current.topicPartition(), current);
+                    assertTrue(current.downstreamOffset() >= previous.downstreamOffset(),
+                            "Checkpoint was non-monotonic for "
+                                    + current.consumerGroupId()
+                                    + ": "
+                                    + current.topicPartition());
+                    latestForGroup.put(current.topicPartition(), current);
+                }
+                // An unknown lag counts as "not caught up" (treated as a lag of 1).
+                keepReading = checkpointConsumer.currentLag(checkpointTopicPartition).orElse(1) > 0
+                        && System.currentTimeMillis() < deadline;
+            }
+            assertEquals(0, checkpointConsumer.currentLag(checkpointTopicPartition).orElse(1), "Unable to read all checkpoints within " + CHECKPOINT_DURATION_MS + "ms");
+        }
+    }
+
/*
* make sure the consumer to consume expected number of records
*/
@@ -756,7 +892,7 @@ public class MirrorConnectorsIntegrationBaseTest {
mm2Props.put("max.tasks", "10");
mm2Props.put("groups", "consumer-group-.*");
mm2Props.put("sync.topic.acls.enabled", "false");
- mm2Props.put("emit.checkpoints.interval.seconds", "1");
+ mm2Props.put("emit.checkpoints.interval.seconds", String.valueOf(CHECKPOINT_INTERVAL_DURATION_MS / 1000));
mm2Props.put("emit.heartbeats.interval.seconds", "1");
mm2Props.put("refresh.topics.interval.seconds", "1");
mm2Props.put("refresh.groups.interval.seconds", "1");
@@ -767,7 +903,10 @@ public class MirrorConnectorsIntegrationBaseTest {
mm2Props.put("offset.storage.replication.factor", "1");
mm2Props.put("status.storage.replication.factor", "1");
mm2Props.put("replication.factor", "1");
-
+ // Sync offsets as soon as possible to ensure the final record in a finite test has its offset translated.
+ mm2Props.put("offset.lag.max", "0");
+ mm2Props.put(PRIMARY_CLUSTER_ALIAS + ".offset.flush.interval.ms", "5000");
+ mm2Props.put(BACKUP_CLUSTER_ALIAS + ".offset.flush.interval.ms", "5000");
return mm2Props;
}
@@ -798,14 +937,14 @@ public class MirrorConnectorsIntegrationBaseTest {
* Generate some consumer activity on both clusters to ensure the checkpoint connector always starts promptly
*/
protected void warmUpConsumer(Map<String, Object> consumerProps) {
- Consumer<byte[], byte[]> dummyConsumer = primary.kafka().createConsumerAndSubscribeTo(consumerProps, "test-topic-1");
- dummyConsumer.poll(CONSUMER_POLL_TIMEOUT_MS);
- dummyConsumer.commitSync();
- dummyConsumer.close();
- dummyConsumer = backup.kafka().createConsumerAndSubscribeTo(consumerProps, "test-topic-1");
- dummyConsumer.poll(CONSUMER_POLL_TIMEOUT_MS);
- dummyConsumer.commitSync();
- dummyConsumer.close();
+ try (Consumer<byte[], byte[]> dummyConsumer = primary.kafka().createConsumerAndSubscribeTo(consumerProps, "test-topic-1")) {
+ dummyConsumer.poll(CONSUMER_POLL_TIMEOUT_MS);
+ dummyConsumer.commitSync();
+ }
+ try (Consumer<byte[], byte[]> dummyConsumer = backup.kafka().createConsumerAndSubscribeTo(consumerProps, "test-topic-1")) {
+ dummyConsumer.poll(CONSUMER_POLL_TIMEOUT_MS);
+ dummyConsumer.commitSync();
+ }
}
/*
diff --git a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationTransactionsTest.java b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationTransactionsTest.java
new file mode 100644
index 00000000000..6ac09ef32bb
--- /dev/null
+++ b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationTransactionsTest.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.mirror.integration;
+
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.connect.util.clusters.EmbeddedKafkaCluster;
+import org.junit.jupiter.api.BeforeEach;
+
+import java.nio.charset.StandardCharsets;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+
+/**
+ * Integration test for MirrorMaker2 in which source records are emitted with a transactional producer,
+ * which interleaves transaction commit messages into the source topic which are not propagated downstream.
+ */
+public class MirrorConnectorsIntegrationTransactionsTest extends MirrorConnectorsIntegrationBaseTest {
+
+    // Overrides handed to every short-lived producer created by produce(); populated in startClusters().
+    private final Map<String, Object> producerProps = new HashMap<>();
+
+    /**
+     * Sets the transaction state log replication factor and min ISR to 1 on the brokers of both
+     * clusters, enables idempotence and a transactional id for the test producers, and then runs
+     * the base class startup.
+     */
+    @BeforeEach
+    @Override
+    public void startClusters() throws Exception {
+        primaryBrokerProps.put("transaction.state.log.replication.factor", "1");
+        backupBrokerProps.put("transaction.state.log.replication.factor", "1");
+        primaryBrokerProps.put("transaction.state.log.min.isr", "1");
+        backupBrokerProps.put("transaction.state.log.min.isr", "1");
+        producerProps.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
+        producerProps.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "embedded-kafka-0");
+        super.startClusters();
+    }
+
+    /**
+     * Produce records with a short-lived transactional producer to interleave transaction markers in the topic.
+     * Each call creates a producer, sends the single record inside its own transaction, and closes the producer.
+     * @param cluster Kafka cluster that should receive the record
+     * @param topic Topic to send the record to
+     * @param partition Partition to send the record to, may be null
+     * @param key Kafka key for the record, may be null
+     * @param value Kafka value for the record, may be null
+     * @throws KafkaException if the record could not be produced; the original failure is kept as the cause
+     */
+    @Override
+    protected void produce(EmbeddedKafkaCluster cluster, String topic, Integer partition, String key, String value) {
+        // Encode with an explicit charset so the produced bytes do not depend on the platform default.
+        ProducerRecord<byte[], byte[]> msg = new ProducerRecord<>(
+                topic,
+                partition,
+                key == null ? null : key.getBytes(StandardCharsets.UTF_8),
+                value == null ? null : value.getBytes(StandardCharsets.UTF_8));
+        try (Producer<byte[], byte[]> producer = cluster.createProducer(producerProps)) {
+            producer.initTransactions();
+            producer.beginTransaction();
+            producer.send(msg).get(120000, TimeUnit.MILLISECONDS);
+            producer.commitTransaction();
+        } catch (InterruptedException e) {
+            // Restore the interrupt flag so callers further up can still observe the interruption.
+            Thread.currentThread().interrupt();
+            throw new KafkaException("Could not produce message: " + msg, e);
+        } catch (Exception e) {
+            throw new KafkaException("Could not produce message: " + msg, e);
+        }
+    }
+}
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java b/connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java
index 899b42dd877..431ae871ce9 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java
@@ -257,8 +257,16 @@ public class KafkaBasedLog<K, V> {
" allotted period. This could indicate a connectivity issue, unavailable topic partitions, or if" +
" this is your first use of the topic it may have taken too long to create.");
- for (PartitionInfo partition : partitionInfos)
- partitions.add(new TopicPartition(partition.topic(), partition.partition()));
+ for (PartitionInfo partition : partitionInfos) {
+ TopicPartition topicPartition = new TopicPartition(partition.topic(), partition.partition());
+ if (readPartition(topicPartition)) {
+ partitions.add(topicPartition);
+ }
+ }
+ if (partitions.isEmpty()) {
+ throw new ConnectException("Some partitions for " + topic + " exist, but no partitions matched the " +
+ "required filter.");
+ }
partitionCount = partitions.size();
consumer.assign(partitions);
@@ -392,6 +400,18 @@ public class KafkaBasedLog<K, V> {
return new KafkaConsumer<>(consumerConfigs);
}
+ /**
+ * Signals whether a topic partition should be read by this log. Invoked on {@link #start() startup} once
+ * for every partition found in the log's backing topic.
+ * <p>This method can be overridden by subclasses when only a subset of the assigned partitions
+ * should be read into memory. By default, all partitions are read.
+ * <p>If this returns false for every partition found, no partitions remain to be read and startup
+ * fails with a ConnectException.
+ * @param topicPartition A topic partition which could be read by this log.
+ * @return true if the partition should be read by this log, false if its contents should be ignored.
+ */
+ protected boolean readPartition(TopicPartition topicPartition) {
+ return true;
+ }
+
private void poll(long timeoutMs) {
try {
ConsumerRecords<K, V> records = consumer.poll(Duration.ofMillis(timeoutMs));
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/util/TopicAdmin.java b/connect/runtime/src/main/java/org/apache/kafka/connect/util/TopicAdmin.java
index f9defc77ca2..691852430a0 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/util/TopicAdmin.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/util/TopicAdmin.java
@@ -282,8 +282,7 @@ public class TopicAdmin implements AutoCloseable {
this(adminConfig.get(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG), Admin.create(adminConfig));
}
- // visible for testing
- TopicAdmin(Object bootstrapServers, Admin adminClient) {
+ public TopicAdmin(Object bootstrapServers, Admin adminClient) {
this(bootstrapServers, adminClient, true);
}