Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2020/05/28 16:19:28 UTC

[GitHub] [kafka] ryannedolan commented on a change in pull request #8730: KAFKA-10048: Possible data gap for a consumer after a failover when u…

ryannedolan commented on a change in pull request #8730:
URL: https://github.com/apache/kafka/pull/8730#discussion_r431952123



##########
File path: connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorConnectorsIntegrationTest.java
##########
@@ -316,6 +354,23 @@ public void testReplication() throws InterruptedException {
             backup.kafka().consume(NUM_RECORDS_PRODUCED, 2 * RECORD_TRANSFER_DURATION_MS, "primary.test-topic-2").count());
     }
 
+    private Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> consumeAllMessages(Consumer<byte[], byte[]> consumer) {

Review comment:
       Can we add a deadline to this function? Something like https://github.com/apache/kafka/blob/1c4eb1a5757df611735cfac9b709e0d80d0da4b3/connect/mirror-client/src/main/java/org/apache/kafka/connect/mirror/MirrorClient.java#L164
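       A minimal sketch of what that could look like, keeping the existing signature and following the deadline pattern in the MirrorClient code linked above. CONSUME_DURATION_MS is a hypothetical constant, and breaking out on an empty poll is an assumption about when the backlog counts as drained:

           import java.time.Duration;
           import java.util.ArrayList;
           import java.util.HashMap;
           import java.util.List;
           import java.util.Map;

           import org.apache.kafka.clients.consumer.Consumer;
           import org.apache.kafka.clients.consumer.ConsumerRecord;
           import org.apache.kafka.clients.consumer.ConsumerRecords;
           import org.apache.kafka.common.TopicPartition;

           private Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> consumeAllMessages(
                   Consumer<byte[], byte[]> consumer) {
               Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> records = new HashMap<>();
               // Hypothetical deadline constant, mirroring the MirrorClient pattern.
               long deadline = System.currentTimeMillis() + CONSUME_DURATION_MS;
               while (System.currentTimeMillis() < deadline) {
                   ConsumerRecords<byte[], byte[]> polled = consumer.poll(Duration.ofMillis(500));
                   if (polled.isEmpty()) {
                       break; // assume an empty poll means the backlog is drained
                   }
                   for (ConsumerRecord<byte[], byte[]> record : polled) {
                       records.computeIfAbsent(new TopicPartition(record.topic(), record.partition()),
                           tp -> new ArrayList<>()).add(record);
                   }
               }
               return records;
           }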

##########
File path: connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorConnectorsIntegrationTest.java
##########
@@ -272,28 +303,35 @@ public void testReplication() throws InterruptedException {
         waitForCondition(() -> {
             try {
                 return primaryClient.remoteConsumerOffsets("consumer-group-1", "backup",
-                    Duration.ofMillis(CHECKPOINT_DURATION_MS)).containsKey(new TopicPartition("test-topic-1", 0));
+                    Duration.ofMillis(CHECKPOINT_DURATION_MS)).containsKey(new TopicPartition("test-topic-1", 1));
             } catch (Throwable e) {
                 return false;
             }
         }, CHECKPOINT_DURATION_MS, "Offsets not translated upstream to primary cluster.");
 
         Map<TopicPartition, OffsetAndMetadata> primaryOffsets = primaryClient.remoteConsumerOffsets("consumer-group-1", "backup",
                 Duration.ofMillis(CHECKPOINT_DURATION_MS));
- 
+
         // Failback consumer group to primary cluster
-        Consumer<byte[], byte[]> consumer2 = primary.kafka().createConsumer(Collections.singletonMap("group.id", "consumer-group-1"));
-        consumer2.assign(primaryOffsets.keySet());
+        Consumer<byte[], byte[]> consumer2 = primary.kafka().createConsumer(consumerProps);
+        List<TopicPartition> primaryPartitions = IntStream.range(0, NUM_PARTITIONS)
+                .boxed()
+                .flatMap(p -> Stream.of(new TopicPartition("test-topic-1", p), new TopicPartition("backup.test-topic-1", p)))
+                .collect(Collectors.toList());
+        consumer2.assign(primaryPartitions);
         primaryOffsets.forEach(consumer2::seek);
-        consumer2.poll(Duration.ofMillis(500));
 
         assertTrue("Consumer failedback to zero upstream offset.", consumer2.position(new TopicPartition("test-topic-1", 0)) > 0);
         assertTrue("Consumer failedback to zero downstream offset.", consumer2.position(new TopicPartition("backup.test-topic-1", 0)) > 0);
         assertTrue("Consumer failedback beyond expected upstream offset.", consumer2.position(
-            new TopicPartition("test-topic-1", 0)) <= NUM_RECORDS_PRODUCED);
+            new TopicPartition("test-topic-1", 0)) <= Math.ceil((float) NUM_RECORDS_PRODUCED / (NUM_PARTITIONS - 1)) + Math.ceil((float) NUM_RECORDS_PRODUCED / NUM_PARTITIONS));

Review comment:
       I don't see why we need the cast, ceil, and coercion here. Could this be done with integer math instead?
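       For reference, the same ceiling can be computed with pure integer math; ceilDiv here is a hypothetical helper for illustration, not something that exists in the test:

           // Hypothetical helper: ceiling division for positive ints,
           // with no float cast or Math.ceil needed.
           static int ceilDiv(int numerator, int denominator) {
               return (numerator + denominator - 1) / denominator;
           }

           // The bound above would then read:
           // consumer2.position(new TopicPartition("test-topic-1", 0))
           //     <= ceilDiv(NUM_RECORDS_PRODUCED, NUM_PARTITIONS - 1)
           //         + ceilDiv(NUM_RECORDS_PRODUCED, NUM_PARTITIONS)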

##########
File path: connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorConnectorsIntegrationTest.java
##########
@@ -244,26 +251,50 @@ public void testReplication() throws InterruptedException {
 
         assertTrue("Offsets not translated downstream to backup cluster. Found: " + backupOffsets, backupOffsets.containsKey(
             new TopicPartition("primary.test-topic-1", 0)));
+        assertTrue("Offset of empty partition not translated downstream to backup cluster. Found: " + backupOffsets, backupOffsets.containsKey(
+            new TopicPartition("primary.test-topic-1", NUM_PARTITIONS - 1)));
+
+        // Produce additional messages.
+        for (int i = 0; i < NUM_RECORDS_PRODUCED; i++) {
+            // produce to all partitions this time
+            primary.kafka().produce("test-topic-1", i % NUM_PARTITIONS, "key", "message-2-" + i);
+            backup.kafka().produce("test-topic-1", i % NUM_PARTITIONS, "key", "message-2-" + i);
+        }
 
         // Failover consumer group to backup cluster.
-        Consumer<byte[], byte[]> consumer1 = backup.kafka().createConsumer(Collections.singletonMap("group.id", "consumer-group-1"));
-        consumer1.assign(backupOffsets.keySet());
+        Map<String, Object> consumerProps = new HashMap<String, Object>() {{
+                put("group.id", "consumer-group-1");
+                put("auto.offset.reset", "latest");
+            }};
+        Consumer<byte[], byte[]> consumer1 = backup.kafka().createConsumer(consumerProps);
+        List<TopicPartition> backupPartitions = IntStream.range(0, NUM_PARTITIONS)
+                .boxed()
+                .flatMap(p -> Stream.of(new TopicPartition("test-topic-1", p), new TopicPartition("primary.test-topic-1", p)))
+                .collect(Collectors.toList());
+        consumer1.assign(backupPartitions);
         backupOffsets.forEach(consumer1::seek);
-        consumer1.poll(Duration.ofMillis(500));
-        consumer1.commitSync();
 
         assertTrue("Consumer failedover to zero offset.", consumer1.position(new TopicPartition("primary.test-topic-1", 0)) > 0);
         assertTrue("Consumer failedover beyond expected offset.", consumer1.position(
-            new TopicPartition("primary.test-topic-1", 0)) <= NUM_RECORDS_PRODUCED);
+            new TopicPartition("primary.test-topic-1", 0)) <= Math.ceil((float) NUM_RECORDS_PRODUCED / (NUM_PARTITIONS - 1)));
+        assertEquals("Consumer failedover to non-zero offset on last partition.", 0,
+            consumer1.position(new TopicPartition("primary.test-topic-1", NUM_PARTITIONS - 1)));
         assertTrue("Checkpoints were not emitted upstream to primary cluster.", primary.kafka().consume(1,
             CHECKPOINT_DURATION_MS, "backup.checkpoints.internal").count() > 0);
 
+        Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> messages1 = consumeAllMessages(consumer1);
+        System.out.println(messages1);
+        for (TopicPartition tp : backupPartitions) {
+            assertNotNull("No data consumed from partition " + tp + ".", messages1.get(tp));
+            int expectedMessageCount = tp.toString().equals("test-topic-1-0") ? 22 : 10;

Review comment:
       There is a lot of magic here -- why the special case and the magic numbers? Can we restructure this to test the two cases separately?
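       One possible shape for that, as a sketch only: FIRST_ROUND_PARTITION_ZERO_COUNT and PER_PARTITION_COUNT are hypothetical names for values derived from NUM_RECORDS_PRODUCED and NUM_PARTITIONS, not verified against the test setup:

           // Sketch: derive the expected counts from named constants instead of the
           // literals 22 and 10, and compare TopicPartition objects rather than strings.
           for (TopicPartition tp : backupPartitions) {
               List<ConsumerRecord<byte[], byte[]>> partitionRecords = messages1.get(tp);
               assertNotNull("No data consumed from partition " + tp + ".", partitionRecords);
               int expected = tp.equals(new TopicPartition("test-topic-1", 0))
                       ? FIRST_ROUND_PARTITION_ZERO_COUNT + PER_PARTITION_COUNT // hypothetical constants
                       : PER_PARTITION_COUNT;
               assertEquals("Unexpected record count for " + tp + ".", expected, partitionRecords.size());
           }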

##########
File path: connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorConnectorsIntegrationTest.java
##########
@@ -244,26 +251,50 @@ public void testReplication() throws InterruptedException {
 
         assertTrue("Offsets not translated downstream to backup cluster. Found: " + backupOffsets, backupOffsets.containsKey(
             new TopicPartition("primary.test-topic-1", 0)));
+        assertTrue("Offset of empty partition not translated downstream to backup cluster. Found: " + backupOffsets, backupOffsets.containsKey(
+            new TopicPartition("primary.test-topic-1", NUM_PARTITIONS - 1)));
+
+        // Produce additional messages.
+        for (int i = 0; i < NUM_RECORDS_PRODUCED; i++) {
+            // produce to all partitions this time
+            primary.kafka().produce("test-topic-1", i % NUM_PARTITIONS, "key", "message-2-" + i);
+            backup.kafka().produce("test-topic-1", i % NUM_PARTITIONS, "key", "message-2-" + i);
+        }
 
         // Failover consumer group to backup cluster.
-        Consumer<byte[], byte[]> consumer1 = backup.kafka().createConsumer(Collections.singletonMap("group.id", "consumer-group-1"));
-        consumer1.assign(backupOffsets.keySet());
+        Map<String, Object> consumerProps = new HashMap<String, Object>() {{
+                put("group.id", "consumer-group-1");
+                put("auto.offset.reset", "latest");
+            }};
+        Consumer<byte[], byte[]> consumer1 = backup.kafka().createConsumer(consumerProps);
+        List<TopicPartition> backupPartitions = IntStream.range(0, NUM_PARTITIONS)
+                .boxed()
+                .flatMap(p -> Stream.of(new TopicPartition("test-topic-1", p), new TopicPartition("primary.test-topic-1", p)))
+                .collect(Collectors.toList());
+        consumer1.assign(backupPartitions);
         backupOffsets.forEach(consumer1::seek);
-        consumer1.poll(Duration.ofMillis(500));
-        consumer1.commitSync();
 
         assertTrue("Consumer failedover to zero offset.", consumer1.position(new TopicPartition("primary.test-topic-1", 0)) > 0);
         assertTrue("Consumer failedover beyond expected offset.", consumer1.position(
-            new TopicPartition("primary.test-topic-1", 0)) <= NUM_RECORDS_PRODUCED);
+            new TopicPartition("primary.test-topic-1", 0)) <= Math.ceil((float) NUM_RECORDS_PRODUCED / (NUM_PARTITIONS - 1)));
+        assertEquals("Consumer failedover to non-zero offset on last partition.", 0,
+            consumer1.position(new TopicPartition("primary.test-topic-1", NUM_PARTITIONS - 1)));
         assertTrue("Checkpoints were not emitted upstream to primary cluster.", primary.kafka().consume(1,
             CHECKPOINT_DURATION_MS, "backup.checkpoints.internal").count() > 0);
 
+        Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> messages1 = consumeAllMessages(consumer1);
+        System.out.println(messages1);

Review comment:
       Errant println -- looks like leftover debug output.
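       If the output is still useful for diagnosing failures, a debug-level log line would be less intrusive than stdout. A sketch, assuming an slf4j Logger on the test class (slf4j being the facade used elsewhere in Kafka):

           import org.slf4j.Logger;
           import org.slf4j.LoggerFactory;

           private static final Logger log = LoggerFactory.getLogger(MirrorConnectorsIntegrationTest.class);

           // instead of System.out.println(messages1):
           log.debug("Messages consumed after failover: {}", messages1);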




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org