You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2020/09/17 15:34:10 UTC

[GitHub] [kafka] mimaison commented on a change in pull request #8730: KAFKA-10048: Possible data gap for a consumer after a failover when u…

mimaison commented on a change in pull request #8730:
URL: https://github.com/apache/kafka/pull/8730#discussion_r490344407



##########
File path: connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorConnectorsIntegrationTest.java
##########
@@ -201,26 +213,24 @@ public void close() {
 
     @Test
     public void testReplication() throws InterruptedException {
+        String consumerGroupName = "consumer-group-testReplication";
+        Map<String, Object> consumerProps  = new HashMap<String, Object>() {{
+                put("group.id", consumerGroupName);
+                put("auto.offset.reset", "latest");

Review comment:
       `latest` is already the default for `auto.offset.reset`; why are we setting it explicitly?

##########
File path: connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorConnectorsIntegrationTest.java
##########
@@ -136,10 +141,19 @@ public void setup() throws InterruptedException {
         backup.kafka().createTopic("primary.test-topic-1", 1);
         backup.kafka().createTopic("heartbeats", 1);
 
-        for (int i = 0; i < NUM_RECORDS_PRODUCED; i++) {
-            primary.kafka().produce("test-topic-1", i % NUM_PARTITIONS, "key", "message-1-" + i);
-            backup.kafka().produce("test-topic-1", i % NUM_PARTITIONS, "key", "message-2-" + i);
-        }
+        // produce to all partitions of test-topic-1
+        produceMessages(primary, "test-topic-1", "message-1-");
+        produceMessages(backup, "test-topic-1", "message-2-");
+
+        // Generate some consumer activity on both clusters to ensure the checkpoint connector always starts promptly
+        Map<String, Object> dummyProps = new HashMap<String, Object>();

Review comment:
       We can use `Collections.singletonMap()` here instead of a `HashMap`.

##########
File path: connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorConnectorsIntegrationTest.java
##########
@@ -283,49 +296,140 @@ public void testReplication() throws InterruptedException {
 
         waitForCondition(() -> {
             try {
-                return primaryClient.remoteConsumerOffsets("consumer-group-1", "backup",
+                return primaryClient.remoteConsumerOffsets(consumerGroupName, "backup",
                     Duration.ofMillis(CHECKPOINT_DURATION_MS)).containsKey(new TopicPartition("test-topic-1", 0));
             } catch (Throwable e) {
                 return false;
             }
         }, CHECKPOINT_DURATION_MS, "Offsets not translated upstream to primary cluster.");
 
-        Map<TopicPartition, OffsetAndMetadata> primaryOffsets = primaryClient.remoteConsumerOffsets("consumer-group-1", "backup",
+        Map<TopicPartition, OffsetAndMetadata> primaryOffsets = primaryClient.remoteConsumerOffsets(consumerGroupName, "backup",
                 Duration.ofMillis(CHECKPOINT_DURATION_MS));
- 
+
         // Failback consumer group to primary cluster
-        consumer2 = primary.kafka().createConsumer(Collections.singletonMap("group.id", "consumer-group-1"));
-        consumer2.assign(primaryOffsets.keySet());
-        primaryOffsets.forEach(consumer2::seek);
-        consumer2.poll(Duration.ofMillis(500));
-
-        assertTrue("Consumer failedback to zero upstream offset.", consumer2.position(new TopicPartition("test-topic-1", 0)) > 0);
-        assertTrue("Consumer failedback to zero downstream offset.", consumer2.position(new TopicPartition("backup.test-topic-1", 0)) > 0);
-        assertTrue("Consumer failedback beyond expected upstream offset.", consumer2.position(
-            new TopicPartition("test-topic-1", 0)) <= NUM_RECORDS_PRODUCED);
-        assertTrue("Consumer failedback beyond expected downstream offset.", consumer2.position(
-            new TopicPartition("backup.test-topic-1", 0)) <= NUM_RECORDS_PRODUCED);
-        
-        consumer2.close();
-      
+        primaryConsumer = primary.kafka().createConsumer(consumerProps);
+        primaryConsumer.assign(allPartitions("test-topic-1", "backup.test-topic-1"));
+        seek(primaryConsumer, primaryOffsets);
+        consumeAllMessages(primaryConsumer, 0);
+
+        assertTrue("Consumer failedback to zero upstream offset.", primaryConsumer.position(new TopicPartition("test-topic-1", 0)) > 0);
+        assertTrue("Consumer failedback to zero downstream offset.", primaryConsumer.position(new TopicPartition("backup.test-topic-1", 0)) > 0);
+        assertTrue("Consumer failedback beyond expected upstream offset.", primaryConsumer.position(
+            new TopicPartition("test-topic-1", 0)) <= NUM_RECORDS_PER_PARTITION);
+        assertTrue("Consumer failedback beyond expected downstream offset.", primaryConsumer.position(
+            new TopicPartition("backup.test-topic-1", 0)) <= NUM_RECORDS_PER_PARTITION);
+
+        Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> messages2 = consumeAllMessages(primaryConsumer, 0);
+        // If offset translation was successful we expect no messages to be consumed after failback
+        assertEquals("Data was consumed from partitions: " + messages2.keySet() + ".", 0, messages2.size());
+        primaryConsumer.close();
+
         // create more matching topics
         primary.kafka().createTopic("test-topic-2", NUM_PARTITIONS);
         backup.kafka().createTopic("test-topic-3", NUM_PARTITIONS);
 
-        for (int i = 0; i < NUM_RECORDS_PRODUCED; i++) {
-            primary.kafka().produce("test-topic-2", 0, "key", "message-2-" + i);
-            backup.kafka().produce("test-topic-3", 0, "key", "message-3-" + i);
+        produceMessages(primary, "test-topic-2", "message-3-", 1);
+        produceMessages(backup, "test-topic-3", "message-4-", 1);
+
+        assertEquals("Records were not produced to primary cluster.", NUM_RECORDS_PER_PARTITION,
+            primary.kafka().consume(NUM_RECORDS_PER_PARTITION, RECORD_TRANSFER_DURATION_MS, "test-topic-2").count());
+        assertEquals("Records were not produced to backup cluster.", NUM_RECORDS_PER_PARTITION,
+            backup.kafka().consume(NUM_RECORDS_PER_PARTITION, RECORD_TRANSFER_DURATION_MS, "test-topic-3").count());
+
+        assertEquals("New topic was not replicated to primary cluster.", NUM_RECORDS_PER_PARTITION,
+            primary.kafka().consume(NUM_RECORDS_PER_PARTITION, 2 * RECORD_TRANSFER_DURATION_MS, "backup.test-topic-3").count());
+        assertEquals("New topic was not replicated to backup cluster.", NUM_RECORDS_PER_PARTITION,
+            backup.kafka().consume(NUM_RECORDS_PER_PARTITION, 2 * RECORD_TRANSFER_DURATION_MS, "primary.test-topic-2").count());
+    }
+
+    @Test
+    public void testReplicationWithEmptyPartition() throws InterruptedException {

Review comment:
       This test is overly complicated. I think it could:
   - Create a topic
   - Produce messages to all partitions but one
   - Consume all messages
   - Start a single MirrorMaker2 instance primary->backup
   - Use `RemoteClusterUtils.translateOffsets()` to retrieve offsets
   - Assert offset for the last partition is 0
   
   For example, something along these lines (this cuts a few corners, so you'd need to improve it):
   
   ```suggestion
       @Test
       public void testReplicationWithEmptyPartition() throws Exception {
           String consumerGroupName = "consumer-group-testReplicationWithEmptyPartition";
           Map<String, Object> consumerProps  = new HashMap<String, Object>() {{
               put("group.id", consumerGroupName);
               put("auto.offset.reset", "earliest");
           }};
   
           String topic = "test-topic-empty";
           primary.kafka().createTopic(topic, NUM_PARTITIONS);
           mm2Config = new MirrorMakerConfig(mm2Props);
           // produce to all test-topic-empty's partitions *but the last one*, on the primary cluster
           produceMessages(primary, topic, "message-1-", NUM_PARTITIONS - 1);
   
           // Consume, from the primary cluster, before starting the connectors so we don't need to wait for discovery
           Consumer<byte[], byte[]> consumer = primary.kafka().createConsumerAndSubscribeTo(consumerProps, topic);
           consumeAllMessages(consumer, NUM_RECORDS_PER_PARTITION * (NUM_PARTITIONS - 1));
           consumer.close();
   
           waitUntilMirrorMakerIsRunning(backup, mm2Config, "primary", "backup");
   
           Map<TopicPartition, OffsetAndMetadata> backupOffsets = RemoteClusterUtils.translateOffsets(
                   mm2Config.clientConfig("backup").adminConfig(),
                   "primary",
                   consumerGroupName,
                   Duration.ofMillis(CHECKPOINT_DURATION_MS));
   
           OffsetAndMetadata oam = backupOffsets.get(new TopicPartition("primary." + topic, NUM_PARTITIONS - 1));
           assertNotNull(oam);
           assertEquals(0, oam.offset());
       }
   ```




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org