Posted to jira@kafka.apache.org by "C0urante (via GitHub)" <gi...@apache.org> on 2023/02/08 16:55:07 UTC

[GitHub] [kafka] C0urante commented on a diff in pull request #13178: KAFKA-12468, KAFKA-12566: Fix MM2 causing negative downstream lag

C0urante commented on code in PR #13178:
URL: https://github.com/apache/kafka/pull/13178#discussion_r1099340592


##########
connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationBaseTest.java:
##########
@@ -700,21 +673,53 @@ protected void produceMessages(EmbeddedConnectCluster cluster, String topicName,
         int cnt = 0;
         for (int r = 0; r < NUM_RECORDS_PER_PARTITION; r++)
             for (int p = 0; p < numPartitions; p++)
-                cluster.kafka().produce(topicName, p, "key", "value-" + cnt++);
+                produce(cluster, topicName, p, "key", "value-" + cnt++);
     }
-    
+
+    protected void produce(EmbeddedConnectCluster cluster, String topic, Integer partition, String key, String value) {
+        cluster.kafka().produce(topic, partition, key, value);
+    }
+
+    protected static Map<TopicPartition, OffsetAndMetadata> waitForAnyCheckpoint(

Review Comment:
   Nit: why "anyCheckpoint" instead of "allCheckpoints", given that we're ensuring that there's a checkpoint for every partition of the topic, instead of only one?



##########
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/OffsetSyncStore.java:
##########
@@ -56,8 +56,7 @@ OptionalLong translateDownstream(TopicPartition sourceTopicPartition, long upstr
                 // Offset is too far in the past to translate accurately
                 return OptionalLong.of(-1L);
             }
-            long upstreamStep = upstreamOffset - offsetSync.get().upstreamOffset();
-            return OptionalLong.of(offsetSync.get().downstreamOffset() + upstreamStep);
+            return OptionalLong.of(offsetSync.get().downstreamOffset() + (offsetSync.get().upstreamOffset() == upstreamOffset ? 0 : 1));

Review Comment:
   I believe this part is correct, but it took me a while to grok the motivation. Could we pull this out into two lines: one that determines the amount we bump by (i.e., 0 or 1), with a comment explaining the rationale (including why we can't do a more aggressive bump derived from the delta between the consumer group's upstream offset and the checkpoint's), and one that returns the downstream offset plus that bump?
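   
   For illustration, something like this (a sketch; the variable name and comment wording are just strawmen):
   
   ```java
   // The sync gives an exact upstream -> downstream mapping only at the synced
   // offset itself. Past that point the only safe step is +1: the records between
   // the sync and the consumer's upstream offset may have no downstream
   // counterpart (e.g. transaction markers), so a bump derived from the upstream
   // delta could overshoot the true downstream position and yield negative lag.
   long bump = offsetSync.get().upstreamOffset() == upstreamOffset ? 0 : 1;
   return OptionalLong.of(offsetSync.get().downstreamOffset() + bump);
   ```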



##########
connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/IdentityReplicationIntegrationTest.java:
##########
@@ -233,6 +229,7 @@ public void testOneWayReplicationWithAutoOffsetSync() throws InterruptedExceptio
         // have been automatically synchronized from primary to backup by the background job, so no
         // more records to consume from the replicated topic by the same consumer group at backup cluster
         assertEquals(0, records.count(), "consumer record size is not zero");
+        backupConsumer.close();

Review Comment:
   Good catch! Can we use a try-with-resources block for these consumers? Should set a precedent that makes it harder to leak them in the future.
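   
   For instance (a sketch; `consumerProps` and `replicatedTopic` stand in for whatever the test already builds):
   
   ```java
   // try-with-resources closes the consumer even if an assertion fails mid-block
   try (Consumer<byte[], byte[]> backupConsumer = backup.kafka().createConsumerAndSubscribeTo(
           consumerProps, replicatedTopic)) {
       ConsumerRecords<byte[], byte[]> records = backupConsumer.poll(Duration.ofSeconds(1));
       assertEquals(0, records.count(), "consumer record size is not zero");
   }
   ```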



##########
connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationBaseTest.java:
##########
@@ -725,11 +730,20 @@ protected static <T> void waitForConsumerGroupOffsetSync(EmbeddedConnectCluster
                 long consumerGroupOffsetTotal = consumerGroupOffsets.values().stream()
                     .mapToLong(OffsetAndMetadata::offset).sum();
 
-                Map<TopicPartition, Long> offsets = consumer.endOffsets(tps, CONSUMER_POLL_TIMEOUT_MS);
-                long totalOffsets = offsets.values().stream().mapToLong(l -> l).sum();
-
+                Map<TopicPartition, ListOffsetsResult.ListOffsetsResultInfo> offsets =

Review Comment:
   Nit: "offsets" and "totalOffsets" are a little generic and confusing; WDYT about "endOffsets" and "totalEndOffsets"?



##########
connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationBaseTest.java:
##########
@@ -578,7 +551,7 @@ public void testNoCheckpointsIfNoRecordsAreMirrored() throws InterruptedExceptio
             Map<TopicPartition, OffsetAndMetadata> translatedOffsets = backupClient.remoteConsumerOffsets(
                     consumerGroupName, PRIMARY_CLUSTER_ALIAS, Duration.ofSeconds(30L));
             return translatedOffsets.containsKey(remoteTopicPartition(tp1, PRIMARY_CLUSTER_ALIAS)) &&
-                   translatedOffsets.containsKey(remoteTopicPartition(tp2, PRIMARY_CLUSTER_ALIAS));
+                   !translatedOffsets.containsKey(remoteTopicPartition(tp2, PRIMARY_CLUSTER_ALIAS));

Review Comment:
   If I'm reading this correctly, the reason we don't expect `tp2` to have translated offsets here is that the upstream consumer group offset for this partition is behind the checkpoints emitted for it.
   
   If that's correct:
   1. Any idea why this was passing before?
   2. Do you think we should expand the test to commit an offset for this partition in the consumer group, then verify that that offset is included in the translated offsets afterward?
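   
   For (2), I'm imagining roughly this shape at the end of the test (a sketch; the committed offset value is arbitrary):
   
   ```java
   // Commit an offset for tp2 in the upstream consumer group...
   try (Consumer<byte[], byte[]> consumer = primary.kafka().createConsumer(
           Collections.singletonMap("group.id", consumerGroupName))) {
       consumer.commitSync(Collections.singletonMap(tp2, new OffsetAndMetadata(1L)));
   }
   // ...then wait for that offset to show up in the translated offsets downstream.
   waitForCondition(() -> {
       Map<TopicPartition, OffsetAndMetadata> translatedOffsets = backupClient.remoteConsumerOffsets(
               consumerGroupName, PRIMARY_CLUSTER_ALIAS, Duration.ofSeconds(30L));
       return translatedOffsets.containsKey(remoteTopicPartition(tp2, PRIMARY_CLUSTER_ALIAS));
   }, 30_000, "Offset committed for tp2 was not translated");
   ```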



##########
connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationTransactionsTest.java:
##########
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.mirror.integration;
+
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.connect.util.clusters.EmbeddedConnectCluster;
+import org.junit.jupiter.api.BeforeEach;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+
+/**
+ * Integration test for MirrorMaker2 in which source records are emitted with a transactional
+ * producer, which interleaves transaction commit markers into the source topic; those markers
+ * are not propagated downstream.
+ */
+public class MirrorConnectorsIntegrationTransactionsTest extends MirrorConnectorsIntegrationBaseTest {
+
+    private Map<String, Object> producerProps = new HashMap<>();
+
+    @BeforeEach
+    @Override
+    public void startClusters() throws Exception {
+        primaryBrokerProps.put("transaction.state.log.replication.factor", "1");
+        backupBrokerProps.put("transaction.state.log.replication.factor", "1");
+        primaryBrokerProps.put("transaction.state.log.min.isr", "1");
+        backupBrokerProps.put("transaction.state.log.min.isr", "1");
+        producerProps.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
+        producerProps.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "embedded-kafka-0");
+        super.startClusters();
+    }
+
+    @Override
+    protected void produce(EmbeddedConnectCluster cluster, String topic, Integer partition, String key, String value) {

Review Comment:
   I was a little skeptical about the purpose of the `produce` method in the base class; after seeing this, its value is clear.
   
   Mind adding a brief Javadoc to the base class's `produce` method explaining what it does and why it may be useful to override in a subclass?
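   
   Strawman wording, in case it helps (I'm inferring the transactional behavior from the subclass's Javadoc):
   
   ```java
   /**
    * Produce a single record to the given topic on the given cluster.
    * <p>The base implementation uses the cluster's default (non-transactional)
    * producer. Subclasses may override this to exercise other client
    * configurations; e.g. {@link MirrorConnectorsIntegrationTransactionsTest}
    * produces inside a transaction, interleaving transaction markers into the
    * source topic.
    */
   protected void produce(EmbeddedConnectCluster cluster, String topic, Integer partition, String key, String value) {
       cluster.kafka().produce(topic, partition, key, value);
   }
   ```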



##########
connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationBaseTest.java:
##########
@@ -503,25 +495,6 @@ public void testOffsetSyncsTopicsOnTarget() throws Exception {
 
         produceMessages(primary, "test-topic-1");
 
-        ReplicationPolicy replicationPolicy = new MirrorClient(mm2Config.clientConfig(BACKUP_CLUSTER_ALIAS)).replicationPolicy();
-        String remoteTopic = replicationPolicy.formatRemoteTopic(PRIMARY_CLUSTER_ALIAS, "test-topic-1");
-
-        // Check offsets are pushed to the checkpoint topic
-        Consumer<byte[], byte[]> backupConsumer = backup.kafka().createConsumerAndSubscribeTo(Collections.singletonMap(
-                "auto.offset.reset", "earliest"), PRIMARY_CLUSTER_ALIAS + ".checkpoints.internal");
-        waitForCondition(() -> {
-            ConsumerRecords<byte[], byte[]> records = backupConsumer.poll(Duration.ofSeconds(1L));
-            for (ConsumerRecord<byte[], byte[]> record : records) {
-                Checkpoint checkpoint = Checkpoint.deserializeRecord(record);
-                if (remoteTopic.equals(checkpoint.topicPartition().topic())) {
-                    return true;
-                }
-            }
-            return false;
-        }, 30_000,
-            "Unable to find checkpoints for " + PRIMARY_CLUSTER_ALIAS + ".test-topic-1"
-        );

Review Comment:
   Why is this removed?



##########
connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationBaseTest.java:
##########
@@ -469,7 +461,7 @@ public void testOneWayReplicationWithAutoOffsetSync() throws InterruptedExceptio
 
         // create a consumer at primary cluster to consume the new topic
         try (Consumer<byte[], byte[]> consumer1 = primary.kafka().createConsumerAndSubscribeTo(Collections.singletonMap(
-                "group.id", "consumer-group-1"), "test-topic-2")) {

Review Comment:
   Any idea how this test was passing before? It looks like, with the wrong consumer group ID here, the old call to `waitForConsumerGroupOffsetSync` at line 481 should never have succeeded.


