Posted to jira@kafka.apache.org by "gharris1727 (via GitHub)" <gi...@apache.org> on 2023/01/31 19:03:31 UTC

[GitHub] [kafka] gharris1727 opened a new pull request, #13178: KAFKA-12468, KAFKA-14663, KAFKA-12566: Fix MM2 causing negative downstream lag

gharris1727 opened a new pull request, #13178:
URL: https://github.com/apache/kafka/pull/13178

   This PR addresses three distinct but closely related issues:
   
   1. [KAFKA-12468](https://issues.apache.org/jira/browse/KAFKA-12468) "Initial offsets are copied from source to target cluster" "Mirror Maker 2 Negative Offsets"
   2. [KAFKA-14663](https://issues.apache.org/jira/browse/KAFKA-14663) "High throughput topics can starve low-throughput MM2 offset syncs"
   3. [KAFKA-12566](https://issues.apache.org/jira/browse/KAFKA-12566) "Flaky Test MirrorConnectorsIntegrationSSLTest#testReplication"
   
   The primary issue being addressed here is the incorrect translation of offsets, the title issue.
   
   The [MM2 KIP](https://cwiki.apache.org/confluence/display/KAFKA/KIP-382%3A+MirrorMaker+2.0) does not discuss the offset translation mechanism in detail, so I'll summarize the mechanism as it currently exists on trunk:
   1. Records are mirrored from source topic-partition to target topic-partition by the MirrorSourceTask
   2. MirrorSourceTask will (occasionally) emit OffsetSync messages to an Offset Syncs topic. Offset syncs contain the upstream and downstream offset of an emitted data record.
   3. The MirrorCheckpointTask will consume from the offset syncs topic, and maintain an in-memory copy of the latest offset sync seen for each topic-partition (in OffsetSyncStore)
   4. Periodically the MirrorCheckpointTask will poll consumer group offsets for the source topic, and use its in-memory copy of the latest offset sync to translate upstream offsets to downstream offsets.
   5. This is done by measuring the 'distance' between the MM2 offset sync and the upstream consumer group, and then assuming that the same distance applies in the downstream topic.
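   
   As a rough illustration of steps (4) and (5), the translation amounts to the arithmetic below. This is a minimal sketch, not the actual `OffsetSyncStore` code; the class and method names are hypothetical.
   
   ```java
   class NaiveOffsetTranslation {
       /**
        * Translate an upstream consumer group offset using a single offset sync.
        * syncUpstream and syncDownstream are the two offsets recorded in that sync.
        */
       static long translate(long syncUpstream, long syncDownstream, long upstreamGroupOffset) {
           // Distance between the offset sync and the upstream consumer group
           long distance = upstreamGroupOffset - syncUpstream;
           // Assume the same distance applies in the downstream topic
           return syncDownstream + distance;
       }

       public static void main(String[] args) {
           // Hypothetical sync (upstream 100 -> downstream 40), group at upstream offset 130
           System.out.println(translate(100L, 40L, 130L)); // prints 70
       }
   }
   ```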
   
   Step (5) is correct when assuming that every *offset* from the source topic has already been reproduced in the downstream topic. However, this assumption is violated when offsets are not present, which can happen for a variety of reasons, including:
   1. Transaction markers take an offset but will never be emitted as records from the consumer
   2. Records are dropped by SMTs and will never be emitted to the target topic
   3. The source topic has been compacted and some offsets will never be emitted by the consumer
   4. MM2 replication is lagging behind an upstream consumer group and some records have not been replicated yet
   
   In any of these conditions, an upstream offset may be translated to a downstream offset which is beyond the corresponding record in the downstream topic. Consider the following concrete example of situation (4) **resulting in negative lag**:
   1. Source topic `A` has 1000 records, all with contiguous offsets
   2. An upstream consumer group `cg` is at the end of the log, offset 1000.
   3. MM2 begins replicating the topic, and writes 500 upstream records to the target topic `target.A`, and writes offset-syncs correlating (`A`, 500) with (`target.A`, 500).
   4. MM2 checkpoint reads `cg` offset 1000, translates the offset to 500 + (1000-500) = 1000, and writes to `target.cg`
   5. Someone checks the `target.cg` offset for `target.A` and observes that the group offset is 1000, the topic end offset is 500, and the lag is -500.
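   
   The numbers in this example can be checked with a few lines of Java. This is only a sketch with hypothetical names; lag is taken to be the log end offset minus the committed group offset.
   
   ```java
   class NegativeLagExample {
       /** Distance-based translation, as described in step (5) of the mechanism. */
       static long translate(long syncUpstream, long syncDownstream, long upstreamGroupOffset) {
           return syncDownstream + (upstreamGroupOffset - syncUpstream);
       }

       /** Consumer lag for one partition: log end offset minus committed offset. */
       static long lag(long logEndOffset, long committedOffset) {
           return logEndOffset - committedOffset;
       }

       public static void main(String[] args) {
           // Offset sync (A, 500) <-> (target.A, 500); cg is at upstream offset 1000
           long translated = translate(500L, 500L, 1000L);
           // target.A only contains 500 records so far
           long downstreamLag = lag(500L, translated);
           System.out.println("translated=" + translated + " lag=" + downstreamLag);
           // prints translated=1000 lag=-500
       }
   }
   ```
   
   A committed offset that never exceeds the log end offset is equivalent to `lag(...) >= 0` in this sketch.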
   
   And the following concrete example of situation (1) **resulting in undelivered data**:
   1. Source topic `A` has 1000 records, all emitted with a transactional producer.
   2. The 1000 records are interleaved with 1000 commit markers at every other offset.
   3. An upstream consumer group `cg` is in the middle of the topic, at offset 1000.
   4. MM2 begins replicating the topic, and writes 1000 records to the target topic `target.A`, and writes offset-syncs correlating (`A`, 500) with (`target.A`, 250), in addition to other offset-syncs.
   5. MM2 checkpoint reads the `cg` offset 1000, translates the offset to 250 + (1000 - 500) = 750, and writes to `target.cg`
   6. A system fails-over from `cg` to `target.cg` and someone notices that the `cg` application read records 0-500, `target.cg` application read 750-1000, but no consumer ever received offsets 500-750.
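   
   The gap in this example can be reproduced with the sketch below. It assumes a simplified layout that is not stated in exactly this form above: data record `i` sits at upstream offset `2*i`, a commit marker sits at `2*i + 1`, and the downstream topic holds the data records at contiguous offsets. All names are hypothetical.
   
   ```java
   class TransactionalGapExample {
       /**
        * Exact downstream position for an upstream offset: the number of data
        * records (even upstream offsets) strictly below that offset.
        */
       static long exactDownstream(long upstreamOffset) {
           return (upstreamOffset + 1) / 2;
       }

       /** Distance-based translation that ignores the offsets taken by markers. */
       static long naiveTranslate(long syncUpstream, long syncDownstream, long upstreamGroupOffset) {
           return syncDownstream + (upstreamGroupOffset - syncUpstream);
       }

       public static void main(String[] args) {
           long syncUpstream = 500L;
           long syncDownstream = exactDownstream(syncUpstream); // 250
           long groupOffset = 1000L;

           long naive = naiveTranslate(syncUpstream, syncDownstream, groupOffset);
           long exact = exactDownstream(groupOffset);
           // Records between the exact and the naive position are never delivered
           System.out.println("naive=" + naive + " exact=" + exact + " skipped=" + (naive - exact));
           // prints naive=750 exact=500 skipped=250
       }
   }
   ```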
   
   This PR adds a test that replicates transactional data, as in situation (1). It asserts that whenever an offset is translated, it does not pass the end of the downstream topic, and cannot cause negative lag. In addition, the tests are strengthened to require the offset syncs to be emitted up to the end of the topic, requiring a fix for the offset-syncs topic starvation issue. This also exposed a number of mistakes and flakiness in the existing tests, so this PR also stabilizes the tests to make them useful for validating the negative offsets fix.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] gharris1727 commented on pull request #13178: KAFKA-12468, KAFKA-12566: Fix MM2 causing negative downstream lag

Posted by "gharris1727 (via GitHub)" <gi...@apache.org>.
gharris1727 commented on PR #13178:
URL: https://github.com/apache/kafka/pull/13178#issuecomment-1423233290

   > Oh, and the Jenkins build seems to be consistently failing on the IdentityReplicationIntegrationTest::testNoCheckpointsIfNoRecordsAreMirrored test case. Probably want to look into that before we merge 😄
   
   It looks like this was failing due to a typo in my offset.flush.interval.ms overrides. This should be fixed now.




[GitHub] [kafka] C0urante commented on a diff in pull request #13178: KAFKA-12468, KAFKA-13659, KAFKA-12566: Fix MM2 causing negative downstream lag

Posted by "C0urante (via GitHub)" <gi...@apache.org>.
C0urante commented on code in PR #13178:
URL: https://github.com/apache/kafka/pull/13178#discussion_r1107352681


##########
connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationBaseTest.java:
##########
@@ -701,43 +745,151 @@ protected void produceMessages(EmbeddedConnectCluster cluster, String topicName,
         int cnt = 0;
         for (int r = 0; r < NUM_RECORDS_PER_PARTITION; r++)
             for (int p = 0; p < numPartitions; p++)
-                cluster.kafka().produce(topicName, p, "key", "value-" + cnt++);
+                produce(cluster.kafka(), topicName, p, "key", "value-" + cnt++);
     }
-    
+
+    /**
+     * Produce a test record to a Kafka cluster.
+     * This method allows subclasses to configure and use their own Kafka Producer instead of using the built-in default.
+     * @param cluster   Kafka cluster that should receive the record
+     * @param topic     Topic to send the record to, non-null
+     * @param partition Partition to send the record to, maybe null.
+     * @param key       Kafka key for the record
+     * @param value     Kafka value for the record
+     */
+    protected void produce(EmbeddedKafkaCluster cluster, String topic, Integer partition, String key, String value) {
+        cluster.produce(topic, partition, key, value);
+    }
+
+    protected static Map<TopicPartition, OffsetAndMetadata> waitForCheckpointOnAllPartitions(
+            MirrorClient client, String consumerGroupName, String remoteClusterAlias, String topicName
+    ) throws InterruptedException {
+        AtomicReference<Map<TopicPartition, OffsetAndMetadata>> ret = new AtomicReference<>();
+        waitForCondition(
+                () -> {
+                    Map<TopicPartition, OffsetAndMetadata> offsets = client.remoteConsumerOffsets(
+                            consumerGroupName, remoteClusterAlias, Duration.ofMillis(3000));
+                    for (int i = 0; i < NUM_PARTITIONS; i++) {
+                        if (!offsets.containsKey(new TopicPartition(topicName, i))) {
+                            log.info("Checkpoint is missing for {}: {}-{}", consumerGroupName, topicName, i);
+                            return false;
+                        }
+                    }
+                    ret.set(offsets);
+                    return true;
+                },
+                CHECKPOINT_DURATION_MS,
+                String.format(
+                        "Offsets for consumer group %s not translated from %s for topic %s",
+                        consumerGroupName,
+                        remoteClusterAlias,
+                        topicName
+                )
+        );
+        return ret.get();
+    }
+
     /*
      * given consumer group, topics and expected number of records, make sure the consumer group
      * offsets are eventually synced to the expected offset numbers
      */
-    protected static <T> void waitForConsumerGroupOffsetSync(EmbeddedConnectCluster connect,
-            Consumer<T, T> consumer, List<String> topics, String consumerGroupId, int numRecords, boolean exactOffsetTranslation)
-            throws InterruptedException {
+    protected static <T> void waitForConsumerGroupFullSync(
+            EmbeddedConnectCluster connect, List<String> topics, String consumerGroupId, int numRecords, boolean exactOffsetTranslation
+    ) throws InterruptedException {
         try (Admin adminClient = connect.kafka().createAdminClient()) {
-            List<TopicPartition> tps = new ArrayList<>(NUM_PARTITIONS * topics.size());
+            Map<TopicPartition, OffsetSpec> tps = new HashMap<>(NUM_PARTITIONS * topics.size());
             for (int partitionIndex = 0; partitionIndex < NUM_PARTITIONS; partitionIndex++) {
                 for (String topic : topics) {
-                    tps.add(new TopicPartition(topic, partitionIndex));
+                    tps.put(new TopicPartition(topic, partitionIndex), OffsetSpec.latest());
                 }
             }
             long expectedTotalOffsets = numRecords * topics.size();
 
             waitForCondition(() -> {
                 Map<TopicPartition, OffsetAndMetadata> consumerGroupOffsets =
                     adminClient.listConsumerGroupOffsets(consumerGroupId).partitionsToOffsetAndMetadata().get();
-                long consumerGroupOffsetTotal = consumerGroupOffsets.values().stream()
+                long totalConsumerGroupOffsets = consumerGroupOffsets.values().stream()
                     .mapToLong(OffsetAndMetadata::offset).sum();
 
-                Map<TopicPartition, Long> offsets = consumer.endOffsets(tps, CONSUMER_POLL_TIMEOUT_MS);
-                long totalOffsets = offsets.values().stream().mapToLong(l -> l).sum();
-
+                Map<TopicPartition, ListOffsetsResult.ListOffsetsResultInfo> endOffsets =
+                        adminClient.listOffsets(tps).all().get();
+                long totalEndOffsets = endOffsets.values().stream()
+                        .mapToLong(ListOffsetsResult.ListOffsetsResultInfo::offset).sum();
+
+                for (TopicPartition tp : endOffsets.keySet()) {
+                    if (consumerGroupOffsets.containsKey(tp)) {
+                        assertTrue(consumerGroupOffsets.get(tp).offset() <= endOffsets.get(tp).offset(),
+                                "Consumer group committed downstream offsets beyond the log end, this would lead to negative lag metrics"
+                        );
+                    }
+                }
                 boolean totalOffsetsMatch = exactOffsetTranslation
-                        ? totalOffsets == expectedTotalOffsets
-                        : totalOffsets >= expectedTotalOffsets;
+                        ? totalEndOffsets == expectedTotalOffsets
+                        : totalEndOffsets >= expectedTotalOffsets;
+
+                boolean consumerGroupOffsetsMatch = exactOffsetTranslation
+                        ? totalConsumerGroupOffsets == expectedTotalOffsets
+                        : totalConsumerGroupOffsets >= expectedTotalOffsets;
                 // make sure the consumer group offsets are synced to expected number
-                return totalOffsetsMatch && consumerGroupOffsetTotal > 0;
+                return totalOffsetsMatch && consumerGroupOffsetsMatch;
             }, OFFSET_SYNC_DURATION_MS, "Consumer group offset sync is not complete in time");
         }
     }
 
+    protected static void insertDummyOffsetSyncs(EmbeddedConnectCluster cluster, String offsetSyncsTopic, String topic, int partitions) throws InterruptedException {
+        waitForTopicCreated(cluster, offsetSyncsTopic);
+        // Insert a large number of checkpoint records into the offset syncs topic to simulate
+        // a long-lived MM2 instance that has replicated many offsets in the past.
+        List<ProducerRecord<byte[], byte[]>> records = new ArrayList<>();
+        for (int partition = 0; partition < partitions; partition++) {
+            TopicPartition tp = new TopicPartition(topic, partition);
+            OffsetSync sync = new OffsetSync(tp, 0L, 0L);
+            records.add(new ProducerRecord<>(offsetSyncsTopic, sync.recordKey(), sync.recordValue()));
+        }
+        Map<String, Object> producerProps = new HashMap<>();
+        int sentRecords = 0;
+        try (KafkaProducer<byte[], byte[]> producer = cluster.kafka().createProducer(producerProps)) {
+            // Try to ensure that the contents of the offset syncs topic is too large to read before the checkpoint
+            // interval passes, so that the first checkpoint would take place before reading the whole contents of the
+            // sync topic. Experimentally, this test passes with <2x, and fails with >5x, without a fix for KAFKA-13659.
+            double consumeEfficiency = 10;
+            long deadline = System.currentTimeMillis() + (long) (consumeEfficiency * CHECKPOINT_INTERVAL_DURATION_MS);
+            int nRecords = records.size();
+            while (System.currentTimeMillis() < deadline) {
+                producer.send(records.get(sentRecords % nRecords));
+                sentRecords++;
+            }
+            producer.flush();
+        }
+        log.info("Sent {} dummy records to {}", sentRecords, offsetSyncsTopic);
+    }

Review Comment:
   This is interesting, but seems a little too prone to flakiness to be worth the benefits it provides.
   
   IMO we can remove this method (and possibly the entire `testRestartReplication` test case) in favor of adding unit testing coverage for the `MirrorCheckpointTask` and/or `OffsetSyncStore` classes.





[GitHub] [kafka] gharris1727 commented on a diff in pull request #13178: KAFKA-12468, KAFKA-13659, KAFKA-12566: Fix MM2 causing negative downstream lag

Posted by "gharris1727 (via GitHub)" <gi...@apache.org>.
gharris1727 commented on code in PR #13178:
URL: https://github.com/apache/kafka/pull/13178#discussion_r1107647929


##########
connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationBaseTest.java:
##########
@@ -578,7 +551,7 @@ public void testNoCheckpointsIfNoRecordsAreMirrored() throws InterruptedExceptio
             Map<TopicPartition, OffsetAndMetadata> translatedOffsets = backupClient.remoteConsumerOffsets(
                     consumerGroupName, PRIMARY_CLUSTER_ALIAS, Duration.ofSeconds(30L));
             return translatedOffsets.containsKey(remoteTopicPartition(tp1, PRIMARY_CLUSTER_ALIAS)) &&
-                   translatedOffsets.containsKey(remoteTopicPartition(tp2, PRIMARY_CLUSTER_ALIAS));
+                   !translatedOffsets.containsKey(remoteTopicPartition(tp2, PRIMARY_CLUSTER_ALIAS));

Review Comment:
   I added a read-to-end-and-commit-offsets step that allows this topic to be checkpointed, and reverted this change to the wait condition.





[GitHub] [kafka] gharris1727 commented on pull request #13178: KAFKA-12468, KAFKA-13659, KAFKA-12566: Fix MM2 causing negative downstream lag

Posted by "gharris1727 (via GitHub)" <gi...@apache.org>.
gharris1727 commented on PR #13178:
URL: https://github.com/apache/kafka/pull/13178#issuecomment-1425058794

   > Out of an abundance of caution, what do you think about targeting your mm2-negative-offsets branch with a new PR to address [KAFKA-13659](https://issues.apache.org/jira/browse/KAFKA-13659), which can be reviewed separately but then merged to trunk in tandem with this PR?
   
   I tried to do this, but it appears that the "stacked PRs" feature for GitHub only applies for branches on the same repository, not across forks. If I were to open a stacked PR, it would be on my fork, and review comments may be lost if I accidentally delete the repository.
   
   In the interest of an atomic merge, and preserving comment history, I've added the commits here. To make it easier on your review, I will continue to tag the relevant commits with KAFKA-13659, so you can filter during your review. I hope this is acceptable, but sorry in advance for the difficult review.




[GitHub] [kafka] gharris1727 commented on pull request #13178: KAFKA-12468, KAFKA-13659, KAFKA-12566: Fix MM2 causing negative downstream lag

Posted by "gharris1727 (via GitHub)" <gi...@apache.org>.
gharris1727 commented on PR #13178:
URL: https://github.com/apache/kafka/pull/13178#issuecomment-1433443867

   I've split the periodic commit fix into a separate PR (https://github.com/apache/kafka/pull/13262) as it is not related to the changes here, only the test changes. This PR depends on that change landing first, and I'll update this branch once the other PR is merged.




[GitHub] [kafka] C0urante commented on a diff in pull request #13178: KAFKA-12468, KAFKA-13659, KAFKA-12566: Fix MM2 causing negative downstream lag

Posted by "C0urante (via GitHub)" <gi...@apache.org>.
C0urante commented on code in PR #13178:
URL: https://github.com/apache/kafka/pull/13178#discussion_r1107273995


##########
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/OffsetSyncStore.java:
##########
@@ -16,40 +16,88 @@
  */
 package org.apache.kafka.connect.mirror;
 
-import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.admin.AdminClientConfig;
+import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.producer.Producer;
 import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.common.errors.WakeupException;
-import org.apache.kafka.common.serialization.ByteArrayDeserializer;
+import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.connect.util.KafkaBasedLog;
+import org.apache.kafka.connect.util.TopicAdmin;
 
-import java.util.Map;
-import java.util.HashMap;
 import java.util.Collections;
-import java.time.Duration;
+import java.util.Map;
 import java.util.Optional;
 import java.util.OptionalLong;
+import java.util.concurrent.ConcurrentHashMap;
 
 /** Used internally by MirrorMaker. Stores offset syncs and performs offset translation. */
 class OffsetSyncStore implements AutoCloseable {
-    private final KafkaConsumer<byte[], byte[]> consumer;
-    private final Map<TopicPartition, OffsetSync> offsetSyncs = new HashMap<>();
-    private final TopicPartition offsetSyncTopicPartition;
+    private final KafkaBasedLog<byte[], byte[]> backingStore;
+    private final Map<TopicPartition, OffsetSync> offsetSyncs = new ConcurrentHashMap<>();
+    private final TopicAdmin admin;
+    private volatile boolean readToEnd = false;
 
     OffsetSyncStore(MirrorCheckpointConfig config) {
-        consumer = new KafkaConsumer<>(config.offsetSyncsTopicConsumerConfig(),
-            new ByteArrayDeserializer(), new ByteArrayDeserializer());
-        offsetSyncTopicPartition = new TopicPartition(config.offsetSyncsTopic(), 0);
-        consumer.assign(Collections.singleton(offsetSyncTopicPartition));
+        Consumer<byte[], byte[]> consumer = null;
+        TopicAdmin admin = null;
+        KafkaBasedLog<byte[], byte[]> store;
+        try {
+            Consumer<byte[], byte[]> finalConsumer = consumer = MirrorUtils.newConsumer(config.offsetSyncsTopicConsumerConfig());

Review Comment:
   Ah yeah, much cleaner. Thanks! 👍





[GitHub] [kafka] gkousouris commented on pull request #13178: KAFKA-12468, KAFKA-13659, KAFKA-12566: Fix MM2 causing negative downstream lag

Posted by "gkousouris (via GitHub)" <gi...@apache.org>.
gkousouris commented on PR #13178:
URL: https://github.com/apache/kafka/pull/13178#issuecomment-1726667976

   Thanks a lot for your reply! I will look into creating a Jira account and creating a ticket for this. 
   
   I should have mentioned that we were planning on using Mirror Maker to migrate the topic a service reads from, moving it from cluster A to cluster B. So we would not be limited by the asynchronous offset translation that MirrorMaker uses, since we would be:
   
   1. Mirroring data from the old topic in cluster A to the new topic in cluster B.
   2. Stopping the service and waiting for the consumer offset of the last message on cluster A to be replicated on the new topic on cluster B.
   3. Restarting the service to read from the new topic.
   
   We would have hoped that the offset would be translated exactly at some point, and would let us seamlessly start consuming from the same point it was last stopped.
   
   MirrorMaker seems like a great fit for our use case, but this might be a bit of a blocker. Using the old offset translation version before this PR could perhaps work if we were to disable EOS (to get rid of the transactional messages).
   
   Otherwise, the only solution I can think of is the hacky approach of reading the offsets and trying to decipher what message to read on the application-side, which seems brittle.
   
   Would you perhaps recommend a different approach to avoid processing a message twice?




[GitHub] [kafka] gharris1727 commented on pull request #13178: KAFKA-12468, KAFKA-13659, KAFKA-12566: Fix MM2 causing negative downstream lag

Posted by "gharris1727 (via GitHub)" <gi...@apache.org>.
gharris1727 commented on PR #13178:
URL: https://github.com/apache/kafka/pull/13178#issuecomment-1726690138

   @gkousouris Thanks for sharing your use-case. I think you are right to look towards MM2 for this sort of translation, and I think it's unfortunate that it isn't straightforward. Due to memory limitations, the current offset translation doesn't "converge" for consumer groups which are inactive, and for a single-shot migration use-case, that's not good enough.
   
   Are you able to stop the producers to the upstream topic, and let the consumers commit offsets at the end of the topic before performing the migration? If you set offset.lag.max very low, MM2 should be able to translate offsets at the end of the topic.
   
   > Otherwise, the only solution I can think of is the hacky approach of reading the offsets and trying to decipher what message to read on the application-side, which seems brittle.
   
   Yeah, if you want to get a 100% precise translation away from the end of the topic and don't want to modify MM2, you're going to need to "synchronize" the two topics and figure out which messages line up. Between offset.lag.max, the syncs topic throttle semaphore, and the OffsetSyncStore, a lot of intermediate offset syncs get discarded and the precision of the translation decreases significantly. If you let MirrorMaker2 perform a rough translation that you later refine with a custom script, you probably only need to compare a few hundred record checksums for each topic-partition-consumer-group. This would also allow you to compensate for the skipped offsets that EOS mode produces.
   
   I think you could make such a script reliable enough for a one-off migration, with some manual spot-checking to make sure it doesn't do anything too incorrect.
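   
   To make the idea of refining a rough translation more concrete, here is a minimal sketch of the matching step. It is an assumption-heavy illustration: in-memory lists stand in for records read from the two topics, CRC32 is an arbitrary choice of checksum, and every name in it is hypothetical. A real script would read the records with consumers and would need to handle duplicates and dropped records.
   
   ```java
   import java.nio.charset.StandardCharsets;
   import java.util.ArrayList;
   import java.util.List;
   import java.util.zip.CRC32;

   class OffsetRefinementSketch {
       static long checksum(byte[] value) {
           CRC32 crc = new CRC32();
           crc.update(value, 0, value.length);
           return crc.getValue();
       }

       /**
        * Find the downstream offset at which the next upstream records (the ones the
        * group has not consumed yet) begin, searching within `window` offsets of the
        * rough translation. Returns -1 if no position in the window matches.
        */
       static long refine(List<byte[]> downstream, long roughOffset, int window, List<byte[]> nextUpstream) {
           long from = Math.max(0L, roughOffset - window);
           long to = Math.min(downstream.size() - nextUpstream.size(), roughOffset + window);
           for (long start = from; start <= to; start++) {
               boolean match = true;
               for (int i = 0; i < nextUpstream.size() && match; i++) {
                   byte[] candidate = downstream.get((int) (start + i));
                   match = checksum(candidate) == checksum(nextUpstream.get(i));
               }
               if (match) {
                   return start;
               }
           }
           return -1L;
       }

       public static void main(String[] args) {
           List<byte[]> downstream = new ArrayList<>();
           for (int i = 0; i < 1000; i++) {
               downstream.add(("value-" + i).getBytes(StandardCharsets.UTF_8));
           }
           // The upstream group would next consume value-500, value-501, value-502
           List<byte[]> nextUpstream = new ArrayList<>(downstream.subList(500, 503));
           // Suppose the rough translation landed at 430
           System.out.println(refine(downstream, 430L, 200, nextUpstream)); // prints 500
       }
   }
   ```
   
   Matching a short run of records rather than a single record reduces the chance of a false match when values repeat.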
   
   If you're willing to hack on the MirrorMaker connectors, you could disable the throttling semaphore, the offset.lag.max parameter, and implement the full-depth OffsetSyncStore to get perfect translation. I don't think we could add those to mainline MM2 without a configuration, but you are certainly welcome to temporarily fork MM2 to get the job done.
   




[GitHub] [kafka] C0urante commented on a diff in pull request #13178: KAFKA-12468, KAFKA-13659, KAFKA-12566: Fix MM2 causing negative downstream lag

Posted by "C0urante (via GitHub)" <gi...@apache.org>.
C0urante commented on code in PR #13178:
URL: https://github.com/apache/kafka/pull/13178#discussion_r1107275404


##########
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointTask.java:
##########
@@ -134,9 +138,9 @@ public String version() {
     @Override
     public List<SourceRecord> poll() throws InterruptedException {
         try {
-            long deadline = System.currentTimeMillis() + interval.toMillis();
-            while (!stopping && System.currentTimeMillis() < deadline) {
-                offsetSyncStore.update(pollTimeout);
+            if (stopping.await(pollTimeout.toMillis(), TimeUnit.MILLISECONDS)) {

Review Comment:
   Ah yeah, totally right, the condition was correct. Sorry about that!





[GitHub] [kafka] gharris1727 commented on a diff in pull request #13178: KAFKA-12468, KAFKA-13659, KAFKA-12566: Fix MM2 causing negative downstream lag

Posted by "gharris1727 (via GitHub)" <gi...@apache.org>.
gharris1727 commented on code in PR #13178:
URL: https://github.com/apache/kafka/pull/13178#discussion_r1107634292


##########
connect/mirror/src/test/java/org/apache/kafka/connect/mirror/OffsetSyncStoreTest.java:
##########
@@ -30,7 +30,7 @@ public class OffsetSyncStoreTest {
     static class FakeOffsetSyncStore extends OffsetSyncStore {
 
         FakeOffsetSyncStore() {
-            super(null, null);
+            super();

Review Comment:
   1. I've added a null-check to the real start() method which allows us to call the real start method on the fake offset store.
   2. Added
   3. I removed the assertion messages as they were just visual noise, the comments seemed easy enough to interpret.
   4. Done





[GitHub] [kafka] gharris1727 commented on pull request #13178: KAFKA-12468, KAFKA-13659, KAFKA-12566: Fix MM2 causing negative downstream lag

Posted by "gharris1727 (via GitHub)" <gi...@apache.org>.
gharris1727 commented on PR #13178:
URL: https://github.com/apache/kafka/pull/13178#issuecomment-1726637742

   Hi @gkousouris Thanks for asking!
   
   > Thus, when we restart the service (and make it consume from the new topic), it will re-process all messages from last_checkpoint_before_upstream_offset + 1 until downstream_offset. Isn't this a problem considering that Mirror Maker is providing Exactly Once Semantics for committing messages ?
   
   Your understanding of the offset translation (post-KAFKA-12468) is correct, and I would expect re-processing of messages downstream after a fail-over. I also understand that this doesn't satisfy "exactly once semantics" for some definition, because it allows for re-delivery of the same message to the same "application", when that application uses multiple Kafka clusters.
   
   MirrorMaker2 currently provides "exactly once semantics" for replicating data, but not for offsets. I believe this is captured by the "MirrorSourceConnector" declaring that it supports EOS, but the "MirrorCheckpointConnector" does not. This means that when you replicate a topic with EOS mode, and use read_committed on the downstream topic from the beginning, EOS would mean that you read each record present in the upstream topic exactly once. When you instead start reading at the committed downstream offset, you may have records delivered downstream that have already been committed upstream.
   
   This is not just caused by the offset translation that this PR implements, it's a limitation of the asynchronous offset translation that MirrorMaker2 uses. Consider this sequence:
   1. MirrorCheckpointTask syncs offsets 
   2. MirrorCheckpointTask sleeps
   3. MirrorSourceTask replicates some records
   4. The upstream consumer group consumes upstream records and commits offsets to the upstream group
   5. The downstream consumer group starts reading the topic with the stale offsets in the downstream group
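   
   The sequence above can be walked through with concrete numbers. This is only a sketch: it assumes offsets map 1:1 between the clusters, the offsets 100 and 150 are made up, and the names are hypothetical.
   
   ```java
   class StaleCheckpointTimeline {
       /**
        * Number of records delivered again downstream after a fail-over,
        * assuming upstream and downstream offsets map 1:1.
        */
       static long redelivered(long checkpointedOffset, long upstreamCommittedAtFailover) {
           return Math.max(0L, upstreamCommittedAtFailover - checkpointedOffset);
       }

       public static void main(String[] args) {
           long upstreamCommitted = 100L;
           // 1. MirrorCheckpointTask syncs offsets
           long downstreamCommitted = upstreamCommitted;
           // 2. MirrorCheckpointTask sleeps
           // 3. MirrorSourceTask replicates some records (not modelled here)
           // 4. The upstream group consumes more records and commits
           upstreamCommitted = 150L;
           // 5. The downstream group starts from the stale offset
           System.out.println(redelivered(downstreamCommitted, upstreamCommitted)); // prints 50
       }
   }
   ```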
   
   Thanks for doing your due diligence on the claims of "exactly once semantics", and I hope that you can still make MirrorMaker2 work for your use-case. I suspect that EOS semantics across multiple Kafka clusters is a much larger effort than just changing the offset translation logic :) If you have a Jira account, please consider opening a ticket about this shortcoming.
   
   Thanks!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] gharris1727 commented on a diff in pull request #13178: KAFKA-12468, KAFKA-12566: Fix MM2 causing negative downstream lag

Posted by "gharris1727 (via GitHub)" <gi...@apache.org>.
gharris1727 commented on code in PR #13178:
URL: https://github.com/apache/kafka/pull/13178#discussion_r1100680887


##########
connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationBaseTest.java:
##########
@@ -503,25 +495,6 @@ public void testOffsetSyncsTopicsOnTarget() throws Exception {
 
         produceMessages(primary, "test-topic-1");
 
-        ReplicationPolicy replicationPolicy = new MirrorClient(mm2Config.clientConfig(BACKUP_CLUSTER_ALIAS)).replicationPolicy();
-        String remoteTopic = replicationPolicy.formatRemoteTopic(PRIMARY_CLUSTER_ALIAS, "test-topic-1");
-
-        // Check offsets are pushed to the checkpoint topic
-        Consumer<byte[], byte[]> backupConsumer = backup.kafka().createConsumerAndSubscribeTo(Collections.singletonMap(
-                "auto.offset.reset", "earliest"), PRIMARY_CLUSTER_ALIAS + ".checkpoints.internal");
-        waitForCondition(() -> {
-            ConsumerRecords<byte[], byte[]> records = backupConsumer.poll(Duration.ofSeconds(1L));
-            for (ConsumerRecord<byte[], byte[]> record : records) {
-                Checkpoint checkpoint = Checkpoint.deserializeRecord(record);
-                if (remoteTopic.equals(checkpoint.topicPartition().topic())) {
-                    return true;
-                }
-            }
-            return false;
-        }, 30_000,
-            "Unable to find checkpoints for " + PRIMARY_CLUSTER_ALIAS + ".test-topic-1"
-        );

Review Comment:
   I think that makes sense.
   I'll add some explicitly committed offsets here as a quick fix, and re-add the assertion.





[GitHub] [kafka] C0urante commented on pull request #13178: KAFKA-12468, KAFKA-12566: Fix MM2 causing negative downstream lag

Posted by "C0urante (via GitHub)" <gi...@apache.org>.
C0urante commented on PR #13178:
URL: https://github.com/apache/kafka/pull/13178#issuecomment-1422939422

   Oh, and the Jenkins build seems to be consistently failing on the `IdentityReplicationIntegrationTest::testNoCheckpointsIfNoRecordsAreMirrored` test case. Probably want to look into that before we merge 😄




[GitHub] [kafka] gharris1727 commented on a diff in pull request #13178: KAFKA-12468, KAFKA-13659, KAFKA-12566: Fix MM2 causing negative downstream lag

Posted by "gharris1727 (via GitHub)" <gi...@apache.org>.
gharris1727 commented on code in PR #13178:
URL: https://github.com/apache/kafka/pull/13178#discussion_r1105013739


##########
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/OffsetSyncStore.java:
##########
@@ -16,40 +16,76 @@
  */
 package org.apache.kafka.connect.mirror;
 
+import org.apache.kafka.clients.admin.AdminClientConfig;
+import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.WakeupException;
 import org.apache.kafka.common.serialization.ByteArrayDeserializer;
+import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.connect.util.KafkaBasedLog;
+import org.apache.kafka.connect.util.TopicAdmin;
 
 import java.util.Map;
 import java.util.HashMap;
-import java.util.Collections;
 import java.time.Duration;
 import java.util.Optional;
 import java.util.OptionalLong;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 
 /** Used internally by MirrorMaker. Stores offset syncs and performs offset translation. */
 class OffsetSyncStore implements AutoCloseable {
-    private final KafkaConsumer<byte[], byte[]> consumer;
+    private final KafkaBasedLog<byte[], byte[]> backingStore;
     private final Map<TopicPartition, OffsetSync> offsetSyncs = new HashMap<>();
-    private final TopicPartition offsetSyncTopicPartition;
+    private final TopicAdmin admin;
 
     OffsetSyncStore(MirrorCheckpointConfig config) {
-        consumer = new KafkaConsumer<>(config.offsetSyncsTopicConsumerConfig(),
-            new ByteArrayDeserializer(), new ByteArrayDeserializer());
-        offsetSyncTopicPartition = new TopicPartition(config.offsetSyncsTopic(), 0);

Review Comment:
   I addressed this with a `readPartition` method, which tests each partition individually.





[GitHub] [kafka] C0urante commented on a diff in pull request #13178: KAFKA-12468, KAFKA-13659, KAFKA-12566: Fix MM2 causing negative downstream lag

Posted by "C0urante (via GitHub)" <gi...@apache.org>.
C0urante commented on code in PR #13178:
URL: https://github.com/apache/kafka/pull/13178#discussion_r1104664513


##########
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/OffsetSyncStore.java:
##########
@@ -16,40 +16,76 @@
  */
 package org.apache.kafka.connect.mirror;
 
+import org.apache.kafka.clients.admin.AdminClientConfig;
+import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.WakeupException;
 import org.apache.kafka.common.serialization.ByteArrayDeserializer;
+import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.connect.util.KafkaBasedLog;
+import org.apache.kafka.connect.util.TopicAdmin;
 
 import java.util.Map;
 import java.util.HashMap;
-import java.util.Collections;
 import java.time.Duration;
 import java.util.Optional;
 import java.util.OptionalLong;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 
 /** Used internally by MirrorMaker. Stores offset syncs and performs offset translation. */
 class OffsetSyncStore implements AutoCloseable {
-    private final KafkaConsumer<byte[], byte[]> consumer;
+    private final KafkaBasedLog<byte[], byte[]> backingStore;
     private final Map<TopicPartition, OffsetSync> offsetSyncs = new HashMap<>();
-    private final TopicPartition offsetSyncTopicPartition;
+    private final TopicAdmin admin;
 
     OffsetSyncStore(MirrorCheckpointConfig config) {
-        consumer = new KafkaConsumer<>(config.offsetSyncsTopicConsumerConfig(),
-            new ByteArrayDeserializer(), new ByteArrayDeserializer());
-        offsetSyncTopicPartition = new TopicPartition(config.offsetSyncsTopic(), 0);
-        consumer.assign(Collections.singleton(offsetSyncTopicPartition));
+        String topic = config.offsetSyncsTopic();
+        Consumer<byte[], byte[]> consumer = new KafkaConsumer<>(
+                config.offsetSyncsTopicConsumerConfig(),
+                new ByteArrayDeserializer(),
+                new ByteArrayDeserializer());
+        this.admin = new TopicAdmin(
+                config.offsetSyncsTopicAdminConfig().get(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG),
+                config.forwardingAdmin(config.offsetSyncsTopicAdminConfig()));
+        KafkaBasedLog<byte[], byte[]> store = null;
+        try {
+            store = KafkaBasedLog.withExistingClients(
+                    topic,
+                    consumer,
+                    null,
+                    new TopicAdmin(
+                            config.offsetSyncsTopicAdminConfig().get(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG),
+                            config.forwardingAdmin(config.offsetSyncsTopicAdminConfig())),
+                    (error, record) -> this.handleRecord(record),
+                    Time.SYSTEM,
+                    ignored -> {
+                    });
+            store.start();
+        } catch (Throwable t) {
+            Utils.closeQuietly(store != null ? store::stop : null, "backing store");

Review Comment:
   If the `KafkaBasedLog` constructor for `store` fails, won't `store` be null, causing `consumer` to be leaked?
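
A minimal sketch of the cleanup pattern this question points toward, using stand-in classes rather than Kafka APIs. `Resource` and `Store` are hypothetical, and the sketch assumes that the wrapper takes ownership of the consumer only once it has been constructed successfully.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model of constructor cleanup; the classes are stand-ins, not Kafka APIs. */
class CleanupOnFailureDemo {
    /** Names of resources closed so far, recorded for inspection. */
    static final List<String> CLOSED = new ArrayList<>();

    /** Stand-in for a client such as a consumer. */
    static class Resource implements AutoCloseable {
        private final String name;
        Resource(String name) { this.name = name; }
        @Override public void close() { CLOSED.add(name); }
    }

    /** Stand-in for the log wrapper; it owns the consumer only once constructed. */
    static class Store implements AutoCloseable {
        private final Resource consumer;
        Store(Resource consumer, boolean failConstruction) {
            if (failConstruction) throw new IllegalStateException("construction failed");
            this.consumer = consumer;
        }
        void start(boolean failStart) {
            if (failStart) throw new IllegalStateException("start failed");
        }
        @Override public void close() { consumer.close(); }
    }

    static Store build(boolean failConstruction, boolean failStart) {
        Resource consumer = new Resource("consumer");
        Store store = null;
        try {
            store = new Store(consumer, failConstruction);
            store.start(failStart);
            return store;
        } catch (RuntimeException e) {
            if (store != null) {
                store.close();      // start() failed: the store closes its consumer
            } else {
                consumer.close();   // construction failed: nobody else will close it
            }
            throw e;
        }
    }

    public static void main(String[] args) {
        try {
            build(true, false);
        } catch (IllegalStateException e) {
            System.out.println("closed after failed construction: " + CLOSED);
        }
    }
}
```

The `else` branch is the one the comment is asking about: without it, a failure before `store` is assigned leaves the consumer open.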



##########
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/OffsetSyncStore.java:
##########
@@ -76,26 +112,26 @@ OptionalLong translateDownstream(TopicPartition sourceTopicPartition, long upstr
     }
 
     // poll and handle records
-    synchronized void update(Duration pollTimeout) {
+    synchronized void update(Duration pollTimeout) throws TimeoutException {
         try {
-            consumer.poll(pollTimeout).forEach(this::handleRecord);
-        } catch (WakeupException e) {
+            backingStore.readToEnd().get(pollTimeout.toMillis(), TimeUnit.MILLISECONDS);
+        } catch (WakeupException | InterruptedException | ExecutionException e) {
             // swallow
         }
     }
 
     public synchronized void close() {
-        consumer.wakeup();
-        Utils.closeQuietly(consumer, "offset sync store consumer");
+        Utils.closeQuietly(backingStore::stop, "offset sync store kafka based log");

Review Comment:
   Possible NPE:
   ```suggestion
           Utils.closeQuietly(backingStore != null ? backingStore::stop : null, "offset sync store kafka based log");
   ```
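
For context on why the guard matters: in Java, a bound method reference such as `store::stop` evaluates its receiver immediately, so a null receiver throws before the helper is ever called. The sketch below demonstrates this with a hypothetical `closeQuietly` that is assumed to tolerate a null argument, in the same spirit as the `Utils.closeQuietly` call in the suggestion; it is not the Kafka utility itself.

```java
/** Shows that obj::method throws NullPointerException when obj is null. */
class NullMethodRefDemo {
    static class Store {
        void stop() { }
    }

    /** Hypothetical helper: returns false for null, otherwise closes and returns true. */
    static boolean closeQuietly(AutoCloseable closeable) {
        if (closeable == null) {
            return false;
        }
        try {
            closeable.close();
        } catch (Exception e) {
            // ignored, as the name suggests
        }
        return true;
    }

    static boolean unguarded(Store store) {
        // Evaluating store::stop dereferences store right here, before closeQuietly runs.
        return closeQuietly(store::stop);
    }

    static boolean guarded(Store store) {
        // The conditional hands the helper a plain null instead of a method reference.
        return closeQuietly(store != null ? store::stop : null);
    }

    public static void main(String[] args) {
        System.out.println("guarded(null) closed something: " + guarded(null));
        try {
            unguarded(null);
        } catch (NullPointerException e) {
            System.out.println("unguarded(null) threw NullPointerException");
        }
    }
}
```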



##########
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/OffsetSyncStore.java:
##########
@@ -16,40 +16,76 @@
  */
 package org.apache.kafka.connect.mirror;
 
+import org.apache.kafka.clients.admin.AdminClientConfig;
+import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.WakeupException;
 import org.apache.kafka.common.serialization.ByteArrayDeserializer;
+import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.connect.util.KafkaBasedLog;
+import org.apache.kafka.connect.util.TopicAdmin;
 
 import java.util.Map;
 import java.util.HashMap;
-import java.util.Collections;
 import java.time.Duration;
 import java.util.Optional;
 import java.util.OptionalLong;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 
 /** Used internally by MirrorMaker. Stores offset syncs and performs offset translation. */
 class OffsetSyncStore implements AutoCloseable {
-    private final KafkaConsumer<byte[], byte[]> consumer;
+    private final KafkaBasedLog<byte[], byte[]> backingStore;
     private final Map<TopicPartition, OffsetSync> offsetSyncs = new HashMap<>();
-    private final TopicPartition offsetSyncTopicPartition;
+    private final TopicAdmin admin;
 
     OffsetSyncStore(MirrorCheckpointConfig config) {
-        consumer = new KafkaConsumer<>(config.offsetSyncsTopicConsumerConfig(),
-            new ByteArrayDeserializer(), new ByteArrayDeserializer());
-        offsetSyncTopicPartition = new TopicPartition(config.offsetSyncsTopic(), 0);
-        consumer.assign(Collections.singleton(offsetSyncTopicPartition));
+        String topic = config.offsetSyncsTopic();
+        Consumer<byte[], byte[]> consumer = new KafkaConsumer<>(
+                config.offsetSyncsTopicConsumerConfig(),
+                new ByteArrayDeserializer(),
+                new ByteArrayDeserializer());
+        this.admin = new TopicAdmin(
+                config.offsetSyncsTopicAdminConfig().get(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG),
+                config.forwardingAdmin(config.offsetSyncsTopicAdminConfig()));
+        KafkaBasedLog<byte[], byte[]> store = null;
+        try {
+            store = KafkaBasedLog.withExistingClients(
+                    topic,
+                    consumer,
+                    null,
+                    new TopicAdmin(
+                            config.offsetSyncsTopicAdminConfig().get(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG),
+                            config.forwardingAdmin(config.offsetSyncsTopicAdminConfig())),
+                    (error, record) -> this.handleRecord(record),
+                    Time.SYSTEM,
+                    ignored -> {
+                    });
+            store.start();
+        } catch (Throwable t) {
+            Utils.closeQuietly(store != null ? store::stop : null, "backing store");

Review Comment:
   Also, could we use more descriptive names for these objects? For example, this one could be "backing store for offset syncs".



##########
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/OffsetSyncStore.java:
##########
@@ -16,40 +16,76 @@
  */
 package org.apache.kafka.connect.mirror;
 
+import org.apache.kafka.clients.admin.AdminClientConfig;
+import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.WakeupException;
 import org.apache.kafka.common.serialization.ByteArrayDeserializer;
+import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.connect.util.KafkaBasedLog;
+import org.apache.kafka.connect.util.TopicAdmin;
 
 import java.util.Map;
 import java.util.HashMap;
-import java.util.Collections;
 import java.time.Duration;
 import java.util.Optional;
 import java.util.OptionalLong;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 
 /** Used internally by MirrorMaker. Stores offset syncs and performs offset translation. */
 class OffsetSyncStore implements AutoCloseable {
-    private final KafkaConsumer<byte[], byte[]> consumer;
+    private final KafkaBasedLog<byte[], byte[]> backingStore;
     private final Map<TopicPartition, OffsetSync> offsetSyncs = new HashMap<>();
-    private final TopicPartition offsetSyncTopicPartition;
+    private final TopicAdmin admin;
 
     OffsetSyncStore(MirrorCheckpointConfig config) {
-        consumer = new KafkaConsumer<>(config.offsetSyncsTopicConsumerConfig(),
-            new ByteArrayDeserializer(), new ByteArrayDeserializer());
-        offsetSyncTopicPartition = new TopicPartition(config.offsetSyncsTopic(), 0);
-        consumer.assign(Collections.singleton(offsetSyncTopicPartition));
+        String topic = config.offsetSyncsTopic();
+        Consumer<byte[], byte[]> consumer = new KafkaConsumer<>(
+                config.offsetSyncsTopicConsumerConfig(),
+                new ByteArrayDeserializer(),
+                new ByteArrayDeserializer());

Review Comment:
   Nit: we have a utility method for this
   ```suggestion
           Consumer<byte[], byte[]> consumer = MirrorUtils.newConsumer(config.offsetSyncsTopicConsumerConfig());
   ```



##########
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/OffsetSyncStore.java:
##########
@@ -16,40 +16,76 @@
  */
 package org.apache.kafka.connect.mirror;
 
+import org.apache.kafka.clients.admin.AdminClientConfig;
+import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.WakeupException;
 import org.apache.kafka.common.serialization.ByteArrayDeserializer;
+import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.connect.util.KafkaBasedLog;
+import org.apache.kafka.connect.util.TopicAdmin;
 
 import java.util.Map;
 import java.util.HashMap;
-import java.util.Collections;
 import java.time.Duration;
 import java.util.Optional;
 import java.util.OptionalLong;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 
 /** Used internally by MirrorMaker. Stores offset syncs and performs offset translation. */
 class OffsetSyncStore implements AutoCloseable {
-    private final KafkaConsumer<byte[], byte[]> consumer;
+    private final KafkaBasedLog<byte[], byte[]> backingStore;
     private final Map<TopicPartition, OffsetSync> offsetSyncs = new HashMap<>();
-    private final TopicPartition offsetSyncTopicPartition;
+    private final TopicAdmin admin;
 
     OffsetSyncStore(MirrorCheckpointConfig config) {
-        consumer = new KafkaConsumer<>(config.offsetSyncsTopicConsumerConfig(),
-            new ByteArrayDeserializer(), new ByteArrayDeserializer());
-        offsetSyncTopicPartition = new TopicPartition(config.offsetSyncsTopic(), 0);

Review Comment:
   This leads to a change in behavior since we'll end up consuming from all partitions in the offset syncs topic instead of just partition 0.
   
   We intentionally [write every offset sync to partition zero](https://github.com/apache/kafka/blob/8cfafba2794562840b0f1c537e304f084b9359cf/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceTask.java#L249) and [create the topic with a single partition](https://github.com/apache/kafka/blob/8cfafba2794562840b0f1c537e304f084b9359cf/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceConnector.java#L361), but the topic may have been created out-of-band and there may be other information in it which has not been produced by MM2 that we shouldn't consume.
   
   Could we expand the `KafkaBasedLog` API to support reading from a specific subset of the partitions for a topic, possibly by adding a `protected List<TopicPartition> assignedPartitions(List<PartitionInfo> partitionInfos)` method that can be overridden by subclasses? This would allow us to completely preserve the existing behavior.
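
A rough sketch of the overridable-hook idea, with partitions modeled as plain integers to keep it self-contained. `TopicLog` and `PartitionZeroLog` are hypothetical names for illustration and do not reflect the actual `KafkaBasedLog` API.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

/** Toy model of a protected hook that narrows which partitions a log reads. */
class PartitionFilterDemo {
    /** Hypothetical base log: by default it reads every partition of its topic. */
    static class TopicLog {
        protected List<Integer> assignedPartitions(List<Integer> allPartitions) {
            return allPartitions;
        }

        /** Called during startup to decide what to assign to the consumer. */
        final List<Integer> partitionsToRead(List<Integer> allPartitions) {
            return assignedPartitions(allPartitions);
        }
    }

    /** Subclass that preserves the "partition 0 only" behavior. */
    static class PartitionZeroLog extends TopicLog {
        @Override
        protected List<Integer> assignedPartitions(List<Integer> allPartitions) {
            return allPartitions.stream()
                    .filter(partition -> partition == 0)
                    .collect(Collectors.toList());
        }
    }

    public static void main(String[] args) {
        List<Integer> all = Arrays.asList(0, 1, 2);
        System.out.println("default:        " + new TopicLog().partitionsToRead(all));
        System.out.println("partition zero: " + new PartitionZeroLog().partitionsToRead(all));
    }
}
```

Existing callers keep the default of reading everything, and only the offset syncs store would need to opt in to the narrower assignment.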



##########
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/OffsetSyncStore.java:
##########
@@ -76,26 +112,26 @@ OptionalLong translateDownstream(TopicPartition sourceTopicPartition, long upstr
     }
 
     // poll and handle records
-    synchronized void update(Duration pollTimeout) {
+    synchronized void update(Duration pollTimeout) throws TimeoutException {
         try {
-            consumer.poll(pollTimeout).forEach(this::handleRecord);
-        } catch (WakeupException e) {
+            backingStore.readToEnd().get(pollTimeout.toMillis(), TimeUnit.MILLISECONDS);
+        } catch (WakeupException | InterruptedException | ExecutionException e) {
             // swallow

Review Comment:
   Won't this cause us to emit stale offset syncs in `MirrorCheckpointTask::poll` if we encounter some types of failure here?



##########
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/OffsetSyncStore.java:
##########
@@ -76,26 +112,26 @@ OptionalLong translateDownstream(TopicPartition sourceTopicPartition, long upstr
     }
 
     // poll and handle records
-    synchronized void update(Duration pollTimeout) {
+    synchronized void update(Duration pollTimeout) throws TimeoutException {
         try {
-            consumer.poll(pollTimeout).forEach(this::handleRecord);
-        } catch (WakeupException e) {
+            backingStore.readToEnd().get(pollTimeout.toMillis(), TimeUnit.MILLISECONDS);
+        } catch (WakeupException | InterruptedException | ExecutionException e) {
             // swallow

Review Comment:
   I haven't looked too closely into this, but one possibility could be to return `OptionalLong.empty()` from `OffsetSyncStore::translateDownstream` if we haven't completed a read to the end of the offset syncs topic yet?
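
One way that idea could look, as a toy sketch rather than the real `OffsetSyncStore`. Topic-partitions are plain strings, the `markReadToEnd` hook is hypothetical, and the translation is deliberately simplified to return the downstream offset of the latest sync instead of extrapolating from it.

```java
import java.util.Map;
import java.util.OptionalLong;
import java.util.concurrent.ConcurrentHashMap;

/** Toy offset-sync store that refuses to translate until it has caught up. */
class GatedSyncStoreDemo {
    // topic-partition name -> {upstreamOffset, downstreamOffset} of the latest sync
    private final Map<String, long[]> syncs = new ConcurrentHashMap<>();
    // flipped once the first read to the end of the syncs topic has completed
    private volatile boolean readToEnd = false;

    void handleSync(String topicPartition, long upstream, long downstream) {
        syncs.put(topicPartition, new long[] {upstream, downstream});
    }

    void markReadToEnd() {
        readToEnd = true;
    }

    OptionalLong translateDownstream(String topicPartition, long upstreamOffset) {
        if (!readToEnd) {
            return OptionalLong.empty();   // in-memory state may be stale or partial
        }
        long[] sync = syncs.get(topicPartition);
        if (sync == null || sync[0] > upstreamOffset) {
            return OptionalLong.empty();   // no sync at or before the requested offset
        }
        // Simplification: report the sync's downstream offset rather than extrapolating.
        return OptionalLong.of(sync[1]);
    }

    public static void main(String[] args) {
        GatedSyncStoreDemo store = new GatedSyncStoreDemo();
        store.handleSync("topic-0", 100, 40);
        System.out.println("before read to end: " + store.translateDownstream("topic-0", 120));
        store.markReadToEnd();
        System.out.println("after read to end:  " + store.translateDownstream("topic-0", 120));
    }
}
```

With a gate like this, a failed or timed-out read produces no checkpoint at all instead of one computed from outdated syncs.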



##########
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/OffsetSyncStore.java:
##########
@@ -16,40 +16,76 @@
  */
 package org.apache.kafka.connect.mirror;
 
+import org.apache.kafka.clients.admin.AdminClientConfig;
+import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.WakeupException;
 import org.apache.kafka.common.serialization.ByteArrayDeserializer;
+import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.connect.util.KafkaBasedLog;
+import org.apache.kafka.connect.util.TopicAdmin;
 
 import java.util.Map;
 import java.util.HashMap;
-import java.util.Collections;
 import java.time.Duration;
 import java.util.Optional;
 import java.util.OptionalLong;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 
 /** Used internally by MirrorMaker. Stores offset syncs and performs offset translation. */
 class OffsetSyncStore implements AutoCloseable {
-    private final KafkaConsumer<byte[], byte[]> consumer;
+    private final KafkaBasedLog<byte[], byte[]> backingStore;
     private final Map<TopicPartition, OffsetSync> offsetSyncs = new HashMap<>();
-    private final TopicPartition offsetSyncTopicPartition;
+    private final TopicAdmin admin;
 
     OffsetSyncStore(MirrorCheckpointConfig config) {
-        consumer = new KafkaConsumer<>(config.offsetSyncsTopicConsumerConfig(),
-            new ByteArrayDeserializer(), new ByteArrayDeserializer());
-        offsetSyncTopicPartition = new TopicPartition(config.offsetSyncsTopic(), 0);
-        consumer.assign(Collections.singleton(offsetSyncTopicPartition));
+        String topic = config.offsetSyncsTopic();
+        Consumer<byte[], byte[]> consumer = new KafkaConsumer<>(
+                config.offsetSyncsTopicConsumerConfig(),
+                new ByteArrayDeserializer(),
+                new ByteArrayDeserializer());
+        this.admin = new TopicAdmin(
+                config.offsetSyncsTopicAdminConfig().get(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG),
+                config.forwardingAdmin(config.offsetSyncsTopicAdminConfig()));
+        KafkaBasedLog<byte[], byte[]> store = null;
+        try {
+            store = KafkaBasedLog.withExistingClients(
+                    topic,
+                    consumer,
+                    null,
+                    new TopicAdmin(
+                            config.offsetSyncsTopicAdminConfig().get(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG),
+                            config.forwardingAdmin(config.offsetSyncsTopicAdminConfig())),
+                    (error, record) -> this.handleRecord(record),
+                    Time.SYSTEM,
+                    ignored -> {
+                    });
+            store.start();
+        } catch (Throwable t) {
+            Utils.closeQuietly(store != null ? store::stop : null, "backing store");
+            Utils.closeQuietly(new TopicAdmin(
+                    config.offsetSyncsTopicAdminConfig().get(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG),
+                    config.forwardingAdmin(config.offsetSyncsTopicAdminConfig())), "admin client");
+            throw t;
+        }
+        this.backingStore = store;
+    }
+
+    OffsetSyncStore() {
+        this.admin = null;
+        this.backingStore = null;
     }
 
-    // for testing
-    OffsetSyncStore(KafkaConsumer<byte[], byte[]> consumer, TopicPartition offsetSyncTopicPartition) {
-        this.consumer = consumer;
-        this.offsetSyncTopicPartition = offsetSyncTopicPartition;
+    public void readToEnd(Runnable callback) {
+        backingStore.readToEnd((error, result) -> callback.run());
     }
 
-    OptionalLong translateDownstream(TopicPartition sourceTopicPartition, long upstreamOffset) {
+    synchronized OptionalLong translateDownstream(TopicPartition sourceTopicPartition, long upstreamOffset) {

Review Comment:
   Why are we synchronizing here? Would it be enough to change `offsetSyncs` to be a `ConcurrentMap`?



##########
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/OffsetSyncStore.java:
##########
@@ -16,40 +16,76 @@
  */
 package org.apache.kafka.connect.mirror;
 
+import org.apache.kafka.clients.admin.AdminClientConfig;
+import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.WakeupException;
 import org.apache.kafka.common.serialization.ByteArrayDeserializer;
+import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.connect.util.KafkaBasedLog;
+import org.apache.kafka.connect.util.TopicAdmin;
 
 import java.util.Map;
 import java.util.HashMap;
-import java.util.Collections;
 import java.time.Duration;
 import java.util.Optional;
 import java.util.OptionalLong;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 
 /** Used internally by MirrorMaker. Stores offset syncs and performs offset translation. */
 class OffsetSyncStore implements AutoCloseable {
-    private final KafkaConsumer<byte[], byte[]> consumer;
+    private final KafkaBasedLog<byte[], byte[]> backingStore;
     private final Map<TopicPartition, OffsetSync> offsetSyncs = new HashMap<>();
-    private final TopicPartition offsetSyncTopicPartition;
+    private final TopicAdmin admin;
 
     OffsetSyncStore(MirrorCheckpointConfig config) {
-        consumer = new KafkaConsumer<>(config.offsetSyncsTopicConsumerConfig(),
-            new ByteArrayDeserializer(), new ByteArrayDeserializer());
-        offsetSyncTopicPartition = new TopicPartition(config.offsetSyncsTopic(), 0);
-        consumer.assign(Collections.singleton(offsetSyncTopicPartition));
+        String topic = config.offsetSyncsTopic();
+        Consumer<byte[], byte[]> consumer = new KafkaConsumer<>(
+                config.offsetSyncsTopicConsumerConfig(),
+                new ByteArrayDeserializer(),
+                new ByteArrayDeserializer());
+        this.admin = new TopicAdmin(
+                config.offsetSyncsTopicAdminConfig().get(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG),
+                config.forwardingAdmin(config.offsetSyncsTopicAdminConfig()));
+        KafkaBasedLog<byte[], byte[]> store = null;
+        try {
+            store = KafkaBasedLog.withExistingClients(
+                    topic,
+                    consumer,
+                    null,
+                    new TopicAdmin(
+                            config.offsetSyncsTopicAdminConfig().get(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG),
+                            config.forwardingAdmin(config.offsetSyncsTopicAdminConfig())),
+                    (error, record) -> this.handleRecord(record),
+                    Time.SYSTEM,
+                    ignored -> {
+                    });
+            store.start();

Review Comment:
   This will trigger a [synchronous read to the end of the topic](https://github.com/apache/kafka/blob/8cfafba2794562840b0f1c537e304f084b9359cf/connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java#L269).
   
   Perhaps we could introduce a separate method to "start" the store that invokes `start` on the underlying `KafkaBasedLog`? This method could be run on the `MirrorCheckpointTask`'s `scheduler` so as not to block startup.
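
A small sketch of the deferred-start shape being suggested here. It uses a plain JDK `ScheduledExecutorService` as a stand-in for the task's `scheduler`, and the `Store` class with its `awaitStarted` helper is hypothetical.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

/** Toy model of starting a store on a background thread instead of in a constructor. */
class DeferredStartDemo {
    static class Store {
        private final CountDownLatch started = new CountDownLatch(1);

        void start() {
            // A real implementation would block here while reading the topic to its end.
            started.countDown();
        }

        /** Waits up to timeoutMs for start() to finish; returns false on timeout or interrupt. */
        boolean awaitStarted(long timeoutMs) {
            try {
                return started.await(timeoutMs, TimeUnit.MILLISECONDS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return false;
            }
        }
    }

    /** Constructs the store and hands the blocking start() call to the scheduler. */
    static Store startAsync(ScheduledExecutorService scheduler) {
        Store store = new Store();
        scheduler.execute(store::start);   // returns immediately; start() runs on the scheduler thread
        return store;
    }

    public static void main(String[] args) {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        try {
            Store store = startAsync(scheduler);
            System.out.println("started: " + store.awaitStarted(5_000));
        } finally {
            scheduler.shutdown();
        }
    }
}
```

The caller gets the store back right away, so task startup is not held up by the initial read.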



##########
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/OffsetSyncStore.java:
##########
@@ -16,40 +16,76 @@
  */
 package org.apache.kafka.connect.mirror;
 
+import org.apache.kafka.clients.admin.AdminClientConfig;
+import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.WakeupException;
 import org.apache.kafka.common.serialization.ByteArrayDeserializer;
+import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.connect.util.KafkaBasedLog;
+import org.apache.kafka.connect.util.TopicAdmin;
 
 import java.util.Map;
 import java.util.HashMap;
-import java.util.Collections;
 import java.time.Duration;
 import java.util.Optional;
 import java.util.OptionalLong;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 
 /** Used internally by MirrorMaker. Stores offset syncs and performs offset translation. */
 class OffsetSyncStore implements AutoCloseable {
-    private final KafkaConsumer<byte[], byte[]> consumer;
+    private final KafkaBasedLog<byte[], byte[]> backingStore;
     private final Map<TopicPartition, OffsetSync> offsetSyncs = new HashMap<>();
-    private final TopicPartition offsetSyncTopicPartition;
+    private final TopicAdmin admin;
 
     OffsetSyncStore(MirrorCheckpointConfig config) {
-        consumer = new KafkaConsumer<>(config.offsetSyncsTopicConsumerConfig(),
-            new ByteArrayDeserializer(), new ByteArrayDeserializer());
-        offsetSyncTopicPartition = new TopicPartition(config.offsetSyncsTopic(), 0);
-        consumer.assign(Collections.singleton(offsetSyncTopicPartition));
+        String topic = config.offsetSyncsTopic();
+        Consumer<byte[], byte[]> consumer = new KafkaConsumer<>(
+                config.offsetSyncsTopicConsumerConfig(),
+                new ByteArrayDeserializer(),
+                new ByteArrayDeserializer());
+        this.admin = new TopicAdmin(
+                config.offsetSyncsTopicAdminConfig().get(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG),
+                config.forwardingAdmin(config.offsetSyncsTopicAdminConfig()));
+        KafkaBasedLog<byte[], byte[]> store = null;
+        try {
+            store = KafkaBasedLog.withExistingClients(
+                    topic,
+                    consumer,
+                    null,
+                    new TopicAdmin(
+                            config.offsetSyncsTopicAdminConfig().get(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG),
+                            config.forwardingAdmin(config.offsetSyncsTopicAdminConfig())),
+                    (error, record) -> this.handleRecord(record),
+                    Time.SYSTEM,
+                    ignored -> {
+                    });
+            store.start();
+        } catch (Throwable t) {
+            Utils.closeQuietly(store != null ? store::stop : null, "backing store");
+            Utils.closeQuietly(new TopicAdmin(
+                    config.offsetSyncsTopicAdminConfig().get(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG),
+                    config.forwardingAdmin(config.offsetSyncsTopicAdminConfig())), "admin client");

Review Comment:
   Should this be:
   ```suggestion
               Utils.closeQuietly(admin, "admin client");
   ```
   ?
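
The point of the suggestion above can be sketched in isolation: on a failed startup, the cleanup path should close the client instance that was already created rather than construct a fresh one just to close it. This is a minimal sketch, not the PR's code; `FakeClient`, `closeQuietly` and `startAndFail` are hypothetical stand-ins for `TopicAdmin`, `Utils.closeQuietly` and the constructor body.

```java
import java.util.ArrayList;
import java.util.List;

class CloseOnFailureSketch {
    // Hypothetical stand-in for a client such as TopicAdmin; records its name when closed.
    static final class FakeClient implements AutoCloseable {
        private final String name;
        private final List<String> closed;

        FakeClient(String name, List<String> closed) {
            this.name = name;
            this.closed = closed;
        }

        @Override
        public void close() {
            closed.add(name);
        }
    }

    // Hypothetical helper: close a resource and swallow any exception it throws.
    static void closeQuietly(AutoCloseable resource) {
        if (resource == null) {
            return;
        }
        try {
            resource.close();
        } catch (Exception e) {
            // intentionally ignored during cleanup
        }
    }

    // Simulates a startup that fails after the admin client has been created,
    // and returns the names of the clients that were closed during cleanup.
    static List<String> startAndFail() {
        List<String> closed = new ArrayList<>();
        FakeClient admin = new FakeClient("admin", closed);
        try {
            throw new IllegalStateException("simulated startup failure");
        } catch (RuntimeException e) {
            // Close the instance that already exists instead of building a new one.
            closeQuietly(admin);
        }
        return closed;
    }

    public static void main(String[] args) {
        System.out.println(startAndFail());
    }
}
```

Creating a second client only to close it would leave the original instance open, which is the leak the review question is pointing at.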



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] gharris1727 commented on a diff in pull request #13178: KAFKA-12468, KAFKA-12566: Fix MM2 causing negative downstream lag

Posted by "gharris1727 (via GitHub)" <gi...@apache.org>.
gharris1727 commented on code in PR #13178:
URL: https://github.com/apache/kafka/pull/13178#discussion_r1100550240


##########
connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationBaseTest.java:
##########
@@ -700,21 +673,53 @@ protected void produceMessages(EmbeddedConnectCluster cluster, String topicName,
         int cnt = 0;
         for (int r = 0; r < NUM_RECORDS_PER_PARTITION; r++)
             for (int p = 0; p < numPartitions; p++)
-                cluster.kafka().produce(topicName, p, "key", "value-" + cnt++);
+                produce(cluster, topicName, p, "key", "value-" + cnt++);
     }
-    
+
+    protected void produce(EmbeddedConnectCluster cluster, String topic, Integer partition, String key, String value) {
+        cluster.kafka().produce(topic, partition, key, value);
+    }
+
+    protected static Map<TopicPartition, OffsetAndMetadata> waitForAnyCheckpoint(

Review Comment:
   It's `any` in contrast to the other method's `FullSync`.
   As in, it accepts checkpoints anywhere within a topic, not just at the end of the topic.
   
   I'll update the method name to be more precise and verbose :)



##########
connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/IdentityReplicationIntegrationTest.java:
##########
@@ -233,6 +229,7 @@ public void testOneWayReplicationWithAutoOffsetSync() throws InterruptedExceptio
         // have been automatically synchronized from primary to backup by the background job, so no
         // more records to consume from the replicated topic by the same consumer group at backup cluster
         assertEquals(0, records.count(), "consumer record size is not zero");
+        backupConsumer.close();

Review Comment:
   I code-golfed this one, but there's no reason to. Updated.



##########
connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationBaseTest.java:
##########
@@ -578,7 +551,7 @@ public void testNoCheckpointsIfNoRecordsAreMirrored() throws InterruptedExceptio
             Map<TopicPartition, OffsetAndMetadata> translatedOffsets = backupClient.remoteConsumerOffsets(
                     consumerGroupName, PRIMARY_CLUSTER_ALIAS, Duration.ofSeconds(30L));
             return translatedOffsets.containsKey(remoteTopicPartition(tp1, PRIMARY_CLUSTER_ALIAS)) &&
-                   translatedOffsets.containsKey(remoteTopicPartition(tp2, PRIMARY_CLUSTER_ALIAS));
+                   !translatedOffsets.containsKey(remoteTopicPartition(tp2, PRIMARY_CLUSTER_ALIAS));

Review Comment:
   1. This was passing before, because `offset.lag.max` was nonzero, leaving a (0,0) offset sync in the topic.
   2. This condition is part of `waitForAnyCheckpoint/waitForCheckpointOnAllPartitions` which is used in `testReplication`. I think that test should be sufficient to cover the normal case where offsets are translated.



##########
connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationBaseTest.java:
##########
@@ -503,25 +495,6 @@ public void testOffsetSyncsTopicsOnTarget() throws Exception {
 
         produceMessages(primary, "test-topic-1");
 
-        ReplicationPolicy replicationPolicy = new MirrorClient(mm2Config.clientConfig(BACKUP_CLUSTER_ALIAS)).replicationPolicy();
-        String remoteTopic = replicationPolicy.formatRemoteTopic(PRIMARY_CLUSTER_ALIAS, "test-topic-1");
-
-        // Check offsets are pushed to the checkpoint topic
-        Consumer<byte[], byte[]> backupConsumer = backup.kafka().createConsumerAndSubscribeTo(Collections.singletonMap(
-                "auto.offset.reset", "earliest"), PRIMARY_CLUSTER_ALIAS + ".checkpoints.internal");
-        waitForCondition(() -> {
-            ConsumerRecords<byte[], byte[]> records = backupConsumer.poll(Duration.ofSeconds(1L));
-            for (ConsumerRecord<byte[], byte[]> record : records) {
-                Checkpoint checkpoint = Checkpoint.deserializeRecord(record);
-                if (remoteTopic.equals(checkpoint.topicPartition().topic())) {
-                    return true;
-                }
-            }
-            return false;
-        }, 30_000,
-            "Unable to find checkpoints for " + PRIMARY_CLUSTER_ALIAS + ".test-topic-1"
-        );

Review Comment:
   This assertion relied on a nonzero `offset.lag.max`, where the sync topic could be left with a (0,0) record.
   The offset being translated in this case is the `consumer-group-dummy` from the setup method, which has offset 0 (at the beginning of the topic). Now that `offset.lag.max` is zero and the starvation bug is fixed, the sync topic may only see nonzero syncs, which are not able to translate 0 offsets.
   
   I removed this assertion because it was failing and didn't seem to concern this test.
   Upon further inspection, maybe it makes sense to turn this test into a parameter for other tests, to verify the functionality of the checkpoints when the syncs topic is moved around. Does that make sense to do in this PR?



##########
connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationBaseTest.java:
##########
@@ -469,7 +461,7 @@ public void testOneWayReplicationWithAutoOffsetSync() throws InterruptedExceptio
 
         // create a consumer at primary cluster to consume the new topic
         try (Consumer<byte[], byte[]> consumer1 = primary.kafka().createConsumerAndSubscribeTo(Collections.singletonMap(
-                "group.id", "consumer-group-1"), "test-topic-2")) {

Review Comment:
   This is because the `waitForConsumerGroupOffsetSync` condition was much weaker, and only required that the total of the consumer group offsets across all topic-partitions was nonzero.
   The wait condition passed because `test-topic-1` had committed offsets, which made the total nonzero even if all of the `test-topic-2` group offsets were 0.
   So it was sufficient for the first topic to have _any_ committed offsets, and for both topics to be fully replicated.
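
To make the difference between the two wait conditions concrete, here is a minimal sketch with plain maps in place of the admin client results. The method names `weakCondition` and `strictCondition` and the sample offsets are illustrative assumptions; only the `> 0` and `>= expected` comparisons come from the diff.

```java
import java.util.Map;

class WaitConditionSketch {
    // Sum of committed offsets across all topic-partitions of the group.
    static long total(Map<String, Long> committedOffsets) {
        return committedOffsets.values().stream().mapToLong(Long::longValue).sum();
    }

    // Old check: any nonzero total is accepted.
    static boolean weakCondition(Map<String, Long> committedOffsets) {
        return total(committedOffsets) > 0;
    }

    // New check (non-exact translation case): the total must reach the expected count.
    static boolean strictCondition(Map<String, Long> committedOffsets, long expectedTotal) {
        return total(committedOffsets) >= expectedTotal;
    }

    public static void main(String[] args) {
        // Only test-topic-1 has a committed offset; test-topic-2 is still at 0.
        Map<String, Long> committed = Map.of("test-topic-1-0", 10L, "test-topic-2-0", 0L);
        System.out.println(weakCondition(committed));
        System.out.println(strictCondition(committed, 20L));
    }
}
```

With these sample values the weak condition is satisfied by the first topic alone, while the strict condition keeps waiting until the second topic's offsets are translated as well.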





[GitHub] [kafka] gharris1727 commented on a diff in pull request #13178: KAFKA-12468, KAFKA-12566: Fix MM2 causing negative downstream lag

Posted by "gharris1727 (via GitHub)" <gi...@apache.org>.
gharris1727 commented on code in PR #13178:
URL: https://github.com/apache/kafka/pull/13178#discussion_r1100622077


##########
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/OffsetSyncStore.java:
##########
@@ -56,8 +56,7 @@ OptionalLong translateDownstream(TopicPartition sourceTopicPartition, long upstr
                 // Offset is too far in the past to translate accurately
                 return OptionalLong.of(-1L);
             }
-            long upstreamStep = upstreamOffset - offsetSync.get().upstreamOffset();
-            return OptionalLong.of(offsetSync.get().downstreamOffset() + upstreamStep);
+            return OptionalLong.of(offsetSync.get().downstreamOffset() + (offsetSync.get().upstreamOffset() == upstreamOffset ? 0 : 1));

Review Comment:
   Let me know if this description makes sense to you, and if the diagram adds any value.
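
As a rough illustration of the one-line change above, the removed and added return expressions can be compared side by side. This is a sketch under assumptions: the method names and the sync values (upstream 100, downstream 200) are made up for the example, and the guard that returns -1 is assumed to trigger when the sync is ahead of the offset being translated, since the hunk does not show that condition.

```java
import java.util.OptionalLong;

class TranslationSketch {
    // Removed expression: carry the full upstream distance over to the downstream topic.
    static OptionalLong translateOld(long syncUpstream, long syncDownstream, long groupUpstream) {
        if (syncUpstream > groupUpstream) {
            return OptionalLong.of(-1L); // assumed guard: offset is too far in the past
        }
        return OptionalLong.of(syncDownstream + (groupUpstream - syncUpstream));
    }

    // Added expression: move at most one record past the synced downstream offset.
    static OptionalLong translateNew(long syncUpstream, long syncDownstream, long groupUpstream) {
        if (syncUpstream > groupUpstream) {
            return OptionalLong.of(-1L); // assumed guard: offset is too far in the past
        }
        return OptionalLong.of(syncDownstream + (syncUpstream == groupUpstream ? 0 : 1));
    }

    public static void main(String[] args) {
        // Sync says upstream 100 maps to downstream 200; the group has committed upstream 150.
        System.out.println(translateOld(100L, 200L, 150L).getAsLong()); // 250
        System.out.println(translateNew(100L, 200L, 150L).getAsLong()); // 201
    }
}
```

If the downstream topic holds fewer than 50 records after offset 200 (for example because the upstream range was compacted or contained transaction markers), the old result of 250 lands beyond the downstream log end, which is what shows up as negative lag. The new result never moves more than one record past a position that is known to exist downstream.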





[GitHub] [kafka] C0urante commented on pull request #13178: KAFKA-12468, KAFKA-13659, KAFKA-12566: Fix MM2 causing negative downstream lag

Posted by "C0urante (via GitHub)" <gi...@apache.org>.
C0urante commented on PR #13178:
URL: https://github.com/apache/kafka/pull/13178#issuecomment-1433313709

   Hmmm... we make an effort to periodically invoke `SourceTask::commit` in non-EOS mode, even if there have been no new records written and there are no offsets to commit. I think we should do the same in EOS mode, instead of adjusting MM2 to accommodate this unexpected change in behavior.




[GitHub] [kafka] gharris1727 commented on pull request #13178: KAFKA-12468, KAFKA-12566: Fix MM2 causing negative downstream lag

Posted by "gharris1727 (via GitHub)" <gi...@apache.org>.
gharris1727 commented on PR #13178:
URL: https://github.com/apache/kafka/pull/13178#issuecomment-1423176443

   > I notice that we don't do a read-to-end of the offset syncs topic in MirrorCheckpointTask before we begin syncing consumer group offsets, and we begin reading that topic from the beginning. This may cause us to sync offsets based on stale checkpoints if there are later checkpoints available in the topic that we haven't consumed yet. Do you think it might make sense to add a read-to-end for the offset syncs topic before we begin syncing consumer group offsets in the checkpoint connector?
   
   Ouch, yeah, that is certainly an issue that gets worse with my change.
   Before, the checkpoints would be non-monotonic for transactional/compacted topics; after, they're non-monotonic for everyone.
   I think addressing this in a follow-up is a smart idea; this change is already messy enough.




[GitHub] [kafka] C0urante commented on a diff in pull request #13178: KAFKA-12468, KAFKA-13659, KAFKA-12566: Fix MM2 causing negative downstream lag

Posted by "C0urante (via GitHub)" <gi...@apache.org>.
C0urante commented on code in PR #13178:
URL: https://github.com/apache/kafka/pull/13178#discussion_r1104643146


##########
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/OffsetSyncStore.java:
##########
@@ -16,40 +16,76 @@
  */
 package org.apache.kafka.connect.mirror;
 
+import org.apache.kafka.clients.admin.AdminClientConfig;
+import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.WakeupException;
 import org.apache.kafka.common.serialization.ByteArrayDeserializer;
+import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.connect.util.KafkaBasedLog;
+import org.apache.kafka.connect.util.TopicAdmin;
 
 import java.util.Map;
 import java.util.HashMap;
-import java.util.Collections;
 import java.time.Duration;
 import java.util.Optional;
 import java.util.OptionalLong;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 
 /** Used internally by MirrorMaker. Stores offset syncs and performs offset translation. */
 class OffsetSyncStore implements AutoCloseable {
-    private final KafkaConsumer<byte[], byte[]> consumer;
+    private final KafkaBasedLog<byte[], byte[]> backingStore;
     private final Map<TopicPartition, OffsetSync> offsetSyncs = new HashMap<>();
-    private final TopicPartition offsetSyncTopicPartition;
+    private final TopicAdmin admin;
 
     OffsetSyncStore(MirrorCheckpointConfig config) {
-        consumer = new KafkaConsumer<>(config.offsetSyncsTopicConsumerConfig(),
-            new ByteArrayDeserializer(), new ByteArrayDeserializer());
-        offsetSyncTopicPartition = new TopicPartition(config.offsetSyncsTopic(), 0);

Review Comment:
   This leads to a change in behavior since we'll end up consuming from all partitions in the offset syncs topic instead of just partition 0.
   
   We intentionally [write every offset sync to partition zero](https://github.com/apache/kafka/blob/8cfafba2794562840b0f1c537e304f084b9359cf/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceTask.java#L249) and [create the topic with a single partition](https://github.com/apache/kafka/blob/8cfafba2794562840b0f1c537e304f084b9359cf/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceConnector.java#L361), but the topic may have been created out-of-band and there may be other information in it which has not been produced by MM2 that we shouldn't consume.
   
   Could we expand the `KafkaBasedLog` API to support reading from a specific subset of the partitions for a topic, possibly by adding a `protected List<TopicPartition> assignedPartitions(List<PartitionInfo> partitionInfos)` method that can be overridden by subclasses? This would allow us to completely preserve the existing behavior.
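
The shape of such an overridable hook could look roughly like the following. This is a self-contained sketch, not the real `KafkaBasedLog` API: `LogReader`, `shouldRead` and `PartitionZeroOnly` are hypothetical names, and partitions are reduced to plain integers.

```java
import java.util.ArrayList;
import java.util.List;

class PartitionFilterSketch {
    // Hypothetical stand-in for a topic-backed log; not the real KafkaBasedLog.
    static class LogReader {
        // Hook for subclasses: return false to skip a partition. By default all partitions are read.
        protected boolean shouldRead(String topic, int partition) {
            return true;
        }

        // Applies the hook once per partition found in the topic.
        final List<Integer> assignedPartitions(String topic, int partitionCount) {
            List<Integer> assigned = new ArrayList<>();
            for (int partition = 0; partition < partitionCount; partition++) {
                if (shouldRead(topic, partition)) {
                    assigned.add(partition);
                }
            }
            return assigned;
        }
    }

    // Preserves the earlier behavior of consuming only partition 0 of the syncs topic.
    static class PartitionZeroOnly extends LogReader {
        @Override
        protected boolean shouldRead(String topic, int partition) {
            return partition == 0;
        }
    }

    public static void main(String[] args) {
        System.out.println(new LogReader().assignedPartitions("offset-syncs", 3));
        System.out.println(new PartitionZeroOnly().assignedPartitions("offset-syncs", 3));
    }
}
```

A per-partition predicate keeps the base class in charge of discovering partitions, so a subclass only has to state which ones it cares about.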





[GitHub] [kafka] C0urante commented on a diff in pull request #13178: KAFKA-12468, KAFKA-13659, KAFKA-12566: Fix MM2 causing negative downstream lag

Posted by "C0urante (via GitHub)" <gi...@apache.org>.
C0urante commented on code in PR #13178:
URL: https://github.com/apache/kafka/pull/13178#discussion_r1107345427


##########
connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java:
##########
@@ -392,6 +400,17 @@ protected Consumer<K, V> createConsumer() {
         return new KafkaConsumer<>(consumerConfigs);
     }
 
+    /**
+     * Test whether a topic partition should be read by this log.
+     * <p>Overridden by subclasses when only a subset of the assigned partitions should be read into memory.
+     * By default, this will read all partitions.

Review Comment:
   Some nits:
   
   ```suggestion
        * Signals whether a topic partition should be read by this log. Invoked on {@link #start() startup} once
        * for every partition found in the log's backing topic.
        * <p>This method can be overridden by subclasses when only a subset of the assigned partitions
        * should be read into memory. By default, all partitions are read.
   ```



##########
connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationBaseTest.java:
##########
@@ -701,43 +745,151 @@ protected void produceMessages(EmbeddedConnectCluster cluster, String topicName,
         int cnt = 0;
         for (int r = 0; r < NUM_RECORDS_PER_PARTITION; r++)
             for (int p = 0; p < numPartitions; p++)
-                cluster.kafka().produce(topicName, p, "key", "value-" + cnt++);
+                produce(cluster.kafka(), topicName, p, "key", "value-" + cnt++);
     }
-    
+
+    /**
+     * Produce a test record to a Kafka cluster.
+     * This method allows subclasses to configure and use their own Kafka Producer instead of using the built-in default.
+     * @param cluster   Kafka cluster that should receive the record
+     * @param topic     Topic to send the record to, non-null
+     * @param partition Partition to send the record to, maybe null.
+     * @param key       Kafka key for the record
+     * @param value     Kafka value for the record
+     */
+    protected void produce(EmbeddedKafkaCluster cluster, String topic, Integer partition, String key, String value) {
+        cluster.produce(topic, partition, key, value);
+    }
+
+    protected static Map<TopicPartition, OffsetAndMetadata> waitForCheckpointOnAllPartitions(
+            MirrorClient client, String consumerGroupName, String remoteClusterAlias, String topicName
+    ) throws InterruptedException {
+        AtomicReference<Map<TopicPartition, OffsetAndMetadata>> ret = new AtomicReference<>();
+        waitForCondition(
+                () -> {
+                    Map<TopicPartition, OffsetAndMetadata> offsets = client.remoteConsumerOffsets(
+                            consumerGroupName, remoteClusterAlias, Duration.ofMillis(3000));
+                    for (int i = 0; i < NUM_PARTITIONS; i++) {
+                        if (!offsets.containsKey(new TopicPartition(topicName, i))) {
+                            log.info("Checkpoint is missing for {}: {}-{}", consumerGroupName, topicName, i);
+                            return false;
+                        }
+                    }
+                    ret.set(offsets);
+                    return true;
+                },
+                CHECKPOINT_DURATION_MS,
+                String.format(
+                        "Offsets for consumer group %s not translated from %s for topic %s",
+                        consumerGroupName,
+                        remoteClusterAlias,
+                        topicName
+                )
+        );
+        return ret.get();
+    }
+
     /*
      * given consumer group, topics and expected number of records, make sure the consumer group
      * offsets are eventually synced to the expected offset numbers
      */
-    protected static <T> void waitForConsumerGroupOffsetSync(EmbeddedConnectCluster connect,
-            Consumer<T, T> consumer, List<String> topics, String consumerGroupId, int numRecords, boolean exactOffsetTranslation)
-            throws InterruptedException {
+    protected static <T> void waitForConsumerGroupFullSync(
+            EmbeddedConnectCluster connect, List<String> topics, String consumerGroupId, int numRecords, boolean exactOffsetTranslation
+    ) throws InterruptedException {
         try (Admin adminClient = connect.kafka().createAdminClient()) {
-            List<TopicPartition> tps = new ArrayList<>(NUM_PARTITIONS * topics.size());
+            Map<TopicPartition, OffsetSpec> tps = new HashMap<>(NUM_PARTITIONS * topics.size());
             for (int partitionIndex = 0; partitionIndex < NUM_PARTITIONS; partitionIndex++) {
                 for (String topic : topics) {
-                    tps.add(new TopicPartition(topic, partitionIndex));
+                    tps.put(new TopicPartition(topic, partitionIndex), OffsetSpec.latest());
                 }
             }
             long expectedTotalOffsets = numRecords * topics.size();
 
             waitForCondition(() -> {
                 Map<TopicPartition, OffsetAndMetadata> consumerGroupOffsets =
                     adminClient.listConsumerGroupOffsets(consumerGroupId).partitionsToOffsetAndMetadata().get();
-                long consumerGroupOffsetTotal = consumerGroupOffsets.values().stream()
+                long totalConsumerGroupOffsets = consumerGroupOffsets.values().stream()
                     .mapToLong(OffsetAndMetadata::offset).sum();
 
-                Map<TopicPartition, Long> offsets = consumer.endOffsets(tps, CONSUMER_POLL_TIMEOUT_MS);
-                long totalOffsets = offsets.values().stream().mapToLong(l -> l).sum();
-
+                Map<TopicPartition, ListOffsetsResult.ListOffsetsResultInfo> endOffsets =
+                        adminClient.listOffsets(tps).all().get();
+                long totalEndOffsets = endOffsets.values().stream()
+                        .mapToLong(ListOffsetsResult.ListOffsetsResultInfo::offset).sum();
+
+                for (TopicPartition tp : endOffsets.keySet()) {
+                    if (consumerGroupOffsets.containsKey(tp)) {
+                        assertTrue(consumerGroupOffsets.get(tp).offset() <= endOffsets.get(tp).offset(),
+                                "Consumer group committed downstream offsets beyond the log end, this would lead to negative lag metrics"
+                        );
+                    }
+                }
                 boolean totalOffsetsMatch = exactOffsetTranslation
-                        ? totalOffsets == expectedTotalOffsets
-                        : totalOffsets >= expectedTotalOffsets;
+                        ? totalEndOffsets == expectedTotalOffsets
+                        : totalEndOffsets >= expectedTotalOffsets;
+
+                boolean consumerGroupOffsetsMatch = exactOffsetTranslation
+                        ? totalConsumerGroupOffsets == expectedTotalOffsets
+                        : totalConsumerGroupOffsets >= expectedTotalOffsets;
                 // make sure the consumer group offsets are synced to expected number
-                return totalOffsetsMatch && consumerGroupOffsetTotal > 0;
+                return totalOffsetsMatch && consumerGroupOffsetsMatch;
             }, OFFSET_SYNC_DURATION_MS, "Consumer group offset sync is not complete in time");
         }
     }
 
+    protected static void insertDummyOffsetSyncs(EmbeddedConnectCluster cluster, String offsetSyncsTopic, String topic, int partitions) throws InterruptedException {
+        waitForTopicCreated(cluster, offsetSyncsTopic);
+        // Insert a large number of checkpoint records into the offset syncs topic to simulate
+        // a long-lived MM2 instance that has replicated many offsets in the past.
+        List<ProducerRecord<byte[], byte[]>> records = new ArrayList<>();
+        for (int partition = 0; partition < partitions; partition++) {
+            TopicPartition tp = new TopicPartition(topic, partition);
+            OffsetSync sync = new OffsetSync(tp, 0L, 0L);
+            records.add(new ProducerRecord<>(offsetSyncsTopic, sync.recordKey(), sync.recordValue()));
+        }
+        Map<String, Object> producerProps = new HashMap<>();
+        int sentRecords = 0;
+        try (KafkaProducer<byte[], byte[]> producer = cluster.kafka().createProducer(producerProps)) {
+            // Try to ensure that the contents of the offset syncs topic is too large to read before the checkpoint
+            // interval passes, so that the first checkpoint would take place before reading the whole contents of the
+            // sync topic. Experimentally, this test passes with <2x, and fails with >5x, without a fix for KAFKA-13659.
+            double consumeEfficiency = 10;
+            long deadline = System.currentTimeMillis() + (long) (consumeEfficiency * CHECKPOINT_INTERVAL_DURATION_MS);
+            int nRecords = records.size();
+            while (System.currentTimeMillis() < deadline) {
+                producer.send(records.get(sentRecords % nRecords));
+                sentRecords++;
+            }
+            producer.flush();
+        }
+        log.info("Sent {} dummy records to {}", sentRecords, offsetSyncsTopic);
+    }
+
+    protected static void assertMonotonicCheckpoints(EmbeddedConnectCluster cluster, String checkpointTopic) {

Review Comment:
   I really like this method. Nice 👍
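
The body of `assertMonotonicCheckpoints` is not shown in this hunk, so the following is only a guess at the core property such a check would enforce: for a given consumer group and partition, each emitted downstream offset should be no smaller than the previous one. `isMonotonic` and the sample offsets are hypothetical.

```java
import java.util.List;

class MonotonicSketch {
    // Returns true when no checkpointed downstream offset is lower than the one before it.
    static boolean isMonotonic(List<Long> downstreamOffsets) {
        for (int i = 1; i < downstreamOffsets.size(); i++) {
            if (downstreamOffsets.get(i) < downstreamOffsets.get(i - 1)) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        System.out.println(isMonotonic(List.of(1L, 201L, 201L)));
        System.out.println(isMonotonic(List.of(201L, 1L, 201L)));
    }
}
```

The second sequence is the kind of regression that could appear if a task translates from a stale offset sync before it has read the syncs topic to the end.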



##########
connect/mirror/src/test/java/org/apache/kafka/connect/mirror/OffsetSyncStoreTest.java:
##########
@@ -30,7 +30,7 @@ public class OffsetSyncStoreTest {
     static class FakeOffsetSyncStore extends OffsetSyncStore {
 
         FakeOffsetSyncStore() {
-            super(null, null);
+            super();

Review Comment:
   A few comments:
   
   1. Some cases in this test suite and the `MirrorCheckpointTaskTest` suite are failing because the store isn't started. We might upgrade the visibility of the `OffsetSyncStore.readToEnd` field to `protected`, and expose a method in this class to set the value for that field.
   2. Can we get some additional coverage in either/both of the above-mentioned test suites to make sure that a store that hasn't been started yet, or that takes a while to start, results in the expected lack of checkpoints (or translated downstream offsets)?
   3. The assertion messages for this test case need to be updated to match the values they're referring to (or, if you prefer, we can just remove specific values from the messages altogether since they can be easily deduced from the source code and line number if an assertion fails)
   4. For the `testOffsetTranslation` case, we might consider performing comparisons between two `Optional` instances instead of unconditionally invoking `getAsLong` on the translated downstream offset. For example, `assertEquals(OptionalLong.of(201L), store.translateDownstream(tp, 150), "Failed to translate downstream offsets");`
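
Point 4 can be illustrated without any test framework. Comparing two `OptionalLong` values reports a mismatch when the translation is empty, whereas calling `getAsLong` on an empty value throws before any comparison happens. The helper names below are hypothetical; only the `OptionalLong` behavior is standard library.

```java
import java.util.NoSuchElementException;
import java.util.OptionalLong;

class OptionalCompareSketch {
    // Whole-value comparison: an empty result simply compares as not equal.
    static boolean matches(OptionalLong expected, OptionalLong actual) {
        return expected.equals(actual);
    }

    // Unconditional unwrapping: an empty result throws instead of failing the comparison.
    static boolean unwrapThrows(OptionalLong actual) {
        try {
            actual.getAsLong();
            return false;
        } catch (NoSuchElementException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println(matches(OptionalLong.of(201L), OptionalLong.of(201L)));
        System.out.println(matches(OptionalLong.of(201L), OptionalLong.empty()));
        System.out.println(unwrapThrows(OptionalLong.empty()));
    }
}
```

In a test, the comparison form yields an assertion failure showing the expected and actual values, rather than an exception unrelated to the assertion message.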





[GitHub] [kafka] C0urante commented on a diff in pull request #13178: KAFKA-12468, KAFKA-13659, KAFKA-12566: Fix MM2 causing negative downstream lag

Posted by "C0urante (via GitHub)" <gi...@apache.org>.
C0urante commented on code in PR #13178:
URL: https://github.com/apache/kafka/pull/13178#discussion_r1106023715


##########
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointTask.java:
##########
@@ -134,9 +138,9 @@ public String version() {
     @Override
     public List<SourceRecord> poll() throws InterruptedException {
         try {
-            long deadline = System.currentTimeMillis() + interval.toMillis();
-            while (!stopping && System.currentTimeMillis() < deadline) {
-                offsetSyncStore.update(pollTimeout);
+            if (stopping.await(pollTimeout.toMillis(), TimeUnit.MILLISECONDS)) {

Review Comment:
   Also, we changed the contract for `SourceTask::stop` a while ago to not be invoked on a separate thread from the one that invokes `SourceTask::poll` in order to fix KAFKA-10792, so I'm not sure how much value changing this to a `CountDownLatch` really adds here.
   
   Unless there's something else I'm missing here that necessitates this change, I think it's fine to leave `stopping` as a boolean field.
   
   As a follow-up, we could potentially tweak the logic here to pause for, e.g., a second and then return a `null` record batch from `SourceTask::poll` if the poll timeout hasn't elapsed since the last time we returned a non-null batch. But that should not happen in this PR; we're doing plenty here already.



##########
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/OffsetSyncStore.java:
##########
@@ -75,18 +123,9 @@ OptionalLong translateDownstream(TopicPartition sourceTopicPartition, long upstr
         }
     }
 
-    // poll and handle records
-    synchronized void update(Duration pollTimeout) {
-        try {
-            consumer.poll(pollTimeout).forEach(this::handleRecord);
-        } catch (WakeupException e) {
-            // swallow
-        }
-    }
-
-    public synchronized void close() {
-        consumer.wakeup();
-        Utils.closeQuietly(consumer, "offset sync store consumer");
+    public void close() {

Review Comment:
   Nit:
   ```suggestion
       @Override
       public void close() {
   ```



##########
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointTask.java:
##########
@@ -134,9 +138,9 @@ public String version() {
     @Override
     public List<SourceRecord> poll() throws InterruptedException {
         try {
-            long deadline = System.currentTimeMillis() + interval.toMillis();
-            while (!stopping && System.currentTimeMillis() < deadline) {
-                offsetSyncStore.update(pollTimeout);
+            if (stopping.await(pollTimeout.toMillis(), TimeUnit.MILLISECONDS)) {

Review Comment:
   Should this condition be inverted?
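
For reference when reading the condition in question: `CountDownLatch.await(timeout, unit)` returns `true` when the count reached zero before the timeout and `false` when the timeout elapsed first. The sketch below only demonstrates that return value; the body of the `if` is not visible in the hunk, so whether the condition should be inverted depends on code not shown here. `awaitResult` and the 10 ms timeout are illustrative.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

class LatchAwaitSketch {
    // Reports what await returns for a latch that has or has not been counted down.
    static boolean awaitResult(boolean stopRequested) {
        CountDownLatch stopping = new CountDownLatch(1);
        if (stopRequested) {
            stopping.countDown(); // simulates stop() having been called
        }
        try {
            return stopping.await(10, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        }
    }

    public static void main(String[] args) {
        System.out.println(awaitResult(true));  // true: stop was requested
        System.out.println(awaitResult(false)); // false: timeout elapsed without a stop
    }
}
```

So a `true` result means the task is stopping, and a `false` result means the poll timeout passed normally.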



##########
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/OffsetSyncStore.java:
##########
@@ -16,40 +16,88 @@
  */
 package org.apache.kafka.connect.mirror;
 
-import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.admin.AdminClientConfig;
+import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.producer.Producer;
 import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.common.errors.WakeupException;
-import org.apache.kafka.common.serialization.ByteArrayDeserializer;
+import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.connect.util.KafkaBasedLog;
+import org.apache.kafka.connect.util.TopicAdmin;
 
-import java.util.Map;
-import java.util.HashMap;
 import java.util.Collections;
-import java.time.Duration;
+import java.util.Map;
 import java.util.Optional;
 import java.util.OptionalLong;
+import java.util.concurrent.ConcurrentHashMap;
 
 /** Used internally by MirrorMaker. Stores offset syncs and performs offset translation. */
 class OffsetSyncStore implements AutoCloseable {
-    private final KafkaConsumer<byte[], byte[]> consumer;
-    private final Map<TopicPartition, OffsetSync> offsetSyncs = new HashMap<>();
-    private final TopicPartition offsetSyncTopicPartition;
+    private final KafkaBasedLog<byte[], byte[]> backingStore;
+    private final Map<TopicPartition, OffsetSync> offsetSyncs = new ConcurrentHashMap<>();
+    private final TopicAdmin admin;
+    private volatile boolean readToEnd = false;
 
     OffsetSyncStore(MirrorCheckpointConfig config) {
-        consumer = new KafkaConsumer<>(config.offsetSyncsTopicConsumerConfig(),
-            new ByteArrayDeserializer(), new ByteArrayDeserializer());
-        offsetSyncTopicPartition = new TopicPartition(config.offsetSyncsTopic(), 0);
-        consumer.assign(Collections.singleton(offsetSyncTopicPartition));
+        Consumer<byte[], byte[]> consumer = null;
+        TopicAdmin admin = null;
+        KafkaBasedLog<byte[], byte[]> store;
+        try {
+            Consumer<byte[], byte[]> finalConsumer = consumer = MirrorUtils.newConsumer(config.offsetSyncsTopicConsumerConfig());

Review Comment:
   This is fairly unclean but I can't think of a significantly better alternative... guess it's good enough for now.



##########
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/OffsetSyncStore.java:
##########
@@ -16,40 +16,88 @@
  */
 package org.apache.kafka.connect.mirror;
 
-import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.admin.AdminClientConfig;
+import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.producer.Producer;
 import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.common.errors.WakeupException;
-import org.apache.kafka.common.serialization.ByteArrayDeserializer;
+import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.connect.util.KafkaBasedLog;
+import org.apache.kafka.connect.util.TopicAdmin;
 
-import java.util.Map;
-import java.util.HashMap;
 import java.util.Collections;
-import java.time.Duration;
+import java.util.Map;
 import java.util.Optional;
 import java.util.OptionalLong;
+import java.util.concurrent.ConcurrentHashMap;
 
 /** Used internally by MirrorMaker. Stores offset syncs and performs offset translation. */
 class OffsetSyncStore implements AutoCloseable {
-    private final KafkaConsumer<byte[], byte[]> consumer;
-    private final Map<TopicPartition, OffsetSync> offsetSyncs = new HashMap<>();
-    private final TopicPartition offsetSyncTopicPartition;
+    private final KafkaBasedLog<byte[], byte[]> backingStore;
+    private final Map<TopicPartition, OffsetSync> offsetSyncs = new ConcurrentHashMap<>();
+    private final TopicAdmin admin;
+    private volatile boolean readToEnd = false;
 
     OffsetSyncStore(MirrorCheckpointConfig config) {
-        consumer = new KafkaConsumer<>(config.offsetSyncsTopicConsumerConfig(),
-            new ByteArrayDeserializer(), new ByteArrayDeserializer());
-        offsetSyncTopicPartition = new TopicPartition(config.offsetSyncsTopic(), 0);
-        consumer.assign(Collections.singleton(offsetSyncTopicPartition));
+        Consumer<byte[], byte[]> consumer = null;
+        TopicAdmin admin = null;
+        KafkaBasedLog<byte[], byte[]> store;
+        try {
+            Consumer<byte[], byte[]> finalConsumer = consumer = MirrorUtils.newConsumer(config.offsetSyncsTopicConsumerConfig());
+            TopicAdmin finalAdmin = admin = new TopicAdmin(
+                    config.offsetSyncsTopicAdminConfig().get(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG),
+                    config.forwardingAdmin(config.offsetSyncsTopicAdminConfig()));
+            store = new KafkaBasedLog<byte[], byte[]>(
+                    config.offsetSyncsTopic(),
+                    Collections.emptyMap(),
+                    Collections.emptyMap(),
+                    () -> finalAdmin,
+                    (error, record) -> this.handleRecord(record),
+                    Time.SYSTEM,
+                    ignored -> {
+                    }) {
+
+                @Override
+                protected Producer<byte[], byte[]> createProducer() {
+                    return null;
+                }
+
+                @Override
+                protected Consumer<byte[], byte[]> createConsumer() {
+                    return finalConsumer;
+                }
+
+                @Override
+                protected boolean readPartition(TopicPartition topicPartition) {
+                    return topicPartition.partition() == 0;
+                }
+            };
+        } catch (Throwable t) {
+            Utils.closeQuietly(consumer, "consumer for offset syncs");
+            Utils.closeQuietly(admin, "admin client for offset syncs");
+            throw t;
+        }
+        this.admin = admin;
+        this.backingStore = store;
     }
 
-    // for testing
-    OffsetSyncStore(KafkaConsumer<byte[], byte[]> consumer, TopicPartition offsetSyncTopicPartition) {
-        this.consumer = consumer;
-        this.offsetSyncTopicPartition = offsetSyncTopicPartition;
+    OffsetSyncStore() {
+        this.admin = null;
+        this.backingStore = null;
+    }
+
+    public void start() {

Review Comment:
   Can we add a Javadoc here? At a minimum, we should make it clear that this performs a synchronous read to the end of the topic.



##########
connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java:
##########
@@ -392,6 +401,10 @@ protected Consumer<K, V> createConsumer() {
         return new KafkaConsumer<>(consumerConfigs);
     }
 
+    protected boolean readPartition(TopicPartition topicPartition) {

Review Comment:
   Nit: add a Javadoc explaining when this method will be used, what it will be used for, and that it's designed to be overridden by subclasses?
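   
   As a rough illustration of the kind of Javadoc being requested, here is a minimal sketch. `PartitionedLogSketch` and its integer partition indexes are hypothetical stand-ins, not the real `KafkaBasedLog` API; only the shape of the overridable filter hook and its documentation is the point.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a log reader; this is NOT the real KafkaBasedLog API.
class PartitionedLogSketch {
    private final int partitionCount;

    PartitionedLogSketch(int partitionCount) {
        this.partitionCount = partitionCount;
    }

    /**
     * Decide whether a partition should be read by this log.
     * Called once per partition while the set of partitions to read is being built.
     * The default accepts every partition; subclasses may override this to restrict reads.
     *
     * @param partition index of the partition under consideration
     * @return true if the partition should be read, false to skip it
     */
    protected boolean readPartition(int partition) {
        return true;
    }

    /** Collect the partitions accepted by the filter, failing if none match. */
    List<Integer> assignedPartitions() {
        List<Integer> partitions = new ArrayList<>();
        for (int p = 0; p < partitionCount; p++) {
            if (readPartition(p)) {
                partitions.add(p);
            }
        }
        if (partitions.isEmpty()) {
            throw new IllegalStateException("No partitions matched the required filter");
        }
        return partitions;
    }

    public static void main(String[] args) {
        // Anonymous subclass mirroring the override in the diff: only partition 0 is read.
        PartitionedLogSketch onlyFirst = new PartitionedLogSketch(3) {
            @Override
            protected boolean readPartition(int partition) {
                return partition == 0;
            }
        };
        System.out.println(new PartitionedLogSketch(3).assignedPartitions()); // [0, 1, 2]
        System.out.println(onlyFirst.assignedPartitions());                   // [0]
    }
}
```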



##########
connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java:
##########
@@ -257,8 +257,17 @@ public void start() {
                     " allotted period. This could indicate a connectivity issue, unavailable topic partitions, or if" +
                     " this is your first use of the topic it may have taken too long to create.");
 
-        for (PartitionInfo partition : partitionInfos)
-            partitions.add(new TopicPartition(partition.topic(), partition.partition()));
+        for (PartitionInfo partition : partitionInfos) {
+            TopicPartition topicPartition = new TopicPartition(partition.topic(), partition.partition());
+            if (readPartition(topicPartition)) {
+                partitions.add(topicPartition);
+            }
+        }
+        if (partitions.isEmpty()) {
+            throw new ConnectException("Some partitions for " + topic + " exist, but no partitions matched the " +
+                    "required filter. This could indicate a connectivity issue, unavailable topic partitions, or if" +
+                    " this is your first use of the topic it may have taken too long to create.");

Review Comment:
   What realistic circumstances would lead to this and also involve connectivity issues, unavailable topic partitions, or a lag in topic metadata propagation after the topic has been created?
   
   IMO it's fine to keep just the first sentence here.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] gharris1727 commented on a diff in pull request #13178: KAFKA-12468, KAFKA-13659, KAFKA-12566: Fix MM2 causing negative downstream lag

Posted by "gharris1727 (via GitHub)" <gi...@apache.org>.
gharris1727 commented on code in PR #13178:
URL: https://github.com/apache/kafka/pull/13178#discussion_r1106151656


##########
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/OffsetSyncStore.java:
##########
@@ -16,40 +16,88 @@
  */
 package org.apache.kafka.connect.mirror;
 
-import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.admin.AdminClientConfig;
+import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.producer.Producer;
 import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.common.errors.WakeupException;
-import org.apache.kafka.common.serialization.ByteArrayDeserializer;
+import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.connect.util.KafkaBasedLog;
+import org.apache.kafka.connect.util.TopicAdmin;
 
-import java.util.Map;
-import java.util.HashMap;
 import java.util.Collections;
-import java.time.Duration;
+import java.util.Map;
 import java.util.Optional;
 import java.util.OptionalLong;
+import java.util.concurrent.ConcurrentHashMap;
 
 /** Used internally by MirrorMaker. Stores offset syncs and performs offset translation. */
 class OffsetSyncStore implements AutoCloseable {
-    private final KafkaConsumer<byte[], byte[]> consumer;
-    private final Map<TopicPartition, OffsetSync> offsetSyncs = new HashMap<>();
-    private final TopicPartition offsetSyncTopicPartition;
+    private final KafkaBasedLog<byte[], byte[]> backingStore;
+    private final Map<TopicPartition, OffsetSync> offsetSyncs = new ConcurrentHashMap<>();
+    private final TopicAdmin admin;
+    private volatile boolean readToEnd = false;
 
     OffsetSyncStore(MirrorCheckpointConfig config) {
-        consumer = new KafkaConsumer<>(config.offsetSyncsTopicConsumerConfig(),
-            new ByteArrayDeserializer(), new ByteArrayDeserializer());
-        offsetSyncTopicPartition = new TopicPartition(config.offsetSyncsTopic(), 0);
-        consumer.assign(Collections.singleton(offsetSyncTopicPartition));
+        Consumer<byte[], byte[]> consumer = null;
+        TopicAdmin admin = null;
+        KafkaBasedLog<byte[], byte[]> store;
+        try {
+            Consumer<byte[], byte[]> finalConsumer = consumer = MirrorUtils.newConsumer(config.offsetSyncsTopicConsumerConfig());

Review Comment:
   I pulled this out to a private method which uses its arguments as final variables. It looks a little bit less silly.





[GitHub] [kafka] C0urante merged pull request #13178: KAFKA-12468, KAFKA-13659, KAFKA-12566: Fix MM2 causing negative downstream lag

Posted by "C0urante (via GitHub)" <gi...@apache.org>.
C0urante merged PR #13178:
URL: https://github.com/apache/kafka/pull/13178




[GitHub] [kafka] gharris1727 commented on pull request #13178: KAFKA-12468, KAFKA-14663, KAFKA-12566: Fix MM2 causing negative downstream lag

Posted by "gharris1727 (via GitHub)" <gi...@apache.org>.
gharris1727 commented on PR #13178:
URL: https://github.com/apache/kafka/pull/13178#issuecomment-1411301405

   After #13181 is merged, I'll rebase and remove my fairness patch.




[GitHub] [kafka] gharris1727 commented on a diff in pull request #13178: KAFKA-12468, KAFKA-13659, KAFKA-12566: Fix MM2 causing negative downstream lag

Posted by "gharris1727 (via GitHub)" <gi...@apache.org>.
gharris1727 commented on code in PR #13178:
URL: https://github.com/apache/kafka/pull/13178#discussion_r1106151042


##########
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointTask.java:
##########
@@ -134,9 +138,9 @@ public String version() {
     @Override
     public List<SourceRecord> poll() throws InterruptedException {
         try {
-            long deadline = System.currentTimeMillis() + interval.toMillis();
-            while (!stopping && System.currentTimeMillis() < deadline) {
-                offsetSyncStore.update(pollTimeout);
+            if (stopping.await(pollTimeout.toMillis(), TimeUnit.MILLISECONDS)) {

Review Comment:
   The condition is correct, but I agree that changing this doesn't help very much. I was mostly doing this to avoid the "busy loop" warning, but I think that isn't really worth it, especially if we can have a follow-up that actually improves the waiting behavior and eliminates that warning. Reverted.
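   
   For anyone revisiting the reverted change, the sketch below shows what the latch condition means, assuming `stopping` is a `java.util.concurrent.CountDownLatch` with a count of one. `StoppingLatchSketch`, `stop()` and `waitForStop()` are hypothetical names for illustration, not code from this PR.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical task skeleton; only java.util.concurrent is used, no Kafka classes.
final class StoppingLatchSketch {
    private final CountDownLatch stopping = new CountDownLatch(1);

    /** Signal the task to stop; any thread blocked in waitForStop wakes up. */
    void stop() {
        stopping.countDown();
    }

    /**
     * Block for up to timeoutMs without spinning.
     *
     * @return true if stop() was called before the timeout elapsed, false on timeout
     */
    boolean waitForStop(long timeoutMs) {
        try {
            return stopping.await(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt for the caller
            return true;                        // treat interruption as a request to stop
        }
    }

    public static void main(String[] args) {
        StoppingLatchSketch task = new StoppingLatchSketch();
        System.out.println(task.waitForStop(50)); // false: latch not released, wait timed out
        task.stop();
        System.out.println(task.waitForStop(50)); // true: latch already released
    }
}
```

   A `true` result therefore means "stop was requested", which is why the `if (stopping.await(...))` form in the hunk is the correct way round.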





[GitHub] [kafka] gharris1727 commented on a diff in pull request #13178: KAFKA-12468, KAFKA-13659, KAFKA-12566: Fix MM2 causing negative downstream lag

Posted by "gharris1727 (via GitHub)" <gi...@apache.org>.
gharris1727 commented on code in PR #13178:
URL: https://github.com/apache/kafka/pull/13178#discussion_r1107628693


##########
connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationBaseTest.java:
##########
@@ -701,43 +745,151 @@ protected void produceMessages(EmbeddedConnectCluster cluster, String topicName,
         int cnt = 0;
         for (int r = 0; r < NUM_RECORDS_PER_PARTITION; r++)
             for (int p = 0; p < numPartitions; p++)
-                cluster.kafka().produce(topicName, p, "key", "value-" + cnt++);
+                produce(cluster.kafka(), topicName, p, "key", "value-" + cnt++);
     }
-    
+
+    /**
+     * Produce a test record to a Kafka cluster.
+     * This method allows subclasses to configure and use their own Kafka Producer instead of using the built-in default.
+     * @param cluster   Kafka cluster that should receive the record
+     * @param topic     Topic to send the record to, non-null
+     * @param partition Partition to send the record to, maybe null.
+     * @param key       Kafka key for the record
+     * @param value     Kafka value for the record
+     */
+    protected void produce(EmbeddedKafkaCluster cluster, String topic, Integer partition, String key, String value) {
+        cluster.produce(topic, partition, key, value);
+    }
+
+    protected static Map<TopicPartition, OffsetAndMetadata> waitForCheckpointOnAllPartitions(
+            MirrorClient client, String consumerGroupName, String remoteClusterAlias, String topicName
+    ) throws InterruptedException {
+        AtomicReference<Map<TopicPartition, OffsetAndMetadata>> ret = new AtomicReference<>();
+        waitForCondition(
+                () -> {
+                    Map<TopicPartition, OffsetAndMetadata> offsets = client.remoteConsumerOffsets(
+                            consumerGroupName, remoteClusterAlias, Duration.ofMillis(3000));
+                    for (int i = 0; i < NUM_PARTITIONS; i++) {
+                        if (!offsets.containsKey(new TopicPartition(topicName, i))) {
+                            log.info("Checkpoint is missing for {}: {}-{}", consumerGroupName, topicName, i);
+                            return false;
+                        }
+                    }
+                    ret.set(offsets);
+                    return true;
+                },
+                CHECKPOINT_DURATION_MS,
+                String.format(
+                        "Offsets for consumer group %s not translated from %s for topic %s",
+                        consumerGroupName,
+                        remoteClusterAlias,
+                        topicName
+                )
+        );
+        return ret.get();
+    }
+
     /*
      * given consumer group, topics and expected number of records, make sure the consumer group
      * offsets are eventually synced to the expected offset numbers
      */
-    protected static <T> void waitForConsumerGroupOffsetSync(EmbeddedConnectCluster connect,
-            Consumer<T, T> consumer, List<String> topics, String consumerGroupId, int numRecords, boolean exactOffsetTranslation)
-            throws InterruptedException {
+    protected static <T> void waitForConsumerGroupFullSync(
+            EmbeddedConnectCluster connect, List<String> topics, String consumerGroupId, int numRecords, boolean exactOffsetTranslation
+    ) throws InterruptedException {
         try (Admin adminClient = connect.kafka().createAdminClient()) {
-            List<TopicPartition> tps = new ArrayList<>(NUM_PARTITIONS * topics.size());
+            Map<TopicPartition, OffsetSpec> tps = new HashMap<>(NUM_PARTITIONS * topics.size());
             for (int partitionIndex = 0; partitionIndex < NUM_PARTITIONS; partitionIndex++) {
                 for (String topic : topics) {
-                    tps.add(new TopicPartition(topic, partitionIndex));
+                    tps.put(new TopicPartition(topic, partitionIndex), OffsetSpec.latest());
                 }
             }
             long expectedTotalOffsets = numRecords * topics.size();
 
             waitForCondition(() -> {
                 Map<TopicPartition, OffsetAndMetadata> consumerGroupOffsets =
                     adminClient.listConsumerGroupOffsets(consumerGroupId).partitionsToOffsetAndMetadata().get();
-                long consumerGroupOffsetTotal = consumerGroupOffsets.values().stream()
+                long totalConsumerGroupOffsets = consumerGroupOffsets.values().stream()
                     .mapToLong(OffsetAndMetadata::offset).sum();
 
-                Map<TopicPartition, Long> offsets = consumer.endOffsets(tps, CONSUMER_POLL_TIMEOUT_MS);
-                long totalOffsets = offsets.values().stream().mapToLong(l -> l).sum();
-
+                Map<TopicPartition, ListOffsetsResult.ListOffsetsResultInfo> endOffsets =
+                        adminClient.listOffsets(tps).all().get();
+                long totalEndOffsets = endOffsets.values().stream()
+                        .mapToLong(ListOffsetsResult.ListOffsetsResultInfo::offset).sum();
+
+                for (TopicPartition tp : endOffsets.keySet()) {
+                    if (consumerGroupOffsets.containsKey(tp)) {
+                        assertTrue(consumerGroupOffsets.get(tp).offset() <= endOffsets.get(tp).offset(),
+                                "Consumer group committed downstream offsets beyond the log end, this would lead to negative lag metrics"
+                        );
+                    }
+                }
                 boolean totalOffsetsMatch = exactOffsetTranslation
-                        ? totalOffsets == expectedTotalOffsets
-                        : totalOffsets >= expectedTotalOffsets;
+                        ? totalEndOffsets == expectedTotalOffsets
+                        : totalEndOffsets >= expectedTotalOffsets;
+
+                boolean consumerGroupOffsetsMatch = exactOffsetTranslation
+                        ? totalConsumerGroupOffsets == expectedTotalOffsets
+                        : totalConsumerGroupOffsets >= expectedTotalOffsets;
                 // make sure the consumer group offsets are synced to expected number
-                return totalOffsetsMatch && consumerGroupOffsetTotal > 0;
+                return totalOffsetsMatch && consumerGroupOffsetsMatch;
             }, OFFSET_SYNC_DURATION_MS, "Consumer group offset sync is not complete in time");
         }
     }
 
+    protected static void insertDummyOffsetSyncs(EmbeddedConnectCluster cluster, String offsetSyncsTopic, String topic, int partitions) throws InterruptedException {
+        waitForTopicCreated(cluster, offsetSyncsTopic);
+        // Insert a large number of checkpoint records into the offset syncs topic to simulate
+        // a long-lived MM2 instance that has replicated many offsets in the past.
+        List<ProducerRecord<byte[], byte[]>> records = new ArrayList<>();
+        for (int partition = 0; partition < partitions; partition++) {
+            TopicPartition tp = new TopicPartition(topic, partition);
+            OffsetSync sync = new OffsetSync(tp, 0L, 0L);
+            records.add(new ProducerRecord<>(offsetSyncsTopic, sync.recordKey(), sync.recordValue()));
+        }
+        Map<String, Object> producerProps = new HashMap<>();
+        int sentRecords = 0;
+        try (KafkaProducer<byte[], byte[]> producer = cluster.kafka().createProducer(producerProps)) {
+            // Try to ensure that the contents of the offset syncs topic is too large to read before the checkpoint
+            // interval passes, so that the first checkpoint would take place before reading the whole contents of the
+            // sync topic. Experimentally, this test passes with <2x, and fails with >5x, without a fix for KAFKA-13659.
+            double consumeEfficiency = 10;
+            long deadline = System.currentTimeMillis() + (long) (consumeEfficiency * CHECKPOINT_INTERVAL_DURATION_MS);
+            int nRecords = records.size();
+            while (System.currentTimeMillis() < deadline) {
+                producer.send(records.get(sentRecords % nRecords));
+                sentRecords++;
+            }
+            producer.flush();
+        }
+        log.info("Sent {} dummy records to {}", sentRecords, offsetSyncsTopic);
+    }

Review Comment:
   I'll remove insertDummyOffsetSyncs, but leave the testRestartReplication, as it is still testing a new situation: restarting the connectors mid-replication.
   It won't target this specific bug without the `insertDummyOffsetSyncs`, but could still add value.
   
   I've added unit tests for OffsetSyncStore that address the bug (translating before start finishes) that should hopefully compensate for this test's smaller scope.
   
   I looked into testing MirrorCheckpointTask, and it looks too involved to address in this PR. I think it would be more appropriate to follow up later with some refactoring & testing for the checkpoint task.
   I feel confident from the results of the insertDummyOffsetSyncs test that:
   1. We are addressing the situation where the offset syncs topic is very large
   2. We are not introducing a severe performance regression, as the consumeEfficiency is very high.





[GitHub] [kafka] C0urante commented on pull request #13178: KAFKA-12468, KAFKA-13659, KAFKA-12566: Fix MM2 causing negative downstream lag

Posted by "C0urante (via GitHub)" <gi...@apache.org>.
C0urante commented on PR #13178:
URL: https://github.com/apache/kafka/pull/13178#issuecomment-1432402907

   Hmmm... there appear to be some integration test failures. I've reproduced some of them locally too, which makes flakiness an unlikely cause. Can you look into the integration test failures and see if we can get a green run before merging this?




[GitHub] [kafka] gharris1727 commented on a diff in pull request #13178: KAFKA-12468, KAFKA-13659, KAFKA-12566: Fix MM2 causing negative downstream lag

Posted by "gharris1727 (via GitHub)" <gi...@apache.org>.
gharris1727 commented on code in PR #13178:
URL: https://github.com/apache/kafka/pull/13178#discussion_r1105015802


##########
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/OffsetSyncStore.java:
##########
@@ -76,26 +112,26 @@ OptionalLong translateDownstream(TopicPartition sourceTopicPartition, long upstr
     }
 
     // poll and handle records
-    synchronized void update(Duration pollTimeout) {
+    synchronized void update(Duration pollTimeout) throws TimeoutException {
         try {
-            consumer.poll(pollTimeout).forEach(this::handleRecord);
-        } catch (WakeupException e) {
+            backingStore.readToEnd().get(pollTimeout.toMillis(), TimeUnit.MILLISECONDS);
+        } catch (WakeupException | InterruptedException | ExecutionException e) {
             // swallow

Review Comment:
   This is much cleaner, as it puts the responsibility for balking on the OffsetSyncStore, rather than sharing the responsibility with the CheckpointTask. Thanks for the suggestion!
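   
   A minimal sketch of the balking idea, assuming the store keeps a `readToEnd` flag (the field appears in the diff) and refuses to answer until the initial read has finished. `BalkingStoreSketch`, its string partition keys, and `markReadToEnd()` are hypothetical simplifications rather than the actual `OffsetSyncStore` code.

```java
import java.util.Map;
import java.util.OptionalLong;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical, simplified stand-in for OffsetSyncStore; partitions are plain strings.
final class BalkingStoreSketch {
    private final Map<String, Long> latestDownstream = new ConcurrentHashMap<>();
    // Flipped once the initial read of the backing topic has completed.
    private volatile boolean readToEnd = false;

    /** Record the downstream offset of the most recent sync for a partition. */
    void handleSync(String partition, long downstreamOffset) {
        latestDownstream.put(partition, downstreamOffset);
    }

    /** Stand-in for the completion of the initial read to the end of the topic. */
    void markReadToEnd() {
        readToEnd = true;
    }

    /** Balk (return empty) until the store is fully loaded, then answer from memory. */
    OptionalLong downstreamOffset(String partition) {
        if (!readToEnd) {
            return OptionalLong.empty();
        }
        Long offset = latestDownstream.get(partition);
        return offset == null ? OptionalLong.empty() : OptionalLong.of(offset);
    }

    public static void main(String[] args) {
        BalkingStoreSketch store = new BalkingStoreSketch();
        store.handleSync("topic-0", 7L); // a stale sync seen part-way through startup
        System.out.println(store.downstreamOffset("topic-0").isPresent()); // false: still loading
        store.handleSync("topic-0", 42L);
        store.markReadToEnd();
        System.out.println(store.downstreamOffset("topic-0").getAsLong()); // 42
    }
}
```

   With the check inside the store, a caller such as the checkpoint task cannot accidentally translate against a stale sync while the topic is still being read.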





[GitHub] [kafka] C0urante commented on a diff in pull request #13178: KAFKA-12468, KAFKA-12566: Fix MM2 causing negative downstream lag

Posted by "C0urante (via GitHub)" <gi...@apache.org>.
C0urante commented on code in PR #13178:
URL: https://github.com/apache/kafka/pull/13178#discussion_r1100676199


##########
connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationBaseTest.java:
##########
@@ -503,25 +495,6 @@ public void testOffsetSyncsTopicsOnTarget() throws Exception {
 
         produceMessages(primary, "test-topic-1");
 
-        ReplicationPolicy replicationPolicy = new MirrorClient(mm2Config.clientConfig(BACKUP_CLUSTER_ALIAS)).replicationPolicy();
-        String remoteTopic = replicationPolicy.formatRemoteTopic(PRIMARY_CLUSTER_ALIAS, "test-topic-1");
-
-        // Check offsets are pushed to the checkpoint topic
-        Consumer<byte[], byte[]> backupConsumer = backup.kafka().createConsumerAndSubscribeTo(Collections.singletonMap(
-                "auto.offset.reset", "earliest"), PRIMARY_CLUSTER_ALIAS + ".checkpoints.internal");
-        waitForCondition(() -> {
-            ConsumerRecords<byte[], byte[]> records = backupConsumer.poll(Duration.ofSeconds(1L));
-            for (ConsumerRecord<byte[], byte[]> record : records) {
-                Checkpoint checkpoint = Checkpoint.deserializeRecord(record);
-                if (remoteTopic.equals(checkpoint.topicPartition().topic())) {
-                    return true;
-                }
-            }
-            return false;
-        }, 30_000,
-            "Unable to find checkpoints for " + PRIMARY_CLUSTER_ALIAS + ".test-topic-1"
-        );

Review Comment:
   Thanks for the explanation, that clears things up nicely.
   
   > Upon further inspection, maybe this test makes sense to turn into a parameter for other tests, to verify the functionality of the checkpoints when the syncs topic is moved around. Does that make sense to do in this PR?
   
   I'm hesitant to drop this coverage from the test, because it seems at least as useful to verify that offset sync records are published in the expected location (and translated to consumer offsets) as it is to verify that they aren't published in the incorrect location. My personal preference would be to add the minimal additional logic necessary to this test to keep that coverage, and then pursue parameterization of offset sync location out of band. I'm also not even sure that parameterization would be worth the tradeoff in build/test time, although we might be able to find a healthy compromise there.
   
   WDYT?





[GitHub] [kafka] C0urante commented on a diff in pull request #13178: KAFKA-12468, KAFKA-12566: Fix MM2 causing negative downstream lag

Posted by "C0urante (via GitHub)" <gi...@apache.org>.
C0urante commented on code in PR #13178:
URL: https://github.com/apache/kafka/pull/13178#discussion_r1100690761


##########
connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationBaseTest.java:
##########
@@ -578,7 +551,7 @@ public void testNoCheckpointsIfNoRecordsAreMirrored() throws InterruptedExceptio
             Map<TopicPartition, OffsetAndMetadata> translatedOffsets = backupClient.remoteConsumerOffsets(
                     consumerGroupName, PRIMARY_CLUSTER_ALIAS, Duration.ofSeconds(30L));
             return translatedOffsets.containsKey(remoteTopicPartition(tp1, PRIMARY_CLUSTER_ALIAS)) &&
-                   translatedOffsets.containsKey(remoteTopicPartition(tp2, PRIMARY_CLUSTER_ALIAS));
+                   !translatedOffsets.containsKey(remoteTopicPartition(tp2, PRIMARY_CLUSTER_ALIAS));

Review Comment:
   Hmm... if it's only a matter of reading to the end of the topic and then committing offsets, I think I'd prefer to have more coverage in this test. We do have assertions for regular checkpointing/syncing logic in other tests, but AFAICT we don't have anything to explicitly test the transition for a single consumer group from not being synced (even though other groups are being synced) to being synced.
   
   If we do have coverage for that somewhere else, then let me know and we can resolve this comment with no further action.





[GitHub] [kafka] C0urante commented on pull request #13178: KAFKA-12468, KAFKA-12566: Fix MM2 causing negative downstream lag

Posted by "C0urante (via GitHub)" <gi...@apache.org>.
C0urante commented on PR #13178:
URL: https://github.com/apache/kafka/pull/13178#issuecomment-1423249156

   > Ouch, yeah that is certainly an issue that gets worse with my change.
   > Before the checkpoints would be non-monotonic for transactional/compacted topics, and after it's non-monotonic for everyone.
   > I think addressing this in a follow-up is a smart idea, this change is already messy enough.
   
   And funnily enough, someone's already [filed a ticket](https://issues.apache.org/jira/browse/KAFKA-13659) for that exact issue.
   
   Out of an abundance of caution, what do you think about targeting your `mm2-negative-offsets` branch with a new PR to address KAFKA-13659, which can be reviewed separately but then merged to `trunk` in tandem with this PR?
   
   This has the potential to be a fairly large change in behavior, and I'd like to do everything we can to minimize the chances that it breaks users' setups. Ensuring that this PR is merged if and only if a fix for KAFKA-13659 is merged alongside it would help on that front.




[GitHub] [kafka] C0urante commented on a diff in pull request #13178: KAFKA-12468, KAFKA-12566: Fix MM2 causing negative downstream lag

Posted by "C0urante (via GitHub)" <gi...@apache.org>.
C0urante commented on code in PR #13178:
URL: https://github.com/apache/kafka/pull/13178#discussion_r1100654263


##########
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/OffsetSyncStore.java:
##########
@@ -56,8 +56,7 @@ OptionalLong translateDownstream(TopicPartition sourceTopicPartition, long upstr
                 // Offset is too far in the past to translate accurately
                 return OptionalLong.of(-1L);
             }
-            long upstreamStep = upstreamOffset - offsetSync.get().upstreamOffset();
-            return OptionalLong.of(offsetSync.get().downstreamOffset() + upstreamStep);
+            return OptionalLong.of(offsetSync.get().downstreamOffset() + (offsetSync.get().upstreamOffset() == upstreamOffset ? 0 : 1));

Review Comment:
   IMO the diagram is a bit overkill, but I tend not to be a very visual thinker. It certainly doesn't do any harm; I'm fine with leaving it in. Thanks for adding detail here, hopefully it'll come in handy if anyone needs to revisit this logic in the future.
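   
   For readers who prefer numbers to diagrams, here is a small worked comparison of the removed and added lines in the hunk above. Plain `long` arguments stand in for the `OffsetSync` record; the guard for offsets older than the sync is an assumption, since the hunk shows its body but not its condition; and the sync values and downstream log end are made up for illustration.

```java
import java.util.OptionalLong;

// Illustrative only: plain longs stand in for the OffsetSync record shown in the diff.
final class TranslationSketch {
    /** Removed logic: assume the upstream distance carries over unchanged downstream. */
    static OptionalLong translateOld(long syncUpstream, long syncDownstream, long upstreamOffset) {
        if (upstreamOffset < syncUpstream) { // assumed guard; condition not shown in the hunk
            return OptionalLong.of(-1L);     // too far in the past to translate accurately
        }
        return OptionalLong.of(syncDownstream + (upstreamOffset - syncUpstream));
    }

    /** Added logic: an exact match maps to the sync itself, anything later to one past it. */
    static OptionalLong translateNew(long syncUpstream, long syncDownstream, long upstreamOffset) {
        if (upstreamOffset < syncUpstream) { // assumed guard; condition not shown in the hunk
            return OptionalLong.of(-1L);
        }
        return OptionalLong.of(syncDownstream + (syncUpstream == upstreamOffset ? 0 : 1));
    }

    public static void main(String[] args) {
        // Made-up sync: upstream offset 100 was written downstream at offset 50.
        long syncUpstream = 100L;
        long syncDownstream = 50L;
        long groupOffset = 150L;     // upstream consumer group position
        long downstreamLogEnd = 60L; // fewer records downstream, e.g. after compaction

        long oldResult = translateOld(syncUpstream, syncDownstream, groupOffset).getAsLong();
        long newResult = translateNew(syncUpstream, syncDownstream, groupOffset).getAsLong();
        System.out.println("old: " + oldResult + ", lag " + (downstreamLogEnd - oldResult)); // old: 100, lag -40
        System.out.println("new: " + newResult + ", lag " + (downstreamLogEnd - newResult)); // new: 51, lag 9
    }
}
```

   The new result is deliberately conservative: it can sit behind the group's true position, but it never lands beyond a record that is known to exist downstream.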





[GitHub] [kafka] C0urante commented on a diff in pull request #13178: KAFKA-12468, KAFKA-12566: Fix MM2 causing negative downstream lag

Posted by "C0urante (via GitHub)" <gi...@apache.org>.
C0urante commented on code in PR #13178:
URL: https://github.com/apache/kafka/pull/13178#discussion_r1100667177


##########
connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationBaseTest.java:
##########
@@ -469,7 +461,7 @@ public void testOneWayReplicationWithAutoOffsetSync() throws InterruptedExceptio
 
         // create a consumer at primary cluster to consume the new topic
         try (Consumer<byte[], byte[]> consumer1 = primary.kafka().createConsumerAndSubscribeTo(Collections.singletonMap(
-                "group.id", "consumer-group-1"), "test-topic-2")) {

Review Comment:
   Ah, now I see (was confusing the total end offsets with the total consumer group offsets). Thanks for the explanation!





[GitHub] [kafka] C0urante commented on a diff in pull request #13178: KAFKA-12468, KAFKA-12566: Fix MM2 causing negative downstream lag

Posted by "C0urante (via GitHub)" <gi...@apache.org>.
C0urante commented on code in PR #13178:
URL: https://github.com/apache/kafka/pull/13178#discussion_r1099340592


##########
connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationBaseTest.java:
##########
@@ -700,21 +673,53 @@ protected void produceMessages(EmbeddedConnectCluster cluster, String topicName,
         int cnt = 0;
         for (int r = 0; r < NUM_RECORDS_PER_PARTITION; r++)
             for (int p = 0; p < numPartitions; p++)
-                cluster.kafka().produce(topicName, p, "key", "value-" + cnt++);
+                produce(cluster, topicName, p, "key", "value-" + cnt++);
     }
-    
+
+    protected void produce(EmbeddedConnectCluster cluster, String topic, Integer partition, String key, String value) {
+        cluster.kafka().produce(topic, partition, key, value);
+    }
+
+    protected static Map<TopicPartition, OffsetAndMetadata> waitForAnyCheckpoint(

Review Comment:
   Nit: why "anyCheckpoint" instead of "allCheckpoints", given that we're ensuring that there's a checkpoint for every partition of the topic, instead of only one?



##########
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/OffsetSyncStore.java:
##########
@@ -56,8 +56,7 @@ OptionalLong translateDownstream(TopicPartition sourceTopicPartition, long upstr
                 // Offset is too far in the past to translate accurately
                 return OptionalLong.of(-1L);
             }
-            long upstreamStep = upstreamOffset - offsetSync.get().upstreamOffset();
-            return OptionalLong.of(offsetSync.get().downstreamOffset() + upstreamStep);
+            return OptionalLong.of(offsetSync.get().downstreamOffset() + (offsetSync.get().upstreamOffset() == upstreamOffset ? 0 : 1));

Review Comment:
   I believe this part is correct, but it took me a while to grok the motivation here. Could we pull this out into two lines, one to determine the amount we bump by (i.e., 0 or 1) with a comment providing a rationale for the logic (including why we can't do a more aggressive bump derived from the delta between the upstream offsets for the consumer group and the checkpoint), and one to return the downstream offset plus that bump?
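
A minimal sketch of the two-line form requested above, in Java. The class name `OffsetTranslationSketch`, the flattened `long` parameters, and the exact guard condition are assumptions made for illustration; the real method takes a `TopicPartition` and looks up the stored offset sync.

```java
import java.util.OptionalLong;

// Hypothetical, simplified stand-in for OffsetSyncStore.translateDownstream.
// The sync's offsets are passed in directly instead of being looked up.
final class OffsetTranslationSketch {

    static OptionalLong translateDownstream(long syncUpstreamOffset,
                                            long syncDownstreamOffset,
                                            long upstreamOffset) {
        // Assumed guard: the consumer group is behind the latest sync.
        if (upstreamOffset < syncUpstreamOffset) {
            // Offset is too far in the past to translate accurately
            return OptionalLong.of(-1L);
        }
        // Bump by 0 when the group sits exactly on the synced record, because the
        // sync then gives the exact downstream position. Bump by 1 when the group
        // is ahead, because it has consumed at least the synced record. A larger
        // bump derived from (upstreamOffset - syncUpstreamOffset) is not safe:
        // compaction, transaction markers, or filtering can make the downstream
        // topic advance by fewer offsets than the upstream topic.
        long bump = (upstreamOffset == syncUpstreamOffset) ? 0L : 1L;
        return OptionalLong.of(syncDownstreamOffset + bump);
    }
}
```

Splitting out `bump` gives the rationale a natural home next to the value it explains, which is the point of the request.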



##########
connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/IdentityReplicationIntegrationTest.java:
##########
@@ -233,6 +229,7 @@ public void testOneWayReplicationWithAutoOffsetSync() throws InterruptedExceptio
         // have been automatically synchronized from primary to backup by the background job, so no
         // more records to consume from the replicated topic by the same consumer group at backup cluster
         assertEquals(0, records.count(), "consumer record size is not zero");
+        backupConsumer.close();

Review Comment:
   Good catch! Can we use a try-with-resources block for these consumers? Should set a precedent that makes it harder to leak them in the future.
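
A rough illustration of the try-with-resources suggestion, in Java. `TrackedConsumer` is a hypothetical stand-in for a Kafka `Consumer` (which is also closeable); it exists only so the sketch can run without a cluster and show that the resource is closed even when the assertion fails.

```java
// Hypothetical stand-in for a Kafka Consumer; only close() tracking matters here.
final class TrackedConsumer implements AutoCloseable {
    private final int pendingRecords;
    private boolean closed = false;

    TrackedConsumer(int pendingRecords) {
        this.pendingRecords = pendingRecords;
    }

    int pollCount() {
        return pendingRecords;
    }

    boolean isClosed() {
        return closed;
    }

    @Override
    public void close() {
        closed = true;
    }
}

final class TryWithResourcesSketch {
    // Returns the consumer so a caller can check that it was closed.
    static TrackedConsumer assertNoRecords(int pendingRecords) {
        TrackedConsumer handle = new TrackedConsumer(pendingRecords);
        try (TrackedConsumer backupConsumer = handle) {
            if (backupConsumer.pollCount() != 0) {
                throw new AssertionError("consumer record size is not zero");
            }
        } catch (AssertionError e) {
            // Swallowed only so this sketch can inspect the handle afterwards;
            // a real test would let the failure propagate.
        }
        return handle;
    }
}
```

With an explicit `close()` call after the assertion, a failing assertion would skip the call and leak the consumer; the block form closes it on both paths.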



##########
connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationBaseTest.java:
##########
@@ -725,11 +730,20 @@ protected static <T> void waitForConsumerGroupOffsetSync(EmbeddedConnectCluster
                 long consumerGroupOffsetTotal = consumerGroupOffsets.values().stream()
                     .mapToLong(OffsetAndMetadata::offset).sum();
 
-                Map<TopicPartition, Long> offsets = consumer.endOffsets(tps, CONSUMER_POLL_TIMEOUT_MS);
-                long totalOffsets = offsets.values().stream().mapToLong(l -> l).sum();
-
+                Map<TopicPartition, ListOffsetsResult.ListOffsetsResultInfo> offsets =

Review Comment:
   Nit: "offsets" and "totalOffsets" are a little generic and confusing; WDYT about "endOffsets" and "totalEndOffsets"?



##########
connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationBaseTest.java:
##########
@@ -578,7 +551,7 @@ public void testNoCheckpointsIfNoRecordsAreMirrored() throws InterruptedExceptio
             Map<TopicPartition, OffsetAndMetadata> translatedOffsets = backupClient.remoteConsumerOffsets(
                     consumerGroupName, PRIMARY_CLUSTER_ALIAS, Duration.ofSeconds(30L));
             return translatedOffsets.containsKey(remoteTopicPartition(tp1, PRIMARY_CLUSTER_ALIAS)) &&
-                   translatedOffsets.containsKey(remoteTopicPartition(tp2, PRIMARY_CLUSTER_ALIAS));
+                   !translatedOffsets.containsKey(remoteTopicPartition(tp2, PRIMARY_CLUSTER_ALIAS));

Review Comment:
   If I'm reading this correctly, the reason we don't expect `tp2` to have translated offsets here is that the upstream consumer group offset for this partition is behind the checkpoints emitted for it.
   
   If that's correct:
   1. Any idea why this was passing before?
   2. Do you think we should expand the test to commit an offset for this partition in the consumer group, then verify that that offset is included in the translated offsets afterward?



##########
connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationTransactionsTest.java:
##########
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.mirror.integration;
+
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.connect.util.clusters.EmbeddedConnectCluster;
+import org.junit.jupiter.api.BeforeEach;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+
+/**
+ * Integration test for MirrorMaker2 in which source records are emitted with a transactional producer,
+ * which interleaves transaction commit messages into the source topic which are not propagated downstream.
+ */
+public class MirrorConnectorsIntegrationTransactionsTest extends MirrorConnectorsIntegrationBaseTest {
+
+    private Map<String, Object> producerProps = new HashMap<>();
+
+    @BeforeEach
+    @Override
+    public void startClusters() throws Exception {
+        primaryBrokerProps.put("transaction.state.log.replication.factor", "1");
+        backupBrokerProps.put("transaction.state.log.replication.factor", "1");
+        primaryBrokerProps.put("transaction.state.log.min.isr", "1");
+        backupBrokerProps.put("transaction.state.log.min.isr", "1");
+        producerProps.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
+        producerProps.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "embedded-kafka-0");
+        super.startClusters();
+    }
+
+    @Override
+    protected void produce(EmbeddedConnectCluster cluster, String topic, Integer partition, String key, String value) {

Review Comment:
   I was a little skeptical about the purpose of the `produce` method in the base class; after seeing this, its value is clear.
   
   Mind adding a brief Javadoc to the base class's `produce` method explaining what it does and why it may be useful to override in a subclass?
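
One possible shape for that Javadoc, sketched in Java with stand-in classes. `BaseTestSketch`, `TransactionalTestSketch`, and the in-memory `log` list are hypothetical; the real base class writes to an `EmbeddedConnectCluster`, and the real subclass uses a transactional producer.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for the integration test base class.
class BaseTestSketch {
    // Records what was "written", in order, so the sketch needs no cluster.
    final List<String> log = new ArrayList<>();

    /**
     * Produce a single record to the given topic-partition.
     * Subclasses may override this to change how records are written, for
     * example to wrap each record in a transaction so that the source topic
     * contains offsets that are not mirrored downstream.
     */
    protected void produce(String topic, Integer partition, String key, String value) {
        log.add("record:" + topic + "-" + partition + ":" + value);
    }

    // Mirrors the loop in produceMessages: every write goes through produce().
    void produceMessages(String topic, int numPartitions, int recordsPerPartition) {
        int cnt = 0;
        for (int r = 0; r < recordsPerPartition; r++)
            for (int p = 0; p < numPartitions; p++)
                produce(topic, p, "key", "value-" + cnt++);
    }
}

// Hypothetical subclass that surrounds each record with begin/commit entries.
class TransactionalTestSketch extends BaseTestSketch {
    @Override
    protected void produce(String topic, Integer partition, String key, String value) {
        log.add("begin");
        log.add("record:" + topic + "-" + partition + ":" + value);
        log.add("commit");
    }
}
```

Because `produceMessages` routes every write through `produce`, the subclass changes the write path for all inherited tests without touching them.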



##########
connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationBaseTest.java:
##########
@@ -503,25 +495,6 @@ public void testOffsetSyncsTopicsOnTarget() throws Exception {
 
         produceMessages(primary, "test-topic-1");
 
-        ReplicationPolicy replicationPolicy = new MirrorClient(mm2Config.clientConfig(BACKUP_CLUSTER_ALIAS)).replicationPolicy();
-        String remoteTopic = replicationPolicy.formatRemoteTopic(PRIMARY_CLUSTER_ALIAS, "test-topic-1");
-
-        // Check offsets are pushed to the checkpoint topic
-        Consumer<byte[], byte[]> backupConsumer = backup.kafka().createConsumerAndSubscribeTo(Collections.singletonMap(
-                "auto.offset.reset", "earliest"), PRIMARY_CLUSTER_ALIAS + ".checkpoints.internal");
-        waitForCondition(() -> {
-            ConsumerRecords<byte[], byte[]> records = backupConsumer.poll(Duration.ofSeconds(1L));
-            for (ConsumerRecord<byte[], byte[]> record : records) {
-                Checkpoint checkpoint = Checkpoint.deserializeRecord(record);
-                if (remoteTopic.equals(checkpoint.topicPartition().topic())) {
-                    return true;
-                }
-            }
-            return false;
-        }, 30_000,
-            "Unable to find checkpoints for " + PRIMARY_CLUSTER_ALIAS + ".test-topic-1"
-        );

Review Comment:
   Why is this removed?



##########
connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationBaseTest.java:
##########
@@ -469,7 +461,7 @@ public void testOneWayReplicationWithAutoOffsetSync() throws InterruptedExceptio
 
         // create a consumer at primary cluster to consume the new topic
         try (Consumer<byte[], byte[]> consumer1 = primary.kafka().createConsumerAndSubscribeTo(Collections.singletonMap(
-                "group.id", "consumer-group-1"), "test-topic-2")) {

Review Comment:
   Any idea how this test was passing before? It looks like with the wrong consumer group ID here, the old call to `waitForConsumerGroupOffsetSync` at line 481 should never have succeeded.





[GitHub] [kafka] C0urante commented on pull request #13178: KAFKA-12468, KAFKA-12566: Fix MM2 causing negative downstream lag

Posted by "C0urante (via GitHub)" <gi...@apache.org>.
C0urante commented on PR #13178:
URL: https://github.com/apache/kafka/pull/13178#issuecomment-1424539164

   @mimaison This is a moderately large change in behavior and if possible, it'd be nice to get another set of eyes on it before merging. We don't need another reviewer for the PR changes (although comments are always welcome); instead, I'd just like confirmation that this change is safe to make as a bug fix.
   
   TL;DR: If an upstream consumer group is ahead of the upstream offset for the latest-emitted checkpoint, we will only sync offsets for that consumer group to the downstream cluster based on the offset pair for that checkpoint, instead of adding the delta of (upstream offset for consumer group - upstream offset in checkpoint), since there is no guarantee that that delta will be accurate in cases where the upstream topic is compacted, has transaction markers, or has some records filtered out via SMT.
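
To make the TL;DR concrete, here is a small Java sketch comparing the two translation rules on a made-up topic. All names and numbers are assumptions for illustration: ten upstream offsets, two of which are transaction markers, so only eight records exist downstream.

```java
// Hypothetical comparison of the old (delta) and new (bump) translation rules.
// Both helpers assume the consumer group is at or ahead of the sync.
final class NegativeLagSketch {

    // Old rule: assume downstream advanced by the same distance as upstream.
    static long translateWithDelta(long syncUpstream, long syncDownstream, long groupUpstream) {
        return syncDownstream + (groupUpstream - syncUpstream);
    }

    // New rule: advance by at most one past the synced record.
    static long translateWithBump(long syncUpstream, long syncDownstream, long groupUpstream) {
        return syncDownstream + (groupUpstream == syncUpstream ? 0L : 1L);
    }

    // Consumer lag as usually reported: log end offset minus committed offset.
    static long lag(long endOffset, long committedOffset) {
        return endOffset - committedOffset;
    }
}
```

With a sync at (upstream 0, downstream 0), a group committed at upstream offset 10, and a downstream end offset of 8, the delta rule translates to 10 and reports a lag of -2, while the bump rule translates to 1 and reports a lag of 7. The second number is conservative but never points past the end of the downstream log.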




[GitHub] [kafka] gharris1727 commented on pull request #13178: KAFKA-12468, KAFKA-13659, KAFKA-12566: Fix MM2 causing negative downstream lag

Posted by "gharris1727 (via GitHub)" <gi...@apache.org>.
gharris1727 commented on PR #13178:
URL: https://github.com/apache/kafka/pull/13178#issuecomment-1426559007

   I think my commit to synchronize the OffsetSyncStore introduced a dramatic performance regression that is causing the current test failures. Where previously the test would comfortably pass with `consumeEfficiency = 10`, it now fails with any value over about `0.5`, for a ~20x decrease in read performance. I'll need to investigate this to confirm, and then look into ways to improve the performance of the OffsetSyncStore.




[GitHub] [kafka] gharris1727 commented on pull request #13178: KAFKA-12468, KAFKA-13659, KAFKA-12566: Fix MM2 causing negative downstream lag

Posted by "gharris1727 (via GitHub)" <gi...@apache.org>.
gharris1727 commented on PR #13178:
URL: https://github.com/apache/kafka/pull/13178#issuecomment-1432451859

   Unfortunately those test failures only appear in the EOS test and appear to be caused by EOS mode.
   This is because MM2 doesn't do the periodic background commits that the offset syncs starvation fix relies on, and these assertions rely on the offset syncs starvation fix to assert that all the syncs are emitted.
   
   I added a tweak to `firePendingOffsetSyncs` that drains the offset syncs map on each commit. Do you think we can use the same blocking drain for EOS and normal mode, or should this behavior only be enabled for EOS mode?




[GitHub] [kafka] gkousouris commented on pull request #13178: KAFKA-12468, KAFKA-13659, KAFKA-12566: Fix MM2 causing negative downstream lag

Posted by "gkousouris (via GitHub)" <gi...@apache.org>.
gkousouris commented on PR #13178:
URL: https://github.com/apache/kafka/pull/13178#issuecomment-1726599437

   Hi @gharris1727, if I understand this PR correctly, this will (almost always) cause duplication of data. Our problem is this:
   1. We have a service reading from a Kafka topic and committing offsets for its consumer group.
   2. We use MirrorMaker to replicate the topic to a different cluster.
   3. We pause the service and check the current offset for the 2 streams (the one in the old cluster and the one in the new cluster).
   
   In step 3, these offsets will differ. Specifically, the offset in the old cluster will be the last offset the service managed to commit, while the offset in the new cluster will be `last_checkpoint_before_upstream_offset + 1`.
   
   Thus, when we restart the service (and make it consume from the new topic), it will re-process all messages from `last_checkpoint_before_upstream_offset + 1` until `downstream_offset`. Isn't this a problem, considering that MirrorMaker provides exactly-once semantics for committing messages?
   
   This behaviour was verified by looking at the output of the `kafka-consumer-groups` script. 

