Posted to commits@kafka.apache.org by ce...@apache.org on 2023/05/01 16:42:40 UTC
[kafka] 03/03: KAFKA-14666: Add MM2 in-memory offset translation index for offsets behind replication (#13429)
This is an automated email from the ASF dual-hosted git repository.
cegerton pushed a commit to branch 3.4
in repository https://gitbox.apache.org/repos/asf/kafka.git
commit 402cce07961dbbe1ac68f2e28497d28e9d4e5edf
Author: Greg Harris <gr...@aiven.io>
AuthorDate: Wed Apr 26 00:30:13 2023 -0700
KAFKA-14666: Add MM2 in-memory offset translation index for offsets behind replication (#13429)
Reviewers: Daniel Urban <du...@cloudera.com>, Chris Egerton <ch...@aiven.io>
---
.../kafka/connect/mirror/MirrorCheckpointTask.java | 57 ++++--
.../kafka/connect/mirror/OffsetSyncStore.java | 192 ++++++++++++++++++++-
.../connect/mirror/MirrorCheckpointTaskTest.java | 70 ++++++--
.../kafka/connect/mirror/OffsetSyncStoreTest.java | 156 +++++++++++++++--
.../MirrorConnectorsIntegrationBaseTest.java | 88 +++++++++-
5 files changed, 514 insertions(+), 49 deletions(-)
diff --git a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointTask.java b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointTask.java
index 72248fd9f6a..150a55abf51 100644
--- a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointTask.java
+++ b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointTask.java
@@ -42,6 +42,7 @@ import java.util.Optional;
import java.util.OptionalLong;
import java.util.Set;
import java.util.Collections;
+import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.concurrent.ExecutionException;
import java.time.Duration;
@@ -67,20 +68,21 @@ public class MirrorCheckpointTask extends SourceTask {
private MirrorCheckpointMetrics metrics;
private Scheduler scheduler;
private Map<String, Map<TopicPartition, OffsetAndMetadata>> idleConsumerGroupsOffset;
- private Map<String, List<Checkpoint>> checkpointsPerConsumerGroup;
+ private Map<String, Map<TopicPartition, Checkpoint>> checkpointsPerConsumerGroup;
public MirrorCheckpointTask() {}
// for testing
MirrorCheckpointTask(String sourceClusterAlias, String targetClusterAlias,
ReplicationPolicy replicationPolicy, OffsetSyncStore offsetSyncStore,
Map<String, Map<TopicPartition, OffsetAndMetadata>> idleConsumerGroupsOffset,
- Map<String, List<Checkpoint>> checkpointsPerConsumerGroup) {
+ Map<String, Map<TopicPartition, Checkpoint>> checkpointsPerConsumerGroup) {
this.sourceClusterAlias = sourceClusterAlias;
this.targetClusterAlias = targetClusterAlias;
this.replicationPolicy = replicationPolicy;
this.offsetSyncStore = offsetSyncStore;
this.idleConsumerGroupsOffset = idleConsumerGroupsOffset;
this.checkpointsPerConsumerGroup = checkpointsPerConsumerGroup;
+ this.topicFilter = topic -> true;
}
@Override
@@ -165,9 +167,11 @@ public class MirrorCheckpointTask extends SourceTask {
private List<SourceRecord> sourceRecordsForGroup(String group) throws InterruptedException {
try {
long timestamp = System.currentTimeMillis();
- List<Checkpoint> checkpoints = checkpointsForGroup(group);
- checkpointsPerConsumerGroup.put(group, checkpoints);
- return checkpoints.stream()
+ Map<TopicPartition, OffsetAndMetadata> upstreamGroupOffsets = listConsumerGroupOffsets(group);
+ Map<TopicPartition, Checkpoint> newCheckpoints = checkpointsForGroup(upstreamGroupOffsets, group);
+ Map<TopicPartition, Checkpoint> oldCheckpoints = checkpointsPerConsumerGroup.computeIfAbsent(group, ignored -> new HashMap<>());
+ oldCheckpoints.putAll(newCheckpoints);
+ return newCheckpoints.values().stream()
.map(x -> checkpointRecord(x, timestamp))
.collect(Collectors.toList());
} catch (ExecutionException e) {
@@ -176,13 +180,44 @@ public class MirrorCheckpointTask extends SourceTask {
}
}
- private List<Checkpoint> checkpointsForGroup(String group) throws ExecutionException, InterruptedException {
- return listConsumerGroupOffsets(group).entrySet().stream()
+ // for testing
+ Map<TopicPartition, Checkpoint> checkpointsForGroup(Map<TopicPartition, OffsetAndMetadata> upstreamGroupOffsets, String group) {
+ return upstreamGroupOffsets.entrySet().stream()
.filter(x -> shouldCheckpointTopic(x.getKey().topic())) // Only perform relevant checkpoints filtered by "topic filter"
.map(x -> checkpoint(group, x.getKey(), x.getValue()))
.flatMap(o -> o.map(Stream::of).orElseGet(Stream::empty)) // do not emit checkpoints for partitions that don't have offset-syncs
.filter(x -> x.downstreamOffset() >= 0) // ignore offsets we cannot translate accurately
- .collect(Collectors.toList());
+ .filter(this::checkpointIsMoreRecent) // do not emit checkpoints for partitions that have a later checkpoint
+ .collect(Collectors.toMap(Checkpoint::topicPartition, Function.identity()));
+ }
+
+ private boolean checkpointIsMoreRecent(Checkpoint checkpoint) {
+ Map<TopicPartition, Checkpoint> checkpoints = checkpointsPerConsumerGroup.get(checkpoint.consumerGroupId());
+ if (checkpoints == null) {
+ log.trace("Emitting {} (first for this group)", checkpoint);
+ return true;
+ }
+ Checkpoint lastCheckpoint = checkpoints.get(checkpoint.topicPartition());
+ if (lastCheckpoint == null) {
+ log.trace("Emitting {} (first for this partition)", checkpoint);
+ return true;
+ }
+ // Emit sync after a rewind of the upstream consumer group takes place (checkpoints can be non-monotonic)
+ if (checkpoint.upstreamOffset() < lastCheckpoint.upstreamOffset()) {
+ log.trace("Emitting {} (upstream offset rewind)", checkpoint);
+ return true;
+ }
+ // Or if the downstream offset is newer (force checkpoints to be monotonic)
+ if (checkpoint.downstreamOffset() > lastCheckpoint.downstreamOffset()) {
+ log.trace("Emitting {} (downstream offset advanced)", checkpoint);
+ return true;
+ }
+ if (checkpoint.downstreamOffset() != lastCheckpoint.downstreamOffset()) {
+ log.trace("Skipping {} (preventing downstream rewind)", checkpoint);
+ } else {
+ log.trace("Skipping {} (repeated checkpoint)", checkpoint);
+ }
+ return false;
}
private Map<TopicPartition, OffsetAndMetadata> listConsumerGroupOffsets(String group)
@@ -199,7 +234,7 @@ public class MirrorCheckpointTask extends SourceTask {
if (offsetAndMetadata != null) {
long upstreamOffset = offsetAndMetadata.offset();
OptionalLong downstreamOffset =
- offsetSyncStore.translateDownstream(topicPartition, upstreamOffset);
+ offsetSyncStore.translateDownstream(group, topicPartition, upstreamOffset);
if (downstreamOffset.isPresent()) {
return Optional.of(new Checkpoint(group, renameTopicPartition(topicPartition),
upstreamOffset, downstreamOffset.getAsLong(), offsetAndMetadata.metadata()));
@@ -334,10 +369,10 @@ public class MirrorCheckpointTask extends SourceTask {
Map<String, Map<TopicPartition, OffsetAndMetadata>> getConvertedUpstreamOffset() {
Map<String, Map<TopicPartition, OffsetAndMetadata>> result = new HashMap<>();
- for (Entry<String, List<Checkpoint>> entry : checkpointsPerConsumerGroup.entrySet()) {
+ for (Entry<String, Map<TopicPartition, Checkpoint>> entry : checkpointsPerConsumerGroup.entrySet()) {
String consumerId = entry.getKey();
Map<TopicPartition, OffsetAndMetadata> convertedUpstreamOffset = new HashMap<>();
- for (Checkpoint checkpoint : entry.getValue()) {
+ for (Checkpoint checkpoint : entry.getValue().values()) {
convertedUpstreamOffset.put(checkpoint.topicPartition(), checkpoint.offsetAndMetadata());
}
result.put(consumerId, convertedUpstreamOffset);
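The new checkpointIsMoreRecent filter above only lets a checkpoint through when it would not rewind the downstream offset, unless the upstream consumer group itself rewound. As a rough illustration only (hypothetical standalone names, not part of this patch; the null checks for a group's or partition's first checkpoint are omitted), the emit/skip decision reduces to:

    // Sketch of the monotonic-checkpoint filter, reduced to plain longs.
    public class CheckpointFilterSketch {

        // Returns true if a new (upstream, downstream) pair should be emitted,
        // given the last pair emitted for the same topic-partition and group.
        static boolean shouldEmit(long lastUpstream, long lastDownstream,
                                  long newUpstream, long newDownstream) {
            if (newUpstream < lastUpstream) {
                return true;   // upstream group rewound: checkpoints may legitimately go backwards
            }
            if (newDownstream > lastDownstream) {
                return true;   // downstream offset advanced: keep checkpoints monotonic
            }
            return false;      // same or older translation: skip to avoid a downstream rewind
        }

        public static void main(String[] args) {
            // Last emitted checkpoint translated upstream 12 -> downstream 5.
            System.out.println(shouldEmit(12, 5, 17, 5));  // false: translation fell back, would repeat/rewind downstream
            System.out.println(shouldEmit(12, 5, 17, 9));  // true: downstream offset advanced
            System.out.println(shouldEmit(12, 5, 11, 4));  // true: upstream group rewound
        }
    }

In the task itself the same comparison is made against the last checkpoint stored per topic-partition in checkpointsPerConsumerGroup, which is why that field changes from a List to a Map keyed by TopicPartition.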
diff --git a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/OffsetSyncStore.java b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/OffsetSyncStore.java
index 0169446aa04..9222d314a43 100644
--- a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/OffsetSyncStore.java
+++ b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/OffsetSyncStore.java
@@ -25,17 +25,46 @@ import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.connect.util.KafkaBasedLog;
import org.apache.kafka.connect.util.TopicAdmin;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import java.util.Arrays;
import java.util.Collections;
import java.util.Map;
import java.util.Optional;
import java.util.OptionalLong;
import java.util.concurrent.ConcurrentHashMap;
-/** Used internally by MirrorMaker. Stores offset syncs and performs offset translation. */
+/**
+ * Used internally by MirrorMaker. Stores offset syncs and performs offset translation.
+ * <p>A limited number of offset syncs can be stored per TopicPartition, in a way which provides better translation
+ * later in the topic, closer to the live end of the topic.
+ * This maintains the following invariants for each topic-partition in the in-memory sync storage:
+ * <ul>
+ * <li>Invariant A: syncs[0] is the latest offset sync from the syncs topic</li>
+ * <li>Invariant B: For each i,j, i < j, syncs[i] != syncs[j]: syncs[i].upstream <= syncs[j].upstream + 2^j - 2^i</li>
+ * <li>Invariant C: For each i,j, i < j, syncs[i] != syncs[j]: syncs[i].upstream >= syncs[j].upstream + 2^(i-2)</li>
+ * <li>Invariant D: syncs[63] is the earliest offset sync from the syncs topic usable for translation</li>
+ * </ul>
+ * <p>The above invariants ensure that the store is kept updated upon receipt of each sync, and that distinct
+ * offset syncs are separated by approximately exponential space. They can be checked locally (by comparing all adjacent
+ * indexes) but hold globally (for all pairs of any distance). This allows updates to the store in linear time.
+ * <p>Offset translation uses the syncs[i] which most closely precedes the upstream consumer group's current offset.
+ * For a fixed in-memory state, translation of variable upstream offsets will be monotonic.
+ * For variable in-memory state, translation of a fixed upstream offset will not be monotonic.
+ * <p>Translation will be unavailable for all topic-partitions before an initial read-to-end of the offset syncs topic
+ * is complete. Translation will be unavailable after that if no syncs are present for a topic-partition, if replication
+ * started after the position of the consumer group, or if relevant offset syncs for the topic were potentially used
+ * for translation in an earlier generation of the sync store.
+ */
class OffsetSyncStore implements AutoCloseable {
+
+ private static final Logger log = LoggerFactory.getLogger(OffsetSyncStore.class);
+ // Store one offset sync for each bit of the topic offset.
+ // Visible for testing
+ static final int SYNCS_PER_PARTITION = Long.SIZE;
private final KafkaBasedLog<byte[], byte[]> backingStore;
- private final Map<TopicPartition, OffsetSync> offsetSyncs = new ConcurrentHashMap<>();
+ private final Map<TopicPartition, OffsetSync[]> offsetSyncs = new ConcurrentHashMap<>();
private final TopicAdmin admin;
protected volatile boolean readToEnd = false;
@@ -99,16 +128,21 @@ class OffsetSyncStore implements AutoCloseable {
readToEnd = true;
}
- OptionalLong translateDownstream(TopicPartition sourceTopicPartition, long upstreamOffset) {
+ OptionalLong translateDownstream(String group, TopicPartition sourceTopicPartition, long upstreamOffset) {
if (!readToEnd) {
// If we have not read to the end of the syncs topic at least once, decline to translate any offsets.
// This prevents emitting stale offsets while initially reading the offset syncs topic.
+ log.debug("translateDownstream({},{},{}): Skipped (initial offset syncs read still in progress)",
+ group, sourceTopicPartition, upstreamOffset);
return OptionalLong.empty();
}
- Optional<OffsetSync> offsetSync = latestOffsetSync(sourceTopicPartition);
+ Optional<OffsetSync> offsetSync = latestOffsetSync(sourceTopicPartition, upstreamOffset);
if (offsetSync.isPresent()) {
if (offsetSync.get().upstreamOffset() > upstreamOffset) {
// Offset is too far in the past to translate accurately
+ log.debug("translateDownstream({},{},{}): Skipped ({} is ahead of upstream consumer group {})",
+ group, sourceTopicPartition, upstreamOffset,
+ offsetSync.get(), upstreamOffset);
return OptionalLong.of(-1L);
}
// If the consumer group is ahead of the offset sync, we can translate the upstream offset only 1
@@ -124,8 +158,15 @@ class OffsetSyncStore implements AutoCloseable {
// vv
// target |-sg----r-----|
long upstreamStep = upstreamOffset == offsetSync.get().upstreamOffset() ? 0 : 1;
+ log.debug("translateDownstream({},{},{}): Translated {} (relative to {})",
+ group, sourceTopicPartition, upstreamOffset,
+ offsetSync.get().downstreamOffset() + upstreamStep,
+ offsetSync.get()
+ );
return OptionalLong.of(offsetSync.get().downstreamOffset() + upstreamStep);
} else {
+ log.debug("translateDownstream({},{},{}): Skipped (offset sync not found)",
+ group, sourceTopicPartition, upstreamOffset);
return OptionalLong.empty();
}
}
@@ -139,10 +180,147 @@ class OffsetSyncStore implements AutoCloseable {
protected void handleRecord(ConsumerRecord<byte[], byte[]> record) {
OffsetSync offsetSync = OffsetSync.deserializeRecord(record);
TopicPartition sourceTopicPartition = offsetSync.topicPartition();
- offsetSyncs.put(sourceTopicPartition, offsetSync);
+ offsetSyncs.compute(sourceTopicPartition, (ignored, syncs) ->
+ syncs == null ? createInitialSyncs(offsetSync) : updateExistingSyncs(syncs, offsetSync)
+ );
+ }
+
+ private OffsetSync[] updateExistingSyncs(OffsetSync[] syncs, OffsetSync offsetSync) {
+ // Make a copy of the array before mutating it, so that readers do not see inconsistent data
+ // TODO: consider batching updates so that this copy can be performed less often for high-volume sync topics.
+ OffsetSync[] mutableSyncs = Arrays.copyOf(syncs, SYNCS_PER_PARTITION);
+ updateSyncArray(mutableSyncs, offsetSync);
+ if (log.isTraceEnabled()) {
+ log.trace("New sync {} applied, new state is {}", offsetSync, offsetArrayToString(mutableSyncs));
+ }
+ return mutableSyncs;
+ }
+
+ private String offsetArrayToString(OffsetSync[] syncs) {
+ StringBuilder stateString = new StringBuilder();
+ stateString.append("[");
+ for (int i = 0; i < SYNCS_PER_PARTITION; i++) {
+ if (i == 0 || syncs[i] != syncs[i - 1]) {
+ if (i != 0) {
+ stateString.append(",");
+ }
+ // Print only if the sync is interesting; a series of repeated syncs will be elided
+ stateString.append(syncs[i].upstreamOffset());
+ stateString.append(":");
+ stateString.append(syncs[i].downstreamOffset());
+ }
+ }
+ stateString.append("]");
+ return stateString.toString();
+ }
+
+ private OffsetSync[] createInitialSyncs(OffsetSync firstSync) {
+ OffsetSync[] syncs = new OffsetSync[SYNCS_PER_PARTITION];
+ clearSyncArray(syncs, firstSync);
+ return syncs;
+ }
+
+ private void clearSyncArray(OffsetSync[] syncs, OffsetSync offsetSync) {
+ // If every element of the store is the same, then it satisfies invariants B and C trivially.
+ for (int i = 0; i < SYNCS_PER_PARTITION; i++) {
+ syncs[i] = offsetSync;
+ }
+ }
+
+ private void updateSyncArray(OffsetSync[] syncs, OffsetSync offsetSync) {
+ long upstreamOffset = offsetSync.upstreamOffset();
+ // While reading to the end of the topic, ensure that our earliest sync is later than
+ // any earlier sync that could have been used for translation, to preserve monotonicity
+ // If the upstream offset rewinds, all previous offsets are invalid, so overwrite them all.
+ if (!readToEnd || syncs[0].upstreamOffset() > upstreamOffset) {
+ clearSyncArray(syncs, offsetSync);
+ return;
+ }
+ OffsetSync replacement = offsetSync;
+ // The most-recently-discarded offset sync
+ // We track this since it may still be eligible for use in the syncs array at a later index
+ OffsetSync oldValue = syncs[0];
+ // Invariant A is always violated once a new sync appears.
+ // Repair Invariant A: the latest sync must always be updated
+ syncs[0] = replacement;
+ for (int current = 1; current < SYNCS_PER_PARTITION; current++) {
+ int previous = current - 1;
+
+ // We can potentially use oldValue instead of replacement, allowing us to keep more distinct values stored
+ // If oldValue is not recent, it should be expired from the store
+ boolean isRecent = invariantB(syncs[previous], oldValue, previous, current);
+ // Ensure that this value is sufficiently separated from the previous value
+ // We prefer to keep more recent syncs of similar precision (i.e. the value in replacement)
+ boolean separatedFromPrevious = invariantC(syncs[previous], oldValue, previous);
+ // Ensure that this value is sufficiently separated from the next value
+ // We prefer to keep existing syncs of lower precision (i.e. the value in syncs[next])
+ int next = current + 1;
+ boolean separatedFromNext = next >= SYNCS_PER_PARTITION || invariantC(oldValue, syncs[next], current);
+ // If this condition is false, oldValue will be expired from the store and lost forever.
+ if (isRecent && separatedFromPrevious && separatedFromNext) {
+ replacement = oldValue;
+ }
+
+ // The replacement variable always contains a value which satisfies the invariants for this index.
+ // This replacement may or may not be used, since the invariants could already be satisfied,
+ // and in that case, prefer to keep the existing tail of the syncs array rather than updating it.
+ assert invariantB(syncs[previous], replacement, previous, current);
+ assert invariantC(syncs[previous], replacement, previous);
+
+ // Test if changes to the previous index affected the invariant for this index
+ if (invariantB(syncs[previous], syncs[current], previous, current)) {
+ // Invariant B holds for syncs[current]: it must also hold for all later values
+ break;
+ } else {
+ // Invariant B violated for syncs[current]: sync is now too old and must be updated
+ // Repair Invariant B: swap in replacement, and save the old value for the next iteration
+ oldValue = syncs[current];
+ syncs[current] = replacement;
+
+ assert invariantB(syncs[previous], syncs[current], previous, current);
+ assert invariantC(syncs[previous], syncs[current], previous);
+ }
+ }
+ }
+
+ private boolean invariantB(OffsetSync iSync, OffsetSync jSync, int i, int j) {
+ long bound = jSync.upstreamOffset() + (1L << j) - (1L << i);
+ return iSync == jSync || bound < 0 || iSync.upstreamOffset() <= bound;
+ }
+
+ private boolean invariantC(OffsetSync iSync, OffsetSync jSync, int i) {
+ long bound = jSync.upstreamOffset() + (1L << Math.max(i - 2, 0));
+ return iSync == jSync || (bound >= 0 && iSync.upstreamOffset() >= bound);
+ }
+
+ private Optional<OffsetSync> latestOffsetSync(TopicPartition topicPartition, long upstreamOffset) {
+ return Optional.ofNullable(offsetSyncs.get(topicPartition))
+ .map(syncs -> lookupLatestSync(syncs, upstreamOffset));
+ }
+
+
+ private OffsetSync lookupLatestSync(OffsetSync[] syncs, long upstreamOffset) {
+ // linear search the syncs, effectively a binary search over the topic offsets
+ // Search from latest to earliest to find the sync that gives the best accuracy
+ for (int i = 0; i < SYNCS_PER_PARTITION; i++) {
+ OffsetSync offsetSync = syncs[i];
+ if (offsetSync.upstreamOffset() <= upstreamOffset) {
+ return offsetSync;
+ }
+ }
+ return syncs[SYNCS_PER_PARTITION - 1];
}
- private Optional<OffsetSync> latestOffsetSync(TopicPartition topicPartition) {
- return Optional.ofNullable(offsetSyncs.get(topicPartition));
+ // For testing
+ OffsetSync syncFor(TopicPartition topicPartition, int syncIdx) {
+ OffsetSync[] syncs = offsetSyncs.get(topicPartition);
+ if (syncs == null)
+ throw new IllegalArgumentException("No syncs present for " + topicPartition);
+ if (syncIdx >= syncs.length)
+ throw new IllegalArgumentException(
+ "Requested sync " + (syncIdx + 1) + " for " + topicPartition
+ + " but there are only " + syncs.length + " syncs available for that topic partition"
+ );
+ return syncs[syncIdx];
}
}
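The OffsetSyncStore javadoc above describes the two behaviours the new index relies on: translation picks the stored sync that most closely precedes the upstream offset, and invariants B and C keep distinct stored syncs roughly exponentially spaced (for example, invariant C forces the sync kept at index 6 to sit at least 2^(6-2) = 16 upstream offsets above the next older distinct sync). A minimal sketch of both, with hypothetical class and field names that are not part of this patch, and with the equality and overflow guards of the real invariant checks omitted:

    import java.util.OptionalLong;

    // Sketch of a fixed-size, exponentially spaced sync index per topic-partition.
    public class SparseSyncIndexSketch {
        static final int SYNCS_PER_PARTITION = Long.SIZE;   // one slot per bit of the offset, as in the patch

        // upstream[i] / downstream[i] stand in for syncs[i]; index 0 holds the latest sync.
        // Assumes every slot is populated (the real store seeds all slots with the first sync it sees).
        final long[] upstream = new long[SYNCS_PER_PARTITION];
        final long[] downstream = new long[SYNCS_PER_PARTITION];

        // Collapses translateDownstream() and lookupLatestSync() into one method:
        // scan from the latest sync to the earliest and use the first one at or below the offset.
        OptionalLong translate(long upstreamOffset) {
            for (int i = 0; i < SYNCS_PER_PARTITION; i++) {
                if (upstream[i] <= upstreamOffset) {
                    // An exact hit translates exactly; any offset past the sync can only be
                    // translated one offset beyond the sync's downstream offset.
                    long step = upstreamOffset == upstream[i] ? 0 : 1;
                    return OptionalLong.of(downstream[i] + step);
                }
            }
            return OptionalLong.of(-1L);   // older than every stored sync: not translatable accurately
        }

        // Invariant B: the sync at an earlier index i may lead the sync at a later index j
        // by at most 2^j - 2^i upstream offsets.
        static boolean invariantB(long iUpstream, long jUpstream, int i, int j) {
            return iUpstream <= jUpstream + (1L << j) - (1L << i);
        }

        // Invariant C: distinct syncs at indexes i < j must be separated by at least
        // 2^max(i-2, 0) upstream offsets, so stored syncs spread out roughly exponentially.
        static boolean invariantC(long iUpstream, long jUpstream, int i) {
            return iUpstream >= jUpstream + (1L << Math.max(i - 2, 0));
        }
    }

With one slot per bit of the offset, even a partition whose upstream offsets span the full range of a long fits in the 64-entry array, and each translation is a bounded scan of at most 64 entries.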
diff --git a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorCheckpointTaskTest.java b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorCheckpointTaskTest.java
index 500cb6c131a..f9bc7bc76cb 100644
--- a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorCheckpointTaskTest.java
+++ b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorCheckpointTaskTest.java
@@ -16,12 +16,11 @@
*/
package org.apache.kafka.connect.mirror;
-import java.util.ArrayList;
import java.util.HashMap;
-import java.util.List;
import java.util.Map;
import java.util.Collections;
import java.util.Optional;
+import java.util.OptionalLong;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
@@ -31,6 +30,7 @@ import org.junit.jupiter.api.Test;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
public class MirrorCheckpointTaskTest {
@@ -118,7 +118,7 @@ public class MirrorCheckpointTaskTest {
@Test
public void testSyncOffset() {
Map<String, Map<TopicPartition, OffsetAndMetadata>> idleConsumerGroupsOffset = new HashMap<>();
- Map<String, List<Checkpoint>> checkpointsPerConsumerGroup = new HashMap<>();
+ Map<String, Map<TopicPartition, Checkpoint>> checkpointsPerConsumerGroup = new HashMap<>();
String consumer1 = "consumer1";
String consumer2 = "consumer2";
@@ -147,16 +147,16 @@ public class MirrorCheckpointTaskTest {
// 'cpC2T2p0' denotes 'checkpoint' of topic2, partition 0 for consumer2
Checkpoint cpC2T2P0 = new Checkpoint(consumer2, new TopicPartition(topic2, 0), 100, 51, "metadata");
- // 'checkpointListC1' denotes 'checkpoint' list for consumer1
- List<Checkpoint> checkpointListC1 = new ArrayList<>();
- checkpointListC1.add(cpC1T1P0);
+ // 'checkpointMapC1' denotes 'checkpoint' map for consumer1
+ Map<TopicPartition, Checkpoint> checkpointMapC1 = new HashMap<>();
+ checkpointMapC1.put(cpC1T1P0.topicPartition(), cpC1T1P0);
- // 'checkpointListC2' denotes 'checkpoint' list for consumer2
- List<Checkpoint> checkpointListC2 = new ArrayList<>();
- checkpointListC2.add(cpC2T2P0);
+ // 'checkpointMapC2' denotes 'checkpoint' map for consumer2
+ Map<TopicPartition, Checkpoint> checkpointMapC2 = new HashMap<>();
+ checkpointMapC2.put(cpC2T2P0.topicPartition(), cpC2T2P0);
- checkpointsPerConsumerGroup.put(consumer1, checkpointListC1);
- checkpointsPerConsumerGroup.put(consumer2, checkpointListC2);
+ checkpointsPerConsumerGroup.put(consumer1, checkpointMapC1);
+ checkpointsPerConsumerGroup.put(consumer2, checkpointMapC2);
MirrorCheckpointTask mirrorCheckpointTask = new MirrorCheckpointTask("source1", "target2",
new DefaultReplicationPolicy(), null, idleConsumerGroupsOffset, checkpointsPerConsumerGroup);
@@ -195,4 +195,52 @@ public class MirrorCheckpointTaskTest {
Optional<Checkpoint> checkpoint = mirrorCheckpointTask.checkpoint("g1", new TopicPartition("topic1", 0), null);
assertFalse(checkpoint.isPresent());
}
+
+ @Test
+ public void testCheckpointRecordsMonotonicIfStoreRewinds() {
+ OffsetSyncStoreTest.FakeOffsetSyncStore offsetSyncStore = new OffsetSyncStoreTest.FakeOffsetSyncStore();
+ offsetSyncStore.start();
+ Map<String, Map<TopicPartition, Checkpoint>> checkpointsPerConsumerGroup = new HashMap<>();
+ MirrorCheckpointTask mirrorCheckpointTask = new MirrorCheckpointTask("source1", "target2",
+ new DefaultReplicationPolicy(), offsetSyncStore, Collections.emptyMap(), checkpointsPerConsumerGroup);
+ TopicPartition tp = new TopicPartition("topic1", 0);
+ TopicPartition targetTP = new TopicPartition("source1.topic1", 0);
+
+ long upstream = 11L;
+ long downstream = 4L;
+ // Emit syncs 0 and 1, and use the sync 1 to translate offsets and commit checkpoints
+ offsetSyncStore.sync(tp, upstream++, downstream++);
+ offsetSyncStore.sync(tp, upstream++, downstream++);
+ long consumerGroupOffset = upstream;
+ long expectedDownstreamOffset = downstream;
+ assertEquals(OptionalLong.of(expectedDownstreamOffset), offsetSyncStore.translateDownstream("g1", tp, consumerGroupOffset));
+ Map<TopicPartition, Checkpoint> checkpoints = assertCheckpointForTopic(mirrorCheckpointTask, tp, targetTP, consumerGroupOffset, true);
+
+ // the task normally does this, but simulate it here
+ checkpointsPerConsumerGroup.put("g1", checkpoints);
+
+ // Emit syncs 2-6 which will cause the store to drop sync 1, forcing translation to fall back to 0.
+ offsetSyncStore.sync(tp, upstream++, downstream++);
+ offsetSyncStore.sync(tp, upstream++, downstream++);
+ offsetSyncStore.sync(tp, upstream++, downstream++);
+ offsetSyncStore.sync(tp, upstream++, downstream++);
+ offsetSyncStore.sync(tp, upstream++, downstream++);
+ // The OffsetSyncStore will change its translation of the same offset
+ assertNotEquals(OptionalLong.of(expectedDownstreamOffset), offsetSyncStore.translateDownstream("g1", tp, consumerGroupOffset));
+ // But the task will filter this out and not emit a checkpoint
+ assertCheckpointForTopic(mirrorCheckpointTask, tp, targetTP, consumerGroupOffset, false);
+
+ // If then the upstream offset rewinds in the topic and is still translatable, a checkpoint will be emitted
+ // also rewinding the downstream offsets to match. This will not affect auto-synced groups, only checkpoints.
+ assertCheckpointForTopic(mirrorCheckpointTask, tp, targetTP, consumerGroupOffset - 1, true);
+ }
+
+ private Map<TopicPartition, Checkpoint> assertCheckpointForTopic(
+ MirrorCheckpointTask task, TopicPartition tp, TopicPartition remoteTp, long consumerGroupOffset, boolean truth
+ ) {
+ Map<TopicPartition, OffsetAndMetadata> consumerGroupOffsets = Collections.singletonMap(tp, new OffsetAndMetadata(consumerGroupOffset));
+ Map<TopicPartition, Checkpoint> checkpoints = task.checkpointsForGroup(consumerGroupOffsets, "g1");
+ assertEquals(truth, checkpoints.containsKey(remoteTp), "should" + (truth ? "" : " not") + " emit offset sync");
+ return checkpoints;
+ }
}
diff --git a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/OffsetSyncStoreTest.java b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/OffsetSyncStoreTest.java
index 163e5b72250..b2fa9fa4da3 100644
--- a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/OffsetSyncStoreTest.java
+++ b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/OffsetSyncStoreTest.java
@@ -24,6 +24,7 @@ import org.junit.jupiter.api.Test;
import java.util.OptionalLong;
import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
public class OffsetSyncStoreTest {
@@ -57,22 +58,22 @@ public class OffsetSyncStoreTest {
// Emit synced downstream offset without dead-reckoning
store.sync(tp, 100, 200);
- assertEquals(OptionalLong.of(201), store.translateDownstream(tp, 150));
+ assertEquals(OptionalLong.of(201), store.translateDownstream(null, tp, 150));
// Translate exact offsets
store.sync(tp, 150, 251);
- assertEquals(OptionalLong.of(251), store.translateDownstream(tp, 150));
+ assertEquals(OptionalLong.of(251), store.translateDownstream(null, tp, 150));
// Use old offset (5) prior to any sync -> can't translate
- assertEquals(OptionalLong.of(-1), store.translateDownstream(tp, 5));
+ assertEquals(OptionalLong.of(-1), store.translateDownstream(null, tp, 5));
// Downstream offsets reset
store.sync(tp, 200, 10);
- assertEquals(OptionalLong.of(10), store.translateDownstream(tp, 200));
+ assertEquals(OptionalLong.of(10), store.translateDownstream(null, tp, 200));
// Upstream offsets reset
store.sync(tp, 20, 20);
- assertEquals(OptionalLong.of(20), store.translateDownstream(tp, 20));
+ assertEquals(OptionalLong.of(20), store.translateDownstream(null, tp, 20));
}
}
@@ -80,21 +81,21 @@ public class OffsetSyncStoreTest {
public void testNoTranslationIfStoreNotStarted() {
try (FakeOffsetSyncStore store = new FakeOffsetSyncStore()) {
// no offsets exist and store is not started
- assertEquals(OptionalLong.empty(), store.translateDownstream(tp, 0));
- assertEquals(OptionalLong.empty(), store.translateDownstream(tp, 100));
- assertEquals(OptionalLong.empty(), store.translateDownstream(tp, 200));
+ assertEquals(OptionalLong.empty(), store.translateDownstream(null, tp, 0));
+ assertEquals(OptionalLong.empty(), store.translateDownstream(null, tp, 100));
+ assertEquals(OptionalLong.empty(), store.translateDownstream(null, tp, 200));
// read a sync during startup
store.sync(tp, 100, 200);
- assertEquals(OptionalLong.empty(), store.translateDownstream(tp, 0));
- assertEquals(OptionalLong.empty(), store.translateDownstream(tp, 100));
- assertEquals(OptionalLong.empty(), store.translateDownstream(tp, 200));
+ assertEquals(OptionalLong.empty(), store.translateDownstream(null, tp, 0));
+ assertEquals(OptionalLong.empty(), store.translateDownstream(null, tp, 100));
+ assertEquals(OptionalLong.empty(), store.translateDownstream(null, tp, 200));
// After the store is started all offsets are visible
store.start();
- assertEquals(OptionalLong.of(-1), store.translateDownstream(tp, 0));
- assertEquals(OptionalLong.of(200), store.translateDownstream(tp, 100));
- assertEquals(OptionalLong.of(201), store.translateDownstream(tp, 200));
+ assertEquals(OptionalLong.of(-1), store.translateDownstream(null, tp, 0));
+ assertEquals(OptionalLong.of(200), store.translateDownstream(null, tp, 100));
+ assertEquals(OptionalLong.of(201), store.translateDownstream(null, tp, 200));
}
}
@@ -102,7 +103,132 @@ public class OffsetSyncStoreTest {
public void testNoTranslationIfNoOffsetSync() {
try (FakeOffsetSyncStore store = new FakeOffsetSyncStore()) {
store.start();
- assertEquals(OptionalLong.empty(), store.translateDownstream(tp, 0));
+ assertEquals(OptionalLong.empty(), store.translateDownstream(null, tp, 0));
+ }
+ }
+
+ @Test
+ public void testPastOffsetTranslation() {
+ try (FakeOffsetSyncStore store = new FakeOffsetSyncStore()) {
+ long maxOffsetLag = 10;
+ int offset = 0;
+ for (; offset <= 1000; offset += maxOffsetLag) {
+ store.sync(tp, offset, offset);
+ assertSparseSyncInvariant(store, tp);
+ }
+ store.start();
+
+ // After starting but before seeing new offsets, only the latest startup offset can be translated
+ assertSparseSync(store, 1000, -1);
+
+ for (; offset <= 10000; offset += maxOffsetLag) {
+ store.sync(tp, offset, offset);
+ assertSparseSyncInvariant(store, tp);
+ }
+
+ // After seeing new offsets, we still cannot translate earlier than the latest startup offset
+ // Invariant D: the last sync from the initial read-to-end is still stored
+ assertSparseSync(store, 1000, -1);
+
+ // We can translate offsets between the latest startup offset and the latest offset with variable precision
+ // Older offsets are less precise and translation ends up farther apart
+ assertSparseSync(store, 4840, 1000);
+ assertSparseSync(store, 6760, 4840);
+ assertSparseSync(store, 8680, 6760);
+ assertSparseSync(store, 9160, 8680);
+ assertSparseSync(store, 9640, 9160);
+ assertSparseSync(store, 9880, 9640);
+ assertSparseSync(store, 9940, 9880);
+ assertSparseSync(store, 9970, 9940);
+ assertSparseSync(store, 9990, 9970);
+ assertSparseSync(store, 10000, 9990);
+
+ // Rewinding upstream offsets should clear all historical syncs
+ store.sync(tp, 1500, 11000);
+ assertSparseSyncInvariant(store, tp);
+ assertEquals(OptionalLong.of(-1), store.translateDownstream(null, tp, 1499));
+ assertEquals(OptionalLong.of(11000), store.translateDownstream(null, tp, 1500));
+ assertEquals(OptionalLong.of(11001), store.translateDownstream(null, tp, 2000));
+ }
+ }
+
+ @Test
+ public void testKeepMostDistinctSyncs() {
+ // Under normal operation, the incoming syncs will be regularly spaced and the store should keep a set of syncs
+ // which provide the best translation accuracy (expires as few syncs as possible)
+ // Each new sync should be added to the cache and expire at most one other sync from the cache
+ long iterations = 10000;
+ long maxStep = Long.MAX_VALUE / iterations;
+ // Test a variety of steps (corresponding to the offset.lag.max configuration)
+ for (long step = 1; step < maxStep; step = (step * 2) + 1) {
+ for (long firstOffset = 0; firstOffset < 30; firstOffset++) {
+ try (FakeOffsetSyncStore store = new FakeOffsetSyncStore()) {
+ int lastCount = 1;
+ store.start();
+ for (long offset = firstOffset; offset <= iterations; offset += step) {
+ store.sync(tp, offset, offset);
+ // Invariant A: the latest sync is present
+ assertEquals(offset, store.syncFor(tp, 0).upstreamOffset());
+ // Invariant D: the earliest sync is present
+ assertEquals(firstOffset, store.syncFor(tp, 63).upstreamOffset());
+ int count = countDistinctStoredSyncs(store, tp);
+ int diff = count - lastCount;
+ assertTrue(diff >= 0,
+ "Store expired too many syncs: " + diff + " after receiving offset " + offset);
+ lastCount = count;
+ }
+ }
+ }
+ }
+ }
+
+ private void assertSparseSync(FakeOffsetSyncStore store, long syncOffset, long previousOffset) {
+ assertEquals(OptionalLong.of(previousOffset == -1 ? previousOffset : previousOffset + 1), store.translateDownstream(null, tp, syncOffset - 1));
+ assertEquals(OptionalLong.of(syncOffset), store.translateDownstream(null, tp, syncOffset));
+ assertEquals(OptionalLong.of(syncOffset + 1), store.translateDownstream(null, tp, syncOffset + 1));
+ assertEquals(OptionalLong.of(syncOffset + 1), store.translateDownstream(null, tp, syncOffset + 2));
+ }
+
+ private int countDistinctStoredSyncs(FakeOffsetSyncStore store, TopicPartition topicPartition) {
+ int count = 1;
+ for (int i = 1; i < OffsetSyncStore.SYNCS_PER_PARTITION; i++) {
+ if (store.syncFor(topicPartition, i - 1) != store.syncFor(topicPartition, i)) {
+ count++;
+ }
+ }
+ return count;
+ }
+
+ private void assertSparseSyncInvariant(FakeOffsetSyncStore store, TopicPartition topicPartition) {
+ for (int j = 0; j < OffsetSyncStore.SYNCS_PER_PARTITION; j++) {
+ for (int i = 0; i < j; i++) {
+ long jUpstream = store.syncFor(topicPartition, j).upstreamOffset();
+ long iUpstream = store.syncFor(topicPartition, i).upstreamOffset();
+ if (jUpstream == iUpstream) {
+ continue;
+ }
+ int exponent = Math.max(i - 2, 0);
+ long iUpstreamLowerBound = jUpstream + (1L << exponent);
+ if (iUpstreamLowerBound < 0) {
+ continue;
+ }
+ assertTrue(
+ iUpstream >= iUpstreamLowerBound,
+ "Invariant C(" + i + "," + j + "): Upstream offset " + iUpstream + " at position " + i
+ + " should be at least " + iUpstreamLowerBound
+ + " (" + jUpstream + " + 2^" + exponent + ")"
+ );
+ long iUpstreamUpperBound = jUpstream + (1L << j) - (1L << i);
+ if (iUpstreamUpperBound < 0)
+ continue;
+ assertTrue(
+ iUpstream <= iUpstreamUpperBound,
+ "Invariant B(" + i + "," + j + "): Upstream offset " + iUpstream + " at position " + i
+ + " should be no greater than " + iUpstreamUpperBound
+ + " (" + jUpstream + " + 2^" + j + " - 2^" + i + ")"
+
+ );
+ }
}
}
}
diff --git a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationBaseTest.java b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationBaseTest.java
index 27579d3275f..7c860049318 100644
--- a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationBaseTest.java
+++ b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationBaseTest.java
@@ -509,7 +509,6 @@ public class MirrorConnectorsIntegrationBaseTest {
waitForConsumerGroupFullSync(backup, Arrays.asList(backupTopic1, remoteTopic2),
consumerGroupName, NUM_RECORDS_PRODUCED, offsetLagMax);
-
assertDownstreamRedeliveriesBoundedByMaxLag(backupConsumer, offsetLagMax);
}
@@ -655,6 +654,71 @@ public class MirrorConnectorsIntegrationBaseTest {
assertMonotonicCheckpoints(backup, "primary.checkpoints.internal");
}
+ @Test
+ public void testOffsetTranslationBehindReplicationFlow() throws InterruptedException {
+ String consumerGroupName = "consumer-group-lagging-behind";
+ Map<String, Object> consumerProps = Collections.singletonMap("group.id", consumerGroupName);
+ String remoteTopic = remoteTopicName("test-topic-1", PRIMARY_CLUSTER_ALIAS);
+ warmUpConsumer(consumerProps);
+ mm2Props.put("sync.group.offsets.enabled", "true");
+ mm2Props.put("sync.group.offsets.interval.seconds", "1");
+ mm2Props.put("offset.lag.max", Integer.toString(OFFSET_LAG_MAX));
+ mm2Config = new MirrorMakerConfig(mm2Props);
+ waitUntilMirrorMakerIsRunning(backup, CONNECTOR_LIST, mm2Config, PRIMARY_CLUSTER_ALIAS, BACKUP_CLUSTER_ALIAS);
+ // Produce a large number of records to the topic, all replicated within one MM2 lifetime.
+ int iterations = 100;
+ for (int i = 0; i < iterations; i++) {
+ produceMessages(primary, "test-topic-1");
+ }
+ waitForTopicCreated(backup, remoteTopic);
+ assertEquals(iterations * NUM_RECORDS_PRODUCED, backup.kafka().consume(iterations * NUM_RECORDS_PRODUCED, RECORD_TRANSFER_DURATION_MS, remoteTopic).count(),
+ "Records were not replicated to backup cluster.");
+ // Once the replication has finished, we spin up the upstream consumer and start slowly consuming records
+ ConsumerRecords<byte[], byte[]> allRecords = primary.kafka().consume(iterations * NUM_RECORDS_PRODUCED, RECORD_CONSUME_DURATION_MS, "test-topic-1");
+ MirrorClient backupClient = new MirrorClient(mm2Config.clientConfig(BACKUP_CLUSTER_ALIAS));
+ Map<TopicPartition, OffsetAndMetadata> initialCheckpoints = waitForCheckpointOnAllPartitions(
+ backupClient, consumerGroupName, PRIMARY_CLUSTER_ALIAS, remoteTopic);
+ Map<TopicPartition, OffsetAndMetadata> partialCheckpoints;
+ log.info("Initial checkpoints: {}", initialCheckpoints);
+ try (Consumer<byte[], byte[]> primaryConsumer = primary.kafka().createConsumerAndSubscribeTo(consumerProps, "test-topic-1")) {
+ primaryConsumer.poll(CONSUMER_POLL_TIMEOUT_MS);
+ primaryConsumer.commitSync(partialOffsets(allRecords, 0.9f));
+ partialCheckpoints = waitForNewCheckpointOnAllPartitions(
+ backupClient, consumerGroupName, PRIMARY_CLUSTER_ALIAS, remoteTopic, initialCheckpoints);
+ log.info("Partial checkpoints: {}", partialCheckpoints);
+ }
+
+ for (TopicPartition tp : initialCheckpoints.keySet()) {
+ assertTrue(initialCheckpoints.get(tp).offset() < partialCheckpoints.get(tp).offset(),
+ "Checkpoints should advance when the upstream consumer group advances");
+ }
+
+ assertMonotonicCheckpoints(backup, PRIMARY_CLUSTER_ALIAS + ".checkpoints.internal");
+
+ Map<TopicPartition, OffsetAndMetadata> finalCheckpoints;
+ try (Consumer<byte[], byte[]> primaryConsumer = primary.kafka().createConsumerAndSubscribeTo(consumerProps, "test-topic-1")) {
+ primaryConsumer.poll(CONSUMER_POLL_TIMEOUT_MS);
+ primaryConsumer.commitSync(partialOffsets(allRecords, 0.1f));
+ finalCheckpoints = waitForNewCheckpointOnAllPartitions(
+ backupClient, consumerGroupName, PRIMARY_CLUSTER_ALIAS, remoteTopic, partialCheckpoints);
+ log.info("Final checkpoints: {}", finalCheckpoints);
+ }
+
+ for (TopicPartition tp : partialCheckpoints.keySet()) {
+ assertTrue(finalCheckpoints.get(tp).offset() < partialCheckpoints.get(tp).offset(),
+ "Checkpoints should rewind when the upstream consumer group rewinds");
+ }
+ }
+
+ private Map<TopicPartition, OffsetAndMetadata> partialOffsets(ConsumerRecords<byte[], byte[]> allRecords, double fraction) {
+ return allRecords.partitions()
+ .stream()
+ .collect(Collectors.toMap(Function.identity(), partition -> {
+ List<ConsumerRecord<byte[], byte[]>> records = allRecords.records(partition);
+ int index = (int) (records.size() * fraction);
+ return new OffsetAndMetadata(records.get(index).offset());
+ }));
+ }
private TopicPartition remoteTopicPartition(TopicPartition tp, String alias) {
return new TopicPartition(remoteTopicName(tp.topic(), alias), tp.partition());
@@ -798,6 +862,13 @@ public class MirrorConnectorsIntegrationBaseTest {
private static Map<TopicPartition, OffsetAndMetadata> waitForCheckpointOnAllPartitions(
MirrorClient client, String consumerGroupName, String remoteClusterAlias, String topicName
+ ) throws InterruptedException {
+ return waitForNewCheckpointOnAllPartitions(client, consumerGroupName, remoteClusterAlias, topicName, Collections.emptyMap());
+ }
+
+ protected static Map<TopicPartition, OffsetAndMetadata> waitForNewCheckpointOnAllPartitions(
+ MirrorClient client, String consumerGroupName, String remoteClusterAlias, String topicName,
+ Map<TopicPartition, OffsetAndMetadata> lastCheckpoint
) throws InterruptedException {
AtomicReference<Map<TopicPartition, OffsetAndMetadata>> ret = new AtomicReference<>();
waitForCondition(
@@ -805,9 +876,13 @@ public class MirrorConnectorsIntegrationBaseTest {
Map<TopicPartition, OffsetAndMetadata> offsets = client.remoteConsumerOffsets(
consumerGroupName, remoteClusterAlias, Duration.ofMillis(3000));
for (int i = 0; i < NUM_PARTITIONS; i++) {
- if (!offsets.containsKey(new TopicPartition(topicName, i))) {
+ TopicPartition tp = new TopicPartition(topicName, i);
+ if (!offsets.containsKey(tp)) {
log.info("Checkpoint is missing for {}: {}-{}", consumerGroupName, topicName, i);
return false;
+ } else if (lastCheckpoint.containsKey(tp) && lastCheckpoint.get(tp).equals(offsets.get(tp))) {
+ log.info("Checkpoint is the same as previous checkpoint");
+ return false;
}
}
ret.set(offsets);
@@ -867,9 +942,12 @@ public class MirrorConnectorsIntegrationBaseTest {
for (TopicPartition tp : tps) {
assertTrue(consumerGroupOffsets.containsKey(tp),
"TopicPartition " + tp + " does not have translated offsets");
- assertTrue(consumerGroupOffsets.get(tp).offset() > lastOffset.get(tp) - offsetLagMax,
- "TopicPartition " + tp + " does not have fully-translated offsets");
- assertTrue(consumerGroupOffsets.get(tp).offset() <= endOffsets.get(tp).offset(),
+ long offset = consumerGroupOffsets.get(tp).offset();
+ assertTrue(offset > lastOffset.get(tp) - offsetLagMax,
+ "TopicPartition " + tp + " does not have fully-translated offsets: "
+ + offset + " is not close enough to " + lastOffset.get(tp)
+ + " (strictly more than " + (lastOffset.get(tp) - offsetLagMax) + ")");
+ assertTrue(offset <= endOffsets.get(tp).offset(),
"TopicPartition " + tp + " has downstream offsets beyond the log end, this would lead to negative lag metrics");
}
return true;