Posted to jira@kafka.apache.org by "gharris1727 (via GitHub)" <gi...@apache.org> on 2023/02/08 20:03:43 UTC

[GitHub] [kafka] gharris1727 commented on a diff in pull request #13178: KAFKA-12468, KAFKA-12566: Fix MM2 causing negative downstream lag

gharris1727 commented on code in PR #13178:
URL: https://github.com/apache/kafka/pull/13178#discussion_r1100550240


##########
connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationBaseTest.java:
##########
@@ -700,21 +673,53 @@ protected void produceMessages(EmbeddedConnectCluster cluster, String topicName,
         int cnt = 0;
         for (int r = 0; r < NUM_RECORDS_PER_PARTITION; r++)
             for (int p = 0; p < numPartitions; p++)
-                cluster.kafka().produce(topicName, p, "key", "value-" + cnt++);
+                produce(cluster, topicName, p, "key", "value-" + cnt++);
     }
-    
+
+    protected void produce(EmbeddedConnectCluster cluster, String topic, Integer partition, String key, String value) {
+        cluster.kafka().produce(topic, partition, key, value);
+    }
+
+    protected static Map<TopicPartition, OffsetAndMetadata> waitForAnyCheckpoint(

Review Comment:
   It's `any` in contrast to the other method's `FullSync`.
   As in, it accepts checkpoints anywhere within a topic, not just at the end of the topic.
   
   I'll update the method name to be more precise and verbose :)
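
   For illustration, a rough sketch of what such a helper could look like (hypothetical shape, assuming it lives in the integration test base class with `TestUtils.waitForCondition` statically imported and the usual `TopicPartition`/`OffsetAndMetadata`/`AtomicReference` imports; not the PR's exact implementation):

   ```java
   // Hypothetical sketch: wait until a translated offset (checkpoint) exists for every
   // partition of the topic, wherever in the topic it points, rather than only once it
   // has reached the end of the topic.
   protected static Map<TopicPartition, OffsetAndMetadata> waitForCheckpointOnAllPartitions(
           MirrorClient client, String consumerGroupName, String remoteClusterAlias,
           String topicName, int numPartitions) throws InterruptedException {
       AtomicReference<Map<TopicPartition, OffsetAndMetadata>> result = new AtomicReference<>();
       waitForCondition(() -> {
           Map<TopicPartition, OffsetAndMetadata> translated = client.remoteConsumerOffsets(
                   consumerGroupName, remoteClusterAlias, Duration.ofSeconds(3L));
           for (int p = 0; p < numPartitions; p++) {
               if (!translated.containsKey(new TopicPartition(topicName, p)))
                   return false;
           }
           result.set(translated);
           return true;
       }, 30_000, "Checkpoints were not emitted for all partitions of " + topicName);
       return result.get();
   }
   ```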



##########
connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/IdentityReplicationIntegrationTest.java:
##########
@@ -233,6 +229,7 @@ public void testOneWayReplicationWithAutoOffsetSync() throws InterruptedExceptio
         // have been automatically synchronized from primary to backup by the background job, so no
         // more records to consume from the replicated topic by the same consumer group at backup cluster
         assertEquals(0, records.count(), "consumer record size is not zero");
+        backupConsumer.close();

Review Comment:
   I code-golfed this one, but there's no reason to. Updated.
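
   One way to avoid the explicit `close()` is to scope the consumer in try-with-resources, as the other integration test does (a sketch with placeholder topic/group names, not necessarily what the updated diff looks like):

   ```java
   // Sketch: let try-with-resources close the consumer rather than calling
   // backupConsumer.close() explicitly after the assertions.
   try (Consumer<byte[], byte[]> backupConsumer = backup.kafka().createConsumerAndSubscribeTo(
           Collections.singletonMap("group.id", "consumer-group-1"), "test-topic-1")) {
       ConsumerRecords<byte[], byte[]> records = backupConsumer.poll(Duration.ofSeconds(1));
       assertEquals(0, records.count(), "consumer record size is not zero");
   }
   ```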



##########
connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationBaseTest.java:
##########
@@ -578,7 +551,7 @@ public void testNoCheckpointsIfNoRecordsAreMirrored() throws InterruptedExceptio
             Map<TopicPartition, OffsetAndMetadata> translatedOffsets = backupClient.remoteConsumerOffsets(
                     consumerGroupName, PRIMARY_CLUSTER_ALIAS, Duration.ofSeconds(30L));
             return translatedOffsets.containsKey(remoteTopicPartition(tp1, PRIMARY_CLUSTER_ALIAS)) &&
-                   translatedOffsets.containsKey(remoteTopicPartition(tp2, PRIMARY_CLUSTER_ALIAS));
+                   !translatedOffsets.containsKey(remoteTopicPartition(tp2, PRIMARY_CLUSTER_ALIAS));

Review Comment:
   1. This was passing before because `offset.lag.max` was nonzero, leaving a (0,0) offset sync in the topic.
   2. This condition is part of `waitForAnyCheckpoint`/`waitForCheckpointOnAllPartitions`, which is used in `testReplication`. I think that test should be sufficient to cover the normal case where offsets are translated.
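
   For context, `offset.lag.max` is the MirrorSourceConnector setting that bounds how far out of sync a downstream partition may drift before another offset sync is emitted; pinning it to 0 makes the connector emit a sync for roughly every replicated record. A minimal sketch of how a test might do that (the `mm2Props` name and exact setup are assumptions, not the PR's literal change):

   ```java
   // Assumed sketch: with offset.lag.max = 0, an offset sync is emitted for every
   // replicated record, so no stale (0,0) sync is left behind by lag tolerance.
   mm2Props.put("offset.lag.max", "0");
   ```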



##########
connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationBaseTest.java:
##########
@@ -503,25 +495,6 @@ public void testOffsetSyncsTopicsOnTarget() throws Exception {
 
         produceMessages(primary, "test-topic-1");
 
-        ReplicationPolicy replicationPolicy = new MirrorClient(mm2Config.clientConfig(BACKUP_CLUSTER_ALIAS)).replicationPolicy();
-        String remoteTopic = replicationPolicy.formatRemoteTopic(PRIMARY_CLUSTER_ALIAS, "test-topic-1");
-
-        // Check offsets are pushed to the checkpoint topic
-        Consumer<byte[], byte[]> backupConsumer = backup.kafka().createConsumerAndSubscribeTo(Collections.singletonMap(
-                "auto.offset.reset", "earliest"), PRIMARY_CLUSTER_ALIAS + ".checkpoints.internal");
-        waitForCondition(() -> {
-            ConsumerRecords<byte[], byte[]> records = backupConsumer.poll(Duration.ofSeconds(1L));
-            for (ConsumerRecord<byte[], byte[]> record : records) {
-                Checkpoint checkpoint = Checkpoint.deserializeRecord(record);
-                if (remoteTopic.equals(checkpoint.topicPartition().topic())) {
-                    return true;
-                }
-            }
-            return false;
-        }, 30_000,
-            "Unable to find checkpoints for " + PRIMARY_CLUSTER_ALIAS + ".test-topic-1"
-        );

Review Comment:
   This assertion relied on a nonzero `offset.lag.max`, which could leave a (0,0) record in the offset syncs topic.
   The offset being translated here belongs to `consumer-group-dummy` from the setup method, which is committed at offset 0 (the beginning of the topic). Now that `offset.lag.max` is zero and the starvation bug is fixed, the syncs topic may contain only nonzero syncs, which cannot translate an offset of 0.

   I removed this assertion because it was failing and didn't seem relevant to this test.
   Upon further inspection, maybe it makes sense to turn this test into a parameter for other tests, to verify that checkpoints still function when the offset syncs topic is moved around. Does that make sense to do in this PR?
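
   To illustrate the translation point, here is a toy model (made-up class and numbers, not the actual `OffsetSyncStore` logic): translation needs an offset sync at or below the group's committed upstream offset, so a group sitting at offset 0 is only translatable while a (0,0) sync exists.

   ```java
   import java.util.List;
   import java.util.OptionalLong;

   // Toy illustration (not the real OffsetSyncStore): why a group committed at upstream
   // offset 0 can no longer be translated once only nonzero syncs remain in the topic.
   public class OffsetTranslationSketch {

       // An offset sync pairs an upstream offset with its downstream equivalent.
       record OffsetSync(long upstreamOffset, long downstreamOffset) { }

       // Translation picks a sync at or below the group's upstream offset; assuming syncs
       // are monotonic, the best candidate is the one with the largest downstream offset.
       static OptionalLong translate(List<OffsetSync> syncs, long upstreamGroupOffset) {
           return syncs.stream()
                   .filter(s -> s.upstreamOffset() <= upstreamGroupOffset)
                   .mapToLong(OffsetSync::downstreamOffset)
                   .max();
       }

       public static void main(String[] args) {
           // A leftover (0,0) sync lets upstream offset 0 translate (to downstream 0):
           System.out.println(translate(List.of(new OffsetSync(0, 0), new OffsetSync(100, 100)), 0));
           // With only nonzero syncs, offset 0 has no sync at or below it, so it cannot be translated:
           System.out.println(translate(List.of(new OffsetSync(100, 100)), 0));
       }
   }
   ```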



##########
connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationBaseTest.java:
##########
@@ -469,7 +461,7 @@ public void testOneWayReplicationWithAutoOffsetSync() throws InterruptedExceptio
 
         // create a consumer at primary cluster to consume the new topic
         try (Consumer<byte[], byte[]> consumer1 = primary.kafka().createConsumerAndSubscribeTo(Collections.singletonMap(
-                "group.id", "consumer-group-1"), "test-topic-2")) {

Review Comment:
   This is because the `waitForConsumerGroupOffsetSync` condition was much weaker: it only required that the sum of the consumer group's offsets across all topic-partitions be nonzero.
   The wait condition passed because `test-topic-1` had committed offsets, which made the total nonzero even when all of the `test-topic-2` group offsets were 0.
   So for the wait to pass, it was sufficient for the first topic to have _any_ committed offsets and for both topics to be fully replicated.
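
   A simplified sketch of the two conditions (hypothetical helper names, assuming the usual `TopicPartition`/`OffsetAndMetadata` imports; not the actual test code):

   ```java
   // The weaker condition described above: passes as soon as *any* topic-partition in the
   // group has a committed offset, even if the topic under test is still entirely at 0.
   static boolean totalOffsetsNonZero(Map<TopicPartition, OffsetAndMetadata> groupOffsets) {
       return groupOffsets.values().stream().mapToLong(OffsetAndMetadata::offset).sum() > 0;
   }

   // A stronger per-topic check: every partition of the topic under test must have a
   // nonzero committed offset before the wait completes.
   static boolean topicOffsetsNonZero(Map<TopicPartition, OffsetAndMetadata> groupOffsets,
                                      String topic, int numPartitions) {
       for (int p = 0; p < numPartitions; p++) {
           OffsetAndMetadata committed = groupOffsets.get(new TopicPartition(topic, p));
           if (committed == null || committed.offset() == 0)
               return false;
       }
       return true;
   }
   ```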


